diff --git a/adk/README.md b/adk/README.md index c70fc44..2556b23 100644 --- a/adk/README.md +++ b/adk/README.md @@ -27,7 +27,7 @@ from agenticlayer.otel import setup_otel from google.adk.agents import LlmAgent # Set up OpenTelemetry instrumentation, logging and metrics -setup_otel() +setup_otel(capture_http_bodies=True) # Parse sub agents and tools from JSON configuration sub_agent, agent_tools = parse_sub_agents("{}") @@ -81,3 +81,22 @@ The JSON configuration for `AGENT_TOOLS` should follow this structure: The SDK automatically configures OpenTelemetry observability when running `setup_otel()`. You can customize the OTLP exporters using standard OpenTelemetry environment variables: https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/ + +### HTTP Body Logging + +By default, HTTP request/response bodies are not captured in traces for security and privacy reasons. To enable body +logging for debugging purposes, pass `capture_http_bodies=True` to `setup_otel()`. + +When enabled, body logging applies to both: +- **HTTPX client requests/responses** (outgoing HTTP calls) +- **Starlette server requests/responses** (incoming HTTP requests to your app) + +Body logging behavior: +- Only text-based content types are logged (JSON, XML, plain text, form data) +- Bodies are truncated to 100KB to prevent memory issues +- Binary content (images, PDFs, etc.) is never logged +- Streaming requests/responses are skipped to avoid consuming streams +- All exceptions during body capture are logged but won't break HTTP requests + +**Note**: Starlette body logging is more limited than HTTPX because it must avoid consuming request/response streams. +Bodies are only captured when already buffered in the ASGI scope. 
diff --git a/adk/agenticlayer/agent_to_a2a.py b/adk/agenticlayer/agent_to_a2a.py
index 9df4858..ee1d29c 100644
--- a/adk/agenticlayer/agent_to_a2a.py
+++ b/adk/agenticlayer/agent_to_a2a.py
@@ -5,7 +5,6 @@
 
 import contextlib
 import logging
-import os
 from typing import AsyncIterator, Awaitable, Callable
 
 from a2a.server.apps import A2AStarletteApplication
@@ -157,10 +156,8 @@ async def lifespan(app: Starlette) -> AsyncIterator[None]:
     starlette_app = Starlette(lifespan=lifespan)
 
     # Instrument the Starlette app with OpenTelemetry
-    # env needs to be set here since _excluded_urls is initialized at module import time
-    os.environ.setdefault("OTEL_PYTHON_STARLETTE_EXCLUDED_URLS", AGENT_CARD_WELL_KNOWN_PATH)
-    from opentelemetry.instrumentation.starlette import StarletteInstrumentor
+    from .otel_starlette import instrument_starlette_app
 
-    StarletteInstrumentor().instrument_app(starlette_app)
+    instrument_starlette_app(starlette_app)
 
     return starlette_app
diff --git a/adk/agenticlayer/otel.py b/adk/agenticlayer/otel.py
index 4a9b8cd..665b82e 100644
--- a/adk/agenticlayer/otel.py
+++ b/adk/agenticlayer/otel.py
@@ -2,6 +2,7 @@
 
 import logging
 
+import httpx
 from openinference.instrumentation.google_adk import GoogleADKInstrumentor
 from opentelemetry import metrics, trace
 from opentelemetry._logs import set_logger_provider
@@ -16,9 +17,107 @@
 from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
 from opentelemetry.sdk.trace.export import SimpleSpanProcessor
 
+_logger = logging.getLogger(__name__)
 
-def setup_otel() -> None:
-    """Set up OpenTelemetry tracing, logging and metrics."""
+_MAX_BODY_SIZE = 100 * 1024  # 100KB
+_capture_http_bodies = False  # Set by setup_otel()
+
+
+def _is_text_content(content_type: str) -> bool:
+    """Check if content type is text-based and safe to log."""
+    text_types = ("application/json", "application/xml", "text/", "application/x-www-form-urlencoded")
+    return any(ct in content_type.lower() for ct in text_types)
+
+
+def _truncate_body(body: bytes) -> str:
+    """Safely truncate and decode body to string, limiting size.
+
+    The truncation marker is appended only when bytes were actually dropped.
+    """
+    truncated = len(body) > _MAX_BODY_SIZE
+    try:
+        decoded = body[:_MAX_BODY_SIZE].decode("utf-8", errors="replace")
+        if truncated:
+            decoded += f"\n... [truncated, exceeded {_MAX_BODY_SIZE} bytes]"
+        return decoded
+    except Exception:
+        _logger.exception("Failed to decode body content")
+        return "[body decoding failed]"
+
+
+def _buffered_body(message: object) -> bytes:
+    """Return the already-buffered body of an HTTPX message, or ``b""``.
+
+    The instrumentor passes ``RequestInfo``/``ResponseInfo`` tuples to the
+    hooks (no ``.content``), while direct callers may pass
+    ``httpx.Request``/``httpx.Response``. Nothing is read from the network:
+    an ``httpx.ByteStream`` is in memory and re-iterable, and ``.content``
+    raises instead of consuming a stream that has not been read yet.
+    """
+    stream = getattr(message, "stream", None)
+    if isinstance(stream, httpx.ByteStream):
+        return b"".join(stream)
+    try:
+        return message.content  # type: ignore[attr-defined]
+    except (AttributeError, httpx.StreamError):
+        return b""
+
+
+def request_hook(span: trace.Span, request: httpx.Request) -> None:
+    """Hook to capture request body in traces if enabled.
+
+    Only buffered, text-based bodies are recorded; streaming bodies are
+    skipped so the stream is never consumed here.
+    """
+    if not _capture_http_bodies:
+        return
+
+    try:
+        # The instrumentor's RequestInfo may carry headers=None
+        headers = getattr(request, "headers", None) or {}
+        if not _is_text_content(headers.get("content-type", "")):
+            return
+
+        body = _buffered_body(request)
+        if body:
+            span.set_attribute("http.request.body", _truncate_body(body))
+    except Exception:
+        _logger.exception("Failed to capture request body in trace")
+
+
+def response_hook(span: trace.Span, request: httpx.Request, response: httpx.Response) -> None:
+    """Hook to capture response body in traces if enabled.
+
+    Only buffered, text-based bodies are recorded. A response whose body has
+    not been read yet (which may be the case when the instrumentor calls this
+    hook) is skipped rather than consumed.
+    """
+    if not _capture_http_bodies:
+        return
+
+    try:
+        headers = getattr(response, "headers", None) or {}
+        if not _is_text_content(headers.get("content-type", "")):
+            return
+
+        body = _buffered_body(response)
+        if body:
+            span.set_attribute("http.response.body", _truncate_body(body))
+    except Exception:
+        _logger.exception("Failed to capture response body in trace")
+
+
+def setup_otel(capture_http_bodies: bool = False) -> None:
+    """Set up OpenTelemetry tracing, logging and metrics.
+
+    Args:
+        capture_http_bodies: Enable capturing HTTP request/response bodies in traces.
+            Only text-based content types are logged, truncated to 100KB.
+            Streaming requests/responses are skipped to avoid consuming streams.
+            Defaults to False for security/privacy reasons.
+    """
+    global _capture_http_bodies
+    _capture_http_bodies = capture_http_bodies
 
     # Set log level for urllib to WARNING to reduce noise (like sending logs to OTLP)
     logging.getLogger("urllib3").setLevel(logging.WARNING)
@@ -32,7 +131,10 @@ def setup_otel() -> None:
     # Instrument Google ADK using openinference instrumentation
     GoogleADKInstrumentor().instrument()
     # Instrument HTTPX clients (this also transfers the trace context automatically)
-    HTTPXClientInstrumentor().instrument()
+    HTTPXClientInstrumentor().instrument(
+        request_hook=request_hook,
+        response_hook=response_hook,
+    )
 
     # Logs
     logger_provider = LoggerProvider()
diff --git a/adk/agenticlayer/otel_starlette.py b/adk/agenticlayer/otel_starlette.py
new file mode 100644
index 0000000..4a67ee2
--- /dev/null
+++ b/adk/agenticlayer/otel_starlette.py
@@ -0,0 +1,112 @@
+"""OpenTelemetry instrumentation for Starlette applications."""
+
+import logging
+import os
+from typing import Any, Dict
+
+from a2a.utils.constants import AGENT_CARD_WELL_KNOWN_PATH
+from opentelemetry import trace
+from starlette.applications import Starlette
+
+from . import otel as _otel
+from .otel import _is_text_content, _truncate_body
+
+_logger = logging.getLogger(__name__)
+
+# Scope key under which the response content type is remembered between send events
+_RESPONSE_CONTENT_TYPE_KEY = "agenticlayer.otel.response_content_type"
+
+
+def _content_type(raw_headers: Any) -> str:
+    """Return the ``content-type`` value from ASGI ``(name, value)`` byte pairs, or ``""``."""
+    return dict(raw_headers or []).get(b"content-type", b"").decode("latin1")
+
+
+def _starlette_server_request_hook(span: trace.Span, scope: Dict[str, Any]) -> None:
+    """Hook to capture Starlette request body in traces if enabled.
+
+    Note: This captures the body from the ASGI scope's cached body if available.
+    It does not consume the request stream to avoid breaking request handling.
+    """
+    # Read the flag through the module so a later setup_otel() call is honoured;
+    # ``from .otel import _capture_http_bodies`` would freeze the import-time value.
+    if not _otel._capture_http_bodies:
+        return
+
+    try:
+        # Only process HTTP requests
+        if scope.get("type") != "http":
+            return
+
+        # Check if body is cached in scope (some middleware/frameworks cache it)
+        # Don't try to read the stream directly as it would consume it
+        body = scope.get("body")
+        if body and _is_text_content(_content_type(scope.get("headers"))):
+            span.set_attribute("http.request.body", _truncate_body(body))
+    except Exception:
+        _logger.exception("Failed to capture Starlette request body in trace")
+
+
+def _starlette_client_request_hook(span: trace.Span, scope: Dict[str, Any], message: Dict[str, Any]) -> None:
+    """Hook to capture Starlette client request body in traces if enabled."""
+    if not _otel._capture_http_bodies:
+        return
+
+    try:
+        # Capture body from the message if available and it's the body message
+        if message.get("type") != "http.request":
+            return
+
+        # Content type comes from the request headers in the scope
+        body = message.get("body")
+        if body and _is_text_content(_content_type(scope.get("headers"))):
+            span.set_attribute("http.request.body", _truncate_body(body))
+    except Exception:
+        _logger.exception("Failed to capture Starlette client request body in trace")
+
+
+def _starlette_client_response_hook(span: trace.Span, scope: Dict[str, Any], message: Dict[str, Any]) -> None:
+    """Hook to capture Starlette client response body in traces if enabled.
+
+    Response headers only appear on the ``http.response.start`` message, so the
+    content type is remembered in the scope and checked when body messages arrive.
+    Bodies with an unknown or non-text content type are not recorded.
+    """
+    if not _otel._capture_http_bodies:
+        return
+
+    try:
+        message_type = message.get("type")
+        if message_type == "http.response.start":
+            scope[_RESPONSE_CONTENT_TYPE_KEY] = _content_type(message.get("headers"))
+        elif message_type == "http.response.body":
+            body = message.get("body")
+            if body and _is_text_content(scope.get(_RESPONSE_CONTENT_TYPE_KEY, "")):
+                span.set_attribute("http.response.body", _truncate_body(body))
+    except Exception:
+        _logger.exception("Failed to capture Starlette client response body in trace")
+
+
+def instrument_starlette_app(app: Starlette) -> None:
+    """Instrument a Starlette application with OpenTelemetry.
+
+    Args:
+        app: The Starlette application to instrument
+
+    Note:
+        Body logging is controlled by the capture_http_bodies parameter passed to setup_otel().
+        This should be called after setup_otel() has been called to set up the tracer provider.
+        Body logging for Starlette is limited compared to HTTPX as it must avoid consuming
+        request/response streams. Bodies are only captured when already buffered in the ASGI scope.
+    """
+
+    # env needs to be set here since _excluded_urls is initialized at module import time
+    os.environ.setdefault("OTEL_PYTHON_STARLETTE_EXCLUDED_URLS", AGENT_CARD_WELL_KNOWN_PATH)
+    from opentelemetry.instrumentation.starlette import StarletteInstrumentor
+
+    StarletteInstrumentor().instrument_app(
+        app,
+        server_request_hook=_starlette_server_request_hook,
+        client_request_hook=_starlette_client_request_hook,
+        client_response_hook=_starlette_client_response_hook,
+    )