Header serialization exception in aggregator processor with Redis #500


Closed
CEDDM opened this issue Sep 13, 2023 · 14 comments · Fixed by #503
Labels
bug Something isn't working
Milestone

Comments

@CEDDM

CEDDM commented Sep 13, 2023

When I configure my aggregator with Redis to cache JSON messages, I get a serialization exception because the object does not implement Serializable. This is a known problem with the Redis default serializer (JdkSerializationRedisSerializer).

The problem is that we don't have a real Java object when the aggregator is used in SCDF (the messages come from Kafka or RabbitMQ).
The recommended workaround I found is to use GenericJackson2JsonRedisSerializer, but there is no way to do this without forking the aggregator (see @artembilan's answer here: https://stackoverflow.com/questions/77088203/redis-serializer-properties-in-scdf).

It would be great to add a property to set the Redis serializer in the app configuration.

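For reference, when you can customize the app (i.e. a fork), the serializer can be swapped on the RedisMessageStore directly, which is essentially what such a property would have to do behind the scenes. This is only an illustrative sketch (the class and bean names are made up), not the actual app configuration:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.integration.redis.store.RedisMessageStore;

@Configuration
public class RedisStoreConfig {

	// Illustrative only: this is what forking the aggregator currently allows
	@Bean
	public RedisMessageStore redisMessageStore(RedisConnectionFactory connectionFactory) {
		RedisMessageStore store = new RedisMessageStore(connectionFactory);
		// Replace the default JdkSerializationRedisSerializer with a JSON serializer,
		// as suggested by the exception message itself ('setValueSerializer(..)')
		store.setValueSerializer(new GenericJackson2JsonRedisSerializer());
		return store;
	}

}
```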
EDIT: According to @artembilan's analysis, the problem comes from the headers, not the payload, so the description above is wrong. Some headers must be filtered out if they are not serializable.

@artembilan
Copy link
Member

Is there any explanation for how an aggregator processor receives something from the binder that is not a byte[]?
Well, I can see that the JsonBytesToMap function may have an effect, since the default content-type in Spring Cloud Stream is JSON.
But again: the resulting Map is Serializable.

Could you share a stack trace from that serialization error, so we have some clue about what is going on?

Thanks

@CEDDM
Author

CEDDM commented Sep 18, 2023

Sorry, I forgot to mention that I don't use the latest version of the aggregator. I use version 2021.1.2.
I will give it a try with the latest version containing the JsonBytesToMap evolution and get back to you.

@CEDDM
Author

CEDDM commented Sep 18, 2023

In fact, I think the problem happens before the aggregation, when the app is caching individual messages to Redis, so it's not a Map at that point. And indeed, I don't understand why there is a serialization problem, since the message is already a byte[]...
Anyway, I will try with the latest version and post the stack trace.

@CEDDM
Author

CEDDM commented Sep 18, 2023

I just saw that 2021.1.2 is still the latest stable version, so I couldn't try with a more recent one.
But my aggregation is:
aggregation=#this.![@jacksonObjectMapper.readValue(payload, T(java.util.Map))]
So I think it's pretty much the same as JsonBytesToMap.

Here is the stack trace:

2023-09-18 09:16:30.665 ERROR [test-aggregator-aggregator,5037da907fd2b327,ee76cce914935991] 1 --- [oundedElastic-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [bean 'aggregator'; defined in: 'org.springframework.cloud.fn.aggregator.AggregatorFunctionConfiguration'; from source: 'public org.springframework.integration.config.AggregatorFactoryBean org.springframework.cloud.fn.aggregator.AggregatorFunctionConfiguration.aggregator(org.springframework.beans.factory.ObjectProvider,org.springframework.beans.factory.ObjectProvider,org.springframework.beans.factory.ObjectProvider,org.springframework.beans.factory.ObjectProvider,org.springframework.messaging.MessageChannel)']; nested exception is java.lang.IllegalArgumentException: If relying on the default RedisSerializer (JdkSerializationRedisSerializer) the Object must be Serializable. Either make it Serializable or provide your own implementation of RedisSerializer via 'setValueSerializer(..)', failedMessage=GenericMessage [payload=byte[48], headers={content-length=439, sequenceSize=4, target-protocol=kafka, authorization=Basic Om51bGw=, b3=5037da907fd2b327-98ac8c6d934be05e-0, host=10.152.183.167:8181, source-type=kafka, connection=Keep-Alive, correlationId=78c5121d-a229-ec2e-3201-8233291b4f10, id=4cca4fb5-180b-345e-71a3-ffe2020fe01b, contentType=application/json, kafka_receivedTimestamp=1695028590365, timestamp=1695028590574, http_requestMethod=POST, sequenceNumber=3, deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedTopic=test-aggregator.splitter, accept=[text/plain, application/json, application/*+json, */*], nativeHeaders={}, kafka_offset=10, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@66c1b1ed, http_requestUrl=http://10.152.183.167:8181/, kafka_receivedPartitionId=0, accept-encoding=gzip,deflate, kafka_groupId=test-aggregator, user-agent=Apache-HttpClient/4.5.13 (Java/11.0.20.1)}]
	at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
	at org.springframework.integration.handler.ReplyProducingMessageHandlerWrapper.handleRequestMessage(ReplyProducingMessageHandlerWrapper.java:59)
	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
	at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:88)
	at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:37)
	at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:296)
	at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:277)
	at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
	at reactor.core.publisher.FluxRefCount$RefCountInner.onNext(FluxRefCount.java:200)
	at reactor.core.publisher.FluxPublish$PublishSubscriber.drain(FluxPublish.java:477)
	at reactor.core.publisher.FluxPublish$PublishSubscriber.onNext(FluxPublish.java:268)
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:130)
	at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:491)
	at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:299)
	at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
	at org.springframework.integration.channel.FluxMessageChannel.tryEmitMessage(FluxMessageChannel.java:82)
	at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:71)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
	at org.springframework.integration.channel.FluxMessageChannel.lambda$subscribeTo$5(FluxMessageChannel.java:123)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:196)
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
	at org.springframework.cloud.sleuth.instrument.reactor.ReactorSleuth.lambda$null$6(ReactorSleuth.java:324)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
	at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.IllegalArgumentException: If relying on the default RedisSerializer (JdkSerializationRedisSerializer) the Object must be Serializable. Either make it Serializable or provide your own implementation of RedisSerializer via 'setValueSerializer(..)'
	at org.springframework.integration.redis.store.RedisMessageStore.rethrowAsIllegalArgumentException(RedisMessageStore.java:188)
	at org.springframework.integration.redis.store.RedisMessageStore.doStoreIfAbsent(RedisMessageStore.java:128)
	at org.springframework.integration.store.AbstractKeyValueMessageStore.doAddMessage(AbstractKeyValueMessageStore.java:146)
	at org.springframework.integration.store.AbstractKeyValueMessageStore.addMessagesToGroup(AbstractKeyValueMessageStore.java:214)
	at org.springframework.integration.store.AbstractMessageGroupStore.addMessageToGroup(AbstractMessageGroupStore.java:189)
	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.store(AbstractCorrelatingMessageHandler.java:854)
	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.processMessageForGroup(AbstractCorrelatingMessageHandler.java:566)
	at org.springframework.integration.aggregator.AbstractCorrelatingMessageHandler.handleMessageInternal(AbstractCorrelatingMessageHandler.java:541)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
	... 31 more
Caused by: org.springframework.data.redis.serializer.SerializationException: Cannot serialize; nested exception is org.springframework.core.serializer.support.SerializationFailedException: Failed to serialize object using DefaultSerializer; nested exception is java.io.NotSerializableException: org.springframework.kafka.core.DefaultKafkaConsumerFactory$1
	at org.springframework.data.redis.serializer.JdkSerializationRedisSerializer.serialize(JdkSerializationRedisSerializer.java:96)
	at org.springframework.data.redis.core.AbstractOperations.rawValue(AbstractOperations.java:128)
	at org.springframework.data.redis.core.DefaultValueOperations.setIfAbsent(DefaultValueOperations.java:364)
	at org.springframework.data.redis.core.DefaultBoundValueOperations.setIfAbsent(DefaultBoundValueOperations.java:190)
	at org.springframework.integration.redis.store.RedisMessageStore.doStoreIfAbsent(RedisMessageStore.java:121)
	... 38 more
Caused by: org.springframework.core.serializer.support.SerializationFailedException: Failed to serialize object using DefaultSerializer; nested exception is java.io.NotSerializableException: org.springframework.kafka.core.DefaultKafkaConsumerFactory$1
	at org.springframework.core.serializer.support.SerializingConverter.convert(SerializingConverter.java:64)
	at org.springframework.core.serializer.support.SerializingConverter.convert(SerializingConverter.java:33)
	at org.springframework.data.redis.serializer.JdkSerializationRedisSerializer.serialize(JdkSerializationRedisSerializer.java:94)
	... 42 more
Caused by: java.io.NotSerializableException: org.springframework.kafka.core.DefaultKafkaConsumerFactory$1
	at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
	at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
	at java.base/java.util.ArrayList.writeObject(Unknown Source)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at java.base/java.io.ObjectStreamClass.invokeWriteObject(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
	at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
	at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
	at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
	at java.base/java.util.HashMap.internalWriteEntries(Unknown Source)
	at java.base/java.util.HashMap.writeObject(Unknown Source)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at java.base/java.io.ObjectStreamClass.invokeWriteObject(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
	at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
	at java.base/java.io.ObjectOutputStream.defaultWriteObject(Unknown Source)
	at org.springframework.messaging.MessageHeaders.writeObject(MessageHeaders.java:317)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at java.base/java.io.ObjectStreamClass.invokeWriteObject(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
	at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
	at java.base/java.io.ObjectOutputStream.defaultWriteFields(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeSerialData(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
	at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
	at org.springframework.core.serializer.DefaultSerializer.serialize(DefaultSerializer.java:46)
	at org.springframework.core.serializer.Serializer.serializeToByteArray(Serializer.java:56)
	at org.springframework.core.serializer.support.SerializingConverter.convert(SerializingConverter.java:60)
	... 44 more

Thanks for your help !

@artembilan
Member

OK. According to the stack trace, there is nothing wrong with the payload.
The problem comes from MessageHeaders serialization:

	at org.springframework.messaging.MessageHeaders.writeObject(MessageHeaders.java:317)

And the real exception is this:

Caused by: java.io.NotSerializableException: org.springframework.kafka.core.DefaultKafkaConsumerFactory$1

So, your original request misses the mark, since this is definitely not about whether the payload is JSON: we are simply failing to serialize the message headers.
We do have logic for this in MessageHeaders:

	private void writeObject(ObjectOutputStream out) throws IOException {
		Set<String> keysToIgnore = new HashSet<>();
		this.headers.forEach((key, value) -> {
			if (!(value instanceof Serializable)) {
				keysToIgnore.add(key);
			}
		});
		// ... the collected keys are then excluded from the serialized copy

Therefore, that DefaultKafkaConsumerFactory$1 could be part of some other header value that passes the condition mentioned above.

We need to investigate which headers are populated by the binders and filter them out before they reach the aggregator handler.
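The top-level-only Serializable check can be demonstrated outside of Spring with plain JDK classes. This standalone sketch (illustrative, not taken from the project) shows a Proxy passing an `instanceof Serializable` check while still failing actual serialization, exactly the pattern hit here:

```java
import java.io.ByteArrayOutputStream;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

public class ProxyHeaderDemo {

	public static void main(String[] args) throws Exception {
		// An InvocationHandler implemented as a lambda is NOT Serializable,
		// playing the role of the proxied KafkaConsumer's handler
		InvocationHandler handler = (proxy, method, methodArgs) -> null;
		Runnable proxied = (Runnable) Proxy.newProxyInstance(
				Runnable.class.getClassLoader(), new Class<?>[] { Runnable.class }, handler);

		// java.lang.reflect.Proxy itself implements Serializable,
		// so a top-level check like the one in MessageHeaders.writeObject() passes
		System.out.println(proxied instanceof Serializable); // prints: true

		// ...but actually writing the proxy also serializes its handler, which fails
		try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
			out.writeObject(proxied);
		}
		catch (NotSerializableException ex) {
			System.out.println("NotSerializableException: " + ex.getMessage());
		}
	}

}
```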

However, to trigger that process, I need you to rework this ticket into a proper problem description.
I wish we had had that stack trace a long time ago...

Thank you for understanding!

@CEDDM CEDDM changed the title Add redis-serializer property in aggregator processor Header serialization exception in aggregator processor with Redis Sep 19, 2023
@CEDDM
Author

CEDDM commented Sep 19, 2023

Thanks, I didn't see that line in the stack trace!
I changed the title and added an EDIT to the description. I hope that's OK.

Here are the headers of one individual Kafka message from my example, in case that helps:

{
	"content-length": "439",
	"http_requestMethod": "POST",
	"sequenceNumber": "1",
	"sequenceSize": "4",
	"target-protocol": "kafka",
	"accept": "[\"text/plain\",\"application/json\",\"application/*+json\",\"*/*\"]",
	"authorization": "Basic kshfbljgblgb=",
	"b3": "5037da907fd2b327-36bb30cb5c84cd85-0",
	"nativeHeaders": "{\"b3\":[\"5037da907fd2b327-36bb30cb5c84cd85-0\"]}",
	"spring_json_header_types": "{\"content-length\":\"java.lang.Long\",\"http_requestMethod\":\"java.lang.String\",\"sequenceNumber\":\"java.lang.Integer\",\"sequenceSize\":\"java.lang.Integer\",\"target-protocol\":\"java.lang.String\",\"accept\":\"java.util.ArrayList\",\"authorization\":\"java.lang.String\",\"b3\":\"java.lang.String\",\"nativeHeaders\":\"org.springframework.util.LinkedMultiValueMap\",\"host\":\"java.lang.String\",\"http_requestUrl\":\"java.lang.String\",\"connection\":\"java.lang.String\",\"correlationId\":\"java.util.UUID\",\"contentType\":\"java.lang.String\",\"accept-encoding\":\"java.lang.String\",\"user-agent\":\"java.lang.String\"}",
	"host": "10.152.xxx.yyy:8181",
	"http_requestUrl": "http://10.152.xxx.yyy:8181/",
	"connection": "Keep-Alive",
	"correlationId": "\"78c5121d-a229-ec2e-3201-8233291b4f10\"",
	"contentType": "application/json",
	"accept-encoding": "gzip,deflate",
	"user-agent": "Apache-HttpClient/4.5.13 (Java/11.0.20.1)"
}

@artembilan
Member

Well, those headers are OK, and they are probably part of the Kafka record before it is pulled by the consumer.
Then the Kafka binder consumer adds its own headers, and that's what I'd like to investigate: which headers are added by the binder before the message reaches the target handler in the function.

@sobychacko , any chances that you can quickly enlighten us what headers are populated by the Kafka Binder on the inbound side?

I wonder what that org.springframework.kafka.core.DefaultKafkaConsumerFactory$1 is, and why it is not a top-level header that would be skipped by the Serializable filter.

Thanks

@sobychacko
Collaborator

sobychacko commented Sep 21, 2023

@CEDDM, we spent some time debugging this issue with @artembilan. The problem occurs because of the Micrometer listeners we add in the binder: Spring Kafka then creates a proxy object for the actual KafkaConsumer, and that proxy cannot be serialized over the wire. We are thinking about a proper fix. In the meantime, as a potential workaround, can you make the following change in the aggregator app and build a custom version for your use?

Add the following bean in your custom version.

@Bean
ClientFactoryCustomizer binderClientFactoryCustomizer() {
	// A no-op customizer: its mere presence stops the binder from
	// registering the Micrometer listeners that trigger the proxying
	return new ClientFactoryCustomizer() { };
}

When the binder detects this bean, it does not add any Micrometer listeners, thus avoiding the proxy issue above.

Once you add that, rebuild and regenerate the binder-based aggregator app using the standard procedures for building a custom app. You will need that custom build until we develop a proper fix at the framework level.

@artembilan
Member

Here is the fix for Spring for Apache Kafka: spring-projects/spring-kafka#2822.

Since there is nothing we can fix in this project to mitigate the problem, I'm going to close this as Invalid.

Well, it is valid, but it does not require anything to be changed in this project, apart from a version upgrade when one is available.
Another workaround for this issue is to not use an external DB for the message store in the aggregator processor.

Thank you for the report and understanding!

@artembilan artembilan closed this as not planned Won't fix, can't repro, duplicate, stale Sep 21, 2023
@artembilan artembilan added the invalid This doesn't seem right label Sep 21, 2023
@CEDDM
Author

CEDDM commented Sep 22, 2023

Thanks for the fix in Spring Kafka.
In some cases, we really need to store messages to avoid losing them.
The latest stable version is more than a year old. Is there any ETA for the next stable version with all the recent fixes, hopefully including the Spring Kafka fix?

@artembilan
Member

The PR for Spring for Apache Kafka has just been opened, so the fix won't make it into a release until October.
At the same time, this project might not be updated to the next Spring Boot version (to pull that fix from Spring for Apache Kafka) until November.
Therefore, you are stuck with the suggested workarounds for a while.

Sorry for the inconvenience.

Well, I can come up with a workaround for the version of Stream Applications whose release is due today, but that is already for the fully fresh generation of this project.
Would upgrading to it afterwards work for you?

@artembilan
Member

Correction: the release is postponed to next Wednesday.
Reopening to provide the discussed fix as an out-of-the-box workaround for this aggregator application.

@artembilan artembilan reopened this Sep 22, 2023
@artembilan artembilan added this to the 4.0.0 milestone Sep 22, 2023
@artembilan artembilan added enhancement New feature or request bug Something isn't working and removed invalid This doesn't seem right enhancement New feature or request labels Sep 22, 2023
@CEDDM
Author

CEDDM commented Sep 22, 2023

Thanks a lot @artembilan ! That's great news !

artembilan added a commit to artembilan/stream-applications that referenced this issue Sep 22, 2023
Fixes spring-cloud#500

When `listeners` are provided for `DefaultKafkaConsumerFactory`,
the target `KafkaConsumer` instance is proxied.
The `java.lang.reflect.Proxy` is `Serializable`,
but the value it wraps is not.
When the `MessageHeaders` is serialized (e.g. into a persistent `MessageStore`),
it checks for the `Serializable` type only on the top-level object of the header.
Therefore, the `Proxy` passes the condition, but eventually we fail
with `NotSerializableException`, since the proxied object is not `Serializable`.

* Remove `kafka_consumer` from a message before it reaches an aggregator
with its logic to serialize message into the store

This is a workaround until Spring for Apache Kafka is released
with the fix: spring-projects/spring-kafka#2822
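The header-stripping approach described in the commit can be sketched roughly like this. This is a hedged illustration, not the actual patch; the configuration class and bean name are made up, while `KafkaHeaders.CONSUMER` is the real constant for the `kafka_consumer` header seen in the stack trace:

```java
import java.util.function.Function;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

@Configuration
public class KafkaConsumerHeaderCleaner {

	// Hypothetical bean: drop the proxied KafkaConsumer header before the
	// message reaches the aggregator and its persistent MessageStore
	@Bean
	public Function<Message<?>, Message<?>> dropKafkaConsumerHeader() {
		return message -> MessageBuilder.fromMessage(message)
				.removeHeader(KafkaHeaders.CONSUMER) // "kafka_consumer"
				.build();
	}

}
```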
@artembilan
Member

See related PR.
But still, sorry: even if we back-port it to the previous version, there is no telling when we would release it.
So it's probably better to be ready for the upcoming 4.0.0.

onobc pushed a commit that referenced this issue Sep 22, 2023
Fixes #500

artembilan added a commit to spring-cloud/spring-functions-catalog that referenced this issue Nov 22, 2023
Fixes spring-cloud/stream-applications#500
