Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
329 changes: 307 additions & 22 deletions internal/backup/encode_sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"os"
"path/filepath"
"sort"
"strconv"

"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -54,10 +55,39 @@ var (
// ErrSQSEncodeNotRegular is returned when a dump file is not a regular
// file (symlink / FIFO / device / directory).
ErrSQSEncodeNotRegular = errors.New("backup: sqs dump file is not a regular file")
// ErrSQSEncodeUnsupportedPartitioned is returned for HT-FIFO queues
// (partition_count > 1), whose partitioned key family this slice does
// not yet reproduce.
ErrSQSEncodeUnsupportedPartitioned = errors.New("backup: sqs partitioned (HT-FIFO) queue not yet supported by encoder")
// ErrSQSEncodeMissingPartition fires when a partitioned queue
// (meta.PartitionCount > 1) has a message whose Partition field is
// nil. Pre-M5-3 dumps lack the field entirely; replaying one against
// a partitioned queue would silently route every message to
// partition 0 via the `partition != nil` dispatch in addMessage,
// while the live readers scan the partitioned keyspace selected
// from raw PartitionCount > 1 — i.e. classic-shape keys against a
// partitioned reader = invisible on first read. Pinned by
// TestSQSEncodeRejectsMissingPartitionOnPartitionedQueue.
ErrSQSEncodeMissingPartition = errors.New("backup: sqs partitioned queue message missing partition field")
// ErrSQSEncodeOutOfRangePartition fires when a partitioned-queue
// message carries *Partition >= meta.PartitionCount. The dump is
// malformed (operator hand-edit or a future M5-3 decoder bug).
ErrSQSEncodeOutOfRangePartition = errors.New("backup: sqs message partition out of range for queue partition_count")
// ErrSQSEncodePartitionRoutingMismatch fires when a partitioned
// FIFO queue with FifoThroughputLimit == "perQueue" has a message
// with *Partition != 0. The live partitionFor
// (adapter/sqs_partitioning.go:71-72) forces every group to
// partition 0 in perQueue mode regardless of PartitionCount, and
// ReceiveMessage only scans the partition-0 lane. Accepting any
// other partition value would restore messages onto |p|N|... lanes
// the live receive fan-out never visits — silent data loss on
// first read. Codex P2 v914 v4 caught this gap.
ErrSQSEncodePartitionRoutingMismatch = errors.New("backup: sqs perQueue HT-FIFO queue must keep all messages on partition 0")
// ErrSQSEncodePartitionHashMismatch fires for a perMessageGroupId
// HT-FIFO message whose Partition disagrees with
// partitionFor(MessageGroupID). The live send/receive/group-lock
// path uses partitionFor's FNV-1a-mod-PartitionCount mapping
// (adapter/sqs_partitioning.go:64); restoring under the wrong
// lane would split a FIFO group across two partition-scoped
// group-lock keyspaces and break FIFO order on first read. Codex
// P2 #929.
ErrSQSEncodePartitionHashMismatch = errors.New("backup: sqs perMessageGroupId HT-FIFO message partition disagrees with partitionFor(message_group_id)")
)

// sqsStoredQueueMeta mirrors the live adapter's sqsQueueMeta JSON shape
Expand Down Expand Up @@ -159,10 +189,6 @@ func (e *SQSRecordEncoder) encodeQueue(b *snapshotBuilder, root *os.Root, queueD
if meta.Name == "" {
return errors.Wrapf(ErrSQSEncodeInvalidQueue, "%s/_queue.json: empty queue name", queueDir)
}
if meta.PartitionCount > 1 {
return errors.Wrapf(ErrSQSEncodeUnsupportedPartitioned,
"%s: partition_count %d", queueDir, meta.PartitionCount)
}
if err := e.addQueueMeta(b, meta); err != nil {
return err
}
Expand Down Expand Up @@ -205,18 +231,28 @@ func (e *SQSRecordEncoder) encodeQueueMessages(b *snapshotBuilder, root *os.Root
if err != nil {
return err
}
var maxSeq uint64
for i := range records {
seq, err := e.addMessage(b, meta.Name, records[i])
if err != nil {
return err
}
if err := e.addSideRecords(b, meta.Name, &meta, &records[i]); err != nil {
return err
}
if seq > maxSeq {
maxSeq = seq
}
// Fail-closed validation up-front so a malformed dump never stages
// partial records. Uses raw meta.PartitionCount > 1 as the
// partitioned-queue predicate (NOT effectivePartitionCount; codex
// P2 v914 v7 - using the effective count would allow a perQueue
// dump with Partition == nil to slip past the missing-partition
// gate, then addMessage's `partition != nil` dispatch would emit
// classic-shape keys against a partitioned-keyspace queue, making
// every restored message invisible).
if err := validatePartitioning(&meta, records); err != nil {
return err
}
// Per-partition deterministic emission for partitioned queues. The
// snapshotBuilder sorts by key bytes on WriteTo so this does not
// directly affect the .fsm byte output, but it makes the in-loop
// state (maxSeq, future per-partition counters) deterministic and
// matches the design doc's stated contract.
if meta.PartitionCount > 1 {
sortMessagesForPartitionedEmit(records)
}
maxSeq, err := e.stageMessageRecords(b, &meta, records)
if err != nil {
return err
}
if meta.FIFO && maxSeq > 0 {
// The live FIFO send path (adapter/sqs_fifo.go: loadFifoSequence
Expand All @@ -235,10 +271,190 @@ func (e *SQSRecordEncoder) encodeQueueMessages(b *snapshotBuilder, root *os.Root
return nil
}

// stageMessageRecords iterates the (pre-validated, pre-sorted) records
// and stages each one's data + side records on b, returning the
// maximum SequenceNumber observed (used to write the FIFO seq counter).
// Split out of encodeQueueMessages so the parent stays under cyclop.
//
// isPartitioned is derived once from raw meta.PartitionCount > 1 — NOT
// from rec.Partition != nil. Codex P1 / gemini critical (PR #929):
// a classic queue dump (PartitionCount<=1) may carry an explicit
// rec.Partition = &0 that validatePartitioningOne lets through (gate 4
// only rejects *Partition != 0). Routing on `partition != nil` would
// emit partitioned keys for that classic queue, and the live reader
// only scans the classic keyspace - the messages become invisible on
// first read. Keying the dispatch on isPartitioned (a property of the
// QUEUE, not the message) closes that gap.
func (e *SQSRecordEncoder) stageMessageRecords(b *snapshotBuilder, meta *sqsQueueMetaPublic, records []sqsMessageRecord) (uint64, error) {
var maxSeq uint64
isPartitioned := meta.PartitionCount > 1
for i := range records {
var partition uint32
if isPartitioned && records[i].Partition != nil {
partition = *records[i].Partition
}
seq, err := e.addMessage(b, meta.Name, isPartitioned, partition, records[i])
if err != nil {
return 0, err
}
if err := e.addSideRecords(b, meta.Name, isPartitioned, partition, meta, &records[i]); err != nil {
return 0, err
}
if seq > maxSeq {
maxSeq = seq
}
}
return maxSeq, nil
}
Comment on lines +288 to +308
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Using partition != nil as the sole indicator to determine whether to write partitioned keys introduces a critical correctness bug. If a classic queue (PartitionCount <= 1) message has an explicit Partition field set to 0 (which is allowed by validatePartitioningOne), records[i].Partition will be non-nil. This causes addMessage and addSideRecords to write partitioned keys (!sqs|msg|data|p|...) instead of classic keys (!sqs|msg|data|...). Since the live reader for a classic queue only scans the classic keyspace, these restored messages will be completely invisible, leading to silent data loss on restore.

To fix this, decouple the key shape decision from the pointer nullability by passing isPartitioned bool (derived from meta.PartitionCount > 1) and the raw partition uint32 value to addMessage and addSideRecords.

func (e *SQSRecordEncoder) stageMessageRecords(b *snapshotBuilder, meta *sqsQueueMetaPublic, records []sqsMessageRecord) (uint64, error) {
	var maxSeq uint64
	isPartitioned := meta.PartitionCount > 1
	for i := range records {
		var partition uint32
		if isPartitioned && records[i].Partition != nil {
			partition = *records[i].Partition
		}
		seq, err := e.addMessage(b, meta.Name, isPartitioned, partition, records[i])
		if err != nil {
			return 0, err
		}
		if err := e.addSideRecords(b, meta.Name, isPartitioned, partition, meta, &records[i]); err != nil {
			return 0, err
		}
		if seq > maxSeq {
			maxSeq = seq
		}
	}
	return maxSeq, nil
}


// validatePartitioning runs the four fail-closed gates from the M5-3
// design doc §"Validation invariants" before any message is staged.
// All four use raw meta.PartitionCount > 1 as the partitioned-queue
// predicate, never effectivePartitionCount (codex P2 v914 v7).
func validatePartitioning(meta *sqsQueueMetaPublic, records []sqsMessageRecord) error {
for i := range records {
if err := validatePartitioningOne(meta, &records[i]); err != nil {
return err
}
}
return nil
}

// validatePartitioningOne checks one message against the four gates.
// Split out of validatePartitioning so the outer loop stays under the
// cyclop limit.
func validatePartitioningOne(meta *sqsQueueMetaPublic, rec *sqsMessageRecord) error {
if meta.PartitionCount > 1 {
return validatePartitioningPartitioned(meta, rec)
}
// Classic queue (PartitionCount <= 1): the only invalid state is
// a non-zero Partition field — the dump is internally
// inconsistent (classic-queue keyspace has no partition concept).
if rec.Partition != nil && *rec.Partition != 0 {
return errors.Wrapf(ErrSQSEncodeInvalidMessage,
"queue %q (partition_count=%d): classic queue message %q has partition=%d",
meta.Name, meta.PartitionCount, rec.MessageID, *rec.Partition)
}
return nil
}

// validatePartitioningPartitioned runs the three partitioned-queue
// gates (missing partition, out-of-range, perQueue routing mismatch).
// Caller has already verified meta.PartitionCount > 1.
func validatePartitioningPartitioned(meta *sqsQueueMetaPublic, rec *sqsMessageRecord) error {
// Gate 1: missing partition. Pre-M5-3 dump replayed against a
// partitioned queue, or M5-3 decoder bug. The operator must
// re-decode with an M5-3 decoder.
if rec.Partition == nil {
return errors.Wrapf(ErrSQSEncodeMissingPartition,
"queue %q (partition_count=%d): message %q missing partition field",
meta.Name, meta.PartitionCount, rec.MessageID)
}
// Gate 2: out-of-range partition. Dump is malformed.
if *rec.Partition >= meta.PartitionCount {
return errors.Wrapf(ErrSQSEncodeOutOfRangePartition,
"queue %q (partition_count=%d): message %q partition=%d out of range",
meta.Name, meta.PartitionCount, rec.MessageID, *rec.Partition)
}
Comment on lines +354 to +358
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Reject partitions that do not match the message group hash

For perMessageGroupId HT-FIFO dumps, an in-range but wrong partition still passes these gates and is then restored under that wrong lane. Live sends route each MessageGroupId with partitionFor (adapter/sqs_partitioning.go:96-103), while receives/group locks use the candidate lane (adapter/sqs_messages.go:1205-1206, adapter/sqs_fifo.go:292-294), so a malformed or hand-edited dump can split the same FIFO group across two partitioned group-lock keyspaces and allow concurrent/out-of-order delivery after restore. Please mirror the hash check here (except for perQueue, already handled) before emitting the partitioned keys.

Useful? React with 👍 / 👎.

// Gate 3: perQueue HT-FIFO with nonzero partition. The live
// partitionFor collapses every group to partition 0 in perQueue
// mode; accepting any other partition would write to a lane the
// live receive never scans.
if meta.FifoThroughputLimit == sqsFifoThroughputPerQueue {
if *rec.Partition != 0 {
return errors.Wrapf(ErrSQSEncodePartitionRoutingMismatch,
"queue %q (partition_count=%d, fifo_throughput_limit=%q): message %q partition=%d, want 0",
meta.Name, meta.PartitionCount, meta.FifoThroughputLimit, rec.MessageID, *rec.Partition)
}
return nil
}
// Gate 5: perMessageGroupId HT-FIFO partition-hash consistency.
// For perMessageGroupId queues, the live partitionFor maps each
// group_id to one partition via FNV-1a hash; the receivers /
// group-lock reader use that partition value to find messages.
// A dump line with an in-range but inconsistent partition would
// be restored under the wrong lane and split a FIFO group across
// two partition-scoped group-lock keyspaces, allowing concurrent
// out-of-order delivery. Mirror the hash here. Codex P2 #929.
want := partitionForGroup(meta, rec.MessageGroupID)
if *rec.Partition != want {
return errors.Wrapf(ErrSQSEncodePartitionHashMismatch,
"queue %q (partition_count=%d, group_id=%q): message %q partition=%d, partitionFor=%d",
meta.Name, meta.PartitionCount, rec.MessageGroupID, rec.MessageID, *rec.Partition, want)
}
return nil
}

// partitionForGroup mirrors adapter/sqs_partitioning.go:partitionFor
// for use by the encoder's validation gate 5. Same FNV-1a 32-bit
// inlined hash, same masking on (PartitionCount - 1). MUST be a copy
// (M3b-3 circular-dep ban: internal/backup cannot import adapter).
// Returns 0 when partitionFor would: classic queues, perQueue mode,
// or empty MessageGroupID.
func partitionForGroup(meta *sqsQueueMetaPublic, messageGroupID string) uint32 {
if meta == nil || meta.PartitionCount <= 1 {
return 0
}
if meta.FifoThroughputLimit == sqsFifoThroughputPerQueue {
return 0
}
if messageGroupID == "" {
return 0
}
const (
fnv32Offset uint32 = 2166136261
fnv32Prime uint32 = 16777619
)
hash := fnv32Offset
for i := 0; i < len(messageGroupID); i++ {
hash ^= uint32(messageGroupID[i])
hash *= fnv32Prime
}
// PartitionCount is power-of-two (live validator-enforced).
return hash & (meta.PartitionCount - 1)
}

// sortMessagesForPartitionedEmit sorts messages by
// (partition, send_timestamp_millis, sequence_number, message_id).
// Partition is the leading key so per-partition state (maxSeq, future
// counters) is computed deterministically; the remaining three fields
// match sortMessagesForEmit (sqs.go:842) for the classic path.
// validatePartitioning ensures Partition is non-nil whenever the
// caller invokes this fn (PartitionCount > 1), so the *rec.Partition
// dereference is safe.
func sortMessagesForPartitionedEmit(msgs []sqsMessageRecord) {
partitionOf := func(r *sqsMessageRecord) uint32 {
if r.Partition == nil {
return 0
}
return *r.Partition
}
sort.SliceStable(msgs, func(i, j int) bool {
a, b := &msgs[i], &msgs[j]
pa, pb := partitionOf(a), partitionOf(b)
switch {
case pa != pb:
return pa < pb
case a.SendTimestampMillis != b.SendTimestampMillis:
return a.SendTimestampMillis < b.SendTimestampMillis
case a.SequenceNumber != b.SequenceNumber:
return a.SequenceNumber < b.SequenceNumber
default:
return a.MessageID < b.MessageID
}
})
}

// addMessage stages one !sqs|msg|data| record and returns the message's
// sequence number (for the seq-counter computation). Visibility state is
// zeroed (every message restores visible), matching the decoder default.
func (e *SQSRecordEncoder) addMessage(b *snapshotBuilder, queueName string, rec sqsMessageRecord) (uint64, error) {
//
// isPartitioned is derived from raw meta.PartitionCount > 1 by the
// caller, NOT from rec.Partition != nil. The partition uint32 carries
// the value (0 if the queue is classic). See stageMessageRecords doc
// for why the key-shape decision MUST be made on the queue-level
// predicate (codex P1 / gemini critical PR #929).
func (e *SQSRecordEncoder) addMessage(b *snapshotBuilder, queueName string, isPartitioned bool, partition uint32, rec sqsMessageRecord) (uint64, error) {
if rec.MessageID == "" {
return 0, errors.Wrap(ErrSQSEncodeInvalidMessage, "message missing message_id")
}
Expand All @@ -265,7 +481,12 @@ func (e *SQSRecordEncoder) addMessage(b *snapshotBuilder, queueName string, rec
if err != nil {
return 0, err
}
key := sqsMsgDataKeyBytes(queueName, sqsRestoreGeneration, rec.MessageID)
var key []byte
if isPartitioned {
key = sqsPartitionedMsgDataKeyBytes(queueName, partition, sqsRestoreGeneration, rec.MessageID)
} else {
key = sqsMsgDataKeyBytes(queueName, sqsRestoreGeneration, rec.MessageID)
}
Comment on lines +484 to +489
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Dispatch the key shape based on the explicit isPartitioned boolean rather than the pointer nullability.

Suggested change
var key []byte
if partition != nil {
key = sqsPartitionedMsgDataKeyBytes(queueName, *partition, sqsRestoreGeneration, rec.MessageID)
} else {
key = sqsMsgDataKeyBytes(queueName, sqsRestoreGeneration, rec.MessageID)
}
var key []byte
if isPartitioned {
key = sqsPartitionedMsgDataKeyBytes(queueName, partition, sqsRestoreGeneration, rec.MessageID)
} else {
key = sqsMsgDataKeyBytes(queueName, sqsRestoreGeneration, rec.MessageID)
}

if err := b.Add(key, val, 0); err != nil {
return 0, err
}
Expand Down Expand Up @@ -400,3 +621,67 @@ func sqsMsgDataKeyBytes(queueName string, generation uint64, messageID string) [
out = binary.BigEndian.AppendUint64(out, generation)
return append(out, encodeSQSSegment(messageID)...)
}

// sqsPartitionedQueueTerminator mirrors adapter/sqs_keys.go:82 — the
// literal '|' that terminates the encoded queue segment (and, in
// dedup keys, the encoded group segment) inside a partitioned key.
// encodeSQSSegment uses base64.RawURLEncoding which never emits '|',
// so the terminator is unambiguous.
const sqsPartitionedQueueTerminator byte = '|'

// sqsFifoThroughputPerQueue mirrors adapter/sqs_partitioning.go:37
// (htfifoThroughputPerQueue). When this is the FifoThroughputLimit
// value, partitionFor collapses every group to partition 0
// regardless of meta.PartitionCount; the encoder uses it for one
// fail-closed gate (ErrSQSEncodePartitionRoutingMismatch in slice D)
// and as an effectivePartitionCount input. CANNOT be imported from
// adapter — htfifoThroughputPerQueue is unexported (M3b-3 circular
// dependency pattern).
const sqsFifoThroughputPerQueue = "perQueue"

// Partitioned key prefixes mirror adapter/sqs_keys.go:91..95. They
// are the legacy prefixes with the partitioned discriminator "p|"
// appended.
const (
SQSPartitionedMsgDataPrefix = SQSMsgDataPrefix + sqsPartitionedDiscriminator
SQSPartitionedMsgVisPrefix = SQSMsgVisPrefix + sqsPartitionedDiscriminator
SQSPartitionedMsgByAgePrefix = SQSMsgByAgePrefix + sqsPartitionedDiscriminator
SQSPartitionedMsgDedupPrefix = SQSMsgDedupPrefix + sqsPartitionedDiscriminator
)

// sqsPartitionedMsgDataKeyBytes reproduces
// adapter/sqs_keys.go:sqsPartitionedMsgDataKey for a partitioned
// queue: prefix + base64url(queue) + '|' + BE-u32(partition) +
// BE-u64(gen) + base64url(messageID). Mirrored locally because
// internal/backup/ cannot import adapter (M3b-3 pattern).
func sqsPartitionedMsgDataKeyBytes(queueName string, partition uint32, generation uint64, messageID string) []byte {
out := []byte(SQSPartitionedMsgDataPrefix)
out = append(out, encodeSQSSegment(queueName)...)
out = append(out, sqsPartitionedQueueTerminator)
out = binary.BigEndian.AppendUint32(out, partition)
out = binary.BigEndian.AppendUint64(out, generation)
return append(out, encodeSQSSegment(messageID)...)
}

// effectivePartitionCount mirrors adapter/sqs_keys_dispatch.go:121
// (which operates on the unexported *adapter.sqsQueueMeta). Returns
// 1 for nil meta or PartitionCount <= 1, 1 when FifoThroughputLimit
// == "perQueue" (the live router collapses every group to partition
// 0 in that mode), otherwise meta.PartitionCount.
//
// NOTE: this helper is NOT used by the encoder's validation gates —
// the M5-3 design pins those gates on raw meta.PartitionCount > 1
// (codex P2 v914 v7). Kept here for diagnostics, ReceiveMessage scan
// fan-out cross-checks, and future symmetry with the adapter; using
// it in a gate predicate would silently let a perQueue dump with
// Partition == nil emit classic keys against a partitioned-keyspace
// queue.
func effectivePartitionCount(meta *sqsQueueMetaPublic) uint32 {
if meta == nil || meta.PartitionCount <= 1 {
return 1
}
if meta.FifoThroughputLimit == sqsFifoThroughputPerQueue {
return 1
}
return meta.PartitionCount
}
Loading
Loading