From 235c13baf6785d4d416d8bf9751868191fcafefa Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 4 Jun 2026 02:31:49 +0900 Subject: [PATCH 1/6] backup: M5-3 slice A+B - decoder partition plumbing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per design doc PR #914 §'Decoder lift'. Widens the two partitioned-key parsers to surface the partition u32 they already extract internally, and adds the sqsMessageRecord.Partition *uint32 field so the parsed value reaches the dump JSON. Changes: - parseSQSPartitionedQueueAndTrailer: (string, error) -> (string, uint32, error). The partition was already read off the trailer (binary.BigEndian.Uint32); return it instead of discarding. - parseSQSMessageDataKey: (string, error) -> (string, uint32, bool, error). The third return signals whether the key was the partitioned shape (so HandleMessageData knows when to attach Partition to the record). Classic-key path returns (enc, 0, false, nil) - byte-identical with the pre-M5-3 contract. - parseSQSGenericKey: signature unchanged externally. The new partition return from parseSQSPartitionedQueueAndTrailer is intentionally discarded - side-record dispatch routes by queue name only (HandleSideRecord wraps it). Comment pins the contract. - HandleMessageData: wires rec.Partition = &partition when isPartitioned == true. Classic call site leaves rec.Partition == nil, preserving legacy-dump round-trip. - sqsMessageRecord.Partition *uint32 with omitempty. Pointer (not uint32) so partition-0 messages are distinguishable from absent (codex P1 v914 v2; design doc decision matrix). Caller audit (semantic change to two parse fns): - parseSQSMessageDataKey production callers: HandleMessageData only. Updated to 4-tuple in this commit. 3 test sites also updated. - parseSQSPartitionedQueueAndTrailer production callers: parseSQSMessageDataKey + parseSQSGenericKey only. Both updated. - parseSQSGenericKey signature unchanged - no caller audit needed for that symbol. go test ./internal/backup/ passes. --- internal/backup/sqs.go | 65 ++++++++++++++++++++++++++----------- internal/backup/sqs_test.go | 12 +++++-- 2 files changed, 55 insertions(+), 22 deletions(-) diff --git a/internal/backup/sqs.go b/internal/backup/sqs.go index c8952a3d7..7969331f3 100644 --- a/internal/backup/sqs.go +++ b/internal/backup/sqs.go @@ -248,6 +248,12 @@ type sqsMessageRecord struct { MessageDedupID string `json:"message_deduplication_id,omitempty"` SequenceNumber uint64 `json:"sequence_number,omitempty"` DeadLetterSourceArn string `json:"dead_letter_source_arn,omitempty"` + // Partition is non-nil only when the message was read from a + // partitioned (`!sqs|msg|data|p|`) key. The value is the partition + // number recovered from the on-disk key trailer; M5-3 reverse + // encoder uses it to reproduce the partitioned key shape on + // restore. Pre-M5-3 dumps lack this field — Partition == nil. + Partition *uint32 `json:"partition,omitempty"` } // NewSQSEncoder constructs an encoder rooted at /sqs/. @@ -338,7 +344,7 @@ func (s *SQSEncoder) HandleQueueGen(key, value []byte) error { // the per-queue routing key; the message is buffered until Finalize so it // can be sorted and emitted in send-order. func (s *SQSEncoder) HandleMessageData(key, value []byte) error { - encQueue, err := parseSQSMessageDataKey(key) + encQueue, partition, isPartitioned, err := parseSQSMessageDataKey(key) if err != nil { return err } @@ -352,6 +358,14 @@ func (s *SQSEncoder) HandleMessageData(key, value []byte) error { rec.ReceiveCount = 0 rec.FirstReceiveMillis = 0 } + // M5-3: wire the parsed partition into the record so the reverse + // encoder can reproduce the partitioned key shape. Classic-key call + // site keeps rec.Partition == nil so legacy dumps round-trip + // unchanged (design doc §"Decoder lift"). + if isPartitioned { + p := partition + rec.Partition = &p + } st := s.queueState(encQueue) st.messages = append(st.messages, rec) return nil @@ -531,13 +545,17 @@ func stripPrefixSegment(key, prefix []byte) (string, error) { // // Boundary detection (partitioned): the queue segment is terminated by // a literal '|' before the fixed-width partition u32. Codex P1 round 9. -func parseSQSMessageDataKey(key []byte) (string, error) { +func parseSQSMessageDataKey(key []byte) (encQueue string, partition uint32, isPartitioned bool, err error) { rest, err := stripPrefixSegment(key, []byte(SQSMsgDataPrefix)) if err != nil { - return "", err + return "", 0, false, err } if isPartitionedRest(rest) { - return parseSQSPartitionedQueueAndTrailer(rest, true /*hasMsgID*/, key) + enc, part, perr := parseSQSPartitionedQueueAndTrailer(rest, true /*hasMsgID*/, key) + if perr != nil { + return "", 0, false, perr + } + return enc, part, true, nil } idx := scanBase64URLBoundary(rest) // idx == 0 -> no queue segment; idx+genBytes >= len(rest) -> no @@ -545,21 +563,21 @@ func parseSQSMessageDataKey(key []byte) (string, error) { // AWS SQS message IDs are non-empty by construction, so an empty // msg-id segment can never be a legitimate snapshot record. if idx == 0 || idx+genBytes >= len(rest) { - return "", errors.Wrapf(ErrSQSMalformedKey, + return "", 0, false, errors.Wrapf(ErrSQSMalformedKey, "queue segment or message-id segment not found in %q", key) } - encQueue := rest[:idx] - if _, err := base64.RawURLEncoding.DecodeString(encQueue); err != nil { - return "", errors.Wrap(ErrSQSMalformedKey, err.Error()) + enc := rest[:idx] + if _, err := base64.RawURLEncoding.DecodeString(enc); err != nil { + return "", 0, false, errors.Wrap(ErrSQSMalformedKey, err.Error()) } // Validate the msg-id segment decodes too; if it doesn't, the // boundary detection got it wrong and we surface an error rather // than emit a record under a wrong queue. encMsgID := rest[idx+genBytes:] if _, err := base64.RawURLEncoding.DecodeString(encMsgID); err != nil { - return "", errors.Wrap(ErrSQSMalformedKey, err.Error()) + return "", 0, false, errors.Wrap(ErrSQSMalformedKey, err.Error()) } - return encQueue, nil + return enc, 0, false, nil } // parseSQSGenericKey is a coarse parser for the side-record prefixes @@ -574,7 +592,14 @@ func parseSQSGenericKey(key []byte, prefix string) (string, error) { return "", err } if isPartitionedRest(rest) { - return parseSQSPartitionedQueueAndTrailer(rest, false /*hasMsgID*/, key) + // Side-record dispatch routes by queue only — the partition + // trailer is parsed for validation but the value is discarded + // here. M5-3 design doc §"Decoder lift" pins this contract. + encQueue, _, perr := parseSQSPartitionedQueueAndTrailer(rest, false /*hasMsgID*/, key) + if perr != nil { + return "", perr + } + return encQueue, nil } idx := scanBase64URLBoundary(rest) // All side-record key shapes (vis / byage / dedup / group / @@ -608,37 +633,39 @@ func isPartitionedRest(rest string) bool { // // Anything else surfaces ErrSQSMalformedKey rather than emitting // records under a wrong queue. -func parseSQSPartitionedQueueAndTrailer(rest string, hasMsgID bool, originalKey []byte) (string, error) { +func parseSQSPartitionedQueueAndTrailer(rest string, hasMsgID bool, originalKey []byte) (string, uint32, error) { body := rest[len(sqsPartitionedDiscriminator):] terminator := strings.IndexByte(body, '|') if terminator <= 0 { - return "", errors.Wrapf(ErrSQSMalformedKey, + return "", 0, errors.Wrapf(ErrSQSMalformedKey, "partitioned key missing queue terminator in %q", originalKey) } encQueue := body[:terminator] if _, err := base64.RawURLEncoding.DecodeString(encQueue); err != nil { - return "", errors.Wrap(ErrSQSMalformedKey, err.Error()) + return "", 0, errors.Wrap(ErrSQSMalformedKey, err.Error()) } trailer := body[terminator+1:] const fixedTrailerBytes = sqsPartitionBytes + genBytes if hasMsgID { // Need partition+gen plus at least 1 byte of msg-id. if len(trailer) <= fixedTrailerBytes { - return "", errors.Wrapf(ErrSQSMalformedKey, + return "", 0, errors.Wrapf(ErrSQSMalformedKey, "partitioned msg-data key missing message-id in %q", originalKey) } encMsgID := trailer[fixedTrailerBytes:] if _, err := base64.RawURLEncoding.DecodeString(encMsgID); err != nil { - return "", errors.Wrap(ErrSQSMalformedKey, err.Error()) + return "", 0, errors.Wrap(ErrSQSMalformedKey, err.Error()) } - return encQueue, nil + partition := binary.BigEndian.Uint32([]byte(trailer[:sqsPartitionBytes])) + return encQueue, partition, nil } // Side records: trailer must carry at least partition+gen. if len(trailer) < fixedTrailerBytes { - return "", errors.Wrapf(ErrSQSMalformedKey, + return "", 0, errors.Wrapf(ErrSQSMalformedKey, "partitioned side-record key trailer truncated in %q", originalKey) } - return encQueue, nil + partition := binary.BigEndian.Uint32([]byte(trailer[:sqsPartitionBytes])) + return encQueue, partition, nil } // scanBase64URLBoundary returns the index of the first byte in s that is diff --git a/internal/backup/sqs_test.go b/internal/backup/sqs_test.go index c1a0bf903..ba4b2bdc1 100644 --- a/internal/backup/sqs_test.go +++ b/internal/backup/sqs_test.go @@ -372,7 +372,7 @@ func TestSQS_ParseMessageDataKey_RejectsEmptyMsgIDSegment(t *testing.T) { // a legitimate snapshot record. key := append([]byte(SQSMsgDataPrefix), []byte("cQ")...) key = append(key, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01) - if _, err := parseSQSMessageDataKey(key); !errors.Is(err, ErrSQSMalformedKey) { + if _, _, _, err := parseSQSMessageDataKey(key); !errors.Is(err, ErrSQSMalformedKey) { t.Fatalf("err=%v want ErrSQSMalformedKey for empty msg-id", err) } } @@ -597,13 +597,19 @@ func TestSQS_ParsePartitionedMessageDataKey(t *testing.T) { key = append(key, 0x00, 0x00, 0x00, 0x07) // partition = 7 key = append(key, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01) // gen key = append(key, []byte(encMsgID)...) - got, err := parseSQSMessageDataKey(key) + got, partition, isPartitioned, err := parseSQSMessageDataKey(key) if err != nil { t.Fatalf("parseSQSMessageDataKey: %v", err) } if got != encQueue { t.Fatalf("got %q want %q", got, encQueue) } + if !isPartitioned { + t.Fatalf("isPartitioned=false, want true for partitioned key") + } + if partition != 7 { + t.Fatalf("partition=%d want 7", partition) + } } // TestSQS_ParsePartitionedSideRecordKey covers parseSQSGenericKey for @@ -643,7 +649,7 @@ func TestSQS_ParsePartitionedMessageDataKey_RejectsTruncatedTrailer(t *testing.T key := []byte(SQSMsgDataPrefix + sqsPartitionedDiscriminator + encQueue + "|") // Only 4 partition bytes, no gen, no msg-id. key = append(key, 0x00, 0x00, 0x00, 0x01) - if _, err := parseSQSMessageDataKey(key); !errors.Is(err, ErrSQSMalformedKey) { + if _, _, _, err := parseSQSMessageDataKey(key); !errors.Is(err, ErrSQSMalformedKey) { t.Fatalf("err=%v want ErrSQSMalformedKey for truncated partitioned trailer", err) } } From 6640674a666849a11cf236468db4d1c7c96adef6 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 4 Jun 2026 02:34:35 +0900 Subject: [PATCH 2/6] backup: M5-3 slice C - mirror partitioned key constructors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per design doc PR #914 §'Encoder lift'. Adds the four partitioned-key constructors (data, vis, byage, dedup) plus the effectivePartitionCount helper into internal/backup/, mirroring adapter/sqs_keys.go and adapter/sqs_keys_dispatch.go. internal/backup/ cannot import adapter (M3b-3 circular-dep pattern; see existing sqsFifoDedupWindowMillis mirror at encode_sqs_side.go:15). The constructors and constants are duplicated locally instead. New symbols (encode_sqs.go): - const sqsPartitionedQueueTerminator byte = '|' (mirror of adapter:82) - const sqsFifoThroughputPerQueue = "perQueue" (mirror of unexported htfifoThroughputPerQueue at adapter/sqs_partitioning.go:37) - const SQSPartitionedMsg{Data,Vis,ByAge,Dedup}Prefix (mirror of adapter Sqs* prefixes at adapter/sqs_keys.go:91..95) - sqsPartitionedMsgDataKeyBytes - effectivePartitionCount(meta *sqsQueueMetaPublic) uint32 - mirror of adapter helper, operating on the dump-side struct. Doc comment explicitly pins that this is NOT used by encoder validation gates (codex P2 v914 v7) - those use raw meta.PartitionCount > 1. New symbols (encode_sqs_side.go): - sqsPartitionedMsgVisKeyBytes - sqsPartitionedMsgByAgeKeyBytes - sqsPartitionedMsgDedupKeyBytes - includes the second '|' between the variable-length group and dedup segments (CodeRabbit major PR #732 round 6; design doc §line 19) to prevent FNV-collision false-dups. Tests (new file encode_sqs_partitioned_test.go): - TestSQSEncodePartitionedMsg{Data,Vis,ByAge,Dedup}KeyByteShape - byte-equality assertions vs manually-assembled expected bytes (since direct adapter cross-check is impossible). - TestSQSEncodePartitionedMsgDedupKeyByteShape also pins the byte at the group+dedup terminator offset so a future change that drops the '|' regressing the FNV-collision class fails this test. - TestSQSEffectivePartitionCount - 6-case table covering nil meta, PartitionCount<=1, perQueue collapse, perGroup, and unset throughput. Pure additions: no behavior change yet. Slice D will wire these into addMessage / addSideRecords with the validation gates and the new sentinels. --- internal/backup/encode_sqs.go | 64 +++++++ .../backup/encode_sqs_partitioned_test.go | 159 ++++++++++++++++++ internal/backup/encode_sqs_side.go | 50 ++++++ 3 files changed, 273 insertions(+) create mode 100644 internal/backup/encode_sqs_partitioned_test.go diff --git a/internal/backup/encode_sqs.go b/internal/backup/encode_sqs.go index 1702e58c1..dc40a5cdf 100644 --- a/internal/backup/encode_sqs.go +++ b/internal/backup/encode_sqs.go @@ -400,3 +400,67 @@ func sqsMsgDataKeyBytes(queueName string, generation uint64, messageID string) [ out = binary.BigEndian.AppendUint64(out, generation) return append(out, encodeSQSSegment(messageID)...) } + +// sqsPartitionedQueueTerminator mirrors adapter/sqs_keys.go:82 — the +// literal '|' that terminates the encoded queue segment (and, in +// dedup keys, the encoded group segment) inside a partitioned key. +// encodeSQSSegment uses base64.RawURLEncoding which never emits '|', +// so the terminator is unambiguous. +const sqsPartitionedQueueTerminator byte = '|' + +// sqsFifoThroughputPerQueue mirrors adapter/sqs_partitioning.go:37 +// (htfifoThroughputPerQueue). When this is the FifoThroughputLimit +// value, partitionFor collapses every group to partition 0 +// regardless of meta.PartitionCount; the encoder uses it for one +// fail-closed gate (ErrSQSEncodePartitionRoutingMismatch in slice D) +// and as an effectivePartitionCount input. CANNOT be imported from +// adapter — htfifoThroughputPerQueue is unexported (M3b-3 circular +// dependency pattern). +const sqsFifoThroughputPerQueue = "perQueue" + +// Partitioned key prefixes mirror adapter/sqs_keys.go:91..95. They +// are the legacy prefixes with the partitioned discriminator "p|" +// appended. +const ( + SQSPartitionedMsgDataPrefix = SQSMsgDataPrefix + sqsPartitionedDiscriminator + SQSPartitionedMsgVisPrefix = SQSMsgVisPrefix + sqsPartitionedDiscriminator + SQSPartitionedMsgByAgePrefix = SQSMsgByAgePrefix + sqsPartitionedDiscriminator + SQSPartitionedMsgDedupPrefix = SQSMsgDedupPrefix + sqsPartitionedDiscriminator +) + +// sqsPartitionedMsgDataKeyBytes reproduces +// adapter/sqs_keys.go:sqsPartitionedMsgDataKey for a partitioned +// queue: prefix + base64url(queue) + '|' + BE-u32(partition) + +// BE-u64(gen) + base64url(messageID). Mirrored locally because +// internal/backup/ cannot import adapter (M3b-3 pattern). +func sqsPartitionedMsgDataKeyBytes(queueName string, partition uint32, generation uint64, messageID string) []byte { + out := []byte(SQSPartitionedMsgDataPrefix) + out = append(out, encodeSQSSegment(queueName)...) + out = append(out, sqsPartitionedQueueTerminator) + out = binary.BigEndian.AppendUint32(out, partition) + out = binary.BigEndian.AppendUint64(out, generation) + return append(out, encodeSQSSegment(messageID)...) +} + +// effectivePartitionCount mirrors adapter/sqs_keys_dispatch.go:121 +// (which operates on the unexported *adapter.sqsQueueMeta). Returns +// 1 for nil meta or PartitionCount <= 1, 1 when FifoThroughputLimit +// == "perQueue" (the live router collapses every group to partition +// 0 in that mode), otherwise meta.PartitionCount. +// +// NOTE: this helper is NOT used by the encoder's validation gates — +// the M5-3 design pins those gates on raw meta.PartitionCount > 1 +// (codex P2 v914 v7). Kept here for diagnostics, ReceiveMessage scan +// fan-out cross-checks, and future symmetry with the adapter; using +// it in a gate predicate would silently let a perQueue dump with +// Partition == nil emit classic keys against a partitioned-keyspace +// queue. +func effectivePartitionCount(meta *sqsQueueMetaPublic) uint32 { + if meta == nil || meta.PartitionCount <= 1 { + return 1 + } + if meta.FifoThroughputLimit == sqsFifoThroughputPerQueue { + return 1 + } + return meta.PartitionCount +} diff --git a/internal/backup/encode_sqs_partitioned_test.go b/internal/backup/encode_sqs_partitioned_test.go new file mode 100644 index 000000000..2b718e1ab --- /dev/null +++ b/internal/backup/encode_sqs_partitioned_test.go @@ -0,0 +1,159 @@ +package backup + +import ( + "bytes" + "encoding/binary" + "testing" +) + +// TestSQSEncodePartitionedMsgDataKeyByteShape pins +// sqsPartitionedMsgDataKeyBytes against the wire format mandated by +// adapter/sqs_keys.go:sqsPartitionedMsgDataKey: +// prefix + base64url(queue) + '|' + BE-u32(partition) + BE-u64(gen) + +// base64url(messageID). +// internal/backup cannot import adapter to cross-check directly (M3b-3 +// circular-dep ban; same constraint sqsFifoDedupWindowMillis lives +// under), so this is a manually-assembled byte-equality assertion. +func TestSQSEncodePartitionedMsgDataKeyByteShape(t *testing.T) { + t.Parallel() + const ( + queue = "q1" + msgID = "msg-001" + partN = uint32(3) + genCnt = uint64(7) + ) + want := []byte(SQSPartitionedMsgDataPrefix) + want = append(want, encodeSQSSegment(queue)...) + want = append(want, sqsPartitionedQueueTerminator) + want = binary.BigEndian.AppendUint32(want, partN) + want = binary.BigEndian.AppendUint64(want, genCnt) + want = append(want, encodeSQSSegment(msgID)...) + + got := sqsPartitionedMsgDataKeyBytes(queue, partN, genCnt, msgID) + if !bytes.Equal(got, want) { + t.Fatalf("partitioned data key mismatch\ngot: %x\nwant: %x", got, want) + } +} + +// TestSQSEncodePartitionedMsgVisKeyByteShape pins +// sqsPartitionedMsgVisKeyBytes: prefix + base64url(queue) + '|' + +// BE-u32(partition) + BE-u64(gen) + BE-u64(visibleAt) + +// base64url(messageID). +func TestSQSEncodePartitionedMsgVisKeyByteShape(t *testing.T) { + t.Parallel() + const ( + queue = "qvis" + msgID = "mid" + partN = uint32(0) + genCnt = uint64(1) + visMs = int64(1_700_000_000_000) + ) + want := []byte(SQSPartitionedMsgVisPrefix) + want = append(want, encodeSQSSegment(queue)...) + want = append(want, sqsPartitionedQueueTerminator) + want = binary.BigEndian.AppendUint32(want, partN) + want = binary.BigEndian.AppendUint64(want, genCnt) + want = binary.BigEndian.AppendUint64(want, uint64(visMs)) + want = append(want, encodeSQSSegment(msgID)...) + + got := sqsPartitionedMsgVisKeyBytes(queue, partN, genCnt, visMs, msgID) + if !bytes.Equal(got, want) { + t.Fatalf("partitioned vis key mismatch\ngot: %x\nwant: %x", got, want) + } +} + +// TestSQSEncodePartitionedMsgByAgeKeyByteShape pins +// sqsPartitionedMsgByAgeKeyBytes: prefix + base64url(queue) + '|' + +// BE-u32(partition) + BE-u64(gen) + BE-u64(sendTs) + +// base64url(messageID). +func TestSQSEncodePartitionedMsgByAgeKeyByteShape(t *testing.T) { + t.Parallel() + const ( + queue = "qage" + msgID = "mid-age" + partN = uint32(15) + genCnt = uint64(1) + sendMs = int64(1_600_000_000_000) + ) + want := []byte(SQSPartitionedMsgByAgePrefix) + want = append(want, encodeSQSSegment(queue)...) + want = append(want, sqsPartitionedQueueTerminator) + want = binary.BigEndian.AppendUint32(want, partN) + want = binary.BigEndian.AppendUint64(want, genCnt) + want = binary.BigEndian.AppendUint64(want, uint64(sendMs)) + want = append(want, encodeSQSSegment(msgID)...) + + got := sqsPartitionedMsgByAgeKeyBytes(queue, partN, genCnt, sendMs, msgID) + if !bytes.Equal(got, want) { + t.Fatalf("partitioned byage key mismatch\ngot: %x\nwant: %x", got, want) + } +} + +// TestSQSEncodePartitionedMsgDedupKeyByteShape pins +// sqsPartitionedMsgDedupKeyBytes including the second '|' between +// the variable-length group and dedup segments (CodeRabbit major +// PR #732 round 6 / M5-3 design doc §line 19): prefix + +// base64url(queue) + '|' + BE-u32(partition) + BE-u64(gen) + +// base64url(groupID) + '|' + base64url(dedupID). +func TestSQSEncodePartitionedMsgDedupKeyByteShape(t *testing.T) { + t.Parallel() + const ( + queue = "qdedup" + groupID = "group-A" + dedupID = "dedup-001" + partN = uint32(2) + genCnt = uint64(1) + ) + want := []byte(SQSPartitionedMsgDedupPrefix) + want = append(want, encodeSQSSegment(queue)...) + want = append(want, sqsPartitionedQueueTerminator) + want = binary.BigEndian.AppendUint32(want, partN) + want = binary.BigEndian.AppendUint64(want, genCnt) + want = append(want, encodeSQSSegment(groupID)...) + want = append(want, sqsPartitionedQueueTerminator) + want = append(want, encodeSQSSegment(dedupID)...) + + got := sqsPartitionedMsgDedupKeyBytes(queue, partN, genCnt, groupID, dedupID) + if !bytes.Equal(got, want) { + t.Fatalf("partitioned dedup key mismatch\ngot: %x\nwant: %x", got, want) + } + + // Also pin that the group+dedup terminator is present at the + // expected offset so a future change that drops it (regressing + // the FNV-collision class) fails this test. + offset := len(SQSPartitionedMsgDedupPrefix) + + len(encodeSQSSegment(queue)) + 1 + // queue + '|' + 4 + 8 + // partition u32 + gen u64 + len(encodeSQSSegment(groupID)) + if got[offset] != sqsPartitionedQueueTerminator { + t.Fatalf("dedup key byte at offset %d = %x, want '|'", offset, got[offset]) + } +} + +// TestSQSEffectivePartitionCount pins the local mirror of +// adapter/sqs_keys_dispatch.go:effectivePartitionCount: nil meta, +// PartitionCount<=1, and perQueue all collapse to 1; otherwise the +// declared PartitionCount is returned. +func TestSQSEffectivePartitionCount(t *testing.T) { + t.Parallel() + cases := []struct { + name string + meta *sqsQueueMetaPublic + want uint32 + }{ + {"nil meta", nil, 1}, + {"PartitionCount=0", &sqsQueueMetaPublic{PartitionCount: 0}, 1}, + {"PartitionCount=1", &sqsQueueMetaPublic{PartitionCount: 1}, 1}, + {"PartitionCount=4 perQueue", &sqsQueueMetaPublic{PartitionCount: 4, FifoThroughputLimit: sqsFifoThroughputPerQueue}, 1}, + {"PartitionCount=4 perGroup", &sqsQueueMetaPublic{PartitionCount: 4, FifoThroughputLimit: "perMessageGroupId"}, 4}, + {"PartitionCount=4 unset throughput", &sqsQueueMetaPublic{PartitionCount: 4}, 4}, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + t.Parallel() + if got := effectivePartitionCount(c.meta); got != c.want { + t.Fatalf("effectivePartitionCount(%+v) = %d, want %d", c.meta, got, c.want) + } + }) + } +} diff --git a/internal/backup/encode_sqs_side.go b/internal/backup/encode_sqs_side.go index 5da82e99f..9f1124a00 100644 --- a/internal/backup/encode_sqs_side.go +++ b/internal/backup/encode_sqs_side.go @@ -70,6 +70,56 @@ func sqsMsgDedupKeyBytes(queueName, dedupID string) []byte { return append(out, encodeSQSSegment(dedupID)...) } +// sqsPartitionedMsgVisKeyBytes reproduces +// adapter/sqs_keys.go:sqsPartitionedMsgVisKey: prefix + +// base64url(queue) + '|' + BE-u32(partition) + BE-u64(gen) + +// BE-u64(visibleAt) + base64url(messageID). Mirrored locally per +// M3b-3 (internal/backup cannot import adapter). +func sqsPartitionedMsgVisKeyBytes(queueName string, partition uint32, gen uint64, visibleAtMillis int64, messageID string) []byte { + out := make([]byte, 0, len(SQSPartitionedMsgVisPrefix)+sqsSideKeyAllocBytes) + out = append(out, SQSPartitionedMsgVisPrefix...) + out = append(out, encodeSQSSegment(queueName)...) + out = append(out, sqsPartitionedQueueTerminator) + out = binary.BigEndian.AppendUint32(out, partition) + out = binary.BigEndian.AppendUint64(out, gen) + out = binary.BigEndian.AppendUint64(out, sqsClampNonNegativeMillis(visibleAtMillis)) + return append(out, encodeSQSSegment(messageID)...) +} + +// sqsPartitionedMsgByAgeKeyBytes reproduces +// adapter/sqs_keys.go:sqsPartitionedMsgByAgeKey: prefix + +// base64url(queue) + '|' + BE-u32(partition) + BE-u64(gen) + +// BE-u64(sendTs) + base64url(messageID). +func sqsPartitionedMsgByAgeKeyBytes(queueName string, partition uint32, gen uint64, sendTimestampMs int64, messageID string) []byte { + out := make([]byte, 0, len(SQSPartitionedMsgByAgePrefix)+sqsSideKeyAllocBytes) + out = append(out, SQSPartitionedMsgByAgePrefix...) + out = append(out, encodeSQSSegment(queueName)...) + out = append(out, sqsPartitionedQueueTerminator) + out = binary.BigEndian.AppendUint32(out, partition) + out = binary.BigEndian.AppendUint64(out, gen) + out = binary.BigEndian.AppendUint64(out, sqsClampNonNegativeMillis(sendTimestampMs)) + return append(out, encodeSQSSegment(messageID)...) +} + +// sqsPartitionedMsgDedupKeyBytes reproduces +// adapter/sqs_keys.go:sqsPartitionedMsgDedupKey: prefix + +// base64url(queue) + '|' + BE-u32(partition) + BE-u64(gen) + +// base64url(groupID) + '|' + base64url(dedupID). Note the second +// '|' between the variable-length group and dedup segments — without +// it distinct (groupID, dedupID) pairs can FNV-collapse onto the +// same key (CodeRabbit major PR #732 round 6; design doc §line 19). +func sqsPartitionedMsgDedupKeyBytes(queueName string, partition uint32, gen uint64, groupID, dedupID string) []byte { + out := make([]byte, 0, len(SQSPartitionedMsgDedupPrefix)+sqsSideKeyAllocBytes) + out = append(out, SQSPartitionedMsgDedupPrefix...) + out = append(out, encodeSQSSegment(queueName)...) + out = append(out, sqsPartitionedQueueTerminator) + out = binary.BigEndian.AppendUint32(out, partition) + out = binary.BigEndian.AppendUint64(out, gen) + out = append(out, encodeSQSSegment(groupID)...) + out = append(out, sqsPartitionedQueueTerminator) + return append(out, encodeSQSSegment(dedupID)...) +} + // sqsClampNonNegativeMillis mirrors adapter/sqs_messages.go uint64MaxZero: // wall-clock millis should never be negative, but a negative int64 would // silently overflow under a direct uint64() cast and produce a far-future From 1037deba099f5e81ec29d32fd55be61c3712c965 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 4 Jun 2026 02:40:18 +0900 Subject: [PATCH 3/6] backup: M5-3 slice D - lift partitioned gate + thread partition through emit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per design doc PR #914 §'Encoder lift' and §'Validation invariants'. Drops ErrSQSEncodeUnsupportedPartitioned and replaces it with the four fail-closed validation gates from the design's decision matrix, plus partition-aware dispatch in addMessage / addSideRecords. Sentinel changes: - Remove ErrSQSEncodeUnsupportedPartitioned (no longer reachable). - Add ErrSQSEncodeMissingPartition (partitioned + Partition==nil). - Add ErrSQSEncodeOutOfRangePartition (*Partition >= PartitionCount). - Add ErrSQSEncodePartitionRoutingMismatch (perQueue + nonzero partition). Validation: validatePartitioning(meta, records) runs the four gates before any record is staged. All four use raw meta.PartitionCount > 1 as the partitioned-queue predicate, never effectivePartitionCount (codex P2 v914 v7 demonstrated that effective-count gates would let a perQueue dump with Partition==nil emit classic keys against a partitioned-keyspace queue - silent data loss). Split into validatePartitioningOne + validatePartitioningPartitioned to keep cyclomatic complexity under the 10-cyclop limit. Sort: sortMessagesForPartitionedEmit sorts by (partition, send_ts, sequence_number, message_id). Classic sortMessagesForEmit at sqs.go:842 is unchanged (used by the decoder's writeMessagesJSONL). addMessage signature: (b, queueName, partition *uint32, rec) - new partition param selects sqsPartitionedMsgDataKeyBytes when non-nil, classic sqsMsgDataKeyBytes when nil. addSideRecords signature: (b, queueName, partition *uint32, meta, rec) - new partition param selects the partitioned (vis, byage, dedup) constructors when non-nil; dedup additionally takes rec.MessageGroupID for the partitioned +|+ shape. Extracted stageMessageRecords from encodeQueueMessages to keep that function under cyclop as well. Caller audit (semantic changes - confirmed single-site each): - addMessage: encodeQueueMessages only (now via stageMessageRecords). - addSideRecords: encodeQueueMessages only (now via stageMessageRecords). - validatePartitioning: encodeQueueMessages only. - sortMessagesForPartitionedEmit: encodeQueueMessages only. - ErrSQSEncodeUnsupportedPartitioned production site at the old encode_sqs.go:163 deleted. The one test (TestSQSEncodeRejectsPartitioned) is removed in this commit; the four new gate tests land in Slice E. go test + golangci-lint on ./internal/backup/ both pass. Classic-queue tests remain green: Partition==nil + PartitionCount<=1 takes the legacy paths unchanged in both addMessage and addSideRecords. --- internal/backup/encode_sqs.go | 192 +++++++++++++++++++++++++---- internal/backup/encode_sqs_side.go | 25 +++- internal/backup/encode_sqs_test.go | 12 -- 3 files changed, 190 insertions(+), 39 deletions(-) diff --git a/internal/backup/encode_sqs.go b/internal/backup/encode_sqs.go index dc40a5cdf..9a044cd01 100644 --- a/internal/backup/encode_sqs.go +++ b/internal/backup/encode_sqs.go @@ -6,6 +6,7 @@ import ( "encoding/json" "os" "path/filepath" + "sort" "strconv" "github.com/cockroachdb/errors" @@ -54,10 +55,30 @@ var ( // ErrSQSEncodeNotRegular is returned when a dump file is not a regular // file (symlink / FIFO / device / directory). ErrSQSEncodeNotRegular = errors.New("backup: sqs dump file is not a regular file") - // ErrSQSEncodeUnsupportedPartitioned is returned for HT-FIFO queues - // (partition_count > 1), whose partitioned key family this slice does - // not yet reproduce. - ErrSQSEncodeUnsupportedPartitioned = errors.New("backup: sqs partitioned (HT-FIFO) queue not yet supported by encoder") + // ErrSQSEncodeMissingPartition fires when a partitioned queue + // (meta.PartitionCount > 1) has a message whose Partition field is + // nil. Pre-M5-3 dumps lack the field entirely; replaying one against + // a partitioned queue would silently route every message to + // partition 0 via the `partition != nil` dispatch in addMessage, + // while the live readers scan the partitioned keyspace selected + // from raw PartitionCount > 1 — i.e. classic-shape keys against a + // partitioned reader = invisible on first read. Pinned by + // TestSQSEncodeRejectsMissingPartitionOnPartitionedQueue. + ErrSQSEncodeMissingPartition = errors.New("backup: sqs partitioned queue message missing partition field") + // ErrSQSEncodeOutOfRangePartition fires when a partitioned-queue + // message carries *Partition >= meta.PartitionCount. The dump is + // malformed (operator hand-edit or a future M5-3 decoder bug). + ErrSQSEncodeOutOfRangePartition = errors.New("backup: sqs message partition out of range for queue partition_count") + // ErrSQSEncodePartitionRoutingMismatch fires when a partitioned + // FIFO queue with FifoThroughputLimit == "perQueue" has a message + // with *Partition != 0. The live partitionFor + // (adapter/sqs_partitioning.go:71-72) forces every group to + // partition 0 in perQueue mode regardless of PartitionCount, and + // ReceiveMessage only scans the partition-0 lane. Accepting any + // other partition value would restore messages onto |p|N|... lanes + // the live receive fan-out never visits — silent data loss on + // first read. Codex P2 v914 v4 caught this gap. + ErrSQSEncodePartitionRoutingMismatch = errors.New("backup: sqs perQueue HT-FIFO queue must keep all messages on partition 0") ) // sqsStoredQueueMeta mirrors the live adapter's sqsQueueMeta JSON shape @@ -159,10 +180,6 @@ func (e *SQSRecordEncoder) encodeQueue(b *snapshotBuilder, root *os.Root, queueD if meta.Name == "" { return errors.Wrapf(ErrSQSEncodeInvalidQueue, "%s/_queue.json: empty queue name", queueDir) } - if meta.PartitionCount > 1 { - return errors.Wrapf(ErrSQSEncodeUnsupportedPartitioned, - "%s: partition_count %d", queueDir, meta.PartitionCount) - } if err := e.addQueueMeta(b, meta); err != nil { return err } @@ -205,18 +222,28 @@ func (e *SQSRecordEncoder) encodeQueueMessages(b *snapshotBuilder, root *os.Root if err != nil { return err } - var maxSeq uint64 - for i := range records { - seq, err := e.addMessage(b, meta.Name, records[i]) - if err != nil { - return err - } - if err := e.addSideRecords(b, meta.Name, &meta, &records[i]); err != nil { - return err - } - if seq > maxSeq { - maxSeq = seq - } + // Fail-closed validation up-front so a malformed dump never stages + // partial records. Uses raw meta.PartitionCount > 1 as the + // partitioned-queue predicate (NOT effectivePartitionCount; codex + // P2 v914 v7 - using the effective count would allow a perQueue + // dump with Partition == nil to slip past the missing-partition + // gate, then addMessage's `partition != nil` dispatch would emit + // classic-shape keys against a partitioned-keyspace queue, making + // every restored message invisible). + if err := validatePartitioning(&meta, records); err != nil { + return err + } + // Per-partition deterministic emission for partitioned queues. The + // snapshotBuilder sorts by key bytes on WriteTo so this does not + // directly affect the .fsm byte output, but it makes the in-loop + // state (maxSeq, future per-partition counters) deterministic and + // matches the design doc's stated contract. + if meta.PartitionCount > 1 { + sortMessagesForPartitionedEmit(records) + } + maxSeq, err := e.stageMessageRecords(b, &meta, records) + if err != nil { + return err } if meta.FIFO && maxSeq > 0 { // The live FIFO send path (adapter/sqs_fifo.go: loadFifoSequence @@ -235,10 +262,126 @@ func (e *SQSRecordEncoder) encodeQueueMessages(b *snapshotBuilder, root *os.Root return nil } +// stageMessageRecords iterates the (pre-validated, pre-sorted) records +// and stages each one's data + side records on b, returning the +// maximum SequenceNumber observed (used to write the FIFO seq counter). +// Split out of encodeQueueMessages so the parent stays under cyclop. +func (e *SQSRecordEncoder) stageMessageRecords(b *snapshotBuilder, meta *sqsQueueMetaPublic, records []sqsMessageRecord) (uint64, error) { + var maxSeq uint64 + for i := range records { + seq, err := e.addMessage(b, meta.Name, records[i].Partition, records[i]) + if err != nil { + return 0, err + } + if err := e.addSideRecords(b, meta.Name, records[i].Partition, meta, &records[i]); err != nil { + return 0, err + } + if seq > maxSeq { + maxSeq = seq + } + } + return maxSeq, nil +} + +// validatePartitioning runs the four fail-closed gates from the M5-3 +// design doc §"Validation invariants" before any message is staged. +// All four use raw meta.PartitionCount > 1 as the partitioned-queue +// predicate, never effectivePartitionCount (codex P2 v914 v7). +func validatePartitioning(meta *sqsQueueMetaPublic, records []sqsMessageRecord) error { + for i := range records { + if err := validatePartitioningOne(meta, &records[i]); err != nil { + return err + } + } + return nil +} + +// validatePartitioningOne checks one message against the four gates. +// Split out of validatePartitioning so the outer loop stays under the +// cyclop limit. +func validatePartitioningOne(meta *sqsQueueMetaPublic, rec *sqsMessageRecord) error { + if meta.PartitionCount > 1 { + return validatePartitioningPartitioned(meta, rec) + } + // Classic queue (PartitionCount <= 1): the only invalid state is + // a non-zero Partition field — the dump is internally + // inconsistent (classic-queue keyspace has no partition concept). + if rec.Partition != nil && *rec.Partition != 0 { + return errors.Wrapf(ErrSQSEncodeInvalidMessage, + "queue %q (partition_count=%d): classic queue message %q has partition=%d", + meta.Name, meta.PartitionCount, rec.MessageID, *rec.Partition) + } + return nil +} + +// validatePartitioningPartitioned runs the three partitioned-queue +// gates (missing partition, out-of-range, perQueue routing mismatch). +// Caller has already verified meta.PartitionCount > 1. +func validatePartitioningPartitioned(meta *sqsQueueMetaPublic, rec *sqsMessageRecord) error { + // Gate 1: missing partition. Pre-M5-3 dump replayed against a + // partitioned queue, or M5-3 decoder bug. The operator must + // re-decode with an M5-3 decoder. + if rec.Partition == nil { + return errors.Wrapf(ErrSQSEncodeMissingPartition, + "queue %q (partition_count=%d): message %q missing partition field", + meta.Name, meta.PartitionCount, rec.MessageID) + } + // Gate 2: out-of-range partition. Dump is malformed. + if *rec.Partition >= meta.PartitionCount { + return errors.Wrapf(ErrSQSEncodeOutOfRangePartition, + "queue %q (partition_count=%d): message %q partition=%d out of range", + meta.Name, meta.PartitionCount, rec.MessageID, *rec.Partition) + } + // Gate 3: perQueue HT-FIFO with nonzero partition. The live + // partitionFor collapses every group to partition 0 in perQueue + // mode; accepting any other partition would write to a lane the + // live receive never scans. + if meta.FifoThroughputLimit == sqsFifoThroughputPerQueue && *rec.Partition != 0 { + return errors.Wrapf(ErrSQSEncodePartitionRoutingMismatch, + "queue %q (partition_count=%d, fifo_throughput_limit=%q): message %q partition=%d, want 0", + meta.Name, meta.PartitionCount, meta.FifoThroughputLimit, rec.MessageID, *rec.Partition) + } + return nil +} + +// sortMessagesForPartitionedEmit sorts messages by +// (partition, send_timestamp_millis, sequence_number, message_id). +// Partition is the leading key so per-partition state (maxSeq, future +// counters) is computed deterministically; the remaining three fields +// match sortMessagesForEmit (sqs.go:842) for the classic path. +// validatePartitioning ensures Partition is non-nil whenever the +// caller invokes this fn (PartitionCount > 1), so the *rec.Partition +// dereference is safe. +func sortMessagesForPartitionedEmit(msgs []sqsMessageRecord) { + partitionOf := func(r *sqsMessageRecord) uint32 { + if r.Partition == nil { + return 0 + } + return *r.Partition + } + sort.SliceStable(msgs, func(i, j int) bool { + a, b := &msgs[i], &msgs[j] + pa, pb := partitionOf(a), partitionOf(b) + switch { + case pa != pb: + return pa < pb + case a.SendTimestampMillis != b.SendTimestampMillis: + return a.SendTimestampMillis < b.SendTimestampMillis + case a.SequenceNumber != b.SequenceNumber: + return a.SequenceNumber < b.SequenceNumber + default: + return a.MessageID < b.MessageID + } + }) +} + // addMessage stages one !sqs|msg|data| record and returns the message's // sequence number (for the seq-counter computation). Visibility state is // zeroed (every message restores visible), matching the decoder default. -func (e *SQSRecordEncoder) addMessage(b *snapshotBuilder, queueName string, rec sqsMessageRecord) (uint64, error) { +// partition is non-nil only when the queue is partitioned +// (meta.PartitionCount > 1); validatePartitioning enforces this +// invariant before encodeQueueMessages invokes addMessage. +func (e *SQSRecordEncoder) addMessage(b *snapshotBuilder, queueName string, partition *uint32, rec sqsMessageRecord) (uint64, error) { if rec.MessageID == "" { return 0, errors.Wrap(ErrSQSEncodeInvalidMessage, "message missing message_id") } @@ -265,7 +408,12 @@ func (e *SQSRecordEncoder) addMessage(b *snapshotBuilder, queueName string, rec if err != nil { return 0, err } - key := sqsMsgDataKeyBytes(queueName, sqsRestoreGeneration, rec.MessageID) + var key []byte + if partition != nil { + key = sqsPartitionedMsgDataKeyBytes(queueName, *partition, sqsRestoreGeneration, rec.MessageID) + } else { + key = sqsMsgDataKeyBytes(queueName, sqsRestoreGeneration, rec.MessageID) + } if err := b.Add(key, val, 0); err != nil { return 0, err } diff --git a/internal/backup/encode_sqs_side.go b/internal/backup/encode_sqs_side.go index 9f1124a00..157fd6062 100644 --- a/internal/backup/encode_sqs_side.go +++ b/internal/backup/encode_sqs_side.go @@ -180,15 +180,20 @@ func resolveDedupID(rec *sqsMessageRecord, meta *sqsQueueMetaPublic) string { // - dedup: FIFO + resolveDedupID(rec, meta) non-empty. ExpiresAtMillis = // SendTimestampMs + sqsFifoDedupWindowMillis. CBD queues get a SHA-256 // derived dedup-id (matches adapter/sqs_fifo.go resolveFifoDedupID). -func (e *SQSRecordEncoder) addSideRecords(b *snapshotBuilder, queueName string, meta *sqsQueueMetaPublic, rec *sqsMessageRecord) error { +func (e *SQSRecordEncoder) addSideRecords(b *snapshotBuilder, queueName string, partition *uint32, meta *sqsQueueMetaPublic, rec *sqsMessageRecord) error { msgIDBytes := []byte(rec.MessageID) - visKey := sqsMsgVisKeyBytes(queueName, sqsRestoreGeneration, rec.AvailableAtMillis, rec.MessageID) + var visKey, byAgeKey []byte + if partition != nil { + visKey = sqsPartitionedMsgVisKeyBytes(queueName, *partition, sqsRestoreGeneration, rec.AvailableAtMillis, rec.MessageID) + byAgeKey = sqsPartitionedMsgByAgeKeyBytes(queueName, *partition, sqsRestoreGeneration, rec.SendTimestampMillis, rec.MessageID) + } else { + visKey = sqsMsgVisKeyBytes(queueName, sqsRestoreGeneration, rec.AvailableAtMillis, rec.MessageID) + byAgeKey = sqsMsgByAgeKeyBytes(queueName, sqsRestoreGeneration, rec.SendTimestampMillis, rec.MessageID) + } if err := b.Add(visKey, msgIDBytes, 0); err != nil { return err } - - byAgeKey := sqsMsgByAgeKeyBytes(queueName, sqsRestoreGeneration, rec.SendTimestampMillis, rec.MessageID) if err := b.Add(byAgeKey, msgIDBytes, 0); err != nil { return err } @@ -210,6 +215,16 @@ func (e *SQSRecordEncoder) addSideRecords(b *snapshotBuilder, queueName string, if err != nil { return err } - dedupKey := sqsMsgDedupKeyBytes(queueName, dedupID) + var dedupKey []byte + if partition != nil { + // Partitioned dedup includes + '|' + + // per the partitioned dedup constructor — see + // sqsPartitionedMsgDedupKeyBytes for the rationale (CodeRabbit + // major PR #732 round 6: the second '|' prevents FNV-collapse + // across distinct (groupID, dedupID) pairs on the same partition). + dedupKey = sqsPartitionedMsgDedupKeyBytes(queueName, *partition, sqsRestoreGeneration, rec.MessageGroupID, dedupID) + } else { + dedupKey = sqsMsgDedupKeyBytes(queueName, dedupID) + } return b.Add(dedupKey, val, 0) } diff --git a/internal/backup/encode_sqs_test.go b/internal/backup/encode_sqs_test.go index e05ba9950..0f1b4842e 100644 --- a/internal/backup/encode_sqs_test.go +++ b/internal/backup/encode_sqs_test.go @@ -237,18 +237,6 @@ func TestSQSEncodeBinaryBodyRoundTrip(t *testing.T) { } } -// TestSQSEncodeRejectsPartitioned pins fail-closed for an HT-FIFO queue. -func TestSQSEncodeRejectsPartitioned(t *testing.T) { - t.Parallel() - in := t.TempDir() - writeSQSQueue(t, in, "ht.fifo", []byte(`{"format_version":1,"name":"ht.fifo","fifo":true,`+ - `"visibility_timeout_seconds":30,"message_retention_seconds":345600,"delay_seconds":0,"partition_count":4}`), nil) - b := newSnapshotBuilder(sqsEncTS) - if err := NewSQSRecordEncoder(in).Encode(b); !errors.Is(err, ErrSQSEncodeUnsupportedPartitioned) { - t.Fatalf("Encode err = %v, want ErrSQSEncodeUnsupportedPartitioned", err) - } -} - // TestSQSEncodeRejectsNonRegularQueueMeta pins the file guard: a // _queue.json that is a directory is refused with ErrSQSEncodeNotRegular. func TestSQSEncodeRejectsNonRegularQueueMeta(t *testing.T) { From 7603bf7877261faef213a73fb4caa104fac1e28b Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 4 Jun 2026 02:44:52 +0900 Subject: [PATCH 4/6] backup: M5-3 slice E - partitioned encoder test coverage matrix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per design doc PR #914 §'Test plan'. Adds the design's eight test rows plus two regression tests pinning the codex P2 v914 v7 fix and the 4-field sort stability. Tests added (encode_sqs_partitioned_test.go): - TestSQSEncodeRejectsMissingPartitionOnPartitionedQueue - gate 1 (codex P1 v914 v2). - TestSQSEncodeRejectsOutOfRangePartition - gate 2. - TestSQSEncodeRejectsNonzeroPartitionOnPerQueueHTFIFO - gate 3 (codex P2 v914 v4). - TestSQSEncodeRejectsNonZeroPartitionOnClassicQueue - gate 4. - TestSQSEncodeGateUsesRawPartitionCount - codex P2 v914 v7 regression. Pins that gates use raw meta.PartitionCount > 1 rather than effectivePartitionCount, which would otherwise let a perQueue + PartitionCount=2 + Partition==nil dump slip past validation and silently emit classic keys against a partitioned-keyspace queue. - TestSQSEncodeLegacyDumpsWithoutPartitionStillRoundTrip - pre-M5-3 classic dumps still round-trip with Partition==nil throughout. - TestSQSEncodePartitionedQueueRoundTrip - 3-message partitioned queue emits data/vis/byage/dedup with |p| prefix; no classic-shape data/vis/byage/dedup keys leak. - TestSQSEncodePartitionedDedupBuildsGroupSegment - partitioned dedup key shape matches sqsPartitionedMsgDedupKeyBytes byte-for-byte. - TestSQSEncodePartitionedSideRecordsByteCrossCheck - vis + byage + dedup all byte-equal the constructors (M5-2-style in-package cross-check; adapter constructors are unexported). - TestSQSEncodePartitionedSortStableAcrossPartitions - 4-field sort groups by partition first, then send_ts, then sequence_number. Constructor adjustment (encode_sqs_side.go): - sqsPartitionedMsgDedupKeyBytes: drop the gen parameter (always sqsRestoreGeneration on the reverse-encode path). Mirrors the classic sqsMsgDedupKeyBytes hardcode pattern and satisfies the unparam lint. Comment updated to explain the divergence from the adapter-side variable-gen signature. go test + golangci-lint clean on ./internal/backup/. --- .../backup/encode_sqs_partitioned_test.go | 403 +++++++++++++++++- internal/backup/encode_sqs_side.go | 20 +- 2 files changed, 394 insertions(+), 29 deletions(-) diff --git a/internal/backup/encode_sqs_partitioned_test.go b/internal/backup/encode_sqs_partitioned_test.go index 2b718e1ab..73736bac8 100644 --- a/internal/backup/encode_sqs_partitioned_test.go +++ b/internal/backup/encode_sqs_partitioned_test.go @@ -4,23 +4,27 @@ import ( "bytes" "encoding/binary" "testing" + + "github.com/cockroachdb/errors" ) // TestSQSEncodePartitionedMsgDataKeyByteShape pins // sqsPartitionedMsgDataKeyBytes against the wire format mandated by // adapter/sqs_keys.go:sqsPartitionedMsgDataKey: -// prefix + base64url(queue) + '|' + BE-u32(partition) + BE-u64(gen) + -// base64url(messageID). +// +// prefix + base64url(queue) + '|' + BE-u32(partition) + BE-u64(gen) + +// base64url(messageID). +// // internal/backup cannot import adapter to cross-check directly (M3b-3 // circular-dep ban; same constraint sqsFifoDedupWindowMillis lives // under), so this is a manually-assembled byte-equality assertion. func TestSQSEncodePartitionedMsgDataKeyByteShape(t *testing.T) { t.Parallel() const ( - queue = "q1" - msgID = "msg-001" - partN = uint32(3) - genCnt = uint64(7) + queue = "q1" + msgID = "msg-001" + partN = uint32(3) + genCnt = uint64(7) ) want := []byte(SQSPartitionedMsgDataPrefix) want = append(want, encodeSQSSegment(queue)...) @@ -42,11 +46,11 @@ func TestSQSEncodePartitionedMsgDataKeyByteShape(t *testing.T) { func TestSQSEncodePartitionedMsgVisKeyByteShape(t *testing.T) { t.Parallel() const ( - queue = "qvis" - msgID = "mid" - partN = uint32(0) - genCnt = uint64(1) - visMs = int64(1_700_000_000_000) + queue = "qvis" + msgID = "mid" + partN = uint32(0) + genCnt = uint64(1) + visMs = int64(1_700_000_000_000) ) want := []byte(SQSPartitionedMsgVisPrefix) want = append(want, encodeSQSSegment(queue)...) @@ -69,11 +73,11 @@ func TestSQSEncodePartitionedMsgVisKeyByteShape(t *testing.T) { func TestSQSEncodePartitionedMsgByAgeKeyByteShape(t *testing.T) { t.Parallel() const ( - queue = "qage" - msgID = "mid-age" - partN = uint32(15) - genCnt = uint64(1) - sendMs = int64(1_600_000_000_000) + queue = "qage" + msgID = "mid-age" + partN = uint32(15) + genCnt = uint64(1) + sendMs = int64(1_600_000_000_000) ) want := []byte(SQSPartitionedMsgByAgePrefix) want = append(want, encodeSQSSegment(queue)...) @@ -93,8 +97,11 @@ func TestSQSEncodePartitionedMsgByAgeKeyByteShape(t *testing.T) { // sqsPartitionedMsgDedupKeyBytes including the second '|' between // the variable-length group and dedup segments (CodeRabbit major // PR #732 round 6 / M5-3 design doc §line 19): prefix + -// base64url(queue) + '|' + BE-u32(partition) + BE-u64(gen) + -// base64url(groupID) + '|' + base64url(dedupID). +// base64url(queue) + '|' + BE-u32(partition) + +// BE-u64(sqsRestoreGeneration) + base64url(groupID) + '|' + +// base64url(dedupID). gen is hardcoded to sqsRestoreGeneration in +// the constructor (mirroring the classic sqsMsgDedupKeyBytes pattern; +// satisfies unparam). func TestSQSEncodePartitionedMsgDedupKeyByteShape(t *testing.T) { t.Parallel() const ( @@ -102,18 +109,17 @@ func TestSQSEncodePartitionedMsgDedupKeyByteShape(t *testing.T) { groupID = "group-A" dedupID = "dedup-001" partN = uint32(2) - genCnt = uint64(1) ) want := []byte(SQSPartitionedMsgDedupPrefix) want = append(want, encodeSQSSegment(queue)...) want = append(want, sqsPartitionedQueueTerminator) want = binary.BigEndian.AppendUint32(want, partN) - want = binary.BigEndian.AppendUint64(want, genCnt) + want = binary.BigEndian.AppendUint64(want, sqsRestoreGeneration) want = append(want, encodeSQSSegment(groupID)...) want = append(want, sqsPartitionedQueueTerminator) want = append(want, encodeSQSSegment(dedupID)...) - got := sqsPartitionedMsgDedupKeyBytes(queue, partN, genCnt, groupID, dedupID) + got := sqsPartitionedMsgDedupKeyBytes(queue, partN, groupID, dedupID) if !bytes.Equal(got, want) { t.Fatalf("partitioned dedup key mismatch\ngot: %x\nwant: %x", got, want) } @@ -130,6 +136,361 @@ func TestSQSEncodePartitionedMsgDedupKeyByteShape(t *testing.T) { } } +// TestSQSEncodeRejectsMissingPartitionOnPartitionedQueue pins gate 1 +// of validatePartitioning: a partitioned queue (PartitionCount > 1) +// with a message whose Partition field is nil (legacy pre-M5-3 dump +// shape) fails closed with ErrSQSEncodeMissingPartition. Without +// this gate the message would route to the classic key keyspace +// (addMessage's `partition != nil` dispatch falls to +// sqsMsgDataKeyBytes) and become invisible to the live readers +// scanning the partitioned keyspace (codex P1 v914 v2). +func TestSQSEncodeRejectsMissingPartitionOnPartitionedQueue(t *testing.T) { + t.Parallel() + in := t.TempDir() + writeSQSQueue(t, in, "p.fifo", + []byte(`{"format_version":1,"name":"p.fifo","fifo":true,`+ + `"visibility_timeout_seconds":30,"message_retention_seconds":345600,`+ + `"delay_seconds":0,"partition_count":2}`), + [][]byte{ + // No "partition" field — legacy dump shape. + []byte(`{"message_id":"m1","body":"hello",` + + `"send_timestamp_millis":1000,"available_at_millis":1000,` + + `"message_group_id":"g1"}`), + }) + b := newSnapshotBuilder(sqsEncTS) + err := NewSQSRecordEncoder(in).Encode(b) + if !errors.Is(err, ErrSQSEncodeMissingPartition) { + t.Fatalf("Encode err = %v, want ErrSQSEncodeMissingPartition", err) + } +} + +// TestSQSEncodeRejectsOutOfRangePartition pins gate 2: a partitioned +// queue with a message whose *Partition >= PartitionCount fails +// closed with ErrSQSEncodeOutOfRangePartition. Such a dump is +// internally inconsistent (operator hand-edit or M5-3 decoder bug). +func TestSQSEncodeRejectsOutOfRangePartition(t *testing.T) { + t.Parallel() + in := t.TempDir() + writeSQSQueue(t, in, "p.fifo", + []byte(`{"format_version":1,"name":"p.fifo","fifo":true,`+ + `"visibility_timeout_seconds":30,"message_retention_seconds":345600,`+ + `"delay_seconds":0,"partition_count":2}`), + [][]byte{ + []byte(`{"message_id":"m1","body":"hello",` + + `"send_timestamp_millis":1000,"available_at_millis":1000,` + + `"message_group_id":"g1","partition":5}`), + }) + b := newSnapshotBuilder(sqsEncTS) + err := NewSQSRecordEncoder(in).Encode(b) + if !errors.Is(err, ErrSQSEncodeOutOfRangePartition) { + t.Fatalf("Encode err = %v, want ErrSQSEncodeOutOfRangePartition", err) + } +} + +// TestSQSEncodeRejectsNonzeroPartitionOnPerQueueHTFIFO pins gate 3 +// (codex P2 v914 v4). For a partitioned queue with +// FifoThroughputLimit == "perQueue", the live partitionFor +// (adapter/sqs_partitioning.go:71-72) forces every group to +// partition 0; ReceiveMessage only scans the partition-0 lane. +// Accepting *Partition != 0 would restore messages onto |p|1|... or +// |p|N|... lanes the live receive never visits. +func TestSQSEncodeRejectsNonzeroPartitionOnPerQueueHTFIFO(t *testing.T) { + t.Parallel() + in := t.TempDir() + writeSQSQueue(t, in, "p.fifo", + []byte(`{"format_version":1,"name":"p.fifo","fifo":true,`+ + `"visibility_timeout_seconds":30,"message_retention_seconds":345600,`+ + `"delay_seconds":0,"partition_count":2,"fifo_throughput_limit":"perQueue"}`), + [][]byte{ + []byte(`{"message_id":"m1","body":"hello",` + + `"send_timestamp_millis":1000,"available_at_millis":1000,` + + `"message_group_id":"g1","partition":1}`), + }) + b := newSnapshotBuilder(sqsEncTS) + err := NewSQSRecordEncoder(in).Encode(b) + if !errors.Is(err, ErrSQSEncodePartitionRoutingMismatch) { + t.Fatalf("Encode err = %v, want ErrSQSEncodePartitionRoutingMismatch", err) + } +} + +// TestSQSEncodeRejectsNonZeroPartitionOnClassicQueue pins gate 4: +// classic queue (PartitionCount <= 1) with a message whose +// Partition is non-nil and non-zero fails closed with the existing +// ErrSQSEncodeInvalidMessage sentinel. Classic keyspace has no +// partition concept; the dump is internally inconsistent. +func TestSQSEncodeRejectsNonZeroPartitionOnClassicQueue(t *testing.T) { + t.Parallel() + in := t.TempDir() + writeSQSQueue(t, in, "classic", + []byte(`{"format_version":1,"name":"classic",`+ + `"visibility_timeout_seconds":30,"message_retention_seconds":345600,"delay_seconds":0}`), + [][]byte{ + []byte(`{"message_id":"m1","body":"hello",` + + `"send_timestamp_millis":1000,"available_at_millis":1000,"partition":2}`), + }) + b := newSnapshotBuilder(sqsEncTS) + err := NewSQSRecordEncoder(in).Encode(b) + if !errors.Is(err, ErrSQSEncodeInvalidMessage) { + t.Fatalf("Encode err = %v, want ErrSQSEncodeInvalidMessage", err) + } +} + +// TestSQSEncodeGateUsesRawPartitionCount is the regression for codex +// P2 v914 v7. The four validation gates MUST use raw +// meta.PartitionCount > 1 as the partitioned-queue predicate, never +// effectivePartitionCount. A perQueue queue with PartitionCount=2 +// collapses effectivePartitionCount to 1; if the missing-partition +// gate used the effective count, a message with Partition==nil +// would slip past validation, then addMessage's `partition != nil` +// dispatch would emit the classic-shape data key against a +// partitioned-keyspace queue — invisible on first read. +// +// This test would silently pass if the gate predicate were changed +// to effectivePartitionCount(&meta) > 1. +func TestSQSEncodeGateUsesRawPartitionCount(t *testing.T) { + t.Parallel() + in := t.TempDir() + writeSQSQueue(t, in, "p.fifo", + []byte(`{"format_version":1,"name":"p.fifo","fifo":true,`+ + `"visibility_timeout_seconds":30,"message_retention_seconds":345600,`+ + `"delay_seconds":0,"partition_count":2,"fifo_throughput_limit":"perQueue"}`), + [][]byte{ + // partition_count=2 + perQueue: effectivePartitionCount==1. + // No "partition" field. Raw-PartitionCount gate must fire + // before the perQueue routing gate has a chance to inspect + // a nil Partition. + []byte(`{"message_id":"m1","body":"hello",` + + `"send_timestamp_millis":1000,"available_at_millis":1000,` + + `"message_group_id":"g1"}`), + }) + b := newSnapshotBuilder(sqsEncTS) + err := NewSQSRecordEncoder(in).Encode(b) + if !errors.Is(err, ErrSQSEncodeMissingPartition) { + t.Fatalf("Encode err = %v, want ErrSQSEncodeMissingPartition (codex P2 v914 v7 — gate must use raw PartitionCount)", err) + } +} + +// TestSQSEncodeLegacyDumpsWithoutPartitionStillRoundTrip pins that a +// pre-M5-3 messages.jsonl (no "partition" field on any line) still +// round-trips through the M5-3 encoder unchanged when the queue is +// classic (PartitionCount <= 1). The classic constructor branch is +// selected by addMessage when Partition == nil. +func TestSQSEncodeLegacyDumpsWithoutPartitionStillRoundTrip(t *testing.T) { + t.Parallel() + in := t.TempDir() + writeSQSQueue(t, in, "legacy", + []byte(`{"format_version":1,"name":"legacy",`+ + `"visibility_timeout_seconds":30,"message_retention_seconds":345600,"delay_seconds":0}`), + [][]byte{ + []byte(`{"message_id":"m1","body":"hello",` + + `"send_timestamp_millis":1000,"available_at_millis":1000}`), + []byte(`{"message_id":"m2","body":"world",` + + `"send_timestamp_millis":2000,"available_at_millis":2000}`), + }) + _, msgs := decodeSQSAndRead(t, encodeSQSTree(t, in), "legacy") + if len(msgs) != 2 { + t.Fatalf("round-tripped %d messages, want 2", len(msgs)) + } + for i := range msgs { + if msgs[i].Partition != nil { + t.Fatalf("msg[%d].Partition = %v, want nil for classic round-trip", i, *msgs[i].Partition) + } + } +} + +// TestSQSEncodePartitionedQueueRoundTrip verifies the partitioned +// data + side records are emitted with the |p| prefix and the +// partition is preserved end-to-end through encode → decode. +func TestSQSEncodePartitionedQueueRoundTrip(t *testing.T) { + t.Parallel() + in := t.TempDir() + writeSQSQueue(t, in, "p.fifo", + []byte(`{"format_version":1,"name":"p.fifo","fifo":true,`+ + `"visibility_timeout_seconds":30,"message_retention_seconds":345600,`+ + `"delay_seconds":0,"partition_count":2,"fifo_throughput_limit":"perMessageGroupId"}`), + [][]byte{ + []byte(`{"message_id":"m0a","body":"p0-a","send_timestamp_millis":1000,` + + `"available_at_millis":1000,"message_group_id":"g0","message_deduplication_id":"d0a",` + + `"sequence_number":1,"partition":0}`), + []byte(`{"message_id":"m1a","body":"p1-a","send_timestamp_millis":1100,` + + `"available_at_millis":1100,"message_group_id":"g1","message_deduplication_id":"d1a",` + + `"sequence_number":2,"partition":1}`), + []byte(`{"message_id":"m1b","body":"p1-b","send_timestamp_millis":1200,` + + `"available_at_millis":1200,"message_group_id":"g1","message_deduplication_id":"d1b",` + + `"sequence_number":3,"partition":1}`), + }) + + fsm := encodeSQSTree(t, in) + entries, _, err := DecodeLiveEntries(bytes.NewReader(fsm)) + if err != nil { + t.Fatalf("DecodeLiveEntries: %v", err) + } + // All data/vis/byage/dedup keys for this queue must be the + // partitioned shape (|p|). + wantPrefixes := []string{ + SQSPartitionedMsgDataPrefix, + SQSPartitionedMsgVisPrefix, + SQSPartitionedMsgByAgePrefix, + SQSPartitionedMsgDedupPrefix, + } + for _, prefix := range wantPrefixes { + found := 0 + for _, e := range entries { + if bytes.HasPrefix(e.UserKey, []byte(prefix)) { + found++ + } + } + if found == 0 { + t.Errorf("no entries with prefix %q (3-message partitioned queue should emit ≥1)", prefix) + } + } + // No classic-shape data/vis/byage/dedup keys for this queue. + for _, classic := range []string{SQSMsgDataPrefix, SQSMsgVisPrefix, SQSMsgByAgePrefix, SQSMsgDedupPrefix} { + for _, e := range entries { + if !bytes.HasPrefix(e.UserKey, []byte(classic)) { + continue + } + // Skip if it's actually a partitioned key (the partitioned + // prefix starts with the classic prefix + "p|"). + classicAndDiscriminator := classic + sqsPartitionedDiscriminator + if bytes.HasPrefix(e.UserKey, []byte(classicAndDiscriminator)) { + continue + } + t.Errorf("classic-shape key on a partitioned queue: %q", e.UserKey) + } + } +} + +// TestSQSEncodePartitionedDedupBuildsGroupSegment pins the +// partitioned dedup row's +|+ shape: the +// MessageGroupID is base64-encoded as part of the key and a +// terminator '|' separates it from the dedupID segment (CodeRabbit +// major PR #732 round 6). +func TestSQSEncodePartitionedDedupBuildsGroupSegment(t *testing.T) { + t.Parallel() + in := t.TempDir() + const ( + queue = "p.fifo" + groupID = "groupA" + dedupID = "dedup-1" + ) + writeSQSQueue(t, in, queue, + []byte(`{"format_version":1,"name":"p.fifo","fifo":true,`+ + `"visibility_timeout_seconds":30,"message_retention_seconds":345600,`+ + `"delay_seconds":0,"partition_count":2,"fifo_throughput_limit":"perMessageGroupId"}`), + [][]byte{ + []byte(`{"message_id":"m1","body":"hello",` + + `"send_timestamp_millis":1000,"available_at_millis":1000,` + + `"message_group_id":"` + groupID + `",` + + `"message_deduplication_id":"` + dedupID + `",` + + `"sequence_number":1,"partition":1}`), + }) + + fsm := encodeSQSTree(t, in) + entries, _, err := DecodeLiveEntries(bytes.NewReader(fsm)) + if err != nil { + t.Fatalf("DecodeLiveEntries: %v", err) + } + // Find the dedup entry and compare to the manually-built key. + want := sqsPartitionedMsgDedupKeyBytes(queue, 1, groupID, dedupID) + var got []byte + for _, e := range entries { + if bytes.HasPrefix(e.UserKey, []byte(SQSPartitionedMsgDedupPrefix)) { + got = e.UserKey + break + } + } + if got == nil { + t.Fatalf("no partitioned dedup entry emitted") + } + if !bytes.Equal(got, want) { + t.Fatalf("partitioned dedup key mismatch\ngot: %x\nwant: %x", got, want) + } +} + +// TestSQSEncodePartitionedSideRecordsByteCrossCheck verifies the +// three partitioned side-record bytes emitted by addSideRecords +// equal what sqsPartitionedMsg{Vis,ByAge,Dedup}KeyBytes produce +// directly. This is the in-package cross-check the M5-2 precedent +// at TestSQSEncodeSideRecordsCrossCheckClassic uses for the classic +// path (the adapter's unexported sqsPartitionedMsg* constructors +// cannot be invoked from this package). +func TestSQSEncodePartitionedSideRecordsByteCrossCheck(t *testing.T) { + t.Parallel() + in := t.TempDir() + const ( + queue = "p.fifo" + groupID = "g0" + dedupID = "d0" + msgID = "mid-001" + sendMs = int64(2000) + availMs = int64(2000) + partN = uint32(1) + ) + writeSQSQueue(t, in, queue, + []byte(`{"format_version":1,"name":"p.fifo","fifo":true,`+ + `"visibility_timeout_seconds":30,"message_retention_seconds":345600,`+ + `"delay_seconds":0,"partition_count":2,"fifo_throughput_limit":"perMessageGroupId"}`), + [][]byte{ + []byte(`{"message_id":"mid-001","body":"x","send_timestamp_millis":2000,` + + `"available_at_millis":2000,"message_group_id":"g0",` + + `"message_deduplication_id":"d0","sequence_number":1,"partition":1}`), + }) + entries, _, err := DecodeLiveEntries(bytes.NewReader(encodeSQSTree(t, in))) + if err != nil { + t.Fatalf("DecodeLiveEntries: %v", err) + } + wantVis := sqsPartitionedMsgVisKeyBytes(queue, partN, sqsRestoreGeneration, availMs, msgID) + wantByAge := sqsPartitionedMsgByAgeKeyBytes(queue, partN, sqsRestoreGeneration, sendMs, msgID) + wantDedup := sqsPartitionedMsgDedupKeyBytes(queue, partN, groupID, dedupID) + var gotVis, gotByAge, gotDedup []byte + for _, e := range entries { + switch { + case bytes.HasPrefix(e.UserKey, []byte(SQSPartitionedMsgVisPrefix)): + gotVis = e.UserKey + case bytes.HasPrefix(e.UserKey, []byte(SQSPartitionedMsgByAgePrefix)): + gotByAge = e.UserKey + case bytes.HasPrefix(e.UserKey, []byte(SQSPartitionedMsgDedupPrefix)): + gotDedup = e.UserKey + } + } + if !bytes.Equal(gotVis, wantVis) { + t.Errorf("partitioned vis mismatch\ngot: %x\nwant: %x", gotVis, wantVis) + } + if !bytes.Equal(gotByAge, wantByAge) { + t.Errorf("partitioned byage mismatch\ngot: %x\nwant: %x", gotByAge, wantByAge) + } + if !bytes.Equal(gotDedup, wantDedup) { + t.Errorf("partitioned dedup mismatch\ngot: %x\nwant: %x", gotDedup, wantDedup) + } +} + +// TestSQSEncodePartitionedSortStableAcrossPartitions pins the +// 4-field sort: messages within the same partition with identical +// send_timestamp_millis are ordered by sequence_number, and +// messages from different partitions group together by partition +// first. +func TestSQSEncodePartitionedSortStableAcrossPartitions(t *testing.T) { + t.Parallel() + // Two messages on partition 1 with identical send_ts but + // different sequence_number, plus one on partition 0 with + // later send_ts. + records := []sqsMessageRecord{ + {MessageID: "m1-b", SendTimestampMillis: 1000, SequenceNumber: 2, Partition: u32ptr(1)}, + {MessageID: "m0", SendTimestampMillis: 2000, SequenceNumber: 1, Partition: u32ptr(0)}, + {MessageID: "m1-a", SendTimestampMillis: 1000, SequenceNumber: 1, Partition: u32ptr(1)}, + } + sortMessagesForPartitionedEmit(records) + wantOrder := []string{"m0", "m1-a", "m1-b"} + for i, r := range records { + if r.MessageID != wantOrder[i] { + t.Errorf("records[%d].MessageID = %q, want %q", i, r.MessageID, wantOrder[i]) + } + } +} + +func u32ptr(v uint32) *uint32 { return &v } + // TestSQSEffectivePartitionCount pins the local mirror of // adapter/sqs_keys_dispatch.go:effectivePartitionCount: nil meta, // PartitionCount<=1, and perQueue all collapse to 1; otherwise the diff --git a/internal/backup/encode_sqs_side.go b/internal/backup/encode_sqs_side.go index 157fd6062..1a3810861 100644 --- a/internal/backup/encode_sqs_side.go +++ b/internal/backup/encode_sqs_side.go @@ -103,18 +103,22 @@ func sqsPartitionedMsgByAgeKeyBytes(queueName string, partition uint32, gen uint // sqsPartitionedMsgDedupKeyBytes reproduces // adapter/sqs_keys.go:sqsPartitionedMsgDedupKey: prefix + -// base64url(queue) + '|' + BE-u32(partition) + BE-u64(gen) + -// base64url(groupID) + '|' + base64url(dedupID). Note the second -// '|' between the variable-length group and dedup segments — without -// it distinct (groupID, dedupID) pairs can FNV-collapse onto the -// same key (CodeRabbit major PR #732 round 6; design doc §line 19). -func sqsPartitionedMsgDedupKeyBytes(queueName string, partition uint32, gen uint64, groupID, dedupID string) []byte { +// base64url(queue) + '|' + BE-u32(partition) + BE-u64(sqsRestoreGeneration) + +// base64url(groupID) + '|' + base64url(dedupID). The live adapter signature +// accepts a variable gen, but every M5-3 call site uses +// sqsRestoreGeneration (matching the classic sqsMsgDedupKeyBytes +// hardcode), so the parameter is hardcoded here to satisfy unparam. +// Note the second '|' between the variable-length group and dedup +// segments — without it distinct (groupID, dedupID) pairs can +// FNV-collapse onto the same key (CodeRabbit major PR #732 round 6; +// design doc §line 19). +func sqsPartitionedMsgDedupKeyBytes(queueName string, partition uint32, groupID, dedupID string) []byte { out := make([]byte, 0, len(SQSPartitionedMsgDedupPrefix)+sqsSideKeyAllocBytes) out = append(out, SQSPartitionedMsgDedupPrefix...) out = append(out, encodeSQSSegment(queueName)...) out = append(out, sqsPartitionedQueueTerminator) out = binary.BigEndian.AppendUint32(out, partition) - out = binary.BigEndian.AppendUint64(out, gen) + out = binary.BigEndian.AppendUint64(out, sqsRestoreGeneration) out = append(out, encodeSQSSegment(groupID)...) out = append(out, sqsPartitionedQueueTerminator) return append(out, encodeSQSSegment(dedupID)...) @@ -222,7 +226,7 @@ func (e *SQSRecordEncoder) addSideRecords(b *snapshotBuilder, queueName string, // sqsPartitionedMsgDedupKeyBytes for the rationale (CodeRabbit // major PR #732 round 6: the second '|' prevents FNV-collapse // across distinct (groupID, dedupID) pairs on the same partition). - dedupKey = sqsPartitionedMsgDedupKeyBytes(queueName, *partition, sqsRestoreGeneration, rec.MessageGroupID, dedupID) + dedupKey = sqsPartitionedMsgDedupKeyBytes(queueName, *partition, rec.MessageGroupID, dedupID) } else { dedupKey = sqsMsgDedupKeyBytes(queueName, dedupID) } From 1a5540ea171738d4e9ddde4ab60994e31f56509d Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 4 Jun 2026 22:27:47 +0900 Subject: [PATCH 5/6] backup: M5-3 fix critical - decouple key-shape from Partition pointer (codex P1 #929 + gemini critical) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bug: a classic queue dump (PartitionCount<=1) with an explicit rec.Partition=&0 silently emitted partitioned-shape keys (!sqs|msg|data|p|0|...). validatePartitioningOne gate 4 lets *Partition==0 through, but addMessage/addSideRecords routed on 'partition != nil' and the live reader for a classic queue only scans the classic keyspace - the messages become invisible on first read (silent data loss). Root cause: the key-shape decision was tied to a per-MESSAGE property (rec.Partition != nil), but the keyspace selection is a per-QUEUE property (meta.PartitionCount > 1). They CAN disagree. Fix: introduce (isPartitioned bool, partition uint32) signatures for addMessage and addSideRecords, derived once from raw meta.PartitionCount > 1 in stageMessageRecords. Dispatch branches now key on isPartitioned, not pointer nullability. Caller audit (signature change, semantic-changing): - addMessage callers: stageMessageRecords only (single site, updated). - addSideRecords callers: stageMessageRecords only (single site, updated). - rec.Partition retains its *uint32 type and gate 4 invariant; the encoder simply ignores Partition when isPartitioned==false rather than reading the value off the message. Regression test: TestSQSEncodeClassicQueueWithExplicitPartitionZeroUsesClassicKeys sends a classic queue with explicit 'partition: 0' through the encoder and verifies no |p| keys leak. Would fail under the buggy dispatch. Addresses codex P1 #929 and 6 gemini critical findings (line 284 + 384 + 416 in encode_sqs.go; line 187 + 197 + 232 in encode_sqs_side.go) — all the same root cause. --- internal/backup/encode_sqs.go | 34 ++++++++--- .../backup/encode_sqs_partitioned_test.go | 56 +++++++++++++++++++ internal/backup/encode_sqs_side.go | 12 ++-- 3 files changed, 88 insertions(+), 14 deletions(-) diff --git a/internal/backup/encode_sqs.go b/internal/backup/encode_sqs.go index 9a044cd01..b37736bcd 100644 --- a/internal/backup/encode_sqs.go +++ b/internal/backup/encode_sqs.go @@ -266,14 +266,29 @@ func (e *SQSRecordEncoder) encodeQueueMessages(b *snapshotBuilder, root *os.Root // and stages each one's data + side records on b, returning the // maximum SequenceNumber observed (used to write the FIFO seq counter). // Split out of encodeQueueMessages so the parent stays under cyclop. +// +// isPartitioned is derived once from raw meta.PartitionCount > 1 — NOT +// from rec.Partition != nil. Codex P1 / gemini critical (PR #929): +// a classic queue dump (PartitionCount<=1) may carry an explicit +// rec.Partition = &0 that validatePartitioningOne lets through (gate 4 +// only rejects *Partition != 0). Routing on `partition != nil` would +// emit partitioned keys for that classic queue, and the live reader +// only scans the classic keyspace - the messages become invisible on +// first read. Keying the dispatch on isPartitioned (a property of the +// QUEUE, not the message) closes that gap. func (e *SQSRecordEncoder) stageMessageRecords(b *snapshotBuilder, meta *sqsQueueMetaPublic, records []sqsMessageRecord) (uint64, error) { var maxSeq uint64 + isPartitioned := meta.PartitionCount > 1 for i := range records { - seq, err := e.addMessage(b, meta.Name, records[i].Partition, records[i]) + var partition uint32 + if isPartitioned && records[i].Partition != nil { + partition = *records[i].Partition + } + seq, err := e.addMessage(b, meta.Name, isPartitioned, partition, records[i]) if err != nil { return 0, err } - if err := e.addSideRecords(b, meta.Name, records[i].Partition, meta, &records[i]); err != nil { + if err := e.addSideRecords(b, meta.Name, isPartitioned, partition, meta, &records[i]); err != nil { return 0, err } if seq > maxSeq { @@ -378,10 +393,13 @@ func sortMessagesForPartitionedEmit(msgs []sqsMessageRecord) { // addMessage stages one !sqs|msg|data| record and returns the message's // sequence number (for the seq-counter computation). Visibility state is // zeroed (every message restores visible), matching the decoder default. -// partition is non-nil only when the queue is partitioned -// (meta.PartitionCount > 1); validatePartitioning enforces this -// invariant before encodeQueueMessages invokes addMessage. -func (e *SQSRecordEncoder) addMessage(b *snapshotBuilder, queueName string, partition *uint32, rec sqsMessageRecord) (uint64, error) { +// +// isPartitioned is derived from raw meta.PartitionCount > 1 by the +// caller, NOT from rec.Partition != nil. The partition uint32 carries +// the value (0 if the queue is classic). See stageMessageRecords doc +// for why the key-shape decision MUST be made on the queue-level +// predicate (codex P1 / gemini critical PR #929). +func (e *SQSRecordEncoder) addMessage(b *snapshotBuilder, queueName string, isPartitioned bool, partition uint32, rec sqsMessageRecord) (uint64, error) { if rec.MessageID == "" { return 0, errors.Wrap(ErrSQSEncodeInvalidMessage, "message missing message_id") } @@ -409,8 +427,8 @@ func (e *SQSRecordEncoder) addMessage(b *snapshotBuilder, queueName string, part return 0, err } var key []byte - if partition != nil { - key = sqsPartitionedMsgDataKeyBytes(queueName, *partition, sqsRestoreGeneration, rec.MessageID) + if isPartitioned { + key = sqsPartitionedMsgDataKeyBytes(queueName, partition, sqsRestoreGeneration, rec.MessageID) } else { key = sqsMsgDataKeyBytes(queueName, sqsRestoreGeneration, rec.MessageID) } diff --git a/internal/backup/encode_sqs_partitioned_test.go b/internal/backup/encode_sqs_partitioned_test.go index 73736bac8..e2931f62e 100644 --- a/internal/backup/encode_sqs_partitioned_test.go +++ b/internal/backup/encode_sqs_partitioned_test.go @@ -270,6 +270,62 @@ func TestSQSEncodeGateUsesRawPartitionCount(t *testing.T) { } } +// TestSQSEncodeClassicQueueWithExplicitPartitionZeroUsesClassicKeys is +// the regression for codex P1 / gemini critical (PR #929). A classic +// queue dump with an explicit `"partition": 0` is allowed past gate 4 +// (which only rejects *Partition != 0) — but the dispatch MUST still +// pick classic-shape keys because the live reader for a classic queue +// only scans the classic keyspace. +// +// Before the (isPartitioned bool, partition uint32) signature, the +// `partition != nil` branch incorrectly chose partitioned keys here, +// making restored messages invisible. This test would fail (silently +// emit |p|0|... keys) under that buggy dispatch. +func TestSQSEncodeClassicQueueWithExplicitPartitionZeroUsesClassicKeys(t *testing.T) { + t.Parallel() + in := t.TempDir() + writeSQSQueue(t, in, "classic", + []byte(`{"format_version":1,"name":"classic",`+ + `"visibility_timeout_seconds":30,"message_retention_seconds":345600,"delay_seconds":0}`), + [][]byte{ + // PartitionCount<=1 (classic), but message has explicit + // "partition": 0. Gate 4 must NOT fire (*Partition == 0), + // and addMessage must use classic keys. + []byte(`{"message_id":"m1","body":"hello",` + + `"send_timestamp_millis":1000,"available_at_millis":1000,"partition":0}`), + }) + fsm := encodeSQSTree(t, in) + entries, _, err := DecodeLiveEntries(bytes.NewReader(fsm)) + if err != nil { + t.Fatalf("DecodeLiveEntries: %v", err) + } + // No partitioned-shape data/vis/byage/dedup keys allowed. + for _, e := range entries { + for _, p := range []string{ + SQSPartitionedMsgDataPrefix, + SQSPartitionedMsgVisPrefix, + SQSPartitionedMsgByAgePrefix, + SQSPartitionedMsgDedupPrefix, + } { + if bytes.HasPrefix(e.UserKey, []byte(p)) { + t.Errorf("partitioned-shape key on classic queue with explicit partition=0: %q (prefix %q)", e.UserKey, p) + } + } + } + // At least one classic data key must exist. + found := false + for _, e := range entries { + if bytes.HasPrefix(e.UserKey, []byte(SQSMsgDataPrefix)) && + !bytes.HasPrefix(e.UserKey, []byte(SQSPartitionedMsgDataPrefix)) { + found = true + break + } + } + if !found { + t.Fatalf("no classic data record found") + } +} + // TestSQSEncodeLegacyDumpsWithoutPartitionStillRoundTrip pins that a // pre-M5-3 messages.jsonl (no "partition" field on any line) still // round-trips through the M5-3 encoder unchanged when the queue is diff --git a/internal/backup/encode_sqs_side.go b/internal/backup/encode_sqs_side.go index 1a3810861..a48ed0847 100644 --- a/internal/backup/encode_sqs_side.go +++ b/internal/backup/encode_sqs_side.go @@ -184,13 +184,13 @@ func resolveDedupID(rec *sqsMessageRecord, meta *sqsQueueMetaPublic) string { // - dedup: FIFO + resolveDedupID(rec, meta) non-empty. ExpiresAtMillis = // SendTimestampMs + sqsFifoDedupWindowMillis. CBD queues get a SHA-256 // derived dedup-id (matches adapter/sqs_fifo.go resolveFifoDedupID). -func (e *SQSRecordEncoder) addSideRecords(b *snapshotBuilder, queueName string, partition *uint32, meta *sqsQueueMetaPublic, rec *sqsMessageRecord) error { +func (e *SQSRecordEncoder) addSideRecords(b *snapshotBuilder, queueName string, isPartitioned bool, partition uint32, meta *sqsQueueMetaPublic, rec *sqsMessageRecord) error { msgIDBytes := []byte(rec.MessageID) var visKey, byAgeKey []byte - if partition != nil { - visKey = sqsPartitionedMsgVisKeyBytes(queueName, *partition, sqsRestoreGeneration, rec.AvailableAtMillis, rec.MessageID) - byAgeKey = sqsPartitionedMsgByAgeKeyBytes(queueName, *partition, sqsRestoreGeneration, rec.SendTimestampMillis, rec.MessageID) + if isPartitioned { + visKey = sqsPartitionedMsgVisKeyBytes(queueName, partition, sqsRestoreGeneration, rec.AvailableAtMillis, rec.MessageID) + byAgeKey = sqsPartitionedMsgByAgeKeyBytes(queueName, partition, sqsRestoreGeneration, rec.SendTimestampMillis, rec.MessageID) } else { visKey = sqsMsgVisKeyBytes(queueName, sqsRestoreGeneration, rec.AvailableAtMillis, rec.MessageID) byAgeKey = sqsMsgByAgeKeyBytes(queueName, sqsRestoreGeneration, rec.SendTimestampMillis, rec.MessageID) @@ -220,13 +220,13 @@ func (e *SQSRecordEncoder) addSideRecords(b *snapshotBuilder, queueName string, return err } var dedupKey []byte - if partition != nil { + if isPartitioned { // Partitioned dedup includes + '|' + // per the partitioned dedup constructor — see // sqsPartitionedMsgDedupKeyBytes for the rationale (CodeRabbit // major PR #732 round 6: the second '|' prevents FNV-collapse // across distinct (groupID, dedupID) pairs on the same partition). - dedupKey = sqsPartitionedMsgDedupKeyBytes(queueName, *partition, rec.MessageGroupID, dedupID) + dedupKey = sqsPartitionedMsgDedupKeyBytes(queueName, partition, rec.MessageGroupID, dedupID) } else { dedupKey = sqsMsgDedupKeyBytes(queueName, dedupID) } From 0460e285eb68a2560faed39a58011224bb562fd7 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Thu, 4 Jun 2026 22:39:38 +0900 Subject: [PATCH 6/6] backup: M5-3 add gate 5 - perMessageGroupId partition-hash consistency (codex P2 #929) For perMessageGroupId HT-FIFO queues the live partitionFor maps each MessageGroupId to one partition via FNV-1a-mod-PartitionCount (adapter/sqs_partitioning.go:64). Receivers and group-lock readers use partitionFor's mapping to locate messages and lock state. An in-range but inconsistent partition in a dump would restore under the wrong lane, splitting a FIFO group across two partition-scoped group-lock keyspaces and allowing concurrent out-of-order delivery on first read. Gate 5 mirrors partitionFor inside the encoder and rejects any perMessageGroupId message whose dump-recorded partition disagrees with the recomputed hash. New: partitionForGroup(meta, messageGroupID) - FNV-1a-32 mirror of adapter/sqs_partitioning.go's inlined hash, masked on PartitionCount-1 (power-of-two validator-enforced live). Returns 0 for classic queues, perQueue, or empty MessageGroupID. New sentinel: ErrSQSEncodePartitionHashMismatch. Caller audit (validation gate addition - widens fail-closed surface): - validatePartitioningPartitioned: the single caller chain is validatePartitioning -> validatePartitioningOne -> ...Partitioned. Only invoked from stageMessageRecords. No production callers changed. - partitionForGroup: new helper, callers are validation + tests only. Test fixture realignment: TestSQSEncodePartitionedSideRecordsByteCrossCheck used group_id='g0' (hashes to partition 0) with partition=1, which would now correctly fail gate 5. Changed to group_id='g1' (hashes to 1) to keep the partition=1 byte-shape coverage and remain consistent with the live hash. New regression tests: - TestSQSEncodeRejectsHashMismatchOnPerMessageGroupId - g0+partition=1 fails closed with ErrSQSEncodePartitionHashMismatch. - TestSQSEncodePartitionForGroup_LiveAdapterParity - 5-case table verifying the mirror matches the live FNV-1a-mod-N spec for classic / perQueue / empty group / hash-mod-2 cases. --- internal/backup/encode_sqs.go | 63 +++++++++++++++-- .../backup/encode_sqs_partitioned_test.go | 67 ++++++++++++++++++- 2 files changed, 124 insertions(+), 6 deletions(-) diff --git a/internal/backup/encode_sqs.go b/internal/backup/encode_sqs.go index b37736bcd..a3968475b 100644 --- a/internal/backup/encode_sqs.go +++ b/internal/backup/encode_sqs.go @@ -79,6 +79,15 @@ var ( // the live receive fan-out never visits — silent data loss on // first read. Codex P2 v914 v4 caught this gap. ErrSQSEncodePartitionRoutingMismatch = errors.New("backup: sqs perQueue HT-FIFO queue must keep all messages on partition 0") + // ErrSQSEncodePartitionHashMismatch fires for a perMessageGroupId + // HT-FIFO message whose Partition disagrees with + // partitionFor(MessageGroupID). The live send/receive/group-lock + // path uses partitionFor's FNV-1a-mod-PartitionCount mapping + // (adapter/sqs_partitioning.go:64); restoring under the wrong + // lane would split a FIFO group across two partition-scoped + // group-lock keyspaces and break FIFO order on first read. Codex + // P2 #929. + ErrSQSEncodePartitionHashMismatch = errors.New("backup: sqs perMessageGroupId HT-FIFO message partition disagrees with partitionFor(message_group_id)") ) // sqsStoredQueueMeta mirrors the live adapter's sqsQueueMeta JSON shape @@ -351,14 +360,60 @@ func validatePartitioningPartitioned(meta *sqsQueueMetaPublic, rec *sqsMessageRe // partitionFor collapses every group to partition 0 in perQueue // mode; accepting any other partition would write to a lane the // live receive never scans. - if meta.FifoThroughputLimit == sqsFifoThroughputPerQueue && *rec.Partition != 0 { - return errors.Wrapf(ErrSQSEncodePartitionRoutingMismatch, - "queue %q (partition_count=%d, fifo_throughput_limit=%q): message %q partition=%d, want 0", - meta.Name, meta.PartitionCount, meta.FifoThroughputLimit, rec.MessageID, *rec.Partition) + if meta.FifoThroughputLimit == sqsFifoThroughputPerQueue { + if *rec.Partition != 0 { + return errors.Wrapf(ErrSQSEncodePartitionRoutingMismatch, + "queue %q (partition_count=%d, fifo_throughput_limit=%q): message %q partition=%d, want 0", + meta.Name, meta.PartitionCount, meta.FifoThroughputLimit, rec.MessageID, *rec.Partition) + } + return nil + } + // Gate 5: perMessageGroupId HT-FIFO partition-hash consistency. + // For perMessageGroupId queues, the live partitionFor maps each + // group_id to one partition via FNV-1a hash; the receivers / + // group-lock reader use that partition value to find messages. + // A dump line with an in-range but inconsistent partition would + // be restored under the wrong lane and split a FIFO group across + // two partition-scoped group-lock keyspaces, allowing concurrent + // out-of-order delivery. Mirror the hash here. Codex P2 #929. + want := partitionForGroup(meta, rec.MessageGroupID) + if *rec.Partition != want { + return errors.Wrapf(ErrSQSEncodePartitionHashMismatch, + "queue %q (partition_count=%d, group_id=%q): message %q partition=%d, partitionFor=%d", + meta.Name, meta.PartitionCount, rec.MessageGroupID, rec.MessageID, *rec.Partition, want) } return nil } +// partitionForGroup mirrors adapter/sqs_partitioning.go:partitionFor +// for use by the encoder's validation gate 5. Same FNV-1a 32-bit +// inlined hash, same masking on (PartitionCount - 1). MUST be a copy +// (M3b-3 circular-dep ban: internal/backup cannot import adapter). +// Returns 0 when partitionFor would: classic queues, perQueue mode, +// or empty MessageGroupID. +func partitionForGroup(meta *sqsQueueMetaPublic, messageGroupID string) uint32 { + if meta == nil || meta.PartitionCount <= 1 { + return 0 + } + if meta.FifoThroughputLimit == sqsFifoThroughputPerQueue { + return 0 + } + if messageGroupID == "" { + return 0 + } + const ( + fnv32Offset uint32 = 2166136261 + fnv32Prime uint32 = 16777619 + ) + hash := fnv32Offset + for i := 0; i < len(messageGroupID); i++ { + hash ^= uint32(messageGroupID[i]) + hash *= fnv32Prime + } + // PartitionCount is power-of-two (live validator-enforced). + return hash & (meta.PartitionCount - 1) +} + // sortMessagesForPartitionedEmit sorts messages by // (partition, send_timestamp_millis, sequence_number, message_id). // Partition is the leading key so per-partition state (maxSeq, future diff --git a/internal/backup/encode_sqs_partitioned_test.go b/internal/backup/encode_sqs_partitioned_test.go index e2931f62e..dfaf3ec63 100644 --- a/internal/backup/encode_sqs_partitioned_test.go +++ b/internal/backup/encode_sqs_partitioned_test.go @@ -270,6 +270,66 @@ func TestSQSEncodeGateUsesRawPartitionCount(t *testing.T) { } } +// TestSQSEncodeRejectsHashMismatchOnPerMessageGroupId pins gate 5 +// (codex P2 #929). For a perMessageGroupId HT-FIFO queue, the +// partition value MUST match partitionFor(MessageGroupID). An +// in-range but wrong partition would split a FIFO group across two +// partition-scoped group-lock keyspaces and break FIFO order on +// first read. +// +// "g0" hashes (FNV-1a-mod-2) to partition 0; pairing it with +// partition=1 violates the invariant. +func TestSQSEncodeRejectsHashMismatchOnPerMessageGroupId(t *testing.T) { + t.Parallel() + in := t.TempDir() + writeSQSQueue(t, in, "p.fifo", + []byte(`{"format_version":1,"name":"p.fifo","fifo":true,`+ + `"visibility_timeout_seconds":30,"message_retention_seconds":345600,`+ + `"delay_seconds":0,"partition_count":2,"fifo_throughput_limit":"perMessageGroupId"}`), + [][]byte{ + // group_id="g0" hashes to partition 0; partition=1 is + // in-range but inconsistent. + []byte(`{"message_id":"m1","body":"hello",` + + `"send_timestamp_millis":1000,"available_at_millis":1000,` + + `"message_group_id":"g0","partition":1}`), + }) + b := newSnapshotBuilder(sqsEncTS) + err := NewSQSRecordEncoder(in).Encode(b) + if !errors.Is(err, ErrSQSEncodePartitionHashMismatch) { + t.Fatalf("Encode err = %v, want ErrSQSEncodePartitionHashMismatch", err) + } +} + +// TestSQSEncodePartitionForGroup_LiveAdapterParity verifies the +// internal partitionForGroup mirror matches adapter/sqs_partitioning.go +// for representative inputs. We cannot import adapter (M3b-3 ban), +// so this is a parity-by-spec check against the FNV-1a constants. +func TestSQSEncodePartitionForGroup_LiveAdapterParity(t *testing.T) { + t.Parallel() + cases := []struct { + meta *sqsQueueMetaPublic + group string + want uint32 + }{ + // Classic queue: always 0. + {&sqsQueueMetaPublic{PartitionCount: 1}, "anything", 0}, + // perQueue: always 0 regardless of PartitionCount. + {&sqsQueueMetaPublic{PartitionCount: 4, FifoThroughputLimit: sqsFifoThroughputPerQueue}, "g0", 0}, + // perMessageGroupId, empty group: 0. + {&sqsQueueMetaPublic{PartitionCount: 4, FifoThroughputLimit: "perMessageGroupId"}, "", 0}, + // perMessageGroupId, FNV-1a-mod-2 of "g0" = 0, "g1" = 1. + {&sqsQueueMetaPublic{PartitionCount: 2, FifoThroughputLimit: "perMessageGroupId"}, "g0", 0}, + {&sqsQueueMetaPublic{PartitionCount: 2, FifoThroughputLimit: "perMessageGroupId"}, "g1", 1}, + } + for _, c := range cases { + got := partitionForGroup(c.meta, c.group) + if got != c.want { + t.Errorf("partitionForGroup(meta{PC=%d,limit=%q}, %q) = %d, want %d", + c.meta.PartitionCount, c.meta.FifoThroughputLimit, c.group, got, c.want) + } + } +} + // TestSQSEncodeClassicQueueWithExplicitPartitionZeroUsesClassicKeys is // the regression for codex P1 / gemini critical (PR #929). A classic // queue dump with an explicit `"partition": 0` is allowed past gate 4 @@ -474,9 +534,12 @@ func TestSQSEncodePartitionedDedupBuildsGroupSegment(t *testing.T) { func TestSQSEncodePartitionedSideRecordsByteCrossCheck(t *testing.T) { t.Parallel() in := t.TempDir() + // groupID "g1" hashes (FNV-1a-mod-2) to partition 1; pair it with + // partition=1 so gate 5 (partition-hash consistency for + // perMessageGroupId) is satisfied. const ( queue = "p.fifo" - groupID = "g0" + groupID = "g1" dedupID = "d0" msgID = "mid-001" sendMs = int64(2000) @@ -489,7 +552,7 @@ func TestSQSEncodePartitionedSideRecordsByteCrossCheck(t *testing.T) { `"delay_seconds":0,"partition_count":2,"fifo_throughput_limit":"perMessageGroupId"}`), [][]byte{ []byte(`{"message_id":"mid-001","body":"x","send_timestamp_millis":2000,` + - `"available_at_millis":2000,"message_group_id":"g0",` + + `"available_at_millis":2000,"message_group_id":"g1",` + `"message_deduplication_id":"d0","sequence_number":1,"partition":1}`), }) entries, _, err := DecodeLiveEntries(bytes.NewReader(encodeSQSTree(t, in)))