Skip to content

Commit 7914b4b

Browse files
aibrahim-oaicodex
andcommitted
Add executor-backed RMCP HTTP client
Add an environment streamable HTTP client that implements RMCP streamable HTTP over executor http/request calls, including streamed response-body deltas and the existing orchestrator-owned auth path. Co-authored-by: Codex <noreply@openai.com>
1 parent 4690cae commit 7914b4b

7 files changed

Lines changed: 753 additions & 0 deletions

File tree

codex-rs/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

codex-rs/exec-server/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ mod server;
2121

2222
pub use client::ExecServerClient;
2323
pub use client::ExecServerError;
24+
pub use client::HttpResponseBodyStream;
2425
pub use client_api::ExecServerClientConnectOptions;
2526
pub use client_api::RemoteExecServerConnectArgs;
2627
pub use environment::CODEX_EXEC_SERVER_URL_ENV_VAR;

codex-rs/rmcp-client/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ codex-keyring-store = { workspace = true }
2020
codex-protocol = { workspace = true }
2121
codex-utils-pty = { workspace = true }
2222
codex-utils-home-dir = { workspace = true }
23+
bytes = { workspace = true }
2324
futures = { workspace = true, default-features = false, features = ["std"] }
2425
keyring = { workspace = true, features = ["crypto-rust"] }
2526
oauth2 = "5"

codex-rs/rmcp-client/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ mod perform_oauth_login;
77
mod program_resolver;
88
mod rmcp_client;
99
mod stdio_server_launcher;
10+
mod streamable_http_environment_client;
1011
mod utils;
1112

1213
pub use auth_status::StreamableHttpOAuthDiscovery;

codex-rs/rmcp-client/src/rmcp_client.rs

Lines changed: 216 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use anyhow::Result;
1414
use anyhow::anyhow;
1515
use codex_client::build_reqwest_client_with_custom_ca;
1616
use codex_config::types::McpServerEnvVar;
17+
use codex_exec_server::ExecServerClient;
1718
use futures::FutureExt;
1819
use futures::StreamExt;
1920
use futures::future::BoxFuture;
@@ -74,6 +75,8 @@ use crate::oauth::StoredOAuthTokens;
7475
use crate::stdio_server_launcher::StdioServerCommand;
7576
use crate::stdio_server_launcher::StdioServerLauncher;
7677
use crate::stdio_server_launcher::StdioServerTransport;
78+
use crate::streamable_http_environment_client::EnvironmentStreamableHttpClient;
79+
use crate::streamable_http_environment_client::EnvironmentStreamableHttpClientError;
7780
use crate::utils::apply_default_headers;
7881
use crate::utils::build_default_headers;
7982
use codex_config::types::OAuthCredentialsStoreMode;
@@ -309,10 +312,17 @@ enum PendingTransport {
309312
StreamableHttp {
310313
transport: StreamableHttpClientTransport<StreamableHttpResponseClient>,
311314
},
315+
EnvironmentStreamableHttp {
316+
transport: StreamableHttpClientTransport<EnvironmentStreamableHttpClient>,
317+
},
312318
StreamableHttpWithOAuth {
313319
transport: StreamableHttpClientTransport<AuthClient<StreamableHttpResponseClient>>,
314320
oauth_persistor: OAuthPersistor,
315321
},
322+
EnvironmentStreamableHttpWithOAuth {
323+
transport: StreamableHttpClientTransport<AuthClient<EnvironmentStreamableHttpClient>>,
324+
oauth_persistor: OAuthPersistor,
325+
},
316326
}
317327

318328
enum ClientState {
@@ -339,6 +349,15 @@ enum TransportRecipe {
339349
env_http_headers: Option<HashMap<String, String>>,
340350
store_mode: OAuthCredentialsStoreMode,
341351
},
352+
EnvironmentStreamableHttp {
353+
server_name: String,
354+
url: String,
355+
bearer_token: Option<String>,
356+
http_headers: Option<HashMap<String, String>>,
357+
env_http_headers: Option<HashMap<String, String>>,
358+
store_mode: OAuthCredentialsStoreMode,
359+
exec_client: ExecServerClient,
360+
},
342361
}
343362

344363
#[derive(Clone)]
@@ -556,6 +575,43 @@ impl RmcpClient {
556575
})
557576
}
558577

578+
/// Creates a Streamable HTTP MCP client whose HTTP requests run through an
579+
/// exec-server connection.
580+
///
581+
/// This keeps MCP protocol handling local to this crate while letting the
582+
/// executor environment own DNS resolution, network reachability, and
583+
/// response-body streaming for remote MCP servers.
584+
#[allow(clippy::too_many_arguments)]
585+
pub async fn new_environment_streamable_http_client(
586+
server_name: &str,
587+
url: &str,
588+
bearer_token: Option<String>,
589+
http_headers: Option<HashMap<String, String>>,
590+
env_http_headers: Option<HashMap<String, String>>,
591+
store_mode: OAuthCredentialsStoreMode,
592+
exec_client: ExecServerClient,
593+
) -> Result<Self> {
594+
let transport_recipe = TransportRecipe::EnvironmentStreamableHttp {
595+
server_name: server_name.to_string(),
596+
url: url.to_string(),
597+
bearer_token,
598+
http_headers,
599+
env_http_headers,
600+
store_mode,
601+
exec_client,
602+
};
603+
let transport = Self::create_pending_transport(&transport_recipe).await?;
604+
Ok(Self {
605+
state: Mutex::new(ClientState::Connecting {
606+
transport: Some(transport),
607+
}),
608+
transport_recipe,
609+
initialize_context: Mutex::new(None),
610+
session_recovery_lock: Mutex::new(()),
611+
elicitation_pause_state: ElicitationPauseState::new(),
612+
})
613+
}
614+
559615
/// Perform the initialization handshake with the MCP server.
560616
/// https://modelcontextprotocol.io/specification/2025-06-18/basic/lifecycle#initialization
561617
pub async fn initialize(
@@ -969,6 +1025,90 @@ impl RmcpClient {
9691025
Ok(PendingTransport::StreamableHttp { transport })
9701026
}
9711027
}
1028+
TransportRecipe::EnvironmentStreamableHttp {
1029+
server_name,
1030+
url,
1031+
bearer_token,
1032+
http_headers,
1033+
env_http_headers,
1034+
store_mode,
1035+
exec_client,
1036+
} => {
1037+
let default_headers =
1038+
build_default_headers(http_headers.clone(), env_http_headers.clone())?;
1039+
1040+
let initial_oauth_tokens =
1041+
if bearer_token.is_none() && !default_headers.contains_key(AUTHORIZATION) {
1042+
match load_oauth_tokens(server_name, url, *store_mode) {
1043+
Ok(tokens) => tokens,
1044+
Err(err) => {
1045+
warn!("failed to read tokens for server `{server_name}`: {err}");
1046+
None
1047+
}
1048+
}
1049+
} else {
1050+
None
1051+
};
1052+
1053+
if let Some(initial_tokens) = initial_oauth_tokens.clone() {
1054+
match create_environment_oauth_transport_and_runtime(
1055+
server_name,
1056+
url,
1057+
initial_tokens.clone(),
1058+
*store_mode,
1059+
default_headers.clone(),
1060+
exec_client.clone(),
1061+
)
1062+
.await
1063+
{
1064+
Ok((transport, oauth_persistor)) => {
1065+
Ok(PendingTransport::EnvironmentStreamableHttpWithOAuth {
1066+
transport,
1067+
oauth_persistor,
1068+
})
1069+
}
1070+
Err(err)
1071+
if err.downcast_ref::<AuthError>().is_some_and(|auth_err| {
1072+
matches!(auth_err, AuthError::NoAuthorizationSupport)
1073+
}) =>
1074+
{
1075+
let access_token = initial_tokens
1076+
.token_response
1077+
.0
1078+
.access_token()
1079+
.secret()
1080+
.to_string();
1081+
warn!(
1082+
"OAuth metadata discovery is unavailable for MCP server `{server_name}`; falling back to stored bearer token authentication"
1083+
);
1084+
let http_config =
1085+
StreamableHttpClientTransportConfig::with_uri(url.clone())
1086+
.auth_header(access_token);
1087+
let transport = StreamableHttpClientTransport::with_client(
1088+
EnvironmentStreamableHttpClient::new(
1089+
exec_client.clone(),
1090+
default_headers,
1091+
),
1092+
http_config,
1093+
);
1094+
Ok(PendingTransport::EnvironmentStreamableHttp { transport })
1095+
}
1096+
Err(err) => Err(err),
1097+
}
1098+
} else {
1099+
let mut http_config =
1100+
StreamableHttpClientTransportConfig::with_uri(url.clone());
1101+
if let Some(bearer_token) = bearer_token.clone() {
1102+
http_config = http_config.auth_header(bearer_token);
1103+
}
1104+
1105+
let transport = StreamableHttpClientTransport::with_client(
1106+
EnvironmentStreamableHttpClient::new(exec_client.clone(), default_headers),
1107+
http_config,
1108+
);
1109+
Ok(PendingTransport::EnvironmentStreamableHttp { transport })
1110+
}
1111+
}
9721112
}
9731113
}
9741114

@@ -989,13 +1129,24 @@ impl RmcpClient {
9891129
service::serve_client(client_service, transport).boxed(),
9901130
None,
9911131
),
1132+
PendingTransport::EnvironmentStreamableHttp { transport } => (
1133+
service::serve_client(client_service, transport).boxed(),
1134+
None,
1135+
),
9921136
PendingTransport::StreamableHttpWithOAuth {
9931137
transport,
9941138
oauth_persistor,
9951139
} => (
9961140
service::serve_client(client_service, transport).boxed(),
9971141
Some(oauth_persistor),
9981142
),
1143+
PendingTransport::EnvironmentStreamableHttpWithOAuth {
1144+
transport,
1145+
oauth_persistor,
1146+
} => (
1147+
service::serve_client(client_service, transport).boxed(),
1148+
Some(oauth_persistor),
1149+
),
9991150
};
10001151

10011152
let service = match timeout {
@@ -1092,6 +1243,17 @@ impl RmcpClient {
10921243
)
10931244
)
10941245
})
1246+
|| error
1247+
.error
1248+
.downcast_ref::<StreamableHttpError<EnvironmentStreamableHttpClientError>>()
1249+
.is_some_and(|error| {
1250+
matches!(
1251+
error,
1252+
StreamableHttpError::Client(
1253+
EnvironmentStreamableHttpClientError::SessionExpired404
1254+
)
1255+
)
1256+
})
10951257
}
10961258

10971259
async fn reinitialize_after_session_expiry(
@@ -1192,6 +1354,60 @@ async fn create_oauth_transport_and_runtime(
11921354
Ok((transport, runtime))
11931355
}
11941356

1357+
/// Builds an executor-backed Streamable HTTP transport with stored OAuth tokens.
1358+
async fn create_environment_oauth_transport_and_runtime(
1359+
server_name: &str,
1360+
url: &str,
1361+
initial_tokens: StoredOAuthTokens,
1362+
credentials_store: OAuthCredentialsStoreMode,
1363+
default_headers: HeaderMap,
1364+
exec_client: ExecServerClient,
1365+
) -> Result<(
1366+
StreamableHttpClientTransport<AuthClient<EnvironmentStreamableHttpClient>>,
1367+
OAuthPersistor,
1368+
)> {
1369+
// OAuth discovery still runs from the orchestrator because credentials are
1370+
// persisted locally, while MCP traffic after auth goes through exec-server.
1371+
let metadata_client = build_http_client(&default_headers)?;
1372+
let mut oauth_state = OAuthState::new(url.to_string(), Some(metadata_client)).await?;
1373+
1374+
oauth_state
1375+
.set_credentials(
1376+
&initial_tokens.client_id,
1377+
initial_tokens.token_response.0.clone(),
1378+
)
1379+
.await?;
1380+
1381+
let manager = match oauth_state {
1382+
OAuthState::Authorized(manager) => manager,
1383+
OAuthState::Unauthorized(manager) => manager,
1384+
OAuthState::Session(_) | OAuthState::AuthorizedHttpClient(_) => {
1385+
return Err(anyhow!("unexpected OAuth state during client setup"));
1386+
}
1387+
};
1388+
1389+
let auth_client = AuthClient::new(
1390+
EnvironmentStreamableHttpClient::new(exec_client, default_headers),
1391+
manager,
1392+
);
1393+
let auth_manager = auth_client.auth_manager.clone();
1394+
1395+
let transport = StreamableHttpClientTransport::with_client(
1396+
auth_client,
1397+
StreamableHttpClientTransportConfig::with_uri(url.to_string()),
1398+
);
1399+
1400+
let runtime = OAuthPersistor::new(
1401+
server_name.to_string(),
1402+
url.to_string(),
1403+
auth_manager,
1404+
credentials_store,
1405+
Some(initial_tokens),
1406+
);
1407+
1408+
Ok((transport, runtime))
1409+
}
1410+
11951411
#[cfg(test)]
11961412
mod tests {
11971413
use std::time::Duration;

0 commit comments

Comments
 (0)