-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprocess.py
More file actions
113 lines (95 loc) · 3.35 KB
/
process.py
File metadata and controls
113 lines (95 loc) · 3.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
from collections import deque
from collections.abc import Iterator
from concurrent.futures import FIRST_COMPLETED
from concurrent.futures import wait
import itertools

from loky import as_completed
from loky import get_reusable_executor

from laygo.context.types import IContextHandle
from laygo.context.types import IContextManager
from laygo.transformers.strategies.types import ExecutionStrategy
from laygo.transformers.types import InternalTransformer
def _worker_process_chunk[In, Out](
transformer_logic: InternalTransformer[In, Out],
context_handle: IContextHandle,
chunk: list[In],
) -> list[Out]:
"""Top-level function executed by each worker process."""
context_proxy = context_handle.create_proxy()
try:
return transformer_logic(chunk, context_proxy)
finally:
context_proxy.shutdown()
class ProcessStrategy[In, Out](ExecutionStrategy[In, Out]):
"""Execute transformer logic using a process pool."""
def __init__(self, max_workers: int = 4, ordered: bool = True):
self.max_workers = max_workers
self.ordered = ordered
def execute(
self,
transformer_logic: InternalTransformer[In, Out],
chunks: Iterator[list[In]],
context: IContextManager,
) -> Iterator[list[Out]]:
"""Execute the transformer by distributing chunks to a process pool."""
context_handle = context.get_handle()
executor = get_reusable_executor(max_workers=self.max_workers)
gen_func = self._ordered_generator if self.ordered else self._unordered_generator
yield from gen_func(chunks, transformer_logic, executor, context_handle)
def _ordered_generator(
self,
chunks_iter: Iterator[list[In]],
transformer: InternalTransformer[In, Out],
executor,
context_handle: IContextHandle,
) -> Iterator[list[Out]]:
"""Generate results in their original order, with robust error handling."""
futures = deque()
chunks_iter = iter(chunks_iter)
# Submit the initial batch of tasks
for _ in range(self.max_workers + 1):
try:
chunk = next(chunks_iter)
futures.append(executor.submit(_worker_process_chunk, transformer, context_handle, chunk))
except StopIteration:
break
try:
while futures:
result = futures.popleft().result()
try:
chunk = next(chunks_iter)
futures.append(executor.submit(_worker_process_chunk, transformer, context_handle, chunk))
except StopIteration:
pass
yield result
finally:
for future in futures:
future.cancel()
if futures:
wait(list(futures))
def _unordered_generator(
self,
chunks_iter: Iterator[list[In]],
transformer: InternalTransformer[In, Out],
executor,
context_handle: IContextHandle,
) -> Iterator[list[Out]]:
"""Generate results as they complete, with robust error handling."""
futures = {
executor.submit(_worker_process_chunk, transformer, context_handle, chunk)
for chunk in itertools.islice(chunks_iter, self.max_workers + 1)
}
try:
for future in as_completed(futures):
result = future.result()
futures.remove(future)
try:
chunk = next(chunks_iter)
futures.add(executor.submit(_worker_process_chunk, transformer, context_handle, chunk))
except StopIteration:
pass
yield result
finally:
for future in futures:
future.cancel()
if futures:
wait(futures)