Commit 75212de

Fix crash when parameter.enableStreamingEngine is set on google_dataflow_flex_template_job, update resource's docs (#10303) (#7160)
[upstream:561b6a3f1501f8e40bb7ca1c0f92108665bdccaa]
Signed-off-by: Modular Magician <[email protected]>
1 parent 5b6185b commit 75212de

4 files changed, +198 -7 lines

.changelog/10303.txt (+3)

@@ -0,0 +1,3 @@
+```release-note:bug
+dataflow: fixed an issue where the provider would crash when `enableStreamingEngine` is set as a `parameter` value in `google_dataflow_flex_template_job`
+```

google-beta/services/dataflow/resource_dataflow_flex_template_job.go (+9 -1)

@@ -364,7 +364,15 @@ func resourceDataflowFlexJobSetupEnv(d *schema.ResourceData, config *transport_t
 	var enableStreamingEngine bool
 	if p, ok := d.GetOk("parameters.enableStreamingEngine"); ok {
 		delete(updatedParameters, "enableStreamingEngine")
-		enableStreamingEngine = p.(bool)
+		e := strings.ToLower(p.(string))
+		switch e {
+		case "true":
+			enableStreamingEngine = true
+		case "false":
+			enableStreamingEngine = false
+		default:
+			return dataflow.FlexTemplateRuntimeEnvironment{}, nil, fmt.Errorf("error when handling parameters.enableStreamingEngine value: expected value to be true or false but got value `%s`", e)
+		}
 	} else {
 		if v, ok := d.GetOk("enable_streaming_engine"); ok {
 			enableStreamingEngine = v.(bool)
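
Why the pre-fix code panicked: values in the resource's `parameters` map come out of the Terraform schema as strings (the fix itself asserts `p.(string)`), so the old `p.(bool)` type assertion blows up when a user writes `enableStreamingEngine = true` in HCL and it arrives as the string `"true"`. Below is a minimal standalone sketch of the failure mode and the string-parsing approach the fix takes; the helper name `parseStreamingEngine` is hypothetical, not code from this commit:

```go
package main

import (
	"fmt"
	"strings"
)

// parseStreamingEngine mirrors the commit's switch: only the lowercased
// strings "true" and "false" are accepted; anything else is an error.
func parseStreamingEngine(raw interface{}) (bool, error) {
	s, ok := raw.(string)
	if !ok {
		return false, fmt.Errorf("expected string, got %T", raw)
	}
	switch strings.ToLower(s) {
	case "true":
		return true, nil
	case "false":
		return false, nil
	default:
		return false, fmt.Errorf("expected value to be true or false but got value `%s`", s)
	}
}

func main() {
	// Terraform stores values of a string-typed map as strings, so the HCL
	// literal `enableStreamingEngine = true` arrives here as "true".
	params := map[string]interface{}{"enableStreamingEngine": "true"}
	v := params["enableStreamingEngine"]

	// Pre-fix behavior: a bool type assertion on a string panics at runtime.
	// enabled := v.(bool) // panic: interface conversion: interface {} is string, not bool

	enabled, err := parseStreamingEngine(v)
	fmt.Println(enabled, err) // true <nil>
}
```

For comparison, the standard library's `strconv.ParseBool` would also accept forms like `"1"`, `"t"`, and `"TRUE"`; the commit's switch is stricter, allowing only `true` and `false` in any casing.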

google-beta/services/dataflow/resource_dataflow_flex_template_job_test.go (+152)

@@ -555,6 +555,46 @@ func TestAccDataflowJob_withAttributionLabelProactive(t *testing.T) {
 	})
 }
 
+// Test implementation of enabling streaming engine via parameters or via argument in resource block
+// NOTE: these fields are immutable, so the resource is being recreated between both test steps.
+func TestAccDataflowFlexTemplateJob_enableStreamingEngine(t *testing.T) {
+	// Dataflow responses include serialized java classes and bash commands
+	// This makes body comparison infeasible
+	acctest.SkipIfVcr(t)
+	t.Parallel()
+
+	randStr := acctest.RandString(t, 10)
+	job := "tf-test-dataflow-job-" + randStr
+	bucket := "tf-test-dataflow-bucket-" + randStr
+	topic := "tf-test-topic" + randStr
+
+	acctest.VcrTest(t, resource.TestCase{
+		PreCheck:                 func() { acctest.AccTestPreCheck(t) },
+		ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories(t),
+		CheckDestroy:             testAccCheckDataflowJobDestroyProducer(t),
+		Steps: []resource.TestStep{
+			{
+				Config: testAccDataflowFlexTemplateJob_enableStreamingEngine_param(job, bucket, topic),
+				Check: resource.ComposeTestCheckFunc(
+					// Is set
+					resource.TestCheckResourceAttr("google_dataflow_flex_template_job.flex_job", "parameters.enableStreamingEngine", "true"),
+					// Is not set
+					resource.TestCheckNoResourceAttr("google_dataflow_flex_template_job.flex_job", "enable_streaming_engine"),
+				),
+			},
+			{
+				Config: testAccDataflowFlexTemplateJob_enableStreamingEngine_field(job, bucket, topic),
+				Check: resource.ComposeTestCheckFunc(
+					// Now is unset
+					resource.TestCheckNoResourceAttr("google_dataflow_flex_template_job.flex_job", "parameters.enableStreamingEngine"),
+					// Now is set
+					resource.TestCheckResourceAttr("google_dataflow_flex_template_job.flex_job", "enable_streaming_engine", "true"),
+				),
+			},
+		},
+	})
+}
+
 func testAccDataflowFlexTemplateJobHasNetwork(t *testing.T, res, expected string, wait bool) resource.TestCheckFunc {
 	return func(s *terraform.State) error {
 		instanceTmpl, err := testAccDataflowFlexTemplateGetGeneratedInstanceTemplate(t, s, res)
@@ -1755,3 +1795,115 @@ func testAccDataflowFlexJobExists(t *testing.T, resource string, wait bool) reso
 		return nil
 	}
 }
+
+// Set parameters.enableStreamingEngine value in parameters map to control feature enablement (versus using enable_streaming_engine field)
+func testAccDataflowFlexTemplateJob_enableStreamingEngine_param(job, bucket, topicName string) string {
+	return fmt.Sprintf(`
+resource "google_pubsub_topic" "example" {
+  name = "%s"
+}
+
+data "google_storage_bucket_object" "flex_template" {
+  name   = "latest/flex/Streaming_Data_Generator"
+  bucket = "dataflow-templates"
+}
+
+resource "google_storage_bucket" "bucket" {
+  name                        = "%s"
+  location                    = "US-CENTRAL1"
+  force_destroy               = true
+  uniform_bucket_level_access = true
+}
+
+resource "google_storage_bucket_object" "schema" {
+  name    = "schema.json"
+  bucket  = google_storage_bucket.bucket.name
+  content = <<EOF
+{
+  "eventId": "{{uuid()}}",
+  "eventTimestamp": {{timestamp()}},
+  "ipv4": "{{ipv4()}}",
+  "ipv6": "{{ipv6()}}",
+  "country": "{{country()}}",
+  "username": "{{username()}}",
+  "quest": "{{random("A Break In the Ice", "Ghosts of Perdition", "Survive the Low Road")}}",
+  "score": {{integer(100, 10000)}},
+  "completed": {{bool()}}
+}
+EOF
+}
+
+resource "google_dataflow_flex_template_job" "flex_job" {
+  name                    = "%s"
+  container_spec_gcs_path = "gs://${data.google_storage_bucket_object.flex_template.bucket}/${data.google_storage_bucket_object.flex_template.name}"
+  on_delete               = "cancel"
+  parameters = {
+    schemaLocation        = "gs://${google_storage_bucket_object.schema.bucket}/schema.json"
+    qps                   = "1"
+    topic                 = google_pubsub_topic.example.id
+    enableStreamingEngine = true
+  }
+  labels = {
+    "my_labels" = "value"
+  }
+}
+`, topicName, bucket, job)
+}
+
+// Set enable_streaming_engine field to control feature enablement (versus using parameters.enableStreamingEngine)
+func testAccDataflowFlexTemplateJob_enableStreamingEngine_field(job, bucket, topicName string) string {
+	return fmt.Sprintf(`
+resource "google_pubsub_topic" "example" {
+  name = "%s"
+}
+
+data "google_storage_bucket_object" "flex_template" {
+  name   = "latest/flex/Streaming_Data_Generator"
+  bucket = "dataflow-templates"
+}
+
+resource "google_storage_bucket" "bucket" {
+  name                        = "%s"
+  location                    = "US-CENTRAL1"
+  force_destroy               = true
+  uniform_bucket_level_access = true
+}
+
+resource "google_storage_bucket_object" "schema" {
+  name    = "schema.json"
+  bucket  = google_storage_bucket.bucket.name
+  content = <<EOF
+{
+  "eventId": "{{uuid()}}",
+  "eventTimestamp": {{timestamp()}},
+  "ipv4": "{{ipv4()}}",
+  "ipv6": "{{ipv6()}}",
+  "country": "{{country()}}",
+  "username": "{{username()}}",
+  "quest": "{{random("A Break In the Ice", "Ghosts of Perdition", "Survive the Low Road")}}",
+  "score": {{integer(100, 10000)}},
+  "completed": {{bool()}}
+}
+EOF
+}
+
+resource "google_dataflow_flex_template_job" "flex_job" {
+  name                    = "%s"
+  container_spec_gcs_path = "gs://${data.google_storage_bucket_object.flex_template.bucket}/${data.google_storage_bucket_object.flex_template.name}"
+  on_delete               = "cancel"
+
+  enable_streaming_engine = true
+
+  parameters = {
+    schemaLocation = "gs://${google_storage_bucket_object.schema.bucket}/schema.json"
+    qps            = "1"
+    topic          = google_pubsub_topic.example.id
+  }
+  labels = {
+    "my_labels" = "value"
+  }
+}
+`, topicName, bucket, job)
+}

website/docs/r/dataflow_flex_template_job.html.markdown (+34 -6)

@@ -84,17 +84,27 @@ resource "google_dataflow_flex_template_job" "big_data_job" {
 
 The following arguments are supported:
 
-* `name` - (Required) A unique name for the resource, required by Dataflow.
+* `name` - (Required) Immutable. A unique name for the resource, required by Dataflow.
 
 * `container_spec_gcs_path` - (Required) The GCS path to the Dataflow job Flex
 Template.
 
 - - -
 
+* `additional_experiments` - (Optional) List of experiments that should be used by the job. An example value is `["enable_stackdriver_agent_metrics"]`.
+
+* `autoscaling_algorithm` - (Optional) The algorithm to use for autoscaling.
+
 * `parameters` - (Optional) Key/Value pairs to be passed to the Dataflow job (as
 used in the template). Additional [pipeline options](https://cloud.google.com/dataflow/docs/guides/specifying-exec-params#setting-other-cloud-dataflow-pipeline-options)
 such as `serviceAccount`, `workerMachineType`, etc can be specified here.
 
+* `enable_streaming_engine` - (Optional) Immutable. Indicates if the job should use the streaming engine feature.
+
+* `ip_configuration` - (Optional) The configuration for VM IPs. Options are `"WORKER_IP_PUBLIC"` or `"WORKER_IP_PRIVATE"`.
+
+* `kms_key_name` - (Optional) The name for the Cloud KMS key for the job. Key format is: `projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY`
+
 * `labels` - (Optional) User labels to be specified for the job. Keys and values
 should follow the restrictions specified in the [labeling restrictions](https://cloud.google.com/compute/docs/labeling-resources#restrictions)
 page.

@@ -106,21 +116,39 @@ page.
 * `effective_labels` -
 All of labels (key/value pairs) present on the resource in GCP, including the labels configured through Terraform, other clients and services.
 
+* `launcher_machine_type` - (Optional) The machine type to use for launching the job. The default is n1-standard-1.
+
+* `machine_type` - (Optional) The machine type to use for the job.
+
+* `max_workers` - (Optional) Immutable. The maximum number of Google Compute Engine instances to be made available to your pipeline during execution, from 1 to 1000.
+
+* `network` - (Optional) The network to which VMs will be assigned. If it is not provided, "default" will be used.
+
+* `num_workers` - (Optional) Immutable. The initial number of Google Compute Engine instances for the job.
+
 * `on_delete` - (Optional) One of "drain" or "cancel". Specifies behavior of
 deletion during `terraform destroy`. See above note.
 
+* `project` - (Optional) The project in which the resource belongs. If it is not
+provided, the provider project is used.
+
+* `region` - (Optional) Immutable. The region in which the created job should run.
+
+* `sdk_container_image` - (Optional) Docker registry location of the container image to use for the worker harness. Default is the container for the version of the SDK. Note this field is only valid for portable pipelines.
+
+* `service_account_email` - (Optional) Service account email to run the workers as.
+
 * `skip_wait_on_job_termination` - (Optional) If set to `true`, terraform will
 treat `DRAINING` and `CANCELLING` as terminal states when deleting the resource,
 and will remove the resource from terraform state and move on. See above note.
 
-* `project` - (Optional) The project in which the resource belongs. If it is not
-provided, the provider project is used.
+* `staging_location` - (Optional) The Cloud Storage path to use for staging files. Must be a valid Cloud Storage URL, beginning with gs://.
 
-* `region` - (Optional) The region in which the created job should run.
+* `subnetwork` - (Optional) The subnetwork to which VMs will be assigned. Should be of the form "regions/REGION/subnetworks/SUBNETWORK".
 
-* `service_account_email` - (Optional) Service account email to run the workers as.
+* `temp_location` - (Optional) The Cloud Storage path to use for temporary files. Must be a valid Cloud Storage URL, beginning with gs://.
 
-* `subnetwork` - (Optional) Compute Engine subnetwork for launching instances to run your pipeline.
+* `transform_name_mapping` - (Optional) Only applicable when updating a pipeline. Map of transform name prefixes of the job to be replaced with the corresponding name prefixes of the new job.
 
 ## Attributes Reference
 In addition to the arguments listed above, the following computed attributes are exported:
