Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions basalt/observability/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
EventSpanHandle,
FunctionSpanHandle,
LLMSpanHandle,
NoOpSpanHandle,
RetrievalSpanHandle,
SpanHandle,
StartSpanHandle,
Expand Down Expand Up @@ -70,6 +71,7 @@
"ToolSpanHandle",
"FunctionSpanHandle",
"EventSpanHandle",
"NoOpSpanHandle",
]

_instrumentation = InstrumentationManager()
128 changes: 60 additions & 68 deletions basalt/observability/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
EventSpanHandle,
FunctionSpanHandle,
LLMSpanHandle,
NoOpSpanHandle,
RetrievalSpanHandle,
SpanHandle,
StartSpanHandle,
Expand Down Expand Up @@ -117,9 +118,7 @@ def _resolve_kind_str(kind: ObserveKind | str) -> str:
kind_str = str(kind).lower()
valid_kinds = {k.value for k in ObserveKind}
if kind_str not in valid_kinds:
raise ValueError(
f"Invalid kind '{kind_str}'. Must be one of: {', '.join(sorted(valid_kinds))}"
)
raise ValueError(f"Invalid kind '{kind_str}'. Must be one of: {', '.join(sorted(valid_kinds))}")
return kind_str


Expand Down Expand Up @@ -255,9 +254,7 @@ async def async_wrapper(
**kwargs: object,
) -> object:
bound = resolve_bound_arguments(func, args, kwargs)
user_identity, org_identity = resolve_identity_payload(
self.identity_resolver, bound
)
user_identity, org_identity = resolve_identity_payload(self.identity_resolver, bound)
pre_evaluators = resolve_evaluators_payload(self.evaluators, bound)

span_name = self.name
Expand Down Expand Up @@ -337,8 +334,7 @@ def __init__(
# Validate name is provided and non-empty
if not name or not isinstance(name, str) or not name.strip():
raise ValueError(
"name is required and must be a non-empty string. "
"Please provide a descriptive name for this span (e.g., 'llm_generation', 'vector_search')."
"name is required and must be a non-empty string. Please provide a descriptive name for this span (e.g., 'llm_generation', 'vector_search')."
)

self.name = name.strip()
Expand All @@ -349,24 +345,21 @@ def __init__(
self.output_resolver = output
self.variables_resolver = variables
self.prompt = prompt
self._span_handle: SpanHandle | None = None
self._span_handle: SpanHandle | NoOpSpanHandle | None = None
self._ctx_manager = None

@staticmethod
def _get_config_for_kind(kind_str: str):
return _get_observe_config_for_kind(kind_str)

def __enter__(self) -> SpanHandle:
def __enter__(self) -> SpanHandle | NoOpSpanHandle:
span_name = self.name

kind_str = _resolve_kind_str(self.kind)

# Reject ROOT kind
if kind_str == ObserveKind.ROOT.value:
raise ValueError(
f"Cannot use kind='{ObserveKind.ROOT.value}' with Observe. "
f"Use StartObserve (start_observe) for root spans."
)
raise ValueError(f"Cannot use kind='{ObserveKind.ROOT.value}' with Observe. Use StartObserve (start_observe) for root spans.")

handle_cls, tracer_name, _, _ = self._get_config_for_kind(kind_str)

Expand All @@ -387,16 +380,23 @@ def __enter__(self) -> SpanHandle:
if self.prompt.variables:
prompt_attrs["basalt.prompt.variables"] = json.dumps(self.prompt.variables)

# Check for root span
# Check for root span - if no root span, return a no-op handle
# This ensures observe() is ignored unless start_observe has been called first
from opentelemetry import context as otel_context
from .context_managers import NoOpSpanHandle

if not otel_context.get_value(ROOT_SPAN_CONTEXT_KEY):
import logging
# No root span exists - return a no-op handle
self._ctx_manager = NoOpSpanHandle()
self._span_handle = self._ctx_manager.__enter__()
return self._span_handle

logger = logging.getLogger(__name__)
logger.warning(
"Observe used without a preceding start_observe. This may lead to missing trace context."
)
# Inherit feature_slug from parent context if available
from .trace_context import FEATURE_SLUG_CONTEXT_KEY

current_feature_slug = otel_context.get_value(FEATURE_SLUG_CONTEXT_KEY)
if not isinstance(current_feature_slug, str):
current_feature_slug = None

self._ctx_manager = _with_span_handle(
name=span_name,
Expand All @@ -406,6 +406,7 @@ def __enter__(self) -> SpanHandle:
span_type=kind_str,
evaluators=self.evaluators,
metadata=self._metadata,
feature_slug=current_feature_slug,
# In context manager mode, we don't auto-resolve input/vars from args
# User must call observe.input() or pass explicit input_payload if we added it to __init__
# But __init__ has resolvers, not values.
Expand All @@ -420,25 +421,24 @@ def __exit__(self, exc_type, exc_value, traceback) -> bool | None:
return None

def __call__(self, func: F) -> F:
# Import context module once for all nested functions
from opentelemetry import context as otel_context
from .trace_context import FEATURE_SLUG_CONTEXT_KEY

if isinstance(self.kind, ObserveKind):
kind_str = self.kind.value
else:
kind_str = str(self.kind).lower()

# Reject ROOT kind
if kind_str == ObserveKind.ROOT.value:
raise ValueError(
f"Cannot use kind='{ObserveKind.ROOT.value}' with Observe. "
f"Use StartObserve (start_observe) for root spans."
)
raise ValueError(f"Cannot use kind='{ObserveKind.ROOT.value}' with Observe. Use StartObserve (start_observe) for root spans.")

handle_cls, tracer_name, default_input, default_vars = self._get_config_for_kind(kind_str)

# Use defaults if not provided
input_resolver = self.input_resolver if self.input_resolver is not None else default_input
variables_resolver = (
self.variables_resolver if self.variables_resolver is not None else default_vars
)
variables_resolver = self.variables_resolver if self.variables_resolver is not None else default_vars

# Process prompt parameter if provided
prompt_metadata = {}
Expand Down Expand Up @@ -476,25 +476,12 @@ def prepare_call_data(
list[Any] | None,
]:
computed_metadata_raw = resolve_attributes(self._metadata, args, kwargs)
computed_metadata = (
computed_metadata_raw if isinstance(computed_metadata_raw, Mapping) else None
)
computed_metadata = computed_metadata_raw if isinstance(computed_metadata_raw, Mapping) else None
bound = resolve_bound_arguments(func, args, kwargs)
input_payload = resolve_payload_from_bound(input_resolver, bound)
variables_payload = resolve_variables_payload(variables_resolver, bound)
pre_evaluators = resolve_evaluators_payload(self.evaluators, bound)

# Check for root span
from opentelemetry import context as otel_context

if not otel_context.get_value(ROOT_SPAN_CONTEXT_KEY):
import logging

logger = logging.getLogger(__name__)
logger.warning(
"Observe used without a preceding start_observe. This may lead to missing trace context."
)

return computed_metadata, bound, input_payload, variables_payload, pre_evaluators

# Pre-hooks
Expand All @@ -513,9 +500,17 @@ def apply_post(span, result):

@functools.wraps(func)
def wrapper(*args, **kwargs):
computed_metadata, bound, input_payload, variables_payload, pre_evaluators = (
prepare_call_data(args, kwargs)
)
# Early exit: if no root span exists, just call the function (no-op)
if not otel_context.get_value(ROOT_SPAN_CONTEXT_KEY):
return func(*args, **kwargs)

computed_metadata, bound, input_payload, variables_payload, pre_evaluators = prepare_call_data(args, kwargs)
# Inherit feature_slug from parent context if available
from .trace_context import FEATURE_SLUG_CONTEXT_KEY

current_feature_slug = otel_context.get_value(FEATURE_SLUG_CONTEXT_KEY)
if not isinstance(current_feature_slug, str):
current_feature_slug = None

with _with_span_handle(
name=self.name,
Expand All @@ -527,6 +522,7 @@ def wrapper(*args, **kwargs):
variables=variables_payload,
evaluators=pre_evaluators,
metadata=computed_metadata,
feature_slug=current_feature_slug,
) as span:
if apply_pre:
apply_pre(span, bound)
Expand Down Expand Up @@ -556,9 +552,18 @@ async def async_wrapper(
*args: object,
**kwargs: object,
) -> object:
computed_metadata, bound, input_payload, variables_payload, pre_evaluators = (
prepare_call_data(args, kwargs)
)
# Early exit: if no root span exists, just call the function (no-op)
if not otel_context.get_value(ROOT_SPAN_CONTEXT_KEY):
return await func(*args, **kwargs)

computed_metadata, bound, input_payload, variables_payload, pre_evaluators = prepare_call_data(args, kwargs)

# Inherit feature_slug from parent context if available
from .trace_context import FEATURE_SLUG_CONTEXT_KEY

current_feature_slug = otel_context.get_value(FEATURE_SLUG_CONTEXT_KEY)
if not isinstance(current_feature_slug, str):
current_feature_slug = None

with _with_span_handle(
name=self.name,
Expand All @@ -570,6 +575,7 @@ async def async_wrapper(
variables=variables_payload,
evaluators=pre_evaluators,
metadata=computed_metadata,
feature_slug=current_feature_slug,
) as span:
if apply_pre:
apply_pre(span, bound)
Expand Down Expand Up @@ -598,9 +604,7 @@ async def async_wrapper(
# Static Domain Methods

@staticmethod
def _identify(
user: str | dict[str, Any] | None = None, organization: str | dict[str, Any] | None = None
) -> None:
def _identify(user: str | dict[str, Any] | None = None, organization: str | dict[str, Any] | None = None) -> None:
"""Set the user and/or organization identity for the current context."""
if user:
if isinstance(user, str):
Expand All @@ -612,9 +616,7 @@ def _identify(
if isinstance(organization, str):
_set_trace_organization(organization_id=organization)
elif isinstance(organization, dict):
_set_trace_organization(
organization_id=organization.get("id", "unknown"), name=organization.get("name")
)
_set_trace_organization(organization_id=organization.get("id", "unknown"), name=organization.get("name"))

@staticmethod
def _root_span() -> StartSpanHandle | None:
Expand Down Expand Up @@ -870,10 +872,7 @@ def _set_evaluation_config(config: EvaluationConfig | dict[str, Any]) -> None:
import logging

logger = logging.getLogger(__name__)
logger.warning(
"_set_evaluation_config() can only be called on root spans (StartSpanHandle). "
"This call will be ignored."
)
logger.warning("_set_evaluation_config() can only be called on root spans (StartSpanHandle). This call will be ignored.")
return
handle.set_evaluation_config(config)

Expand Down Expand Up @@ -1038,8 +1037,7 @@ def __init__(
# Validate name is provided and non-empty
if not name or not isinstance(name, str) or not name.strip():
raise ValueError(
"name is required and must be a non-empty string. "
"Please provide a descriptive name for this span (e.g., 'async_workflow', 'async_operation')."
"name is required and must be a non-empty string. Please provide a descriptive name for this span (e.g., 'async_workflow', 'async_operation')."
)

self.name = name.strip()
Expand Down Expand Up @@ -1129,8 +1127,7 @@ def __init__(
# Validate name is provided and non-empty
if not name or not isinstance(name, str) or not name.strip():
raise ValueError(
"name is required and must be a non-empty string. "
"Please provide a descriptive name for this span (e.g., 'async_operation', 'async_fetch')."
"name is required and must be a non-empty string. Please provide a descriptive name for this span (e.g., 'async_operation', 'async_fetch')."
)

self.name = name.strip()
Expand All @@ -1155,10 +1152,7 @@ async def __aenter__(self) -> SpanHandle:

# Reject ROOT kind
if kind_str == ObserveKind.ROOT.value:
raise ValueError(
f"Cannot use kind='{ObserveKind.ROOT.value}' with AsyncObserve. "
f"Use AsyncStartObserve (async_start_observe) for root spans."
)
raise ValueError(f"Cannot use kind='{ObserveKind.ROOT.value}' with AsyncObserve. Use AsyncStartObserve (async_start_observe) for root spans.")

handle_cls, tracer_name, _, _ = self._get_config_for_kind(kind_str)

Expand Down Expand Up @@ -1191,9 +1185,7 @@ async def __aenter__(self) -> SpanHandle:
import logging

logger = logging.getLogger(__name__)
logger.warning(
"AsyncObserve used without a preceding async_start_observe. This may lead to missing trace context."
)
logger.warning("AsyncObserve used without a preceding async_start_observe. This may lead to missing trace context.")

self._ctx_manager = _async_with_span_handle(
name=span_name,
Expand Down
Loading