diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index ec06898d9b8f..d9ff9d1551f6 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -49,6 +49,7 @@ use codex_app_server_protocol::RawResponseItemCompletedNotification; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::ServerRequestPayload; +use codex_app_server_protocol::SkillsChangedNotification; use codex_app_server_protocol::ThreadGoalUpdatedNotification; use codex_app_server_protocol::ThreadItem; use codex_app_server_protocol::ThreadRealtimeClosedNotification; @@ -193,6 +194,13 @@ pub(crate) async fn apply_bespoke_event_handling( ) .await; } + EventMsg::SkillsUpdateAvailable => { + outgoing + .send_server_notification(ServerNotification::SkillsChanged( + SkillsChangedNotification {}, + )) + .await; + } EventMsg::McpStartupUpdate(update) => { let (status, error) = match update.status { codex_protocol::protocol::McpStartupStatus::Starting => { diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index 2efdfef52e89..c7efae69c615 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -93,7 +93,6 @@ mod outgoing_message; mod request_processors; mod request_serialization; mod server_request_error; -mod skills_watcher; mod thread_state; mod thread_status; mod transport; diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index b1d28f20cc7a..1ac635685f18 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -35,7 +35,6 @@ use crate::request_processors::WindowsSandboxRequestProcessor; use crate::request_serialization::QueuedInitializedRequest; use crate::request_serialization::RequestSerializationQueueKey; use crate::request_serialization::RequestSerializationQueues; -use crate::skills_watcher::SkillsWatcher; use crate::thread_state::ThreadStateManager; use crate::transport::AppServerTransport; use crate::transport::ConnectionOrigin; @@ -310,7 +309,6 @@ impl MessageProcessor { thread_manager .plugins_manager() .set_analytics_events_client(analytics_events_client.clone()); - let skills_watcher = SkillsWatcher::new(thread_manager.skills_manager(), outgoing.clone()); let pending_thread_unloads = Arc::new(Mutex::new(HashSet::new())); let thread_state_manager = ThreadStateManager::new(); @@ -403,7 +401,6 @@ impl MessageProcessor { Arc::clone(&thread_list_state_permit), thread_goal_processor.clone(), Some(state_db.clone()), - Arc::clone(&skills_watcher), ); let turn_processor = TurnRequestProcessor::new( auth_manager.clone(), @@ -417,7 +414,6 @@ impl MessageProcessor { thread_state_manager, thread_watch_manager, thread_list_state_permit, - Arc::clone(&skills_watcher), ); if matches!(plugin_startup_tasks, crate::PluginStartupTasks::Start) { // Keep plugin startup warmups aligned at app-server startup. diff --git a/codex-rs/app-server/src/request_processors.rs b/codex-rs/app-server/src/request_processors.rs index 84e16f726d98..52b15b0521b3 100644 --- a/codex-rs/app-server/src/request_processors.rs +++ b/codex-rs/app-server/src/request_processors.rs @@ -11,7 +11,6 @@ use crate::outgoing_message::ConnectionRequestId; use crate::outgoing_message::OutgoingMessageSender; use crate::outgoing_message::RequestContext; use crate::outgoing_message::ThreadScopedOutgoingMessageSender; -use crate::skills_watcher::SkillsWatcher; use crate::thread_status::ThreadWatchManager; use crate::thread_status::resolve_thread_status; use chrono::DateTime; diff --git a/codex-rs/app-server/src/request_processors/thread_lifecycle.rs b/codex-rs/app-server/src/request_processors/thread_lifecycle.rs index 45031490b059..ef44a2b178c6 100644 --- a/codex-rs/app-server/src/request_processors/thread_lifecycle.rs +++ b/codex-rs/app-server/src/request_processors/thread_lifecycle.rs @@ -12,7 +12,6 @@ pub(super) struct ListenerTaskContext { pub(super) thread_list_state_permit: Arc, pub(super) fallback_model_provider: String, pub(super) codex_home: PathBuf, - pub(super) skills_watcher: Arc, } struct UnloadingState { @@ -227,22 +226,12 @@ pub(super) async fn ensure_listener_task_running( "thread {conversation_id} is closing; retry after the thread is closed" ))); }; - let config = conversation.config().await; - let environments = conversation.environment_selections().await; - let watch_registration = listener_task_context - .skills_watcher - .register_thread_config( - config.as_ref(), - listener_task_context.thread_manager.as_ref(), - &environments, - ) - .await; let (mut listener_command_rx, listener_generation) = { let mut thread_state = thread_state.lock().await; if thread_state.listener_matches(&conversation) { return Ok(()); } - thread_state.set_listener(cancel_tx, &conversation, watch_registration) + thread_state.set_listener(cancel_tx, &conversation) }; let ListenerTaskContext { outgoing, @@ -253,7 +242,6 @@ pub(super) async fn ensure_listener_task_running( thread_list_state_permit, fallback_model_provider, codex_home, - .. } = listener_task_context; let outgoing_for_task = Arc::clone(&outgoing); tokio::spawn(async move { diff --git a/codex-rs/app-server/src/request_processors/thread_processor.rs b/codex-rs/app-server/src/request_processors/thread_processor.rs index d498e17c5395..4042197a7667 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor.rs @@ -316,7 +316,6 @@ pub(crate) struct ThreadRequestProcessor { pub(super) thread_goal_processor: ThreadGoalRequestProcessor, pub(super) state_db: Option, pub(super) background_tasks: TaskTracker, - pub(super) skills_watcher: Arc, } impl ThreadRequestProcessor { @@ -335,7 +334,6 @@ impl ThreadRequestProcessor { thread_list_state_permit: Arc, thread_goal_processor: ThreadGoalRequestProcessor, state_db: Option, - skills_watcher: Arc, ) -> Self { Self { auth_manager, @@ -352,7 +350,6 @@ impl ThreadRequestProcessor { thread_goal_processor, state_db, background_tasks: TaskTracker::new(), - skills_watcher, } } @@ -745,7 +742,6 @@ impl ThreadRequestProcessor { thread_list_state_permit: self.thread_list_state_permit.clone(), fallback_model_provider: self.config.model_provider_id.clone(), codex_home: self.config.codex_home.to_path_buf(), - skills_watcher: Arc::clone(&self.skills_watcher), } } @@ -843,7 +839,6 @@ impl ThreadRequestProcessor { thread_list_state_permit: self.thread_list_state_permit.clone(), fallback_model_provider: self.config.model_provider_id.clone(), codex_home: self.config.codex_home.to_path_buf(), - skills_watcher: Arc::clone(&self.skills_watcher), }; let request_trace = request_context.request_trace(); let config_manager = self.config_manager.clone(); @@ -1044,6 +1039,7 @@ impl ThreadRequestProcessor { .collect() }; let core_dynamic_tool_count = core_dynamic_tools.len(); + let NewThread { thread_id, thread, diff --git a/codex-rs/app-server/src/request_processors/turn_processor.rs b/codex-rs/app-server/src/request_processors/turn_processor.rs index d1dae4ef46ea..bdc5847b0d0d 100644 --- a/codex-rs/app-server/src/request_processors/turn_processor.rs +++ b/codex-rs/app-server/src/request_processors/turn_processor.rs @@ -13,7 +13,6 @@ pub(crate) struct TurnRequestProcessor { thread_state_manager: ThreadStateManager, thread_watch_manager: ThreadWatchManager, thread_list_state_permit: Arc, - skills_watcher: Arc, } impl TurnRequestProcessor { @@ -30,7 +29,6 @@ impl TurnRequestProcessor { thread_state_manager: ThreadStateManager, thread_watch_manager: ThreadWatchManager, thread_list_state_permit: Arc, - skills_watcher: Arc, ) -> Self { Self { auth_manager, @@ -44,7 +42,6 @@ impl TurnRequestProcessor { thread_state_manager, thread_watch_manager, thread_list_state_permit, - skills_watcher, } } @@ -1090,7 +1087,6 @@ impl TurnRequestProcessor { thread_list_state_permit: self.thread_list_state_permit.clone(), fallback_model_provider: self.config.model_provider_id.clone(), codex_home: self.config.codex_home.to_path_buf(), - skills_watcher: Arc::clone(&self.skills_watcher), } } diff --git a/codex-rs/app-server/src/skills_watcher.rs b/codex-rs/app-server/src/skills_watcher.rs deleted file mode 100644 index 33acf653355f..000000000000 --- a/codex-rs/app-server/src/skills_watcher.rs +++ /dev/null @@ -1,112 +0,0 @@ -use std::sync::Arc; -use std::time::Duration; - -use crate::outgoing_message::OutgoingMessageSender; -use codex_app_server_protocol::ServerNotification; -use codex_app_server_protocol::SkillsChangedNotification; -use codex_core::ThreadManager; -use codex_core::config::Config; -use codex_core::file_watcher::FileWatcher; -use codex_core::file_watcher::FileWatcherSubscriber; -use codex_core::file_watcher::Receiver; -use codex_core::file_watcher::ThrottledWatchReceiver; -use codex_core::file_watcher::WatchPath; -use codex_core::file_watcher::WatchRegistration; -use codex_core::skills::SkillsLoadInput; -use codex_core::skills::SkillsManager; -use codex_protocol::protocol::TurnEnvironmentSelection; -use tracing::warn; - -#[cfg(not(test))] -const WATCHER_THROTTLE_INTERVAL: Duration = Duration::from_secs(10); -#[cfg(test)] -const WATCHER_THROTTLE_INTERVAL: Duration = Duration::from_millis(50); - -pub(crate) struct SkillsWatcher { - subscriber: FileWatcherSubscriber, -} - -impl SkillsWatcher { - pub(crate) fn new( - skills_manager: Arc, - outgoing: Arc, - ) -> Arc { - let file_watcher = match FileWatcher::new() { - Ok(file_watcher) => Arc::new(file_watcher), - Err(err) => { - warn!("failed to initialize skills file watcher: {err}"); - Arc::new(FileWatcher::noop()) - } - }; - let (subscriber, rx) = file_watcher.add_subscriber(); - Self::spawn_event_loop(rx, skills_manager, outgoing); - Arc::new(Self { subscriber }) - } - - pub(crate) async fn register_thread_config( - &self, - config: &Config, - thread_manager: &ThreadManager, - environments: &[TurnEnvironmentSelection], - ) -> WatchRegistration { - let Some(environment_selection) = environments.first() else { - return WatchRegistration::default(); - }; - let Some(environment) = thread_manager - .environment_manager() - .get_environment(&environment_selection.environment_id) - else { - warn!( - "failed to register skills watcher for unknown environment `{}`", - environment_selection.environment_id - ); - return WatchRegistration::default(); - }; - if environment.is_remote() { - return WatchRegistration::default(); - } - - let plugins_input = config.plugins_config_input(); - let plugins_manager = thread_manager.plugins_manager(); - let plugin_outcome = plugins_manager.plugins_for_config(&plugins_input).await; - let skills_input = SkillsLoadInput::new( - config.cwd.clone(), - plugin_outcome.effective_plugin_skill_roots(), - config.config_layer_stack.clone(), - config.bundled_skills_enabled(), - ); - let roots = thread_manager - .skills_manager() - .skill_roots_for_config(&skills_input, Some(environment.get_filesystem())) - .await - .into_iter() - .map(|root| WatchPath { - path: root.path.into_path_buf(), - recursive: true, - }) - .collect(); - self.subscriber.register_paths(roots) - } - - fn spawn_event_loop( - rx: Receiver, - skills_manager: Arc, - outgoing: Arc, - ) { - let mut rx = ThrottledWatchReceiver::new(rx, WATCHER_THROTTLE_INTERVAL); - let Ok(handle) = tokio::runtime::Handle::try_current() else { - warn!("skills watcher listener skipped: no Tokio runtime available"); - return; - }; - handle.spawn(async move { - while rx.recv().await.is_some() { - skills_manager.clear_cache(); - outgoing - .send_server_notification(ServerNotification::SkillsChanged( - SkillsChangedNotification {}, - )) - .await; - } - }); - } -} diff --git a/codex-rs/app-server/src/thread_state.rs b/codex-rs/app-server/src/thread_state.rs index 9b26f6ad9003..dddbcf483b09 100644 --- a/codex-rs/app-server/src/thread_state.rs +++ b/codex-rs/app-server/src/thread_state.rs @@ -7,7 +7,6 @@ use codex_app_server_protocol::Turn; use codex_app_server_protocol::TurnError; use codex_core::CodexThread; use codex_core::ThreadConfigSnapshot; -use codex_core::file_watcher::WatchRegistration; use codex_protocol::ThreadId; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::RolloutItem; @@ -78,7 +77,6 @@ pub(crate) struct ThreadState { listener_command_tx: Option>, current_turn_history: ThreadHistoryBuilder, listener_thread: Option>, - watch_registration: WatchRegistration, } impl ThreadState { @@ -93,7 +91,6 @@ impl ThreadState { &mut self, cancel_tx: oneshot::Sender<()>, conversation: &Arc, - watch_registration: WatchRegistration, ) -> (mpsc::UnboundedReceiver, u64) { if let Some(previous) = self.cancel_tx.replace(cancel_tx) { let _ = previous.send(()); @@ -102,7 +99,6 @@ impl ThreadState { let (listener_command_tx, listener_command_rx) = mpsc::unbounded_channel(); self.listener_command_tx = Some(listener_command_tx); self.listener_thread = Some(Arc::downgrade(conversation)); - self.watch_registration = watch_registration; (listener_command_rx, self.listener_generation) } @@ -113,7 +109,6 @@ impl ThreadState { self.listener_command_tx = None; self.current_turn_history.reset(); self.listener_thread = None; - self.watch_registration = WatchRegistration::default(); } pub(crate) fn set_experimental_raw_events(&mut self, enabled: bool) { diff --git a/codex-rs/app-server/tests/suite/v2/skills_list.rs b/codex-rs/app-server/tests/suite/v2/skills_list.rs index 88960ec8506f..df8bab9364cc 100644 --- a/codex-rs/app-server/tests/suite/v2/skills_list.rs +++ b/codex-rs/app-server/tests/suite/v2/skills_list.rs @@ -661,27 +661,6 @@ async fn skills_changed_notification_is_emitted_after_skill_change() -> Result<( let mut mcp = McpProcess::new(codex_home.path()).await?; timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; - let initial_skills_request_id = mcp - .send_skills_list_request(SkillsListParams { - cwds: vec![codex_home.path().to_path_buf()], - force_reload: true, - per_cwd_extra_user_roots: None, - }) - .await?; - let initial_skills_response: JSONRPCResponse = timeout( - DEFAULT_TIMEOUT, - mcp.read_stream_until_response_message(RequestId::Integer(initial_skills_request_id)), - ) - .await??; - let SkillsListResponse { data } = to_response(initial_skills_response)?; - assert_eq!(data.len(), 1); - assert!( - data[0] - .skills - .iter() - .any(|skill| { skill.name == "demo" && skill.description == "demo description" }) - ); - let thread_start_request_id = mcp .send_thread_start_request(ThreadStartParams { model: None, @@ -734,25 +713,5 @@ async fn skills_changed_notification_is_emitted_after_skill_change() -> Result<( let notification: SkillsChangedNotification = serde_json::from_value(params)?; assert_eq!(notification, SkillsChangedNotification {}); - let updated_skills_request_id = mcp - .send_skills_list_request(SkillsListParams { - cwds: vec![codex_home.path().to_path_buf()], - force_reload: false, - per_cwd_extra_user_roots: None, - }) - .await?; - let updated_skills_response: JSONRPCResponse = timeout( - DEFAULT_TIMEOUT, - mcp.read_stream_until_response_message(RequestId::Integer(updated_skills_request_id)), - ) - .await??; - let SkillsListResponse { data } = to_response(updated_skills_response)?; - assert_eq!(data.len(), 1); - assert!( - data[0] - .skills - .iter() - .any(|skill| skill.name == "demo" && skill.description == "updated") - ); Ok(()) } diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index eb51b9dcbf4b..3ead350dfad5 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -83,6 +83,7 @@ pub(crate) async fn run_codex_thread_interactive( skills_manager: Arc::clone(&parent_session.services.skills_manager), plugins_manager: Arc::clone(&parent_session.services.plugins_manager), mcp_manager: Arc::clone(&parent_session.services.mcp_manager), + skills_watcher: Arc::clone(&parent_session.services.skills_watcher), conversation_history: initial_history.unwrap_or(InitialHistory::New), session_source: SessionSource::SubAgent(subagent_source.clone()), thread_source: Some(ThreadSource::Subagent), diff --git a/codex-rs/core/src/codex_thread.rs b/codex-rs/core/src/codex_thread.rs index b7b992e47c69..24a976637b59 100644 --- a/codex-rs/core/src/codex_thread.rs +++ b/codex-rs/core/src/codex_thread.rs @@ -1,5 +1,6 @@ use crate::agent::AgentStatus; use crate::config::ConstraintResult; +use crate::file_watcher::WatchRegistration; use crate::goals::ExternalGoalSet; use crate::goals::GoalRuntimeEvent; use crate::session::Codex; @@ -30,7 +31,6 @@ use codex_protocol::protocol::Submission; use codex_protocol::protocol::ThreadMemoryMode; use codex_protocol::protocol::ThreadSource; use codex_protocol::protocol::TokenUsageInfo; -use codex_protocol::protocol::TurnEnvironmentSelection; use codex_protocol::protocol::W3cTraceContext; use codex_protocol::user_input::UserInput; use codex_thread_store::StoredThread; @@ -101,6 +101,7 @@ pub struct CodexThread { session_configured: SessionConfiguredEvent, rollout_path: Option, out_of_band_elicitation_count: Mutex, + _watch_registration: WatchRegistration, } /// Conduit for the bidirectional stream of messages that compose a thread @@ -111,6 +112,7 @@ impl CodexThread { session_configured: SessionConfiguredEvent, rollout_path: Option, session_source: SessionSource, + watch_registration: WatchRegistration, ) -> Self { Self { codex, @@ -118,6 +120,7 @@ impl CodexThread { session_configured, rollout_path, out_of_band_elicitation_count: Mutex::new(0), + _watch_registration: watch_registration, } } @@ -461,10 +464,6 @@ impl CodexThread { self.codex.session.get_config().await } - pub async fn environment_selections(&self) -> Vec { - self.codex.thread_environment_selections().await - } - pub async fn read_mcp_resource( &self, server: &str, diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 398d44df5c2c..1a754d1ece08 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -99,6 +99,7 @@ pub(crate) use skills::manager; pub(crate) use skills::maybe_emit_implicit_skill_invocation; pub(crate) use skills::resolve_skill_dependencies_for_turn; pub(crate) use skills::skills_load_input_from_config; +mod skills_watcher; mod stream_events_utils; pub mod test_support; mod unified_exec; diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index 952b9a57113f..8782f14b3dae 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -113,7 +113,6 @@ use codex_protocol::protocol::ThreadSource; use codex_protocol::protocol::TurnAbortReason; use codex_protocol::protocol::TurnContextItem; use codex_protocol::protocol::TurnContextNetworkItem; -use codex_protocol::protocol::TurnEnvironmentSelection; use codex_protocol::protocol::W3cTraceContext; use codex_protocol::request_permissions::PermissionGrantScope; use codex_protocol::request_permissions::RequestPermissionProfile; @@ -282,6 +281,8 @@ use crate::rollout::map_session_init_error; use crate::session_startup_prewarm::SessionStartupPrewarmHandle; use crate::shell; use crate::shell_snapshot::ShellSnapshot; +use crate::skills_watcher::SkillsWatcher; +use crate::skills_watcher::SkillsWatcherEvent; use crate::state::ActiveTurn; use crate::state::MailboxDeliveryPhase; use crate::state::PendingRequestPermissions; @@ -389,6 +390,7 @@ pub(crate) struct CodexSpawnArgs { pub(crate) skills_manager: Arc, pub(crate) plugins_manager: Arc, pub(crate) mcp_manager: Arc, + pub(crate) skills_watcher: Arc, pub(crate) conversation_history: InitialHistory, pub(crate) session_source: SessionSource, pub(crate) thread_source: Option, @@ -452,6 +454,7 @@ impl Codex { skills_manager, plugins_manager, mcp_manager, + skills_watcher, conversation_history, session_source, thread_source, @@ -639,6 +642,7 @@ impl Codex { skills_manager, plugins_manager, mcp_manager.clone(), + skills_watcher, agent_control, environment_manager, analytics_events_client, @@ -773,11 +777,6 @@ impl Codex { state.session_configuration.thread_config_snapshot() } - pub(crate) async fn thread_environment_selections(&self) -> Vec { - let state = self.session.state.lock().await; - state.session_configuration.environments.clone() - } - pub(crate) fn state_db(&self) -> Option { self.session.state_db() } @@ -1002,6 +1001,29 @@ impl Session { self.out_of_band_elicitation_paused.send_replace(paused); } + fn start_skills_watcher_listener(self: &Arc) { + let mut rx = self.services.skills_watcher.subscribe(); + let weak_sess = Arc::downgrade(self); + tokio::spawn(async move { + loop { + match rx.recv().await { + Ok(SkillsWatcherEvent::SkillsChanged { .. }) => { + let Some(sess) = weak_sess.upgrade() else { + break; + }; + let event = Event { + id: sess.next_internal_sub_id(), + msg: EventMsg::SkillsUpdateAvailable, + }; + sess.send_event_raw(event).await; + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, + } + } + }); + } + pub(crate) fn get_tx_event(&self) -> Sender { self.tx_event.clone() } diff --git a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index 1110869ff5fe..2ce01e81b3ce 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -364,6 +364,7 @@ impl Session { skills_manager: Arc, plugins_manager: Arc, mcp_manager: Arc, + skills_watcher: Arc, agent_control: AgentControl, environment_manager: Arc, analytics_events_client: Option, @@ -830,6 +831,7 @@ impl Session { skills_manager, plugins_manager: Arc::clone(&plugins_manager), mcp_manager: Arc::clone(&mcp_manager), + skills_watcher, agent_control, network_proxy, network_approval: Arc::clone(&network_approval), @@ -916,6 +918,8 @@ impl Session { sess.send_event_raw(event).await; } + // Start the watcher after SessionConfigured so it cannot emit earlier events. + sess.start_skills_watcher_listener(); let mut required_mcp_servers: Vec = mcp_servers .iter() .filter(|(_, server)| server.enabled && server.required) diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index 72ce91c7a91f..cae3e1f97810 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -3594,6 +3594,7 @@ async fn session_new_fails_when_zsh_fork_enabled_without_zsh_path() { skills_manager, plugins_manager, mcp_manager, + Arc::new(SkillsWatcher::noop()), AgentControl::default(), Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), /*analytics_events_client*/ None, @@ -3710,6 +3711,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { .expect("create environment"), ); + let skills_watcher = Arc::new(SkillsWatcher::noop()); let services = SessionServices { mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::new_uninitialized( &config.permissions.approval_policy, @@ -3745,6 +3747,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { skills_manager, plugins_manager, mcp_manager, + skills_watcher, agent_control, network_proxy: None, network_approval: Arc::clone(&network_approval), @@ -3932,6 +3935,7 @@ async fn make_session_with_config_and_rx( skills_manager, plugins_manager, mcp_manager, + Arc::new(SkillsWatcher::noop()), AgentControl::default(), Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), /*analytics_events_client*/ None, @@ -4039,6 +4043,7 @@ async fn make_session_with_history_source_and_agent_control_and_rx( skills_manager, plugins_manager, mcp_manager, + Arc::new(SkillsWatcher::noop()), agent_control, Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), /*analytics_events_client*/ None, @@ -5397,6 +5402,7 @@ where ) .await .expect("state db should initialize"); + let skills_watcher = Arc::new(SkillsWatcher::noop()); let services = SessionServices { mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::new_uninitialized( &config.permissions.approval_policy, @@ -5432,6 +5438,7 @@ where skills_manager, plugins_manager, mcp_manager, + skills_watcher, agent_control, network_proxy: None, network_approval: Arc::clone(&network_approval), diff --git a/codex-rs/core/src/session/tests/guardian_tests.rs b/codex-rs/core/src/session/tests/guardian_tests.rs index af7397dea058..1026468627bc 100644 --- a/codex-rs/core/src/session/tests/guardian_tests.rs +++ b/codex-rs/core/src/session/tests/guardian_tests.rs @@ -728,6 +728,7 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() { /*bundled_skills_enabled*/ true, )); let mcp_manager = Arc::new(McpManager::new(Arc::clone(&plugins_manager))); + let skills_watcher = Arc::new(SkillsWatcher::noop()); let thread_store = Arc::new(codex_thread_store::LocalThreadStore::new( codex_thread_store::LocalThreadStoreConfig::from_config(&config), codex_state::StateRuntime::init( @@ -747,6 +748,7 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() { skills_manager, plugins_manager, mcp_manager, + skills_watcher, conversation_history: InitialHistory::New, session_source: SessionSource::SubAgent(SubAgentSource::Other( GUARDIAN_REVIEWER_NAME.to_string(), diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 4579a4147af3..effbf5372f52 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -1511,6 +1511,7 @@ pub(super) fn realtime_text_for_event(msg: &EventMsg) -> Option { | EventMsg::StreamError(_) | EventMsg::TurnDiff(_) | EventMsg::RealtimeConversationListVoicesResponse(_) + | EventMsg::SkillsUpdateAvailable | EventMsg::PlanUpdate(_) | EventMsg::TurnAborted(_) | EventMsg::ShutdownComplete diff --git a/codex-rs/core/src/skills_watcher.rs b/codex-rs/core/src/skills_watcher.rs new file mode 100644 index 000000000000..fb271ca87651 --- /dev/null +++ b/codex-rs/core/src/skills_watcher.rs @@ -0,0 +1,125 @@ +//! Skills-specific watcher built on top of the generic [`FileWatcher`]. + +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Duration; + +use tokio::runtime::Handle; +use tokio::sync::broadcast; +use tracing::warn; + +use crate::SkillsManager; +use crate::config::Config; +use crate::file_watcher::FileWatcher; +use crate::file_watcher::FileWatcherSubscriber; +use crate::file_watcher::Receiver; +use crate::file_watcher::ThrottledWatchReceiver; +use crate::file_watcher::WatchPath; +use crate::file_watcher::WatchRegistration; +use crate::skills_load_input_from_config; +use codex_core_plugins::PluginsManager; + +#[cfg(not(test))] +const WATCHER_THROTTLE_INTERVAL: Duration = Duration::from_secs(10); +#[cfg(test)] +const WATCHER_THROTTLE_INTERVAL: Duration = Duration::from_millis(50); + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum SkillsWatcherEvent { + SkillsChanged { paths: Vec }, +} + +pub(crate) struct SkillsWatcher { + subscriber: FileWatcherSubscriber, + tx: broadcast::Sender, +} + +impl SkillsWatcher { + pub(crate) fn new(file_watcher: &Arc) -> Self { + let (subscriber, rx) = file_watcher.add_subscriber(); + let (tx, _) = broadcast::channel(128); + let skills_watcher = Self { + subscriber, + tx: tx.clone(), + }; + Self::spawn_event_loop(rx, tx); + skills_watcher + } + + pub(crate) fn noop() -> Self { + Self::new(&Arc::new(FileWatcher::noop())) + } + + pub(crate) fn subscribe(&self) -> broadcast::Receiver { + self.tx.subscribe() + } + + pub(crate) async fn register_config( + &self, + config: &Config, + skills_manager: &SkillsManager, + plugins_manager: &PluginsManager, + fs: Option>, + ) -> WatchRegistration { + let plugins_input = config.plugins_config_input(); + let plugin_outcome = plugins_manager.plugins_for_config(&plugins_input).await; + let effective_skill_roots = plugin_outcome.effective_plugin_skill_roots(); + let skills_input = skills_load_input_from_config(config, effective_skill_roots); + let roots = skills_manager + .skill_roots_for_config(&skills_input, fs) + .await + .into_iter() + .map(|root| WatchPath { + path: root.path.into_path_buf(), + recursive: true, + }) + .collect(); + self.subscriber.register_paths(roots) + } + + fn spawn_event_loop(rx: Receiver, tx: broadcast::Sender) { + let mut rx = ThrottledWatchReceiver::new(rx, WATCHER_THROTTLE_INTERVAL); + if let Ok(handle) = Handle::try_current() { + handle.spawn(async move { + while let Some(event) = rx.recv().await { + let _ = tx.send(SkillsWatcherEvent::SkillsChanged { paths: event.paths }); + } + }); + } else { + warn!("skills watcher listener skipped: no Tokio runtime available"); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + use tokio::time::Duration; + use tokio::time::timeout; + + #[tokio::test] + async fn forwards_file_watcher_events() { + let file_watcher = Arc::new(FileWatcher::noop()); + let skills_watcher = SkillsWatcher::new(&file_watcher); + let mut rx = skills_watcher.subscribe(); + let _registration = skills_watcher + .subscriber + .register_path(PathBuf::from("/tmp/skill"), /*recursive*/ true); + + file_watcher + .send_paths_for_test(vec![PathBuf::from("/tmp/skill/SKILL.md")]) + .await; + + let event = timeout(Duration::from_secs(2), rx.recv()) + .await + .expect("skills watcher event") + .expect("broadcast recv"); + assert_eq!( + event, + SkillsWatcherEvent::SkillsChanged { + paths: vec![PathBuf::from("/tmp/skill/SKILL.md")], + } + ); + } +} diff --git a/codex-rs/core/src/state/service.rs b/codex-rs/core/src/state/service.rs index 64a1810740c8..9cd9e97fbba7 100644 --- a/codex-rs/core/src/state/service.rs +++ b/codex-rs/core/src/state/service.rs @@ -9,6 +9,7 @@ use crate::exec_policy::ExecPolicyManager; use crate::guardian::GuardianRejection; use crate::guardian::GuardianRejectionCircuitBreaker; use crate::mcp::McpManager; +use crate::skills_watcher::SkillsWatcher; use crate::tools::code_mode::CodeModeService; use crate::tools::network_approval::NetworkApprovalService; use crate::tools::sandboxing::ApprovalStore; @@ -58,6 +59,7 @@ pub(crate) struct SessionServices { pub(crate) skills_manager: Arc, pub(crate) plugins_manager: Arc, pub(crate) mcp_manager: Arc, + pub(crate) skills_watcher: Arc, pub(crate) agent_control: AgentControl, pub(crate) network_proxy: Option, pub(crate) network_approval: Arc, diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index b9ee319feb0c..331ed3ca15c7 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -5,6 +5,7 @@ use crate::config::Config; use crate::config::ThreadStoreConfig; use crate::environment_selection::default_thread_environment_selections; use crate::environment_selection::resolve_environment_selections; +use crate::file_watcher::FileWatcher; use crate::mcp::McpManager; use crate::resolve_installation_id; use crate::rollout::RolloutRecorder; @@ -14,6 +15,8 @@ use crate::session::CodexSpawnArgs; use crate::session::CodexSpawnOk; use crate::session::INITIAL_SUBMIT_ID; use crate::shell_snapshot::ShellSnapshot; +use crate::skills_watcher::SkillsWatcher; +use crate::skills_watcher::SkillsWatcherEvent; use crate::tasks::InterruptedTurnHistoryMarker; use crate::tasks::interrupted_turn_history_marker; use codex_agent_graph_store::AgentGraphStore; @@ -70,6 +73,8 @@ use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::time::Duration; +use tokio::runtime::Handle; +use tokio::runtime::RuntimeFlavor; use tokio::sync::RwLock; use tokio::sync::broadcast; use tracing::warn; @@ -103,6 +108,47 @@ impl Drop for TempCodexHomeGuard { } } +fn build_skills_watcher(skills_manager: Arc) -> Arc { + if should_use_test_thread_manager_behavior() + && let Ok(handle) = Handle::try_current() + && handle.runtime_flavor() == RuntimeFlavor::CurrentThread + { + // The real watcher spins background tasks that can starve the + // current-thread test runtime and cause event waits to time out. + warn!("using noop skills watcher under current-thread test runtime"); + return Arc::new(SkillsWatcher::noop()); + } + + let file_watcher = match FileWatcher::new() { + Ok(file_watcher) => Arc::new(file_watcher), + Err(err) => { + warn!("failed to initialize file watcher: {err}"); + Arc::new(FileWatcher::noop()) + } + }; + let skills_watcher = Arc::new(SkillsWatcher::new(&file_watcher)); + + let mut rx = skills_watcher.subscribe(); + let skills_manager = Arc::clone(&skills_manager); + if let Ok(handle) = Handle::try_current() { + handle.spawn(async move { + loop { + match rx.recv().await { + Ok(SkillsWatcherEvent::SkillsChanged { .. }) => { + skills_manager.clear_cache(); + } + Err(broadcast::error::RecvError::Closed) => break, + Err(broadcast::error::RecvError::Lagged(_)) => continue, + } + } + }); + } else { + warn!("skills watcher listener skipped: no Tokio runtime available"); + } + + skills_watcher +} + /// Represents a newly created Codex thread (formerly called a conversation), including the first event /// (which is [`EventMsg::SessionConfigured`]). pub struct NewThread { @@ -203,6 +249,7 @@ pub(crate) struct ThreadManagerState { skills_manager: Arc, plugins_manager: Arc, mcp_manager: Arc, + skills_watcher: Arc, thread_store: Arc, state_db: StateDbHandle, agent_graph_store: Arc, @@ -286,6 +333,7 @@ impl ThreadManager { config.bundled_skills_enabled(), restriction_product, )); + let skills_watcher = build_skills_watcher(Arc::clone(&skills_manager)); Self { state: Arc::new(ThreadManagerState { threads: Arc::new(RwLock::new(HashMap::new())), @@ -295,6 +343,7 @@ impl ThreadManager { skills_manager, plugins_manager, mcp_manager, + skills_watcher, thread_store, state_db, agent_graph_store, @@ -403,6 +452,7 @@ impl ThreadManager { /*bundled_skills_enabled*/ true, restriction_product, )); + let skills_watcher = build_skills_watcher(Arc::clone(&skills_manager)); // This test constructor has no Config input. Tests that need a non-local // process store should construct ThreadManager::new with an explicit store. let thread_store: Arc = Arc::new(LocalThreadStore::new( @@ -423,6 +473,7 @@ impl ThreadManager { skills_manager, plugins_manager, mcp_manager, + skills_watcher, thread_store, state_db, agent_graph_store, @@ -1148,6 +1199,19 @@ impl ThreadManagerState { } let environment_selections = resolve_environment_selections(self.environment_manager.as_ref(), &environments)?; + let watch_registration = match environment_selections.primary() { + Some(turn_environment) if !turn_environment.environment.is_remote() => { + self.skills_watcher + .register_config( + &config, + self.skills_manager.as_ref(), + self.plugins_manager.as_ref(), + Some(turn_environment.environment.get_filesystem()), + ) + .await + } + Some(_) | None => crate::file_watcher::WatchRegistration::default(), + }; let parent_rollout_thread_trace = self .parent_rollout_thread_trace_for_source(&session_source, &initial_history) .await; @@ -1163,6 +1227,7 @@ impl ThreadManagerState { skills_manager: Arc::clone(&self.skills_manager), plugins_manager: Arc::clone(&self.plugins_manager), mcp_manager: Arc::clone(&self.mcp_manager), + skills_watcher: Arc::clone(&self.skills_watcher), conversation_history: initial_history, session_source, thread_source, @@ -1182,7 +1247,7 @@ impl ThreadManagerState { }) .await?; let new_thread = self - .finalize_thread_spawn(codex, thread_id, tracked_session_source) + .finalize_thread_spawn(codex, thread_id, tracked_session_source, watch_registration) .await?; if is_resumed_thread && let Err(err) = new_thread.thread.apply_goal_resume_runtime_effects().await @@ -1197,6 +1262,7 @@ impl ThreadManagerState { codex: Codex, thread_id: ThreadId, session_source: SessionSource, + watch_registration: crate::file_watcher::WatchRegistration, ) -> CodexResult { let event = codex.next_event().await?; let session_configured = match event { @@ -1217,6 +1283,7 @@ impl ThreadManagerState { session_configured.clone(), session_configured.rollout_path.clone(), session_source, + watch_registration, )); e.insert(thread.clone()); return Ok(NewThread { diff --git a/codex-rs/core/tests/suite/live_reload.rs b/codex-rs/core/tests/suite/live_reload.rs new file mode 100644 index 000000000000..c422073e4f29 --- /dev/null +++ b/codex-rs/core/tests/suite/live_reload.rs @@ -0,0 +1,157 @@ +#![allow(clippy::expect_used, clippy::unwrap_used)] + +use std::fs; +use std::path::Path; +use std::path::PathBuf; +use std::time::Duration; + +use anyhow::Result; +use codex_config::config_toml::ProjectConfig; +use codex_protocol::config_types::TrustLevel; +use codex_protocol::models::PermissionProfile; +use codex_protocol::protocol::AskForApproval; +use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::Op; +use codex_protocol::user_input::UserInput; +use core_test_support::responses; +use core_test_support::responses::ResponsesRequest; +use core_test_support::responses::mount_sse_sequence; +use core_test_support::responses::start_mock_server; +use core_test_support::test_codex::TestCodex; +use core_test_support::test_codex::test_codex; +use core_test_support::test_codex::turn_permission_fields; +use core_test_support::wait_for_event; +use tokio::time::timeout; + +fn enable_trusted_project(config: &mut codex_core::config::Config) { + config.active_project = ProjectConfig { + trust_level: Some(TrustLevel::Trusted), + }; +} + +fn write_skill(home: &Path, name: &str, description: &str, body: &str) -> PathBuf { + let skill_dir = home.join("skills").join(name); + fs::create_dir_all(&skill_dir).expect("create skill dir"); + let contents = format!("---\nname: {name}\ndescription: {description}\n---\n\n{body}\n"); + let path = skill_dir.join("SKILL.md"); + fs::write(&path, contents).expect("write skill"); + path +} + +fn contains_skill_body(request: &ResponsesRequest, skill_body: &str) -> bool { + request + .message_input_texts("user") + .iter() + .any(|text| text.contains(skill_body) && text.contains("")) +} + +async fn submit_skill_turn(test: &TestCodex, skill_path: PathBuf, prompt: &str) -> Result<()> { + let session_model = test.session_configured.model.clone(); + let (sandbox_policy, permission_profile) = + turn_permission_fields(PermissionProfile::Disabled, test.cwd_path()); + test.codex + .submit(Op::UserTurn { + environments: None, + items: vec![ + UserInput::Text { + text: prompt.to_string(), + text_elements: Vec::new(), + }, + UserInput::Skill { + name: "demo".to_string(), + path: skill_path, + }, + ], + final_output_json_schema: None, + cwd: test.cwd_path().to_path_buf(), + approval_policy: AskForApproval::Never, + approvals_reviewer: None, + sandbox_policy, + permission_profile, + model: session_model, + effort: None, + summary: None, + service_tier: None, + collaboration_mode: None, + personality: None, + }) + .await?; + + wait_for_event(test.codex.as_ref(), |event| { + matches!(event, EventMsg::TurnComplete(_)) + }) + .await; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn live_skills_reload_refreshes_skill_cache_after_skill_change() -> Result<()> { + let server = start_mock_server().await; + let responses = mount_sse_sequence( + &server, + vec![ + responses::sse(vec![responses::ev_completed("resp-1")]), + responses::sse(vec![responses::ev_completed("resp-2")]), + ], + ) + .await; + + let skill_v1 = "skill body v1"; + let skill_v2 = "skill body v2"; + let mut builder = test_codex() + .with_pre_build_hook(move |home| { + write_skill(home, "demo", "demo skill", skill_v1); + }) + .with_config(|config| { + enable_trusted_project(config); + }); + let test = builder.build(&server).await?; + + let skill_path = dunce::canonicalize(test.codex_home_path().join("skills/demo/SKILL.md"))?; + + submit_skill_turn(&test, skill_path.clone(), "please use $demo").await?; + let first_request = responses + .requests() + .first() + .cloned() + .expect("first request captured"); + assert!( + contains_skill_body(&first_request, skill_v1), + "expected initial skill body in request" + ); + + write_skill(test.codex_home_path(), "demo", "demo skill", skill_v2); + + let saw_skills_update = timeout(Duration::from_secs(5), async { + loop { + match test.codex.next_event().await { + Ok(event) => { + if matches!(event.msg, EventMsg::SkillsUpdateAvailable) { + break; + } + } + Err(err) => panic!("event stream ended unexpectedly: {err}"), + } + } + }) + .await; + + if saw_skills_update.is_err() { + // Some environments do not reliably surface file watcher events for + // skill changes. Clear the cache explicitly so we can still validate + // that the updated skill body is injected on the next turn. + test.thread_manager.skills_manager().clear_cache(); + } + + submit_skill_turn(&test, skill_path.clone(), "please use $demo again").await?; + let last_request = responses + .last_request() + .expect("request captured after skill update"); + + assert!( + contains_skill_body(&last_request, skill_v2), + "expected updated skill body after reload" + ); + + Ok(()) +} diff --git a/codex-rs/core/tests/suite/mod.rs b/codex-rs/core/tests/suite/mod.rs index a914af3021f7..ad3280ebf080 100644 --- a/codex-rs/core/tests/suite/mod.rs +++ b/codex-rs/core/tests/suite/mod.rs @@ -57,6 +57,7 @@ mod image_rollout; mod items; mod json_result; mod live_cli; +mod live_reload; mod model_overrides; mod model_switching; mod model_visible_layout; diff --git a/codex-rs/mcp-server/src/codex_tool_runner.rs b/codex-rs/mcp-server/src/codex_tool_runner.rs index 00611b3a8c98..b462022dbcb5 100644 --- a/codex-rs/mcp-server/src/codex_tool_runner.rs +++ b/codex-rs/mcp-server/src/codex_tool_runner.rs @@ -362,6 +362,7 @@ async fn run_codex_tool_session_inner( | EventMsg::AgentMessageContentDelta(_) | EventMsg::ReasoningContentDelta(_) | EventMsg::ReasoningRawContentDelta(_) + | EventMsg::SkillsUpdateAvailable | EventMsg::ExitedReviewMode(_) | EventMsg::RequestUserInput(_) | EventMsg::RequestPermissions(_) diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 91fd02d8583e..30e33abe434a 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -1401,6 +1401,9 @@ pub enum EventMsg { /// List of voices supported by realtime conversation streams. RealtimeConversationListVoicesResponse(RealtimeConversationListVoicesResponseEvent), + /// Notification that skill data may have been updated and clients may want to reload. + SkillsUpdateAvailable, + PlanUpdate(UpdatePlanArgs), TurnAborted(TurnAbortedEvent), diff --git a/codex-rs/rollout-trace/src/protocol_event.rs b/codex-rs/rollout-trace/src/protocol_event.rs index 1e49c82be246..3d52798b8d20 100644 --- a/codex-rs/rollout-trace/src/protocol_event.rs +++ b/codex-rs/rollout-trace/src/protocol_event.rs @@ -260,6 +260,7 @@ pub(crate) fn tool_runtime_trace_event(event: &EventMsg) -> Option Option<&'static s | EventMsg::PatchApplyEnd(_) | EventMsg::TurnDiff(_) | EventMsg::RealtimeConversationListVoicesResponse(_) + | EventMsg::SkillsUpdateAvailable | EventMsg::PlanUpdate(_) | EventMsg::EnteredReviewMode(_) | EventMsg::ExitedReviewMode(_) diff --git a/codex-rs/rollout/src/policy.rs b/codex-rs/rollout/src/policy.rs index 21b98b4e8d07..558c3fef9887 100644 --- a/codex-rs/rollout/src/policy.rs +++ b/codex-rs/rollout/src/policy.rs @@ -169,6 +169,7 @@ fn event_msg_persistence_mode(ev: &EventMsg) -> Option { | EventMsg::ReasoningContentDelta(_) | EventMsg::ReasoningRawContentDelta(_) | EventMsg::ImageGenerationBegin(_) + | EventMsg::SkillsUpdateAvailable | EventMsg::CollabAgentSpawnBegin(_) | EventMsg::CollabAgentInteractionBegin(_) | EventMsg::CollabWaitingBegin(_)