Skip to content

Commit b880c0b

Browse files
sgup432Peter Alfonsi
authored andcommitted
[Tiered caching] Framework changes (opensearch-project#10753)
* [Tiered caching] Framework changes Signed-off-by: Sagar Upadhyaya <[email protected]> * Added javadoc for new files/packages Signed-off-by: Sagar Upadhyaya <[email protected]> * Added changelog Signed-off-by: Sagar Upadhyaya <[email protected]> * Fixing javadoc warnings Signed-off-by: Sagar Upadhyaya <[email protected]> * Addressing comments Signed-off-by: Sagar Upadhyaya <[email protected]> * Addressing additional minor comments Signed-off-by: Sagar Upadhyaya <[email protected]> * Moving non null check to builder for OS onHeapCache Signed-off-by: Sagar Upadhyaya <[email protected]> * Adding package-info for new packages Signed-off-by: Sagar Upadhyaya <[email protected]> * Removing service and adding different cache interfaces along with event listener support Signed-off-by: Sagar Upadhyaya <[email protected]> * Fixing gradle missingDoc issue Signed-off-by: Sagar Upadhyaya <[email protected]> * Changing listener logic, removing tiered cache integration with IRC Signed-off-by: Sagar Upadhyaya <[email protected]> * Adding opensearch.internal tag for LoadAwareCacheLoader Signed-off-by: Sagar Upadhyaya <[email protected]> * Fixing thread safety issue Signed-off-by: Sagar Upadhyaya <[email protected]> * Remove compute function and event listener logic change for TieredCache Signed-off-by: Sagar Upadhyaya <[email protected]> * Making Cache.compute function private Signed-off-by: Sagar Upadhyaya <[email protected]> * Adding javadoc and more test for cache.put Signed-off-by: Sagar Upadhyaya <[email protected]> * Adding write locks to refresh API as well Signed-off-by: Sagar Upadhyaya <[email protected]> * Removing unwanted EventType class and refactoring one UT Signed-off-by: Sagar Upadhyaya <[email protected]> * Removing TieredCache interface Signed-off-by: Sagar Upadhyaya <[email protected]> --------- Signed-off-by: Sagar Upadhyaya <[email protected]> Signed-off-by: Sagar <[email protected]>
1 parent a5cc554 commit b880c0b

18 files changed

+1565
-55
lines changed

server/src/main/java/org/opensearch/common/cache/Cache.java

Lines changed: 59 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -422,68 +422,74 @@ public V computeIfAbsent(K key, CacheLoader<K, V> loader) throws ExecutionExcept
422422
}
423423
});
424424
if (value == null) {
425-
// we need to synchronize loading of a value for a given key; however, holding the segment lock while
426-
// invoking load can lead to deadlock against another thread due to dependent key loading; therefore, we
427-
// need a mechanism to ensure that load is invoked at most once, but we are not invoking load while holding
428-
// the segment lock; to do this, we atomically put a future in the map that can load the value, and then
429-
// get the value from this future on the thread that won the race to place the future into the segment map
430-
CacheSegment<K, V> segment = getCacheSegment(key);
431-
CompletableFuture<Entry<K, V>> future;
432-
CompletableFuture<Entry<K, V>> completableFuture = new CompletableFuture<>();
425+
value = compute(key, loader);
426+
}
427+
return value;
428+
}
433429

434-
try (ReleasableLock ignored = segment.writeLock.acquire()) {
435-
future = segment.map.putIfAbsent(key, completableFuture);
436-
}
430+
private V compute(K key, CacheLoader<K, V> loader) throws ExecutionException {
431+
long now = now();
432+
// we need to synchronize loading of a value for a given key; however, holding the segment lock while
433+
// invoking load can lead to deadlock against another thread due to dependent key loading; therefore, we
434+
// need a mechanism to ensure that load is invoked at most once, but we are not invoking load while holding
435+
// the segment lock; to do this, we atomically put a future in the map that can load the value, and then
436+
// get the value from this future on the thread that won the race to place the future into the segment map
437+
CacheSegment<K, V> segment = getCacheSegment(key);
438+
CompletableFuture<Entry<K, V>> future;
439+
CompletableFuture<Entry<K, V>> completableFuture = new CompletableFuture<>();
437440

438-
BiFunction<? super Entry<K, V>, Throwable, ? extends V> handler = (ok, ex) -> {
439-
if (ok != null) {
440-
try (ReleasableLock ignored = lruLock.acquire()) {
441-
promote(ok, now);
442-
}
443-
return ok.value;
444-
} else {
445-
try (ReleasableLock ignored = segment.writeLock.acquire()) {
446-
CompletableFuture<Entry<K, V>> sanity = segment.map.get(key);
447-
if (sanity != null && sanity.isCompletedExceptionally()) {
448-
segment.map.remove(key);
449-
}
450-
}
451-
return null;
452-
}
453-
};
441+
try (ReleasableLock ignored = segment.writeLock.acquire()) {
442+
future = segment.map.putIfAbsent(key, completableFuture);
443+
}
454444

455-
CompletableFuture<V> completableValue;
456-
if (future == null) {
457-
future = completableFuture;
458-
completableValue = future.handle(handler);
459-
V loaded;
460-
try {
461-
loaded = loader.load(key);
462-
} catch (Exception e) {
463-
future.completeExceptionally(e);
464-
throw new ExecutionException(e);
465-
}
466-
if (loaded == null) {
467-
NullPointerException npe = new NullPointerException("loader returned a null value");
468-
future.completeExceptionally(npe);
469-
throw new ExecutionException(npe);
470-
} else {
471-
future.complete(new Entry<>(key, loaded, now));
445+
BiFunction<? super Entry<K, V>, Throwable, ? extends V> handler = (ok, ex) -> {
446+
if (ok != null) {
447+
try (ReleasableLock ignored = lruLock.acquire()) {
448+
promote(ok, now);
472449
}
450+
return ok.value;
473451
} else {
474-
completableValue = future.handle(handler);
452+
try (ReleasableLock ignored = segment.writeLock.acquire()) {
453+
CompletableFuture<Entry<K, V>> sanity = segment.map.get(key);
454+
if (sanity != null && sanity.isCompletedExceptionally()) {
455+
segment.map.remove(key);
456+
}
457+
}
458+
return null;
475459
}
460+
};
476461

462+
CompletableFuture<V> completableValue;
463+
if (future == null) {
464+
future = completableFuture;
465+
completableValue = future.handle(handler);
466+
V loaded;
477467
try {
478-
value = completableValue.get();
479-
// check to ensure the future hasn't been completed with an exception
480-
if (future.isCompletedExceptionally()) {
481-
future.get(); // call get to force the exception to be thrown for other concurrent callers
482-
throw new IllegalStateException("the future was completed exceptionally but no exception was thrown");
483-
}
484-
} catch (InterruptedException e) {
485-
throw new IllegalStateException(e);
468+
loaded = loader.load(key);
469+
} catch (Exception e) {
470+
future.completeExceptionally(e);
471+
throw new ExecutionException(e);
486472
}
473+
if (loaded == null) {
474+
NullPointerException npe = new NullPointerException("loader returned a null value");
475+
future.completeExceptionally(npe);
476+
throw new ExecutionException(npe);
477+
} else {
478+
future.complete(new Entry<>(key, loaded, now));
479+
}
480+
} else {
481+
completableValue = future.handle(handler);
482+
}
483+
V value;
484+
try {
485+
value = completableValue.get();
486+
// check to ensure the future hasn't been completed with an exception
487+
if (future.isCompletedExceptionally()) {
488+
future.get(); // call get to force the exception to be thrown for other concurrent callers
489+
throw new IllegalStateException("the future was completed exceptionally but no exception was thrown");
490+
}
491+
} catch (InterruptedException e) {
492+
throw new IllegalStateException(e);
487493
}
488494
return value;
489495
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.cache;
10+
11+
/**
12+
* Represents a cache interface.
13+
* @param <K> Type of key.
14+
* @param <V> Type of value.
15+
*
16+
* @opensearch.experimental
17+
*/
18+
public interface ICache<K, V> {
19+
V get(K key);
20+
21+
void put(K key, V value);
22+
23+
V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) throws Exception;
24+
25+
void invalidate(K key);
26+
27+
void invalidateAll();
28+
29+
Iterable<K> keys();
30+
31+
long count();
32+
33+
void refresh();
34+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.cache;
10+
11+
/**
12+
* Extends a cache loader with awareness of whether the data is loaded or not.
13+
* @param <K> Type of key.
14+
* @param <V> Type of value.
15+
*
16+
* @opensearch.internal
17+
*/
18+
public interface LoadAwareCacheLoader<K, V> extends CacheLoader<K, V> {
19+
boolean isLoaded();
20+
}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.cache.store;
10+
11+
import org.opensearch.common.cache.Cache;
12+
import org.opensearch.common.cache.CacheBuilder;
13+
import org.opensearch.common.cache.LoadAwareCacheLoader;
14+
import org.opensearch.common.cache.RemovalListener;
15+
import org.opensearch.common.cache.RemovalNotification;
16+
import org.opensearch.common.cache.store.builders.StoreAwareCacheBuilder;
17+
import org.opensearch.common.cache.store.enums.CacheStoreType;
18+
import org.opensearch.common.cache.store.listeners.StoreAwareCacheEventListener;
19+
20+
/**
21+
* This variant of on-heap cache uses OpenSearch custom cache implementation.
22+
* @param <K> Type of key.
23+
* @param <V> Type of value.
24+
*
25+
* @opensearch.experimental
26+
*/
27+
public class OpenSearchOnHeapCache<K, V> implements StoreAwareCache<K, V>, RemovalListener<K, V> {
28+
29+
private final Cache<K, V> cache;
30+
31+
private final StoreAwareCacheEventListener<K, V> eventListener;
32+
33+
public OpenSearchOnHeapCache(Builder<K, V> builder) {
34+
CacheBuilder<K, V> cacheBuilder = CacheBuilder.<K, V>builder()
35+
.setMaximumWeight(builder.getMaxWeightInBytes())
36+
.weigher(builder.getWeigher())
37+
.removalListener(this);
38+
if (builder.getExpireAfterAcess() != null) {
39+
cacheBuilder.setExpireAfterAccess(builder.getExpireAfterAcess());
40+
}
41+
cache = cacheBuilder.build();
42+
this.eventListener = builder.getEventListener();
43+
}
44+
45+
@Override
46+
public V get(K key) {
47+
V value = cache.get(key);
48+
if (value != null) {
49+
eventListener.onHit(key, value, CacheStoreType.ON_HEAP);
50+
} else {
51+
eventListener.onMiss(key, CacheStoreType.ON_HEAP);
52+
}
53+
return value;
54+
}
55+
56+
@Override
57+
public void put(K key, V value) {
58+
cache.put(key, value);
59+
eventListener.onCached(key, value, CacheStoreType.ON_HEAP);
60+
}
61+
62+
@Override
63+
public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) throws Exception {
64+
V value = cache.computeIfAbsent(key, key1 -> loader.load(key));
65+
if (!loader.isLoaded()) {
66+
eventListener.onHit(key, value, CacheStoreType.ON_HEAP);
67+
} else {
68+
eventListener.onMiss(key, CacheStoreType.ON_HEAP);
69+
eventListener.onCached(key, value, CacheStoreType.ON_HEAP);
70+
}
71+
return value;
72+
}
73+
74+
@Override
75+
public void invalidate(K key) {
76+
cache.invalidate(key);
77+
}
78+
79+
@Override
80+
public void invalidateAll() {
81+
cache.invalidateAll();
82+
}
83+
84+
@Override
85+
public Iterable<K> keys() {
86+
return cache.keys();
87+
}
88+
89+
@Override
90+
public long count() {
91+
return cache.count();
92+
}
93+
94+
@Override
95+
public void refresh() {
96+
cache.refresh();
97+
}
98+
99+
@Override
100+
public CacheStoreType getTierType() {
101+
return CacheStoreType.ON_HEAP;
102+
}
103+
104+
@Override
105+
public void onRemoval(RemovalNotification<K, V> notification) {
106+
eventListener.onRemoval(
107+
new StoreAwareCacheRemovalNotification<>(
108+
notification.getKey(),
109+
notification.getValue(),
110+
notification.getRemovalReason(),
111+
CacheStoreType.ON_HEAP
112+
)
113+
);
114+
}
115+
116+
/**
117+
* Builder object
118+
* @param <K> Type of key
119+
* @param <V> Type of value
120+
*/
121+
public static class Builder<K, V> extends StoreAwareCacheBuilder<K, V> {
122+
123+
@Override
124+
public StoreAwareCache<K, V> build() {
125+
return new OpenSearchOnHeapCache<K, V>(this);
126+
}
127+
}
128+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.cache.store;
10+
11+
import org.opensearch.common.cache.ICache;
12+
import org.opensearch.common.cache.store.enums.CacheStoreType;
13+
14+
/**
15+
* Represents a cache with a specific type of store like onHeap, disk etc.
16+
* @param <K> Type of key.
17+
* @param <V> Type of value.
18+
*
19+
* @opensearch.experimental
20+
*/
21+
public interface StoreAwareCache<K, V> extends ICache<K, V> {
22+
CacheStoreType getTierType();
23+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.cache.store;
10+
11+
import org.opensearch.common.cache.RemovalNotification;
12+
import org.opensearch.common.cache.RemovalReason;
13+
import org.opensearch.common.cache.store.enums.CacheStoreType;
14+
15+
/**
16+
* Removal notification for store aware cache.
17+
* @param <K> Type of key.
18+
* @param <V> Type of value.
19+
*
20+
* @opensearch.internal
21+
*/
22+
public class StoreAwareCacheRemovalNotification<K, V> extends RemovalNotification<K, V> {
23+
private final CacheStoreType cacheStoreType;
24+
25+
public StoreAwareCacheRemovalNotification(K key, V value, RemovalReason removalReason, CacheStoreType cacheStoreType) {
26+
super(key, value, removalReason);
27+
this.cacheStoreType = cacheStoreType;
28+
}
29+
30+
public CacheStoreType getCacheStoreType() {
31+
return cacheStoreType;
32+
}
33+
}

0 commit comments

Comments
 (0)