Skip to content

Commit f4a80cb

Browse files
authored
feat: ✨ Retry middleware
1 parent 565b6b7 commit f4a80cb

12 files changed

Lines changed: 238 additions & 133 deletions

File tree

conftest.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
from typing import Any
2+
3+
import pytest
4+
5+
from cq import Bus
6+
from cq._core.bus import SimpleBus
7+
from tests.helpers.history import HistoryMiddleware
8+
9+
10+
@pytest.fixture(scope="function")
11+
def bus() -> Bus[Any, Any]:
12+
return SimpleBus()
13+
14+
15+
@pytest.fixture(scope="function")
16+
def history() -> HistoryMiddleware:
17+
return HistoryMiddleware()

cq/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1+
from ._core.bus import Bus
12
from ._core.command import Command, CommandBus, command_handler, find_command_bus
23
from ._core.dto import DTO
34
from ._core.event import Event, EventBus, event_handler, find_event_bus
45
from ._core.middleware import Middleware, MiddlewareResult
56
from ._core.query import Query, QueryBus, find_query_bus, query_handler
67

78
__all__ = (
9+
"Bus",
810
"Command",
911
"CommandBus",
1012
"DTO",

cq/_core/bus.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ def dispatch_no_wait(self, first_input_value: I, /, *input_values: I) -> None:
3939
*(
4040
self.dispatch(input_value)
4141
for input_value in (first_input_value, *input_values)
42-
)
42+
),
43+
return_exceptions=True,
4344
)
4445

4546
@abstractmethod

cq/middlewares/__init__.py

Whitespace-only changes.

cq/middlewares/retry.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import asyncio
2+
from typing import Any
3+
4+
from cq import MiddlewareResult
5+
6+
7+
class RetryMiddleware:
8+
__slots__ = ("__delay", "__exceptions", "__retry")
9+
10+
def __init__(
11+
self,
12+
retry: int,
13+
delay: float = 0,
14+
exceptions: tuple[type[BaseException], ...] = (Exception,),
15+
) -> None:
16+
self.__delay = delay
17+
self.__exceptions = exceptions
18+
self.__retry = retry
19+
20+
async def __call__(self, *args: Any, **kwargs: Any) -> MiddlewareResult[Any]:
21+
retry = self.__retry
22+
23+
for attempt in range(1, retry + 1):
24+
try:
25+
yield
26+
27+
except self.__exceptions as exc:
28+
if attempt == retry:
29+
raise exc
30+
31+
else:
32+
break
33+
34+
await asyncio.sleep(self.__delay)

poetry.lock

Lines changed: 64 additions & 74 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

tests/core/test_bus.py

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,6 @@
99

1010

1111
class TestSimpleBus:
12-
@pytest.fixture(scope="function")
13-
def bus(self) -> SimpleBus[Any, Any]:
14-
return SimpleBus()
15-
1612
def test_add_middlewares_with_success_return_self(
1713
self,
1814
bus: SimpleBus[Any, Any],
@@ -93,26 +89,29 @@ class BadHandler: ...
9389

9490
class TestTaskBus:
9591
@pytest.fixture(scope="function")
96-
def bus(self) -> TaskBus[Any]:
92+
def task_bus(self) -> TaskBus[Any]:
9793
return TaskBus()
9894

99-
def test_subscribe_with_success_return_self(self, bus: TaskBus[Any]) -> None:
100-
assert bus.subscribe(str, _SomeTaskHandler) is bus
95+
def test_subscribe_with_success_return_self(self, task_bus: TaskBus[Any]) -> None:
96+
assert task_bus.subscribe(str, _SomeTaskHandler) is task_bus
10197
# Checks whether several handlers can be subscribed for the same input type
102-
assert bus.subscribe(str, _SomeTaskHandler) is bus
98+
assert task_bus.subscribe(str, _SomeTaskHandler) is task_bus
10399

104-
async def test_dispatch_with_success_return_none(self, bus: TaskBus[Any]) -> None:
105-
bus.subscribe(str, _SomeTaskHandler)
100+
async def test_dispatch_with_success_return_none(
101+
self,
102+
task_bus: TaskBus[Any],
103+
) -> None:
104+
task_bus.subscribe(str, _SomeTaskHandler)
106105

107106
with pytest.raises(NotImplementedError):
108-
await bus.dispatch("hello")
107+
await task_bus.dispatch("hello")
109108

110109
async def test_dispatch_with_unknown_input_type_return_none(
111110
self,
112-
bus: TaskBus[Any],
111+
task_bus: TaskBus[Any],
113112
) -> None:
114113
# Do nothing
115-
await bus.dispatch("hello")
114+
await task_bus.dispatch("hello")
116115

117116

118117
class _SomeHandler:

tests/core/test_middleware.py

Lines changed: 11 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,29 @@
11
from __future__ import annotations
22

3-
from collections.abc import Mapping
4-
from enum import IntEnum
5-
from typing import Any, NamedTuple
3+
from typing import Any
64

75
import pytest
86

97
from cq._core.middleware import MiddlewareGroup, MiddlewareResult
8+
from tests.helpers.history import HistoryMiddleware
109

1110

1211
class TestMiddlewareGroup:
1312
@pytest.fixture(scope="function")
1413
def group(self) -> MiddlewareGroup[..., Any]:
1514
return MiddlewareGroup()
1615

17-
@pytest.fixture(scope="function")
18-
def history(self) -> _HistoryMiddleware:
19-
return _HistoryMiddleware()
20-
2116
def test_add_with_success_return_self(
2217
self,
2318
group: MiddlewareGroup[..., Any],
24-
history: _HistoryMiddleware,
19+
history: HistoryMiddleware,
2520
) -> None:
2621
assert group.add(history) is group
2722

2823
async def test_invoke_with_success_return_any(
2924
self,
3025
group: MiddlewareGroup[..., Any],
31-
history: _HistoryMiddleware,
26+
history: HistoryMiddleware,
3227
) -> None:
3328
async def handler() -> str:
3429
return "I'm a handler..."
@@ -43,18 +38,20 @@ async def handler() -> str:
4338
assert record.args == ()
4439
assert record.kwargs == {}
4540
assert record.result == result
46-
assert record.status == _Status.SUCCESS
41+
assert record.is_success
4742

4843
async def test_invoke_with_exception_raise_any(
4944
self,
5045
group: MiddlewareGroup[..., Any],
51-
history: _HistoryMiddleware,
46+
history: HistoryMiddleware,
5247
) -> None:
5348
async def handler() -> str:
5449
raise ValueError("I failed...")
5550

5651
group.add(history)
57-
assert await group.invoke(handler) is NotImplemented
52+
53+
with pytest.raises(ValueError):
54+
await group.invoke(handler)
5855

5956
records = history.records
6057
assert len(records) == 1
@@ -63,12 +60,12 @@ async def handler() -> str:
6360
assert record.args == ()
6461
assert record.kwargs == {}
6562
assert isinstance(record.result, ValueError)
66-
assert record.status == _Status.FAILED
63+
assert record.is_failed
6764

6865
async def test_invoke_with_multiple_yield_return_any(
6966
self,
7067
group: MiddlewareGroup[..., Any],
71-
history: _HistoryMiddleware,
68+
history: HistoryMiddleware,
7269
) -> None:
7370
async def handler() -> str:
7471
return "I'm a handler..."
@@ -80,37 +77,6 @@ async def handler() -> str:
8077
assert len(records) == 2
8178

8279

83-
class _Status(IntEnum):
84-
SUCCESS = 1
85-
FAILED = 0
86-
87-
88-
class _Record(NamedTuple):
89-
args: tuple[Any, ...]
90-
kwargs: Mapping[str, Any]
91-
result: Any
92-
status: _Status
93-
94-
95-
class _HistoryMiddleware:
96-
def __init__(self) -> None:
97-
self.__records: list[_Record] = []
98-
99-
@property
100-
def records(self) -> tuple[_Record, ...]:
101-
return tuple(self.__records)
102-
103-
async def __call__(self, /, *args: Any, **kwargs: Any) -> MiddlewareResult[Any]:
104-
try:
105-
result = yield
106-
except BaseException as exc:
107-
record = _Record(args, kwargs, exc, _Status.FAILED)
108-
else:
109-
record = _Record(args, kwargs, result, _Status.SUCCESS)
110-
111-
self.__records.append(record)
112-
113-
11480
async def _exec_2_times_middleware(*args: Any, **kwargs: Any) -> MiddlewareResult[Any]:
11581
yield
11682
yield

tests/helpers/__init__.py

Whitespace-only changes.

tests/helpers/history.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
from enum import IntEnum
2+
from typing import Any, Mapping, NamedTuple
3+
4+
from cq import MiddlewareResult
5+
6+
7+
class HistoryRecordStatus(IntEnum):
8+
SUCCESS = 1
9+
FAILED = 0
10+
11+
12+
class HistoryRecord(NamedTuple):
13+
args: tuple[Any, ...]
14+
kwargs: Mapping[str, Any]
15+
result: Any
16+
status: HistoryRecordStatus
17+
18+
@property
19+
def is_success(self) -> bool:
20+
return self.status == HistoryRecordStatus.SUCCESS
21+
22+
@property
23+
def is_failed(self) -> bool:
24+
return self.status == HistoryRecordStatus.FAILED
25+
26+
27+
class HistoryMiddleware:
28+
def __init__(self) -> None:
29+
self.__records: list[HistoryRecord] = []
30+
31+
@property
32+
def records(self) -> tuple[HistoryRecord, ...]:
33+
return tuple(self.__records)
34+
35+
async def __call__(self, /, *args: Any, **kwargs: Any) -> MiddlewareResult[Any]:
36+
try:
37+
result = yield
38+
except BaseException as exc:
39+
record = HistoryRecord(args, kwargs, exc, HistoryRecordStatus.FAILED)
40+
raise exc
41+
else:
42+
record = HistoryRecord(args, kwargs, result, HistoryRecordStatus.SUCCESS)
43+
finally:
44+
self.__records.append(record)

0 commit comments

Comments
 (0)