Skip to content

Commit 5f80de3

Browse files
authored
KAFKA-19162: Topology metadata contains non-deterministically ordered topic configs (#19491)
Topology description sent to broker in KIP-1071 contains non-deterministically ordered topic configs. Since the topology is compared to the groups topology upon joining we may run into `INVALID_REQUEST: Topology updates are not supported yet` failures if the topology sent by the application does not match the group topology due to different topic config order. This PR ensures that topic configs are ordered, to avoid an `INVALID_REQUEST` error. Reviewers: Matthias J. Sax <[email protected]>
1 parent 144101a commit 5f80de3

File tree

2 files changed

+15
-2
lines changed

2 files changed

+15
-2
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java

+2
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,7 @@ private static List<StreamsGroupHeartbeatRequestData.TopicInfo> getRepartitionTo
237237
repartitionTopicInfo.topicConfigs().add(new StreamsGroupHeartbeatRequestData.KeyValue().setKey(k).setValue(v))
238238
);
239239
repartitionTopicsInfo.add(repartitionTopicInfo);
240+
repartitionTopicInfo.topicConfigs().sort(Comparator.comparing(StreamsGroupHeartbeatRequestData.KeyValue::key));
240241
}
241242
repartitionTopicsInfo.sort(Comparator.comparing(StreamsGroupHeartbeatRequestData.TopicInfo::name));
242243
return repartitionTopicsInfo;
@@ -251,6 +252,7 @@ private static List<StreamsGroupHeartbeatRequestData.TopicInfo> getChangelogTopi
251252
changelogTopic.getValue().topicConfigs().forEach((k, v) ->
252253
changelogTopicInfo.topicConfigs().add(new StreamsGroupHeartbeatRequestData.KeyValue().setKey(k).setValue(v))
253254
);
255+
changelogTopicInfo.topicConfigs().sort(Comparator.comparing(StreamsGroupHeartbeatRequestData.KeyValue::key));
254256
changelogTopicsInfo.add(changelogTopicInfo);
255257
}
256258
changelogTopicsInfo.sort(Comparator.comparing(StreamsGroupHeartbeatRequestData.TopicInfo::name));

clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java

+13-2
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
108108
private static final String REPARTITION_SOURCE_TOPIC_1 = "repartitionSourceTopic1";
109109
private static final String REPARTITION_SOURCE_TOPIC_2 = "repartitionSourceTopic2";
110110
private static final Map<String, StreamsRebalanceData.TopicInfo> REPARTITION_SOURCE_TOPICS = Map.of(
111-
REPARTITION_SOURCE_TOPIC_1, new StreamsRebalanceData.TopicInfo(Optional.of(2), Optional.of((short) 1), Map.of("config1", "value1")),
111+
REPARTITION_SOURCE_TOPIC_1, new StreamsRebalanceData.TopicInfo(Optional.of(2), Optional.of((short) 1), Map.of("config3", "value3", "config1", "value1")),
112112
REPARTITION_SOURCE_TOPIC_2, new StreamsRebalanceData.TopicInfo(Optional.of(3), Optional.of((short) 3), Collections.emptyMap())
113113
);
114114
private static final String CHANGELOG_TOPIC_1 = "changelogTopic1";
@@ -117,7 +117,7 @@ class StreamsGroupHeartbeatRequestManagerTest {
117117
private static final Map<String, StreamsRebalanceData.TopicInfo> CHANGELOG_TOPICS = Map.of(
118118
CHANGELOG_TOPIC_1, new StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.of((short) 1), Map.of()),
119119
CHANGELOG_TOPIC_2, new StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.of((short) 2), Map.of()),
120-
CHANGELOG_TOPIC_3, new StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.of((short) 3), Map.of("config2", "value2"))
120+
CHANGELOG_TOPIC_3, new StreamsRebalanceData.TopicInfo(Optional.empty(), Optional.of((short) 3), Map.of("config4", "value4", "config2", "value2"))
121121
);
122122
private static final Collection<Set<String>> COPARTITION_GROUP = Set.of(
123123
Set.of(SOURCE_TOPIC_1, REPARTITION_SOURCE_TOPIC_2),
@@ -664,6 +664,7 @@ public void testBuildingHeartbeatRequestTopologySentWhenJoining(final MemberStat
664664
assertEquals(repartitionTopic.numPartitions().get(), topicInfo.partitions());
665665
assertEquals(repartitionTopic.replicationFactor().get(), topicInfo.replicationFactor());
666666
assertEquals(repartitionTopic.topicConfigs().size(), topicInfo.topicConfigs().size());
667+
assertTrue(isSorted(topicInfo.topicConfigs(), Comparator.comparing(StreamsGroupHeartbeatRequestData.KeyValue::key)));
667668
});
668669
assertEquals(CHANGELOG_TOPICS.size(), subtopology1.stateChangelogTopics().size());
669670
subtopology1.stateChangelogTopics().forEach(topicInfo -> {
@@ -672,6 +673,7 @@ public void testBuildingHeartbeatRequestTopologySentWhenJoining(final MemberStat
672673
final StreamsRebalanceData.TopicInfo changelogTopic = CHANGELOG_TOPICS.get(topicInfo.name());
673674
assertEquals(changelogTopic.replicationFactor().get(), topicInfo.replicationFactor());
674675
assertEquals(changelogTopic.topicConfigs().size(), topicInfo.topicConfigs().size());
676+
assertTrue(isSorted(topicInfo.topicConfigs(), Comparator.comparing(StreamsGroupHeartbeatRequestData.KeyValue::key)));
675677
});
676678
assertEquals(2, subtopology1.copartitionGroups().size());
677679
final StreamsGroupHeartbeatRequestData.CopartitionGroup expectedCopartitionGroupData1 =
@@ -701,6 +703,15 @@ public void testBuildingHeartbeatRequestTopologySentWhenJoining(final MemberStat
701703
assertNull(nonJoiningRequestData.topology());
702704
}
703705

706+
private <V> boolean isSorted(List<V> collection, Comparator<V> comparator) {
707+
for (int i = 1; i < collection.size(); i++) {
708+
if (comparator.compare(collection.get(i - 1), collection.get(i)) > 0) {
709+
return false;
710+
}
711+
}
712+
return true;
713+
}
714+
704715
@ParameterizedTest
705716
@MethodSource("provideNonJoiningStates")
706717
public void testBuildingHeartbeatRequestRebalanceTimeoutSentWhenJoining(final MemberState memberState) {

0 commit comments

Comments
 (0)