-
Notifications
You must be signed in to change notification settings - Fork 2
backup: Phase 0b M5-3 impl - SQS partitioned-FIFO reverse encoder #929
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
235c13b
6640674
1037deb
7603bf7
1a5540e
0460e28
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -6,6 +6,7 @@ import ( | |||||||||||||||||||||||||
| "encoding/json" | ||||||||||||||||||||||||||
| "os" | ||||||||||||||||||||||||||
| "path/filepath" | ||||||||||||||||||||||||||
| "sort" | ||||||||||||||||||||||||||
| "strconv" | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| "github.com/cockroachdb/errors" | ||||||||||||||||||||||||||
|
|
@@ -54,10 +55,39 @@ var ( | |||||||||||||||||||||||||
| // ErrSQSEncodeNotRegular is returned when a dump file is not a regular | ||||||||||||||||||||||||||
| // file (symlink / FIFO / device / directory). | ||||||||||||||||||||||||||
| ErrSQSEncodeNotRegular = errors.New("backup: sqs dump file is not a regular file") | ||||||||||||||||||||||||||
| // ErrSQSEncodeUnsupportedPartitioned is returned for HT-FIFO queues | ||||||||||||||||||||||||||
| // (partition_count > 1), whose partitioned key family this slice does | ||||||||||||||||||||||||||
| // not yet reproduce. | ||||||||||||||||||||||||||
| ErrSQSEncodeUnsupportedPartitioned = errors.New("backup: sqs partitioned (HT-FIFO) queue not yet supported by encoder") | ||||||||||||||||||||||||||
| // ErrSQSEncodeMissingPartition fires when a partitioned queue | ||||||||||||||||||||||||||
| // (meta.PartitionCount > 1) has a message whose Partition field is | ||||||||||||||||||||||||||
| // nil. Pre-M5-3 dumps lack the field entirely; replaying one against | ||||||||||||||||||||||||||
| // a partitioned queue would silently route every message to | ||||||||||||||||||||||||||
| // partition 0 via the `partition != nil` dispatch in addMessage, | ||||||||||||||||||||||||||
| // while the live readers scan the partitioned keyspace selected | ||||||||||||||||||||||||||
| // from raw PartitionCount > 1 — i.e. classic-shape keys against a | ||||||||||||||||||||||||||
| // partitioned reader = invisible on first read. Pinned by | ||||||||||||||||||||||||||
| // TestSQSEncodeRejectsMissingPartitionOnPartitionedQueue. | ||||||||||||||||||||||||||
| ErrSQSEncodeMissingPartition = errors.New("backup: sqs partitioned queue message missing partition field") | ||||||||||||||||||||||||||
| // ErrSQSEncodeOutOfRangePartition fires when a partitioned-queue | ||||||||||||||||||||||||||
| // message carries *Partition >= meta.PartitionCount. The dump is | ||||||||||||||||||||||||||
| // malformed (operator hand-edit or a future M5-3 decoder bug). | ||||||||||||||||||||||||||
| ErrSQSEncodeOutOfRangePartition = errors.New("backup: sqs message partition out of range for queue partition_count") | ||||||||||||||||||||||||||
| // ErrSQSEncodePartitionRoutingMismatch fires when a partitioned | ||||||||||||||||||||||||||
| // FIFO queue with FifoThroughputLimit == "perQueue" has a message | ||||||||||||||||||||||||||
| // with *Partition != 0. The live partitionFor | ||||||||||||||||||||||||||
| // (adapter/sqs_partitioning.go:71-72) forces every group to | ||||||||||||||||||||||||||
| // partition 0 in perQueue mode regardless of PartitionCount, and | ||||||||||||||||||||||||||
| // ReceiveMessage only scans the partition-0 lane. Accepting any | ||||||||||||||||||||||||||
| // other partition value would restore messages onto |p|N|... lanes | ||||||||||||||||||||||||||
| // the live receive fan-out never visits — silent data loss on | ||||||||||||||||||||||||||
| // first read. Codex P2 v914 v4 caught this gap. | ||||||||||||||||||||||||||
| ErrSQSEncodePartitionRoutingMismatch = errors.New("backup: sqs perQueue HT-FIFO queue must keep all messages on partition 0") | ||||||||||||||||||||||||||
| // ErrSQSEncodePartitionHashMismatch fires for a perMessageGroupId | ||||||||||||||||||||||||||
| // HT-FIFO message whose Partition disagrees with | ||||||||||||||||||||||||||
| // partitionFor(MessageGroupID). The live send/receive/group-lock | ||||||||||||||||||||||||||
| // path uses partitionFor's FNV-1a-mod-PartitionCount mapping | ||||||||||||||||||||||||||
| // (adapter/sqs_partitioning.go:64); restoring under the wrong | ||||||||||||||||||||||||||
| // lane would split a FIFO group across two partition-scoped | ||||||||||||||||||||||||||
| // group-lock keyspaces and break FIFO order on first read. Codex | ||||||||||||||||||||||||||
| // P2 #929. | ||||||||||||||||||||||||||
| ErrSQSEncodePartitionHashMismatch = errors.New("backup: sqs perMessageGroupId HT-FIFO message partition disagrees with partitionFor(message_group_id)") | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // sqsStoredQueueMeta mirrors the live adapter's sqsQueueMeta JSON shape | ||||||||||||||||||||||||||
|
|
@@ -159,10 +189,6 @@ func (e *SQSRecordEncoder) encodeQueue(b *snapshotBuilder, root *os.Root, queueD | |||||||||||||||||||||||||
| if meta.Name == "" { | ||||||||||||||||||||||||||
| return errors.Wrapf(ErrSQSEncodeInvalidQueue, "%s/_queue.json: empty queue name", queueDir) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| if meta.PartitionCount > 1 { | ||||||||||||||||||||||||||
| return errors.Wrapf(ErrSQSEncodeUnsupportedPartitioned, | ||||||||||||||||||||||||||
| "%s: partition_count %d", queueDir, meta.PartitionCount) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| if err := e.addQueueMeta(b, meta); err != nil { | ||||||||||||||||||||||||||
| return err | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
@@ -205,18 +231,28 @@ func (e *SQSRecordEncoder) encodeQueueMessages(b *snapshotBuilder, root *os.Root | |||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||
| return err | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| var maxSeq uint64 | ||||||||||||||||||||||||||
| for i := range records { | ||||||||||||||||||||||||||
| seq, err := e.addMessage(b, meta.Name, records[i]) | ||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||
| return err | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| if err := e.addSideRecords(b, meta.Name, &meta, &records[i]); err != nil { | ||||||||||||||||||||||||||
| return err | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| if seq > maxSeq { | ||||||||||||||||||||||||||
| maxSeq = seq | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| // Fail-closed validation up-front so a malformed dump never stages | ||||||||||||||||||||||||||
| // partial records. Uses raw meta.PartitionCount > 1 as the | ||||||||||||||||||||||||||
| // partitioned-queue predicate (NOT effectivePartitionCount; codex | ||||||||||||||||||||||||||
| // P2 v914 v7 - using the effective count would allow a perQueue | ||||||||||||||||||||||||||
| // dump with Partition == nil to slip past the missing-partition | ||||||||||||||||||||||||||
| // gate, then addMessage's `partition != nil` dispatch would emit | ||||||||||||||||||||||||||
| // classic-shape keys against a partitioned-keyspace queue, making | ||||||||||||||||||||||||||
| // every restored message invisible). | ||||||||||||||||||||||||||
| if err := validatePartitioning(&meta, records); err != nil { | ||||||||||||||||||||||||||
| return err | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| // Per-partition deterministic emission for partitioned queues. The | ||||||||||||||||||||||||||
| // snapshotBuilder sorts by key bytes on WriteTo so this does not | ||||||||||||||||||||||||||
| // directly affect the .fsm byte output, but it makes the in-loop | ||||||||||||||||||||||||||
| // state (maxSeq, future per-partition counters) deterministic and | ||||||||||||||||||||||||||
| // matches the design doc's stated contract. | ||||||||||||||||||||||||||
| if meta.PartitionCount > 1 { | ||||||||||||||||||||||||||
| sortMessagesForPartitionedEmit(records) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| maxSeq, err := e.stageMessageRecords(b, &meta, records) | ||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||
| return err | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| if meta.FIFO && maxSeq > 0 { | ||||||||||||||||||||||||||
| // The live FIFO send path (adapter/sqs_fifo.go: loadFifoSequence | ||||||||||||||||||||||||||
|
|
@@ -235,10 +271,190 @@ func (e *SQSRecordEncoder) encodeQueueMessages(b *snapshotBuilder, root *os.Root | |||||||||||||||||||||||||
| return nil | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // stageMessageRecords iterates the (pre-validated, pre-sorted) records | ||||||||||||||||||||||||||
| // and stages each one's data + side records on b, returning the | ||||||||||||||||||||||||||
| // maximum SequenceNumber observed (used to write the FIFO seq counter). | ||||||||||||||||||||||||||
| // Split out of encodeQueueMessages so the parent stays under cyclop. | ||||||||||||||||||||||||||
| // | ||||||||||||||||||||||||||
| // isPartitioned is derived once from raw meta.PartitionCount > 1 — NOT | ||||||||||||||||||||||||||
| // from rec.Partition != nil. Codex P1 / gemini critical (PR #929): | ||||||||||||||||||||||||||
| // a classic queue dump (PartitionCount<=1) may carry an explicit | ||||||||||||||||||||||||||
| // rec.Partition = &0 that validatePartitioningOne lets through (gate 4 | ||||||||||||||||||||||||||
| // only rejects *Partition != 0). Routing on `partition != nil` would | ||||||||||||||||||||||||||
| // emit partitioned keys for that classic queue, and the live reader | ||||||||||||||||||||||||||
| // only scans the classic keyspace - the messages become invisible on | ||||||||||||||||||||||||||
| // first read. Keying the dispatch on isPartitioned (a property of the | ||||||||||||||||||||||||||
| // QUEUE, not the message) closes that gap. | ||||||||||||||||||||||||||
| func (e *SQSRecordEncoder) stageMessageRecords(b *snapshotBuilder, meta *sqsQueueMetaPublic, records []sqsMessageRecord) (uint64, error) { | ||||||||||||||||||||||||||
| var maxSeq uint64 | ||||||||||||||||||||||||||
| isPartitioned := meta.PartitionCount > 1 | ||||||||||||||||||||||||||
| for i := range records { | ||||||||||||||||||||||||||
| var partition uint32 | ||||||||||||||||||||||||||
| if isPartitioned && records[i].Partition != nil { | ||||||||||||||||||||||||||
| partition = *records[i].Partition | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| seq, err := e.addMessage(b, meta.Name, isPartitioned, partition, records[i]) | ||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||
| return 0, err | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| if err := e.addSideRecords(b, meta.Name, isPartitioned, partition, meta, &records[i]); err != nil { | ||||||||||||||||||||||||||
| return 0, err | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| if seq > maxSeq { | ||||||||||||||||||||||||||
| maxSeq = seq | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| return maxSeq, nil | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // validatePartitioning runs the four fail-closed gates from the M5-3 | ||||||||||||||||||||||||||
| // design doc §"Validation invariants" before any message is staged. | ||||||||||||||||||||||||||
| // All four use raw meta.PartitionCount > 1 as the partitioned-queue | ||||||||||||||||||||||||||
| // predicate, never effectivePartitionCount (codex P2 v914 v7). | ||||||||||||||||||||||||||
| func validatePartitioning(meta *sqsQueueMetaPublic, records []sqsMessageRecord) error { | ||||||||||||||||||||||||||
| for i := range records { | ||||||||||||||||||||||||||
| if err := validatePartitioningOne(meta, &records[i]); err != nil { | ||||||||||||||||||||||||||
| return err | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| return nil | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // validatePartitioningOne checks one message against the four gates. | ||||||||||||||||||||||||||
| // Split out of validatePartitioning so the outer loop stays under the | ||||||||||||||||||||||||||
| // cyclop limit. | ||||||||||||||||||||||||||
| func validatePartitioningOne(meta *sqsQueueMetaPublic, rec *sqsMessageRecord) error { | ||||||||||||||||||||||||||
| if meta.PartitionCount > 1 { | ||||||||||||||||||||||||||
| return validatePartitioningPartitioned(meta, rec) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| // Classic queue (PartitionCount <= 1): the only invalid state is | ||||||||||||||||||||||||||
| // a non-zero Partition field — the dump is internally | ||||||||||||||||||||||||||
| // inconsistent (classic-queue keyspace has no partition concept). | ||||||||||||||||||||||||||
| if rec.Partition != nil && *rec.Partition != 0 { | ||||||||||||||||||||||||||
| return errors.Wrapf(ErrSQSEncodeInvalidMessage, | ||||||||||||||||||||||||||
| "queue %q (partition_count=%d): classic queue message %q has partition=%d", | ||||||||||||||||||||||||||
| meta.Name, meta.PartitionCount, rec.MessageID, *rec.Partition) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| return nil | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // validatePartitioningPartitioned runs the three partitioned-queue | ||||||||||||||||||||||||||
| // gates (missing partition, out-of-range, perQueue routing mismatch). | ||||||||||||||||||||||||||
| // Caller has already verified meta.PartitionCount > 1. | ||||||||||||||||||||||||||
| func validatePartitioningPartitioned(meta *sqsQueueMetaPublic, rec *sqsMessageRecord) error { | ||||||||||||||||||||||||||
| // Gate 1: missing partition. Pre-M5-3 dump replayed against a | ||||||||||||||||||||||||||
| // partitioned queue, or M5-3 decoder bug. The operator must | ||||||||||||||||||||||||||
| // re-decode with an M5-3 decoder. | ||||||||||||||||||||||||||
| if rec.Partition == nil { | ||||||||||||||||||||||||||
| return errors.Wrapf(ErrSQSEncodeMissingPartition, | ||||||||||||||||||||||||||
| "queue %q (partition_count=%d): message %q missing partition field", | ||||||||||||||||||||||||||
| meta.Name, meta.PartitionCount, rec.MessageID) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| // Gate 2: out-of-range partition. Dump is malformed. | ||||||||||||||||||||||||||
| if *rec.Partition >= meta.PartitionCount { | ||||||||||||||||||||||||||
| return errors.Wrapf(ErrSQSEncodeOutOfRangePartition, | ||||||||||||||||||||||||||
| "queue %q (partition_count=%d): message %q partition=%d out of range", | ||||||||||||||||||||||||||
| meta.Name, meta.PartitionCount, rec.MessageID, *rec.Partition) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
Comment on lines
+354
to
+358
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
For Useful? React with 👍 / 👎. |
||||||||||||||||||||||||||
| // Gate 3: perQueue HT-FIFO with nonzero partition. The live | ||||||||||||||||||||||||||
| // partitionFor collapses every group to partition 0 in perQueue | ||||||||||||||||||||||||||
| // mode; accepting any other partition would write to a lane the | ||||||||||||||||||||||||||
| // live receive never scans. | ||||||||||||||||||||||||||
| if meta.FifoThroughputLimit == sqsFifoThroughputPerQueue { | ||||||||||||||||||||||||||
| if *rec.Partition != 0 { | ||||||||||||||||||||||||||
| return errors.Wrapf(ErrSQSEncodePartitionRoutingMismatch, | ||||||||||||||||||||||||||
| "queue %q (partition_count=%d, fifo_throughput_limit=%q): message %q partition=%d, want 0", | ||||||||||||||||||||||||||
| meta.Name, meta.PartitionCount, meta.FifoThroughputLimit, rec.MessageID, *rec.Partition) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| return nil | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| // Gate 5: perMessageGroupId HT-FIFO partition-hash consistency. | ||||||||||||||||||||||||||
| // For perMessageGroupId queues, the live partitionFor maps each | ||||||||||||||||||||||||||
| // group_id to one partition via FNV-1a hash; the receivers / | ||||||||||||||||||||||||||
| // group-lock reader use that partition value to find messages. | ||||||||||||||||||||||||||
| // A dump line with an in-range but inconsistent partition would | ||||||||||||||||||||||||||
| // be restored under the wrong lane and split a FIFO group across | ||||||||||||||||||||||||||
| // two partition-scoped group-lock keyspaces, allowing concurrent | ||||||||||||||||||||||||||
| // out-of-order delivery. Mirror the hash here. Codex P2 #929. | ||||||||||||||||||||||||||
| want := partitionForGroup(meta, rec.MessageGroupID) | ||||||||||||||||||||||||||
| if *rec.Partition != want { | ||||||||||||||||||||||||||
| return errors.Wrapf(ErrSQSEncodePartitionHashMismatch, | ||||||||||||||||||||||||||
| "queue %q (partition_count=%d, group_id=%q): message %q partition=%d, partitionFor=%d", | ||||||||||||||||||||||||||
| meta.Name, meta.PartitionCount, rec.MessageGroupID, rec.MessageID, *rec.Partition, want) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| return nil | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // partitionForGroup mirrors adapter/sqs_partitioning.go:partitionFor | ||||||||||||||||||||||||||
| // for use by the encoder's validation gate 5. Same FNV-1a 32-bit | ||||||||||||||||||||||||||
| // inlined hash, same masking on (PartitionCount - 1). MUST be a copy | ||||||||||||||||||||||||||
| // (M3b-3 circular-dep ban: internal/backup cannot import adapter). | ||||||||||||||||||||||||||
| // Returns 0 when partitionFor would: classic queues, perQueue mode, | ||||||||||||||||||||||||||
| // or empty MessageGroupID. | ||||||||||||||||||||||||||
| func partitionForGroup(meta *sqsQueueMetaPublic, messageGroupID string) uint32 { | ||||||||||||||||||||||||||
| if meta == nil || meta.PartitionCount <= 1 { | ||||||||||||||||||||||||||
| return 0 | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| if meta.FifoThroughputLimit == sqsFifoThroughputPerQueue { | ||||||||||||||||||||||||||
| return 0 | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| if messageGroupID == "" { | ||||||||||||||||||||||||||
| return 0 | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| const ( | ||||||||||||||||||||||||||
| fnv32Offset uint32 = 2166136261 | ||||||||||||||||||||||||||
| fnv32Prime uint32 = 16777619 | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
| hash := fnv32Offset | ||||||||||||||||||||||||||
| for i := 0; i < len(messageGroupID); i++ { | ||||||||||||||||||||||||||
| hash ^= uint32(messageGroupID[i]) | ||||||||||||||||||||||||||
| hash *= fnv32Prime | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| // PartitionCount is power-of-two (live validator-enforced). | ||||||||||||||||||||||||||
| return hash & (meta.PartitionCount - 1) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // sortMessagesForPartitionedEmit sorts messages by | ||||||||||||||||||||||||||
| // (partition, send_timestamp_millis, sequence_number, message_id). | ||||||||||||||||||||||||||
| // Partition is the leading key so per-partition state (maxSeq, future | ||||||||||||||||||||||||||
| // counters) is computed deterministically; the remaining three fields | ||||||||||||||||||||||||||
| // match sortMessagesForEmit (sqs.go:842) for the classic path. | ||||||||||||||||||||||||||
| // validatePartitioning ensures Partition is non-nil whenever the | ||||||||||||||||||||||||||
| // caller invokes this fn (PartitionCount > 1), so the *rec.Partition | ||||||||||||||||||||||||||
| // dereference is safe. | ||||||||||||||||||||||||||
| func sortMessagesForPartitionedEmit(msgs []sqsMessageRecord) { | ||||||||||||||||||||||||||
| partitionOf := func(r *sqsMessageRecord) uint32 { | ||||||||||||||||||||||||||
| if r.Partition == nil { | ||||||||||||||||||||||||||
| return 0 | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| return *r.Partition | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| sort.SliceStable(msgs, func(i, j int) bool { | ||||||||||||||||||||||||||
| a, b := &msgs[i], &msgs[j] | ||||||||||||||||||||||||||
| pa, pb := partitionOf(a), partitionOf(b) | ||||||||||||||||||||||||||
| switch { | ||||||||||||||||||||||||||
| case pa != pb: | ||||||||||||||||||||||||||
| return pa < pb | ||||||||||||||||||||||||||
| case a.SendTimestampMillis != b.SendTimestampMillis: | ||||||||||||||||||||||||||
| return a.SendTimestampMillis < b.SendTimestampMillis | ||||||||||||||||||||||||||
| case a.SequenceNumber != b.SequenceNumber: | ||||||||||||||||||||||||||
| return a.SequenceNumber < b.SequenceNumber | ||||||||||||||||||||||||||
| default: | ||||||||||||||||||||||||||
| return a.MessageID < b.MessageID | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // addMessage stages one !sqs|msg|data| record and returns the message's | ||||||||||||||||||||||||||
| // sequence number (for the seq-counter computation). Visibility state is | ||||||||||||||||||||||||||
| // zeroed (every message restores visible), matching the decoder default. | ||||||||||||||||||||||||||
| func (e *SQSRecordEncoder) addMessage(b *snapshotBuilder, queueName string, rec sqsMessageRecord) (uint64, error) { | ||||||||||||||||||||||||||
| // | ||||||||||||||||||||||||||
| // isPartitioned is derived from raw meta.PartitionCount > 1 by the | ||||||||||||||||||||||||||
| // caller, NOT from rec.Partition != nil. The partition uint32 carries | ||||||||||||||||||||||||||
| // the value (0 if the queue is classic). See stageMessageRecords doc | ||||||||||||||||||||||||||
| // for why the key-shape decision MUST be made on the queue-level | ||||||||||||||||||||||||||
| // predicate (codex P1 / gemini critical PR #929). | ||||||||||||||||||||||||||
| func (e *SQSRecordEncoder) addMessage(b *snapshotBuilder, queueName string, isPartitioned bool, partition uint32, rec sqsMessageRecord) (uint64, error) { | ||||||||||||||||||||||||||
| if rec.MessageID == "" { | ||||||||||||||||||||||||||
| return 0, errors.Wrap(ErrSQSEncodeInvalidMessage, "message missing message_id") | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
@@ -265,7 +481,12 @@ func (e *SQSRecordEncoder) addMessage(b *snapshotBuilder, queueName string, rec | |||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||
| return 0, err | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| key := sqsMsgDataKeyBytes(queueName, sqsRestoreGeneration, rec.MessageID) | ||||||||||||||||||||||||||
| var key []byte | ||||||||||||||||||||||||||
| if isPartitioned { | ||||||||||||||||||||||||||
| key = sqsPartitionedMsgDataKeyBytes(queueName, partition, sqsRestoreGeneration, rec.MessageID) | ||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||
| key = sqsMsgDataKeyBytes(queueName, sqsRestoreGeneration, rec.MessageID) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
Comment on lines
+484
to
+489
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Dispatch the key shape based on the explicit
Suggested change
|
||||||||||||||||||||||||||
| if err := b.Add(key, val, 0); err != nil { | ||||||||||||||||||||||||||
| return 0, err | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
@@ -400,3 +621,67 @@ func sqsMsgDataKeyBytes(queueName string, generation uint64, messageID string) [ | |||||||||||||||||||||||||
| out = binary.BigEndian.AppendUint64(out, generation) | ||||||||||||||||||||||||||
| return append(out, encodeSQSSegment(messageID)...) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // sqsPartitionedQueueTerminator mirrors adapter/sqs_keys.go:82 — the | ||||||||||||||||||||||||||
| // literal '|' that terminates the encoded queue segment (and, in | ||||||||||||||||||||||||||
| // dedup keys, the encoded group segment) inside a partitioned key. | ||||||||||||||||||||||||||
| // encodeSQSSegment uses base64.RawURLEncoding which never emits '|', | ||||||||||||||||||||||||||
| // so the terminator is unambiguous. | ||||||||||||||||||||||||||
| const sqsPartitionedQueueTerminator byte = '|' | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // sqsFifoThroughputPerQueue mirrors adapter/sqs_partitioning.go:37 | ||||||||||||||||||||||||||
| // (htfifoThroughputPerQueue). When this is the FifoThroughputLimit | ||||||||||||||||||||||||||
| // value, partitionFor collapses every group to partition 0 | ||||||||||||||||||||||||||
| // regardless of meta.PartitionCount; the encoder uses it for one | ||||||||||||||||||||||||||
| // fail-closed gate (ErrSQSEncodePartitionRoutingMismatch in slice D) | ||||||||||||||||||||||||||
| // and as an effectivePartitionCount input. CANNOT be imported from | ||||||||||||||||||||||||||
| // adapter — htfifoThroughputPerQueue is unexported (M3b-3 circular | ||||||||||||||||||||||||||
| // dependency pattern). | ||||||||||||||||||||||||||
| const sqsFifoThroughputPerQueue = "perQueue" | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // Partitioned key prefixes mirror adapter/sqs_keys.go:91..95. They | ||||||||||||||||||||||||||
| // are the legacy prefixes with the partitioned discriminator "p|" | ||||||||||||||||||||||||||
| // appended. | ||||||||||||||||||||||||||
| const ( | ||||||||||||||||||||||||||
| SQSPartitionedMsgDataPrefix = SQSMsgDataPrefix + sqsPartitionedDiscriminator | ||||||||||||||||||||||||||
| SQSPartitionedMsgVisPrefix = SQSMsgVisPrefix + sqsPartitionedDiscriminator | ||||||||||||||||||||||||||
| SQSPartitionedMsgByAgePrefix = SQSMsgByAgePrefix + sqsPartitionedDiscriminator | ||||||||||||||||||||||||||
| SQSPartitionedMsgDedupPrefix = SQSMsgDedupPrefix + sqsPartitionedDiscriminator | ||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // sqsPartitionedMsgDataKeyBytes reproduces | ||||||||||||||||||||||||||
| // adapter/sqs_keys.go:sqsPartitionedMsgDataKey for a partitioned | ||||||||||||||||||||||||||
| // queue: prefix + base64url(queue) + '|' + BE-u32(partition) + | ||||||||||||||||||||||||||
| // BE-u64(gen) + base64url(messageID). Mirrored locally because | ||||||||||||||||||||||||||
| // internal/backup/ cannot import adapter (M3b-3 pattern). | ||||||||||||||||||||||||||
| func sqsPartitionedMsgDataKeyBytes(queueName string, partition uint32, generation uint64, messageID string) []byte { | ||||||||||||||||||||||||||
| out := []byte(SQSPartitionedMsgDataPrefix) | ||||||||||||||||||||||||||
| out = append(out, encodeSQSSegment(queueName)...) | ||||||||||||||||||||||||||
| out = append(out, sqsPartitionedQueueTerminator) | ||||||||||||||||||||||||||
| out = binary.BigEndian.AppendUint32(out, partition) | ||||||||||||||||||||||||||
| out = binary.BigEndian.AppendUint64(out, generation) | ||||||||||||||||||||||||||
| return append(out, encodeSQSSegment(messageID)...) | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // effectivePartitionCount mirrors adapter/sqs_keys_dispatch.go:121 | ||||||||||||||||||||||||||
| // (which operates on the unexported *adapter.sqsQueueMeta). Returns | ||||||||||||||||||||||||||
| // 1 for nil meta or PartitionCount <= 1, 1 when FifoThroughputLimit | ||||||||||||||||||||||||||
| // == "perQueue" (the live router collapses every group to partition | ||||||||||||||||||||||||||
| // 0 in that mode), otherwise meta.PartitionCount. | ||||||||||||||||||||||||||
| // | ||||||||||||||||||||||||||
| // NOTE: this helper is NOT used by the encoder's validation gates — | ||||||||||||||||||||||||||
| // the M5-3 design pins those gates on raw meta.PartitionCount > 1 | ||||||||||||||||||||||||||
| // (codex P2 v914 v7). Kept here for diagnostics, ReceiveMessage scan | ||||||||||||||||||||||||||
| // fan-out cross-checks, and future symmetry with the adapter; using | ||||||||||||||||||||||||||
| // it in a gate predicate would silently let a perQueue dump with | ||||||||||||||||||||||||||
| // Partition == nil emit classic keys against a partitioned-keyspace | ||||||||||||||||||||||||||
| // queue. | ||||||||||||||||||||||||||
| func effectivePartitionCount(meta *sqsQueueMetaPublic) uint32 { | ||||||||||||||||||||||||||
| if meta == nil || meta.PartitionCount <= 1 { | ||||||||||||||||||||||||||
| return 1 | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| if meta.FifoThroughputLimit == sqsFifoThroughputPerQueue { | ||||||||||||||||||||||||||
| return 1 | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| return meta.PartitionCount | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using
partition != nilas the sole indicator to determine whether to write partitioned keys introduces a critical correctness bug. If a classic queue (PartitionCount <= 1) message has an explicitPartitionfield set to0(which is allowed byvalidatePartitioningOne),records[i].Partitionwill be non-nil. This causesaddMessageandaddSideRecordsto write partitioned keys (!sqs|msg|data|p|...) instead of classic keys (!sqs|msg|data|...). Since the live reader for a classic queue only scans the classic keyspace, these restored messages will be completely invisible, leading to silent data loss on restore.To fix this, decouple the key shape decision from the pointer nullability by passing
isPartitioned bool(derived frommeta.PartitionCount > 1) and the rawpartition uint32value toaddMessageandaddSideRecords.