Skip to content

Commit 23e7158

Browse files
authored
KAFKA-19002 Rewrite ListOffsetsIntegrationTest and move it to clients-integration-test (#19460)
the following tasks should be addressed in this ticket rewrite it by 1. new test infra 2. use java 3. move it to clients-integration-test Reviewers: TengYao Chi <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent 6a4207f commit 23e7158

File tree

4 files changed

+303
-290
lines changed

4 files changed

+303
-290
lines changed

checkstyle/import-control-clients-integration-tests.xml

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
<import-control pkg="org.apache.kafka">
2222
<allow pkg="java"/>
2323
<allow pkg="org.junit"/>
24+
<allow pkg="scala" />
2425

2526
<!-- These are tests, allow whatever -->
2627
<allow pkg="org.apache.kafka"/>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,296 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.clients.admin;
18+
19+
import kafka.server.KafkaBroker;
20+
21+
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
22+
import org.apache.kafka.clients.producer.Producer;
23+
import org.apache.kafka.clients.producer.ProducerConfig;
24+
import org.apache.kafka.clients.producer.ProducerRecord;
25+
import org.apache.kafka.clients.producer.RecordMetadata;
26+
import org.apache.kafka.common.TopicPartition;
27+
import org.apache.kafka.common.config.TopicConfig;
28+
import org.apache.kafka.common.record.CompressionType;
29+
import org.apache.kafka.common.requests.ListOffsetsResponse;
30+
import org.apache.kafka.common.test.ClusterInstance;
31+
import org.apache.kafka.common.test.TestUtils;
32+
import org.apache.kafka.common.test.api.ClusterConfigProperty;
33+
import org.apache.kafka.common.test.api.ClusterTest;
34+
import org.apache.kafka.common.test.api.ClusterTestDefaults;
35+
import org.apache.kafka.common.test.api.Type;
36+
import org.apache.kafka.common.utils.Utils;
37+
38+
import org.junit.jupiter.api.AfterEach;
39+
import org.junit.jupiter.api.BeforeEach;
40+
41+
import java.io.File;
42+
import java.util.Arrays;
43+
import java.util.LinkedList;
44+
import java.util.List;
45+
import java.util.Map;
46+
import java.util.Optional;
47+
import java.util.Set;
48+
import java.util.concurrent.ExecutionException;
49+
import java.util.concurrent.Future;
50+
import java.util.concurrent.TimeUnit;
51+
import java.util.stream.Collectors;
52+
53+
import scala.jdk.javaapi.CollectionConverters;
54+
55+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
56+
import static org.junit.jupiter.api.Assertions.assertEquals;
57+
58+
@ClusterTestDefaults(
59+
types = {Type.KRAFT},
60+
brokers = 3,
61+
serverProperties = {
62+
@ClusterConfigProperty(key = "log.retention.ms", value = "-1"),
63+
}
64+
)
65+
public class ListOffsetsIntegrationTest {
66+
private static final String TOPIC = "topic";
67+
private static final String CUSTOM_CONFIG_TOPIC = "custom_topic";
68+
private static final short REPLICAS = 1;
69+
private static final int PARTITION = 1;
70+
private final ClusterInstance clusterInstance;
71+
private Admin adminClient;
72+
73+
ListOffsetsIntegrationTest(ClusterInstance clusterInstance) {
74+
this.clusterInstance = clusterInstance;
75+
}
76+
77+
@BeforeEach
78+
public void setup() throws InterruptedException {
79+
clusterInstance.waitForReadyBrokers();
80+
clusterInstance.createTopic(TOPIC, PARTITION, REPLICAS);
81+
adminClient = clusterInstance.admin();
82+
}
83+
84+
@AfterEach
85+
public void teardown() {
86+
Utils.closeQuietly(adminClient, "ListOffsetsAdminClient");
87+
}
88+
89+
@ClusterTest
90+
public void testListMaxTimestampWithEmptyLog() throws InterruptedException, ExecutionException {
91+
ListOffsetsResultInfo maxTimestampOffset = runFetchOffsets(OffsetSpec.maxTimestamp(), TOPIC);
92+
assertEquals(ListOffsetsResponse.UNKNOWN_OFFSET, maxTimestampOffset.offset());
93+
assertEquals(ListOffsetsResponse.UNKNOWN_TIMESTAMP, maxTimestampOffset.timestamp());
94+
}
95+
96+
@ClusterTest
97+
public void testThreeCompressedRecordsInOneBatch() throws InterruptedException, ExecutionException {
98+
produceMessagesInOneBatch(CompressionType.GZIP.name, TOPIC);
99+
verifyListOffsets(TOPIC, 1);
100+
101+
// test LogAppendTime case
102+
setUpForLogAppendTimeCase();
103+
produceMessagesInOneBatch(CompressionType.GZIP.name, CUSTOM_CONFIG_TOPIC);
104+
// In LogAppendTime's case, the maxTimestampOffset should be the first message of the batch.
105+
// So in this one batch test, it'll be the first offset 0
106+
verifyListOffsets(CUSTOM_CONFIG_TOPIC, 0);
107+
}
108+
109+
@ClusterTest
110+
public void testThreeNonCompressedRecordsInOneBatch() throws ExecutionException, InterruptedException {
111+
produceMessagesInOneBatch(CompressionType.NONE.name, TOPIC);
112+
verifyListOffsets(TOPIC, 1);
113+
114+
// test LogAppendTime case
115+
setUpForLogAppendTimeCase();
116+
produceMessagesInOneBatch(CompressionType.NONE.name, CUSTOM_CONFIG_TOPIC);
117+
// In LogAppendTime's case, if the timestamps are the same, we choose the offset of the first record
118+
// thus, the maxTimestampOffset should be the first record of the batch.
119+
// So in this one batch test, it'll be the first offset which is 0
120+
verifyListOffsets(CUSTOM_CONFIG_TOPIC, 0);
121+
}
122+
123+
@ClusterTest
124+
public void testThreeNonCompressedRecordsInSeparateBatch() throws ExecutionException, InterruptedException {
125+
produceMessagesInOneBatch(CompressionType.NONE.name, TOPIC);
126+
verifyListOffsets(TOPIC, 1);
127+
128+
// test LogAppendTime case
129+
setUpForLogAppendTimeCase();
130+
produceMessagesInSeparateBatch(CompressionType.NONE.name, CUSTOM_CONFIG_TOPIC);
131+
// In LogAppendTime's case, if the timestamp is different, it should be the last one
132+
verifyListOffsets(CUSTOM_CONFIG_TOPIC, 2);
133+
}
134+
135+
@ClusterTest
136+
public void testThreeRecordsInOneBatchHavingDifferentCompressionTypeWithServer() throws InterruptedException, ExecutionException {
137+
createTopicWithConfig(CUSTOM_CONFIG_TOPIC, Map.of(TopicConfig.COMPRESSION_TYPE_CONFIG, CompressionType.LZ4.name));
138+
produceMessagesInOneBatch(CompressionType.NONE.name, CUSTOM_CONFIG_TOPIC);
139+
verifyListOffsets(CUSTOM_CONFIG_TOPIC, 1);
140+
}
141+
142+
@ClusterTest
143+
public void testThreeRecordsInSeparateBatchHavingDifferentCompressionTypeWithServer() throws InterruptedException, ExecutionException {
144+
createTopicWithConfig(CUSTOM_CONFIG_TOPIC, Map.of(TopicConfig.COMPRESSION_TYPE_CONFIG, CompressionType.LZ4.name));
145+
produceMessagesInOneBatch(CompressionType.NONE.name, CUSTOM_CONFIG_TOPIC);
146+
verifyListOffsets(CUSTOM_CONFIG_TOPIC, 1);
147+
}
148+
149+
@ClusterTest
150+
public void testThreeCompressedRecordsInSeparateBatch() throws InterruptedException, ExecutionException {
151+
produceMessagesInSeparateBatch(CompressionType.NONE.name, TOPIC);
152+
verifyListOffsets(TOPIC, 1);
153+
154+
// test LogAppendTime case
155+
setUpForLogAppendTimeCase();
156+
produceMessagesInSeparateBatch(CompressionType.GZIP.name, CUSTOM_CONFIG_TOPIC);
157+
// In LogAppendTime's case, the maxTimestampOffset is the message in the last batch since we advance the time
158+
// for each batch, So it'll be the last offset 2
159+
verifyListOffsets(CUSTOM_CONFIG_TOPIC, 2);
160+
}
161+
162+
private void produceMessagesInOneBatch(String compressionType, String topic) {
163+
List<ProducerRecord<byte[], byte[]>> records = new LinkedList<>();
164+
records.add(new ProducerRecord<>(topic, 0, 100L, null, new byte[10]));
165+
records.add(new ProducerRecord<>(topic, 0, 999L, null, new byte[10]));
166+
records.add(new ProducerRecord<>(topic, 0, 200L, null, new byte[10]));
167+
168+
// create a producer with large linger.ms and enough batch.size (default is enough for three 10 bytes records),
169+
// so that we can confirm all records will be accumulated in producer until we flush them into one batch.
170+
try (Producer<byte[], byte[]> producer = clusterInstance.producer(Map.of(
171+
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers(),
172+
ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE,
173+
ProducerConfig.LINGER_MS_CONFIG, Integer.MAX_VALUE,
174+
ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType))) {
175+
176+
List<Future<RecordMetadata>> futures = records.stream().map(producer::send).toList();
177+
producer.flush();
178+
179+
for (Future<RecordMetadata> future : futures) {
180+
assertDoesNotThrow(() -> future.get(600, TimeUnit.SECONDS));
181+
}
182+
}
183+
}
184+
185+
private void produceMessagesInSeparateBatch(String compressionType, String topic) throws ExecutionException, InterruptedException {
186+
ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(topic, 0, 100L, null, new byte[10]);
187+
ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(topic, 0, 999L, null, new byte[10]);
188+
ProducerRecord<byte[], byte[]> record3 = new ProducerRecord<>(topic, 0, 200L, null, new byte[10]);
189+
190+
// create a producer with large linger.ms and enough batch.size (default is enough for three 10 bytes records),
191+
// so that we can confirm all records will be accumulated in producer until we flush them into one batch.
192+
try (Producer<byte[], byte[]> producer = clusterInstance.producer(Map.of(
193+
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers(),
194+
ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType))) {
195+
196+
Future<RecordMetadata> future1 = producer.send(record1);
197+
future1.get();
198+
// sleep some time in order to advance the server time
199+
// after each record sent to make sure the time changed when appendTime is used
200+
Thread.sleep(100);
201+
202+
Future<RecordMetadata> future2 = producer.send(record2);
203+
future2.get();
204+
Thread.sleep(100);
205+
206+
Future<RecordMetadata> future3 = producer.send(record3);
207+
future3.get();
208+
}
209+
}
210+
211+
private void verifyListOffsets(String topic, int expectedMaxTimestampOffset) throws ExecutionException, InterruptedException {
212+
213+
// case 0: test the offsets from leader's append path
214+
checkListOffsets(topic, expectedMaxTimestampOffset);
215+
216+
// case 1: test the offsets from follower's append path.
217+
// we make a follower be the new leader to handle the ListOffsetRequest
218+
int previousLeader = clusterInstance.getLeaderBrokerId(new TopicPartition(topic, 0));
219+
int newLeader = clusterInstance.brokerIds().stream().filter(id -> id != previousLeader).findAny().get();
220+
221+
// change the leader to new one
222+
adminClient.alterPartitionReassignments(
223+
Map.of(new TopicPartition(topic, 0), Optional.of(new NewPartitionReassignment(List.of(newLeader))))
224+
).all().get();
225+
// wait for all reassignments get completed
226+
TestUtils.waitForCondition(() -> {
227+
try {
228+
return adminClient.listPartitionReassignments().reassignments().get().isEmpty();
229+
} catch (InterruptedException | ExecutionException e) {
230+
return false;
231+
}
232+
}, "There still are ongoing reassignments");
233+
// make sure we are able to see the new leader
234+
TestUtils.waitForCondition(() -> {
235+
try {
236+
return clusterInstance.getLeaderBrokerId(new TopicPartition(topic, 0)) == newLeader;
237+
} catch (InterruptedException | ExecutionException e) {
238+
return false;
239+
}
240+
}, String.format("expected leader: %d but actual: %d", newLeader, clusterInstance.getLeaderBrokerId(new TopicPartition(topic, 0))));
241+
checkListOffsets(topic, expectedMaxTimestampOffset);
242+
243+
// case 2: test the offsets from recovery path.
244+
// server will rebuild offset index according to log files if the index files are nonexistent
245+
Set<String> indexFiles = clusterInstance.brokers().values().stream().flatMap(broker ->
246+
CollectionConverters.asJava(broker.config().logDirs()).stream()
247+
).collect(Collectors.toUnmodifiableSet());
248+
clusterInstance.brokers().values().forEach(KafkaBroker::shutdown);
249+
indexFiles.forEach(root -> {
250+
File[] files = new File(String.format("%s/%s-0", root, topic)).listFiles();
251+
if (files != null)
252+
Arrays.stream(files).forEach(f -> {
253+
if (f.getName().endsWith(".index"))
254+
f.delete();
255+
});
256+
});
257+
258+
clusterInstance.brokers().values().forEach(b -> {
259+
if (b.isShutdown())
260+
b.startup();
261+
});
262+
263+
Utils.closeQuietly(adminClient, "ListOffsetsAdminClient");
264+
adminClient = clusterInstance.admin(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()));
265+
checkListOffsets(topic, expectedMaxTimestampOffset);
266+
}
267+
268+
private void checkListOffsets(String topic, int expectedMaxTimestampOffset) throws ExecutionException, InterruptedException {
269+
ListOffsetsResultInfo earliestOffset = runFetchOffsets(OffsetSpec.earliest(), topic);
270+
assertEquals(0, earliestOffset.offset());
271+
272+
ListOffsetsResultInfo latestOffset = runFetchOffsets(OffsetSpec.latest(), topic);
273+
assertEquals(3, latestOffset.offset());
274+
275+
ListOffsetsResultInfo maxTimestampOffset = runFetchOffsets(OffsetSpec.maxTimestamp(), topic);
276+
assertEquals(expectedMaxTimestampOffset, maxTimestampOffset.offset());
277+
278+
// the epoch is related to the returned offset.
279+
// Hence, it should be zero (the earliest leader epoch), regardless of new leader election
280+
assertEquals(Optional.of(0), maxTimestampOffset.leaderEpoch());
281+
}
282+
283+
private ListOffsetsResultInfo runFetchOffsets(OffsetSpec offsetSpec,
284+
String topic) throws InterruptedException, ExecutionException {
285+
TopicPartition tp = new TopicPartition(topic, 0);
286+
return adminClient.listOffsets(Map.of(tp, offsetSpec), new ListOffsetsOptions()).all().get().get(tp);
287+
}
288+
289+
private void setUpForLogAppendTimeCase() throws InterruptedException {
290+
createTopicWithConfig(CUSTOM_CONFIG_TOPIC, Map.of(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, "LogAppendTime"));
291+
}
292+
293+
private void createTopicWithConfig(String topic, Map<String, String> props) throws InterruptedException {
294+
clusterInstance.createTopic(topic, PARTITION, REPLICAS, props);
295+
}
296+
}

0 commit comments

Comments
 (0)