diff --git a/.github/workflows/python-tests.yml b/.github/workflows/python-tests.yml index 306836f..13e271a 100644 --- a/.github/workflows/python-tests.yml +++ b/.github/workflows/python-tests.yml @@ -15,7 +15,7 @@ jobs: strategy: matrix: - python-version: ['3.8', '3.9', '3.10', '3.11', '3.12', '3.13'] + python-version: ['3.10', '3.11', '3.12', '3.13', '3.14'] steps: - uses: actions/checkout@v3 @@ -29,9 +29,8 @@ jobs: working-directory: ./ run: | python -m pip install --upgrade pip - pip install pytest pytest-cov + pip install pytest pytest-cov anyio pytest-asyncio pip install -e . - pip install -r dev-requirements.txt - name: Run tests working-directory: ./ diff --git a/basalt/observability/api.py b/basalt/observability/api.py index d2827a7..ed8fb6a 100644 --- a/basalt/observability/api.py +++ b/basalt/observability/api.py @@ -4,7 +4,7 @@ import inspect from collections.abc import Callable, Sequence from contextlib import ContextDecorator -from typing import TYPE_CHECKING, Any, NotRequired, TypedDict, TypeVar +from typing import TYPE_CHECKING, Any, TypeVar from opentelemetry.trace import StatusCode @@ -31,6 +31,7 @@ get_root_span_handle, ) from .decorators import ObserveKind +from .types import Identity from .utils import ( apply_llm_request_metadata, apply_llm_response_metadata, @@ -49,28 +50,6 @@ F = TypeVar("F", bound=Callable[..., Any]) -class _IdentityEntity(TypedDict): - """Identity entity with required id and optional name.""" - - id: str - name: NotRequired[str] - - -class Identity(TypedDict, total=False): - """ - Identity structure for user and organization tracking. - - Example: - { - "organization": {"id": "123", "name": "ACME"}, - "user": {"id": "456", "name": "John Doe"} - } - """ - - organization: NotRequired[_IdentityEntity] - user: NotRequired[_IdentityEntity] - - class StartObserve(ContextDecorator): """ Entry point for Basalt observability. @@ -822,22 +801,23 @@ def add_evaluators(*slugs: str) -> None: handle.add_evaluators(*slugs) @staticmethod - def set_identity( - *, - user_id: str | None = None, - user_name: str | None = None, - organization_id: str | None = None, - organization_name: str | None = None, - ) -> None: - """Set identity on the current span.""" + def set_identity(identity: Identity | None = None) -> None: + """ + Set identity on the current span. + + Args: + identity: Identity TypedDict with optional 'user' and 'organization' keys. + Each key should contain a dict with 'id' (required) and 'name' (optional). + + Example: + >>> Observe.set_identity({ + ... "user": {"id": "user-123", "name": "John Doe"}, + ... "organization": {"id": "org-456", "name": "ACME Corp"} + ... }) + """ handle = get_current_span_handle() if handle: - handle.set_identity( - user_id=user_id, - user_name=user_name, - organization_id=organization_id, - organization_name=organization_name, - ) + handle.set_identity(identity) class AsyncStartObserve: diff --git a/basalt/observability/context_managers.py b/basalt/observability/context_managers.py index ead8faa..53998d9 100644 --- a/basalt/observability/context_managers.py +++ b/basalt/observability/context_managers.py @@ -29,6 +29,7 @@ apply_organization_from_context, apply_user_from_context, ) +from .types import Identity SPAN_TYPE_ATTRIBUTE = semconv.BasaltSpan.KIND EVALUATOR_CONTEXT_KEY: Final[str] = "basalt.context.evaluators" @@ -364,31 +365,30 @@ def add_evaluators(self, *evaluators: Sequence[str]) -> None: for attachment in normalize_evaluator_specs(evaluators): self._append_evaluator(attachment) - def set_identity( - self, - *, - user_id: str | None = None, - user_name: str | None = None, - organization_id: str | None = None, - organization_name: str | None = None, - ) -> None: + def set_identity(self, identity: Identity | None = None) -> None: """ Set user and/or organization identity for the span. Args: - user_id: User identifier to associate with the span. - user_name: Optional user display name. - organization_id: Organization identifier to associate with the span. - organization_name: Optional organization display name. - """ - if user_id is not None: - self._span.set_attribute(semconv.BasaltUser.ID, user_id) - if user_name is not None: - self._span.set_attribute(semconv.BasaltUser.NAME, user_name) - if organization_id is not None: - self._span.set_attribute(semconv.BasaltOrganization.ID, organization_id) - if organization_name is not None: - self._span.set_attribute(semconv.BasaltOrganization.NAME, organization_name) + identity: Identity TypedDict with optional 'user' and 'organization' keys. + Each key should contain a dict with 'id' (required) and 'name' (optional). + + Example: + >>> span.set_identity({ + ... "user": {"id": "user-123", "name": "John Doe"}, + ... "organization": {"id": "org-456", "name": "ACME Corp"} + ... }) + """ + if identity is None: + return + + user_spec = identity.get("user") if identity else None + org_spec = identity.get("organization") if identity else None + + if user_spec is not None: + apply_user_from_context(self._span, user_spec) + if org_spec is not None: + apply_organization_from_context(self._span, org_spec) def _append_evaluator(self, attachment: EvaluatorAttachment) -> None: """Attach an evaluator to the span, avoiding duplicates. @@ -728,6 +728,27 @@ def _with_span_handle( # Mark all basalt spans with basalt.in_trace span.set_attribute(semconv.BasaltSpan.IN_TRACE, True) + # Inject prompt context if available + try: + from basalt.prompts.models import _current_prompt_context + prompt_ctx = _current_prompt_context.get() + if prompt_ctx: + # Inject prompt attributes into this span + import json + span.set_attribute("basalt.prompt.slug", prompt_ctx["slug"]) + if prompt_ctx.get("version"): + span.set_attribute("basalt.prompt.version", prompt_ctx["version"]) + if prompt_ctx.get("tag"): + span.set_attribute("basalt.prompt.tag", prompt_ctx["tag"]) + span.set_attribute("basalt.prompt.model.provider", prompt_ctx["provider"]) + span.set_attribute("basalt.prompt.model.model", prompt_ctx["model"]) + if prompt_ctx.get("variables"): + span.set_attribute("basalt.prompt.variables", json.dumps(prompt_ctx["variables"])) + span.set_attribute("basalt.prompt.from_cache", prompt_ctx["from_cache"]) + except ImportError: + # Prompts module not available, skip injection + pass + _attach_attributes(span, attributes) # Apply metadata if provided @@ -840,6 +861,27 @@ async def _async_with_span_handle( # Mark all basalt spans with basalt.in_trace span.set_attribute(semconv.BasaltSpan.IN_TRACE, True) + # Inject prompt context if available + try: + from basalt.prompts.models import _current_prompt_context + prompt_ctx = _current_prompt_context.get() + if prompt_ctx: + # Inject prompt attributes into this span + import json + span.set_attribute("basalt.prompt.slug", prompt_ctx["slug"]) + if prompt_ctx.get("version"): + span.set_attribute("basalt.prompt.version", prompt_ctx["version"]) + if prompt_ctx.get("tag"): + span.set_attribute("basalt.prompt.tag", prompt_ctx["tag"]) + span.set_attribute("basalt.prompt.model.provider", prompt_ctx["provider"]) + span.set_attribute("basalt.prompt.model.model", prompt_ctx["model"]) + if prompt_ctx.get("variables"): + span.set_attribute("basalt.prompt.variables", json.dumps(prompt_ctx["variables"])) + span.set_attribute("basalt.prompt.from_cache", prompt_ctx["from_cache"]) + except ImportError: + # Prompts module not available, skip injection + pass + _attach_attributes(span, attributes) # Apply metadata if provided diff --git a/basalt/observability/processors.py b/basalt/observability/processors.py index 687c18d..1fd55da 100644 --- a/basalt/observability/processors.py +++ b/basalt/observability/processors.py @@ -328,6 +328,38 @@ def on_start(self, span: Span, parent_context: Any | None = None) -> None: # ty # Apply prompt attributes (slug, version, provider, model) for key, value in prompt_data.items(): span.set_attribute(f"basalt.prompt.{key}", value) + else: + # If no explicit injection, try to read from ContextVar + # This allows auto-instrumented spans to inherit prompt context + # from the parent prompt context manager + try: + from basalt.prompts.models import _current_prompt_context + prompt_ctx = _current_prompt_context.get() + if prompt_ctx: + import logging + logger = logging.getLogger(__name__) + logger.debug(f"✓ Injecting prompt context from ContextVar for span '{scope.name}': slug='{prompt_ctx['slug']}'") + # Inject prompt attributes from ContextVar + span.set_attribute("basalt.prompt.slug", prompt_ctx["slug"]) + if prompt_ctx.get("version"): + span.set_attribute("basalt.prompt.version", prompt_ctx["version"]) + if prompt_ctx.get("tag"): + span.set_attribute("basalt.prompt.tag", prompt_ctx["tag"]) + span.set_attribute("basalt.prompt.model.provider", prompt_ctx["provider"]) + span.set_attribute("basalt.prompt.model.model", prompt_ctx["model"]) + if prompt_ctx.get("variables"): + span.set_attribute("basalt.prompt.variables", json.dumps(prompt_ctx["variables"])) + span.set_attribute("basalt.prompt.from_cache", prompt_ctx["from_cache"]) + else: + import logging + logger = logging.getLogger(__name__) + logger.debug(f"✗ No prompt context found in ContextVar for auto-instrumented span '{scope.name}'") + except (ImportError, LookupError) as e: + import logging + logger = logging.getLogger(__name__) + logger.debug(f"✗ Failed to read prompt context for span '{scope.name}': {e}") + # Prompts module not available or no context set - skip injection + pass # Clear all pending injection data (single-use semantics) # We create a new context with all injection keys set to None diff --git a/basalt/observability/request_tracing.py b/basalt/observability/request_tracing.py index 3074441..93410d3 100644 --- a/basalt/observability/request_tracing.py +++ b/basalt/observability/request_tracing.py @@ -51,8 +51,16 @@ async def trace_async_request( ) raise + # Type-safe output formatting for PromptRequestSpan + from basalt.prompts.client import PromptRequestSpan + if isinstance(span_data, PromptRequestSpan): + output = span_data.format_output(result) + else: + status_code = getattr(result, "status_code", None) + output = {"status_code": status_code} + + observe.set_output(output) status_code = getattr(result, "status_code", None) - observe.set_output({"status_code": status_code}) span_data.finalize( span, duration_s=time.perf_counter() - start, @@ -101,8 +109,16 @@ def trace_sync_request( ) raise + # Type-safe output formatting for PromptRequestSpan + from basalt.prompts.client import PromptRequestSpan + if isinstance(span_data, PromptRequestSpan): + output = span_data.format_output(result) + else: + status_code = getattr(result, "status_code", None) + output = {"status_code": status_code} + + observe.set_output(output) status_code = getattr(result, "status_code", None) - observe.set_output({"status_code": status_code}) span_data.finalize( span, duration_s=time.perf_counter() - start, diff --git a/basalt/observability/semconv.py b/basalt/observability/semconv.py index cbd97f0..5cfe3f3 100644 --- a/basalt/observability/semconv.py +++ b/basalt/observability/semconv.py @@ -362,6 +362,13 @@ class BasaltAPI: Examples: "get", "list", "create", "update" """ + INTERNAL: Final[str] = "basalt.internal.api" + """ + Marks internal Basalt SDK API calls. + Type: boolean + Value: True for SDK->API requests (prompts, datasets, etc.) + """ + class BasaltCache: """Basalt caching attributes.""" diff --git a/basalt/observability/spans.py b/basalt/observability/spans.py index 3b67cf5..7a9762b 100644 --- a/basalt/observability/spans.py +++ b/basalt/observability/spans.py @@ -40,6 +40,7 @@ def start_attributes(self) -> dict[str, Any]: attributes: dict[str, Any] = { semconv.BasaltAPI.CLIENT: self.client, semconv.BasaltAPI.OPERATION: self.operation, + semconv.BasaltAPI.INTERNAL: True, semconv.HTTP.METHOD: self.method.upper(), semconv.HTTP.URL: self.url, } diff --git a/basalt/observability/types.py b/basalt/observability/types.py new file mode 100644 index 0000000..e298468 --- /dev/null +++ b/basalt/observability/types.py @@ -0,0 +1,27 @@ +"""Shared observability types.""" + +from __future__ import annotations + +from typing import TypedDict + + +class _IdentityEntity(TypedDict): + """Identity entity with required id and optional name.""" + + id: str + name: str + + +class Identity(TypedDict, total=False): + """ + Identity structure for user and organization tracking. + + Example: + { + "organization": {"id": "123", "name": "ACME"}, + "user": {"id": "456", "name": "John Doe"} + } + """ + + organization: _IdentityEntity + user: _IdentityEntity diff --git a/basalt/prompts/client.py b/basalt/prompts/client.py index a2e4d37..b8c8610 100644 --- a/basalt/prompts/client.py +++ b/basalt/prompts/client.py @@ -5,11 +5,12 @@ """ from __future__ import annotations -from typing import cast +from typing import Any, cast from .._internal.base_client import BaseServiceClient from .._internal.http import HTTPClient from ..config import config +from ..observability.spans import BasaltRequestSpan from ..types.cache import CacheProtocol from ..types.exceptions import BasaltAPIError from .models import ( @@ -23,6 +24,24 @@ ) +class PromptRequestSpan(BasaltRequestSpan): + """Span metadata for prompt API requests with JSON output.""" + + def format_output(self, response_data: dict[str, Any]) -> dict[str, Any]: + """Return the Prompt object as JSON output.""" + prompt_data = response_data.get("prompt", {}) + if not prompt_data: + # Fallback to status code if no prompt data + status_code = getattr(response_data, "status_code", None) + return {"status_code": status_code} + + # Return the full prompt object as JSON + return { + "prompt": prompt_data, + "from_cache": response_data.get("from_cache", False) + } + + class PromptsClient(BaseServiceClient): """ Client for interacting with the Basalt Prompts API. @@ -59,6 +78,74 @@ def __init__( # Cache responses for 5 minutes self._cache_duration = 5 * 60 + async def _request_async( + self, + operation: str, + *, + method: str, + url: str, + span_attributes: dict[str, Any] | None = None, + span_variables: dict[str, Any] | None = None, + cache_hit: bool | None = None, + **request_kwargs: Any, + ): + """Override to use PromptRequestSpan for custom output formatting.""" + # Lazy import to avoid circular dependency + import functools + + from basalt.observability.request_tracing import trace_async_request + + span = PromptRequestSpan( + client=self._client_name, + operation=operation, + method=method, + url=url, + cache_hit=cache_hit, + extra_attributes=self._filter_attributes(span_attributes), + variables=self._filter_attributes(span_variables), + ) + call = functools.partial( + self._http_client.fetch, + url=url, + method=method, + **request_kwargs, + ) + return await trace_async_request(span, call) + + def _request_sync( + self, + operation: str, + *, + method: str, + url: str, + span_attributes: dict[str, Any] | None = None, + span_variables: dict[str, Any] | None = None, + cache_hit: bool | None = None, + **request_kwargs: Any, + ): + """Override to use PromptRequestSpan for custom output formatting.""" + # Lazy import to avoid circular dependency + import functools + + from basalt.observability.request_tracing import trace_sync_request + + span = PromptRequestSpan( + client=self._client_name, + operation=operation, + method=method, + url=url, + cache_hit=cache_hit, + extra_attributes=self._filter_attributes(span_attributes), + variables=self._filter_attributes(span_variables), + ) + call = functools.partial( + self._http_client.fetch_sync, + url=url, + method=method, + **request_kwargs, + ) + return trace_sync_request(span, call) + async def get( self, slug: str, diff --git a/basalt/prompts/models.py b/basalt/prompts/models.py index 88e3e9b..74c3380 100644 --- a/basalt/prompts/models.py +++ b/basalt/prompts/models.py @@ -6,10 +6,18 @@ """ from __future__ import annotations +import json from collections.abc import Mapping +from contextvars import ContextVar from dataclasses import dataclass from typing import Any +# Context variable for prompt data injection into child spans +_current_prompt_context: ContextVar[dict[str, Any] | None] = ContextVar( + '_current_prompt_context', + default=None +) + @dataclass(slots=True, frozen=True) class PromptModelParameters: @@ -172,7 +180,29 @@ def compile_variables(self, variables: dict[str, Any]) -> Prompt: return self -class PromptContextManager: +class _PromptContextMixin: + """Shared span helper for prompt context managers.""" + + def _set_span_attributes(self) -> None: + from basalt.observability import semconv + from basalt.observability.context_managers import get_tracer + + tracer = get_tracer("basalt.prompts") + with tracer.start_as_current_span(f"basalt.prompt.{self._slug}") as span: + span.set_attribute(semconv.BasaltSpan.IN_TRACE, True) + span.set_attribute("basalt.prompt.slug", self._slug) + if self._version: + span.set_attribute("basalt.prompt.version", self._version) + if self._tag: + span.set_attribute("basalt.prompt.tag", self._tag) + span.set_attribute("basalt.prompt.model.provider", self._prompt.model.provider) + span.set_attribute("basalt.prompt.model.model", self._prompt.model.model) + if self._variables: + span.set_attribute("basalt.prompt.variables", json.dumps(self._variables)) + span.set_attribute("basalt.prompt.from_cache", self._from_cache) + + +class PromptContextManager(_PromptContextMixin): """ Context manager wrapper for Prompt that enables observability. @@ -200,13 +230,13 @@ def __init__( from_cache: bool = False, ): """ - Initialize the wrapper and create an immediate span for tracking. + Initialize the wrapper. Args: prompt: The Prompt dataclass to wrap - slug: Prompt slug for span naming and attributes - version: Prompt version for span attributes - tag: Prompt tag for span attributes + slug: Prompt slug for context injection + version: Prompt version for context injection + tag: Prompt tag for context injection variables: Variables used in prompt compilation from_cache: Whether the prompt was retrieved from cache """ @@ -216,56 +246,8 @@ def __init__( self._tag = tag self._variables = variables self._from_cache = from_cache - - # Context manager state - self._span = None - self._span_context = None - self._tracer = None - - # Create and immediately end a span for imperative usage tracking - self._create_immediate_span() - - def _create_immediate_span(self): - """Create a span that immediately ends to track prompt fetches.""" - try: - from ..observability.context_managers import get_tracer - - tracer = get_tracer("basalt.prompts") - - # Create span context - with tracer.start_as_current_span(f"prompt.{self._slug}") as span: - # Set span attributes - self._set_span_attributes(span) - except Exception: - # Silently fail if observability is not available - # Prompt functionality should not break due to observability issues - pass - - def _set_span_attributes(self, span): - """Set all prompt-related attributes on a span.""" - import json - - from ..observability import semconv - - span.set_attribute("basalt.span.kind", "function") - span.set_attribute("basalt.prompt.slug", self._slug) - - if self._version: - span.set_attribute("basalt.prompt.version", self._version) - - if self._tag: - span.set_attribute("basalt.prompt.tag", self._tag) - - span.set_attribute("basalt.prompt.model.provider", self._prompt.model.provider) - span.set_attribute("basalt.prompt.model.model", self._prompt.model.model) - - if self._variables: - span.set_attribute("basalt.prompt.variables", json.dumps(self._variables)) - - span.set_attribute("basalt.prompt.from_cache", self._from_cache) - - # Mark all prompt spans with basalt.in_trace - span.set_attribute(semconv.BasaltSpan.IN_TRACE, True) + self._context_token = None + # Span creation removed - ContextVar injection is sufficient for auto-instrumented spans def __getattr__(self, name): """Forward all attribute access to the wrapped Prompt.""" @@ -273,37 +255,31 @@ def __getattr__(self, name): def __enter__(self): """ - Enter context manager mode - create a new span that stays open. + Enter context manager mode - set prompt context for child spans. - This allows auto-instrumented GenAI calls to nest under the prompt span. + This allows child spans to automatically receive prompt attributes. """ - try: - from opentelemetry import trace - - from ..observability.context_managers import get_tracer - - self._tracer = get_tracer("basalt.prompts") - - # Start a new span that will stay open - self._span = self._tracer.start_span(f"prompt.{self._slug}") - self._set_span_attributes(self._span) - - # Use span context that will auto-end on exit - self._span_context = trace.use_span(self._span, end_on_exit=True) - self._span_context.__enter__() - except Exception: - # Silently fail - observability should not break functionality - pass - + # Store prompt metadata in context + prompt_ctx = { + "slug": self._slug, + "version": self._version, + "tag": self._tag, + "provider": self._prompt.model.provider, + "model": self._prompt.model.model, + "variables": self._variables, + "from_cache": self._from_cache, + } + self._context_token = _current_prompt_context.set(prompt_ctx) return self def __exit__(self, exc_type, exc_val, exc_tb): - """Exit context manager mode - end the span.""" - if self._span_context: - try: - self._span_context.__exit__(exc_type, exc_val, exc_tb) - except Exception: - pass + """Exit context manager mode - clear prompt context.""" + try: + # Any cleanup logic + pass + finally: + if self._context_token is not None: + _current_prompt_context.reset(self._context_token) # Don't suppress exceptions return False @@ -317,7 +293,7 @@ def __str__(self): return str(self._prompt) -class AsyncPromptContextManager: +class AsyncPromptContextManager(_PromptContextMixin): """ Async context manager wrapper for Prompt that enables observability. @@ -335,13 +311,13 @@ def __init__( from_cache: bool = False, ): """ - Initialize the wrapper and create an immediate span for tracking. + Initialize the wrapper. Args: prompt: The Prompt dataclass to wrap - slug: Prompt slug for span naming and attributes - version: Prompt version for span attributes - tag: Prompt tag for span attributes + slug: Prompt slug for context injection + version: Prompt version for context injection + tag: Prompt tag for context injection variables: Variables used in prompt compilation from_cache: Whether the prompt was retrieved from cache """ @@ -351,55 +327,8 @@ def __init__( self._tag = tag self._variables = variables self._from_cache = from_cache - - # Context manager state - self._span = None - self._span_context = None - self._tracer = None - - # Create and immediately end a span for imperative usage tracking - self._create_immediate_span() - - def _create_immediate_span(self): - """Create a span that immediately ends to track prompt fetches.""" - try: - from ..observability.context_managers import get_tracer - - tracer = get_tracer("basalt.prompts") - - # Create span context - with tracer.start_as_current_span(f"prompt.{self._slug}") as span: - # Set span attributes - self._set_span_attributes(span) - except Exception: - # Silently fail if observability is not available - pass - - def _set_span_attributes(self, span): - """Set all prompt-related attributes on a span.""" - import json - - from ..observability import semconv - - span.set_attribute("basalt.span.kind", "function") - span.set_attribute("basalt.prompt.slug", self._slug) - - if self._version: - span.set_attribute("basalt.prompt.version", self._version) - - if self._tag: - span.set_attribute("basalt.prompt.tag", self._tag) - - span.set_attribute("basalt.prompt.model.provider", self._prompt.model.provider) - span.set_attribute("basalt.prompt.model.model", self._prompt.model.model) - - if self._variables: - span.set_attribute("basalt.prompt.variables", json.dumps(self._variables)) - - span.set_attribute("basalt.prompt.from_cache", self._from_cache) - - # Mark all prompt spans with basalt.in_trace - span.set_attribute(semconv.BasaltSpan.IN_TRACE, True) + self._context_token = None + # Span creation removed - ContextVar injection is sufficient for auto-instrumented spans def __getattr__(self, name): """Forward all attribute access to the wrapped Prompt.""" @@ -407,37 +336,31 @@ def __getattr__(self, name): async def __aenter__(self): """ - Enter async context manager mode - create a new span that stays open. + Enter async context manager mode - set prompt context for child spans. - This allows auto-instrumented GenAI calls to nest under the prompt span. + This allows child spans to automatically receive prompt attributes. """ - try: - from opentelemetry import trace - - from ..observability.context_managers import get_tracer - - self._tracer = get_tracer("basalt.prompts") - - # Start a new span that will stay open - self._span = self._tracer.start_span(f"prompt.{self._slug}") - self._set_span_attributes(self._span) - - # Use span context that will auto-end on exit - self._span_context = trace.use_span(self._span, end_on_exit=True) - self._span_context.__enter__() - except Exception: - # Silently fail - observability should not break functionality - pass - + # Store prompt metadata in context + prompt_ctx = { + "slug": self._slug, + "version": self._version, + "tag": self._tag, + "provider": self._prompt.model.provider, + "model": self._prompt.model.model, + "variables": self._variables, + "from_cache": self._from_cache, + } + self._context_token = _current_prompt_context.set(prompt_ctx) return self async def __aexit__(self, exc_type, exc_val, exc_tb): - """Exit async context manager mode - end the span.""" - if self._span_context: - try: - self._span_context.__exit__(exc_type, exc_val, exc_tb) - except Exception: - pass + """Exit async context manager mode - clear prompt context.""" + try: + # Any cleanup logic + pass + finally: + if self._context_token is not None: + _current_prompt_context.reset(self._context_token) # Don't suppress exceptions return False diff --git a/docs/02-getting-started.md b/docs/02-getting-started.md index 3bef1d1..95b9b00 100644 --- a/docs/02-getting-started.md +++ b/docs/02-getting-started.md @@ -133,8 +133,8 @@ def handle_chat(message): # Method 2: Set dynamically @start_observe(name="api_handler") def handle_request(auth_token): - user_data = verify_token(auth_token) - observe.set_identity(user=user_data) + user_data = verify_token(auth_token) # Returns {"id": "user-123", "name": "John Doe"} + observe.set_identity({"user": user_data}) # Identity now set for entire trace ``` diff --git a/docs/05-observability.md b/docs/05-observability.md index c3940db..68bec10 100644 --- a/docs/05-observability.md +++ b/docs/05-observability.md @@ -94,12 +94,15 @@ def calculate(): You can enrich spans with various types of data using static methods on `observe`. These methods always apply to the *current active span*. -#### Identity (`observe.identify`) +#### Identity (`observe.set_identity`) Associate traces with users and organizations to track usage and costs. ```python -observe.set_identity(user="user_123", organization="org_abc") +observe.set_identity({ + "user": {"id": "user-123", "name": "John Doe"}, + "organization": {"id": "org-456", "name": "ACME Corp"} +}) ``` #### Metadata (`observe.metadata`, `observe.update_metadata`) diff --git a/docs/06-manual-tracing.md b/docs/06-manual-tracing.md index ddd8e70..d743621 100644 --- a/docs/06-manual-tracing.md +++ b/docs/06-manual-tracing.md @@ -378,7 +378,7 @@ When using context managers, the span handle provides these methods: **Identity & Experiments:** -- `set_identity(user=..., organization=...)` - Set user/org identity +- `set_identity(identity)` - Set user/org identity using dict format: `{"user": {"id": "...", "name": "..."}, "organization": {"id": "...", "name": "..."}}` - `add_evaluator(slug)` - Attach single evaluator - `add_evaluators(*slugs)` - Attach multiple evaluators @@ -413,6 +413,55 @@ prompt = basalt.prompts.get_sync("qa-prompt", variables={"context": "..."}) See [Prompts Documentation](03-prompts.md#observability-integration) for more details. +### Automatic Prompt Context Injection for Auto-Instrumented Spans + +**NEW**: Basalt automatically injects prompt context into auto-instrumented LLM spans (OpenAI, Anthropic, Gemini, etc.) when they are called within a prompt context manager. This eliminates the need for manual prompt attribute setting. + +```python +from basalt import Basalt +from basalt.observability import start_observe +from openai import OpenAI + +basalt = Basalt(api_key="your-api-key") +openai_client = OpenAI(api_key="your-openai-key") + +# Root trace +with start_observe(name="process_request", feature_slug="support-ticket"): + # Fetch prompt with context manager + with basalt.prompts.get_sync("joke-analyzer", variables={"jokeText": "..."}) as prompt: + # Auto-instrumented OpenAI call automatically receives prompt attributes + response = openai_client.chat.completions.create( + model=prompt.model.model, + messages=[{"role": "user", "content": prompt.text}] + ) + # The OpenAI span will automatically have: + # - basalt.prompt.slug = "joke-analyzer" + # - basalt.prompt.version = (prompt version) + # - basalt.prompt.model.provider = "openai" + # - basalt.prompt.model.model = "gpt-4" + # - basalt.prompt.variables = {"jokeText": "..."} + # - basalt.prompt.from_cache = true/false +``` + +**How it works:** +- When you use a prompt context manager (`with prompts.get_sync(...)`), it sets an internal context variable +- Auto-instrumented spans (OpenAI, Gemini, Anthropic, etc.) automatically detect this context +- Prompt attributes are injected into the span without any manual intervention + +**Supported providers:** +- OpenAI +- Anthropic (Claude) +- Google Gemini +- AWS Bedrock +- Mistral +- Any other auto-instrumented LLM provider + +**Benefits:** +- Zero boilerplate - no manual attribute setting required +- Consistent across all providers +- Works with nested prompt contexts (inner context wins) +- Thread-safe and async-compatible + ## Best Practices ### Set Input and Output diff --git a/docs/12-user-org-tracking.md b/docs/12-user-org-tracking.md index e69de29..cda0c84 100644 --- a/docs/12-user-org-tracking.md +++ b/docs/12-user-org-tracking.md @@ -0,0 +1,223 @@ +# User and Organization Tracking + +Basalt allows you to associate traces with users and organizations, enabling you to track usage, costs, and performance metrics on a per-user or per-organization basis. + +## Identity Structure + +Identity information is provided as a dictionary with optional `user` and `organization` keys. Each key contains an entity with: +- `id` (required): Unique identifier +- `name` (optional): Display name + +```python +from basalt.observability import Identity + +identity: Identity = { + "user": { + "id": "user-123", + "name": "John Doe" # optional + }, + "organization": { + "id": "org-456", + "name": "ACME Corp" # optional + } +} +``` + +## Setting Identity on Root Spans + +### Using the Decorator + +The most common approach is to set identity when creating a root span with `@start_observe`: + +```python +from basalt.observability import start_observe + +# Static identity +@start_observe( + name="process_order", + feature_slug="checkout", + identity={ + "user": {"id": "user-123", "name": "John Doe"}, + "organization": {"id": "org-456", "name": "ACME Corp"} + } +) +def process_order(order_id): + # Identity automatically propagates to all child spans + pass +``` + +### Using a Callable + +You can also provide a callable that resolves identity from function arguments: + +```python +def resolve_identity_from_request(request): + """Extract identity from request context.""" + return { + "user": { + "id": request.user.id, + "name": request.user.name + }, + "organization": { + "id": request.user.org_id, + "name": request.user.org_name + } + } + +@start_observe( + name="api_endpoint", + feature_slug="api", + identity=resolve_identity_from_request +) +def handle_request(request): + # Identity resolved from request argument + pass +``` + +## Setting Identity Dynamically + +You can also set identity dynamically within a span using `observe.set_identity()`: + +```python +from basalt.observability import observe, start_observe + +@start_observe(name="workflow") +def process_data(auth_token): + # Authenticate and get user info + user_info = authenticate(auth_token) + + # Set identity dynamically + observe.set_identity({ + "user": { + "id": user_info["id"], + "name": user_info["name"] + } + }) + + # Continue processing... +``` + +### On Span Handles + +When using context managers, you can set identity on the span handle: + +```python +from basalt.observability import async_start_observe + +async def workflow(): + async with async_start_observe(name="process", feature_slug="task") as span: + # Set identity on the span handle + span.set_identity({ + "user": {"id": "user-789"}, + "organization": {"id": "org-101", "name": "Beta Corp"} + }) + + # Process... +``` + +## Identity Propagation + +Identity set on a root span automatically propagates to all child spans. This means you only need to set it once at the entry point: + +```python +@start_observe( + name="order_pipeline", + feature_slug="orders", + identity={"user": {"id": "user-123"}} +) +def process_order(): + # Identity propagates to this child span + @observe(kind=ObserveKind.SPAN, name="validate_order") + def validate(): + pass + + # And to this child span + @observe(kind=ObserveKind.SPAN, name="charge_payment") + def charge(): + pass + + validate() + charge() +``` + +## Partial Identity + +You can set only user or only organization identity as needed: + +```python +# User only +observe.set_identity({ + "user": {"id": "user-123", "name": "John Doe"} +}) + +# Organization only +observe.set_identity({ + "organization": {"id": "org-456"} +}) + +# Both (either can omit the name) +observe.set_identity({ + "user": {"id": "user-123"}, + "organization": {"id": "org-456", "name": "ACME Corp"} +}) +``` + +## Best Practices + +1. **Set identity early**: Set identity on the root span or as early as possible in the trace +2. **Use IDs consistently**: Ensure user and organization IDs are consistent across all traces +3. **Include names when available**: Names make traces easier to understand in the UI +4. **Leverage propagation**: Take advantage of automatic propagation to avoid redundant identity setting +5. **Handle authentication**: Set identity after authentication to ensure accurate tracking + +## Example: Flask API + +```python +from flask import Flask, request +from basalt.observability import start_observe, observe + +app = Flask(__name__) + +def get_identity_from_request(): + """Extract identity from Flask request context.""" + auth_header = request.headers.get("Authorization") + if not auth_header: + return None + + user = authenticate(auth_header) + return { + "user": { + "id": user.id, + "name": user.full_name + }, + "organization": { + "id": user.organization.id, + "name": user.organization.name + } + } + +@app.route("/api/orders/") +@start_observe(name="get_order", feature_slug="orders", identity=get_identity_from_request) +def get_order(order_id): + # Identity automatically set from request + order = fetch_order(order_id) + return {"order": order} +``` + +## Identity in Async Contexts + +Identity works seamlessly with async/await: + +```python +from basalt.observability import async_start_observe + +@async_start_observe( + name="async_workflow", + feature_slug="background-jobs", + identity={"user": {"id": "user-123"}} +) +async def process_async(data): + # Identity propagates through async calls + await step_one() + await step_two() +``` diff --git a/tests/observability/test_context_managers.py b/tests/observability/test_context_managers.py index 89a6ed6..9d35b38 100644 --- a/tests/observability/test_context_managers.py +++ b/tests/observability/test_context_managers.py @@ -281,7 +281,7 @@ def test_identify_with_user_only(): mock_span = SimpleSpanMock() span_handle = SpanHandle(span=mock_span) - span_handle.set_identity(user_id="user-123", user_name="John Doe") + span_handle.set_identity({"user": {"id": "user-123", "name": "John Doe"}}) assert mock_span.attributes[semconv.BasaltUser.ID] == "user-123" assert mock_span.attributes[semconv.BasaltUser.NAME] == "John Doe" @@ -294,7 +294,7 @@ def test_identify_with_organization_only(): mock_span = SimpleSpanMock() span_handle = SpanHandle(span=mock_span) - span_handle.set_identity(organization_id="org-456", organization_name="Acme Corp") + span_handle.set_identity({"organization": {"id": "org-456", "name": "Acme Corp"}}) assert mock_span.attributes[semconv.BasaltOrganization.ID] == "org-456" assert mock_span.attributes[semconv.BasaltOrganization.NAME] == "Acme Corp" @@ -307,12 +307,10 @@ def test_identify_with_both_user_and_organization(): mock_span = SimpleSpanMock() span_handle = SpanHandle(span=mock_span) - span_handle.set_identity( - user_id="user-123", - user_name="John Doe", - organization_id="org-456", - organization_name="Acme Corp" - ) + span_handle.set_identity({ + "user": {"id": "user-123", "name": "John Doe"}, + "organization": {"id": "org-456", "name": "Acme Corp"} + }) assert mock_span.attributes[semconv.BasaltUser.ID] == "user-123" assert mock_span.attributes[semconv.BasaltUser.NAME] == "John Doe" @@ -326,7 +324,10 @@ def test_identify_with_ids_only(): mock_span = SimpleSpanMock() span_handle = SpanHandle(span=mock_span) - span_handle.set_identity(user_id="user-789", organization_id="org-101") + span_handle.set_identity({ + "user": {"id": "user-789"}, + "organization": {"id": "org-101"} + }) assert mock_span.attributes[semconv.BasaltUser.ID] == "user-789" assert mock_span.attributes[semconv.BasaltOrganization.ID] == "org-101" @@ -340,7 +341,7 @@ def test_identify_with_no_parameters(): mock_span = SimpleSpanMock() span_handle = SpanHandle(span=mock_span) - span_handle.set_identity() + span_handle.set_identity(None) assert semconv.BasaltUser.ID not in mock_span.attributes assert semconv.BasaltOrganization.ID not in mock_span.attributes @@ -427,13 +428,13 @@ async def test_async_start_observe_with_metadata(): # Verify span was created successfully assert span is not None assert isinstance(span._span, object) # Span exists - + # Verify metadata was properly stored in basalt.metadata spans = exporter.get_finished_spans() test_span = next((s for s in spans if s.name == "test_with_metadata"), None) assert test_span is not None assert semconv.BasaltSpan.METADATA in test_span.attributes - + metadata_json = test_span.attributes[semconv.BasaltSpan.METADATA] parsed_metadata = json.loads(metadata_json) assert parsed_metadata["custom_key"] == "custom_value" diff --git a/tests/observability/test_processors.py b/tests/observability/test_processors.py index fa08da3..7c0303b 100644 --- a/tests/observability/test_processors.py +++ b/tests/observability/test_processors.py @@ -103,3 +103,201 @@ def test_auto_instrumentation_processor_sets_in_trace_for_openai_v1(): mock_span.set_attribute.assert_any_call(semconv.BasaltSpan.KIND, "generation") finally: otel_context.detach(token) + + +def test_auto_instrumentation_processor_injects_prompt_from_contextvar(): + """Test that BasaltAutoInstrumentationProcessor reads _current_prompt_context and injects attributes.""" + from opentelemetry import context as otel_context + + from basalt.observability.context_managers import ROOT_SPAN_CONTEXT_KEY + from basalt.prompts.models import _current_prompt_context + + # Create a mock span with Gemini instrumentation scope + mock_span = MagicMock() + mock_span.is_recording.return_value = True + mock_scope = MagicMock() + mock_scope.name = "opentelemetry.instrumentation.google_generativeai" + mock_span.instrumentation_scope = mock_scope + + # Set up a root span context + mock_root_span = MagicMock() + ctx = otel_context.set_value(ROOT_SPAN_CONTEXT_KEY, mock_root_span) + otel_token = otel_context.attach(ctx) + + # Set up prompt context via ContextVar + prompt_ctx = { + "slug": "test-prompt", + "version": "v1.0.0", + "tag": "latest", + "provider": "gemini", + "model": "gemini-2.5-flash-lite", + "variables": {"var1": "value1"}, + "from_cache": False + } + cv_token = _current_prompt_context.set(prompt_ctx) + + try: + # Create processor and call on_start + processor = processors.BasaltAutoInstrumentationProcessor() + processor.on_start(mock_span, None) + + # Verify that prompt attributes were injected from ContextVar + mock_span.set_attribute.assert_any_call("basalt.prompt.slug", "test-prompt") + mock_span.set_attribute.assert_any_call("basalt.prompt.version", "v1.0.0") + mock_span.set_attribute.assert_any_call("basalt.prompt.tag", "latest") + mock_span.set_attribute.assert_any_call("basalt.prompt.model.provider", "gemini") + mock_span.set_attribute.assert_any_call("basalt.prompt.model.model", "gemini-2.5-flash-lite") + mock_span.set_attribute.assert_any_call("basalt.prompt.from_cache", False) + + # Check that variables were serialized as JSON + import json + calls = mock_span.set_attribute.call_args_list + variables_call = [call for call in calls if call[0][0] == "basalt.prompt.variables"] + assert len(variables_call) == 1 + assert json.loads(variables_call[0][0][1]) == {"var1": "value1"} + finally: + _current_prompt_context.reset(cv_token) + otel_context.detach(otel_token) + + +def test_auto_instrumentation_processor_explicit_injection_overrides_contextvar(): + """Test that explicit injection via PENDING_INJECT_PROMPT_KEY takes precedence over ContextVar.""" + from opentelemetry import context as otel_context + + from basalt.observability.context_managers import ROOT_SPAN_CONTEXT_KEY + from basalt.observability.processors import PENDING_INJECT_PROMPT_KEY + from basalt.prompts.models import _current_prompt_context + + # Create a mock span + mock_span = MagicMock() + mock_span.is_recording.return_value = True + mock_scope = MagicMock() + mock_scope.name = "opentelemetry.instrumentation.openai" + mock_span.instrumentation_scope = mock_scope + + # Set up a root span context + mock_root_span = MagicMock() + ctx = otel_context.set_value(ROOT_SPAN_CONTEXT_KEY, mock_root_span) + + # Set up ContextVar prompt context (should be overridden) + contextvar_prompt = { + "slug": "contextvar-prompt", + "version": "v1.0.0", + "provider": "openai", + "model": "gpt-4", + "from_cache": False + } + cv_token = _current_prompt_context.set(contextvar_prompt) + + # Set up explicit injection (should take precedence) + explicit_prompt = { + "slug": "explicit-prompt", + "version": "v2.0.0", + "provider": "anthropic", + "model": "claude-3", + } + ctx = otel_context.set_value(PENDING_INJECT_PROMPT_KEY, explicit_prompt, ctx) + otel_token = otel_context.attach(ctx) + + try: + # Create processor and call on_start + processor = processors.BasaltAutoInstrumentationProcessor() + processor.on_start(mock_span, None) + + # Verify that explicit injection won (not ContextVar) + mock_span.set_attribute.assert_any_call("basalt.prompt.slug", "explicit-prompt") + mock_span.set_attribute.assert_any_call("basalt.prompt.version", "v2.0.0") + mock_span.set_attribute.assert_any_call("basalt.prompt.provider", "anthropic") + mock_span.set_attribute.assert_any_call("basalt.prompt.model", "claude-3") + + # Verify ContextVar values were NOT used + calls = mock_span.set_attribute.call_args_list + call_dict = {call[0][0]: call[0][1] for call in calls} + assert call_dict.get("basalt.prompt.slug") != "contextvar-prompt" + finally: + _current_prompt_context.reset(cv_token) + otel_context.detach(otel_token) + + +def test_auto_instrumentation_processor_no_prompt_context(): + """Test that processor gracefully handles absence of prompt context.""" + from opentelemetry import context as otel_context + + from basalt.observability.context_managers import ROOT_SPAN_CONTEXT_KEY + + # Create a mock span + mock_span = MagicMock() + mock_span.is_recording.return_value = True + mock_scope = MagicMock() + mock_scope.name = "opentelemetry.instrumentation.openai" + mock_span.instrumentation_scope = mock_scope + + # Set up a root span context but NO prompt context + mock_root_span = MagicMock() + ctx = otel_context.set_value(ROOT_SPAN_CONTEXT_KEY, mock_root_span) + otel_token = otel_context.attach(ctx) + + try: + # Create processor and call on_start (should not raise any exception) + processor = processors.BasaltAutoInstrumentationProcessor() + processor.on_start(mock_span, None) + + # Verify that no prompt attributes were set + calls = mock_span.set_attribute.call_args_list + prompt_calls = [call for call in calls if "basalt.prompt" in call[0][0]] + assert len(prompt_calls) == 0 + finally: + otel_context.detach(otel_token) + + +def test_auto_instrumentation_processor_prompt_context_with_optional_fields(): + """Test that processor handles prompt context with missing optional fields.""" + from opentelemetry import context as otel_context + + from basalt.observability.context_managers import ROOT_SPAN_CONTEXT_KEY + from basalt.prompts.models import _current_prompt_context + + # Create a mock span + mock_span = MagicMock() + mock_span.is_recording.return_value = True + mock_scope = MagicMock() + mock_scope.name = "opentelemetry.instrumentation.openai" + mock_span.instrumentation_scope = mock_scope + + # Set up a root span context + mock_root_span = MagicMock() + ctx = otel_context.set_value(ROOT_SPAN_CONTEXT_KEY, mock_root_span) + otel_token = otel_context.attach(ctx) + + # Set up prompt context with only required fields (no version, tag, or variables) + prompt_ctx = { + "slug": "minimal-prompt", + "version": None, + "tag": None, + "provider": "openai", + "model": "gpt-4", + "variables": None, + "from_cache": True + } + cv_token = _current_prompt_context.set(prompt_ctx) + + try: + # Create processor and call on_start + processor = processors.BasaltAutoInstrumentationProcessor() + processor.on_start(mock_span, None) + + # Verify that required fields were set + mock_span.set_attribute.assert_any_call("basalt.prompt.slug", "minimal-prompt") + mock_span.set_attribute.assert_any_call("basalt.prompt.model.provider", "openai") + mock_span.set_attribute.assert_any_call("basalt.prompt.model.model", "gpt-4") + mock_span.set_attribute.assert_any_call("basalt.prompt.from_cache", True) + + # Verify that optional fields were NOT set + calls = mock_span.set_attribute.call_args_list + call_dict = {call[0][0]: call[0][1] for call in calls} + assert "basalt.prompt.version" not in call_dict + assert "basalt.prompt.tag" not in call_dict + assert "basalt.prompt.variables" not in call_dict + finally: + _current_prompt_context.reset(cv_token) + otel_context.detach(otel_token) diff --git a/tests/prompts/test_context_manager.py b/tests/prompts/test_context_manager.py index a2f240c..b5e53ed 100644 --- a/tests/prompts/test_context_manager.py +++ b/tests/prompts/test_context_manager.py @@ -204,39 +204,6 @@ def test_wrapper_repr_and_str(mock_prompt): assert "Prompt" in repr(wrapper) assert "Prompt" in str(wrapper) - -def test_prompt_context_manager_sets_in_trace_attribute(mock_prompt): - """Test that prompt context manager includes basalt.in_trace in span attributes. - - This test verifies the implementation by checking that _set_span_attributes - is called during wrapper initialization. The method now includes - span.set_attribute(semconv.BasaltSpan.IN_TRACE, True). - """ - from unittest.mock import MagicMock, patch - - # Mock the tracer to capture span attribute calls - with patch('basalt.observability.context_managers.get_tracer') as mock_get_tracer: - mock_tracer = MagicMock() - mock_span = MagicMock() - mock_get_tracer.return_value = mock_tracer - mock_tracer.start_as_current_span.return_value.__enter__.return_value = mock_span - - PromptContextManager( - prompt=mock_prompt, - slug="test-prompt", - version="1.0.0", - tag="prod", - variables={"name": "World"}, - from_cache=False, - ) - - # Verify that set_attribute was called with basalt.in_trace - attribute_calls = [call[0] for call in mock_span.set_attribute.call_args_list] - # Check that basalt.in_trace was set (we used semconv.BasaltSpan.IN_TRACE constant) - assert any("basalt.in_trace" in str(call) for call in attribute_calls), \ - f"basalt.in_trace not found in attribute calls: {attribute_calls}" - - @pytest.mark.asyncio async def test_async_prompt_context_manager_sets_in_trace_attribute(mock_prompt): """Test that async prompt context manager uses _set_span_attributes which includes basalt.in_trace. diff --git a/tests/test_observe_decorators.py b/tests/test_observe_decorators.py index 8dc35d9..b0a5352 100644 --- a/tests/test_observe_decorators.py +++ b/tests/test_observe_decorators.py @@ -3,18 +3,6 @@ import sys -def test_imports(): - """Test that all new decorators can be imported.""" - try: - from basalt.observability import ( - ObserveKind, - observe, - ) - return True - except ImportError: - return False - - def test_observe_kind_enum(): """Test ObserveKind enum values.""" try: