Skip to content

Commit 48b59b6

Browse files
committed
feat: support producer access mode.
1 parent 766db9e commit 48b59b6

File tree

4 files changed

+82
-3
lines changed

4 files changed

+82
-3
lines changed

pulsar/__init__.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
import _pulsar
4949

5050
from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType, \
51-
LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode # noqa: F401
51+
LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode, ProducerAccessMode # noqa: F401
5252

5353
from pulsar.__about__ import __version__
5454

@@ -523,7 +523,8 @@ def create_producer(self, topic,
523523
properties=None,
524524
batching_type=BatchingType.Default,
525525
encryption_key=None,
526-
crypto_key_reader=None
526+
crypto_key_reader=None,
527+
access_mode=ProducerAccessMode.Shared,
527528
):
528529
"""
529530
Create a new producer on a given topic.
@@ -614,6 +615,17 @@ def create_producer(self, topic,
614615
crypto_key_reader: CryptoKeyReader, optional
615616
Symmetric encryption class implementation, configuring public key encryption messages for the producer
616617
and private key decryption messages for the consumer
618+
access_mode: ProducerAccessMode, optional
619+
Set the type of access mode that the producer requires on the topic.
620+
621+
Supported modes:
622+
623+
* Shared: By default multiple producers can publish on a topic.
624+
* Exclusive: Require exclusive access for producer.
625+
Fail immediately if there's already a producer connected.
626+
* WaitForExclusive: Producer creation is pending until it can acquire exclusive access.
627+
* ExclusiveWithFencing: Acquire exclusive access for the producer.
628+
Any existing producer will be removed and invalidated immediately.
617629
"""
618630
_check_type(str, topic, 'topic')
619631
_check_type_or_none(str, producer_name, 'producer_name')
@@ -634,6 +646,7 @@ def create_producer(self, topic,
634646
_check_type_or_none(str, encryption_key, 'encryption_key')
635647
_check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
636648
_check_type(bool, lazy_start_partitioned_producers, 'lazy_start_partitioned_producers')
649+
_check_type(ProducerAccessMode, access_mode, 'access_mode')
637650

638651
conf = _pulsar.ProducerConfiguration()
639652
conf.send_timeout_millis(send_timeout_millis)
@@ -649,6 +662,7 @@ def create_producer(self, topic,
649662
conf.batching_type(batching_type)
650663
conf.chunking_enabled(chunking_enabled)
651664
conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
665+
conf.access_mode(access_mode)
652666
if producer_name:
653667
conf.producer_name(producer_name)
654668
if initial_sequence_id:

src/config.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,9 @@ void export_config(py::module_& m) {
223223
.def("batching_type", &ProducerConfiguration::setBatchingType, return_value_policy::reference)
224224
.def("batching_type", &ProducerConfiguration::getBatchingType)
225225
.def("encryption_key", &ProducerConfiguration::addEncryptionKey, return_value_policy::reference)
226-
.def("crypto_key_reader", &ProducerConfiguration::setCryptoKeyReader, return_value_policy::reference);
226+
.def("crypto_key_reader", &ProducerConfiguration::setCryptoKeyReader, return_value_policy::reference)
227+
.def("access_mode", &ProducerConfiguration::setAccessMode, return_value_policy::reference)
228+
.def("access_mode", &ProducerConfiguration::getAccessMode, return_value_policy::copy);
227229

228230
class_<BatchReceivePolicy>(m, "BatchReceivePolicy")
229231
.def(init<int, int, long>())

src/enums.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,12 @@ void export_enums(py::module_& m) {
124124
.value("Default", ProducerConfiguration::DefaultBatching)
125125
.value("KeyBased", ProducerConfiguration::KeyBasedBatching);
126126

127+
enum_<ProducerConfiguration::ProducerAccessMode>(m, "ProducerAccessMode", "Producer Access Mode")
128+
.value("Shared", ProducerConfiguration::ProducerAccessMode::Shared)
129+
.value("Exclusive", ProducerConfiguration::ProducerAccessMode::Exclusive)
130+
.value("WaitForExclusive", ProducerConfiguration::ProducerAccessMode::WaitForExclusive)
131+
.value("ExclusiveWithFencing", ProducerConfiguration::ProducerAccessMode::ExclusiveWithFencing);
132+
127133
enum_<Logger::Level>(m, "LoggerLevel")
128134
.value("Debug", Logger::LEVEL_DEBUG)
129135
.value("Info", Logger::LEVEL_INFO)

tests/pulsar_test.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
InitialPosition,
4343
CryptoKeyReader,
4444
ConsumerBatchReceivePolicy,
45+
ProducerAccessMode,
4546
)
4647
from pulsar.schema import JsonSchema, Record, Integer
4748

@@ -166,6 +167,62 @@ def test_producer_send(self):
166167
self.assertEqual(msg_id, msg.message_id())
167168
client.close()
168169

170+
def test_producer_access_mode_exclusive(self):
171+
client = Client(self.serviceUrl)
172+
topic_name = "test-access-mode-exclusive"
173+
client.create_producer(topic_name, producer_name="p1", access_mode=ProducerAccessMode.Exclusive)
174+
with self.assertRaises(pulsar.ProducerFenced):
175+
client.create_producer(topic_name, producer_name="p2", access_mode=ProducerAccessMode.Exclusive)
176+
client.close()
177+
178+
def test_producer_access_mode_wait_exclusive(self):
179+
client = Client(self.serviceUrl)
180+
topic_name = "test_producer_access_mode_wait_exclusive"
181+
producer1 = client.create_producer(
182+
topic=topic_name,
183+
producer_name='p-1',
184+
access_mode=ProducerAccessMode.Exclusive
185+
)
186+
assert producer1.producer_name() == 'p-1'
187+
188+
# when p1 close, p2 success created.
189+
producer1.close()
190+
producer2 = client.create_producer(
191+
topic=topic_name,
192+
producer_name='p-2',
193+
access_mode=ProducerAccessMode.WaitForExclusive
194+
)
195+
assert producer2.producer_name() == 'p-2'
196+
197+
producer2.close()
198+
client.close()
199+
200+
def test_producer_access_mode_exclusive_with_fencing(self):
201+
client = Client(self.serviceUrl)
202+
topic_name = 'test_producer_access_mode_exclusive_with_fencing'
203+
204+
producer1 = client.create_producer(
205+
topic=topic_name,
206+
producer_name='p-1',
207+
access_mode=ProducerAccessMode.Exclusive
208+
)
209+
assert producer1.producer_name() == 'p-1'
210+
211+
producer2 = client.create_producer(
212+
topic=topic_name,
213+
producer_name='p-2',
214+
access_mode=ProducerAccessMode.ExclusiveWithFencing
215+
)
216+
assert producer2.producer_name() == 'p-2'
217+
218+
# producer1 will be fenced.
219+
time.sleep(0.2)
220+
with self.assertRaises((pulsar.ProducerFenced, pulsar.AlreadyClosed)):
221+
producer1.send('test-msg'.encode('utf-8'))
222+
223+
producer2.close()
224+
client.close()
225+
169226
def test_producer_is_connected(self):
170227
client = Client(self.serviceUrl)
171228
topic = "test_producer_is_connected"

0 commit comments

Comments
 (0)