fix(llm): preserve fragmented agent tool calls#10988
Conversation
Signed-off-by: Matej Kosec <mkosec@nvidia.com>
Signed-off-by: Matej Kosec <mkosec@nvidia.com>
WalkthroughAdds buffered tool-call handling in Anthropic and OpenAI stream converters, and adds shared HTTP replay harnesses, scripted engine support, and replay tests/fixtures for Anthropic messages and OpenAI Responses compatibility. ChangesTool-call streaming and replay coverage
Estimated code review effort🎯 4 (Complex) | ⏱️ ~75 minutes 🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
lib/llm/src/protocols/openai/responses/stream_converter.rs (1)
306-420: 🗄️ Data Integrity & Integration | 🟠 Major | ⚡ Quick winNormalize legacy
delta.function_callchunks before closing onFunctionCall.Line 418 handles
FinishReason::FunctionCall, but Lines 306-416 only recorddelta.tool_calls. A backend that streams the legacyfunction_calldelta shape will reach the finish reason with noFunctionCallState, so the Responses stream drops the call instead of emittingresponse.output_item.added/argument/done events. Please routedelta.function_callthrough the same buffering path, likely as index0with a generated call id when the legacy shape has none. Based on PR objectives, legacy function-call finish behavior is part of this change.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@lib/llm/src/protocols/openai/responses/stream_converter.rs` around lines 306 - 420, The stream conversion in ResponseStreamConverter currently buffers only delta.tool_calls, so legacy delta.function_call chunks are lost before FinishReason::FunctionCall closes the item. Update the buffering logic in the tool-call handling path to also normalize delta.function_call into the same FunctionCallState flow, likely using index 0 and generating a call_id when missing, so the existing response.output_item.added and argument/done event emission still occurs for legacy callers.
🧹 Nitpick comments (2)
lib/llm/src/protocols/anthropic/stream_converter.rs (1)
98-124: 📐 Maintainability & Code Quality | 🔵 Trivial | 💤 Low value
record_tool_callbuffering is correct;startedis effectively redundant.
startedis set wheneverid/name/argumentsare present, butis_emit_ready()already gates on non-emptyidandname, so a state can never beis_emit_ready()withoutstarted == true. Harmless, but the flag adds no signal beyond the id/name checks.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@lib/llm/src/protocols/anthropic/stream_converter.rs` around lines 98 - 124, The `record_tool_call` state machine in `stream_converter.rs` uses `started` redundantly, since `is_emit_ready()` already depends on the presence of `id` and `name`. Update `ToolCallState` handling and `record_tool_call` so the readiness logic relies on the actual fields (`id`, `name`, and `argument_fragments`) instead of maintaining `started`, and remove any remaining writes/checks tied to that flag.lib/llm/tests/data/replays/agent-harness/README.md (1)
3-5: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick winAdd the public issue reference for
DYN-2764if one exists.External readers cannot resolve the Linear ID on its own. Please add the matching GitHub issue reference/link next to
DYN-2764(or replace it if the Linear ID is no longer needed). As per coding guidelines, Markdown docs should "prefer to also include the matching GitHub link when one exists".🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@lib/llm/tests/data/replays/agent-harness/README.md` around lines 3 - 5, The README for the agent-harness replay fixtures references DYN-2764 but does not include a resolvable public issue link. Update the text in this document to add the matching GitHub issue reference/link next to the DYN-2764 mention, or replace the Linear ID if it is no longer needed, so external readers can follow it; keep the existing mention of PR `#8284` intact.Source: Coding guidelines
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@lib/llm/tests/anthropic_http_replay.rs`:
- Around line 219-239: The test in `anthropic_http_replay.rs` is asserting the
wrong streaming order by requiring `message_delta` to be absent before
`gate.release()`. Update the `saw_message_delta` expectation in the
`tokio::time::timeout` loop around the `parser.push` handling so it allows a
finish-driven `message_delta` to appear before the gated usage tail, while still
verifying `content_block_stop` is observed before releasing the gate.
---
Outside diff comments:
In `@lib/llm/src/protocols/openai/responses/stream_converter.rs`:
- Around line 306-420: The stream conversion in ResponseStreamConverter
currently buffers only delta.tool_calls, so legacy delta.function_call chunks
are lost before FinishReason::FunctionCall closes the item. Update the buffering
logic in the tool-call handling path to also normalize delta.function_call into
the same FunctionCallState flow, likely using index 0 and generating a call_id
when missing, so the existing response.output_item.added and argument/done event
emission still occurs for legacy callers.
---
Nitpick comments:
In `@lib/llm/src/protocols/anthropic/stream_converter.rs`:
- Around line 98-124: The `record_tool_call` state machine in
`stream_converter.rs` uses `started` redundantly, since `is_emit_ready()`
already depends on the presence of `id` and `name`. Update `ToolCallState`
handling and `record_tool_call` so the readiness logic relies on the actual
fields (`id`, `name`, and `argument_fragments`) instead of maintaining
`started`, and remove any remaining writes/checks tied to that flag.
In `@lib/llm/tests/data/replays/agent-harness/README.md`:
- Around line 3-5: The README for the agent-harness replay fixtures references
DYN-2764 but does not include a resolvable public issue link. Update the text in
this document to add the matching GitHub issue reference/link next to the
DYN-2764 mention, or replace the Linear ID if it is no longer needed, so
external readers can follow it; keep the existing mention of PR `#8284` intact.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Enterprise
Run ID: 6f33e136-09ad-4cbd-95a4-7484d0d2d6e9
⛔ Files ignored due to path filters (4)
lib/llm/tests/snapshots/anthropic_http_replay__anthropic_streaming_text.snapis excluded by!**/*.snaplib/llm/tests/snapshots/anthropic_http_replay__anthropic_unary_text.snapis excluded by!**/*.snaplib/llm/tests/snapshots/responses_http_replay__responses_streaming_text.snapis excluded by!**/*.snaplib/llm/tests/snapshots/responses_http_replay__responses_unary_text.snapis excluded by!**/*.snap
📒 Files selected for processing (12)
lib/llm/src/protocols/anthropic/stream_converter.rslib/llm/src/protocols/openai/chat_completions/tool_parser_v2.rslib/llm/src/protocols/openai/responses/stream_converter.rslib/llm/tests/anthropic_http_replay.rslib/llm/tests/common/http_harness.rslib/llm/tests/common/scripted_chat_engine.rslib/llm/tests/data/replays/agent-harness/README.mdlib/llm/tests/data/replays/agent-harness/fragmented-tool.sselib/llm/tests/data/replays/agent-harness/parallel-tools.sselib/llm/tests/data/replays/agent-harness/text.sselib/llm/tests/data/replays/agent-harness/thinking-tool.sselib/llm/tests/responses_http_replay.rs
| let mut saw_tool_stop = false; | ||
| let mut saw_message_delta = false; | ||
|
|
||
| tokio::time::timeout(Duration::from_secs(2), async { | ||
| while !saw_tool_stop { | ||
| let bytes = body | ||
| .next() | ||
| .await | ||
| .expect("response ended before tool block completion") | ||
| .expect("failed to read response SSE bytes"); | ||
| for event in parser.push(&bytes).expect("failed to parse response SSE") { | ||
| saw_tool_stop |= event == "content_block_stop"; | ||
| saw_message_delta |= event == "message_delta"; | ||
| } | ||
| } | ||
| }) | ||
| .await | ||
| .expect("tool block completion did not arrive before the gated usage tail"); | ||
|
|
||
| assert!(!saw_message_delta); | ||
| gate.release(); |
There was a problem hiding this comment.
🎯 Functional Correctness | 🟠 Major | ⚡ Quick win
Don't assert that message_delta is delayed behind the usage tail.
Because the gate starts at the first usage chunk, a finish-driven message_delta should already be observable before gate.release(). Line 238 currently locks in the opposite contract and would fail once the stream publishes the stop reason immediately, which is the behavior described in this PR.
Suggested fix
let mut body = response.bytes_stream();
let mut parser = IncrementalSseParser::default();
let mut saw_tool_stop = false;
let mut saw_message_delta = false;
tokio::time::timeout(Duration::from_secs(2), async {
- while !saw_tool_stop {
+ while !saw_tool_stop || !saw_message_delta {
let bytes = body
.next()
.await
.expect("response ended before tool block completion")
.expect("failed to read response SSE bytes");
for event in parser.push(&bytes).expect("failed to parse response SSE") {
saw_tool_stop |= event == "content_block_stop";
saw_message_delta |= event == "message_delta";
}
}
})
.await
- .expect("tool block completion did not arrive before the gated usage tail");
+ .expect("tool completion did not arrive before the gated usage tail");
- assert!(!saw_message_delta);
+ assert!(saw_message_delta);📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| let mut saw_tool_stop = false; | |
| let mut saw_message_delta = false; | |
| tokio::time::timeout(Duration::from_secs(2), async { | |
| while !saw_tool_stop { | |
| let bytes = body | |
| .next() | |
| .await | |
| .expect("response ended before tool block completion") | |
| .expect("failed to read response SSE bytes"); | |
| for event in parser.push(&bytes).expect("failed to parse response SSE") { | |
| saw_tool_stop |= event == "content_block_stop"; | |
| saw_message_delta |= event == "message_delta"; | |
| } | |
| } | |
| }) | |
| .await | |
| .expect("tool block completion did not arrive before the gated usage tail"); | |
| assert!(!saw_message_delta); | |
| gate.release(); | |
| let mut saw_tool_stop = false; | |
| let mut saw_message_delta = false; | |
| tokio::time::timeout(Duration::from_secs(2), async { | |
| while !saw_tool_stop || !saw_message_delta { | |
| let bytes = body | |
| .next() | |
| .await | |
| .expect("response ended before tool block completion") | |
| .expect("failed to read response SSE bytes"); | |
| for event in parser.push(&bytes).expect("failed to parse response SSE") { | |
| saw_tool_stop |= event == "content_block_stop"; | |
| saw_message_delta |= event == "message_delta"; | |
| } | |
| } | |
| }) | |
| .await | |
| .expect("tool completion did not arrive before the gated usage tail"); | |
| assert!(saw_message_delta); | |
| gate.release(); |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@lib/llm/tests/anthropic_http_replay.rs` around lines 219 - 239, The test in
`anthropic_http_replay.rs` is asserting the wrong streaming order by requiring
`message_delta` to be absent before `gate.release()`. Update the
`saw_message_delta` expectation in the `tokio::time::timeout` loop around the
`parser.push` handling so it allows a finish-driven `message_delta` to appear
before the gated usage tail, while still verifying `content_block_stop` is
observed before releasing the gate.
Summary
tool_callsor legacyfunction_callfinish reason, then publish completion events immediately; retain end-of-stream completion only for backends that omit the finish reason.The replay fixtures are minimized reconstructions of recorded event ordering rather than backend-specific captures. The harness uses the production HTTP handlers and converters without a model, GPU, Docker, timing delay, or new dependency.
Validation
Run on
computelab-build:cargo fmt --all -- --checkcargo clippy -p dynamo-llm --tests -- -D warningscargo test -p dynamo-llm --lib stream_converter::tests— 29 passedINSTA_UPDATE=no cargo test -p dynamo-llm --test anthropic_http_replay --test responses_http_replay— 13 passedReviewer guidance
The finish-reason chunk is the first explicit indication that all tool-call fragments are complete. Usage-only chunks may follow it, so tool completion is emitted on the finish reason while the usage-bearing terminal response event remains at end of stream.
This supersedes #8284 and #10985.
Summary by CodeRabbit
New Features
Bug Fixes
Documentation