Skip to content

Commit b963e58

Browse files
authored
KAFKA-16407: Fix foreign key INNER join on change of FK from/to a null value (#19303)
Fixes both KAFKA-16407 and KAFKA-16434. Summary of existing issues: - We are ignoring new left record when its previous FK value is null - We do not unset foreign key join result when FK becomes null Reviewers: Matthias J. Sax <[email protected]>
1 parent 3f8e86a commit b963e58

File tree

2 files changed

+75
-20
lines changed

2 files changed

+75
-20
lines changed

streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java

+14-17
Original file line numberDiff line numberDiff line change
@@ -143,28 +143,25 @@ private void leftJoinInstructions(final Record<KLeft, Change<VLeft>> record) {
143143

144144
private void defaultJoinInstructions(final Record<KLeft, Change<VLeft>> record) {
145145
if (record.value().oldValue != null) {
146-
final KRight oldForeignKey = record.value().oldValue == null ? null : foreignKeyExtractor.extract(record.key(), record.value().oldValue);
147-
if (oldForeignKey == null) {
146+
final KRight oldForeignKey = foreignKeyExtractor.extract(record.key(), record.value().oldValue);
147+
final KRight newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.extract(record.key(), record.value().newValue);
148+
149+
if (oldForeignKey == null && newForeignKey == null) {
148150
logSkippedRecordDueToNullForeignKey();
149-
return;
150-
}
151-
if (record.value().newValue != null) {
152-
final KRight newForeignKey = record.value().newValue == null ? null : foreignKeyExtractor.extract(record.key(), record.value().newValue);
153-
if (newForeignKey == null) {
154-
logSkippedRecordDueToNullForeignKey();
155-
return;
156-
}
157-
if (!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
158-
//Different Foreign Key - delete the old key value and propagate the new one.
159-
//Delete it from the oldKey's state store
160-
forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
161-
}
151+
} else if (oldForeignKey == null) {
152+
forward(record, newForeignKey, PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
153+
} else if (newForeignKey == null) {
154+
forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
155+
} else if (!Arrays.equals(serialize(newForeignKey), serialize(oldForeignKey))) {
156+
//Different Foreign Key - delete the old key value and propagate the new one.
157+
//Delete it from the oldKey's state store
158+
forward(record, oldForeignKey, DELETE_KEY_NO_PROPAGATE);
162159
//Add to the newKey's state store. Additionally, propagate null if no FK is found there,
163160
//since we must "unset" any output set by the previous FK-join. This is true for both INNER
164161
//and LEFT join.
165162
forward(record, newForeignKey, PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE);
166-
} else {
167-
forward(record, oldForeignKey, DELETE_KEY_AND_PROPAGATE);
163+
} else { // unchanged FK
164+
forward(record, newForeignKey, PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE);
168165
}
169166
} else if (record.value().newValue != null) {
170167
final KRight newForeignKey = foreignKeyExtractor.extract(record.key(), record.value().newValue);

streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplierTest.java

+61-3
Original file line numberDiff line numberDiff line change
@@ -295,10 +295,57 @@ public void innerJoinShouldPropagateNothingWhenOldAndNewFKIsNull() {
295295
innerJoinProcessor.process(new Record<>(pk, new Change<>(leftRecordValue, leftRecordValue), 0));
296296

297297
assertThat(context.forwarded(), empty());
298+
}
298299

299-
// test dropped-records sensors
300-
assertEquals(1.0, getDroppedRecordsTotalMetric(context));
301-
assertNotEquals(0.0, getDroppedRecordsRateMetric(context));
300+
@Test
301+
public void innerJoinShouldPropagateChangeFromNullFKToNonNullFK() {
302+
final MockInternalProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalProcessorContext<>();
303+
innerJoinProcessor.init(context);
304+
context.setRecordMetadata("topic", 0, 0);
305+
306+
final LeftValue leftRecordValue = new LeftValue(fk1);
307+
308+
innerJoinProcessor.process(new Record<>(pk, new Change<>(leftRecordValue, new LeftValue(null)), 0));
309+
310+
assertThat(context.forwarded().size(), is(1));
311+
assertThat(
312+
context.forwarded().get(0).record(),
313+
is(new Record<>(fk1, new SubscriptionWrapper<>(hash(leftRecordValue), PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, pk, 0), 0))
314+
);
315+
}
316+
317+
@Test
318+
public void innerJoinShouldDeleteAndPropagateChangeFromNonNullFKToNullFK() {
319+
final MockInternalProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalProcessorContext<>();
320+
innerJoinProcessor.init(context);
321+
context.setRecordMetadata("topic", 0, 0);
322+
323+
final LeftValue leftRecordValue = new LeftValue(null);
324+
325+
innerJoinProcessor.process(new Record<>(pk, new Change<>(leftRecordValue, new LeftValue(fk1)), 0));
326+
327+
assertThat(context.forwarded().size(), is(1));
328+
assertThat(
329+
context.forwarded().get(0).record(),
330+
is(new Record<>(fk1, new SubscriptionWrapper<>(hash(leftRecordValue), DELETE_KEY_AND_PROPAGATE, pk, 0), 0))
331+
);
332+
}
333+
334+
@Test
335+
public void innerJoinShouldPropagateUnchangedFKOnlyIfFKExistsInRightTable() {
336+
final MockInternalProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalProcessorContext<>();
337+
innerJoinProcessor.init(context);
338+
context.setRecordMetadata("topic", 0, 0);
339+
340+
final LeftValue leftRecordValue = new LeftValue(fk1);
341+
342+
innerJoinProcessor.process(new Record<>(pk, new Change<>(leftRecordValue, leftRecordValue), 0));
343+
344+
assertThat(context.forwarded().size(), is(1));
345+
assertThat(
346+
context.forwarded().get(0).record(),
347+
is(new Record<>(fk1, new SubscriptionWrapper<>(hash(leftRecordValue), PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE, pk, 0), 0))
348+
);
302349
}
303350

304351
@Test
@@ -316,6 +363,17 @@ public void innerJoinShouldPropagateDeletionOfPrimaryKey() {
316363
);
317364
}
318365

366+
@Test
367+
public void innerJoinShouldNotPropagateDeletionOfPrimaryKeyWhenPreviousFKIsNull() {
368+
final MockInternalProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalProcessorContext<>();
369+
innerJoinProcessor.init(context);
370+
context.setRecordMetadata("topic", 0, 0);
371+
372+
innerJoinProcessor.process(new Record<>(pk, new Change<>(null, new LeftValue(null)), 0));
373+
374+
assertThat(context.forwarded(), empty());
375+
}
376+
319377
@Test
320378
public void innerJoinShouldPropagateNothingWhenOldAndNewLeftValueIsNull() {
321379
final MockInternalProcessorContext<String, SubscriptionWrapper<String>> context = new MockInternalProcessorContext<>();

0 commit comments

Comments
 (0)