Skip to content

Commit 245f5d8

Browse files
edgao and frifriSF59 committed
Destination bigquery: implement spec (#56979)
Co-authored-by: Francis Genet <[email protected]>
1 parent 2b23f65 commit 245f5d8

File tree

7 files changed

+630
-2
lines changed

7 files changed

+630
-2
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
/*
2+
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.load.write.db
6+
7+
/** Constants shared by database-style destinations. */
object DbConstants {
    /** Default namespace name for raw tables. */
    const val DEFAULT_RAW_TABLE_NAMESPACE: String = "airbyte_internal"
}

airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/BigQueryDestination.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import com.google.cloud.bigquery.BigQueryOptions
1111
import com.google.cloud.bigquery.QueryJobConfiguration
1212
import com.google.cloud.storage.Storage
1313
import com.google.cloud.storage.StorageOptions
14-
import com.google.common.base.Charsets
1514
import io.airbyte.cdk.AirbyteDestinationRunner
1615
import io.airbyte.cdk.integrations.BaseConnector
1716
import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler.Companion.addAllStringsInConfigForDeinterpolation
@@ -62,6 +61,7 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
6261
import io.github.oshai.kotlinlogging.KotlinLogging
6362
import java.io.ByteArrayInputStream
6463
import java.io.IOException
64+
import java.nio.charset.StandardCharsets
6565
import java.util.*
6666
import java.util.function.Consumer
6767

@@ -553,7 +553,7 @@ class BigQueryDestination : BaseConnector(), Destination {
553553
if (serviceAccountKey.isObject) serialize(serviceAccountKey)
554554
else serviceAccountKey.asText()
555555
return GoogleCredentials.fromStream(
556-
ByteArrayInputStream(credentialsString.toByteArray(Charsets.UTF_8)),
556+
ByteArrayInputStream(credentialsString.toByteArray(StandardCharsets.UTF_8)),
557557
)
558558
}
559559
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.destination.bigquery.spec
6+
7+
import io.airbyte.cdk.load.command.DestinationConfiguration
8+
import io.airbyte.cdk.load.command.DestinationConfigurationFactory
9+
import io.airbyte.cdk.load.command.gcs.GcsClientConfiguration
10+
import io.airbyte.cdk.load.write.db.DbConstants
11+
import io.micronaut.context.annotation.Factory
12+
import jakarta.inject.Singleton
13+
14+
/**
 * Fully-resolved configuration of the BigQuery destination.
 *
 * Unlike the user-facing specification, only [credentialsJson] is nullable here; every other
 * optional setting has already been replaced by a concrete value.
 *
 * @property projectId GCP project ID.
 * @property datasetLocation BigQuery location of the dataset.
 * @property datasetId Default dataset ID.
 * @property loadingMethod How records are loaded into BigQuery.
 * @property credentialsJson Service account key JSON, or `null` when none was provided.
 * @property transformationPriority Run type for transformation queries.
 * @property rawTableDataset Dataset that raw tables are written into.
 * @property disableTypingDeduping Whether writing final tables is disabled.
 */
data class BigqueryConfiguration(
    val projectId: String,
    val datasetLocation: BigqueryRegion,
    val datasetId: String,
    val loadingMethod: LoadingMethodConfiguration,
    val credentialsJson: String?,
    val transformationPriority: TransformationPriority,
    val rawTableDataset: String,
    val disableTypingDeduping: Boolean,
) : DestinationConfiguration()
24+
25+
/** Closed set of loading-method configurations for the BigQuery destination. */
sealed interface LoadingMethodConfiguration

/** Loading method that needs no additional settings (batched standard inserts). */
data object BatchedStandardInsertConfiguration : LoadingMethodConfiguration
28+
29+
/**
 * Loading method that stages data in GCS.
 *
 * @property gcsClientConfig Settings for the GCS client.
 * @property filePostProcessing What to do with the staged files afterwards (see
 * [GcsFilePostProcessing]).
 */
data class GcsStagingConfiguration(
    val gcsClientConfig: GcsClientConfiguration,
    val filePostProcessing: GcsFilePostProcessing,
) : LoadingMethodConfiguration
33+
34+
/**
 * Builds a [BigqueryConfiguration] from the deserialized [BigquerySpecification], substituting
 * defaults for every optional field the user left unset.
 */
@Singleton
class BigqueryConfigurationFactory :
    DestinationConfigurationFactory<BigquerySpecification, BigqueryConfiguration> {
    /**
     * Converts [pojo] into a [BigqueryConfiguration].
     *
     * Defaults applied when the corresponding spec field is `null`:
     * - loading method: [BatchedStandardInsertConfiguration]
     * - GCS file post-processing: [GcsFilePostProcessing.DELETE]
     * - transformation priority: [TransformationPriority.INTERACTIVE]
     * - raw table dataset: [DbConstants.DEFAULT_RAW_TABLE_NAMESPACE]
     * - disable typing/deduping: `false`
     *
     * @param pojo the specification as parsed from the user's config.
     * @return the resolved configuration.
     */
    override fun makeWithoutExceptionHandling(pojo: BigquerySpecification): BigqueryConfiguration {
        // Binding the `when` subject to a local lets the compiler smart-cast it in each branch,
        // so no explicit `as GcsStagingSpecification` cast is required.
        val loadingMethodConfig: LoadingMethodConfiguration =
            when (val loadingMethod = pojo.loadingMethod) {
                is GcsStagingSpecification ->
                    GcsStagingConfiguration(
                        gcsClientConfig =
                            GcsClientConfiguration(loadingMethod, pojo.datasetLocation.gcsRegion),
                        filePostProcessing =
                            loadingMethod.filePostProcessing ?: GcsFilePostProcessing.DELETE,
                    )
                is BatchedStandardInsertSpecification,
                null -> BatchedStandardInsertConfiguration
            }
        // Every argument is named so a future reordering of the constructor parameters cannot
        // silently swap two values of the same type (e.g. the String-typed IDs).
        return BigqueryConfiguration(
            projectId = pojo.projectId,
            datasetLocation = pojo.datasetLocation,
            datasetId = pojo.datasetId,
            loadingMethod = loadingMethodConfig,
            credentialsJson = pojo.credentialsJson,
            transformationPriority = pojo.transformationPriority
                    ?: TransformationPriority.INTERACTIVE,
            rawTableDataset = pojo.rawTableDataset ?: DbConstants.DEFAULT_RAW_TABLE_NAMESPACE,
            disableTypingDeduping = pojo.disableTypingDeduping ?: false,
        )
    }
}
62+
63+
/**
 * Micronaut factory that exposes the injected [DestinationConfiguration] under its concrete
 * [BigqueryConfiguration] type.
 */
@Factory
class BigqueryConfigurationProvider(private val config: DestinationConfiguration) {
    /** Returns the injected configuration cast to [BigqueryConfiguration]. */
    @Singleton
    fun get(): BigqueryConfiguration {
        return config as BigqueryConfiguration
    }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
/*
2+
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.destination.bigquery.spec
6+
7+
import com.fasterxml.jackson.annotation.JsonProperty
8+
import com.fasterxml.jackson.annotation.JsonPropertyDescription
9+
import com.fasterxml.jackson.annotation.JsonSubTypes
10+
import com.fasterxml.jackson.annotation.JsonTypeInfo
11+
import com.fasterxml.jackson.annotation.JsonValue
12+
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaDescription
13+
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaInject
14+
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
15+
import io.airbyte.cdk.command.ConfigurationSpecification
16+
import io.airbyte.cdk.load.command.gcs.GcsCommonSpecification
17+
import io.airbyte.cdk.load.command.gcs.GcsRegion
18+
import io.airbyte.cdk.load.spec.DestinationSpecificationExtension
19+
import io.airbyte.protocol.models.v0.DestinationSyncMode
20+
import jakarta.inject.Singleton
21+
22+
/**
 * User-facing connector specification for the BigQuery destination.
 *
 * Each property is annotated on its getter with the JSON property name, a title/description, and
 * an injected JSON-schema fragment carrying its UI group and order. Nullable properties are the
 * optional ones.
 */
@Singleton
class BigquerySpecification : ConfigurationSpecification() {
    // --- "connection" group -------------------------------------------------------------------

    /** GCP project ID (`project_id`). */
    @get:JsonSchemaTitle("Project ID")
    @get:JsonPropertyDescription(
        """The GCP project ID for the project containing the target BigQuery dataset. Read more <a href="https://cloud.google.com/resource-manager/docs/creating-managing-projects#identifying_projects">here</a>."""
    )
    @get:JsonProperty("project_id")
    @get:JsonSchemaInject(json = """{"group": "connection", "order": 0}""")
    val projectId: String = ""

    /** Location of the dataset (`dataset_location`). */
    @get:JsonSchemaTitle("Dataset Location")
    @get:JsonPropertyDescription(
        """The location of the dataset. Warning: Changes made after creation will not be applied. Read more <a href="https://cloud.google.com/bigquery/docs/locations">here</a>."""
    )
    @get:JsonProperty("dataset_location")
    @get:JsonSchemaInject(json = """{"group": "connection", "order": 1}""")
    val datasetLocation: BigqueryRegion = BigqueryRegion.US_EAST1

    /** Default dataset ID (`dataset_id`). */
    @get:JsonSchemaTitle("Default Dataset ID")
    @get:JsonPropertyDescription(
        """The default BigQuery Dataset ID that tables are replicated to if the source does not specify a namespace. Read more <a href="https://cloud.google.com/bigquery/docs/datasets#create-dataset">here</a>."""
    )
    @get:JsonProperty("dataset_id")
    @get:JsonSchemaInject(json = """{"group": "connection", "order": 2}""")
    val datasetId: String = ""

    /** Upload strategy (`loading_method`), rendered as a radio selection. */
    @get:JsonSchemaTitle("Loading Method")
    @get:JsonPropertyDescription("""The way data will be uploaded to BigQuery.""")
    @get:JsonProperty("loading_method")
    @get:JsonSchemaInject(json = """{"group": "connection", "order": 3, "display_type": "radio"}""")
    val loadingMethod: LoadingMethodSpecification? = BatchedStandardInsertSpecification()

    /** Service account key JSON (`credentials_json`); marked as a secret in the schema. */
    @get:JsonSchemaTitle("Service Account Key JSON (Required for cloud, optional for open-source)")
    @get:JsonPropertyDescription(
        """The contents of the JSON service account key. Check out the <a href="https://docs.airbyte.com/integrations/destinations/bigquery#service-account-key">docs</a> if you need help generating this key. Default credentials will be used if this field is left empty."""
    )
    @get:JsonProperty("credentials_json")
    @get:JsonSchemaInject(
        json =
            """{"group": "connection", "order": 4, "airbyte_secret": true, "always_show": true}"""
    )
    val credentialsJson: String? = null

    // --- "advanced" group ---------------------------------------------------------------------

    /** Run type of transformation queries (`transformation_priority`). */
    @get:JsonSchemaTitle("Transformation Query Run Type")
    @get:JsonPropertyDescription(
        """Interactive run type means that the query is executed as soon as possible, and these queries count towards concurrent rate limit and daily limit. Read more about interactive run type <a href="https://cloud.google.com/bigquery/docs/running-queries#queries">here</a>. Batch queries are queued and started as soon as idle resources are available in the BigQuery shared resource pool, which usually occurs within a few minutes. Batch queries don’t count towards your concurrent rate limit. Read more about batch queries <a href="https://cloud.google.com/bigquery/docs/running-queries#batch">here</a>. The default "interactive" value is used if not set explicitly."""
    )
    @get:JsonProperty("transformation_priority", defaultValue = "interactive")
    @get:JsonSchemaInject(json = """{"group": "advanced", "order": 5}""")
    val transformationPriority: TransformationPriority? = null

    /** Dataset for raw tables (`raw_data_dataset`). */
    @get:JsonSchemaTitle("Raw Table Dataset Name")
    @get:JsonPropertyDescription(
        """The dataset to write raw tables into (default: airbyte_internal)"""
    )
    @get:JsonProperty("raw_data_dataset")
    @get:JsonSchemaInject(json = """{"group": "advanced", "order": 7}""")
    val rawTableDataset: String? = null

    /** Switch that disables writing final tables (`disable_type_dedupe`). */
    @get:JsonSchemaTitle(
        "Disable Final Tables. (WARNING! Unstable option; Columns in raw table schema might change between versions)"
    )
    @get:JsonPropertyDescription(
        """Disable Writing Final Tables. WARNING! The data format in _airbyte_data is likely stable but there are no guarantees that other metadata columns will remain the same in future versions"""
    )
    @get:JsonProperty("disable_type_dedupe")
    @get:JsonSchemaInject(json = """{"group": "advanced", "order": 8, "default": false}""")
    val disableTypingDeduping: Boolean? = null
}
91+
92+
/**
 * Base type of the `loading_method` options in the spec.
 *
 * The Jackson annotations select the concrete subtype by name from the existing `method`
 * property: `"Standard"` maps to [BatchedStandardInsertSpecification] and `"GCS Staging"` maps to
 * [GcsStagingSpecification].
 *
 * @property method discriminator value identifying the loading method.
 */
@JsonTypeInfo(
    use = JsonTypeInfo.Id.NAME,
    include = JsonTypeInfo.As.EXISTING_PROPERTY,
    property = "method"
)
@JsonSubTypes(
    JsonSubTypes.Type(value = BatchedStandardInsertSpecification::class, name = "Standard"),
    JsonSubTypes.Type(value = GcsStagingSpecification::class, name = "GCS Staging"),
)
sealed class LoadingMethodSpecification(@JsonProperty("method") val method: LoadingMethod) {
    /**
     * Loading-method discriminator; [typeName] is the JSON value and matches the subtype names
     * registered on the enclosing class.
     */
    enum class LoadingMethod(@get:JsonValue val typeName: String) {
        BATCHED_STANDARD_INSERT("Standard"),
        GCS("GCS Staging"),
    }
}
107+
108+
/** Spec option for the "Standard" loading method; it carries no settings of its own. */
@JsonSchemaTitle("Batched Standard Inserts")
@JsonSchemaDescription(
    "Direct loading using batched SQL INSERT statements. This method uses the BigQuery driver to convert large INSERT statements into file uploads automatically."
)
class BatchedStandardInsertSpecification :
    LoadingMethodSpecification(LoadingMethod.BATCHED_STANDARD_INSERT)
114+
115+
/**
 * Spec option for the "GCS Staging" loading method. Combines the shared GCS settings from
 * [GcsCommonSpecification] with a BigQuery-specific post-processing choice.
 *
 * NOTE(review): this class is `abstract`, presumably because the members inherited from
 * [GcsCommonSpecification] are not implemented here — confirm how instances are expected to be
 * created when a config selects this method.
 */
@JsonSchemaTitle("GCS Staging")
@JsonSchemaDescription(
    "Writes large batches of records to a file, uploads the file to GCS, then uses COPY INTO to load your data into BigQuery."
)
abstract class GcsStagingSpecification :
    GcsCommonSpecification, LoadingMethodSpecification(LoadingMethod.GCS) {
    /** What to do with the staged files afterwards; `null` when the user did not choose. */
    @get:JsonSchemaTitle("GCS Tmp Files Post-Processing")
    @get:JsonPropertyDescription(
        """This upload method is supposed to temporary store records in GCS bucket. By this select you can chose if these records should be removed from GCS when migration has finished. The default "Delete all tmp files from GCS" value is used if not set explicitly."""
    )
    // The property name below really does mix an underscore-separated prefix with a hyphen.
    @get:JsonProperty("keep_files_in_gcs-bucket", defaultValue = "Delete all tmp files from GCS")
    @get:JsonSchemaInject(json = """{"order": 3}""")
    val filePostProcessing: GcsFilePostProcessing? = null
}
130+
131+
/**
 * Dataset locations offered by the connector. BigQuery supports a subset of GCS regions; see
 * https://cloud.google.com/bigquery/docs/locations#supported_locations
 *
 * @property region location identifier; also the JSON representation of the constant.
 * @property gcsRegion the GCS region paired with this location.
 */
enum class BigqueryRegion(@get:JsonValue val region: String, val gcsRegion: GcsRegion) {
    // Multi-regions
    EU("EU", GcsRegion.EU),
    US("US", GcsRegion.US),

    // Africa
    AFRICA_SOUTH1("africa-south1", GcsRegion.AFRICA_SOUTH1),

    // Asia
    ASIA_EAST1("asia-east1", GcsRegion.ASIA_EAST1),
    ASIA_EAST2("asia-east2", GcsRegion.ASIA_EAST2),
    ASIA_NORTHEAST1("asia-northeast1", GcsRegion.ASIA_NORTHEAST1),
    ASIA_NORTHEAST2("asia-northeast2", GcsRegion.ASIA_NORTHEAST2),
    ASIA_NORTHEAST3("asia-northeast3", GcsRegion.ASIA_NORTHEAST3),
    ASIA_SOUTH1("asia-south1", GcsRegion.ASIA_SOUTH1),
    ASIA_SOUTH2("asia-south2", GcsRegion.ASIA_SOUTH2),
    ASIA_SOUTHEAST1("asia-southeast1", GcsRegion.ASIA_SOUTHEAST1),
    ASIA_SOUTHEAST2("asia-southeast2", GcsRegion.ASIA_SOUTHEAST2),

    // Australia
    AUSTRALIA_SOUTHEAST1("australia-southeast1", GcsRegion.AUSTRALIA_SOUTHEAST1),
    AUSTRALIA_SOUTHEAST2("australia-southeast2", GcsRegion.AUSTRALIA_SOUTHEAST2),

    // Europe
    EUROPE_CENTRAL2("europe-central2", GcsRegion.EUROPE_CENTRAL2),
    EUROPE_NORTH1("europe-north1", GcsRegion.EUROPE_NORTH1),
    EUROPE_NORTH2("europe-north2", GcsRegion.EUROPE_NORTH2),
    EUROPE_SOUTHWEST1("europe-southwest1", GcsRegion.EUROPE_SOUTHWEST1),
    EUROPE_WEST1("europe-west1", GcsRegion.EUROPE_WEST1),
    EUROPE_WEST2("europe-west2", GcsRegion.EUROPE_WEST2),
    EUROPE_WEST3("europe-west3", GcsRegion.EUROPE_WEST3),
    EUROPE_WEST4("europe-west4", GcsRegion.EUROPE_WEST4),
    EUROPE_WEST6("europe-west6", GcsRegion.EUROPE_WEST6),
    EUROPE_WEST8("europe-west8", GcsRegion.EUROPE_WEST8),
    EUROPE_WEST9("europe-west9", GcsRegion.EUROPE_WEST9),
    EUROPE_WEST10("europe-west10", GcsRegion.EUROPE_WEST10),
    EUROPE_WEST12("europe-west12", GcsRegion.EUROPE_WEST12),

    // Middle East
    ME_CENTRAL1("me-central1", GcsRegion.ME_CENTRAL1),
    ME_CENTRAL2("me-central2", GcsRegion.ME_CENTRAL2),
    ME_WEST1("me-west1", GcsRegion.ME_WEST1),

    // Americas
    NORTHAMERICA_NORTHEAST1("northamerica-northeast1", GcsRegion.NORTHAMERICA_NORTHEAST1),
    NORTHAMERICA_NORTHEAST2("northamerica-northeast2", GcsRegion.NORTHAMERICA_NORTHEAST2),
    NORTHAMERICA_SOUTH1("northamerica-south1", GcsRegion.NORTHAMERICA_SOUTH1),
    SOUTHAMERICA_EAST1("southamerica-east1", GcsRegion.SOUTHAMERICA_EAST1),
    SOUTHAMERICA_WEST1("southamerica-west1", GcsRegion.SOUTHAMERICA_WEST1),
    US_CENTRAL1("us-central1", GcsRegion.US_CENTRAL1),
    US_EAST1("us-east1", GcsRegion.US_EAST1),
    US_EAST4("us-east4", GcsRegion.US_EAST4),
    US_EAST5("us-east5", GcsRegion.US_EAST5),
    US_SOUTH1("us-south1", GcsRegion.US_SOUTH1),
    US_WEST1("us-west1", GcsRegion.US_WEST1),
    US_WEST2("us-west2", GcsRegion.US_WEST2),
    US_WEST3("us-west3", GcsRegion.US_WEST3),
    US_WEST4("us-west4", GcsRegion.US_WEST4),
}
179+
180+
/**
 * Options for handling staged GCS files once loading has finished.
 *
 * @property postProcesing human-readable label that is also the JSON value of the constant. The
 * property name is misspelled ("Procesing"); it is kept as-is because renaming it would change
 * the public API.
 */
enum class GcsFilePostProcessing(@get:JsonValue val postProcesing: String) {
    /** Remove the temporary files from GCS. */
    DELETE("Delete all tmp files from GCS"),

    /** Leave the temporary files in GCS. */
    KEEP("Keep all tmp files in GCS"),
}
184+
185+
/**
 * Run type for transformation queries.
 *
 * @property transformationPriority lowercase name that is also the JSON value of the constant.
 */
enum class TransformationPriority(@get:JsonValue val transformationPriority: String) {
    INTERACTIVE("interactive"),
    BATCH("batch"),
}
189+
190+
/**
 * Connector-level spec metadata for BigQuery: the sync modes it supports, whether it supports
 * incremental syncs, and the UI groups referenced by the `"group"` keys in the specification.
 */
@Singleton
class BigquerySpecificationExtension : DestinationSpecificationExtension {
    /** Overwrite, append, and append-with-dedup are all supported. */
    override val supportedSyncModes: List<DestinationSyncMode> =
        listOf(
            DestinationSyncMode.OVERWRITE,
            DestinationSyncMode.APPEND,
            DestinationSyncMode.APPEND_DEDUP,
        )

    override val supportsIncremental: Boolean = true

    /** Group id paired with its display title. */
    override val groups: List<DestinationSpecificationExtension.Group> =
        listOf(
            DestinationSpecificationExtension.Group("connection", "Connection"),
            DestinationSpecificationExtension.Group("advanced", "Advanced"),
        )
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
/*
2+
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.destination.bigquery
6+
7+
import io.airbyte.cdk.load.spec.SpecTest
8+
9+
/** Runs the CDK's shared [SpecTest] for the BigQuery destination; adds no cases of its own. */
class BigquerySpecTest : SpecTest()

0 commit comments

Comments
 (0)