backup: Phase 0b M5-3 impl - SQS partitioned-FIFO reverse encoder #929
bootjp wants to merge 6 commits into
Conversation
Per design doc PR #914 §'Decoder lift'. Widens the two partitioned-key parsers to surface the partition u32 they already extract internally, and adds the sqsMessageRecord.Partition *uint32 field so the parsed value reaches the dump JSON.

Changes:
- parseSQSPartitionedQueueAndTrailer: (string, error) -> (string, uint32, error). The partition was already read off the trailer (binary.BigEndian.Uint32); return it instead of discarding.
- parseSQSMessageDataKey: (string, error) -> (string, uint32, bool, error). The third return signals whether the key was the partitioned shape (so HandleMessageData knows when to attach Partition to the record). Classic-key path returns (enc, 0, false, nil) - byte-identical with the pre-M5-3 contract.
- parseSQSGenericKey: signature unchanged externally. The new partition return from parseSQSPartitionedQueueAndTrailer is intentionally discarded - side-record dispatch routes by queue name only (HandleSideRecord wraps it). Comment pins the contract.
- HandleMessageData: wires rec.Partition = &partition when isPartitioned == true. Classic call site leaves rec.Partition == nil, preserving legacy-dump round-trip.
- sqsMessageRecord.Partition *uint32 with omitempty. Pointer (not uint32) so partition-0 messages are distinguishable from absent (codex P1 v914 v2; design doc decision matrix).

Caller audit (semantic change to two parse fns):
- parseSQSMessageDataKey production callers: HandleMessageData only. Updated to 4-tuple in this commit. 3 test sites also updated.
- parseSQSPartitionedQueueAndTrailer production callers: parseSQSMessageDataKey + parseSQSGenericKey only. Both updated.
- parseSQSGenericKey signature unchanged - no caller audit needed for that symbol.

go test ./internal/backup/ passes.
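The pointer-plus-omitempty choice described above can be illustrated with a minimal sketch. The `messageRecord` type and the `message_id` JSON tag are hypothetical stand-ins for illustration only; just the `partition` key name is taken from the review text in this thread.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// messageRecord is a hypothetical, trimmed stand-in for sqsMessageRecord.
// Only the Partition field mirrors what the commit message describes.
type messageRecord struct {
	MessageID string  `json:"message_id"` // tag name is an assumption
	Partition *uint32 `json:"partition,omitempty"`
}

// encodeRecord marshals a record to its JSON text.
func encodeRecord(rec messageRecord) string {
	out, err := json.Marshal(rec)
	if err != nil {
		panic(err)
	}
	return string(out)
}

func main() {
	zero := uint32(0)
	// A nil pointer is omitted entirely, so legacy dumps keep their shape.
	fmt.Println(encodeRecord(messageRecord{MessageID: "m1"}))
	// A pointer to 0 is still emitted, so partition 0 stays distinguishable from absent.
	fmt.Println(encodeRecord(messageRecord{MessageID: "m2", Partition: &zero}))
}
```

With a plain `uint32` and `omitempty`, the second record would lose its `partition` key, which is the ambiguity the pointer avoids.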
Per design doc PR #914 §'Encoder lift'. Adds the four partitioned-key constructors (data, vis, byage, dedup) plus the effectivePartitionCount helper into internal/backup/, mirroring adapter/sqs_keys.go and adapter/sqs_keys_dispatch.go. internal/backup/ cannot import adapter (M3b-3 circular-dep pattern; see existing sqsFifoDedupWindowMillis mirror at encode_sqs_side.go:15). The constructors and constants are duplicated locally instead.

New symbols (encode_sqs.go):
- const sqsPartitionedQueueTerminator byte = '|' (mirror of adapter:82)
- const sqsFifoThroughputPerQueue = "perQueue" (mirror of unexported htfifoThroughputPerQueue at adapter/sqs_partitioning.go:37)
- const SQSPartitionedMsg{Data,Vis,ByAge,Dedup}Prefix (mirror of adapter Sqs* prefixes at adapter/sqs_keys.go:91..95)
- sqsPartitionedMsgDataKeyBytes
- effectivePartitionCount(meta *sqsQueueMetaPublic) uint32 - mirror of adapter helper, operating on the dump-side struct. Doc comment explicitly pins that this is NOT used by encoder validation gates (codex P2 v914 v7) - those use raw meta.PartitionCount > 1.

New symbols (encode_sqs_side.go):
- sqsPartitionedMsgVisKeyBytes
- sqsPartitionedMsgByAgeKeyBytes
- sqsPartitionedMsgDedupKeyBytes - includes the second '|' between the variable-length group and dedup segments (CodeRabbit major PR #732 round 6; design doc §line 19) to prevent FNV-collision false-dups.

Tests (new file encode_sqs_partitioned_test.go):
- TestSQSEncodePartitionedMsg{Data,Vis,ByAge,Dedup}KeyByteShape - byte-equality assertions vs manually-assembled expected bytes (since direct adapter cross-check is impossible).
- TestSQSEncodePartitionedMsgDedupKeyByteShape also pins the byte at the group+dedup terminator offset so a future change that drops the '|' regressing the FNV-collision class fails this test.
- TestSQSEffectivePartitionCount - 6-case table covering nil meta, PartitionCount<=1, perQueue collapse, perGroup, and unset throughput.

Pure additions: no behavior change yet. Slice D will wire these into addMessage / addSideRecords with the validation gates and the new sentinels.
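Why a terminator between two variable-length segments matters can be shown with a deliberately simplified sketch. It does not reproduce the real key layout or segment encoding; `joinNoSep` and `joinWithSep` are hypothetical helpers, and the sketch assumes the segments themselves can never contain the `'|'` byte.

```go
package main

import "fmt"

// joinNoSep concatenates two variable-length segments with nothing between
// them, so the boundary between group and dedup is not recoverable.
func joinNoSep(group, dedup string) string {
	return group + dedup
}

// joinWithSep places a terminator byte between the segments. This is only
// unambiguous if neither segment can contain '|' (an assumption here).
func joinWithSep(group, dedup string) string {
	return group + "|" + dedup
}

func main() {
	// Two distinct (group, dedup) pairs collapse to the same bytes without a separator.
	fmt.Println(joinNoSep("ab", "c") == joinNoSep("a", "bc")) // true
	// With the separator the two pairs stay distinct.
	fmt.Println(joinWithSep("ab", "c") == joinWithSep("a", "bc")) // false
}
```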
…gh emit

Per design doc PR #914 §'Encoder lift' and §'Validation invariants'. Drops ErrSQSEncodeUnsupportedPartitioned and replaces it with the four fail-closed validation gates from the design's decision matrix, plus partition-aware dispatch in addMessage / addSideRecords.

Sentinel changes:
- Remove ErrSQSEncodeUnsupportedPartitioned (no longer reachable).
- Add ErrSQSEncodeMissingPartition (partitioned + Partition==nil).
- Add ErrSQSEncodeOutOfRangePartition (*Partition >= PartitionCount).
- Add ErrSQSEncodePartitionRoutingMismatch (perQueue + nonzero partition).

Validation: validatePartitioning(meta, records) runs the four gates before any record is staged. All four use raw meta.PartitionCount > 1 as the partitioned-queue predicate, never effectivePartitionCount (codex P2 v914 v7 demonstrated that effective-count gates would let a perQueue dump with Partition==nil emit classic keys against a partitioned-keyspace queue - silent data loss). Split into validatePartitioningOne + validatePartitioningPartitioned to keep cyclomatic complexity under the 10-cyclop limit.

Sort: sortMessagesForPartitionedEmit sorts by (partition, send_ts, sequence_number, message_id). Classic sortMessagesForEmit at sqs.go:842 is unchanged (used by the decoder's writeMessagesJSONL).

addMessage signature: (b, queueName, partition *uint32, rec) - new partition param selects sqsPartitionedMsgDataKeyBytes when non-nil, classic sqsMsgDataKeyBytes when nil.

addSideRecords signature: (b, queueName, partition *uint32, meta, rec) - new partition param selects the partitioned (vis, byage, dedup) constructors when non-nil; dedup additionally takes rec.MessageGroupID for the partitioned <group-seg>+|+<dedup-seg> shape.

Extracted stageMessageRecords from encodeQueueMessages to keep that function under cyclop as well.

Caller audit (semantic changes - confirmed single-site each):
- addMessage: encodeQueueMessages only (now via stageMessageRecords).
- addSideRecords: encodeQueueMessages only (now via stageMessageRecords).
- validatePartitioning: encodeQueueMessages only.
- sortMessagesForPartitionedEmit: encodeQueueMessages only.
- ErrSQSEncodeUnsupportedPartitioned production site at the old encode_sqs.go:163 deleted. The one test (TestSQSEncodeRejectsPartitioned) is removed in this commit; the four new gate tests land in Slice E.

go test + golangci-lint on ./internal/backup/ both pass. Classic-queue tests remain green: Partition==nil + PartitionCount<=1 takes the legacy paths unchanged in both addMessage and addSideRecords.
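The four gates described above could look roughly like the following sketch. The `queueMeta` type, the `validateOne` name, and all error variables are hypothetical stand-ins; in particular, the commit message does not say which sentinel gate 4 returns, so `errClassicNonzero` is purely an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the sentinels named in the commit message.
var (
	errMissingPartition = errors.New("missing partition")          // gate 1
	errOutOfRange       = errors.New("partition out of range")     // gate 2
	errRoutingMismatch  = errors.New("partition routing mismatch") // gate 3
	// Hypothetical: the source does not name the sentinel used by gate 4.
	errClassicNonzero = errors.New("nonzero partition on classic queue")
)

// queueMeta is a trimmed, hypothetical stand-in for sqsQueueMetaPublic.
type queueMeta struct {
	PartitionCount      uint32
	FifoThroughputLimit string
}

// validateOne applies the gates to a single record's partition. The
// partitioned-queue predicate is the raw PartitionCount > 1, never an
// "effective" count that collapses perQueue queues to 1.
func validateOne(meta queueMeta, partition *uint32) error {
	if meta.PartitionCount <= 1 {
		if partition != nil && *partition != 0 {
			return errClassicNonzero // gate 4
		}
		return nil
	}
	if partition == nil {
		return errMissingPartition // gate 1
	}
	if *partition >= meta.PartitionCount {
		return errOutOfRange // gate 2
	}
	if meta.FifoThroughputLimit == "perQueue" && *partition != 0 {
		return errRoutingMismatch // gate 3
	}
	return nil
}

func main() {
	// perQueue + PartitionCount=2 + nil partition: a raw-count gate rejects it,
	// whereas an effective-count gate would have treated the queue as classic.
	meta := queueMeta{PartitionCount: 2, FifoThroughputLimit: "perQueue"}
	fmt.Println(validateOne(meta, nil))
}
```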
Per design doc PR #914 §'Test plan'. Adds the design's eight test rows plus two regression tests pinning the codex P2 v914 v7 fix and the 4-field sort stability.

Tests added (encode_sqs_partitioned_test.go):
- TestSQSEncodeRejectsMissingPartitionOnPartitionedQueue - gate 1 (codex P1 v914 v2).
- TestSQSEncodeRejectsOutOfRangePartition - gate 2.
- TestSQSEncodeRejectsNonzeroPartitionOnPerQueueHTFIFO - gate 3 (codex P2 v914 v4).
- TestSQSEncodeRejectsNonZeroPartitionOnClassicQueue - gate 4.
- TestSQSEncodeGateUsesRawPartitionCount - codex P2 v914 v7 regression. Pins that gates use raw meta.PartitionCount > 1 rather than effectivePartitionCount, which would otherwise let a perQueue + PartitionCount=2 + Partition==nil dump slip past validation and silently emit classic keys against a partitioned-keyspace queue.
- TestSQSEncodeLegacyDumpsWithoutPartitionStillRoundTrip - pre-M5-3 classic dumps still round-trip with Partition==nil throughout.
- TestSQSEncodePartitionedQueueRoundTrip - 3-message partitioned queue emits data/vis/byage/dedup with |p| prefix; no classic-shape data/vis/byage/dedup keys leak.
- TestSQSEncodePartitionedDedupBuildsGroupSegment - partitioned dedup key shape matches sqsPartitionedMsgDedupKeyBytes byte-for-byte.
- TestSQSEncodePartitionedSideRecordsByteCrossCheck - vis + byage + dedup all byte-equal the constructors (M5-2-style in-package cross-check; adapter constructors are unexported).
- TestSQSEncodePartitionedSortStableAcrossPartitions - 4-field sort groups by partition first, then send_ts, then sequence_number.

Constructor adjustment (encode_sqs_side.go):
- sqsPartitionedMsgDedupKeyBytes: drop the gen parameter (always sqsRestoreGeneration on the reverse-encode path). Mirrors the classic sqsMsgDedupKeyBytes hardcode pattern and satisfies the unparam lint. Comment updated to explain the divergence from the adapter-side variable-gen signature.

go test + golangci-lint clean on ./internal/backup/.
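The 4-field ordering pinned by the sort test can be sketched as a plain comparator chain. The `msg` type and its field names are hypothetical; only the key order (partition, send_ts, sequence_number, message_id) comes from the commit messages.

```go
package main

import (
	"fmt"
	"sort"
)

// msg is a hypothetical, trimmed record carrying only the four sort keys.
type msg struct {
	Partition uint32
	SendTS    int64
	Seq       uint64
	ID        string
}

// sortForPartitionedEmit orders records by partition first, then send
// timestamp, then sequence number, with the message ID as final tiebreak.
func sortForPartitionedEmit(msgs []msg) {
	sort.SliceStable(msgs, func(i, j int) bool {
		a, b := msgs[i], msgs[j]
		if a.Partition != b.Partition {
			return a.Partition < b.Partition
		}
		if a.SendTS != b.SendTS {
			return a.SendTS < b.SendTS
		}
		if a.Seq != b.Seq {
			return a.Seq < b.Seq
		}
		return a.ID < b.ID
	})
}

func main() {
	msgs := []msg{
		{Partition: 1, SendTS: 10, Seq: 1, ID: "c"},
		{Partition: 0, SendTS: 20, Seq: 2, ID: "b"},
		{Partition: 0, SendTS: 20, Seq: 1, ID: "a"},
	}
	sortForPartitionedEmit(msgs)
	for _, m := range msgs {
		fmt.Println(m.Partition, m.SendTS, m.Seq, m.ID)
	}
}
```

Because the message ID breaks every remaining tie, the output order is deterministic for any input order as long as IDs are unique.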
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 7603bf7877
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| func (e *SQSRecordEncoder) stageMessageRecords(b *snapshotBuilder, meta *sqsQueueMetaPublic, records []sqsMessageRecord) (uint64, error) { | ||
| var maxSeq uint64 | ||
| for i := range records { | ||
| seq, err := e.addMessage(b, meta.Name, records[i].Partition, records[i]) |
Avoid emitting partitioned keys for classic partition 0
When a classic queue dump (partition_count <= 1) includes an explicit "partition": 0, validatePartitioningOne allows it, but this unconditional records[i].Partition argument makes addMessage/addSideRecords choose the partitioned |p| key constructors. The restored queue metadata still selects the classic keyspace, so those messages and indexes are written where the live readers for that queue will not scan them; either reject any non-nil partition on classic queues or pass nil unless meta.PartitionCount > 1.
Code Review
This pull request implements support for partitioned SQS queues in the backup encoder and decoder, introducing validation gates for partitioning invariants, deterministic message sorting, and partitioned key-generation helpers. The code review highlights a critical correctness bug where relying solely on the nullability of the partition pointer to determine the key shape can cause classic queues with an explicit partition of 0 to write partitioned keys, leading to silent data loss on restore. The reviewer suggests resolving this by explicitly passing an isPartitioned boolean and the raw partition value to addMessage and addSideRecords.
| func (e *SQSRecordEncoder) stageMessageRecords(b *snapshotBuilder, meta *sqsQueueMetaPublic, records []sqsMessageRecord) (uint64, error) { | ||
| var maxSeq uint64 | ||
| for i := range records { | ||
| seq, err := e.addMessage(b, meta.Name, records[i].Partition, records[i]) | ||
| if err != nil { | ||
| return 0, err | ||
| } | ||
| if err := e.addSideRecords(b, meta.Name, records[i].Partition, meta, &records[i]); err != nil { | ||
| return 0, err | ||
| } | ||
| if seq > maxSeq { | ||
| maxSeq = seq | ||
| } | ||
| } | ||
| return maxSeq, nil | ||
| } |
Using partition != nil as the sole indicator to determine whether to write partitioned keys introduces a critical correctness bug. If a classic queue (PartitionCount <= 1) message has an explicit Partition field set to 0 (which is allowed by validatePartitioningOne), records[i].Partition will be non-nil. This causes addMessage and addSideRecords to write partitioned keys (!sqs|msg|data|p|...) instead of classic keys (!sqs|msg|data|...). Since the live reader for a classic queue only scans the classic keyspace, these restored messages will be completely invisible, leading to silent data loss on restore.
To fix this, decouple the key shape decision from the pointer nullability by passing isPartitioned bool (derived from meta.PartitionCount > 1) and the raw partition uint32 value to addMessage and addSideRecords.
func (e *SQSRecordEncoder) stageMessageRecords(b *snapshotBuilder, meta *sqsQueueMetaPublic, records []sqsMessageRecord) (uint64, error) {
var maxSeq uint64
isPartitioned := meta.PartitionCount > 1
for i := range records {
var partition uint32
if isPartitioned && records[i].Partition != nil {
partition = *records[i].Partition
}
seq, err := e.addMessage(b, meta.Name, isPartitioned, partition, records[i])
if err != nil {
return 0, err
}
if err := e.addSideRecords(b, meta.Name, isPartitioned, partition, meta, &records[i]); err != nil {
return 0, err
}
if seq > maxSeq {
maxSeq = seq
}
}
return maxSeq, nil
}
| // partition is non-nil only when the queue is partitioned | ||
| // (meta.PartitionCount > 1); validatePartitioning enforces this | ||
| // invariant before encodeQueueMessages invokes addMessage. | ||
| func (e *SQSRecordEncoder) addMessage(b *snapshotBuilder, queueName string, partition *uint32, rec sqsMessageRecord) (uint64, error) { |
Update the addMessage signature to accept isPartitioned bool and partition uint32 instead of partition *uint32 to prevent the classic-queue partitioned key generation bug.
| func (e *SQSRecordEncoder) addMessage(b *snapshotBuilder, queueName string, partition *uint32, rec sqsMessageRecord) (uint64, error) { | |
| func (e *SQSRecordEncoder) addMessage(b *snapshotBuilder, queueName string, isPartitioned bool, partition uint32, rec sqsMessageRecord) (uint64, error) { |
| var key []byte | ||
| if partition != nil { | ||
| key = sqsPartitionedMsgDataKeyBytes(queueName, *partition, sqsRestoreGeneration, rec.MessageID) | ||
| } else { | ||
| key = sqsMsgDataKeyBytes(queueName, sqsRestoreGeneration, rec.MessageID) | ||
| } |
Dispatch the key shape based on the explicit isPartitioned boolean rather than the pointer nullability.
| var key []byte | |
| if partition != nil { | |
| key = sqsPartitionedMsgDataKeyBytes(queueName, *partition, sqsRestoreGeneration, rec.MessageID) | |
| } else { | |
| key = sqsMsgDataKeyBytes(queueName, sqsRestoreGeneration, rec.MessageID) | |
| } | |
| var key []byte | |
| if isPartitioned { | |
| key = sqsPartitionedMsgDataKeyBytes(queueName, partition, sqsRestoreGeneration, rec.MessageID) | |
| } else { | |
| key = sqsMsgDataKeyBytes(queueName, sqsRestoreGeneration, rec.MessageID) | |
| } |
| // SendTimestampMs + sqsFifoDedupWindowMillis. CBD queues get a SHA-256 | ||
| // derived dedup-id (matches adapter/sqs_fifo.go resolveFifoDedupID). | ||
| func (e *SQSRecordEncoder) addSideRecords(b *snapshotBuilder, queueName string, meta *sqsQueueMetaPublic, rec *sqsMessageRecord) error { | ||
| func (e *SQSRecordEncoder) addSideRecords(b *snapshotBuilder, queueName string, partition *uint32, meta *sqsQueueMetaPublic, rec *sqsMessageRecord) error { |
Update the addSideRecords signature to accept isPartitioned bool and partition uint32 instead of partition *uint32 to prevent writing partitioned side records for classic queues.
| func (e *SQSRecordEncoder) addSideRecords(b *snapshotBuilder, queueName string, partition *uint32, meta *sqsQueueMetaPublic, rec *sqsMessageRecord) error { | |
| func (e *SQSRecordEncoder) addSideRecords(b *snapshotBuilder, queueName string, isPartitioned bool, partition uint32, meta *sqsQueueMetaPublic, rec *sqsMessageRecord) error { |
| var visKey, byAgeKey []byte | ||
| if partition != nil { | ||
| visKey = sqsPartitionedMsgVisKeyBytes(queueName, *partition, sqsRestoreGeneration, rec.AvailableAtMillis, rec.MessageID) | ||
| byAgeKey = sqsPartitionedMsgByAgeKeyBytes(queueName, *partition, sqsRestoreGeneration, rec.SendTimestampMillis, rec.MessageID) | ||
| } else { | ||
| visKey = sqsMsgVisKeyBytes(queueName, sqsRestoreGeneration, rec.AvailableAtMillis, rec.MessageID) | ||
| byAgeKey = sqsMsgByAgeKeyBytes(queueName, sqsRestoreGeneration, rec.SendTimestampMillis, rec.MessageID) | ||
| } |
Dispatch the visibility and age index key shapes based on the explicit isPartitioned boolean.
| var visKey, byAgeKey []byte | |
| if partition != nil { | |
| visKey = sqsPartitionedMsgVisKeyBytes(queueName, *partition, sqsRestoreGeneration, rec.AvailableAtMillis, rec.MessageID) | |
| byAgeKey = sqsPartitionedMsgByAgeKeyBytes(queueName, *partition, sqsRestoreGeneration, rec.SendTimestampMillis, rec.MessageID) | |
| } else { | |
| visKey = sqsMsgVisKeyBytes(queueName, sqsRestoreGeneration, rec.AvailableAtMillis, rec.MessageID) | |
| byAgeKey = sqsMsgByAgeKeyBytes(queueName, sqsRestoreGeneration, rec.SendTimestampMillis, rec.MessageID) | |
| } | |
| var visKey, byAgeKey []byte | |
| if isPartitioned { | |
| visKey = sqsPartitionedMsgVisKeyBytes(queueName, partition, sqsRestoreGeneration, rec.AvailableAtMillis, rec.MessageID) | |
| byAgeKey = sqsPartitionedMsgByAgeKeyBytes(queueName, partition, sqsRestoreGeneration, rec.SendTimestampMillis, rec.MessageID) | |
| } else { | |
| visKey = sqsMsgVisKeyBytes(queueName, sqsRestoreGeneration, rec.AvailableAtMillis, rec.MessageID) | |
| byAgeKey = sqsMsgByAgeKeyBytes(queueName, sqsRestoreGeneration, rec.SendTimestampMillis, rec.MessageID) | |
| } |
| var dedupKey []byte | ||
| if partition != nil { | ||
| // Partitioned dedup includes <group-seg> + '|' + <dedup-seg> | ||
| // per the partitioned dedup constructor — see | ||
| // sqsPartitionedMsgDedupKeyBytes for the rationale (CodeRabbit | ||
| // major PR #732 round 6: the second '|' prevents FNV-collapse | ||
| // across distinct (groupID, dedupID) pairs on the same partition). | ||
| dedupKey = sqsPartitionedMsgDedupKeyBytes(queueName, *partition, rec.MessageGroupID, dedupID) | ||
| } else { | ||
| dedupKey = sqsMsgDedupKeyBytes(queueName, dedupID) | ||
| } |
Dispatch the deduplication key shape based on the explicit isPartitioned boolean.
| var dedupKey []byte | |
| if partition != nil { | |
| // Partitioned dedup includes <group-seg> + '|' + <dedup-seg> | |
| // per the partitioned dedup constructor — see | |
| // sqsPartitionedMsgDedupKeyBytes for the rationale (CodeRabbit | |
| // major PR #732 round 6: the second '|' prevents FNV-collapse | |
| // across distinct (groupID, dedupID) pairs on the same partition). | |
| dedupKey = sqsPartitionedMsgDedupKeyBytes(queueName, *partition, rec.MessageGroupID, dedupID) | |
| } else { | |
| dedupKey = sqsMsgDedupKeyBytes(queueName, dedupID) | |
| } | |
| var dedupKey []byte | |
| if isPartitioned { | |
| // Partitioned dedup includes <group-seg> + '|' + <dedup-seg> | |
| // per the partitioned dedup constructor — see | |
| // sqsPartitionedMsgDedupKeyBytes for the rationale (CodeRabbit | |
| // major PR #732 round 6: the second '|' prevents FNV-collapse | |
| // across distinct (groupID, dedupID) pairs on the same partition). | |
| dedupKey = sqsPartitionedMsgDedupKeyBytes(queueName, partition, rec.MessageGroupID, dedupID) | |
| } else { | |
| dedupKey = sqsMsgDedupKeyBytes(queueName, dedupID) | |
| } |
… (codex P1 #929 + gemini critical)

Bug: a classic queue dump (PartitionCount<=1) with an explicit rec.Partition=&0 silently emitted partitioned-shape keys (!sqs|msg|data|p|0|...). validatePartitioningOne gate 4 lets *Partition==0 through, but addMessage/addSideRecords routed on 'partition != nil' and the live reader for a classic queue only scans the classic keyspace - the messages become invisible on first read (silent data loss).

Root cause: the key-shape decision was tied to a per-MESSAGE property (rec.Partition != nil), but the keyspace selection is a per-QUEUE property (meta.PartitionCount > 1). They CAN disagree.

Fix: introduce (isPartitioned bool, partition uint32) signatures for addMessage and addSideRecords, derived once from raw meta.PartitionCount > 1 in stageMessageRecords. Dispatch branches now key on isPartitioned, not pointer nullability.

Caller audit (signature change, semantic-changing):
- addMessage callers: stageMessageRecords only (single site, updated).
- addSideRecords callers: stageMessageRecords only (single site, updated).
- rec.Partition retains its *uint32 type and gate 4 invariant; the encoder simply ignores Partition when isPartitioned==false rather than reading the value off the message.

Regression test: TestSQSEncodeClassicQueueWithExplicitPartitionZeroUsesClassicKeys sends a classic queue with explicit 'partition: 0' through the encoder and verifies no |p| keys leak. Would fail under the buggy dispatch.

Addresses codex P1 #929 and 6 gemini critical findings (line 284 + 384 + 416 in encode_sqs.go; line 187 + 197 + 232 in encode_sqs_side.go) - all the same root cause.
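The per-message versus per-queue disagreement can be reproduced in a few lines. This is only a sketch: `keyByPointer` and `keyByQueue` are hypothetical helpers, and the string layout after the `!sqs|msg|data|` and `!sqs|msg|data|p|` prefixes is a simplification, not the real byte encoding.

```go
package main

import "fmt"

const (
	classicPrefix     = "!sqs|msg|data|"
	partitionedPrefix = "!sqs|msg|data|p|"
)

// keyByPointer models the buggy dispatch: the key shape follows a
// per-message property (whether the record carries a partition).
func keyByPointer(queue string, partition *uint32, id string) string {
	if partition != nil {
		return fmt.Sprintf("%s%s|%d|%s", partitionedPrefix, queue, *partition, id)
	}
	return fmt.Sprintf("%s%s|%s", classicPrefix, queue, id)
}

// keyByQueue models the fixed dispatch: the key shape follows a per-queue
// property (raw partition count > 1); the record's partition is ignored
// for classic queues.
func keyByQueue(queue string, partitionCount uint32, partition *uint32, id string) string {
	if partitionCount > 1 {
		var p uint32
		if partition != nil {
			p = *partition
		}
		return fmt.Sprintf("%s%s|%d|%s", partitionedPrefix, queue, p, id)
	}
	return fmt.Sprintf("%s%s|%s", classicPrefix, queue, id)
}

func main() {
	zero := uint32(0)
	// Classic queue (partition count 1) whose dump carries an explicit partition 0.
	fmt.Println(keyByPointer("q", &zero, "m1"))  // partitioned shape: invisible to a classic reader
	fmt.Println(keyByQueue("q", 1, &zero, "m1")) // classic shape
}
```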
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 1a5540ea17
| if *rec.Partition >= meta.PartitionCount { | ||
| return errors.Wrapf(ErrSQSEncodeOutOfRangePartition, | ||
| "queue %q (partition_count=%d): message %q partition=%d out of range", | ||
| meta.Name, meta.PartitionCount, rec.MessageID, *rec.Partition) | ||
| } |
Reject partitions that do not match the message group hash
For perMessageGroupId HT-FIFO dumps, an in-range but wrong partition still passes these gates and is then restored under that wrong lane. Live sends route each MessageGroupId with partitionFor (adapter/sqs_partitioning.go:96-103), while receives/group locks use the candidate lane (adapter/sqs_messages.go:1205-1206, adapter/sqs_fifo.go:292-294), so a malformed or hand-edited dump can split the same FIFO group across two partitioned group-lock keyspaces and allow concurrent/out-of-order delivery after restore. Please mirror the hash check here (except for perQueue, already handled) before emitting the partitioned keys.
@claude review Addresses codex P1 + 6 gemini critical findings (same root cause): A classic queue dump ( Fix: Caller audit: both functions have a single call site ( Regression pinned by |
|
Claude finished @bootjp's task in 2m 15s —— View job Code Review — commit
| Input | isPartitioned | partition | Key emitted |
|---|---|---|---|
| classic, Partition==nil | false | 0 (unused) | !sqs\|msg\|data\|... ✓ |
| classic, *Partition==0 (was buggy) | false | 0 (unused) | !sqs\|msg\|data\|... ✓ (fixed) |
| partitioned, *Partition==1 | true | 1 | !sqs\|msg\|data\|p\|...\|1... ✓ |

The same isPartitioned signal correctly threads through addSideRecords (vis, byage, dedup keys).
One defensive note (encode_sqs.go:284):
var partition uint32
if isPartitioned && records[i].Partition != nil {
partition = *records[i].Partition
}

The records[i].Partition == nil branch inside isPartitioned=true is unreachable: validatePartitioning gate 1 (ErrSQSEncodeMissingPartition) would have already aborted. The defensive nil guard is safe, but consider adding an inline comment that clarifies it's a belt-and-suspenders guard against future callers that bypass validatePartitioning.
Regression test coverage — Solid ✓
| Test | What it pins |
|---|---|
| TestSQSEncodeClassicQueueWithExplicitPartitionZeroUsesClassicKeys | The exact bug - verifies no … |
| TestSQSEncodeRejectsMissingPartitionOnPartitionedQueue | Gate 1: nil partition on partitioned queue |
| TestSQSEncodeRejectsOutOfRangePartition | Gate 2: *Partition >= PartitionCount |
| TestSQSEncodeRejectsNonzeroPartitionOnPerQueueHTFIFO | Gate 3: perQueue with *Partition != 0 |
| TestSQSEncodeRejectsNonZeroPartitionOnClassicQueue | Gate 4: classic with non-zero partition |
| TestSQSEncodeGateUsesRawPartitionCount | Pins that effective-count is never used in gates |
| TestSQSEncodePartitioned{Data,Vis,ByAge,Dedup}KeyByteShape | Wire-format byte assertions |
| TestSQSEncodePartitionedSortStableAcrossPartitions | Partition-major sort ordering |
| TestSQSEncodeLegacyDumpsWithoutPartitionStillRoundTrip | Backwards compat: pre-M5-3 dumps still round-trip |

The TestSQSEncodePartitionedQueueRoundTrip test verifies no classic-shape keys bleed through for a partitioned queue, and TestSQSEncodePartitionedSideRecordsByteCrossCheck cross-checks all three side-record key shapes against their constructors.
Codex P2 — still open (not claimed by this PR)
The Codex P2 finding at encode_sqs.go:349 (13:31 review) is still unresolved: for perMessageGroupId HT-FIFO queues, a message whose *Partition is in-range but wrong for its MessageGroupID passes all four gates and restores to the wrong partition lane. Live sends hash MessageGroupID via partitionFor (adapter/sqs_partitioning.go:96-103); ReceiveMessage and group locks use the same lane, so a malformed dump can scatter the same FIFO group across two partitioned group-lock keyspaces and allow concurrent or out-of-order delivery after restore.
This is a separate concern from the P1 (which this commit correctly fixes), but it's worth tracking before merge. Mirroring partitionFor locally would hit the usual M3b-3 circular-dep constraint — the hash function and the perMessageGroupId constant would need to be reproduced in internal/backup/ the same way sqsFifoThroughputPerQueue was. A fifth gate like *Partition != partitionFor(rec.MessageGroupID, meta.PartitionCount) && meta.FifoThroughputLimit == "perMessageGroupId" would close it.
Bottom line: P1 fix is correct and complete. The isPartitioned bool decoupling properly closes the silent-data-loss path on classic queues with explicit Partition=&0. Test coverage is thorough. P2 hash validation for perMessageGroupId queues warrants a follow-up before this merges.
…y (codex P2 #929)

For perMessageGroupId HT-FIFO queues the live partitionFor maps each MessageGroupId to one partition via FNV-1a-mod-PartitionCount (adapter/sqs_partitioning.go:64). Receivers and group-lock readers use partitionFor's mapping to locate messages and lock state. An in-range but inconsistent partition in a dump would restore under the wrong lane, splitting a FIFO group across two partition-scoped group-lock keyspaces and allowing concurrent out-of-order delivery on first read.

Gate 5 mirrors partitionFor inside the encoder and rejects any perMessageGroupId message whose dump-recorded partition disagrees with the recomputed hash.

New: partitionForGroup(meta, messageGroupID) - FNV-1a-32 mirror of adapter/sqs_partitioning.go's inlined hash, masked on PartitionCount-1 (power-of-two validator-enforced live). Returns 0 for classic queues, perQueue, or empty MessageGroupID.

New sentinel: ErrSQSEncodePartitionHashMismatch.

Caller audit (validation gate addition - widens fail-closed surface):
- validatePartitioningPartitioned: the single caller chain is validatePartitioning -> validatePartitioningOne -> ...Partitioned. Only invoked from stageMessageRecords. No production callers changed.
- partitionForGroup: new helper, callers are validation + tests only.

Test fixture realignment: TestSQSEncodePartitionedSideRecordsByteCrossCheck used group_id='g0' (hashes to partition 0) with partition=1, which would now correctly fail gate 5. Changed to group_id='g1' (hashes to 1) to keep the partition=1 byte-shape coverage and remain consistent with the live hash.

New regression tests:
- TestSQSEncodeRejectsHashMismatchOnPerMessageGroupId - g0+partition=1 fails closed with ErrSQSEncodePartitionHashMismatch.
- TestSQSEncodePartitionForGroup_LiveAdapterParity - 5-case table verifying the mirror matches the live FNV-1a-mod-N spec for classic / perQueue / empty group / hash-mod-2 cases.
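A minimal sketch of the hash mirror described above, assuming the group ID is hashed as its raw bytes. The `queueMeta` type is a hypothetical stand-in for sqsQueueMetaPublic; the constants are the standard FNV-1a 32-bit offset basis and prime quoted elsewhere in this thread, and the mask assumes PartitionCount is a power of two.

```go
package main

import "fmt"

const (
	fnv32Offset uint32 = 2166136261
	fnv32Prime  uint32 = 16777619
)

// queueMeta is a trimmed, hypothetical stand-in for sqsQueueMetaPublic.
type queueMeta struct {
	PartitionCount      uint32
	FifoThroughputLimit string
}

// partitionForGroup recomputes the partition a message group would route to:
// inlined FNV-1a 32-bit over the group ID bytes, masked by PartitionCount-1.
// Classic queues, perQueue queues, and empty group IDs all map to 0.
func partitionForGroup(meta *queueMeta, groupID string) uint32 {
	if meta == nil || meta.PartitionCount <= 1 ||
		meta.FifoThroughputLimit == "perQueue" || groupID == "" {
		return 0
	}
	h := fnv32Offset
	for i := 0; i < len(groupID); i++ {
		h ^= uint32(groupID[i])
		h *= fnv32Prime
	}
	return h & (meta.PartitionCount - 1)
}

func main() {
	meta := &queueMeta{PartitionCount: 2, FifoThroughputLimit: "perMessageGroupId"}
	fmt.Println(partitionForGroup(meta, "g0")) // 0
	fmt.Println(partitionForGroup(meta, "g1")) // 1
}
```

A gate-5 style check then reduces to comparing the dump-recorded partition against `partitionForGroup(meta, rec.MessageGroupID)` and failing closed on any difference.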
|
@claude review Addresses codex P2 #929 ( For perMessageGroupId HT-FIFO queues, the live router maps each Gate 5 added ( Caller audit: gate addition only widens fail-closed surface; Regression: |
|
Claude encountered an error after 2m 45s —— View job Code Review — Codex P2 fix: Gate 5 (
| Condition | Live partitionFor | partitionForGroup |
|---|---|---|
| meta == nil | return 0 | return 0 |
| PartitionCount <= 1 | return 0 | return 0 |
| FifoThroughputLimit == "perQueue" | return 0 | return 0 |
| messageGroupID == "" | return 0 | return 0 |
| Hash constants | fnv32Offset=2166136261, fnv32Prime=16777619 | identical |
| Mask | hash & (meta.PartitionCount - 1) | identical |

Both use inlined FNV-1a 32-bit over raw bytes (not via hash/fnv.New32a) and the same power-of-two mask. This means any partitioned queue that was live when the backup was taken will produce the same routing result at restore time.
Gate 5 placement — CORRECT ✓
Gate 5 sits after gate 3 in validatePartitioningPartitioned (encode_sqs.go:344). The ordering is correct:
- Gate 1 (nil Partition) short-circuits before pointer dereference
- Gate 2 (out-of-range) bounds the value before the hash check
- Gate 3 (perQueue) does an early return nil, which avoids computing a hash that perQueue routing would never use
- Gate 5 fires for everything else: both explicit "perMessageGroupId" and any other non-perQueue FifoThroughputLimit
One minor documentation gap: the error sentinel's comment (encode_sqs.go:83) and the error message say "perMessageGroupId HT-FIFO message", but Gate 5 actually fires for any non-perQueue partitioned queue, including those with an empty FifoThroughputLimit. In practice this doesn't matter — the live validatePartitionConfig effectively requires either "perQueue" or "perMessageGroupId" for any partitioned queue — but the comment is technically narrower than the code. This is harmless but worth a future clarification.
Fixture realignment — CORRECT ✓
The TestSQSEncodePartitionedSideRecordsByteCrossCheck fixture change (g0+partition=1 → g1+partition=1) is correct. Gate 5 was not present in earlier slices; re-pairing the fixture so its MessageGroupID matches the FNV-1a partition is the right call. The parity table in TestSQSEncodePartitionForGroup_LiveAdapterParity confirms "g1" → 1 for PartitionCount=2. ✓
Unverified manually: TestSQSEncodePartitionedDedupBuildsGroupSegment uses "groupA" with "partition":1. Since Gate 5 is now active and the PR claims go test ./internal/backup/ passes, FNV-1a("groupA") & 1 must equal 1. Worth verifying in CI if not done already.
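The "groupA" case can be checked outside CI with the standard library, assuming the encoder hashes the raw bytes of the group ID and that hash/fnv's 32-bit FNV-1a matches the inlined variant (both use the same offset basis and prime). The `fnv1a32` helper below is hypothetical. The low bit can also be reasoned by hand: the prime is odd, so multiplication preserves the low bit, and each step just XORs in the low bit of the next byte. The offset basis is odd, and the bytes g, r, o, u, p, A have low bits 1, 0, 1, 1, 0, 1, which XOR to 0, so the final low bit stays 1.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// fnv1a32 returns the 32-bit FNV-1a hash of s using the standard library.
func fnv1a32(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s)) // Write on a hash.Hash never returns an error
	return h.Sum32()
}

func main() {
	// Mask for PartitionCount=2 is PartitionCount-1 = 1.
	fmt.Println(fnv1a32("groupA") & 1) // 1
}
```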
Regression test coverage — SOLID ✓
| Test | What it pins |
|---|---|
| TestSQSEncodeRejectsHashMismatchOnPerMessageGroupId | Gate 5 fires for "g0" + partition=1 mismatch |
| TestSQSEncodePartitionForGroup_LiveAdapterParity | 5-case parity table versus the live FNV-1a spec |
| TestSQSEncodePartitionedSideRecordsByteCrossCheck | Realigned fixture "g1"+partition=1, all 3 side-record key shapes |
| Prior gates 1–4 + prior tests | Unaffected; gate 5 sits after gate 3's return nil |

The one missing case worth considering: a partitioned queue with FifoThroughputLimit="" (not explicitly set) + hash mismatch — Gate 5 would fire there too, but there's no test for it. Given the live validator enforces explicit FifoThroughputLimit for PartitionCount > 1, this is a theoretical gap only.
Summary
Gate 5 implementation is correct and complete:
- Hash is a faithful mirror of the live `partitionFor`
- Gate placement is correct (after out-of-range, with perQueue early-exit before hash computation)
- Regression tests cover the main failure mode and the parity invariant
- Fixture realignment is properly motivated and consistent
Bottom line: LGTM. The only open item before merge is confirming CI passes for the "groupA" fixture in TestSQSEncodePartitionedDedupBuildsGroupSegment (hash parity cannot be verified by code inspection alone for that case).
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 0460e285eb
    // sqsPartitionedMsgDedupKeyBytes for the rationale (CodeRabbit
    // major PR #732 round 6: the second '|' prevents FNV-collapse
    // across distinct (groupID, dedupID) pairs on the same partition).
    dedupKey = sqsPartitionedMsgDedupKeyBytes(queueName, partition, rec.MessageGroupID, dedupID)
Avoid rebuilding duplicate partitioned dedup rows
When a partitioned FIFO queue retains two messages in the same MessageGroupId with the same MessageDeduplicationId (or same CBD body) sent more than five minutes apart, the live queue can contain both messages because the earlier dedup entry has expired or been overwritten. This branch reconstructs a dedup row for every restored message using only (queue, partition, group, dedupID), so the later message stages the same user key and snapshotBuilder.Add rejects it as ErrEncodeDuplicateKey, aborting the restore for an otherwise valid queue. Only the current, non-expired dedup entry should be rebuilt, or expired duplicates should be skipped.
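One way to read this suggestion is to pick a single owner per dedup key before any dedup rows are staged. The sketch below keeps the most recently sent message for each key; the record type, its field names, and the "latest wins" policy are all assumptions made for illustration and are not taken from the PR:

```go
package main

import "fmt"

// restoredMsg is a hypothetical, trimmed-down message record.
type restoredMsg struct {
	MessageID    string
	GroupID      string
	DedupID      string
	Partition    uint32
	SentAtMillis int64
}

// dedupKey mirrors the (partition, group, dedupID) tuple within one queue.
type dedupKey struct {
	Partition uint32
	GroupID   string
	DedupID   string
}

// latestDedupOwners returns, for each dedup key, the message that was sent
// last. Only these messages would get a dedup row rebuilt, so two messages
// sharing a key never stage the same dedup row twice.
func latestDedupOwners(msgs []restoredMsg) map[dedupKey]restoredMsg {
	owners := make(map[dedupKey]restoredMsg, len(msgs))
	for _, m := range msgs {
		k := dedupKey{Partition: m.Partition, GroupID: m.GroupID, DedupID: m.DedupID}
		if cur, ok := owners[k]; !ok || m.SentAtMillis > cur.SentAtMillis {
			owners[k] = m
		}
	}
	return owners
}

func main() {
	const sixMinutes = int64(6 * 60 * 1000)
	msgs := []restoredMsg{
		{MessageID: "m1", GroupID: "g1", DedupID: "d1", Partition: 1, SentAtMillis: 0},
		{MessageID: "m2", GroupID: "g1", DedupID: "d1", Partition: 1, SentAtMillis: sixMinutes},
	}
	for k, m := range latestDedupOwners(msgs) {
		fmt.Printf("dedup row %+v owned by %s\n", k, m.MessageID)
	}
}
```

Both messages would still be restored as data records; only the dedup side record is limited to one per key. A stricter variant could also drop owners whose dedup window has already elapsed relative to the backup time.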
Useful? React with 👍 / 👎.
Summary
Phase 0b M5-3 implementation — SQS partitioned-FIFO reverse encoder.
The design doc landed via PR #914 (merged 2026-06-03). This PR carries
the implementation that the doc described, in 4 commits matching the
slice plan:
- `235c13ba`: decoder partition plumbing. `parseSQSMessageDataKey` widens to `(enc, partition, isPartitioned, err)`, `sqsMessageRecord.Partition *uint32`, `HandleMessageData` wires the parsed partition.
- `6640674a`: mirror the four partitioned key constructors (`sqsPartitionedMsgDataKeyBytes`, `…Vis…`, `…ByAge…`, `…Dedup…`) plus `effectivePartitionCount` and the `"perQueue"` constant. M3b-3 circular-dep pattern: `internal/backup/` cannot import `adapter/`.
- `1037deba`: drop `ErrSQSEncodeUnsupportedPartitioned`, add four new sentinels (`ErrSQSEncodeMissingPartition`, `ErrSQSEncodeOutOfRangePartition`, `ErrSQSEncodePartitionRoutingMismatch`), add `validatePartitioning` + `sortMessagesForPartitionedEmit`, thread `partition *uint32` through `addMessage` and `addSideRecords`.
- `7603bf78`: 10 tests: the 8 design-doc rows plus `TestSQSEncodeGateUsesRawPartitionCount` (codex P2 v914 v7 regression) and `TestSQSEncodePartitionedSortStableAcrossPartitions`.

Validation gates
All four use raw `meta.PartitionCount > 1` as the partitioned-queue predicate, never `effectivePartitionCount`. Codex P2 v914 v7 demonstrated that effective-count gates would silently allow a `perQueue` + `PartitionCount=2` + `Partition==nil` dump to slip past validation, then `addMessage`'s `partition != nil` branch would emit classic-shape keys against a partitioned-keyspace queue (silent data loss on first read).
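The difference between the two predicates can be illustrated with a small sketch. The struct and function names are hypothetical, and `effectivePartitionCountSketch` only approximates the helper's described behaviour (collapsing to 1 for nil meta, `PartitionCount <= 1`, or `perQueue`):

```go
package main

import "fmt"

// rawMeta is a hypothetical, trimmed-down view of the dump-side queue meta.
type rawMeta struct {
	PartitionCount      uint32
	FifoThroughputLimit string
}

// effectivePartitionCountSketch collapses perQueue queues to one partition.
func effectivePartitionCountSketch(meta *rawMeta) uint32 {
	if meta == nil || meta.PartitionCount <= 1 || meta.FifoThroughputLimit == "perQueue" {
		return 1
	}
	return meta.PartitionCount
}

// missingPartitionRaw is the gate as the PR describes it: raw count only.
func missingPartitionRaw(meta *rawMeta, partition *uint32) bool {
	return meta.PartitionCount > 1 && partition == nil
}

// missingPartitionEffective is the rejected variant, shown for contrast.
func missingPartitionEffective(meta *rawMeta, partition *uint32) bool {
	return effectivePartitionCountSketch(meta) > 1 && partition == nil
}

func main() {
	// The problematic dump: perQueue + PartitionCount=2 + Partition==nil.
	meta := &rawMeta{PartitionCount: 2, FifoThroughputLimit: "perQueue"}
	fmt.Println("raw gate rejects:      ", missingPartitionRaw(meta, nil))
	fmt.Println("effective gate rejects:", missingPartitionEffective(meta, nil))
}
```

In this sketch the raw-count gate flags the record while the effective-count variant lets it through, which is the failure mode the paragraph above attributes to Codex P2 v914 v7.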
| Condition | Sentinel |
|---|---|
| `PartitionCount > 1 && Partition == nil` | `ErrSQSEncodeMissingPartition` |
| `PartitionCount > 1 && *Partition >= PartitionCount` | `ErrSQSEncodeOutOfRangePartition` |
| `PartitionCount <= 1 && *Partition != 0` | `ErrSQSEncodeInvalidMessage` |
| `PartitionCount > 1 && perQueue && *Partition != 0` | `ErrSQSEncodePartitionRoutingMismatch` |

5-lens self-review
1. consistency. Classic-queue tests stay green (`Partition==nil` + `PartitionCount<=1` takes legacy path).
2. shared state.
3. records. No extra allocations. `effectivePartitionCount` not on the hot path.
4. `PartitionCount > 1` selects key shape in both this encoder AND the live `adapter/sqs_keys_dispatch.go`; they agree. Partitioned dedup key includes `<group-seg>` + `|` + `<dedup-seg>` (CodeRabbit major PR #732 round 6).
5. prior review findings.
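Why a terminator between two variable-length segments matters can be shown with a toy example. The helpers below are hypothetical and use plain string concatenation rather than the real key layout; they also assume the segments themselves never contain `|`:

```go
package main

import "fmt"

// joinNoSep concatenates the two segments with nothing between them.
func joinNoSep(groupSeg, dedupSeg string) string {
	return groupSeg + dedupSeg
}

// joinWithSep places a '|' terminator after the group segment, so the
// boundary between the two segments is unambiguous.
func joinWithSep(groupSeg, dedupSeg string) string {
	return groupSeg + "|" + dedupSeg
}

func main() {
	// Two distinct (group, dedup) pairs whose concatenations coincide.
	fmt.Println(joinNoSep("ab", "c") == joinNoSep("a", "bc"))     // same key
	fmt.Println(joinWithSep("ab", "c") == joinWithSep("a", "bc")) // distinct keys
}
```

Without the terminator, the two pairs collapse onto one key and the second message would look like a duplicate of the first.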
Test plan
- `go test ./internal/backup/` passes
- `golangci-lint run --config=.golangci.yaml ./internal/backup/`: 0 issues