Skip to content

Commit 0f55b0f

Browse files
committed
Add result set for COPY commands
and add additional tests for COPY FROM command
1 parent 82a0d79 commit 0f55b0f

File tree

5 files changed

+176
-16
lines changed

5 files changed

+176
-16
lines changed

Diff for: src/main/java/com/ing/data/cassandra/jdbc/commands/AbstractCopyCommandExecutor.java

+41
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,27 @@
1717

1818
import com.datastax.oss.driver.api.core.cql.ResultSet;
1919
import com.ing.data.cassandra.jdbc.CassandraStatement;
20+
import com.ing.data.cassandra.jdbc.ColumnDefinitions;
2021
import org.apache.commons.lang3.StringUtils;
2122

23+
import java.nio.ByteBuffer;
2224
import java.sql.SQLException;
2325
import java.sql.SQLSyntaxErrorException;
2426
import java.text.DecimalFormat;
2527
import java.text.DecimalFormatSymbols;
2628
import java.text.NumberFormat;
29+
import java.util.Collections;
2730
import java.util.HashSet;
2831
import java.util.Locale;
2932
import java.util.Properties;
3033
import java.util.Set;
3134

35+
import static com.datastax.oss.driver.api.core.type.DataTypes.TEXT;
3236
import static com.ing.data.cassandra.jdbc.commands.SpecialCommandsUtil.LOG;
37+
import static com.ing.data.cassandra.jdbc.commands.SpecialCommandsUtil.buildSpecialCommandResultSet;
38+
import static com.ing.data.cassandra.jdbc.utils.ByteBufferUtil.bytes;
3339
import static com.ing.data.cassandra.jdbc.utils.ErrorConstants.UNSUPPORTED_COPY_OPTIONS;
40+
import static org.apache.commons.lang3.StringUtils.EMPTY;
3441

3542
/**
3643
* Executor abstraction for common parts of the special commands {@code COPY}.
@@ -135,4 +142,38 @@ int getOptionValueAsInt(final String optionName, final int defaultValue) {
135142
return defaultValue;
136143
}
137144

145+
/**
146+
* Generates a result set for the {@code COPY} command.
147+
*
148+
* @param actionVerb The action verb (imported from/exported to) used in the result set depending on the
149+
* executed command.
150+
* @param processedRows The total number of imported or exported rows.
151+
* @param executedBatches The number of batches executed.
152+
* @param skippedRows The total number of skipped rows. The information about the skipped rows is only
153+
* displayed if this number is greater or equal to 0.
154+
* @return The result for the {@code COPY} command.
155+
* @implNote The result set is a single row with a string value in a column {@code result} following the model of
156+
* the result representation described in the examples presented in the
157+
* <a href="https://docs.datastax.com/en/cql-oss/3.x/cql/cql_reference/cqlshCopy.html#Examples">cqlsh
158+
* documentation</a>.
159+
*/
160+
ResultSet buildCopyCommandResultSet(final String actionVerb, final long processedRows, final int executedBatches,
161+
final int skippedRows) {
162+
String skippedRowsInfo = EMPTY;
163+
if (skippedRows >= 0) {
164+
skippedRowsInfo = String.format(" (%d skipped)", skippedRows);
165+
}
166+
final String result = String.format("%d row(s) %s 1 file in %d batch(es)%s.",
167+
processedRows, actionVerb, executedBatches, skippedRowsInfo);
168+
final ByteBuffer resultAsBytes = bytes(result);
169+
return buildSpecialCommandResultSet(
170+
new ColumnDefinitions.Definition[]{
171+
ColumnDefinitions.Definition.buildDefinitionInAnonymousTable("result", TEXT)
172+
},
173+
Collections.singletonList(
174+
Collections.singletonList(resultAsBytes)
175+
)
176+
);
177+
}
178+
138179
}

Diff for: src/main/java/com/ing/data/cassandra/jdbc/commands/CopyFromCommandExecutor.java

+14-3
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import java.util.stream.Collectors;
4747

4848
import static com.ing.data.cassandra.jdbc.commands.SpecialCommandsUtil.LOG;
49-
import static com.ing.data.cassandra.jdbc.commands.SpecialCommandsUtil.buildEmptyResultSet;
5049
import static com.ing.data.cassandra.jdbc.commands.SpecialCommandsUtil.translateFilename;
5150
import static com.ing.data.cassandra.jdbc.utils.DriverUtil.COMMA;
5251
import static com.ing.data.cassandra.jdbc.utils.DriverUtil.SINGLE_QUOTE;
@@ -119,6 +118,10 @@
119118
* Using unknown options will throw a {@link SQLSyntaxErrorException}.
120119
* </p>
121120
* <p>
121+
* A successful command execution will return a result set with a single row containing some information about the
122+
* import process in a column {@code result}.
123+
* </p>
124+
* <p>
122125
* The documentation of the original {@code COPY FROM} command is available:
123126
* <ul>
124127
* <li><a href="https://cassandra.apache.org/doc/latest/cassandra/managing/tools/cqlsh.html#copy-from">
@@ -200,9 +203,14 @@ public ResultSet execute(final CassandraStatement statement, final String cql) t
200203
final int batchSize = getOptionValueAsInt(OPTION_BATCHSIZE, DEFAULT_BATCH_SIZE);
201204

202205
CSVReader csvReader = null;
206+
207+
int executedBatches = 0;
208+
final int importedRows;
209+
int skippedRows = getOptionValueAsInt(OPTION_SKIPROWS, DEFAULT_SKIPPED_ROWS);
210+
203211
try {
204212
csvReader = new CSVReaderBuilder(new FileReader(translateFilename(this.origin)))
205-
.withSkipLines(getOptionValueAsInt(OPTION_SKIPROWS, DEFAULT_SKIPPED_ROWS))
213+
.withSkipLines(skippedRows)
206214
.withCSVParser(configureCsvParser())
207215
.build();
208216
final List<String[]> allRows = csvReader.readAll();
@@ -225,8 +233,11 @@ public ResultSet execute(final CassandraStatement statement, final String cql) t
225233
final int rowsInBatch = i - startRow + 1;
226234
if (rowsInBatch % batchSize == 0 || rowsInBatch == maxRows - startRow) {
227235
insertStatement.executeBatch();
236+
executedBatches++;
228237
}
229238
}
239+
importedRows = maxRows - startRow;
240+
skippedRows += allRows.size() - importedRows - startRow;
230241
} catch (final FileNotFoundException e) {
231242
throw new SQLException(String.format(CSV_FILE_NOT_FOUND, this.origin), e);
232243
} catch (final IOException | CsvException e) {
@@ -236,7 +247,7 @@ public ResultSet execute(final CassandraStatement statement, final String cql) t
236247
IOUtils.closeQuietly(csvReader);
237248
}
238249

239-
return buildEmptyResultSet();
250+
return buildCopyCommandResultSet("imported from", importedRows, executedBatches, skippedRows);
240251
}
241252

242253
private ICSVParser configureCsvParser() {

Diff for: src/main/java/com/ing/data/cassandra/jdbc/commands/CopyToCommandExecutor.java

+16-3
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.io.OutputStreamWriter;
3030
import java.nio.charset.StandardCharsets;
3131
import java.nio.file.Files;
32+
import java.nio.file.Path;
3233
import java.nio.file.Paths;
3334
import java.sql.ResultSetMetaData;
3435
import java.sql.SQLException;
@@ -38,8 +39,9 @@
3839
import java.text.NumberFormat;
3940
import java.util.Objects;
4041
import java.util.Properties;
42+
import java.util.stream.Stream;
4143

42-
import static com.ing.data.cassandra.jdbc.commands.SpecialCommandsUtil.buildEmptyResultSet;
44+
import static com.ing.data.cassandra.jdbc.commands.SpecialCommandsUtil.LOG;
4345
import static com.ing.data.cassandra.jdbc.commands.SpecialCommandsUtil.translateFilename;
4446
import static com.ing.data.cassandra.jdbc.utils.ErrorConstants.CANNOT_WRITE_CSV_FILE;
4547
import static org.apache.commons.lang3.StringUtils.EMPTY;
@@ -99,6 +101,10 @@
99101
* Using unknown options will throw a {@link SQLSyntaxErrorException}.
100102
* </p>
101103
* <p>
104+
* A successful command execution will return a result set with a single row containing some information about the
105+
* export process in a column {@code result}.
106+
* </p>
107+
* <p>
102108
* The documentation of the original {@code COPY TO} command is available:
103109
* <ul>
104110
* <li><a href="https://cassandra.apache.org/doc/latest/cassandra/managing/tools/cqlsh.html#copy-to">
@@ -159,11 +165,12 @@ public ResultSet execute(final CassandraStatement statement, final String cql) t
159165
String.format("SELECT %s FROM %s", this.columns, this.tableName)
160166
);
161167

168+
final Path targetPath = Paths.get(translateFilename(this.target));
162169
ICSVWriter csvWriter = null;
163170
try {
164171
final CSVWriterBuilder builder = new CSVWriterBuilder(
165172
new OutputStreamWriter(
166-
Files.newOutputStream(Paths.get(translateFilename(this.target))),
173+
Files.newOutputStream(targetPath),
167174
StandardCharsets.UTF_8.newEncoder() // Use UTF-8 encoding by default
168175
)
169176
);
@@ -185,7 +192,13 @@ public ResultSet execute(final CassandraStatement statement, final String cql) t
185192
IOUtils.closeQuietly(csvWriter);
186193
}
187194

188-
return buildEmptyResultSet();
195+
long exportedRows = -1;
196+
try (Stream<String> csvLines = Files.lines(targetPath)) {
197+
exportedRows = csvLines.count();
198+
} catch (final IOException e) {
199+
LOG.warn("Failed to read exported CSV file to count exportedRows.");
200+
}
201+
return buildCopyCommandResultSet("exported to", exportedRows, 1, -1);
189202
}
190203

191204
private ResultSetHelperService configureResultSetHelperService() {

Diff for: src/test/java/com/ing/data/cassandra/jdbc/commands/CopyFromCommandTest.java

+78-10
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,24 @@
2626

2727
import java.math.BigDecimal;
2828
import java.net.URL;
29+
import java.sql.ResultSet;
2930
import java.sql.SQLException;
3031
import java.sql.Statement;
32+
import java.util.Optional;
3133
import java.util.stream.Stream;
3234

3335
import static com.ing.data.cassandra.jdbc.utils.CopyCommandsTestUtils.COPY_CMD_TEST_KEYSPACE;
3436
import static com.ing.data.cassandra.jdbc.utils.CopyCommandsTestUtils.COPY_CMD_TEST_PARTIAL_TABLE;
3537
import static com.ing.data.cassandra.jdbc.utils.CopyCommandsTestUtils.COPY_CMD_TEST_PARTIAL_TABLE_NAME;
3638
import static com.ing.data.cassandra.jdbc.utils.CopyCommandsTestUtils.COPY_CMD_TEST_TABLE;
3739
import static com.ing.data.cassandra.jdbc.utils.CopyCommandsTestUtils.COPY_CMD_TEST_TABLE_NAME;
40+
import static com.ing.data.cassandra.jdbc.utils.CopyCommandsTestUtils.assertCommandResultSet;
3841
import static com.ing.data.cassandra.jdbc.utils.CopyCommandsTestUtils.assertRowValues;
3942
import static org.apache.commons.lang3.StringUtils.EMPTY;
43+
import static org.hamcrest.MatcherAssert.assertThat;
44+
import static org.hamcrest.Matchers.containsString;
4045
import static org.junit.jupiter.api.Assertions.assertNotNull;
46+
import static org.junit.jupiter.api.Assertions.assertThrows;
4147
import static org.junit.jupiter.api.Assertions.fail;
4248

4349
public class CopyFromCommandTest extends UsingCassandraContainerTest {
@@ -49,23 +55,35 @@ static void finalizeSetUpTests() throws Exception {
4955
initConnection(COPY_CMD_TEST_KEYSPACE, "localdatacenter=datacenter1");
5056
}
5157

52-
static String getTestOriginPath(final String csvFile) {
58+
static String getTestOriginPath(final String csvFile, final boolean allowInvalidFile) {
5359
final URL cqlScriptResourceUrl =
5460
CopyFromCommandTest.class.getClassLoader().getResource("copyFromTests/" + csvFile);
5561
if (cqlScriptResourceUrl == null) {
62+
if (allowInvalidFile) {
63+
return csvFile;
64+
}
5665
fail("Could not find the CSV script to import in 'copyFromTests' directory: " + csvFile);
5766
}
5867
return cqlScriptResourceUrl.getPath();
5968
}
6069

61-
private void executeCopyFromCommand(final String targetTable, final String csvSourceFile,
62-
final String options) throws SQLException {
70+
private ResultSet executeCopyFromCommand(final String targetTable, final String csvSourceFile,
71+
final String options) throws SQLException {
72+
return executeCopyFromCommand(targetTable, csvSourceFile, options, false);
73+
}
74+
75+
private ResultSet executeCopyFromCommand(final String targetTable, final String csvSourceFile,
76+
final String options, final boolean allowInvalidFile) throws SQLException {
6377
assertNotNull(sqlConnection);
78+
// First, truncate target table to ensure the data of any test previously executed is removed.
6479
final Statement truncateTableStmt = sqlConnection.createStatement();
6580
truncateTableStmt.execute(String.format("TRUNCATE TABLE %s", targetTable));
6681
truncateTableStmt.close();
67-
sqlConnection.createStatement().execute(String.format("COPY %s FROM '%s' %s", targetTable,
68-
getTestOriginPath(csvSourceFile), options));
82+
83+
final Statement copyCmdStmt = sqlConnection.createStatement();
84+
copyCmdStmt.execute(String.format("COPY %s FROM '%s' %s", targetTable,
85+
getTestOriginPath(csvSourceFile, allowInvalidFile), Optional.ofNullable(options).orElse(EMPTY)));
86+
return copyCmdStmt.getResultSet();
6987
}
7088

7189
static Stream<Arguments> buildCopyFromCommandVariableParameters() {
@@ -82,41 +100,91 @@ static Stream<Arguments> buildCopyFromCommandVariableParameters() {
82100
void givenTableAndOriginFile_whenExecuteCopyFromCommand_executeExpectedStatements(final String csvFile,
83101
final String options)
84102
throws SQLException {
85-
executeCopyFromCommand(COPY_CMD_TEST_TABLE_NAME, csvFile, options);
103+
final ResultSet resultSet = executeCopyFromCommand(COPY_CMD_TEST_TABLE_NAME, csvFile, options);
104+
assertCommandResultSet(resultSet, true, 2, 1, 0);
86105
assertRowValues(sqlConnection, COPY_CMD_TEST_TABLE, "key1", true, new BigDecimal("654000.7"));
87106
assertRowValues(sqlConnection, COPY_CMD_TEST_TABLE, "key2", false, new BigDecimal("321000.8"));
88107
}
89108

90109
@Test
91110
void givenTableAndOriginFile_whenExecuteCopyFromCommandWithSkipRows_executeExpectedStatements()
92111
throws SQLException {
93-
executeCopyFromCommand(COPY_CMD_TEST_PARTIAL_TABLE_NAME, "test_partial_import.csv", "WITH SKIPROWS=2");
112+
final ResultSet resultSet = executeCopyFromCommand(COPY_CMD_TEST_PARTIAL_TABLE_NAME,
113+
"test_partial_import.csv", "WITH SKIPROWS=2");
114+
assertCommandResultSet(resultSet, true, 2, 1, 2);
94115
assertRowValues(sqlConnection, COPY_CMD_TEST_PARTIAL_TABLE, "key3", 3, "N/A");
95116
assertRowValues(sqlConnection, COPY_CMD_TEST_PARTIAL_TABLE, "key4", 4, "test4");
96117
}
97118

98119
@Test
99120
void givenTableAndOriginFile_whenExecuteCopyFromCommandWithMaxRows_executeExpectedStatements()
100121
throws SQLException {
101-
executeCopyFromCommand(COPY_CMD_TEST_PARTIAL_TABLE_NAME, "test_partial_import.csv", "WITH MAXROWS=1");
122+
final ResultSet resultSet = executeCopyFromCommand(COPY_CMD_TEST_PARTIAL_TABLE_NAME,
123+
"test_partial_import.csv", "WITH MAXROWS=1");
124+
assertCommandResultSet(resultSet, true, 1, 1, 3);
102125
assertRowValues(sqlConnection, COPY_CMD_TEST_PARTIAL_TABLE, "key1", 1, "test1");
103126
}
104127

105128
@Test
106129
void givenTableAndOriginFile_whenExecuteCopyFromCommandWithPartialImportOptions_executeExpectedStatements()
107130
throws SQLException {
108-
executeCopyFromCommand(COPY_CMD_TEST_PARTIAL_TABLE_NAME, "test_partial_import.csv",
131+
final ResultSet resultSet = executeCopyFromCommand(COPY_CMD_TEST_PARTIAL_TABLE_NAME, "test_partial_import.csv",
109132
"WITH MAXROWS=2 AND SKIPROWS=1 AND SKIPCOLS=str_val");
133+
assertCommandResultSet(resultSet, true, 2, 1, 2);
110134
assertRowValues(sqlConnection, COPY_CMD_TEST_PARTIAL_TABLE, "key2", 2, null);
111135
assertRowValues(sqlConnection, COPY_CMD_TEST_PARTIAL_TABLE, "key3", 3, null);
112136
}
113137

114138
@Test
115139
void givenTableAndOriginFile_whenExecuteCopyFromCommandWithNullValueOption_executeExpectedStatements()
116140
throws SQLException {
117-
executeCopyFromCommand(COPY_CMD_TEST_PARTIAL_TABLE_NAME, "test_partial_import.csv", "WITH NULLVAL=N/A");
141+
final ResultSet resultSet = executeCopyFromCommand(COPY_CMD_TEST_PARTIAL_TABLE_NAME,
142+
"test_partial_import.csv", "WITH NULLVAL=N/A");
143+
assertCommandResultSet(resultSet, true, 4, 1, 0);
118144
assertRowValues(sqlConnection, COPY_CMD_TEST_PARTIAL_TABLE, "key1", 1, "test1");
119145
assertRowValues(sqlConnection, COPY_CMD_TEST_PARTIAL_TABLE, "key3", 3, null);
120146
}
121147

148+
@Test
149+
void givenTableAndOriginFile_whenExecuteCopyFromCommandWithUnsupportedOptions_throwException() {
150+
final SQLException sqlException = assertThrows(SQLException.class, () ->
151+
executeCopyFromCommand(COPY_CMD_TEST_TABLE_NAME, "test_simple.csv", "WITH BADOPTION=1"));
152+
assertThat(sqlException.getMessage(),
153+
containsString("Command COPY used with unknown or unsupported options: [BADOPTION]"));
154+
}
155+
156+
@Test
157+
void givenTableAndOriginFile_whenExecuteCopyFromCommandWithInvalidIntOption_useDefaultOptionValue()
158+
throws SQLException {
159+
final ResultSet resultSet = executeCopyFromCommand(
160+
COPY_CMD_TEST_KEYSPACE + "." + COPY_CMD_TEST_PARTIAL_TABLE_NAME, "test_partial_import.csv",
161+
"WITH SKIPROWS=a");
162+
assertCommandResultSet(resultSet, true, 4, 1, 0);
163+
assertRowValues(sqlConnection, COPY_CMD_TEST_PARTIAL_TABLE, "key1", 1, "test1");
164+
assertRowValues(sqlConnection, COPY_CMD_TEST_PARTIAL_TABLE, "key2", 2, "test2");
165+
assertRowValues(sqlConnection, COPY_CMD_TEST_PARTIAL_TABLE, "key3", 3, "N/A");
166+
assertRowValues(sqlConnection, COPY_CMD_TEST_PARTIAL_TABLE, "key4", 4, "test4");
167+
}
168+
169+
@Test
170+
void givenNonExistingFile_whenExecuteCopyFromCommand_throwException() {
171+
final SQLException sqlException = assertThrows(SQLException.class, () ->
172+
executeCopyFromCommand(COPY_CMD_TEST_TABLE_NAME, "bad_test_file.csv", EMPTY, true));
173+
assertThat(sqlException.getMessage(),
174+
containsString("Could not find CSV file to import 'bad_test_file.csv'"));
175+
}
176+
177+
@Test
178+
void givenTableAndOriginFile_whenExecuteCopyFromCommandWithSpecificBatchSize_executeExpectedBatches()
179+
throws SQLException {
180+
final ResultSet resultSet = executeCopyFromCommand(COPY_CMD_TEST_PARTIAL_TABLE_NAME,
181+
"test_partial_import.csv", "WITH BATCHSIZE=2");
182+
assertCommandResultSet(resultSet, true, 4, 2, 0);
183+
assertRowValues(sqlConnection, COPY_CMD_TEST_PARTIAL_TABLE, "key1", 1, "test1");
184+
assertRowValues(sqlConnection, COPY_CMD_TEST_PARTIAL_TABLE, "key2", 2, "test2");
185+
assertRowValues(sqlConnection, COPY_CMD_TEST_PARTIAL_TABLE, "key3", 3, "N/A");
186+
assertRowValues(sqlConnection, COPY_CMD_TEST_PARTIAL_TABLE, "key4", 4, "test4");
187+
}
188+
189+
// TODO: test with all types
122190
}

Diff for: src/test/java/com/ing/data/cassandra/jdbc/utils/CopyCommandsTestUtils.java

+27
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
import java.sql.ResultSet;
2121
import java.sql.SQLException;
2222

23+
import static org.apache.commons.lang3.StringUtils.EMPTY;
24+
import static org.hamcrest.MatcherAssert.assertThat;
25+
import static org.hamcrest.Matchers.containsString;
2326
import static org.junit.jupiter.api.Assertions.assertEquals;
2427
import static org.junit.jupiter.api.Assertions.assertTrue;
2528

@@ -53,4 +56,28 @@ public static void assertRowValues(final Connection connection,
5356
verifyStmt.close();
5457
}
5558

59+
public static void assertCommandResultSet(final ResultSet rs, final boolean isImport,
60+
final int expectedProcessedRows,
61+
final int expectedExecutedBatches) throws SQLException {
62+
assertCommandResultSet(rs, isImport, expectedProcessedRows, expectedExecutedBatches, null);
63+
}
64+
65+
public static void assertCommandResultSet(final ResultSet rs, final boolean isImport,
66+
final int expectedProcessedRows, final int expectedExecutedBatches,
67+
final Integer expectedSkippedRows) throws SQLException {
68+
String expectedVerb = "exported to";
69+
String expectedSkippedRowsInfo = EMPTY;
70+
if (isImport) {
71+
expectedVerb = "imported from";
72+
expectedSkippedRowsInfo = String.format(" (%d skipped)", expectedSkippedRows);
73+
}
74+
75+
assertTrue(rs.next());
76+
assertEquals(1, rs.findColumn("result"));
77+
assertThat(rs.getString(1), containsString(String.format("%d row(s) %s 1 file in %d batch(es)%s",
78+
expectedProcessedRows, expectedVerb, expectedExecutedBatches, expectedSkippedRowsInfo)));
79+
80+
rs.close();
81+
}
82+
5683
}

0 commit comments

Comments
 (0)