
fix: producer to DI #132

Merged
HardMax71 merged 2 commits into main from fix/producer-lifecycle on Feb 3, 2026

Conversation

HardMax71 (Owner) commented Feb 3, 2026


Summary by cubic

Moved Kafka producer lifecycle to DI and made the producer build tracing/correlation headers automatically. Callers now pass a required key for each publish, simplifying event publishing and ensuring consistent partitioning.

  • Refactors
    • UnifiedProducer no longer manages start/stop; DI starts/stops AIOKafkaProducer and injects ProducerMetrics.
    • produce now requires key; callers pass execution_id/aggregate_id/event_id where appropriate.
    • Header creation centralized in producer via inject_trace_context; removed header logic from KafkaEventService.
    • Removed ProducerState and lifecycle methods; updated tests to assert metrics instead of state.
    • Event replay runner no longer enters producer context; lifecycle handled by DI.
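A minimal sketch of the new required-key call shape. `FakeProducer` is a hypothetical stand-in, not the real `UnifiedProducer` from backend/app/events/core/producer.py; only the keyword-only `key` parameter mirrors the change described above:

```python
from dataclasses import dataclass, field

@dataclass
class FakeProducer:
    # Hypothetical stand-in for UnifiedProducer; only the call shape matters.
    sent: list = field(default_factory=list)

    def produce(self, event_to_produce: dict, *, key: str) -> None:
        # `key` is keyword-only and required: omitting it raises TypeError
        # instead of silently publishing an unkeyed message.
        self.sent.append((key, event_to_produce))

producer = FakeProducer()
producer.produce({"event_type": "execution_started"}, key="exec-123")
```

Making the key mandatory turns a forgotten key into an immediate TypeError at the call site rather than an unpartitioned message discovered later.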

Written for commit 8ddef28. Summary will update on new commits.

Summary by CodeRabbit

  • Refactor

    • Producer lifecycle moved to dependency management for more reliable resource handling and metrics.
  • Behavior Changes

    • Message publishing now includes a deterministic key for better partitioning and correlation.
    • Dead-letter handling updated for more consistent failure delivery and metrics.
  • Tests

    • End-to-end producer test simplified to match new lifecycle and publish behavior.

coderabbitai bot commented Feb 3, 2026

📝 Walkthrough

Refactors Kafka producer lifecycle into the DI provider (explicitly instantiate/start/stop AIOKafkaProducer), simplifies UnifiedProducer by removing internal lifecycle/state and requiring a key for produce(), updates call sites to pass keys, and removes ProducerState and related exports.

Changes

Cohort / File(s) — Summary

  • Provider / Lifecycle — backend/app/core/providers.py
    Provider now manually instantiates, starts, yields, and stops AIOKafkaProducer; injects ProducerMetrics.
  • Producer implementation — backend/app/events/core/producer.py
    Removed lifecycle ownership from UnifiedProducer; it now accepts an external AIOKafkaProducer, drops the lifecycle/state APIs, requires a key in produce(), builds tracing headers internally, and updates ProducerMetrics usage.
  • Public types / exports — backend/app/events/core/types.py, backend/app/events/core/__init__.py
    Removed the ProducerState enum and its export from the core events namespace.
  • Event publishing service — backend/app/services/kafka_event_service.py
    Updated publish calls to use produce(event_to_produce=..., key=...); removed manual header construction/injection and the close() method.
  • Call-site updates (keyed publishes) — backend/app/services/coordinator/coordinator.py, backend/app/services/execution_service.py, backend/app/services/k8s_worker/worker.py, backend/app/services/event_replay/replay_service.py
    Multiple producer call sites now pass deterministic keys (execution_id, aggregate_id/event_id) to produce() for partitioning.
  • Tests & workers — backend/tests/e2e/events/test_producer_roundtrip.py, backend/workers/run_event_replay.py
    Tests simplified to retrieve UnifiedProducer from DI without explicit context-managed lifecycle; the worker removed its local UnifiedProducer initialization.
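The reason these call sites now pass a deterministic key: Kafka's default partitioner hashes the message key to pick a partition, so every event carrying the same execution_id lands on the same partition and is consumed in publish order. A stdlib-only sketch of the idea (CRC32 stands in for Kafka's actual murmur2 hash; this is an illustration, not the project's code):

```python
import zlib

def partition_for(key: str, num_partitions: int) -> int:
    # Deterministic hash of the key modulo the partition count. Kafka's
    # default partitioner uses murmur2; CRC32 here just shows determinism.
    return zlib.crc32(key.encode("utf-8")) % num_partitions

p1 = partition_for("exec-123", 6)
p2 = partition_for("exec-123", 6)
# Same key -> same partition, so per-execution ordering is preserved.
```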

Sequence Diagram(s)

sequenceDiagram
    participant Provider as DI Provider
    participant AIOKafka as AIOKafkaProducer
    participant UnifiedProducer as UnifiedProducer
    participant Service as Event Service
    participant Kafka as Kafka

    Note over Provider,AIOKafka: Provider-managed lifecycle
    Provider->>AIOKafka: instantiate AIOKafkaProducer()
    Provider->>AIOKafka: start()
    AIOKafka-->>Provider: ready
    Provider->>UnifiedProducer: construct with AIOKafkaProducer, deps
    Provider->>Service: inject UnifiedProducer

    Note over Service,UnifiedProducer: Publishing flow
    Service->>UnifiedProducer: produce(event, key=execution_id)
    UnifiedProducer->>UnifiedProducer: inject_trace_context(), build headers
    UnifiedProducer->>AIOKafka: send(topic, value, key, headers)
    AIOKafka->>Kafka: publish message
    Kafka-->>AIOKafka: ack / error
    AIOKafka-->>UnifiedProducer: result
    UnifiedProducer->>UnifiedProducer: update ProducerMetrics
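The provider-managed lifecycle in the diagram boils down to an async generator: start the client, yield the wrapped producer, stop it in a finally block. A minimal stdlib sketch with stand-in classes (the real provider constructs AIOKafkaProducer and UnifiedProducer):

```python
import asyncio
from typing import AsyncIterator

class FakeAIOKafkaProducer:
    # Stand-in for AIOKafkaProducer; records lifecycle calls for inspection.
    def __init__(self) -> None:
        self.events: list = []

    async def start(self) -> None:
        self.events.append("start")

    async def stop(self) -> None:
        self.events.append("stop")

async def provide_producer(raw: FakeAIOKafkaProducer) -> AsyncIterator[FakeAIOKafkaProducer]:
    # Mirrors the DI provider shape: start before yield, stop in finally so
    # the client is closed even if the dependency's consumer raises.
    await raw.start()
    try:
        yield raw
    finally:
        await raw.stop()

async def main() -> list:
    raw = FakeAIOKafkaProducer()
    async for producer in provide_producer(raw):
        producer.events.append("use")
    return raw.events

assert asyncio.run(main()) == ["start", "use", "stop"]
```

Because stop() runs in finally, services that receive the producer never manage its lifecycle themselves, which is exactly what let the PR delete ProducerState and the start/stop methods.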

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~30 minutes

Possibly related PRs

Poem

🐰 I hopped into code with a curious squeak,
Provider tended the producer, no lifecycle antique,
Keys now guide messages down tidy lanes,
Metrics hum softly, no lingering chains,
A carrot of clarity — rejoice, hop, and tweak! 🥕✨

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)
  • Docstring Coverage — ⚠️ Warning: docstring coverage is 68.75%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.

✅ Passed checks (2 passed)
  • Description Check — ✅ Passed: check skipped because CodeRabbit's high-level summary is enabled.
  • Title Check — ✅ Passed: the title 'fix: producer to DI' accurately summarizes the main change, moving the Kafka producer lifecycle into dependency injection.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.



coderabbitai bot left a comment
Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
backend/app/services/k8s_worker/worker.py (1)

225-236: ⚠️ Potential issue | 🟡 Minor

Remove the unused _publish_execution_started method.

The method is defined but never called anywhere in the codebase, making it dead code that should be removed.

🤖 Fix all issues with AI agents
In `@backend/app/events/core/producer.py`:
- Around lines 73-79: the except block can reference an undefined `topic` if
  `self._schema_registry.serialize_event(...)` raises a KafkaError. Either compute
  `topic` before the try block that may raise, or initialize `topic = None` ahead
  of the try and update the `record_kafka_production_error(topic=...)` call and the
  `self.logger.error(...)` call to tolerate a None topic, so neither ever reads an
  unassigned variable.
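The failure mode above is the classic "name bound inside try, read inside except" bug. A minimal sketch of the safer shape, with hypothetical stand-in names (the real code uses `_schema_registry.serialize_event` and `_event_metrics.record_kafka_production_error`):

```python
import logging

logger = logging.getLogger("producer")
recorded_errors: list = []

def resolve_topic(event: dict) -> str:
    # Stand-in for whatever maps an event to its Kafka topic.
    return f"events.{event['event_type']}"

def produce(event: dict, serialize) -> None:
    # Resolve the topic BEFORE the try block, so the except handler can
    # never hit an unbound local when serialization raises.
    topic = resolve_topic(event)
    try:
        payload = serialize(event)
        # ... the aiokafka send(topic, payload) would go here ...
    except Exception as e:
        recorded_errors.append(topic)  # stand-in for record_kafka_production_error(topic=...)
        logger.error("produce failed for topic %s: %s", topic, e)

def failing_serializer(event: dict) -> bytes:
    raise ValueError("schema mismatch")

produce({"event_type": "execution_started"}, failing_serializer)
```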
🧹 Nitpick comments (3)
backend/app/core/providers.py (1)

169-186: Producer lifecycle management looks good overall, but consider exception safety.

The producer configuration is solid with acks="all", enable_idempotence=True, and gzip compression. However, if UnifiedProducer construction fails after aiokafka_producer.start() succeeds, the producer won't be stopped (the finally block only runs after yielding).

♻️ Suggested safer pattern
     `@provide`
     async def get_kafka_producer(
             self, settings: Settings, schema_registry: SchemaRegistryManager, logger: logging.Logger,
             event_metrics: EventMetrics
     ) -> AsyncIterator[UnifiedProducer]:
         aiokafka_producer = AIOKafkaProducer(
             bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
             client_id=f"{settings.SERVICE_NAME}-producer",
             acks="all",
             compression_type="gzip",
             max_batch_size=16384,
             linger_ms=10,
             enable_idempotence=True,
         )
         await aiokafka_producer.start()
         logger.info(f"Kafka producer started: {settings.KAFKA_BOOTSTRAP_SERVERS}")
         try:
-            yield UnifiedProducer(
-                aiokafka_producer, schema_registry, logger, settings, event_metrics, ProducerMetrics(),
-            )
+            producer = UnifiedProducer(
+                aiokafka_producer, schema_registry, logger, settings, event_metrics, ProducerMetrics(),
+            )
+            yield producer
         finally:
             await aiokafka_producer.stop()
             logger.info("Kafka producer stopped")

This ensures that if UnifiedProducer.__init__ raises an exception, the producer is still stopped. In practice, UnifiedProducer.__init__ is simple assignment so this is low risk, but the pattern is safer for future modifications.

backend/app/events/core/producer.py (2)

140-145: DLQ failure metrics are inconsistent with the produce() method.

When DLQ send fails, only messages_failed is incremented. In contrast, the produce() method also sets last_error and last_error_time on failure. Consider aligning the error tracking for better observability.

♻️ Proposed fix for consistent error tracking
         except Exception as e:
             # If we can't send to DLQ, log critically but don't crash
             self.logger.critical(
                 f"Failed to send event {original_event.event_id} to DLQ: {e}. Original error: {error}", exc_info=True
             )
             self.metrics.messages_failed += 1
+            self.metrics.last_error = f"DLQ send failed: {e}"
+            self.metrics.last_error_time = datetime.now(timezone.utc)

130-133: Consider tracking bytes_sent for DLQ messages.

For consistent metrics tracking, consider incrementing bytes_sent here as done in the produce() method. Alternatively, if DLQ traffic should be tracked separately, consider adding dedicated DLQ metrics.

♻️ Proposed fix to track bytes
             # Record metrics
             self._event_metrics.record_kafka_message_produced(dlq_topic)
             self.metrics.messages_sent += 1
+            self.metrics.bytes_sent += len(serialized_value)

cubic-dev-ai bot left a comment
No issues found across 11 files

sonarqubecloud bot commented Feb 3, 2026

HardMax71 merged commit bcf6779 into main on Feb 3, 2026
15 checks passed
HardMax71 deleted the fix/producer-lifecycle branch on February 3, 2026 at 11:09