diff --git a/codex-rs/analytics/src/analytics_client_tests.rs b/codex-rs/analytics/src/analytics_client_tests.rs index 352acbe1fc16..9f88ceeb94e7 100644 --- a/codex-rs/analytics/src/analytics_client_tests.rs +++ b/codex-rs/analytics/src/analytics_client_tests.rs @@ -426,7 +426,7 @@ async fn ingest_rejected_turn_steer( .await; reducer .ingest( - AnalyticsFact::Request { + AnalyticsFact::ClientRequest { connection_id: 7, request_id: RequestId::Integer(4), request: Box::new(sample_turn_steer_request( @@ -486,7 +486,7 @@ async fn ingest_turn_prerequisites( ingest_initialize(reducer, out).await; reducer .ingest( - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id: 7, response: Box::new(sample_thread_start_response( "thread-2", /*ephemeral*/ false, "gpt-5", @@ -500,7 +500,7 @@ async fn ingest_turn_prerequisites( reducer .ingest( - AnalyticsFact::Request { + AnalyticsFact::ClientRequest { connection_id: 7, request_id: RequestId::Integer(3), request: Box::new(sample_turn_start_request("thread-2", /*request_id*/ 3)), @@ -510,7 +510,7 @@ async fn ingest_turn_prerequisites( .await; reducer .ingest( - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id: 7, response: Box::new(sample_turn_start_response("turn-2", /*request_id*/ 3)), }, @@ -862,7 +862,7 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize reducer .ingest( - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id: 7, response: Box::new(sample_thread_start_response( "thread-no-client", @@ -906,7 +906,7 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize reducer .ingest( - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id: 7, response: Box::new(sample_thread_resume_response( "thread-1", /*ephemeral*/ true, "gpt-5", @@ -986,7 +986,7 @@ async fn compaction_event_ingests_custom_fact() { .await; reducer .ingest( - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id: 7, response: Box::new(sample_thread_resume_response_with_source( "thread-1", @@ -1097,7 +1097,7 @@ async fn guardian_review_event_ingests_custom_fact_with_optional_target_item() { .await; reducer .ingest( - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id: 7, response: Box::new(sample_thread_start_response( "thread-guardian", @@ -1867,7 +1867,7 @@ async fn accepted_turn_steer_emits_expected_event() { .await; reducer .ingest( - AnalyticsFact::Request { + AnalyticsFact::ClientRequest { connection_id: 7, request_id: RequestId::Integer(4), request: Box::new(sample_turn_steer_request( @@ -1879,7 +1879,7 @@ async fn accepted_turn_steer_emits_expected_event() { .await; reducer .ingest( - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id: 7, response: Box::new(sample_turn_steer_response("turn-2", /*request_id*/ 4)), }, @@ -2021,7 +2021,7 @@ async fn turn_start_error_response_discards_pending_start_request() { ingest_initialize(&mut reducer, &mut out).await; reducer .ingest( - AnalyticsFact::Request { + AnalyticsFact::ClientRequest { connection_id: 7, request_id: RequestId::Integer(3), request: Box::new(sample_turn_start_request("thread-2", /*request_id*/ 3)), @@ -2045,7 +2045,7 @@ async fn turn_start_error_response_discards_pending_start_request() { // failed turn/start request and attach request-scoped connection metadata. reducer .ingest( - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id: 7, response: Box::new(sample_turn_start_response("turn-2", /*request_id*/ 3)), }, @@ -2162,7 +2162,7 @@ async fn accepted_steers_increment_turn_steer_count() { reducer .ingest( - AnalyticsFact::Request { + AnalyticsFact::ClientRequest { connection_id: 7, request_id: RequestId::Integer(4), request: Box::new(sample_turn_steer_request( @@ -2174,7 +2174,7 @@ async fn accepted_steers_increment_turn_steer_count() { .await; reducer .ingest( - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id: 7, response: Box::new(sample_turn_steer_response("turn-2", /*request_id*/ 4)), }, @@ -2184,7 +2184,7 @@ async fn accepted_steers_increment_turn_steer_count() { reducer .ingest( - AnalyticsFact::Request { + AnalyticsFact::ClientRequest { connection_id: 7, request_id: RequestId::Integer(5), request: Box::new(sample_turn_steer_request( @@ -2208,7 +2208,7 @@ async fn accepted_steers_increment_turn_steer_count() { reducer .ingest( - AnalyticsFact::Request { + AnalyticsFact::ClientRequest { connection_id: 7, request_id: RequestId::Integer(6), request: Box::new(sample_turn_steer_request( @@ -2220,7 +2220,7 @@ async fn accepted_steers_increment_turn_steer_count() { .await; reducer .ingest( - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id: 7, response: Box::new(sample_turn_steer_response("turn-2", /*request_id*/ 6)), }, diff --git a/codex-rs/analytics/src/client.rs b/codex-rs/analytics/src/client.rs index e145a00d1dcf..b14961435341 100644 --- a/codex-rs/analytics/src/client.rs +++ b/codex-rs/analytics/src/client.rs @@ -27,6 +27,8 @@ use codex_app_server_protocol::InitializeParams; use codex_app_server_protocol::JSONRPCErrorError; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ServerNotification; +use codex_app_server_protocol::ServerRequest; +use codex_app_server_protocol::ServerResponse; use codex_login::AuthManager; use codex_login::default_client::create_client; use codex_plugin::PluginTelemetryMetadata; @@ -49,8 +51,7 @@ pub(crate) struct AnalyticsEventsQueue { #[derive(Clone)] pub struct AnalyticsEventsClient { - queue: AnalyticsEventsQueue, - analytics_enabled: Option, + queue: Option, } impl AnalyticsEventsQueue { @@ -119,11 +120,15 @@ impl AnalyticsEventsClient { analytics_enabled: Option, ) -> Self { Self { - queue: AnalyticsEventsQueue::new(Arc::clone(&auth_manager), base_url), - analytics_enabled, + queue: (analytics_enabled != Some(false)) + .then(|| AnalyticsEventsQueue::new(Arc::clone(&auth_manager), base_url)), } } + pub fn disabled() -> Self { + Self { queue: None } + } + pub fn track_skill_invocations( &self, tracking: TrackEventsContext, @@ -182,7 +187,7 @@ impl AnalyticsEventsClient { } pub fn track_request(&self, connection_id: u64, request_id: RequestId, request: ClientRequest) { - self.record_fact(AnalyticsFact::Request { + self.record_fact(AnalyticsFact::ClientRequest { connection_id, request_id, request: Box::new(request), @@ -190,7 +195,10 @@ impl AnalyticsEventsClient { } pub fn track_app_used(&self, tracking: TrackEventsContext, app: AppInvocation) { - if !self.queue.should_enqueue_app_used(&tracking, &app) { + let Some(queue) = self.queue.as_ref() else { + return; + }; + if !queue.should_enqueue_app_used(&tracking, &app) { return; } self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::AppUsed( @@ -205,7 +213,10 @@ impl AnalyticsEventsClient { } pub fn track_plugin_used(&self, tracking: TrackEventsContext, plugin: PluginTelemetryMetadata) { - if !self.queue.should_enqueue_plugin_used(&tracking, &plugin) { + let Some(queue) = self.queue.as_ref() else { + return; + }; + if !queue.should_enqueue_plugin_used(&tracking, &plugin) { return; } self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::PluginUsed( @@ -268,14 +279,13 @@ impl AnalyticsEventsClient { } pub(crate) fn record_fact(&self, input: AnalyticsFact) { - if self.analytics_enabled == Some(false) { - return; + if let Some(queue) = self.queue.as_ref() { + queue.try_send(input); } - self.queue.try_send(input); } pub fn track_response(&self, connection_id: u64, response: ClientResponse) { - self.record_fact(AnalyticsFact::Response { + self.record_fact(AnalyticsFact::ClientResponse { connection_id, response: Box::new(response), }); @@ -299,6 +309,19 @@ impl AnalyticsEventsClient { pub fn track_notification(&self, notification: ServerNotification) { self.record_fact(AnalyticsFact::Notification(Box::new(notification))); } + + pub fn track_server_request(&self, connection_id: u64, request: ServerRequest) { + self.record_fact(AnalyticsFact::ServerRequest { + connection_id, + request: Box::new(request), + }); + } + + pub fn track_server_response(&self, response: ServerResponse) { + self.record_fact(AnalyticsFact::ServerResponse { + response: Box::new(response), + }); + } } async fn send_track_events( diff --git a/codex-rs/analytics/src/facts.rs b/codex-rs/analytics/src/facts.rs index 8ebff278c4e5..86beb0b88fae 100644 --- a/codex-rs/analytics/src/facts.rs +++ b/codex-rs/analytics/src/facts.rs @@ -7,6 +7,8 @@ use codex_app_server_protocol::InitializeParams; use codex_app_server_protocol::JSONRPCErrorError; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ServerNotification; +use codex_app_server_protocol::ServerRequest; +use codex_app_server_protocol::ServerResponse; use codex_plugin::PluginTelemetryMetadata; use codex_protocol::config_types::ApprovalsReviewer; use codex_protocol::config_types::ModeKind; @@ -272,12 +274,12 @@ pub(crate) enum AnalyticsFact { runtime: CodexRuntimeMetadata, rpc_transport: AppServerRpcTransport, }, - Request { + ClientRequest { connection_id: u64, request_id: RequestId, request: Box, }, - Response { + ClientResponse { connection_id: u64, response: Box, }, @@ -287,6 +289,13 @@ pub(crate) enum AnalyticsFact { error: JSONRPCErrorError, error_type: Option, }, + ServerRequest { + connection_id: u64, + request: Box, + }, + ServerResponse { + response: Box, + }, Notification(Box), // Facts that do not naturally exist on the app-server protocol surface, or // would require non-trivial protocol reshaping on this branch. diff --git a/codex-rs/analytics/src/reducer.rs b/codex-rs/analytics/src/reducer.rs index 290dbe28d318..33e89df0114d 100644 --- a/codex-rs/analytics/src/reducer.rs +++ b/codex-rs/analytics/src/reducer.rs @@ -172,14 +172,14 @@ impl AnalyticsReducer { rpc_transport, ); } - AnalyticsFact::Request { + AnalyticsFact::ClientRequest { connection_id, request_id, request, } => { self.ingest_request(connection_id, request_id, *request); } - AnalyticsFact::Response { + AnalyticsFact::ClientResponse { connection_id, response, } => { @@ -196,6 +196,13 @@ impl AnalyticsReducer { AnalyticsFact::Notification(notification) => { self.ingest_notification(*notification, out); } + AnalyticsFact::ServerRequest { + connection_id: _connection_id, + request: _request, + } => {} + AnalyticsFact::ServerResponse { + response: _response, + } => {} AnalyticsFact::Custom(input) => match input { CustomAnalyticsFact::SubAgentThreadStarted(input) => { self.ingest_subagent_thread_started(input, out); diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index 01985b7bd3fc..e89af03d859e 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -889,6 +889,23 @@ macro_rules! server_request_definitions { $(Self::$variant { request_id, .. } => request_id,)* } } + + pub fn response_from_result( + &self, + result: crate::Result, + ) -> serde_json::Result { + match self { + $( + Self::$variant { request_id, .. } => { + let response = serde_json::from_value::<$response>(result)?; + Ok(ServerResponse::$variant { + request_id: request_id.clone(), + response, + }) + } + )* + } + } } /// Typed response from the client to the server. diff --git a/codex-rs/app-server/src/analytics_utils.rs b/codex-rs/app-server/src/analytics_utils.rs new file mode 100644 index 000000000000..24ed12d2ad3c --- /dev/null +++ b/codex-rs/app-server/src/analytics_utils.rs @@ -0,0 +1,16 @@ +use std::sync::Arc; + +use codex_analytics::AnalyticsEventsClient; +use codex_core::config::Config; +use codex_login::AuthManager; + +pub(crate) fn analytics_events_client_from_config( + auth_manager: Arc, + config: &Config, +) -> AnalyticsEventsClient { + AnalyticsEventsClient::new( + auth_manager, + config.chatgpt_base_url.trim_end_matches('/').to_string(), + config.analytics_enabled, + ) +} diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index d67efed164cb..061d94ee919c 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -3436,7 +3436,10 @@ mod tests { let conversation_id = ThreadId::new(); let thread_state = new_thread_state(); let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -3505,7 +3508,10 @@ mod tests { let conversation_id = ThreadId::new(); let thread_state = new_thread_state(); let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -3595,7 +3601,10 @@ mod tests { let thread_state = new_thread_state(); let thread_watch_manager = ThreadWatchManager::new(); let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -4221,7 +4230,10 @@ mod tests { let conversation_id = ThreadId::new(); let event_turn_id = "complete1".to_string(); let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -4287,7 +4299,10 @@ mod tests { ) .await; let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -4335,7 +4350,10 @@ mod tests { ) .await; let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -4377,7 +4395,10 @@ mod tests { #[tokio::test] async fn test_handle_turn_plan_update_emits_notification_for_v2() -> Result<()> { let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -4431,7 +4452,10 @@ mod tests { let conversation_id = ThreadId::new(); let turn_id = "turn-123".to_string(); let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -4520,7 +4544,10 @@ mod tests { let conversation_id = ThreadId::new(); let turn_id = "turn-456".to_string(); let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -4593,7 +4620,10 @@ mod tests { let thread_state = new_thread_state(); let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -4855,7 +4885,10 @@ mod tests { #[tokio::test] async fn test_handle_turn_diff_emits_v2_notification() -> Result<()> { let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -4893,7 +4926,10 @@ mod tests { #[tokio::test] async fn test_handle_turn_diff_is_noop_for_v1() -> Result<()> { let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, vec![ConnectionId(1)], @@ -4919,7 +4955,10 @@ mod tests { #[tokio::test] async fn test_hook_prompt_raw_response_emits_item_completed() -> Result<()> { let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); let conversation_id = ThreadId::new(); let outgoing = ThreadScopedOutgoingMessageSender::new( outgoing, diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 77fe5f9ec99e..2076896f4072 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -10593,7 +10593,10 @@ mod tests { let connection_id = ConnectionId(7); let (outgoing_tx, mut outgoing_rx) = tokio::sync::mpsc::channel(8); - let outgoing = Arc::new(OutgoingMessageSender::new(outgoing_tx)); + let outgoing = Arc::new(OutgoingMessageSender::new( + outgoing_tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); let thread_outgoing = ThreadScopedOutgoingMessageSender::new( outgoing.clone(), vec![connection_id], diff --git a/codex-rs/app-server/src/command_exec.rs b/codex-rs/app-server/src/command_exec.rs index 18077a994219..fc4761c1c68d 100644 --- a/codex-rs/app-server/src/command_exec.rs +++ b/codex-rs/app-server/src/command_exec.rs @@ -726,7 +726,10 @@ mod tests { let manager = CommandExecManager::default(); let err = manager .start(StartCommandExecParams { - outgoing: Arc::new(OutgoingMessageSender::new(tx)), + outgoing: Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )), request_id: ConnectionRequestId { connection_id: ConnectionId(1), request_id: codex_app_server_protocol::RequestId::Integer(42), @@ -762,7 +765,10 @@ mod tests { manager .start(StartCommandExecParams { - outgoing: Arc::new(OutgoingMessageSender::new(tx)), + outgoing: Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )), request_id: request_id.clone(), process_id: Some("proc-99".to_string()), exec_request: windows_sandbox_exec_request(), @@ -809,7 +815,10 @@ mod tests { manager .start(StartCommandExecParams { - outgoing: Arc::new(OutgoingMessageSender::new(tx)), + outgoing: Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )), request_id: request_id.clone(), process_id: Some("proc-100".to_string()), exec_request: ExecRequest::new( diff --git a/codex-rs/app-server/src/fs_watch.rs b/codex-rs/app-server/src/fs_watch.rs index 4ae1ca149e82..47248451a2cb 100644 --- a/codex-rs/app-server/src/fs_watch.rs +++ b/codex-rs/app-server/src/fs_watch.rs @@ -234,7 +234,10 @@ mod tests { const OUTGOING_BUFFER: usize = 1; let (tx, _rx) = mpsc::channel(OUTGOING_BUFFER); FsWatchManager::new_with_file_watcher( - Arc::new(OutgoingMessageSender::new(tx)), + Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )), Arc::new(FileWatcher::noop()), ) } diff --git a/codex-rs/app-server/src/in_process.rs b/codex-rs/app-server/src/in_process.rs index 5ee044b14a45..0f7a31d6cb0d 100644 --- a/codex-rs/app-server/src/in_process.rs +++ b/codex-rs/app-server/src/in_process.rs @@ -50,6 +50,7 @@ use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::time::Duration; +use crate::analytics_utils::analytics_events_client_from_config; use crate::config_manager::ConfigManager; use crate::error_code::INTERNAL_ERROR_CODE; use crate::error_code::INVALID_REQUEST_ERROR_CODE; @@ -365,7 +366,15 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle { let runtime_handle = tokio::spawn(async move { let (outgoing_tx, mut outgoing_rx) = mpsc::channel::(channel_capacity); - let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx)); + let auth_manager = + AuthManager::shared_from_config(args.config.as_ref(), args.enable_codex_api_key_env) + .await; + let analytics_events_client = + analytics_events_client_from_config(Arc::clone(&auth_manager), args.config.as_ref()); + let outgoing_message_sender = Arc::new(OutgoingMessageSender::new( + outgoing_tx, + analytics_events_client.clone(), + )); let (writer_tx, mut writer_rx) = mpsc::channel::(channel_capacity); let outbound_initialized = Arc::new(AtomicBool::new(false)); @@ -390,9 +399,6 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle { }); let processor_outgoing = Arc::clone(&outgoing_message_sender); - let auth_manager = - AuthManager::shared_from_config(args.config.as_ref(), args.enable_codex_api_key_env) - .await; let config_manager = ConfigManager::new( args.config.codex_home.to_path_buf(), args.cli_overrides, @@ -405,6 +411,7 @@ fn start_uninitialized(args: InProcessStartArgs) -> InProcessClientHandle { let mut processor_handle = tokio::spawn(async move { let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs { outgoing: Arc::clone(&processor_outgoing), + analytics_events_client, arg0_paths: args.arg0_paths, config: args.config, config_manager, diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index 8f68fc95cc77..4df869551e79 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use std::sync::RwLock; use std::sync::atomic::AtomicBool; +use crate::analytics_utils::analytics_events_client_from_config; use crate::config_manager::ConfigManager; use crate::message_processor::MessageProcessor; use crate::message_processor::MessageProcessorArgs; @@ -69,6 +70,7 @@ use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::registry::Registry; use tracing_subscriber::util::SubscriberInitExt; +mod analytics_utils; mod app_server_tracing; mod bespoke_event_handling; mod codex_message_processor; @@ -728,13 +730,19 @@ pub async fn run_main_with_transport_options( }); let processor_handle = tokio::spawn({ - let outgoing_message_sender = Arc::new(OutgoingMessageSender::new(outgoing_tx)); - let initialize_notification_sender = outgoing_message_sender.clone(); - let outbound_control_tx = outbound_control_tx; let auth_manager = AuthManager::shared_from_config(&config, /*enable_codex_api_key_env*/ false).await; + let analytics_events_client = + analytics_events_client_from_config(Arc::clone(&auth_manager), &config); + let outgoing_message_sender = Arc::new(OutgoingMessageSender::new( + outgoing_tx, + analytics_events_client.clone(), + )); + let initialize_notification_sender = outgoing_message_sender.clone(); + let outbound_control_tx = outbound_control_tx; let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs { outgoing: outgoing_message_sender, + analytics_events_client, arg0_paths, config: Arc::new(config), config_manager, diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index dbe01322479a..cbe55882491c 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -247,6 +247,7 @@ impl ConnectionSessionState { pub(crate) struct MessageProcessorArgs { pub(crate) outgoing: Arc, + pub(crate) analytics_events_client: AnalyticsEventsClient, pub(crate) arg0_paths: Arg0DispatchPaths, pub(crate) config: Arc, pub(crate) config_manager: ConfigManager, @@ -267,6 +268,7 @@ impl MessageProcessor { pub(crate) fn new(args: MessageProcessorArgs) -> Self { let MessageProcessorArgs { outgoing, + analytics_events_client, arg0_paths, config, config_manager, @@ -283,11 +285,6 @@ impl MessageProcessor { auth_manager.set_external_auth(Arc::new(ExternalAuthRefreshBridge { outgoing: outgoing.clone(), })); - let analytics_events_client = AnalyticsEventsClient::new( - Arc::clone(&auth_manager), - config.chatgpt_base_url.trim_end_matches('/').to_string(), - config.analytics_enabled, - ); let thread_manager = Arc::new(ThreadManager::new( config.as_ref(), auth_manager.clone(), diff --git a/codex-rs/app-server/src/message_processor/tracing_tests.rs b/codex-rs/app-server/src/message_processor/tracing_tests.rs index 850d05c04e0c..bb247be59e6c 100644 --- a/codex-rs/app-server/src/message_processor/tracing_tests.rs +++ b/codex-rs/app-server/src/message_processor/tracing_tests.rs @@ -1,6 +1,7 @@ use super::ConnectionSessionState; use super::MessageProcessor; use super::MessageProcessorArgs; +use crate::analytics_utils::analytics_events_client_from_config; use crate::config_manager::ConfigManager; use crate::outgoing_message::ConnectionId; use crate::outgoing_message::OutgoingMessageSender; @@ -264,7 +265,6 @@ async fn build_test_processor( mpsc::Receiver, ) { let (outgoing_tx, outgoing_rx) = mpsc::channel(16); - let outgoing = Arc::new(OutgoingMessageSender::new(outgoing_tx)); let auth_manager = AuthManager::shared_from_config(config.as_ref(), /*enable_codex_api_key_env*/ false).await; let config_manager = ConfigManager::new( @@ -275,8 +275,15 @@ async fn build_test_processor( Arg0DispatchPaths::default(), Arc::new(codex_config::NoopThreadConfigLoader), ); + let analytics_events_client = + analytics_events_client_from_config(Arc::clone(&auth_manager), config.as_ref()); + let outgoing = Arc::new(OutgoingMessageSender::new( + outgoing_tx, + analytics_events_client.clone(), + )); let processor = Arc::new(MessageProcessor::new(MessageProcessorArgs { outgoing, + analytics_events_client, arg0_paths: Arg0DispatchPaths::default(), config, config_manager, diff --git a/codex-rs/app-server/src/outgoing_message.rs b/codex-rs/app-server/src/outgoing_message.rs index 6ca1fcabaf32..300a7044e517 100644 --- a/codex-rs/app-server/src/outgoing_message.rs +++ b/codex-rs/app-server/src/outgoing_message.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use std::sync::atomic::AtomicI64; use std::sync::atomic::Ordering; +use codex_analytics::AnalyticsEventsClient; use codex_app_server_protocol::JSONRPCErrorError; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::Result; @@ -118,6 +119,7 @@ pub(crate) struct OutgoingMessageSender { /// We keep them here because this is where responses, errors, and /// disconnect cleanup all get handled. request_contexts: Mutex>, + analytics_events_client: AnalyticsEventsClient, } #[derive(Clone)] @@ -204,12 +206,16 @@ impl ThreadScopedOutgoingMessageSender { } impl OutgoingMessageSender { - pub(crate) fn new(sender: mpsc::Sender) -> Self { + pub(crate) fn new( + sender: mpsc::Sender, + analytics_events_client: AnalyticsEventsClient, + ) -> Self { Self { next_server_request_id: AtomicI64::new(0), sender, request_id_to_callback: Mutex::new(HashMap::new()), request_contexts: Mutex::new(HashMap::new()), + analytics_events_client, } } @@ -299,7 +305,7 @@ impl OutgoingMessageSender { ); } - let outgoing_message = OutgoingMessage::Request(request); + let outgoing_message = OutgoingMessage::Request(request.clone()); let send_result = match connection_ids { None => { self.sender @@ -322,6 +328,9 @@ impl OutgoingMessageSender { { send_error = Some(err); break; + } else { + self.analytics_events_client + .track_server_request(connection_id.0, request.clone()); } } match send_error { @@ -365,6 +374,9 @@ impl OutgoingMessageSender { match entry { Some((id, entry)) => { + if let Ok(response) = entry.request.response_from_result(result.clone()) { + self.analytics_events_client.track_server_response(response); + } if let Err(err) = entry.callback.send(Ok(result)) { warn!("could not notify callback for {id:?} due to: {err:?}"); } @@ -665,6 +677,8 @@ mod tests { use codex_app_server_protocol::AccountUpdatedNotification; use codex_app_server_protocol::ApplyPatchApprovalParams; use codex_app_server_protocol::AuthMode; + use codex_app_server_protocol::CommandExecutionApprovalDecision; + use codex_app_server_protocol::CommandExecutionRequestApprovalParams; use codex_app_server_protocol::ConfigWarningNotification; use codex_app_server_protocol::DynamicToolCallParams; use codex_app_server_protocol::FileChangeRequestApprovalParams; @@ -675,6 +689,7 @@ mod tests { use codex_app_server_protocol::ModelVerificationNotification; use codex_app_server_protocol::RateLimitSnapshot; use codex_app_server_protocol::RateLimitWindow; + use codex_app_server_protocol::ServerResponse; use codex_app_server_protocol::ToolRequestUserInputParams; use codex_protocol::ThreadId; use pretty_assertions::assert_eq; @@ -900,10 +915,51 @@ mod tests { ); } + #[test] + fn server_request_response_from_result_decodes_typed_response() { + let request = ServerRequest::CommandExecutionRequestApproval { + request_id: RequestId::Integer(7), + params: CommandExecutionRequestApprovalParams { + thread_id: "thread-1".to_string(), + turn_id: "turn-1".to_string(), + item_id: "item-1".to_string(), + approval_id: None, + reason: None, + network_approval_context: None, + command: Some("echo hi".to_string()), + cwd: None, + command_actions: None, + additional_permissions: None, + proposed_execpolicy_amendment: None, + proposed_network_policy_amendments: None, + available_decisions: None, + }, + }; + + let response = request + .response_from_result(json!({ + "decision": "acceptForSession", + })) + .expect("decode typed server response"); + + let ServerResponse::CommandExecutionRequestApproval { + request_id, + response, + } = response + else { + panic!("expected command execution approval response"); + }; + assert_eq!(request_id, RequestId::Integer(7)); + assert_eq!( + response.decision, + CommandExecutionApprovalDecision::AcceptForSession + ); + } #[tokio::test] async fn send_response_routes_to_target_connection() { let (tx, mut rx) = mpsc::channel::(4); - let outgoing = OutgoingMessageSender::new(tx); + let outgoing = + OutgoingMessageSender::new(tx, codex_analytics::AnalyticsEventsClient::disabled()); let request_id = ConnectionRequestId { connection_id: ConnectionId(42), request_id: RequestId::Integer(7), @@ -938,7 +994,8 @@ mod tests { #[tokio::test] async fn send_response_clears_registered_request_context() { let (tx, _rx) = mpsc::channel::(4); - let outgoing = OutgoingMessageSender::new(tx); + let outgoing = + OutgoingMessageSender::new(tx, codex_analytics::AnalyticsEventsClient::disabled()); let request_id = ConnectionRequestId { connection_id: ConnectionId(42), request_id: RequestId::Integer(7), @@ -963,7 +1020,8 @@ mod tests { #[tokio::test] async fn send_error_routes_to_target_connection() { let (tx, mut rx) = mpsc::channel::(4); - let outgoing = OutgoingMessageSender::new(tx); + let outgoing = + OutgoingMessageSender::new(tx, codex_analytics::AnalyticsEventsClient::disabled()); let request_id = ConnectionRequestId { connection_id: ConnectionId(9), request_id: RequestId::Integer(3), @@ -1001,7 +1059,8 @@ mod tests { #[tokio::test] async fn send_server_notification_to_connection_and_wait_tracks_write_completion() { let (tx, mut rx) = mpsc::channel::(4); - let outgoing = OutgoingMessageSender::new(tx); + let outgoing = + OutgoingMessageSender::new(tx, codex_analytics::AnalyticsEventsClient::disabled()); let send_task = tokio::spawn(async move { outgoing .send_server_notification_to_connection_and_wait( @@ -1045,7 +1104,8 @@ mod tests { #[tokio::test] async fn connection_closed_clears_registered_request_contexts() { let (tx, _rx) = mpsc::channel::(4); - let outgoing = OutgoingMessageSender::new(tx); + let outgoing = + OutgoingMessageSender::new(tx, codex_analytics::AnalyticsEventsClient::disabled()); let closed_connection_request = ConnectionRequestId { connection_id: ConnectionId(9), request_id: RequestId::Integer(3), @@ -1079,7 +1139,8 @@ mod tests { #[tokio::test] async fn notify_client_error_forwards_error_to_waiter() { let (tx, _rx) = mpsc::channel::(4); - let outgoing = OutgoingMessageSender::new(tx); + let outgoing = + OutgoingMessageSender::new(tx, codex_analytics::AnalyticsEventsClient::disabled()); let (request_id, wait_for_result) = outgoing .send_request(ServerRequestPayload::ApplyPatchApproval( @@ -1113,7 +1174,10 @@ mod tests { #[tokio::test] async fn pending_requests_for_thread_returns_thread_requests_in_request_id_order() { let (tx, _rx) = mpsc::channel::(8); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); let thread_id = ThreadId::new(); let thread_outgoing = ThreadScopedOutgoingMessageSender::new( outgoing.clone(), @@ -1171,7 +1235,10 @@ mod tests { #[tokio::test] async fn cancel_requests_for_thread_cancels_all_thread_requests() { let (tx, _rx) = mpsc::channel::(8); - let outgoing = Arc::new(OutgoingMessageSender::new(tx)); + let outgoing = Arc::new(OutgoingMessageSender::new( + tx, + codex_analytics::AnalyticsEventsClient::disabled(), + )); let thread_id = ThreadId::new(); let thread_outgoing = ThreadScopedOutgoingMessageSender::new( outgoing.clone(), diff --git a/codex-rs/app-server/src/thread_status.rs b/codex-rs/app-server/src/thread_status.rs index f78b8753a9a4..b1373c293d05 100644 --- a/codex-rs/app-server/src/thread_status.rs +++ b/codex-rs/app-server/src/thread_status.rs @@ -722,6 +722,7 @@ mod tests { let (outgoing_tx, mut outgoing_rx) = mpsc::channel(8); let manager = ThreadWatchManager::new_with_outgoing(Arc::new(OutgoingMessageSender::new( outgoing_tx, + codex_analytics::AnalyticsEventsClient::disabled(), ))); manager @@ -764,6 +765,7 @@ mod tests { let (outgoing_tx, mut outgoing_rx) = mpsc::channel(8); let manager = ThreadWatchManager::new_with_outgoing(Arc::new(OutgoingMessageSender::new( outgoing_tx, + codex_analytics::AnalyticsEventsClient::disabled(), ))); manager