Skip to content

Commit 56be212

Browse files
authored
refactoring: ♻️ Bus
1 parent e620472 commit 56be212

9 files changed

Lines changed: 46 additions & 39 deletions

File tree

cq/_core/dispatcher/bus.py

Lines changed: 22 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from abc import ABC, abstractmethod
2-
from collections.abc import Awaitable, Callable
2+
from collections.abc import Awaitable, Callable, Iterator
33
from typing import Any, Protocol, Self, runtime_checkable
44

55
import anyio
@@ -30,69 +30,64 @@ def add_listeners(self, *listeners: Listener[I]) -> Self:
3030

3131

3232
class BaseBus[I, O](BaseDispatcher[I, O], Bus[I, O], ABC):
33-
__slots__ = ("__listeners",)
33+
__slots__ = ("__listeners", "__manager")
3434

3535
__listeners: list[Listener[I]]
36+
__manager: HandlerManager[I, O]
3637

37-
def __init__(self) -> None:
38+
def __init__(self, manager: HandlerManager[I, O]) -> None:
3839
super().__init__()
3940
self.__listeners = []
41+
self.__manager = manager
4042

4143
def add_listeners(self, *listeners: Listener[I]) -> Self:
4244
self.__listeners.extend(listeners)
4345
return self
4446

47+
def subscribe(self, input_type: type[I], factory: HandlerFactory[[I], O]) -> Self:
48+
self.__manager.subscribe(input_type, factory)
49+
return self
50+
51+
def _handlers_from(
52+
self,
53+
input_type: type[I],
54+
) -> Iterator[Callable[[I], Awaitable[O]]]:
55+
return self.__manager.handlers_from(input_type)
56+
4557
def _trigger_listeners(self, input_value: I, /, task_group: TaskGroup) -> None:
4658
for listener in self.__listeners:
4759
task_group.start_soon(listener, input_value)
4860

4961

5062
class SimpleBus[I, O](BaseBus[I, O]):
51-
__slots__ = ("__manager",)
52-
53-
__manager: HandlerManager[I, O]
63+
__slots__ = ()
5464

5565
def __init__(self, manager: HandlerManager[I, O] | None = None) -> None:
56-
super().__init__()
57-
self.__manager = manager or SingleHandlerManager()
66+
super().__init__(manager or SingleHandlerManager())
5867

5968
async def dispatch(self, input_value: I, /) -> O:
6069
async with anyio.create_task_group() as task_group:
6170
self._trigger_listeners(input_value, task_group)
6271

63-
for handler in self.__manager.handlers_from(type(input_value)):
72+
for handler in self._handlers_from(type(input_value)):
6473
return await self._invoke_with_middlewares(handler, input_value)
6574

6675
return NotImplemented
6776

68-
def subscribe(self, input_type: type[I], factory: HandlerFactory[[I], O]) -> Self:
69-
self.__manager.subscribe(input_type, factory)
70-
return self
71-
7277

7378
class TaskBus[I](BaseBus[I, None]):
74-
__slots__ = ("__manager",)
75-
76-
__manager: HandlerManager[I, None]
79+
__slots__ = ()
7780

7881
def __init__(self, manager: HandlerManager[I, None] | None = None) -> None:
79-
super().__init__()
80-
self.__manager = manager or MultipleHandlerManager()
82+
super().__init__(manager or MultipleHandlerManager())
8183

8284
async def dispatch(self, input_value: I, /) -> None:
8385
async with anyio.create_task_group() as task_group:
8486
self._trigger_listeners(input_value, task_group)
85-
for handler in self.__manager.handlers_from(type(input_value)):
87+
88+
for handler in self._handlers_from(type(input_value)):
8689
task_group.start_soon(
8790
self._invoke_with_middlewares,
8891
handler,
8992
input_value,
9093
)
91-
92-
def subscribe(
93-
self,
94-
input_type: type[I],
95-
factory: HandlerFactory[[I], None],
96-
) -> Self:
97-
self.__manager.subscribe(input_type, factory)
98-
return self

cq/_core/handler.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ def subscribe(self, input_type: type[I], factory: HandlerFactory[[I], O]) -> Sel
3737
raise NotImplementedError
3838

3939

40-
@dataclass(eq=False, frozen=True, slots=True)
40+
@dataclass(repr=False, eq=False, frozen=True, slots=True)
4141
class MultipleHandlerManager[I, O](HandlerManager[I, O]):
4242
__factories: dict[type[I], list[HandlerFactory[[I], O]]] = field(
4343
default_factory=partial(defaultdict, list),
@@ -57,7 +57,7 @@ def subscribe(self, input_type: type[I], factory: HandlerFactory[[I], O]) -> Sel
5757
return self
5858

5959

60-
@dataclass(eq=False, frozen=True, slots=True)
60+
@dataclass(repr=False, eq=False, frozen=True, slots=True)
6161
class SingleHandlerManager[I, O](HandlerManager[I, O]):
6262
__factories: dict[type[I], HandlerFactory[[I], O]] = field(
6363
default_factory=dict,
@@ -83,7 +83,7 @@ def subscribe(self, input_type: type[I], factory: HandlerFactory[[I], O]) -> Sel
8383
return self
8484

8585

86-
@dataclass(eq=False, frozen=True, slots=True)
86+
@dataclass(repr=False, eq=False, frozen=True, slots=True)
8787
class HandlerDecorator[I, O]:
8888
manager: HandlerManager[I, O]
8989
injection_module: injection.Module = field(default_factory=injection.mod)
@@ -101,9 +101,9 @@ def decorator(wrapped: type[Handler[[I], O]]) -> type[Handler[[I], O]]:
101101

102102

103103
def _make_handle_function[I, O](
104-
handler_factory: HandlerFactory[[I], O],
104+
factory: HandlerFactory[[I], O],
105105
) -> Callable[[I], Awaitable[O]]:
106-
return partial(__handle, factory=handler_factory)
106+
return partial(__handle, factory=factory)
107107

108108

109109
async def __handle[I, O](input_value: I, factory: HandlerFactory[[I], O]) -> O:

cq/_core/message.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,9 @@ class Query(Message, ABC):
5555
mode="fallback",
5656
)
5757
def new_command_bus[T]() -> CommandBus[T]:
58-
return SimpleBus(command_handler.manager).add_middlewares(
59-
InjectionScopeMiddleware(CQScope.ON_COMMAND),
60-
)
58+
bus = SimpleBus(command_handler.manager)
59+
bus.add_middlewares(InjectionScopeMiddleware(CQScope.ON_COMMAND))
60+
return bus
6161

6262

6363
@injection.singleton(

cq/_core/related_events.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ def add(self, *events: Event) -> None:
1919
raise NotImplementedError
2020

2121

22-
@dataclass(frozen=True, slots=True)
22+
@dataclass(repr=False, eq=False, frozen=True, slots=True)
2323
class _RelatedEvents(RelatedEvents):
2424
items: list[Event] = field(default_factory=list)
2525

cq/middlewares/retry.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@
1111
class RetryMiddleware:
1212
__slots__ = ("__delay", "__exceptions", "__retry")
1313

14+
__delay: float
15+
__exceptions: tuple[type[BaseException], ...]
16+
__retry: int
17+
1418
def __init__(
1519
self,
1620
retry: int,

cq/middlewares/scope.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
class InjectionScopeMiddleware:
1414
__slots__ = ("__scope_name",)
1515

16+
__scope_name: str
17+
1618
def __init__(self, scope_name: str) -> None:
1719
self.__scope_name = scope_name
1820

documentation/fastapi-example.md

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
# [FastAPI](https://github.com/fastapi/fastapi) Example
22

3+
The advantage of `python-cq` is that it can be easily integrated into FastAPI.
4+
5+
Here's an example of its integration:
6+
37
```python
4-
from cq import Command, CommandBus, command_handler, new_command_bus
8+
from cq import Command, CommandBus, DTO, command_handler, new_command_bus
59
from fastapi import FastAPI, status
610
from injection import injectable, singleton
711
from injection.integrations.fastapi import Inject
@@ -21,7 +25,7 @@ def override_command_bus_recipe() -> CommandBus:
2125

2226
class ExampleCommand(Command): ...
2327

24-
type HandlerReturnType = ...
28+
class HandlerReturnType(DTO): ...
2529

2630
@command_handler(ExampleCommand)
2731
class ExampleHandler:

documentation/pipeline.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
# Pipeline
22

3+
Pipelines are designed to execute several commands one after the other, while encapsulating them in middleware.
4+
35
Example:
46

57
```python

documentation/writing-application-layer.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ class SomeMiddleware:
189189

190190
### Add middleware
191191

192-
To add a middleware: you need to override the bus recipe.
192+
To add a middleware, you need to override the bus recipe.
193193

194194
```python
195195
from cq import CommandBus, new_command_bus

0 commit comments

Comments
 (0)