22
33Pipelines are designed to execute several commands one after the other, while encapsulating them in middleware.
44
5+ ## Pipe
6+
7+ A simple pipeline implementation.
8+
59Example:
610
711``` python
@@ -12,14 +16,54 @@ async def pipeline_example(command_bus: AnyCommandBus) -> None:
1216 pipeline.add_middlewares(... ) # You can add middleware to encapsulate the pipeline in a transaction, for example.
1317
1418 @pipeline.step
15- async def converter_1 ( output_value : FirstResult) -> SecondCommand:
19+ async def _ ( result : FirstResult) -> SecondCommand:
1620 """ Transform the return value into a new command. """
1721
1822 @pipeline.step
19- async def converter_2 ( output_value : SecondResult) -> ThirdCommand:
23+ async def _ ( result : SecondResult) -> ThirdCommand:
2024 """ Transform the return value into a new command. """
2125
2226 command = FirstCommand(... )
2327 output_value = await pipeline.dispatch(command)
2428 # ...
2529```
30+
31+ ## ContextPipeline
32+
33+ ` Pipe ` has a big problem: it isn't possible to have values that go through the steps. Each converter in a ` Pipe `
34+ receives only the output of the previous one, but the intermediate state, any contextual information, is lost between
35+ steps.
36+
37+ This makes it impossible to accumulate data or share context between converters without resorting to global variables.
38+
39+ To solve this limitation, ` ContextPipeline ` introduces a contextual layer: each stage operates within the same instance,
40+ allowing stateful processing and side effects to persist across the entire pipeline execution.
41+
42+ Example:
43+
44+ ``` python
45+ from cq import ContextCommandPipeline, ContextPipeline
46+
47+ class ContextExample :
48+ user_id: int
49+
50+ pipeline: ContextPipeline[FirstCommand] = ContextCommandPipeline()
51+
52+ @pipeline.step
53+ async def _ (self , result : FirstResult) -> SecondCommand:
54+ self .user_id = result.user_id # set user_id in context
55+ return SecondCommand(... )
56+
57+ @pipeline.step
58+ async def _ (self , result : SecondResult) -> ThirdCommand:
59+ """ Transform the return value into a new command. """
60+
61+ @pipeline.step
62+ async def _ (self , result : ThirdResult) -> None :
63+ """ The last step is optional, but if you need it, you must return `None`. """
64+
65+ async def how_to_dispatch () -> None :
66+ command = FirstCommand(... )
67+ context = await ContextExample().pipeline.dispatch(command)
68+ # ...
69+ ```
0 commit comments