diff --git a/kazoo/client.py b/kazoo/client.py index 5bd12a87..369ef3a9 100644 --- a/kazoo/client.py +++ b/kazoo/client.py @@ -34,6 +34,7 @@ Exists, GetChildren, GetChildren2, + RemoveWatches, GetACL, SetACL, GetData, @@ -1275,6 +1276,47 @@ def _delete_recursive(self, path): except NoNodeError: # pragma: nocover pass + def remove_watches(self, path, wtype): + """Remove watches for a given path. + + This call will remove the watch (or watches) set for path and + unregister all the related watchers. + + If wtype is WatcherType.ANY, then both the watches for CHILDREN + and DATA events will be removed. + + :param path: Path for which the watcher(s) will be removed. + :param wtype: The type of watcher to remove. + :type wtype: :class:`~kazoo.states.WatcherType` + + :raises: + :exc:`~kazoo.exceptions.NoWatcherError` if the watcher doesn't + exist. + + :exc:`ConnectionLoss` if there is no connection open. + + :exc:`~kazoo.exceptions.ZookeeperError` if the server + returns a non-zero error code. + + """ + return self.remove_watches_async(path, wtype).get() + + def remove_watches_async(self, path, wtype): + """Asynchronously remove watches for a path. Takes the same arguments as + :meth:`remove_watches`. + + :rtype: :class:`~kazoo.interfaces.IAsyncResult` + + """ + if not isinstance(path, basestring): + raise TypeError("path must be a string") + if not isinstance(wtype, int): + raise TypeError("wtype must be an int") + async_result = self.handler.async_result() + self._call(RemoveWatches(_prefix_root(self.chroot, path), wtype), + async_result) + return async_result + class TransactionRequest(object): """A Zookeeper Transaction Request diff --git a/kazoo/exceptions.py b/kazoo/exceptions.py index 8d4f0f34..bfeeb5aa 100644 --- a/kazoo/exceptions.py +++ b/kazoo/exceptions.py @@ -173,6 +173,14 @@ class NotReadOnlyCallError(ZookeeperError): a read-only server""" +@_zookeeper_exception(-123) +class NoWatcherError(ZookeeperError): + """There was no watch for the given path + + .. versionadded:: 1.4 + """ + + class ConnectionClosedError(SessionExpiredError): """Connection is closed""" diff --git a/kazoo/protocol/connection.py b/kazoo/protocol/connection.py index 650e1f67..98f7e7ce 100644 --- a/kazoo/protocol/connection.py +++ b/kazoo/protocol/connection.py @@ -26,6 +26,7 @@ GetChildren, Ping, PingInstance, + RemoveWatches, ReplyHeader, Transaction, Watch, @@ -35,6 +36,7 @@ Callback, KeeperState, WatchedEvent, + WatcherType, EVENT_TYPE_MAP, ) from kazoo.retry import ( @@ -370,13 +372,23 @@ def _read_response(self, header, buffer, offset): async_object.set(response) - # Determine if watchers should be registered + # Determine if watchers should be registered/unregistered watcher = getattr(request, 'watcher', None) if not client._stopped.is_set() and watcher: if isinstance(request, GetChildren): client._child_watchers[request.path].add(watcher) else: client._data_watchers[request.path].add(watcher) + elif isinstance(request, RemoveWatches): + print header + print "Removing watches for %s" % (request.path) + if request.wtype == WatcherType.CHILDREN: + client._child_watchers[request.path].clear() + elif request.wtype == WatcherType.DATA: + client._data_watchers[request.path].clear() + elif request.wtype == WatcherType.ANY: + client._child_watchers[request.path].clear() + client._data_watchers[request.path].clear() if isinstance(request, Close): self.logger.log(BLATHER, 'Read close response') diff --git a/kazoo/protocol/serialization.py b/kazoo/protocol/serialization.py index f44f49a3..98a96f19 100644 --- a/kazoo/protocol/serialization.py +++ b/kazoo/protocol/serialization.py @@ -394,3 +394,14 @@ def deserialize(cls, bytes, offset): t, done, err = multiheader_struct.unpack_from(bytes, offset) offset += multiheader_struct.size return cls(t, done is 1, err), offset + + +class RemoveWatches(namedtuple('Auth', 'path wtype')): + type = 17 + + def serialize(self): + return (write_string(self.path) + int_struct.pack(self.wtype)) + + @classmethod + def deserialize(self, bytes, offset): + return True diff --git a/kazoo/protocol/states.py b/kazoo/protocol/states.py index 395c013f..f4d1e750 100644 --- a/kazoo/protocol/states.py +++ b/kazoo/protocol/states.py @@ -235,3 +235,26 @@ def data_length(self): @property def children_count(self): return self.numChildren + + +class WatcherType(object): + """Zookeeper Watcher Type + + Represents the type of watcher, used when removing watchers. + + .. attribute:: CHILDREN + + Watcher for children related watches. + + .. attribute:: DATA + + Watcher for data related watches. + + .. attribute:: ANY + + Any type of watcher (CHILDREN and/or DATA). + + """ + CHILDREN = 1 + DATA = 2 + ANY = 3 diff --git a/kazoo/tests/test_client.py b/kazoo/tests/test_client.py index 374bbb07..3273e6bb 100644 --- a/kazoo/tests/test_client.py +++ b/kazoo/tests/test_client.py @@ -870,6 +870,18 @@ def test_update_host_list(self): finally: self.cluster[0].run() + def test_remove_watches(self): + from kazoo.protocol.states import WatcherType + + def watcher(event): + pass + + self.client.create("/watches", b'first') + + self.client.get("/watches", watcher) + rc = self.client.remove_watches("/watchesx", WatcherType.DATA) + self.assertTrue(rc) + dummy_dict = { 'aversion': 1, 'ctime': 0, 'cversion': 1,