Skip to content

Commit 834b10f

Browse files
committed
Remove blocking code in SearchDocument processing.
Original Pull Request #2094 Closes #2025 (cherry picked from commit c1a1ea9)
1 parent 823cfa9 commit 834b10f

File tree

6 files changed

+88
-46
lines changed

6 files changed

+88
-46
lines changed

Diff for: src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchRestTransportTemplate.java

+9-3
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Iterator;
2121
import java.util.List;
2222
import java.util.Map;
23+
import java.util.concurrent.CompletableFuture;
2324
import java.util.stream.Collectors;
2425
import java.util.stream.Stream;
2526

@@ -109,7 +110,8 @@ public <T> List<SearchHits<T>> multiSearch(List<? extends Query> queries, Class<
109110
List<SearchHits<T>> res = new ArrayList<>(queries.size());
110111
int c = 0;
111112
for (Query query : queries) {
112-
res.add(callback.doWith(SearchDocumentResponse.from(items[c++].getResponse(), documentCallback::doWith)));
113+
res.add(
114+
callback.doWith(SearchDocumentResponse.from(items[c++].getResponse(), getEntityCreator(documentCallback))));
113115
}
114116
return res;
115117
}
@@ -142,7 +144,7 @@ public List<SearchHits<?>> multiSearch(List<? extends Query> queries, List<Class
142144
index);
143145

144146
SearchResponse response = items[c++].getResponse();
145-
res.add(callback.doWith(SearchDocumentResponse.from(response, documentCallback::doWith)));
147+
res.add(callback.doWith(SearchDocumentResponse.from(response, getEntityCreator(documentCallback))));
146148
}
147149
return res;
148150
}
@@ -175,7 +177,7 @@ public List<SearchHits<?>> multiSearch(List<? extends Query> queries, List<Class
175177
index);
176178

177179
SearchResponse response = items[c++].getResponse();
178-
res.add(callback.doWith(SearchDocumentResponse.from(response, documentCallback::doWith)));
180+
res.add(callback.doWith(SearchDocumentResponse.from(response, getEntityCreator(documentCallback))));
179181
}
180182
return res;
181183
}
@@ -215,5 +217,9 @@ public SearchResponse suggest(SuggestBuilder suggestion, Class<?> clazz) {
215217
return suggest(suggestion, getIndexCoordinatesFor(clazz));
216218
}
217219

220+
protected <T> SearchDocumentResponse.EntityCreator<T> getEntityCreator(ReadDocumentCallback<T> documentCallback) {
221+
return searchDocument -> CompletableFuture.completedFuture(documentCallback.doWith(searchDocument));
222+
}
223+
218224
// endregion
219225
}

Diff for: src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2013-2021 the original author or authors.
2+
* Copyright 2013-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -319,7 +319,7 @@ public <T> SearchHits<T> search(Query query, Class<T> clazz, IndexCoordinates in
319319
ReadDocumentCallback<T> documentCallback = new ReadDocumentCallback<T>(elasticsearchConverter, clazz, index);
320320
SearchDocumentResponseCallback<SearchHits<T>> callback = new ReadSearchDocumentResponseCallback<>(clazz, index);
321321

322-
return callback.doWith(SearchDocumentResponse.from(response, documentCallback::doWith));
322+
return callback.doWith(SearchDocumentResponse.from(response, getEntityCreator(documentCallback)));
323323
}
324324

325325
@Override
@@ -336,7 +336,7 @@ public <T> SearchScrollHits<T> searchScrollStart(long scrollTimeInMillis, Query
336336
ReadDocumentCallback<T> documentCallback = new ReadDocumentCallback<T>(elasticsearchConverter, clazz, index);
337337
SearchDocumentResponseCallback<SearchScrollHits<T>> callback = new ReadSearchScrollDocumentResponseCallback<>(clazz,
338338
index);
339-
return callback.doWith(SearchDocumentResponse.from(response, documentCallback::doWith));
339+
return callback.doWith(SearchDocumentResponse.from(response, getEntityCreator(documentCallback)));
340340
}
341341

342342
@Override
@@ -351,7 +351,7 @@ public <T> SearchScrollHits<T> searchScrollContinue(@Nullable String scrollId, l
351351
ReadDocumentCallback<T> documentCallback = new ReadDocumentCallback<T>(elasticsearchConverter, clazz, index);
352352
SearchDocumentResponseCallback<SearchScrollHits<T>> callback = new ReadSearchScrollDocumentResponseCallback<>(clazz,
353353
index);
354-
return callback.doWith(SearchDocumentResponse.from(response, documentCallback::doWith));
354+
return callback.doWith(SearchDocumentResponse.from(response, getEntityCreator(documentCallback)));
355355
}
356356

357357
@Override
@@ -378,8 +378,8 @@ protected MultiSearchResponse.Item[] getMultiSearchResult(MultiSearchRequest req
378378
Assert.isTrue(items.length == request.requests().size(), "Response should has same length with queries");
379379
return items;
380380
}
381-
// endregion
382381

382+
// endregion
383383
// region ClientCallback
384384
/**
385385
* Callback interface to be used with {@link #execute(ClientCallback)} for operating directly on

Diff for: src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,7 @@ public <T> SearchHits<T> search(Query query, Class<T> clazz, IndexCoordinates in
354354

355355
ReadDocumentCallback<T> documentCallback = new ReadDocumentCallback<T>(elasticsearchConverter, clazz, index);
356356
SearchDocumentResponseCallback<SearchHits<T>> callback = new ReadSearchDocumentResponseCallback<>(clazz, index);
357-
return callback.doWith(SearchDocumentResponse.from(response, documentCallback::doWith));
357+
return callback.doWith(SearchDocumentResponse.from(response, getEntityCreator(documentCallback)));
358358
}
359359

360360
@Override
@@ -372,7 +372,7 @@ public <T> SearchScrollHits<T> searchScrollStart(long scrollTimeInMillis, Query
372372
ReadDocumentCallback<T> documentCallback = new ReadDocumentCallback<T>(elasticsearchConverter, clazz, index);
373373
SearchDocumentResponseCallback<SearchScrollHits<T>> callback = new ReadSearchScrollDocumentResponseCallback<>(clazz,
374374
index);
375-
return callback.doWith(SearchDocumentResponse.from(response, documentCallback::doWith));
375+
return callback.doWith(SearchDocumentResponse.from(response, getEntityCreator(documentCallback)));
376376
}
377377

378378
@Override
@@ -389,7 +389,7 @@ public <T> SearchScrollHits<T> searchScrollContinue(@Nullable String scrollId, l
389389
ReadDocumentCallback<T> documentCallback = new ReadDocumentCallback<T>(elasticsearchConverter, clazz, index);
390390
SearchDocumentResponseCallback<SearchScrollHits<T>> callback = new ReadSearchScrollDocumentResponseCallback<>(clazz,
391391
index);
392-
return callback.doWith(SearchDocumentResponse.from(response, documentCallback::doWith));
392+
return callback.doWith(SearchDocumentResponse.from(response, getEntityCreator(documentCallback)));
393393
}
394394

395395
@Override

Diff for: src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java

+11-10
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,7 +23,6 @@
2323
import java.util.HashMap;
2424
import java.util.List;
2525
import java.util.Map;
26-
import java.util.function.Function;
2726
import java.util.stream.Collectors;
2827

2928
import org.elasticsearch.Version;
@@ -771,15 +770,18 @@ private Flux<SearchDocument> doFind(Query query, Class<?> clazz, IndexCoordinate
771770
});
772771
}
773772

774-
private Mono<SearchDocumentResponse> doFindForResponse(Query query, Class<?> clazz, IndexCoordinates index) {
773+
private <T> Mono<SearchDocumentResponse> doFindForResponse(Query query, Class<?> clazz, IndexCoordinates index) {
775774

776775
return Mono.defer(() -> {
777776
SearchRequest request = requestFactory.searchRequest(query, clazz, index);
778777
request = prepareSearchRequest(request, false);
779778

780779
SearchDocumentCallback<?> documentCallback = new ReadSearchDocumentCallback<>(clazz, index);
780+
// noinspection unchecked
781+
SearchDocumentResponse.EntityCreator<T> entityCreator = searchDocument -> ((Mono<T>) documentCallback
782+
.toEntity(searchDocument)).toFuture();
781783

782-
return doFindForResponse(request, searchDocument -> documentCallback.toEntity(searchDocument).block());
784+
return doFindForResponse(request, entityCreator);
783785
});
784786
}
785787

@@ -896,19 +898,18 @@ protected Flux<SearchDocument> doFind(SearchRequest request) {
896898
* Customization hook on the actual execution result {@link Mono}. <br />
897899
*
898900
* @param request the already prepared {@link SearchRequest} ready to be executed.
899-
* @param suggestEntityCreator
901+
* @param entityCreator
900902
* @return a {@link Mono} emitting the result of the operation converted to s {@link SearchDocumentResponse}.
901903
*/
902-
protected Mono<SearchDocumentResponse> doFindForResponse(SearchRequest request,
903-
Function<SearchDocument, ? extends Object> suggestEntityCreator) {
904+
protected <T> Mono<SearchDocumentResponse> doFindForResponse(SearchRequest request,
905+
SearchDocumentResponse.EntityCreator<T> entityCreator) {
904906

905907
if (QUERY_LOGGER.isDebugEnabled()) {
906908
QUERY_LOGGER.debug("Executing doFindForResponse: {}", request);
907909
}
908910

909-
return Mono.from(execute(client1 -> client1.searchForResponse(request))).map(searchResponse -> {
910-
return SearchDocumentResponse.from(searchResponse, suggestEntityCreator);
911-
});
911+
return Mono.from(execute(client -> client.searchForResponse(request)))
912+
.map(searchResponse -> SearchDocumentResponse.from(searchResponse, entityCreator));
912913
}
913914

914915
/**

Diff for: src/main/java/org/springframework/data/elasticsearch/core/document/SearchDocumentResponse.java

+37-11
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2021 the original author or authors.
2+
* Copyright 2019-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,8 +17,11 @@
1717

1818
import java.util.ArrayList;
1919
import java.util.List;
20+
import java.util.concurrent.CompletableFuture;
2021
import java.util.function.Function;
2122

23+
import org.apache.commons.logging.Log;
24+
import org.apache.commons.logging.LogFactory;
2225
import org.apache.lucene.search.TotalHits;
2326
import org.elasticsearch.action.search.SearchResponse;
2427
import org.elasticsearch.common.text.Text;
@@ -38,13 +41,15 @@
3841

3942
/**
4043
* This represents the complete search response from Elasticsearch, including the returned documents. Instances must be
41-
* created with the {@link #from(SearchResponse,Function)} method.
44+
* created with the {@link #from(SearchResponse, EntityCreator)} method.
4245
*
4346
* @author Peter-Josef Meisch
4447
* @since 4.0
4548
*/
4649
public class SearchDocumentResponse {
4750

51+
private static final Log LOGGER = LogFactory.getLog(SearchDocumentResponse.class);
52+
4853
private final long totalHits;
4954
private final String totalHitsRelation;
5055
private final float maxScore;
@@ -98,12 +103,11 @@ public Suggest getSuggest() {
98103
* creates a SearchDocumentResponse from the {@link SearchResponse}
99104
*
100105
* @param searchResponse must not be {@literal null}
101-
* @param suggestEntityCreator function to create an entity from a {@link SearchDocument}
106+
* @param entityCreator function to create an entity from a {@link SearchDocument}
102107
* @param <T> entity type
103108
* @return the SearchDocumentResponse
104109
*/
105-
public static <T> SearchDocumentResponse from(SearchResponse searchResponse,
106-
Function<SearchDocument, T> suggestEntityCreator) {
110+
public static <T> SearchDocumentResponse from(SearchResponse searchResponse, EntityCreator<T> entityCreator) {
107111

108112
Assert.notNull(searchResponse, "searchResponse must not be null");
109113

@@ -112,7 +116,7 @@ public static <T> SearchDocumentResponse from(SearchResponse searchResponse,
112116
Aggregations aggregations = searchResponse.getAggregations();
113117
org.elasticsearch.search.suggest.Suggest suggest = searchResponse.getSuggest();
114118

115-
return from(searchHits, scrollId, aggregations, suggest, suggestEntityCreator);
119+
return from(searchHits, scrollId, aggregations, suggest, entityCreator);
116120
}
117121

118122
/**
@@ -122,14 +126,14 @@ public static <T> SearchDocumentResponse from(SearchResponse searchResponse,
122126
* @param scrollId scrollId
123127
* @param aggregations aggregations
124128
* @param suggestES the suggestion response from Elasticsearch
125-
* @param suggestEntityCreator function to create an entity from a {@link SearchDocument}
129+
* @param entityCreator function to create an entity from a {@link SearchDocument}
126130
* @param <T> entity type
127131
* @return the {@link SearchDocumentResponse}
128132
* @since 4.3
129133
*/
130134
public static <T> SearchDocumentResponse from(SearchHits searchHits, @Nullable String scrollId,
131135
@Nullable Aggregations aggregations, @Nullable org.elasticsearch.search.suggest.Suggest suggestES,
132-
Function<SearchDocument, T> suggestEntityCreator) {
136+
EntityCreator<T> entityCreator) {
133137

134138
TotalHits responseTotalHits = searchHits.getTotalHits();
135139

@@ -153,14 +157,14 @@ public static <T> SearchDocumentResponse from(SearchHits searchHits, @Nullable S
153157
}
154158
}
155159

156-
Suggest suggest = suggestFrom(suggestES, suggestEntityCreator);
160+
Suggest suggest = suggestFrom(suggestES, entityCreator);
157161
return new SearchDocumentResponse(totalHits, totalHitsRelation, maxScore, scrollId, searchDocuments, aggregations,
158162
suggest);
159163
}
160164

161165
@Nullable
162166
private static <T> Suggest suggestFrom(@Nullable org.elasticsearch.search.suggest.Suggest suggestES,
163-
Function<SearchDocument, T> entityCreator) {
167+
EntityCreator<T> entityCreator) {
164168

165169
if (suggestES == null) {
166170
return null;
@@ -219,7 +223,19 @@ private static <T> Suggest suggestFrom(@Nullable org.elasticsearch.search.sugges
219223
List<CompletionSuggestion.Entry.Option<T>> options = new ArrayList<>();
220224
for (org.elasticsearch.search.suggest.completion.CompletionSuggestion.Entry.Option optionES : entryES) {
221225
SearchDocument searchDocument = optionES.getHit() != null ? DocumentAdapters.from(optionES.getHit()) : null;
222-
T hitEntity = searchDocument != null ? entityCreator.apply(searchDocument) : null;
226+
227+
T hitEntity = null;
228+
229+
if (searchDocument != null) {
230+
try {
231+
hitEntity = entityCreator.apply(searchDocument).get();
232+
} catch (Exception e) {
233+
if (LOGGER.isWarnEnabled()) {
234+
LOGGER.warn("Error creating entity from SearchDocument");
235+
}
236+
}
237+
}
238+
223239
options.add(new CompletionSuggestion.Entry.Option<T>(textToString(optionES.getText()),
224240
textToString(optionES.getHighlighted()), optionES.getScore(), optionES.collateMatch(),
225241
optionES.getContexts(), scoreDocFrom(optionES.getDoc()), searchDocument, hitEntity));
@@ -254,4 +270,14 @@ private static ScoreDoc scoreDocFrom(@Nullable org.apache.lucene.search.ScoreDoc
254270
private static String textToString(@Nullable Text text) {
255271
return text != null ? text.string() : "";
256272
}
273+
274+
/**
275+
* A function to convert a {@link SearchDocument} async into an entity. Asynchronous so that it can be used from the
276+
* imperative and the reactive code.
277+
*
278+
* @param <T> the entity type
279+
*/
280+
@FunctionalInterface
281+
public interface EntityCreator<T> extends Function<SearchDocument, CompletableFuture<T>> {}
282+
257283
}

0 commit comments

Comments
 (0)