Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion api_server_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,31 @@

from cloud_pipelines_backend import api_router
from cloud_pipelines_backend import database_ops
from cloud_pipelines_backend.instrumentation import api_tracing
from cloud_pipelines_backend.instrumentation import contextual_logging

# FastAPI application instance. separate_input_output_schemas=False keeps a
# single schema per model in the generated OpenAPI document instead of
# distinct input/output variants.
app = fastapi.FastAPI(
    title="Cloud Pipelines API",
    version="0.0.1",
    separate_input_output_schemas=False,
)

# Add request context middleware for automatic request_id generation.
# Must be registered on the app so every request gets a logging context.
app.add_middleware(api_tracing.RequestContextMiddleware)


@app.exception_handler(Exception)
def handle_error(request: fastapi.Request, exc: BaseException):
    """Catch-all handler: return the formatted traceback with a 503 status.

    When a request_id is present in the logging context it is echoed back
    in the 'x-tangle-request-id' header so clients can correlate the error
    with server-side logs.
    """
    formatted_traceback = traceback.format_exception(type(exc), exc, exc.__traceback__)
    request_id = contextual_logging.get_context_metadata("request_id")
    extra_headers = {"x-tangle-request-id": request_id} if request_id else None
    return fastapi.responses.JSONResponse(
        status_code=503,
        content={"exception": formatted_traceback},
        headers=extra_headers,
    )


DEFAULT_DATABASE_URI = "sqlite:///db.sqlite"
Expand Down
15 changes: 13 additions & 2 deletions cloud_pipelines_backend/api_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from . import component_library_api_server as components_api
from . import database_ops
from . import errors
from .instrumentation import contextual_logging

if typing.TYPE_CHECKING:
from .launchers import interfaces as launcher_interfaces
Expand Down Expand Up @@ -95,17 +96,27 @@ def _setup_routes_internal(

@app.exception_handler(errors.ItemNotFoundError)
def handle_not_found_error(request: fastapi.Request, exc: errors.ItemNotFoundError):
    """Map ItemNotFoundError to a 404 JSON response.

    The current request_id (if any) is attached as the
    'x-tangle-request-id' header for log correlation.
    """
    request_id = contextual_logging.get_context_metadata("request_id")
    extra_headers = {"x-tangle-request-id": request_id} if request_id else None
    return fastapi.responses.JSONResponse(
        status_code=404,
        content={"message": str(exc)},
        headers=extra_headers,
    )

@app.exception_handler(errors.PermissionError)
def handle_permission_error(request: fastapi.Request, exc: errors.PermissionError):
    """Map PermissionError to a 403 JSON response.

    The current request_id (if any) is attached as the
    'x-tangle-request-id' header for log correlation.
    """
    request_id = contextual_logging.get_context_metadata("request_id")
    extra_headers = {"x-tangle-request-id": request_id} if request_id else None
    return fastapi.responses.JSONResponse(
        status_code=403,
        content={"message": str(exc)},
        headers=extra_headers,
    )

get_user_details_dependency = fastapi.Depends(user_details_getter)

Expand Down
64 changes: 64 additions & 0 deletions cloud_pipelines_backend/instrumentation/api_tracing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""Request context middleware for FastAPI applications.

This middleware automatically generates a request_id for each incoming HTTP request,
sets it in the logging context for the duration of the request, and includes it in
the response headers.
"""

import logging
import secrets

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response

from . import contextual_logging

logger = logging.getLogger(__name__)


def generate_request_id() -> str:
    """Create a fresh request ID.

    The ID mirrors the OpenTelemetry trace-id format: 16 random bytes
    (128 bits) rendered as 32 lowercase hexadecimal characters, which
    keeps the two identifier schemes interchangeable.

    Returns:
        A 32-character lowercase hexadecimal string.
    """
    return secrets.token_bytes(16).hex()


class RequestContextMiddleware(BaseHTTPMiddleware):
    """Middleware that tags every request with a unique request_id.

    Per request it:
      1. generates a fresh 32-character hex request_id,
      2. exposes it to all logging via the 'request_id' context key,
      3. echoes it back to the caller in the 'x-tangle-request-id' header.

    The context entry is removed when the request finishes (the context
    manager restores the previous state), so every request — including
    concurrent ones — logs with its own ID.
    """

    async def dispatch(self, request: Request, call_next) -> Response:
        """Run the downstream handler inside a request_id logging context.

        Args:
            request: The incoming HTTP request.
            call_next: Callable that invokes the next middleware or the
                route handler.

        Returns:
            The downstream response with the request_id header attached.
        """
        rid = generate_request_id()
        with contextual_logging.logging_context(request_id=rid):
            response = await call_next(request)
            # Surface the ID to the client for support/correlation.
            response.headers["x-tangle-request-id"] = rid
            return response
127 changes: 127 additions & 0 deletions cloud_pipelines_backend/instrumentation/contextual_logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
"""Logging context management for distributed tracing and execution tracking.

This module provides utilities for managing arbitrary metadata in the logging context.
This metadata is automatically added to all log records for better filtering and correlation.

Common metadata keys:
- request_id: From API requests - groups all logs from a single API call
- pipeline_run_id: From PipelineRun.id - tracks the entire pipeline run
- execution_id: From ExecutionNode.id - tracks individual execution nodes
- container_execution_id: From ContainerExecution.id - tracks running containers
- user_id: User who initiated the operation
- Any other metadata you want to track in logs

Usage:
# Set metadata in context
with logging_context(request_id="abc123", user_id="user@example.com"):
logger.info("Processing") # Both fields in logs

# Or use individual functions
set_context_metadata("request_id", "abc123")
delete_context_metadata("request_id") # Remove a specific key
"""

import contextvars
from contextlib import contextmanager
from typing import Any, Optional

# Single context variable to store all metadata as a dictionary
_context_metadata: contextvars.ContextVar[dict[str, Any]] = contextvars.ContextVar(
"context_metadata", default={}
)


def set_context_metadata(key: str, value: Any) -> None:
"""Set a metadata value in the current context.

Args:
key: The metadata key (e.g., 'execution_id', 'request_id', 'user_id')
value: The value to set
"""
metadata = _context_metadata.get().copy()
metadata[key] = value
_context_metadata.set(metadata)


def delete_context_metadata(key: str) -> None:
    """Remove *key* from the current context's metadata.

    Missing keys are ignored — no KeyError is raised — mirroring
    dict.pop(key, None).

    Args:
        key: The metadata key to remove (e.g. 'execution_id', 'request_id').
    """
    # Rebuild the dict without the key instead of mutating it in place.
    remaining = {k: v for k, v in _context_metadata.get().items() if k != key}
    _context_metadata.set(remaining)


def get_context_metadata(key: str) -> Optional[Any]:
    """Look up a single metadata value in the current context.

    Args:
        key: The metadata key to read.

    Returns:
        The stored value, or None when the key is not set.
    """
    metadata = _context_metadata.get()
    return metadata.get(key)


def get_all_context_metadata() -> dict[str, Any]:
    """Return a snapshot of every metadata entry in the current context.

    Returns:
        A new dict with all context metadata; mutating it does not affect
        the stored context.
    """
    return dict(_context_metadata.get())


def clear_context_metadata() -> None:
    """Drop every metadata entry from the current context."""
    # Replace the stored dict wholesale rather than clearing in place.
    _context_metadata.set({})


@contextmanager
def logging_context(**metadata: Any):
    """Temporarily attach arbitrary metadata to the logging context.

    This is the recommended way to set logging context: all supplied keys
    are applied on entry and the previous context is restored on exit,
    even when the body raises. Keys whose value is None are skipped, so
    callers may pass optional fields unconditionally.

    Args:
        **metadata: Arbitrary key/value pairs to expose to log records.
            Common keys: request_id, pipeline_run_id, execution_id,
            container_execution_id, user_id.

    Example with IDs:
        >>> with logging_context(pipeline_run_id="run123", execution_id="exec456"):
        ...     logger.info("Processing execution")  # Will include both IDs

    Example with custom metadata:
        >>> with logging_context(
        ...     execution_id="exec456",
        ...     user_id="user@example.com",
        ...     operation="reprocessing"
        ... ):
        ...     logger.info("Custom operation")  # All metadata in logs

    Example for API requests:
        >>> request_id = generate_request_id()
        >>> with logging_context(request_id=request_id):
        ...     logger.info("Handling API request")
    """
    # Snapshot the current metadata so nested contexts restore correctly.
    previous = get_all_context_metadata()
    try:
        # Overlay every non-None value on top of the existing context in
        # a single ContextVar write.
        updates = {k: v for k, v in metadata.items() if v is not None}
        _context_metadata.set({**previous, **updates})
        yield
    finally:
        # Roll back to the snapshot taken on entry.
        _context_metadata.set(previous)
50 changes: 50 additions & 0 deletions cloud_pipelines_backend/instrumentation/structured_logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
"""Structured logging components for context-aware log formatting.

This module provides logging filters and formatters that integrate with the
contextual_logging module to automatically include context metadata in log records.
"""

import logging

from . import contextual_logging


class LoggingContextFilter(logging.Filter):
    """Filter that copies the current context metadata onto each record.

    Every non-None entry in the logging context (e.g. request_id,
    execution_id, container_execution_id) becomes an attribute of the
    LogRecord, so downstream formatters and handlers can reference it.
    """

    def filter(self, record: logging.LogRecord) -> bool:
        """Attach context metadata to *record*; never drops the record."""
        for name, value in contextual_logging.get_all_context_metadata().items():
            if value is None:
                continue
            setattr(record, name, value)
        return True


class ContextAwareFormatter(logging.Formatter):
    """Formatter that includes context fields only when they are set.

    Produces lines of the form:
        2024-01-01 12:00:00 [INFO] pkg.mod [request_id=abc]: message

    The bracketed context segment is built from the current logging
    context and only includes keys that are also present on the record
    (normally attached by LoggingContextFilter), so unrelated records
    render without it.
    """

    def format(self, record: logging.LogRecord) -> str:
        """Format *record*, dynamically appending any active context fields."""
        # Fixed prefix; the context segment is appended below.
        base_format = "%(asctime)s [%(levelname)s] %(name)s"

        # Only show keys that are both set in the context AND present on
        # the record (i.e. the context filter ran for this record).
        context_metadata = contextual_logging.get_all_context_metadata()
        context_parts = [
            f"{key}={value}"
            for key, value in context_metadata.items()
            if value is not None and hasattr(record, key)
        ]
        if context_parts:
            base_format += " [" + " ".join(context_parts) + "]"

        base_format += ": %(message)s"

        # Build a formatter for the dynamic format. Pass self.datefmt so a
        # datefmt configured on this instance is honored — the previous
        # implementation silently dropped it.
        return logging.Formatter(base_format, datefmt=self.datefmt).format(record)
Loading