fix(redis): scope retry/dlq stream keys per-handler (#225)#226
Conversation
When multiple handlers subscribe to the same channel with different
consumer groups, the shared `{channel}.retry` and `{channel}.dlq`
stream keys caused one handler's failures to be redelivered to every
other handler on that channel. Each handler now writes to (and reads
from) its own `{channel}.{handler_suffix}.retry` and `.dlq` streams.
Adds a regression test that spins up two agents on the same channel,
fails one, and asserts the other is invoked exactly once.
Breaking on-wire change: inflight messages in the legacy
`{channel}.retry` / `{channel}.dlq` streams will not be picked up by
upgraded consumers. Migration note added to CHANGELOG.
QualOps Code Quality AnalysisStatus: ✅ PASSED - No issues found Summary
No issues found in the analyzed code. 📊 Full ReportPowered by QualOps |
pontino
left a comment
There was a problem hiding this comment.
Review
The core fix is correct. Per-handler retry/dlq stream keys solve issue #225, and the new fan-out regression test exercises the right scenario. There are a few gaps worth addressing before merge, plus several pre-existing concerns the PR newly exposes.
Blockers
-
test_retry_on_idle_ms_no_retry_retry_streamis now a tautology. The assertion checks for absence ofeggai.{channel}.{group_main}.retry.{group_main}-retry.retry, but a real regression of theretry_stream=f"{retry_stream}.retry"class would produceeggai.{channel}.{group_main}.retry.retry(no embedded group segment). The current key is unreachable by any plausible mutation, so the test passes against the regression it claims to guard. Assert on the latter form, or useKEYS eggai.{channel}*and assert no key ends with.retry.retry. -
sdk/README.mdstill documents the old per-channel keys in seven places (lines 158, 160, 168, 171, 183, 209, 219), including a literaleggai.orders.retry/eggai.orders.dlqin the ASCII diagram.docs/docs/sdk/redis-transport.mdwas updated, this sibling that ships on PyPI was missed. Users following the README will look for streams that no longer exist. -
The new test shares one
RedisTransportacross two agents.connect()is called twice (once per agent), and_group_monitor_taskis overwritten without cancelling the previous task. Task A keeps polling Redis against the same_stream_subscriptionslist; on disconnect only task B is awaited, and the redis client task A created is neveraclose()d. Either guardconnect()against re-entry, or keep a list of monitor tasks.
Pre-existing, newly exposed
-
connect()also re-invokesbroker.start()and_reclaimer_manager.start()without a guard. The shared-transport test pattern works only because FastStream and the reclaimer happen to be re-entrant today. -
The recursive
subscribe()forwardslast_id=last_id. If an operator restarts withlast_id='0'to replay a main-stream backlog, the retry-stream subscriber also replays its entire history every restart. The retry stream should pinlast_id='>'. -
The recursive
subscribe()also forwards user-suppliedack_policy. UnderAckPolicy.ACK, retry-stream failures never leave a PEL entry, so the reclaimer finds nothing andmax_retries/ DLQ never trigger. The per-handler DLQ becomes a no-op for that policy. -
The same wrapped handler is registered on both the main and retry streams.
make_tracing_wrapper(channel, handler)captures the original channel name in its closure (tracing.py:184), so retry-attempt spans emitmessaging.destination=<original channel>instead of the retry stream key. Operators cannot dashboard retry-stream consumption separately from main. -
handler_suffix = handler_id if handler_id else f"{channel}-{handler.__name__}". Directtransport.subscribe()callers that bypassAgent(which setshandler_idviaHANDLERS_IDScounter) with two same-named handlers on the same channel still collapse to the same suffix, reproducing #225 in that path. Lambdas all produce<lambda>.functools.partialraisesAttributeErrorbecause it has no__name__, after the main-stream registration has already committed. -
_inject_retry_metadatareturns(data, 0)on parse failure. The retry-stream reclaimer republishes back to the same stream, so a corrupt envelope loops forever and never reaches DLQ. With per-handler retry streams, every handler now owns its own permanent livelock surface. -
The recursive call passes
**kwargswhich still references the samemiddlewareslist mutated by the parent. Main and retry FastStream subscribers share the list by reference. A defensivekwargs['middlewares'] = list(kwargs.get('middlewares', []))before the recursive call would isolate them.
Test quality
-
test_retry_does_not_fan_out_to_other_handlersasserts only the lengths ofok_callsandfail_calls. Addingassert ok_calls == ['x']andassert fail_calls == ['x', 'x']would lock the payload identity in. (The 2-second window afterfail_retry_doneis fine: both the retry-success and the hypothetical fan-out are gated on the same reclaimer XADD, so they race on the 100ms FastStream poll, not the reclaimer interval.) -
The new test does not clean up the per-handler retry/dlq streams or consumer groups it creates. Not a regression on its own (sibling tests do the same), but worth a fixture if you ever decide to clean things up.
Doc rot
sdk/eggai/transport/pending_reclaimer.py:52still has# e.g. "eggai.orders.dlq". No runtime impact, just stale.
Minor
-
_stream_subscriptions.appendis unconditional. Repeatedsubscribe()calls (or partial-failure retries) duplicate entries, and the monitor then issues redundantXGROUP CREATEper duplicate every interval. A set keyed by(stream_key, group)would dedup. -
subscribe()has no rollback if the recursive retry-streamsubscribe()raises. Main-streambroker.subscriber(...),_stream_subscriptions.append, and the first_setup_reclaimerare already committed. A torn state ships, and any retry by the caller double-appends.
Follow-up to the review on #226: - Reject ack_policy=ACK/ACK_FIRST together with retry_on_idle_ms: those XACK on handler failure, emptying the PEL the reclaimer scans, so retries/DLQ would silently never fire. - Pin the auto-created retry-stream subscriber to last_id=">" instead of forwarding the caller's last_id, so replaying the main stream (last_id="0") no longer replays the retry stream's history every restart. - Route a permanently-unparseable ("poison") retry message to the DLQ (or drop it with an error log when no DLQ is configured) instead of re-queueing it forever; its retry count can never be incremented. - Trace the retry-stream subscriber with the retry stream as messaging.destination (was the original channel), so retry consumption can be dashboarded separately. - Make connect() idempotent for a transport shared across agents: start only not-yet-running subscribers on re-entry and don't orphan the group monitor task. - Roll back partial in-memory registration (reclaimer configs, stream entries) if retry-stream setup raises; de-duplicate _stream_subscriptions (now a set). - Guard handler_suffix against handlers without __name__ (e.g. functools.partial) and derive it from the original handler rather than the tracing wrapper. - Broaden the no-".retry.retry" test to a KEYS-based check; lock payload identity and clean up per-handler streams in the fan-out test; add tests for the above. - Sync sdk/README.md to the per-handler stream-key scheme and fix stale comments in pending_reclaimer.py; document the changes in CHANGELOG.
|
Thanks for the thorough review, Stefano — almost every mechanism checked out. All addressed in Fixed
Declined
Note: most of 3–10/14/15 predate this PR; fixed them here since the per-handler change exercises them. |
When multiple handlers subscribe to the same Redis stream with different
consumer groups, a failure in one group's PEL is replayed to every
handler on the channel — not just the one that failed. The reclaimer
republishes to a single shared
{channel}.retrykey, and each handlerauto-subscribes to that key under its own
-retryconsumer group, soevery new
XADDfans out to all of them. This breaks the isolationusers reasonably expect from independent consumer groups and amplifies
any nack into N replays (N = number of consumer groups on the channel).
This PR scopes the retry/dlq stream keys per-handler: each handler that
subscribes with
retry_on_idle_msnow writes to its own{channel}.{handler_suffix}.retryand (whenmax_retriesis set){channel}.{handler_suffix}.dlqstreams.Closes #225
Type of Change
Related Issue
Fixes #225
Changes Made
handler_suffixfromhandler_id(or{channel}-{handler.__name__}fallback) inRedisTransport.subscribe(){channel}.{handler_suffix}.retry(was{channel}.retry){channel}.{handler_suffix}.dlq(was{channel}.dlq)-retryconsumer group on the retry stream is unchanged in shape — still{handler_suffix}-retryretry_stream=retry_stream, so the "no.retry.retrychain" property is preservedtest_retry_does_not_fan_out_to_other_handlers— two agents on the same channel, one nacks, asserts the other is invoked exactly oncesdk/CHANGELOG.mdunder[Unreleased]with### Fixedand### Migrationsectionsdocs/docs/sdk/redis-transport.md(table, mermaid diagram, prose) to reflect the new per-handler key patternMigration
Breaking on-wire change for the retry/dlq stream keys. Operators
upgrading mid-flight will:
{channel}.retry/{channel}.dlqstreams in Redis after upgrade. They will no longer receive new entries.XADDpending entries into the new per-handler keys before old data is dropped (e.g., onXTRIMor eviction).Migration note also documented in
sdk/CHANGELOG.md.Testing
make test)Local run: 22/22 redis-transport tests pass against a Redis 7 container,
including the new regression test.
Changelog
sdk/CHANGELOG.mdunder[Unreleased]section (required for code changes)Checklist
make lintpasses)make formatapplied)