From d816bac09f39794a8dbcda29c6883d9efb143489 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?PKPM=20=E6=9E=84=E5=8A=9B=E7=BB=93=E6=9E=84?= <13466632+pkpmgh@user.noreply.gitee.com> Date: Fri, 29 May 2026 10:03:05 +0800 Subject: [PATCH] fix(streamable-http): drain SSE response to EOF instead of closing early The client closed the SSE response stream immediately after the JSON-RPC response/error event arrived (`await response.aclose()` / `break`), in three places: - _handle_sse_response (POST response) - _handle_reconnection (resumed GET stream) - _handle_resumption_request (explicit resumption GET stream) Aborting the stream before EOF leaves the underlying keepalive connection un-drained. With some servers this stalls the next request that reuses the connection by a fixed delay (~260ms observed against a FastMCP streamable-http server hosted on a background thread inside a desktop app; 37x slower per call). Drain the stream to its natural EOF instead: the caller is still unblocked as soon as the response event is routed to read_stream, and the connection is returned to the pool cleanly. Cancellation/shutdown still tears the stream down promptly because CancelledError propagates out of aiter_sse() and the enclosing `async with` closes the response. Co-Authored-By: Claude Opus 4.8 --- src/mcp/client/streamable_http.py | 36 ++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index aa3e50e07..8416874e2 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -241,15 +241,17 @@ async def _handle_resumption_request(self, ctx: RequestContext) -> None: logger.debug("Resumption GET SSE connection established") async for sse in event_source.aiter_sse(): # pragma: no branch - is_complete = await self._handle_sse_event( + await self._handle_sse_event( sse, ctx.read_stream_writer, original_request_id, ctx.metadata.on_resumption_token_update if ctx.metadata else None, ) - if is_complete: - await event_source.response.aclose() - break + # Drain to EOF rather than breaking on completion (see + # _handle_sse_response): breaking leaves the enclosing + # `async with aconnect_sse(...)` to close an un-drained stream, + # which can stall the next request that reuses the connection. + # Cancellation still tears the stream down promptly. async def _handle_post_request(self, ctx: RequestContext) -> None: """Handle a POST request with response processing.""" @@ -340,6 +342,8 @@ async def _handle_sse_response( assert isinstance(ctx.session_message.message, JSONRPCRequest) original_request_id = ctx.session_message.message.id + response_complete = False + try: event_source = EventSource(response) async for sse in event_source.aiter_sse(): # pragma: no branch @@ -358,16 +362,20 @@ async def _handle_sse_response( resumption_callback=(ctx.metadata.on_resumption_token_update if ctx.metadata else None), is_initialization=is_initialization, ) - # If the SSE event indicates completion, like returning response/error - # break the loop + # Once the response/error arrives the caller is unblocked, but we keep + # iterating so the SSE body is drained to EOF before the connection is + # released. Closing the response early (await response.aclose()) leaves + # the keepalive connection un-drained, which can stall the next request + # that reuses it. Cancellation still tears down promptly: CancelledError + # propagates out of aiter_sse() and the caller's + # `async with client.stream(...)` closes the response. if is_complete: - await response.aclose() - return # Normal completion, no reconnect needed + response_complete = True except Exception: logger.debug("SSE stream ended", exc_info=True) # pragma: no cover - # Stream ended without response - reconnect if we received an event with ID - if last_event_id is not None: # pragma: no branch + # Stream ended without a response - reconnect if we received an event with ID + if not response_complete and last_event_id is not None: logger.info("SSE stream disconnected, reconnecting...") await self._handle_reconnection(ctx, last_event_id, retry_interval_ms) @@ -404,6 +412,7 @@ async def _handle_reconnection( # Track for potential further reconnection reconnect_last_event_id: str = last_event_id reconnect_retry_ms = retry_interval_ms + response_complete = False async for sse in event_source.aiter_sse(): if sse.id: # pragma: no branch @@ -417,9 +426,12 @@ async def _handle_reconnection( original_request_id, ctx.metadata.on_resumption_token_update if ctx.metadata else None, ) + # Drain to EOF instead of closing early (see _handle_sse_response). if is_complete: - await event_source.response.aclose() - return + response_complete = True + + if response_complete: + return # Stream ended again without response - reconnect again (reset attempt counter) logger.info("SSE stream disconnected, reconnecting...")