
Commit 28de78b

MINOR: Refactor GroupCoordinator write path (#19290)
This patch addresses a weirdness on the GroupCoordinator write path. The `CoordinatorPartitionWriter` calls `ReplicaManager#appendRecords` with `acks=1` and expects it to complete immediately/synchronously. This works because that is effectively what the method does with `acks=1`, but the method is fundamentally asynchronous, so the contract is fragile. This patch introduces a new synchronous method, `ReplicaManager#appendRecordsToLeader`, and refactors `ReplicaManager#appendRecords` to use it, so the new method benefits from all the existing tests. Reviewers: Fred Zheng <[email protected]>, Jeff Kim <[email protected]>
1 parent 2267902 commit 28de78b
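
To make the contract change concrete, here is a condensed before/after sketch of the caller-side code (abridged from the `CoordinatorPartitionWriter` diff below; not a standalone program, since it needs a live `ReplicaManager`):

// Before: appendRecords is fundamentally asynchronous. The coordinator
// passed a callback and relied on acks=1 invoking it before the method
// returned -- nothing in the signature guarantees that.
var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
replicaManager.appendRecords(
  timeout = 0L,
  requiredAcks = 1,
  internalTopicsAllowed = true,
  origin = AppendOrigin.COORDINATOR,
  entriesPerPartition = Map(tp -> records),
  responseCallback = results => appendResults = results,
  requestLocal = RequestLocal.noCaching,
  verificationGuards = Map(tp -> verificationGuard),
  delayedProduceLock = None,
  actionQueue = directActionQueue
)
// appendResults is only populated here if the callback already ran.

// After: appendRecordsToLeader is synchronous and returns the results.
val appendResults: Map[TopicPartition, LogAppendResult] =
  replicaManager.appendRecordsToLeader(
    requiredAcks = 1,
    internalTopicsAllowed = true,
    origin = AppendOrigin.COORDINATOR,
    entriesPerPartition = Map(tp -> records),
    requestLocal = RequestLocal.noCaching,
    verificationGuards = Map(tp -> verificationGuard),
    actionQueue = directActionQueue
  )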

File tree: 4 files changed, +150 -55 lines


core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala (+3 -7)
@@ -21,7 +21,6 @@ import kafka.server.{AddPartitionsToTxnManager, ReplicaManager}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
-import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.coordinator.common.runtime.PartitionWriter
 import org.apache.kafka.server.ActionQueue
 import org.apache.kafka.server.common.RequestLocal
@@ -139,17 +138,14 @@
     verificationGuard: VerificationGuard,
     records: MemoryRecords
   ): Long = {
-    var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
-    replicaManager.appendRecords(
-      timeout = 0L,
+    // We write synchronously to the leader replica without waiting on replication.
+    val appendResults = replicaManager.appendRecordsToLeader(
       requiredAcks = 1,
       internalTopicsAllowed = true,
       origin = AppendOrigin.COORDINATOR,
       entriesPerPartition = Map(tp -> records),
-      responseCallback = results => appendResults = results,
       requestLocal = RequestLocal.noCaching,
       verificationGuards = Map(tp -> verificationGuard),
-      delayedProduceLock = None,
       // We can directly complete the purgatories here because we don't hold
       // any conflicting locks.
       actionQueue = directActionQueue
@@ -163,7 +159,7 @@
     }
 
     // Required offset.
-    partitionResult.lastOffset + 1
+    partitionResult.info.lastOffset + 1
   }
 
   override def deleteRecords(tp: TopicPartition, deleteBeforeOffset: Long): CompletableFuture[Void] = {
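
The last hunk above follows from the return-type change: `appendRecordsToLeader` returns `LogAppendResult` values rather than `ProduceResponse.PartitionResponse`, so the last offset now sits one level deeper, on the `LogAppendInfo`. A rough sketch of the surrounding unpacking, with the lookup and error handling assumed rather than copied from the source:

// Assumed shape of the code around the hunk -- illustrative, not a verbatim excerpt.
val partitionResult = appendResults.getOrElse(tp,
  throw new IllegalStateException(s"Append result should contain $tp."))
if (partitionResult.error != Errors.NONE) {
  throw partitionResult.error.exception
}
// Required offset: one past the last appended record.
partitionResult.info.lastOffset + 1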

core/src/main/scala/kafka/server/ReplicaManager.scala (+53 -7)
@@ -623,6 +623,50 @@ class ReplicaManager(val config: KafkaConfig,
 
   def addToActionQueue(action: Runnable): Unit = defaultActionQueue.add(action)
 
+  /**
+   * Append messages to leader replicas of the partition, without waiting on replication.
+   *
+   * Noted that all pending delayed check operations are stored in a queue. All callers to ReplicaManager.appendRecordsToLeader()
+   * are expected to call ActionQueue.tryCompleteActions for all affected partitions, without holding any conflicting
+   * locks.
+   *
+   * @param requiredAcks          the required acks -- it is only used to ensure that the append meets the
+   *                              required acks.
+   * @param internalTopicsAllowed boolean indicating whether internal topics can be appended to
+   * @param origin                source of the append request (ie, client, replication, coordinator)
+   * @param entriesPerPartition   the records per partition to be appended
+   * @param requestLocal          container for the stateful instances scoped to this request -- this must correspond to the
+   *                              thread calling this method
+   * @param actionQueue           the action queue to use. ReplicaManager#defaultActionQueue is used by default.
+   * @param verificationGuards    the mapping from topic partition to verification guards if transaction verification is used
+   */
+  def appendRecordsToLeader(
+    requiredAcks: Short,
+    internalTopicsAllowed: Boolean,
+    origin: AppendOrigin,
+    entriesPerPartition: Map[TopicPartition, MemoryRecords],
+    requestLocal: RequestLocal = RequestLocal.noCaching,
+    actionQueue: ActionQueue = this.defaultActionQueue,
+    verificationGuards: Map[TopicPartition, VerificationGuard] = Map.empty
+  ): Map[TopicPartition, LogAppendResult] = {
+    val startTimeMs = time.milliseconds
+    val localProduceResultsWithTopicId = appendToLocalLog(
+      internalTopicsAllowed = internalTopicsAllowed,
+      origin,
+      entriesPerPartition,
+      requiredAcks,
+      requestLocal,
+      verificationGuards.toMap
+    )
+    debug("Produce to local log in %d ms".format(time.milliseconds - startTimeMs))
+
+    addCompletePurgatoryAction(actionQueue, localProduceResultsWithTopicId)
+
+    localProduceResultsWithTopicId.map {
+      case (k, v) => (k.topicPartition, v)
+    }
+  }
+
   /**
    * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
    * the callback function will be triggered either when timeout or the required acks are satisfied;
@@ -661,16 +705,18 @@
       return
     }
 
-    val sTime = time.milliseconds
-    val localProduceResultsWithTopicId = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
-      origin, entriesPerPartition, requiredAcks, requestLocal, verificationGuards.toMap)
-    debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
-    val localProduceResults : Map[TopicPartition, LogAppendResult] = localProduceResultsWithTopicId.map {
-      case(k, v) => (k.topicPartition, v)}
+    val localProduceResults = appendRecordsToLeader(
+      requiredAcks,
+      internalTopicsAllowed,
+      origin,
+      entriesPerPartition,
+      requestLocal,
+      actionQueue,
+      verificationGuards
+    )
 
     val produceStatus = buildProducePartitionStatus(localProduceResults)
 
-    addCompletePurgatoryAction(actionQueue, localProduceResultsWithTopicId)
     recordValidationStatsCallback(localProduceResults.map { case (k, v) =>
       k -> v.info.recordValidationStats
     })
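
Note the doc comment's caveat: delayed operations unblocked by the append are parked on the action queue, and callers must complete them without holding conflicting locks. A minimal caller sketch, assuming a `replicaManager`, a partition `tp`, and `records` are already in hand and the default action queue is used:

// Append synchronously to the local leader (acks=1 semantics).
val results = replicaManager.appendRecordsToLeader(
  requiredAcks = 1,
  internalTopicsAllowed = false,
  origin = AppendOrigin.CLIENT,
  entriesPerPartition = Map(tp -> records)
)
// Drain the default action queue so that any delayed fetch/produce
// operations unblocked by this append get completed.
replicaManager.tryCompleteActions()

results.foreach { case (partition, result) =>
  println(s"$partition: error=${result.error}, lastOffset=${result.info.lastOffset}")
}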

core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala (+28 -39)
@@ -16,16 +16,15 @@
  */
 package kafka.coordinator.group
 
-import kafka.server.ReplicaManager
+import kafka.server.{LogAppendResult, ReplicaManager}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.errors.NotLeaderOrFollowerException
 import org.apache.kafka.common.message.DeleteRecordsResponseData.DeleteRecordsPartitionResult
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, SimpleRecord}
-import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, RecordValidationStats, SimpleRecord}
 import org.apache.kafka.coordinator.common.runtime.PartitionWriter
-import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, LogAppendInfo, LogConfig, VerificationGuard}
 import org.apache.kafka.test.TestUtils.assertFutureThrows
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNull, assertThrows, assertTrue}
 import org.junit.jupiter.api.Test
@@ -35,7 +34,7 @@ import org.mockito.{ArgumentCaptor, ArgumentMatchers}
 import org.mockito.Mockito.{mock, verify, when}
 
 import java.nio.charset.Charset
-import java.util.Collections
+import java.util.Optional
 import scala.collection.Map
 import scala.jdk.CollectionConverters._
 
@@ -94,34 +93,31 @@ class CoordinatorPartitionWriterTest {
 
     val recordsCapture: ArgumentCaptor[Map[TopicPartition, MemoryRecords]] =
       ArgumentCaptor.forClass(classOf[Map[TopicPartition, MemoryRecords]])
-    val callbackCapture: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] =
-      ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
 
-    when(replicaManager.appendRecords(
-      ArgumentMatchers.eq(0L),
+    when(replicaManager.appendRecordsToLeader(
       ArgumentMatchers.eq(1.toShort),
       ArgumentMatchers.eq(true),
       ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
       recordsCapture.capture(),
-      callbackCapture.capture(),
-      ArgumentMatchers.any(),
-      ArgumentMatchers.any(),
       ArgumentMatchers.any(),
       ArgumentMatchers.any(),
       ArgumentMatchers.eq(Map(tp -> VerificationGuard.SENTINEL)),
-    )).thenAnswer( _ => {
-      callbackCapture.getValue.apply(Map(
-        tp -> new PartitionResponse(
-          Errors.NONE,
-          5,
-          10,
-          RecordBatch.NO_TIMESTAMP,
-          -1,
-          Collections.emptyList(),
-          ""
-        )
-      ))
-    })
+    )).thenReturn(Map(tp -> LogAppendResult(
+      new LogAppendInfo(
+        5L,
+        10L,
+        Optional.empty,
+        RecordBatch.NO_TIMESTAMP,
+        0L,
+        0L,
+        RecordValidationStats.EMPTY,
+        CompressionType.NONE,
+        100,
+        10L
+      ),
+      Option.empty,
+      false
+    )))
 
     val batch = MemoryRecords.withRecords(
       Compression.NONE,
@@ -140,8 +136,7 @@
 
     assertEquals(
       batch,
-      recordsCapture.getValue.getOrElse(tp,
-        throw new AssertionError(s"No records for $tp"))
+      recordsCapture.getValue.getOrElse(tp, throw new AssertionError(s"No records for $tp"))
     )
   }
 
@@ -203,26 +198,20 @@
 
     val recordsCapture: ArgumentCaptor[Map[TopicPartition, MemoryRecords]] =
       ArgumentCaptor.forClass(classOf[Map[TopicPartition, MemoryRecords]])
-    val callbackCapture: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] =
-      ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
 
-    when(replicaManager.appendRecords(
-      ArgumentMatchers.eq(0L),
+    when(replicaManager.appendRecordsToLeader(
       ArgumentMatchers.eq(1.toShort),
       ArgumentMatchers.eq(true),
       ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
       recordsCapture.capture(),
-      callbackCapture.capture(),
-      ArgumentMatchers.any(),
-      ArgumentMatchers.any(),
       ArgumentMatchers.any(),
       ArgumentMatchers.any(),
       ArgumentMatchers.eq(Map(tp -> VerificationGuard.SENTINEL)),
-    )).thenAnswer(_ => {
-      callbackCapture.getValue.apply(Map(
-        tp -> new PartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER)
-      ))
-    })
+    )).thenReturn(Map(tp -> LogAppendResult(
+      LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
+      Some(Errors.NOT_LEADER_OR_FOLLOWER.exception),
+      false
+    )))
 
     val batch = MemoryRecords.withRecords(
       Compression.NONE,

core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (+66 -2)
@@ -58,7 +58,7 @@ import org.apache.kafka.image._
 import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
 import org.apache.kafka.metadata.{LeaderAndIsr, MetadataCache}
 import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
-import org.apache.kafka.server.common.{DirectoryEventHandler, KRaftVersion, MetadataVersion, OffsetAndEpoch, StopPartition}
+import org.apache.kafka.server.common.{DirectoryEventHandler, KRaftVersion, MetadataVersion, OffsetAndEpoch, RequestLocal, StopPartition}
 import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerLogConfigs}
 import org.apache.kafka.server.log.remote.storage._
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
@@ -72,7 +72,7 @@ import org.apache.kafka.server.util.timer.MockTimer
 import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler}
 import org.apache.kafka.storage.internals.checkpoint.LazyOffsetCheckpoints
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LocalLog, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegments, ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegments, ProducerStateManager, ProducerStateManagerConfig, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeEach, Test}
@@ -6134,6 +6134,70 @@
 
   }
 
+  @Test
+  def testAppendRecordsToLeader(): Unit = {
+    val localId = 0
+    val foo = new TopicIdPartition(Uuid.randomUuid, 0, "foo")
+    val bar = new TopicIdPartition(Uuid.randomUuid, 0, "bar")
+
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(
+      timer = new MockTimer(time),
+      brokerId = localId
+    )
+
+    try {
+      val topicDelta = new TopicsDelta(TopicsImage.EMPTY)
+      topicDelta.replay(new TopicRecord()
+        .setName(foo.topic)
+        .setTopicId(foo.topicId)
+      )
+      topicDelta.replay(new PartitionRecord()
+        .setTopicId(foo.topicId)
+        .setPartitionId(foo.partition)
+        .setLeader(localId)
+        .setLeaderEpoch(0)
+        .setPartitionEpoch(0)
+        .setReplicas(List[Integer](localId).asJava)
+        .setIsr(List[Integer](localId).asJava)
+      )
+
+      val metadataImage = imageFromTopics(topicDelta.apply())
+      replicaManager.applyDelta(topicDelta, metadataImage)
+
+      // Create test records.
+      val records = TestUtils.singletonRecords(
+        value = "test".getBytes,
+        timestamp = time.milliseconds
+      )
+
+      // Append records to both foo and bar.
+      val result = replicaManager.appendRecordsToLeader(
+        requiredAcks = 1,
+        internalTopicsAllowed = true,
+        origin = AppendOrigin.CLIENT,
+        entriesPerPartition = Map(
+          foo.topicPartition -> records,
+          bar.topicPartition -> records
+        ),
+        requestLocal = RequestLocal.noCaching
+      )
+
+      assertNotNull(result)
+      assertEquals(2, result.size)
+
+      val fooResult = result(foo.topicPartition)
+      assertEquals(Errors.NONE, fooResult.error)
+      assertEquals(0, fooResult.info.logStartOffset)
+      assertEquals(0, fooResult.info.firstOffset)
+      assertEquals(0, fooResult.info.lastOffset)
+
+      val barResult = result(bar.topicPartition)
+      assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, barResult.error)
+      assertEquals(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, barResult.info)
+    } finally {
+      replicaManager.shutdown(checkpointHW = false)
+    }
+  }
 
   @Test
   def testMonitorableReplicaSelector(): Unit = {
