diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index aa3e50e07..8416874e2 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -241,15 +241,17 @@ async def _handle_resumption_request(self, ctx: RequestContext) -> None: logger.debug("Resumption GET SSE connection established") async for sse in event_source.aiter_sse(): # pragma: no branch - is_complete = await self._handle_sse_event( + await self._handle_sse_event( sse, ctx.read_stream_writer, original_request_id, ctx.metadata.on_resumption_token_update if ctx.metadata else None, ) - if is_complete: - await event_source.response.aclose() - break + # Drain to EOF rather than breaking on completion (see + # _handle_sse_response): breaking leaves the enclosing + # `async with aconnect_sse(...)` to close an un-drained stream, + # which can stall the next request that reuses the connection. + # Cancellation still tears the stream down promptly. async def _handle_post_request(self, ctx: RequestContext) -> None: """Handle a POST request with response processing.""" @@ -340,6 +342,8 @@ async def _handle_sse_response( assert isinstance(ctx.session_message.message, JSONRPCRequest) original_request_id = ctx.session_message.message.id + response_complete = False + try: event_source = EventSource(response) async for sse in event_source.aiter_sse(): # pragma: no branch @@ -358,16 +362,20 @@ async def _handle_sse_response( resumption_callback=(ctx.metadata.on_resumption_token_update if ctx.metadata else None), is_initialization=is_initialization, ) - # If the SSE event indicates completion, like returning response/error - # break the loop + # Once the response/error arrives the caller is unblocked, but we keep + # iterating so the SSE body is drained to EOF before the connection is + # released. Closing the response early (await response.aclose()) leaves + # the keepalive connection un-drained, which can stall the next request + # that reuses it. Cancellation still tears down promptly: CancelledError + # propagates out of aiter_sse() and the caller's + # `async with client.stream(...)` closes the response. if is_complete: - await response.aclose() - return # Normal completion, no reconnect needed + response_complete = True except Exception: logger.debug("SSE stream ended", exc_info=True) # pragma: no cover - # Stream ended without response - reconnect if we received an event with ID - if last_event_id is not None: # pragma: no branch + # Stream ended without a response - reconnect if we received an event with ID + if not response_complete and last_event_id is not None: logger.info("SSE stream disconnected, reconnecting...") await self._handle_reconnection(ctx, last_event_id, retry_interval_ms) @@ -404,6 +412,7 @@ async def _handle_reconnection( # Track for potential further reconnection reconnect_last_event_id: str = last_event_id reconnect_retry_ms = retry_interval_ms + response_complete = False async for sse in event_source.aiter_sse(): if sse.id: # pragma: no branch @@ -417,9 +426,12 @@ async def _handle_reconnection( original_request_id, ctx.metadata.on_resumption_token_update if ctx.metadata else None, ) + # Drain to EOF instead of closing early (see _handle_sse_response). if is_complete: - await event_source.response.aclose() - return + response_complete = True + + if response_complete: + return # Stream ended again without response - reconnect again (reset attempt counter) logger.info("SSE stream disconnected, reconnecting...")