Commit f838bfc

sohami authored and kaushalmahi12 committed

Update search.concurrent.max_slice setting to dynamic cluster setting for main with lucene-9.8 (opensearch-project#9107)

* Update search.concurrent.max_slice setting to dynamic cluster setting for main with lucene-9.8

Signed-off-by: Sorabh Hamirwasia <[email protected]>

* Address review comments

Signed-off-by: Sorabh Hamirwasia <[email protected]>

---------

Signed-off-by: Sorabh Hamirwasia <[email protected]>
Signed-off-by: Kaushal Kumar <[email protected]>
1 parent 76126b4 commit f838bfc
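
Because the new `search.concurrent.max_slice_count` setting is registered with `Property.Dynamic`, it can be changed on a live cluster without a node restart. Below is a minimal sketch of driving that update through the cluster update settings action, assuming a connected `org.opensearch.client.Client`; the wrapper class and method name are illustrative, not part of this commit.

    import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
    import org.opensearch.client.Client;
    import org.opensearch.common.settings.Settings;

    public final class MaxSliceCountUpdater {
        // Persistently set the dynamic setting. A value of 0 restores lucene's
        // default slice computation; values > 0 enable the custom
        // MaxTargetSliceSupplier path introduced by this commit.
        static void setMaxSliceCount(Client client, int sliceCount) {
            ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
            request.persistentSettings(Settings.builder().put("search.concurrent.max_slice_count", sliceCount));
            client.admin().cluster().updateSettings(request).actionGet();
        }
    }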

File tree

16 files changed: +450 −6 lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions

@@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Add getter for path field in NestedQueryBuilder ([#4636](https://github.com/opensearch-project/OpenSearch/pull/4636))
 - Allow mmap to use new JDK-19 preview APIs in Apache Lucene 9.4+ ([#5151](https://github.com/opensearch-project/OpenSearch/pull/5151))
 - Add events correlation engine plugin ([#6854](https://github.com/opensearch-project/OpenSearch/issues/6854))
+- Introduce new dynamic cluster setting to control slice computation for concurrent segment search ([#9107](https://github.com/opensearch-project/OpenSearch/pull/9107))

 ### Dependencies
 - Bump `log4j-core` from 2.18.0 to 2.19.0

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 4 additions & 1 deletion

@@ -678,7 +678,10 @@ public void apply(Settings value, Settings current, Settings previous) {
             IndicesService.CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING
         ),
         List.of(FeatureFlags.CONCURRENT_SEGMENT_SEARCH),
-        List.of(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING),
+        List.of(
+            SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING,
+            SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING
+        ),
         List.of(FeatureFlags.TELEMETRY),
         List.of(TelemetrySettings.TRACER_ENABLED_SETTING)
     );

server/src/main/java/org/opensearch/search/DefaultSearchContext.java

Lines changed: 8 additions & 0 deletions

@@ -943,4 +943,12 @@ private boolean useConcurrentSearch(Executor concurrentSearchExecutor) {
             return false;
         }
     }
+
+    @Override
+    public int getTargetMaxSliceCount() {
+        if (isConcurrentSegmentSearchEnabled() == false) {
+            throw new IllegalStateException("Target slice count should not be used when concurrent search is disabled");
+        }
+        return clusterService.getClusterSettings().get(SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING);
+    }
 }

server/src/main/java/org/opensearch/search/SearchService.java

Lines changed: 14 additions & 0 deletions

@@ -254,6 +254,20 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
         Property.NodeScope
     );

+    // settings to configure maximum slice created per search request using OS custom slice computation mechanism. Default lucene
+    // mechanism will not be used if this setting is set with value > 0
+    public static final String CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_KEY = "search.concurrent.max_slice_count";
+    public static final int CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_DEFAULT_VALUE = 0;
+
+    // value == 0 means lucene slice computation will be used
+    public static final Setting<Integer> CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING = Setting.intSetting(
+        CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_KEY,
+        CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_DEFAULT_VALUE,
+        CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_DEFAULT_VALUE,
+        Property.Dynamic,
+        Property.NodeScope
+    );
+
     public static final int DEFAULT_SIZE = 10;
     public static final int DEFAULT_FROM = 0;
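
One subtlety in the registration above: `Setting.intSetting` takes the minimum allowed value as its third argument, so passing `CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_DEFAULT_VALUE` twice both defaults the setting to 0 and forbids negative values. A short re-statement of that shape (the wrapper class is illustrative, not part of the commit):

    import org.opensearch.common.settings.Setting;

    public final class MaxSliceCountSettingSketch {
        // Same shape as the registration in SearchService: key, default, minimum,
        // then properties. A negative value fails validation with
        // IllegalArgumentException, which SettingsModuleTests asserts further down.
        public static final Setting<Integer> MAX_SLICE_COUNT = Setting.intSetting(
            "search.concurrent.max_slice_count",
            0, // default: fall back to lucene's own slice computation
            0, // minimum allowed value
            Setting.Property.Dynamic,
            Setting.Property.NodeScope
        );
    }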

server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java

Lines changed: 30 additions & 1 deletion

@@ -32,6 +32,8 @@

 package org.opensearch.search.internal;

+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.LeafReaderContext;

@@ -93,11 +95,13 @@
  * @opensearch.internal
  */
 public class ContextIndexSearcher extends IndexSearcher implements Releasable {
+
+    private static final Logger logger = LogManager.getLogger(ContextIndexSearcher.class);
     /**
      * The interval at which we check for search cancellation when we cannot use
      * a {@link CancellableBulkScorer}. See {@link #intersectScorerAndBitSet}.
      */
-    private static int CHECK_CANCELLED_SCORER_INTERVAL = 1 << 11;
+    private static final int CHECK_CANCELLED_SCORER_INTERVAL = 1 << 11;

     private AggregatedDfs aggregatedDfs;
     private QueryProfiler profiler;

@@ -443,6 +447,16 @@ public CollectionStatistics collectionStatistics(String field) throws IOExceptio
         return collectionStatistics;
     }

+    /**
+     * Compute the leaf slices that will be used by concurrent segment search to spread work across threads
+     * @param leaves all the segments
+     * @return leafSlice group to be executed by different threads
+     */
+    @Override
+    protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
+        return slicesInternal(leaves, searchContext.getTargetMaxSliceCount());
+    }
+
     public DirectoryReader getDirectoryReader() {
         final IndexReader reader = getIndexReader();
         assert reader instanceof DirectoryReader : "expected an instance of DirectoryReader, got " + reader.getClass();

@@ -522,4 +536,19 @@ private boolean shouldReverseLeafReaderContexts() {
         }
         return false;
     }
+
+    // package-private for testing
+    LeafSlice[] slicesInternal(List<LeafReaderContext> leaves, int targetMaxSlice) {
+        LeafSlice[] leafSlices;
+        if (targetMaxSlice == 0) {
+            // use the default lucene slice calculation
+            leafSlices = super.slices(leaves);
+            logger.debug("Slice count using lucene default [{}]", leafSlices.length);
+        } else {
+            // use the custom slice calculation based on targetMaxSlice
+            leafSlices = MaxTargetSliceSupplier.getSlices(leaves, targetMaxSlice);
+            logger.debug("Slice count using max target slice supplier [{}]", leafSlices.length);
+        }
+        return leafSlices;
+    }
 }
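
When `targetMaxSlice == 0`, `slicesInternal` defers to `super.slices(leaves)`. In Lucene 9.x the default computation caps each slice at roughly 250,000 documents and 5 segments, which is why the tests below expect 2 slices of 5 single-document segments each from 10 leaves. A hedged sketch of that default, assuming Lucene 9.x's public static helper (the wrapper class is illustrative):

    import java.util.List;

    import org.apache.lucene.index.LeafReaderContext;
    import org.apache.lucene.search.IndexSearcher;

    public final class LuceneDefaultSlicesSketch {
        // Approximates what super.slices(leaves) resolves to with Lucene 9.x
        // defaults: at most 250_000 docs and 5 segments per slice.
        static IndexSearcher.LeafSlice[] defaultSlices(List<LeafReaderContext> leaves) {
            return IndexSearcher.slices(leaves, 250_000, 5);
        }
    }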

server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java

Lines changed: 5 additions & 0 deletions

@@ -564,4 +564,9 @@ public BucketCollectorProcessor bucketCollectorProcessor() {
     public boolean isConcurrentSegmentSearchEnabled() {
         return in.isConcurrentSegmentSearchEnabled();
     }
+
+    @Override
+    public int getTargetMaxSliceCount() {
+        return in.getTargetMaxSliceCount();
+    }
 }
server/src/main/java/org/opensearch/search/internal/MaxTargetSliceSupplier.java

Lines changed: 55 additions & 0 deletions

@@ -0,0 +1,55 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.search.internal;
+
+import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.search.IndexSearcher;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Supplier to compute leaf slices based on passed in leaves and max target slice count to limit the number of computed slices. It sorts
+ * all the leaves based on document count and then assign each leaf in round-robin fashion to the target slice count slices. Based on
+ * experiment results as shared in <a href=https://github.com/opensearch-project/OpenSearch/issues/7358>issue-7358</a>
+ * we can see this mechanism helps to achieve better tail/median latency over default lucene slice computation.
+ *
+ * @opensearch.internal
+ */
+final class MaxTargetSliceSupplier {
+
+    static IndexSearcher.LeafSlice[] getSlices(List<LeafReaderContext> leaves, int targetMaxSlice) {
+        if (targetMaxSlice <= 0) {
+            throw new IllegalArgumentException("MaxTargetSliceSupplier called with unexpected slice count of " + targetMaxSlice);
+        }
+
+        // slice count should not exceed the segment count
+        int targetSliceCount = Math.min(targetMaxSlice, leaves.size());
+
+        // Make a copy so we can sort:
+        List<LeafReaderContext> sortedLeaves = new ArrayList<>(leaves);
+
+        // Sort by maxDoc, descending:
+        sortedLeaves.sort(Collections.reverseOrder(Comparator.comparingInt(l -> l.reader().maxDoc())));
+
+        final List<List<LeafReaderContext>> groupedLeaves = new ArrayList<>(targetSliceCount);
+        for (int i = 0; i < targetSliceCount; ++i) {
+            groupedLeaves.add(new ArrayList<>());
+        }
+        // distribute the slices in round-robin fashion
+        for (int idx = 0; idx < sortedLeaves.size(); ++idx) {
+            int currentGroup = idx % targetSliceCount;
+            groupedLeaves.get(currentGroup).add(sortedLeaves.get(idx));
+        }
+
+        return groupedLeaves.stream().map(IndexSearcher.LeafSlice::new).toArray(IndexSearcher.LeafSlice[]::new);
+    }
+}
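
To make the round-robin assignment concrete, here is a small standalone sketch (not part of the commit) that applies the same sort-then-distribute logic to plain integer doc counts; with 10 segments and a target of 4 slices it produces groups of 3, 3, 2 and 2, matching the expectations in the tests below.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public final class RoundRobinSliceDemo {
        public static void main(String[] args) {
            // Stand-ins for per-segment maxDoc() values of 10 hypothetical leaves.
            List<Integer> docCounts = new ArrayList<>(List.of(50, 10, 40, 20, 60, 30, 70, 80, 90, 100));
            int targetSliceCount = Math.min(4, docCounts.size());

            // Sort descending, as MaxTargetSliceSupplier does with maxDoc().
            docCounts.sort(Collections.reverseOrder());

            List<List<Integer>> slices = new ArrayList<>(targetSliceCount);
            for (int i = 0; i < targetSliceCount; i++) {
                slices.add(new ArrayList<>());
            }
            // Round-robin keeps the largest segments spread across different slices.
            for (int idx = 0; idx < docCounts.size(); idx++) {
                slices.get(idx % targetSliceCount).add(docCounts.get(idx));
            }
            // Prints four groups sized 3, 3, 2, 2.
            slices.forEach(System.out::println);
        }
    }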

server/src/main/java/org/opensearch/search/internal/SearchContext.java

Lines changed: 2 additions & 0 deletions

@@ -471,4 +471,6 @@ public String toString() {
     public abstract void setBucketCollectorProcessor(BucketCollectorProcessor bucketCollectorProcessor);

     public abstract BucketCollectorProcessor bucketCollectorProcessor();
+
+    public abstract int getTargetMaxSliceCount();
 }

server/src/test/java/org/opensearch/common/settings/SettingsModuleTests.java

Lines changed: 32 additions & 0 deletions

@@ -335,4 +335,36 @@ public void testConcurrentSegmentSearchIndexSettings() {
             "node"
         );
     }
+
+    public void testMaxSliceCountClusterSettingsForConcurrentSearch() {
+        // Test that we throw an exception without the feature flag
+        Settings settings = Settings.builder()
+            .put(SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey(), 2)
+            .build();
+        SettingsException ex = expectThrows(SettingsException.class, () -> new SettingsModule(settings));
+        assertTrue(
+            ex.getMessage()
+                .contains("unknown setting [" + SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey() + "]")
+        );
+
+        // Test that the settings updates correctly with the feature flag
+        FeatureFlagSetter.set(FeatureFlags.CONCURRENT_SEGMENT_SEARCH);
+        int settingValue = randomIntBetween(0, 10);
+        Settings settingsWithFeatureFlag = Settings.builder()
+            .put(SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey(), settingValue)
+            .build();
+        SettingsModule settingsModule = new SettingsModule(settingsWithFeatureFlag);
+        assertEquals(
+            settingValue,
+            (int) SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.get(settingsModule.getSettings())
+        );
+
+        // Test that negative value is not allowed
+        settingValue = -1;
+        final Settings settingsWithFeatureFlag_2 = Settings.builder()
+            .put(SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey(), settingValue)
+            .build();
+        IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> new SettingsModule(settingsWithFeatureFlag_2));
+        assertTrue(iae.getMessage().contains(SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey()));
+    }
 }

server/src/test/java/org/opensearch/search/internal/ContextIndexSearcherTests.java

Lines changed: 121 additions & 0 deletions

@@ -81,6 +81,7 @@
 import org.opensearch.index.cache.bitset.BitsetFilterCache;
 import org.opensearch.core.index.shard.ShardId;
 import org.opensearch.index.shard.IndexShard;
+import org.opensearch.search.SearchService;
 import org.opensearch.search.aggregations.LeafBucketCollector;
 import org.opensearch.test.OpenSearchTestCase;
 import org.opensearch.test.IndexSettingsModule;

@@ -89,7 +90,9 @@
 import java.io.UncheckedIOException;
 import java.util.Collections;
 import java.util.IdentityHashMap;
+import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;

 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;

@@ -99,6 +102,7 @@
 import static org.opensearch.search.internal.ExitableDirectoryReader.ExitableTerms;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.opensearch.search.internal.IndexReaderUtils.getLeaves;

 public class ContextIndexSearcherTests extends OpenSearchTestCase {
     public void testIntersectScorerAndRoleBits() throws Exception {

@@ -303,6 +307,123 @@ public void onRemoval(ShardId shardId, Accountable accountable) {
         IOUtils.close(reader, w, dir);
     }

+    public void testSlicesInternal() throws Exception {
+        final List<LeafReaderContext> leaves = getLeaves(10);
+        try (
+            final Directory directory = newDirectory();
+            IndexWriter iw = new IndexWriter(
+                directory,
+                new IndexWriterConfig(new StandardAnalyzer()).setMergePolicy(NoMergePolicy.INSTANCE)
+            )
+        ) {
+            Document document = new Document();
+            document.add(new StringField("field1", "value1", Field.Store.NO));
+            document.add(new StringField("field2", "value1", Field.Store.NO));
+            iw.addDocument(document);
+            iw.commit();
+            try (DirectoryReader directoryReader = DirectoryReader.open(directory)) {
+                SearchContext searchContext = mock(SearchContext.class);
+                IndexShard indexShard = mock(IndexShard.class);
+                when(searchContext.indexShard()).thenReturn(indexShard);
+                when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR);
+                ContextIndexSearcher searcher = new ContextIndexSearcher(
+                    directoryReader,
+                    IndexSearcher.getDefaultSimilarity(),
+                    IndexSearcher.getDefaultQueryCache(),
+                    IndexSearcher.getDefaultQueryCachingPolicy(),
+                    true,
+                    null,
+                    searchContext
+                );
+                // Case 1: Verify the slice count when lucene default slice computation is used
+                IndexSearcher.LeafSlice[] slices = searcher.slicesInternal(
+                    leaves,
+                    SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_DEFAULT_VALUE
+                );
+                int expectedSliceCount = 2;
+                // 2 slices will be created since max segment per slice of 5 will be reached
+                assertEquals(expectedSliceCount, slices.length);
+                for (int i = 0; i < expectedSliceCount; ++i) {
+                    assertEquals(5, slices[i].leaves.length);
+                }
+
+                // Case 2: Verify the slice count when custom max slice computation is used
+                expectedSliceCount = 4;
+                slices = searcher.slicesInternal(leaves, expectedSliceCount);
+
+                // 4 slices will be created with 3 leaves in first 2 slices and 2 leaves in other slices
+                assertEquals(expectedSliceCount, slices.length);
+                for (int i = 0; i < expectedSliceCount; ++i) {
+                    if (i < 2) {
+                        assertEquals(3, slices[i].leaves.length);
+                    } else {
+                        assertEquals(2, slices[i].leaves.length);
+                    }
+                }
+            }
+        }
+    }
+
+    public void testGetSlicesWithNonNullExecutorButCSDisabled() throws Exception {
+        final List<LeafReaderContext> leaves = getLeaves(10);
+        try (
+            final Directory directory = newDirectory();
+            IndexWriter iw = new IndexWriter(
+                directory,
+                new IndexWriterConfig(new StandardAnalyzer()).setMergePolicy(NoMergePolicy.INSTANCE)
+            )
+        ) {
+            Document document = new Document();
+            document.add(new StringField("field1", "value1", Field.Store.NO));
+            document.add(new StringField("field2", "value1", Field.Store.NO));
+            iw.addDocument(document);
+            iw.commit();
+            try (DirectoryReader directoryReader = DirectoryReader.open(directory)) {
+                SearchContext searchContext = mock(SearchContext.class);
+                IndexShard indexShard = mock(IndexShard.class);
+                when(searchContext.indexShard()).thenReturn(indexShard);
+                when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR);
+                when(searchContext.isConcurrentSegmentSearchEnabled()).thenReturn(false);
+                ContextIndexSearcher searcher = new ContextIndexSearcher(
+                    directoryReader,
+                    IndexSearcher.getDefaultSimilarity(),
+                    IndexSearcher.getDefaultQueryCache(),
+                    IndexSearcher.getDefaultQueryCachingPolicy(),
+                    true,
+                    null,
+                    searchContext
+                );
+                // Case 1: Verify getSlices return null when concurrent segment search is disabled
+                assertNull(searcher.getSlices());
+
+                // Case 2: Verify the slice count when custom max slice computation is used
+                searcher = new ContextIndexSearcher(
+                    directoryReader,
+                    IndexSearcher.getDefaultSimilarity(),
+                    IndexSearcher.getDefaultQueryCache(),
+                    IndexSearcher.getDefaultQueryCachingPolicy(),
+                    true,
+                    mock(ExecutorService.class),
+                    searchContext
+                );
+                when(searchContext.isConcurrentSegmentSearchEnabled()).thenReturn(true);
+                when(searchContext.getTargetMaxSliceCount()).thenReturn(4);
+                int expectedSliceCount = 4;
+                IndexSearcher.LeafSlice[] slices = searcher.slices(leaves);
+
+                // 4 slices will be created with 3 leaves in first 2 slices and 2 leaves in other slices
+                assertEquals(expectedSliceCount, slices.length);
+                for (int i = 0; i < expectedSliceCount; ++i) {
+                    if (i < 2) {
+                        assertEquals(3, slices[i].leaves.length);
+                    } else {
+                        assertEquals(2, slices[i].leaves.length);
+                    }
+                }
+            }
+        }
+    }
+
     private SparseFixedBitSet query(LeafReaderContext leaf, String field, String value) throws IOException {
         SparseFixedBitSet sparseFixedBitSet = new SparseFixedBitSet(leaf.reader().maxDoc());
         TermsEnum tenum = leaf.reader().terms(field).iterator();
