
Commit 03b1b72

MINOR: Cleanup Storage Module (#19072)
Given that we now support Java 17 on our brokers, this PR replaces the use of the following in the storage module:
- Collections.singletonList() and Collections.emptyList() with List.of()
- Collections.singletonMap() and Collections.emptyMap() with Map.of()
- Collections.singleton() and Collections.emptySet() with Set.of()
- Arrays.asList() with List.of()

Reviewers: Ken Huang <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent db4e74b commit 03b1b72
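
As a quick illustration of the substitutions above (not code from this commit; the class name is invented for the example), the Java 9+ factories are drop-in equivalents at these call sites, with the caveat that they reject null elements/keys and return fully immutable collections, whereas Arrays.asList() still allows set() on its fixed-size view:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class CollectionFactoryExample {
        public static void main(String[] args) {
            // Old style, as it appeared before this commit.
            List<String> oldList = Arrays.asList("a", "b");
            Set<String> oldSet = Collections.singleton("a");
            Map<String, Integer> oldMap = Collections.singletonMap("a", 1);
            List<String> oldEmpty = Collections.emptyList();

            // New style used throughout the storage module.
            List<String> newList = List.of("a", "b");
            Set<String> newSet = Set.of("a");
            Map<String, Integer> newMap = Map.of("a", 1);
            List<String> newEmpty = List.of();

            // The pairs are equal element-wise, so callers see no behavioral change
            // as long as they never mutate the result or pass null elements.
            System.out.println(oldList.equals(newList));   // true
            System.out.println(oldSet.equals(newSet));     // true
            System.out.println(oldMap.equals(newMap));     // true
            System.out.println(oldEmpty.equals(newEmpty)); // true
        }
    }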

81 files changed, +320 -407 lines changed


storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java

+5-5
@@ -85,11 +85,11 @@ class ConsumerTask implements Runnable, Closeable {
    private final Object assignPartitionsLock = new Object();

    // Remote log metadata topic partitions that consumer is assigned to.
-    private volatile Set<Integer> assignedMetadataPartitions = Collections.emptySet();
+    private volatile Set<Integer> assignedMetadataPartitions = Set.of();

    // User topic partitions that this broker is a leader/follower for.
-    private volatile Map<TopicIdPartition, UserTopicIdPartition> assignedUserTopicIdPartitions = Collections.emptyMap();
-    private volatile Set<TopicIdPartition> processedAssignmentOfUserTopicIdPartitions = Collections.emptySet();
+    private volatile Map<TopicIdPartition, UserTopicIdPartition> assignedUserTopicIdPartitions = Map.of();
+    private volatile Set<TopicIdPartition> processedAssignmentOfUserTopicIdPartitions = Set.of();

    private long uninitializedAt;
    private boolean isAllUserTopicPartitionsInitialized;

@@ -299,11 +299,11 @@ private void clearResourcesForUnassignedUserTopicPartitions(Set<TopicIdPartition
    }

    void addAssignmentsForPartitions(final Set<TopicIdPartition> partitions) {
-        updateAssignments(Objects.requireNonNull(partitions), Collections.emptySet());
+        updateAssignments(Objects.requireNonNull(partitions), Set.of());
    }

    void removeAssignmentsForPartitions(final Set<TopicIdPartition> partitions) {
-        updateAssignments(Collections.emptySet(), Objects.requireNonNull(partitions));
+        updateAssignments(Set.of(), Objects.requireNonNull(partitions));
    }

    private void updateAssignments(final Set<TopicIdPartition> addedPartitions,

storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java

+2-3
@@ -31,7 +31,6 @@

import java.io.Closeable;
import java.io.IOException;
-import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;

@@ -154,8 +153,8 @@ public void close() throws IOException {

        // Clear the entries by creating unmodifiable empty maps.
        // Practically, we do not use the same instances that are closed.
-        idToPartitionDeleteMetadata = Collections.emptyMap();
-        idToRemoteLogMetadataCache = Collections.emptyMap();
+        idToPartitionDeleteMetadata = Map.of();
+        idToRemoteLogMetadataCache = Map.of();
    }

    @Override

storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java

+4-5
@@ -58,7 +58,6 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.Supplier;
-import java.util.stream.Collectors;

/**
 * This is the {@link RemoteLogMetadataManager} implementation with storage as an internal topic with name {@link TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME}.

@@ -471,7 +470,7 @@ private void initializeResources() {

    boolean doesTopicExist(Admin adminClient, String topic) {
        try {
-            TopicDescription description = adminClient.describeTopics(Collections.singleton(topic))
+            TopicDescription description = adminClient.describeTopics(Set.of(topic))
                    .topicNameValues()
                    .get(topic)
                    .get();

@@ -491,7 +490,7 @@ boolean doesTopicExist(Admin adminClient, String topic) {
    private boolean isPartitionsCountSameAsConfigured(Admin adminClient,
                                                      String topicName) throws InterruptedException, ExecutionException {
        log.debug("Getting topic details to check for partition count and replication factor.");
-        TopicDescription topicDescription = adminClient.describeTopics(Collections.singleton(topicName))
+        TopicDescription topicDescription = adminClient.describeTopics(Set.of(topicName))
                .topicNameValues().get(topicName).get();
        int expectedPartitions = rlmmConfig.metadataTopicPartitionsCount();
        int topicPartitionsSize = topicDescription.partitions().size();

@@ -525,14 +524,14 @@ private boolean createTopic(Admin adminClient, NewTopic newTopic) {
        try {
            doesTopicExist = doesTopicExist(adminClient, topic);
            if (!doesTopicExist) {
-                CreateTopicsResult result = adminClient.createTopics(Collections.singleton(newTopic));
+                CreateTopicsResult result = adminClient.createTopics(Set.of(newTopic));
                result.all().get();
                List<String> overriddenConfigs = result.config(topic).get()
                        .entries()
                        .stream()
                        .filter(entry -> !entry.isDefault())
                        .map(entry -> entry.name() + "=" + entry.value())
-                        .collect(Collectors.toList());
+                        .toList();
                log.info("Topic {} created. TopicId: {}, numPartitions: {}, replicationFactor: {}, config: {}",
                        topic, result.topicId(topic).get(), result.numPartitions(topic).get(),
                        result.replicationFactor(topic).get(), overriddenConfigs);
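
A note on the .collect(Collectors.toList()) to .toList() change above (repeated in several files below): Stream.toList(), available since Java 16, always returns an unmodifiable list, while Collectors.toList() makes no guarantee either way, so the swap is only safe where the result is never mutated, as with overriddenConfigs here, which is only logged. A minimal sketch of the difference (illustrative only; the class name is invented):

    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class ToListExample {
        public static void main(String[] args) {
            // Collectors.toList() currently yields a mutable ArrayList,
            // though the spec does not promise mutability.
            List<Integer> collected = Stream.of(1, 2, 3).collect(Collectors.toList());
            collected.add(4); // works with the current JDK implementation

            // Stream.toList() always yields an unmodifiable list.
            List<Integer> fixed = Stream.of(1, 2, 3).toList();
            // fixed.add(4); // would throw UnsupportedOperationException

            System.out.println(collected); // [1, 2, 3, 4]
            System.out.println(fixed);     // [1, 2, 3]
        }
    }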

storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java

+1-2
@@ -26,7 +26,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.stream.Collectors;

public class RemoteLogSegmentMetadataSnapshotTransform implements RemoteLogMetadataTransform<RemoteLogSegmentMetadataSnapshot> {

@@ -52,7 +51,7 @@ private List<RemoteLogSegmentMetadataSnapshotRecord.SegmentLeaderEpochEntry> cre
                .map(entry -> new RemoteLogSegmentMetadataSnapshotRecord.SegmentLeaderEpochEntry()
                        .setLeaderEpoch(entry.getKey())
                        .setOffset(entry.getValue()))
-                .collect(Collectors.toList());
+                .toList();
    }

    @Override

storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java

+1-2
@@ -30,7 +30,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.stream.Collectors;

public class RemoteLogSegmentMetadataTransform implements RemoteLogMetadataTransform<RemoteLogSegmentMetadata> {

@@ -56,7 +55,7 @@ private List<RemoteLogSegmentMetadataRecord.SegmentLeaderEpochEntry> createSegme
                .map(entry -> new RemoteLogSegmentMetadataRecord.SegmentLeaderEpochEntry()
                        .setLeaderEpoch(entry.getKey())
                        .setOffset(entry.getValue()))
-                .collect(Collectors.toList());
+                .toList();
    }

    private RemoteLogSegmentMetadataRecord.RemoteLogSegmentIdEntry createRemoteLogSegmentIdEntry(RemoteLogSegmentMetadata data) {

storage/src/main/java/org/apache/kafka/server/log/remote/quota/RLMQuotaManager.java

+1-2
@@ -32,7 +32,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

@@ -102,7 +101,7 @@ private MetricConfig getQuotaMetricConfig(Quota quota) {
    }

    private MetricName metricName() {
-        return metrics.metricName("byte-rate", quotaType.toString(), description, Collections.emptyMap());
+        return metrics.metricName("byte-rate", quotaType.toString(), description, Map.of());
    }

    private Sensor sensor() {

storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerConfig.java

+1-1
@@ -454,7 +454,7 @@ public Map<String, Object> remoteLogMetadataManagerProps() {

    public Map<String, Object> getConfigProps(String configPrefixProp) {
        String prefixProp = config.getString(configPrefixProp);
-        return prefixProp == null ? Collections.emptyMap() : Collections.unmodifiableMap(config.originalsWithPrefix(prefixProp));
+        return prefixProp == null ? Map.of() : Collections.unmodifiableMap(config.originalsWithPrefix(prefixProp));
    }

    public int remoteLogManagerCopyNumQuotaSamples() {
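
The hunk above keeps Collections.unmodifiableMap() on the non-null branch even though the empty branch no longer needs a wrapper: Map.of() is already immutable, while the map returned for a configured prefix is built at runtime and is presumably mutable, so it still gets wrapped. A small sketch of that distinction (illustrative only; the class name and property name are invented):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class UnmodifiableMapExample {
        public static void main(String[] args) {
            // Map.of() already rejects mutation, so no wrapper is required.
            Map<String, Object> empty = Map.of();

            // A map assembled at runtime is mutable and needs the wrapper
            // if callers must not modify it.
            Map<String, Object> assembled = new HashMap<>();
            assembled.put("some.prefixed.config", "value");
            Map<String, Object> readOnly = Collections.unmodifiableMap(assembled);

            System.out.println(empty.isEmpty());                              // true
            System.out.println(readOnly.containsKey("some.prefixed.config")); // true
        }
    }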

storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java

+1-2
@@ -26,7 +26,6 @@

import java.util.AbstractMap;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

@@ -406,7 +405,7 @@ private static List<EpochEntry> truncateFromEnd(TreeMap<Integer, EpochEntry> epo
        if (endOffset >= 0 && epochEntry.isPresent() && epochEntry.get().startOffset >= endOffset) {
            return removeWhileMatching(epochs.descendingMap().entrySet().iterator(), x -> x.startOffset >= endOffset);
        }
-        return Collections.emptyList();
+        return List.of();
    }

    public OptionalInt epochForOffset(long offset) {

storage/src/main/java/org/apache/kafka/storage/internals/log/LocalLog.java

+6-8
@@ -36,7 +36,6 @@
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;

@@ -51,7 +50,6 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

-import static java.util.Collections.singletonList;
import static org.apache.kafka.common.utils.Utils.require;
import static org.apache.kafka.storage.internals.log.LogFileUtils.CLEANED_FILE_SUFFIX;
import static org.apache.kafka.storage.internals.log.LogFileUtils.DELETED_FILE_SUFFIX;

@@ -433,11 +431,11 @@ public LogSegment createAndDeleteSegment(long newOffset,
                config.preallocate);
        segments.add(newSegment);

-        reason.logReason(singletonList(segmentToDelete));
+        reason.logReason(List.of(segmentToDelete));
        if (newOffset != segmentToDelete.baseOffset()) {
            segments.remove(segmentToDelete.baseOffset());
        }
-        deleteSegmentFiles(singletonList(segmentToDelete), asyncDelete, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent);
+        deleteSegmentFiles(List.of(segmentToDelete), asyncDelete, dir, topicPartition, config, scheduler, logDirFailureChannel, logIdent);
        return newSegment;
    }

@@ -619,7 +617,7 @@ public LogSegment roll(Long expectedNextOffset) {
        File offsetIdxFile = LogFileUtils.offsetIndexFile(dir, newOffset);
        File timeIdxFile = LogFileUtils.timeIndexFile(dir, newOffset);
        File txnIdxFile = LogFileUtils.transactionIndexFile(dir, newOffset);
-        for (File file : Arrays.asList(logFile, offsetIdxFile, timeIdxFile, txnIdxFile)) {
+        for (File file : List.of(logFile, offsetIdxFile, timeIdxFile, txnIdxFile)) {
            if (file.exists()) {
                logger.warn("Newly rolled segment file {} already exists; deleting it first", file.getAbsolutePath());
                Files.delete(file.toPath());

@@ -791,7 +789,7 @@ public static <T> Optional<T> nextItem(Iterator<T> iterator) {

    private static FetchDataInfo emptyFetchDataInfo(LogOffsetMetadata fetchOffsetMetadata, boolean includeAbortedTxns) {
        Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions = includeAbortedTxns
-                ? Optional.of(Collections.emptyList())
+                ? Optional.of(List.of())
                : Optional.empty();
        return new FetchDataInfo(fetchOffsetMetadata, MemoryRecords.EMPTY, false, abortedTransactions);
    }

@@ -943,7 +941,7 @@ public static SplitSegmentResult splitOverflowedSegment(LogSegment segment,
            }
            // replace old segment with new ones
            LOG.info("{}Replacing overflowed segment {} with split segments {}", logPrefix, segment, newSegments);
-            List<LogSegment> deletedSegments = replaceSegments(existingSegments, newSegments, singletonList(segment),
+            List<LogSegment> deletedSegments = replaceSegments(existingSegments, newSegments, List.of(segment),
                    dir, topicPartition, config, scheduler, logDirFailureChannel, logPrefix, false);
            return new SplitSegmentResult(deletedSegments, newSegments);
        } catch (Exception e) {

@@ -1035,7 +1033,7 @@ public static List<LogSegment> replaceSegments(LogSegments existingSegments,
            existingSegments.remove(segment.baseOffset());
        }
        deleteSegmentFiles(
-                singletonList(segment),
+                List.of(segment),
                true,
                dir,
                topicPartition,

storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java

+1-2
@@ -21,7 +21,6 @@
import org.apache.kafka.common.record.RecordValidationStats;
import org.apache.kafka.common.requests.ProduceResponse.RecordError;

-import java.util.Collections;
import java.util.List;
import java.util.Optional;

@@ -74,7 +73,7 @@ public LogAppendInfo(long firstOffset,
                         int validBytes,
                         long lastOffsetOfFirstBatch) {
        this(firstOffset, lastOffset, lastLeaderEpoch, maxTimestamp, logAppendTime, logStartOffset,
-                recordValidationStats, sourceCompression, validBytes, lastOffsetOfFirstBatch, Collections.emptyList(),
+                recordValidationStats, sourceCompression, validBytes, lastOffsetOfFirstBatch, List.of(),
                LeaderHwChange.NONE);
    }

storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java

+6-7
@@ -48,7 +48,6 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

-import static java.util.Arrays.asList;
import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;

@@ -105,7 +104,7 @@ public LogConfigDef(ConfigDef base) {

    @Override
    public List<String> headers() {
-        return asList("Name", "Description", "Type", "Default", "Valid Values", SERVER_DEFAULT_HEADER_NAME, "Importance");
+        return List.of("Name", "Description", "Type", "Default", "Valid Values", SERVER_DEFAULT_HEADER_NAME, "Importance");
    }

    // Visible for testing

@@ -300,7 +299,7 @@ public Optional<String> serverConfigName(String configName) {
    private final Map<?, ?> props;

    public LogConfig(Map<?, ?> props) {
-        this(props, Collections.emptySet());
+        this(props, Set.of());
    }

    @SuppressWarnings({"this-escape"})

@@ -326,11 +325,11 @@ public LogConfig(Map<?, ?> props, Set<String> overriddenConfigs) {
        this.minCleanableRatio = getDouble(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG);
        this.compact = getList(TopicConfig.CLEANUP_POLICY_CONFIG).stream()
                .map(c -> c.toLowerCase(Locale.ROOT))
-                .collect(Collectors.toList())
+                .toList()
                .contains(TopicConfig.CLEANUP_POLICY_COMPACT);
        this.delete = getList(TopicConfig.CLEANUP_POLICY_CONFIG).stream()
                .map(c -> c.toLowerCase(Locale.ROOT))
-                .collect(Collectors.toList())
+                .toList()
                .contains(TopicConfig.CLEANUP_POLICY_DELETE);
        this.uncleanLeaderElectionEnable = getBoolean(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG);
        this.minInSyncReplicas = getInt(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG);

@@ -447,7 +446,7 @@ public static Optional<Type> configType(String configName) {
    }

    public static List<String> configNames() {
-        return CONFIG.names().stream().sorted().collect(Collectors.toList());
+        return CONFIG.names().stream().sorted().toList();
    }

    public static Optional<String> serverConfigName(String configName) {

@@ -610,7 +609,7 @@ private static void validateRemoteStorageRetentionTime(Map<?, ?> props) {
     * Check that the given properties contain only valid log config names and that all values can be parsed and are valid
     */
    public static void validate(Properties props) {
-        validate(Collections.emptyMap(), props, Collections.emptyMap(), false);
+        validate(Map.of(), props, Map.of(), false);
    }

    public static void validate(Map<String, String> existingConfigs,

storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java

+2-2
@@ -43,14 +43,14 @@
import java.nio.file.NoSuchFileException;
import java.nio.file.attribute.FileTime;
import java.util.Iterator;
+import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

-import static java.util.Arrays.asList;

/**
 * A segment of the log. Each segment has two components: a log and an index. The log is a FileRecords containing

@@ -772,7 +772,7 @@ void closeHandlers() {
     */
    public void deleteIfExists() throws IOException {
        try {
-            Utils.tryAll(asList(
+            Utils.tryAll(List.of(
                () -> deleteTypeIfExists(() -> log.deleteIfExists(), "log", log.file(), true),
                () -> deleteTypeIfExists(() -> lazyOffsetIndex.deleteIfExists(), "offset index", offsetIndexFile(), true),
                () -> deleteTypeIfExists(() -> lazyTimeIndex.deleteIfExists(), "time index", timeIndexFile(), true),

storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegments.java

+6-7
@@ -22,14 +22,13 @@
import java.io.File;
import java.io.IOException;
import java.util.Collection;
-import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Predicate;
-import java.util.stream.Collectors;

/**
 * This class encapsulates a thread-safe navigable map of LogSegment instances and provides the

@@ -141,7 +140,7 @@ public int numberOfSegments() {
     * @return the base offsets of all segments
     */
    public Collection<Long> baseOffsets() {
-        return values().stream().map(LogSegment::baseOffset).collect(Collectors.toList());
+        return values().stream().map(LogSegment::baseOffset).toList();
    }

    /**

@@ -182,7 +181,7 @@ public Collection<LogSegment> values() {
    public Collection<LogSegment> values(long from, long to) {
        if (from == to) {
            // Handle non-segment-aligned empty sets
-            return Collections.emptyList();
+            return List.of();
        } else if (to < from) {
            throw new IllegalArgumentException("Invalid log segment range: requested segments in " + topicPartition +
                    " from offset " + from + " which is greater than limit offset " + to);

@@ -197,7 +196,7 @@ public Collection<LogSegment> values(long from, long to) {
    public Collection<LogSegment> nonActiveLogSegmentsFrom(long from) {
        LogSegment activeSegment = lastSegment().get();
        if (from > activeSegment.baseOffset())
-            return Collections.emptyList();
+            return List.of();
        else
            return values(from, activeSegment.baseOffset());
    }

@@ -314,7 +313,7 @@ public Collection<LogSegment> higherSegments(long baseOffset) {
        Long higherOffset = segments.higherKey(baseOffset);
        if (higherOffset != null)
            return segments.tailMap(higherOffset, true).values();
-        return Collections.emptyList();
+        return List.of();
    }

    /**

@@ -334,7 +333,7 @@ public long sizeInBytes() {
     * @param predicate the predicate to be used for filtering segments.
     */
    public Collection<LogSegment> filter(Predicate<LogSegment> predicate) {
-        return values().stream().filter(predicate).collect(Collectors.toList());
+        return values().stream().filter(predicate).toList();
    }

    /**
