Skip to content

Commit

Permalink
Add nullability changes in listener/adapter package
Browse files Browse the repository at this point in the history
#3762

Signed-off-by: Soby Chacko <[email protected]>
  • Loading branch information
sobychacko committed Feb 26, 2025
1 parent efd3484 commit 3d1e450
Show file tree
Hide file tree
Showing 18 changed files with 100 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ default void onMessage(T data, @Nullable Acknowledgment acknowledgment) {
* @param consumer the consumer.
* @since 2.0
*/
default void onMessage(T data, Consumer<?, ?> consumer) {
default void onMessage(T data, @Nullable Consumer<?, ?> consumer) {
throw new UnsupportedOperationException("Container should never call this");
}

Expand All @@ -68,7 +68,7 @@ default void onMessage(T data, Consumer<?, ?> consumer) {
* @param consumer the consumer.
* @since 2.0
*/
default void onMessage(T data, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
default void onMessage(T data, @Nullable Acknowledgment acknowledgment, @Nullable Consumer<?, ?> consumer) {
throw new UnsupportedOperationException("Container should never call this");
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2021 the original author or authors.
* Copyright 2018-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
import org.jspecify.annotations.Nullable;

/**
* Interface for backing off a {@link MessageListenerContainer}
Expand All @@ -32,7 +33,7 @@ public interface KafkaConsumerBackoffManager {
void backOffIfNecessary(Context context);

default Context createContext(long dueTimestamp, String listenerId, TopicPartition topicPartition,
Consumer<?, ?> messageConsumer) {
@Nullable Consumer<?, ?> messageConsumer) {
return new Context(dueTimestamp, topicPartition, listenerId, messageConsumer);
}

Expand Down Expand Up @@ -64,7 +65,7 @@ class Context {
private final Consumer<?, ?> consumerForTimingAdjustment;

Context(long dueTimestamp, TopicPartition topicPartition, String listenerId,
Consumer<?, ?> consumerForTimingAdjustment) {
@Nullable Consumer<?, ?> consumerForTimingAdjustment) {

this.dueTimestamp = dueTimestamp;
this.listenerId = listenerId;
Expand All @@ -84,7 +85,7 @@ public TopicPartition getTopicPartition() {
return this.topicPartition;
}

public Consumer<?, ?> getConsumerForTimingAdjustment() {
public @Nullable Consumer<?, ?> getConsumerForTimingAdjustment() {
return this.consumerForTimingAdjustment;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,7 @@

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.TopicPartition;
import org.jspecify.annotations.Nullable;

import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.listener.ConsumerSeekAware;
Expand All @@ -46,7 +47,7 @@ public abstract class AbstractDelegatingMessageListenerAdapter<T>

protected final ListenerType delegateType; // NOSONAR

private final ConsumerSeekAware seekAware;
private final @Nullable ConsumerSeekAware seekAware;

public AbstractDelegatingMessageListenerAdapter(T delegate) {
this.delegate = delegate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public abstract class AbstractRetryingMessageListenerAdapter<K, V, T>

private final RetryTemplate retryTemplate;

private final RecoveryCallback<? extends Object> recoveryCallback;
private final @Nullable RecoveryCallback<? extends Object> recoveryCallback;

/**
* Construct an instance with the supplied retry template. The exception will be
Expand Down Expand Up @@ -69,7 +69,7 @@ public RetryTemplate getRetryTemplate() {
return this.retryTemplate;
}

public RecoveryCallback<? extends Object> getRecoveryCallback() {
public @Nullable RecoveryCallback<? extends Object> getRecoveryCallback() {
return this.recoveryCallback;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class BatchMessagingMessageListenerAdapter<K, V> extends MessagingMessage

private BatchMessageConverter batchMessageConverter = new BatchMessagingMessageConverter();

private BatchToRecordAdapter<K, V> batchToRecordAdapter;
private @Nullable BatchToRecordAdapter<K, V> batchToRecordAdapter;

/**
* Create an instance with the provided parameters.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 the original author or authors.
* Copyright 2020-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.jspecify.annotations.Nullable;

import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.Message;
Expand Down Expand Up @@ -47,7 +48,7 @@ public interface BatchToRecordAdapter<K, V> {
* @param consumer the consumer.
* @param callback the callback.
*/
void adapt(List<Message<?>> messages, List<ConsumerRecord<K, V>> records, Acknowledgment ack,
void adapt(List<Message<?>> messages, List<ConsumerRecord<K, V>> records, @Nullable Acknowledgment ack,
Consumer<?, ?> consumer, Callback<K, V> callback);

/**
Expand All @@ -66,7 +67,7 @@ interface Callback<K, V> {
* @param consumer the consumer.
* @param message the message.
*/
void invoke(ConsumerRecord<K, V> record, Acknowledgment ack, Consumer<?, ?> consumer,
void invoke(ConsumerRecord<K, V> record, @Nullable Acknowledgment ack, Consumer<?, ?> consumer,
Message<?> message);

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2022 the original author or authors.
* Copyright 2016-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,7 @@

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.jspecify.annotations.Nullable;

import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
Expand Down Expand Up @@ -60,7 +61,7 @@ public class ConvertingMessageListener<V> implements DelegatingMessageListener<M

private MessageConverter messageConverter;

private KafkaHeaderMapper headerMapper;
private @Nullable KafkaHeaderMapper headerMapper;

/**
* Construct an instance with the provided {@link MessageListener} and {@link Class}
Expand Down Expand Up @@ -106,7 +107,7 @@ public MessageListener getDelegate() {

@Override
@SuppressWarnings("unchecked")
public void onMessage(ConsumerRecord receivedRecord, Acknowledgment acknowledgment, Consumer consumer) {
public void onMessage(ConsumerRecord receivedRecord, @Nullable Acknowledgment acknowledgment, Consumer consumer) {
ConsumerRecord convertedConsumerRecord = convertConsumerRecord(receivedRecord);
if (this.delegate instanceof AcknowledgingConsumerAwareMessageListener) {
this.delegate.onMessage(convertedConsumerRecord, acknowledgment, consumer);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 the original author or authors.
* Copyright 2020-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.jspecify.annotations.Nullable;

import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.listener.ConsumerRecordRecoverer;
Expand Down Expand Up @@ -62,7 +63,7 @@ public DefaultBatchToRecordAdapter(ConsumerRecordRecoverer recoverer) {
}

@Override
public void adapt(List<Message<?>> messages, List<ConsumerRecord<K, V>> records, Acknowledgment ack,
public void adapt(List<Message<?>> messages, List<ConsumerRecord<K, V>> records, @Nullable Acknowledgment ack,
Consumer<?, ?> consumer, Callback<K, V> callback) {

for (int i = 0; i < messages.size(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public class DelegatingInvocableHandler {
private final ConcurrentMap<InvocableHandlerMethod, MethodParameter> payloadMethodParameters =
new ConcurrentHashMap<>();

private final InvocableHandlerMethod defaultHandler;
private final @Nullable InvocableHandlerMethod defaultHandler;

private final Map<InvocableHandlerMethod, Expression> handlerSendTo = new ConcurrentHashMap<>();

Expand All @@ -79,13 +79,13 @@ public class DelegatingInvocableHandler {

private final Object bean;

private final BeanExpressionResolver resolver;
private final @Nullable BeanExpressionResolver resolver;

private final BeanExpressionContext beanExpressionContext;
private final @Nullable BeanExpressionContext beanExpressionContext;

private final ConfigurableListableBeanFactory beanFactory;
private final @Nullable ConfigurableListableBeanFactory beanFactory;

private final PayloadValidator validator;
private final @Nullable PayloadValidator validator;

private final boolean asyncReplies;

Expand Down Expand Up @@ -168,7 +168,8 @@ public boolean isAsyncReplies() {
* @throws Exception raised if no suitable argument resolver can be found,
* or the method raised an exception.
*/
public Object invoke(Message<?> message, Object... providedArgs) throws Exception { //NOSONAR
@SuppressWarnings("NullAway") // Dataflow analysis limitation
public Object invoke(Message<?> message, @Nullable Object... providedArgs) throws Exception { //NOSONAR
Class<?> payloadClass = message.getPayload().getClass();
InvocableHandlerMethod handler = getHandlerForPayload(payloadClass);
if (this.validator != null && this.defaultHandler != null) {
Expand Down Expand Up @@ -345,6 +346,7 @@ private boolean assignPayload(MethodParameter methodParameter, Class<?> payloadC
* @since 3.2
*/
@Nullable
@SuppressWarnings("NullAway") // Dataflow analysis limitation
public InvocationResult getInvocationResultFor(Object result, Object inboundPayload) {
InvocableHandlerMethod handler = findHandlerForPayload(inboundPayload.getClass());
if (handler != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public FilteringBatchMessageListenerAdapter(BatchMessageListener<K, V> delegate,

@Override
public void onMessage(List<ConsumerRecord<K, V>> records, @Nullable Acknowledgment acknowledgment,
Consumer<?, ?> consumer) {
@Nullable Consumer<?, ?> consumer) {

final RecordFilterStrategy<K, V> recordFilterStrategy = getRecordFilterStrategy();
final List<ConsumerRecord<K, V>> consumerRecords = recordFilterStrategy.filterBatch(records);
Expand All @@ -100,8 +100,8 @@ else if (!consumerRecords.isEmpty() || this.consumerAware
}
}

private void invokeDelegate(List<ConsumerRecord<K, V>> consumerRecords, Acknowledgment acknowledgment,
Consumer<?, ?> consumer) {
private void invokeDelegate(List<ConsumerRecord<K, V>> consumerRecords, @Nullable Acknowledgment acknowledgment,
@Nullable Consumer<?, ?> consumer) {

switch (this.delegateType) {
case ACKNOWLEDGING_CONSUMER_AWARE:
Expand Down Expand Up @@ -129,12 +129,12 @@ public void onMessage(List<ConsumerRecord<K, V>> data) {
}

@Override
public void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment) {
public void onMessage(List<ConsumerRecord<K, V>> data, @Nullable Acknowledgment acknowledgment) {
onMessage(data, acknowledgment, null); // NOSONAR
}

@Override
public void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer) {
public void onMessage(List<ConsumerRecord<K, V>> data, @Nullable Consumer<?, ?> consumer) {
onMessage(data, null, consumer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public FilteringMessageListenerAdapter(MessageListener<K, V> delegate,

@Override
public void onMessage(ConsumerRecord<K, V> consumerRecord, @Nullable Acknowledgment acknowledgment,
Consumer<?, ?> consumer) {
@Nullable Consumer<?, ?> consumer) {

if (!filter(consumerRecord)) {
switch (this.delegateType) {
Expand Down Expand Up @@ -104,12 +104,12 @@ public void onMessage(ConsumerRecord<K, V> data) {
}

@Override
public void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment) {
public void onMessage(ConsumerRecord<K, V> data, @Nullable Acknowledgment acknowledgment) {
onMessage(data, acknowledgment, null); // NOSONAR
}

@Override
public void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer) {
public void onMessage(ConsumerRecord<K, V> data, @Nullable Consumer<?, ?> consumer) {
onMessage(data, null, consumer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.kafka.listener.adapter;

import java.lang.reflect.Method;
import java.util.Objects;

import org.jspecify.annotations.Nullable;

Expand All @@ -35,9 +36,9 @@
*/
public class HandlerAdapter {

private final InvocableHandlerMethod invokerHandlerMethod;
private final @Nullable InvocableHandlerMethod invokerHandlerMethod;

private final DelegatingInvocableHandler delegatingHandler;
private final @Nullable DelegatingInvocableHandler delegatingHandler;

private final boolean asyncReplies;

Expand Down Expand Up @@ -74,11 +75,11 @@ public boolean isAsyncReplies() {
}

@Nullable
public Object invoke(Message<?> message, Object... providedArgs) throws Exception { //NOSONAR
public Object invoke(Message<?> message, @Nullable Object... providedArgs) throws Exception { //NOSONAR
if (this.invokerHandlerMethod != null) {
return this.invokerHandlerMethod.invoke(message, providedArgs); // NOSONAR
}
else if (this.delegatingHandler.hasDefaultHandler()) {
else if (Objects.requireNonNull(this.delegatingHandler).hasDefaultHandler()) {
// Needed to avoid returning raw Message which matches Object
Object[] args = new Object[providedArgs.length + 1];
args[0] = message.getPayload();
Expand All @@ -95,7 +96,7 @@ public String getMethodAsString(Object payload) {
return this.invokerHandlerMethod.getMethod().toGenericString();
}
else {
return this.delegatingHandler.getMethodNameFor(payload);
return Objects.requireNonNull(this.delegatingHandler).getMethodNameFor(payload);
}
}

Expand All @@ -104,7 +105,7 @@ public Object getBean() {
return this.invokerHandlerMethod.getBean();
}
else {
return this.delegatingHandler.getBean();
return Objects.requireNonNull(this.delegatingHandler).getBean();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void onMessage(ConsumerRecord<K, V> consumerRecord, @Nullable Acknowledgm
}
}

private void invokeDelegateOnMessage(ConsumerRecord<K, V> consumerRecord, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
private void invokeDelegateOnMessage(ConsumerRecord<K, V> consumerRecord, @Nullable Acknowledgment acknowledgment, @Nullable Consumer<?, ?> consumer) {
switch (this.delegateType) {
case ACKNOWLEDGING_CONSUMER_AWARE -> this.delegate.onMessage(consumerRecord, acknowledgment, consumer);
case ACKNOWLEDGING -> this.delegate.onMessage(consumerRecord, acknowledgment);
Expand All @@ -111,7 +111,7 @@ private void invokeDelegateOnMessage(ConsumerRecord<K, V> consumerRecord, Acknow
}

private KafkaConsumerBackoffManager.Context createContext(ConsumerRecord<K, V> data, long nextExecutionTimestamp,
Consumer<?, ?> consumer) {
@Nullable Consumer<?, ?> consumer) {

return this.kafkaConsumerBackoffManager.createContext(nextExecutionTimestamp, this.listenerId,
new TopicPartition(data.topic(), data.partition()), consumer);
Expand All @@ -135,12 +135,12 @@ public void onMessage(ConsumerRecord<K, V> data) {
}

@Override
public void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment) {
public void onMessage(ConsumerRecord<K, V> data, @Nullable Acknowledgment acknowledgment) {
onMessage(data, acknowledgment, null); // NOSONAR
}

@Override
public void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer) {
public void onMessage(ConsumerRecord<K, V> data, @Nullable Consumer<?, ?> consumer) {
onMessage(data, null, consumer);
}
}
Loading

0 comments on commit 3d1e450

Please sign in to comment.