Skip to content

Commit d038f44

Browse files
authored
MINOR: Small cleanups in ReplicaManager (#19322)
This is a small follow-up of #19290. The `actionQueue` argument is only used by the `CoordinatorPartitionWriter` so we can remove it from the other methods now. Reviewers: Jeff Kim <[email protected]>
1 parent 8b80626 commit d038f44

File tree

4 files changed

+1
-22
lines changed

4 files changed

+1
-22
lines changed

Diff for: core/src/main/scala/kafka/server/ReplicaManager.scala

+1-6
Original file line numberDiff line numberDiff line change
@@ -686,7 +686,6 @@ class ReplicaManager(val config: KafkaConfig,
686686
* @param recordValidationStatsCallback callback for updating stats on record conversions
687687
* @param requestLocal container for the stateful instances scoped to this request -- this must correspond to the
688688
* thread calling this method
689-
* @param actionQueue the action queue to use. ReplicaManager#defaultActionQueue is used by default.
690689
* @param verificationGuards the mapping from topic partition to verification guards if transaction verification is used
691690
*/
692691
def appendRecords(timeout: Long,
@@ -698,7 +697,6 @@ class ReplicaManager(val config: KafkaConfig,
698697
delayedProduceLock: Option[Lock] = None,
699698
recordValidationStatsCallback: Map[TopicPartition, RecordValidationStats] => Unit = _ => (),
700699
requestLocal: RequestLocal = RequestLocal.noCaching,
701-
actionQueue: ActionQueue = this.defaultActionQueue,
702700
verificationGuards: Map[TopicPartition, VerificationGuard] = Map.empty): Unit = {
703701
if (!isValidRequiredAcks(requiredAcks)) {
704702
sendInvalidRequiredAcksResponse(entriesPerPartition, responseCallback)
@@ -711,7 +709,7 @@ class ReplicaManager(val config: KafkaConfig,
711709
origin,
712710
entriesPerPartition,
713711
requestLocal,
714-
actionQueue,
712+
defaultActionQueue,
715713
verificationGuards
716714
)
717715

@@ -744,7 +742,6 @@ class ReplicaManager(val config: KafkaConfig,
744742
* @param recordValidationStatsCallback callback for updating stats on record conversions
745743
* @param requestLocal container for the stateful instances scoped to this request -- this must correspond to the
746744
* thread calling this method
747-
* @param actionQueue the action queue to use. ReplicaManager#defaultActionQueue is used by default.
748745
* @param transactionSupportedOperation determines the supported Operation based on the client's Request api version
749746
*
750747
* The responseCallback is wrapped so that it is scheduled on a request handler thread. There, it should be called with
@@ -758,7 +755,6 @@ class ReplicaManager(val config: KafkaConfig,
758755
responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
759756
recordValidationStatsCallback: Map[TopicPartition, RecordValidationStats] => Unit = _ => (),
760757
requestLocal: RequestLocal = RequestLocal.noCaching,
761-
actionQueue: ActionQueue = this.defaultActionQueue,
762758
transactionSupportedOperation: TransactionSupportedOperation): Unit = {
763759

764760
val transactionalProducerInfo = mutable.HashSet[(Long, Short)]()
@@ -825,7 +821,6 @@ class ReplicaManager(val config: KafkaConfig,
825821
responseCallback = newResponseCallback,
826822
recordValidationStatsCallback = recordValidationStatsCallback,
827823
requestLocal = newRequestLocal,
828-
actionQueue = actionQueue,
829824
verificationGuards = verificationGuards
830825
)
831826
}

Diff for: core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala

-2
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ import org.apache.kafka.common.protocol.Errors
3232
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, RecordValidationStats}
3333
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
3434
import org.apache.kafka.common.utils.{Time, Utils}
35-
import org.apache.kafka.server.ActionQueue
3635
import org.apache.kafka.server.common.RequestLocal
3736
import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, TopicPartitionOperationKey}
3837
import org.apache.kafka.server.util.timer.{MockTimer, Timer}
@@ -220,7 +219,6 @@ object AbstractCoordinatorConcurrencyTest {
220219
delayedProduceLock: Option[Lock] = None,
221220
processingStatsCallback: Map[TopicPartition, RecordValidationStats] => Unit = _ => (),
222221
requestLocal: RequestLocal = RequestLocal.noCaching,
223-
actionQueue: ActionQueue = null,
224222
verificationGuards: Map[TopicPartition, VerificationGuard] = Map.empty): Unit = {
225223

226224
if (entriesPerPartition.isEmpty)

Diff for: core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala

-6
Original file line numberDiff line numberDiff line change
@@ -739,7 +739,6 @@ class TransactionStateManagerTest {
739739
any[Option[ReentrantLock]],
740740
any(),
741741
any(),
742-
any(),
743742
any()
744743
)
745744

@@ -784,7 +783,6 @@ class TransactionStateManagerTest {
784783
any[Option[ReentrantLock]],
785784
any(),
786785
any(),
787-
any(),
788786
any()
789787
)
790788

@@ -826,7 +824,6 @@ class TransactionStateManagerTest {
826824
any[Option[ReentrantLock]],
827825
any(),
828826
any(),
829-
any(),
830827
any())
831828

832829
assertEquals(Set.empty, listExpirableTransactionalIds())
@@ -879,7 +876,6 @@ class TransactionStateManagerTest {
879876
any[Option[ReentrantLock]],
880877
any(),
881878
any(),
882-
any(),
883879
any()
884880
)
885881

@@ -1097,7 +1093,6 @@ class TransactionStateManagerTest {
10971093
any[Option[ReentrantLock]],
10981094
any(),
10991095
any(),
1100-
any(),
11011096
any()
11021097
)).thenAnswer(_ => callbackCapture.getValue.apply(
11031098
recordsCapture.getValue.map { case (topicPartition, records) =>
@@ -1251,7 +1246,6 @@ class TransactionStateManagerTest {
12511246
any[Option[ReentrantLock]],
12521247
any(),
12531248
any(),
1254-
any(),
12551249
any()
12561250
)).thenAnswer(_ => capturedArgument.getValue.apply(
12571251
Map(new TopicPartition(TRANSACTION_STATE_TOPIC_NAME, partitionId) ->

Diff for: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

-8
Original file line numberDiff line numberDiff line change
@@ -2088,7 +2088,6 @@ class KafkaApisTest extends Logging {
20882088
responseCallback.capture(),
20892089
any(),
20902090
any(),
2091-
any(),
20922091
any()
20932092
)).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new PartitionResponse(Errors.INVALID_PRODUCER_EPOCH))))
20942093

@@ -2150,7 +2149,6 @@ class KafkaApisTest extends Logging {
21502149
responseCallback.capture(),
21512150
any(),
21522151
any(),
2153-
any(),
21542152
any())
21552153
).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new PartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER))))
21562154

@@ -2215,7 +2213,6 @@ class KafkaApisTest extends Logging {
22152213
responseCallback.capture(),
22162214
any(),
22172215
any(),
2218-
any(),
22192216
any())
22202217
).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new PartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER))))
22212218

@@ -2279,7 +2276,6 @@ class KafkaApisTest extends Logging {
22792276
responseCallback.capture(),
22802277
any(),
22812278
any(),
2282-
any(),
22832279
any())
22842280
).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new PartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER))))
22852281

@@ -2347,7 +2343,6 @@ class KafkaApisTest extends Logging {
23472343
any(),
23482344
any(),
23492345
any(),
2350-
any(),
23512346
any())
23522347
} finally {
23532348
kafkaApis.close()
@@ -2492,7 +2487,6 @@ class KafkaApisTest extends Logging {
24922487
any(),
24932488
any(),
24942489
ArgumentMatchers.eq(requestLocal),
2495-
any(),
24962490
any()
24972491
)).thenAnswer(_ => responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE))))
24982492
kafkaApis = createKafkaApis()
@@ -2549,7 +2543,6 @@ class KafkaApisTest extends Logging {
25492543
any(),
25502544
any(),
25512545
ArgumentMatchers.eq(requestLocal),
2552-
any(),
25532546
any())
25542547
}
25552548

@@ -2626,7 +2619,6 @@ class KafkaApisTest extends Logging {
26262619
any(),
26272620
any(),
26282621
ArgumentMatchers.eq(RequestLocal.noCaching),
2629-
any(),
26302622
any()
26312623
)).thenAnswer { _ =>
26322624
responseCallback.getValue.apply(

0 commit comments

Comments
 (0)