From 84d10b190ff56f90307afba82236d688bf190811 Mon Sep 17 00:00:00 2001 From: Salma Elsoly Date: Fri, 20 Jun 2025 01:18:15 +0300 Subject: [PATCH 01/22] initial demo --- example/app/__init__.py | 1 - example/app/main.py | 7 --- example/demo/__init__.py | 0 example/demo/actions.py | 72 +++++++++++++++++++++ example/demo/models.py | 84 +++++++++++++++++++++++++ example/main.py | 131 +++++++++++++++++++++++++++++++++++++++ 6 files changed, 287 insertions(+), 8 deletions(-) delete mode 100644 example/app/__init__.py delete mode 100644 example/app/main.py create mode 100644 example/demo/__init__.py create mode 100644 example/demo/actions.py create mode 100644 example/demo/models.py create mode 100644 example/main.py diff --git a/example/app/__init__.py b/example/app/__init__.py deleted file mode 100644 index 339827c..0000000 --- a/example/app/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# This file is intentionally left empty. diff --git a/example/app/main.py b/example/app/main.py deleted file mode 100644 index ba72bb6..0000000 --- a/example/app/main.py +++ /dev/null @@ -1,7 +0,0 @@ -# flake8: noqa -# from fastapi import FastAPI - -# # Define a route for the root URL ("/") -# @app.get("/") -# def hello_world(): -# return {"message": "Hello, World! 
This is a Python app running in Docker with Uvicorn."} diff --git a/example/demo/__init__.py b/example/demo/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/example/demo/actions.py b/example/demo/actions.py new file mode 100644 index 0000000..da66051 --- /dev/null +++ b/example/demo/actions.py @@ -0,0 +1,72 @@ +import random +import time +import asyncio +from .models import * + + +def validate_payment(input_data: PaymentInput) -> PaymentResult: + if random.random() < 0.2: + raise Exception("Payment validation failed") + + return PaymentResult( + payment_id=f"pay_{random.randint(1000, 9999)}", + amount=input_data.amount, + status="validated" + ) + + +def reserve_inventory(input_data: InventoryInput) -> InventoryResult: + if random.random() < 0.15: + raise Exception(f"Insufficient inventory for {input_data.item_id}") + + return InventoryResult( + reservation_id=f"res_{random.randint(1000, 9999)}", + item_id=input_data.item_id, + quantity=input_data.quantity, + status="reserved" + ) + + +async def send_notification(input_data: NotificationInput) -> NotificationResult: + await asyncio.sleep(0.1) + + if random.random() < 0.1: + raise Exception(f"Failed to send {input_data.type}") + + return NotificationResult( + notification_id=f"notif_{random.randint(1000, 9999)}", + recipient=input_data.recipient, + status="sent" + ) + + +def create_user(input_data: UserInput) -> UserResult: + if random.random() < 0.1: + raise Exception("User creation failed") + + return UserResult( + user_id=f"user_{random.randint(1000, 9999)}", + email=input_data.email, + status="active" + ) + + +def process_refund(input_data: RefundInput) -> RefundResult: + time.sleep(0.1) + + if random.random() < 0.05: + raise Exception("Refund processing failed") + + return RefundResult( + refund_id=f"ref_{random.randint(1000, 9999)}", + amount=input_data.amount, + status="processed" + ) + + +def check_order_status(order_id: str) -> dict: + return { + "order_id": order_id, + "status": 
random.choice(["pending", "processing", "shipped", "delivered"]), + "tracking_number": f"TRK{random.randint(100000, 999999)}" + } diff --git a/example/demo/models.py b/example/demo/models.py new file mode 100644 index 0000000..2e9c914 --- /dev/null +++ b/example/demo/models.py @@ -0,0 +1,84 @@ +from pydantic import BaseModel, Field +from typing import List, Optional + + +class OrderItem(BaseModel): + id: str + quantity: int = Field(gt=0) + price: float = Field(gt=0) + + +class OrderInput(BaseModel): + order_id: str + customer_email: str + items: List[OrderItem] + total_amount: float = Field(gt=0) + + +class PaymentInput(BaseModel): + amount: float = Field(gt=0) + payment_method: str = "credit_card" + + +class PaymentResult(BaseModel): + payment_id: str + amount: float + status: str + + +class InventoryInput(BaseModel): + item_id: str + quantity: int = Field(gt=0) + + +class InventoryResult(BaseModel): + reservation_id: str + item_id: str + quantity: int + status: str + + +class NotificationInput(BaseModel): + recipient: str + message: str + type: str = "email" + + +class NotificationResult(BaseModel): + notification_id: str + recipient: str + status: str + + +class UserInput(BaseModel): + email: str + username: str = Field(min_length=3) + password: str = Field(min_length=6) + + +class UserResult(BaseModel): + user_id: str + email: str + status: str + + +class OrderStatusInput(BaseModel): + order_id: str + + +class OrderStatusResult(BaseModel): + order_id: str + status: str + tracking_number: Optional[str] = None + + +class RefundInput(BaseModel): + order_id: str + amount: float = Field(gt=0) + reason: str = "Customer request" + + +class RefundResult(BaseModel): + refund_id: str + amount: float + status: str diff --git a/example/main.py b/example/main.py new file mode 100644 index 0000000..d9642a6 --- /dev/null +++ b/example/main.py @@ -0,0 +1,131 @@ +import sys +import os +sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'src')) + +from app import 
DurableApp, Service, WorkflowContext +from app.types import RetryMechanism +from fastapi import FastAPI +from demo.models import * +from demo.actions import * +import uvicorn + + +def main(): + app = FastAPI(title="Durable Execution Demo", version="1.0.0") + + order_service = Service("orders") + user_service = Service("users") + payment_service = Service("payments") + + @order_service.workflow() + async def process_order(input: OrderInput, ctx: WorkflowContext) -> dict: + + payment_result = await ctx.execute_action( + action=validate_payment, + input_data=PaymentInput( + amount=input.total_amount, + payment_method="credit_card" + ), + max_retries=3, + retry_mechanism=RetryMechanism.EXPONENTIAL + ) + + reservations = [] + for item in input.items: + reservation = await ctx.execute_action( + action=reserve_inventory, + input_data=InventoryInput( + item_id=item.id, + quantity=item.quantity + ), + max_retries=2, + retry_mechanism=RetryMechanism.LINEAR + ) + reservations.append(reservation) + + notification_result = await ctx.execute_action( + action=send_notification, + input_data=NotificationInput( + recipient=input.customer_email, + message=f"Order {input.order_id} confirmed", + type="email" + ), + max_retries=2, + retry_mechanism=RetryMechanism.CONSTANT + ) + + return { + "order_id": input.order_id, + "status": "completed", + "payment": payment_result.dict(), + "reservations": [r.dict() for r in reservations], + "notification": notification_result.dict() + } + + + @order_service.workflow() + def get_order_status(input: OrderStatusInput, ctx: WorkflowContext) -> OrderStatusResult: + result = ctx.execute_action( + action=check_order_status, + input_data=input.order_id, + max_retries=1, + retry_mechanism=RetryMechanism.CONSTANT + ) + + return OrderStatusResult( + order_id=result["order_id"], + status=result["status"], + tracking_number=result.get("tracking_number") + ) + + @user_service.workflow() + async def register_user(input: UserInput, ctx: WorkflowContext) -> 
dict: + user_result = await ctx.execute_action( + action=create_user, + input_data=input, + max_retries=2, + retry_mechanism=RetryMechanism.EXPONENTIAL + ) + + notification_result = await ctx.execute_action( + action=send_notification, + input_data=NotificationInput( + recipient=input.email, + message=f"Welcome {input.username}!", + type="email" + ), + max_retries=1, + retry_mechanism=RetryMechanism.CONSTANT + ) + + return { + "success": True, + "user": user_result.dict(), + "notification": notification_result.dict() + } + + @payment_service.workflow() + async def process_refund(input: RefundInput, ctx: WorkflowContext) -> dict: + refund_result = await ctx.execute_action( + action=process_refund, + input_data=input, + max_retries=3, + retry_mechanism=RetryMechanism.EXPONENTIAL + ) + + return { + "order_id": input.order_id, + "refund": refund_result.dict(), + "status": "completed" + } + + durable_app = DurableApp(app) + + return app + + +if __name__ == "__main__": + app = main() + print("🚀 Demo Server Starting...") + print("Services: orders, users, payments") + uvicorn.run(app, host="0.0.0.0", port=8000) \ No newline at end of file From bf8919b96f31c9d25615d5755269b5caffe571a9 Mon Sep 17 00:00:00 2001 From: Salma Elsoly Date: Fri, 20 Jun 2025 01:18:15 +0300 Subject: [PATCH 02/22] initial demo --- example/app/__init__.py | 1 - example/app/main.py | 7 -- example/demo/__init__.py | 0 example/demo/actions.py | 80 ++++++++++++++++++++++ example/demo/models.py | 84 +++++++++++++++++++++++ example/main.py | 140 +++++++++++++++++++++++++++++++++++++++ 6 files changed, 304 insertions(+), 8 deletions(-) delete mode 100644 example/app/__init__.py delete mode 100644 example/app/main.py create mode 100644 example/demo/__init__.py create mode 100644 example/demo/actions.py create mode 100644 example/demo/models.py create mode 100644 example/main.py diff --git a/example/app/__init__.py b/example/app/__init__.py deleted file mode 100644 index 339827c..0000000 --- 
a/example/app/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# This file is intentionally left empty. diff --git a/example/app/main.py b/example/app/main.py deleted file mode 100644 index ba72bb6..0000000 --- a/example/app/main.py +++ /dev/null @@ -1,7 +0,0 @@ -# flake8: noqa -# from fastapi import FastAPI - -# # Define a route for the root URL ("/") -# @app.get("/") -# def hello_world(): -# return {"message": "Hello, World! This is a Python app running in Docker with Uvicorn."} diff --git a/example/demo/__init__.py b/example/demo/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/example/demo/actions.py b/example/demo/actions.py new file mode 100644 index 0000000..89bc381 --- /dev/null +++ b/example/demo/actions.py @@ -0,0 +1,80 @@ +import random +import time +import asyncio +from .models import * + + +def validate_payment(input_data: PaymentInput) -> PaymentResult: + time.sleep(2.0) + + if random.random() < 0.2: + raise Exception("Payment validation failed") + + return PaymentResult( + payment_id=f"pay_{random.randint(1000, 9999)}", + amount=input_data.amount, + status="validated" + ) + + +def reserve_inventory(input_data: InventoryInput) -> InventoryResult: + time.sleep(1.5) + + if random.random() < 0.15: + raise Exception(f"Insufficient inventory for {input_data.item_id}") + + return InventoryResult( + reservation_id=f"res_{random.randint(1000, 9999)}", + item_id=input_data.item_id, + quantity=input_data.quantity, + status="reserved" + ) + + +async def send_notification(input_data: NotificationInput) -> NotificationResult: + await asyncio.sleep(1.0) + + if random.random() < 0.1: + raise Exception(f"Failed to send {input_data.type}") + + return NotificationResult( + notification_id=f"notif_{random.randint(1000, 9999)}", + recipient=input_data.recipient, + status="sent" + ) + + +def create_user(input_data: UserInput) -> UserResult: + time.sleep(1.5) + + if random.random() < 0.1: + raise Exception("User creation failed") + + return UserResult( + 
user_id=f"user_{random.randint(1000, 9999)}", + email=input_data.email, + status="active" + ) + + +def process_refund(input_data: RefundInput) -> RefundResult: + time.sleep(3.0) + + if random.random() < 0.05: + raise Exception("Refund processing failed") + + return RefundResult( + refund_id=f"ref_{random.randint(1000, 9999)}", + amount=input_data.amount, + status="processed" + ) + + +def check_order_status(order_id: str) -> dict: + time.sleep(0.5) + + return { + "order_id": order_id, + "status": random.choice(["pending", "processing", "shipped", "delivered"]), + "tracking_number": f"TRK{random.randint(100000, 999999)}" + } diff --git a/example/demo/models.py b/example/demo/models.py new file mode 100644 index 0000000..2e9c914 --- /dev/null +++ b/example/demo/models.py @@ -0,0 +1,84 @@ +from pydantic import BaseModel, Field +from typing import List, Optional + + +class OrderItem(BaseModel): + id: str + quantity: int = Field(gt=0) + price: float = Field(gt=0) + + +class OrderInput(BaseModel): + order_id: str + customer_email: str + items: List[OrderItem] + total_amount: float = Field(gt=0) + + +class PaymentInput(BaseModel): + amount: float = Field(gt=0) + payment_method: str = "credit_card" + + +class PaymentResult(BaseModel): + payment_id: str + amount: float + status: str + + +class InventoryInput(BaseModel): + item_id: str + quantity: int = Field(gt=0) + + +class InventoryResult(BaseModel): + reservation_id: str + item_id: str + quantity: int + status: str + + +class NotificationInput(BaseModel): + recipient: str + message: str + type: str = "email" + + +class NotificationResult(BaseModel): + notification_id: str + recipient: str + status: str + + +class UserInput(BaseModel): + email: str + username: str = Field(min_length=3) + password: str = Field(min_length=6) + + +class UserResult(BaseModel): + user_id: str + email: str + status: str + + +class OrderStatusInput(BaseModel): + order_id: str + + +class OrderStatusResult(BaseModel): + order_id: str + status: str 
+ tracking_number: Optional[str] = None + + +class RefundInput(BaseModel): + order_id: str + amount: float = Field(gt=0) + reason: str = "Customer request" + + +class RefundResult(BaseModel): + refund_id: str + amount: float + status: str diff --git a/example/main.py b/example/main.py new file mode 100644 index 0000000..a8ca6ea --- /dev/null +++ b/example/main.py @@ -0,0 +1,140 @@ +import sys +import os +sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'src')) + +from app import DurableApp, Service, WorkflowContext +from app.types import RetryMechanism +from fastapi import FastAPI +from demo.models import * +from demo.actions import * +import uvicorn +import asyncio + + +def main(): + app = FastAPI(title="Durable Execution Demo", version="1.0.0") + + order_service = Service("orders") + user_service = Service("users") + payment_service = Service("payments") + + @order_service.workflow() + async def process_order(input: OrderInput, ctx: WorkflowContext) -> dict: + await asyncio.sleep(0.5) + + payment_result = await ctx.execute_action( + action=validate_payment, + input_data=PaymentInput( + amount=input.total_amount, + payment_method="credit_card" + ), + max_retries=3, + retry_mechanism=RetryMechanism.EXPONENTIAL + ) + + await asyncio.sleep(0.3) + + reservations = [] + for idx, item in enumerate(input.items): + reservation = await ctx.execute_action( + action=reserve_inventory, + input_data=InventoryInput( + item_id=item.id, + quantity=item.quantity + ), + max_retries=2, + retry_mechanism=RetryMechanism.LINEAR, + action_name=f"reserve_inventory_{idx}" + ) + reservations.append(reservation) + + await asyncio.sleep(0.2) + + notification_result = await ctx.execute_action( + action=send_notification, + input_data=NotificationInput( + recipient=input.customer_email, + message=f"Order {input.order_id} confirmed", + type="email" + ), + max_retries=2, + retry_mechanism=RetryMechanism.CONSTANT + ) + + return { + "order_id": input.order_id, + "status": "completed", 
+ "payment": payment_result.dict(), + "reservations": [r.dict() for r in reservations], + "notification": notification_result.dict() + } + + + @order_service.workflow() + def get_order_status(input: OrderStatusInput, ctx: WorkflowContext) -> OrderStatusResult: + result = ctx.execute_action( + action=check_order_status, + input_data=input.order_id, + max_retries=1, + retry_mechanism=RetryMechanism.CONSTANT + ) + + return OrderStatusResult( + order_id=result["order_id"], + status=result["status"], + tracking_number=result.get("tracking_number") + ) + + @user_service.workflow() + async def register_user(input: UserInput, ctx: WorkflowContext) -> dict: + await asyncio.sleep(0.5) + + user_result = await ctx.execute_action( + action=create_user, + input_data=input, + max_retries=2, + retry_mechanism=RetryMechanism.EXPONENTIAL + ) + + notification_result = await ctx.execute_action( + action=send_notification, + input_data=NotificationInput( + recipient=input.email, + message=f"Welcome {input.username}!", + type="email" + ), + max_retries=1, + retry_mechanism=RetryMechanism.CONSTANT + ) + + return { + "success": True, + "user": user_result.dict(), + "notification": notification_result.dict() + } + + @payment_service.workflow() + async def process_refund(input: RefundInput, ctx: WorkflowContext) -> dict: + refund_result = await ctx.execute_action( + action=process_refund, + input_data=input, + max_retries=3, + retry_mechanism=RetryMechanism.EXPONENTIAL + ) + + return { + "order_id": input.order_id, + "refund": refund_result.dict(), + "status": "completed" + } + + durable_app = DurableApp(app) + + return app + + +if __name__ == "__main__": + app = main() + print("🚀 Demo Server Starting...") + print("Services: orders, users, payments") + uvicorn.run(app, host="0.0.0.0", port=8000) \ No newline at end of file From 749bb110deb37fd2c56ab827df3563fc805248b5 Mon Sep 17 00:00:00 2001 From: Salma Elsoly Date: Fri, 20 Jun 2025 16:42:37 +0300 Subject: [PATCH 03/22] add action name + 
fix discover endpoint --- src/app/app.py | 6 ++---- src/app/workflow_context.py | 12 ++++++++---- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/app/app.py b/src/app/app.py index f7cc349..a8d5bfd 100644 --- a/src/app/app.py +++ b/src/app/app.py @@ -81,10 +81,9 @@ def _discover(self): dict: A dictionary containing all registered services and their workflows. """ services = self.serviceRegistry.get_services() - return { - "services": [ + return [ { - "name": service_name, + "service_name": service_name, "workflows": [ { "name": workflow.name, @@ -97,7 +96,6 @@ def _discover(self): } for service_name, workflows in services.items() ] - } async def raise_exception( self, request: Request, exc: EndureException, _=None diff --git a/src/app/workflow_context.py b/src/app/workflow_context.py index 98ca83b..c21a540 100644 --- a/src/app/workflow_context.py +++ b/src/app/workflow_context.py @@ -65,6 +65,7 @@ async def execute_action( input_data, max_retries: int, retry_mechanism: RetryMechanism, + action_name: str = None, ) -> any: """ Execute an action with durability guarantees and automatic retry capabilities. @@ -93,6 +94,8 @@ async def execute_action( - LINEAR_BACKOFF - EXPONENTIAL_BACKOFF etc. + action_name (str, optional): Custom name for the action in logs. If not provided, + uses action.__name__. 
Returns: any: Either: @@ -135,8 +138,9 @@ def process_payment(input_data: dict) -> dict: retry_mechanism=retry_mechanism, max_retries=max_retries, ) + name = action_name if action_name is not None else action.__name__ engine_response = InternalEndureClient.send_log( - self.execution_id, log, action.__name__ + self.execution_id, log, name ) if not engine_response: raise ValueError( @@ -160,7 +164,7 @@ def process_payment(input_data: dict) -> dict: status=LogStatus.FAILED, output={"error": str(e)}, ), - action.__name__, + name, ) logging.info( f"WORKFLOW DEBUG: About to raise exception of type {type(e)}: {e}" @@ -173,7 +177,7 @@ def process_payment(input_data: dict) -> dict: InternalEndureClient.send_log( self.execution_id, log, - action.__name__, + name, ) return result except ( @@ -200,7 +204,7 @@ def process_payment(input_data: dict) -> dict: output={"error": str(e)}, ) engine_response = InternalEndureClient.send_log( - self.execution_id, log, action.__name__ + self.execution_id, log, name ) attempt += 1 retry_at_unix = engine_response.get("payload", {}).get( From e8df594c52edc5dd43e44659c82936d484965ebb Mon Sep 17 00:00:00 2001 From: Salma Elsoly Date: Fri, 20 Jun 2025 18:50:55 +0300 Subject: [PATCH 04/22] fix input output serialization --- example/main.py | 20 +++++++-------- src/app/_internal/utils.py | 46 +++++++++++++++++++++++++++++++++++ src/app/_internal/workflow.py | 10 ++++---- src/app/workflow_context.py | 9 ++++--- 4 files changed, 66 insertions(+), 19 deletions(-) diff --git a/example/main.py b/example/main.py index a8ca6ea..9777281 100644 --- a/example/main.py +++ b/example/main.py @@ -19,7 +19,7 @@ def main(): payment_service = Service("payments") @order_service.workflow() - async def process_order(input: OrderInput, ctx: WorkflowContext) -> dict: + async def process_order(ctx: WorkflowContext, input: OrderInput) -> dict: await asyncio.sleep(0.5) payment_result = await ctx.execute_action( @@ -64,14 +64,14 @@ async def process_order(input: OrderInput, 
ctx: WorkflowContext) -> dict: return { "order_id": input.order_id, "status": "completed", - "payment": payment_result.dict(), - "reservations": [r.dict() for r in reservations], - "notification": notification_result.dict() + "payment": payment_result, + "reservations": reservations, + "notification": notification_result } @order_service.workflow() - def get_order_status(input: OrderStatusInput, ctx: WorkflowContext) -> OrderStatusResult: + def get_order_status(ctx: WorkflowContext, input: OrderStatusInput) -> OrderStatusResult: result = ctx.execute_action( action=check_order_status, input_data=input.order_id, @@ -86,7 +86,7 @@ def get_order_status(input: OrderStatusInput, ctx: WorkflowContext) -> OrderStat ) @user_service.workflow() - async def register_user(input: UserInput, ctx: WorkflowContext) -> dict: + async def register_user(ctx: WorkflowContext, input: UserInput) -> dict: await asyncio.sleep(0.5) user_result = await ctx.execute_action( @@ -109,12 +109,12 @@ async def register_user(input: UserInput, ctx: WorkflowContext) -> dict: return { "success": True, - "user": user_result.dict(), - "notification": notification_result.dict() + "user": user_result, + "notification": notification_result } @payment_service.workflow() - async def process_refund(input: RefundInput, ctx: WorkflowContext) -> dict: + async def process_refund(ctx: WorkflowContext, input: RefundInput) -> dict: refund_result = await ctx.execute_action( action=process_refund, input_data=input, @@ -124,7 +124,7 @@ async def process_refund(input: RefundInput, ctx: WorkflowContext) -> dict: return { "order_id": input.order_id, - "refund": refund_result.dict(), + "refund": refund_result, "status": "completed" } diff --git a/src/app/_internal/utils.py b/src/app/_internal/utils.py index 54c77fc..45548be 100644 --- a/src/app/_internal/utils.py +++ b/src/app/_internal/utils.py @@ -1,3 +1,8 @@ +from typing import Any +from dataclasses import is_dataclass, asdict +from pydantic import BaseModel + + def 
validate_retention_period(retention: int) -> None: """ Validate that the retention period is a non-negative integer. @@ -14,3 +19,44 @@ def validate_retention_period(retention: int) -> None: raise ValueError("Retention period must be a non-negative integer.") if retention > 30: raise ValueError("Retention period cannot exceed 30 days.") + + +def serialize_data(data: Any) -> Any: + """ + Recursively serialize Pydantic models and dataclasses to JSON-compatible dictionaries. + + Args: + data: The data to serialize (can be any type) + + Returns: + The serialized data with all Pydantic models/dataclasses converted to dicts + + Examples: + >>> serialize_data(UserModel(name="John", age=30)) + {"name": "John", "age": 30} + + >>> serialize_data({"user": UserModel(name="John"), "count": 5}) + {"user": {"name": "John"}, "count": 5} + + >>> serialize_data([UserModel(name="John"), UserModel(name="Jane")]) + [{"name": "John"}, {"name": "Jane"}] + """ + if isinstance(data, BaseModel): + # Handle both Pydantic v1 (.dict()) and v2 (.model_dump()) + if hasattr(data, 'model_dump'): + return data.model_dump() + elif hasattr(data, 'dict'): + return data.dict() + else: + # Fallback: try to convert to dict manually + return {field: getattr(data, field) for field in data.__fields__} + elif is_dataclass(data): + return asdict(data) + elif isinstance(data, dict): + return {key: serialize_data(value) for key, value in data.items()} + elif isinstance(data, list): + return [serialize_data(item) for item in data] + elif isinstance(data, tuple): + return tuple(serialize_data(item) for item in data) + else: + return data diff --git a/src/app/_internal/workflow.py b/src/app/_internal/workflow.py index ef229be..3f4126f 100644 --- a/src/app/_internal/workflow.py +++ b/src/app/_internal/workflow.py @@ -9,6 +9,7 @@ from app.workflow_context import WorkflowContext from .internal_client import InternalEndureClient +from .utils import serialize_data from ..types import EndureException @@ -59,6 +60,7 @@ 
def __init__(self, func: Callable, retention_period: int = None): self.retention_period = retention_period self.input, self.output, self.input_type = self._get_io(func) + def _convert_input(self, raw_input: Any) -> Any: """ Convert raw input (typically a dict from JSON) to the expected input type. @@ -248,12 +250,10 @@ async def handler(request: Request): if asyncio.iscoroutine(output): output = await output - if isinstance(output, BaseModel): - output = output.model_dump() - elif is_dataclass(output): - output = asdict(output) + # Recursively serialize all Pydantic models and dataclasses + serialized_output = serialize_data(output) - return {"output": output} + return {"output": serialized_output} except ValueError as ve: if isinstance(ve, ValidationError): raise EndureException( diff --git a/src/app/workflow_context.py b/src/app/workflow_context.py index c21a540..4234ec3 100644 --- a/src/app/workflow_context.py +++ b/src/app/workflow_context.py @@ -6,6 +6,7 @@ from app._internal.internal_client import ( InternalEndureClient, ) +from app._internal.utils import serialize_data from app.types import ( Log, LogStatus, @@ -134,7 +135,7 @@ def process_payment(input_data: dict) -> dict: """ log = Log( status=LogStatus.STARTED, - input=input_data, + input=serialize_data(input_data), retry_mechanism=retry_mechanism, max_retries=max_retries, ) @@ -162,7 +163,7 @@ def process_payment(input_data: dict) -> dict: self.execution_id, Log( status=LogStatus.FAILED, - output={"error": str(e)}, + output=serialize_data({"error": str(e)}), ), name, ) @@ -172,7 +173,7 @@ def process_payment(input_data: dict) -> dict: raise log = Log( status=LogStatus.COMPLETED, - output=result, + output=serialize_data(result), ) InternalEndureClient.send_log( self.execution_id, @@ -201,7 +202,7 @@ def process_payment(input_data: dict) -> dict: ) log = Log( status=LogStatus.FAILED, - output={"error": str(e)}, + output=serialize_data({"error": str(e)}), ) engine_response = InternalEndureClient.send_log( 
self.execution_id, log, name From 74f473c10b9c2e73ba5005af9bd8422febf8594a Mon Sep 17 00:00:00 2001 From: Salma Elsoly Date: Fri, 20 Jun 2025 21:21:34 +0300 Subject: [PATCH 05/22] fix bad request for log --- src/app/types.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/app/types.py b/src/app/types.py index f901a12..f4b1d25 100644 --- a/src/app/types.py +++ b/src/app/types.py @@ -1,5 +1,5 @@ from dataclasses import dataclass, field -from datetime import datetime +from datetime import datetime, timezone from enum import Enum from typing import Optional @@ -23,10 +23,10 @@ def log_to_dict(log: "Log") -> dict: "input": log.input, "output": log.output, "max_retries": log.max_retries, - "retry_mechanism": ( + "retry_method": ( log.retry_mechanism.value if log.retry_mechanism else None ), - "timestamp": log.timestamp.isoformat() if log.timestamp else None, + "timestamp": log.timestamp.replace(tzinfo=timezone.utc).isoformat() if log.timestamp else None, } @@ -37,7 +37,7 @@ class Log: output: Optional[dict] = None max_retries: Optional[int] = None retry_mechanism: Optional[RetryMechanism] = None - timestamp: datetime = field(default_factory=datetime.now) + timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) def to_dict(self): """Convert Log to a dictionary for JSON serialization""" From f7541359c5bd3d16332a3d35ee03b3811dfa0f1e Mon Sep 17 00:00:00 2001 From: Salma Elsoly Date: Sat, 21 Jun 2025 00:33:51 +0300 Subject: [PATCH 06/22] increase sleep time + increase failure rate --- example/demo/actions.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/example/demo/actions.py b/example/demo/actions.py index 89bc381..2c76508 100644 --- a/example/demo/actions.py +++ b/example/demo/actions.py @@ -5,9 +5,9 @@ def validate_payment(input_data: PaymentInput) -> PaymentResult: - time.sleep(2.0) + time.sleep(8) - if random.random() < 0.2: + if random.random() < 0.5: raise 
Exception("Payment validation failed") return PaymentResult( @@ -18,9 +18,9 @@ def validate_payment(input_data: PaymentInput) -> PaymentResult: def reserve_inventory(input_data: InventoryInput) -> InventoryResult: - time.sleep(1.5) + time.sleep(10) - if random.random() < 0.15: + if random.random() < 0.60: raise Exception(f"Insufficient inventory for {input_data.item_id}") return InventoryResult( @@ -32,9 +32,9 @@ def reserve_inventory(input_data: InventoryInput) -> InventoryResult: async def send_notification(input_data: NotificationInput) -> NotificationResult: - await asyncio.sleep(1.0) + await asyncio.sleep(6) - if random.random() < 0.1: + if random.random() < 0.75: raise Exception(f"Failed to send {input_data.type}") return NotificationResult( @@ -45,9 +45,9 @@ async def send_notification(input_data: NotificationInput) -> NotificationResult def create_user(input_data: UserInput) -> UserResult: - time.sleep(1.5) + time.sleep(7) - if random.random() < 0.1: + if random.random() < 0.7: raise Exception("User creation failed") return UserResult( @@ -58,9 +58,9 @@ def create_user(input_data: UserInput) -> UserResult: def process_refund(input_data: RefundInput) -> RefundResult: - time.sleep(3.0) + time.sleep(9) - if random.random() < 0.05: + if random.random() < 0.85: raise Exception("Refund processing failed") return RefundResult( @@ -71,7 +71,7 @@ def process_refund(input_data: RefundInput) -> RefundResult: def check_order_status(order_id: str) -> dict: - time.sleep(0.5) + time.sleep(5) return { "order_id": order_id, From edd49c07849bfc1dfbbc73708fa2422e2e43001d Mon Sep 17 00:00:00 2001 From: Salma Elsoly Date: Sat, 21 Jun 2025 15:32:59 +0300 Subject: [PATCH 07/22] fix workflows raise exception --- example/demo/actions.py | 10 ++----- example/main.py | 60 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 8 deletions(-) diff --git a/example/demo/actions.py b/example/demo/actions.py index 2c76508..4c298b4 100644 --- a/example/demo/actions.py 
+++ b/example/demo/actions.py @@ -20,7 +20,7 @@ def validate_payment(input_data: PaymentInput) -> PaymentResult: def reserve_inventory(input_data: InventoryInput) -> InventoryResult: time.sleep(10) - if random.random() < 0.60: + if random.random() < 0.50: raise Exception(f"Insufficient inventory for {input_data.item_id}") return InventoryResult( @@ -34,9 +34,6 @@ def reserve_inventory(input_data: InventoryInput) -> InventoryResult: async def send_notification(input_data: NotificationInput) -> NotificationResult: await asyncio.sleep(6) - if random.random() < 0.75: - raise Exception(f"Failed to send {input_data.type}") - return NotificationResult( notification_id=f"notif_{random.randint(1000, 9999)}", recipient=input_data.recipient, @@ -47,9 +44,6 @@ async def send_notification(input_data: NotificationInput) -> NotificationResult def create_user(input_data: UserInput) -> UserResult: time.sleep(7) - if random.random() < 0.7: - raise Exception("User creation failed") - return UserResult( user_id=f"user_{random.randint(1000, 9999)}", email=input_data.email, @@ -60,7 +54,7 @@ def create_user(input_data: UserInput) -> UserResult: def process_refund(input_data: RefundInput) -> RefundResult: time.sleep(9) - if random.random() < 0.85: + if random.random() < 0.9: raise Exception("Refund processing failed") return RefundResult( diff --git a/example/main.py b/example/main.py index 9777281..0020df8 100644 --- a/example/main.py +++ b/example/main.py @@ -115,6 +115,17 @@ async def register_user(ctx: WorkflowContext, input: UserInput) -> dict: @payment_service.workflow() async def process_refund(ctx: WorkflowContext, input: RefundInput) -> dict: + await asyncio.sleep(0.3) + order_status = await ctx.execute_action( + action=check_order_status, + input_data=input.order_id, + max_retries=2, + retry_mechanism=RetryMechanism.CONSTANT, + action_name="pre_refund_order_check" + ) + + await asyncio.sleep(0.2) + refund_result = await ctx.execute_action( action=process_refund, 
input_data=input, @@ -122,12 +133,61 @@ async def process_refund(ctx: WorkflowContext, input: RefundInput) -> dict: retry_mechanism=RetryMechanism.EXPONENTIAL ) + await asyncio.sleep(0.2) + + notification_result = await ctx.execute_action( + action=send_notification, + input_data=NotificationInput( + recipient="finance@company.com", + message=f"Refund processed: ${refund_result.amount} for order {input.order_id}. Refund ID: {refund_result.refund_id}", + type="email" + ), + max_retries=2, + retry_mechanism=RetryMechanism.LINEAR + ) + return { "order_id": input.order_id, + "order_status": order_status, "refund": refund_result, + "notification": notification_result, "status": "completed" } + @payment_service.workflow() + async def verify_payment_and_notify(ctx: WorkflowContext, input: PaymentInput) -> dict: + await asyncio.sleep(0.3) + + payment_result = await ctx.execute_action( + action=validate_payment, + input_data=input, + max_retries=3, + retry_mechanism=RetryMechanism.EXPONENTIAL, + action_name="primary_payment_validation" + ) + + await asyncio.sleep(0.2) + + + notification_result = await ctx.execute_action( + action=send_notification, + input_data=NotificationInput( + recipient="admin@company.com", + message=f"Payment of ${payment_result.amount} validated with ID {payment_result.payment_id}", + type="email" + ), + max_retries=2, + retry_mechanism=RetryMechanism.LINEAR + ) + + return { + "payment_id": payment_result.payment_id, + "amount": payment_result.amount, + "payment_status": payment_result.status, + "notification": notification_result, + "workflow_status": "completed" + } + durable_app = DurableApp(app) return app From 8ec519848f922d673b11691cf9d516fe760cf3de Mon Sep 17 00:00:00 2001 From: Salma Elsoly Date: Sat, 21 Jun 2025 16:13:50 +0300 Subject: [PATCH 08/22] format and linting --- .flake8 | 3 +- example/demo/actions.py | 41 +++--- example/demo/models.py | 3 +- example/main.py | 179 ++++++++++++------------ src/app/__init__.py | 10 +- 
src/app/_internal/__init__.py | 1 + src/app/_internal/internal_client.py | 4 +- src/app/_internal/utils.py | 17 +-- src/app/_internal/workflow.py | 43 +++--- src/app/app.py | 32 ++--- src/app/service.py | 6 +- src/app/types.py | 10 +- src/app/workflow_context.py | 21 ++- tests/conftest.py | 15 +- tests/internal/test_internal_client.py | 4 +- tests/internal/test_service_registry.py | 8 +- tests/internal/test_workflow.py | 12 +- tests/test_app.py | 5 +- tests/test_service.py | 3 +- tests/test_workflow_context.py | 14 +- 20 files changed, 219 insertions(+), 212 deletions(-) diff --git a/.flake8 b/.flake8 index 06a52ca..f32ded8 100644 --- a/.flake8 +++ b/.flake8 @@ -1,2 +1,3 @@ [flake8] -max-line-length = 115 \ No newline at end of file +max-line-length = 115 +ignore = F403, F401, F405, W503, E302 diff --git a/example/demo/actions.py b/example/demo/actions.py index 4c298b4..0d48eb2 100644 --- a/example/demo/actions.py +++ b/example/demo/actions.py @@ -1,74 +1,79 @@ +import asyncio import random import time -import asyncio + from .models import * def validate_payment(input_data: PaymentInput) -> PaymentResult: time.sleep(8) - + if random.random() < 0.5: raise Exception("Payment validation failed") - + return PaymentResult( payment_id=f"pay_{random.randint(1000, 9999)}", amount=input_data.amount, - status="validated" + status="validated", ) def reserve_inventory(input_data: InventoryInput) -> InventoryResult: time.sleep(10) - + if random.random() < 0.50: raise Exception(f"Insufficient inventory for {input_data.item_id}") - + return InventoryResult( reservation_id=f"res_{random.randint(1000, 9999)}", item_id=input_data.item_id, quantity=input_data.quantity, - status="reserved" + status="reserved", ) -async def send_notification(input_data: NotificationInput) -> NotificationResult: +async def send_notification( + input_data: NotificationInput, +) -> NotificationResult: await asyncio.sleep(6) - + return NotificationResult( notification_id=f"notif_{random.randint(1000, 
9999)}", recipient=input_data.recipient, - status="sent" + status="sent", ) def create_user(input_data: UserInput) -> UserResult: time.sleep(7) - + return UserResult( user_id=f"user_{random.randint(1000, 9999)}", email=input_data.email, - status="active" + status="active", ) def process_refund(input_data: RefundInput) -> RefundResult: time.sleep(9) - + if random.random() < 0.9: raise Exception("Refund processing failed") - + return RefundResult( refund_id=f"ref_{random.randint(1000, 9999)}", amount=input_data.amount, - status="processed" + status="processed", ) def check_order_status(order_id: str) -> dict: time.sleep(5) - + return { "order_id": order_id, - "status": random.choice(["pending", "processing", "shipped", "delivered"]), - "tracking_number": f"TRK{random.randint(100000, 999999)}" + "status": random.choice( + ["pending", "processing", "shipped", "delivered"] + ), + "tracking_number": f"TRK{random.randint(100000, 999999)}", } diff --git a/example/demo/models.py b/example/demo/models.py index 2e9c914..b78ce64 100644 --- a/example/demo/models.py +++ b/example/demo/models.py @@ -1,6 +1,7 @@ -from pydantic import BaseModel, Field from typing import List, Optional +from pydantic import BaseModel, Field + class OrderItem(BaseModel): id: str diff --git a/example/main.py b/example/main.py index 0020df8..3e47172 100644 --- a/example/main.py +++ b/example/main.py @@ -1,116 +1,113 @@ -import sys -import os -sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'src')) +import asyncio + +import uvicorn +from demo.actions import * +from demo.models import * +from fastapi import FastAPI from app import DurableApp, Service, WorkflowContext from app.types import RetryMechanism -from fastapi import FastAPI -from demo.models import * -from demo.actions import * -import uvicorn -import asyncio def main(): app = FastAPI(title="Durable Execution Demo", version="1.0.0") - + order_service = Service("orders") user_service = Service("users") payment_service = 
Service("payments") @order_service.workflow() async def process_order(ctx: WorkflowContext, input: OrderInput) -> dict: - await asyncio.sleep(0.5) + await asyncio.sleep(0.5) - payment_result = await ctx.execute_action( - action=validate_payment, - input_data=PaymentInput( - amount=input.total_amount, - payment_method="credit_card" - ), - max_retries=3, - retry_mechanism=RetryMechanism.EXPONENTIAL - ) - - await asyncio.sleep(0.3) - - reservations = [] - for idx, item in enumerate(input.items): - reservation = await ctx.execute_action( - action=reserve_inventory, - input_data=InventoryInput( - item_id=item.id, - quantity=item.quantity - ), - max_retries=2, - retry_mechanism=RetryMechanism.LINEAR, - action_name=f"reserve_inventory_{idx}" - ) - reservations.append(reservation) - - await asyncio.sleep(0.2) - - notification_result = await ctx.execute_action( - action=send_notification, - input_data=NotificationInput( - recipient=input.customer_email, - message=f"Order {input.order_id} confirmed", - type="email" + payment_result = await ctx.execute_action( + action=validate_payment, + input_data=PaymentInput( + amount=input.total_amount, payment_method="credit_card" + ), + max_retries=3, + retry_mechanism=RetryMechanism.EXPONENTIAL, + ) + + await asyncio.sleep(0.3) + + reservations = [] + for idx, item in enumerate(input.items): + reservation = await ctx.execute_action( + action=reserve_inventory, + input_data=InventoryInput( + item_id=item.id, quantity=item.quantity ), max_retries=2, - retry_mechanism=RetryMechanism.CONSTANT + retry_mechanism=RetryMechanism.LINEAR, + action_name=f"reserve_inventory_{idx}", ) - - return { - "order_id": input.order_id, - "status": "completed", - "payment": payment_result, - "reservations": reservations, - "notification": notification_result - } + reservations.append(reservation) + + await asyncio.sleep(0.2) + + notification_result = await ctx.execute_action( + action=send_notification, + input_data=NotificationInput( + 
recipient=input.customer_email, + message=f"Order {input.order_id} confirmed", + type="email", + ), + max_retries=2, + retry_mechanism=RetryMechanism.CONSTANT, + ) + return { + "order_id": input.order_id, + "status": "completed", + "payment": payment_result, + "reservations": reservations, + "notification": notification_result, + } @order_service.workflow() - def get_order_status(ctx: WorkflowContext, input: OrderStatusInput) -> OrderStatusResult: + def get_order_status( + ctx: WorkflowContext, input: OrderStatusInput + ) -> OrderStatusResult: result = ctx.execute_action( action=check_order_status, input_data=input.order_id, max_retries=1, - retry_mechanism=RetryMechanism.CONSTANT + retry_mechanism=RetryMechanism.CONSTANT, ) - + return OrderStatusResult( order_id=result["order_id"], status=result["status"], - tracking_number=result.get("tracking_number") + tracking_number=result.get("tracking_number"), ) @user_service.workflow() async def register_user(ctx: WorkflowContext, input: UserInput) -> dict: await asyncio.sleep(0.5) - + user_result = await ctx.execute_action( action=create_user, input_data=input, max_retries=2, - retry_mechanism=RetryMechanism.EXPONENTIAL + retry_mechanism=RetryMechanism.EXPONENTIAL, ) - + notification_result = await ctx.execute_action( action=send_notification, input_data=NotificationInput( recipient=input.email, message=f"Welcome {input.username}!", - type="email" + type="email", ), max_retries=1, - retry_mechanism=RetryMechanism.CONSTANT + retry_mechanism=RetryMechanism.CONSTANT, ) - + return { "success": True, "user": user_result, - "notification": notification_result + "notification": notification_result, } @payment_service.workflow() @@ -121,74 +118,78 @@ async def process_refund(ctx: WorkflowContext, input: RefundInput) -> dict: input_data=input.order_id, max_retries=2, retry_mechanism=RetryMechanism.CONSTANT, - action_name="pre_refund_order_check" + action_name="pre_refund_order_check", ) - - await asyncio.sleep(0.2) - + + await 
asyncio.sleep(0.2) + refund_result = await ctx.execute_action( - action=process_refund, + action=process_refund_action, input_data=input, max_retries=3, - retry_mechanism=RetryMechanism.EXPONENTIAL + retry_mechanism=RetryMechanism.EXPONENTIAL, ) - + await asyncio.sleep(0.2) - + notification_result = await ctx.execute_action( action=send_notification, input_data=NotificationInput( recipient="finance@company.com", - message=f"Refund processed: ${refund_result.amount} for order {input.order_id}. Refund ID: {refund_result.refund_id}", - type="email" + message=( + f"Refund processed: ${refund_result.amount} for order " + f"{input.order_id}. Refund ID: {refund_result.refund_id}" + ), + type="email", ), max_retries=2, - retry_mechanism=RetryMechanism.LINEAR + retry_mechanism=RetryMechanism.LINEAR, ) - + return { "order_id": input.order_id, "order_status": order_status, "refund": refund_result, "notification": notification_result, - "status": "completed" + "status": "completed", } @payment_service.workflow() - async def verify_payment_and_notify(ctx: WorkflowContext, input: PaymentInput) -> dict: + async def verify_payment_and_notify( + ctx: WorkflowContext, input: PaymentInput + ) -> dict: await asyncio.sleep(0.3) - + payment_result = await ctx.execute_action( action=validate_payment, input_data=input, max_retries=3, retry_mechanism=RetryMechanism.EXPONENTIAL, - action_name="primary_payment_validation" + action_name="primary_payment_validation", ) - + await asyncio.sleep(0.2) - - + notification_result = await ctx.execute_action( action=send_notification, input_data=NotificationInput( recipient="admin@company.com", message=f"Payment of ${payment_result.amount} validated with ID {payment_result.payment_id}", - type="email" + type="email", ), max_retries=2, - retry_mechanism=RetryMechanism.LINEAR + retry_mechanism=RetryMechanism.LINEAR, ) - + return { "payment_id": payment_result.payment_id, "amount": payment_result.amount, "payment_status": payment_result.status, 
"notification": notification_result, - "workflow_status": "completed" + "workflow_status": "completed", } - durable_app = DurableApp(app) + DurableApp(app) return app @@ -197,4 +198,4 @@ async def verify_payment_and_notify(ctx: WorkflowContext, input: PaymentInput) - app = main() print("🚀 Demo Server Starting...") print("Services: orders, users, payments") - uvicorn.run(app, host="0.0.0.0", port=8000) \ No newline at end of file + uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/src/app/__init__.py b/src/app/__init__.py index ce23bf1..41d8912 100644 --- a/src/app/__init__.py +++ b/src/app/__init__.py @@ -1,14 +1,8 @@ from .app import DurableApp from .service import Service +from .types import (EndureException, ErrorResponse, Log, LogStatus, Response, + RetryMechanism) from .workflow_context import WorkflowContext -from .types import ( - EndureException, - ErrorResponse, - Response, - Log, - LogStatus, - RetryMechanism, -) __all__ = [ "DurableApp", diff --git a/src/app/_internal/__init__.py b/src/app/_internal/__init__.py index 0dffed3..15040b9 100644 --- a/src/app/_internal/__init__.py +++ b/src/app/_internal/__init__.py @@ -2,6 +2,7 @@ Internal implementation details. Do not import directly. This module is for internal use by app.py, service.py, and workflow_context.py only. 
""" + from .internal_client import InternalEndureClient from .service_registry import ServiceRegistry from .utils import validate_retention_period diff --git a/src/app/_internal/internal_client.py b/src/app/_internal/internal_client.py index fa01686..b5639a7 100644 --- a/src/app/_internal/internal_client.py +++ b/src/app/_internal/internal_client.py @@ -1,6 +1,8 @@ -import os import logging +import os + import requests + from ..types import Log, Response diff --git a/src/app/_internal/utils.py b/src/app/_internal/utils.py index 45548be..a878372 100644 --- a/src/app/_internal/utils.py +++ b/src/app/_internal/utils.py @@ -1,5 +1,6 @@ +from dataclasses import asdict, is_dataclass from typing import Any -from dataclasses import is_dataclass, asdict + from pydantic import BaseModel @@ -24,28 +25,28 @@ def validate_retention_period(retention: int) -> None: def serialize_data(data: Any) -> Any: """ Recursively serialize Pydantic models and dataclasses to JSON-compatible dictionaries. - + Args: data: The data to serialize (can be any type) - + Returns: The serialized data with all Pydantic models/dataclasses converted to dicts - + Examples: >>> serialize_data(UserModel(name="John", age=30)) {"name": "John", "age": 30} - + >>> serialize_data({"user": UserModel(name="John"), "count": 5}) {"user": {"name": "John"}, "count": 5} - + >>> serialize_data([UserModel(name="John"), UserModel(name="Jane")]) [{"name": "John"}, {"name": "Jane"}] """ if isinstance(data, BaseModel): # Handle both Pydantic v1 (.dict()) and v2 (.model_dump()) - if hasattr(data, 'model_dump'): + if hasattr(data, "model_dump"): return data.model_dump() - elif hasattr(data, 'dict'): + elif hasattr(data, "dict"): return data.dict() else: # Fallback: try to convert to dict manually diff --git a/src/app/_internal/workflow.py b/src/app/_internal/workflow.py index 3f4126f..0a5e1b0 100644 --- a/src/app/_internal/workflow.py +++ b/src/app/_internal/workflow.py @@ -1,16 +1,16 @@ import asyncio -import requests -from 
typing import Any, Callable, Union, get_type_hints, get_origin, get_args -from dataclasses import is_dataclass, asdict +from dataclasses import is_dataclass +from typing import Any, Callable, Union, get_args, get_origin, get_type_hints -from fastapi import Request, status, HTTPException, types -from pydantic import ValidationError, BaseModel +import requests +from fastapi import HTTPException, Request, status, types +from pydantic import BaseModel, ValidationError from app.workflow_context import WorkflowContext +from ..types import EndureException from .internal_client import InternalEndureClient from .utils import serialize_data -from ..types import EndureException class Workflow: @@ -60,31 +60,35 @@ def __init__(self, func: Callable, retention_period: int = None): self.retention_period = retention_period self.input, self.output, self.input_type = self._get_io(func) - def _convert_input(self, raw_input: Any) -> Any: """ Convert raw input (typically a dict from JSON) to the expected input type. 
- + Args: raw_input: The raw input value from the request - + Returns: The converted input value """ # If no type hint or Any, pass through as-is if self.input_type is Any or self.input_type is None: return raw_input - + # If input is already the correct type, pass through if not isinstance(raw_input, dict): return raw_input - + # Check if expected type is a Pydantic model or dataclass try: - if isinstance(self.input_type, type) and (issubclass(self.input_type, BaseModel) or is_dataclass(self.input_type)): + if isinstance(self.input_type, type) and ( + issubclass(self.input_type, BaseModel) + or is_dataclass(self.input_type) + ): return self.input_type(**raw_input) except (TypeError, ValidationError, ValueError) as e: - raise ValueError(f"Failed to convert input to {self.input_type.__name__}: {e}") + raise ValueError( + f"Failed to convert input to {self.input_type.__name__}: {e}" + ) # For all other cases, pass through as-is return raw_input @@ -174,7 +178,7 @@ def _get_io(self, func): Returns: tuple: A tuple containing (input_type_description, output_type_description, input_type), where: - input_type_description: String representation of input type for discovery - - output_type_description: String representation of output type for discovery + - output_type_description: String representation of output type for discovery - input_type: The actual input type for automatic conversion If type hints aren't provided, "Any" is used as a fallback. 
@@ -188,7 +192,7 @@ def _get_io(self, func): return ( self._get_type_description(input_type), self._get_type_description(output_type), - input_type + input_type, ) def get_handler_route(self): @@ -242,17 +246,16 @@ async def handler(request: Request): InternalEndureClient.mark_execution_as_running( body["execution_id"] ) - - + converted_input = self._convert_input(body["input"]) - + output = self.func(ctx, converted_input) if asyncio.iscoroutine(output): output = await output - + # Recursively serialize all Pydantic models and dataclasses serialized_output = serialize_data(output) - + return {"output": serialized_output} except ValueError as ve: if isinstance(ve, ValidationError): diff --git a/src/app/app.py b/src/app/app.py index a8d5bfd..109af90 100644 --- a/src/app/app.py +++ b/src/app/app.py @@ -3,9 +3,7 @@ from fastapi import FastAPI, Request from fastapi.responses import JSONResponse -from app._internal import ( - ServiceRegistry, -) +from app._internal import ServiceRegistry from app.types import EndureException, ErrorResponse @@ -82,20 +80,20 @@ def _discover(self): """ services = self.serviceRegistry.get_services() return [ - { - "service_name": service_name, - "workflows": [ - { - "name": workflow.name, - "input": workflow.input, - "output": workflow.output, - "idem_retention": workflow.retention_period, - } - for workflow in workflows - ], - } - for service_name, workflows in services.items() - ] + { + "service_name": service_name, + "workflows": [ + { + "name": workflow.name, + "input": workflow.input, + "output": workflow.output, + "idem_retention": workflow.retention_period, + } + for workflow in workflows + ], + } + for service_name, workflows in services.items() + ] async def raise_exception( self, request: Request, exc: EndureException, _=None diff --git a/src/app/service.py b/src/app/service.py index a008d98..4b4ccf9 100644 --- a/src/app/service.py +++ b/src/app/service.py @@ -1,8 +1,4 @@ -from app._internal import ( - ServiceRegistry, - Workflow, 
- validate_retention_period, -) +from app._internal import ServiceRegistry, Workflow, validate_retention_period from app.workflow_context import WorkflowContext diff --git a/src/app/types.py b/src/app/types.py index f4b1d25..2c28e7f 100644 --- a/src/app/types.py +++ b/src/app/types.py @@ -26,7 +26,11 @@ def log_to_dict(log: "Log") -> dict: "retry_method": ( log.retry_mechanism.value if log.retry_mechanism else None ), - "timestamp": log.timestamp.replace(tzinfo=timezone.utc).isoformat() if log.timestamp else None, + "timestamp": ( + log.timestamp.replace(tzinfo=timezone.utc).isoformat() + if log.timestamp + else None + ), } @@ -37,7 +41,9 @@ class Log: output: Optional[dict] = None max_retries: Optional[int] = None retry_mechanism: Optional[RetryMechanism] = None - timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc)) + timestamp: datetime = field( + default_factory=lambda: datetime.now(timezone.utc) + ) def to_dict(self): """Convert Log to a dictionary for JSON serialization""" diff --git a/src/app/workflow_context.py b/src/app/workflow_context.py index 4234ec3..2ca7919 100644 --- a/src/app/workflow_context.py +++ b/src/app/workflow_context.py @@ -1,20 +1,15 @@ -import time -import logging import asyncio -from fastapi import status +import logging +import time + import requests -from app._internal.internal_client import ( - InternalEndureClient, -) -from app._internal.utils import serialize_data -from app.types import ( - Log, - LogStatus, - RetryMechanism, - EndureException, -) +from fastapi import status from pydantic import ValidationError +from app._internal.internal_client import InternalEndureClient +from app._internal.utils import serialize_data +from app.types import EndureException, Log, LogStatus, RetryMechanism + class WorkflowContext: """ diff --git a/tests/conftest.py b/tests/conftest.py index c7200e6..1cee49c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,15 +1,10 @@ import os -import pytest from unittest.mock 
import AsyncMock, Mock, patch -from app import ( - DurableApp, - Service, - WorkflowContext, - Log, - LogStatus, - RetryMechanism, - Response, -) + +import pytest + +from app import (DurableApp, Log, LogStatus, Response, RetryMechanism, Service, + WorkflowContext) from app._internal import InternalEndureClient, ServiceRegistry diff --git a/tests/internal/test_internal_client.py b/tests/internal/test_internal_client.py index b7aa0de..ed20df2 100644 --- a/tests/internal/test_internal_client.py +++ b/tests/internal/test_internal_client.py @@ -1,8 +1,8 @@ import os -import pytest from unittest.mock import patch -from fastapi import status +import pytest +from fastapi import status from app._internal.internal_client import InternalEndureClient from app.types import Log, LogStatus, Response diff --git a/tests/internal/test_service_registry.py b/tests/internal/test_service_registry.py index 2ae179b..03b23f5 100644 --- a/tests/internal/test_service_registry.py +++ b/tests/internal/test_service_registry.py @@ -1,9 +1,11 @@ -import pytest -from app._internal.workflow import Workflow, WorkflowContext -from app._internal.service_registry import ServiceRegistry from typing import Dict + +import pytest from fastapi import APIRouter +from app._internal.service_registry import ServiceRegistry +from app._internal.workflow import Workflow, WorkflowContext + class TestServiceRegistry: @pytest.fixture(autouse=True) diff --git a/tests/internal/test_workflow.py b/tests/internal/test_workflow.py index 74f3c5e..ee6a2af 100644 --- a/tests/internal/test_workflow.py +++ b/tests/internal/test_workflow.py @@ -1,12 +1,14 @@ -import pytest -from unittest.mock import AsyncMock, patch from typing import Any +from unittest.mock import AsyncMock, patch + +import pytest +from fastapi import HTTPException +from pydantic import BaseModel +from starlette.responses import Response + from app._internal.workflow import Workflow from app.types import EndureException from app.workflow_context import 
WorkflowContext -from starlette.responses import Response -from fastapi import HTTPException -from pydantic import BaseModel class InputModel: diff --git a/tests/test_app.py b/tests/test_app.py index 6ea29d0..e7b75d1 100644 --- a/tests/test_app.py +++ b/tests/test_app.py @@ -1,9 +1,10 @@ +from unittest.mock import patch + import pytest from fastapi import FastAPI from fastapi.testclient import TestClient -from unittest.mock import patch -from app import DurableApp, Service, WorkflowContext, EndureException +from app import DurableApp, EndureException, Service, WorkflowContext from app._internal import ServiceRegistry diff --git a/tests/test_service.py b/tests/test_service.py index 2bc5faf..fd69147 100644 --- a/tests/test_service.py +++ b/tests/test_service.py @@ -1,5 +1,6 @@ import pytest -from app import WorkflowContext, Service + +from app import Service, WorkflowContext from app._internal import ServiceRegistry, Workflow diff --git a/tests/test_workflow_context.py b/tests/test_workflow_context.py index 7e3bf16..118c924 100644 --- a/tests/test_workflow_context.py +++ b/tests/test_workflow_context.py @@ -1,11 +1,13 @@ -from app.types import LogStatus, RetryMechanism, Response -import pytest -from unittest.mock import patch -from fastapi import status, HTTPException +import asyncio import time -from pydantic import ValidationError, BaseModel +from unittest.mock import patch + +import pytest import requests -import asyncio +from fastapi import HTTPException, status +from pydantic import BaseModel, ValidationError + +from app.types import LogStatus, Response, RetryMechanism @pytest.mark.asyncio From 5e321e9c8267c656acdaf84eb3a1c1863bef2d8f Mon Sep 17 00:00:00 2001 From: Salma Elsoly Date: Sat, 21 Jun 2025 18:04:48 +0300 Subject: [PATCH 09/22] fix max retries log issue --- src/app/workflow_context.py | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/src/app/workflow_context.py b/src/app/workflow_context.py index 
2ca7919..479c55b 100644 --- a/src/app/workflow_context.py +++ b/src/app/workflow_context.py @@ -145,8 +145,7 @@ def process_payment(input_data: dict) -> dict: status_code = engine_response["status_code"] match status_code: case status.HTTP_201_CREATED | status.HTTP_200_OK: - attempt = 0 - while attempt <= max_retries: + while True: try: try: if asyncio.iscoroutinefunction(action): @@ -186,15 +185,6 @@ def process_payment(input_data: dict) -> dict: ) raise except Exception as e: - if attempt == max_retries: - raise EndureException( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - output={ - "error": str( - "Action failed after reaching max retries" - ) - }, - ) log = Log( status=LogStatus.FAILED, output=serialize_data({"error": str(e)}), @@ -202,7 +192,21 @@ def process_payment(input_data: dict) -> dict: engine_response = InternalEndureClient.send_log( self.execution_id, log, name ) - attempt += 1 + + engine_status = engine_response.get("status_code") + if engine_status in [ + status.HTTP_400_BAD_REQUEST, + status.HTTP_404_NOT_FOUND, + ]: + raise EndureException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + output=serialize_data({ + "error": str( + "Action failed after reaching max retries" + ) + }), + ) + retry_at_unix = engine_response.get("payload", {}).get( "retry_at" ) From 6aa10e259a2a60b65caae7cddca050cafbcc77b5 Mon Sep 17 00:00:00 2001 From: Salma Elsoly Date: Sat, 21 Jun 2025 19:04:40 +0300 Subject: [PATCH 10/22] fix discover test + workflow context --- tests/test_app.py | 14 +++++++------- tests/test_workflow_context.py | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/test_app.py b/tests/test_app.py index e7b75d1..167e040 100644 --- a/tests/test_app.py +++ b/tests/test_app.py @@ -49,11 +49,11 @@ def test_workflow(input: dict, ctx: WorkflowContext): assert response.status_code == 200 data = response.json() - assert "services" in data - assert len(data["services"]) == 1 + assert isinstance(data, list) + assert 
len(data) == 1 - service = data["services"][0] - assert service["name"] == "test_service" + service = data[0] + assert service["service_name"] == "test_service" assert len(service["workflows"]) == 1 workflow = service["workflows"][0] @@ -124,17 +124,17 @@ def workflow3(input: dict, ctx: WorkflowContext): assert response.status_code == 200 data = response.json() - assert len(data["services"]) == 2 + assert len(data) == 2 service1_data = next( - s for s in data["services"] if s["name"] == "service1" + s for s in data if s["service_name"] == "service1" ) assert len(service1_data["workflows"]) == 2 workflow_names = {w["name"] for w in service1_data["workflows"]} assert workflow_names == {"workflow1", "workflow2"} service2_data = next( - s for s in data["services"] if s["name"] == "service2" + s for s in data if s["service_name"] == "service2" ) assert len(service2_data["workflows"]) == 1 assert service2_data["workflows"][0]["name"] == "workflow3" diff --git a/tests/test_workflow_context.py b/tests/test_workflow_context.py index 118c924..992cf22 100644 --- a/tests/test_workflow_context.py +++ b/tests/test_workflow_context.py @@ -224,7 +224,7 @@ def failing_action(input_data): assert mock_sleep.call_count == 3 sleep_duration = mock_sleep.call_args[0][0] assert sleep_duration > 0 and sleep_duration <= 5 - assert mock_send_log.call_count == 4 + assert mock_send_log.call_count == 5 @pytest.mark.asyncio From 3252287837f44a71aafdfc973e9d46a3bde6ed20 Mon Sep 17 00:00:00 2001 From: Salma Elsoly Date: Sun, 22 Jun 2025 00:02:41 +0300 Subject: [PATCH 11/22] docs listing service of the demo --- example/DEMO_WORKFLOWS.md | 224 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 224 insertions(+) create mode 100644 example/DEMO_WORKFLOWS.md diff --git a/example/DEMO_WORKFLOWS.md b/example/DEMO_WORKFLOWS.md new file mode 100644 index 0000000..420a418 --- /dev/null +++ b/example/DEMO_WORKFLOWS.md @@ -0,0 +1,224 @@ +# Durable Execution Engine SDK - Demo Workflows + +This document 
provides a comprehensive overview of all workflows in the demo application, including their failure characteristics, retry configurations, and expected behavior. + +## Overview + +The demo showcases three main services with multiple workflows that demonstrate durable execution patterns, retry mechanisms, and error handling: + +- **Orders Service**: Complex multi-step workflows with dependent actions +- **Users Service**: User management with notifications +- **Payments Service**: Payment processing and refund handling + +## Services and Workflows + +### 🛒 Orders Service (`orders`) + +#### 1. Process Order (`process_order`) +**Endpoint**: `POST /execute/orders/process_order` + +A complex multi-step workflow demonstrating sequential action execution with different retry strategies. + +**Input**: `OrderInput` +```json +{ + "order_id": "string", + "customer_email": "string", + "items": [ + { + "id": "string", + "quantity": number, + "price": number + } + ], + "total_amount": number +} +``` + +**Workflow Steps**: +1. **Payment Validation** (8s processing time) + - Action: `validate_payment` + - Failure Rate: **50%** + - Max Retries: 3 + - Retry Mechanism: Exponential + - Error: "Payment validation failed" + +2. **Inventory Reservation** (10s processing time per item) + - Action: `reserve_inventory` (executed for each item) + - Failure Rate: **50%** + - Max Retries: 2 + - Retry Mechanism: Linear + - Error: "Insufficient inventory for {item_id}" + - Custom Action Names: `reserve_inventory_0`, `reserve_inventory_1`, etc. + +3. **Order Confirmation Notification** (6s processing time) + - Action: `send_notification` + - Failure Rate: **0%** (Always succeeds) + - Max Retries: 2 + - Retry Mechanism: Constant + - Recipient: Customer email + +**Expected Behavior**: Due to high failure rates in payment and inventory steps, this workflow frequently requires retries and may fail entirely if retries are exhausted. + +#### 2. 
Get Order Status (`get_order_status`) +**Endpoint**: `POST /execute/orders/get_order_status` + +A simple status check workflow that always succeeds. + +**Input**: `OrderStatusInput` +```json +{ + "order_id": "string" +} +``` + +**Workflow Steps**: +1. **Status Check** (5s processing time) + - Action: `check_order_status` + - Failure Rate: **0%** (Always succeeds) + - Max Retries: 1 + - Retry Mechanism: Constant + - Returns random status: "pending", "processing", "shipped", or "delivered" + +**Expected Behavior**: Reliable workflow that demonstrates successful action execution. + +### 👤 Users Service (`users`) + +#### 1. Register User (`register_user`) +**Endpoint**: `POST /execute/users/register_user` + +User registration workflow with welcome notification. + +**Input**: `UserInput` +```json +{ + "email": "string", + "username": "string", + "password": "string" +} +``` + +**Workflow Steps**: +1. **User Creation** (7s processing time) + - Action: `create_user` + - Failure Rate: **0%** (Always succeeds) + - Max Retries: 2 + - Retry Mechanism: Exponential + +2. **Welcome Notification** (6s processing time) + - Action: `send_notification` + - Failure Rate: **0%** (Always succeeds) + - Max Retries: 1 + - Retry Mechanism: Constant + - Message: "Welcome {username}!" + +**Expected Behavior**: Highly reliable workflow that consistently succeeds. + +### 💳 Payments Service (`payments`) + +#### 1. Process Refund (`process_refund`) +**Endpoint**: `POST /execute/payments/process_refund` + +Complex refund processing with order verification and notifications. + +**Input**: `RefundInput` +```json +{ + "order_id": "string", + "amount": number, + "reason": "string" +} +``` + +**Workflow Steps**: +1. **Pre-Refund Order Check** (5s processing time) + - Action: `check_order_status` + - Failure Rate: **0%** (Always succeeds) + - Max Retries: 2 + - Retry Mechanism: Constant + - Custom Action Name: `pre_refund_order_check` + +2. 
**Refund Processing** (9s processing time) + - Action: `process_refund` + - Failure Rate: **90%** (Very high failure rate) + - Max Retries: 3 + - Retry Mechanism: Exponential + - Error: "Refund processing failed" + +3. **Finance Notification** (6s processing time) + - Action: `send_notification` + - Failure Rate: **0%** (Always succeeds) + - Max Retries: 2 + - Retry Mechanism: Linear + - Recipient: "finance@company.com" + +**Expected Behavior**: This workflow has the highest failure rate and will frequently exhaust retries due to the 90% failure rate in refund processing. + +#### 2. Verify Payment and Notify (`verify_payment_and_notify`) +**Endpoint**: `POST /execute/payments/verify_payment_and_notify` + +Payment verification with admin notification. + +**Input**: `PaymentInput` +```json +{ + "amount": number, + "payment_method": "string" +} +``` + +**Workflow Steps**: +1. **Payment Validation** (8s processing time) + - Action: `validate_payment` + - Failure Rate: **50%** + - Max Retries: 3 + - Retry Mechanism: Exponential + - Custom Action Name: `primary_payment_validation` + +2. **Admin Notification** (6s processing time) + - Action: `send_notification` + - Failure Rate: **0%** (Always succeeds) + - Max Retries: 2 + - Retry Mechanism: Linear + - Recipient: "admin@company.com" + +**Expected Behavior**: Moderate failure rate due to payment validation step. 
+ +## Action Failure Summary + +| Action | Processing Time | Failure Rate | Typical Error | +|--------|----------------|--------------|---------------| +| `validate_payment` | 8 seconds | **50%** | "Payment validation failed" | +| `reserve_inventory` | 10 seconds | **50%** | "Insufficient inventory for {item_id}" | +| `process_refund` | 9 seconds | **90%** | "Refund processing failed" | +| `send_notification` | 6 seconds | **0%** | Never fails | +| `create_user` | 7 seconds | **0%** | Never fails | +| `check_order_status` | 5 seconds | **0%** | Never fails | + +## Retry Mechanisms + +The demo showcases three different retry mechanisms: + +1. **Exponential**: Exponentially increasing delays between retries +2. **Linear**: Delays that increase by a fixed increment with each retry +3. **Constant**: The same fixed delay between all retries + +## Testing the Demo + +### High Success Rate Workflows +- `users/register_user` - Should consistently succeed +- `orders/get_order_status` - Always succeeds + +### Moderate Failure Rate Workflows +- `orders/process_order` - 50% failure rate on payment and inventory +- `payments/verify_payment_and_notify` - 50% failure rate on payment + +### High Failure Rate Workflows +- `payments/process_refund` - 90% failure rate on refund processing + +### Recommended Test Scenarios + +1. **Success Path**: Call `register_user` or `get_order_status` +2. **Retry Demonstration**: Call `process_order` or `verify_payment_and_notify` multiple times +3. **Failure Demonstration**: Call `process_refund` to see retry exhaustion +4. 
**Complex Workflow**: Call `process_order` with multiple items to see one inventory reservation per item, executed sequentially From 713133ec2e4397efff2d35d9bfb32191ffc44a14 Mon Sep 17 00:00:00 2001 From: Salma Elsoly Date: Sun, 22 Jun 2025 01:13:53 +0300 Subject: [PATCH 12/22] fix demo actions --- example/demo/actions.py | 12 ++++++------ example/main.py | 20 ++++++++++---------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/example/demo/actions.py b/example/demo/actions.py index 0d48eb2..05ddf1e 100644 --- a/example/demo/actions.py +++ b/example/demo/actions.py @@ -5,7 +5,7 @@ from .models import * -def validate_payment(input_data: PaymentInput) -> PaymentResult: +def validate_payment_action(input_data: PaymentInput) -> PaymentResult: time.sleep(8) if random.random() < 0.5: @@ -18,7 +18,7 @@ def validate_payment(input_data: PaymentInput) -> PaymentResult: ) -def reserve_inventory(input_data: InventoryInput) -> InventoryResult: +def reserve_inventory_action(input_data: InventoryInput) -> InventoryResult: time.sleep(10) if random.random() < 0.50: @@ -32,7 +32,7 @@ def reserve_inventory(input_data: InventoryInput) -> InventoryResult: ) -async def send_notification( +async def send_notification_action( input_data: NotificationInput, ) -> NotificationResult: await asyncio.sleep(6) @@ -44,7 +44,7 @@ async def send_notification( ) -def create_user(input_data: UserInput) -> UserResult: +def create_user_action(input_data: UserInput) -> UserResult: time.sleep(7) return UserResult( @@ -54,7 +54,7 @@ def create_user(input_data: UserInput) -> UserResult: ) -def process_refund(input_data: RefundInput) -> RefundResult: +def process_refund_action(input_data: RefundInput) -> RefundResult: time.sleep(9) if random.random() < 0.9: @@ -67,7 +67,7 @@ def process_refund(input_data: RefundInput) -> RefundResult: ) -def check_order_status(order_id: str) -> dict: +def check_order_status_action(order_id: str) -> dict: time.sleep(5) return { diff --git a/example/main.py b/example/main.py index 
3e47172..6e301e4 100644 --- a/example/main.py +++ b/example/main.py @@ -21,7 +21,7 @@ async def process_order(ctx: WorkflowContext, input: OrderInput) -> dict: await asyncio.sleep(0.5) payment_result = await ctx.execute_action( - action=validate_payment, + action=validate_payment_action, input_data=PaymentInput( amount=input.total_amount, payment_method="credit_card" ), @@ -34,7 +34,7 @@ async def process_order(ctx: WorkflowContext, input: OrderInput) -> dict: reservations = [] for idx, item in enumerate(input.items): reservation = await ctx.execute_action( - action=reserve_inventory, + action=reserve_inventory_action, input_data=InventoryInput( item_id=item.id, quantity=item.quantity ), @@ -47,7 +47,7 @@ async def process_order(ctx: WorkflowContext, input: OrderInput) -> dict: await asyncio.sleep(0.2) notification_result = await ctx.execute_action( - action=send_notification, + action=send_notification_action, input_data=NotificationInput( recipient=input.customer_email, message=f"Order {input.order_id} confirmed", @@ -70,7 +70,7 @@ def get_order_status( ctx: WorkflowContext, input: OrderStatusInput ) -> OrderStatusResult: result = ctx.execute_action( - action=check_order_status, + action=check_order_status_action, input_data=input.order_id, max_retries=1, retry_mechanism=RetryMechanism.CONSTANT, @@ -87,14 +87,14 @@ async def register_user(ctx: WorkflowContext, input: UserInput) -> dict: await asyncio.sleep(0.5) user_result = await ctx.execute_action( - action=create_user, + action=create_user_action, input_data=input, max_retries=2, retry_mechanism=RetryMechanism.EXPONENTIAL, ) notification_result = await ctx.execute_action( - action=send_notification, + action=send_notification_action, input_data=NotificationInput( recipient=input.email, message=f"Welcome {input.username}!", @@ -114,7 +114,7 @@ async def register_user(ctx: WorkflowContext, input: UserInput) -> dict: async def process_refund(ctx: WorkflowContext, input: RefundInput) -> dict: await 
asyncio.sleep(0.3) order_status = await ctx.execute_action( - action=check_order_status, + action=check_order_status_action, input_data=input.order_id, max_retries=2, retry_mechanism=RetryMechanism.CONSTANT, @@ -133,7 +133,7 @@ async def process_refund(ctx: WorkflowContext, input: RefundInput) -> dict: await asyncio.sleep(0.2) notification_result = await ctx.execute_action( - action=send_notification, + action=send_notification_action, input_data=NotificationInput( recipient="finance@company.com", message=( @@ -161,7 +161,7 @@ async def verify_payment_and_notify( await asyncio.sleep(0.3) payment_result = await ctx.execute_action( - action=validate_payment, + action=validate_payment_action, input_data=input, max_retries=3, retry_mechanism=RetryMechanism.EXPONENTIAL, @@ -171,7 +171,7 @@ async def verify_payment_and_notify( await asyncio.sleep(0.2) notification_result = await ctx.execute_action( - action=send_notification, + action=send_notification_action, input_data=NotificationInput( recipient="admin@company.com", message=f"Payment of ${payment_result.amount} validated with ID {payment_result.payment_id}", From 883f17a12526ba1824624c20fcbc5bc8805e395d Mon Sep 17 00:00:00 2001 From: Farah Tharwat Date: Sun, 22 Jun 2025 13:26:46 +0300 Subject: [PATCH 13/22] added logs for SDK --- src/app/_internal/internal_client.py | 34 +++++++++++++++++++++++++ src/app/workflow_context.py | 37 +++++++++++++++++++++++++--- 2 files changed, 68 insertions(+), 3 deletions(-) diff --git a/src/app/_internal/internal_client.py b/src/app/_internal/internal_client.py index b5639a7..cf6e874 100644 --- a/src/app/_internal/internal_client.py +++ b/src/app/_internal/internal_client.py @@ -29,11 +29,17 @@ def send_log(self, execution_id: str, log: Log, action_name: str): """ # noqa: E501 try: if not self._base_url: + logging.error( + "DURABLE_ENGINE_BASE_URL is not set in environment variables." + ) raise ValueError( "DURABLE_ENGINE_BASE_URL is not set in environment variables." 
) if not log or not action_name: + logging.error( + "log and action_name must be provided." + ) raise ValueError("log and action_name must be provided.") url = ( @@ -42,10 +48,20 @@ def send_log(self, execution_id: str, log: Log, action_name: str): headers = {"Content-Type": "application/json"} payload = log.to_dict() response = requests.patch(url, headers=headers, json=payload) + logging.info( + "Log sent to the Durable Execution Engine: {}".format(log) + ) + logging.info("Response after sending log: {}".format(response)) response.raise_for_status() try: response_payload = response.json() + logging.info( + "Response payload: {}".format(response_payload) + ) except ValueError: + logging.error( + "Error parsing response payload: {}".format(e) + ) response_payload = {} response = Response( status_code=response.status_code, @@ -54,8 +70,14 @@ def send_log(self, execution_id: str, log: Log, action_name: str): except requests.exceptions.HTTPError as e: try: error_payload = e.response.json() + logging.info( + "Error payload: {}".format(error_payload) + ) except Exception: error_payload = {} + logging.error( + "Error payload: {}".format(error_payload) + ) response = Response( status_code=e.response.status_code, payload=error_payload, @@ -84,17 +106,29 @@ def mark_execution_as_running(self, execution_id: str): """ try: if not self._base_url: + logging.error( + "DURABLE_ENGINE_BASE_URL is not set in environment variables." + ) raise ValueError( "DURABLE_ENGINE_BASE_URL is not set in environment variables." 
) url = f"{self._base_url}/executions/{execution_id}/started" headers = {"Content-Type": "application/json"} response = requests.patch(url, headers=headers) + logging.info( + "Execution marked as running: {}".format(response) + ) + logging.info( + "Response after marking execution as running: {}".format(response) + ) response.raise_for_status() response = Response( status_code=response.status_code, ) except requests.exceptions.HTTPError as e: + logging.error( + "Error marking execution as running: {}".format(e) + ) response = Response( status_code=e.response.status_code, ) diff --git a/src/app/workflow_context.py b/src/app/workflow_context.py index 479c55b..4ba885d 100644 --- a/src/app/workflow_context.py +++ b/src/app/workflow_context.py @@ -138,6 +138,9 @@ def process_payment(input_data: dict) -> dict: engine_response = InternalEndureClient.send_log( self.execution_id, log, name ) + logging.info( + "Engine response: {}".format(engine_response) + ) if not engine_response: raise ValueError( "Base URL is not set in environment variables or missing required parameters (log or action_name)." 
@@ -148,10 +151,16 @@ def process_payment(input_data: dict) -> dict: while True: try: try: + logging.info( + "Executing action: {}".format(action.__name__) + ) if asyncio.iscoroutinefunction(action): result = await action(input_data) else: result = action(input_data) + logging.info( + "Action result: {}".format(result) + ) except (ValueError, ValidationError) as e: InternalEndureClient.send_log( self.execution_id, @@ -161,7 +170,7 @@ def process_payment(input_data: dict) -> dict: ), name, ) - logging.info( + logging.error( f"WORKFLOW DEBUG: About to raise exception of type {type(e)}: {e}" ) raise @@ -169,11 +178,17 @@ def process_payment(input_data: dict) -> dict: status=LogStatus.COMPLETED, output=serialize_data(result), ) + logging.info( + "Sending log for completed action: {}".format(log) + ) InternalEndureClient.send_log( self.execution_id, log, name, ) + logging.info( + "Returning result: {}".format(result) + ) return result except ( ValueError, @@ -189,15 +204,23 @@ def process_payment(input_data: dict) -> dict: status=LogStatus.FAILED, output=serialize_data({"error": str(e)}), ) + logging.info( + "Sending log for failed action: {}".format(log) + ) engine_response = InternalEndureClient.send_log( self.execution_id, log, name ) - + logging.info( + "Engine response: {}".format(engine_response) + ) engine_status = engine_response.get("status_code") if engine_status in [ status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND, ]: + logging.error( + "Raising EndureException: {}".format(e) + ) raise EndureException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, output=serialize_data({ @@ -206,14 +229,22 @@ def process_payment(input_data: dict) -> dict: ) }), ) - retry_at_unix = engine_response.get("payload", {}).get( "retry_at" ) + logging.info( + "Retry at unix: {}".format(retry_at_unix) + ) if retry_at_unix: sleep_seconds = retry_at_unix - time.time() if sleep_seconds > 0: + logging.info( + "Sleeping for {} seconds".format(sleep_seconds) + ) 
time.sleep(sleep_seconds) case status.HTTP_208_ALREADY_REPORTED: + logging.info( + "Returning cached result: {}".format(engine_response) + ) output = engine_response.get("payload", {}).get("output") return output if output else {} From dca97b7ae8ece27ec80fe025732a1548b8393bef Mon Sep 17 00:00:00 2001 From: Farah Tharwat Date: Sun, 22 Jun 2025 13:27:50 +0300 Subject: [PATCH 14/22] linting --- src/app/_internal/internal_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/app/_internal/internal_client.py b/src/app/_internal/internal_client.py index cf6e874..2337679 100644 --- a/src/app/_internal/internal_client.py +++ b/src/app/_internal/internal_client.py @@ -58,7 +58,7 @@ def send_log(self, execution_id: str, log: Log, action_name: str): logging.info( "Response payload: {}".format(response_payload) ) - except ValueError: + except ValueError as e: logging.error( "Error parsing response payload: {}".format(e) ) From 571a757b75b72de3798e7f0ac2bab5603ce402d0 Mon Sep 17 00:00:00 2001 From: Farah Tharwat Date: Sun, 22 Jun 2025 13:37:34 +0300 Subject: [PATCH 15/22] add logging --- example/Dockerfile | 2 ++ example/main.py | 6 ++++++ 2 files changed, 8 insertions(+) diff --git a/example/Dockerfile b/example/Dockerfile index f7f00b4..8f1e707 100644 --- a/example/Dockerfile +++ b/example/Dockerfile @@ -28,6 +28,8 @@ COPY --from=builder /opt/venv /opt/venv COPY . . 
# Set the environment variables +ENV PYTHONPATH=/app +ENV LOG_LEVEL=DEBUG ENV PATH="/opt/venv/bin:$PATH" # Expose the port the app runs on diff --git a/example/main.py b/example/main.py index 6e301e4..33c4474 100644 --- a/example/main.py +++ b/example/main.py @@ -1,4 +1,5 @@ import asyncio +import logging import uvicorn from demo.actions import * @@ -8,6 +9,11 @@ from app import DurableApp, Service, WorkflowContext from app.types import RetryMechanism +#logging to show all levels +logging.basicConfig( + level=logging.DEBUG, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) def main(): app = FastAPI(title="Durable Execution Demo", version="1.0.0") From 5d64dbed002083060d8161b4b3c4b0011105907f Mon Sep 17 00:00:00 2001 From: Farah Tharwat Date: Sun, 22 Jun 2025 13:38:41 +0300 Subject: [PATCH 16/22] linitng --- example/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/example/main.py b/example/main.py index 33c4474..cec5fd0 100644 --- a/example/main.py +++ b/example/main.py @@ -9,7 +9,7 @@ from app import DurableApp, Service, WorkflowContext from app.types import RetryMechanism -#logging to show all levels +# logging to show all levels logging.basicConfig( level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' From 449778a73bbd7b846c8bfc7c68987887d18a7bda Mon Sep 17 00:00:00 2001 From: Farah Tharwat Date: Sun, 22 Jun 2025 13:59:11 +0300 Subject: [PATCH 17/22] added logs --- src/app/workflow_context.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/app/workflow_context.py b/src/app/workflow_context.py index 4ba885d..7a941f4 100644 --- a/src/app/workflow_context.py +++ b/src/app/workflow_context.py @@ -162,7 +162,7 @@ def process_payment(input_data: dict) -> dict: "Action result: {}".format(result) ) except (ValueError, ValidationError) as e: - InternalEndureClient.send_log( + engine_response = InternalEndureClient.send_log( self.execution_id, Log( 
status=LogStatus.FAILED, @@ -170,6 +170,9 @@ def process_payment(input_data: dict) -> dict: ), name, ) + logging.info( + "Engine response: {}".format(engine_response) + ) logging.error( f"WORKFLOW DEBUG: About to raise exception of type {type(e)}: {e}" ) @@ -181,11 +184,14 @@ def process_payment(input_data: dict) -> dict: logging.info( "Sending log for completed action: {}".format(log) ) - InternalEndureClient.send_log( + engine_response = InternalEndureClient.send_log( self.execution_id, log, name, ) + logging.info( + "Engine response: {}".format(engine_response) + ) logging.info( "Returning result: {}".format(result) ) From 82a0f507a0b705b736b8957aebf3a792f13fc18f Mon Sep 17 00:00:00 2001 From: Salma Elsoly Date: Sun, 22 Jun 2025 14:07:47 +0300 Subject: [PATCH 18/22] add dockerfile for demo --- example/Dockerfile | 40 ++++++++++++++++++---------------------- requirements.txt | 5 +++++ 2 files changed, 23 insertions(+), 22 deletions(-) create mode 100644 requirements.txt diff --git a/example/Dockerfile b/example/Dockerfile index f7f00b4..0407f4e 100644 --- a/example/Dockerfile +++ b/example/Dockerfile @@ -1,37 +1,33 @@ -# Stage 1: Build stage FROM python:3.10-slim AS builder -# Set the working directory WORKDIR /app -# Copy the requirements file into the container -COPY requirements.txt . - -# Create a virtual environment and install dependencies -RUN python -m venv /opt/venv -ENV PATH="/opt/venv/bin:$PATH" +COPY requirements.txt ./ RUN pip install --no-cache-dir -r requirements.txt -# Copy the application code -COPY . . 
+COPY ../src ./src +COPY ./ ./example -# Stage 2: Final stage FROM python:3.10-slim -# Set the working directory to /app/Example -WORKDIR /app/Example +WORKDIR /app + +COPY --from=builder /usr/local/lib/python3.10/site-packages /usr/local/lib/python3.10/site-packages +COPY --from=builder /usr/local/bin /usr/local/bin -# Copy only the virtual environment from the builder stage -COPY --from=builder /opt/venv /opt/venv -# Copy the application code into the Example directory -COPY . . +COPY src/ ./src/ +COPY example/ ./example/ -# Set the environment variables -ENV PATH="/opt/venv/bin:$PATH" +ENV DURABLE_ENGINE_BASE_URL=http://host.docker.internal:8080/api/v1 +ENV PYTHONPATH=/app/src:/app +ENV PYTHONUNBUFFERED=1 -# Expose the port the app runs on EXPOSE 8000 -# Command to run the application using Uvicorn -CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] \ No newline at end of file +HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ + CMD curl -f http://localhost:8000/discover || exit 1 + +WORKDIR /app/example + +CMD ["python", "main.py"] \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..ccdbdf2 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +fastapi>=0.115.12,<1.0.0 +pydantic>=1.0.0,<2.0.0 +requests>=2.32.4,<3.0.0 +httpx>=0.28.1,<1.0.0 +uvicorn>=0.32.1,<1.0.0 \ No newline at end of file From 325dd91957b9cd35678212f60942597dbb1143bd Mon Sep 17 00:00:00 2001 From: Farah Tharwat Date: Sun, 22 Jun 2025 14:52:59 +0300 Subject: [PATCH 19/22] adjust logs --- src/app/_internal/internal_client.py | 51 +++++++++++++++++++--- src/app/workflow_context.py | 65 +++++++++++++++++++++++++--- 2 files changed, 103 insertions(+), 13 deletions(-) diff --git a/src/app/_internal/internal_client.py b/src/app/_internal/internal_client.py index 2337679..4057b4f 100644 --- a/src/app/_internal/internal_client.py +++ b/src/app/_internal/internal_client.py @@ -28,6 +28,9 @@ def 
send_log(self, execution_id: str, log: Log, action_name: str): requests.exceptions.HTTPError: If the request fails. """ # noqa: E501 try: + logging.info(f"Attempting to send log to engine - Execution ID: {execution_id}, Action: {action_name}") + logging.info(f"Base URL: {self._base_url}") + if not self._base_url: logging.error( "DURABLE_ENGINE_BASE_URL is not set in environment variables." @@ -47,11 +50,19 @@ def send_log(self, execution_id: str, log: Log, action_name: str): ) headers = {"Content-Type": "application/json"} payload = log.to_dict() + + logging.info(f"Making request to: {url}") + logging.info(f"Request headers: {headers}") + logging.info(f"Request payload: {payload}") + response = requests.patch(url, headers=headers, json=payload) logging.info( "Log sent to the Durable Execution Engine: {}".format(log) ) + logging.info(f"Response status code: {response.status_code}") + logging.info(f"Response headers: {dict(response.headers)}") logging.info("Response after sending log: {}".format(response)) + response.raise_for_status() try: response_payload = response.json() @@ -62,21 +73,26 @@ def send_log(self, execution_id: str, log: Log, action_name: str): logging.error( "Error parsing response payload: {}".format(e) ) + logging.error(f"Raw response text: {response.text}") response_payload = {} response = Response( status_code=response.status_code, payload=response_payload, ) except requests.exceptions.HTTPError as e: + logging.error(f"HTTP ERROR: Status {e.response.status_code}") + logging.error(f"HTTP ERROR: URL {e.response.url}") + logging.error(f"HTTP ERROR: Headers {dict(e.response.headers)}") + logging.error(f"HTTP ERROR: Text {e.response.text}") try: error_payload = e.response.json() logging.info( "Error payload: {}".format(error_payload) ) - except Exception: + except Exception as parse_error: error_payload = {} logging.error( - "Error payload: {}".format(error_payload) + f"Error parsing error payload: {parse_error}. 
Raw text: {e.response.text}" ) response = Response( status_code=e.response.status_code, @@ -84,9 +100,15 @@ def send_log(self, execution_id: str, log: Log, action_name: str): ) except requests.exceptions.RequestException as e: logging.error( - "Engine is unreachable. Aborting retries: {}".format(e) + f"NETWORK ERROR: Engine is unreachable. Error type: {type(e).__name__}" ) + logging.error(f"NETWORK ERROR: Error details: {e}") + raise e + except Exception as e: + logging.error(f"UNEXPECTED ERROR in send_log: {type(e).__name__}: {e}") raise e + + logging.info(f"Returning response: {response.to_dict()}") return response.to_dict() @classmethod @@ -105,6 +127,9 @@ def mark_execution_as_running(self, execution_id: str): requests.exceptions.HTTPError: If the request fails. """ try: + logging.info(f"Attempting to mark execution as running - Execution ID: {execution_id}") + logging.info(f"Base URL: {self._base_url}") + if not self._base_url: logging.error( "DURABLE_ENGINE_BASE_URL is not set in environment variables." 
@@ -114,10 +139,16 @@ def mark_execution_as_running(self, execution_id: str): ) url = f"{self._base_url}/executions/{execution_id}/started" headers = {"Content-Type": "application/json"} + + logging.info(f"Making request to: {url}") + logging.info(f"Request headers: {headers}") + response = requests.patch(url, headers=headers) logging.info( "Execution marked as running: {}".format(response) ) + logging.info(f"Response status code: {response.status_code}") + logging.info(f"Response headers: {dict(response.headers)}") logging.info( "Response after marking execution as running: {}".format(response) ) @@ -126,15 +157,21 @@ def mark_execution_as_running(self, execution_id: str): status_code=response.status_code, ) except requests.exceptions.HTTPError as e: - logging.error( - "Error marking execution as running: {}".format(e) - ) + logging.error(f"HTTP ERROR marking execution as running: Status {e.response.status_code}") + logging.error(f"HTTP ERROR: URL {e.response.url}") + logging.error(f"HTTP ERROR: Text {e.response.text}") response = Response( status_code=e.response.status_code, ) except requests.exceptions.RequestException as e: logging.error( - "Engine is unreachable. Aborting retries: {}".format(e) + f"NETWORK ERROR: Engine is unreachable. 
Error type: {type(e).__name__}" ) + logging.error(f"NETWORK ERROR: Error details: {e}") + raise e + except Exception as e: + logging.error(f"UNEXPECTED ERROR in mark_execution_as_running: {type(e).__name__}: {e}") raise e + + logging.info(f"Returning response: {response.to_dict()}") return response.to_dict() diff --git a/src/app/workflow_context.py b/src/app/workflow_context.py index 7a941f4..dbf07ba 100644 --- a/src/app/workflow_context.py +++ b/src/app/workflow_context.py @@ -135,6 +135,9 @@ def process_payment(input_data: dict) -> dict: max_retries=max_retries, ) name = action_name if action_name is not None else action.__name__ + logging.info( + "Sending log for action: {}".format(log) + ) engine_response = InternalEndureClient.send_log( self.execution_id, log, name ) @@ -142,12 +145,26 @@ def process_payment(input_data: dict) -> dict: "Engine response: {}".format(engine_response) ) if not engine_response: + logging.error( + "CRITICAL ERROR: Engine response is None or empty. This indicates a communication failure with the durable engine." + ) raise ValueError( "Base URL is not set in environment variables or missing required parameters (log or action_name)." 
) + + # Log detailed response information + logging.info( + f"Detailed engine response - Status: {engine_response.get('status_code')}, " + f"Payload: {engine_response.get('payload')}, " + f"Headers: {engine_response.get('headers', {})}" + ) + status_code = engine_response["status_code"] + logging.info(f"Processing status code: {status_code}") + match status_code: case status.HTTP_201_CREATED | status.HTTP_200_OK: + logging.info(f"Status {status_code} - Proceeding with action execution") while True: try: try: @@ -162,6 +179,9 @@ def process_payment(input_data: dict) -> dict: "Action result: {}".format(result) ) except (ValueError, ValidationError) as e: + logging.error( + f"VALIDATION ERROR in action {action.__name__}: {type(e).__name__}: {e}" + ) engine_response = InternalEndureClient.send_log( self.execution_id, Log( @@ -171,7 +191,7 @@ def process_payment(input_data: dict) -> dict: name, ) logging.info( - "Engine response: {}".format(engine_response) + "Engine response after validation error: {}".format(engine_response) ) logging.error( f"WORKFLOW DEBUG: About to raise exception of type {type(e)}: {e}" @@ -190,7 +210,7 @@ def process_payment(input_data: dict) -> dict: name, ) logging.info( - "Engine response: {}".format(engine_response) + "Engine response after completion: {}".format(engine_response) ) logging.info( "Returning result: {}".format(result) @@ -201,11 +221,20 @@ def process_payment(input_data: dict) -> dict: ValidationError, requests.exceptions.RequestException, ) as e: - logging.debug( - f"DEBUG: Caught exception of type {type(e)}: {e}" + logging.error( + f"CRITICAL ERROR: Caught exception of type {type(e)}: {e}" + ) + logging.error( + f"Exception details - Args: {e.args}, Traceback: {type(e).__name__}" ) raise except Exception as e: + logging.error( + f"UNEXPECTED ERROR in action {action.__name__}: {type(e).__name__}: {e}" + ) + logging.error( + f"Error details - Args: {e.args}, Traceback: {type(e).__name__}" + ) log = Log( status=LogStatus.FAILED, 
output=serialize_data({"error": str(e)}), @@ -217,15 +246,18 @@ def process_payment(input_data: dict) -> dict: self.execution_id, log, name ) logging.info( - "Engine response: {}".format(engine_response) + "Engine response after failure: {}".format(engine_response) ) engine_status = engine_response.get("status_code") + logging.info(f"Engine status after failure: {engine_status}") + if engine_status in [ status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND, ]: logging.error( - "Raising EndureException: {}".format(e) + f"ENGINE ERROR: Received {engine_status} from engine. " + f"Original error: {type(e).__name__}: {e}" ) raise EndureException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, @@ -248,9 +280,30 @@ def process_payment(input_data: dict) -> dict: "Sleeping for {} seconds".format(sleep_seconds) ) time.sleep(sleep_seconds) + else: + logging.warning( + f"Retry time {retry_at_unix} is in the past. " + f"Current time: {time.time()}" + ) + else: + logging.error( + "CRITICAL ERROR: No retry_at time provided by engine. " + f"Engine response: {engine_response}" + ) + raise RuntimeError( + f"Engine did not provide retry_at time. Response: {engine_response}" + ) case status.HTTP_208_ALREADY_REPORTED: logging.info( "Returning cached result: {}".format(engine_response) ) output = engine_response.get("payload", {}).get("output") return output if output else {} + case _: + logging.error( + f"UNEXPECTED STATUS CODE: {status_code}. 
" + f"Full response: {engine_response}" + ) + raise RuntimeError( + f"Unexpected status code {status_code} from engine: {engine_response}" + ) From cda13164e176538e06a3f6a6a5aeb0e23a23d26c Mon Sep 17 00:00:00 2001 From: Farah Tharwat Date: Sun, 22 Jun 2025 15:11:35 +0300 Subject: [PATCH 20/22] added more logs --- src/app/workflow_context.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/src/app/workflow_context.py b/src/app/workflow_context.py index dbf07ba..4dd4f73 100644 --- a/src/app/workflow_context.py +++ b/src/app/workflow_context.py @@ -152,7 +152,6 @@ def process_payment(input_data: dict) -> dict: "Base URL is not set in environment variables or missing required parameters (log or action_name)." ) - # Log detailed response information logging.info( f"Detailed engine response - Status: {engine_response.get('status_code')}, " f"Payload: {engine_response.get('payload')}, " @@ -254,11 +253,24 @@ def process_payment(input_data: dict) -> dict: if engine_status in [ status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND, + status.HTTP_409_CONFLICT, ]: logging.error( f"ENGINE ERROR: Received {engine_status} from engine. " f"Original error: {type(e).__name__}: {e}" ) + if engine_status == status.HTTP_409_CONFLICT: + logging.error( + "Execution Paused or Terminated , no retries will be attempted." + ) + raise EndureException( + status_code=engine_status, + output=serialize_data({ + "error": str( + "Execution Paused or Terminated" + ) + }), + ) raise EndureException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, output=serialize_data({ @@ -287,11 +299,11 @@ def process_payment(input_data: dict) -> dict: ) else: logging.error( - "CRITICAL ERROR: No retry_at time provided by engine. " + "CRITICAL ERROR: No retry_at time provided by engine for retryable status {engine_status}. " f"Engine response: {engine_response}" ) raise RuntimeError( - f"Engine did not provide retry_at time. 
Response: {engine_response}" + f"Engine did not provide retry_at time for retryable status {engine_status}. Response: {engine_response}" ) case status.HTTP_208_ALREADY_REPORTED: logging.info( From 61a8b1e76472335ff3edcc5fa8e63a21a508adb7 Mon Sep 17 00:00:00 2001 From: Farah Tharwat Date: Sun, 22 Jun 2025 15:15:00 +0300 Subject: [PATCH 21/22] linting --- example/main.py | 3 +- src/app/__init__.py | 10 +++- src/app/_internal/internal_client.py | 62 ++++++++++---------- src/app/workflow_context.py | 85 +++++++++++++++------------- tests/conftest.py | 11 +++- 5 files changed, 98 insertions(+), 73 deletions(-) diff --git a/example/main.py b/example/main.py index cec5fd0..7ea50db 100644 --- a/example/main.py +++ b/example/main.py @@ -12,9 +12,10 @@ # logging to show all levels logging.basicConfig( level=logging.DEBUG, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", ) + def main(): app = FastAPI(title="Durable Execution Demo", version="1.0.0") diff --git a/src/app/__init__.py b/src/app/__init__.py index 41d8912..19f1842 100644 --- a/src/app/__init__.py +++ b/src/app/__init__.py @@ -1,7 +1,13 @@ from .app import DurableApp from .service import Service -from .types import (EndureException, ErrorResponse, Log, LogStatus, Response, - RetryMechanism) +from .types import ( + EndureException, + ErrorResponse, + Log, + LogStatus, + Response, + RetryMechanism, +) from .workflow_context import WorkflowContext __all__ = [ diff --git a/src/app/_internal/internal_client.py b/src/app/_internal/internal_client.py index 4057b4f..5e78d92 100644 --- a/src/app/_internal/internal_client.py +++ b/src/app/_internal/internal_client.py @@ -28,9 +28,11 @@ def send_log(self, execution_id: str, log: Log, action_name: str): requests.exceptions.HTTPError: If the request fails. 
""" # noqa: E501 try: - logging.info(f"Attempting to send log to engine - Execution ID: {execution_id}, Action: {action_name}") + logging.info( + f"Attempting to send log to engine - Execution ID: {execution_id}, Action: {action_name}" + ) logging.info(f"Base URL: {self._base_url}") - + if not self._base_url: logging.error( "DURABLE_ENGINE_BASE_URL is not set in environment variables." @@ -40,9 +42,7 @@ def send_log(self, execution_id: str, log: Log, action_name: str): ) if not log or not action_name: - logging.error( - "log and action_name must be provided." - ) + logging.error("log and action_name must be provided.") raise ValueError("log and action_name must be provided.") url = ( @@ -50,11 +50,11 @@ def send_log(self, execution_id: str, log: Log, action_name: str): ) headers = {"Content-Type": "application/json"} payload = log.to_dict() - + logging.info(f"Making request to: {url}") logging.info(f"Request headers: {headers}") logging.info(f"Request payload: {payload}") - + response = requests.patch(url, headers=headers, json=payload) logging.info( "Log sent to the Durable Execution Engine: {}".format(log) @@ -62,17 +62,13 @@ def send_log(self, execution_id: str, log: Log, action_name: str): logging.info(f"Response status code: {response.status_code}") logging.info(f"Response headers: {dict(response.headers)}") logging.info("Response after sending log: {}".format(response)) - + response.raise_for_status() try: response_payload = response.json() - logging.info( - "Response payload: {}".format(response_payload) - ) + logging.info("Response payload: {}".format(response_payload)) except ValueError as e: - logging.error( - "Error parsing response payload: {}".format(e) - ) + logging.error("Error parsing response payload: {}".format(e)) logging.error(f"Raw response text: {response.text}") response_payload = {} response = Response( @@ -86,9 +82,7 @@ def send_log(self, execution_id: str, log: Log, action_name: str): logging.error(f"HTTP ERROR: Text {e.response.text}") 
try: error_payload = e.response.json() - logging.info( - "Error payload: {}".format(error_payload) - ) + logging.info("Error payload: {}".format(error_payload)) except Exception as parse_error: error_payload = {} logging.error( @@ -105,9 +99,11 @@ def send_log(self, execution_id: str, log: Log, action_name: str): logging.error(f"NETWORK ERROR: Error details: {e}") raise e except Exception as e: - logging.error(f"UNEXPECTED ERROR in send_log: {type(e).__name__}: {e}") + logging.error( + f"UNEXPECTED ERROR in send_log: {type(e).__name__}: {e}" + ) raise e - + logging.info(f"Returning response: {response.to_dict()}") return response.to_dict() @@ -127,9 +123,11 @@ def mark_execution_as_running(self, execution_id: str): requests.exceptions.HTTPError: If the request fails. """ try: - logging.info(f"Attempting to mark execution as running - Execution ID: {execution_id}") + logging.info( + f"Attempting to mark execution as running - Execution ID: {execution_id}" + ) logging.info(f"Base URL: {self._base_url}") - + if not self._base_url: logging.error( "DURABLE_ENGINE_BASE_URL is not set in environment variables." 
@@ -139,25 +137,27 @@ def mark_execution_as_running(self, execution_id: str): ) url = f"{self._base_url}/executions/{execution_id}/started" headers = {"Content-Type": "application/json"} - + logging.info(f"Making request to: {url}") logging.info(f"Request headers: {headers}") - + response = requests.patch(url, headers=headers) - logging.info( - "Execution marked as running: {}".format(response) - ) + logging.info("Execution marked as running: {}".format(response)) logging.info(f"Response status code: {response.status_code}") logging.info(f"Response headers: {dict(response.headers)}") logging.info( - "Response after marking execution as running: {}".format(response) + "Response after marking execution as running: {}".format( + response + ) ) response.raise_for_status() response = Response( status_code=response.status_code, ) except requests.exceptions.HTTPError as e: - logging.error(f"HTTP ERROR marking execution as running: Status {e.response.status_code}") + logging.error( + f"HTTP ERROR marking execution as running: Status {e.response.status_code}" + ) logging.error(f"HTTP ERROR: URL {e.response.url}") logging.error(f"HTTP ERROR: Text {e.response.text}") response = Response( @@ -170,8 +170,10 @@ def mark_execution_as_running(self, execution_id: str): logging.error(f"NETWORK ERROR: Error details: {e}") raise e except Exception as e: - logging.error(f"UNEXPECTED ERROR in mark_execution_as_running: {type(e).__name__}: {e}") + logging.error( + f"UNEXPECTED ERROR in mark_execution_as_running: {type(e).__name__}: {e}" + ) raise e - + logging.info(f"Returning response: {response.to_dict()}") return response.to_dict() diff --git a/src/app/workflow_context.py b/src/app/workflow_context.py index 4dd4f73..5bfa3c8 100644 --- a/src/app/workflow_context.py +++ b/src/app/workflow_context.py @@ -135,35 +135,34 @@ def process_payment(input_data: dict) -> dict: max_retries=max_retries, ) name = action_name if action_name is not None else action.__name__ - logging.info( - "Sending 
log for action: {}".format(log) - ) + logging.info("Sending log for action: {}".format(log)) engine_response = InternalEndureClient.send_log( self.execution_id, log, name ) - logging.info( - "Engine response: {}".format(engine_response) - ) + logging.info("Engine response: {}".format(engine_response)) if not engine_response: logging.error( - "CRITICAL ERROR: Engine response is None or empty. This indicates a communication failure with the durable engine." + "CRITICAL ERROR: Engine response is None or empty. " + "This indicates a communication failure with the durable engine." ) raise ValueError( "Base URL is not set in environment variables or missing required parameters (log or action_name)." ) - + logging.info( f"Detailed engine response - Status: {engine_response.get('status_code')}, " f"Payload: {engine_response.get('payload')}, " f"Headers: {engine_response.get('headers', {})}" ) - + status_code = engine_response["status_code"] logging.info(f"Processing status code: {status_code}") - + match status_code: case status.HTTP_201_CREATED | status.HTTP_200_OK: - logging.info(f"Status {status_code} - Proceeding with action execution") + logging.info( + f"Status {status_code} - Proceeding with action execution" + ) while True: try: try: @@ -174,9 +173,7 @@ def process_payment(input_data: dict) -> dict: result = await action(input_data) else: result = action(input_data) - logging.info( - "Action result: {}".format(result) - ) + logging.info("Action result: {}".format(result)) except (ValueError, ValidationError) as e: logging.error( f"VALIDATION ERROR in action {action.__name__}: {type(e).__name__}: {e}" @@ -190,7 +187,9 @@ def process_payment(input_data: dict) -> dict: name, ) logging.info( - "Engine response after validation error: {}".format(engine_response) + "Engine response after validation error: {}".format( + engine_response + ) ) logging.error( f"WORKFLOW DEBUG: About to raise exception of type {type(e)}: {e}" @@ -209,11 +208,11 @@ def 
process_payment(input_data: dict) -> dict: name, ) logging.info( - "Engine response after completion: {}".format(engine_response) - ) - logging.info( - "Returning result: {}".format(result) + "Engine response after completion: {}".format( + engine_response + ) ) + logging.info("Returning result: {}".format(result)) return result except ( ValueError, @@ -245,11 +244,15 @@ def process_payment(input_data: dict) -> dict: self.execution_id, log, name ) logging.info( - "Engine response after failure: {}".format(engine_response) + "Engine response after failure: {}".format( + engine_response + ) ) engine_status = engine_response.get("status_code") - logging.info(f"Engine status after failure: {engine_status}") - + logging.info( + f"Engine status after failure: {engine_status}" + ) + if engine_status in [ status.HTTP_400_BAD_REQUEST, status.HTTP_404_NOT_FOUND, @@ -265,31 +268,35 @@ def process_payment(input_data: dict) -> dict: ) raise EndureException( status_code=engine_status, - output=serialize_data({ - "error": str( - "Execution Paused or Terminated" - ) - }), + output=serialize_data( + { + "error": str( + "Execution Paused or Terminated" + ) + } + ), ) raise EndureException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - output=serialize_data({ - "error": str( - "Action failed after reaching max retries" - ) - }), + output=serialize_data( + { + "error": str( + "Action failed after reaching max retries" + ) + } + ), ) retry_at_unix = engine_response.get("payload", {}).get( "retry_at" ) - logging.info( - "Retry at unix: {}".format(retry_at_unix) - ) + logging.info("Retry at unix: {}".format(retry_at_unix)) if retry_at_unix: sleep_seconds = retry_at_unix - time.time() if sleep_seconds > 0: logging.info( - "Sleeping for {} seconds".format(sleep_seconds) + "Sleeping for {} seconds".format( + sleep_seconds + ) ) time.sleep(sleep_seconds) else: @@ -299,11 +306,13 @@ def process_payment(input_data: dict) -> dict: ) else: logging.error( - "CRITICAL ERROR: No retry_at 
time provided by engine for retryable status {engine_status}. " + f"CRITICAL ERROR: No retry_at time provided by " + f"engine for retryable status {engine_status}. " f"Engine response: {engine_response}" ) raise RuntimeError( - f"Engine did not provide retry_at time for retryable status {engine_status}. Response: {engine_response}" + f"Engine did not provide retry_at time for retryable status {engine_status}. " + f"Response: {engine_response}" ) case status.HTTP_208_ALREADY_REPORTED: logging.info( diff --git a/tests/conftest.py b/tests/conftest.py index 1cee49c..ebab7c8 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -3,8 +3,15 @@ import pytest -from app import (DurableApp, Log, LogStatus, Response, RetryMechanism, Service, - WorkflowContext) +from app import ( + DurableApp, + Log, + LogStatus, + Response, + RetryMechanism, + Service, + WorkflowContext, +) from app._internal import InternalEndureClient, ServiceRegistry From 3f11dd6c34323a929a89fa2673fc0277f94ca1e7 Mon Sep 17 00:00:00 2001 From: Farah Tharwat Date: Sun, 22 Jun 2025 15:18:30 +0300 Subject: [PATCH 22/22] safe checks on logs --- src/app/_internal/internal_client.py | 52 +++++++++++++++++++++------- 1 file changed, 39 insertions(+), 13 deletions(-) diff --git a/src/app/_internal/internal_client.py b/src/app/_internal/internal_client.py index 5e78d92..0759dfd 100644 --- a/src/app/_internal/internal_client.py +++ b/src/app/_internal/internal_client.py @@ -60,7 +60,11 @@ def send_log(self, execution_id: str, log: Log, action_name: str): "Log sent to the Durable Execution Engine: {}".format(log) ) logging.info(f"Response status code: {response.status_code}") - logging.info(f"Response headers: {dict(response.headers)}") + # Safety check for headers attribute (for MockResponse in tests) + if hasattr(response, 'headers'): + logging.info(f"Response headers: {dict(response.headers)}") + else: + logging.info("Response headers: Not available (MockResponse)") logging.info("Response after sending log: 
{}".format(response)) response.raise_for_status() @@ -69,7 +73,10 @@ def send_log(self, execution_id: str, log: Log, action_name: str): logging.info("Response payload: {}".format(response_payload)) except ValueError as e: logging.error("Error parsing response payload: {}".format(e)) - logging.error(f"Raw response text: {response.text}") + if hasattr(response, 'text'): + logging.error(f"Raw response text: {response.text}") + else: + logging.error("Raw response text: Not available (MockResponse)") response_payload = {} response = Response( status_code=response.status_code, @@ -78,16 +85,32 @@ def send_log(self, execution_id: str, log: Log, action_name: str): except requests.exceptions.HTTPError as e: logging.error(f"HTTP ERROR: Status {e.response.status_code}") logging.error(f"HTTP ERROR: URL {e.response.url}") - logging.error(f"HTTP ERROR: Headers {dict(e.response.headers)}") - logging.error(f"HTTP ERROR: Text {e.response.text}") + # Safety check for headers attribute (for MockResponse in tests) + if hasattr(e.response, 'headers'): + logging.error(f"HTTP ERROR: Headers {dict(e.response.headers)}") + else: + logging.error("HTTP ERROR: Headers Not available (MockResponse)") + # Safety check for text attribute (for MockResponse in tests) + if hasattr(e.response, 'text'): + logging.error(f"HTTP ERROR: Text {e.response.text}") + else: + logging.error("HTTP ERROR: Text Not available (MockResponse)") try: error_payload = e.response.json() - logging.info("Error payload: {}".format(error_payload)) + logging.info( + "Error payload: {}".format(error_payload) + ) except Exception as parse_error: error_payload = {} - logging.error( - f"Error parsing error payload: {parse_error}. Raw text: {e.response.text}" - ) + # Safety check for text attribute (for MockResponse in tests) + if hasattr(e.response, 'text'): + logging.error( + f"Error parsing error payload: {parse_error}. Raw text: {e.response.text}" + ) + else: + logging.error( + f"Error parsing error payload: {parse_error}. 
Raw text: Not available (MockResponse)" + ) response = Response( status_code=e.response.status_code, payload=error_payload, @@ -142,13 +165,16 @@ def mark_execution_as_running(self, execution_id: str): logging.info(f"Request headers: {headers}") response = requests.patch(url, headers=headers) - logging.info("Execution marked as running: {}".format(response)) + logging.info( + "Execution marked as running: {}".format(response) + ) logging.info(f"Response status code: {response.status_code}") - logging.info(f"Response headers: {dict(response.headers)}") + if hasattr(response, 'headers'): + logging.info(f"Response headers: {dict(response.headers)}") + else: + logging.info("Response headers: Not available (MockResponse)") logging.info( - "Response after marking execution as running: {}".format( - response - ) + "Response after marking execution as running: {}".format(response) ) response.raise_for_status() response = Response(