@@ -138,7 +138,7 @@ public class CassandraStatement extends AbstractStatement
138
138
* The consistency level used for the statement.
139
139
*/
140
140
protected ConsistencyLevel consistencyLevel ;
141
-
141
+ private boolean isClosed ;
142
142
private DriverExecutionProfile customTimeoutProfile ;
143
143
144
144
/**
@@ -211,6 +211,7 @@ public class CassandraStatement extends AbstractStatement
211
211
this .cql = cql ;
212
212
this .batchQueries = new ArrayList <>();
213
213
this .consistencyLevel = connection .getDefaultConsistencyLevel ();
214
+ this .isClosed = false ;
214
215
215
216
if (!(resultSetType == ResultSet .TYPE_FORWARD_ONLY
216
217
|| resultSetType == ResultSet .TYPE_SCROLL_INSENSITIVE
@@ -265,7 +266,7 @@ public void clearWarnings() throws SQLException {
265
266
266
267
@ Override
267
268
public void close () {
268
- this .connection = null ;
269
+ this .isClosed = true ;
269
270
this .cql = null ;
270
271
}
271
272
@@ -285,67 +286,68 @@ private void doExecute(final String cql) throws SQLException {
285
286
286
287
try {
287
288
final String [] cqlQueries = cql .split (STATEMENTS_SEPARATOR_REGEX );
288
- if (cqlQueries .length > 1 && !(cql .trim ().toLowerCase ().startsWith ("begin" )
289
+ if (cqlQueries .length > 1
290
+ && !(cql .trim ().toLowerCase ().startsWith ("begin" )
289
291
&& cql .toLowerCase ().contains ("batch" ) && cql .toLowerCase ().contains ("apply" ))) {
290
- // Several statements in the query to execute asynchronously...
291
-
292
292
final ArrayList <com .datastax .oss .driver .api .core .cql .ResultSet > results = new ArrayList <>();
293
+
294
+ // Several statements in the query to execute asynchronously...
293
295
if (cqlQueries .length > MAX_ASYNC_QUERIES * 1.1 ) {
294
296
// Protect the cluster from receiving too many queries at once and force the dev to split the load
295
297
throw new SQLNonTransientException ("Too many queries at once (" + cqlQueries .length
296
298
+ "). You must split your queries into more batches !" );
297
299
}
298
300
299
- StringBuilder prevCqlQuery = new StringBuilder ();
300
- for (final String cqlQuery : cqlQueries ) {
301
- if ((cqlQuery .contains ("'" ) && ((StringUtils .countMatches (cqlQuery , "'" ) % 2 == 1
302
- && prevCqlQuery .length () == 0 )
303
- || (StringUtils .countMatches (cqlQuery , "'" ) % 2 == 0 && prevCqlQuery .length () > 0 )))
304
- || (prevCqlQuery .toString ().length () > 0 && !cqlQuery .contains ("'" ))) {
305
- prevCqlQuery .append (cqlQuery ).append (";" );
306
- } else {
307
- prevCqlQuery .append (cqlQuery );
308
- if (LOG .isTraceEnabled () || this .connection .isDebugMode ()) {
309
- LOG .debug ("CQL: {}" , prevCqlQuery );
310
- }
311
- SimpleStatement stmt = SimpleStatement .newInstance (prevCqlQuery .toString ())
312
- .setConsistencyLevel (this .connection .getDefaultConsistencyLevel ())
313
- .setPageSize (this .fetchSize );
314
- if (this .customTimeoutProfile != null ) {
315
- stmt = stmt .setExecutionProfile (this .customTimeoutProfile );
301
+ // If we should not execute the queries asynchronously, for example if they must be executed in the
302
+ // specified order (e.g. in Liquibase scripts with queries such as CREATE TABLE t, then
303
+ // INSERT INTO t ...).
304
+ if (!this .connection .getOptionSet ().executeMultipleQueriesByStatementAsync ()) {
305
+ for (final String cqlQuery : cqlQueries ) {
306
+ final com .datastax .oss .driver .api .core .cql .ResultSet rs = executeSingleStatement (cqlQuery );
307
+ results .add (rs );
308
+ }
309
+ } else {
310
+ StringBuilder prevCqlQuery = new StringBuilder ();
311
+ for (final String cqlQuery : cqlQueries ) {
312
+ if ((cqlQuery .contains ("'" ) && ((StringUtils .countMatches (cqlQuery , "'" ) % 2 == 1
313
+ && prevCqlQuery .length () == 0 )
314
+ || (StringUtils .countMatches (cqlQuery , "'" ) % 2 == 0 && prevCqlQuery .length () > 0 )))
315
+ || (!prevCqlQuery .toString ().isEmpty () && !cqlQuery .contains ("'" ))) {
316
+ prevCqlQuery .append (cqlQuery ).append (";" );
317
+ } else {
318
+ prevCqlQuery .append (cqlQuery );
319
+ if (LOG .isTraceEnabled () || this .connection .isDebugMode ()) {
320
+ LOG .debug ("CQL: {}" , prevCqlQuery );
321
+ }
322
+ SimpleStatement stmt = SimpleStatement .newInstance (prevCqlQuery .toString ())
323
+ .setConsistencyLevel (this .connection .getDefaultConsistencyLevel ())
324
+ .setPageSize (this .fetchSize );
325
+ if (this .customTimeoutProfile != null ) {
326
+ stmt = stmt .setExecutionProfile (this .customTimeoutProfile );
327
+ }
328
+ final CompletionStage <AsyncResultSet > resultSetFuture =
329
+ ((CqlSession ) this .connection .getSession ()).executeAsync (stmt );
330
+ futures .add (resultSetFuture );
331
+ prevCqlQuery = new StringBuilder ();
316
332
}
317
- final CompletionStage <AsyncResultSet > resultSetFuture =
318
- ((CqlSession ) this .connection .getSession ()).executeAsync (stmt );
319
- futures .add (resultSetFuture );
320
- prevCqlQuery = new StringBuilder ();
321
333
}
322
- }
323
334
324
- for (final CompletionStage <AsyncResultSet > future : futures ) {
325
- final AsyncResultSet asyncResultSet = CompletableFutures .getUninterruptibly (future );
326
- final com .datastax .oss .driver .api .core .cql .ResultSet rows ;
327
- if (asyncResultSet .hasMorePages ()) {
328
- rows = new MultiPageResultSet (asyncResultSet );
329
- } else {
330
- rows = new SinglePageResultSet (asyncResultSet );
335
+ for (final CompletionStage <AsyncResultSet > future : futures ) {
336
+ final AsyncResultSet asyncResultSet = CompletableFutures .getUninterruptibly (future );
337
+ final com .datastax .oss .driver .api .core .cql .ResultSet rows ;
338
+ if (asyncResultSet .hasMorePages ()) {
339
+ rows = new MultiPageResultSet (asyncResultSet );
340
+ } else {
341
+ rows = new SinglePageResultSet (asyncResultSet );
342
+ }
343
+ results .add (rows );
331
344
}
332
- results .add (rows );
333
345
}
334
346
335
347
this .currentResultSet = new CassandraResultSet (this , results );
336
348
} else {
337
349
// Only one statement to execute, so do it synchronously.
338
- if (LOG .isTraceEnabled () || this .connection .isDebugMode ()) {
339
- LOG .debug ("CQL: " + cql );
340
- }
341
- SimpleStatement stmt = SimpleStatement .newInstance (cql )
342
- .setConsistencyLevel (this .connection .getDefaultConsistencyLevel ())
343
- .setPageSize (this .fetchSize );
344
- if (this .customTimeoutProfile != null ) {
345
- stmt = stmt .setExecutionProfile (this .customTimeoutProfile );
346
- }
347
- this .currentResultSet = new CassandraResultSet (this ,
348
- ((CqlSession ) this .connection .getSession ()).execute (stmt ));
350
+ this .currentResultSet = new CassandraResultSet (this , executeSingleStatement (cql ));
349
351
}
350
352
} catch (final Exception e ) {
351
353
for (final CompletionStage <AsyncResultSet > future : futures ) {
@@ -355,6 +357,19 @@ private void doExecute(final String cql) throws SQLException {
355
357
}
356
358
}
357
359
360
+ private com .datastax .oss .driver .api .core .cql .ResultSet executeSingleStatement (final String cql ) {
361
+ if (LOG .isTraceEnabled () || this .connection .isDebugMode ()) {
362
+ LOG .debug ("CQL: " + cql );
363
+ }
364
+ SimpleStatement stmt = SimpleStatement .newInstance (cql )
365
+ .setConsistencyLevel (this .connection .getDefaultConsistencyLevel ())
366
+ .setPageSize (this .fetchSize );
367
+ if (this .customTimeoutProfile != null ) {
368
+ stmt = stmt .setExecutionProfile (this .customTimeoutProfile );
369
+ }
370
+ return ((CqlSession ) this .connection .getSession ()).execute (stmt );
371
+ }
372
+
358
373
@ Override
359
374
public boolean execute (final String query ) throws SQLException {
360
375
checkNotClosed ();
@@ -658,7 +673,7 @@ public SQLWarning getWarnings() throws SQLException {
658
673
659
674
@ Override
660
675
public boolean isClosed () {
661
- return this .connection == null ;
676
+ return this .isClosed ;
662
677
}
663
678
664
679
/**
0 commit comments