Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed
- **RedisTransport / PEL reclaimer**: connection-resilience kwargs passed to the
transport (`socket_timeout`, `socket_connect_timeout`, `socket_keepalive`,
`socket_keepalive_options`, `health_check_interval`, `retry_on_timeout`,
`retry_on_error`, `max_connections`, and the `ssl_*` options) are now forwarded
to the background reclaimer's independent Redis client. Previously the reclaimer
created its client with no socket timeout or keepalive regardless of the
transport's settings, so a silently dropped connection (e.g. cloud Redis
failover or an idle-connection reaper) left its blocking reads hung indefinitely
with no way to recover. `decode_responses` remains pinned to `False` for binary
passthrough and cannot be overridden by callers.

### Added
- **RedisTransport**: Exponential **retry backoff** for SDK-managed retries. New
`subscribe()` options `retry_backoff_multiplier` (default `1.0` = the previous
Expand Down
15 changes: 13 additions & 2 deletions sdk/eggai/transport/pending_reclaimer.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,15 @@ class PendingReclaimerManager:
message body) can be used for application-level deduplication.
"""

def __init__(self, redis_url: str):
def __init__(self, redis_url: str, connection_kwargs: dict[str, Any] | None = None):
self._redis_url = redis_url
# Connection-resilience settings (socket_timeout, socket_keepalive,
# health_check_interval, retry_on_timeout, …) forwarded from the
# transport so this independent client recovers from a silently dropped
# connection the same way the broker does. Without them a blocking read
# against a half-dead socket (e.g. cloud Redis failover) hangs forever
# with no socket timeout to break it.
self._connection_kwargs: dict[str, Any] = connection_kwargs or {}
self._redis_client: aioredis.Redis | None = None
self._configs: dict[tuple[str, str, str], ReclaimerConfig] = {}
self._tasks: dict[tuple[str, str, str], asyncio.Task] = {}
Expand All @@ -157,7 +164,11 @@ async def start(self) -> None:
"""Start one background task per registered config. Safe to call again after stop."""
# decode_responses=False: field values are kept as raw bytes so that
# FastStream's binary-encoded __data__ field is passed through unchanged.
self._redis_client = aioredis.from_url(self._redis_url, decode_responses=False)
# decode_responses is pinned here and must not be overridden by callers.
self._redis_client = aioredis.from_url(
self._redis_url,
**{**self._connection_kwargs, "decode_responses": False},
)
for key, config in self._configs.items():
if key in self._tasks and not self._tasks[key].done():
continue
Expand Down
31 changes: 30 additions & 1 deletion sdk/eggai/transport/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,28 @@
# the stream and own their own PEL entries — the competing-consumers pattern.
_CONSUMER_INSTANCE = f"{socket.gethostname()}-{os.getpid()}"

# Connection settings that, when passed to the broker, must also be forwarded to
# the PEL reclaimer's independent Redis client so both connections recover from a
# silently dropped socket the same way. Kept to redis-py connection/resilience
# kwargs that are valid for ``aioredis.from_url`` — broker/FastStream-only kwargs
# (decoder, middlewares, asyncapi_*, …) are intentionally excluded. ``decode_responses``
# is omitted on purpose: the reclaimer pins it to False for binary passthrough.
_RECLAIMER_CONNECTION_KEYS = (
"socket_timeout",
"socket_connect_timeout",
"socket_keepalive",
"socket_keepalive_options",
"health_check_interval",
"retry_on_timeout",
"retry_on_error",
"max_connections",
"ssl_keyfile",
"ssl_certfile",
"ssl_cert_reqs",
"ssl_ca_certs",
"ssl_check_hostname",
)


@dataclass(frozen=True)
class _StreamGroupInfo:
Expand Down Expand Up @@ -132,6 +154,11 @@ def __init__(
else:
self.broker = RedisBroker(url, log_level=logging.INFO, **kwargs)
self._redis_url = url
# Forward connection-resilience kwargs to the reclaimer's own client so it
# doesn't hang on a silently dropped connection while the broker recovers.
self._connection_kwargs: dict[str, Any] = {
k: kwargs[k] for k in _RECLAIMER_CONNECTION_KEYS if k in kwargs
}
self._max_len = max_len
self._retry_max_len = retry_max_len
self._running = False
Expand Down Expand Up @@ -684,7 +711,9 @@ def _setup_reclaimer(
backoff_jitter: float = 0.0,
) -> tuple[str, str, str]:
if self._reclaimer_manager is None:
self._reclaimer_manager = PendingReclaimerManager(self._redis_url)
self._reclaimer_manager = PendingReclaimerManager(
self._redis_url, connection_kwargs=self._connection_kwargs
)
return self._reclaimer_manager.add(
ReclaimerConfig(
stream=self._get_stream_key(stream),
Expand Down
76 changes: 76 additions & 0 deletions sdk/tests/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -1441,6 +1441,82 @@ async def handler(message):
assert all(config.max_len == 250 for config in configs2)


@pytest.mark.asyncio
async def test_connection_kwargs_propagate_to_reclaimer():
"""Connection-resilience kwargs passed to the transport reach the reclaimer's
own client, so it recovers from a dropped connection like the broker does."""
transport = RedisTransport(
socket_timeout=10.0,
socket_keepalive=True,
health_check_interval=30,
retry_on_timeout=True,
# A broker/FastStream-only kwarg that must NOT leak into the reclaimer's
# aioredis.from_url call.
graceful_timeout=20.0,
)

assert transport._connection_kwargs == {
"socket_timeout": 10.0,
"socket_keepalive": True,
"health_check_interval": 30,
"retry_on_timeout": True,
}

async def handler(message):
return message

await transport.subscribe(
"orders", handler, handler_id="orders-handler-1", retry_on_idle_ms=500
)

assert transport._reclaimer_manager is not None
assert transport._reclaimer_manager._connection_kwargs == {
"socket_timeout": 10.0,
"socket_keepalive": True,
"health_check_interval": 30,
"retry_on_timeout": True,
}


@pytest.mark.asyncio
async def test_reclaimer_start_applies_connection_kwargs(monkeypatch):
"""start() forwards connection_kwargs to from_url while pinning
decode_responses=False (callers cannot override the binary-passthrough flag)."""
from eggai.transport import pending_reclaimer
from eggai.transport.pending_reclaimer import PendingReclaimerManager

captured: dict = {}

class _FakeClient:
async def aclose(self):
pass

def fake_from_url(url, **kwargs):
captured["url"] = url
captured["kwargs"] = kwargs
return _FakeClient()

monkeypatch.setattr(pending_reclaimer.aioredis, "from_url", fake_from_url)

manager = PendingReclaimerManager(
"redis://localhost:6379",
connection_kwargs={
"socket_timeout": 10.0,
"socket_keepalive": True,
# Even if a caller sneaks decode_responses in, it must stay False.
"decode_responses": True,
},
)
await manager.start()

assert captured["url"] == "redis://localhost:6379"
assert captured["kwargs"]["socket_timeout"] == 10.0
assert captured["kwargs"]["socket_keepalive"] is True
assert captured["kwargs"]["decode_responses"] is False

await manager.stop()


@pytest.mark.asyncio
async def test_publish_maxlen_bounds_stream_length():
"""Integration: publishing well past max_len keeps the stream approximately
Expand Down
Loading