Skip to content

Commit 4144290

Browse files
authored
MINOR: Cleanup metadata module (#18937)
Removed unused code and fixed IDEA warnings. Reviewers: Chia-Ping Tsai <[email protected]>
1 parent c095faa commit 4144290

26 files changed

+35
-324
lines changed

core/src/main/scala/kafka/server/metadata/ScramPublisher.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ class ScramPublisher(
5656
userChanges.forEach {
5757
case (userName, change) =>
5858
if (change.isPresent) {
59-
credentialProvider.updateCredential(mechanism, userName, change.get().toCredential(mechanism))
59+
credentialProvider.updateCredential(mechanism, userName, change.get().toCredential)
6060
} else {
6161
credentialProvider.removeCredentials(mechanism, userName)
6262
}

metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java

-4
Original file line numberDiff line numberDiff line change
@@ -278,10 +278,6 @@ PartitionsOnReplicaIterator partitionsWithNoLeader() {
278278
return iterator(NO_LEADER, true);
279279
}
280280

281-
PartitionsOnReplicaIterator partitionsLedByBroker(int brokerId) {
282-
return iterator(brokerId, true);
283-
}
284-
285281
PartitionsOnReplicaIterator partitionsWithBrokerInIsr(int brokerId) {
286282
return iterator(brokerId, false);
287283
}

metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,7 @@ BrokerFeature processRegistrationFeature(
521521
* @param brokerId The broker id to track.
522522
* @param brokerEpoch The broker epoch to track.
523523
*
524-
* @returns True only if the ClusterControlManager is active.
524+
* @return True only if the ClusterControlManager is active.
525525
*/
526526
boolean trackBrokerHeartbeat(int brokerId, long brokerEpoch) {
527527
BrokerHeartbeatManager manager = heartbeatManager;

metadata/src/main/java/org/apache/kafka/controller/OffsetControlManager.java

-5
Original file line numberDiff line numberDiff line change
@@ -418,9 +418,4 @@ public void replay(AbortTransactionRecord message, long offset) {
418418
log.info("Replayed {} at offset {}. Reverted to offset {}.",
419419
message, offset, preTransactionOffset);
420420
}
421-
422-
// VisibleForTesting
423-
void setNextWriteOffset(long newNextWriteOffset) {
424-
this.nextWriteOffset = newNextWriteOffset;
425-
}
426421
}

metadata/src/main/java/org/apache/kafka/controller/QuorumController.java

-10
Original file line numberDiff line numberDiff line change
@@ -177,11 +177,6 @@ public final class QuorumController implements Controller {
177177
*/
178178
private static final int DEFAULT_MAX_RECORDS_PER_BATCH = 10000;
179179

180-
/**
181-
* The default minimum event time that can be logged as a slow event.
182-
*/
183-
private static final int DEFAULT_MIN_SLOW_EVENT_TIME_MS = 200;
184-
185180
/**
186181
* The maximum records any user-initiated operation is allowed to generate.
187182
*
@@ -677,11 +672,6 @@ public String toString() {
677672
}
678673
}
679674

680-
// Visible for testing
681-
OffsetControlManager offsetControl() {
682-
return offsetControl;
683-
}
684-
685675
// Visible for testing
686676
ReplicationControlManager replicationControl() {
687677
return replicationControl;

metadata/src/main/java/org/apache/kafka/controller/QuorumFeatures.java

-4
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,6 @@ public int nodeId() {
8484
return nodeId;
8585
}
8686

87-
public Map<String, VersionRange> localSupportedFeatures() {
88-
return localSupportedFeatures;
89-
}
90-
9187
public List<Integer> quorumNodeIds() {
9288
return quorumNodeIds;
9389
}

metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -2071,9 +2071,8 @@ void generateLeaderAndIsrUpdates(String context,
20712071
alterPartitionReassignment(topic.name(), partition, records, allowRFChange);
20722072
successfulAlterations++;
20732073
} catch (Throwable e) {
2074-
log.info("Unable to alter partition reassignment for " +
2075-
topic.name() + ":" + partition.partitionIndex() + " because " +
2076-
"of an " + e.getClass().getSimpleName() + " error: " + e.getMessage());
2074+
log.info("Unable to alter partition reassignment for {}:{} because of an {} error: {}",
2075+
topic.name(), partition.partitionIndex(), e.getClass().getSimpleName(), e.getMessage());
20772076
error = ApiError.fromThrowable(e);
20782077
}
20792078
totalAlterations++;

metadata/src/main/java/org/apache/kafka/controller/errors/ControllerExceptions.java

-15
Original file line numberDiff line numberDiff line change
@@ -40,21 +40,6 @@ public static boolean isTimeoutException(Throwable exception) {
4040
return exception instanceof TimeoutException;
4141
}
4242

43-
/**
44-
* Check if an exception is a NotController exception.
45-
*
46-
* @param exception The exception to check.
47-
* @return True if the exception is a NotController exception.
48-
*/
49-
public static boolean isNotControllerException(Throwable exception) {
50-
if (exception == null) return false;
51-
if (exception instanceof ExecutionException) {
52-
exception = exception.getCause();
53-
if (exception == null) return false;
54-
}
55-
return exception instanceof NotControllerException;
56-
}
57-
5843
/**
5944
* Create a new exception indicating that the controller is in pre-migration mode, so the
6045
* operation cannot be completed.

metadata/src/main/java/org/apache/kafka/image/ClusterImage.java

-4
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,6 @@ public Map<Integer, ControllerRegistration> controllers() {
6666
return controllers;
6767
}
6868

69-
public boolean containsBroker(int brokerId) {
70-
return brokers.containsKey(brokerId);
71-
}
72-
7369
public long brokerEpoch(int brokerId) {
7470
BrokerRegistration brokerRegistration = broker(brokerId);
7571
if (brokerRegistration == null) {

metadata/src/main/java/org/apache/kafka/image/TopicDelta.java

-14
Original file line numberDiff line numberDiff line change
@@ -129,20 +129,6 @@ public TopicImage apply() {
129129
return new TopicImage(image.name(), image.id(), newPartitions);
130130
}
131131

132-
public boolean hasPartitionsWithAssignmentChanges() {
133-
for (Entry<Integer, PartitionRegistration> entry : partitionChanges.entrySet()) {
134-
int partitionId = entry.getKey();
135-
// New Partition.
136-
if (!image.partitions().containsKey(partitionId))
137-
return true;
138-
PartitionRegistration previousPartition = image.partitions().get(partitionId);
139-
PartitionRegistration currentPartition = entry.getValue();
140-
if (!previousPartition.hasSameAssignment(currentPartition))
141-
return true;
142-
}
143-
return false;
144-
}
145-
146132
/**
147133
* Find the partitions that have change based on the replica given.
148134
* <p>

metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotGenerator.java

+4-13
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.kafka.image.MetadataImage;
2424
import org.apache.kafka.image.loader.LoaderManifest;
2525
import org.apache.kafka.image.loader.LogDeltaManifest;
26-
import org.apache.kafka.image.loader.SnapshotManifest;
2726
import org.apache.kafka.queue.EventQueue;
2827
import org.apache.kafka.queue.KafkaEventQueue;
2928
import org.apache.kafka.server.fault.FaultHandler;
@@ -217,28 +216,20 @@ public void onMetadataUpdate(
217216
) {
218217
switch (manifest.type()) {
219218
case LOG_DELTA:
220-
publishLogDelta(delta, newImage, (LogDeltaManifest) manifest);
219+
publishLogDelta(newImage, (LogDeltaManifest) manifest);
221220
break;
222221
case SNAPSHOT:
223-
publishSnapshot(delta, newImage, (SnapshotManifest) manifest);
222+
publishSnapshot(newImage);
224223
break;
225224
}
226225
}
227226

228-
void publishSnapshot(
229-
MetadataDelta delta,
230-
MetadataImage newImage,
231-
SnapshotManifest manifest
232-
) {
227+
void publishSnapshot(MetadataImage newImage) {
233228
log.debug("Resetting the snapshot counters because we just read {}.", newImage.provenance().snapshotName());
234229
resetSnapshotCounters();
235230
}
236231

237-
void publishLogDelta(
238-
MetadataDelta delta,
239-
MetadataImage newImage,
240-
LogDeltaManifest manifest
241-
) {
232+
void publishLogDelta(MetadataImage newImage, LogDeltaManifest manifest) {
242233
bytesSinceLastSnapshot += manifest.numBytes();
243234
if (bytesSinceLastSnapshot >= maxBytesSinceLastSnapshot) {
244235
if (eventQueue.isEmpty()) {

metadata/src/main/java/org/apache/kafka/metadata/LeaderAndIsr.java

-21
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,6 @@ public class LeaderAndIsr {
2626
public static final int INITIAL_LEADER_EPOCH = 0;
2727
public static final int INITIAL_PARTITION_EPOCH = 0;
2828
public static final int NO_LEADER = -1;
29-
public static final int NO_EPOCH = -1;
30-
public static final int LEADER_DURING_DELETE = -2;
31-
public static final int EPOCH_DURING_DELETE = -2;
3229

3330
private final int leader;
3431
private final int leaderEpoch;
@@ -74,10 +71,6 @@ public LeaderAndIsr(
7471
this.partitionEpoch = partitionEpoch;
7572
}
7673

77-
public static LeaderAndIsr duringDelete(List<Integer> isr) {
78-
return new LeaderAndIsr(LEADER_DURING_DELETE, isr);
79-
}
80-
8174
public int leader() {
8275
return leader;
8376
}
@@ -132,20 +125,6 @@ public List<Integer> isr() {
132125
.toList();
133126
}
134127

135-
public boolean equalsAllowStalePartitionEpoch(LeaderAndIsr other) {
136-
if (this == other) {
137-
return true;
138-
} else if (other == null) {
139-
return false;
140-
} else {
141-
return leader == other.leader &&
142-
leaderEpoch == other.leaderEpoch &&
143-
isrWithBrokerEpoch.equals(other.isrWithBrokerEpoch) &&
144-
leaderRecoveryState == other.leaderRecoveryState &&
145-
partitionEpoch <= other.partitionEpoch;
146-
}
147-
}
148-
149128
@Override
150129
public String toString() {
151130
return "LeaderAndIsr(" +

metadata/src/main/java/org/apache/kafka/metadata/PartitionRegistration.java

-7
Original file line numberDiff line numberDiff line change
@@ -470,11 +470,4 @@ public String toString() {
470470
", partitionEpoch=" + partitionEpoch +
471471
")";
472472
}
473-
474-
public boolean hasSameAssignment(PartitionRegistration registration) {
475-
return Arrays.equals(this.replicas, registration.replicas) &&
476-
Arrays.equals(this.directories, registration.directories) &&
477-
Arrays.equals(this.addingReplicas, registration.addingReplicas) &&
478-
Arrays.equals(this.removingReplicas, registration.removingReplicas);
479-
}
480473
}

metadata/src/main/java/org/apache/kafka/metadata/ScramCredentialData.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public UserScramCredentialRecord toRecord(
8686
setIterations(iterations);
8787
}
8888

89-
public ScramCredential toCredential(ScramMechanism mechanism) {
89+
public ScramCredential toCredential() {
9090
return new ScramCredential(salt, storedKey, serverKey, iterations);
9191
}
9292

metadata/src/main/java/org/apache/kafka/metadata/VersionRange.java

-7
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,6 @@ public boolean contains(short version) {
5656
return version >= min && version <= max;
5757
}
5858

59-
/**
60-
* Check if a given version range has overlap with this one
61-
*/
62-
public boolean intersects(VersionRange other) {
63-
return other.min <= max && other.max >= min;
64-
}
65-
6659
@Override
6760
public int hashCode() {
6861
return Objects.hash(min, max);

metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java

-4
Original file line numberDiff line numberDiff line change
@@ -208,8 +208,4 @@ public void close() throws Exception {
208208
beginShutdown("closing");
209209
queue.close();
210210
}
211-
212-
public CompletableFuture<Void> caughtUpFuture() {
213-
return caughtUpFuture;
214-
}
215211
}

metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,6 @@ public class ConfigurationControlManagerTest {
9494
static {
9595
SYNONYMS.put("abc", List.of(new ConfigSynonym("foo.bar")));
9696
SYNONYMS.put("def", List.of(new ConfigSynonym("baz")));
97-
SYNONYMS.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG,
98-
List.of(new ConfigSynonym(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)));
9997
SYNONYMS.put("quuux", List.of(new ConfigSynonym("quux", HOURS_TO_MILLISECONDS)));
10098
SYNONYMS.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, List.of(new ConfigSynonym(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG)));
10199
}
@@ -130,7 +128,7 @@ static <A, B> Entry<A, B> entry(A a, B b) {
130128
}
131129

132130
@Test
133-
public void testReplay() throws Exception {
131+
public void testReplay() {
134132
ConfigurationControlManager manager = new ConfigurationControlManager.Builder().
135133
setKafkaConfigSchema(SCHEMA).
136134
build();
@@ -314,7 +312,7 @@ public void validate(RequestMetadata actual) throws PolicyViolationException {
314312
}
315313

316314
@Override
317-
public void close() throws Exception {
315+
public void close() {
318316
// nothing to do
319317
}
320318

@@ -379,7 +377,7 @@ public void validate(RequestMetadata actual) throws PolicyViolationException {
379377
}
380378

381379
@Override
382-
public void close() throws Exception {
380+
public void close() {
383381
// empty
384382
}
385383

metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -154,12 +154,11 @@ public void testGenerateProducerIds() {
154154
assertEquals(new ProducerIdsBlock(3, 100000, 1000), producerIdControlManager.nextProducerBlock());
155155
}
156156

157-
static ProducerIdsBlock generateProducerIds(
157+
static void generateProducerIds(
158158
ProducerIdControlManager producerIdControlManager, int brokerId, long brokerEpoch) {
159159
ControllerResult<ProducerIdsBlock> result =
160160
producerIdControlManager.generateNextProducerId(brokerId, brokerEpoch);
161161
result.records().forEach(apiMessageAndVersion ->
162162
producerIdControlManager.replay((ProducerIdsRecord) apiMessageAndVersion.message()));
163-
return result.response();
164163
}
165164
}

metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java

-4
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,6 @@
5353
public class QuorumControllerIntegrationTestUtils {
5454
private static final Logger log = LoggerFactory.getLogger(QuorumControllerIntegrationTestUtils.class);
5555

56-
BrokerRegistrationRequestData.FeatureCollection brokerFeatures() {
57-
return brokerFeatures(MetadataVersion.MINIMUM_VERSION, MetadataVersion.latestTesting());
58-
}
59-
6056
/**
6157
* Create a broker features collection for use in a registration request. We only set MV. here.
6258
*

metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java

-14
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,6 @@
9797
import org.apache.kafka.metadata.RecordTestUtils;
9898
import org.apache.kafka.metadata.RecordTestUtils.ImageDeltaPair;
9999
import org.apache.kafka.metadata.RecordTestUtils.TestThroughAllIntermediateImagesLeadingToFinalImageHelper;
100-
import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
101100
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
102101
import org.apache.kafka.metadata.util.BatchFileWriter;
103102
import org.apache.kafka.metalog.LocalLogManager;
@@ -144,7 +143,6 @@
144143
import java.util.function.Function;
145144
import java.util.function.Supplier;
146145
import java.util.stream.Collectors;
147-
import java.util.stream.IntStream;
148146

149147
import static java.util.function.Function.identity;
150148
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET;
@@ -1409,11 +1407,6 @@ public void testConfigResourceExistenceChecker() throws Throwable {
14091407
}
14101408
}
14111409

1412-
private static final Uuid FOO_ID = Uuid.fromString("igRktLOnR8ektWHr79F8mw");
1413-
1414-
private static final Map<Integer, Long> ALL_ZERO_BROKER_EPOCHS =
1415-
IntStream.of(0, 1, 2, 3).boxed().collect(Collectors.toMap(identity(), __ -> 0L));
1416-
14171410
@Test
14181411
public void testFatalMetadataReplayErrorOnActive() throws Throwable {
14191412
try (
@@ -1476,13 +1469,6 @@ public void testFatalMetadataErrorDuringLogLoading() throws Exception {
14761469
}
14771470
}
14781471

1479-
private static void assertInitialLoadFuturesNotComplete(List<StandardAuthorizer> authorizers) {
1480-
for (int i = 0; i < authorizers.size(); i++) {
1481-
assertFalse(authorizers.get(i).initialLoadFuture().isDone(),
1482-
"authorizer " + i + " should not have completed loading.");
1483-
}
1484-
}
1485-
14861472
static class InitialSnapshot implements AutoCloseable {
14871473
File tempDir;
14881474
BatchFileWriter writer;

0 commit comments

Comments
 (0)