11"""
2- The final, self-sufficient DistributedTransformer.
2+ The final, self-sufficient DistributedTransformer with corrected typing .
33"""
44
5+ from collections .abc import Callable
56from collections .abc import Iterable
67from collections .abc import Iterator
78from concurrent .futures import FIRST_COMPLETED
1011import hashlib
1112import itertools
1213import pickle
14+ from typing import Any
15+ from typing import TypeVar
16+ from typing import Union
17+ from typing import overload
1318
1419import requests
1520
16- from laygo import PipelineContext
17- from laygo import Transformer
21+ from laygo .errors import ErrorHandler
22+ from laygo .helpers import PipelineContext
23+ from laygo .transformers .transformer import ChunkErrorHandler
24+ from laygo .transformers .transformer import PipelineFunction
25+ from laygo .transformers .transformer import Transformer
1826
27+ In = TypeVar ("In" )
28+ Out = TypeVar ("Out" )
29+ T = TypeVar ("T" )
30+ U = TypeVar ("U" )
1931
20- class HTTPTransformer (Transformer ):
32+
33+ class HTTPTransformer (Transformer [In , Out ]):
2134 """
2235 A self-sufficient, chainable transformer that manages its own
2336 distributed execution and worker endpoint definition.
@@ -29,32 +42,39 @@ def __init__(self, base_url: str, endpoint: str | None = None, max_workers: int
2942 self .endpoint = endpoint
3043 self .max_workers = max_workers
3144 self .session = requests .Session ()
32- self ._worker_url : str
45+ self ._worker_url : str | None = None
3346
3447 def _finalize_config (self ):
3548 """Determines the final worker URL, generating one if needed."""
36- if self ._worker_url :
49+ if hasattr ( self , "_worker_url" ) and self ._worker_url :
3750 return
3851
3952 if self .endpoint :
4053 path = self .endpoint
4154 else :
42- # Using pickle to serialize the function chain and hashing for a unique ID
55+ if not self .transformer :
56+ raise ValueError ("Cannot determine endpoint for an empty transformer." )
4357 serialized_logic = pickle .dumps (self .transformer )
4458 hash_id = hashlib .sha1 (serialized_logic ).hexdigest ()[:16 ]
4559 path = f"/autogen/{ hash_id } "
4660
4761 self .endpoint = path .lstrip ("/" )
4862 self ._worker_url = f"{ self .base_url } /{ self .endpoint } "
4963
50- def __call__ (self , data : Iterable , context = None ) -> Iterator :
64+ # --- Original HTTPTransformer Methods ---
65+
66+ def __call__ (self , data : Iterable [In ], context : PipelineContext | None = None ) -> Iterator [Out ]:
5167 """CLIENT-SIDE: Called by the Pipeline to start distributed processing."""
5268 self ._finalize_config ()
5369
5470 def process_chunk (chunk : list ) -> list :
5571 """Target for a thread: sends one chunk to the worker."""
5672 try :
57- response = self .session .post (self ._worker_url , json = chunk , timeout = 300 )
73+ response = self .session .post (
74+ self ._worker_url , # type: ignore
75+ json = chunk ,
76+ timeout = 300 ,
77+ )
5878 response .raise_for_status ()
5979 return response .json ()
6080 except requests .RequestException as e :
@@ -78,14 +98,58 @@ def get_route(self):
7898 """
7999 Function that returns the route for the worker.
80100 This is used to register the worker in a Flask app or similar.
81-
82- Returns:
83- A tuple containing the endpoint and the worker function.
84101 """
85102 self ._finalize_config ()
86103
87104 def worker_view_func (chunk : list , context : PipelineContext ):
88- """The actual Flask view function for this transformer's logic ."""
105+ """The actual worker logic for this transformer."""
89106 return self .transformer (chunk , context )
90107
91108 return (f"/{ self .endpoint } " , worker_view_func )
109+
110+ # --- Overridden Chaining Methods to Preserve Type ---
111+
112+ def on_error (self , handler : ChunkErrorHandler [In , Out ] | ErrorHandler ) -> "HTTPTransformer[In, Out]" :
113+ super ().on_error (handler )
114+ return self
115+
116+ def map [U ](self , function : PipelineFunction [Out , U ]) -> "HTTPTransformer[In, U]" :
117+ super ().map (function )
118+ return self # type: ignore
119+
120+ def filter (self , predicate : PipelineFunction [Out , bool ]) -> "HTTPTransformer[In, Out]" :
121+ super ().filter (predicate )
122+ return self
123+
124+ @overload
125+ def flatten [T ](self : "HTTPTransformer[In, list[T]]" ) -> "HTTPTransformer[In, T]" : ...
126+ @overload
127+ def flatten [T ](self : "HTTPTransformer[In, tuple[T, ...]]" ) -> "HTTPTransformer[In, T]" : ...
128+ @overload
129+ def flatten [T ](self : "HTTPTransformer[In, set[T]]" ) -> "HTTPTransformer[In, T]" : ...
130+ def flatten [T ](
131+ self : Union ["HTTPTransformer[In, list[T]]" , "HTTPTransformer[In, tuple[T, ...]]" , "HTTPTransformer[In, set[T]]" ],
132+ ) -> "HTTPTransformer[In, T]" :
133+ super ().flatten ()
134+ return self # type: ignore
135+
136+ def tap (self , function : PipelineFunction [Out , Any ]) -> "HTTPTransformer[In, Out]" :
137+ super ().tap (function )
138+ return self
139+
140+ def apply [T ](self , t : Callable [["HTTPTransformer[In, Out]" ], "Transformer[In, T]" ]) -> "HTTPTransformer[In, T]" :
141+ # Note: The type hint for `t` is slightly adjusted to reflect it receives an HTTPTransformer
142+ super ().apply (t ) # type: ignore
143+ return self # type: ignore
144+
145+ def catch [U ](
146+ self ,
147+ sub_pipeline_builder : Callable [[Transformer [Out , Out ]], Transformer [Out , U ]],
148+ on_error : ChunkErrorHandler [Out , U ] | None = None ,
149+ ) -> "HTTPTransformer[In, U]" :
150+ super ().catch (sub_pipeline_builder , on_error )
151+ return self # type: ignore
152+
153+ def short_circuit (self , function : Callable [[PipelineContext ], bool | None ]) -> "HTTPTransformer[In, Out]" :
154+ super ().short_circuit (function )
155+ return self
0 commit comments