Skip to content

Commit 7582617

Browse files
authored
Fix reactive blocking calls.
Original Pull Request #1825 Closes #1824
1 parent e8f73b7 commit 7582617

11 files changed

+281
-98
lines changed

src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java

+14-1
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,10 @@ protected void initialize(ElasticsearchConverter elasticsearchConverter) {
101101
this.routingResolver = new DefaultRoutingResolver((SimpleElasticsearchMappingContext) mappingContext);
102102

103103
requestFactory = new RequestFactory(elasticsearchConverter);
104-
VersionInfo.logVersions(getClusterVersion());
104+
105+
// initialize the VersionInfo class in the initialization phase
106+
// noinspection ResultOfMethodCallIgnored
107+
VersionInfo.versionProperties();
105108
}
106109

107110
/**
@@ -166,6 +169,16 @@ public void setRefreshPolicy(@Nullable RefreshPolicy refreshPolicy) {
166169
public RefreshPolicy getRefreshPolicy() {
167170
return refreshPolicy;
168171
}
172+
173+
/**
174+
* logs the versions of the different Elasticsearch components.
175+
*
176+
* @since 4.3
177+
*/
178+
public void logVersions() {
179+
VersionInfo.logVersions(getClusterVersion());
180+
}
181+
169182
// endregion
170183

171184
// region DocumentOperations

src/main/java/org/springframework/data/elasticsearch/core/DefaultReactiveIndexOperations.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@
4949
import org.springframework.data.elasticsearch.core.index.DeleteTemplateRequest;
5050
import org.springframework.data.elasticsearch.core.index.ExistsTemplateRequest;
5151
import org.springframework.data.elasticsearch.core.index.GetTemplateRequest;
52-
import org.springframework.data.elasticsearch.core.index.MappingBuilder;
5352
import org.springframework.data.elasticsearch.core.index.PutTemplateRequest;
53+
import org.springframework.data.elasticsearch.core.index.ReactiveMappingBuilder;
5454
import org.springframework.data.elasticsearch.core.index.Settings;
5555
import org.springframework.data.elasticsearch.core.index.TemplateData;
5656
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
@@ -190,8 +190,7 @@ public Mono<Document> createMapping(Class<?> clazz) {
190190
}
191191
}
192192

193-
String mapping = new MappingBuilder(converter).buildPropertyMapping(clazz);
194-
return Mono.just(Document.parse(mapping));
193+
return new ReactiveMappingBuilder(converter).buildReactivePropertyMapping(clazz).map(Document::parse);
195194
}
196195

197196
@Override

src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java

+11-6
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,9 @@ public ReactiveElasticsearchTemplate(ReactiveElasticsearchClient client, Elastic
142142
this.operations = new EntityOperations(this.mappingContext);
143143
this.requestFactory = new RequestFactory(converter);
144144

145-
logVersions();
145+
// initialize the VersionInfo class in the initialization phase
146+
// noinspection ResultOfMethodCallIgnored
147+
VersionInfo.versionProperties();
146148
}
147149

148150
private ReactiveElasticsearchTemplate copy() {
@@ -155,11 +157,14 @@ private ReactiveElasticsearchTemplate copy() {
155157
return copy;
156158
}
157159

158-
private void logVersions() {
159-
getClusterVersion() //
160-
.doOnSuccess(VersionInfo::logVersions) //
161-
.doOnError(e -> VersionInfo.logVersions(null)) //
162-
.subscribe();
160+
/**
161+
* logs the versions of the different Elasticsearch components.
162+
*
163+
* @return a Mono signalling finished execution
164+
* @since 4.3
165+
*/
166+
public Mono<Void> logVersions() {
167+
return getClusterVersion().doOnNext(VersionInfo::logVersions).then();
163168
}
164169

165170
@Override

src/main/java/org/springframework/data/elasticsearch/core/ReactiveResourceUtil.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public static Mono<String> readFileFromClasspath(String url) {
6363

6464
String line;
6565
while ((line = br.readLine()) != null) {
66-
sb.append(line);
66+
sb.append(line).append('\n');
6767
}
6868

6969
sink.next(sb.toString());

src/main/java/org/springframework/data/elasticsearch/core/index/MappingBuilder.java

+30-12
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public class MappingBuilder {
9797
private static final String DYNAMIC_DATE_FORMATS = "dynamic_date_formats";
9898
private static final String RUNTIME = "runtime";
9999

100-
private final ElasticsearchConverter elasticsearchConverter;
100+
protected final ElasticsearchConverter elasticsearchConverter;
101101

102102
private boolean writeTypeHints = true;
103103

@@ -113,9 +113,16 @@ public MappingBuilder(ElasticsearchConverter elasticsearchConverter) {
113113
*/
114114
public String buildPropertyMapping(Class<?> clazz) throws MappingException {
115115

116+
ElasticsearchPersistentEntity<?> entity = elasticsearchConverter.getMappingContext()
117+
.getRequiredPersistentEntity(clazz);
118+
119+
return buildPropertyMapping(entity, getRuntimeFields(entity));
120+
}
121+
122+
protected String buildPropertyMapping(ElasticsearchPersistentEntity<?> entity,
123+
@Nullable org.springframework.data.elasticsearch.core.document.Document runtimeFields) {
124+
116125
try {
117-
ElasticsearchPersistentEntity<?> entity = elasticsearchConverter.getMappingContext()
118-
.getRequiredPersistentEntity(clazz);
119126

120127
writeTypeHints = entity.writeTypeHints();
121128

@@ -124,7 +131,8 @@ public String buildPropertyMapping(Class<?> clazz) throws MappingException {
124131
// Dynamic templates
125132
addDynamicTemplatesMapping(builder, entity);
126133

127-
mapEntity(builder, entity, true, "", false, FieldType.Auto, null, entity.findAnnotation(DynamicMapping.class));
134+
mapEntity(builder, entity, true, "", false, FieldType.Auto, null, entity.findAnnotation(DynamicMapping.class),
135+
runtimeFields);
128136

129137
builder.endObject() // root object
130138
.close();
@@ -148,7 +156,8 @@ private void writeTypeHintMapping(XContentBuilder builder) throws IOException {
148156

149157
private void mapEntity(XContentBuilder builder, @Nullable ElasticsearchPersistentEntity<?> entity,
150158
boolean isRootObject, String nestedObjectFieldName, boolean nestedOrObjectField, FieldType fieldType,
151-
@Nullable Field parentFieldAnnotation, @Nullable DynamicMapping dynamicMapping) throws IOException {
159+
@Nullable Field parentFieldAnnotation, @Nullable DynamicMapping dynamicMapping,
160+
@Nullable org.springframework.data.elasticsearch.core.document.Document runtimeFields) throws IOException {
152161

153162
if (entity != null && entity.isAnnotationPresent(Mapping.class)) {
154163
Mapping mappingAnnotation = entity.getRequiredAnnotation(Mapping.class);
@@ -170,8 +179,8 @@ private void mapEntity(XContentBuilder builder, @Nullable ElasticsearchPersisten
170179
builder.field(DYNAMIC_DATE_FORMATS, mappingAnnotation.dynamicDateFormats());
171180
}
172181

173-
if (StringUtils.hasText(mappingAnnotation.runtimeFieldsPath())) {
174-
addRuntimeFields(builder, mappingAnnotation.runtimeFieldsPath());
182+
if (runtimeFields != null) {
183+
builder.field(RUNTIME, runtimeFields);
175184
}
176185
}
177186

@@ -227,13 +236,22 @@ private void mapEntity(XContentBuilder builder, @Nullable ElasticsearchPersisten
227236

228237
}
229238

230-
private void addRuntimeFields(XContentBuilder builder, String runtimeFieldsPath) throws IOException {
239+
@Nullable
240+
private org.springframework.data.elasticsearch.core.document.Document getRuntimeFields(
241+
@Nullable ElasticsearchPersistentEntity<?> entity) {
231242

232-
ClassPathResource runtimeFields = new ClassPathResource(runtimeFieldsPath);
243+
if (entity != null) {
244+
Mapping mappingAnnotation = entity.findAnnotation(Mapping.class);
245+
if (mappingAnnotation != null) {
246+
String runtimeFieldsPath = mappingAnnotation.runtimeFieldsPath();
233247

234-
if (runtimeFields.exists()) {
235-
builder.rawField(RUNTIME, runtimeFields.getInputStream(), XContentType.JSON);
248+
if (hasText(runtimeFieldsPath)) {
249+
String jsonString = ResourceUtil.readFileFromClasspath(runtimeFieldsPath);
250+
return org.springframework.data.elasticsearch.core.document.Document.parse(jsonString);
251+
}
252+
}
236253
}
254+
return null;
237255
}
238256

239257
private void buildPropertyMapping(XContentBuilder builder, boolean isRootObject,
@@ -291,7 +309,7 @@ private void buildPropertyMapping(XContentBuilder builder, boolean isRootObject,
291309
: null;
292310

293311
mapEntity(builder, persistentEntity, false, property.getFieldName(), true, fieldAnnotation.type(),
294-
fieldAnnotation, dynamicMapping);
312+
fieldAnnotation, dynamicMapping, null);
295313
return;
296314
}
297315
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.elasticsearch.core.index;
17+
18+
import static org.springframework.util.StringUtils.*;
19+
20+
import reactor.core.publisher.Mono;
21+
22+
import org.springframework.data.elasticsearch.annotations.Mapping;
23+
import org.springframework.data.elasticsearch.core.ReactiveResourceUtil;
24+
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
25+
import org.springframework.data.elasticsearch.core.document.Document;
26+
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
27+
import org.springframework.data.mapping.MappingException;
28+
import org.springframework.lang.Nullable;
29+
30+
/**
31+
* Subclass of {@link MappingBuilder} with specialized methods TO inhibit blocking CALLS
32+
*
33+
* @author Peter-Josef Meisch
34+
* @since 4.3
35+
*/
36+
public class ReactiveMappingBuilder extends MappingBuilder {
37+
38+
public ReactiveMappingBuilder(ElasticsearchConverter elasticsearchConverter) {
39+
super(elasticsearchConverter);
40+
}
41+
42+
@Override
43+
public String buildPropertyMapping(Class<?> clazz) throws MappingException {
44+
throw new UnsupportedOperationException(
45+
"Use ReactiveMappingBuilder.buildReactivePropertyMapping() instead of buildPropertyMapping()");
46+
}
47+
48+
public Mono<String> buildReactivePropertyMapping(Class<?> clazz) throws MappingException {
49+
ElasticsearchPersistentEntity<?> entity = elasticsearchConverter.getMappingContext()
50+
.getRequiredPersistentEntity(clazz);
51+
52+
return getRuntimeFields(entity) //
53+
.switchIfEmpty(Mono.just(Document.create())) //
54+
.map(document -> {
55+
if (document.isEmpty()) {
56+
return buildPropertyMapping(entity, null);
57+
} else {
58+
return buildPropertyMapping(entity, document);
59+
}
60+
});
61+
}
62+
63+
private Mono<Document> getRuntimeFields(@Nullable ElasticsearchPersistentEntity<?> entity) {
64+
65+
if (entity != null) {
66+
Mapping mappingAnnotation = entity.findAnnotation(Mapping.class);
67+
if (mappingAnnotation != null) {
68+
String runtimeFieldsPath = mappingAnnotation.runtimeFieldsPath();
69+
70+
if (hasText(runtimeFieldsPath)) {
71+
return ReactiveResourceUtil.readFileFromClasspath(runtimeFieldsPath).map(Document::parse);
72+
}
73+
}
74+
}
75+
76+
return Mono.empty();
77+
}
78+
}

0 commit comments

Comments
 (0)