Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file removed .coverage
Binary file not shown.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,7 @@ test.py
.idea/
venv/
.github/instructions/*.md
.copilot-tracking
.copilot-tracking
.env*
requirements*
.serena
8 changes: 8 additions & 0 deletions basalt/observability/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ def __enter__(self) -> StartSpanHandle:
evaluators=self.evaluators,
feature_slug=self.feature_slug,
metadata=self._metadata,
evaluate_config=self.evaluate_config,
experiment=self.experiment,
)
span = self._ctx_manager.__enter__()
# Type assertion: we know this is StartSpanHandle since we passed it as handle_cls
Expand Down Expand Up @@ -150,6 +152,8 @@ def wrapper(*args, **kwargs):
evaluators=pre_evaluators,
feature_slug=self.feature_slug,
metadata=self._metadata,
evaluate_config=self.evaluate_config,
experiment=self.experiment,
) as handle:
# Type assertion: we know this is StartSpanHandle since we passed it as handle_cls
assert isinstance(handle, StartSpanHandle)
Expand Down Expand Up @@ -188,6 +192,8 @@ async def async_wrapper(*args, **kwargs):
evaluators=pre_evaluators,
feature_slug=self.feature_slug,
metadata=self._metadata,
evaluate_config=self.evaluate_config,
experiment=self.experiment,
) as handle:
# Type assertion: we know this is StartSpanHandle since we passed it as handle_cls
assert isinstance(handle, StartSpanHandle)
Expand Down Expand Up @@ -879,6 +885,8 @@ async def __aenter__(self) -> StartSpanHandle:
evaluators=self.evaluators,
feature_slug=self.feature_slug,
metadata=self._metadata,
evaluate_config=self.evaluate_config,
experiment=self.experiment,
)
span = await self._ctx_manager.__aenter__()
# Type assertion: we know this is StartSpanHandle since we passed it as handle_cls
Expand Down
20 changes: 20 additions & 0 deletions basalt/observability/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,17 @@ class TelemetryConfig:

extra_resource_attributes: dict[str, Any] = field(default_factory=dict)

sample_rate: float = 0.0
"""
Global default sampling rate for trace-level evaluation (0.0-1.0, default 0.0).
Controls whether evaluators run for a trace via should_evaluate attribute.
Can be overridden per-trace via EvaluationConfig(sample_rate=...) in start_observe().
"""

def __post_init__(self) -> None:
    """Validate ``sample_rate`` once the instance has been populated.

    Raises:
        ValueError: If ``sample_rate`` is not within the closed interval
            [0.0, 1.0].
    """
    rate_in_bounds = 0.0 <= self.sample_rate <= 1.0
    if rate_in_bounds:
        return
    raise ValueError("sample_rate must be within [0.0, 1.0].")

def clone(self) -> TelemetryConfig:
"""Return a defensive copy of the telemetry configuration."""
cloned = replace(self)
Expand Down Expand Up @@ -175,6 +186,15 @@ def with_env_overrides(self) -> TelemetryConfig:
if disabled_instruments:
cfg.disabled_providers = [p.strip() for p in disabled_instruments.split(",") if p.strip()]

sample_rate_env = os.getenv("BASALT_SAMPLE_RATE")
if sample_rate_env:
try:
rate = float(sample_rate_env)
if 0.0 <= rate <= 1.0:
cfg.sample_rate = rate
except ValueError:
pass # Ignore invalid values
Comment on lines +189 to +196
Copy link

Copilot AI Jan 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When BASALT_SAMPLE_RATE environment variable contains an invalid value (line 195), the error is silently ignored. This makes debugging difficult if users misconfigure the value. Consider logging a warning when an invalid sample_rate is provided so users are aware their configuration is being ignored. Example: logger.warning("Invalid BASALT_SAMPLE_RATE value '%s', must be a float between 0.0 and 1.0", sample_rate_env).

Copilot uses AI. Check for mistakes.
Comment on lines +189 to +196
Copy link

Copilot AI Jan 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docstring on line 165 does not list BASALT_SAMPLE_RATE as a supported environment variable, even though it is implemented in lines 189-196. This inconsistency makes the configuration option undiscoverable for users. Consider updating the docstring to include BASALT_SAMPLE_RATE in the list of supported environment variables (note: the docstring is at line 165, outside this diff region, but the omission impacts the discoverability of this feature).

Copilot uses AI. Check for mistakes.

if not cfg.service_version:
# basalt_sdk_config is a mapping defined in `basalt.config` module
cfg.service_version = basalt_sdk_config.get("sdk_version", "unknown")
Expand Down
86 changes: 82 additions & 4 deletions basalt/observability/context_managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import json
import logging
import os
import random
from collections.abc import AsyncGenerator, Generator, Mapping, Sequence
from contextlib import asynccontextmanager, contextmanager
from dataclasses import dataclass, field
Expand All @@ -22,6 +23,7 @@
from . import semconv
from .trace_context import (
ORGANIZATION_CONTEXT_KEY,
SHOULD_EVALUATE_CONTEXT_KEY,
USER_CONTEXT_KEY,
TraceIdentity,
_current_trace_defaults,
Expand All @@ -44,14 +46,16 @@ class EvaluationConfig:
"""
Type-safe configuration for evaluators attached to a span.

This configuration is span-scoped and shared by all evaluators in the span.
It's not handled client-side but attached to the span for server-side processing.
This configuration is span-scoped and controls trace-level sampling for evaluators.
The sample_rate determines whether evaluators run for the entire trace.

Attributes:
sample_rate: Sampling rate for evaluators (0.0-1.0). Default is 1.0 (100%).
sample_rate: Sampling rate for trace-level evaluation (0.0-1.0). Default is 0.0 (no sampling).
When set, one sampling decision is made at root span creation and propagated
to all spans in the trace via basalt.span.should_evaluate attribute.
"""

sample_rate: float = 1.0
sample_rate: float = 0.0
Copy link

Copilot AI Jan 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a breaking API change. The default sample_rate has been changed from 1.0 (100% sampling) to 0.0 (no sampling). This means that existing code relying on the default behavior will now skip evaluations unless explicitly configured. Consider documenting this as a breaking change in release notes or providing a migration guide for users who depend on the previous default behavior.

Copilot uses AI. Check for mistakes.

def __post_init__(self) -> None:
if not 0.0 <= self.sample_rate <= 1.0:
Expand Down Expand Up @@ -678,6 +682,8 @@ def _with_span_handle(
organization: TraceIdentity | Mapping[str, Any] | None = None,
feature_slug: str | None = None,
metadata: Mapping[str, Any] | None = None,
evaluate_config: EvaluationConfig | None = None,
experiment: Any = None,
) -> Generator[SpanHandle, None, None]:
tracer = get_tracer(tracer_name)
defaults = _current_trace_defaults()
Expand Down Expand Up @@ -714,6 +720,37 @@ def _with_span_handle(
# Check if we're inside a basalt trace
in_basalt_trace = otel_context.get_value(ROOT_SPAN_CONTEXT_KEY) is not None

# Make trace-level sampling decision
should_evaluate_token = None
if is_root:
# Root span: make new sampling decision
# If experiment is attached, ALWAYS evaluate (should_evaluate=True)
if experiment is not None:
should_evaluate = True
else:
# Get sample_rate from evaluate_config if provided, otherwise use global default
if evaluate_config is not None:
effective_sample_rate = evaluate_config.sample_rate
else:
effective_sample_rate = defaults.sample_rate
should_evaluate = random.random() < effective_sample_rate
should_evaluate_token = attach(set_value(SHOULD_EVALUATE_CONTEXT_KEY, should_evaluate))
else:
# Check if should_evaluate already exists in context
existing_should_evaluate = otel_context.get_value(SHOULD_EVALUATE_CONTEXT_KEY)
if existing_should_evaluate is None:
# Orphan span without root - make its own decision
# If experiment is attached, ALWAYS evaluate
if experiment is not None:
should_evaluate = True
else:
if evaluate_config is not None:
effective_sample_rate = evaluate_config.sample_rate
else:
effective_sample_rate = defaults.sample_rate
should_evaluate = random.random() < effective_sample_rate
should_evaluate_token = attach(set_value(SHOULD_EVALUATE_CONTEXT_KEY, should_evaluate))
Comment on lines +723 to +752
Copy link

Copilot AI Jan 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sampling decision logic is duplicated in the async version (_async_with_span_handle) at lines 893-922. Consider extracting this into a shared helper function to avoid code duplication and ensure consistent behavior between sync and async implementations. The logic spans approximately 30 lines and is identical in both places, making it a prime candidate for refactoring.

Copilot uses AI. Check for mistakes.

try:
with tracer.start_as_current_span(name) as span:
# Store root span in context for retrieval from nested spans
Expand Down Expand Up @@ -780,6 +817,10 @@ def _with_span_handle(
handle.set_output(output_payload)

finally:
# Detach should_evaluate token if it was set
if should_evaluate_token is not None:
detach(should_evaluate_token)

# Detach root span token if it was set
if root_span_token is not None:
detach(root_span_token)
Expand All @@ -805,6 +846,8 @@ async def _async_with_span_handle(
organization: TraceIdentity | Mapping[str, Any] | None = None,
feature_slug: str | None = None,
metadata: Mapping[str, Any] | None = None,
evaluate_config: EvaluationConfig | None = None,
experiment: Any = None,
) -> AsyncGenerator[SpanHandle, None]:
"""Async version of _with_span_handle.

Expand Down Expand Up @@ -847,6 +890,37 @@ async def _async_with_span_handle(
# Check if we're inside a basalt trace
in_basalt_trace = otel_context.get_value(ROOT_SPAN_CONTEXT_KEY) is not None

# Make trace-level sampling decision
should_evaluate_token = None
if is_root:
# Root span: make new sampling decision
# If experiment is attached, ALWAYS evaluate (should_evaluate=True)
if experiment is not None:
should_evaluate = True
else:
# Get sample_rate from evaluate_config if provided, otherwise use global default
if evaluate_config is not None:
effective_sample_rate = evaluate_config.sample_rate
else:
effective_sample_rate = defaults.sample_rate
should_evaluate = random.random() < effective_sample_rate
should_evaluate_token = attach(set_value(SHOULD_EVALUATE_CONTEXT_KEY, should_evaluate))
else:
# Check if should_evaluate already exists in context
existing_should_evaluate = otel_context.get_value(SHOULD_EVALUATE_CONTEXT_KEY)
if existing_should_evaluate is None:
# Orphan span without root - make its own decision
# If experiment is attached, ALWAYS evaluate
if experiment is not None:
should_evaluate = True
else:
if evaluate_config is not None:
effective_sample_rate = evaluate_config.sample_rate
else:
effective_sample_rate = defaults.sample_rate
should_evaluate = random.random() < effective_sample_rate
should_evaluate_token = attach(set_value(SHOULD_EVALUATE_CONTEXT_KEY, should_evaluate))

try:
with tracer.start_as_current_span(name) as span:
# Store root span in context for retrieval from nested spans
Expand Down Expand Up @@ -913,6 +987,10 @@ async def _async_with_span_handle(
handle.set_output(output_payload)

finally:
# Detach should_evaluate token if it was set
if should_evaluate_token is not None:
detach(should_evaluate_token)

# Detach root span token if it was set
if root_span_token is not None:
detach(root_span_token)
Expand Down
6 changes: 6 additions & 0 deletions basalt/observability/instrumentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
BasaltAutoInstrumentationProcessor,
BasaltCallEvaluatorProcessor,
BasaltContextProcessor,
BasaltShouldEvaluateProcessor,
)
from .resilient_exporters import ResilientSpanExporter

Expand Down Expand Up @@ -431,6 +432,10 @@ def _initialize_instrumentation(self, config: TelemetryConfig) -> None:
Args:
config: Telemetry configuration specifying trace content and provider settings.
"""
# Set global sample rate from config
from .trace_context import set_global_sample_rate
set_global_sample_rate(config.sample_rate)

# Set environment variables for third-party OpenTelemetry instrumentors
# These variables are READ by the instrumentation libraries (openai, anthropic, etc.)
# and control whether they capture prompts/completions in traces.
Expand Down Expand Up @@ -459,6 +464,7 @@ def _install_basalt_processors(self, provider: TracerProvider) -> None:
processors: list[OTelSpanProcessor] = [
BasaltContextProcessor(),
BasaltCallEvaluatorProcessor(),
BasaltShouldEvaluateProcessor(),
BasaltAutoInstrumentationProcessor(),
]

Expand Down
33 changes: 33 additions & 0 deletions basalt/observability/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,39 @@ def force_flush(self, timeout_millis: int = 30000) -> bool: # type: ignore[over
return True


class BasaltShouldEvaluateProcessor(SpanProcessor):
    """
    Span processor that stamps spans with the trace-level should_evaluate flag.

    On span start, the value stored under ``SHOULD_EVALUATE_CONTEXT_KEY`` in the
    OpenTelemetry context is looked up and, when one is present, written to the
    span as the ``semconv.BasaltSpan.SHOULD_EVALUATE`` boolean attribute.
    """

    def on_start(self, span: Span, parent_context: Any | None = None) -> None:  # type: ignore[override]
        """Copy the should_evaluate decision from the context onto ``span``.

        Args:
            span: The span that is starting. Non-recording spans are left untouched.
            parent_context: Context to read the decision from. When ``None``,
                the current OpenTelemetry context is used instead.
        """
        if not span.is_recording():
            return

        from .trace_context import SHOULD_EVALUATE_CONTEXT_KEY

        # Prefer the explicitly supplied parent context; otherwise fall back to
        # whatever context is current at span start.
        if parent_context is None:
            lookup_context = otel_context.get_current()
        else:
            lookup_context = parent_context

        decision = otel_context.get_value(SHOULD_EVALUATE_CONTEXT_KEY, lookup_context)
        if decision is None:
            # No decision recorded in this context: leave the attribute unset.
            return

        span.set_attribute(semconv.BasaltSpan.SHOULD_EVALUATE, bool(decision))

    def on_end(self, span: ReadableSpan) -> None:  # type: ignore[override]
        """Do nothing when a span ends."""
        return None

    def shutdown(self) -> None:  # type: ignore[override]
        """Do nothing on shutdown."""
        return None

    def force_flush(self, timeout_millis: int = 30000) -> bool:  # type: ignore[override]
        """Report success unconditionally; ``timeout_millis`` is not used."""
        return True


# Known auto-instrumentation scope names
KNOWN_AUTO_INSTRUMENTATION_SCOPES: Final[frozenset[str]] = frozenset({
"opentelemetry.instrumentation.openai",
Expand Down
13 changes: 6 additions & 7 deletions basalt/observability/semconv.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,16 +316,15 @@ class BasaltSpan:
"""
Optional, span-scoped configuration applied to evaluators as a whole.
Type: JSON object (string-serialized) or key/value attributes under this prefix
Examples: '{"sample_rate": 0.25, "mode": "async"}'
Examples: '{"sample_rate": 0.25}'
"""

# Optional prefix for span-scoped evaluator metadata (not per evaluator)
# Example usage: set attributes like "basalt.span.evaluator.sample_rate" = 0.5
EVALUATOR_PREFIX: Final[str] = "basalt.span.evaluator"
SHOULD_EVALUATE: Final[str] = "basalt.span.should_evaluate"
"""
Prefix for evaluator-related, span-scoped attributes.
Type: various (string, number, boolean)
Examples: "basalt.span.evaluator.sample_rate" = 0.5
Boolean indicating whether evaluators should run for this span's trace.
Determined once at root span creation via trace-level sampling, propagated to all child spans.
Type: boolean
Value: true (run evaluators) or false (skip evaluators)
"""

FEATURE_SLUG: Final[str] = "basalt.span.feature_slug"
Expand Down
27 changes: 27 additions & 0 deletions basalt/observability/trace_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
USER_CONTEXT_KEY: Final[str] = "basalt.context.user"
ORGANIZATION_CONTEXT_KEY: Final[str] = "basalt.context.organization"
FEATURE_SLUG_CONTEXT_KEY: Final[str] = "basalt.context.feature_slug"
SHOULD_EVALUATE_CONTEXT_KEY: Final[str] = "basalt.context.should_evaluate"


@dataclass(frozen=True, slots=True)
Expand Down Expand Up @@ -44,16 +45,20 @@ class _TraceContextConfig:

experiment: TraceExperiment | str | None = None
observe_metadata: dict[str, Any] | None = None
sample_rate: float = 0.0

def __post_init__(self) -> None:
    """Normalize ``experiment`` and ``observe_metadata``, then validate ``sample_rate``.

    Raises:
        ValueError: If ``sample_rate`` is not within the closed interval
            [0.0, 1.0].
    """
    self.experiment = _coerce_experiment(self.experiment)

    # Store a private dict copy; a missing or empty mapping becomes a fresh {}.
    if self.observe_metadata:
        self.observe_metadata = dict(self.observe_metadata)
    else:
        self.observe_metadata = {}

    rate_in_bounds = 0.0 <= self.sample_rate <= 1.0
    if not rate_in_bounds:
        raise ValueError("sample_rate must be within [0.0, 1.0].")

def clone(self) -> _TraceContextConfig:
    """Build a new ``_TraceContextConfig`` carrying this instance's values.

    ``observe_metadata`` is copied into a new dict (or replaced by an empty
    dict when it is ``None``); ``experiment`` and ``sample_rate`` are passed
    through unchanged.
    """
    if self.observe_metadata is None:
        metadata_copy = {}
    else:
        metadata_copy = dict(self.observe_metadata)

    return _TraceContextConfig(
        experiment=self.experiment,
        observe_metadata=metadata_copy,
        sample_rate=self.sample_rate,
    )


Expand Down Expand Up @@ -106,6 +111,28 @@ def _current_trace_defaults() -> _TraceContextConfig:
return _DEFAULT_CONTEXT.clone()


def set_global_sample_rate(sample_rate: float) -> None:
    """
    Install ``sample_rate`` as the global default for trace-level evaluation.

    The existing default ``experiment`` and ``observe_metadata`` are carried
    over; only the sample rate is replaced.

    Args:
        sample_rate: Sampling rate (0.0-1.0) where 1.0 means 100% sampling.

    Raises:
        ValueError: If ``sample_rate`` is not within [0.0, 1.0].
    """
    in_range = 0.0 <= sample_rate <= 1.0
    if not in_range:
        raise ValueError("sample_rate must be within [0.0, 1.0].")

    # Only the snapshot is taken while holding the lock; the replacement
    # config is built and installed after the lock has been released.
    # NOTE(review): a concurrent writer could change the defaults between the
    # snapshot and the install below - confirm how _set_trace_defaults
    # synchronizes before relying on this under contention.
    with _LOCK:
        snapshot = _DEFAULT_CONTEXT.clone()

    _set_trace_defaults(
        _TraceContextConfig(
            experiment=snapshot.experiment,
            observe_metadata=snapshot.observe_metadata,
            sample_rate=sample_rate,
        )
    )
def configure_global_metadata(metadata: dict[str, Any] | None) -> None:
"""
Configure global observability metadata applied to all traces.
Expand Down
Loading