Skip to content

Commit 46750fb

Browse files
cbornet authored and lhotari committed
[fix][io] Fix KinesisSink json flattening for AVRO's SchemaType.BYTES (#24132)
(cherry picked from commit 9252675)
1 parent 29d32df commit 46750fb

File tree

4 files changed

+37
-20
lines changed

4 files changed

+37
-20
lines changed

Diff for: pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java

+5 -5
Original file line number | Diff line number | Diff line change
@@ -222,7 +222,7 @@ public static String serializeRecordToJsonExpandingValue(ObjectMapper mapper, Re
222222
JsonRecord jsonRecord = new JsonRecord();
223223
GenericObject value = record.getValue();
224224
if (value != null) {
225-
jsonRecord.setPayload(toJsonSerializable(record.getSchema(), value.getNativeObject()));
225+
jsonRecord.setPayload(toJsonSerializable(record.getSchema(), value.getNativeObject(), flatten));
226226
}
227227
record.getKey().ifPresent(jsonRecord::setKey);
228228
record.getTopicName().ifPresent(jsonRecord::setTopicName);
@@ -242,7 +242,7 @@ public static org.apache.pulsar.client.api.Message<GenericObject> getMessage(Rec
242242
.orElseThrow(() -> new IllegalArgumentException("Record does not carry message information"));
243243
}
244244

245-
private static Object toJsonSerializable(Schema<?> schema, Object val) {
245+
private static Object toJsonSerializable(Schema<?> schema, Object val, boolean convertBytesToString) {
246246
if (schema == null || schema.getSchemaInfo().getType().isPrimitive()) {
247247
return val;
248248
}
@@ -254,15 +254,15 @@ private static Object toJsonSerializable(Schema<?> schema, Object val) {
254254
Map<String, Object> jsonKeyValue = new HashMap<>();
255255
if (keyValue.getKey() != null) {
256256
jsonKeyValue.put("key", toJsonSerializable(keyValueSchema.getKeySchema(),
257-
keyValue.getKey().getNativeObject()));
257+
keyValue.getKey().getNativeObject(), convertBytesToString));
258258
}
259259
if (keyValue.getValue() != null) {
260260
jsonKeyValue.put("value", toJsonSerializable(keyValueSchema.getValueSchema(),
261-
keyValue.getValue().getNativeObject()));
261+
keyValue.getValue().getNativeObject(), convertBytesToString));
262262
}
263263
return jsonKeyValue;
264264
case AVRO:
265-
return JsonConverter.toJson((org.apache.avro.generic.GenericRecord) val);
265+
return JsonConverter.toJson((org.apache.avro.generic.GenericRecord) val, convertBytesToString);
266266
case JSON:
267267
return val;
268268
default:

Diff for: pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/json/JsonConverter.java

+16 -7
Original file line number | Diff line number | Diff line change
@@ -26,6 +26,7 @@
2626
import java.time.Instant;
2727
import java.time.LocalDate;
2828
import java.time.LocalTime;
29+
import java.util.Base64;
2930
import java.util.HashMap;
3031
import java.util.Map;
3132
import java.util.UUID;
@@ -45,18 +46,18 @@ public class JsonConverter {
4546
private static final Map<String, LogicalTypeConverter<?>> logicalTypeConverters = new HashMap<>();
4647
private static final JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true);
4748

48-
public static JsonNode toJson(GenericRecord genericRecord) {
49+
public static JsonNode toJson(GenericRecord genericRecord, boolean convertBytesToString) {
4950
if (genericRecord == null) {
5051
return null;
5152
}
5253
ObjectNode objectNode = jsonNodeFactory.objectNode();
5354
for (Schema.Field field : genericRecord.getSchema().getFields()) {
54-
objectNode.set(field.name(), toJson(field.schema(), genericRecord.get(field.name())));
55+
objectNode.set(field.name(), toJson(field.schema(), genericRecord.get(field.name()), convertBytesToString));
5556
}
5657
return objectNode;
5758
}
5859

59-
public static JsonNode toJson(Schema schema, Object value) {
60+
public static JsonNode toJson(Schema schema, Object value, boolean convertBytesToString) {
6061
if (schema.getLogicalType() != null && logicalTypeConverters.containsKey(schema.getLogicalType().getName())) {
6162
return logicalTypeConverters.get(schema.getLogicalType().getName()).toJson(schema, value);
6263
}
@@ -77,8 +78,16 @@ public static JsonNode toJson(Schema schema, Object value) {
7778
case BOOLEAN:
7879
return jsonNodeFactory.booleanNode((Boolean) value);
7980
case BYTES:
81+
// Workaround for https://github.com/wnameless/json-flattener/issues/91
82+
if (convertBytesToString) {
83+
return jsonNodeFactory.textNode(Base64.getEncoder().encodeToString((byte[]) value));
84+
}
8085
return jsonNodeFactory.binaryNode((byte[]) value);
8186
case FIXED:
87+
// Workaround for https://github.com/wnameless/json-flattener/issues/91
88+
if (convertBytesToString) {
89+
return jsonNodeFactory.textNode(Base64.getEncoder().encodeToString(((GenericFixed) value).bytes()));
90+
}
8291
return jsonNodeFactory.binaryNode(((GenericFixed) value).bytes());
8392
case ENUM: // GenericEnumSymbol
8493
case STRING:
@@ -93,7 +102,7 @@ public static JsonNode toJson(Schema schema, Object value) {
93102
iterable = (Object[]) value;
94103
}
95104
for (Object elem : iterable) {
96-
JsonNode fieldValue = toJson(elementSchema, elem);
105+
JsonNode fieldValue = toJson(elementSchema, elem, convertBytesToString);
97106
arrayNode.add(fieldValue);
98107
}
99108
return arrayNode;
@@ -102,21 +111,21 @@ public static JsonNode toJson(Schema schema, Object value) {
102111
Map<Object, Object> map = (Map<Object, Object>) value;
103112
ObjectNode objectNode = jsonNodeFactory.objectNode();
104113
for (Map.Entry<Object, Object> entry : map.entrySet()) {
105-
JsonNode jsonNode = toJson(schema.getValueType(), entry.getValue());
114+
JsonNode jsonNode = toJson(schema.getValueType(), entry.getValue(), convertBytesToString);
106115
// can be a String or org.apache.avro.util.Utf8
107116
final String entryKey = entry.getKey() == null ? null : entry.getKey().toString();
108117
objectNode.set(entryKey, jsonNode);
109118
}
110119
return objectNode;
111120
}
112121
case RECORD:
113-
return toJson((GenericRecord) value);
122+
return toJson((GenericRecord) value, convertBytesToString);
114123
case UNION:
115124
for (Schema s : schema.getTypes()) {
116125
if (s.getType() == Schema.Type.NULL) {
117126
continue;
118127
}
119-
return toJson(s, value);
128+
return toJson(s, value, convertBytesToString);
120129
}
121130
// this case should not happen
122131
return jsonNodeFactory.textNode(value.toString());

Diff for: pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java

+14 -6
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,8 @@
2929
import com.google.gson.Gson;
3030

3131
import java.nio.ByteBuffer;
32+
import java.nio.charset.StandardCharsets;
33+
import java.util.Base64;
3234
import java.util.Collections;
3335
import java.util.HashMap;
3436
import java.util.Map;
@@ -358,6 +360,7 @@ public void testKeyValueSerializeRecordToJsonExpandingValue(SchemaType schemaTyp
358360
RecordSchemaBuilder udtSchemaBuilder = SchemaBuilder.record("type1");
359361
udtSchemaBuilder.field("a").type(SchemaType.STRING).optional().defaultValue(null);
360362
udtSchemaBuilder.field("b").type(SchemaType.BOOLEAN).optional().defaultValue(null);
363+
udtSchemaBuilder.field("c").type(SchemaType.BYTES).optional().defaultValue(null);
361364
udtSchemaBuilder.field("d").type(SchemaType.DOUBLE).optional().defaultValue(null);
362365
udtSchemaBuilder.field("f").type(SchemaType.FLOAT).optional().defaultValue(null);
363366
udtSchemaBuilder.field("i").type(SchemaType.INT32).optional().defaultValue(null);
@@ -366,12 +369,16 @@ public void testKeyValueSerializeRecordToJsonExpandingValue(SchemaType schemaTyp
366369
valueSchemaBuilder.field("e", udtGenericSchema).type(schemaType).optional().defaultValue(null);
367370
GenericSchema<GenericRecord> valueSchema = Schema.generic(valueSchemaBuilder.build(schemaType));
368371

372+
byte[] bytes = "10".getBytes(StandardCharsets.UTF_8);
369373
GenericRecord valueGenericRecord = valueSchema.newRecordBuilder()
370374
.set("c", "1")
371375
.set("d", 1)
372376
.set("e", udtGenericSchema.newRecordBuilder()
373377
.set("a", "a")
374378
.set("b", true)
379+
// There's a bug in json-flattener that doesn't handle byte[] fields correctly.
380+
// But since we use AUTO_CONSUME, we won't get byte[] fields for JSON schema anyway.
381+
.set("c", schemaType == SchemaType.AVRO ? bytes : Base64.getEncoder().encodeToString(bytes))
375382
.set("d", 1.0)
376383
.set("f", 1.0f)
377384
.set("i", 1)
@@ -434,16 +441,17 @@ public Optional<Long> getEventTime() {
434441
String json = Utils.serializeRecordToJsonExpandingValue(objectMapper, genericObjectRecord, false);
435442

436443
assertEquals(json, "{\"topicName\":\"data-ks1.table1\",\"key\":\"message-key\","
437-
+ "\"payload\":{\"value\":{\"c\":\"1\",\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"d\":1.0,\"f\":1.0,"
438-
+ "\"i\":1,\"l\":10}},\"key\":{\"a\":\"1\",\"b\":1}},\"properties\":{\"prop-key\":\"prop-value\"},"
439-
+ "\"eventTime\":1648502845803}");
444+
+ "\"payload\":{\"value\":{\"c\":\"1\",\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"c\":\"MTA=\",\"d\":1.0,"
445+
+ "\"f\":1.0,\"i\":1,\"l\":10}},\"key\":{\"a\":\"1\",\"b\":1}},"
446+
+ "\"properties\":{\"prop-key\":\"prop-value\"},\"eventTime\":1648502845803}");
440447

441448
json = Utils.serializeRecordToJsonExpandingValue(objectMapper, genericObjectRecord, true);
442449

443450
assertEquals(json, "{\"topicName\":\"data-ks1.table1\",\"key\":\"message-key\",\"payload.value.c\":\"1\","
444-
+ "\"payload.value.d\":1,\"payload.value.e.a\":\"a\",\"payload.value.e.b\":true,\"payload.value.e"
445-
+ ".d\":1.0,\"payload.value.e.f\":1.0,\"payload.value.e.i\":1,\"payload.value.e.l\":10,\"payload.key"
446-
+ ".a\":\"1\",\"payload.key.b\":1,\"properties.prop-key\":\"prop-value\",\"eventTime\":1648502845803}");
451+
+ "\"payload.value.d\":1,\"payload.value.e.a\":\"a\",\"payload.value.e.b\":true,"
452+
+ "\"payload.value.e.c\":\"MTA=\",\"payload.value.e.d\":1.0,\"payload.value.e.f\":1.0,"
453+
+ "\"payload.value.e.i\":1,\"payload.value.e.l\":10,\"payload.key.a\":\"1\",\"payload.key.b\":1,"
454+
+ "\"properties.prop-key\":\"prop-value\",\"eventTime\":1648502845803}");
447455
}
448456

449457
@Test(dataProvider = "schemaType")

Diff for: pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/json/JsonConverterTests.java

+2 -2
Original file line number | Diff line number | Diff line change
@@ -81,7 +81,7 @@ public void testAvroToJson() throws IOException {
8181
genericRecord.put("arrayavro", new GenericData.Array<>(avroArraySchema, Arrays.asList("toto")));
8282
genericRecord.put("map", ImmutableMap.of("a",10));
8383
genericRecord.put("maputf8", ImmutableMap.of(new org.apache.avro.util.Utf8("a"),10));
84-
JsonNode jsonNode = JsonConverter.toJson(genericRecord);
84+
JsonNode jsonNode = JsonConverter.toJson(genericRecord, false);
8585
assertEquals(jsonNode.get("n"), NullNode.getInstance());
8686
assertEquals(jsonNode.get("l").asLong(), 1L);
8787
assertEquals(jsonNode.get("i").asInt(), 1);
@@ -135,7 +135,7 @@ public void testLogicalTypesToJson() throws IOException {
135135
genericRecord.put("myuuid", myUuid.toString());
136136

137137
GenericRecord genericRecord2 = deserialize(serialize(genericRecord, schema), schema);
138-
JsonNode jsonNode = JsonConverter.toJson(genericRecord2);
138+
JsonNode jsonNode = JsonConverter.toJson(genericRecord2, false);
139139
assertEquals(jsonNode.get("mydate").asInt(), calendar.toInstant().getEpochSecond());
140140
assertEquals(jsonNode.get("tsmillis").asInt(), (int)calendar.getTimeInMillis());
141141
assertEquals(jsonNode.get("tsmicros").asLong(), calendar.getTimeInMillis() * 1000);

0 commit comments

Comments
 (0)