@@ -26,6 +26,8 @@ pub(crate) struct SessionTranslationState {
2626 agent_message_item_id : Option < String > ,
2727 /// Stable item ID for the current reasoning stream.
2828 reasoning_item_id : Option < String > ,
29+ /// OpenCode part ID for the current reasoning part (used to route `message.part.delta` events).
30+ reasoning_part_id : Option < String > ,
2931 /// Stable item ID for contiguous user-message chunk streams.
3032 pub ( crate ) user_message_item_id : Option < String > ,
3133 /// Buffered text for contiguous user-message chunk streams.
@@ -41,6 +43,7 @@ impl SessionTranslationState {
4143 tool_call_items : HashMap :: new ( ) ,
4244 agent_message_item_id : None ,
4345 reasoning_item_id : None ,
46+ reasoning_part_id : None ,
4447 user_message_item_id : None ,
4548 user_message_text : String :: new ( ) ,
4649 }
@@ -58,6 +61,7 @@ impl SessionTranslationState {
5861 self . tool_call_items . clear ( ) ;
5962 self . agent_message_item_id = None ;
6063 self . reasoning_item_id = None ;
64+ self . reasoning_part_id = None ;
6165 self . user_message_item_id = None ;
6266 self . user_message_text . clear ( ) ;
6367 }
@@ -69,6 +73,7 @@ impl SessionTranslationState {
6973 self . tool_call_items . clear ( ) ;
7074 self . agent_message_item_id = None ;
7175 self . reasoning_item_id = None ;
76+ self . reasoning_part_id = None ;
7277 self . user_message_item_id = None ;
7378 self . user_message_text . clear ( ) ;
7479 }
@@ -107,6 +112,7 @@ impl SessionTranslationState {
107112 self . tool_call_items . clear ( ) ;
108113 self . agent_message_item_id = None ;
109114 self . reasoning_item_id = None ;
115+ self . reasoning_part_id = None ;
110116 }
111117}
112118
@@ -132,6 +138,7 @@ pub(crate) fn translate_sse_event(
132138
133139 match event_type {
134140 "message.part.updated" => translate_part_updated ( properties, state) ,
141+ "message.part.delta" => translate_part_delta ( properties, state) ,
135142 "message.updated" => translate_message_updated ( properties, state) ,
136143 "session.status" => translate_session_status ( properties, state) ,
137144 "permission.updated" => translate_sse_permission ( properties, state) ,
@@ -201,6 +208,11 @@ fn translate_part_updated(properties: &Value, state: &mut SessionTranslationStat
201208 }
202209
203210 "reasoning" => {
211+ // Track the OpenCode part ID so `message.part.delta` events can be
212+ // routed to reasoning vs text.
213+ if let Some ( pid) = part. get ( "id" ) . and_then ( |v| v. as_str ( ) ) {
214+ state. reasoning_part_id = Some ( pid. to_string ( ) ) ;
215+ }
204216 if delta. is_empty ( ) {
205217 return vec ! [ ] ;
206218 }
@@ -222,6 +234,66 @@ fn translate_part_updated(properties: &Value, state: &mut SessionTranslationStat
222234 }
223235}
224236
237+ // ---------------------------------------------------------------------------
238+ // message.part.delta — incremental text/reasoning streaming chunks
239+ // ---------------------------------------------------------------------------
240+
241+ fn translate_part_delta ( properties : & Value , state : & mut SessionTranslationState ) -> Vec < Value > {
242+ let delta = match properties. get ( "delta" ) . and_then ( |v| v. as_str ( ) ) {
243+ Some ( d) if !d. is_empty ( ) => d,
244+ _ => return vec ! [ ] ,
245+ } ;
246+
247+ if let Some ( sid) = properties. get ( "sessionID" ) . and_then ( |v| v. as_str ( ) ) {
248+ if !sid. is_empty ( ) {
249+ state. session_id = sid. to_string ( ) ;
250+ }
251+ }
252+
253+ let field = properties
254+ . get ( "field" )
255+ . and_then ( |v| v. as_str ( ) )
256+ . unwrap_or ( "text" ) ;
257+ let thread_id = state. session_id . clone ( ) ;
258+ let turn_id = state. current_turn_id . clone ( ) ;
259+
260+ match field {
261+ "text" => {
262+ let part_id = properties
263+ . get ( "partID" )
264+ . and_then ( |v| v. as_str ( ) )
265+ . unwrap_or_default ( ) ;
266+
267+ if let Some ( ref reasoning_part_id) = state. reasoning_part_id {
268+ if part_id == reasoning_part_id {
269+ let item_id = state. reasoning_item ( ) ;
270+ return vec ! [ json!( {
271+ "method" : "item/reasoning/textDelta" ,
272+ "params" : {
273+ "threadId" : thread_id,
274+ "turnId" : turn_id,
275+ "itemId" : item_id,
276+ "delta" : delta
277+ }
278+ } ) ] ;
279+ }
280+ }
281+
282+ let item_id = state. agent_message_item ( ) ;
283+ vec ! [ json!( {
284+ "method" : "item/agentMessage/delta" ,
285+ "params" : {
286+ "threadId" : thread_id,
287+ "turnId" : turn_id,
288+ "itemId" : item_id,
289+ "delta" : delta
290+ }
291+ } ) ]
292+ }
293+ _ => vec ! [ ] ,
294+ }
295+ }
296+
225297// ---------------------------------------------------------------------------
226298// Tool part translation
227299// ---------------------------------------------------------------------------
@@ -362,24 +434,36 @@ fn translate_message_updated(
362434 Some ( i) => i,
363435 None => return vec ! [ ] ,
364436 } ;
437+
438+ if let Some ( sid) = info. get ( "sessionID" ) . and_then ( |v| v. as_str ( ) ) {
439+ if !sid. is_empty ( ) {
440+ state. session_id = sid. to_string ( ) ;
441+ }
442+ }
443+
365444 let thread_id = state. session_id . clone ( ) ;
366445
367- // Extract token usage from message info if available.
368- let input_tokens = info
369- . get ( "inputTokens" )
446+ // Token usage: try nested `tokens` object first (current format), then flat keys (legacy).
447+ let tokens = info. get ( "tokens" ) ;
448+ let input_tokens = tokens
449+ . and_then ( |t| t. get ( "input" ) )
450+ . or_else ( || info. get ( "inputTokens" ) )
370451 . and_then ( |v| v. as_u64 ( ) )
371452 . unwrap_or ( 0 ) ;
372- let output_tokens = info
373- . get ( "outputTokens" )
453+ let output_tokens = tokens
454+ . and_then ( |t| t. get ( "output" ) )
455+ . or_else ( || info. get ( "outputTokens" ) )
374456 . and_then ( |v| v. as_u64 ( ) )
375457 . unwrap_or ( 0 ) ;
376- let cached_tokens = info
377- . get ( "cachedInputTokens" )
458+ let cached_tokens = tokens
459+ . and_then ( |t| t. get ( "cache" ) . and_then ( |c| c. get ( "read" ) ) )
460+ . or_else ( || info. get ( "cachedInputTokens" ) )
378461 . or_else ( || info. get ( "cacheReadInputTokens" ) )
379462 . and_then ( |v| v. as_u64 ( ) )
380463 . unwrap_or ( 0 ) ;
381- let reasoning_tokens = info
382- . get ( "reasoningOutputTokens" )
464+ let reasoning_tokens = tokens
465+ . and_then ( |t| t. get ( "reasoning" ) )
466+ . or_else ( || info. get ( "reasoningOutputTokens" ) )
383467 . or_else ( || info. get ( "reasoningTokens" ) )
384468 . and_then ( |v| v. as_u64 ( ) )
385469 . unwrap_or ( 0 ) ;
@@ -953,4 +1037,113 @@ mod tests {
9531037 let events = translate_sse_event ( & event, & mut state) ;
9541038 assert_eq ! ( events[ 0 ] [ "params" ] [ "delta" ] , " line with trailing space " ) ;
9551039 }
1040+
1041+ #[ test]
1042+ fn part_delta_text_produces_agent_message_delta ( ) {
1043+ let mut state = make_state ( ) ;
1044+ let event = json ! ( {
1045+ "type" : "message.part.delta" ,
1046+ "properties" : {
1047+ "sessionID" : "ses_test123" ,
1048+ "messageID" : "msg_1" ,
1049+ "partID" : "prt_text_1" ,
1050+ "field" : "text" ,
1051+ "delta" : "hello"
1052+ }
1053+ } ) ;
1054+ let events = translate_sse_event ( & event, & mut state) ;
1055+ assert_eq ! ( events. len( ) , 1 ) ;
1056+ assert_eq ! ( events[ 0 ] [ "method" ] , "item/agentMessage/delta" ) ;
1057+ assert_eq ! ( events[ 0 ] [ "params" ] [ "threadId" ] , "ses_test123" ) ;
1058+ assert_eq ! ( events[ 0 ] [ "params" ] [ "delta" ] , "hello" ) ;
1059+ }
1060+
1061+ #[ test]
1062+ fn part_delta_routes_reasoning_by_part_id ( ) {
1063+ let mut state = make_state ( ) ;
1064+
1065+ // First, announce a reasoning part via message.part.updated.
1066+ let reasoning_announce = json ! ( {
1067+ "type" : "message.part.updated" ,
1068+ "properties" : {
1069+ "part" : {
1070+ "type" : "reasoning" ,
1071+ "id" : "prt_reasoning_1" ,
1072+ "sessionID" : "ses_test123" ,
1073+ "text" : ""
1074+ }
1075+ }
1076+ } ) ;
1077+ translate_sse_event ( & reasoning_announce, & mut state) ;
1078+
1079+ // Now a delta for that reasoning part should produce reasoning event.
1080+ let delta_event = json ! ( {
1081+ "type" : "message.part.delta" ,
1082+ "properties" : {
1083+ "sessionID" : "ses_test123" ,
1084+ "partID" : "prt_reasoning_1" ,
1085+ "field" : "text" ,
1086+ "delta" : "thinking..."
1087+ }
1088+ } ) ;
1089+ let events = translate_sse_event ( & delta_event, & mut state) ;
1090+ assert_eq ! ( events. len( ) , 1 ) ;
1091+ assert_eq ! ( events[ 0 ] [ "method" ] , "item/reasoning/textDelta" ) ;
1092+ assert_eq ! ( events[ 0 ] [ "params" ] [ "delta" ] , "thinking..." ) ;
1093+ }
1094+
1095+ #[ test]
1096+ fn nested_token_format_produces_token_usage ( ) {
1097+ let mut state = make_state ( ) ;
1098+ let event = json ! ( {
1099+ "type" : "message.updated" ,
1100+ "properties" : {
1101+ "info" : {
1102+ "sessionID" : "ses_test123" ,
1103+ "tokens" : {
1104+ "total" : 6000 ,
1105+ "input" : 5000 ,
1106+ "output" : 1000 ,
1107+ "reasoning" : 50 ,
1108+ "cache" : { "read" : 200 , "write" : 0 }
1109+ }
1110+ }
1111+ }
1112+ } ) ;
1113+ let events = translate_sse_event ( & event, & mut state) ;
1114+ assert_eq ! ( events. len( ) , 1 ) ;
1115+ assert_eq ! ( events[ 0 ] [ "method" ] , "thread/tokenUsage/updated" ) ;
1116+ assert_eq ! (
1117+ events[ 0 ] [ "params" ] [ "tokenUsage" ] [ "total" ] [ "inputTokens" ] ,
1118+ 5000
1119+ ) ;
1120+ assert_eq ! (
1121+ events[ 0 ] [ "params" ] [ "tokenUsage" ] [ "total" ] [ "outputTokens" ] ,
1122+ 1000
1123+ ) ;
1124+ assert_eq ! (
1125+ events[ 0 ] [ "params" ] [ "tokenUsage" ] [ "total" ] [ "cachedInputTokens" ] ,
1126+ 200
1127+ ) ;
1128+ assert_eq ! (
1129+ events[ 0 ] [ "params" ] [ "tokenUsage" ] [ "total" ] [ "reasoningOutputTokens" ] ,
1130+ 50
1131+ ) ;
1132+ }
1133+
1134+ #[ test]
1135+ fn part_delta_empty_delta_returns_empty ( ) {
1136+ let mut state = make_state ( ) ;
1137+ let event = json ! ( {
1138+ "type" : "message.part.delta" ,
1139+ "properties" : {
1140+ "sessionID" : "ses_test123" ,
1141+ "partID" : "prt_1" ,
1142+ "field" : "text" ,
1143+ "delta" : ""
1144+ }
1145+ } ) ;
1146+ let events = translate_sse_event ( & event, & mut state) ;
1147+ assert ! ( events. is_empty( ) ) ;
1148+ }
9561149}
0 commit comments