Skip to content

Commit 97ac765

Browse files
committed
NIFI-14448 nifi-cdc: Fix integer serialization to respect SQL UNSIGNED type semantics
1 parent 83b186f commit 97ac765

File tree

8 files changed

+318
-58
lines changed

8 files changed

+318
-58
lines changed

nifi-extension-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/ColumnDefinition.java

+13-3
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,16 @@
2424
public class ColumnDefinition {
2525

2626
private int type;
27+
private Boolean isSigned;
2728
private String name = "";
2829

29-
public ColumnDefinition(int type) {
30+
public ColumnDefinition(Boolean isSigned, int type) {
3031
this.type = type;
32+
this.isSigned = isSigned;
3133
}
3234

33-
public ColumnDefinition(int type, String name) {
34-
this(type);
35+
public ColumnDefinition(Boolean isSigned, int type, String name) {
36+
this(isSigned, type);
3537
this.name = name;
3638
}
3739

@@ -43,6 +45,14 @@ public void setType(int type) {
4345
this.type = type;
4446
}
4547

48+
public Boolean getIsSigned() {
49+
return isSigned;
50+
}
51+
52+
public void setIsSigned(boolean isSigned) {
53+
this.isSigned = isSigned;
54+
}
55+
4656
public String getName() {
4757
return name;
4858
}

nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java

+24-16
Original file line numberDiff line numberDiff line change
@@ -16,36 +16,44 @@
1616
*/
1717
package org.apache.nifi.cdc.mysql.event.io;
1818

19+
import com.fasterxml.jackson.core.JsonGenerator;
20+
import org.apache.nifi.cdc.event.ColumnDefinition;
1921
import org.apache.nifi.cdc.mysql.event.BinlogTableEventInfo;
2022

2123
import java.io.IOException;
2224
import java.io.Serializable;
25+
import java.util.HashMap;
26+
import java.util.Map;
27+
import java.util.function.Function;
2328

2429
/**
2530
* An abstract base class for writing MYSQL table-related binlog events into flow file(s), e.g.
2631
*/
2732
public abstract class AbstractBinlogTableEventWriter<T extends BinlogTableEventInfo> extends AbstractBinlogEventWriter<T> {
2833

29-
protected Object getWritableObject(Integer type, Serializable value) {
34+
private final static Map<Integer, Function<Number, String>> UNSIGNED_SQLTYPE_MAP = new HashMap<>(Map.of(
35+
java.sql.Types.BIGINT, (z) -> Long.toUnsignedString(z.longValue()),
36+
java.sql.Types.INTEGER, (z) -> String.valueOf(Integer.toUnsignedLong(z.intValue())),
37+
java.sql.Types.SMALLINT, (z) -> String.valueOf(Short.toUnsignedInt(z.shortValue())),
38+
java.sql.Types.TINYINT, (z) -> String.valueOf(Byte.toUnsignedInt(z.byteValue()))
39+
));
40+
41+
protected void writeObjectAsValueField(JsonGenerator jg, String fieldName, ColumnDefinition columnDefinition,
42+
Serializable value) throws IOException {
3043
if (value == null) {
31-
return null;
32-
}
33-
if (type == null) {
34-
if (value instanceof byte[]) {
35-
return new String((byte[]) value);
36-
} else if (value instanceof Number) {
37-
return value;
44+
jg.writeNullField(fieldName);
45+
} else if (value instanceof Number) {
46+
if (columnDefinition != null && Boolean.FALSE.equals(columnDefinition.getIsSigned())
47+
&& UNSIGNED_SQLTYPE_MAP.containsKey(columnDefinition.getType())) {
48+
jg.writeFieldName(fieldName);
49+
jg.writeRawValue(UNSIGNED_SQLTYPE_MAP.get(columnDefinition.getType()).apply((Number) value));
3850
} else {
39-
return null;
51+
jg.writeObjectField(fieldName, value);
4052
}
53+
} else if (value instanceof byte[]) {
54+
jg.writeObjectField(fieldName, new String((byte[]) value));
4155
} else {
42-
if (value instanceof byte[]) {
43-
return new String((byte[]) value);
44-
} else if (value instanceof Number) {
45-
return value;
46-
} else {
47-
return value.toString();
48-
}
56+
jg.writeObjectField(fieldName, value.toString());
4957
}
5058
}
5159

nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java

+2-8
Original file line numberDiff line numberDiff line change
@@ -79,17 +79,11 @@ protected void writeRow(DeleteRowsEventInfo event, Serializable[] row, BitSet in
7979
jsonGenerator.writeStartObject();
8080
jsonGenerator.writeNumberField("id", i + 1);
8181
ColumnDefinition columnDefinition = event.getColumnByIndex(i);
82-
Integer columnType = null;
8382
if (columnDefinition != null) {
8483
jsonGenerator.writeStringField("name", columnDefinition.getName());
85-
columnType = columnDefinition.getType();
86-
jsonGenerator.writeNumberField("column_type", columnType);
87-
}
88-
if (row[i] == null) {
89-
jsonGenerator.writeNullField("value");
90-
} else {
91-
jsonGenerator.writeObjectField("value", getWritableObject(columnType, row[i]));
84+
jsonGenerator.writeNumberField("column_type", columnDefinition.getType());
9285
}
86+
writeObjectAsValueField(jsonGenerator, "value", columnDefinition, row[i]);
9387
jsonGenerator.writeEndObject();
9488
i = includedColumns.nextSetBit(i + 1);
9589
}

nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/InsertRowsWriter.java

+2-8
Original file line numberDiff line numberDiff line change
@@ -80,17 +80,11 @@ protected void writeRow(InsertRowsEventInfo event, Serializable[] row, BitSet in
8080
jsonGenerator.writeStartObject();
8181
jsonGenerator.writeNumberField("id", i + 1);
8282
ColumnDefinition columnDefinition = event.getColumnByIndex(i);
83-
Integer columnType = null;
8483
if (columnDefinition != null) {
8584
jsonGenerator.writeStringField("name", columnDefinition.getName());
86-
columnType = columnDefinition.getType();
87-
jsonGenerator.writeNumberField("column_type", columnType);
88-
}
89-
if (row[i] == null) {
90-
jsonGenerator.writeNullField("value");
91-
} else {
92-
jsonGenerator.writeObjectField("value", getWritableObject(columnType, row[i]));
85+
jsonGenerator.writeNumberField("column_type", columnDefinition.getType());
9386
}
87+
writeObjectAsValueField(jsonGenerator, "value", columnDefinition, row[i]);
9488
jsonGenerator.writeEndObject();
9589
i = includedColumns.nextSetBit(i + 1);
9690
}

nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/UpdateRowsWriter.java

+3-14
Original file line numberDiff line numberDiff line change
@@ -82,26 +82,15 @@ protected void writeRow(UpdateRowsEventInfo event, Map.Entry<Serializable[], Ser
8282
jsonGenerator.writeStartObject();
8383
jsonGenerator.writeNumberField("id", i + 1);
8484
ColumnDefinition columnDefinition = event.getColumnByIndex(i);
85-
Integer columnType = null;
8685
if (columnDefinition != null) {
8786
jsonGenerator.writeStringField("name", columnDefinition.getName());
88-
columnType = columnDefinition.getType();
89-
jsonGenerator.writeNumberField("column_type", columnType);
87+
jsonGenerator.writeNumberField("column_type", columnDefinition.getType());
9088
}
9189
Serializable[] oldRow = row.getKey();
9290
Serializable[] newRow = row.getValue();
9391

94-
if (oldRow[i] == null) {
95-
jsonGenerator.writeNullField("last_value");
96-
} else {
97-
jsonGenerator.writeObjectField("last_value", getWritableObject(columnType, oldRow[i]));
98-
}
99-
100-
if (newRow[i] == null) {
101-
jsonGenerator.writeNullField("value");
102-
} else {
103-
jsonGenerator.writeObjectField("value", getWritableObject(columnType, newRow[i]));
104-
}
92+
writeObjectAsValueField(jsonGenerator, "last_value", columnDefinition, oldRow[i]);
93+
writeObjectAsValueField(jsonGenerator, "value", columnDefinition, newRow[i]);
10594
jsonGenerator.writeEndObject();
10695
i = includedColumns.nextSetBit(i + 1);
10796
}

nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -1253,7 +1253,9 @@ protected TableInfo loadTableInfo(TableInfoCacheKey key) throws SQLException {
12531253
for (int i = 1; i <= numCols; i++) {
12541254
// Use the column label if it exists, otherwise use the column name. We're not doing aliasing here, but it's better practice.
12551255
String columnLabel = rsmd.getColumnLabel(i);
1256-
columnDefinitions.add(new ColumnDefinition(rsmd.getColumnType(i), columnLabel != null ? columnLabel : rsmd.getColumnName(i)));
1256+
columnDefinitions.add(new ColumnDefinition(
1257+
rsmd.isSigned(i), rsmd.getColumnType(i), columnLabel != null ? columnLabel : rsmd.getColumnName(i)
1258+
));
12571259
}
12581260

12591261
tableInfo = new TableInfo(key.getDatabaseName(), key.getTableName(), key.getTableId(), columnDefinitions);

nifi-extension-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/event/io/TestInsertRowsWriter.java

+62-7
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,77 @@
1717
package org.apache.nifi.cdc.mysql.event.io;
1818

1919

20+
import com.fasterxml.jackson.core.JsonGenerator;
21+
import org.apache.nifi.cdc.event.ColumnDefinition;
2022
import org.junit.jupiter.api.Test;
2123

24+
import java.io.IOException;
2225
import java.sql.Types;
2326

24-
import static org.junit.jupiter.api.Assertions.assertEquals;
25-
import static org.junit.jupiter.api.Assertions.assertNull;
27+
import static org.mockito.Mockito.mock;
28+
import static org.mockito.Mockito.verify;
29+
import static org.mockito.Mockito.verifyNoMoreInteractions;
2630

2731
class TestInsertRowsWriter {
2832

2933
@Test
30-
public void testGetWritableObject() {
34+
public void testGetWritableObject() throws IOException {
3135
InsertRowsWriter insertRowsWriter = new InsertRowsWriter();
32-
assertNull(insertRowsWriter.getWritableObject(null, null));
33-
assertNull(insertRowsWriter.getWritableObject(Types.INTEGER, null));
34-
assertEquals((byte) 1, insertRowsWriter.getWritableObject(Types.INTEGER, (byte) 1));
35-
assertEquals("Hello", insertRowsWriter.getWritableObject(Types.VARCHAR, "Hello".getBytes()));
36+
try (JsonGenerator g = mock(JsonGenerator.class)) {
37+
insertRowsWriter.writeObjectAsValueField(g, "fieldName", null, null); //null
38+
verify(g).writeNullField("fieldName");
39+
verifyNoMoreInteractions(g);
40+
}
41+
try (JsonGenerator g = mock(JsonGenerator.class)) {
42+
insertRowsWriter.writeObjectAsValueField(g, "fieldName", new ColumnDefinition(true, Types.INTEGER), null); //null
43+
verify(g).writeNullField("fieldName");
44+
verifyNoMoreInteractions(g);
45+
}
46+
try (JsonGenerator g = mock(JsonGenerator.class)) {
47+
insertRowsWriter.writeObjectAsValueField(g, "fieldName", new ColumnDefinition(false, Types.BIGINT), (int) -77); //null
48+
verify(g).writeFieldName("fieldName");
49+
verify(g).writeRawValue("18446744073709551539");
50+
verifyNoMoreInteractions(g);
51+
}
52+
try (JsonGenerator g = mock(JsonGenerator.class)) {
53+
insertRowsWriter.writeObjectAsValueField(g, "fieldName", new ColumnDefinition(false, Types.INTEGER), (int) -77); //null
54+
verify(g).writeFieldName("fieldName");
55+
verify(g).writeRawValue("4294967219");
56+
verifyNoMoreInteractions(g);
57+
}
58+
try (JsonGenerator g = mock(JsonGenerator.class)) {
59+
insertRowsWriter.writeObjectAsValueField(g, "fieldName", new ColumnDefinition(false, Types.SMALLINT), (int) -77); //null
60+
verify(g).writeFieldName("fieldName");
61+
verify(g).writeRawValue("65459");
62+
verifyNoMoreInteractions(g);
63+
}
64+
try (JsonGenerator g = mock(JsonGenerator.class)) {
65+
insertRowsWriter.writeObjectAsValueField(g, "fieldName", new ColumnDefinition(false, Types.TINYINT), (int) -77); //null
66+
verify(g).writeFieldName("fieldName");
67+
verify(g).writeRawValue("179");
68+
verifyNoMoreInteractions(g);
69+
}
70+
try (JsonGenerator g = mock(JsonGenerator.class)) {
71+
insertRowsWriter.writeObjectAsValueField(g, "fieldName", new ColumnDefinition(true, Types.TINYINT), (int) -77); //null
72+
verify(g).writeObjectField("fieldName", -77);
73+
verifyNoMoreInteractions(g);
74+
}
75+
try (JsonGenerator g = mock(JsonGenerator.class)) {
76+
insertRowsWriter.writeObjectAsValueField(g, "fieldName", new ColumnDefinition(false, Types.TINYINT), (byte) 1); //null
77+
verify(g).writeFieldName("fieldName");
78+
verify(g).writeRawValue( "1");
79+
verifyNoMoreInteractions(g);
80+
}
81+
try (JsonGenerator g = mock(JsonGenerator.class)) {
82+
insertRowsWriter.writeObjectAsValueField(g, "fieldName", new ColumnDefinition(true, Types.TINYINT), (byte) 1); //null
83+
verify(g).writeObjectField("fieldName", (byte) 1);
84+
verifyNoMoreInteractions(g);
85+
}
86+
try (JsonGenerator g = mock(JsonGenerator.class)) {
87+
insertRowsWriter.writeObjectAsValueField(g, "fieldName", new ColumnDefinition(null, Types.VARCHAR), "Hello".getBytes()); //null
88+
verify(g).writeObjectField("fieldName", "Hello");
89+
verifyNoMoreInteractions(g);
90+
}
3691
}
3792

3893
}

0 commit comments

Comments
 (0)