fix: moved lifetime handling to DI, now stateless result processor #130
📝 Walkthrough
Result processing was reworked: a new ResultProcessorProvider wires a ResultProcessor with an idempotent dispatcher and consumer via DI; ResultProcessor was simplified to explicit public handlers (completed/failed/timeout); tests and the worker entrypoint were updated to use the DI-driven startup and new consumer wiring.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Kafka as "Kafka (EXECUTION_EVENTS)"
    participant ProcConsumer as "UnifiedConsumer (proc_consumer)"
    participant Dispatcher as "IdempotentEventDispatcher"
    participant Processor as "ResultProcessor"
    participant Repo as "ExecutionRepository"
    participant Producer as "UnifiedProducer"
    Kafka->>ProcConsumer: deliver message (ExecutionCompleted/Failed/Timeout)
    ProcConsumer->>Dispatcher: dispatch event (idempotent)
    Dispatcher->>Processor: invoke handler (completed/failed/timeout)
    Processor->>Repo: fetch/update execution, persist result
    alt publish stored result
        Processor->>Producer: publish ResultStoredEvent or ResultFailedEvent
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: 2 passed, 1 failed (1 warning)
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@backend/app/core/providers.py`:
- Around lines 941-947: Disable auto-commit on the Kafka consumer (set `enable_auto_commit=False` where `ConsumerConfig` is instantiated) and update `EventDispatcher._execute_handler` so it no longer swallows exceptions: let handler exceptions propagate (re-raise) instead of returning or suppressing them, so that `UnifiedConsumer._process_message` can detect failures and avoid committing offsets (the commit calls currently at the end of `UnifiedConsumer._process_message` should run only when no exception was raised). In particular, exceptions such as `TypeError`, `ExecutionNotFoundError`, and `ValueError` must not be caught and swallowed in `EventDispatcher._execute_handler`, so the DLQ/retry logic in `UnifiedConsumer` can operate correctly.
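A minimal sketch of the two changes above, assuming `ConsumerConfig` and `EventDispatcher` roughly follow the shapes implied by the review (the exact fields and signatures in the repo may differ):

```python
import logging
from dataclasses import dataclass
from typing import Any, Awaitable, Callable

logger = logging.getLogger(__name__)


@dataclass
class ConsumerConfig:
    """Stand-in for the repo's consumer config."""
    bootstrap_servers: str
    group_id: str
    # Suggested change: pass False at the instantiation site so offsets
    # are committed only after a handler succeeds.
    enable_auto_commit: bool = True


class EventDispatcher:
    def __init__(self) -> None:
        self._handlers: dict[str, Callable[[Any], Awaitable[None]]] = {}

    async def _execute_handler(self, event_type: str, event: Any) -> None:
        handler = self._handlers[event_type]
        try:
            await handler(event)
        except Exception:
            # Log for observability, then RE-RAISE so the consumer can
            # skip the offset commit and route the message to DLQ/retry.
            logger.exception("handler failed for %s", event_type)
            raise
```

With this shape, `UnifiedConsumer._process_message` sees the exception and can decide whether to retry, dead-letter, or skip the commit.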
In `@backend/app/services/result_processor/processor.py`:
- Around lines 52-66: Parsing `_settings.K8S_POD_MEMORY_LIMIT` with `memory_limit_mib = int(settings_limit.rstrip("Mi"))` is brittle and mishandles values like "1Gi" or unit-less strings. In the ExecutionCompletedEvent handling (where `event.resource_usage`, `runtime_seconds`, `memory_mib`, and `memory_limit_mib` are used and `_metrics.memory_utilization_percent.record` is called), parse units explicitly: trim whitespace, detect the "Mi" and "Gi" suffixes (converting Gi to Mi by multiplying by 1024), accept plain numbers as Mi, and handle unknown units by logging a warning and falling back to a safe default. Also add a non-zero guard before computing `memory_percent`, to avoid division by zero before calling `_metrics.memory_utilization_percent.record` with `attributes={"lang_and_version": lang_and_version}`.
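One possible shape for that parsing, as a self-contained helper; the function name and the 512 Mi fallback are illustrative, not from the repo:

```python
import logging

logger = logging.getLogger(__name__)

# Assumed safe default when the configured limit is unparseable.
DEFAULT_MEMORY_LIMIT_MIB = 512


def parse_memory_limit_mib(raw: str) -> int:
    """Parse a Kubernetes-style memory limit ("512Mi", "1Gi", "256") into MiB."""
    value = raw.strip()
    try:
        if value.endswith("Gi"):
            return int(float(value[:-2]) * 1024)  # Gi -> Mi
        if value.endswith("Mi"):
            return int(float(value[:-2]))
        return int(float(value))  # plain number treated as Mi
    except ValueError:
        logger.warning("unrecognized memory limit %r; using %d Mi default",
                       raw, DEFAULT_MEMORY_LIMIT_MIB)
        return DEFAULT_MEMORY_LIMIT_MIB
```

At the call site, the division guard then looks like `if memory_limit_mib > 0: memory_percent = memory_mib / memory_limit_mib * 100`, and the metric is recorded only inside that branch.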
1 issue found across 7 files
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="backend/app/services/result_processor/processor.py">
<violation number="1" location="backend/app/services/result_processor/processor.py:52">
P2: ExecutionCompletedEvent.resource_usage is optional, but the new ValueError aborts processing when it is missing. This can turn valid completion events into failures and skip result storage. Guard metrics on resource_usage instead of raising.</violation>
</file>
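The guard the violation asks for could look like the sketch below; the event class here is a minimal stand-in for the real `ExecutionCompletedEvent`, and the helper name is hypothetical:

```python
import logging
from dataclasses import dataclass
from typing import Optional

logger = logging.getLogger(__name__)


@dataclass
class ExecutionCompletedEvent:
    """Minimal stand-in: resource_usage is optional on the real event."""
    execution_id: str
    resource_usage: Optional[dict] = None


def record_usage_metrics(event: ExecutionCompletedEvent) -> bool:
    """Record metrics when usage is present; never raise when it is absent.

    Returns True if metrics were recorded, False if they were skipped,
    so result storage can proceed either way.
    """
    if event.resource_usage is None:
        logger.warning("execution %s completed without resource_usage; "
                       "skipping metrics", event.execution_id)
        return False
    # ... record runtime/memory metrics from event.resource_usage here ...
    return True
```

The key point is that a missing `resource_usage` downgrades to a logged skip instead of a `ValueError` that would abort processing and drop result storage.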
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
1 issue found across 2 files (changes from recent commits).
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="backend/app/events/core/consumer.py">
<violation number="1" location="backend/app/events/core/consumer.py:127">
P2: Avoid swallowing all exceptions in the consume loop. This hides unexpected failures (e.g., commit errors) and makes the consumer silently continue without logs or metrics.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
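A hedged sketch of the narrower consume loop the violation suggests: catch only expected transient errors, log them, and let everything else (including commit errors) surface. `TransientKafkaError`, `poll`, and `process` are stand-ins for the repo's real types:

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class TransientKafkaError(Exception):
    """Stand-in for retriable broker errors."""


async def consume_loop(poll, process, *, max_iterations=None):
    """Poll and process messages; max_iterations bounds the loop for tests."""
    iterations = 0
    while max_iterations is None or iterations < max_iterations:
        iterations += 1
        try:
            message = await poll()
            if message is not None:
                await process(message)
        except asyncio.CancelledError:
            raise  # shutdown must propagate
        except TransientKafkaError:
            # Expected and retriable: log and keep consuming.
            logger.warning("transient Kafka error; retrying", exc_info=True)
        # Anything else propagates instead of being silently swallowed.
```

Compared to a bare `except Exception: pass`, unexpected failures now crash loudly (or are handled one level up), which is what makes DLQ/alerting possible.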
Summary by cubic
Moved the result processor lifecycle into DI and made the processor stateless. The DI container now owns the Kafka consumer and idempotency dispatcher; the processor only exposes event handlers.
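The ownership split described above can be sketched as follows; the class names mirror the PR's vocabulary (`ResultProcessor`, `ResultProcessorProvider`), but the constructor signatures and the `register` API are assumptions for illustration:

```python
class ResultProcessor:
    """Stateless: holds only its collaborators and exposes event handlers."""

    def __init__(self, repo, producer):
        self._repo = repo
        self._producer = producer

    async def handle_completed(self, event): ...
    async def handle_failed(self, event): ...
    async def handle_timeout(self, event): ...


class ResultProcessorProvider:
    """Container-owned wiring: the consumer and idempotency dispatcher
    live here, not inside the processor, so their lifetimes follow the
    DI scope rather than the processor's."""

    def __init__(self, repo, producer, dispatcher, consumer):
        self.processor = ResultProcessor(repo, producer)
        dispatcher.register("ExecutionCompleted", self.processor.handle_completed)
        dispatcher.register("ExecutionFailed", self.processor.handle_failed)
        dispatcher.register("ExecutionTimeout", self.processor.handle_timeout)
        self.consumer = consumer
```

Because the processor no longer starts or stops anything itself, tests can construct it with fakes and call the handlers directly.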
Written for commit 509e50c.