Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,43 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed
- **Redis & Kafka transports**: `filter_by_message`, `data_type`, and
`filter_by_data` subscriptions raised `TypeError` at subscribe time under
FastStream 0.7, which removed publisher/subscriber-level middlewares. Filtering
and typed-message handling are now applied in EggAI's own handler wrapper
(application code) rather than via FastStream subscriber middlewares. This keeps the
per-subscription filter logic independent of FastStream's middleware API (it is
the same approach the in-memory transport already uses). As part of this,
`data_type` subscriptions on Redis/Kafka now deliver the **typed model instance**
to the handler (matching the in-memory transport and the documented behaviour),
rather than the raw dict. Invalid filter-option combinations now raise
`ValueError` instead of silently dropping an option: `data_type` and
`filter_by_message` are mutually exclusive, and `filter_by_data` requires
`data_type`. These validations are consistent across the Redis, Kafka, and
in-memory transports.

### Added
- **RedisTransport**: New `max_len` and `retry_max_len` constructor options to cap
Redis stream growth via approximate trimming (`XADD ... MAXLEN ~`). `max_len`
(default `None`/unbounded) caps the producer/`publish()` path; it is opt-in
because `MAXLEN` trims the oldest entries by count regardless of ack state, so a
value below `throughput × consumer-lag` can drop un-delivered messages.
`retry_max_len` (default `10_000`) caps the SDK-managed retry and DLQ streams,
bounding the blast radius of a runaway retry loop. This wires up the previously
documented-but-inert `max_len` knob.

### Changed
- **RedisTransport**: The stream consumer name now defaults to a per-process-unique
value (`{handler_id}-{hostname}-{pid}`) while the consumer **group** still defaults
to the stable `handler_id`. A fleet of workers running the same handler now shares
one group (Redis load-balances the stream across them) while each worker owns a
distinct slice of the pending-entries list — the competing-consumers pattern. The
auto-created retry-stream subscriber gets the same per-process-unique consumer, so
retried messages load-balance across a worker fleet too. Pass an explicit
`consumer=` to opt out.

## [0.3.0] - 2026-06-03

### Security
Expand Down
62 changes: 12 additions & 50 deletions sdk/eggai/transport/inmemory.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from eggai.schemas import BaseMessage
from eggai.transport import Transport
from eggai.transport.middleware_utils import wrap_handler_with_filters

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -122,56 +123,17 @@ async def subscribe(
group_id = kwargs.get("group_id", handler_id or uuid.uuid4().hex)
key = (channel, group_id)

final_callback = callback

# Handle data_type filtering
if "data_type" in kwargs:
data_type = kwargs["data_type"]

async def data_type_filtered_callback(data):
try:
typed_message = data_type.model_validate(data)
# Check if message type matches expected type
if typed_message.type != data_type.model_fields["type"].default:
return
# Pass the typed message object to the handler
await callback(typed_message)
except Exception:
# Skip messages that don't match the data type
return

final_callback = data_type_filtered_callback

# Handle filter_by_data if present along with data_type
if "filter_by_data" in kwargs:
filter_func = kwargs["filter_by_data"]

async def data_and_filter_callback(data):
try:
typed_message = data_type.model_validate(data)
# Check if message type matches expected type
if typed_message.type != data_type.model_fields["type"].default:
return
# Apply the data filter
if filter_func(typed_message):
await callback(typed_message)
except (json.JSONDecodeError, ValueError, TypeError) as e:
# Skip messages that don't match the data type or filter
logger.debug(f"Message validation failed: {e}")
return

final_callback = data_and_filter_callback

# Handle legacy filter_by_message (for backward compatibility)
elif "filter_by_message" in kwargs:
filter_func = kwargs["filter_by_message"]
original_callback = final_callback # Store original before reassignment

async def filtered_callback(data):
if filter_func(data):
await original_callback(data) # Use original, not final_callback

final_callback = filtered_callback
# Content filtering / typed-subscription handling is shared with the Redis
# and Kafka transports (the consume loop below already hands callbacks a
# decoded dict, the same input the wrapper expects). This keeps validation
# and typed delivery identical across all transports and is the single place
# that rejects invalid filter-option combinations. Tracing stays outermost.
final_callback = wrap_handler_with_filters(
callback,
data_type=kwargs.get("data_type"),
filter_by_data=kwargs.get("filter_by_data"),
filter_by_message=kwargs.get("filter_by_message"),
)

from eggai.tracing import make_tracing_wrapper

Expand Down
54 changes: 24 additions & 30 deletions sdk/eggai/transport/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,7 @@

from eggai.schemas import BaseMessage
from eggai.transport.base import Transport
from eggai.transport.middleware_utils import (
create_data_type_middleware,
create_filter_by_data_middleware,
create_filter_middleware,
)
from eggai.transport.middleware_utils import wrap_handler_with_filters


class KafkaTransport(Transport):
Expand Down Expand Up @@ -181,7 +177,14 @@ async def subscribe(self, channel: str, handler, **kwargs) -> Callable:
**kwargs: Additional keyword arguments that can be used to configure the subscription.

Keyword Args:
filter_by_message (Callable, optional): A function to filter incoming messages based on their payload.
filter_by_message (Callable, optional): Predicate applied to the decoded message dict; the handler
is invoked (with the dict) only when it returns truthy.
data_type (BaseModel, optional): A Pydantic model class used to validate and type incoming
messages. Messages that fail validation, or whose ``type`` field does not match the model's
default ``type``, are skipped. Matching messages are passed to the handler as the **typed
model instance** (not the raw dict).
filter_by_data (Callable, optional): Predicate applied to the validated typed message (requires
`data_type`); the handler runs only when it returns truthy.
group_id (Optional[str], optional): The consumer group name for dynamic partition assignment.
auto_offset_reset (str, optional): Policy for resetting offsets ('earliest' or 'latest').

Expand All @@ -190,30 +193,21 @@ async def subscribe(self, channel: str, handler, **kwargs) -> Callable:
"""
from eggai.tracing import make_tracing_wrapper

handler = make_tracing_wrapper(channel, handler)

if "filter_by_message" in kwargs:
if "middlewares" not in kwargs:
kwargs["middlewares"] = []
kwargs["middlewares"].append(
create_filter_middleware(kwargs.pop("filter_by_message"))
)

if "data_type" in kwargs:
data_type = kwargs.pop("data_type")

if "middlewares" not in kwargs:
kwargs["middlewares"] = []
kwargs["middlewares"].append(create_data_type_middleware(data_type))

if "filter_by_data" in kwargs:
if "middlewares" not in kwargs:
kwargs["middlewares"] = []
kwargs["middlewares"].append(
create_filter_by_data_middleware(
data_type, kwargs.pop("filter_by_data")
)
)
# EggAI applies content filtering (filter_by_message) and typed-subscription
# support (data_type / filter_by_data) by wrapping the handler — see
# wrap_handler_with_filters — NOT via FastStream subscriber middlewares,
# which FastStream 0.7 removed. Tracing stays OUTERMOST so traceparent is
# read from the raw decoded dict before a data_type subscription validates
# it into a typed model: tracing( filters( handler ) ).
handler = make_tracing_wrapper(
channel,
wrap_handler_with_filters(
handler,
filter_by_message=kwargs.pop("filter_by_message", None),
data_type=kwargs.pop("data_type", None),
filter_by_data=kwargs.pop("filter_by_data", None),
),
)

# Use handler_id as default group_id (preserves broadcast behavior)
handler_id = kwargs.pop("handler_id", None)
Expand Down
186 changes: 109 additions & 77 deletions sdk/eggai/transport/middleware_utils.py
Original file line number Diff line number Diff line change
@@ -1,93 +1,125 @@
"""
Shared middleware utilities for transport implementations.

This module provides common middleware factories used by Kafka and Redis transports
to handle message filtering and data type validation.
Shared message-filtering utilities for transport implementations.

Kafka and Redis both apply EggAI's content filtering (``filter_by_message``) and
typed-subscription support (``data_type`` / ``filter_by_data``) by wrapping the
handler, *not* via broker subscriber middlewares. FastStream 0.7 removed the
``middlewares`` argument from ``subscriber()`` (and from the broker constructor),
so the old middleware-based approach raised ``TypeError`` at subscribe time. This
handler-wrapping approach is independent of FastStream's middleware API and keeps
the behaviour identical across the Kafka, Redis, and in-memory transports.
"""

import json
from collections.abc import Awaitable, Callable
import inspect
from collections.abc import Callable
from typing import Any

from faststream.message.message import StreamMessage


def create_filter_middleware(filter_func: Callable[[dict[str, Any]], bool]) -> Callable:
"""
Create a middleware that filters messages based on a predicate function.

Args:
filter_func: A function that takes a message dict and returns True if the message
should be processed, False otherwise.
from pydantic import ValidationError

Returns:
A middleware function that applies the filter.
"""

async def middleware(
call_next: Callable[[Any], Awaitable[Any]],
msg: StreamMessage[Any],
) -> Any:
if filter_func(json.loads(msg.body.decode("utf-8"))):
return await call_next(msg)
return None
# Identity attributes that ``_carry_identity`` copies from the user handler onto a
# wrapper.
#
# functools.wraps is intentionally NOT used here: it sets __wrapped__, which
# inspect.signature follows — FastStream would then introspect the *original*
# handler's signature (e.g. ``order: OrderMessage``) and try to decode the message
# into that type itself, before our wrapper runs. Our wrappers must keep their own
# ``(message)`` signature so FastStream hands them the raw dict to validate/filter.
# Copying just these attributes preserves the handler's name/docs for logging and
# AsyncAPI without changing what FastStream decodes.
_IDENTITY_ATTRS = ("__module__", "__name__", "__qualname__", "__doc__")

return middleware

def _carry_identity(wrapper: Callable, handler: Callable) -> Callable:
    """Copy the handler's identity attributes onto ``wrapper`` and return it.

    Every name listed in ``_IDENTITY_ATTRS`` is read from ``handler`` and
    assigned to ``wrapper``. If reading or assigning one attribute raises
    ``AttributeError``, that attribute is skipped and the remaining ones are
    still attempted.

    Args:
        wrapper: The callable that receives the copied attributes.
        handler: The callable whose attributes are copied.

    Returns:
        The same ``wrapper`` object, mutated in place.
    """
    for name in _IDENTITY_ATTRS:
        try:
            value = getattr(handler, name)
            setattr(wrapper, name, value)
        except AttributeError:
            # Attribute missing on the handler or not settable on the wrapper.
            continue
    return wrapper

def create_data_type_middleware(data_type: type) -> Callable:
"""
Create a middleware that validates and filters messages by data type.

Args:
data_type: A Pydantic model class with a 'type' field that will be used
for validation and filtering.
async def _invoke(handler: Callable, arg: Any) -> Any:
"""Call ``handler`` with ``arg``, awaiting the result only if it is awaitable.

Returns:
A middleware function that validates the message against the data type
and filters out messages that don't match the expected type.
Mirrors ``make_tracing_wrapper``'s tolerance of synchronous handlers: a sync
handler combined with a filter option must not raise ``TypeError`` from an
unconditional ``await``.
"""

async def middleware(
call_next: Callable[[Any], Awaitable[Any]],
msg: StreamMessage[Any],
) -> Any:
typed_message = data_type.model_validate(json.loads(msg.body.decode("utf-8")))

if typed_message.type != data_type.model_fields["type"].default:
return None

return await call_next(msg)

return middleware


def create_filter_by_data_middleware(
data_type: type, filter_func: Callable[[Any], bool]
result = handler(arg)
if inspect.isawaitable(result):
return await result
return result


def wrap_handler_with_filters(
handler: Callable,
*,
data_type: type | None = None,
filter_by_data: Callable[[Any], bool] | None = None,
filter_by_message: Callable[[dict[str, Any]], bool] | None = None,
) -> Callable:
"""Wrap ``handler`` with EggAI's content filtering / typed-message support.

The returned coroutine receives the broker-decoded message (a ``dict``) and:

- ``data_type``: validates the dict against the Pydantic model. Messages that
fail validation, or whose ``type`` field does not match the model's default
``type``, are skipped. Matching messages are passed to ``handler`` as the
**typed model instance** (e.g. ``OrderMessage``), not the raw dict.
- ``data_type`` + ``filter_by_data``: as above, and additionally skipped unless
``filter_by_data(typed_message)`` returns truthy.
- ``filter_by_message`` (no ``data_type``): ``handler`` is called with the raw
dict only when ``filter_by_message(dict)`` returns truthy.

Skipped messages return ``None`` without invoking ``handler`` — a clean no-op,
so the broker acknowledges them (they are not retried). When no filtering
option is supplied, ``handler`` is returned unchanged.

``filter_by_message`` and ``data_type`` are mutually exclusive: the former is
the untyped (raw-dict) filter, the latter validates into a typed model and
pairs with ``filter_by_data``. ``filter_by_data`` requires ``data_type``.
Invalid combinations raise ``ValueError`` rather than silently dropping an
option.
"""
Create a middleware that validates messages by data type and applies a filter.

This combines data type validation with a custom filter function that operates
on the validated/typed message object.

Args:
data_type: A Pydantic model class for validation.
filter_func: A function that takes the validated message object and returns
True if it should be processed, False otherwise.
if data_type is not None and filter_by_message is not None:
raise ValueError(
"filter_by_message cannot be combined with data_type. Use filter_by_data "
"(which receives the validated typed message) to filter typed "
"subscriptions, or filter_by_message on its own for raw-dict filtering."
)
if filter_by_data is not None and data_type is None:
raise ValueError(
"filter_by_data requires data_type — it receives the validated typed "
"message. Use filter_by_message to filter on the raw dict instead."
)

if data_type is not None:
if "type" not in data_type.model_fields:
raise ValueError(
f"data_type {data_type.__name__!r} must define a 'type' field "
"(the discriminator used to match messages, as on BaseMessage)."
)
expected_type = data_type.model_fields["type"].default
Comment thread
pontino marked this conversation as resolved.

async def typed_handler(message: dict[str, Any]) -> Any:
Comment thread
pontino marked this conversation as resolved.
try:
typed_message = data_type.model_validate(message)
except (ValidationError, ValueError, TypeError):
# Wrong shape / payload for this data_type — not ours to handle.
return None
if typed_message.type != expected_type:
return None
if filter_by_data is not None and not filter_by_data(typed_message):
return None
return await _invoke(handler, typed_message)

return _carry_identity(typed_handler, handler)

if filter_by_message is not None:

async def filtered_handler(message: dict[str, Any]) -> Any:
if filter_by_message(message):
return await _invoke(handler, message)
return None

Returns:
A middleware function that validates and filters messages.
"""
return _carry_identity(filtered_handler, handler)

async def middleware(
call_next: Callable[[Any], Awaitable[Any]],
msg: StreamMessage[Any],
) -> Any:
data = json.loads(msg.body.decode("utf-8"))
typed_message = data_type.model_validate(data)
if filter_func(typed_message):
return await call_next(msg)
return None

return middleware
return handler
Comment thread
pontino marked this conversation as resolved.
Loading
Loading