Skip to content

Commit 4690cae

Browse files
aibrahim-oaicodex
andcommitted
Implement executor HTTP request runner
Run http/request calls with executor-side reqwest, returning buffered bodies directly and streaming response bodies through ordered http/request/bodyDelta notifications. Register the executor handler while keeping the API transport-shaped. Co-authored-by: Codex <noreply@openai.com>
1 parent 1a9358f commit 4690cae

7 files changed

Lines changed: 572 additions & 0 deletions

File tree

codex-rs/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

codex-rs/exec-server/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ codex-sandboxing = { workspace = true }
2121
codex-utils-absolute-path = { workspace = true }
2222
codex-utils-pty = { workspace = true }
2323
futures = { workspace = true }
24+
reqwest = { workspace = true, features = ["rustls-tls", "stream"] }
2425
serde = { workspace = true, features = ["derive"] }
2526
serde_json = { workspace = true }
2627
thiserror = { workspace = true }
Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
//! Executor-owned HTTP request runner.
2+
//!
3+
//! The remote MCP path uses this module to make HTTP requests from the same
4+
//! environment that owns stdio process execution. Buffered responses return as a
5+
//! normal JSON-RPC result, while streaming responses split headers and body
6+
//! frames so Streamable HTTP clients can process SSE data before EOF.
7+
8+
use std::time::Duration;
9+
10+
use codex_app_server_protocol::JSONRPCErrorError;
11+
use futures::StreamExt;
12+
use reqwest::Method;
13+
use reqwest::Url;
14+
use reqwest::header::HeaderMap;
15+
use reqwest::header::HeaderName;
16+
use reqwest::header::HeaderValue;
17+
18+
use crate::protocol::HTTP_REQUEST_BODY_DELTA_METHOD;
19+
use crate::protocol::HttpHeader;
20+
use crate::protocol::HttpRequestBodyDeltaNotification;
21+
use crate::protocol::HttpRequestParams;
22+
use crate::protocol::HttpRequestResponse;
23+
use crate::rpc::RpcNotificationSender;
24+
use crate::rpc::internal_error;
25+
use crate::rpc::invalid_params;
26+
27+
/// Default timeout for executor HTTP requests when the protocol omits one.
28+
const DEFAULT_HTTP_REQUEST_TIMEOUT: Duration = Duration::from_secs(30);
29+
30+
/// Runs one executor HTTP request and returns the JSON-RPC response payload.
31+
///
32+
/// When `stream_response` is set, the returned body is empty and the response
33+
/// bytes are emitted as ordered `http/request/bodyDelta` notifications.
34+
pub(crate) async fn run_http_request(
35+
params: HttpRequestParams,
36+
notifications: RpcNotificationSender,
37+
) -> Result<HttpRequestResponse, JSONRPCErrorError> {
38+
let method = Method::from_bytes(params.method.as_bytes())
39+
.map_err(|err| invalid_params(format!("http/request method is invalid: {err}")))?;
40+
let url = Url::parse(&params.url)
41+
.map_err(|err| invalid_params(format!("http/request url is invalid: {err}")))?;
42+
match url.scheme() {
43+
"http" | "https" => {}
44+
scheme => {
45+
return Err(invalid_params(format!(
46+
"http/request only supports http and https URLs, got {scheme}"
47+
)));
48+
}
49+
}
50+
51+
let request_id = if params.stream_response {
52+
Some(params.request_id.clone().ok_or_else(|| {
53+
invalid_params("http/request streamResponse requires requestId".to_string())
54+
})?)
55+
} else {
56+
None
57+
};
58+
let timeout = params
59+
.timeout_ms
60+
.map(Duration::from_millis)
61+
.unwrap_or(DEFAULT_HTTP_REQUEST_TIMEOUT);
62+
let headers = build_headers(params.headers)?;
63+
let client = reqwest::Client::builder()
64+
.timeout(timeout)
65+
.build()
66+
.map_err(|err| internal_error(format!("failed to build http/request client: {err}")))?;
67+
68+
let mut request = client.request(method, url).headers(headers);
69+
if let Some(body) = params.body {
70+
request = request.body(body.into_inner());
71+
}
72+
73+
let response = request
74+
.send()
75+
.await
76+
.map_err(|err| internal_error(format!("http/request failed: {err}")))?;
77+
let status = response.status().as_u16();
78+
let headers = response_headers(response.headers());
79+
80+
if let Some(request_id) = request_id {
81+
// The JSON-RPC response carries status and headers; the body follows as
82+
// ordered notifications so callers can begin parsing streaming content
83+
// without waiting for the server to close the HTTP response.
84+
spawn_body_stream(request_id, response, notifications);
85+
return Ok(HttpRequestResponse {
86+
status,
87+
headers,
88+
body: Vec::new().into(),
89+
});
90+
}
91+
92+
let body = response.bytes().await.map_err(|err| {
93+
internal_error(format!("failed to read http/request response body: {err}"))
94+
})?;
95+
96+
Ok(HttpRequestResponse {
97+
status,
98+
headers,
99+
body: body.to_vec().into(),
100+
})
101+
}
102+
103+
/// Converts protocol headers into a reqwest header map while preserving repeats.
104+
fn build_headers(headers: Vec<HttpHeader>) -> Result<HeaderMap, JSONRPCErrorError> {
105+
let mut header_map = HeaderMap::new();
106+
for header in headers {
107+
let name = HeaderName::from_bytes(header.name.as_bytes())
108+
.map_err(|err| invalid_params(format!("http/request header name is invalid: {err}")))?;
109+
let value = HeaderValue::from_str(&header.value).map_err(|err| {
110+
invalid_params(format!(
111+
"http/request header value is invalid for {}: {err}",
112+
header.name
113+
))
114+
})?;
115+
header_map.append(name, value);
116+
}
117+
Ok(header_map)
118+
}
119+
120+
/// Converts response headers back into protocol headers with UTF-8 values only.
121+
fn response_headers(headers: &HeaderMap) -> Vec<HttpHeader> {
122+
headers
123+
.iter()
124+
.filter_map(|(name, value)| {
125+
Some(HttpHeader {
126+
name: name.as_str().to_string(),
127+
value: value.to_str().ok()?.to_string(),
128+
})
129+
})
130+
.collect()
131+
}
132+
133+
/// Spawns the task that bridges a reqwest byte stream to JSON-RPC notifications.
134+
fn spawn_body_stream(
135+
request_id: String,
136+
response: reqwest::Response,
137+
notifications: RpcNotificationSender,
138+
) {
139+
tokio::spawn(async move {
140+
let mut seq = 1;
141+
let mut body = response.bytes_stream();
142+
while let Some(chunk) = body.next().await {
143+
match chunk {
144+
Ok(bytes) => {
145+
if !send_body_delta(
146+
&notifications,
147+
HttpRequestBodyDeltaNotification {
148+
request_id: request_id.clone(),
149+
seq,
150+
delta: bytes.to_vec().into(),
151+
done: false,
152+
error: None,
153+
},
154+
)
155+
.await
156+
{
157+
return;
158+
}
159+
seq += 1;
160+
}
161+
Err(err) => {
162+
let _ = send_body_delta(
163+
&notifications,
164+
HttpRequestBodyDeltaNotification {
165+
request_id,
166+
seq,
167+
delta: Vec::new().into(),
168+
done: true,
169+
error: Some(err.to_string()),
170+
},
171+
)
172+
.await;
173+
return;
174+
}
175+
}
176+
}
177+
178+
let _ = send_body_delta(
179+
&notifications,
180+
HttpRequestBodyDeltaNotification {
181+
request_id,
182+
seq,
183+
delta: Vec::new().into(),
184+
done: true,
185+
error: None,
186+
},
187+
)
188+
.await;
189+
});
190+
}
191+
192+
/// Sends one streamed response-body notification.
193+
///
194+
/// Returns `false` when the JSON-RPC connection has closed, letting the stream
195+
/// task stop without treating disconnects as executor errors.
196+
async fn send_body_delta(
197+
notifications: &RpcNotificationSender,
198+
delta: HttpRequestBodyDeltaNotification,
199+
) -> bool {
200+
notifications
201+
.notify(HTTP_REQUEST_BODY_DELTA_METHOD, &delta)
202+
.await
203+
.is_ok()
204+
}

codex-rs/exec-server/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ mod file_system;
66
mod fs_helper;
77
mod fs_helper_main;
88
mod fs_sandbox;
9+
mod http_request;
910
mod local_file_system;
1011
mod local_process;
1112
mod process;

codex-rs/exec-server/src/server/handler.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ use crate::protocol::FsRemoveParams;
2222
use crate::protocol::FsRemoveResponse;
2323
use crate::protocol::FsWriteFileParams;
2424
use crate::protocol::FsWriteFileResponse;
25+
use crate::protocol::HttpRequestParams;
26+
use crate::protocol::HttpRequestResponse;
2527
use crate::protocol::InitializeParams;
2628
use crate::protocol::InitializeResponse;
2729
use crate::protocol::ReadParams;
@@ -203,6 +205,14 @@ impl ExecServerHandler {
203205
self.file_system.copy(params).await
204206
}
205207

208+
pub(crate) async fn http_request(
209+
&self,
210+
params: HttpRequestParams,
211+
) -> Result<HttpRequestResponse, JSONRPCErrorError> {
212+
self.require_initialized_for("http")?;
213+
crate::http_request::run_http_request(params, self.notifications.clone()).await
214+
}
215+
206216
fn require_initialized_for(
207217
&self,
208218
method_family: &str,

codex-rs/exec-server/src/server/registry.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ use crate::protocol::FsReadDirectoryParams;
1919
use crate::protocol::FsReadFileParams;
2020
use crate::protocol::FsRemoveParams;
2121
use crate::protocol::FsWriteFileParams;
22+
use crate::protocol::HTTP_REQUEST_METHOD;
23+
use crate::protocol::HttpRequestParams;
2224
use crate::protocol::INITIALIZE_METHOD;
2325
use crate::protocol::INITIALIZED_METHOD;
2426
use crate::protocol::InitializeParams;
@@ -106,5 +108,11 @@ pub(crate) fn build_router() -> RpcRouter<ExecServerHandler> {
106108
handler.fs_copy(params).await
107109
},
108110
);
111+
router.request(
112+
HTTP_REQUEST_METHOD,
113+
|handler: Arc<ExecServerHandler>, params: HttpRequestParams| async move {
114+
handler.http_request(params).await
115+
},
116+
);
109117
router
110118
}

0 commit comments

Comments
 (0)