From e94ba219dba19332adfd715eb51f6de42c3e7b14 Mon Sep 17 00:00:00 2001 From: starr-openai Date: Wed, 6 May 2026 15:22:55 -0700 Subject: [PATCH] Add cloud environment provider Load remote environments from a cloud-backed provider in environments.toml and simplify executor registry registration to use the current cloud API without mutable environment upserts. Co-authored-by: Codex --- codex-rs/Cargo.lock | 1 - codex-rs/exec-server/Cargo.toml | 1 - codex-rs/exec-server/src/environment_toml.rs | 289 ++++++++++++++++++- codex-rs/exec-server/src/remote.rs | 129 +++------ 4 files changed, 325 insertions(+), 95 deletions(-) diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 295c24820ade..681b1aae8103 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -2723,7 +2723,6 @@ dependencies = [ "serde", "serde_json", "serial_test", - "sha2", "tempfile", "test-case", "thiserror 2.0.18", diff --git a/codex-rs/exec-server/Cargo.toml b/codex-rs/exec-server/Cargo.toml index c466a234c1ed..9fbdd91117f1 100644 --- a/codex-rs/exec-server/Cargo.toml +++ b/codex-rs/exec-server/Cargo.toml @@ -26,7 +26,6 @@ futures = { workspace = true } reqwest = { workspace = true, features = ["json", "rustls-tls", "stream"] } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } -sha2 = { workspace = true } thiserror = { workspace = true } toml = { workspace = true } tokio = { workspace = true, features = [ diff --git a/codex-rs/exec-server/src/environment_toml.rs b/codex-rs/exec-server/src/environment_toml.rs index bd45bcf83225..147c233756bc 100644 --- a/codex-rs/exec-server/src/environment_toml.rs +++ b/codex-rs/exec-server/src/environment_toml.rs @@ -17,6 +17,9 @@ use crate::client_api::StdioExecServerCommand; use crate::environment::LOCAL_ENVIRONMENT_ID; use crate::environment_provider::EnvironmentDefault; use crate::environment_provider::EnvironmentProviderSnapshot; +use crate::remote::CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN_ENV_VAR; +use crate::remote::ExecutorRegistryClient; +use crate::remote::read_bearer_token_from_env_var; const ENVIRONMENTS_TOML_FILE: &str = "environments.toml"; const MAX_ENVIRONMENT_ID_LEN: usize = 64; @@ -28,6 +31,8 @@ struct EnvironmentsToml { #[serde(default)] environments: Vec, + + cloud: Option, } #[derive(Deserialize, Debug, Default, PartialEq, Eq)] @@ -41,6 +46,23 @@ struct EnvironmentToml { cwd: Option, } +#[derive(Deserialize, Debug, Default)] +#[serde(deny_unknown_fields)] +struct CloudProviderToml { + base_url: String, + bearer_token_env: Option, + + #[serde(default)] + environments: Vec, +} + +#[derive(Clone, Deserialize, Debug, Default, PartialEq, Eq)] +#[serde(deny_unknown_fields)] +struct CloudEnvironmentToml { + id: String, + executor_id: String, +} + #[derive(Clone, Debug, PartialEq, Eq)] struct TomlEnvironmentProvider { default: EnvironmentDefault, @@ -105,6 +127,82 @@ impl EnvironmentProvider for TomlEnvironmentProvider { } } +#[derive(Clone, Debug, PartialEq, Eq)] +struct CloudEnvironmentProvider { + default: EnvironmentDefault, + base_url: String, + bearer_token_env: String, + environments: Vec, +} + +impl CloudEnvironmentProvider { + fn new(config: CloudProviderToml, default: Option) -> Result { + let CloudProviderToml { + base_url, + bearer_token_env, + environments, + } = config; + let mut ids = HashSet::from([LOCAL_ENVIRONMENT_ID.to_string()]); + let mut cloud_environments = Vec::with_capacity(environments.len()); + for item in environments { + validate_environment_id(&item.id)?; + if !ids.insert(item.id.clone()) { + return Err(ExecServerError::Protocol(format!( + "environment id `{}` is duplicated", + item.id + ))); + } + cloud_environments.push(CloudEnvironmentToml { + id: item.id, + executor_id: validate_cloud_executor_id(&item.id, item.executor_id)?, + }); + } + + let bearer_token_env = bearer_token_env + .unwrap_or_else(|| CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN_ENV_VAR.to_string()); + if bearer_token_env.trim().is_empty() { + return Err(ExecServerError::Protocol( + "cloud bearer_token_env cannot be empty".to_string(), + )); + } + + Ok(Self { + default: normalize_default_environment_id(default.as_deref(), &ids)?, + base_url, + bearer_token_env, + environments: cloud_environments, + }) + } +} + +#[async_trait] +impl EnvironmentProvider for CloudEnvironmentProvider { + async fn snapshot( + &self, + local_runtime_paths: &ExecServerRuntimePaths, + ) -> Result { + let bearer_token = read_bearer_token_from_env_var(&self.bearer_token_env)?; + let client = ExecutorRegistryClient::new(self.base_url.clone(), bearer_token)?; + let mut environments = HashMap::from([( + LOCAL_ENVIRONMENT_ID.to_string(), + Environment::local(local_runtime_paths.clone()), + )]); + + for item in &self.environments { + let response = client.register_executor(&item.executor_id).await?; + environments.insert( + item.id.clone(), + Environment::remote_inner(response.url, Some(local_runtime_paths.clone())), + ); + } + + Ok(EnvironmentProviderSnapshot { + environments, + default: self.default.clone(), + }) + } +} + fn parse_environment_toml( item: EnvironmentToml, config_dir: Option<&Path>, @@ -187,10 +285,36 @@ pub(crate) fn environment_provider_from_codex_home( } let environments = load_environments_toml(&path)?; - Ok(Box::new(TomlEnvironmentProvider::new_with_config_dir( + environment_provider_from_config(environments, Some(codex_home)) +} + +fn environment_provider_from_config( + config: EnvironmentsToml, + config_dir: Option<&Path>, +) -> Result, ExecServerError> { + let EnvironmentsToml { + default, environments, - Some(codex_home), - )?)) + cloud, + } = config; + match cloud { + Some(cloud) => { + if !environments.is_empty() { + return Err(ExecServerError::Protocol( + "cloud provider cannot be combined with explicit environments".to_string(), + )); + } + Ok(Box::new(CloudEnvironmentProvider::new(cloud, default)?)) + } + None => Ok(Box::new(TomlEnvironmentProvider::new_with_config_dir( + EnvironmentsToml { + default, + environments, + cloud: None, + }, + config_dir, + )?)), + } } fn normalize_default_environment_id( @@ -270,6 +394,16 @@ fn validate_websocket_url(url: String) -> Result { Ok(url.to_string()) } +fn validate_cloud_executor_id(id: &str, executor_id: String) -> Result { + let executor_id = executor_id.trim().to_string(); + if executor_id.is_empty() { + return Err(ExecServerError::Protocol(format!( + "cloud environment `{id}` executor_id cannot be empty" + ))); + } + Ok(executor_id) +} + fn load_environments_toml(path: &Path) -> Result { let contents = std::fs::read_to_string(path).map_err(|err| { ExecServerError::Protocol(format!( @@ -289,10 +423,43 @@ fn load_environments_toml(path: &Path) -> Result, + } + + impl EnvVarGuard { + fn set(key: &'static str, value: &str) -> Self { + let previous = std::env::var(key).ok(); + unsafe { + std::env::set_var(key, value); + } + Self { key, previous } + } + } + + impl Drop for EnvVarGuard { + fn drop(&mut self) { + unsafe { + match &self.previous { + Some(value) => std::env::set_var(self.key, value), + None => std::env::remove_var(self.key), + } + } + } + } + fn test_runtime_paths() -> ExecServerRuntimePaths { ExecServerRuntimePaths::new( std::env::current_exe().expect("current exe"), @@ -325,6 +492,7 @@ mod tests { ..Default::default() }, ], + ..Default::default() }) .expect("provider"); let runtime_paths = test_runtime_paths(); @@ -381,6 +549,7 @@ mod tests { let provider = TomlEnvironmentProvider::new(EnvironmentsToml { default: Some("none".to_string()), environments: Vec::new(), + ..Default::default() }) .expect("provider"); let snapshot = provider @@ -391,6 +560,75 @@ mod tests { assert_eq!(snapshot.default, EnvironmentDefault::Disabled); } + #[tokio::test] + #[serial] + async fn cloud_provider_fetches_remote_environment_urls() { + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/cloud/executor/exec-requested/register")) + .and(header("authorization", "Bearer registry-token")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "executor_id": "exec-requested", + "url": "wss://rendezvous.test/executor/exec-requested?sig=abc" + }))) + .mount(&server) + .await; + + let provider = CloudEnvironmentProvider::new( + CloudProviderToml { + base_url: server.uri(), + bearer_token_env: Some("CODEX_TEST_CLOUD_PROVIDER_TOKEN".to_string()), + environments: vec![CloudEnvironmentToml { + id: "cloud-dev".to_string(), + executor_id: "exec-requested".to_string(), + }], + }, + Some("cloud-dev".to_string()), + ) + .expect("provider"); + let _env_guard = EnvVarGuard::set("CODEX_TEST_CLOUD_PROVIDER_TOKEN", "registry-token"); + + let snapshot = provider + .snapshot(&test_runtime_paths()) + .await + .expect("cloud environments"); + + assert_eq!( + snapshot.default, + EnvironmentDefault::EnvironmentId("cloud-dev".to_string()) + ); + assert_eq!( + snapshot.environments["cloud-dev"].exec_server_url(), + Some("wss://rendezvous.test/executor/exec-requested?sig=abc") + ); + } + + #[test] + fn environment_provider_from_config_rejects_mixed_cloud_and_explicit_environments() { + let err = environment_provider_from_config( + EnvironmentsToml { + default: None, + environments: vec![EnvironmentToml { + id: "devbox".to_string(), + url: Some("ws://127.0.0.1:8765".to_string()), + ..Default::default() + }], + cloud: Some(CloudProviderToml { + base_url: "https://registry.example".to_string(), + bearer_token_env: None, + environments: Vec::new(), + }), + }, + /*config_dir*/ None, + ) + .expect_err("mixed provider config should fail"); + + assert_eq!( + err.to_string(), + "exec-server protocol error: cloud provider cannot be combined with explicit environments" + ); + } + #[test] fn toml_provider_rejects_invalid_environments() { let cases = [ @@ -457,6 +695,7 @@ mod tests { let err = TomlEnvironmentProvider::new(EnvironmentsToml { default: None, environments: vec![item], + ..Default::default() }) .expect_err("invalid item should fail"); @@ -479,6 +718,7 @@ mod tests { cwd: Some(PathBuf::from("workspace")), ..Default::default() }], + ..Default::default() }, Some(config_dir.path()), ) @@ -505,6 +745,7 @@ mod tests { cwd: Some(PathBuf::from("workspace")), ..Default::default() }], + ..Default::default() }) .expect_err("relative cwd without config dir should fail"); @@ -530,6 +771,7 @@ mod tests { ..Default::default() }, ], + ..Default::default() }) .expect_err("duplicate id should fail"); @@ -549,6 +791,7 @@ mod tests { url: Some("ws://127.0.0.1:8765".to_string()), ..Default::default() }], + ..Default::default() }) .expect_err("overlong id should fail"); @@ -565,6 +808,7 @@ mod tests { let err = TomlEnvironmentProvider::new(EnvironmentsToml { default: Some("missing".to_string()), environments: Vec::new(), + ..Default::default() }) .expect_err("unknown default should fail"); @@ -651,6 +895,44 @@ unknown = true } } + #[test] + fn load_environments_toml_reads_cloud_provider_config() { + let codex_home = tempdir().expect("tempdir"); + let path = codex_home.path().join(ENVIRONMENTS_TOML_FILE); + std::fs::write( + &path, + r#" +default = "cloud-dev" + +[cloud] +base_url = "https://registry.example" +bearer_token_env = "CODEX_TEST_CLOUD_PROVIDER_TOKEN" + +[[cloud.environments]] +id = "cloud-dev" +executor_id = "exec-requested" +"#, + ) + .expect("write environments.toml"); + + let environments = load_environments_toml(&path).expect("environments.toml"); + + assert_eq!(environments.default.as_deref(), Some("cloud-dev")); + let cloud = environments.cloud.expect("cloud config"); + assert_eq!(cloud.base_url, "https://registry.example"); + assert_eq!( + cloud.bearer_token_env.as_deref(), + Some("CODEX_TEST_CLOUD_PROVIDER_TOKEN") + ); + assert_eq!( + cloud.environments, + vec![CloudEnvironmentToml { + id: "cloud-dev".to_string(), + executor_id: "exec-requested".to_string(), + }] + ); + } + #[test] fn toml_provider_rejects_malformed_websocket_url() { let err = TomlEnvironmentProvider::new(EnvironmentsToml { @@ -660,6 +942,7 @@ unknown = true url: Some("ws://".to_string()), ..Default::default() }], + ..Default::default() }) .expect_err("malformed websocket url should fail"); diff --git a/codex-rs/exec-server/src/remote.rs b/codex-rs/exec-server/src/remote.rs index b574ced72f1e..92c890f67c9e 100644 --- a/codex-rs/exec-server/src/remote.rs +++ b/codex-rs/exec-server/src/remote.rs @@ -1,16 +1,11 @@ -use std::collections::BTreeMap; use std::env; use std::time::Duration; use reqwest::StatusCode; use serde::Deserialize; -use serde::Serialize; -use serde_json::Value; -use sha2::Digest as _; use tokio::time::sleep; use tokio_tungstenite::connect_async; use tracing::warn; -use uuid::Uuid; use crate::ExecServerError; use crate::ExecServerRuntimePaths; @@ -20,11 +15,10 @@ use crate::server::ConnectionProcessor; pub const CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN_ENV_VAR: &str = "CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN"; -const PROTOCOL_VERSION: &str = "codex-exec-server-v1"; const ERROR_BODY_PREVIEW_BYTES: usize = 4096; #[derive(Clone)] -struct ExecutorRegistryClient { +pub(crate) struct ExecutorRegistryClient { base_url: String, bearer_token: String, http: reqwest::Client, @@ -40,7 +34,7 @@ impl std::fmt::Debug for ExecutorRegistryClient { } impl ExecutorRegistryClient { - fn new(base_url: String, bearer_token: String) -> Result { + pub(crate) fn new(base_url: String, bearer_token: String) -> Result { let base_url = normalize_base_url(base_url)?; Ok(Self { base_url, @@ -49,30 +43,29 @@ impl ExecutorRegistryClient { }) } - async fn register_executor( + pub(crate) async fn register_executor( &self, - request: &ExecutorRegistryRegisterExecutorRequest, + executor_id: &str, ) -> Result { - self.post_json( - &format!("/cloud/executor/{}/register", request.executor_id), - request, - ) - .await - } - - async fn post_json(&self, path: &str, request: &T) -> Result - where - T: Serialize + Sync, - R: for<'de> Deserialize<'de>, - { let response = self .http - .post(endpoint_url(&self.base_url, path)) + .post(endpoint_url( + &self.base_url, + &format!("/cloud/executor/{executor_id}/register"), + )) .bearer_auth(&self.bearer_token) - .json(request) .send() .await?; + self.parse_json_response(response).await + } + async fn parse_json_response( + &self, + response: reqwest::Response, + ) -> Result + where + R: for<'de> Deserialize<'de>, + { if response.status().is_success() { return response.json::().await.map_err(ExecServerError::from); } @@ -87,21 +80,10 @@ impl ExecutorRegistryClient { } } -#[derive(Debug, Clone, Eq, PartialEq, Serialize)] -struct ExecutorRegistryRegisterExecutorRequest { - idempotency_id: String, - executor_id: String, - #[serde(skip_serializing_if = "Option::is_none")] - name: Option, - labels: BTreeMap, - metadata: Value, -} - #[derive(Debug, Clone, Eq, PartialEq, Deserialize)] -struct ExecutorRegistryExecutorRegistrationResponse { - id: String, - executor_id: String, - url: String, +pub(crate) struct ExecutorRegistryExecutorRegistrationResponse { + pub(crate) executor_id: String, + pub(crate) url: String, } /// Configuration for registering an exec-server for remote use. @@ -135,7 +117,8 @@ impl RemoteExecutorConfig { bearer_token: String, ) -> Result { let executor_id = normalize_executor_id(executor_id)?; - let bearer_token = normalize_bearer_token(bearer_token)?; + let bearer_token = + normalize_bearer_token(bearer_token, CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN_ENV_VAR)?; Ok(Self { base_url, executor_id, @@ -143,32 +126,6 @@ impl RemoteExecutorConfig { bearer_token, }) } - - fn registration_request( - &self, - registration_id: Uuid, - ) -> ExecutorRegistryRegisterExecutorRequest { - ExecutorRegistryRegisterExecutorRequest { - idempotency_id: self.default_idempotency_id(registration_id), - executor_id: self.executor_id.clone(), - name: Some(self.name.clone()), - labels: BTreeMap::new(), - metadata: Value::Object(Default::default()), - } - } - - fn default_idempotency_id(&self, registration_id: Uuid) -> String { - let mut hasher = sha2::Sha256::new(); - hasher.update(self.executor_id.as_bytes()); - hasher.update(b"\0"); - hasher.update(self.name.as_bytes()); - hasher.update(b"\0"); - hasher.update(PROTOCOL_VERSION); - hasher.update(b"\0"); - hasher.update(registration_id.as_bytes()); - let digest = hasher.finalize(); - format!("codex-exec-server-{digest:x}") - } } /// Register an exec-server for remote use and serve requests over the returned @@ -179,15 +136,13 @@ pub async fn run_remote_executor( ) -> Result<(), ExecServerError> { let client = ExecutorRegistryClient::new(config.base_url.clone(), config.bearer_token.clone())?; let processor = ConnectionProcessor::new(runtime_paths); - let registration_id = Uuid::new_v4(); let mut backoff = Duration::from_secs(1); loop { - let request = config.registration_request(registration_id); - let response = client.register_executor(&request).await?; + let response = client.register_executor(&config.executor_id).await?; eprintln!( - "codex exec-server remote executor {} registered with executor_id {}", - response.id, response.executor_id + "codex exec-server remote executor registered with executor_id {}", + response.executor_id ); match connect_async(response.url.as_str()).await { @@ -211,26 +166,28 @@ pub async fn run_remote_executor( } fn read_remote_bearer_token_from_env() -> Result { - read_remote_bearer_token_from_env_with(|name| env::var(name)) + read_bearer_token_from_env_var(CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN_ENV_VAR) } -fn read_remote_bearer_token_from_env_with(get_var: F) -> Result -where - F: FnOnce(&str) -> Result, -{ - let bearer_token = get_var(CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN_ENV_VAR).map_err(|_| { +pub(crate) fn read_bearer_token_from_env_var( + env_var_name: &str, +) -> Result { + let bearer_token = env::var(env_var_name).map_err(|_| { ExecServerError::ExecutorRegistryAuth(format!( - "executor registry bearer token environment variable `{CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN_ENV_VAR}` is not set" + "executor registry bearer token environment variable `{env_var_name}` is not set" )) })?; - normalize_bearer_token(bearer_token) + normalize_bearer_token(bearer_token, env_var_name) } -fn normalize_bearer_token(bearer_token: String) -> Result { +fn normalize_bearer_token( + bearer_token: String, + env_var_name: &str, +) -> Result { let bearer_token = bearer_token.trim().to_string(); if bearer_token.is_empty() { return Err(ExecServerError::ExecutorRegistryAuth(format!( - "executor registry bearer token environment variable `{CODEX_EXEC_SERVER_REMOTE_BEARER_TOKEN_ENV_VAR}` is empty" + "executor registry bearer token environment variable `{env_var_name}` is empty" ))); } Ok(bearer_token) @@ -323,11 +280,9 @@ fn preview_error_body(body: &str) -> Option { #[cfg(test)] mod tests { use pretty_assertions::assert_eq; - use serde_json::json; use wiremock::Mock; use wiremock::MockServer; use wiremock::ResponseTemplate; - use wiremock::matchers::body_json; use wiremock::matchers::header; use wiremock::matchers::method; use wiremock::matchers::path; @@ -337,21 +292,16 @@ mod tests { #[tokio::test] async fn register_executor_posts_with_bearer_token_header() { let server = MockServer::start().await; - let registration_id = Uuid::from_u128(1); let config = RemoteExecutorConfig::with_bearer_token( server.uri(), "exec-requested".to_string(), "registry-token".to_string(), ) .expect("config"); - let request = config.registration_request(registration_id); - let expected_request = serde_json::to_value(&request).expect("serialize request"); Mock::given(method("POST")) .and(path("/cloud/executor/exec-requested/register")) .and(header("authorization", "Bearer registry-token")) - .and(body_json(expected_request)) - .respond_with(ResponseTemplate::new(200).set_body_json(json!({ - "id": "registration-1", + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ "executor_id": "exec-1", "url": "wss://rendezvous.test/executor/exec-1?role=executor&sig=abc" }))) @@ -361,14 +311,13 @@ mod tests { .expect("client"); let response = client - .register_executor(&request) + .register_executor(&config.executor_id) .await .expect("register executor"); assert_eq!( response, ExecutorRegistryExecutorRegistrationResponse { - id: "registration-1".to_string(), executor_id: "exec-1".to_string(), url: "wss://rendezvous.test/executor/exec-1?role=executor&sig=abc".to_string(), }