Skip to content

Commit 90e7b53

Browse files
authored
MINOR: Remove unused ApiVersions variable from Sender and RecordAccumulator (#19399)
Remove unused `ApiVersions` variable from Sender and RecordAccumulator. Reviewers: PoAn Yang <[email protected]>, Ken Huang <[email protected]>, Parker Chang <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent b3ba7bc commit 90e7b53

File tree

6 files changed

+43
-63
lines changed

6 files changed

+43
-63
lines changed

Diff for: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,6 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali
435435
metrics,
436436
PRODUCER_METRIC_GROUP_NAME,
437437
time,
438-
apiVersions,
439438
transactionManager,
440439
new BufferPool(this.totalMemorySize, batchSize, metrics, time, PRODUCER_METRIC_GROUP_NAME));
441440

@@ -538,8 +537,7 @@ Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadat
538537
time,
539538
requestTimeoutMs,
540539
producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
541-
this.transactionManager,
542-
apiVersions);
540+
this.transactionManager);
543541
}
544542

545543
private static Compression configureCompression(ProducerConfig config) {

Diff for: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java

-8
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
*/
1717
package org.apache.kafka.clients.producer.internals;
1818

19-
import org.apache.kafka.clients.ApiVersions;
2019
import org.apache.kafka.clients.CommonClientConfigs;
2120
import org.apache.kafka.clients.MetadataSnapshot;
2221
import org.apache.kafka.clients.producer.Callback;
@@ -82,7 +81,6 @@ public class RecordAccumulator {
8281
private final boolean enableAdaptivePartitioning;
8382
private final BufferPool free;
8483
private final Time time;
85-
private final ApiVersions apiVersions;
8684
private final ConcurrentMap<String /*topic*/, TopicInfo> topicInfoMap = new CopyOnWriteMap<>();
8785
private final ConcurrentMap<Integer /*nodeId*/, NodeLatencyStats> nodeStats = new CopyOnWriteMap<>();
8886
private final IncompleteBatches incomplete;
@@ -109,7 +107,6 @@ public class RecordAccumulator {
109107
* @param metrics The metrics
110108
* @param metricGrpName The metric group name
111109
* @param time The time instance to use
112-
* @param apiVersions Request API versions for current connected brokers
113110
* @param transactionManager The shared transaction state object which tracks producer IDs, epochs, and sequence
114111
* numbers per partition.
115112
* @param bufferPool The buffer pool
@@ -125,7 +122,6 @@ public RecordAccumulator(LogContext logContext,
125122
Metrics metrics,
126123
String metricGrpName,
127124
Time time,
128-
ApiVersions apiVersions,
129125
TransactionManager transactionManager,
130126
BufferPool bufferPool) {
131127
this.logContext = logContext;
@@ -147,7 +143,6 @@ public RecordAccumulator(LogContext logContext,
147143
this.incomplete = new IncompleteBatches();
148144
this.muted = new HashSet<>();
149145
this.time = time;
150-
this.apiVersions = apiVersions;
151146
nodesDrainIndex = new HashMap<>();
152147
this.transactionManager = transactionManager;
153148
registerMetrics(metrics, metricGrpName);
@@ -169,7 +164,6 @@ public RecordAccumulator(LogContext logContext,
169164
* @param metrics The metrics
170165
* @param metricGrpName The metric group name
171166
* @param time The time instance to use
172-
* @param apiVersions Request API versions for current connected brokers
173167
* @param transactionManager The shared transaction state object which tracks producer IDs, epochs, and sequence
174168
* numbers per partition.
175169
* @param bufferPool The buffer pool
@@ -184,7 +178,6 @@ public RecordAccumulator(LogContext logContext,
184178
Metrics metrics,
185179
String metricGrpName,
186180
Time time,
187-
ApiVersions apiVersions,
188181
TransactionManager transactionManager,
189182
BufferPool bufferPool) {
190183
this(logContext,
@@ -198,7 +191,6 @@ public RecordAccumulator(LogContext logContext,
198191
metrics,
199192
metricGrpName,
200193
time,
201-
apiVersions,
202194
transactionManager,
203195
bufferPool);
204196
}

Diff for: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java

+1-7
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
*/
1717
package org.apache.kafka.clients.producer.internals;
1818

19-
import org.apache.kafka.clients.ApiVersions;
2019
import org.apache.kafka.clients.ClientRequest;
2120
import org.apache.kafka.clients.ClientResponse;
2221
import org.apache.kafka.clients.KafkaClient;
@@ -118,9 +117,6 @@ public class Sender implements Runnable {
118117
/* The max time to wait before retrying a request which has failed */
119118
private final long retryBackoffMs;
120119

121-
/* current request API versions supported by the known brokers */
122-
private final ApiVersions apiVersions;
123-
124120
/* all the state related to transactions, in particular the producer id, producer epoch, and sequence numbers */
125121
private final TransactionManager transactionManager;
126122

@@ -139,8 +135,7 @@ public Sender(LogContext logContext,
139135
Time time,
140136
int requestTimeoutMs,
141137
long retryBackoffMs,
142-
TransactionManager transactionManager,
143-
ApiVersions apiVersions) {
138+
TransactionManager transactionManager) {
144139
this.log = logContext.logger(Sender.class);
145140
this.client = client;
146141
this.accumulator = accumulator;
@@ -154,7 +149,6 @@ public Sender(LogContext logContext,
154149
this.sensors = new SenderMetrics(metricsRegistry, metadata, client, time);
155150
this.requestTimeoutMs = requestTimeoutMs;
156151
this.retryBackoffMs = retryBackoffMs;
157-
this.apiVersions = apiVersions;
158152
this.transactionManager = transactionManager;
159153
this.inFlightBatches = new HashMap<>();
160154
}

Diff for: clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java

+5-6
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,7 @@ public void testRetryBackoff() throws Exception {
460460

461461
final RecordAccumulator accum = new RecordAccumulator(logContext, batchSize,
462462
Compression.NONE, lingerMs, retryBackoffMs, retryBackoffMaxMs,
463-
deliveryTimeoutMs, metrics, metricGrpName, time, new ApiVersions(), null,
463+
deliveryTimeoutMs, metrics, metricGrpName, time, null,
464464
new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
465465

466466
long now = time.milliseconds();
@@ -525,7 +525,7 @@ public void testExponentialRetryBackoff() throws Exception {
525525

526526
final RecordAccumulator accum = new RecordAccumulator(logContext, batchSize,
527527
Compression.NONE, lingerMs, retryBackoffMs, retryBackoffMaxMs,
528-
deliveryTimeoutMs, metrics, metricGrpName, time, new ApiVersions(), null,
528+
deliveryTimeoutMs, metrics, metricGrpName, time, null,
529529
new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
530530

531531
long now = time.milliseconds();
@@ -586,7 +586,7 @@ public void testExponentialRetryBackoffLeaderChange() throws Exception {
586586

587587
final RecordAccumulator accum = new RecordAccumulator(logContext, batchSize,
588588
Compression.NONE, lingerMs, retryBackoffMs, retryBackoffMaxMs,
589-
deliveryTimeoutMs, metrics, metricGrpName, time, new ApiVersions(), null,
589+
deliveryTimeoutMs, metrics, metricGrpName, time, null,
590590
new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
591591

592592
long now = time.milliseconds();
@@ -1266,7 +1266,7 @@ public void testAdaptiveBuiltInPartitioner() throws Exception {
12661266
long totalSize = 1024 * 1024;
12671267
int batchSize = 128;
12681268
RecordAccumulator accum = new RecordAccumulator(logContext, batchSize, Compression.NONE, 0, 0L, 0L,
1269-
3200, config, metrics, "producer-metrics", time, new ApiVersions(), null,
1269+
3200, config, metrics, "producer-metrics", time, null,
12701270
new BufferPool(totalSize, batchSize, metrics, time, "producer-internal-metrics")) {
12711271
@Override
12721272
BuiltInPartitioner createBuiltInPartitioner(LogContext logContext, String topic,
@@ -1399,7 +1399,7 @@ public void testReadyAndDrainWhenABatchIsBeingRetried() throws InterruptedExcept
13991399
String metricGrpName = "producer-metrics";
14001400
final RecordAccumulator accum = new RecordAccumulator(logContext, batchSize,
14011401
Compression.NONE, lingerMs, retryBackoffMs, retryBackoffMaxMs,
1402-
deliveryTimeoutMs, metrics, metricGrpName, time, new ApiVersions(), null,
1402+
deliveryTimeoutMs, metrics, metricGrpName, time, null,
14031403
new BufferPool(totalSize, batchSize, metrics, time, metricGrpName));
14041404

14051405
// Create 1 batch(batchA) to be produced to partition1.
@@ -1661,7 +1661,6 @@ private RecordAccumulator createTestRecordAccumulator(
16611661
metrics,
16621662
metricGrpName,
16631663
time,
1664-
new ApiVersions(),
16651664
txnManager,
16661665
new BufferPool(totalSize, batchSize, metrics, time, metricGrpName)) {
16671666
@Override

0 commit comments

Comments
 (0)