Skip to content

Throw wrapped TimeoutException on Mono.block* and Flux.block* #3733

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.reactivestreams.Subscription;
import reactor.core.Disposable;
Expand Down Expand Up @@ -148,7 +149,8 @@ final Optional<T> blockingGet(long timeout, TimeUnit unit) {
try {
if (!await(timeout, unit)) {
dispose();
throw new IllegalStateException("Timeout on blocking read for " + timeout + " " + unit);
String errorMessage = "Timeout on blocking read for " + timeout + " " + unit;
throw new IllegalStateException(errorMessage, new TimeoutException(errorMessage));
Comment on lines +152 to +153
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

public Optional<T> blockOptional(Duration timeout) {

I checked that Mono.blockOptional(timeout) use this code~!

}
}
catch (InterruptedException ex) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.reactivestreams.Subscription;
import reactor.core.Disposable;
Expand Down Expand Up @@ -124,7 +125,8 @@ final T blockingGet(long timeout, TimeUnit unit) {
try {
if (!await(timeout, unit)) {
dispose();
throw new IllegalStateException("Timeout on blocking read for " + timeout + " " + unit);
String errorMessage = "Timeout on blocking read for " + timeout + " " + unit;
throw new IllegalStateException(errorMessage, new TimeoutException(errorMessage));
Comment on lines +128 to +129
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

public final T blockFirst(Duration timeout) {

public final T blockLast(Duration timeout) {

I checked that Mono#block(timeout), Flux#blockFirst(timeout), Flux#blockLast(timeout) use this code~!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

* If the provided timeout expires, a {@link RuntimeException} is thrown.

// AS-IS
If the provided timeout expires, a {@link RuntimeException} is thrown.

// TO-BE
If the provided timeout expires, a {@link RuntimeException} is thrown with {@link TimeoutException} as the cause.

(just curiosity) should we enhance document on Mono#block*, Flux#block* like above?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, please.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

✅ update javadoc :)

}
}
catch (InterruptedException ex) {
Expand Down
8 changes: 6 additions & 2 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -2771,7 +2771,8 @@ public final T blockFirst() {
* signals its first value, completes or a timeout expires. Returns that value,
* or null if the Flux completes empty. In case the Flux errors, the original
* exception is thrown (wrapped in a {@link RuntimeException} if it was a checked
* exception). If the provided timeout expires, a {@link RuntimeException} is thrown.
* exception). If the provided timeout expires, a {@link RuntimeException} is thrown
* with a {@link TimeoutException} as the cause.
* <p>
* Note that each blockFirst() will trigger a new subscription: in other words,
* the result might miss signal from hot publishers.
Expand All @@ -2780,6 +2781,7 @@ public final T blockFirst() {
* <img class="marble" src="doc-files/marbles/blockFirstWithTimeout.svg" alt="">
*
* @param timeout maximum time period to wait for before raising a {@link RuntimeException}
* with a {@link TimeoutException} as the cause
* @return the first value or null
*/
@Nullable
Expand Down Expand Up @@ -2821,7 +2823,8 @@ public final T blockLast() {
* signals its last value, completes or a timeout expires. Returns that value,
* or null if the Flux completes empty. In case the Flux errors, the original
* exception is thrown (wrapped in a {@link RuntimeException} if it was a checked
* exception). If the provided timeout expires, a {@link RuntimeException} is thrown.
* exception). If the provided timeout expires, a {@link RuntimeException} is thrown
* with a {@link TimeoutException} as the cause.
* <p>
* Note that each blockLast() will trigger a new subscription: in other words,
* the result might miss signal from hot publishers.
Expand All @@ -2830,6 +2833,7 @@ public final T blockLast() {
* <img class="marble" src="doc-files/marbles/blockLastWithTimeout.svg" alt="">
*
* @param timeout maximum time period to wait for before raising a {@link RuntimeException}
* with a {@link TimeoutException} as the cause
* @return the last value or null
*/
@Nullable
Expand Down
8 changes: 6 additions & 2 deletions reactor-core/src/main/java/reactor/core/publisher/Mono.java
Original file line number Diff line number Diff line change
Expand Up @@ -1784,7 +1784,8 @@ public T block() {
* received or a timeout expires. Returns that value, or null if the Mono completes
* empty. In case the Mono errors, the original exception is thrown (wrapped in a
* {@link RuntimeException} if it was a checked exception).
* If the provided timeout expires, a {@link RuntimeException} is thrown.
* If the provided timeout expires, a {@link RuntimeException} is thrown
* with a {@link TimeoutException} as the cause.
*
* <p>
* <img class="marble" src="doc-files/marbles/blockWithTimeout.svg" alt="">
Expand All @@ -1793,6 +1794,7 @@ public T block() {
* might miss signal from hot publishers.
*
* @param timeout maximum time period to wait for before raising a {@link RuntimeException}
* with a {@link TimeoutException} as the cause
*
* @return T the result
*/
Expand Down Expand Up @@ -1836,7 +1838,8 @@ public Optional<T> blockOptional() {
* Exception via {@link Optional#orElseThrow(Supplier)}.
* In case the Mono itself errors, the original exception is thrown (wrapped in a
* {@link RuntimeException} if it was a checked exception).
* If the provided timeout expires, a {@link RuntimeException} is thrown.
* If the provided timeout expires, a {@link RuntimeException} is thrown
* with a {@link TimeoutException} as the cause.
*
* <p>
* <img class="marble" src="doc-files/marbles/blockOptionalWithTimeout.svg" alt="">
Expand All @@ -1845,6 +1848,7 @@ public Optional<T> blockOptional() {
* might miss signal from hot publishers.
*
* @param timeout maximum time period to wait for before raising a {@link RuntimeException}
* with a {@link TimeoutException} as the cause
*
* @return T the result
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2017-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,6 +19,7 @@
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -103,7 +104,8 @@ public void timeoutOptionalTimingOut() {
// Using sub-millis timeouts after gh-1734
assertThatExceptionOfType(IllegalStateException.class)
.isThrownBy(() -> source.blockOptional(Duration.ofNanos(100)))
.withMessage("Timeout on blocking read for 100 NANOSECONDS");
.withMessage("Timeout on blocking read for 100 NANOSECONDS")
.withCause(new TimeoutException("Timeout on blocking read for 100 NANOSECONDS"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update the Javadoc too

}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,6 +21,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
Expand Down Expand Up @@ -87,7 +88,8 @@ public void blockingFirstTimeout() {
assertThatIllegalStateException().isThrownBy(() ->
Flux.just(1).delayElements(Duration.ofSeconds(1))
.blockFirst(Duration.ofMillis(1)))
.withMessage("Timeout on blocking read for 1000000 NANOSECONDS");
.withMessage("Timeout on blocking read for 1000000 NANOSECONDS")
.withCause(new TimeoutException("Timeout on blocking read for 1000000 NANOSECONDS"));
}

@Test
Expand Down Expand Up @@ -115,7 +117,8 @@ public void blockingLastTimeout() {
assertThatIllegalStateException().isThrownBy(() ->
Flux.just(1).delayElements(Duration.ofMillis(100))
.blockLast(Duration.ofNanos(50)))
.withMessage("Timeout on blocking read for 50 NANOSECONDS");
.withMessage("Timeout on blocking read for 50 NANOSECONDS")
.withCause(new TimeoutException("Timeout on blocking read for 50 NANOSECONDS"));
}


Expand Down Expand Up @@ -287,7 +290,8 @@ public void monoBlockOptionalDoesntCancel() {
@Test
public void monoBlockSupportsNanos() {
assertThatIllegalStateException().isThrownBy(() -> Mono.never().block(Duration.ofNanos(9_000L)))
.withMessage("Timeout on blocking read for 9000 NANOSECONDS");
.withMessage("Timeout on blocking read for 9000 NANOSECONDS")
.withCause(new TimeoutException("Timeout on blocking read for 9000 NANOSECONDS"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2021-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package reactor.core.publisher;

import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -247,7 +248,8 @@ void blockNegativeIsImmediateTimeout() {

assertThatExceptionOfType(IllegalStateException.class)
.isThrownBy(() -> sink.block(Duration.ofNanos(-1)))
.withMessage("Timeout on blocking read for 0 NANOSECONDS");
.withMessage("Timeout on blocking read for 0 NANOSECONDS")
.withCause(new TimeoutException("Timeout on blocking read for 0 NANOSECONDS"));

assertThat(Duration.ofNanos(System.nanoTime() - start))
.isLessThan(Duration.ofMillis(500));
Expand All @@ -260,7 +262,8 @@ void blockZeroIsImmediateTimeout() {

assertThatExceptionOfType(IllegalStateException.class)
.isThrownBy(() -> sink.block(Duration.ZERO))
.withMessage("Timeout on blocking read for 0 NANOSECONDS");
.withMessage("Timeout on blocking read for 0 NANOSECONDS")
.withCause(new TimeoutException("Timeout on blocking read for 0 NANOSECONDS"));

assertThat(Duration.ofNanos(System.nanoTime() - start))
.isLessThan(Duration.ofMillis(500));
Expand Down