Skip to content

Commit 51f101f

Browse files
committed
chore: created a threaded transformer (a copy)
1 parent ce55401 commit 51f101f

3 files changed

Lines changed: 532 additions & 0 deletions

File tree

laygo/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@
77
from laygo.pipeline import Pipeline
88
from laygo.transformers.http import HTTPTransformer
99
from laygo.transformers.parallel import ParallelTransformer
10+
from laygo.transformers.threaded import ThreadedTransformer
1011
from laygo.transformers.transformer import Transformer
1112

1213
__all__ = [
1314
"Pipeline",
1415
"Transformer",
16+
"ThreadedTransformer",
1517
"ParallelTransformer",
1618
"HTTPTransformer",
1719
"PipelineContext",

laygo/transformers/threaded.py

Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
"""Parallel transformer implementation using multiple threads."""
2+
3+
from collections import deque
4+
from collections.abc import Callable
5+
from collections.abc import Iterable
6+
from collections.abc import Iterator
7+
from concurrent.futures import FIRST_COMPLETED
8+
from concurrent.futures import Future
9+
from concurrent.futures import ThreadPoolExecutor
10+
from concurrent.futures import wait
11+
import copy
12+
from functools import partial
13+
import itertools
14+
import threading
15+
from typing import Any
16+
from typing import Union
17+
from typing import overload
18+
19+
from laygo.errors import ErrorHandler
20+
from laygo.helpers import PipelineContext
21+
from laygo.transformers.transformer import DEFAULT_CHUNK_SIZE
22+
from laygo.transformers.transformer import ChunkErrorHandler
23+
from laygo.transformers.transformer import InternalTransformer
24+
from laygo.transformers.transformer import PipelineFunction
25+
from laygo.transformers.transformer import Transformer
26+
27+
28+
class ThreadedPipelineContextType(PipelineContext):
    """Pipeline-context variant used by thread-based transformers.

    Extends the base context with a ``lock`` entry so that worker threads
    sharing one context can synchronize access to its mutable state.
    """

    # Injected per execution run (see ThreadedTransformer.__call__).
    lock: threading.Lock
34+
class ThreadedTransformer[In, Out](Transformer[In, Out]):
    """
    A transformer that executes operations concurrently using multiple threads.

    Input data is split into chunks by the inherited chunk generator and each
    chunk is run through the transformer chain on a worker thread. Results are
    yielded either in submission order (``ordered=True``) or as they complete
    (``ordered=False``).
    """

    def __init__(
        self,
        max_workers: int = 4,
        ordered: bool = True,
        chunk_size: int = DEFAULT_CHUNK_SIZE,
        transformer: InternalTransformer[In, Out] | None = None,
    ):
        """
        Initialize the threaded transformer.

        Args:
            max_workers: Maximum number of worker threads.
            ordered: If True, results are yielded in order. If False, results
                are yielded as they complete.
            chunk_size: Size of data chunks to process.
            transformer: The transformation logic chain.
        """
        super().__init__(chunk_size, transformer)
        self.max_workers = max_workers
        self.ordered = ordered

    @classmethod
    def from_transformer[T, U](
        cls,
        transformer: Transformer[T, U],
        chunk_size: int | None = None,
        max_workers: int = 4,
        ordered: bool = True,
    ) -> "ThreadedTransformer[T, U]":
        """
        Create a ThreadedTransformer from an existing Transformer's logic.

        Args:
            transformer: The base transformer to copy the transformation logic from.
            chunk_size: Optional chunk size override.
            max_workers: Maximum number of worker threads.
            ordered: If True, results are yielded in order.

        Returns:
            A new ThreadedTransformer with the same transformation logic.
        """
        # Deep-copy the logic chain so the new instance cannot share mutable
        # state with the source transformer.
        return cls(
            chunk_size=chunk_size or transformer.chunk_size,
            transformer=copy.deepcopy(transformer.transformer),  # type: ignore
            max_workers=max_workers,
            ordered=ordered,
        )

    def __call__(self, data: Iterable[In], context: PipelineContext | None = None) -> Iterator[Out]:
        """
        Executes the transformer on data concurrently.

        A new `threading.Lock` is created and added to the context for each call
        to ensure execution runs are isolated and thread-safe.
        """
        # Determine the context for this run, passing it by reference as requested.
        # NOTE(review): when no context is supplied this writes into self.context,
        # so the per-call lock persists on the instance between calls — confirm
        # that this sharing is intended.
        run_context = context or self.context
        # Add a per-call lock for thread safety.
        run_context["lock"] = threading.Lock()

        def process_chunk(chunk: list[In], shared_context: PipelineContext) -> list[Out]:
            """
            Process a single chunk by passing the chunk and context explicitly
            to the transformer chain. This is safer and avoids mutating self.
            """
            return self.transformer(chunk, shared_context)

        # Create a partial function with the run_context "baked in".
        process_chunk_with_context = partial(process_chunk, shared_context=run_context)

        def _ordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]:
            """Generate results in their original order."""
            # Prime the pool with up to max_workers + 1 chunks so every worker
            # has work while the oldest result is being consumed.
            futures: deque[Future[list[Out]]] = deque()
            for _ in range(self.max_workers + 1):
                try:
                    chunk = next(chunks_iter)
                    futures.append(executor.submit(process_chunk_with_context, chunk))
                except StopIteration:
                    break
            # FIFO: always block on the oldest outstanding future, then backfill
            # one new chunk per completed one to keep the in-flight window bounded.
            while futures:
                yield futures.popleft().result()
                try:
                    chunk = next(chunks_iter)
                    futures.append(executor.submit(process_chunk_with_context, chunk))
                except StopIteration:
                    continue

        def _unordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]:
            """Generate results as they complete."""
            # Seed the same max_workers + 1 in-flight window as the ordered path.
            futures = {
                executor.submit(process_chunk_with_context, chunk)
                for chunk in itertools.islice(chunks_iter, self.max_workers + 1)
            }
            while futures:
                # Block until at least one future finishes; `wait` splits the set
                # into completed futures and the still-pending remainder.
                done, futures = wait(futures, return_when=FIRST_COMPLETED)
                for future in done:
                    yield future.result()
                    # Backfill one chunk per completed future; once the source is
                    # exhausted, just drain the remaining futures.
                    try:
                        chunk = next(chunks_iter)
                        futures.add(executor.submit(process_chunk_with_context, chunk))
                    except StopIteration:
                        continue

        def result_iterator_manager() -> Iterator[Out]:
            """Manage the thread pool and yield flattened results."""
            with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                chunks_to_process = self._chunk_generator(data)
                gen_func = _ordered_generator if self.ordered else _unordered_generator
                processed_chunks_iterator = gen_func(chunks_to_process, executor)
                for result_chunk in processed_chunks_iterator:
                    # Each worker returns a list of results; flatten it into the
                    # output stream one element at a time.
                    yield from result_chunk

        return result_iterator_manager()

    # --- Overridden Chaining Methods to Preserve Type ---
    # Each override delegates to the base class (which mutates the chain in
    # place) and returns self, narrowing the declared return type so fluent
    # chains keep producing ThreadedTransformer.

    def on_error(self, handler: ChunkErrorHandler[In, Out] | ErrorHandler) -> "ThreadedTransformer[In, Out]":
        super().on_error(handler)
        return self

    def map[U](self, function: PipelineFunction[Out, U]) -> "ThreadedTransformer[In, U]":
        super().map(function)
        return self  # type: ignore

    def filter(self, predicate: PipelineFunction[Out, bool]) -> "ThreadedTransformer[In, Out]":
        super().filter(predicate)
        return self

    @overload
    def flatten[T](self: "ThreadedTransformer[In, list[T]]") -> "ThreadedTransformer[In, T]": ...
    @overload
    def flatten[T](self: "ThreadedTransformer[In, tuple[T, ...]]") -> "ThreadedTransformer[In, T]": ...
    @overload
    def flatten[T](self: "ThreadedTransformer[In, set[T]]") -> "ThreadedTransformer[In, T]": ...
    def flatten[T](  # type: ignore
        self: Union[
            "ThreadedTransformer[In, list[T]]", "ThreadedTransformer[In, tuple[T, ...]]", "ThreadedTransformer[In, set[T]]"
        ],
    ) -> "ThreadedTransformer[In, T]":
        super().flatten()  # type: ignore
        return self  # type: ignore

    def tap(self, function: PipelineFunction[Out, Any]) -> "ThreadedTransformer[In, Out]":
        super().tap(function)
        return self

    def apply[T](
        self, t: Callable[["ThreadedTransformer[In, Out]"], "Transformer[In, T]"]
    ) -> "ThreadedTransformer[In, T]":
        super().apply(t)  # type: ignore
        return self  # type: ignore

    def catch[U](
        self,
        sub_pipeline_builder: Callable[[Transformer[Out, Out]], Transformer[Out, U]],
        on_error: ChunkErrorHandler[Out, U] | None = None,
    ) -> "ThreadedTransformer[In, U]":
        super().catch(sub_pipeline_builder, on_error)
        return self  # type: ignore

    def short_circuit(self, function: Callable[[PipelineContext], bool | None]) -> "ThreadedTransformer[In, Out]":
        super().short_circuit(function)
        return self

0 commit comments

Comments
 (0)