Skip to content

Commit

Permalink
jspecify nullability changes for the streams package
Browse files Browse the repository at this point in the history
Signed-off-by: Soby Chacko <[email protected]>
  • Loading branch information
sobychacko committed Feb 18, 2025
1 parent ea9cf27 commit 4a105e5
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2023 the original author or authors.
* Copyright 2019-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -60,7 +60,12 @@ public void process(Record<K, V> record) {
headerValue = ((String) headerValue).getBytes(StandardCharsets.UTF_8);
}
else if (!(headerValue instanceof byte[])) {
throw new IllegalStateException("Invalid header value type: " + headerValue.getClass());
if (headerValue != null) {
throw new IllegalStateException("Invalid header value type: " + headerValue.getClass());
}
else {
throw new IllegalStateException("headerValue is null");
}
}
headers.add(new RecordHeader(name, (byte[]) headerValue));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class KafkaStreamsInteractiveQueryService {
/**
* Underlying {@link KafkaStreams} from {@link StreamsBuilderFactoryBean}.
*/
private volatile KafkaStreams kafkaStreams;
private volatile @Nullable KafkaStreams kafkaStreams;

/**
* Construct an instance for querying state stores from the KafkaStreams in the {@link StreamsBuilderFactoryBean}.
Expand Down Expand Up @@ -87,6 +87,7 @@ public <T> T retrieveQueryableStore(String storeName, QueryableStoreType<T> stor

return this.retryTemplate.execute(context -> {
try {
Assert.state(this.kafkaStreams != null, "KafkaStreams cannot be null.");
return this.kafkaStreams.store(storeQueryParams);
}
catch (Exception e) {
Expand Down Expand Up @@ -143,6 +144,7 @@ public <K> HostInfo getKafkaStreamsApplicationHostInfo(String store, K key, Seri
return this.retryTemplate.execute(context -> {
Throwable throwable = null;
try {
Assert.state(this.kafkaStreams != null, "KafkaStreams cannot be null.");
KeyQueryMetadata keyQueryMetadata = this.kafkaStreams.queryMetadataForKey(store, key, serializer);
if (keyQueryMetadata != null) {
return keyQueryMetadata.activeHost();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 the original author or authors.
* Copyright 2019-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.jspecify.annotations.Nullable;

import org.springframework.kafka.listener.ConsumerRecordRecoverer;
import org.springframework.util.ClassUtils;
Expand All @@ -45,7 +46,7 @@ public class RecoveringDeserializationExceptionHandler implements Deserializatio

private static final Log LOGGER = LogFactory.getLog(RecoveringDeserializationExceptionHandler.class);

private ConsumerRecordRecoverer recoverer;
private @Nullable ConsumerRecordRecoverer recoverer;

public RecoveringDeserializationExceptionHandler() {
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019-2022 the original author or authors.
* Copyright 2019-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -84,7 +84,7 @@ public void process(Record<Kin, Vin> record) {
message = this.function.exchange(message);
List<String> headerList = new ArrayList<>();
headers.forEach(header -> headerList.add(header.key()));
headerList.forEach(name -> headers.remove(name));
headerList.forEach(headers::remove);
ProducerRecord<?, ?> fromMessage = this.converter.fromMessage(message, "dummy");
fromMessage.headers().forEach(header -> {
if (!header.key().equals(KafkaHeaders.TOPIC)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/**
* Package for classes related to spring-messaging with Kafka Streams.
*/
@org.jspecify.annotations.NullMarked
package org.springframework.kafka.streams.messaging;
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/**
* Package for classes related to Kafka Streams.
*/
@org.jspecify.annotations.NullMarked
package org.springframework.kafka.streams;

0 comments on commit 4a105e5

Please sign in to comment.