Skip to content

Commit 16925b5

Browse files
committed
Polishing
Tweak fetchsize defaulting to ignore simple batch statements. Introduce FixedFetchSize type to aid fixed fetch size debugging. Rearrange methods. Reuse statement cache. [closes #256][closes #257]
1 parent 104a7c5 commit 16925b5

File tree

6 files changed

+54
-31
lines changed

6 files changed

+54
-31
lines changed

src/main/java/io/r2dbc/postgresql/ExtendedQueryPostgresqlStatement.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import java.util.regex.Matcher;
3737

3838
import static io.r2dbc.postgresql.client.ExtendedQueryMessageFlow.PARAMETER_SYMBOL;
39-
import static io.r2dbc.postgresql.message.frontend.Execute.NO_LIMIT;
4039
import static io.r2dbc.postgresql.util.PredicateUtils.not;
4140
import static io.r2dbc.postgresql.util.PredicateUtils.or;
4241

@@ -53,7 +52,7 @@ final class ExtendedQueryPostgresqlStatement implements PostgresqlStatement {
5352

5453
private final String sql;
5554

56-
private int fetchSize = NO_LIMIT;
55+
private int fetchSize;
5756

5857
private String[] generatedColumns;
5958

src/main/java/io/r2dbc/postgresql/IndefiniteStatementCache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public String toString() {
8787
private Mono<String> parse(String sql, int[] types) {
8888
String name = "S_" + this.counter.getAndIncrement();
8989

90-
ExceptionFactory factory = ExceptionFactory.withSql(name);
90+
ExceptionFactory factory = ExceptionFactory.withSql(sql);
9191
return ExtendedQueryMessageFlow
9292
.parse(this.client, name, sql, types)
9393
.handle(factory::handleErrorResponse)

src/main/java/io/r2dbc/postgresql/PostgresqlConnectionConfiguration.java

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,8 @@ public final class PostgresqlConnectionConfiguration {
8484
private final SSLConfig sslConfig;
8585

8686
private PostgresqlConnectionConfiguration(String applicationName, boolean autodetectExtensions,
87-
@Nullable Duration connectTimeout, @Nullable String database, List<Extension> extensions, ToIntFunction<String> fetchSize, boolean forceBinary, @Nullable String host,
87+
@Nullable Duration connectTimeout, @Nullable String database, List<Extension> extensions, ToIntFunction<String> fetchSize, boolean forceBinary,
88+
@Nullable String host,
8889
@Nullable Map<String, String> options, @Nullable CharSequence password, int port, @Nullable String schema, @Nullable String socket, String username,
8990
SSLConfig sslConfig) {
9091
this.applicationName = Assert.requireNonNull(applicationName, "applicationName must not be null");
@@ -166,7 +167,7 @@ ToIntFunction<String> getFetchSize() {
166167
}
167168

168169
int getFetchSize(String sql) {
169-
return this.fetchSize != null ? this.fetchSize.applyAsInt(sql) : NO_LIMIT;
170+
return this.fetchSize.applyAsInt(sql);
170171
}
171172

172173
@Nullable
@@ -259,8 +260,7 @@ public static final class Builder {
259260

260261
private List<Extension> extensions = new ArrayList<>();
261262

262-
@Nullable
263-
private ToIntFunction<String> fetchSize;
263+
private ToIntFunction<String> fetchSize = sql -> NO_LIMIT;
264264

265265
private boolean forceBinary = false;
266266

@@ -346,7 +346,8 @@ public PostgresqlConnectionConfiguration build() {
346346
throw new IllegalArgumentException("username must not be null");
347347
}
348348

349-
return new PostgresqlConnectionConfiguration(this.applicationName, this.autodetectExtensions, this.connectTimeout, this.database, this.extensions, this.fetchSize, this.forceBinary, this.host,
349+
return new PostgresqlConnectionConfiguration(this.applicationName, this.autodetectExtensions, this.connectTimeout, this.database, this.extensions, this.fetchSize, this.forceBinary,
350+
this.host,
350351
this.options, this.password, this.port, this.schema, this.socket, this.username, this.createSslConfig());
351352
}
352353

@@ -403,23 +404,25 @@ public Builder extendWith(Extension extension) {
403404
}
404405

405406
/**
406-
* Set the default number of rows to return when fetching results from a query instead deriving fetch size from
407-
* back pressure. If the value specified is zero, then the hint is ignored.
407+
* Set the default number of rows to return when fetching results from a query. If the value specified is zero, then the hint is ignored and queries request all rows when running a statement.
408408
*
409409
* @param fetchSize the number of rows to fetch
410410
* @return this {@code Builder}
411+
* @throws IllegalArgumentException if {@code fetchSize} is negative
412+
* @since 0.8.2
411413
*/
412414
public Builder fetchSize(int fetchSize) {
413415
Assert.isTrue(fetchSize >= 0, "fetch size must be greater or equal zero");
414-
this.fetchSize = sql -> fetchSize;
415-
return this;
416+
return fetchSize(new FixedFetchSize(fetchSize));
416417
}
417418

418419
/**
419420
* Set a function that maps a SQL query to the number of rows to return when fetching results for that query.
420421
*
421422
* @param fetchSizeFunction a function that maps the number of rows to fetch
422423
* @return this {@code Builder}
424+
* @throws IllegalArgumentException if {@code fetchSizeFunction} is {@code null}
425+
* @since 0.8.2
423426
*/
424427
public Builder fetchSize(ToIntFunction<String> fetchSizeFunction) {
425428
Assert.requireNonNull(fetchSizeFunction, "fetch size function must be non null");
@@ -596,6 +599,7 @@ public String toString() {
596599
", connectTimeout='" + this.connectTimeout + '\'' +
597600
", database='" + this.database + '\'' +
598601
", extensions='" + this.extensions + '\'' +
602+
", fetchSize='" + this.fetchSize + '\'' +
599603
", forceBinary='" + this.forceBinary + '\'' +
600604
", host='" + this.host + '\'' +
601605
", parameters='" + this.options + '\'' +
@@ -694,4 +698,23 @@ private Supplier<SslProvider> createSslProvider() {
694698
.build();
695699
}
696700
}
701+
702+
static class FixedFetchSize implements ToIntFunction<String> {
703+
704+
private final int fetchSize;
705+
706+
public FixedFetchSize(int fetchSize) {
707+
this.fetchSize = fetchSize;
708+
}
709+
710+
@Override
711+
public int applyAsInt(String value) {
712+
return this.fetchSize;
713+
}
714+
715+
@Override
716+
public String toString() {
717+
return "" + this.fetchSize;
718+
}
719+
}
697720
}

src/main/java/io/r2dbc/postgresql/PostgresqlConnectionFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ private Mono<PostgresqlConnection> doCreateConnection(boolean forReplication, @N
176176
}
177177
return isolationLevelMono
178178
// actual connection to be used
179-
.map(isolationLevel -> new PostgresqlConnection(client, codecs, DefaultPortalNameSupplier.INSTANCE, new IndefiniteStatementCache(client), isolationLevel, this.configuration))
179+
.map(isolationLevel -> new PostgresqlConnection(client, codecs, DefaultPortalNameSupplier.INSTANCE, statementCache, isolationLevel, this.configuration))
180180
.delayUntil(connection -> {
181181
return prepareConnection(connection, client.getByteBufAllocator(), codecs);
182182
})

src/main/java/io/r2dbc/postgresql/SimpleQueryPostgresqlStatement.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,16 @@ final class SimpleQueryPostgresqlStatement implements PostgresqlStatement {
5151

5252
private String[] generatedColumns;
5353

54-
private int fetchSize = NO_LIMIT;
54+
private int fetchSize;
5555

5656
SimpleQueryPostgresqlStatement(ConnectionContext context, String sql) {
5757
this.context = Assert.requireNonNull(context, "context must not be null");
5858
this.sql = Assert.requireNonNull(sql, "sql must not be null");
59-
fetchSize(this.context.getConfiguration().getFetchSize(sql));
59+
fetchSize(isBatch() ? NO_LIMIT : this.context.getConfiguration().getFetchSize(sql));
60+
}
61+
62+
private boolean isBatch() {
63+
return this.sql.contains(";");
6064
}
6165

6266
@Override
@@ -96,8 +100,9 @@ public Flux<io.r2dbc.postgresql.api.PostgresqlResult> execute() {
96100
@Override
97101
public SimpleQueryPostgresqlStatement fetchSize(int rows) {
98102
Assert.isTrue(rows >= 0, "Fetch size must be greater or equal zero");
103+
99104
if (rows != NO_LIMIT) {
100-
Assert.isTrue(!this.sql.contains(";"), "Fetch size can only be used with a single SQL statement");
105+
Assert.isTrue(!isBatch(), "Fetch size can only be used with a single SQL statement");
101106
this.fetchSize = rows;
102107
}
103108

src/test/java/io/r2dbc/postgresql/FetchSizeIntegrationTests.java

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,14 @@
1616

1717
package io.r2dbc.postgresql;
1818

19-
import org.junit.jupiter.api.BeforeEach;
19+
import io.r2dbc.spi.Statement;
2020
import org.junit.jupiter.api.Test;
2121
import reactor.test.StepVerifier;
2222

23-
public class FetchSizeIntegrationTests extends AbstractIntegrationTests {
24-
25-
@Override
26-
@BeforeEach
27-
void setUp() {
28-
super.setUp();
29-
System.gc();
30-
}
23+
/**
24+
* Integration tests for {@link Statement#fetchSize(int)}.
25+
*/
26+
final class FetchSizeIntegrationTests extends AbstractIntegrationTests {
3127

3228
@Override
3329
protected void customize(PostgresqlConnectionConfiguration.Builder builder) {
@@ -37,12 +33,12 @@ protected void customize(PostgresqlConnectionConfiguration.Builder builder) {
3733
@Test
3834
void exchangeWithDefaultFetchSize() {
3935
this.connection.createStatement("SELECT * FROM generate_series(1,20) WHERE $1 = $1")
40-
.bind(0, 1)
41-
.execute()
42-
.flatMap(r -> r.map((row, meta) -> row.get(0, Integer.class)))
43-
.as(StepVerifier::create)
44-
.expectNextCount(20)
45-
.verifyComplete();
36+
.bind(0, 1)
37+
.execute()
38+
.flatMap(r -> r.map((row, meta) -> row.get(0, Integer.class)))
39+
.as(StepVerifier::create)
40+
.expectNextCount(20)
41+
.verifyComplete();
4642
}
4743

4844
@Test

0 commit comments

Comments
 (0)