Skip to content

Commit 625f7c3

Browse files
jeblairStephenSorriaux
authored andcommitted
feat: add support for persistent recursive watches
ZooKeeper 3.6.0 added support for persistent, and persistent recursive watches. This adds the corresponding support to the Kazoo client class.
1 parent 55f27b2 commit 625f7c3

File tree

6 files changed

+344
-8
lines changed

6 files changed

+344
-8
lines changed

kazoo/client.py

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from kazoo.protocol.connection import ConnectionHandler
2626
from kazoo.protocol.paths import _prefix_root, normpath
2727
from kazoo.protocol.serialization import (
28+
AddWatch,
2829
Auth,
2930
CheckVersion,
3031
CloseInstance,
@@ -38,6 +39,7 @@
3839
SetACL,
3940
GetData,
4041
Reconfig,
42+
RemoveWatches,
4143
SetData,
4244
Sync,
4345
Transaction,
@@ -248,6 +250,8 @@ def __init__(
248250
self.state_listeners = set()
249251
self._child_watchers = defaultdict(set)
250252
self._data_watchers = defaultdict(set)
253+
self._persistent_watchers = defaultdict(set)
254+
self._persistent_recursive_watchers = defaultdict(set)
251255
self._reset()
252256
self.read_only = read_only
253257

@@ -416,8 +420,16 @@ def _reset_watchers(self):
416420
for data_watchers in self._data_watchers.values():
417421
watchers.extend(data_watchers)
418422

423+
for persistent_watchers in self._persistent_watchers.values():
424+
watchers.extend(persistent_watchers)
425+
426+
for pr_watchers in self._persistent_recursive_watchers.values():
427+
watchers.extend(pr_watchers)
428+
419429
self._child_watchers = defaultdict(set)
420430
self._data_watchers = defaultdict(set)
431+
self._persistent_watchers = defaultdict(set)
432+
self._persistent_recursive_watchers = defaultdict(set)
421433

422434
ev = WatchedEvent(EventType.NONE, self._state, None)
423435
for watch in watchers:
@@ -1644,8 +1656,100 @@ def reconfig_async(self, joining, leaving, new_members, from_config):
16441656

16451657
return async_result
16461658

1659+
def add_watch(self, path, watch, mode):
1660+
"""Add a watch.
1661+
1662+
This method adds persistent watches. Unlike the data and
1663+
child watches which may be set by calls to
1664+
:meth:`KazooClient.exists`, :meth:`KazooClient.get`, and
1665+
:meth:`KazooClient.get_children`, persistent watches are not
1666+
removed after being triggered.
1667+
1668+
To remove a persistent watch, use
1669+
:meth:`KazooClient.remove_all_watches` with an argument of
1670+
:attr:`~kazoo.states.WatcherType.ANY`.
1671+
1672+
The `mode` argument determines whether or not the watch is
1673+
recursive. To set a persistent watch, use
1674+
:class:`~kazoo.states.AddWatchMode.PERSISTENT`. To set a
1675+
persistent recursive watch, use
1676+
:class:`~kazoo.states.AddWatchMode.PERSISTENT_RECURSIVE`.
1677+
1678+
:param path: Path of node to watch.
1679+
:param watch: Watch callback to set for future changes
1680+
to this path.
1681+
:param mode: The mode to use.
1682+
:type mode: int
1683+
1684+
:raises:
1685+
:exc:`~kazoo.exceptions.MarshallingError` if mode is
1686+
unknown.
1687+
1688+
:exc:`~kazoo.exceptions.ZookeeperError` if the server
1689+
returns a non-zero error code.
1690+
"""
1691+
return self.add_watch_async(path, watch, mode).get()
1692+
1693+
def add_watch_async(self, path, watch, mode):
1694+
"""Asynchronously add a watch. Takes the same arguments as
1695+
:meth:`add_watch`.
1696+
"""
1697+
if not isinstance(path, str):
1698+
raise TypeError("Invalid type for 'path' (string expected)")
1699+
if not callable(watch):
1700+
raise TypeError("Invalid type for 'watch' (must be a callable)")
1701+
if not isinstance(mode, int):
1702+
raise TypeError("Invalid type for 'mode' (int expected)")
1703+
1704+
async_result = self.handler.async_result()
1705+
self._call(
1706+
AddWatch(_prefix_root(self.chroot, path), watch, mode),
1707+
async_result,
1708+
)
1709+
return async_result
1710+
1711+
def remove_all_watches(self, path, watcher_type):
1712+
"""Remove watches from a path.
1713+
1714+
This removes all watches of a specified type (data, child,
1715+
any) from a given path.
1716+
1717+
The `watcher_type` argument specifies which type to use. It
1718+
may be one of:
1719+
1720+
* :attr:`~kazoo.states.WatcherType.DATA`
1721+
* :attr:`~kazoo.states.WatcherType.CHILD`
1722+
* :attr:`~kazoo.states.WatcherType.ANY`
1723+
1724+
To remove persistent watches, specify a watcher type of
1725+
:attr:`~kazoo.states.WatcherType.ANY`.
1726+
1727+
:param path: Path of watch to remove.
1728+
:param watcher_type: The type of watch to remove.
1729+
:type watcher_type: int
1730+
"""
1731+
1732+
return self.remove_all_watches_async(path, watcher_type).get()
1733+
1734+
def remove_all_watches_async(self, path, watcher_type):
1735+
"""Asynchronously remove watches. Takes the same arguments as
1736+
:meth:`remove_all_watches`.
1737+
"""
1738+
if not isinstance(path, str):
1739+
raise TypeError("Invalid type for 'path' (string expected)")
1740+
if not isinstance(watcher_type, int):
1741+
raise TypeError("Invalid type for 'watcher_type' (int expected)")
1742+
1743+
async_result = self.handler.async_result()
1744+
self._call(
1745+
RemoveWatches(_prefix_root(self.chroot, path), watcher_type),
1746+
async_result,
1747+
)
1748+
return async_result
1749+
16471750

16481751
class TransactionRequest(object):
1752+
16491753
"""A Zookeeper Transaction Request
16501754
16511755
A Transaction provides a builder object that can be used to

kazoo/exceptions.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,11 @@ class NotReadOnlyCallError(ZookeeperError):
187187
a read-only server"""
188188

189189

190+
@_zookeeper_exception(-121)
191+
class NoWatcherError(ZookeeperError):
192+
"""No watcher was found at the supplied path"""
193+
194+
190195
@_zookeeper_exception(-125)
191196
class QuotaExceededError(ZookeeperError):
192197
"""Exceeded the quota that was set on the path"""

kazoo/protocol/connection.py

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
)
2121
from kazoo.loggingsupport import BLATHER
2222
from kazoo.protocol.serialization import (
23+
AddWatch,
2324
Auth,
2425
Close,
2526
Connect,
@@ -28,13 +29,15 @@
2829
GetChildren2,
2930
Ping,
3031
PingInstance,
32+
RemoveWatches,
3133
ReplyHeader,
3234
SASL,
3335
Transaction,
3436
Watch,
3537
int_struct,
3638
)
3739
from kazoo.protocol.states import (
40+
AddWatchMode,
3841
Callback,
3942
KeeperState,
4043
WatchedEvent,
@@ -363,6 +366,18 @@ def _write(self, msg, timeout):
363366
raise ConnectionDropped("socket connection broken")
364367
sent += bytes_sent
365368

369+
def _find_persistent_recursive_watchers(self, path):
370+
parts = path.split("/")
371+
watchers = []
372+
for count in range(len(parts)):
373+
candidate = "/".join(parts[: count + 1])
374+
if not candidate:
375+
continue
376+
watchers.extend(
377+
self.client._persistent_recursive_watchers.get(candidate, [])
378+
)
379+
return watchers
380+
366381
def _read_watch_event(self, buffer, offset):
367382
client = self.client
368383
watch, offset = Watch.deserialize(buffer, offset)
@@ -374,9 +389,13 @@ def _read_watch_event(self, buffer, offset):
374389

375390
if watch.type in (CREATED_EVENT, CHANGED_EVENT):
376391
watchers.extend(client._data_watchers.pop(path, []))
392+
watchers.extend(client._persistent_watchers.get(path, []))
393+
watchers.extend(self._find_persistent_recursive_watchers(path))
377394
elif watch.type == DELETED_EVENT:
378395
watchers.extend(client._data_watchers.pop(path, []))
379396
watchers.extend(client._child_watchers.pop(path, []))
397+
watchers.extend(client._persistent_watchers.get(path, []))
398+
watchers.extend(self._find_persistent_recursive_watchers(path))
380399
elif watch.type == CHILD_EVENT:
381400
watchers.extend(client._child_watchers.pop(path, []))
382401
else:
@@ -448,13 +467,35 @@ def _read_response(self, header, buffer, offset):
448467

449468
async_object.set(response)
450469

451-
# Determine if watchers should be registered
452-
watcher = getattr(request, "watcher", None)
453-
if not client._stopped.is_set() and watcher:
454-
if isinstance(request, (GetChildren, GetChildren2)):
455-
client._child_watchers[request.path].add(watcher)
456-
else:
457-
client._data_watchers[request.path].add(watcher)
470+
# Determine if watchers should be registered or unregistered
471+
if not client._stopped.is_set():
472+
watcher = getattr(request, "watcher", None)
473+
if watcher:
474+
if isinstance(request, AddWatch):
475+
if request.mode == AddWatchMode.PERSISTENT:
476+
client._persistent_watchers[request.path].add(
477+
watcher
478+
)
479+
elif request.mode == AddWatchMode.PERSISTENT_RECURSIVE:
480+
client._persistent_recursive_watchers[
481+
request.path
482+
].add(watcher)
483+
elif isinstance(request, (GetChildren, GetChildren2)):
484+
client._child_watchers[request.path].add(watcher)
485+
else:
486+
client._data_watchers[request.path].add(watcher)
487+
if isinstance(request, RemoveWatches):
488+
if request.watcher_type == 1:
489+
client._child_watchers.pop(request.path, None)
490+
elif request.watcher_type == 2:
491+
client._data_watchers.pop(request.path, None)
492+
elif request.watcher_type == 3:
493+
client._child_watchers.pop(request.path, None)
494+
client._data_watchers.pop(request.path, None)
495+
client._persistent_watchers.pop(request.path, None)
496+
client._persistent_recursive_watchers.pop(
497+
request.path, None
498+
)
458499

459500
if isinstance(request, Close):
460501
self.logger.log(BLATHER, "Read close response")

kazoo/protocol/serialization.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,20 @@ def deserialize(cls, bytes, offset):
416416
return data, stat
417417

418418

419+
class RemoveWatches(namedtuple("RemoveWatches", "path watcher_type")):
420+
type = 18
421+
422+
def serialize(self):
423+
b = bytearray()
424+
b.extend(write_string(self.path))
425+
b.extend(int_struct.pack(self.watcher_type))
426+
return b
427+
428+
@classmethod
429+
def deserialize(cls, bytes, offset):
430+
return None
431+
432+
419433
class Auth(namedtuple("Auth", "auth_type scheme auth")):
420434
type = 100
421435

@@ -441,6 +455,20 @@ def deserialize(cls, bytes, offset):
441455
return challenge, offset
442456

443457

458+
class AddWatch(namedtuple("AddWatch", "path watcher mode")):
459+
type = 106
460+
461+
def serialize(self):
462+
b = bytearray()
463+
b.extend(write_string(self.path))
464+
b.extend(int_struct.pack(self.mode))
465+
return b
466+
467+
@classmethod
468+
def deserialize(cls, bytes, offset):
469+
return None
470+
471+
444472
class Watch(namedtuple("Watch", "type state path")):
445473
@classmethod
446474
def deserialize(cls, bytes, offset):

kazoo/protocol/states.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,3 +251,44 @@ def data_length(self):
251251
@property
252252
def children_count(self):
253253
return self.numChildren
254+
255+
256+
class AddWatchMode(object):
257+
"""Modes for use with :meth:`~kazoo.client.KazooClient.add_watch`
258+
259+
.. attribute:: PERSISTENT
260+
261+
The watch is not removed when trigged.
262+
263+
.. attribute:: PERSISTENT_RECURSIVE
264+
265+
The watch is not removed when trigged, and applies to all
266+
paths underneath the supplied path as well.
267+
"""
268+
269+
PERSISTENT = 0
270+
PERSISTENT_RECURSIVE = 1
271+
272+
273+
class WatcherType(object):
274+
"""Watcher types for use with
275+
:meth:`~kazoo.client.KazooClient.remove_all_watches`
276+
277+
.. attribute:: CHILDREN
278+
279+
Child watches.
280+
281+
.. attribute:: DATA
282+
283+
Data watches.
284+
285+
.. attribute:: ANY
286+
287+
Any type of watch (child, data, persistent, or persistent
288+
recursive).
289+
290+
"""
291+
292+
CHILDREN = 1
293+
DATA = 2
294+
ANY = 3

0 commit comments

Comments
 (0)