Skip to content

Commit d950579

Browse files
committed
[GRPC] SearchService and Search GRPC endpoint v1
Signed-off-by: Karen Xu <[email protected]>
1 parent 4560206 commit d950579

File tree

79 files changed

+6727
-18
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

79 files changed

+6727
-18
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2626
- Add GRPC DocumentService and Bulk endpoint ([#17727](https://github.com/opensearch-project/OpenSearch/pull/17727))
2727
- Added scale to zero (`search_only` mode) support for OpenSearch reader writer separation ([#17299](https://github.com/opensearch-project/OpenSearch/pull/17299)
2828
- [Star Tree] [Search] Resolving numeric range aggregation with metric aggregation using star-tree ([#17273](https://github.com/opensearch-project/OpenSearch/pull/17273))
29+
- Add SearchService and Search GRPC endpoint ([#17830](https://github.com/opensearch-project/OpenSearch/pull/17830))
2930

3031
### Changed
3132
- Migrate BC libs to their FIPS counterparts ([#14912](https://github.com/opensearch-project/OpenSearch/pull/14912))

plugins/transport-grpc/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ dependencies {
2929
implementation "io.grpc:grpc-stub:${versions.grpc}"
3030
implementation "io.grpc:grpc-util:${versions.grpc}"
3131
implementation "io.perfmark:perfmark-api:0.26.0"
32-
implementation "org.opensearch:protobufs:0.1.0"
32+
implementation "org.opensearch:protobufs:0.2.0"
3333
}
3434

3535
tasks.named("dependencyLicenses").configure {

plugins/transport-grpc/licenses/protobufs-0.1.0.jar.sha1

Lines changed: 0 additions & 1 deletion
This file was deleted.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
a29095657b4a0f9b59659d71e7e540e9b07fd044

plugins/transport-grpc/src/main/java/org/opensearch/plugin/transport/grpc/GrpcPlugin.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.opensearch.env.Environment;
2020
import org.opensearch.env.NodeEnvironment;
2121
import org.opensearch.plugin.transport.grpc.services.DocumentServiceImpl;
22+
import org.opensearch.plugin.transport.grpc.services.SearchServiceImpl;
2223
import org.opensearch.plugins.NetworkPlugin;
2324
import org.opensearch.plugins.Plugin;
2425
import org.opensearch.repositories.RepositoriesService;
@@ -80,7 +81,7 @@ public Map<String, Supplier<AuxTransport>> getAuxTransports(
8081
if (client == null) {
8182
throw new RuntimeException("client cannot be null");
8283
}
83-
List<BindableService> grpcServices = registerGRPCServices(new DocumentServiceImpl(client));
84+
List<BindableService> grpcServices = registerGRPCServices(new DocumentServiceImpl(client), new SearchServiceImpl(client));
8485
return Collections.singletonMap(
8586
GRPC_TRANSPORT_SETTING_KEY,
8687
() -> new Netty4GrpcServerTransport(settings, grpcServices, networkService)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.plugin.transport.grpc.listeners;
10+
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.opensearch.action.search.SearchResponse;
14+
import org.opensearch.core.action.ActionListener;
15+
import org.opensearch.plugin.transport.grpc.proto.response.search.SearchResponseProtoUtils;
16+
17+
import java.io.IOException;
18+
19+
import io.grpc.stub.StreamObserver;
20+
21+
/**
22+
* Listener for search request execution completion, handling successful and failure scenarios.
23+
*/
24+
public class SearchRequestActionListener implements ActionListener<SearchResponse> {
25+
private static final Logger logger = LogManager.getLogger(SearchRequestActionListener.class);
26+
27+
private StreamObserver<org.opensearch.protobufs.SearchResponse> responseObserver;
28+
29+
public SearchRequestActionListener(StreamObserver<org.opensearch.protobufs.SearchResponse> responseObserver) {
30+
super();
31+
this.responseObserver = responseObserver;
32+
}
33+
34+
@Override
35+
public void onResponse(SearchResponse response) {
36+
// Search execution succeeded. Convert the opensearch internal response to protobuf
37+
try {
38+
org.opensearch.protobufs.SearchResponse protoResponse = SearchResponseProtoUtils.toProto(response);
39+
responseObserver.onNext(protoResponse);
40+
responseObserver.onCompleted();
41+
} catch (RuntimeException | IOException e) {
42+
responseObserver.onError(e);
43+
}
44+
}
45+
46+
@Override
47+
public void onFailure(Exception e) {
48+
logger.error("SearchRequestActionListener failed to process search request:" + e.getMessage());
49+
responseObserver.onError(e);
50+
}
51+
}

plugins/transport-grpc/src/main/java/org/opensearch/plugin/transport/grpc/proto/request/common/FetchSourceContextProtoUtils.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,41 @@ public static FetchSourceContext parseFromProtoRequest(org.opensearch.protobufs.
6969
return null;
7070
}
7171

72+
/**
73+
* Similar to {@link FetchSourceContext#parseFromRestRequest(RestRequest)}
74+
* @param request
75+
* @return
76+
*/
77+
public static FetchSourceContext parseFromProtoRequest(org.opensearch.protobufs.SearchRequest request) {
78+
Boolean fetchSource = null;
79+
String[] sourceExcludes = null;
80+
String[] sourceIncludes = null;
81+
82+
if (request.hasSource()) {
83+
SourceConfigParam source = request.getSource();
84+
85+
// TODO test both cases in parity testing
86+
if (source.hasBoolValue()) {
87+
fetchSource = source.getBoolValue();
88+
} else {
89+
sourceIncludes = source.getStringArray().getStringArrayList().toArray(new String[0]);
90+
}
91+
}
92+
93+
if (request.getSourceIncludesCount() > 0) {
94+
sourceIncludes = request.getSourceIncludesList().toArray(new String[0]);
95+
}
96+
97+
if (request.getSourceExcludesCount() > 0) {
98+
sourceExcludes = request.getSourceExcludesList().toArray(new String[0]);
99+
}
100+
101+
if (fetchSource != null || sourceIncludes != null || sourceExcludes != null) {
102+
return new FetchSourceContext(fetchSource == null ? true : fetchSource, sourceIncludes, sourceExcludes);
103+
}
104+
return null;
105+
}
106+
72107
/**
73108
* Converts a SourceConfig Protocol Buffer to a FetchSourceContext object.
74109
* Similar to {@link FetchSourceContext#fromXContent(XContentParser)}.
@@ -96,7 +131,8 @@ public static FetchSourceContext fromProto(SourceConfig sourceConfig) {
96131
includesList.add(s);
97132
}
98133
includes = includesList.toArray(new String[0]);
99-
} else if (!sourceFilter.getExcludesList().isEmpty()) {
134+
}
135+
if (!sourceFilter.getExcludesList().isEmpty()) {
100136
List<String> excludesList = new ArrayList<>();
101137
for (String s : sourceFilter.getExcludesList()) {
102138
excludesList.add(s);

plugins/transport-grpc/src/main/java/org/opensearch/plugin/transport/grpc/proto/request/common/ObjectMapProtoUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public static Map<String, Object> fromProto(ObjectMap objectMap) {
4949
* @param value The generic protobuf ObjectMap.Value to convert
5050
* @return A Protobuf builder .google.protobuf.Struct representation
5151
*/
52-
private static Object fromProto(ObjectMap.Value value) {
52+
public static Object fromProto(ObjectMap.Value value) {
5353
if (value.hasNullValue()) {
5454
// Null
5555
throw new UnsupportedOperationException("Cannot add null value in ObjectMap.value " + value.toString() + " to a Java map.");

plugins/transport-grpc/src/main/java/org/opensearch/plugin/transport/grpc/proto/request/common/ScriptProtoUtils.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public static Script parseFromProtoRequest(org.opensearch.protobufs.Script scrip
4949
* Converts a Script Protocol Buffer to a Script object.
5050
* Similar to {@link Script#parse(XContentParser, String)}, which internally calls Script#build().
5151
*/
52-
private static Script parseFromProtoRequest(org.opensearch.protobufs.Script script, String defaultLang) {
52+
public static Script parseFromProtoRequest(org.opensearch.protobufs.Script script, String defaultLang) {
5353
Objects.requireNonNull(defaultLang);
5454

5555
if (script.hasInlineScript()) {
@@ -64,7 +64,7 @@ private static Script parseFromProtoRequest(org.opensearch.protobufs.Script scri
6464
/**
6565
* Parses a protobuf InlineScript to a Script object
6666
*/
67-
private static Script parseInlineScript(InlineScript inlineScript, String defaultLang) {
67+
public static Script parseInlineScript(InlineScript inlineScript, String defaultLang) {
6868

6969
ScriptType type = ScriptType.INLINE;
7070

@@ -86,7 +86,7 @@ private static Script parseInlineScript(InlineScript inlineScript, String defaul
8686
/**
8787
* Parses a protobuf StoredScriptId to a Script object
8888
*/
89-
private static Script parseStoredScriptId(StoredScriptId storedScriptId) {
89+
public static Script parseStoredScriptId(StoredScriptId storedScriptId) {
9090
ScriptType type = ScriptType.STORED;
9191
String lang = null;
9292
String idOrCode = storedScriptId.getId();
@@ -98,7 +98,7 @@ private static Script parseStoredScriptId(StoredScriptId storedScriptId) {
9898
return new Script(type, lang, idOrCode, options, params);
9999
}
100100

101-
private static String parseScriptLanguage(ScriptLanguage language, String defaultLang) {
101+
public static String parseScriptLanguage(ScriptLanguage language, String defaultLang) {
102102
if (language.hasStringValue()) {
103103
return language.getStringValue();
104104
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
package org.opensearch.plugin.transport.grpc.proto.request.search;
9+
10+
import org.opensearch.core.xcontent.XContentParser;
11+
import org.opensearch.protobufs.FieldCollapse;
12+
import org.opensearch.search.collapse.CollapseBuilder;
13+
14+
import java.io.IOException;
15+
16+
/**
17+
* Utility class for converting CollapseBuilder Protocol Buffers to objects
18+
*
19+
*/
20+
public class CollapseBuilderProtoUtils {
21+
22+
private CollapseBuilderProtoUtils() {
23+
// Utility class, no instances
24+
}
25+
26+
/**
27+
* Similar to {@link CollapseBuilder#fromXContent(XContentParser)}
28+
*
29+
* @param collapseProto
30+
*/
31+
32+
public static CollapseBuilder fromProto(FieldCollapse collapseProto) throws IOException {
33+
CollapseBuilder collapseBuilder = new CollapseBuilder(collapseProto.getField());
34+
35+
if (collapseProto.hasMaxConcurrentGroupSearches()) {
36+
collapseBuilder.setMaxConcurrentGroupRequests(collapseProto.getMaxConcurrentGroupSearches());
37+
}
38+
if (collapseProto.getInnerHitsCount() > 0) {
39+
collapseBuilder.setInnerHits(InnerHitsBuilderProtoUtils.fromProto(collapseProto.getInnerHitsList()));
40+
}
41+
42+
return collapseBuilder;
43+
}
44+
45+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
package org.opensearch.plugin.transport.grpc.proto.request.search;
9+
10+
import org.opensearch.core.xcontent.XContentParser;
11+
import org.opensearch.search.fetch.subphase.FieldAndFormat;
12+
13+
/**
14+
* Utility class for converting SearchSourceBuilder Protocol Buffers to objects
15+
*
16+
*/
17+
public class FieldAndFormatProtoUtils {
18+
19+
private FieldAndFormatProtoUtils() {
20+
// Utility class, no instances
21+
}
22+
23+
/**
24+
* Similar to {@link FieldAndFormat#fromXContent(XContentParser)}
25+
*
26+
* @param fieldAndFormatProto
27+
*/
28+
29+
public static FieldAndFormat fromProto(org.opensearch.protobufs.FieldAndFormat fieldAndFormatProto) {
30+
31+
// TODO how is this field used?
32+
// fieldAndFormatProto.getIncludeUnmapped();
33+
return new FieldAndFormat(fieldAndFormatProto.getField(), fieldAndFormatProto.getFormat());
34+
}
35+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
package org.opensearch.plugin.transport.grpc.proto.request.search;
9+
10+
import org.opensearch.core.xcontent.XContentParser;
11+
import org.opensearch.protobufs.Highlight;
12+
import org.opensearch.search.fetch.subphase.highlight.HighlightBuilder;
13+
14+
/**
15+
* Utility class for converting Highlight Protocol Buffers to objects
16+
*
17+
*/
18+
public class HighlightBuilderProtoUtils {
19+
20+
private HighlightBuilderProtoUtils() {
21+
// Utility class, no instances
22+
}
23+
24+
/**
25+
* Similar to {@link HighlightBuilder#fromXContent(XContentParser)}
26+
*
27+
* @param highlightProto
28+
*/
29+
30+
public static HighlightBuilder fromProto(Highlight highlightProto) {
31+
32+
throw new UnsupportedOperationException("highlight not supported yet");
33+
34+
/*
35+
HighlightBuilder highlightBuilder = new HighlightBuilder();
36+
// TODO populate highlightBuilder
37+
return highlightBuilder;
38+
*/
39+
40+
}
41+
42+
}

0 commit comments

Comments
 (0)