Skip to content

Commit

Permalink
More jspecify nullability changes in support package
Browse files Browse the repository at this point in the history
spring-projects#3762

Signed-off-by: Soby Chacko <[email protected]>
  • Loading branch information
sobychacko committed Feb 25, 2025
1 parent e5907af commit 85aa161
Show file tree
Hide file tree
Showing 22 changed files with 72 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ public void setThreadNameSupplier(Function<MessageListenerContainer, String> thr
this.threadNameSupplier = threadNameSupplier;
}

@SuppressWarnings("unchecked")
@SuppressWarnings({"unchecked", "NullAway"})
@Override
public C createListenerContainer(KafkaListenerEndpoint endpoint) {
C instance = createContainerInstance(endpoint);
Expand All @@ -372,6 +372,7 @@ public C createListenerContainer(KafkaListenerEndpoint endpoint) {
return instance;
}

@SuppressWarnings("NullAway") // Dataflow analysis limitation
private void configureEndpoint(AbstractKafkaListenerEndpoint<K, V> aklEndpoint) {
if (aklEndpoint.getRecordFilterStrategy() == null) {
JavaUtils.INSTANCE
Expand Down Expand Up @@ -403,7 +404,7 @@ private void configureEndpoint(AbstractKafkaListenerEndpoint<K, V> aklEndpoint)
* @param instance the container instance to configure.
* @param endpoint the endpoint.
*/
@SuppressWarnings("deprecation")
@SuppressWarnings({"deprecation", "NullAway"})
protected void initializeContainer(C instance, KafkaListenerEndpoint endpoint) {
ContainerProperties properties = instance.getContainerProperties();
BeanUtils.copyProperties(this.containerProperties, properties, "topics", "topicPartitions", "topicPattern",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ public void setupListenerContainer(MessageListenerContainer listenerContainer,
protected abstract MessagingMessageListenerAdapter<K, V> createMessageListener(MessageListenerContainer container,
@Nullable MessageConverter messageConverter);

@SuppressWarnings("unchecked")
@SuppressWarnings({"unchecked", "NullAway"})
private void setupMessageListener(MessageListenerContainer container,
@Nullable MessageConverter messageConverter) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,9 @@ public MessageListenerContainer unregisterListenerContainer(String id) {
* @param factory the {@link KafkaListenerContainerFactory} to use.
* @return the {@link MessageListenerContainer}.
*/
@SuppressWarnings("NullAway") // Dataflow analysis limitation
protected MessageListenerContainer createListenerContainer(KafkaListenerEndpoint endpoint,
KafkaListenerContainerFactory<?> factory) {

if (endpoint instanceof MultiMethodKafkaListenerEndpoint<?, ?> mmkle) {
Object bean = mmkle.getBean();
if (bean instanceof EndpointHandlerMultiMethod ehmm) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,7 @@ public void setMessagingConverter(SmartMessageConverter messagingConverter) {
this.messagingConverter = messagingConverter;
}

@Nullable
private String getReplyTopic() {
private @Nullable String getReplyTopic() {
Method replyingMethod = getMethod();
if (replyingMethod != null) {
SendTo ann = AnnotatedElementUtils.findMergedAnnotation(replyingMethod, SendTo.class);
Expand Down Expand Up @@ -171,6 +170,7 @@ private String getReplyTopic() {
}

@Override
@SuppressWarnings("NullAway") // Dataflow analysis limitation
protected MessagingMessageListenerAdapter<K, V> createMessageListener(MessageListenerContainer container,
@Nullable MessageConverter messageConverter) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ protected Object headerValueToAddOut(String key, Object value) {
return valueToAdd;
}

@SuppressWarnings("NullAway") // Dataflow analysis limitation
@Nullable
private byte[] mapRawOut(String header, Object value) {
if (this.mapAllStringsOut || this.rawMappedHeaders.containsKey(header)) {
Expand All @@ -269,7 +270,7 @@ else if (value instanceof String) {
* @param header the header.
* @return the value to add.
*/
protected Object headerValueToAddIn(Header header) {
protected @Nullable Object headerValueToAddIn(@Nullable Header header) {
if (header == null || header.value() == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2020 the original author or authors.
* Copyright 2018-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.jspecify.annotations.Nullable;

import org.springframework.util.Assert;

Expand Down Expand Up @@ -74,7 +75,7 @@ public void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata record
}

@Override
public void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata, Exception exception) {
public void onError(ProducerRecord<K, V> producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) {
this.delegates.forEach(d -> d.onError(producerRecord, recordMetadata, exception));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2024 the original author or authors.
* Copyright 2017-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -430,6 +430,7 @@ public static class NonTrustedHeaderType {

private String untrustedType;

@SuppressWarnings("NullAway.Init")
public NonTrustedHeaderType() {
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2024 the original author or authors.
* Copyright 2021-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,8 @@
import java.lang.reflect.Method;
import java.util.Arrays;

import org.jspecify.annotations.Nullable;

import org.springframework.beans.factory.BeanCurrentlyInCreationException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
Expand All @@ -41,11 +43,11 @@ public class EndpointHandlerMethod {

private final Object beanOrClass;

private String methodName;
private @Nullable String methodName;

private Object bean;
private @Nullable Object bean;

private Method method;
private @Nullable Method method;

public EndpointHandlerMethod(Object beanOrClass, String methodName) {
Assert.notNull(beanOrClass, () -> "No destination bean or class provided!");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2019 the original author or authors.
* Copyright 2017-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,8 @@

package org.springframework.kafka.support;

import org.jspecify.annotations.Nullable;

import org.springframework.util.ClassUtils;

/**
Expand All @@ -28,7 +30,7 @@
*/
public final class JacksonPresent {

private static final ClassLoader classLoader = ClassUtils.getDefaultClassLoader(); // NOSONAR
private static final @Nullable ClassLoader classLoader = ClassUtils.getDefaultClassLoader(); // NOSONAR

private static final boolean jackson2Present = // NOSONAR
ClassUtils.isPresent("com.fasterxml.jackson.databind.ObjectMapper", classLoader) &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public <T> JavaUtils acceptIfCondition(boolean condition, T value, Consumer<T> c
* @param <T> the value type.
* @return this.
*/
public <T> JavaUtils acceptIfNotNull(@Nullable T value, Consumer<T> consumer) {
public <T> JavaUtils acceptIfNotNull(@Nullable T value, Consumer<@Nullable T> consumer) {
if (value != null) {
consumer.accept(value);
}
Expand Down Expand Up @@ -162,7 +162,7 @@ public <T1, T2> JavaUtils acceptIfCondition(boolean condition, T1 t1, T2 t2, BiC
* @param <T2> the second argument type.
* @return this.
*/
public <T1, T2> JavaUtils acceptIfNotNull(T1 t1, T2 t2, BiConsumer<T1, T2> consumer) {
public <T1, T2> JavaUtils acceptIfNotNull(T1 t1, @Nullable T2 t2, BiConsumer<T1, @Nullable T2> consumer) {
if (t2 != null) {
consumer.accept(t1, t2);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public int getBlockingRetryDeliveryAttempt() {
Assert.state(getHeader(KafkaHeaders.DELIVERY_ATTEMPT) != null,
"Blocking delivery attempt header not present, "
+ "see ContainerProperties.setDeliveryAttemptHeader() to enable");
return getHeader(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class);
Integer deliveryAttempts = getHeader(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class);
return deliveryAttempts == null ? 0 : deliveryAttempts;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2021 the original author or authors.
* Copyright 2019-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,7 @@

import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;
import org.jspecify.annotations.Nullable;

/**
* Provides a method-chaining way to build {@link org.apache.kafka.streams.kstream.KStream#branch branches} in
Expand Down Expand Up @@ -54,7 +55,7 @@ public final class KafkaStreamBrancher<K, V> {

private final List<Consumer<? super KStream<K, V>>> consumerList = new ArrayList<>();

private Consumer<? super KStream<K, V>> defaultConsumer;
private @Nullable Consumer<? super KStream<K, V>> defaultConsumer;

/**
* Defines a new branch.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2024 the original author or authors.
* Copyright 2018-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.jspecify.annotations.Nullable;

import org.springframework.messaging.Message;
import org.springframework.util.Assert;
Expand Down Expand Up @@ -101,7 +102,7 @@ public static boolean returnTypeMessageOrCollectionOf(Method method) {
* @param groupId the group id.
* @since 2.3
*/
public static void setConsumerGroupId(String groupId) {
public static void setConsumerGroupId(@Nullable String groupId) {
if (groupId != null) {
KafkaUtils.GROUP_IDS.put(Thread.currentThread(), groupId);
}
Expand All @@ -112,7 +113,7 @@ public static void setConsumerGroupId(String groupId) {
* @return the group id.
* @since 2.3
*/
public static String getConsumerGroupId() {
public static @Nullable String getConsumerGroupId() {
return KafkaUtils.GROUP_IDS.get(Thread.currentThread());
}

Expand Down Expand Up @@ -150,9 +151,10 @@ else if (dt instanceof String str) {
catch (@SuppressWarnings("unused") NumberFormatException ex) {
}
}
Integer deliveryTimeoutInMs = (Integer) ProducerConfig.configDef().defaultValues()
.get(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG);
return Duration.ofMillis(Math.max(
((Integer) ProducerConfig.configDef().defaultValues()
.get(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG)).longValue() + buffer,
deliveryTimeoutInMs == null ? 0 : deliveryTimeoutInMs.longValue() + buffer,
min));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 the original author or authors.
* Copyright 2018-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,8 @@

import java.util.function.Supplier;

import org.jspecify.annotations.Nullable;

import org.springframework.core.log.LogAccessor;
import org.springframework.util.Assert;

Expand Down Expand Up @@ -101,7 +103,7 @@ public void log(Supplier<CharSequence> messageSupplier, Throwable thrown) {
}
}

private void fatal(Supplier<CharSequence> messageSupplier, Throwable thrown) {
private void fatal(Supplier<CharSequence> messageSupplier, @Nullable Throwable thrown) {
if (thrown != null) {
this.logger.fatal(thrown, messageSupplier);
}
Expand All @@ -110,7 +112,7 @@ private void fatal(Supplier<CharSequence> messageSupplier, Throwable thrown) {
}
}

private void error(Supplier<CharSequence> messageSupplier, Throwable thrown) {
private void error(Supplier<CharSequence> messageSupplier, @Nullable Throwable thrown) {
if (thrown != null) {
this.logger.error(thrown, messageSupplier);
}
Expand All @@ -119,7 +121,7 @@ private void error(Supplier<CharSequence> messageSupplier, Throwable thrown) {
}
}

private void warn(Supplier<CharSequence> messageSupplier, Throwable thrown) {
private void warn(Supplier<CharSequence> messageSupplier, @Nullable Throwable thrown) {
if (thrown != null) {
this.logger.warn(thrown, messageSupplier);
}
Expand All @@ -128,7 +130,7 @@ private void warn(Supplier<CharSequence> messageSupplier, Throwable thrown) {
}
}

private void info(Supplier<CharSequence> messageSupplier, Throwable thrown) {
private void info(Supplier<CharSequence> messageSupplier, @Nullable Throwable thrown) {
if (thrown != null) {
this.logger.info(thrown, messageSupplier);
}
Expand All @@ -137,7 +139,7 @@ private void info(Supplier<CharSequence> messageSupplier, Throwable thrown) {
}
}

private void debug(Supplier<CharSequence> messageSupplier, Throwable thrown) {
private void debug(Supplier<CharSequence> messageSupplier, @Nullable Throwable thrown) {
if (thrown != null) {
this.logger.debug(thrown, messageSupplier);
}
Expand All @@ -146,7 +148,7 @@ private void debug(Supplier<CharSequence> messageSupplier, Throwable thrown) {
}
}

private void trace(Supplier<CharSequence> messageSupplier, Throwable thrown) {
private void trace(Supplier<CharSequence> messageSupplier, @Nullable Throwable thrown) {
if (thrown != null) {
this.logger.trace(thrown, messageSupplier);
}
Expand Down
Loading

0 comments on commit 85aa161

Please sign in to comment.