Skip to content

Commit 26ab32f

Browse files
Support ConsumerCryptoFailureAction for consumer and reader (#253)
1 parent 581531a commit 26ab32f

File tree

4 files changed

+107
-4
lines changed

4 files changed

+107
-4
lines changed

pulsar/__init__.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949

5050
from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType, \
5151
LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode, ProducerAccessMode, RegexSubscriptionMode, \
52-
DeadLetterPolicyBuilder # noqa: F401
52+
DeadLetterPolicyBuilder, ConsumerCryptoFailureAction # noqa: F401
5353

5454
from pulsar.__about__ import __version__
5555

@@ -876,6 +876,7 @@ def subscribe(self, topic, subscription_name,
876876
batch_index_ack_enabled=False,
877877
regex_subscription_mode: RegexSubscriptionMode = RegexSubscriptionMode.PersistentOnly,
878878
dead_letter_policy: Union[None, ConsumerDeadLetterPolicy] = None,
879+
crypto_failure_action: ConsumerCryptoFailureAction = ConsumerCryptoFailureAction.FAIL,
879880
):
880881
"""
881882
Subscribe to the given topic and subscription combination.
@@ -979,6 +980,19 @@ def my_listener(consumer, message):
979980
stopped. By using the dead letter mechanism, messages have the max redelivery count, when they're
980981
exceeding the maximum number of redeliveries. Messages are sent to dead letter topics and acknowledged
981982
automatically.
983+
crypto_failure_action: ConsumerCryptoFailureAction, default=ConsumerCryptoFailureAction.FAIL
984+
Set the behavior when the decryption fails. The default is to fail the message.
985+
986+
Supported actions:
987+
988+
* ConsumerCryptoFailureAction.FAIL: Fail consume until crypto succeeds
989+
* ConsumerCryptoFailureAction.DISCARD:
990+
Message is silently acknowledged and not delivered to the application.
991+
* ConsumerCryptoFailureAction.CONSUME:
992+
Deliver the encrypted message to the application. It's the application's responsibility
993+
to decrypt the message. If message is also compressed, decompression will fail. If the
994+
message contains batch messages, client will not be able to retrieve individual messages
995+
in the batch.
982996
"""
983997
_check_type(str, subscription_name, 'subscription_name')
984998
_check_type(ConsumerType, consumer_type, 'consumer_type')
@@ -1002,6 +1016,7 @@ def my_listener(consumer, message):
10021016
_check_type_or_none(ConsumerKeySharedPolicy, key_shared_policy, 'key_shared_policy')
10031017
_check_type(bool, batch_index_ack_enabled, 'batch_index_ack_enabled')
10041018
_check_type(RegexSubscriptionMode, regex_subscription_mode, 'regex_subscription_mode')
1019+
_check_type(ConsumerCryptoFailureAction, crypto_failure_action, 'crypto_failure_action')
10051020

10061021
conf = _pulsar.ConsumerConfiguration()
10071022
conf.consumer_type(consumer_type)
@@ -1040,6 +1055,7 @@ def my_listener(consumer, message):
10401055
conf.batch_index_ack_enabled(batch_index_ack_enabled)
10411056
if dead_letter_policy:
10421057
conf.dead_letter_policy(dead_letter_policy.policy())
1058+
conf.crypto_failure_action(crypto_failure_action)
10431059

10441060
c = Consumer()
10451061
if isinstance(topic, str):
@@ -1068,7 +1084,8 @@ def create_reader(self, topic, start_message_id,
10681084
subscription_role_prefix=None,
10691085
is_read_compacted=False,
10701086
crypto_key_reader: Union[None, CryptoKeyReader] = None,
1071-
start_message_id_inclusive=False
1087+
start_message_id_inclusive=False,
1088+
crypto_failure_action: ConsumerCryptoFailureAction = ConsumerCryptoFailureAction.FAIL,
10721089
):
10731090
"""
10741091
Create a reader on a particular topic
@@ -1129,6 +1146,19 @@ def my_listener(reader, message):
11291146
and private key decryption messages for the consumer
11301147
start_message_id_inclusive: bool, default=False
11311148
Set the reader to include the startMessageId or given position of any reset operation like Reader.seek
1149+
crypto_failure_action: ConsumerCryptoFailureAction, default=ConsumerCryptoFailureAction.FAIL
1150+
Set the behavior when the decryption fails. The default is to fail the message.
1151+
1152+
Supported actions:
1153+
1154+
* ConsumerCryptoFailureAction.FAIL: Fail consume until crypto succeeds
1155+
* ConsumerCryptoFailureAction.DISCARD:
1156+
Message is silently acknowledged and not delivered to the application.
1157+
* ConsumerCryptoFailureAction.CONSUME:
1158+
Deliver the encrypted message to the application. It's the application's responsibility
1159+
to decrypt the message. If message is also compressed, decompression will fail. If the
1160+
message contains batch messages, client will not be able to retrieve individual messages
1161+
in the batch.
11321162
"""
11331163

11341164
# If a pulsar.MessageId object is passed, access the _pulsar.MessageId object
@@ -1144,6 +1174,7 @@ def my_listener(reader, message):
11441174
_check_type(bool, is_read_compacted, 'is_read_compacted')
11451175
_check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
11461176
_check_type(bool, start_message_id_inclusive, 'start_message_id_inclusive')
1177+
_check_type(ConsumerCryptoFailureAction, crypto_failure_action, 'crypto_failure_action')
11471178

11481179
conf = _pulsar.ReaderConfiguration()
11491180
if reader_listener:
@@ -1158,6 +1189,7 @@ def my_listener(reader, message):
11581189
if crypto_key_reader:
11591190
conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
11601191
conf.start_message_id_inclusive(start_message_id_inclusive)
1192+
conf.crypto_failure_action(crypto_failure_action)
11611193

11621194
c = Reader()
11631195
c._reader = self._client.create_reader(topic, start_message_id, conf)

src/config.cc

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,11 @@ void export_config(py::module_& m) {
313313
.def("batch_index_ack_enabled", &ConsumerConfiguration::setBatchIndexAckEnabled,
314314
return_value_policy::reference)
315315
.def("dead_letter_policy", &ConsumerConfiguration::setDeadLetterPolicy)
316-
.def("dead_letter_policy", &ConsumerConfiguration::getDeadLetterPolicy, return_value_policy::copy);
316+
.def("dead_letter_policy", &ConsumerConfiguration::getDeadLetterPolicy, return_value_policy::copy)
317+
.def("crypto_failure_action", &ConsumerConfiguration::getCryptoFailureAction,
318+
return_value_policy::copy)
319+
.def("crypto_failure_action", &ConsumerConfiguration::setCryptoFailureAction,
320+
return_value_policy::reference);
317321

318322
class_<ReaderConfiguration, std::shared_ptr<ReaderConfiguration>>(m, "ReaderConfiguration")
319323
.def(init<>())
@@ -331,5 +335,9 @@ void export_config(py::module_& m) {
331335
.def("read_compacted", &ReaderConfiguration::setReadCompacted)
332336
.def("crypto_key_reader", &ReaderConfiguration::setCryptoKeyReader, return_value_policy::reference)
333337
.def("start_message_id_inclusive", &ReaderConfiguration::isStartMessageIdInclusive)
334-
.def("start_message_id_inclusive", &ReaderConfiguration::setStartMessageIdInclusive, return_value_policy::reference);
338+
.def("start_message_id_inclusive", &ReaderConfiguration::setStartMessageIdInclusive, return_value_policy::reference)
339+
.def("crypto_failure_action", &ReaderConfiguration::getCryptoFailureAction,
340+
return_value_policy::copy)
341+
.def("crypto_failure_action", &ReaderConfiguration::setCryptoFailureAction,
342+
return_value_policy::reference);
335343
}

src/enums.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "utils.h"
2020
#include <pulsar/CompressionType.h>
2121
#include <pulsar/ConsumerConfiguration.h>
22+
#include <pulsar/ConsumerCryptoFailureAction.h>
2223
#include <pulsar/ProducerConfiguration.h>
2324
#include <pulsar/KeySharedPolicy.h>
2425
#include <pybind11/pybind11.h>
@@ -140,4 +141,9 @@ void export_enums(py::module_& m) {
140141
.value("Info", Logger::LEVEL_INFO)
141142
.value("Warn", Logger::LEVEL_WARN)
142143
.value("Error", Logger::LEVEL_ERROR);
144+
145+
enum_<ConsumerCryptoFailureAction>(m, "ConsumerCryptoFailureAction")
146+
.value("FAIL", ConsumerCryptoFailureAction::FAIL)
147+
.value("DISCARD", ConsumerCryptoFailureAction::DISCARD)
148+
.value("CONSUME", ConsumerCryptoFailureAction::CONSUME);
143149
}

tests/pulsar_test.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,63 @@ def test_encryption(self):
482482

483483
client.close()
484484

485+
def test_encryption_failure(self):
486+
publicKeyPath = CERTS_DIR + "public-key.client-rsa.pem"
487+
privateKeyPath = CERTS_DIR + "private-key.client-rsa.pem"
488+
crypto_key_reader = CryptoKeyReader(publicKeyPath, privateKeyPath)
489+
client = Client(self.serviceUrl)
490+
topic = "my-python-test-end-to-end-encryption-failure-" + str(time.time())
491+
producer = client.create_producer(
492+
topic=topic, encryption_key="client-rsa.pem", crypto_key_reader=crypto_key_reader
493+
)
494+
producer.send(b"msg-0")
495+
496+
def verify_next_message(value: bytes):
497+
consumer = client.subscribe(topic, subscription,
498+
crypto_key_reader=crypto_key_reader)
499+
msg = consumer.receive(3000)
500+
self.assertEqual(msg.data(), value)
501+
consumer.acknowledge(msg)
502+
consumer.close()
503+
504+
subscription = "my-sub"
505+
consumer = client.subscribe(topic, subscription,
506+
initial_position=InitialPosition.Earliest,
507+
crypto_failure_action=pulsar.ConsumerCryptoFailureAction.FAIL)
508+
with self.assertRaises(pulsar.Timeout):
509+
consumer.receive(3000)
510+
consumer.close()
511+
producer.send(b"msg-1")
512+
verify_next_message(b"msg-0") # msg-0 won't be skipped
513+
514+
consumer = client.subscribe(topic, subscription,
515+
initial_position=InitialPosition.Earliest,
516+
crypto_failure_action=pulsar.ConsumerCryptoFailureAction.DISCARD)
517+
with self.assertRaises(pulsar.Timeout):
518+
consumer.receive(3000)
519+
consumer.close()
520+
521+
producer.send(b"msg-2")
522+
verify_next_message(b"msg-2") # msg-1 is skipped since the crypto failure action is DISCARD
523+
524+
# Encrypted messages will be consumed since the crypto failure action is CONSUME
525+
consumer = client.subscribe(topic, 'another-sub',
526+
initial_position=InitialPosition.Earliest,
527+
crypto_failure_action=pulsar.ConsumerCryptoFailureAction.CONSUME)
528+
for i in range(3):
529+
msg = consumer.receive(3000)
530+
self.assertNotEqual(msg.data(), f"msg-{i}".encode())
531+
self.assertTrue(len(msg.data()) > 5, f"msg.data() is {msg.data()}")
532+
533+
reader = client.create_reader(topic, MessageId.earliest,
534+
crypto_failure_action=pulsar.ConsumerCryptoFailureAction.CONSUME)
535+
for i in range(3):
536+
msg = reader.read_next(3000)
537+
self.assertNotEqual(msg.data(), f"msg-{i}".encode())
538+
self.assertTrue(len(msg.data()) > 5, f"msg.data() is {msg.data()}")
539+
540+
client.close()
541+
485542
def test_tls_auth3(self):
486543
authPlugin = "tls"
487544
authParams = "tlsCertFile:%s/client-cert.pem,tlsKeyFile:%s/client-key.pem" % (CERTS_DIR, CERTS_DIR)

0 commit comments

Comments
 (0)