Skip to content

Commit de6224b

Browse files
its-mash authored and alexhancock committed
fix: address review feedback — remove dead Conflict variant, restore sync on resume, rename test
- Remove unused SessionError::Conflict and dead string-matching in tower.rs (leftover from abandoned 409 approach)
- Restore sync() replay when replacing a dead primary common channel so server-initiated requests and cached notifications are not lost on reconnect
- Rename test from test_sse_channel_replacement_bug to test_sse_concurrent_streams per reviewer suggestion (describe what tests verify, not what triggered them)
- Add test for cache replay on dead primary replacement
- Use generic "MCP clients" in comments instead of specific client names

Signed-off-by: Mohammod Al Amin Ashik <maa.ashik00@gmail.com>
1 parent d4801bd commit de6224b

4 files changed

Lines changed: 69 additions & 38 deletions

File tree

crates/rmcp/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -243,6 +243,6 @@ required-features = [
243243
path = "tests/test_custom_headers.rs"
244244

245245
[[test]]
246-
name = "test_sse_channel_replacement_bug"
246+
name = "test_sse_concurrent_streams"
247247
required-features = ["server", "client", "transport-streamable-http-server", "transport-streamable-http-client", "reqwest"]
248-
path = "tests/test_sse_channel_replacement_bug.rs"
248+
path = "tests/test_sse_concurrent_streams.rs"

crates/rmcp/src/transport/streamable_http_server/session/local.rs

Lines changed: 15 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -295,9 +295,9 @@ pub struct LocalSessionWorker {
295295
common: CachedTx,
296296
/// Shadow senders for secondary SSE streams (e.g. from POST EventSource
297297
/// reconnections). These keep the HTTP connections alive via SSE keep-alive
298-
/// without receiving notifications, preventing clients like Cursor from
299-
/// entering infinite reconnect loops when multiple EventSource connections
300-
/// compete to replace the common channel.
298+
/// without receiving notifications, preventing MCP clients from entering
299+
/// infinite reconnect loops when multiple EventSource connections compete
300+
/// to replace the common channel.
301301
shadow_txs: Vec<Sender<ServerSseMessage>>,
302302
event_rx: Receiver<SessionEvent>,
303303
session_config: SessionConfig,
@@ -321,8 +321,6 @@ pub enum SessionError {
321321
SessionServiceTerminated,
322322
#[error("Invalid event id")]
323323
InvalidEventId,
324-
#[error("Conflict: Only one standalone SSE stream is allowed per session")]
325-
Conflict,
326324
#[error("IO error: {0}")]
327325
Io(#[from] std::io::Error),
328326
}
@@ -546,29 +544,37 @@ impl LocalSessionWorker {
546544
http_request_id,
547545
"Request-wise channel completed, falling back to common channel"
548546
);
549-
self.resume_or_shadow_common()
547+
self.resume_or_shadow_common(last_event_id.index).await
550548
}
551549
}
552-
None => self.resume_or_shadow_common(),
550+
None => self.resume_or_shadow_common(last_event_id.index).await,
553551
}
554552
}
555553

556554
/// Resume the common channel, or create a shadow stream if the primary is
557555
/// still active.
558556
///
559557
/// When the primary common channel is dead (receiver dropped), replace it
560-
/// so this stream becomes the new primary notification channel.
558+
/// so this stream becomes the new primary notification channel. Cached
559+
/// messages are replayed from `last_event_index` so the client receives
560+
/// any events it missed (including server-initiated requests).
561561
///
562562
/// When the primary is still active, create a "shadow" stream — an idle SSE
563563
/// connection kept alive by keep-alive pings. This prevents multiple
564564
/// EventSource connections (e.g. from POST response reconnections) from
565565
/// killing each other by repeatedly replacing the common channel sender.
566-
fn resume_or_shadow_common(&mut self) -> Result<StreamableHttpMessageReceiver, SessionError> {
566+
async fn resume_or_shadow_common(
567+
&mut self,
568+
last_event_index: usize,
569+
) -> Result<StreamableHttpMessageReceiver, SessionError> {
567570
let (tx, rx) = tokio::sync::mpsc::channel(self.session_config.channel_capacity);
568571
if self.common.tx.is_closed() {
569572
// Primary common channel is dead — replace it.
570573
tracing::debug!("Replacing dead common channel with new primary");
571574
self.common.tx = tx;
575+
// Replay cached messages from where the client left off so
576+
// server-initiated requests and notifications are not lost.
577+
self.common.sync(last_event_index).await?;
572578
} else {
573579
// Primary common channel is still active. Create a shadow stream
574580
// that stays alive via SSE keep-alive but doesn't receive

crates/rmcp/src/transport/streamable_http_server/tower.rs

Lines changed: 4 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -328,20 +328,11 @@ where
328328
.map(|s| s.to_owned());
329329
if let Some(last_event_id) = last_event_id {
330330
// check if session has this event id
331-
let stream = match self
331+
let stream = self
332332
.session_manager
333333
.resume(&session_id, last_event_id)
334334
.await
335-
{
336-
Ok(stream) => stream,
337-
Err(e) if e.to_string().contains("Conflict:") => {
338-
return Ok(Response::builder()
339-
.status(http::StatusCode::CONFLICT)
340-
.body(Full::new(Bytes::from(e.to_string())).boxed())
341-
.expect("valid response"));
342-
}
343-
Err(e) => return Err(internal_error_response("resume session")(e)),
344-
};
335+
.map_err(internal_error_response("resume session"))?;
345336
// Resume doesn't need priming - client already has the event ID
346337
Ok(sse_stream_response(
347338
stream,
@@ -350,20 +341,11 @@ where
350341
))
351342
} else {
352343
// create standalone stream
353-
let stream = match self
344+
let stream = self
354345
.session_manager
355346
.create_standalone_stream(&session_id)
356347
.await
357-
{
358-
Ok(stream) => stream,
359-
Err(e) if e.to_string().contains("Conflict:") => {
360-
return Ok(Response::builder()
361-
.status(http::StatusCode::CONFLICT)
362-
.body(Full::new(Bytes::from(e.to_string())).boxed())
363-
.expect("valid response"));
364-
}
365-
Err(e) => return Err(internal_error_response("create standalone stream")(e)),
366-
};
348+
.map_err(internal_error_response("create standalone stream"))?;
367349
// Prepend priming event if sse_retry configured
368350
let stream = if let Some(retry) = self.config.sse_retry {
369351
let priming = ServerSseMessage {

crates/rmcp/tests/test_sse_channel_replacement_bug.rs renamed to crates/rmcp/tests/test_sse_concurrent_streams.rs

Lines changed: 48 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -1,12 +1,12 @@
1-
/// Tests for SSE channel replacement fix (shadow channels)
1+
/// Tests for concurrent SSE stream handling (shadow channels)
22
///
33
/// These tests verify that multiple GET SSE streams on the same session
44
/// don't kill each other by replacing the common channel sender.
55
///
6-
/// Root cause: When POST SSE responses include `retry`, EventSource reconnects
7-
/// via GET after the stream ends. Each GET was unconditionally replacing
8-
/// `self.common.tx`, killing the other stream's receiver — causing an infinite
9-
/// reconnect loop every `sse_retry` seconds.
6+
/// Root cause: When POST SSE responses include `retry`, the EventSource API
7+
/// reconnects via GET after the stream ends. Each GET was unconditionally
8+
/// replacing `self.common.tx`, killing the other stream's receiver — causing
9+
/// an infinite reconnect loop every `sse_retry` seconds.
1010
///
1111
/// Fix: `resume_or_shadow_common()` checks if the primary common channel is
1212
/// still active. If so, it creates a "shadow" stream (idle, keep-alive only)
@@ -700,3 +700,46 @@ async fn dropping_shadows_does_not_affect_primary() {
700700

701701
ct.cancel();
702702
}
703+
704+
// ─── Tests: Cache replay on dead primary replacement ─────────────────────────
705+
706+
/// When a notification is sent while the primary is alive, then the primary
707+
/// dies and a new GET resumes with Last-Event-ID "0", the replacement primary
708+
/// should receive the cached notification via sync() replay.
709+
#[tokio::test]
710+
async fn dead_primary_replacement_replays_cached_events() {
711+
let ct = CancellationToken::new();
712+
let trigger = Arc::new(Notify::new());
713+
let url = start_test_server(ct.clone(), trigger.clone()).await;
714+
let client = reqwest::Client::new();
715+
716+
let session_id = initialize_session(&client, &url).await;
717+
send_initialized_notification(&client, &url, &session_id).await;
718+
tokio::time::sleep(Duration::from_millis(200)).await;
719+
720+
// First GET — becomes primary
721+
let get1 = open_standalone_get(&client, &url, &session_id).await;
722+
assert_eq!(get1.status(), 200);
723+
tokio::time::sleep(Duration::from_millis(100)).await;
724+
725+
// Trigger notification while primary is alive (gets cached)
726+
trigger.notify_one();
727+
tokio::time::sleep(Duration::from_millis(200)).await;
728+
729+
// Drop primary — notification was sent and cached
730+
drop(get1);
731+
tokio::time::sleep(Duration::from_millis(100)).await;
732+
733+
// Resume with Last-Event-ID "0" — primary is dead, should replace it
734+
// and replay cached events from index 0
735+
let get_resume = open_resume_get(&client, &url, &session_id, "0").await;
736+
assert_eq!(get_resume.status(), 200);
737+
738+
// The cached notification should be replayed on the new primary
739+
assert!(
740+
wait_for_sse_event(get_resume, "tools/list_changed", Duration::from_secs(3)).await,
741+
"Replacement primary should receive cached notification via sync() replay"
742+
);
743+
744+
ct.cancel();
745+
}

0 commit comments

Comments
 (0)