KAFKA-19004: Move DelayedDeleteRecords to server-common module #19226
core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
This file was deleted.

core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -19,13 +19,14 @@ package kafka.server
 import com.yammer.metrics.core.Meter
 import kafka.cluster.{Partition, PartitionListener}
 import kafka.controller.StateChangeLogger
-import kafka.log.remote.RemoteLogManager
 import kafka.log.LogManager
+import kafka.log.remote.RemoteLogManager
 import kafka.server.HostedPartition.Online
 import kafka.server.QuotaFactory.QuotaManagers
 import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName, createLogReadResult, isListOffsetsTimestampUnsupported}
 import kafka.server.share.DelayedShareFetch
 import kafka.utils._
+import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.{Plugin, Topic}
 import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult

@@ -46,21 +47,19 @@ import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.utils.{Exit, Time, Utils}
-import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
 import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
 import org.apache.kafka.metadata.MetadataCache
+import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, ListOffsetsPartitionStatus, common}
 import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, StopPartition, TopicOptionalIdPartition}
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.network.BrokerEndPoint
-import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey}
+import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DeleteRecordsPartitionStatus, TopicPartitionOperationKey}
 import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey, DelayedShareFetchPartitionKey}
 import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
 import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
-import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, ListOffsetsPartitionStatus, common}
 import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog => UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log._

Review comment: I think this change is unnecessary.

 import org.apache.kafka.storage.log.metrics.BrokerTopicStats

 import java.io.File
@@ -1273,7 +1272,7 @@ class ReplicaManager(val config: KafkaConfig,

     val deleteRecordsStatus = localDeleteRecordsResults.map { case (topicPartition, result) =>
       topicPartition ->
-        DeleteRecordsPartitionStatus(
+        new DeleteRecordsPartitionStatus(
           result.requestedOffset, // requested offset
           new DeleteRecordsPartitionResult()
             .setLowWatermark(result.lowWatermark)

@@ -1282,8 +1281,31 @@ class ReplicaManager(val config: KafkaConfig,
     }

     if (delayedDeleteRecordsRequired(localDeleteRecordsResults)) {
+      def onAcks(topicPartition: TopicPartition, status: DeleteRecordsPartitionStatus): Unit = {
+        val (lowWatermarkReached, error, lw) = getPartition(topicPartition) match {
+          case HostedPartition.Online(partition) =>
+            partition.leaderLogIfLocal match {
+              case Some(_) =>
+                val leaderLW = partition.lowWatermarkIfLeader
+                (leaderLW >= status.requiredOffset, Errors.NONE, leaderLW)
+              case None =>
+                (false, Errors.NOT_LEADER_OR_FOLLOWER, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
+            }
+
+          case HostedPartition.Offline(_) =>
+            (false, Errors.KAFKA_STORAGE_ERROR, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
+
+          case HostedPartition.None =>
+            (false, Errors.UNKNOWN_TOPIC_OR_PARTITION, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
+        }
+        if (error != Errors.NONE || lowWatermarkReached) {
+          status.setAcksPending(false)
+          status.responseStatus.setErrorCode(error.code)
+          status.responseStatus.setLowWatermark(lw)
+        }
+      }
       // create delayed delete records operation
-      val delayedDeleteRecords = new DelayedDeleteRecords(timeout, deleteRecordsStatus, this, responseCallback)
+      val delayedDeleteRecords = new DelayedDeleteRecords(timeout, deleteRecordsStatus.asJava, onAcks, response => responseCallback(response.asScala))

Review comment: this is not addressed.

       // create a list of (topic, partition) pairs to use as keys for this delayed delete records operation
       val deleteRecordsRequestKeys = offsetPerPartition.keys.map(new TopicPartitionOperationKey(_)).toList
server-common/src/main/java/org/apache/kafka/server/purgatory/DelayedDeleteRecords.java
@@ -0,0 +1,112 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.server.purgatory;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;

import com.yammer.metrics.core.Meter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/**
 * A delayed delete records operation that can be created by the replica manager and watched
 * in the delete records operation purgatory.
 */
public class DelayedDeleteRecords extends DelayedOperation {

    private static final Logger LOG = LoggerFactory.getLogger(DelayedDeleteRecords.class);

    // migration from kafka.server.DelayedDeleteRecordsMetrics
    private static final KafkaMetricsGroup METRICS_GROUP = new KafkaMetricsGroup("kafka.server", "DelayedDeleteRecordsMetrics");
    private static final Meter AGGREGATE_EXPIRATION_METER = METRICS_GROUP.newMeter("ExpiresPerSec", "requests",
        TimeUnit.SECONDS);

    private final Map<TopicPartition, DeleteRecordsPartitionStatus> deleteRecordsStatus;
    private final BiConsumer<TopicPartition, DeleteRecordsPartitionStatus> onAcksPending;
    private final Consumer<Map<TopicPartition, DeleteRecordsPartitionResult>> responseCallback;

    public DelayedDeleteRecords(long delayMs,
                                Map<TopicPartition, DeleteRecordsPartitionStatus> deleteRecordsStatus,
                                // To keep this class free of broker dependencies, the partition-state
                                // lookup logic has been moved to the caller.
                                BiConsumer<TopicPartition, DeleteRecordsPartitionStatus> onAcksPending,
                                Consumer<Map<TopicPartition, DeleteRecordsPartitionResult>> responseCallback) {
        super(delayMs);
        this.onAcksPending = onAcksPending;
        this.deleteRecordsStatus = Collections.unmodifiableMap(deleteRecordsStatus);
        this.responseCallback = responseCallback;
        // first update the acks pending variable according to the error code
        deleteRecordsStatus.forEach((topicPartition, status) -> {
            if (status.responseStatus().errorCode() == Errors.NONE.code()) {
                // Timeout error state will be cleared when required acks are received
                status.setAcksPending(true);
                status.responseStatus().setErrorCode(Errors.REQUEST_TIMED_OUT.code());
            } else {
                status.setAcksPending(false);
            }
        });
    }

    /**
     * The delayed delete records operation can be completed if every partition specified
     * in the request satisfies at least one of the following:
     *
     * 1) There was an error while checking if all replicas have caught up to the
     *    deleteRecordsOffset: set an error in the response
     * 2) The low watermark of the partition has caught up to the deleteRecordsOffset:
     *    set the low watermark in the response
     */
    @Override
    public boolean tryComplete() {
        // check for each partition if it still has pending acks
        deleteRecordsStatus.forEach((topicPartition, status) -> {
            LOG.trace("Checking delete records satisfaction for {}, current status {}", topicPartition, status);
            // skip those partitions that have already been satisfied
            if (status.acksPending()) {
                onAcksPending.accept(topicPartition, status);
            }
        });
        // check if every partition has satisfied at least one of case 1 or 2
        return deleteRecordsStatus.values().stream().noneMatch(DeleteRecordsPartitionStatus::acksPending) && forceComplete();
    }

    @Override
    public void onExpiration() {
        deleteRecordsStatus.forEach((topicPartition, status) -> {
            if (status.acksPending()) {
                AGGREGATE_EXPIRATION_METER.mark();
            }
        });
    }

Review thread on onExpiration:
- Suggestion: AGGREGATE_EXPIRATION_METER.mark(deleteRecordsStatus.values().stream().filter(DeleteRecordsPartitionStatus::acksPending).count());
- @gongxuanzhang WDYT?
- excellent!

    /**
     * Upon completion, return the current response status along with the error code per partition
     */
    @Override
    public void onComplete() {
        responseCallback.accept(deleteRecordsStatus.entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().responseStatus())));
    }
}
Review comment: Should this be com.yammer.metrics.core?
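For context on how the relocated operation is used: as the deleteRecordsRequestKeys lines in the ReplicaManager diff suggest, the operation is handed to a purgatory under one TopicPartitionOperationKey per partition. The sketch below is illustrative only and assumes the org.apache.kafka.server.purgatory API, in particular that DelayedOperationPurgatory#tryCompleteElseWatch accepts a List<DelayedOperationKey>.

// Illustrative sketch only: watching the operation in the delete-records
// purgatory, mirroring the Scala caller above. The purgatory method
// signature is an assumption; consult DelayedOperationPurgatory.
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.server.purgatory.DelayedDeleteRecords;
import org.apache.kafka.server.purgatory.DelayedOperationKey;
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory;
import org.apache.kafka.server.purgatory.TopicPartitionOperationKey;

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class DeleteRecordsWatchExample {

    public static void watch(DelayedOperationPurgatory<DelayedDeleteRecords> purgatory,
                             DelayedDeleteRecords operation,
                             Set<TopicPartition> partitions) {
        // One watch key per partition, as in deleteRecordsRequestKeys above.
        List<DelayedOperationKey> keys = partitions.stream()
            .<DelayedOperationKey>map(TopicPartitionOperationKey::new)
            .collect(Collectors.toList());
        // Try to complete immediately (all low watermarks already caught up);
        // otherwise the purgatory watches the keys until the operation either
        // completes or times out, at which point onExpiration() marks
        // ExpiresPerSec and onComplete() sends the per-partition responses.
        purgatory.tryCompleteElseWatch(operation, keys);
    }
}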