|
| 1 | +""" |
| 2 | +A context manager for parallel and distributed processing using |
| 3 | +multiprocessing.Manager to share state across processes. |
| 4 | +""" |
| 5 | + |
| 6 | +from collections.abc import Iterator |
| 7 | +import multiprocessing as mp |
| 8 | +from multiprocessing.managers import BaseManager |
| 9 | +from multiprocessing.managers import DictProxy |
| 10 | +from multiprocessing.synchronize import Lock |
| 11 | +from typing import Any |
| 12 | + |
| 13 | +from laygo.context.types import IContextHandle |
| 14 | +from laygo.context.types import IContextManager |
| 15 | + |
| 16 | + |
| 17 | +class _ParallelStateManager(BaseManager): |
| 18 | + """A custom manager to expose a shared dictionary and lock.""" |
| 19 | + |
| 20 | + pass |
| 21 | + |
| 22 | + |
| 23 | +class ParallelContextHandle(IContextHandle): |
| 24 | + """ |
| 25 | + A lightweight, picklable "blueprint" for recreating a connection to the |
| 26 | + shared context in a different process. |
| 27 | + """ |
| 28 | + |
| 29 | + def __init__(self, address: tuple[str, int], manager_class: type["ParallelContextManager"]): |
| 30 | + self.address = address |
| 31 | + self.manager_class = manager_class |
| 32 | + |
| 33 | + def create_proxy(self) -> "IContextManager": |
| 34 | + """ |
| 35 | + Creates a new instance of the ParallelContextManager in "proxy" mode |
| 36 | + by initializing it with this handle. |
| 37 | + """ |
| 38 | + return self.manager_class(handle=self) |
| 39 | + |
| 40 | + |
| 41 | +class ParallelContextManager(IContextManager): |
| 42 | + """ |
| 43 | + A context manager that uses a background multiprocessing.Manager to enable |
| 44 | + state sharing across different processes. |
| 45 | +
|
| 46 | + This single class operates in two modes: |
| 47 | + 1. Server Mode (when created normally): It starts and manages the background |
| 48 | + server process that holds the shared state. |
| 49 | + 2. Proxy Mode (when created with a handle): It acts as a client, connecting |
| 50 | + to an existing server process to manipulate the shared state. |
| 51 | + """ |
| 52 | + |
| 53 | + def __init__(self, initial_context: dict[str, Any] | None = None, handle: ParallelContextHandle | None = None): |
| 54 | + """ |
| 55 | + Initializes the manager. If a handle is provided, it initializes in |
| 56 | + proxy mode; otherwise, it starts a new server. |
| 57 | + """ |
| 58 | + if handle: |
| 59 | + # --- PROXY MODE INITIALIZATION --- |
| 60 | + # This instance is a client connecting to an existing server. |
| 61 | + self._is_proxy = True |
| 62 | + self._manager_server = None # Proxies do not own the server process. |
| 63 | + |
| 64 | + manager = _ParallelStateManager(address=handle.address) |
| 65 | + manager.connect() |
| 66 | + self._manager = manager |
| 67 | + |
| 68 | + else: |
| 69 | + # --- SERVER MODE INITIALIZATION --- |
| 70 | + # This is the main instance that owns the server process. |
| 71 | + self._is_proxy = False |
| 72 | + manager = mp.Manager() # type: ignore |
| 73 | + _ParallelStateManager.register("get_dict", callable=lambda: manager.dict(initial_context or {})) |
| 74 | + _ParallelStateManager.register("get_lock", callable=lambda: manager.Lock()) |
| 75 | + |
| 76 | + self._manager_server = _ParallelStateManager(address=("", 0)) |
| 77 | + self._manager_server.start() |
| 78 | + self._manager = self._manager_server |
| 79 | + |
| 80 | + # Common setup for both modes |
| 81 | + self._shared_dict: DictProxy = self._manager.get_dict() # type: ignore |
| 82 | + self._lock: Lock = self._manager.get_lock() # type: ignore |
| 83 | + |
| 84 | + def get_handle(self) -> ParallelContextHandle: |
| 85 | + """ |
| 86 | + Returns a picklable handle for reconstruction in a worker. |
| 87 | + Only the main server instance can generate handles. |
| 88 | + """ |
| 89 | + if self._is_proxy or not self._manager_server: |
| 90 | + raise TypeError("Cannot get a handle from a proxy context instance.") |
| 91 | + |
| 92 | + return ParallelContextHandle( |
| 93 | + address=self._manager_server.address, # type: ignore |
| 94 | + manager_class=self.__class__, # Pass its own class for reconstruction |
| 95 | + ) |
| 96 | + |
| 97 | + def shutdown(self) -> None: |
| 98 | + """ |
| 99 | + Shuts down the background manager process. |
| 100 | + This is a no-op for proxy instances, as only the main instance |
| 101 | + should control the server's lifecycle. |
| 102 | + """ |
| 103 | + if not self._is_proxy and self._manager_server: |
| 104 | + self._manager_server.shutdown() |
| 105 | + |
| 106 | + def __enter__(self) -> "ParallelContextManager": |
| 107 | + """Acquires the lock for use in a 'with' statement.""" |
| 108 | + self._lock.acquire() |
| 109 | + return self |
| 110 | + |
| 111 | + def __exit__(self, exc_type, exc_val, exc_tb) -> None: |
| 112 | + """Releases the lock.""" |
| 113 | + self._lock.release() |
| 114 | + |
| 115 | + def __getitem__(self, key: str) -> Any: |
| 116 | + with self._lock: |
| 117 | + return self._shared_dict[key] |
| 118 | + |
| 119 | + def __setitem__(self, key: str, value: Any) -> None: |
| 120 | + with self._lock: |
| 121 | + self._shared_dict[key] = value |
| 122 | + |
| 123 | + def __delitem__(self, key: str) -> None: |
| 124 | + with self._lock: |
| 125 | + del self._shared_dict[key] |
| 126 | + |
| 127 | + def __iter__(self) -> Iterator[str]: |
| 128 | + with self._lock: |
| 129 | + return iter(list(self._shared_dict.keys())) |
| 130 | + |
| 131 | + def __len__(self) -> int: |
| 132 | + with self._lock: |
| 133 | + return len(self._shared_dict) |
0 commit comments