Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions basalt/observability/context_managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,17 @@ def _with_span_handle(
# AND this is a start_observe call (basalt_trace type)
# -> Treat as Basalt root (allows start_observe to work inside FastAPI handlers)
is_root = True

# Retroactively apply basalt trace metadata to the parent span
# This ensures the entire trace tree has consistent basalt attributes
if parent_span is not None and parent_span.is_recording():
parent_span.set_attribute(semconv.BasaltSpan.IN_TRACE, True)

parent_span.set_attribute(semconv.BasaltSpan.KIND, "span")

# Also propagate feature_slug to parent if available
if feature_slug is not None:
parent_span.set_attribute(semconv.BasaltSpan.FEATURE_SLUG, feature_slug)
else:
# Parent exists and either:
# - We're already in a Basalt trace, OR
Expand Down
49 changes: 37 additions & 12 deletions basalt/observability/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,32 +294,57 @@ def on_start(self, span: Span, parent_context: Context | None = None) -> None:
"""
Called when a span starts.

Detects auto-instrumented spans and applies pending injection data.
Also automatically sets basalt.span.kind based on the instrumentation scope.
Applies basalt trace metadata to ALL spans within a basalt trace,
then applies auto-instrumentation-specific attributes to known scopes.
"""
# Check if this is a recording span
if not span.is_recording():
return

# Get instrumentation scope
# ========================================================================
# SECTION A: Universal trace metadata propagation
# Apply to ALL spans within a basalt trace, regardless of instrumentation scope
# ========================================================================
from .context_managers import ROOT_SPAN_CONTEXT_KEY

# Check for basalt trace context in both parent_context and current context
# This ensures we catch the ROOT_SPAN_CONTEXT_KEY regardless of how it's propagated
in_basalt_trace = False
if parent_context is not None:
in_basalt_trace = otel_context.get_value(ROOT_SPAN_CONTEXT_KEY, parent_context) is not None

if not in_basalt_trace:
# Also check current context as fallback
in_basalt_trace = otel_context.get_value(ROOT_SPAN_CONTEXT_KEY) is not None

if in_basalt_trace:
# Mark ALL spans within a basalt trace with basalt.in_trace
span.set_attribute(semconv.BasaltSpan.IN_TRACE, True)

# Propagate feature_slug to ALL spans within a basalt trace
_apply_feature_slug_from_context(span, parent_context)

# Do not set basalt.span.kind for unknown scopes.
# Known auto-instrumentation scopes set the kind in Section B.

# ========================================================================
# SECTION B: Auto-instrumentation-specific logic
# Apply only to spans from known auto-instrumentation libraries
# ========================================================================
scope = span.instrumentation_scope
if not scope or scope.name not in KNOWN_AUTO_INSTRUMENTATION_SCOPES:
# Not a known auto-instrumented span, but it still received
# universal trace metadata above if in a basalt trace
return

# Get context for injection data
ctx = parent_context or otel_context.get_current()

# Automatically set span kind based on instrumentation scope
span_kind = INSTRUMENTATION_SCOPE_KINDS.get(scope.name)
if span_kind:
span.set_attribute(semconv.BasaltSpan.KIND, span_kind)

# Mark auto-instrumented spans within a basalt trace
from .context_managers import ROOT_SPAN_CONTEXT_KEY

if otel_context.get_value(ROOT_SPAN_CONTEXT_KEY) is not None:
span.set_attribute(semconv.BasaltSpan.IN_TRACE, True)

# Get context
ctx = parent_context or otel_context.get_current()

# Read and apply input
input_payload = otel_context.get_value(PENDING_INJECT_INPUT_KEY, ctx)
if input_payload is not None:
Expand Down
18 changes: 17 additions & 1 deletion examples/reproduce_trace_bug/service_a.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from fastapi import FastAPI
from google import genai
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor

Expand All @@ -26,7 +27,22 @@
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")

def build_basalt_client() -> Basalt:
telemetry_config = TelemetryConfig(service_name="service-a", trace_content=True,enabled_providers=["google_generativeai"])
# Use environment variable for OTLP endpoint or default to localhost
otlp_endpoint = os.getenv("BASALT_OTEL_EXPORTER_OTLP_ENDPOINT", "localhost:4317")

# Create custom exporter with authentication headers
# Note: insecure=True is used for local/demo purposes
exporter = OTLPSpanExporter(
endpoint=otlp_endpoint,
headers={"authorization": f"Bearer {BASALT_API_KEY}"},
insecure=True,
timeout=10,
)
telemetry_config = TelemetryConfig(service_name="service-a",
trace_content=True,
enabled_providers=["google_generativeai"],
exporter=[exporter]
)
basalt_client = Basalt(api_key=BASALT_API_KEY, telemetry_config=telemetry_config)
return basalt_client

Expand Down
24 changes: 22 additions & 2 deletions examples/reproduce_trace_bug/service_b.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
from fastapi import FastAPI, Request
from google import genai
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

from basalt import Basalt
from basalt import Basalt, TelemetryConfig
from basalt.observability import start_observe

load_dotenv()
Expand All @@ -25,7 +26,26 @@
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")

def build_basalt_client()-> Basalt:
basalt_client = Basalt(api_key=BASALT_API_KEY)
# Use environment variable for OTLP endpoint or default to localhost
otlp_endpoint = os.getenv("BASALT_OTEL_EXPORTER_OTLP_ENDPOINT", "localhost:4317")

# Create custom exporter with authentication headers
exporter = OTLPSpanExporter(
endpoint=otlp_endpoint,
headers={"authorization": f"Bearer {BASALT_API_KEY}"},
insecure=True,
timeout=10,
)

# Configure telemetry with Gemini auto-instrumentation
telemetry = TelemetryConfig(
service_name="service-b",
enabled_providers=["google_generativeai"], # NEW Google GenAI SDK (from google import genai)
trace_content=True, # Capture prompt and completion content
exporter=[exporter], # Use custom local exporter
)

basalt_client = Basalt(api_key=BASALT_API_KEY, telemetry_config=telemetry)
return basalt_client


Expand Down
111 changes: 111 additions & 0 deletions tests/observability/test_processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,3 +314,114 @@ def test_auto_instrumentation_processor_prompt_context_with_optional_fields():
finally:
_current_prompt_context.reset(cv_token)
otel_context.detach(otel_token)


def test_auto_instrumentation_processor_propagates_to_unknown_scopes():
"""Test that unknown instrumentation scopes receive basalt.in_trace and feature_slug."""
from opentelemetry import context as otel_context

from basalt.observability import semconv
from basalt.observability.context_managers import ROOT_SPAN_CONTEXT_KEY
from basalt.observability.trace_context import FEATURE_SLUG_CONTEXT_KEY

# Create a mock span with an UNKNOWN instrumentation scope (e.g., httpx)
mock_span = MagicMock()
mock_span.is_recording.return_value = True
mock_scope = MagicMock()
mock_scope.name = "opentelemetry.instrumentation.httpx" # Not in KNOWN_AUTO_INSTRUMENTATION_SCOPES
mock_span.instrumentation_scope = mock_scope

# Create a basalt trace context with feature_slug
mock_root_span = MagicMock()
ctx = otel_context.set_value(ROOT_SPAN_CONTEXT_KEY, mock_root_span)
ctx = otel_context.set_value(FEATURE_SLUG_CONTEXT_KEY, "test-feature", ctx)
token = otel_context.attach(ctx)

try:
# Create processor and call on_start
processor = processors.BasaltAutoInstrumentationProcessor()
processor.on_start(mock_span, None)

# Verify that basalt.in_trace was set to True
mock_span.set_attribute.assert_any_call(semconv.BasaltSpan.IN_TRACE, True)
# Verify that feature_slug was propagated
mock_span.set_attribute.assert_any_call(
semconv.BasaltSpan.FEATURE_SLUG, "test-feature"
)

# Verify that basalt.span.kind was NOT set (only for known scopes)
calls = mock_span.set_attribute.call_args_list
kind_calls = [call for call in calls if call[0][0] == semconv.BasaltSpan.KIND]
assert len(kind_calls) == 0, "Unknown scopes should not receive span.kind"
finally:
otel_context.detach(token)


def test_auto_instrumentation_processor_propagates_feature_slug_to_known_scopes():
"""Test that known instrumentation scopes receive both in_trace and feature_slug."""
from opentelemetry import context as otel_context

from basalt.observability import semconv
from basalt.observability.context_managers import ROOT_SPAN_CONTEXT_KEY
from basalt.observability.trace_context import FEATURE_SLUG_CONTEXT_KEY

# Create a mock span with OpenAI v1 instrumentation scope
mock_span = MagicMock()
mock_span.is_recording.return_value = True
mock_scope = MagicMock()
mock_scope.name = "opentelemetry.instrumentation.openai.v1"
mock_span.instrumentation_scope = mock_scope

# Create a basalt trace context with feature_slug
mock_root_span = MagicMock()
ctx = otel_context.set_value(ROOT_SPAN_CONTEXT_KEY, mock_root_span)
ctx = otel_context.set_value(FEATURE_SLUG_CONTEXT_KEY, "openai-feature", ctx)
token = otel_context.attach(ctx)

try:
# Create processor and call on_start
processor = processors.BasaltAutoInstrumentationProcessor()
processor.on_start(mock_span, None)

# Verify that basalt.in_trace was set
mock_span.set_attribute.assert_any_call(semconv.BasaltSpan.IN_TRACE, True)
# Verify that feature_slug was propagated
mock_span.set_attribute.assert_any_call(
semconv.BasaltSpan.FEATURE_SLUG, "openai-feature"
)
# Verify that basalt.span.kind was set (for known scopes)
mock_span.set_attribute.assert_any_call(semconv.BasaltSpan.KIND, "generation")
finally:
otel_context.detach(token)


def test_auto_instrumentation_processor_skips_spans_outside_basalt_trace():
"""Test that spans outside a basalt trace do not receive basalt attributes."""
from opentelemetry import context as otel_context

from basalt.observability import semconv

# Create a mock span with an unknown instrumentation scope
mock_span = MagicMock()
mock_span.is_recording.return_value = True
mock_scope = MagicMock()
mock_scope.name = "opentelemetry.instrumentation.httpx"
mock_span.instrumentation_scope = mock_scope

# DO NOT set ROOT_SPAN_CONTEXT_KEY - simulating a span outside basalt trace
ctx = otel_context.get_current()
token = otel_context.attach(ctx)

try:
# Create processor and call on_start
processor = processors.BasaltAutoInstrumentationProcessor()
processor.on_start(mock_span, None)

# Verify that NO basalt attributes were set
calls = mock_span.set_attribute.call_args_list
basalt_calls = [call for call in calls if "basalt." in str(call[0][0])]
assert (
len(basalt_calls) == 0
), "Spans outside basalt trace should not receive basalt attributes"
finally:
otel_context.detach(token)