Skip to content

Commit 650e118

Browse files
committed
Add support for autoregister schemas in the json serde
1 parent 109aaca commit 650e118

File tree

9 files changed

+114
-19
lines changed

9 files changed

+114
-19
lines changed

app/src/test/java/io/apicurio/registry/noprofile/serde/JsonSchemaSerdeTest.java

+48
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,54 @@ public void testJsonSchemaSerde() throws Exception {
134134
}
135135
}
136136

137+
@Test
138+
public void testJsonSchemaSerdeAutoRegister() throws Exception {
139+
String groupId = TestUtils.generateGroupId();
140+
String artifactId = generateArtifactId();
141+
142+
Person person = new Person("Carles", "Arnal", 30);
143+
144+
try (JsonSchemaKafkaSerializer<Person> serializer = new JsonSchemaKafkaSerializer<>(restClient, true);
145+
Deserializer<Person> deserializer = new JsonSchemaKafkaDeserializer<>(restClient, true)) {
146+
147+
Map<String, Object> config = new HashMap<>();
148+
config.put(SerdeConfig.EXPLICIT_ARTIFACT_GROUP_ID, groupId);
149+
config.put(SerdeConfig.ARTIFACT_RESOLVER_STRATEGY, SimpleTopicIdStrategy.class.getName());
150+
config.put(SerdeConfig.SCHEMA_LOCATION, "/io/apicurio/registry/util/json-schema.json");
151+
config.put(SerdeConfig.AUTO_REGISTER_ARTIFACT, true);
152+
serializer.configure(config, false);
153+
154+
deserializer.configure(Collections.emptyMap(), false);
155+
156+
Headers headers = new RecordHeaders();
157+
byte[] bytes = serializer.serialize(artifactId, headers, person);
158+
159+
person = deserializer.deserialize(artifactId, headers, bytes);
160+
161+
Assertions.assertEquals("Carles", person.getFirstName());
162+
Assertions.assertEquals("Arnal", person.getLastName());
163+
Assertions.assertEquals(30, person.getAge());
164+
165+
person.setAge(-1);
166+
167+
try {
168+
serializer.serialize(artifactId, new RecordHeaders(), person);
169+
Assertions.fail();
170+
} catch (Exception ignored) {
171+
}
172+
173+
serializer.setValidationEnabled(false); // disable validation
174+
// create invalid person bytes
175+
bytes = serializer.serialize(artifactId, headers, person);
176+
177+
try {
178+
deserializer.deserialize(artifactId, headers, bytes);
179+
Assertions.fail();
180+
} catch (Exception ignored) {
181+
}
182+
}
183+
}
184+
137185
@Test
138186
public void testJsonSchemaSerdeHeaders() throws Exception {
139187
InputStream jsonSchema = getClass().getResourceAsStream("/io/apicurio/registry/util/json-schema.json");

schema-resolver/src/main/java/io/apicurio/registry/resolver/DefaultSchemaResolver.java

+17-10
Original file line numberDiff line numberDiff line change
@@ -106,16 +106,23 @@ private Optional<SchemaLookupResult<S>> getSchemaFromCache(ArtifactReference art
106106

107107
private SchemaLookupResult<S> getSchemaFromRegistry(ParsedSchema<S> parsedSchema, Record<T> data, ArtifactReference artifactReference) {
108108

109-
if (autoCreateArtifact && schemaParser.supportsExtractSchemaFromData()) {
110-
if (parsedSchema == null) {
111-
parsedSchema = schemaParser.getSchemaFromData(data, dereference);
112-
}
113-
114-
if (parsedSchema.hasReferences()) {
115-
//List of references lookup, to be used to create the references for the artifact
116-
final List<SchemaLookupResult<S>> schemaLookupResults = handleArtifactReferences(data, parsedSchema);
117-
return handleAutoCreateArtifact(parsedSchema, artifactReference, schemaLookupResults);
118-
} else {
109+
if (autoCreateArtifact) {
110+
111+
if (schemaParser.supportsExtractSchemaFromData()) {
112+
113+
if (parsedSchema == null) {
114+
parsedSchema = schemaParser.getSchemaFromData(data, dereference);
115+
}
116+
117+
if (parsedSchema.hasReferences()) {
118+
//List of references lookup, to be used to create the references for the artifact
119+
final List<SchemaLookupResult<S>> schemaLookupResults = handleArtifactReferences(data, parsedSchema);
120+
return handleAutoCreateArtifact(parsedSchema, artifactReference, schemaLookupResults);
121+
} else {
122+
return handleAutoCreateArtifact(parsedSchema, artifactReference);
123+
}
124+
} else if (config.getExplicitSchemaLocation() != null && schemaParser.supportsGetSchemaFromLocation()) {
125+
parsedSchema = schemaParser.getSchemaFromLocation(config.getExplicitSchemaLocation());
119126
return handleAutoCreateArtifact(parsedSchema, artifactReference);
120127
}
121128
}

schema-resolver/src/main/java/io/apicurio/registry/resolver/SchemaParser.java

+17
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,27 @@ public interface SchemaParser<S, U> {
5050
*/
5151
ParsedSchema<S> getSchemaFromData(Record<U> data, boolean dereference);
5252

53+
/**
54+
* In some artifact types, such as Json, we allow defining a local place for the schema.
55+
*
56+
* @param location the schema location
57+
* @return the ParsedSchema, containing both the raw schema (bytes) and the parsed schema. Can be null.
58+
*/
59+
default ParsedSchema<S> getSchemaFromLocation(String location) {
60+
return null;
61+
}
62+
5363
/**
5464
* Flag that indicates if {@link SchemaParser#getSchemaFromData(Record)} is implemented or not.
5565
*/
5666
default boolean supportsExtractSchemaFromData() {
5767
return true;
5868
}
69+
70+
/**
71+
* Flag that indicates if {@link SchemaParser#getSchemaFromLocation(String)} is implemented or not.
72+
*/
73+
default boolean supportsGetSchemaFromLocation() {
74+
return false;
75+
}
5976
}

schema-resolver/src/main/java/io/apicurio/registry/resolver/SchemaResolverConfig.java

+6
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,12 @@ public class SchemaResolverConfig {
9393
*/
9494
public static final String EXPLICIT_ARTIFACT_ID = "apicurio.registry.artifact.artifact-id";
9595

96+
/**
97+
* Only applicable for serializers
98+
* Optional, set explicitly the schema location in the classpath for the schema to be used for serializing the data.
99+
*/
100+
public static final String SCHEMA_LOCATION = "apicurio.registry.artifact.schema.location";
101+
96102
/**
97103
* Only applicable for serializers
98104
* Optional, set explicitly the version used for querying/creating an artifact.

schema-resolver/src/main/java/io/apicurio/registry/resolver/config/DefaultSchemaResolverConfig.java

+4
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,10 @@ public String getExplicitArtifactId() {
132132
return getString(EXPLICIT_ARTIFACT_ID);
133133
}
134134

135+
public String getExplicitSchemaLocation() {
136+
return getString(SCHEMA_LOCATION);
137+
}
138+
135139
public String getExplicitArtifactVersion() {
136140
return getString(EXPLICIT_ARTIFACT_VERSION);
137141
}

serdes/jsonschema-serde/src/main/java/io/apicurio/registry/serde/jsonschema/JsonSchemaKafkaSerializer.java

-1
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,6 @@ public SchemaParser<JsonSchema, T> schemaParser() {
124124
*/
125125
@Override
126126
protected void serializeData(ParsedSchema<JsonSchema> schema, T data, OutputStream out) throws IOException {
127-
//TODO add property to specify a jsonschema to allow for auto-register json schemas
128127
serializeData(null, schema, data, out);
129128
}
130129

serdes/jsonschema-serde/src/main/java/io/apicurio/registry/serde/jsonschema/JsonSchemaKafkaSerializerConfig.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,8 @@
3232
public class JsonSchemaKafkaSerializerConfig extends BaseKafkaSerDeConfig {
3333

3434
private static ConfigDef configDef() {
35-
ConfigDef configDef = new ConfigDef()
35+
return new ConfigDef()
3636
.define(VALIDATION_ENABLED, Type.BOOLEAN, VALIDATION_ENABLED_DEFAULT, Importance.MEDIUM, "Whether to validate the data against the json schema");
37-
return configDef;
3837
}
3938

4039
/**
@@ -49,5 +48,4 @@ public JsonSchemaKafkaSerializerConfig(Map<?, ?> originals) {
4948
public boolean validationEnabled() {
5049
return this.getBoolean(VALIDATION_ENABLED);
5150
}
52-
5351
}

serdes/jsonschema-serde/src/main/java/io/apicurio/registry/serde/jsonschema/JsonSchemaParser.java

+15-5
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.apicurio.registry.serde.jsonschema;
1818

1919
import io.apicurio.registry.resolver.ParsedSchema;
20+
import io.apicurio.registry.resolver.ParsedSchemaImpl;
2021
import io.apicurio.registry.resolver.SchemaParser;
2122
import io.apicurio.registry.resolver.data.Record;
2223
import io.apicurio.registry.types.ArtifactType;
@@ -44,11 +45,6 @@ public JsonSchema parseSchema(byte[] rawSchema, Map<String, ParsedSchema<JsonSch
4445
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getParsedSchema())), 0);
4546
}
4647

47-
//TODO we could implement some way of providing the jsonschema beforehand:
48-
// - via annotation in the object being serialized
49-
// - via config property
50-
//if we do this users will be able to automatically registering the schema when using this serde
51-
5248
/**
5349
* @see io.apicurio.registry.resolver.SchemaParser#getSchemaFromData(java.lang.Object)
5450
*/
@@ -64,8 +60,22 @@ public ParsedSchema<JsonSchema> getSchemaFromData(Record<T> data, boolean derefe
6460
return null;
6561
}
6662

63+
@Override
64+
public ParsedSchema<JsonSchema> getSchemaFromLocation(String location) {
65+
String rawSchema = IoUtil.toString(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));
66+
67+
return new ParsedSchemaImpl<JsonSchema>()
68+
.setParsedSchema(new JsonSchema(rawSchema))
69+
.setRawSchema(rawSchema.getBytes());
70+
}
71+
6772
@Override
6873
public boolean supportsExtractSchemaFromData() {
6974
return false;
7075
}
76+
77+
@Override
78+
public boolean supportsGetSchemaFromLocation() {
79+
return true;
80+
}
7181
}

serdes/serde-common/src/main/java/io/apicurio/registry/serde/SerdeConfig.java

+6
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,12 @@ public class SerdeConfig {
9494
*/
9595
public static final String EXPLICIT_ARTIFACT_ID = SchemaResolverConfig.EXPLICIT_ARTIFACT_ID;
9696

97+
/**
98+
* Only applicable for serializers
99+
* Optional, set explicitly the schema used for serialization.
100+
*/
101+
public static final String SCHEMA_LOCATION = SchemaResolverConfig.SCHEMA_LOCATION;
102+
97103
/**
98104
* Only applicable for serializers
99105
* Optional, set explicitly the version used for querying/creating an artifact.

0 commit comments

Comments
 (0)