Skip to content

Commit 6e0bd88

Browse files
committed
feat: implemented http transformer allowing distributed processing
1 parent 28cdb49 commit 6e0bd88

9 files changed

Lines changed: 118 additions & 20 deletions

File tree

.github/workflows/publish.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ jobs:
3838
3939
- name: Update version in __init__.py
4040
run: |
41-
sed -i 's|VERSION_NUMBER|${{ steps.version.outputs.VERSION }}|g' laygo/__init__.py
41+
sed -i 's|"0.1.0"|"${{ steps.version.outputs.VERSION }}"|g' pyproject.toml
4242
echo "Updated version to ${{ steps.version.outputs.VERSION }}"
43-
cat laygo/__init__.py
43+
cat pyproject.toml
4444
4545
- name: Build package
4646
run: uv build

laygo/__init__.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,18 @@
22
Laygo - A lightweight Python library for building resilient, in-memory data pipelines
33
"""
44

5-
__version__ = "VERSION_NUMBER"
6-
7-
from .errors import ErrorHandler
8-
from .helpers import PipelineContext
9-
from .pipeline import Pipeline
10-
from .transformers.parallel import ParallelTransformer
11-
from .transformers.transformer import Transformer
5+
from laygo.errors import ErrorHandler
6+
from laygo.helpers import PipelineContext
7+
from laygo.pipeline import Pipeline
8+
from laygo.transformers.http import HTTPTransformer
9+
from laygo.transformers.parallel import ParallelTransformer
10+
from laygo.transformers.transformer import Transformer
1211

1312
__all__ = [
1413
"Pipeline",
1514
"Transformer",
1615
"ParallelTransformer",
16+
"HTTPTransformer",
1717
"PipelineContext",
1818
"ErrorHandler",
1919
]

laygo/errors.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from collections.abc import Callable
22

3-
from laygo.helpers import PipelineContext
3+
from laygo import PipelineContext
44

55
ChunkErrorHandler = Callable[[list, Exception, PipelineContext], None]
66

laygo/pipeline.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,10 @@
66
from typing import TypeVar
77
from typing import overload
88

9-
from laygo.helpers import PipelineContext
9+
from laygo import PipelineContext
10+
from laygo import Transformer
1011
from laygo.helpers import is_context_aware
1112

12-
from .transformers.transformer import Transformer
13-
1413
T = TypeVar("T")
1514
PipelineFunction = Callable[[T], Any]
1615

laygo/transformers/http.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
"""
2+
The final, self-sufficient DistributedTransformer.
3+
"""
4+
5+
from collections.abc import Iterable
6+
from collections.abc import Iterator
7+
from concurrent.futures import FIRST_COMPLETED
8+
from concurrent.futures import ThreadPoolExecutor
9+
from concurrent.futures import wait
10+
import hashlib
11+
import itertools
12+
import pickle
13+
14+
import requests
15+
16+
from laygo import PipelineContext
17+
from laygo import Transformer
18+
19+
20+
class HTTPTransformer(Transformer):
21+
"""
22+
A self-sufficient, chainable transformer that manages its own
23+
distributed execution and worker endpoint definition.
24+
"""
25+
26+
def __init__(self, base_url: str, endpoint: str | None = None, max_workers: int = 8):
27+
super().__init__()
28+
self.base_url = base_url.rstrip("/")
29+
self.endpoint = endpoint
30+
self.max_workers = max_workers
31+
self.session = requests.Session()
32+
self._worker_url: str
33+
34+
def _finalize_config(self):
35+
"""Determines the final worker URL, generating one if needed."""
36+
if self._worker_url:
37+
return
38+
39+
if self.endpoint:
40+
path = self.endpoint
41+
else:
42+
# Using pickle to serialize the function chain and hashing for a unique ID
43+
serialized_logic = pickle.dumps(self.transformer)
44+
hash_id = hashlib.sha1(serialized_logic).hexdigest()[:16]
45+
path = f"/autogen/{hash_id}"
46+
47+
self.endpoint = path.lstrip("/")
48+
self._worker_url = f"{self.base_url}/{self.endpoint}"
49+
50+
def __call__(self, data: Iterable, context=None) -> Iterator:
51+
"""CLIENT-SIDE: Called by the Pipeline to start distributed processing."""
52+
self._finalize_config()
53+
54+
def process_chunk(chunk: list) -> list:
55+
"""Target for a thread: sends one chunk to the worker."""
56+
try:
57+
response = self.session.post(self._worker_url, json=chunk, timeout=300)
58+
response.raise_for_status()
59+
return response.json()
60+
except requests.RequestException as e:
61+
print(f"Error calling worker {self._worker_url}: {e}")
62+
return []
63+
64+
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
65+
chunk_iterator = self._chunk_generator(data)
66+
futures = {executor.submit(process_chunk, chunk) for chunk in itertools.islice(chunk_iterator, self.max_workers)}
67+
while futures:
68+
done, futures = wait(futures, return_when=FIRST_COMPLETED)
69+
for future in done:
70+
yield from future.result()
71+
try:
72+
new_chunk = next(chunk_iterator)
73+
futures.add(executor.submit(process_chunk, new_chunk))
74+
except StopIteration:
75+
continue
76+
77+
def get_route(self):
78+
"""
79+
Function that returns the route for the worker.
80+
This is used to register the worker in a Flask app or similar.
81+
82+
Returns:
83+
A tuple containing the endpoint and the worker function.
84+
"""
85+
self._finalize_config()
86+
87+
def worker_view_func(chunk: list, context: PipelineContext):
88+
"""The actual Flask view function for this transformer's logic."""
89+
return self.transformer(chunk, context)
90+
91+
return (f"/{self.endpoint}", worker_view_func)

laygo/transformers/parallel.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@
1212
import itertools
1313
import threading
1414

15-
from .transformer import DEFAULT_CHUNK_SIZE
16-
from .transformer import InternalTransformer
17-
from .transformer import PipelineContext
18-
from .transformer import Transformer
15+
from laygo import PipelineContext
16+
from laygo import Transformer
17+
from laygo.transformers.transformer import DEFAULT_CHUNK_SIZE
18+
from laygo.transformers.transformer import InternalTransformer
1919

2020

2121
class ParallelPipelineContextType(PipelineContext):

laygo/transformers/transformer.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99
from typing import Union
1010
from typing import overload
1111

12-
from laygo.errors import ErrorHandler
13-
from laygo.helpers import PipelineContext
12+
from laygo import ErrorHandler
13+
from laygo import PipelineContext
1414
from laygo.helpers import is_context_aware
1515
from laygo.helpers import is_context_aware_reduce
1616

pyproject.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ path = "laygo/__init__.py"
77

88
[project]
99
name = "laygo"
10-
dynamic = ["version"]
10+
version = "0.1.0"
1111
description = "A lightweight Python library for building resilient, in-memory data pipelines with elegant, chainable syntax"
1212
readme = "README.md"
1313
requires-python = ">=3.12"
@@ -29,12 +29,15 @@ classifiers = [
2929
"Typing :: Typed",
3030
]
3131

32+
dependencies = ["requests>=2.32"]
33+
3234
[project.urls]
3335
Homepage = "https://github.com/ringoldsdev/laygo-python"
3436
Documentation = "https://github.com/ringoldsdev/laygo-python/wiki"
3537
Repository = "https://github.com/ringoldsdev/laygo-python.git"
3638
Issues = "https://github.com/ringoldsdev/laygo-python/issues"
3739

40+
3841
[project.optional-dependencies]
3942
dev = [
4043
"pytest>=7.0.0",

uv.lock

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)