diff --git a/adapter/sqs_batch_send_dedup_test.go b/adapter/sqs_batch_send_dedup_test.go new file mode 100644 index 00000000..8371bfe2 --- /dev/null +++ b/adapter/sqs_batch_send_dedup_test.go @@ -0,0 +1,180 @@ +package adapter + +import ( + "context" + "testing" + + "github.com/bootjp/elastickv/kv" + "github.com/bootjp/elastickv/store" + "github.com/stretchr/testify/require" +) + +// recordingBatchCoordinator records the mutation keys of each dispatch and +// fails the FIRST dispatch with a retryable WriteConflict — modelling a +// committed-but-conflicted attempt under leader churn (the apply may have +// landed, but the dispatch surfaced WriteConflict). The second dispatch +// succeeds. It embeds stubAdapterCoordinator for the Coordinator surface +// (IsLeader, Clock, ...) and only overrides Dispatch. +type recordingBatchCoordinator struct { + stubAdapterCoordinator + dispatchKeys [][]string + // beforeDispatch, if set, runs at the start of each Dispatch with the + // 1-based dispatch number — lets a test mutate queue state between attempts. + beforeDispatch func(n int) +} + +func (c *recordingBatchCoordinator) Dispatch(_ context.Context, req *kv.OperationGroup[kv.OP]) (*kv.CoordinateResponse, error) { + if c.beforeDispatch != nil { + c.beforeDispatch(len(c.dispatchKeys) + 1) + } + keys := make([]string, 0, len(req.Elems)) + for _, e := range req.Elems { + keys = append(keys, string(e.Key)) + } + c.dispatchKeys = append(c.dispatchKeys, keys) + if len(c.dispatchKeys) == 1 { + return nil, store.ErrWriteConflict + } + return &kv.CoordinateResponse{}, nil +} + +func seedStandardQueue(t *testing.T, st store.MVCCStore) { + const name = "q" + t.Helper() + meta := &sqsQueueMeta{ + Name: name, + Generation: 1, + MaximumMessageSize: 262144, + IsFIFO: false, + } + body, err := encodeSQSQueueMeta(meta) + require.NoError(t, err) + require.NoError(t, st.PutAt(context.Background(), sqsQueueMetaKey(name), body, 1, 0)) +} + +// TestSendMessageBatchStandard_RetryReusesStableKeys pins the standard-queue +// batch-send dedup fix: when the first dispatch commits-then-conflicts under +// leader churn, the retry must reuse the SAME storage keys (stable MessageIDs + +// timestamps minted once before the loop) so the committed attempt is +// overwritten idempotently. Without the fix the retry re-mints fresh +// MessageIDs, writing a SECOND copy of every entry at new keys — the +// :duplicate-elements double-send (same class as DynamoDB PR #920). +func TestSendMessageBatchStandard_RetryReusesStableKeys(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := store.NewMVCCStore() + seedStandardQueue(t, st) + + coord := &recordingBatchCoordinator{stubAdapterCoordinator: stubAdapterCoordinator{clock: kv.NewHLC()}} + srv := &SQSServer{store: st, coordinator: coord} + + entries := []sqsSendMessageBatchEntryInput{ + {Id: "a", MessageBody: "alpha"}, + {Id: "b", MessageBody: "bravo"}, + } + successful, failed, err := srv.sendMessageBatchWithRetry(ctx, "q", entries) + require.NoError(t, err) + require.Empty(t, failed) + require.Len(t, successful, 2) + + require.Len(t, coord.dispatchKeys, 2, "attempt 1 conflicts (committed-but-conflicted), attempt 2 retries") + require.NotEmpty(t, coord.dispatchKeys[0]) + require.Equal(t, coord.dispatchKeys[0], coord.dispatchKeys[1], + "the retry MUST reuse the same storage keys; fresh MessageIDs would double-send every entry under leader churn") +} + +// TestSendMessageBatchStandard_VisKeyStableAcrossDelayChange pins codex P2: +// if a SetQueueAttributes changes the queue's DelaySeconds between a +// committed-but-conflicted attempt and its retry, the retry must still write +// the SAME vis/by-age keys (AvailableAtMillis is captured in the identity at +// first mint, not recomputed from the changed delay) — otherwise the first +// attempt's vis index entry is orphaned and can redeliver the message. +func TestSendMessageBatchStandard_VisKeyStableAcrossDelayChange(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := store.NewMVCCStore() + seedStandardQueue(t, st) // DelaySeconds = 0 + + coord := &recordingBatchCoordinator{stubAdapterCoordinator: stubAdapterCoordinator{clock: kv.NewHLC()}} + coord.beforeDispatch = func(n int) { + if n != 2 { + return + } + // SetQueueAttributes raises DelaySeconds to 900 before the retry. + meta := &sqsQueueMeta{Name: "q", Generation: 1, MaximumMessageSize: 262144, DelaySeconds: 900} + body, err := encodeSQSQueueMeta(meta) + require.NoError(t, err) + require.NoError(t, st.PutAt(ctx, sqsQueueMetaKey("q"), body, 100, 0)) + } + srv := &SQSServer{store: st, coordinator: coord} + + entries := []sqsSendMessageBatchEntryInput{{Id: "a", MessageBody: "alpha"}} // no per-message DelaySeconds + successful, _, err := srv.sendMessageBatchWithRetry(ctx, "q", entries) + require.NoError(t, err) + require.Len(t, successful, 1) + + require.Len(t, coord.dispatchKeys, 2) + require.Equal(t, coord.dispatchKeys[0], coord.dispatchKeys[1], + "vis/by-age keys must stay stable across a mid-retry DelaySeconds change (cached AvailableAtMillis)") +} + +// TestSendMessageBatchStandard_RevalidatesAgainstCurrentMetaOnRetry pins codex +// P2 round-2: if a prior attempt minted the identity but did NOT commit (lost a +// normal OCC race) and a concurrent SetQueueAttributes then lowered +// MaximumMessageSize below the body, the retry must re-validate against the +// fresh meta and REJECT the entry rather than commit it on the cached identity. +func TestSendMessageBatchStandard_RevalidatesAgainstCurrentMetaOnRetry(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := store.NewMVCCStore() + seedStandardQueue(t, st) // MaximumMessageSize = 262144 + + coord := &recordingBatchCoordinator{stubAdapterCoordinator: stubAdapterCoordinator{clock: kv.NewHLC()}} + coord.beforeDispatch = func(n int) { + if n != 1 { + return + } + // SetQueueAttributes lowers MaximumMessageSize below the body size, + // landing during attempt 1's dispatch (which does not commit — it + // returns a WriteConflict). The retry then loads this lowered meta + // BEFORE re-validating, so the entry must be rejected. + meta := &sqsQueueMeta{Name: "q", Generation: 1, MaximumMessageSize: 3} + body, err := encodeSQSQueueMeta(meta) + require.NoError(t, err) + require.NoError(t, st.PutAt(ctx, sqsQueueMetaKey("q"), body, 100, 0)) + } + srv := &SQSServer{store: st, coordinator: coord} + + entries := []sqsSendMessageBatchEntryInput{{Id: "a", MessageBody: "alpha"}} // 5 bytes > new limit 3 + successful, failed, err := srv.sendMessageBatchWithRetry(ctx, "q", entries) + require.NoError(t, err) + require.Empty(t, successful, "the retry must re-validate against the lowered limit and reject the entry") + require.Len(t, failed, 1) + require.Equal(t, "a", failed[0].Id) +} + +// TestSendMessageBatchStandard_ReturnedMessageIdsStableAcrossRetry pins that +// the MessageIds reported to the client are the ones actually stored on the +// retry (not a discarded first-attempt set), so a consumer cannot observe a +// MessageId the client never received. +func TestSendMessageBatchStandard_ReturnedMessageIdsStableAcrossRetry(t *testing.T) { + t.Parallel() + ctx := context.Background() + st := store.NewMVCCStore() + seedStandardQueue(t, st) + + coord := &recordingBatchCoordinator{stubAdapterCoordinator: stubAdapterCoordinator{clock: kv.NewHLC()}} + srv := &SQSServer{store: st, coordinator: coord} + + entries := []sqsSendMessageBatchEntryInput{{Id: "a", MessageBody: "alpha"}} + successful, _, err := srv.sendMessageBatchWithRetry(ctx, "q", entries) + require.NoError(t, err) + require.Len(t, successful, 1) + + // The data key embeds the MessageID; both attempts must carry the + // returned MessageID so the client's id matches what is stored. + wantDataKey := string(sqsMsgDataKeyDispatch(&sqsQueueMeta{Name: "q", Generation: 1}, "q", 0, 1, successful[0].MessageId)) + for i, keys := range coord.dispatchKeys { + require.Contains(t, keys, wantDataKey, "dispatch %d must write the returned MessageID's data key", i) + } +} diff --git a/adapter/sqs_messages.go b/adapter/sqs_messages.go index 368f5819..086cdb83 100644 --- a/adapter/sqs_messages.go +++ b/adapter/sqs_messages.go @@ -653,28 +653,78 @@ func resolveSendDelay(meta *sqsQueueMeta, requested *int64) (int64, error) { return *requested, nil } -func buildSendRecord(meta *sqsQueueMeta, in sqsSendMessageInput, delay int64) (*sqsMessageRecord, []byte, error) { +// sqsSendIdentity is the per-message identity that MUST stay stable across a +// retry loop. Every field feeds the message's storage keys: the data key via +// MessageID; the vis key via AvailableAtMillis; the by-age key via +// SendTimestampMillis. If they were re-minted on every retry attempt, a +// committed-but-conflicted attempt under leader churn plus the recomputed retry +// would land at two different key sets, double-sending the message (the +// :duplicate-elements class fixed for DynamoDB in PR #920). +// +// AvailableAtMillis (= sendTs + resolved delay) is captured here too, NOT +// recomputed per attempt: otherwise a SetQueueAttributes that changes the +// queue's DelaySeconds between a committed-but-conflicted attempt and its retry +// would shift the vis key and leave the first attempt's vis entry orphaned +// (codex P2 on PR #923) — a stale visibility index entry that can redeliver the +// message. The batch send path lazily mints one identity per entry on its first +// standard-path attempt and reuses it on every retry, so the keys are stable. +type sqsSendIdentity struct { + messageID string + token []byte + sendTsMillis int64 + availableAtMillis int64 +} + +// newSendIdentity mints a fresh stable identity for the given resolved delay. +// Single-message sends call this inline via buildSendRecord (they have no +// in-process retry, so a self-inflicted conflict surfaces to the client as a +// normal at-least-once SDK retry); the batch path mints once per entry and +// reuses across its retry loop. +func newSendIdentity(delay int64) (sqsSendIdentity, error) { messageID, err := newMessageIDHex() if err != nil { - return nil, nil, errors.WithStack(err) + return sqsSendIdentity{}, errors.WithStack(err) } token, err := newReceiptToken() if err != nil { - return nil, nil, errors.WithStack(err) + return sqsSendIdentity{}, errors.WithStack(err) } now := time.Now().UnixMilli() - availableAt := now + delay*sqsMillisPerSecond + return sqsSendIdentity{ + messageID: messageID, + token: token, + sendTsMillis: now, + availableAtMillis: now + delay*sqsMillisPerSecond, + }, nil +} + +func buildSendRecord(meta *sqsQueueMeta, in sqsSendMessageInput, delay int64) (*sqsMessageRecord, []byte, error) { + id, err := newSendIdentity(delay) + if err != nil { + return nil, nil, err + } + return buildSendRecordWithIdentity(meta, in, id) +} + +// buildSendRecordWithIdentity builds the message record from a caller-supplied +// stable identity (MessageID + token + timestamps) instead of minting a fresh +// one. The record value embeds the current meta.Generation, so callers re-invoke +// this per retry attempt with the freshly-read generation while keeping the +// identity fixed — the keys stay stable for a given generation (idempotent +// overwrite under leader churn) and follow the generation if a concurrent +// DeleteQueue/PurgeQueue bumps it. +func buildSendRecordWithIdentity(meta *sqsQueueMeta, in sqsSendMessageInput, id sqsSendIdentity) (*sqsMessageRecord, []byte, error) { body := []byte(in.MessageBody) rec := &sqsMessageRecord{ - MessageID: messageID, + MessageID: id.messageID, Body: body, MD5OfBody: sqsMD5Hex(body), MD5OfMessageAttributes: md5OfAttributesHex(in.MessageAttributes), MessageAttributes: in.MessageAttributes, - SendTimestampMillis: now, - AvailableAtMillis: availableAt, - VisibleAtMillis: availableAt, - CurrentReceiptToken: token, + SendTimestampMillis: id.sendTsMillis, + AvailableAtMillis: id.availableAtMillis, + VisibleAtMillis: id.availableAtMillis, + CurrentReceiptToken: id.token, QueueGeneration: meta.Generation, MessageGroupId: in.MessageGroupId, MessageDeduplicationId: in.MessageDeduplicationId, diff --git a/adapter/sqs_messages_batch.go b/adapter/sqs_messages_batch.go index 6a9c61ce..6b65cebf 100644 --- a/adapter/sqs_messages_batch.go +++ b/adapter/sqs_messages_batch.go @@ -121,10 +121,21 @@ func (s *SQSServer) sendMessageBatchWithRetry( queueName string, entries []sqsSendMessageBatchEntryInput, ) ([]sqsSendMessageBatchResultEntry, []sqsBatchResultErrorEntry, error) { + // Per-entry stable identities, indexed by entry position and shared (by + // reference) across every retry attempt. The standard-queue path keys each + // message by its random MessageID and send-derived timestamps, so re-minting + // per attempt would land a committed-but-conflicted attempt's messages at one + // key set and the retry's at another — double-sending every entry under + // leader churn. The standard path lazily mints each identity on its first + // attempt (sendBatchStandardOnce) and reuses it thereafter, so the keys stay + // stable. Left zero-valued here: FIFO entries never touch these (they mint + // their own per-entry dedup-fenced identity), so FIFO batches pay no + // crypto/rand cost for identities they ignore. + identities := make([]sqsSendIdentity, len(entries)) backoff := transactRetryInitialBackoff deadline := time.Now().Add(transactRetryMaxDuration) for range transactRetryMaxAttempts { - successful, failed, retry, err := s.trySendMessageBatchOnce(ctx, queueName, entries) + successful, failed, retry, err := s.trySendMessageBatchOnce(ctx, queueName, entries, identities) if err != nil { return nil, nil, err } @@ -154,6 +165,7 @@ func (s *SQSServer) trySendMessageBatchOnce( ctx context.Context, queueName string, entries []sqsSendMessageBatchEntryInput, + identities []sqsSendIdentity, ) ([]sqsSendMessageBatchResultEntry, []sqsBatchResultErrorEntry, bool, error) { readTS := s.nextTxnReadTS(ctx) meta, exists, err := s.loadQueueMetaAt(ctx, queueName, readTS) @@ -166,7 +178,7 @@ func (s *SQSServer) trySendMessageBatchOnce( if meta.IsFIFO { return s.sendBatchFifoEntries(ctx, queueName, meta, entries) } - return s.sendBatchStandardOnce(ctx, queueName, meta, entries, readTS) + return s.sendBatchStandardOnce(ctx, queueName, meta, entries, identities, readTS) } // sendBatchStandardOnce is the original single-OCC fast path for @@ -177,6 +189,7 @@ func (s *SQSServer) sendBatchStandardOnce( queueName string, meta *sqsQueueMeta, entries []sqsSendMessageBatchEntryInput, + identities []sqsSendIdentity, readTS uint64, ) ([]sqsSendMessageBatchResultEntry, []sqsBatchResultErrorEntry, bool, error) { successful := make([]sqsSendMessageBatchResultEntry, 0, len(entries)) @@ -187,8 +200,12 @@ func (s *SQSServer) sendBatchStandardOnce( // against. const opsPerEntry = 3 elems := make([]*kv.Elem[kv.OP], 0, opsPerEntry*len(entries)) - for _, entry := range entries { - rec, recordBytes, apiErr := buildBatchSendRecord(meta, entry) + for i, entry := range entries { + // &identities[i] is the stable per-entry identity, lazily minted on the + // first attempt and reused on every retry; reusing it keeps the storage + // keys constant across retries so a committed-but-conflicted attempt is + // overwritten idempotently rather than double-sent. + rec, recordBytes, apiErr := buildBatchSendRecord(meta, entry, &identities[i]) if apiErr != nil { failed = append(failed, batchErrorEntryFromAPIErr(entry.Id, apiErr)) continue @@ -386,20 +403,21 @@ func (s *SQSServer) resolveFreshFifoSnapshot(ctx context.Context, queueName stri return meta, dedupID, delay, nil } -// buildBatchSendRecord runs every per-entry validation a single -// SendMessage would, but returns the *sqsAPIError so the batch path -// can drop the entry into Failed[] instead of failing the whole -// request. -func buildBatchSendRecord(meta *sqsQueueMeta, entry sqsSendMessageBatchEntryInput) (*sqsMessageRecord, []byte, error) { - if len(entry.MessageBody) == 0 { - return nil, nil, newSQSAPIError(http.StatusBadRequest, sqsErrValidation, "MessageBody is required") - } - if int64(len(entry.MessageBody)) > meta.MaximumMessageSize { - return nil, nil, newSQSAPIError(http.StatusBadRequest, sqsErrMessageTooLong, "message body exceeds MaximumMessageSize") - } - if err := validateMessageAttributes(entry.MessageAttributes); err != nil { - return nil, nil, err - } +// buildBatchSendRecord builds one standard-queue batch entry's record. It splits +// two concerns that must NOT be conflated: +// +// - Validation against the CURRENT meta runs on EVERY attempt (`validateBatchEntry`). +// A prior, uncommitted attempt may have minted the identity; if a concurrent +// SetQueueAttributes then tightened a limit (e.g. lowered MaximumMessageSize), +// the retry must still reject a now-invalid entry rather than commit it +// (codex P2 round-2). +// - The stable identity (MessageID + token + timestamps incl. AvailableAtMillis) +// is minted ONCE, on the first attempt, and reused. This keeps the storage +// keys constant across retries so a committed-but-conflicted attempt is +// overwritten idempotently, and so a mid-retry DelaySeconds change cannot +// shift the vis key (codex P2 round-1). AvailableAtMillis is captured from +// the first attempt's resolved delay. +func buildBatchSendRecord(meta *sqsQueueMeta, entry sqsSendMessageBatchEntryInput, id *sqsSendIdentity) (*sqsMessageRecord, []byte, error) { asSingle := sqsSendMessageInput{ MessageBody: entry.MessageBody, DelaySeconds: entry.DelaySeconds, @@ -407,14 +425,41 @@ func buildBatchSendRecord(meta *sqsQueueMeta, entry sqsSendMessageBatchEntryInpu MessageGroupId: entry.MessageGroupId, MessageDeduplicationId: entry.MessageDeduplicationId, } - if err := validateSendFIFOParams(meta, asSingle); err != nil { - return nil, nil, err - } - delay, err := resolveSendDelay(meta, entry.DelaySeconds) + delay, err := validateBatchEntry(meta, entry, asSingle) if err != nil { return nil, nil, err } - return buildSendRecord(meta, asSingle, delay) + if id.messageID == "" { + minted, mintErr := newSendIdentity(delay) + if mintErr != nil { + return nil, nil, mintErr + } + *id = minted + } + return buildSendRecordWithIdentity(meta, asSingle, *id) +} + +// validateBatchEntry runs every per-attempt check for a standard batch entry +// against the currently-loaded queue meta and returns the resolved delay. It is +// invoked on EVERY attempt (not just the first mint) so a concurrent +// SetQueueAttributes that tightened limits is honored on the retry. The returned +// delay seeds AvailableAtMillis only on the first mint; on retries the cached +// identity's AvailableAtMillis is reused, so re-resolving here never shifts the +// keys — it only re-checks bounds. +func validateBatchEntry(meta *sqsQueueMeta, entry sqsSendMessageBatchEntryInput, asSingle sqsSendMessageInput) (int64, error) { + if len(entry.MessageBody) == 0 { + return 0, newSQSAPIError(http.StatusBadRequest, sqsErrValidation, "MessageBody is required") + } + if int64(len(entry.MessageBody)) > meta.MaximumMessageSize { + return 0, newSQSAPIError(http.StatusBadRequest, sqsErrMessageTooLong, "message body exceeds MaximumMessageSize") + } + if err := validateMessageAttributes(entry.MessageAttributes); err != nil { + return 0, err + } + if err := validateSendFIFOParams(meta, asSingle); err != nil { + return 0, err + } + return resolveSendDelay(meta, entry.DelaySeconds) } // ------------------------ DeleteMessageBatch ------------------------ diff --git a/docs/design/2026_06_03_partial_dynamodb_onephase_dedup.md b/docs/design/2026_06_03_partial_dynamodb_onephase_dedup.md index 17db158c..4533c57c 100644 --- a/docs/design/2026_06_03_partial_dynamodb_onephase_dedup.md +++ b/docs/design/2026_06_03_partial_dynamodb_onephase_dedup.md @@ -330,8 +330,46 @@ every replica applying the same log entry. the workload classifies as a definite `:fail` (excluded from the history), so it does not produce the duplicate. Bringing it under reuse-dedup is tractable but doubles the test matrix; defer it. -- **Out of scope:** S3 / SQS adapters (same `readTS`-then-mutate pattern, same - latent bug class) — separate docs once this lands as the template. +- **S3 / SQS — audited (2026-06-03).** A handler-by-handler adversarial sweep + found that the original "same latent bug class" claim was over-broad. The + DynamoDB bug is specific to a read-modify-write that **recomputes a growing + value** (`list_append`) at a **stable key** on a self-conflict retry. The + audit verdicts: + - **S3 — no duplicate possible.** Object writes are whole-value PUTs at a + stable key (`PutObject`, `CompleteMultipartUpload` whose manifest is + re-assembled deterministically from the immutable uploaded parts, not from + a re-read of the object value). `PutObject` carries a caller-supplied + `StartTS`, so a leader-move coordinator retry that races an interleaved + write fails the OCC fence rather than clobbering it; it has no in-process + re-read-recompute retry. No `list_append`-style growth, no recompute-into-a- + different-value → no `:duplicate-elements`. + - **SQS single `SendMessage` / FIFO — safe.** Single send keys the message by + a random `MessageId` with no in-process retry (a conflict surfaces to the + client as a normal at-least-once SDK retry). FIFO send is fenced by the + `MessageDeduplicationId` dedup record (committed atomically with the + sequence), whose hit short-circuits any retry. + - **SQS standard `SendMessageBatch` — HAD the bug, FIXED here.** Unlike single + send, the batch path added an **in-process** retry loop + (`sendMessageBatchWithRetry`) that re-minted fresh random `MessageId`s (and + send timestamps) per attempt, so a committed-but-conflicted attempt plus the + retry double-sent every entry. Fixed by pre-generating one stable + `sqsSendIdentity` per entry **before** the retry loop and reusing it on + every attempt (`buildSendRecordWithIdentity`), so the retry overwrites the + same keys idempotently. This needs **no** FSM probe / `PrevCommitTS` / gate — + the keys become content-stable, so re-applying is a plain idempotent + overwrite. The identity also pins `AvailableAtMillis` (vis-key input) and the + per-entry validation re-runs against the current meta on every attempt, so a + mid-retry `SetQueueAttributes` neither shifts the vis key (codex P2 round-1) + nor lets a now-too-large body through (codex P2 round-2). Tests: + `adapter/sqs_batch_send_dedup_test.go`. + - **Residual edge (within at-least-once, no action):** if attempt 1 + *commits* (committed-but-conflicted) and a concurrent `SetQueueAttributes` + tightens a limit (e.g. lowers `MaximumMessageSize`) before the retry, the + retry's re-validation rejects the entry into `Failed[]` even though it is + already in the queue — an inconsistent client view (message stored, client + told it failed) but never a double-send. This is within the SQS + standard-queue at-least-once contract; distinguishing committed-vs-not + would need a dedup probe, which is out of proportion for this corner. ## Milestones