Commit 8312e42

[Rule based auto tagging] Add in-memory rule processing service (#17365)
* [rule based autotagging] add attribute value store
  Signed-off-by: Kaushal Kumar <[email protected]>
* add in-memory rule processing service
  Signed-off-by: Kaushal Kumar <[email protected]>
* add missing javadoc
  Signed-off-by: Kaushal Kumar <[email protected]>
* merge the in-memory store changes:

  commit d02e544
  Author: Kaushal Kumar <[email protected]>
  Date: Mon Feb 17 13:05:20 2025 -0800
      add licenses directory
      Signed-off-by: Kaushal Kumar <[email protected]>

  commit 3f98f9d
  Author: Kaushal Kumar <[email protected]>
  Date: Mon Feb 17 11:52:56 2025 -0800
      improve binary search bisecting expression
      Signed-off-by: Kaushal Kumar <[email protected]>

  commit 630a3ee
  Author: Kaushal Kumar <[email protected]>
  Date: Mon Feb 17 11:14:39 2025 -0800
      improve javadoc for attribute value store
      Signed-off-by: Kaushal Kumar <[email protected]>

  commit acdb27c
  Author: Kaushal Kumar <[email protected]>
  Date: Fri Feb 14 10:09:58 2025 -0800
      add missing javadoc
      Signed-off-by: Kaushal Kumar <[email protected]>

  commit 24c4ea6
  Author: Kaushal Kumar <[email protected]>
  Date: Fri Feb 14 09:28:46 2025 -0800
      run spotless apply
      Signed-off-by: Kaushal Kumar <[email protected]>

  commit 75b6e68
  Author: Kaushal Kumar <[email protected]>
  Date: Fri Feb 14 09:24:32 2025 -0800
      make the store interface generic
      Signed-off-by: Kaushal Kumar <[email protected]>

  Signed-off-by: Kaushal Kumar <[email protected]>
* fix generics error
  Signed-off-by: Kaushal Kumar <[email protected]>
* add CHANGELOG entry
  Signed-off-by: Kaushal Kumar <[email protected]>
* remove stubs
  Signed-off-by: Kaushal Kumar <[email protected]>
* move generic logic to lib
  Signed-off-by: Kaushal Kumar <[email protected]>
* fix javadoc error
  Signed-off-by: Kaushal Kumar <[email protected]>
* fix javadoc error
  Signed-off-by: Kaushal Kumar <[email protected]>
* delete licenses from the wlm plugin
  Signed-off-by: Kaushal Kumar <[email protected]>
* expose feature level attribute value store init method
  Signed-off-by: Kaushal Kumar <[email protected]>
* add extra space to remove unwanted entry from the changelog diff
  Signed-off-by: Kaushal Kumar <[email protected]>
* address comments
  Signed-off-by: Kaushal Kumar <[email protected]>
* use constructors over static methods
  Signed-off-by: Kaushal Kumar <[email protected]>
* make member var final in InMemoryRuleProcessingService
  Signed-off-by: Kaushal Kumar <[email protected]>
* make concurrency checks more granular
  Signed-off-by: Kaushal Kumar <[email protected]>
* add concurrent test
  Signed-off-by: Kaushal Kumar <[email protected]>
* remove forbidden api usage
  Signed-off-by: Kaushal Kumar <[email protected]>

---------

Signed-off-by: Kaushal Kumar <[email protected]>
1 parent 04db50a commit 8312e42

21 files changed: +661 -80 lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Add dfs transformation function in XContentMapValues ([#17612](https://github.com/opensearch-project/OpenSearch/pull/17612))
 - Added Kinesis support as a plugin for the pull-based ingestion ([#17615](https://github.com/opensearch-project/OpenSearch/pull/17615))
 - Add FilterFieldType for developers who want to wrap MappedFieldType ([#17627](https://github.com/opensearch-project/OpenSearch/pull/17627))
+- [Rule Based Auto-tagging] Add in-memory rule processing service ([#17365](https://github.com/opensearch-project/OpenSearch/pull/17365))
 - [Security Manager Replacement] Create initial Java Agent to intercept Socket::connect calls ([#17724](https://github.com/opensearch-project/OpenSearch/pull/17724))
 - Add ingestion management APIs for pause, resume and get ingestion state ([#17631](https://github.com/opensearch-project/OpenSearch/pull/17631))
 - [Security Manager Replacement] Enhance Java Agent to intercept System::exit ([#17746](https://github.com/opensearch-project/OpenSearch/pull/17746))

libs/autotagging-commons/build.gradle

Lines changed: 20 additions & 0 deletions
@@ -0,0 +1,20 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+dependencies {
+  api 'org.apache.commons:commons-collections4:4.4'
+  api project(":server")
+
+  testImplementation(project(":test:framework")) {
+    exclude group: 'org.opensearch', module: 'opensearch-core'
+  }
+}
+
+tasks.named("dependencyLicenses").configure {
+  mapping from: /commons-collections.*/, to: 'commons-collections'
+}
Lines changed: 115 additions & 0 deletions
@@ -0,0 +1,115 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.rule;
+
+import org.opensearch.autotagging.Attribute;
+import org.opensearch.autotagging.FeatureType;
+import org.opensearch.autotagging.Rule;
+import org.opensearch.rule.attribute_extractor.AttributeExtractor;
+import org.opensearch.rule.storage.AttributeValueStore;
+import org.opensearch.rule.storage.AttributeValueStoreFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
+
+/**
+ * This class is responsible for managing in-memory view of Rules and Find matching Rule for the request
+ * Each auto-tagging feature should use a separate instance of this class as this avoid potential concurrency overhead
+ * in case of dynamic updates and attribute sharing scenarios
+ */
+public class InMemoryRuleProcessingService {
+
+    private final AttributeValueStoreFactory attributeValueStoreFactory;
+
+    /**
+     * Constrcutor
+     * @param featureType
+     * @param attributeValueStoreSupplier
+     */
+    public InMemoryRuleProcessingService(
+        FeatureType featureType,
+        Supplier<AttributeValueStore<String, String>> attributeValueStoreSupplier
+    ) {
+        attributeValueStoreFactory = new AttributeValueStoreFactory(featureType, attributeValueStoreSupplier);
+    }
+
+    /**
+     * Adds the rule to in-memory view
+     * @param rule to be added
+     */
+    public void add(final Rule rule) {
+        perform(rule, this::addOperation);
+    }
+
+    /**
+     * Removes the rule from in-memory view
+     * @param rule to be removed
+     */
+    public void remove(final Rule rule) {
+        perform(rule, this::removeOperation);
+    }
+
+    private void perform(Rule rule, BiConsumer<Map.Entry<Attribute, Set<String>>, Rule> ruleOperation) {
+        for (Map.Entry<Attribute, Set<String>> attributeEntry : rule.getAttributeMap().entrySet()) {
+            ruleOperation.accept(attributeEntry, rule);
+        }
+    }
+
+    private void removeOperation(Map.Entry<Attribute, Set<String>> attributeEntry, Rule rule) {
+        AttributeValueStore<String, String> valueStore = attributeValueStoreFactory.getAttributeValueStore(attributeEntry.getKey());
+        for (String value : attributeEntry.getValue()) {
+            valueStore.remove(value);
+        }
+    }
+
+    private void addOperation(Map.Entry<Attribute, Set<String>> attributeEntry, Rule rule) {
+        AttributeValueStore<String, String> valueStore = attributeValueStoreFactory.getAttributeValueStore(attributeEntry.getKey());
+        for (String value : attributeEntry.getValue()) {
+            valueStore.put(value, rule.getFeatureValue());
+        }
+    }
+
+    /**
+     * Evaluates the label for the current request. It finds the matches for each attribute value and then it is an
+     * intersection of all the matches
+     * @param attributeExtractors list of extractors which are used to get the attribute values to find the
+     *                           matching rule
+     * @return a label if there is unique label otherwise empty
+     */
+    public Optional<String> evaluateLabel(List<AttributeExtractor<String>> attributeExtractors) {
+        assert attributeValueStoreFactory != null;
+        Optional<String> result = Optional.empty();
+        for (AttributeExtractor<String> attributeExtractor : attributeExtractors) {
+            AttributeValueStore<String, String> valueStore = attributeValueStoreFactory.getAttributeValueStore(
+                attributeExtractor.getAttribute()
+            );
+            for (String value : attributeExtractor.extract()) {
+                Optional<String> possibleMatch = valueStore.get(value);
+
+                if (possibleMatch.isEmpty()) {
+                    return Optional.empty();
+                }
+
+                if (result.isEmpty()) {
+                    result = possibleMatch;
+                } else {
+                    boolean isThePossibleMatchEqualResult = possibleMatch.get().equals(result.get());
+                    if (!isThePossibleMatchEqualResult) {
+                        return Optional.empty();
+                    }
+                }
+            }
+        }
+        return result;
+    }
+}
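The matching rule in `evaluateLabel` above can be hard to see through the store and extractor plumbing: a label is returned only when every extracted value matches and all matches agree. The following is a minimal sketch of that rule under simplifying assumptions. `LabelEvaluationSketch` is a hypothetical name, plain maps stand in for `AttributeValueStore` (exact-match lookup instead of prefix lookup), and an unknown attribute yields an empty result here, whereas the factory in this commit throws `IllegalArgumentException`.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical, simplified stand-in for the evaluateLabel logic; not the OpenSearch API. */
class LabelEvaluationSketch {

    /**
     * @param stores        attribute name -> (attribute value -> label); exact-match maps are an assumption
     * @param requestValues attribute name -> values extracted from the current request
     * @return the label shared by all extracted values, or empty on a miss or a conflict
     */
    static Optional<String> evaluateLabel(
        Map<String, Map<String, String>> stores,
        Map<String, List<String>> requestValues
    ) {
        Optional<String> result = Optional.empty();
        for (Map.Entry<String, List<String>> entry : requestValues.entrySet()) {
            // Assumption: an attribute without a store simply has no matches
            Map<String, String> store = stores.getOrDefault(entry.getKey(), Map.of());
            for (String value : entry.getValue()) {
                String label = store.get(value);
                if (label == null) {
                    // One value without a match rejects the whole request
                    return Optional.empty();
                }
                if (result.isPresent() && !result.get().equals(label)) {
                    // Two values resolved to different labels, so no unique label exists
                    return Optional.empty();
                }
                result = Optional.of(label);
            }
        }
        return result;
    }
}
```

With a store that maps `logs` and `metrics` to one label and `audit` to another, a request carrying `logs` and `metrics` resolves to the shared label, while a request carrying `logs` and `audit` resolves to nothing.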
Lines changed: 29 additions & 0 deletions
@@ -0,0 +1,29 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.rule.attribute_extractor;
+
+import org.opensearch.autotagging.Attribute;
+
+/**
+ * This interface defines the contract for extracting the attributes for Rule based auto-tagging feature
+ * @param <V>
+ */
+public interface AttributeExtractor<V> {
+    /**
+     * This method returns the Attribute which it is responsible for extracting
+     * @return attribute
+     */
+    Attribute getAttribute();
+
+    /**
+     * This method returns the attribute values in context of the current request
+     * @return attribute value
+     */
+    Iterable<V> extract();
+}
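This commit adds only the `AttributeExtractor` interface, so an implementation may help show the intended contract. The sketch below is illustrative only: `SketchAttribute` and `SketchAttributeExtractor` are local stand-ins that mirror the shapes used in the diff (the real `Attribute` type may carry more than a name), and `IndexNamesExtractorSketch` with its `index_pattern` attribute name is a hypothetical example, not a class from this change.

```java
import java.util.List;

/** Simplified stand-in for org.opensearch.autotagging.Attribute; only getName() is modeled. */
interface SketchAttribute {
    String getName();
}

/** Mirrors the two methods of the AttributeExtractor interface shown in the diff. */
interface SketchAttributeExtractor<V> {
    SketchAttribute getAttribute();

    Iterable<V> extract();
}

/** Hypothetical extractor that exposes the index names targeted by a request. */
class IndexNamesExtractorSketch implements SketchAttributeExtractor<String> {
    private final List<String> indexNames;

    IndexNamesExtractorSketch(List<String> indexNames) {
        // Defensive copy so later changes to the caller's list do not leak in
        this.indexNames = List.copyOf(indexNames);
    }

    @Override
    public SketchAttribute getAttribute() {
        // "index_pattern" is an assumed attribute name used for illustration
        return () -> "index_pattern";
    }

    @Override
    public Iterable<String> extract() {
        return indexNames;
    }
}
```

An extractor pairs the attribute it serves with the values found in the current request, which is what lets the processing service pick the right store before looking up each value.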
Lines changed: 12 additions & 0 deletions
@@ -0,0 +1,12 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+/**
+ * This package contains feature attribute extractor interface and its implementations
+ */
+package org.opensearch.rule.attribute_extractor;
Lines changed: 12 additions & 0 deletions
@@ -0,0 +1,12 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+/**
+ * Rule based auto-tagging generic constructs
+ */
+package org.opensearch.rule;
Lines changed: 1 addition & 1 deletion
@@ -6,7 +6,7 @@
  * compatible open source license.
  */
 
-package org.opensearch.plugin.wlm.rule.storage;
+package org.opensearch.rule.storage;
 
 import java.util.Optional;
 
Lines changed: 52 additions & 0 deletions
@@ -0,0 +1,52 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.rule.storage;
+
+import org.opensearch.autotagging.Attribute;
+import org.opensearch.autotagging.FeatureType;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+
+/**
+ * Factory class for AttributeValueStore per feature type as two feature types can potentially share same attribute
+ */
+public class AttributeValueStoreFactory {
+    private final Map<String, AttributeValueStore<String, String>> attributeValueStores = new HashMap<>();
+
+    /**
+     * Constructor
+     * @param featureType is the feature which are using rule based auto tagging
+     * @param attributeValueStoreSupplier supplies the feature level AttributeValueStore instance
+     */
+    public AttributeValueStoreFactory(FeatureType featureType, Supplier<AttributeValueStore<String, String>> attributeValueStoreSupplier) {
+        for (Attribute attribute : featureType.getAllowedAttributesRegistry().values()) {
+            attributeValueStores.put(attribute.getName(), attributeValueStoreSupplier.get());
+        }
+    }
+
+    /**
+     * Factory method which returns the {@link AttributeValueStore} for the given attribute
+     * @param attribute
+     * @return
+     */
+    public AttributeValueStore<String, String> getAttributeValueStore(final Attribute attribute) {
+        final String attributeName = attribute.getName();
+        if (attributeValueStores == null) {
+            throw new IllegalStateException("AttributeValueStoreFactory is not initialized yet.");
+        }
+
+        if (!attributeValueStores.containsKey(attributeName)) {
+            throw new IllegalArgumentException("[" + attributeName + "] is not a valid attribute for enabled features.");
+        }
+
+        return attributeValueStores.get(attributeName);
+    }
+}
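The factory above calls the supplier once per allowed attribute, so each attribute of a feature gets its own store instance and unknown attributes are rejected. A reduced sketch of that pattern follows. `PerAttributeStoreFactorySketch` is a hypothetical name, attribute names are passed as plain strings instead of a `FeatureType`, and `Map<String, String>` stands in for `AttributeValueStore<String, String>`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical reduction of the factory pattern in the diff; plain maps stand in for value stores. */
class PerAttributeStoreFactorySketch {
    private final Map<String, Map<String, String>> stores = new HashMap<>();

    PerAttributeStoreFactorySketch(List<String> allowedAttributeNames, Supplier<Map<String, String>> storeSupplier) {
        for (String name : allowedAttributeNames) {
            // The supplier runs once per attribute, so stores are never shared between attributes
            stores.put(name, storeSupplier.get());
        }
    }

    Map<String, String> getStore(String attributeName) {
        Map<String, String> store = stores.get(attributeName);
        if (store == null) {
            // Same message shape as the factory in the diff
            throw new IllegalArgumentException("[" + attributeName + "] is not a valid attribute for enabled features.");
        }
        return store;
    }
}
```

Passing a supplier rather than a single store instance is what keeps the stores separate: a caller that handed over one shared instance would mix the values of different attributes in the same lookup structure.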
Lines changed: 37 additions & 19 deletions
@@ -6,12 +6,13 @@
  * compatible open source license.
  */
 
-package org.opensearch.plugin.wlm.rule.storage;
+package org.opensearch.rule.storage;
 
 import org.apache.commons.collections4.trie.PatriciaTrie;
 
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * This is a patricia trie based implementation of AttributeValueStore
@@ -20,7 +21,10 @@
  * ref: https://commons.apache.org/proper/commons-collections/javadocs/api-4.4/org/apache/commons/collections4/trie/PatriciaTrie.html
  */
 public class DefaultAttributeValueStore<K extends String, V> implements AttributeValueStore<K, V> {
-    PatriciaTrie<V> trie;
+    private final PatriciaTrie<V> trie;
+    private static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private static final ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
+    private static final ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
 
     /**
      * Default constructor
@@ -39,34 +43,48 @@ public DefaultAttributeValueStore(PatriciaTrie<V> trie) {
 
     @Override
     public void put(K key, V value) {
-        trie.put(key, value);
+        writeLock.lock();
+        try {
+            trie.put(key, value);
+        } finally {
+            writeLock.unlock();
+        }
     }
 
     @Override
     public void remove(String key) {
-        trie.remove(key);
+        writeLock.lock();
+        try {
+            trie.remove(key);
+        } finally {
+            writeLock.unlock();
+        }
     }
 
     @Override
     public Optional<V> get(String key) {
-        /**
-         * Since we are inserting prefixes into the trie and searching for larger strings
-         * It is important to find the largest matching prefix key in the trie efficiently
-         * Hence we can do binary search
-         */
-        final String longestMatchingPrefix = findLongestMatchingPrefix(key);
+        readLock.lock();
+        try {
+            /**
+             * Since we are inserting prefixes into the trie and searching for larger strings
+             * It is important to find the largest matching prefix key in the trie efficiently
+             * Hence we can do binary search
+             */
+            final String longestMatchingPrefix = findLongestMatchingPrefix(key);
 
-        /**
-         * Now there are following cases for this prefix
-         * 1. There is a Rule which has this prefix as one of the attribute values. In this case we should return the
-         *    Rule's label otherwise send empty
-         */
-        for (Map.Entry<String, V> possibleMatch : trie.prefixMap(longestMatchingPrefix).entrySet()) {
-            if (key.startsWith(possibleMatch.getKey())) {
-                return Optional.of(possibleMatch.getValue());
+            /**
+             * Now there are following cases for this prefix
+             * 1. There is a Rule which has this prefix as one of the attribute values. In this case we should return the
+             *    Rule's label otherwise send empty
+             */
+            for (Map.Entry<String, V> possibleMatch : trie.prefixMap(longestMatchingPrefix).entrySet()) {
+                if (key.startsWith(possibleMatch.getKey())) {
+                    return Optional.of(possibleMatch.getValue());
+                }
             }
+        } finally {
+            readLock.unlock();
         }
-
         return Optional.empty();
     }
 
Lines changed: 1 addition & 1 deletion
@@ -9,4 +9,4 @@
 /**
  * This package contains interfaces and implementations for in memory rule storage mechanisms
  */
-package org.opensearch.plugin.wlm.rule.storage;
+package org.opensearch.rule.storage;
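The `DefaultAttributeValueStore` change in this commit wraps writes in a write lock and prefix lookups in a read lock. A self-contained sketch of that locking pattern is shown below, with several deliberate substitutions that are assumptions and not the committed code. `LockedPrefixStoreSketch` is a hypothetical name, a `HashMap` with a longest-to-shortest prefix scan replaces the `PatriciaTrie` and its binary search, and the lock is an instance field, whereas the lock fields in the diff are `static` and therefore shared by every store instance.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical prefix store guarded by a read/write lock; not the committed implementation. */
class LockedPrefixStoreSketch<V> {
    private final Map<String, V> entries = new HashMap<>();
    // Assumption: one lock per store instance (the diff declares its lock fields static)
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    void put(String prefix, V value) {
        lock.writeLock().lock();
        try {
            entries.put(prefix, value);
        } finally {
            // Always release in finally so an exception cannot leave the lock held
            lock.writeLock().unlock();
        }
    }

    void remove(String prefix) {
        lock.writeLock().lock();
        try {
            entries.remove(prefix);
        } finally {
            lock.writeLock().unlock();
        }
    }

    /** Returns the value stored under the longest stored prefix of key, if any. */
    Optional<V> get(String key) {
        lock.readLock().lock();
        try {
            // Linear scan from the full key down to its first character
            for (int end = key.length(); end > 0; end--) {
                V value = entries.get(key.substring(0, end));
                if (value != null) {
                    return Optional.of(value);
                }
            }
            return Optional.empty();
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Many readers can hold the read lock at once, so concurrent lookups do not block each other. Only `put` and `remove` take the exclusive write lock.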
