Skip to content

Commit d35ab4d

Browse files
authored
KAFKA-18713: Fix FK Left-Join result race condition (#19005)
When a row in the left table of an FK join is updated, we should send a "delete subscription with no response" for the old FK to the right-hand side, to avoid getting two responses from the right-hand side. Only the "new subscription" for the new FK should request a response. If two responses are requested, a race condition arises in which the two responses can be processed in the wrong order, leading to an incorrect join result. This PR fixes the "delete subscription" case accordingly, so that it does not request a response. Reviewers: Matthias J. Sax <[email protected]>
1 parent 5eb4e11 commit d35ab4d

File tree

3 files changed

+114
-31
lines changed

3 files changed

+114
-31
lines changed

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java

+109-26
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.kafka.common.serialization.StringSerializer;
2222
import org.apache.kafka.common.utils.Bytes;
2323
import org.apache.kafka.common.utils.MockTime;
24+
import org.apache.kafka.streams.KeyValue;
2425
import org.apache.kafka.streams.StreamsBuilder;
2526
import org.apache.kafka.streams.StreamsConfig;
2627
import org.apache.kafka.streams.TestInputTopic;
@@ -60,10 +61,13 @@
6061
import java.util.stream.Collectors;
6162
import java.util.stream.Stream;
6263

64+
import static java.util.Arrays.asList;
65+
import static java.util.Collections.emptyList;
6366
import static java.util.Collections.emptyMap;
6467
import static org.apache.kafka.common.utils.Utils.mkEntry;
6568
import static org.apache.kafka.common.utils.Utils.mkMap;
6669
import static org.apache.kafka.common.utils.Utils.mkProperties;
70+
import static org.hamcrest.CoreMatchers.hasItem;
6771
import static org.hamcrest.CoreMatchers.is;
6872
import static org.hamcrest.MatcherAssert.assertThat;
6973

@@ -183,13 +187,13 @@ public void doJoinFromLeftThenDeleteLeftEntity(final boolean leftJoin,
183187
right.pipeInput("rhs3", "rhsValue3", baseTimestamp + 2); // this unreferenced FK won't show up in any results
184188

185189
assertThat(
186-
outputTopic.readKeyValuesToMap(),
187-
is(emptyMap())
190+
outputTopic.readKeyValuesToList(),
191+
is(emptyList())
188192
);
189193
if (rejoin) {
190194
assertThat(
191-
rejoinOutputTopic.readKeyValuesToMap(),
192-
is(emptyMap())
195+
rejoinOutputTopic.readKeyValuesToList(),
196+
is(emptyList())
193197
);
194198
}
195199
if (materialized) {
@@ -203,27 +207,27 @@ public void doJoinFromLeftThenDeleteLeftEntity(final boolean leftJoin,
203207
left.pipeInput("lhs2", "lhsValue2|rhs2", baseTimestamp + 4);
204208

205209
{
206-
final Map<String, String> expected = mkMap(
207-
mkEntry("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
208-
mkEntry("lhs2", "(lhsValue2|rhs2,rhsValue2)")
210+
final List<KeyValue<String, String>> expected = Arrays.asList(
211+
KeyValue.pair("lhs1", "(lhsValue1|rhs1,rhsValue1)"),
212+
KeyValue.pair("lhs2", "(lhsValue2|rhs2,rhsValue2)")
209213
);
210214
assertThat(
211-
outputTopic.readKeyValuesToMap(),
215+
outputTopic.readKeyValuesToList(),
212216
is(expected)
213217
);
214218
if (rejoin) {
215219
assertThat(
216-
rejoinOutputTopic.readKeyValuesToMap(),
217-
is(mkMap(
218-
mkEntry("lhs1", "rejoin((lhsValue1|rhs1,rhsValue1),lhsValue1|rhs1)"),
219-
mkEntry("lhs2", "rejoin((lhsValue2|rhs2,rhsValue2),lhsValue2|rhs2)")
220+
rejoinOutputTopic.readKeyValuesToList(),
221+
is(asList(
222+
KeyValue.pair("lhs1", "rejoin((lhsValue1|rhs1,rhsValue1),lhsValue1|rhs1)"),
223+
KeyValue.pair("lhs2", "rejoin((lhsValue2|rhs2,rhsValue2),lhsValue2|rhs2)")
220224
))
221225
);
222226
}
223227
if (materialized) {
224228
assertThat(
225229
asMap(store),
226-
is(expected)
230+
is(expected.stream().collect(Collectors.toMap(kv -> kv.key, kv -> kv.value)))
227231
);
228232
}
229233
}
@@ -232,16 +236,16 @@ public void doJoinFromLeftThenDeleteLeftEntity(final boolean leftJoin,
232236
left.pipeInput("lhs3", "lhsValue3|rhs1", baseTimestamp + 5);
233237
{
234238
assertThat(
235-
outputTopic.readKeyValuesToMap(),
236-
is(mkMap(
237-
mkEntry("lhs3", "(lhsValue3|rhs1,rhsValue1)")
239+
outputTopic.readKeyValuesToList(),
240+
is(List.of(
241+
new KeyValue<>("lhs3", "(lhsValue3|rhs1,rhsValue1)")
238242
))
239243
);
240244
if (rejoin) {
241245
assertThat(
242-
rejoinOutputTopic.readKeyValuesToMap(),
243-
is(mkMap(
244-
mkEntry("lhs3", "rejoin((lhsValue3|rhs1,rhsValue1),lhsValue3|rhs1)")
246+
rejoinOutputTopic.readKeyValuesToList(),
247+
is(List.of(
248+
new KeyValue<>("lhs3", "rejoin((lhsValue3|rhs1,rhsValue1),lhsValue3|rhs1)")
245249
))
246250
);
247251
}
@@ -256,21 +260,21 @@ public void doJoinFromLeftThenDeleteLeftEntity(final boolean leftJoin,
256260
);
257261
}
258262
}
263+
259264
// Now delete one LHS entity such that one delete is propagated down to the output.
260265

261266
left.pipeInput("lhs1", null, baseTimestamp + 6);
262267
assertThat(
263-
outputTopic.readKeyValuesToMap(),
264-
is(mkMap(
265-
mkEntry("lhs1", null)
268+
outputTopic.readKeyValuesToList(),
269+
is(List.of(
270+
new KeyValue<>("lhs1", null)
266271
))
267272
);
268273
if (rejoin) {
269274
assertThat(
270-
rejoinOutputTopic.readKeyValuesToMap(),
271-
is(mkMap(
272-
mkEntry("lhs1", null)
273-
))
275+
rejoinOutputTopic.readKeyValuesToList(),
276+
hasItem(
277+
KeyValue.pair("lhs1", null))
274278
);
275279
}
276280
if (materialized) {
@@ -285,6 +289,79 @@ public void doJoinFromLeftThenDeleteLeftEntity(final boolean leftJoin,
285289
}
286290
}
287291

292+
/**
 * Regression test for KAFKA-18713: changes the foreign key of a single left-hand row and then
 * changes it back, asserting after each step that the output topic holds exactly one join result.
 *
 * Steps: pipe two right-hand rows (rhs1, rhs2) and expect no output; pipe lhs1 with value
 * "lhsValue1|rhs1" and expect it joined with rhsValue1; re-pipe lhs1 with "lhsValue1|rhs2" and
 * expect it joined with rhsValue2; re-pipe lhs1 with "lhsValue1|rhs1" and expect it joined with
 * rhsValue1 again.
 *
 * NOTE(review): the part of the left value after '|' is presumably what the topology's foreign-key
 * extractor reads -- confirm against getTopology.
 * NOTE(review): the rejoin output topic and the materialized store are only asserted before any
 * left-hand input is piped; after that only the primary output topic is checked.
 */
@ParameterizedTest
293+
@MethodSource("testCases")
294+
public void doJoinFromLeftThenUpdateFkThenRevertBack(final boolean leftJoin,
295+
final String optimization,
296+
final boolean materialized,
297+
final boolean rejoin,
298+
final boolean leftVersioned,
299+
final boolean rightVersioned) {
300+
final Properties streamsConfig = getStreamsProperties(optimization);
301+
final Topology topology = getTopology(streamsConfig, materialized ? "store" : null, leftJoin, rejoin, leftVersioned, rightVersioned);
302+
try (final TopologyTestDriver driver = new TopologyTestDriver(topology, streamsConfig)) {
303+
final TestInputTopic<String, String> right = driver.createInputTopic(RIGHT_TABLE, new StringSerializer(), new StringSerializer());
304+
final TestInputTopic<String, String> left = driver.createInputTopic(LEFT_TABLE, new StringSerializer(), new StringSerializer());
305+
final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
306+
final TestOutputTopic<String, String> rejoinOutputTopic = rejoin ? driver.createOutputTopic(REJOIN_OUTPUT, new StringDeserializer(), new StringDeserializer()) : null;
307+
final KeyValueStore<String, ValueAndTimestamp<String>> store = driver.getTimestampedKeyValueStore("store");
308+
309+
// Pre-populate the RHS records. This test is all about what happens when an LHS record's FK is changed and then changed back
310+
right.pipeInput("rhs1", "rhsValue1", baseTimestamp);
311+
right.pipeInput("rhs2", "rhsValue2", baseTimestamp + 1);
312+
313+
assertThat(
314+
outputTopic.readKeyValuesToList(),
315+
is(emptyList())
316+
);
317+
if (rejoin) {
318+
assertThat(
319+
rejoinOutputTopic.readKeyValuesToList(),
320+
is(emptyList())
321+
);
322+
}
323+
if (materialized) {
324+
assertThat(
325+
asMap(store),
326+
is(emptyMap())
327+
);
328+
}
329+
330+
left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp + 3);
331+
332+
{
333+
final List<KeyValue<String, String>> expected = asList(
334+
KeyValue.pair("lhs1", "(lhsValue1|rhs1,rhsValue1)")
335+
);
336+
assertThat(
337+
outputTopic.readKeyValuesToList(),
338+
is(expected)
339+
);
340+
}
341+
342+
// Update lhs1 to reference a different existing FK (rhs1 -> rhs2); exactly one result record is expected
343+
left.pipeInput("lhs1", "lhsValue1|rhs2", baseTimestamp + 5);
344+
{
345+
assertThat(
346+
outputTopic.readKeyValuesToList(),
347+
is(List.of(
348+
new KeyValue<>("lhs1", "(lhsValue1|rhs2,rhsValue2)")
349+
))
350+
);
351+
}
352+
353+
// Now revert the foreign key back to the earlier reference (rhs2 -> rhs1); again exactly one result record is expected
354+
355+
left.pipeInput("lhs1", "lhsValue1|rhs1", baseTimestamp + 6);
356+
assertThat(
357+
outputTopic.readKeyValuesToList(),
358+
is(List.of(
359+
new KeyValue<>("lhs1", "(lhsValue1|rhs1,rhsValue1)")
360+
))
361+
);
362+
}
363+
}
364+
288365
@ParameterizedTest
289366
@MethodSource("testCases")
290367
public void doJoinFromRightThenDeleteRightEntity(final boolean leftJoin,
@@ -795,6 +872,12 @@ protected static Map<String, String> asMap(final KeyValueStore<String, ValueAndT
795872
return result;
796873
}
797874

875+
protected static List<KeyValue<String, String>> makeList(final KeyValueStore<String, ValueAndTimestamp<String>> store) {
876+
final List<KeyValue<String, String>> result = new LinkedList<>();
877+
store.all().forEachRemaining(ele -> result.add(new KeyValue<>(ele.key, ele.value.value())));
878+
return result;
879+
}
880+
798881
protected static Topology getTopology(final Properties streamsConfig,
799882
final String queryableStoreName,
800883
final boolean leftJoin,

streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ private void leftJoinInstructions(final Record<KLeft, Change<VLeft>> record) {
132132
final KRight oldForeignKey = foreignKeyExtractor.extract(record.key(), record.value().oldValue);
133133
final KRight newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.extract(record.key(), record.value().newValue);
134134
if (oldForeignKey != null && !Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
135-
forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
135+
forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
136136
}
137137
forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
138138
} else if (record.value().newValue != null) {

streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ public void leftJoinShouldPropagateChangeOfFKFromNonNullToNullValue() {
149149
assertThat(context.forwarded().size(), greaterThan(0));
150150
assertThat(
151151
context.forwarded().get(0).record(),
152-
is(new Record<>(fk1, new SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_AND_PROPAGATE, pk, 0), 0))
152+
is(new Record<>(fk1, new SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_NO_PROPAGATE, pk, 0), 0))
153153
);
154154
}
155155

@@ -198,7 +198,7 @@ public void leftJoinShouldPropagateDeletionOfAPrimaryKey() {
198198
assertThat(context.forwarded().size(), greaterThan(0));
199199
assertThat(
200200
context.forwarded().get(0).record(),
201-
is(new Record<>(fk1, new SubscriptionWrapper<>(null, DELETE_KEY_AND_PROPAGATE, pk, 0), 0))
201+
is(new Record<>(fk1, new SubscriptionWrapper<>(null, DELETE_KEY_NO_PROPAGATE, pk, 0), 0))
202202
);
203203
}
204204

@@ -438,7 +438,7 @@ public void biFunctionLeftJoinShouldPropagateChangeOfFKFromNonNullToNullValue()
438438
assertThat(context.forwarded().size(), greaterThan(0));
439439
assertThat(
440440
context.forwarded().get(0).record(),
441-
is(new Record<>(compositeKey, new SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_AND_PROPAGATE, pk, 0), 0))
441+
is(new Record<>(compositeKey, new SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_NO_PROPAGATE, pk, 0), 0))
442442
);
443443
}
444444

@@ -491,7 +491,7 @@ public void biFunctionLeftJoinShouldPropagateDeletionOfAPrimaryKey() {
491491
assertThat(context.forwarded().size(), greaterThan(0));
492492
assertThat(
493493
context.forwarded().get(0).record(),
494-
is(new Record<>(compositeKey, new SubscriptionWrapper<>(null, DELETE_KEY_AND_PROPAGATE, pk, 0), 0))
494+
is(new Record<>(compositeKey, new SubscriptionWrapper<>(null, DELETE_KEY_NO_PROPAGATE, pk, 0), 0))
495495
);
496496
}
497497

0 commit comments

Comments
 (0)