Skip to content

Support constructing MessageId from results of send() and receive() #254

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 44 additions & 6 deletions pulsar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class MessageId:
"""

def __init__(self, partition=-1, ledger_id=-1, entry_id=-1, batch_index=-1):
self._msg_id = _pulsar.MessageId(partition, ledger_id, entry_id, batch_index)
self._msg_id: _pulsar.MessageId = _pulsar.MessageId(partition, ledger_id, entry_id, batch_index)

earliest = _pulsar.MessageId.earliest
latest = _pulsar.MessageId.latest
Expand Down Expand Up @@ -111,6 +111,24 @@ def __str__(self) -> str:
"""
return str(self._msg_id)

def __eq__(self, other) -> bool:
return self._msg_id == other._msg_id

def __ne__(self, other) -> bool:
return self._msg_id != other._msg_id

def __le__(self, other) -> bool:
return self._msg_id <= other._msg_id

def __lt__(self, other) -> bool:
return self._msg_id < other._msg_id

def __ge__(self, other) -> bool:
return self._msg_id >= other._msg_id

def __gt__(self, other) -> bool:
return self._msg_id > other._msg_id

@staticmethod
def deserialize(message_id_bytes):
"""
Expand All @@ -119,6 +137,14 @@ def deserialize(message_id_bytes):
"""
return _pulsar.MessageId.deserialize(message_id_bytes)

@classmethod
def wrap(cls, msg_id: _pulsar.MessageId):
"""
Wrap the underlying MessageId type from the C extension to the Python type.
"""
self = cls()
self._msg_id = msg_id
return self

class Message:
"""
Expand Down Expand Up @@ -170,9 +196,13 @@ def event_timestamp(self):
"""
return self._message.event_timestamp()

def message_id(self):
def message_id(self) -> _pulsar.MessageId:
"""
The message ID that can be used to refer to this particular message.

Returns
----------
A `_pulsar.MessageId` object that represents where the message is persisted.
"""
return self._message.message_id()

Expand Down Expand Up @@ -1231,7 +1261,7 @@ def send(self, content,
event_timestamp=None,
deliver_at=None,
deliver_after=None,
):
) -> _pulsar.MessageId:
"""
Publish a message on the topic. Blocks until the message is acknowledged

Expand Down Expand Up @@ -1264,6 +1294,10 @@ def send(self, content,
The timestamp is milliseconds and based on UTC
deliver_after: optional
Specify a delay in timedelta for the delivery of the messages.

Returns
----------
A `_pulsar.MessageId` object that represents where the message is persisted.
"""
msg = self._build_msg(content, properties, partition_key, ordering_key, sequence_id,
replication_clusters, disable_replication, event_timestamp,
Expand Down Expand Up @@ -1502,7 +1536,7 @@ def batch_receive(self):
messages.append(m)
return messages

def acknowledge(self, message):
def acknowledge(self, message: Union[Message, MessageId, _pulsar.Message, _pulsar.MessageId]):
"""
Acknowledge the reception of a single message.

Expand All @@ -1511,7 +1545,7 @@ def acknowledge(self, message):

Parameters
----------
message : Message, _pulsar.Message, _pulsar.MessageId
message : Message, MessageId, _pulsar.Message, _pulsar.MessageId
The received message or message id.

Raises
Expand All @@ -1521,10 +1555,12 @@ def acknowledge(self, message):
"""
if isinstance(message, Message):
self._consumer.acknowledge(message._message)
elif isinstance(message, MessageId):
self._consumer.acknowledge(message._msg_id)
else:
self._consumer.acknowledge(message)

def acknowledge_cumulative(self, message):
def acknowledge_cumulative(self, message: Union[Message, MessageId, _pulsar.Message, _pulsar.MessageId]):
"""
Acknowledge the reception of all the messages in the stream up to (and
including) the provided message.
Expand All @@ -1545,6 +1581,8 @@ def acknowledge_cumulative(self, message):
"""
if isinstance(message, Message):
self._consumer.acknowledge_cumulative(message._message)
elif isinstance(message, MessageId):
self._consumer.acknowledge_cumulative(message._msg_id)
else:
self._consumer.acknowledge_cumulative(message)

Expand Down
23 changes: 23 additions & 0 deletions tests/pulsar_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1253,6 +1253,29 @@ def test_message_id(self):
s = MessageId.latest.serialize()
self.assertEqual(MessageId.deserialize(s), MessageId.latest)

client = Client(self.serviceUrl)
topic = f'test-message-id-compare-{str(time.time())}'
producer = client.create_producer(topic)
consumer = client.subscribe(topic, 'sub')

sent_ids = []
received_ids = []
for i in range(5):
sent_ids.append(MessageId.wrap(producer.send(b'msg-%d' % i)))
msg = consumer.receive(TM)
received_ids.append(MessageId.wrap(msg.message_id()))
self.assertEqual(sent_ids[i], received_ids[i])
consumer.acknowledge(received_ids[i])
consumer.acknowledge_cumulative(received_ids[4])

for i in range(4):
self.assertLess(sent_ids[i], sent_ids[i + 1])
self.assertLessEqual(sent_ids[i], sent_ids[i + 1])
self.assertGreater(sent_ids[i + 1], sent_ids[i])
self.assertGreaterEqual(sent_ids[i + 1], sent_ids[i])
self.assertNotEqual(sent_ids[i], sent_ids[i + 1])
client.close()

def test_get_topics_partitions(self):
client = Client(self.serviceUrl)
topic_partitioned = "persistent://public/default/test_get_topics_partitions"
Expand Down