Skip to content

Commit 36c2fac

Browse files
authored
MINOR: Consider repartition source topic for changelog topics (#19373)
With the Streams rebalance protocol, when the partition count for changelog topics are computed the topic manager only considers external source topics and throws if there are no source topics. However, subtopologies are allowed to only have repartition topics as source topics. This commit also considers repartition source topics when computing the partition count for changelog topics. Reviewers: Lucas Brutschy <[email protected]>
1 parent d4d9f11 commit 36c2fac

File tree

2 files changed

+38
-3
lines changed

2 files changed

+38
-3
lines changed

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopics.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.Set;
3232
import java.util.function.Function;
3333
import java.util.stream.Collectors;
34+
import java.util.stream.Stream;
3435

3536
/**
3637
* This class is responsible for setting up the changelog topics for a topology. For a given topology, which does not have the number
@@ -75,7 +76,10 @@ public Map<String, Integer> setup() {
7576
final Set<String> sourceTopics = new HashSet<>(subtopology.sourceTopics());
7677

7778
final OptionalInt maxNumPartitions =
78-
subtopology.sourceTopics().stream().mapToInt(this::getPartitionCountOrFail).max();
79+
Stream.concat(
80+
subtopology.sourceTopics().stream(),
81+
subtopology.repartitionSourceTopics().stream().map(TopicInfo::name)
82+
).mapToInt(this::getPartitionCountOrFail).max();
7983

8084
if (maxNumPartitions.isEmpty()) {
8185
throw new StreamsInvalidTopologyException("No source topics found for subtopology " + subtopology.subtopologyId());

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopicsTest.java

+33-2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.Map;
2929
import java.util.OptionalInt;
3030

31+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
3132
import static org.junit.jupiter.api.Assertions.assertEquals;
3233
import static org.junit.jupiter.api.Assertions.assertThrows;
3334
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -49,6 +50,18 @@ public class ChangelogTopicsTest {
4950
.setRepartitionSinkTopics(List.of(SINK_TOPIC_NAME))
5051
.setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_INFO))
5152
.setStateChangelogTopics(List.of());
53+
private static final Subtopology SUBTOPOLOGY_NO_REPARTITION_SOURCE = new Subtopology()
54+
.setSubtopologyId("SUBTOPOLOGY_NO_SOURCE")
55+
.setSourceTopics(List.of(SOURCE_TOPIC_NAME))
56+
.setRepartitionSinkTopics(List.of(SINK_TOPIC_NAME))
57+
.setRepartitionSourceTopics(List.of())
58+
.setStateChangelogTopics(List.of());
59+
private static final Subtopology SUBTOPOLOGY_NO_SOURCE_NO_REPARTITION_SOURCE = new Subtopology()
60+
.setSubtopologyId("SUBTOPOLOGY_NO_SOURCE")
61+
.setSourceTopics(List.of())
62+
.setRepartitionSinkTopics(List.of(SINK_TOPIC_NAME))
63+
.setRepartitionSourceTopics(List.of())
64+
.setStateChangelogTopics(List.of());
5265
private static final Subtopology SUBTOPOLOGY_STATELESS = new Subtopology()
5366
.setSubtopologyId("SUBTOPOLOGY_STATELESS")
5467
.setSourceTopics(List.of(SOURCE_TOPIC_NAME))
@@ -85,8 +98,8 @@ private static OptionalInt topicPartitionProvider(String s) {
8598
}
8699

87100
@Test
88-
public void shouldFailIfNoSourceTopics() {
89-
final List<Subtopology> subtopologies = List.of(SUBTOPOLOGY_NO_SOURCE);
101+
public void shouldFailIfNoSourceTopicsAndNoRepartitionSourceTopics() {
102+
final List<Subtopology> subtopologies = List.of(SUBTOPOLOGY_NO_SOURCE_NO_REPARTITION_SOURCE);
90103

91104
final ChangelogTopics changelogTopics =
92105
new ChangelogTopics(LOG_CONTEXT, subtopologies, ChangelogTopicsTest::topicPartitionProvider);
@@ -95,6 +108,24 @@ public void shouldFailIfNoSourceTopics() {
95108
assertTrue(e.getMessage().contains("No source topics found for subtopology"));
96109
}
97110

111+
@Test
112+
public void shouldNotFailIfOnlySourceTopicsEmpty() {
113+
final List<Subtopology> subtopologies = List.of(SUBTOPOLOGY_NO_SOURCE);
114+
115+
final ChangelogTopics changelogTopics =
116+
new ChangelogTopics(LOG_CONTEXT, subtopologies, ChangelogTopicsTest::topicPartitionProvider);
117+
assertDoesNotThrow(changelogTopics::setup);
118+
}
119+
120+
@Test
121+
public void shouldNotFailIfOnlyRepartitionSourceTopicsEmpty() {
122+
final List<Subtopology> subtopologies = List.of(SUBTOPOLOGY_NO_REPARTITION_SOURCE);
123+
124+
final ChangelogTopics changelogTopics =
125+
new ChangelogTopics(LOG_CONTEXT, subtopologies, ChangelogTopicsTest::topicPartitionProvider);
126+
assertDoesNotThrow(changelogTopics::setup);
127+
}
128+
98129
@Test
99130
public void shouldNotContainChangelogsForStatelessTasks() {
100131
final List<Subtopology> subtopologies = List.of(SUBTOPOLOGY_STATELESS);

0 commit comments

Comments
 (0)