22
33Manages the complete lifecycle of streaming agent responses:
44- Stream/call agent with query
5- - Process events through parsers (event parser, harmony processor )
6- - Render output via ResponseRenderer
5+ - Let agent library handle natural output (no interception )
6+ - Silently collect text for session history
77- Track metrics (tokens, duration, cycles, tools)
88- Save to session state
99- Play audio notifications
1212"""
1313
1414import asyncio
15- import io
1615import logging
17- import sys
1816import time
1917from datetime import datetime
2018from typing import TYPE_CHECKING , Any , Optional
2624
2725if TYPE_CHECKING :
2826 from .audio_notifier import AudioNotifier
29- from .harmony_processor import HarmonyProcessor
3027 from .session_state import SessionState
3128 from .status_bar import StatusBar
3229 from .ui_components import Colors
@@ -51,15 +48,14 @@ def _serialize_for_logging(obj: Any) -> str:
5148
5249
5350class ResponseStreamer :
54- """Handles streaming agent responses and processing output .
51+ """Handles streaming agent responses with minimal intervention .
5552
5653 Coordinates multiple components to:
57- - Stream responses from agent
58- - Parse streaming events
59- - Process through Harmony (if enabled)
60- - Render to terminal
54+ - Stream responses from agent (letting library handle output naturally)
55+ - Silently collect text for session history
6156 - Track tokens and metrics
6257 - Save conversation history
58+ - Display UI elements (thinking, agent header, stats)
6359 """
6460
6561 def __init__ (
@@ -76,16 +72,14 @@ def __init__(
7672 show_thinking : bool = True ,
7773 show_duration : bool = True ,
7874 show_tokens : bool = True ,
79- harmony_processor : Optional ["HarmonyProcessor" ] = None ,
8075 status_bar : Optional ["StatusBar" ] = None ,
81- suppress_agent_stdout : bool = True ,
8276 ):
8377 """Initialize the response streamer.
8478
8579 Args:
8680 agent: The agent instance to query
8781 agent_name: Name of the agent for display
88- response_renderer: Renderer for displaying responses
82+ response_renderer: Renderer for UI elements (agent header)
8983 event_parser: Parser for streaming events
9084 session_state: Session state for tracking conversation
9185 usage_extractor: Extractor for token usage from responses
@@ -95,10 +89,7 @@ def __init__(
9589 show_thinking: Whether to show thinking indicator
9690 show_duration: Whether to show query duration
9791 show_tokens: Whether to show token usage
98- harmony_processor: Optional Harmony processor for OpenAI format
9992 status_bar: Optional status bar for real-time updates
100- suppress_agent_stdout: Whether to suppress agent library stdout
101- during streaming
10293 """
10394 self .agent = agent
10495 self .agent_name = agent_name
@@ -112,9 +103,7 @@ def __init__(
112103 self .show_thinking = show_thinking
113104 self .show_duration = show_duration
114105 self .show_tokens = show_tokens
115- self .harmony_processor = harmony_processor
116106 self .status_bar = status_bar
117- self .suppress_agent_stdout = suppress_agent_stdout
118107
119108 # Token tracking for session (matches chat_loop.py behavior)
120109 self .total_input_tokens = 0
@@ -125,7 +114,6 @@ def __init__(
125114 logger .debug (f" show_thinking: { show_thinking } " )
126115 logger .debug (f" show_duration: { show_duration } " )
127116 logger .debug (f" show_tokens: { show_tokens } " )
128- logger .debug (f" harmony: { harmony_processor is not None } " )
129117 logger .debug (f" status_bar: { status_bar is not None } " )
130118
131119 async def _show_thinking_indicator (self , stop_event : asyncio .Event ) -> None :
@@ -189,54 +177,28 @@ async def stream_agent_response(
189177
190178 # Check if agent supports streaming
191179 if hasattr (self .agent , "stream_async" ):
192- # WORKAROUND: Suppress stdout during streaming to prevent
193- # agent libraries from printing accumulated response text as
194- # a side effect (discovered in beta.8 diagnostics - text
195- # appears between event loop iterations)
196- # NOTE: Suppression is only active BETWEEN iterations (during
197- # yield back to stream_async). During our event processing,
198- # stdout is restored so tool calls and logging work normally.
199- old_stdout = None
200- if self .suppress_agent_stdout :
201- old_stdout = sys .stdout
202- sys .stdout = io .StringIO ()
203-
204- try :
205- async for event in self .agent .stream_async (query ):
206- # Restore stdout for our controlled output and logging
207- if self .suppress_agent_stdout :
208- sys .stdout = old_stdout
209-
210- # Store last event for token extraction
211- response_obj = event
212-
213- # Log streaming event received from agent
214- logger .debug ("STREAMING EVENT FROM AGENT:" )
215- logger .debug (_serialize_for_logging (event ))
216-
217- # Stop thinking indicator on first token
218- if not first_token_received :
219- stop_thinking .set ()
220- if thinking_task :
221- await thinking_task
222- first_token_received = True
223-
224- # Extract text from streaming event using event parser
225- text_to_add = self .event_parser .parse_event (event )
226-
227- # Append text if found and display it
228- if text_to_add :
229- response_text .append (text_to_add )
230- # Display streaming text (renderer handles skip logic)
231- self .response_renderer .render_streaming_text (text_to_add )
232-
233- # Suppress stdout again before yielding back to stream_async
234- if self .suppress_agent_stdout :
235- sys .stdout = io .StringIO ()
236- finally :
237- # Always restore stdout
238- if self .suppress_agent_stdout and old_stdout is not None :
239- sys .stdout = old_stdout
180+ # Let agent library handle all output naturally
181+ # We just collect text silently for session history
182+ async for event in self .agent .stream_async (query ):
183+ # Store last event for token extraction
184+ response_obj = event
185+
186+ # Log streaming event received from agent
187+ logger .debug ("STREAMING EVENT FROM AGENT:" )
188+ logger .debug (_serialize_for_logging (event ))
189+
190+ # Stop thinking indicator on first token
191+ if not first_token_received :
192+ stop_thinking .set ()
193+ if thinking_task :
194+ await thinking_task
195+ first_token_received = True
196+
197+ # Extract text from streaming event for session history
198+ # (don't display - agent library handles that)
199+ text_to_add = self .event_parser .parse_event (event )
200+ if text_to_add :
201+ response_text .append (text_to_add )
240202 else :
241203 # Fallback to non-streaming call if streaming not supported
242204 response = await asyncio .get_event_loop ().run_in_executor (
@@ -254,7 +216,7 @@ async def stream_agent_response(
254216 if thinking_task :
255217 await thinking_task
256218
257- # Format and display response
219+ # Extract text from response for history
258220 if hasattr (response , "message" ):
259221 message = response .message
260222 if isinstance (message , dict ) and "content" in message :
@@ -272,76 +234,20 @@ async def stream_agent_response(
272234 else :
273235 response_text .append (str (response ))
274236
237+ # For non-streaming, the agent doesn't print, so we print the response ourselves
238+ print ("" .join (response_text ))
239+
275240 # Log final response object (for streaming, this is the last event)
276241 if hasattr (self .agent , "stream_async" ):
277242 logger .debug ("FINAL RESPONSE OBJECT (last streaming event):" )
278243 logger .debug (_serialize_for_logging (response_obj ))
279244 logger .debug ("=" * 60 )
280245
281- # Render collected response
282- # Concatenate streaming chunks directly (they may break mid-word)
246+ # Collect full response text for session history
283247 full_response = "" .join (response_text )
284248
285- # Track if we already printed during streaming (to prevent duplicates)
286- # Use renderer's method to determine if streaming was skipped
287- # (skipped means we need to print in final response)
288- already_printed_streaming = (
289- first_token_received
290- and not self .response_renderer .should_skip_streaming_display ()
291- )
292-
293- # Process through Harmony if available
294- display_text = full_response
295- if self .harmony_processor :
296- # Debug: Log response object structure
297- # (safely handle mocks/test objects)
298- try :
299- logger .debug (f"Response object type: { type (response_obj )} " )
300- logger .debug (f"Response object attrs: { dir (response_obj )[:20 ]} " )
301- if response_obj and hasattr (response_obj , "choices" ):
302- try :
303- logger .debug (
304- f"Response has choices: { len (response_obj .choices )} "
305- )
306- except TypeError :
307- logger .debug (
308- "Response has choices attribute (non-sequence)"
309- )
310-
311- if response_obj .choices :
312- choice = response_obj .choices [0 ]
313- logger .debug (f"Choice type: { type (choice )} " )
314- logger .debug (f"Choice attrs: { dir (choice )[:20 ]} " )
315- if hasattr (choice , "logprobs" ):
316- logger .debug (
317- f"Has logprobs: { choice .logprobs is not None } "
318- )
319- if hasattr (choice , "message" ):
320- logger .debug (f"Message type: { type (choice .message )} " )
321- except Exception as e :
322- logger .debug (
323- f"Error logging response structure (safe to ignore): { e } "
324- )
325-
326- processed = self .harmony_processor .process_response (
327- full_response , metadata = response_obj
328- )
329- display_text = self .harmony_processor .format_for_display (processed )
330-
331- # Log if Harmony-specific features detected
332- if processed .get ("has_reasoning" ):
333- logger .debug ("Harmony response contains reasoning" )
334- if processed .get ("has_tools" ):
335- logger .debug ("Harmony response contains tool calls" )
336-
337- # Store last response for copy commands (what user sees)
338- self .session_state .update_last_response (display_text )
339-
340- # Render final response (only if not already printed during streaming)
341- if not already_printed_streaming :
342- self .response_renderer .render_final_response (
343- display_text = display_text , first_token_received = first_token_received
344- )
249+ # Store last response for copy commands
250+ self .session_state .update_last_response (full_response )
345251
346252 duration = time .time () - start_time
347253
@@ -433,7 +339,7 @@ async def stream_agent_response(
433339 md_entry = [
434340 f"\n ## Query { query_num } ({ entry_timestamp } )\n " ,
435341 f"**You:** { query } \n \n " ,
436- f"**{ self .agent_name } :** { display_text } \n \n " ,
342+ f"**{ self .agent_name } :** { full_response } \n \n " ,
437343 ]
438344
439345 # Add metadata
0 commit comments