Skip to content

Commit 913e8c5

Browse files
committed
DATAJDBC-356 - Support for large datasets with stream
1 parent 4434a0f commit 913e8c5

File tree

6 files changed

+304
-5
lines changed

6 files changed

+304
-5
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2018-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.jdbc.repository.query;
17+
18+
import java.lang.annotation.*;
19+
20+
/**
21+
* Annotation to mark a {@link org.springframework.data.jdbc.repository.query.Query} as
22+
* streamable. The resulting {@link java.util.stream.Stream} will wrap a <b>connected</b>
23+
* {@link java.sql.ResultSet} that will fetch rows as needed. It is responsibility of
24+
* the client code to close all resources via {@link java.util.stream.Stream#close()}.
25+
*
26+
* @author detinho
27+
*/
28+
@Retention(RetentionPolicy.RUNTIME)
29+
@Target(ElementType.METHOD)
30+
@Documented
31+
public @interface QueryStream {
32+
33+
/**
34+
* The number of rows fetched from the database when more rows are needed
35+
*/
36+
int fetchSize();
37+
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright 2018-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.jdbc.repository.support;
17+
18+
import org.springframework.jdbc.InvalidResultSetAccessException;
19+
import org.springframework.jdbc.core.JdbcOperations;
20+
import org.springframework.jdbc.core.PreparedStatementCreator;
21+
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
22+
import org.springframework.jdbc.core.namedparam.SqlParameterSource;
23+
import org.springframework.jdbc.datasource.DataSourceUtils;
24+
import org.springframework.jdbc.support.JdbcUtils;
25+
import org.springframework.util.Assert;
26+
27+
import java.sql.Connection;
28+
import java.sql.PreparedStatement;
29+
import java.sql.ResultSet;
30+
import java.sql.SQLException;
31+
32+
/**
33+
* An extended {@link NamedParameterJdbcTemplate} to return an {@link JdbcOpenSqlRowSet}.
34+
*
35+
* @author detinho
36+
*/
37+
class JdbcOpenRowSetTemplate extends NamedParameterJdbcTemplate {
38+
39+
JdbcOpenRowSetTemplate(JdbcOperations classicJdbcTemplate) {
40+
super(classicJdbcTemplate);
41+
}
42+
43+
JdbcOpenSqlRowSet queryForOpenCursorRowSet(String sql, SqlParameterSource paramSource, Integer fetchSize) {
44+
Assert.state(this.getJdbcTemplate().getDataSource() != null, "No DataSource set");
45+
Assert.state(fetchSize != null, "No fetchSize set");
46+
47+
Connection connection = DataSourceUtils.getConnection(this.getJdbcTemplate().getDataSource());
48+
PreparedStatementCreator preparedStatementCreator = this.getPreparedStatementCreator(sql, paramSource);
49+
PreparedStatement preparedStatement = null;
50+
ResultSet resultSet = null;
51+
try {
52+
preparedStatement = preparedStatementCreator.createPreparedStatement(connection);
53+
preparedStatement.setFetchSize(fetchSize);
54+
resultSet = preparedStatement.executeQuery();
55+
56+
return new JdbcOpenSqlRowSet(this.getJdbcTemplate().getDataSource(), resultSet);
57+
} catch (SQLException e) {
58+
JdbcUtils.closeResultSet(resultSet);
59+
JdbcUtils.closeStatement(preparedStatement);
60+
DataSourceUtils.releaseConnection(connection, this.getJdbcTemplate().getDataSource());
61+
62+
throw new InvalidResultSetAccessException(e);
63+
}
64+
}
65+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright 2018-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.jdbc.repository.support;
17+
18+
import org.springframework.jdbc.InvalidResultSetAccessException;
19+
import org.springframework.jdbc.datasource.DataSourceUtils;
20+
import org.springframework.jdbc.support.JdbcUtils;
21+
import org.springframework.jdbc.support.rowset.ResultSetWrappingSqlRowSet;
22+
import org.springframework.jdbc.support.rowset.SqlRowSet;
23+
24+
import javax.sql.DataSource;
25+
import java.sql.Connection;
26+
import java.sql.ResultSet;
27+
import java.sql.SQLException;
28+
import java.sql.Statement;
29+
import java.util.concurrent.atomic.AtomicBoolean;
30+
31+
/**
32+
* An implementation of Spring's {@link SqlRowSet} interface, wrapping a
33+
* <b>connected</b> {@link ResultSet}. This implementation also keeps track of
34+
* the {@link DataSource} that originated the {@link ResultSet} and releases
35+
* the resources when {@link JdbcOpenSqlRowSet#close()} is called.
36+
*
37+
* @author detinho
38+
*/
39+
class JdbcOpenSqlRowSet extends ResultSetWrappingSqlRowSet implements AutoCloseable {
40+
private final DataSource dataSource;
41+
private final AtomicBoolean closed = new AtomicBoolean(false);
42+
43+
/**
44+
* Create a new JdbcOpenSqlRowSet for the given ResultSet.
45+
*
46+
* @param resultSet a <b>connected</b> ResultSet to wrap. The client code is responsible
47+
* for closing the resources
48+
* @param dataSource the dataSource that originated the ResultSet connection
49+
* @throws InvalidResultSetAccessException if extracting
50+
* the ResultSetMetaData failed
51+
* @see java.sql.ResultSet#getMetaData
52+
* @see JdbcOpenSqlRowSet
53+
*/
54+
public JdbcOpenSqlRowSet(DataSource dataSource, ResultSet resultSet) {
55+
super(resultSet);
56+
this.dataSource = dataSource;
57+
}
58+
59+
@Override
60+
public void close() {
61+
if (this.closed.compareAndSet(false, true)) {
62+
try {
63+
final Statement statement = getResultSet().getStatement();
64+
final Connection connection = statement.getConnection();
65+
66+
JdbcUtils.closeResultSet(getResultSet());
67+
JdbcUtils.closeStatement(statement);
68+
DataSourceUtils.releaseConnection(connection, dataSource);
69+
} catch (SQLException e) {
70+
throw new InvalidResultSetAccessException(e);
71+
}
72+
}
73+
}
74+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright 2018-2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.jdbc.repository.support;
17+
18+
import org.springframework.jdbc.InvalidResultSetAccessException;
19+
import org.springframework.jdbc.core.ResultSetExtractor;
20+
import org.springframework.jdbc.core.RowMapper;
21+
22+
import java.sql.SQLException;
23+
import java.util.Iterator;
24+
25+
/**
26+
* An iterator implementation that wraps a {@link JdbcOpenSqlRowSet} and applies
27+
* an {@link RowMapper} or {@link ResultSetExtractor} to each row.
28+
*
29+
* @author detinho
30+
*/
31+
class JdbcOpenSqlRowSetIterator<T> implements Iterator<T> {
32+
private final JdbcOpenSqlRowSet openCursorSqlRowSet;
33+
private final RowMapper<T> rowMapper;
34+
private final ResultSetExtractor<T> resultSetExtractor;
35+
private int rowIndex = 1;
36+
37+
public JdbcOpenSqlRowSetIterator(JdbcOpenSqlRowSet openCursorSqlRowSet,
38+
RowMapper<T> rowMapper, ResultSetExtractor<T> resultSetExtractor) {
39+
this.openCursorSqlRowSet = openCursorSqlRowSet;
40+
this.rowMapper = rowMapper;
41+
this.resultSetExtractor = resultSetExtractor;
42+
}
43+
44+
@Override
45+
public boolean hasNext() {
46+
return openCursorSqlRowSet.next();
47+
}
48+
49+
@Override
50+
public T next() {
51+
try {
52+
if (rowMapper != null) {
53+
return rowMapper.mapRow(openCursorSqlRowSet.getResultSet(), rowIndex++);
54+
} else {
55+
return resultSetExtractor.extractData(openCursorSqlRowSet.getResultSet());
56+
}
57+
} catch (SQLException e) {
58+
throw new InvalidResultSetAccessException(e);
59+
}
60+
}
61+
}

Diff for: spring-data-jdbc/src/main/java/org/springframework/data/jdbc/repository/support/JdbcQueryMethod.java

+23-2
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,20 @@
1515
*/
1616
package org.springframework.data.jdbc.repository.support;
1717

18-
import java.lang.reflect.Method;
19-
2018
import org.springframework.core.annotation.AnnotatedElementUtils;
2119
import org.springframework.core.annotation.AnnotationUtils;
2220
import org.springframework.data.jdbc.repository.query.Modifying;
2321
import org.springframework.data.jdbc.repository.query.Query;
22+
import org.springframework.data.jdbc.repository.query.QueryStream;
2423
import org.springframework.data.projection.ProjectionFactory;
2524
import org.springframework.data.repository.core.RepositoryMetadata;
2625
import org.springframework.data.repository.query.QueryMethod;
2726
import org.springframework.jdbc.core.ResultSetExtractor;
2827
import org.springframework.jdbc.core.RowMapper;
2928
import org.springframework.lang.Nullable;
3029

30+
import java.lang.reflect.Method;
31+
3132
/**
3233
* {@link QueryMethod} implementation that implements a method by executing the query from a {@link Query} annotation on
3334
* that method. Binds method arguments to named parameters in the SQL statement.
@@ -95,4 +96,24 @@ private <T> T getMergedAnnotationAttribute(String attribute) {
9596
Query queryAnnotation = AnnotatedElementUtils.findMergedAnnotation(method, Query.class);
9697
return (T) AnnotationUtils.getValue(queryAnnotation, attribute);
9798
}
99+
100+
/**
101+
* Returns whether the resulting {@link java.util.stream.Stream} will be
102+
* backed by a {@link JdbcOpenSqlRowSet}
103+
* @return if the {@link java.util.stream.Stream} is backed by an {@link JdbcOpenSqlRowSet}, returns true
104+
*/
105+
public boolean isOpenStreamQuery() {
106+
return AnnotatedElementUtils.findMergedAnnotation(method, QueryStream.class) != null;
107+
}
108+
109+
/**
110+
* Returns the fetch size used by the backing {@link JdbcOpenSqlRowSet} to fetch more rows
111+
* when needed.
112+
* @return the number of rows to fetch when needed.
113+
*/
114+
@Nullable
115+
public Integer getStreamQueryFetchSized() {
116+
QueryStream queryStreamAnnotation = AnnotatedElementUtils.findMergedAnnotation(method, QueryStream.class);
117+
return (Integer) AnnotationUtils.getValue(queryStreamAnnotation, "fetchSize");
118+
}
98119
}

Diff for: spring-data-jdbc/src/main/java/org/springframework/data/jdbc/repository/support/JdbcRepositoryQuery.java

+43-3
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,6 @@
1515
*/
1616
package org.springframework.data.jdbc.repository.support;
1717

18-
import java.lang.reflect.Constructor;
19-
import java.util.List;
20-
2118
import org.springframework.beans.BeanUtils;
2219
import org.springframework.context.ApplicationEventPublisher;
2320
import org.springframework.dao.EmptyResultDataAccessException;
@@ -37,6 +34,14 @@
3734
import org.springframework.util.ClassUtils;
3835
import org.springframework.util.StringUtils;
3936

37+
import java.lang.reflect.Constructor;
38+
import java.util.List;
39+
import java.util.Spliterator;
40+
import java.util.Spliterators;
41+
import java.util.function.Supplier;
42+
import java.util.stream.Stream;
43+
import java.util.stream.StreamSupport;
44+
4045
/**
4146
* A query to be executed based on a repository method, it's annotated SQL query and the arguments provided to the
4247
* method.
@@ -103,9 +108,16 @@ private QueryExecutor<Object> createExecutor(JdbcQueryMethod queryMethod, @Nulla
103108
if (queryMethod.isModifyingQuery()) {
104109
return createModifyingQueryExecutor(query);
105110
}
111+
112+
if (queryMethod.isOpenStreamQuery()) {
113+
QueryExecutor<Object> innerExecutor = createRowMapperOpenQueryStreamExecutor(query, rowMapper, extractor);
114+
return createOpenStreamQueryExecutor(innerExecutor);
115+
}
116+
106117
if (queryMethod.isCollectionQuery() || queryMethod.isStreamQuery()) {
107118
QueryExecutor<Object> innerExecutor = extractor != null ? createResultSetExtractorQueryExecutor(query, extractor)
108119
: createListRowMapperQueryExecutor(query, rowMapper);
120+
109121
return createCollectionQueryExecutor(innerExecutor);
110122
}
111123

@@ -170,6 +182,19 @@ private QueryExecutor<Object> createModifyingQueryExecutor(String query) {
170182
};
171183
}
172184

185+
private QueryExecutor<Object> createOpenStreamQueryExecutor(QueryExecutor<Object> executor) {
186+
return parameters -> {
187+
188+
Stream<?> result = (Stream<?>) executor.execute(parameters);
189+
190+
Assert.notNull(result, "A stream valued result must never be null.");
191+
192+
return result.peek(element -> {
193+
publishAfterLoad(element);
194+
});
195+
};
196+
}
197+
173198
private QueryExecutor<Object> createListRowMapperQueryExecutor(String query, RowMapper<?> rowMapper) {
174199
return parameters -> operations.query(query, parameters, rowMapper);
175200
}
@@ -183,6 +208,21 @@ private QueryExecutor<Object> createResultSetExtractorQueryExecutor(String query
183208
return parameters -> operations.query(query, parameters, resultSetExtractor);
184209
}
185210

211+
private QueryExecutor<Object> createRowMapperOpenQueryStreamExecutor(String query, RowMapper rowMapper, ResultSetExtractor extractor) {
212+
JdbcOpenRowSetTemplate openResultSetNamedParameterJdbcTemplate =
213+
new JdbcOpenRowSetTemplate(operations.getJdbcOperations());
214+
return parameters -> {
215+
final JdbcOpenSqlRowSet rowSet =
216+
openResultSetNamedParameterJdbcTemplate.queryForOpenCursorRowSet(query, parameters, queryMethod.getStreamQueryFetchSized());
217+
218+
final Spliterator<Object> spliterator = Spliterators
219+
.spliteratorUnknownSize(new JdbcOpenSqlRowSetIterator<Object>(rowSet, rowMapper, extractor), Spliterator.IMMUTABLE);
220+
final Supplier<Spliterator<Object>> supplier = () -> spliterator;
221+
return StreamSupport.stream(supplier, Spliterator.IMMUTABLE, false)
222+
.onClose(rowSet::close);
223+
};
224+
}
225+
186226
/*
187227
* (non-Javadoc)
188228
* @see org.springframework.data.repository.query.RepositoryQuery#getQueryMethod()

0 commit comments

Comments
 (0)