Skip to content

Commit ccf2510

Browse files
authored
MINOR: Remove dead code maybeWarnIfOversizedRecords (#19316)
The `metadataVersionSupplier` is unused after this - remove it. Also remove redundant `metadataVersion.fetchRequestVersion >= 13` check in `RemoteLeaderEndPoint` - the minimum version returned by this method is `13`. Reviewers: Chia-Ping Tsai <[email protected]>
1 parent e301508 commit ccf2510

File tree

6 files changed

+12
-31
lines changed

6 files changed

+12
-31
lines changed

core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ class RemoteLeaderEndPoint(logPrefix: String,
203203
None
204204
} else {
205205
val metadataVersion = metadataVersionSupplier()
206-
val version: Short = if (metadataVersion.fetchRequestVersion >= 13 && !fetchData.canUseTopicIds) {
206+
val version: Short = if (!fetchData.canUseTopicIds) {
207207
12
208208
} else {
209209
metadataVersion.fetchRequestVersion

core/src/main/scala/kafka/server/ReplicaFetcherManager.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class ReplicaFetcherManager(brokerConfig: KafkaConfig,
4747
val leader = new RemoteLeaderEndPoint(logContext.logPrefix, endpoint, fetchSessionHandler, brokerConfig,
4848
replicaManager, quotaManager, metadataVersionSupplier, brokerEpochSupplier)
4949
new ReplicaFetcherThread(threadName, leader, brokerConfig, failedPartitions, replicaManager,
50-
quotaManager, logContext.logPrefix, metadataVersionSupplier)
50+
quotaManager, logContext.logPrefix)
5151
}
5252

5353
def shutdown(): Unit = {

core/src/main/scala/kafka/server/ReplicaFetcherThread.scala

+2-15
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,8 @@
1818
package kafka.server
1919

2020
import org.apache.kafka.common.TopicPartition
21-
import org.apache.kafka.common.record.MemoryRecords
2221
import org.apache.kafka.common.requests.FetchResponse
23-
import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch}
22+
import org.apache.kafka.server.common.OffsetAndEpoch
2423
import org.apache.kafka.storage.internals.log.{LogAppendInfo, LogStartOffsetIncrementReason}
2524

2625
import java.util.Optional
@@ -32,8 +31,7 @@ class ReplicaFetcherThread(name: String,
3231
failedPartitions: FailedPartitions,
3332
replicaMgr: ReplicaManager,
3433
quota: ReplicaQuota,
35-
logPrefix: String,
36-
metadataVersionSupplier: () => MetadataVersion)
34+
logPrefix: String)
3735
extends AbstractFetcherThread(name = name,
3836
clientId = name,
3937
leader = leader,
@@ -110,8 +108,6 @@ class ReplicaFetcherThread(name: String,
110108
val log = partition.localLogOrException
111109
val records = toMemoryRecords(FetchResponse.recordsOrFail(partitionData))
112110

113-
maybeWarnIfOversizedRecords(records, topicPartition)
114-
115111
if (fetchOffset != log.logEndOffset)
116112
throw new IllegalStateException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(
117113
topicPartition, fetchOffset, log.logEndOffset))
@@ -161,15 +157,6 @@ class ReplicaFetcherThread(name: String,
161157
}
162158
}
163159

164-
private def maybeWarnIfOversizedRecords(records: MemoryRecords, topicPartition: TopicPartition): Unit = {
165-
// oversized messages don't cause replication to fail from fetch request version 3 (KIP-74)
166-
if (metadataVersionSupplier().fetchRequestVersion <= 2 && records.sizeInBytes > 0 && records.validBytes <= 0)
167-
error(s"Replication is failing due to a message that is greater than replica.fetch.max.bytes for partition $topicPartition. " +
168-
"This generally occurs when the max.message.bytes has been overridden to exceed this value and a suitably large " +
169-
"message has also been sent. To fix this problem increase replica.fetch.max.bytes in your broker config to be " +
170-
"equal or larger than your settings for max.message.bytes, both at a broker and topic level.")
171-
}
172-
173160
/**
174161
* Truncate the log for each partition's epoch based on leader's returned epoch and offset.
175162
* The logic for finding the truncation offset is implemented in AbstractFetcherThread.getOffsetTruncationState

core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala

+5-10
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,7 @@ class ReplicaFetcherThreadTest {
100100
failedPartitions,
101101
replicaMgr,
102102
quota,
103-
logContext.logPrefix,
104-
() => metadataVersion)
103+
logContext.logPrefix)
105104
}
106105

107106
@Test
@@ -291,8 +290,7 @@ class ReplicaFetcherThreadTest {
291290
failedPartitions,
292291
replicaManager,
293292
quota,
294-
logContext.logPrefix,
295-
() => MetadataVersion.MINIMUM_VERSION
293+
logContext.logPrefix
296294
) {
297295
override def processPartitionData(
298296
topicPartition: TopicPartition,
@@ -423,8 +421,7 @@ class ReplicaFetcherThreadTest {
423421
failedPartitions,
424422
replicaManager,
425423
quota,
426-
logContext.logPrefix,
427-
() => MetadataVersion.MINIMUM_VERSION
424+
logContext.logPrefix
428425
)
429426

430427
thread.addPartitions(Map(
@@ -515,8 +512,7 @@ class ReplicaFetcherThreadTest {
515512
failedPartitions,
516513
replicaManager,
517514
quota,
518-
logContext.logPrefix,
519-
() => MetadataVersion.MINIMUM_VERSION
515+
logContext.logPrefix
520516
)
521517

522518
thread.addPartitions(Map(
@@ -620,8 +616,7 @@ class ReplicaFetcherThreadTest {
620616
failedPartitions,
621617
replicaManager,
622618
replicaQuota,
623-
logContext.logPrefix,
624-
() => MetadataVersion.MINIMUM_VERSION)
619+
logContext.logPrefix)
625620

626621
val leaderEpoch = 1
627622

core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -2904,7 +2904,7 @@ class ReplicaManagerTest {
29042904
val leader = new RemoteLeaderEndPoint(logContext.logPrefix, blockingSend, fetchSessionHandler, rm.config,
29052905
rm, quotaManager.follower, () => MetadataVersion.MINIMUM_VERSION, () => 1)
29062906
new ReplicaFetcherThread(s"ReplicaFetcherThread-$fetcherId", leader, rm.config, failedPartitions, rm,
2907-
quotaManager.follower, logContext.logPrefix, () => MetadataVersion.MINIMUM_VERSION) {
2907+
quotaManager.follower, logContext.logPrefix) {
29082908
override def doWork(): Unit = {
29092909
// In case the thread starts before the partition is added by AbstractFetcherManager,
29102910
// add it here (it's a no-op if already added)
@@ -3363,7 +3363,7 @@ class ReplicaManagerTest {
33633363
leader.setReplicaPartitionStateCallback(_ => PartitionState(leaderEpoch = 0))
33643364

33653365
val fetcher = new ReplicaFetcherThread(threadName, leader, config, failedPartitions, replicaManager,
3366-
quotaManager, "", () => MetadataVersion.MINIMUM_VERSION)
3366+
quotaManager, "")
33673367

33683368
val initialFetchState = InitialFetchState(
33693369
topicId = Some(Uuid.randomUuid()),

jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -302,8 +302,7 @@ public Map<TopicPartition, FetchResponseData.PartitionData> fetch(FetchRequest.B
302302
new FailedPartitions(),
303303
replicaManager,
304304
replicaQuota,
305-
String.format("[ReplicaFetcher replicaId=%d, leaderId=%d, fetcherId=%d", config.brokerId(), 3, 3),
306-
() -> MetadataVersion.MINIMUM_VERSION
305+
String.format("[ReplicaFetcher replicaId=%d, leaderId=%d, fetcherId=%d", config.brokerId(), 3, 3)
307306
);
308307

309308
pool = partitions;

0 commit comments

Comments
 (0)