Commit 81164c3
Add skip_wait_on_job_termination option for dataflow job resources (#5844) (#11452)
Signed-off-by: Modular Magician <[email protected]>
Parent: 8a4b59e

5 files changed: +169 −8 lines

.changelog/5844.txt (+3)

```diff
@@ -0,0 +1,3 @@
+```release-note:enhancement
+dataflow: added `skip_wait_on_job_termination` attribute to `google_dataflow_job` and `google_dataflow_flex_template_job` resources (issue #10559)
+```
```

google/resource_dataflow_job.go (+29 −6)

```diff
@@ -18,6 +18,11 @@ import (
 
 const resourceDataflowJobGoogleProvidedLabelPrefix = "labels.goog-dataflow-provided"
 
+var dataflowTerminatingStatesMap = map[string]struct{}{
+	"JOB_STATE_CANCELLING": {},
+	"JOB_STATE_DRAINING":   {},
+}
+
 var dataflowTerminalStatesMap = map[string]struct{}{
 	"JOB_STATE_DONE":   {},
 	"JOB_STATE_FAILED": {},
@@ -204,6 +209,13 @@ func resourceDataflowJob() *schema.Resource {
 				Optional:    true,
 				Description: `Indicates if the job should use the streaming engine feature.`,
 			},
+
+			"skip_wait_on_job_termination": {
+				Type:        schema.TypeBool,
+				Optional:    true,
+				Default:     false,
+				Description: `If true, treat DRAINING and CANCELLING as terminal job states and do not wait for further changes before removing from terraform state and moving on. WARNING: this will lead to job name conflicts if you do not ensure that the job names are different, e.g. by embedding a release ID or by using a random_id.`,
+			},
 		},
 		UseJSONNumber: true,
 	}
@@ -233,6 +245,16 @@ func resourceDataflowJobTypeCustomizeDiff(_ context.Context, d *schema.ResourceD
 	return nil
 }
 
+// return true if a job is in a terminal state, OR if a job is in a
+// terminating state and skipWait is true
+func shouldStopDataflowJobDeleteQuery(state string, skipWait bool) bool {
+	_, stopQuery := dataflowTerminalStatesMap[state]
+	if !stopQuery && skipWait {
+		_, stopQuery = dataflowTerminatingStatesMap[state]
+	}
+	return stopQuery
+}
+
 func resourceDataflowJobCreate(d *schema.ResourceData, meta interface{}) error {
 	config := meta.(*Config)
 	userAgent, err := generateUserAgentString(d, config.userAgent)
@@ -343,7 +365,7 @@ func resourceDataflowJobRead(d *schema.ResourceData, meta interface{}) error {
 		return fmt.Errorf("Error setting additional_experiments: %s", err)
 	}
 
-	if _, ok := dataflowTerminalStatesMap[job.CurrentState]; ok {
+	if ok := shouldStopDataflowJobDeleteQuery(job.CurrentState, d.Get("skip_wait_on_job_termination").(bool)); ok {
 		log.Printf("[DEBUG] Removing resource '%s' because it is in state %s.\n", job.Name, job.CurrentState)
 		d.SetId("")
 		return nil
@@ -469,8 +491,9 @@ func resourceDataflowJobDelete(d *schema.ResourceData, meta interface{}) error {
 		return err
 	}
 
-	// Wait for state to reach terminal state (canceled/drained/done)
-	_, ok := dataflowTerminalStatesMap[d.Get("state").(string)]
+	// Wait for state to reach terminal state (canceled/drained/done plus cancelling/draining if skipWait)
+	skipWait := d.Get("skip_wait_on_job_termination").(bool)
+	ok := shouldStopDataflowJobDeleteQuery(d.Get("state").(string), skipWait)
 	for !ok {
 		log.Printf("[DEBUG] Waiting for job with job state %q to terminate...", d.Get("state").(string))
 		time.Sleep(5 * time.Second)
@@ -479,11 +502,11 @@ func resourceDataflowJobDelete(d *schema.ResourceData, meta interface{}) error {
 		if err != nil {
 			return fmt.Errorf("Error while reading job to see if it was properly terminated: %v", err)
 		}
-		_, ok = dataflowTerminalStatesMap[d.Get("state").(string)]
+		ok = shouldStopDataflowJobDeleteQuery(d.Get("state").(string), skipWait)
 	}
 
-	// Only remove the job from state if it's actually successfully canceled.
-	if _, ok := dataflowTerminalStatesMap[d.Get("state").(string)]; ok {
+	// Only remove the job from state if it's actually successfully hit a final state.
+	if ok = shouldStopDataflowJobDeleteQuery(d.Get("state").(string), skipWait); ok {
 		log.Printf("[DEBUG] Removing dataflow job with final state %q", d.Get("state").(string))
 		d.SetId("")
 		return nil
```

google/resource_dataflow_job_test.go (+73 −2)

```diff
@@ -2,6 +2,7 @@ package google
 
 import (
 	"fmt"
+	"strconv"
 	"strings"
 	"testing"
 	"time"
@@ -44,6 +45,32 @@ func TestAccDataflowJob_basic(t *testing.T) {
 	})
 }
 
+func TestAccDataflowJobSkipWait_basic(t *testing.T) {
+	// Dataflow responses include serialized java classes and bash commands
+	// This makes body comparison infeasible
+	skipIfVcr(t)
+	t.Parallel()
+
+	randStr := randString(t, 10)
+	bucket := "tf-test-dataflow-gcs-" + randStr
+	job := "tf-test-dataflow-job-" + randStr
+	zone := "us-central1-f"
+
+	vcrTest(t, resource.TestCase{
+		PreCheck:     func() { testAccPreCheck(t) },
+		Providers:    testAccProviders,
+		CheckDestroy: testAccCheckDataflowJobDestroyProducer(t),
+		Steps: []resource.TestStep{
+			{
+				Config: testAccDataflowJobSkipWait_zone(bucket, job, zone),
+				Check: resource.ComposeTestCheckFunc(
+					testAccDataflowJobExists(t, "google_dataflow_job.big_data"),
+				),
+			},
+		},
+	})
+}
+
 func TestAccDataflowJob_withRegion(t *testing.T) {
 	// Dataflow responses include serialized java classes and bash commands
 	// This makes body comparison infeasible
@@ -329,7 +356,16 @@ func testAccCheckDataflowJobDestroyProducer(t *testing.T) func(s *terraform.Stat
 			config := googleProviderConfig(t)
 			job, err := config.NewDataflowClient(config.userAgent).Projects.Jobs.Get(config.Project, rs.Primary.ID).Do()
 			if job != nil {
-				if _, ok := dataflowTerminalStatesMap[job.CurrentState]; !ok {
+				var ok bool
+				skipWait, err := strconv.ParseBool(rs.Primary.Attributes["skip_wait_on_job_termination"])
+				if err != nil {
+					return fmt.Errorf("could not parse attribute: %v", err)
+				}
+				_, ok = dataflowTerminalStatesMap[job.CurrentState]
+				if !ok && skipWait {
+					_, ok = dataflowTerminatingStatesMap[job.CurrentState]
+				}
+				if !ok {
 					return fmt.Errorf("Job still present")
 				}
 			} else if err != nil {
@@ -351,7 +387,16 @@ func testAccCheckDataflowJobRegionDestroyProducer(t *testing.T) func(s *terrafor
 			config := googleProviderConfig(t)
 			job, err := config.NewDataflowClient(config.userAgent).Projects.Locations.Jobs.Get(config.Project, "us-central1", rs.Primary.ID).Do()
 			if job != nil {
-				if _, ok := dataflowTerminalStatesMap[job.CurrentState]; !ok {
+				var ok bool
+				skipWait, err := strconv.ParseBool(rs.Primary.Attributes["skip_wait_on_job_termination"])
+				if err != nil {
+					return fmt.Errorf("could not parse attribute: %v", err)
+				}
+				_, ok = dataflowTerminalStatesMap[job.CurrentState]
+				if !ok && skipWait {
+					_, ok = dataflowTerminatingStatesMap[job.CurrentState]
+				}
+				if !ok {
 					return fmt.Errorf("Job still present")
 				}
 			} else if err != nil {
@@ -635,6 +680,32 @@ resource "google_dataflow_job" "big_data" {
 `, bucket, job, zone, testDataflowJobTemplateWordCountUrl, testDataflowJobSampleFileUrl)
 }
 
+func testAccDataflowJobSkipWait_zone(bucket, job, zone string) string {
+	return fmt.Sprintf(`
+resource "google_storage_bucket" "temp" {
+  name          = "%s"
+  location      = "US"
+  force_destroy = true
+}
+
+resource "google_dataflow_job" "big_data" {
+  name = "%s"
+
+  zone = "%s"
+
+  machine_type      = "e2-standard-2"
+  template_gcs_path = "%s"
+  temp_gcs_location = google_storage_bucket.temp.url
+  parameters = {
+    inputFile = "%s"
+    output    = "${google_storage_bucket.temp.url}/output"
+  }
+  on_delete                    = "cancel"
+  skip_wait_on_job_termination = true
+}
+`, bucket, job, zone, testDataflowJobTemplateWordCountUrl, testDataflowJobSampleFileUrl)
+}
+
 func testAccDataflowJob_region(bucket, job string) string {
 	return fmt.Sprintf(`
 resource "google_storage_bucket" "temp" {
```

website/docs/r/dataflow_flex_template_job.html.markdown (+36)

```diff
@@ -48,6 +48,38 @@ is "cancelled", but if a user sets `on_delete` to `"drain"` in the
 configuration, you may experience a long wait for your `terraform destroy` to
 complete.
 
+You can potentially short-circuit the wait by setting `skip_wait_on_job_termination`
+to `true`, but beware that unless you take active steps to ensure that the job
+`name` parameter changes between instances, the name will conflict and the launch
+of the new job will fail. One way to do this is with a
+[random_id](https://registry.terraform.io/providers/hashicorp/random/latest/docs/resources/id)
+resource, for example:
+
+```hcl
+variable "big_data_job_subscription_id" {
+  type    = string
+  default = "projects/myproject/subscriptions/messages"
+}
+
+resource "random_id" "big_data_job_name_suffix" {
+  byte_length = 4
+  keepers = {
+    region          = var.region
+    subscription_id = var.big_data_job_subscription_id
+  }
+}
+resource "google_dataflow_flex_template_job" "big_data_job" {
+  provider                     = google-beta
+  name                         = "dataflow-flextemplates-job-${random_id.big_data_job_name_suffix.dec}"
+  region                       = var.region
+  container_spec_gcs_path      = "gs://my-bucket/templates/template.json"
+  skip_wait_on_job_termination = true
+  parameters = {
+    inputSubscription = var.big_data_job_subscription_id
+  }
+}
+```
+
 ## Argument Reference
 
 The following arguments are supported:
@@ -74,6 +106,10 @@ labels will be ignored to prevent diffs on re-apply.
 * `on_delete` - (Optional) One of "drain" or "cancel". Specifies behavior of
   deletion during `terraform destroy`. See above note.
 
+* `skip_wait_on_job_termination` - (Optional) If set to `true`, terraform will
+  treat `DRAINING` and `CANCELLING` as terminal states when deleting the resource,
+  and will remove the resource from terraform state and move on. See above note.
+
 * `project` - (Optional) The project in which the resource belongs. If it is not
   provided, the provider project is used.
```

website/docs/r/dataflow_job.html.markdown (+28)

```diff
@@ -65,6 +65,33 @@ The Dataflow resource is considered 'existing' while it is in a nonterminal stat
 
 A Dataflow job which is 'destroyed' may be "cancelled" or "drained". If "cancelled", the job terminates - any data written remains where it is, but no new data will be processed. If "drained", no new data will enter the pipeline, but any data currently in the pipeline will finish being processed. The default is "drain". When `on_delete` is set to `"drain"` in the configuration, you may experience a long wait for your `terraform destroy` to complete.
 
+You can potentially short-circuit the wait by setting `skip_wait_on_job_termination` to `true`, but beware that unless you take active steps to ensure that the job `name` parameter changes between instances, the name will conflict and the launch of the new job will fail. One way to do this is with a [random_id](https://registry.terraform.io/providers/hashicorp/random/latest/docs/resources/id) resource, for example:
+
+```hcl
+variable "big_data_job_subscription_id" {
+  type    = string
+  default = "projects/myproject/subscriptions/messages"
+}
+
+resource "random_id" "big_data_job_name_suffix" {
+  byte_length = 4
+  keepers = {
+    region          = var.region
+    subscription_id = var.big_data_job_subscription_id
+  }
+}
+resource "google_dataflow_job" "big_data_job" {
+  name                         = "dataflow-job-${random_id.big_data_job_name_suffix.dec}"
+  region                       = var.region
+  template_gcs_path            = "gs://my-bucket/templates/template_file"
+  temp_gcs_location            = "gs://my-bucket/tmp_dir"
+  skip_wait_on_job_termination = true
+  parameters = {
+    inputSubscription = var.big_data_job_subscription_id
+  }
+}
+```
+
 ## Argument Reference
 
 The following arguments are supported:
@@ -83,6 +110,7 @@ The following arguments are supported:
 * `transform_name_mapping` - (Optional) Only applicable when updating a pipeline. Map of transform name prefixes of the job to be replaced with the corresponding name prefixes of the new job. This field is not used outside of update.
 * `max_workers` - (Optional) The number of workers permitted to work on the job. More workers may improve processing speed at additional cost.
 * `on_delete` - (Optional) One of "drain" or "cancel". Specifies behavior of deletion during `terraform destroy`. See above note.
+* `skip_wait_on_job_termination` - (Optional) If set to `true`, terraform will treat `DRAINING` and `CANCELLING` as terminal states when deleting the resource, and will remove the resource from terraform state and move on. See above note.
 * `project` - (Optional) The project in which the resource belongs. If it is not provided, the provider project is used.
 * `zone` - (Optional) The zone in which the created job should run. If it is not provided, the provider zone is used.
 * `region` - (Optional) The region in which the created job should run.
```
