Skip to content

Commit b3fc07f

Browse files
Merge pull request Pipelex#175 from Pipelex/release/v0.6.1
Release/v0.6.1
2 parents 020c517 + fbb59c5 commit b3fc07f

15 files changed

Lines changed: 279 additions & 424 deletions

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changelog
22

3+
## [v0.6.1] - 2025-07-16
4+
5+
- Can execute pipelines with `input_memory`: It is a `CompactMemory: Dict[str, Dict[str, Any]]`
6+
37
## [v0.6.0] - 2025-07-15
48

59
### Changed

pipelex/client/api_serializer.py

Lines changed: 5 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,13 @@
22
from decimal import Decimal
33
from enum import Enum
44
from pathlib import Path
5-
from typing import Any, Dict, List, cast
5+
from typing import Any, Dict, List, Optional, cast
66

77
from pipelex.client.protocol import CompactMemory
88
from pipelex.core.concept_native import NativeConcept
99
from pipelex.core.pipe_output import PipeOutput
10-
from pipelex.core.stuff_content import StuffContent, TextContent
11-
from pipelex.core.stuff_factory import StuffContentFactory
10+
from pipelex.core.stuff_content import TextContent
1211
from pipelex.core.working_memory import WorkingMemory
13-
from pipelex.exceptions import ApiSerializationError
1412

1513

1614
class ApiSerializer:
@@ -21,7 +19,7 @@ class ApiSerializer:
2119
FIELDS_TO_SKIP = ("__class__", "__module__")
2220

2321
@classmethod
24-
def serialize_working_memory_for_api(cls, working_memory: WorkingMemory) -> CompactMemory:
22+
def serialize_working_memory_for_api(cls, working_memory: Optional[WorkingMemory] = None) -> CompactMemory:
2523
"""
2624
Convert WorkingMemory to API-ready format using kajson with proper datetime handling.
2725
@@ -32,6 +30,8 @@ def serialize_working_memory_for_api(cls, working_memory: WorkingMemory) -> Comp
3230
Dict ready for API transmission with datetime strings and no __class__/__module__
3331
"""
3432
compact_memory: CompactMemory = {}
33+
if working_memory is None:
34+
return compact_memory
3535

3636
for stuff_name, stuff in working_memory.root.items():
3737
if stuff.concept_code == NativeConcept.TEXT.code:
@@ -101,24 +101,3 @@ def _clean_and_format_content(cls, content: Any) -> Any:
101101
return str(content) # Convert Path to string representation
102102
else:
103103
return content
104-
105-
@classmethod
106-
def make_stuff_content_from_api_data(cls, concept_code: str, value: Dict[str, Any] | str) -> StuffContent:
107-
"""
108-
Create StuffContent from API data using concept code.
109-
110-
Args:
111-
concept_code: The concept code to determine the content type
112-
value: The content value from API
113-
114-
Returns:
115-
StuffContent instance
116-
117-
Raises:
118-
ApiSerializationError: If concept cannot be resolved or content creation fails
119-
"""
120-
try:
121-
return StuffContentFactory.make_stuffcontent_from_concept_code_with_fallback(concept_code=concept_code, value=value)
122-
123-
except Exception as exc:
124-
raise ApiSerializationError(f"Failed to create StuffContent for concept '{concept_code}': {exc}") from exc

pipelex/client/client.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@
55

66
from pipelex.client.pipeline_request_factory import PipelineRequestFactory
77
from pipelex.client.pipeline_response_factory import PipelineResponseFactory
8-
from pipelex.client.protocol import PipelexProtocol, PipelineResponse
8+
from pipelex.client.protocol import CompactMemory, PipelexProtocol, PipelineResponse
99
from pipelex.core.pipe_run_params import PipeOutputMultiplicity
1010
from pipelex.core.working_memory import WorkingMemory
11+
from pipelex.core.working_memory_factory import WorkingMemoryFactory
1112
from pipelex.exceptions import ClientAuthenticationError
1213
from pipelex.tools.environment import get_required_env
1314

@@ -75,10 +76,16 @@ async def execute_pipeline(
7576
self,
7677
pipe_code: str,
7778
working_memory: Optional[WorkingMemory] = None,
79+
input_memory: Optional[CompactMemory] = None,
7880
output_name: Optional[str] = None,
7981
output_multiplicity: Optional[PipeOutputMultiplicity] = None,
8082
dynamic_output_concept_code: Optional[str] = None,
8183
) -> PipelineResponse:
84+
if working_memory and input_memory:
85+
raise ValueError(f"working_memory and input_memory cannot be provided together to the API execute_pipeline {pipe_code=}")
86+
87+
if input_memory is not None:
88+
working_memory = WorkingMemoryFactory.make_from_compact_memory(input_memory)
8289
pipeline_request = PipelineRequestFactory.make_from_working_memory(
8390
working_memory=working_memory,
8491
output_name=output_name,
@@ -93,10 +100,17 @@ async def start_pipeline(
93100
self,
94101
pipe_code: str,
95102
working_memory: Optional[WorkingMemory] = None,
103+
input_memory: Optional[CompactMemory] = None,
96104
output_name: Optional[str] = None,
97105
output_multiplicity: Optional[PipeOutputMultiplicity] = None,
98106
dynamic_output_concept_code: Optional[str] = None,
99107
) -> PipelineResponse:
108+
if working_memory and input_memory:
109+
raise ValueError(f"working_memory and input_memory cannot be provided together to the API start_pipeline {pipe_code=}")
110+
111+
if input_memory is not None:
112+
working_memory = WorkingMemoryFactory.make_from_compact_memory(input_memory)
113+
100114
pipeline_request = PipelineRequestFactory.make_from_working_memory(
101115
working_memory=working_memory,
102116
output_name=output_name,

pipelex/client/pipeline_request_factory.py

Lines changed: 4 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
from typing import Any, Dict, Optional
22

33
from pipelex.client.api_serializer import ApiSerializer
4-
from pipelex.client.protocol import COMPACT_MEMORY_KEY, CompactMemory, PipelineRequest
4+
from pipelex.client.protocol import COMPACT_MEMORY_KEY, PipelineRequest
55
from pipelex.core.pipe_run_params import PipeOutputMultiplicity
6-
from pipelex.core.stuff_factory import StuffFactory
76
from pipelex.core.working_memory import WorkingMemory
8-
from pipelex.core.working_memory_factory import WorkingMemoryFactory
97

108

119
class PipelineRequestFactory:
@@ -30,48 +28,16 @@ def make_from_working_memory(
3028
Returns:
3129
PipelineRequest with the working memory serialized to reduced format
3230
"""
33-
compact_memory = None
34-
if working_memory is not None:
35-
compact_memory = ApiSerializer.serialize_working_memory_for_api(working_memory)
3631

3732
return PipelineRequest(
38-
compact_memory=compact_memory,
33+
input_memory=ApiSerializer.serialize_working_memory_for_api(working_memory),
3934
output_name=output_name,
4035
output_multiplicity=output_multiplicity,
4136
dynamic_output_concept_code=dynamic_output_concept_code,
4237
)
4338

4439
@staticmethod
45-
def make_working_memory_from_reduced(compact_memory: Optional[CompactMemory]) -> WorkingMemory:
46-
"""
47-
Create a WorkingMemory from a reduced memory dictionary.
48-
49-
Args:
50-
compact_memory: Dictionary in the format from API
51-
52-
Returns:
53-
WorkingMemory object reconstructed from the reduced format
54-
"""
55-
working_memory = WorkingMemoryFactory.make_empty()
56-
if compact_memory is None:
57-
return working_memory
58-
59-
for stuff_key, stuff_data in compact_memory.items():
60-
concept_code = stuff_data.get("concept_code", "")
61-
content_value = stuff_data.get("content", {})
62-
63-
# Use API serializer to create content
64-
content = ApiSerializer.make_stuff_content_from_api_data(concept_code=concept_code, value=content_value)
65-
66-
# Create stuff directly
67-
stuff = StuffFactory.make_stuff(concept_str=concept_code, name=stuff_key, content=content)
68-
69-
working_memory.add_new_stuff(name=stuff_key, stuff=stuff)
70-
71-
return working_memory
72-
73-
@staticmethod
74-
def make_request_from_body(request_body: Dict[str, Any]) -> PipelineRequest:
40+
def make_from_body(request_body: Dict[str, Any]) -> PipelineRequest:
7541
"""
7642
Create a PipelineRequest from raw request body dictionary.
7743
@@ -82,7 +48,7 @@ def make_request_from_body(request_body: Dict[str, Any]) -> PipelineRequest:
8248
PipelineRequest object with dictionary working_memory
8349
"""
8450
return PipelineRequest(
85-
compact_memory=request_body.get(COMPACT_MEMORY_KEY),
51+
input_memory=request_body.get(COMPACT_MEMORY_KEY),
8652
output_name=request_body.get("output_name"),
8753
output_multiplicity=request_body.get("output_multiplicity"),
8854
dynamic_output_concept_code=request_body.get("dynamic_output_concept_code"),

pipelex/client/pipeline_response_factory.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,16 +35,16 @@ def make_from_pipe_output(
3535
Returns:
3636
PipelineResponse with the pipe output serialized to reduced format
3737
"""
38-
reduced_output = None
38+
compact_output = None
3939
if pipe_output is not None:
40-
reduced_output = ApiSerializer.serialize_pipe_output_for_api(pipe_output=pipe_output)
40+
compact_output = ApiSerializer.serialize_pipe_output_for_api(pipe_output=pipe_output)
4141

4242
return PipelineResponse(
4343
pipeline_run_id=pipeline_run_id,
4444
created_at=created_at,
4545
pipeline_state=pipeline_state,
4646
finished_at=finished_at,
47-
pipe_output=reduced_output,
47+
pipe_output=compact_output,
4848
status=status,
4949
message=message,
5050
error=error,

pipelex/client/protocol.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,13 @@ class PipelineRequest(BaseModel):
4545
Request for executing a pipeline.
4646
4747
Attributes:
48-
compact_memory (Optional[CompactMemory]): In the format of WorkingMemory.to_compact_memory()
48+
input_memory (Optional[CompactMemory]): In the format of WorkingMemory.to_compact_memory()
4949
output_name (Optional[str]): Name of the output slot to write to
5050
output_multiplicity (Optional[PipeOutputMultiplicity]): Output multiplicity setting
5151
dynamic_output_concept_code (Optional[str]): Override for the dynamic output concept code
5252
"""
5353

54-
compact_memory: Optional[CompactMemory] = None
54+
input_memory: Optional[CompactMemory] = None
5555
output_name: Optional[str] = None
5656
output_multiplicity: Optional[PipeOutputMultiplicity] = None
5757
dynamic_output_concept_code: Optional[str] = None
@@ -70,7 +70,7 @@ class PipelineResponse(ApiResponse):
7070
7171
Example of pipe_output:
7272
"pipe_output": {
73-
"compact_memory": {
73+
"input_memory": {
7474
"text": {
7575
"concept_code": "native.Text",
7676
"content": "Some text........"
@@ -125,6 +125,7 @@ async def execute_pipeline(
125125
self,
126126
pipe_code: str,
127127
working_memory: Optional[WorkingMemory] = None,
128+
input_memory: Optional[CompactMemory] = None,
128129
output_name: Optional[str] = None,
129130
output_multiplicity: Optional[PipeOutputMultiplicity] = None,
130131
dynamic_output_concept_code: Optional[str] = None,
@@ -135,6 +136,7 @@ async def execute_pipeline(
135136
Args:
136137
pipe_code (str): The code identifying the pipeline to execute
137138
working_memory (Optional[WorkingMemory]): Memory context passed to the pipeline
139+
input_memory (Optional[CompactMemory]): Input memory passed to the pipeline
138140
output_name (Optional[str]): Target output slot name
139141
output_multiplicity (Optional[PipeOutputMultiplicity]): Output multiplicity setting
140142
dynamic_output_concept_code (Optional[str]): Override for dynamic output concept
@@ -152,6 +154,7 @@ async def start_pipeline(
152154
self,
153155
pipe_code: str,
154156
working_memory: Optional[WorkingMemory] = None,
157+
input_memory: Optional[CompactMemory] = None,
155158
output_name: Optional[str] = None,
156159
output_multiplicity: Optional[PipeOutputMultiplicity] = None,
157160
dynamic_output_concept_code: Optional[str] = None,
@@ -162,6 +165,7 @@ async def start_pipeline(
162165
Args:
163166
pipe_code (str): The code identifying the pipeline to execute
164167
working_memory (Optional[WorkingMemory]): Memory context passed to the pipeline
168+
input_memory (Optional[CompactMemory]): Input memory passed to the pipeline
165169
output_name (Optional[str]): Target output slot name
166170
output_multiplicity (Optional[PipeOutputMultiplicity]): Output multiplicity setting
167171
dynamic_output_concept_code (Optional[str]): Override for dynamic output concept

pipelex/core/working_memory_factory.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@
55
from pydantic import BaseModel
66

77
from pipelex import log
8+
from pipelex.client.protocol import CompactMemory
89
from pipelex.core.concept_native import NativeConcept
910
from pipelex.core.stuff import Stuff
1011
from pipelex.core.stuff_content import ImageContent, PDFContent, StuffContent, TextContent
11-
from pipelex.core.stuff_factory import StuffBlueprint, StuffFactory
12+
from pipelex.core.stuff_factory import StuffBlueprint, StuffContentFactory, StuffFactory
1213
from pipelex.core.working_memory import MAIN_STUFF_NAME, StuffDict, WorkingMemory
1314
from pipelex.exceptions import WorkingMemoryFactoryError
1415
from pipelex.tools.misc.json_utils import load_json_dict_from_path
@@ -124,6 +125,31 @@ def make_from_memory_file(cls, memory_file_path: str) -> WorkingMemory:
124125
working_memory = WorkingMemory.model_validate(working_memory_dict)
125126
return working_memory
126127

128+
@classmethod
129+
def make_from_compact_memory(cls, compact_memory: CompactMemory) -> WorkingMemory:
130+
"""
131+
Create a WorkingMemory from a compact memory dictionary.
132+
133+
Args:
134+
compact_memory: Dictionary in the format from API serialization
135+
136+
Returns:
137+
WorkingMemory object reconstructed from the compact format
138+
"""
139+
working_memory = cls.make_empty()
140+
141+
for stuff_key, stuff_data in compact_memory.items():
142+
concept_code = stuff_data.get("concept_code", "")
143+
content_value = stuff_data.get("content", {})
144+
145+
content = StuffContentFactory.make_stuffcontent_from_concept_code_with_fallback(concept_code=concept_code, value=content_value)
146+
147+
stuff = StuffFactory.make_stuff(concept_str=concept_code, name=stuff_key, content=content)
148+
149+
working_memory.add_new_stuff(name=stuff_key, stuff=stuff)
150+
151+
return working_memory
152+
127153
@classmethod
128154
def make_for_dry_run(cls, needed_inputs: List[Tuple[str, str, Type[StuffContent]]]) -> "WorkingMemory":
129155
"""

pipelex/exceptions.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,3 +264,11 @@ class ApiSerializationError(Exception):
264264
"""Exception raised when API serialization fails."""
265265

266266
pass
267+
268+
269+
class StartPipelineException(Exception):
270+
pass
271+
272+
273+
class ExecutePipelineException(Exception):
274+
pass

pipelex/pipe_works/pipe_job_factory.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from pipelex.core.pipe_run_params import PipeRunParams
55
from pipelex.core.pipe_run_params_factory import PipeRunParamsFactory
66
from pipelex.core.working_memory import WorkingMemory
7+
from pipelex.core.working_memory_factory import WorkingMemoryFactory
78
from pipelex.pipe_works.pipe_job import PipeJob
89
from pipelex.pipeline.job_metadata import JobMetadata
910

@@ -19,7 +20,7 @@ def make_pipe_job(
1920
output_name: Optional[str] = None,
2021
) -> PipeJob:
2122
job_metadata = job_metadata or JobMetadata()
22-
working_memory = working_memory or WorkingMemory()
23+
working_memory = working_memory or WorkingMemoryFactory.make_empty()
2324
if not pipe_run_params:
2425
pipe_run_params = PipeRunParamsFactory.make_run_params()
2526
return PipeJob(

pipelex/pipeline/execute.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
from typing import Optional
22

3+
from pipelex.client.protocol import CompactMemory
34
from pipelex.core.pipe_output import PipeOutput
45
from pipelex.core.pipe_run_params import FORCE_DRY_RUN_MODE_ENV_KEY, PipeOutputMultiplicity, PipeRunMode
56
from pipelex.core.pipe_run_params_factory import PipeRunParamsFactory
67
from pipelex.core.working_memory import WorkingMemory
8+
from pipelex.core.working_memory_factory import WorkingMemoryFactory
9+
from pipelex.exceptions import ExecutePipelineException
710
from pipelex.hub import get_pipe_router, get_pipeline_manager, get_report_delegate, get_required_pipe
811
from pipelex.pipe_works.pipe_job_factory import PipeJobFactory
912
from pipelex.pipeline.job_metadata import JobMetadata
@@ -13,6 +16,7 @@
1316
async def execute_pipeline(
1417
pipe_code: str,
1518
working_memory: Optional[WorkingMemory] = None,
19+
input_memory: Optional[CompactMemory] = None,
1620
output_name: Optional[str] = None,
1721
output_multiplicity: Optional[PipeOutputMultiplicity] = None,
1822
dynamic_output_concept_code: Optional[str] = None,
@@ -30,6 +34,8 @@ async def execute_pipeline(
3034
The code of the pipe to execute.
3135
working_memory:
3236
Optional ``WorkingMemory`` instance passed to the pipe.
37+
input_memory:
38+
Optional compact memory to pass to the pipe.
3339
output_name:
3440
Name of the output slot to write to.
3541
output_multiplicity:
@@ -47,6 +53,13 @@ async def execute_pipeline(
4753
Tuple[PipeOutput, str]
4854
A tuple containing the pipe output and the pipeline run ID.
4955
"""
56+
# Can be either working_memory or compact_memory, but not both
57+
if working_memory and input_memory:
58+
raise ExecutePipelineException(f"Cannot pass both working_memory and input_memory to `execute_pipeline` {pipe_code=}")
59+
60+
if input_memory:
61+
working_memory = WorkingMemoryFactory.make_from_compact_memory(input_memory)
62+
5063
if pipe_run_mode is None:
5164
if run_mode_from_env := get_optional_env(key=FORCE_DRY_RUN_MODE_ENV_KEY):
5265
pipe_run_mode = PipeRunMode(run_mode_from_env)

0 commit comments

Comments
 (0)