
Commit 2ae4ffb

MINOR: Cleanup Core Module (#19372)
Now that Kafka brokers support Java 17, this PR makes some cleanups in the core module. The changes are limited to the Java files in the core module; Scala-related changes may follow next. The changes mostly include:

- Collections.emptyList(), Collections.singletonList() and Arrays.asList() replaced with List.of()
- Collections.emptyMap() and Collections.singletonMap() replaced with Map.of()
- Collections.singleton() replaced with Set.of()
- some switch statements converted to the enhanced switch form

Reviewers: Andrew Schofield <[email protected]>, PoAn Yang <[email protected]>, Ken Huang <[email protected]>
1 parent 391b258 commit 2ae4ffb

23 files changed, +591 -639 lines
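For reference, nearly all of the collection changes below follow one mechanical pattern: the pre-Java-9 Collections/Arrays helpers are swapped for the collection factory methods added in Java 9, which the new Java 17 baseline guarantees. A minimal standalone sketch of the equivalences and the caveats worth knowing (illustrative code, not taken from this commit):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class FactoryMethodMigration {
    public static void main(String[] args) {
        // Empty collections: both sides are immutable, so the swap preserves behavior.
        List<String> before = Collections.emptyList();
        List<String> after = List.of();
        System.out.println(before.equals(after)); // true

        // Single-element and multi-element collections.
        List<String> single = List.of("a");        // was Collections.singletonList("a")
        List<String> multi = List.of("a", "b");    // was Arrays.asList("a", "b")
        Map<String, Integer> map = Map.of("a", 1); // was Collections.singletonMap("a", 1)
        Set<String> set = Set.of("a");             // was Collections.singleton("a")

        // Caveat 1: Arrays.asList() is fixed-size but allows set();
        // List.of() is fully immutable.
        List<String> fixedSize = Arrays.asList("a", "b");
        fixedSize.set(0, "c");  // allowed
        // multi.set(0, "c");   // would throw UnsupportedOperationException

        // Caveat 2: the factory methods reject null elements, unlike the older helpers.
        System.out.println(Collections.singletonList(null)); // [null]
        // List.of((String) null);                           // would throw NullPointerException
        System.out.println(single + " " + map + " " + set);
    }
}
```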

Diff for: core/src/main/java/kafka/docker/Log4jConfiguration.java

+1 -2

```diff
@@ -22,7 +22,6 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonPropertyOrder;
 
-import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -121,7 +120,7 @@ public void setProperties(String key, Object value) {
 @JsonIgnoreProperties(ignoreUnknown = true)
 class Loggers {
     private Root root;
-    private List<Logger> logger = Collections.emptyList();
+    private List<Logger> logger = List.of();
 
     @JsonProperty("Root")
     public Root getRoot() {
```

Diff for: core/src/main/java/kafka/log/remote/RemoteLogManager.java

+2 -2

```diff
@@ -1707,7 +1707,7 @@ public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws
         RecordBatch firstBatch = enrichedRecordBatch.batch;
         if (firstBatch == null)
             return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,
-                includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty());
+                includeAbortedTxns ? Optional.of(List.of()) : Optional.empty());
 
         int firstBatchSize = firstBatch.sizeInBytes();
         // An empty record is sent instead of an incomplete batch when
@@ -1783,7 +1783,7 @@ private FetchDataInfo addAbortedTransactions(long startOffset,
         return new FetchDataInfo(fetchInfo.fetchOffsetMetadata,
             fetchInfo.records,
             fetchInfo.firstEntryIncomplete,
-            Optional.of(abortedTransactions.isEmpty() ? Collections.emptyList() : new ArrayList<>(abortedTransactions)));
+            Optional.of(abortedTransactions.isEmpty() ? List.of() : new ArrayList<>(abortedTransactions)));
     }
 
     /**
```

Diff for: core/src/main/java/kafka/server/NetworkUtils.java

+2 -2

```diff
@@ -31,7 +31,7 @@
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 
-import java.util.Collections;
+import java.util.Map;
 
 public class NetworkUtils {
 
@@ -62,7 +62,7 @@ public static NetworkClient buildNetworkClient(String prefix,
             metrics,
             time,
             metricGroupPrefix,
-            Collections.emptyMap(),
+            Map.of(),
             false,
             channelBuilder,
             logContext
```

Diff for: core/src/main/java/kafka/server/builders/KafkaApisBuilder.java

+2 -2

```diff
@@ -41,7 +41,7 @@
 import org.apache.kafka.server.authorizer.Authorizer;
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
 
-import java.util.Collections;
+import java.util.Map;
 import java.util.Optional;
 
 import scala.jdk.javaapi.OptionConverters;
@@ -195,7 +195,7 @@ public KafkaApis build() {
         if (txnCoordinator == null) throw new RuntimeException("You must set txnCoordinator");
         if (autoTopicCreationManager == null)
             throw new RuntimeException("You must set autoTopicCreationManager");
-        if (config == null) config = new KafkaConfig(Collections.emptyMap());
+        if (config == null) config = new KafkaConfig(Map.of());
         if (configRepository == null) throw new RuntimeException("You must set configRepository");
         if (metadataCache == null) throw new RuntimeException("You must set metadataCache");
         if (metrics == null) throw new RuntimeException("You must set metrics");
```

Diff for: core/src/main/java/kafka/server/builders/LogManagerBuilder.java

+1 -2

```diff
@@ -30,7 +30,6 @@
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
 
 import java.io.File;
-import java.util.Collections;
 import java.util.List;
 
 import scala.jdk.javaapi.CollectionConverters;
@@ -39,7 +38,7 @@
 public class LogManagerBuilder {
     private static final int PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS = 600000;
     private List<File> logDirs = null;
-    private List<File> initialOfflineDirs = Collections.emptyList();
+    private List<File> initialOfflineDirs = List.of();
     private ConfigRepository configRepository = null;
     private LogConfig initialDefaultConfig = null;
     private CleanerConfig cleanerConfig = null;
```

Diff for: core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java

+2 -4

```diff
@@ -32,13 +32,11 @@
 import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
 
-import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import scala.Option;
 
-
-
 public class ReplicaManagerBuilder {
     private KafkaConfig config = null;
     private Metrics metrics = null;
@@ -102,7 +100,7 @@ public ReplicaManagerBuilder setBrokerTopicStats(BrokerTopicStats brokerTopicSta
     }
 
     public ReplicaManager build() {
-        if (config == null) config = new KafkaConfig(Collections.emptyMap());
+        if (config == null) config = new KafkaConfig(Map.of());
         if (logManager == null) throw new RuntimeException("You must set logManager");
         if (metadataCache == null) throw new RuntimeException("You must set metadataCache");
         if (logDirFailureChannel == null) throw new RuntimeException("You must set logDirFailureChannel");
```
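Both builders above (KafkaApisBuilder and ReplicaManagerBuilder) default an unset config to new KafkaConfig(Map.of()). Defaulting to an immutable empty map is safe precisely because the receiver only reads it; any mutation attempt would throw. A generic sketch of the idiom, using a hypothetical Widget/WidgetBuilder rather than the Kafka classes:

```java
import java.util.Map;

public class WidgetBuilder {
    // Hypothetical stand-in for KafkaConfig: a read-only view over its props.
    record Widget(Map<String, Object> props) {
        String describe() {
            return props.isEmpty() ? "defaults" : props.toString();
        }
    }

    private Map<String, Object> props = null;

    public WidgetBuilder setProps(Map<String, Object> props) {
        this.props = props;
        return this;
    }

    public Widget build() {
        // Same idiom as the builders above: fall back to an immutable empty map.
        // Safe because Widget only reads the map; Map.of() would throw
        // UnsupportedOperationException on any mutation attempt.
        if (props == null) props = Map.of();
        return new Widget(props);
    }

    public static void main(String[] args) {
        System.out.println(new WidgetBuilder().build().describe()); // defaults
    }
}
```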

Diff for: core/src/main/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandler.java

+1 -2

```diff
@@ -32,7 +32,6 @@
 import org.apache.kafka.common.resource.Resource;
 import org.apache.kafka.metadata.MetadataCache;
 
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -96,7 +95,7 @@ public DescribeTopicPartitionsResponseData handleDescribeTopicPartitionsRequest(
         if (!fetchAllTopics && !isAuthorized) {
             // We should not return topicId when on unauthorized error, so we return zero uuid.
             unauthorizedForDescribeTopicMetadata.add(describeTopicPartitionsResponseTopic(
-                Errors.TOPIC_AUTHORIZATION_FAILED, topicName, Uuid.ZERO_UUID, false, Collections.emptyList())
+                Errors.TOPIC_AUTHORIZATION_FAILED, topicName, Uuid.ZERO_UUID, false, List.of())
             );
         }
         return isAuthorized;
```

Diff for: core/src/main/java/kafka/server/share/ShareFetchUtils.java

+2 -3

```diff
@@ -41,7 +41,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -87,7 +86,7 @@ static Map<TopicIdPartition, ShareFetchResponseData.PartitionData> processFetchR
                 .setRecords(null)
                 .setErrorCode(fetchPartitionData.error.code())
                 .setErrorMessage(fetchPartitionData.error.message())
-                .setAcquiredRecords(Collections.emptyList());
+                .setAcquiredRecords(List.of());
 
             // In case we get OFFSET_OUT_OF_RANGE error, that's because the Log Start Offset is later than the fetch offset.
             // So, we would update the start and end offset of the share partition and still return an empty
@@ -124,7 +123,7 @@ static Map<TopicIdPartition, ShareFetchResponseData.PartitionData> processFetchR
             if (shareAcquiredRecords.acquiredRecords().isEmpty()) {
                 partitionData
                     .setRecords(null)
-                    .setAcquiredRecords(Collections.emptyList());
+                    .setAcquiredRecords(List.of());
             } else {
                 partitionData
                     .setRecords(maybeSliceFetchRecords(fetchPartitionData.records, shareAcquiredRecords))
```

Diff for: core/src/main/java/kafka/server/share/SharePartition.java

+32 -45

```diff
@@ -63,7 +63,6 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -175,18 +174,13 @@ public RecordState validateTransition(RecordState newState) throws IllegalStateE
     }
 
     public static RecordState forId(byte id) {
-        switch (id) {
-            case 0:
-                return AVAILABLE;
-            case 1:
-                return ACQUIRED;
-            case 2:
-                return ACKNOWLEDGED;
-            case 4:
-                return ARCHIVED;
-            default:
-                throw new IllegalArgumentException("Unknown record state id: " + id);
-        }
+        return switch (id) {
+            case 0 -> AVAILABLE;
+            case 1 -> ACQUIRED;
+            case 2 -> ACKNOWLEDGED;
+            case 4 -> ARCHIVED;
+            default -> throw new IllegalArgumentException("Unknown record state id: " + id);
+        };
     }
 
     public byte id() {
@@ -440,8 +434,8 @@ public CompletableFuture<Void> maybeInitialize() {
         persister.readState(new ReadShareGroupStateParameters.Builder()
             .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionIdLeaderEpochData>()
                 .setGroupId(this.groupId)
-                .setTopicsData(Collections.singletonList(new TopicData<>(topicIdPartition.topicId(),
-                    Collections.singletonList(PartitionFactory.newPartitionIdLeaderEpochData(topicIdPartition.partition(), leaderEpoch)))))
+                .setTopicsData(List.of(new TopicData<>(topicIdPartition.topicId(),
+                    List.of(PartitionFactory.newPartitionIdLeaderEpochData(topicIdPartition.partition(), leaderEpoch)))))
                 .build())
             .build()
         ).whenComplete((result, exception) -> {
@@ -1675,17 +1669,12 @@ private Map<Long, RecordState> fetchRecordStateMapForAcknowledgementBatch(
     }
 
     private static RecordState fetchRecordState(byte acknowledgeType) {
-        switch (acknowledgeType) {
-            case 1 /* ACCEPT */:
-                return RecordState.ACKNOWLEDGED;
-            case 2 /* RELEASE */:
-                return RecordState.AVAILABLE;
-            case 3 /* REJECT */:
-            case 0 /* GAP */:
-                return RecordState.ARCHIVED;
-            default:
-                throw new IllegalArgumentException("Invalid acknowledge type: " + acknowledgeType);
-        }
+        return switch (acknowledgeType) {
+            case 1 /* ACCEPT */ -> RecordState.ACKNOWLEDGED;
+            case 2 /* RELEASE */ -> RecordState.AVAILABLE;
+            case 3, 0 /* REJECT / GAP */ -> RecordState.ARCHIVED;
+            default -> throw new IllegalArgumentException("Invalid acknowledge type: " + acknowledgeType);
+        };
     }
 
     private NavigableMap<Long, InFlightBatch> fetchSubMapForAcknowledgementBatch(
@@ -2239,8 +2228,8 @@ CompletableFuture<Void> writeShareGroupState(List<PersisterStateBatch> stateBatc
         persister.writeState(new WriteShareGroupStateParameters.Builder()
             .setGroupTopicPartitionData(new GroupTopicPartitionData.Builder<PartitionStateBatchData>()
                 .setGroupId(this.groupId)
-                .setTopicsData(Collections.singletonList(new TopicData<>(topicIdPartition.topicId(),
-                    Collections.singletonList(PartitionFactory.newPartitionStateBatchData(
+                .setTopicsData(List.of(new TopicData<>(topicIdPartition.topicId(),
+                    List.of(PartitionFactory.newPartitionStateBatchData(
                         topicIdPartition.partition(), stateEpoch, startOffset, leaderEpoch, stateBatches))))
             ).build()).build())
             .whenComplete((result, exception) -> {
@@ -2284,22 +2273,20 @@ CompletableFuture<Void> writeShareGroupState(List<PersisterStateBatch> stateBatc
 
     private KafkaException fetchPersisterError(short errorCode, String errorMessage) {
         Errors error = Errors.forCode(errorCode);
-        switch (error) {
-            case NOT_COORDINATOR:
-            case COORDINATOR_NOT_AVAILABLE:
-            case COORDINATOR_LOAD_IN_PROGRESS:
-                return new CoordinatorNotAvailableException(errorMessage);
-            case GROUP_ID_NOT_FOUND:
-                return new GroupIdNotFoundException(errorMessage);
-            case UNKNOWN_TOPIC_OR_PARTITION:
-                return new UnknownTopicOrPartitionException(errorMessage);
-            case FENCED_STATE_EPOCH:
-                return new FencedStateEpochException(errorMessage);
-            case FENCED_LEADER_EPOCH:
-                return new NotLeaderOrFollowerException(errorMessage);
-            default:
-                return new UnknownServerException(errorMessage);
-        }
+        return switch (error) {
+            case NOT_COORDINATOR, COORDINATOR_NOT_AVAILABLE, COORDINATOR_LOAD_IN_PROGRESS ->
+                new CoordinatorNotAvailableException(errorMessage);
+            case GROUP_ID_NOT_FOUND ->
+                new GroupIdNotFoundException(errorMessage);
+            case UNKNOWN_TOPIC_OR_PARTITION ->
+                new UnknownTopicOrPartitionException(errorMessage);
+            case FENCED_STATE_EPOCH ->
+                new FencedStateEpochException(errorMessage);
+            case FENCED_LEADER_EPOCH ->
+                new NotLeaderOrFollowerException(errorMessage);
+            default ->
+                new UnknownServerException(errorMessage);
+        };
     }
 
     // Visible for testing
@@ -2466,7 +2453,7 @@ private void releaseAcquisitionLockOnTimeoutForPerOffsetBatch(InFlightBatch inFl
         }
     }
 
-    private long startOffsetDuringInitialization(long partitionDataStartOffset) throws Exception {
+    private long startOffsetDuringInitialization(long partitionDataStartOffset) {
         // Set the state epoch and end offset from the persisted state.
         if (partitionDataStartOffset != PartitionFactory.UNINITIALIZED_START_OFFSET) {
             return partitionDataStartOffset;
```
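The forId, fetchRecordState, and fetchPersisterError rewrites above all move from classic switch statements to switch expressions (standard since Java 14): arrow labels cannot fall through, several labels can share one arm (case 3, 0), and the compiler requires the expression to cover every input, which is why each rewrite keeps its default arm. A small self-contained sketch with hypothetical names (not Kafka code); over an enum, naming every constant even lets you drop the default:

```java
public class SwitchExpressionDemo {
    enum AckType { GAP, ACCEPT, RELEASE, REJECT }

    // A switch expression over an enum that names every constant needs no
    // default arm, and the compiler reports an error if a new constant is
    // added but not handled here.
    static String describe(AckType type) {
        return switch (type) {
            case ACCEPT -> "acknowledged";
            case RELEASE -> "available again";
            case REJECT, GAP -> "archived"; // multiple labels share one arm, no fall-through
        };
    }

    public static void main(String[] args) {
        System.out.println(describe(AckType.ACCEPT)); // acknowledged
        System.out.println(describe(AckType.GAP));    // archived
    }
}
```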

Diff for: core/src/main/java/kafka/server/share/SharePartitionManager.java

+6 -8

```diff
@@ -63,7 +63,6 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -352,12 +351,12 @@ public CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.Part
             log.error("Share session error for {}: no such share session found", key);
             return FutureUtils.failedFuture(Errors.SHARE_SESSION_NOT_FOUND.exception());
         } else {
-            log.debug("Removed share session with key " + key);
+            log.debug("Removed share session with key {}", key);
         }
 
         // Additionally release the acquired records for the respective member.
         if (topicIdPartitions.isEmpty()) {
-            return CompletableFuture.completedFuture(Collections.emptyMap());
+            return CompletableFuture.completedFuture(Map.of());
         }
 
         Map<TopicIdPartition, CompletableFuture<Throwable>> futuresMap = new HashMap<>();
@@ -537,7 +536,7 @@ List<TopicIdPartition> cachedTopicIdPartitionsInShareSession(String groupId, Uui
         ShareSessionKey key = shareSessionKey(groupId, memberId);
         ShareSession shareSession = cache.get(key);
         if (shareSession == null) {
-            return Collections.emptyList();
+            return List.of();
         }
         List<TopicIdPartition> cachedTopicIdPartitions = new ArrayList<>();
         shareSession.partitionMap().forEach(cachedSharePartition -> cachedTopicIdPartitions.add(
@@ -570,7 +569,7 @@ private static String partitionsToLogString(Collection<TopicIdPartition> partiti
     void processShareFetch(ShareFetch shareFetch) {
         if (shareFetch.topicIdPartitions().isEmpty()) {
             // If there are no partitions to fetch then complete the future with an empty map.
-            shareFetch.maybeComplete(Collections.emptyMap());
+            shareFetch.maybeComplete(Map.of());
             return;
         }
 
@@ -633,7 +632,7 @@ void processShareFetch(ShareFetch shareFetch) {
 
         // If all the partitions in the request errored out, then complete the fetch request with an exception.
         if (shareFetch.errorInAllPartitions()) {
-            shareFetch.maybeComplete(Collections.emptyMap());
+            shareFetch.maybeComplete(Map.of());
             // Do not proceed with share fetch processing as all the partitions errored out.
             return;
         }
@@ -654,7 +653,7 @@ private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitio
             // to identify the respective share partition.
             SharePartitionListener listener = new SharePartitionListener(sharePartitionKey, replicaManager, partitionCacheMap);
             replicaManager.maybeAddListener(sharePartitionKey.topicIdPartition().topicPartition(), listener);
-            SharePartition partition = new SharePartition(
+            return new SharePartition(
                 sharePartitionKey.groupId(),
                 sharePartitionKey.topicIdPartition(),
                 leaderEpoch,
@@ -668,7 +667,6 @@ private SharePartition getOrCreateSharePartition(SharePartitionKey sharePartitio
                 groupConfigManager,
                 listener
             );
-            return partition;
         });
     }
 
```
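One change in this file is unrelated to collections: the log.debug call switches from string concatenation to an SLF4J {} placeholder. Concatenation builds the message (including key.toString()) even when DEBUG is disabled, while the parameterized form defers formatting until SLF4J has checked that the level is enabled. A minimal sketch (hypothetical class name, not from this commit):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParameterizedLoggingDemo {
    private static final Logger log = LoggerFactory.getLogger(ParameterizedLoggingDemo.class);

    public static void main(String[] args) {
        String key = "groupA:member-1";

        // Eager: the concatenation (and key.toString()) runs even if DEBUG is off.
        log.debug("Removed share session with key " + key);

        // Lazy: the message is only formatted when the DEBUG level is enabled.
        log.debug("Removed share session with key {}", key);
    }
}
```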
