
Commit 1ded681

MINOR: Add 4.0.0 to streams system tests (#19239)
This patch adds 4.0.0 to streams system tests.

Reviewers: Matthias J. Sax <[email protected]>
1 parent: e21c46a

14 files changed, +1359 -11 lines


build.gradle (+16)

@@ -2738,6 +2738,7 @@ project(':streams') {
             ':streams:upgrade-system-tests-37:test',
             ':streams:upgrade-system-tests-38:test',
             ':streams:upgrade-system-tests-39:test',
+            ':streams:upgrade-system-tests-40:test',
             ':streams:examples:test'
         ]
     )
@@ -3252,6 +3253,21 @@ project(':streams:upgrade-system-tests-39') {
   }
 }
 
+project(':streams:upgrade-system-tests-40') {
+  base {
+    archivesName = "kafka-streams-upgrade-system-tests-40"
+  }
+
+  dependencies {
+    testImplementation libs.kafkaStreams_40
+    testRuntimeOnly libs.junitJupiter
+  }
+
+  systemTestLibs {
+    dependsOn testJar
+  }
+}
+
 project(':jmh-benchmarks') {
 
   apply plugin: 'com.gradleup.shadow'
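
With the new entry in the aggregate dependsOn list, the 4.0 module's tests run as part of the streams test suite; the module can also be exercised on its own with the task name added above, e.g. `./gradlew :streams:upgrade-system-tests-40:test`.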

settings.gradle (+1)

@@ -116,6 +116,7 @@ include 'clients',
     'streams:upgrade-system-tests-37',
     'streams:upgrade-system-tests-38',
     'streams:upgrade-system-tests-39',
+    'streams:upgrade-system-tests-40',
     'tools',
     'tools:tools-api',
     'transaction-coordinator',

streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java (+6)

@@ -431,6 +431,12 @@ public class StreamsConfig extends AbstractConfig {
     @SuppressWarnings("WeakerAccess")
     public static final String UPGRADE_FROM_39 = UpgradeFromValues.UPGRADE_FROM_39.toString();
 
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 4.0.x}.
+     */
+    @SuppressWarnings("WeakerAccess")
+    public static final String UPGRADE_FROM_40 = UpgradeFromValues.UPGRADE_FROM_40.toString();
+
 
     /**
      * Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for at-least-once processing guarantees.
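
For context, a minimal sketch of how the new constant is meant to be used during a rolling bounce (the application id and bootstrap server below are hypothetical, not part of this patch):

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;

    public class UpgradeFromExample {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "upgrade-demo");      // hypothetical app id
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker
            // Tell the newer binaries that the running instances are still on 4.0.x:
            props.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_40); // the string "4.0"
            System.out.println(props);
        }
    }

Per the usual upgrade.from procedure, the config is removed again in a second rolling bounce once all instances run the new binaries.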

streams/src/main/java/org/apache/kafka/streams/internals/UpgradeFromValues.java (+2 -1)

@@ -41,7 +41,8 @@ public enum UpgradeFromValues {
     UPGRADE_FROM_36("3.6"),
     UPGRADE_FROM_37("3.7"),
     UPGRADE_FROM_38("3.8"),
-    UPGRADE_FROM_39("3.9");
+    UPGRADE_FROM_39("3.9"),
+    UPGRADE_FROM_40("4.0");
 
     private final String value;
 
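Since the enum's toString() backs the public StreamsConfig constants (UPGRADE_FROM_40 resolves to "4.0"), a user-supplied upgrade.from string can be mapped back to an entry by scanning the values. A minimal sketch; the fromString helper here is illustrative, not the class's internal API:

    import org.apache.kafka.streams.internals.UpgradeFromValues;

    public class UpgradeFromLookup {
        // Illustrative helper: resolve "4.0" -> UPGRADE_FROM_40 by comparing wire values.
        static UpgradeFromValues fromString(final String value) {
            for (final UpgradeFromValues candidate : UpgradeFromValues.values()) {
                if (candidate.toString().equals(value)) {
                    return candidate;
                }
            }
            throw new IllegalArgumentException("Unknown upgrade.from value: " + value);
        }

        public static void main(final String[] args) {
            System.out.println(fromString("4.0").name()); // prints UPGRADE_FROM_40
        }
    }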
streams/upgrade-system-tests-38/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java (+1 -1)

@@ -46,7 +46,7 @@ public static void main(final String[] args) throws Exception {
 
         final Properties streamsProperties = Utils.loadProps(propFileName);
 
-        System.out.println("StreamsTest instance started (StreamsUpgradeTest v3.7)");
+        System.out.println("StreamsTest instance started (StreamsUpgradeTest v3.8)");
         System.out.println("props=" + streamsProperties);
 
         final StreamsBuilder builder = new StreamsBuilder();

streams/upgrade-system-tests-39/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java (+1 -1)

@@ -46,7 +46,7 @@ public static void main(final String[] args) throws Exception {
 
         final Properties streamsProperties = Utils.loadProps(propFileName);
 
-        System.out.println("StreamsTest instance started (StreamsUpgradeTest v3.7)");
+        System.out.println("StreamsTest instance started (StreamsUpgradeTest v3.9)");
         System.out.println("props=" + streamsProperties);
 
         final StreamsBuilder builder = new StreamsBuilder();

streams/upgrade-system-tests-40/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java (new file, +299 lines)

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.streams.tests;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.time.Duration;
import java.time.Instant;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;

public class SmokeTestClient extends SmokeTestUtil {

    private final String name;

    private KafkaStreams streams;
    private boolean uncaughtException = false;
    private boolean started;
    private volatile boolean closed;

    private static void addShutdownHook(final String name, final Runnable runnable) {
        if (name != null) {
            Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable));
        } else {
            Runtime.getRuntime().addShutdownHook(new Thread(runnable));
        }
    }

    private static File tempDirectory() {
        final String prefix = "kafka-";
        final File file;
        try {
            file = Files.createTempDirectory(prefix).toFile();
        } catch (final IOException ex) {
            throw new RuntimeException("Failed to create a temp dir", ex);
        }
        file.deleteOnExit();

        addShutdownHook("delete-temp-file-shutdown-hook", () -> {
            try {
                Utils.delete(file);
            } catch (final IOException e) {
                System.out.println("Error deleting " + file.getAbsolutePath());
                e.printStackTrace(System.out);
            }
        });

        return file;
    }

    public SmokeTestClient(final String name) {
        this.name = name;
    }

    public boolean started() {
        return started;
    }

    public boolean closed() {
        return closed;
    }

    public void start(final Properties streamsProperties) {
        final Topology build = getTopology();
        streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));

        final CountDownLatch countDownLatch = new CountDownLatch(1);
        streams.setStateListener((newState, oldState) -> {
            System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState);
            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
                started = true;
                countDownLatch.countDown();
            }

            if (newState == KafkaStreams.State.NOT_RUNNING) {
                closed = true;
            }
        });

        streams.setUncaughtExceptionHandler(e -> {
            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
            System.out.println(name + ": FATAL: An unexpected exception is encountered: " + e);
            e.printStackTrace(System.out);
            uncaughtException = true;
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
        });

        addShutdownHook("streams-shutdown-hook", this::close);

        streams.start();
        try {
            if (!countDownLatch.await(1, TimeUnit.MINUTES)) {
                System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute");
            }
        } catch (final InterruptedException e) {
            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e);
            e.printStackTrace(System.out);
        }
        System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED");
        System.out.println(name + " started at " + Instant.now());
    }

    public void closeAsync() {
        streams.close(Duration.ZERO);
    }

    public void close() {
        final boolean closed = streams.close(Duration.ofMinutes(1));

        if (closed && !uncaughtException) {
            System.out.println(name + ": SMOKE-TEST-CLIENT-CLOSED");
        } else if (closed) {
            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
        } else {
            System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't close");
        }
    }

    private Properties getStreamsConfig(final Properties props) {
        final Properties fullProps = new Properties(props);
        fullProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
        fullProps.put(StreamsConfig.CLIENT_ID_CONFIG, "SmokeTest-" + name);
        fullProps.put(StreamsConfig.STATE_DIR_CONFIG, tempDirectory().getAbsolutePath());
        fullProps.putAll(props);
        return fullProps;
    }

    public Topology getTopology() {
        final StreamsBuilder builder = new StreamsBuilder();
        final Consumed<String, Integer> stringIntConsumed = Consumed.with(stringSerde, intSerde);
        final KStream<String, Integer> source = builder.stream("data", stringIntConsumed);
        source.filterNot((k, v) -> k.equals("flush"))
              .to("echo", Produced.with(stringSerde, intSerde));
        final KStream<String, Integer> data = source.filter((key, value) -> value == null || value != END);
        data.process(SmokeTestUtil.printProcessorSupplier("data", name));

        // min
        final KGroupedStream<String, Integer> groupedData = data.groupByKey(Grouped.with(stringSerde, intSerde));

        final KTable<Windowed<String>, Integer> minAggregation = groupedData
            .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofDays(1), Duration.ofMinutes(1)))
            .aggregate(
                () -> Integer.MAX_VALUE,
                (aggKey, value, aggregate) -> (value < aggregate) ? value : aggregate,
                Materialized
                    .<String, Integer, WindowStore<Bytes, byte[]>>as("uwin-min")
                    .withValueSerde(intSerde)
                    .withRetention(Duration.ofHours(25))
            );

        streamify(minAggregation, "min-raw");

        streamify(minAggregation.suppress(untilWindowCloses(BufferConfig.unbounded())), "min-suppressed");

        minAggregation
            .toStream(new Unwindow<>())
            .filterNot((k, v) -> k.equals("flush"))
            .to("min", Produced.with(stringSerde, intSerde));

        final KTable<Windowed<String>, Integer> smallWindowSum = groupedData
            .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofSeconds(2), Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(1)))
            .reduce(Integer::sum);

        streamify(smallWindowSum, "sws-raw");
        streamify(smallWindowSum.suppress(untilWindowCloses(BufferConfig.unbounded())), "sws-suppressed");

        final KTable<String, Integer> minTable = builder.table(
            "min",
            Consumed.with(stringSerde, intSerde),
            Materialized.as("minStoreName"));

        minTable.toStream().process(SmokeTestUtil.printProcessorSupplier("min", name));

        // max
        groupedData
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(2)))
            .aggregate(
                () -> Integer.MIN_VALUE,
                (aggKey, value, aggregate) -> (value > aggregate) ? value : aggregate,
                Materialized.<String, Integer, WindowStore<Bytes, byte[]>>as("uwin-max").withValueSerde(intSerde))
            .toStream(new Unwindow<>())
            .filterNot((k, v) -> k.equals("flush"))
            .to("max", Produced.with(stringSerde, intSerde));

        final KTable<String, Integer> maxTable = builder.table(
            "max",
            Consumed.with(stringSerde, intSerde),
            Materialized.as("maxStoreName"));
        maxTable.toStream().process(SmokeTestUtil.printProcessorSupplier("max", name));

        // sum
        groupedData
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(2)))
            .aggregate(
                () -> 0L,
                (aggKey, value, aggregate) -> (long) value + aggregate,
                Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("win-sum").withValueSerde(longSerde))
            .toStream(new Unwindow<>())
            .filterNot((k, v) -> k.equals("flush"))
            .to("sum", Produced.with(stringSerde, longSerde));

        final Consumed<String, Long> stringLongConsumed = Consumed.with(stringSerde, longSerde);
        final KTable<String, Long> sumTable = builder.table("sum", stringLongConsumed);
        sumTable.toStream().process(SmokeTestUtil.printProcessorSupplier("sum", name));

        // cnt
        groupedData
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(2)))
            .count(Materialized.as("uwin-cnt"))
            .toStream(new Unwindow<>())
            .filterNot((k, v) -> k.equals("flush"))
            .to("cnt", Produced.with(stringSerde, longSerde));

        final KTable<String, Long> cntTable = builder.table(
            "cnt",
            Consumed.with(stringSerde, longSerde),
            Materialized.as("cntStoreName"));
        cntTable.toStream().process(SmokeTestUtil.printProcessorSupplier("cnt", name));

        // dif
        maxTable
            .join(
                minTable,
                (value1, value2) -> value1 - value2)
            .toStream()
            .filterNot((k, v) -> k.equals("flush"))
            .to("dif", Produced.with(stringSerde, intSerde));

        // avg
        sumTable
            .join(
                cntTable,
                (value1, value2) -> (double) value1 / (double) value2)
            .toStream()
            .filterNot((k, v) -> k.equals("flush"))
            .to("avg", Produced.with(stringSerde, doubleSerde));

        // test repartition
        final Agg agg = new Agg();
        cntTable.groupBy(agg.selector(), Grouped.with(stringSerde, longSerde))
            .aggregate(agg.init(), agg.adder(), agg.remover(),
                Materialized.<String, Long>as(Stores.inMemoryKeyValueStore("cntByCnt"))
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.Long()))
            .toStream()
            .to("tagg", Produced.with(stringSerde, longSerde));

        return builder.build();
    }

    private static void streamify(final KTable<Windowed<String>, Integer> windowedTable, final String topic) {
        windowedTable
            .toStream()
            .filterNot((k, v) -> k.key().equals("flush"))
            .map((key, value) -> new KeyValue<>(key.toString(), value))
            .to(topic, Produced.with(stringSerde, intSerde));
    }
}
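
For orientation, a minimal sketch of how a system test typically drives this client (the broker address and instance name below are hypothetical; the topics are the ones wired up in getTopology() above):

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.tests.SmokeTestClient;

    public class SmokeTestDriverSketch {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // hypothetical broker

            final SmokeTestClient client = new SmokeTestClient("client-1");      // hypothetical instance name
            client.start(props);   // blocks up to one minute waiting to reach RUNNING
            // ... records flow from "data" into echo/min/max/sum/cnt/dif/avg/tagg ...
            client.close();        // prints SMOKE-TEST-CLIENT-CLOSED on a clean shutdown
        }
    }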
