Skip to content

Commit 891f363

Browse files
authored
refactor: Extract DI logic into DIAdapter protocol
1 parent 7120a5e commit 891f363

File tree

30 files changed

+870
-813
lines changed

30 files changed

+870
-813
lines changed

.github/actions/environment/action.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,4 @@ runs:
1616

1717
- name: Install Dependencies
1818
shell: bash
19-
run: uv sync
19+
run: uv sync --all-extras

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
before_commit: lint mypy pytest
22

33
install:
4-
uv sync
4+
uv sync --all-extras
55

66
update:
77
uv lock --upgrade
8-
uv sync
8+
uv sync --all-extras
99

1010
lint:
1111
uv run ruff format

README.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,18 @@
77

88
Documentation: https://python-cq.remimd.dev
99

10-
Python package designed to organize your code following CQRS principles. It builds on top of [python-injection](https://github.com/100nm/python-injection) for dependency injection.
10+
**python-cq** is a Python package designed to organize your code following CQRS principles. It provides a `DIAdapter` protocol for dependency injection, with [python-injection](https://github.com/100nm/python-injection) as the default implementation available via the `[injection]` extra.
1111

1212
## Installation
1313

1414
⚠️ _Requires Python 3.12 or higher_
15+
16+
Without dependency injection:
1517
```bash
1618
pip install python-cq
1719
```
20+
21+
With [python-injection](https://github.com/100nm/python-injection) as the DI backend (recommended):
22+
```bash
23+
pip install "python-cq[injection]"
24+
```

conftest.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,16 @@
44
import pytest
55
from injection.testing import load_test_profile, set_test_constant
66

7-
from cq import Bus, CommandBus, EventBus, QueryBus
7+
from cq import (
8+
Bus,
9+
CommandBus,
10+
EventBus,
11+
QueryBus,
12+
new_command_bus,
13+
new_event_bus,
14+
new_query_bus,
15+
)
816
from cq._core.dispatcher.bus import SimpleBus
9-
from cq._core.message import new_command_bus, new_event_bus, new_query_bus
1017
from tests.helpers.history import HistoryMiddleware
1118

1219

cq/__init__.py

Lines changed: 38 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1-
from ._core.dispatcher.base import DeferredDispatcher, Dispatcher
1+
from ._core.cq import CQ
2+
from ._core.di import DIAdapter
3+
from ._core.di import NoDI as _NoDI
4+
from ._core.dispatcher.base import Dispatcher
25
from ._core.dispatcher.bus import Bus
3-
from ._core.dispatcher.lazy import LazyDispatcher
46
from ._core.dispatcher.pipe import ContextPipeline, Pipe
57
from ._core.message import (
68
AnyCommandBus,
@@ -10,31 +12,24 @@
1012
EventBus,
1113
Query,
1214
QueryBus,
13-
command_handler,
14-
event_handler,
15-
new_command_bus,
16-
new_event_bus,
17-
new_query_bus,
18-
query_handler,
1915
)
2016
from ._core.middleware import Middleware, MiddlewareResult, resolve_handler_source
21-
from ._core.pipetools import ContextCommandPipeline
22-
from ._core.related_events import RelatedEvents
23-
from ._core.scope import CQScope
17+
from ._core.pipetools import ContextCommandPipeline as _ContextCommandPipeline
18+
from ._core.related_events import AnyIORelatedEvents, RelatedEvents
2419

2520
__all__ = (
2621
"AnyCommandBus",
22+
"AnyIORelatedEvents",
2723
"Bus",
28-
"CQScope",
24+
"CQ",
2925
"Command",
3026
"CommandBus",
3127
"ContextCommandPipeline",
3228
"ContextPipeline",
33-
"DeferredDispatcher",
29+
"DIAdapter",
3430
"Dispatcher",
3531
"Event",
3632
"EventBus",
37-
"LazyDispatcher",
3833
"Middleware",
3934
"MiddlewareResult",
4035
"Pipe",
@@ -49,3 +44,32 @@
4944
"query_handler",
5045
"resolve_handler_source",
5146
)
47+
48+
try:
49+
from cq.ext.injection import InjectionAdapter as _InjectionAdapter
50+
51+
except ImportError: # pragma: no cover
52+
_default = CQ(_NoDI())
53+
54+
else:
55+
_default = CQ(_InjectionAdapter())
56+
57+
_default.register_defaults()
58+
59+
command_handler = _default.command_handler
60+
event_handler = _default.event_handler
61+
query_handler = _default.query_handler
62+
63+
new_command_bus = _default.new_command_bus
64+
new_event_bus = _default.new_event_bus
65+
new_query_bus = _default.new_query_bus
66+
67+
68+
class ContextCommandPipeline[C: Command](_ContextCommandPipeline[C]):
69+
__slots__ = ()
70+
71+
def __init__(self, di: DIAdapter = _default.di) -> None:
72+
super().__init__(di)
73+
74+
75+
del _default

cq/_core/cq.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
from typing import Any, Self
2+
3+
from cq._core.di import DIAdapter
4+
from cq._core.dispatcher.bus import Bus, SimpleBus, TaskBus
5+
from cq._core.handler import (
6+
HandlerDecorator,
7+
HandlerRegistry,
8+
MultipleHandlerRegistry,
9+
SingleHandlerRegistry,
10+
)
11+
from cq._core.message import Command, Event, Query
12+
from cq._core.middlewares.scope import CommandDispatchScopeMiddleware
13+
14+
15+
class CQ:
16+
__slots__ = ("__command_registry", "__di", "__event_registry", "__query_registry")
17+
18+
__command_registry: HandlerRegistry[Command, Any]
19+
__di: DIAdapter
20+
__event_registry: HandlerRegistry[Event, Any]
21+
__query_registry: HandlerRegistry[Query, Any]
22+
23+
def __init__(self, di: DIAdapter, /) -> None:
24+
self.__di = di
25+
self.__command_registry = SingleHandlerRegistry()
26+
self.__event_registry = MultipleHandlerRegistry()
27+
self.__query_registry = SingleHandlerRegistry()
28+
29+
@property
30+
def di(self) -> DIAdapter:
31+
return self.__di
32+
33+
@property
34+
def command_handler(self) -> HandlerDecorator[Command, Any]:
35+
return HandlerDecorator(self.__command_registry, self.__di)
36+
37+
@property
38+
def event_handler(self) -> HandlerDecorator[Event, Any]:
39+
return HandlerDecorator(self.__event_registry, self.__di)
40+
41+
@property
42+
def query_handler(self) -> HandlerDecorator[Query, Any]:
43+
return HandlerDecorator(self.__query_registry, self.__di)
44+
45+
def new_command_bus(self) -> Bus[Command, Any]:
46+
bus = SimpleBus(self.__command_registry)
47+
command_middleware = CommandDispatchScopeMiddleware(self.__di)
48+
bus.add_middlewares(command_middleware)
49+
return bus
50+
51+
def new_event_bus(self) -> Bus[Event, None]:
52+
return TaskBus(self.__event_registry)
53+
54+
def new_query_bus(self) -> Bus[Query, Any]:
55+
return SimpleBus(self.__query_registry)
56+
57+
def register_defaults(self) -> Self:
58+
self.__di.register_defaults(
59+
self.new_command_bus,
60+
self.new_event_bus,
61+
self.new_query_bus,
62+
)
63+
return self

cq/_core/di.py

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
from __future__ import annotations
2+
3+
from abc import abstractmethod
4+
from collections.abc import Awaitable, Callable
5+
from contextlib import nullcontext
6+
from typing import TYPE_CHECKING, Any, AsyncContextManager, Protocol, runtime_checkable
7+
8+
if TYPE_CHECKING:
9+
from cq import CommandBus, EventBus, QueryBus
10+
11+
12+
@runtime_checkable
13+
class DIAdapter(Protocol):
14+
"""
15+
Protocol for integrating a dependency injection container with python-cq.
16+
17+
Implement this protocol to connect your DI framework to the CQ buses.
18+
A concrete implementation (``InjectionAdapter``) is provided via the
19+
``python-cq[injection]`` extra for projects that use *python-injection*.
20+
"""
21+
22+
__slots__ = ()
23+
24+
@abstractmethod
25+
def command_scope(self) -> AsyncContextManager[None]:
26+
"""
27+
Return an async context manager that delimits the lifetime of a
28+
command dispatch.
29+
30+
**Responsibilities**
31+
32+
The scope must at minimum manage the lifecycle of a ``RelatedEvents``
33+
instance and register it so that it is resolvable via injection for
34+
the duration of the scope.
35+
36+
**Nested calls**
37+
38+
``command_scope`` is entered in two distinct situations:
39+
40+
1. Around a standard command dispatch (via
41+
``CommandDispatchScopeMiddleware``).
42+
2. Around each step of a ``ContextCommandPipeline``, which itself
43+
wraps a command dispatch.
44+
45+
This means two nested calls can occur for a single logical command.
46+
Implementations must detect re-entrant activation (e.g. a scope
47+
already active on the current task) and silently ignore the inner
48+
call instead of opening a second, conflicting scope.
49+
"""
50+
51+
raise NotImplementedError
52+
53+
@abstractmethod
54+
def lazy[T](self, tp: type[T]) -> Callable[[], Awaitable[T]]:
55+
"""
56+
Return a callable that resolves an instance of ``tp`` in two steps.
57+
58+
1. ``lazy(tp)`` obtains a resolver from the DI framework for ``tp``.
59+
2. Calling and awaiting the returned callable performs the actual
60+
resolution and returns the instance.
61+
"""
62+
63+
raise NotImplementedError
64+
65+
def register_defaults(
66+
self,
67+
command_bus: Callable[..., CommandBus[Any]],
68+
event_bus: Callable[..., EventBus],
69+
query_bus: Callable[..., QueryBus[Any]],
70+
) -> None:
71+
"""
72+
Register the CQ buses as default providers in the DI container.
73+
74+
Called once during setup so that handlers and middlewares can
75+
declare ``CommandBus``, ``EventBus``, or ``QueryBus`` as
76+
constructor dependencies and receive the configured instances.
77+
78+
The default implementation is a no-op for adapters that do not
79+
need automatic bus registration.
80+
"""
81+
82+
return
83+
84+
@abstractmethod
85+
def wire[T](self, tp: type[T]) -> Callable[..., Awaitable[T]]:
86+
"""
87+
Return an async factory that instantiates ``tp`` with injected
88+
dependencies.
89+
90+
Used internally to build handler instances whose dependencies are
91+
resolved by the container.
92+
"""
93+
94+
raise NotImplementedError
95+
96+
97+
class NoDI(DIAdapter):
98+
__slots__ = ()
99+
100+
def command_scope(self) -> AsyncContextManager[None]:
101+
return nullcontext()
102+
103+
def lazy[T](self, tp: type[T], /) -> Callable[[], Awaitable[T]]:
104+
tp_str = getattr(tp, "__name__", str(tp))
105+
raise RuntimeError(
106+
f"Can't lazily resolve {tp_str}: no DI container configured."
107+
)
108+
109+
def wire[T](self, tp: type[T], /) -> Callable[..., Awaitable[T]]:
110+
async def factory() -> T:
111+
return tp()
112+
113+
return factory

cq/_core/dispatcher/base.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,6 @@ async def dispatch(self, input_value: I, /) -> O:
1515
raise NotImplementedError
1616

1717

18-
@runtime_checkable
19-
class DeferredDispatcher[I](Protocol):
20-
__slots__ = ()
21-
22-
@abstractmethod
23-
async def defer(self, input_value: I, /) -> None:
24-
raise NotImplementedError
25-
26-
2718
class BaseDispatcher[I, O](Dispatcher[I, O], ABC):
2819
__slots__ = ("__middleware_group",)
2920

cq/_core/dispatcher/lazy.py

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,24 @@
1-
from collections.abc import Awaitable
1+
from collections.abc import Awaitable, Callable
22
from types import GenericAlias
33
from typing import TypeAliasType
44

5-
import injection
6-
5+
from cq._core.di import DIAdapter
76
from cq._core.dispatcher.base import Dispatcher
87

98

109
class LazyDispatcher[I, O](Dispatcher[I, O]):
11-
__slots__ = ("__value",)
10+
__slots__ = ("__resolve",)
1211

13-
__value: Awaitable[Dispatcher[I, O]]
12+
__resolve: Callable[[], Awaitable[Dispatcher[I, O]]]
1413

1514
def __init__(
1615
self,
1716
dispatcher_type: type[Dispatcher[I, O]] | TypeAliasType | GenericAlias,
1817
/,
19-
*,
20-
injection_module: injection.Module | None = None,
21-
threadsafe: bool | None = None,
18+
di: DIAdapter,
2219
) -> None:
23-
module = injection_module or injection.mod()
24-
self.__value = module.aget_lazy_instance(dispatcher_type, threadsafe=threadsafe)
20+
self.__resolve = di.lazy(dispatcher_type) # type: ignore[arg-type]
2521

2622
async def dispatch(self, input_value: I, /) -> O:
27-
dispatcher = await self.__value
23+
dispatcher = await self.__resolve()
2824
return await dispatcher.dispatch(input_value)

0 commit comments

Comments
 (0)