-
Notifications
You must be signed in to change notification settings - Fork 10
fix(transport): FastStream 0.7 regression #231
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,93 +1,125 @@ | ||
| """ | ||
| Shared middleware utilities for transport implementations. | ||
|
|
||
| This module provides common middleware factories used by Kafka and Redis transports | ||
| to handle message filtering and data type validation. | ||
| Shared message-filtering utilities for transport implementations. | ||
|
|
||
| Kafka and Redis both apply EggAI's content filtering (``filter_by_message``) and | ||
| typed-subscription support (``data_type`` / ``filter_by_data``) by wrapping the | ||
| handler, *not* via broker subscriber middlewares. FastStream 0.7 removed the | ||
| ``middlewares`` argument from ``subscriber()`` (and from the broker constructor), | ||
| so the old middleware-based approach raised ``TypeError`` at subscribe time. This | ||
| handler-wrapping approach is independent of FastStream's middleware API and keeps | ||
| the behaviour identical across the Kafka, Redis, and in-memory transports. | ||
| """ | ||
|
|
||
| import json | ||
| from collections.abc import Awaitable, Callable | ||
| import inspect | ||
| from collections.abc import Callable | ||
| from typing import Any | ||
|
|
||
| from faststream.message.message import StreamMessage | ||
|
|
||
|
|
||
| def create_filter_middleware(filter_func: Callable[[dict[str, Any]], bool]) -> Callable: | ||
| """ | ||
| Create a middleware that filters messages based on a predicate function. | ||
|
|
||
| Args: | ||
| filter_func: A function that takes a message dict and returns True if the message | ||
| should be processed, False otherwise. | ||
| from pydantic import ValidationError | ||
|
|
||
| Returns: | ||
| A middleware function that applies the filter. | ||
| """ | ||
|
|
||
| async def middleware( | ||
| call_next: Callable[[Any], Awaitable[Any]], | ||
| msg: StreamMessage[Any], | ||
| ) -> Any: | ||
| if filter_func(json.loads(msg.body.decode("utf-8"))): | ||
| return await call_next(msg) | ||
| return None | ||
| # Identity attributes copied from the user handler onto a wrapper. We intentionally | ||
| # do NOT use functools.wraps here: it sets __wrapped__, which inspect.signature | ||
| # follows — FastStream would then introspect the *original* handler's signature | ||
| # (e.g. ``order: OrderMessage``) and try to decode the message into that type | ||
| # itself, before our wrapper runs. Our wrappers must keep their own ``(message)`` | ||
| # signature so FastStream hands them the raw dict to validate/filter. Copying just | ||
| # these attributes preserves the handler's name/docs for logging and AsyncAPI | ||
| # without changing what FastStream decodes. | ||
| _IDENTITY_ATTRS = ("__module__", "__name__", "__qualname__", "__doc__") | ||
|
|
||
| return middleware | ||
|
|
||
| def _carry_identity(wrapper: Callable, handler: Callable) -> Callable: | ||
| for attr in _IDENTITY_ATTRS: | ||
| try: | ||
| setattr(wrapper, attr, getattr(handler, attr)) | ||
| except AttributeError: | ||
| pass | ||
| return wrapper | ||
|
|
||
| def create_data_type_middleware(data_type: type) -> Callable: | ||
| """ | ||
| Create a middleware that validates and filters messages by data type. | ||
|
|
||
| Args: | ||
| data_type: A Pydantic model class with a 'type' field that will be used | ||
| for validation and filtering. | ||
| async def _invoke(handler: Callable, arg: Any) -> Any: | ||
| """Call ``handler`` with ``arg``, awaiting the result only if it is awaitable. | ||
|
|
||
| Returns: | ||
| A middleware function that validates the message against the data type | ||
| and filters out messages that don't match the expected type. | ||
| Mirrors ``make_tracing_wrapper``'s tolerance of synchronous handlers: a sync | ||
| handler combined with a filter option must not raise ``TypeError`` from an | ||
| unconditional ``await``. | ||
| """ | ||
|
|
||
| async def middleware( | ||
| call_next: Callable[[Any], Awaitable[Any]], | ||
| msg: StreamMessage[Any], | ||
| ) -> Any: | ||
| typed_message = data_type.model_validate(json.loads(msg.body.decode("utf-8"))) | ||
|
|
||
| if typed_message.type != data_type.model_fields["type"].default: | ||
| return None | ||
|
|
||
| return await call_next(msg) | ||
|
|
||
| return middleware | ||
|
|
||
|
|
||
| def create_filter_by_data_middleware( | ||
| data_type: type, filter_func: Callable[[Any], bool] | ||
| result = handler(arg) | ||
| if inspect.isawaitable(result): | ||
| return await result | ||
| return result | ||
|
|
||
|
|
||
| def wrap_handler_with_filters( | ||
| handler: Callable, | ||
| *, | ||
| data_type: type | None = None, | ||
| filter_by_data: Callable[[Any], bool] | None = None, | ||
| filter_by_message: Callable[[dict[str, Any]], bool] | None = None, | ||
| ) -> Callable: | ||
| """Wrap ``handler`` with EggAI's content filtering / typed-message support. | ||
|
|
||
| The returned coroutine receives the broker-decoded message (a ``dict``) and: | ||
|
|
||
| - ``data_type``: validates the dict against the Pydantic model. Messages that | ||
| fail validation, or whose ``type`` field does not match the model's default | ||
| ``type``, are skipped. Matching messages are passed to ``handler`` as the | ||
| **typed model instance** (e.g. ``OrderMessage``), not the raw dict. | ||
| - ``data_type`` + ``filter_by_data``: as above, and additionally skipped unless | ||
| ``filter_by_data(typed_message)`` returns truthy. | ||
| - ``filter_by_message`` (no ``data_type``): ``handler`` is called with the raw | ||
| dict only when ``filter_by_message(dict)`` returns truthy. | ||
|
|
||
| Skipped messages return ``None`` without invoking ``handler`` — a clean no-op, | ||
| so the broker acknowledges them (they are not retried). When no filtering | ||
| option is supplied, ``handler`` is returned unchanged. | ||
|
|
||
| ``filter_by_message`` and ``data_type`` are mutually exclusive: the former is | ||
| the untyped (raw-dict) filter, the latter validates into a typed model and | ||
| pairs with ``filter_by_data``. ``filter_by_data`` requires ``data_type``. | ||
| Invalid combinations raise ``ValueError`` rather than silently dropping an | ||
| option. | ||
| """ | ||
| Create a middleware that validates messages by data type and applies a filter. | ||
|
|
||
| This combines data type validation with a custom filter function that operates | ||
| on the validated/typed message object. | ||
|
|
||
| Args: | ||
| data_type: A Pydantic model class for validation. | ||
| filter_func: A function that takes the validated message object and returns | ||
| True if it should be processed, False otherwise. | ||
| if data_type is not None and filter_by_message is not None: | ||
| raise ValueError( | ||
| "filter_by_message cannot be combined with data_type. Use filter_by_data " | ||
| "(which receives the validated typed message) to filter typed " | ||
| "subscriptions, or filter_by_message on its own for raw-dict filtering." | ||
| ) | ||
| if filter_by_data is not None and data_type is None: | ||
| raise ValueError( | ||
| "filter_by_data requires data_type — it receives the validated typed " | ||
| "message. Use filter_by_message to filter on the raw dict instead." | ||
| ) | ||
|
|
||
| if data_type is not None: | ||
| if "type" not in data_type.model_fields: | ||
| raise ValueError( | ||
| f"data_type {data_type.__name__!r} must define a 'type' field " | ||
| "(the discriminator used to match messages, as on BaseMessage)." | ||
| ) | ||
| expected_type = data_type.model_fields["type"].default | ||
|
|
||
| async def typed_handler(message: dict[str, Any]) -> Any: | ||
|
pontino marked this conversation as resolved.
|
||
| try: | ||
| typed_message = data_type.model_validate(message) | ||
| except (ValidationError, ValueError, TypeError): | ||
| # Wrong shape / payload for this data_type — not ours to handle. | ||
| return None | ||
| if typed_message.type != expected_type: | ||
| return None | ||
| if filter_by_data is not None and not filter_by_data(typed_message): | ||
| return None | ||
| return await _invoke(handler, typed_message) | ||
|
|
||
| return _carry_identity(typed_handler, handler) | ||
|
|
||
| if filter_by_message is not None: | ||
|
|
||
| async def filtered_handler(message: dict[str, Any]) -> Any: | ||
| if filter_by_message(message): | ||
| return await _invoke(handler, message) | ||
| return None | ||
|
|
||
| Returns: | ||
| A middleware function that validates and filters messages. | ||
| """ | ||
| return _carry_identity(filtered_handler, handler) | ||
|
|
||
| async def middleware( | ||
| call_next: Callable[[Any], Awaitable[Any]], | ||
| msg: StreamMessage[Any], | ||
| ) -> Any: | ||
| data = json.loads(msg.body.decode("utf-8")) | ||
| typed_message = data_type.model_validate(data) | ||
| if filter_func(typed_message): | ||
| return await call_next(msg) | ||
| return None | ||
|
|
||
| return middleware | ||
| return handler | ||
|
pontino marked this conversation as resolved.
|
||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.