Skip to content

fix #1177 Operators to capture last visible Scheduler and go back to it #1644

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import reactor.core.CorePublisher;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
Expand Down Expand Up @@ -8621,6 +8622,14 @@ public final Flux<Flux<T>> window(int maxSize) {
return onAssembly(new FluxWindow<>(this, maxSize, Queues.get(maxSize)));
}

public final Flux<T> captureScheduler() {
return onAssembly(new FluxSchedulerCapture(this));
}

public final Flux<T> publishOnCapturedOr(Supplier<Scheduler> alternative) {
return onAssembly(new FluxPublishOnCaptured<>(this, alternative));
}

/**
* Split this {@link Flux} sequence into multiple {@link Flux} windows of size
* {@code maxSize}, that each open every {@code skip} elements in the source.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,15 @@ public void subscribe(CoreSubscriber<? super T> actual) {
queueSupplier));
}

static final class PublishOnSubscriber<T>
static class PublishOnSubscriber<T>
implements QueueSubscription<T>, Runnable, InnerOperator<T, T> {

final CoreSubscriber<? super T> actual;

final Scheduler scheduler;

final Worker worker;
@Nullable
volatile Scheduler scheduler;
@Nullable
volatile Worker worker;

final boolean delayError;

Expand Down Expand Up @@ -158,8 +159,8 @@ static final class PublishOnSubscriber<T>
boolean outputFused;

PublishOnSubscriber(CoreSubscriber<? super T> actual,
Scheduler scheduler,
Worker worker,
@Nullable Scheduler scheduler,
@Nullable Worker worker,
boolean delayError,
int prefetch,
int lowTide,
Expand Down Expand Up @@ -270,7 +271,10 @@ public void cancel() {

cancelled = true;
s.cancel();
worker.dispose();
Worker w = worker;
if (w != null) {
w.dispose();
}

if (WIP.getAndIncrement(this) == 0) {
Operators.onDiscardQueueWithClear(queue, actual.currentContext(), null);
Expand All @@ -285,14 +289,22 @@ void trySchedule(
return;
}

try {
worker.schedule(this);
}
catch (RejectedExecutionException ree) {
Worker w = worker;
if (w == null) {
Operators.onDiscardQueueWithClear(queue, actual.currentContext(), null);
actual.onError(Operators.onRejectedExecution(ree, subscription, suppressed, dataSignal,
actual.onError(Operators.onOperatorError(new NullPointerException("worker is still undefined in trySchedule"),
actual.currentContext()));
}
else {
try {
w.schedule(this);
}
catch (RejectedExecutionException ree) {
Operators.onDiscardQueueWithClear(queue, actual.currentContext(), null);
actual.onError(Operators.onRejectedExecution(ree, subscription, suppressed, dataSignal,
actual.currentContext()));
}
}
}

void runSync() {
Expand Down Expand Up @@ -460,15 +472,19 @@ void runBackfused() {

void doComplete(Subscriber<?> a) {
a.onComplete();
worker.dispose();
if (worker != null) {
worker.dispose();
}
}

void doError(Subscriber<?> a, Throwable e) {
try {
a.onError(e);
}
finally {
worker.dispose();
if (worker != null) {
worker.dispose();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright (c) 2011-2019 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.publisher;

import java.util.function.Supplier;

import org.reactivestreams.Subscription;

import reactor.core.CoreSubscriber;
import reactor.core.scheduler.Scheduler;
import reactor.util.concurrent.Queues;
import reactor.util.context.Context;

/**
* @author Simon Baslé
*/
final class FluxPublishOnCaptured<T> extends FluxOperator<T, T> {

final Supplier<Scheduler> alternative;

FluxPublishOnCaptured(Flux<? extends T> source, Supplier<Scheduler> alternative) {
super(source);
this.alternative = alternative;
}

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
source.subscribe(new PublishOnCapturedSubscriber<>(actual, alternative));
}

static final class PublishOnCapturedSubscriber<T>
extends FluxPublishOn.PublishOnSubscriber<T> {

final Context context;
final Supplier<Scheduler> alternativeSupplier;

PublishOnCapturedSubscriber(CoreSubscriber<? super T> actual, Supplier<Scheduler> alternativeSupplier) {
super(actual,
null,
null,
true,
Queues.SMALL_BUFFER_SIZE,
Queues.SMALL_BUFFER_SIZE,
Queues.get(Queues.SMALL_BUFFER_SIZE));
this.context = actual.currentContext()
.put(PublishOnCapturedSubscriber.class, this);
this.alternativeSupplier = alternativeSupplier;
}

@Override
public Context currentContext() {
return this.context;
}

@Override
public void onSubscribe(Subscription s) {
if (scheduler == null && !done && !cancelled) {
Scheduler alternative = alternativeSupplier.get();
this.scheduler = alternative;
this.worker = alternative.createWorker();
}
super.onSubscribe(s);
}

void set(Scheduler scheduler) {
if (this.scheduler != null || done || cancelled) {
return;
}
this.scheduler = scheduler;
this.worker = scheduler.createWorker();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (c) 2011-2019 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.publisher;

import java.util.NoSuchElementException;

import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.core.scheduler.Scheduler;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

/**
* @author Simon Baslé
*/
final class FluxSchedulerCapture<T> extends FluxOperator<T, T> {

@Nullable
final Scheduler scheduler;

FluxSchedulerCapture(Flux<? extends T> source) {
super(source);
this.scheduler = Scannable.from(this)
.parents()
.map(parent -> parent.scan(Attr.RUN_ON))
.filter(s -> s instanceof Scheduler)
.map(s -> (Scheduler) s)
.findFirst()
.orElse(null);
}

@Override
public Object scanUnsafe(Attr key) {
if (key == Attr.RUN_ON) return scheduler;

return super.scanUnsafe(key);
}

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
if (scheduler != null) {
Context c = actual.currentContext();
try {
FluxPublishOnCaptured.PublishOnCapturedSubscriber<?> schedulerStore = c.get(FluxPublishOnCaptured.PublishOnCapturedSubscriber.class);
schedulerStore.set(scheduler);
}
catch (NoSuchElementException noRunOnCaptured) {
//NO OP
//TODO should we fail the operator in this case?
}
}
source.subscribe(actual);
}
}
Loading