Skip to content

NIFI-14448 nifi-cdc: Fix integer serialization to respect SQL UNSIGNED type semantics #9857

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,16 @@
public class ColumnDefinition {

private int type;
private Boolean isSigned;
private String name = "";

public ColumnDefinition(int type) {
public ColumnDefinition(Boolean isSigned, int type) {
this.type = type;
this.isSigned = isSigned;
}

public ColumnDefinition(int type, String name) {
this(type);
public ColumnDefinition(Boolean isSigned, int type, String name) {
this(isSigned, type);
this.name = name;
}

Expand All @@ -43,6 +45,14 @@ public void setType(int type) {
this.type = type;
}

public Boolean getIsSigned() {
return isSigned;
}

public void setIsSigned(boolean isSigned) {
this.isSigned = isSigned;
}

public String getName() {
return name;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,44 @@
*/
package org.apache.nifi.cdc.mysql.event.io;

import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.nifi.cdc.event.ColumnDefinition;
import org.apache.nifi.cdc.mysql.event.BinlogTableEventInfo;

import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/**
* An abstract base class for writing MYSQL table-related binlog events into flow file(s), e.g.
*/
public abstract class AbstractBinlogTableEventWriter<T extends BinlogTableEventInfo> extends AbstractBinlogEventWriter<T> {

protected Object getWritableObject(Integer type, Serializable value) {
private final static Map<Integer, Function<Number, String>> UNSIGNED_SQLTYPE_MAP = new HashMap<>(Map.of(
java.sql.Types.BIGINT, (z) -> Long.toUnsignedString(z.longValue()),
java.sql.Types.INTEGER, (z) -> String.valueOf(Integer.toUnsignedLong(z.intValue())),
java.sql.Types.SMALLINT, (z) -> String.valueOf(Short.toUnsignedInt(z.shortValue())),
java.sql.Types.TINYINT, (z) -> String.valueOf(Byte.toUnsignedInt(z.byteValue()))
));

protected void writeObjectAsValueField(JsonGenerator jg, String fieldName, ColumnDefinition columnDefinition,
Serializable value) throws IOException {
if (value == null) {
return null;
}
if (type == null) {
if (value instanceof byte[]) {
return new String((byte[]) value);
} else if (value instanceof Number) {
return value;
jg.writeNullField(fieldName);
} else if (value instanceof Number) {
if (columnDefinition != null && Boolean.FALSE.equals(columnDefinition.getIsSigned())
&& UNSIGNED_SQLTYPE_MAP.containsKey(columnDefinition.getType())) {
jg.writeFieldName(fieldName);
jg.writeRawValue(UNSIGNED_SQLTYPE_MAP.get(columnDefinition.getType()).apply((Number) value));
} else {
return null;
jg.writeObjectField(fieldName, value);
}
} else if (value instanceof byte[]) {
jg.writeObjectField(fieldName, new String((byte[]) value));
} else {
if (value instanceof byte[]) {
return new String((byte[]) value);
} else if (value instanceof Number) {
return value;
} else {
return value.toString();
}
jg.writeObjectField(fieldName, value.toString());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,11 @@ protected void writeRow(DeleteRowsEventInfo event, Serializable[] row, BitSet in
jsonGenerator.writeStartObject();
jsonGenerator.writeNumberField("id", i + 1);
ColumnDefinition columnDefinition = event.getColumnByIndex(i);
Integer columnType = null;
if (columnDefinition != null) {
jsonGenerator.writeStringField("name", columnDefinition.getName());
columnType = columnDefinition.getType();
jsonGenerator.writeNumberField("column_type", columnType);
}
if (row[i] == null) {
jsonGenerator.writeNullField("value");
} else {
jsonGenerator.writeObjectField("value", getWritableObject(columnType, row[i]));
jsonGenerator.writeNumberField("column_type", columnDefinition.getType());
}
writeObjectAsValueField(jsonGenerator, "value", columnDefinition, row[i]);
jsonGenerator.writeEndObject();
i = includedColumns.nextSetBit(i + 1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,17 +80,11 @@ protected void writeRow(InsertRowsEventInfo event, Serializable[] row, BitSet in
jsonGenerator.writeStartObject();
jsonGenerator.writeNumberField("id", i + 1);
ColumnDefinition columnDefinition = event.getColumnByIndex(i);
Integer columnType = null;
if (columnDefinition != null) {
jsonGenerator.writeStringField("name", columnDefinition.getName());
columnType = columnDefinition.getType();
jsonGenerator.writeNumberField("column_type", columnType);
}
if (row[i] == null) {
jsonGenerator.writeNullField("value");
} else {
jsonGenerator.writeObjectField("value", getWritableObject(columnType, row[i]));
jsonGenerator.writeNumberField("column_type", columnDefinition.getType());
}
writeObjectAsValueField(jsonGenerator, "value", columnDefinition, row[i]);
jsonGenerator.writeEndObject();
i = includedColumns.nextSetBit(i + 1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,26 +82,15 @@ protected void writeRow(UpdateRowsEventInfo event, Map.Entry<Serializable[], Ser
jsonGenerator.writeStartObject();
jsonGenerator.writeNumberField("id", i + 1);
ColumnDefinition columnDefinition = event.getColumnByIndex(i);
Integer columnType = null;
if (columnDefinition != null) {
jsonGenerator.writeStringField("name", columnDefinition.getName());
columnType = columnDefinition.getType();
jsonGenerator.writeNumberField("column_type", columnType);
jsonGenerator.writeNumberField("column_type", columnDefinition.getType());
}
Serializable[] oldRow = row.getKey();
Serializable[] newRow = row.getValue();

if (oldRow[i] == null) {
jsonGenerator.writeNullField("last_value");
} else {
jsonGenerator.writeObjectField("last_value", getWritableObject(columnType, oldRow[i]));
}

if (newRow[i] == null) {
jsonGenerator.writeNullField("value");
} else {
jsonGenerator.writeObjectField("value", getWritableObject(columnType, newRow[i]));
}
writeObjectAsValueField(jsonGenerator, "last_value", columnDefinition, oldRow[i]);
writeObjectAsValueField(jsonGenerator, "value", columnDefinition, newRow[i]);
jsonGenerator.writeEndObject();
i = includedColumns.nextSetBit(i + 1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1253,7 +1253,9 @@ protected TableInfo loadTableInfo(TableInfoCacheKey key) throws SQLException {
for (int i = 1; i <= numCols; i++) {
// Use the column label if it exists, otherwise use the column name. We're not doing aliasing here, but it's better practice.
String columnLabel = rsmd.getColumnLabel(i);
columnDefinitions.add(new ColumnDefinition(rsmd.getColumnType(i), columnLabel != null ? columnLabel : rsmd.getColumnName(i)));
columnDefinitions.add(new ColumnDefinition(
rsmd.isSigned(i), rsmd.getColumnType(i), columnLabel != null ? columnLabel : rsmd.getColumnName(i)
));
}

tableInfo = new TableInfo(key.getDatabaseName(), key.getTableName(), key.getTableId(), columnDefinitions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,77 @@
package org.apache.nifi.cdc.mysql.event.io;


import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.nifi.cdc.event.ColumnDefinition;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.sql.Types;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

class TestInsertRowsWriter {

@Test
public void testGetWritableObject() {
public void testGetWritableObject() throws IOException {
InsertRowsWriter insertRowsWriter = new InsertRowsWriter();
assertNull(insertRowsWriter.getWritableObject(null, null));
assertNull(insertRowsWriter.getWritableObject(Types.INTEGER, null));
assertEquals((byte) 1, insertRowsWriter.getWritableObject(Types.INTEGER, (byte) 1));
assertEquals("Hello", insertRowsWriter.getWritableObject(Types.VARCHAR, "Hello".getBytes()));
try (JsonGenerator g = mock(JsonGenerator.class)) {
insertRowsWriter.writeObjectAsValueField(g, "fieldName", null, null); //null
verify(g).writeNullField("fieldName");
verifyNoMoreInteractions(g);
}
try (JsonGenerator g = mock(JsonGenerator.class)) {
insertRowsWriter.writeObjectAsValueField(g, "fieldName", new ColumnDefinition(true, Types.INTEGER), null); //null
verify(g).writeNullField("fieldName");
verifyNoMoreInteractions(g);
}
try (JsonGenerator g = mock(JsonGenerator.class)) {
insertRowsWriter.writeObjectAsValueField(g, "fieldName", new ColumnDefinition(false, Types.BIGINT), (int) -77); //null
verify(g).writeFieldName("fieldName");
verify(g).writeRawValue("18446744073709551539");
verifyNoMoreInteractions(g);
}
try (JsonGenerator g = mock(JsonGenerator.class)) {
insertRowsWriter.writeObjectAsValueField(g, "fieldName", new ColumnDefinition(false, Types.INTEGER), (int) -77); //null
verify(g).writeFieldName("fieldName");
verify(g).writeRawValue("4294967219");
verifyNoMoreInteractions(g);
}
try (JsonGenerator g = mock(JsonGenerator.class)) {
insertRowsWriter.writeObjectAsValueField(g, "fieldName", new ColumnDefinition(false, Types.SMALLINT), (int) -77); //null
verify(g).writeFieldName("fieldName");
verify(g).writeRawValue("65459");
verifyNoMoreInteractions(g);
}
try (JsonGenerator g = mock(JsonGenerator.class)) {
insertRowsWriter.writeObjectAsValueField(g, "fieldName", new ColumnDefinition(false, Types.TINYINT), (int) -77); //null
verify(g).writeFieldName("fieldName");
verify(g).writeRawValue("179");
verifyNoMoreInteractions(g);
}
try (JsonGenerator g = mock(JsonGenerator.class)) {
insertRowsWriter.writeObjectAsValueField(g, "fieldName", new ColumnDefinition(true, Types.TINYINT), (int) -77); //null
verify(g).writeObjectField("fieldName", -77);
verifyNoMoreInteractions(g);
}
try (JsonGenerator g = mock(JsonGenerator.class)) {
insertRowsWriter.writeObjectAsValueField(g, "fieldName", new ColumnDefinition(false, Types.TINYINT), (byte) 1); //null
verify(g).writeFieldName("fieldName");
verify(g).writeRawValue( "1");
verifyNoMoreInteractions(g);
}
try (JsonGenerator g = mock(JsonGenerator.class)) {
insertRowsWriter.writeObjectAsValueField(g, "fieldName", new ColumnDefinition(true, Types.TINYINT), (byte) 1); //null
verify(g).writeObjectField("fieldName", (byte) 1);
verifyNoMoreInteractions(g);
}
try (JsonGenerator g = mock(JsonGenerator.class)) {
insertRowsWriter.writeObjectAsValueField(g, "fieldName", new ColumnDefinition(null, Types.VARCHAR), "Hello".getBytes()); //null
verify(g).writeObjectField("fieldName", "Hello");
verifyNoMoreInteractions(g);
}
}

}
Loading