Commit c527530
KAFKA-19042 Move ProducerCompressionTest, ProducerFailureHandlingTest, and ProducerIdExpirationTest to client-integration-tests module (#19319)
Includes three test cases:
- ProducerCompressionTest
- ProducerFailureHandlingTest
- ProducerIdExpirationTest

Reviewers: Ken Huang <[email protected]>, PoAn Yang <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 321a380 commit c527530
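
The relocated tests are written against the declarative cluster test framework (org.apache.kafka.common.test.api), which starts a cluster per annotated method and injects a handle to it. A minimal sketch of that pattern, using only annotations and method signatures visible in the diffs below (the class name, topic name, and test body are illustrative, not part of this commit):

package org.apache.kafka.clients.producer;

import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.Type;

import java.util.Map;

// Hypothetical example: the framework provisions a KRaft cluster for each
// @ClusterTest method and passes it in as a ClusterInstance parameter.
@ClusterTestDefaults(types = {Type.KRAFT})
class ExampleClusterTest {

    @ClusterTest
    void testProduceToCluster(ClusterInstance cluster) throws Exception {
        cluster.createTopic("example-topic", 1, (short) 1);
        try (Producer<byte[], byte[]> producer = cluster.producer(Map.of())) {
            // producer/consumer handles are byte[]-keyed by default, as in the test below
            producer.send(new ProducerRecord<>("example-topic", "hello".getBytes())).get();
        }
    }
}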

File tree: 7 files changed, +780 -688 lines


Diff for: checkstyle/import-control-clients-integration-tests.xml (+10, -6)

@@ -19,12 +19,16 @@
 -->

 <import-control pkg="org.apache.kafka">
-    <allow pkg="java" />
-    <allow pkg="org.junit" />
+    <allow pkg="java"/>
+    <allow pkg="org.junit"/>

-    <!-- These are tests, allow whatever -->
-    <allow pkg="org.apache.kafka"/>
-    <allow pkg="org.junit" />
-    <allow pkg="kafka"/>
+    <!-- These are tests, allow whatever -->
+    <allow pkg="org.apache.kafka"/>
+    <allow pkg="org.junit"/>
+    <allow pkg="kafka"/>
+
+    <subpackage name="clients.producer">
+        <allow pkg="org.opentest4j"/>
+    </subpackage>

 </import-control>
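
The new subpackage rule is needed because JUnit 5's assertion machinery is built on opentest4j: a test under clients.producer that names those exception types directly would otherwise fail the import-control check. A hypothetical illustration (not part of this commit):

package org.apache.kafka.clients.producer;

import org.opentest4j.AssertionFailedError;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

class ImportControlExample {

    void demo() {
        // JUnit 5 assertion failures are instances of org.opentest4j.AssertionFailedError,
        // so referring to that type requires checkstyle to allow the org.opentest4j package.
        AssertionFailedError error = assertThrows(AssertionFailedError.class,
                () -> assertEquals(1, 2));
    }
}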
Diff for: ProducerCompressionTest.java (new file, +178)

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.clients.producer;

import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.test.TestUtils;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static kafka.utils.TestUtils.consumeRecords;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.fail;

@ClusterTestDefaults(types = {Type.KRAFT})
class ProducerCompressionTest {

    private final String topicName = "topic";
    private final int numRecords = 2000;

    /**
     * testCompression
     * <p>
     * Compressed messages should be able to be sent and consumed correctly.
     */
    @ClusterTest
    void testCompression(ClusterInstance cluster) throws ExecutionException, InterruptedException {
        for (CompressionType compression : CompressionType.values()) {
            processCompressionTest(cluster, compression);
        }
    }

    void processCompressionTest(ClusterInstance cluster, CompressionType compression) throws InterruptedException,
            ExecutionException {
        String compressionTopic = topicName + "_" + compression.name;
        cluster.createTopic(compressionTopic, 1, (short) 1);
        Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compression.name);
        producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "66000");
        producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "200");
        Consumer<byte[], byte[]> classicConsumer = cluster.consumer(Map.of(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic"));
        Consumer<byte[], byte[]> consumer = cluster.consumer(Map.of(ConsumerConfig.GROUP_PROTOCOL_CONFIG, "consumer"));
        try (Producer<byte[], byte[]> producer = cluster.producer(producerProps)) {
            int partition = 0;
            // prepare the messages
            List<String> messages = IntStream.range(0, numRecords).mapToObj(this::messageValue).toList();
            Header[] headerArr = new Header[]{new RecordHeader("key", "value".getBytes())};
            RecordHeaders headers = new RecordHeaders(headerArr);

            // make sure the returned messages are correct
            long now = System.currentTimeMillis();
            List<Future<RecordMetadata>> responses = new ArrayList<>();
            messages.forEach(message -> {
                // 1. send message without key and header
                responses.add(producer.send(new ProducerRecord<>(compressionTopic, null, now, null,
                        message.getBytes())));
                // 2. send message with key, without header
                responses.add(producer.send(new ProducerRecord<>(compressionTopic, null, now,
                        String.valueOf(message.length()).getBytes(), message.getBytes())));
                // 3. send message with key and header
                responses.add(producer.send(new ProducerRecord<>(compressionTopic, null, now,
                        String.valueOf(message.length()).getBytes(), message.getBytes(), headers)));
            });
            for (int offset = 0; offset < responses.size(); offset++) {
                assertEquals(offset, responses.get(offset).get().offset(), compression.name);
            }
            verifyConsumerRecords(consumer, messages, now, headerArr, partition, compressionTopic, compression.name);
            verifyConsumerRecords(classicConsumer, messages, now, headerArr, partition, compressionTopic,
                    compression.name);
        } finally {
            // These consumers close very slowly, which may cause the entire test to time out, so we can't
            // wait for them to auto-close.
            consumer.close(CloseOptions.timeout(Duration.ofSeconds(1)));
            classicConsumer.close(CloseOptions.timeout(Duration.ofSeconds(1)));
        }
    }

    private void verifyConsumerRecords(Consumer<byte[], byte[]> consumer, List<String> messages, long now,
                                       Header[] headerArr, int partition, String topic, String compression) {
        TopicPartition tp = new TopicPartition(topic, partition);
        consumer.assign(List.of(tp));
        consumer.seek(tp, 0);
        AtomicInteger num = new AtomicInteger(0);
        AtomicInteger flag = new AtomicInteger(0);
        consumeRecords(consumer, numRecords * 3, TestUtils.DEFAULT_MAX_WAIT_MS).foreach(record -> {
            String messageValue = messages.get(num.get());
            long offset = num.get() * 3L + flag.get();
            if (flag.get() == 0) {
                // verify message without key and header
                assertNull(record.key(), errorMessage(compression));
                assertEquals(messageValue, new String(record.value()), errorMessage(compression));
                assertEquals(0, record.headers().toArray().length, errorMessage(compression));
                assertEquals(now, record.timestamp(), errorMessage(compression));
                assertEquals(offset, record.offset(), errorMessage(compression));
            } else if (flag.get() == 1) {
                // verify message with key, without header
                assertEquals(String.valueOf(messageValue.length()), new String(record.key()), errorMessage(compression));
                assertEquals(messageValue, new String(record.value()), errorMessage(compression));
                assertEquals(0, record.headers().toArray().length, errorMessage(compression));
                assertEquals(now, record.timestamp(), errorMessage(compression));
                assertEquals(offset, record.offset(), errorMessage(compression));
            } else if (flag.get() == 2) {
                // verify message with key and header
                assertEquals(String.valueOf(messageValue.length()), new String(record.key()), errorMessage(compression));
                assertEquals(messageValue, new String(record.value()), errorMessage(compression));
                assertEquals(1, record.headers().toArray().length, errorMessage(compression));
                assertEquals(headerArr[0], record.headers().toArray()[0], errorMessage(compression));
                assertEquals(now, record.timestamp(), errorMessage(compression));
                assertEquals(offset, record.offset(), errorMessage(compression));
            } else {
                fail();
            }
            flagLoop(num, flag);
            return null;
        });
    }

    private void flagLoop(AtomicInteger num, AtomicInteger flag) {
        if (flag.get() == 2) {
            num.incrementAndGet();
            flag.set(0);
        } else {
            flag.incrementAndGet();
        }
    }

    private String messageValue(int length) {
        Random random = new Random();
        return IntStream.range(0, length)
                .map(i -> random.nextInt(TestUtils.LETTERS_AND_DIGITS.length()))
                .mapToObj(TestUtils.LETTERS_AND_DIGITS::charAt)
                .map(String::valueOf)
                .collect(Collectors.joining());
    }

    private String errorMessage(String compression) {
        return String.format("Compression type: %s - Assertion failed", compression);
    }
}
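
A note on the verification arithmetic above: each source message is produced three times (no key/no header, key only, key plus header), so record i in the consumed stream corresponds to message i / 3 in variant i % 3, which is why the expected offset is num * 3L + flag. A self-contained sketch of that mapping, using only the JDK:

// Standalone sketch of the index bookkeeping in verifyConsumerRecords:
// num tracks the message index, flag cycles through the three send variants.
public class OffsetMappingDemo {
    public static void main(String[] args) {
        int numMessages = 4; // small stand-in for the test's 2000 messages
        int num = 0;
        int flag = 0;
        for (int i = 0; i < numMessages * 3; i++) {
            // identical to the test: offset = num * 3L + flag
            long offset = num * 3L + flag;
            System.out.printf("record %d -> message %d, variant %d (offset %d)%n",
                    i, num, flag, offset);
            // flagLoop: advance the variant, roll over to the next message after variant 2
            if (flag == 2) {
                num++;
                flag = 0;
            } else {
                flag++;
            }
        }
    }
}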
