Skip to content

Commit b18f00b

Browse files
KAFKA-19121 Move AddPartitionsToTxnConfig and TransactionStateManagerConfig out of KafkaConfig (#19439)
Both AddPartitionsToTxnConfig and TransactionStateManagerConfig are static configs and they don't have specific config check. We can move them out of KafkaConfig to simplify KafkaConfig. Reviewers: Chia-Ping Tsai <[email protected]>
1 parent 8827ce4 commit b18f00b

File tree

5 files changed

+15
-19
lines changed

5 files changed

+15
-19
lines changed

core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala

+7-6
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import org.apache.kafka.common.protocol.Errors
2727
import org.apache.kafka.common.record.RecordBatch
2828
import org.apache.kafka.common.requests.{AddPartitionsToTxnResponse, TransactionResult}
2929
import org.apache.kafka.common.utils.{LogContext, ProducerIdAndEpoch, Time}
30-
import org.apache.kafka.coordinator.transaction.{ProducerIdManager, TransactionLogConfig}
30+
import org.apache.kafka.coordinator.transaction.{ProducerIdManager, TransactionLogConfig, TransactionStateManagerConfig}
3131
import org.apache.kafka.metadata.MetadataCache
3232
import org.apache.kafka.server.common.{RequestLocal, TransactionVersion}
3333
import org.apache.kafka.server.util.Scheduler
@@ -47,16 +47,17 @@ object TransactionCoordinator {
4747
time: Time): TransactionCoordinator = {
4848

4949
val transactionLogConfig = new TransactionLogConfig(config)
50-
val txnConfig = TransactionConfig(config.transactionStateManagerConfig.transactionalIdExpirationMs,
51-
config.transactionStateManagerConfig.transactionMaxTimeoutMs,
50+
val transactionStateManagerConfig = new TransactionStateManagerConfig(config)
51+
val txnConfig = TransactionConfig(transactionStateManagerConfig.transactionalIdExpirationMs,
52+
transactionStateManagerConfig.transactionMaxTimeoutMs,
5253
transactionLogConfig.transactionTopicPartitions,
5354
transactionLogConfig.transactionTopicReplicationFactor,
5455
transactionLogConfig.transactionTopicSegmentBytes,
5556
transactionLogConfig.transactionLoadBufferSize,
5657
transactionLogConfig.transactionTopicMinISR,
57-
config.transactionStateManagerConfig.transactionAbortTimedOutTransactionCleanupIntervalMs,
58-
config.transactionStateManagerConfig.transactionRemoveExpiredTransactionalIdCleanupIntervalMs,
59-
config.transactionStateManagerConfig.transaction2PCEnabled,
58+
transactionStateManagerConfig.transactionAbortTimedOutTransactionCleanupIntervalMs,
59+
transactionStateManagerConfig.transactionRemoveExpiredTransactionalIdCleanupIntervalMs,
60+
transactionStateManagerConfig.transaction2PCEnabled,
6061
config.requestTimeoutMs)
6162

6263
val txnStateManager = new TransactionStateManager(config.brokerId, scheduler, replicaManager, metadataCache, txnConfig,

core/src/main/scala/kafka/log/LogManager.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import kafka.utils.{CoreUtils, Logging, Pool}
2929
import org.apache.kafka.common.{DirectoryId, KafkaException, TopicPartition, Uuid}
3030
import org.apache.kafka.common.utils.{Exit, KafkaThread, Time, Utils}
3131
import org.apache.kafka.common.errors.{InconsistentTopicIdException, KafkaStorageException, LogDirNotFoundException}
32-
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
32+
import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, TransactionStateManagerConfig}
3333

3434
import scala.jdk.CollectionConverters._
3535
import scala.collection._
@@ -1565,7 +1565,7 @@ object LogManager {
15651565
flushRecoveryOffsetCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
15661566
flushStartOffsetCheckpointMs = config.logFlushStartOffsetCheckpointIntervalMs,
15671567
retentionCheckMs = config.logCleanupIntervalMs,
1568-
maxTransactionTimeoutMs = config.transactionStateManagerConfig.transactionMaxTimeoutMs,
1568+
maxTransactionTimeoutMs = new TransactionStateManagerConfig(config).transactionMaxTimeoutMs,
15691569
producerStateManagerConfig = new ProducerStateManagerConfig(transactionLogConfig.producerIdExpirationMs, transactionLogConfig.transactionPartitionVerificationEnable),
15701570
producerIdExpirationCheckIntervalMs = transactionLogConfig.producerIdExpirationCheckIntervalMs,
15711571
scheduler = kafkaScheduler,

core/src/main/scala/kafka/server/KafkaConfig.scala

-6
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ import org.apache.kafka.coordinator.group.Group.GroupType
3737
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
3838
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
3939
import org.apache.kafka.coordinator.share.ShareCoordinatorConfig
40-
import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, TransactionStateManagerConfig}
4140
import org.apache.kafka.network.SocketServerConfigs
4241
import org.apache.kafka.raft.QuorumConfig
4342
import org.apache.kafka.security.authorizer.AuthorizerUtils
@@ -204,11 +203,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
204203
private val _shareCoordinatorConfig = new ShareCoordinatorConfig(this)
205204
def shareCoordinatorConfig: ShareCoordinatorConfig = _shareCoordinatorConfig
206205

207-
private val _transactionStateManagerConfig = new TransactionStateManagerConfig(this)
208-
private val _addPartitionsToTxnConfig = new AddPartitionsToTxnConfig(this)
209-
def transactionStateManagerConfig: TransactionStateManagerConfig = _transactionStateManagerConfig
210-
def addPartitionsToTxnConfig: AddPartitionsToTxnConfig = _addPartitionsToTxnConfig
211-
212206
private val _quotaConfig = new QuotaConfig(this)
213207
def quotaConfig: QuotaConfig = _quotaConfig
214208

core/src/main/scala/kafka/server/ReplicaManager.scala

+4-3
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ import org.apache.kafka.common.requests.FetchRequest.PartitionData
4747
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
4848
import org.apache.kafka.common.requests._
4949
import org.apache.kafka.common.utils.{Exit, Time, Utils}
50-
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
50+
import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, TransactionLogConfig}
5151
import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
5252
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
5353
import org.apache.kafka.metadata.MetadataCache
@@ -287,6 +287,7 @@ class ReplicaManager(val config: KafkaConfig,
287287
val defaultActionQueue: ActionQueue = new DelayedActionQueue
288288
) extends Logging {
289289
private val metricsGroup = new KafkaMetricsGroup(this.getClass)
290+
private val addPartitionsToTxnConfig = new AddPartitionsToTxnConfig(config)
290291

291292
val delayedProducePurgatory = delayedProducePurgatoryParam.getOrElse(
292293
new DelayedOperationPurgatory[DelayedProduce](
@@ -842,8 +843,8 @@ class ReplicaManager(val config: KafkaConfig,
842843
requestLocal
843844
)
844845

845-
val retryTimeoutMs = Math.min(config.addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMaxMs(), config.requestTimeoutMs)
846-
val addPartitionsRetryBackoffMs = config.addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMs()
846+
val retryTimeoutMs = Math.min(addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMaxMs(), config.requestTimeoutMs)
847+
val addPartitionsRetryBackoffMs = addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMs()
847848
val startVerificationTimeMs = time.milliseconds
848849
def maybeRetryOnConcurrentTransactions(results: (Map[TopicPartition, Errors], Map[TopicPartition, VerificationGuard])): Unit = {
849850
if (time.milliseconds() - startVerificationTimeMs >= retryTimeoutMs) {

core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
5353
import org.apache.kafka.common.requests._
5454
import org.apache.kafka.common.security.auth.KafkaPrincipal
5555
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
56-
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
56+
import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, TransactionLogConfig}
5757
import org.apache.kafka.image._
5858
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
5959
import org.apache.kafka.metadata.{LeaderAndIsr, MetadataCache}
@@ -2309,7 +2309,7 @@ class ReplicaManagerTest {
23092309
assertFalse(result.hasFired)
23102310
assertEquals(verificationGuard, getVerificationGuard(replicaManager, tp0, producerId))
23112311

2312-
time.sleep(config.addPartitionsToTxnConfig.addPartitionsToTxnRetryBackoffMs + 1)
2312+
time.sleep(new AddPartitionsToTxnConfig(config).addPartitionsToTxnRetryBackoffMs + 1)
23132313
scheduler.tick()
23142314

23152315
verify(addPartitionsToTxnManager, times(2)).addOrVerifyTransaction(

0 commit comments

Comments
 (0)