Skip to content

Commit

Permalink
Revert "docs: add types to recipe.lock"
Browse files Browse the repository at this point in the history
This reverts commit 914f68c.
  • Loading branch information
a-ungurianu committed Jan 28, 2023
1 parent 87937a3 commit 1ee35e3
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 98 deletions.
2 changes: 1 addition & 1 deletion kazoo/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -919,7 +919,7 @@ def create(
sequence=False,
makepath=False,
include_data=False,
) -> str:
):
"""Create a node with the given value as its data. Optionally
set an ACL on the node.
Expand Down
3 changes: 1 addition & 2 deletions kazoo/protocol/states.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
"""Kazoo State and Event objects"""
from collections import namedtuple
from enum import Enum


class KazooState(str, Enum):
class KazooState(object):
"""High level connection state values
States inspired by Netflix Curator.
Expand Down
138 changes: 44 additions & 94 deletions kazoo/recipe/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,8 @@
and/or the lease has been lost.
"""
from __future__ import annotations

import re
import time
from typing import TYPE_CHECKING, cast, ContextManager
import uuid

from kazoo.exceptions import (
Expand All @@ -34,37 +31,24 @@
RetryFailedError,
)

if TYPE_CHECKING:
from kazoo.client import KazooClient
from kazoo.protocol.states import WatchedEvent
from typing import (
Any,
List,
Optional,
Sequence,
Type,
Union,
)
from typing_extensions import Literal


class _Watch(object):
def __init__(self, duration: Optional[float] = None):
def __init__(self, duration=None):
self.duration = duration
self.started_at: Optional[float] = None
self.started_at = None

def start(self) -> None:
def start(self):
self.started_at = time.monotonic()

def leftover(self) -> Optional[float]:
def leftover(self):
if self.duration is None:
return None
else:
elapsed = time.monotonic() - cast(float, self.started_at)
elapsed = time.monotonic() - self.started_at
return max(0, self.duration - elapsed)


class Lock(ContextManager[None]):
class Lock(object):
"""Kazoo Lock
Example usage with a :class:`~kazoo.client.KazooClient` instance:
Expand Down Expand Up @@ -93,13 +77,7 @@ class Lock(ContextManager[None]):
# sequence number. Involved in read/write locks.
_EXCLUDE_NAMES = ["__lock__"]

def __init__(
self,
client: KazooClient,
path: str,
identifier: Optional[str] = None,
extra_lock_patterns: Sequence[str] = (),
):
def __init__(self, client, path, identifier=None, extra_lock_patterns=()):
"""Create a Kazoo lock.
:param client: A :class:`~kazoo.client.KazooClient` instance.
Expand Down Expand Up @@ -131,7 +109,7 @@ def __init__(
# some data is written to the node. this can be queried via
# contenders() to see who is contending for the lock
self.data = str(identifier or "").encode("utf-8")
self.node: Optional[str] = None
self.node = None

self.wake_event = client.handler.event_object()

Expand All @@ -147,25 +125,20 @@ def __init__(
self.assured_path = False
self.cancelled = False
self._retry = KazooRetry(
max_tries=-1, sleep_func=client.handler.sleep_func
max_tries=None, sleep_func=client.handler.sleep_func
)
self._acquire_method_lock = client.handler.lock_object()

def _ensure_path(self) -> None:
def _ensure_path(self):
self.client.ensure_path(self.path)
self.assured_path = True

def cancel(self) -> None:
def cancel(self):
"""Cancel a pending lock acquire."""
self.cancelled = True
self.wake_event.set()

def acquire(
self,
blocking: bool = True,
timeout: Optional[float] = None,
ephemeral: bool = True,
) -> bool:
def acquire(self, blocking=True, timeout=None, ephemeral=True):
"""
Acquire the lock. By defaults blocks and waits forever.
Expand Down Expand Up @@ -231,13 +204,11 @@ def acquire(
finally:
self._acquire_method_lock.release()

def _watch_session(self, state: KazooState) -> bool:
def _watch_session(self, state):
self.wake_event.set()
return True

def _inner_acquire(
self, blocking: bool, timeout: Optional[float], ephemeral: bool = True
) -> bool:
def _inner_acquire(self, blocking, timeout, ephemeral=True):

# wait until it's our chance to get it..
if self.is_acquired:
Expand Down Expand Up @@ -295,10 +266,10 @@ def _inner_acquire(
finally:
self.client.remove_listener(self._watch_session)

def _watch_predecessor(self, event: WatchedEvent) -> None:
def _watch_predecessor(self, event):
self.wake_event.set()

def _get_predecessor(self, node: str) -> Optional[str]:
def _get_predecessor(self, node):
"""returns `node`'s predecessor or None
Note: This handle the case where the current lock is not a contender
Expand All @@ -307,7 +278,7 @@ def _get_predecessor(self, node: str) -> Optional[str]:
"""
node_sequence = node[len(self.prefix) :]
children = self.client.get_children(self.path)
found_self: Union[Literal[False], re.Match[bytes]] = False
found_self = False
# Filter out the contenders using the computed regex
contender_matches = []
for child in children:
Expand All @@ -322,7 +293,7 @@ def _get_predecessor(self, node: str) -> Optional[str]:
if child == node:
# Remember the node's match object so we can short circuit
# below.
found_self = cast(re.Match[bytes], match)
found_self = match

if found_self is False: # pragma: nocover
# somehow we aren't in the childrens -- probably we are
Expand All @@ -338,42 +309,42 @@ def _get_predecessor(self, node: str) -> Optional[str]:
sorted_matches = sorted(contender_matches, key=lambda m: m.groups())
return sorted_matches[-1].string

def _find_node(self) -> Optional[str]:
def _find_node(self):
children = self.client.get_children(self.path)
for child in children:
if child.startswith(self.prefix):
return child
return None

def _delete_node(self, node: str) -> None:
def _delete_node(self, node):
self.client.delete(self.path + "/" + node)

def _best_effort_cleanup(self) -> None:
def _best_effort_cleanup(self):
try:
node = self.node or self._find_node()
if node:
self._delete_node(node)
except KazooException: # pragma: nocover
pass

def release(self) -> bool:
def release(self):
"""Release the lock immediately."""
return self.client.retry(self._inner_release)

def _inner_release(self) -> bool:
def _inner_release(self):
if not self.is_acquired:
return False

try:
self._delete_node(cast(str, self.node))
self._delete_node(self.node)
except NoNodeError: # pragma: nocover
pass

self.is_acquired = False
self.node = None
return True

def contenders(self) -> List[str]:
def contenders(self):
"""Return an ordered list of the current contenders for the
lock.
Expand Down Expand Up @@ -409,7 +380,7 @@ def contenders(self) -> List[str]:
for match in sorted(contender_matches, key=lambda m: m.groups())
]
# Retrieve all the contender nodes data (preserving order).
contenders: List[str] = []
contenders = []
for node in contender_nodes:
try:
data, stat = self.client.get(self.path + "/" + node)
Expand All @@ -420,15 +391,10 @@ def contenders(self) -> List[str]:

return contenders

def __enter__(self) -> None:
def __enter__(self):
self.acquire()

def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Any,
) -> None:
def __exit__(self, exc_type, exc_value, traceback):
self.release()


Expand Down Expand Up @@ -492,7 +458,7 @@ class ReadLock(Lock):
_EXCLUDE_NAMES = ["__lock__"]


class Semaphore(ContextManager[None]):
class Semaphore(object):
"""A Zookeeper-based Semaphore
This synchronization primitive operates in the same manner as the
Expand Down Expand Up @@ -527,13 +493,7 @@ class Semaphore(ContextManager[None]):
"""

def __init__(
self,
client: KazooClient,
path: str,
identifier: Optional[str] = None,
max_leases: int = 1,
):
def __init__(self, client, path, identifier=None, max_leases=1):
"""Create a Kazoo Lock
:param client: A :class:`~kazoo.client.KazooClient` instance.
Expand Down Expand Up @@ -569,7 +529,7 @@ def __init__(
self.cancelled = False
self._session_expired = False

def _ensure_path(self) -> None:
def _ensure_path(self):
result = self.client.ensure_path(self.path)
self.assured_path = True
if result is True:
Expand All @@ -590,14 +550,12 @@ def _ensure_path(self) -> None:
else:
self.client.set(self.path, str(self.max_leases).encode("utf-8"))

def cancel(self) -> None:
def cancel(self):
"""Cancel a pending semaphore acquire."""
self.cancelled = True
self.wake_event.set()

def acquire(
self, blocking: bool = True, timeout: Optional[float] = None
) -> bool:
def acquire(self, blocking=True, timeout=None):
"""Acquire the semaphore. By defaults blocks and waits forever.
:param blocking: Block until semaphore is obtained or
Expand Down Expand Up @@ -635,9 +593,7 @@ def acquire(

return self.is_acquired

def _inner_acquire(
self, blocking: bool, timeout: Optional[float] = None
) -> bool:
def _inner_acquire(self, blocking, timeout=None):
"""Inner loop that runs from the top anytime a command hits a
retryable Zookeeper exception."""
self._session_expired = False
Expand Down Expand Up @@ -678,10 +634,10 @@ def _inner_acquire(
finally:
lock.release()

def _watch_lease_change(self, event: WatchedEvent) -> None:
def _watch_lease_change(self, event):
self.wake_event.set()

def _get_lease(self) -> bool:
def _get_lease(self, data=None):
# Make sure the session is still valid
if self._session_expired:
raise ForceRetryError("Retry on session loss at top")
Expand Down Expand Up @@ -710,26 +666,25 @@ def _get_lease(self) -> bool:
# Return current state
return self.is_acquired

def _watch_session(self, state: KazooState) -> Optional[Literal[True]]:
def _watch_session(self, state):
if state == KazooState.LOST:
self._session_expired = True
self.wake_event.set()

# Return true to de-register
return True
return None

def _best_effort_cleanup(self) -> None:
def _best_effort_cleanup(self):
try:
self.client.delete(self.create_path)
except KazooException: # pragma: nocover
pass

def release(self) -> bool:
def release(self):
"""Release the lease immediately."""
return self.client.retry(self._inner_release)

def _inner_release(self) -> bool:
def _inner_release(self):
if not self.is_acquired:
return False
try:
Expand All @@ -739,7 +694,7 @@ def _inner_release(self) -> bool:
self.is_acquired = False
return True

def lease_holders(self) -> List[str]:
def lease_holders(self):
"""Return an unordered list of the current lease holders.
.. note::
Expand All @@ -753,7 +708,7 @@ def lease_holders(self) -> List[str]:

children = self.client.get_children(self.path)

lease_holders: List[str] = []
lease_holders = []
for child in children:
try:
data, stat = self.client.get(self.path + "/" + child)
Expand All @@ -762,13 +717,8 @@ def lease_holders(self) -> List[str]:
pass
return lease_holders

def __enter__(self) -> None:
def __enter__(self):
self.acquire()

def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Any,
) -> None:
def __exit__(self, exc_type, exc_value, traceback):
self.release()
Loading

0 comments on commit 1ee35e3

Please sign in to comment.