Skip to content

Commit bc88f21

Browse files
authored
feat: ✨ Add bus listeners
1 parent 87447de commit bc88f21

6 files changed

Lines changed: 86 additions & 44 deletions

File tree

cq/__init__.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@
1212
QueryBus,
1313
command_handler,
1414
event_handler,
15-
find_command_bus,
16-
find_event_bus,
17-
find_query_bus,
15+
get_command_bus,
16+
get_event_bus,
17+
get_query_bus,
1818
query_handler,
1919
)
2020
from ._core.middleware import Middleware, MiddlewareResult
@@ -35,8 +35,8 @@
3535
"QueryBus",
3636
"command_handler",
3737
"event_handler",
38-
"find_command_bus",
39-
"find_event_bus",
40-
"find_query_bus",
38+
"get_command_bus",
39+
"get_event_bus",
40+
"get_query_bus",
4141
"query_handler",
4242
)

cq/_core/dispatcher/bus.py

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import asyncio
2-
from abc import abstractmethod
2+
from abc import ABC, abstractmethod
33
from collections import defaultdict
4-
from collections.abc import Callable
4+
from collections.abc import Awaitable, Callable
55
from dataclasses import dataclass, field
66
from inspect import isclass
77
from types import GenericAlias
8-
from typing import Protocol, Self, TypeAliasType, runtime_checkable
8+
from typing import Any, Protocol, Self, TypeAliasType, runtime_checkable
99

1010
import injection
1111

@@ -14,6 +14,8 @@
1414
type HandlerType[**P, T] = type[Handler[P, T]]
1515
type HandlerFactory[**P, T] = Callable[..., Handler[P, T]]
1616

17+
type Listener[T] = Callable[[T], Awaitable[Any]]
18+
1719
type BusType[I, O] = type[Bus[I, O]]
1820

1921

@@ -34,6 +36,10 @@ class Bus[I, O](Dispatcher[I, O], Protocol):
3436
def subscribe(self, input_type: type[I], factory: HandlerFactory[[I], O]) -> Self:
3537
raise NotImplementedError
3638

39+
@abstractmethod
40+
def add_listeners(self, *listeners: Listener[I]) -> Self:
41+
raise NotImplementedError
42+
3743

3844
@dataclass(eq=False, frozen=True, slots=True)
3945
class SubscriberDecorator[I, O]:
@@ -59,7 +65,24 @@ def __find_bus(self) -> Bus[I, O]:
5965
return self.injection_module.find_instance(self.bus_type)
6066

6167

62-
class SimpleBus[I, O](BaseDispatcher[I, O], Bus[I, O]):
68+
class BaseBus[I, O](BaseDispatcher[I, O], Bus[I, O], ABC):
69+
__slots__ = ("__listeners",)
70+
71+
__listeners: list[Listener[I]]
72+
73+
def __init__(self) -> None:
74+
super().__init__()
75+
self.__listeners = []
76+
77+
def add_listeners(self, *listeners: Listener[I]) -> Self:
78+
self.__listeners.extend(listeners)
79+
return self
80+
81+
async def _trigger_listeners(self, input_value: I, /) -> None:
82+
await asyncio.gather(*(listener(input_value) for listener in self.__listeners))
83+
84+
85+
class SimpleBus[I, O](BaseBus[I, O]):
6386
__slots__ = ("__handlers",)
6487

6588
__handlers: dict[type[I], HandlerFactory[[I], O]]
@@ -69,6 +92,7 @@ def __init__(self) -> None:
6992
self.__handlers = {}
7093

7194
async def dispatch(self, input_value: I, /) -> O:
95+
await self._trigger_listeners(input_value)
7296
input_type = type(input_value)
7397

7498
try:
@@ -91,7 +115,7 @@ def subscribe(self, input_type: type[I], factory: HandlerFactory[[I], O]) -> Sel
91115
return self
92116

93117

94-
class TaskBus[I](BaseDispatcher[I, None], Bus[I, None]):
118+
class TaskBus[I](BaseBus[I, None]):
95119
__slots__ = ("__handlers",)
96120

97121
__handlers: dict[type[I], list[HandlerFactory[[I], None]]]
@@ -101,6 +125,7 @@ def __init__(self) -> None:
101125
self.__handlers = defaultdict(list)
102126

103127
async def dispatch(self, input_value: I, /) -> None:
128+
await self._trigger_listeners(input_value)
104129
handler_factories = self.__handlers.get(type(input_value))
105130

106131
if not handler_factories:

cq/_core/message.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,16 @@ class Query(Message, ABC):
3939
injection.set_constant(SimpleBus(), QueryBus, alias=True)
4040

4141

42-
def find_command_bus[T]() -> CommandBus[T]:
43-
return injection.find_instance(CommandBus)
42+
@injection.inject
43+
def get_command_bus[T](bus: CommandBus[T] = NotImplemented, /) -> CommandBus[T]:
44+
return bus
4445

4546

46-
def find_event_bus() -> EventBus:
47-
return injection.find_instance(EventBus)
47+
@injection.inject
48+
def get_event_bus(bus: EventBus = NotImplemented, /) -> EventBus:
49+
return bus
4850

4951

50-
def find_query_bus[T]() -> QueryBus[T]:
51-
return injection.find_instance(QueryBus)
52+
@injection.inject
53+
def get_query_bus[T](bus: QueryBus[T] = NotImplemented, /) -> QueryBus[T]:
54+
return bus

documentation/fastapi-example.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from collections.abc import AsyncIterator
55
from contextlib import asynccontextmanager
66

7-
from cq import Command, CommandBus, command_handler, find_command_bus
7+
from cq import Command, CommandBus, command_handler, get_command_bus
88
from fastapi import FastAPI, status
99
from injection import injectable
1010
from injection.integrations.fastapi import Inject
@@ -31,7 +31,7 @@ class ExampleHandler:
3131

3232
@asynccontextmanager
3333
async def lifespan(_: FastAPI) -> AsyncIterator[None]:
34-
find_command_bus().add_middlewares(...) # Add middlewares here
34+
get_command_bus().add_middlewares(...) # Add middlewares here
3535
yield
3636

3737
app = FastAPI(lifespan=lifespan)

poetry.lock

Lines changed: 25 additions & 25 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

tests/core/dispatcher/test_bus.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,20 @@ async def test_dispatch_with_unknown_input_type_return_not_implemented(
4444
) -> None:
4545
assert await bus.dispatch("hello") is NotImplemented
4646

47+
async def test_dispatch_with_listeners_return_none(
48+
self,
49+
bus: SimpleBus[object, Any],
50+
) -> None:
51+
called = False
52+
53+
async def listener(_: Any) -> None:
54+
nonlocal called
55+
called = True
56+
57+
bus.add_listeners(listener)
58+
assert await bus.dispatch("hello") is NotImplemented
59+
assert called
60+
4761
async def test_dispatch_no_wait_with_success_return_none(
4862
self,
4963
bus: SimpleBus[Any, Any],

0 commit comments

Comments
 (0)