
backup: Phase 0b M5-3 impl - SQS partitioned-FIFO reverse encoder#929

Open
bootjp wants to merge 6 commits into main from feat/m5-3-partitioned-impl
@bootjp bootjp commented Jun 4, 2026

Summary

Phase 0b M5-3 implementation — SQS partitioned-FIFO reverse encoder.
The design doc landed via PR #914 (merged 2026-06-03). This PR carries
the implementation that the doc described, in 4 commits matching the
slice plan:

  • Slice A+B (235c13ba): decoder partition plumbing —
    parseSQSMessageDataKey widens to (enc, partition, isPartitioned, err),
    sqsMessageRecord.Partition *uint32, HandleMessageData wires the
    parsed partition.
  • Slice C (6640674a): mirror the four partitioned key constructors
    (sqsPartitionedMsgDataKeyBytes, …Vis…, …ByAge…, …Dedup…) plus
    effectivePartitionCount and the "perQueue" constant. M3b-3
    circular-dep pattern — internal/backup/ cannot import adapter/.
  • Slice D (1037deba): drop ErrSQSEncodeUnsupportedPartitioned, add
    three new sentinels (ErrSQSEncodeMissingPartition,
    ErrSQSEncodeOutOfRangePartition,
    ErrSQSEncodePartitionRoutingMismatch), add validatePartitioning +
    sortMessagesForPartitionedEmit, thread partition *uint32 through
    addMessage and addSideRecords.
  • Slice E (7603bf78): 10 tests — the 8 design-doc rows plus
    TestSQSEncodeGateUsesRawPartitionCount (codex P2 v914 v7 regression)
    and TestSQSEncodePartitionedSortStableAcrossPartitions.

Validation gates

All four use raw meta.PartitionCount > 1 as the partitioned-queue
predicate, never effectivePartitionCount. Codex P2 v914 v7 demonstrated
that effective-count gates would silently allow a perQueue +
PartitionCount=2 + Partition==nil dump to slip past validation;
addMessage's partition != nil check would then fall through to the
classic branch and emit classic-shape keys against a
partitioned-keyspace queue (silent data loss on first read).

| Gate | Predicate | Sentinel |
| --- | --- | --- |
| missing partition | PartitionCount > 1 && Partition == nil | ErrSQSEncodeMissingPartition |
| out-of-range | PartitionCount > 1 && *Partition >= PartitionCount | ErrSQSEncodeOutOfRangePartition |
| classic + nonzero | PartitionCount <= 1 && *Partition != 0 | ErrSQSEncodeInvalidMessage |
| perQueue + nonzero | PartitionCount > 1 && perQueue && *Partition != 0 | ErrSQSEncodePartitionRoutingMismatch |

5-lens self-review

  1. Data loss — All four gates enforce key-shape ↔ predicate
    consistency. Classic-queue tests stay green
    (Partition==nil + PartitionCount<=1 takes legacy path).
  2. Concurrency — Offline encoder; no new goroutines / locks /
    shared state.
  3. Performance — One extra O(N) pre-walk over records. No extra
    allocations. effectivePartitionCount not on the hot path.
  4. Data consistency — Raw PartitionCount > 1 selects key shape in
    both this encoder AND the live adapter/sqs_keys_dispatch.go — they
    agree. Partitioned dedup key includes <group-seg>+|+<dedup-seg>
    (CodeRabbit major PR #732 round 6).
  5. Test coverage — 10 new tests; 4 are explicit gate regressions for
    prior review findings.

Test plan

  • go test ./internal/backup/ passes
  • golangci-lint run --config=.golangci.yaml ./internal/backup/ 0 issues
  • Bot review cycle

bootjp added 4 commits June 4, 2026 21:49
Per design doc PR #914 §'Decoder lift'. Widens the two partitioned-key
parsers to surface the partition u32 they already extract internally,
and adds the sqsMessageRecord.Partition *uint32 field so the parsed
value reaches the dump JSON.

Changes:
- parseSQSPartitionedQueueAndTrailer: (string, error) -> (string, uint32, error).
  The partition was already read off the trailer (binary.BigEndian.Uint32);
  return it instead of discarding.
- parseSQSMessageDataKey: (string, error) -> (string, uint32, bool, error).
  The third return signals whether the key was the partitioned shape
  (so HandleMessageData knows when to attach Partition to the record).
  Classic-key path returns (enc, 0, false, nil) - byte-identical with
  the pre-M5-3 contract.
- parseSQSGenericKey: signature unchanged externally. The new
  partition return from parseSQSPartitionedQueueAndTrailer is
  intentionally discarded - side-record dispatch routes by queue name
  only (HandleSideRecord wraps it). Comment pins the contract.
- HandleMessageData: wires rec.Partition = &partition when
  isPartitioned == true. Classic call site leaves rec.Partition == nil,
  preserving legacy-dump round-trip.
- sqsMessageRecord.Partition *uint32 with omitempty. Pointer (not
  uint32) so partition-0 messages are distinguishable from absent
  (codex P1 v914 v2; design doc decision matrix).

Caller audit (semantic change to two parse fns):
- parseSQSMessageDataKey production callers: HandleMessageData only.
  Updated to 4-tuple in this commit. 3 test sites also updated.
- parseSQSPartitionedQueueAndTrailer production callers:
  parseSQSMessageDataKey + parseSQSGenericKey only. Both updated.
- parseSQSGenericKey signature unchanged - no caller audit needed
  for that symbol.

go test ./internal/backup/ passes.

Per design doc PR #914 §'Encoder lift'. Adds the four partitioned-key
constructors (data, vis, byage, dedup) plus the effectivePartitionCount
helper into internal/backup/, mirroring adapter/sqs_keys.go and
adapter/sqs_keys_dispatch.go.

internal/backup/ cannot import adapter (M3b-3 circular-dep pattern;
see existing sqsFifoDedupWindowMillis mirror at encode_sqs_side.go:15).
The constructors and constants are duplicated locally instead.

New symbols (encode_sqs.go):
- const sqsPartitionedQueueTerminator byte = '|' (mirror of adapter:82)
- const sqsFifoThroughputPerQueue = "perQueue" (mirror of unexported
  htfifoThroughputPerQueue at adapter/sqs_partitioning.go:37)
- const SQSPartitionedMsg{Data,Vis,ByAge,Dedup}Prefix (mirror of adapter
  Sqs* prefixes at adapter/sqs_keys.go:91..95)
- sqsPartitionedMsgDataKeyBytes
- effectivePartitionCount(meta *sqsQueueMetaPublic) uint32 - mirror of
  adapter helper, operating on the dump-side struct. Doc comment
  explicitly pins that this is NOT used by encoder validation gates
  (codex P2 v914 v7) - those use raw meta.PartitionCount > 1.

New symbols (encode_sqs_side.go):
- sqsPartitionedMsgVisKeyBytes
- sqsPartitionedMsgByAgeKeyBytes
- sqsPartitionedMsgDedupKeyBytes - includes the second '|' between the
  variable-length group and dedup segments (CodeRabbit major PR #732
  round 6; design doc §line 19) to prevent FNV-collision false-dups.

Tests (new file encode_sqs_partitioned_test.go):
- TestSQSEncodePartitionedMsg{Data,Vis,ByAge,Dedup}KeyByteShape -
  byte-equality assertions vs manually-assembled expected bytes
  (since direct adapter cross-check is impossible).
- TestSQSEncodePartitionedMsgDedupKeyByteShape also pins the byte at
  the group+dedup terminator offset, so a future change that drops the
  '|' and reintroduces the FNV-collision class fails this test.
- TestSQSEffectivePartitionCount - 6-case table covering nil meta,
  PartitionCount<=1, perQueue collapse, perGroup, and unset throughput.

Pure additions: no behavior change yet. Slice D will wire these into
addMessage / addSideRecords with the validation gates and the new
sentinels.
…gh emit

Per design doc PR #914 §'Encoder lift' and §'Validation invariants'.
Drops ErrSQSEncodeUnsupportedPartitioned and replaces it with the four
fail-closed validation gates from the design's decision matrix, plus
partition-aware dispatch in addMessage / addSideRecords.

Sentinel changes:
- Remove ErrSQSEncodeUnsupportedPartitioned (no longer reachable).
- Add ErrSQSEncodeMissingPartition (partitioned + Partition==nil).
- Add ErrSQSEncodeOutOfRangePartition (*Partition >= PartitionCount).
- Add ErrSQSEncodePartitionRoutingMismatch (perQueue + nonzero
  partition).

Validation: validatePartitioning(meta, records) runs the four gates
before any record is staged. All four use raw meta.PartitionCount > 1
as the partitioned-queue predicate, never effectivePartitionCount
(codex P2 v914 v7 demonstrated that effective-count gates would let
a perQueue dump with Partition==nil emit classic keys against a
partitioned-keyspace queue - silent data loss).
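
The raw-versus-effective distinction can be reproduced in a few lines. This is a sketch under assumptions: effectivePartitionCount below is a guess at the helper's behaviour (collapse to 1 for nil meta, PartitionCount <= 1, or perQueue), and the two missingPartition helpers are hypothetical names for gate 1 written both ways.

```go
package main

import "fmt"

// queueMeta is a hypothetical stand-in for sqsQueueMetaPublic.
type queueMeta struct {
	PartitionCount      uint32
	FifoThroughputLimit string
}

// effectivePartitionCount is an assumed mirror of the adapter helper: it
// collapses classic and perQueue queues to a single routing lane.
func effectivePartitionCount(meta *queueMeta) uint32 {
	if meta == nil || meta.PartitionCount <= 1 || meta.FifoThroughputLimit == "perQueue" {
		return 1
	}
	return meta.PartitionCount
}

// missingPartitionRaw is gate 1 keyed on the raw count, as the commit describes.
func missingPartitionRaw(meta *queueMeta, partition *uint32) bool {
	return meta.PartitionCount > 1 && partition == nil
}

// missingPartitionEffective is the rejected variant keyed on the effective count.
func missingPartitionEffective(meta *queueMeta, partition *uint32) bool {
	return effectivePartitionCount(meta) > 1 && partition == nil
}

func main() {
	// perQueue + PartitionCount=2 + Partition==nil: the case from codex P2 v914 v7.
	meta := &queueMeta{PartitionCount: 2, FifoThroughputLimit: "perQueue"}
	fmt.Println("raw gate rejects:", missingPartitionRaw(meta, nil))
	fmt.Println("effective gate rejects:", missingPartitionEffective(meta, nil))
}
```

Under these assumptions the raw gate rejects the dump while the effective-count gate lets it through, which is the slip the regression test is meant to pin.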

Split into validatePartitioningOne + validatePartitioningPartitioned
to keep cyclomatic complexity under the cyclop limit of 10.

Sort: sortMessagesForPartitionedEmit sorts by (partition, send_ts,
sequence_number, message_id). Classic sortMessagesForEmit at
sqs.go:842 is unchanged (used by the decoder's writeMessagesJSONL).
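
A four-field ordering like the one described can be sketched with sort.SliceStable. The record type and its field names are assumptions; the real sortMessagesForPartitionedEmit may be written differently.

```go
package main

import (
	"fmt"
	"sort"
)

// record is a hypothetical, cut-down message record with only the sort keys.
type record struct {
	Partition           uint32
	SendTimestampMillis int64
	SequenceNumber      uint64
	MessageID           string
}

// sortForPartitionedEmit orders records by
// (partition, send timestamp, sequence number, message ID).
func sortForPartitionedEmit(recs []record) {
	sort.SliceStable(recs, func(i, j int) bool {
		a, b := recs[i], recs[j]
		if a.Partition != b.Partition {
			return a.Partition < b.Partition
		}
		if a.SendTimestampMillis != b.SendTimestampMillis {
			return a.SendTimestampMillis < b.SendTimestampMillis
		}
		if a.SequenceNumber != b.SequenceNumber {
			return a.SequenceNumber < b.SequenceNumber
		}
		return a.MessageID < b.MessageID
	})
}

func main() {
	recs := []record{
		{Partition: 1, SendTimestampMillis: 10, SequenceNumber: 1, MessageID: "c"},
		{Partition: 0, SendTimestampMillis: 20, SequenceNumber: 2, MessageID: "b"},
		{Partition: 0, SendTimestampMillis: 20, SequenceNumber: 1, MessageID: "a"},
		{Partition: 0, SendTimestampMillis: 5, SequenceNumber: 9, MessageID: "d"},
	}
	sortForPartitionedEmit(recs)
	for _, r := range recs {
		fmt.Println(r.Partition, r.SendTimestampMillis, r.SequenceNumber, r.MessageID)
	}
}
```

Because partition is the most significant key, an older message on a higher partition still sorts after every message on a lower one.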

addMessage signature: (b, queueName, partition *uint32, rec) - new
partition param selects sqsPartitionedMsgDataKeyBytes when non-nil,
classic sqsMsgDataKeyBytes when nil.

addSideRecords signature: (b, queueName, partition *uint32, meta, rec)
- new partition param selects the partitioned (vis, byage, dedup)
constructors when non-nil; dedup additionally takes rec.MessageGroupID
for the partitioned <group-seg>+|+<dedup-seg> shape.

Extracted stageMessageRecords from encodeQueueMessages to keep that
function under cyclop as well.

Caller audit (semantic changes - confirmed single-site each):
- addMessage: encodeQueueMessages only (now via stageMessageRecords).
- addSideRecords: encodeQueueMessages only (now via stageMessageRecords).
- validatePartitioning: encodeQueueMessages only.
- sortMessagesForPartitionedEmit: encodeQueueMessages only.
- ErrSQSEncodeUnsupportedPartitioned production site at the old
  encode_sqs.go:163 deleted. The one test
  (TestSQSEncodeRejectsPartitioned) is removed in this commit; the
  four new gate tests land in Slice E.

go test + golangci-lint on ./internal/backup/ both pass.

Classic-queue tests remain green: Partition==nil + PartitionCount<=1
takes the legacy paths unchanged in both addMessage and
addSideRecords.

Per design doc PR #914 §'Test plan'. Adds the design's eight test rows
plus two regression tests pinning the codex P2 v914 v7 fix and the
4-field sort stability.

Tests added (encode_sqs_partitioned_test.go):
- TestSQSEncodeRejectsMissingPartitionOnPartitionedQueue - gate 1
  (codex P1 v914 v2).
- TestSQSEncodeRejectsOutOfRangePartition - gate 2.
- TestSQSEncodeRejectsNonzeroPartitionOnPerQueueHTFIFO - gate 3
  (codex P2 v914 v4).
- TestSQSEncodeRejectsNonZeroPartitionOnClassicQueue - gate 4.
- TestSQSEncodeGateUsesRawPartitionCount - codex P2 v914 v7
  regression. Pins that gates use raw meta.PartitionCount > 1 rather
  than effectivePartitionCount, which would otherwise let a perQueue
  + PartitionCount=2 + Partition==nil dump slip past validation and
  silently emit classic keys against a partitioned-keyspace queue.
- TestSQSEncodeLegacyDumpsWithoutPartitionStillRoundTrip - pre-M5-3
  classic dumps still round-trip with Partition==nil throughout.
- TestSQSEncodePartitionedQueueRoundTrip - 3-message partitioned
  queue emits data/vis/byage/dedup with |p| prefix; no classic-shape
  data/vis/byage/dedup keys leak.
- TestSQSEncodePartitionedDedupBuildsGroupSegment - partitioned dedup
  key shape matches sqsPartitionedMsgDedupKeyBytes byte-for-byte.
- TestSQSEncodePartitionedSideRecordsByteCrossCheck - vis + byage +
  dedup all byte-equal the constructors (M5-2-style in-package
  cross-check; adapter constructors are unexported).
- TestSQSEncodePartitionedSortStableAcrossPartitions - 4-field sort
  groups by partition first, then send_ts, then sequence_number.

Constructor adjustment (encode_sqs_side.go):
- sqsPartitionedMsgDedupKeyBytes: drop the gen parameter (always
  sqsRestoreGeneration on the reverse-encode path). Mirrors the
  classic sqsMsgDedupKeyBytes hardcode pattern and satisfies the
  unparam lint. Comment updated to explain the divergence from the
  adapter-side variable-gen signature.

go test + golangci-lint clean on ./internal/backup/.

coderabbitai Bot commented Jun 4, 2026

Warning

Review limit reached

@bootjp, we couldn't start this review because you've reached your PR review rate limit.

More reviews will be available in 44 minutes and 31 seconds. Learn how PR review limits work.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: ed2f6066-86e6-4c8a-b6ab-de6560dd8721

📥 Commits

Reviewing files that changed from the base of the PR and between 0ed19ec and 0460e28.

📒 Files selected for processing (6)
  • internal/backup/encode_sqs.go
  • internal/backup/encode_sqs_partitioned_test.go
  • internal/backup/encode_sqs_side.go
  • internal/backup/encode_sqs_test.go
  • internal/backup/sqs.go
  • internal/backup/sqs_test.go

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 7603bf7877

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread internal/backup/encode_sqs.go Outdated
func (e *SQSRecordEncoder) stageMessageRecords(b *snapshotBuilder, meta *sqsQueueMetaPublic, records []sqsMessageRecord) (uint64, error) {
	var maxSeq uint64
	for i := range records {
		seq, err := e.addMessage(b, meta.Name, records[i].Partition, records[i])

P1 Badge Avoid emitting partitioned keys for classic partition 0

When a classic queue dump (partition_count <= 1) includes an explicit "partition": 0, validatePartitioningOne allows it, but this unconditional records[i].Partition argument makes addMessage/addSideRecords choose the partitioned |p| key constructors. The restored queue metadata still selects the classic keyspace, so those messages and indexes are written where the live readers for that queue will not scan them; either reject any non-nil partition on classic queues or pass nil unless meta.PartitionCount > 1.


@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request implements support for partitioned SQS queues in the backup encoder and decoder, introducing validation gates for partitioning invariants, deterministic message sorting, and partitioned key-generation helpers. The code review highlights a critical correctness bug where relying solely on the nullability of the partition pointer to determine the key shape can cause classic queues with an explicit partition of 0 to write partitioned keys, leading to silent data loss on restore. The reviewer suggests resolving this by explicitly passing an isPartitioned boolean and the raw partition value to addMessage and addSideRecords.


Comment on lines +269 to +284
func (e *SQSRecordEncoder) stageMessageRecords(b *snapshotBuilder, meta *sqsQueueMetaPublic, records []sqsMessageRecord) (uint64, error) {
	var maxSeq uint64
	for i := range records {
		seq, err := e.addMessage(b, meta.Name, records[i].Partition, records[i])
		if err != nil {
			return 0, err
		}
		if err := e.addSideRecords(b, meta.Name, records[i].Partition, meta, &records[i]); err != nil {
			return 0, err
		}
		if seq > maxSeq {
			maxSeq = seq
		}
	}
	return maxSeq, nil
}

critical

Using partition != nil as the sole indicator to determine whether to write partitioned keys introduces a critical correctness bug. If a classic queue (PartitionCount <= 1) message has an explicit Partition field set to 0 (which is allowed by validatePartitioningOne), records[i].Partition will be non-nil. This causes addMessage and addSideRecords to write partitioned keys (!sqs|msg|data|p|...) instead of classic keys (!sqs|msg|data|...). Since the live reader for a classic queue only scans the classic keyspace, these restored messages will be completely invisible, leading to silent data loss on restore.

To fix this, decouple the key shape decision from the pointer nullability by passing isPartitioned bool (derived from meta.PartitionCount > 1) and the raw partition uint32 value to addMessage and addSideRecords.

func (e *SQSRecordEncoder) stageMessageRecords(b *snapshotBuilder, meta *sqsQueueMetaPublic, records []sqsMessageRecord) (uint64, error) {
	var maxSeq uint64
	isPartitioned := meta.PartitionCount > 1
	for i := range records {
		var partition uint32
		if isPartitioned && records[i].Partition != nil {
			partition = *records[i].Partition
		}
		seq, err := e.addMessage(b, meta.Name, isPartitioned, partition, records[i])
		if err != nil {
			return 0, err
		}
		if err := e.addSideRecords(b, meta.Name, isPartitioned, partition, meta, &records[i]); err != nil {
			return 0, err
		}
		if seq > maxSeq {
			maxSeq = seq
		}
	}
	return maxSeq, nil
}

Comment thread internal/backup/encode_sqs.go Outdated
// partition is non-nil only when the queue is partitioned
// (meta.PartitionCount > 1); validatePartitioning enforces this
// invariant before encodeQueueMessages invokes addMessage.
func (e *SQSRecordEncoder) addMessage(b *snapshotBuilder, queueName string, partition *uint32, rec sqsMessageRecord) (uint64, error) {

critical

Update the addMessage signature to accept isPartitioned bool and partition uint32 instead of partition *uint32 to prevent the classic-queue partitioned key generation bug.

Suggested change
- func (e *SQSRecordEncoder) addMessage(b *snapshotBuilder, queueName string, partition *uint32, rec sqsMessageRecord) (uint64, error) {
+ func (e *SQSRecordEncoder) addMessage(b *snapshotBuilder, queueName string, isPartitioned bool, partition uint32, rec sqsMessageRecord) (uint64, error) {

Comment on lines +411 to +416
var key []byte
if partition != nil {
	key = sqsPartitionedMsgDataKeyBytes(queueName, *partition, sqsRestoreGeneration, rec.MessageID)
} else {
	key = sqsMsgDataKeyBytes(queueName, sqsRestoreGeneration, rec.MessageID)
}

critical

Dispatch the key shape based on the explicit isPartitioned boolean rather than the pointer nullability.

Suggested change
- var key []byte
- if partition != nil {
- 	key = sqsPartitionedMsgDataKeyBytes(queueName, *partition, sqsRestoreGeneration, rec.MessageID)
- } else {
- 	key = sqsMsgDataKeyBytes(queueName, sqsRestoreGeneration, rec.MessageID)
- }
+ var key []byte
+ if isPartitioned {
+ 	key = sqsPartitionedMsgDataKeyBytes(queueName, partition, sqsRestoreGeneration, rec.MessageID)
+ } else {
+ 	key = sqsMsgDataKeyBytes(queueName, sqsRestoreGeneration, rec.MessageID)
+ }

Comment thread internal/backup/encode_sqs_side.go Outdated
// SendTimestampMs + sqsFifoDedupWindowMillis. CBD queues get a SHA-256
// derived dedup-id (matches adapter/sqs_fifo.go resolveFifoDedupID).
- func (e *SQSRecordEncoder) addSideRecords(b *snapshotBuilder, queueName string, meta *sqsQueueMetaPublic, rec *sqsMessageRecord) error {
+ func (e *SQSRecordEncoder) addSideRecords(b *snapshotBuilder, queueName string, partition *uint32, meta *sqsQueueMetaPublic, rec *sqsMessageRecord) error {

critical

Update the addSideRecords signature to accept isPartitioned bool and partition uint32 instead of partition *uint32 to prevent writing partitioned side records for classic queues.

Suggested change
- func (e *SQSRecordEncoder) addSideRecords(b *snapshotBuilder, queueName string, partition *uint32, meta *sqsQueueMetaPublic, rec *sqsMessageRecord) error {
+ func (e *SQSRecordEncoder) addSideRecords(b *snapshotBuilder, queueName string, isPartitioned bool, partition uint32, meta *sqsQueueMetaPublic, rec *sqsMessageRecord) error {

Comment on lines +190 to +197
var visKey, byAgeKey []byte
if partition != nil {
	visKey = sqsPartitionedMsgVisKeyBytes(queueName, *partition, sqsRestoreGeneration, rec.AvailableAtMillis, rec.MessageID)
	byAgeKey = sqsPartitionedMsgByAgeKeyBytes(queueName, *partition, sqsRestoreGeneration, rec.SendTimestampMillis, rec.MessageID)
} else {
	visKey = sqsMsgVisKeyBytes(queueName, sqsRestoreGeneration, rec.AvailableAtMillis, rec.MessageID)
	byAgeKey = sqsMsgByAgeKeyBytes(queueName, sqsRestoreGeneration, rec.SendTimestampMillis, rec.MessageID)
}

critical

Dispatch the visibility and age index key shapes based on the explicit isPartitioned boolean.

Suggested change
- var visKey, byAgeKey []byte
- if partition != nil {
- 	visKey = sqsPartitionedMsgVisKeyBytes(queueName, *partition, sqsRestoreGeneration, rec.AvailableAtMillis, rec.MessageID)
- 	byAgeKey = sqsPartitionedMsgByAgeKeyBytes(queueName, *partition, sqsRestoreGeneration, rec.SendTimestampMillis, rec.MessageID)
- } else {
- 	visKey = sqsMsgVisKeyBytes(queueName, sqsRestoreGeneration, rec.AvailableAtMillis, rec.MessageID)
- 	byAgeKey = sqsMsgByAgeKeyBytes(queueName, sqsRestoreGeneration, rec.SendTimestampMillis, rec.MessageID)
- }
+ var visKey, byAgeKey []byte
+ if isPartitioned {
+ 	visKey = sqsPartitionedMsgVisKeyBytes(queueName, partition, sqsRestoreGeneration, rec.AvailableAtMillis, rec.MessageID)
+ 	byAgeKey = sqsPartitionedMsgByAgeKeyBytes(queueName, partition, sqsRestoreGeneration, rec.SendTimestampMillis, rec.MessageID)
+ } else {
+ 	visKey = sqsMsgVisKeyBytes(queueName, sqsRestoreGeneration, rec.AvailableAtMillis, rec.MessageID)
+ 	byAgeKey = sqsMsgByAgeKeyBytes(queueName, sqsRestoreGeneration, rec.SendTimestampMillis, rec.MessageID)
+ }

Comment on lines +222 to +232
var dedupKey []byte
if partition != nil {
	// Partitioned dedup includes <group-seg> + '|' + <dedup-seg>
	// per the partitioned dedup constructor; see
	// sqsPartitionedMsgDedupKeyBytes for the rationale (CodeRabbit
	// major PR #732 round 6: the second '|' prevents FNV-collapse
	// across distinct (groupID, dedupID) pairs on the same partition).
	dedupKey = sqsPartitionedMsgDedupKeyBytes(queueName, *partition, rec.MessageGroupID, dedupID)
} else {
	dedupKey = sqsMsgDedupKeyBytes(queueName, dedupID)
}

critical

Dispatch the deduplication key shape based on the explicit isPartitioned boolean.

Suggested change
- var dedupKey []byte
- if partition != nil {
- 	// Partitioned dedup includes <group-seg> + '|' + <dedup-seg>
- 	// per the partitioned dedup constructor; see
- 	// sqsPartitionedMsgDedupKeyBytes for the rationale (CodeRabbit
- 	// major PR #732 round 6: the second '|' prevents FNV-collapse
- 	// across distinct (groupID, dedupID) pairs on the same partition).
- 	dedupKey = sqsPartitionedMsgDedupKeyBytes(queueName, *partition, rec.MessageGroupID, dedupID)
- } else {
- 	dedupKey = sqsMsgDedupKeyBytes(queueName, dedupID)
- }
+ var dedupKey []byte
+ if isPartitioned {
+ 	// Partitioned dedup includes <group-seg> + '|' + <dedup-seg>
+ 	// per the partitioned dedup constructor; see
+ 	// sqsPartitionedMsgDedupKeyBytes for the rationale (CodeRabbit
+ 	// major PR #732 round 6: the second '|' prevents FNV-collapse
+ 	// across distinct (groupID, dedupID) pairs on the same partition).
+ 	dedupKey = sqsPartitionedMsgDedupKeyBytes(queueName, partition, rec.MessageGroupID, dedupID)
+ } else {
+ 	dedupKey = sqsMsgDedupKeyBytes(queueName, dedupID)
+ }

… (codex P1 #929 + gemini critical)

Bug: a classic queue dump (PartitionCount<=1) with an explicit
rec.Partition=&0 silently emitted partitioned-shape keys
(!sqs|msg|data|p|0|...). validatePartitioningOne gate 4 lets *Partition==0
through, but addMessage/addSideRecords routed on 'partition != nil'
and the live reader for a classic queue only scans the classic
keyspace - the messages become invisible on first read (silent data
loss).

Root cause: the key-shape decision was tied to a per-MESSAGE
property (rec.Partition != nil), but the keyspace selection is a
per-QUEUE property (meta.PartitionCount > 1). They CAN disagree.

Fix: introduce (isPartitioned bool, partition uint32) signatures for
addMessage and addSideRecords, derived once from raw
meta.PartitionCount > 1 in stageMessageRecords. Dispatch branches now
key on isPartitioned, not pointer nullability.

Caller audit (signature change, semantic-changing):
- addMessage callers: stageMessageRecords only (single site, updated).
- addSideRecords callers: stageMessageRecords only (single site,
  updated).
- rec.Partition retains its *uint32 type and gate 4 invariant; the
  encoder simply ignores Partition when isPartitioned==false rather
  than reading the value off the message.

Regression test:
TestSQSEncodeClassicQueueWithExplicitPartitionZeroUsesClassicKeys
sends a classic queue with explicit 'partition: 0' through the
encoder and verifies no |p| keys leak. Would fail under the buggy
dispatch.

Addresses codex P1 #929 and 6 gemini critical findings (line 284 +
384 + 416 in encode_sqs.go; line 187 + 197 + 232 in
encode_sqs_side.go) — all the same root cause.

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 1a5540ea17


Comment on lines +345 to +349
if *rec.Partition >= meta.PartitionCount {
	return errors.Wrapf(ErrSQSEncodeOutOfRangePartition,
		"queue %q (partition_count=%d): message %q partition=%d out of range",
		meta.Name, meta.PartitionCount, rec.MessageID, *rec.Partition)
}

P2 Badge Reject partitions that do not match the message group hash

For perMessageGroupId HT-FIFO dumps, an in-range but wrong partition still passes these gates and is then restored under that wrong lane. Live sends route each MessageGroupId with partitionFor (adapter/sqs_partitioning.go:96-103), while receives/group locks use the candidate lane (adapter/sqs_messages.go:1205-1206, adapter/sqs_fifo.go:292-294), so a malformed or hand-edited dump can split the same FIFO group across two partitioned group-lock keyspaces and allow concurrent/out-of-order delivery after restore. Please mirror the hash check here (except for perQueue, already handled) before emitting the partitioned keys.


bootjp commented Jun 4, 2026

@claude review

Addresses codex P1 + 6 gemini critical findings (same root cause):

A classic queue dump (PartitionCount<=1) with an explicit Partition=&0 was silently emitting partitioned-shape keys (!sqs|msg|data|p|0|...). Gate 4 of validatePartitioningOne allowed *Partition==0, but addMessage / addSideRecords dispatched on partition != nil — the message-level pointer rather than the queue-level meta.PartitionCount > 1. Live readers for a classic queue only scan the classic keyspace, so restored messages would have been invisible.

Fix: addMessage and addSideRecords now take (isPartitioned bool, partition uint32) instead of partition *uint32. stageMessageRecords derives isPartitioned from raw meta.PartitionCount > 1 once per queue.

Caller audit: both functions have a single call site (stageMessageRecords), updated. rec.Partition *uint32 shape preserved; the encoder simply ignores it when isPartitioned == false.

Regression pinned by TestSQSEncodeClassicQueueWithExplicitPartitionZeroUsesClassicKeys — would fail under the buggy dispatch.


claude Bot commented Jun 4, 2026

Claude finished @bootjp's task in 2m 15s


Code Review — commit 1a5540e

  • Load tools
  • Read changed files
  • Verify fix correctness
  • Check regression test coverage
  • Review remaining open issue (Codex P2)

P1 / Gemini critical fix — LGTM ✓

The root cause is correctly diagnosed and the fix is sound. The key insight is that the key-shape decision belongs to the queue (meta.PartitionCount > 1), not the message (rec.Partition != nil). Gate 4 of validatePartitioningOne intentionally allows classic-queue messages with *Partition == 0 through (field present but semantically no-op), so an isPartitioned predicate derived from the message pointer was always the wrong signal.

Trace through the fixed path:

| Input | isPartitioned | partition | Key emitted |
| --- | --- | --- | --- |
| classic, Partition==nil | false | 0 (unused) | !sqs\|msg\|data\|... |
| classic, *Partition==0 (was buggy) | false | 0 (unused) | !sqs\|msg\|data\|... ✓ (fixed) |
| partitioned, *Partition==1 | true | 1 | !sqs\|msg\|data\|p\|...\|1... |

The same isPartitioned signal correctly threads through addSideRecords (vis, byage, dedup keys).

One defensive note (encode_sqs.go:284):

var partition uint32
if isPartitioned && records[i].Partition != nil {
    partition = *records[i].Partition
}

The records[i].Partition == nil branch inside isPartitioned=true is unreachable — validatePartitioning gate 1 (ErrSQSEncodeMissingPartition) would have already aborted. The defensive nil guard is safe, but consider adding an inline comment that clarifies it's a belt-and-suspenders guard against future callers that bypass validatePartitioning.


Regression test coverage — Solid ✓

| Test | What it pins |
| --- | --- |
| TestSQSEncodeClassicQueueWithExplicitPartitionZeroUsesClassicKeys | The exact bug: verifies no ` |
| TestSQSEncodeRejectsMissingPartitionOnPartitionedQueue | Gate 1: nil partition on partitioned queue |
| TestSQSEncodeRejectsOutOfRangePartition | Gate 2: *Partition >= PartitionCount |
| TestSQSEncodeRejectsNonzeroPartitionOnPerQueueHTFIFO | Gate 3: perQueue with *Partition != 0 |
| TestSQSEncodeRejectsNonZeroPartitionOnClassicQueue | Gate 4: classic with non-zero partition |
| TestSQSEncodeGateUsesRawPartitionCount | Pins that effective-count is never used in gates |
| TestSQSEncodePartitioned{Data,Vis,ByAge,Dedup}KeyByteShape | Wire-format byte assertions |
| TestSQSEncodePartitionedSortStableAcrossPartitions | Partition-major sort ordering |
| TestSQSEncodeLegacyDumpsWithoutPartitionStillRoundTrip | Backwards compat: pre-M5-3 dumps still round-trip |

The TestSQSEncodePartitionedQueueRoundTrip test verifies no classic-shape keys bleed through for a partitioned queue, and TestSQSEncodePartitionedSideRecordsByteCrossCheck cross-checks all three side-record key shapes against their constructors.


Codex P2 — still open (not claimed by this PR)

The Codex P2 finding at encode_sqs.go:349 (13:31 review) is still unresolved: for perMessageGroupId HT-FIFO queues, a message whose *Partition is in-range but wrong for its MessageGroupID passes all four gates and restores to the wrong partition lane. Live sends hash MessageGroupID via partitionFor (adapter/sqs_partitioning.go:96-103); ReceiveMessage and group locks use the same lane, so a malformed dump can scatter the same FIFO group across two partitioned group-lock keyspaces and allow concurrent or out-of-order delivery after restore.

This is a separate concern from the P1 (which this commit correctly fixes), but it's worth tracking before merge. Mirroring partitionFor locally would hit the usual M3b-3 circular-dep constraint — the hash function and the perMessageGroupId constant would need to be reproduced in internal/backup/ the same way sqsFifoThroughputPerQueue was. A fifth gate like *Partition != partitionFor(rec.MessageGroupID, meta.PartitionCount) && meta.FifoThroughputLimit == "perMessageGroupId" would close it.

Bottom line: P1 fix is correct and complete. The isPartitioned bool decoupling properly closes the silent-data-loss path on classic queues with explicit Partition=&0. Test coverage is thorough. P2 hash validation for perMessageGroupId queues warrants a follow-up before this merges.

…y (codex P2 #929)

For perMessageGroupId HT-FIFO queues the live partitionFor maps each
MessageGroupId to one partition via FNV-1a-mod-PartitionCount
(adapter/sqs_partitioning.go:64). Receivers and group-lock readers
use partitionFor's mapping to locate messages and lock state. An
in-range but inconsistent partition in a dump would restore under
the wrong lane, splitting a FIFO group across two partition-scoped
group-lock keyspaces and allowing concurrent out-of-order delivery
on first read.

Gate 5 mirrors partitionFor inside the encoder and rejects any
perMessageGroupId message whose dump-recorded partition disagrees
with the recomputed hash.

New: partitionForGroup(meta, messageGroupID) - FNV-1a-32 mirror of
adapter/sqs_partitioning.go's inlined hash, masked on PartitionCount-1
(power-of-two validator-enforced live). Returns 0 for classic
queues, perQueue, or empty MessageGroupID.

New sentinel: ErrSQSEncodePartitionHashMismatch.

Caller audit (validation gate addition - widens fail-closed surface):
- validatePartitioningPartitioned: the single caller chain is
  validatePartitioning -> validatePartitioningOne -> ...Partitioned.
  Only invoked from stageMessageRecords. No production callers
  changed.
- partitionForGroup: new helper, callers are validation + tests only.

Test fixture realignment: TestSQSEncodePartitionedSideRecordsByteCrossCheck
used group_id='g0' (hashes to partition 0) with partition=1, which
would now correctly fail gate 5. Changed to group_id='g1' (hashes to
1) to keep the partition=1 byte-shape coverage and remain consistent
with the live hash.

New regression tests:
- TestSQSEncodeRejectsHashMismatchOnPerMessageGroupId - g0+partition=1
  fails closed with ErrSQSEncodePartitionHashMismatch.
- TestSQSEncodePartitionForGroup_LiveAdapterParity - 5-case table
  verifying the mirror matches the live FNV-1a-mod-N spec for
  classic / perQueue / empty group / hash-mod-2 cases.
@bootjp
Owner Author

bootjp commented Jun 4, 2026

@claude review

Addresses codex P2 #929 (encode_sqs.go:349):

For perMessageGroupId HT-FIFO queues, the live router maps each MessageGroupId to one partition via FNV-1a hash (adapter/sqs_partitioning.go:64). An in-range but inconsistent partition would split a FIFO group across two partition-scoped group-lock keyspaces — out-of-order delivery on first read.

Gate 5 added (ErrSQSEncodePartitionHashMismatch): for perMessageGroupId queues, the encoder mirrors partitionFor via new partitionForGroup helper (FNV-1a-32 + mask on PartitionCount-1) and rejects any message where *rec.Partition != partitionForGroup(meta, rec.MessageGroupID).

Caller audit: gate addition only widens fail-closed surface; validatePartitioningPartitioned is invoked from a single chain (stageMessageRecords). partitionForGroup is a new helper, used only by gate 5 + tests.

Regression: TestSQSEncodeRejectsHashMismatchOnPerMessageGroupId + TestSQSEncodePartitionForGroup_LiveAdapterParity (5-case parity table). Realigned TestSQSEncodePartitionedSideRecordsByteCrossCheck fixture (g0→g1) so its partition value is consistent with the live hash.

@claude

claude Bot commented Jun 4, 2026

Claude encountered an error after 2m 45s


Code Review — Codex P2 fix: Gate 5 (ErrSQSEncodePartitionHashMismatch)

  • Load tools
  • Read changed files (encode_sqs.go, encode_sqs_partitioned_test.go, encode_sqs_side.go, sqs.go)
  • Verify partitionForGroup hash parity with live adapter/sqs_partitioning.go:64
  • Review Gate 5 predicate correctness
  • Check regression test coverage
  • Review fixture realignment in TestSQSEncodePartitionedSideRecordsByteCrossCheck

Hash parity — CONFIRMED ✓

partitionForGroup (encode_sqs.go:394) is a byte-for-byte mirror of the live partitionFor (adapter/sqs_partitioning.go:64):

| Condition | Live partitionFor | partitionForGroup |
| --- | --- | --- |
| meta == nil | return 0 | return 0 |
| PartitionCount <= 1 | return 0 | return 0 |
| FifoThroughputLimit == "perQueue" | return 0 | return 0 |
| messageGroupID == "" | return 0 | return 0 |
| Hash constants | fnv32Offset=2166136261, fnv32Prime=16777619 | identical |
| Mask | hash & (meta.PartitionCount - 1) | identical |

Both use inlined FNV-1a 32-bit over raw bytes (not via hash/fnv.New32a) and the same power-of-two mask. This means any partitioned queue that was live when the backup was taken will produce the same routing result at restore time.
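
For readers without the diff open, the behaviour in the parity table can be sketched like this. It is a reconstruction from the table rows, not a copy of encode_sqs.go: the `queueMeta` struct is a hypothetical stand-in, and only the constants, the early returns, and the mask are taken from the review above.

```go
package main

import "fmt"

// FNV-1a 32-bit parameters, as listed in the parity table.
const (
	fnv32Offset uint32 = 2166136261
	fnv32Prime  uint32 = 16777619
)

// queueMeta is a hypothetical stand-in for the encoder's queue metadata.
type queueMeta struct {
	PartitionCount      uint32
	FifoThroughputLimit string
}

// partitionForGroup returns 0 for every non-routed case, otherwise the
// FNV-1a hash of the group ID masked on PartitionCount-1. The mask assumes
// PartitionCount is a power of two, which the live validator enforces.
func partitionForGroup(meta *queueMeta, messageGroupID string) uint32 {
	if meta == nil || meta.PartitionCount <= 1 {
		return 0
	}
	if meta.FifoThroughputLimit == "perQueue" || messageGroupID == "" {
		return 0
	}
	h := fnv32Offset
	for i := 0; i < len(messageGroupID); i++ {
		h ^= uint32(messageGroupID[i]) // XOR first, then multiply: the "1a" order
		h *= fnv32Prime
	}
	return h & (meta.PartitionCount - 1)
}

func main() {
	meta := &queueMeta{PartitionCount: 2, FifoThroughputLimit: "perMessageGroupId"}
	fmt.Println(partitionForGroup(meta, "g0"), partitionForGroup(meta, "g1"))
}
```

With PartitionCount=2 this yields lane 0 for "g0" and lane 1 for "g1", matching the fixture realignment described in the commit message.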


Gate 5 placement — CORRECT ✓

Gate 5 sits after gate 3 in validatePartitioningPartitioned (encode_sqs.go:344). The ordering is correct:

  1. Gate 1 (nil Partition) short-circuits before pointer dereference
  2. Gate 2 (out-of-range) bounds the value before the hash check
  3. Gate 3 (perQueue) does an early return nil — avoids computing a hash that perQueue routing would never use
  4. Gate 5 fires for everything else: both explicit "perMessageGroupId" and any other non-perQueue FifoThroughputLimit
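
The ordering above can be sketched as a single function. This is an illustrative sketch under assumptions: the lowercase error names are hypothetical stand-ins for the PR's sentinels, the mapping of gate 3 to a routing-mismatch error is inferred from the sentinel names, and the hash uses the standard library's `hash/fnv` where the PR inlines it.

```go
package main

import (
	"errors"
	"fmt"
	"hash/fnv"
)

// Hypothetical stand-ins for the PR's sentinel errors.
var (
	errMissing         = errors.New("partitioned queue: partition is nil")
	errOutOfRange      = errors.New("partition >= PartitionCount")
	errRoutingMismatch = errors.New("perQueue queue: partition must be 0")
	errHashMismatch    = errors.New("partition disagrees with group hash")
)

// hashLane is the FNV-1a-32 lane for a group; it assumes a power-of-two count.
func hashLane(groupID string, partitionCount uint32) uint32 {
	if groupID == "" {
		return 0
	}
	h := fnv.New32a()
	h.Write([]byte(groupID)) // hash.Hash writes never return an error
	return h.Sum32() & (partitionCount - 1)
}

// validatePartitioned applies gates 1, 2, 3, and 5 in the reviewed order.
func validatePartitioned(partitionCount uint32, limit string, partition *uint32, groupID string) error {
	if partition == nil { // gate 1: checked before any dereference
		return errMissing
	}
	if *partition >= partitionCount { // gate 2: bound the value first
		return errOutOfRange
	}
	if limit == "perQueue" { // gate 3: early exit, no hash computed
		if *partition != 0 {
			return errRoutingMismatch
		}
		return nil
	}
	if *partition != hashLane(groupID, partitionCount) { // gate 5
		return errHashMismatch
	}
	return nil
}

func main() {
	one := uint32(1)
	fmt.Println(validatePartitioned(2, "perMessageGroupId", &one, "g0"))
	fmt.Println(validatePartitioned(2, "perMessageGroupId", &one, "g1"))
}
```

As in the review, the hash check runs for any limit other than "perQueue", including an empty one.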

One minor documentation gap: the error sentinel's comment (encode_sqs.go:83) and the error message say "perMessageGroupId HT-FIFO message", but Gate 5 actually fires for any non-perQueue partitioned queue, including those with an empty FifoThroughputLimit. In practice this doesn't matter — the live validatePartitionConfig effectively requires either "perQueue" or "perMessageGroupId" for any partitioned queue — but the comment is technically narrower than the code. This is harmless but worth a future clarification.


Fixture realignment — CORRECT ✓

The TestSQSEncodePartitionedSideRecordsByteCrossCheck fixture change (g0+partition=1 → g1+partition=1) is correct. Gate 5 was not present in earlier slices; re-pairing the fixture so its MessageGroupID matches the FNV-1a partition is the right call. The parity table in TestSQSEncodePartitionForGroup_LiveAdapterParity confirms "g1" → 1 for PartitionCount=2. ✓

Unverified manually: TestSQSEncodePartitionedDedupBuildsGroupSegment uses "groupA" with "partition":1. Since Gate 5 is now active and the PR claims go test ./internal/backup/ passes, FNV-1a("groupA") & 1 must equal 1. Worth verifying in CI if not done already.
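
A quick local check of that claim can be written as below. This is a sketch that assumes the live hash is plain FNV-1a 32-bit over the raw group-ID bytes, as the parity section states; `laneFor` is a hypothetical helper name and uses the standard library's `hash/fnv` rather than the PR's inlined loop.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// laneFor masks the FNV-1a-32 hash of groupID on partitionCount-1.
// It assumes partitionCount is a power of two.
func laneFor(groupID string, partitionCount uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(groupID)) // hash.Hash writes never return an error
	return h.Sum32() & (partitionCount - 1)
}

func main() {
	for _, g := range []string{"g0", "g1", "groupA"} {
		fmt.Printf("%s -> %d\n", g, laneFor(g, 2))
	}
}
```

For PartitionCount=2 only the low bit matters. Multiplying by the odd FNV prime preserves the low bit, so the lane is the XOR of the low bit of the offset basis (1) with the low bits of each byte. For "groupA" the byte low bits are 1, 0, 1, 1, 0, 1, which gives lane 1, consistent with the fixture's "partition":1.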


Regression test coverage — SOLID ✓

| Test | What it pins |
| --- | --- |
| TestSQSEncodeRejectsHashMismatchOnPerMessageGroupId | Gate 5 fires for "g0" + partition=1 mismatch |
| TestSQSEncodePartitionForGroup_LiveAdapterParity | 5-case parity table versus the live FNV-1a spec |
| TestSQSEncodePartitionedSideRecordsByteCrossCheck | Realigned fixture "g1"+partition=1, all 3 side-record key shapes |
| Prior gates 1–4 + prior tests | Unaffected; gate 5 sits after gate 3's return nil |

The one missing case worth considering: a partitioned queue with FifoThroughputLimit="" (not explicitly set) + hash mismatch — Gate 5 would fire there too, but there's no test for it. Given the live validator enforces explicit FifoThroughputLimit for PartitionCount > 1, this is a theoretical gap only.


Summary

Gate 5 implementation is correct and complete:

  • Hash is a faithful mirror of the live partitionFor
  • Gate placement is correct (after out-of-range, with perQueue early-exit before hash computation)
  • Regression tests cover the main failure mode and the parity invariant
  • Fixture realignment is properly motivated and consistent

Bottom line: LGTM. The only open item before merge is confirming CI passes for the "groupA" fixture in TestSQSEncodePartitionedDedupBuildsGroupSegment (hash parity cannot be verified by code inspection alone for that case).


@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 0460e285eb

// sqsPartitionedMsgDedupKeyBytes for the rationale (CodeRabbit
// major PR #732 round 6: the second '|' prevents FNV-collapse
// across distinct (groupID, dedupID) pairs on the same partition).
dedupKey = sqsPartitionedMsgDedupKeyBytes(queueName, partition, rec.MessageGroupID, dedupID)
P2: Avoid rebuilding duplicate partitioned dedup rows

When a partitioned FIFO queue retains two messages in the same MessageGroupId with the same MessageDeduplicationId (or the same body under content-based deduplication), sent more than five minutes apart, the live queue can contain both messages because the earlier dedup entry has expired or been overwritten. This branch reconstructs a dedup row for every restored message using only (queue, partition, group, dedupID), so the later message stages the same user key, snapshotBuilder.Add rejects it with ErrEncodeDuplicateKey, and the restore aborts for an otherwise valid queue. Only the current (non-expired) dedup entry should be rebuilt, or expired duplicates should be skipped.
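
One possible shape for the "skip expired duplicates" option is sketched below. Everything here is hypothetical: the `record` and `dedupKey` types, the `SentMillis` field, and the `latestDedupOwners` helper are illustrative names, and the sketch implements only "latest send wins" without an explicit expiry check.

```go
package main

import "fmt"

// record is a hypothetical, trimmed-down message record.
type record struct {
	MessageID  string
	Partition  uint32
	GroupID    string
	DedupID    string
	SentMillis int64 // assumed send timestamp in milliseconds
}

// dedupKey is the identity a rebuilt dedup row would be keyed on
// within one queue.
type dedupKey struct {
	Partition uint32
	GroupID   string
	DedupID   string
}

// latestDedupOwners keeps, for each dedup identity, only the most recently
// sent record, so at most one dedup row is staged per key.
func latestDedupOwners(records []record) map[dedupKey]record {
	owners := make(map[dedupKey]record, len(records))
	for _, rec := range records {
		k := dedupKey{rec.Partition, rec.GroupID, rec.DedupID}
		if cur, ok := owners[k]; !ok || rec.SentMillis > cur.SentMillis {
			owners[k] = rec
		}
	}
	return owners
}

func main() {
	records := []record{
		{"m1", 1, "g1", "d1", 1000},
		{"m2", 1, "g1", "d1", 1000 + 6*60*1000}, // same dedup ID, six minutes later
		{"m3", 1, "g1", "d2", 2000},
	}
	for k, rec := range latestDedupOwners(records) {
		fmt.Printf("%+v owned by %s\n", k, rec.MessageID)
	}
}
```

Both messages would still be restored as data rows; only the dedup row is collapsed to the later sender, which avoids staging the same user key twice.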
