[bq] test BQ with Docker emulator
dgray16 committed Aug 18, 2024
1 parent 6f3277a commit e2fcfbf
Showing 21 changed files with 340 additions and 473 deletions.
12 changes: 9 additions & 3 deletions spring-batch-bigquery/pom.xml
@@ -112,7 +112,12 @@
<version>2.0.16</version>
<scope>test</scope>
</dependency>

+<dependency>
+<groupId>org.testcontainers</groupId>
+<artifactId>junit-jupiter</artifactId>
+<version>1.20.1</version>
+<scope>test</scope>
+</dependency>

</dependencies>

@@ -136,8 +141,9 @@
<version>3.3.1</version>
<configuration>
<includes>
-<!-- Integration tests are omitted because they are designed to be run locally -->
-<include>/unit</include>
+<!-- Google cloud tests are omitted because they are designed to be run locally -->
+<include>**/unit/**</include>
+<include>**/emulator/**</include>
</includes>
</configuration>
</plugin>
spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/BigQueryDataLoader.java
@@ -22,9 +22,7 @@
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import org.springframework.batch.extensions.bigquery.writer.BigQueryCsvItemWriter;
-import org.springframework.batch.extensions.bigquery.writer.BigQueryJsonItemWriter;
import org.springframework.batch.extensions.bigquery.writer.builder.BigQueryCsvItemWriterBuilder;
-import org.springframework.batch.extensions.bigquery.writer.builder.BigQueryJsonItemWriterBuilder;
import org.springframework.batch.item.Chunk;

import java.util.Comparator;
@@ -69,25 +67,4 @@ public void loadCsvSample(String tableName) throws Exception {
job.get().waitFor();
}

-public void loadJsonSample(String tableName) throws Exception {
-AtomicReference<Job> job = new AtomicReference<>();
-
-WriteChannelConfiguration channelConfiguration = WriteChannelConfiguration
-.newBuilder(TableId.of(TestConstants.DATASET, tableName))
-.setSchema(PersonDto.getBigQuerySchema())
-.setAutodetect(false)
-.setFormatOptions(FormatOptions.json())
-.build();
-
-BigQueryJsonItemWriter<PersonDto> writer = new BigQueryJsonItemWriterBuilder<PersonDto>()
-.bigQuery(bigQuery)
-.writeChannelConfig(channelConfiguration)
-.jobConsumer(job::set)
-.build();
-
-writer.afterPropertiesSet();
-writer.write(CHUNK);
-job.get().waitFor();
-}
-
}
spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/common/TestConstants.java
@@ -26,6 +26,8 @@ private TestConstants() {}
public static final String DATASET = "spring_batch_extensions";
public static final String NAME = "name";
public static final String AGE = "age";
+public static final String CSV = "csv";
+public static final String JSON = "json";

public static final Converter<FieldValueList, PersonDto> PERSON_MAPPER = res -> new PersonDto(
res.get(NAME).getStringValue(), Long.valueOf(res.get(AGE).getLongValue()).intValue()
spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/reader/BaseEmulatorItemReaderTest.java
@@ -0,0 +1,36 @@
package org.springframework.batch.extensions.bigquery.emulator.reader;

import com.google.cloud.NoCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import org.junit.jupiter.api.BeforeAll;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.MountableFile;

@Testcontainers
abstract class BaseEmulatorItemReaderTest {
private static final int PORT = 9050;

private static final String PROJECT = "batch-test";

@Container
private static final GenericContainer<?> CONTAINER = new GenericContainer<>("ghcr.io/goccy/bigquery-emulator")
.withExposedPorts(PORT)
.withCommand("--project=" + PROJECT, "--data-from-yaml=/test-data.yaml")
.withCopyFileToContainer(MountableFile.forClasspathResource("test-data.yaml"), "/test-data.yaml");

protected static BigQuery bigQuery;

@BeforeAll
static void init() {
bigQuery = BigQueryOptions
.newBuilder()
.setHost("http://%s:%d".formatted(CONTAINER.getHost(), CONTAINER.getMappedPort(PORT)))
.setProjectId(PROJECT)
.setCredentials(NoCredentials.getInstance())
.build()
.getService();
}
}
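
Note: the container above is seeded declaratively through --data-from-yaml. Purely as an illustration (not part of this commit), the same dataset and table could also be created programmatically, reusing the DDL calls this commit performs in the gcloud test's @BeforeAll; the EmulatorSchemaBootstrap class and seedSchema method below are hypothetical names:

    package org.springframework.batch.extensions.bigquery.emulator.reader;

    import com.google.cloud.bigquery.BigQuery;
    import com.google.cloud.bigquery.DatasetInfo;
    import com.google.cloud.bigquery.StandardTableDefinition;
    import com.google.cloud.bigquery.TableDefinition;
    import com.google.cloud.bigquery.TableId;
    import com.google.cloud.bigquery.TableInfo;
    import org.springframework.batch.extensions.bigquery.common.PersonDto;
    import org.springframework.batch.extensions.bigquery.common.TestConstants;

    // Hypothetical helper: mirrors what --data-from-yaml pre-loads, minus the rows.
    final class EmulatorSchemaBootstrap {

        private EmulatorSchemaBootstrap() {}

        static void seedSchema(BigQuery bigQuery) {
            // Create the dataset once per test run.
            if (bigQuery.getDataset(TestConstants.DATASET) == null) {
                bigQuery.create(DatasetInfo.of(TestConstants.DATASET));
            }

            // Create the table with the schema the tests expect.
            if (bigQuery.getTable(TestConstants.DATASET, TestConstants.CSV) == null) {
                TableDefinition tableDefinition = StandardTableDefinition.of(PersonDto.getBigQuerySchema());
                bigQuery.create(TableInfo.of(TableId.of(TestConstants.DATASET, TestConstants.CSV), tableDefinition));
            }
        }
    }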
spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/reader/BigQueryEmulatorItemReaderTest.java
@@ -0,0 +1,59 @@
package org.springframework.batch.extensions.bigquery.emulator.reader;

import com.google.cloud.bigquery.*;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.batch.extensions.bigquery.common.PersonDto;
import org.springframework.batch.extensions.bigquery.common.TestConstants;
import org.springframework.batch.extensions.bigquery.reader.BigQueryQueryItemReader;
import org.springframework.batch.extensions.bigquery.reader.builder.BigQueryQueryItemReaderBuilder;

class BigQueryEmulatorItemReaderTest extends BaseEmulatorItemReaderTest {

@Test
void testBatchReader() throws Exception {
QueryJobConfiguration jobConfiguration = QueryJobConfiguration
.newBuilder("SELECT p.name, p.age FROM spring_batch_extensions.csv p ORDER BY p.name LIMIT 2")
.setDestinationTable(TableId.of(TestConstants.DATASET, TestConstants.CSV))
.setPriority(QueryJobConfiguration.Priority.BATCH)
.build();

BigQueryQueryItemReader<PersonDto> reader = new BigQueryQueryItemReaderBuilder<PersonDto>()
.bigQuery(bigQuery)
.rowMapper(TestConstants.PERSON_MAPPER)
.jobConfiguration(jobConfiguration)
.build();

reader.afterPropertiesSet();

verifyResult(reader);
}

@Test
void testInteractiveReader() throws Exception {
QueryJobConfiguration jobConfiguration = QueryJobConfiguration
.newBuilder("SELECT p.name, p.age FROM spring_batch_extensions.csv p ORDER BY p.name LIMIT 2")
.setDestinationTable(TableId.of(TestConstants.DATASET, TestConstants.CSV))
.build();

BigQueryQueryItemReader<PersonDto> reader = new BigQueryQueryItemReaderBuilder<PersonDto>()
.bigQuery(bigQuery)
.rowMapper(TestConstants.PERSON_MAPPER)
.jobConfiguration(jobConfiguration)
.build();

reader.afterPropertiesSet();

verifyResult(reader);
}

private void verifyResult(BigQueryQueryItemReader<PersonDto> reader) throws Exception {
PersonDto actual1 = reader.read();
Assertions.assertEquals("Volodymyr", actual1.name());
Assertions.assertEquals(27, actual1.age());

PersonDto actual2 = reader.read();
Assertions.assertEquals("Oleksandra", actual2.name());
Assertions.assertEquals(26, actual2.age());
}
}
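
Note: both tests above build a full QueryJobConfiguration. The builder also accepts a plain query string, as the gcloud reader test later in this commit shows; a minimal, untested sketch of the same emulator read via that shorthand (class name hypothetical):

    package org.springframework.batch.extensions.bigquery.emulator.reader;

    import org.junit.jupiter.api.Assertions;
    import org.junit.jupiter.api.Test;
    import org.springframework.batch.extensions.bigquery.common.PersonDto;
    import org.springframework.batch.extensions.bigquery.common.TestConstants;
    import org.springframework.batch.extensions.bigquery.reader.BigQueryQueryItemReader;
    import org.springframework.batch.extensions.bigquery.reader.builder.BigQueryQueryItemReaderBuilder;

    class BigQueryEmulatorQueryStringReaderTest extends BaseEmulatorItemReaderTest {

        @Test
        void testQueryString() throws Exception {
            // No explicit QueryJobConfiguration or destination table; the builder
            // assembles the query job from the SQL string alone.
            BigQueryQueryItemReader<PersonDto> reader = new BigQueryQueryItemReaderBuilder<PersonDto>()
                    .bigQuery(bigQuery)
                    .query("SELECT p.name, p.age FROM spring_batch_extensions.csv p ORDER BY p.name LIMIT 2")
                    .rowMapper(TestConstants.PERSON_MAPPER)
                    .build();

            reader.afterPropertiesSet();

            // Same first row as verifyResult(...) above expects.
            Assertions.assertEquals("Volodymyr", reader.read().name());
        }
    }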
spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/emulator/writer/JsonWriterTest.java
@@ -0,0 +1,5 @@
package org.springframework.batch.extensions.bigquery.emulator.writer;

// TODO
public class JsonWriterTest {
}
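
Note: the body is still a TODO. One plausible shape, adapted from the loadJsonSample method this commit removes from BigQueryDataLoader, is sketched below; it assumes the emulator accepts the JSON load channel and that bigQuery is wired to the container the same way BaseEmulatorItemReaderTest does it:

    package org.springframework.batch.extensions.bigquery.emulator.writer;

    import java.util.concurrent.atomic.AtomicReference;

    import com.google.cloud.bigquery.BigQuery;
    import com.google.cloud.bigquery.FormatOptions;
    import com.google.cloud.bigquery.Job;
    import com.google.cloud.bigquery.TableId;
    import com.google.cloud.bigquery.WriteChannelConfiguration;
    import org.springframework.batch.extensions.bigquery.common.BigQueryDataLoader;
    import org.springframework.batch.extensions.bigquery.common.PersonDto;
    import org.springframework.batch.extensions.bigquery.common.TestConstants;
    import org.springframework.batch.extensions.bigquery.writer.BigQueryJsonItemWriter;
    import org.springframework.batch.extensions.bigquery.writer.builder.BigQueryJsonItemWriterBuilder;

    class JsonWriterTestSketch {

        // Write one chunk through the JSON writer and wait for the load job,
        // exactly as the removed BigQueryDataLoader#loadJsonSample did.
        void writeJsonSample(BigQuery bigQuery) throws Exception {
            AtomicReference<Job> job = new AtomicReference<>();

            WriteChannelConfiguration channelConfiguration = WriteChannelConfiguration
                    .newBuilder(TableId.of(TestConstants.DATASET, TestConstants.JSON))
                    .setSchema(PersonDto.getBigQuerySchema())
                    .setAutodetect(false)
                    .setFormatOptions(FormatOptions.json())
                    .build();

            BigQueryJsonItemWriter<PersonDto> writer = new BigQueryJsonItemWriterBuilder<PersonDto>()
                    .bigQuery(bigQuery)
                    .writeChannelConfig(channelConfiguration)
                    .jobConsumer(job::set)
                    .build();

            writer.afterPropertiesSet();
            writer.write(BigQueryDataLoader.CHUNK);
            job.get().waitFor();
        }
    }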
spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/base/BaseBigQueryGcloudIntegrationTest.java
@@ -14,25 +14,11 @@
* limitations under the License.
*/

-package org.springframework.batch.extensions.bigquery.integration.base;
+package org.springframework.batch.extensions.bigquery.gcloud.base;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
-import org.junit.jupiter.api.TestInfo;
-
-import java.lang.reflect.Method;
-
-public abstract class BaseBigQueryIntegrationTest {
-
-private static final String TABLE_PATTERN = "%s_%s";
-
-public final BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService();
-
-protected String getTableName(TestInfo testInfo) {
-return String.format(
-TABLE_PATTERN,
-testInfo.getTags().iterator().next(),
-testInfo.getTestMethod().map(Method::getName).orElseThrow()
-);
-}
+public abstract class BaseBigQueryGcloudIntegrationTest {
+protected static final BigQuery BIG_QUERY = BigQueryOptions.getDefaultInstance().getService();
}
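
Note: BigQueryOptions.getDefaultInstance() resolves Application Default Credentials, which is why these gcloud tests are meant to run locally against a real project (see the package javadoc below). As an illustration only, explicit service-account wiring could look like this; the key path and project id are placeholders:

    import java.io.FileInputStream;
    import java.io.InputStream;

    import com.google.auth.oauth2.ServiceAccountCredentials;
    import com.google.cloud.bigquery.BigQuery;
    import com.google.cloud.bigquery.BigQueryOptions;

    final class ExplicitCredentialsSketch {

        static BigQuery bigQuery() throws Exception {
            // Placeholder path: point this at a real service-account key file.
            try (InputStream key = new FileInputStream("/path/to/key.json")) {
                return BigQueryOptions.newBuilder()
                        .setProjectId("my-gcp-project") // placeholder
                        .setCredentials(ServiceAccountCredentials.fromStream(key))
                        .build()
                        .getService();
            }
        }
    }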
spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/package-info.java
@@ -25,4 +25,4 @@
*
* @see <a href="https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries#before-you-begin">Authentication</a>
*/
-package org.springframework.batch.extensions.bigquery.integration;
+package org.springframework.batch.extensions.bigquery.gcloud;
spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/reader/BigQueryGcloudItemReaderTest.java
@@ -14,50 +14,76 @@
* limitations under the License.
*/

-package org.springframework.batch.extensions.bigquery.integration.reader.batch;
-
-import com.google.cloud.bigquery.QueryJobConfiguration;
-import com.google.cloud.bigquery.TableId;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestInfo;
+package org.springframework.batch.extensions.bigquery.gcloud.reader;
+
+import com.google.cloud.bigquery.*;
+import org.junit.jupiter.api.*;
import org.springframework.batch.extensions.bigquery.common.BigQueryDataLoader;
import org.springframework.batch.extensions.bigquery.common.PersonDto;
import org.springframework.batch.extensions.bigquery.common.TestConstants;
-import org.springframework.batch.extensions.bigquery.integration.reader.base.BaseCsvJsonInteractiveQueryItemReaderTest;
+import org.springframework.batch.extensions.bigquery.gcloud.base.BaseBigQueryGcloudIntegrationTest;
import org.springframework.batch.extensions.bigquery.reader.BigQueryQueryItemReader;
import org.springframework.batch.extensions.bigquery.reader.builder.BigQueryQueryItemReaderBuilder;
-import org.springframework.batch.item.Chunk;

-@Tag("csv")
-class BigQueryBatchQueryCsvItemReaderTest extends BaseCsvJsonInteractiveQueryItemReaderTest {
+class BigQueryGcloudItemReaderTest extends BaseBigQueryGcloudIntegrationTest {

-@Test
-void batchQueryTest1(TestInfo testInfo) throws Exception {
-String tableName = getTableName(testInfo);
-new BigQueryDataLoader(bigQuery).loadCsvSample(tableName);
-Chunk<PersonDto> chunk = BigQueryDataLoader.CHUNK;
+@BeforeAll
+static void init() throws Exception {
+if (BIG_QUERY.getDataset(TestConstants.DATASET) == null) {
+BIG_QUERY.create(DatasetInfo.of(TestConstants.DATASET));
+}
+
+if (BIG_QUERY.getTable(TestConstants.DATASET, TestConstants.CSV) == null) {
+TableDefinition tableDefinition = StandardTableDefinition.of(PersonDto.getBigQuerySchema());
+BIG_QUERY.create(TableInfo.of(TableId.of(TestConstants.DATASET, TestConstants.CSV), tableDefinition));
+}
+
+new BigQueryDataLoader(BIG_QUERY).loadCsvSample(TestConstants.CSV);
+}
+
+@AfterAll
+static void cleanupTest() {
+BIG_QUERY.delete(TableId.of(TestConstants.DATASET, TestConstants.CSV));
+}
+
+@Test
+void testBatchQuery() throws Exception {
QueryJobConfiguration jobConfiguration = QueryJobConfiguration
-.newBuilder("SELECT p.name, p.age FROM spring_batch_extensions.%s p ORDER BY p.name LIMIT 2")
-.setDestinationTable(TableId.of(TestConstants.DATASET, tableName))
+.newBuilder("SELECT p.name, p.age FROM spring_batch_extensions.%s p ORDER BY p.name LIMIT 2".formatted(TestConstants.CSV))
+.setDestinationTable(TableId.of(TestConstants.DATASET, TestConstants.CSV))
.setPriority(QueryJobConfiguration.Priority.BATCH)
.build();

BigQueryQueryItemReader<PersonDto> reader = new BigQueryQueryItemReaderBuilder<PersonDto>()
-.bigQuery(bigQuery)
+.bigQuery(BIG_QUERY)
.rowMapper(TestConstants.PERSON_MAPPER)
.jobConfiguration(jobConfiguration)
.build();

reader.afterPropertiesSet();

verifyResult(reader);
}

+@Test
+void testInteractiveQuery() throws Exception {
+BigQueryQueryItemReader<PersonDto> reader = new BigQueryQueryItemReaderBuilder<PersonDto>()
+.bigQuery(BIG_QUERY)
+.query("SELECT p.name, p.age FROM spring_batch_extensions.%s p ORDER BY p.name LIMIT 2".formatted(TestConstants.CSV))
+.rowMapper(TestConstants.PERSON_MAPPER)
+.build();
+
+reader.afterPropertiesSet();
+
+verifyResult(reader);
+}

private void verifyResult(BigQueryQueryItemReader<PersonDto> reader) throws Exception {
PersonDto actualFirstPerson = reader.read();
-PersonDto expectedFirstPerson = chunk.getItems().get(0);
+PersonDto expectedFirstPerson = BigQueryDataLoader.CHUNK.getItems().get(0);

PersonDto actualSecondPerson = reader.read();
-PersonDto expectedSecondPerson = chunk.getItems().get(1);
+PersonDto expectedSecondPerson = BigQueryDataLoader.CHUNK.getItems().get(1);

PersonDto actualThirdPerson = reader.read();

spring-batch-bigquery/src/test/java/org/springframework/batch/extensions/bigquery/gcloud/writer/BaseBigQueryGcloudItemWriterTest.java
@@ -14,49 +14,35 @@
* limitations under the License.
*/

-package org.springframework.batch.extensions.bigquery.integration.writer;
+package org.springframework.batch.extensions.bigquery.gcloud.writer;

-import com.google.cloud.bigquery.BigQuery;
-import com.google.cloud.bigquery.Dataset;
-import com.google.cloud.bigquery.Table;
-import com.google.cloud.bigquery.TableId;
-import com.google.cloud.bigquery.TableResult;
+import com.google.cloud.bigquery.*;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestInfo;
import org.springframework.batch.extensions.bigquery.common.BigQueryDataLoader;
import org.springframework.batch.extensions.bigquery.common.PersonDto;
import org.springframework.batch.extensions.bigquery.common.TestConstants;
-import org.springframework.batch.extensions.bigquery.integration.writer.base.BaseBigQueryItemWriterTest;
-import org.springframework.batch.item.Chunk;
+import org.springframework.batch.extensions.bigquery.gcloud.base.BaseBigQueryGcloudIntegrationTest;

-@Tag("csv")
-class BigQueryCsvItemWriterTest extends BaseBigQueryItemWriterTest {
+abstract class BaseBigQueryGcloudItemWriterTest extends BaseBigQueryGcloudIntegrationTest {

-@Test
-void test1(TestInfo testInfo) throws Exception {
-String tableName = getTableName(testInfo);
-new BigQueryDataLoader(bigQuery).loadCsvSample(tableName);
-Chunk<PersonDto> chunk = BigQueryDataLoader.CHUNK;
-
-Dataset dataset = bigQuery.getDataset(TestConstants.DATASET);
-Table table = bigQuery.getTable(TableId.of(TestConstants.DATASET, tableName));
+protected void verifyResults(String tableName) {
+Dataset dataset = BIG_QUERY.getDataset(TestConstants.DATASET);
+Table table = BIG_QUERY.getTable(TableId.of(TestConstants.DATASET, tableName));
TableId tableId = table.getTableId();
-TableResult tableResult = bigQuery.listTableData(tableId, BigQuery.TableDataListOption.pageSize(2L));
+TableResult tableResult = BIG_QUERY.listTableData(tableId, BigQuery.TableDataListOption.pageSize(2L));

Assertions.assertNotNull(dataset.getDatasetId());
Assertions.assertNotNull(tableId);
-Assertions.assertEquals(chunk.size(), tableResult.getTotalRows());
+Assertions.assertEquals(BigQueryDataLoader.CHUNK.size(), tableResult.getTotalRows());

tableResult
.getValues()
.forEach(field -> {
Assertions.assertTrue(
-chunk.getItems().stream().map(PersonDto::name).anyMatch(name -> field.get(0).getStringValue().equals(name))
+BigQueryDataLoader.CHUNK.getItems().stream().map(PersonDto::name).anyMatch(name -> field.get(0).getStringValue().equals(name))
);

-boolean ageCondition = chunk
+boolean ageCondition = BigQueryDataLoader.CHUNK
.getItems()
.stream()
.map(PersonDto::age)