Skip to content

Commit ce55401

Browse files
authored
Merge pull request #1 from ringoldsdev/feat/20250717/http-transformer
Feat/20250717/http transformer
2 parents 28cdb49 + bf0af9c commit ce55401

8 files changed

Lines changed: 311 additions & 16 deletions

File tree

.github/workflows/publish.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ jobs:
3838
3939
- name: Update version in __init__.py
4040
run: |
41-
sed -i 's|VERSION_NUMBER|${{ steps.version.outputs.VERSION }}|g' laygo/__init__.py
41+
sed -i 's|"0.1.0"|"${{ steps.version.outputs.VERSION }}"|g' pyproject.toml
4242
echo "Updated version to ${{ steps.version.outputs.VERSION }}"
43-
cat laygo/__init__.py
43+
cat pyproject.toml
4444
4545
- name: Build package
4646
run: uv build

laygo/__init__.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,18 @@
22
Laygo - A lightweight Python library for building resilient, in-memory data pipelines
33
"""
44

5-
__version__ = "VERSION_NUMBER"
6-
7-
from .errors import ErrorHandler
8-
from .helpers import PipelineContext
9-
from .pipeline import Pipeline
10-
from .transformers.parallel import ParallelTransformer
11-
from .transformers.transformer import Transformer
5+
from laygo.errors import ErrorHandler
6+
from laygo.helpers import PipelineContext
7+
from laygo.pipeline import Pipeline
8+
from laygo.transformers.http import HTTPTransformer
9+
from laygo.transformers.parallel import ParallelTransformer
10+
from laygo.transformers.transformer import Transformer
1211

1312
__all__ = [
1413
"Pipeline",
1514
"Transformer",
1615
"ParallelTransformer",
16+
"HTTPTransformer",
1717
"PipelineContext",
1818
"ErrorHandler",
1919
]

laygo/pipeline.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@
88

99
from laygo.helpers import PipelineContext
1010
from laygo.helpers import is_context_aware
11-
12-
from .transformers.transformer import Transformer
11+
from laygo.transformers.transformer import Transformer
1312

1413
T = TypeVar("T")
1514
PipelineFunction = Callable[[T], Any]

laygo/transformers/http.py

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
"""
2+
The final, self-sufficient HTTPTransformer with corrected typing.
3+
"""
4+
5+
from collections.abc import Callable
6+
from collections.abc import Iterable
7+
from collections.abc import Iterator
8+
from concurrent.futures import FIRST_COMPLETED
9+
from concurrent.futures import ThreadPoolExecutor
10+
from concurrent.futures import wait
11+
import hashlib
12+
import itertools
13+
import pickle
14+
from typing import Any
15+
from typing import TypeVar
16+
from typing import Union
17+
from typing import overload
18+
19+
import requests
20+
21+
from laygo.errors import ErrorHandler
22+
from laygo.helpers import PipelineContext
23+
from laygo.transformers.transformer import ChunkErrorHandler
24+
from laygo.transformers.transformer import PipelineFunction
25+
from laygo.transformers.transformer import Transformer
26+
27+
In = TypeVar("In")
28+
Out = TypeVar("Out")
29+
T = TypeVar("T")
30+
U = TypeVar("U")
31+
32+
33+
class HTTPTransformer(Transformer[In, Out]):
34+
"""
35+
A self-sufficient, chainable transformer that manages its own
36+
distributed execution and worker endpoint definition.
37+
"""
38+
39+
def __init__(self, base_url: str, endpoint: str | None = None, max_workers: int = 8):
40+
super().__init__()
41+
self.base_url = base_url.rstrip("/")
42+
self.endpoint = endpoint
43+
self.max_workers = max_workers
44+
self.session = requests.Session()
45+
self._worker_url: str | None = None
46+
47+
def _finalize_config(self):
48+
"""Determines the final worker URL, generating one if needed."""
49+
if hasattr(self, "_worker_url") and self._worker_url:
50+
return
51+
52+
if self.endpoint:
53+
path = self.endpoint
54+
else:
55+
if not self.transformer:
56+
raise ValueError("Cannot determine endpoint for an empty transformer.")
57+
serialized_logic = pickle.dumps(self.transformer)
58+
hash_id = hashlib.sha1(serialized_logic).hexdigest()[:16]
59+
path = f"/autogen/{hash_id}"
60+
61+
self.endpoint = path.lstrip("/")
62+
self._worker_url = f"{self.base_url}/{self.endpoint}"
63+
64+
# --- Original HTTPTransformer Methods ---
65+
66+
def __call__(self, data: Iterable[In], context: PipelineContext | None = None) -> Iterator[Out]:
67+
"""CLIENT-SIDE: Called by the Pipeline to start distributed processing."""
68+
self._finalize_config()
69+
70+
def process_chunk(chunk: list) -> list:
71+
"""Target for a thread: sends one chunk to the worker."""
72+
try:
73+
response = self.session.post(
74+
self._worker_url, # type: ignore
75+
json=chunk,
76+
timeout=300,
77+
)
78+
response.raise_for_status()
79+
return response.json()
80+
except requests.RequestException as e:
81+
print(f"Error calling worker {self._worker_url}: {e}")
82+
return []
83+
84+
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
85+
chunk_iterator = self._chunk_generator(data)
86+
futures = {executor.submit(process_chunk, chunk) for chunk in itertools.islice(chunk_iterator, self.max_workers)}
87+
while futures:
88+
done, futures = wait(futures, return_when=FIRST_COMPLETED)
89+
for future in done:
90+
yield from future.result()
91+
try:
92+
new_chunk = next(chunk_iterator)
93+
futures.add(executor.submit(process_chunk, new_chunk))
94+
except StopIteration:
95+
continue
96+
97+
def get_route(self):
98+
"""
99+
Function that returns the route for the worker.
100+
This is used to register the worker in a Flask app or similar.
101+
"""
102+
self._finalize_config()
103+
104+
def worker_view_func(chunk: list, context: PipelineContext):
105+
"""The actual worker logic for this transformer."""
106+
return self.transformer(chunk, context)
107+
108+
return (f"/{self.endpoint}", worker_view_func)
109+
110+
# --- Overridden Chaining Methods to Preserve Type ---
111+
112+
def on_error(self, handler: ChunkErrorHandler[In, Out] | ErrorHandler) -> "HTTPTransformer[In, Out]":
113+
super().on_error(handler)
114+
return self
115+
116+
def map[U](self, function: PipelineFunction[Out, U]) -> "HTTPTransformer[In, U]":
117+
super().map(function)
118+
return self # type: ignore
119+
120+
def filter(self, predicate: PipelineFunction[Out, bool]) -> "HTTPTransformer[In, Out]":
121+
super().filter(predicate)
122+
return self
123+
124+
@overload
125+
def flatten[T](self: "HTTPTransformer[In, list[T]]") -> "HTTPTransformer[In, T]": ...
126+
@overload
127+
def flatten[T](self: "HTTPTransformer[In, tuple[T, ...]]") -> "HTTPTransformer[In, T]": ...
128+
@overload
129+
def flatten[T](self: "HTTPTransformer[In, set[T]]") -> "HTTPTransformer[In, T]": ...
130+
# Forgive me for I have sinned, but this is necessary to avoid type errors
131+
# Since I'm setting the self type in the parent class, overriding it isn't allowed
132+
def flatten[T]( # type: ignore
133+
self: Union["HTTPTransformer[In, list[T]]", "HTTPTransformer[In, tuple[T, ...]]", "HTTPTransformer[In, set[T]]"],
134+
) -> "HTTPTransformer[In, T]":
135+
super().flatten() # type: ignore
136+
return self # type: ignore
137+
138+
def tap(self, function: PipelineFunction[Out, Any]) -> "HTTPTransformer[In, Out]":
139+
super().tap(function)
140+
return self
141+
142+
def apply[T](self, t: Callable[["HTTPTransformer[In, Out]"], "Transformer[In, T]"]) -> "HTTPTransformer[In, T]":
143+
# Note: The type hint for `t` is slightly adjusted to reflect it receives an HTTPTransformer
144+
super().apply(t) # type: ignore
145+
return self # type: ignore
146+
147+
def catch[U](
148+
self,
149+
sub_pipeline_builder: Callable[[Transformer[Out, Out]], Transformer[Out, U]],
150+
on_error: ChunkErrorHandler[Out, U] | None = None,
151+
) -> "HTTPTransformer[In, U]":
152+
super().catch(sub_pipeline_builder, on_error)
153+
return self # type: ignore
154+
155+
def short_circuit(self, function: Callable[[PipelineContext], bool | None]) -> "HTTPTransformer[In, Out]":
156+
super().short_circuit(function)
157+
return self

laygo/transformers/parallel.py

Lines changed: 61 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""Parallel transformer implementation using multiple threads."""
22

33
from collections import deque
4+
from collections.abc import Callable
45
from collections.abc import Iterable
56
from collections.abc import Iterator
67
from concurrent.futures import FIRST_COMPLETED
@@ -11,11 +12,17 @@
1112
from functools import partial
1213
import itertools
1314
import threading
15+
from typing import Any
16+
from typing import Union
17+
from typing import overload
1418

15-
from .transformer import DEFAULT_CHUNK_SIZE
16-
from .transformer import InternalTransformer
17-
from .transformer import PipelineContext
18-
from .transformer import Transformer
19+
from laygo.errors import ErrorHandler
20+
from laygo.helpers import PipelineContext
21+
from laygo.transformers.transformer import DEFAULT_CHUNK_SIZE
22+
from laygo.transformers.transformer import ChunkErrorHandler
23+
from laygo.transformers.transformer import InternalTransformer
24+
from laygo.transformers.transformer import PipelineFunction
25+
from laygo.transformers.transformer import Transformer
1926

2027

2128
class ParallelPipelineContextType(PipelineContext):
@@ -142,3 +149,53 @@ def result_iterator_manager() -> Iterator[Out]:
142149
yield from result_chunk
143150

144151
return result_iterator_manager()
152+
153+
# --- Overridden Chaining Methods to Preserve Type ---
154+
155+
def on_error(self, handler: ChunkErrorHandler[In, Out] | ErrorHandler) -> "ParallelTransformer[In, Out]":
156+
super().on_error(handler)
157+
return self
158+
159+
def map[U](self, function: PipelineFunction[Out, U]) -> "ParallelTransformer[In, U]":
160+
super().map(function)
161+
return self # type: ignore
162+
163+
def filter(self, predicate: PipelineFunction[Out, bool]) -> "ParallelTransformer[In, Out]":
164+
super().filter(predicate)
165+
return self
166+
167+
@overload
168+
def flatten[T](self: "ParallelTransformer[In, list[T]]") -> "ParallelTransformer[In, T]": ...
169+
@overload
170+
def flatten[T](self: "ParallelTransformer[In, tuple[T, ...]]") -> "ParallelTransformer[In, T]": ...
171+
@overload
172+
def flatten[T](self: "ParallelTransformer[In, set[T]]") -> "ParallelTransformer[In, T]": ...
173+
def flatten[T]( # type: ignore
174+
self: Union[
175+
"ParallelTransformer[In, list[T]]", "ParallelTransformer[In, tuple[T, ...]]", "ParallelTransformer[In, set[T]]"
176+
],
177+
) -> "ParallelTransformer[In, T]":
178+
super().flatten() # type: ignore
179+
return self # type: ignore
180+
181+
def tap(self, function: PipelineFunction[Out, Any]) -> "ParallelTransformer[In, Out]":
182+
super().tap(function)
183+
return self
184+
185+
def apply[T](
186+
self, t: Callable[["ParallelTransformer[In, Out]"], "Transformer[In, T]"]
187+
) -> "ParallelTransformer[In, T]":
188+
super().apply(t) # type: ignore
189+
return self # type: ignore
190+
191+
def catch[U](
192+
self,
193+
sub_pipeline_builder: Callable[[Transformer[Out, Out]], Transformer[Out, U]],
194+
on_error: ChunkErrorHandler[Out, U] | None = None,
195+
) -> "ParallelTransformer[In, U]":
196+
super().catch(sub_pipeline_builder, on_error)
197+
return self # type: ignore
198+
199+
def short_circuit(self, function: Callable[[PipelineContext], bool | None]) -> "ParallelTransformer[In, Out]":
200+
super().short_circuit(function)
201+
return self

pyproject.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ path = "laygo/__init__.py"
77

88
[project]
99
name = "laygo"
10-
dynamic = ["version"]
10+
version = "0.1.0"
1111
description = "A lightweight Python library for building resilient, in-memory data pipelines with elegant, chainable syntax"
1212
readme = "README.md"
1313
requires-python = ">=3.12"
@@ -29,17 +29,21 @@ classifiers = [
2929
"Typing :: Typed",
3030
]
3131

32+
dependencies = ["requests>=2.32"]
33+
3234
[project.urls]
3335
Homepage = "https://github.com/ringoldsdev/laygo-python"
3436
Documentation = "https://github.com/ringoldsdev/laygo-python/wiki"
3537
Repository = "https://github.com/ringoldsdev/laygo-python.git"
3638
Issues = "https://github.com/ringoldsdev/laygo-python/issues"
3739

40+
3841
[project.optional-dependencies]
3942
dev = [
4043
"pytest>=7.0.0",
4144
"ruff>=0.1.0",
4245
"twine>=4.0.0",
46+
"requests-mock>=1.12.1",
4347
]
4448

4549
[tool.ruff]

tests/test_http_transformer.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
# These tests exercise the client side of HTTPTransformer, imported from the laygo package.
2+
# This covers Pipeline, PipelineContext, and HTTPTransformer.
3+
import requests_mock
4+
5+
from laygo import HTTPTransformer
6+
from laygo import Pipeline
7+
from laygo import PipelineContext
8+
9+
10+
class TestHTTPTransformer:
11+
"""
12+
Test suite for the HTTPTransformer class.
13+
"""
14+
15+
def test_distributed_transformer_with_mock(self):
16+
"""
17+
Tests the HTTPTransformer by mocking the worker endpoint.
18+
This test validates that the client-side of the transformer correctly
19+
calls the endpoint and processes the response from the (mocked) worker.
20+
"""
21+
# 1. Define the transformer's properties
22+
base_url = "http://mock-worker.com"
23+
endpoint = "/process/data"
24+
worker_url = f"{base_url}{endpoint}"
25+
26+
# 2. Define the transformer and its logic using the chainable API.
27+
# This single instance holds both the client and server logic.
28+
http_transformer = (
29+
HTTPTransformer(base_url=base_url, endpoint=endpoint).map(lambda x: x * 2).filter(lambda x: x > 10)
30+
)
31+
32+
# Set a small chunk_size to ensure the client makes multiple requests
33+
http_transformer.chunk_size = 4
34+
35+
# 3. Get the worker's logic from the transformer itself
36+
# The `get_route` method provides the exact function the worker would run.
37+
_, worker_view_func = http_transformer.get_route()
38+
39+
# 4. Configure the mock endpoint to use the real worker logic
40+
def mock_response(request, context):
41+
"""The behavior of the mocked Flask endpoint."""
42+
input_chunk = request.json()
43+
# Call the actual view function logic obtained from get_route()
44+
# We pass None for the context as it's not used in this simple case.
45+
output_chunk = worker_view_func(chunk=input_chunk, context=PipelineContext())
46+
return output_chunk
47+
48+
# Use requests_mock context manager
49+
with requests_mock.Mocker() as m:
50+
m.post(worker_url, json=mock_response)
51+
52+
# 5. Run the standard Pipeline with the configured transformer
53+
initial_data = list(range(10)) # [0, 1, 2, ..., 9]
54+
pipeline = Pipeline(initial_data).apply(http_transformer)
55+
result = pipeline.to_list()
56+
57+
# 6. Assert the final result
58+
expected_result = [12, 14, 16, 18]
59+
assert sorted(result) == sorted(expected_result)

0 commit comments

Comments
 (0)