Skip to content

Commit 66b0419

Browse files
sjhajhariajanchilling
authored andcommitted
MINOR: Cleanup Raft Module (apache#19284)
Given that now we support Java 17 on our brokers, this PR replace the use of the following in raft module: - Collections.singletonList() and Collections.emptyList() with List.of() - Collections.singletonMap() and Collections.emptyMap() with Map.of() - Collections.singleton() and Collections.emptySet() with Set.of() - Arrays.asList() with List.of() Reviewers: Ken Huang <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent bef86a1 commit 66b0419

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

57 files changed

+470
-659
lines changed

raft/src/main/java/org/apache/kafka/raft/Batch.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
*/
1717
package org.apache.kafka.raft;
1818

19-
import java.util.Collections;
2019
import java.util.Iterator;
2120
import java.util.List;
2221
import java.util.Objects;
@@ -181,7 +180,7 @@ public static <T> Batch<T> control(
181180
appendTimestamp,
182181
sizeInBytes,
183182
baseOffset + records.size() - 1,
184-
Collections.emptyList(),
183+
List.of(),
185184
records
186185
);
187186
}
@@ -219,7 +218,7 @@ public static <T> Batch<T> data(
219218
sizeInBytes,
220219
baseOffset + records.size() - 1,
221220
records,
222-
Collections.emptyList()
221+
List.of()
223222
);
224223
}
225224
}

raft/src/main/java/org/apache/kafka/raft/ControlRecord.java

-17
Original file line numberDiff line numberDiff line change
@@ -79,23 +79,6 @@ public ControlRecordType type() {
7979
return recordType;
8080
}
8181

82-
public short version() {
83-
switch (recordType) {
84-
case LEADER_CHANGE:
85-
return ((LeaderChangeMessage) message).version();
86-
case SNAPSHOT_HEADER:
87-
return ((SnapshotHeaderRecord) message).version();
88-
case SNAPSHOT_FOOTER:
89-
return ((SnapshotFooterRecord) message).version();
90-
case KRAFT_VERSION:
91-
return ((KRaftVersionRecord) message).version();
92-
case KRAFT_VOTERS:
93-
return ((VotersRecord) message).version();
94-
default:
95-
throw new IllegalStateException(String.format("Unknown control record type %s", recordType));
96-
}
97-
}
98-
9982
public ApiMessage message() {
10083
return message;
10184
}

raft/src/main/java/org/apache/kafka/raft/DynamicVoter.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.apache.kafka.common.network.ListenerName;
2323

2424
import java.net.InetSocketAddress;
25-
import java.util.Collections;
25+
import java.util.Map;
2626
import java.util.Objects;
2727

2828
/**
@@ -153,7 +153,7 @@ public int port() {
153153

154154
public VoterSet.VoterNode toVoterNode(String controllerListenerName) {
155155
ReplicaKey voterKey = ReplicaKey.of(nodeId, directoryId);
156-
Endpoints listeners = Endpoints.fromInetSocketAddresses(Collections.singletonMap(
156+
Endpoints listeners = Endpoints.fromInetSocketAddresses(Map.of(
157157
ListenerName.normalised(controllerListenerName),
158158
new InetSocketAddress(host, port)));
159159
SupportedVersionRange supportedKRaftVersion =

raft/src/main/java/org/apache/kafka/raft/ElectionState.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public boolean isLeader(int nodeId) {
6363
/**
6464
* Return if the replica has voted for the given candidate.
6565
*
66-
* A replica has voted for a candidate if all of the following are true:
66+
* A replica has voted for a candidate if all the following are true:
6767
* 1. the node's id and voted id match and
6868
* 2. if the voted directory id is set, it matches the node's directory id
6969
*

raft/src/main/java/org/apache/kafka/raft/Endpoints.java

+6-7
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131

3232
import java.net.InetSocketAddress;
3333
import java.util.Collection;
34-
import java.util.Collections;
3534
import java.util.HashMap;
3635
import java.util.Iterator;
3736
import java.util.Map;
@@ -147,7 +146,7 @@ public UpdateRaftVoterRequestData.ListenerCollection toUpdateVoterRequest() {
147146
return listeners;
148147
}
149148

150-
private static final Endpoints EMPTY = new Endpoints(Collections.emptyMap());
149+
private static final Endpoints EMPTY = new Endpoints(Map.of());
151150
public static Endpoints empty() {
152151
return EMPTY;
153152
}
@@ -188,7 +187,7 @@ public static Endpoints fromBeginQuorumEpochResponse(
188187
return Optional.ofNullable(endpoints.find(leaderId))
189188
.map(endpoint ->
190189
new Endpoints(
191-
Collections.singletonMap(
190+
Map.of(
192191
listenerName,
193192
InetSocketAddress.createUnresolved(endpoint.host(), endpoint.port())
194193
)
@@ -217,7 +216,7 @@ public static Endpoints fromEndQuorumEpochResponse(
217216
return Optional.ofNullable(endpoints.find(leaderId))
218217
.map(endpoint ->
219218
new Endpoints(
220-
Collections.singletonMap(
219+
Map.of(
221220
listenerName,
222221
InetSocketAddress.createUnresolved(endpoint.host(), endpoint.port())
223222
)
@@ -234,7 +233,7 @@ public static Endpoints fromVoteResponse(
234233
return Optional.ofNullable(endpoints.find(leaderId))
235234
.map(endpoint ->
236235
new Endpoints(
237-
Collections.singletonMap(
236+
Map.of(
238237
listenerName,
239238
InetSocketAddress.createUnresolved(endpoint.host(), endpoint.port())
240239
)
@@ -251,7 +250,7 @@ public static Endpoints fromFetchResponse(
251250
return Optional.ofNullable(endpoints.find(leaderId))
252251
.map(endpoint ->
253252
new Endpoints(
254-
Collections.singletonMap(
253+
Map.of(
255254
listenerName,
256255
InetSocketAddress.createUnresolved(endpoint.host(), endpoint.port())
257256
)
@@ -268,7 +267,7 @@ public static Endpoints fromFetchSnapshotResponse(
268267
return Optional.ofNullable(endpoints.find(leaderId))
269268
.map(endpoint ->
270269
new Endpoints(
271-
Collections.singletonMap(
270+
Map.of(
272271
listenerName,
273272
InetSocketAddress.createUnresolved(endpoint.host(), endpoint.port())
274273
)

raft/src/main/java/org/apache/kafka/raft/KafkaNetworkChannel.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ private void sendOnComplete(RaftRequest.Outbound request, ClientResponse clientR
140140
log.error("Request {} failed due to unsupported version error", request, clientResponse.versionMismatch());
141141
response = errorResponse(request.data(), Errors.UNSUPPORTED_VERSION);
142142
} else if (clientResponse.authenticationException() != null) {
143-
// For now we treat authentication errors as retriable. We use the
143+
// For now, we treat authentication errors as retriable. We use the
144144
// `NETWORK_EXCEPTION` error code for lack of a good alternative.
145145
// Note that `NodeToControllerChannelManager` will still log the
146146
// authentication errors so that users have a chance to fix the problem.

raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java

+12-19
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,6 @@
9898
import java.net.InetSocketAddress;
9999
import java.nio.ByteBuffer;
100100
import java.util.Collection;
101-
import java.util.Collections;
102101
import java.util.HexFormat;
103102
import java.util.IdentityHashMap;
104103
import java.util.Iterator;
@@ -593,9 +592,7 @@ public void initialize(
593592
this.updateVoterHandler = new UpdateVoterHandler(
594593
nodeId,
595594
partitionState,
596-
channel.listenerName(),
597-
time,
598-
quorumConfig.requestTimeoutMs()
595+
channel.listenerName()
599596
);
600597
}
601598

@@ -654,7 +651,7 @@ private void onBecomeLeader(long currentTimeMs) {
654651
log.initializeLeaderEpoch(quorum.epoch());
655652

656653
// The high watermark can only be advanced once we have written a record
657-
// from the new leader's epoch. Hence we write a control message immediately
654+
// from the new leader's epoch. Hence, we write a control message immediately
658655
// to ensure there is no delay committing pending data.
659656
state.appendStartOfEpochControlRecords(quorum.localVoterNodeOrThrow(), currentTimeMs);
660657

@@ -1900,7 +1897,7 @@ private DescribeQuorumResponseData handleDescribeQuorumRequest(
19001897
* - {@link Errors#FENCED_LEADER_EPOCH} if the epoch is smaller than this node's epoch
19011898
* - {@link Errors#INVALID_REQUEST} if the request epoch is larger than the leader's current epoch
19021899
* or if either the fetch offset or the last fetched epoch is invalid
1903-
* - {@link Errors#SNAPSHOT_NOT_FOUND} if the request snapshot id does not exists
1900+
* - {@link Errors#SNAPSHOT_NOT_FOUND} if the request snapshot id does not exist
19041901
* - {@link Errors#POSITION_OUT_OF_RANGE} if the request snapshot offset out of range
19051902
*/
19061903
private FetchSnapshotResponseData handleFetchSnapshotRequest(
@@ -2418,7 +2415,7 @@ private boolean handleUpdateVoterResponse(
24182415
final Endpoints leaderEndpoints;
24192416
if (responseLeaderId.isPresent() && data.currentLeader().host().isEmpty()) {
24202417
leaderEndpoints = Endpoints.fromInetSocketAddresses(
2421-
Collections.singletonMap(
2418+
Map.of(
24222419
channel.listenerName(),
24232420
InetSocketAddress.createUnresolved(
24242421
data.currentLeader().host(),
@@ -2887,15 +2884,11 @@ private FetchRequestData buildFetchRequest() {
28872884

28882885
private long maybeSendFetchToAnyBootstrap(long currentTimeMs) {
28892886
Optional<Node> readyNode = requestManager.findReadyBootstrapServer(currentTimeMs);
2890-
if (readyNode.isPresent()) {
2891-
return maybeSendRequest(
2892-
currentTimeMs,
2893-
readyNode.get(),
2894-
this::buildFetchRequest
2895-
);
2896-
} else {
2897-
return requestManager.backoffBeforeAvailableBootstrapServer(currentTimeMs);
2898-
}
2887+
return readyNode.map(node -> maybeSendRequest(
2888+
currentTimeMs,
2889+
node,
2890+
this::buildFetchRequest
2891+
)).orElseGet(() -> requestManager.backoffBeforeAvailableBootstrapServer(currentTimeMs));
28992892
}
29002893

29012894
private FetchSnapshotRequestData buildFetchSnapshotRequest(OffsetAndEpoch snapshotId, long snapshotSize) {
@@ -3331,7 +3324,7 @@ private long pollCurrentState(long currentTimeMs) {
33313324
}
33323325

33333326
private void pollListeners() {
3334-
// Apply all of the pending registration
3327+
// Apply all the pending registration
33353328
while (true) {
33363329
Registration<T> registration = pendingRegistrations.poll();
33373330
if (registration == null) {
@@ -3825,7 +3818,7 @@ private void fireHandleCommit(long baseOffset, Records records) {
38253818
* This API is used for committed records originating from {@link #prepareAppend(int, List)}
38263819
* on this instance. In this case, we are able to save the original record objects, which
38273820
* saves the need to read them back from disk. This is a nice optimization for the leader
3828-
* which is typically doing more work than all of the * followers.
3821+
* which is typically doing more work than all the * followers.
38293822
*/
38303823
private void fireHandleCommit(
38313824
long baseOffset,
@@ -3835,7 +3828,7 @@ private void fireHandleCommit(
38353828
List<T> records
38363829
) {
38373830
Batch<T> batch = Batch.data(baseOffset, epoch, appendTimestamp, sizeInBytes, records);
3838-
MemoryBatchReader<T> reader = MemoryBatchReader.of(Collections.singletonList(batch), this);
3831+
MemoryBatchReader<T> reader = MemoryBatchReader.of(List.of(batch), this);
38393832
fireHandleCommit(reader);
38403833
}
38413834

raft/src/main/java/org/apache/kafka/raft/LeaderState.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public class LeaderState<T> implements EpochState {
7676
private final Map<ReplicaKey, ReplicaState> observerStates = new HashMap<>();
7777
private final Logger log;
7878
private final BatchAccumulator<T> accumulator;
79-
// The set includes all of the followers voters that FETCH or FETCH_SNAPSHOT during the current checkQuorumTimer interval.
79+
// The set includes all the followers voters that FETCH or FETCH_SNAPSHOT during the current checkQuorumTimer interval.
8080
private final Set<Integer> fetchedVoters = new HashSet<>();
8181
private final Timer checkQuorumTimer;
8282
private final int checkQuorumTimeoutMs;

raft/src/main/java/org/apache/kafka/raft/QuorumConfig.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.apache.kafka.common.utils.Utils;
2525

2626
import java.net.InetSocketAddress;
27-
import java.util.Collections;
2827
import java.util.HashMap;
2928
import java.util.List;
3029
import java.util.Map;
@@ -45,7 +44,7 @@
4544
* The default raft timeouts are relatively low compared to some other timeouts such as
4645
* request.timeout.ms. This is part of a general design philosophy where we see changing
4746
* the leader of a Raft cluster as a relatively quick operation. For example, the KIP-631
48-
* controller should be able to transition from standby to active without reloading all of
47+
* controller should be able to transition from standby to active without reloading all
4948
* the metadata. The standby is a "hot" standby, not a "cold" one.
5049
*/
5150
public class QuorumConfig {
@@ -59,14 +58,14 @@ public class QuorumConfig {
5958
public static final String QUORUM_VOTERS_DOC = "Map of id/endpoint information for " +
6059
"the set of voters in a comma-separated list of <code>{id}@{host}:{port}</code> entries. " +
6160
"For example: <code>1@localhost:9092,2@localhost:9093,3@localhost:9094</code>";
62-
public static final List<String> DEFAULT_QUORUM_VOTERS = Collections.emptyList();
61+
public static final List<String> DEFAULT_QUORUM_VOTERS = List.of();
6362

6463
public static final String QUORUM_BOOTSTRAP_SERVERS_CONFIG = QUORUM_PREFIX + "bootstrap.servers";
6564
public static final String QUORUM_BOOTSTRAP_SERVERS_DOC = "List of endpoints to use for " +
6665
"bootstrapping the cluster metadata. The endpoints are specified in comma-separated list " +
6766
"of <code>{host}:{port}</code> entries. For example: " +
6867
"<code>localhost:9092,localhost:9093,localhost:9094</code>.";
69-
public static final List<String> DEFAULT_QUORUM_BOOTSTRAP_SERVERS = Collections.emptyList();
68+
public static final List<String> DEFAULT_QUORUM_BOOTSTRAP_SERVERS = List.of();
7069

7170
public static final String QUORUM_ELECTION_TIMEOUT_MS_CONFIG = QUORUM_PREFIX + "election.timeout.ms";
7271
public static final String QUORUM_ELECTION_TIMEOUT_MS_DOC = "Maximum time in milliseconds to wait " +

raft/src/main/java/org/apache/kafka/raft/QuorumState.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828

2929
import java.io.IOException;
3030
import java.io.UncheckedIOException;
31-
import java.util.Collections;
3231
import java.util.List;
3332
import java.util.Optional;
3433
import java.util.OptionalInt;
@@ -185,7 +184,7 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IllegalStateE
185184
election.epoch(),
186185
partitionState.lastVoterSet().voterIds(),
187186
randomElectionTimeoutMs(),
188-
Collections.emptyList(),
187+
List.of(),
189188
localListeners,
190189
logContext
191190
);

0 commit comments

Comments
 (0)