Skip to content

Commit c19da74

Browse files
authored
Add support for schema imports/references (#1280)
* Allow SR to be pluggable
* Allow SR client to be pluggable
* First cut at schema imports
* Refactor AvroSchemaUtils
* Use try-with-resources for ByteArrayOutputStream
* Add AvroSchema tests
* Fix maven plugins to work with references
* Add back some deprecated methods
* Temporarily undeprecate some items
* Minor refactoring
* Minor cleanup of unnecessary type param
* Fix compatibility calls
* Add missing schema type converter
* Fix formatting of if statement
* Fix NPE
* Fix schema canonicalization
* Add back ctor used by KSQL tests
1 parent f6205ee commit c19da74

File tree

93 files changed

+3369
-996
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

93 files changed

+3369
-996
lines changed

avro-converter/src/main/java/io/confluent/connect/avro/AvroConverter.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,12 @@
1616

1717
package io.confluent.connect.avro;
1818

19+
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
1920
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
2021
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
2122
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
2223
import io.confluent.kafka.serializers.AbstractKafkaAvroSerializer;
23-
import io.confluent.kafka.serializers.AvroSchemaUtils;
24+
import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
2425
import io.confluent.kafka.serializers.GenericContainerWithVersion;
2526
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
2627
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
@@ -128,8 +129,12 @@ public Serializer(Map<String, ?> configs, SchemaRegistryClient client) {
128129
}
129130

130131
public byte[] serialize(String topic, boolean isKey, Object value) {
132+
if (value == null) {
133+
return null;
134+
}
131135
return serializeImpl(
132-
getSubjectName(topic, isKey, value, AvroSchemaUtils.getSchema(value)), value);
136+
getSubjectName(topic, isKey, value, new AvroSchema(AvroSchemaUtils.getSchema(value))),
137+
value);
133138
}
134139
}
135140

avro-converter/src/main/java/io/confluent/connect/avro/AvroConverterConfig.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
import java.util.HashMap;
2020
import java.util.Map;
2121

22-
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
22+
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
2323

24-
public class AvroConverterConfig extends AbstractKafkaAvroSerDeConfig {
24+
public class AvroConverterConfig extends AbstractKafkaSchemaSerDeConfig {
2525

2626
public AvroConverterConfig(Map<?, ?> props) {
2727
super(baseConfigDef(), props);

avro-converter/src/test/java/io/confluent/connect/avro/AvroConverterTest.java

+16-15
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.confluent.connect.avro;
1818

19+
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
1920
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
2021
import io.confluent.kafka.serializers.subject.RecordNameStrategy;
2122

@@ -36,8 +37,8 @@
3637

3738
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
3839
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
39-
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDe;
40-
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
40+
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe;
41+
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
4142
import io.confluent.kafka.serializers.KafkaAvroSerializer;
4243

4344
import static org.hamcrest.CoreMatchers.equalTo;
@@ -56,7 +57,7 @@ public class AvroConverterTest {
5657
private static final String TOPIC = "topic";
5758

5859
private static final Map<String, ?> SR_CONFIG = Collections.singletonMap(
59-
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "localhost");
60+
AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "localhost");
6061

6162
private final SchemaRegistryClient schemaRegistry;
6263
private final AvroConverter converter;
@@ -76,7 +77,7 @@ public void testConfigure() {
7677
converter.configure(SR_CONFIG, true);
7778
assertTrue(Whitebox.<Boolean>getInternalState(converter, "isKey"));
7879
assertNotNull(Whitebox.getInternalState(
79-
Whitebox.<AbstractKafkaAvroSerDe>getInternalState(converter, "serializer"),
80+
Whitebox.<AbstractKafkaSchemaSerDe>getInternalState(converter, "serializer"),
8081
"schemaRegistry"));
8182
}
8283

@@ -85,7 +86,7 @@ public void testConfigureAlt() {
8586
converter.configure(SR_CONFIG, false);
8687
assertFalse(Whitebox.<Boolean>getInternalState(converter, "isKey"));
8788
assertNotNull(Whitebox.getInternalState(
88-
Whitebox.<AbstractKafkaAvroSerDe>getInternalState(converter, "serializer"),
89+
Whitebox.<AbstractKafkaSchemaSerDe>getInternalState(converter, "serializer"),
8990
"schemaRegistry"));
9091
}
9192

@@ -197,14 +198,14 @@ private void testVersionExtracted(String subject, KafkaAvroSerializer serializer
197198
.record("Foo").fields()
198199
.requiredInt("key")
199200
.endRecord();
200-
schemaRegistry.register(subject, avroSchema1);
201+
schemaRegistry.register(subject, new AvroSchema(avroSchema1));
201202

202203
org.apache.avro.Schema avroSchema2 = org.apache.avro.SchemaBuilder
203204
.record("Foo").fields()
204205
.requiredInt("key")
205206
.requiredString("value")
206207
.endRecord();
207-
schemaRegistry.register(subject, avroSchema2);
208+
schemaRegistry.register(subject, new AvroSchema(avroSchema2));
208209

209210

210211
// Get serialized data
@@ -271,8 +272,8 @@ public void testSameSchemaMultipleTopicWithDeprecatedSubjectNameStrategyForValue
271272
SchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient();
272273
AvroConverter avroConverter = new AvroConverter(schemaRegistry);
273274
Map<String, ?> converterConfig = ImmutableMap.of(
274-
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "localhost",
275-
AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY, DeprecatedTestTopicNameStrategy.class.getName());
275+
AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "localhost",
276+
AbstractKafkaSchemaSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY, DeprecatedTestTopicNameStrategy.class.getName());
276277
avroConverter.configure(converterConfig, false);
277278
assertSameSchemaMultipleTopic(avroConverter, schemaRegistry, false);
278279
}
@@ -282,8 +283,8 @@ public void testSameSchemaMultipleTopicWithDeprecatedSubjectNameStrategyForKey()
282283
SchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient();
283284
AvroConverter avroConverter = new AvroConverter(schemaRegistry);
284285
Map<String, ?> converterConfig = ImmutableMap.of(
285-
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "localhost",
286-
AbstractKafkaAvroSerDeConfig.KEY_SUBJECT_NAME_STRATEGY, DeprecatedTestTopicNameStrategy.class.getName());
286+
AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "localhost",
287+
AbstractKafkaSchemaSerDeConfig.KEY_SUBJECT_NAME_STRATEGY, DeprecatedTestTopicNameStrategy.class.getName());
287288
avroConverter.configure(converterConfig, true);
288289
assertSameSchemaMultipleTopic(avroConverter, schemaRegistry, true);
289290
}
@@ -300,7 +301,7 @@ public void testExplicitlyNamedNestedMapsWithNonStringKeys() {
300301
final AvroConverter avroConverter = new AvroConverter(new MockSchemaRegistryClient());
301302
avroConverter.configure(
302303
Collections.singletonMap(
303-
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "localhost"
304+
AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "localhost"
304305
),
305306
false
306307
);
@@ -330,9 +331,9 @@ private void assertSameSchemaMultipleTopic(AvroConverter converter, SchemaRegist
330331
.requiredString("value")
331332
.endRecord();
332333
String subjectSuffix = isKey ? "key" : "value";
333-
schemaRegistry.register("topic1-" + subjectSuffix, avroSchema2_1);
334-
schemaRegistry.register("topic2-" + subjectSuffix, avroSchema1);
335-
schemaRegistry.register("topic2-" + subjectSuffix, avroSchema2_2);
334+
schemaRegistry.register("topic1-" + subjectSuffix, new AvroSchema(avroSchema2_1));
335+
schemaRegistry.register("topic2-" + subjectSuffix, new AvroSchema(avroSchema1));
336+
schemaRegistry.register("topic2-" + subjectSuffix, new AvroSchema(avroSchema2_2));
336337

337338
org.apache.avro.generic.GenericRecord avroRecord1
338339
= new org.apache.avro.generic.GenericRecordBuilder(avroSchema2_1).set("key", 15).set

avro-data/src/main/java/io/confluent/connect/avro/AvroData.java

+5-40
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import java.util.Objects;
6060
import java.util.Set;
6161

62+
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
6263
import io.confluent.kafka.serializers.NonRecordContainer;
6364

6465

@@ -300,46 +301,10 @@ public Object convert(Schema schema, Object value) {
300301
}
301302

302303
private Cache<Schema, org.apache.avro.Schema> fromConnectSchemaCache;
303-
private Cache<AvroSchemaAndVersion, Schema> toConnectSchemaCache;
304+
private Cache<AvroSchema, Schema> toConnectSchemaCache;
304305
private boolean connectMetaData;
305306
private boolean enhancedSchemaSupport;
306307

307-
private static class AvroSchemaAndVersion {
308-
private org.apache.avro.Schema schema;
309-
private Integer version;
310-
311-
public AvroSchemaAndVersion(org.apache.avro.Schema schema, Integer version) {
312-
this.schema = schema;
313-
this.version = version;
314-
}
315-
316-
public org.apache.avro.Schema schema() {
317-
return schema;
318-
}
319-
320-
public Integer version() {
321-
return version;
322-
}
323-
324-
@Override
325-
public boolean equals(Object o) {
326-
if (this == o) {
327-
return true;
328-
}
329-
if (o == null || getClass() != o.getClass()) {
330-
return false;
331-
}
332-
AvroSchemaAndVersion that = (AvroSchemaAndVersion) o;
333-
return Objects.equals(schema, that.schema) && Objects.equals(version, that.version);
334-
}
335-
336-
@Override
337-
public int hashCode() {
338-
return Objects.hash(schema, version);
339-
}
340-
}
341-
342-
343308
public AvroData(int cacheSize) {
344309
this(new AvroDataConfig.Builder()
345310
.with(AvroDataConfig.SCHEMAS_CACHE_SIZE_CONFIG, cacheSize)
@@ -1510,7 +1475,7 @@ private Schema toConnectSchema(org.apache.avro.Schema schema,
15101475
// the internal conversions, this is the safest place to add caching since some of the internal
15111476
// conversions take extra flags (like forceOptional) which means the resulting schema might not
15121477
// exactly match the Avro schema.
1513-
AvroSchemaAndVersion schemaAndVersion = new AvroSchemaAndVersion(schema, version);
1478+
AvroSchema schemaAndVersion = new AvroSchema(schema, version);
15141479
Schema cachedSchema = toConnectSchemaCache.get(schemaAndVersion);
15151480
if (cachedSchema != null) {
15161481
return cachedSchema;
@@ -2009,8 +1974,8 @@ private String unionMemberFieldName(Schema schema) {
20091974

20101975
private static boolean isEnumSchema(Schema schema) {
20111976
return schema.type() == Schema.Type.STRING
2012-
&& schema.name() != null
2013-
&& schema.name().equals(AVRO_TYPE_ENUM);
1977+
&& schema.parameters() != null
1978+
&& schema.parameters().containsKey(AVRO_TYPE_ENUM);
20141979
}
20151980

20161981
private static boolean isInstanceOfAvroSchemaTypeForSimpleSchema(Schema fieldSchema,

avro-serde/src/test/java/io/confluent/kafka/streams/serdes/avro/GenericAvroSerdeTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424

2525
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
2626
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
27-
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
27+
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
2828

2929
import static org.hamcrest.CoreMatchers.equalTo;
3030
import static org.hamcrest.CoreMatchers.nullValue;
@@ -38,7 +38,7 @@ private static GenericAvroSerde createConfiguredSerdeForRecordValues() {
3838
SchemaRegistryClient schemaRegistryClient = new MockSchemaRegistryClient();
3939
GenericAvroSerde serde = new GenericAvroSerde(schemaRegistryClient);
4040
Map<String, Object> serdeConfig = new HashMap<>();
41-
serdeConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "fake");
41+
serdeConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "fake");
4242
serde.configure(serdeConfig, false);
4343
return serde;
4444
}

avro-serde/src/test/java/io/confluent/kafka/streams/serdes/avro/SpecificAvroSerdeTest.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import io.confluent.kafka.example.User;
2525
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
2626
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
27-
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
27+
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
2828
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
2929

3030
import static org.hamcrest.CoreMatchers.equalTo;
@@ -40,7 +40,7 @@ public class SpecificAvroSerdeTest {
4040
SchemaRegistryClient schemaRegistryClient = new MockSchemaRegistryClient();
4141
SpecificAvroSerde<T> serde = new SpecificAvroSerde<>(schemaRegistryClient);
4242
Map<String, Object> serdeConfig = new HashMap<>();
43-
serdeConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "fake");
43+
serdeConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "fake");
4444
serde.configure(serdeConfig, false);
4545
return serde;
4646
}
@@ -96,7 +96,8 @@ public void shouldRoundTripRecordsEvenWhenConfiguredToDisableSpecificAvro() {
9696
SpecificAvroSerde<User> serde = createConfiguredSerdeForRecordValues();
9797
User record = User.newBuilder().setName("alice").build();
9898
Map<String, Object> serdeConfig = new HashMap<>();
99-
serdeConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
99+
serdeConfig.put(
100+
AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
100101
"fake-to-satisfy-checks");
101102
serdeConfig.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
102103

avro-serializer/src/main/java/io/confluent/kafka/formatter/AvroMessageFormatter.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
package io.confluent.kafka.formatter;
1818

19-
import io.confluent.kafka.serializers.AvroSchemaUtils;
19+
import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
2020
import org.apache.avro.AvroRuntimeException;
2121
import org.apache.avro.Schema;
2222
import org.apache.avro.generic.GenericDatumWriter;
@@ -40,7 +40,7 @@
4040
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
4141
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
4242
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
43-
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
43+
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
4444
import kafka.common.MessageFormatter;
4545

4646
/**
@@ -105,7 +105,7 @@ public void init(Properties props) {
105105
if (props == null) {
106106
throw new ConfigException("Missing schema registry url!");
107107
}
108-
String url = props.getProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG);
108+
String url = props.getProperty(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG);
109109
if (url == null) {
110110
throw new ConfigException("Missing schema registry url!");
111111
}
@@ -244,7 +244,7 @@ private SchemaRegistryClient createSchemaRegistry(
244244
) {
245245
return schemaRegistry != null ? schemaRegistry : new CachedSchemaRegistryClient(
246246
schemaRegistryUrl,
247-
AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT,
247+
AbstractKafkaSchemaSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT,
248248
originals
249249
);
250250
}

avro-serializer/src/main/java/io/confluent/kafka/formatter/AvroMessageReader.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737

3838
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
3939
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
40-
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
40+
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
4141
import kafka.common.KafkaException;
4242
import kafka.common.MessageReader;
4343
import io.confluent.kafka.serializers.AbstractKafkaAvroSerializer;
@@ -131,15 +131,15 @@ public void init(java.io.InputStream inputStream, java.util.Properties props) {
131131
ignoreError = props.getProperty("ignore.error").trim().toLowerCase().equals("true");
132132
}
133133
reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
134-
String url = props.getProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG);
134+
String url = props.getProperty(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG);
135135
if (url == null) {
136136
throw new ConfigException("Missing schema registry url!");
137137
}
138138

139139
Map<String, Object> originals = getPropertiesMap(props);
140140

141141
schemaRegistry = new CachedSchemaRegistryClient(
142-
url, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT, originals);
142+
url, AbstractKafkaSchemaSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT, originals);
143143
if (!props.containsKey("value.schema")) {
144144
throw new ConfigException("Must provide the Avro schema string in value.schema");
145145
}

0 commit comments

Comments
 (0)