updates: faststream, start/stop (almost) removed #134
📝 Walkthrough

Containers, providers, and workers were migrated to a KafkaBroker-centric model: a new broker factory was added, and the broker is created and passed into DI containers; producers and the DLQ now use KafkaBroker; consumer/dispatcher/idempotency primitives were removed; worker entrypoints moved to FastStream apps registering broker subscribers.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant App as FastAPI / FastStream App
    participant Broker as KafkaBroker (faststream)
    participant DI as Dishka Container
    participant DB as Database / Beanie
    participant Handler as Domain Handler
    App->>DI: create_schema_registry(settings)
    App->>Broker: create_broker(settings, schema_registry, logger)
    App->>DI: create_container(settings, broker)
    App->>Broker: register_subscribers(register_*_subscriber)
    App->>Broker: broker.start()
    Broker->>Handler: on_message(StreamMessage)
    Handler->>DI: resolve services (repos, producer, metrics)
    Handler->>DB: persist/read domain state (Beanie)
    Handler->>Broker: broker.publish(output_topic, payload, headers)
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
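To make the new wiring concrete, here is a minimal sketch of the broker factory and container hand-off the walkthrough describes. `create_broker` and `create_container` are named in the sequence diagram above; the Dishka provider body and the `KAFKA_BOOTSTRAP_SERVERS` setting are assumptions, not the PR's actual code.

```python
# Sketch only: the factory/DI shape implied by the walkthrough above.
import logging

from dishka import AsyncContainer, Provider, Scope, make_async_container
from faststream.kafka import KafkaBroker


def create_broker(settings, schema_registry, logger: logging.Logger) -> KafkaBroker:
    # One broker instance is shared by producers, the DLQ, and all subscribers.
    return KafkaBroker(settings.KAFKA_BOOTSTRAP_SERVERS, logger=logger)


def create_container(settings, broker: KafkaBroker) -> AsyncContainer:
    provider = Provider(scope=Scope.APP)
    # Register the already-created broker so every service resolves the same object.
    provider.provide(lambda: broker, provides=KafkaBroker)
    return make_async_container(provider)
```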
3 issues found across 43 files
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="backend/workers/run_k8s_worker.py">
<violation number="1" location="backend/workers/run_k8s_worker.py:61">
P2: The bootstrap task is created without retaining a reference. The previous implementation kept the task to prevent premature garbage collection. Keep a reference (or store it on the container/app) so the daemonset bootstrap reliably completes.</violation>
</file>
<file name="backend/workers/run_saga_orchestrator.py">
<violation number="1" location="backend/workers/run_saga_orchestrator.py:79">
P2: Task cancellation should be awaited to ensure clean shutdown. Without awaiting, the `timeout_loop` may still be running when `container.close()` is called, potentially causing race conditions or resource cleanup issues.</violation>
</file>
<file name="backend/workers/dlq_processor.py">
<violation number="1" location="backend/workers/dlq_processor.py:154">
P2: broker.start() is invoked before the AsyncExitStack registers broker.close. If initialization fails before entering the stack, the broker will not be closed, leaking the connection. Consider registering broker.close in the exit stack (or a try/finally) before starting it.</violation>
</file>
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/app/services/k8s_worker/worker.py (1)
74-85: ⚠️ Potential issue | 🟠 Major

Fix race condition: mark execution active before scheduling the task.

A race condition exists between checking if `execution_id` is in `_active_creations` (line 79) and actually adding it inside the background task (line 116). A duplicate command can arrive during this window and pass the check, resulting in two pods created for the same execution. Add `execution_id` to the set immediately after the check, before spawning the task:

```diff
 # Check if already processing
 if execution_id in self._active_creations:
     self.logger.warning(f"Already creating pod for execution {execution_id}")
     return

+# Mark active before scheduling to prevent duplicates
+self._active_creations.add(execution_id)
+self.metrics.update_k8s_active_creations(len(self._active_creations))

 # Create pod asynchronously
 asyncio.create_task(self._create_pod_for_execution(command))
```

Remove the redundant add and metrics call from inside `_create_pod_for_execution` (currently lines 116-117).
🤖 Fix all issues with AI agents
In `@backend/pyproject.toml`:
- Line 128: Update the pinned dependency string "faststream[kafka]==0.6.5" to
"faststream[kafka]==0.6.6" in the project's dependency specification so the
project uses the latest patch release compatible with Python 3.12; commit the
change and run dependency installation/lockfile update (e.g., pip-compile/poetry
update) and tests to verify nothing breaks.
In `@backend/workers/run_event_replay.py`:
- Around line 31-35: The broker created by create_broker(...) is never started
or closed, causing publish calls in EventReplayService.produce() to fail and
leaking connections on shutdown; update the worker to await broker.start() after
creation (using the same tmp_logger/schema_registry context) before creating the
container or running the service, and ensure broker.close() is called in a
finally/cleanup path or on shutdown (mirroring other workers) so the KafkaBroker
is properly started and torn down.
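A minimal sketch of that lifecycle, assuming the worker follows the same shape as its siblings; the `create_event_replay_container` factory name and `EventReplayService` resolution here are illustrative, not the PR's exact code:

```python
# Sketch only: start the broker before wiring the container, close it on exit.
async def main() -> None:
    tmp_logger = setup_logger(settings.LOG_LEVEL)  # names mirror the other workers
    schema_registry = SchemaRegistryManager(settings, tmp_logger)
    broker = create_broker(settings, schema_registry, tmp_logger)
    await broker.start()  # publish calls now have a live connection
    try:
        container = create_event_replay_container(settings, broker)  # hypothetical factory name
        try:
            service = await container.get(EventReplayService)
            await service.run_replay()  # placeholder for the worker's main loop
        finally:
            await container.close()
    finally:
        await broker.close()  # mirrors shutdown in the other workers
```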
In `@backend/workers/run_k8s_worker.py`:
- Around line 58-67: bootstrap() currently fires-and-forgets the coroutine from
KubernetesWorker.ensure_image_pre_puller_daemonset(), which leaves exceptions
unobserved and prevents proper shutdown; change bootstrap() to save the created
task to a module-level or app-scoped variable (e.g., pre_puller_task), start it
with asyncio.create_task(...), and in shutdown() check if pre_puller_task exists
and is not done, call pre_puller_task.cancel(), then await it inside a
try/except to suppress asyncio.CancelledError and log other exceptions so the
background task is properly cancelled and awaited during shutdown.
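A sketch of the retained-task pattern the prompt describes; `pre_puller_task` is the name proposed above, and the surrounding function signatures are assumptions:

```python
# Sketch only: retain the bootstrap task and cancel/await it on shutdown.
import asyncio

pre_puller_task: asyncio.Task | None = None

async def bootstrap(worker) -> None:
    global pre_puller_task
    # Keeping the reference prevents the task from being garbage-collected
    # before the daemonset bootstrap completes.
    pre_puller_task = asyncio.create_task(worker.ensure_image_pre_puller_daemonset())

async def shutdown(logger) -> None:
    if pre_puller_task is not None and not pre_puller_task.done():
        pre_puller_task.cancel()
        try:
            await pre_puller_task
        except asyncio.CancelledError:
            pass  # expected when we cancel
        except Exception:
            logger.exception("image pre-puller bootstrap failed")
```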
In `@backend/workers/run_saga_orchestrator.py`:
- Around line 76-81: In shutdown(), after calling _timeout_task.cancel(), await
the task to ensure it finishes and surface any exceptions: call await
_timeout_task inside a try/except that handles asyncio.CancelledError (ignore)
and logs any other Exception via logger, so exceptions are not swallowed and
pending-task warnings are avoided; keep the existing await container.close() and
logger.info("SagaOrchestrator shutdown complete") afterwards and reference the
shutdown function and _timeout_task by name when making the change.
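The same cancellation discipline for the saga orchestrator's timeout loop, sketched with the names the prompt uses (`_timeout_task`, `container`, `logger`); everything else is assumed:

```python
# Sketch only: await the cancelled timeout task so shutdown is clean.
async def shutdown() -> None:
    _timeout_task.cancel()
    try:
        await _timeout_task
    except asyncio.CancelledError:
        pass  # normal result of the cancel above
    except Exception:
        logger.exception("timeout_loop exited with an unexpected error")
    await container.close()
    logger.info("SagaOrchestrator shutdown complete")
```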
🧹 Nitpick comments (3)
backend/app/services/coordinator/coordinator.py (1)
72-132: Consider explicit type guards for the new public handlers.
Now that these handlers are public and invoked by subscribers, a fast-fail type check (as in `saga_orchestrator`) makes misrouted events obvious. Apply to all handler methods.

♻️ Suggested pattern

```diff
 async def handle_execution_requested(self, event: ExecutionRequestedEvent) -> None:
     """Handle execution requested event - add to queue for processing."""
+    if not isinstance(event, ExecutionRequestedEvent):
+        raise TypeError(f"Expected ExecutionRequestedEvent, got {type(event).__name__}")
     self.logger.info(f"HANDLER CALLED: handle_execution_requested for event {event.event_id}")
```

backend/workers/run_pod_monitor.py (1)
37-60: Ensure broker/container cleanup on early startup failures.
`broker.start()` happens before the `try` block, so exceptions during `container.get(PodMonitor)` or signal setup would bypass cleanup. Wrap post-start initialization in the same `try`/`finally` (or use `AsyncExitStack`) to guarantee shutdown.

Suggested refactor

```diff
-    await broker.start()
-
-    # Services are already started by the DI container providers
-    monitor = await container.get(PodMonitor)
-
-    # Shutdown event - signal handlers just set this
-    shutdown_event = asyncio.Event()
-    loop = asyncio.get_running_loop()
-    for sig in (signal.SIGINT, signal.SIGTERM):
-        loop.add_signal_handler(sig, shutdown_event.set)
-
-    logger.info("PodMonitor started and running")
-
-    try:
-        # Wait for shutdown signal or service to stop
-        while monitor.state == MonitorState.RUNNING and not shutdown_event.is_set():
-            await asyncio.sleep(RECONCILIATION_LOG_INTERVAL)
-            status = await monitor.get_status()
-            logger.info(f"Pod monitor status: {status}")
-    finally:
-        # Container cleanup stops everything
-        logger.info("Initiating graceful shutdown...")
-        await broker.close()
-        await container.close()
+    await broker.start()
+    try:
+        # Services are already started by the DI container providers
+        monitor = await container.get(PodMonitor)
+
+        # Shutdown event - signal handlers just set this
+        shutdown_event = asyncio.Event()
+        loop = asyncio.get_running_loop()
+        for sig in (signal.SIGINT, signal.SIGTERM):
+            loop.add_signal_handler(sig, shutdown_event.set)
+
+        logger.info("PodMonitor started and running")
+
+        # Wait for shutdown signal or service to stop
+        while monitor.state == MonitorState.RUNNING and not shutdown_event.is_set():
+            await asyncio.sleep(RECONCILIATION_LOG_INTERVAL)
+            status = await monitor.get_status()
+            logger.info(f"Pod monitor status: {status}")
+    finally:
+        # Container cleanup stops everything
+        logger.info("Initiating graceful shutdown...")
+        await broker.close()
+        await container.close()
```

backend/workers/dlq_processor.py (1)
151-199: Guard broker/container cleanup on startup exceptions.
`broker.start()` runs before `AsyncExitStack` is entered. If initialization fails before the stack, broker/container cleanup is skipped. Moving the stack earlier ensures cleanup is always registered.

Suggested refactor

```diff
-    tmp_logger = setup_logger(settings.LOG_LEVEL)
-    schema_registry = SchemaRegistryManager(settings, tmp_logger)
-    broker = create_broker(settings, schema_registry, tmp_logger)
-    await broker.start()
-
-    container = create_dlq_processor_container(settings, broker)
-    logger = await container.get(logging.Logger)
-    logger.info("Starting DLQ Processor...")
-
-    db = await container.get(Database)
-    await init_beanie(database=db, document_models=ALL_DOCUMENTS)
-
-    manager = await container.get(DLQManager)
-
-    _configure_retry_policies(manager, logger)
-    _configure_filters(manager, testing=settings.TESTING, logger=logger)
-
-    topic_name = f"{settings.KAFKA_TOPIC_PREFIX}{KafkaTopic.DEAD_LETTER_QUEUE}"
-    consumer = AIOKafkaConsumer(
-        topic_name,
-        bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
-        group_id=GroupId.DLQ_MANAGER,
-        enable_auto_commit=False,
-        auto_offset_reset="earliest",
-        client_id="dlq-manager-consumer",
-        session_timeout_ms=settings.KAFKA_SESSION_TIMEOUT_MS,
-        heartbeat_interval_ms=settings.KAFKA_HEARTBEAT_INTERVAL_MS,
-        max_poll_interval_ms=settings.KAFKA_MAX_POLL_INTERVAL_MS,
-        request_timeout_ms=settings.KAFKA_REQUEST_TIMEOUT_MS,
-    )
-
-    stop_event = asyncio.Event()
-    loop = asyncio.get_running_loop()
-
-    def signal_handler() -> None:
-        logger.info("Received signal, initiating shutdown...")
-        stop_event.set()
-
-    for sig in (signal.SIGINT, signal.SIGTERM):
-        loop.add_signal_handler(sig, signal_handler)
-
-    async with AsyncExitStack() as stack:
-        stack.push_async_callback(broker.close)
-        stack.push_async_callback(container.close)
-        await consumer.start()
-        stack.push_async_callback(consumer.stop)
+    tmp_logger = setup_logger(settings.LOG_LEVEL)
+    schema_registry = SchemaRegistryManager(settings, tmp_logger)
+    broker = create_broker(settings, schema_registry, tmp_logger)
+    container = create_dlq_processor_container(settings, broker)
+
+    async with AsyncExitStack() as stack:
+        stack.push_async_callback(container.close)
+        await broker.start()
+        stack.push_async_callback(broker.close)
+
+        logger = await container.get(logging.Logger)
+        logger.info("Starting DLQ Processor...")
+
+        db = await container.get(Database)
+        await init_beanie(database=db, document_models=ALL_DOCUMENTS)
+
+        manager = await container.get(DLQManager)
+
+        _configure_retry_policies(manager, logger)
+        _configure_filters(manager, testing=settings.TESTING, logger=logger)
+
+        topic_name = f"{settings.KAFKA_TOPIC_PREFIX}{KafkaTopic.DEAD_LETTER_QUEUE}"
+        consumer = AIOKafkaConsumer(
+            topic_name,
+            bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
+            group_id=GroupId.DLQ_MANAGER,
+            enable_auto_commit=False,
+            auto_offset_reset="earliest",
+            client_id="dlq-manager-consumer",
+            session_timeout_ms=settings.KAFKA_SESSION_TIMEOUT_MS,
+            heartbeat_interval_ms=settings.KAFKA_HEARTBEAT_INTERVAL_MS,
+            max_poll_interval_ms=settings.KAFKA_MAX_POLL_INTERVAL_MS,
+            request_timeout_ms=settings.KAFKA_REQUEST_TIMEOUT_MS,
+        )
+
+        stop_event = asyncio.Event()
+        loop = asyncio.get_running_loop()
+
+        def signal_handler() -> None:
+            logger.info("Received signal, initiating shutdown...")
+            stop_event.set()
+
+        for sig in (signal.SIGINT, signal.SIGTERM):
+            loop.add_signal_handler(sig, signal_handler)
+
+        await consumer.start()
+        stack.push_async_callback(consumer.stop)
```
1 issue found across 22 files (changes from recent commits).
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="backend/workers/dlq_processor.py">
<violation number="1" location="backend/workers/dlq_processor.py:22">
P2: `logging.basicConfig(...)` attaches a plain root handler while `setup_logger` already configures JSON/sanitized logging. Because the `integr8scode` logger propagates, messages will be emitted a second time without sanitization/structure. Disable propagation on this logger or remove the basicConfig call to avoid leaking unsanitized data and duplicate logs.</violation>
</file>
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/app/dlq/manager.py (1)
164-189: ⚠️ Potential issue | 🟠 Major

Use Avro serialization for retried messages to match the broker's Avro decoder.

The `retry_message` method publishes to both the retry and original topic using JSON serialization (`json.dumps(event.model_dump(mode="json")).encode()`), but the broker is configured for Avro deserialization. Downstream consumers will fail to decode these messages. The `schema_registry` is already available as an instance variable and is used correctly elsewhere in the class.

🛠️ Suggested fix

```diff
     event = message.event
-    serialized = json.dumps(event.model_dump(mode="json")).encode()
+    serialized = await self.schema_registry.serialize_event(event)
```
🤖 Fix all issues with AI agents
In `@backend/app/events/handlers.py`:
- Around line 313-365: The code currently saves the DLQ message and records
metrics before calling manager.handle_message, causing filtered messages to be
persisted and counted twice; in register_dlq_subscriber/on_dlq_message remove
the explicit await manager.repository.save_message(dlq_msg) and move the metric
calls (manager.metrics.record_dlq_message_received and
manager.metrics.record_dlq_message_age and the final
record_dlq_processing_duration) to after await manager.handle_message(dlq_msg)
(keep the tracing block around handle_message), since manager.handle_message
already applies filters and performs saving so metrics and explicit save must
only run post-handle to avoid double-save and false counts.
🧹 Nitpick comments (4)
backend/tests/unit/services/saga/test_saga_orchestrator_unit.py (2)
62-74: Consider more realistic fake behavior for edge case testing.

A few observations on the fake implementation:

- `create_allocation` uses a hardcoded `allocation_id="alloc-1"` - if a test creates multiple allocations, they'll share the same ID, which could mask bugs.
- `release_allocation` always returns `True`, but the real implementation returns `False` when the allocation doesn't exist. This could hide bugs in compensation logic that depends on accurate return values.

These are acceptable for the current tests but may cause issues if test coverage expands.

♻️ Optional: More realistic fake implementation

```diff
+    _alloc_counter: int = 0
+
     async def create_allocation(self, create_data: DomainResourceAllocationCreate) -> DomainResourceAllocation:
+        self._alloc_counter += 1
         alloc = DomainResourceAllocation(
-            allocation_id="alloc-1",
+            allocation_id=f"alloc-{self._alloc_counter}",
             **create_data.model_dump(),
         )
         self.allocations.append(alloc)
         return alloc

     async def release_allocation(self, allocation_id: str) -> bool:
-        return True
+        return any(a.allocation_id == allocation_id for a in self.allocations)
```
92-96: Test assertion could be more precise.

The assertion `len(fake_repo.saved) >= 1` is quite permissive. Since the comment indicates the saga "runs to completion," consider:

- Asserting a specific expected count (e.g., `== 2` for initial creation + completion update) to catch unexpected duplicate saves.
- Validating the final saga state (e.g., `SagaState.COMPLETED`) to confirm the saga actually completed successfully rather than just started.

The current test verifies the saga was triggered but doesn't confirm it reached the expected terminal state.

♻️ Suggested: Add state validation

```diff
 # The saga is created and fully executed (steps run to completion)
 assert len(fake_repo.saved) >= 1
 first_saved = fake_repo.saved[0]
 assert first_saved.execution_id == "e"
 assert first_saved.saga_name == ExecutionSaga.get_name()
+# Verify final state
+last_saved = fake_repo.saved[-1]
+assert last_saved.state == SagaState.COMPLETED
```

backend/app/core/providers.py (2)
125-143: Consider handling `update_config` failure more gracefully.

If `update_config` fails within the `try` block at line 139, the error is logged but the service is still returned without any default configuration. This could lead to rate limiting being effectively disabled without clear indication.

Consider whether the service should fail initialization if rate limit config cannot be established, or ensure a fallback is applied.

747-762: Avoid accessing private attribute `_last_resource_version` directly.

Line 753 directly manipulates `monitor._last_resource_version`, which breaks encapsulation. If the internal implementation of `PodMonitor` changes, this code will silently break.

♻️ Suggested approach

Add a method to `PodMonitor` for resetting the watch cursor:

```python
def reset_watch_cursor(self) -> None:
    """Reset the resource version cursor for watch reconnection."""
    self._last_resource_version = None
```

Then update the provider:

```diff
 if e.status == 410:
     logger.warning("Resource version expired, resetting watch cursor")
-    monitor._last_resource_version = None
+    monitor.reset_watch_cursor()
     kubernetes_metrics.record_pod_monitor_watch_error(ErrorType.RESOURCE_VERSION_EXPIRED)
```
1 issue found across 11 files (changes from recent commits).
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="backend/app/events/handlers.py">
<violation number="1" location="backend/app/events/handlers.py:358">
P3: DLQ “message received”/age metrics are now recorded only after `handle_message` succeeds. If `handle_message` raises (e.g., repository save failure), these counters are skipped, undercounting received DLQ messages. Record these metrics before handling or in a `finally` block.</violation>
</file>
```diff
 ):
     await manager.handle_message(dlq_msg)

+    manager.metrics.record_dlq_message_received(dlq_msg.original_topic, dlq_msg.event.event_type)
```
P3: DLQ “message received”/age metrics are now recorded only after handle_message succeeds. If handle_message raises (e.g., repository save failure), these counters are skipped, undercounting received DLQ messages. Record these metrics before handling or in a finally block.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/events/handlers.py, line 358:
<comment>DLQ “message received”/age metrics are now recorded only after `handle_message` succeeds. If `handle_message` raises (e.g., repository save failure), these counters are skipped, undercounting received DLQ messages. Record these metrics before handling or in a `finally` block.</comment>
<file context>

```diff
@@ -360,6 +355,10 @@ async def on_dlq_message(
 ):
     await manager.handle_message(dlq_msg)
+    manager.metrics.record_dlq_message_received(dlq_msg.original_topic, dlq_msg.event.event_type)
+    manager.metrics.record_dlq_message_age(
+        (datetime.now(timezone.utc) - dlq_msg.failed_at).total_seconds()
```

</file context>
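A sketch of the `finally`-based variant this comment suggests, using the names visible in the context above; the handler signature is simplified:

```python
# Sketch only: record "received" metrics even when handling fails.
from datetime import datetime, timezone

async def on_dlq_message(dlq_msg, manager) -> None:
    try:
        await manager.handle_message(dlq_msg)
    finally:
        # Runs whether or not handle_message raised, so received counts stay accurate.
        manager.metrics.record_dlq_message_received(
            dlq_msg.original_topic, dlq_msg.event.event_type
        )
        manager.metrics.record_dlq_message_age(
            (datetime.now(timezone.utc) - dlq_msg.failed_at).total_seconds()
        )
```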
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@backend/app/events/handlers.py`:
- Around line 75-129: Header values arrive as bytes so the current lambda
filters comparing msg.headers.get("event_type", "") directly against EventType
strings always fail; add a small helper like _normalize_event_type(msg) that
reads raw.headers or msg.headers, decodes the event_type bytes to str (safe to
return "" if missing), and then update all event_type filters (e.g., in
register_coordinator_subscriber's
on_execution_requested/on_execution_completed/on_execution_failed/on_execution_cancelled
and the other register_*_subscriber functions) to use lambda msg:
_normalize_event_type(msg) == EventType.EXECUTION_REQUESTED (or the appropriate
EventType) so comparisons succeed; ensure the helper handles None and empty
headers consistently.
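A sketch of the `_normalize_event_type` helper the prompt describes; the exact message shape depends on FastStream's `StreamMessage`, so the attribute access here is an assumption:

```python
# Sketch only: decode the bytes event_type header before comparing to EventType.
def _normalize_event_type(msg) -> str:
    headers = dict(getattr(msg, "headers", None) or {})
    value = headers.get("event_type", b"")
    if isinstance(value, bytes):
        return value.decode("utf-8", errors="replace")
    return value or ""

# Filter usage, as suggested above (EventType is the project's enum):
# filter=lambda msg: _normalize_event_type(msg) == EventType.EXECUTION_REQUESTED
```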
In `@backend/workers/run_pod_monitor.py`:
- Around line 42-56: The current pattern calls asyncio.run(app.run()) which
passes a coroutine directly instead of awaiting FastStream.run(); change to
define an async wrapper (e.g., async def run_app(): await app.run()) and then
call asyncio.run(run_app()) so FastStream.run() is awaited properly; update this
in run_pod_monitor.py (identify symbols: FastStream, app, app.run, asyncio.run),
and apply the same change to the other worker files: run_saga_orchestrator.py,
run_k8s_worker.py, run_coordinator.py, dlq_processor.py, and
run_result_processor.py.
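A sketch of the wrapper shape the prompt asks for. Note that `asyncio.run(app.run())` already awaits the coroutine it is given, so the gain here is an explicit, named entrypoint with room for per-worker setup rather than a behavioral fix:

```python
# Sketch only: explicit async entrypoint around FastStream's app.run().
import asyncio

from faststream import FastStream

async def run_app(app: FastStream) -> None:
    await app.run()

if __name__ == "__main__":
    # `app` is the FastStream(broker) instance each worker module builds.
    asyncio.run(run_app(app))
```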
In `@backend/workers/run_saga_orchestrator.py`:
- Around line 35-42: The SchemaRegistryManager instance is never closed, leaking
AsyncSchemaRegistryClient's HTTP pool; add an async close method on
SchemaRegistryManager (e.g., async def close(self): await self._client.close())
and store the created instance in the module (schema_registry =
SchemaRegistryManager(...)); then update the application shutdown handler (the
`@app.on_shutdown` coroutine) to await schema_registry.close() before awaiting
container.close(). Ensure create_broker(...) still receives the schema_registry
instance.
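A sketch of the suggested `close()` method and shutdown hook; `_client` is the attribute named in the prompt, and the module-level wiring around it is assumed:

```python
# Sketch only: give SchemaRegistryManager an async close and call it on shutdown.
class SchemaRegistryManager:
    ...  # existing init/serialization code

    async def close(self) -> None:
        # Releases the AsyncSchemaRegistryClient's HTTP connection pool.
        await self._client.close()

schema_registry = SchemaRegistryManager(settings, tmp_logger)
broker = create_broker(settings, schema_registry, tmp_logger)

@app.on_shutdown
async def on_shutdown() -> None:
    await schema_registry.close()
    await container.close()
```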
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@backend/app/core/dishka_lifespan.py`:
- Around line 68-81: The code currently awaits broker.start() outside the try
block so if broker.start() raises the DI-initialized resources (e.g.,
NotificationScheduler created by container.get(NotificationScheduler)) are never
cleaned up; wrap the broker start in a try/except/finally or move it inside the
existing try so any exception triggers cleanup: call container/DI shutdown or
stop the NotificationScheduler and ensure broker.stop() is invoked if
broker.start() succeeded partially, log the error with context, and re-raise the
exception so the app fails fast; refer to NotificationScheduler,
container.get(NotificationScheduler), broker.start(), and broker.stop() when
making the changes.
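A sketch of the fail-fast startup the prompt outlines; the cleanup calls are assumptions about what the lifespan already manages:

```python
# Sketch only: clean up DI-created resources if the broker fails to start,
# then re-raise so the app fails fast.
scheduler = await container.get(NotificationScheduler)
try:
    await broker.start()
except Exception:
    logger.exception("KafkaBroker failed to start during lifespan startup")
    await broker.stop()  # tear down anything that partially started
    await container.close()  # also stops the NotificationScheduler
    raise
```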
🧹 Nitpick comments (4)
backend/app/core/providers.py (3)
236-272: The `database` parameter enforces DI ordering but could benefit from a clarifying comment.

The `database: Database` parameter (line 245) ensures Beanie initialization completes before the DLQManager starts, but it's not used in the method body. While this pattern appears intentionally in other providers (e.g., `get_notification_scheduler` at line 558), adding a brief comment would clarify its purpose for future maintainers.

📝 Suggested documentation

```diff
 async def get_dlq_manager(
     self,
     broker: KafkaBroker,
     settings: Settings,
     schema_registry: SchemaRegistryManager,
     logger: logging.Logger,
     dlq_metrics: DLQMetrics,
     repository: DLQRepository,
-    database: Database,
+    database: Database,  # noqa: ARG002 - ensures Beanie init completes before DLQ starts
 ) -> AsyncIterator[DLQManager]:
```

747-762: Accessing private attribute `_last_resource_version` breaks encapsulation.

The `_watch_cycle` function directly accesses `monitor._last_resource_version` (line 753) to reset the watch cursor. This couples the provider to internal implementation details of `PodMonitor`.

♻️ Suggested approach

Add a public method to `PodMonitor`:

```python
# In PodMonitor class
def reset_watch_cursor(self) -> None:
    """Reset watch cursor after resource version expiration."""
    self._last_resource_version = None
```

Then update the provider:

```diff
-    monitor._last_resource_version = None
+    monitor.reset_watch_cursor()
```

918-927: Consider making cleanup retention period configurable.

The `older_than_hours: 48` is hardcoded. For operational flexibility, consider moving this to `Settings`:

📝 Suggested change

```diff
 scheduler.add_job(
     service.cleanup_old_sessions,
     trigger="interval",
     hours=6,
-    kwargs={"older_than_hours": 48},
+    kwargs={"older_than_hours": settings.REPLAY_SESSION_RETENTION_HOURS},
     id="replay_cleanup_old_sessions",
     max_instances=1,
     misfire_grace_time=300,
 )
```

This would require adding `REPLAY_SESSION_RETENTION_HOURS: int = 48` to your Settings class and injecting `settings: Settings` into the provider method.

backend/app/events/handlers.py (1)

316-364: Consider defensive header decoding, but recognize that malformed headers indicate an upstream Kafka/aiokafka issue.

According to aiokafka and faststream documentation, `ConsumerRecord.headers` is typed as `Sequence[tuple[str, bytes]]`, meaning header values should always be bytes. The current code `headers = {k: v.decode() for k, v in (raw.headers or [])}` is technically correct under that contract. However, adding defensive type checks is reasonable robustness engineering, especially in a DLQ handler. Consider the proposed fix if you want to tolerate malformed headers gracefully, but note this addresses edge cases outside the normal Kafka specification, not a guaranteed failure mode.
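If you do want the defensive variant, a minimal sketch that tolerates both `bytes` and `str` values (which, as noted above, is outside the documented aiokafka contract):

```python
# Sketch only: defensive header decoding for the DLQ handler.
def decode_headers(raw_headers) -> dict[str, str]:
    decoded: dict[str, str] = {}
    for key, value in (raw_headers or []):
        # aiokafka documents values as bytes; guard anyway for malformed input.
        decoded[key] = value.decode("utf-8", errors="replace") if isinstance(value, bytes) else str(value)
    return decoded
```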
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/app/services/notification_service.py (1)
155-160: ⚠️ Potential issue | 🟡 Minor

Inconsistent TESTING bypass for throttle checks.

The TESTING mode bypass is added here but not in `_create_system_for_user` (lines 267-275), which has its own throttle check. In TESTING mode, direct calls to `create_notification` will bypass throttling, but calls through `create_system_notification` will still be throttled at the `_create_system_for_user` level before reaching this code.

🔧 Proposed fix for consistent TESTING bypass

```diff
 async def _create_system_for_user(
     self,
     user_id: str,
     cfg: SystemConfig,
     title: str,
     base_context: NotificationContext,
     tags: list[str],
 ) -> str:
     try:
-        if not cfg.throttle_exempt:
+        if not cfg.throttle_exempt and not self.settings.TESTING:
             throttled = await self._throttle_cache.check_throttle(
                 user_id,
                 cfg.severity,
                 window_hours=self.settings.NOTIF_THROTTLE_WINDOW_HOURS,
                 max_per_hour=self.settings.NOTIF_THROTTLE_MAX_PER_HOUR,
             )
             if throttled:
                 return "throttled"
```



Summary by cubic
Switched event handling to FastStream KafkaBroker and removed nearly all manual start/stop lifecycle code. Events are now consumed via FastStream subscribers with Avro decoding, and services publish through a broker-backed UnifiedProducer.
Written for commit 644d06f. Summary will update on new commits.