fix(transport): FastStream 0.7 regression#231
Conversation
… growth) + repair FastStream 0.7 filtering
QualOps Code Quality AnalysisStatus: ✅ PASSED - No issues found Summary
No issues found in the analyzed code. 📊 Full ReportPowered by QualOps |
There was a problem hiding this comment.
Review: FastStream 0.7 regression fix
The core fix is correct and the right call. The regression is real. FastStream 0.7.0 dropped the middlewares argument from subscriber() and the broker constructor, and replacing subscriber-middleware filtering with handler wrapping (wrap_handler_with_filters) is transport-agnostic and CI-verified against the lockfile's faststream 0.7.0 across Python 3.10 to 3.13. The widened pin >=0.6,<0.8 is consistent. The Redis max_len work is sound: I confirmed against faststream 0.7.0 that broker.publish(maxlen=...) reaches redis-py's xadd, which defaults approximate=True, so the producer path uses MAXLEN ~ trimming, matching the reclaimer and the docstring.
There are a few edge-case gaps in the new wrap_handler_with_filters, none blocking the primary fix. Each was reproduced by running the wrapper directly.
Inline findings (verified)
- No functools.wraps on the wrappers. This erases the handler name and signature and breaks FastStream dependency injection for handlers that request injected arguments. It is also inconsistent with make_tracing_wrapper, which uses functools.wraps.
- await handler(...) breaks synchronous handlers. tracing.py supports sync handlers via an iscoroutinefunction guard; the filter path assumes async and raises TypeError at runtime for sync handlers using any filter option.
- filter_by_data without data_type is silently dropped. The predicate is discarded with no error, the same case the PR correctly rejects for filter_by_message with data_type. Add the symmetric guard.
- A non-type Pydantic model raises a confusing KeyError("type") (nit). A clear ValueError would diagnose better.
Design notes (intentional or out of scope, not blockers)
- Malformed-message handling changed. The old data_type middleware let ValidationError bubble up to retry or DLQ; the new typed_handler swallows it and acks. This is intentional and tested (test_data_type_skips_invalid_payload) and correct for shared-channel type routing. Worth confirming that losing the DLQ path for genuinely malformed messages aimed at this handler is acceptable.
- DLQ shares the retry max_len cap (pending_reclaimer.py). This is documented behavior, default retry_max_len=10000, but a separate unbounded dlq_max_len would avoid silently trimming forensic data. Enhancement, not a defect.
- Consumer default with explicit group= (redis.py). When a caller uses transport.subscribe() directly with group= but no handler_id, consumer stays None. The normal Agent.subscribe to start() path always sets handler_id, so this only affects direct-API callers, and the code comment already notes it.
Two automated-scan flags I checked and dismissed
- Missing Redis skip-guard on the new maxlen test: false positive. tests/conftest.py auto-skips every test_redis.py test when localhost:6379 is unavailable.
- publish(maxlen=) using exact trimming: false positive. redis-py defaults approximate=True and faststream 0.7.0 does not override it.
Verification note: I confirmed CI is green on faststream 0.7.0 and reproduced the wrapper findings standalone with pydantic only. I did not run the full suite against a live 0.7.0 broker locally.
…-memory consistency
pontino
left a comment
There was a problem hiding this comment.
Verified the fixes in f20b00d. All four inline findings are resolved, with tests and green CI on Python 3.10 to 3.13. I checked each by running the new wrap_handler_with_filters directly:
- Sync handlers: the new _invoke helper checks inspect.isawaitable before awaiting. A sync handler now works on both the filter_by_message and data_type paths instead of raising TypeError.
- filter_by_data without data_type: now raises a clear ValueError, symmetric with the existing filter_by_message plus data_type guard.
- data_type without a type field: now raises a descriptive ValueError instead of a bare KeyError.
- Handler identity: _carry_identity copies name, qualname, doc, module.
On the identity fix: not using functools.wraps is the correct call. functools.wraps sets wrapped, which inspect.signature follows, so FastStream would introspect the original handler signature and try to decode the message into that type before the wrapper runs. Keeping the wrapper's own (message) signature is required. My earlier comment suggesting functools.wraps would have been wrong here.
Also good to see the in-memory transport now delegating to wrap_handler_with_filters, which removes the duplicated filter logic across the three transports.
The Redis max_len work checks out: faststream 0.7.0 forwards maxlen to redis-py xadd, which defaults approximate=True, so the producer path uses MAXLEN ~ trimming as documented.
The remaining design notes (separate dlq_max_len, consumer default when group= is passed via direct transport.subscribe) are non-blocking and fine to leave for a follow-up.
LGTM.
This PR fixes a FastStream 0.7 regression found and closes the two gaps (load balancing, unbounded growth) mentioned in documentation.
Changes
1. FastStream 0.7 filtering regression —
fix(critical)0.7 removed publisher/subscriber-level middlewares, so
filter_by_message/data_type/filter_by_dataraised TypeErrorat subscribe time on Redis and Kafka. A clean install resolves faststream to the newest version under<0.8(currently 0.7.1), so new users hit this on first install. Filtering now lives in EggAI's own handler wrapper (application code), matching the in-memory transport.data_typesubscriptions now deliver the **typed model instance** to handlers; combiningdata_typewithfilter_by_messageraisesValueError` rather than silently dropping the predicate.2. Consumer-group load balancing —
fixConsumer name defaulted to
handler_id, identical to the group, so multiple workers shared one consumer name and conflated their pending-entries lists. It now defaults to a per-process-unique name ({handler_id}-{hostname}-{pid}) withthe group still stable — true competing consumers. The auto-created retry-stream subscriber gets the same treatment, so retried messages load-balance too.
3. Bounded stream growth —
featRedisTransport(max_len=…, retry_max_len=…), applied as approximate trimming (XADD … MAXLEN ~).max_lenproducer path) is opt-in because MAXLEN trims by count regardless of ack state and could drop un-consumed messages;retry_max_lendefaults to 10_000 to bound the retry/DLQ streams.