4 changes: 4 additions & 0 deletions .github/actions/e2e-ready/action.yml
@@ -32,6 +32,10 @@ runs:
cp backend/config.test.toml backend/config.toml
cp backend/secrets.example.toml backend/secrets.toml

- name: Pre-pull test runtime image into K3s
shell: bash
run: sudo k3s crictl pull docker.io/library/python:3.11-slim

- name: Wait for image pull and infra
shell: bash
run: |
9 changes: 4 additions & 5 deletions backend/app/api/routes/admin/events.py
@@ -8,7 +8,6 @@

from app.api.dependencies import admin_user
from app.core.correlation import CorrelationContext
from app.domain.enums.events import EventType
from app.domain.events.event_models import EventFilter
from app.domain.replay import ReplayFilter
from app.schemas_pydantic.admin_events import (
@@ -69,14 +68,14 @@ async def get_event_stats(
@router.get("/export/csv")
async def export_events_csv(
service: FromDishka[AdminEventsService],
event_types: list[EventType] | None = Query(None, description="Event types (repeat param for multiple)"),
topics: list[str] | None = Query(None, description="Event topics (repeat param for multiple)"),
start_time: datetime | None = Query(None, description="Start time"),
end_time: datetime | None = Query(None, description="End time"),
limit: int = Query(default=10000, le=50000),
) -> StreamingResponse:
try:
export_filter = EventFilter(
event_types=event_types,
topics=topics,
start_time=start_time,
end_time=end_time,
)
@@ -94,7 +93,7 @@ async def export_events_csv(
@router.get("/export/json")
async def export_events_json(
service: FromDishka[AdminEventsService],
event_types: list[EventType] | None = Query(None, description="Event types (repeat param for multiple)"),
topics: list[str] | None = Query(None, description="Event topics (repeat param for multiple)"),
aggregate_id: str | None = Query(None, description="Aggregate ID filter"),
correlation_id: str | None = Query(None, description="Correlation ID filter"),
user_id: str | None = Query(None, description="User ID filter"),
@@ -106,7 +105,7 @@ async def export_events_json(
"""Export events as JSON with comprehensive filtering."""
try:
export_filter = EventFilter(
event_types=event_types,
topics=topics,
aggregate_id=aggregate_id,
correlation_id=correlation_id,
user_id=user_id,
4 changes: 1 addition & 3 deletions backend/app/api/routes/dlq.py
@@ -7,7 +7,6 @@
from app.dlq import RetryPolicy
from app.dlq.manager import DLQManager
from app.dlq.models import DLQMessageStatus
from app.domain.enums.events import EventType
from app.schemas_pydantic.dlq import (
DLQBatchRetryResponse,
DLQMessageDetail,
@@ -36,12 +35,11 @@ async def get_dlq_messages(
repository: FromDishka[DLQRepository],
status: DLQMessageStatus | None = Query(None),
topic: str | None = None,
event_type: EventType | None = Query(None),
limit: int = Query(50, ge=1, le=1000),
offset: int = Query(0, ge=0),
) -> DLQMessagesResponse:
result = await repository.get_messages(
status=status, topic=topic, event_type=event_type, limit=limit, offset=offset
status=status, topic=topic, limit=limit, offset=offset
)

# Convert domain messages to response models using model_validate
59 changes: 23 additions & 36 deletions backend/app/api/routes/events.py
@@ -11,10 +11,10 @@
from app.core.correlation import CorrelationContext
from app.core.utils import get_client_ip
from app.domain.enums.common import SortOrder
from app.domain.enums.events import EventType
from app.domain.enums.user import UserRole
from app.domain.events.event_models import EventFilter
from app.domain.events.typed import BaseEvent, DomainEvent, EventMetadata
from app.domain.events.typed import BaseEvent, EventMetadata
from app.events.core import EventPublisher
from app.schemas_pydantic.events import (
DeleteEventResponse,
EventAggregationRequest,
@@ -28,7 +28,6 @@
from app.schemas_pydantic.user import UserResponse
from app.services.event_service import EventService
from app.services.execution_service import ExecutionService
from app.services.kafka_event_service import KafkaEventService
from app.settings import Settings

router = APIRouter(prefix="/events", tags=["events"], route_class=DishkaRoute)
@@ -74,7 +73,7 @@ async def get_execution_events(
async def get_user_events(
current_user: Annotated[UserResponse, Depends(current_user)],
event_service: FromDishka[EventService],
event_types: list[EventType] | None = Query(None),
topics: list[str] | None = Query(None),
start_time: datetime | None = Query(None),
end_time: datetime | None = Query(None),
limit: int = Query(100, ge=1, le=1000),
@@ -84,7 +83,7 @@ async def get_user_events(
"""Get events for the current user"""
result = await event_service.get_user_events_paginated(
user_id=current_user.user_id,
event_types=event_types,
topics=topics,
start_time=start_time,
end_time=end_time,
limit=limit,
@@ -108,7 +107,7 @@ async def query_events(
event_service: FromDishka[EventService],
) -> EventListResponse:
event_filter = EventFilter(
event_types=filter_request.event_types,
topics=filter_request.topics,
aggregate_id=filter_request.aggregate_id,
correlation_id=filter_request.correlation_id,
user_id=filter_request.user_id,
@@ -218,10 +217,10 @@ async def get_event_statistics(
return EventStatistics.model_validate(stats)


@router.get("/{event_id}", response_model=DomainEvent)
@router.get("/{event_id}", response_model=BaseEvent)
async def get_event(
event_id: str, current_user: Annotated[UserResponse, Depends(current_user)], event_service: FromDishka[EventService]
) -> DomainEvent:
) -> BaseEvent:
"""Get a specific event by ID"""
event = await event_service.get_event(event_id=event_id, user_id=current_user.user_id, user_role=current_user.role)
if event is None:
@@ -234,28 +233,30 @@ async def publish_custom_event(
admin: Annotated[UserResponse, Depends(admin_user)],
event_request: PublishEventRequest,
request: Request,
event_service: FromDishka[KafkaEventService],
producer: FromDishka[EventPublisher],
settings: FromDishka[Settings],
) -> PublishEventResponse:
"""Publish a custom event (admin only). Creates a BaseEvent with the provided payload."""
base_meta = EventMetadata(
service_name=settings.SERVICE_NAME,
service_version=settings.SERVICE_VERSION,
user_id=admin.user_id,
ip_address=get_client_ip(request),
user_agent=request.headers.get("user-agent"),
correlation_id=event_request.correlation_id or CorrelationContext.get_correlation_id(),
)
# Merge any additional metadata provided in request (extra allowed)
if event_request.metadata:
base_meta = base_meta.model_copy(update=event_request.metadata)

event_id = await event_service.publish_event(
event_type=event_request.event_type,
payload=event_request.payload,
# Create a BaseEvent with the custom payload in metadata
event = BaseEvent(
aggregate_id=event_request.aggregate_id,
correlation_id=event_request.correlation_id,
metadata=base_meta,
)

event_id = await producer.publish(event=event, key=event_request.aggregate_id)
Comment on lines +239 to +258

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Find and inspect PublishEventRequest class definition
rg -n -A15 'class PublishEventRequest' -S

Repository: HardMax71/Integr8sCode

Length of output: 1452


🏁 Script executed:

#!/bin/bash
# Find and inspect BaseEvent class definition
rg -n -A15 'class BaseEvent' -S

Repository: HardMax71/Integr8sCode

Length of output: 2592


🏁 Script executed:

#!/bin/bash
# Check the BaseEvent model configuration for extra fields handling
rg -n -B5 -A20 'class BaseEvent' backend/app/domain/events/typed.py -S

Repository: HardMax71/Integr8sCode

Length of output: 1029


🏁 Script executed:

#!/bin/bash
# Read the exact function code around lines 239-258 in events.py
sed -n '230,260p' backend/app/api/routes/events.py

Repository: HardMax71/Integr8sCode

Length of output: 1370


🏁 Script executed:

#!/bin/bash
# Check if BaseEvent has any extra="allow" or similar configuration
rg -n 'extra' backend/app/domain/events/typed.py -A2 -B2

Repository: HardMax71/Integr8sCode

Length of output: 374


Custom event payload is dropped and will not be published.

PublishEventRequest includes a required payload field, but it is never passed to the BaseEvent constructor. Since BaseEvent doesn't have extra="allow" configured and lacks a payload field, the custom payload data will be silently discarded due to Pydantic's default behavior of ignoring unknown fields.

💡 Proposed fix (preserve payload explicitly)
-    # Create a BaseEvent with the custom payload in metadata
-    event = BaseEvent(
-        aggregate_id=event_request.aggregate_id,
-        metadata=base_meta,
-    )
+    payload = event_request.payload or {}
+    event = BaseEvent.model_validate(
+        {**payload, "aggregate_id": event_request.aggregate_id, "metadata": base_meta}
+    )

Alternatively, enable extra field passthrough on BaseEvent with extra="allow" (as done in UserSettingsUpdatedEvent).
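For illustration, a minimal sketch of that extra="allow" alternative, assuming BaseEvent is a plain Pydantic v2 model (only fields visible in this diff are shown; everything else is omitted):

from pydantic import BaseModel, ConfigDict

class BaseEvent(BaseModel):
    # extra="allow" keeps unknown keys from validated input on the instance
    # instead of silently ignoring them (Pydantic's default), so payload
    # fields passed through model_validate survive to serialization.
    model_config = ConfigDict(extra="allow")

    event_id: str
    aggregate_id: str | None = None

With this config, the BaseEvent.model_validate({**payload, ...}) call from the proposed fix above retains the payload keys as extra attributes, so they reach producer.publish.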

🤖 Prompt for AI Agents
In `@backend/app/api/routes/events.py` around lines 239-258, the
PublishEventRequest.payload is never attached to the BaseEvent, so the custom
payload is dropped when creating event = BaseEvent(...); update the creation to
include the payload (from event_request.payload) into the BaseEvent metadata or
a dedicated payload field so the data is preserved before calling
producer.publish; alternatively change BaseEvent to allow extra fields
(extra="allow") like UserSettingsUpdatedEvent if you prefer passthrough, but the
quick fix is to pass event_request.payload into the BaseEvent constructor (or
into EventMetadata) so producer.publish receives the payload.


return PublishEventResponse(event_id=event_id, status="published", timestamp=datetime.now(timezone.utc))


@@ -275,12 +276,12 @@ async def aggregate_events(
return result.results


@router.get("/types/list", response_model=list[str])
async def list_event_types(
@router.get("/topics/list", response_model=list[str])
async def list_topics(
current_user: Annotated[UserResponse, Depends(current_user)], event_service: FromDishka[EventService]
) -> list[str]:
event_types = await event_service.list_event_types(user_id=current_user.user_id, user_role=current_user.role)
return event_types
topics = await event_service.list_topics(user_id=current_user.user_id, user_role=current_user.role)
return topics


@router.delete("/{event_id}", response_model=DeleteEventResponse)
@@ -300,7 +301,6 @@ async def delete_event(
extra={
"event_id": event_id,
"admin_email": admin.email,
"event_type": result.event_type,
"aggregate_id": result.aggregate_id,
"correlation_id": result.metadata.correlation_id,
},
@@ -316,8 +316,7 @@ async def replay_aggregate_events(
aggregate_id: str,
admin: Annotated[UserResponse, Depends(admin_user)],
event_service: FromDishka[EventService],
kafka_event_service: FromDishka[KafkaEventService],
settings: FromDishka[Settings],
producer: FromDishka[EventPublisher],
logger: FromDishka[logging.Logger],
target_service: str | None = Query(None, description="Service to replay events to"),
dry_run: bool = Query(True, description="If true, only show what would be replayed"),
Expand All @@ -331,7 +330,7 @@ async def replay_aggregate_events(
dry_run=True,
aggregate_id=aggregate_id,
event_count=replay_info.event_count,
event_types=replay_info.event_types,
topics=replay_info.topics,
start_time=replay_info.start_time,
end_time=replay_info.end_time,
)
Expand All @@ -346,21 +345,9 @@ async def replay_aggregate_events(
await asyncio.sleep(0.1)

try:
meta = EventMetadata(
service_name=settings.SERVICE_NAME,
service_version=settings.SERVICE_VERSION,
user_id=admin.user_id,
)
# Extract payload fields (exclude base event fields + event_type discriminator)
base_fields = set(BaseEvent.model_fields.keys()) | {"event_type"}
extra_fields = {k: v for k, v in event.model_dump().items() if k not in base_fields}
await kafka_event_service.publish_event(
event_type=event.event_type,
payload=extra_fields,
aggregate_id=aggregate_id,
correlation_id=replay_correlation_id,
metadata=meta,
)
# Update correlation_id for replay tracking
event.metadata.correlation_id = replay_correlation_id
await producer.publish(event=event, key=aggregate_id)
replayed_count += 1
except Exception as e:
logger.error(f"Failed to replay event {event.event_id}: {e}")
39 changes: 10 additions & 29 deletions backend/app/api/routes/execution.py
@@ -9,10 +9,9 @@
from app.api.dependencies import admin_user, current_user
from app.core.tracing import EventAttributes, add_span_attributes
from app.core.utils import get_client_ip
from app.domain.enums.events import EventType
from app.domain.enums.execution import ExecutionStatus
from app.domain.enums.user import UserRole
from app.domain.events.typed import BaseEvent, DomainEvent, EventMetadata
from app.domain.events.typed import BaseEvent, EventMetadata
from app.domain.exceptions import DomainError
from app.domain.idempotency import KeyStrategy
from app.schemas_pydantic.execution import (
@@ -32,8 +31,6 @@
from app.services.event_service import EventService
from app.services.execution_service import ExecutionService
from app.services.idempotency import IdempotencyManager
from app.services.kafka_event_service import KafkaEventService
from app.settings import Settings

router = APIRouter(route_class=DishkaRoute, tags=["execution"])

@@ -76,10 +73,9 @@ async def create_execution(
# Handle idempotency if key provided
pseudo_event = None
if idempotency_key:
# Create a pseudo-event for idempotency tracking
# Create a pseudo-event for idempotency tracking (just needs event_id for key generation)
pseudo_event = BaseEvent(
event_id=str(uuid4()),
event_type=EventType.EXECUTION_REQUESTED,
timestamp=datetime.now(timezone.utc),
metadata=EventMetadata(
user_id=current_user.user_id, correlation_id=str(uuid4()), service_name="api", service_version="1.0.0"
@@ -160,8 +156,7 @@ async def cancel_execution(
execution: Annotated[ExecutionInDB, Depends(get_execution_with_access)],
current_user: Annotated[UserResponse, Depends(current_user)],
cancel_request: CancelExecutionRequest,
event_service: FromDishka[KafkaEventService],
settings: FromDishka[Settings],
execution_service: FromDishka[ExecutionService],
) -> CancelResponse:
# Handle terminal states
terminal_states = [ExecutionStatus.COMPLETED, ExecutionStatus.FAILED, ExecutionStatus.TIMEOUT]
@@ -175,32 +170,18 @@
execution_id=execution.execution_id,
status="already_cancelled",
message="Execution was already cancelled",
event_id="-1", # exact event_id unknown
)

payload = {
"execution_id": execution.execution_id,
"status": str(ExecutionStatus.CANCELLED),
"reason": cancel_request.reason or "User requested cancellation",
"previous_status": str(execution.status),
}
meta = EventMetadata(
service_name=settings.SERVICE_NAME,
service_version=settings.SERVICE_VERSION,
await execution_service.cancel_execution(
execution_id=execution.execution_id,
reason=cancel_request.reason or "User requested cancellation",
user_id=current_user.user_id,
)
event_id = await event_service.publish_event(
event_type=EventType.EXECUTION_CANCELLED,
payload=payload,
aggregate_id=execution.execution_id,
metadata=meta,
)

return CancelResponse(
execution_id=execution.execution_id,
status="cancellation_requested",
message="Cancellation request submitted",
event_id=event_id,
)


@@ -231,16 +212,16 @@ async def retry_execution(
return ExecutionResponse.model_validate(new_result)


@router.get("/executions/{execution_id}/events", response_model=list[DomainEvent])
@router.get("/executions/{execution_id}/events", response_model=list[BaseEvent])

@cubic-dev-ai cubic-dev-ai bot Feb 5, 2026


P0: The response_model=list[BaseEvent] will cause data loss in the API response. BaseEvent does not include subclass-specific fields (like script, exit_code, reason) and does not allow extra fields. Consequently, Pydantic will strip all these fields from the output, returning only the base metadata (id, timestamp, etc.). This makes the endpoint useless for inspecting event details.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/api/routes/execution.py, line 215:

<comment>The `response_model=list[BaseEvent]` will cause data loss in the API response. `BaseEvent` does not include subclass-specific fields (like `script`, `exit_code`, `reason`) and does not allow extra fields. Consequently, Pydantic will strip all these fields from the output, returning only the base metadata (id, timestamp, etc.). This makes the endpoint useless for inspecting event details.</comment>

<file context>
@@ -231,16 +212,16 @@ async def retry_execution(
 
 
-@router.get("/executions/{execution_id}/events", response_model=list[DomainEvent])
+@router.get("/executions/{execution_id}/events", response_model=list[BaseEvent])
 async def get_execution_events(
         execution: Annotated[ExecutionInDB, Depends(get_execution_with_access)],
</file context>
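The stripping described here is standard Pydantic v2 behaviour when serializing against a declared base type. A self-contained sketch of the effect and one possible mitigation with SerializeAsAny (the ExecutionCompletedEvent name and its exit_code field are illustrative, not taken from this PR):

from pydantic import BaseModel, SerializeAsAny, TypeAdapter

class BaseEvent(BaseModel):
    event_id: str

class ExecutionCompletedEvent(BaseEvent):
    exit_code: int

events: list[BaseEvent] = [ExecutionCompletedEvent(event_id="e1", exit_code=0)]

# Dumping against the declared base type drops subclass fields
# (Pydantic also emits a serializer warning here):
TypeAdapter(list[BaseEvent]).dump_python(events)
# -> [{'event_id': 'e1'}]

# SerializeAsAny serializes each item with its runtime class, keeping the fields:
TypeAdapter(list[SerializeAsAny[BaseEvent]]).dump_python(events)
# -> [{'event_id': 'e1', 'exit_code': 0}]

Whether response_model=list[SerializeAsAny[BaseEvent]] interacts cleanly with FastAPI's response validation would need checking; reinstating a discriminated union such as the previous DomainEvent remains the stricter option.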

async def get_execution_events(
execution: Annotated[ExecutionInDB, Depends(get_execution_with_access)],
event_service: FromDishka[EventService],
event_types: list[EventType] | None = Query(None, description="Event types to filter"),
topics: list[str] | None = Query(None, description="Event topics to filter"),

@cubic-dev-ai cubic-dev-ai bot Feb 5, 2026


P1: Replacing event_types with topics is a breaking API change. Existing clients sending event_types will have their filter ignored (receiving all events instead of a subset), and they must now update to use snake_case topic names instead of the previous enum values. Consider supporting event_types for backward compatibility or documenting this break.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/api/routes/execution.py, line 219:

<comment>Replacing `event_types` with `topics` is a breaking API change. Existing clients sending `event_types` will have their filter ignored (receiving all events instead of a subset), and they must now update to use snake_case topic names instead of the previous enum values. Consider supporting `event_types` for backward compatibility or documenting this break.</comment>

<file context>
@@ -231,16 +212,16 @@ async def retry_execution(
         execution: Annotated[ExecutionInDB, Depends(get_execution_with_access)],
         event_service: FromDishka[EventService],
-        event_types: list[EventType] | None = Query(None, description="Event types to filter"),
+        topics: list[str] | None = Query(None, description="Event topics to filter"),
         limit: int = Query(100, ge=1, le=1000),
-) -> list[DomainEvent]:
</file context>
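One way to soften the break (a sketch only; how the old EventType enum values map onto topic names is an assumption, not something this PR defines) is to keep event_types as a deprecated alias alongside the new parameter, reusing the names already present in execution.py:

from fastapi import Query

@router.get("/executions/{execution_id}/events", response_model=list[BaseEvent])
async def get_execution_events(
    execution: Annotated[ExecutionInDB, Depends(get_execution_with_access)],
    event_service: FromDishka[EventService],
    topics: list[str] | None = Query(None, description="Event topics to filter"),
    # Deprecated alias for existing clients; flagged as deprecated in OpenAPI.
    event_types: list[str] | None = Query(None, deprecated=True, description="Deprecated: use 'topics'"),
    limit: int = Query(100, ge=1, le=1000),
) -> list[BaseEvent]:
    # Fall back to the legacy parameter; translating old enum values to topic
    # names (if they differ) would still need an explicit mapping step.
    effective_topics = topics if topics is not None else event_types
    return await event_service.get_events_by_aggregate(
        aggregate_id=execution.execution_id, topics=effective_topics, limit=limit
    )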

limit: int = Query(100, ge=1, le=1000),
) -> list[DomainEvent]:
) -> list[BaseEvent]:
"""Get all events for an execution."""
events = await event_service.get_events_by_aggregate(
aggregate_id=execution.execution_id, event_types=event_types, limit=limit
aggregate_id=execution.execution_id, topics=topics, limit=limit
)
return events
