Skip to content

MINOR: Cleanup Raft Module #19284

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 2, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions raft/src/main/java/org/apache/kafka/raft/Batch.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.kafka.raft;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -181,7 +180,7 @@ public static <T> Batch<T> control(
appendTimestamp,
sizeInBytes,
baseOffset + records.size() - 1,
Collections.emptyList(),
List.of(),
records
);
}
Expand Down Expand Up @@ -219,7 +218,7 @@ public static <T> Batch<T> data(
sizeInBytes,
baseOffset + records.size() - 1,
records,
Collections.emptyList()
List.of()
);
}
}
22 changes: 8 additions & 14 deletions raft/src/main/java/org/apache/kafka/raft/ControlRecord.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,20 +80,14 @@ public ControlRecordType type() {
}

public short version() {
switch (recordType) {
case LEADER_CHANGE:
return ((LeaderChangeMessage) message).version();
case SNAPSHOT_HEADER:
return ((SnapshotHeaderRecord) message).version();
case SNAPSHOT_FOOTER:
return ((SnapshotFooterRecord) message).version();
case KRAFT_VERSION:
return ((KRaftVersionRecord) message).version();
case KRAFT_VOTERS:
return ((VotersRecord) message).version();
default:
throw new IllegalStateException(String.format("Unknown control record type %s", recordType));
}
return switch (recordType) {
case LEADER_CHANGE -> ((LeaderChangeMessage) message).version();
case SNAPSHOT_HEADER -> ((SnapshotHeaderRecord) message).version();
case SNAPSHOT_FOOTER -> ((SnapshotFooterRecord) message).version();
case KRAFT_VERSION -> ((KRaftVersionRecord) message).version();
case KRAFT_VOTERS -> ((VotersRecord) message).version();
default -> throw new IllegalStateException(String.format("Unknown control record type %s", recordType));
};
}

public ApiMessage message() {
Expand Down
4 changes: 2 additions & 2 deletions raft/src/main/java/org/apache/kafka/raft/DynamicVoter.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.kafka.common.network.ListenerName;

import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

/**
Expand Down Expand Up @@ -153,7 +153,7 @@ public int port() {

public VoterSet.VoterNode toVoterNode(String controllerListenerName) {
ReplicaKey voterKey = ReplicaKey.of(nodeId, directoryId);
Endpoints listeners = Endpoints.fromInetSocketAddresses(Collections.singletonMap(
Endpoints listeners = Endpoints.fromInetSocketAddresses(Map.of(
ListenerName.normalised(controllerListenerName),
new InetSocketAddress(host, port)));
SupportedVersionRange supportedKRaftVersion =
Expand Down
13 changes: 6 additions & 7 deletions raft/src/main/java/org/apache/kafka/raft/Endpoints.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@

import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
Expand Down Expand Up @@ -147,7 +146,7 @@ public UpdateRaftVoterRequestData.ListenerCollection toUpdateVoterRequest() {
return listeners;
}

private static final Endpoints EMPTY = new Endpoints(Collections.emptyMap());
private static final Endpoints EMPTY = new Endpoints(Map.of());
public static Endpoints empty() {
return EMPTY;
}
Expand Down Expand Up @@ -188,7 +187,7 @@ public static Endpoints fromBeginQuorumEpochResponse(
return Optional.ofNullable(endpoints.find(leaderId))
.map(endpoint ->
new Endpoints(
Collections.singletonMap(
Map.of(
listenerName,
InetSocketAddress.createUnresolved(endpoint.host(), endpoint.port())
)
Expand Down Expand Up @@ -217,7 +216,7 @@ public static Endpoints fromEndQuorumEpochResponse(
return Optional.ofNullable(endpoints.find(leaderId))
.map(endpoint ->
new Endpoints(
Collections.singletonMap(
Map.of(
listenerName,
InetSocketAddress.createUnresolved(endpoint.host(), endpoint.port())
)
Expand All @@ -234,7 +233,7 @@ public static Endpoints fromVoteResponse(
return Optional.ofNullable(endpoints.find(leaderId))
.map(endpoint ->
new Endpoints(
Collections.singletonMap(
Map.of(
listenerName,
InetSocketAddress.createUnresolved(endpoint.host(), endpoint.port())
)
Expand All @@ -251,7 +250,7 @@ public static Endpoints fromFetchResponse(
return Optional.ofNullable(endpoints.find(leaderId))
.map(endpoint ->
new Endpoints(
Collections.singletonMap(
Map.of(
listenerName,
InetSocketAddress.createUnresolved(endpoint.host(), endpoint.port())
)
Expand All @@ -268,7 +267,7 @@ public static Endpoints fromFetchSnapshotResponse(
return Optional.ofNullable(endpoints.find(leaderId))
.map(endpoint ->
new Endpoints(
Collections.singletonMap(
Map.of(
listenerName,
InetSocketAddress.createUnresolved(endpoint.host(), endpoint.port())
)
Expand Down
9 changes: 4 additions & 5 deletions raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.HexFormat;
import java.util.IdentityHashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -654,7 +653,7 @@ private void onBecomeLeader(long currentTimeMs) {
log.initializeLeaderEpoch(quorum.epoch());

// The high watermark can only be advanced once we have written a record
// from the new leader's epoch. Hence we write a control message immediately
// from the new leader's epoch. Hence, we write a control message immediately
// to ensure there is no delay committing pending data.
state.appendStartOfEpochControlRecords(quorum.localVoterNodeOrThrow(), currentTimeMs);

Expand Down Expand Up @@ -1900,7 +1899,7 @@ private DescribeQuorumResponseData handleDescribeQuorumRequest(
* - {@link Errors#FENCED_LEADER_EPOCH} if the epoch is smaller than this node's epoch
* - {@link Errors#INVALID_REQUEST} if the request epoch is larger than the leader's current epoch
* or if either the fetch offset or the last fetched epoch is invalid
* - {@link Errors#SNAPSHOT_NOT_FOUND} if the request snapshot id does not exists
* - {@link Errors#SNAPSHOT_NOT_FOUND} if the request snapshot id does not exist
* - {@link Errors#POSITION_OUT_OF_RANGE} if the request snapshot offset out of range
*/
private FetchSnapshotResponseData handleFetchSnapshotRequest(
Expand Down Expand Up @@ -2418,7 +2417,7 @@ private boolean handleUpdateVoterResponse(
final Endpoints leaderEndpoints;
if (responseLeaderId.isPresent() && data.currentLeader().host().isEmpty()) {
leaderEndpoints = Endpoints.fromInetSocketAddresses(
Collections.singletonMap(
Map.of(
channel.listenerName(),
InetSocketAddress.createUnresolved(
data.currentLeader().host(),
Expand Down Expand Up @@ -3835,7 +3834,7 @@ private void fireHandleCommit(
List<T> records
) {
Batch<T> batch = Batch.data(baseOffset, epoch, appendTimestamp, sizeInBytes, records);
MemoryBatchReader<T> reader = MemoryBatchReader.of(Collections.singletonList(batch), this);
MemoryBatchReader<T> reader = MemoryBatchReader.of(List.of(batch), this);
fireHandleCommit(reader);
}

Expand Down
5 changes: 2 additions & 3 deletions raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.kafka.common.utils.Utils;

import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -59,14 +58,14 @@ public class QuorumConfig {
public static final String QUORUM_VOTERS_DOC = "Map of id/endpoint information for " +
"the set of voters in a comma-separated list of <code>{id}@{host}:{port}</code> entries. " +
"For example: <code>1@localhost:9092,2@localhost:9093,3@localhost:9094</code>";
public static final List<String> DEFAULT_QUORUM_VOTERS = Collections.emptyList();
public static final List<String> DEFAULT_QUORUM_VOTERS = List.of();

public static final String QUORUM_BOOTSTRAP_SERVERS_CONFIG = QUORUM_PREFIX + "bootstrap.servers";
public static final String QUORUM_BOOTSTRAP_SERVERS_DOC = "List of endpoints to use for " +
"bootstrapping the cluster metadata. The endpoints are specified in comma-separated list " +
"of <code>{host}:{port}</code> entries. For example: " +
"<code>localhost:9092,localhost:9093,localhost:9094</code>.";
public static final List<String> DEFAULT_QUORUM_BOOTSTRAP_SERVERS = Collections.emptyList();
public static final List<String> DEFAULT_QUORUM_BOOTSTRAP_SERVERS = List.of();

public static final String QUORUM_ELECTION_TIMEOUT_MS_CONFIG = QUORUM_PREFIX + "election.timeout.ms";
public static final String QUORUM_ELECTION_TIMEOUT_MS_DOC = "Maximum time in milliseconds to wait " +
Expand Down
3 changes: 1 addition & 2 deletions raft/src/main/java/org/apache/kafka/raft/QuorumState.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
Expand Down Expand Up @@ -185,7 +184,7 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IllegalStateE
election.epoch(),
partitionState.lastVoterSet().voterIds(),
randomElectionTimeoutMs(),
Collections.emptyList(),
List.of(),
localListeners,
logContext
);
Expand Down
49 changes: 24 additions & 25 deletions raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@

import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
Expand Down Expand Up @@ -90,10 +89,10 @@ public static FetchRequestData singletonFetchRequest(
new FetchRequestData.FetchTopic()
.setTopic(topicPartition.topic())
.setTopicId(topicId)
.setPartitions(Collections.singletonList(fetchPartition));
.setPartitions(List.of(fetchPartition));

return new FetchRequestData()
.setTopics(Collections.singletonList(fetchTopic));
.setTopics(List.of(fetchTopic));
}

public static FetchResponseData singletonFetchResponse(
Expand All @@ -117,7 +116,7 @@ public static FetchResponseData singletonFetchResponse(
new FetchResponseData.FetchableTopicResponse()
.setTopic(topicPartition.topic())
.setTopicId(topicId)
.setPartitions(Collections.singletonList(fetchablePartition));
.setPartitions(List.of(fetchablePartition));

FetchResponseData response = new FetchResponseData();

Expand All @@ -138,7 +137,7 @@ public static FetchResponseData singletonFetchResponse(

return response
.setErrorCode(topLevelError.code())
.setResponses(Collections.singletonList(fetchableTopic));
.setResponses(List.of(fetchableTopic));
}

public static VoteRequestData singletonVoteRequest(
Expand All @@ -155,11 +154,11 @@ public static VoteRequestData singletonVoteRequest(
.setClusterId(clusterId)
.setVoterId(voterKey.id())
.setTopics(
Collections.singletonList(
List.of(
new VoteRequestData.TopicData()
.setTopicName(topicPartition.topic())
.setPartitions(
Collections.singletonList(
List.of(
new VoteRequestData.PartitionData()
.setPartitionIndex(topicPartition.partition())
.setReplicaEpoch(replicaEpoch)
Expand Down Expand Up @@ -202,10 +201,10 @@ public static VoteResponseData singletonVoteResponse(

VoteResponseData response = new VoteResponseData()
.setErrorCode(topLevelError.code())
.setTopics(Collections.singletonList(
.setTopics(List.of(
new VoteResponseData.TopicData()
.setTopicName(topicPartition.topic())
.setPartitions(Collections.singletonList(partitionData))));
.setPartitions(List.of(partitionData))));

if (apiVersion >= 1) {
Optional<InetSocketAddress> address = endpoints.address(listenerName);
Expand Down Expand Up @@ -250,10 +249,10 @@ public static FetchSnapshotRequestData singletonFetchSnapshotRequest(
.setReplicaId(replicaKey.id())
.setMaxBytes(maxBytes)
.setTopics(
Collections.singletonList(
List.of(
new FetchSnapshotRequestData.TopicSnapshot()
.setName(topicPartition.topic())
.setPartitions(Collections.singletonList(partitionSnapshot))
.setPartitions(List.of(partitionSnapshot))
)
);
}
Expand Down Expand Up @@ -285,10 +284,10 @@ public static FetchSnapshotResponseData singletonFetchSnapshotResponse(

FetchSnapshotResponseData response = new FetchSnapshotResponseData()
.setTopics(
Collections.singletonList(
List.of(
new FetchSnapshotResponseData.TopicSnapshot()
.setName(topicPartition.topic())
.setPartitions(Collections.singletonList(partitionSnapshot))
.setPartitions(List.of(partitionSnapshot))
)
);

Expand Down Expand Up @@ -323,11 +322,11 @@ public static BeginQuorumEpochRequestData singletonBeginQuorumEpochRequest(
.setClusterId(clusterId)
.setVoterId(voterKey.id())
.setTopics(
Collections.singletonList(
List.of(
new BeginQuorumEpochRequestData.TopicData()
.setTopicName(topicPartition.topic())
.setPartitions(
Collections.singletonList(
List.of(
new BeginQuorumEpochRequestData.PartitionData()
.setPartitionIndex(topicPartition.partition())
.setLeaderEpoch(leaderEpoch)
Expand All @@ -353,11 +352,11 @@ public static BeginQuorumEpochResponseData singletonBeginQuorumEpochResponse(
BeginQuorumEpochResponseData response = new BeginQuorumEpochResponseData()
.setErrorCode(topLevelError.code())
.setTopics(
Collections.singletonList(
List.of(
new BeginQuorumEpochResponseData.TopicData()
.setTopicName(topicPartition.topic())
.setPartitions(
Collections.singletonList(
List.of(
new BeginQuorumEpochResponseData.PartitionData()
.setErrorCode(partitionLevelError.code())
.setLeaderId(leaderId)
Expand Down Expand Up @@ -409,11 +408,11 @@ public static EndQuorumEpochRequestData singletonEndQuorumEpochRequest(
return new EndQuorumEpochRequestData()
.setClusterId(clusterId)
.setTopics(
Collections.singletonList(
List.of(
new EndQuorumEpochRequestData.TopicData()
.setTopicName(topicPartition.topic())
.setPartitions(
Collections.singletonList(
List.of(
new EndQuorumEpochRequestData.PartitionData()
.setPartitionIndex(topicPartition.partition())
.setLeaderEpoch(leaderEpoch)
Expand All @@ -439,10 +438,10 @@ public static EndQuorumEpochResponseData singletonEndQuorumEpochResponse(
) {
EndQuorumEpochResponseData response = new EndQuorumEpochResponseData()
.setErrorCode(topLevelError.code())
.setTopics(Collections.singletonList(
.setTopics(List.of(
new EndQuorumEpochResponseData.TopicData()
.setTopicName(topicPartition.topic())
.setPartitions(Collections.singletonList(
.setPartitions(List.of(
new EndQuorumEpochResponseData.PartitionData()
.setErrorCode(partitionLevelError.code())
.setLeaderId(leaderId)
Expand Down Expand Up @@ -476,11 +475,11 @@ public static DescribeQuorumRequestData singletonDescribeQuorumRequest(

return new DescribeQuorumRequestData()
.setTopics(
Collections.singletonList(
List.of(
new DescribeQuorumRequestData.TopicData()
.setTopicName(topicPartition.topic())
.setPartitions(
Collections.singletonList(
List.of(
new DescribeQuorumRequestData.PartitionData()
.setPartitionIndex(topicPartition.partition())
)
Expand All @@ -501,11 +500,11 @@ public static DescribeQuorumResponseData singletonDescribeQuorumResponse(
) {
DescribeQuorumResponseData response = new DescribeQuorumResponseData()
.setTopics(
Collections.singletonList(
List.of(
new DescribeQuorumResponseData.TopicData()
.setTopicName(topicPartition.topic())
.setPartitions(
Collections.singletonList(
List.of(
new DescribeQuorumResponseData.PartitionData()
.setPartitionIndex(topicPartition.partition())
.setErrorCode(Errors.NONE.code())
Expand Down
Loading