Skip to content

Commit e751244

Browse files
poorbarcodelhotaricodelipenghui
authored andcommitted
[fix][broker]fix memory leak, messages lost, incorrect replication state if using multiple schema versions(auto_produce) (apache#24178)
Co-authored-by: Lari Hotari <[email protected]> Co-authored-by: Penghui Li <[email protected]> (cherry picked from commit 82e1d20) (cherry picked from commit 0f7aace)
1 parent c42f1f9 commit e751244

File tree

12 files changed

+431
-36
lines changed

12 files changed

+431
-36
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,13 @@
3838
import java.lang.reflect.Field;
3939
import java.lang.reflect.Method;
4040
import java.time.Duration;
41+
import java.util.ArrayList;
4142
import java.util.Arrays;
43+
import java.util.Collection;
4244
import java.util.Collections;
4345
import java.util.HashSet;
4446
import java.util.Iterator;
47+
import java.util.LinkedHashSet;
4548
import java.util.List;
4649
import java.util.Optional;
4750
import java.util.Set;
@@ -79,6 +82,7 @@
7982
import org.apache.pulsar.client.api.ProducerBuilder;
8083
import org.apache.pulsar.client.api.PulsarClient;
8184
import org.apache.pulsar.client.api.Schema;
85+
import org.apache.pulsar.client.api.schema.GenericRecord;
8286
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
8387
import org.apache.pulsar.client.impl.ProducerImpl;
8488
import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -88,9 +92,12 @@
8892
import org.apache.pulsar.common.policies.data.ClusterData;
8993
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
9094
import org.apache.pulsar.common.policies.data.RetentionPolicies;
95+
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
9196
import org.apache.pulsar.common.policies.data.TenantInfo;
9297
import org.apache.pulsar.common.policies.data.TopicStats;
9398
import org.apache.pulsar.common.policies.data.impl.AutoTopicCreationOverrideImpl;
99+
import org.apache.pulsar.common.schema.SchemaInfo;
100+
import org.apache.pulsar.common.schema.SchemaType;
94101
import org.apache.pulsar.common.util.FutureUtil;
95102
import org.awaitility.Awaitility;
96103
import org.awaitility.reflect.WhiteboxImpl;
@@ -1324,4 +1331,95 @@ public void testCloseTopicAfterStartReplicationFailed() throws Exception {
13241331
admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1));
13251332
admin1.topics().delete(topicName, false);
13261333
}
1334+
1335+
@DataProvider
1336+
public Object[][] enableDeduplication() {
1337+
return new Object[][] {
1338+
{false},
1339+
{true},
1340+
};
1341+
}
1342+
1343+
@Test(dataProvider = "enableDeduplication")
1344+
public void testIncompatibleMultiVersionSchema(boolean enableDeduplication) throws Exception {
1345+
final String topicName = BrokerTestUtil.newUniqueName("persistent://"
1346+
+ sourceClusterAlwaysSchemaCompatibleNamespace + "/tp_");
1347+
final String subscriptionName = "s1";
1348+
// 1.Create topic.
1349+
admin1.topics().createNonPartitionedTopic(topicName);
1350+
Producer<byte[]> producer1 = client1.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create();
1351+
waitReplicatorStarted(topicName);
1352+
admin1.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
1353+
admin2.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
1354+
if (enableDeduplication) {
1355+
admin2.topicPolicies().setDeduplicationStatus(topicName, true);
1356+
}
1357+
// 2. Publish messages with multiple schemas.
1358+
producer1.newMessage(Schema.STRING).value("msg1").send();
1359+
producer1.newMessage(Schema.BOOL).value(false).send();
1360+
producer1.newMessage(Schema.STRING).value("msg3").send();
1361+
// 3. several unloading, which causes replicator internal producer reconnects.
1362+
for (int i = 0; i < 3; i++) {
1363+
Thread.sleep(2000);
1364+
admin2.topics().unload(topicName);
1365+
waitReplicatorStarted(topicName);
1366+
}
1367+
// Verify: no individual acks.
1368+
Awaitility.await().untilAsserted(() -> {
1369+
PersistentTopic persistentTopic2 =
1370+
(PersistentTopic) pulsar2.getBrokerService().getTopic(topicName, false).join().get();
1371+
assertTrue(
1372+
persistentTopic2.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(true) > 0);
1373+
PersistentTopic persistentTopic1 =
1374+
(PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
1375+
ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic1.getManagedLedger();
1376+
ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().get("pulsar.repl.r2");
1377+
assertEquals(cursor.getTotalNonContiguousDeletedMessagesRange(), 0);
1378+
assertTrue(cursor.getMarkDeletedPosition().compareTo(ml.getLastConfirmedEntry()) < 0);
1379+
});
1380+
// 4. Adjust schema compatibility and unload topic on the remote side, which will solve the replication stuck
1381+
// issue.
1382+
admin2.namespaces().setSchemaCompatibilityStrategy(sourceClusterAlwaysSchemaCompatibleNamespace,
1383+
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
1384+
admin2.topics().unload(topicName);
1385+
admin1.topics().unload(topicName);
1386+
Awaitility.await().untilAsserted(() -> {
1387+
PersistentTopic persistentTopic1 =
1388+
(PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).join().get();
1389+
ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic1.getManagedLedger();
1390+
ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().get("pulsar.repl.r2");
1391+
assertTrue(cursor.getMarkDeletedPosition().compareTo(ml.getLastConfirmedEntry()) >= 0);
1392+
});
1393+
// Verify: no out-of-order; schemas are as expected.
1394+
Consumer<GenericRecord> consumer2 = client2.newConsumer(Schema.AUTO_CONSUME()).topic(topicName)
1395+
.subscriptionName(subscriptionName).subscribe();
1396+
Collection<String> msgReceived;
1397+
if (enableDeduplication) {
1398+
msgReceived = new ArrayList<>();
1399+
} else {
1400+
msgReceived = new LinkedHashSet<>();
1401+
}
1402+
while (true) {
1403+
Message<GenericRecord> message = consumer2.receive(2, TimeUnit.SECONDS);
1404+
if (message == null) {
1405+
break;
1406+
}
1407+
SchemaType schemaType = message.getValue().getSchemaType();
1408+
assertTrue(schemaType.equals(SchemaType.STRING) || schemaType.equals(SchemaType.BOOLEAN));
1409+
msgReceived.add(message.getValue().getNativeObject().toString());
1410+
log.info("received msg: {}", message.getValue().getNativeObject().toString());
1411+
}
1412+
assertEquals(msgReceived, Arrays.asList("msg1", "false", "msg3"));
1413+
List<SchemaInfo> schemaInfoList = admin2.schemas().getAllSchemas(topicName);
1414+
assertEquals(schemaInfoList.size(), 2);
1415+
assertEquals(schemaInfoList.get(0).getType(), SchemaType.STRING);
1416+
assertEquals(schemaInfoList.get(1).getType(), SchemaType.BOOLEAN);
1417+
1418+
// cleanup.
1419+
consumer2.close();
1420+
producer1.close();
1421+
admin2.topics().deleteSubscription(topicName, subscriptionName);
1422+
admin2.namespaces().setSchemaCompatibilityStrategy(sourceClusterAlwaysSchemaCompatibleNamespace,
1423+
SchemaCompatibilityStrategy.FORWARD);
1424+
}
13271425
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTestBase.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.pulsar.common.naming.SystemTopicNames;
5050
import org.apache.pulsar.common.naming.TopicName;
5151
import org.apache.pulsar.common.policies.data.ClusterData;
52+
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
5253
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
5354
import org.apache.pulsar.common.policies.data.TopicPolicies;
5455
import org.apache.pulsar.common.policies.data.TopicType;
@@ -65,6 +66,7 @@ public abstract class OneWayReplicatorTestBase extends TestRetrySupport {
6566
protected final String defaultTenant = "public";
6667
protected final String replicatedNamespace = defaultTenant + "/default";
6768
protected final String nonReplicatedNamespace = defaultTenant + "/ns1";
69+
protected final String sourceClusterAlwaysSchemaCompatibleNamespace = defaultTenant + "/always-compatible";
6870

6971
protected final String cluster1 = "r1";
7072

@@ -157,6 +159,10 @@ protected void createDefaultTenantsAndClustersAndNamespace() throws Exception {
157159
admin1.tenants().createTenant(defaultTenant, new TenantInfoImpl(Collections.emptySet(),
158160
Sets.newHashSet(cluster1, cluster2)));
159161
admin1.namespaces().createNamespace(replicatedNamespace, Sets.newHashSet(cluster1, cluster2));
162+
admin1.namespaces().createNamespace(
163+
sourceClusterAlwaysSchemaCompatibleNamespace, Sets.newHashSet(cluster1, cluster2));
164+
admin1.namespaces().setSchemaCompatibilityStrategy(sourceClusterAlwaysSchemaCompatibleNamespace,
165+
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
160166
admin1.namespaces().createNamespace(nonReplicatedNamespace);
161167

162168
if (!usingGlobalZK) {
@@ -177,6 +183,9 @@ protected void createDefaultTenantsAndClustersAndNamespace() throws Exception {
177183
admin2.tenants().createTenant(defaultTenant, new TenantInfoImpl(Collections.emptySet(),
178184
Sets.newHashSet(cluster1, cluster2)));
179185
admin2.namespaces().createNamespace(replicatedNamespace);
186+
admin2.namespaces().createNamespace(sourceClusterAlwaysSchemaCompatibleNamespace);
187+
admin2.namespaces().setSchemaCompatibilityStrategy(sourceClusterAlwaysSchemaCompatibleNamespace,
188+
SchemaCompatibilityStrategy.FORWARD);
180189
admin2.namespaces().createNamespace(nonReplicatedNamespace);
181190
}
182191

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalPartitionedTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,4 +235,10 @@ public void testRemoveCluster() throws Exception {
235235
admin2.topics().deletePartitionedTopic(topic);
236236
admin2.namespaces().deleteNamespace(ns1);
237237
}
238+
239+
@Override
240+
@Test(dataProvider = "enableDeduplication", enabled = false)
241+
public void testIncompatibleMultiVersionSchema(boolean enableDeduplication) throws Exception {
242+
super.testIncompatibleMultiVersionSchema(enableDeduplication);
243+
}
238244
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,4 +223,10 @@ public void testRemoveCluster() throws Exception {
223223
admin2.topics().delete(topic);
224224
admin2.namespaces().deleteNamespace(ns1);
225225
}
226+
227+
@Override
228+
@Test(dataProvider = "enableDeduplication", enabled = false)
229+
public void testIncompatibleMultiVersionSchema(boolean enableDeduplication) throws Exception {
230+
super.testIncompatibleMultiVersionSchema(enableDeduplication);
231+
}
226232
}

pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleSchemaTest.java

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.Arrays;
3232
import java.util.List;
3333
import java.util.UUID;
34+
import java.util.concurrent.CompletableFuture;
3435
import java.util.concurrent.ExecutionException;
3536
import java.util.concurrent.TimeUnit;
3637
import lombok.AllArgsConstructor;
@@ -42,25 +43,33 @@
4243
import org.apache.avro.Schema.Parser;
4344
import org.apache.avro.reflect.ReflectData;
4445
import org.apache.pulsar.TestNGInstanceOrder;
46+
import org.apache.pulsar.broker.BrokerTestUtil;
47+
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
4548
import org.apache.pulsar.client.admin.PulsarAdminException;
4649
import org.apache.pulsar.client.api.PulsarClientException.IncompatibleSchemaException;
4750
import org.apache.pulsar.client.api.PulsarClientException.InvalidMessageException;
4851
import org.apache.pulsar.client.api.schema.GenericRecord;
4952
import org.apache.pulsar.client.impl.BinaryProtoLookupService;
53+
import org.apache.pulsar.client.impl.ClientBuilderImpl;
54+
import org.apache.pulsar.client.impl.ClientCnx;
5055
import org.apache.pulsar.client.impl.HttpLookupService;
5156
import org.apache.pulsar.client.impl.LookupService;
5257
import org.apache.pulsar.client.impl.MessageImpl;
5358
import org.apache.pulsar.client.impl.PulsarClientImpl;
59+
import org.apache.pulsar.client.impl.metrics.InstrumentProvider;
5460
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
5561
import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
5662
import org.apache.pulsar.client.impl.schema.reader.AvroReader;
5763
import org.apache.pulsar.client.impl.schema.writer.AvroWriter;
64+
import org.apache.pulsar.common.api.proto.CommandGetOrCreateSchemaResponse;
5865
import org.apache.pulsar.common.naming.TopicName;
66+
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
5967
import org.apache.pulsar.common.schema.KeyValue;
6068
import org.apache.pulsar.common.schema.KeyValueEncodingType;
6169
import org.apache.pulsar.common.schema.LongSchemaVersion;
6270
import org.apache.pulsar.common.schema.SchemaInfo;
6371
import org.apache.pulsar.common.schema.SchemaType;
72+
import org.awaitility.Awaitility;
6473
import org.testng.Assert;
6574
import org.testng.annotations.AfterClass;
6675
import org.testng.annotations.AfterMethod;
@@ -76,6 +85,8 @@
7685
public class SimpleSchemaTest extends ProducerConsumerBase {
7786

7887
private static final String NAMESPACE = "my-property/my-ns";
88+
private static final String NAMESPACE_ALWAYS_COMPATIBLE = "my-property/always-compatible";
89+
private static final String NAMESPACE_NEVER_COMPATIBLE = "my-property/never-compatible";
7990

8091
@DataProvider(name = "batchingModes")
8192
public static Object[][] batchingModes() {
@@ -124,6 +135,12 @@ protected void setup() throws Exception {
124135
this.isTcpLookup = true;
125136
super.internalSetup();
126137
super.producerBaseSetup();
138+
admin.namespaces().createNamespace(NAMESPACE_ALWAYS_COMPATIBLE);
139+
admin.namespaces().setSchemaCompatibilityStrategy(NAMESPACE_ALWAYS_COMPATIBLE,
140+
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
141+
admin.namespaces().createNamespace(NAMESPACE_NEVER_COMPATIBLE);
142+
admin.namespaces().setSchemaCompatibilityStrategy(NAMESPACE_NEVER_COMPATIBLE,
143+
SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);
127144
}
128145

129146
@AfterClass(alwaysRun = true)
@@ -340,6 +357,78 @@ public void newProducerForMessageSchemaOnTopicWithMultiVersionSchema() throws Ex
340357
}
341358
}
342359

360+
@Test
361+
public void testProducerConnectStateWhenRegisteringSchema() throws Exception {
362+
final String topic = BrokerTestUtil.newUniqueName(NAMESPACE_ALWAYS_COMPATIBLE + "/tp");
363+
final String subscription = "s1";
364+
admin.topics().createNonPartitionedTopic(topic);
365+
admin.topics().createSubscription(topic, subscription, MessageId.earliest);
366+
367+
// Create a pulsar client with a delayed response of "getOrCreateSchemaResponse"
368+
CompletableFuture<Void> responseSignal = new CompletableFuture<>();
369+
ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString());
370+
PulsarClient client = InjectedClientCnxClientBuilder.create(clientBuilder, (conf, eventLoopGroup) ->
371+
new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) {
372+
protected void handleGetOrCreateSchemaResponse(
373+
CommandGetOrCreateSchemaResponse commandGetOrCreateSchemaResponse) {
374+
responseSignal.join();
375+
super.handleGetOrCreateSchemaResponse(commandGetOrCreateSchemaResponse);
376+
}
377+
});
378+
Producer producer = client.newProducer(Schema.AUTO_PRODUCE_BYTES()).enableBatching(false).topic(topic).create();
379+
producer.newMessage(Schema.STRING).value("msg").sendAsync();
380+
381+
PersistentTopic persistentTopic =
382+
(PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get();
383+
assertEquals(persistentTopic.getProducers().size(), 1);
384+
assertTrue(producer.isConnected());
385+
386+
// cleanup.
387+
responseSignal.complete(null);
388+
producer.close();
389+
client.close();
390+
Awaitility.await().untilAsserted(() -> {
391+
assertEquals(persistentTopic.getProducers().size(), 0);
392+
});
393+
admin.topics().delete(topic);
394+
}
395+
396+
@Test
397+
public void testNoMemoryLeakIfSchemaIncompatible() throws Exception {
398+
final String topic = BrokerTestUtil.newUniqueName(NAMESPACE_NEVER_COMPATIBLE + "/tp");
399+
final String subscription = "s1";
400+
admin.topics().createNonPartitionedTopic(topic);
401+
admin.topics().createSubscription(topic, subscription, MessageId.earliest);
402+
403+
// Create a pulsar client with a delayed response of "getOrCreateSchemaResponse"
404+
CompletableFuture<Void> responseSignal = new CompletableFuture<>();
405+
ClientBuilderImpl clientBuilder = (ClientBuilderImpl) PulsarClient.builder().serviceUrl(lookupUrl.toString());
406+
PulsarClient client = InjectedClientCnxClientBuilder.create(clientBuilder, (conf, eventLoopGroup) ->
407+
new ClientCnx(InstrumentProvider.NOOP, conf, eventLoopGroup) {
408+
protected void handleGetOrCreateSchemaResponse(
409+
CommandGetOrCreateSchemaResponse commandGetOrCreateSchemaResponse) {
410+
responseSignal.join();
411+
super.handleGetOrCreateSchemaResponse(commandGetOrCreateSchemaResponse);
412+
}
413+
});
414+
Producer producer = client.newProducer(Schema.AUTO_PRODUCE_BYTES()).enableBatching(false).topic(topic).create();
415+
producer.newMessage(Schema.STRING).value("msg").sendAsync();
416+
417+
PersistentTopic persistentTopic =
418+
(PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).get().get();
419+
assertEquals(persistentTopic.getProducers().size(), 1);
420+
assertTrue(producer.isConnected());
421+
422+
// cleanup.
423+
responseSignal.complete(null);
424+
producer.close();
425+
client.close();
426+
Awaitility.await().untilAsserted(() -> {
427+
assertEquals(persistentTopic.getProducers().size(), 0);
428+
});
429+
admin.topics().delete(topic);
430+
}
431+
343432
@Test
344433
public void newNativeAvroProducerForMessageSchemaOnTopicWithMultiVersionSchema() throws Exception {
345434
String topic = NAMESPACE + "/schema-test";

0 commit comments

Comments
 (0)