Add BigQuery BigLake Managed Tables as a Datastream destination #9677

Merged
3 changes: 3 additions & 0 deletions .changelog/13189.txt
@@ -0,0 +1,3 @@
```release-note:enhancement
datastream: added `blmt_config` field to the `bigquery_destination_config` block of the `google_datastream_stream` resource to support BigLake Managed Tables (BLMT) streams.
```
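
For orientation, a minimal sketch of where the new block sits inside the `google_datastream_stream` resource (structure only, field names as comments; the full, working example is in the documentation change further down):

```hcl
# Sketch only: the new blmt_config block nests under bigquery_destination_config.
destination_config {
  bigquery_destination_config {
    blmt_config {
      # bucket, connection_name, file_format, table_format, root_path
    }
  }
}
```
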
152 changes: 152 additions & 0 deletions google-beta/services/datastream/resource_datastream_stream.go
@@ -162,6 +162,41 @@ historical state of the data.`,
},
ConflictsWith: []string{"destination_config.0.bigquery_destination_config.0.merge"},
},
"blmt_config": {
Type: schema.TypeList,
Optional: true,
Description: `BigLake Managed Tables configuration for BigQuery streams.`,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"bucket": {
Type: schema.TypeString,
Required: true,
Description: `The Cloud Storage bucket name.`,
},
"connection_name": {
Type: schema.TypeString,
Required: true,
Description: `The BigQuery connection. Format: '{project}.{location}.{name}'`,
},
"file_format": {
Type: schema.TypeString,
Required: true,
Description: `The file format.`,
},
"table_format": {
Type: schema.TypeString,
Required: true,
Description: `The table format.`,
},
"root_path": {
Type: schema.TypeString,
Optional: true,
Description: `The root path inside the Cloud Storage bucket.`,
},
},
},
},
"data_freshness": {
Type: schema.TypeString,
Optional: true,
@@ -3866,6 +3901,8 @@ func flattenDatastreamStreamDestinationConfigBigqueryDestinationConfig(v interfa
flattenDatastreamStreamDestinationConfigBigqueryDestinationConfigSingleTargetDataset(original["singleTargetDataset"], d, config)
transformed["source_hierarchy_datasets"] =
flattenDatastreamStreamDestinationConfigBigqueryDestinationConfigSourceHierarchyDatasets(original["sourceHierarchyDatasets"], d, config)
transformed["blmt_config"] =
flattenDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfig(original["blmtConfig"], d, config)
transformed["merge"] =
flattenDatastreamStreamDestinationConfigBigqueryDestinationConfigMerge(original["merge"], d, config)
transformed["append_only"] =
@@ -3935,6 +3972,47 @@ func flattenDatastreamStreamDestinationConfigBigqueryDestinationConfigSourceHier
return v
}

func flattenDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfig(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
if v == nil {
return nil
}
original := v.(map[string]interface{})
if len(original) == 0 {
return nil
}
transformed := make(map[string]interface{})
transformed["bucket"] =
flattenDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigBucket(original["bucket"], d, config)
transformed["connection_name"] =
flattenDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigConnectionName(original["connectionName"], d, config)
transformed["file_format"] =
flattenDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigFileFormat(original["fileFormat"], d, config)
transformed["table_format"] =
flattenDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigTableFormat(original["tableFormat"], d, config)
transformed["root_path"] =
flattenDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigRootPath(original["rootPath"], d, config)
return []interface{}{transformed}
}
func flattenDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigBucket(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigConnectionName(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigFileFormat(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigTableFormat(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigRootPath(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenDatastreamStreamDestinationConfigBigqueryDestinationConfigMerge(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
if v == nil {
return nil
@@ -6737,6 +6815,13 @@ func expandDatastreamStreamDestinationConfigBigqueryDestinationConfig(v interfac
transformed["sourceHierarchyDatasets"] = transformedSourceHierarchyDatasets
}

transformedBlmtConfig, err := expandDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfig(original["blmt_config"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedBlmtConfig); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["blmtConfig"] = transformedBlmtConfig
}

transformedMerge, err := expandDatastreamStreamDestinationConfigBigqueryDestinationConfigMerge(original["merge"], d, config)
if err != nil {
return nil, err
@@ -6854,6 +6939,73 @@ func expandDatastreamStreamDestinationConfigBigqueryDestinationConfigSourceHiera
return v, nil
}

func expandDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfig(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
l := v.([]interface{})
if len(l) == 0 || l[0] == nil {
return nil, nil
}
raw := l[0]
original := raw.(map[string]interface{})
transformed := make(map[string]interface{})

transformedBucket, err := expandDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigBucket(original["bucket"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedBucket); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["bucket"] = transformedBucket
}

transformedConnectionName, err := expandDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigConnectionName(original["connection_name"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedConnectionName); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["connectionName"] = transformedConnectionName
}

transformedFileFormat, err := expandDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigFileFormat(original["file_format"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedFileFormat); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["fileFormat"] = transformedFileFormat
}

transformedTableFormat, err := expandDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigTableFormat(original["table_format"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedTableFormat); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["tableFormat"] = transformedTableFormat
}

transformedRootPath, err := expandDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigRootPath(original["root_path"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedRootPath); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["rootPath"] = transformedRootPath
}

return transformed, nil
}

func expandDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigBucket(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigConnectionName(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigFileFormat(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigTableFormat(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandDatastreamStreamDestinationConfigBigqueryDestinationConfigBlmtConfigRootPath(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandDatastreamStreamDestinationConfigBigqueryDestinationConfigMerge(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
l := v.([]interface{})
if len(l) == 0 {
@@ -52,6 +52,11 @@ fields:
- field: 'desired_state'
provider_only: true
- field: 'destination_config.bigquery_destination_config.append_only'
- field: 'destination_config.bigquery_destination_config.blmt_config.bucket'
- field: 'destination_config.bigquery_destination_config.blmt_config.connection_name'
- field: 'destination_config.bigquery_destination_config.blmt_config.file_format'
- field: 'destination_config.bigquery_destination_config.blmt_config.root_path'
- field: 'destination_config.bigquery_destination_config.blmt_config.table_format'
- field: 'destination_config.bigquery_destination_config.data_freshness'
- field: 'destination_config.bigquery_destination_config.merge'
- field: 'destination_config.bigquery_destination_config.single_target_dataset.dataset_id'
158 changes: 158 additions & 0 deletions website/docs/r/datastream_stream.html.markdown
@@ -1094,6 +1094,137 @@ resource "google_datastream_stream" "default" {
}
}
```
## Example Usage - Datastream Stream BigQuery BLMT


```hcl
data "google_project" "project" {
}

resource "google_sql_database_instance" "instance" {
name = "blmt-instance"
database_version = "MYSQL_8_0"
region = "us-central1"
settings {
tier = "db-f1-micro"
ip_configuration {

// Datastream IPs will vary by region.
authorized_networks {
value = "34.71.242.81"
}

authorized_networks {
value = "34.72.28.29"
}

authorized_networks {
value = "34.67.6.157"
}

authorized_networks {
value = "34.67.234.134"
}

authorized_networks {
value = "34.72.239.218"
}
}
}
deletion_protection = true
}

resource "google_sql_database" "db" {
instance = google_sql_database_instance.instance.name
name = "db"
}

resource "random_password" "pwd" {
length = 16
special = false
}

resource "google_sql_user" "user" {
name = "user"
instance = google_sql_database_instance.instance.name
host = "%"
password = random_password.pwd.result
}

resource "google_storage_bucket" "blmt_bucket" {
name = "blmt-bucket"
location = "us-central1"
force_destroy = true
}

resource "google_bigquery_connection" "blmt_connection" {
project = data.google_project.project.project_id
location = "us-central1"
connection_id = "blmt-connection"
friendly_name = "Datastream BLMT Test Connection"
description = "Connection for Datastream BLMT test"

cloud_resource {}
}

resource "google_storage_bucket_iam_member" "blmt_connection_bucket_admin" {
bucket = google_storage_bucket.blmt_bucket.name
role = "roles/storage.admin"
member = "serviceAccount:${google_bigquery_connection.blmt_connection.cloud_resource[0].service_account_id}"
}

resource "google_datastream_connection_profile" "source_connection_profile" {
display_name = "Source connection profile"
location = "us-central1"
connection_profile_id = "blmt-source-profile"

mysql_profile {
hostname = google_sql_database_instance.instance.public_ip_address
username = google_sql_user.user.name
password = google_sql_user.user.password
}
}

resource "google_datastream_connection_profile" "destination_connection_profile" {
display_name = "Connection profile"
location = "us-central1"
connection_profile_id = "blmt-destination-profile"

bigquery_profile {}
}

resource "google_datastream_stream" "default" {
stream_id = "blmt-stream"
location = "us-central1"
display_name = "My BLMT stream"
source_config {
source_connection_profile = google_datastream_connection_profile.source_connection_profile.id
mysql_source_config {}
}
destination_config {
destination_connection_profile = google_datastream_connection_profile.destination_connection_profile.id
bigquery_destination_config {
source_hierarchy_datasets {
dataset_template {
location = "us-central1"
}
}
blmt_config {
bucket = google_storage_bucket.blmt_bucket.name
connection_name = "${google_bigquery_connection.blmt_connection.project}.${google_bigquery_connection.blmt_connection.location}.${google_bigquery_connection.blmt_connection.connection_id}"
file_format = "PARQUET"
table_format = "ICEBERG"
root_path = "/"
}
append_only {}
}
}

backfill_none {
}
}
```
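
In this example, `connection_name` is assembled from the BigQuery connection's project, location, and connection ID to satisfy the `{project}.{location}.{name}` format, the connection's service account is granted `roles/storage.admin` on the BLMT bucket, and `blmt_config` is paired with `source_hierarchy_datasets` and `append_only`.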

## Argument Reference

@@ -1878,6 +2009,11 @@ The following arguments are supported:
Destination datasets are created so that the hierarchy of the destination data objects matches the source hierarchy.
Structure is [documented below](#nested_destination_config_bigquery_destination_config_source_hierarchy_datasets).

* `blmt_config` -
(Optional)
BigLake Managed Tables configuration for BigQuery streams.
Structure is [documented below](#nested_destination_config_bigquery_destination_config_blmt_config).

* `merge` -
(Optional)
Merge mode defines that all changes to a table will be merged at the destination Google BigQuery
@@ -1925,6 +2061,28 @@ The following arguments are supported:
encryption key. i.e. projects/{project}/locations/{location}/keyRings/{key_ring}/cryptoKeys/{cryptoKey}.
See https://cloud.google.com/bigquery/docs/customer-managed-encryption for more information.

<a name="nested_destination_config_bigquery_destination_config_blmt_config"></a>The `blmt_config` block supports:

* `bucket` -
(Required)
The Cloud Storage bucket name.

* `connection_name` -
(Required)
The BigQuery connection. Format: `{project}.{location}.{name}`

* `file_format` -
(Required)
The file format.

* `table_format` -
(Required)
The table format.

* `root_path` -
(Optional)
The root path inside the Cloud Storage bucket.
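
Putting the fields together, a minimal sketch of the block (values are illustrative; `PARQUET` and `ICEBERG` are the formats used in the example above):

```hcl
blmt_config {
  bucket          = "my-blmt-bucket"                        # illustrative bucket name
  connection_name = "my-project.us-central1.my-connection"  # {project}.{location}.{name}
  file_format     = "PARQUET"
  table_format    = "ICEBERG"
  root_path       = "/"                                     # optional
}
```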

- - -

