Skip to content

Commit d6d0867

Browse files
Add BigQuery BigLake Managed Tables as a Datastream destination (#13189) (#9677)
[upstream:b3d80d3a4e03b12287f1c7c87a93962111dbb4e3] Signed-off-by: Modular Magician <[email protected]>
1 parent 5962476 commit d6d0867

File tree

4 files changed

+318
-0
lines changed

4 files changed

+318
-0
lines changed

.changelog/13189.txt

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
```release-note:enhancement
2+
datastream: added `blmt_config` field to `bigquery_destination_config` resource to enable support for BigLake Managed Tables streams.
3+
```

google-beta/services/datastream/resource_datastream_stream.go

+152
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,41 @@ historical state of the data.`,
162162
},
163163
ConflictsWith: []string{"destination_config.0.bigquery_destination_config.0.merge"},
164164
},
165+
"blmt_config": {
166+
Type: schema.TypeList,
167+
Optional: true,
168+
Description: `BigLake Managed Tables configuration for BigQuery streams.`,
169+
MaxItems: 1,
170+
Elem: &schema.Resource{
171+
Schema: map[string]*schema.Schema{
172+
"bucket": {
173+
Type: schema.TypeString,
174+
Required: true,
175+
Description: `The Cloud Storage bucket name.`,
176+
},
177+
"connection_name": {
178+
Type: schema.TypeString,
179+
Required: true,
180+
Description: `The bigquery connection. Format: '{project}.{location}.{name}'`,
181+
},
182+
"file_format": {
183+
Type: schema.TypeString,
184+
Required: true,
185+
Description: `The file format.`,
186+
},
187+
"table_format": {
188+
Type: schema.TypeString,
189+
Required: true,
190+
Description: `The table format.`,
191+
},
192+
"root_path": {
193+
Type: schema.TypeString,
194+
Optional: true,
195+
Description: `The root path inside the Cloud Storage bucket.`,
196+
},
197+
},
198+
},
199+
},
165200
"data_freshness": {
166201
Type: schema.TypeString,
167202
Optional: true,
@@ -3866,6 +3901,8 @@ func flattenDatastreamStreamDestinationConfigBigqueryDestinationConfig(v interfa
38663901
flattenDatastreamStreamDestinationConfigBigqueryDestinationConfigSingleTargetDataset(original["singleTargetDataset"], d, config)
38673902
transformed["source_hierarchy_datasets"] =
38683903
flattenDatastreamStreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasets(original["sourceHierarchyDatasets"], d, config)
3904+
transformed["blmt_config"] =
3905+
flattenDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfig(original["blmtConfig"], d, config)
38693906
transformed["merge"] =
38703907
flattenDatastreamStreamDestinationConfigBigqueryDestinationConfigMerge(original["merge"], d, config)
38713908
transformed["append_only"] =
@@ -3935,6 +3972,47 @@ func flattenDatastreamStreamDestinationConfigBigqueryDestinationConfigSourceHier
39353972
return v
39363973
}
39373974

3975+
func flattenDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfig(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
3976+
if v == nil {
3977+
return nil
3978+
}
3979+
original := v.(map[string]interface{})
3980+
if len(original) == 0 {
3981+
return nil
3982+
}
3983+
transformed := make(map[string]interface{})
3984+
transformed["bucket"] =
3985+
flattenDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigBucket(original["bucket"], d, config)
3986+
transformed["connection_name"] =
3987+
flattenDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigConnectionName(original["connectionName"], d, config)
3988+
transformed["file_format"] =
3989+
flattenDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigFileFormat(original["fileFormat"], d, config)
3990+
transformed["table_format"] =
3991+
flattenDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigTableFormat(original["tableFormat"], d, config)
3992+
transformed["root_path"] =
3993+
flattenDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigRootPath(original["rootPath"], d, config)
3994+
return []interface{}{transformed}
3995+
}
3996+
func flattenDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigBucket(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
3997+
return v
3998+
}
3999+
4000+
func flattenDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigConnectionName(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
4001+
return v
4002+
}
4003+
4004+
func flattenDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigFileFormat(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
4005+
return v
4006+
}
4007+
4008+
func flattenDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigTableFormat(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
4009+
return v
4010+
}
4011+
4012+
func flattenDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigRootPath(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
4013+
return v
4014+
}
4015+
39384016
func flattenDatastreamStreamDestinationConfigBigqueryDestinationConfigMerge(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
39394017
if v == nil {
39404018
return nil
@@ -6737,6 +6815,13 @@ func expandDatastreamStreamDestinationConfigBigqueryDestinationConfig(v interfac
67376815
transformed["sourceHierarchyDatasets"] = transformedSourceHierarchyDatasets
67386816
}
67396817

6818+
transformedBlmtConfig, err := expandDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfig(original["blmt_config"], d, config)
6819+
if err != nil {
6820+
return nil, err
6821+
} else if val := reflect.ValueOf(transformedBlmtConfig); val.IsValid() && !tpgresource.IsEmptyValue(val) {
6822+
transformed["blmtConfig"] = transformedBlmtConfig
6823+
}
6824+
67406825
transformedMerge, err := expandDatastreamStreamDestinationConfigBigqueryDestinationConfigMerge(original["merge"], d, config)
67416826
if err != nil {
67426827
return nil, err
@@ -6854,6 +6939,73 @@ func expandDatastreamStreamDestinationConfigBigqueryDestinationConfigSourceHiera
68546939
return v, nil
68556940
}
68566941

6942+
func expandDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfig(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
6943+
l := v.([]interface{})
6944+
if len(l) == 0 || l[0] == nil {
6945+
return nil, nil
6946+
}
6947+
raw := l[0]
6948+
original := raw.(map[string]interface{})
6949+
transformed := make(map[string]interface{})
6950+
6951+
transformedBucket, err := expandDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigBucket(original["bucket"], d, config)
6952+
if err != nil {
6953+
return nil, err
6954+
} else if val := reflect.ValueOf(transformedBucket); val.IsValid() && !tpgresource.IsEmptyValue(val) {
6955+
transformed["bucket"] = transformedBucket
6956+
}
6957+
6958+
transformedConnectionName, err := expandDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigConnectionName(original["connection_name"], d, config)
6959+
if err != nil {
6960+
return nil, err
6961+
} else if val := reflect.ValueOf(transformedConnectionName); val.IsValid() && !tpgresource.IsEmptyValue(val) {
6962+
transformed["connectionName"] = transformedConnectionName
6963+
}
6964+
6965+
transformedFileFormat, err := expandDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigFileFormat(original["file_format"], d, config)
6966+
if err != nil {
6967+
return nil, err
6968+
} else if val := reflect.ValueOf(transformedFileFormat); val.IsValid() && !tpgresource.IsEmptyValue(val) {
6969+
transformed["fileFormat"] = transformedFileFormat
6970+
}
6971+
6972+
transformedTableFormat, err := expandDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigTableFormat(original["table_format"], d, config)
6973+
if err != nil {
6974+
return nil, err
6975+
} else if val := reflect.ValueOf(transformedTableFormat); val.IsValid() && !tpgresource.IsEmptyValue(val) {
6976+
transformed["tableFormat"] = transformedTableFormat
6977+
}
6978+
6979+
transformedRootPath, err := expandDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigRootPath(original["root_path"], d, config)
6980+
if err != nil {
6981+
return nil, err
6982+
} else if val := reflect.ValueOf(transformedRootPath); val.IsValid() && !tpgresource.IsEmptyValue(val) {
6983+
transformed["rootPath"] = transformedRootPath
6984+
}
6985+
6986+
return transformed, nil
6987+
}
6988+
6989+
func expandDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigBucket(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
6990+
return v, nil
6991+
}
6992+
6993+
func expandDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigConnectionName(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
6994+
return v, nil
6995+
}
6996+
6997+
func expandDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigFileFormat(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
6998+
return v, nil
6999+
}
7000+
7001+
func expandDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigTableFormat(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
7002+
return v, nil
7003+
}
7004+
7005+
func expandDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigRootPath(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
7006+
return v, nil
7007+
}
7008+
68577009
func expandDatastreamStreamDestinationConfigBigqueryDestinationConfigMerge(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
68587010
l := v.([]interface{})
68597011
if len(l) == 0 {

google-beta/services/datastream/resource_datastream_stream_generated_meta.yaml

+5
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,11 @@ fields:
5252
- field: 'desired_state'
5353
provider_only: true
5454
- field: 'destination_config.bigquery_destination_config.append_only'
55+
- field: 'destination_config.bigquery_destination_config.blmt_config.bucket'
56+
- field: 'destination_config.bigquery_destination_config.blmt_config.connection_name'
57+
- field: 'destination_config.bigquery_destination_config.blmt_config.file_format'
58+
- field: 'destination_config.bigquery_destination_config.blmt_config.root_path'
59+
- field: 'destination_config.bigquery_destination_config.blmt_config.table_format'
5560
- field: 'destination_config.bigquery_destination_config.data_freshness'
5661
- field: 'destination_config.bigquery_destination_config.merge'
5762
- field: 'destination_config.bigquery_destination_config.single_target_dataset.dataset_id'

website/docs/r/datastream_stream.html.markdown

+158
Original file line numberDiff line numberDiff line change
@@ -1094,6 +1094,137 @@ resource "google_datastream_stream" "default" {
10941094
}
10951095
}
10961096
```
1097+
## Example Usage - Datastream Stream Bigquery Blmt
1098+
1099+
1100+
```hcl
1101+
data "google_project" "project" {
1102+
}
1103+
1104+
resource "google_sql_database_instance" "instance" {
1105+
name = "blmt-instance"
1106+
database_version = "MYSQL_8_0"
1107+
region = "us-central1"
1108+
settings {
1109+
tier = "db-f1-micro"
1110+
ip_configuration {
1111+
1112+
// Datastream IPs will vary by region.
1113+
authorized_networks {
1114+
value = "34.71.242.81"
1115+
}
1116+
1117+
authorized_networks {
1118+
value = "34.72.28.29"
1119+
}
1120+
1121+
authorized_networks {
1122+
value = "34.67.6.157"
1123+
}
1124+
1125+
authorized_networks {
1126+
value = "34.67.234.134"
1127+
}
1128+
1129+
authorized_networks {
1130+
value = "34.72.239.218"
1131+
}
1132+
}
1133+
}
1134+
deletion_protection = true
1135+
}
1136+
1137+
resource "google_sql_database" "db" {
1138+
instance = google_sql_database_instance.instance.name
1139+
name = "db"
1140+
}
1141+
1142+
resource "random_password" "pwd" {
1143+
length = 16
1144+
special = false
1145+
}
1146+
1147+
resource "google_sql_user" "user" {
1148+
name = "user"
1149+
instance = google_sql_database_instance.instance.name
1150+
host = "%"
1151+
password = random_password.pwd.result
1152+
}
1153+
1154+
resource "google_storage_bucket" "blmt_bucket" {
1155+
# Use variable from Stream.yaml for the name
1156+
name = "blmt-bucket"
1157+
location = "us-central1"
1158+
force_destroy = true
1159+
}
1160+
1161+
resource "google_bigquery_connection" "blmt_connection" {
1162+
project = data.google_project.project.project_id
1163+
location = "us-central1"
1164+
connection_id = "blmt-connection"
1165+
friendly_name = "Datastream BLMT Test Connection"
1166+
description = "Connection for Datastream BLMT test"
1167+
1168+
cloud_resource {}
1169+
}
1170+
1171+
resource "google_storage_bucket_iam_member" "blmt_connection_bucket_admin" {
1172+
bucket = google_storage_bucket.blmt_bucket.name
1173+
role = "roles/storage.admin"
1174+
member = "serviceAccount:${google_bigquery_connection.blmt_connection.cloud_resource[0].service_account_id}"
1175+
}
1176+
1177+
resource "google_datastream_connection_profile" "source_connection_profile" {
1178+
display_name = "Source connection profile"
1179+
location = "us-central1"
1180+
connection_profile_id = "blmt-source-profile"
1181+
1182+
mysql_profile {
1183+
hostname = google_sql_database_instance.instance.public_ip_address
1184+
username = google_sql_user.user.name
1185+
password = google_sql_user.user.password
1186+
}
1187+
}
1188+
1189+
resource "google_datastream_connection_profile" "destination_connection_profile" {
1190+
display_name = "Connection profile"
1191+
location = "us-central1"
1192+
connection_profile_id = "blmt-destination-profile"
1193+
1194+
bigquery_profile {}
1195+
}
1196+
1197+
resource "google_datastream_stream" "default" {
1198+
stream_id = "blmt-stream"
1199+
location = "us-central1"
1200+
display_name = "My BLMT stream"
1201+
source_config {
1202+
source_connection_profile = google_datastream_connection_profile.source_connection_profile.id
1203+
mysql_source_config {}
1204+
}
1205+
destination_config {
1206+
destination_connection_profile = google_datastream_connection_profile.destination_connection_profile.id
1207+
bigquery_destination_config {
1208+
source_hierarchy_datasets {
1209+
dataset_template {
1210+
location = "us-central1"
1211+
}
1212+
}
1213+
blmt_config {
1214+
bucket = google_storage_bucket.blmt_bucket.name
1215+
connection_name = "${google_bigquery_connection.blmt_connection.project}.${google_bigquery_connection.blmt_connection.location}.${google_bigquery_connection.blmt_connection.connection_id}"
1216+
file_format = "PARQUET"
1217+
table_format = "ICEBERG"
1218+
root_path = "/"
1219+
}
1220+
append_only {}
1221+
}
1222+
}
1223+
1224+
backfill_none {
1225+
}
1226+
}
1227+
```
10971228

10981229
## Argument Reference
10991230

@@ -1878,6 +2009,11 @@ The following arguments are supported:
18782009
Destination datasets are created so that hierarchy of the destination data objects matches the source hierarchy.
18792010
Structure is [documented below](#nested_destination_config_bigquery_destination_config_source_hierarchy_datasets).
18802011

2012+
* `blmt_config` -
2013+
(Optional)
2014+
BigLake Managed Tables configuration for BigQuery streams.
2015+
Structure is [documented below](#nested_destination_config_bigquery_destination_config_blmt_config).
2016+
18812017
* `merge` -
18822018
(Optional)
18832019
Merge mode defines that all changes to a table will be merged at the destination Google BigQuery
@@ -1925,6 +2061,28 @@ The following arguments are supported:
19252061
encryption key. i.e. projects/{project}/locations/{location}/keyRings/{key_ring}/cryptoKeys/{cryptoKey}.
19262062
See https://cloud.google.com/bigquery/docs/customer-managed-encryption for more information.
19272063

2064+
<a name="nested_destination_config_bigquery_destination_config_blmt_config"></a>The `blmt_config` block supports:
2065+
2066+
* `bucket` -
2067+
(Required)
2068+
The Cloud Storage bucket name.
2069+
2070+
* `connection_name` -
2071+
(Required)
2072+
The bigquery connection. Format: `{project}.{location}.{name}`
2073+
2074+
* `file_format` -
2075+
(Required)
2076+
The file format.
2077+
2078+
* `table_format` -
2079+
(Required)
2080+
The table format.
2081+
2082+
* `root_path` -
2083+
(Optional)
2084+
The root path inside the Cloud Storage bucket.
2085+
19282086
- - -
19292087

19302088

0 commit comments

Comments
 (0)