24
24
import com .datastax .oss .driver .internal .core .cql .MultiPageResultSet ;
25
25
import com .datastax .oss .driver .internal .core .cql .SinglePageResultSet ;
26
26
import com .datastax .oss .driver .internal .core .util .concurrent .CompletableFutures ;
27
- import org .apache .commons .lang3 .StringUtils ;
28
27
import org .slf4j .Logger ;
29
28
import org .slf4j .LoggerFactory ;
30
29
58
57
import static com .ing .data .cassandra .jdbc .utils .ErrorConstants .NO_GEN_KEYS ;
59
58
import static com .ing .data .cassandra .jdbc .utils .ErrorConstants .NO_MULTIPLE ;
60
59
import static com .ing .data .cassandra .jdbc .utils .ErrorConstants .NO_RESULT_SET ;
60
+ import static com .ing .data .cassandra .jdbc .utils .ErrorConstants .TOO_MANY_QUERIES ;
61
61
import static com .ing .data .cassandra .jdbc .utils .ErrorConstants .WAS_CLOSED_STMT ;
62
+ import static org .apache .commons .lang3 .StringUtils .countMatches ;
62
63
63
64
/**
64
65
* Cassandra statement: implementation class for {@link Statement}.
@@ -292,21 +293,50 @@ public int compareTo(@Nonnull final Object target) {
292
293
return 1 ;
293
294
}
294
295
296
+ private List <String > splitStatements (final String cql ) {
297
+ final String [] cqlQueries = cql .split (STATEMENTS_SEPARATOR_REGEX );
298
+ final List <String > cqlQueriesToExecute = new ArrayList <>(cqlQueries .length );
299
+
300
+ // If a query contains a semicolon character in a string value (for example 'abc;xyz'), re-merge queries
301
+ // wrongly split.
302
+ final String singleQuote = "'" ;
303
+ StringBuilder prevCqlQuery = new StringBuilder ();
304
+ for (final String cqlQuery : cqlQueries ) {
305
+ final boolean hasStringValues = cqlQuery .contains (singleQuote );
306
+ final boolean isFirstQueryPartWithIncompleteStringValue =
307
+ countMatches (cqlQuery , singleQuote ) % 2 == 1 && prevCqlQuery .length () == 0 ;
308
+ final boolean isNotFirstQueryPartWithCompleteStringValue =
309
+ countMatches (cqlQuery , singleQuote ) % 2 == 0 && prevCqlQuery .length () > 0 ;
310
+ final boolean isNotFirstQueryPartWithoutStringValue =
311
+ !prevCqlQuery .toString ().isEmpty () && !cqlQuery .contains (singleQuote );
312
+
313
+ if ((hasStringValues && (isFirstQueryPartWithIncompleteStringValue
314
+ || isNotFirstQueryPartWithCompleteStringValue )) || isNotFirstQueryPartWithoutStringValue ) {
315
+ prevCqlQuery .append (cqlQuery ).append (";" );
316
+ } else {
317
+ prevCqlQuery .append (cqlQuery );
318
+ cqlQueriesToExecute .add (prevCqlQuery .toString ());
319
+ prevCqlQuery = new StringBuilder ();
320
+ }
321
+ }
322
+ return cqlQueriesToExecute ;
323
+ }
324
+
295
325
private void doExecute (final String cql ) throws SQLException {
296
326
final List <CompletionStage <AsyncResultSet >> futures = new ArrayList <>();
297
327
298
328
try {
299
- final String [] cqlQueries = cql .split (STATEMENTS_SEPARATOR_REGEX );
300
- if (cqlQueries .length > 1
329
+ final List <String > cqlQueries = splitStatements (cql );
330
+ final int nbQueriesToExecute = cqlQueries .size ();
331
+ if (nbQueriesToExecute > 1
301
332
&& !(cql .trim ().toLowerCase ().startsWith ("begin" )
302
333
&& cql .toLowerCase ().contains ("batch" ) && cql .toLowerCase ().contains ("apply" ))) {
303
334
final ArrayList <com .datastax .oss .driver .api .core .cql .ResultSet > results = new ArrayList <>();
304
335
305
- // Several statements in the query to execute asynchronously...
306
- if (cqlQueries . length > MAX_ASYNC_QUERIES * 1.1 ) {
336
+ // Several statements in the query to execute potentially asynchronously...
337
+ if (nbQueriesToExecute > MAX_ASYNC_QUERIES * 1.1 ) {
307
338
// Protect the cluster from receiving too many queries at once and force the dev to split the load
308
- throw new SQLNonTransientException ("Too many queries at once (" + cqlQueries .length
309
- + "). You must split your queries into more batches !" );
339
+ throw new SQLNonTransientException (String .format (TOO_MANY_QUERIES , nbQueriesToExecute ));
310
340
}
311
341
312
342
// If we should not execute the queries asynchronously, for example if they must be executed in the
@@ -318,29 +348,19 @@ private void doExecute(final String cql) throws SQLException {
318
348
results .add (rs );
319
349
}
320
350
} else {
321
- StringBuilder prevCqlQuery = new StringBuilder ();
322
351
for (final String cqlQuery : cqlQueries ) {
323
- if ((cqlQuery .contains ("'" ) && ((StringUtils .countMatches (cqlQuery , "'" ) % 2 == 1
324
- && prevCqlQuery .length () == 0 )
325
- || (StringUtils .countMatches (cqlQuery , "'" ) % 2 == 0 && prevCqlQuery .length () > 0 )))
326
- || (!prevCqlQuery .toString ().isEmpty () && !cqlQuery .contains ("'" ))) {
327
- prevCqlQuery .append (cqlQuery ).append (";" );
328
- } else {
329
- prevCqlQuery .append (cqlQuery );
330
- if (LOG .isTraceEnabled () || this .connection .isDebugMode ()) {
331
- LOG .debug ("CQL: {}" , prevCqlQuery );
332
- }
333
- SimpleStatement stmt = SimpleStatement .newInstance (prevCqlQuery .toString ())
334
- .setConsistencyLevel (this .connection .getDefaultConsistencyLevel ())
335
- .setPageSize (this .fetchSize );
336
- if (this .customTimeoutProfile != null ) {
337
- stmt = stmt .setExecutionProfile (this .customTimeoutProfile );
338
- }
339
- final CompletionStage <AsyncResultSet > resultSetFuture =
340
- ((CqlSession ) this .connection .getSession ()).executeAsync (stmt );
341
- futures .add (resultSetFuture );
342
- prevCqlQuery = new StringBuilder ();
352
+ if (LOG .isDebugEnabled () || this .connection .isDebugMode ()) {
353
+ LOG .debug ("CQL: {}" , cqlQuery );
354
+ }
355
+ SimpleStatement stmt = SimpleStatement .newInstance (cqlQuery )
356
+ .setConsistencyLevel (this .connection .getDefaultConsistencyLevel ())
357
+ .setPageSize (this .fetchSize );
358
+ if (this .customTimeoutProfile != null ) {
359
+ stmt = stmt .setExecutionProfile (this .customTimeoutProfile );
343
360
}
361
+ final CompletionStage <AsyncResultSet > resultSetFuture =
362
+ ((CqlSession ) this .connection .getSession ()).executeAsync (stmt );
363
+ futures .add (resultSetFuture );
344
364
}
345
365
346
366
for (final CompletionStage <AsyncResultSet > future : futures ) {
0 commit comments