Commit 7b49c5d

bigquery - basic direct loader for raw tables only (#56984)

1 parent: 537a7d0

8 files changed: +315 -12

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/DirectLoader.kt

+3

@@ -32,6 +32,9 @@ import io.airbyte.cdk.load.message.DestinationRecordRaw
  * always forward whatever work is in progress in [finish], as [accept] will not be called again for
  * the same batch.
  *
+ * After [accept] returns a [DirectLoader.Complete] status, or after [finish] returns, the
+ * DirectLoader MUST have committed all data durably to the destination.
+ *
  * [close] will be called once at the end of the batch, after the last call to [accept] or [finish],
  * or if the sync fails. Afterward the loader will be discarded and a new one will be created for
  * the next batch if more data arrives. (Note: close should only be used to do cleanup that must
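For context on the strengthened contract, here is a minimal sketch of a loader that satisfies it. This is hypothetical code, not part of the commit; flushToDestination is an assumed helper, not a CDK API.

import io.airbyte.cdk.load.message.DestinationRecordRaw
import io.airbyte.cdk.load.write.DirectLoader

// Hypothetical sketch of a DirectLoader honoring the durability contract above:
// once accept() returns Complete or finish() returns, everything seen so far is durable.
class BufferingDirectLoader : DirectLoader {
    private val buffer = mutableListOf<DestinationRecordRaw>()

    override fun accept(record: DestinationRecordRaw): DirectLoader.DirectLoadResult {
        buffer.add(record)
        if (buffer.size >= 10_000) {
            flushToDestination(buffer) // must commit durably before returning Complete
            buffer.clear()
            return DirectLoader.Complete
        }
        return DirectLoader.Incomplete
    }

    override fun finish() {
        // accept() will not be called again for this batch; commit whatever remains.
        flushToDestination(buffer)
        buffer.clear()
    }

    override fun close() {
        // Cleanup only; by this point all data is durable (or the sync has failed).
    }

    private fun flushToDestination(records: List<DestinationRecordRaw>) {
        // Assumed helper: synchronously writes and commits the records.
    }
}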

airbyte-integrations/connectors/destination-bigquery/src/integrationTestLegacy/kotlin/io/airbyte/integrations/destination/bigquery/typing_deduping/BigQueryStandardInsertsRawOverrideDisableTypingDedupingTest.kt

+34

@@ -3,7 +3,13 @@
  */
 package io.airbyte.integrations.destination.bigquery.typing_deduping
 
+import io.airbyte.protocol.models.v0.AirbyteStream
+import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
+import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
+import io.airbyte.protocol.models.v0.DestinationSyncMode
+import io.airbyte.protocol.models.v0.SyncMode
 import org.junit.jupiter.api.Disabled
+import org.junit.jupiter.api.Test
 
 class BigQueryStandardInsertsRawOverrideDisableTypingDedupingTest :
     AbstractBigQueryTypingDedupingTest() {

@@ -17,6 +23,34 @@ class BigQueryStandardInsertsRawOverrideDisableTypingDedupingTest :
         return true
     }
 
+    @Test
+    @Throws(Exception::class)
+    fun arst() {
+        val catalog =
+            ConfiguredAirbyteCatalog()
+                .withStreams(
+                    listOf(
+                        ConfiguredAirbyteStream()
+                            .withSyncMode(SyncMode.INCREMENTAL)
+                            .withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP)
+                            .withSyncId(42L)
+                            .withGenerationId(43L)
+                            .withMinimumGenerationId(0L)
+                            .withPrimaryKey(listOf(listOf("id1"), listOf("id2")))
+                            .withStream(
+                                AirbyteStream()
+                                    .withNamespace(streamNamespace)
+                                    .withName(streamName)
+                                    .withJsonSchema(SCHEMA)
+                            )
+                    )
+                )
+
+        val messages = readMessages("dat/sync1_messages.jsonl")
+        runSync(catalog, messages)
+        println()
+    }
+
     @Disabled
     @Throws(Exception::class)
     override fun testRemovingPKNonNullIndexes() {

airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/formatter/BigQueryRecordFormatter.kt

+39 -9

@@ -13,10 +13,14 @@ import com.google.cloud.bigquery.StandardSQLTypeName
 import io.airbyte.cdk.integrations.base.JavaBaseConstants
 import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
 import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteRecordMessage
-import io.airbyte.commons.json.Jsons
+import io.airbyte.cdk.load.data.IntegerValue
+import io.airbyte.cdk.load.data.ObjectValue
+import io.airbyte.cdk.load.data.StringValue
+import io.airbyte.cdk.load.message.DestinationRecordRaw
+import io.airbyte.cdk.load.message.Meta
+import io.airbyte.cdk.load.util.serializeToString
 import io.airbyte.commons.json.Jsons.emptyObject
 import io.airbyte.commons.json.Jsons.serialize
-import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
 import java.util.*
 import java.util.concurrent.TimeUnit

@@ -34,20 +38,46 @@ class BigQueryRecordFormatter {
         )
         record.set<JsonNode>(JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT, NullNode.instance)
         record.put(JavaBaseConstants.COLUMN_NAME_DATA, recordMessage.serialized)
-        record.put(
-            JavaBaseConstants.COLUMN_NAME_AB_META,
-            Jsons.serialize<AirbyteRecordMessageMeta>(recordMessage.record!!.meta!!)
-        )
+        record.put(JavaBaseConstants.COLUMN_NAME_AB_META, serialize(recordMessage.record!!.meta!!))
         record.put(JavaBaseConstants.COLUMN_NAME_AB_GENERATION_ID, generationId)
         return serialize(record)
     }
 
+    fun formatRecord(record: DestinationRecordRaw): String {
+        val enrichedRecord = record.asEnrichedDestinationRecordAirbyteValue()
+
+        val outputRecord = mutableMapOf<String, Any?>()
+        enrichedRecord.airbyteMetaFields.forEach { (key, value) ->
+            when (key) {
+                Meta.COLUMN_NAME_AB_EXTRACTED_AT -> {
+                    val extractedAtMillis = (value.abValue as IntegerValue).value.longValueExact()
+                    outputRecord[key] = getExtractedAt(extractedAtMillis)
+                }
+                Meta.COLUMN_NAME_AB_META -> {
+                    val serializedAirbyteMeta = (value.abValue as ObjectValue).serializeToString()
+                    outputRecord[key] = serializedAirbyteMeta
+                }
+                Meta.COLUMN_NAME_AB_RAW_ID ->
+                    outputRecord[key] = (value.abValue as StringValue).value
+                Meta.COLUMN_NAME_AB_GENERATION_ID ->
+                    outputRecord[key] = (value.abValue as IntegerValue).value
+            }
+        }
+
+        outputRecord[JavaBaseConstants.COLUMN_NAME_DATA] = record.asRawJson().serializeToString()
+
+        return outputRecord.serializeToString()
+    }
+
     private fun getEmittedAtField(recordMessage: PartialAirbyteRecordMessage?): String? {
+        return getExtractedAt(recordMessage!!.emittedAt)
+    }
+
+    private fun getExtractedAt(extractedAtMillis: Long): String? {
         // Bigquery represents TIMESTAMP to the microsecond precision, so we convert to microseconds
-        // then
-        // use BQ helpers to string-format correctly.
+        // then use BQ helpers to string-format correctly.
         val emittedAtMicroseconds =
-            TimeUnit.MICROSECONDS.convert(recordMessage!!.emittedAt, TimeUnit.MILLISECONDS)
+            TimeUnit.MICROSECONDS.convert(extractedAtMillis, TimeUnit.MILLISECONDS)
         return QueryParameterValue.timestamp(emittedAtMicroseconds).value
     }
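Usage sketch for the new formatRecord path (illustrative only; the column names follow the Airbyte v2 raw-table convention, and the exact serialization depends on serializeToString):

// Hypothetical usage; `record` is a DestinationRecordRaw handed to the loader by the CDK.
val formatter = BigQueryRecordFormatter()
val jsonLine: String = formatter.formatRecord(record)
// Produces one NDJSON object per record, roughly (illustrative values):
// {"_airbyte_raw_id":"<uuid>","_airbyte_extracted_at":"2025-01-01 00:00:00+00",
//  "_airbyte_meta":"{\"sync_id\":42,\"changes\":[]}","_airbyte_generation_id":43,
//  "_airbyte_data":"{\"id1\":1,\"id2\":2}"}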

airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryDirectLoadingStorageOperation.kt

+3 -3

@@ -43,10 +43,10 @@ class BigQueryDirectLoadingStorageOperation(
 ) {
     private val rateLimiter: RateLimiter = RateLimiter.create(0.07)
     companion object {
-        private const val HTTP_STATUS_CODE_FORBIDDEN = 403
-        private const val HTTP_STATUS_CODE_NOT_FOUND = 404
+        const val HTTP_STATUS_CODE_FORBIDDEN = 403
+        const val HTTP_STATUS_CODE_NOT_FOUND = 404
 
-        private val CONFIG_ERROR_MSG =
+        val CONFIG_ERROR_MSG =
             """
             |Failed to write to destination schema.
             | 1. Make sure you have all required permissions for writing to the schema.
airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/util/BigqueryClientFactory.kt

+48

@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2025 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.bigquery.util
+
+import com.google.auth.oauth2.GoogleCredentials
+import com.google.cloud.bigquery.BigQuery
+import com.google.cloud.bigquery.BigQueryOptions
+import io.airbyte.integrations.destination.bigquery.BigQueryUtils
+import io.airbyte.integrations.destination.bigquery.spec.BigqueryConfiguration
+import io.github.oshai.kotlinlogging.KotlinLogging
+import io.micronaut.context.annotation.Factory
+import jakarta.inject.Singleton
+import java.io.ByteArrayInputStream
+import java.nio.charset.StandardCharsets
+
+private val logger = KotlinLogging.logger {}
+
+@Factory
+class BigqueryClientFactory(private val config: BigqueryConfiguration) {
+    @Singleton
+    fun make(): BigQuery {
+        // Follows this order of resolution:
+        // https://cloud.google.com/java/docs/reference/google-auth-library/latest/com.google.auth.oauth2.GoogleCredentials#com_google_auth_oauth2_GoogleCredentials_getApplicationDefault
+        val credentials =
+            if (config.credentialsJson == null) {
+                logger.info {
+                    "No service account key json is provided. It is required if you are using Airbyte cloud."
+                }
+                logger.info { "Using the default service account credential from environment." }
+                GoogleCredentials.getApplicationDefault()
+            } else {
+                // The JSON credential can either be a raw JSON object, or a serialized JSON object.
+                GoogleCredentials.fromStream(
+                    ByteArrayInputStream(
+                        config.credentialsJson.toByteArray(StandardCharsets.UTF_8)
+                    ),
+                )
+            }
+        return BigQueryOptions.newBuilder()
+            .setProjectId(config.projectId)
+            .setCredentials(credentials)
+            .setHeaderProvider(BigQueryUtils.headerProvider)
+            .build()
+            .service
+    }
+}
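Hedged usage note: with credentialsJson unset, the client resolves Application Default Credentials (for example via the GOOGLE_APPLICATION_CREDENTIALS environment variable); otherwise it parses the supplied service-account JSON. A sketch of constructing the client outside Micronaut DI, where loadConfig is an assumed helper:

// Hypothetical direct construction, bypassing the @Factory/@Singleton wiring.
val config: BigqueryConfiguration = loadConfig() // assumed helper, not a real API
val bigquery: BigQuery = BigqueryClientFactory(config).make()
bigquery.listDatasets(config.projectId) // quick sanity check that the credentials work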
airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/write/BigqueryStreamLoader.kt

+40

@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2025 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.bigquery.write
+
+import com.google.cloud.bigquery.BigQuery
+import io.airbyte.cdk.load.command.DestinationStream
+import io.airbyte.cdk.load.write.StreamLoader
+import io.airbyte.integrations.destination.bigquery.BigQueryUtils
+import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter
+import io.airbyte.integrations.destination.bigquery.spec.BigqueryConfiguration
+import io.github.oshai.kotlinlogging.KotlinLogging
+
+private val logger = KotlinLogging.logger {}
+
+class BigqueryStreamLoader(
+    override val stream: DestinationStream,
+    private val bigquery: BigQuery,
+    private val config: BigqueryConfiguration,
+) : StreamLoader {
+    override suspend fun start() {
+        super.start()
+        logger.info { "Creating dataset if needed: ${config.rawTableDataset}" }
+        BigQueryUtils.getOrCreateDataset(
+            bigquery,
+            config.rawTableDataset,
+            config.datasetLocation.region
+        )
+        // TODO also need to create final table dataset
+        logger.info {
+            "Creating table if needed: ${TempUtils.rawTableId(config, stream.descriptor)}"
+        }
+        BigQueryUtils.createPartitionedTableIfNotExists(
+            bigquery,
+            TempUtils.rawTableId(config, stream.descriptor),
+            BigQueryRecordFormatter.SCHEMA_V2,
+        )
+    }
+}
airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/write/BigqueryWriter.kt

+43

@@ -0,0 +1,43 @@
+/*
+ * Copyright (c) 2025 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.bigquery.write
+
+import com.google.cloud.bigquery.BigQuery
+import com.google.cloud.bigquery.TableId
+import io.airbyte.cdk.load.command.DestinationStream
+import io.airbyte.cdk.load.write.DestinationWriter
+import io.airbyte.cdk.load.write.StreamLoader
+import io.airbyte.integrations.base.destination.typing_deduping.StreamId
+import io.airbyte.integrations.destination.bigquery.spec.BigqueryConfiguration
+import jakarta.inject.Singleton
+
+@Singleton
+class BigqueryWriter(
+    private val bigquery: BigQuery,
+    private val config: BigqueryConfiguration,
+) : DestinationWriter {
+    override fun createStreamLoader(stream: DestinationStream): StreamLoader {
+        return BigqueryStreamLoader(stream, bigquery, config)
+    }
+}
+
+// TODO delete this - this is definitely duplicated code, and also is definitely wrong
+// e.g. we need to handle special chars in stream name/namespace (c.f.
+// bigquerysqlgenerator.buildStreamId)
+// and that logic needs to be in BigqueryWriter.setup, to handle collisions
+// (probably actually a toolkit)
+object TempUtils {
+    fun rawTableId(
+        config: BigqueryConfiguration,
+        streamDescriptor: DestinationStream.Descriptor,
+    ) =
+        TableId.of(
+            config.rawTableDataset,
+            StreamId.concatenateRawTableName(
+                streamDescriptor.namespace ?: config.datasetId,
+                streamDescriptor.name
+            )
+        )
+}
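For illustration of what rawTableId produces, assuming StreamId.concatenateRawTableName joins plain identifiers as <namespace>_raw__stream_<name> (the TODO above flags that special characters are not yet handled), and assuming Descriptor takes (namespace, name):

// Hypothetical example under the assumptions stated above.
val tableId =
    TempUtils.rawTableId(config, DestinationStream.Descriptor("public", "users"))
// With config.rawTableDataset = "airbyte_internal", this would resolve to roughly:
// TableId.of("airbyte_internal", "public_raw__stream_users")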
airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/write/standard_insert/BigqueryBatchStandardInsertsLoader.kt

+105

@@ -0,0 +1,105 @@
+/*
+ * Copyright (c) 2025 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.bigquery.write.standard_insert
+
+import com.google.cloud.bigquery.BigQuery
+import com.google.cloud.bigquery.BigQueryException
+import com.google.cloud.bigquery.FormatOptions
+import com.google.cloud.bigquery.JobId
+import com.google.cloud.bigquery.JobInfo
+import com.google.cloud.bigquery.TableDataWriteChannel
+import com.google.cloud.bigquery.WriteChannelConfiguration
+import io.airbyte.cdk.ConfigErrorException
+import io.airbyte.cdk.load.command.DestinationStream
+import io.airbyte.cdk.load.message.DestinationRecordRaw
+import io.airbyte.cdk.load.write.DirectLoader
+import io.airbyte.cdk.load.write.DirectLoaderFactory
+import io.airbyte.integrations.destination.bigquery.BigQueryUtils
+import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter
+import io.airbyte.integrations.destination.bigquery.operation.BigQueryDirectLoadingStorageOperation.Companion.CONFIG_ERROR_MSG
+import io.airbyte.integrations.destination.bigquery.operation.BigQueryDirectLoadingStorageOperation.Companion.HTTP_STATUS_CODE_FORBIDDEN
+import io.airbyte.integrations.destination.bigquery.operation.BigQueryDirectLoadingStorageOperation.Companion.HTTP_STATUS_CODE_NOT_FOUND
+import io.airbyte.integrations.destination.bigquery.spec.BatchedStandardInsertConfiguration
+import io.airbyte.integrations.destination.bigquery.spec.BigqueryConfiguration
+import io.airbyte.integrations.destination.bigquery.write.TempUtils
+import io.micronaut.context.annotation.Requires
+import io.micronaut.context.condition.Condition
+import io.micronaut.context.condition.ConditionContext
+import jakarta.inject.Singleton
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+
+class BigqueryBatchStandardInsertsLoader(
+    private val writer: TableDataWriteChannel,
+) : DirectLoader {
+    private val recordFormatter = BigQueryRecordFormatter()
+
+    override fun accept(record: DestinationRecordRaw): DirectLoader.DirectLoadResult {
+        // TODO there was a RateLimiter here for some reason...?
+        val formattedRecord = recordFormatter.formatRecord(record)
+        val byteArray =
+            "$formattedRecord${System.lineSeparator()}".toByteArray(StandardCharsets.UTF_8)
+        writer.write(ByteBuffer.wrap(byteArray))
+        return DirectLoader.Incomplete
+    }
+
+    override fun finish() {
+        writer.close()
+        BigQueryUtils.waitForJobFinish(writer.job)
+    }
+
+    override fun close() {
+        if (writer.isOpen) {
+            writer.close()
+        }
+    }
+}
+
+class BigqueryConfiguredForBatchStandardInserts : Condition {
+    override fun matches(context: ConditionContext<*>): Boolean {
+        val config = context.beanContext.getBean(BigqueryConfiguration::class.java)
+        return config.loadingMethod is BatchedStandardInsertConfiguration
+    }
+}
+
+@Requires(condition = BigqueryConfiguredForBatchStandardInserts::class)
+@Singleton
+class BigqueryBatchStandardInsertsLoaderFactory(
+    private val bigquery: BigQuery,
+    private val config: BigqueryConfiguration,
+) : DirectLoaderFactory<BigqueryBatchStandardInsertsLoader> {
+    override fun create(
+        streamDescriptor: DestinationStream.Descriptor,
+        part: Int
+    ): BigqueryBatchStandardInsertsLoader {
+        val writeChannelConfiguration =
+            // TODO need to write to raw vs final table appropriately
+            WriteChannelConfiguration.newBuilder(TempUtils.rawTableId(config, streamDescriptor))
+                .setCreateDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED)
+                .setSchema(BigQueryRecordFormatter.SCHEMA_V2)
+                .setFormatOptions(FormatOptions.json())
+                .build() // new-line delimited json.
+
+        val job =
+            JobId.newBuilder()
+                .setRandomJob()
+                .setLocation(config.datasetLocation.region)
+                .setProject(bigquery.options.projectId)
+                .build()
+
+        val writer =
+            try {
+                bigquery.writer(job, writeChannelConfiguration)
+            } catch (e: BigQueryException) {
+                if (e.code == HTTP_STATUS_CODE_FORBIDDEN || e.code == HTTP_STATUS_CODE_NOT_FOUND) {
+                    throw ConfigErrorException(CONFIG_ERROR_MSG + e)
+                } else {
+                    throw BigQueryException(e.code, e.message)
+                }
+            }
+
+        return BigqueryBatchStandardInsertsLoader(writer)
+    }
+}
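A rough sketch of the per-batch lifecycle the CDK drives through this loader (hypothetical driver loop; the real orchestration lives in the bulk-load pipeline):

// Hypothetical driver illustrating the DirectLoader lifecycle for one batch.
fun loadBatch(
    factory: BigqueryBatchStandardInsertsLoaderFactory,
    descriptor: DestinationStream.Descriptor,
    records: Iterator<DestinationRecordRaw>,
) {
    val loader = factory.create(descriptor, part = 0)
    try {
        while (records.hasNext()) {
            loader.accept(records.next()) // appends one NDJSON line to the write channel
        }
        loader.finish() // closes the channel and waits for the BigQuery load job
    } finally {
        loader.close() // re-closes the channel only if the sync failed mid-batch
    }
}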
