Skip to content

Commit 18901d0

Browse files
committed
Wrapper BatchReceivePolicy.
1 parent fae8e39 commit 18901d0

File tree

2 files changed

+33
-14
lines changed

2 files changed

+33
-14
lines changed

pulsar/__init__.py

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -741,16 +741,8 @@ def my_listener(consumer, message):
741741
if autoAckOldestChunkedMessageOnQueueFull is true else it marks them for redelivery.
742742
start_message_id_inclusive: bool, default=False
743743
Set the consumer to include the given position of any reset operation like Consumer::seek.
744-
batch_receive_policy: class BatchReceivePolicy, Constructor parameters (in order):
745-
: param maxNumMessage: Max num message, if less than 0, it means no limit. default: -1
746-
: param maxNumBytes: Max num bytes, if less than 0, it means no limit. default: 10 * 1024 * 1024
747-
: param timeoutMs: If less than 0, it means no limit. default: 100
748-
749-
Batch receive policy can limit the number and bytes of messages in a single batch,
750-
and can specify a timeout for waiting for enough messages for this batch.
751-
752-
A batch receive action is completed as long as any one of the conditions (the batch has enough number
753-
or size of messages, or the waiting timeout is passed) are met.
744+
batch_receive_policy: class ConsumerBatchReceivePolicy
745+
Set the batch collection policy for batch receiving.
754746
"""
755747
_check_type(str, subscription_name, 'subscription_name')
756748
_check_type(ConsumerType, consumer_type, 'consumer_type')
@@ -770,7 +762,7 @@ def my_listener(consumer, message):
770762
_check_type(int, max_pending_chunked_message, 'max_pending_chunked_message')
771763
_check_type(bool, auto_ack_oldest_chunked_message_on_queue_full, 'auto_ack_oldest_chunked_message_on_queue_full')
772764
_check_type(bool, start_message_id_inclusive, 'start_message_id_inclusive')
773-
_check_type_or_none(BatchReceivePolicy, batch_receive_policy, 'batch_receive_policy')
765+
_check_type_or_none(ConsumerBatchReceivePolicy, batch_receive_policy, 'batch_receive_policy')
774766

775767
conf = _pulsar.ConsumerConfiguration()
776768
conf.consumer_type(consumer_type)
@@ -801,7 +793,7 @@ def my_listener(consumer, message):
801793
conf.auto_ack_oldest_chunked_message_on_queue_full(auto_ack_oldest_chunked_message_on_queue_full)
802794
conf.start_message_id_inclusive(start_message_id_inclusive)
803795
if batch_receive_policy:
804-
conf.batch_receive_policy(batch_receive_policy)
796+
conf.batch_receive_policy(batch_receive_policy.policy())
805797

806798
c = Consumer()
807799
if isinstance(topic, str):
@@ -1382,6 +1374,32 @@ def get_last_message_id(self):
13821374
"""
13831375
return self._consumer.get_last_message_id()
13841376

1377+
class ConsumerBatchReceivePolicy:
1378+
"""
1379+
Batch receive policy can limit the number and bytes of messages in a single batch,
1380+
and can specify a timeout for waiting for enough messages for this batch.
1381+
1382+
A batch receive action is completed as long as any one of the conditions (the batch has enough number
1383+
or size of messages, or the waiting timeout is passed) are met.
1384+
"""
1385+
def __init__(self, max_num_message, max_num_bytes, timeout_ms):
1386+
"""
1387+
Wrapper BatchReceivePolicy.
1388+
1389+
Parameters
1390+
----------
1391+
1392+
max_num_message: Max num message, if less than 0, it means no limit. default: -1
1393+
max_num_bytes: Max num bytes, if less than 0, it means no limit. default: 10 * 1024 * 1024
1394+
timeout_ms: If less than 0, it means no limit. default: 100
1395+
"""
1396+
self._policy = BatchReceivePolicy(max_num_message, max_num_bytes, timeout_ms)
1397+
1398+
def policy(self):
1399+
"""
1400+
Returns the actual one BatchReceivePolicy.
1401+
"""
1402+
return self._policy
13851403

13861404
class Reader:
13871405
"""

tests/pulsar_test.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,11 @@
3939
AuthenticationToken,
4040
InitialPosition,
4141
CryptoKeyReader,
42+
ConsumerBatchReceivePolicy,
4243
)
4344
from pulsar.schema import JsonSchema, Record, Integer
4445

45-
from _pulsar import ProducerConfiguration, ConsumerConfiguration, BatchReceivePolicy
46+
from _pulsar import ProducerConfiguration, ConsumerConfiguration
4647

4748
from schema_test import *
4849

@@ -1068,7 +1069,7 @@ def test_batch_receive(self):
10681069
client = Client(self.serviceUrl)
10691070
topic = "my-python-topic-batch-receive-" + str(time.time())
10701071
consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared,
1071-
start_message_id_inclusive=True, batch_receive_policy=BatchReceivePolicy(10, -1, -1))
1072+
start_message_id_inclusive=True, batch_receive_policy=ConsumerBatchReceivePolicy(10, -1, -1))
10721073
producer = client.create_producer(topic)
10731074

10741075

0 commit comments

Comments
 (0)