|
42 | 42 | InitialPosition,
|
43 | 43 | CryptoKeyReader,
|
44 | 44 | ConsumerBatchReceivePolicy,
|
| 45 | + ProducerAccessMode, |
45 | 46 | )
|
46 | 47 | from pulsar.schema import JsonSchema, Record, Integer
|
47 | 48 |
|
@@ -166,6 +167,61 @@ def test_producer_send(self):
|
166 | 167 | self.assertEqual(msg_id, msg.message_id())
|
167 | 168 | client.close()
|
168 | 169 |
|
| 170 | + def test_producer_access_mode_exclusive(self): |
| 171 | + client = Client(self.serviceUrl) |
| 172 | + topic_name = "test-access-mode-exclusive" |
| 173 | + client.create_producer(topic_name, producer_name="p1", access_mode=pulsar.ProducerAccessMode.Exclusive) |
| 174 | + with self.assertRaises(pulsar.ProducerFenced): |
| 175 | + client.create_producer(topic_name, producer_name="p2", access_mode=pulsar.ProducerAccessMode.Exclusive) |
| 176 | + client.close() |
| 177 | + |
| 178 | + def test_producer_access_mode_wait_exclusive(self): |
| 179 | + client = Client(self.serviceUrl) |
| 180 | + topic_name = "test_producer_access_mode_wait_exclusive" |
| 181 | + producer1 = client.create_producer( |
| 182 | + topic=topic_name, |
| 183 | + producer_name='p-1', |
| 184 | + access_mode=pulsar.ProducerAccessMode.Exclusive |
| 185 | + ) |
| 186 | + assert producer1.producer_name() == 'p-1' |
| 187 | + |
| 188 | + # when p1 close, p2 success created. |
| 189 | + producer1.close() |
| 190 | + producer2 = client.create_producer( |
| 191 | + topic=topic_name, |
| 192 | + producer_name='p-2', |
| 193 | + access_mode=pulsar.ProducerAccessMode.WaitForExclusive |
| 194 | + ) |
| 195 | + assert producer2.producer_name() == 'p-2' |
| 196 | + |
| 197 | + producer2.close() |
| 198 | + client.close() |
| 199 | + |
| 200 | + def test_producer_access_mode_exclusive_with_fencing(self): |
| 201 | + client = Client(self.serviceUrl) |
| 202 | + topic_name = 'test_producer_access_mode_exclusive_with_fencing' |
| 203 | + |
| 204 | + producer1 = client.create_producer( |
| 205 | + topic=topic_name, |
| 206 | + producer_name='p-1', |
| 207 | + access_mode=pulsar.ProducerAccessMode.Exclusive |
| 208 | + ) |
| 209 | + assert producer1.producer_name() == 'p-1' |
| 210 | + |
| 211 | + producer2 = client.create_producer( |
| 212 | + topic=topic_name, |
| 213 | + producer_name='p-2', |
| 214 | + access_mode=pulsar.ProducerAccessMode.ExclusiveWithFencing |
| 215 | + ) |
| 216 | + assert producer2.producer_name() == 'p-2' |
| 217 | + |
| 218 | + # producer1 will be fenced. |
| 219 | + with self.assertRaises(pulsar.ProducerFenced): |
| 220 | + producer1.send('test-msg'.encode('utf-8')) |
| 221 | + |
| 222 | + producer2.close() |
| 223 | + client.close() |
| 224 | + |
169 | 225 | def test_producer_is_connected(self):
|
170 | 226 | client = Client(self.serviceUrl)
|
171 | 227 | topic = "test_producer_is_connected"
|
|
0 commit comments