Skip to content

Commit 9e0ba1a

Browse files
authored
Merge pull request #13 from ringoldsdev/feat/20250728/context-manager
Feat/20250728/context manager
2 parents 1967dad + 752c6cb commit 9e0ba1a

17 files changed

+784
-525
lines changed

laygo/context/__init__.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
"""
2+
Laygo Context Management Package.
3+
4+
This package provides different strategies for managing state (context)
5+
within a data pipeline, from simple in-memory dictionaries to
6+
process-safe managers for parallel execution.
7+
"""
8+
9+
from .parallel import ParallelContextManager
10+
from .simple import SimpleContextManager
11+
from .types import IContextHandle
12+
from .types import IContextManager
13+
14+
__all__ = [
15+
"IContextManager",
16+
"IContextHandle",
17+
"SimpleContextManager",
18+
"ParallelContextManager",
19+
]

laygo/context/parallel.py

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
"""
2+
A context manager for parallel and distributed processing using
3+
multiprocessing.Manager to share state across processes.
4+
"""
5+
6+
from collections.abc import Callable
7+
from collections.abc import Iterator
8+
import multiprocessing as mp
9+
from multiprocessing.managers import DictProxy
10+
import threading
11+
from threading import Lock
12+
from typing import Any
13+
from typing import TypeVar
14+
15+
from laygo.context.types import IContextHandle
16+
from laygo.context.types import IContextManager
17+
18+
R = TypeVar("R")
19+
20+
21+
class ParallelContextHandle(IContextHandle):
22+
"""
23+
A lightweight, picklable handle that carries the actual shared objects
24+
(the DictProxy and Lock) to worker processes.
25+
"""
26+
27+
def __init__(self, shared_dict: DictProxy, lock: Lock):
28+
self._shared_dict = shared_dict
29+
self._lock = lock
30+
31+
def create_proxy(self) -> "IContextManager":
32+
"""
33+
Creates a new ParallelContextManager instance that wraps the shared
34+
objects received by the worker process.
35+
"""
36+
return ParallelContextManager(handle=self)
37+
38+
39+
class ParallelContextManager(IContextManager):
40+
"""
41+
A context manager that enables state sharing across processes.
42+
43+
It operates in two modes:
44+
1. Main Mode: When created normally, it starts a multiprocessing.Manager
45+
and creates a shared dictionary and lock.
46+
2. Proxy Mode: When created from a handle, it wraps a DictProxy and Lock
47+
that were received from another process. It does not own the manager.
48+
"""
49+
50+
def __init__(self, initial_context: dict[str, Any] | None = None, handle: ParallelContextHandle | None = None):
51+
"""
52+
Initializes the manager. If a handle is provided, it initializes in
53+
proxy mode; otherwise, it starts a new manager.
54+
"""
55+
if handle:
56+
# --- PROXY MODE INITIALIZATION ---
57+
# This instance is a client wrapping objects from an existing server.
58+
self._manager = None # Proxies do not own the manager process.
59+
self._shared_dict = handle._shared_dict
60+
self._lock = handle._lock
61+
else:
62+
# --- MAIN MODE INITIALIZATION ---
63+
# This instance owns the manager and its shared objects.
64+
self._manager = mp.Manager()
65+
self._shared_dict = self._manager.dict(initial_context or {})
66+
self._lock = self._manager.Lock()
67+
68+
# Thread-local storage for lock state to handle concurrent access
69+
self._local = threading.local()
70+
71+
def _lock_context(self) -> None:
72+
"""Acquire the lock for this context manager."""
73+
if not getattr(self._local, "is_locked", False):
74+
self._lock.acquire()
75+
self._local.is_locked = True
76+
77+
def _unlock_context(self) -> None:
78+
"""Release the lock for this context manager."""
79+
if getattr(self._local, "is_locked", False):
80+
self._lock.release()
81+
self._local.is_locked = False
82+
83+
def _execute_locked(self, operation: Callable[[], R]) -> R:
84+
"""A private helper to execute an operation within a lock."""
85+
if not getattr(self._local, "is_locked", False):
86+
self._lock_context()
87+
try:
88+
return operation()
89+
finally:
90+
self._unlock_context()
91+
else:
92+
return operation()
93+
94+
def get_handle(self) -> ParallelContextHandle:
95+
"""
96+
Returns a picklable handle containing the shared dict and lock.
97+
Only the main instance can generate handles.
98+
"""
99+
if not self._manager:
100+
raise TypeError("Cannot get a handle from a proxy context instance.")
101+
102+
return ParallelContextHandle(self._shared_dict, self._lock)
103+
104+
def shutdown(self) -> None:
105+
"""
106+
Shuts down the background manager process.
107+
This is a no-op for proxy instances.
108+
"""
109+
if self._manager:
110+
self._manager.shutdown()
111+
112+
def __enter__(self) -> "ParallelContextManager":
113+
"""Acquires the lock for use in a 'with' statement."""
114+
self._lock_context()
115+
return self
116+
117+
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
118+
"""Releases the lock."""
119+
self._unlock_context()
120+
121+
def __getitem__(self, key: str) -> Any:
122+
return self._shared_dict[key]
123+
124+
def __setitem__(self, key: str, value: Any) -> None:
125+
self._execute_locked(lambda: self._shared_dict.__setitem__(key, value))
126+
127+
def __delitem__(self, key: str) -> None:
128+
self._execute_locked(lambda: self._shared_dict.__delitem__(key))
129+
130+
def __iter__(self) -> Iterator[str]:
131+
# Iteration needs to copy the keys to be safe across processes
132+
return self._execute_locked(lambda: iter(list(self._shared_dict.keys())))
133+
134+
def __len__(self) -> int:
135+
return self._execute_locked(lambda: len(self._shared_dict))
136+
137+
def to_dict(self) -> dict[str, Any]:
138+
return self._execute_locked(lambda: dict(self._shared_dict))

laygo/context/simple.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
"""
2+
A simple, dictionary-based context manager for single-process pipelines.
3+
"""
4+
5+
from collections.abc import Iterator
6+
from typing import Any
7+
8+
from laygo.context.types import IContextHandle
9+
from laygo.context.types import IContextManager
10+
11+
12+
class SimpleContextHandle(IContextHandle):
13+
"""
14+
A handle for the SimpleContextManager that provides a reference back to the
15+
original manager instance.
16+
17+
In a single-process environment, the "proxy" is the manager itself, ensuring
18+
all transformers in a chain share the exact same context dictionary.
19+
"""
20+
21+
def __init__(self, manager_instance: "IContextManager"):
22+
self._manager_instance = manager_instance
23+
24+
def create_proxy(self) -> "IContextManager":
25+
"""
26+
Returns the original SimpleContextManager instance.
27+
28+
This ensures that in a non-distributed pipeline, all chained transformers
29+
operate on the same shared dictionary.
30+
"""
31+
return self._manager_instance
32+
33+
34+
class SimpleContextManager(IContextManager):
35+
"""
36+
A basic context manager that uses a standard Python dictionary for state.
37+
38+
This manager is suitable for single-threaded, single-process pipelines where
39+
no state needs to be shared across process boundaries. It is the default
40+
context manager for a Laygo pipeline.
41+
"""
42+
43+
def __init__(self, initial_context: dict[str, Any] | None = None) -> None:
44+
"""
45+
Initializes the context manager with an optional dictionary.
46+
47+
Args:
48+
initial_context: An optional dictionary to populate the context with.
49+
"""
50+
self._context = dict(initial_context or {})
51+
52+
def get_handle(self) -> IContextHandle:
53+
"""
54+
Returns a handle that holds a reference back to this same instance.
55+
"""
56+
return SimpleContextHandle(self)
57+
58+
def __enter__(self) -> "SimpleContextManager":
59+
"""
60+
Provides 'with' statement compatibility. No lock is needed for this
61+
simple, single-threaded context manager.
62+
"""
63+
return self
64+
65+
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
66+
"""
67+
Provides 'with' statement compatibility. No lock is needed for this
68+
simple, single-threaded context manager.
69+
"""
70+
pass
71+
72+
def __getitem__(self, key: str) -> Any:
73+
return self._context[key]
74+
75+
def __setitem__(self, key: str, value: Any) -> None:
76+
self._context[key] = value
77+
78+
def __delitem__(self, key: str) -> None:
79+
del self._context[key]
80+
81+
def __iter__(self) -> Iterator[str]:
82+
return iter(self._context)
83+
84+
def __len__(self) -> int:
85+
return len(self._context)
86+
87+
def shutdown(self) -> None:
88+
"""No-op for the simple context manager."""
89+
pass
90+
91+
def to_dict(self) -> dict[str, Any]:
92+
"""
93+
Returns a copy of the entire context as a standard Python dictionary.
94+
95+
This operation is performed atomically to ensure consistency.
96+
"""
97+
return self._context

laygo/context/types.py

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
"""
2+
Defines the abstract base classes for context management in Laygo.
3+
4+
This module provides the core interfaces (IContextHandle and IContextManager)
5+
that all context managers must implement, ensuring a consistent API for
6+
state management across different execution environments (simple, threaded, parallel).
7+
"""
8+
9+
from abc import ABC
10+
from abc import abstractmethod
11+
from collections.abc import MutableMapping
12+
from typing import Any
13+
14+
15+
class IContextHandle(ABC):
16+
"""
17+
An abstract base class for a picklable handle to a context manager.
18+
19+
A handle contains the necessary information for a worker process to
20+
reconstruct a connection (a proxy) to the shared context.
21+
"""
22+
23+
@abstractmethod
24+
def create_proxy(self) -> "IContextManager":
25+
"""
26+
Creates the appropriate context proxy instance from the handle's data.
27+
28+
This method is called within a worker process to establish its own
29+
connection to the shared state.
30+
31+
Returns:
32+
An instance of an IContextManager proxy.
33+
"""
34+
raise NotImplementedError
35+
36+
37+
class IContextManager(MutableMapping[str, Any], ABC):
38+
"""
39+
Abstract base class for managing shared state (context) in a pipeline.
40+
41+
This class defines the contract for all context managers, ensuring they
42+
provide a dictionary-like interface for state manipulation by inheriting
43+
from `collections.abc.MutableMapping`. It also includes methods for
44+
distribution (get_handle), resource management (shutdown), and context
45+
management (__enter__, __exit__).
46+
"""
47+
48+
@abstractmethod
49+
def get_handle(self) -> IContextHandle:
50+
"""
51+
Returns a picklable handle for connecting from a worker process.
52+
53+
This handle is serialized and sent to distributed workers, which then
54+
use it to create a proxy to the shared context.
55+
56+
Returns:
57+
A picklable IContextHandle instance.
58+
"""
59+
raise NotImplementedError
60+
61+
@abstractmethod
62+
def shutdown(self) -> None:
63+
"""
64+
Performs final synchronization and cleans up any resources.
65+
66+
This method is responsible for releasing connections, shutting down
67+
background processes, or any other cleanup required by the manager.
68+
"""
69+
raise NotImplementedError
70+
71+
def __enter__(self) -> "IContextManager":
72+
"""
73+
Enters the runtime context related to this object.
74+
75+
Returns:
76+
The context manager instance itself.
77+
"""
78+
return self
79+
80+
def __exit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: Any) -> None:
81+
"""
82+
Exits the runtime context and performs cleanup.
83+
84+
Args:
85+
exc_type: The exception type, if an exception was raised.
86+
exc_val: The exception instance, if an exception was raised.
87+
exc_tb: The traceback object, if an exception was raised.
88+
"""
89+
self.shutdown()
90+
91+
def to_dict(self) -> dict[str, Any]:
92+
"""
93+
Returns a copy of the entire shared context as a standard
94+
Python dictionary.
95+
96+
This operation is performed atomically using a lock to ensure a
97+
consistent snapshot of the context is returned.
98+
99+
Returns:
100+
A standard dict containing a copy of the shared context.
101+
"""
102+
# The dict() constructor iterates over the proxy and copies its items.
103+
# The lock ensures this happens atomically without race conditions.
104+
raise NotImplementedError

laygo/errors.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
from collections.abc import Callable
22

3-
from laygo.helpers import PipelineContext
3+
from laygo.context.types import IContextManager
44

5-
ChunkErrorHandler = Callable[[list, Exception, PipelineContext], None]
5+
ChunkErrorHandler = Callable[[list, Exception, IContextManager], None]
66

77

8-
def raise_error(chunk: list, error: Exception, context: PipelineContext) -> None:
8+
def raise_error(chunk: list, error: Exception, context: IContextManager) -> None:
99
"""Handler that always re-raises the error, stopping execution.
1010
1111
This is a default error handler that provides fail-fast behavior by
@@ -47,7 +47,7 @@ def on_error(self, handler: ChunkErrorHandler) -> "ErrorHandler":
4747
self._handlers.insert(0, handler)
4848
return self
4949

50-
def handle(self, chunk: list, error: Exception, context: PipelineContext) -> None:
50+
def handle(self, chunk: list, error: Exception, context: IContextManager) -> None:
5151
"""Execute all handlers in the chain.
5252
5353
Handlers are executed in reverse order of addition. Execution stops

0 commit comments

Comments
 (0)