Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ from basalt.observability import observe, start_observe

# Root span with identity tracking
@start_observe(
feature_slug="dataset-processing",
name="process_workflow",
identity={
"organization": {"id": "123", "name": "ACME"},
Expand Down Expand Up @@ -261,6 +262,55 @@ Your Basalt instance exposes a `prompts` property for interacting with your Basa
basalt.shutdown()
```

- **Context Managers for Observability** (Recommended)

Use prompts as context managers to automatically nest LLM calls under a prompt span for better trace organization and observability:

**Sync Example:**

```python
from basalt import Basalt
import openai

basalt = Basalt(api_key="your-api-key")
client = openai.OpenAI()

# Use context manager for automatic span nesting
with basalt.prompts.get_sync('summary-prompt', tag='production') as prompt:
response = client.chat.completions.create(
model=prompt.model.model,
messages=[{'role': 'user', 'content': prompt.text}]
)
print(response.choices[0].message.content)

basalt.shutdown()
```

**Async Example:**

```python
import asyncio
from basalt import Basalt
import openai

async def generate():
basalt = Basalt(api_key="your-api-key")
client = openai.AsyncOpenAI()

async with await basalt.prompts.get('summary-prompt', tag='production') as prompt:
response = await client.chat.completions.create(
model=prompt.model.model,
messages=[{'role': 'user', 'content': prompt.text}]
)
print(response.choices[0].message.content)

basalt.shutdown()

asyncio.run(generate())
```

See the [Prompts guide](./docs/03-prompts.md#observability-with-context-managers) for complete details.

- **Describe a Prompt**

Get metadata about a prompt including available versions and tags.
Expand Down
6 changes: 5 additions & 1 deletion basalt/observability/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ class TelemetryConfig:
Example: ["langchain", "llamaindex"]
"""

exporter: SpanExporter | None = None
exporter: SpanExporter | list[SpanExporter] | None = None

extra_resource_attributes: dict[str, Any] = field(default_factory=dict)

def clone(self) -> TelemetryConfig:
Expand All @@ -136,6 +137,9 @@ def clone(self) -> TelemetryConfig:
cloned.extra_resource_attributes = dict(self.extra_resource_attributes)
cloned.enabled_providers = list(self.enabled_providers) if self.enabled_providers else None
cloned.disabled_providers = list(self.disabled_providers) if self.disabled_providers else None
# Clone exporter list if it's a list (shallow copy of list, not exporters themselves)
if isinstance(self.exporter, list):
cloned.exporter = list(self.exporter)
return cloned

def with_env_overrides(self) -> TelemetryConfig:
Expand Down
85 changes: 72 additions & 13 deletions basalt/observability/instrumentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,17 @@ def __init__(

def create_tracer_provider(
config: BasaltConfig,
exporter: SpanExporter | None = None,
exporter: SpanExporter | list[SpanExporter] | None = None,
) -> TracerProvider:
"""
Create and configure an OpenTelemetry TracerProvider for Basalt.

Args:
config: BasaltConfig instance with service and environment info.
exporter: Optional SpanExporter. Defaults to ConsoleSpanExporter for debugging.
exporter: Optional SpanExporter or list of SpanExporters. Can be:
- None: Defaults to ConsoleSpanExporter for debugging
- Single SpanExporter: Exports to one destination
- List of SpanExporters: Exports to multiple destinations simultaneously

Returns:
A configured TracerProvider instance.
Expand All @@ -102,46 +105,75 @@ def create_tracer_provider(
resource = Resource.create(resource_attrs)
provider = TracerProvider(resource=resource)

# Normalize exporter to list
if exporter is None:
exporter = ConsoleSpanExporter()
exporters = [ConsoleSpanExporter()]
warnings.warn(
"No span exporter configured and default Basalt OTEL endpoint unavailable. "
"Using ConsoleSpanExporter for debugging. For production, configure an exporter "
"via TelemetryConfig.exporter or set BASALT_OTEL_EXPORTER_OTLP_ENDPOINT environment variable.",
UserWarning,
stacklevel=3,
)
elif isinstance(exporter, list):
# Handle empty list
exporters = exporter if exporter else [ConsoleSpanExporter()]
if not exporter:
warnings.warn(
"Empty exporter list provided. Using ConsoleSpanExporter for debugging.",
UserWarning,
stacklevel=3,
)
else:
exporters = [exporter]

processor_cls = SimpleSpanProcessor if isinstance(exporter, ConsoleSpanExporter) else BatchSpanProcessor
provider.add_span_processor(processor_cls(exporter))
# Add a span processor for each exporter
for exp in exporters:
processor_cls = SimpleSpanProcessor if isinstance(exp, ConsoleSpanExporter) else BatchSpanProcessor
provider.add_span_processor(processor_cls(exp))

return provider


def setup_tracing(
config: BasaltConfig,
exporter: SpanExporter | None = None,
exporter: SpanExporter | list[SpanExporter] | None = None,
) -> TracerProvider:
"""
Set up global OpenTelemetry tracing for the Basalt SDK.

Args:
config: Tracing configuration.
exporter: Optional SpanExporter to use.
exporter: Optional SpanExporter or list of SpanExporters to use.

Returns:
The configured TracerProvider.

Note:
If a TracerProvider is already set globally, this will return the existing
provider instead of creating a new one to avoid "Overriding of current
TracerProvider is not allowed" errors.
If a TracerProvider is already set globally (e.g., by Datadog, Honeycomb,
or another observability tool), this will return the existing provider instead
of creating a new one to avoid "Overriding of current TracerProvider is not allowed" errors.

When an existing provider is detected, Basalt's span processors
(BasaltContextProcessor, BasaltCallEvaluatorProcessor, BasaltAutoInstrumentationProcessor)
will be attached to it via _install_basalt_processors() in initialize().
This ensures that all spans (including those from external tools) are enriched
with Basalt's custom metadata, evaluators, and prompt context.

Integration order: For best results, initialize external observability tools
(Datadog, Honeycomb) before Basalt. If Basalt initializes first, external tools
may fail to override the global provider.
"""
# Check if a tracer provider is already set globally
existing_provider = trace.get_tracer_provider()
# If it's a real TracerProvider (not the default proxy), reuse it
if hasattr(existing_provider, 'add_span_processor'):
logger.debug("Reusing existing global TracerProvider")
provider_type = type(existing_provider).__name__
provider_module = type(existing_provider).__module__
logger.info(
f"Reusing existing global TracerProvider: {provider_module}.{provider_type}. "
f"Basalt processors will be attached to this provider to enrich all spans."
)
return existing_provider # type: ignore[return-value]

# Otherwise create and set a new one
Expand Down Expand Up @@ -180,14 +212,32 @@ def initialize(
self._initialized = True
return

exporter = effective_config.exporter or self._build_exporter_from_env()
# Combine user-provided exporters with environment-built exporter
user_exporters = effective_config.exporter
env_exporter = self._build_exporter_from_env()

# Normalize user_exporters to list
if user_exporters is None:
exporters_list = []
elif isinstance(user_exporters, list):
exporters_list = user_exporters.copy()
else:
exporters_list = [user_exporters]

# Add environment exporter if available and not already in list
if env_exporter and env_exporter not in exporters_list:
exporters_list.append(env_exporter)

# Pass to setup_tracing (will handle None/empty list → ConsoleSpanExporter)
final_exporter = exporters_list if exporters_list else None

basalt_config = BasaltConfig(
service_name=effective_config.service_name,
service_version=effective_config.service_version or "",
environment=effective_config.environment,
extra_resource_attributes=effective_config.extra_resource_attributes,
)
self._tracer_provider = setup_tracing(basalt_config, exporter=exporter)
self._tracer_provider = setup_tracing(basalt_config, exporter=final_exporter)
if self._tracer_provider:
self._install_basalt_processors(self._tracer_provider)

Expand Down Expand Up @@ -403,18 +453,27 @@ def _initialize_instrumentation(self, config: TelemetryConfig) -> None:

def _install_basalt_processors(self, provider: TracerProvider) -> None:
    """Attach Basalt's span processors to ``provider`` at most once.

    The provider is tagged with a ``_basalt_processors_installed`` marker
    after a successful installation; a provider that already carries a truthy
    marker is left untouched, so repeated calls do not register the
    processors again.

    Args:
        provider: The tracer provider that should receive the processors.
    """
    already_installed = getattr(provider, "_basalt_processors_installed", False)
    if already_installed:
        logger.debug("Basalt processors already installed on this provider, skipping")
        return

    basalt_processors: list[OTelSpanProcessor] = [
        BasaltContextProcessor(),
        BasaltCallEvaluatorProcessor(),
        BasaltAutoInstrumentationProcessor(),
    ]

    provider_type = type(provider).__name__
    processor_names = ", ".join(type(proc).__name__ for proc in basalt_processors)
    logger.info(
        f"Installing {len(basalt_processors)} Basalt span processors on {provider_type}: "
        f"{processor_names}"
    )

    for proc in basalt_processors:
        provider.add_span_processor(proc)

    # The marker lives on the provider itself (not on this manager), so the
    # early return above is keyed on the provider object that is passed in.
    provider._basalt_processors_installed = True  # type: ignore[attr-defined]
    self._span_processors = basalt_processors
    logger.debug(f"Successfully installed Basalt processors on {provider_type}")


def _uninstrument_providers(self) -> None:
Expand Down
67 changes: 67 additions & 0 deletions examples/multi_exporter_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import os

from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter as GRPCSpanExporter
from opentelemetry.sdk.trace.export import ConsoleSpanExporter

from basalt import Basalt, TelemetryConfig
from basalt.observability import start_observe

# Configuration: the API key comes from the environment, with a placeholder fallback
BASALT_API_KEY = os.getenv("BASALT_API_KEY", "your-basalt-api-key")
BASALT_COLLECTOR_ENDPOINT = "https://grpc.otel.getbasalt.ai"  # Basalt's default collector
LOCAL_COLLECTOR_ENDPOINT = "localhost:4317"  # Local OTel collector


# One exporter per destination: Basalt, a local collector, and the console.

# The Basalt collector authenticates with a bearer token sent as gRPC metadata.
basalt_auth_headers = (("authorization", f"Bearer {BASALT_API_KEY}"),)
basalt_exporter = GRPCSpanExporter(
    endpoint=BASALT_COLLECTOR_ENDPOINT,
    headers=basalt_auth_headers,
)

# Local collector typically doesn't need auth headers, so only the plaintext
# (insecure) channel is requested.
local_exporter = GRPCSpanExporter(endpoint=LOCAL_COLLECTOR_ENDPOINT, insecure=True)

console_exporter = ConsoleSpanExporter()

# Order in the list: Basalt (advanced features), local collector, console (debugging).
# NOTE(review): InstrumentationManager.initialize() appends an environment-built
# exporter when one is available; confirm that this does not send spans to Basalt
# a second time alongside basalt_exporter.
all_exporters = [basalt_exporter, local_exporter, console_exporter]

# Configure Basalt with all THREE exporters at once.
telemetry_config = TelemetryConfig(
    service_name="multi-export-demo",
    environment="production",
    exporter=all_exporters,
)

basalt = Basalt(api_key=BASALT_API_KEY, telemetry_config=telemetry_config)


# Simulate a traced workflow


@start_observe(feature_slug="support-ticket", name="onboard_user")
def onboard_user(user_id: str):
    """Run a simulated customer onboarding and record it on the active span.

    The user id is recorded as the span input, a workflow version as span
    metadata, and the returned dict as the span output.

    Args:
        user_id: Identifier of the user being onboarded.

    Returns:
        A dict with ``status``, ``user_id`` and ``onboarding_complete`` keys.
    """
    from basalt.observability import observe

    observe.set_input({"user_id": user_id})
    observe.set_metadata({"workflow_version": "2.1"})

    # A real application might call OpenAI at this point; the example only
    # fabricates a successful outcome.
    outcome = dict(status="success", user_id=user_id, onboarding_complete=True)
    observe.set_output(outcome)

    return outcome


# Execute the workflow once; `result` receives the dict returned by onboard_user
result = onboard_user("user-12345")

# Flush to ensure traces are sent before exit
# (presumably this covers every exporter configured above; confirm against Basalt.shutdown)
basalt.shutdown()
53 changes: 53 additions & 0 deletions tests/observability/test_instrumentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,56 @@ def test_grpc_exporter_not_wrapped(self, mock_grpc_exporter):
# Should NOT be wrapped, should be the gRPC exporter directly
self.assertNotIsInstance(exporter, ResilientSpanExporter)
self.assertIs(exporter, mock_grpc_exporter_instance)

@mock.patch("basalt.observability.instrumentation.trace")
def test_install_processors_on_existing_provider(self, mock_trace):
    """Basalt processors get attached to a provider that was already set globally."""
    from opentelemetry.sdk.trace import TracerProvider

    # Stand-in for a provider registered earlier by an external tool (e.g. Datadog).
    preexisting_provider = TracerProvider()
    mock_trace.get_tracer_provider.return_value = preexisting_provider
    mock_trace.set_tracer_provider = mock.Mock()  # must stay untouched

    manager = InstrumentationManager()
    manager.initialize(TelemetryConfig(service_name="test", enabled=True))

    # The existing provider is reused rather than replaced.
    mock_trace.set_tracer_provider.assert_not_called()

    # The provider carries the installation marker, and the marker is truthy.
    self.assertTrue(hasattr(preexisting_provider, "_basalt_processors_installed"))
    self.assertTrue(preexisting_provider._basalt_processors_installed)

    # The manager keeps references to the three installed processors ...
    self.assertEqual(len(manager._span_processors), 3)

    # ... and to the external provider itself.
    self.assertIs(manager._tracer_provider, preexisting_provider)

@mock.patch("basalt.observability.instrumentation.trace")
def test_processors_not_installed_twice_on_same_provider(self, mock_trace):
    """Initializing two managers against one provider installs processors only once."""
    from opentelemetry.sdk.trace import TracerProvider

    shared_provider = TracerProvider()
    mock_trace.get_tracer_provider.return_value = shared_provider

    def registered_processor_count():
        # Reads the SDK's private processor collection on the provider.
        return len(shared_provider._active_span_processor._span_processors)

    first_manager = InstrumentationManager()
    second_manager = InstrumentationManager()
    config = TelemetryConfig(service_name="test", enabled=True)

    # The first initialization installs the processors.
    first_manager.initialize(config)
    count_after_first = registered_processor_count()

    # The second initialization must leave the processor list unchanged.
    second_manager.initialize(config)
    count_after_second = registered_processor_count()

    self.assertEqual(count_after_first, count_after_second)
    self.assertTrue(shared_provider._basalt_processors_installed)
Loading