Skip to content

Commit 55c5a67

Browse files
committed
feat: implemented execution strategy pattern
1 parent 7d3a99d commit 55c5a67

17 files changed

+503
-700
lines changed

laygo/__init__.py

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,23 +9,19 @@
99
from laygo.pipeline import Pipeline
1010
from laygo.transformers.http import HTTPTransformer
1111
from laygo.transformers.http import createHTTPTransformer
12-
from laygo.transformers.parallel import ParallelTransformer
13-
from laygo.transformers.parallel import createParallelTransformer
14-
from laygo.transformers.threaded import ThreadedTransformer
15-
from laygo.transformers.threaded import createThreadedTransformer
1612
from laygo.transformers.transformer import Transformer
1713
from laygo.transformers.transformer import build_chunk_generator
18-
from laygo.transformers.transformer import createTransformer
14+
from laygo.transformers.transformer import create_process_transformer
15+
from laygo.transformers.transformer import create_threaded_transformer
16+
from laygo.transformers.transformer import create_transformer
1917
from laygo.transformers.transformer import passthrough_chunks
2018

2119
__all__ = [
2220
"Pipeline",
2321
"Transformer",
24-
"createTransformer",
25-
"ThreadedTransformer",
26-
"createThreadedTransformer",
27-
"ParallelTransformer",
28-
"createParallelTransformer",
22+
"create_transformer",
23+
"create_threaded_transformer",
24+
"create_process_transformer",
2925
"HTTPTransformer",
3026
"createHTTPTransformer",
3127
"PipelineContext",

laygo/pipeline.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,17 @@
2020
from laygo.context.types import IContextHandle
2121
from laygo.helpers import is_context_aware
2222
from laygo.transformers.transformer import Transformer
23+
from laygo.transformers.types import BaseTransformer
2324

2425
T = TypeVar("T")
2526
U = TypeVar("U")
2627
PipelineFunction = Callable[[T], Any]
2728

2829

2930
# This function must be defined at the top level of the module (e.g., after imports)
30-
def _branch_consumer_process[T](transformer: Transformer, queue: "Queue", context_handle: IContextHandle) -> list[Any]:
31+
def _branch_consumer_process[T](
32+
transformer: BaseTransformer, queue: "Queue", context_handle: IContextHandle
33+
) -> list[Any]:
3134
"""Entry point for a consumer process in parallel branching.
3235
3336
Reconstructs the necessary objects and runs a dedicated pipeline instance
@@ -457,7 +460,7 @@ def _producer_broadcast(
457460
@overload
458461
def branch(
459462
self,
460-
branches: Mapping[str, Transformer[T, Any]],
463+
branches: Mapping[str, BaseTransformer[T, Any]],
461464
*,
462465
executor_type: Literal["thread", "process"] = "thread",
463466
batch_size: int = 1000,
@@ -468,7 +471,7 @@ def branch(
468471
@overload
469472
def branch(
470473
self,
471-
branches: Mapping[str, tuple[Transformer[T, Any], Callable[[T], bool]]],
474+
branches: Mapping[str, tuple[BaseTransformer[T, Any], Callable[[T], bool]]],
472475
*,
473476
executor_type: Literal["thread", "process"] = "thread",
474477
first_match: bool = True,
@@ -478,7 +481,7 @@ def branch(
478481

479482
def branch(
480483
self,
481-
branches: Mapping[str, Transformer[T, Any]] | Mapping[str, tuple[Transformer[T, Any], Callable[[T], bool]]],
484+
branches: Mapping[str, BaseTransformer[T, Any]] | Mapping[str, tuple[BaseTransformer[T, Any], Callable[[T], bool]]],
482485
*,
483486
executor_type: Literal["thread", "process"] = "thread",
484487
first_match: bool = True,
@@ -519,7 +522,7 @@ def branch(
519522
first_value = next(iter(branches.values()))
520523
is_conditional = isinstance(first_value, tuple)
521524

522-
parsed_branches: list[tuple[str, Transformer[T, Any], Callable[[T], bool]]]
525+
parsed_branches: list[tuple[str, BaseTransformer[T, Any], Callable[[T], bool]]]
523526
if is_conditional:
524527
parsed_branches = [(name, trans, cond) for name, (trans, cond) in branches.items()] # type: ignore
525528
else:
@@ -555,7 +558,7 @@ def _execute_branching_process(
555558
self,
556559
*,
557560
producer_fn: Callable,
558-
parsed_branches: list[tuple[str, Transformer, Callable]],
561+
parsed_branches: list[tuple[str, BaseTransformer, Callable]],
559562
batch_size: int,
560563
max_batch_buffer: int,
561564
) -> tuple[dict[str, list[Any]], dict[str, Any]]:
@@ -629,7 +632,7 @@ def _execute_branching_thread(
629632
self,
630633
*,
631634
producer_fn: Callable,
632-
parsed_branches: list[tuple[str, Transformer, Callable]],
635+
parsed_branches: list[tuple[str, BaseTransformer, Callable]],
633636
batch_size: int,
634637
max_batch_buffer: int,
635638
) -> tuple[dict[str, list[Any]], dict[str, Any]]:
@@ -654,7 +657,7 @@ def _execute_branching_thread(
654657
final_results: dict[str, list[Any]] = {name: [] for name, _, _ in parsed_branches}
655658
queues = {name: Queue(maxsize=max_batch_buffer) for name, _, _ in parsed_branches}
656659

657-
def consumer(transformer: Transformer, queue: Queue, context_handle: IContextHandle) -> list[Any]:
660+
def consumer(transformer: BaseTransformer, queue: Queue, context_handle: IContextHandle) -> list[Any]:
658661
"""Consume batches from a queue and process them with a transformer.
659662
660663
Creates a mini-pipeline for the transformer and processes all

laygo/transformers/http.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
from laygo.transformers.transformer import ChunkErrorHandler
2323
from laygo.transformers.transformer import PipelineFunction
2424
from laygo.transformers.transformer import Transformer
25+
from laygo.transformers.types import BaseTransformer
2526

2627
In = TypeVar("In")
2728
Out = TypeVar("Out")
@@ -56,7 +57,7 @@ def createHTTPTransformer[T](
5657
)
5758

5859

59-
class HTTPTransformer(Transformer[In, Out]):
60+
class HTTPTransformer[In, Out](BaseTransformer[In, Out]):
6061
"""A self-sufficient, chainable transformer for distributed execution.
6162
6263
This transformer manages its own distributed execution by coordinating

laygo/transformers/parallel.py

Lines changed: 0 additions & 236 deletions
This file was deleted.

laygo/transformers/strategies/http.py

Whitespace-only changes.

0 commit comments

Comments
 (0)