From b50c4dc683fec8cb494e8a11fcdadf75b2649a2a Mon Sep 17 00:00:00 2001 From: Hector Hernandez <39923391+hectorhdzg@users.noreply.github.com> Date: Mon, 8 Jun 2026 15:06:30 -0700 Subject: [PATCH] Fix microsoft.gen_ai.main_agent.* propagation and self-promotion bugs Fix three issues with main_agent attribute handling identified during MAF Python and LangChain Python testing: 1. on_end self-promotion never fired: ReadableSpan lacks set_attribute(), so the hasattr guard always bailed out. Fixed by writing directly to the internal _attributes (BoundedAttributes) mapping. 2. on_start inheritance missed late-set parent attributes: When a child span was created before the parent's gen_ai.agent.* attributes were set, on_start propagation silently skipped them. Fixed by storing the parent Span reference in on_start and re-reading from it during on_end as a fallback. 3. Nested LangChain agents had gen_ai.agent.name overwritten by the top-level agent's config: _resolve_agent_name() unconditionally read from the shared _agent_config, clobbering sub-agent identity. Fixed by adding a use_config flag and skipping config-based identity for nested agents (detected via _find_agent_ancestor). --- .../_genai/_langchain/_tracer.py | 48 ++++++---- .../_genai/main_agent/_processor.py | 60 +++++++++--- .../genai/main_agent/test_sdk_propagation.py | 66 ++++++++++++- tests/genai/main_agent/test_span_processor.py | 92 +++++++++++-------- 4 files changed, 195 insertions(+), 71 deletions(-) diff --git a/src/microsoft/opentelemetry/_genai/_langchain/_tracer.py b/src/microsoft/opentelemetry/_genai/_langchain/_tracer.py index 213afb33..8adbdcc2 100644 --- a/src/microsoft/opentelemetry/_genai/_langchain/_tracer.py +++ b/src/microsoft/opentelemetry/_genai/_langchain/_tracer.py @@ -174,10 +174,14 @@ def _start_trace(self, run: Run) -> None: start_time_utc_nano = as_utc_nano(run.start_time) is_agent = self._is_agent_run(run) + # Nested agents (sub-agents with an agent ancestor) must NOT inherit + # their identity from the shared ``_agent_config`` — that describes + # the top-level agent only. + is_nested_agent = is_agent and self._find_agent_ancestor(run) is not None # Determine span name based on run type if is_agent: - agent_name = self._resolve_agent_name(run) + agent_name = self._resolve_agent_name(run, use_config=not is_nested_agent) span_name = ( f"{INVOKE_AGENT_OPERATION_NAME} {agent_name}" if agent_name @@ -193,7 +197,7 @@ def _start_trace(self, run: Run) -> None: # The inner span shows the framework name (e.g. "invoke_agent LangGraph"). wrapper_span: Span | None = None if is_agent: - agent_name = self._resolve_agent_name(run) + agent_name = self._resolve_agent_name(run, use_config=not is_nested_agent) wrapper_label = agent_name or run.name wrapper_span = self._tracer.start_span( name=f"{INVOKE_AGENT_OPERATION_NAME} {wrapper_label}", @@ -207,15 +211,18 @@ def _start_trace(self, run: Run) -> None: wrapper_span.set_attribute(GEN_AI_OPERATION_NAME_KEY, INVOKE_AGENT_OPERATION_NAME) if agent_name: wrapper_span.set_attribute(GEN_AI_AGENT_NAME_KEY, agent_name) - agent_id = self._agent_config.get("agent_id") - if agent_id: - wrapper_span.set_attribute(GEN_AI_AGENT_ID_KEY, agent_id) - agent_desc = self._agent_config.get("agent_description") - if agent_desc: - wrapper_span.set_attribute(GEN_AI_AGENT_DESCRIPTION_KEY, agent_desc) - agent_version = self._agent_config.get("agent_version") - if agent_version: - wrapper_span.set_attribute(GEN_AI_AGENT_VERSION_KEY, agent_version) + # Apply agent identity from config only for the top-level agent. + # Nested agents derive identity solely from run metadata. + if not is_nested_agent: + agent_id = self._agent_config.get("agent_id") + if agent_id: + wrapper_span.set_attribute(GEN_AI_AGENT_ID_KEY, agent_id) + agent_desc = self._agent_config.get("agent_description") + if agent_desc: + wrapper_span.set_attribute(GEN_AI_AGENT_DESCRIPTION_KEY, agent_desc) + agent_version = self._agent_config.get("agent_version") + if agent_version: + wrapper_span.set_attribute(GEN_AI_AGENT_VERSION_KEY, agent_version) wrapper_span.set_attributes(dict(flatten(extract_agent_metadata(run)))) server_addr = self._agent_config.get("server_address") if server_addr: @@ -399,11 +406,20 @@ def _is_agent_run(self, run: Run) -> bool: return False return True - def _resolve_agent_name(self, run: Run) -> str | None: - """Resolve agent name from config override, run metadata, or run name.""" - # 1. Explicit config override - if name := self._agent_config.get("agent_name"): - return str(name) + def _resolve_agent_name(self, run: Run, *, use_config: bool = True) -> str | None: + """Resolve agent name from config override, run metadata, or run name. + + Args: + run: The LangChain run. + use_config: Whether to check ``_agent_config`` first. Pass + ``False`` for nested (sub-) agents so their identity is + derived from the run's own metadata rather than the shared + top-level config. + """ + # 1. Explicit config override (top-level agent only) + if use_config: + if name := self._agent_config.get("agent_name"): + return str(name) # 2. From run metadata (agent_name or lc_agent_name) if run.extra and isinstance(run.extra, dict): meta = run.extra.get("metadata") diff --git a/src/microsoft/opentelemetry/_genai/main_agent/_processor.py b/src/microsoft/opentelemetry/_genai/main_agent/_processor.py index 5e6f0ba4..5cb098a1 100644 --- a/src/microsoft/opentelemetry/_genai/main_agent/_processor.py +++ b/src/microsoft/opentelemetry/_genai/main_agent/_processor.py @@ -27,6 +27,7 @@ from opentelemetry.sdk._logs import LogRecordProcessor, ReadWriteLogRecord from opentelemetry.sdk.trace import ReadableSpan, Span from opentelemetry.sdk.trace.export import SpanProcessor +from opentelemetry.trace import Span as SpanAPI # Each row: (target attribute on current span, # primary source attribute on parent span, @@ -55,19 +56,33 @@ class GenAIMainAgentSpanProcessor(SpanProcessor): On ``on_start``: copies main-agent attributes from the parent span (or falls back to the parent's ``gen_ai.agent.*`` / ``gen_ai.conversation.id`` - attributes) onto the new span. + attributes) onto the new span. Also stores a reference to the parent + ``Span`` so that ``on_end`` can retry propagation for children whose + parent attributes were not yet available at ``on_start`` time. On ``on_end``: when the span is itself an ``invoke_agent`` operation and has not already been enriched, copies its own ``gen_ai.agent.*`` / ``gen_ai.conversation.id`` attributes onto ``microsoft.gen_ai.main_agent.*`` - so the top-level agent identifies itself as the main agent. + so the top-level agent identifies itself as the main agent. For other + spans that still lack ``microsoft.gen_ai.main_agent.*`` attributes, a + fallback read from the (now potentially enriched) parent is attempted. """ + def __init__(self) -> None: + # span-id → parent Span, used for on_end fallback propagation + self._parent_spans: dict[int, SpanAPI] = {} + def on_start(self, span: Span, parent_context: context_api.Context | None = None) -> None: parent = trace.get_current_span(parent_context) if not parent.get_span_context().is_valid: return + # Store parent reference for on_end fallback when on_start misses + # attributes that are set on the parent after this child is created. + span_ctx = span.get_span_context() + if span_ctx.is_valid: + self._parent_spans[span_ctx.span_id] = parent + parent_attributes = getattr(parent, "attributes", None) or {} for target, primary, fallback in _PROPAGATION_TABLE: value = parent_attributes.get(primary) @@ -77,23 +92,44 @@ def on_start(self, span: Span, parent_context: context_api.Context | None = None span.set_attribute(target, value) def on_end(self, span: ReadableSpan) -> None: + span_id = span.context.span_id + parent = self._parent_spans.pop(span_id, None) + attributes = span.attributes or {} - if attributes.get(GEN_AI_OPERATION_NAME_KEY) != INVOKE_AGENT_OPERATION_NAME: + + # Already enriched — nothing to do. + if any(k.startswith(GEN_AI_MAIN_AGENT_ATTRIBUTE_PREFIX) for k in attributes): return - for key in attributes: - if key.startswith(GEN_AI_MAIN_AGENT_ATTRIBUTE_PREFIX): - return + # Access the internal mutable attributes dict. ``on_end`` receives a + # ``ReadableSpan`` which lacks ``set_attribute``, so we write to the + # underlying ``BoundedAttributes`` mapping directly. + mutable = getattr(span, "_attributes", None) + if mutable is None: + return - if not hasattr(span, "set_attribute"): + # Self-promotion: top-level invoke_agent spans copy their own + # gen_ai.agent.* → microsoft.gen_ai.main_agent.* + if attributes.get(GEN_AI_OPERATION_NAME_KEY) == INVOKE_AGENT_OPERATION_NAME: + for target, source in _SELF_COPY_TABLE: + value = attributes.get(source) + if value is not None: + mutable[target] = value return - for target, source in _SELF_COPY_TABLE: - value = attributes.get(source) - if value is not None: - span.set_attribute(target, value) # type: ignore[attr-defined] + + # Fallback propagation: re-read from the parent span whose attributes + # may have been set after this child was created (timing issue). + if parent is not None: + parent_attributes = getattr(parent, "attributes", None) or {} + for target, primary, fallback in _PROPAGATION_TABLE: + value = parent_attributes.get(primary) + if value is None: + value = parent_attributes.get(fallback) + if value is not None: + mutable[target] = value def shutdown(self) -> None: - pass + self._parent_spans.clear() def force_flush(self, timeout_millis: int = 30000) -> bool: return True diff --git a/tests/genai/main_agent/test_sdk_propagation.py b/tests/genai/main_agent/test_sdk_propagation.py index 1de19a81..38a15701 100644 --- a/tests/genai/main_agent/test_sdk_propagation.py +++ b/tests/genai/main_agent/test_sdk_propagation.py @@ -207,13 +207,14 @@ def test_non_agent_parent_does_not_propagate(self): # ---- Timing: attributes set AFTER child creation → broken ---------------- - def test_attrs_set_after_child_creation_breaks_propagation(self): + def test_attrs_set_after_child_creation_recovered_on_end(self): """If agent attributes are set AFTER creating the child span, - on_start cannot propagate them. This is the timing bug.""" + on_start cannot propagate them — but on_end fallback re-reads + from the (now enriched) parent and fills in the gap.""" root_ctx = trace_api.set_span_in_context(trace_api.INVALID_SPAN) agent_span = self.tracer.start_span("invoke_agent TravelBot", context=root_ctx) - # BUG: create child BEFORE setting attributes on parent + # Create child BEFORE setting attributes on parent child_ctx = trace_api.set_span_in_context(agent_span) child = self.tracer.start_span("chat gpt-4", context=child_ctx) @@ -225,9 +226,10 @@ def test_attrs_set_after_child_creation_breaks_propagation(self): agent_span.end() spans = self._get_exported_spans() - self.assertIsNone( + self.assertEqual( spans["chat gpt-4"].attributes.get(GEN_AI_MAIN_AGENT_NAME_KEY), - "on_start fired before attrs were set — propagation must fail", + "TravelBot", + "on_end fallback should recover propagation from the parent", ) # ---- Partial attributes: only name set ----------------------------------- @@ -247,6 +249,60 @@ def test_partial_attributes_propagate(self): self.assertEqual(spans["chat gpt-4"].attributes.get(GEN_AI_MAIN_AGENT_NAME_KEY), "Bot") self.assertIsNone(spans["chat gpt-4"].attributes.get(GEN_AI_MAIN_AGENT_ID_KEY)) + # ---- On-end self-promotion for root invoke_agent ------------------------- + + def test_root_invoke_agent_self_promotes_on_end(self): + """A root invoke_agent span with no parent must self-promote + its gen_ai.agent.* to microsoft.gen_ai.main_agent.* on end.""" + root_ctx = trace_api.set_span_in_context(trace_api.INVALID_SPAN) + agent = self.tracer.start_span("invoke_agent TravelBot", context=root_ctx) + agent.set_attribute(GEN_AI_OPERATION_NAME_KEY, INVOKE_AGENT_OPERATION_NAME) + agent.set_attribute(GEN_AI_AGENT_NAME_KEY, "TravelBot") + agent.set_attribute(GEN_AI_AGENT_ID_KEY, "agent-1") + agent.set_attribute(GEN_AI_AGENT_VERSION_KEY, "2.0") + agent.set_attribute(GEN_AI_CONVERSATION_ID_KEY, "conv-1") + agent.end() + + spans = self._get_exported_spans() + exported = spans["invoke_agent TravelBot"] + self.assertEqual(exported.attributes.get(GEN_AI_MAIN_AGENT_NAME_KEY), "TravelBot") + self.assertEqual(exported.attributes.get(GEN_AI_MAIN_AGENT_ID_KEY), "agent-1") + self.assertEqual(exported.attributes.get(GEN_AI_MAIN_AGENT_VERSION_KEY), "2.0") + self.assertEqual(exported.attributes.get(GEN_AI_MAIN_AGENT_CONVERSATION_ID_KEY), "conv-1") + + def test_nested_invoke_agent_does_not_self_promote(self): + """A nested invoke_agent span enriched by on_start propagation + must NOT self-promote (main_agent.* already set from parent).""" + root_ctx = trace_api.set_span_in_context(trace_api.INVALID_SPAN) + main = self.tracer.start_span("invoke_agent MainBot", context=root_ctx) + main.set_attribute(GEN_AI_OPERATION_NAME_KEY, INVOKE_AGENT_OPERATION_NAME) + main.set_attribute(GEN_AI_AGENT_NAME_KEY, "MainBot") + main.set_attribute(GEN_AI_AGENT_ID_KEY, "main-1") + + sub_ctx = trace_api.set_span_in_context(main) + sub = self.tracer.start_span("invoke_agent SubBot", context=sub_ctx) + sub.set_attribute(GEN_AI_OPERATION_NAME_KEY, INVOKE_AGENT_OPERATION_NAME) + sub.set_attribute(GEN_AI_AGENT_NAME_KEY, "SubBot") + sub.set_attribute(GEN_AI_AGENT_ID_KEY, "sub-1") + sub.end() + main.end() + + spans = self._get_exported_spans() + sub_exported = spans["invoke_agent SubBot"] + # main_agent must be MainBot (from parent), not SubBot (own) + self.assertEqual(sub_exported.attributes.get(GEN_AI_MAIN_AGENT_NAME_KEY), "MainBot") + self.assertEqual(sub_exported.attributes.get(GEN_AI_MAIN_AGENT_ID_KEY), "main-1") + + def test_self_promotion_only_for_invoke_agent(self): + """Non-invoke_agent root spans must NOT self-promote.""" + root_ctx = trace_api.set_span_in_context(trace_api.INVALID_SPAN) + chat = self.tracer.start_span("chat gpt-4", context=root_ctx) + chat.set_attribute(GEN_AI_AGENT_NAME_KEY, "Bot") + chat.end() + + spans = self._get_exported_spans() + self.assertIsNone(spans["chat gpt-4"].attributes.get(GEN_AI_MAIN_AGENT_NAME_KEY)) + if __name__ == "__main__": unittest.main() diff --git a/tests/genai/main_agent/test_span_processor.py b/tests/genai/main_agent/test_span_processor.py index e9d37c2a..ebb0dc9f 100644 --- a/tests/genai/main_agent/test_span_processor.py +++ b/tests/genai/main_agent/test_span_processor.py @@ -122,67 +122,83 @@ class TestGenAIMainAgentSpanProcessorOnEnd(unittest.TestCase): def setUp(self) -> None: self.processor = GenAIMainAgentSpanProcessor() - def test_skipped_when_not_invoke_agent(self): + @staticmethod + def _make_span(attributes: dict, *, has_internal_attrs: bool = True): + """Build a mock ReadableSpan with context.span_id and _attributes.""" span = MagicMock() - span.attributes = {GEN_AI_OPERATION_NAME_KEY: "chat"} + span.attributes = dict(attributes) + span.context.span_id = id(span) # unique per mock + if has_internal_attrs: + span._attributes = dict(attributes) + else: + del span._attributes + return span + + def test_skipped_when_not_invoke_agent(self): + span = self._make_span({GEN_AI_OPERATION_NAME_KEY: "chat"}) self.processor.on_end(span) - span.set_attribute.assert_not_called() + # _attributes must remain unchanged + self.assertNotIn(GEN_AI_MAIN_AGENT_NAME_KEY, span._attributes) def test_skipped_when_main_agent_attribute_already_present(self): - span = MagicMock() - span.attributes = { - GEN_AI_OPERATION_NAME_KEY: INVOKE_AGENT_OPERATION_NAME, - GEN_AI_MAIN_AGENT_NAME_KEY: "already-set", - GEN_AI_AGENT_NAME_KEY: "self", - } + span = self._make_span( + { + GEN_AI_OPERATION_NAME_KEY: INVOKE_AGENT_OPERATION_NAME, + GEN_AI_MAIN_AGENT_NAME_KEY: "already-set", + GEN_AI_AGENT_NAME_KEY: "self", + } + ) self.processor.on_end(span) - span.set_attribute.assert_not_called() + # Must keep the existing value, not overwrite with "self" + self.assertEqual(span._attributes[GEN_AI_MAIN_AGENT_NAME_KEY], "already-set") def test_copies_self_attributes_when_invoke_agent_and_unenriched(self): - span = MagicMock() - span.attributes = { - GEN_AI_OPERATION_NAME_KEY: INVOKE_AGENT_OPERATION_NAME, - GEN_AI_AGENT_NAME_KEY: "self-name", - GEN_AI_AGENT_ID_KEY: "self-id", - GEN_AI_AGENT_VERSION_KEY: "self-v", - GEN_AI_CONVERSATION_ID_KEY: "self-conv", - } + span = self._make_span( + { + GEN_AI_OPERATION_NAME_KEY: INVOKE_AGENT_OPERATION_NAME, + GEN_AI_AGENT_NAME_KEY: "self-name", + GEN_AI_AGENT_ID_KEY: "self-id", + GEN_AI_AGENT_VERSION_KEY: "self-v", + GEN_AI_CONVERSATION_ID_KEY: "self-conv", + } + ) self.processor.on_end(span) - span.set_attribute.assert_any_call(GEN_AI_MAIN_AGENT_NAME_KEY, "self-name") - span.set_attribute.assert_any_call(GEN_AI_MAIN_AGENT_ID_KEY, "self-id") - span.set_attribute.assert_any_call(GEN_AI_MAIN_AGENT_VERSION_KEY, "self-v") - span.set_attribute.assert_any_call(GEN_AI_MAIN_AGENT_CONVERSATION_ID_KEY, "self-conv") - self.assertEqual(span.set_attribute.call_count, 4) + self.assertEqual(span._attributes[GEN_AI_MAIN_AGENT_NAME_KEY], "self-name") + self.assertEqual(span._attributes[GEN_AI_MAIN_AGENT_ID_KEY], "self-id") + self.assertEqual(span._attributes[GEN_AI_MAIN_AGENT_VERSION_KEY], "self-v") + self.assertEqual(span._attributes[GEN_AI_MAIN_AGENT_CONVERSATION_ID_KEY], "self-conv") def test_copies_only_present_attributes(self): - span = MagicMock() - span.attributes = { - GEN_AI_OPERATION_NAME_KEY: INVOKE_AGENT_OPERATION_NAME, - GEN_AI_AGENT_NAME_KEY: "only-name", - } + span = self._make_span( + { + GEN_AI_OPERATION_NAME_KEY: INVOKE_AGENT_OPERATION_NAME, + GEN_AI_AGENT_NAME_KEY: "only-name", + } + ) self.processor.on_end(span) - span.set_attribute.assert_called_once_with(GEN_AI_MAIN_AGENT_NAME_KEY, "only-name") + self.assertEqual(span._attributes[GEN_AI_MAIN_AGENT_NAME_KEY], "only-name") + self.assertNotIn(GEN_AI_MAIN_AGENT_ID_KEY, span._attributes) - def test_noop_when_span_has_no_set_attribute(self): - # ReadableSpan-only objects (no ``set_attribute``) must not raise. - span = MagicMock(spec=["attributes"]) - span.attributes = { - GEN_AI_OPERATION_NAME_KEY: INVOKE_AGENT_OPERATION_NAME, - GEN_AI_AGENT_NAME_KEY: "self-name", - } + def test_noop_when_span_has_no_internal_attributes(self): + # ReadableSpan-only objects without ``_attributes`` must not raise. + span = self._make_span( + { + GEN_AI_OPERATION_NAME_KEY: INVOKE_AGENT_OPERATION_NAME, + GEN_AI_AGENT_NAME_KEY: "self-name", + }, + has_internal_attrs=False, + ) self.processor.on_end(span) # must not raise - self.assertFalse(hasattr(span, "set_attribute")) - class TestGenAIMainAgentSpanProcessorLifecycle(unittest.TestCase): def test_shutdown_and_force_flush_are_noops(self):