From bf91d5e4ddb183f562e4a8af3b7b1cfa127b8ff1 Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Thu, 14 May 2026 03:53:43 +0000 Subject: [PATCH] Fix SSE stream corruption by buffering raw bytes Previously, the AI provider streams (OpenAI, Anthropic, Gemini) converted individual network chunks directly to strings using `String::from_utf8_lossy`. This caused silent corruption when multi-byte UTF-8 characters or JSON lines were split across network boundaries, dropping text or causing deserialization failures. This commit fixes the issue by buffering raw `Vec` bytes and splitting on newlines (`\n`) before converting to UTF-8 and processing, ensuring payloads are always complete and intact. Co-authored-by: insign <1113045+insign@users.noreply.github.com> --- src/providers/anthropic.rs | 15 +++++++++++++-- src/providers/gemini.rs | 32 +++++++++++++++++++------------- src/providers/openai.rs | 19 +++++++++++++++---- 3 files changed, 47 insertions(+), 19 deletions(-) diff --git a/src/providers/anthropic.rs b/src/providers/anthropic.rs index 2ceb98a..1ced5a8 100644 --- a/src/providers/anthropic.rs +++ b/src/providers/anthropic.rs @@ -275,12 +275,23 @@ impl Provider for AnthropicProvider { } let mut stream = response.bytes_stream(); + let mut raw_buf: Vec = Vec::new(); while let Some(chunk) = stream.next().await { let chunk = chunk?; - let text = String::from_utf8_lossy(&chunk); + raw_buf.extend_from_slice(&chunk); + + while let Some(newline_pos) = raw_buf.iter().position(|&b| b == b'\n') { + let line_bytes = raw_buf.drain(..=newline_pos).collect::>(); + let line = match std::str::from_utf8(&line_bytes) { + Ok(s) => s.trim().to_string(), + Err(_) => continue, + }; + + if line.is_empty() { + continue; + } - for line in text.lines() { if let Some(data) = line.strip_prefix("data: ") { if let Ok(event) = serde_json::from_str::(data) { if event.event_type == "content_block_delta" { diff --git a/src/providers/gemini.rs b/src/providers/gemini.rs index 5a09eff..a047a26 100644 --- a/src/providers/gemini.rs +++ b/src/providers/gemini.rs @@ -320,16 +320,19 @@ impl Provider for GeminiProvider { } let mut stream = response.bytes_stream(); - let mut buffer = String::new(); + let mut raw_buf: Vec = Vec::new(); while let Some(chunk) = stream.next().await { let chunk = chunk?; - buffer.push_str(&String::from_utf8_lossy(&chunk)); + raw_buf.extend_from_slice(&chunk); // Process complete lines from buffer - while let Some(newline_pos) = buffer.find('\n') { - let line = buffer[..newline_pos].trim().to_string(); - buffer = buffer[newline_pos + 1..].to_string(); + while let Some(newline_pos) = raw_buf.iter().position(|&b| b == b'\n') { + let line_bytes = raw_buf.drain(..=newline_pos).collect::>(); + let line = match std::str::from_utf8(&line_bytes) { + Ok(s) => s.trim().to_string(), + Err(_) => continue, + }; if line.is_empty() { continue; @@ -352,14 +355,17 @@ impl Provider for GeminiProvider { } // Process any remaining data in buffer after stream ends - if !buffer.trim().is_empty() { - if let Some(data) = buffer.trim().strip_prefix("data: ") { - if let Ok(response) = serde_json::from_str::(data) { - if let Some(candidates) = response.candidates { - for candidate in candidates { - for part in candidate.content.parts { - if let Some(text) = part.text { - callback(&text); + if !raw_buf.is_empty() { + if let Ok(line) = std::str::from_utf8(&raw_buf) { + let line = line.trim(); + if let Some(data) = line.strip_prefix("data: ") { + if let Ok(response) = serde_json::from_str::(data) { + if let Some(candidates) = response.candidates { + for candidate in candidates { + for part in candidate.content.parts { + if let Some(text) = part.text { + callback(&text); + } } } } diff --git a/src/providers/openai.rs b/src/providers/openai.rs index 3230a58..1685896 100644 --- a/src/providers/openai.rs +++ b/src/providers/openai.rs @@ -334,15 +334,26 @@ impl Provider for OpenAIProvider { } let mut stream = response.bytes_stream(); + let mut raw_buf: Vec = Vec::new(); - while let Some(chunk) = stream.next().await { + 'outer: while let Some(chunk) = stream.next().await { let chunk = chunk?; - let text = String::from_utf8_lossy(&chunk); + raw_buf.extend_from_slice(&chunk); + + while let Some(newline_pos) = raw_buf.iter().position(|&b| b == b'\n') { + let line_bytes = raw_buf.drain(..=newline_pos).collect::>(); + let line = match std::str::from_utf8(&line_bytes) { + Ok(s) => s.trim().to_string(), + Err(_) => continue, + }; + + if line.is_empty() { + continue; + } - for line in text.lines() { if let Some(data) = line.strip_prefix("data: ") { if data == "[DONE]" { - break; + break 'outer; } if let Ok(response) = serde_json::from_str::(data) {