Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
cf1e660
AtomicLong for asyncio
yuce Jun 2, 2026
5a7560e
Merge branch 'master' into asyncio-atomic-long
yuce Jun 2, 2026
c493901
AtomicReference for Asyncio
yuce Jun 2, 2026
4de45d1
Added CountdownLatch for Asyncio
yuce Jun 3, 2026
aac1fc3
black
yuce Jun 3, 2026
d3b6199
CountdownLatch in public API
yuce Jun 3, 2026
09d99a0
Merge branch 'master' into asyncio-atomic-long
yuce Jun 3, 2026
b0401e2
Merge branch 'asyncio-atomic-long' into asyncio-atomic-ref
yuce Jun 3, 2026
9019094
AtomicReference in public API
yuce Jun 3, 2026
d318598
Merge branch 'asyncio-atomic-ref' into asyncio-countdown-latch
yuce Jun 3, 2026
6c6b6c8
black
yuce Jun 3, 2026
18a4c4d
review comment
yuce Jun 3, 2026
1a40724
Merge branch 'asyncio-atomic-long' into asyncio-atomic-ref
yuce Jun 3, 2026
166d0e6
Merge branch 'asyncio-atomic-ref' into asyncio-countdown-latch
yuce Jun 3, 2026
9e2d0ea
update
yuce Jun 3, 2026
ba5110b
Merge branch 'asyncio-atomic-ref' into asyncio-countdown-latch
yuce Jun 3, 2026
4a9126a
Ported Semaphore to asyncio
yuce Jun 5, 2026
29a6e29
Merge branch 'master' into asyncio-semaphore
yuce Jun 5, 2026
8cd8554
Review comments
yuce Jun 5, 2026
520a38f
Portede the FencedLock proxy to asyncio
yuce Jun 8, 2026
5aa1fcd
Merge branch 'master' into asyncio-fenced-lock
yuce Jun 8, 2026
a293fd9
Trivial
yuce Jun 8, 2026
34dfdf7
Merge branch 'master' into asyncio-fenced-lock
yuce Jun 8, 2026
077a800
Merge branch 'master' into asyncio-fenced-lock
yuce Jun 11, 2026
e169bfe
Merge branch 'master' into asyncio-fenced-lock
yuce Jun 16, 2026
6af970d
Updated FencedLock with task ID
yuce Jun 16, 2026
3facd36
Merge branch 'master' into asyncio-fenced-lock
yuce Jun 17, 2026
4392c16
Merge branch 'master' into asyncio-fenced-lock
yuce Jun 17, 2026
8453658
Merge branch 'master' into asyncio-fenced-lock
yuce Jun 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions hazelcast/asyncio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"EntryEventCallable",
"FlakeIdGenerator",
"Executor",
"FencedLock",
"HazelcastClient",
"List",
"Map",
Expand Down Expand Up @@ -39,3 +40,4 @@
from hazelcast.internal.asyncio_proxy.atomic_reference import AtomicReference
from hazelcast.internal.asyncio_proxy.countdown_latch import CountDownLatch
from hazelcast.internal.asyncio_proxy.semaphore import Semaphore
from hazelcast.internal.asyncio_proxy.fenced_lock import FencedLock
40 changes: 40 additions & 0 deletions hazelcast/internal/asyncio_proxy/cp_manager.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
import asyncio

from hazelcast.cp import (
_without_default_group_name,
_get_object_name_for_proxy,
ATOMIC_LONG_SERVICE,
ATOMIC_REFERENCE_SERVICE,
COUNT_DOWN_LATCH_SERVICE,
SEMAPHORE_SERVICE,
LOCK_SERVICE,
)
from hazelcast.internal.asyncio_invocation import Invocation
from hazelcast.internal.asyncio_proxy.atomic_long import AtomicLong
from hazelcast.internal.asyncio_proxy.atomic_reference import AtomicReference
from hazelcast.internal.asyncio_proxy.countdown_latch import CountDownLatch
from hazelcast.internal.asyncio_proxy.fenced_lock import FencedLock
from hazelcast.internal.asyncio_proxy.semaphore import (
Semaphore,
SessionAwareSemaphore,
Expand Down Expand Up @@ -109,6 +113,25 @@ async def get_count_down_latch(self, name: str) -> CountDownLatch:
"""
return await self._proxy_manager.get_or_create(COUNT_DOWN_LATCH_SERVICE, name)

async def get_lock(self, name: str) -> FencedLock:
"""Returns the distributed FencedLock instance with given name.

The instance is created on CP Subsystem.

If no group name is given within the ``name`` argument, then the
FencedLock instance will be created on the DEFAULT CP group.
If a group name is given, like ``.get_lock("myLock@group1")``,
the given group will be initialized first, if not initialized
already, and then the instance will be created on this group.

Args:
name: Name of the FencedLock

Returns:
The FencedLock proxy for the given name.
"""
return await self._proxy_manager.get_or_create(LOCK_SERVICE, name)

async def get_semaphore(self, name: str) -> Semaphore:
"""Returns the distributed Semaphore instance with given name.

Expand All @@ -132,6 +155,8 @@ async def get_semaphore(self, name: str) -> Semaphore:
class CPProxyManager:
def __init__(self, context):
self._context = context
self._lock_proxies = dict() # proxy_name to FencedLock
self._mux = asyncio.Lock() # Guards the _lock_proxies

async def get_or_create(self, service_name, proxy_name):
proxy_name = _without_default_group_name(proxy_name)
Expand All @@ -144,11 +169,26 @@ async def get_or_create(self, service_name, proxy_name):
return AtomicReference(self._context, group_id, service_name, proxy_name, object_name)
elif service_name == COUNT_DOWN_LATCH_SERVICE:
return CountDownLatch(self._context, group_id, service_name, proxy_name, object_name)
elif service_name == LOCK_SERVICE:
return await self._create_fenced_lock(group_id, proxy_name, object_name)
elif service_name == SEMAPHORE_SERVICE:
return await self._create_semaphore(group_id, proxy_name, object_name)

raise ValueError("Unknown service name: %s" % service_name)

async def _create_fenced_lock(self, group_id, proxy_name, object_name):
async with self._mux:
proxy = self._lock_proxies.get(proxy_name, None)
if proxy:
if proxy.get_group_id() != group_id:
self._lock_proxies.pop(proxy_name, None)
else:
return proxy

proxy = FencedLock(self._context, group_id, LOCK_SERVICE, proxy_name, object_name)
self._lock_proxies[proxy_name] = proxy
return proxy

async def _create_semaphore(self, group_id, proxy_name, object_name):
codec = semaphore_get_semaphore_type_codec
request = codec.encode_request(proxy_name)
Expand Down
257 changes: 257 additions & 0 deletions hazelcast/internal/asyncio_proxy/fenced_lock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
import time
import uuid

from hazelcast.errors import (
LockOwnershipLostError,
IllegalMonitorStateError,
SessionExpiredError,
WaitKeyCancelledError,
LockAcquireLimitReachedError,
)
from hazelcast.internal.asyncio_proxy.base import task_id
from hazelcast.internal.asyncio_proxy.cp import SessionAwareCPProxy
from hazelcast.protocol.codec import (
fenced_lock_unlock_codec,
fenced_lock_get_lock_ownership_codec,
fenced_lock_try_lock_codec,
fenced_lock_lock_codec,
)
from hazelcast.proxy.cp.fenced_lock import _LockOwnershipState
from hazelcast.util import to_millis

_NO_SESSION_ID = -1


class FencedLock(SessionAwareCPProxy):
"""A linearizable, distributed lock.

FencedLock is CP with respect to the CAP principle. It works on top
of the Raft consensus algorithm. It offers linearizability during crash-stop
failures and network partitions. If a network partition occurs, it remains
available on at most one side of the partition.

FencedLock works on top of CP sessions. Please refer to CP Session
documentation section for more information.

By default, FencedLock is reentrant. Once a caller acquires
the lock, it can acquire the lock reentrantly as many times as it wants
in a linearizable manner. You can configure the reentrancy behaviour
on the member side. For instance, reentrancy can be disabled and
FencedLock can work as a non-reentrant mutex. One can also set
a custom reentrancy limit. When the reentrancy limit is reached,
FencedLock does not block a lock call. Instead, it fails with
``LockAcquireLimitReachedError`` or a specified return value.
Please check the locking methods to see details about the behaviour.
"""

INVALID_FENCE = 0

def __init__(self, context, group_id, service_name, proxy_name, object_name):
super(FencedLock, self).__init__(context, group_id, service_name, proxy_name, object_name)
self._lock_session_ids = dict() # thread-id to session id that has acquired the lock

async def lock(self) -> int:
invocation_uuid = uuid.uuid4()
return await self._do_lock(task_id(), invocation_uuid)

async def try_lock(self, timeout: float = 0) -> int:
invocation_uuid = uuid.uuid4()
timeout = max(0.0, timeout)
return await self._do_try_lock(task_id(), invocation_uuid, timeout)

async def unlock(self) -> None:
current_thread_id = task_id()
session_id = self._get_session_id()

# the order of the following checks is important
self._verify_locked_session_id_if_present(current_thread_id, session_id, False)
if session_id == _NO_SESSION_ID:
self._lock_session_ids.pop(current_thread_id, None)
raise self._new_illegal_monitor_state_error()

try:
still_locked_by_the_current_thread = await self._request_unlock(
session_id, current_thread_id, uuid.uuid4()
)
if still_locked_by_the_current_thread:
self._lock_session_ids[current_thread_id] = session_id
else:
self._lock_session_ids.pop(current_thread_id, None)

self._release_session(session_id)
except SessionExpiredError:
self._invalidate_session(session_id)
self._lock_session_ids.pop(current_thread_id, None)
raise self._new_lock_ownership_lost_error(session_id)
except IllegalMonitorStateError as e:
self._lock_session_ids.pop(current_thread_id, None)
raise e

async def is_locked(self) -> bool:
current_thread_id = task_id()
session_id = self._get_session_id()
self._verify_locked_session_id_if_present(current_thread_id, session_id, False)
f = await self._request_get_lock_ownership_state()
state = _LockOwnershipState(f)
if state.is_locked_by(session_id, current_thread_id):
self._lock_session_ids[current_thread_id] = session_id
return True

self._verify_no_locked_session_id_present(current_thread_id)
return state.is_locked()

async def is_locked_by_current_task(self) -> bool:
current_thread_id = task_id()
session_id = self._get_session_id()
self._verify_locked_session_id_if_present(current_thread_id, session_id, False)
f = await self._request_get_lock_ownership_state()
state = _LockOwnershipState(f)
locked_by_the_current_thread = state.is_locked_by(session_id, current_thread_id)
if locked_by_the_current_thread:
self._lock_session_ids[current_thread_id] = session_id
else:
self._verify_no_locked_session_id_present(current_thread_id)

return locked_by_the_current_thread

async def get_lock_count(self) -> int:
current_thread_id = task_id()
session_id = self._get_session_id()
self._verify_locked_session_id_if_present(current_thread_id, session_id, False)
f = await self._request_get_lock_ownership_state()
state = _LockOwnershipState(f)
if state.is_locked_by(session_id, current_thread_id):
self._lock_session_ids[current_thread_id] = session_id
else:
self._verify_no_locked_session_id_present(current_thread_id)

return state.lock_count

async def destroy(self) -> None:
self._lock_session_ids.clear()
return await super(FencedLock, self).destroy()

async def _do_lock(self, current_thread_id, invocation_uuid):
async def do_lock_once(session_id):
self._verify_locked_session_id_if_present(current_thread_id, session_id, True)

try:
fence = await self._request_lock(session_id, current_thread_id, invocation_uuid)
except SessionExpiredError:
self._invalidate_session(session_id)
self._verify_no_locked_session_id_present(current_thread_id)
return await self._do_lock(current_thread_id, invocation_uuid)
except WaitKeyCancelledError:
self._release_session(session_id)
raise IllegalMonitorStateError(
"Lock(%s) not acquired because the lock call on the CP group "
"is cancelled, possibly because of another indeterminate call "
"from the same task." % self._object_name
)
except Exception as e:
self._release_session(session_id)
raise e

if fence != self.INVALID_FENCE:
self._lock_session_ids[current_thread_id] = session_id
return fence

self._release_session(session_id)
raise LockAcquireLimitReachedError(
"Lock(%s) reentrant lock limit is already reached!" % self._object_name
)

session_id = await self._acquire_session()
return await do_lock_once(session_id)

async def _do_try_lock(self, current_thread_id, invocation_uuid, timeout):
start = time.time()

async def do_try_lock_once(session_id):
self._verify_locked_session_id_if_present(current_thread_id, session_id, True)

try:
fence = await self._request_try_lock(
session_id, current_thread_id, invocation_uuid, timeout
)
except SessionExpiredError:
self._invalidate_session(session_id)
self._verify_no_locked_session_id_present(current_thread_id)
remaining_timeout = timeout - (time.time() - start)
if remaining_timeout <= 0:
return self.INVALID_FENCE
return await self._do_try_lock(
current_thread_id, invocation_uuid, remaining_timeout
)
except WaitKeyCancelledError:
self._release_session(session_id)
return self.INVALID_FENCE
except Exception as e:
self._release_session(session_id)
raise e

if fence != self.INVALID_FENCE:
self._lock_session_ids[current_thread_id] = session_id
else:
self._release_session(session_id)

return fence

session_id = await self._acquire_session()
return await do_try_lock_once(session_id)

def _verify_locked_session_id_if_present(self, current_thread_id, session_id, release_session):
lock_session_id = self._lock_session_ids.get(current_thread_id, None)
if lock_session_id and lock_session_id != session_id:
self._lock_session_ids.pop(current_thread_id, None)
if release_session:
self._release_session(session_id)

raise self._new_lock_ownership_lost_error(lock_session_id)

def _verify_no_locked_session_id_present(self, current_thread_id):
lock_session_id = self._lock_session_ids.pop(current_thread_id, None)
if lock_session_id:
raise self._new_lock_ownership_lost_error(lock_session_id)

def _new_lock_ownership_lost_error(self, lock_session_id):
return LockOwnershipLostError(
"Current task is not the owner of the Lock(%s) because its "
"Session(%s) is closed by the server." % (self._proxy_name, lock_session_id)
)

def _new_illegal_monitor_state_error(self):
return IllegalMonitorStateError(
"Current task is not the owner of the Lock(%s)" % self._proxy_name
)

async def _request_lock(self, session_id, current_thread_id, invocation_uuid):
codec = fenced_lock_lock_codec
request = codec.encode_request(
self._group_id, self._object_name, session_id, current_thread_id, invocation_uuid
)
return await self._ainvoke(request, codec.decode_response)

async def _request_try_lock(self, session_id, current_thread_id, invocation_uuid, timeout):
codec = fenced_lock_try_lock_codec
request = codec.encode_request(
self._group_id,
self._object_name,
session_id,
current_thread_id,
invocation_uuid,
to_millis(timeout),
)
return await self._ainvoke(request, codec.decode_response)

async def _request_unlock(self, session_id, current_thread_id, invocation_uuid):
codec = fenced_lock_unlock_codec
request = codec.encode_request(
self._group_id, self._object_name, session_id, current_thread_id, invocation_uuid
)
return await self._ainvoke(request, codec.decode_response)

async def _request_get_lock_ownership_state(self):
codec = fenced_lock_get_lock_ownership_codec
request = codec.encode_request(self._group_id, self._object_name)
return await self._ainvoke(request, codec.decode_response)
Loading
Loading