Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion adk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ from agenticlayer.otel import setup_otel
from google.adk.agents import LlmAgent

# Set up OpenTelemetry instrumentation, logging and metrics
setup_otel()
setup_otel(capture_http_bodies=True)

# Parse sub agents and tools from JSON configuration
sub_agent, agent_tools = parse_sub_agents("{}")
Expand Down Expand Up @@ -81,3 +81,22 @@ The JSON configuration for `AGENT_TOOLS` should follow this structure:
The SDK automatically configures OpenTelemetry observability when running `setup_otel()`. You can customize the OTLP
exporters using standard OpenTelemetry environment variables:
https://opentelemetry.io/docs/specs/otel/configuration/sdk-environment-variables/

### HTTP Body Logging

By default, HTTP request/response bodies are not captured in traces for security and privacy reasons. To enable body
logging for debugging purposes, pass `enable_body_logging=True` to `setup_otel()`.

When enabled, body logging applies to both:
- **HTTPX client requests/responses** (outgoing HTTP calls)
- **Starlette server requests/responses** (incoming HTTP requests to your app)

Body logging behavior:
- Only text-based content types are logged (JSON, XML, plain text, form data)
- Bodies are truncated to 100KB to prevent memory issues
- Binary content (images, PDFs, etc.) is never logged
- Streaming requests/responses are skipped to avoid consuming streams
- All exceptions during body capture are logged but won't break HTTP requests

**Note**: Starlette body logging is more limited than HTTPX because it must avoid consuming request/response streams.
Bodies are only captured when already buffered in the ASGI scope.
7 changes: 2 additions & 5 deletions adk/agenticlayer/agent_to_a2a.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import contextlib
import logging
import os
from typing import AsyncIterator, Awaitable, Callable

from a2a.server.apps import A2AStarletteApplication
Expand Down Expand Up @@ -157,10 +156,8 @@ async def lifespan(app: Starlette) -> AsyncIterator[None]:
starlette_app = Starlette(lifespan=lifespan)

# Instrument the Starlette app with OpenTelemetry
# env needs to be set here since _excluded_urls is initialized at module import time
os.environ.setdefault("OTEL_PYTHON_STARLETTE_EXCLUDED_URLS", AGENT_CARD_WELL_KNOWN_PATH)
from opentelemetry.instrumentation.starlette import StarletteInstrumentor
from .otel_starlette import instrument_starlette_app

StarletteInstrumentor().instrument_app(starlette_app)
instrument_starlette_app(starlette_app)

return starlette_app
79 changes: 76 additions & 3 deletions adk/agenticlayer/otel.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import logging

import httpx
from openinference.instrumentation.google_adk import GoogleADKInstrumentor
from opentelemetry import metrics, trace
from opentelemetry._logs import set_logger_provider
Expand All @@ -16,9 +17,78 @@
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.trace.export import SimpleSpanProcessor

_logger = logging.getLogger(__name__)

def setup_otel() -> None:
"""Set up OpenTelemetry tracing, logging and metrics."""
_MAX_BODY_SIZE = 100 * 1024 # 100KB
_capture_http_bodies = False # Set by setup_otel()


def _is_text_content(content_type: str) -> bool:
"""Check if content type is text-based and safe to log."""
text_types = ("application/json", "application/xml", "text/", "application/x-www-form-urlencoded")
return any(ct in content_type.lower() for ct in text_types)


def _truncate_body(body: bytes) -> str:
"""Safely truncate and decode body to string, limiting size."""
if len(body) > _MAX_BODY_SIZE:
body = body[:_MAX_BODY_SIZE]
try:
decoded = body.decode("utf-8", errors="replace")
if len(body) == _MAX_BODY_SIZE:
decoded += f"\n... [truncated, exceeded {_MAX_BODY_SIZE} bytes]"
return decoded
except Exception:
_logger.exception("Failed to decode body content")
return "[body decoding failed]"


def request_hook(span: trace.Span, request: httpx.Request) -> None:
"""Hook to capture request body in traces if enabled."""
if not _capture_http_bodies:
return

try:
# Skip streaming requests to avoid consuming the stream
if hasattr(request, "stream") and request.stream is not None:
return

content_type = request.headers.get("content-type", "")
if _is_text_content(content_type) and hasattr(request, "content") and request.content:
span.set_attribute("http.request.body", _truncate_body(request.content))
except Exception:
_logger.exception("Failed to capture request body in trace")


def response_hook(span: trace.Span, request: httpx.Request, response: httpx.Response) -> None:
"""Hook to capture response body in traces if enabled."""
if not _capture_http_bodies:
return

try:
# Skip streaming responses to avoid consuming the stream
# Check both the is_stream_consumed flag and if stream is still active
if hasattr(response, "is_stream_consumed") and not response.is_stream_consumed:
return

content_type = response.headers.get("content-type", "")
if _is_text_content(content_type) and hasattr(response, "content") and response.content:
span.set_attribute("http.response.body", _truncate_body(response.content))
except Exception:
_logger.exception("Failed to capture response body in trace")


def setup_otel(capture_http_bodies: bool = False) -> None:
"""Set up OpenTelemetry tracing, logging and metrics.

Args:
capture_http_bodies: Enable capturing HTTP request/response bodies in traces.
Only text-based content types are logged, truncated to 100KB.
Streaming requests/responses are skipped to avoid consuming streams.
Defaults to False for security/privacy reasons.
"""
global _capture_http_bodies
_capture_http_bodies = capture_http_bodies

# Set log level for urllib to WARNING to reduce noise (like sending logs to OTLP)
logging.getLogger("urllib3").setLevel(logging.WARNING)
Expand All @@ -32,7 +102,10 @@ def setup_otel() -> None:
# Instrument Google ADK using openinference instrumentation
GoogleADKInstrumentor().instrument()
# Instrument HTTPX clients (this also transfers the trace context automatically)
HTTPXClientInstrumentor().instrument()
HTTPXClientInstrumentor().instrument(
request_hook=request_hook,
response_hook=response_hook,
)

# Logs
logger_provider = LoggerProvider()
Expand Down
109 changes: 109 additions & 0 deletions adk/agenticlayer/otel_starlette.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
"""OpenTelemetry instrumentation for Starlette applications."""

import logging
import os
from typing import Any, Dict

from a2a.utils.constants import AGENT_CARD_WELL_KNOWN_PATH
from opentelemetry import trace
from starlette.applications import Starlette

from .otel import _capture_http_bodies, _is_text_content, _truncate_body

_logger = logging.getLogger(__name__)


def _starlette_server_request_hook(span: trace.Span, scope: Dict[str, Any]) -> None:
"""Hook to capture Starlette request body in traces if enabled.

Note: This captures the body from the ASGI scope's cached body if available.
It does not consume the request stream to avoid breaking request handling.
"""

if not _capture_http_bodies:
return

try:
# Only process HTTP requests
if scope.get("type") != "http":
return

# Check if body is cached in scope (some middleware/frameworks cache it)
# Don't try to read the stream directly as it would consume it
if "body" in scope:
body = scope["body"]
if body:
# Get content type from headers
headers = dict(scope.get("headers", []))
content_type = headers.get(b"content-type", b"").decode("latin1")

if _is_text_content(content_type):
span.set_attribute("http.request.body", _truncate_body(body))
except Exception:
_logger.exception("Failed to capture Starlette request body in trace")


def _starlette_client_request_hook(span: trace.Span, scope: Dict[str, Any], message: Dict[str, Any]) -> None:
"""Hook to capture Starlette client request body in traces if enabled."""
# Import here to avoid circular dependency
from .otel import _capture_http_bodies, _is_text_content, _truncate_body

if not _capture_http_bodies:
return

try:
# Capture body from the message if available and it's the body message
if message.get("type") == "http.request" and "body" in message:
body = message["body"]
if body:
# Get content type from scope headers
headers = dict(scope.get("headers", []))
content_type = headers.get(b"content-type", b"").decode("latin1")

if _is_text_content(content_type):
span.set_attribute("http.request.body", _truncate_body(body))
except Exception:
_logger.exception("Failed to capture Starlette client request body in trace")


def _starlette_client_response_hook(span: trace.Span, scope: Dict[str, Any], message: Dict[str, Any]) -> None:
"""Hook to capture Starlette client response body in traces if enabled."""

if not _capture_http_bodies:
return

try:
# Capture body from response message
if message.get("type") == "http.response.body" and "body" in message:
body = message["body"]
if body:
# We don't have easy access to response headers here
# Could try to get from span attributes if set earlier
span.set_attribute("http.response.body", _truncate_body(body))
except Exception:
_logger.exception("Failed to capture Starlette client response body in trace")


def instrument_starlette_app(app: Starlette) -> None:
"""Instrument a Starlette application with OpenTelemetry.

Args:
app: The Starlette application to instrument

Note:
Body logging is controlled by the enable_body_logging parameter passed to setup_otel().
This should be called after setup_otel() has been called to set up the tracer provider.
Body logging for Starlette is limited compared to HTTPX as it must avoid consuming
request/response streams. Bodies are only captured when already buffered in the ASGI scope.
"""

# env needs to be set here since _excluded_urls is initialized at module import time
os.environ.setdefault("OTEL_PYTHON_STARLETTE_EXCLUDED_URLS", AGENT_CARD_WELL_KNOWN_PATH)
from opentelemetry.instrumentation.starlette import StarletteInstrumentor

StarletteInstrumentor().instrument_app(
app,
server_request_hook=_starlette_server_request_hook,
client_request_hook=_starlette_client_request_hook,
client_response_hook=_starlette_client_response_hook,
)