Skip to content

Commit 46dc4d0

Browse files
authored
GH-500: Workaround for non-serializable header (#503)
Fixes #500 When `listeners` are provided for `DefaultKafkaConsumerFactory`, the target `KafkaConsumer` instance is proxied. The `java.lang.reflect.Proxy` is `Serializable`, but the value it is wrapping is not. When the `MessageHeaders` is serialized (e.g. into persistent `MessageStore`), it checks for `Serializable` type only on top-level object of the header. Therefore, the `Proxy` is passing condition, but eventually we fail with `NotSerializableException`, since the proxied object is not like that * Remove `kafka_consumer` from a message before it reaches an aggregator with its logic to serialize message into the store This is a workaround until Spring for Apache Kafka is released with the fix: spring-projects/spring-kafka#2822
1 parent 7a30b39 commit 46dc4d0

File tree

2 files changed

+15
-3
lines changed

2 files changed

+15
-3
lines changed

functions/function/aggregator-function/src/main/java/org/springframework/cloud/fn/aggregator/AggregatorFunctionConfiguration.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2022 the original author or authors.
2+
* Copyright 2020-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -43,6 +43,7 @@
4343
import org.springframework.integration.channel.FluxMessageChannel;
4444
import org.springframework.integration.config.AggregatorFactoryBean;
4545
import org.springframework.integration.store.MessageGroupStore;
46+
import org.springframework.integration.support.MessageBuilder;
4647
import org.springframework.lang.Nullable;
4748
import org.springframework.messaging.Message;
4849
import org.springframework.messaging.MessageChannel;
@@ -67,7 +68,12 @@ public Function<Flux<Message<?>>, Flux<Message<?>>> aggregatorFunction(
6768
FluxMessageChannel outputChannel
6869
) {
6970
return input -> Flux.from(outputChannel)
70-
.doOnRequest((request) -> inputChannel.subscribeTo(input));
71+
.doOnRequest((request) ->
72+
inputChannel.subscribeTo(
73+
input.map((inputMessage) ->
74+
MessageBuilder.fromMessage(inputMessage)
75+
.removeHeader("kafka_consumer")
76+
.build())));
7177
}
7278

7379
@Bean

functions/function/aggregator-function/src/test/java/org/springframework/cloud/fn/aggregator/RedisMessageStoreAggregatorTests.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2020 the original author or authors.
2+
* Copyright 2020-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,13 +16,16 @@
1616

1717
package org.springframework.cloud.fn.aggregator;
1818

19+
import java.io.ByteArrayInputStream;
20+
import java.io.InputStream;
1921
import java.time.Duration;
2022
import java.util.List;
2123

2224
import org.junit.jupiter.api.Test;
2325
import reactor.core.publisher.Flux;
2426
import reactor.test.StepVerifier;
2527

28+
import org.springframework.aop.framework.ProxyFactory;
2629
import org.springframework.cloud.fn.consumer.redis.RedisTestContainerSupport;
2730
import org.springframework.integration.IntegrationMessageHeaderAccessor;
2831
import org.springframework.integration.redis.store.RedisMessageStore;
@@ -46,11 +49,14 @@ static void redisProperties(DynamicPropertyRegistry registry) {
4649

4750
@Test
4851
public void test() {
52+
InputStream fakeNonSerializableKafkaConsumer = new ByteArrayInputStream(new byte[0]);
53+
4954
Flux<Message<?>> input =
5055
Flux.just(MessageBuilder.withPayload("2")
5156
.setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "my_correlation")
5257
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 2)
5358
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 2)
59+
.setHeader("kafka_consumer", new ProxyFactory(fakeNonSerializableKafkaConsumer).getProxy())
5460
.build(),
5561
MessageBuilder.withPayload("1")
5662
.setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "my_correlation")

0 commit comments

Comments
 (0)