Commit 3d96b20

KAFKA-19042 Move TransactionsExpirationTest to client-integration-tests module (#19288)
Rewrite `TransactionsExpirationTest` in Java on the new test infra and move it to the client-integration-tests module. Reviewers: Chia-Ping Tsai <[email protected]>
1 parent ebb6281 commit 3d96b20
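
The "new test infra" referenced above is the declarative cluster-test framework under org.apache.kafka.common.test, which the file below uses throughout. As a rough sketch of the pattern (the class name, topic name, and broker count here are illustrative, not part of the commit; the annotations and ClusterInstance API are the ones that appear in the diff):

// Illustrative sketch only: ExampleClusterTest and "example-topic" are hypothetical.
package org.apache.kafka.clients;

import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.Type;

public class ExampleClusterTest {

    // @ClusterTest starts an embedded cluster for the test and injects a
    // ClusterInstance handle, so the test body needs no manual broker
    // setup or teardown.
    @ClusterTest(types = {Type.CO_KRAFT}, brokers = 1)
    public void testClusterIsUsable(ClusterInstance cluster) throws InterruptedException {
        cluster.createTopic("example-topic", 1, (short) 1);
        // cluster.producer(), cluster.consumer(), and cluster.admin() return
        // clients preconfigured to talk to the embedded cluster.
    }
}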

File tree

3 files changed: +327 -252 lines changed
@@ -0,0 +1,310 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.clients;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ProducerState;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidPidMappingException;
import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.TestUtils;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterFeature;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig;
import org.apache.kafka.server.common.Feature;
import org.apache.kafka.server.config.ReplicationConfigs;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

@ClusterTestDefaults(
    types = {Type.CO_KRAFT},
    brokers = 3,
    serverProperties = {
        @ClusterConfigProperty(key = ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "false"),
        // Use a small number of partitions for the __consumer_offsets topic so that
        // creating the topic and assigning its partition leaders doesn't take long.
        @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
        @ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "3"),
        @ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "2"),
        @ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, value = "2"),
        @ClusterConfigProperty(key = ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, value = "true"),
        @ClusterConfigProperty(key = "log.unclean.leader.election.enable", value = "false"),
        @ClusterConfigProperty(key = ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG, value = "false"),
        @ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "0"),
        @ClusterConfigProperty(key = TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, value = "200"),
        @ClusterConfigProperty(key = TransactionStateManagerConfig.TRANSACTIONAL_ID_EXPIRATION_MS_CONFIG, value = "10000"),
        @ClusterConfigProperty(key = TransactionStateManagerConfig.TRANSACTIONS_REMOVE_EXPIRED_TRANSACTIONAL_ID_CLEANUP_INTERVAL_MS_CONFIG, value = "500"),
        @ClusterConfigProperty(key = TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG, value = "5000"),
        @ClusterConfigProperty(key = TransactionLogConfig.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG, value = "500"),
    }
)
public class TransactionsExpirationTest {
    private static final String TOPIC1 = "topic1";
    private static final String TOPIC2 = "topic2";
    private static final String TRANSACTION_ID = "transactionalProducer";
    private static final String HEADER_KEY = "transactionStatus";
    private static final byte[] ABORTED_VALUE = "aborted".getBytes();
    private static final byte[] COMMITTED_VALUE = "committed".getBytes();
    private static final TopicPartition TOPIC1_PARTITION0 = new TopicPartition(TOPIC1, 0);

    @ClusterTest(features = {@ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 1)})
    public void testFatalErrorAfterInvalidProducerIdMappingWithTV1(ClusterInstance clusterInstance) throws InterruptedException {
        testFatalErrorAfterInvalidProducerIdMapping(clusterInstance);
    }

    @ClusterTest(features = {@ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 2)})
    public void testFatalErrorAfterInvalidProducerIdMappingWithTV2(ClusterInstance clusterInstance) throws InterruptedException {
        testFatalErrorAfterInvalidProducerIdMapping(clusterInstance);
    }

    @ClusterTest(features = {@ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 1)})
    public void testTransactionAfterProducerIdExpiresWithTV1(ClusterInstance clusterInstance) throws InterruptedException {
        testTransactionAfterProducerIdExpires(clusterInstance, false);
    }

    @ClusterTest(features = {@ClusterFeature(feature = Feature.TRANSACTION_VERSION, version = 2)})
    public void testTransactionAfterProducerIdExpiresWithTV2(ClusterInstance clusterInstance) throws InterruptedException {
        testTransactionAfterProducerIdExpires(clusterInstance, true);
    }

    private void testFatalErrorAfterInvalidProducerIdMapping(ClusterInstance clusterInstance) throws InterruptedException {
        clusterInstance.createTopic(TOPIC1, 4, (short) 3);
        clusterInstance.createTopic(TOPIC2, 4, (short) 3);
        try (Producer<byte[], byte[]> producer = clusterInstance.producer(Map.of(
                ProducerConfig.TRANSACTIONAL_ID_CONFIG, TRANSACTION_ID))
        ) {
            producer.initTransactions();
            // Start and then abort a transaction to allow the transactional ID to expire.
            producer.beginTransaction();
            producer.send(new ProducerRecord<>(TOPIC1, 0, "2".getBytes(), "2".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, ABORTED_VALUE))));
            producer.send(new ProducerRecord<>(TOPIC2, 0, "4".getBytes(), "4".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, ABORTED_VALUE))));
            producer.abortTransaction();

            // Check that the transactional state exists, then wait for it to expire.
            waitUntilTransactionalStateExists(clusterInstance);
            waitUntilTransactionalStateExpires(clusterInstance);

            // Start a new transaction and attempt to send, triggering an AddPartitionsToTxnRequest that will fail
            // due to the expired transactional ID, resulting in a fatal error.
            producer.beginTransaction();
            Future<RecordMetadata> failedFuture = producer.send(
                new ProducerRecord<>(TOPIC1, 3, "1".getBytes(), "1".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, ABORTED_VALUE))));
            TestUtils.waitForCondition(failedFuture::isDone, "Producer future never completed.");
            org.apache.kafka.test.TestUtils.assertFutureThrows(InvalidPidMappingException.class, failedFuture);

            // Assert that aborting the transaction throws a KafkaException due to the fatal error.
            assertThrows(KafkaException.class, producer::abortTransaction);
        }

        // Reinitialize to recover from the fatal error.
        try (Producer<byte[], byte[]> producer = clusterInstance.producer(Map.of(
                ProducerConfig.TRANSACTIONAL_ID_CONFIG, TRANSACTION_ID))
        ) {
            producer.initTransactions();
            // Proceed with a new transaction after reinitializing.
            producer.beginTransaction();
            producer.send(new ProducerRecord<>(TOPIC2, null, "2".getBytes(), "2".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, COMMITTED_VALUE))));
            producer.send(new ProducerRecord<>(TOPIC1, 2, "4".getBytes(), "4".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, COMMITTED_VALUE))));
            producer.send(new ProducerRecord<>(TOPIC2, null, "1".getBytes(), "1".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, COMMITTED_VALUE))));
            producer.send(new ProducerRecord<>(TOPIC1, 3, "3".getBytes(), "3".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, COMMITTED_VALUE))));
            producer.commitTransaction();

            waitUntilTransactionalStateExists(clusterInstance);
        }

        assertConsumeRecords(clusterInstance, List.of(TOPIC1, TOPIC2), 4);
    }

    private void testTransactionAfterProducerIdExpires(ClusterInstance clusterInstance, boolean isTV2Enabled) throws InterruptedException {
        clusterInstance.createTopic(TOPIC1, 4, (short) 3);
        long oldProducerId = 0;
        long oldProducerEpoch = 0;

        try (Producer<byte[], byte[]> producer = clusterInstance.producer(Map.of(
                ProducerConfig.TRANSACTIONAL_ID_CONFIG, TRANSACTION_ID))
        ) {
            producer.initTransactions();

            // Start and then abort a transaction to allow the producer ID to expire.
            producer.beginTransaction();
            producer.send(new ProducerRecord<>(TOPIC1, 0, "2".getBytes(), "2".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, ABORTED_VALUE))));
            producer.flush();

            // Ensure producer IDs are added.
            List<ProducerState> producerStates = new ArrayList<>();
            TestUtils.waitForCondition(() -> {
                try {
                    producerStates.addAll(producerState(clusterInstance));
                    return !producerStates.isEmpty();
                } catch (ExecutionException | InterruptedException e) {
                    return false;
                }
            }, "Producer IDs for " + TOPIC1_PARTITION0 + " did not propagate quickly");
            assertEquals(1, producerStates.size(), "Unexpected producer to " + TOPIC1_PARTITION0);
            oldProducerId = producerStates.get(0).producerId();
            oldProducerEpoch = producerStates.get(0).producerEpoch();

            producer.abortTransaction();

            // Wait for the producer ID to expire.
            TestUtils.waitForCondition(() -> {
                try {
                    return producerState(clusterInstance).isEmpty();
                } catch (ExecutionException | InterruptedException e) {
                    return false;
                }
            }, "Producer IDs for " + TOPIC1_PARTITION0 + " did not expire.");
        }

        // Create a new producer to check that we retain the producer ID in transactional state.
        try (Producer<byte[], byte[]> producer = clusterInstance.producer(Map.of(
                ProducerConfig.TRANSACTIONAL_ID_CONFIG, TRANSACTION_ID))
        ) {
            producer.initTransactions();

            // Start a new transaction and attempt to send. This should work since only the producer ID
            // was removed from its mapping in ProducerStateManager.
            producer.beginTransaction();
            producer.send(new ProducerRecord<>(TOPIC1, 0, "4".getBytes(), "4".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, COMMITTED_VALUE))));
            producer.send(new ProducerRecord<>(TOPIC1, 3, "3".getBytes(), "3".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, COMMITTED_VALUE))));
            producer.commitTransaction();

            // Producer IDs should repopulate.
            List<ProducerState> producerStates = new ArrayList<>();
            TestUtils.waitForCondition(() -> {
                try {
                    producerStates.addAll(producerState(clusterInstance));
                    return !producerStates.isEmpty();
                } catch (ExecutionException | InterruptedException e) {
                    return false;
                }
            }, "Producer IDs for " + TOPIC1_PARTITION0 + " did not propagate quickly");
            assertEquals(1, producerStates.size(), "Unexpected producer to " + TOPIC1_PARTITION0);
            long newProducerId = producerStates.get(0).producerId();
            long newProducerEpoch = producerStates.get(0).producerEpoch();

            // Because transactional IDs outlive producer IDs, creating a producer with the same transactional ID
            // soon after the first re-uses the same producer ID, while bumping the epoch to indicate that they are distinct.
            assertEquals(oldProducerId, newProducerId);
            if (isTV2Enabled) {
                // TV2 bumps the epoch on EndTxn, and the final commit may or may not have bumped the epoch in the producer state.
                // The epoch should be at least oldProducerEpoch + 2 after the first commit and the restarted producer.
                assertTrue(oldProducerEpoch + 2 <= newProducerEpoch);
            } else {
                assertEquals(oldProducerEpoch + 1, newProducerEpoch);
            }

            assertConsumeRecords(clusterInstance, List.of(TOPIC1), 2);
        }
    }

    private void waitUntilTransactionalStateExists(ClusterInstance clusterInstance) throws InterruptedException {
        try (Admin admin = clusterInstance.admin()) {
            TestUtils.waitForCondition(() -> {
                try {
                    admin.describeTransactions(List.of(TRANSACTION_ID)).description(TRANSACTION_ID).get();
                    return true;
                } catch (Exception e) {
                    return false;
                }
            }, "Transactional state was never added.");
        }
    }

    private void waitUntilTransactionalStateExpires(ClusterInstance clusterInstance) throws InterruptedException {
        try (Admin admin = clusterInstance.admin()) {
            TestUtils.waitForCondition(() -> {
                try {
                    admin.describeTransactions(List.of(TRANSACTION_ID)).description(TRANSACTION_ID).get();
                    return false;
                } catch (Exception e) {
                    return e.getCause() instanceof TransactionalIdNotFoundException;
                }
            }, "Transaction state never expired.");
        }
    }

    private List<ProducerState> producerState(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
        try (Admin admin = clusterInstance.admin()) {
            return admin.describeProducers(List.of(TOPIC1_PARTITION0)).partitionResult(TOPIC1_PARTITION0).get().activeProducers();
        }
    }

    private void assertConsumeRecords(
        ClusterInstance clusterInstance,
        List<String> topics,
        int expectedCount
    ) throws InterruptedException {
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            ArrayList<ConsumerRecord<byte[], byte[]>> consumerRecords = new ArrayList<>();
            try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(Map.of(
                    ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name(),
                    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",
                    ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"))
            ) {
                consumer.subscribe(topics);
                TestUtils.waitForCondition(() -> {
                    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
                    records.forEach(consumerRecords::add);
                    return consumerRecords.size() == expectedCount;
                }, 15_000, () -> "Consumer with protocol " + groupProtocol.name + " should consume " + expectedCount + " records, but got " + consumerRecords.size());
            }
            consumerRecords.forEach(record -> {
                Iterator<Header> headers = record.headers().headers(HEADER_KEY).iterator();
                assertTrue(headers.hasNext());
                Header header = headers.next();
                assertArrayEquals(COMMITTED_VALUE, header.value(), "Record does not have the expected header value.");
            });
        }
    }
}
