Skip to content

Commit 0f578e3

Browse files
its-mashalexhancock
authored andcommitted
fix: use minimal buffer for shadow streams and cap at 32
- Shadow streams only receive SSE keep-alive pings, so use capacity 1 instead of full channel_capacity - Cap shadow_txs at 32 to prevent unbounded growth from misbehaving clients, dropping the oldest shadow when the limit is reached - Add test verifying primary works after exceeding shadow limit Signed-off-by: Mohammod Al Amin Ashik <maa.ashik00@gmail.com>
1 parent de6224b commit 0f578e3

2 files changed

Lines changed: 56 additions & 2 deletions

File tree

crates/rmcp/src/transport/streamable_http_server/session/local.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -567,8 +567,14 @@ impl LocalSessionWorker {
567567
&mut self,
568568
last_event_index: usize,
569569
) -> Result<StreamableHttpMessageReceiver, SessionError> {
570-
let (tx, rx) = tokio::sync::mpsc::channel(self.session_config.channel_capacity);
571-
if self.common.tx.is_closed() {
570+
let is_replacing_dead_primary = self.common.tx.is_closed();
571+
let capacity = if is_replacing_dead_primary {
572+
self.session_config.channel_capacity
573+
} else {
574+
1 // Shadow streams only need keep-alive pings
575+
};
576+
let (tx, rx) = tokio::sync::mpsc::channel(capacity);
577+
if is_replacing_dead_primary {
572578
// Primary common channel is dead — replace it.
573579
tracing::debug!("Replacing dead common channel with new primary");
574580
self.common.tx = tx;
@@ -580,6 +586,15 @@ impl LocalSessionWorker {
580586
// that stays alive via SSE keep-alive but doesn't receive
581587
// notifications. This prevents competing EventSource connections
582588
// from killing each other's channels.
589+
const MAX_SHADOW_STREAMS: usize = 32;
590+
591+
if self.shadow_txs.len() >= MAX_SHADOW_STREAMS {
592+
tracing::warn!(
593+
shadow_count = self.shadow_txs.len(),
594+
"Shadow stream limit reached, dropping oldest"
595+
);
596+
self.shadow_txs.remove(0);
597+
}
583598
tracing::debug!(
584599
shadow_count = self.shadow_txs.len(),
585600
"Common channel active, creating shadow stream"

crates/rmcp/tests/test_sse_concurrent_streams.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -743,3 +743,42 @@ async fn dead_primary_replacement_replays_cached_events() {
743743

744744
ct.cancel();
745745
}
746+
747+
// ─── Tests: Shadow stream limits ─────────────────────────────────────────────
748+
749+
/// Opening more than 32 shadow streams should not crash or reject — the server
750+
/// drops the oldest shadow to stay within the limit. Primary still works.
751+
#[tokio::test]
752+
async fn shadow_stream_limit_drops_oldest() {
753+
let ct = CancellationToken::new();
754+
let trigger = Arc::new(Notify::new());
755+
let url = start_test_server(ct.clone(), trigger.clone()).await;
756+
let client = reqwest::Client::new();
757+
758+
let session_id = initialize_session(&client, &url).await;
759+
send_initialized_notification(&client, &url, &session_id).await;
760+
tokio::time::sleep(Duration::from_millis(200)).await;
761+
762+
// First GET — primary
763+
let get1 = open_standalone_get(&client, &url, &session_id).await;
764+
assert_eq!(get1.status(), 200);
765+
tokio::time::sleep(Duration::from_millis(100)).await;
766+
767+
// Open 35 shadow streams (exceeds MAX_SHADOW_STREAMS=32)
768+
let mut shadows = Vec::new();
769+
for i in 0..35 {
770+
let shadow = open_standalone_get(&client, &url, &session_id).await;
771+
assert_eq!(shadow.status(), 200, "Shadow #{i} should succeed");
772+
shadows.push(shadow);
773+
}
774+
775+
// Primary should still receive notifications despite shadow churn
776+
trigger.notify_one();
777+
778+
assert!(
779+
wait_for_sse_event(get1, "tools/list_changed", Duration::from_secs(3)).await,
780+
"Primary should still work after exceeding shadow limit"
781+
);
782+
783+
ct.cancel();
784+
}

0 commit comments

Comments
 (0)