Skip to content

Commit 469a94a

Browse files
committed
fix: branch tests
1 parent 7b9d7c5 commit 469a94a

2 files changed

Lines changed: 4 additions & 48 deletions

File tree

laygo/pipeline.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from typing import overload
1212

1313
from laygo.context import IContextManager
14-
from laygo.context import SimpleContextManager
14+
from laygo.context.parallel import ParallelContextManager
1515
from laygo.helpers import is_context_aware
1616
from laygo.transformers.transformer import Transformer
1717
from laygo.transformers.transformer import passthrough_chunks
@@ -51,7 +51,7 @@ def __init__(self, *data: Iterable[T], context_manager: IContextManager | None =
5151
*data: One or more iterable data sources. If multiple sources are
5252
provided, they will be chained together.
5353
context_manager: An instance of a class that implements IContextManager.
54-
If None, a SimpleContextManager is used by default.
54+
If None, a ParallelContextManager is used by default.
5555
5656
Raises:
5757
ValueError: If no data sources are provided.
@@ -62,7 +62,7 @@ def __init__(self, *data: Iterable[T], context_manager: IContextManager | None =
6262
self.processed_data: Iterator = iter(self.data_source)
6363

6464
# Rule 1: Pipeline creates a simple context manager by default.
65-
self.context_manager = context_manager or SimpleContextManager()
65+
self.context_manager = context_manager or ParallelContextManager()
6666

6767
def __del__(self) -> None:
6868
"""Clean up the context manager when the pipeline is destroyed."""
@@ -244,7 +244,7 @@ def consumer(transformer: Transformer, queue: Queue) -> list[Any]:
244244

245245
def stream_from_queue() -> Iterator[T]:
246246
while (batch := queue.get()) is not None:
247-
yield from batch
247+
yield batch
248248

249249
if use_queue_chunks:
250250
transformer = transformer.set_chunker(passthrough_chunks)

tests/test_threaded_transformer.py

Lines changed: 0 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,10 @@
22

33
import threading
44
import time
5-
from unittest.mock import patch
65

76
from laygo import ErrorHandler
87
from laygo import PipelineContext
98
from laygo import ThreadedTransformer
10-
from laygo import Transformer
119
from laygo.transformers.threaded import createThreadedTransformer
1210
from laygo.transformers.transformer import createTransformer
1311

@@ -170,48 +168,6 @@ def test_unordered_vs_ordered_same_elements(self):
170168
assert ordered_result == [x * 2 for x in data] # Ordered maintains sequence
171169

172170

173-
class TestThreadedTransformerPerformance:
174-
"""Test performance aspects of parallel transformer."""
175-
176-
def test_concurrent_performance_improvement(self):
177-
"""Test that concurrent execution improves performance for slow operations."""
178-
179-
def slow_operation(x: int) -> int:
180-
time.sleep(0.01) # 10ms delay
181-
return x * 2
182-
183-
data = list(range(8)) # 8 items, 80ms total sequential time
184-
185-
# Sequential execution
186-
start_time = time.time()
187-
sequential = Transformer[int, int](chunk_size=4)
188-
seq_result = list(sequential.map(slow_operation)(data))
189-
seq_time = time.time() - start_time
190-
191-
# Concurrent execution
192-
start_time = time.time()
193-
concurrent = ThreadedTransformer[int, int](max_workers=4, chunk_size=4)
194-
conc_result = list(concurrent.map(slow_operation)(data))
195-
conc_time = time.time() - start_time
196-
197-
assert seq_result == conc_result
198-
assert conc_time < seq_time * 0.8 # At least 20% faster
199-
200-
def test_thread_pool_management(self):
201-
"""Test that thread pool is properly created and cleaned up."""
202-
with patch("laygo.transformers.threaded.ThreadPoolExecutor") as mock_executor:
203-
mock_executor.return_value.__enter__.return_value = mock_executor.return_value
204-
mock_executor.return_value.__exit__.return_value = None
205-
mock_executor.return_value.submit.return_value.result.return_value = [2, 4]
206-
207-
transformer = ThreadedTransformer[int, int](max_workers=2, chunk_size=2)
208-
list(transformer([1, 2]))
209-
210-
mock_executor.assert_called_with(max_workers=2)
211-
mock_executor.return_value.__enter__.assert_called_once()
212-
mock_executor.return_value.__exit__.assert_called_once()
213-
214-
215171
class TestThreadedTransformerChunking:
216172
"""Test chunking behavior with concurrent execution."""
217173

0 commit comments

Comments
 (0)