Skip to content

Commit 8a8e61c

Browse files
authored
Revert "Prevent Dataflow options in parameters" (#11485)
1 parent c52857e commit 8a8e61c

4 files changed

+107
-115
lines changed

mmv1/third_party/terraform/services/dataflow/go/resource_dataflow_flex_template_job.go.tmpl

+43-55
Original file line numberDiff line numberDiff line change
@@ -310,23 +310,38 @@ func resourceDataflowFlexTemplateJobCreate(d *schema.ResourceData, meta interfac
310310
func resourceDataflowFlexJobSetupEnv(d *schema.ResourceData, config *transport_tpg.Config) (dataflow.FlexTemplateRuntimeEnvironment, map[string]string, error) {
311311

312312
updatedParameters := tpgresource.ExpandStringMap(d, "parameters")
313-
if err := hasIllegalParametersErr(d); err != nil {
314-
return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, err
315-
}
316313

317314
additionalExperiments := tpgresource.ConvertStringSet(d.Get("additional_experiments").(*schema.Set))
318315

319316
var autoscalingAlgorithm string
320317
autoscalingAlgorithm, updatedParameters = dataflowFlexJobTypeTransferVar("autoscaling_algorithm", "autoscalingAlgorithm", updatedParameters, d)
321318

322-
numWorkers, err := parseInt64("num_workers", d)
323-
if err != nil {
324-
return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, err
319+
var numWorkers int
320+
if p, ok := d.GetOk("parameters.numWorkers"); ok {
321+
number, err := strconv.Atoi(p.(string))
322+
if err != nil {
323+
return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, fmt.Errorf("parameters.numWorkers must have a valid integer assigned to it, current value is %s", p.(string))
324+
}
325+
delete(updatedParameters, "numWorkers")
326+
numWorkers = number
327+
} else {
328+
if v, ok := d.GetOk("num_workers"); ok {
329+
numWorkers = v.(int)
330+
}
325331
}
326332

327-
maxNumWorkers, err := parseInt64("max_workers", d)
328-
if err != nil {
329-
return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, err
333+
var maxNumWorkers int
334+
if p, ok := d.GetOk("parameters.maxNumWorkers"); ok {
335+
number, err := strconv.Atoi(p.(string))
336+
if err != nil {
337+
return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, fmt.Errorf("parameters.maxNumWorkers must have a valid integer assigned to it, current value is %s", p.(string))
338+
}
339+
delete(updatedParameters, "maxNumWorkers")
340+
maxNumWorkers = number
341+
} else {
342+
if v, ok := d.GetOk("max_workers"); ok {
343+
maxNumWorkers = v.(int)
344+
}
330345
}
331346

332347
network, updatedParameters := dataflowFlexJobTypeTransferVar("network", "network", updatedParameters, d)
@@ -345,10 +360,23 @@ func resourceDataflowFlexJobSetupEnv(d *schema.ResourceData, config *transport_t
345360

346361
ipConfiguration, updatedParameters := dataflowFlexJobTypeTransferVar("ip_configuration", "ipConfiguration", updatedParameters, d)
347362

348-
enableStreamingEngine, err := parseBool("enable_streaming_engine", d)
349-
if err != nil {
350-
return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, err
351-
}
363+
var enableStreamingEngine bool
364+
if p, ok := d.GetOk("parameters.enableStreamingEngine"); ok {
365+
delete(updatedParameters, "enableStreamingEngine")
366+
e := strings.ToLower(p.(string))
367+
switch e {
368+
case "true":
369+
enableStreamingEngine = true
370+
case "false":
371+
enableStreamingEngine = false
372+
default:
373+
return dataflow.FlexTemplateRuntimeEnvironment{}, nil, fmt.Errorf("error when handling parameters.enableStreamingEngine value: expected value to be true or false but got value `%s`", e)
374+
}
375+
} else {
376+
if v, ok := d.GetOk("enable_streaming_engine"); ok {
377+
enableStreamingEngine = v.(bool)
378+
}
379+
}
352380

353381
sdkContainerImage, updatedParameters := dataflowFlexJobTypeTransferVar("sdk_container_image", "sdkContainerImage", updatedParameters, d)
354382

@@ -357,8 +385,8 @@ func resourceDataflowFlexJobSetupEnv(d *schema.ResourceData, config *transport_t
357385
env := dataflow.FlexTemplateRuntimeEnvironment{
358386
AdditionalUserLabels: tpgresource.ExpandStringMap(d, "effective_labels"),
359387
AutoscalingAlgorithm: autoscalingAlgorithm,
360-
NumWorkers: numWorkers,
361-
MaxWorkers: maxNumWorkers,
388+
NumWorkers: int64(numWorkers),
389+
MaxWorkers: int64(maxNumWorkers),
362390
Network: network,
363391
ServiceAccountEmail: serviceAccountEmail,
364392
Subnetwork: subnetwork,
@@ -812,44 +840,4 @@ func dataflowFlexJobTypeParameterOverride(ename, pname string, d *schema.Resourc
812840
return nil
813841
}
814842

815-
func hasIllegalParametersErr(d *schema.ResourceData) error {
816-
pKey := "parameters"
817-
errFmt := "%s must not include Dataflow options, found: %s"
818-
for k := range ResourceDataflowFlexTemplateJob().Schema {
819-
if _, notOk := d.GetOk(fmt.Sprintf("%s.%s", pKey, k)); notOk {
820-
return fmt.Errorf(errFmt, pKey, k)
821-
}
822-
kk := tpgresource.SnakeToPascalCase(k)
823-
kk = strings.ToLower(kk)
824-
if _, notOk := d.GetOk(fmt.Sprintf("%s.%s", pKey, kk)); notOk {
825-
return fmt.Errorf(errFmt, pKey, kk)
826-
}
827-
}
828-
return nil
829-
}
830-
831-
func parseInt64(name string, d *schema.ResourceData) (int64, error) {
832-
v, ok := d.GetOk(name)
833-
if !ok {
834-
return 0, nil
835-
}
836-
vv, err := strconv.ParseInt(fmt.Sprint(v), 10, 64)
837-
if err != nil {
838-
return 0, fmt.Errorf("illegal value assigned to %s, got: %s", name, v)
839-
}
840-
return vv, nil
841-
}
842-
843-
func parseBool(name string, d *schema.ResourceData) (bool, error) {
844-
v, ok := d.GetOk(name)
845-
if !ok {
846-
return false, nil
847-
}
848-
vv, err := strconv.ParseBool(fmt.Sprint(v))
849-
if err != nil {
850-
return false, fmt.Errorf("illegal value assigned to %s, got: %s", name, v)
851-
}
852-
return vv, nil
853-
}
854-
855843
{{ end }}

mmv1/third_party/terraform/services/dataflow/go/resource_dataflow_flex_template_job_test.go.tmpl

+10-2
Original file line numberDiff line numberDiff line change
@@ -581,12 +581,20 @@ func TestAccDataflowFlexTemplateJob_enableStreamingEngine(t *testing.T) {
581581
CheckDestroy: testAccCheckDataflowJobDestroyProducer(t),
582582
Steps: []resource.TestStep{
583583
{
584-
Config: testAccDataflowFlexTemplateJob_enableStreamingEngine_param(job, bucket, topic),
585-
ExpectError: regexp.MustCompile("must not include Dataflow options"),
584+
Config: testAccDataflowFlexTemplateJob_enableStreamingEngine_param(job, bucket, topic),
585+
Check: resource.ComposeTestCheckFunc(
586+
// Is set
587+
resource.TestCheckResourceAttr("google_dataflow_flex_template_job.flex_job", "parameters.enableStreamingEngine", "true"),
588+
// Is not set
589+
resource.TestCheckNoResourceAttr("google_dataflow_flex_template_job.flex_job", "enable_streaming_engine"),
590+
),
586591
},
587592
{
588593
Config: testAccDataflowFlexTemplateJob_enableStreamingEngine_field(job, bucket, topic),
589594
Check: resource.ComposeTestCheckFunc(
595+
// Now is unset
596+
resource.TestCheckNoResourceAttr("google_dataflow_flex_template_job.flex_job", "parameters.enableStreamingEngine"),
597+
// Now is set
590598
resource.TestCheckResourceAttr("google_dataflow_flex_template_job.flex_job", "enable_streaming_engine", "true"),
591599
),
592600
},

mmv1/third_party/terraform/services/dataflow/resource_dataflow_flex_template_job.go.erb

+45-57
Original file line numberDiff line numberDiff line change
@@ -311,24 +311,39 @@ func resourceDataflowFlexTemplateJobCreate(d *schema.ResourceData, meta interfac
311311
func resourceDataflowFlexJobSetupEnv(d *schema.ResourceData, config *transport_tpg.Config) (dataflow.FlexTemplateRuntimeEnvironment, map[string]string, error) {
312312

313313
updatedParameters := tpgresource.ExpandStringMap(d, "parameters")
314-
if err := hasIllegalParametersErr(d); err != nil {
315-
return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, err
316-
}
317314

318315
additionalExperiments := tpgresource.ConvertStringSet(d.Get("additional_experiments").(*schema.Set))
319316

320317
var autoscalingAlgorithm string
321318
autoscalingAlgorithm, updatedParameters = dataflowFlexJobTypeTransferVar("autoscaling_algorithm", "autoscalingAlgorithm", updatedParameters, d)
322319

323-
numWorkers, err := parseInt64("num_workers", d)
324-
if err != nil {
325-
return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, err
326-
}
320+
var numWorkers int
321+
if p, ok := d.GetOk("parameters.numWorkers"); ok {
322+
number, err := strconv.Atoi(p.(string))
323+
if err != nil {
324+
return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, fmt.Errorf("parameters.numWorkers must have a valid integer assigned to it, current value is %s", p.(string))
325+
}
326+
delete(updatedParameters, "numWorkers")
327+
numWorkers = number
328+
} else {
329+
if v, ok := d.GetOk("num_workers"); ok {
330+
numWorkers = v.(int)
331+
}
332+
}
327333

328-
maxNumWorkers, err := parseInt64("max_workers", d)
329-
if err != nil {
330-
return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, err
331-
}
334+
var maxNumWorkers int
335+
if p, ok := d.GetOk("parameters.maxNumWorkers"); ok {
336+
number, err := strconv.Atoi(p.(string))
337+
if err != nil {
338+
return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, fmt.Errorf("parameters.maxNumWorkers must have a valid integer assigned to it, current value is %s", p.(string))
339+
}
340+
delete(updatedParameters, "maxNumWorkers")
341+
maxNumWorkers = number
342+
} else {
343+
if v, ok := d.GetOk("max_workers"); ok {
344+
maxNumWorkers = v.(int)
345+
}
346+
}
332347

333348
network, updatedParameters := dataflowFlexJobTypeTransferVar("network", "network", updatedParameters, d)
334349

@@ -346,10 +361,23 @@ func resourceDataflowFlexJobSetupEnv(d *schema.ResourceData, config *transport_t
346361

347362
ipConfiguration, updatedParameters := dataflowFlexJobTypeTransferVar("ip_configuration", "ipConfiguration", updatedParameters, d)
348363

349-
enableStreamingEngine, err := parseBool("enable_streaming_engine", d)
350-
if err != nil {
351-
return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, err
352-
}
364+
var enableStreamingEngine bool
365+
if p, ok := d.GetOk("parameters.enableStreamingEngine"); ok {
366+
delete(updatedParameters, "enableStreamingEngine")
367+
e := strings.ToLower(p.(string))
368+
switch e {
369+
case "true":
370+
enableStreamingEngine = true
371+
case "false":
372+
enableStreamingEngine = false
373+
default:
374+
return dataflow.FlexTemplateRuntimeEnvironment{}, nil, fmt.Errorf("error when handling parameters.enableStreamingEngine value: expected value to be true or false but got value `%s`", e)
375+
}
376+
} else {
377+
if v, ok := d.GetOk("enable_streaming_engine"); ok {
378+
enableStreamingEngine = v.(bool)
379+
}
380+
}
353381

354382
sdkContainerImage, updatedParameters := dataflowFlexJobTypeTransferVar("sdk_container_image", "sdkContainerImage", updatedParameters, d)
355383

@@ -358,8 +386,8 @@ func resourceDataflowFlexJobSetupEnv(d *schema.ResourceData, config *transport_t
358386
env := dataflow.FlexTemplateRuntimeEnvironment{
359387
AdditionalUserLabels: tpgresource.ExpandStringMap(d, "effective_labels"),
360388
AutoscalingAlgorithm: autoscalingAlgorithm,
361-
NumWorkers: numWorkers,
362-
MaxWorkers: maxNumWorkers,
389+
NumWorkers: int64(numWorkers),
390+
MaxWorkers: int64(maxNumWorkers),
363391
Network: network,
364392
ServiceAccountEmail: serviceAccountEmail,
365393
Subnetwork: subnetwork,
@@ -813,44 +841,4 @@ func dataflowFlexJobTypeParameterOverride(ename, pname string, d *schema.Resourc
813841
return nil
814842
}
815843

816-
func hasIllegalParametersErr(d *schema.ResourceData) error {
817-
pKey := "parameters"
818-
errFmt := "%s must not include Dataflow options, found: %s"
819-
for k := range ResourceDataflowFlexTemplateJob().Schema {
820-
if _, notOk := d.GetOk(fmt.Sprintf("%s.%s", pKey, k)); notOk {
821-
return fmt.Errorf(errFmt, pKey, k)
822-
}
823-
kk :=tpgresource.SnakeToPascalCase(k)
824-
kk = strings.ToLower(kk)
825-
if _, notOk := d.GetOk(fmt.Sprintf("%s.%s", pKey, kk)); notOk {
826-
return fmt.Errorf(errFmt, pKey, kk)
827-
}
828-
}
829-
return nil
830-
}
831-
832-
func parseInt64(name string, d *schema.ResourceData) (int64, error) {
833-
v, ok := d.GetOk(name)
834-
if !ok {
835-
return 0, nil
836-
}
837-
vv, err := strconv.ParseInt(fmt.Sprint(v), 10, 64)
838-
if err != nil {
839-
return 0, fmt.Errorf("illegal value assigned to %s, got: %s", name, v)
840-
}
841-
return vv, nil
842-
}
843-
844-
func parseBool(name string, d *schema.ResourceData) (bool, error) {
845-
v, ok := d.GetOk(name)
846-
if !ok {
847-
return false, nil
848-
}
849-
vv, err := strconv.ParseBool(fmt.Sprint(v))
850-
if err != nil {
851-
return false, fmt.Errorf("illegal value assigned to %s, got: %s", name, v)
852-
}
853-
return vv, nil
854-
}
855-
856844
<% end -%>

mmv1/third_party/terraform/services/dataflow/resource_dataflow_flex_template_job_test.go.erb

+9-1
Original file line numberDiff line numberDiff line change
@@ -583,11 +583,19 @@ func TestAccDataflowFlexTemplateJob_enableStreamingEngine(t *testing.T) {
583583
Steps: []resource.TestStep{
584584
{
585585
Config: testAccDataflowFlexTemplateJob_enableStreamingEngine_param(job, bucket, topic),
586-
ExpectError: regexp.MustCompile("must not include Dataflow options"),
586+
Check: resource.ComposeTestCheckFunc(
587+
// Is set
588+
resource.TestCheckResourceAttr("google_dataflow_flex_template_job.flex_job", "parameters.enableStreamingEngine", "true"),
589+
// Is not set
590+
resource.TestCheckNoResourceAttr("google_dataflow_flex_template_job.flex_job", "enable_streaming_engine"),
591+
),
587592
},
588593
{
589594
Config: testAccDataflowFlexTemplateJob_enableStreamingEngine_field(job, bucket, topic),
590595
Check: resource.ComposeTestCheckFunc(
596+
// Now is unset
597+
resource.TestCheckNoResourceAttr("google_dataflow_flex_template_job.flex_job", "parameters.enableStreamingEngine"),
598+
// Now is set
591599
resource.TestCheckResourceAttr("google_dataflow_flex_template_job.flex_job", "enable_streaming_engine", "true"),
592600
),
593601
},

0 commit comments

Comments
 (0)