Commit 9e62028

KAFKA-19118: Enable KIP-1071 in StandbyTaskCreationIntegrationTest (#19438)
Enable the KIP-1071 parameter in `StandbyTaskCreationIntegrationTest`. This required a fix: in `ChangelogTopics.setup`, we need to return both the source-topic (optimized) and the non-source-topic changelog topics, since otherwise the partition number cannot be found later on. `EmbeddedKafkaCluster` is extended so the number of standby replicas can be set dynamically for the group; it must be initialized to one for the integration test to pass.

Reviewers: Bill Bejeck <[email protected]>
1 parent 3623726 commit 9e62028

4 files changed, +44 -23 lines changed

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopics.java (+3 -9)

@@ -25,10 +25,8 @@
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.OptionalInt;
-import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -64,17 +62,15 @@ public ChangelogTopics(
     }
 
     /**
-     * Determines the number of partitions for each non-source changelog topic in the requested topology.
+     * Determines the number of partitions for each changelog topic in the requested topology.
      *
      * @throws IllegalStateException If a source topic does not have a partition count defined through topicPartitionCountProvider.
      *
-     * @return the map of all non-source changelog topics for the requested topology to their required number of partitions.
+     * @return the map of all changelog topics for the requested topology to their required number of partitions.
      */
     public Map<String, Integer> setup() {
         final Map<String, Integer> changelogTopicPartitions = new HashMap<>();
         for (Subtopology subtopology : subtopologies) {
-            final Set<String> sourceTopics = new HashSet<>(subtopology.sourceTopics());
-
             final OptionalInt maxNumPartitions =
                 Stream.concat(
                     subtopology.sourceTopics().stream(),
@@ -85,9 +81,7 @@ public Map<String, Integer> setup() {
                 throw new StreamsInvalidTopologyException("No source topics found for subtopology " + subtopology.subtopologyId());
             }
             for (final TopicInfo topicInfo : subtopology.stateChangelogTopics()) {
-                if (!sourceTopics.contains(topicInfo.name())) {
-                    changelogTopicPartitions.put(topicInfo.name(), maxNumPartitions.getAsInt());
-                }
+                changelogTopicPartitions.put(topicInfo.name(), maxNumPartitions.getAsInt());
             }
         }

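Why this matters: with the source-topic optimization, a table's changelog is the source topic itself, and the old code silently dropped such topics from the returned map, so the partition number for that changelog could not be resolved later on. A minimal, self-contained sketch (hypothetical simplified types, not the real group-coordinator classes) of the old vs. fixed behavior:

// Sketch of the behavior change in ChangelogTopics.setup(): one subtopology
// reading from "input" (3 partitions) whose state store is backed directly by
// the source topic (source-changelog optimization).
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ChangelogSetupSketch {

    static final List<String> SOURCE_TOPICS = List.of("input");
    static final List<String> STATE_CHANGELOG_TOPICS = List.of("input");
    static final Map<String, Integer> PARTITIONS = Map.of("input", 3);

    static Map<String, Integer> setup(final boolean skipSourceBackedChangelogs) {
        final Map<String, Integer> changelogTopicPartitions = new HashMap<>();
        final int maxNumPartitions =
            SOURCE_TOPICS.stream().mapToInt(PARTITIONS::get).max().orElseThrow();
        for (final String changelog : STATE_CHANGELOG_TOPICS) {
            if (skipSourceBackedChangelogs && SOURCE_TOPICS.contains(changelog)) {
                continue; // old behavior: source-backed changelogs were dropped
            }
            changelogTopicPartitions.put(changelog, maxNumPartitions);
        }
        return changelogTopicPartitions;
    }

    public static void main(final String[] args) {
        System.out.println(setup(true));  // old: {}          -> partition count missing later
        System.out.println(setup(false)); // fixed: {input=3}
    }
}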
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopicsTest.java (+3 -3)

@@ -149,14 +149,14 @@ public void shouldContainNonSourceBasedChangelogs() {
     }
 
     @Test
-    public void shouldNotContainSourceBasedChangelogs() {
+    public void shouldContainSourceBasedChangelogs() {
         final List<Subtopology> subtopologies = List.of(SUBTOPOLOGY_SOURCE_CHANGELOG);
 
         final ChangelogTopics changelogTopics =
             new ChangelogTopics(LOG_CONTEXT, subtopologies, ChangelogTopicsTest::topicPartitionProvider);
         Map<String, Integer> setup = changelogTopics.setup();
 
-        assertEquals(Map.of(), setup);
+        assertEquals(Map.of(SOURCE_TOPIC_NAME, 3), setup);
     }
 
     @Test
@@ -167,6 +167,6 @@ public void shouldContainBothTypesOfPreExistingChangelogs() {
             new ChangelogTopics(LOG_CONTEXT, subtopologies, ChangelogTopicsTest::topicPartitionProvider);
         Map<String, Integer> setup = changelogTopics.setup();
 
-        assertEquals(Map.of(CHANGELOG_TOPIC_CONFIG.name(), 3), setup);
+        assertEquals(Map.of(CHANGELOG_TOPIC_CONFIG.name(), 3, SOURCE_TOPIC_NAME, 3), setup);
     }
 }

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java (+22 -11)

@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.integration;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.GroupProtocol;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KafkaStreams.State;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -38,12 +39,14 @@
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.util.Locale;
 import java.util.Properties;
 import java.util.function.Predicate;
 
@@ -54,7 +57,7 @@
 public class StandbyTaskCreationIntegrationTest {
     private static final int NUM_BROKERS = 1;
 
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    public static final EmbeddedKafkaCluster CLUSTER = EmbeddedKafkaCluster.withStreamsRebalanceProtocol(NUM_BROKERS);
 
     private String safeTestName;
 
@@ -87,19 +90,25 @@ public void after() {
         client2.close(Duration.ofSeconds(60));
     }
 
-    private Properties streamsConfiguration() {
+    private Properties streamsConfiguration(final boolean streamsProtocolEnabled) {
         final Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
-        streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
+        if (streamsProtocolEnabled) {
+            streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
+            CLUSTER.setStandbyReplicas("app-" + safeTestName, 1);
+        } else {
+            streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
+        }
         return streamsConfiguration;
     }
 
-    @Test
-    public void shouldNotCreateAnyStandByTasksForStateStoreWithLoggingDisabled() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void shouldNotCreateAnyStandByTasksForStateStoreWithLoggingDisabled(final boolean streamsProtocolEnabled) throws Exception {
         final StreamsBuilder builder = new StreamsBuilder();
         final String stateStoreName = "myTransformState";
         final StoreBuilder<KeyValueStore<Integer, Integer>> keyValueStoreBuilder =
@@ -116,8 +125,9 @@ public void process(final Record<Integer, Integer> record) {
             }
         }, stateStoreName);
 
+
         final Topology topology = builder.build();
-        createClients(topology, streamsConfiguration(), topology, streamsConfiguration());
+        createClients(topology, streamsConfiguration(streamsProtocolEnabled), topology, streamsConfiguration(streamsProtocolEnabled));
 
         setStateListenersForVerification(thread -> thread.standbyTasks().isEmpty() && !thread.activeTasks().isEmpty());
 
@@ -128,11 +138,12 @@ public void process(final Record<Integer, Integer> record) {
         );
     }
 
-    @Test
-    public void shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables() throws Exception {
-        final Properties streamsConfiguration1 = streamsConfiguration();
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables(final boolean streamsProtocolEnabled) throws Exception {
+        final Properties streamsConfiguration1 = streamsConfiguration(streamsProtocolEnabled);
         streamsConfiguration1.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
-        final Properties streamsConfiguration2 = streamsConfiguration();
+        final Properties streamsConfiguration2 = streamsConfiguration(streamsProtocolEnabled);
         streamsConfiguration2.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
 
         final StreamsBuilder builder = new StreamsBuilder();

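For reference, a minimal sketch of the client-side configuration the test builds when `streamsProtocolEnabled` is true: the application opts into the KIP-1071 streams group protocol, and standby replicas are configured on the broker side rather than through `StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG`. The application id and bootstrap address below are placeholders.

// Sketch of a client config that opts into the streams rebalance protocol,
// mirroring the parameterized test above (placeholders noted in comments).
import java.util.Locale;
import java.util.Properties;

import org.apache.kafka.streams.GroupProtocol;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsProtocolConfigSketch {

    public static Properties streamsProtocolConfig() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-standby-demo");   // placeholder application id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder bootstrap address
        // Opt into the KIP-1071 streams rebalance protocol ("streams").
        props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
            GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
        // With the streams protocol, the number of standby replicas is a broker-side
        // group config (GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG), set here via
        // EmbeddedKafkaCluster.setStandbyReplicas, not via NUM_STANDBY_REPLICAS_CONFIG.
        return props;
    }
}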
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java (+16)

@@ -18,6 +18,7 @@
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
 import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.ConfigEntry;
 import org.apache.kafka.clients.admin.ListTopicsOptions;
@@ -41,6 +42,7 @@
 import org.apache.kafka.common.test.KafkaClusterTestKit;
 import org.apache.kafka.common.test.TestKitNodes;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.GroupConfig;
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
 import org.apache.kafka.network.SocketServerConfigs;
@@ -59,6 +61,7 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -446,6 +449,19 @@ public Properties getLogConfig(final String topic) {
         }
     }
 
+    public void setStandbyReplicas(final String groupId, final int numStandbyReplicas) {
+        try (final Admin adminClient = createAdminClient()) {
+            adminClient.incrementalAlterConfigs(
+                Map.of(
+                    new ConfigResource(ConfigResource.Type.GROUP, groupId),
+                    List.of(new AlterConfigOp(new ConfigEntry(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, String.valueOf(numStandbyReplicas)), AlterConfigOp.OpType.SET))
+                )
+            ).all().get();
+        } catch (final InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     private final class TopicsRemainingCondition implements TestCondition {
         final Set<String> remainingTopics = new HashSet<>();
 

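For context, a standalone sketch of the equivalent Admin-client call that the new `setStandbyReplicas` helper wraps: set the standby-replica count as a dynamic GROUP config for the application's group id before the Streams clients join the group. The bootstrap address and group id are placeholders, and the `GroupConfig` constant comes from the group-coordinator module, as in the diff above.

// Standalone sketch (assumed placeholders: broker address, group id) of setting
// the streams standby-replica group config through the Admin client.
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.coordinator.group.GroupConfig;

public class StandbyReplicasGroupConfigSketch {

    public static void main(final String[] args) throws ExecutionException, InterruptedException {
        final Properties adminProps = new Properties();
        // Placeholder bootstrap address; in the test this comes from CLUSTER.bootstrapServers().
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (final Admin admin = Admin.create(adminProps)) {
            admin.incrementalAlterConfigs(
                Map.of(
                    // Placeholder group id; must match the Streams application.id.
                    new ConfigResource(ConfigResource.Type.GROUP, "app-standby-demo"),
                    List.of(new AlterConfigOp(
                        new ConfigEntry(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1"),
                        AlterConfigOp.OpType.SET))
                )
            ).all().get();
        }
    }
}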