-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy paththreaded.py
More file actions
138 lines (114 loc) · 4.88 KB
/
threaded.py
File metadata and controls
138 lines (114 loc) · 4.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
from collections import deque
from collections.abc import Iterable
from collections.abc import Iterator
from concurrent.futures import FIRST_COMPLETED
from concurrent.futures import Future
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import wait
import itertools
import threading
from typing import ClassVar
from laygo.context.types import IContextManager
from laygo.transformers.strategies.types import ChunkGenerator
from laygo.transformers.strategies.types import ExecutionStrategy
from laygo.transformers.types import InternalTransformer
class ThreadedStrategy[In, Out](ExecutionStrategy[In, Out]):
# Class-level thread pool cache to reuse executors
_thread_pools: ClassVar[dict[int, ThreadPoolExecutor]] = {}
_pool_lock: ClassVar[threading.Lock] = threading.Lock()
def __init__(self, max_workers: int = 4, ordered: bool = True):
self.max_workers = max_workers
self.ordered = ordered
@classmethod
def _get_thread_pool(cls, max_workers: int) -> ThreadPoolExecutor:
"""Get or create a reusable thread pool for the given worker count."""
with cls._pool_lock:
if max_workers not in cls._thread_pools:
cls._thread_pools[max_workers] = ThreadPoolExecutor(
max_workers=max_workers, thread_name_prefix=f"laygo-{max_workers}"
)
return cls._thread_pools[max_workers]
def execute(self, transformer_logic, chunk_generator, data, context):
"""Execute the transformer on data concurrently.
Uses a reusable thread pool to minimize thread creation overhead.
Args:
transformer_logic: The transformation function to apply.
chunk_generator: Function to generate data chunks.
data: The input data to process.
context: Optional pipeline context for shared state.
Returns:
An iterator over the transformed data.
"""
yield from self._execute_with_context(data, transformer_logic, context, chunk_generator)
def _execute_with_context(
self,
data: Iterable[In],
transformer: InternalTransformer[In, Out],
shared_context: IContextManager,
chunk_generator: ChunkGenerator[In],
) -> Iterator[Out]:
"""Execute the transformation logic with a given context.
Args:
data: The input data to process.
transformer: The transformation function to apply.
shared_context: The shared context for the execution.
chunk_generator: Function to generate data chunks.
Returns:
An iterator over the transformed data.
"""
def process_chunk(chunk: list[In]) -> list[Out]:
"""Process a single chunk by passing the chunk and context explicitly.
Args:
chunk: The data chunk to process.
shared_context: The shared context for processing.
Returns:
The processed chunk.
"""
return transformer(chunk, shared_context) # type: ignore
def _ordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]:
"""Generate results in their original order."""
futures: deque[Future[list[Out]]] = deque()
# Pre-submit initial batch of futures
for _ in range(min(self.max_workers, 10)): # Limit initial submissions
try:
chunk = next(chunks_iter)
futures.append(executor.submit(process_chunk, chunk))
except StopIteration:
break
while futures:
# Get the next result and submit the next chunk
result = futures.popleft().result()
yield result
try:
chunk = next(chunks_iter)
futures.append(executor.submit(process_chunk, chunk))
except StopIteration:
continue
def _unordered_generator(chunks_iter: Iterator[list[In]], executor: ThreadPoolExecutor) -> Iterator[list[Out]]:
"""Generate results as they complete."""
# Pre-submit initial batch
futures = {
executor.submit(process_chunk, chunk) for chunk in itertools.islice(chunks_iter, min(self.max_workers, 10))
}
while futures:
done, futures = wait(futures, return_when=FIRST_COMPLETED)
for future in done:
yield future.result()
try:
chunk = next(chunks_iter)
futures.add(executor.submit(process_chunk, chunk))
except StopIteration:
continue
# Use the reusable thread pool instead of creating a new one
executor = self._get_thread_pool(self.max_workers)
chunks_to_process = chunk_generator(data)
gen_func = _ordered_generator if self.ordered else _unordered_generator
# Process chunks using the reusable executor
for result_chunk in gen_func(chunks_to_process, executor):
yield from result_chunk
def __del__(self) -> None:
"""Shutdown all cached thread pools. Call this during application cleanup."""
with self._pool_lock:
for pool in self._thread_pools.values():
pool.shutdown(wait=True)
self._thread_pools.clear()