Skip to content

Commit 3ccb290

Browse files
author
remimd
committed
refactor: Extract DI logic into DIAdapter protocol
1 parent 7120a5e commit 3ccb290

File tree

21 files changed

+712
-805
lines changed

21 files changed

+712
-805
lines changed

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
before_commit: lint mypy pytest
22

33
install:
4-
uv sync
4+
uv sync --all-extras
55

66
update:
77
uv lock --upgrade
8-
uv sync
8+
uv sync --all-extras
99

1010
lint:
1111
uv run ruff format

conftest.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,16 @@
44
import pytest
55
from injection.testing import load_test_profile, set_test_constant
66

7-
from cq import Bus, CommandBus, EventBus, QueryBus
7+
from cq import (
8+
Bus,
9+
CommandBus,
10+
EventBus,
11+
QueryBus,
12+
new_command_bus,
13+
new_event_bus,
14+
new_query_bus,
15+
)
816
from cq._core.dispatcher.bus import SimpleBus
9-
from cq._core.message import new_command_bus, new_event_bus, new_query_bus
1017
from tests.helpers.history import HistoryMiddleware
1118

1219

cq/__init__.py

Lines changed: 38 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1-
from ._core.dispatcher.base import DeferredDispatcher, Dispatcher
1+
from ._core.cq import CQ
2+
from ._core.di import DIAdapter
3+
from ._core.di import NoDI as _NoDI
4+
from ._core.dispatcher.base import Dispatcher
25
from ._core.dispatcher.bus import Bus
3-
from ._core.dispatcher.lazy import LazyDispatcher
46
from ._core.dispatcher.pipe import ContextPipeline, Pipe
57
from ._core.message import (
68
AnyCommandBus,
@@ -10,31 +12,24 @@
1012
EventBus,
1113
Query,
1214
QueryBus,
13-
command_handler,
14-
event_handler,
15-
new_command_bus,
16-
new_event_bus,
17-
new_query_bus,
18-
query_handler,
1915
)
2016
from ._core.middleware import Middleware, MiddlewareResult, resolve_handler_source
21-
from ._core.pipetools import ContextCommandPipeline
22-
from ._core.related_events import RelatedEvents
23-
from ._core.scope import CQScope
17+
from ._core.pipetools import ContextCommandPipeline as _ContextCommandPipeline
18+
from ._core.related_events import AnyIORelatedEvents, RelatedEvents
2419

2520
__all__ = (
2621
"AnyCommandBus",
22+
"AnyIORelatedEvents",
2723
"Bus",
28-
"CQScope",
24+
"CQ",
2925
"Command",
3026
"CommandBus",
3127
"ContextCommandPipeline",
3228
"ContextPipeline",
33-
"DeferredDispatcher",
29+
"DIAdapter",
3430
"Dispatcher",
3531
"Event",
3632
"EventBus",
37-
"LazyDispatcher",
3833
"Middleware",
3934
"MiddlewareResult",
4035
"Pipe",
@@ -49,3 +44,32 @@
4944
"query_handler",
5045
"resolve_handler_source",
5146
)
47+
48+
try:
49+
from cq.ext.injection import InjectionAdapter as _InjectionAdapter
50+
51+
except ImportError: # pragma: no cover
52+
_default = CQ(_NoDI())
53+
54+
else:
55+
_default = CQ(_InjectionAdapter())
56+
57+
_default.register_defaults()
58+
59+
command_handler = _default.command_handler
60+
event_handler = _default.event_handler
61+
query_handler = _default.query_handler
62+
63+
new_command_bus = _default.new_command_bus
64+
new_event_bus = _default.new_event_bus
65+
new_query_bus = _default.new_query_bus
66+
67+
68+
class ContextCommandPipeline[C: Command](_ContextCommandPipeline[C]):
69+
__slots__ = ()
70+
71+
def __init__(self, di: DIAdapter = _default.di) -> None:
72+
super().__init__(di)
73+
74+
75+
del _default

cq/_core/cq.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
from typing import Any, Self
2+
3+
from cq._core.di import DIAdapter
4+
from cq._core.dispatcher.bus import Bus, SimpleBus, TaskBus
5+
from cq._core.handler import (
6+
HandlerDecorator,
7+
HandlerRegistry,
8+
MultipleHandlerRegistry,
9+
SingleHandlerRegistry,
10+
)
11+
from cq._core.message import Command, Event, Query
12+
from cq._core.middlewares.scope import CommandDispatchScopeMiddleware
13+
14+
15+
class CQ:
16+
__slots__ = ("__command_registry", "__di", "__event_registry", "__query_registry")
17+
18+
__command_registry: HandlerRegistry[Command, Any]
19+
__di: DIAdapter
20+
__event_registry: HandlerRegistry[Event, Any]
21+
__query_registry: HandlerRegistry[Query, Any]
22+
23+
def __init__(self, di: DIAdapter, /) -> None:
24+
self.__di = di
25+
self.__command_registry = SingleHandlerRegistry()
26+
self.__event_registry = MultipleHandlerRegistry()
27+
self.__query_registry = SingleHandlerRegistry()
28+
29+
@property
30+
def di(self) -> DIAdapter:
31+
return self.__di
32+
33+
@property
34+
def command_handler(self) -> HandlerDecorator[Command, Any]:
35+
return HandlerDecorator(self.__command_registry, self.__di)
36+
37+
@property
38+
def event_handler(self) -> HandlerDecorator[Event, Any]:
39+
return HandlerDecorator(self.__event_registry, self.__di)
40+
41+
@property
42+
def query_handler(self) -> HandlerDecorator[Query, Any]:
43+
return HandlerDecorator(self.__query_registry, self.__di)
44+
45+
def new_command_bus(self) -> Bus[Command, Any]:
46+
bus = SimpleBus(self.__command_registry)
47+
command_middleware = CommandDispatchScopeMiddleware(self.__di)
48+
bus.add_middlewares(command_middleware)
49+
return bus
50+
51+
def new_event_bus(self) -> Bus[Event, None]:
52+
return TaskBus(self.__event_registry)
53+
54+
def new_query_bus(self) -> Bus[Query, Any]:
55+
return SimpleBus(self.__query_registry)
56+
57+
def register_defaults(self) -> Self:
58+
self.__di.register_defaults(
59+
self.new_command_bus,
60+
self.new_event_bus,
61+
self.new_query_bus,
62+
)
63+
return self

cq/_core/di.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
from __future__ import annotations
2+
3+
from abc import abstractmethod
4+
from collections.abc import Awaitable, Callable
5+
from contextlib import nullcontext
6+
from typing import TYPE_CHECKING, Any, AsyncContextManager, Protocol, runtime_checkable
7+
8+
if TYPE_CHECKING:
9+
from cq._core.message import CommandBus, EventBus, QueryBus
10+
11+
12+
@runtime_checkable
13+
class DIAdapter(Protocol):
14+
__slots__ = ()
15+
16+
@abstractmethod
17+
def command_scope(self) -> AsyncContextManager[None]:
18+
raise NotImplementedError
19+
20+
@abstractmethod
21+
def lazy[T](self, tp: type[T]) -> Callable[[], Awaitable[T]]:
22+
raise NotImplementedError
23+
24+
def register_defaults(
25+
self,
26+
command_bus: Callable[..., CommandBus[Any]],
27+
event_bus: Callable[..., EventBus],
28+
query_bus: Callable[..., QueryBus[Any]],
29+
) -> None:
30+
return
31+
32+
@abstractmethod
33+
def wire[T](self, tp: type[T]) -> Callable[..., Awaitable[T]]:
34+
raise NotImplementedError
35+
36+
37+
class NoDI(DIAdapter):
38+
__slots__ = ()
39+
40+
def command_scope(self) -> AsyncContextManager[None]:
41+
return nullcontext()
42+
43+
def lazy[T](self, tp: type[T], /) -> Callable[[], Awaitable[T]]:
44+
tp_str = getattr(tp, "__name__", str(tp))
45+
raise RuntimeError(
46+
f"Can't lazily resolve {tp_str}: no DI container configured."
47+
)
48+
49+
def wire[T](self, tp: type[T], /) -> Callable[..., Awaitable[T]]:
50+
async def factory() -> T:
51+
return tp()
52+
53+
return factory

cq/_core/dispatcher/base.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,6 @@ async def dispatch(self, input_value: I, /) -> O:
1515
raise NotImplementedError
1616

1717

18-
@runtime_checkable
19-
class DeferredDispatcher[I](Protocol):
20-
__slots__ = ()
21-
22-
@abstractmethod
23-
async def defer(self, input_value: I, /) -> None:
24-
raise NotImplementedError
25-
26-
2718
class BaseDispatcher[I, O](Dispatcher[I, O], ABC):
2819
__slots__ = ("__middleware_group",)
2920

cq/_core/dispatcher/lazy.py

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,24 @@
1-
from collections.abc import Awaitable
1+
from collections.abc import Awaitable, Callable
22
from types import GenericAlias
33
from typing import TypeAliasType
44

5-
import injection
6-
5+
from cq._core.di import DIAdapter
76
from cq._core.dispatcher.base import Dispatcher
87

98

109
class LazyDispatcher[I, O](Dispatcher[I, O]):
11-
__slots__ = ("__value",)
10+
__slots__ = ("__resolve",)
1211

13-
__value: Awaitable[Dispatcher[I, O]]
12+
__resolve: Callable[[], Awaitable[Dispatcher[I, O]]]
1413

1514
def __init__(
1615
self,
1716
dispatcher_type: type[Dispatcher[I, O]] | TypeAliasType | GenericAlias,
1817
/,
19-
*,
20-
injection_module: injection.Module | None = None,
21-
threadsafe: bool | None = None,
18+
di: DIAdapter,
2219
) -> None:
23-
module = injection_module or injection.mod()
24-
self.__value = module.aget_lazy_instance(dispatcher_type, threadsafe=threadsafe)
20+
self.__resolve = di.lazy(dispatcher_type) # type: ignore[arg-type]
2521

2622
async def dispatch(self, input_value: I, /) -> O:
27-
dispatcher = await self.__value
23+
dispatcher = await self.__resolve()
2824
return await dispatcher.dispatch(input_value)

cq/_core/handler.py

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@
77
from inspect import signature as inspect_signature
88
from typing import TYPE_CHECKING, Any, Protocol, Self, overload, runtime_checkable
99

10-
import injection
1110
from type_analyzer import MatchingTypesConfig, iter_matching_types, matching_types
1211

1312
from cq._core.common.typing import Decorator
13+
from cq._core.di import DIAdapter, NoDI
1414

1515
type HandlerType[**P, T] = type[Handler[P, T]]
1616
type HandlerFactory[**P, T] = Callable[..., Awaitable[Handler[P, T]]]
@@ -126,7 +126,7 @@ def subscribe(
126126
@dataclass(repr=False, eq=False, frozen=True, slots=True)
127127
class HandlerDecorator[I, O]:
128128
registry: HandlerRegistry[I, O]
129-
injection_module: injection.Module = field(default_factory=injection.mod)
129+
di: DIAdapter = field(default_factory=NoDI)
130130

131131
if TYPE_CHECKING: # pragma: no cover
132132

@@ -137,7 +137,6 @@ def __call__(
137137
/,
138138
*,
139139
fail_silently: bool = ...,
140-
threadsafe: bool | None = ...,
141140
) -> Decorator: ...
142141

143142
@overload
@@ -147,7 +146,6 @@ def __call__[T](
147146
/,
148147
*,
149148
fail_silently: bool = ...,
150-
threadsafe: bool | None = ...,
151149
) -> T: ...
152150

153151
@overload
@@ -157,7 +155,6 @@ def __call__(
157155
/,
158156
*,
159157
fail_silently: bool = ...,
160-
threadsafe: bool | None = ...,
161158
) -> Decorator: ...
162159

163160
def __call__[T](
@@ -166,7 +163,6 @@ def __call__[T](
166163
/,
167164
*,
168165
fail_silently: bool = False,
169-
threadsafe: bool | None = None,
170166
) -> Any:
171167
if (
172168
input_or_handler_type is not None
@@ -176,14 +172,12 @@ def __call__[T](
176172
return self.__decorator(
177173
input_or_handler_type,
178174
fail_silently=fail_silently,
179-
threadsafe=threadsafe,
180175
)
181176

182177
return partial(
183178
self.__decorator,
184179
input_type=input_or_handler_type, # type: ignore[arg-type]
185180
fail_silently=fail_silently,
186-
threadsafe=threadsafe,
187181
)
188182

189183
def __decorator(
@@ -193,9 +187,8 @@ def __decorator(
193187
*,
194188
input_type: type[I] | None = None,
195189
fail_silently: bool = False,
196-
threadsafe: bool | None = None,
197190
) -> HandlerType[[I], O]:
198-
factory = self.injection_module.make_async_factory(wrapped, threadsafe)
191+
factory = self.di.wire(wrapped)
199192
input_type = input_type or _resolve_input_type(wrapped)
200193
self.registry.subscribe(input_type, factory, wrapped, fail_silently)
201194
return wrapped

0 commit comments

Comments
 (0)