Commit 5496d92

poorbarcode authored and lhotari committed
[fix][client] Fix producer publishing getting stuck after message with incompatible schema is discarded (#24282)
Fixes #24262
Main Issue: #24262

### Motivation

The issue is a regression of #24178. It occurs through the following flow:

- Publish msg 1, which has an incompatible schema.
- Publish msg 2, which has an incompatible schema.
- The first message's schema fails to register because it is incompatible.
- The message is discarded.
- Issue: discarding the message does not trigger a schema registration for the following messages, so the producer's publishing gets stuck.

### Modifications

Fix the issue.

(cherry picked from commit df2c619)
1 parent e5d309c commit 5496d92
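
For context, the regression can be reproduced with a minimal client-side snippet along the following lines. This is a sketch, not part of the commit: the service URL, topic name, message values, and the 30-second timeout are illustrative assumptions; it simply mirrors the flow from the Motivation above.

    import java.util.concurrent.TimeUnit;
    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.Schema;

    public class StuckProducerRepro {
        public static void main(String[] args) throws Exception {
            try (PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650") // assumed broker URL
                    .build();
                 Producer<byte[]> producer = client.newProducer(Schema.AUTO_PRODUCE_BYTES())
                         .topic("persistent://public/default/schema-stuck-repro") // assumed topic
                         .enableBatching(false)
                         .create()) {
                // The first message registers a STRING schema on the topic.
                producer.newMessage(Schema.STRING).value("msg-0").sendAsync();
                // Msg 1 uses BOOL, which is incompatible with STRING: its schema
                // registration fails and the message is discarded.
                producer.newMessage(Schema.BOOL).value(false).sendAsync();
                // Before this fix, discarding msg 1 did not trigger schema
                // registration for msg 2, so msg 2 and everything behind it hung.
                producer.newMessage(Schema.BOOL).value(true).sendAsync();
                // With the fix, a later compatible message completes normally;
                // before the fix, this get() timed out.
                producer.newMessage(Schema.STRING).value("msg-3").sendAsync()
                        .get(30, TimeUnit.SECONDS);
            }
        }
    }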

File tree

2 files changed: +26 -9 lines changed

pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java

Lines changed: 15 additions & 5 deletions

@@ -1450,8 +1450,7 @@ public User(String name) {
     }
 
 
-    // This test fails consistently, disabling until it is fixed. Issue https://github.com/apache/pulsar/issues/24262
-    @Test(enabled = false)
+    @Test
     public void testPendingQueueSizeIfIncompatible() throws Exception {
         final String namespace = BrokerTestUtil.newUniqueName(PUBLIC_TENANT + "/ns");
         admin.namespaces().createNamespace(namespace, Sets.newHashSet(CLUSTER_NAME));
@@ -1460,17 +1459,28 @@ public void testPendingQueueSizeIfIncompatible() throws Exception {
         admin.topics().createNonPartitionedTopic(topic);
 
         ProducerImpl producer = (ProducerImpl) pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
-                .maxPendingMessages(50).enableBatching(false).topic(topic).create();
-        producer.newMessage(Schema.STRING).value("msg").sendAsync();
+                .maxPendingMessages(1000).enableBatching(false).topic(topic).create();
+        producer.newMessage(Schema.STRING).value("msg-1").sendAsync();
         AtomicReference<CompletableFuture<MessageId>> latestSend = new AtomicReference<>();
         for (int i = 0; i < 100; i++) {
-            latestSend.set(producer.newMessage(Schema.BOOL).value(false).sendAsync());
+            final String msg = "msg-with-broken-schema-" + i;
+            latestSend.set(producer.newMessage(Schema.BOOL).value(false).sendAsync().thenApply(v -> {
+                log.info("send complete {}", msg);
+                return null;
+            }).exceptionally(ex -> {
+                log.error("failed to send {}", msg, ex);
+                return null;
+            }));
         }
+        // Verify: msgs with broken schema will be discarded.
         Awaitility.await().untilAsserted(() -> {
             assertTrue(latestSend.get().isDone());
             assertEquals(producer.getPendingQueueSize(), 0);
         });
 
+        // Verify: msgs with compatible schema can be sent successfully.
+        producer.newMessage(Schema.STRING).value("msg-2").sendAsync();
+
         // cleanup.
         producer.close();
         admin.topics().delete(topic, false);
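
Two details of the updated test are worth a note: maxPendingMessages is raised from 50 to 1000, presumably so that all 101 async sends fit in the pending queue without tripping queue-full errors that would muddy the assertion, and the checks poll with Awaitility because both the discard and the recovery happen asynchronously. The core check could be factored into a hypothetical helper like the sketch below (assuming the test class's existing Awaitility and assertion imports):

    // Hypothetical helper, not in the commit: a producer is "not stuck" once the
    // last send future has completed (successfully or exceptionally, since
    // incompatible messages are discarded with an error) and the pending queue
    // has fully drained.
    static void assertProducerDrained(ProducerImpl<?> producer,
                                      CompletableFuture<MessageId> lastSend) {
        Awaitility.await().untilAsserted(() -> {
            assertTrue(lastSend.isDone());
            assertEquals(producer.getPendingQueueSize(), 0);
        });
    }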

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java

Lines changed: 11 additions & 4 deletions

@@ -2374,6 +2374,7 @@ protected void updateLastSeqPushed(OpSendMsg op) {
      * 3-1-1. If {@link #pauseSendingToPreservePublishOrderOnSchemaRegFailure} is true pause all following
      *     publishing to avoid out-of-order issue.
      * 3-1-2. Otherwise, discard the failed message anc continuously publishing the following messages.
+     *     Additionally, the following messages may need schema registration also.
      * 3-2. The new schema registration failed due to other error, retry registering.
      * Note: Since the current method accesses & modifies {@link #pendingMessages}, you should acquire a lock on
      *     {@link ProducerImpl} before calling method.
@@ -2392,6 +2393,7 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl latestMsgAtt
         Iterator<OpSendMsg> msgIterator = pendingMessages.iterator();
         MessageImpl loopStartAt = latestMsgAttemptedRegisteredSchema;
         OpSendMsg loopEndDueToSchemaRegisterNeeded = null;
+        boolean pausedSendingToPreservePublishOrderOnSchemaRegFailure = false;
         while (msgIterator.hasNext()) {
             OpSendMsg op = msgIterator.next();
             if (loopStartAt != null) {
@@ -2436,6 +2438,7 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl latestMsgAtt
                                 + " 2) Unload topic on target cluster. Schema details: {}",
                         topic, producerName, SchemaUtils.jsonifySchemaInfo(msgSchemaInfo, false));
                 loopEndDueToSchemaRegisterNeeded = op;
+                pausedSendingToPreservePublishOrderOnSchemaRegFailure = true;
                 break;
             }
             // Event 3-1-2.
@@ -2491,23 +2494,27 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl latestMsgAtt
         }
         cnx.ctx().flush();
 
-        // "Event 1-1" or "Event 3-1-1" or "Event 3-2".
+        // "Event 1-1" or "Event 3-1-1" or "Event 3-1-2" or "Event 3-2".
         if (loopEndDueToSchemaRegisterNeeded != null) {
             if (compareAndSetState(State.Connecting, State.Ready)) {
                 // "Event 1-1" happens after "Event 3-1-1".
                 // After a topic unload, ask the producer retry to register schema, which avoids restart client
                 // after users changed the compatibility strategy to make the schema is compatible.
                 tryRegisterSchema(cnx, loopEndDueToSchemaRegisterNeeded.msg, loopEndDueToSchemaRegisterNeeded.callback,
                         expectedEpoch);
-            } else if (!failedIncompatibleSchema && compareAndSetState(State.RegisteringSchema, State.Ready)) {
-                // "Event 2-1" or "Event 3-2".
+            } else if (pausedSendingToPreservePublishOrderOnSchemaRegFailure) {
+                // Nothing to do if the event is "Event 3-1-1", just keep stuck.
+                return;
+            } else if (compareAndSetState(State.RegisteringSchema, State.Ready)) {
+                // "Event 2-1" or "Event 3-1-2" or "Event 3-2".
                 // "pendingMessages" has more messages to register new schema.
                 // This operation will not be conflict with another schema registration because both operations are
                 // attempt to acquire the same lock "ProducerImpl.this".
                 tryRegisterSchema(cnx, loopEndDueToSchemaRegisterNeeded.msg, loopEndDueToSchemaRegisterNeeded.callback,
                         expectedEpoch);
             }
-            // Nothing to do if the event is "Event 3-1-1", just keep stuck.
+            // Schema registration will trigger a new "recoverProcessOpSendMsgFrom", so return here. If failed to switch
+            // state, it means another task will trigger a new "recoverProcessOpSendMsgFrom".
             return;
         } else if (latestMsgAttemptedRegisteredSchema != null) {
             // Event 2-2 or "Event 3-1-2".
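
For readers following the event numbering, here is a self-contained sketch of the decision at the tail of recoverProcessOpSendMsgFrom after this patch. The branch structure and variable names mirror the diff, but State, OpSendMsg, and tryRegisterSchema are stand-in stubs, not the real client internals:

    import java.util.concurrent.atomic.AtomicReference;

    class RecoveryDecisionSketch {
        enum State { Connecting, RegisteringSchema, Ready }
        static class OpSendMsg { }

        private final AtomicReference<State> state = new AtomicReference<>(State.RegisteringSchema);

        private boolean compareAndSetState(State expect, State update) {
            return state.compareAndSet(expect, update);
        }

        private void tryRegisterSchema(OpSendMsg op) {
            // In the real client this registers the schema and re-runs recovery.
            System.out.println("registering schema for the next pending message");
        }

        void finishRecovery(OpSendMsg loopEndDueToSchemaRegisterNeeded,
                            boolean pausedSendingToPreservePublishOrderOnSchemaRegFailure) {
            if (loopEndDueToSchemaRegisterNeeded == null) {
                return; // no pending message is waiting on schema registration
            }
            if (compareAndSetState(State.Connecting, State.Ready)) {
                // "Event 1-1" after "Event 3-1-1": the topic was unloaded and the
                // producer reconnected, so retry the schema registration.
                tryRegisterSchema(loopEndDueToSchemaRegisterNeeded);
            } else if (pausedSendingToPreservePublishOrderOnSchemaRegFailure) {
                // "Event 3-1-1": stay paused on purpose to preserve publish order.
                return;
            } else if (compareAndSetState(State.RegisteringSchema, State.Ready)) {
                // "Event 2-1", "Event 3-1-2" or "Event 3-2". This branch is the fix:
                // previously "Event 3-1-2" (incompatible message discarded) never
                // re-triggered registration, leaving later messages stuck.
                tryRegisterSchema(loopEndDueToSchemaRegisterNeeded);
            }
            // Registration triggers a new recovery pass; if the state switch
            // failed, another task owns the state and will trigger one instead.
        }
    }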
