Skip to content

Commit eb34eaf

Browse files
authored
[Enhancement/Feature Issue #47] Added support for KeySharedPolicy for the consumer when in KeyShared mode. (#109)
### Motivation The pulsar python client lacks support for defining KeyShared behaviour like out of order message delivery and sticky-hash, auto-hash for consumers in KeyShared mode. This PR adds full support. The user can now provide a KeySharedPolicy when starting a consumer with client.subscribe() #47 The ConsumerConfiguration::KeySharedPolicy and related setter/getter are now exposed to the Python client in this PR. ### Modifications - Added pybind11 enum for KeySharedMode in src/enums.cc. - Added pybind11 class for KeySharedPolicy in src/config.cc. - Modified pybind11 class for ConsumerConfiguration and added function to set KeySharedPolicy and function to read KeySharedPolicy. - Added KeySharedPolicy wrapper to pulsar/__init__.py. This wrapper handles KeySharedPolicy initialization and does some value validation. - Added the key_shared_policy parameter to client.subscribe(), some validation, and adding to the config in pulsar/__init__.py. - Added 4 new tests to test the new KeySharedPolicy functionality to tests/pulsar_test.py
1 parent 39ac8f8 commit eb34eaf

File tree

5 files changed

+233
-2
lines changed

5 files changed

+233
-2
lines changed

.gitignore

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,9 @@ wheelhouse
1717
vcpkg_installed/
1818
*.pyd
1919
*.lib
20+
21+
22+
lib_pulsar.so
23+
tests/test.log
24+
.tests-container-id.txt
25+

pulsar/__init__.py

Lines changed: 78 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,12 @@
4343
"""
4444

4545
import logging
46+
from typing import List, Tuple, Optional
47+
4648
import _pulsar
4749

4850
from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType, \
49-
LoggerLevel, BatchReceivePolicy # noqa: F401
51+
LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode # noqa: F401
5052

5153
from pulsar.__about__ import __version__
5254

@@ -689,7 +691,8 @@ def subscribe(self, topic, subscription_name,
689691
max_pending_chunked_message=10,
690692
auto_ack_oldest_chunked_message_on_queue_full=False,
691693
start_message_id_inclusive=False,
692-
batch_receive_policy=None
694+
batch_receive_policy=None,
695+
key_shared_policy=None
693696
):
694697
"""
695698
Subscribe to the given topic and subscription combination.
@@ -774,6 +777,8 @@ def my_listener(consumer, message):
774777
Set the consumer to include the given position of any reset operation like Consumer::seek.
775778
batch_receive_policy: class ConsumerBatchReceivePolicy
776779
Set the batch collection policy for batch receiving.
780+
key_shared_policy: class ConsumerKeySharedPolicy
781+
Set the key shared policy for use when the ConsumerType is KeyShared.
777782
"""
778783
_check_type(str, subscription_name, 'subscription_name')
779784
_check_type(ConsumerType, consumer_type, 'consumer_type')
@@ -794,6 +799,7 @@ def my_listener(consumer, message):
794799
_check_type(bool, auto_ack_oldest_chunked_message_on_queue_full, 'auto_ack_oldest_chunked_message_on_queue_full')
795800
_check_type(bool, start_message_id_inclusive, 'start_message_id_inclusive')
796801
_check_type_or_none(ConsumerBatchReceivePolicy, batch_receive_policy, 'batch_receive_policy')
802+
_check_type_or_none(ConsumerKeySharedPolicy, key_shared_policy, 'key_shared_policy')
797803

798804
conf = _pulsar.ConsumerConfiguration()
799805
conf.consumer_type(consumer_type)
@@ -826,6 +832,9 @@ def my_listener(consumer, message):
826832
if batch_receive_policy:
827833
conf.batch_receive_policy(batch_receive_policy.policy())
828834

835+
if key_shared_policy:
836+
conf.key_shared_policy(key_shared_policy.policy())
837+
829838
c = Consumer()
830839
if isinstance(topic, str):
831840
# Single topic
@@ -1448,6 +1457,73 @@ def policy(self):
14481457
"""
14491458
return self._policy
14501459

1460+
class ConsumerKeySharedPolicy:
1461+
"""
1462+
Consumer key shared policy is used to configure the consumer behaviour when the ConsumerType is KeyShared.
1463+
"""
1464+
def __init__(
1465+
self,
1466+
key_shared_mode: KeySharedMode = KeySharedMode.AutoSplit,
1467+
allow_out_of_order_delivery: bool = False,
1468+
sticky_ranges: Optional[List[Tuple[int, int]]] = None,
1469+
):
1470+
"""
1471+
Wrapper KeySharedPolicy.
1472+
1473+
Parameters
1474+
----------
1475+
1476+
key_shared_mode: KeySharedMode, optional
1477+
Set the key shared mode. eg: KeySharedMode.Sticky or KeysharedMode.AutoSplit
1478+
1479+
allow_out_of_order_delivery: bool, optional
1480+
Set whether to allow for out of order delivery
1481+
If it is enabled, it relaxes the ordering requirement and allows the broker to send out-of-order
1482+
messages in case of failures. This makes it faster for new consumers to join without being stalled by
1483+
an existing slow consumer.
1484+
1485+
If this is True, a single consumer still receives all keys, but they may come in different orders.
1486+
1487+
sticky_ranges: List[Tuple[int, int]], optional
1488+
Set the ranges used with sticky mode. The integers can be from 0 to 2^16 (0 <= val < 65,536)
1489+
"""
1490+
if key_shared_mode == KeySharedMode.Sticky and sticky_ranges is None:
1491+
raise ValueError("When using key_shared_mode = KeySharedMode.Sticky you must also provide sticky_ranges")
1492+
1493+
self._policy = KeySharedPolicy()
1494+
self._policy.set_key_shared_mode(key_shared_mode)
1495+
self._policy.set_allow_out_of_order_delivery(allow_out_of_order_delivery)
1496+
1497+
if sticky_ranges is not None:
1498+
self._policy.set_sticky_ranges(sticky_ranges)
1499+
1500+
@property
1501+
def key_shared_mode(self) -> KeySharedMode:
1502+
"""
1503+
Returns the key shared mode
1504+
"""
1505+
return self._policy.get_key_shared_mode()
1506+
1507+
@property
1508+
def allow_out_of_order_delivery(self) -> bool:
1509+
"""
1510+
Returns whether out of order delivery is enabled
1511+
"""
1512+
return self._policy.is_allow_out_of_order_delivery()
1513+
1514+
@property
1515+
def sticky_ranges(self) -> List[Tuple[int, int]]:
1516+
"""
1517+
Returns the actual sticky ranges
1518+
"""
1519+
return self._policy.get_sticky_ranges()
1520+
1521+
def policy(self):
1522+
"""
1523+
Returns the actual KeySharedPolicy.
1524+
"""
1525+
return self._policy
1526+
14511527
class Reader:
14521528
"""
14531529
Pulsar topic reader.

src/config.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121
#include <pulsar/ConsoleLoggerFactory.h>
2222
#include <pulsar/ConsumerConfiguration.h>
2323
#include <pulsar/ProducerConfiguration.h>
24+
#include <pulsar/KeySharedPolicy.h>
2425
#include <pybind11/functional.h>
2526
#include <pybind11/pybind11.h>
27+
#include <pybind11/stl.h>
2628
#include <memory>
2729

2830
namespace py = pybind11;
@@ -121,6 +123,15 @@ static ClientConfiguration& ClientConfiguration_setFileLogger(ClientConfiguratio
121123
void export_config(py::module_& m) {
122124
using namespace py;
123125

126+
class_<KeySharedPolicy, std::shared_ptr<KeySharedPolicy>>(m, "KeySharedPolicy")
127+
.def(init<>())
128+
.def("set_key_shared_mode", &KeySharedPolicy::setKeySharedMode, return_value_policy::reference)
129+
.def("get_key_shared_mode", &KeySharedPolicy::getKeySharedMode)
130+
.def("set_allow_out_of_order_delivery", &KeySharedPolicy::setAllowOutOfOrderDelivery, return_value_policy::reference)
131+
.def("is_allow_out_of_order_delivery", &KeySharedPolicy::isAllowOutOfOrderDelivery)
132+
.def("set_sticky_ranges", static_cast<KeySharedPolicy& (KeySharedPolicy::*)(const StickyRanges&)>(&KeySharedPolicy::setStickyRanges), return_value_policy::reference)
133+
.def("get_sticky_ranges", &KeySharedPolicy::getStickyRanges);
134+
124135
class_<CryptoKeyReader, std::shared_ptr<CryptoKeyReader>>(m, "AbstractCryptoKeyReader")
125136
.def("getPublicKey", &CryptoKeyReader::getPublicKey)
126137
.def("getPrivateKey", &CryptoKeyReader::getPrivateKey);
@@ -222,6 +233,8 @@ void export_config(py::module_& m) {
222233

223234
class_<ConsumerConfiguration, std::shared_ptr<ConsumerConfiguration>>(m, "ConsumerConfiguration")
224235
.def(init<>())
236+
.def("key_shared_policy", &ConsumerConfiguration::getKeySharedPolicy)
237+
.def("key_shared_policy", &ConsumerConfiguration::setKeySharedPolicy, return_value_policy::reference)
225238
.def("consumer_type", &ConsumerConfiguration::getConsumerType)
226239
.def("consumer_type", &ConsumerConfiguration::setConsumerType, return_value_policy::reference)
227240
.def("schema", &ConsumerConfiguration::getSchema, return_value_policy::copy)

src/enums.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include <pulsar/CompressionType.h>
2121
#include <pulsar/ConsumerConfiguration.h>
2222
#include <pulsar/ProducerConfiguration.h>
23+
#include <pulsar/KeySharedPolicy.h>
2324
#include <pybind11/pybind11.h>
2425

2526
using namespace pulsar;
@@ -28,6 +29,10 @@ namespace py = pybind11;
2829
void export_enums(py::module_& m) {
2930
using namespace py;
3031

32+
enum_<KeySharedMode>(m, "KeySharedMode")
33+
.value("AutoSplit", AUTO_SPLIT)
34+
.value("Sticky", STICKY);
35+
3136
enum_<ProducerConfiguration::PartitionsRoutingMode>(m, "PartitionsRoutingMode")
3237
.value("UseSinglePartition", ProducerConfiguration::UseSinglePartition)
3338
.value("RoundRobinDistribution", ProducerConfiguration::RoundRobinDistribution)

tests/pulsar_test.py

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
MessageId,
3333
CompressionType,
3434
ConsumerType,
35+
KeySharedMode,
36+
ConsumerKeySharedPolicy,
3537
PartitionsRoutingMode,
3638
AuthenticationBasic,
3739
AuthenticationTLS,
@@ -1437,6 +1439,134 @@ def send_callback(res, msg):
14371439
producer.flush()
14381440
client.close()
14391441

1442+
def test_keyshare_policy(self):
1443+
with self.assertRaises(ValueError):
1444+
# Raise error because sticky ranges are not provided.
1445+
pulsar.ConsumerKeySharedPolicy(
1446+
key_shared_mode=pulsar.KeySharedMode.Sticky,
1447+
allow_out_of_order_delivery=False,
1448+
)
1449+
1450+
expected_key_shared_mode = pulsar.KeySharedMode.Sticky
1451+
expected_allow_out_of_order_delivery = True
1452+
expected_sticky_ranges = [(0, 100), (101,200)]
1453+
consumer_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
1454+
key_shared_mode=expected_key_shared_mode,
1455+
allow_out_of_order_delivery=expected_allow_out_of_order_delivery,
1456+
sticky_ranges=expected_sticky_ranges
1457+
)
1458+
1459+
self.assertEqual(consumer_key_shared_policy.key_shared_mode, expected_key_shared_mode)
1460+
self.assertEqual(consumer_key_shared_policy.allow_out_of_order_delivery, expected_allow_out_of_order_delivery)
1461+
self.assertEqual(consumer_key_shared_policy.sticky_ranges, expected_sticky_ranges)
1462+
1463+
def test_keyshared_invalid_sticky_ranges(self):
1464+
client = Client(self.serviceUrl)
1465+
topic = "my-python-topic-keyshare-invalid-" + str(time.time())
1466+
with self.assertRaises(ValueError):
1467+
consumer_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
1468+
key_shared_mode=pulsar.KeySharedMode.Sticky,
1469+
allow_out_of_order_delivery=False,
1470+
sticky_ranges=[(0,65536)]
1471+
)
1472+
client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared,
1473+
start_message_id_inclusive=True,
1474+
key_shared_policy=consumer_key_shared_policy)
1475+
1476+
with self.assertRaises(ValueError):
1477+
consumer_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
1478+
key_shared_mode=pulsar.KeySharedMode.Sticky,
1479+
allow_out_of_order_delivery=False,
1480+
sticky_ranges=[(0, 100), (50, 150)]
1481+
)
1482+
client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared,
1483+
start_message_id_inclusive=True,
1484+
key_shared_policy=consumer_key_shared_policy)
1485+
1486+
def test_keyshared_autosplit(self):
1487+
client = Client(self.serviceUrl)
1488+
topic = "my-python-topic-keyshare-autosplit-" + str(time.time())
1489+
consumer_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
1490+
key_shared_mode=pulsar.KeySharedMode.AutoSplit,
1491+
allow_out_of_order_delivery=True,
1492+
)
1493+
consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared, consumer_name = 'con-1',
1494+
start_message_id_inclusive=True, key_shared_policy=consumer_key_shared_policy)
1495+
consumer2 = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared, consumer_name = 'con-2',
1496+
start_message_id_inclusive=True, key_shared_policy=consumer_key_shared_policy)
1497+
producer = client.create_producer(topic)
1498+
1499+
for i in range(10):
1500+
if i > 0:
1501+
time.sleep(0.02)
1502+
producer.send(b"hello-%d" % i)
1503+
1504+
msgs = []
1505+
while True:
1506+
try:
1507+
msg = consumer.receive(100)
1508+
except pulsar.Timeout:
1509+
break
1510+
msgs.append(msg)
1511+
consumer.acknowledge(msg)
1512+
1513+
while True:
1514+
try:
1515+
msg = consumer2.receive(100)
1516+
except pulsar.Timeout:
1517+
break
1518+
msgs.append(msg)
1519+
consumer2.acknowledge(msg)
1520+
1521+
self.assertEqual(len(msgs), 10)
1522+
client.close()
1523+
1524+
def test_sticky_autosplit(self):
1525+
client = Client(self.serviceUrl)
1526+
topic = "my-python-topic-keyshare-sticky-" + str(time.time())
1527+
consumer_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
1528+
key_shared_mode=pulsar.KeySharedMode.Sticky,
1529+
allow_out_of_order_delivery=True,
1530+
sticky_ranges=[(0,30000)],
1531+
)
1532+
1533+
consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared, consumer_name='con-1',
1534+
start_message_id_inclusive=True, key_shared_policy=consumer_key_shared_policy)
1535+
1536+
consumer2_key_shared_policy = pulsar.ConsumerKeySharedPolicy(
1537+
key_shared_mode=pulsar.KeySharedMode.Sticky,
1538+
allow_out_of_order_delivery=True,
1539+
sticky_ranges=[(30001, 65535)],
1540+
)
1541+
consumer2 = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.KeyShared, consumer_name='con-2',
1542+
start_message_id_inclusive=True, key_shared_policy=consumer2_key_shared_policy)
1543+
producer = client.create_producer(topic)
1544+
1545+
for i in range(10):
1546+
if i > 0:
1547+
time.sleep(0.02)
1548+
producer.send(b"hello-%d" % i)
1549+
1550+
msgs = []
1551+
while True:
1552+
try:
1553+
msg = consumer.receive(100)
1554+
except pulsar.Timeout:
1555+
break
1556+
msgs.append(msg)
1557+
consumer.acknowledge(msg)
1558+
1559+
while True:
1560+
try:
1561+
msg = consumer2.receive(100)
1562+
except pulsar.Timeout:
1563+
break
1564+
msgs.append(msg)
1565+
consumer2.acknowledge(msg)
1566+
1567+
self.assertEqual(len(msgs), 10)
1568+
client.close()
1569+
14401570
def test_acknowledge_failed(self):
14411571
client = Client(self.serviceUrl)
14421572
topic = 'test_acknowledge_failed'
@@ -1461,5 +1591,6 @@ def test_acknowledge_failed(self):
14611591
client.close()
14621592

14631593

1594+
14641595
if __name__ == "__main__":
14651596
main()

0 commit comments

Comments
 (0)