Skip to content

Commit 8974da2

Browse files
zizaresimonbasle
authored andcommitted
Use error handler for reactive cache aspect
This change ensures that the cache error handler is used in case of future-based or publisher-based asynchronous caching completing with an exception. Closes gh-33073
1 parent ab236c7 commit 8974da2

File tree

2 files changed

+111
-4
lines changed

2 files changed

+111
-4
lines changed

spring-context/src/main/java/org/springframework/cache/interceptor/CacheAspectSupport.java

+24-3
Original file line numberDiff line numberDiff line change
@@ -503,7 +503,10 @@ private Object findInCaches(CacheOperationContext context, Object key,
503503
if (CompletableFuture.class.isAssignableFrom(context.getMethod().getReturnType())) {
504504
CompletableFuture<?> result = cache.retrieve(key);
505505
if (result != null) {
506-
return result.thenCompose(value -> (CompletableFuture<?>) evaluate(
506+
return result.exceptionally(ex -> {
507+
getErrorHandler().handleCacheGetError((RuntimeException) ex, cache, key);
508+
return null;
509+
}).thenCompose(value -> (CompletableFuture<?>) evaluate(
507510
(value != null ? CompletableFuture.completedFuture(unwrapCacheValue(value)) : null),
508511
invoker, method, contexts));
509512
}
@@ -1131,12 +1134,30 @@ public Object findInCaches(CacheOperationContext context, Cache cache, Object ke
11311134
if (adapter.isMultiValue()) {
11321135
return adapter.fromPublisher(Flux.from(Mono.fromFuture(cachedFuture))
11331136
.switchIfEmpty(Flux.defer(() -> (Flux) evaluate(null, invoker, method, contexts)))
1134-
.flatMap(v -> evaluate(valueToFlux(v, contexts), invoker, method, contexts)));
1137+
.flatMap(v -> evaluate(valueToFlux(v, contexts), invoker, method, contexts))
1138+
.onErrorResume(RuntimeException.class, ex -> {
1139+
try {
1140+
getErrorHandler().handleCacheGetError((RuntimeException) ex, cache, key);
1141+
return evaluate(null, invoker, method, contexts);
1142+
}
1143+
catch (RuntimeException exception) {
1144+
return Flux.error(exception);
1145+
}
1146+
}));
11351147
}
11361148
else {
11371149
return adapter.fromPublisher(Mono.fromFuture(cachedFuture)
11381150
.switchIfEmpty(Mono.defer(() -> (Mono) evaluate(null, invoker, method, contexts)))
1139-
.flatMap(v -> evaluate(Mono.justOrEmpty(unwrapCacheValue(v)), invoker, method, contexts)));
1151+
.flatMap(v -> evaluate(Mono.justOrEmpty(unwrapCacheValue(v)), invoker, method, contexts))
1152+
.onErrorResume(RuntimeException.class, ex -> {
1153+
try {
1154+
getErrorHandler().handleCacheGetError((RuntimeException) ex, cache, key);
1155+
return evaluate(null, invoker, method, contexts);
1156+
}
1157+
catch (RuntimeException exception) {
1158+
return Mono.error(exception);
1159+
}
1160+
}));
11401161
}
11411162
}
11421163
return NOT_HANDLED;

spring-context/src/test/java/org/springframework/cache/annotation/ReactiveCachingTests.java

+87-1
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818

1919
import java.util.List;
2020
import java.util.concurrent.CompletableFuture;
21+
import java.util.concurrent.CompletionException;
2122
import java.util.concurrent.atomic.AtomicLong;
2223

24+
import org.junit.jupiter.api.Test;
2325
import org.junit.jupiter.params.ParameterizedTest;
2426
import org.junit.jupiter.params.provider.ValueSource;
2527
import reactor.core.publisher.Flux;
@@ -29,12 +31,15 @@
2931
import org.springframework.cache.CacheManager;
3032
import org.springframework.cache.concurrent.ConcurrentMapCache;
3133
import org.springframework.cache.concurrent.ConcurrentMapCacheManager;
34+
import org.springframework.cache.interceptor.CacheErrorHandler;
35+
import org.springframework.cache.interceptor.LoggingCacheErrorHandler;
3236
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
3337
import org.springframework.context.annotation.Bean;
3438
import org.springframework.context.annotation.Configuration;
3539
import org.springframework.lang.Nullable;
3640

3741
import static org.assertj.core.api.Assertions.assertThat;
42+
import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable;
3843

3944
/**
4045
* Tests for annotation-based caching methods that use reactive operators.
@@ -113,6 +118,51 @@ void cacheHitDetermination(Class<?> configClass) {
113118
ctx.close();
114119
}
115120

121+
@Test
122+
void cacheErrorHandlerWithLoggingCacheErrorHandler() {
123+
AnnotationConfigApplicationContext ctx =
124+
new AnnotationConfigApplicationContext(ExceptionCacheManager.class, ReactiveCacheableService.class, ErrorHandlerCachingConfiguration.class);
125+
ReactiveCacheableService service = ctx.getBean(ReactiveCacheableService.class);
126+
127+
Object key = new Object();
128+
Long r1 = service.cacheFuture(key).join();
129+
130+
assertThat(r1).isNotNull();
131+
assertThat(r1).as("cacheFuture").isEqualTo(0L);
132+
133+
key = new Object();
134+
135+
r1 = service.cacheMono(key).block();
136+
137+
assertThat(r1).isNotNull();
138+
assertThat(r1).as("cacheMono").isEqualTo(1L);
139+
140+
key = new Object();
141+
142+
r1 = service.cacheFlux(key).blockFirst();
143+
144+
assertThat(r1).isNotNull();
145+
assertThat(r1).as("cacheFlux blockFirst").isEqualTo(2L);
146+
}
147+
148+
@Test
149+
void cacheErrorHandlerWithSimpleCacheErrorHandler() {
150+
AnnotationConfigApplicationContext ctx =
151+
new AnnotationConfigApplicationContext(ExceptionCacheManager.class, ReactiveCacheableService.class);
152+
ReactiveCacheableService service = ctx.getBean(ReactiveCacheableService.class);
153+
154+
Throwable completableFuturThrowable = catchThrowable(() -> service.cacheFuture(new Object()).join());
155+
assertThat(completableFuturThrowable).isInstanceOf(CompletionException.class)
156+
.extracting(Throwable::getCause)
157+
.isInstanceOf(UnsupportedOperationException.class);
158+
159+
Throwable monoThrowable = catchThrowable(() -> service.cacheMono(new Object()).block());
160+
assertThat(monoThrowable).isInstanceOf(UnsupportedOperationException.class);
161+
162+
Throwable fluxThrowable = catchThrowable(() -> service.cacheFlux(new Object()).blockFirst());
163+
assertThat(fluxThrowable).isInstanceOf(UnsupportedOperationException.class);
164+
}
165+
116166
@ParameterizedTest
117167
@ValueSource(classes = {EarlyCacheHitDeterminationConfig.class,
118168
EarlyCacheHitDeterminationWithoutNullValuesConfig.class,
@@ -139,7 +189,6 @@ void fluxCacheDoesntDependOnFirstRequest(Class<?> configClass) {
139189
ctx.close();
140190
}
141191

142-
143192
@CacheConfig(cacheNames = "first")
144193
static class ReactiveCacheableService {
145194

@@ -242,4 +291,41 @@ public void put(Object key, @Nullable Object value) {
242291
}
243292
}
244293

294+
@Configuration
295+
static class ErrorHandlerCachingConfiguration implements CachingConfigurer {
296+
297+
@Bean
298+
@Override
299+
public CacheErrorHandler errorHandler() {
300+
return new LoggingCacheErrorHandler();
301+
}
302+
}
303+
304+
@Configuration(proxyBeanMethods = false)
305+
@EnableCaching
306+
static class ExceptionCacheManager {
307+
308+
@Bean
309+
CacheManager cacheManager() {
310+
return new ConcurrentMapCacheManager("first") {
311+
@Override
312+
protected Cache createConcurrentMapCache(String name) {
313+
return new ConcurrentMapCache(name, isAllowNullValues()) {
314+
@Override
315+
public CompletableFuture<?> retrieve(Object key) {
316+
return CompletableFuture.supplyAsync(() -> {
317+
throw new UnsupportedOperationException("Test exception on retrieve");
318+
});
319+
}
320+
321+
@Override
322+
public void put(Object key, @Nullable Object value) {
323+
throw new UnsupportedOperationException("Test exception on put");
324+
}
325+
};
326+
}
327+
};
328+
}
329+
}
330+
245331
}

0 commit comments

Comments
 (0)