Skip to content

Commit 34a87d3

Browse files
KAFKA-19042 Move TransactionsWithMaxInFlightOneTest to client-integration-tests module (#19289)
Use Java to rewrite `TransactionsWithMaxInFlightOneTest` by new test infra and move it to client-integration-tests module. Reviewers: Chia-Ping Tsai <[email protected]>
1 parent 588d107 commit 34a87d3

File tree

2 files changed

+126
-136
lines changed

2 files changed

+126
-136
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.clients;
18+
19+
import org.apache.kafka.clients.consumer.Consumer;
20+
import org.apache.kafka.clients.consumer.ConsumerConfig;
21+
import org.apache.kafka.clients.consumer.ConsumerRecord;
22+
import org.apache.kafka.clients.consumer.ConsumerRecords;
23+
import org.apache.kafka.clients.consumer.GroupProtocol;
24+
import org.apache.kafka.clients.producer.Producer;
25+
import org.apache.kafka.clients.producer.ProducerConfig;
26+
import org.apache.kafka.clients.producer.ProducerRecord;
27+
import org.apache.kafka.common.header.Header;
28+
import org.apache.kafka.common.header.internals.RecordHeader;
29+
import org.apache.kafka.common.test.ClusterInstance;
30+
import org.apache.kafka.common.test.TestUtils;
31+
import org.apache.kafka.common.test.api.ClusterConfigProperty;
32+
import org.apache.kafka.common.test.api.ClusterTest;
33+
import org.apache.kafka.common.test.api.ClusterTestDefaults;
34+
import org.apache.kafka.common.test.api.Type;
35+
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
36+
import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
37+
import org.apache.kafka.coordinator.transaction.TransactionStateManagerConfig;
38+
import org.apache.kafka.server.config.ReplicationConfigs;
39+
import org.apache.kafka.server.config.ServerConfigs;
40+
import org.apache.kafka.server.config.ServerLogConfigs;
41+
42+
import java.time.Duration;
43+
import java.util.ArrayList;
44+
import java.util.Collections;
45+
import java.util.Iterator;
46+
import java.util.List;
47+
import java.util.Map;
48+
49+
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
50+
import static org.junit.jupiter.api.Assertions.assertEquals;
51+
import static org.junit.jupiter.api.Assertions.assertTrue;
52+
53+
@ClusterTestDefaults(
54+
types = {Type.CO_KRAFT},
55+
serverProperties = {
56+
@ClusterConfigProperty(key = ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, value = "false"),
57+
@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
58+
@ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
59+
@ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_PARTITIONS_CONFIG, value = "1"),
60+
@ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
61+
@ClusterConfigProperty(key = TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG, value = "1"),
62+
@ClusterConfigProperty(key = ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, value = "true"),
63+
@ClusterConfigProperty(key = "log.unclean.leader.election.enable", value = "false"),
64+
@ClusterConfigProperty(key = ReplicationConfigs.AUTO_LEADER_REBALANCE_ENABLE_CONFIG, value = "false"),
65+
@ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "0"),
66+
@ClusterConfigProperty(key = TransactionStateManagerConfig.TRANSACTIONS_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS_CONFIG, value = "200")
67+
}
68+
)
69+
public class TransactionsWithMaxInFlightOneTest {
70+
private static final String TOPIC1 = "topic1";
71+
private static final String TOPIC2 = "topic2";
72+
private static final String HEADER_KEY = "transactionStatus";
73+
private static final byte[] ABORTED_VALUE = "aborted".getBytes();
74+
private static final byte[] COMMITTED_VALUE = "committed".getBytes();
75+
76+
@ClusterTest
77+
public void testTransactionalProducerSingleBrokerMaxInFlightOne(ClusterInstance clusterInstance) throws InterruptedException {
78+
// We want to test with one broker to verify multiple requests queued on a connection
79+
assertEquals(1, clusterInstance.brokers().size());
80+
81+
clusterInstance.createTopic(TOPIC1, 4, (short) 1);
82+
clusterInstance.createTopic(TOPIC2, 4, (short) 1);
83+
84+
try (Producer<byte[], byte[]> producer = clusterInstance.producer(Map.of(
85+
ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional-producer",
86+
ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1
87+
))
88+
) {
89+
producer.initTransactions();
90+
91+
producer.beginTransaction();
92+
producer.send(new ProducerRecord<>(TOPIC2, null, "2".getBytes(), "2".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, ABORTED_VALUE))));
93+
producer.send(new ProducerRecord<>(TOPIC1, null, "4".getBytes(), "4".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, ABORTED_VALUE))));
94+
producer.flush();
95+
producer.abortTransaction();
96+
97+
producer.beginTransaction();
98+
producer.send(new ProducerRecord<>(TOPIC1, null, "1".getBytes(), "1".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, COMMITTED_VALUE))));
99+
producer.send(new ProducerRecord<>(TOPIC2, null, "3".getBytes(), "3".getBytes(), Collections.singleton(new RecordHeader(HEADER_KEY, COMMITTED_VALUE))));
100+
producer.commitTransaction();
101+
102+
for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
103+
ArrayList<ConsumerRecord<byte[], byte[]>> consumerRecords = new ArrayList<>();
104+
try (Consumer<byte[], byte[]> consumer = clusterInstance.consumer(Map.of(
105+
ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name(),
106+
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",
107+
ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"
108+
)
109+
)) {
110+
consumer.subscribe(List.of(TOPIC1, TOPIC2));
111+
TestUtils.waitForCondition(() -> {
112+
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
113+
records.forEach(consumerRecords::add);
114+
return consumerRecords.size() == 2;
115+
}, 15_000, () -> "Consumer with protocol " + groupProtocol.name + " should consume 2 records, but get " + consumerRecords.size());
116+
}
117+
consumerRecords.forEach(record -> {
118+
Iterator<Header> headers = record.headers().headers(HEADER_KEY).iterator();
119+
assertTrue(headers.hasNext());
120+
Header header = headers.next();
121+
assertArrayEquals(COMMITTED_VALUE, header.value(), "Record does not have the expected header value");
122+
});
123+
}
124+
}
125+
}
126+
}

core/src/test/scala/integration/kafka/api/TransactionsWithMaxInFlightOneTest.scala

-136
This file was deleted.

0 commit comments

Comments
 (0)