Skip to content

KAFKA-19004:Move DelayedDeleteRecords to server-common module #19226

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Apr 2, 2025
3 changes: 3 additions & 0 deletions checkstyle/import-control-server-common.xml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@
<subpackage name="common">
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.server.metrics" />
<allow pkg="org.apache.kafka.server.purgatory" />
<allow pkg="com.yammer.metrics" />
</subpackage>

<subpackage name="immutable">
Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/cluster/Partition.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState, MetadataCache}
import org.apache.kafka.server.common.DeleteRecordsPartitionStatus.DelayedDeleteRecords
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.storage.internals.log.{AppendOrigin, AsyncOffsetReader, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, OffsetResultHolder, UnifiedLog, VerificationGuard}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
Expand Down
134 changes: 0 additions & 134 deletions core/src/main/scala/kafka/server/DelayedDeleteRecords.scala

This file was deleted.

35 changes: 30 additions & 5 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,17 @@ import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition, TopicPar
import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.common.DeleteRecordsPartitionStatus.DelayedDeleteRecords
import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, ListOffsetsPartitionStatus, common}
import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, StopPartition, TopicOptionalIdPartition}
import org.apache.kafka.server.common.{DeleteRecordsPartitionStatus, DirectoryEventHandler, RequestLocal, StopPartition, TopicOptionalIdPartition}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey}
import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey, DelayedShareFetchPartitionKey}
import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog => UnifiedLog, VerificationGuard}

import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats

import java.io.File
Expand Down Expand Up @@ -1273,7 +1273,7 @@ class ReplicaManager(val config: KafkaConfig,

val deleteRecordsStatus = localDeleteRecordsResults.map { case (topicPartition, result) =>
topicPartition ->
DeleteRecordsPartitionStatus(
new DeleteRecordsPartitionStatus(
result.requestedOffset, // requested offset
new DeleteRecordsPartitionResult()
.setLowWatermark(result.lowWatermark)
Expand All @@ -1282,8 +1282,33 @@ class ReplicaManager(val config: KafkaConfig,
}

if (delayedDeleteRecordsRequired(localDeleteRecordsResults)) {
def onAcks(topicPartition: TopicPartition, status: DeleteRecordsPartitionStatus): Unit = {
val (lowWatermarkReached, error, lw) = getPartition(topicPartition) match {
case HostedPartition.Online(partition) =>
partition.leaderLogIfLocal match {
case Some(_) =>
val leaderLW = partition.lowWatermarkIfLeader
(leaderLW >= status.requiredOffset, Errors.NONE, leaderLW)
case None =>
(false, Errors.NOT_LEADER_OR_FOLLOWER, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
}

case HostedPartition.Offline(_) =>
(false, Errors.KAFKA_STORAGE_ERROR, DeleteRecordsResponse.INVALID_LOW_WATERMARK)

case HostedPartition.None =>
(false, Errors.UNKNOWN_TOPIC_OR_PARTITION, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
}
if (error != Errors.NONE || lowWatermarkReached) {
status.setAcksPending(false)
status.responseStatus.setErrorCode(error.code)
status.responseStatus.setLowWatermark(lw)
}
}
val responseCallbackJava: java.util.function.Consumer[util.Map[TopicPartition, DeleteRecordsPartitionResult]] =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we don't need to create this object, right? we can pass response => responseCallback(response.asScala) directly

response => responseCallback(response.asScala)
// create delayed delete records operation
val delayedDeleteRecords = new DelayedDeleteRecords(timeout, deleteRecordsStatus, this, responseCallback)
val delayedDeleteRecords = new DeleteRecordsPartitionStatus.DelayedDeleteRecords(timeout, deleteRecordsStatus.asJava,onAcks , responseCallbackJava)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deleteRecordsStatus.asJava, onAcks, responseCallbackJava


// create a list of (topic, partition) pairs to use as keys for this delayed delete records operation
val deleteRecordsRequestKeys = offsetPerPartition.keys.map(new TopicPartitionOperationKey(_)).toList
Expand Down
1 change: 1 addition & 0 deletions core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import org.apache.kafka.common.replica.ClientMetadata
import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.server.common.DeleteRecordsPartitionStatus.DelayedDeleteRecords
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, NodeToControllerChannelManager, RequestLocal}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, RecordValidat
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.server.ActionQueue
import org.apache.kafka.server.common.DeleteRecordsPartitionStatus.DelayedDeleteRecords
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey}
import org.apache.kafka.server.util.timer.{MockTimer, Timer}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import org.apache.kafka.image._
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.metadata.{LeaderAndIsr, MetadataCache}
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
import org.apache.kafka.server.common.DeleteRecordsPartitionStatus.DelayedDeleteRecords
import org.apache.kafka.server.common.{DirectoryEventHandler, KRaftVersion, MetadataVersion, OffsetAndEpoch, StopPartition}
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs}
import org.apache.kafka.server.log.remote.storage._
Expand Down
Loading