Skip to content

Commit 4634c04

Browse files
ceacheCharles-Henri de Boysson
authored andcommitted
feat(core): Add support for Container and TTL nodes
Also add support through transations. Closes #334, #496
1 parent e4f808f commit 4634c04

File tree

2 files changed

+192
-36
lines changed

2 files changed

+192
-36
lines changed

kazoo/client.py

Lines changed: 138 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
CloseInstance,
3333
Create,
3434
Create2,
35+
CreateContainer,
36+
CreateTTL,
3537
Delete,
3638
Exists,
3739
GetChildren,
@@ -873,7 +875,8 @@ def sync(self, path):
873875
return self.sync_async(path).get()
874876

875877
def create(self, path, value=b"", acl=None, ephemeral=False,
876-
sequence=False, makepath=False, include_data=False):
878+
sequence=False, makepath=False, include_data=False,
879+
container=False, ttl=0):
877880
"""Create a node with the given value as its data. Optionally
878881
set an ACL on the node.
879882
@@ -950,15 +953,19 @@ def create(self, path, value=b"", acl=None, ephemeral=False,
950953
The `makepath` option.
951954
.. versionadded:: 2.7
952955
The `include_data` option.
956+
.. versionadded:: 2.8
957+
The container and ttl options.
958+
953959
"""
954960
acl = acl or self.default_acl
955961
return self.create_async(
956962
path, value, acl=acl, ephemeral=ephemeral,
957-
sequence=sequence, makepath=makepath, include_data=include_data
958-
).get()
963+
sequence=sequence, makepath=makepath, include_data=include_data,
964+
container=container, ttl=ttl).get()
959965

960966
def create_async(self, path, value=b"", acl=None, ephemeral=False,
961-
sequence=False, makepath=False, include_data=False):
967+
sequence=False, makepath=False, include_data=False,
968+
container=False, ttl=0):
962969
"""Asynchronously create a ZNode. Takes the same arguments as
963970
:meth:`create`.
964971
@@ -967,7 +974,9 @@ def create_async(self, path, value=b"", acl=None, ephemeral=False,
967974
.. versionadded:: 1.1
968975
The makepath option.
969976
.. versionadded:: 2.7
970-
The `include_data` option.
977+
The include_data option.
978+
.. versionadded:: 2.8
979+
The container and ttl options.
971980
"""
972981
if acl is None and self.default_acl:
973982
acl = self.default_acl
@@ -988,24 +997,86 @@ def create_async(self, path, value=b"", acl=None, ephemeral=False,
988997
raise TypeError("Invalid type for 'makepath' (bool expected)")
989998
if not isinstance(include_data, bool):
990999
raise TypeError("Invalid type for 'include_data' (bool expected)")
991-
1000+
if not isinstance(container, bool):
1001+
raise TypeError("Invalid type for 'container' (bool expected)")
1002+
if not isinstance(ttl, int) or ttl < 0:
1003+
raise TypeError("Invalid 'ttl' (integer >= 0 expected)")
1004+
if ttl and ephemeral:
1005+
raise TypeError("Invalid node creation: ephemeral & ttl")
1006+
if container and (ephemeral or sequence or ttl):
1007+
raise TypeError("Invalid node creation: container & ephemeral/sequence/ttl")
1008+
1009+
# Should match Zookeeper's CreateMode fromFlag
1010+
# https://github.com/apache/zookeeper/blob/master/zookeeper-server/src/main/java/org/apache/zookeeper/CreateMode.java#L112
9921011
flags = 0
9931012
if ephemeral:
9941013
flags |= 1
9951014
if sequence:
9961015
flags |= 2
1016+
if container:
1017+
flags = 4
1018+
if ttl:
1019+
if sequence:
1020+
flags = 6
1021+
else:
1022+
flags = 5
1023+
9971024
if acl is None:
9981025
acl = OPEN_ACL_UNSAFE
9991026

1027+
# Figure out the OpCode we are going to send
1028+
if include_data:
1029+
stat_in_result = True
1030+
opcode = lambda path, value, acl: Create2(
1031+
_prefix_root(self.chroot, path, trailing=sequence),
1032+
value,
1033+
acl,
1034+
flags
1035+
)
1036+
elif container:
1037+
stat_in_result = True
1038+
opcode = lambda path, value, acl: CreateContainer(
1039+
_prefix_root(self.chroot, path, trailing=False),
1040+
value,
1041+
acl,
1042+
flags
1043+
)
1044+
elif ttl:
1045+
stat_in_result = True
1046+
opcode = lambda path, value, acl: CreateTTL(
1047+
_prefix_root(self.chroot, path, trailing=sequence),
1048+
value,
1049+
acl,
1050+
flags,
1051+
ttl
1052+
)
1053+
else:
1054+
stat_in_result = False
1055+
opcode = lambda path, value, acl: Create(
1056+
_prefix_root(self.chroot, path, trailing=sequence),
1057+
value,
1058+
acl,
1059+
flags
1060+
)
10001061
async_result = self.handler.async_result()
10011062

10021063
@capture_exceptions(async_result)
10031064
def do_create():
1004-
result = self._create_async_inner(
1005-
path, value, acl, flags,
1006-
trailing=sequence, include_data=include_data
1065+
inner_async_result = self.handler.async_result()
1066+
1067+
call_result = self._call(
1068+
opcode(path, value, acl),
1069+
inner_async_result
10071070
)
1008-
result.rawlink(create_completion)
1071+
if call_result is False:
1072+
# We hit a short-circuit exit on the _call. Because we are
1073+
# not using the original async_result here, we bubble the
1074+
# exception upwards to the do_create function in
1075+
# KazooClient.create so that it gets set on the correct
1076+
# async_result object
1077+
raise inner_async_result.exception
1078+
1079+
inner_async_result.rawlink(create_completion)
10091080

10101081
@capture_exceptions(async_result)
10111082
def retry_completion(result):
@@ -1015,7 +1086,7 @@ def retry_completion(result):
10151086
@wrap(async_result)
10161087
def create_completion(result):
10171088
try:
1018-
if include_data:
1089+
if stat_in_result:
10191090
new_path, stat = result.get()
10201091
return self.unchroot(new_path), stat
10211092
else:
@@ -1032,26 +1103,6 @@ def create_completion(result):
10321103
do_create()
10331104
return async_result
10341105

1035-
def _create_async_inner(self, path, value, acl, flags,
1036-
trailing=False, include_data=False):
1037-
async_result = self.handler.async_result()
1038-
if include_data:
1039-
opcode = Create2
1040-
else:
1041-
opcode = Create
1042-
1043-
call_result = self._call(
1044-
opcode(_prefix_root(self.chroot, path, trailing=trailing),
1045-
value, acl, flags), async_result)
1046-
if call_result is False:
1047-
# We hit a short-circuit exit on the _call. Because we are
1048-
# not using the original async_result here, we bubble the
1049-
# exception upwards to the do_create function in
1050-
# KazooClient.create so that it gets set on the correct
1051-
# async_result object
1052-
raise async_result.exception
1053-
return async_result
1054-
10551106
def ensure_path(self, path, acl=None):
10561107
"""Recursively create a path if it doesn't exist.
10571108
@@ -1590,13 +1641,15 @@ def __init__(self, client):
15901641
self.committed = False
15911642

15921643
def create(self, path, value=b"", acl=None, ephemeral=False,
1593-
sequence=False):
1644+
sequence=False, include_data=False, container=False, ttl=0):
15941645
"""Add a create ZNode to the transaction. Takes the same
15951646
arguments as :meth:`KazooClient.create`, with the exception
15961647
of `makepath`.
15971648
15981649
:returns: None
15991650
1651+
.. versionadded:: 2.8
1652+
The include_data, container and ttl options.
16001653
"""
16011654
if acl is None and self.client.default_acl:
16021655
acl = self.client.default_acl
@@ -1612,17 +1665,67 @@ def create(self, path, value=b"", acl=None, ephemeral=False,
16121665
raise TypeError("Invalid type for 'ephemeral' (bool expected)")
16131666
if not isinstance(sequence, bool):
16141667
raise TypeError("Invalid type for 'sequence' (bool expected)")
1615-
1668+
if not isinstance(include_data, bool):
1669+
raise TypeError("Invalid type for 'include_data' (bool expected)")
1670+
if not isinstance(container, bool):
1671+
raise TypeError("Invalid type for 'container' (bool expected)")
1672+
if not isinstance(ttl, int) or ttl < 0:
1673+
raise TypeError("Invalid 'ttl' (integer >= 0 expected)")
1674+
if ttl and ephemeral:
1675+
raise TypeError("Invalid node creation: ephemeral & ttl")
1676+
if container and (ephemeral or sequence or ttl):
1677+
raise TypeError("Invalid node creation: container & ephemeral/sequence/ttl")
1678+
1679+
# Should match Zookeeper's CreateMode fromFlag
1680+
# https://github.com/apache/zookeeper/blob/master/zookeeper-server/src/main/java/org/apache/zookeeper/CreateMode.java#L112
16161681
flags = 0
16171682
if ephemeral:
16181683
flags |= 1
16191684
if sequence:
16201685
flags |= 2
1686+
if container:
1687+
flags = 4
1688+
if ttl:
1689+
if sequence:
1690+
flags = 6
1691+
else:
1692+
flags = 5
1693+
16211694
if acl is None:
16221695
acl = OPEN_ACL_UNSAFE
16231696

1624-
self._add(Create(_prefix_root(self.client.chroot, path), value, acl,
1625-
flags), None)
1697+
# Figure out the OpCode we are going to send
1698+
if include_data:
1699+
opcode = lambda path, value, acl: Create2(
1700+
_prefix_root(self.client.chroot, path, trailing=sequence),
1701+
value,
1702+
acl,
1703+
flags
1704+
)
1705+
elif container:
1706+
opcode = lambda path, value, acl: CreateContainer(
1707+
_prefix_root(self.client.chroot, path, trailing=False),
1708+
value,
1709+
acl,
1710+
flags
1711+
)
1712+
elif ttl:
1713+
opcode = lambda path, value, acl: CreateTTL(
1714+
_prefix_root(self.client.chroot, path, trailing=sequence),
1715+
value,
1716+
acl,
1717+
flags,
1718+
ttl
1719+
)
1720+
else:
1721+
opcode = lambda path, value, acl: Create(
1722+
_prefix_root(self.client.chroot, path, trailing=sequence),
1723+
value,
1724+
acl,
1725+
flags
1726+
)
1727+
1728+
self._add(opcode(path, value, acl), None)
16261729

16271730
def delete(self, path, version=-1):
16281731
"""Add a delete ZNode to the transaction. Takes the same

kazoo/protocol/serialization.py

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,11 +323,17 @@ def deserialize(cls, bytes, offset):
323323
while not header.done:
324324
if header.type == Create.type:
325325
response, offset = read_string(bytes, offset)
326+
elif header.type == Create2.type:
327+
path, offset = read_string(bytes, offset)
328+
stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
329+
offset += stat_struct.size
330+
response = (path, stat)
326331
elif header.type == Delete.type:
327332
response = True
328333
elif header.type == SetData.type:
329334
response = ZnodeStat._make(
330-
stat_struct.unpack_from(bytes, offset))
335+
stat_struct.unpack_from(bytes, offset)
336+
)
331337
offset += stat_struct.size
332338
elif header.type == CheckVersion.type:
333339
response = True
@@ -346,6 +352,10 @@ def unchroot(client, response):
346352
for result in response:
347353
if isinstance(result, six.string_types):
348354
resp.append(client.unchroot(result))
355+
elif isinstance(result, ZnodeStat): # Need to test before tuple
356+
resp.append(result)
357+
elif isinstance(result, tuple):
358+
resp.append((client.unchroot(result[0]), result[1]))
349359
else:
350360
resp.append(result)
351361
return resp
@@ -391,6 +401,49 @@ def deserialize(cls, bytes, offset):
391401
return data, stat
392402

393403

404+
class CreateContainer(namedtuple('CreateContainer', 'path data acl flags')):
405+
type = 19
406+
407+
def serialize(self):
408+
b = bytearray()
409+
b.extend(write_string(self.path))
410+
b.extend(write_buffer(self.data))
411+
b.extend(int_struct.pack(len(self.acl)))
412+
for acl in self.acl:
413+
b.extend(int_struct.pack(acl.perms) +
414+
write_string(acl.id.scheme) + write_string(acl.id.id))
415+
b.extend(int_struct.pack(self.flags))
416+
return b
417+
418+
@classmethod
419+
def deserialize(cls, bytes, offset):
420+
path, offset = read_string(bytes, offset)
421+
stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
422+
return path, stat
423+
424+
425+
class CreateTTL(namedtuple('CreateTTL', 'path data acl flags ttl')):
426+
type = 21
427+
428+
def serialize(self):
429+
b = bytearray()
430+
b.extend(write_string(self.path))
431+
b.extend(write_buffer(self.data))
432+
b.extend(int_struct.pack(len(self.acl)))
433+
for acl in self.acl:
434+
b.extend(int_struct.pack(acl.perms) +
435+
write_string(acl.id.scheme) + write_string(acl.id.id))
436+
b.extend(int_struct.pack(self.flags))
437+
b.extend(long_struct.pack(self.ttl))
438+
return b
439+
440+
@classmethod
441+
def deserialize(cls, bytes, offset):
442+
path, offset = read_string(bytes, offset)
443+
stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset))
444+
return path, stat
445+
446+
394447
class Auth(namedtuple('Auth', 'auth_type scheme auth')):
395448
type = 100
396449

0 commit comments

Comments
 (0)