Skip to content

Commit 1a3d782

Browse files
committed
chore: transformer factory and tests
1 parent 51f101f commit 1a3d782

12 files changed

Lines changed: 347 additions & 353 deletions

laygo/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,20 @@
77
from laygo.pipeline import Pipeline
88
from laygo.transformers.http import HTTPTransformer
99
from laygo.transformers.parallel import ParallelTransformer
10+
from laygo.transformers.parallel import createParallelTransformer
1011
from laygo.transformers.threaded import ThreadedTransformer
12+
from laygo.transformers.threaded import createThreadedTransformer
1113
from laygo.transformers.transformer import Transformer
14+
from laygo.transformers.transformer import createTransformer
1215

1316
__all__ = [
1417
"Pipeline",
1518
"Transformer",
19+
"createTransformer",
1620
"ThreadedTransformer",
21+
"createThreadedTransformer",
1722
"ParallelTransformer",
23+
"createParallelTransformer",
1824
"HTTPTransformer",
1925
"PipelineContext",
2026
"ErrorHandler",

laygo/transformers/http.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,30 @@
3030
U = TypeVar("U")
3131

3232

33+
def createHTTPTransformer[T](
34+
_type_hint: type[T],
35+
base_url: str,
36+
chunk_size: int | None = None,
37+
endpoint: str | None = None,
38+
max_workers: int = 4,
39+
) -> "HTTPTransformer[T, T]":
40+
"""Create a new identity parallel transformer with an explicit type hint."""
41+
return HTTPTransformer[T, T](
42+
base_url=base_url,
43+
endpoint=endpoint,
44+
max_workers=max_workers,
45+
chunk_size=chunk_size,
46+
)
47+
48+
3349
class HTTPTransformer(Transformer[In, Out]):
3450
"""
3551
A self-sufficient, chainable transformer that manages its own
3652
distributed execution and worker endpoint definition.
3753
"""
3854

39-
def __init__(self, base_url: str, endpoint: str | None = None, max_workers: int = 8):
40-
super().__init__()
55+
def __init__(self, base_url: str, endpoint: str | None = None, max_workers: int = 8, chunk_size: int | None = None):
56+
super().__init__(chunk_size=chunk_size)
4157
self.base_url = base_url.rstrip("/")
4258
self.endpoint = endpoint
4359
self.max_workers = max_workers

laygo/transformers/parallel.py

Lines changed: 125 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -1,58 +1,70 @@
1-
"""Parallel transformer implementation using multiple threads."""
1+
"""Parallel transformer implementation using multiple processes and loky."""
22

33
from collections import deque
44
from collections.abc import Callable
55
from collections.abc import Iterable
66
from collections.abc import Iterator
7+
from collections.abc import MutableMapping
78
from concurrent.futures import FIRST_COMPLETED
89
from concurrent.futures import Future
9-
from concurrent.futures import ThreadPoolExecutor
1010
from concurrent.futures import wait
1111
import copy
12-
from functools import partial
1312
import itertools
14-
import threading
13+
import multiprocessing as mp
1514
from typing import Any
1615
from typing import Union
1716
from typing import overload
1817

18+
from loky import ProcessPoolExecutor
19+
1920
from laygo.errors import ErrorHandler
2021
from laygo.helpers import PipelineContext
21-
from laygo.transformers.transformer import DEFAULT_CHUNK_SIZE
2222
from laygo.transformers.transformer import ChunkErrorHandler
2323
from laygo.transformers.transformer import InternalTransformer
2424
from laygo.transformers.transformer import PipelineFunction
2525
from laygo.transformers.transformer import Transformer
2626

2727

28-
class ParallelPipelineContextType(PipelineContext):
29-
"""A specific context type for parallel transformers that includes a lock."""
28+
def _process_chunk_for_multiprocessing[In, Out](
29+
transformer: InternalTransformer[In, Out],
30+
shared_context: MutableMapping[str, Any],
31+
chunk: list[In],
32+
) -> list[Out]:
33+
"""
34+
Top-level function to process a single chunk.
35+
'loky' will use cloudpickle to serialize the 'transformer' object.
36+
"""
37+
return transformer(chunk, shared_context) # type: ignore
38+
3039

31-
lock: threading.Lock
40+
def createParallelTransformer[T](
41+
_type_hint: type[T],
42+
max_workers: int = 4,
43+
ordered: bool = True,
44+
chunk_size: int | None = None,
45+
) -> "ParallelTransformer[T, T]":
46+
"""Create a new identity parallel transformer with an explicit type hint."""
47+
return ParallelTransformer[T, T](
48+
max_workers=max_workers,
49+
ordered=ordered,
50+
chunk_size=chunk_size,
51+
transformer=None,
52+
)
3253

3354

3455
class ParallelTransformer[In, Out](Transformer[In, Out]):
3556
"""
36-
A transformer that executes operations concurrently using multiple threads.
57+
A transformer that executes operations concurrently using multiple processes.
58+
It uses 'loky' to support dynamically created transformation logic.
3759
"""
3860

3961
def __init__(
4062
self,
4163
max_workers: int = 4,
4264
ordered: bool = True,
43-
chunk_size: int = DEFAULT_CHUNK_SIZE,
65+
chunk_size: int | None = None,
4466
transformer: InternalTransformer[In, Out] | None = None,
4567
):
46-
"""
47-
Initialize the parallel transformer.
48-
49-
Args:
50-
max_workers: Maximum number of worker threads.
51-
ordered: If True, results are yielded in order. If False, results
52-
are yielded as they complete.
53-
chunk_size: Size of data chunks to process.
54-
transformer: The transformation logic chain.
55-
"""
5668
super().__init__(chunk_size, transformer)
5769
self.max_workers = max_workers
5870
self.ordered = ordered
@@ -65,18 +77,6 @@ def from_transformer[T, U](
6577
max_workers: int = 4,
6678
ordered: bool = True,
6779
) -> "ParallelTransformer[T, U]":
68-
"""
69-
Create a ParallelTransformer from an existing Transformer's logic.
70-
71-
Args:
72-
transformer: The base transformer to copy the transformation logic from.
73-
chunk_size: Optional chunk size override.
74-
max_workers: Maximum number of worker threads.
75-
ordered: If True, results are yielded in order.
76-
77-
Returns:
78-
A new ParallelTransformer with the same transformation logic.
79-
"""
8080
return cls(
8181
chunk_size=chunk_size or transformer.chunk_size,
8282
transformer=copy.deepcopy(transformer.transformer), # type: ignore
@@ -85,73 +85,101 @@ def from_transformer[T, U](
8585
)
8686

8787
def __call__(self, data: Iterable[In], context: PipelineContext | None = None) -> Iterator[Out]:
88-
"""
89-
Executes the transformer on data concurrently.
90-
91-
A new `threading.Lock` is created and added to the context for each call
92-
to ensure execution runs are isolated and thread-safe.
93-
"""
94-
# Determine the context for this run, passing it by reference as requested.
95-
run_context = context or self.context
96-
# Add a per-call lock for thread safety.
97-
run_context["lock"] = threading.Lock()
98-
99-
def process_chunk(chunk: list[In], shared_context: PipelineContext) -> list[Out]:
100-
"""
101-
Process a single chunk by passing the chunk and context explicitly
102-
to the transformer chain. This is safer and avoids mutating self.
103-
"""
104-
return self.transformer(chunk, shared_context)
105-
106-
# Create a partial function with the run_context "baked in".
107-
process_chunk_with_context = partial(process_chunk, shared_context=run_context)
108-
109-
def _ordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]:
110-
"""Generate results in their original order."""
111-
futures: deque[Future[list[Out]]] = deque()
112-
for _ in range(self.max_workers + 1):
113-
try:
114-
chunk = next(chunks_iter)
115-
futures.append(executor.submit(process_chunk_with_context, chunk))
116-
except StopIteration:
117-
break
118-
while futures:
119-
yield futures.popleft().result()
88+
"""Executes the transformer on data concurrently using processes."""
89+
with mp.Manager() as manager:
90+
initial_ctx_data = context if context is not None else self.context
91+
shared_context = manager.dict(initial_ctx_data)
92+
93+
if "lock" not in shared_context:
94+
shared_context["lock"] = manager.Lock()
95+
96+
try:
97+
with ProcessPoolExecutor(max_workers=self.max_workers) as executor:
98+
chunks_to_process = self._chunk_generator(data)
99+
gen_func = self._ordered_generator if self.ordered else self._unordered_generator
100+
processed_chunks_iterator = gen_func(chunks_to_process, executor, shared_context)
101+
102+
for result_chunk in processed_chunks_iterator:
103+
yield from result_chunk
104+
finally:
105+
if context is not None:
106+
final_context_state = dict(shared_context)
107+
final_context_state.pop("lock", None)
108+
# FIX 2: Do not clear the context, just update it.
109+
# This allows chained transformers to merge their context results.
110+
# context.clear()
111+
context.update(final_context_state)
112+
113+
# ... The rest of the file remains the same ...
114+
def _ordered_generator(
115+
self,
116+
chunks_iter: Iterator[list[In]],
117+
executor: ProcessPoolExecutor,
118+
shared_context: MutableMapping[str, Any],
119+
) -> Iterator[list[Out]]:
120+
"""Generate results in their original order."""
121+
futures: deque[Future[list[Out]]] = deque()
122+
for _ in range(self.max_workers + 1):
123+
try:
124+
chunk = next(chunks_iter)
125+
futures.append(
126+
executor.submit(
127+
_process_chunk_for_multiprocessing,
128+
self.transformer,
129+
shared_context,
130+
chunk,
131+
)
132+
)
133+
except StopIteration:
134+
break
135+
while futures:
136+
yield futures.popleft().result()
137+
try:
138+
chunk = next(chunks_iter)
139+
futures.append(
140+
executor.submit(
141+
_process_chunk_for_multiprocessing,
142+
self.transformer,
143+
shared_context,
144+
chunk,
145+
)
146+
)
147+
except StopIteration:
148+
continue
149+
150+
def _unordered_generator(
151+
self,
152+
chunks_iter: Iterator[list[In]],
153+
executor: ProcessPoolExecutor,
154+
shared_context: MutableMapping[str, Any],
155+
) -> Iterator[list[Out]]:
156+
"""Generate results as they complete."""
157+
futures = {
158+
executor.submit(
159+
_process_chunk_for_multiprocessing,
160+
self.transformer,
161+
shared_context,
162+
chunk,
163+
)
164+
for chunk in itertools.islice(chunks_iter, self.max_workers + 1)
165+
}
166+
while futures:
167+
done, futures = wait(futures, return_when=FIRST_COMPLETED)
168+
for future in done:
169+
yield future.result()
120170
try:
121171
chunk = next(chunks_iter)
122-
futures.append(executor.submit(process_chunk_with_context, chunk))
172+
futures.add(
173+
executor.submit(
174+
_process_chunk_for_multiprocessing,
175+
self.transformer,
176+
shared_context,
177+
chunk,
178+
)
179+
)
123180
except StopIteration:
124181
continue
125182

126-
def _unordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]:
127-
"""Generate results as they complete."""
128-
futures = {
129-
executor.submit(process_chunk_with_context, chunk)
130-
for chunk in itertools.islice(chunks_iter, self.max_workers + 1)
131-
}
132-
while futures:
133-
done, futures = wait(futures, return_when=FIRST_COMPLETED)
134-
for future in done:
135-
yield future.result()
136-
try:
137-
chunk = next(chunks_iter)
138-
futures.add(executor.submit(process_chunk_with_context, chunk))
139-
except StopIteration:
140-
continue
141-
142-
def result_iterator_manager() -> Iterator[Out]:
143-
"""Manage the thread pool and yield flattened results."""
144-
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
145-
chunks_to_process = self._chunk_generator(data)
146-
gen_func = _ordered_generator if self.ordered else _unordered_generator
147-
processed_chunks_iterator = gen_func(chunks_to_process, executor)
148-
for result_chunk in processed_chunks_iterator:
149-
yield from result_chunk
150-
151-
return result_iterator_manager()
152-
153-
# --- Overridden Chaining Methods to Preserve Type ---
154-
155183
def on_error(self, handler: ChunkErrorHandler[In, Out] | ErrorHandler) -> "ParallelTransformer[In, Out]":
156184
super().on_error(handler)
157185
return self
@@ -172,7 +200,9 @@ def flatten[T](self: "ParallelTransformer[In, tuple[T, ...]]") -> "ParallelTrans
172200
def flatten[T](self: "ParallelTransformer[In, set[T]]") -> "ParallelTransformer[In, T]": ...
173201
def flatten[T]( # type: ignore
174202
self: Union[
175-
"ParallelTransformer[In, list[T]]", "ParallelTransformer[In, tuple[T, ...]]", "ParallelTransformer[In, set[T]]"
203+
"ParallelTransformer[In, list[T]]",
204+
"ParallelTransformer[In, tuple[T, ...]]",
205+
"ParallelTransformer[In, set[T]]",
176206
],
177207
) -> "ParallelTransformer[In, T]":
178208
super().flatten() # type: ignore

laygo/transformers/threaded.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,21 @@ class ThreadedPipelineContextType(PipelineContext):
3131
lock: threading.Lock
3232

3333

34+
def createThreadedTransformer[T](
35+
_type_hint: type[T],
36+
max_workers: int = 4,
37+
ordered: bool = True,
38+
chunk_size: int = DEFAULT_CHUNK_SIZE,
39+
) -> "ThreadedTransformer[T, T]":
40+
"""Create a new identity threaded transformer with an explicit type hint."""
41+
return ThreadedTransformer[T, T](
42+
max_workers=max_workers,
43+
ordered=ordered,
44+
chunk_size=chunk_size,
45+
transformer=None,
46+
)
47+
48+
3449
class ThreadedTransformer[In, Out](Transformer[In, Out]):
3550
"""
3651
A transformer that executes operations concurrently using multiple threads.
@@ -40,7 +55,7 @@ def __init__(
4055
self,
4156
max_workers: int = 4,
4257
ordered: bool = True,
43-
chunk_size: int = DEFAULT_CHUNK_SIZE,
58+
chunk_size: int | None = None,
4459
transformer: InternalTransformer[In, Out] | None = None,
4560
):
4661
"""

0 commit comments

Comments
 (0)