diff --git a/spring-context-support/src/main/java/org/springframework/cache/transaction/TransactionAwareCacheDecorator.java b/spring-context-support/src/main/java/org/springframework/cache/transaction/TransactionAwareCacheDecorator.java
index 45b5870dfecc..accc9479f449 100644
--- a/spring-context-support/src/main/java/org/springframework/cache/transaction/TransactionAwareCacheDecorator.java
+++ b/spring-context-support/src/main/java/org/springframework/cache/transaction/TransactionAwareCacheDecorator.java
@@ -16,22 +16,28 @@
package org.springframework.cache.transaction;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Supplier;
-
import org.springframework.cache.Cache;
+import org.springframework.cache.support.SimpleValueWrapper;
import org.springframework.lang.Nullable;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+
/**
* Cache decorator which synchronizes its {@link #put}, {@link #evict} and
* {@link #clear} operations with Spring-managed transactions (through Spring's
* {@link TransactionSynchronizationManager}), performing the actual cache
* put/evict/clear operation only in the after-commit phase of a successful
- * transaction. If no transaction is active, {@link #put}, {@link #evict} and
+ * transaction. Within a transaction this decorator provides consistency for all operations
+ * performed in order and read-committed isolation from other transactions.
+ * If no transaction is active, {@link #put}, {@link #evict} and
* {@link #clear} operations will be performed immediately, as usual.
*
*
Note: Use of immediate operations such as {@link #putIfAbsent} and
@@ -46,16 +52,42 @@
*/
public class TransactionAwareCacheDecorator implements Cache {
+ // Special value which marks a value as being evicted within the transaction state
+ private static final Object EVICTED = new Object();
+
private final Cache targetCache;
+ // Thread local for storing the changes made within a transaction
+ private final ThreadLocal transactionState = new ThreadLocal<>();
+
+ // Whether commits should be synchronized or not
+ private final boolean synchronizeCommits;
/**
* Create a new TransactionAwareCache for the given target Cache.
+ *
+ * Commits are not synchronized for efficiency, meaning concurrent clear/evict/put events in concurrent transactions could yield different results on the size of the cache
+ * after all transactions are done.
+ *
* @param targetCache the target Cache to decorate
*/
public TransactionAwareCacheDecorator(Cache targetCache) {
+ this(targetCache, false);
+ }
+
+ /**
+ * Create a new TransactionAwareCache for the given target Cache, specifying the consistency needed for transaction commits.
+ *
+ * If synchronizedCommits is set to true, all commits of concurrent transactions are performed in a synchronized fashion, thereby yielding predictable results
+ * for the size of the cache after all operations are finished.
+ *
+ * @param targetCache the target Cache to decorate
+ * @param synchronizeCommits whether full consistency should be maintained for concurrent transaction commits, i.e. all commits are synchronized to the cache.
+ */
+ public TransactionAwareCacheDecorator(Cache targetCache, boolean synchronizeCommits) {
Assert.notNull(targetCache, "Target Cache must not be null");
this.targetCache = targetCache;
+ this.synchronizeCommits = synchronizeCommits;
}
@@ -79,18 +111,50 @@ public Object getNativeCache() {
@Override
@Nullable
public ValueWrapper get(Object key) {
- return this.targetCache.get(key);
+ final var transactionState = getTransactionState();
+ final var current = transactionState == null ? null : transactionState.get(key);
+ if (current != null) {
+ return convert(current);
+ } else {
+ return this.targetCache.get(key);
+ }
}
@Override
@Nullable
public T get(Object key, @Nullable Class type) {
- return this.targetCache.get(key, type);
+ final var transactionState = getTransactionState();
+ final var wrapper = transactionState == null ? null : transactionState.get(key);
+ if (wrapper != null) {
+ // Unwrap
+ final var value = convertValue(wrapper);
+ return cast(value, type);
+ } else {
+ return this.targetCache.get(key, type);
+ }
}
@Override
@Nullable
public T get(Object key, Callable valueLoader) {
+ final var transactionState = getTransactionState();
+ if (transactionState != null) {
+ final var wrapper = transactionState.get(key);
+ final boolean isEvicted = wrapper != null && wrapper.get() == EVICTED;
+ if (isEvicted || (wrapper == null && this.targetCache.get(key) == null)) {
+ // Compute value within transaction state
+ final T value;
+ try {
+ value = valueLoader.call();
+ } catch (Exception e) {
+ throw new ValueRetrievalException(key, valueLoader, e);
+ }
+ transactionState.put(key, value);
+ return value;
+ } else if (wrapper != null) {
+ return convertValue(wrapper);
+ }
+ }
return this.targetCache.get(key, valueLoader);
}
@@ -107,15 +171,10 @@ public CompletableFuture retrieve(Object key, Supplier modifications = new LinkedHashMap<>();
+
+ boolean clearCalled = false;
+
+ @Nullable
+ ValueWrapper get(Object key) {
+ var result = modifications.get(key);
+ if (result == null && clearCalled) {
+ result = new SimpleValueWrapper(EVICTED);
+ }
+ return result;
+ }
+
+
+ void clear() {
+ clearCalled = true;
+ modifications.clear();
+ }
+
+ void reset() {
+ clearCalled = false;
+ modifications.clear();
+ }
+
+ void revert(Object key) {
+ modifications.remove(key);
+ }
+
+ void evict(Object key) {
+ modifications.put(key, new SimpleValueWrapper(EVICTED));
+ }
+
+ void put(Object key, @Nullable Object value) {
+ modifications.put(key, new SimpleValueWrapper(value));
+ }
+
+ void commitTo(Cache cache) {
+ if (clearCalled) {
+ cache.clear();
+ }
+ modifications.forEach((key, valueWrapper) -> {
+ final var value = valueWrapper.get();
+ if (value == EVICTED) {
+ cache.evict(key);
+ } else {
+ cache.put(key, value);
+ }
+ });
+ }
+ }
+
+ /**
+ * Converts the wrapper to effective wrapper, i.e. returns null if the wrapper contains the special EVICTED value and the wrapper otherwise.
+ *
+ * @param wrapper The wrapper to convert
+ * @return The converted wrapper
+ */
+ @Nullable
+ private static ValueWrapper convert(@Nullable ValueWrapper wrapper) {
+ if (wrapper != null && wrapper.get() == EVICTED) {
+ return null;
+ }
+ return wrapper;
+ }
+
+ /**
+ * Converts the value of the specified wrapper, i.e. returns null if the wrapper contains the special EVICTED value or its value otherwise.
+ *
+ * @param wrapper The wrapper to extract the value from
+ * @return The converted value
+ */
+ @SuppressWarnings("unchecked")
+ @Nullable
+ private static T convertValue(@Nullable ValueWrapper wrapper) {
+ final var effectiveWrapper = convert(wrapper);
+ return effectiveWrapper == null ? null : (T)effectiveWrapper.get();
+ }
+
+ /**
+ * Requires the specified value to be null or be an instance of the specified type.
+ * Throws an IllegalStateException otherwise.
+ *
+ * @param value The value
+ * @param type The type
+ * @return The cast value
+ */
+ @SuppressWarnings("unchecked")
+ @Nullable
+ public static T cast(@Nullable Object value, @Nullable Class type) {
+ if (value != null && type != null && !type.isInstance(value)) {
+ throw new IllegalStateException(
+ "Cached value is not of required type [" + type.getName() + "]: " + value);
+ }
+ return (T)value;
+ }
+
+ /**
+ * Gets the current transaction state or null if no transaction is active.
+ * When first invoked within a transaction, a transaction synchronization is registered which will apply any changes in the current transaction on commit.
+ *
+ * @return The current transaction state
+ */
+ @Nullable
+ private TransactionState getTransactionState() {
+ if (TransactionSynchronizationManager.isSynchronizationActive()) {
+ var state = transactionState.get();
+ if (state == null) {
+ state = new TransactionState();
+ transactionState.set(state);
+ TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
+ @Override
+ public void afterCompletion(int status) {
+ final var currentState = Objects.requireNonNull(transactionState.get());
+ // Transfer any modifications to the underlying cache if the transaction committed
+ if (status == STATUS_COMMITTED) {
+ if (synchronizeCommits) {
+ synchronized (targetCache) {
+ currentState.commitTo(targetCache);
+ }
+ } else {
+ currentState.commitTo(targetCache);
+ }
+ }
+ transactionState.remove();
+ }
+ });
+ }
+ return state;
+ } else {
+ return null;
+ }
+ }
}
diff --git a/spring-context-support/src/test/java/org/springframework/cache/transaction/TransactionAwareCacheDecoratorReadCommittedIsolationTests.java b/spring-context-support/src/test/java/org/springframework/cache/transaction/TransactionAwareCacheDecoratorReadCommittedIsolationTests.java
new file mode 100644
index 000000000000..22f1c8898608
--- /dev/null
+++ b/spring-context-support/src/test/java/org/springframework/cache/transaction/TransactionAwareCacheDecoratorReadCommittedIsolationTests.java
@@ -0,0 +1,658 @@
+package org.springframework.cache.transaction;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.cache.Cache;
+import org.springframework.cache.concurrent.ConcurrentMapCache;
+import org.springframework.lang.Nullable;
+import org.springframework.transaction.support.TransactionSynchronizationManager;
+
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.springframework.transaction.support.TransactionSynchronization.STATUS_COMMITTED;
+import static org.springframework.transaction.support.TransactionSynchronization.STATUS_ROLLED_BACK;
+
+public class TransactionAwareCacheDecoratorReadCommittedIsolationTests {
+
+ private Cache cache;
+ private ConcurrentHashMap