Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions src/providers/anthropic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,12 +275,23 @@ impl Provider for AnthropicProvider {
}

let mut stream = response.bytes_stream();
let mut raw_buf: Vec<u8> = Vec::new();

while let Some(chunk) = stream.next().await {
let chunk = chunk?;
let text = String::from_utf8_lossy(&chunk);
raw_buf.extend_from_slice(&chunk);

while let Some(newline_pos) = raw_buf.iter().position(|&b| b == b'\n') {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Flush trailing Anthropic SSE bytes at EOF

Like the OpenAI path, this loop now parses only newline-terminated records and never drains raw_buf after the stream ends. If the last Anthropic data: event arrives without a terminating newline, it remains buffered and is never deserialized, which drops the final text chunk. This behavior is newly introduced by the raw-byte buffering change and can truncate outputs in EOF-terminated streams.

Useful? React with 👍 / 👎.

let line_bytes = raw_buf.drain(..=newline_pos).collect::<Vec<u8>>();
let line = match std::str::from_utf8(&line_bytes) {
Ok(s) => s.trim().to_string(),
Err(_) => continue,
};

if line.is_empty() {
continue;
}

for line in text.lines() {
if let Some(data) = line.strip_prefix("data: ") {
if let Ok(event) = serde_json::from_str::<AnthropicStreamEvent>(data) {
if event.event_type == "content_block_delta" {
Expand Down
32 changes: 19 additions & 13 deletions src/providers/gemini.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,16 +320,19 @@ impl Provider for GeminiProvider {
}

let mut stream = response.bytes_stream();
let mut buffer = String::new();
let mut raw_buf: Vec<u8> = Vec::new();

while let Some(chunk) = stream.next().await {
let chunk = chunk?;
buffer.push_str(&String::from_utf8_lossy(&chunk));
raw_buf.extend_from_slice(&chunk);

// Process complete lines from buffer
while let Some(newline_pos) = buffer.find('\n') {
let line = buffer[..newline_pos].trim().to_string();
buffer = buffer[newline_pos + 1..].to_string();
while let Some(newline_pos) = raw_buf.iter().position(|&b| b == b'\n') {
let line_bytes = raw_buf.drain(..=newline_pos).collect::<Vec<u8>>();
let line = match std::str::from_utf8(&line_bytes) {
Ok(s) => s.trim().to_string(),
Err(_) => continue,
};

if line.is_empty() {
continue;
Expand All @@ -352,14 +355,17 @@ impl Provider for GeminiProvider {
}

// Process any remaining data in buffer after stream ends
if !buffer.trim().is_empty() {
if let Some(data) = buffer.trim().strip_prefix("data: ") {
if let Ok(response) = serde_json::from_str::<GeminiStreamResponse>(data) {
if let Some(candidates) = response.candidates {
for candidate in candidates {
for part in candidate.content.parts {
if let Some(text) = part.text {
callback(&text);
if !raw_buf.is_empty() {
if let Ok(line) = std::str::from_utf8(&raw_buf) {
let line = line.trim();
if let Some(data) = line.strip_prefix("data: ") {
if let Ok(response) = serde_json::from_str::<GeminiStreamResponse>(data) {
if let Some(candidates) = response.candidates {
for candidate in candidates {
for part in candidate.content.parts {
if let Some(text) = part.text {
callback(&text);
}
}
}
}
Expand Down
19 changes: 15 additions & 4 deletions src/providers/openai.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,15 +334,26 @@ impl Provider for OpenAIProvider {
}

let mut stream = response.bytes_stream();
let mut raw_buf: Vec<u8> = Vec::new();

while let Some(chunk) = stream.next().await {
'outer: while let Some(chunk) = stream.next().await {
let chunk = chunk?;
let text = String::from_utf8_lossy(&chunk);
raw_buf.extend_from_slice(&chunk);

while let Some(newline_pos) = raw_buf.iter().position(|&b| b == b'\n') {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Flush trailing OpenAI SSE bytes at EOF

The new parser in stream_with_options only processes frames when raw_buf contains a \n, so a final data: {...} line without a trailing newline is dropped when the HTTP stream closes. This can happen with EOF-terminated last lines (or intermediaries that trim the final newline), causing the last streamed tokens to be silently lost. The previous text.lines() behavior would still yield an unterminated final line, so this is a regression introduced by the newline-gated loop.

Useful? React with 👍 / 👎.

let line_bytes = raw_buf.drain(..=newline_pos).collect::<Vec<u8>>();
let line = match std::str::from_utf8(&line_bytes) {
Ok(s) => s.trim().to_string(),
Err(_) => continue,
};

if line.is_empty() {
continue;
}

for line in text.lines() {
if let Some(data) = line.strip_prefix("data: ") {
if data == "[DONE]" {
break;
break 'outer;
}

if let Ok(response) = serde_json::from_str::<OpenAIResponse>(data) {
Expand Down
Loading