Skip to content

BE: Skip FQ union types for avro-json convertions #3931

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.provectus.kafka.ui.exception;

public class JsonAvroConversionException extends ValidationException {
public JsonAvroConversionException(String message) {
super(message);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -151,30 +151,16 @@ private ArrayFieldSchema createArraySchema(Schema schema,
}

private JsonType convertType(Schema schema) {
switch (schema.getType()) {
case INT:
case LONG:
return new SimpleJsonType(JsonType.Type.INTEGER);
case MAP:
case RECORD:
return new SimpleJsonType(JsonType.Type.OBJECT);
case ENUM:
return new EnumJsonType(schema.getEnumSymbols());
case BYTES:
case STRING:
return new SimpleJsonType(JsonType.Type.STRING);
case NULL:
return new SimpleJsonType(JsonType.Type.NULL);
case ARRAY:
return new SimpleJsonType(JsonType.Type.ARRAY);
case FIXED:
case FLOAT:
case DOUBLE:
return new SimpleJsonType(JsonType.Type.NUMBER);
case BOOLEAN:
return new SimpleJsonType(JsonType.Type.BOOLEAN);
default:
return new SimpleJsonType(JsonType.Type.STRING);
}
return switch (schema.getType()) {
case INT, LONG -> new SimpleJsonType(JsonType.Type.INTEGER);
case MAP, RECORD -> new SimpleJsonType(JsonType.Type.OBJECT);
case ENUM -> new EnumJsonType(schema.getEnumSymbols());
case BYTES, STRING -> new SimpleJsonType(JsonType.Type.STRING);
case NULL -> new SimpleJsonType(JsonType.Type.NULL);
case ARRAY -> new SimpleJsonType(JsonType.Type.ARRAY);
case FIXED, FLOAT, DOUBLE -> new SimpleJsonType(JsonType.Type.NUMBER);
case BOOLEAN -> new SimpleJsonType(JsonType.Type.BOOLEAN);
default -> new SimpleJsonType(JsonType.Type.STRING);
};
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.provectus.kafka.ui.util.jsonschema;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
Expand All @@ -15,7 +16,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.google.common.collect.Lists;
import com.provectus.kafka.ui.exception.JsonToAvroConversionException;
import com.provectus.kafka.ui.exception.JsonAvroConversionException;
import io.confluent.kafka.serializers.AvroData;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
Expand All @@ -34,20 +35,24 @@
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import lombok.SneakyThrows;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;

// json <-> avro
public class JsonAvroConversion {

private static final JsonMapper MAPPER = new JsonMapper();
private static final Schema NULL_SCHEMA = Schema.create(Schema.Type.NULL);

// converts json into Object that is expected input for KafkaAvroSerializer
// (with AVRO_USE_LOGICAL_TYPE_CONVERTERS flat enabled!)
@SneakyThrows
public static Object convertJsonToAvro(String jsonString, Schema avroSchema) {
JsonNode rootNode = MAPPER.readTree(jsonString);
JsonNode rootNode = null;
try {
rootNode = MAPPER.readTree(jsonString);
} catch (JsonProcessingException e) {
throw new JsonAvroConversionException("String is not a valid json");
}
return convert(rootNode, avroSchema);
}

Expand Down Expand Up @@ -80,31 +85,43 @@ private static Object convert(JsonNode node, Schema avroSchema) {
assertJsonType(node, JsonNodeType.STRING);
String symbol = node.textValue();
if (!avroSchema.getEnumSymbols().contains(symbol)) {
throw new JsonToAvroConversionException("%s is not a part of enum symbols [%s]"
throw new JsonAvroConversionException("%s is not a part of enum symbols [%s]"
.formatted(symbol, avroSchema.getEnumSymbols()));
}
yield new GenericData.EnumSymbol(avroSchema, symbol);
}
case UNION -> {
// for types from enum (other than null) payload should be an object with single key == name of type
// ex: schema = [ "null", "int", "string" ], possible payloads = null, { "string": "str" }, { "int": 123 }
if (node.isNull() && avroSchema.getTypes().contains(Schema.create(Schema.Type.NULL))) {
if (node.isNull() && avroSchema.getTypes().contains(NULL_SCHEMA)) {
yield null;
}

assertJsonType(node, JsonNodeType.OBJECT);
var elements = Lists.newArrayList(node.fields());
if (elements.size() != 1) {
throw new JsonToAvroConversionException(
throw new JsonAvroConversionException(
"UNION field value should be an object with single field == type name");
}
var typeNameToValue = elements.get(0);
Map.Entry<String, JsonNode> typeNameToValue = elements.get(0);
List<Schema> candidates = new ArrayList<>();
for (Schema unionType : avroSchema.getTypes()) {
if (typeNameToValue.getKey().equals(unionType.getFullName())) {
yield convert(typeNameToValue.getValue(), unionType);
}
if (typeNameToValue.getKey().equals(unionType.getName())) {
candidates.add(unionType);
}
}
if (candidates.size() == 1) {
yield convert(typeNameToValue.getValue(), candidates.get(0));
}
throw new JsonToAvroConversionException(
if (candidates.size() > 1) {
throw new JsonAvroConversionException(
"Can't select type within union for value '%s'. Provide full type name.".formatted(node)
);
}
throw new JsonAvroConversionException(
"json value '%s' is cannot be converted to any of union types [%s]"
.formatted(node, avroSchema.getTypes()));
}
Expand Down Expand Up @@ -164,7 +181,7 @@ private static Object convert(JsonNode node, Schema avroSchema) {
assertJsonType(node, JsonNodeType.STRING);
byte[] bytes = node.textValue().getBytes(StandardCharsets.ISO_8859_1);
if (bytes.length != avroSchema.getFixedSize()) {
throw new JsonToAvroConversionException(
throw new JsonAvroConversionException(
"Fixed field has unexpected size %d (should be %d)"
.formatted(bytes.length, avroSchema.getFixedSize()));
}
Expand Down Expand Up @@ -208,8 +225,11 @@ public static JsonNode convertAvroToJson(Object obj, Schema avroSchema) {
case UNION -> {
ObjectNode node = MAPPER.createObjectNode();
int unionIdx = AvroData.getGenericData().resolveUnion(avroSchema, obj);
Schema unionType = avroSchema.getTypes().get(unionIdx);
node.set(unionType.getFullName(), convertAvroToJson(obj, unionType));
Schema selectedType = avroSchema.getTypes().get(unionIdx);
node.set(
selectUnionTypeFieldName(avroSchema, selectedType, unionIdx),
convertAvroToJson(obj, selectedType)
);
yield node;
}
case STRING -> {
Expand Down Expand Up @@ -252,19 +272,38 @@ public static JsonNode convertAvroToJson(Object obj, Schema avroSchema) {
};
}

// select name for a key field that represents type name of union.
// For records selects short name, if it is possible.
private static String selectUnionTypeFieldName(Schema unionSchema,
Schema chosenType,
int chosenTypeIdx) {
var types = unionSchema.getTypes();
if (types.size() == 2 && types.contains(NULL_SCHEMA)) {
return chosenType.getName();
}
for (int i = 0; i < types.size(); i++) {
if (i != chosenTypeIdx && chosenType.getName().equals(types.get(i).getName())) {
// there is another type inside union with the same name
// so, we have to use fullname
return chosenType.getFullName();
}
}
return chosenType.getName();
}

private static Object processLogicalType(JsonNode node, Schema schema) {
return findConversion(schema)
.map(c -> c.jsonToAvroConversion.apply(node, schema))
.orElseThrow(() ->
new JsonToAvroConversionException("'%s' logical type is not supported"
new JsonAvroConversionException("'%s' logical type is not supported"
.formatted(schema.getLogicalType().getName())));
}

private static JsonNode processLogicalType(Object obj, Schema schema) {
return findConversion(schema)
.map(c -> c.avroToJsonConversion.apply(obj, schema))
.orElseThrow(() ->
new JsonToAvroConversionException("'%s' logical type is not supported"
new JsonAvroConversionException("'%s' logical type is not supported"
.formatted(schema.getLogicalType().getName())));
}

Expand All @@ -281,15 +320,15 @@ private static boolean isLogicalType(Schema schema) {

private static void assertJsonType(JsonNode node, JsonNodeType... allowedTypes) {
if (Stream.of(allowedTypes).noneMatch(t -> node.getNodeType() == t)) {
throw new JsonToAvroConversionException(
throw new JsonAvroConversionException(
"%s node has unexpected type, allowed types %s, actual type %s"
.formatted(node, Arrays.toString(allowedTypes), node.getNodeType()));
}
}

private static void assertJsonNumberType(JsonNode node, JsonParser.NumberType... allowedTypes) {
if (Stream.of(allowedTypes).noneMatch(t -> node.numberType() == t)) {
throw new JsonToAvroConversionException(
throw new JsonAvroConversionException(
"%s node has unexpected numeric type, allowed types %s, actual type %s"
.formatted(node, Arrays.toString(allowedTypes), node.numberType()));
}
Expand Down Expand Up @@ -318,7 +357,7 @@ enum LogicalTypeConversion {
} else if (node.isNumber()) {
return new BigDecimal(node.numberValue().toString());
}
throw new JsonToAvroConversionException(
throw new JsonAvroConversionException(
"node '%s' can't be converted to decimal logical type"
.formatted(node));
},
Expand All @@ -335,7 +374,7 @@ enum LogicalTypeConversion {
} else if (node.isTextual()) {
return LocalDate.parse(node.asText());
} else {
throw new JsonToAvroConversionException(
throw new JsonAvroConversionException(
"node '%s' can't be converted to date logical type"
.formatted(node));
}
Expand All @@ -356,7 +395,7 @@ enum LogicalTypeConversion {
} else if (node.isTextual()) {
return LocalTime.parse(node.asText());
} else {
throw new JsonToAvroConversionException(
throw new JsonAvroConversionException(
"node '%s' can't be converted to time-millis logical type"
.formatted(node));
}
Expand All @@ -377,7 +416,7 @@ enum LogicalTypeConversion {
} else if (node.isTextual()) {
return LocalTime.parse(node.asText());
} else {
throw new JsonToAvroConversionException(
throw new JsonAvroConversionException(
"node '%s' can't be converted to time-micros logical type"
.formatted(node));
}
Expand All @@ -398,7 +437,7 @@ enum LogicalTypeConversion {
} else if (node.isTextual()) {
return Instant.parse(node.asText());
} else {
throw new JsonToAvroConversionException(
throw new JsonAvroConversionException(
"node '%s' can't be converted to timestamp-millis logical type"
.formatted(node));
}
Expand All @@ -423,7 +462,7 @@ enum LogicalTypeConversion {
} else if (node.isTextual()) {
return Instant.parse(node.asText());
} else {
throw new JsonToAvroConversionException(
throw new JsonAvroConversionException(
"node '%s' can't be converted to timestamp-millis logical type"
.formatted(node));
}
Expand Down
Loading