Skip to content

Commit 804f87b

Browse files
TakaHiR07fanjianye
andauthored
[fix] producer do not create when topic update partition (#295)
Co-authored-by: fanjianye <[email protected]>
1 parent 94cf3fc commit 804f87b

File tree

2 files changed

+10
-1
lines changed

2 files changed

+10
-1
lines changed

lib/PartitionedProducerImpl.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -471,8 +471,8 @@ void PartitionedProducerImpl::handleGetPartitions(Result result,
471471
}
472472
} else {
473473
LOG_WARN("Failed to getPartitionMetadata: " << strResult(result));
474-
runPartitionUpdateTask();
475474
}
475+
runPartitionUpdateTask();
476476
}
477477

478478
bool PartitionedProducerImpl::isConnected() const {

tests/PartitionsUpdateTest.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,11 @@ static void waitForPartitionsUpdated() {
101101
std::this_thread::sleep_for(std::chrono::seconds(3));
102102
}
103103

104+
static void waitForPartitionUpdateTaskRunMultipleTimes() {
105+
// Assume runPartitionUpdateTask run more than one time in 2 seconds if enabled
106+
std::this_thread::sleep_for(std::chrono::seconds(2));
107+
}
108+
104109
TEST(PartitionsUpdateTest, testConfigPartitionsUpdateInterval) {
105110
ClientConfiguration clientConfig;
106111
ASSERT_EQ(60, clientConfig.getPartitionsUpdateInterval());
@@ -131,6 +136,7 @@ void testPartitionsUpdate(bool lazyStartPartitionedProducers, std::string topicN
131136
ASSERT_EQ(ResultOk, partitionsSet.initProducer(topicName, true, lazyStartPartitionedProducers));
132137
ASSERT_EQ(ResultOk, partitionsSet.initConsumer(topicName, true));
133138

139+
waitForPartitionUpdateTaskRunMultipleTimes();
134140
res = makePostRequest(topicOperateUrl, "3"); // update partitions to 3
135141
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
136142
waitForPartitionsUpdated();
@@ -143,6 +149,7 @@ void testPartitionsUpdate(bool lazyStartPartitionedProducers, std::string topicN
143149
ASSERT_EQ(ResultOk, partitionsSet.initProducer(topicName, true, false));
144150
ASSERT_EQ(ResultOk, partitionsSet.initConsumer(topicName, false));
145151

152+
waitForPartitionUpdateTaskRunMultipleTimes();
146153
res = makePostRequest(topicOperateUrl, "5"); // update partitions to 5
147154
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
148155
waitForPartitionsUpdated();
@@ -155,6 +162,7 @@ void testPartitionsUpdate(bool lazyStartPartitionedProducers, std::string topicN
155162
ASSERT_EQ(ResultOk, partitionsSet.initProducer(topicName, false, false));
156163
ASSERT_EQ(ResultOk, partitionsSet.initConsumer(topicName, true));
157164

165+
waitForPartitionUpdateTaskRunMultipleTimes();
158166
res = makePostRequest(topicOperateUrl, "7"); // update partitions to 7
159167
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
160168
waitForPartitionsUpdated();
@@ -167,6 +175,7 @@ void testPartitionsUpdate(bool lazyStartPartitionedProducers, std::string topicN
167175
ASSERT_EQ(ResultOk, partitionsSet.initProducer(topicName, false, false));
168176
ASSERT_EQ(ResultOk, partitionsSet.initConsumer(topicName, false));
169177

178+
waitForPartitionUpdateTaskRunMultipleTimes();
170179
res = makePostRequest(topicOperateUrl, "10"); // update partitions to 10
171180
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
172181
waitForPartitionsUpdated();

0 commit comments

Comments
 (0)