-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtypes.py
More file actions
104 lines (81 loc) · 3.21 KB
/
types.py
File metadata and controls
104 lines (81 loc) · 3.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
"""
Defines the abstract base classes for context management in Laygo.
This module provides the core interfaces (IContextHandle and IContextManager)
that all context managers must implement, ensuring a consistent API for
state management across different execution environments (simple, threaded, parallel).
"""
from abc import ABC
from abc import abstractmethod
from collections.abc import MutableMapping
from typing import Any
class IContextHandle(ABC):
"""
An abstract base class for a picklable handle to a context manager.
A handle contains the necessary information for a worker process to
reconstruct a connection (a proxy) to the shared context.
"""
@abstractmethod
def create_proxy(self) -> "IContextManager":
"""
Creates the appropriate context proxy instance from the handle's data.
This method is called within a worker process to establish its own
connection to the shared state.
Returns:
An instance of an IContextManager proxy.
"""
raise NotImplementedError
class IContextManager(MutableMapping[str, Any], ABC):
"""
Abstract base class for managing shared state (context) in a pipeline.
This class defines the contract for all context managers, ensuring they
provide a dictionary-like interface for state manipulation by inheriting
from `collections.abc.MutableMapping`. It also includes methods for
distribution (get_handle), resource management (shutdown), and context
management (__enter__, __exit__).
"""
@abstractmethod
def get_handle(self) -> IContextHandle:
"""
Returns a picklable handle for connecting from a worker process.
This handle is serialized and sent to distributed workers, which then
use it to create a proxy to the shared context.
Returns:
A picklable IContextHandle instance.
"""
raise NotImplementedError
@abstractmethod
def shutdown(self) -> None:
"""
Performs final synchronization and cleans up any resources.
This method is responsible for releasing connections, shutting down
background processes, or any other cleanup required by the manager.
"""
raise NotImplementedError
def __enter__(self) -> "IContextManager":
"""
Enters the runtime context related to this object.
Returns:
The context manager instance itself.
"""
return self
def __exit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: Any) -> None:
"""
Exits the runtime context and performs cleanup.
Args:
exc_type: The exception type, if an exception was raised.
exc_val: The exception instance, if an exception was raised.
exc_tb: The traceback object, if an exception was raised.
"""
self.shutdown()
def to_dict(self) -> dict[str, Any]:
"""
Returns a copy of the entire shared context as a standard
Python dictionary.
This operation is performed atomically using a lock to ensure a
consistent snapshot of the context is returned.
Returns:
A standard dict containing a copy of the shared context.
"""
# The dict() constructor iterates over the proxy and copies its items.
# The lock ensures this happens atomically without race conditions.
raise NotImplementedError