Skip to content

Commit 6af849f

Browse files
authored
KAFKA-7952: use in memory stores for KTable test (#19218)
Switch to in-memory story for KTable-KTable left-join test. Reviewers: Matthias J. Sax <[email protected]>
1 parent ef73fb9 commit 6af849f

File tree

1 file changed

+19
-29
lines changed

1 file changed

+19
-29
lines changed

streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java

+19-29
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@
2121
import org.apache.kafka.streams.KeyValue;
2222
import org.apache.kafka.streams.KeyValueTimestamp;
2323
import org.apache.kafka.streams.StreamsBuilder;
24+
import org.apache.kafka.streams.StreamsConfig;
2425
import org.apache.kafka.streams.TestInputTopic;
2526
import org.apache.kafka.streams.TestOutputTopic;
2627
import org.apache.kafka.streams.Topology;
28+
import org.apache.kafka.streams.TopologyConfig;
2729
import org.apache.kafka.streams.TopologyTestDriver;
2830
import org.apache.kafka.streams.TopologyTestDriverWrapper;
2931
import org.apache.kafka.streams.TopologyWrapper;
@@ -35,6 +37,7 @@
3537
import org.apache.kafka.streams.processor.api.MockProcessorContext;
3638
import org.apache.kafka.streams.processor.api.Processor;
3739
import org.apache.kafka.streams.processor.api.Record;
40+
import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;
3841
import org.apache.kafka.streams.state.Stores;
3942
import org.apache.kafka.streams.test.TestRecord;
4043
import org.apache.kafka.test.MockApiProcessor;
@@ -70,9 +73,14 @@ public class KTableKTableLeftJoinTest {
7073
private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
7174
private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
7275

76+
private StreamsBuilder createStreamBuilderInMemory() {
77+
props.put(StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG, BuiltInDslStoreSuppliers.InMemoryDslStoreSuppliers.class.getName());
78+
return new StreamsBuilder(new TopologyConfig(new StreamsConfig(props)));
79+
}
80+
7381
@Test
7482
public void testJoin() {
75-
final StreamsBuilder builder = new StreamsBuilder();
83+
final StreamsBuilder builder = createStreamBuilderInMemory();
7684

7785
final int[] expectedKeys = new int[] {0, 1, 2, 3};
7886

@@ -193,7 +201,7 @@ public void testJoin() {
193201

194202
@Test
195203
public void testNotSendingOldValue() {
196-
final StreamsBuilder builder = new StreamsBuilder();
204+
final StreamsBuilder builder = createStreamBuilderInMemory();
197205

198206
final int[] expectedKeys = new int[] {0, 1, 2, 3};
199207

@@ -309,7 +317,7 @@ public void testNotSendingOldValue() {
309317

310318
@Test
311319
public void testSendingOldValue() {
312-
final StreamsBuilder builder = new StreamsBuilder();
320+
final StreamsBuilder builder = createStreamBuilderInMemory();
313321

314322
final int[] expectedKeys = new int[] {0, 1, 2, 3};
315323

@@ -443,7 +451,7 @@ public void shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions() {
443451
final String tableSix = "tableSix";
444452
final String[] inputs = {agg, tableOne, tableTwo, tableThree, tableFour, tableFive, tableSix};
445453

446-
final StreamsBuilder builder = new StreamsBuilder();
454+
final StreamsBuilder builder = createStreamBuilderInMemory();
447455
final Consumed<Long, String> consumed = Consumed.with(Serdes.Long(), Serdes.String());
448456
final KTable<Long, String> aggTable = builder
449457
.table(agg, consumed, Materialized.as(Stores.inMemoryKeyValueStore("agg-base-store")))
@@ -453,30 +461,12 @@ public void shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions() {
453461
MockReducer.STRING_ADDER,
454462
Materialized.as(Stores.inMemoryKeyValueStore("agg-store")));
455463

456-
final KTable<Long, String> one = builder.table(
457-
tableOne,
458-
consumed,
459-
Materialized.as(Stores.inMemoryKeyValueStore("tableOne-base-store")));
460-
final KTable<Long, String> two = builder.table(
461-
tableTwo,
462-
consumed,
463-
Materialized.as(Stores.inMemoryKeyValueStore("tableTwo-base-store")));
464-
final KTable<Long, String> three = builder.table(
465-
tableThree,
466-
consumed,
467-
Materialized.as(Stores.inMemoryKeyValueStore("tableThree-base-store")));
468-
final KTable<Long, String> four = builder.table(
469-
tableFour,
470-
consumed,
471-
Materialized.as(Stores.inMemoryKeyValueStore("tableFour-base-store")));
472-
final KTable<Long, String> five = builder.table(
473-
tableFive,
474-
consumed,
475-
Materialized.as(Stores.inMemoryKeyValueStore("tableFive-base-store")));
476-
final KTable<Long, String> six = builder.table(
477-
tableSix,
478-
consumed,
479-
Materialized.as(Stores.inMemoryKeyValueStore("tableSix-base-store")));
464+
final KTable<Long, String> one = builder.table(tableOne, consumed);
465+
final KTable<Long, String> two = builder.table(tableTwo, consumed);
466+
final KTable<Long, String> three = builder.table(tableThree, consumed);
467+
final KTable<Long, String> four = builder.table(tableFour, consumed);
468+
final KTable<Long, String> five = builder.table(tableFive, consumed);
469+
final KTable<Long, String> six = builder.table(tableSix, consumed);
480470

481471
final ValueMapper<String, String> mapper = value -> value.toUpperCase(Locale.ROOT);
482472

@@ -515,7 +505,7 @@ public void shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions() {
515505

516506
@Test
517507
public void shouldLogAndMeterSkippedRecordsDueToNullLeftKey() {
518-
final StreamsBuilder builder = new StreamsBuilder();
508+
final StreamsBuilder builder = createStreamBuilderInMemory();
519509

520510
@SuppressWarnings("unchecked")
521511
final Processor<String, Change<String>, String, Change<Object>> join = new KTableKTableLeftJoin<>(

0 commit comments

Comments
 (0)