Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 180 additions & 0 deletions adapter/sqs_batch_send_dedup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package adapter

import (
"context"
"testing"

"github.com/bootjp/elastickv/kv"
"github.com/bootjp/elastickv/store"
"github.com/stretchr/testify/require"
)

// recordingBatchCoordinator records the mutation keys of each dispatch and
// fails the FIRST dispatch with a retryable WriteConflict — modelling a
// committed-but-conflicted attempt under leader churn (the apply may have
// landed, but the dispatch surfaced WriteConflict). The second dispatch
// succeeds. It embeds stubAdapterCoordinator for the Coordinator surface
// (IsLeader, Clock, ...) and only overrides Dispatch.
type recordingBatchCoordinator struct {
stubAdapterCoordinator
dispatchKeys [][]string
// beforeDispatch, if set, runs at the start of each Dispatch with the
// 1-based dispatch number — lets a test mutate queue state between attempts.
beforeDispatch func(n int)
}

func (c *recordingBatchCoordinator) Dispatch(_ context.Context, req *kv.OperationGroup[kv.OP]) (*kv.CoordinateResponse, error) {
if c.beforeDispatch != nil {
c.beforeDispatch(len(c.dispatchKeys) + 1)
}
keys := make([]string, 0, len(req.Elems))
for _, e := range req.Elems {
keys = append(keys, string(e.Key))
}
c.dispatchKeys = append(c.dispatchKeys, keys)
if len(c.dispatchKeys) == 1 {
return nil, store.ErrWriteConflict
}
return &kv.CoordinateResponse{}, nil
}

func seedStandardQueue(t *testing.T, st store.MVCCStore) {
const name = "q"
t.Helper()
meta := &sqsQueueMeta{
Name: name,
Generation: 1,
MaximumMessageSize: 262144,
IsFIFO: false,
}
body, err := encodeSQSQueueMeta(meta)
require.NoError(t, err)
require.NoError(t, st.PutAt(context.Background(), sqsQueueMetaKey(name), body, 1, 0))
}

// TestSendMessageBatchStandard_RetryReusesStableKeys pins the standard-queue
// batch-send dedup fix: when the first dispatch commits-then-conflicts under
// leader churn, the retry must reuse the SAME storage keys (stable MessageIDs +
// timestamps minted once before the loop) so the committed attempt is
// overwritten idempotently. Without the fix the retry re-mints fresh
// MessageIDs, writing a SECOND copy of every entry at new keys — the
// :duplicate-elements double-send (same class as DynamoDB PR #920).
func TestSendMessageBatchStandard_RetryReusesStableKeys(t *testing.T) {
t.Parallel()
ctx := context.Background()
st := store.NewMVCCStore()
seedStandardQueue(t, st)

coord := &recordingBatchCoordinator{stubAdapterCoordinator: stubAdapterCoordinator{clock: kv.NewHLC()}}
srv := &SQSServer{store: st, coordinator: coord}

entries := []sqsSendMessageBatchEntryInput{
{Id: "a", MessageBody: "alpha"},
{Id: "b", MessageBody: "bravo"},
}
successful, failed, err := srv.sendMessageBatchWithRetry(ctx, "q", entries)
require.NoError(t, err)
require.Empty(t, failed)
require.Len(t, successful, 2)

require.Len(t, coord.dispatchKeys, 2, "attempt 1 conflicts (committed-but-conflicted), attempt 2 retries")
require.NotEmpty(t, coord.dispatchKeys[0])
require.Equal(t, coord.dispatchKeys[0], coord.dispatchKeys[1],
"the retry MUST reuse the same storage keys; fresh MessageIDs would double-send every entry under leader churn")
}

// TestSendMessageBatchStandard_VisKeyStableAcrossDelayChange pins codex P2:
// if a SetQueueAttributes changes the queue's DelaySeconds between a
// committed-but-conflicted attempt and its retry, the retry must still write
// the SAME vis/by-age keys (AvailableAtMillis is captured in the identity at
// first mint, not recomputed from the changed delay) — otherwise the first
// attempt's vis index entry is orphaned and can redeliver the message.
func TestSendMessageBatchStandard_VisKeyStableAcrossDelayChange(t *testing.T) {
t.Parallel()
ctx := context.Background()
st := store.NewMVCCStore()
seedStandardQueue(t, st) // DelaySeconds = 0

coord := &recordingBatchCoordinator{stubAdapterCoordinator: stubAdapterCoordinator{clock: kv.NewHLC()}}
coord.beforeDispatch = func(n int) {
if n != 2 {
return
}
// SetQueueAttributes raises DelaySeconds to 900 before the retry.
meta := &sqsQueueMeta{Name: "q", Generation: 1, MaximumMessageSize: 262144, DelaySeconds: 900}
body, err := encodeSQSQueueMeta(meta)
require.NoError(t, err)
require.NoError(t, st.PutAt(ctx, sqsQueueMetaKey("q"), body, 100, 0))
}
srv := &SQSServer{store: st, coordinator: coord}

entries := []sqsSendMessageBatchEntryInput{{Id: "a", MessageBody: "alpha"}} // no per-message DelaySeconds
successful, _, err := srv.sendMessageBatchWithRetry(ctx, "q", entries)
require.NoError(t, err)
require.Len(t, successful, 1)

require.Len(t, coord.dispatchKeys, 2)
require.Equal(t, coord.dispatchKeys[0], coord.dispatchKeys[1],
"vis/by-age keys must stay stable across a mid-retry DelaySeconds change (cached AvailableAtMillis)")
}

// TestSendMessageBatchStandard_RevalidatesAgainstCurrentMetaOnRetry pins codex
// P2 round-2: if a prior attempt minted the identity but did NOT commit (lost a
// normal OCC race) and a concurrent SetQueueAttributes then lowered
// MaximumMessageSize below the body, the retry must re-validate against the
// fresh meta and REJECT the entry rather than commit it on the cached identity.
func TestSendMessageBatchStandard_RevalidatesAgainstCurrentMetaOnRetry(t *testing.T) {
t.Parallel()
ctx := context.Background()
st := store.NewMVCCStore()
seedStandardQueue(t, st) // MaximumMessageSize = 262144

coord := &recordingBatchCoordinator{stubAdapterCoordinator: stubAdapterCoordinator{clock: kv.NewHLC()}}
coord.beforeDispatch = func(n int) {
if n != 1 {
return
}
// SetQueueAttributes lowers MaximumMessageSize below the body size,
// landing during attempt 1's dispatch (which does not commit — it
// returns a WriteConflict). The retry then loads this lowered meta
// BEFORE re-validating, so the entry must be rejected.
meta := &sqsQueueMeta{Name: "q", Generation: 1, MaximumMessageSize: 3}
body, err := encodeSQSQueueMeta(meta)
require.NoError(t, err)
require.NoError(t, st.PutAt(ctx, sqsQueueMetaKey("q"), body, 100, 0))
}
srv := &SQSServer{store: st, coordinator: coord}

entries := []sqsSendMessageBatchEntryInput{{Id: "a", MessageBody: "alpha"}} // 5 bytes > new limit 3
successful, failed, err := srv.sendMessageBatchWithRetry(ctx, "q", entries)
require.NoError(t, err)
require.Empty(t, successful, "the retry must re-validate against the lowered limit and reject the entry")
require.Len(t, failed, 1)
require.Equal(t, "a", failed[0].Id)
}

// TestSendMessageBatchStandard_ReturnedMessageIdsStableAcrossRetry pins that
// the MessageIds reported to the client are the ones actually stored on the
// retry (not a discarded first-attempt set), so a consumer cannot observe a
// MessageId the client never received.
func TestSendMessageBatchStandard_ReturnedMessageIdsStableAcrossRetry(t *testing.T) {
t.Parallel()
ctx := context.Background()
st := store.NewMVCCStore()
seedStandardQueue(t, st)

coord := &recordingBatchCoordinator{stubAdapterCoordinator: stubAdapterCoordinator{clock: kv.NewHLC()}}
srv := &SQSServer{store: st, coordinator: coord}

entries := []sqsSendMessageBatchEntryInput{{Id: "a", MessageBody: "alpha"}}
successful, _, err := srv.sendMessageBatchWithRetry(ctx, "q", entries)
require.NoError(t, err)
require.Len(t, successful, 1)

// The data key embeds the MessageID; both attempts must carry the
// returned MessageID so the client's id matches what is stored.
wantDataKey := string(sqsMsgDataKeyDispatch(&sqsQueueMeta{Name: "q", Generation: 1}, "q", 0, 1, successful[0].MessageId))
for i, keys := range coord.dispatchKeys {
require.Contains(t, keys, wantDataKey, "dispatch %d must write the returned MessageID's data key", i)
}
}
68 changes: 59 additions & 9 deletions adapter/sqs_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,28 +653,78 @@ func resolveSendDelay(meta *sqsQueueMeta, requested *int64) (int64, error) {
return *requested, nil
}

func buildSendRecord(meta *sqsQueueMeta, in sqsSendMessageInput, delay int64) (*sqsMessageRecord, []byte, error) {
// sqsSendIdentity is the per-message identity that MUST stay stable across a
// retry loop. Every field feeds the message's storage keys: the data key via
// MessageID; the vis key via AvailableAtMillis; the by-age key via
// SendTimestampMillis. If they were re-minted on every retry attempt, a
// committed-but-conflicted attempt under leader churn plus the recomputed retry
// would land at two different key sets, double-sending the message (the
// :duplicate-elements class fixed for DynamoDB in PR #920).
//
// AvailableAtMillis (= sendTs + resolved delay) is captured here too, NOT
// recomputed per attempt: otherwise a SetQueueAttributes that changes the
// queue's DelaySeconds between a committed-but-conflicted attempt and its retry
// would shift the vis key and leave the first attempt's vis entry orphaned
// (codex P2 on PR #923) — a stale visibility index entry that can redeliver the
// message. The batch send path lazily mints one identity per entry on its first
// standard-path attempt and reuses it on every retry, so the keys are stable.
type sqsSendIdentity struct {
messageID string
token []byte
sendTsMillis int64
availableAtMillis int64
}

// newSendIdentity mints a fresh stable identity for the given resolved delay.
// Single-message sends call this inline via buildSendRecord (they have no
// in-process retry, so a self-inflicted conflict surfaces to the client as a
// normal at-least-once SDK retry); the batch path mints once per entry and
// reuses across its retry loop.
func newSendIdentity(delay int64) (sqsSendIdentity, error) {
messageID, err := newMessageIDHex()
if err != nil {
return nil, nil, errors.WithStack(err)
return sqsSendIdentity{}, errors.WithStack(err)
}
token, err := newReceiptToken()
if err != nil {
return nil, nil, errors.WithStack(err)
return sqsSendIdentity{}, errors.WithStack(err)
}
now := time.Now().UnixMilli()
availableAt := now + delay*sqsMillisPerSecond
return sqsSendIdentity{
messageID: messageID,
token: token,
sendTsMillis: now,
availableAtMillis: now + delay*sqsMillisPerSecond,
}, nil
}

func buildSendRecord(meta *sqsQueueMeta, in sqsSendMessageInput, delay int64) (*sqsMessageRecord, []byte, error) {
id, err := newSendIdentity(delay)
if err != nil {
return nil, nil, err
}
return buildSendRecordWithIdentity(meta, in, id)
}

// buildSendRecordWithIdentity builds the message record from a caller-supplied
// stable identity (MessageID + token + timestamps) instead of minting a fresh
// one. The record value embeds the current meta.Generation, so callers re-invoke
// this per retry attempt with the freshly-read generation while keeping the
// identity fixed — the keys stay stable for a given generation (idempotent
// overwrite under leader churn) and follow the generation if a concurrent
// DeleteQueue/PurgeQueue bumps it.
func buildSendRecordWithIdentity(meta *sqsQueueMeta, in sqsSendMessageInput, id sqsSendIdentity) (*sqsMessageRecord, []byte, error) {
body := []byte(in.MessageBody)
rec := &sqsMessageRecord{
MessageID: messageID,
MessageID: id.messageID,
Body: body,
MD5OfBody: sqsMD5Hex(body),
MD5OfMessageAttributes: md5OfAttributesHex(in.MessageAttributes),
MessageAttributes: in.MessageAttributes,
SendTimestampMillis: now,
AvailableAtMillis: availableAt,
VisibleAtMillis: availableAt,
CurrentReceiptToken: token,
SendTimestampMillis: id.sendTsMillis,
AvailableAtMillis: id.availableAtMillis,
VisibleAtMillis: id.availableAtMillis,
CurrentReceiptToken: id.token,
QueueGeneration: meta.Generation,
MessageGroupId: in.MessageGroupId,
MessageDeduplicationId: in.MessageDeduplicationId,
Expand Down
Loading
Loading