|
| 1 | +from datetime import datetime |
| 2 | +from decimal import Decimal |
| 3 | +from enum import Enum |
| 4 | +from pathlib import Path |
| 5 | +from typing import Any, Dict, List, cast |
| 6 | + |
| 7 | +from pipelex.client.protocol import CompactMemory |
| 8 | +from pipelex.core.concept_native import NativeConcept |
| 9 | +from pipelex.core.pipe_output import PipeOutput |
| 10 | +from pipelex.core.stuff_content import StuffContent, TextContent |
| 11 | +from pipelex.core.stuff_factory import StuffContentFactory |
| 12 | +from pipelex.core.working_memory import WorkingMemory |
| 13 | +from pipelex.exceptions import ApiSerializationError |
| 14 | + |
| 15 | + |
class ApiSerializer:
    """Serialize working memory and pipe outputs into plain, API-ready structures.

    Dumped content is cleaned recursively: the keys listed in FIELDS_TO_SKIP are
    dropped and datetime, Enum, Decimal and Path values are converted to
    JSON-friendly primitives.
    """

    # Fixed datetime format for API consistency (second precision: no microseconds, no UTC offset)
    API_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S"
    # Dictionary keys removed at every nesting level of the serialized content
    FIELDS_TO_SKIP = ("__class__", "__module__")

    @classmethod
    def serialize_working_memory_for_api(cls, working_memory: WorkingMemory) -> CompactMemory:
        """
        Convert a WorkingMemory into its compact, API-ready representation.

        Stuffs whose concept code is the native TEXT concept are flattened to their
        raw text; every other stuff is dumped with ``model_dump(serialize_as_any=True)``
        and then passed through ``_clean_and_format_content``.

        Args:
            working_memory: The WorkingMemory to serialize

        Returns:
            Dict mapping each stuff name to ``{"concept_code": ..., "content": ...}``
        """
        compact_memory: CompactMemory = {}

        for stuff_name, stuff in working_memory.root.items():
            content: Any
            if stuff.concept_code == NativeConcept.TEXT.code:
                # Text is sent as a bare string rather than as a dumped model
                content = cast(TextContent, stuff.content).text
            else:
                content = cls._clean_and_format_content(stuff.content.model_dump(serialize_as_any=True))

            compact_memory[stuff_name] = {
                "concept_code": stuff.concept_code,
                "content": content,
            }

        return compact_memory

    @classmethod
    def serialize_pipe_output_for_api(cls, pipe_output: PipeOutput) -> CompactMemory:
        """
        Convert a PipeOutput to API-ready format.

        Args:
            pipe_output: The PipeOutput whose working memory is serialized

        Returns:
            Dict with a single ``"compact_memory"`` key holding the serialized working memory
        """
        # NOTE(review): the result wraps the compact memory under one key yet is annotated
        # as CompactMemory itself -- confirm the annotation matches what callers expect.
        return {"compact_memory": cls.serialize_working_memory_for_api(pipe_output.working_memory)}

    @classmethod
    def _clean_and_format_content(cls, content: Any) -> Any:
        """
        Recursively clean content by removing the fields in FIELDS_TO_SKIP and formatting values.

        Dicts are rebuilt without the skipped keys. Lists, tuples, sets and frozensets
        are all returned as lists so that values nested inside them are cleaned too.
        Leaf conversions: datetime -> string in API_DATETIME_FORMAT, Enum -> its value,
        Decimal -> float, Path -> str. Anything else is returned unchanged.

        Args:
            content: Content to clean

        Returns:
            Cleaned content with formatted datetimes
        """
        if isinstance(content, dict):
            content_dict = cast(Dict[str, Any], content)
            return {key: cls._clean_and_format_content(value) for key, value in content_dict.items() if key not in cls.FIELDS_TO_SKIP}

        if isinstance(content, (list, tuple, set, frozenset)):
            # Tuples and sets survive a python-mode dump; recurse into them as well,
            # otherwise datetimes/Decimals nested inside would escape formatting.
            items = cast(List[Any], list(content))
            return [cls._clean_and_format_content(item) for item in items]

        if isinstance(content, datetime):
            return content.strftime(cls.API_DATETIME_FORMAT)

        if isinstance(content, Enum):
            return content.value  # Convert enum to its value

        if isinstance(content, Decimal):
            return float(content)  # Convert Decimal to float for JSON compatibility (may lose precision)

        if isinstance(content, Path):
            return str(content)  # Convert Path to string representation

        return content

    @classmethod
    def make_stuff_content_from_api_data(cls, concept_code: str, value: Dict[str, Any] | str) -> StuffContent:
        """
        Create StuffContent from API data using concept code.

        Delegates to ``StuffContentFactory.make_stuffcontent_from_concept_code_with_fallback``.

        Args:
            concept_code: The concept code to determine the content type
            value: The content value from API

        Returns:
            StuffContent instance

        Raises:
            ApiSerializationError: If the factory raises any exception; the original
                exception is chained as the cause
        """
        try:
            return StuffContentFactory.make_stuffcontent_from_concept_code_with_fallback(concept_code=concept_code, value=value)
        except Exception as exc:
            raise ApiSerializationError(f"Failed to create StuffContent for concept '{concept_code}': {exc}") from exc
0 commit comments