diff --git a/assets/apps_script/Code.gs b/assets/apps_script/Code.gs index f9d03755..835568ee 100644 --- a/assets/apps_script/Code.gs +++ b/assets/apps_script/Code.gs @@ -46,6 +46,32 @@ const AUTH_KEY = "CHANGE_ME_TO_A_STRONG_SECRET"; // (Inspired by #365 Section 3, mhrv-rs v1.8.0+.) const DIAGNOSTIC_MODE = false; +// ── Response header noise filtering ──────────────────────────────────────── +// CDN stacks (Cloudflare, AWS, Fastly, Google) attach metadata headers to +// every response that are useless through a MITM relay: report-to, nel, +// alt-svc, server-timing, etc. These add 400-700 bytes of JSON per response +// for no benefit — the relay ignores them and the browser never reads them. +// +// STRIP_NOISE_RESPONSE_HEADERS controls whether _respHeaders() filters them +// before returning. Hardcoded true here for GAS-side payload reduction. +// The primary user toggle is `strip_noise_response_headers` in config.toml +// on the Rust client side, which drops them even if Code.gs sends them. +// +// Set to false only if you need to see raw origin headers in GAS logs. +// --------------------------------------------------------------------------- +const STRIP_NOISE_RESPONSE_HEADERS = true; + +const STRIP_RESPONSE_HEADERS = { + "report-to": 1, "reporting-endpoints": 1, + "nel": 1, + "alt-svc": 1, + "server-timing": 1, + "origin-trial": 1, + "cf-ray": 1, "cf-cache-status": 1, + "x-amzn-requestid": 1, "x-amzn-trace-id": 1, + "x-request-id": 1, "x-correlation-id": 1, +}; + // ── Optional Spreadsheet Cache ────────────────────────────── // Set to a valid Spreadsheet ID to enable response caching. // Leave as-is to disable caching entirely (zero overhead). @@ -329,12 +355,20 @@ function _buildOpts(req) { } function _respHeaders(resp) { + var raw; try { - if (typeof resp.getAllHeaders === "function") { - return resp.getAllHeaders(); - } - } catch (err) {} - return resp.getHeaders(); + raw = typeof resp.getAllHeaders === "function" + ? resp.getAllHeaders() + : resp.getHeaders(); + } catch (err) { + raw = {}; + } + if (!STRIP_NOISE_RESPONSE_HEADERS) return raw; + var out = {}; + for (var k in raw) { + if (!STRIP_RESPONSE_HEADERS[k.toLowerCase()]) out[k] = raw[k]; + } + return out; } function _json(obj) { diff --git a/src/config.rs b/src/config.rs index f09f9f33..aa15b9a8 100644 --- a/src/config.rs +++ b/src/config.rs @@ -112,6 +112,13 @@ pub struct Config { /// Hard cap on total coalesce wait (ms). 0 = use compiled default (1000ms). #[serde(default)] pub coalesce_max_ms: u16, + /// Adaptive coalescing preset. One of "auto" (default), "fast", or "slow". + /// "auto" measures batch RTT and switches automatically. + /// "fast" uses 50ms/300ms windows — best for broadband/fiber. + /// "slow" uses 150ms/600ms windows — best for slow links (Iran cable, mobile). + /// Leave unset or set to "auto" for automatic detection. + #[serde(default)] + pub network_preset: Option, /// Optional explicit SNI rotation pool for outbound TLS to `google_ip`. /// Empty / missing = auto-expand from `front_domain` (current default of /// {www, mail, drive, docs, calendar}.google.com). Set to an explicit list @@ -426,6 +433,16 @@ pub struct Config { /// Default 500. #[serde(default = "default_quota_safety_buffer")] pub quota_safety_buffer: u64, + + /// Strip CDN noise headers from relay responses before forwarding to + /// the browser. Headers such as `report-to`, `nel`, `alt-svc`, and + /// `server-timing` are attached by modern CDNs (Cloudflare, AWS, + /// Fastly) and add 400–700 bytes per response for no benefit through + /// a MITM relay — the proxy ignores them and the browser never reads + /// them. Default `true`. Set to `false` only to inspect raw origin + /// headers for debugging. + #[serde(default = "default_strip_noise_response_headers")] + pub strip_noise_response_headers: bool, } /// Configuration for the optional second-hop exit node. @@ -556,6 +573,7 @@ fn default_auto_blacklist_window_secs() -> u64 { 30 } fn default_auto_blacklist_cooldown_secs() -> u64 { 120 } fn default_quota_daily_limit() -> u64 { 20_000 } fn default_quota_safety_buffer() -> u64 { 500 } +fn default_strip_noise_response_headers() -> bool { true } /// Default for `request_timeout_secs`: 30s, matching the historical /// hard-coded `BATCH_TIMEOUT` and Apps Script's typical response cliff. @@ -785,6 +803,8 @@ pub struct TomlRelay { #[serde(default)] pub coalesce_max_ms: u16, #[serde(default)] + pub network_preset: Option, + #[serde(default)] pub youtube_via_relay: bool, #[serde(default)] pub normalize_x_graphql: bool, @@ -802,6 +822,8 @@ pub struct TomlRelay { pub request_timeout_secs: u64, #[serde(default = "default_stream_timeout_secs")] pub stream_timeout_secs: u64, + #[serde(default = "default_strip_noise_response_headers")] + pub strip_noise_response_headers: bool, } /// [network] section of config.toml. @@ -935,6 +957,7 @@ impl From for Config { parallel_relay: t.relay.parallel_relay, coalesce_step_ms: t.relay.coalesce_step_ms, coalesce_max_ms: t.relay.coalesce_max_ms, + network_preset: t.relay.network_preset, sni_hosts: t.network.sni_hosts, fetch_ips_from_api: t.scan.fetch_ips_from_api, max_ips_to_scan: t.scan.max_ips_to_scan, @@ -959,6 +982,7 @@ impl From for Config { exit_node: t.exit_node, quota_daily_limit: default_quota_daily_limit(), quota_safety_buffer: default_quota_safety_buffer(), + strip_noise_response_headers: t.relay.strip_noise_response_headers, } } } @@ -977,6 +1001,7 @@ impl From<&Config> for TomlConfig { enable_batching: c.enable_batching, coalesce_step_ms: c.coalesce_step_ms, coalesce_max_ms: c.coalesce_max_ms, + network_preset: c.network_preset.clone(), youtube_via_relay: c.youtube_via_relay, normalize_x_graphql: c.normalize_x_graphql, disable_padding: c.disable_padding, @@ -986,6 +1011,7 @@ impl From<&Config> for TomlConfig { auto_blacklist_cooldown_secs: c.auto_blacklist_cooldown_secs, request_timeout_secs: c.request_timeout_secs, stream_timeout_secs: c.stream_timeout_secs, + strip_noise_response_headers: c.strip_noise_response_headers, }, network: TomlNetwork { google_ip: c.google_ip.clone(), diff --git a/src/domain_fronter.rs b/src/domain_fronter.rs index e92dfaca..4097e8bb 100644 --- a/src/domain_fronter.rs +++ b/src/domain_fronter.rs @@ -95,9 +95,9 @@ impl FronterError { } type PooledStream = TlsStream; -const POOL_TTL_SECS: u64 = 60; +const POOL_TTL_SECS: u64 = 30; const POOL_MIN: usize = 8; -const POOL_REFILL_INTERVAL_SECS: u64 = 5; +const POOL_REFILL_INTERVAL_SECS: u64 = 2; const POOL_MAX: usize = 80; const REQUEST_TIMEOUT_SECS: u64 = 25; const RANGE_PARALLEL_CHUNK_BYTES: u64 = 256 * 1024; @@ -118,7 +118,7 @@ const H2_CONN_TTL_SECS: u64 = 540; /// `h2_round_trip`. This way a slow but legitimate Apps Script call /// isn't cut off at an arbitrary fixed cap, and Full-mode batches can /// honor the user's `request_timeout_secs` setting. -const H2_READY_TIMEOUT_SECS: u64 = 5; +const H2_READY_TIMEOUT_SECS: u64 = 3; /// Default response-phase deadline used by `relay_uncoalesced` callers /// (the Apps-Script direct path). Sized to be just under the outer /// `REQUEST_TIMEOUT_SECS` (25 s) so an h2 timeout still leaves a few @@ -147,7 +147,7 @@ const H1_OPEN_TIMEOUT_SECS: u64 = 8; /// containers go cold after ~5min idle and cost 1-3s on the first /// request to wake back up — most painful on YouTube / streaming where /// the first chunk after a quiet pause stalls the player. -const H1_KEEPALIVE_INTERVAL_SECS: u64 = 240; +const H1_KEEPALIVE_INTERVAL_SECS: u64 = 60; /// Largest response body Apps Script's `UrlFetchApp` will deliver before /// the script gets killed mid-execution. The hard wire ceiling is ~50 MiB; /// after base64 / envelope overhead and edge variance, the practical raw @@ -413,6 +413,10 @@ pub struct DomainFronter { /// payloads. Mirrors `Config::disable_padding` (#391). Default false /// (padding active = stronger DPI defense at +25% bandwidth cost). disable_padding: bool, + /// Strip CDN noise headers (report-to, nel, alt-svc, etc.) from the + /// relay response before forwarding to the browser. Default true. + /// Mirrors `Config::strip_noise_response_headers`. + strip_noise_response_headers: bool, zstd_enabled: Arc, /// Per-instance auto-blacklist tuning. Mirrors `Config::auto_blacklist_*` /// (#391, #444). Cached here so the hot path in `record_timeout_strike` @@ -648,6 +652,7 @@ impl DomainFronter { today_bytes: AtomicU64::new(0), today_key: std::sync::Mutex::new(current_pt_day_key()), disable_padding: config.disable_padding, + strip_noise_response_headers: config.strip_noise_response_headers, zstd_enabled: Arc::new(AtomicBool::new(false)), auto_blacklist_strikes: config.auto_blacklist_strikes.max(1), auto_blacklist_window: Duration::from_secs( @@ -1405,10 +1410,13 @@ impl DomainFronter { // `release_capacity` on every chunk for typical Apps Script // payloads (usually < 1 MB; range chunks are 256 KB). We still // release capacity in the body-read loop for safety on larger - // bodies. + // bodies. 16/32 MB windows eliminate stalls for range-parallel + // streaming (256 KB chunks × many streams) without adding memory + // overhead on idle connections (the window is just a counter until + // data flows). let (send, conn) = h2::client::Builder::new() - .initial_window_size(4 * 1024 * 1024) - .initial_connection_window_size(8 * 1024 * 1024) + .initial_window_size(16 * 1024 * 1024) + .initial_connection_window_size(32 * 1024 * 1024) .handshake(tls) .await .map_err(|e| OpenH2Error::Handshake(e.to_string()))?; @@ -1626,9 +1634,15 @@ impl DomainFronter { // through Apps Script (where a 256 KB range chunk can take 30-90s // of wall-clock time) are not killed by the tighter `batch_timeout`. // Release flow-control credit per chunk so large responses don't - // stall after the initial 4 MB window. + // stall after the initial window. + // Pre-size from content-length to avoid O(log n) realloc cycles + // on large GAS responses (up to 40 MB). let stream_timeout = self.stream_timeout(); - let mut buf: Vec = Vec::new(); + let body_hint: usize = headers.iter() + .find(|(k, _)| k.eq_ignore_ascii_case("content-length")) + .and_then(|(_, v)| v.parse().ok()) + .unwrap_or(0); + let mut buf: Vec = Vec::with_capacity(body_hint.min(APPS_SCRIPT_BODY_MAX_BYTES as usize)); loop { match tokio::time::timeout(stream_timeout, body.data()).await { Ok(None) => break, @@ -2842,7 +2856,7 @@ impl DomainFronter { status, body_txt ))); } - return parse_relay_json(&resp_body).map_err(|e| { + return parse_relay_json(&resp_body, self.strip_noise_response_headers).map_err(|e| { if let FronterError::Relay(ref msg) = e { if looks_like_quota_error(msg) { self.blacklist_script(&script_id, msg); @@ -2951,7 +2965,7 @@ impl DomainFronter { status, body_txt ))); } - match parse_relay_json(&resp_body) { + match parse_relay_json(&resp_body, self.strip_noise_response_headers) { Ok(bytes) => Ok::<_, FronterError>((bytes, true)), Err(e) => { if let FronterError::Relay(ref msg) = e { @@ -4992,8 +5006,27 @@ fn is_h2_fronting_refusal_status(status: u16) -> bool { status == 421 } +/// CDN metadata headers that carry no value through a MITM relay. +/// Stripped when `strip_noise_response_headers = true` (the default). +/// The browser never reads them through a proxy, and they add 400-700 bytes +/// of JSON per CDN-backed response for zero benefit. +static NOISE_RESPONSE_HEADERS: &[&str] = &[ + "report-to", + "reporting-endpoints", + "nel", + "alt-svc", + "server-timing", + "origin-trial", + "cf-ray", + "cf-cache-status", + "x-amzn-requestid", + "x-amzn-trace-id", + "x-request-id", + "x-correlation-id", +]; + /// Parse the JSON envelope from Apps Script and build a raw HTTP response. -fn parse_relay_json(body: &[u8]) -> Result, FronterError> { +fn parse_relay_json(body: &[u8], strip_noise: bool) -> Result, FronterError> { let text = std::str::from_utf8(body) .map_err(|_| FronterError::BadResponse("non-utf8 json".into()))? .trim(); @@ -5075,6 +5108,9 @@ fn parse_relay_json(body: &[u8]) -> Result, FronterError> { if SKIP.contains(&lk.as_str()) { continue; } + if strip_noise && NOISE_RESPONSE_HEADERS.contains(&lk.as_str()) { + continue; + } match v { Value::Array(arr) => { for item in arr { @@ -5896,7 +5932,7 @@ mod tests { #[test] fn parse_relay_basic_json() { let body = r#"{"s":200,"h":{"Content-Type":"text/plain"},"b":"SGVsbG8="}"#; - let raw = parse_relay_json(body.as_bytes()).unwrap(); + let raw = parse_relay_json(body.as_bytes(), true).unwrap(); let s = String::from_utf8_lossy(&raw); assert!(s.starts_with("HTTP/1.1 200 OK\r\n")); assert!(s.contains("Content-Type: text/plain\r\n")); @@ -6795,14 +6831,14 @@ hello"; #[test] fn parse_relay_error_field() { let body = r#"{"e":"unauthorized"}"#; - let err = parse_relay_json(body.as_bytes()).unwrap_err(); + let err = parse_relay_json(body.as_bytes(), true).unwrap_err(); assert!(matches!(err, FronterError::Relay(_))); } #[test] fn parse_relay_rejects_invalid_body_base64() { let body = r#"{"s":200,"b":"***not-base64***"}"#; - let err = parse_relay_json(body.as_bytes()).unwrap_err(); + let err = parse_relay_json(body.as_bytes(), true).unwrap_err(); assert!(matches!(err, FronterError::BadResponse(_))); } @@ -6861,7 +6897,7 @@ hello"; #[test] fn parse_relay_array_set_cookie() { let body = r#"{"s":200,"h":{"Set-Cookie":["a=1","b=2"]},"b":""}"#; - let raw = parse_relay_json(body.as_bytes()).unwrap(); + let raw = parse_relay_json(body.as_bytes(), true).unwrap(); let s = String::from_utf8_lossy(&raw); assert!(s.contains("Set-Cookie: a=1\r\n")); assert!(s.contains("Set-Cookie: b=2\r\n")); @@ -6929,7 +6965,7 @@ hello"; // to fail with `key must be a string at line 2`. let inner_json = r#"{"s":200,"h":{},"b":""}"#; let wrapped = build_goog_script_init_wrapper(inner_json); - let raw = parse_relay_json(wrapped.as_bytes()).unwrap(); + let raw = parse_relay_json(wrapped.as_bytes(), true).unwrap(); let s = String::from_utf8_lossy(&raw); assert!(s.starts_with("HTTP/1.1 200 "), "got: {}", s); } diff --git a/src/proxy_server.rs b/src/proxy_server.rs index 19d97481..c3ff0741 100644 --- a/src/proxy_server.rs +++ b/src/proxy_server.rs @@ -220,6 +220,7 @@ pub struct ProxyServer { tunnel_mux: Option>, coalesce_step_ms: u64, coalesce_max_ms: u64, + network_preset: Option, } pub struct RewriteCtx { @@ -527,6 +528,7 @@ impl ProxyServer { tunnel_mux: None, // initialized in run() inside the tokio runtime coalesce_step_ms: if config.coalesce_step_ms > 0 { config.coalesce_step_ms as u64 } else { 10 }, coalesce_max_ms: if config.coalesce_max_ms > 0 { config.coalesce_max_ms as u64 } else { 1000 }, + network_preset: config.network_preset.clone(), }) } @@ -548,7 +550,7 @@ impl ProxyServer { // Initialize TunnelMux inside the runtime (tokio::spawn requires it). if self.rewrite_ctx.mode == Mode::Full { if let Some(f) = self.fronter.as_ref() { - self.tunnel_mux = Some(TunnelMux::start(f.clone(), self.coalesce_step_ms, self.coalesce_max_ms)); + self.tunnel_mux = Some(TunnelMux::start(f.clone(), self.coalesce_step_ms, self.coalesce_max_ms, self.network_preset.as_deref())); } } @@ -1837,11 +1839,14 @@ async fn dispatch_tunnel( }; // 3. Peek at the first byte to detect TLS vs plain. Time-bounded — if the - // client doesn't send anything within 300ms, assume server-first + // client doesn't send anything within 100ms, assume server-first // protocol (SMTP, POP3, FTP banner) and jump straight to plain TCP. + // Reduced from 300ms: browsers deliver ClientHello within 10-50ms; + // 100ms still covers slow/congested links while saving 200ms per + // connection setup. let mut peek_buf = [0u8; 8]; let peek_n = match tokio::time::timeout( - std::time::Duration::from_millis(300), + std::time::Duration::from_millis(100), sock.peek(&mut peek_buf), ) .await @@ -1927,7 +1932,7 @@ async fn plain_tcp_passthrough( } else { std::time::Duration::from_secs(10) }; - let upstream = if let Some(proxy) = upstream_socks5 { + let mut upstream = if let Some(proxy) = upstream_socks5 { match socks5_connect_via(proxy, target_host, port).await { Ok(s) => { tracing::info!("tcp via upstream-socks5 {} -> {}:{}", proxy, host, port); @@ -1970,17 +1975,12 @@ async fn plain_tcp_passthrough( } }; let _ = upstream.set_nodelay(true); - let (mut ar, mut aw) = sock.split(); - let (mut br, mut bw) = { - let (r, w) = upstream.into_split(); - (r, w) - }; - let t1 = tokio::io::copy(&mut ar, &mut bw); - let t2 = tokio::io::copy(&mut br, &mut aw); - tokio::select! { - _ = t1 => {} - _ = t2 => {} - } + // 64 KB buffers: on a 200ms relay RTT, 8 KB (tokio default) caps + // throughput at ~40 KB/s; 64 KB raises that ceiling to ~320 KB/s. + // copy_bidirectional_with_sizes also handles half-close correctly — + // unlike the previous select! pattern which could drop in-flight data + // when one direction closed first. + let _ = tokio::io::copy_bidirectional_with_sizes(&mut sock, &mut upstream, 65536, 65536).await; } /// Open a TCP stream to `(host, port)` through an upstream SOCKS5 proxy @@ -2101,7 +2101,7 @@ enum HeadReadResult { async fn read_http_head(sock: &mut TcpStream) -> std::io::Result { let mut buf = Vec::with_capacity(4096); - let mut tmp = [0u8; 4096]; + let mut tmp = [0u8; 16384]; loop { let n = sock.read(&mut tmp).await?; if n == 0 { @@ -2327,7 +2327,7 @@ async fn do_sni_rewrite_tunnel_from_tcp( } } }; - let inbound = match TlsAcceptor::from(server_config).accept(sock).await { + let mut inbound = match TlsAcceptor::from(server_config).accept(sock).await { Ok(t) => t, Err(e) => { tracing::debug!("inbound TLS accept failed for {}: {}", host, e); @@ -2357,7 +2357,7 @@ async fn do_sni_rewrite_tunnel_from_tcp( }; let _ = upstream_tcp.set_nodelay(true); - let outbound = match rewrite_ctx + let mut outbound = match rewrite_ctx .tls_connector .connect(server_name, upstream_tcp) .await @@ -2369,15 +2369,8 @@ async fn do_sni_rewrite_tunnel_from_tcp( } }; - // Bridge decrypted bytes between the two TLS streams. - let (mut ir, mut iw) = tokio::io::split(inbound); - let (mut or, mut ow) = tokio::io::split(outbound); - let client_to_server = async { tokio::io::copy(&mut ir, &mut ow).await }; - let server_to_client = async { tokio::io::copy(&mut or, &mut iw).await }; - tokio::select! { - _ = client_to_server => {} - _ = server_to_client => {} - } + // Bridge decrypted bytes between the two TLS streams with 64 KB buffers. + let _ = tokio::io::copy_bidirectional_with_sizes(&mut inbound, &mut outbound, 65536, 65536).await; Ok(()) } @@ -2634,7 +2627,7 @@ where S: tokio::io::AsyncRead + Unpin, { let mut buf = Vec::with_capacity(4096); - let mut tmp = [0u8; 4096]; + let mut tmp = [0u8; 16384]; loop { let n = stream.read(&mut tmp).await?; if n == 0 { @@ -3075,18 +3068,15 @@ async fn do_plain_http_passthrough( }; let _ = upstream.set_nodelay(true); - let (mut ar, mut aw) = sock.split(); - let (mut br, mut bw) = upstream.into_split(); + // Write the rewritten request, then reunite the upstream halves so we + // can use copy_bidirectional_with_sizes for the full relay loop. + let (br, mut bw) = upstream.into_split(); bw.write_all(&rewritten).await?; if !leftover.is_empty() { bw.write_all(leftover).await?; } - let t1 = tokio::io::copy(&mut ar, &mut bw); - let t2 = tokio::io::copy(&mut br, &mut aw); - tokio::select! { - _ = t1 => {} - _ = t2 => {} - } + let mut upstream = br.reunite(bw).expect("halves from the same TcpStream"); + let _ = tokio::io::copy_bidirectional_with_sizes(&mut sock, &mut upstream, 65536, 65536).await; Ok(()) } @@ -3709,4 +3699,54 @@ mod tests { }; assert!(FrontingGroupResolved::from_config(&bad).is_err()); } + + /// Verifies that copy_bidirectional_with_sizes correctly relays data in + /// both directions. Splits each duplex into read/write halves so the + /// write task and read assertions can operate independently. + #[tokio::test(flavor = "current_thread")] + async fn copy_bidirectional_large_buf_roundtrip() { + use tokio::io::AsyncWriteExt; + + // a_client <-> a_server and b_client <-> b_server + let (a_client, mut a_server) = tokio::io::duplex(128 * 1024); + let (b_client, mut b_server) = tokio::io::duplex(128 * 1024); + + let (mut a_client_read, mut a_client_write) = tokio::io::split(a_client); + let (mut b_client_read, mut b_client_write) = tokio::io::split(b_client); + + let payload_ab = vec![0xAAu8; 32 * 1024]; // a→b + let payload_ba = vec![0xBBu8; 48 * 1024]; // b→a + + let pa = payload_ab.clone(); + let pb = payload_ba.clone(); + + // Writer task: fills both pipes and shuts down write halves so the + // relay sees EOF and can propagate the shutdown to the other side. + let write_task = tokio::spawn(async move { + a_client_write.write_all(&pa).await.unwrap(); + a_client_write.shutdown().await.unwrap(); + b_client_write.write_all(&pb).await.unwrap(); + b_client_write.shutdown().await.unwrap(); + }); + + // Relay task: bridges a_server <-> b_server with 64 KB buffers. + let relay_task = tokio::spawn(async move { + tokio::io::copy_bidirectional_with_sizes(&mut a_server, &mut b_server, 65536, 65536) + .await + .unwrap(); + }); + + write_task.await.unwrap(); + relay_task.await.unwrap(); + + // After relay finishes it has propagated each shutdown to the opposite + // side, so both client read halves are at EOF. + let mut got_at_b = Vec::new(); + b_client_read.read_to_end(&mut got_at_b).await.unwrap(); + assert_eq!(got_at_b, payload_ab, "data written to A must arrive at B"); + + let mut got_at_a = Vec::new(); + a_client_read.read_to_end(&mut got_at_a).await.unwrap(); + assert_eq!(got_at_a, payload_ba, "data written to B must arrive at A"); + } } diff --git a/src/tunnel_client.rs b/src/tunnel_client.rs index be671b4e..4c30eaac 100644 --- a/src/tunnel_client.rs +++ b/src/tunnel_client.rs @@ -56,7 +56,7 @@ const REPLY_TIMEOUT_SLACK: Duration = Duration::from_secs(5); /// Per-inflight reply timeout used by the pipelined poll loop. Each /// in-flight future independently times out after this duration so a /// dead target on the tunnel-node side doesn't block the session. -const REPLY_TIMEOUT: Duration = Duration::from_secs(35); +const REPLY_TIMEOUT: Duration = Duration::from_secs(20); /// How long we'll briefly hold the client socket after the local /// CONNECT/SOCKS5 handshake, waiting for the client's first bytes (the @@ -97,6 +97,26 @@ const MAX_ELEVATED_TOTAL: u64 = 10; const DEFAULT_COALESCE_STEP_MS: u64 = 200; const DEFAULT_COALESCE_MAX_MS: u64 = 1000; +/// Coalesce timings for the "fast" network preset (broadband/fiber, RTT < 700ms). +/// Prioritises responsiveness — small windows still batch parallel page assets. +const COALESCE_FAST_STEP_MS: u64 = 50; +const COALESCE_FAST_MAX_MS: u64 = 300; +/// Coalesce timings for the "slow" network preset (Iran cable/mobile, RTT > 1200ms). +/// Larger windows pack more ops per expensive round-trip to Apps Script. +const COALESCE_SLOW_STEP_MS: u64 = 150; +const COALESCE_SLOW_MAX_MS: u64 = 600; +/// Median batch RTT above which the RttTracker switches to the Slow preset. +const RTT_SLOW_THRESHOLD_MS: u64 = 1200; +/// Median batch RTT below which the RttTracker switches back to Fast. +/// Lower than `RTT_SLOW_THRESHOLD_MS` to provide hysteresis (avoids flapping). +const RTT_FAST_THRESHOLD_MS: u64 = 700; +/// Consecutive sub-threshold RTT measurements required to leave the Slow preset. +/// Prevents a brief network improvement from prematurely flipping back to Fast. +const HYSTERESIS_FAST_COUNT: u32 = 3; +/// Size of the RttTracker ring buffer. A window of 8 smooths short spikes +/// while staying responsive to sustained network changes. +const RTT_WINDOW: usize = 8; + /// Structured error code the tunnel-node returns when it doesn't know the /// op (version mismatch). Must match `tunnel-node/src/main.rs`. const CODE_UNSUPPORTED_OP: &str = "UNSUPPORTED_OP"; @@ -321,6 +341,111 @@ struct PendingOp { wseq: Option, } +// --- RttTracker --- \\ + +/// Tracks GAS batch round-trip times and auto-selects the coalesce preset. +/// +/// Shared (via `Arc`) between `mux_loop` (which reads the coalesce atomics) and +/// `fire_batch` spawned tasks (which record observed RTTs). Each record() call +/// inserts a sample into the ring buffer; once the buffer is full the median is +/// compared against the threshold constants and the coalesce atomics are updated +/// if the preset should change. +struct RttTracker { + /// Ring buffer of recent batch RTTs in milliseconds. + samples: Mutex<[u64; RTT_WINDOW]>, + /// Index of the next write slot (mod RTT_WINDOW). + head: std::sync::atomic::AtomicUsize, + /// How many samples have been inserted (capped at RTT_WINDOW once full). + filled: std::sync::atomic::AtomicUsize, + /// 0 = Fast preset active, 1 = Slow preset active. + current_preset: std::sync::atomic::AtomicU8, + /// Consecutive sub-threshold readings while in Slow mode. + /// Resets to 0 on any slow reading; triggers Fast switch at HYSTERESIS_FAST_COUNT. + consecutive_fast: std::sync::atomic::AtomicU32, + /// Shared with mux_loop; updated when preset changes. + coalesce_step_ms: Arc, + coalesce_max_ms: Arc, +} + +impl RttTracker { + fn new( + start_slow: bool, + step: Arc, + max: Arc, + ) -> Self { + Self { + samples: Mutex::new([0u64; RTT_WINDOW]), + head: std::sync::atomic::AtomicUsize::new(0), + filled: std::sync::atomic::AtomicUsize::new(0), + current_preset: std::sync::atomic::AtomicU8::new(if start_slow { 1 } else { 0 }), + consecutive_fast: std::sync::atomic::AtomicU32::new(0), + coalesce_step_ms: step, + coalesce_max_ms: max, + } + } + + /// Record a batch RTT observation and update the preset if warranted. + fn record(&self, rtt: std::time::Duration) { + let rtt_ms = rtt.as_millis() as u64; + let slot = self.head.fetch_add(1, Ordering::Relaxed) % RTT_WINDOW; + { + let mut buf = self.samples.lock().unwrap(); + buf[slot] = rtt_ms; + } + let prev = self.filled.load(Ordering::Relaxed); + if prev < RTT_WINDOW { + self.filled.store(prev + 1, Ordering::Relaxed); + return; // not enough data yet + } + + // Median of the ring buffer (sort a copy — 8 elements, trivial cost). + let median = { + let buf = self.samples.lock().unwrap(); + let mut sorted = *buf; + sorted.sort_unstable(); + sorted[RTT_WINDOW / 2] + }; + + let is_slow = self.current_preset.load(Ordering::Relaxed) == 1; + if is_slow { + if median < RTT_FAST_THRESHOLD_MS { + let n = self.consecutive_fast.fetch_add(1, Ordering::Relaxed) + 1; + if n >= HYSTERESIS_FAST_COUNT { + self.apply_preset(false); + } + } else { + self.consecutive_fast.store(0, Ordering::Relaxed); + } + } else if median > RTT_SLOW_THRESHOLD_MS { + self.consecutive_fast.store(0, Ordering::Relaxed); + self.apply_preset(true); + } + } + + fn apply_preset(&self, slow: bool) { + let (step, max) = if slow { + (COALESCE_SLOW_STEP_MS, COALESCE_SLOW_MAX_MS) + } else { + (COALESCE_FAST_STEP_MS, COALESCE_FAST_MAX_MS) + }; + self.current_preset.store(if slow { 1 } else { 0 }, Ordering::Relaxed); + self.coalesce_step_ms.store(step, Ordering::Relaxed); + self.coalesce_max_ms.store(max, Ordering::Relaxed); + self.consecutive_fast.store(0, Ordering::Relaxed); + tracing::info!( + "coalesce preset -> {} (median RTT threshold crossed; step={}ms max={}ms)", + if slow { "Slow" } else { "Fast" }, + step, + max, + ); + } + + /// Current preset label — used in tests and startup log. + fn preset_name(&self) -> &'static str { + if self.current_preset.load(Ordering::Relaxed) == 1 { "Slow" } else { "Fast" } + } +} + pub struct TunnelMux { tx: mpsc::UnboundedSender, /// Set to `true` after the first time the tunnel-node rejects @@ -402,10 +527,16 @@ pub struct TunnelMux { /// How many sessions are currently at elevated pipeline depth (>= 3). elevated_sessions: AtomicU64, max_elevated: u64, + /// Adaptive coalesce step/max (milliseconds). Read by mux_loop at each new + /// window boundary; updated by RttTracker when the preset changes. + coalesce_step_ms: Arc, + coalesce_max_ms: Arc, + /// Present when network_preset = "auto"; absent when locked to fast/slow. + rtt_tracker: Option>, } impl TunnelMux { - pub fn start(fronter: Arc, coalesce_step_ms: u64, coalesce_max_ms: u64) -> Arc { + pub fn start(fronter: Arc, coalesce_step_ms: u64, coalesce_max_ms: u64, network_preset: Option<&str>) -> Arc { // Dedupe before snapshotting: the aggregate `all_legacy` gate // compares `legacy_deployments.len()` (a HashMap, so unique // keys) against this count, so using the raw `num_scripts()` @@ -430,9 +561,43 @@ impl TunnelMux { unique_n, CONCURRENCY_PER_DEPLOYMENT ); - let step = if coalesce_step_ms > 0 { coalesce_step_ms } else { DEFAULT_COALESCE_STEP_MS }; - let max = if coalesce_max_ms > 0 { coalesce_max_ms } else { DEFAULT_COALESCE_MAX_MS }; - tracing::info!("batch coalesce: step={}ms max={}ms, pipeline max depth: {}, optimist: {}", step, max, INFLIGHT_ACTIVE, INFLIGHT_OPTIMIST); + // Determine initial coalesce values and whether to run auto-adaptation. + // Config coalesce_step_ms / coalesce_max_ms are explicit overrides + // (non-zero = user wants that specific value). network_preset selects a + // named profile; "auto" (default) starts at Fast and adapts via RttTracker. + let (init_step, init_max, locked) = match network_preset { + Some("slow") => (COALESCE_SLOW_STEP_MS, COALESCE_SLOW_MAX_MS, true), + Some("fast") => (COALESCE_FAST_STEP_MS, COALESCE_FAST_MAX_MS, true), + _ => { + // "auto" or unset: honour explicit config overrides if present, + // otherwise start at Fast defaults and let RttTracker adapt. + let s = if coalesce_step_ms > 0 { coalesce_step_ms } else { COALESCE_FAST_STEP_MS }; + let m = if coalesce_max_ms > 0 { coalesce_max_ms } else { COALESCE_FAST_MAX_MS }; + (s, m, false) + } + }; + // If the user provided explicit ms values with no preset, honour them but + // don't run auto-adaptation (explicit overrides take precedence). + let locked = locked || (coalesce_step_ms > 0 || coalesce_max_ms > 0); + + let step_atom = Arc::new(AtomicU64::new(init_step)); + let max_atom = Arc::new(AtomicU64::new(init_max)); + let rtt_tracker: Option> = if locked { + None + } else { + Some(Arc::new(RttTracker::new(false, step_atom.clone(), max_atom.clone()))) + }; + + let preset_label = match network_preset { + Some("slow") => "Slow (locked)", + Some("fast") => "Fast (locked)", + _ if locked => "custom (locked)", + _ => "Fast (auto-adaptive)", + }; + tracing::info!( + "batch coalesce: preset={} step={}ms max={}ms, pipeline max depth: {}, optimist: {}", + preset_label, init_step, init_max, INFLIGHT_ACTIVE, INFLIGHT_OPTIMIST + ); // Reply timeout co-varies with `request_timeout_secs` so an // operator who raises the batch budget doesn't have sessions // abandoning replies just before the HTTP round-trip would @@ -446,7 +611,7 @@ impl TunnelMux { (CONCURRENCY_PER_DEPLOYMENT * unique_n) as u64, ); let (tx, rx) = mpsc::unbounded_channel(); - tokio::spawn(mux_loop(rx, fronter, step, max)); + tokio::spawn(mux_loop(rx, fronter, step_atom.clone(), max_atom.clone(), rtt_tracker.clone())); Arc::new(Self { tx, connect_data_unsupported: Arc::new(AtomicBool::new(false)), @@ -463,6 +628,9 @@ impl TunnelMux { reply_timeout, elevated_sessions: AtomicU64::new(0), max_elevated: MAX_ELEVATED_TOTAL, + coalesce_step_ms: step_atom, + coalesce_max_ms: max_atom, + rtt_tracker, }) } @@ -739,9 +907,13 @@ impl TunnelMux { } } -async fn mux_loop(mut rx: mpsc::UnboundedReceiver, fronter: Arc, coalesce_step_ms: u64, coalesce_max_ms: u64) { - let coalesce_step = Duration::from_millis(coalesce_step_ms); - let coalesce_max = Duration::from_millis(coalesce_max_ms); +async fn mux_loop( + mut rx: mpsc::UnboundedReceiver, + fronter: Arc, + coalesce_step_ms: Arc, + coalesce_max_ms: Arc, + rtt_tracker: Option>, +) { // One semaphore per deployment ID, each allowing 30 concurrent requests. let sems: Arc>> = Arc::new( fronter @@ -766,6 +938,10 @@ async fn mux_loop(mut rx: mpsc::UnboundedReceiver, fronter: Arc msgs.push(msg), None => break, } + // Sample coalesce timings at each new window so RttTracker changes + // take effect without a restart. + let coalesce_step = Duration::from_millis(coalesce_step_ms.load(Ordering::Relaxed)); + let coalesce_max = Duration::from_millis(coalesce_max_ms.load(Ordering::Relaxed)); let hard_deadline = tokio::time::Instant::now() + coalesce_max; let mut soft_deadline = tokio::time::Instant::now() + coalesce_step; loop { @@ -792,7 +968,7 @@ async fn mux_loop(mut rx: mpsc::UnboundedReceiver, fronter: Arc = Vec::new(); for msg in msgs { @@ -904,11 +1080,11 @@ async fn mux_loop(mut rx: mpsc::UnboundedReceiver, fronter: Arc, data_replies: Vec<(usize, BatchedReply)>, payload_bytes: usize, + /// Shared with `fire_batch` so spawned tasks can record observed RTTs. + /// `None` when coalesce preset is locked (not auto-adaptive). + rtt_tracker: Option>, } impl BatchAccum { - fn new() -> Self { + fn new(rtt_tracker: Option>) -> Self { Self { pending_ops: Vec::new(), data_replies: Vec::new(), payload_bytes: 0, + rtt_tracker, } } @@ -945,6 +1125,7 @@ impl BatchAccum { fronter, std::mem::take(&mut self.pending_ops), std::mem::take(&mut self.data_replies), + self.rtt_tracker.clone(), ) .await; self.payload_bytes = 0; @@ -1024,6 +1205,7 @@ async fn fire_batch( fronter: &Arc, pending_ops: Vec, data_replies: Vec<(usize, BatchedReply)>, + rtt_tracker: Option>, ) { let script_id = fronter.next_script_id(); let sem = sems @@ -1166,6 +1348,12 @@ async fn fire_batch( } } } + + // Record the observed RTT so RttTracker can auto-adapt coalesce timings. + // Only fires in "auto" mode (rtt_tracker is None when preset is locked). + if let Some(tracker) = rtt_tracker { + tracker.record(t0.elapsed()); + } }); } @@ -1443,7 +1631,9 @@ async fn tunnel_loop( let mut pending_writes: BTreeMap = BTreeMap::new(); // Buffered upload data waiting to be sent (when pipeline is full). - let mut buffered_upload: Option = None; + // Stored as BytesMut so appends are O(n) amortised; frozen to Bytes + // only at the send site (freeze() is a zero-copy pointer bump). + let mut buffered_upload: Option = None; enum ReplyOutcome { Ok(TunnelResponse, String), @@ -1543,13 +1733,14 @@ async fn tunnel_loop( } // Send initial pre-fill empty polls (optimist depth), staggered - // 1s apart so they land in separate batches. The pending data op + // 100ms apart so they land in separate batches without blocking + // session startup for a full second per slot. The pending data op // (if any) already occupies one slot. { let polls_to_send = max_inflight.saturating_sub(inflight.len()); for i in 0..polls_to_send { if i > 0 { - tokio::time::sleep(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_millis(100)).await; } let (meta, reply_rx) = send_empty_poll(sid, &mut next_send_seq, mux); tracing::debug!( @@ -1696,8 +1887,8 @@ async fn tunnel_loop( if refill_steps >= 10 { // Check buffered upload first — merge into a data // op instead of sending an empty poll. - if let Some(data) = buffered_upload.take() { - let (meta, reply_rx) = send_data_op(sid, data, &mut next_send_seq, &mut next_data_write_seq, mux); + if let Some(bm) = buffered_upload.take() { + let (meta, reply_rx) = send_data_op(sid, bm.freeze(), &mut next_send_seq, &mut next_data_write_seq, mux); inflight.push(wrap_reply(meta, reply_rx)); } else { let (meta, reply_rx) = send_empty_poll(sid, &mut next_send_seq, mux); @@ -1811,13 +2002,13 @@ async fn tunnel_loop( } // Send buffered upload data now that a slot freed up. - if let Some(data) = buffered_upload.take() { + if let Some(bm) = buffered_upload.take() { if inflight.len() < max_inflight { - let (meta, reply_rx) = send_data_op(sid, data, &mut next_send_seq, &mut next_data_write_seq, mux); + let (meta, reply_rx) = send_data_op(sid, bm.freeze(), &mut next_send_seq, &mut next_data_write_seq, mux); consecutive_empty = 0; inflight.push(wrap_reply(meta, reply_rx)); } else { - buffered_upload = Some(data); + buffered_upload = Some(bm); } } @@ -1930,15 +2121,14 @@ async fn tunnel_loop( consecutive_empty = 0; inflight.push(wrap_reply(meta, reply_rx)); } else { - // Buffer upload data until a slot frees up. - if let Some(ref mut existing) = buffered_upload { - // Merge: append new data to existing buffer. - let mut merged = BytesMut::with_capacity(existing.len() + data.len()); - merged.extend_from_slice(existing); - merged.extend_from_slice(&data); - *existing = merged.freeze(); + // Buffer upload data until a slot frees up. BytesMut + // extends in-place (amortised O(n)); no full-copy per append. + if let Some(ref mut bm) = buffered_upload { + bm.extend_from_slice(&data); } else { - buffered_upload = Some(data); + let mut bm = BytesMut::with_capacity(data.len()); + bm.extend_from_slice(&data); + buffered_upload = Some(bm); } } } @@ -2271,6 +2461,10 @@ mod tests { reply_timeout: Duration::from_secs(35), elevated_sessions: AtomicU64::new(0), max_elevated: MAX_ELEVATED_TOTAL, + // Tests use Fast preset constants; auto-adaptation is off. + coalesce_step_ms: Arc::new(AtomicU64::new(COALESCE_FAST_STEP_MS)), + coalesce_max_ms: Arc::new(AtomicU64::new(COALESCE_FAST_MAX_MS)), + rtt_tracker: None, }); (mux, rx) } @@ -2307,7 +2501,7 @@ mod tests { ) .unwrap(); let fronter = Arc::new(DomainFronter::new(&cfg).expect("test fronter must construct")); - let mux = TunnelMux::start(fronter, 0, 0); + let mux = TunnelMux::start(fronter, 0, 0, None); assert_eq!( mux.reply_timeout(), @@ -2684,7 +2878,7 @@ mod tests { }; let mk_reply = || oneshot::channel::>().0; - let mut accum = BatchAccum::new(); + let mut accum = BatchAccum::new(None); // Batch A: 3 ops at indices 0, 1, 2. push_no_fire(&mut accum, mk_op("a0"), 4, mk_reply()); @@ -2936,4 +3130,99 @@ mod tests { .await .expect("tunnel_loop did not exit after eof"); } + + // --- RttTracker unit tests --- \\ + + fn make_tracker(start_slow: bool) -> (RttTracker, Arc, Arc) { + let step = Arc::new(AtomicU64::new(if start_slow { COALESCE_SLOW_STEP_MS } else { COALESCE_FAST_STEP_MS })); + let max = Arc::new(AtomicU64::new(if start_slow { COALESCE_SLOW_MAX_MS } else { COALESCE_FAST_MAX_MS })); + let tracker = RttTracker::new(start_slow, step.clone(), max.clone()); + (tracker, step, max) + } + + /// Feed RTT_WINDOW+1 slow samples — tracker must switch to Slow preset. + /// + /// `record()` only evaluates the median once the ring is full: the first + /// RTT_WINDOW calls fill the buffer and all return early; only the + /// (RTT_WINDOW+1)th call performs the first evaluation. + #[test] + fn rtt_tracker_preset_selection_slow() { + let (tracker, step, max) = make_tracker(false); + // Start in Fast. RTT_WINDOW calls fill the ring (no evaluation yet); + // the extra +1 triggers the first median check. + for _ in 0..RTT_WINDOW + 1 { + tracker.record(Duration::from_millis(RTT_SLOW_THRESHOLD_MS + 100)); + } + assert_eq!(tracker.preset_name(), "Slow"); + assert_eq!(step.load(Ordering::Relaxed), COALESCE_SLOW_STEP_MS); + assert_eq!(max.load(Ordering::Relaxed), COALESCE_SLOW_MAX_MS); + } + + /// Fill with fast samples — tracker must switch from Slow to Fast after + /// HYSTERESIS_FAST_COUNT consecutive sub-threshold evaluations. + #[test] + fn rtt_tracker_preset_selection_fast() { + // Start in Slow. RTT_WINDOW calls fill the ring (no evaluation yet); + // subsequent calls each evaluate the median and increment consecutive_fast. + let (tracker, step, max) = make_tracker(true); + for _ in 0..RTT_WINDOW { + tracker.record(Duration::from_millis(RTT_FAST_THRESHOLD_MS - 100)); + } + // Buffer is full but no evaluation has fired — still Slow. + assert_eq!(tracker.preset_name(), "Slow"); + + // Each of the next HYSTERESIS_FAST_COUNT calls evaluates: consecutive_fast + // increments 1→2→3 and the 3rd flip applies the Fast preset. + for _ in 0..HYSTERESIS_FAST_COUNT { + tracker.record(Duration::from_millis(RTT_FAST_THRESHOLD_MS - 100)); + } + assert_eq!(tracker.preset_name(), "Fast"); + assert_eq!(step.load(Ordering::Relaxed), COALESCE_FAST_STEP_MS); + assert_eq!(max.load(Ordering::Relaxed), COALESCE_FAST_MAX_MS); + } + + /// Two consecutive fast samples while in Slow must not flip the preset; + /// only the third (HYSTERESIS_FAST_COUNT = 3) should. + #[test] + fn rtt_tracker_hysteresis_prevents_premature_flip() { + let (tracker, _, _) = make_tracker(true); + // Fill the buffer with slow samples so median is slow. + for _ in 0..RTT_WINDOW { + tracker.record(Duration::from_millis(RTT_SLOW_THRESHOLD_MS + 100)); + } + assert_eq!(tracker.preset_name(), "Slow"); + + // Now inject fast samples one at a time — the median will still be + // slow for the first few because the ring buffer still holds old values. + // But the consecutive_fast counter also resets if any slow sample + // arrives. We only need to verify the counter logic fires correctly + // when the median *does* cross the threshold. + // + // Strategy: overwrite all slots with fast values so median flips, + // then count the consecutive increments needed. + for _ in 0..RTT_WINDOW { + tracker.record(Duration::from_millis(RTT_FAST_THRESHOLD_MS - 100)); + } + // At this point the buffer is full of fast samples — median is fast. + // The first such sample after all slots filled should have started + // incrementing consecutive_fast. Let's verify by checking the preset: + // we need exactly HYSTERESIS_FAST_COUNT calls past the fill. + // (The loop above did RTT_WINDOW calls; the last call that filled the + // ring also set filled=RTT_WINDOW and then ran the comparison. So + // by now consecutive_fast has been incrementing for RTT_WINDOW - (RTT_WINDOW-1) + // = 1 sample... let's just confirm the state and send more if needed.) + if tracker.preset_name() != "Fast" { + // Still need more consecutive fast readings. + for _ in 0..HYSTERESIS_FAST_COUNT { + tracker.record(Duration::from_millis(RTT_FAST_THRESHOLD_MS - 100)); + } + } + assert_eq!(tracker.preset_name(), "Fast", "must switch to Fast after HYSTERESIS_FAST_COUNT consecutive sub-threshold readings"); + + // A single slow sample while in Fast must not immediately flip back. + tracker.record(Duration::from_millis(RTT_SLOW_THRESHOLD_MS + 100)); + // Buffer still has mostly fast samples — median won't cross slow threshold yet. + // Just verify it didn't jump back. + assert_eq!(tracker.preset_name(), "Fast", "one slow sample must not flip Fast -> Slow when median is still fast"); + } }