Skip to content

Commit

Permalink
Add support for removeWatches
Browse files Browse the repository at this point in the history
Signed-off-by: Raul Gutierrez S <rgs@itevenworks.net>
  • Loading branch information
rgs1 committed Apr 9, 2014
1 parent 1576ccf commit 44ca48e
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 1 deletion.
42 changes: 42 additions & 0 deletions kazoo/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
Exists,
GetChildren,
GetChildren2,
RemoveWatches,
GetACL,
SetACL,
GetData,
Expand Down Expand Up @@ -1275,6 +1276,47 @@ def _delete_recursive(self, path):
except NoNodeError: # pragma: nocover
pass

def remove_watches(self, path, wtype):
"""Remove watches for a given path.
This call will remove the watch (or watches) set for path and
unregister all the related watchers.
If wtype is WatcherType.ANY, then both the watches for CHILDREN
and DATA events will be removed.
:param path: Path for which the watcher(s) will be removed.
:param wtype: The type of watcher to remove.
:type wtype: :class:`~kazoo.states.WatcherType`
:raises:
:exc:`~kazoo.exceptions.NoWatcherError` if the watcher doesn't
exist.
:exc:`ConnectionLoss` if there is no connection open.
:exc:`~kazoo.exceptions.ZookeeperError` if the server
returns a non-zero error code.
"""
return self.remove_watches_async(path, wtype).get()

def remove_watches_async(self, path, wtype):
"""Asynchronously remove watches for a path. Takes the same arguments as
:meth:`remove_watches`.
:rtype: :class:`~kazoo.interfaces.IAsyncResult`
"""
if not isinstance(path, basestring):
raise TypeError("path must be a string")
if not isinstance(wtype, int):
raise TypeError("wtype must be an int")
async_result = self.handler.async_result()
self._call(RemoveWatches(_prefix_root(self.chroot, path), wtype),
async_result)
return async_result


class TransactionRequest(object):
"""A Zookeeper Transaction Request
Expand Down
8 changes: 8 additions & 0 deletions kazoo/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,14 @@ class NotReadOnlyCallError(ZookeeperError):
a read-only server"""


@_zookeeper_exception(-123)
class NoWatcherError(ZookeeperError):
"""There was no watch for the given path
.. versionadded:: 1.4
"""


class ConnectionClosedError(SessionExpiredError):
"""Connection is closed"""

Expand Down
14 changes: 13 additions & 1 deletion kazoo/protocol/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
GetChildren,
Ping,
PingInstance,
RemoveWatches,
ReplyHeader,
Transaction,
Watch,
Expand All @@ -35,6 +36,7 @@
Callback,
KeeperState,
WatchedEvent,
WatcherType,
EVENT_TYPE_MAP,
)
from kazoo.retry import (
Expand Down Expand Up @@ -370,13 +372,23 @@ def _read_response(self, header, buffer, offset):

async_object.set(response)

# Determine if watchers should be registered
# Determine if watchers should be registered/unregistered
watcher = getattr(request, 'watcher', None)
if not client._stopped.is_set() and watcher:
if isinstance(request, GetChildren):
client._child_watchers[request.path].add(watcher)
else:
client._data_watchers[request.path].add(watcher)
elif isinstance(request, RemoveWatches):
print header
print "Removing watches for %s" % (request.path)
if request.wtype == WatcherType.CHILDREN:
client._child_watchers[request.path].clear()
elif request.wtype == WatcherType.DATA:
client._data_watchers[request.path].clear()
elif request.wtype == WatcherType.ANY:
client._child_watchers[request.path].clear()
client._data_watchers[request.path].clear()

if isinstance(request, Close):
self.logger.log(BLATHER, 'Read close response')
Expand Down
11 changes: 11 additions & 0 deletions kazoo/protocol/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,3 +394,14 @@ def deserialize(cls, bytes, offset):
t, done, err = multiheader_struct.unpack_from(bytes, offset)
offset += multiheader_struct.size
return cls(t, done is 1, err), offset


class RemoveWatches(namedtuple('Auth', 'path wtype')):
type = 17

def serialize(self):
return (write_string(self.path) + int_struct.pack(self.wtype))

@classmethod
def deserialize(self, bytes, offset):
return True
23 changes: 23 additions & 0 deletions kazoo/protocol/states.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,3 +235,26 @@ def data_length(self):
@property
def children_count(self):
return self.numChildren


class WatcherType(object):
"""Zookeeper Watcher Type
Represents the type of watcher, used when removing watchers.
.. attribute:: CHILDREN
Watcher for children related watches.
.. attribute:: DATA
Watcher for data related watches.
.. attribute:: ANY
Any type of watcher (CHILDREN and/or DATA).
"""
CHILDREN = 1
DATA = 2
ANY = 3
12 changes: 12 additions & 0 deletions kazoo/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,18 @@ def test_update_host_list(self):
finally:
self.cluster[0].run()

def test_remove_watches(self):
from kazoo.protocol.states import WatcherType

def watcher(event):
pass

self.client.create("/watches", b'first')

self.client.get("/watches", watcher)
rc = self.client.remove_watches("/watchesx", WatcherType.DATA)
self.assertTrue(rc)


dummy_dict = {
'aversion': 1, 'ctime': 0, 'cversion': 1,
Expand Down

0 comments on commit 44ca48e

Please sign in to comment.