Skip to content

Commit d236067

Browse files
akatona84david-simon
authored andcommitted
Upgrade Kafka to 3.8.0 (linkedin#2180)
* Upgrading kafka to 3.8.0 - config properties rewriting and adding necessary dependencies # Conflicts: # gradle.properties * Upgrading kafka to 3.8.0 - using alternative for removed getAllTopicConfigs zk admin client method * Upgrading kafka to 3.8.0 - adding 3.8 zk client creation way * Upgrading kafka to 3.8.0 - adding 3.8 network client creation way * replication/quota/topic log constants moved in 3.8 again its value hasn't changed, only where it was stored, this way it's backward compatible * Update usages of Metadata to conform to kafka 3.7 interface --------- Co-authored-by: David Simon <[email protected]>
1 parent e08c4f8 commit d236067

22 files changed

+174
-164
lines changed

build.gradle

+6-1
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,8 @@ project(':cruise-control') {
279279
implementation "io.netty:netty-handler:${nettyVersion}"
280280
implementation "io.netty:netty-transport-native-epoll:${nettyVersion}"
281281
api "org.apache.kafka:kafka_$scalaBinaryVersion:$kafkaVersion"
282+
api "org.apache.kafka:kafka-server:$kafkaVersion"
283+
api "org.apache.kafka:kafka-server-common:$kafkaVersion"
282284
api "org.apache.kafka:kafka-clients:$kafkaVersion"
283285
// Add following dependency when upgrading to Kafka 3.5
284286
api "org.apache.kafka:kafka-storage:$kafkaVersion"
@@ -447,6 +449,7 @@ project(':cruise-control-metrics-reporter') {
447449
implementation "com.yammer.metrics:metrics-core:2.2.0"
448450
implementation "org.apache.logging.log4j:log4j-slf4j-impl:2.17.2"
449451
implementation "org.apache.kafka:kafka_$scalaBinaryVersion:$kafkaVersion"
452+
implementation "org.apache.kafka:kafka-server:$kafkaVersion"
450453
implementation "org.apache.kafka:kafka-clients:$kafkaVersion"
451454
implementation 'com.google.code.findbugs:jsr305:3.0.2'
452455
// Temporary pin for vulnerability
@@ -457,7 +460,9 @@ project(':cruise-control-metrics-reporter') {
457460
testImplementation 'org.powermock:powermock-module-junit4:2.0.9'
458461
testImplementation 'org.powermock:powermock-api-easymock:2.0.9'
459462
testImplementation "org.apache.kafka:kafka-clients:$kafkaVersion:test"
460-
testImplementation "org.apache.kafka:kafka-clients:$kafkaVersion"
463+
testImplementation "org.apache.kafka:kafka-server-common:$kafkaVersion"
464+
testImplementation "org.apache.kafka:kafka-group-coordinator:$kafkaVersion"
465+
testImplementation "org.apache.kafka:kafka-storage:$kafkaVersion"
461466
testImplementation 'commons-io:commons-io:2.11.0'
462467
testImplementation "org.apache.zookeeper:zookeeper:${zookeeperVersion}"
463468
testOutput sourceSets.test.output

cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterAutoCreateTopicTest.java

+10-7
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaClientsIntegrationTestHarness;
88
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaTestUtils;
9-
import kafka.server.KafkaConfig;
109
import org.apache.kafka.clients.CommonClientConfigs;
1110
import org.apache.kafka.clients.admin.AdminClient;
1211
import org.apache.kafka.clients.admin.CreateTopicsResult;
@@ -16,6 +15,10 @@
1615
import org.apache.kafka.clients.producer.Producer;
1716
import org.apache.kafka.clients.producer.ProducerConfig;
1817
import org.apache.kafka.clients.producer.ProducerRecord;
18+
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
19+
import org.apache.kafka.network.SocketServerConfigs;
20+
import org.apache.kafka.server.config.ReplicationConfigs;
21+
import org.apache.kafka.server.config.ServerLogConfigs;
1922
import org.junit.After;
2023
import org.junit.Before;
2124
import org.junit.Test;
@@ -79,7 +82,7 @@ public Properties overridingProps() {
7982
Properties props = new Properties();
8083
int port = CCKafkaTestUtils.findLocalPort();
8184
props.setProperty(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, CruiseControlMetricsReporter.class.getName());
82-
props.setProperty(KafkaConfig.ListenersProp(), "PLAINTEXT://127.0.0.1:" + port);
85+
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://127.0.0.1:" + port);
8386
props.setProperty(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
8487
"127.0.0.1:" + port);
8588
props.setProperty(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG, "100");
@@ -91,11 +94,11 @@ public Properties overridingProps() {
9194
props.setProperty(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_NUM_PARTITIONS_CONFIG, "1");
9295
props.setProperty(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
9396
// disable topic auto-creation to leave the metrics reporter to create the metrics topic
94-
props.setProperty(KafkaConfig.AutoCreateTopicsEnableProp(), "false");
95-
props.setProperty(KafkaConfig.LogFlushIntervalMessagesProp(), "1");
96-
props.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
97-
props.setProperty(KafkaConfig.DefaultReplicationFactorProp(), "2");
98-
props.setProperty(KafkaConfig.NumPartitionsProp(), "2");
97+
props.setProperty(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, "false");
98+
props.setProperty(ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG, "1");
99+
props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
100+
props.setProperty(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, "2");
101+
props.setProperty(ServerLogConfigs.NUM_PARTITIONS_CONFIG, "2");
99102
return props;
100103
}
101104

cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterSslTest.java

+8-5
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,13 @@
88
import java.io.IOException;
99
import java.util.Properties;
1010
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaTestUtils;
11-
import kafka.server.KafkaConfig;
1211
import org.apache.kafka.clients.CommonClientConfigs;
1312
import org.apache.kafka.clients.producer.ProducerConfig;
1413
import org.apache.kafka.common.security.auth.SecurityProtocol;
14+
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
15+
import org.apache.kafka.network.SocketServerConfigs;
16+
import org.apache.kafka.server.config.ReplicationConfigs;
17+
import org.apache.kafka.server.config.ServerLogConfigs;
1518
import org.junit.Assert;
1619

1720
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG;
@@ -45,14 +48,14 @@ public Properties overridingProps() {
4548
}
4649
}
4750
props.setProperty(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, CruiseControlMetricsReporter.class.getName());
48-
props.setProperty(KafkaConfig.ListenersProp(), "SSL://127.0.0.1:" + port);
51+
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "SSL://127.0.0.1:" + port);
4952
props.setProperty(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG), "127.0.0.1:" + port);
5053
props.setProperty(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG), SecurityProtocol.SSL.name);
5154
props.setProperty(CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG, "100");
5255
props.setProperty(CRUISE_CONTROL_METRICS_TOPIC_CONFIG, TOPIC);
53-
props.setProperty(KafkaConfig.LogFlushIntervalMessagesProp(), "1");
54-
props.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
55-
props.setProperty(KafkaConfig.DefaultReplicationFactorProp(), "2");
56+
props.setProperty(ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG, "1");
57+
props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
58+
props.setProperty(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, "2");
5659
return props;
5760
}
5861

cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterTest.java

+12-8
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.util.concurrent.atomic.AtomicInteger;
2121
import java.util.regex.Pattern;
2222
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaTestUtils;
23-
import kafka.server.KafkaConfig;
2423
import org.apache.kafka.clients.CommonClientConfigs;
2524
import org.apache.kafka.clients.admin.AdminClient;
2625
import org.apache.kafka.clients.admin.TopicDescription;
@@ -35,6 +34,10 @@
3534
import org.apache.kafka.clients.producer.ProducerRecord;
3635
import org.apache.kafka.clients.producer.RecordMetadata;
3736
import org.apache.kafka.common.serialization.StringDeserializer;
37+
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
38+
import org.apache.kafka.network.SocketServerConfigs;
39+
import org.apache.kafka.server.config.ReplicationConfigs;
40+
import org.apache.kafka.server.config.ServerLogConfigs;
3841
import org.junit.After;
3942
import org.junit.Before;
4043
import org.junit.Test;
@@ -86,13 +89,13 @@ public Properties overridingProps() {
8689
Properties props = new Properties();
8790
int port = CCKafkaTestUtils.findLocalPort();
8891
props.setProperty(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, CruiseControlMetricsReporter.class.getName());
89-
props.setProperty(KafkaConfig.ListenersProp(), "PLAINTEXT://" + HOST + ":" + port);
92+
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://" + HOST + ":" + port);
9093
props.setProperty(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG), HOST + ":" + port);
9194
props.setProperty(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG, "100");
9295
props.setProperty(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_CONFIG, TOPIC);
93-
props.setProperty(KafkaConfig.LogFlushIntervalMessagesProp(), "1");
94-
props.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
95-
props.setProperty(KafkaConfig.DefaultReplicationFactorProp(), "2");
96+
props.setProperty(ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG, "1");
97+
props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
98+
props.setProperty(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, "2");
9699
return props;
97100
}
98101

@@ -210,7 +213,8 @@ public void testUpdatingMetricsTopicConfig() throws ExecutionException, Interrup
210213
public void testGetKafkaBootstrapServersConfigure() {
211214
// Test with a "listeners" config with a host
212215
Map<Object, Object> brokerConfig = buildBrokerConfigs().get(0);
213-
Map<String, Object> listenersMap = Collections.singletonMap(KafkaConfig.ListenersProp(), brokerConfig.get(KafkaConfig.ListenersProp()));
216+
Map<String, Object> listenersMap = Collections.singletonMap(
217+
SocketServerConfigs.LISTENERS_CONFIG, brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG));
214218
String bootstrapServers = CruiseControlMetricsReporter.getBootstrapServers(listenersMap);
215219
String urlParse = "\\[?([0-9a-zA-Z\\-%._:]*)]?:(-?[0-9]+)";
216220
Pattern urlParsePattern = Pattern.compile(urlParse);
@@ -219,15 +223,15 @@ public void testGetKafkaBootstrapServersConfigure() {
219223

220224
// Test with a "listeners" config without a host in the first listener.
221225
String listeners = "SSL://:1234,PLAINTEXT://myhost:4321";
222-
listenersMap = Collections.singletonMap(KafkaConfig.ListenersProp(), listeners);
226+
listenersMap = Collections.singletonMap(SocketServerConfigs.LISTENERS_CONFIG, listeners);
223227
bootstrapServers = CruiseControlMetricsReporter.getBootstrapServers(listenersMap);
224228
assertTrue(urlParsePattern.matcher(bootstrapServers).matches());
225229
assertEquals(DEFAULT_BOOTSTRAP_SERVERS_HOST, bootstrapServers.split(":")[0]);
226230
assertEquals("1234", bootstrapServers.split(":")[1]);
227231

228232
// Test with "listeners" and "port" config together.
229233
listenersMap = new HashMap<>();
230-
listenersMap.put(KafkaConfig.ListenersProp(), listeners);
234+
listenersMap.put(SocketServerConfigs.LISTENERS_CONFIG, listeners);
231235
listenersMap.put("port", "43");
232236
bootstrapServers = CruiseControlMetricsReporter.getBootstrapServers(listenersMap);
233237
assertTrue(urlParsePattern.matcher(bootstrapServers).matches());

cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCEmbeddedBroker.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
import org.apache.kafka.common.network.ListenerName;
1919
import org.apache.kafka.common.security.auth.SecurityProtocol;
2020
import org.apache.kafka.common.utils.Time;
21+
import org.apache.kafka.network.SocketServerConfigs;
22+
import org.apache.kafka.server.config.ServerConfigs;
23+
import org.apache.kafka.server.config.ServerLogConfigs;
2124
import org.slf4j.Logger;
2225
import org.slf4j.LoggerFactory;
2326
import scala.Option;
@@ -94,11 +97,11 @@ private static KafkaServer createKafkaServer(KafkaConfig kafkaConfig) throws Cla
9497
}
9598

9699
private void parseConfigs(Map<Object, Object> config) {
97-
_id = Integer.parseInt((String) config.get(KafkaConfig.BrokerIdProp()));
98-
_logDir = new File((String) config.get(KafkaConfig.LogDirProp()));
100+
_id = Integer.parseInt((String) config.get(ServerConfigs.BROKER_ID_CONFIG));
101+
_logDir = new File((String) config.get(ServerLogConfigs.LOG_DIR_CONFIG));
99102

100103
// Bind addresses
101-
String listenersString = (String) config.get(KafkaConfig.ListenersProp());
104+
String listenersString = (String) config.get(SocketServerConfigs.LISTENERS_CONFIG);
102105
for (String protocolAddr : listenersString.split("\\s*,\\s*")) {
103106
try {
104107
URI uri = new URI(protocolAddr.trim());

cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCEmbeddedBrokerBuilder.java

+23-16
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,16 @@
99
import java.util.Map;
1010
import java.util.StringJoiner;
1111
import java.util.concurrent.atomic.AtomicInteger;
12-
import kafka.server.KafkaConfig;
12+
import org.apache.kafka.common.config.SslConfigs;
1313
import org.apache.kafka.common.network.Mode;
1414
import org.apache.kafka.common.security.auth.SecurityProtocol;
15+
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
16+
import org.apache.kafka.network.SocketServerConfigs;
17+
import org.apache.kafka.server.config.ReplicationConfigs;
18+
import org.apache.kafka.server.config.ServerConfigs;
19+
import org.apache.kafka.server.config.ServerLogConfigs;
20+
import org.apache.kafka.server.config.ZkConfigs;
21+
import org.apache.kafka.storage.internals.log.CleanerConfig;
1522
import org.apache.kafka.test.TestSslUtils;
1623

1724

@@ -257,27 +264,27 @@ public Map<Object, Object> buildConfig() {
257264
if (_sslPort >= 0) {
258265
csvJoiner.add(SecurityProtocol.SSL.name + "://localhost:" + _sslPort);
259266
}
260-
props.put(KafkaConfig.BrokerIdProp(), Integer.toString(_nodeId));
261-
props.put(KafkaConfig.ListenersProp(), csvJoiner.toString());
262-
props.put(KafkaConfig.LogDirProp(), _logDirectory.getAbsolutePath());
263-
props.put(KafkaConfig.ZkConnectProp(), _zkConnect);
264-
props.put(KafkaConfig.ReplicaSocketTimeoutMsProp(), Long.toString(_socketTimeoutMs));
265-
props.put(KafkaConfig.ControllerSocketTimeoutMsProp(), Long.toString(_socketTimeoutMs));
266-
props.put(KafkaConfig.ControlledShutdownEnableProp(), Boolean.toString(_enableControlledShutdown));
267-
props.put(KafkaConfig.DeleteTopicEnableProp(), Boolean.toString(_enableDeleteTopic));
268-
props.put(KafkaConfig.ControlledShutdownRetryBackoffMsProp(), Long.toString(_controlledShutdownRetryBackoff));
269-
props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp(), Long.toString(_logCleanerDedupBufferSize));
270-
props.put(KafkaConfig.LogCleanerEnableProp(), Boolean.toString(_enableLogCleaner));
271-
props.put(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
272-
props.put(KafkaConfig.SslEndpointIdentificationAlgorithmProp(), "");
267+
props.put(ServerConfigs.BROKER_ID_CONFIG, Integer.toString(_nodeId));
268+
props.put(SocketServerConfigs.LISTENERS_CONFIG, csvJoiner.toString());
269+
props.put(ServerLogConfigs.LOG_DIR_CONFIG, _logDirectory.getAbsolutePath());
270+
props.put(ZkConfigs.ZK_CONNECT_CONFIG, _zkConnect);
271+
props.put(ReplicationConfigs.REPLICA_SOCKET_TIMEOUT_MS_CONFIG, Long.toString(_socketTimeoutMs));
272+
props.put(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG, Long.toString(_socketTimeoutMs));
273+
props.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, Boolean.toString(_enableControlledShutdown));
274+
props.put(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, Boolean.toString(_enableDeleteTopic));
275+
props.put(ServerConfigs.CONTROLLED_SHUTDOWN_RETRY_BACKOFF_MS_CONFIG, Long.toString(_controlledShutdownRetryBackoff));
276+
props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, Long.toString(_logCleanerDedupBufferSize));
277+
props.put(CleanerConfig.LOG_CLEANER_ENABLE_PROP, Boolean.toString(_enableLogCleaner));
278+
props.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
279+
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
273280
if (_rack != null) {
274-
props.put(KafkaConfig.RackProp(), _rack);
281+
props.put(ServerConfigs.BROKER_RACK_CONFIG, _rack);
275282
}
276283
if (_trustStore != null || _sslPort > 0) {
277284
try {
278285
props.putAll(TestSslUtils.createSslConfig(false, true, Mode.SERVER, _trustStore, "server" + _nodeId));
279286
// Switch interbroker to ssl
280-
props.put(KafkaConfig.InterBrokerSecurityProtocolProp(), SecurityProtocol.SSL.name);
287+
props.put(ReplicationConfigs.INTER_BROKER_SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name);
281288
} catch (Exception e) {
282289
throw new IllegalStateException(e);
283290
}

cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCKafkaClientsIntegrationTestHarness.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@
66

77
import java.io.File;
88
import java.util.Properties;
9-
import kafka.server.KafkaConfig;
109
import org.apache.kafka.clients.CommonClientConfigs;
1110
import org.apache.kafka.clients.producer.ProducerConfig;
1211
import org.apache.kafka.clients.producer.Producer;
1312
import org.apache.kafka.clients.producer.KafkaProducer;
13+
import org.apache.kafka.common.config.SslConfigs;
1414
import org.apache.kafka.common.network.Mode;
1515
import org.apache.kafka.common.security.auth.SecurityProtocol;
1616
import org.apache.kafka.common.serialization.StringSerializer;
@@ -56,7 +56,7 @@ protected void setSecurityConfigs(Properties clientProps, String certAlias) {
5656
throw new AssertionError("ssl set but no trust store provided");
5757
}
5858
clientProps.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, protocol.name);
59-
clientProps.setProperty(KafkaConfig.SslEndpointIdentificationAlgorithmProp(), "");
59+
clientProps.setProperty(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
6060
try {
6161
clientProps.putAll(TestSslUtils.createSslConfig(true, true, Mode.CLIENT, trustStoreFile, certAlias));
6262
} catch (Exception e) {

cruise-control/src/integrationTest/java/com/linkedin/kafka/cruisecontrol/DiskFailureIntegrationTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,12 @@
2626
import com.linkedin.kafka.cruisecontrol.detector.AnomalyState;
2727
import com.linkedin.kafka.cruisecontrol.detector.TopicReplicationFactorAnomalyFinder;
2828
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaTestUtils;
29-
import kafka.server.KafkaConfig;
3029
import net.minidev.json.JSONArray;
3130
import org.apache.commons.io.FileUtils;
3231
import org.apache.kafka.clients.admin.AdminClient;
3332
import org.apache.kafka.clients.admin.AdminClientConfig;
3433
import org.apache.kafka.clients.admin.NewTopic;
34+
import org.apache.kafka.server.config.ServerLogConfigs;
3535
import org.junit.After;
3636
import org.junit.Before;
3737
import org.junit.Test;
@@ -97,7 +97,7 @@ public Map<Object, Object> overridingProps() {
9797
Map<Object, Object> props = KafkaCruiseControlIntegrationTestUtils.createBrokerProps();
9898
Entry<File, File> logFolders = Map.entry(CCKafkaTestUtils.newTempDir(), CCKafkaTestUtils.newTempDir());
9999
_brokerLogDirs.add(logFolders);
100-
props.put(KafkaConfig.LogDirsProp(), logFolders.getKey().getAbsolutePath() + "," + logFolders.getValue().getAbsolutePath());
100+
props.put(ServerLogConfigs.LOG_DIR_CONFIG, logFolders.getKey().getAbsolutePath() + "," + logFolders.getValue().getAbsolutePath());
101101
return props;
102102
}
103103

cruise-control/src/integrationTest/java/com/linkedin/kafka/cruisecontrol/KafkaCruiseControlIntegrationTestUtils.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig;
3535
import com.linkedin.kafka.cruisecontrol.monitor.sampling.CruiseControlMetricsReporterSampler;
3636
import com.linkedin.kafka.cruisecontrol.monitor.sampling.KafkaSampleStore;
37-
import kafka.server.KafkaConfig;
3837
import org.apache.commons.io.IOUtils;
3938
import org.apache.kafka.clients.admin.AdminClient;
4039
import org.apache.kafka.clients.admin.AdminClientConfig;
@@ -45,6 +44,7 @@
4544
import org.apache.kafka.clients.producer.ProducerRecord;
4645
import org.apache.kafka.common.security.auth.SecurityProtocol;
4746
import org.apache.kafka.common.serialization.StringSerializer;
47+
import org.apache.kafka.network.SocketServerConfigs;
4848
import org.slf4j.Logger;
4949
import org.slf4j.LoggerFactory;
5050

@@ -168,7 +168,7 @@ public static Map<Object, Object> createBrokerProps() {
168168
StringJoiner csvJoiner = new StringJoiner(",");
169169
csvJoiner.add(SecurityProtocol.PLAINTEXT.name + "://localhost:"
170170
+ KafkaCruiseControlIntegrationTestUtils.findRandomOpenPortOnAllLocalInterfaces());
171-
props.put(KafkaConfig.ListenersProp(), csvJoiner.toString());
171+
props.put(SocketServerConfigs.LISTENERS_CONFIG, csvJoiner.toString());
172172
props.put(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_CONFIG, "true");
173173
props.put(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_REPLICATION_FACTOR_CONFIG, "2");
174174
props.put(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_NUM_PARTITIONS_CONFIG, "1");

0 commit comments

Comments
 (0)