
Commit 60968ec

start implementing bigquery stuff
1 parent a8cefee commit 60968ec

3 files changed, +147 −3 lines changed
airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/client/BigqueryClientFactory.kt

+48 −0
@@ -0,0 +1,48 @@
/*
 * Copyright (c) 2025 Airbyte, Inc., all rights reserved.
 */

package io.airbyte.integrations.destination.bigquery.client

import com.google.auth.oauth2.GoogleCredentials
import com.google.cloud.bigquery.BigQuery
import com.google.cloud.bigquery.BigQueryOptions
import io.airbyte.integrations.destination.bigquery.BigQueryUtils
import io.airbyte.integrations.destination.bigquery.spec.BigqueryConfiguration
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Factory
import jakarta.inject.Singleton
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets

private val logger = KotlinLogging.logger {}

@Factory
class BigqueryClientFactory(private val config: BigqueryConfiguration) {
    @Singleton
    fun make(): BigQuery {
        // Follows this order of resolution:
        // https://cloud.google.com/java/docs/reference/google-auth-library/latest/com.google.auth.oauth2.GoogleCredentials#com_google_auth_oauth2_GoogleCredentials_getApplicationDefault
        val credentials =
            if (config.credentialsJson == null) {
                logger.info {
                    "No service account key json is provided. It is required if you are using Airbyte cloud."
                }
                logger.info { "Using the default service account credential from environment." }
                GoogleCredentials.getApplicationDefault()
            } else {
                // The JSON credential can either be a raw JSON object, or a serialized JSON object.
                GoogleCredentials.fromStream(
                    ByteArrayInputStream(
                        config.credentialsJson.toByteArray(StandardCharsets.UTF_8)
                    ),
                )
            }
        return BigQueryOptions.newBuilder()
            .setProjectId(config.projectId)
            .setCredentials(credentials)
            .setHeaderProvider(BigQueryUtils.headerProvider)
            .build()
            .service
    }
}
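For orientation (editorial sketch, not part of the commit): the factory above can be wired by hand roughly as follows. `runSmokeTest` and the dataset listing are illustrative only, and assume a `BigqueryConfiguration` instance is already available from the CDK's spec parsing; inside the connector, Micronaut performs this wiring via @Factory/@Singleton.

import com.google.cloud.bigquery.BigQuery
import io.airbyte.integrations.destination.bigquery.client.BigqueryClientFactory
import io.airbyte.integrations.destination.bigquery.spec.BigqueryConfiguration

// Editorial sketch: construct the client manually and do a trivial smoke test.
fun runSmokeTest(config: BigqueryConfiguration) {
    val bigquery: BigQuery = BigqueryClientFactory(config).make()
    // List the datasets visible to the resolved credentials in the configured project.
    bigquery.listDatasets().iterateAll().forEach { println(it.datasetId.dataset) }
}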

airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/operation/BigQueryDirectLoadingStorageOperation.kt

+3 −3
@@ -43,10 +43,10 @@ class BigQueryDirectLoadingStorageOperation(
 ) {
     private val rateLimiter: RateLimiter = RateLimiter.create(0.07)
     companion object {
-        private const val HTTP_STATUS_CODE_FORBIDDEN = 403
-        private const val HTTP_STATUS_CODE_NOT_FOUND = 404
+        const val HTTP_STATUS_CODE_FORBIDDEN = 403
+        const val HTTP_STATUS_CODE_NOT_FOUND = 404

-        private val CONFIG_ERROR_MSG =
+        val CONFIG_ERROR_MSG =
             """
            |Failed to write to destination schema.
            | 1. Make sure you have all required permissions for writing to the schema.
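A note on the hunk above: dropping `private` appears to be what lets the new standard-insert loader (next file) import these companion members directly, i.e. roughly:

// Other files in the connector can now import the widened companion members, e.g.:
import io.airbyte.integrations.destination.bigquery.operation.BigQueryDirectLoadingStorageOperation.Companion.CONFIG_ERROR_MSG
import io.airbyte.integrations.destination.bigquery.operation.BigQueryDirectLoadingStorageOperation.Companion.HTTP_STATUS_CODE_FORBIDDEN
import io.airbyte.integrations.destination.bigquery.operation.BigQueryDirectLoadingStorageOperation.Companion.HTTP_STATUS_CODE_NOT_FOUND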
airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/write/standard_insert/BigqueryDirectLoader.kt

+96 −0
@@ -0,0 +1,96 @@
/*
 * Copyright (c) 2025 Airbyte, Inc., all rights reserved.
 */

package io.airbyte.integrations.destination.bigquery.write.standard_insert

import com.google.cloud.bigquery.BigQuery
import com.google.cloud.bigquery.BigQueryException
import com.google.cloud.bigquery.FormatOptions
import com.google.cloud.bigquery.JobId
import com.google.cloud.bigquery.JobInfo
import com.google.cloud.bigquery.TableDataWriteChannel
import com.google.cloud.bigquery.TableId
import com.google.cloud.bigquery.WriteChannelConfiguration
import io.airbyte.cdk.ConfigErrorException
import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.message.DestinationRecordRaw
import io.airbyte.cdk.load.write.DirectLoader
import io.airbyte.cdk.load.write.DirectLoaderFactory
import io.airbyte.integrations.destination.bigquery.BigQueryUtils
import io.airbyte.integrations.destination.bigquery.formatter.BigQueryRecordFormatter
import io.airbyte.integrations.destination.bigquery.operation.BigQueryDirectLoadingStorageOperation.Companion.CONFIG_ERROR_MSG
import io.airbyte.integrations.destination.bigquery.operation.BigQueryDirectLoadingStorageOperation.Companion.HTTP_STATUS_CODE_FORBIDDEN
import io.airbyte.integrations.destination.bigquery.operation.BigQueryDirectLoadingStorageOperation.Companion.HTTP_STATUS_CODE_NOT_FOUND
import io.airbyte.integrations.destination.bigquery.spec.BigqueryConfiguration
import jakarta.inject.Singleton
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

class BigqueryDirectLoader(
    private val stream: DestinationStream,
    private val writer: TableDataWriteChannel,
) : DirectLoader {
    // NOTE (editorial assumption): the diff references bigQueryRecordFormatter without
    // declaring it; a per-loader formatter instance is one plausible way to make this
    // compile, assuming BigQueryRecordFormatter has a no-arg constructor.
    private val bigQueryRecordFormatter = BigQueryRecordFormatter()

    override fun accept(record: DestinationRecordRaw): DirectLoader.DirectLoadResult {
        // TODO wtf is a bigquery record formatter
        val byteArray =
            "${bigQueryRecordFormatter.formatRecord(record, stream.generationId)} ${System.lineSeparator()}".toByteArray(
                StandardCharsets.UTF_8,
            )
        writer.write(ByteBuffer.wrap(byteArray))
        return DirectLoader.Incomplete
    }

    override fun finish() {
        writer.close()
        BigQueryUtils.waitForJobFinish(writer.job)
    }

    override fun close() {
        // do nothing
    }
}

@Singleton
class BigqueryDirectLoaderFactory(
    private val catalog: DestinationCatalog,
    private val bigquery: BigQuery,
    private val config: BigqueryConfiguration,
) : DirectLoaderFactory<BigqueryDirectLoader> {
    override fun create(
        streamDescriptor: DestinationStream.Descriptor,
        part: Int
    ): BigqueryDirectLoader {
        // TODO there was a RateLimiter here for some reason...?
        // TODO we need to handle special chars in stream name/namespace
        val writeChannelConfiguration =
            WriteChannelConfiguration.newBuilder(
                    TableId.of(streamDescriptor.namespace, streamDescriptor.name)
                )
                .setCreateDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED)
                .setSchema(BigQueryRecordFormatter.SCHEMA_V2)
                .setFormatOptions(FormatOptions.json())
                .build() // new-line delimited json.

        val job =
            JobId.newBuilder()
                .setRandomJob()
                .setLocation(config.datasetLocation.region)
                .setProject(bigquery.options.projectId)
                .build()

        val writer =
            try {
                bigquery.writer(job, writeChannelConfiguration)
            } catch (e: BigQueryException) {
                if (e.code == HTTP_STATUS_CODE_FORBIDDEN || e.code == HTTP_STATUS_CODE_NOT_FOUND) {
                    throw ConfigErrorException(CONFIG_ERROR_MSG + e)
                } else {
                    throw BigQueryException(e.code, e.message)
                }
            }

        return BigqueryDirectLoader(catalog.getStream(streamDescriptor), writer)
    }
}
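For context (editorial sketch, not part of the commit): stripped of the CDK plumbing, the load path the loader relies on comes down to streaming newline-delimited JSON through a TableDataWriteChannel and then waiting for the resulting load job. The dataset/table names below are placeholders.

import com.google.cloud.bigquery.BigQuery
import com.google.cloud.bigquery.FormatOptions
import com.google.cloud.bigquery.JobId
import com.google.cloud.bigquery.TableId
import com.google.cloud.bigquery.WriteChannelConfiguration
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Editorial sketch: write NDJSON rows through a TableDataWriteChannel and block
// until the server-side load job completes. "my_dataset"/"my_table" are placeholders.
fun loadNdjson(bigquery: BigQuery, rows: List<String>) {
    val writeConfig =
        WriteChannelConfiguration.newBuilder(TableId.of("my_dataset", "my_table"))
            .setFormatOptions(FormatOptions.json()) // newline-delimited JSON
            .build()
    val jobId = JobId.newBuilder().setRandomJob().build()
    val writer = bigquery.writer(jobId, writeConfig)
    rows.forEach { row ->
        writer.write(ByteBuffer.wrap("$row\n".toByteArray(StandardCharsets.UTF_8)))
    }
    writer.close() // closing the channel starts the load job
    val job = writer.job.waitFor() // same role as BigQueryUtils.waitForJobFinish above
    check(job?.status?.error == null) { "Load job failed: ${job?.status?.error}" }
}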
