Skip to content

Commit fef9aeb

Browse files
authored
KAFKA-18276 Migrate ProducerRebootstrapTest to new test infra (#19046)
The PR changed three things. * Migrated `ProducerRebootstrapTest` to new test infra and removed the old Scala test. * Updated the original test case to cover rebootstrap scenarios. * Integrated `ProducerRebootstrapTest` into `ClientRebootstrapTest` in the `client-integration-tests` module. Default `ProducerRebootstrap` config: > properties.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "rebootstrap"); properties.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG, "300000"); properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, "10000"); properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, "30000"); properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, "50L"); properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG, "1000L"); The test case for the producer with enabled rebootstrap <img width="1549" alt="Screenshot 2025-03-17 at 10 46 03 PM" src="https://github.com/user-attachments/assets/547840a6-d79d-4db4-98c0-9b05ed04cf60" /> The test case for the producer with disabled rebootstrap <img width="1552" alt="Screenshot 2025-03-17 at 10 46 47 PM" src="https://github.com/user-attachments/assets/2248e809-d9d5-4f3b-a24f-ba1aa0fef728" /> Reviewers: TengYao Chi <[email protected]>, Ken Huang <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 5d2bfb4 commit fef9aeb

File tree

2 files changed

+74
-56
lines changed

2 files changed

+74
-56
lines changed

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/ClientRebootstrapTest.java

+74
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package org.apache.kafka.clients;
1818

1919
import org.apache.kafka.clients.admin.NewTopic;
20+
import org.apache.kafka.clients.producer.ProducerRecord;
21+
import org.apache.kafka.common.config.TopicConfig;
2022
import org.apache.kafka.common.test.ClusterInstance;
2123
import org.apache.kafka.common.test.api.ClusterConfigProperty;
2224
import org.apache.kafka.common.test.api.ClusterTest;
@@ -26,10 +28,12 @@
2628
import java.time.Duration;
2729
import java.util.List;
2830
import java.util.Map;
31+
import java.util.concurrent.ExecutionException;
2932
import java.util.concurrent.TimeUnit;
3033
import java.util.concurrent.TimeoutException;
3134

3235
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
36+
import static org.junit.jupiter.api.Assertions.assertEquals;
3337
import static org.junit.jupiter.api.Assertions.assertThrows;
3438

3539
public class ClientRebootstrapTest {
@@ -94,4 +98,74 @@ public void testAdminRebootstrapDisabled(ClusterInstance clusterInstance) {
9498
// Since the brokers cached during the bootstrap are offline, the admin client needs to wait the default timeout for other threads.
9599
admin.close(Duration.ZERO);
96100
}
101+
102+
@ClusterTest(
103+
brokers = REPLICAS,
104+
types = {Type.KRAFT},
105+
serverProperties = {
106+
@ClusterConfigProperty(key = TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true"),
107+
@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2")
108+
}
109+
)
110+
public void testProducerRebootstrap(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
111+
try (var admin = clusterInstance.admin()) {
112+
admin.createTopics(List.of(new NewTopic(TOPIC, 1, (short) REPLICAS)));
113+
}
114+
115+
var broker0 = 0;
116+
var broker1 = 1;
117+
118+
// It's ok to shut the leader down, cause the reelection is small enough to the producer timeout.
119+
clusterInstance.shutdownBroker(broker0);
120+
121+
try (var producer = clusterInstance.producer()) {
122+
// Only the broker 1 is available for the producer during the bootstrap.
123+
var recordMetadata0 = producer.send(new ProducerRecord<>(TOPIC, "value 0".getBytes())).get();
124+
assertEquals(0, recordMetadata0.offset());
125+
126+
clusterInstance.shutdownBroker(broker1);
127+
clusterInstance.startBroker(broker0);
128+
129+
// Current broker 1 is offline.
130+
// However, the broker 0 from the bootstrap list is online.
131+
// Should be able to produce records.
132+
var recordMetadata1 = producer.send(new ProducerRecord<>(TOPIC, "value 1".getBytes())).get();
133+
assertEquals(0, recordMetadata1.offset());
134+
}
135+
}
136+
137+
@ClusterTest(
138+
brokers = REPLICAS,
139+
types = {Type.KRAFT},
140+
serverProperties = {
141+
@ClusterConfigProperty(key = TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, value = "true"),
142+
@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2")
143+
}
144+
)
145+
public void testProducerRebootstrapDisabled(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
146+
try (var admin = clusterInstance.admin()) {
147+
admin.createTopics(List.of(new NewTopic(TOPIC, 1, (short) REPLICAS)));
148+
}
149+
150+
var broker0 = 0;
151+
var broker1 = 1;
152+
153+
// It's ok to shut the leader down, cause the reelection is small enough to the producer timeout.
154+
clusterInstance.shutdownBroker(broker0);
155+
156+
var producer = clusterInstance.producer(Map.of(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "none"));
157+
158+
// Only the broker 1 is available for the producer during the bootstrap.
159+
var recordMetadata0 = producer.send(new ProducerRecord<>(TOPIC, "value 0".getBytes())).get();
160+
assertEquals(0, recordMetadata0.offset());
161+
162+
clusterInstance.shutdownBroker(broker1);
163+
clusterInstance.startBroker(broker0);
164+
165+
// The broker 1, originally cached during the bootstrap, is offline.
166+
// As a result, the producer will throw a TimeoutException when trying to send a message.
167+
assertThrows(TimeoutException.class, () -> producer.send(new ProducerRecord<>(TOPIC, "value 1".getBytes())).get(5, TimeUnit.SECONDS));
168+
// Since the brokers cached during the bootstrap are offline, the producer needs to wait the default timeout for other threads.
169+
producer.close(Duration.ZERO);
170+
}
97171
}

core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala

-56
This file was deleted.

0 commit comments

Comments
 (0)