Skip to content

Commit 493df65

Browse files
authored
feat: implement AsyncMultiRangeDownloader with multiplexed bidi-gRPC stream support (#16528)
This PR implements **`AsyncMultiRangeDownloader`** with a new **`_StreamMultiplexer`**, enabling multiple concurrent range downloads to share a single bidirectional gRPC stream. ### Before vs. After | Feature | Before | After (This PR) | | :--- | :--- | :--- | | **Concurrency** | Sequential or multiple connections | Concurrent over **one** connection | | **Overhead** | High (multiple gRPC streams) | Low (multiplexed single stream) | | **Reliability** | Per-stream retry logic | Unified generation-gated reopening | ### How it works The system uses a background **`_StreamMultiplexer`** to manage the shared bidirectional stream: 1. **Requests**: Concurrent tasks send range requests (`BidiReadObjectRequest`) directly to the shared stream. 2. **Multiplexing**: A background **Recv Loop** listens for all responses. It uses the `read_id` in each response to route data to the correct task-specific `asyncio.Queue`. 3. **Error Handling**: If the stream breaks, a **generation-gated lock** ensures the stream is reopened only once. All active tasks receive a `_StreamError` and automatically retry using the new stream generation. --- **Key Changes:** - **`_StreamMultiplexer`**: Background receiver loop for routing responses. - **Generation-Gated Reopening**: Coordinates stream recovery across concurrent tasks. - **`AsyncMultiRangeDownloader` Integration**: Full support for concurrent `download_ranges` calls.
1 parent 33e4a6f commit 493df65

5 files changed

Lines changed: 1202 additions & 172 deletions

File tree

Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from __future__ import annotations
16+
17+
import asyncio
18+
import logging
19+
from typing import Awaitable, Callable, Dict, Optional, Set
20+
21+
import grpc
22+
23+
from google.cloud import _storage_v2
24+
from google.cloud.storage.asyncio.async_read_object_stream import (
25+
_AsyncReadObjectStream,
26+
)
27+
28+
logger = logging.getLogger(__name__)

# Per-task response-queue capacity. Queues are bounded so that a slow
# consumer exerts backpressure instead of buffering unboundedly; see
# _StreamMultiplexer.register, which creates queues with this maxsize.
_DEFAULT_QUEUE_MAX_SIZE = 100
# How long _StreamMultiplexer._put_with_timeout waits to enqueue a response
# before dropping it to keep the shared recv loop from hanging.
_DEFAULT_PUT_TIMEOUT_SECONDS = 20.0
32+
33+
34+
class _StreamError:
35+
"""Wraps an error with the stream generation that produced it."""
36+
37+
def __init__(self, exception: Exception, generation: int):
38+
self.exception = exception
39+
self.generation = generation
40+
41+
42+
class _StreamEnd:
43+
"""Signals the stream closed normally."""
44+
45+
pass
46+
47+
48+
class _StreamMultiplexer:
    """Shares one bidirectional gRPC read stream among many download tasks.

    A single background receive loop pulls responses off the stream and
    routes each one, keyed by ``read_id``, to the asyncio.Queue registered
    by the task that owns that range. Stream reopening is serialized with a
    lock and gated on a generation counter, so only the first task to
    observe a failure actually rebuilds the stream.

    Because each per-task queue is bounded, a task that stops draining its
    queue will eventually stall the shared recv loop — and therefore every
    other task — via gRPC flow control backpressure.
    """

    def __init__(
        self,
        stream: _AsyncReadObjectStream,
        queue_max_size: int = _DEFAULT_QUEUE_MAX_SIZE,
    ):
        self._stream = stream
        # Incremented on every successful reopen; gates stale retries.
        self._stream_generation: int = 0
        # read_id -> queue of the task that requested that range.
        self._queues: Dict[int, asyncio.Queue] = {}
        self._reopen_lock = asyncio.Lock()
        self._recv_task: Optional[asyncio.Task] = None
        self._queue_max_size = queue_max_size

    @property
    def stream_generation(self) -> int:
        """Generation counter of the currently-open stream."""
        return self._stream_generation

    def register(self, read_ids: Set[int]) -> asyncio.Queue:
        """Create one bounded queue for a task and route all its read_ids to it."""
        task_queue: asyncio.Queue = asyncio.Queue(maxsize=self._queue_max_size)
        self._queues.update((rid, task_queue) for rid in read_ids)
        return task_queue

    def unregister(self, read_ids: Set[int]) -> None:
        """Drop routing entries for the given read_ids (missing ids are ignored)."""
        for rid in read_ids:
            self._queues.pop(rid, None)

    def _get_unique_queues(self) -> Set[asyncio.Queue]:
        # Several read_ids may map to one queue; de-duplicate for broadcasts.
        return {q for q in self._queues.values()}

    async def _put_with_timeout(self, queue: asyncio.Queue, item) -> None:
        """Enqueue ``item`` with a bounded wait so a full queue cannot wedge
        the recv loop; on timeout the item is dropped and logged."""
        try:
            await asyncio.wait_for(
                queue.put(item), timeout=_DEFAULT_PUT_TIMEOUT_SECONDS
            )
        except asyncio.TimeoutError:
            if queue in self._get_unique_queues():
                logger.warning(
                    "Queue full for too long. Dropping item to prevent multiplexer hang."
                )
            else:
                # Owner unregistered while we were waiting; nothing lost.
                logger.debug("Dropped item for unregistered queue.")

    def _ensure_recv_loop(self) -> None:
        """Start the background receive loop if it is not already running."""
        task = self._recv_task
        if task is None or task.done():
            self._recv_task = asyncio.create_task(self._recv_loop())

    def _stop_recv_loop(self) -> None:
        """Request cancellation of the receive loop if it is still running."""
        task = self._recv_task
        if task is not None and not task.done():
            task.cancel()

    def _put_error_nowait(self, queue: asyncio.Queue, error: _StreamError) -> None:
        """Deliver ``error`` without awaiting, evicting the oldest buffered
        item until it fits."""
        while True:
            try:
                queue.put_nowait(error)
            except asyncio.QueueFull:
                try:
                    queue.get_nowait()
                except asyncio.QueueEmpty:
                    pass
            else:
                return

    async def _broadcast(self, queues, item) -> None:
        # Deliver one item to every queue concurrently.
        await asyncio.gather(
            *(self._put_with_timeout(q, item) for q in queues)
        )

    async def _recv_loop(self) -> None:
        """Pull responses off the stream and route each to its owner(s).

        On clean EOF every registered queue receives a ``_StreamEnd``; on
        failure every queue receives a ``_StreamError`` carrying the current
        generation so tasks can decide whether to trigger a reopen.
        """
        try:
            while True:
                response = await self._stream.recv()

                if response == grpc.aio.EOF:
                    # Clean shutdown: tell every task the stream is done.
                    await self._broadcast(self._get_unique_queues(), _StreamEnd())
                    return

                if not response.object_data_ranges:
                    # No range payload; fan out to every registered queue.
                    await self._broadcast(self._get_unique_queues(), response)
                    continue

                targets: Set[asyncio.Queue] = set()
                for data_range in response.object_data_ranges:
                    read_id = data_range.read_range.read_id
                    owner = self._queues.get(read_id)
                    if owner:
                        targets.add(owner)
                    else:
                        logger.warning(
                            f"Received data for unregistered read_id: {read_id}"
                        )
                await self._broadcast(targets, response)
        except asyncio.CancelledError:
            raise
        except Exception as e:
            logger.warning(f"Stream multiplexer recv loop failed: {e}", exc_info=True)
            # Non-blocking delivery so the dying loop can't hang here.
            failure = _StreamError(e, self._stream_generation)
            for q in self._get_unique_queues():
                self._put_error_nowait(q, failure)

    async def send(self, request: _storage_v2.BidiReadObjectRequest) -> int:
        """Send one request on the shared stream.

        Returns the stream generation the request was sent on, which the
        caller passes to :meth:`reopen_stream` if the stream later breaks.
        """
        self._ensure_recv_loop()
        await self._stream.send(request)
        return self._stream_generation

    async def _reap_recv_task(self) -> None:
        # Wait out the (cancelled or finished) recv task, swallowing its
        # outcome; failures were already routed to consumer queues.
        if self._recv_task:
            try:
                await self._recv_task
            except (asyncio.CancelledError, Exception):
                pass

    async def reopen_stream(
        self,
        broken_generation: int,
        stream_factory: Callable[[], Awaitable[_AsyncReadObjectStream]],
    ) -> None:
        """Replace the stream, but only if ``broken_generation`` is current.

        The generation gate makes concurrent calls idempotent: whichever
        task wins the lock performs the reopen and bumps the generation;
        later callers see a mismatch and return immediately.
        """
        async with self._reopen_lock:
            if self._stream_generation != broken_generation:
                # Another task already reopened; nothing to do.
                return
            self._stop_recv_loop()
            await self._reap_recv_task()
            # Fail out anyone still waiting on the old generation.
            stale = _StreamError(Exception("Stream reopening"), self._stream_generation)
            for q in self._get_unique_queues():
                self._put_error_nowait(q, stale)
            try:
                await self._stream.close()
            except Exception:
                pass  # best effort: the old stream may already be dead
            self._stream = await stream_factory()
            self._stream_generation += 1
            self._ensure_recv_loop()

    async def close(self) -> None:
        """Stop the recv loop and push a terminal error to every queue."""
        self._stop_recv_loop()
        await self._reap_recv_task()
        final = _StreamError(Exception("Multiplexer closed"), self._stream_generation)
        for q in self._get_unique_queues():
            self._put_error_nowait(q, final)

0 commit comments

Comments
 (0)