Skip to content

Commit 023936f

Browse files
gongxuanzhangjanchilling
authored and committed
KAFKA-19004 Move DelayedDeleteRecords to server-common module (apache#19226)
Reviewers: Mickael Maison <[email protected]>, Ken Huang <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 66b0419 commit 023936f

File tree

9 files changed

+205
-147
lines changed

9 files changed

+205
-147
lines changed

checkstyle/import-control-server-common.xml

+1
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@
9797
<subpackage name="purgatory">
9898
<allow pkg="org.apache.kafka.server.metrics" />
9999
<allow pkg="org.apache.kafka.server.util" />
100+
<allow pkg="com.yammer.metrics.core" />
100101
</subpackage>
101102

102103
<subpackage name="share">

core/src/main/scala/kafka/cluster/Partition.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState, MetadataCac
4242
import org.apache.kafka.server.common.RequestLocal
4343
import org.apache.kafka.storage.internals.log.{AppendOrigin, AsyncOffsetReader, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, OffsetResultHolder, UnifiedLog, VerificationGuard}
4444
import org.apache.kafka.server.metrics.KafkaMetricsGroup
45-
import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey}
45+
import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, TopicPartitionOperationKey}
4646
import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey
4747
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, UnexpectedAppendOffsetException}
4848
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints

core/src/main/scala/kafka/server/DelayedDeleteRecords.scala

-134
This file was deleted.

core/src/main/scala/kafka/server/ReplicaManager.scala

+30-8
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,14 @@ package kafka.server
1919
import com.yammer.metrics.core.Meter
2020
import kafka.cluster.{Partition, PartitionListener}
2121
import kafka.controller.StateChangeLogger
22-
import kafka.log.remote.RemoteLogManager
2322
import kafka.log.LogManager
23+
import kafka.log.remote.RemoteLogManager
2424
import kafka.server.HostedPartition.Online
2525
import kafka.server.QuotaFactory.QuotaManagers
2626
import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName, createLogReadResult, isListOffsetsTimestampUnsupported}
2727
import kafka.server.share.DelayedShareFetch
2828
import kafka.utils._
29+
import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid}
2930
import org.apache.kafka.common.errors._
3031
import org.apache.kafka.common.internals.{Plugin, Topic}
3132
import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult
@@ -46,21 +47,19 @@ import org.apache.kafka.common.requests.FetchRequest.PartitionData
4647
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
4748
import org.apache.kafka.common.requests._
4849
import org.apache.kafka.common.utils.{Exit, Time, Utils}
49-
import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid}
5050
import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
5151
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
5252
import org.apache.kafka.metadata.MetadataCache
53-
import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, ListOffsetsPartitionStatus, common}
5453
import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, StopPartition, TopicOptionalIdPartition}
5554
import org.apache.kafka.server.metrics.KafkaMetricsGroup
5655
import org.apache.kafka.server.network.BrokerEndPoint
57-
import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey}
56+
import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DeleteRecordsPartitionStatus, TopicPartitionOperationKey}
5857
import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey, DelayedShareFetchPartitionKey}
5958
import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
6059
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
60+
import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, ListOffsetsPartitionStatus, common}
6161
import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
62-
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog => UnifiedLog, VerificationGuard}
63-
62+
import org.apache.kafka.storage.internals.log._
6463
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
6564

6665
import java.io.File
@@ -1314,7 +1313,7 @@ class ReplicaManager(val config: KafkaConfig,
13141313

13151314
val deleteRecordsStatus = localDeleteRecordsResults.map { case (topicPartition, result) =>
13161315
topicPartition ->
1317-
DeleteRecordsPartitionStatus(
1316+
new DeleteRecordsPartitionStatus(
13181317
result.requestedOffset, // requested offset
13191318
new DeleteRecordsPartitionResult()
13201319
.setLowWatermark(result.lowWatermark)
@@ -1323,8 +1322,31 @@ class ReplicaManager(val config: KafkaConfig,
13231322
}
13241323

13251324
if (delayedDeleteRecordsRequired(localDeleteRecordsResults)) {
1325+
def onAcks(topicPartition: TopicPartition, status: DeleteRecordsPartitionStatus): Unit = {
1326+
val (lowWatermarkReached, error, lw) = getPartition(topicPartition) match {
1327+
case HostedPartition.Online(partition) =>
1328+
partition.leaderLogIfLocal match {
1329+
case Some(_) =>
1330+
val leaderLW = partition.lowWatermarkIfLeader
1331+
(leaderLW >= status.requiredOffset, Errors.NONE, leaderLW)
1332+
case None =>
1333+
(false, Errors.NOT_LEADER_OR_FOLLOWER, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
1334+
}
1335+
1336+
case HostedPartition.Offline(_) =>
1337+
(false, Errors.KAFKA_STORAGE_ERROR, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
1338+
1339+
case HostedPartition.None =>
1340+
(false, Errors.UNKNOWN_TOPIC_OR_PARTITION, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
1341+
}
1342+
if (error != Errors.NONE || lowWatermarkReached) {
1343+
status.setAcksPending(false)
1344+
status.responseStatus.setErrorCode(error.code)
1345+
status.responseStatus.setLowWatermark(lw)
1346+
}
1347+
}
13261348
// create delayed delete records operation
1327-
val delayedDeleteRecords = new DelayedDeleteRecords(timeout, deleteRecordsStatus, this, responseCallback)
1349+
val delayedDeleteRecords = new DelayedDeleteRecords(timeout, deleteRecordsStatus.asJava, onAcks, response => responseCallback(response.asScala))
13281350

13291351
// create a list of (topic, partition) pairs to use as keys for this delayed delete records operation
13301352
val deleteRecordsRequestKeys = offsetPerPartition.keys.map(new TopicPartitionOperationKey(_)).toList

core/src/test/scala/unit/kafka/cluster/PartitionTest.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
5555
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
5656
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, NodeToControllerChannelManager, RequestLocal}
5757
import org.apache.kafka.server.metrics.KafkaYammerMetrics
58-
import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey}
58+
import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, TopicPartitionOperationKey}
5959
import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey
6060
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, UnexpectedAppendOffsetException}
6161
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}

core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,15 @@ import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
2525
import kafka.cluster.Partition
2626
import kafka.log.LogManager
2727
import kafka.server.QuotaFactory.QuotaManagers
28-
import kafka.server.{KafkaConfig, _}
28+
import kafka.server._
2929
import kafka.utils._
3030
import org.apache.kafka.common.TopicPartition
3131
import org.apache.kafka.common.protocol.Errors
3232
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, RecordValidationStats}
3333
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
3434
import org.apache.kafka.common.utils.{Time, Utils}
3535
import org.apache.kafka.server.common.RequestLocal
36-
import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey}
36+
import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, TopicPartitionOperationKey}
3737
import org.apache.kafka.server.util.timer.{MockTimer, Timer}
3838
import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
3939
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, UnifiedLog, VerificationGuard}

core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerL
6363
import org.apache.kafka.server.log.remote.storage._
6464
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
6565
import org.apache.kafka.server.network.BrokerEndPoint
66-
import org.apache.kafka.server.purgatory.DelayedOperationPurgatory
66+
import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory}
6767
import org.apache.kafka.server.share.SharePartitionKey
6868
import org.apache.kafka.server.share.fetch.{DelayedShareFetchGroupKey, DelayedShareFetchKey, ShareFetch}
6969
import org.apache.kafka.server.share.metrics.ShareGroupMetrics
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.server.purgatory;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;

import com.yammer.metrics.core.Meter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/**
 * A delayed delete records operation that can be created by the replica manager and watched
 * in the delete records operation purgatory
 */
public class DelayedDeleteRecords extends DelayedOperation {

    private static final Logger LOG = LoggerFactory.getLogger(DelayedDeleteRecords.class);

    // migration from kafka.server.DelayedDeleteRecordsMetrics
    private static final KafkaMetricsGroup METRICS_GROUP = new KafkaMetricsGroup("kafka.server", "DelayedDeleteRecordsMetrics");
    private static final Meter AGGREGATE_EXPIRATION_METER = METRICS_GROUP.newMeter("ExpiresPerSec", "requests",
            TimeUnit.SECONDS);

    // Per-partition status; exposed to tryComplete/onComplete as a read-only view.
    private final Map<TopicPartition, DeleteRecordsPartitionStatus> deleteRecordsStatus;
    // Hook invoked for every still-pending partition on each tryComplete pass.
    private final BiConsumer<TopicPartition, DeleteRecordsPartitionStatus> onAcksPending;
    // Receives the final per-partition results once the operation completes.
    private final Consumer<Map<TopicPartition, DeleteRecordsPartitionResult>> responseCallback;

    public DelayedDeleteRecords(long delayMs,
                                Map<TopicPartition, DeleteRecordsPartitionStatus> deleteRecordsStatus,
                                // To maintain compatibility with dependency packages, the logic has been moved to the caller.
                                BiConsumer<TopicPartition, DeleteRecordsPartitionStatus> onAcksPending,
                                Consumer<Map<TopicPartition, DeleteRecordsPartitionResult>> responseCallback) {
        super(delayMs);
        this.deleteRecordsStatus = Collections.unmodifiableMap(deleteRecordsStatus);
        this.onAcksPending = onAcksPending;
        this.responseCallback = responseCallback;
        // Seed the acks-pending flag from the initial error code: only error-free partitions
        // keep waiting, and those are pre-marked REQUEST_TIMED_OUT so that an expiration
        // without further progress reports a timeout; the error is overwritten on success.
        for (Map.Entry<TopicPartition, DeleteRecordsPartitionStatus> entry : deleteRecordsStatus.entrySet()) {
            DeleteRecordsPartitionStatus partitionStatus = entry.getValue();
            boolean waiting = partitionStatus.responseStatus().errorCode() == Errors.NONE.code();
            partitionStatus.setAcksPending(waiting);
            if (waiting) {
                partitionStatus.responseStatus().setErrorCode(Errors.REQUEST_TIMED_OUT.code());
            }
            LOG.trace("Initial partition status for {} is {}", entry.getKey(), partitionStatus);
        }
    }

    /**
     * The delayed delete records operation can be completed if every partition specified in the request satisfied one of the following:
     *
     * 1) There was an error while checking if all replicas have caught up to the deleteRecordsOffset: set an error in response
     * 2) The low watermark of the partition has caught up to the deleteRecordsOffset. set the low watermark in response
     *
     */
    @Override
    public boolean tryComplete() {
        // Re-evaluate every partition that still has pending acks; the caller-supplied hook
        // flips acksPending off once the partition hits an error or reaches its offset.
        for (Map.Entry<TopicPartition, DeleteRecordsPartitionStatus> entry : deleteRecordsStatus.entrySet()) {
            DeleteRecordsPartitionStatus partitionStatus = entry.getValue();
            LOG.trace("Checking delete records satisfaction for {}, current status {}", entry.getKey(), partitionStatus);
            if (partitionStatus.acksPending()) {
                onAcksPending.accept(entry.getKey(), partitionStatus);
            }
        }
        // Complete only when no partition is left waiting.
        boolean allSatisfied = deleteRecordsStatus.values().stream()
                .noneMatch(DeleteRecordsPartitionStatus::acksPending);
        return allSatisfied && forceComplete();
    }

    @Override
    public void onExpiration() {
        long stillPending = deleteRecordsStatus.values().stream()
                .filter(DeleteRecordsPartitionStatus::acksPending)
                .count();
        AGGREGATE_EXPIRATION_METER.mark(stillPending);
    }

    /**
     * Upon completion, return the current response status along with the error code per partition
     */
    @Override
    public void onComplete() {
        Map<TopicPartition, DeleteRecordsPartitionResult> results = deleteRecordsStatus.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().responseStatus()));
        responseCallback.accept(results);
    }
}

0 commit comments

Comments
 (0)