Skip to content

fix: reliable final-response delivery from sub-agents#52

Open
zyma wants to merge 1 commit into
ringier-data:mainfrom
Authrion:fix/subagent-final-response
Open

fix: reliable final-response delivery from sub-agents#52
zyma wants to merge 1 commit into
ringier-data:mainfrom
Authrion:fix/subagent-final-response

Conversation

@zyma
Copy link
Copy Markdown
Contributor

@zyma zyma commented Jun 5, 2026

Problem: Several paths could send the client an empty completed response:

The first streaming chunk was always sent with append=True, so the A2A SDK dropped it (nonexistent artifact index) and short replies like "Hello!" never arrived.
The terminal completed status carried no message body - if a client failed to parse the streamed artifact frame, the user saw nothing.

Checklist

  • I have performed a self-review of my code
  • If it is a core feature and/or a bugfix, I have added thorough tests.
  • Does this PR introduce a breaking change? 🚨

context_id=task.context_id,
task_id=task.id,
)
await updater.update_status(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug (high): HITL branch never closes an open streaming artifact

When first_chunk_sent=True and a HITL interrupt fires, the artifact stream is left permanently open — no add_artifact(..., last_chunk=True) is called before this update_status. The non-HITL else branch (lines 999–1010) correctly seals the artifact first; this branch needs the same:

                if first_chunk_sent:
                    await updater.add_artifact(
                        [Part(root=TextPart(text=""))],
                        artifact_id=streaming_artifact_id,
                        append=True,
                        last_chunk=True,
                        metadata={},
                    )
                await updater.update_status(
                    TaskState.input_required,
                    msg,
                )

Note: do not add final=True here — see comments on lines 1019 and 1059.

await updater.update_status(
TaskState.input_required,
msg,
final=True,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cleanup: final=True is deprecated in A2A v1.x — remove it

A2A spec PR #1308 removes final from TaskStatusUpdateEvent entirely as redundant — stream termination should be inferred from the task state, not an explicit flag. The HITL branch (line 971) already omits final=True and is the more forward-compatible pattern. Callers should never rely on setting final=True to close the stream.

The stream closes cleanly when execute() returns and the framework calls queue.close(). Remove final=True here for consistency with the HITL branch and the v1.x spec direction.

task.context_id,
task.id,
),
final=True,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cleanup: final=True is deprecated in A2A v1.x — remove it

A2A spec PR #1308 removes final from TaskStatusUpdateEvent entirely as redundant — stream termination should be inferred from the task state, not an explicit flag. The HITL branch (line 971) already omits final=True and is the more forward-compatible pattern. Callers should never rely on setting final=True to close the stream.

Same as line 1019: remove final=True here.

subagent_content = None
# Diagnostic: per-subagent-call info for enriched warning
# Entries: {"tool_call_id", "subagent_type", "has_tool_message", "extracted_length"}
subagent_call_diag: list[dict] = []
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cleanup: remove the diagnostic scaffolding — it's debugging code, not production logic

subagent_call_diag, diag_by_id, the forward pass over AIMessages to collect them, and the full O(N) walk after subagent_content is found all exist only to produce a richer warning log line. The actual functional change — skip empty ToolMessage extractions instead of stopping at the first match — is straightforward and doesn't need any of it:

for msg in reversed(current_turn_messages):
    if not isinstance(msg, ToolMessage):
        continue
    tool_call = tool_call_map.get(msg.tool_call_id)
    if not (tool_call and tool_call.get("name") == "task"):
        continue
    try:
        if isinstance(msg.content, str):
            parsed = json.loads(msg.content)
            if isinstance(parsed, dict):
                extracted = parsed.get("message", msg.content)
            elif isinstance(parsed, list):
                extracted = extract_text_from_content(parsed)[0]
            else:
                extracted = msg.content
        elif isinstance(msg.content, list):
            extracted = extract_text_from_content(msg.content)[0]
        else:
            extracted = msg.content
    except json.JSONDecodeError:
        extracted = msg.content
    if isinstance(extracted, str) and extracted:
        subagent_content = extracted
        logger.info(...)
        break

if isinstance(parsed_content, dict):
extracted = parsed_content.get("message", msg.content)
elif isinstance(parsed_content, list):
extracted = extract_text_from_content(parsed_content)[0]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug (latent): IndexError from [0] not caught by the json.JSONDecodeError handler

The try/except only catches json.JSONDecodeError. If extract_text_from_content ever returns an empty sequence the [0] raises IndexError, propagating uncaught and aborting stream_handler with a 500. Same issue at line 427.

extracted = (extract_text_from_content(parsed_content) or [""])[0]

msg,
)
status_metadata = dict(metadata) if metadata else {}
if first_chunk_sent:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cleanup: input_required and auth_required terminal branches duplicate ~20 lines verbatim

Both share the same pattern: copy metadata → optional artifact-close + log → set final_answer_source: fallbackupdate_status. When the log format or key name changes, one branch will be missed.

Consider a _close_streaming_artifact_and_respond(...) helper.

# a short greeting like "hi" → "Hello!"), trust the LLM's completed
# state — otherwise stale blocked entries from earlier turns in
# a2a_tracking would suppress the final response.
if not recently_called:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cleanup (altitude): outer if not recently_called guard duplicates an invariant already inside _check_all_agents_blocked

_check_all_agents_blocked already returns (False, None) for an empty set (line 121), so the outer guard is redundant. If that inner behaviour ever changes, this outer guard silently swallows it. Consider removing it and letting _check_all_agents_blocked own the invariant.

# go to a separate "-thought" artifact and aren't part of the
# main response stream the client renders).
if not metadata.get("intermediate_output") and item.content:
streamed_bytes += len(item.content)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cleanup: streamed_bytes counts Unicode code-points, not UTF-8 bytes

len(item.content) returns character count. For CJK or emoji content each character is 3–4 wire bytes. Rename to streamed_chars or use len(item.content.encode()).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants