Skip to content

Commit 1ff4111

Browse files
authored
feat: ✨ Introduce DeferredBus
1 parent 23ebb68 commit 1ff4111

10 files changed

Lines changed: 155 additions & 37 deletions

File tree

cq/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from ._core.defer import DeferredBus
12
from ._core.dispatcher.bus import Bus
23
from ._core.dispatcher.pipe import Pipe
34
from ._core.message import (
@@ -25,6 +26,7 @@
2526
"CQScope",
2627
"Command",
2728
"CommandBus",
29+
"DeferredBus",
2830
"Event",
2931
"EventBus",
3032
"Middleware",

cq/_core/defer.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from abc import abstractmethod
2+
from typing import Protocol
3+
4+
5+
class DeferredBus[I](Protocol):
6+
__slots__ = ()
7+
8+
@abstractmethod
9+
async def defer(self, input_value: I, /) -> None:
10+
raise NotImplementedError

cq/ext/__init__.py

Whitespace-only changes.

cq/ext/fastapi.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
from dataclasses import dataclass
2+
from typing import Annotated, Any
3+
4+
from fastapi import BackgroundTasks, Depends
5+
from injection.ext.fastapi import Inject
6+
7+
from cq import Bus, Command, CommandBus, DeferredBus, Event, EventBus, Query, QueryBus
8+
9+
__all__ = ("DeferredCommandBus", "DeferredEventBus", "DeferredQueryBus")
10+
11+
12+
@dataclass(repr=False, eq=False, frozen=True, slots=True)
13+
class FastAPIDeferredBus[I](DeferredBus[I]):
14+
background_tasks: BackgroundTasks
15+
bus: Bus[I, Any]
16+
17+
async def defer(self, input_value: I, /) -> None:
18+
self.background_tasks.add_task(self.bus.dispatch, input_value)
19+
20+
21+
async def new_deferred_command_bus[T](
22+
background_tasks: BackgroundTasks,
23+
command_bus: Inject[CommandBus[T]],
24+
) -> DeferredBus[Command]:
25+
return FastAPIDeferredBus(background_tasks, command_bus)
26+
27+
28+
async def new_deferred_event_bus(
29+
background_tasks: BackgroundTasks,
30+
event_bus: Inject[EventBus],
31+
) -> DeferredBus[Event]:
32+
return FastAPIDeferredBus(background_tasks, event_bus)
33+
34+
35+
async def new_deferred_query_bus[T](
36+
background_tasks: BackgroundTasks,
37+
query_bus: Inject[QueryBus[T]],
38+
) -> DeferredBus[Query]:
39+
return FastAPIDeferredBus(background_tasks, query_bus)
40+
41+
42+
DeferredCommandBus = Annotated[DeferredBus[Command], Depends(new_deferred_command_bus)]
43+
DeferredEventBus = Annotated[DeferredBus[Event], Depends(new_deferred_event_bus)]
44+
DeferredQueryBus = Annotated[DeferredBus[Query], Depends(new_deferred_query_bus)]

cq/ext/fastapi.pyi

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
from cq import Command, DeferredBus, Event, Query
2+
3+
type DeferredCommandBus = DeferredBus[Command]
4+
type DeferredEventBus = DeferredBus[Event]
5+
type DeferredQueryBus = DeferredBus[Query]

documentation/fastapi-example.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ Here's an example of its integration:
66

77
```python
88
from cq import CommandBus, command_handler, new_command_bus
9+
from cq.ext.fastapi import DeferredCommandBus
910
from fastapi import FastAPI, status
1011
from injection import injectable
1112
from injection.ext.fastapi import Inject
@@ -48,4 +49,14 @@ async def example(
4849
) -> None:
4950
result = await command_bus.dispatch(command)
5051
# ...
52+
53+
@app.post("/background-example", status_code=status.HTTP_204_NO_CONTENT)
54+
async def background_example(
55+
command: ExampleCommand,
56+
command_bus: DeferredCommandBus,
57+
) -> None:
58+
# runs the command in the background
59+
# so the client receives a response more quickly
60+
# but isn't notified in case of error
61+
await command_bus.defer(command)
5162
```

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ example = [
1414
"pydantic",
1515
]
1616
test = [
17+
"fastapi",
18+
"httpx",
1719
"pytest",
1820
"pytest-asyncio",
1921
"pytest-cov",

tests/ext/__init__.py

Whitespace-only changes.

tests/ext/test_fastapi.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
from collections.abc import Iterator
2+
from typing import ClassVar
3+
4+
import pytest
5+
from fastapi import FastAPI
6+
from fastapi.testclient import TestClient
7+
8+
from cq import command_handler
9+
from cq.ext.fastapi import DeferredCommandBus
10+
11+
app = FastAPI()
12+
13+
14+
class DeferredCommand: ...
15+
16+
17+
@command_handler
18+
class DeferredCommandHandler:
19+
call_count: ClassVar[int] = 0
20+
21+
async def handle(self, command: DeferredCommand) -> None:
22+
DeferredCommandHandler.call_count += 1
23+
24+
25+
@app.post("/defer", status_code=204)
26+
async def defer(bus: DeferredCommandBus) -> None:
27+
command = DeferredCommand()
28+
await bus.defer(command)
29+
30+
31+
class TestFastAPIDeferredBus:
32+
@pytest.fixture(scope="class")
33+
def client(self) -> Iterator[TestClient]:
34+
with TestClient(app) as client:
35+
yield client
36+
37+
def test_defer_with_deferred_command_bus(self, client: TestClient) -> None:
38+
response = client.post("/defer")
39+
assert response.status_code == 204
40+
assert DeferredCommandHandler.call_count == 1

uv.lock

Lines changed: 41 additions & 37 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)