Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 32 additions & 16 deletions src/microsoft/opentelemetry/_genai/_langchain/_tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,14 @@ def _start_trace(self, run: Run) -> None:
start_time_utc_nano = as_utc_nano(run.start_time)

is_agent = self._is_agent_run(run)
# Nested agents (sub-agents with an agent ancestor) must NOT inherit
# their identity from the shared ``_agent_config`` — that describes
# the top-level agent only.
is_nested_agent = is_agent and self._find_agent_ancestor(run) is not None

# Determine span name based on run type
if is_agent:
agent_name = self._resolve_agent_name(run)
agent_name = self._resolve_agent_name(run, use_config=not is_nested_agent)
span_name = (
f"{INVOKE_AGENT_OPERATION_NAME} {agent_name}"
if agent_name
Expand All @@ -193,7 +197,7 @@ def _start_trace(self, run: Run) -> None:
# The inner span shows the framework name (e.g. "invoke_agent LangGraph").
wrapper_span: Span | None = None
if is_agent:
agent_name = self._resolve_agent_name(run)
agent_name = self._resolve_agent_name(run, use_config=not is_nested_agent)
wrapper_label = agent_name or run.name
wrapper_span = self._tracer.start_span(
name=f"{INVOKE_AGENT_OPERATION_NAME} {wrapper_label}",
Expand All @@ -207,15 +211,18 @@ def _start_trace(self, run: Run) -> None:
wrapper_span.set_attribute(GEN_AI_OPERATION_NAME_KEY, INVOKE_AGENT_OPERATION_NAME)
if agent_name:
wrapper_span.set_attribute(GEN_AI_AGENT_NAME_KEY, agent_name)
agent_id = self._agent_config.get("agent_id")
if agent_id:
wrapper_span.set_attribute(GEN_AI_AGENT_ID_KEY, agent_id)
agent_desc = self._agent_config.get("agent_description")
if agent_desc:
wrapper_span.set_attribute(GEN_AI_AGENT_DESCRIPTION_KEY, agent_desc)
agent_version = self._agent_config.get("agent_version")
if agent_version:
wrapper_span.set_attribute(GEN_AI_AGENT_VERSION_KEY, agent_version)
# Apply agent identity from config only for the top-level agent.
# Nested agents derive identity solely from run metadata.
if not is_nested_agent:
agent_id = self._agent_config.get("agent_id")
if agent_id:
wrapper_span.set_attribute(GEN_AI_AGENT_ID_KEY, agent_id)
agent_desc = self._agent_config.get("agent_description")
if agent_desc:
wrapper_span.set_attribute(GEN_AI_AGENT_DESCRIPTION_KEY, agent_desc)
agent_version = self._agent_config.get("agent_version")
if agent_version:
wrapper_span.set_attribute(GEN_AI_AGENT_VERSION_KEY, agent_version)
wrapper_span.set_attributes(dict(flatten(extract_agent_metadata(run))))
server_addr = self._agent_config.get("server_address")
if server_addr:
Expand Down Expand Up @@ -399,11 +406,20 @@ def _is_agent_run(self, run: Run) -> bool:
return False
return True

def _resolve_agent_name(self, run: Run) -> str | None:
"""Resolve agent name from config override, run metadata, or run name."""
# 1. Explicit config override
if name := self._agent_config.get("agent_name"):
return str(name)
def _resolve_agent_name(self, run: Run, *, use_config: bool = True) -> str | None:
"""Resolve agent name from config override, run metadata, or run name.
Args:
run: The LangChain run.
use_config: Whether to check ``_agent_config`` first. Pass
``False`` for nested (sub-) agents so their identity is
derived from the run's own metadata rather than the shared
top-level config.
"""
# 1. Explicit config override (top-level agent only)
if use_config:
if name := self._agent_config.get("agent_name"):
return str(name)
# 2. From run metadata (agent_name or lc_agent_name)
if run.extra and isinstance(run.extra, dict):
meta = run.extra.get("metadata")
Expand Down
60 changes: 48 additions & 12 deletions src/microsoft/opentelemetry/_genai/main_agent/_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from opentelemetry.sdk._logs import LogRecordProcessor, ReadWriteLogRecord
from opentelemetry.sdk.trace import ReadableSpan, Span
from opentelemetry.sdk.trace.export import SpanProcessor
from opentelemetry.trace import Span as SpanAPI

# Each row: (target attribute on current span,
# primary source attribute on parent span,
Expand Down Expand Up @@ -55,19 +56,33 @@ class GenAIMainAgentSpanProcessor(SpanProcessor):

On ``on_start``: copies main-agent attributes from the parent span (or
falls back to the parent's ``gen_ai.agent.*`` / ``gen_ai.conversation.id``
attributes) onto the new span.
attributes) onto the new span. Also stores a reference to the parent
``Span`` so that ``on_end`` can retry propagation for children whose
parent attributes were not yet available at ``on_start`` time.

On ``on_end``: when the span is itself an ``invoke_agent`` operation and
has not already been enriched, copies its own ``gen_ai.agent.*`` /
``gen_ai.conversation.id`` attributes onto ``microsoft.gen_ai.main_agent.*``
so the top-level agent identifies itself as the main agent.
so the top-level agent identifies itself as the main agent. For other
spans that still lack ``microsoft.gen_ai.main_agent.*`` attributes, a
fallback read from the (now potentially enriched) parent is attempted.
"""

def __init__(self) -> None:
# span-id → parent Span, used for on_end fallback propagation
self._parent_spans: dict[int, SpanAPI] = {}

def on_start(self, span: Span, parent_context: context_api.Context | None = None) -> None:
parent = trace.get_current_span(parent_context)
if not parent.get_span_context().is_valid:
return

# Store parent reference for on_end fallback when on_start misses
# attributes that are set on the parent after this child is created.
span_ctx = span.get_span_context()
if span_ctx.is_valid:
self._parent_spans[span_ctx.span_id] = parent

parent_attributes = getattr(parent, "attributes", None) or {}
for target, primary, fallback in _PROPAGATION_TABLE:
value = parent_attributes.get(primary)
Expand All @@ -77,23 +92,44 @@ def on_start(self, span: Span, parent_context: context_api.Context | None = None
span.set_attribute(target, value)

def on_end(self, span: ReadableSpan) -> None:
span_id = span.context.span_id
parent = self._parent_spans.pop(span_id, None)

attributes = span.attributes or {}
if attributes.get(GEN_AI_OPERATION_NAME_KEY) != INVOKE_AGENT_OPERATION_NAME:

# Already enriched — nothing to do.
if any(k.startswith(GEN_AI_MAIN_AGENT_ATTRIBUTE_PREFIX) for k in attributes):
return

for key in attributes:
if key.startswith(GEN_AI_MAIN_AGENT_ATTRIBUTE_PREFIX):
return
# Access the internal mutable attributes dict. ``on_end`` receives a
# ``ReadableSpan`` which lacks ``set_attribute``, so we write to the
# underlying ``BoundedAttributes`` mapping directly.
mutable = getattr(span, "_attributes", None)
if mutable is None:
return

if not hasattr(span, "set_attribute"):
# Self-promotion: top-level invoke_agent spans copy their own
# gen_ai.agent.* → microsoft.gen_ai.main_agent.*
if attributes.get(GEN_AI_OPERATION_NAME_KEY) == INVOKE_AGENT_OPERATION_NAME:
for target, source in _SELF_COPY_TABLE:
value = attributes.get(source)
if value is not None:
mutable[target] = value
return
for target, source in _SELF_COPY_TABLE:
value = attributes.get(source)
if value is not None:
span.set_attribute(target, value) # type: ignore[attr-defined]

# Fallback propagation: re-read from the parent span whose attributes
# may have been set after this child was created (timing issue).
if parent is not None:
parent_attributes = getattr(parent, "attributes", None) or {}
for target, primary, fallback in _PROPAGATION_TABLE:
value = parent_attributes.get(primary)
if value is None:
value = parent_attributes.get(fallback)
if value is not None:
mutable[target] = value

def shutdown(self) -> None:
pass
self._parent_spans.clear()

def force_flush(self, timeout_millis: int = 30000) -> bool:
return True
Expand Down
66 changes: 61 additions & 5 deletions tests/genai/main_agent/test_sdk_propagation.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,14 @@ def test_non_agent_parent_does_not_propagate(self):

# ---- Timing: attributes set AFTER child creation → broken ----------------

def test_attrs_set_after_child_creation_breaks_propagation(self):
def test_attrs_set_after_child_creation_recovered_on_end(self):
"""If agent attributes are set AFTER creating the child span,
on_start cannot propagate them. This is the timing bug."""
on_start cannot propagate them — but on_end fallback re-reads
from the (now enriched) parent and fills in the gap."""
root_ctx = trace_api.set_span_in_context(trace_api.INVALID_SPAN)
agent_span = self.tracer.start_span("invoke_agent TravelBot", context=root_ctx)

# BUG: create child BEFORE setting attributes on parent
# Create child BEFORE setting attributes on parent
child_ctx = trace_api.set_span_in_context(agent_span)
child = self.tracer.start_span("chat gpt-4", context=child_ctx)

Expand All @@ -225,9 +226,10 @@ def test_attrs_set_after_child_creation_breaks_propagation(self):
agent_span.end()

spans = self._get_exported_spans()
self.assertIsNone(
self.assertEqual(
spans["chat gpt-4"].attributes.get(GEN_AI_MAIN_AGENT_NAME_KEY),
"on_start fired before attrs were set — propagation must fail",
"TravelBot",
"on_end fallback should recover propagation from the parent",
)

# ---- Partial attributes: only name set -----------------------------------
Expand All @@ -247,6 +249,60 @@ def test_partial_attributes_propagate(self):
self.assertEqual(spans["chat gpt-4"].attributes.get(GEN_AI_MAIN_AGENT_NAME_KEY), "Bot")
self.assertIsNone(spans["chat gpt-4"].attributes.get(GEN_AI_MAIN_AGENT_ID_KEY))

# ---- On-end self-promotion for root invoke_agent -------------------------

def test_root_invoke_agent_self_promotes_on_end(self):
"""A root invoke_agent span with no parent must self-promote
its gen_ai.agent.* to microsoft.gen_ai.main_agent.* on end."""
root_ctx = trace_api.set_span_in_context(trace_api.INVALID_SPAN)
agent = self.tracer.start_span("invoke_agent TravelBot", context=root_ctx)
agent.set_attribute(GEN_AI_OPERATION_NAME_KEY, INVOKE_AGENT_OPERATION_NAME)
agent.set_attribute(GEN_AI_AGENT_NAME_KEY, "TravelBot")
agent.set_attribute(GEN_AI_AGENT_ID_KEY, "agent-1")
agent.set_attribute(GEN_AI_AGENT_VERSION_KEY, "2.0")
agent.set_attribute(GEN_AI_CONVERSATION_ID_KEY, "conv-1")
agent.end()

spans = self._get_exported_spans()
exported = spans["invoke_agent TravelBot"]
self.assertEqual(exported.attributes.get(GEN_AI_MAIN_AGENT_NAME_KEY), "TravelBot")
self.assertEqual(exported.attributes.get(GEN_AI_MAIN_AGENT_ID_KEY), "agent-1")
self.assertEqual(exported.attributes.get(GEN_AI_MAIN_AGENT_VERSION_KEY), "2.0")
self.assertEqual(exported.attributes.get(GEN_AI_MAIN_AGENT_CONVERSATION_ID_KEY), "conv-1")

def test_nested_invoke_agent_does_not_self_promote(self):
"""A nested invoke_agent span enriched by on_start propagation
must NOT self-promote (main_agent.* already set from parent)."""
root_ctx = trace_api.set_span_in_context(trace_api.INVALID_SPAN)
main = self.tracer.start_span("invoke_agent MainBot", context=root_ctx)
main.set_attribute(GEN_AI_OPERATION_NAME_KEY, INVOKE_AGENT_OPERATION_NAME)
main.set_attribute(GEN_AI_AGENT_NAME_KEY, "MainBot")
main.set_attribute(GEN_AI_AGENT_ID_KEY, "main-1")

sub_ctx = trace_api.set_span_in_context(main)
sub = self.tracer.start_span("invoke_agent SubBot", context=sub_ctx)
sub.set_attribute(GEN_AI_OPERATION_NAME_KEY, INVOKE_AGENT_OPERATION_NAME)
sub.set_attribute(GEN_AI_AGENT_NAME_KEY, "SubBot")
sub.set_attribute(GEN_AI_AGENT_ID_KEY, "sub-1")
sub.end()
main.end()

spans = self._get_exported_spans()
sub_exported = spans["invoke_agent SubBot"]
# main_agent must be MainBot (from parent), not SubBot (own)
self.assertEqual(sub_exported.attributes.get(GEN_AI_MAIN_AGENT_NAME_KEY), "MainBot")
self.assertEqual(sub_exported.attributes.get(GEN_AI_MAIN_AGENT_ID_KEY), "main-1")

def test_self_promotion_only_for_invoke_agent(self):
"""Non-invoke_agent root spans must NOT self-promote."""
root_ctx = trace_api.set_span_in_context(trace_api.INVALID_SPAN)
chat = self.tracer.start_span("chat gpt-4", context=root_ctx)
chat.set_attribute(GEN_AI_AGENT_NAME_KEY, "Bot")
chat.end()

spans = self._get_exported_spans()
self.assertIsNone(spans["chat gpt-4"].attributes.get(GEN_AI_MAIN_AGENT_NAME_KEY))


if __name__ == "__main__":
unittest.main()
92 changes: 54 additions & 38 deletions tests/genai/main_agent/test_span_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,67 +122,83 @@ class TestGenAIMainAgentSpanProcessorOnEnd(unittest.TestCase):
def setUp(self) -> None:
self.processor = GenAIMainAgentSpanProcessor()

def test_skipped_when_not_invoke_agent(self):
@staticmethod
def _make_span(attributes: dict, *, has_internal_attrs: bool = True):
"""Build a mock ReadableSpan with context.span_id and _attributes."""
span = MagicMock()
span.attributes = {GEN_AI_OPERATION_NAME_KEY: "chat"}
span.attributes = dict(attributes)
span.context.span_id = id(span) # unique per mock
if has_internal_attrs:
span._attributes = dict(attributes)
else:
del span._attributes
return span
Comment on lines +131 to +135

def test_skipped_when_not_invoke_agent(self):
span = self._make_span({GEN_AI_OPERATION_NAME_KEY: "chat"})

self.processor.on_end(span)

span.set_attribute.assert_not_called()
# _attributes must remain unchanged
self.assertNotIn(GEN_AI_MAIN_AGENT_NAME_KEY, span._attributes)

def test_skipped_when_main_agent_attribute_already_present(self):
span = MagicMock()
span.attributes = {
GEN_AI_OPERATION_NAME_KEY: INVOKE_AGENT_OPERATION_NAME,
GEN_AI_MAIN_AGENT_NAME_KEY: "already-set",
GEN_AI_AGENT_NAME_KEY: "self",
}
span = self._make_span(
{
GEN_AI_OPERATION_NAME_KEY: INVOKE_AGENT_OPERATION_NAME,
GEN_AI_MAIN_AGENT_NAME_KEY: "already-set",
GEN_AI_AGENT_NAME_KEY: "self",
}
)

self.processor.on_end(span)

span.set_attribute.assert_not_called()
# Must keep the existing value, not overwrite with "self"
self.assertEqual(span._attributes[GEN_AI_MAIN_AGENT_NAME_KEY], "already-set")

def test_copies_self_attributes_when_invoke_agent_and_unenriched(self):
span = MagicMock()
span.attributes = {
GEN_AI_OPERATION_NAME_KEY: INVOKE_AGENT_OPERATION_NAME,
GEN_AI_AGENT_NAME_KEY: "self-name",
GEN_AI_AGENT_ID_KEY: "self-id",
GEN_AI_AGENT_VERSION_KEY: "self-v",
GEN_AI_CONVERSATION_ID_KEY: "self-conv",
}
span = self._make_span(
{
GEN_AI_OPERATION_NAME_KEY: INVOKE_AGENT_OPERATION_NAME,
GEN_AI_AGENT_NAME_KEY: "self-name",
GEN_AI_AGENT_ID_KEY: "self-id",
GEN_AI_AGENT_VERSION_KEY: "self-v",
GEN_AI_CONVERSATION_ID_KEY: "self-conv",
}
)

self.processor.on_end(span)

span.set_attribute.assert_any_call(GEN_AI_MAIN_AGENT_NAME_KEY, "self-name")
span.set_attribute.assert_any_call(GEN_AI_MAIN_AGENT_ID_KEY, "self-id")
span.set_attribute.assert_any_call(GEN_AI_MAIN_AGENT_VERSION_KEY, "self-v")
span.set_attribute.assert_any_call(GEN_AI_MAIN_AGENT_CONVERSATION_ID_KEY, "self-conv")
self.assertEqual(span.set_attribute.call_count, 4)
self.assertEqual(span._attributes[GEN_AI_MAIN_AGENT_NAME_KEY], "self-name")
self.assertEqual(span._attributes[GEN_AI_MAIN_AGENT_ID_KEY], "self-id")
self.assertEqual(span._attributes[GEN_AI_MAIN_AGENT_VERSION_KEY], "self-v")
self.assertEqual(span._attributes[GEN_AI_MAIN_AGENT_CONVERSATION_ID_KEY], "self-conv")

def test_copies_only_present_attributes(self):
span = MagicMock()
span.attributes = {
GEN_AI_OPERATION_NAME_KEY: INVOKE_AGENT_OPERATION_NAME,
GEN_AI_AGENT_NAME_KEY: "only-name",
}
span = self._make_span(
{
GEN_AI_OPERATION_NAME_KEY: INVOKE_AGENT_OPERATION_NAME,
GEN_AI_AGENT_NAME_KEY: "only-name",
}
)

self.processor.on_end(span)

span.set_attribute.assert_called_once_with(GEN_AI_MAIN_AGENT_NAME_KEY, "only-name")
self.assertEqual(span._attributes[GEN_AI_MAIN_AGENT_NAME_KEY], "only-name")
self.assertNotIn(GEN_AI_MAIN_AGENT_ID_KEY, span._attributes)

def test_noop_when_span_has_no_set_attribute(self):
# ReadableSpan-only objects (no ``set_attribute``) must not raise.
span = MagicMock(spec=["attributes"])
span.attributes = {
GEN_AI_OPERATION_NAME_KEY: INVOKE_AGENT_OPERATION_NAME,
GEN_AI_AGENT_NAME_KEY: "self-name",
}
def test_noop_when_span_has_no_internal_attributes(self):
# ReadableSpan-only objects without ``_attributes`` must not raise.
span = self._make_span(
{
GEN_AI_OPERATION_NAME_KEY: INVOKE_AGENT_OPERATION_NAME,
GEN_AI_AGENT_NAME_KEY: "self-name",
},
has_internal_attrs=False,
)

self.processor.on_end(span) # must not raise

self.assertFalse(hasattr(span, "set_attribute"))


class TestGenAIMainAgentSpanProcessorLifecycle(unittest.TestCase):
def test_shutdown_and_force_flush_are_noops(self):
Expand Down
Loading