MINOR: Refactor GroupCoordinator write path #19290

Merged · 2 commits · Mar 27, 2025
Changes from 1 commit
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala

@@ -21,7 +21,6 @@ import kafka.server.{AddPartitionsToTxnManager, ReplicaManager}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
-import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.coordinator.common.runtime.PartitionWriter
 import org.apache.kafka.server.ActionQueue
 import org.apache.kafka.server.common.RequestLocal
@@ -139,17 +138,14 @@ class CoordinatorPartitionWriter(
     verificationGuard: VerificationGuard,
     records: MemoryRecords
   ): Long = {
-    var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
-    replicaManager.appendRecords(
-      timeout = 0L,
+    // We write synchronously to the leader replica without waiting on replication.
+    val appendResults = replicaManager.appendRecordsToLeader(
       requiredAcks = 1,
       internalTopicsAllowed = true,
       origin = AppendOrigin.COORDINATOR,
       entriesPerPartition = Map(tp -> records),
-      responseCallback = results => appendResults = results,
       requestLocal = RequestLocal.noCaching,
       verificationGuards = Map(tp -> verificationGuard),
-      delayedProduceLock = None,
       // We can directly complete the purgatories here because we don't hold
       // any conflicting locks.
       actionQueue = directActionQueue
@@ -163,7 +159,7 @@
     }
 
     // Required offset.
-    partitionResult.lastOffset + 1
+    partitionResult.info.lastOffset + 1
   }
 
   override def deleteRecords(tp: TopicPartition, deleteBeforeOffset: Long): CompletableFuture[Void] = {
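The new write path is fully synchronous: appendRecordsToLeader returns the per-partition results directly, so the mutable appendResults variable and the response callback are gone. A minimal sketch of the resulting flow in CoordinatorPartitionWriter.append, condensed from the diff above and assuming the LogAppendResult(info, exception, hasCustomErrorMessage) shape used in the tests below:

// Sketch only -- not a verbatim excerpt from the PR.
// `appendResults` and `tp` are as in CoordinatorPartitionWriter.append.
val partitionResult = appendResults.getOrElse(tp,
  throw new IllegalStateException(s"Append status $appendResults should have partition $tp."))

// Assumed error surface: LogAppendResult carries an Option[Throwable].
partitionResult.exception.foreach(e => throw e)

// Required offset: the last appended offset plus one.
partitionResult.info.lastOffset + 1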
core/src/main/scala/kafka/server/ReplicaManager.scala (53 additions, 7 deletions)

@@ -623,6 +623,50 @@ class ReplicaManager(val config: KafkaConfig,
 
   def addToActionQueue(action: Runnable): Unit = defaultActionQueue.add(action)
 
+  /**
+   * Append messages to the leader replicas of the partitions, without waiting on replication.
+   *
+   * Note that all pending delayed check operations are stored in a queue. All callers to
+   * ReplicaManager.appendRecordsToLeader() are expected to call ActionQueue.tryCompleteActions
+   * for all affected partitions, without holding any conflicting locks.
+   *
+   * @param requiredAcks          the required acks -- used only to validate that the append
+   *                              satisfies them
+   * @param internalTopicsAllowed boolean indicating whether internal topics can be appended to
+   * @param origin                source of the append request (i.e., client, replication, coordinator)
+   * @param entriesPerPartition   the records per partition to be appended
+   * @param requestLocal          container for the stateful instances scoped to this request -- this
+   *                              must correspond to the thread calling this method
+   * @param actionQueue           the action queue to use; ReplicaManager#defaultActionQueue is used by default
+   * @param verificationGuards    the mapping from topic partition to verification guards if transaction verification is used
+   */
+  def appendRecordsToLeader(
+    requiredAcks: Short,
+    internalTopicsAllowed: Boolean,
+    origin: AppendOrigin,
+    entriesPerPartition: Map[TopicPartition, MemoryRecords],
+    requestLocal: RequestLocal = RequestLocal.noCaching,
+    actionQueue: ActionQueue = this.defaultActionQueue,
+    verificationGuards: Map[TopicPartition, VerificationGuard] = Map.empty
+  ): Map[TopicPartition, LogAppendResult] = {
+    val startTimeMs = time.milliseconds
+    val localProduceResultsWithTopicId = appendToLocalLog(
+      internalTopicsAllowed = internalTopicsAllowed,
+      origin,
+      entriesPerPartition,
+      requiredAcks,
+      requestLocal,
+      verificationGuards.toMap
+    )
+    debug("Produce to local log in %d ms".format(time.milliseconds - startTimeMs))
+
+    addCompletePurgatoryAction(actionQueue, localProduceResultsWithTopicId)
+
+    localProduceResultsWithTopicId.map {
+      case (k, v) => (k.topicPartition, v)
+    }
+  }
+
   /**
    * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
    * the callback function will be triggered either when timeout or the required acks are satisfied;
@@ -661,16 +705,18 @@ class ReplicaManager(val config: KafkaConfig,
       return
     }
 
-    val sTime = time.milliseconds
-    val localProduceResultsWithTopicId = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
-      origin, entriesPerPartition, requiredAcks, requestLocal, verificationGuards.toMap)
-    debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
-    val localProduceResults : Map[TopicPartition, LogAppendResult] = localProduceResultsWithTopicId.map {
-      case(k, v) => (k.topicPartition, v)}
+    val localProduceResults = appendRecordsToLeader(
+      requiredAcks,
+      internalTopicsAllowed,
+      origin,
+      entriesPerPartition,
+      requestLocal,
+      actionQueue,
+      verificationGuards
+    )
 
     val produceStatus = buildProducePartitionStatus(localProduceResults)
 
-    addCompletePurgatoryAction(actionQueue, localProduceResultsWithTopicId)
     recordValidationStatsCallback(localProduceResults.map { case (k, v) =>
       k -> v.info.recordValidationStats
     })
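Taken together with the Javadoc above, the calling convention is: append, then drain the queue. A hedged usage sketch of the new entry point -- the partition and records values are illustrative, and ReplicaManager.tryCompleteActions is assumed to drain the default action queue:

// Illustrative call site, not part of this PR.
val results: Map[TopicPartition, LogAppendResult] = replicaManager.appendRecordsToLeader(
  requiredAcks = 1,
  internalTopicsAllowed = true,
  origin = AppendOrigin.COORDINATOR,
  entriesPerPartition = Map(partition -> records)
  // requestLocal, actionQueue, and verificationGuards keep their defaults.
)
// Per the contract, the caller completes pending delayed operations itself,
// without holding any conflicting locks.
replicaManager.tryCompleteActions()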
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala

@@ -16,16 +16,15 @@
  */
 package kafka.coordinator.group
 
-import kafka.server.ReplicaManager
+import kafka.server.{LogAppendResult, ReplicaManager}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException
 import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, SimpleRecord}
-import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, RecordValidationStats, SimpleRecord}
 import org.apache.kafka.coordinator.common.runtime.PartitionWriter
-import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, LogAppendInfo, LogConfig, VerificationGuard}
 import org.apache.kafka.test.TestUtils.assertFutureThrows
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows, assertTrue}
 import org.junit.jupiter.api.Test
@@ -35,7 +34,7 @@ import org.mockito.{ArgumentCaptor, ArgumentMatchers}
 import org.mockito.Mockito.{mock, verify, when}
 
 import java.nio.charset.Charset
-import java.util.Collections
+import java.util.Optional
 import scala.collection.Map
 import scala.jdk.CollectionConverters._
@@ -94,34 +93,31 @@ class CoordinatorPartitionWriterTest {
 
     val recordsCapture: ArgumentCaptor[Map[TopicPartition, MemoryRecords]] =
       ArgumentCaptor.forClass(classOf[Map[TopicPartition, MemoryRecords]])
-    val callbackCapture: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] =
-      ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
 
-    when(replicaManager.appendRecords(
-      ArgumentMatchers.eq(0L),
+    when(replicaManager.appendRecordsToLeader(
       ArgumentMatchers.eq(1.toShort),
       ArgumentMatchers.eq(true),
       ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
       recordsCapture.capture(),
-      callbackCapture.capture(),
       ArgumentMatchers.any(),
       ArgumentMatchers.any(),
-      ArgumentMatchers.any(),
-      ArgumentMatchers.any(),
       ArgumentMatchers.eq(Map(tp -> VerificationGuard.SENTINEL)),
-    )).thenAnswer(_ => {
-      callbackCapture.getValue.apply(Map(
-        tp -> new PartitionResponse(
-          Errors.NONE,
-          5,
-          10,
-          RecordBatch.NO_TIMESTAMP,
-          -1,
-          Collections.emptyList(),
-          ""
-        )
-      ))
-    })
+    )).thenReturn(Map(tp -> LogAppendResult(
+      new LogAppendInfo(
+        5L,
+        10L,
+        Optional.empty,
+        RecordBatch.NO_TIMESTAMP,
+        0L,
+        0L,
+        RecordValidationStats.EMPTY,
+        CompressionType.NONE,
+        100,
+        10L
+      ),
+      Option.empty,
+      false
+    )))
 
     val batch = MemoryRecords.withRecords(
       Compression.NONE,
@@ -140,8 +136,7 @@
 
     assertEquals(
       batch,
-      recordsCapture.getValue.getOrElse(tp,
-        throw new AssertionError(s"No records for $tp"))
+      recordsCapture.getValue.getOrElse(tp, throw new AssertionError(s"No records for $tp"))
     )
   }

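One observation on the stub above: the LogAppendInfo is built with firstOffset = 5 and lastOffset = 10, so the writer should report the next required offset as 11. A hypothetical assertion in the style of this test (partitionWriter names the writer under test, constructed earlier in the class):

// Expected: next required offset = stubbed lastOffset (10) + 1 = 11.
assertEquals(11, partitionWriter.append(tp, VerificationGuard.SENTINEL, batch))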
@@ -203,26 +198,20 @@
 
     val recordsCapture: ArgumentCaptor[Map[TopicPartition, MemoryRecords]] =
       ArgumentCaptor.forClass(classOf[Map[TopicPartition, MemoryRecords]])
-    val callbackCapture: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] =
-      ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
 
-    when(replicaManager.appendRecords(
-      ArgumentMatchers.eq(0L),
+    when(replicaManager.appendRecordsToLeader(
       ArgumentMatchers.eq(1.toShort),
       ArgumentMatchers.eq(true),
       ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
       recordsCapture.capture(),
-      callbackCapture.capture(),
       ArgumentMatchers.any(),
       ArgumentMatchers.any(),
-      ArgumentMatchers.any(),
-      ArgumentMatchers.any(),
       ArgumentMatchers.eq(Map(tp -> VerificationGuard.SENTINEL)),
-    )).thenAnswer(_ => {
-      callbackCapture.getValue.apply(Map(
-        tp -> new PartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER)
-      ))
-    })
+    )).thenReturn(Map(tp -> LogAppendResult(
+      LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
+      Some(Errors.NOT_LEADER_OR_FOLLOWER.exception),
+      false
+    )))
 
     val batch = MemoryRecords.withRecords(
       Compression.NONE,
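For the failure path, the exception carried in LogAppendResult.exception should surface directly from append rather than through a response callback. A hypothetical assertion matching the stub above (again assuming the test's partitionWriter and batch):

// Expected: the stubbed NOT_LEADER_OR_FOLLOWER error is rethrown.
assertThrows(classOf[NotLeaderOrFollowerException],
  () => partitionWriter.append(tp, VerificationGuard.SENTINEL, batch))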