Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Extract HeartbeatRequestState from heartbeat request managers #19043

Open
wants to merge 3 commits into
base: trunk
Choose a base branch
from

Conversation

cadonna
Copy link
Member

@cadonna cadonna commented Feb 26, 2025

The AbstractHeartbeatRequestManager and the
StreamsGroupHeartbeatRequestManager, both use the
HeartbeatRequestState to track the state of the heartbeat requests. Both heartbeat request managers have an implementation of HeartbeatRequestState as inner class.
To deduplicate code this commit extracts the HeartbeatRequestState so that the same code can be used by both heartbeat request manager.

Delete this text and replace it with a detailed description of your change. The
PR title and body will become the squashed commit message.

If you would like to tag individuals, add some commentary, upload images, or
include other supplemental information that should not be part of the eventual
commit message, please use a separate comment.

If applicable, please include a summary of the testing strategy (including
rationale) for the proposed change. Unit and/or integration tests are expected
for any behavior change and system tests should be considered for larger
changes.

@cadonna cadonna added streams consumer KIP-1071 PRs related to KIP-1071 labels Feb 26, 2025
@cadonna
Copy link
Member Author

cadonna commented Feb 27, 2025

Call for review: @aliehsaeedii

@cadonna cadonna force-pushed the extract_heartbeat_request_state branch from 06c6325 to 5452477 Compare February 27, 2025 09:37
@cadonna cadonna requested a review from Copilot February 27, 2025 11:39

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR Overview

This PR deduplicates heartbeat request state management by extracting the HeartbeatRequestState from its duplicated inner class implementations into a dedicated file. Key changes include:

  • Extraction of HeartbeatRequestState into its own file.
  • Updates to heartbeat request managers and their tests to reference the new HeartbeatRequestState.
  • Removal of outdated inner class definitions from the AbstractHeartbeatRequestManager and StreamsGroupHeartbeatRequestManager.

Reviewed Changes

File Description
HeartbeatRequestState.java New file extracting heartbeat state logic.
HeartbeatRequestStateTest.java Unit tests covering the new HeartbeatRequestState behavior.
ShareHeartbeatRequestManagerTest.java Updated tests to use the extracted HeartbeatRequestState.
AbstractHeartbeatRequestManager.java Updated to invoke heartbeatIntervalMs() instead of using a field directly.
StreamsGroupHeartbeatRequestManager.java Removed obsolete inner class and updated heartbeatIntervalMs usage.
ConsumerHeartbeatRequestManager.java Updated import and type references to the new HeartbeatRequestState.
ConsumerHeartbeatRequestManagerTest.java Removed import of the now-obsolete inner class.

Copilot reviewed 8 out of 8 changed files in this pull request and generated no comments.

Comments suppressed due to low confidence (1)

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.java:181

  • The change to invoke heartbeatIntervalMs() improves clarity; please ensure that this getter method remains consistently used across all heartbeat managers.
return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(), Collections.singletonList(leaveHeartbeat));
Copy link
Collaborator

@kirktrue kirktrue left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR @cadonna!

This looks like a pretty simple refactoring. Just the one question about the Javadoc comment in the new, top-level class.

Thanks!


/**
* Represents the state of a heartbeat request, including logic for timing, retries, and exponential backoff. The object extends
* {@link org.apache.kafka.clients.consumer.internals.RequestState} to enable exponential backoff and duplicated request handling. The two fields that it holds are:
Copy link
Collaborator

@kirktrue kirktrue Feb 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there supposed to be another line after "The two fields that it holds are:", or is it implying the reader looks at the class instance variables?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I did not notice that! Thanks!

I do not have an answer to your question. Also the old code had that. I will remove the last sentence.

The AbstractHeartbeatRequestManager and the
StreamsGroupHeartbeatRequestManager, both use the
HeartbeatRequestState to track the state of the heartbeat requests.
Both heartbeat request managers have an implementation of
HeartbeatRequestState as inner class.
To deduplicate code this commit extracts the HeartbeatRequestState
so that the same code can be used by both heartbeat request manager.
@cadonna cadonna force-pushed the extract_heartbeat_request_state branch from 1be5f2f to a3f1506 Compare February 28, 2025 10:45
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants