Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions codex-rs/app-server/src/bespoke_event_handling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use codex_app_server_protocol::RawResponseItemCompletedNotification;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_app_server_protocol::ServerRequestPayload;
use codex_app_server_protocol::SkillsChangedNotification;
use codex_app_server_protocol::ThreadGoalUpdatedNotification;
use codex_app_server_protocol::ThreadItem;
use codex_app_server_protocol::ThreadRealtimeClosedNotification;
Expand Down Expand Up @@ -193,6 +194,13 @@ pub(crate) async fn apply_bespoke_event_handling(
)
.await;
}
EventMsg::SkillsUpdateAvailable => {
outgoing
.send_server_notification(ServerNotification::SkillsChanged(
SkillsChangedNotification {},
))
.await;
}
EventMsg::McpStartupUpdate(update) => {
let (status, error) = match update.status {
codex_protocol::protocol::McpStartupStatus::Starting => {
Expand Down
1 change: 0 additions & 1 deletion codex-rs/app-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ mod outgoing_message;
mod request_processors;
mod request_serialization;
mod server_request_error;
mod skills_watcher;
mod thread_state;
mod thread_status;
mod transport;
Expand Down
4 changes: 0 additions & 4 deletions codex-rs/app-server/src/message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use crate::request_processors::WindowsSandboxRequestProcessor;
use crate::request_serialization::QueuedInitializedRequest;
use crate::request_serialization::RequestSerializationQueueKey;
use crate::request_serialization::RequestSerializationQueues;
use crate::skills_watcher::SkillsWatcher;
use crate::thread_state::ThreadStateManager;
use crate::transport::AppServerTransport;
use crate::transport::ConnectionOrigin;
Expand Down Expand Up @@ -310,7 +309,6 @@ impl MessageProcessor {
thread_manager
.plugins_manager()
.set_analytics_events_client(analytics_events_client.clone());
let skills_watcher = SkillsWatcher::new(thread_manager.skills_manager(), outgoing.clone());

let pending_thread_unloads = Arc::new(Mutex::new(HashSet::new()));
let thread_state_manager = ThreadStateManager::new();
Expand Down Expand Up @@ -403,7 +401,6 @@ impl MessageProcessor {
Arc::clone(&thread_list_state_permit),
thread_goal_processor.clone(),
Some(state_db.clone()),
Arc::clone(&skills_watcher),
);
let turn_processor = TurnRequestProcessor::new(
auth_manager.clone(),
Expand All @@ -417,7 +414,6 @@ impl MessageProcessor {
thread_state_manager,
thread_watch_manager,
thread_list_state_permit,
Arc::clone(&skills_watcher),
);
if matches!(plugin_startup_tasks, crate::PluginStartupTasks::Start) {
// Keep plugin startup warmups aligned at app-server startup.
Expand Down
1 change: 0 additions & 1 deletion codex-rs/app-server/src/request_processors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use crate::outgoing_message::ConnectionRequestId;
use crate::outgoing_message::OutgoingMessageSender;
use crate::outgoing_message::RequestContext;
use crate::outgoing_message::ThreadScopedOutgoingMessageSender;
use crate::skills_watcher::SkillsWatcher;
use crate::thread_status::ThreadWatchManager;
use crate::thread_status::resolve_thread_status;
use chrono::DateTime;
Expand Down
14 changes: 1 addition & 13 deletions codex-rs/app-server/src/request_processors/thread_lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ pub(super) struct ListenerTaskContext {
pub(super) thread_list_state_permit: Arc<Semaphore>,
pub(super) fallback_model_provider: String,
pub(super) codex_home: PathBuf,
pub(super) skills_watcher: Arc<SkillsWatcher>,
}

struct UnloadingState {
Expand Down Expand Up @@ -227,22 +226,12 @@ pub(super) async fn ensure_listener_task_running(
"thread {conversation_id} is closing; retry after the thread is closed"
)));
};
let config = conversation.config().await;
let environments = conversation.environment_selections().await;
let watch_registration = listener_task_context
.skills_watcher
.register_thread_config(
config.as_ref(),
listener_task_context.thread_manager.as_ref(),
&environments,
)
.await;
let (mut listener_command_rx, listener_generation) = {
let mut thread_state = thread_state.lock().await;
if thread_state.listener_matches(&conversation) {
return Ok(());
}
thread_state.set_listener(cancel_tx, &conversation, watch_registration)
thread_state.set_listener(cancel_tx, &conversation)
};
let ListenerTaskContext {
outgoing,
Expand All @@ -253,7 +242,6 @@ pub(super) async fn ensure_listener_task_running(
thread_list_state_permit,
fallback_model_provider,
codex_home,
..
} = listener_task_context;
let outgoing_for_task = Arc::clone(&outgoing);
tokio::spawn(async move {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,6 @@ pub(crate) struct ThreadRequestProcessor {
pub(super) thread_goal_processor: ThreadGoalRequestProcessor,
pub(super) state_db: Option<StateDbHandle>,
pub(super) background_tasks: TaskTracker,
pub(super) skills_watcher: Arc<SkillsWatcher>,
}

impl ThreadRequestProcessor {
Expand All @@ -335,7 +334,6 @@ impl ThreadRequestProcessor {
thread_list_state_permit: Arc<Semaphore>,
thread_goal_processor: ThreadGoalRequestProcessor,
state_db: Option<StateDbHandle>,
skills_watcher: Arc<SkillsWatcher>,
) -> Self {
Self {
auth_manager,
Expand All @@ -352,7 +350,6 @@ impl ThreadRequestProcessor {
thread_goal_processor,
state_db,
background_tasks: TaskTracker::new(),
skills_watcher,
}
}

Expand Down Expand Up @@ -745,7 +742,6 @@ impl ThreadRequestProcessor {
thread_list_state_permit: self.thread_list_state_permit.clone(),
fallback_model_provider: self.config.model_provider_id.clone(),
codex_home: self.config.codex_home.to_path_buf(),
skills_watcher: Arc::clone(&self.skills_watcher),
}
}

Expand Down Expand Up @@ -843,7 +839,6 @@ impl ThreadRequestProcessor {
thread_list_state_permit: self.thread_list_state_permit.clone(),
fallback_model_provider: self.config.model_provider_id.clone(),
codex_home: self.config.codex_home.to_path_buf(),
skills_watcher: Arc::clone(&self.skills_watcher),
};
let request_trace = request_context.request_trace();
let config_manager = self.config_manager.clone();
Expand Down Expand Up @@ -1044,6 +1039,7 @@ impl ThreadRequestProcessor {
.collect()
};
let core_dynamic_tool_count = core_dynamic_tools.len();

let NewThread {
thread_id,
thread,
Expand Down
4 changes: 0 additions & 4 deletions codex-rs/app-server/src/request_processors/turn_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ pub(crate) struct TurnRequestProcessor {
thread_state_manager: ThreadStateManager,
thread_watch_manager: ThreadWatchManager,
thread_list_state_permit: Arc<Semaphore>,
skills_watcher: Arc<SkillsWatcher>,
}

impl TurnRequestProcessor {
Expand All @@ -30,7 +29,6 @@ impl TurnRequestProcessor {
thread_state_manager: ThreadStateManager,
thread_watch_manager: ThreadWatchManager,
thread_list_state_permit: Arc<Semaphore>,
skills_watcher: Arc<SkillsWatcher>,
) -> Self {
Self {
auth_manager,
Expand All @@ -44,7 +42,6 @@ impl TurnRequestProcessor {
thread_state_manager,
thread_watch_manager,
thread_list_state_permit,
skills_watcher,
}
}

Expand Down Expand Up @@ -1090,7 +1087,6 @@ impl TurnRequestProcessor {
thread_list_state_permit: self.thread_list_state_permit.clone(),
fallback_model_provider: self.config.model_provider_id.clone(),
codex_home: self.config.codex_home.to_path_buf(),
skills_watcher: Arc::clone(&self.skills_watcher),
}
}

Expand Down
112 changes: 0 additions & 112 deletions codex-rs/app-server/src/skills_watcher.rs

This file was deleted.

5 changes: 0 additions & 5 deletions codex-rs/app-server/src/thread_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use codex_app_server_protocol::Turn;
use codex_app_server_protocol::TurnError;
use codex_core::CodexThread;
use codex_core::ThreadConfigSnapshot;
use codex_core::file_watcher::WatchRegistration;
use codex_protocol::ThreadId;
use codex_protocol::protocol::EventMsg;
use codex_protocol::protocol::RolloutItem;
Expand Down Expand Up @@ -78,7 +77,6 @@ pub(crate) struct ThreadState {
listener_command_tx: Option<mpsc::UnboundedSender<ThreadListenerCommand>>,
current_turn_history: ThreadHistoryBuilder,
listener_thread: Option<Weak<CodexThread>>,
watch_registration: WatchRegistration,
}

impl ThreadState {
Expand All @@ -93,7 +91,6 @@ impl ThreadState {
&mut self,
cancel_tx: oneshot::Sender<()>,
conversation: &Arc<CodexThread>,
watch_registration: WatchRegistration,
) -> (mpsc::UnboundedReceiver<ThreadListenerCommand>, u64) {
if let Some(previous) = self.cancel_tx.replace(cancel_tx) {
let _ = previous.send(());
Expand All @@ -102,7 +99,6 @@ impl ThreadState {
let (listener_command_tx, listener_command_rx) = mpsc::unbounded_channel();
self.listener_command_tx = Some(listener_command_tx);
self.listener_thread = Some(Arc::downgrade(conversation));
self.watch_registration = watch_registration;
(listener_command_rx, self.listener_generation)
}

Expand All @@ -113,7 +109,6 @@ impl ThreadState {
self.listener_command_tx = None;
self.current_turn_history.reset();
self.listener_thread = None;
self.watch_registration = WatchRegistration::default();
}

pub(crate) fn set_experimental_raw_events(&mut self, enabled: bool) {
Expand Down
41 changes: 0 additions & 41 deletions codex-rs/app-server/tests/suite/v2/skills_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -661,27 +661,6 @@ async fn skills_changed_notification_is_emitted_after_skill_change() -> Result<(

let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let initial_skills_request_id = mcp
.send_skills_list_request(SkillsListParams {
cwds: vec![codex_home.path().to_path_buf()],
force_reload: true,
per_cwd_extra_user_roots: None,
})
.await?;
let initial_skills_response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(initial_skills_request_id)),
)
.await??;
let SkillsListResponse { data } = to_response(initial_skills_response)?;
assert_eq!(data.len(), 1);
assert!(
data[0]
.skills
.iter()
.any(|skill| { skill.name == "demo" && skill.description == "demo description" })
);

let thread_start_request_id = mcp
.send_thread_start_request(ThreadStartParams {
model: None,
Expand Down Expand Up @@ -734,25 +713,5 @@ async fn skills_changed_notification_is_emitted_after_skill_change() -> Result<(
let notification: SkillsChangedNotification = serde_json::from_value(params)?;

assert_eq!(notification, SkillsChangedNotification {});
let updated_skills_request_id = mcp
.send_skills_list_request(SkillsListParams {
cwds: vec![codex_home.path().to_path_buf()],
force_reload: false,
per_cwd_extra_user_roots: None,
})
.await?;
let updated_skills_response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(updated_skills_request_id)),
)
.await??;
let SkillsListResponse { data } = to_response(updated_skills_response)?;
assert_eq!(data.len(), 1);
assert!(
data[0]
.skills
.iter()
.any(|skill| skill.name == "demo" && skill.description == "updated")
);
Ok(())
}
Loading
Loading