diff --git a/.flake8 b/.flake8 index 06a52ca..f32ded8 100644 --- a/.flake8 +++ b/.flake8 @@ -1,2 +1,3 @@ [flake8] -max-line-length = 115 \ No newline at end of file +max-line-length = 115 +ignore = F403, F401, F405, W503, E302 diff --git a/example/DEMO_WORKFLOWS.md b/example/DEMO_WORKFLOWS.md new file mode 100644 index 0000000..420a418 --- /dev/null +++ b/example/DEMO_WORKFLOWS.md @@ -0,0 +1,224 @@ +# Durable Execution Engine SDK - Demo Workflows + +This document provides a comprehensive overview of all workflows in the demo application, including their failure characteristics, retry configurations, and expected behavior. + +## Overview + +The demo showcases three main services with multiple workflows that demonstrate durable execution patterns, retry mechanisms, and error handling: + +- **Orders Service**: Complex multi-step workflows with dependent actions +- **Users Service**: User management with notifications +- **Payments Service**: Payment processing and refund handling + +## Services and Workflows + +### 🛒 Orders Service (`orders`) + +#### 1. Process Order (`process_order`) +**Endpoint**: `POST /execute/orders/process_order` + +A complex multi-step workflow demonstrating sequential action execution with different retry strategies. + +**Input**: `OrderInput` +```json +{ + "order_id": "string", + "customer_email": "string", + "items": [ + { + "id": "string", + "quantity": number, + "price": number + } + ], + "total_amount": number +} +``` + +**Workflow Steps**: +1. **Payment Validation** (8s processing time) + - Action: `validate_payment` + - Failure Rate: **50%** + - Max Retries: 3 + - Retry Mechanism: Exponential + - Error: "Payment validation failed" + +2. 
**Inventory Reservation** (10s processing time per item) + - Action: `reserve_inventory` (executed for each item) + - Failure Rate: **50%** + - Max Retries: 2 + - Retry Mechanism: Linear + - Error: "Insufficient inventory for {item_id}" + - Custom Action Names: `reserve_inventory_0`, `reserve_inventory_1`, etc. + +3. **Order Confirmation Notification** (6s processing time) + - Action: `send_notification` + - Failure Rate: **0%** (Always succeeds) + - Max Retries: 2 + - Retry Mechanism: Constant + - Recipient: Customer email + +**Expected Behavior**: Due to high failure rates in payment and inventory steps, this workflow frequently requires retries and may fail entirely if retries are exhausted. + +#### 2. Get Order Status (`get_order_status`) +**Endpoint**: `POST /execute/orders/get_order_status` + +A simple status check workflow that always succeeds. + +**Input**: `OrderStatusInput` +```json +{ + "order_id": "string" +} +``` + +**Workflow Steps**: +1. **Status Check** (5s processing time) + - Action: `check_order_status` + - Failure Rate: **0%** (Always succeeds) + - Max Retries: 1 + - Retry Mechanism: Constant + - Returns random status: "pending", "processing", "shipped", or "delivered" + +**Expected Behavior**: Reliable workflow that demonstrates successful action execution. + +### 👤 Users Service (`users`) + +#### 1. Register User (`register_user`) +**Endpoint**: `POST /execute/users/register_user` + +User registration workflow with welcome notification. + +**Input**: `UserInput` +```json +{ + "email": "string", + "username": "string", + "password": "string" +} +``` + +**Workflow Steps**: +1. **User Creation** (7s processing time) + - Action: `create_user` + - Failure Rate: **0%** (Always succeeds) + - Max Retries: 2 + - Retry Mechanism: Exponential + +2. **Welcome Notification** (6s processing time) + - Action: `send_notification` + - Failure Rate: **0%** (Always succeeds) + - Max Retries: 1 + - Retry Mechanism: Constant + - Message: "Welcome {username}!" 
+ +**Expected Behavior**: Highly reliable workflow that consistently succeeds. + +### 💳 Payments Service (`payments`) + +#### 1. Process Refund (`process_refund`) +**Endpoint**: `POST /execute/payments/process_refund` + +Complex refund processing with order verification and notifications. + +**Input**: `RefundInput` +```json +{ + "order_id": "string", + "amount": number, + "reason": "string" +} +``` + +**Workflow Steps**: +1. **Pre-Refund Order Check** (5s processing time) + - Action: `check_order_status` + - Failure Rate: **0%** (Always succeeds) + - Max Retries: 2 + - Retry Mechanism: Constant + - Custom Action Name: `pre_refund_order_check` + +2. **Refund Processing** (9s processing time) + - Action: `process_refund` + - Failure Rate: **90%** (Very high failure rate) + - Max Retries: 3 + - Retry Mechanism: Exponential + - Error: "Refund processing failed" + +3. **Finance Notification** (6s processing time) + - Action: `send_notification` + - Failure Rate: **0%** (Always succeeds) + - Max Retries: 2 + - Retry Mechanism: Linear + - Recipient: "finance@company.com" + +**Expected Behavior**: This workflow has the highest failure rate and will frequently exhaust retries due to the 90% failure rate in refund processing. + +#### 2. Verify Payment and Notify (`verify_payment_and_notify`) +**Endpoint**: `POST /execute/payments/verify_payment_and_notify` + +Payment verification with admin notification. + +**Input**: `PaymentInput` +```json +{ + "amount": number, + "payment_method": "string" +} +``` + +**Workflow Steps**: +1. **Payment Validation** (8s processing time) + - Action: `validate_payment` + - Failure Rate: **50%** + - Max Retries: 3 + - Retry Mechanism: Exponential + - Custom Action Name: `primary_payment_validation` + +2. 
**Admin Notification** (6s processing time) + - Action: `send_notification` + - Failure Rate: **0%** (Always succeeds) + - Max Retries: 2 + - Retry Mechanism: Linear + - Recipient: "admin@company.com" + +**Expected Behavior**: Moderate failure rate due to payment validation step. + +## Action Failure Summary + +| Action | Processing Time | Failure Rate | Typical Error | +|--------|----------------|--------------|---------------| +| `validate_payment` | 8 seconds | **50%** | "Payment validation failed" | +| `reserve_inventory` | 10 seconds | **50%** | "Insufficient inventory for {item_id}" | +| `process_refund` | 9 seconds | **90%** | "Refund processing failed" | +| `send_notification` | 6 seconds | **0%** | Never fails | +| `create_user` | 7 seconds | **0%** | Never fails | +| `check_order_status` | 5 seconds | **0%** | Never fails | + +## Retry Mechanisms + +The demo showcases three different retry mechanisms: + +1. **Exponential**: Exponentially increasing delays between retries +2. **Linear**: Fixed incremental delays between retries +3. **Constant**: Fixed delay between retries + +## Testing the Demo + +### High Success Rate Workflows +- `users/register_user` - Should consistently succeed +- `orders/get_order_status` - Always succeeds + +### Moderate Failure Rate Workflows +- `orders/process_order` - 50% failure rate on payment and inventory +- `payments/verify_payment_and_notify` - 50% failure rate on payment + +### High Failure Rate Workflows +- `payments/process_refund` - 90% failure rate on refund processing + +### Recommended Test Scenarios + +1. **Success Path**: Call `register_user` or `get_order_status` +2. **Retry Demonstration**: Call `process_order` or `verify_payment_and_notify` multiple times +3. **Failure Demonstration**: Call `process_refund` to see retry exhaustion +4. 
**Complex Workflow**: Call `process_order` with multiple items to see sequential per-item inventory reservations diff --git a/example/Dockerfile b/example/Dockerfile index f7f00b4..0051cad 100644 --- a/example/Dockerfile +++ b/example/Dockerfile @@ -1,37 +1,35 @@ -# Stage 1: Build stage FROM python:3.10-slim AS builder -# Set the working directory WORKDIR /app -# Copy the requirements file into the container -COPY requirements.txt . - -# Create a virtual environment and install dependencies -RUN python -m venv /opt/venv -ENV PATH="/opt/venv/bin:$PATH" +COPY requirements.txt ./ RUN pip install --no-cache-dir -r requirements.txt -# Copy the application code -COPY . . +COPY ../src ./src +COPY ./ ./example -# Stage 2: Final stage FROM python:3.10-slim -# Set the working directory to /app/Example -WORKDIR /app/Example +WORKDIR /app + +COPY --from=builder /usr/local/lib/python3.10/site-packages /usr/local/lib/python3.10/site-packages +COPY --from=builder /usr/local/bin /usr/local/bin -# Copy only the virtual environment from the builder stage -COPY --from=builder /opt/venv /opt/venv -# Copy the application code into the Example directory -COPY . . 
+COPY src/ ./src/ +COPY example/ ./example/ -# Set the environment variables -ENV PATH="/opt/venv/bin:$PATH" +ENV DURABLE_ENGINE_BASE_URL=http://host.docker.internal:8080/api/v1 +ENV PYTHONPATH=/app/src:/app +ENV PYTHONUNBUFFERED=1 + +ENV LOG_LEVEL=DEBUG -# Expose the port the app runs on EXPOSE 8000 -# Command to run the application using Uvicorn -CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] \ No newline at end of file +HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ + CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/discover', timeout=5)" || exit 1 + +WORKDIR /app/example + +CMD ["python", "main.py"] \ No newline at end of file diff --git a/example/app/__init__.py b/example/app/__init__.py deleted file mode 100644 index 339827c..0000000 --- a/example/app/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# This file is intentionally left empty. diff --git a/example/app/main.py b/example/app/main.py deleted file mode 100644 index ba72bb6..0000000 --- a/example/app/main.py +++ /dev/null @@ -1,7 +0,0 @@ -# flake8: noqa -# from fastapi import FastAPI - -# # Define a route for the root URL ("/") -# @app.get("/") -# def hello_world(): -# return {"message": "Hello, World! 
This is a Python app running in Docker with Uvicorn."} diff --git a/example/demo/__init__.py b/example/demo/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/example/demo/actions.py b/example/demo/actions.py new file mode 100644 index 0000000..05ddf1e --- /dev/null +++ b/example/demo/actions.py @@ -0,0 +1,79 @@ +import asyncio +import random +import time + +from .models import * + + +def validate_payment_action(input_data: PaymentInput) -> PaymentResult: + time.sleep(8) + + if random.random() < 0.5: + raise Exception("Payment validation failed") + + return PaymentResult( + payment_id=f"pay_{random.randint(1000, 9999)}", + amount=input_data.amount, + status="validated", + ) + + +def reserve_inventory_action(input_data: InventoryInput) -> InventoryResult: + time.sleep(10) + + if random.random() < 0.50: + raise Exception(f"Insufficient inventory for {input_data.item_id}") + + return InventoryResult( + reservation_id=f"res_{random.randint(1000, 9999)}", + item_id=input_data.item_id, + quantity=input_data.quantity, + status="reserved", + ) + + +async def send_notification_action( + input_data: NotificationInput, +) -> NotificationResult: + await asyncio.sleep(6) + + return NotificationResult( + notification_id=f"notif_{random.randint(1000, 9999)}", + recipient=input_data.recipient, + status="sent", + ) + + +def create_user_action(input_data: UserInput) -> UserResult: + time.sleep(7) + + return UserResult( + user_id=f"user_{random.randint(1000, 9999)}", + email=input_data.email, + status="active", + ) + + +def process_refund_action(input_data: RefundInput) -> RefundResult: + time.sleep(9) + + if random.random() < 0.9: + raise Exception("Refund processing failed") + + return RefundResult( + refund_id=f"ref_{random.randint(1000, 9999)}", + amount=input_data.amount, + status="processed", + ) + + +def check_order_status_action(order_id: str) -> dict: + time.sleep(5) + + return { + "order_id": order_id, + "status": random.choice( + ["pending", "processing", 
"shipped", "delivered"] + ), + "tracking_number": f"TRK{random.randint(100000, 999999)}", + } diff --git a/example/demo/models.py b/example/demo/models.py new file mode 100644 index 0000000..b78ce64 --- /dev/null +++ b/example/demo/models.py @@ -0,0 +1,85 @@ +from typing import List, Optional + +from pydantic import BaseModel, Field + + +class OrderItem(BaseModel): + id: str + quantity: int = Field(gt=0) + price: float = Field(gt=0) + + +class OrderInput(BaseModel): + order_id: str + customer_email: str + items: List[OrderItem] + total_amount: float = Field(gt=0) + + +class PaymentInput(BaseModel): + amount: float = Field(gt=0) + payment_method: str = "credit_card" + + +class PaymentResult(BaseModel): + payment_id: str + amount: float + status: str + + +class InventoryInput(BaseModel): + item_id: str + quantity: int = Field(gt=0) + + +class InventoryResult(BaseModel): + reservation_id: str + item_id: str + quantity: int + status: str + + +class NotificationInput(BaseModel): + recipient: str + message: str + type: str = "email" + + +class NotificationResult(BaseModel): + notification_id: str + recipient: str + status: str + + +class UserInput(BaseModel): + email: str + username: str = Field(min_length=3) + password: str = Field(min_length=6) + + +class UserResult(BaseModel): + user_id: str + email: str + status: str + + +class OrderStatusInput(BaseModel): + order_id: str + + +class OrderStatusResult(BaseModel): + order_id: str + status: str + tracking_number: Optional[str] = None + + +class RefundInput(BaseModel): + order_id: str + amount: float = Field(gt=0) + reason: str = "Customer request" + + +class RefundResult(BaseModel): + refund_id: str + amount: float + status: str diff --git a/example/main.py b/example/main.py new file mode 100644 index 0000000..7ea50db --- /dev/null +++ b/example/main.py @@ -0,0 +1,208 @@ +import asyncio +import logging + +import uvicorn +from demo.actions import * +from demo.models import * +from fastapi import FastAPI + +from app 
import DurableApp, Service, WorkflowContext +from app.types import RetryMechanism + +# logging to show all levels +logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) + + +def main(): + app = FastAPI(title="Durable Execution Demo", version="1.0.0") + + order_service = Service("orders") + user_service = Service("users") + payment_service = Service("payments") + + @order_service.workflow() + async def process_order(ctx: WorkflowContext, input: OrderInput) -> dict: + await asyncio.sleep(0.5) + + payment_result = await ctx.execute_action( + action=validate_payment_action, + input_data=PaymentInput( + amount=input.total_amount, payment_method="credit_card" + ), + max_retries=3, + retry_mechanism=RetryMechanism.EXPONENTIAL, + ) + + await asyncio.sleep(0.3) + + reservations = [] + for idx, item in enumerate(input.items): + reservation = await ctx.execute_action( + action=reserve_inventory_action, + input_data=InventoryInput( + item_id=item.id, quantity=item.quantity + ), + max_retries=2, + retry_mechanism=RetryMechanism.LINEAR, + action_name=f"reserve_inventory_{idx}", + ) + reservations.append(reservation) + + await asyncio.sleep(0.2) + + notification_result = await ctx.execute_action( + action=send_notification_action, + input_data=NotificationInput( + recipient=input.customer_email, + message=f"Order {input.order_id} confirmed", + type="email", + ), + max_retries=2, + retry_mechanism=RetryMechanism.CONSTANT, + ) + + return { + "order_id": input.order_id, + "status": "completed", + "payment": payment_result, + "reservations": reservations, + "notification": notification_result, + } + + @order_service.workflow() + async def get_order_status( + ctx: WorkflowContext, input: OrderStatusInput + ) -> OrderStatusResult: + result = await ctx.execute_action( + action=check_order_status_action, + input_data=input.order_id, + max_retries=1, + retry_mechanism=RetryMechanism.CONSTANT, + ) + + return OrderStatusResult( 
order_id=result["order_id"], + status=result["status"], + tracking_number=result.get("tracking_number"), + ) + + @user_service.workflow() + async def register_user(ctx: WorkflowContext, input: UserInput) -> dict: + await asyncio.sleep(0.5) + + user_result = await ctx.execute_action( + action=create_user_action, + input_data=input, + max_retries=2, + retry_mechanism=RetryMechanism.EXPONENTIAL, + ) + + notification_result = await ctx.execute_action( + action=send_notification_action, + input_data=NotificationInput( + recipient=input.email, + message=f"Welcome {input.username}!", + type="email", + ), + max_retries=1, + retry_mechanism=RetryMechanism.CONSTANT, + ) + + return { + "success": True, + "user": user_result, + "notification": notification_result, + } + + @payment_service.workflow() + async def process_refund(ctx: WorkflowContext, input: RefundInput) -> dict: + await asyncio.sleep(0.3) + order_status = await ctx.execute_action( + action=check_order_status_action, + input_data=input.order_id, + max_retries=2, + retry_mechanism=RetryMechanism.CONSTANT, + action_name="pre_refund_order_check", + ) + + await asyncio.sleep(0.2) + + refund_result = await ctx.execute_action( + action=process_refund_action, + input_data=input, + max_retries=3, + retry_mechanism=RetryMechanism.EXPONENTIAL, + ) + + await asyncio.sleep(0.2) + + notification_result = await ctx.execute_action( + action=send_notification_action, + input_data=NotificationInput( + recipient="finance@company.com", + message=( + f"Refund processed: ${refund_result.amount} for order " + f"{input.order_id}. 
Refund ID: {refund_result.refund_id}" + ), + type="email", + ), + max_retries=2, + retry_mechanism=RetryMechanism.LINEAR, + ) + + return { + "order_id": input.order_id, + "order_status": order_status, + "refund": refund_result, + "notification": notification_result, + "status": "completed", + } + + @payment_service.workflow() + async def verify_payment_and_notify( + ctx: WorkflowContext, input: PaymentInput + ) -> dict: + await asyncio.sleep(0.3) + + payment_result = await ctx.execute_action( + action=validate_payment_action, + input_data=input, + max_retries=3, + retry_mechanism=RetryMechanism.EXPONENTIAL, + action_name="primary_payment_validation", + ) + + await asyncio.sleep(0.2) + + notification_result = await ctx.execute_action( + action=send_notification_action, + input_data=NotificationInput( + recipient="admin@company.com", + message=f"Payment of ${payment_result.amount} validated with ID {payment_result.payment_id}", + type="email", + ), + max_retries=2, + retry_mechanism=RetryMechanism.LINEAR, + ) + + return { + "payment_id": payment_result.payment_id, + "amount": payment_result.amount, + "payment_status": payment_result.status, + "notification": notification_result, + "workflow_status": "completed", + } + + DurableApp(app) + + return app + + +if __name__ == "__main__": + app = main() + print("🚀 Demo Server Starting...") + print("Services: orders, users, payments") + uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..ccdbdf2 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +fastapi>=0.115.12,<1.0.0 +pydantic>=1.0.0,<2.0.0 +requests>=2.32.4,<3.0.0 +httpx>=0.28.1,<1.0.0 +uvicorn>=0.32.1,<1.0.0 \ No newline at end of file diff --git a/src/app/__init__.py b/src/app/__init__.py index ce23bf1..19f1842 100644 --- a/src/app/__init__.py +++ b/src/app/__init__.py @@ -1,14 +1,14 @@ from .app import DurableApp from .service import Service -from .workflow_context import WorkflowContext 
from .types import ( EndureException, ErrorResponse, - Response, Log, LogStatus, + Response, RetryMechanism, ) +from .workflow_context import WorkflowContext __all__ = [ "DurableApp", diff --git a/src/app/_internal/__init__.py b/src/app/_internal/__init__.py index 0dffed3..15040b9 100644 --- a/src/app/_internal/__init__.py +++ b/src/app/_internal/__init__.py @@ -2,6 +2,7 @@ Internal implementation details. Do not import directly. This module is for internal use by app.py, service.py, and workflow_context.py only. """ + from .internal_client import InternalEndureClient from .service_registry import ServiceRegistry from .utils import validate_retention_period diff --git a/src/app/_internal/internal_client.py b/src/app/_internal/internal_client.py index fa01686..0759dfd 100644 --- a/src/app/_internal/internal_client.py +++ b/src/app/_internal/internal_client.py @@ -1,6 +1,8 @@ -import os import logging +import os + import requests + from ..types import Log, Response @@ -26,12 +28,21 @@ def send_log(self, execution_id: str, log: Log, action_name: str): requests.exceptions.HTTPError: If the request fails. """ # noqa: E501 try: + logging.info( + f"Attempting to send log to engine - Execution ID: {execution_id}, Action: {action_name}" + ) + logging.info(f"Base URL: {self._base_url}") + if not self._base_url: + logging.error( + "DURABLE_ENGINE_BASE_URL is not set in environment variables." + ) raise ValueError( "DURABLE_ENGINE_BASE_URL is not set in environment variables." 
) if not log or not action_name: + logging.error("log and action_name must be provided.") raise ValueError("log and action_name must be provided.") url = ( @@ -39,30 +50,84 @@ def send_log(self, execution_id: str, log: Log, action_name: str): ) headers = {"Content-Type": "application/json"} payload = log.to_dict() + + logging.info(f"Making request to: {url}") + logging.info(f"Request headers: {headers}") + logging.info(f"Request payload: {payload}") + response = requests.patch(url, headers=headers, json=payload) + logging.info( + "Log sent to the Durable Execution Engine: {}".format(log) + ) + logging.info(f"Response status code: {response.status_code}") + # Safety check for headers attribute (for MockResponse in tests) + if hasattr(response, 'headers'): + logging.info(f"Response headers: {dict(response.headers)}") + else: + logging.info("Response headers: Not available (MockResponse)") + logging.info("Response after sending log: {}".format(response)) + response.raise_for_status() try: response_payload = response.json() - except ValueError: + logging.info("Response payload: {}".format(response_payload)) + except ValueError as e: + logging.error("Error parsing response payload: {}".format(e)) + if hasattr(response, 'text'): + logging.error(f"Raw response text: {response.text}") + else: + logging.error("Raw response text: Not available (MockResponse)") response_payload = {} response = Response( status_code=response.status_code, payload=response_payload, ) except requests.exceptions.HTTPError as e: + logging.error(f"HTTP ERROR: Status {e.response.status_code}") + logging.error(f"HTTP ERROR: URL {e.response.url}") + # Safety check for headers attribute (for MockResponse in tests) + if hasattr(e.response, 'headers'): + logging.error(f"HTTP ERROR: Headers {dict(e.response.headers)}") + else: + logging.error("HTTP ERROR: Headers Not available (MockResponse)") + # Safety check for text attribute (for MockResponse in tests) + if hasattr(e.response, 'text'): + 
logging.error(f"HTTP ERROR: Text {e.response.text}") + else: + logging.error("HTTP ERROR: Text Not available (MockResponse)") try: error_payload = e.response.json() - except Exception: + logging.info( + "Error payload: {}".format(error_payload) + ) + except Exception as parse_error: error_payload = {} + # Safety check for text attribute (for MockResponse in tests) + if hasattr(e.response, 'text'): + logging.error( + f"Error parsing error payload: {parse_error}. Raw text: {e.response.text}" + ) + else: + logging.error( + f"Error parsing error payload: {parse_error}. Raw text: Not available (MockResponse)" + ) response = Response( status_code=e.response.status_code, payload=error_payload, ) except requests.exceptions.RequestException as e: logging.error( - "Engine is unreachable. Aborting retries: {}".format(e) + f"NETWORK ERROR: Engine is unreachable. Error type: {type(e).__name__}" + ) + logging.error(f"NETWORK ERROR: Error details: {e}") + raise e + except Exception as e: + logging.error( + f"UNEXPECTED ERROR in send_log: {type(e).__name__}: {e}" ) raise e + + logging.info(f"Returning response: {response.to_dict()}") return response.to_dict() @classmethod @@ -81,24 +146,60 @@ def mark_execution_as_running(self, execution_id: str): requests.exceptions.HTTPError: If the request fails. """ try: + logging.info( + f"Attempting to mark execution as running - Execution ID: {execution_id}" + ) + logging.info(f"Base URL: {self._base_url}") + if not self._base_url: + logging.error( + "DURABLE_ENGINE_BASE_URL is not set in environment variables." + ) raise ValueError( "DURABLE_ENGINE_BASE_URL is not set in environment variables." 
) url = f"{self._base_url}/executions/{execution_id}/started" headers = {"Content-Type": "application/json"} + + logging.info(f"Making request to: {url}") + logging.info(f"Request headers: {headers}") + response = requests.patch(url, headers=headers) + logging.info( + "Execution marked as running: {}".format(response) + ) + logging.info(f"Response status code: {response.status_code}") + if hasattr(response, 'headers'): + logging.info(f"Response headers: {dict(response.headers)}") + else: + logging.info("Response headers: Not available (MockResponse)") + logging.info( + "Response after marking execution as running: {}".format(response) + ) response.raise_for_status() response = Response( status_code=response.status_code, ) except requests.exceptions.HTTPError as e: + logging.error( + f"HTTP ERROR marking execution as running: Status {e.response.status_code}" + ) + logging.error(f"HTTP ERROR: URL {e.response.url}") + logging.error(f"HTTP ERROR: Text {e.response.text}") response = Response( status_code=e.response.status_code, ) except requests.exceptions.RequestException as e: logging.error( - "Engine is unreachable. Aborting retries: {}".format(e) + f"NETWORK ERROR: Engine is unreachable. Error type: {type(e).__name__}" ) + logging.error(f"NETWORK ERROR: Error details: {e}") raise e + except Exception as e: + logging.error( + f"UNEXPECTED ERROR in mark_execution_as_running: {type(e).__name__}: {e}" + ) + raise e + + logging.info(f"Returning response: {response.to_dict()}") return response.to_dict() diff --git a/src/app/_internal/utils.py b/src/app/_internal/utils.py index 54c77fc..a878372 100644 --- a/src/app/_internal/utils.py +++ b/src/app/_internal/utils.py @@ -1,3 +1,9 @@ +from dataclasses import asdict, is_dataclass +from typing import Any + +from pydantic import BaseModel + + def validate_retention_period(retention: int) -> None: """ Validate that the retention period is a non-negative integer. 
@@ -14,3 +20,44 @@ def validate_retention_period(retention: int) -> None: raise ValueError("Retention period must be a non-negative integer.") if retention > 30: raise ValueError("Retention period cannot exceed 30 days.") + + +def serialize_data(data: Any) -> Any: + """ + Recursively serialize Pydantic models and dataclasses to JSON-compatible dictionaries. + + Args: + data: The data to serialize (can be any type) + + Returns: + The serialized data with all Pydantic models/dataclasses converted to dicts + + Examples: + >>> serialize_data(UserModel(name="John", age=30)) + {"name": "John", "age": 30} + + >>> serialize_data({"user": UserModel(name="John"), "count": 5}) + {"user": {"name": "John"}, "count": 5} + + >>> serialize_data([UserModel(name="John"), UserModel(name="Jane")]) + [{"name": "John"}, {"name": "Jane"}] + """ + if isinstance(data, BaseModel): + # Handle both Pydantic v1 (.dict()) and v2 (.model_dump()) + if hasattr(data, "model_dump"): + return data.model_dump() + elif hasattr(data, "dict"): + return data.dict() + else: + # Fallback: try to convert to dict manually + return {field: getattr(data, field) for field in data.__fields__} + elif is_dataclass(data) and not isinstance(data, type): + return serialize_data(asdict(data)) + elif isinstance(data, dict): + return {key: serialize_data(value) for key, value in data.items()} + elif isinstance(data, list): + return [serialize_data(item) for item in data] + elif isinstance(data, tuple): + return tuple(serialize_data(item) for item in data) + else: + return data diff --git a/src/app/_internal/workflow.py b/src/app/_internal/workflow.py index d7ac68d..0a5e1b0 100644 --- a/src/app/_internal/workflow.py +++ b/src/app/_internal/workflow.py @@ -1,14 +1,16 @@ import asyncio -import requests -from typing import Any, Callable, Union, get_type_hints, get_origin, get_args +from dataclasses import is_dataclass +from typing import Any, Callable, Union, get_args, get_origin, get_type_hints -from fastapi import Request, status, HTTPException, types -from pydantic 
import ValidationError +import requests +from fastapi import HTTPException, Request, status, types +from pydantic import BaseModel, ValidationError from app.workflow_context import WorkflowContext -from .internal_client import InternalEndureClient from ..types import EndureException +from .internal_client import InternalEndureClient +from .utils import serialize_data class Workflow: @@ -26,6 +28,7 @@ class Workflow: retention_period (int, optional): Number of days to retain workflow execution history. input (Any): Structured description of the input type (derived from type hints). output (Any): Structured description of the return type (derived from type hints). + input_type (type): The actual input type from type hints for automatic conversion. Example: @workflow @@ -55,7 +58,39 @@ def __init__(self, func: Callable, retention_period: int = None): self.func = func self.name = func.__name__ self.retention_period = retention_period - self.input, self.output = self._get_io(func) + self.input, self.output, self.input_type = self._get_io(func) + + def _convert_input(self, raw_input: Any) -> Any: + """ + Convert raw input (typically a dict from JSON) to the expected input type. 
+ + Args: + raw_input: The raw input value from the request + + Returns: + The converted input value + """ + # If no type hint or Any, pass through as-is + if self.input_type is Any or self.input_type is None: + return raw_input + + # If input is already the correct type, pass through + if not isinstance(raw_input, dict): + return raw_input + + # Check if expected type is a Pydantic model or dataclass + try: + if isinstance(self.input_type, type) and ( + issubclass(self.input_type, BaseModel) + or is_dataclass(self.input_type) + ): + return self.input_type(**raw_input) + except (TypeError, ValidationError, ValueError) as e: + raise ValueError( + f"Failed to convert input to {self.input_type.__name__}: {e}" + ) + # For all other cases, pass through as-is + return raw_input def _get_type_description(self, typ): """ @@ -141,10 +176,10 @@ def _get_io(self, func): func (Callable): The workflow function to analyze. Returns: - tuple: A tuple containing (input_type, output_type), where each is either: - - A string representing a simple type (e.g., "int", "str") - - A string representing a complex type (e.g., "list[int]", "dict[str, MyClass]") - - A dict representing the structure of a user-defined class + tuple: A tuple containing (input_type_description, output_type_description, input_type), where: + - input_type_description: String representation of input type for discovery + - output_type_description: String representation of output type for discovery + - input_type: The actual input type for automatic conversion If type hints aren't provided, "Any" is used as a fallback. 
Note: @@ -154,9 +189,11 @@ def _get_io(self, func): input_type = hints.get("input", Any) output_type = hints.get("return", Any) - return self._get_type_description( - input_type - ), self._get_type_description(output_type) + return ( + self._get_type_description(input_type), + self._get_type_description(output_type), + input_type, + ) def get_handler_route(self): """ @@ -209,10 +246,17 @@ async def handler(request: Request): InternalEndureClient.mark_execution_as_running( body["execution_id"] ) - output = self.func(ctx, body["input"]) + + converted_input = self._convert_input(body["input"]) + + output = self.func(ctx, converted_input) if asyncio.iscoroutine(output): output = await output - return {"output": output} + + # Recursively serialize all Pydantic models and dataclasses + serialized_output = serialize_data(output) + + return {"output": serialized_output} except ValueError as ve: if isinstance(ve, ValidationError): raise EndureException( diff --git a/src/app/app.py b/src/app/app.py index f7cc349..109af90 100644 --- a/src/app/app.py +++ b/src/app/app.py @@ -3,9 +3,7 @@ from fastapi import FastAPI, Request from fastapi.responses import JSONResponse -from app._internal import ( - ServiceRegistry, -) +from app._internal import ServiceRegistry from app.types import EndureException, ErrorResponse @@ -81,23 +79,21 @@ def _discover(self): dict: A dictionary containing all registered services and their workflows. 
""" services = self.serviceRegistry.get_services() - return { - "services": [ - { - "name": service_name, - "workflows": [ - { - "name": workflow.name, - "input": workflow.input, - "output": workflow.output, - "idem_retention": workflow.retention_period, - } - for workflow in workflows - ], - } - for service_name, workflows in services.items() - ] - } + return [ + { + "service_name": service_name, + "workflows": [ + { + "name": workflow.name, + "input": workflow.input, + "output": workflow.output, + "idem_retention": workflow.retention_period, + } + for workflow in workflows + ], + } + for service_name, workflows in services.items() + ] async def raise_exception( self, request: Request, exc: EndureException, _=None diff --git a/src/app/service.py b/src/app/service.py index a008d98..4b4ccf9 100644 --- a/src/app/service.py +++ b/src/app/service.py @@ -1,8 +1,4 @@ -from app._internal import ( - ServiceRegistry, - Workflow, - validate_retention_period, -) +from app._internal import ServiceRegistry, Workflow, validate_retention_period from app.workflow_context import WorkflowContext diff --git a/src/app/types.py b/src/app/types.py index f901a12..2c28e7f 100644 --- a/src/app/types.py +++ b/src/app/types.py @@ -1,5 +1,5 @@ from dataclasses import dataclass, field -from datetime import datetime +from datetime import datetime, timezone from enum import Enum from typing import Optional @@ -23,10 +23,14 @@ def log_to_dict(log: "Log") -> dict: "input": log.input, "output": log.output, "max_retries": log.max_retries, - "retry_mechanism": ( + "retry_method": ( log.retry_mechanism.value if log.retry_mechanism else None ), - "timestamp": log.timestamp.isoformat() if log.timestamp else None, + "timestamp": ( + log.timestamp.replace(tzinfo=timezone.utc).isoformat() + if log.timestamp + else None + ), } @@ -37,7 +41,9 @@ class Log: output: Optional[dict] = None max_retries: Optional[int] = None retry_mechanism: Optional[RetryMechanism] = None - timestamp: datetime = 
field(default_factory=datetime.now) + timestamp: datetime = field( + default_factory=lambda: datetime.now(timezone.utc) + ) def to_dict(self): """Convert Log to a dictionary for JSON serialization""" diff --git a/src/app/workflow_context.py b/src/app/workflow_context.py index 98ca83b..5bfa3c8 100644 --- a/src/app/workflow_context.py +++ b/src/app/workflow_context.py @@ -1,19 +1,15 @@ -import time -import logging import asyncio -from fastapi import status +import logging +import time + import requests -from app._internal.internal_client import ( - InternalEndureClient, -) -from app.types import ( - Log, - LogStatus, - RetryMechanism, - EndureException, -) +from fastapi import status from pydantic import ValidationError +from app._internal.internal_client import InternalEndureClient +from app._internal.utils import serialize_data +from app.types import EndureException, Log, LogStatus, RetryMechanism + class WorkflowContext: """ @@ -65,6 +61,7 @@ async def execute_action( input_data, max_retries: int, retry_mechanism: RetryMechanism, + action_name: str = None, ) -> any: """ Execute an action with durability guarantees and automatic retry capabilities. @@ -93,6 +90,8 @@ async def execute_action( - LINEAR_BACKOFF - EXPONENTIAL_BACKOFF etc. + action_name (str, optional): Custom name for the action in logs. If not provided, + uses action.__name__. 
Returns: any: Either: @@ -131,85 +130,201 @@ def process_payment(input_data: dict) -> dict: """ log = Log( status=LogStatus.STARTED, - input=input_data, + input=serialize_data(input_data), retry_mechanism=retry_mechanism, max_retries=max_retries, ) + name = action_name if action_name is not None else action.__name__ + logging.info("Sending log for action: {}".format(log)) engine_response = InternalEndureClient.send_log( - self.execution_id, log, action.__name__ + self.execution_id, log, name ) + logging.info("Engine response: {}".format(engine_response)) if not engine_response: + logging.error( + "CRITICAL ERROR: Engine response is None or empty. " + "This indicates a communication failure with the durable engine." + ) raise ValueError( "Base URL is not set in environment variables or missing required parameters (log or action_name)." ) + + logging.info( + f"Detailed engine response - Status: {engine_response.get('status_code')}, " + f"Payload: {engine_response.get('payload')}, " + f"Headers: {engine_response.get('headers', {})}" + ) + status_code = engine_response["status_code"] + logging.info(f"Processing status code: {status_code}") + match status_code: case status.HTTP_201_CREATED | status.HTTP_200_OK: - attempt = 0 - while attempt <= max_retries: + logging.info( + f"Status {status_code} - Proceeding with action execution" + ) + while True: try: try: + logging.info( + "Executing action: {}".format(action.__name__) + ) if asyncio.iscoroutinefunction(action): result = await action(input_data) else: result = action(input_data) + logging.info("Action result: {}".format(result)) except (ValueError, ValidationError) as e: - InternalEndureClient.send_log( + logging.error( + f"VALIDATION ERROR in action {action.__name__}: {type(e).__name__}: {e}" + ) + engine_response = InternalEndureClient.send_log( self.execution_id, Log( status=LogStatus.FAILED, - output={"error": str(e)}, + output=serialize_data({"error": str(e)}), ), - action.__name__, + name, ) logging.info( + 
"Engine response after validation error: {}".format( + engine_response + ) + ) + logging.error( f"WORKFLOW DEBUG: About to raise exception of type {type(e)}: {e}" ) raise log = Log( status=LogStatus.COMPLETED, - output=result, + output=serialize_data(result), + ) + logging.info( + "Sending log for completed action: {}".format(log) ) - InternalEndureClient.send_log( + engine_response = InternalEndureClient.send_log( self.execution_id, log, - action.__name__, + name, + ) + logging.info( + "Engine response after completion: {}".format( + engine_response + ) ) + logging.info("Returning result: {}".format(result)) return result except ( ValueError, ValidationError, requests.exceptions.RequestException, ) as e: - logging.debug( - f"DEBUG: Caught exception of type {type(e)}: {e}" + logging.error( + f"CRITICAL ERROR: Caught exception of type {type(e)}: {e}" + ) + logging.error( + f"Exception details - Args: {e.args}, Traceback: {type(e).__name__}" ) raise except Exception as e: - if attempt == max_retries: - raise EndureException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - output={ - "error": str( - "Action failed after reaching max retries" - ) - }, - ) + logging.error( + f"UNEXPECTED ERROR in action {action.__name__}: {type(e).__name__}: {e}" + ) + logging.error( + f"Error details - Args: {e.args}, Traceback: {type(e).__name__}" + ) log = Log( status=LogStatus.FAILED, - output={"error": str(e)}, + output=serialize_data({"error": str(e)}), + ) + logging.info( + "Sending log for failed action: {}".format(log) ) engine_response = InternalEndureClient.send_log( - self.execution_id, log, action.__name__ + self.execution_id, log, name + ) + logging.info( + "Engine response after failure: {}".format( + engine_response + ) + ) + engine_status = engine_response.get("status_code") + logging.info( + f"Engine status after failure: {engine_status}" ) - attempt += 1 + + if engine_status in [ + status.HTTP_400_BAD_REQUEST, + status.HTTP_404_NOT_FOUND, + 
status.HTTP_409_CONFLICT, + ]: + logging.error( + f"ENGINE ERROR: Received {engine_status} from engine. " + f"Original error: {type(e).__name__}: {e}" + ) + if engine_status == status.HTTP_409_CONFLICT: + logging.error( + "Execution Paused or Terminated , no retries will be attempted." + ) + raise EndureException( + status_code=engine_status, + output=serialize_data( + { + "error": str( + "Execution Paused or Terminated" + ) + } + ), + ) + raise EndureException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + output=serialize_data( + { + "error": str( + "Action failed after reaching max retries" + ) + } + ), + ) retry_at_unix = engine_response.get("payload", {}).get( "retry_at" ) + logging.info("Retry at unix: {}".format(retry_at_unix)) if retry_at_unix: sleep_seconds = retry_at_unix - time.time() if sleep_seconds > 0: + logging.info( + "Sleeping for {} seconds".format( + sleep_seconds + ) + ) time.sleep(sleep_seconds) + else: + logging.warning( + f"Retry time {retry_at_unix} is in the past. " + f"Current time: {time.time()}" + ) + else: + logging.error( + f"CRITICAL ERROR: No retry_at time provided by " + f"engine for retryable status {engine_status}. " + f"Engine response: {engine_response}" + ) + raise RuntimeError( + f"Engine did not provide retry_at time for retryable status {engine_status}. " + f"Response: {engine_response}" + ) case status.HTTP_208_ALREADY_REPORTED: + logging.info( + "Returning cached result: {}".format(engine_response) + ) output = engine_response.get("payload", {}).get("output") return output if output else {} + case _: + logging.error( + f"UNEXPECTED STATUS CODE: {status_code}. 
" + f"Full response: {engine_response}" + ) + raise RuntimeError( + f"Unexpected status code {status_code} from engine: {engine_response}" + ) diff --git a/tests/conftest.py b/tests/conftest.py index c7200e6..ebab7c8 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,14 +1,16 @@ import os -import pytest from unittest.mock import AsyncMock, Mock, patch + +import pytest + from app import ( DurableApp, - Service, - WorkflowContext, Log, LogStatus, - RetryMechanism, Response, + RetryMechanism, + Service, + WorkflowContext, ) from app._internal import InternalEndureClient, ServiceRegistry diff --git a/tests/internal/test_internal_client.py b/tests/internal/test_internal_client.py index b7aa0de..ed20df2 100644 --- a/tests/internal/test_internal_client.py +++ b/tests/internal/test_internal_client.py @@ -1,8 +1,8 @@ import os -import pytest from unittest.mock import patch -from fastapi import status +import pytest +from fastapi import status from app._internal.internal_client import InternalEndureClient from app.types import Log, LogStatus, Response diff --git a/tests/internal/test_service_registry.py b/tests/internal/test_service_registry.py index 2ae179b..03b23f5 100644 --- a/tests/internal/test_service_registry.py +++ b/tests/internal/test_service_registry.py @@ -1,9 +1,11 @@ -import pytest -from app._internal.workflow import Workflow, WorkflowContext -from app._internal.service_registry import ServiceRegistry from typing import Dict + +import pytest from fastapi import APIRouter +from app._internal.service_registry import ServiceRegistry +from app._internal.workflow import Workflow, WorkflowContext + class TestServiceRegistry: @pytest.fixture(autouse=True) diff --git a/tests/internal/test_workflow.py b/tests/internal/test_workflow.py index 74f3c5e..ee6a2af 100644 --- a/tests/internal/test_workflow.py +++ b/tests/internal/test_workflow.py @@ -1,12 +1,14 @@ -import pytest -from unittest.mock import AsyncMock, patch from typing import Any +from unittest.mock 
import AsyncMock, patch + +import pytest +from fastapi import HTTPException +from pydantic import BaseModel +from starlette.responses import Response + from app._internal.workflow import Workflow from app.types import EndureException from app.workflow_context import WorkflowContext -from starlette.responses import Response -from fastapi import HTTPException -from pydantic import BaseModel class InputModel: diff --git a/tests/test_app.py b/tests/test_app.py index 6ea29d0..167e040 100644 --- a/tests/test_app.py +++ b/tests/test_app.py @@ -1,9 +1,10 @@ +from unittest.mock import patch + import pytest from fastapi import FastAPI from fastapi.testclient import TestClient -from unittest.mock import patch -from app import DurableApp, Service, WorkflowContext, EndureException +from app import DurableApp, EndureException, Service, WorkflowContext from app._internal import ServiceRegistry @@ -48,11 +49,11 @@ def test_workflow(input: dict, ctx: WorkflowContext): assert response.status_code == 200 data = response.json() - assert "services" in data - assert len(data["services"]) == 1 + assert isinstance(data, list) + assert len(data) == 1 - service = data["services"][0] - assert service["name"] == "test_service" + service = data[0] + assert service["service_name"] == "test_service" assert len(service["workflows"]) == 1 workflow = service["workflows"][0] @@ -123,17 +124,17 @@ def workflow3(input: dict, ctx: WorkflowContext): assert response.status_code == 200 data = response.json() - assert len(data["services"]) == 2 + assert len(data) == 2 service1_data = next( - s for s in data["services"] if s["name"] == "service1" + s for s in data if s["service_name"] == "service1" ) assert len(service1_data["workflows"]) == 2 workflow_names = {w["name"] for w in service1_data["workflows"]} assert workflow_names == {"workflow1", "workflow2"} service2_data = next( - s for s in data["services"] if s["name"] == "service2" + s for s in data if s["service_name"] == "service2" ) assert 
len(service2_data["workflows"]) == 1 assert service2_data["workflows"][0]["name"] == "workflow3" diff --git a/tests/test_service.py b/tests/test_service.py index 2bc5faf..fd69147 100644 --- a/tests/test_service.py +++ b/tests/test_service.py @@ -1,5 +1,6 @@ import pytest -from app import WorkflowContext, Service + +from app import Service, WorkflowContext from app._internal import ServiceRegistry, Workflow diff --git a/tests/test_workflow_context.py b/tests/test_workflow_context.py index 7e3bf16..992cf22 100644 --- a/tests/test_workflow_context.py +++ b/tests/test_workflow_context.py @@ -1,11 +1,13 @@ -from app.types import LogStatus, RetryMechanism, Response -import pytest -from unittest.mock import patch -from fastapi import status, HTTPException +import asyncio import time -from pydantic import ValidationError, BaseModel +from unittest.mock import patch + +import pytest import requests -import asyncio +from fastapi import HTTPException, status +from pydantic import BaseModel, ValidationError + +from app.types import LogStatus, Response, RetryMechanism @pytest.mark.asyncio @@ -222,7 +224,7 @@ def failing_action(input_data): assert mock_sleep.call_count == 3 sleep_duration = mock_sleep.call_args[0][0] assert sleep_duration > 0 and sleep_duration <= 5 - assert mock_send_log.call_count == 4 + assert mock_send_log.call_count == 5 @pytest.mark.asyncio