diff --git a/codex-rs/analytics/src/analytics_client_tests.rs b/codex-rs/analytics/src/analytics_client_tests.rs index 515fcc56379e..269d33326f72 100644 --- a/codex-rs/analytics/src/analytics_client_tests.rs +++ b/codex-rs/analytics/src/analytics_client_tests.rs @@ -303,10 +303,10 @@ fn sample_turn_completed_notification( }) } -fn sample_turn_resolved_config(turn_id: &str) -> TurnResolvedConfigFact { +fn sample_turn_resolved_config(thread_id: &str, turn_id: &str) -> TurnResolvedConfigFact { TurnResolvedConfigFact { turn_id: turn_id.to_string(), - thread_id: "thread-2".to_string(), + thread_id: thread_id.to_string(), num_input_images: 1, submission_type: None, ephemeral: false, @@ -419,6 +419,38 @@ async fn ingest_rejected_turn_steer( /*include_started*/ false, /*include_token_usage*/ false, ) .await; + reducer + .ingest( + AnalyticsFact::Initialize { + connection_id: 8, + params: InitializeParams { + client_info: ClientInfo { + name: "codex-web".to_string(), + title: None, + version: "1.0.0".to_string(), + }, + capabilities: None, + }, + product_client_id: "codex-web".to_string(), + runtime: sample_runtime_metadata(), + rpc_transport: AppServerRpcTransport::Stdio, + }, + out, + ) + .await; + reducer + .ingest( + AnalyticsFact::ClientResponse { + connection_id: 8, + request_id: RequestId::Integer(6), + response: Box::new(sample_thread_resume_response( + "thread-2", /*ephemeral*/ false, "gpt-5", + )), + }, + out, + ) + .await; + out.clear(); reducer .ingest( AnalyticsFact::ClientRequest { @@ -519,7 +551,7 @@ async fn ingest_turn_prerequisites( reducer .ingest( AnalyticsFact::Custom(CustomAnalyticsFact::TurnResolvedConfig(Box::new( - sample_turn_resolved_config("turn-2"), + sample_turn_resolved_config("thread-2", "turn-2"), ))), out, ) @@ -1436,6 +1468,110 @@ async fn subagent_thread_started_publishes_without_initialize() { assert_eq!(payload[0]["event_params"]["subagent_source"], "review"); } +#[tokio::test] +async fn subagent_thread_started_inherits_parent_connection_for_new_thread() { + let mut reducer = AnalyticsReducer::default(); + let mut events = Vec::new(); + let parent_thread_id = + codex_protocol::ThreadId::from_string("44444444-4444-4444-4444-444444444444") + .expect("valid parent thread id"); + let parent_thread_id_string = parent_thread_id.to_string(); + + reducer + .ingest( + AnalyticsFact::Initialize { + connection_id: 7, + params: InitializeParams { + client_info: ClientInfo { + name: "parent-client".to_string(), + title: None, + version: "1.0.0".to_string(), + }, + capabilities: None, + }, + product_client_id: "parent-client".to_string(), + runtime: sample_runtime_metadata(), + rpc_transport: AppServerRpcTransport::Stdio, + }, + &mut events, + ) + .await; + reducer + .ingest( + AnalyticsFact::ClientResponse { + connection_id: 7, + request_id: RequestId::Integer(1), + response: Box::new(sample_thread_start_response( + &parent_thread_id_string, + /*ephemeral*/ false, + "gpt-5", + )), + }, + &mut events, + ) + .await; + + reducer + .ingest( + AnalyticsFact::Custom(CustomAnalyticsFact::SubAgentThreadStarted( + SubAgentThreadStartedInput { + thread_id: "thread-review".to_string(), + parent_thread_id: None, + product_client_id: "parent-client".to_string(), + client_name: "parent-client".to_string(), + client_version: "1.0.0".to_string(), + model: "gpt-5".to_string(), + ephemeral: false, + subagent_source: SubAgentSource::ThreadSpawn { + parent_thread_id, + depth: 1, + agent_path: None, + agent_nickname: None, + agent_role: None, + }, + created_at: 130, + }, + )), + &mut events, + ) + .await; + + events.clear(); + reducer + .ingest( + AnalyticsFact::Custom(CustomAnalyticsFact::Compaction(Box::new( + CodexCompactionEvent { + thread_id: "thread-review".to_string(), + turn_id: "turn-compact".to_string(), + trigger: CompactionTrigger::Manual, + reason: CompactionReason::UserRequested, + implementation: CompactionImplementation::Responses, + phase: CompactionPhase::StandaloneTurn, + strategy: CompactionStrategy::Memento, + status: CompactionStatus::Completed, + error: None, + active_context_tokens_before: 131_000, + active_context_tokens_after: 64_000, + started_at: 100, + completed_at: 101, + duration_ms: Some(1200), + }, + ))), + &mut events, + ) + .await; + + let payload = serde_json::to_value(&events).expect("serialize events"); + assert_eq!( + payload[0]["event_params"]["app_server_client"]["product_client_id"], + "parent-client" + ); + assert_eq!( + payload[0]["event_params"]["parent_thread_id"], + "44444444-4444-4444-4444-444444444444" + ); +} + #[test] fn plugin_used_event_serializes_expected_shape() { let tracking = TrackEventsContext { @@ -2130,7 +2266,7 @@ async fn turn_start_error_response_discards_pending_start_request() { reducer .ingest( AnalyticsFact::Custom(CustomAnalyticsFact::TurnResolvedConfig(Box::new( - sample_turn_resolved_config("turn-2"), + sample_turn_resolved_config("thread-2", "turn-2"), ))), &mut out, ) diff --git a/codex-rs/analytics/src/reducer.rs b/codex-rs/analytics/src/reducer.rs index 43da35c47c3e..b1dc822d4365 100644 --- a/codex-rs/analytics/src/reducer.rs +++ b/codex-rs/analytics/src/reducer.rs @@ -74,8 +74,7 @@ pub(crate) struct AnalyticsReducer { requests: HashMap<(u64, RequestId), RequestState>, turns: HashMap, connections: HashMap, - thread_connections: HashMap, - thread_metadata: HashMap, + threads: HashMap, } struct ConnectionState { @@ -83,6 +82,69 @@ struct ConnectionState { runtime: CodexRuntimeMetadata, } +#[derive(Default)] +struct ThreadAnalyticsState { + connection_id: Option, + metadata: Option, +} + +#[derive(Clone, Copy)] +struct AnalyticsDropSite<'a> { + event_name: &'static str, + thread_id: &'a str, + turn_id: Option<&'a str>, + review_id: Option<&'a str>, + item_id: Option<&'a str>, +} + +impl<'a> AnalyticsDropSite<'a> { + fn guardian(input: &'a GuardianReviewEventParams) -> Self { + Self { + event_name: "guardian", + thread_id: &input.thread_id, + turn_id: Some(&input.turn_id), + review_id: Some(&input.review_id), + item_id: None, + } + } + + fn compaction(input: &'a CodexCompactionEvent) -> Self { + Self { + event_name: "compaction", + thread_id: &input.thread_id, + turn_id: Some(&input.turn_id), + review_id: None, + item_id: None, + } + } + + fn turn_steer(thread_id: &'a str) -> Self { + Self { + event_name: "turn steer", + thread_id, + turn_id: None, + review_id: None, + item_id: None, + } + } + + fn turn(thread_id: &'a str, turn_id: &'a str) -> Self { + Self { + event_name: "turn", + thread_id, + turn_id: Some(turn_id), + review_id: None, + item_id: None, + } + } +} + +enum MissingAnalyticsContext { + ThreadConnection, + Connection { connection_id: u64 }, + ThreadMetadata, +} + #[derive(Clone)] struct ThreadMetadataState { thread_source: Option<&'static str>, @@ -274,6 +336,26 @@ impl AnalyticsReducer { input: SubAgentThreadStartedInput, out: &mut Vec, ) { + let parent_thread_id = input + .parent_thread_id + .clone() + .or_else(|| subagent_parent_thread_id(&input.subagent_source)); + let parent_connection_id = parent_thread_id + .as_ref() + .and_then(|parent_thread_id| self.threads.get(parent_thread_id)) + .and_then(|thread| thread.connection_id); + let thread_state = self.threads.entry(input.thread_id.clone()).or_default(); + thread_state + .metadata + .get_or_insert_with(|| ThreadMetadataState { + thread_source: Some("subagent"), + initialization_mode: ThreadInitializationMode::New, + subagent_source: Some(subagent_source_name(&input.subagent_source)), + parent_thread_id, + }); + if thread_state.connection_id.is_none() { + thread_state.connection_id = parent_connection_id; + } out.push(TrackEventRequest::ThreadInitialized( subagent_thread_started_event_request(input), )); @@ -284,23 +366,9 @@ impl AnalyticsReducer { input: GuardianReviewEventParams, out: &mut Vec, ) { - let Some(connection_id) = self.thread_connections.get(&input.thread_id) else { - tracing::warn!( - thread_id = %input.thread_id, - turn_id = %input.turn_id, - review_id = %input.review_id, - "dropping guardian analytics event: missing thread connection metadata" - ); - return; - }; - let Some(connection_state) = self.connections.get(connection_id) else { - tracing::warn!( - thread_id = %input.thread_id, - turn_id = %input.turn_id, - review_id = %input.review_id, - connection_id, - "dropping guardian analytics event: missing connection metadata" - ); + let Some(connection_state) = + self.thread_connection_or_warn(AnalyticsDropSite::guardian(&input)) + else { return; }; out.push(TrackEventRequest::GuardianReview(Box::new( @@ -686,10 +754,13 @@ impl AnalyticsReducer { }; let thread_metadata = ThreadMetadataState::from_thread_metadata(&thread_source, initialization_mode); - self.thread_connections - .insert(thread_id.clone(), connection_id); - self.thread_metadata - .insert(thread_id.clone(), thread_metadata.clone()); + self.threads.insert( + thread_id.clone(), + ThreadAnalyticsState { + connection_id: Some(connection_id), + metadata: Some(thread_metadata.clone()), + }, + ); out.push(TrackEventRequest::ThreadInitialized( ThreadInitializedEvent { event_type: "codex_thread_initialized", @@ -710,29 +781,9 @@ impl AnalyticsReducer { } fn ingest_compaction(&mut self, input: CodexCompactionEvent, out: &mut Vec) { - let Some(connection_id) = self.thread_connections.get(&input.thread_id) else { - tracing::warn!( - thread_id = %input.thread_id, - turn_id = %input.turn_id, - "dropping compaction analytics event: missing thread connection metadata" - ); - return; - }; - let Some(connection_state) = self.connections.get(connection_id) else { - tracing::warn!( - thread_id = %input.thread_id, - turn_id = %input.turn_id, - connection_id, - "dropping compaction analytics event: missing connection metadata" - ); - return; - }; - let Some(thread_metadata) = self.thread_metadata.get(&input.thread_id) else { - tracing::warn!( - thread_id = %input.thread_id, - turn_id = %input.turn_id, - "dropping compaction analytics event: missing thread lifecycle metadata" - ); + let Some((connection_state, thread_metadata)) = + self.thread_context_or_warn(AnalyticsDropSite::compaction(&input)) + else { return; }; out.push(TrackEventRequest::Compaction(Box::new( @@ -787,11 +838,13 @@ impl AnalyticsReducer { let Some(connection_state) = self.connections.get(&connection_id) else { return; }; - let Some(thread_metadata) = self.thread_metadata.get(&pending_request.thread_id) else { - tracing::warn!( - thread_id = %pending_request.thread_id, - "dropping turn steer analytics event: missing thread lifecycle metadata" - ); + let drop_site = AnalyticsDropSite::turn_steer(&pending_request.thread_id); + let Some(thread_metadata) = self + .threads + .get(drop_site.thread_id) + .and_then(|thread| thread.metadata.as_ref()) + else { + warn_missing_analytics_context(&drop_site, MissingAnalyticsContext::ThreadMetadata); return; }; out.push(TrackEventRequest::TurnSteer(CodexTurnSteerEventRequest { @@ -824,42 +877,34 @@ impl AnalyticsReducer { { return; } - let connection_metadata = turn_state - .connection_id - .and_then(|connection_id| self.connections.get(&connection_id)) - .map(|connection_state| { - ( - connection_state.app_server_client.clone(), - connection_state.runtime.clone(), - ) - }); - let Some((app_server_client, runtime)) = connection_metadata else { - if let Some(connection_id) = turn_state.connection_id { - tracing::warn!( - turn_id, - connection_id, - "dropping turn analytics event: missing connection metadata" - ); - } + let Some(thread_id) = turn_state.thread_id.as_ref() else { return; }; - let Some(thread_id) = turn_state.thread_id.as_ref() else { + let Some(connection_id) = turn_state.connection_id else { return; }; - let Some(thread_metadata) = self.thread_metadata.get(thread_id) else { - tracing::warn!( - thread_id, - turn_id, - "dropping turn analytics event: missing thread lifecycle metadata" + let Some(connection_state) = self.connections.get(&connection_id) else { + warn_missing_analytics_context( + &AnalyticsDropSite::turn(thread_id, turn_id), + MissingAnalyticsContext::Connection { connection_id }, ); return; }; + let drop_site = AnalyticsDropSite::turn(thread_id, turn_id); + let Some(thread_metadata) = self + .threads + .get(drop_site.thread_id) + .and_then(|thread| thread.metadata.as_ref()) + else { + warn_missing_analytics_context(&drop_site, MissingAnalyticsContext::ThreadMetadata); + return; + }; out.push(TrackEventRequest::TurnEvent(Box::new( CodexTurnEventRequest { event_type: "codex_turn_event", event_params: codex_turn_event_params( - app_server_client, - runtime, + connection_state.app_server_client.clone(), + connection_state.runtime.clone(), turn_id.to_string(), turn_state, thread_metadata, @@ -868,6 +913,67 @@ impl AnalyticsReducer { ))); self.turns.remove(turn_id); } + + fn thread_connection_or_warn( + &self, + drop_site: AnalyticsDropSite<'_>, + ) -> Option<&ConnectionState> { + let Some(thread_state) = self.threads.get(drop_site.thread_id) else { + warn_missing_analytics_context(&drop_site, MissingAnalyticsContext::ThreadConnection); + return None; + }; + let Some(connection_id) = thread_state.connection_id else { + warn_missing_analytics_context(&drop_site, MissingAnalyticsContext::ThreadConnection); + return None; + }; + let Some(connection_state) = self.connections.get(&connection_id) else { + warn_missing_analytics_context( + &drop_site, + MissingAnalyticsContext::Connection { connection_id }, + ); + return None; + }; + Some(connection_state) + } + + fn thread_context_or_warn( + &self, + drop_site: AnalyticsDropSite<'_>, + ) -> Option<(&ConnectionState, &ThreadMetadataState)> { + let connection_state = self.thread_connection_or_warn(drop_site)?; + let Some(thread_metadata) = self + .threads + .get(drop_site.thread_id) + .and_then(|thread| thread.metadata.as_ref()) + else { + warn_missing_analytics_context(&drop_site, MissingAnalyticsContext::ThreadMetadata); + return None; + }; + Some((connection_state, thread_metadata)) + } +} + +fn warn_missing_analytics_context( + drop_site: &AnalyticsDropSite<'_>, + missing: MissingAnalyticsContext, +) { + let (missing_context, connection_id) = match missing { + MissingAnalyticsContext::ThreadConnection => ("thread_connection", None), + MissingAnalyticsContext::Connection { connection_id } => { + ("connection", Some(connection_id)) + } + MissingAnalyticsContext::ThreadMetadata => ("thread_metadata", None), + }; + tracing::warn!( + thread_id = %drop_site.thread_id, + turn_id = ?drop_site.turn_id, + review_id = ?drop_site.review_id, + item_id = ?drop_site.item_id, + missing_context, + connection_id, + "dropping {} analytics event: missing analytics context", + drop_site.event_name + ); } fn codex_turn_event_params(