Commit c65a161

KAFKA-19118: Enable KIP-1071 in InternalTopicIntegrationTest (#19425)
KIP-1071 creates internal topics on the broker side, so this test now verifies that essentially the same internal topics are created when KIP-1071 is enabled. The commit also adds a small helper method to `EmbeddedKafkaCluster` so that fewer code changes are needed to enable KIP-1071. That helper is used in the already-enabled SmokeTestDriverIntegrationTest, reverting some of the earlier changes there (the cluster field becomes `final` again).

Reviewers: Bill Bejeck <[email protected]>, PoAn Yang <[email protected]>
1 parent c3b7aa6 commit c65a161
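
For illustration only (not part of this commit): a minimal sketch of how an integration test can stand up a broker with the streams rebalance protocol enabled via the new helper, mirroring the lifecycle used in the test diffs below. The class name is hypothetical; `EmbeddedKafkaCluster.withStreamsRebalanceProtocol` is the helper added in this commit.

import java.io.IOException;

import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;

public class Kip1071ClusterLifecycleSketch {
    // Single broker, with the "streams" group rebalance protocol enabled broker-side.
    public static final EmbeddedKafkaCluster CLUSTER = EmbeddedKafkaCluster.withStreamsRebalanceProtocol(1);

    @BeforeAll
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }
}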

File tree

3 files changed (+42, -25 lines)

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java

+27 -14
@@ -28,6 +28,7 @@
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.streams.GroupProtocol;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
@@ -48,8 +49,9 @@
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -73,7 +75,7 @@
 @Timeout(600)
 @Tag("integration")
 public class InternalTopicIntegrationTest {
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+    public static final EmbeddedKafkaCluster CLUSTER = EmbeddedKafkaCluster.withStreamsRebalanceProtocol(1);
 
     @BeforeAll
     public static void startCluster() throws IOException, InterruptedException {
@@ -147,14 +149,22 @@ private Admin createAdminClient() {
         return Admin.create(adminClientConfig);
     }
 
+    private void configureStreams(final boolean streamsProtocolEnabled, final String appID) {
+        streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+        if (streamsProtocolEnabled) {
+            streamsProp.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
+        }
+    }
+
     /*
      * This test just ensures that the assignor does not get stuck during partition number resolution
      * for internal repartition topics. See KAFKA-10689
      */
-    @Test
-    public void shouldGetToRunningWithWindowedTableInFKJ() throws Exception {
-        final String appID = APP_ID + "-windowed-FKJ";
-        streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void shouldGetToRunningWithWindowedTableInFKJ(final boolean streamsProtocolEnabled) throws Exception {
+        final String appID = APP_ID + "-windowed-FKJ-" + streamsProtocolEnabled;
+        configureStreams(streamsProtocolEnabled, appID);
 
         final StreamsBuilder streamsBuilder = new StreamsBuilder();
         final KStream<String, String> inputTopic = streamsBuilder.stream(DEFAULT_INPUT_TOPIC);
@@ -179,10 +189,12 @@ public void shouldGetToRunningWithWindowedTableInFKJ() throws Exception {
         }
     }
 
-    @Test
-    public void shouldCompactTopicsForKeyValueStoreChangelogs() throws Exception {
-        final String appID = APP_ID + "-compact";
-        streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void shouldCompactTopicsForKeyValueStoreChangelogs(final boolean streamsProtocolEnabled) throws Exception {
+        final String appID = APP_ID + "-compact-" + streamsProtocolEnabled;
+        configureStreams(streamsProtocolEnabled, appID);
 
         //
         // Step 1: Configure and start a simple word count topology
@@ -216,10 +228,11 @@ public void shouldCompactTopicsForKeyValueStoreChangelogs() throws Exception {
         assertEquals(4, repartitionProps.size());
     }
 
-    @Test
-    public void shouldCompactAndDeleteTopicsForWindowStoreChangelogs() throws Exception {
-        final String appID = APP_ID + "-compact-delete";
-        streamsProp.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void shouldCompactAndDeleteTopicsForWindowStoreChangelogs(final boolean streamsProtocolEnabled) throws Exception {
+        final String appID = APP_ID + "-compact-delete-" + streamsProtocolEnabled;
+        configureStreams(streamsProtocolEnabled, appID);
 
         //
         // Step 1: Configure and start a simple word count topology
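
For illustration only (not part of this commit): the client-side half of the toggle that `configureStreams(...)` above applies, setting `StreamsConfig.GROUP_PROTOCOL_CONFIG` to the streams protocol only when requested. The class and method names here are hypothetical; `StreamsConfig` and `GroupProtocol` are the classes used in the diff.

import java.util.Locale;
import java.util.Properties;

import org.apache.kafka.streams.GroupProtocol;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsProtocolToggleSketch {
    // Build a Streams client config, opting into the KIP-1071 "streams" group protocol when requested.
    static Properties streamsConfig(final boolean streamsProtocolEnabled, final String appId, final String bootstrapServers) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        if (streamsProtocolEnabled) {
            props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
        }
        return props;
    }
}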

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java

+5 -11
@@ -18,8 +18,6 @@
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.utils.Exit;
-import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
-import org.apache.kafka.server.config.ServerConfigs;
 import org.apache.kafka.streams.GroupProtocol;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsConfig.InternalConfig;
@@ -53,21 +51,17 @@
 @Timeout(600)
 @Tag("integration")
 public class SmokeTestDriverIntegrationTest {
-    public static EmbeddedKafkaCluster cluster;
+    public static final EmbeddedKafkaCluster CLUSTER = EmbeddedKafkaCluster.withStreamsRebalanceProtocol(3);
     public TestInfo testInfo;
 
     @BeforeAll
     public static void startCluster() throws IOException {
-        final Properties props = new Properties();
-        props.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer,streams");
-        props.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true");
-        cluster = new EmbeddedKafkaCluster(3, props);
-        cluster.start();
+        CLUSTER.start();
     }
 
     @AfterAll
     public static void closeCluster() {
-        cluster.stop();
+        CLUSTER.stop();
     }
 
     @BeforeEach
@@ -135,9 +129,9 @@ public void shouldWorkWithRebalance(
         int numClientsCreated = 0;
         final ArrayList<SmokeTestClient> clients = new ArrayList<>();
 
-        IntegrationTestUtils.cleanStateBeforeTest(cluster, SmokeTestDriver.topics());
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, SmokeTestDriver.topics());
 
-        final String bootstrapServers = cluster.bootstrapServers();
+        final String bootstrapServers = CLUSTER.bootstrapServers();
         final Driver driver = new Driver(bootstrapServers, 10, 1000);
         driver.start();
         System.out.println("started driver");

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java

+10
@@ -140,6 +140,16 @@ public EmbeddedKafkaCluster(final int numBrokers,
         this.time = new MockTime(mockTimeMillisStart, mockTimeNanoStart);
     }
 
+    public static EmbeddedKafkaCluster withStreamsRebalanceProtocol(final int numBrokers) {
+        return withStreamsRebalanceProtocol(numBrokers, new Properties());
+    }
+
+    public static EmbeddedKafkaCluster withStreamsRebalanceProtocol(final int numBrokers, final Properties props) {
+        props.setProperty(GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, "classic,consumer,streams");
+        props.setProperty(ServerConfigs.UNSTABLE_API_VERSIONS_ENABLE_CONFIG, "true");
+        return new EmbeddedKafkaCluster(numBrokers, props);
+    }
+
     public void start() {
         try {
             cluster.format();
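
For illustration only (not part of this commit): the two-argument overload lets a test pass extra broker settings, and the helper then adds the rebalance-protocol and unstable-API-versions settings on top of them. The class name and the extra broker property shown are illustrative.

import java.util.Properties;

import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;

public class ClusterWithExtraBrokerConfigSketch {
    static EmbeddedKafkaCluster threeBrokerCluster() {
        // Hypothetical extra broker override, merged with what withStreamsRebalanceProtocol sets.
        final Properties brokerProps = new Properties();
        brokerProps.setProperty("auto.create.topics.enable", "false");
        return EmbeddedKafkaCluster.withStreamsRebalanceProtocol(3, brokerProps);
    }
}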
