Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
8e2e8db
adjust project structure
Farahtharwat35 Jun 14, 2025
fa3a316
fix all modules imports issues
Farahtharwat35 Jun 15, 2025
87f8299
fix linting problems
Farahtharwat35 Jun 15, 2025
14eb4de
git commit -m "Rename Example folder to example"
Farahtharwat35 Jun 15, 2025
c592a2e
add test_service_registry.py
Farahtharwat35 Jun 15, 2025
6cbd78b
finalize test_service_registry
Farahtharwat35 Jun 15, 2025
aaa2bbf
add test_worklow.py
Farahtharwat35 Jun 15, 2025
7954004
trigger linter
Farahtharwat35 Jun 15, 2025
88801d9
test wokflow_context.py
Farahtharwat35 Jun 15, 2025
0d0e7f6
linting fix
Farahtharwat35 Jun 15, 2025
f19c2ab
test internal_client.py
Farahtharwat35 Jun 16, 2025
2e05297
test service.py
Farahtharwat35 Jun 16, 2025
cd15096
add and test the input and output extraction from a workflow fn
Farahtharwat35 Jun 16, 2025
7fb29ad
add exceptions test cases for workflow class
Farahtharwat35 Jun 16, 2025
7286683
add test_app.py
Farahtharwat35 Jun 16, 2025
18526a3
update test_app.py
Farahtharwat35 Jun 16, 2025
d84685c
linting triggered
Farahtharwat35 Jun 16, 2025
8904057
adjust functions descriptions
Farahtharwat35 Jun 17, 2025
6cca019
revert patch changes
Farahtharwat35 Jun 18, 2025
6cc2a65
Revert "revert patch changes"
Farahtharwat35 Jun 18, 2025
dc507bc
adjust workflow_context and internal_client to properly handle except…
Farahtharwat35 Jun 18, 2025
da6df6f
adjust workflow_context and internal_client to be in a try except block
Farahtharwat35 Jun 18, 2025
e123655
re-raising exceptions for users using WorkflowContext
Farahtharwat35 Jun 18, 2025
1c55be8
adjust internal_client.py test cases
Farahtharwat35 Jun 18, 2025
3fc52b9
adjust workflow test cases
Farahtharwat35 Jun 19, 2025
f472bab
refactor WorkflowContext class and update test cases
Farahtharwat35 Jun 19, 2025
e91541c
update test cases for internal_client.py
Farahtharwat35 Jun 19, 2025
6fd4f39
adjust input/ctx condition
Farahtharwat35 Jun 19, 2025
8819528
adjust workflow_context to support async action execution
Farahtharwat35 Jun 19, 2025
5be3666
add test case
Farahtharwat35 Jun 19, 2025
7617edd
add test cases for input/ctx validation
Farahtharwat35 Jun 19, 2025
b310b75
trigger linter
Farahtharwat35 Jun 19, 2025
80900c3
add logging instead of prints
Farahtharwat35 Jun 19, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[flake8]
max-line-length = 115
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ __pycache__/

.vscode/
.idea/
.DS_Store
.DS_Store
.coverage
1 change: 0 additions & 1 deletion Example/app/__init__.py

This file was deleted.

File renamed without changes.
1 change: 1 addition & 0 deletions example/app/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# This file is intentionally left empty.
2 changes: 1 addition & 1 deletion Example/app/main.py → example/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
# # Define a route for the root URL ("/")
# @app.get("/")
# def hello_world():
# return {"message": "Hello, World! This is a Python app running in Docker with Uvicorn."}
# return {"message": "Hello, World! This is a Python app running in Docker with Uvicorn."}
2 changes: 1 addition & 1 deletion makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ format:
lint:
poetry run flake8 .
test:
poetry run python -m pytest test/ --verbose
poetry run python -m pytest tests/ -v --cov=app


659 changes: 433 additions & 226 deletions poetry.lock

Large diffs are not rendered by default.

14 changes: 12 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
[tool.poetry]
name = "durable-execution-engine-sdk"
version = "0.1.0"
packages = [{include = "src"}]
description = ""
authors = []
packages = [{ include = "app", from = "src" }]

[tool.poetry.dependencies]
python = "^3.10"
Expand All @@ -11,8 +13,16 @@ flake8 = "^7.1.2"
pytest = "^8.3.5"
pytest-asyncio = "^0.25.3"
fastapi = "^0.115.12"
pydantic = "^2.11.5"
pydantic = "<2.0.0"
requests = "^2.32.4"
httpx = "^0.28.1"

[tool.poetry.group.dev.dependencies]
pytest-cov = "^6.2.1"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.black]
line-length = 79
24 changes: 24 additions & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[pytest]
# src directory to Python path so tests can import from app
pythonpath = src

# Where to look for test files
testpaths = tests

# Pattern for test files to discover
python_files = test_*.py

# Pattern for test classes
python_classes = Test

# Pattern for test functions
python_functions = test_*

asyncio_mode = auto

# Command line options to always include
addopts = -ra -q --cov=app

# Environment variables for tests
env =
DURABLE_ENGINE_BASE_URL=http://test-engine:8000
16 changes: 0 additions & 16 deletions requirements.txt

This file was deleted.

24 changes: 23 additions & 1 deletion src/app/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,23 @@
__all__ = ["workflow_context", "service", "app"]
from .app import DurableApp
from .service import Service
from .workflow_context import WorkflowContext
from .types import (
EndureException,
ErrorResponse,
Response,
Log,
LogStatus,
RetryMechanism,
)

__all__ = [
"DurableApp",
"Service",
"WorkflowContext",
"EndureException",
"ErrorResponse",
"Response",
"Log",
"LogStatus",
"RetryMechanism",
]
15 changes: 15 additions & 0 deletions src/app/_internal/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""
Internal implementation details. Do not import directly.
This module is for internal use by app.py, service.py, and workflow_context.py only.
"""
from .internal_client import InternalEndureClient
from .service_registry import ServiceRegistry
from .utils import validate_retention_period
from .workflow import Workflow

__all__ = [
"InternalEndureClient",
"ServiceRegistry",
"validate_retention_period",
"Workflow",
]
104 changes: 104 additions & 0 deletions src/app/_internal/internal_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import os
import logging
import requests
from ..types import Log, Response


class InternalEndureClient:

_base_url = os.getenv("DURABLE_ENGINE_BASE_URL")

@classmethod
def send_log(self, execution_id: str, log: Log, action_name: str):
"""
Sends a log message to the Durable Execution Engine.

Args:
execution_id (str): The ID of the execution context.
log (Log): The log message object to send.
action_name (str): The name of the action.

Returns:
dict: A dictionary containing the response from the Durable Execution Engine.

Raises:
ValueError: If DURABLE_ENGINE_BASE_URL is not set or if required parameters are missing.
requests.exceptions.HTTPError: If the request fails.
""" # noqa: E501
try:
if not self._base_url:
raise ValueError(
"DURABLE_ENGINE_BASE_URL is not set in environment variables."
)

if not log or not action_name:
raise ValueError("log and action_name must be provided.")

url = (
f"{self._base_url}/executions/{execution_id}/log/{action_name}"
)
headers = {"Content-Type": "application/json"}
payload = log.to_dict()
response = requests.patch(url, headers=headers, json=payload)
response.raise_for_status()
try:
response_payload = response.json()
except ValueError:
response_payload = {}
response = Response(
status_code=response.status_code,
payload=response_payload,
)
except requests.exceptions.HTTPError as e:
try:
error_payload = e.response.json()
except Exception:
error_payload = {}
response = Response(
status_code=e.response.status_code,
payload=error_payload,
)
except requests.exceptions.RequestException as e:
logging.error(
"Engine is unreachable. Aborting retries: {}".format(e)
)
raise e
return response.to_dict()

@classmethod
def mark_execution_as_running(self, execution_id: str):
"""
Marks an execution as running in the Durable Execution Engine.

Args:
execution_id (str): The ID of the execution context.

Returns:
dict: A dictionary containing the response from the Durable Execution Engine.

Raises:
ValueError: If DURABLE_ENGINE_BASE_URL is not set or if execution_id is missing.
requests.exceptions.HTTPError: If the request fails.
"""
try:
if not self._base_url:
raise ValueError(
"DURABLE_ENGINE_BASE_URL is not set in environment variables."
)
url = f"{self._base_url}/executions/{execution_id}/started"
headers = {"Content-Type": "application/json"}
response = requests.patch(url, headers=headers)
response.raise_for_status()
response = Response(
status_code=response.status_code,
)
except requests.exceptions.HTTPError as e:
response = Response(
status_code=e.response.status_code,
)
except requests.exceptions.RequestException as e:
logging.error(
"Engine is unreachable. Aborting retries: {}".format(e)
)
raise e
return response.to_dict()
126 changes: 126 additions & 0 deletions src/app/_internal/service_registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
from typing import Dict, List

from fastapi import APIRouter

from .workflow import Workflow


class ServiceRegistry:
"""
Singleton class for managing durable workflow services and their API routes in FastAPI.
Each service can contain multiple workflows, and each workflow gets its own API endpoint
for execution.

Attributes:
_instance (ServiceRegistry): The singleton instance of the registry.
_services (Dict[str, List[Workflow]]): Mapping of service names to lists of registered workflows.
_router (APIRouter): FastAPI router containing dynamically registered workflow endpoints.

Methods:
__new__(cls): Creates or returns the singleton instance.
register_workflow(service_name: str, workflow: Workflow): Registers a workflow under a service.
register_workflow_in_router(service_name: str, workflow: Workflow): Creates an API endpoint for the workflow.
get_services() -> Dict[str, List[Workflow]]: Returns a copy of registered services and workflows.
get_router() -> APIRouter: Returns the router with all workflow endpoints.
clear(): Resets the registry to its initial state.
""" # noqa: E501

_instance = None
_services: Dict[str, List[Workflow]]
_router: APIRouter

def __new__(cls):
"""
Implements the singleton pattern, ensuring only one instance of ServiceRegistry exists.
Creates and initializes the instance if it doesn't exist, otherwise returns the existing instance.

Returns:
ServiceRegistry: The singleton instance of the registry.
"""
if cls._instance is None:
cls._instance = super(ServiceRegistry, cls).__new__(cls)
cls._instance._services = {}
cls._instance._router = APIRouter()
return cls._instance

def register_workflow(self, service_name: str, workflow: Workflow):
"""
Registers a workflow under the specified service name. If the service doesn't exist,
it will be created. Prevents duplicate workflow names within the same service.

Args:
service_name (str): Name of the service to register the workflow under.
workflow (Workflow): The workflow instance to register.

Raises:
ValueError: If service_name is empty or not a string,
if workflow is None or not a Workflow instance,
or if a workflow with the same name already exists in the service.
"""
if not service_name or not isinstance(service_name, str):
raise ValueError("Service name must be a non-empty string")
if not workflow or not isinstance(workflow, Workflow):
raise ValueError("Workflow must be a valid Workflow instance")

if service_name not in self._services:
self._services[service_name] = []

# checks for duplicate workflow names within the service
if any(w.name == workflow.name for w in self._services[service_name]):
raise ValueError(
f"Workflow with name '{workflow.name}' already exists in service '{service_name}'"
)

self._services[service_name].append(workflow)

def register_workflow_in_router(
self, service_name: str, workflow: Workflow
):
"""
Creates an API endpoint for the workflow under the given service name.
The endpoint will be available at /execute/{service_name}/{workflow.name}
and will accept POST requests.

Args:
service_name (str): The service name to use in the endpoint path.
workflow (Workflow): The workflow whose handler will be registered.
"""
self._router.add_api_route(
f"/execute/{service_name}/{workflow.name}",
workflow.get_handler_route(),
methods=["POST"],
)

def get_services(self) -> Dict[str, List[Workflow]]:
"""
Returns a shallow copy of the services dictionary to prevent direct modification
of the internal state.

Returns:
Dict[str, List[Workflow]]: A copy of the mapping between service names and their workflows.
"""
return self._services.copy()

def get_router(self) -> APIRouter:
"""
Returns the FastAPI router containing all registered workflow endpoints.

Returns:
APIRouter: The router with all workflow endpoints.
"""
return self._router

def clear(self):
"""
Resets the registry to its initial state by:
- Clearing all registered services
- Creating a new empty router
- Resetting the singleton instance
- Creating a new instance

This is primarily useful for testing purposes.
"""
self._services.clear()
self._router = APIRouter()
self.__class__._instance = None
self.__class__()
16 changes: 16 additions & 0 deletions src/app/_internal/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
def validate_retention_period(retention: int) -> None:
"""
Validate that the retention period is a non-negative integer.

Args:
retention (int): The retention period in days.

Raises:
ValueError: If the retention period is not a non-negative integer.
"""
if not isinstance(retention, int):
raise ValueError("Retention period must be an integer.")
if retention < 0:
raise ValueError("Retention period must be a non-negative integer.")
if retention > 30:
raise ValueError("Retention period cannot exceed 30 days.")
Loading