Skip to content

Commit f39d43d

Browse files
authored
feat: ✨ Pipeline
1 parent 0a040ab commit f39d43d

18 files changed

Lines changed: 354 additions & 207 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,5 @@ pip install python-cq
2121
## Resources
2222

2323
* [**Writing Application Layer**](https://github.com/100nm/python-cq/tree/prod/documentation/writing-application-layer.md)
24+
* [**Pipeline**](https://github.com/100nm/python-cq/tree/prod/documentation/pipeline.md)
2425
* [**FastAPI Example**](https://github.com/100nm/python-cq/tree/prod/documentation/fastapi-example.md)

conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import pytest
44

55
from cq import Bus
6-
from cq._core.bus import SimpleBus
6+
from cq._core.dispatcher.bus import SimpleBus
77
from tests.helpers.history import HistoryMiddleware
88

99

cq/__init__.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,19 @@
1-
from ._core.bus import Bus
2-
from ._core.command import Command, CommandBus, command_handler, find_command_bus
1+
from ._core.command import (
2+
AnyCommandBus,
3+
Command,
4+
CommandBus,
5+
command_handler,
6+
find_command_bus,
7+
)
8+
from ._core.dispatcher.bus import Bus
9+
from ._core.dispatcher.pipe import Pipe
310
from ._core.dto import DTO
411
from ._core.event import Event, EventBus, event_handler, find_event_bus
512
from ._core.middleware import Middleware, MiddlewareResult
613
from ._core.query import Query, QueryBus, find_query_bus, query_handler
714

815
__all__ = (
16+
"AnyCommandBus",
917
"Bus",
1018
"Command",
1119
"CommandBus",
@@ -14,6 +22,7 @@
1422
"EventBus",
1523
"Middleware",
1624
"MiddlewareResult",
25+
"Pipe",
1726
"Query",
1827
"QueryBus",
1928
"command_handler",

cq/_core/command.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
import injection
55

6-
from cq._core.bus import Bus, SimpleBus, SubscriberDecorator
6+
from cq._core.dispatcher.bus import Bus, SimpleBus, SubscriberDecorator
77
from cq._core.dto import DTO
88

99

@@ -12,6 +12,7 @@ class Command(DTO, ABC):
1212

1313

1414
type CommandBus[T] = Bus[Command, T]
15+
AnyCommandBus = CommandBus[Any]
1516
command_handler: SubscriberDecorator[Command, Any] = SubscriberDecorator(CommandBus)
1617

1718
injection.set_constant(SimpleBus(), CommandBus, alias=True)

cq/_core/dispatcher/__init__.py

Whitespace-only changes.

cq/_core/dispatcher/base.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import asyncio
2+
from abc import ABC, abstractmethod
3+
from collections.abc import Awaitable
4+
from typing import Callable, Protocol, Self, runtime_checkable
5+
6+
from cq._core.middleware import Middleware, MiddlewareGroup
7+
8+
9+
@runtime_checkable
10+
class Dispatcher[I, O](Protocol):
11+
__slots__ = ()
12+
13+
@abstractmethod
14+
async def dispatch(self, input_value: I, /) -> O:
15+
raise NotImplementedError
16+
17+
@abstractmethod
18+
def dispatch_no_wait(self, first_input_value: I, /, *input_values: I) -> None:
19+
raise NotImplementedError
20+
21+
@abstractmethod
22+
def add_middlewares(self, *middlewares: Middleware[[I], O]) -> Self:
23+
raise NotImplementedError
24+
25+
26+
class BaseDispatcher[I, O](Dispatcher[I, O], ABC):
27+
__slots__ = ("__middleware_group",)
28+
29+
__middleware_group: MiddlewareGroup[[I], O]
30+
31+
def __init__(self) -> None:
32+
self.__middleware_group = MiddlewareGroup()
33+
34+
def dispatch_no_wait(self, first_input_value: I, /, *input_values: I) -> None:
35+
asyncio.gather(
36+
*(
37+
self.dispatch(input_value)
38+
for input_value in (first_input_value, *input_values)
39+
),
40+
return_exceptions=True,
41+
)
42+
43+
def add_middlewares(self, *middlewares: Middleware[[I], O]) -> Self:
44+
self.__middleware_group.add(*middlewares)
45+
return self
46+
47+
async def _invoke_with_middlewares(
48+
self,
49+
handler: Callable[[I], Awaitable[O]],
50+
input_value: I,
51+
/,
52+
) -> O:
53+
return await self.__middleware_group.invoke(handler, input_value)
Lines changed: 13 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import asyncio
2-
from abc import ABC, abstractmethod
2+
from abc import abstractmethod
33
from collections import defaultdict
44
from collections.abc import Callable
55
from dataclasses import dataclass, field
@@ -9,7 +9,7 @@
99

1010
import injection
1111

12-
from cq._core.middleware import Middleware, MiddlewareGroup
12+
from cq._core.dispatcher.base import BaseDispatcher, Dispatcher
1313

1414
type HandlerType[**P, T] = type[Handler[P, T]]
1515
type HandlerFactory[**P, T] = Callable[..., Handler[P, T]]
@@ -27,30 +27,13 @@ async def handle(self, *args: P.args, **kwargs: P.kwargs) -> T:
2727

2828

2929
@runtime_checkable
30-
class Bus[I, O](Protocol):
30+
class Bus[I, O](Dispatcher[I, O], Protocol):
3131
__slots__ = ()
3232

33-
@abstractmethod
34-
async def dispatch(self, input_value: I, /) -> O:
35-
raise NotImplementedError
36-
37-
def dispatch_no_wait(self, first_input_value: I, /, *input_values: I) -> None:
38-
asyncio.gather(
39-
*(
40-
self.dispatch(input_value)
41-
for input_value in (first_input_value, *input_values)
42-
),
43-
return_exceptions=True,
44-
)
45-
4633
@abstractmethod
4734
def subscribe(self, input_type: type[I], factory: HandlerFactory[[I], O]) -> Self:
4835
raise NotImplementedError
4936

50-
@abstractmethod
51-
def add_middlewares(self, *middlewares: Middleware[[I], O]) -> Self:
52-
raise NotImplementedError
53-
5437

5538
@dataclass(eq=False, frozen=True, slots=True)
5639
class SubscriberDecorator[I, O]:
@@ -81,23 +64,7 @@ def __find_bus(self) -> Bus[I, O]:
8164
return self.injection_module.find_instance(self.bus_type)
8265

8366

84-
class _BaseBus[I, O](Bus[I, O], ABC):
85-
__slots__ = ("__middleware_group",)
86-
87-
__middleware_group: MiddlewareGroup[[I], O]
88-
89-
def __init__(self) -> None:
90-
self.__middleware_group = MiddlewareGroup()
91-
92-
def add_middlewares(self, *middlewares: Middleware[[I], O]) -> Self:
93-
self.__middleware_group.add(*middlewares)
94-
return self
95-
96-
async def _invoke(self, handler: Handler[[I], O], input_value: I, /) -> O:
97-
return await self.__middleware_group.invoke(handler.handle, input_value)
98-
99-
100-
class SimpleBus[I, O](_BaseBus[I, O]):
67+
class SimpleBus[I, O](BaseDispatcher[I, O], Bus[I, O]):
10168
__slots__ = ("__handlers",)
10269

10370
__handlers: dict[type[I], HandlerFactory[[I], O]]
@@ -114,7 +81,10 @@ async def dispatch(self, input_value: I, /) -> O:
11481
except KeyError:
11582
return NotImplemented
11683

117-
return await self._invoke(handler_factory(), input_value)
84+
return await self._invoke_with_middlewares(
85+
handler_factory().handle,
86+
input_value,
87+
)
11888

11989
def subscribe(self, input_type: type[I], factory: HandlerFactory[[I], O]) -> Self:
12090
if input_type in self.__handlers:
@@ -126,7 +96,7 @@ def subscribe(self, input_type: type[I], factory: HandlerFactory[[I], O]) -> Sel
12696
return self
12797

12898

129-
class TaskBus[I](_BaseBus[I, None]):
99+
class TaskBus[I](BaseDispatcher[I, None], Bus[I, None]):
130100
__slots__ = ("__handlers",)
131101

132102
__handlers: dict[type[I], list[HandlerFactory[[I], None]]]
@@ -143,7 +113,10 @@ async def dispatch(self, input_value: I, /) -> None:
143113

144114
await asyncio.gather(
145115
*(
146-
self._invoke(handler_factory(), input_value)
116+
self._invoke_with_middlewares(
117+
handler_factory().handle,
118+
input_value,
119+
)
147120
for handler_factory in handler_factories
148121
)
149122
)

cq/_core/dispatcher/pipe.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
from collections.abc import Callable
2+
from dataclasses import dataclass, field
3+
from typing import Any, Awaitable
4+
5+
from cq._core.dispatcher.base import BaseDispatcher, Dispatcher
6+
7+
type PipeConverter[I, O] = Callable[[O], Awaitable[I]]
8+
9+
10+
@dataclass(repr=False, eq=False, frozen=True, slots=True)
11+
class PipeStep[I, O]:
12+
converter: PipeConverter[I, O]
13+
dispatcher: Dispatcher[I, Any] | None = field(default=None)
14+
15+
16+
class Pipe[I, O](BaseDispatcher[I, O]):
17+
__slots__ = ("__dispatcher", "__steps")
18+
19+
__dispatcher: Dispatcher[Any, Any]
20+
__steps: list[PipeStep[Any, Any]]
21+
22+
def __init__(self, dispatcher: Dispatcher[Any, Any]) -> None:
23+
super().__init__()
24+
self.__dispatcher = dispatcher
25+
self.__steps = []
26+
27+
def step[T]( # type: ignore[no-untyped-def]
28+
self,
29+
wrapped: PipeConverter[T, Any] | None = None,
30+
/,
31+
*,
32+
dispatcher: Dispatcher[T, Any] | None = None,
33+
):
34+
def decorator(wp): # type: ignore[no-untyped-def]
35+
step = PipeStep(wp, dispatcher)
36+
self.__steps.append(step)
37+
return wp
38+
39+
return decorator(wrapped) if wrapped else decorator
40+
41+
async def dispatch(self, input_value: I, /) -> O:
42+
return await self._invoke_with_middlewares(self.__execute, input_value)
43+
44+
async def __execute(self, input_value: I) -> O:
45+
dispatcher = self.__dispatcher
46+
47+
for step in self.__steps:
48+
output_value = await dispatcher.dispatch(input_value)
49+
input_value = await step.converter(output_value)
50+
dispatcher = step.dispatcher or self.__dispatcher
51+
52+
return await dispatcher.dispatch(input_value)

cq/_core/event.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import injection
44

5-
from cq._core.bus import Bus, SubscriberDecorator, TaskBus
5+
from cq._core.dispatcher.bus import Bus, SubscriberDecorator, TaskBus
66
from cq._core.dto import DTO
77

88

cq/_core/middleware.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,9 @@
66
type Middleware[**P, T] = Callable[P, MiddlewareResult[T]]
77

88

9-
@dataclass(eq=False, frozen=True, slots=True)
9+
@dataclass(repr=False, eq=False, frozen=True, slots=True)
1010
class MiddlewareGroup[**P, T]:
11-
__middlewares: list[Middleware[P, T]] = field(
12-
default_factory=list,
13-
init=False,
14-
repr=False,
15-
)
11+
__middlewares: list[Middleware[P, T]] = field(default_factory=list, init=False)
1612

1713
@property
1814
def __stack(self) -> Iterator[Middleware[P, T]]:
@@ -51,6 +47,7 @@ async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
5147
await generator.athrow(exc)
5248
else:
5349
await generator.asend(value)
50+
break
5451

5552
except StopAsyncIteration:
5653
...

0 commit comments

Comments
 (0)