Skip to content

Commit 834d30b

Browse files
authored
Update Stream.yaml to add cdc method support for MySQL (#12465)
1 parent 1a28a91 commit 834d30b

File tree

3 files changed

+186
-1
lines changed

3 files changed

+186
-1
lines changed

mmv1/products/datastream/Stream.yaml

+34
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,22 @@ examples:
133133
'deletion_protection': 'false'
134134
# Requires SQLServer Configuration
135135
exclude_test: true
136+
- name: 'datastream_stream_mysql_gtid'
137+
primary_resource_id: 'default'
138+
vars:
139+
database_name: 'db'
140+
database_password: 'password'
141+
database_user: 'user'
142+
deletion_protection: 'true'
143+
destination_connection_profile_id: 'destination-profile'
144+
source_connection_profile_id: 'source-profile'
145+
mysql_name: 'mysql'
146+
mysql_root_password: 'root-password'
147+
stream_id: 'stream'
148+
test_vars_overrides:
149+
'deletion_protection': 'false'
150+
# Requires MySQL Configuration
151+
exclude_test: true
136152
- name: 'datastream_stream_postgresql_bigquery_dataset_id'
137153
primary_resource_id: 'default'
138154
vars:
@@ -420,6 +436,24 @@ properties:
420436
send_empty_value: true
421437
validation:
422438
function: 'validation.IntAtLeast(0)'
439+
- name: 'binaryLogPosition'
440+
type: NestedObject
441+
allow_empty_object: true
442+
send_empty_value: true
443+
conflicts:
444+
- source_config.0.mysql_source_config.gtid
445+
description: |
446+
CDC reader reads from binary logs replication cdc method.
447+
properties: []
448+
- name: 'gtid'
449+
type: NestedObject
450+
allow_empty_object: true
451+
send_empty_value: true
452+
conflicts:
453+
- source_config.0.mysql_source_config.binaryLogPosition
454+
description: |
455+
CDC reader reads from gtid based replication.
456+
properties: []
423457
- name: 'oracleSourceConfig'
424458
type: NestedObject
425459
description: |
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
resource "google_sql_database_instance" "instance" {
2+
name = "<%= ctx[:vars]['mysql_name'] %>"
3+
database_version = "MYSQL_8_0"
4+
region = "us-central1"
5+
root_password = "<%= ctx[:vars]['mysql_root_password'] %>"
6+
deletion_protection = "<%= ctx[:vars]['deletion_protection'] %>"
7+
8+
settings {
9+
tier = "db-custom-2-4096"
10+
ip_configuration {
11+
// Datastream IPs will vary by region.
12+
// https://cloud.google.com/datastream/docs/ip-allowlists-and-regions
13+
authorized_networks {
14+
value = "34.71.242.81"
15+
}
16+
17+
authorized_networks {
18+
value = "34.72.28.29"
19+
}
20+
21+
authorized_networks {
22+
value = "34.67.6.157"
23+
}
24+
25+
authorized_networks {
26+
value = "34.67.234.134"
27+
}
28+
29+
authorized_networks {
30+
value = "34.72.239.218"
31+
}
32+
}
33+
}
34+
}
35+
36+
resource "google_sql_database" "db" {
37+
name = "<%= ctx[:vars]['database_name'] %>"
38+
instance = google_sql_database_instance.instance.name
39+
depends_on = [google_sql_user.user]
40+
}
41+
42+
resource "google_sql_user" "user" {
43+
name = "<%= ctx[:vars]['database_user'] %>"
44+
instance = google_sql_database_instance.instance.name
45+
password = "<%= ctx[:vars]['database_password'] %>"
46+
}
47+
48+
resource "google_datastream_connection_profile" "source" {
49+
display_name = "MySQL Source"
50+
location = "us-central1"
51+
connection_profile_id = "<%= ctx[:vars]['source_connection_profile_id'] %>"
52+
53+
mysql_profile {
54+
hostname = google_sql_database_instance.instance.public_ip_address
55+
port = 1433
56+
username = google_sql_user.user.name
57+
password = google_sql_user.user.password
58+
database = google_sql_database.db.name
59+
}
60+
}
61+
62+
resource "google_datastream_connection_profile" "destination" {
63+
display_name = "BigQuery Destination"
64+
location = "us-central1"
65+
connection_profile_id = "<%= ctx[:vars]['destination_connection_profile_id'] %>"
66+
67+
bigquery_profile {}
68+
}
69+
70+
resource "google_datastream_stream" "default" {
71+
display_name = "MySQL to BigQuery"
72+
location = "us-central1"
73+
stream_id = "<%= ctx[:vars]['stream_id'] %>"
74+
75+
source_config {
76+
source_connection_profile = google_datastream_connection_profile.source.id
77+
mysql_source_config {
78+
include_objects {
79+
schemas {
80+
schema = "schema"
81+
tables {
82+
table = "table"
83+
}
84+
}
85+
}
86+
gtid {}
87+
}
88+
}
89+
90+
destination_config {
91+
destination_connection_profile = google_datastream_connection_profile.destination.id
92+
bigquery_destination_config {
93+
data_freshness = "900s"
94+
source_hierarchy_datasets {
95+
dataset_template {
96+
location = "us-central1"
97+
}
98+
}
99+
}
100+
}
101+
102+
backfill_none {}
103+
}

mmv1/third_party/terraform/services/datastream/resource_datastream_stream_test.go

+49-1
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,20 @@ func TestAccDatastreamStream_update(t *testing.T) {
6565
ImportStateVerify: true,
6666
ImportStateVerifyIgnore: []string{"create_without_validation", "stream_id", "location", "desired_state", "labels", "terraform_labels"},
6767
},
68+
{
69+
ResourceName: "google_datastream_stream.gtid",
70+
ImportState: true,
71+
ImportStateVerify: true,
72+
ImportStateVerifyIgnore: []string{"create_without_validation", "stream_id", "location", "desired_state", "labels", "terraform_labels"},
73+
},
6874
{
6975
// Disable prevent_destroy
7076
Config: testAccDatastreamStream_datastreamStreamBasicUpdate(context, "RUNNING", false),
7177
},
78+
{
79+
Config: testAccDatastreamStream_datastreamStreamBasicExample(context),
80+
Check: resource.TestCheckResourceAttr("google_datastream_stream.gtid", "state", "NOT_STARTED"),
81+
},
7282
},
7383
})
7484
}
@@ -204,7 +214,45 @@ resource "google_datastream_stream" "default" {
204214
source_config {
205215
source_connection_profile = google_datastream_connection_profile.source_connection_profile.id
206216
207-
mysql_source_config {}
217+
mysql_source_config {
218+
binary_log_position {}
219+
}
220+
}
221+
destination_config {
222+
destination_connection_profile = google_datastream_connection_profile.destination_connection_profile.id
223+
gcs_destination_config {
224+
path = "mydata"
225+
file_rotation_mb = 200
226+
file_rotation_interval = "60s"
227+
json_file_format {
228+
schema_file_format = "NO_SCHEMA_FILE"
229+
compression = "GZIP"
230+
}
231+
}
232+
}
233+
234+
backfill_all {
235+
}
236+
%{lifecycle_block}
237+
}
238+
239+
resource "google_datastream_stream" "gtid" {
240+
stream_id = "tf-test-my-stream-gtid%{random_suffix}"
241+
location = "us-central1"
242+
display_name = "my gtid stream update"
243+
desired_state = "%{desired_state}"
244+
create_without_validation = true
245+
246+
labels = {
247+
key = "updated"
248+
}
249+
250+
source_config {
251+
source_connection_profile = google_datastream_connection_profile.source_connection_profile.id
252+
253+
mysql_source_config {
254+
gtid {}
255+
}
208256
}
209257
destination_config {
210258
destination_connection_profile = google_datastream_connection_profile.destination_connection_profile.id

0 commit comments

Comments
 (0)