From 1f82d88698fa59d39128c508121a3dc6b15b0f0c Mon Sep 17 00:00:00 2001 From: Benjamin Riley Zimmerman Date: Wed, 8 Apr 2026 23:21:57 -0700 Subject: [PATCH] . --- crates/forge_app/src/mcp_executor.rs | 11 +- crates/forge_app/src/services.rs | 13 +- crates/forge_domain/src/mcp.rs | 40 + crates/forge_main/src/info.rs | 2 +- crates/forge_services/src/mcp/lazy_client.rs | 113 +++ crates/forge_services/src/mcp/mod.rs | 1 + crates/forge_services/src/mcp/service.rs | 871 ++++++++++++++++--- 7 files changed, 945 insertions(+), 106 deletions(-) create mode 100644 crates/forge_services/src/mcp/lazy_client.rs diff --git a/crates/forge_app/src/mcp_executor.rs b/crates/forge_app/src/mcp_executor.rs index e058999622..18c944e7ea 100644 --- a/crates/forge_app/src/mcp_executor.rs +++ b/crates/forge_app/src/mcp_executor.rs @@ -25,11 +25,12 @@ impl McpExecutor { self.services.execute_mcp(input).await } + /// Check whether `tool_name` belongs to any MCP server. + /// + /// This is a pure in-memory check that does NOT connect to any server. + /// Tool names are known either because the server connected during a + /// previous call, or because they were declared statically in the config. 
pub async fn contains_tool(&self, tool_name: &ToolName) -> anyhow::Result { - let mcp_servers = self.services.get_mcp_servers().await?; - Ok(mcp_servers - .get_servers() - .values() - .any(|tools| tools.iter().any(|tool| tool.name == *tool_name))) + self.services.contains_mcp_tool(tool_name).await } } diff --git a/crates/forge_app/src/services.rs b/crates/forge_app/src/services.rs index 9cf7a12c89..676775b10d 100644 --- a/crates/forge_app/src/services.rs +++ b/crates/forge_app/src/services.rs @@ -7,7 +7,7 @@ use forge_domain::{ AgentId, AnyProvider, Attachment, AuthContextRequest, AuthContextResponse, AuthMethod, ChatCompletionMessage, CommandOutput, Context, Conversation, ConversationId, File, FileInfo, FileStatus, Image, McpConfig, McpServers, Model, ModelId, Node, Provider, ProviderId, - ResultStream, Scope, SearchParams, SyncProgress, SyntaxError, Template, ToolCallFull, + ResultStream, Scope, SearchParams, SyncProgress, SyntaxError, Template, ToolCallFull, ToolName, ToolOutput, WorkspaceAuth, WorkspaceId, WorkspaceInfo, }; use reqwest::Response; @@ -250,6 +250,13 @@ pub trait McpService: Send + Sync { async fn execute_mcp(&self, call: ToolCallFull) -> anyhow::Result; /// Refresh the MCP cache by fetching fresh data async fn reload_mcp(&self) -> anyhow::Result<()>; + /// Check whether a tool name belongs to any configured MCP server. + /// + /// This is intentionally a pure in-memory check: it does NOT establish + /// a live connection to any server. Tool names are known either because + /// the server already connected during a previous call, or because they + /// were declared statically in the MCP config. 
+ async fn contains_mcp_tool(&self, tool_name: &ToolName) -> anyhow::Result; } #[async_trait::async_trait] @@ -712,6 +719,10 @@ impl McpService for I { async fn reload_mcp(&self) -> anyhow::Result<()> { self.mcp_service().reload_mcp().await } + + async fn contains_mcp_tool(&self, tool_name: &ToolName) -> anyhow::Result { + self.mcp_service().contains_mcp_tool(tool_name).await + } } #[async_trait::async_trait] diff --git a/crates/forge_domain/src/mcp.rs b/crates/forge_domain/src/mcp.rs index 75a15aedc4..4caf7e8d40 100644 --- a/crates/forge_domain/src/mcp.rs +++ b/crates/forge_domain/src/mcp.rs @@ -35,6 +35,7 @@ impl McpServerConfig { env: env.unwrap_or_default(), timeout: None, disable: false, + tools: Vec::new(), }) } @@ -45,6 +46,7 @@ impl McpServerConfig { headers: BTreeMap::new(), timeout: None, disable: false, + tools: Vec::new(), }) } @@ -62,6 +64,22 @@ impl McpServerConfig { McpServerConfig::Http(_) => "HTTP", } } + + /// Returns the statically-declared tool names for this server, if any. + /// + /// Returns `None` when no tools have been declared in the config, meaning + /// the real tool list is only known after a live connection is established. + pub fn declared_tools(&self) -> Option<&[String]> { + let tools = match self { + McpServerConfig::Stdio(s) => &s.tools, + McpServerConfig::Http(h) => &h.tools, + }; + if tools.is_empty() { + None + } else { + Some(tools.as_slice()) + } + } } #[derive(Default, Debug, Clone, Serialize, Deserialize, Setters, PartialEq, Hash)] @@ -88,6 +106,17 @@ pub struct McpStdioServer { /// remove it from the config. #[serde(default)] pub disable: bool, + + /// Optional static declaration of tool names exposed by this server. + /// + /// When present, Forge populates the system-prompt tool list from these + /// names **without** establishing a live connection. The server is only + /// connected to when one of its tools is actually invoked. 
+ /// + /// When absent, the server's tools are unknown until first use and will + /// not appear in the system prompt until a tool from this server is called. + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub tools: Vec, } #[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Hash)] @@ -110,6 +139,17 @@ pub struct McpHttpServer { /// remove it from the config. #[serde(default)] pub disable: bool, + + /// Optional static declaration of tool names exposed by this server. + /// + /// When present, Forge populates the system-prompt tool list from these + /// names **without** establishing a live connection. The server is only + /// connected to when one of its tools is actually invoked. + /// + /// When absent, the server's tools are unknown until first use and will + /// not appear in the system prompt until a tool from this server is called. + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub tools: Vec, } impl McpHttpServer {} diff --git a/crates/forge_main/src/info.rs b/crates/forge_main/src/info.rs index b0815a8799..074e8e9711 100644 --- a/crates/forge_main/src/info.rs +++ b/crates/forge_main/src/info.rs @@ -75,7 +75,7 @@ impl Section { /// # Output Format /// /// ```text -/// +/// /// CONFIGURATION /// model gpt-4 /// provider openai diff --git a/crates/forge_services/src/mcp/lazy_client.rs b/crates/forge_services/src/mcp/lazy_client.rs new file mode 100644 index 0000000000..765c316d8c --- /dev/null +++ b/crates/forge_services/src/mcp/lazy_client.rs @@ -0,0 +1,113 @@ +//! Lazy MCP client that defers connection until a tool is actually called. +//! +//! During discovery (building the tool list for the system prompt) Forge only +//! needs to know *which* tools a server exposes, not to hold a live connection +//! to it. `LazyMcpClient` separates these two concerns: +//! +//! - **Discovery**: the client is constructed from config without any network +//! I/O. 
Tool names and schemas come either from statically-declared tools in +//! the MCP config, or are left unknown until the first call. +//! - **Execution**: on the first `list()` or `call()`, the real +//! `McpServerInfra::Client` is initialised via `connect()`. Subsequent +//! calls reuse the same underlying client. +//! +//! Thread-safety is guaranteed by [`tokio::sync::OnceCell`]: even if two +//! concurrent callers race to initialise the connection, only one +//! initialisation will run. + +use std::collections::BTreeMap; +use std::sync::Arc; + +use forge_app::{McpClientInfra, McpServerInfra}; +use forge_domain::{McpServerConfig, ToolDefinition, ToolName, ToolOutput}; +use tokio::sync::OnceCell; + +/// A lazily-initialised MCP client. +/// +/// Holds the configuration needed to connect to an MCP server and defers +/// the actual connection until [`list`] or [`call`] is first invoked. +pub(crate) struct LazyMcpClient { + config: McpServerConfig, + env_vars: BTreeMap, + infra: Arc, + /// The real client, initialised on first use. + inner: Arc>, +} + +impl Clone for LazyMcpClient { + fn clone(&self) -> Self { + Self { + config: self.config.clone(), + env_vars: self.env_vars.clone(), + infra: self.infra.clone(), + // Share the same OnceCell so all clones see the same connection. + inner: self.inner.clone(), + } + } +} + +impl LazyMcpClient { + pub(crate) fn new( + config: McpServerConfig, + env_vars: BTreeMap, + infra: Arc, + ) -> Self { + Self { config, env_vars, infra, inner: Arc::new(OnceCell::new()) } + } + + /// Ensure the inner client is initialised and return a reference to it. + async fn client(&self) -> anyhow::Result<&I::Client> { + self.inner + .get_or_try_init(|| async { + self.infra + .connect(self.config.clone(), &self.env_vars) + .await + }) + .await + } + + /// Consume the lazy client and return the initialised inner client. + /// + /// Prefers taking sole ownership via `Arc::try_unwrap` when this is the + /// last holder of the inner `Arc`. 
When other holders still exist (e.g., + /// a clone kept alive in `pending_servers` at call time), it falls back to + /// cloning the already-initialised inner value — the two resulting handles + /// will share the same underlying transport. + /// + /// # Errors + /// Returns an error if the inner client has not yet been initialised (i.e., + /// neither `list()` nor `call()` has been called). + pub(crate) async fn into_inner(self) -> anyhow::Result + where + I::Client: Clone + Send + Sync + 'static, + { + // Take ownership of the Arc; if we hold the only reference, we can + // unwrap it without Clone. Otherwise clone the inner value. + match Arc::try_unwrap(self.inner) { + Ok(once_cell) => once_cell + .into_inner() + .ok_or_else(|| anyhow::anyhow!("LazyMcpClient: inner client not yet initialised")), + Err(arc) => arc + .get() + .cloned() + .ok_or_else(|| anyhow::anyhow!("LazyMcpClient: inner client not yet initialised")), + } + } +} + +#[async_trait::async_trait] +impl McpClientInfra for LazyMcpClient { + /// List tools — connects on first call, reuses the connection thereafter. + async fn list(&self) -> anyhow::Result> { + self.client().await?.list().await + } + + /// Execute a tool call — connects on first call, reuses thereafter. 
+ async fn call( + &self, + tool_name: &ToolName, + input: serde_json::Value, + ) -> anyhow::Result { + self.client().await?.call(tool_name, input).await + } +} diff --git a/crates/forge_services/src/mcp/mod.rs b/crates/forge_services/src/mcp/mod.rs index 6f5a8826d6..e4451e57fe 100644 --- a/crates/forge_services/src/mcp/mod.rs +++ b/crates/forge_services/src/mcp/mod.rs @@ -1,3 +1,4 @@ +pub(crate) mod lazy_client; mod manager; mod service; mod tool; diff --git a/crates/forge_services/src/mcp/service.rs b/crates/forge_services/src/mcp/service.rs index f03725edf0..4eb0b6835d 100644 --- a/crates/forge_services/src/mcp/service.rs +++ b/crates/forge_services/src/mcp/service.rs @@ -3,25 +3,49 @@ use std::sync::Arc; use anyhow::Context; use forge_app::domain::{ - McpConfig, McpServerConfig, McpServers, ServerName, ToolCallFull, ToolDefinition, ToolName, - ToolOutput, + McpConfig, McpServers, ServerName, ToolCallFull, ToolDefinition, ToolName, ToolOutput, }; use forge_app::{ EnvironmentInfra, KVStore, McpClientInfra, McpConfigManager, McpServerInfra, McpService, }; use tokio::sync::{Mutex, RwLock}; +use crate::mcp::lazy_client::LazyMcpClient; use crate::mcp::tool::McpExecutor; -#[derive(Clone)] -pub struct ForgeMcpService { +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +pub struct ForgeMcpService { + /// Live, connected tool executors (populated on first actual tool call). tools: Arc>>>>, + /// Servers registered from config – connection is NOT yet established. + pending_servers: Arc>>>, failed_servers: Arc>>, + /// Tool stubs built from statically-declared tools in config (no live connection). + declared_tools: Arc>>, previous_config_hash: Arc>, manager: Arc, infra: Arc, } +/// Manual `Clone` impl so that we don't require `M: Clone` or `I: Clone` — +/// all fields are `Arc`-wrapped so the clone just bumps reference counts. 
+impl Clone for ForgeMcpService { + fn clone(&self) -> Self { + Self { + tools: self.tools.clone(), + pending_servers: self.pending_servers.clone(), + failed_servers: self.failed_servers.clone(), + declared_tools: self.declared_tools.clone(), + previous_config_hash: self.previous_config_hash.clone(), + manager: self.manager.clone(), + infra: self.infra.clone(), + } + } +} + #[derive(Clone)] struct ToolHolder { definition: ToolDefinition, @@ -29,6 +53,10 @@ struct ToolHolder { server_name: String, } +// --------------------------------------------------------------------------- +// Core implementation +// --------------------------------------------------------------------------- + impl ForgeMcpService where M: McpConfigManager, @@ -39,7 +67,9 @@ where pub fn new(manager: Arc, infra: Arc) -> Self { Self { tools: Default::default(), + pending_servers: Default::default(), failed_servers: Default::default(), + declared_tools: Default::default(), previous_config_hash: Arc::new(Mutex::new(Default::default())), manager, infra, @@ -50,16 +80,207 @@ where *self.previous_config_hash.lock().await != config.cache_key() } - async fn insert_clients(&self, server_name: &ServerName, client: Arc) -> anyhow::Result<()> { - let tools = client.list().await?; + // ----------------------------------------------------------------------- + // Discovery — zero network I/O + // ----------------------------------------------------------------------- + + /// Register servers from config without establishing any connection. + /// + /// For servers that declare their tools statically in the config we + /// immediately build lightweight `ToolDefinition` stubs so the LLM can see + /// them in the system prompt. For servers with no static declarations, the + /// tools remain invisible until one of their tools is actually called and + /// the live connection is established. 
+ async fn register_servers(&self, mcp: McpConfig) { + let new_hash = mcp.cache_key(); + *self.previous_config_hash.lock().await = new_hash; + + // Any config change — even adding an unrelated server — evicts all live + // connections and pending registrations. This matches the previous eager + // connect-everything behaviour and avoids stale tool lists for servers + // whose config did change. A future optimisation could diff the configs + // and preserve live connections for unchanged servers. + self.tools.write().await.clear(); + self.pending_servers.write().await.clear(); + self.declared_tools.write().await.clear(); + self.failed_servers.write().await.clear(); + + let env_vars = self.infra.get_env_vars(); + + let mut pending = self.pending_servers.write().await; + let mut declared = self.declared_tools.write().await; + + for (server_name, config) in mcp.mcp_servers.into_iter().filter(|v| !v.1.is_disabled()) { + // Build a lazy client – no network call happens here + let lazy = LazyMcpClient::new(config.clone(), env_vars.clone(), self.infra.clone()); + pending.insert(server_name.clone(), lazy); + + // Populate declared-tool stubs if the config specifies tool names + if let Some(tool_names) = config.declared_tools() { + for raw_name in tool_names { + let generated = ToolName::new(format!( + "mcp_{server_name}_tool_{}", + ToolName::sanitized(raw_name) + )); + declared.insert(generated, server_name.clone()); + } + } + } + } + + /// Ensure servers are registered from the current config. + /// Called at the start of `list()` and `call()`. 
+ async fn ensure_registered(&self) -> anyhow::Result<()> { + let mcp = self.manager.read_mcp_config(None).await?; + if self.is_config_modified(&mcp).await { + self.register_servers(mcp).await; + } + Ok(()) + } + + // ----------------------------------------------------------------------- + // Connection — happens only when a tool is actually invoked + // ----------------------------------------------------------------------- + + /// Determine which server owns `tool_name` and connect to it. + async fn connect_for_tool(&self, tool_name: &ToolName) -> anyhow::Result<()> { + // Try to find the owning server via declared-tool stubs first (no-alloc path) + let server_name = { + let declared = self.declared_tools.read().await; + declared.get(tool_name).cloned() + }; + + match server_name { + Some(name) => self.connect_server(&name).await, + None => { + // Tool was not declared statically; discover by connecting all pending servers. + self.connect_all_pending().await + } + } + } + + /// Connect a specific server by name and insert its tools. + /// + /// **Concurrency:** the `pending_servers.remove()` call below is the true + /// mutual-exclusion point — exactly one concurrent caller receives the + /// `LazyMcpClient`; all others get `None` and return `Ok(())`. + /// + /// The two read-lock checks that precede it (fast-path on `tools` and on + /// `failed_servers`) are *optimistic* guards. They avoid the write-lock in + /// the common case but are not atomic with respect to each other: a server + /// could be marked failed between the two reads. The consequence is benign + /// — the `pending_servers.remove()` will return `None` and the caller + /// returns `Ok(())`, seeing the failure on its next attempt. + async fn connect_server(&self, server_name: &ServerName) -> anyhow::Result<()> { + // Fast path — already connected (no write-lock needed). 
+ { + let tools = self.tools.read().await; + if tools + .values() + .any(|h| h.server_name == server_name.as_str()) + { + return Ok(()); + } + } + + // Already failed? + { + let failed = self.failed_servers.read().await; + if let Some(err) = failed.get(server_name) { + return Err(anyhow::anyhow!( + "MCP server '{server_name}' previously failed to connect: {err}" + )); + } + } + + // Grab the lazy client and remove it from pending in one step so + // concurrent callers that also passed the fast-path check cannot both + // proceed to connect the same server (TOCTOU fix). + let lazy = { + let mut pending = self.pending_servers.write().await; + match pending.remove(server_name) { + Some(lazy) => lazy, + // Another concurrent caller already removed and connected it. + None => return Ok(()), + } + }; + + // Trigger the actual connection + list tools + match self.insert_lazy_client(server_name, lazy).await { + Ok(()) => Ok(()), + Err(e) => { + let msg = format!("{e:?}"); + self.failed_servers + .write() + .await + .insert(server_name.clone(), msg.clone()); + Err(anyhow::anyhow!( + "Failed to connect to MCP server '{server_name}': {msg}" + )) + } + } + } + + /// Connect all pending servers (used when the tool owner is unknown). + /// + /// Connections are driven concurrently via `join_all`. All fields of + /// `ForgeMcpService` are `Arc`-wrapped so cloning is cheap. + async fn connect_all_pending(&self) -> anyhow::Result<()> { + let pending_names: Vec = { + let pending = self.pending_servers.read().await; + pending.keys().cloned().collect() + }; + + // Clone `self` once per server so each async block owns its own handle + // to the shared state. This avoids lifetime issues with `&self` across + // yield points inside `join_all`. 
+ let futures: Vec<_> = pending_names + .into_iter() + .map(|name| { + let svc = self.clone(); + async move { svc.connect_server(&name).await } + }) + .collect(); + + let results = futures::future::join_all(futures).await; + + let mut last_err: Option = None; + for r in results { + if let Err(e) = r { + tracing::warn!(error = ?e, "MCP server failed to connect during bulk discovery"); + last_err = Some(e); + } + } + // Return last error only if nothing was connected at all + if let Some(e) = last_err { + let tools = self.tools.read().await; + if tools.is_empty() { + return Err(e); + } + } + Ok(()) + } + + /// Call `list()` on the lazy client (triggers real connection) and insert + /// the resulting tools into the live tool map. + async fn insert_lazy_client( + &self, + server_name: &ServerName, + lazy: LazyMcpClient, + ) -> anyhow::Result<()> { + // `lazy.list()` triggers the real connection on first call + let live_tools = lazy.list().await?; + + // Re-use the already-initialised inner client for execution + let inner = lazy.into_inner().await?; + let client: Arc = Arc::new(C::from(inner)); let mut tool_map = self.tools.write().await; - for mut tool in tools.into_iter() { + for mut tool in live_tools.into_iter() { let actual_name = tool.name.clone(); - let server = McpExecutor::new(actual_name, client.clone())?; + let executor = McpExecutor::new(actual_name, client.clone())?; - // Generate a unique name for the tool let generated_name = ToolName::new(format!( "mcp_{server_name}_tool_{}", tool.name.into_sanitized() @@ -71,123 +292,115 @@ where generated_name, ToolHolder { definition: tool, - executable: server, + executable: executor, server_name: server_name.to_string(), }, ); } + // Drop the write lock on tools before taking one on declared_tools to + // avoid holding two write locks simultaneously. + drop(tool_map); + + // Prune stale declared stubs for this server now that we have the live list. 
+ // This prevents contains_tool_in_memory from returning true for tools that + // were declared in config but absent from the server's actual live tool list. + self.declared_tools + .write() + .await + .retain(|_, owner| owner != server_name); Ok(()) } - async fn connect( - &self, - server_name: &ServerName, - config: McpServerConfig, - ) -> anyhow::Result<()> { - let env_vars = self.infra.get_env_vars(); - let client = self.infra.connect(config, &env_vars).await?; - let client = Arc::new(C::from(client)); - self.insert_clients(server_name, client).await?; - - Ok(()) - } - - async fn init_mcp(&self) -> anyhow::Result<()> { - let mcp = self.manager.read_mcp_config(None).await?; - - // If config is unchanged, skip reinitialization - if !self.is_config_modified(&mcp).await { - return Ok(()); - } - - self.update_mcp(mcp).await - } + // ----------------------------------------------------------------------- + // list() — returns tool stubs without blocking on connections + // ----------------------------------------------------------------------- - async fn update_mcp(&self, mcp: McpConfig) -> Result<(), anyhow::Error> { - // Update the hash with the new config - let new_hash = mcp.cache_key(); - *self.previous_config_hash.lock().await = new_hash; - self.clear_tools().await; + async fn list(&self) -> anyhow::Result { + self.ensure_registered().await?; - // Clear failed servers map before attempting new connections - self.failed_servers.write().await.clear(); + let tools = self.tools.read().await; + let declared = self.declared_tools.read().await; + let failures = self.failed_servers.read().await.clone(); - let connections: Vec<_> = mcp - .mcp_servers - .into_iter() - .filter(|v| !v.1.is_disabled()) - .map(|(name, server)| async move { - let conn = self - .connect(&name, server) - .await - .context(format!("Failed to initiate MCP server: {name}")); + let mut grouped: HashMap> = HashMap::new(); - (name, conn) - }) - .collect(); + // Include already-connected live tools 
(full schemas) + for holder in tools.values() { + grouped + .entry(ServerName::from(holder.server_name.clone())) + .or_default() + .push(holder.definition.clone()); + } - let results = futures::future::join_all(connections).await; - - for (server_name, result) in results { - match result { - Ok(_) => {} - Err(error) => { - // Format error with full chain for detailed diagnostics - // Using Debug formatting with alternate flag shows the full error chain - let error_string = format!("{error:?}"); - self.failed_servers - .write() - .await - .insert(server_name.clone(), error_string.clone()); - } + // Include declared (not-yet-connected) tool stubs + for (tool_name, server_name) in declared.iter() { + // Skip if already represented via a live connection + if tools.contains_key(tool_name) { + continue; } + let stub = ToolDefinition::new(tool_name.as_str()) + .description("(tool schema not yet loaded — schema available after first use)"); + grouped.entry(server_name.clone()).or_default().push(stub); } - Ok(()) + Ok(McpServers::new(grouped, failures)) } - async fn list(&self) -> anyhow::Result { - self.init_mcp().await?; + // ----------------------------------------------------------------------- + // call() — triggers real connection lazily + // ----------------------------------------------------------------------- - let tools = self.tools.read().await; - let mut grouped_tools = std::collections::HashMap::new(); + async fn call(&self, call: ToolCallFull) -> anyhow::Result { + self.ensure_registered().await?; - for tool in tools.values() { - grouped_tools - .entry(ServerName::from(tool.server_name.clone())) - .or_insert_with(Vec::new) - .push(tool.definition.clone()); + // Fast path: tool already live + { + let tools = self.tools.read().await; + if let Some(holder) = tools.get(&call.name) { + return holder.executable.call_tool(call.arguments.parse()?).await; + } } - let failures = self.failed_servers.read().await.clone(); - - Ok(McpServers::new(grouped_tools, failures)) 
- } - async fn clear_tools(&self) { - self.tools.write().await.clear() - } - - async fn call(&self, call: ToolCallFull) -> anyhow::Result { - // Ensure MCP connections are initialized before calling tools - self.init_mcp().await?; + // Slow path: connect the owning server on first use + self.connect_for_tool(&call.name).await?; let tools = self.tools.read().await; + let holder = tools.get(&call.name).context(format!( + "Tool '{}' not found after connecting MCP server", + call.name + ))?; + holder.executable.call_tool(call.arguments.parse()?).await + } - let tool = tools.get(&call.name).context("Tool not found")?; + // ----------------------------------------------------------------------- + // contains_tool — in-memory only, zero network I/O + // ----------------------------------------------------------------------- - tool.executable.call_tool(call.arguments.parse()?).await + async fn contains_tool_in_memory(&self, tool_name: &ToolName) -> bool { + if self.tools.read().await.contains_key(tool_name) { + return true; + } + self.declared_tools.read().await.contains_key(tool_name) } - /// Refresh the MCP cache by fetching fresh data + // ----------------------------------------------------------------------- + // Refresh + // ----------------------------------------------------------------------- + async fn refresh_cache(&self) -> anyhow::Result<()> { - // Fetch fresh tools by calling list() which connects to MCPs self.infra.cache_clear().await?; + // Reset config hash so the next access re-registers from disk + *self.previous_config_hash.lock().await = 0; let _ = self.get_mcp_servers().await?; Ok(()) } } +// --------------------------------------------------------------------------- +// McpService trait implementation +// --------------------------------------------------------------------------- + #[async_trait::async_trait] impl McpService for ForgeMcpService @@ -196,20 +409,45 @@ where C: From<::Client>, { async fn get_mcp_servers(&self) -> anyhow::Result { - // 
Read current configs to compute merged hash let mcp_config = self.manager.read_mcp_config(None).await?; - - // Compute unified hash from merged config let config_hash = mcp_config.cache_key(); - // Check if cache is valid (exists and not expired) - // Cache is valid, retrieve it - if let Some(cache) = self.infra.cache_get::<_, McpServers>(&config_hash).await? { - return Ok(cache.clone()); + // The KV cache stores *stub* tool listings (declared-tool names only, + // no full schemas) so that cold-start latency is minimised. Once any + // server has established a live connection its full schemas live in the + // in-memory `tools` map which is always authoritative and must not be + // replaced by a stale KV entry. + // + // Decision matrix: + // no live connections, KV hit → serve KV cache (stubs, fast path) + // no live connections, KV miss → build from declared config, write KV + // any live connections → serve from memory (includes schemas) + // do NOT write back to KV (avoid + // persisting schemas that may become + // stale across runs) + // + // Note: `had_live_before` and `has_live_now` are point-in-time snapshots. + // A concurrent caller could establish connections between either read and + // the subsequent operations. This is an intentional eventual-consistency + // tradeoff: the window is sub-millisecond and the worst case is a missed + // cache write (not a correctness failure). + let had_live_before = !self.tools.read().await.is_empty(); + if !had_live_before { + if let Some(cache) = self.infra.cache_get::<_, McpServers>(&config_hash).await? { + return Ok(cache); + } } let servers = self.list().await?; - self.infra.cache_set(&config_hash, &servers).await?; + + // Fresh read after list() so we never persist live schemas to the KV cache. + // list() may have triggered connections (e.g. via ensure_registered clearing + // and re-registering), so take an up-to-date snapshot. 
+        let has_live_now = !self.tools.read().await.is_empty();
+        if !has_live_now {
+            self.infra.cache_set(&config_hash, &servers).await?;
+        }
+
         Ok(servers)
     }
@@ -220,4 +458,439 @@ where
     async fn reload_mcp(&self) -> anyhow::Result<()> {
         self.refresh_cache().await
     }
+
+    /// Reports whether `tool_name` is known to this service.
+    ///
+    /// Awaits `ensure_registered()` first (propagating its error), then answers
+    /// from `contains_tool_in_memory`.
+    /// NOTE(review): the trait documentation describes this as a connection-free
+    /// check — confirm that `ensure_registered` never dials a server.
+    async fn contains_mcp_tool(&self, tool_name: &ToolName) -> anyhow::Result {
+        self.ensure_registered().await?;
+        Ok(self.contains_tool_in_memory(tool_name).await)
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Tests
+// ---------------------------------------------------------------------------
+
+#[cfg(test)]
+#[allow(dead_code)]
+mod tests {
+    use std::collections::BTreeMap;
+    use std::sync::Arc;
+    use std::sync::atomic::{AtomicUsize, Ordering};
+
+    use forge_app::domain::{
+        McpConfig, McpServerConfig, McpStdioServer, ServerName, ToolCallFull, ToolDefinition,
+        ToolName, ToolOutput,
+    };
+    use forge_app::{
+        EnvironmentInfra, KVStore, McpClientInfra, McpConfigManager, McpServerInfra, McpService,
+    };
+    use forge_config::ForgeConfig;
+    use forge_domain::{ConfigOperation, Environment, Scope};
+    use serde::de::DeserializeOwned;
+    use tokio::sync::Mutex;
+
+    use super::ForgeMcpService;
+
+    // NOTE(review): several generic argument lists in this module look truncated
+    // in this copy of the patch (e.g. `Vec,`, `Arc>>>`, `anyhow::Result {`).
+    // Confirm every signature against the original commit before applying.
+
+    // -----------------------------------------------------------------------
+    // Mock: McpClientInfra
+    // -----------------------------------------------------------------------
+
+    /// Test double for an MCP client: serves a fixed tool list and counts how
+    /// often that list is requested.
+    #[derive(Clone)]
+    struct MockMcpClientInfra {
+        // Tool definitions handed back by `list()`.
+        tools: Vec,
+        // Shared counter; incremented by `list()` (not by the infra's `connect`).
+        connect_count: Arc,
+    }
+
+    impl MockMcpClientInfra {
+        /// Builds a mock whose `list()` returns one `ToolDefinition` per entry
+        /// of `tool_names` and records calls in `connect_count`.
+        fn new(tool_names: Vec<&str>, connect_count: Arc) -> Self {
+            let tools = tool_names
+                .into_iter()
+                .map(|n| ToolDefinition::new(n))
+                .collect();
+            Self { tools, connect_count }
+        }
+    }
+
+    #[async_trait::async_trait]
+    impl McpClientInfra for MockMcpClientInfra {
+        /// Bumps the shared counter, then returns a copy of the canned tools.
+        async fn list(&self) -> anyhow::Result> {
+            self.connect_count.fetch_add(1, Ordering::SeqCst);
+            Ok(self.tools.clone())
+        }
+
+        /// Ignores both arguments and always answers with the same text output.
+        async fn call(
+            &self,
+            _tool_name: &ToolName,
+            _input: serde_json::Value,
+        ) -> anyhow::Result {
+            Ok(ToolOutput::text("mock response"))
+        }
+    }
+
+    // -----------------------------------------------------------------------
+    // Mock: McpServerInfra + KVStore + EnvironmentInfra
+    // -----------------------------------------------------------------------
+
+    /// Test double for the infrastructure layer: one shared client for every
+    /// server plus an in-memory key/value cache.
+    struct MockMcpServerInfra {
+        // Cloned and returned by every `connect` call.
+        client: MockMcpClientInfra,
+        // Cache contents: hashed key string -> JSON-serialized value bytes.
+        kv: Arc>>>,
+    }
+
+    impl MockMcpServerInfra {
+        /// Wraps `client` and starts with an empty cache.
+        fn new(client: MockMcpClientInfra) -> Self {
+            Self { client, kv: Default::default() }
+        }
+    }
+
+    #[async_trait::async_trait]
+    impl McpServerInfra for MockMcpServerInfra {
+        type Client = MockMcpClientInfra;
+
+        /// Ignores the server config and env vars and hands back a clone of the
+        /// single mock client, so all servers share its tool list and counter.
+        async fn connect(
+            &self,
+            _config: McpServerConfig,
+            _env_vars: &BTreeMap,
+        ) -> anyhow::Result {
+            Ok(self.client.clone())
+        }
+    }
+
+    #[async_trait::async_trait]
+    impl KVStore for MockMcpServerInfra {
+        /// Looks up `key` by its `DefaultHasher` hash (rendered as a decimal
+        /// string) and deserializes the stored JSON bytes; `None` when absent.
+        async fn cache_get(&self, key: &K) -> anyhow::Result>
+        where
+            K: std::hash::Hash + Sync,
+            V: serde::Serialize + DeserializeOwned + Send,
+        {
+            use std::collections::hash_map::DefaultHasher;
+            use std::hash::Hasher;
+            let mut h = DefaultHasher::new();
+            std::hash::Hash::hash(key, &mut h);
+            let k = h.finish().to_string();
+            let map = self.kv.lock().await;
+            match map.get(&k) {
+                Some(bytes) => Ok(Some(serde_json::from_slice(bytes)?)),
+                None => Ok(None),
+            }
+        }
+
+        /// Serializes `value` to JSON and stores it under the same hashed-key
+        /// scheme used by `cache_get`, replacing any previous entry.
+        async fn cache_set(&self, key: &K, value: &V) -> anyhow::Result<()>
+        where
+            K: std::hash::Hash + Sync,
+            V: serde::Serialize + Sync,
+        {
+            use std::collections::hash_map::DefaultHasher;
+            use std::hash::Hasher;
+            let mut h = DefaultHasher::new();
+            std::hash::Hash::hash(key, &mut h);
+            let k = h.finish().to_string();
+            let bytes = serde_json::to_vec(value)?;
+            self.kv.lock().await.insert(k, bytes);
+            Ok(())
+        }
+
+        /// Drops every cached entry.
+        async fn cache_clear(&self) -> anyhow::Result<()> {
+            self.kv.lock().await.clear();
+            Ok(())
+        }
+    }
+
+    // Environment stub: no environment variables, a fixed `Environment`, the
+    // default `ForgeConfig`, and updates that succeed without doing anything.
+    impl EnvironmentInfra for MockMcpServerInfra {
+        type Config = ForgeConfig;
+
+        fn get_env_var(&self, _key: &str) -> Option {
+            None
+        }
+
+        fn get_env_vars(&self) -> BTreeMap {
+            BTreeMap::new()
+        }
+
+        fn get_environment(&self) -> Environment {
+            Environment {
+                os: "test".to_string(),
+                pid: 0,
+                cwd: std::path::PathBuf::from("/tmp"),
+                home: None,
+                shell: "sh".to_string(),
+                base_path: std::path::PathBuf::from("/tmp"),
+            }
+        }
+
+        fn get_config(&self) -> ForgeConfig {
+            ForgeConfig::default()
+        }
+
+        fn update_environment(
+            &self,
+            _ops: Vec,
+        ) -> impl std::future::Future> + Send {
+            async { Ok(()) }
+        }
+    }
+
+    // -----------------------------------------------------------------------
+    // Mock: McpConfigManager
+    // -----------------------------------------------------------------------
+
+    /// Config manager stub that serves one in-memory `McpConfig`.
+    struct MockMcpConfigManager {
+        config: Arc>,
+    }
+
+    impl MockMcpConfigManager {
+        /// Stores `config` behind a mutex for later reads.
+        fn new(config: McpConfig) -> Self {
+            Self { config: Arc::new(Mutex::new(config)) }
+        }
+    }
+
+    #[async_trait::async_trait]
+    impl McpConfigManager for MockMcpConfigManager {
+        /// Returns a clone of the stored config; the requested scope is ignored.
+        async fn read_mcp_config(&self, _scope: Option<&Scope>) -> anyhow::Result {
+            Ok(self.config.lock().await.clone())
+        }
+
+        /// Accepts the write and discards it; the stored config is unchanged.
+        async fn write_mcp_config(
+            &self,
+            _config: &McpConfig,
+            _scope: &Scope,
+        ) -> anyhow::Result<()> {
+            Ok(())
+        }
+    }
+
+    // -----------------------------------------------------------------------
+    // Helpers
+    // -----------------------------------------------------------------------
+
+    /// Stdio server config (`echo`, no args, enabled) that statically declares
+    /// `tool_names` in its `tools` field.
+    fn server_config_with_tools(tool_names: Vec<&str>) -> McpServerConfig {
+        McpServerConfig::Stdio(McpStdioServer {
+            command: "echo".to_string(),
+            args: vec![],
+            env: BTreeMap::new(),
+            timeout: None,
+            disable: false,
+            tools: tool_names.into_iter().map(|s| s.to_string()).collect(),
+        })
+    }
+
+    /// Stdio server config built through `McpServerConfig::new_stdio`, used by
+    /// the tests as the "no static tool declaration" case.
+    fn server_config_no_tools() -> McpServerConfig {
+        McpServerConfig::new_stdio("echo", vec![], None)
+    }
+
+    /// Wires a `ForgeMcpService` to the mock config manager and mock infra.
+    fn make_service(
+        config: McpConfig,
+        client: MockMcpClientInfra,
+    ) -> ForgeMcpService {
+        let manager = Arc::new(MockMcpConfigManager::new(config));
+        let infra = Arc::new(MockMcpServerInfra::new(client));
+        ForgeMcpService::new(manager, infra)
+    }
+
+    // 
-----------------------------------------------------------------------
+    // Tests
+    // -----------------------------------------------------------------------
+
+    /// Tools declared statically in the config are listed by
+    /// `get_mcp_servers()` even though the mock client's tool list is never
+    /// requested.
+    #[tokio::test]
+    async fn test_declared_tools_visible_without_connection() {
+        let connect_count = Arc::new(AtomicUsize::new(0));
+        // The live tool list is empty: anything reported must come from config.
+        let client = MockMcpClientInfra::new(vec![], connect_count.clone());
+
+        let mut config = McpConfig::default();
+        config.mcp_servers.insert(
+            ServerName::from("github".to_string()),
+            server_config_with_tools(vec!["get_repo", "list_prs"]),
+        );
+
+        let svc = make_service(config, client);
+        let servers = svc
+            .get_mcp_servers()
+            .await
+            .expect("get_mcp_servers should succeed for a statically declared server");
+
+        // Flatten every server's tools into their (generated) names.
+        let tool_names: Vec<String> = servers
+            .get_servers()
+            .values()
+            .flat_map(|tools| tools.iter().map(|t| t.name.to_string()))
+            .collect();
+
+        // Substring match: generated names embed the declared name.
+        for expected in ["get_repo", "list_prs"] {
+            assert!(
+                tool_names.iter().any(|n| n.contains(expected)),
+                "expected {expected} in {tool_names:?}"
+            );
+        }
+
+        // The mock counter only moves when the client's tool list is requested.
+        assert_eq!(
+            connect_count.load(Ordering::SeqCst),
+            0,
+            "no real connection should have been triggered"
+        );
+    }
+
+    /// Calling a tool with no static declaration triggers connect_all_pending.
+    #[tokio::test]
+    async fn test_undeclared_tool_triggers_connect_all_pending() {
+        let connect_count = Arc::new(AtomicUsize::new(0));
+        // live tool returned by the server
+        let client = MockMcpClientInfra::new(vec!["list_repos"], connect_count.clone());
+
+        let mut config = McpConfig::default();
+        config.mcp_servers.insert(
+            ServerName::from("github".to_string()),
+            server_config_no_tools(), // no static declaration
+        );
+
+        let svc = make_service(config, client);
+
+        // Tool name that the live server would return (generated name)
+        let call = ToolCallFull::new(ToolName::new("mcp_github_tool_list_repos"));
+
+        // The outcome is deliberately ignored: the call may succeed (tool found)
+        // or fail with "not found". Only the side effect on the counter matters.
+        let _ = svc.execute_mcp(call).await;
+
+        // The mock bumps the counter each time its tool list is requested.
+        assert!(
+            connect_count.load(Ordering::SeqCst) >= 1,
+            "expected at least one connect() call"
+        );
+    }
+
+    /// A declared tool triggers only the owning server's connection.
+    #[tokio::test]
+    async fn test_declared_tool_only_connects_owning_server() {
+        // The mock infra hands the same client (and therefore the same counter)
+        // to every server it is asked to connect. A final count of exactly 1
+        // shows that only one server's tool list was requested; a second
+        // server being contacted would push it past 1.
+        let github_count = Arc::new(AtomicUsize::new(0));
+        let github_client = MockMcpClientInfra::new(vec!["list_prs"], github_count.clone());
+
+        // Two servers, both with static declarations so the service can route
+        // the call by tool name.
+        let mut cfg = McpConfig::default();
+        cfg.mcp_servers.insert(
+            ServerName::from("github".to_string()),
+            server_config_with_tools(vec!["list_prs"]),
+        );
+        cfg.mcp_servers.insert(
+            ServerName::from("notion".to_string()),
+            server_config_with_tools(vec!["search_pages"]),
+        );
+
+        let svc = make_service(cfg, github_client);
+
+        let call = ToolCallFull::new(ToolName::new("mcp_github_tool_list_prs"));
+        let result = svc.execute_mcp(call).await;
+
+        // Should succeed (github connected) with notion left untouched.
+        assert!(result.is_ok(), "expected success but got: {result:?}");
+        assert_eq!(
+            github_count.load(Ordering::SeqCst),
+            1,
+            "github should connect once"
+        );
+    }
+
+    /// contains_mcp_tool returns true for a declared tool without connecting.
+    #[tokio::test]
+    async fn test_contains_tool_declared_returns_true() {
+        let connect_count = Arc::new(AtomicUsize::new(0));
+        // Empty live tool list: a positive answer can only come from the config.
+        let client = MockMcpClientInfra::new(vec![], connect_count.clone());
+
+        let mut config = McpConfig::default();
+        config.mcp_servers.insert(
+            ServerName::from("github".to_string()),
+            server_config_with_tools(vec!["search_repos"]),
+        );
+
+        let svc = make_service(config, client);
+        let found = svc
+            .contains_mcp_tool(&ToolName::new("mcp_github_tool_search_repos"))
+            .await
+            .unwrap();
+
+        assert!(found, "expected declared tool to be found");
+        assert_eq!(
+            connect_count.load(Ordering::SeqCst),
+            0,
+            "no connection should occur"
+        );
+    }
+
+    /// After a live connection, stale declared stubs are pruned (Task 1 fix).
+    #[tokio::test]
+    async fn test_stale_declared_stubs_pruned_after_live_connection() {
+        // The config advertises "old_tool", but the live server only offers "new_tool".
+        let counter = Arc::new(AtomicUsize::new(0));
+        let live_client = MockMcpClientInfra::new(vec!["new_tool"], counter.clone());
+
+        let declared = server_config_with_tools(vec!["old_tool"]);
+        let mut mcp_config = McpConfig::default();
+        mcp_config
+            .mcp_servers
+            .insert(ServerName::from("srv".to_string()), declared);
+
+        let service = make_service(mcp_config, live_client);
+
+        // Calling the live-only tool forces a real connection; the outcome of
+        // the call itself is irrelevant here.
+        let trigger = ToolCallFull::new(ToolName::new("mcp_srv_tool_new_tool"));
+        let _ = service.execute_mcp(trigger).await;
+
+        // Query the stale stub first, then the live tool.
+        let stale_name = ToolName::new("mcp_srv_tool_old_tool");
+        let has_old = service.contains_mcp_tool(&stale_name).await.unwrap();
+
+        let live_name = ToolName::new("mcp_srv_tool_new_tool");
+        let has_new = service.contains_mcp_tool(&live_name).await.unwrap();
+
+        assert!(!has_old, "stale declared stub should have been pruned");
+        assert!(has_new, "live tool should be visible after connection");
+    }
+
+    /// Concurrent calls to connect the same server only trigger one actual connection.
+    #[tokio::test]
+    async fn test_concurrent_connect_server_idempotent() {
+        let counter = Arc::new(AtomicUsize::new(0));
+        let live_client = MockMcpClientInfra::new(vec!["list_prs"], counter.clone());
+
+        let declared = server_config_with_tools(vec!["list_prs"]);
+        let mut mcp_config = McpConfig::default();
+        mcp_config
+            .mcp_servers
+            .insert(ServerName::from("github".to_string()), declared);
+
+        let shared = Arc::new(make_service(mcp_config, live_client));
+
+        // Two handles onto the same service, each issuing an identical call.
+        let left = Arc::clone(&shared);
+        let right = Arc::clone(&shared);
+        let new_call = || ToolCallFull::new(ToolName::new("mcp_github_tool_list_prs"));
+        let left_call = new_call();
+        let right_call = new_call();
+
+        // Drive both calls concurrently so they race for the same server.
+        let (r1, r2) = tokio::join!(left.execute_mcp(left_call), right.execute_mcp(right_call));
+
+        assert!(r1.is_ok(), "first call should succeed: {r1:?}");
+        assert!(r2.is_ok(), "second call should succeed: {r2:?}");
+
+        // Despite the race, the mock must have been hit exactly once.
+        let observed = counter.load(Ordering::SeqCst);
+        assert_eq!(observed, 1, "connect() should be called exactly once");
+    }
 }