Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions codex-rs/analytics/src/analytics_client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ async fn ingest_rejected_turn_steer(
.await;
reducer
.ingest(
AnalyticsFact::Request {
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(4),
request: Box::new(sample_turn_steer_request(
Expand Down Expand Up @@ -486,7 +486,7 @@ async fn ingest_turn_prerequisites(
ingest_initialize(reducer, out).await;
reducer
.ingest(
AnalyticsFact::Response {
AnalyticsFact::ClientResponse {
connection_id: 7,
response: Box::new(sample_thread_start_response(
"thread-2", /*ephemeral*/ false, "gpt-5",
Expand All @@ -500,7 +500,7 @@ async fn ingest_turn_prerequisites(

reducer
.ingest(
AnalyticsFact::Request {
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(3),
request: Box::new(sample_turn_start_request("thread-2", /*request_id*/ 3)),
Expand All @@ -510,7 +510,7 @@ async fn ingest_turn_prerequisites(
.await;
reducer
.ingest(
AnalyticsFact::Response {
AnalyticsFact::ClientResponse {
connection_id: 7,
response: Box::new(sample_turn_start_response("turn-2", /*request_id*/ 3)),
},
Expand Down Expand Up @@ -862,7 +862,7 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize

reducer
.ingest(
AnalyticsFact::Response {
AnalyticsFact::ClientResponse {
connection_id: 7,
response: Box::new(sample_thread_start_response(
"thread-no-client",
Expand Down Expand Up @@ -906,7 +906,7 @@ async fn initialize_caches_client_and_thread_lifecycle_publishes_once_initialize

reducer
.ingest(
AnalyticsFact::Response {
AnalyticsFact::ClientResponse {
connection_id: 7,
response: Box::new(sample_thread_resume_response(
"thread-1", /*ephemeral*/ true, "gpt-5",
Expand Down Expand Up @@ -986,7 +986,7 @@ async fn compaction_event_ingests_custom_fact() {
.await;
reducer
.ingest(
AnalyticsFact::Response {
AnalyticsFact::ClientResponse {
connection_id: 7,
response: Box::new(sample_thread_resume_response_with_source(
"thread-1",
Expand Down Expand Up @@ -1097,7 +1097,7 @@ async fn guardian_review_event_ingests_custom_fact_with_optional_target_item() {
.await;
reducer
.ingest(
AnalyticsFact::Response {
AnalyticsFact::ClientResponse {
connection_id: 7,
response: Box::new(sample_thread_start_response(
"thread-guardian",
Expand Down Expand Up @@ -1867,7 +1867,7 @@ async fn accepted_turn_steer_emits_expected_event() {
.await;
reducer
.ingest(
AnalyticsFact::Request {
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(4),
request: Box::new(sample_turn_steer_request(
Expand All @@ -1879,7 +1879,7 @@ async fn accepted_turn_steer_emits_expected_event() {
.await;
reducer
.ingest(
AnalyticsFact::Response {
AnalyticsFact::ClientResponse {
connection_id: 7,
response: Box::new(sample_turn_steer_response("turn-2", /*request_id*/ 4)),
},
Expand Down Expand Up @@ -2021,7 +2021,7 @@ async fn turn_start_error_response_discards_pending_start_request() {
ingest_initialize(&mut reducer, &mut out).await;
reducer
.ingest(
AnalyticsFact::Request {
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(3),
request: Box::new(sample_turn_start_request("thread-2", /*request_id*/ 3)),
Expand All @@ -2045,7 +2045,7 @@ async fn turn_start_error_response_discards_pending_start_request() {
// failed turn/start request and attach request-scoped connection metadata.
reducer
.ingest(
AnalyticsFact::Response {
AnalyticsFact::ClientResponse {
connection_id: 7,
response: Box::new(sample_turn_start_response("turn-2", /*request_id*/ 3)),
},
Expand Down Expand Up @@ -2162,7 +2162,7 @@ async fn accepted_steers_increment_turn_steer_count() {

reducer
.ingest(
AnalyticsFact::Request {
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(4),
request: Box::new(sample_turn_steer_request(
Expand All @@ -2174,7 +2174,7 @@ async fn accepted_steers_increment_turn_steer_count() {
.await;
reducer
.ingest(
AnalyticsFact::Response {
AnalyticsFact::ClientResponse {
connection_id: 7,
response: Box::new(sample_turn_steer_response("turn-2", /*request_id*/ 4)),
},
Expand All @@ -2184,7 +2184,7 @@ async fn accepted_steers_increment_turn_steer_count() {

reducer
.ingest(
AnalyticsFact::Request {
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(5),
request: Box::new(sample_turn_steer_request(
Expand All @@ -2208,7 +2208,7 @@ async fn accepted_steers_increment_turn_steer_count() {

reducer
.ingest(
AnalyticsFact::Request {
AnalyticsFact::ClientRequest {
connection_id: 7,
request_id: RequestId::Integer(6),
request: Box::new(sample_turn_steer_request(
Expand All @@ -2220,7 +2220,7 @@ async fn accepted_steers_increment_turn_steer_count() {
.await;
reducer
.ingest(
AnalyticsFact::Response {
AnalyticsFact::ClientResponse {
connection_id: 7,
response: Box::new(sample_turn_steer_response("turn-2", /*request_id*/ 6)),
},
Expand Down
45 changes: 34 additions & 11 deletions codex-rs/analytics/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ServerResponse;
use codex_login::AuthManager;
use codex_login::default_client::create_client;
use codex_plugin::PluginTelemetryMetadata;
Expand All @@ -49,8 +51,7 @@ pub(crate) struct AnalyticsEventsQueue {

#[derive(Clone)]
pub struct AnalyticsEventsClient {
queue: AnalyticsEventsQueue,
analytics_enabled: Option<bool>,
queue: Option<AnalyticsEventsQueue>,
}

impl AnalyticsEventsQueue {
Expand Down Expand Up @@ -119,11 +120,15 @@ impl AnalyticsEventsClient {
analytics_enabled: Option<bool>,
) -> Self {
Self {
queue: AnalyticsEventsQueue::new(Arc::clone(&auth_manager), base_url),
analytics_enabled,
queue: (analytics_enabled != Some(false))
.then(|| AnalyticsEventsQueue::new(Arc::clone(&auth_manager), base_url)),
}
}

pub fn disabled() -> Self {
Self { queue: None }
}

pub fn track_skill_invocations(
&self,
tracking: TrackEventsContext,
Expand Down Expand Up @@ -182,15 +187,18 @@ impl AnalyticsEventsClient {
}

pub fn track_request(&self, connection_id: u64, request_id: RequestId, request: ClientRequest) {
self.record_fact(AnalyticsFact::Request {
self.record_fact(AnalyticsFact::ClientRequest {
connection_id,
request_id,
request: Box::new(request),
});
}

pub fn track_app_used(&self, tracking: TrackEventsContext, app: AppInvocation) {
if !self.queue.should_enqueue_app_used(&tracking, &app) {
let Some(queue) = self.queue.as_ref() else {
return;
};
if !queue.should_enqueue_app_used(&tracking, &app) {
return;
}
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::AppUsed(
Expand All @@ -205,7 +213,10 @@ impl AnalyticsEventsClient {
}

pub fn track_plugin_used(&self, tracking: TrackEventsContext, plugin: PluginTelemetryMetadata) {
if !self.queue.should_enqueue_plugin_used(&tracking, &plugin) {
let Some(queue) = self.queue.as_ref() else {
return;
};
if !queue.should_enqueue_plugin_used(&tracking, &plugin) {
return;
}
self.record_fact(AnalyticsFact::Custom(CustomAnalyticsFact::PluginUsed(
Expand Down Expand Up @@ -268,14 +279,13 @@ impl AnalyticsEventsClient {
}

pub(crate) fn record_fact(&self, input: AnalyticsFact) {
if self.analytics_enabled == Some(false) {
return;
if let Some(queue) = self.queue.as_ref() {
queue.try_send(input);
}
self.queue.try_send(input);
}

pub fn track_response(&self, connection_id: u64, response: ClientResponse) {
self.record_fact(AnalyticsFact::Response {
self.record_fact(AnalyticsFact::ClientResponse {
connection_id,
response: Box::new(response),
});
Expand All @@ -299,6 +309,19 @@ impl AnalyticsEventsClient {
pub fn track_notification(&self, notification: ServerNotification) {
self.record_fact(AnalyticsFact::Notification(Box::new(notification)));
}

pub fn track_server_request(&self, connection_id: u64, request: ServerRequest) {
self.record_fact(AnalyticsFact::ServerRequest {
connection_id,
request: Box::new(request),
});
}

pub fn track_server_response(&self, response: ServerResponse) {
self.record_fact(AnalyticsFact::ServerResponse {
response: Box::new(response),
});
}
}

async fn send_track_events(
Expand Down
13 changes: 11 additions & 2 deletions codex-rs/analytics/src/facts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use codex_app_server_protocol::InitializeParams;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequest;
use codex_app_server_protocol::ServerResponse;
use codex_plugin::PluginTelemetryMetadata;
use codex_protocol::config_types::ApprovalsReviewer;
use codex_protocol::config_types::ModeKind;
Expand Down Expand Up @@ -272,12 +274,12 @@ pub(crate) enum AnalyticsFact {
runtime: CodexRuntimeMetadata,
rpc_transport: AppServerRpcTransport,
},
Request {
ClientRequest {
connection_id: u64,
request_id: RequestId,
request: Box<ClientRequest>,
},
Response {
ClientResponse {
connection_id: u64,
response: Box<ClientResponse>,
},
Expand All @@ -287,6 +289,13 @@ pub(crate) enum AnalyticsFact {
error: JSONRPCErrorError,
error_type: Option<AnalyticsJsonRpcError>,
},
ServerRequest {
connection_id: u64,
request: Box<ServerRequest>,
},
ServerResponse {
response: Box<ServerResponse>,
},
Notification(Box<ServerNotification>),
// Facts that do not naturally exist on the app-server protocol surface, or
// would require non-trivial protocol reshaping on this branch.
Expand Down
11 changes: 9 additions & 2 deletions codex-rs/analytics/src/reducer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,14 +172,14 @@ impl AnalyticsReducer {
rpc_transport,
);
}
AnalyticsFact::Request {
AnalyticsFact::ClientRequest {
connection_id,
request_id,
request,
} => {
self.ingest_request(connection_id, request_id, *request);
}
AnalyticsFact::Response {
AnalyticsFact::ClientResponse {
connection_id,
response,
} => {
Expand All @@ -196,6 +196,13 @@ impl AnalyticsReducer {
AnalyticsFact::Notification(notification) => {
self.ingest_notification(*notification, out);
}
AnalyticsFact::ServerRequest {
connection_id: _connection_id,
request: _request,
} => {}
AnalyticsFact::ServerResponse {
response: _response,
} => {}
AnalyticsFact::Custom(input) => match input {
CustomAnalyticsFact::SubAgentThreadStarted(input) => {
self.ingest_subagent_thread_started(input, out);
Expand Down
17 changes: 17 additions & 0 deletions codex-rs/app-server-protocol/src/protocol/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,23 @@ macro_rules! server_request_definitions {
$(Self::$variant { request_id, .. } => request_id,)*
}
}

pub fn response_from_result(
&self,
result: crate::Result,
) -> serde_json::Result<ServerResponse> {
match self {
$(
Self::$variant { request_id, .. } => {
let response = serde_json::from_value::<$response>(result)?;
Ok(ServerResponse::$variant {
request_id: request_id.clone(),
response,
})
}
)*
}
}
}

/// Typed response from the client to the server.
Expand Down
16 changes: 16 additions & 0 deletions codex-rs/app-server/src/analytics_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use std::sync::Arc;

use codex_analytics::AnalyticsEventsClient;
use codex_core::config::Config;
use codex_login::AuthManager;

pub(crate) fn analytics_events_client_from_config(
auth_manager: Arc<AuthManager>,
config: &Config,
) -> AnalyticsEventsClient {
AnalyticsEventsClient::new(
auth_manager,
config.chatgpt_base_url.trim_end_matches('/').to_string(),
config.analytics_enabled,
)
}
Loading
Loading