
Conversation


@HardMax71 HardMax71 commented Feb 3, 2026


Summary by cubic

Switched event handling to FastStream KafkaBroker and removed nearly all manual start/stop lifecycle code. Events are now consumed via FastStream subscribers with Avro decoding, and services publish through a broker-backed UnifiedProducer.

  • New Features

    • Added FastStream KafkaBroker with Dishka integration for FastAPI and worker processes.
    • Introduced an Avro decoder using SchemaRegistryManager for subscriber payloads (see the sketch after this summary).
    • Registered FastStream subscribers for EventStore, notifications, SSE routing, coordinator, result processor, k8s worker, pod monitor, saga orchestrator, and DLQ processor.
    • UnifiedProducer now uses KafkaBroker; DLQManager publishes via the broker.
  • Refactors

    • Removed UnifiedConsumer, EventDispatcher, DLQ handler helpers, consumer types, and idempotency middleware.
    • Simplified DI providers and containers to accept KafkaBroker; lifecycle is handled by FastStream instead of manual start/stop.
    • Updated app startup to initialize the broker and wire Dishka for both FastAPI and FastStream.
    • Reworked workers to create the broker and register subscribers directly; simplified PodMonitor and tests accordingly.
    • Dependencies: added faststream[kafka], upgraded dishka to 1.7.2, pinned aiokafka to 0.12.0 for remaining test usage.

Written for commit 644d06f. Summary will update on new commits.
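For readers skimming the summary: a minimal, self-contained sketch of what a FastStream subscriber with a custom Avro-style decoder can look like. Topic names, group ids, and the registry's `deserialize` method are illustrative placeholders, not the repo's actual API.

```python
# Illustrative only: registering a FastStream subscriber with a custom decoder.
import json

from faststream import FastStream
from faststream.kafka import KafkaBroker


class FakeSchemaRegistry:
    """Stand-in for SchemaRegistryManager; real Avro decoding omitted."""

    async def deserialize(self, payload: bytes) -> dict:
        return json.loads(payload)  # placeholder for Avro decoding


def make_decoder(registry: FakeSchemaRegistry):
    async def decode(msg, original_decoder):
        try:
            # msg.body is the raw Kafka payload
            return await registry.deserialize(msg.body)
        except Exception:
            return await original_decoder(msg)  # fall back to default decoding

    return decode


registry = FakeSchemaRegistry()
broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.subscriber("execution-events", group_id="event-store", decoder=make_decoder(registry))
async def on_event(event: dict) -> None:
    print(f"received {event.get('event_type')}")
```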

Summary by CodeRabbit

  • New Features

    • Central Kafka broker with Avro decoding and managed lifecycle; workers now run via FastStream lifecycles and register event subscribers.
  • Bug Fixes

    • More reliable DLQ handling with improved retry policies, filtering, and guaranteed broker shutdown.
  • Refactor

    • Simplified event pipeline and lifecycles: moved to broker + scheduler model; removed legacy consumer/dispatcher/idempotency indirection.
  • Chores

    • Dependency updates and test adjustments.


coderabbitai bot commented Feb 3, 2026

📝 Walkthrough


Containers, providers, and workers were migrated to a KafkaBroker-centric model: a new broker factory was added and the broker is created and passed into DI containers; producers/DLQ now use KafkaBroker; consumer/dispatcher/idempotency primitives were removed; worker entrypoints moved to FastStream apps registering broker subscribers.
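A rough sketch of the worker-entrypoint shape this describes, assuming Dishka's FastStream integration is wired as shown; the factory, provider, and subscriber names are placeholders and the exact `setup_dishka` call may differ from the repo's usage.

```python
# Hypothetical worker entrypoint in the broker-centric model described above.
import asyncio

from dishka import Provider, Scope, make_async_container, provide
from dishka.integrations.faststream import setup_dishka
from faststream import FastStream
from faststream.kafka import KafkaBroker


class BrokerProvider(Provider):
    """Expose an already-created broker to DI (placeholder provider)."""

    def __init__(self, broker: KafkaBroker) -> None:
        super().__init__()
        self._broker = broker

    @provide(scope=Scope.APP)
    def broker(self) -> KafkaBroker:
        return self._broker


async def main() -> None:
    broker = KafkaBroker("localhost:9092")  # real code: create_broker(settings, schema_registry, logger)
    app = FastStream(broker)

    # register_*_subscriber(broker) calls would attach topic handlers here.

    container = make_async_container(BrokerProvider(broker))
    setup_dishka(container, app=app, auto_inject=True)

    @app.on_shutdown
    async def shutdown() -> None:
        await container.close()  # broker start/stop is owned by FastStream

    await app.run()


if __name__ == "__main__":
    asyncio.run(main())
```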

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| **DI Container & Lifespan**<br>`backend/app/core/container.py`, `backend/app/core/dishka_lifespan.py` | All container builders now accept a KafkaBroker parameter and include KafkaBroker in DI context; lifespan starts/stops broker and resolves services via DI. |
| **Broker & Avro**<br>`backend/app/events/broker.py` | Added create_broker() and an Avro decoder factory to construct a KafkaBroker wired to SchemaRegistryManager and settings. |
| **Providers & DI refactor**<br>`backend/app/core/providers.py` | Major provider rewrite to broker-driven provisioning: unified producer, schema registry, event store, DLQManager, APScheduler-based jobs, Beanie init moved into DB provider; many provider signatures updated to accept broker and managed lifecycles. |
| **Producer & DLQ**<br>`backend/app/events/core/producer.py`, `backend/app/dlq/manager.py` | UnifiedProducer and DLQManager constructors now take KafkaBroker; publishing uses broker.publish() with header dicts; DLQManager adds retry policies, parsing helpers, and monitoring cycle. |
| **Event Handlers / Subscribers**<br>`backend/app/events/handlers.py` | New broker subscriber registration functions (coordinator, k8s worker, result processor, saga, event store, SSE, notifications, DLQ) and idempotency helper used by subscribers. |
| **Removed consumer/dispatcher/idempotency**<br>`backend/app/events/core/consumer.py`, `backend/app/events/core/dispatcher.py`, `backend/app/events/core/dlq_handler.py`, `backend/app/events/core/types.py`, `backend/app/services/idempotency/middleware.py`, `backend/app/events/core/__init__.py` | Deleted: UnifiedConsumer, EventDispatcher, dlq handler factories, consumer types/metrics/status models, idempotent middleware; package exports pruned. |
| **Service handler method changes**<br>`backend/app/services/coordinator/...`, `backend/app/services/k8s_worker/worker.py`, `backend/app/services/notification_service.py` | Removed dispatcher constructor params; internal wrapper handlers converted to direct public handler methods and invoked directly. |
| **PodMonitor refactor & config**<br>`backend/app/services/pod_monitor/monitor.py`, `backend/app/services/pod_monitor/config.py` | Removed state machine and reconciliation config fields; switched to list-then-watch watch_pod_events flow and removed lifecycle/status APIs. |
| **Workers → FastStream entrypoints**<br>`backend/workers/*.py` (run_coordinator.py, run_k8s_worker.py, run_result_processor.py, run_saga_orchestrator.py, dlq_processor.py, run_pod_monitor.py, run_event_replay.py) | Replaced run_* coroutines with main() FastStream apps that create schema registry + broker, register subscribers, wire DI with broker, and use startup/shutdown hooks to init/close resources. |
| **App wiring**<br>`backend/app/main.py` | App now builds SchemaRegistryManager and KafkaBroker, stores broker on app.state, registers subscribers, and passes broker into container creation. |
| **Tests & fixtures**<br>`backend/tests/e2e/*`, `backend/tests/unit/*`, `backend/tests/*/conftest.py` | Many consumer/dispatcher/idempotency tests removed or simplified; tests updated to use public handler methods or broker-backed DLQManager; PodList fixture now exposes resource_version. |
| **Dependencies**<br>`backend/pyproject.toml` | Added faststream[kafka]==0.6.6 and updated aiokafka / dishka versions. |
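The Producer & DLQ row above mentions publishing through broker.publish() with header dicts; a minimal, hypothetical sketch of that call shape (the class is illustrative, not the actual UnifiedProducer):

```python
from faststream.kafka import KafkaBroker


class ExampleProducer:
    """Broker-backed producer sketch; not the actual UnifiedProducer API."""

    def __init__(self, broker: KafkaBroker) -> None:
        self._broker = broker

    async def publish_event(self, topic: str, payload: bytes, event_type: str, event_id: str) -> None:
        # KafkaBroker.publish takes the message body plus topic, key,
        # and a plain str->str headers mapping.
        await self._broker.publish(
            payload,
            topic=topic,
            key=event_id.encode(),
            headers={"event_type": event_type, "event_id": event_id},
        )
```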

Sequence Diagram(s)

sequenceDiagram
    participant App as FastAPI / FastStream App
    participant Broker as KafkaBroker (faststream)
    participant DI as Dishka Container
    participant DB as Database / Beanie
    participant Handler as Domain Handler

    App->>DI: create_schema_registry(settings)
    App->>Broker: create_broker(settings, schema_registry, logger)
    App->>DI: create_container(settings, broker)
    App->>Broker: register_subscribers(register_*_subscriber)
    App->>Broker: broker.start()
    Broker->>Handler: on_message(StreamMessage)
    Handler->>DI: resolve services (repos, producer, metrics)
    Handler->>DB: persist/read domain state (Beanie)
    Handler->>Broker: broker.publish(output_topic, payload, headers)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

🐰 I hopped from queues to broker song,
New subscribers hum where consumers throng,
DI plants seeds, the broker starts to sing,
Retries and DLQ dances take to wing,
A rabbit cheers: new streams for everything!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 37.78% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title 'updates: faststream, start/stop (almost) removed' clearly and specifically summarizes the main changes: adoption of FastStream and removal of manual start/stop lifecycle code. |


@cubic-dev-ai cubic-dev-ai bot left a comment

3 issues found across 43 files

Prompt for AI agents (all issues)

Check if these issues are valid — if so, understand the root cause of each and fix them.


<file name="backend/workers/run_k8s_worker.py">

<violation number="1" location="backend/workers/run_k8s_worker.py:61">
P2: The bootstrap task is created without retaining a reference. The previous implementation kept the task to prevent premature garbage collection. Keep a reference (or store it on the container/app) so the daemonset bootstrap reliably completes.</violation>
</file>

<file name="backend/workers/run_saga_orchestrator.py">

<violation number="1" location="backend/workers/run_saga_orchestrator.py:79">
P2: Task cancellation should be awaited to ensure clean shutdown. Without awaiting, the `timeout_loop` may still be running when `container.close()` is called, potentially causing race conditions or resource cleanup issues.</violation>
</file>

<file name="backend/workers/dlq_processor.py">

<violation number="1" location="backend/workers/dlq_processor.py:154">
P2: broker.start() is invoked before the AsyncExitStack registers broker.close. If initialization fails before entering the stack, the broker will not be closed, leaking the connection. Consider registering broker.close in the exit stack (or a try/finally) before starting it.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
backend/app/services/k8s_worker/worker.py (1)

74-85: ⚠️ Potential issue | 🟠 Major

Fix race condition: mark execution active before scheduling task.

A race condition exists between checking if execution_id is in _active_creations (line 79) and actually adding it inside the background task (line 116). A duplicate command can arrive during this window and pass the check, resulting in two pods created for the same execution. Add execution_id to the set immediately after the check, before spawning the task:

        # Check if already processing
        if execution_id in self._active_creations:
            self.logger.warning(f"Already creating pod for execution {execution_id}")
            return

+       # Mark active before scheduling to prevent duplicates
+       self._active_creations.add(execution_id)
+       self.metrics.update_k8s_active_creations(len(self._active_creations))
        # Create pod asynchronously
        asyncio.create_task(self._create_pod_for_execution(command))

Remove the redundant add and metrics call from inside _create_pod_for_execution (currently lines 116-117).

🤖 Fix all issues with AI agents
In `@backend/pyproject.toml`:
- Line 128: Update the pinned dependency string "faststream[kafka]==0.6.5" to
"faststream[kafka]==0.6.6" in the project's dependency specification so the
project uses the latest patch release compatible with Python 3.12; commit the
change and run dependency installation/lockfile update (e.g., pip-compile/poetry
update) and tests to verify nothing breaks.

In `@backend/workers/run_event_replay.py`:
- Around line 31-35: The broker created by create_broker(...) is never started
or closed, causing publish calls in EventReplayService.produce() to fail and
leaking connections on shutdown; update the worker to await broker.start() after
creation (using the same tmp_logger/schema_registry context) before creating the
container or running the service, and ensure broker.close() is called in a
finally/cleanup path or on shutdown (mirroring other workers) so the KafkaBroker
is properly started and torn down.

In `@backend/workers/run_k8s_worker.py`:
- Around line 58-67: bootstrap() currently fires-and-forgets the coroutine from
KubernetesWorker.ensure_image_pre_puller_daemonset(), which leaves exceptions
unobserved and prevents proper shutdown; change bootstrap() to save the created
task to a module-level or app-scoped variable (e.g., pre_puller_task), start it
with asyncio.create_task(...), and in shutdown() check if pre_puller_task exists
and is not done, call pre_puller_task.cancel(), then await it inside a
try/except to suppress asyncio.CancelledError and log other exceptions so the
background task is properly cancelled and awaited during shutdown.

In `@backend/workers/run_saga_orchestrator.py`:
- Around line 76-81: In shutdown(), after calling _timeout_task.cancel(), await
the task to ensure it finishes and surface any exceptions: call await
_timeout_task inside a try/except that handles asyncio.CancelledError (ignore)
and logs any other Exception via logger, so exceptions are not swallowed and
pending-task warnings are avoided; keep the existing await container.close() and
logger.info("SagaOrchestrator shutdown complete") afterwards and reference the
shutdown function and _timeout_task by name when making the change.
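Both worker prompts above reduce to the same asyncio pattern: keep a reference to the background task, then cancel and await it on shutdown. A generic sketch with placeholder names:

```python
import asyncio
import contextlib
import logging

logger = logging.getLogger(__name__)
_background_task: asyncio.Task | None = None


async def some_background_work() -> None:
    # Stand-in for ensure_image_pre_puller_daemonset() or the saga timeout loop.
    await asyncio.sleep(3600)


async def on_startup() -> None:
    global _background_task
    # Keep a reference so the task cannot be garbage-collected mid-flight.
    _background_task = asyncio.create_task(some_background_work())


async def on_shutdown() -> None:
    if _background_task is not None and not _background_task.done():
        _background_task.cancel()
        try:
            await _background_task
        except asyncio.CancelledError:
            pass  # expected on cancellation
        except Exception:
            logger.exception("Background task failed during shutdown")
```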
🧹 Nitpick comments (3)
backend/app/services/coordinator/coordinator.py (1)

72-132: Consider explicit type guards for the new public handlers.
Now that these handlers are public and invoked by subscribers, a fast‑fail type check (as in saga_orchestrator) makes misrouted events obvious. Apply to all handler methods.

♻️ Suggested pattern
 async def handle_execution_requested(self, event: ExecutionRequestedEvent) -> None:
     """Handle execution requested event - add to queue for processing."""
+    if not isinstance(event, ExecutionRequestedEvent):
+        raise TypeError(f"Expected ExecutionRequestedEvent, got {type(event).__name__}")
     self.logger.info(f"HANDLER CALLED: handle_execution_requested for event {event.event_id}")
backend/workers/run_pod_monitor.py (1)

37-60: Ensure broker/container cleanup on early startup failures.

broker.start() happens before the try block, so exceptions during container.get(PodMonitor) or signal setup would bypass cleanup. Wrap post-start initialization in the same try/finally (or use AsyncExitStack) to guarantee shutdown.

Suggested refactor
-    await broker.start()
-
-    # Services are already started by the DI container providers
-    monitor = await container.get(PodMonitor)
-
-    # Shutdown event - signal handlers just set this
-    shutdown_event = asyncio.Event()
-    loop = asyncio.get_running_loop()
-    for sig in (signal.SIGINT, signal.SIGTERM):
-        loop.add_signal_handler(sig, shutdown_event.set)
-
-    logger.info("PodMonitor started and running")
-
-    try:
-        # Wait for shutdown signal or service to stop
-        while monitor.state == MonitorState.RUNNING and not shutdown_event.is_set():
-            await asyncio.sleep(RECONCILIATION_LOG_INTERVAL)
-            status = await monitor.get_status()
-            logger.info(f"Pod monitor status: {status}")
-    finally:
-        # Container cleanup stops everything
-        logger.info("Initiating graceful shutdown...")
-        await broker.close()
-        await container.close()
+    await broker.start()
+    try:
+        # Services are already started by the DI container providers
+        monitor = await container.get(PodMonitor)
+
+        # Shutdown event - signal handlers just set this
+        shutdown_event = asyncio.Event()
+        loop = asyncio.get_running_loop()
+        for sig in (signal.SIGINT, signal.SIGTERM):
+            loop.add_signal_handler(sig, shutdown_event.set)
+
+        logger.info("PodMonitor started and running")
+
+        # Wait for shutdown signal or service to stop
+        while monitor.state == MonitorState.RUNNING and not shutdown_event.is_set():
+            await asyncio.sleep(RECONCILIATION_LOG_INTERVAL)
+            status = await monitor.get_status()
+            logger.info(f"Pod monitor status: {status}")
+    finally:
+        # Container cleanup stops everything
+        logger.info("Initiating graceful shutdown...")
+        await broker.close()
+        await container.close()
backend/workers/dlq_processor.py (1)

151-199: Guard broker/container cleanup on startup exceptions.

broker.start() runs before AsyncExitStack is entered. If initialization fails before the stack, broker/container cleanup is skipped. Moving the stack earlier ensures cleanup is always registered.

Suggested refactor
-    tmp_logger = setup_logger(settings.LOG_LEVEL)
-    schema_registry = SchemaRegistryManager(settings, tmp_logger)
-    broker = create_broker(settings, schema_registry, tmp_logger)
-    await broker.start()
-
-    container = create_dlq_processor_container(settings, broker)
-    logger = await container.get(logging.Logger)
-    logger.info("Starting DLQ Processor...")
-
-    db = await container.get(Database)
-    await init_beanie(database=db, document_models=ALL_DOCUMENTS)
-
-    manager = await container.get(DLQManager)
-
-    _configure_retry_policies(manager, logger)
-    _configure_filters(manager, testing=settings.TESTING, logger=logger)
-
-    topic_name = f"{settings.KAFKA_TOPIC_PREFIX}{KafkaTopic.DEAD_LETTER_QUEUE}"
-    consumer = AIOKafkaConsumer(
-        topic_name,
-        bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
-        group_id=GroupId.DLQ_MANAGER,
-        enable_auto_commit=False,
-        auto_offset_reset="earliest",
-        client_id="dlq-manager-consumer",
-        session_timeout_ms=settings.KAFKA_SESSION_TIMEOUT_MS,
-        heartbeat_interval_ms=settings.KAFKA_HEARTBEAT_INTERVAL_MS,
-        max_poll_interval_ms=settings.KAFKA_MAX_POLL_INTERVAL_MS,
-        request_timeout_ms=settings.KAFKA_REQUEST_TIMEOUT_MS,
-    )
-
-    stop_event = asyncio.Event()
-    loop = asyncio.get_running_loop()
-
-    def signal_handler() -> None:
-        logger.info("Received signal, initiating shutdown...")
-        stop_event.set()
-
-    for sig in (signal.SIGINT, signal.SIGTERM):
-        loop.add_signal_handler(sig, signal_handler)
-
-    async with AsyncExitStack() as stack:
-        stack.push_async_callback(broker.close)
-        stack.push_async_callback(container.close)
-        await consumer.start()
-        stack.push_async_callback(consumer.stop)
+    tmp_logger = setup_logger(settings.LOG_LEVEL)
+    schema_registry = SchemaRegistryManager(settings, tmp_logger)
+    broker = create_broker(settings, schema_registry, tmp_logger)
+    container = create_dlq_processor_container(settings, broker)
+
+    async with AsyncExitStack() as stack:
+        stack.push_async_callback(container.close)
+        await broker.start()
+        stack.push_async_callback(broker.close)
+
+        logger = await container.get(logging.Logger)
+        logger.info("Starting DLQ Processor...")
+
+        db = await container.get(Database)
+        await init_beanie(database=db, document_models=ALL_DOCUMENTS)
+
+        manager = await container.get(DLQManager)
+
+        _configure_retry_policies(manager, logger)
+        _configure_filters(manager, testing=settings.TESTING, logger=logger)
+
+        topic_name = f"{settings.KAFKA_TOPIC_PREFIX}{KafkaTopic.DEAD_LETTER_QUEUE}"
+        consumer = AIOKafkaConsumer(
+            topic_name,
+            bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
+            group_id=GroupId.DLQ_MANAGER,
+            enable_auto_commit=False,
+            auto_offset_reset="earliest",
+            client_id="dlq-manager-consumer",
+            session_timeout_ms=settings.KAFKA_SESSION_TIMEOUT_MS,
+            heartbeat_interval_ms=settings.KAFKA_HEARTBEAT_INTERVAL_MS,
+            max_poll_interval_ms=settings.KAFKA_MAX_POLL_INTERVAL_MS,
+            request_timeout_ms=settings.KAFKA_REQUEST_TIMEOUT_MS,
+        )
+
+        stop_event = asyncio.Event()
+        loop = asyncio.get_running_loop()
+
+        def signal_handler() -> None:
+            logger.info("Received signal, initiating shutdown...")
+            stop_event.set()
+
+        for sig in (signal.SIGINT, signal.SIGTERM):
+            loop.add_signal_handler(sig, signal_handler)
+
+        await consumer.start()
+        stack.push_async_callback(consumer.stop)

@cubic-dev-ai cubic-dev-ai bot left a comment

1 issue found across 22 files (changes from recent commits).

Prompt for AI agents (all issues)

Check if these issues are valid — if so, understand the root cause of each and fix them.


<file name="backend/workers/dlq_processor.py">

<violation number="1" location="backend/workers/dlq_processor.py:22">
P2: `logging.basicConfig(...)` attaches a plain root handler while `setup_logger` already configures JSON/sanitized logging. Because the `integr8scode` logger propagates, messages will be emitted a second time without sanitization/structure. Disable propagation on this logger or remove the basicConfig call to avoid leaking unsanitized data and duplicate logs.</violation>
</file>
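A minimal illustration of the propagation option mentioned in the comment (the logger name is taken from the comment itself):

```python
import logging

# Either drop the basicConfig call, or keep the structured handler from
# setup_logger as the only emitter by disabling propagation to the root logger.
logger = logging.getLogger("integr8scode")
logger.propagate = False
```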

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
backend/app/dlq/manager.py (1)

164-189: ⚠️ Potential issue | 🟠 Major

Use Avro serialization for retried messages to match broker's Avro decoder.

The retry_message method publishes to both the retry and original topic using JSON serialization (json.dumps(event.model_dump(mode="json")).encode()), but the broker is configured for Avro deserialization. Downstream consumers will fail to decode these messages. The schema_registry is already available as an instance variable and is used correctly elsewhere in the class.

🛠️ Suggested fix
         event = message.event
-        serialized = json.dumps(event.model_dump(mode="json")).encode()
+        serialized = await self.schema_registry.serialize_event(event)
🤖 Fix all issues with AI agents
In `@backend/app/events/handlers.py`:
- Around line 313-365: The code currently saves the DLQ message and records
metrics before calling manager.handle_message, causing filtered messages to be
persisted and counted twice; in register_dlq_subscriber/on_dlq_message remove
the explicit await manager.repository.save_message(dlq_msg) and move the metric
calls (manager.metrics.record_dlq_message_received and
manager.metrics.record_dlq_message_age and the final
record_dlq_processing_duration) to after await manager.handle_message(dlq_msg)
(keep the tracing block around handle_message), since manager.handle_message
already applies filters and performs saving so metrics and explicit save must
only run post-handle to avoid double-save and false counts.
🧹 Nitpick comments (4)
backend/tests/unit/services/saga/test_saga_orchestrator_unit.py (2)

62-74: Consider more realistic fake behavior for edge case testing.

A few observations on the fake implementation:

  1. create_allocation uses a hardcoded allocation_id="alloc-1" - if a test creates multiple allocations, they'll share the same ID, which could mask bugs.
  2. release_allocation always returns True, but the real implementation returns False when the allocation doesn't exist. This could hide bugs in compensation logic that depend on accurate return values.

These are acceptable for current tests but may cause issues if test coverage expands.

♻️ Optional: More realistic fake implementation
+    _alloc_counter: int = 0
+
     async def create_allocation(self, create_data: DomainResourceAllocationCreate) -> DomainResourceAllocation:
+        self._alloc_counter += 1
         alloc = DomainResourceAllocation(
-            allocation_id="alloc-1",
+            allocation_id=f"alloc-{self._alloc_counter}",
             **create_data.model_dump(),
         )
         self.allocations.append(alloc)
         return alloc

     async def release_allocation(self, allocation_id: str) -> bool:
-        return True
+        return any(a.allocation_id == allocation_id for a in self.allocations)

92-96: Test assertion could be more precise.

The assertion len(fake_repo.saved) >= 1 is quite permissive. Since the comment indicates the saga "runs to completion," consider:

  1. Asserting a specific expected count (e.g., == 2 for initial creation + completion update) to catch unexpected duplicate saves.
  2. Validating the final saga state (e.g., SagaState.COMPLETED) to confirm the saga actually completed successfully rather than just started.

The current test verifies the saga was triggered but doesn't confirm it reached the expected terminal state.

♻️ Suggested: Add state validation
     # The saga is created and fully executed (steps run to completion)
     assert len(fake_repo.saved) >= 1
     first_saved = fake_repo.saved[0]
     assert first_saved.execution_id == "e"
     assert first_saved.saga_name == ExecutionSaga.get_name()
+    # Verify final state
+    last_saved = fake_repo.saved[-1]
+    assert last_saved.state == SagaState.COMPLETED
backend/app/core/providers.py (2)

125-143: Consider handling update_config failure more gracefully.

If update_config fails within the try block at line 139, the error is logged but the service is still returned without any default configuration. This could lead to rate limiting being effectively disabled without clear indication.

Consider whether the service should fail initialization if rate limit config cannot be established, or ensure a fallback is applied.


747-762: Avoid accessing private attribute _last_resource_version directly.

Line 753 directly manipulates monitor._last_resource_version, which breaks encapsulation. If the internal implementation of PodMonitor changes, this code will silently break.

Consider adding a public method to PodMonitor for resetting the watch cursor:

♻️ Suggested approach

Add a method to PodMonitor:

def reset_watch_cursor(self) -> None:
    """Reset the resource version cursor for watch reconnection."""
    self._last_resource_version = None

Then update the provider:

                 if e.status == 410:
                     logger.warning("Resource version expired, resetting watch cursor")
-                    monitor._last_resource_version = None
+                    monitor.reset_watch_cursor()
                     kubernetes_metrics.record_pod_monitor_watch_error(ErrorType.RESOURCE_VERSION_EXPIRED)

@cubic-dev-ai cubic-dev-ai bot left a comment

1 issue found across 11 files (changes from recent commits).

Prompt for AI agents (all issues)

Check if these issues are valid — if so, understand the root cause of each and fix them.


<file name="backend/app/events/handlers.py">

<violation number="1" location="backend/app/events/handlers.py:358">
P3: DLQ “message received”/age metrics are now recorded only after `handle_message` succeeds. If `handle_message` raises (e.g., repository save failure), these counters are skipped, undercounting received DLQ messages. Record these metrics before handling or in a `finally` block.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

@cubic-dev-ai cubic-dev-ai bot Feb 3, 2026

P3: DLQ “message received”/age metrics are now recorded only after handle_message succeeds. If handle_message raises (e.g., repository save failure), these counters are skipped, undercounting received DLQ messages. Record these metrics before handling or in a finally block.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/events/handlers.py, line 358:

<comment>DLQ “message received”/age metrics are now recorded only after `handle_message` succeeds. If `handle_message` raises (e.g., repository save failure), these counters are skipped, undercounting received DLQ messages. Record these metrics before handling or in a `finally` block.</comment>

<file context>
@@ -360,6 +355,10 @@ async def on_dlq_message(
         ):
             await manager.handle_message(dlq_msg)
 
+        manager.metrics.record_dlq_message_received(dlq_msg.original_topic, dlq_msg.event.event_type)
+        manager.metrics.record_dlq_message_age(
+            (datetime.now(timezone.utc) - dlq_msg.failed_at).total_seconds()
</file context>
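A sketch of the finally-block variant suggested here, reusing the attribute names from the file context above (not verified against the repo):

```python
from datetime import datetime, timezone


async def on_dlq_message_sketch(manager, dlq_msg) -> None:
    try:
        await manager.handle_message(dlq_msg)
    finally:
        # Recorded even when handle_message raises, so received counts stay accurate.
        manager.metrics.record_dlq_message_received(dlq_msg.original_topic, dlq_msg.event.event_type)
        manager.metrics.record_dlq_message_age(
            (datetime.now(timezone.utc) - dlq_msg.failed_at).total_seconds()
        )
```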

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🤖 Fix all issues with AI agents
In `@backend/app/events/handlers.py`:
- Around line 75-129: Header values arrive as bytes so the current lambda
filters comparing msg.headers.get("event_type", "") directly against EventType
strings always fail; add a small helper like _normalize_event_type(msg) that
reads raw.headers or msg.headers, decodes the event_type bytes to str (safe to
return "" if missing), and then update all event_type filters (e.g., in
register_coordinator_subscriber's
on_execution_requested/on_execution_completed/on_execution_failed/on_execution_cancelled
and the other register_*_subscriber functions) to use lambda msg:
_normalize_event_type(msg) == EventType.EXECUTION_REQUESTED (or the appropriate
EventType) so comparisons succeed; ensure the helper handles None and empty
headers consistently.
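One possible shape for the helper described in this prompt; the header layout on the message object is assumed from the comment:

```python
def _normalize_event_type(msg) -> str:
    """Decode the event_type header to str; return '' when absent."""
    headers = dict(getattr(msg, "headers", None) or {})
    value = headers.get("event_type", b"")
    if isinstance(value, bytes):
        return value.decode("utf-8", errors="replace")
    return str(value or "")


# Filter usage inside a register_*_subscriber function, e.g.:
# filter=lambda msg: _normalize_event_type(msg) == EventType.EXECUTION_REQUESTED
```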

In `@backend/workers/run_pod_monitor.py`:
- Around line 42-56: The current pattern calls asyncio.run(app.run()) which
passes a coroutine directly instead of awaiting FastStream.run(); change to
define an async wrapper (e.g., async def run_app(): await app.run()) and then
call asyncio.run(run_app()) so FastStream.run() is awaited properly; update this
in run_pod_monitor.py (identify symbols: FastStream, app, app.run, asyncio.run),
and apply the same change to the other worker files: run_saga_orchestrator.py,
run_k8s_worker.py, run_coordinator.py, dlq_processor.py, and
run_result_processor.py.
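The wrapper this prompt asks for, as a minimal self-contained sketch:

```python
import asyncio

from faststream import FastStream
from faststream.kafka import KafkaBroker

app = FastStream(KafkaBroker("localhost:9092"))


async def run_app() -> None:
    await app.run()  # FastStream.run() is a coroutine, so it is awaited here


if __name__ == "__main__":
    asyncio.run(run_app())
```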

In `@backend/workers/run_saga_orchestrator.py`:
- Around line 35-42: The SchemaRegistryManager instance is never closed, leaking
AsyncSchemaRegistryClient's HTTP pool; add an async close method on
SchemaRegistryManager (e.g., async def close(self): await self._client.close())
and store the created instance in the module (schema_registry =
SchemaRegistryManager(...)); then update the application shutdown handler (the
`@app.on_shutdown` coroutine) to await schema_registry.close() before awaiting
container.close(). Ensure create_broker(...) still receives the schema_registry
instance.
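A sketch of the proposed close() method and shutdown hook; the private client attribute name comes from the comment and may differ in the repo:

```python
class SchemaRegistryManager:
    """Sketch: only the close() addition discussed above."""

    def __init__(self, client) -> None:
        self._client = client  # e.g. an AsyncSchemaRegistryClient

    async def close(self) -> None:
        await self._client.close()  # releases the HTTP connection pool


# In the worker shutdown hook:
# @app.on_shutdown
# async def shutdown() -> None:
#     await schema_registry.close()
#     await container.close()
```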

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@backend/app/core/dishka_lifespan.py`:
- Around line 68-81: The code currently awaits broker.start() outside the try
block so if broker.start() raises the DI-initialized resources (e.g.,
NotificationScheduler created by container.get(NotificationScheduler)) are never
cleaned up; wrap the broker start in a try/except/finally or move it inside the
existing try so any exception triggers cleanup: call container/DI shutdown or
stop the NotificationScheduler and ensure broker.stop() is invoked if
broker.start() succeeded partially, log the error with context, and re-raise the
exception so the app fails fast; refer to NotificationScheduler,
container.get(NotificationScheduler), broker.start(), and broker.stop() when
making the changes.
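One way the guarded startup could look; function and variable names are placeholders:

```python
import logging


async def start_broker_guarded(broker, container, logger: logging.Logger) -> None:
    try:
        await broker.start()
    except Exception:
        logger.exception("Broker failed to start; cleaning up DI-managed services")
        await container.close()  # stops NotificationScheduler and other resources
        raise  # fail fast after cleanup
```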
🧹 Nitpick comments (4)
backend/app/core/providers.py (3)

236-272: The database parameter enforces DI ordering but could benefit from a clarifying comment.

The database: Database parameter (line 245) ensures Beanie initialization completes before the DLQManager starts, but it's not used in the method body. While this pattern appears intentionally in other providers (e.g., get_notification_scheduler at line 558), adding a brief comment would clarify its purpose for future maintainers.

📝 Suggested documentation
     async def get_dlq_manager(
             self,
             broker: KafkaBroker,
             settings: Settings,
             schema_registry: SchemaRegistryManager,
             logger: logging.Logger,
             dlq_metrics: DLQMetrics,
             repository: DLQRepository,
-            database: Database,
+            database: Database,  # noqa: ARG002 - ensures Beanie init completes before DLQ starts
     ) -> AsyncIterator[DLQManager]:

747-762: Accessing private attribute _last_resource_version breaks encapsulation.

The _watch_cycle function directly accesses monitor._last_resource_version (line 753) to reset the watch cursor. This couples the provider to internal implementation details of PodMonitor.

Consider adding a public method to PodMonitor for resetting the watch cursor:

♻️ Suggested approach

Add a public method to PodMonitor:

# In PodMonitor class
def reset_watch_cursor(self) -> None:
    """Reset watch cursor after resource version expiration."""
    self._last_resource_version = None

Then update the provider:

-                    monitor._last_resource_version = None
+                    monitor.reset_watch_cursor()

918-927: Consider making cleanup retention period configurable.

The older_than_hours: 48 is hardcoded. For operational flexibility, consider moving this to Settings:

📝 Suggested change
         scheduler.add_job(
             service.cleanup_old_sessions,
             trigger="interval",
             hours=6,
-            kwargs={"older_than_hours": 48},
+            kwargs={"older_than_hours": settings.REPLAY_SESSION_RETENTION_HOURS},
             id="replay_cleanup_old_sessions",
             max_instances=1,
             misfire_grace_time=300,
         )

This would require adding REPLAY_SESSION_RETENTION_HOURS: int = 48 to your Settings class and injecting settings: Settings into the provider method.

backend/app/events/handlers.py (1)

316-364: Consider defensive header decoding, but recognize that malformed headers indicate an upstream Kafka/aiokafka issue.

According to aiokafka and faststream documentation, ConsumerRecord.headers is typed as Sequence[tuple[str, bytes]], meaning header values should always be bytes. The current code headers = {k: v.decode() for k, v in (raw.headers or [])} is technically correct under that contract. However, adding defensive type checks is reasonable robustness engineering, especially in a DLQ handler. Consider the proposed fix if you want to tolerate malformed headers gracefully, but note this addresses edge cases outside the normal Kafka specification, not a guaranteed failure mode.
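A defensive decode along these lines, tolerating both bytes and str header values (illustrative):

```python
def decode_headers(raw_headers) -> dict[str, str]:
    """Tolerate bytes or str header values, and missing headers."""
    return {
        k: v.decode("utf-8", errors="replace") if isinstance(v, bytes) else str(v)
        for k, v in (raw_headers or [])
    }
```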


sonarqubecloud bot commented Feb 4, 2026

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
backend/app/services/notification_service.py (1)

155-160: ⚠️ Potential issue | 🟡 Minor

Inconsistent TESTING bypass for throttle checks.

The TESTING mode bypass is added here but not in _create_system_for_user (lines 267-275), which has its own throttle check. In TESTING mode, direct calls to create_notification will bypass throttling, but calls through create_system_notification will still be throttled at the _create_system_for_user level before reaching this code.

🔧 Proposed fix for consistent TESTING bypass
     async def _create_system_for_user(
         self,
         user_id: str,
         cfg: SystemConfig,
         title: str,
         base_context: NotificationContext,
         tags: list[str],
     ) -> str:
         try:
-            if not cfg.throttle_exempt:
+            if not cfg.throttle_exempt and not self.settings.TESTING:
                 throttled = await self._throttle_cache.check_throttle(
                     user_id,
                     cfg.severity,
                     window_hours=self.settings.NOTIF_THROTTLE_WINDOW_HOURS,
                     max_per_hour=self.settings.NOTIF_THROTTLE_MAX_PER_HOUR,
                 )
                 if throttled:
                     return "throttled"

@HardMax71 HardMax71 merged commit a9f9ec4 into main Feb 4, 2026
15 checks passed
@HardMax71 HardMax71 deleted the fix/start-stop branch February 4, 2026 00:39
