fix: removed most of schema encode/decode overhead#135
Conversation
📝 Walkthrough
Centralizes event deserialization on DomainEventAdapter, removes the EventStore and its APIs, shifts producer persistence into EventRepository (the producer persists, then serializes and publishes), simplifies SchemaRegistry to serializer-driven serialization, and updates DI/providers and call sites to use EventRepository instead of EventStore.
Sequence Diagram(s)
sequenceDiagram
participant Service as KafkaEventService
participant Producer as UnifiedProducer
participant Repo as EventRepository
participant SR as SchemaRegistry/Serializer
participant Kafka as KafkaBroker
participant Adapter as DomainEventAdapter
Service->>Adapter: validate_python(payload) → DomainEvent
Service->>Producer: produce(DomainEvent)
Producer->>Repo: store_event(DomainEvent)
Producer->>SR: serialize_event(DomainEvent) → bytes
Producer->>Kafka: publish(bytes, headers)
Kafka->>SR: incoming raw message
SR-->>Adapter: serializer.decode_message(raw) → payload
Adapter-->>Service: validate_python(payload) → DomainEvent
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
✨ Finishing touches
🧪 Generate unit tests (beta)
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@backend/app/events/schema/schema_registry.py`:
- Around line 31-32: The payload is being produced with
event.model_dump(mode="python") which leaves datetimes/enums as Python objects
causing Avro encoding to fail; change the call in schema_registry.py that
assigns payload (the event.model_dump invocation used before calling
self.serializer.encode_record_with_schema) to use mode="json" (i.e.,
event.model_dump(mode="json", by_alias=False, exclude_unset=False)) so datetimes
become ISO strings and enums are serialized correctly before passing to
encode_record_with_schema.
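For illustration, a minimal, self-contained Pydantic v2 snippet (DemoEvent and EventType below are hypothetical stand-ins, not types from this repo) showing why the dump mode matters for Avro-friendly payloads:
from datetime import datetime, timezone
from enum import Enum
from pydantic import BaseModel

class EventType(str, Enum):
    EXECUTION_REQUESTED = "execution_requested"

class DemoEvent(BaseModel):
    event_type: EventType
    timestamp: datetime

event = DemoEvent(event_type=EventType.EXECUTION_REQUESTED, timestamp=datetime.now(timezone.utc))
# mode="python" keeps enum members and datetime objects, which a plain Avro encoder may reject
print(event.model_dump(mode="python"))
# mode="json" yields primitive values (ISO-8601 string, enum value) that serialize cleanly
print(event.model_dump(mode="json"))
Whether the generated Avro schema expects ISO strings or epoch longs for timestamps still depends on how the schema is produced, so this is a sketch of the suggested direction rather than a drop-in fix.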
In `@backend/app/services/event_replay/replay_service.py`:
- Around line 228-231: Remove the redundant if-check around
DomainEventAdapter.validate_python(doc) and wrap the call in a try/except to
catch ValidationError so a single malformed document doesn't abort the replay;
specifically, replace the current pattern that uses
DomainEventAdapter.validate_python followed by "if event:" with a try block that
calls DomainEventAdapter.validate_python(doc), appends the returned event to
batch and increments events_processed, and an except ValidationError that logs a
warning via self.logger.warning (include the error string in extra or message)
and increments session.failed_events to track skipped documents.
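As a hedged illustration of that pattern, here is a standalone helper (not code from this PR) that validates documents with a Pydantic TypeAdapter such as DomainEventAdapter and counts failures instead of letting one bad document abort the run:
from typing import Any, Iterable
import logging

from pydantic import TypeAdapter, ValidationError

logger = logging.getLogger(__name__)

def validate_replay_docs(adapter: TypeAdapter[Any], docs: Iterable[dict[str, Any]]) -> tuple[list[Any], int]:
    """Validate raw documents, skipping and counting the ones that fail instead of aborting."""
    events: list[Any] = []
    failed = 0
    for doc in docs:
        try:
            events.append(adapter.validate_python(doc))
        except ValidationError as exc:
            logger.warning("Skipping malformed event during replay", extra={"error": str(exc)})
            failed += 1
    return events, failed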
🧹 Nitpick comments (3)
backend/app/events/broker.py (1)
19-21: Consider adding error handling for decode failures. The avro_decoder does not handle exceptions from decode_message (schema resolution failures, malformed payloads) or validate_python (unknown event types). If FastStream's broker doesn't catch these for DLQ routing, unhandled exceptions could crash consumers. If exception propagation is intentional for upstream DLQ handling, this is fine as-is.
💡 Optional: Add error handling
  async def avro_decoder(msg: StreamMessage[Any]) -> DomainEvent:
-     payload = await schema_registry.serializer.decode_message(msg.body)
-     return DomainEventAdapter.validate_python(payload)
+     try:
+         payload = await schema_registry.serializer.decode_message(msg.body)
+         return DomainEventAdapter.validate_python(payload)
+     except Exception as e:
+         logger.error("Failed to decode message", extra={"error": str(e)})
+         raise
backend/app/db/repositories/event_repository.py (1)
400-401: Missing from_attributes=True parameter for aggregation results. Line 401 validates events from doc["events"] (aggregation pipeline results) without from_attributes=True, while all other usages in this file include it. The $$ROOT in the pipeline pushes raw BSON documents which Beanie converts to dicts, so from_attributes=True may not be strictly required here. However, for consistency and to avoid potential issues if the pipeline behavior changes, consider adding the parameter.
♻️ Optional fix for consistency
- events = [DomainEventAdapter.validate_python(e) for e in doc["events"]]
+ events = [DomainEventAdapter.validate_python(e, from_attributes=True) for e in doc["events"]]
backend/app/events/event_store.py (1)
115-115: Consider consistency with the repository pattern. The EventStore uses doc.model_dump() before validation, while EventRepository uses from_attributes=True. Both approaches work correctly:
- doc.model_dump() converts to a dict explicitly
- from_attributes=True reads attributes directly from the object
The current approach is fine, but if you want consistency across the codebase, you could use from_attributes=True here as well. This is purely a style consideration.
♻️ Optional: Use from_attributes=True for consistency
- event = DomainEventAdapter.validate_python(doc.model_dump())
+ event = DomainEventAdapter.validate_python(doc, from_attributes=True)
Apply a similar change to lines 141, 158, 180, 201, 214, 238.
1 issue found across 15 files
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="backend/app/services/event_replay/replay_service.py">
<violation number="1" location="backend/app/services/event_replay/replay_service.py:228">
P1: `DomainEventAdapter.validate_python` raises a `ValidationError` when validation fails, rather than returning `None`. This exception will cause the entire replay session to fail immediately in `_run_replay`, bypassing the `session.config.skip_errors` logic that handles errors in `_process_batch`.
If the database contains any events that don't match the current schema, the replay will crash.
Wrap this in a `try...except` block to handle validation errors (respecting `skip_errors` or logging warnings), and remove the redundant `if event:` check.</violation>
</file>
2 issues found across 16 files
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="backend/app/events/schema/schema_registry.py">
<violation number="1" location="backend/app/events/schema/schema_registry.py:21">
P1: Schema registry authentication has been removed. If the registry requires auth credentials, this change will cause connection failures. Verify that authentication is no longer needed, or restore the `auth` parameter.</violation>
<violation number="2" location="backend/app/events/schema/schema_registry.py:28">
P1: The serialization logic changes may break Avro encoding. The previous code converted `timestamp` to microseconds (Avro long format) and removed `event_type` from the payload. The new `event.model_dump()` call doesn't perform these transformations, which could cause schema validation failures if the Avro schema expects timestamps as longs or doesn't include `event_type`.</violation>
</file>
self.logger.info(f"Initialized {len(_get_all_event_classes())} event schemas")
avro = schema.AvroSchema(event.avro_schema(namespace=self.namespace))
subject = f"{self.subject_prefix}{avro.name}-value"
return await self.serializer.encode_record_with_schema(subject, avro, event.model_dump())
P1: The serialization logic changes may break Avro encoding. The previous code converted timestamp to microseconds (Avro long format) and removed event_type from the payload. The new event.model_dump() call doesn't perform these transformations, which could cause schema validation failures if the Avro schema expects timestamps as longs or doesn't include event_type.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/events/schema/schema_registry.py, line 28:
<comment>The serialization logic changes may break Avro encoding. The previous code converted `timestamp` to microseconds (Avro long format) and removed `event_type` from the payload. The new `event.model_dump()` call doesn't perform these transformations, which could cause schema validation failures if the Avro schema expects timestamps as longs or doesn't include `event_type`.</comment>
<file context>
@@ -1,119 +1,28 @@
- self.logger.info(f"Initialized {len(_get_all_event_classes())} event schemas")
+ avro = schema.AvroSchema(event.avro_schema(namespace=self.namespace))
+ subject = f"{self.subject_prefix}{avro.name}-value"
+ return await self.serializer.encode_record_with_schema(subject, avro, event.model_dump())
</file context>
self._schema_id_cache[cls] = schema_id
return cls
return None
self._client = AsyncSchemaRegistryClient(url=settings.SCHEMA_REGISTRY_URL)
P1: Schema registry authentication has been removed. If the registry requires auth credentials, this change will cause connection failures. Verify that authentication is no longer needed, or restore the auth parameter.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/events/schema/schema_registry.py, line 21:
<comment>Schema registry authentication has been removed. If the registry requires auth credentials, this change will cause connection failures. Verify that authentication is no longer needed, or restore the `auth` parameter.</comment>
<file context>
@@ -1,119 +1,28 @@
- self._schema_id_cache[cls] = schema_id
- return cls
- return None
+ self._client = AsyncSchemaRegistryClient(url=settings.SCHEMA_REGISTRY_URL)
+ self.serializer = AsyncAvroMessageSerializer(self._client)
</file context>
2 issues found across 12 files (changes from recent commits).
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="backend/app/services/execution_service.py">
<violation number="1" location="backend/app/services/execution_service.py:306">
P1: The `event_types` parameter is accepted by the method but ignored in the `event_repository.get_execution_events` call. This creates a functional regression where filtering by event type is lost, and the method will return all events for the execution up to the limit, potentially including unwanted event types or exposing internal system events if the caller relied on filtering.
Since `EventRepository.get_execution_events` does not appear to support filtering by event type, either the repository needs to be updated or the filtering must be handled in-memory (though in-memory filtering combined with a database limit can cause pagination issues).</violation>
</file>
<file name="backend/app/events/core/producer.py">
<violation number="1" location="backend/app/events/core/producer.py:45">
P2: Persisting the event before serialization and topic lookup risks creating "orphaned" records in the database if those subsequent steps fail (e.g., due to schema validation errors or missing topic mappings). Consider performing validation and serialization before persistence to ensure only valid events are stored.</violation>
</file>
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/app/services/execution_service.py (1)
289-307: ⚠️ Potential issue | 🟠 Major
event_types filter is ignored. event_types is accepted but never applied, so callers can't filter results anymore. Either push the filter into EventRepository.get_execution_events or filter in-memory (noting the limit semantics).
🩹 Minimal in-memory filter (short-term)
- result = await self.event_repository.get_execution_events(execution_id=execution_id, limit=limit)
- events = result.events
+ result = await self.event_repository.get_execution_events(execution_id=execution_id, limit=limit)
+ events = result.events
+ if event_types:
+     events = [e for e in events if e.event_type in event_types]
🤖 Fix all issues with AI agents
In `@backend/app/events/core/producer.py`:
- Around line 43-46: The produce method currently stores the event via
_event_repository.store_event and then publishes to Kafka (using
EVENT_TYPE_TO_TOPIC to build topic), which can leave DB events orphaned if
publish fails; change the flow to implement an outbox/publish-status pattern:
when creating the event in produce (or via _event_repository.store_event)
persist it with a publish_status field set to "pending"/"queued", then attempt
publishing to Kafka and upon success update that stored event's publish_status
to "published" (or "failed" on non-transient errors) using the event id returned
by store_event; additionally add a retry/reconciler background worker that scans
events with publish_status "pending"/"failed" and retries publishing, or adopt a
transactional outbox write+publish flow if supported—modify produce,
_event_repository.store_event (or the returned event object), and the Kafka
publish path that uses EVENT_TYPE_TO_TOPIC/topic accordingly to support status
updates and retries.
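A heavily simplified sketch of that outbox-style flow (PublishStatus, set_publish_status, and the broker call here are illustrative assumptions layered on top of the produce/store_event/EVENT_TYPE_TO_TOPIC names from the comment above, not the project's actual API):
from enum import Enum

class PublishStatus(str, Enum):
    PENDING = "pending"
    PUBLISHED = "published"
    FAILED = "failed"

async def produce(self, event: DomainEvent) -> None:
    # 1. Persist first, flagged as pending, so nothing is silently lost if Kafka is down.
    stored_id = await self._event_repository.store_event(event, publish_status=PublishStatus.PENDING)
    try:
        payload = await self._schema_registry.serialize_event(event)
        await self._broker.publish(payload, topic=EVENT_TYPE_TO_TOPIC[event.event_type])
    except Exception:
        # 2. Mark it retryable; a background reconciler can re-publish pending/failed rows later.
        await self._event_repository.set_publish_status(stored_id, PublishStatus.FAILED)
        raise
    else:
        await self._event_repository.set_publish_status(stored_id, PublishStatus.PUBLISHED)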
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@backend/app/events/core/producer.py`:
- Around line 73-101: The DLQ status header currently uses
str(DLQMessageStatus.PENDING) which yields "DLQMessageStatus.PENDING" and is
hard to parse; change the header to use the enum's stable value (e.g.,
DLQMessageStatus.PENDING.value) instead of str(...) in the headers dict built in
the failed-event sender (the block that builds headers for
inject_trace_context), ensuring the status is a plain string value the consumer
(handlers.py) can parse consistently.
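The difference is easy to see with a plain enum (illustrative shape only; the real DLQMessageStatus may differ):
from enum import Enum

class DLQMessageStatus(Enum):
    PENDING = "pending"

print(str(DLQMessageStatus.PENDING))    # "DLQMessageStatus.PENDING" - awkward for consumers to parse
print(DLQMessageStatus.PENDING.value)   # "pending" - stable wire value for the header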
In `@backend/app/events/handlers.py`:
- Around line 293-341: The DLQ header parsing in on_dlq_message is unsafe: wrap
parsing of headers["failed_at"] and DLQMessageStatus(...) in guarded logic so a
missing or malformed header doesn't crash the consumer; use
headers.get("failed_at") and if present attempt datetime.fromisoformat inside a
try/except (fallback to datetime.utcnow() or None) and similarly attempt to
construct DLQMessageStatus from headers.get("status") inside a try/except
(fallback to DLQMessageStatus.PENDING or a safe default), and emit a warning via
the provided logger when a parse fails; then build the DLQMessage with these
safe values before calling manager.handle_message so a single bad message won’t
stall on_dlq_message.
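A self-contained sketch of that defensive parsing (the header names follow the comment above; the fallback choices and helper function are assumptions, not the project's actual handler):
from datetime import datetime, timezone
from enum import Enum
import logging

logger = logging.getLogger(__name__)

class DLQMessageStatus(Enum):
    PENDING = "pending"

def parse_dlq_headers(headers: dict[str, str]) -> tuple[datetime, DLQMessageStatus]:
    """Parse failed_at/status headers, falling back to safe defaults instead of raising."""
    failed_at = datetime.now(timezone.utc)
    raw_failed_at = headers.get("failed_at")
    if raw_failed_at:
        try:
            failed_at = datetime.fromisoformat(raw_failed_at)
        except ValueError:
            logger.warning("Malformed failed_at header: %r", raw_failed_at)
    try:
        status = DLQMessageStatus(headers.get("status", DLQMessageStatus.PENDING.value))
    except ValueError:
        logger.warning("Unknown DLQ status header: %r", headers.get("status"))
        status = DLQMessageStatus.PENDING
    return failed_at, status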
1 issue found across 3 files (changes from recent commits).
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="backend/app/dlq/manager.py">
<violation number="1" location="backend/app/dlq/manager.py:163">
P1: Switching to `schema_registry.serialize_event` introduces potential exceptions (schema validation, network errors) that were less likely with `json.dumps`. The caller `process_due_retries` (visible in file context) does not handle exceptions within its loop. A single message failing serialization will crash the retry cycle, preventing other messages from being processed (Poison Pill).
Consider adding error handling in `process_due_retries` to catch, log, and skip invalid messages.</violation>
</file>
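Illustrative sketch only (the helper names around process_due_retries are assumptions; serialize_event is the call named in the comment): catching per-message failures keeps a single bad message from stalling the whole retry cycle.
async def process_due_retries(self) -> None:
    for msg in await self._get_due_messages():  # assumed accessor for messages due for retry
        try:
            payload = await self.schema_registry.serialize_event(msg.event)
            await self._producer.send(msg.original_topic, payload)
            await self._mark_retried(msg)
        except Exception:
            # Log and move on; the message stays in the DLQ for a later attempt.
            self.logger.exception("Failed to retry DLQ message %s, skipping", msg.event_id)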
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/app/db/repositories/event_repository.py (1)
64-76: ⚠️ Potential issue | 🟠 Major
Batch insert should preserve idempotency on duplicates. insert_many() will fail the entire batch on a duplicate key event, breaking the idempotent behavior you have in store_event(). Since event_id is unique, this causes operational issues when the same events are processed twice. Resolve by using unordered inserts with duplicate-key error handling to match the single-insert behavior:
Suggested fix
-from pymongo.errors import DuplicateKeyError
+from pymongo.errors import BulkWriteError, DuplicateKeyError
 ...
- await EventDocument.insert_many(docs)
+ try:
+     await EventDocument.insert_many(docs, ordered=False)
+ except BulkWriteError as e:
+     dup_errors = [err for err in e.details.get("writeErrors", []) if err.get("code") == 11000]
+     if dup_errors:
+         self.logger.debug("Some events already stored, skipping duplicates")
+     else:
+         raise
3 issues found across 3 files (changes from recent commits).
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="backend/app/services/k8s_worker/worker.py">
<violation number="1" location="backend/app/services/k8s_worker/worker.py:299">
P1: The `replace_namespaced_daemon_set` call uses PUT semantics and requires `metadata.resourceVersion` to be set in the body for optimistic locking. Without it, the API server will reject the request with a `409 Conflict`.
The code reads the DaemonSet but discards the result, failing to capture the `resourceVersion`.
**Fix**: Capture the existing DaemonSet and inject its `resourceVersion` into the manifest, or use `patch_namespaced_daemon_set` for easier idempotency.</violation>
<violation number="2" location="backend/app/services/k8s_worker/worker.py:313">
P1: The readiness check is prone to a race condition. It may mistakenly consider the DaemonSet "ready" based on the status of the *previous* generation before the controller has processed the update.
**Issues**:
1. **Race Condition**: `observed_generation` is not checked against `metadata.generation`.
2. **Unhandled Events**: The loop processes all event types (including `ERROR`, `DELETED`), which can cause `AttributeError` crashes when accessing `ds.status`.
3. **Infinite Blocking**: There is no timeout. If the cluster cannot fulfill the request (e.g., resource quotas, image pull errors), the application will hang indefinitely during startup.
**Fix**: Verify `observed_generation`, handle event types, and add a timeout or strict error handling.</violation>
</file>
<file name=".github/actions/e2e-ready/action.yml">
<violation number="1" location=".github/actions/e2e-ready/action.yml:76">
P2: Add an explicit timeout to the rollout status check so the workflow doesn’t hang indefinitely if the daemonset is stuck.</violation>
</file>
await self.apps_v1.replace_namespaced_daemon_set(
    name=daemonset_name, namespace=namespace, body=manifest  # type: ignore[arg-type]
try:
    await self.apps_v1.read_namespaced_daemon_set(name=daemonset_name, namespace=namespace)
P1: The replace_namespaced_daemon_set call uses PUT semantics and requires metadata.resourceVersion to be set in the body for optimistic locking. Without it, the API server will reject the request with a 409 Conflict.
The code reads the DaemonSet but discards the result, failing to capture the resourceVersion.
Fix: Capture the existing DaemonSet and inject its resourceVersion into the manifest, or use patch_namespaced_daemon_set for easier idempotency.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/services/k8s_worker/worker.py, line 299:
<comment>The `replace_namespaced_daemon_set` call uses PUT semantics and requires `metadata.resourceVersion` to be set in the body for optimistic locking. Without it, the API server will reject the request with a `409 Conflict`.
The code reads the DaemonSet but discards the result, failing to capture the `resourceVersion`.
**Fix**: Capture the existing DaemonSet and inject its `resourceVersion` into the manifest, or use `patch_namespaced_daemon_set` for easier idempotency.</comment>
<file context>
@@ -251,63 +252,75 @@ async def wait_for_active_creations(self, timeout: float = 30.0) -> None:
- await self.apps_v1.replace_namespaced_daemon_set(
- name=daemonset_name, namespace=namespace, body=manifest # type: ignore[arg-type]
+ try:
+ await self.apps_v1.read_namespaced_daemon_set(name=daemonset_name, namespace=namespace)
+ self.logger.info(f"DaemonSet '{daemonset_name}' exists. Replacing to ensure it is up-to-date.")
+ await self.apps_v1.replace_namespaced_daemon_set(
</file context>
.github/actions/e2e-ready/action.yml
Outdated
- name: Wait for runtime images
  shell: bash
  run: kubectl -n integr8scode rollout status daemonset/runtime-image-pre-puller
P2: Add an explicit timeout to the rollout status check so the workflow doesn’t hang indefinitely if the daemonset is stuck.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At .github/actions/e2e-ready/action.yml, line 76:
<comment>Add an explicit timeout to the rollout status check so the workflow doesn’t hang indefinitely if the daemonset is stuck.</comment>
<file context>
@@ -71,6 +71,10 @@ runs:
+ - name: Wait for runtime images
+ shell: bash
+ run: kubectl -n integr8scode rollout status daemonset/runtime-image-pre-puller
+
- name: Seed test users
</file context>
- run: kubectl -n integr8scode rollout status daemonset/runtime-image-pre-puller
+ run: kubectl -n integr8scode rollout status daemonset/runtime-image-pre-puller --timeout=120s
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In @.github/actions/e2e-ready/action.yml:
- Around line 74-76: The "Wait for runtime images" step uses `kubectl -n
integr8scode rollout status daemonset/runtime-image-pre-puller` which can hang
indefinitely; add an explicit rollout timeout flag (e.g., `--timeout=5m`) to
bound the wait. Update the run line for the "Wait for runtime images" step to
call `kubectl -n integr8scode rollout status daemonset/runtime-image-pre-puller
--timeout=5m` so the CI job fails fast instead of hanging if the DaemonSet never
becomes ready.
In `@backend/app/services/k8s_worker/worker.py`:
- Around line 313-325: The watch loop using k8s_watch.Watch() (variable w) can
block forever and may leak if an exception/cancellation occurs; wrap the stream
iteration over self.apps_v1.list_namespaced_daemon_set (filtered by
daemonset_name) with a client-side timeout (e.g., asyncio.wait_for) or an
overall timeout mechanism and ensure w.close() is always called in a finally
block; inside the try block iterate the async for events and check desired/ready
as before, and in except/asyncio.CancelledError re-raise after cleanup and log
via self.logger to record timeout/errors.
- Around line 298-304: When updating an existing DaemonSet in the try/except
block, extract the existing object's metadata.resourceVersion from the result of
self.apps_v1.read_namespaced_daemon_set(...) and set that value into the
manifest's metadata.resourceVersion before calling
replace_namespaced_daemon_set; specifically, after reading the DaemonSet, pull
existing.metadata.resource_version (or existing.metadata.resourceVersion
depending on the client object) and assign it to
manifest['metadata']['resourceVersion'] (or manifest.metadata.resourceVersion)
so the replace_namespaced_daemon_set call includes the required resourceVersion.
.github/actions/e2e-ready/action.yml
Outdated
- name: Wait for runtime images
  shell: bash
  run: kubectl -n integr8scode rollout status daemonset/runtime-image-pre-puller
🧩 Analysis chain
🌐 Web query:
kubectl rollout status timeout flag documentation
💡 Result:
kubectl rollout status supports --timeout (type: duration) to limit how long it watches the rollout before exiting:
- Flag: --timeout duration
- Meaning: “The length of time to wait before ending watch; 0 means never.”
- Format: include a time unit, e.g. 1s, 2m, 3h. [1]
- Example: kubectl rollout status deployment/nginx --timeout=2m [1]
Authoritative docs: Kubernetes kubectl rollout status reference page. [1]
Sources
[1] Kubernetes docs — “kubectl rollout status” https://kubernetes.io/docs/reference/kubectl/generated/kubectl_rollout/kubectl_rollout_status/
🏁 Script executed:
# Check for other kubectl rollout status calls in the repository
rg "kubectl.*rollout\s+status" --type yaml --type shRepository: HardMax71/Integr8sCode
Length of output: 48
🏁 Script executed:
# Check the action file context for related rollout or wait steps
cat -n .github/actions/e2e-ready/action.yml
Repository: HardMax71/Integr8sCode
Length of output: 3478
Add a rollout timeout to prevent CI hangs.
kubectl rollout status waits indefinitely by default (timeout=0). Without a bounded timeout, this step can hang the entire workflow if the DaemonSet fails to become ready. The workflow already uses explicit timeouts for other readiness checks (lines 23, 66, 70)—apply the same pattern here.
🛠️ Suggested fix
- run: kubectl -n integr8scode rollout status daemonset/runtime-image-pre-puller
+ run: kubectl -n integr8scode rollout status daemonset/runtime-image-pre-puller --timeout=120s
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
  - name: Wait for runtime images
    shell: bash
-   run: kubectl -n integr8scode rollout status daemonset/runtime-image-pre-puller
+   run: kubectl -n integr8scode rollout status daemonset/runtime-image-pre-puller --timeout=120s
🤖 Prompt for AI Agents
In @.github/actions/e2e-ready/action.yml around lines 74 - 76, The "Wait for
runtime images" step uses `kubectl -n integr8scode rollout status
daemonset/runtime-image-pre-puller` which can hang indefinitely; add an explicit
rollout timeout flag (e.g., `--timeout=5m`) to bound the wait. Update the run
line for the "Wait for runtime images" step to call `kubectl -n integr8scode
rollout status daemonset/runtime-image-pre-puller --timeout=5m` so the CI job
fails fast instead of hanging if the DaemonSet never becomes ready.
try:
    await self.apps_v1.read_namespaced_daemon_set(name=daemonset_name, namespace=namespace)
    self.logger.info(f"DaemonSet '{daemonset_name}' exists. Replacing to ensure it is up-to-date.")
    await self.apps_v1.replace_namespaced_daemon_set(
        name=daemonset_name, namespace=namespace, body=manifest  # type: ignore[arg-type]
    )
except ApiException as e:
🧩 Analysis chain
🌐 Web query:
Kubernetes Python client replace_namespaced_daemon_set resourceVersion requirement
💡 Result:
replace_namespaced_daemon_set(...) in the Kubernetes Python client performs a full-object PUT (replace) to:
PUT /apis/apps/v1/namespaces/{namespace}/daemonsets/{name}. [1]
For Kubernetes update/replace operations, the object you send is expected to include the current metadata.resourceVersion (optimistic concurrency control). If you omit it (or send a stale one), the apiserver rejects the request (commonly 422 Unprocessable Entity: metadata.resourceVersion ... must be specified for an update or 409 Conflict when it changed). [2][3]
What to do (typical pattern):
- GET the DaemonSet.
- Modify fields you want (usually under .spec).
- Call replace with the same object, preserving metadata.resourceVersion.
from kubernetes import client
api = client.AppsV1Api()
ds = api.read_namespaced_daemon_set(name, namespace) # has metadata.resource_version
ds.spec.template.spec.containers[0].image = "repo/img:tag"
api.replace_namespaced_daemon_set(name, namespace, ds)  # sends resourceVersion back
If you don’t want to manage resourceVersion / full-object replacement, use PATCH instead (patch_namespaced_daemon_set), which updates only specified fields and is commonly used for small changes. [3]
Sources: Kubernetes DaemonSet PUT/replace endpoint [1], Kubernetes API resourceVersion / optimistic concurrency behavior [2][3].
References:
[1] (dev-k8sref-io.web.app)
[2] (kubernetes.io)
[3] (stackoverflow.com)
Citations:
- 1: https://dev-k8sref-io.web.app/docs/workloads/daemonset-v1/?utm_source=openai
- 2: https://kubernetes.io/docs/reference/using-api/api-concepts/?utm_source=openai
- 3: https://stackoverflow.com/questions/47100389/what-is-the-difference-between-a-resourceversion-and-a-generation?utm_source=openai
🏁 Script executed:
cd backend && find . -name "worker.py" -type f | head -5
Repository: HardMax71/Integr8sCode
Length of output: 101
🏁 Script executed:
sed -n '290,310p' backend/app/services/k8s_worker/worker.py | cat -n
Repository: HardMax71/Integr8sCode
Length of output: 1235
🏁 Script executed:
sed -n '260,310p' backend/app/services/k8s_worker/worker.py | cat -n
Repository: HardMax71/Integr8sCode
Length of output: 2779
Add resourceVersion from the existing DaemonSet to the manifest before replacing.
The Kubernetes API requires metadata.resourceVersion in replace operations. Without it, replace_namespaced_daemon_set will fail with a 409/422 error when the DaemonSet already exists. Currently, the code reads the existing DaemonSet but discards it instead of extracting its resourceVersion.
Suggested fix
try:
- await self.apps_v1.read_namespaced_daemon_set(name=daemonset_name, namespace=namespace)
+ existing = await self.apps_v1.read_namespaced_daemon_set(name=daemonset_name, namespace=namespace)
+ manifest["metadata"]["resourceVersion"] = existing.metadata.resource_version
self.logger.info(f"DaemonSet '{daemonset_name}' exists. Replacing to ensure it is up-to-date.")
await self.apps_v1.replace_namespaced_daemon_set(
name=daemonset_name, namespace=namespace, body=manifest # type: ignore[arg-type]
)
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
  try:
-     await self.apps_v1.read_namespaced_daemon_set(name=daemonset_name, namespace=namespace)
+     existing = await self.apps_v1.read_namespaced_daemon_set(name=daemonset_name, namespace=namespace)
+     manifest["metadata"]["resourceVersion"] = existing.metadata.resource_version
      self.logger.info(f"DaemonSet '{daemonset_name}' exists. Replacing to ensure it is up-to-date.")
      await self.apps_v1.replace_namespaced_daemon_set(
          name=daemonset_name, namespace=namespace, body=manifest  # type: ignore[arg-type]
      )
  except ApiException as e:
🤖 Prompt for AI Agents
In `@backend/app/services/k8s_worker/worker.py` around lines 298 - 304, When
updating an existing DaemonSet in the try/except block, extract the existing
object's metadata.resourceVersion from the result of
self.apps_v1.read_namespaced_daemon_set(...) and set that value into the
manifest's metadata.resourceVersion before calling
replace_namespaced_daemon_set; specifically, after reading the DaemonSet, pull
existing.metadata.resource_version (or existing.metadata.resourceVersion
depending on the client object) and assign it to
manifest['metadata']['resourceVersion'] (or manifest.metadata.resourceVersion)
so the replace_namespaced_daemon_set call includes the required resourceVersion.
# Block on a watch stream until every node has pulled all images
w = k8s_watch.Watch()
async for event in w.stream(
    self.apps_v1.list_namespaced_daemon_set,
    namespace=namespace,
    field_selector=f"metadata.name={daemonset_name}",
):
    ds = event["object"]
    desired = ds.status.desired_number_scheduled or 0
    ready = ds.status.number_ready or 0
    self.logger.info(f"DaemonSet '{daemonset_name}': {ready}/{desired} pods ready")
    if desired > 0 and ready >= desired:
        await w.close()
🧩 Analysis chain
🌐 Web query:
kubernetes_asyncio Watch stream timeout best practices
💡 Result:
What to do about watch “stream timeouts” in kubernetes_asyncio (best practices)
-
Expect watch connections to end and design for restart
- In
kubernetes_asyncio, you typically do:async for e in watch.stream(...): ...and the server-side watch can end normally (including due to timeout). Treat that as a routine reconnect path, not an error path. [1]
- In
-
Use
timeout_secondsdeliberately (server-side watch duration cap)timeout_secondsis sent to the API server and limits the duration of the watch call regardless of activity. Set it so your client periodically re-establishes the watch (helps with load balancers / proxies and keeps recovery logic exercised). [2]
-
Also set
_request_timeout(client-side HTTP timeout)_request_timeoutlimits how long the underlying HTTP request is allowed to take (connect + read). Use it to avoid hanging forever if the network path breaks. Ensure_request_timeoutis slightly larger thantimeout_seconds(plus some slack). [2]
-
Track and resume using
resourceVersion; request bookmarks to advance RV even when idle- Kubernetes’ recommended pattern is: LIST → get a
resourceVersion→ WATCH starting from thatresourceVersion. [3] - Ask for periodic
BOOKMARKevents usingallow_watch_bookmarks=True; bookmarks carry a newer.metadata.resourceVersioneven if nothing changes, so you can persist progress during quiet periods. (Don’t assume bookmarks arrive on a fixed schedule.) [2][3]
- Kubernetes’ recommended pattern is: LIST → get a
-
Handle
410 Gone(“too old resourceVersion”) by re-LISTing- API servers may return HTTP 410 when the requested
resourceVersionis older than what they retain; clients must tolerate this. The usual recovery is: do a fresh LIST (new snapshot + RV), then restart WATCH from that RV. [3]
- API servers may return HTTP 410 when the requested
-
Prefer modern “initial sync then watch” options when available
- Some endpoints support
send_initial_events=truewithwatch=true, which starts with synthetic initial events and then a bookmark that marks the end of the initial sync; when using this, Kubernetes requires settingresource_version_matchappropriately (notablyNotOlderThan). This can simplify “LIST then WATCH” into one stream for some use cases. [2]
- Some endpoints support
Minimal sketch (pattern-focused):
from kubernetes_asyncio import client, watch
from kubernetes_asyncio.client.exceptions import ApiException
w = watch.Watch()
rv = None
while True:
try:
if rv is None:
lst = await client.CoreV1Api().list_namespaced_pod(
"default",
_request_timeout=(5, 30),
)
rv = lst.metadata.resource_version
async for ev in w.stream(
client.CoreV1Api().list_namespaced_pod,
"default",
resource_version=rv,
allow_watch_bookmarks=True,
timeout_seconds=240,
_request_timeout=(5, 260),
):
obj = ev["object"]
rv = obj.metadata.resource_version # also update on BOOKMARK
except ApiException as e:
if e.status == 410:
rv = None # force re-LIST
continue
raise
Key idea: timeouts are normal; pick timeout_seconds + _request_timeout to force clean recycling, keep RV moving (bookmarks), and always recover from 410 by re-LIST + re-WATCH. [2][3]
🏁 Script executed:
# Let's examine the actual file to see the full context
find . -name "worker.py" -type f | grep k8s_worker
Repository: HardMax71/Integr8sCode
Length of output: 109
🏁 Script executed:
# Search for imports and usage patterns in the file
rg -A 3 -B 3 "k8s_watch.Watch" --type py
Repository: HardMax71/Integr8sCode
Length of output: 1128
🏁 Script executed:
# Let's look at the full context of the file around lines 313-325
sed -n '280,330p' backend/app/services/k8s_worker/worker.py | cat -n
Repository: HardMax71/Integr8sCode
Length of output: 2212
🏁 Script executed:
# Check the imports at the top of the file
head -50 backend/app/services/k8s_worker/worker.py
Repository: HardMax71/Integr8sCode
Length of output: 1535
🏁 Script executed:
# Search for other watch usage patterns in the codebase
rg -B 5 -A 10 "async for.*\.stream\(" --type py
Repository: HardMax71/Integr8sCode
Length of output: 2765
Add timeout and ensure watch stream always closes on errors.
The current watch blocks indefinitely if readiness never reaches the desired count, and may leak the connection if the stream raises an exception or is cancelled. A client-side timeout with asyncio.wait_for() and a finally block ensures proper cleanup in all paths.
Note: The suggested fix uses a nested async function which adds unnecessary complexity—the timeout and error handling could be applied directly to the stream loop with try/finally instead.
🛠️ Suggested fix
# Block on a watch stream until every node has pulled all images
- w = k8s_watch.Watch()
- async for event in w.stream(
- self.apps_v1.list_namespaced_daemon_set,
- namespace=namespace,
- field_selector=f"metadata.name={daemonset_name}",
- ):
- ds = event["object"]
- desired = ds.status.desired_number_scheduled or 0
- ready = ds.status.number_ready or 0
- self.logger.info(f"DaemonSet '{daemonset_name}': {ready}/{desired} pods ready")
- if desired > 0 and ready >= desired:
- await w.close()
- return
+ timeout_seconds = 600
+ w = k8s_watch.Watch()
+ try:
+ async def _wait_ready() -> None:
+ async for event in w.stream(
+ self.apps_v1.list_namespaced_daemon_set,
+ namespace=namespace,
+ field_selector=f"metadata.name={daemonset_name}",
+ ):
+ ds = event["object"]
+ desired = ds.status.desired_number_scheduled or 0
+ ready = ds.status.number_ready or 0
+ self.logger.info(f"DaemonSet '{daemonset_name}': {ready}/{desired} pods ready")
+ if desired > 0 and ready >= desired:
+ return
+ await asyncio.wait_for(_wait_ready(), timeout=timeout_seconds)
+ except asyncio.TimeoutError:
+ self.logger.error(f"Timed out waiting for DaemonSet '{daemonset_name}' to become ready")
+ raise
+ finally:
+ await w.close()
🤖 Prompt for AI Agents
In `@backend/app/services/k8s_worker/worker.py` around lines 313 - 325, The watch
loop using k8s_watch.Watch() (variable w) can block forever and may leak if an
exception/cancellation occurs; wrap the stream iteration over
self.apps_v1.list_namespaced_daemon_set (filtered by daemonset_name) with a
client-side timeout (e.g., asyncio.wait_for) or an overall timeout mechanism and
ensure w.close() is always called in a finally block; inside the try block
iterate the async for events and check desired/ready as before, and in
except/asyncio.CancelledError re-raise after cleanup and log via self.logger to
record timeout/errors.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In @.github/actions/e2e-ready/action.yml:
- Around line 35-37: The "Pre-pull test runtime image into K3s" step currently
runs `sudo k3s ctr images pull docker.io/library/python:3.11-slim` with no
timeout; update that step to enforce a pull timeout (e.g., prefix the command
with the timeout utility like `timeout 5m`) so the step fails fast instead of
hanging, ensuring consistency with other steps that use explicit timeouts.
In `@backend/app/services/k8s_worker/worker.py`:
- Around line 268-270: The container_name generation can exceed Kubernetes'
63-character DNS-1123 label limit and you also have a redundant list() call;
update the loop that iterates sorted(list(all_images)) to use
sorted(all_images), normalize sanitized_image_ref to DNS-1123 rules (lowercase,
alphanumeric and hyphens, no leading/trailing hyphen), and truncate or replace
sanitized_image_ref so f"pull-{i}-{sanitized_image_ref}" is at most 63 chars —
preserve uniqueness (e.g., append or replace with a short hash/suffix of the
original) when truncating; ensure the final container_name variable meets
DNS-1123 constraints before use.
- name: Pre-pull test runtime image into K3s
  shell: bash
  run: sudo k3s ctr images pull docker.io/library/python:3.11-slim
Add a timeout to prevent CI hangs on image pull.
The k3s ctr images pull command has no timeout. If the registry is slow or unreachable, this step can hang indefinitely. Other steps in this workflow use explicit timeouts (lines 23, 70, 74)—apply the same pattern here for consistency.
🛠️ Proposed fix
- name: Pre-pull test runtime image into K3s
shell: bash
- run: sudo k3s ctr images pull docker.io/library/python:3.11-slim
+ run: timeout 120 sudo k3s ctr images pull docker.io/library/python:3.11-slim
🤖 Prompt for AI Agents
In @.github/actions/e2e-ready/action.yml around lines 35 - 37, The "Pre-pull
test runtime image into K3s" step currently runs `sudo k3s ctr images pull
docker.io/library/python:3.11-slim` with no timeout; update that step to enforce
a pull timeout (e.g., prefix the command with the timeout utility like `timeout
5m`) so the step fails fast instead of hanging, ensuring consistency with other
steps that use explicit timeouts.
for i, image_ref in enumerate(sorted(list(all_images))):
    sanitized_image_ref = image_ref.split("/")[-1].replace(":", "-").replace(".", "-").replace("_", "-")
    container_name = f"pull-{i}-{sanitized_image_ref}"
Container names may exceed Kubernetes 63-character limit.
Kubernetes container names must be valid DNS-1123 labels (max 63 characters). With the pattern pull-{i}-{sanitized_image_ref}, long image names could exceed this limit and cause API errors.
🛠️ Suggested fix: truncate container name
for i, image_ref in enumerate(sorted(all_images)):
sanitized_image_ref = image_ref.split("/")[-1].replace(":", "-").replace(".", "-").replace("_", "-")
- container_name = f"pull-{i}-{sanitized_image_ref}"
+ container_name = f"pull-{i}-{sanitized_image_ref}"[:63]
    init_containers.append(
Also, list() in sorted(list(all_images)) is redundant since sorted() already returns a list.
🤖 Prompt for AI Agents
In `@backend/app/services/k8s_worker/worker.py` around lines 268 - 270, The
container_name generation can exceed Kubernetes' 63-character DNS-1123 label
limit and you also have a redundant list() call; update the loop that iterates
sorted(list(all_images)) to use sorted(all_images), normalize
sanitized_image_ref to DNS-1123 rules (lowercase, alphanumeric and hyphens, no
leading/trailing hyphen), and truncate or replace sanitized_image_ref so
f"pull-{i}-{sanitized_image_ref}" is at most 63 chars — preserve uniqueness
(e.g., append or replace with a short hash/suffix of the original) when
truncating; ensure the final container_name variable meets DNS-1123 constraints
before use.
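One way to satisfy both constraints, as a standalone hypothetical helper (not the project's code): keep a readable prefix, enforce DNS-1123 rules, and preserve uniqueness with a short digest of the full image reference so truncation cannot cause collisions.
import hashlib
import re

def dns1123_container_name(index: int, image_ref: str, max_len: int = 63) -> str:
    """Build a DNS-1123-safe init container name like 'pull-0-python-3-11-slim-<hash>'."""
    tail = re.sub(r"[^a-z0-9-]", "-", image_ref.split("/")[-1].lower())
    digest = hashlib.sha256(image_ref.encode()).hexdigest()[:8]  # uniqueness survives truncation
    prefix = f"pull-{index}-"
    budget = max(1, max_len - len(prefix) - len(digest) - 1)
    return f"{prefix}{tail[:budget]}-{digest}".strip("-")

# e.g. dns1123_container_name(0, "docker.io/library/python:3.11-slim")
# -> "pull-0-python-3-11-slim-" followed by 8 hex chars, always within 63 characters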
Summary by cubic
Removes most schema encode/decode overhead by decoding with the Confluent serializer and validating with a single DomainEventAdapter. Drops EventStore and eager schema registration; producer now persists events idempotently, services/repositories validate directly from payload/DB docs, and DLQ now uses Avro events with metadata in headers.
Performance
Migration
Written for commit 6800210. Summary will update on new commits.
Summary by CodeRabbit
Refactor
New Features
Bug Fixes
Tests
Chores