Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/actions/environment/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ runs:

- name: Install Dependencies
shell: bash
run: uv sync
run: uv sync --all-extras
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
before_commit: lint mypy pytest

install:
uv sync
uv sync --all-extras

update:
uv lock --upgrade
uv sync
uv sync --all-extras

lint:
uv run ruff format
Expand Down
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,18 @@

Documentation: https://python-cq.remimd.dev

Python package designed to organize your code following CQRS principles. It builds on top of [python-injection](https://github.com/100nm/python-injection) for dependency injection.
**python-cq** is a Python package designed to organize your code following CQRS principles. It provides a `DIAdapter` protocol for dependency injection, with [python-injection](https://github.com/100nm/python-injection) as the default implementation available via the `[injection]` extra.

## Installation

⚠️ _Requires Python 3.12 or higher_

Without dependency injection:
```bash
pip install python-cq
```

With [python-injection](https://github.com/100nm/python-injection) as the DI backend (recommended):
```bash
pip install "python-cq[injection]"
```
11 changes: 9 additions & 2 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,16 @@
import pytest
from injection.testing import load_test_profile, set_test_constant

from cq import Bus, CommandBus, EventBus, QueryBus
from cq import (
Bus,
CommandBus,
EventBus,
QueryBus,
new_command_bus,
new_event_bus,
new_query_bus,
)
from cq._core.dispatcher.bus import SimpleBus
from cq._core.message import new_command_bus, new_event_bus, new_query_bus
from tests.helpers.history import HistoryMiddleware


Expand Down
52 changes: 38 additions & 14 deletions cq/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from ._core.dispatcher.base import DeferredDispatcher, Dispatcher
from ._core.cq import CQ
from ._core.di import DIAdapter
from ._core.di import NoDI as _NoDI
from ._core.dispatcher.base import Dispatcher
from ._core.dispatcher.bus import Bus
from ._core.dispatcher.lazy import LazyDispatcher
from ._core.dispatcher.pipe import ContextPipeline, Pipe
from ._core.message import (
AnyCommandBus,
Expand All @@ -10,31 +12,24 @@
EventBus,
Query,
QueryBus,
command_handler,
event_handler,
new_command_bus,
new_event_bus,
new_query_bus,
query_handler,
)
from ._core.middleware import Middleware, MiddlewareResult, resolve_handler_source
from ._core.pipetools import ContextCommandPipeline
from ._core.related_events import RelatedEvents
from ._core.scope import CQScope
from ._core.pipetools import ContextCommandPipeline as _ContextCommandPipeline
from ._core.related_events import AnyIORelatedEvents, RelatedEvents

__all__ = (
"AnyCommandBus",
"AnyIORelatedEvents",
"Bus",
"CQScope",
"CQ",
"Command",
"CommandBus",
"ContextCommandPipeline",
"ContextPipeline",
"DeferredDispatcher",
"DIAdapter",
"Dispatcher",
"Event",
"EventBus",
"LazyDispatcher",
"Middleware",
"MiddlewareResult",
"Pipe",
Expand All @@ -49,3 +44,32 @@
"query_handler",
"resolve_handler_source",
)

try:
from cq.ext.injection import InjectionAdapter as _InjectionAdapter

except ImportError: # pragma: no cover
_default = CQ(_NoDI())

else:
_default = CQ(_InjectionAdapter())

_default.register_defaults()

command_handler = _default.command_handler
event_handler = _default.event_handler
query_handler = _default.query_handler

new_command_bus = _default.new_command_bus
new_event_bus = _default.new_event_bus
new_query_bus = _default.new_query_bus


class ContextCommandPipeline[C: Command](_ContextCommandPipeline[C]):
__slots__ = ()

def __init__(self, di: DIAdapter = _default.di) -> None:
super().__init__(di)


del _default
63 changes: 63 additions & 0 deletions cq/_core/cq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from typing import Any, Self

from cq._core.di import DIAdapter
from cq._core.dispatcher.bus import Bus, SimpleBus, TaskBus
from cq._core.handler import (
HandlerDecorator,
HandlerRegistry,
MultipleHandlerRegistry,
SingleHandlerRegistry,
)
from cq._core.message import Command, Event, Query
from cq._core.middlewares.scope import CommandDispatchScopeMiddleware


class CQ:
__slots__ = ("__command_registry", "__di", "__event_registry", "__query_registry")

__command_registry: HandlerRegistry[Command, Any]
__di: DIAdapter
__event_registry: HandlerRegistry[Event, Any]
__query_registry: HandlerRegistry[Query, Any]

def __init__(self, di: DIAdapter, /) -> None:
self.__di = di
self.__command_registry = SingleHandlerRegistry()
self.__event_registry = MultipleHandlerRegistry()
self.__query_registry = SingleHandlerRegistry()

@property
def di(self) -> DIAdapter:
return self.__di

@property
def command_handler(self) -> HandlerDecorator[Command, Any]:
return HandlerDecorator(self.__command_registry, self.__di)

@property
def event_handler(self) -> HandlerDecorator[Event, Any]:
return HandlerDecorator(self.__event_registry, self.__di)

@property
def query_handler(self) -> HandlerDecorator[Query, Any]:
return HandlerDecorator(self.__query_registry, self.__di)

def new_command_bus(self) -> Bus[Command, Any]:
bus = SimpleBus(self.__command_registry)
command_middleware = CommandDispatchScopeMiddleware(self.__di)
bus.add_middlewares(command_middleware)
return bus

def new_event_bus(self) -> Bus[Event, None]:
return TaskBus(self.__event_registry)

def new_query_bus(self) -> Bus[Query, Any]:
return SimpleBus(self.__query_registry)

def register_defaults(self) -> Self:
self.__di.register_defaults(
self.new_command_bus,
self.new_event_bus,
self.new_query_bus,
)
return self
113 changes: 113 additions & 0 deletions cq/_core/di.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
from __future__ import annotations

from abc import abstractmethod
from collections.abc import Awaitable, Callable
from contextlib import nullcontext
from typing import TYPE_CHECKING, Any, AsyncContextManager, Protocol, runtime_checkable

if TYPE_CHECKING:
from cq import CommandBus, EventBus, QueryBus


@runtime_checkable
class DIAdapter(Protocol):
"""
Protocol for integrating a dependency injection container with python-cq.

Implement this protocol to connect your DI framework to the CQ buses.
A concrete implementation (``InjectionAdapter``) is provided via the
``python-cq[injection]`` extra for projects that use *python-injection*.
"""

__slots__ = ()

@abstractmethod
def command_scope(self) -> AsyncContextManager[None]:
"""
Return an async context manager that delimits the lifetime of a
command dispatch.

**Responsibilities**

The scope must at minimum manage the lifecycle of a ``RelatedEvents``
instance and register it so that it is resolvable via injection for
the duration of the scope.

**Nested calls**

``command_scope`` is entered in two distinct situations:

1. Around a standard command dispatch (via
``CommandDispatchScopeMiddleware``).
2. Around each step of a ``ContextCommandPipeline``, which itself
wraps a command dispatch.

This means two nested calls can occur for a single logical command.
Implementations must detect re-entrant activation (e.g. a scope
already active on the current task) and silently ignore the inner
call instead of opening a second, conflicting scope.
"""

raise NotImplementedError

@abstractmethod
def lazy[T](self, tp: type[T]) -> Callable[[], Awaitable[T]]:
"""
Return a callable that resolves an instance of ``tp`` in two steps.

1. ``lazy(tp)`` obtains a resolver from the DI framework for ``tp``.
2. Calling and awaiting the returned callable performs the actual
resolution and returns the instance.
"""

raise NotImplementedError

def register_defaults(
self,
command_bus: Callable[..., CommandBus[Any]],
event_bus: Callable[..., EventBus],
query_bus: Callable[..., QueryBus[Any]],
) -> None:
"""
Register the CQ buses as default providers in the DI container.

Called once during setup so that handlers and middlewares can
declare ``CommandBus``, ``EventBus``, or ``QueryBus`` as
constructor dependencies and receive the configured instances.

The default implementation is a no-op for adapters that do not
need automatic bus registration.
"""

return

@abstractmethod
def wire[T](self, tp: type[T]) -> Callable[..., Awaitable[T]]:
"""
Return an async factory that instantiates ``tp`` with injected
dependencies.

Used internally to build handler instances whose dependencies are
resolved by the container.
"""

raise NotImplementedError


class NoDI(DIAdapter):
__slots__ = ()

def command_scope(self) -> AsyncContextManager[None]:
return nullcontext()

def lazy[T](self, tp: type[T], /) -> Callable[[], Awaitable[T]]:
tp_str = getattr(tp, "__name__", str(tp))
raise RuntimeError(
f"Can't lazily resolve {tp_str}: no DI container configured."
)

def wire[T](self, tp: type[T], /) -> Callable[..., Awaitable[T]]:
async def factory() -> T:
return tp()

return factory
9 changes: 0 additions & 9 deletions cq/_core/dispatcher/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,6 @@ async def dispatch(self, input_value: I, /) -> O:
raise NotImplementedError


@runtime_checkable
class DeferredDispatcher[I](Protocol):
__slots__ = ()

@abstractmethod
async def defer(self, input_value: I, /) -> None:
raise NotImplementedError


class BaseDispatcher[I, O](Dispatcher[I, O], ABC):
__slots__ = ("__middleware_group",)

Expand Down
18 changes: 7 additions & 11 deletions cq/_core/dispatcher/lazy.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,24 @@
from collections.abc import Awaitable
from collections.abc import Awaitable, Callable
from types import GenericAlias
from typing import TypeAliasType

import injection

from cq._core.di import DIAdapter
from cq._core.dispatcher.base import Dispatcher


class LazyDispatcher[I, O](Dispatcher[I, O]):
__slots__ = ("__value",)
__slots__ = ("__resolve",)

__value: Awaitable[Dispatcher[I, O]]
__resolve: Callable[[], Awaitable[Dispatcher[I, O]]]

def __init__(
self,
dispatcher_type: type[Dispatcher[I, O]] | TypeAliasType | GenericAlias,
/,
*,
injection_module: injection.Module | None = None,
threadsafe: bool | None = None,
di: DIAdapter,
) -> None:
module = injection_module or injection.mod()
self.__value = module.aget_lazy_instance(dispatcher_type, threadsafe=threadsafe)
self.__resolve = di.lazy(dispatcher_type) # type: ignore[arg-type]

async def dispatch(self, input_value: I, /) -> O:
dispatcher = await self.__value
dispatcher = await self.__resolve()
return await dispatcher.dispatch(input_value)
Loading
Loading