KAFKA-19004: Move DelayedDeleteRecords to server-common module #19226

Merged
11 commits merged into apache:trunk on Apr 2, 2025

Collaborator

@gongxuanzhang commented Mar 18, 2025

Move DeleteRecordsPartitionStatus.scala to the server-common module and convert it from Scala to Java.

Reviewers: Mickael Maison, Ken Huang, Chia-Ping Tsai

github-actions bot added the "triage" (PRs from the community) and "core" (Kafka Broker) labels on Mar 18, 2025
Collaborator

@m1a2st left a comment

Thanks @gongxuanzhang for this PR, left some comments

} else {
status.setAcksPending(false);
}
});
Collaborator

We should keep this log: trace("Initial partition status for %s is %s".format(topicPartition, status))

private static final Meter AGGREGATE_EXPIRATION_METER = METRICS_GROUP.newMeter("ExpiresPerSec", "requests",
TimeUnit.SECONDS);

public static void recordExpiration(TopicPartition partition) {
Collaborator

The TopicPartition partition parameter is unused; I think we can remove it.

}


public DeleteRecordsResponseData.DeleteRecordsPartitionResult getResponseStatus() {
Collaborator

It seems that the get prefix is rarely used in Kafka; this should be renamed to responseStatus.

return responseStatus;
}

public long getRequiredOffset() {
Collaborator

Ditto: the get prefix is rarely used in Kafka, so this should be renamed to requiredOffset.

Collaborator Author

@gongxuanzhang commented

@m1a2st thanks for your review; I have changed the code.
I think this PR is only a migration, so we can't delete this parameter:

public static void recordExpiration(TopicPartition partition) {
    AGGREGATE_EXPIRATION_METER.mark();
}

github-actions bot removed the "triage" (PRs from the community) label on Mar 19, 2025
Collaborator Author

@gongxuanzhang commented

@chia7712 PTAL

Member

@chia7712 left a comment

@gongxuanzhang thanks for this patch!

import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class DeleteRecordsPartitionStatus {
Member

the main class should be DelayedDeleteRecords rather than DeleteRecordsPartitionStatus

// create delayed delete records operation
- val delayedDeleteRecords = new DelayedDeleteRecords(timeout, deleteRecordsStatus, this, responseCallback)
+ val delayedDeleteRecords = new DeleteRecordsPartitionStatus.DelayedDeleteRecords(timeout, deleteRecordsStatus.asJava,onAcks , responseCallbackJava)
Member

deleteRecordsStatus.asJava, onAcks, responseCallbackJava

status.responseStatus.setLowWatermark(lw)
}
}
val responseCallbackJava: java.util.function.Consumer[util.Map[TopicPartition, DeleteRecordsPartitionResult]] =
Member

It seems we don't need to create this object, right? We can pass response => responseCallback(response.asScala) directly.

}
}

private static class DelayedDeleteRecordsMetrics {
Member

we don't need this static class as all static variables can be moved to DelayedDeleteRecords

}

private static class DelayedDeleteRecordsMetrics {
private static final KafkaMetricsGroup METRICS_GROUP = new KafkaMetricsGroup(DelayedDeleteRecordsMetrics.class);
Member

This breaks the compatibility of the metric names, so please use "kafka.server.DelayedDeleteRecordsMetrics" instead.

deleteRecordsStatus.forEach((k, status) -> {
responseStatus.put(k, status.responseStatus());
});
responseCallback.accept(responseStatus);
Member

responseCallback.accept(deleteRecordsStatus.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().responseStatus())));
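
Editorial sketch of the suggestion above, for readers comparing the two forms. It is a minimal, self-contained illustration and not the code that was merged: PartitionStatus and the String keys are hypothetical stand-ins for DeleteRecordsPartitionStatus and TopicPartition.

```java
import java.util.Map;
import java.util.stream.Collectors;

public class ResponseStatusSketch {
    // Hypothetical stand-in for DeleteRecordsPartitionStatus.
    static final class PartitionStatus {
        private final long requiredOffset;
        private final String responseStatus;

        PartitionStatus(long requiredOffset, String responseStatus) {
            this.requiredOffset = requiredOffset;
            this.responseStatus = responseStatus;
        }

        long requiredOffset() {
            return requiredOffset;
        }

        String responseStatus() {
            return responseStatus;
        }
    }

    // Build the key -> response map in one expression instead of
    // filling a mutable map inside forEach.
    static Map<String, String> toResponses(Map<String, PartitionStatus> statuses) {
        return statuses.entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().responseStatus()));
    }

    public static void main(String[] args) {
        Map<String, PartitionStatus> statuses = Map.of(
            "topic-0", new PartitionStatus(10L, "NONE"),
            "topic-1", new PartitionStatus(20L, "REQUEST_TIMED_OUT"));
        System.out.println(toResponses(statuses));
    }
}
```

One behavioural difference to keep in mind when choosing between the two forms: Collectors.toMap throws a NullPointerException if the value mapper returns null, whereas a plain put into a HashMap accepts null values.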

private volatile boolean acksPending;

public DeleteRecordsPartitionStatus(long requiredOffset,
DeleteRecordsResponseData.DeleteRecordsPartitionResult responseStatus) {
Member

could you please import DeleteRecordsResponseData.DeleteRecordsPartitionResult?

Collaborator Author

@gongxuanzhang commented

@chia7712 PTAL

*/
public class DelayedDeleteRecords extends DelayedOperation {

private static final Logger log = LoggerFactory.getLogger(DelayedDeleteRecords.class);
Member

LOG

Consumer<Map<TopicPartition, DeleteRecordsPartitionResult>> responseCallback) {
super(delayMs);
this.onAcksPending = onAcksPending;
this.deleteRecordsStatus = new ConcurrentHashMap<>(deleteRecordsStatus);
Member

Why copy deleteRecordsStatus? It is immutable, right?

Collaborator Author

I'll make it immutable and avoid copying. Thanks
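
Editorial sketch of the trade-off discussed in this thread. It contrasts a defensive copy with a read-only view using only JDK classes; the helper names copied and readOnlyView are hypothetical, and the thread does not show which approach the author finally took.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class StatusMapSketch {
    // Defensive copy: allocates a new map and copies every entry.
    static <K, V> Map<K, V> copied(Map<K, V> source) {
        return new ConcurrentHashMap<>(source);
    }

    // Read-only view: no entries are copied, and writes through the
    // wrapper throw UnsupportedOperationException.
    static <K, V> Map<K, V> readOnlyView(Map<K, V> source) {
        return Collections.unmodifiableMap(source);
    }

    public static void main(String[] args) {
        Map<String, Long> source = new HashMap<>();
        source.put("topic-0", 10L);

        Map<String, Long> view = readOnlyView(source);
        try {
            view.put("topic-1", 20L);
        } catch (UnsupportedOperationException e) {
            System.out.println("view rejects writes");
        }
    }
}
```

A read-only map only protects the set of entries. The values can still be mutable, as with the volatile acksPending flag on each DeleteRecordsPartitionStatus in the diff above. A view also still reflects later changes to the backing map, so it is only safe when the caller does not modify that map afterwards.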

Collaborator Author

@gongxuanzhang commented

@chia7712 PTAL

Collaborator

@m1a2st left a comment

Thanks for this patch, left some comments

- import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName, createLogReadResult, isListOffsetsTimestampUnsupported}
+ import kafka.server.ReplicaManager._
Collaborator

I think this change is unnecessary.

Member

Let's keep the explicit imports

- import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, StopPartition, TopicOptionalIdPartition}
+ import org.apache.kafka.server.common._
Collaborator

I think this change is unnecessary.

- import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog => UnifiedLog, VerificationGuard}
+ import org.apache.kafka.storage.internals.log._
Collaborator

I think this change is unnecessary.

Member

@mimaison left a comment

Thanks for the PR. I left a few comments.

* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.common;
Member

Would the org.apache.kafka.server.purgatory package be a better place for these classes? So it's next to the abstract class DelayedOperation it's extending.

Member

+1 to org.apache.kafka.server.purgatory

*/
public class DelayedDeleteRecords extends DelayedOperation {

private static final Logger LOG = LoggerFactory.getLogger(DelayedDeleteRecords.class);
Member

Should we keep kafka.server.DelayedDeleteRecords as the logger name?

Collaborator Author

I don't think changing the name will affect anything. Do we have a special need to keep the name?

Member

@mimaison commented Mar 28, 2025

Logger names are what users set in their log4j2 configuration. But it seems we've not consistently kept names for other classes we moved, so this is probably fine.

Collaborator Author

thanks for your answer

Collaborator Author

@gongxuanzhang commented Mar 28, 2025

I have updated this PR. @mimaison @chia7712 PTAL

Member

@chia7712 left a comment

@gongxuanzhang thanks for updating the patch.

// create delayed delete records operation
- val delayedDeleteRecords = new DelayedDeleteRecords(timeout, deleteRecordsStatus, this, responseCallback)
+ val delayedDeleteRecords = new DelayedDeleteRecords(timeout, deleteRecordsStatus.asJava, onAcks , response => responseCallback(response.asScala))
Member

Nit: "sJava, onAcks , response =" -> "sJava, onAcks, response =" (remove the space before the comma)

Member

this is not addressed


@Override
public void onExpiration() {
deleteRecordsStatus.forEach((topicPartition, status) -> {
Member

AGGREGATE_EXPIRATION_METER.mark(deleteRecordsStatus.values().stream().filter(DeleteRecordsPartitionStatus::acksPending).count());

Collaborator Author

excellent!
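
Editorial sketch of the suggestion in this thread: count the partitions that are still waiting for acks and record them with a single call, rather than marking once per partition inside forEach. PartitionStatus and the LongAdder are hypothetical stand-ins for DeleteRecordsPartitionStatus and the Yammer Meter, so this only illustrates the counting logic.

```java
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

public class ExpirationCountSketch {
    // Hypothetical stand-in for DeleteRecordsPartitionStatus.
    static final class PartitionStatus {
        private volatile boolean acksPending;

        PartitionStatus(boolean acksPending) {
            this.acksPending = acksPending;
        }

        boolean acksPending() {
            return acksPending;
        }
    }

    // Hypothetical stand-in for AGGREGATE_EXPIRATION_METER; only the count is tracked.
    static final LongAdder EXPIRATION_COUNT = new LongAdder();

    // Count the partitions with pending acks and record them in one call.
    static long onExpiration(Map<String, PartitionStatus> statuses) {
        long pending = statuses.values().stream()
            .filter(PartitionStatus::acksPending)
            .count();
        EXPIRATION_COUNT.add(pending);
        return pending;
    }

    public static void main(String[] args) {
        Map<String, PartitionStatus> statuses = Map.of(
            "topic-0", new PartitionStatus(true),
            "topic-1", new PartitionStatus(false),
            "topic-2", new PartitionStatus(true));
        System.out.println("expired partitions: " + onExpiration(statuses));
    }
}
```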

status.acksPending = false
}

trace("Initial partition status for %s is %s".format(topicPartition, status))
Member

please keep this log

@@ -97,6 +97,7 @@
<subpackage name="purgatory">
<allow pkg="org.apache.kafka.server.metrics" />
<allow pkg="org.apache.kafka.server.util" />
<allow pkg="com.yammer.metrics" />
Member

Should this be com.yammer.metrics.core?

Collaborator Author

@gongxuanzhang commented

@chia7712 PTAL

@chia7712 merged commit 2994e5e into apache:trunk on Apr 2, 2025
23 checks passed
janchilling pushed a commit to janchilling/kafka that referenced this pull request Apr 4, 2025
Member

@mimaison commented Apr 7, 2025

I was looking at #19390 and wondering whether we should put these classes in server instead of server-common as they don't seem to be used elsewhere. For example we put DelayedRemoteListOffsets in the storage module. WDYT?

Member

@chia7712 commented Apr 7, 2025

> I was looking at #19390 and wondering whether we should put these classes in server instead of server-common as they don't seem to be used elsewhere. For example we put DelayedRemoteListOffsets in the storage module. WDYT?

It seems RemoteLogManager could use that purgatory-related code. I guess RemoteLogManager will be moved to the storage module in the future? If so, the purgatory code should be in the server-common module.

Member

@mimaison commented Apr 7, 2025

RemoteLogManager only uses DelayedRemoteListOffsets hence why we put that in the storage module. But as far as I can tell other purgatory code could be in server.

Member

@chia7712 commented Apr 7, 2025

> RemoteLogManager only uses DelayedRemoteListOffsets hence why we put that in the storage module. But as far as I can tell other purgatory code could be in server.

RemoteLogManager also uses TopicPartitionOperationKey (and DelayedOperationKey), so I guess you meant other classes, such as DeleteRecordsPartitionStatus, DelayedDeleteRecords, and DelayedOperation, can be moved to server module, right?

Member

@mimaison commented Apr 7, 2025

Yes I meant DelayedDeleteRecords, DeleteRecordsPartitionStatus and future classes not needed in a different module, for example the ones in #19390

We need to keep DelayedOperation in server-common as we have implementations in other modules, for example DelayedRemoteListOffsets in storage.

Member

@chia7712 commented Apr 7, 2025

@gongxuanzhang WDYT? If you agree with @mimaison's comment, could you please create a MINOR PR to address it?

Collaborator Author

@gongxuanzhang commented

@mimaison @chia7712
Thanks for your comments
I will create a MINOR PR to address it
