diff --git a/internal/backup/encode_sqs.go b/internal/backup/encode_sqs.go index 1702e58c..7fb54b85 100644 --- a/internal/backup/encode_sqs.go +++ b/internal/backup/encode_sqs.go @@ -6,6 +6,7 @@ import ( "encoding/json" "os" "path/filepath" + "sort" "strconv" "github.com/cockroachdb/errors" @@ -54,10 +55,39 @@ var ( // ErrSQSEncodeNotRegular is returned when a dump file is not a regular // file (symlink / FIFO / device / directory). ErrSQSEncodeNotRegular = errors.New("backup: sqs dump file is not a regular file") - // ErrSQSEncodeUnsupportedPartitioned is returned for HT-FIFO queues - // (partition_count > 1), whose partitioned key family this slice does - // not yet reproduce. - ErrSQSEncodeUnsupportedPartitioned = errors.New("backup: sqs partitioned (HT-FIFO) queue not yet supported by encoder") + // ErrSQSEncodeMissingPartition fires when a partitioned queue + // (meta.PartitionCount > 1) has a message whose Partition field is + // nil. Pre-M5-3 dumps lack the field entirely; replaying one against + // a partitioned queue would silently route every message to + // partition 0 via the `partition != nil` dispatch in addMessage, + // while the live readers scan the partitioned keyspace selected + // from raw PartitionCount > 1 — i.e. classic-shape keys against a + // partitioned reader = invisible on first read. Pinned by + // TestSQSEncodeRejectsMissingPartitionOnPartitionedQueue. + ErrSQSEncodeMissingPartition = errors.New("backup: sqs partitioned queue message missing partition field") + // ErrSQSEncodeOutOfRangePartition fires when a partitioned-queue + // message carries *Partition >= meta.PartitionCount. The dump is + // malformed (operator hand-edit or a future M5-3 decoder bug). + ErrSQSEncodeOutOfRangePartition = errors.New("backup: sqs message partition out of range for queue partition_count") + // ErrSQSEncodePartitionRoutingMismatch fires when a partitioned + // FIFO queue with FifoThroughputLimit == "perQueue" has a message + // with *Partition != 0. The live partitionFor + // (adapter/sqs_partitioning.go:71-72) forces every group to + // partition 0 in perQueue mode regardless of PartitionCount, and + // ReceiveMessage only scans the partition-0 lane. Accepting any + // other partition value would restore messages onto |p|N|... lanes + // the live receive fan-out never visits — silent data loss on + // first read. Codex P2 v914 v4 caught this gap. + ErrSQSEncodePartitionRoutingMismatch = errors.New("backup: sqs perQueue HT-FIFO queue must keep all messages on partition 0") + // ErrSQSEncodePartitionHashMismatch fires for a perMessageGroupId + // HT-FIFO message whose Partition disagrees with + // partitionFor(MessageGroupID). The live send/receive/group-lock + // path uses partitionFor's FNV-1a-mod-PartitionCount mapping + // (adapter/sqs_partitioning.go:64); restoring under the wrong + // lane would split a FIFO group across two partition-scoped + // group-lock keyspaces and break FIFO order on first read. Codex + // P2 #929. + ErrSQSEncodePartitionHashMismatch = errors.New("backup: sqs perMessageGroupId HT-FIFO message partition disagrees with partitionFor(message_group_id)") ) // sqsStoredQueueMeta mirrors the live adapter's sqsQueueMeta JSON shape @@ -159,10 +189,6 @@ func (e *SQSRecordEncoder) encodeQueue(b *snapshotBuilder, root *os.Root, queueD if meta.Name == "" { return errors.Wrapf(ErrSQSEncodeInvalidQueue, "%s/_queue.json: empty queue name", queueDir) } - if meta.PartitionCount > 1 { - return errors.Wrapf(ErrSQSEncodeUnsupportedPartitioned, - "%s: partition_count %d", queueDir, meta.PartitionCount) - } if err := e.addQueueMeta(b, meta); err != nil { return err } @@ -205,18 +231,28 @@ func (e *SQSRecordEncoder) encodeQueueMessages(b *snapshotBuilder, root *os.Root if err != nil { return err } - var maxSeq uint64 - for i := range records { - seq, err := e.addMessage(b, meta.Name, records[i]) - if err != nil { - return err - } - if err := e.addSideRecords(b, meta.Name, &meta, &records[i]); err != nil { - return err - } - if seq > maxSeq { - maxSeq = seq - } + // Fail-closed validation up-front so a malformed dump never stages + // partial records. Uses raw meta.PartitionCount > 1 as the + // partitioned-queue predicate (NOT effectivePartitionCount; codex + // P2 v914 v7 - using the effective count would allow a perQueue + // dump with Partition == nil to slip past the missing-partition + // gate, then addMessage's `partition != nil` dispatch would emit + // classic-shape keys against a partitioned-keyspace queue, making + // every restored message invisible). + if err := validatePartitioning(&meta, records); err != nil { + return err + } + // Per-partition deterministic emission for partitioned queues. The + // snapshotBuilder sorts by key bytes on WriteTo so this does not + // directly affect the .fsm byte output, but it makes the in-loop + // state (maxSeq, future per-partition counters) deterministic and + // matches the design doc's stated contract. + if meta.PartitionCount > 1 { + sortMessagesForPartitionedEmit(records) + } + maxSeq, err := e.stageMessageRecords(b, &meta, records) + if err != nil { + return err } if meta.FIFO && maxSeq > 0 { // The live FIFO send path (adapter/sqs_fifo.go: loadFifoSequence @@ -235,10 +271,262 @@ func (e *SQSRecordEncoder) encodeQueueMessages(b *snapshotBuilder, root *os.Root return nil } +// stageMessageRecords iterates the (pre-validated, pre-sorted) records +// and stages each one's data + side records on b, returning the +// maximum SequenceNumber observed (used to write the FIFO seq counter). +// Split out of encodeQueueMessages so the parent stays under cyclop. +// +// isPartitioned is derived once from raw meta.PartitionCount > 1 — NOT +// from rec.Partition != nil. Codex P1 / gemini critical (PR #929): +// a classic queue dump (PartitionCount<=1) may carry an explicit +// rec.Partition = &0 that validatePartitioningOne lets through (gate 4 +// only rejects *Partition != 0). Routing on `partition != nil` would +// emit partitioned keys for that classic queue, and the live reader +// only scans the classic keyspace - the messages become invisible on +// first read. Keying the dispatch on isPartitioned (a property of the +// QUEUE, not the message) closes that gap. +func (e *SQSRecordEncoder) stageMessageRecords(b *snapshotBuilder, meta *sqsQueueMetaPublic, records []sqsMessageRecord) (uint64, error) { + var maxSeq uint64 + isPartitioned := meta.PartitionCount > 1 + // Pre-pass: pick the LATEST send per dedup-key tuple so only one + // dedup row is emitted per (group, dedupID) [classic] or + // (partition, group, dedupID) [partitioned]. When the live FIFO + // dedup window expires, a duplicate (group, dedupID) re-send + // overwrites the prior row; the dump may carry BOTH messages but + // the live keyspace only ever held one dedup row at a time. Naive + // per-message emission would collide on snapshotBuilder.Add and + // abort the restore. Codex P2 #929. + dedupWinner := pickDedupWinners(meta, records, isPartitioned) + for i := range records { + var partition uint32 + if isPartitioned && records[i].Partition != nil { + partition = *records[i].Partition + } + seq, err := e.addMessage(b, meta.Name, isPartitioned, partition, records[i]) + if err != nil { + return 0, err + } + if err := e.addSideRecords(b, meta.Name, isPartitioned, partition, dedupWinner[i], meta, &records[i]); err != nil { + return 0, err + } + if seq > maxSeq { + maxSeq = seq + } + } + return maxSeq, nil +} + +// pickDedupWinners returns a boolean slice indexed by records' position; +// true means "this message is the LATEST send for its dedup-key tuple +// and therefore owns the dedup row". Caller passes the boolean into +// addSideRecords so non-winners skip dedup emission. +// +// The winner-key tuple MUST match the live dedup-key shape exactly, +// otherwise two messages that would collide on snapshotBuilder.Add +// can both be marked as winners: +// - Partitioned: live key is sqsPartitionedMsgDedupKey(queue, +// partition, gen, groupID, dedupID), so the tuple is +// (partition, group, dedupID). +// - Classic: live key is sqsMsgDedupKey(queue, gen, dedupID) — NO +// group / partition in the key — so the tuple collapses to +// ("", "", dedupID). Codex P2 #929: keeping MessageGroupID in the +// classic winner key would let two retained-but-expired duplicates +// in DIFFERENT groups but with the same dedupID both win, then +// both emit the same sqsMsgDedupKeyBytes(queue, dedupID) and +// collide. +// +// For non-FIFO queues or messages with empty resolvedDedupID the +// returned slice entry is false — dedup emission was already skipped +// by addSideRecords for those cases. +func pickDedupWinners(meta *sqsQueueMetaPublic, records []sqsMessageRecord, isPartitioned bool) []bool { + winners := make([]bool, len(records)) + if !meta.FIFO { + return winners + } + type dedupKey struct { + partition uint32 + group string + dedup string + } + latest := make(map[dedupKey]int, len(records)) + for i := range records { + d := resolveDedupID(&records[i], meta) + if d == "" { + continue + } + k := dedupKey{dedup: d} + if isPartitioned { + // Partitioned dedup key includes partition + group + + // dedupID; group is load-bearing because the partitioned + // constructor splits group / dedup segments with a '|'. + k.group = records[i].MessageGroupID + if records[i].Partition != nil { + k.partition = *records[i].Partition + } + } + // Classic case leaves k.partition=0 and k.group="" so two + // retained-but-expired duplicates in different groups with + // the same dedupID collapse to the same winner key — + // matching the classic sqsMsgDedupKey shape. + prev, seen := latest[k] + if !seen || records[i].SendTimestampMillis > records[prev].SendTimestampMillis { + latest[k] = i + } + } + for _, idx := range latest { + winners[idx] = true + } + return winners +} + +// validatePartitioning runs the four fail-closed gates from the M5-3 +// design doc §"Validation invariants" before any message is staged. +// All four use raw meta.PartitionCount > 1 as the partitioned-queue +// predicate, never effectivePartitionCount (codex P2 v914 v7). +func validatePartitioning(meta *sqsQueueMetaPublic, records []sqsMessageRecord) error { + for i := range records { + if err := validatePartitioningOne(meta, &records[i]); err != nil { + return err + } + } + return nil +} + +// validatePartitioningOne checks one message against the four gates. +// Split out of validatePartitioning so the outer loop stays under the +// cyclop limit. +func validatePartitioningOne(meta *sqsQueueMetaPublic, rec *sqsMessageRecord) error { + if meta.PartitionCount > 1 { + return validatePartitioningPartitioned(meta, rec) + } + // Classic queue (PartitionCount <= 1): the only invalid state is + // a non-zero Partition field — the dump is internally + // inconsistent (classic-queue keyspace has no partition concept). + if rec.Partition != nil && *rec.Partition != 0 { + return errors.Wrapf(ErrSQSEncodeInvalidMessage, + "queue %q (partition_count=%d): classic queue message %q has partition=%d", + meta.Name, meta.PartitionCount, rec.MessageID, *rec.Partition) + } + return nil +} + +// validatePartitioningPartitioned runs the three partitioned-queue +// gates (missing partition, out-of-range, perQueue routing mismatch). +// Caller has already verified meta.PartitionCount > 1. +func validatePartitioningPartitioned(meta *sqsQueueMetaPublic, rec *sqsMessageRecord) error { + // Gate 1: missing partition. Pre-M5-3 dump replayed against a + // partitioned queue, or M5-3 decoder bug. The operator must + // re-decode with an M5-3 decoder. + if rec.Partition == nil { + return errors.Wrapf(ErrSQSEncodeMissingPartition, + "queue %q (partition_count=%d): message %q missing partition field", + meta.Name, meta.PartitionCount, rec.MessageID) + } + // Gate 2: out-of-range partition. Dump is malformed. + if *rec.Partition >= meta.PartitionCount { + return errors.Wrapf(ErrSQSEncodeOutOfRangePartition, + "queue %q (partition_count=%d): message %q partition=%d out of range", + meta.Name, meta.PartitionCount, rec.MessageID, *rec.Partition) + } + // Gate 3: perQueue HT-FIFO with nonzero partition. The live + // partitionFor collapses every group to partition 0 in perQueue + // mode; accepting any other partition would write to a lane the + // live receive never scans. + if meta.FifoThroughputLimit == sqsFifoThroughputPerQueue { + if *rec.Partition != 0 { + return errors.Wrapf(ErrSQSEncodePartitionRoutingMismatch, + "queue %q (partition_count=%d, fifo_throughput_limit=%q): message %q partition=%d, want 0", + meta.Name, meta.PartitionCount, meta.FifoThroughputLimit, rec.MessageID, *rec.Partition) + } + return nil + } + // Gate 5: perMessageGroupId HT-FIFO partition-hash consistency. + // For perMessageGroupId queues, the live partitionFor maps each + // group_id to one partition via FNV-1a hash; the receivers / + // group-lock reader use that partition value to find messages. + // A dump line with an in-range but inconsistent partition would + // be restored under the wrong lane and split a FIFO group across + // two partition-scoped group-lock keyspaces, allowing concurrent + // out-of-order delivery. Mirror the hash here. Codex P2 #929. + want := partitionForGroup(meta, rec.MessageGroupID) + if *rec.Partition != want { + return errors.Wrapf(ErrSQSEncodePartitionHashMismatch, + "queue %q (partition_count=%d, group_id=%q): message %q partition=%d, partitionFor=%d", + meta.Name, meta.PartitionCount, rec.MessageGroupID, rec.MessageID, *rec.Partition, want) + } + return nil +} + +// partitionForGroup mirrors adapter/sqs_partitioning.go:partitionFor +// for use by the encoder's validation gate 5. Same FNV-1a 32-bit +// inlined hash, same masking on (PartitionCount - 1). MUST be a copy +// (M3b-3 circular-dep ban: internal/backup cannot import adapter). +// Returns 0 when partitionFor would: classic queues, perQueue mode, +// or empty MessageGroupID. +func partitionForGroup(meta *sqsQueueMetaPublic, messageGroupID string) uint32 { + if meta == nil || meta.PartitionCount <= 1 { + return 0 + } + if meta.FifoThroughputLimit == sqsFifoThroughputPerQueue { + return 0 + } + if messageGroupID == "" { + return 0 + } + const ( + fnv32Offset uint32 = 2166136261 + fnv32Prime uint32 = 16777619 + ) + hash := fnv32Offset + for i := 0; i < len(messageGroupID); i++ { + hash ^= uint32(messageGroupID[i]) + hash *= fnv32Prime + } + // PartitionCount is power-of-two (live validator-enforced). + return hash & (meta.PartitionCount - 1) +} + +// sortMessagesForPartitionedEmit sorts messages by +// (partition, send_timestamp_millis, sequence_number, message_id). +// Partition is the leading key so per-partition state (maxSeq, future +// counters) is computed deterministically; the remaining three fields +// match sortMessagesForEmit (sqs.go:842) for the classic path. +// validatePartitioning ensures Partition is non-nil whenever the +// caller invokes this fn (PartitionCount > 1), so the *rec.Partition +// dereference is safe. +func sortMessagesForPartitionedEmit(msgs []sqsMessageRecord) { + partitionOf := func(r *sqsMessageRecord) uint32 { + if r.Partition == nil { + return 0 + } + return *r.Partition + } + sort.SliceStable(msgs, func(i, j int) bool { + a, b := &msgs[i], &msgs[j] + pa, pb := partitionOf(a), partitionOf(b) + switch { + case pa != pb: + return pa < pb + case a.SendTimestampMillis != b.SendTimestampMillis: + return a.SendTimestampMillis < b.SendTimestampMillis + case a.SequenceNumber != b.SequenceNumber: + return a.SequenceNumber < b.SequenceNumber + default: + return a.MessageID < b.MessageID + } + }) +} + // addMessage stages one !sqs|msg|data| record and returns the message's // sequence number (for the seq-counter computation). Visibility state is // zeroed (every message restores visible), matching the decoder default. -func (e *SQSRecordEncoder) addMessage(b *snapshotBuilder, queueName string, rec sqsMessageRecord) (uint64, error) { +// +// isPartitioned is derived from raw meta.PartitionCount > 1 by the +// caller, NOT from rec.Partition != nil. The partition uint32 carries +// the value (0 if the queue is classic). See stageMessageRecords doc +// for why the key-shape decision MUST be made on the queue-level +// predicate (codex P1 / gemini critical PR #929). +func (e *SQSRecordEncoder) addMessage(b *snapshotBuilder, queueName string, isPartitioned bool, partition uint32, rec sqsMessageRecord) (uint64, error) { if rec.MessageID == "" { return 0, errors.Wrap(ErrSQSEncodeInvalidMessage, "message missing message_id") } @@ -265,7 +553,12 @@ func (e *SQSRecordEncoder) addMessage(b *snapshotBuilder, queueName string, rec if err != nil { return 0, err } - key := sqsMsgDataKeyBytes(queueName, sqsRestoreGeneration, rec.MessageID) + var key []byte + if isPartitioned { + key = sqsPartitionedMsgDataKeyBytes(queueName, partition, sqsRestoreGeneration, rec.MessageID) + } else { + key = sqsMsgDataKeyBytes(queueName, sqsRestoreGeneration, rec.MessageID) + } if err := b.Add(key, val, 0); err != nil { return 0, err } @@ -400,3 +693,67 @@ func sqsMsgDataKeyBytes(queueName string, generation uint64, messageID string) [ out = binary.BigEndian.AppendUint64(out, generation) return append(out, encodeSQSSegment(messageID)...) } + +// sqsPartitionedQueueTerminator mirrors adapter/sqs_keys.go:82 — the +// literal '|' that terminates the encoded queue segment (and, in +// dedup keys, the encoded group segment) inside a partitioned key. +// encodeSQSSegment uses base64.RawURLEncoding which never emits '|', +// so the terminator is unambiguous. +const sqsPartitionedQueueTerminator byte = '|' + +// sqsFifoThroughputPerQueue mirrors adapter/sqs_partitioning.go:37 +// (htfifoThroughputPerQueue). When this is the FifoThroughputLimit +// value, partitionFor collapses every group to partition 0 +// regardless of meta.PartitionCount; the encoder uses it for one +// fail-closed gate (ErrSQSEncodePartitionRoutingMismatch in slice D) +// and as an effectivePartitionCount input. CANNOT be imported from +// adapter — htfifoThroughputPerQueue is unexported (M3b-3 circular +// dependency pattern). +const sqsFifoThroughputPerQueue = "perQueue" + +// Partitioned key prefixes mirror adapter/sqs_keys.go:91..95. They +// are the legacy prefixes with the partitioned discriminator "p|" +// appended. +const ( + SQSPartitionedMsgDataPrefix = SQSMsgDataPrefix + sqsPartitionedDiscriminator + SQSPartitionedMsgVisPrefix = SQSMsgVisPrefix + sqsPartitionedDiscriminator + SQSPartitionedMsgByAgePrefix = SQSMsgByAgePrefix + sqsPartitionedDiscriminator + SQSPartitionedMsgDedupPrefix = SQSMsgDedupPrefix + sqsPartitionedDiscriminator +) + +// sqsPartitionedMsgDataKeyBytes reproduces +// adapter/sqs_keys.go:sqsPartitionedMsgDataKey for a partitioned +// queue: prefix + base64url(queue) + '|' + BE-u32(partition) + +// BE-u64(gen) + base64url(messageID). Mirrored locally because +// internal/backup/ cannot import adapter (M3b-3 pattern). +func sqsPartitionedMsgDataKeyBytes(queueName string, partition uint32, generation uint64, messageID string) []byte { + out := []byte(SQSPartitionedMsgDataPrefix) + out = append(out, encodeSQSSegment(queueName)...) + out = append(out, sqsPartitionedQueueTerminator) + out = binary.BigEndian.AppendUint32(out, partition) + out = binary.BigEndian.AppendUint64(out, generation) + return append(out, encodeSQSSegment(messageID)...) +} + +// effectivePartitionCount mirrors adapter/sqs_keys_dispatch.go:121 +// (which operates on the unexported *adapter.sqsQueueMeta). Returns +// 1 for nil meta or PartitionCount <= 1, 1 when FifoThroughputLimit +// == "perQueue" (the live router collapses every group to partition +// 0 in that mode), otherwise meta.PartitionCount. +// +// NOTE: this helper is NOT used by the encoder's validation gates — +// the M5-3 design pins those gates on raw meta.PartitionCount > 1 +// (codex P2 v914 v7). Kept here for diagnostics, ReceiveMessage scan +// fan-out cross-checks, and future symmetry with the adapter; using +// it in a gate predicate would silently let a perQueue dump with +// Partition == nil emit classic keys against a partitioned-keyspace +// queue. +func effectivePartitionCount(meta *sqsQueueMetaPublic) uint32 { + if meta == nil || meta.PartitionCount <= 1 { + return 1 + } + if meta.FifoThroughputLimit == sqsFifoThroughputPerQueue { + return 1 + } + return meta.PartitionCount +} diff --git a/internal/backup/encode_sqs_partitioned_test.go b/internal/backup/encode_sqs_partitioned_test.go new file mode 100644 index 00000000..49f3dccf --- /dev/null +++ b/internal/backup/encode_sqs_partitioned_test.go @@ -0,0 +1,801 @@ +package backup + +import ( + "bytes" + "encoding/binary" + "testing" + + "github.com/cockroachdb/errors" +) + +// TestSQSEncodePartitionedMsgDataKeyByteShape pins +// sqsPartitionedMsgDataKeyBytes against the wire format mandated by +// adapter/sqs_keys.go:sqsPartitionedMsgDataKey: +// +// prefix + base64url(queue) + '|' + BE-u32(partition) + BE-u64(gen) + +// base64url(messageID). +// +// internal/backup cannot import adapter to cross-check directly (M3b-3 +// circular-dep ban; same constraint sqsFifoDedupWindowMillis lives +// under), so this is a manually-assembled byte-equality assertion. +func TestSQSEncodePartitionedMsgDataKeyByteShape(t *testing.T) { + t.Parallel() + const ( + queue = "q1" + msgID = "msg-001" + partN = uint32(3) + genCnt = uint64(7) + ) + want := []byte(SQSPartitionedMsgDataPrefix) + want = append(want, encodeSQSSegment(queue)...) + want = append(want, sqsPartitionedQueueTerminator) + want = binary.BigEndian.AppendUint32(want, partN) + want = binary.BigEndian.AppendUint64(want, genCnt) + want = append(want, encodeSQSSegment(msgID)...) + + got := sqsPartitionedMsgDataKeyBytes(queue, partN, genCnt, msgID) + if !bytes.Equal(got, want) { + t.Fatalf("partitioned data key mismatch\ngot: %x\nwant: %x", got, want) + } +} + +// TestSQSEncodePartitionedMsgVisKeyByteShape pins +// sqsPartitionedMsgVisKeyBytes: prefix + base64url(queue) + '|' + +// BE-u32(partition) + BE-u64(gen) + BE-u64(visibleAt) + +// base64url(messageID). +func TestSQSEncodePartitionedMsgVisKeyByteShape(t *testing.T) { + t.Parallel() + const ( + queue = "qvis" + msgID = "mid" + partN = uint32(0) + genCnt = uint64(1) + visMs = int64(1_700_000_000_000) + ) + want := []byte(SQSPartitionedMsgVisPrefix) + want = append(want, encodeSQSSegment(queue)...) + want = append(want, sqsPartitionedQueueTerminator) + want = binary.BigEndian.AppendUint32(want, partN) + want = binary.BigEndian.AppendUint64(want, genCnt) + want = binary.BigEndian.AppendUint64(want, uint64(visMs)) + want = append(want, encodeSQSSegment(msgID)...) + + got := sqsPartitionedMsgVisKeyBytes(queue, partN, genCnt, visMs, msgID) + if !bytes.Equal(got, want) { + t.Fatalf("partitioned vis key mismatch\ngot: %x\nwant: %x", got, want) + } +} + +// TestSQSEncodePartitionedMsgByAgeKeyByteShape pins +// sqsPartitionedMsgByAgeKeyBytes: prefix + base64url(queue) + '|' + +// BE-u32(partition) + BE-u64(gen) + BE-u64(sendTs) + +// base64url(messageID). +func TestSQSEncodePartitionedMsgByAgeKeyByteShape(t *testing.T) { + t.Parallel() + const ( + queue = "qage" + msgID = "mid-age" + partN = uint32(15) + genCnt = uint64(1) + sendMs = int64(1_600_000_000_000) + ) + want := []byte(SQSPartitionedMsgByAgePrefix) + want = append(want, encodeSQSSegment(queue)...) + want = append(want, sqsPartitionedQueueTerminator) + want = binary.BigEndian.AppendUint32(want, partN) + want = binary.BigEndian.AppendUint64(want, genCnt) + want = binary.BigEndian.AppendUint64(want, uint64(sendMs)) + want = append(want, encodeSQSSegment(msgID)...) + + got := sqsPartitionedMsgByAgeKeyBytes(queue, partN, genCnt, sendMs, msgID) + if !bytes.Equal(got, want) { + t.Fatalf("partitioned byage key mismatch\ngot: %x\nwant: %x", got, want) + } +} + +// TestSQSEncodePartitionedMsgDedupKeyByteShape pins +// sqsPartitionedMsgDedupKeyBytes including the second '|' between +// the variable-length group and dedup segments (CodeRabbit major +// PR #732 round 6 / M5-3 design doc §line 19): prefix + +// base64url(queue) + '|' + BE-u32(partition) + +// BE-u64(sqsRestoreGeneration) + base64url(groupID) + '|' + +// base64url(dedupID). gen is hardcoded to sqsRestoreGeneration in +// the constructor (mirroring the classic sqsMsgDedupKeyBytes pattern; +// satisfies unparam). +func TestSQSEncodePartitionedMsgDedupKeyByteShape(t *testing.T) { + t.Parallel() + const ( + queue = "qdedup" + groupID = "group-A" + dedupID = "dedup-001" + partN = uint32(2) + ) + want := []byte(SQSPartitionedMsgDedupPrefix) + want = append(want, encodeSQSSegment(queue)...) + want = append(want, sqsPartitionedQueueTerminator) + want = binary.BigEndian.AppendUint32(want, partN) + want = binary.BigEndian.AppendUint64(want, sqsRestoreGeneration) + want = append(want, encodeSQSSegment(groupID)...) + want = append(want, sqsPartitionedQueueTerminator) + want = append(want, encodeSQSSegment(dedupID)...) + + got := sqsPartitionedMsgDedupKeyBytes(queue, partN, groupID, dedupID) + if !bytes.Equal(got, want) { + t.Fatalf("partitioned dedup key mismatch\ngot: %x\nwant: %x", got, want) + } + + // Also pin that the group+dedup terminator is present at the + // expected offset so a future change that drops it (regressing + // the FNV-collision class) fails this test. + offset := len(SQSPartitionedMsgDedupPrefix) + + len(encodeSQSSegment(queue)) + 1 + // queue + '|' + 4 + 8 + // partition u32 + gen u64 + len(encodeSQSSegment(groupID)) + if got[offset] != sqsPartitionedQueueTerminator { + t.Fatalf("dedup key byte at offset %d = %x, want '|'", offset, got[offset]) + } +} + +// TestSQSEncodeRejectsMissingPartitionOnPartitionedQueue pins gate 1 +// of validatePartitioning: a partitioned queue (PartitionCount > 1) +// with a message whose Partition field is nil (legacy pre-M5-3 dump +// shape) fails closed with ErrSQSEncodeMissingPartition. Without +// this gate the message would route to the classic key keyspace +// (addMessage's `partition != nil` dispatch falls to +// sqsMsgDataKeyBytes) and become invisible to the live readers +// scanning the partitioned keyspace (codex P1 v914 v2). +func TestSQSEncodeRejectsMissingPartitionOnPartitionedQueue(t *testing.T) { + t.Parallel() + in := t.TempDir() + writeSQSQueue(t, in, "p.fifo", + []byte(`{"format_version":1,"name":"p.fifo","fifo":true,`+ + `"visibility_timeout_seconds":30,"message_retention_seconds":345600,`+ + `"delay_seconds":0,"partition_count":2}`), + [][]byte{ + // No "partition" field — legacy dump shape. + []byte(`{"message_id":"m1","body":"hello",` + + `"send_timestamp_millis":1000,"available_at_millis":1000,` + + `"message_group_id":"g1"}`), + }) + b := newSnapshotBuilder(sqsEncTS) + err := NewSQSRecordEncoder(in).Encode(b) + if !errors.Is(err, ErrSQSEncodeMissingPartition) { + t.Fatalf("Encode err = %v, want ErrSQSEncodeMissingPartition", err) + } +} + +// TestSQSEncodeRejectsOutOfRangePartition pins gate 2: a partitioned +// queue with a message whose *Partition >= PartitionCount fails +// closed with ErrSQSEncodeOutOfRangePartition. Such a dump is +// internally inconsistent (operator hand-edit or M5-3 decoder bug). +func TestSQSEncodeRejectsOutOfRangePartition(t *testing.T) { + t.Parallel() + in := t.TempDir() + writeSQSQueue(t, in, "p.fifo", + []byte(`{"format_version":1,"name":"p.fifo","fifo":true,`+ + `"visibility_timeout_seconds":30,"message_retention_seconds":345600,`+ + `"delay_seconds":0,"partition_count":2}`), + [][]byte{ + []byte(`{"message_id":"m1","body":"hello",` + + `"send_timestamp_millis":1000,"available_at_millis":1000,` + + `"message_group_id":"g1","partition":5}`), + }) + b := newSnapshotBuilder(sqsEncTS) + err := NewSQSRecordEncoder(in).Encode(b) + if !errors.Is(err, ErrSQSEncodeOutOfRangePartition) { + t.Fatalf("Encode err = %v, want ErrSQSEncodeOutOfRangePartition", err) + } +} + +// TestSQSEncodeRejectsNonzeroPartitionOnPerQueueHTFIFO pins gate 3 +// (codex P2 v914 v4). For a partitioned queue with +// FifoThroughputLimit == "perQueue", the live partitionFor +// (adapter/sqs_partitioning.go:71-72) forces every group to +// partition 0; ReceiveMessage only scans the partition-0 lane. +// Accepting *Partition != 0 would restore messages onto |p|1|... or +// |p|N|... lanes the live receive never visits. +func TestSQSEncodeRejectsNonzeroPartitionOnPerQueueHTFIFO(t *testing.T) { + t.Parallel() + in := t.TempDir() + writeSQSQueue(t, in, "p.fifo", + []byte(`{"format_version":1,"name":"p.fifo","fifo":true,`+ + `"visibility_timeout_seconds":30,"message_retention_seconds":345600,`+ + `"delay_seconds":0,"partition_count":2,"fifo_throughput_limit":"perQueue"}`), + [][]byte{ + []byte(`{"message_id":"m1","body":"hello",` + + `"send_timestamp_millis":1000,"available_at_millis":1000,` + + `"message_group_id":"g1","partition":1}`), + }) + b := newSnapshotBuilder(sqsEncTS) + err := NewSQSRecordEncoder(in).Encode(b) + if !errors.Is(err, ErrSQSEncodePartitionRoutingMismatch) { + t.Fatalf("Encode err = %v, want ErrSQSEncodePartitionRoutingMismatch", err) + } +} + +// TestSQSEncodeRejectsNonZeroPartitionOnClassicQueue pins gate 4: +// classic queue (PartitionCount <= 1) with a message whose +// Partition is non-nil and non-zero fails closed with the existing +// ErrSQSEncodeInvalidMessage sentinel. Classic keyspace has no +// partition concept; the dump is internally inconsistent. +func TestSQSEncodeRejectsNonZeroPartitionOnClassicQueue(t *testing.T) { + t.Parallel() + in := t.TempDir() + writeSQSQueue(t, in, "classic", + []byte(`{"format_version":1,"name":"classic",`+ + `"visibility_timeout_seconds":30,"message_retention_seconds":345600,"delay_seconds":0}`), + [][]byte{ + []byte(`{"message_id":"m1","body":"hello",` + + `"send_timestamp_millis":1000,"available_at_millis":1000,"partition":2}`), + }) + b := newSnapshotBuilder(sqsEncTS) + err := NewSQSRecordEncoder(in).Encode(b) + if !errors.Is(err, ErrSQSEncodeInvalidMessage) { + t.Fatalf("Encode err = %v, want ErrSQSEncodeInvalidMessage", err) + } +} + +// TestSQSEncodeGateUsesRawPartitionCount is the regression for codex +// P2 v914 v7. The four validation gates MUST use raw +// meta.PartitionCount > 1 as the partitioned-queue predicate, never +// effectivePartitionCount. A perQueue queue with PartitionCount=2 +// collapses effectivePartitionCount to 1; if the missing-partition +// gate used the effective count, a message with Partition==nil +// would slip past validation, then addMessage's `partition != nil` +// dispatch would emit the classic-shape data key against a +// partitioned-keyspace queue — invisible on first read. +// +// This test would silently pass if the gate predicate were changed +// to effectivePartitionCount(&meta) > 1. +func TestSQSEncodeGateUsesRawPartitionCount(t *testing.T) { + t.Parallel() + in := t.TempDir() + writeSQSQueue(t, in, "p.fifo", + []byte(`{"format_version":1,"name":"p.fifo","fifo":true,`+ + `"visibility_timeout_seconds":30,"message_retention_seconds":345600,`+ + `"delay_seconds":0,"partition_count":2,"fifo_throughput_limit":"perQueue"}`), + [][]byte{ + // partition_count=2 + perQueue: effectivePartitionCount==1. + // No "partition" field. Raw-PartitionCount gate must fire + // before the perQueue routing gate has a chance to inspect + // a nil Partition. + []byte(`{"message_id":"m1","body":"hello",` + + `"send_timestamp_millis":1000,"available_at_millis":1000,` + + `"message_group_id":"g1"}`), + }) + b := newSnapshotBuilder(sqsEncTS) + err := NewSQSRecordEncoder(in).Encode(b) + if !errors.Is(err, ErrSQSEncodeMissingPartition) { + t.Fatalf("Encode err = %v, want ErrSQSEncodeMissingPartition (codex P2 v914 v7 — gate must use raw PartitionCount)", err) + } +} + +// TestSQSEncodePartitionedDedupKeepsLatestOnExpiredCollision pins +// codex P2 #929. Two FIFO partitioned messages with the SAME +// (group, dedupID) tuple but sent more than the dedup window +// (5 minutes) apart end up with the same dedup-row key. The live +// keyspace only holds one row at a time (the later send overwrites +// the prior expired one), but the dump captures BOTH message data +// records. Naive per-message dedup emission would collide on +// snapshotBuilder.Add and abort the restore. pickDedupWinners must +// keep only the LATEST send's dedup row. +// +// "g1" hashes to partition 1 (FNV-1a-mod-2). Both messages share +// group_id="g1" + dedup_id="d" but differ in send_timestamp by 10 +// minutes (well beyond sqsFifoDedupWindowMillis = 5 min). +func TestSQSEncodePartitionedDedupKeepsLatestOnExpiredCollision(t *testing.T) { + t.Parallel() + in := t.TempDir() + writeSQSQueue(t, in, "p.fifo", + []byte(`{"format_version":1,"name":"p.fifo","fifo":true,`+ + `"visibility_timeout_seconds":30,"message_retention_seconds":345600,`+ + `"delay_seconds":0,"partition_count":2,"fifo_throughput_limit":"perMessageGroupId"}`), + [][]byte{ + // Older send (send_ts = 1_000_000). + []byte(`{"message_id":"m-old","body":"first","send_timestamp_millis":1000000,` + + `"available_at_millis":1000000,"message_group_id":"g1",` + + `"message_deduplication_id":"d","sequence_number":1,"partition":1}`), + // Newer send 10 min later (send_ts = 1_600_000); dedup + // window already expired for the older send. + []byte(`{"message_id":"m-new","body":"second","send_timestamp_millis":1600000,` + + `"available_at_millis":1600000,"message_group_id":"g1",` + + `"message_deduplication_id":"d","sequence_number":2,"partition":1}`), + }) + fsm := encodeSQSTree(t, in) + entries, _, err := DecodeLiveEntries(bytes.NewReader(fsm)) + if err != nil { + t.Fatalf("DecodeLiveEntries: %v", err) + } + // Exactly ONE dedup row, and it must encode m-new (the latest). + var dedupCount int + var dedupVal []byte + for _, e := range entries { + if bytes.HasPrefix(e.UserKey, []byte(SQSPartitionedMsgDedupPrefix)) { + dedupCount++ + dedupVal = e.UserValue + } + } + if dedupCount != 1 { + t.Fatalf("dedup-row count = %d, want 1 (only the latest send keeps the row)", dedupCount) + } + if !bytes.Contains(dedupVal, []byte(`"message_id":"m-new"`)) { + t.Fatalf("dedup row contents = %s, want winner = m-new", dedupVal) + } + // Both DATA records still emitted. + dataCount := 0 + for _, e := range entries { + if bytes.HasPrefix(e.UserKey, []byte(SQSPartitionedMsgDataPrefix)) { + dataCount++ + } + } + if dataCount != 2 { + t.Fatalf("data-record count = %d, want 2 (both messages still emitted)", dataCount) + } +} + +// TestSQSEncodeClassicDedupKeepsLatestOnExpiredCollision pins the +// same fix for classic FIFO queues. Codex P2 #929 framed the issue +// around partitioned queues, but the same key-collision class +// exists for classic queues (where the dedup key is just +// (queue, dedupID) — no partition / group disambiguation). +func TestSQSEncodeClassicDedupKeepsLatestOnExpiredCollision(t *testing.T) { + t.Parallel() + in := t.TempDir() + writeSQSQueue(t, in, "classic", + []byte(`{"format_version":1,"name":"classic","fifo":true,`+ + `"visibility_timeout_seconds":30,"message_retention_seconds":345600,"delay_seconds":0}`), + [][]byte{ + []byte(`{"message_id":"m-old","body":"first","send_timestamp_millis":1000000,` + + `"available_at_millis":1000000,"message_group_id":"g",` + + `"message_deduplication_id":"d","sequence_number":1}`), + []byte(`{"message_id":"m-new","body":"second","send_timestamp_millis":1600000,` + + `"available_at_millis":1600000,"message_group_id":"g",` + + `"message_deduplication_id":"d","sequence_number":2}`), + }) + fsm := encodeSQSTree(t, in) + entries, _, err := DecodeLiveEntries(bytes.NewReader(fsm)) + if err != nil { + t.Fatalf("DecodeLiveEntries: %v", err) + } + var dedupCount int + var dedupVal []byte + for _, e := range entries { + if bytes.HasPrefix(e.UserKey, []byte(SQSMsgDedupPrefix)) { + dedupCount++ + dedupVal = e.UserValue + } + } + if dedupCount != 1 { + t.Fatalf("classic dedup-row count = %d, want 1", dedupCount) + } + if !bytes.Contains(dedupVal, []byte(`"message_id":"m-new"`)) { + t.Fatalf("classic dedup row contents = %s, want winner = m-new", dedupVal) + } +} + +// TestSQSEncodeClassicDedupKeepsLatestAcrossGroups pins codex P2 +// #929 (round 2): for classic FIFO queues the live dedup key is +// (queue, generation, dedupID) — NO group / partition. Two retained +// messages in DIFFERENT MessageGroupIds but the SAME dedupID +// (after the 5-minute window expired) must collapse to one winner; +// otherwise both groups emit the same classic dedup key and +// snapshotBuilder.Add fails. +// +// Earlier pickDedupWinners keyed on (partition, group, dedupID) for +// both shapes; this test would have produced 2 dedup rows and +// aborted the restore. The classic-collapse keying produces 1 row +// for the latest send and 2 data records. +func TestSQSEncodeClassicDedupKeepsLatestAcrossGroups(t *testing.T) { + t.Parallel() + in := t.TempDir() + writeSQSQueue(t, in, "classic", + []byte(`{"format_version":1,"name":"classic","fifo":true,`+ + `"visibility_timeout_seconds":30,"message_retention_seconds":345600,"delay_seconds":0}`), + [][]byte{ + []byte(`{"message_id":"m-grpA","body":"a","send_timestamp_millis":1000000,` + + `"available_at_millis":1000000,"message_group_id":"groupA",` + + `"message_deduplication_id":"d","sequence_number":1}`), + // Different group, same dedup, 10 min later. + []byte(`{"message_id":"m-grpB","body":"b","send_timestamp_millis":1600000,` + + `"available_at_millis":1600000,"message_group_id":"groupB",` + + `"message_deduplication_id":"d","sequence_number":2}`), + }) + fsm := encodeSQSTree(t, in) + entries, _, err := DecodeLiveEntries(bytes.NewReader(fsm)) + if err != nil { + t.Fatalf("DecodeLiveEntries: %v", err) + } + var dedupCount int + var dedupVal []byte + for _, e := range entries { + if bytes.HasPrefix(e.UserKey, []byte(SQSMsgDedupPrefix)) { + dedupCount++ + dedupVal = e.UserValue + } + } + if dedupCount != 1 { + t.Fatalf("classic cross-group dedup-row count = %d, want 1", dedupCount) + } + if !bytes.Contains(dedupVal, []byte(`"message_id":"m-grpB"`)) { + t.Fatalf("classic cross-group dedup row = %s, want winner = m-grpB", dedupVal) + } + // Both DATA records emitted (data key includes message_id). + dataCount := 0 + for _, e := range entries { + if bytes.HasPrefix(e.UserKey, []byte(SQSMsgDataPrefix)) && + !bytes.HasPrefix(e.UserKey, []byte(SQSPartitionedMsgDataPrefix)) { + dataCount++ + } + } + if dataCount != 2 { + t.Fatalf("classic cross-group data-record count = %d, want 2", dataCount) + } +} + +// TestSQSEncodeRejectsHashMismatchOnPerMessageGroupId pins gate 5 +// (codex P2 #929). For a perMessageGroupId HT-FIFO queue, the +// partition value MUST match partitionFor(MessageGroupID). An +// in-range but wrong partition would split a FIFO group across two +// partition-scoped group-lock keyspaces and break FIFO order on +// first read. +// +// "g0" hashes (FNV-1a-mod-2) to partition 0; pairing it with +// partition=1 violates the invariant. +func TestSQSEncodeRejectsHashMismatchOnPerMessageGroupId(t *testing.T) { + t.Parallel() + in := t.TempDir() + writeSQSQueue(t, in, "p.fifo", + []byte(`{"format_version":1,"name":"p.fifo","fifo":true,`+ + `"visibility_timeout_seconds":30,"message_retention_seconds":345600,`+ + `"delay_seconds":0,"partition_count":2,"fifo_throughput_limit":"perMessageGroupId"}`), + [][]byte{ + // group_id="g0" hashes to partition 0; partition=1 is + // in-range but inconsistent. + []byte(`{"message_id":"m1","body":"hello",` + + `"send_timestamp_millis":1000,"available_at_millis":1000,` + + `"message_group_id":"g0","partition":1}`), + }) + b := newSnapshotBuilder(sqsEncTS) + err := NewSQSRecordEncoder(in).Encode(b) + if !errors.Is(err, ErrSQSEncodePartitionHashMismatch) { + t.Fatalf("Encode err = %v, want ErrSQSEncodePartitionHashMismatch", err) + } +} + +// TestSQSEncodePartitionForGroup_LiveAdapterParity verifies the +// internal partitionForGroup mirror matches adapter/sqs_partitioning.go +// for representative inputs. We cannot import adapter (M3b-3 ban), +// so this is a parity-by-spec check against the FNV-1a constants. +func TestSQSEncodePartitionForGroup_LiveAdapterParity(t *testing.T) { + t.Parallel() + cases := []struct { + meta *sqsQueueMetaPublic + group string + want uint32 + }{ + // Classic queue: always 0. + {&sqsQueueMetaPublic{PartitionCount: 1}, "anything", 0}, + // perQueue: always 0 regardless of PartitionCount. + {&sqsQueueMetaPublic{PartitionCount: 4, FifoThroughputLimit: sqsFifoThroughputPerQueue}, "g0", 0}, + // perMessageGroupId, empty group: 0. + {&sqsQueueMetaPublic{PartitionCount: 4, FifoThroughputLimit: "perMessageGroupId"}, "", 0}, + // perMessageGroupId, FNV-1a-mod-2 of "g0" = 0, "g1" = 1. + {&sqsQueueMetaPublic{PartitionCount: 2, FifoThroughputLimit: "perMessageGroupId"}, "g0", 0}, + {&sqsQueueMetaPublic{PartitionCount: 2, FifoThroughputLimit: "perMessageGroupId"}, "g1", 1}, + } + for _, c := range cases { + got := partitionForGroup(c.meta, c.group) + if got != c.want { + t.Errorf("partitionForGroup(meta{PC=%d,limit=%q}, %q) = %d, want %d", + c.meta.PartitionCount, c.meta.FifoThroughputLimit, c.group, got, c.want) + } + } +} + +// TestSQSEncodeClassicQueueWithExplicitPartitionZeroUsesClassicKeys is +// the regression for codex P1 / gemini critical (PR #929). A classic +// queue dump with an explicit `"partition": 0` is allowed past gate 4 +// (which only rejects *Partition != 0) — but the dispatch MUST still +// pick classic-shape keys because the live reader for a classic queue +// only scans the classic keyspace. +// +// Before the (isPartitioned bool, partition uint32) signature, the +// `partition != nil` branch incorrectly chose partitioned keys here, +// making restored messages invisible. This test would fail (silently +// emit |p|0|... keys) under that buggy dispatch. +func TestSQSEncodeClassicQueueWithExplicitPartitionZeroUsesClassicKeys(t *testing.T) { + t.Parallel() + in := t.TempDir() + writeSQSQueue(t, in, "classic", + []byte(`{"format_version":1,"name":"classic",`+ + `"visibility_timeout_seconds":30,"message_retention_seconds":345600,"delay_seconds":0}`), + [][]byte{ + // PartitionCount<=1 (classic), but message has explicit + // "partition": 0. Gate 4 must NOT fire (*Partition == 0), + // and addMessage must use classic keys. + []byte(`{"message_id":"m1","body":"hello",` + + `"send_timestamp_millis":1000,"available_at_millis":1000,"partition":0}`), + }) + fsm := encodeSQSTree(t, in) + entries, _, err := DecodeLiveEntries(bytes.NewReader(fsm)) + if err != nil { + t.Fatalf("DecodeLiveEntries: %v", err) + } + // No partitioned-shape data/vis/byage/dedup keys allowed. + for _, e := range entries { + for _, p := range []string{ + SQSPartitionedMsgDataPrefix, + SQSPartitionedMsgVisPrefix, + SQSPartitionedMsgByAgePrefix, + SQSPartitionedMsgDedupPrefix, + } { + if bytes.HasPrefix(e.UserKey, []byte(p)) { + t.Errorf("partitioned-shape key on classic queue with explicit partition=0: %q (prefix %q)", e.UserKey, p) + } + } + } + // At least one classic data key must exist. + found := false + for _, e := range entries { + if bytes.HasPrefix(e.UserKey, []byte(SQSMsgDataPrefix)) && + !bytes.HasPrefix(e.UserKey, []byte(SQSPartitionedMsgDataPrefix)) { + found = true + break + } + } + if !found { + t.Fatalf("no classic data record found") + } +} + +// TestSQSEncodeLegacyDumpsWithoutPartitionStillRoundTrip pins that a +// pre-M5-3 messages.jsonl (no "partition" field on any line) still +// round-trips through the M5-3 encoder unchanged when the queue is +// classic (PartitionCount <= 1). The classic constructor branch is +// selected by addMessage when Partition == nil. +func TestSQSEncodeLegacyDumpsWithoutPartitionStillRoundTrip(t *testing.T) { + t.Parallel() + in := t.TempDir() + writeSQSQueue(t, in, "legacy", + []byte(`{"format_version":1,"name":"legacy",`+ + `"visibility_timeout_seconds":30,"message_retention_seconds":345600,"delay_seconds":0}`), + [][]byte{ + []byte(`{"message_id":"m1","body":"hello",` + + `"send_timestamp_millis":1000,"available_at_millis":1000}`), + []byte(`{"message_id":"m2","body":"world",` + + `"send_timestamp_millis":2000,"available_at_millis":2000}`), + }) + _, msgs := decodeSQSAndRead(t, encodeSQSTree(t, in), "legacy") + if len(msgs) != 2 { + t.Fatalf("round-tripped %d messages, want 2", len(msgs)) + } + for i := range msgs { + if msgs[i].Partition != nil { + t.Fatalf("msg[%d].Partition = %v, want nil for classic round-trip", i, *msgs[i].Partition) + } + } +} + +// TestSQSEncodePartitionedQueueRoundTrip verifies the partitioned +// data + side records are emitted with the |p| prefix and the +// partition is preserved end-to-end through encode → decode. +func TestSQSEncodePartitionedQueueRoundTrip(t *testing.T) { + t.Parallel() + in := t.TempDir() + writeSQSQueue(t, in, "p.fifo", + []byte(`{"format_version":1,"name":"p.fifo","fifo":true,`+ + `"visibility_timeout_seconds":30,"message_retention_seconds":345600,`+ + `"delay_seconds":0,"partition_count":2,"fifo_throughput_limit":"perMessageGroupId"}`), + [][]byte{ + []byte(`{"message_id":"m0a","body":"p0-a","send_timestamp_millis":1000,` + + `"available_at_millis":1000,"message_group_id":"g0","message_deduplication_id":"d0a",` + + `"sequence_number":1,"partition":0}`), + []byte(`{"message_id":"m1a","body":"p1-a","send_timestamp_millis":1100,` + + `"available_at_millis":1100,"message_group_id":"g1","message_deduplication_id":"d1a",` + + `"sequence_number":2,"partition":1}`), + []byte(`{"message_id":"m1b","body":"p1-b","send_timestamp_millis":1200,` + + `"available_at_millis":1200,"message_group_id":"g1","message_deduplication_id":"d1b",` + + `"sequence_number":3,"partition":1}`), + }) + + fsm := encodeSQSTree(t, in) + entries, _, err := DecodeLiveEntries(bytes.NewReader(fsm)) + if err != nil { + t.Fatalf("DecodeLiveEntries: %v", err) + } + // All data/vis/byage/dedup keys for this queue must be the + // partitioned shape (|p|). + wantPrefixes := []string{ + SQSPartitionedMsgDataPrefix, + SQSPartitionedMsgVisPrefix, + SQSPartitionedMsgByAgePrefix, + SQSPartitionedMsgDedupPrefix, + } + for _, prefix := range wantPrefixes { + found := 0 + for _, e := range entries { + if bytes.HasPrefix(e.UserKey, []byte(prefix)) { + found++ + } + } + if found == 0 { + t.Errorf("no entries with prefix %q (3-message partitioned queue should emit ≥1)", prefix) + } + } + // No classic-shape data/vis/byage/dedup keys for this queue. + for _, classic := range []string{SQSMsgDataPrefix, SQSMsgVisPrefix, SQSMsgByAgePrefix, SQSMsgDedupPrefix} { + for _, e := range entries { + if !bytes.HasPrefix(e.UserKey, []byte(classic)) { + continue + } + // Skip if it's actually a partitioned key (the partitioned + // prefix starts with the classic prefix + "p|"). + classicAndDiscriminator := classic + sqsPartitionedDiscriminator + if bytes.HasPrefix(e.UserKey, []byte(classicAndDiscriminator)) { + continue + } + t.Errorf("classic-shape key on a partitioned queue: %q", e.UserKey) + } + } +} + +// TestSQSEncodePartitionedDedupBuildsGroupSegment pins the +// partitioned dedup row's +|+ shape: the +// MessageGroupID is base64-encoded as part of the key and a +// terminator '|' separates it from the dedupID segment (CodeRabbit +// major PR #732 round 6). +func TestSQSEncodePartitionedDedupBuildsGroupSegment(t *testing.T) { + t.Parallel() + in := t.TempDir() + const ( + queue = "p.fifo" + groupID = "groupA" + dedupID = "dedup-1" + ) + writeSQSQueue(t, in, queue, + []byte(`{"format_version":1,"name":"p.fifo","fifo":true,`+ + `"visibility_timeout_seconds":30,"message_retention_seconds":345600,`+ + `"delay_seconds":0,"partition_count":2,"fifo_throughput_limit":"perMessageGroupId"}`), + [][]byte{ + []byte(`{"message_id":"m1","body":"hello",` + + `"send_timestamp_millis":1000,"available_at_millis":1000,` + + `"message_group_id":"` + groupID + `",` + + `"message_deduplication_id":"` + dedupID + `",` + + `"sequence_number":1,"partition":1}`), + }) + + fsm := encodeSQSTree(t, in) + entries, _, err := DecodeLiveEntries(bytes.NewReader(fsm)) + if err != nil { + t.Fatalf("DecodeLiveEntries: %v", err) + } + // Find the dedup entry and compare to the manually-built key. + want := sqsPartitionedMsgDedupKeyBytes(queue, 1, groupID, dedupID) + var got []byte + for _, e := range entries { + if bytes.HasPrefix(e.UserKey, []byte(SQSPartitionedMsgDedupPrefix)) { + got = e.UserKey + break + } + } + if got == nil { + t.Fatalf("no partitioned dedup entry emitted") + } + if !bytes.Equal(got, want) { + t.Fatalf("partitioned dedup key mismatch\ngot: %x\nwant: %x", got, want) + } +} + +// TestSQSEncodePartitionedSideRecordsByteCrossCheck verifies the +// three partitioned side-record bytes emitted by addSideRecords +// equal what sqsPartitionedMsg{Vis,ByAge,Dedup}KeyBytes produce +// directly. This is the in-package cross-check the M5-2 precedent +// at TestSQSEncodeSideRecordsCrossCheckClassic uses for the classic +// path (the adapter's unexported sqsPartitionedMsg* constructors +// cannot be invoked from this package). +func TestSQSEncodePartitionedSideRecordsByteCrossCheck(t *testing.T) { + t.Parallel() + in := t.TempDir() + // groupID "g1" hashes (FNV-1a-mod-2) to partition 1; pair it with + // partition=1 so gate 5 (partition-hash consistency for + // perMessageGroupId) is satisfied. + const ( + queue = "p.fifo" + groupID = "g1" + dedupID = "d0" + msgID = "mid-001" + sendMs = int64(2000) + availMs = int64(2000) + partN = uint32(1) + ) + writeSQSQueue(t, in, queue, + []byte(`{"format_version":1,"name":"p.fifo","fifo":true,`+ + `"visibility_timeout_seconds":30,"message_retention_seconds":345600,`+ + `"delay_seconds":0,"partition_count":2,"fifo_throughput_limit":"perMessageGroupId"}`), + [][]byte{ + []byte(`{"message_id":"mid-001","body":"x","send_timestamp_millis":2000,` + + `"available_at_millis":2000,"message_group_id":"g1",` + + `"message_deduplication_id":"d0","sequence_number":1,"partition":1}`), + }) + entries, _, err := DecodeLiveEntries(bytes.NewReader(encodeSQSTree(t, in))) + if err != nil { + t.Fatalf("DecodeLiveEntries: %v", err) + } + wantVis := sqsPartitionedMsgVisKeyBytes(queue, partN, sqsRestoreGeneration, availMs, msgID) + wantByAge := sqsPartitionedMsgByAgeKeyBytes(queue, partN, sqsRestoreGeneration, sendMs, msgID) + wantDedup := sqsPartitionedMsgDedupKeyBytes(queue, partN, groupID, dedupID) + var gotVis, gotByAge, gotDedup []byte + for _, e := range entries { + switch { + case bytes.HasPrefix(e.UserKey, []byte(SQSPartitionedMsgVisPrefix)): + gotVis = e.UserKey + case bytes.HasPrefix(e.UserKey, []byte(SQSPartitionedMsgByAgePrefix)): + gotByAge = e.UserKey + case bytes.HasPrefix(e.UserKey, []byte(SQSPartitionedMsgDedupPrefix)): + gotDedup = e.UserKey + } + } + if !bytes.Equal(gotVis, wantVis) { + t.Errorf("partitioned vis mismatch\ngot: %x\nwant: %x", gotVis, wantVis) + } + if !bytes.Equal(gotByAge, wantByAge) { + t.Errorf("partitioned byage mismatch\ngot: %x\nwant: %x", gotByAge, wantByAge) + } + if !bytes.Equal(gotDedup, wantDedup) { + t.Errorf("partitioned dedup mismatch\ngot: %x\nwant: %x", gotDedup, wantDedup) + } +} + +// TestSQSEncodePartitionedSortStableAcrossPartitions pins the +// 4-field sort: messages within the same partition with identical +// send_timestamp_millis are ordered by sequence_number, and +// messages from different partitions group together by partition +// first. +func TestSQSEncodePartitionedSortStableAcrossPartitions(t *testing.T) { + t.Parallel() + // Two messages on partition 1 with identical send_ts but + // different sequence_number, plus one on partition 0 with + // later send_ts. + records := []sqsMessageRecord{ + {MessageID: "m1-b", SendTimestampMillis: 1000, SequenceNumber: 2, Partition: u32ptr(1)}, + {MessageID: "m0", SendTimestampMillis: 2000, SequenceNumber: 1, Partition: u32ptr(0)}, + {MessageID: "m1-a", SendTimestampMillis: 1000, SequenceNumber: 1, Partition: u32ptr(1)}, + } + sortMessagesForPartitionedEmit(records) + wantOrder := []string{"m0", "m1-a", "m1-b"} + for i, r := range records { + if r.MessageID != wantOrder[i] { + t.Errorf("records[%d].MessageID = %q, want %q", i, r.MessageID, wantOrder[i]) + } + } +} + +func u32ptr(v uint32) *uint32 { return &v } + +// TestSQSEffectivePartitionCount pins the local mirror of +// adapter/sqs_keys_dispatch.go:effectivePartitionCount: nil meta, +// PartitionCount<=1, and perQueue all collapse to 1; otherwise the +// declared PartitionCount is returned. +func TestSQSEffectivePartitionCount(t *testing.T) { + t.Parallel() + cases := []struct { + name string + meta *sqsQueueMetaPublic + want uint32 + }{ + {"nil meta", nil, 1}, + {"PartitionCount=0", &sqsQueueMetaPublic{PartitionCount: 0}, 1}, + {"PartitionCount=1", &sqsQueueMetaPublic{PartitionCount: 1}, 1}, + {"PartitionCount=4 perQueue", &sqsQueueMetaPublic{PartitionCount: 4, FifoThroughputLimit: sqsFifoThroughputPerQueue}, 1}, + {"PartitionCount=4 perGroup", &sqsQueueMetaPublic{PartitionCount: 4, FifoThroughputLimit: "perMessageGroupId"}, 4}, + {"PartitionCount=4 unset throughput", &sqsQueueMetaPublic{PartitionCount: 4}, 4}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + t.Parallel() + if got := effectivePartitionCount(c.meta); got != c.want { + t.Fatalf("effectivePartitionCount(%+v) = %d, want %d", c.meta, got, c.want) + } + }) + } +} diff --git a/internal/backup/encode_sqs_side.go b/internal/backup/encode_sqs_side.go index 5da82e99..2f079ac9 100644 --- a/internal/backup/encode_sqs_side.go +++ b/internal/backup/encode_sqs_side.go @@ -70,6 +70,60 @@ func sqsMsgDedupKeyBytes(queueName, dedupID string) []byte { return append(out, encodeSQSSegment(dedupID)...) } +// sqsPartitionedMsgVisKeyBytes reproduces +// adapter/sqs_keys.go:sqsPartitionedMsgVisKey: prefix + +// base64url(queue) + '|' + BE-u32(partition) + BE-u64(gen) + +// BE-u64(visibleAt) + base64url(messageID). Mirrored locally per +// M3b-3 (internal/backup cannot import adapter). +func sqsPartitionedMsgVisKeyBytes(queueName string, partition uint32, gen uint64, visibleAtMillis int64, messageID string) []byte { + out := make([]byte, 0, len(SQSPartitionedMsgVisPrefix)+sqsSideKeyAllocBytes) + out = append(out, SQSPartitionedMsgVisPrefix...) + out = append(out, encodeSQSSegment(queueName)...) + out = append(out, sqsPartitionedQueueTerminator) + out = binary.BigEndian.AppendUint32(out, partition) + out = binary.BigEndian.AppendUint64(out, gen) + out = binary.BigEndian.AppendUint64(out, sqsClampNonNegativeMillis(visibleAtMillis)) + return append(out, encodeSQSSegment(messageID)...) +} + +// sqsPartitionedMsgByAgeKeyBytes reproduces +// adapter/sqs_keys.go:sqsPartitionedMsgByAgeKey: prefix + +// base64url(queue) + '|' + BE-u32(partition) + BE-u64(gen) + +// BE-u64(sendTs) + base64url(messageID). +func sqsPartitionedMsgByAgeKeyBytes(queueName string, partition uint32, gen uint64, sendTimestampMs int64, messageID string) []byte { + out := make([]byte, 0, len(SQSPartitionedMsgByAgePrefix)+sqsSideKeyAllocBytes) + out = append(out, SQSPartitionedMsgByAgePrefix...) + out = append(out, encodeSQSSegment(queueName)...) + out = append(out, sqsPartitionedQueueTerminator) + out = binary.BigEndian.AppendUint32(out, partition) + out = binary.BigEndian.AppendUint64(out, gen) + out = binary.BigEndian.AppendUint64(out, sqsClampNonNegativeMillis(sendTimestampMs)) + return append(out, encodeSQSSegment(messageID)...) +} + +// sqsPartitionedMsgDedupKeyBytes reproduces +// adapter/sqs_keys.go:sqsPartitionedMsgDedupKey: prefix + +// base64url(queue) + '|' + BE-u32(partition) + BE-u64(sqsRestoreGeneration) + +// base64url(groupID) + '|' + base64url(dedupID). The live adapter signature +// accepts a variable gen, but every M5-3 call site uses +// sqsRestoreGeneration (matching the classic sqsMsgDedupKeyBytes +// hardcode), so the parameter is hardcoded here to satisfy unparam. +// Note the second '|' between the variable-length group and dedup +// segments — without it distinct (groupID, dedupID) pairs can +// FNV-collapse onto the same key (CodeRabbit major PR #732 round 6; +// design doc §line 19). +func sqsPartitionedMsgDedupKeyBytes(queueName string, partition uint32, groupID, dedupID string) []byte { + out := make([]byte, 0, len(SQSPartitionedMsgDedupPrefix)+sqsSideKeyAllocBytes) + out = append(out, SQSPartitionedMsgDedupPrefix...) + out = append(out, encodeSQSSegment(queueName)...) + out = append(out, sqsPartitionedQueueTerminator) + out = binary.BigEndian.AppendUint32(out, partition) + out = binary.BigEndian.AppendUint64(out, sqsRestoreGeneration) + out = append(out, encodeSQSSegment(groupID)...) + out = append(out, sqsPartitionedQueueTerminator) + return append(out, encodeSQSSegment(dedupID)...) +} + // sqsClampNonNegativeMillis mirrors adapter/sqs_messages.go uint64MaxZero: // wall-clock millis should never be negative, but a negative int64 would // silently overflow under a direct uint64() cast and produce a far-future @@ -130,19 +184,34 @@ func resolveDedupID(rec *sqsMessageRecord, meta *sqsQueueMetaPublic) string { // - dedup: FIFO + resolveDedupID(rec, meta) non-empty. ExpiresAtMillis = // SendTimestampMs + sqsFifoDedupWindowMillis. CBD queues get a SHA-256 // derived dedup-id (matches adapter/sqs_fifo.go resolveFifoDedupID). -func (e *SQSRecordEncoder) addSideRecords(b *snapshotBuilder, queueName string, meta *sqsQueueMetaPublic, rec *sqsMessageRecord) error { +func (e *SQSRecordEncoder) addSideRecords(b *snapshotBuilder, queueName string, isPartitioned bool, partition uint32, emitDedup bool, meta *sqsQueueMetaPublic, rec *sqsMessageRecord) error { msgIDBytes := []byte(rec.MessageID) - visKey := sqsMsgVisKeyBytes(queueName, sqsRestoreGeneration, rec.AvailableAtMillis, rec.MessageID) + var visKey, byAgeKey []byte + if isPartitioned { + visKey = sqsPartitionedMsgVisKeyBytes(queueName, partition, sqsRestoreGeneration, rec.AvailableAtMillis, rec.MessageID) + byAgeKey = sqsPartitionedMsgByAgeKeyBytes(queueName, partition, sqsRestoreGeneration, rec.SendTimestampMillis, rec.MessageID) + } else { + visKey = sqsMsgVisKeyBytes(queueName, sqsRestoreGeneration, rec.AvailableAtMillis, rec.MessageID) + byAgeKey = sqsMsgByAgeKeyBytes(queueName, sqsRestoreGeneration, rec.SendTimestampMillis, rec.MessageID) + } if err := b.Add(visKey, msgIDBytes, 0); err != nil { return err } - - byAgeKey := sqsMsgByAgeKeyBytes(queueName, sqsRestoreGeneration, rec.SendTimestampMillis, rec.MessageID) if err := b.Add(byAgeKey, msgIDBytes, 0); err != nil { return err } + // emitDedup is set by stageMessageRecords' pre-pass (pickDedupWinners) + // to false for every message whose (group, dedupID) tuple is owned + // by a LATER send. The live FIFO dedup window + // (sqsFifoDedupWindowMillis = 5 minutes) lets a duplicate re-send + // overwrite the prior dedup row, so when the dump captures both + // messages naive per-message emission would collide on + // snapshotBuilder.Add. Codex P2 #929. + if !emitDedup { + return nil + } if !meta.FIFO { return nil } @@ -160,6 +229,16 @@ func (e *SQSRecordEncoder) addSideRecords(b *snapshotBuilder, queueName string, if err != nil { return err } - dedupKey := sqsMsgDedupKeyBytes(queueName, dedupID) + var dedupKey []byte + if isPartitioned { + // Partitioned dedup includes + '|' + + // per the partitioned dedup constructor — see + // sqsPartitionedMsgDedupKeyBytes for the rationale (CodeRabbit + // major PR #732 round 6: the second '|' prevents FNV-collapse + // across distinct (groupID, dedupID) pairs on the same partition). + dedupKey = sqsPartitionedMsgDedupKeyBytes(queueName, partition, rec.MessageGroupID, dedupID) + } else { + dedupKey = sqsMsgDedupKeyBytes(queueName, dedupID) + } return b.Add(dedupKey, val, 0) } diff --git a/internal/backup/encode_sqs_test.go b/internal/backup/encode_sqs_test.go index e05ba995..0f1b4842 100644 --- a/internal/backup/encode_sqs_test.go +++ b/internal/backup/encode_sqs_test.go @@ -237,18 +237,6 @@ func TestSQSEncodeBinaryBodyRoundTrip(t *testing.T) { } } -// TestSQSEncodeRejectsPartitioned pins fail-closed for an HT-FIFO queue. -func TestSQSEncodeRejectsPartitioned(t *testing.T) { - t.Parallel() - in := t.TempDir() - writeSQSQueue(t, in, "ht.fifo", []byte(`{"format_version":1,"name":"ht.fifo","fifo":true,`+ - `"visibility_timeout_seconds":30,"message_retention_seconds":345600,"delay_seconds":0,"partition_count":4}`), nil) - b := newSnapshotBuilder(sqsEncTS) - if err := NewSQSRecordEncoder(in).Encode(b); !errors.Is(err, ErrSQSEncodeUnsupportedPartitioned) { - t.Fatalf("Encode err = %v, want ErrSQSEncodeUnsupportedPartitioned", err) - } -} - // TestSQSEncodeRejectsNonRegularQueueMeta pins the file guard: a // _queue.json that is a directory is refused with ErrSQSEncodeNotRegular. func TestSQSEncodeRejectsNonRegularQueueMeta(t *testing.T) { diff --git a/internal/backup/sqs.go b/internal/backup/sqs.go index c8952a3d..7969331f 100644 --- a/internal/backup/sqs.go +++ b/internal/backup/sqs.go @@ -248,6 +248,12 @@ type sqsMessageRecord struct { MessageDedupID string `json:"message_deduplication_id,omitempty"` SequenceNumber uint64 `json:"sequence_number,omitempty"` DeadLetterSourceArn string `json:"dead_letter_source_arn,omitempty"` + // Partition is non-nil only when the message was read from a + // partitioned (`!sqs|msg|data|p|`) key. The value is the partition + // number recovered from the on-disk key trailer; M5-3 reverse + // encoder uses it to reproduce the partitioned key shape on + // restore. Pre-M5-3 dumps lack this field — Partition == nil. + Partition *uint32 `json:"partition,omitempty"` } // NewSQSEncoder constructs an encoder rooted at /sqs/. @@ -338,7 +344,7 @@ func (s *SQSEncoder) HandleQueueGen(key, value []byte) error { // the per-queue routing key; the message is buffered until Finalize so it // can be sorted and emitted in send-order. func (s *SQSEncoder) HandleMessageData(key, value []byte) error { - encQueue, err := parseSQSMessageDataKey(key) + encQueue, partition, isPartitioned, err := parseSQSMessageDataKey(key) if err != nil { return err } @@ -352,6 +358,14 @@ func (s *SQSEncoder) HandleMessageData(key, value []byte) error { rec.ReceiveCount = 0 rec.FirstReceiveMillis = 0 } + // M5-3: wire the parsed partition into the record so the reverse + // encoder can reproduce the partitioned key shape. Classic-key call + // site keeps rec.Partition == nil so legacy dumps round-trip + // unchanged (design doc §"Decoder lift"). + if isPartitioned { + p := partition + rec.Partition = &p + } st := s.queueState(encQueue) st.messages = append(st.messages, rec) return nil @@ -531,13 +545,17 @@ func stripPrefixSegment(key, prefix []byte) (string, error) { // // Boundary detection (partitioned): the queue segment is terminated by // a literal '|' before the fixed-width partition u32. Codex P1 round 9. -func parseSQSMessageDataKey(key []byte) (string, error) { +func parseSQSMessageDataKey(key []byte) (encQueue string, partition uint32, isPartitioned bool, err error) { rest, err := stripPrefixSegment(key, []byte(SQSMsgDataPrefix)) if err != nil { - return "", err + return "", 0, false, err } if isPartitionedRest(rest) { - return parseSQSPartitionedQueueAndTrailer(rest, true /*hasMsgID*/, key) + enc, part, perr := parseSQSPartitionedQueueAndTrailer(rest, true /*hasMsgID*/, key) + if perr != nil { + return "", 0, false, perr + } + return enc, part, true, nil } idx := scanBase64URLBoundary(rest) // idx == 0 -> no queue segment; idx+genBytes >= len(rest) -> no @@ -545,21 +563,21 @@ func parseSQSMessageDataKey(key []byte) (string, error) { // AWS SQS message IDs are non-empty by construction, so an empty // msg-id segment can never be a legitimate snapshot record. if idx == 0 || idx+genBytes >= len(rest) { - return "", errors.Wrapf(ErrSQSMalformedKey, + return "", 0, false, errors.Wrapf(ErrSQSMalformedKey, "queue segment or message-id segment not found in %q", key) } - encQueue := rest[:idx] - if _, err := base64.RawURLEncoding.DecodeString(encQueue); err != nil { - return "", errors.Wrap(ErrSQSMalformedKey, err.Error()) + enc := rest[:idx] + if _, err := base64.RawURLEncoding.DecodeString(enc); err != nil { + return "", 0, false, errors.Wrap(ErrSQSMalformedKey, err.Error()) } // Validate the msg-id segment decodes too; if it doesn't, the // boundary detection got it wrong and we surface an error rather // than emit a record under a wrong queue. encMsgID := rest[idx+genBytes:] if _, err := base64.RawURLEncoding.DecodeString(encMsgID); err != nil { - return "", errors.Wrap(ErrSQSMalformedKey, err.Error()) + return "", 0, false, errors.Wrap(ErrSQSMalformedKey, err.Error()) } - return encQueue, nil + return enc, 0, false, nil } // parseSQSGenericKey is a coarse parser for the side-record prefixes @@ -574,7 +592,14 @@ func parseSQSGenericKey(key []byte, prefix string) (string, error) { return "", err } if isPartitionedRest(rest) { - return parseSQSPartitionedQueueAndTrailer(rest, false /*hasMsgID*/, key) + // Side-record dispatch routes by queue only — the partition + // trailer is parsed for validation but the value is discarded + // here. M5-3 design doc §"Decoder lift" pins this contract. + encQueue, _, perr := parseSQSPartitionedQueueAndTrailer(rest, false /*hasMsgID*/, key) + if perr != nil { + return "", perr + } + return encQueue, nil } idx := scanBase64URLBoundary(rest) // All side-record key shapes (vis / byage / dedup / group / @@ -608,37 +633,39 @@ func isPartitionedRest(rest string) bool { // // Anything else surfaces ErrSQSMalformedKey rather than emitting // records under a wrong queue. -func parseSQSPartitionedQueueAndTrailer(rest string, hasMsgID bool, originalKey []byte) (string, error) { +func parseSQSPartitionedQueueAndTrailer(rest string, hasMsgID bool, originalKey []byte) (string, uint32, error) { body := rest[len(sqsPartitionedDiscriminator):] terminator := strings.IndexByte(body, '|') if terminator <= 0 { - return "", errors.Wrapf(ErrSQSMalformedKey, + return "", 0, errors.Wrapf(ErrSQSMalformedKey, "partitioned key missing queue terminator in %q", originalKey) } encQueue := body[:terminator] if _, err := base64.RawURLEncoding.DecodeString(encQueue); err != nil { - return "", errors.Wrap(ErrSQSMalformedKey, err.Error()) + return "", 0, errors.Wrap(ErrSQSMalformedKey, err.Error()) } trailer := body[terminator+1:] const fixedTrailerBytes = sqsPartitionBytes + genBytes if hasMsgID { // Need partition+gen plus at least 1 byte of msg-id. if len(trailer) <= fixedTrailerBytes { - return "", errors.Wrapf(ErrSQSMalformedKey, + return "", 0, errors.Wrapf(ErrSQSMalformedKey, "partitioned msg-data key missing message-id in %q", originalKey) } encMsgID := trailer[fixedTrailerBytes:] if _, err := base64.RawURLEncoding.DecodeString(encMsgID); err != nil { - return "", errors.Wrap(ErrSQSMalformedKey, err.Error()) + return "", 0, errors.Wrap(ErrSQSMalformedKey, err.Error()) } - return encQueue, nil + partition := binary.BigEndian.Uint32([]byte(trailer[:sqsPartitionBytes])) + return encQueue, partition, nil } // Side records: trailer must carry at least partition+gen. if len(trailer) < fixedTrailerBytes { - return "", errors.Wrapf(ErrSQSMalformedKey, + return "", 0, errors.Wrapf(ErrSQSMalformedKey, "partitioned side-record key trailer truncated in %q", originalKey) } - return encQueue, nil + partition := binary.BigEndian.Uint32([]byte(trailer[:sqsPartitionBytes])) + return encQueue, partition, nil } // scanBase64URLBoundary returns the index of the first byte in s that is diff --git a/internal/backup/sqs_test.go b/internal/backup/sqs_test.go index c1a0bf90..ba4b2bdc 100644 --- a/internal/backup/sqs_test.go +++ b/internal/backup/sqs_test.go @@ -372,7 +372,7 @@ func TestSQS_ParseMessageDataKey_RejectsEmptyMsgIDSegment(t *testing.T) { // a legitimate snapshot record. key := append([]byte(SQSMsgDataPrefix), []byte("cQ")...) key = append(key, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01) - if _, err := parseSQSMessageDataKey(key); !errors.Is(err, ErrSQSMalformedKey) { + if _, _, _, err := parseSQSMessageDataKey(key); !errors.Is(err, ErrSQSMalformedKey) { t.Fatalf("err=%v want ErrSQSMalformedKey for empty msg-id", err) } } @@ -597,13 +597,19 @@ func TestSQS_ParsePartitionedMessageDataKey(t *testing.T) { key = append(key, 0x00, 0x00, 0x00, 0x07) // partition = 7 key = append(key, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01) // gen key = append(key, []byte(encMsgID)...) - got, err := parseSQSMessageDataKey(key) + got, partition, isPartitioned, err := parseSQSMessageDataKey(key) if err != nil { t.Fatalf("parseSQSMessageDataKey: %v", err) } if got != encQueue { t.Fatalf("got %q want %q", got, encQueue) } + if !isPartitioned { + t.Fatalf("isPartitioned=false, want true for partitioned key") + } + if partition != 7 { + t.Fatalf("partition=%d want 7", partition) + } } // TestSQS_ParsePartitionedSideRecordKey covers parseSQSGenericKey for @@ -643,7 +649,7 @@ func TestSQS_ParsePartitionedMessageDataKey_RejectsTruncatedTrailer(t *testing.T key := []byte(SQSMsgDataPrefix + sqsPartitionedDiscriminator + encQueue + "|") // Only 4 partition bytes, no gen, no msg-id. key = append(key, 0x00, 0x00, 0x00, 0x01) - if _, err := parseSQSMessageDataKey(key); !errors.Is(err, ErrSQSMalformedKey) { + if _, _, _, err := parseSQSMessageDataKey(key); !errors.Is(err, ErrSQSMalformedKey) { t.Fatalf("err=%v want ErrSQSMalformedKey for truncated partitioned trailer", err) } }