📝 Walkthrough

Refactors the Kafka producer lifecycle into the DI provider (explicitly instantiating, starting, and stopping `AIOKafkaProducer`), simplifies `UnifiedProducer` by removing internal lifecycle/state and requiring a key for `produce()`, updates call sites to pass keys, and removes `ProducerState` and related exports.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Provider as DI Provider
    participant AIOKafka as AIOKafkaProducer
    participant UnifiedProducer as UnifiedProducer
    participant Service as Event Service
    participant Kafka as Kafka
    Note over Provider,AIOKafka: Provider-managed lifecycle
    Provider->>AIOKafka: instantiate AIOKafkaProducer()
    Provider->>AIOKafka: start()
    AIOKafka-->>Provider: ready
    Provider->>UnifiedProducer: construct with AIOKafkaProducer, deps
    Provider->>Service: inject UnifiedProducer
    Note over Service,UnifiedProducer: Publishing flow
    Service->>UnifiedProducer: produce(event, key=execution_id)
    UnifiedProducer->>UnifiedProducer: inject_trace_context(), build headers
    UnifiedProducer->>AIOKafka: send(topic, value, key, headers)
    AIOKafka->>Kafka: publish message
    Kafka-->>AIOKafka: ack / error
    AIOKafka-->>UnifiedProducer: result
    UnifiedProducer->>UnifiedProducer: update ProducerMetrics
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~30 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 1
Caution: Some comments are outside the diff and can't be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
backend/app/services/k8s_worker/worker.py (1)

225-236: ⚠️ Potential issue | 🟡 Minor

Remove the unused `_publish_execution_started` method. The method is defined but never called anywhere in the codebase, making it dead code that should be removed.
🤖 Fix all issues with AI agents
In `@backend/app/events/core/producer.py`:

- Around lines 73-79: the `except` block can reference an undefined `topic` if `self._schema_registry.serialize_event(...)` raises a `KafkaError` before `topic` is assigned. To fix, either compute `topic` before the `try` that may raise (i.e. assign it prior to calling `_schema_registry.serialize_event`), or initialize `topic = None` before the `try` and update `record_kafka_production_error` and the error log to handle a `None` topic (only include the topic when it is defined). Either way, `record_kafka_production_error(topic=...)` and `self.logger.error(...)` must never access an unassigned variable; update the logic around `_schema_registry.serialize_event`, `self._event_metrics.record_kafka_production_error`, and the `logger.error` call accordingly.
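A minimal sketch of the second option (`topic = None` before the `try`). The real `produce()` body differs; helper names beyond those quoted in the review (`serialize_event`, `record_kafka_production_error`) are assumptions, and `_topic_for` in particular is hypothetical.

```python
# Sketch only: illustrates defining topic before the try so the except
# block can always reference it, even if serialization raises.
from aiokafka.errors import KafkaError


async def produce(self, event, key: str) -> None:
    topic = None  # defined up front so the except block can always read it
    try:
        serialized = await self._schema_registry.serialize_event(event)  # may raise KafkaError
        topic = self._topic_for(event)  # hypothetical topic resolution
        await self._producer.send_and_wait(topic, value=serialized, key=key.encode())
    except KafkaError as e:
        if topic is not None:
            self._event_metrics.record_kafka_production_error(topic=topic)
        self.logger.error(f"Failed to produce event {event.event_id}: {e} (topic={topic})")
        raise
```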
🧹 Nitpick comments (3)
backend/app/core/providers.py (1)
169-186: Producer lifecycle management looks good overall, but consider exception safety.

The producer configuration is solid with `acks="all"`, `enable_idempotence=True`, and gzip compression. However, if `UnifiedProducer` construction fails after `aiokafka_producer.start()` succeeds, the producer won't be stopped (the `finally` block only runs after yielding).

♻️ Suggested safer pattern

```diff
 @provide
 async def get_kafka_producer(
     self, settings: Settings, schema_registry: SchemaRegistryManager,
     logger: logging.Logger, event_metrics: EventMetrics
 ) -> AsyncIterator[UnifiedProducer]:
     aiokafka_producer = AIOKafkaProducer(
         bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
         client_id=f"{settings.SERVICE_NAME}-producer",
         acks="all",
         compression_type="gzip",
         max_batch_size=16384,
         linger_ms=10,
         enable_idempotence=True,
     )
     await aiokafka_producer.start()
     logger.info(f"Kafka producer started: {settings.KAFKA_BOOTSTRAP_SERVERS}")
     try:
-        yield UnifiedProducer(
-            aiokafka_producer, schema_registry, logger, settings, event_metrics, ProducerMetrics(),
-        )
+        producer = UnifiedProducer(
+            aiokafka_producer, schema_registry, logger, settings, event_metrics, ProducerMetrics(),
+        )
+        yield producer
     finally:
         await aiokafka_producer.stop()
         logger.info("Kafka producer stopped")
```

This ensures that if `UnifiedProducer.__init__` raises an exception, the producer is still stopped. In practice, `UnifiedProducer.__init__` is simple assignment, so this is low risk, but the pattern is safer for future modifications.

backend/app/events/core/producer.py (2)
140-145: Metrics tracking is inconsistent with the `produce()` method.

When a DLQ send fails, only `messages_failed` is incremented. In contrast, the `produce()` method also sets `last_error` and `last_error_time` on failure. Consider aligning the error tracking for better observability.

♻️ Proposed fix for consistent error tracking

```diff
 except Exception as e:
     # If we can't send to DLQ, log critically but don't crash
     self.logger.critical(
         f"Failed to send event {original_event.event_id} to DLQ: {e}. Original error: {error}",
         exc_info=True
     )
     self.metrics.messages_failed += 1
+    self.metrics.last_error = f"DLQ send failed: {e}"
+    self.metrics.last_error_time = datetime.now(timezone.utc)
```
130-133: Consider tracking `bytes_sent` for DLQ messages.

For consistent metrics tracking, consider incrementing `bytes_sent` here as is done in the `produce()` method. Alternatively, if DLQ traffic should be tracked separately, consider adding dedicated DLQ metrics.

♻️ Proposed fix to track bytes

```diff
 # Record metrics
 self._event_metrics.record_kafka_message_produced(dlq_topic)
 self.metrics.messages_sent += 1
+self.metrics.bytes_sent += len(serialized_value)
```
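For context on the fields these nitpicks touch, here is a hypothetical shape of `ProducerMetrics`, inferred purely from the attributes referenced in this review (`messages_sent`, `messages_failed`, `bytes_sent`, `last_error`, `last_error_time`); the real class may differ.

```python
# Assumed sketch of ProducerMetrics as a plain mutable counter object.
from dataclasses import dataclass
from datetime import datetime


@dataclass
class ProducerMetrics:
    messages_sent: int = 0
    messages_failed: int = 0
    bytes_sent: int = 0
    last_error: str | None = None
    last_error_time: datetime | None = None
```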
Summary by cubic
Moved Kafka producer lifecycle to DI and made the producer build tracing/correlation headers automatically. Callers now pass a required key for each publish, simplifying event publishing and ensuring consistent partitioning.
Written for commit 8ddef28. Summary will update on new commits.
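As a sketch of how a producer might build tracing/correlation headers before sending, the snippet below uses the standard OpenTelemetry propagation API; the `inject_trace_context()` step named in the sequence diagram is assumed to do something similar, and the `correlation-id` header name and function name here are hypothetical.

```python
# Sketch, not this codebase's implementation: propagate the current trace
# context into Kafka-style (str, bytes) header tuples.
from opentelemetry.propagate import inject


def build_kafka_headers(correlation_id: str) -> list[tuple[str, bytes]]:
    carrier: dict[str, str] = {}
    inject(carrier)  # writes e.g. "traceparent" from the active span context
    carrier["correlation-id"] = correlation_id  # hypothetical header name
    return [(k, v.encode("utf-8")) for k, v in carrier.items()]
```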
Summary by CodeRabbit

- Refactor
- Behavior Changes
- Tests