Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limitations of the Resume functionality. No intended to be used by load-balanced server? #1116

Open
ansgarschulte opened this issue Jan 24, 2025 · 2 comments
Labels

Comments

@ansgarschulte
Copy link

I want to use the Resume functionality on a load-balanced server.

Motivation

I can use a persistent ResumableFramesStore by configuring a store with the RSocketServerCustomizer.
However, I cannot provide a custom persistent SessionManager, which is why I receive "unknown resume token" exceptions.

Desired solution

Provide a way to implement a custom SessionManager.

@OlegDokuka
Copy link
Member

Hi, @ansgarschulte !

Thank you for finding time to fill this issue!

Would you mind providing us with a small sample that can showcase what does not work for you exactly?

Thanks,
Oleh

ansgarschulte added a commit to ansgarschulte/rsocket_client_server_resume_echo that referenced this issue Jan 28, 2025
ansgarschulte added a commit to ansgarschulte/rsocket_client_server_resume_echo that referenced this issue Jan 28, 2025
@ansgarschulte
Copy link
Author

ansgarschulte commented Jan 28, 2025

Hi @OlegDokuka

Here you are:
https://github.com/ansgarschulte/rsocket_client_server_resume_echo/tree/feat/redis_store

  1. Start redis app of the repository or a local running redis
  2. Start the server app with RedisResumableFramesStore
  3. note down the server pid
  4. Start client app
  5. see message ping / pong between server and client
  6. kill -9 server_pid
  7. start server application after kill
  8. see error on client:
2025-01-28T20:20:36.655+01:00 ERROR 86185 --- [client] [actor-tcp-nio-6] reactor.core.publisher.Operators         : Operator called default onErrorDropped

reactor.core.Exceptions$ErrorCallbackNotImplemented: RejectedResumeException (0x4): unknown resume token
Caused by: io.rsocket.exceptions.RejectedResumeException: unknown resume token
	at io.rsocket.exceptions.Exceptions.from(Exceptions.java:64) ~[rsocket-core-1.1.3.jar:na]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoDoFinally] :
	reactor.core.publisher.Mono.doFinally(Mono.java:2679)
	io.rsocket.resume.ClientRSocketSession.<init>(ClientRSocketSession.java:114)
Error has been observed at the following site(s):
	*__Mono.doFinally ⇢ at io.rsocket.resume.ClientRSocketSession.<init>(ClientRSocketSession.java:114)
Original Stack Trace:
		at io.rsocket.exceptions.Exceptions.from(Exceptions.java:64) ~[rsocket-core-1.1.3.jar:na]
		at io.rsocket.resume.ClientRSocketSession.tryReestablishSession(ClientRSocketSession.java:287) ~[rsocket-core-1.1.3.jar:na]
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:196) ~[reactor-core-3.7.1.jar:3.7.1]
		at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:245) ~[reactor-core-3.7.1.jar:3.7.1]
		at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:305) ~[reactor-core-3.7.1.jar:3.7.1]
		at reactor.core.publisher.FluxFirstWithSignal$FirstEmittingSubscriber.onNext(FluxFirstWithSignal.java:333) ~[reactor-core-3.7.1.jar:3.7.1]
		at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:176) ~[reactor-core-3.7.1.jar:3.7.1]
		at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:115) ~[rsocket-core-1.1.3.jar:na]
		at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:19) ~[rsocket-core-1.1.3.jar:na]
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129) ~[reactor-core-3.7.1.jar:3.7.1]
		at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122) ~[reactor-core-3.7.1.jar:3.7.1]
		at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:292) ~[reactor-netty-core-1.2.1.jar:1.2.1]
		at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:401) ~[reactor-netty-core-1.2.1.jar:1.2.1]
		at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:435) ~[reactor-netty-core-1.2.1.jar:1.2.1]
		at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:115) ~[reactor-netty-core-1.2.1.jar:1.2.1]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.116.Final.jar:4.1.116.Final]
		at java.base/java.lang.Thread.run(Thread.java:1575) ~[na:na]

2025-01-28T20:20:36.659+01:00  INFO 86185 --- [client] [           main] .s.b.a.l.ConditionEvaluationReportLogger : 

Error starting ApplicationContext. To display the condition evaluation report re-run your application with 'debug' enabled.
2025-01-28T20:20:36.669+01:00 ERROR 86185 --- [client] [           main] o.s.boot.SpringApplication               : Application run failed

io.rsocket.exceptions.RejectedResumeException: unknown resume token
	at io.rsocket.exceptions.Exceptions.from(Exceptions.java:64) ~[rsocket-core-1.1.3.jar:na]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoDoFinally] :
	reactor.core.publisher.Mono.doFinally(Mono.java:2679)
	io.rsocket.resume.ClientRSocketSession.<init>(ClientRSocketSession.java:114)
Error has been observed at the following site(s):
	*____Mono.doFinally ⇢ at io.rsocket.resume.ClientRSocketSession.<init>(ClientRSocketSession.java:114)
	*__________Flux.map ⇢ at org.springframework.messaging.rsocket.DefaultRSocketRequester$DefaultRequestSpec.retrieveFlux(DefaultRSocketRequester.java:324)
	|_         Flux.map ⇢ at org.springframework.messaging.rsocket.DefaultRSocketRequester$DefaultRequestSpec.retrieveFlux(DefaultRSocketRequester.java:324)
	|_ Flux.collectList ⇢ at com.example.client.RSocketClientApplication.run(RSocketClientApplication.java:52)
Original Stack Trace:
		at io.rsocket.exceptions.Exceptions.from(Exceptions.java:64) ~[rsocket-core-1.1.3.jar:na]
		at io.rsocket.resume.ClientRSocketSession.tryReestablishSession(ClientRSocketSession.java:287) ~[rsocket-core-1.1.3.jar:na]
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:196) ~[reactor-core-3.7.1.jar:3.7.1]
		at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:245) ~[reactor-core-3.7.1.jar:3.7.1]
		at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:305) ~[reactor-core-3.7.1.jar:3.7.1]
		at reactor.core.publisher.FluxFirstWithSignal$FirstEmittingSubscriber.onNext(FluxFirstWithSignal.java:333) ~[reactor-core-3.7.1.jar:3.7.1]
		at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:176) ~[reactor-core-3.7.1.jar:3.7.1]
		at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:115) ~[rsocket-core-1.1.3.jar:na]
		at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:19) ~[rsocket-core-1.1.3.jar:na]
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129) ~[reactor-core-3.7.1.jar:3.7.1]
		at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122) ~[reactor-core-3.7.1.jar:3.7.1]
		at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:292) ~[reactor-netty-core-1.2.1.jar:1.2.1]
		at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:401) ~[reactor-netty-core-1.2.1.jar:1.2.1]
		at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:435) ~[reactor-netty-core-1.2.1.jar:1.2.1]
		at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:115) ~[reactor-netty-core-1.2.1.jar:1.2.1]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.116.Final.jar:4.1.116.Final]
		at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.116.Final.jar:4.1.116.Final]
		at java.base/java.lang.Thread.run(Thread.java:1575) ~[na:na]
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:104) ~[reactor-core-3.7.1.jar:3.7.1]
		at reactor.core.publisher.Mono.block(Mono.java:1779) ~[reactor-core-3.7.1.jar:3.7.1]
		at com.example.client.RSocketClientApplication.run(RSocketClientApplication.java:53) ~[classes/:na]
		at org.springframework.boot.SpringApplication.lambda$callRunner$5(SpringApplication.java:788) ~[spring-boot-3.4.1.jar:3.4.1]
		at org.springframework.util.function.ThrowingConsumer$1.acceptWithException(ThrowingConsumer.java:82) ~[spring-core-6.2.1.jar:6.2.1]
		at org.springframework.util.function.ThrowingConsumer.accept(ThrowingConsumer.java:60) ~[spring-core-6.2.1.jar:6.2.1]
		at org.springframework.util.function.ThrowingConsumer$1.accept(ThrowingConsumer.java:86) ~[spring-core-6.2.1.jar:6.2.1]
		at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:796) ~[spring-boot-3.4.1.jar:3.4.1]
		at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:787) ~[spring-boot-3.4.1.jar:3.4.1]
		at org.springframework.boot.SpringApplication.lambda$callRunners$3(SpringApplication.java:772) ~[spring-boot-3.4.1.jar:3.4.1]
		at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) ~[na:na]
		at java.base/java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:357) ~[na:na]
		at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:571) ~[na:na]
		at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:560) ~[na:na]
		at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151) ~[na:na]
		at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174) ~[na:na]
		at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:265) ~[na:na]
		at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:636) ~[na:na]
		at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:772) ~[spring-boot-3.4.1.jar:3.4.1]
		at org.springframework.boot.SpringApplication.run(SpringApplication.java:325) ~[spring-boot-3.4.1.jar:3.4.1]
		at org.springframework.boot.SpringApplication.run(SpringApplication.java:1361) ~[spring-boot-3.4.1.jar:3.4.1]
		at org.springframework.boot.SpringApplication.run(SpringApplication.java:1350) ~[spring-boot-3.4.1.jar:3.4.1]
		at com.example.client.RSocketClientApplication.main(RSocketClientApplication.java:26) ~[classes/:na]
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:104) ~[reactor-core-3.7.1.jar:3.7.1]
		at reactor.core.publisher.Mono.block(Mono.java:1779) ~[reactor-core-3.7.1.jar:3.7.1]
		at com.example.client.RSocketClientApplication.run(RSocketClientApplication.java:60) ~[classes/:na]
		at org.springframework.boot.SpringApplication.lambda$callRunner$5(SpringApplication.java:788) ~[spring-boot-3.4.1.jar:3.4.1]
		at org.springframework.util.function.ThrowingConsumer$1.acceptWithException(ThrowingConsumer.java:82) ~[spring-core-6.2.1.jar:6.2.1]
		at org.springframework.util.function.ThrowingConsumer.accept(ThrowingConsumer.java:60) ~[spring-core-6.2.1.jar:6.2.1]
		at org.springframework.util.function.ThrowingConsumer$1.accept(ThrowingConsumer.java:86) ~[spring-core-6.2.1.jar:6.2.1]
		at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:796) ~[spring-boot-3.4.1.jar:3.4.1]
		at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:787) ~[spring-boot-3.4.1.jar:3.4.1]
		at org.springframework.boot.SpringApplication.lambda$callRunners$3(SpringApplication.java:772) ~[spring-boot-3.4.1.jar:3.4.1]
		at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) ~[na:na]
		at java.base/java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:357) ~[na:na]
		at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:571) ~[na:na]
		at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:560) ~[na:na]
		at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151) ~[na:na]
		at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174) ~[na:na]
		at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:265) ~[na:na]
		at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:636) ~[na:na]
		at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:772) ~[spring-boot-3.4.1.jar:3.4.1]
		at org.springframework.boot.SpringApplication.run(SpringApplication.java:325) ~[spring-boot-3.4.1.jar:3.4.1]
		at org.springframework.boot.SpringApplication.run(SpringApplication.java:1361) ~[spring-boot-3.4.1.jar:3.4.1]
		at org.springframework.boot.SpringApplication.run(SpringApplication.java:1350) ~[spring-boot-3.4.1.jar:3.4.1]
		at com.example.client.RSocketClientApplication.main(RSocketClientApplication.java:26) ~[classes/:na]


Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants