Skip to content

Commit

Permalink
jspecify nullability changes in support package
Browse files Browse the repository at this point in the history
serializer and mapping packages under support.

spring-projects#3762

Signed-off-by: Soby Chacko <[email protected]>
  • Loading branch information
sobychacko committed Feb 24, 2025
1 parent 53149d4 commit e5907af
Show file tree
Hide file tree
Showing 19 changed files with 74 additions and 52 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2021 the original author or authors.
* Copyright 2017-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,7 @@
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.jspecify.annotations.Nullable;

import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.messaging.converter.MessageConversionException;
Expand Down Expand Up @@ -82,7 +83,7 @@ public abstract class AbstractJavaTypeMapper implements BeanClassLoaderAware {

private String keyClassIdFieldName = DEFAULT_KEY_CLASSID_FIELD_NAME;

private ClassLoader classLoader = ClassUtils.getDefaultClassLoader();
private @Nullable ClassLoader classLoader = ClassUtils.getDefaultClassLoader();

public String getClassIdFieldName() {
return this.classIdFieldName;
Expand Down Expand Up @@ -133,7 +134,7 @@ public void setBeanClassLoader(ClassLoader classLoader) {
this.classLoader = classLoader;
}

protected ClassLoader getClassLoader() {
protected @Nullable ClassLoader getClassLoader() {
return this.classLoader;
}

Expand All @@ -155,7 +156,7 @@ protected String retrieveHeader(Headers headers, String headerName) {
return classId;
}

protected String retrieveHeaderAsString(Headers headers, String headerName) {
protected @Nullable String retrieveHeaderAsString(Headers headers, String headerName) {
Header header = headers.lastHeader(headerName);
if (header != null) {
String classId = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2021 the original author or authors.
* Copyright 2017-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package org.springframework.kafka.support.mapping;

import org.apache.kafka.common.header.Headers;
import org.jspecify.annotations.Nullable;

/**
* Strategy for setting metadata on messages such that one can create the class
Expand All @@ -32,6 +33,7 @@ public interface ClassMapper {

void fromClass(Class<?> clazz, Headers headers);

@Nullable
Class<?> toClass(Headers headers);

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2024 the original author or authors.
* Copyright 2017-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,7 @@
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.type.TypeFactory;
import org.apache.kafka.common.header.Headers;
import org.jspecify.annotations.Nullable;

import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.util.Assert;
Expand Down Expand Up @@ -89,7 +90,7 @@ public void addTrustedPackages(String... packagesToTrust) {
}

@Override
public JavaType toJavaType(Headers headers) {
public @Nullable JavaType toJavaType(Headers headers) {
String typeIdHeader = retrieveHeaderAsString(headers, getClassIdFieldName());

if (typeIdHeader != null) {
Expand Down Expand Up @@ -181,8 +182,9 @@ public void fromClass(Class<?> clazz, Headers headers) {
}

@Override
public Class<?> toClass(Headers headers) {
return toJavaType(headers).getRawClass();
public @Nullable Class<?> toClass(Headers headers) {
JavaType javaType = toJavaType(headers);
return javaType == null ? null : javaType.getRawClass();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2024 the original author or authors.
* Copyright 2017-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

import com.fasterxml.jackson.databind.JavaType;
import org.apache.kafka.common.header.Headers;
import org.jspecify.annotations.Nullable;

/**
* Strategy for setting metadata on messages such that one can create the class that needs
Expand Down Expand Up @@ -52,6 +53,7 @@ enum TypePrecedence {

void fromJavaType(JavaType javaType, Headers headers);

@Nullable
JavaType toJavaType(Headers headers);

TypePrecedence getTypePrecedence();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/**
* Provides classes related to type mapping.
*/
package org.springframework.kafka.support.mapping;
@org.jspecify.annotations.NullMarked
package org.springframework.kafka.support.mapping;
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public abstract class DelegatingByTopicSerialization<T extends Closeable> implem

private final Set<String> patterns = ConcurrentHashMap.newKeySet();

private T defaultDelegate;
private @Nullable T defaultDelegate;

private boolean forKeys;

Expand All @@ -93,7 +93,7 @@ public abstract class DelegatingByTopicSerialization<T extends Closeable> implem
public DelegatingByTopicSerialization() {
}

public DelegatingByTopicSerialization(Map<Pattern, T> delegates, T defaultDelegate) {
public DelegatingByTopicSerialization(Map<Pattern, T> delegates, @Nullable T defaultDelegate) {
Assert.notNull(delegates, "'delegates' cannot be null");
Assert.notNull(defaultDelegate, "'defaultDelegate' cannot be null");
this.delegates.putAll(delegates);
Expand Down Expand Up @@ -260,7 +260,7 @@ else if (key instanceof String regex) {
}
}

protected T instantiateAndConfigure(Map<String, ?> configs, boolean isKey, Map<Pattern, T> delegates2,
protected @Nullable T instantiateAndConfigure(Map<String, ?> configs, boolean isKey, Map<Pattern, T> delegates2,
@Nullable Pattern pattern, Class<?> clazz) {

if (pattern != null && !this.patterns.add(pattern.pattern())) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2022 the original author or authors.
* Copyright 2019-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -71,7 +71,7 @@ public byte[] serialize(String topic, Object data) {
throw new UnsupportedOperationException();
}

@SuppressWarnings("unchecked")
@SuppressWarnings({"unchecked", "NullAway"}) // Dataflow analysis limitation
@Override
public byte[] serialize(String topic, Headers headers, Object data) {
if (data == null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2024 the original author or authors.
* Copyright 2021-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -81,6 +81,7 @@ public void configure(Map<String, ?> configs, boolean isKey) {
this.delegates.values().forEach(del -> del.configure(configs, isKey));
}

@SuppressWarnings("NullAway") // Dataflow analysis limitation
@Override
public byte[] serialize(String topic, Object data) {
if (data == null) {
Expand All @@ -90,6 +91,7 @@ public byte[] serialize(String topic, Object data) {
return delegate.serialize(topic, data);
}

@SuppressWarnings("NullAway") // Dataflow analysis limitation
@Override
public byte[] serialize(String topic, Headers headers, Object data) {
if (data == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public Object deserialize(String topic, Headers headers, ByteBuffer data) {
return deserializer == null ? data : deserializer.deserialize(topic, headers, data);
}

private Deserializer<?> getDeserializerByHeaders(Headers headers) {
private @Nullable Deserializer<?> getDeserializerByHeaders(Headers headers) {
byte[] value = null;
String selectorKey = selectorKey();
Header header = headers.lastHeader(selectorKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ public byte[] serialize(String topic, Object data) {
throw new UnsupportedOperationException();
}

@SuppressWarnings("NullAway") // Dataflow analysis limitation
@Override
public byte[] serialize(String topic, Headers headers, Object data) {
if (data == null) {
Expand Down Expand Up @@ -229,7 +230,7 @@ private String selectorKey() {
/*
* Package for testing.
*/
@Nullable
@SuppressWarnings("NullAway") // Dataflow analysis limitation
byte[] trySerdes(Object data) {
try {
Serde<? extends Object> serdeFrom = Serdes.serdeFrom(data.getClass());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 the original author or authors.
* Copyright 2018-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,7 @@

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.jspecify.annotations.Nullable;

import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
Expand Down Expand Up @@ -66,13 +67,13 @@ public class ErrorHandlingDeserializer<T> implements Deserializer<T> {
*/
public static final String VALIDATOR_CLASS = "spring.deserializer.validator.class";

private Deserializer<T> delegate;
private @Nullable Deserializer<T> delegate;

private boolean isForKey;

private Function<FailedDeserializationInfo, T> failedDeserializationFunction;
private @Nullable Function<FailedDeserializationInfo, T> failedDeserializationFunction;

private Validator validator;
private @Nullable Validator validator;

public ErrorHandlingDeserializer() {
}
Expand Down Expand Up @@ -194,25 +195,25 @@ private void setupValidator(Map<String, ?> configs) {
}

@Override
public T deserialize(String topic, byte[] data) {
public @Nullable T deserialize(String topic, byte[] data) {
try {
return validate(this.delegate.deserialize(topic, data));
return this.delegate == null ? null : validate(this.delegate.deserialize(topic, data));
}
catch (Exception e) {
return recoverFromSupplier(topic, null, data, e);
}
}

@Override
public T deserialize(String topic, Headers headers, byte[] data) {
public @Nullable T deserialize(String topic, Headers headers, byte[] data) {
try {
if (this.isForKey) {
headers.remove(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER);
}
else {
headers.remove(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER);
}
return validate(this.delegate.deserialize(topic, headers, data));
return this.delegate == null ? null : validate(this.delegate.deserialize(topic, headers, data));
}
catch (Exception e) {
SerializationUtils.deserializationException(headers, data, e, this.isForKey);
Expand All @@ -228,7 +229,7 @@ private T validate(T deserialized) {
return deserialized;
}

private T recoverFromSupplier(String topic, Headers headers, byte[] data, Exception exception) {
private @Nullable T recoverFromSupplier(String topic, @Nullable Headers headers, byte[] data, Exception exception) {
if (this.failedDeserializationFunction != null) {
FailedDeserializationInfo failedDeserializationInfo =
new FailedDeserializationInfo(topic, headers, data, this.isForKey, exception);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 the original author or authors.
* Copyright 2019-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@
import java.util.Arrays;

import org.apache.kafka.common.header.Headers;
import org.jspecify.annotations.Nullable;

/**
* Class containing all the contextual information around a deserialization error.
Expand All @@ -32,7 +33,7 @@ public class FailedDeserializationInfo {

private final String topic;

private final Headers headers;
private final @Nullable Headers headers;

private final byte[] data;

Expand All @@ -48,7 +49,7 @@ public class FailedDeserializationInfo {
* @param isForKey true for a key deserializer, false otherwise.
* @param exception exception causing the deserialization error.
*/
public FailedDeserializationInfo(String topic, Headers headers, byte[] data, boolean isForKey,
public FailedDeserializationInfo(String topic, @Nullable Headers headers, byte[] data, boolean isForKey,
Exception exception) {

this.topic = topic;
Expand All @@ -62,7 +63,7 @@ public String getTopic() {
return this.topic;
}

public Headers getHeaders() {
public @Nullable Headers getHeaders() {
return this.headers;
}

Expand Down
Loading

0 comments on commit e5907af

Please sign in to comment.