-
Notifications
You must be signed in to change notification settings - Fork 0
fix: removed most of schema encode/decode overhead #135
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
4889721
61ef83a
748abbf
333c86f
ed71b67
c4f1f86
b8c0123
eda7419
6800210
20288b5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,10 +9,9 @@ | |
| from app.api.dependencies import admin_user, current_user | ||
| from app.core.tracing import EventAttributes, add_span_attributes | ||
| from app.core.utils import get_client_ip | ||
| from app.domain.enums.events import EventType | ||
| from app.domain.enums.execution import ExecutionStatus | ||
| from app.domain.enums.user import UserRole | ||
| from app.domain.events.typed import BaseEvent, DomainEvent, EventMetadata | ||
| from app.domain.events.typed import BaseEvent, EventMetadata | ||
| from app.domain.exceptions import DomainError | ||
| from app.domain.idempotency import KeyStrategy | ||
| from app.schemas_pydantic.execution import ( | ||
|
|
@@ -32,8 +31,6 @@ | |
| from app.services.event_service import EventService | ||
| from app.services.execution_service import ExecutionService | ||
| from app.services.idempotency import IdempotencyManager | ||
| from app.services.kafka_event_service import KafkaEventService | ||
| from app.settings import Settings | ||
|
|
||
| router = APIRouter(route_class=DishkaRoute, tags=["execution"]) | ||
|
|
||
|
|
@@ -76,10 +73,9 @@ async def create_execution( | |
| # Handle idempotency if key provided | ||
| pseudo_event = None | ||
| if idempotency_key: | ||
| # Create a pseudo-event for idempotency tracking | ||
| # Create a pseudo-event for idempotency tracking (just needs event_id for key generation) | ||
| pseudo_event = BaseEvent( | ||
| event_id=str(uuid4()), | ||
| event_type=EventType.EXECUTION_REQUESTED, | ||
| timestamp=datetime.now(timezone.utc), | ||
| metadata=EventMetadata( | ||
| user_id=current_user.user_id, correlation_id=str(uuid4()), service_name="api", service_version="1.0.0" | ||
|
|
@@ -160,8 +156,7 @@ async def cancel_execution( | |
| execution: Annotated[ExecutionInDB, Depends(get_execution_with_access)], | ||
| current_user: Annotated[UserResponse, Depends(current_user)], | ||
| cancel_request: CancelExecutionRequest, | ||
| event_service: FromDishka[KafkaEventService], | ||
| settings: FromDishka[Settings], | ||
| execution_service: FromDishka[ExecutionService], | ||
| ) -> CancelResponse: | ||
| # Handle terminal states | ||
| terminal_states = [ExecutionStatus.COMPLETED, ExecutionStatus.FAILED, ExecutionStatus.TIMEOUT] | ||
|
|
@@ -175,32 +170,18 @@ async def cancel_execution( | |
| execution_id=execution.execution_id, | ||
| status="already_cancelled", | ||
| message="Execution was already cancelled", | ||
| event_id="-1", # exact event_id unknown | ||
| ) | ||
|
|
||
| payload = { | ||
| "execution_id": execution.execution_id, | ||
| "status": str(ExecutionStatus.CANCELLED), | ||
| "reason": cancel_request.reason or "User requested cancellation", | ||
| "previous_status": str(execution.status), | ||
| } | ||
| meta = EventMetadata( | ||
| service_name=settings.SERVICE_NAME, | ||
| service_version=settings.SERVICE_VERSION, | ||
| await execution_service.cancel_execution( | ||
| execution_id=execution.execution_id, | ||
| reason=cancel_request.reason or "User requested cancellation", | ||
| user_id=current_user.user_id, | ||
| ) | ||
| event_id = await event_service.publish_event( | ||
| event_type=EventType.EXECUTION_CANCELLED, | ||
| payload=payload, | ||
| aggregate_id=execution.execution_id, | ||
| metadata=meta, | ||
| ) | ||
|
|
||
| return CancelResponse( | ||
| execution_id=execution.execution_id, | ||
| status="cancellation_requested", | ||
| message="Cancellation request submitted", | ||
| event_id=event_id, | ||
| ) | ||
|
|
||
|
|
||
|
|
@@ -231,16 +212,16 @@ async def retry_execution( | |
| return ExecutionResponse.model_validate(new_result) | ||
|
|
||
|
|
||
| @router.get("/executions/{execution_id}/events", response_model=list[DomainEvent]) | ||
| @router.get("/executions/{execution_id}/events", response_model=list[BaseEvent]) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. P0: The Prompt for AI agents |
||
| async def get_execution_events( | ||
| execution: Annotated[ExecutionInDB, Depends(get_execution_with_access)], | ||
| event_service: FromDishka[EventService], | ||
| event_types: list[EventType] | None = Query(None, description="Event types to filter"), | ||
| topics: list[str] | None = Query(None, description="Event topics to filter"), | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. P1: Replacing Prompt for AI agents |
||
| limit: int = Query(100, ge=1, le=1000), | ||
| ) -> list[DomainEvent]: | ||
| ) -> list[BaseEvent]: | ||
| """Get all events for an execution.""" | ||
| events = await event_service.get_events_by_aggregate( | ||
| aggregate_id=execution.execution_id, event_types=event_types, limit=limit | ||
| aggregate_id=execution.execution_id, topics=topics, limit=limit | ||
| ) | ||
| return events | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
Repository: HardMax71/Integr8sCode
Length of output: 1452
🏁 Script executed:
Repository: HardMax71/Integr8sCode
Length of output: 2592
🏁 Script executed:
Repository: HardMax71/Integr8sCode
Length of output: 1029
🏁 Script executed:
Repository: HardMax71/Integr8sCode
Length of output: 1370
🏁 Script executed:
Repository: HardMax71/Integr8sCode
Length of output: 374
Custom event payload is dropped and will not be published.
PublishEventRequestincludes a requiredpayloadfield, but it is never passed to theBaseEventconstructor. SinceBaseEventdoesn't haveextra="allow"configured and lacks a payload field, the custom payload data will be silently discarded due to Pydantic's default behavior of ignoring unknown fields.💡 Proposed fix (preserve payload explicitly)
Alternatively, enable extra field passthrough on
BaseEventwithextra="allow"(as done inUserSettingsUpdatedEvent).🤖 Prompt for AI Agents