Throwing an exception in errorConsumer prevents the client from finishing #1105

Open
@levil3

Description

I threw an exception from the errorConsumer on the server. After that, the client's request never seems to be released: the client runs neither doOnComplete() nor map().

Expected Behavior

I expect the client to print "complete 1" 10 times.

Actual Behavior

The client does not print "complete 1" even once.

Steps to Reproduce

My code is:

  • server
public static void main(String[] args) throws InterruptedException {
        RSocket rsocket = new RSocket() {
            @Override
            public Flux<Payload> requestStream(Payload p) {
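                // The onNext consumer throws first; the second lambda is the errorConsumer from the description.
                return Flux.create(emitter -> Mono.just("test").subscribe(payload -> {
                    throw new RuntimeException();
                }, throwable -> {
                    // Throws instead of signalling the emitter, so emitter never receives error or complete here.
                    throw new RuntimeException();
                }, emitter::complete));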
            }
        };

        LoopResources resources = LoopResources.create("test", 1, 2, true);
        TcpServer server = TcpServer.create().host("localhost").port(8090).runOn(resources);
        RSocketServer.create(SocketAcceptor.with(rsocket))
                .bind(TcpServerTransport.create(server))
                .subscribe();

        // wait
        TimeUnit.MINUTES.sleep(10);
}
  • client
public static void main(String[] args) throws InterruptedException {
        RSocket rSocket = RSocketConnector.create()
                .payloadDecoder(PayloadDecoder.ZERO_COPY)
                .connect(TcpClientTransport.create(8090))
                .doOnSuccess(rSocket1 -> {
                    System.out.println("connect server success!");
                })
                .block();

        for (int i = 0; i < 10; ++i) {
            Payload payload = ByteBufPayload.create("0");
            rSocket.requestStream(payload).map(Payload::getDataUtf8).doOnComplete(() -> {
                System.out.println("complete 1");
            }).map(e -> {
                System.out.println("doMap");
                return e;
            }).blockLast();
        }

        // Thread.sleep expects milliseconds, so convert the 1000 seconds with toMillis, not toNanos
        Thread.sleep(TimeUnit.SECONDS.toMillis(1000));

        // dispose
        rSocket.dispose();
}

Possible Solution
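
One thing that may be worth trying is to signal the emitter from the error consumer (for example by passing `emitter::error`) instead of throwing from it. This has not been verified against RSocket v1.1.4. The sketch below is only a plain-JDK analogy of that idea and does not use Reactor or RSocket: `deliver`, `throwingHandler`, `forwardingHandler` and `outcome` are hypothetical names, a `CompletableFuture` stands in for the emitter, and the sketch assumes that an exception thrown by the error callback is dropped by the caller without any terminal signal being sent.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

public class ErrorForwardingSketch {

    // Hypothetical stand-in for subscribe(onNext, onError, onComplete):
    // an exception from onNext is routed to onError, otherwise onComplete runs.
    public static void deliver(String value, Consumer<String> onNext,
                               Consumer<Throwable> onError, Runnable onComplete) {
        try {
            onNext.accept(value);
        } catch (RuntimeException e) {
            onError.accept(e); // if onError throws as well, nothing else is signalled
            return;
        }
        onComplete.run();
    }

    // Mirrors the reported handler: the error callback throws, so the sink is never terminated.
    public static CompletableFuture<Void> throwingHandler() {
        CompletableFuture<Void> sink = new CompletableFuture<>();
        try {
            deliver("test",
                    v -> { throw new RuntimeException("onNext failed"); },
                    t -> { throw new RuntimeException("onError failed"); },
                    () -> sink.complete(null));
        } catch (RuntimeException dropped) {
            // Assumption of this sketch: the caller drops the exception and sends no signal.
        }
        return sink;
    }

    // Alternative: the error callback forwards the failure to the sink.
    public static CompletableFuture<Void> forwardingHandler() {
        CompletableFuture<Void> sink = new CompletableFuture<>();
        deliver("test",
                v -> { throw new RuntimeException("onNext failed"); },
                sink::completeExceptionally,
                () -> sink.complete(null));
        return sink;
    }

    // Waits briefly and reports which terminal signal, if any, reached the sink.
    public static String outcome(CompletableFuture<Void> sink) {
        try {
            sink.get(200, TimeUnit.MILLISECONDS);
            return "completed";
        } catch (ExecutionException e) {
            return "error";
        } catch (TimeoutException e) {
            return "no terminal signal";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println("throwing handler: " + outcome(throwingHandler()));     // no terminal signal
        System.out.println("forwarding handler: " + outcome(forwardingHandler())); // error
    }
}
```

In this analogy the waiting side only stops once the error is forwarded. Whether the same holds for the server code above would need to be confirmed by running the reproduction with `emitter::error` as the error consumer.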

Your Environment

  • RSocket version(s) used: v1.1.4
  • jdk: 1.8
