Skip to content

Commit 972ee9d

Browse files
poorbarcodelhotari
authored andcommitted
[fix] [broker] topics infinitely failed to delete after remove cluster from replicated clusters modifying when using partitioned system topic (#24097)
(cherry picked from commit abd5121)
1 parent 800f8cd commit 972ee9d

File tree

2 files changed

+222
-2
lines changed

2 files changed

+222
-2
lines changed

Diff for: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java

+19-2
Original file line numberDiff line numberDiff line change
@@ -136,10 +136,27 @@ public CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName topicName) {
136136
return CompletableFuture.completedFuture(null);
137137
}
138138
TopicName changeEvents = NamespaceEventsSystemTopicFactory.getEventsTopicName(topicName.getNamespaceObject());
139-
return pulsarService.getNamespaceService().checkTopicExists(changeEvents).thenCompose(topicExistsInfo -> {
139+
CompletableFuture<Boolean> changeEventTopicExists = pulsarService.getPulsarResources().getTopicResources()
140+
.persistentTopicExists(changeEvents).thenCompose(nonPartitionedExists -> {
141+
if (!nonPartitionedExists) {
142+
// To check whether partitioned __change_events exists.
143+
// Instead of checking partitioned metadata, we check the first partition, because there is a case
144+
// does not work if we choose checking partitioned metadata.
145+
// The case's details:
146+
// 1. Start 2 clusters: c1 and c2.
147+
// 2. Enable replication between c1 and c2 with a global ZK.
148+
// 3. The partitioned metadata was shared using by c1 and c2.
149+
// 4. Pulsar only delete partitions when the topic is deleting from c1, because c2 is still using
150+
// partitioned metadata.
151+
return pulsarService.getPulsarResources().getTopicResources()
152+
.persistentTopicExists(changeEvents.getPartition(0));
153+
}
154+
return CompletableFuture.completedFuture(true);
155+
});
156+
return changeEventTopicExists.thenCompose(exists -> {
140157
// If the system topic named "__change_events" has been deleted, it means all the data in the topic have
141158
// been deleted, so we do not need to delete the message that we want to delete again.
142-
if (!topicExistsInfo.isExists()) {
159+
if (!exists) {
143160
log.info("Skip delete topic-level policies because {} has been removed before", changeEvents);
144161
return CompletableFuture.completedFuture(null);
145162
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.service;
20+
21+
import static org.testng.Assert.assertFalse;
22+
import static org.testng.Assert.assertTrue;
23+
import java.time.Duration;
24+
import java.util.Arrays;
25+
import java.util.HashSet;
26+
import java.util.Optional;
27+
import java.util.concurrent.CompletableFuture;
28+
import java.util.concurrent.TimeUnit;
29+
import lombok.extern.slf4j.Slf4j;
30+
import org.apache.pulsar.broker.ServiceConfiguration;
31+
import org.apache.pulsar.client.api.Producer;
32+
import org.apache.pulsar.client.api.Schema;
33+
import org.apache.pulsar.common.naming.TopicName;
34+
import org.apache.pulsar.common.policies.data.TopicType;
35+
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
36+
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
37+
import org.apache.pulsar.zookeeper.ZookeeperServerTest;
38+
import org.awaitility.Awaitility;
39+
import org.testng.annotations.AfterClass;
40+
import org.testng.annotations.BeforeClass;
41+
import org.testng.annotations.Test;
42+
43+
@Slf4j
44+
@Test(groups = "broker")
45+
public class OneWayReplicatorUsingGlobalPartitionedTest extends OneWayReplicatorTest {
46+
47+
@Override
48+
@BeforeClass(alwaysRun = true, timeOut = 300000)
49+
public void setup() throws Exception {
50+
super.usingGlobalZK = true;
51+
super.setup();
52+
}
53+
54+
@Override
55+
@AfterClass(alwaysRun = true, timeOut = 300000)
56+
public void cleanup() throws Exception {
57+
super.cleanup();
58+
}
59+
60+
@Override
61+
protected void setConfigDefaults(ServiceConfiguration config, String clusterName,
62+
LocalBookkeeperEnsemble bookkeeperEnsemble, ZookeeperServerTest brokerConfigZk) {
63+
super.setConfigDefaults(config, clusterName, bookkeeperEnsemble, brokerConfigZk);
64+
config.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
65+
config.setDefaultNumPartitions(1);
66+
}
67+
68+
@Override
69+
@Test(enabled = false)
70+
public void testReplicatorProducerStatInTopic() throws Exception {
71+
super.testReplicatorProducerStatInTopic();
72+
}
73+
74+
@Override
75+
@Test(enabled = false)
76+
public void testCreateRemoteConsumerFirst() throws Exception {
77+
super.testReplicatorProducerStatInTopic();
78+
}
79+
80+
@Override
81+
@Test(enabled = false)
82+
public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception {
83+
super.testReplicatorProducerStatInTopic();
84+
}
85+
86+
@Override
87+
@Test(enabled = false)
88+
public void testConcurrencyOfUnloadBundleAndRecreateProducer() throws Exception {
89+
super.testConcurrencyOfUnloadBundleAndRecreateProducer();
90+
}
91+
92+
@Override
93+
@Test(enabled = false)
94+
public void testPartitionedTopicLevelReplication() throws Exception {
95+
super.testPartitionedTopicLevelReplication();
96+
}
97+
98+
@Override
99+
@Test(enabled = false)
100+
public void testPartitionedTopicLevelReplicationRemoteTopicExist() throws Exception {
101+
super.testPartitionedTopicLevelReplicationRemoteTopicExist();
102+
}
103+
104+
@Override
105+
@Test(enabled = false)
106+
public void testPartitionedTopicLevelReplicationRemoteConflictTopicExist() throws Exception {
107+
super.testPartitionedTopicLevelReplicationRemoteConflictTopicExist();
108+
}
109+
110+
@Override
111+
@Test(enabled = false)
112+
public void testConcurrencyOfUnloadBundleAndRecreateProducer2() throws Exception {
113+
super.testConcurrencyOfUnloadBundleAndRecreateProducer2();
114+
}
115+
116+
@Override
117+
@Test(enabled = false)
118+
public void testUnFenceTopicToReuse() throws Exception {
119+
super.testUnFenceTopicToReuse();
120+
}
121+
122+
@Override
123+
@Test(enabled = false)
124+
public void testDeleteNonPartitionedTopic() throws Exception {
125+
super.testDeleteNonPartitionedTopic();
126+
}
127+
128+
@Override
129+
@Test(enabled = false)
130+
public void testDeletePartitionedTopic() throws Exception {
131+
super.testDeletePartitionedTopic();
132+
}
133+
134+
@Override
135+
@Test(enabled = false)
136+
public void testNoExpandTopicPartitionsWhenDisableTopicLevelReplication() throws Exception {
137+
super.testNoExpandTopicPartitionsWhenDisableTopicLevelReplication();
138+
}
139+
140+
@Override
141+
@Test(enabled = false)
142+
public void testExpandTopicPartitionsOnNamespaceLevelReplication() throws Exception {
143+
super.testExpandTopicPartitionsOnNamespaceLevelReplication();
144+
}
145+
146+
@Override
147+
@Test(enabled = false)
148+
public void testReloadWithTopicLevelGeoReplication(ReplicationLevel replicationLevel) throws Exception {
149+
super.testReloadWithTopicLevelGeoReplication(replicationLevel);
150+
}
151+
152+
@Test(enabled = false)
153+
@Override
154+
public void testConfigReplicationStartAt() throws Exception {
155+
super.testConfigReplicationStartAt();
156+
}
157+
158+
@Test(enabled = false)
159+
@Override
160+
public void testDifferentTopicCreationRule(ReplicationMode replicationMode) throws Exception {
161+
super.testDifferentTopicCreationRule(replicationMode);
162+
}
163+
164+
@Test(timeOut = 60_000)
165+
public void testRemoveCluster() throws Exception {
166+
// Initialize.
167+
final String ns1 = defaultTenant + "/" + "ns_73b1a31afce34671a5ddc48fe5ad7fc8";
168+
final String topic = "persistent://" + ns1 + "/___tp-5dd50794-7af8-4a34-8a0b-06188052c66a";
169+
final String topicChangeEvents = "persistent://" + ns1 + "/__change_events-partition-0";
170+
admin1.namespaces().createNamespace(ns1);
171+
admin1.namespaces().setNamespaceReplicationClusters(ns1, new HashSet<>(Arrays.asList(cluster1, cluster2)));
172+
admin1.topics().createNonPartitionedTopic(topic);
173+
174+
// Wait for loading topic up.
175+
Producer<String> p = client1.newProducer(Schema.STRING).topic(topic).create();
176+
Awaitility.await().untilAsserted(() -> {
177+
ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> tps =
178+
pulsar1.getBrokerService().getTopics();
179+
assertTrue(tps.containsKey(topic));
180+
assertTrue(tps.containsKey(topicChangeEvents));
181+
});
182+
183+
// The topics under the namespace of the cluster-1 will be deleted.
184+
// Verify the result.
185+
admin1.namespaces().setNamespaceReplicationClusters(ns1, new HashSet<>(Arrays.asList(cluster2)));
186+
Awaitility.await().atMost(Duration.ofSeconds(60)).ignoreExceptions().untilAsserted(() -> {
187+
ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> tps =
188+
pulsar1.getBrokerService().getTopics();
189+
assertFalse(tps.containsKey(topic));
190+
assertFalse(tps.containsKey(topicChangeEvents));
191+
assertFalse(pulsar1.getNamespaceService().checkTopicExists(TopicName.get(topic))
192+
.get(5, TimeUnit.SECONDS).isExists());
193+
assertFalse(pulsar1.getNamespaceService()
194+
.checkTopicExists(TopicName.get(topicChangeEvents))
195+
.get(5, TimeUnit.SECONDS).isExists());
196+
});
197+
198+
// cleanup.
199+
p.close();
200+
admin2.topics().delete(topic);
201+
admin2.namespaces().deleteNamespace(ns1);
202+
}
203+
}

0 commit comments

Comments
 (0)