@@ -49,6 +49,7 @@ pub async fn process_stream(
4949 on_tool_start : impl FnMut ( & str , & str ) ,
5050 on_tool_complete : impl FnMut ( & str , & str , Duration , Option < & str > ) ,
5151 on_usage : impl FnMut ( u64 , u64 ) ,
52+ on_reasoning : impl FnMut ( & str , & str , & BTreeMap < String , serde_json:: Value > ) ,
5253 on_raw_event : impl FnMut ( & str , & str ) ,
5354 on_orchestrator_event : impl FnMut ( & str , & serde_json:: Value ) ,
5455) -> Result < StreamResult > {
@@ -61,6 +62,7 @@ pub async fn process_stream(
6162 on_tool_start,
6263 on_tool_complete,
6364 on_usage,
65+ on_reasoning,
6466 on_raw_event,
6567 on_orchestrator_event,
6668 )
@@ -78,6 +80,7 @@ pub async fn process_stream(
7880/// - `on_tool_start`: called when a tool begins execution with (tool_id, tool_name)
7981/// - `on_tool_complete`: called when a tool finishes with (tool_id, tool_name, duration, result)
8082/// - `on_usage`: called with (prompt_tokens, completion_tokens) from aura.usage events
83+ /// - `on_reasoning`: called for `aura.reasoning` events with (content, agent_id)
8184/// - `on_orchestrator_event`: called for `aura.orchestrator.*`, `aura.session_info`, and `aura.progress` events
8285///
8386/// `tool_id` is the per-call identifier emitted by the server. Callers should
@@ -94,6 +97,7 @@ pub async fn process_sse_events<S, E>(
9497 mut on_tool_start : impl FnMut ( & str , & str ) ,
9598 mut on_tool_complete : impl FnMut ( & str , & str , Duration , Option < & str > ) ,
9699 mut on_usage : impl FnMut ( u64 , u64 ) ,
100+ mut on_reasoning : impl FnMut ( & str , & str , & BTreeMap < String , serde_json:: Value > ) ,
97101 mut on_raw_event : impl FnMut ( & str , & str ) ,
98102 mut on_orchestrator_event : impl FnMut ( & str , & serde_json:: Value ) ,
99103) -> Result < StreamResult >
@@ -213,9 +217,34 @@ where
213217 "aura.tool_usage" => {
214218 // Silently skip — usage is tracked via aura.usage at stream end
215219 }
220+ "aura.reasoning" => {
221+ // Parse the raw payload into a BTreeMap so the consumer
222+ // can render every wire-level field (agent_id, content,
223+ // parent_agent_id, session_id, trace_id, ...) in /expand
224+ // without us having to enumerate them here. Separately
225+ // extract `content` and `agent_id` as named args because
226+ // every consumer wants those for the live block header
227+ // and body.
228+ if let Ok ( serde_json:: Value :: Object ( map) ) =
229+ serde_json:: from_str :: < serde_json:: Value > ( & event. data )
230+ {
231+ let content = map
232+ . get ( "content" )
233+ . and_then ( |v| v. as_str ( ) )
234+ . unwrap_or ( "" )
235+ . to_string ( ) ;
236+ let agent_id = map
237+ . get ( "agent_id" )
238+ . and_then ( |v| v. as_str ( ) )
239+ . unwrap_or ( "" )
240+ . to_string ( ) ;
241+ let fields: BTreeMap < String , serde_json:: Value > = map. into_iter ( ) . collect ( ) ;
242+ on_reasoning ( & content, & agent_id, & fields) ;
243+ }
244+ }
216245 _ => {
217246 // aura.orchestrator.*, aura.session_info, aura.progress,
218- // aura.reasoning, aura. worker_phase, and any future events
247+ // aura.worker_phase, and any future events
219248 if let Ok ( val) = serde_json:: from_str :: < serde_json:: Value > ( & event. data ) {
220249 on_orchestrator_event ( event_name, & val) ;
221250 }
@@ -362,6 +391,7 @@ mod tests {
362391 tools_started : Vec < ( String , String ) > ,
363392 tools_completed : Vec < ( String , String , Option < String > ) > ,
364393 usages : Vec < ( u64 , u64 ) > ,
394+ reasoning : Vec < ( String , String ) > ,
365395 raw_events : Vec < ( String , String ) > ,
366396 orchestrator_events : Vec < ( String , serde_json:: Value ) > ,
367397 }
@@ -384,6 +414,7 @@ mod tests {
384414 let tools_started = & mut caps. tools_started ;
385415 let tools_completed = & mut caps. tools_completed ;
386416 let usages = & mut caps. usages ;
417+ let reasoning = & mut caps. reasoning ;
387418 let raw_events = & mut caps. raw_events ;
388419 let orchestrator_events = & mut caps. orchestrator_events ;
389420
@@ -403,6 +434,9 @@ mod tests {
403434 ) )
404435 } ,
405436 |p, c| usages. push ( ( p, c) ) ,
437+ |content, agent_id, _fields| {
438+ reasoning. push ( ( content. to_string ( ) , agent_id. to_string ( ) ) )
439+ } ,
406440 |name, data| raw_events. push ( ( name. to_string ( ) , data. to_string ( ) ) ) ,
407441 |name, val| orchestrator_events. push ( ( name. to_string ( ) , val. clone ( ) ) ) ,
408442 )
@@ -735,6 +769,65 @@ mod tests {
735769 assert_eq ! ( caps. orchestrator_events[ 0 ] . 0 , "aura.session_info" ) ;
736770 }
737771
772+ #[ tokio:: test]
773+ async fn aura_reasoning_callback_fires ( ) {
774+ let data = serde_json:: json!( {
775+ "content" : "Let me think about this problem step by step." ,
776+ "agent_id" : "main" ,
777+ "session_id" : "s1"
778+ } ) ;
779+ let events = vec ! [ sse( "aura.reasoning" , & data. to_string( ) ) , sse( "" , "[DONE]" ) ] ;
780+ let ( _, caps) = run_stream ( events) . await ;
781+ assert_eq ! ( caps. reasoning. len( ) , 1 ) ;
782+ assert_eq ! (
783+ caps. reasoning[ 0 ] . 0 ,
784+ "Let me think about this problem step by step."
785+ ) ;
786+ assert_eq ! ( caps. reasoning[ 0 ] . 1 , "main" ) ;
787+ // Reasoning should NOT have been routed to the orchestrator fallback
788+ assert ! ( caps. orchestrator_events. is_empty( ) ) ;
789+ // But it IS a named event, so raw_event should have captured it
790+ assert ! ( caps. raw_events. iter( ) . any( |( n, _) | n == "aura.reasoning" ) ) ;
791+ }
792+
793+ #[ tokio:: test]
794+ async fn aura_reasoning_callback_carries_worker_agent_id ( ) {
795+ // In orchestration mode the server emits aura.reasoning with the
796+ // worker's agent_id so the CLI can attribute reasoning to a worker.
797+ let data = serde_json:: json!( {
798+ "content" : "Analyzing the logs." ,
799+ "agent_id" : "log_worker" ,
800+ "parent_agent_id" : "coordinator" ,
801+ "session_id" : "s1"
802+ } ) ;
803+ let events = vec ! [ sse( "aura.reasoning" , & data. to_string( ) ) , sse( "" , "[DONE]" ) ] ;
804+ let ( _, caps) = run_stream ( events) . await ;
805+ assert_eq ! ( caps. reasoning. len( ) , 1 ) ;
806+ assert_eq ! ( caps. reasoning[ 0 ] . 1 , "log_worker" ) ;
807+ }
808+
809+ #[ tokio:: test]
810+ async fn aura_reasoning_chunks_accumulate ( ) {
811+ let chunk = |c : & str | {
812+ serde_json:: json!( {
813+ "content" : c,
814+ "agent_id" : "main" ,
815+ "session_id" : "s1"
816+ } )
817+ . to_string ( )
818+ } ;
819+ let events = vec ! [
820+ sse( "aura.reasoning" , & chunk( "First, " ) ) ,
821+ sse( "aura.reasoning" , & chunk( "I'll check " ) ) ,
822+ sse( "aura.reasoning" , & chunk( "the inputs." ) ) ,
823+ sse( "" , "[DONE]" ) ,
824+ ] ;
825+ let ( _, caps) = run_stream ( events) . await ;
826+ assert_eq ! ( caps. reasoning. len( ) , 3 ) ;
827+ let joined: String = caps. reasoning . iter ( ) . map ( |( c, _) | c. as_str ( ) ) . collect ( ) ;
828+ assert_eq ! ( joined, "First, I'll check the inputs." ) ;
829+ }
830+
738831 #[ tokio:: test]
739832 async fn aura_progress_routed_to_orchestrator ( ) {
740833 let data = serde_json:: json!( { "message" : "Discovering tools" } ) ;
@@ -802,6 +895,7 @@ mod tests {
802895 |_, _| { } ,
803896 |_, _, _, _| { } ,
804897 |_, _| { } ,
898+ |_, _, _| { } ,
805899 |_, _| { } ,
806900 |_, _| { } ,
807901 )
0 commit comments