Skip to content

Commit fcc2222

Browse files
mp911dechristophstrobl
authored andcommitted
Switch to Flux.flatMapSequential(…) to prevent backpressure shaping.
We now use Flux.flatMapSequential(…) instead of concatMap as concatMap reduces the request size to 1. The change in backpressure/request size reduces parallelism and impacts the batch size by fetching 2 documents instead of considering the actual backpressure. flatMapSequential doesn't tamper the requested amount while retaining the sequence order. Closes: #4543 Original Pull Request: #4550
1 parent 6413b20 commit fcc2222

File tree

4 files changed

+53
-14
lines changed

4 files changed

+53
-14
lines changed

Diff for: spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/DefaultReactiveBulkOperations.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ private Mono<BulkWriteResult> bulkWriteTo(MongoCollection<Document> collection)
216216
collection = collection.withWriteConcern(defaultWriteConcern);
217217
}
218218

219-
Flux<SourceAwareWriteModelHolder> concat = Flux.concat(models).flatMap(it -> {
219+
Flux<SourceAwareWriteModelHolder> concat = Flux.concat(models).flatMapSequential(it -> {
220220

221221
if (it.model()instanceof InsertOneModel<Document> iom) {
222222

Diff for: spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReactiveMongoTemplate.java

+9-9
Original file line numberDiff line numberDiff line change
@@ -1051,7 +1051,7 @@ private <O> Flux<O> aggregateAndMap(MongoCollection<Document> collection, List<D
10511051
return (isOutOrMerge ? Flux.from(cursor.toCollection()) : Flux.from(cursor.first())).thenMany(Mono.empty());
10521052
}
10531053

1054-
return Flux.from(cursor).concatMap(readCallback::doWith);
1054+
return Flux.from(cursor).flatMapSequential(readCallback::doWith);
10551055
}
10561056

10571057
@Override
@@ -1098,7 +1098,7 @@ protected <T> Flux<GeoResult<T>> geoNear(NearQuery near, Class<?> entityClass, S
10981098
.withOptions(optionsBuilder.build());
10991099

11001100
return aggregate($geoNear, collection, Document.class) //
1101-
.concatMap(callback::doWith);
1101+
.flatMapSequential(callback::doWith);
11021102
}
11031103

11041104
@Override
@@ -1324,7 +1324,7 @@ public <T> Flux<T> insertAll(Mono<? extends Collection<? extends T>> batchToSave
13241324

13251325
Assert.notNull(batchToSave, "Batch to insert must not be null");
13261326

1327-
return Flux.from(batchToSave).flatMap(collection -> insert(collection, collectionName));
1327+
return Flux.from(batchToSave).flatMapSequential(collection -> insert(collection, collectionName));
13281328
}
13291329

13301330
@Override
@@ -1392,7 +1392,7 @@ public <T> Flux<T> insertAll(Collection<? extends T> objectsToSave) {
13921392

13931393
@Override
13941394
public <T> Flux<T> insertAll(Mono<? extends Collection<? extends T>> objectsToSave) {
1395-
return Flux.from(objectsToSave).flatMap(this::insertAll);
1395+
return Flux.from(objectsToSave).flatMapSequential(this::insertAll);
13961396
}
13971397

13981398
protected <T> Flux<T> doInsertAll(Collection<? extends T> listToSave, MongoWriter<Object> writer) {
@@ -1443,7 +1443,7 @@ protected <T> Flux<T> doInsertBatch(String collectionName, Collection<? extends
14431443
return insertDocumentList(collectionName, documents).thenMany(Flux.fromIterable(tuples));
14441444
});
14451445

1446-
return insertDocuments.flatMap(tuple -> {
1446+
return insertDocuments.flatMapSequential(tuple -> {
14471447

14481448
Document document = tuple.getT2();
14491449
Object id = MappedDocument.of(document).getId();
@@ -1600,7 +1600,7 @@ protected Flux<ObjectId> insertDocumentList(String collectionName, List<Document
16001600

16011601
return collectionToUse.insertMany(documents);
16021602

1603-
}).flatMap(s -> {
1603+
}).flatMapSequential(s -> {
16041604

16051605
return Flux.fromStream(documents.stream() //
16061606
.map(MappedDocument::of) //
@@ -2187,7 +2187,7 @@ public <T> Flux<T> mapReduce(Query filterQuery, Class<?> domainType, String inpu
21872187
publisher = collation.map(Collation::toMongoCollation).map(publisher::collation).orElse(publisher);
21882188

21892189
return Flux.from(publisher)
2190-
.concatMap(new ReadDocumentCallback<>(mongoConverter, resultType, inputCollectionName)::doWith);
2190+
.flatMapSequential(new ReadDocumentCallback<>(mongoConverter, resultType, inputCollectionName)::doWith);
21912191
});
21922192
}
21932193

@@ -2255,7 +2255,7 @@ protected <T> Flux<T> doFindAndDelete(String collectionName, Query query, Class<
22552255

22562256
return Flux.from(flux).collectList().filter(it -> !it.isEmpty())
22572257
.flatMapMany(list -> Flux.from(remove(operations.getByIdInQuery(list), entityClass, collectionName))
2258-
.flatMap(deleteResult -> Flux.fromIterable(list)));
2258+
.flatMapSequential(deleteResult -> Flux.fromIterable(list)));
22592259
}
22602260

22612261
/**
@@ -2729,7 +2729,7 @@ private <T> Flux<T> executeFindMultiInternal(ReactiveCollectionQueryCallback<Doc
27292729

27302730
return createFlux(collectionName, collection -> {
27312731
return Flux.from(preparer.initiateFind(collection, collectionCallback::doInCollection))
2732-
.concatMap(objectCallback::doWith);
2732+
.flatMapSequential(objectCallback::doWith);
27332733
});
27342734
}
27352735

Diff for: spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public <S extends T> Flux<S> saveAll(Publisher<S> entityStream) {
121121

122122
Assert.notNull(entityStream, "The given Publisher of entities must not be null");
123123

124-
return Flux.from(entityStream).flatMap(entity -> entityInformation.isNew(entity) ? //
124+
return Flux.from(entityStream).flatMapSequential(entity -> entityInformation.isNew(entity) ? //
125125
mongoOperations.insert(entity, entityInformation.getCollectionName()) : //
126126
mongoOperations.save(entity, entityInformation.getCollectionName()));
127127
}
@@ -191,7 +191,7 @@ public Flux<T> findAllById(Publisher<ID> ids) {
191191
Assert.notNull(ids, "The given Publisher of Id's must not be null");
192192

193193
Optional<ReadPreference> readPreference = getReadPreference();
194-
return Flux.from(ids).buffer().flatMap(listOfIds -> {
194+
return Flux.from(ids).buffer().flatMapSequential(listOfIds -> {
195195
Query query = getIdQuery(listOfIds);
196196
readPreference.ifPresent(query::withReadPreference);
197197
return mongoOperations.find(query, entityInformation.getJavaType(), entityInformation.getCollectionName());
@@ -345,7 +345,8 @@ public <S extends T> Flux<S> insert(Publisher<S> entities) {
345345

346346
Assert.notNull(entities, "The given Publisher of entities must not be null");
347347

348-
return Flux.from(entities).flatMap(entity -> mongoOperations.insert(entity, entityInformation.getCollectionName()));
348+
return Flux.from(entities)
349+
.flatMapSequential(entity -> mongoOperations.insert(entity, entityInformation.getCollectionName()));
349350
}
350351

351352
// -------------------------------------------------------------------------

Diff for: spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/ReactiveMongoTemplateUnitTests.java

+39-1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.Map;
3535
import java.util.Objects;
3636
import java.util.concurrent.TimeUnit;
37+
import java.util.concurrent.atomic.AtomicLong;
3738
import java.util.stream.Collectors;
3839

3940
import org.assertj.core.api.Assertions;
@@ -697,6 +698,28 @@ void aggreateShouldUseReadReadPreference() {
697698
verify(collection).withReadPreference(ReadPreference.primaryPreferred());
698699
}
699700

701+
@Test // GH-4543
702+
void aggregateDoesNotLimitBackpressure() {
703+
704+
reset(collection);
705+
706+
AtomicLong request = new AtomicLong();
707+
Publisher<Document> realPublisher = Flux.just(new Document()).doOnRequest(request::addAndGet);
708+
709+
doAnswer(invocation -> {
710+
Subscriber<Document> subscriber = invocation.getArgument(0);
711+
realPublisher.subscribe(subscriber);
712+
return null;
713+
}).when(aggregatePublisher).subscribe(any());
714+
715+
when(collection.aggregate(anyList())).thenReturn(aggregatePublisher);
716+
when(collection.aggregate(anyList(), any(Class.class))).thenReturn(aggregatePublisher);
717+
718+
template.aggregate(newAggregation(Sith.class, project("id")), AutogenerateableId.class, Document.class).subscribe();
719+
720+
assertThat(request).hasValueGreaterThan(128);
721+
}
722+
700723
@Test // DATAMONGO-1854
701724
void aggreateShouldUseCollationFromOptionsEvenIfDefaultCollationIsPresent() {
702725

@@ -1261,6 +1284,17 @@ void findShouldInvokeAfterConvertCallbacks() {
12611284
assertThat(results.get(0).id).isEqualTo("after-convert");
12621285
}
12631286

1287+
@Test // GH-4543
1288+
void findShouldNotLimitBackpressure() {
1289+
1290+
AtomicLong request = new AtomicLong();
1291+
stubFindSubscribe(new Document(), request);
1292+
1293+
template.find(new Query(), Person.class).subscribe();
1294+
1295+
assertThat(request).hasValueGreaterThan(128);
1296+
}
1297+
12641298
@Test // DATAMONGO-2479
12651299
void findByIdShouldInvokeAfterConvertCallbacks() {
12661300

@@ -1706,8 +1740,12 @@ public WriteConcern resolve(MongoAction action) {
17061740
}
17071741

17081742
private void stubFindSubscribe(Document document) {
1743+
stubFindSubscribe(document, new AtomicLong());
1744+
}
1745+
1746+
private void stubFindSubscribe(Document document, AtomicLong request) {
17091747

1710-
Publisher<Document> realPublisher = Flux.just(document);
1748+
Publisher<Document> realPublisher = Flux.just(document).doOnRequest(request::addAndGet);
17111749

17121750
doAnswer(invocation -> {
17131751
Subscriber<Document> subscriber = invocation.getArgument(0);

0 commit comments

Comments
 (0)