-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathparallel.py
More file actions
236 lines (204 loc) · 8.01 KB
/
parallel.py
File metadata and controls
236 lines (204 loc) · 8.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
"""Parallel transformer implementation using multiple processes and loky."""
from collections import deque
from collections.abc import Callable
from collections.abc import Iterable
from collections.abc import Iterator
from concurrent.futures import FIRST_COMPLETED
from concurrent.futures import wait
import copy
import itertools
from typing import Any
from typing import Union
from typing import overload
from loky import get_reusable_executor
from laygo.context import ParallelContextManager
from laygo.context.types import IContextHandle
from laygo.context.types import IContextManager
from laygo.errors import ErrorHandler
from laygo.transformers.transformer import ChunkErrorHandler
from laygo.transformers.transformer import InternalTransformer
from laygo.transformers.transformer import PipelineFunction
from laygo.transformers.transformer import Transformer
def _worker_process_chunk[In, Out](
transformer_logic: InternalTransformer[In, Out],
context_handle: IContextHandle,
chunk: list[In],
) -> list[Out]:
"""
Top-level function executed by each worker process.
It reconstructs the context proxy from the handle and runs the transformation.
"""
context_proxy = context_handle.create_proxy()
try:
return transformer_logic(chunk, context_proxy)
finally:
# The proxy's shutdown is a no-op, but it's good practice to call it.
context_proxy.shutdown()
def createParallelTransformer[T](
_type_hint: type[T],
max_workers: int = 4,
ordered: bool = True,
chunk_size: int | None = None,
) -> "ParallelTransformer[T, T]":
"""Create a new identity parallel transformer with an explicit type hint.
Args:
_type_hint: Type hint for the data being processed.
max_workers: Maximum number of worker processes.
ordered: Whether to preserve order of results.
chunk_size: Size of chunks to process data in.
Returns:
A new identity parallel transformer.
"""
return ParallelTransformer[T, T](
max_workers=max_workers,
ordered=ordered,
chunk_size=chunk_size,
transformer=None,
)
class ParallelTransformer[In, Out](Transformer[In, Out]):
"""A transformer that executes operations concurrently using multiple processes.
This transformer uses 'loky' to support dynamically created transformation
logic and provides true parallelism by bypassing Python's Global Interpreter
Lock (GIL). It's ideal for CPU-bound operations.
"""
def __init__(
self,
max_workers: int = 4,
ordered: bool = True,
chunk_size: int | None = None,
transformer: InternalTransformer[In, Out] | None = None,
) -> None:
"""Initialize the parallel transformer.
Args:
max_workers: Maximum number of worker processes.
ordered: If True, results are yielded in order. If False, results
are yielded as they complete.
chunk_size: Size of data chunks to process.
transformer: The transformation logic chain.
"""
super().__init__(chunk_size, transformer)
self.max_workers = max_workers
self.ordered = ordered
# Rule 3: Parallel transformers create a parallel context manager by default.
self._default_context = ParallelContextManager()
@classmethod
def from_transformer[T, U](
cls,
transformer: Transformer[T, U],
chunk_size: int | None = None,
max_workers: int = 4,
ordered: bool = True,
) -> "ParallelTransformer[T, U]":
"""Create a ParallelTransformer from an existing Transformer's logic.
Args:
transformer: The base transformer to copy the transformation logic from.
chunk_size: Optional chunk size override.
max_workers: Maximum number of worker processes.
ordered: If True, results are yielded in order.
Returns:
A new ParallelTransformer with the same transformation logic.
"""
return cls(
chunk_size=chunk_size or transformer.chunk_size,
transformer=copy.deepcopy(transformer.transformer), # type: ignore
max_workers=max_workers,
ordered=ordered,
)
def __call__(self, data: Iterable[In], context: IContextManager | None = None) -> Iterator[Out]:
"""Execute the transformer by distributing chunks to a process pool."""
run_context = context if context is not None else self._default_context
# Get the picklable handle from the context manager.
context_handle = run_context.get_handle()
executor = get_reusable_executor(max_workers=self.max_workers)
chunks_to_process = self._chunk_generator(data)
gen_func = self._ordered_generator if self.ordered else self._unordered_generator
try:
processed_chunks_iterator = gen_func(chunks_to_process, executor, context_handle)
for result_chunk in processed_chunks_iterator:
yield from result_chunk
finally:
if run_context is self._default_context:
self._default_context.shutdown()
def _ordered_generator(
self,
chunks_iter: Iterator[list[In]],
executor,
context_handle: IContextHandle,
) -> Iterator[list[Out]]:
"""Generate results in their original order."""
futures = deque()
for _ in range(self.max_workers + 1):
try:
chunk = next(chunks_iter)
futures.append(executor.submit(_worker_process_chunk, self.transformer, context_handle, chunk))
except StopIteration:
break
while futures:
yield futures.popleft().result()
try:
chunk = next(chunks_iter)
futures.append(executor.submit(_worker_process_chunk, self.transformer, context_handle, chunk))
except StopIteration:
continue
def _unordered_generator(
self,
chunks_iter: Iterator[list[In]],
executor,
context_handle: IContextHandle,
) -> Iterator[list[Out]]:
"""Generate results as they complete."""
futures = {
executor.submit(_worker_process_chunk, self.transformer, context_handle, chunk)
for chunk in itertools.islice(chunks_iter, self.max_workers + 1)
}
while futures:
done, futures = wait(futures, return_when=FIRST_COMPLETED)
for future in done:
yield future.result()
try:
chunk = next(chunks_iter)
futures.add(executor.submit(_worker_process_chunk, self.transformer, context_handle, chunk))
except StopIteration:
continue
def on_error(self, handler: ChunkErrorHandler[In, Out] | ErrorHandler) -> "ParallelTransformer[In, Out]":
super().on_error(handler)
return self
def map[U](self, function: PipelineFunction[Out, U]) -> "ParallelTransformer[In, U]":
super().map(function)
return self # type: ignore
def filter(self, predicate: PipelineFunction[Out, bool]) -> "ParallelTransformer[In, Out]":
super().filter(predicate)
return self
@overload
def flatten[T](self: "ParallelTransformer[In, list[T]]") -> "ParallelTransformer[In, T]": ...
@overload
def flatten[T](self: "ParallelTransformer[In, tuple[T, ...]]") -> "ParallelTransformer[In, T]": ...
@overload
def flatten[T](self: "ParallelTransformer[In, set[T]]") -> "ParallelTransformer[In, T]": ...
def flatten[T]( # type: ignore
self: Union[
"ParallelTransformer[In, list[T]]",
"ParallelTransformer[In, tuple[T, ...]]",
"ParallelTransformer[In, set[T]]",
],
) -> "ParallelTransformer[In, T]":
super().flatten() # type: ignore
return self # type: ignore
def tap(self, arg: Union["Transformer[Out, Any]", PipelineFunction[Out, Any]]) -> "ParallelTransformer[In, Out]":
super().tap(arg)
return self
def apply[T](
self, t: Callable[["ParallelTransformer[In, Out]"], "Transformer[In, T]"]
) -> "ParallelTransformer[In, T]":
super().apply(t) # type: ignore
return self # type: ignore
def catch[U](
self,
sub_pipeline_builder: Callable[[Transformer[Out, Out]], Transformer[Out, U]],
on_error: ChunkErrorHandler[Out, None] | None = None,
) -> "ParallelTransformer[In, U]":
super().catch(sub_pipeline_builder, on_error)
return self # type: ignore
def short_circuit(self, function: Callable[[IContextManager], bool | None]) -> "ParallelTransformer[In, Out]":
super().short_circuit(function)
return self