95
95
import java .util .ArrayList ;
96
96
import java .util .Arrays ;
97
97
import java .util .Collections ;
98
- import java .util .HashMap ;
98
+ import java .util .ConcurrentModificationException ;
99
99
import java .util .List ;
100
100
import java .util .Map ;
101
101
import java .util .Optional ;
105
105
import java .util .concurrent .CountDownLatch ;
106
106
import java .util .concurrent .ExecutorService ;
107
107
import java .util .concurrent .Executors ;
108
+ import java .util .concurrent .Phaser ;
108
109
import java .util .concurrent .TimeUnit ;
110
+ import java .util .concurrent .atomic .AtomicBoolean ;
109
111
import java .util .concurrent .atomic .AtomicInteger ;
110
112
111
113
import static java .util .Collections .emptyMap ;
@@ -489,7 +491,7 @@ public void testStaleCount_OnRemovalNotificationOfStaleKey_DecrementsStaleCount(
489
491
indexShard .hashCode ()
490
492
);
491
493
// test the mapping
492
- ConcurrentMap <ShardId , HashMap <String , Integer >> cleanupKeyToCountMap = cache .cacheCleanupManager .getCleanupKeyToCountMap ();
494
+ ConcurrentMap <ShardId , ConcurrentMap <String , Integer >> cleanupKeyToCountMap = cache .cacheCleanupManager .getCleanupKeyToCountMap ();
493
495
// shard id should exist
494
496
assertTrue (cleanupKeyToCountMap .containsKey (shardId ));
495
497
// reader CacheKeyId should NOT exist
@@ -552,7 +554,7 @@ public void testStaleCount_OnRemovalNotificationOfNonStaleKey_DoesNotDecrementsS
552
554
);
553
555
554
556
// test the mapping
555
- ConcurrentMap <ShardId , HashMap <String , Integer >> cleanupKeyToCountMap = cache .cacheCleanupManager .getCleanupKeyToCountMap ();
557
+ ConcurrentMap <ShardId , ConcurrentMap <String , Integer >> cleanupKeyToCountMap = cache .cacheCleanupManager .getCleanupKeyToCountMap ();
556
558
// shard id should exist
557
559
assertTrue (cleanupKeyToCountMap .containsKey (shardId ));
558
560
// reader CacheKeyId should NOT exist
@@ -720,7 +722,7 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception {
720
722
cache .getOrCompute (getEntity (indexShard ), getLoader (reader ), reader , getTermBytes ());
721
723
assertEquals (1 , cache .count ());
722
724
// test the mappings
723
- ConcurrentMap <ShardId , HashMap <String , Integer >> cleanupKeyToCountMap = cache .cacheCleanupManager .getCleanupKeyToCountMap ();
725
+ ConcurrentMap <ShardId , ConcurrentMap <String , Integer >> cleanupKeyToCountMap = cache .cacheCleanupManager .getCleanupKeyToCountMap ();
724
726
assertEquals (1 , (int ) cleanupKeyToCountMap .get (shardId ).get (getReaderCacheKeyId (reader )));
725
727
726
728
cache .getOrCompute (getEntity (indexShard ), getLoader (secondReader ), secondReader , getTermBytes ());
@@ -793,8 +795,54 @@ public void testCleanupKeyToCountMapAreSetAppropriately() throws Exception {
793
795
IOUtils .close (secondReader );
794
796
}
795
797
796
- private DirectoryReader getReader (IndexWriter writer , ShardId shardId ) throws IOException {
797
- return OpenSearchDirectoryReader .wrap (DirectoryReader .open (writer ), shardId );
798
+ // test adding to cleanupKeyToCountMap with multiple threads
799
+ public void testAddToCleanupKeyToCountMap () throws Exception {
800
+ threadPool = getThreadPool ();
801
+ Settings settings = Settings .builder ().put (INDICES_REQUEST_CACHE_STALENESS_THRESHOLD_SETTING .getKey (), "51%" ).build ();
802
+ cache = getIndicesRequestCache (settings );
803
+
804
+ int numberOfThreads = 10 ;
805
+ int numberOfIterations = 1000 ;
806
+ Phaser phaser = new Phaser (numberOfThreads + 1 ); // +1 for the main thread
807
+ AtomicBoolean exceptionDetected = new AtomicBoolean (false );
808
+
809
+ ExecutorService executorService = Executors .newFixedThreadPool (numberOfThreads );
810
+
811
+ for (int i = 0 ; i < numberOfThreads ; i ++) {
812
+ executorService .submit (() -> {
813
+ phaser .arriveAndAwaitAdvance (); // Ensure all threads start at the same time
814
+ try {
815
+ for (int j = 0 ; j < numberOfIterations ; j ++) {
816
+ cache .cacheCleanupManager .addToCleanupKeyToCountMap (indexShard .shardId (), UUID .randomUUID ().toString ());
817
+ }
818
+ } catch (ConcurrentModificationException e ) {
819
+ logger .error ("ConcurrentModificationException detected in thread : " + e .getMessage ());
820
+ exceptionDetected .set (true ); // Set flag if exception is detected
821
+ }
822
+ });
823
+ }
824
+ phaser .arriveAndAwaitAdvance (); // Start all threads
825
+
826
+ // Main thread iterates over the map
827
+ executorService .submit (() -> {
828
+ try {
829
+ for (int j = 0 ; j < numberOfIterations ; j ++) {
830
+ cache .cacheCleanupManager .getCleanupKeyToCountMap ().forEach ((k , v ) -> {
831
+ v .forEach ((k1 , v1 ) -> {
832
+ // Accessing the map to create contention
833
+ v .get (k1 );
834
+ });
835
+ });
836
+ }
837
+ } catch (ConcurrentModificationException e ) {
838
+ logger .error ("ConcurrentModificationException detected in main thread : " + e .getMessage ());
839
+ exceptionDetected .set (true ); // Set flag if exception is detected
840
+ }
841
+ });
842
+
843
+ executorService .shutdown ();
844
+ executorService .awaitTermination (60 , TimeUnit .SECONDS );
845
+ assertFalse (exceptionDetected .get ());
798
846
}
799
847
800
848
private IndicesRequestCache getIndicesRequestCache (Settings settings ) {
@@ -808,6 +856,10 @@ private IndicesRequestCache getIndicesRequestCache(Settings settings) {
808
856
);
809
857
}
810
858
859
+ private DirectoryReader getReader (IndexWriter writer , ShardId shardId ) throws IOException {
860
+ return OpenSearchDirectoryReader .wrap (DirectoryReader .open (writer ), shardId );
861
+ }
862
+
811
863
private Loader getLoader (DirectoryReader reader ) {
812
864
return new Loader (reader , 0 );
813
865
}
0 commit comments