|
13 | 13 |
|
14 | 14 | from laygo.helpers import PipelineContext |
15 | 15 | from laygo.helpers import is_context_aware |
16 | | -from laygo.transformers.threaded import ThreadedTransformer |
17 | 16 | from laygo.transformers.transformer import Transformer |
18 | 17 |
|
19 | 18 | T = TypeVar("T") |
@@ -208,16 +207,47 @@ def stream_from_queue() -> Iterator[T]: |
208 | 207 |
|
209 | 208 | return final_results |
210 | 209 |
|
211 | | - def buffer(self, size: int) -> "Pipeline[T]": |
212 | | - """Buffer the pipeline using threaded processing. |
| 210 | + def buffer(self, size: int, batch_size: int = 1000) -> "Pipeline[T]": |
| 211 | + """Inserts a buffer in the pipeline to allow downstream processing to read ahead. |
| 212 | +
|
| 213 | + This creates a background thread that reads from the upstream data source |
| 214 | + and fills a queue, decoupling the upstream and downstream stages. |
213 | 215 |
|
214 | 216 | Args: |
215 | | - size: The number of worker threads to use for buffering. |
| 217 | + size: The number of **batches** to hold in the buffer. |
| 218 | + batch_size: The number of items to accumulate per batch. |
216 | 219 |
|
217 | 220 | Returns: |
218 | 221 | The pipeline instance for method chaining. |
219 | 222 | """ |
220 | | - self.apply(ThreadedTransformer(max_workers=size)) |
| 223 | + source_iterator = self.processed_data |
| 224 | + |
| 225 | + def _buffered_stream() -> Iterator[T]: |
| 226 | + queue = Queue(maxsize=size) |
| 227 | + # We only need one background thread for the producer. |
| 228 | + executor = ThreadPoolExecutor(max_workers=1) |
| 229 | + |
| 230 | + def _producer() -> None: |
| 231 | + """The producer reads from the source and fills the queue.""" |
| 232 | + try: |
| 233 | + for batch_tuple in itertools.batched(source_iterator, batch_size): |
| 234 | + queue.put(list(batch_tuple)) |
| 235 | + finally: |
| 236 | + # Always put the sentinel value to signal the end of the stream. |
| 237 | + queue.put(None) |
| 238 | + |
| 239 | + # Start the producer in the background thread. |
| 240 | + executor.submit(_producer) |
| 241 | + |
| 242 | + try: |
| 243 | + # The main thread becomes the consumer. |
| 244 | + while (batch := queue.get()) is not None: |
| 245 | + yield from batch |
| 246 | + finally: |
| 247 | + # Ensure the background thread is cleaned up. |
| 248 | + executor.shutdown(wait=False, cancel_futures=True) |
| 249 | + |
| 250 | + self.processed_data = _buffered_stream() |
221 | 251 | return self |
222 | 252 |
|
223 | 253 | def __iter__(self) -> Iterator[T]: |
|
0 commit comments