Skip to content

Commit d0c7ebb

Browse files
markap14pvillard31
authored andcommitted
NIFI-11671: Allow FlowFile attributes to be referenced in SQL for JoinEnrichment
Signed-off-by: Pierre Villard <[email protected]> This closes #8059.
1 parent e68c384 commit d0c7ebb

File tree

5 files changed

+57
-9
lines changed

5 files changed

+57
-9
lines changed

nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoinEnrichment.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.nifi.annotation.lifecycle.OnStopped;
3131
import org.apache.nifi.components.AllowableValue;
3232
import org.apache.nifi.components.PropertyDescriptor;
33+
import org.apache.nifi.components.PropertyValue;
3334
import org.apache.nifi.context.PropertyContext;
3435
import org.apache.nifi.expression.ExpressionLanguageScope;
3536
import org.apache.nifi.flowfile.FlowFile;
@@ -372,7 +373,7 @@ protected BinProcessingResult processBin(final Bin unmodifiableBin, final Proces
372373
final WriteResult writeResult;
373374
final String mimeType;
374375
FlowFile output;
375-
try (final RecordJoinResult result = joinStrategy.join(originalInput, enrichmentInput, session, writerSchema)) {
376+
try (final RecordJoinResult result = joinStrategy.join(originalInput, enrichmentInput, combinedAttributes, session, writerSchema)) {
376377
// Create output FlowFile
377378
output = session.create(flowFiles);
378379

@@ -426,10 +427,10 @@ private BinProcessingResult transferFailure(final List<FlowFile> flowFiles, fina
426427
private RecordJoinStrategy getJoinStrategy(final ProcessContext context, final Map<String, String> attributes) {
427428
final String strategyName = context.getProperty(JOIN_STRATEGY).getValue();
428429
if (strategyName.equalsIgnoreCase(JOIN_SQL.getValue())) {
429-
final String sql = context.getProperty(SQL).getValue();
430+
final PropertyValue sqlPropertyValue = context.getProperty(SQL);
430431
final int defaultPrecision = context.getProperty(DEFAULT_PRECISION).evaluateAttributeExpressions(attributes).asInteger();
431432
final int defaultScale = context.getProperty(DEFAULT_SCALE).evaluateAttributeExpressions(attributes).asInteger();
432-
return new SqlJoinStrategy(sqlJoinCache, sql, getLogger(), defaultPrecision, defaultScale);
433+
return new SqlJoinStrategy(sqlJoinCache, sqlPropertyValue, getLogger(), defaultPrecision, defaultScale);
433434
} else if (strategyName.equalsIgnoreCase(JOIN_WRAPPER.getValue())) {
434435
return new WrapperJoinStrategy(getLogger());
435436
} else if (strategyName.equalsIgnoreCase(JOIN_INSERT_ENRICHMENT_FIELDS.getValue())) {

nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/enrichment/IndexCorrelatedJoinStrategy.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import java.io.IOException;
3030
import java.io.InputStream;
31+
import java.util.Map;
3132

3233
public abstract class IndexCorrelatedJoinStrategy implements RecordJoinStrategy {
3334
private final ComponentLog logger;
@@ -41,7 +42,9 @@ protected ComponentLog getLogger() {
4142
}
4243

4344
@Override
44-
public RecordJoinResult join(final RecordJoinInput originalInput, final RecordJoinInput enrichmentInput, final ProcessSession session, final RecordSchema writerSchema) throws Exception {
45+
public RecordJoinResult join(final RecordJoinInput originalInput, final RecordJoinInput enrichmentInput, final Map<String, String> combinedAttributes,
46+
final ProcessSession session,final RecordSchema writerSchema) throws Exception {
47+
4548
final FlowFile originalFlowFile = originalInput.getFlowFile();
4649
final FlowFile enrichmentFlowFile = enrichmentInput.getFlowFile();
4750

nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/enrichment/RecordJoinStrategy.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import org.apache.nifi.processor.ProcessSession;
2121
import org.apache.nifi.serialization.record.RecordSchema;
2222

23+
import java.util.Map;
24+
2325
public interface RecordJoinStrategy {
24-
RecordJoinResult join(RecordJoinInput originalInput, RecordJoinInput enrichmentInput, ProcessSession session, RecordSchema outputSchema) throws Exception;
26+
RecordJoinResult join(RecordJoinInput originalInput, RecordJoinInput enrichmentInput, Map<String, String> combinedAttributes, ProcessSession session, RecordSchema outputSchema) throws Exception;
2527
}

nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/enrichment/SqlJoinStrategy.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.nifi.processors.standard.enrichment;
1919

20+
import org.apache.nifi.components.PropertyValue;
2021
import org.apache.nifi.logging.ComponentLog;
2122
import org.apache.nifi.processor.ProcessSession;
2223
import org.apache.nifi.queryrecord.RecordDataSource;
@@ -28,27 +29,31 @@
2829
import java.sql.PreparedStatement;
2930
import java.sql.ResultSet;
3031
import java.sql.SQLException;
32+
import java.util.Map;
3133

3234
public class SqlJoinStrategy implements RecordJoinStrategy {
3335
public static final String ENRICHMENT_TABLE_NAME = "ENRICHMENT";
3436
public static final String ORIGINAL_TABLE_NAME = "ORIGINAL";
3537

3638
private final SqlJoinCache cache;
3739
private final ComponentLog logger;
38-
private final String sql;
40+
private final PropertyValue sqlPropertyValue;
3941
private final int defaultPrecision;
4042
private final int defaultScale;
4143

42-
public SqlJoinStrategy(final SqlJoinCache cache, final String sql, final ComponentLog logger, final int defaultPrecision, final int defaultScale) {
44+
public SqlJoinStrategy(final SqlJoinCache cache, final PropertyValue sqlPropertyValue, final ComponentLog logger, final int defaultPrecision, final int defaultScale) {
4345
this.cache = cache;
44-
this.sql = sql;
46+
this.sqlPropertyValue = sqlPropertyValue;
4547
this.logger = logger;
4648
this.defaultPrecision = defaultPrecision;
4749
this.defaultScale = defaultScale;
4850
}
4951

5052
@Override
51-
public RecordJoinResult join(final RecordJoinInput originalInput, final RecordJoinInput enrichmentInput, final ProcessSession session, final RecordSchema outputSchema) throws SQLException {
53+
public RecordJoinResult join(final RecordJoinInput originalInput, final RecordJoinInput enrichmentInput, final Map<String, String> combinedAttributes,
54+
final ProcessSession session, final RecordSchema outputSchema) throws SQLException {
55+
56+
final String sql = sqlPropertyValue.evaluateAttributeExpressions(combinedAttributes).getValue();
5257
final SqlJoinCalciteParameters calciteParameters = cache.getCalciteParameters(sql, outputSchema, originalInput, enrichmentInput);
5358

5459
final NiFiTable originalTable = calciteParameters.getDatabase().getTable(ORIGINAL_TABLE_NAME);

nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestJoinEnrichment.java

+37
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,43 @@ public void testSimpleSqlJoin() throws InitializationException {
154154
assertEquals(RecordFieldType.STRING, schema.getField("name").get().getDataType().getFieldType());
155155
}
156156

157+
@Test
158+
public void testELReferencingAttribute() throws InitializationException {
159+
final TestRunner runner = TestRunners.newTestRunner(new JoinEnrichment());
160+
161+
final ArrayListRecordWriter writer = setupCsvServices(runner);
162+
runner.setProperty(JoinEnrichment.JOIN_STRATEGY, JoinEnrichment.JOIN_SQL);
163+
runner.setProperty(JoinEnrichment.SQL, "SELECT original.id, enrichment.${desired_enrichment_column} FROM original JOIN enrichment ON original.id = enrichment.id");
164+
165+
final Map<String, String> originalAttributes = new HashMap<>();
166+
originalAttributes.put("enrichment.group.id", "abc");
167+
originalAttributes.put("enrichment.role", "ORIGINAL");
168+
runner.enqueue("id\n5", originalAttributes);
169+
170+
final Map<String, String> enrichmentAttributes = new HashMap<>();
171+
enrichmentAttributes.put("enrichment.group.id", "abc");
172+
enrichmentAttributes.put("enrichment.role", "ENRICHMENT");
173+
enrichmentAttributes.put("desired_enrichment_column", "name");
174+
runner.enqueue("id,name\n5,John Doe", enrichmentAttributes);
175+
176+
runner.run();
177+
178+
runner.assertTransferCount(JoinEnrichment.REL_JOINED, 1);
179+
runner.assertTransferCount(JoinEnrichment.REL_ORIGINAL, 2);
180+
181+
final List<Record> written = writer.getRecordsWritten();
182+
assertEquals(1, written.size());
183+
184+
final Record outRecord = written.get(0);
185+
assertEquals(5, outRecord.getAsInt("id"));
186+
assertEquals("John Doe", outRecord.getValue("name"));
187+
188+
final RecordSchema schema = outRecord.getSchema();
189+
assertEquals(RecordFieldType.STRING, schema.getField("id").get().getDataType().getFieldType());
190+
assertEquals(RecordFieldType.STRING, schema.getField("name").get().getDataType().getFieldType());
191+
}
192+
193+
157194
// Tests that the LEFT OUTER JOIN example in the Additional Details works as expected
158195
@Test
159196
public void testLeftOuterJoin() throws InitializationException, IOException, SchemaNotFoundException, MalformedRecordException {

0 commit comments

Comments
 (0)