Skip to content

Commit d706188

Browse files
authored
Merge pull request #2 from ringoldsdev/feat/20250718/process-transformer
Feat/20250718/process transformer
2 parents ce55401 + ec2fe3d commit d706188

13 files changed

Lines changed: 949 additions & 375 deletions

laygo/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,24 @@
66
from laygo.helpers import PipelineContext
77
from laygo.pipeline import Pipeline
88
from laygo.transformers.http import HTTPTransformer
9+
from laygo.transformers.http import createHTTPTransformer
910
from laygo.transformers.parallel import ParallelTransformer
11+
from laygo.transformers.parallel import createParallelTransformer
12+
from laygo.transformers.threaded import ThreadedTransformer
13+
from laygo.transformers.threaded import createThreadedTransformer
1014
from laygo.transformers.transformer import Transformer
15+
from laygo.transformers.transformer import createTransformer
1116

1217
__all__ = [
1318
"Pipeline",
1419
"Transformer",
20+
"createTransformer",
21+
"ThreadedTransformer",
22+
"createThreadedTransformer",
1523
"ParallelTransformer",
24+
"createParallelTransformer",
1625
"HTTPTransformer",
26+
"createHTTPTransformer",
1727
"PipelineContext",
1828
"ErrorHandler",
1929
]

laygo/pipeline.py

Lines changed: 61 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1+
# pipeline.py
2+
13
from collections.abc import Callable
24
from collections.abc import Iterable
35
from collections.abc import Iterator
46
import itertools
7+
import multiprocessing as mp
58
from typing import Any
69
from typing import TypeVar
710
from typing import overload
@@ -17,34 +20,79 @@
1720
class Pipeline[T]:
1821
"""
1922
Manages a data source and applies transformers to it.
20-
Provides terminal operations to consume the resulting data.
23+
Always uses a multiprocessing-safe shared context.
2124
"""
2225

2326
def __init__(self, *data: Iterable[T]):
2427
if len(data) == 0:
2528
raise ValueError("At least one data source must be provided to Pipeline.")
2629
self.data_source: Iterable[T] = itertools.chain.from_iterable(data) if len(data) > 1 else data[0]
2730
self.processed_data: Iterator = iter(self.data_source)
28-
self.ctx = PipelineContext()
31+
32+
# Always create a shared context with multiprocessing manager
33+
self._manager = mp.Manager()
34+
self.ctx = self._manager.dict()
35+
# Add a shared lock to the context for safe concurrent updates
36+
self.ctx["lock"] = self._manager.Lock()
37+
38+
# Store reference to original context for final synchronization
39+
self._original_context_ref: PipelineContext | None = None
40+
41+
def __del__(self):
42+
"""Clean up the multiprocessing manager when the pipeline is destroyed."""
43+
try:
44+
self._sync_context_back()
45+
self._manager.shutdown()
46+
except Exception:
47+
pass # Ignore errors during cleanup
2948

3049
def context(self, ctx: PipelineContext) -> "Pipeline[T]":
3150
"""
32-
Sets the context for the pipeline.
51+
Updates the pipeline context and stores a reference to the original context.
52+
When the pipeline finishes processing, the original context will be updated
53+
with the final pipeline context data.
3354
"""
34-
self.ctx = ctx
55+
# Store reference to the original context
56+
self._original_context_ref = ctx
57+
# Copy the context data to the pipeline's shared context
58+
self.ctx.update(ctx)
3559
return self
3660

61+
def _sync_context_back(self) -> None:
62+
"""
63+
Synchronize the final pipeline context back to the original context reference.
64+
This is called after processing is complete.
65+
"""
66+
if self._original_context_ref is not None:
67+
# Copy the final context state back to the original context reference
68+
final_context_state = dict(self.ctx)
69+
final_context_state.pop("lock", None) # Remove non-serializable lock
70+
self._original_context_ref.clear()
71+
self._original_context_ref.update(final_context_state)
72+
73+
def transform[U](self, t: Callable[[Transformer[T, T]], Transformer[T, U]]) -> "Pipeline[U]":
74+
"""
75+
Shorthand method to apply a transformation using a lambda function.
76+
Creates a Transformer under the hood and applies it to the pipeline.
77+
78+
Args:
79+
t: A callable that takes a transformer and returns a transformed transformer
80+
81+
Returns:
82+
A new Pipeline with the transformed data
83+
"""
84+
# Create a new transformer and apply the transformation function
85+
transformer = t(Transformer[T, T]())
86+
return self.apply(transformer)
87+
3788
@overload
3889
def apply[U](self, transformer: Transformer[T, U]) -> "Pipeline[U]": ...
3990

4091
@overload
4192
def apply[U](self, transformer: Callable[[Iterable[T]], Iterator[U]]) -> "Pipeline[U]": ...
4293

4394
@overload
44-
def apply[U](
45-
self,
46-
transformer: Callable[[Iterable[T], PipelineContext], Iterator[U]],
47-
) -> "Pipeline[U]": ...
95+
def apply[U](self, transformer: Callable[[Iterable[T], PipelineContext], Iterator[U]]) -> "Pipeline[U]": ...
4896

4997
def apply[U](
5098
self,
@@ -53,42 +101,26 @@ def apply[U](
53101
| Callable[[Iterable[T], PipelineContext], Iterator[U]],
54102
) -> "Pipeline[U]":
55103
"""
56-
Applies a transformer to the current data source.
104+
Applies a transformer to the current data source. The pipeline's
105+
managed context is passed down.
57106
"""
58-
59107
match transformer:
60108
case Transformer():
61-
# If a Transformer instance is provided, use its __call__ method
109+
# The transformer is called with self.ctx, which is the
110+
# shared mp.Manager.dict proxy when inside a 'with' block.
62111
self.processed_data = transformer(self.processed_data, self.ctx) # type: ignore
63112
case _ if callable(transformer):
64-
# If a callable function is provided, call it with the current data and context
65-
66113
if is_context_aware(transformer):
67114
processed_transformer = transformer
68115
else:
69116
processed_transformer = lambda data, ctx: transformer(data) # type: ignore # noqa: E731
70-
71117
self.processed_data = processed_transformer(self.processed_data, self.ctx) # type: ignore
72118
case _:
73119
raise TypeError("Transformer must be a Transformer instance or a callable function")
74120

75121
return self # type: ignore
76122

77-
def transform[U](self, t: Callable[[Transformer[T, T]], Transformer[T, U]]) -> "Pipeline[U]":
78-
"""
79-
Shorthand method to apply a transformation using a lambda function.
80-
Creates a Transformer under the hood and applies it to the pipeline.
81-
82-
Args:
83-
t: A callable that takes a transformer and returns a transformed transformer
84-
85-
Returns:
86-
A new Pipeline with the transformed data
87-
"""
88-
# Create a new transformer and apply the transformation function
89-
transformer = t(Transformer[T, T]())
90-
return self.apply(transformer)
91-
123+
# ... The rest of the Pipeline class (__iter__, to_list, etc.) remains unchanged ...
92124
def __iter__(self) -> Iterator[T]:
93125
"""Allows the pipeline to be iterated over."""
94126
yield from self.processed_data

laygo/transformers/http.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,30 @@
3030
U = TypeVar("U")
3131

3232

33+
def createHTTPTransformer[T](
34+
_type_hint: type[T],
35+
base_url: str,
36+
chunk_size: int | None = None,
37+
endpoint: str | None = None,
38+
max_workers: int = 4,
39+
) -> "HTTPTransformer[T, T]":
40+
"""Create a new identity HTTP transformer with an explicit type hint."""
41+
return HTTPTransformer[T, T](
42+
base_url=base_url,
43+
endpoint=endpoint,
44+
max_workers=max_workers,
45+
chunk_size=chunk_size,
46+
)
47+
48+
3349
class HTTPTransformer(Transformer[In, Out]):
3450
"""
3551
A self-sufficient, chainable transformer that manages its own
3652
distributed execution and worker endpoint definition.
3753
"""
3854

39-
def __init__(self, base_url: str, endpoint: str | None = None, max_workers: int = 8):
40-
super().__init__()
55+
def __init__(self, base_url: str, endpoint: str | None = None, max_workers: int = 8, chunk_size: int | None = None):
56+
super().__init__(chunk_size=chunk_size)
4157
self.base_url = base_url.rstrip("/")
4258
self.endpoint = endpoint
4359
self.max_workers = max_workers

0 commit comments

Comments
 (0)