fix(redis): scope retry/dlq stream keys per-handler (#225) #226

Merged
pontino merged 2 commits into main from fix/per-handler-retry-stream-key on Jun 3, 2026

Conversation

tsiaeggai (Collaborator) commented May 22, 2026

When multiple handlers subscribe to the same Redis stream with different
consumer groups, a failure in one group's PEL is replayed to every
handler on the channel — not just the one that failed. The reclaimer
republishes to a single shared {channel}.retry key, and each handler
auto-subscribes to that key under its own -retry consumer group, so
every new XADD fans out to all of them. This breaks the isolation
users reasonably expect from independent consumer groups and amplifies
any nack into N replays (N = number of consumer groups on the channel).
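The fan-out described above can be reproduced without Redis. The sketch below is a toy in-memory model, not the transport's code: `FakeStreams` and `replay_failure` are hypothetical names, and the only Redis behaviour it models is that every consumer group on a stream receives every new entry.

```python
from collections import defaultdict


class FakeStreams:
    """Toy stand-in for Redis streams: each group on a stream sees each entry."""

    def __init__(self):
        self.groups = defaultdict(set)      # stream key -> consumer group names
        self.delivered = defaultdict(list)  # group name -> payloads received

    def subscribe(self, stream, group):
        self.groups[stream].add(group)

    def xadd(self, stream, payload):
        # Consumer groups are independent cursors over the same stream,
        # so one new entry is delivered once to every group.
        for group in sorted(self.groups[stream]):
            self.delivered[group].append(payload)


def replay_failure(channel, handlers, failing, per_handler):
    """Republish one failed message and report which retry groups saw it."""
    streams = FakeStreams()

    def retry_key(handler):
        if per_handler:
            return f"{channel}.{handler}.retry"
        return f"{channel}.retry"  # legacy shared key

    for handler in handlers:
        streams.subscribe(retry_key(handler), f"{handler}-retry")
    streams.xadd(retry_key(failing), "x")
    return dict(streams.delivered)


shared = replay_failure("orders", ["a", "b"], failing="a", per_handler=False)
scoped = replay_failure("orders", ["a", "b"], failing="a", per_handler=True)
```

With the shared key both `a-retry` and `b-retry` receive the replay; with per-handler keys only `a-retry` does.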

This PR scopes the retry/dlq stream keys per-handler: each handler that
subscribes with retry_on_idle_ms now writes to its own
{channel}.{handler_suffix}.retry and (when max_retries is set)
{channel}.{handler_suffix}.dlq streams.

Closes #225

Type of Change

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • Documentation update
  • Refactoring (no functional changes)
  • Performance improvement
  • Test coverage improvement
  • CI/CD update
  • Dependency update

Related Issue

Fixes #225

Changes Made

  • Derive a per-handler handler_suffix from handler_id (or {channel}-{handler.__name__} fallback) in RedisTransport.subscribe()
  • Retry stream key: {channel}.{handler_suffix}.retry (was {channel}.retry)
  • DLQ stream key: {channel}.{handler_suffix}.dlq (was {channel}.dlq)
  • Companion -retry consumer group on the retry stream is unchanged in shape — still {handler_suffix}-retry
  • Second reclaimer (retry stream → retry stream) still uses retry_stream=retry_stream, so the "no .retry.retry chain" property is preserved
  • Added regression test test_retry_does_not_fan_out_to_other_handlers — two agents on the same channel, one nacks, asserts the other is invoked exactly once
  • Updated 7 existing tests that referenced the legacy literal stream names
  • Updated sdk/CHANGELOG.md under [Unreleased] with ### Fixed and ### Migration sections
  • Updated docs/docs/sdk/redis-transport.md (table, mermaid diagram, prose) to reflect the new per-handler key pattern
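As a rough illustration of the key scheme listed above, the derivation can be sketched as a standalone function. `derive_stream_keys` is a hypothetical helper, not a function in the SDK, and any stream-name prefix the transport applies (the tests mention keys starting with `eggai.`) is left out here.

```python
from typing import Callable, Optional, Tuple


def derive_stream_keys(
    channel: str,
    handler: Callable,
    handler_id: Optional[str] = None,
) -> Tuple[str, str, str]:
    """Return (retry_stream, dlq_stream, retry_group) for one handler.

    Mirrors the rules in the change list: handler_id wins, otherwise the
    suffix falls back to "{channel}-{handler.__name__}".
    """
    handler_suffix = handler_id if handler_id else f"{channel}-{handler.__name__}"
    retry_stream = f"{channel}.{handler_suffix}.retry"  # was "{channel}.retry"
    dlq_stream = f"{channel}.{handler_suffix}.dlq"      # was "{channel}.dlq"
    retry_group = f"{handler_suffix}-retry"             # shape unchanged
    return retry_stream, dlq_stream, retry_group


def process_order(message):
    return message


with_id = derive_stream_keys("orders", process_order, handler_id="h-1")
fallback = derive_stream_keys("orders", process_order)
```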

Migration

Breaking on-wire change for the retry/dlq stream keys. Operators
upgrading mid-flight will:

  • Still see existing {channel}.retry / {channel}.dlq streams in Redis after upgrade. They will no longer receive new entries.
  • Need to drain those streams or manually XADD pending entries into the new per-handler keys before old data is dropped (e.g., on XTRIM or eviction).

Migration note also documented in sdk/CHANGELOG.md.
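One possible shape for the manual drain is sketched below. It is an assumption-heavy illustration rather than a supported migration tool: `drain_legacy_stream` is a hypothetical helper, the client is expected to expose redis-py style `xrange`, `xadd` and `xdel` methods, and the operator has to choose the target per-handler key, because a legacy entry does not record which handler failed.

```python
def drain_legacy_stream(client, legacy_key, target_key, batch=100):
    """Copy entries from a legacy stream into a per-handler stream.

    Each entry is re-added to target_key (with a fresh ID) and then deleted
    from legacy_key, so an interrupted run can simply be restarted.
    Returns the number of entries moved.
    """
    moved = 0
    while True:
        entries = client.xrange(legacy_key, min="-", max="+", count=batch)
        if not entries:
            return moved
        for entry_id, fields in entries:
            client.xadd(target_key, fields)    # append under the new key
            client.xdel(legacy_key, entry_id)  # then remove the original
            moved += 1
```

A call might look like `drain_legacy_stream(redis.Redis(), "orders.retry", "orders.h-1.retry")`, where both key names are placeholders. Running it while old consumers are still attached to the legacy stream would race with them, so it is safest once they are stopped.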

Testing

  • Existing tests pass locally (make test)
  • Added new tests for new functionality

Local run: 22/22 redis-transport tests pass against a Redis 7 container,
including the new regression test.

Changelog

  • I have updated sdk/CHANGELOG.md under [Unreleased] section (required for code changes)

Checklist

  • My code follows the project's code style (make lint passes)
  • My code is properly formatted (make format applied)
  • I have added/updated tests that prove my fix/feature works
  • I have added/updated documentation as needed
  • All tests pass locally
  • I have reviewed my own code
  • My changes generate no new warnings
  • My commit messages follow the Conventional Commits standard

When multiple handlers subscribe to the same channel with different
consumer groups, the shared `{channel}.retry` and `{channel}.dlq`
stream keys caused one handler's failures to be redelivered to every
other handler on that channel. Each handler now writes to (and reads
from) its own `{channel}.{handler_suffix}.retry` and `.dlq` streams.

Adds a regression test that spins up two agents on the same channel,
fails one, and asserts the other is invoked exactly once.

Breaking on-wire change: inflight messages in the legacy
`{channel}.retry` / `{channel}.dlq` streams will not be picked up by
upgraded consumers. Migration note added to CHANGELOG.

github-actions Bot (Contributor) commented May 22, 2026

QualOps Code Quality Analysis

Status: ✅ PASSED - No issues found

Summary

  • Total Issues: 0
  • Critical: 0 🔴
  • High: 0 🟠
  • Medium: 0 🟡
  • Low: 0 🟢
  • Files Analyzed: 0

No issues found in the analyzed code.

📊 Full Report

View detailed report


Powered by QualOps

tsiaeggai requested review from Codesleuth and pontino on May 22, 2026 14:37

pontino (Collaborator) left a comment

Review

The core fix is correct. Per-handler retry/dlq stream keys solve issue #225, and the new fan-out regression test exercises the right scenario. There are a few gaps worth addressing before merge, plus several pre-existing concerns the PR newly exposes.

Blockers

  1. test_retry_on_idle_ms_no_retry_retry_stream is now a tautology. The assertion checks for absence of eggai.{channel}.{group_main}.retry.{group_main}-retry.retry, but a real regression of the retry_stream=f"{retry_stream}.retry" class would produce eggai.{channel}.{group_main}.retry.retry (no embedded group segment). The current key is unreachable by any plausible mutation, so the test passes against the regression it claims to guard. Assert on the latter form, or use KEYS eggai.{channel}* and assert no key ends with .retry.retry.

  2. sdk/README.md still documents the old per-channel keys in seven places (lines 158, 160, 168, 171, 183, 209, 219), including a literal eggai.orders.retry / eggai.orders.dlq in the ASCII diagram. docs/docs/sdk/redis-transport.md was updated, but this sibling, which ships on PyPI, was missed. Users following the README will look for streams that no longer exist.

  3. The new test shares one RedisTransport across two agents. connect() is called twice (once per agent), and _group_monitor_task is overwritten without cancelling the previous task. Task A keeps polling Redis against the same _stream_subscriptions list; on disconnect only task B is awaited, and the redis client task A created is never aclose()d. Either guard connect() against re-entry, or keep a list of monitor tasks.
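The KEYS-based assertion suggested in blocker 1 could take roughly the following form. `chained_retry_keys` is a hypothetical test helper; it only inspects a list of key names, so the test would pass it the result of `KEYS eggai.{channel}*` (which may arrive as bytes, depending on client settings).

```python
def chained_retry_keys(keys):
    """Return every key that ends in a chained ".retry.retry" segment."""
    normalized = [k.decode() if isinstance(k, bytes) else k for k in keys]
    return [k for k in normalized if k.endswith(".retry.retry")]


# Example key listing, made up for illustration.
sample = [
    b"eggai.orders.h1.retry",
    "eggai.orders.h1.dlq",
    "eggai.orders.h1.retry.retry",  # the shape a regression would produce
]
offenders = chained_retry_keys(sample)
```

In the test itself the assertion would be that the returned list is empty, which catches both the nested-group form and the plain `.retry.retry` form.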

Pre-existing, newly exposed

  1. connect() also re-invokes broker.start() and _reclaimer_manager.start() without a guard. The shared-transport test pattern works only because FastStream and the reclaimer happen to be re-entrant today.

  2. The recursive subscribe() forwards last_id=last_id. If an operator restarts with last_id='0' to replay a main-stream backlog, the retry-stream subscriber also replays its entire history every restart. The retry stream should pin last_id='>'.

  3. The recursive subscribe() also forwards user-supplied ack_policy. Under AckPolicy.ACK, retry-stream failures never leave a PEL entry, so the reclaimer finds nothing and max_retries / DLQ never trigger. The per-handler DLQ becomes a no-op for that policy.

  4. The same wrapped handler is registered on both the main and retry streams. make_tracing_wrapper(channel, handler) captures the original channel name in its closure (tracing.py:184), so retry-attempt spans emit messaging.destination=<original channel> instead of the retry stream key. Operators cannot dashboard retry-stream consumption separately from main.

  5. handler_suffix = handler_id if handler_id else f"{channel}-{handler.__name__}". Direct transport.subscribe() callers that bypass Agent (which sets handler_id via HANDLERS_IDS counter) with two same-named handlers on the same channel still collapse to the same suffix, reproducing #225 in that path. Lambdas all produce <lambda>. functools.partial raises AttributeError because it has no __name__, after the main-stream registration has already committed.

  6. _inject_retry_metadata returns (data, 0) on parse failure. The retry-stream reclaimer republishes back to the same stream, so a corrupt envelope loops forever and never reaches DLQ. With per-handler retry streams, every handler now owns its own permanent livelock surface.

  7. The recursive call passes **kwargs which still references the same middlewares list mutated by the parent. Main and retry FastStream subscribers share the list by reference. A defensive kwargs['middlewares'] = list(kwargs.get('middlewares', [])) before the recursive call would isolate them.
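The suffix collisions in item 5 are easy to reproduce in isolation. In the sketch below `naive_suffix` restates the expression quoted in the review, while `guarded_suffix` is only one possible guard: its `type(handler).__name__` default is a placeholder chosen for this example, not necessarily what the SDK uses.

```python
import functools


def naive_suffix(channel, handler, handler_id=None):
    # The expression quoted in the review.
    return handler_id if handler_id else f"{channel}-{handler.__name__}"


def guarded_suffix(channel, handler, handler_id=None):
    # Hypothetical guard: tolerate callables that have no __name__.
    if handler_id:
        return handler_id
    name = getattr(handler, "__name__", type(handler).__name__)
    return f"{channel}-{name}"


first = lambda message: message
second = lambda message: message
bound = functools.partial(print, "order:")

# Two distinct lambdas collapse to the same suffix.
lambda_suffixes = (naive_suffix("orders", first), naive_suffix("orders", second))

# functools.partial objects do not define __name__.
try:
    naive_suffix("orders", bound)
    partial_failed = False
except AttributeError:
    partial_failed = True

partial_suffix = guarded_suffix("orders", bound)
```

The guard removes the exception but not the collisions, so callers that bypass Agent still need to pass a distinct handler_id.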

Test quality

  1. test_retry_does_not_fan_out_to_other_handlers asserts only the lengths of ok_calls and fail_calls. Adding assert ok_calls == ['x'] and assert fail_calls == ['x', 'x'] would lock the payload identity in. (The 2-second window after fail_retry_done is fine: both the retry-success and the hypothetical fan-out are gated on the same reclaimer XADD, so they race on the 100ms FastStream poll, not the reclaimer interval.)

  2. The new test does not clean up the per-handler retry/dlq streams or consumer groups it creates. Not a regression on its own (sibling tests do the same), but worth a fixture if you ever decide to clean things up.

Doc rot

  1. sdk/eggai/transport/pending_reclaimer.py:52 still has # e.g. "eggai.orders.dlq". No runtime impact, just stale.

Minor

  1. _stream_subscriptions.append is unconditional. Repeated subscribe() calls (or partial-failure retries) duplicate entries, and the monitor then issues redundant XGROUP CREATE per duplicate every interval. A set keyed by (stream_key, group) would dedup.

  2. subscribe() has no rollback if the recursive retry-stream subscribe() raises. Main-stream broker.subscriber(...), _stream_subscriptions.append, and the first _setup_reclaimer are already committed. A torn state ships, and any retry by the caller double-appends.
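Both minor points can be illustrated together with a small registry. This is a simplified sketch under stated assumptions: `SubscriptionRegistry` and its attributes are hypothetical stand-ins for the transport's `_stream_subscriptions` and reclaimer configuration, and `setup_retry` stands in for the recursive retry-stream subscribe().

```python
class SubscriptionRegistry:
    """Dedups subscriptions and undoes partial registration on failure."""

    def __init__(self):
        self.stream_subscriptions = set()  # {(stream_key, group)}
        self.reclaimer_configs = []

    def subscribe(self, stream_key, group, setup_retry):
        entry = (stream_key, group)
        newly_added = entry not in self.stream_subscriptions
        configs_before = len(self.reclaimer_configs)

        self.stream_subscriptions.add(entry)  # a set ignores duplicates
        self.reclaimer_configs.append({"stream": stream_key, "group": group})
        try:
            setup_retry()
        except Exception:
            # Roll back only what this call registered, then re-raise.
            del self.reclaimer_configs[configs_before:]
            if newly_added:
                self.stream_subscriptions.discard(entry)
            raise


def failing_setup():
    raise RuntimeError("retry-stream setup failed")


registry = SubscriptionRegistry()
try:
    registry.subscribe("orders", "h-1", failing_setup)
except RuntimeError:
    pass
state_after_failure = (set(registry.stream_subscriptions), list(registry.reclaimer_configs))

registry.subscribe("orders", "h-1", lambda: None)
registry.subscribe("orders", "h-1", lambda: None)
```

After the failed call the registry is empty again, and the two successful calls leave a single `("orders", "h-1")` entry in the set.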

Follow-up to the review on #226:

- Reject ack_policy=ACK/ACK_FIRST together with retry_on_idle_ms: those XACK on
  handler failure, emptying the PEL the reclaimer scans, so retries/DLQ would
  silently never fire.
- Pin the auto-created retry-stream subscriber to last_id=">" instead of
  forwarding the caller's last_id, so replaying the main stream (last_id="0")
  no longer replays the retry stream's history every restart.
- Route a permanently-unparseable ("poison") retry message to the DLQ (or drop
  it with an error log when no DLQ is configured) instead of re-queueing it
  forever; its retry count can never be incremented.
- Trace the retry-stream subscriber with the retry stream as messaging.destination
  (was the original channel), so retry consumption can be dashboarded separately.
- Make connect() idempotent for a transport shared across agents: start only
  not-yet-running subscribers on re-entry and don't orphan the group monitor task.
- Roll back partial in-memory registration (reclaimer configs, stream entries) if
  retry-stream setup raises; de-duplicate _stream_subscriptions (now a set).
- Guard handler_suffix against handlers without __name__ (e.g. functools.partial)
  and derive it from the original handler rather than the tracing wrapper.
- Broaden the no-".retry.retry" test to a KEYS-based check; lock payload identity
  and clean up per-handler streams in the fan-out test; add tests for the above.
- Sync sdk/README.md to the per-handler stream-key scheme and fix stale comments
  in pending_reclaimer.py; document the changes in CHANGELOG.
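The connect() re-entry guard from the list above can be sketched with plain asyncio. `SharedTransport` is a hypothetical class that models only the group monitor task; broker and subscriber start-up are deliberately left out.

```python
import asyncio


class SharedTransport:
    """Models a transport whose connect() may be called once per agent."""

    def __init__(self):
        self._monitor_task = None
        self.monitor_starts = 0

    async def _monitor(self):
        self.monitor_starts += 1
        await asyncio.Event().wait()  # park until the task is cancelled

    async def connect(self):
        # Re-entry guard: keep the running monitor instead of overwriting it.
        if self._monitor_task is not None and not self._monitor_task.done():
            return
        self._monitor_task = asyncio.create_task(self._monitor())

    async def disconnect(self):
        task, self._monitor_task = self._monitor_task, None
        if task is not None:
            task.cancel()
            await asyncio.gather(task, return_exceptions=True)


async def demo():
    transport = SharedTransport()
    await transport.connect()  # first agent
    first_task = transport._monitor_task
    await transport.connect()  # second agent sharing the transport
    same_task = transport._monitor_task is first_task
    await asyncio.sleep(0)     # let the monitor task start
    starts = transport.monitor_starts
    await transport.disconnect()
    return same_task, starts, first_task.cancelled()


result = asyncio.run(demo())
```

Because the second connect() returns early, only one monitor task ever exists, and disconnect() awaits the same task that connect() created.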

tsiaeggai (Collaborator, Author) commented

Thanks for the thorough review, Stefano — almost every mechanism checked out. All addressed in be7c8a7. Point by point:

Fixed

  1. test_..._no_retry_retry_stream "tautology" — partly fair. The asserted key is reachable if the _internal_retry guard breaks (that's the regression it was guarding), but you're right it misses the retry_stream + ".retry" class, which produces …retry.retry without the group segment. Kept the nested check and added a KEYS eggai.{channel}* assertion that no key ends with .retry.retry, and corrected the docstring.
  2. sdk/README.md — synced to the per-handler keys (table, prose, code comment, ASCII diagram). Good catch — this is the PyPI long-description, so it mattered.
  3. Orphaned monitor task on shared transport: connect() no longer overwrites a running monitor task. (Minor: the monitor's client is aclose()d via its finally once _running flips, but the orphaning was real and is fixed.)
  4. connect() re-invocation: connect() is now idempotent: full broker.start() on first call, then only not-yet-running subscribers on re-entry, so a shared transport doesn't spawn duplicate consume loops. (The reclaimer was already guarded.)
  5. last_id forwarded to retry stream — pinned the retry subscriber to last_id=">". (FYI the "replays entire history" framing isn't quite how XREADGROUP behaves — a non-> id re-reads only the consumer's own PEL — but decoupling the retry stream from the operator's main-stream replay intent is the right call regardless.)
  6. AckPolicy.ACK makes DLQ a no-op: retry_on_idle_ms now raises ValueError for ACK/ACK_FIRST (both XACK on failure and defeat the PEL-based reclaimer).
  7. Tracing destination on retry stream — the retry subscriber is now wrapped with the retry stream key, so its spans report the retry stream as messaging.destination. Wrapped from the original handler to keep it to one consumer span per retry.
  8. handler_suffix fallback — now getattr(handler, "__name__", …) so functools.partial no longer raises after the main subscriber is registered, and it's derived from the original handler (not the tracing wrapper). Documented that direct transport.subscribe() callers should pass a distinct handler_id; Agent/Channel always do.
  9. Poison-message livelock — an unparseable envelope now routes to the DLQ (or is dropped with an error log when no DLQ is configured) instead of being re-queued forever.
  10. Test asserts only lengths — added ok_calls == ["x"] / fail_calls == ["x", "x"].
  11. Test stream cleanup — the fan-out test now deletes its per-handler streams/groups on teardown.
  12. Stale pending_reclaimer.py comments — fixed the # e.g. "eggai.orders.dlq" example and the now-incorrect # never == stream.
  13. Unconditional _stream_subscriptions.append — it's now a set (identical entries dedup), and the monitor iterates a snapshot to stay safe against concurrent subscribe().
  14. No rollback if recursive subscribe() raises: subscribe() now rolls back the reclaimer configs and stream entries it registered on failure. (FastStream doesn't expose un-registering a broker subscriber, but nothing is live until connect(), which never runs on failure.)
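The poison-message routing in item 9 might look roughly like the sketch below. Everything about the envelope is an assumption made for illustration: the thread does not show the real format, so the JSON body, the `retry_count` field, the `count > max_retries` threshold, and the `route_retry_entry` name are all hypothetical.

```python
import json
import logging

logger = logging.getLogger("retry-sketch")


def route_retry_entry(raw, retry_stream, dlq_stream=None, max_retries=3):
    """Decide where a reclaimed retry entry goes next.

    Returns (target_stream, payload); a target of None means "drop".
    """
    try:
        envelope = json.loads(raw)
        count = int(envelope.get("retry_count", 0)) + 1
    except (ValueError, TypeError, AttributeError):
        # Unparseable envelope: its retry count can never advance, so it
        # must not be re-queued onto the retry stream.
        logger.error("poison retry entry, routing to %s", dlq_stream or "drop")
        return dlq_stream, raw
    if count > max_retries:
        return dlq_stream, raw
    envelope["retry_count"] = count
    return retry_stream, json.dumps(envelope)


poison_with_dlq = route_retry_entry("not json", "orders.h-1.retry", "orders.h-1.dlq")
poison_no_dlq = route_retry_entry("not json", "orders.h-1.retry")
healthy = route_retry_entry('{"retry_count": 0}', "orders.h-1.retry", "orders.h-1.dlq")
```

The important property is that the parse-failure branch never returns the retry stream, which is what breaks the livelock described in the review.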

Declined

  1. Shared middlewares list — I don't think this is a bug. FastStream snapshots the list into an immutable tuple per subscriber at registration (usecase.py), the recursive frame can't re-append (the filter_by_message/data_type/filter_by_data keys are pop'd on the parent), and the list is never mutated again. So there's no double-application or aliasing in practice. Happy to add the defensive copy anyway if you'd prefer it as hardening.

Note: most of 3–10/14/15 predate this PR; fixed them here since the per-handler change exercises them.

pontino merged commit 1ec7f7e into main on Jun 3, 2026 (10 checks passed)
pontino deleted the fix/per-handler-retry-stream-key branch on June 3, 2026 09:45
Development

Successfully merging this pull request may close these issues.

Redis transport: shared *.retry stream broadcasts a single group's failures to every other handler on the same channel

4 participants