fix(transport): FastStream 0.7 regression#231

Merged
pontino merged 2 commits into
mainfrom
feat/redis-maxlen-and-consumer-naming
Jun 4, 2026

Conversation

@rocky-jaiswal

Contributor

This PR fixes a FastStream 0.7 regression and closes the two gaps (load balancing, unbounded growth) mentioned in the documentation.

Changes

1. FastStream 0.7 filtering regression — fix (critical)
FastStream 0.7 removed publisher/subscriber-level middlewares, so filter_by_message / data_type / filter_by_data raised TypeError at subscribe time on Redis and Kafka. A clean install resolves faststream to the newest version under <0.8 (currently 0.7.1), so new users hit this on first install. Filtering now lives in EggAI's own handler wrapper (application code), matching the in-memory transport. data_type subscriptions now deliver the **typed model instance** to handlers; combining data_type with filter_by_message raises ValueError rather than silently dropping the predicate.
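As a rough illustration of filtering in a handler wrapper rather than in broker middleware, a minimal sketch might look like the following. The name wrap_with_message_filter and the dict-shaped message are assumptions made for this example; they are not the PR's actual wrap_handler_with_filters.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional

Handler = Callable[[dict], Awaitable[Any]]


def wrap_with_message_filter(
    handler: Handler,
    filter_by_message: Optional[Callable[[dict], bool]] = None,
) -> Handler:
    """Hypothetical helper: apply the predicate in application code."""
    if filter_by_message is None:
        return handler

    async def filtered(message: dict) -> Any:
        # Messages that fail the predicate are skipped, not raised.
        if not filter_by_message(message):
            return None
        return await handler(message)

    return filtered


async def demo() -> list:
    seen = []

    async def on_order(message: dict) -> None:
        seen.append(message["id"])

    wrapped = wrap_with_message_filter(
        on_order, lambda m: m.get("type") == "order"
    )
    await wrapped({"type": "order", "id": 1})
    await wrapped({"type": "refund", "id": 2})  # filtered out
    return seen
```

Because the predicate runs inside the wrapper, nothing here depends on the broker accepting a middlewares argument.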

2. Consumer-group load balancing — fix
Consumer name defaulted to handler_id, identical to the group, so multiple workers shared one consumer name and conflated their pending-entries lists. It now defaults to a per-process-unique name ({handler_id}-{hostname}-{pid}) while the group stays stable, giving true competing consumers. The auto-created retry-stream subscriber gets the same treatment, so retried messages load-balance too.
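The naming scheme above could be built along these lines. This is a sketch only: default_consumer_name is a hypothetical function name, and the use of socket.gethostname() and os.getpid() is an assumption about how the hostname and pid are obtained.

```python
import os
import socket


def default_consumer_name(handler_id: str) -> str:
    """Hypothetical helper: a per-process-unique consumer name.

    The group name (handler_id) stays the same across workers; only the
    consumer name varies, so each worker keeps its own pending-entries list.
    """
    return f"{handler_id}-{socket.gethostname()}-{os.getpid()}"
```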

3. Bounded stream growth — feat
Adds RedisTransport(max_len=…, retry_max_len=…), applied as approximate trimming (XADD … MAXLEN ~). max_len (producer path) is opt-in because MAXLEN trims by count regardless of ack state and could drop unconsumed messages; retry_max_len defaults to 10_000 to bound the retry/DLQ streams.
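For readers unfamiliar with approximate trimming, the sketch below shows the shape of the underlying redis-py call. publish_bounded and RecordingClient are hypothetical names for this example; the only real API assumed is redis-py's xadd(name, fields, maxlen=..., approximate=...), and this is not the transport's actual publish path.

```python
from typing import Any, Optional


def publish_bounded(
    client: Any, stream: str, fields: dict, max_len: Optional[int] = None
) -> Any:
    """Hypothetical helper: XADD with optional approximate trimming."""
    if max_len is None:
        # Opt-in: without max_len the stream is left untrimmed.
        return client.xadd(stream, fields)
    # approximate=True corresponds to `MAXLEN ~` on the wire.
    return client.xadd(stream, fields, maxlen=max_len, approximate=True)


class RecordingClient:
    """Stand-in for a redis-py client so the sketch runs without a server."""

    def __init__(self) -> None:
        self.calls = []

    def xadd(self, name: str, fields: dict, **kwargs: Any) -> str:
        self.calls.append((name, fields, kwargs))
        return "0-1"
```

With a real client, the same function would accept a redis.Redis instance in place of RecordingClient.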

@github-actions

github-actions Bot commented Jun 4, 2026

Contributor

QualOps Code Quality Analysis

Status: ✅ PASSED - No issues found

Summary

  • Total Issues: 0
  • Critical: 0 🔴
  • High: 0 🟠
  • Medium: 0 🟡
  • Low: 0 🟢
  • Files Analyzed: 0

No issues found in the analyzed code.

📊 Full Report

View detailed report


Powered by QualOps

@pontino pontino left a comment

Collaborator

Review: FastStream 0.7 regression fix

The core fix is correct and the right call. The regression is real. FastStream 0.7.0 dropped the middlewares argument from subscriber() and the broker constructor, and replacing subscriber-middleware filtering with handler wrapping (wrap_handler_with_filters) is transport-agnostic and CI-verified against the lockfile's faststream 0.7.0 across Python 3.10 to 3.13. The widened pin >=0.6,<0.8 is consistent. The Redis max_len work is sound: I confirmed against faststream 0.7.0 that broker.publish(maxlen=...) reaches redis-py's xadd, which defaults approximate=True, so the producer path uses MAXLEN ~ trimming, matching the reclaimer and the docstring.

There are a few edge-case gaps in the new wrap_handler_with_filters, none blocking the primary fix. Each was reproduced by running the wrapper directly.

Inline findings (verified)

  1. No functools.wraps on the wrappers. This erases the handler name and signature and breaks FastStream dependency injection for handlers that request injected arguments. It is also inconsistent with make_tracing_wrapper, which uses functools.wraps.
  2. await handler(...) breaks synchronous handlers. tracing.py supports sync handlers via an iscoroutinefunction guard; the filter path assumes async and raises TypeError at runtime for sync handlers using any filter option.
  3. filter_by_data without data_type is silently dropped. The predicate is discarded with no error, the same case the PR correctly rejects for filter_by_message with data_type. Add the symmetric guard.
  4. A Pydantic model without a type field raises a confusing KeyError("type") (nit). A clear ValueError would diagnose better.

Design notes (intentional or out of scope, not blockers)

  • Malformed-message handling changed. The old data_type middleware let ValidationError bubble up to retry or DLQ; the new typed_handler swallows it and acks. This is intentional and tested (test_data_type_skips_invalid_payload) and correct for shared-channel type routing. Worth confirming that losing the DLQ path for genuinely malformed messages aimed at this handler is acceptable.
  • DLQ shares the retry max_len cap (pending_reclaimer.py). This is documented behavior, default retry_max_len=10000, but a separate unbounded dlq_max_len would avoid silently trimming forensic data. Enhancement, not a defect.
  • Consumer default with explicit group= (redis.py). When a caller uses transport.subscribe() directly with group= but no handler_id, consumer stays None. The normal Agent.subscribe to start() path always sets handler_id, so this only affects direct-API callers, and the code comment already notes it.

Two automated-scan flags I checked and dismissed

  • Missing Redis skip-guard on the new maxlen test: false positive. tests/conftest.py auto-skips every test_redis.py test when localhost:6379 is unavailable.
  • publish(maxlen=) using exact trimming: false positive. redis-py defaults approximate=True and faststream 0.7.0 does not override it.

Verification note: I confirmed CI is green on faststream 0.7.0 and reproduced the wrapper findings standalone with pydantic only. I did not run the full suite against a live 0.7.0 broker locally.

Comment thread sdk/eggai/transport/middleware_utils.py
Comment thread sdk/eggai/transport/middleware_utils.py
Comment thread sdk/eggai/transport/middleware_utils.py Outdated
Comment thread sdk/eggai/transport/middleware_utils.py

@pontino pontino left a comment

Collaborator

Verified the fixes in f20b00d. All four inline findings are resolved, with tests and green CI on Python 3.10 to 3.13. I checked each by running the new wrap_handler_with_filters directly:

  • Sync handlers: the new _invoke helper checks inspect.isawaitable before awaiting. A sync handler now works on both the filter_by_message and data_type paths instead of raising TypeError.
  • filter_by_data without data_type: now raises a clear ValueError, symmetric with the existing filter_by_message plus data_type guard.
  • data_type without a type field: now raises a descriptive ValueError instead of a bare KeyError.
  • Handler identity: _carry_identity copies __name__, __qualname__, __doc__, __module__.
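The sync-handler fix in the first bullet can be sketched as follows. The function name invoke is a stand-in chosen for this example, not the PR's _invoke; the only technique assumed is checking inspect.isawaitable on the handler's return value before awaiting it.

```python
import asyncio
import inspect
from typing import Any, Callable


async def invoke(handler: Callable[[Any], Any], message: Any) -> Any:
    """Call a handler that may be sync or async and return its result."""
    result = handler(message)
    # Async handlers return a coroutine; sync handlers return a plain value.
    if inspect.isawaitable(result):
        return await result
    return result


def sync_handler(message: dict) -> str:
    return "sync:" + message["id"]


async def async_handler(message: dict) -> str:
    return "async:" + message["id"]
```

Checking the return value, rather than the function with iscoroutinefunction, also covers sync callables that happen to return an awaitable.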

On the identity fix: not using functools.wraps is the correct call. functools.wraps sets __wrapped__, which inspect.signature follows, so FastStream would introspect the original handler signature and try to decode the message into that type before the wrapper runs. Keeping the wrapper's own (message) signature is required. My earlier comment suggesting functools.wraps would have been wrong here.
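The signature behavior described here can be reproduced with the standard library alone. In this sketch, carry_identity is a hypothetical stand-in for the PR's _carry_identity and only copies the four attributes listed above.

```python
import functools
import inspect


async def typed_handler(order: int, retries: int = 3) -> int:
    """Original handler with a typed signature."""
    return order


@functools.wraps(typed_handler)
async def with_wraps(message):
    return await typed_handler(message)


def carry_identity(wrapper, original):
    """Hypothetical helper: copy identity attributes, not __wrapped__."""
    for attr in ("__name__", "__qualname__", "__doc__", "__module__"):
        setattr(wrapper, attr, getattr(original, attr))
    return wrapper


async def _plain(message):
    return await typed_handler(message)


plain = carry_identity(_plain, typed_handler)

# inspect.signature follows __wrapped__, so the wraps-decorated wrapper
# reports the original handler's parameters instead of its own.
wraps_params = list(inspect.signature(with_wraps).parameters)
plain_params = list(inspect.signature(plain).parameters)
```

Here wraps_params reflects the original handler's parameters, while plain_params reflects the wrapper's own single message parameter, which is the signature a framework would introspect.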

Also good to see the in-memory transport now delegating to wrap_handler_with_filters, which removes the duplicated filter logic across the three transports.

The Redis max_len work checks out: faststream 0.7.0 forwards maxlen to redis-py xadd, which defaults approximate=True, so the producer path uses MAXLEN ~ trimming as documented.

The remaining design notes (separate dlq_max_len, consumer default when group= is passed via direct transport.subscribe) are non-blocking and fine to leave for a follow-up.

LGTM.

@pontino pontino merged commit 5e229cc into main Jun 4, 2026
11 checks passed
@pontino pontino deleted the feat/redis-maxlen-and-consumer-naming branch June 4, 2026 15:58