From a63444ee342c8f4a5420910cd83388658e7a3558 Mon Sep 17 00:00:00 2001 From: corentings Date: Wed, 28 Jan 2026 17:56:26 +0100 Subject: [PATCH 1/2] fix: propagation to non supported --- basalt/observability/processors.py | 38 +++++--- examples/reproduce_trace_bug/service_a.py | 18 +++- examples/reproduce_trace_bug/service_b.py | 24 ++++- tests/observability/test_processors.py | 111 ++++++++++++++++++++++ 4 files changed, 176 insertions(+), 15 deletions(-) diff --git a/basalt/observability/processors.py b/basalt/observability/processors.py index 6dce23f..c6b590e 100644 --- a/basalt/observability/processors.py +++ b/basalt/observability/processors.py @@ -294,16 +294,39 @@ def on_start(self, span: Span, parent_context: Context | None = None) -> None: """ Called when a span starts. - Detects auto-instrumented spans and applies pending injection data. - Also automatically sets basalt.span.kind based on the instrumentation scope. + Applies basalt trace metadata to ALL spans within a basalt trace, + then applies auto-instrumentation-specific attributes to known scopes. """ # Check if this is a recording span if not span.is_recording(): return - # Get instrumentation scope + # Get context + ctx = parent_context or otel_context.get_current() + + # ======================================================================== + # SECTION A: Universal trace metadata propagation + # Apply to ALL spans within a basalt trace, regardless of instrumentation scope + # ======================================================================== + from .context_managers import ROOT_SPAN_CONTEXT_KEY + + in_basalt_trace = otel_context.get_value(ROOT_SPAN_CONTEXT_KEY, ctx) is not None + + if in_basalt_trace: + # Mark ALL spans within a basalt trace with basalt.in_trace + span.set_attribute(semconv.BasaltSpan.IN_TRACE, True) + + # Propagate feature_slug to ALL spans within a basalt trace + _apply_feature_slug_from_context(span, ctx) + + # ======================================================================== + # SECTION B: Auto-instrumentation-specific logic + # Apply only to spans from known auto-instrumentation libraries + # ======================================================================== scope = span.instrumentation_scope if not scope or scope.name not in KNOWN_AUTO_INSTRUMENTATION_SCOPES: + # Not a known auto-instrumented span, but it still received + # universal trace metadata above if in a basalt trace return # Automatically set span kind based on instrumentation scope @@ -311,15 +334,6 @@ def on_start(self, span: Span, parent_context: Context | None = None) -> None: if span_kind: span.set_attribute(semconv.BasaltSpan.KIND, span_kind) - # Mark auto-instrumented spans within a basalt trace - from .context_managers import ROOT_SPAN_CONTEXT_KEY - - if otel_context.get_value(ROOT_SPAN_CONTEXT_KEY) is not None: - span.set_attribute(semconv.BasaltSpan.IN_TRACE, True) - - # Get context - ctx = parent_context or otel_context.get_current() - # Read and apply input input_payload = otel_context.get_value(PENDING_INJECT_INPUT_KEY, ctx) if input_payload is not None: diff --git a/examples/reproduce_trace_bug/service_a.py b/examples/reproduce_trace_bug/service_a.py index d437bde..1ebbde1 100644 --- a/examples/reproduce_trace_bug/service_a.py +++ b/examples/reproduce_trace_bug/service_a.py @@ -12,6 +12,7 @@ from fastapi import FastAPI from google import genai from opentelemetry import trace +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor @@ -26,7 +27,22 @@ GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") def build_basalt_client() -> Basalt: - telemetry_config = TelemetryConfig(service_name="service-a", trace_content=True,enabled_providers=["google_generativeai"]) + # Use environment variable for OTLP endpoint or default to localhost + otlp_endpoint = os.getenv("BASALT_OTEL_EXPORTER_OTLP_ENDPOINT", "localhost:4317") + + # Create custom exporter with authentication headers + # Note: insecure=True is used for local/demo purposes + exporter = OTLPSpanExporter( + endpoint=otlp_endpoint, + headers={"authorization": f"Bearer {BASALT_API_KEY}"}, + insecure=True, + timeout=10, + ) + telemetry_config = TelemetryConfig(service_name="service-a", + trace_content=True, + enabled_providers=["google_generativeai"], + # exporter=[exporter] + ) basalt_client = Basalt(api_key=BASALT_API_KEY, telemetry_config=telemetry_config) return basalt_client diff --git a/examples/reproduce_trace_bug/service_b.py b/examples/reproduce_trace_bug/service_b.py index 4378172..fe0c8ef 100644 --- a/examples/reproduce_trace_bug/service_b.py +++ b/examples/reproduce_trace_bug/service_b.py @@ -12,9 +12,10 @@ from fastapi import FastAPI, Request from google import genai from opentelemetry import trace +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor -from basalt import Basalt +from basalt import Basalt, TelemetryConfig from basalt.observability import start_observe load_dotenv() @@ -25,7 +26,26 @@ GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") def build_basalt_client()-> Basalt: - basalt_client = Basalt(api_key=BASALT_API_KEY) + # Use environment variable for OTLP endpoint or default to localhost + otlp_endpoint = os.getenv("BASALT_OTEL_EXPORTER_OTLP_ENDPOINT", "localhost:4317") + + # Create custom exporter with authentication headers + exporter = OTLPSpanExporter( + endpoint=otlp_endpoint, + headers={"authorization": f"Bearer {BASALT_API_KEY}"}, + insecure=True, + timeout=10, + ) + + # Configure telemetry with Gemini auto-instrumentation + telemetry = TelemetryConfig( + service_name="service-b", + enabled_providers=["google_generativeai"], # NEW Google GenAI SDK (from google import genai) + trace_content=True, # Capture prompt and completion content + # exporter=[exporter], # Use custom local exporter + ) + + basalt_client = Basalt(api_key=BASALT_API_KEY, telemetry_config=telemetry) return basalt_client diff --git a/tests/observability/test_processors.py b/tests/observability/test_processors.py index 10589e8..180d3e0 100644 --- a/tests/observability/test_processors.py +++ b/tests/observability/test_processors.py @@ -314,3 +314,114 @@ def test_auto_instrumentation_processor_prompt_context_with_optional_fields(): finally: _current_prompt_context.reset(cv_token) otel_context.detach(otel_token) + + +def test_auto_instrumentation_processor_propagates_to_unknown_scopes(): + """Test that unknown instrumentation scopes receive basalt.in_trace and feature_slug.""" + from opentelemetry import context as otel_context + + from basalt.observability import semconv + from basalt.observability.context_managers import ROOT_SPAN_CONTEXT_KEY + from basalt.observability.trace_context import FEATURE_SLUG_CONTEXT_KEY + + # Create a mock span with an UNKNOWN instrumentation scope (e.g., httpx) + mock_span = MagicMock() + mock_span.is_recording.return_value = True + mock_scope = MagicMock() + mock_scope.name = "opentelemetry.instrumentation.httpx" # Not in KNOWN_AUTO_INSTRUMENTATION_SCOPES + mock_span.instrumentation_scope = mock_scope + + # Create a basalt trace context with feature_slug + mock_root_span = MagicMock() + ctx = otel_context.set_value(ROOT_SPAN_CONTEXT_KEY, mock_root_span) + ctx = otel_context.set_value(FEATURE_SLUG_CONTEXT_KEY, "test-feature", ctx) + token = otel_context.attach(ctx) + + try: + # Create processor and call on_start + processor = processors.BasaltAutoInstrumentationProcessor() + processor.on_start(mock_span, None) + + # Verify that basalt.in_trace was set to True + mock_span.set_attribute.assert_any_call(semconv.BasaltSpan.IN_TRACE, True) + # Verify that feature_slug was propagated + mock_span.set_attribute.assert_any_call( + semconv.BasaltSpan.FEATURE_SLUG, "test-feature" + ) + + # Verify that basalt.span.kind was NOT set (only for known scopes) + calls = mock_span.set_attribute.call_args_list + kind_calls = [call for call in calls if call[0][0] == semconv.BasaltSpan.KIND] + assert len(kind_calls) == 0, "Unknown scopes should not receive span.kind" + finally: + otel_context.detach(token) + + +def test_auto_instrumentation_processor_propagates_feature_slug_to_known_scopes(): + """Test that known instrumentation scopes receive both in_trace and feature_slug.""" + from opentelemetry import context as otel_context + + from basalt.observability import semconv + from basalt.observability.context_managers import ROOT_SPAN_CONTEXT_KEY + from basalt.observability.trace_context import FEATURE_SLUG_CONTEXT_KEY + + # Create a mock span with OpenAI v1 instrumentation scope + mock_span = MagicMock() + mock_span.is_recording.return_value = True + mock_scope = MagicMock() + mock_scope.name = "opentelemetry.instrumentation.openai.v1" + mock_span.instrumentation_scope = mock_scope + + # Create a basalt trace context with feature_slug + mock_root_span = MagicMock() + ctx = otel_context.set_value(ROOT_SPAN_CONTEXT_KEY, mock_root_span) + ctx = otel_context.set_value(FEATURE_SLUG_CONTEXT_KEY, "openai-feature", ctx) + token = otel_context.attach(ctx) + + try: + # Create processor and call on_start + processor = processors.BasaltAutoInstrumentationProcessor() + processor.on_start(mock_span, None) + + # Verify that basalt.in_trace was set + mock_span.set_attribute.assert_any_call(semconv.BasaltSpan.IN_TRACE, True) + # Verify that feature_slug was propagated + mock_span.set_attribute.assert_any_call( + semconv.BasaltSpan.FEATURE_SLUG, "openai-feature" + ) + # Verify that basalt.span.kind was set (for known scopes) + mock_span.set_attribute.assert_any_call(semconv.BasaltSpan.KIND, "generation") + finally: + otel_context.detach(token) + + +def test_auto_instrumentation_processor_skips_spans_outside_basalt_trace(): + """Test that spans outside a basalt trace do not receive basalt attributes.""" + from opentelemetry import context as otel_context + + from basalt.observability import semconv + + # Create a mock span with an unknown instrumentation scope + mock_span = MagicMock() + mock_span.is_recording.return_value = True + mock_scope = MagicMock() + mock_scope.name = "opentelemetry.instrumentation.httpx" + mock_span.instrumentation_scope = mock_scope + + # DO NOT set ROOT_SPAN_CONTEXT_KEY - simulating a span outside basalt trace + ctx = otel_context.get_current() + token = otel_context.attach(ctx) + + try: + # Create processor and call on_start + processor = processors.BasaltAutoInstrumentationProcessor() + processor.on_start(mock_span, None) + + # Verify that NO basalt attributes were set + calls = mock_span.set_attribute.call_args_list + basalt_calls = [call for call in calls if "basalt." in str(call[0][0])] + assert ( + len(basalt_calls) == 0 + ), "Spans outside basalt trace should not receive basalt attributes" + finally: + otel_context.detach(token) From 1e118541344e044fb938ed21db7c9347c070cc2e Mon Sep 17 00:00:00 2001 From: corentings Date: Thu, 29 Jan 2026 16:11:37 +0100 Subject: [PATCH 2/2] feat: better trace propagation in processor or context manager --- basalt/observability/context_managers.py | 11 +++++++++++ basalt/observability/processors.py | 21 ++++++++++++++++----- examples/reproduce_trace_bug/service_a.py | 2 +- examples/reproduce_trace_bug/service_b.py | 2 +- 4 files changed, 29 insertions(+), 7 deletions(-) diff --git a/basalt/observability/context_managers.py b/basalt/observability/context_managers.py index 77f7799..5e5f317 100644 --- a/basalt/observability/context_managers.py +++ b/basalt/observability/context_managers.py @@ -744,6 +744,17 @@ def _with_span_handle( # AND this is a start_observe call (basalt_trace type) # -> Treat as Basalt root (allows start_observe to work inside FastAPI handlers) is_root = True + + # Retroactively apply basalt trace metadata to the parent span + # This ensures the entire trace tree has consistent basalt attributes + if parent_span is not None and parent_span.is_recording(): + parent_span.set_attribute(semconv.BasaltSpan.IN_TRACE, True) + + parent_span.set_attribute(semconv.BasaltSpan.KIND, "span") + + # Also propagate feature_slug to parent if available + if feature_slug is not None: + parent_span.set_attribute(semconv.BasaltSpan.FEATURE_SLUG, feature_slug) else: # Parent exists and either: # - We're already in a Basalt trace, OR diff --git a/basalt/observability/processors.py b/basalt/observability/processors.py index c6b590e..730ab14 100644 --- a/basalt/observability/processors.py +++ b/basalt/observability/processors.py @@ -301,23 +301,31 @@ def on_start(self, span: Span, parent_context: Context | None = None) -> None: if not span.is_recording(): return - # Get context - ctx = parent_context or otel_context.get_current() - # ======================================================================== # SECTION A: Universal trace metadata propagation # Apply to ALL spans within a basalt trace, regardless of instrumentation scope # ======================================================================== from .context_managers import ROOT_SPAN_CONTEXT_KEY - in_basalt_trace = otel_context.get_value(ROOT_SPAN_CONTEXT_KEY, ctx) is not None + # Check for basalt trace context in both parent_context and current context + # This ensures we catch the ROOT_SPAN_CONTEXT_KEY regardless of how it's propagated + in_basalt_trace = False + if parent_context is not None: + in_basalt_trace = otel_context.get_value(ROOT_SPAN_CONTEXT_KEY, parent_context) is not None + + if not in_basalt_trace: + # Also check current context as fallback + in_basalt_trace = otel_context.get_value(ROOT_SPAN_CONTEXT_KEY) is not None if in_basalt_trace: # Mark ALL spans within a basalt trace with basalt.in_trace span.set_attribute(semconv.BasaltSpan.IN_TRACE, True) # Propagate feature_slug to ALL spans within a basalt trace - _apply_feature_slug_from_context(span, ctx) + _apply_feature_slug_from_context(span, parent_context) + + # Do not set basalt.span.kind for unknown scopes. + # Known auto-instrumentation scopes set the kind in Section B. # ======================================================================== # SECTION B: Auto-instrumentation-specific logic @@ -329,6 +337,9 @@ def on_start(self, span: Span, parent_context: Context | None = None) -> None: # universal trace metadata above if in a basalt trace return + # Get context for injection data + ctx = parent_context or otel_context.get_current() + # Automatically set span kind based on instrumentation scope span_kind = INSTRUMENTATION_SCOPE_KINDS.get(scope.name) if span_kind: diff --git a/examples/reproduce_trace_bug/service_a.py b/examples/reproduce_trace_bug/service_a.py index 1ebbde1..15bbe12 100644 --- a/examples/reproduce_trace_bug/service_a.py +++ b/examples/reproduce_trace_bug/service_a.py @@ -41,7 +41,7 @@ def build_basalt_client() -> Basalt: telemetry_config = TelemetryConfig(service_name="service-a", trace_content=True, enabled_providers=["google_generativeai"], - # exporter=[exporter] + exporter=[exporter] ) basalt_client = Basalt(api_key=BASALT_API_KEY, telemetry_config=telemetry_config) return basalt_client diff --git a/examples/reproduce_trace_bug/service_b.py b/examples/reproduce_trace_bug/service_b.py index fe0c8ef..bfed14e 100644 --- a/examples/reproduce_trace_bug/service_b.py +++ b/examples/reproduce_trace_bug/service_b.py @@ -42,7 +42,7 @@ def build_basalt_client()-> Basalt: service_name="service-b", enabled_providers=["google_generativeai"], # NEW Google GenAI SDK (from google import genai) trace_content=True, # Capture prompt and completion content - # exporter=[exporter], # Use custom local exporter + exporter=[exporter], # Use custom local exporter ) basalt_client = Basalt(api_key=BASALT_API_KEY, telemetry_config=telemetry)