26
26
import java .util .Optional ;
27
27
import java .util .concurrent .CompletableFuture ;
28
28
import java .util .concurrent .ConcurrentHashMap ;
29
+ import java .util .concurrent .atomic .AtomicBoolean ;
29
30
import java .util .function .Supplier ;
30
31
31
32
import org .apache .commons .logging .Log ;
@@ -440,13 +441,40 @@ protected void clearMetadataCache() {
440
441
return cacheHit ;
441
442
}
442
443
444
+ @ SuppressWarnings ("unchecked" )
443
445
private @ Nullable Object executeSynchronized (CacheOperationInvoker invoker , Method method , CacheOperationContexts contexts ) {
444
446
CacheOperationContext context = contexts .get (CacheableOperation .class ).iterator ().next ();
445
447
if (isConditionPassing (context , CacheOperationExpressionEvaluator .NO_RESULT )) {
446
448
Object key = generateKey (context , CacheOperationExpressionEvaluator .NO_RESULT );
447
449
Cache cache = context .getCaches ().iterator ().next ();
448
450
if (CompletableFuture .class .isAssignableFrom (method .getReturnType ())) {
449
- return doRetrieve (cache , key , () -> (CompletableFuture <?>) invokeOperation (invoker ));
451
+ AtomicBoolean invokeFailure = new AtomicBoolean (false );
452
+ CompletableFuture <?> result = doRetrieve (cache , key ,
453
+ () -> {
454
+ CompletableFuture <?> invokeResult = ((CompletableFuture <?>) invokeOperation (invoker ));
455
+ if (invokeResult == null ) {
456
+ return null ;
457
+ }
458
+ return invokeResult .exceptionallyCompose (ex -> {
459
+ invokeFailure .set (true );
460
+ return CompletableFuture .failedFuture (ex );
461
+ });
462
+ });
463
+ return result .exceptionallyCompose (ex -> {
464
+ if (!(ex instanceof RuntimeException rex )) {
465
+ return CompletableFuture .failedFuture (ex );
466
+ }
467
+ try {
468
+ getErrorHandler ().handleCacheGetError (rex , cache , key );
469
+ if (invokeFailure .get ()) {
470
+ return CompletableFuture .failedFuture (ex );
471
+ }
472
+ return (CompletableFuture ) invokeOperation (invoker );
473
+ }
474
+ catch (Throwable ex2 ) {
475
+ return CompletableFuture .failedFuture (ex2 );
476
+ }
477
+ });
450
478
}
451
479
if (this .reactiveCachingHandler != null ) {
452
480
Object returnValue = this .reactiveCachingHandler .executeSynchronized (invoker , method , cache , key );
@@ -505,9 +533,17 @@ protected void clearMetadataCache() {
505
533
if (CompletableFuture .class .isAssignableFrom (context .getMethod ().getReturnType ())) {
506
534
CompletableFuture <?> result = doRetrieve (cache , key );
507
535
if (result != null ) {
508
- return result .exceptionally (ex -> {
509
- getErrorHandler ().handleCacheGetError ((RuntimeException ) ex , cache , key );
510
- return null ;
536
+ return result .exceptionallyCompose (ex -> {
537
+ if (!(ex instanceof RuntimeException rex )) {
538
+ return CompletableFuture .failedFuture (ex );
539
+ }
540
+ try {
541
+ getErrorHandler ().handleCacheGetError (rex , cache , key );
542
+ return CompletableFuture .completedFuture (null );
543
+ }
544
+ catch (Throwable ex2 ) {
545
+ return CompletableFuture .failedFuture (ex2 );
546
+ }
511
547
}).thenCompose (value -> (CompletableFuture <?>) evaluate (
512
548
(value != null ? CompletableFuture .completedFuture (unwrapCacheValue (value )) : null ),
513
549
invoker , method , contexts ));
@@ -1075,31 +1111,71 @@ private class ReactiveCachingHandler {
1075
1111
1076
1112
private final ReactiveAdapterRegistry registry = ReactiveAdapterRegistry .getSharedInstance ();
1077
1113
1114
+ @ SuppressWarnings ({"rawtypes" , "unchecked" })
1078
1115
public @ Nullable Object executeSynchronized (CacheOperationInvoker invoker , Method method , Cache cache , Object key ) {
1116
+ AtomicBoolean invokeFailure = new AtomicBoolean (false );
1079
1117
ReactiveAdapter adapter = this .registry .getAdapter (method .getReturnType ());
1080
1118
if (adapter != null ) {
1081
1119
if (adapter .isMultiValue ()) {
1082
1120
// Flux or similar
1083
1121
return adapter .fromPublisher (Flux .from (Mono .fromFuture (
1084
- cache .retrieve (key ,
1085
- () -> Flux .from (adapter .toPublisher (invokeOperation (invoker ))).collectList ().toFuture ())))
1086
- .flatMap (Flux ::fromIterable ));
1122
+ doRetrieve (cache , key ,
1123
+ () -> Flux .from (adapter .toPublisher (invokeOperation (invoker ))).collectList ().doOnError (ex -> invokeFailure .set (true )).toFuture ())))
1124
+ .flatMap (Flux ::fromIterable )
1125
+ .onErrorResume (RuntimeException .class , ex -> {
1126
+ try {
1127
+ getErrorHandler ().handleCacheGetError (ex , cache , key );
1128
+ if (invokeFailure .get ()) {
1129
+ return Flux .error (ex );
1130
+ }
1131
+ return Flux .from (adapter .toPublisher (invokeOperation (invoker )));
1132
+ }
1133
+ catch (RuntimeException exception ) {
1134
+ return Flux .error (exception );
1135
+ }
1136
+ }));
1087
1137
}
1088
1138
else {
1089
1139
// Mono or similar
1090
1140
return adapter .fromPublisher (Mono .fromFuture (
1091
- cache .retrieve (key ,
1092
- () -> Mono .from (adapter .toPublisher (invokeOperation (invoker ))).toFuture ())));
1141
+ doRetrieve (cache , key ,
1142
+ () -> Mono .from (adapter .toPublisher (invokeOperation (invoker ))).doOnError (ex -> invokeFailure .set (true )).toFuture ()))
1143
+ .onErrorResume (RuntimeException .class , ex -> {
1144
+ try {
1145
+ getErrorHandler ().handleCacheGetError (ex , cache , key );
1146
+ if (invokeFailure .get ()) {
1147
+ return Mono .error (ex );
1148
+ }
1149
+ return Mono .from (adapter .toPublisher (invokeOperation (invoker )));
1150
+ }
1151
+ catch (RuntimeException exception ) {
1152
+ return Mono .error (exception );
1153
+ }
1154
+ }));
1093
1155
}
1094
1156
}
1095
1157
if (KotlinDetector .isSuspendingFunction (method )) {
1096
- return Mono .fromFuture (cache .retrieve (key , () -> {
1097
- Mono <?> mono = ((Mono <?>) invokeOperation (invoker ));
1098
- if (mono == null ) {
1158
+ return Mono .fromFuture (doRetrieve (cache , key , () -> {
1159
+ Mono <?> mono = (Mono <?>) invokeOperation (invoker );
1160
+ if (mono != null ) {
1161
+ mono = mono .doOnError (ex -> invokeFailure .set (true ));
1162
+ }
1163
+ else {
1099
1164
mono = Mono .empty ();
1100
1165
}
1101
1166
return mono .toFuture ();
1102
- }));
1167
+ })).onErrorResume (RuntimeException .class , ex -> {
1168
+ try {
1169
+ getErrorHandler ().handleCacheGetError (ex , cache , key );
1170
+ if (invokeFailure .get ()) {
1171
+ return Mono .error (ex );
1172
+ }
1173
+ return (Mono ) invokeOperation (invoker );
1174
+ }
1175
+ catch (RuntimeException exception ) {
1176
+ return Mono .error (exception );
1177
+ }
1178
+ });
1103
1179
}
1104
1180
return NOT_HANDLED ;
1105
1181
}
@@ -1113,7 +1189,7 @@ private class ReactiveCachingHandler {
1113
1189
return NOT_HANDLED ;
1114
1190
}
1115
1191
1116
- @ SuppressWarnings ({ "unchecked " , "rawtypes" })
1192
+ @ SuppressWarnings ({"rawtypes " , "unchecked" })
1117
1193
public @ Nullable Object findInCaches (CacheOperationContext context , Cache cache , Object key ,
1118
1194
CacheOperationInvoker invoker , Method method , CacheOperationContexts contexts ) {
1119
1195
0 commit comments