Skip to content

Commit 5d2bfb4

Browse files
authored
KAFKA-18980 OffsetMetadataManager#cleanupExpiredOffsets should record the number of records rather than topic partitions (#19207)
jira: https://issues.apache.org/jira/browse/KAFKA-18980 We should use the number of `records` rather than topic partitions,but this is not a bug as the number of `records` should be equal to the number of topic partitions. Also, we previously used `expiredPartitions` for logging, but now it is not being used anymore, so we can remove it. Reviewers: TengYao Chi <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent d497250 commit 5d2bfb4

File tree

1 file changed

+4
-11
lines changed

1 file changed

+4
-11
lines changed

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java

+4-11
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,10 @@
5656

5757
import java.util.ArrayList;
5858
import java.util.HashMap;
59-
import java.util.HashSet;
6059
import java.util.List;
6160
import java.util.Map;
6261
import java.util.Optional;
6362
import java.util.OptionalLong;
64-
import java.util.Set;
6563
import java.util.concurrent.atomic.AtomicBoolean;
6664
import java.util.concurrent.atomic.AtomicInteger;
6765
import java.util.function.Consumer;
@@ -858,7 +856,6 @@ public boolean cleanupExpiredOffsets(String groupId, List<CoordinatorRecord> rec
858856

859857
// We expect the group to exist.
860858
Group group = groupMetadataManager.group(groupId);
861-
Set<String> expiredPartitions = new HashSet<>();
862859
long currentTimestampMs = time.milliseconds();
863860
Optional<OffsetExpirationCondition> offsetExpirationCondition = group.offsetExpirationCondition();
864861

@@ -875,7 +872,7 @@ public boolean cleanupExpiredOffsets(String groupId, List<CoordinatorRecord> rec
875872
// We don't expire the offset yet if there is a pending transactional offset for the partition.
876873
if (condition.isOffsetExpired(offsetAndMetadata, currentTimestampMs, config.offsetsRetentionMs()) &&
877874
!hasPendingTransactionalOffsets(groupId, topic, partition)) {
878-
expiredPartitions.add(appendOffsetCommitTombstone(groupId, topic, partition, records).toString());
875+
appendOffsetCommitTombstone(groupId, topic, partition, records);
879876
log.debug("[GroupId {}] Expired offset for partition={}-{}", groupId, topic, partition);
880877
} else {
881878
allOffsetsExpired.set(false);
@@ -885,7 +882,7 @@ public boolean cleanupExpiredOffsets(String groupId, List<CoordinatorRecord> rec
885882
allOffsetsExpired.set(false);
886883
}
887884
});
888-
metrics.record(OFFSET_EXPIRED_SENSOR_NAME, expiredPartitions.size());
885+
metrics.record(OFFSET_EXPIRED_SENSOR_NAME, records.size());
889886

890887
// We don't want to remove the group if there are ongoing transactions.
891888
return allOffsetsExpired.get() && !openTransactionsByGroup.containsKey(groupId);
@@ -937,19 +934,15 @@ public List<CoordinatorRecord> onPartitionsDeleted(
937934
* @param topic The topic name.
938935
* @param partition The partition.
939936
* @param records The list of records to append the tombstone.
940-
*
941-
* @return The topic partition of the corresponding tombstone.
942937
*/
943-
private TopicPartition appendOffsetCommitTombstone(
938+
private void appendOffsetCommitTombstone(
944939
String groupId,
945940
String topic,
946941
int partition,
947942
List<CoordinatorRecord> records
948943
) {
949944
records.add(GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition));
950-
TopicPartition tp = new TopicPartition(topic, partition);
951-
log.trace("[GroupId {}] Removing expired offset and metadata for {}", groupId, tp);
952-
return tp;
945+
log.trace("[GroupId {}] Removing expired offset and metadata for {}-{}", groupId, topic, partition);
953946
}
954947

955948
/**

0 commit comments

Comments
 (0)