
MINOR: Remove dead code maybeWarnIfOversizedRecords #19316


Merged
merged 2 commits on Apr 1, 2025
Changes from 1 commit
@@ -47,7 +47,7 @@ class ReplicaFetcherManager(brokerConfig: KafkaConfig,
val leader = new RemoteLeaderEndPoint(logContext.logPrefix, endpoint, fetchSessionHandler, brokerConfig,
replicaManager, quotaManager, metadataVersionSupplier, brokerEpochSupplier)
new ReplicaFetcherThread(threadName, leader, brokerConfig, failedPartitions, replicaManager,
quotaManager, logContext.logPrefix, metadataVersionSupplier)
quotaManager, logContext.logPrefix)
}

def shutdown(): Unit = {
17 changes: 2 additions & 15 deletions core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -18,9 +18,8 @@
package kafka.server

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.FetchResponse
import org.apache.kafka.server.common.{MetadataVersion, OffsetAndEpoch}
import org.apache.kafka.server.common.OffsetAndEpoch
import org.apache.kafka.storage.internals.log.{LogAppendInfo, LogStartOffsetIncrementReason}

import java.util.Optional
@@ -32,8 +31,7 @@ class ReplicaFetcherThread(name: String,
failedPartitions: FailedPartitions,
replicaMgr: ReplicaManager,
quota: ReplicaQuota,
logPrefix: String,
metadataVersionSupplier: () => MetadataVersion)
logPrefix: String)
extends AbstractFetcherThread(name = name,
clientId = name,
leader = leader,
@@ -110,8 +108,6 @@ class ReplicaFetcherThread(name: String,
val log = partition.localLogOrException
val records = toMemoryRecords(FetchResponse.recordsOrFail(partitionData))

maybeWarnIfOversizedRecords(records, topicPartition)

if (fetchOffset != log.logEndOffset)
throw new IllegalStateException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(
topicPartition, fetchOffset, log.logEndOffset))
@@ -161,15 +157,6 @@
}
}

private def maybeWarnIfOversizedRecords(records: MemoryRecords, topicPartition: TopicPartition): Unit = {
// oversized messages don't cause replication to fail from fetch request version 3 (KIP-74)
if (metadataVersionSupplier().fetchRequestVersion <= 2 && records.sizeInBytes > 0 && records.validBytes <= 0)
Member:
It seems another use case of fetchRequestVersion is dead code. Maybe we can remove it as well?

      val version: Short = if (metadataVersion.fetchRequestVersion >= 13 && !fetchData.canUseTopicIds) {
        12
      } else {
        metadataVersion.fetchRequestVersion
      }

Member Author:
I think you're saying that IBP 3.3 implies fetch request version 13 or higher. That makes sense; I pushed a commit that removes that check.
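
A minimal sketch of the simplified selection under that reading (my illustration, not the committed change; the helper name and the example values are hypothetical stand-ins for the code quoted above):

      // Sketch: once every supported MetadataVersion implies fetchRequestVersion >= 13,
      // the ">= 13" comparison is always true, so only topic-ID support matters.
      def chooseFetchVersion(fetchRequestVersion: Short, canUseTopicIds: Boolean): Short =
        if (canUseTopicIds) fetchRequestVersion
        else 12 // last fetch version without topic IDs

      // Example: a session that cannot use topic IDs still pins the request to version 12.
      assert(chooseFetchVersion(fetchRequestVersion = 16, canUseTopicIds = false) == 12)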

Member Author:
I also updated the PR description.

error(s"Replication is failing due to a message that is greater than replica.fetch.max.bytes for partition $topicPartition. " +
"This generally occurs when the max.message.bytes has been overridden to exceed this value and a suitably large " +
"message has also been sent. To fix this problem increase replica.fetch.max.bytes in your broker config to be " +
"equal or larger than your settings for max.message.bytes, both at a broker and topic level.")
}

/**
* Truncate the log for each partition's epoch based on leader's returned epoch and offset.
* The logic for finding the truncation offset is implemented in AbstractFetcherThread.getOffsetTruncationState
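For context on the deleted warning: its advice only ever applied to fetch request versions <= 2, where an oversized batch could stall replication; since KIP-74 (fetch version 3 and above) the leader always returns at least the first record even if it exceeds the fetch size, which is why the check became dead code. A hedged sketch of the sizing relationship the old message described, with example values that are not from this PR:

      import java.util.Properties

      // Illustration only: the removed warning told operators to keep the follower's
      // fetch size at least as large as the biggest batch a topic may accept.
      val topicOverrides = new Properties()
      topicOverrides.put("max.message.bytes", "2097152")        // topic-level limit, example 2 MiB

      val brokerOverrides = new Properties()
      brokerOverrides.put("replica.fetch.max.bytes", "2097152") // broker config; keep >= max.message.bytes per the old warning
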
@@ -100,8 +100,7 @@ class ReplicaFetcherThreadTest {
failedPartitions,
replicaMgr,
quota,
logContext.logPrefix,
() => metadataVersion)
logContext.logPrefix)
}

@Test
@@ -291,8 +290,7 @@ class ReplicaFetcherThreadTest {
failedPartitions,
replicaManager,
quota,
logContext.logPrefix,
() => MetadataVersion.MINIMUM_VERSION
logContext.logPrefix
) {
override def processPartitionData(
topicPartition: TopicPartition,
@@ -423,8 +421,7 @@ class ReplicaFetcherThreadTest {
failedPartitions,
replicaManager,
quota,
logContext.logPrefix,
() => MetadataVersion.MINIMUM_VERSION
logContext.logPrefix
)

thread.addPartitions(Map(
@@ -515,8 +512,7 @@ class ReplicaFetcherThreadTest {
failedPartitions,
replicaManager,
quota,
logContext.logPrefix,
() => MetadataVersion.MINIMUM_VERSION
logContext.logPrefix
)

thread.addPartitions(Map(
@@ -620,8 +616,7 @@ class ReplicaFetcherThreadTest {
failedPartitions,
replicaManager,
replicaQuota,
logContext.logPrefix,
() => MetadataVersion.MINIMUM_VERSION)
logContext.logPrefix)

val leaderEpoch = 1

@@ -2904,7 +2904,7 @@ class ReplicaManagerTest {
val leader = new RemoteLeaderEndPoint(logContext.logPrefix, blockingSend, fetchSessionHandler, rm.config,
rm, quotaManager.follower, () => MetadataVersion.MINIMUM_VERSION, () => 1)
new ReplicaFetcherThread(s"ReplicaFetcherThread-$fetcherId", leader, rm.config, failedPartitions, rm,
quotaManager.follower, logContext.logPrefix, () => MetadataVersion.MINIMUM_VERSION) {
quotaManager.follower, logContext.logPrefix) {
override def doWork(): Unit = {
// In case the thread starts before the partition is added by AbstractFetcherManager,
// add it here (it's a no-op if already added)
@@ -3363,7 +3363,7 @@ class ReplicaManagerTest {
leader.setReplicaPartitionStateCallback(_ => PartitionState(leaderEpoch = 0))

val fetcher = new ReplicaFetcherThread(threadName, leader, config, failedPartitions, replicaManager,
quotaManager, "", () => MetadataVersion.MINIMUM_VERSION)
quotaManager, "")

val initialFetchState = InitialFetchState(
topicId = Some(Uuid.randomUuid()),
@@ -302,8 +302,7 @@ public Map<TopicPartition, FetchResponseData.PartitionData> fetch(FetchRequest.B
new FailedPartitions(),
replicaManager,
replicaQuota,
String.format("[ReplicaFetcher replicaId=%d, leaderId=%d, fetcherId=%d", config.brokerId(), 3, 3),
() -> MetadataVersion.MINIMUM_VERSION
String.format("[ReplicaFetcher replicaId=%d, leaderId=%d, fetcherId=%d", config.brokerId(), 3, 3)
);

pool = partitions;