Skip to content

Commit 5e53298

Browse files
frankvickyjanchilling
authored andcommitted
KAFKA-19042 Move ConsumerTopicCreationTest to client-integration-tests module (apache#19283)
This patch moves `ConsumerTopicCreationTest` to the `client-integration-tests` and rewrite it as Java. The patch also streamlines the test flow. In the Scala version, there is a producer that produces messages, but this is not the main purpose of the `ConsumerTopicCreationTest`. Reviewers: Ken Huang <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent e6fbbc5 commit 5e53298

File tree

2 files changed

+125
-108
lines changed

2 files changed

+125
-108
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.clients.consumer;
18+
19+
import org.apache.kafka.clients.admin.Admin;
20+
import org.apache.kafka.clients.admin.TopicListing;
21+
import org.apache.kafka.common.test.ClusterInstance;
22+
import org.apache.kafka.common.test.api.ClusterConfig;
23+
import org.apache.kafka.common.test.api.ClusterTemplate;
24+
25+
import java.time.Duration;
26+
import java.util.List;
27+
import java.util.Locale;
28+
import java.util.Map;
29+
import java.util.Set;
30+
31+
import static org.apache.kafka.clients.consumer.ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG;
32+
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_PROTOCOL_CONFIG;
33+
import static org.apache.kafka.common.test.api.Type.KRAFT;
34+
import static org.apache.kafka.server.config.ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG;
35+
import static org.junit.jupiter.api.Assertions.assertFalse;
36+
import static org.junit.jupiter.api.Assertions.assertTrue;
37+
38+
public class ConsumerTopicCreationTest {
39+
private static final String TOPIC = "topic";
40+
private static final long POLL_TIMEOUT = 1000;
41+
42+
@ClusterTemplate("autoCreateTopicsConfigs")
43+
void testAsyncConsumerTopicCreationIfConsumerAllowToCreateTopic(ClusterInstance cluster) throws Exception {
44+
try (Consumer<byte[], byte[]> consumer = createConsumer(cluster, GroupProtocol.CONSUMER, true)) {
45+
subscribeAndPoll(consumer);
46+
assertTopicCreateBasedOnPermission(cluster);
47+
}
48+
}
49+
50+
@ClusterTemplate("autoCreateTopicsConfigs")
51+
void testAsyncConsumerTopicCreationIfConsumerDisallowToCreateTopic(ClusterInstance cluster) throws Exception {
52+
try (Consumer<byte[], byte[]> consumer = createConsumer(cluster, GroupProtocol.CONSUMER, false)) {
53+
subscribeAndPoll(consumer);
54+
assertTopicNotCreate(cluster);
55+
}
56+
}
57+
58+
@ClusterTemplate("autoCreateTopicsConfigs")
59+
void testClassicConsumerTopicCreationIfConsumerAllowToCreateTopic(ClusterInstance cluster) throws Exception {
60+
try (Consumer<byte[], byte[]> consumer = createConsumer(cluster, GroupProtocol.CLASSIC, true)) {
61+
subscribeAndPoll(consumer);
62+
assertTopicCreateBasedOnPermission(cluster);
63+
}
64+
}
65+
66+
@ClusterTemplate("autoCreateTopicsConfigs")
67+
void testClassicConsumerTopicCreationIfConsumerDisallowToCreateTopic(ClusterInstance cluster) throws Exception {
68+
try (Consumer<byte[], byte[]> consumer = createConsumer(cluster, GroupProtocol.CLASSIC, false)) {
69+
subscribeAndPoll(consumer);
70+
assertTopicNotCreate(cluster);
71+
}
72+
}
73+
74+
private void subscribeAndPoll(Consumer<byte[], byte[]> consumer) {
75+
consumer.subscribe(List.of(TOPIC));
76+
consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
77+
}
78+
79+
private void assertTopicCreateBasedOnPermission(ClusterInstance cluster) throws Exception {
80+
if (allowAutoCreateTopics(cluster))
81+
assertTopicCreate(cluster);
82+
else
83+
assertTopicNotCreate(cluster);
84+
}
85+
86+
private boolean allowAutoCreateTopics(ClusterInstance cluster) {
87+
return cluster.config().serverProperties().get(AUTO_CREATE_TOPICS_ENABLE_CONFIG).equals("true");
88+
}
89+
90+
private void assertTopicCreate(ClusterInstance cluster) throws Exception {
91+
assertTrue(getAllTopics(cluster).contains(TOPIC));
92+
}
93+
94+
private void assertTopicNotCreate(ClusterInstance cluster) throws Exception {
95+
assertFalse(getAllTopics(cluster).contains(TOPIC),
96+
"Both " + AUTO_CREATE_TOPICS_ENABLE_CONFIG + " and " + ALLOW_AUTO_CREATE_TOPICS_CONFIG + " need to be true to create topic automatically");
97+
}
98+
99+
private List<String> getAllTopics(ClusterInstance cluster) throws Exception {
100+
try (Admin admin = cluster.admin()) {
101+
return admin.listTopics().listings().get().stream().map(TopicListing::name).toList();
102+
}
103+
}
104+
105+
private static List<ClusterConfig> autoCreateTopicsConfigs() {
106+
return List.of(
107+
ClusterConfig.defaultBuilder()
108+
.setTypes(Set.of(KRAFT))
109+
.setServerProperties(Map.of(AUTO_CREATE_TOPICS_ENABLE_CONFIG, "true"))
110+
.build(),
111+
ClusterConfig.defaultBuilder()
112+
.setTypes(Set.of(KRAFT))
113+
.setServerProperties(Map.of(AUTO_CREATE_TOPICS_ENABLE_CONFIG, "false"))
114+
.build()
115+
);
116+
}
117+
118+
private Consumer<byte[], byte[]> createConsumer(ClusterInstance cluster, GroupProtocol protocol, boolean allowConsumerAutoCreateTopics) {
119+
Map<String, Object> consumerConfig = Map.of(
120+
ALLOW_AUTO_CREATE_TOPICS_CONFIG, allowConsumerAutoCreateTopics,
121+
GROUP_PROTOCOL_CONFIG, protocol.name().toLowerCase(Locale.ROOT)
122+
);
123+
return cluster.consumer(consumerConfig);
124+
}
125+
}

core/src/test/scala/integration/kafka/api/ConsumerTopicCreationTest.scala

-108
This file was deleted.

0 commit comments

Comments
 (0)