Skip to content

Conversation

@HardMax71
Copy link
Owner

@HardMax71 HardMax71 commented Feb 2, 2026


Summary by cubic

Moved the result processor lifecycle into DI and made the processor stateless. The DI container now owns the Kafka consumer and idempotency dispatcher; the processor only exposes event handlers.

  • Refactors

    • Added ResultProcessorProvider to create the idempotent dispatcher and start/stop the consumer.
    • ResultProcessor no longer extends LifecycleEnabled and no longer manages a consumer or state.
    • Removed ResultProcessorConfig and status APIs; processor now exposes:
      • handle_execution_completed, handle_execution_failed, handle_execution_timeout (with strict type checks).
    • Simplified worker (run_result_processor) to rely on DI startup and graceful shutdown; initializes schemas and DB via DI.
    • Hardened consumer error handling: log unexpected exceptions in the loop and propagate processing errors to error callbacks.
    • Revised unit and e2e tests to register handlers on a dispatcher and manage a test consumer explicitly.
  • Migration

    • Do not instantiate or context-manage ResultProcessor; get it from the DI container.
    • Do not use ResultProcessorConfig or get_status; register handlers with a dispatcher if testing without DI.

Written for commit 509e50c. Summary will update on new commits.

Summary by CodeRabbit

  • New Features

    • Added idempotent result processing pipeline for execution events.
  • Improvements

    • Stricter resource-usage validation and clearer memory/utilization metrics.
    • Revised result handling with explicit completion/failure/timeout flows.
    • DI-driven startup for more reliable service initialization.
  • Bug Fixes

    • Consumer loop now catches and logs processing errors to avoid unexpected shutdowns.
  • Tests

    • Updated end-to-end and unit tests to cover the new processor behavior and idempotency.

@coderabbitai
Copy link

coderabbitai bot commented Feb 2, 2026

📝 Walkthrough

Walkthrough

Result processing was reworked: a new ResultProcessorProvider wires a ResultProcessor with an idempotent dispatcher and consumer via DI; ResultProcessor was simplified to explicit public handlers (completed/failed/timeout); tests and the worker entrypoint were updated to use the DI-driven startup and new consumer wiring.

Changes

Cohort / File(s) Summary
DI container & provider
backend/app/core/container.py, backend/app/core/providers.py
Added ResultProcessorProvider and included it in container creation; provider wires a ResultProcessor with an IdempotentEventDispatcher, registers handlers for EXECUTION_COMPLETED/FAILED/TIMEOUT, starts/stops a UnifiedConsumer.
ResultProcessor implementation
backend/app/services/result_processor/processor.py, backend/app/services/result_processor/__init__.py
Refactored ResultProcessor from lifecycle-enabled orchestrator to a plain class with public methods handle_execution_completed/failed/timeout; constructor parameters reduced; removed internal dispatcher/consumer lifecycle and get_status; ResultProcessorConfig de-exported.
Tests
backend/tests/unit/services/result_processor/test_processor.py, backend/tests/e2e/result_processor/test_result_processor.py
Unit tests rewritten for new processor API and type-checked event handlers; e2e test updated to create a dedicated proc_consumer/proc_dispatcher and to use UnifiedConsumer(..., event_dispatcher=...) keyword API.
Worker / runtime
backend/workers/run_result_processor.py
Replaced manual construction and lifecycle management with DI-based initialization (retrieve Database and start via container); removed manual Beanie/processor lifecycle handling; simplified shutdown to container.close().
Consumer loop robustness
backend/app/events/core/consumer.py
Added a generic exception catch in the consumer loop to prevent termination on handler errors and re-raises dispatcher errors after logging/metrics handling.

Sequence Diagram(s)

sequenceDiagram
  participant Kafka as "Kafka (EXECUTION_EVENTS)"
  participant ProcConsumer as "UnifiedConsumer (proc_consumer)"
  participant Dispatcher as "IdempotentEventDispatcher"
  participant Processor as "ResultProcessor"
  participant Repo as "ExecutionRepository"
  participant Producer as "UnifiedProducer"

  Kafka->>ProcConsumer: deliver message (ExecutionCompleted/Failed/Timeout)
  ProcConsumer->>Dispatcher: dispatch event (idempotent)
  Dispatcher->>Processor: invoke handler (completed/failed/timeout)
  Processor->>Repo: fetch/update execution, persist result
  alt publish stored result
    Processor->>Producer: publish ResultStoredEvent or ResultFailedEvent
  end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Poem

🐰
I hopped through code on nimble feet,
Wired handlers tidy, idempotent, neat.
DI gave me structure, tests in tow,
Now events flow steady, watch them go!

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 39.13% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately describes the main refactoring: moving lifetime/lifecycle management to DI and making ResultProcessor stateless.
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch fix/result-processor

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@backend/app/core/providers.py`:
- Around line 941-947: Change the Kafka consumer to disable auto-commit in the
ConsumerConfig (set enable_auto_commit=False where ConsumerConfig is
instantiated) and update EventDispatcher._execute_handler so it does not swallow
exceptions: let handler exceptions propagate (re-raise) instead of
returning/suppressing them so UnifiedConsumer._process_message can detect
failures and avoid committing offsets (the commit calls currently at the end of
UnifiedConsumer._process_message should only run when no exception was raised).
Ensure exceptions like TypeError/ExecutionNotFoundError/ValueError are not
caught-and-swallowed in EventDispatcher._execute_handler so DLQ/retry logic in
UnifiedConsumer can operate correctly.

In `@backend/app/services/result_processor/processor.py`:
- Around line 52-66: The current parsing of _settings.K8S_POD_MEMORY_LIMIT using
memory_limit_mib = int(settings_limit.rstrip("Mi")) is brittle and will
mis-handle values like "1Gi" or unit-less strings; update the code in the
ExecutionCompletedEvent handling (where event.resource_usage, runtime_seconds,
memory_mib and memory_limit_mib are used and where
_metrics.memory_utilization_percent.record is called) to explicitly parse units:
trim whitespace, detect suffixes "Mi" and "Gi" (convert Gi to Mi by multiplying
by 1024), accept plain numbers as Mi, handle unknown units by logging/warning
and using a safe default, and ensure a non-zero guard before computing
memory_percent to avoid division-by-zero before calling
_metrics.memory_utilization_percent.record with attributes={"lang_and_version":
lang_and_version}.

Copy link

@cubic-dev-ai cubic-dev-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 issue found across 7 files

Prompt for AI agents (all issues)

Check if these issues are valid — if so, understand the root cause of each and fix them.


<file name="backend/app/services/result_processor/processor.py">

<violation number="1" location="backend/app/services/result_processor/processor.py:52">
P2: ExecutionCompletedEvent.resource_usage is optional, but the new ValueError aborts processing when it is missing. This can turn valid completion events into failures and skip result storage. Guard metrics on resource_usage instead of raising.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

Copy link

@cubic-dev-ai cubic-dev-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 issue found across 2 files (changes from recent commits).

Prompt for AI agents (all issues)

Check if these issues are valid — if so, understand the root cause of each and fix them.


<file name="backend/app/events/core/consumer.py">

<violation number="1" location="backend/app/events/core/consumer.py:127">
P2: Avoid swallowing all exceptions in the consume loop. This hides unexpected failures (e.g., commit errors) and makes the consumer silently continue without logs or metrics.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

@sonarqubecloud
Copy link

sonarqubecloud bot commented Feb 2, 2026

@HardMax71 HardMax71 merged commit 6206126 into main Feb 2, 2026
15 checks passed
@HardMax71 HardMax71 deleted the fix/result-processor branch February 2, 2026 18:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants