Revert "Prevent Dataflow options in parameters" #11485

Merged
@@ -310,23 +310,38 @@ func resourceDataflowFlexTemplateJobCreate(d *schema.ResourceData, meta interfac
func resourceDataflowFlexJobSetupEnv(d *schema.ResourceData, config *transport_tpg.Config) (dataflow.FlexTemplateRuntimeEnvironment, map[string]string, error) {

updatedParameters := tpgresource.ExpandStringMap(d, "parameters")
- if err := hasIllegalParametersErr(d); err != nil {
- return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, err
- }

additionalExperiments := tpgresource.ConvertStringSet(d.Get("additional_experiments").(*schema.Set))

var autoscalingAlgorithm string
autoscalingAlgorithm, updatedParameters = dataflowFlexJobTypeTransferVar("autoscaling_algorithm", "autoscalingAlgorithm", updatedParameters, d)

- numWorkers, err := parseInt64("num_workers", d)
- if err != nil {
- return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, err
- }
+ var numWorkers int
+ if p, ok := d.GetOk("parameters.numWorkers"); ok {
+ number, err := strconv.Atoi(p.(string))
+ if err != nil {
+ return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, fmt.Errorf("parameters.numWorkers must have a valid integer assigned to it, current value is %s", p.(string))
+ }
+ delete(updatedParameters, "numWorkers")
+ numWorkers = number
+ } else {
+ if v, ok := d.GetOk("num_workers"); ok {
+ numWorkers = v.(int)
+ }
+ }

- maxNumWorkers, err := parseInt64("max_workers", d)
- if err != nil {
- return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, err
- }
+ var maxNumWorkers int
+ if p, ok := d.GetOk("parameters.maxNumWorkers"); ok {
+ number, err := strconv.Atoi(p.(string))
+ if err != nil {
+ return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, fmt.Errorf("parameters.maxNumWorkers must have a valid integer assigned to it, current value is %s", p.(string))
+ }
+ delete(updatedParameters, "maxNumWorkers")
+ maxNumWorkers = number
+ } else {
+ if v, ok := d.GetOk("max_workers"); ok {
+ maxNumWorkers = v.(int)
+ }
+ }

network, updatedParameters := dataflowFlexJobTypeTransferVar("network", "network", updatedParameters, d)
@@ -345,10 +360,23 @@ func resourceDataflowFlexJobSetupEnv(d *schema.ResourceData, config *transport_t

ipConfiguration, updatedParameters := dataflowFlexJobTypeTransferVar("ip_configuration", "ipConfiguration", updatedParameters, d)

- enableStreamingEngine, err := parseBool("enable_streaming_engine", d)
- if err != nil {
- return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, err
- }
+ var enableStreamingEngine bool
+ if p, ok := d.GetOk("parameters.enableStreamingEngine"); ok {
+ delete(updatedParameters, "enableStreamingEngine")
+ e := strings.ToLower(p.(string))
+ switch e {
+ case "true":
+ enableStreamingEngine = true
+ case "false":
+ enableStreamingEngine = false
+ default:
+ return dataflow.FlexTemplateRuntimeEnvironment{}, nil, fmt.Errorf("error when handling parameters.enableStreamingEngine value: expected value to be true or false but got value `%s`", e)
+ }
+ } else {
+ if v, ok := d.GetOk("enable_streaming_engine"); ok {
+ enableStreamingEngine = v.(bool)
+ }
+ }

sdkContainerImage, updatedParameters := dataflowFlexJobTypeTransferVar("sdk_container_image", "sdkContainerImage", updatedParameters, d)

Expand All @@ -357,8 +385,8 @@ func resourceDataflowFlexJobSetupEnv(d *schema.ResourceData, config *transport_t
env := dataflow.FlexTemplateRuntimeEnvironment{
AdditionalUserLabels: tpgresource.ExpandStringMap(d, "effective_labels"),
AutoscalingAlgorithm: autoscalingAlgorithm,
- NumWorkers: numWorkers,
- MaxWorkers: maxNumWorkers,
+ NumWorkers: int64(numWorkers),
+ MaxWorkers: int64(maxNumWorkers),
Network: network,
ServiceAccountEmail: serviceAccountEmail,
Subnetwork: subnetwork,
@@ -812,44 +840,4 @@ func dataflowFlexJobTypeParameterOverride(ename, pname string, d *schema.Resourc
return nil
}

- func hasIllegalParametersErr(d *schema.ResourceData) error {
- pKey := "parameters"
- errFmt := "%s must not include Dataflow options, found: %s"
- for k := range ResourceDataflowFlexTemplateJob().Schema {
- if _, notOk := d.GetOk(fmt.Sprintf("%s.%s", pKey, k)); notOk {
- return fmt.Errorf(errFmt, pKey, k)
- }
- kk := tpgresource.SnakeToPascalCase(k)
- kk = strings.ToLower(kk)
- if _, notOk := d.GetOk(fmt.Sprintf("%s.%s", pKey, kk)); notOk {
- return fmt.Errorf(errFmt, pKey, kk)
- }
- }
- return nil
- }
-
- func parseInt64(name string, d *schema.ResourceData) (int64, error) {
- v, ok := d.GetOk(name)
- if !ok {
- return 0, nil
- }
- vv, err := strconv.ParseInt(fmt.Sprint(v), 10, 64)
- if err != nil {
- return 0, fmt.Errorf("illegal value assigned to %s, got: %s", name, v)
- }
- return vv, nil
- }
-
- func parseBool(name string, d *schema.ResourceData) (bool, error) {
- v, ok := d.GetOk(name)
- if !ok {
- return false, nil
- }
- vv, err := strconv.ParseBool(fmt.Sprint(v))
- if err != nil {
- return false, fmt.Errorf("illegal value assigned to %s, got: %s", name, v)
- }
- return vv, nil
- }
-
{{ end }}
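
For context on the restored logic above: a Dataflow option supplied inside the parameters map (for example parameters.numWorkers) now takes precedence over the corresponding top-level field, is validated, and is dropped from the parameter map before the FlexTemplateRuntimeEnvironment is assembled. The standalone Go sketch below is illustrative only and is not code from this PR; resolveIntOption, its fieldSet argument, and the sample inputSubscription parameter are invented names used to show that precedence rule for an integer option such as numWorkers.

// Illustrative sketch, not part of this PR: mirrors the precedence the restored
// inline code applies to numWorkers and maxNumWorkers. A string value under
// parameters.<key> wins, is validated as an integer and removed from the map;
// otherwise the typed top-level field is used when it is set.
package main

import (
    "fmt"
    "strconv"
)

func resolveIntOption(params map[string]string, paramKey string, fieldValue int, fieldSet bool) (int, error) {
    if raw, ok := params[paramKey]; ok {
        n, err := strconv.Atoi(raw)
        if err != nil {
            return 0, fmt.Errorf("parameters.%s must have a valid integer assigned to it, current value is %s", paramKey, raw)
        }
        // The option belongs in the runtime environment, not in the template
        // parameters, so drop it from the map that is sent as parameters.
        delete(params, paramKey)
        return n, nil
    }
    if fieldSet {
        return fieldValue, nil
    }
    return 0, nil
}

func main() {
    params := map[string]string{"numWorkers": "5", "inputSubscription": "projects/p/subscriptions/s"}
    n, err := resolveIntOption(params, "numWorkers", 2, true)
    fmt.Println(n, params, err) // 5 map[inputSubscription:projects/p/subscriptions/s] <nil>
}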
@@ -581,12 +581,20 @@ func TestAccDataflowFlexTemplateJob_enableStreamingEngine(t *testing.T) {
CheckDestroy: testAccCheckDataflowJobDestroyProducer(t),
Steps: []resource.TestStep{
{
- Config: testAccDataflowFlexTemplateJob_enableStreamingEngine_param(job, bucket, topic),
- ExpectError: regexp.MustCompile("must not include Dataflow options"),
+ Config: testAccDataflowFlexTemplateJob_enableStreamingEngine_param(job, bucket, topic),
+ Check: resource.ComposeTestCheckFunc(
+ // Is set
+ resource.TestCheckResourceAttr("google_dataflow_flex_template_job.flex_job", "parameters.enableStreamingEngine", "true"),
+ // Is not set
+ resource.TestCheckNoResourceAttr("google_dataflow_flex_template_job.flex_job", "enable_streaming_engine"),
+ ),
},
{
Config: testAccDataflowFlexTemplateJob_enableStreamingEngine_field(job, bucket, topic),
Check: resource.ComposeTestCheckFunc(
+ // Now is unset
+ resource.TestCheckNoResourceAttr("google_dataflow_flex_template_job.flex_job", "parameters.enableStreamingEngine"),
+ // Now is set
resource.TestCheckResourceAttr("google_dataflow_flex_template_job.flex_job", "enable_streaming_engine", "true"),
),
},
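
The test above now applies both configurations instead of expecting an error: the first step enables Streaming Engine through parameters.enableStreamingEngine, the second through the enable_streaming_engine field. As a rough illustration of the string handling the restored resource code performs for that parameter (a standalone sketch, not the provider's code; parseStreamingEngineParam is an invented name), the value must read "true" or "false" after lowercasing and anything else is rejected:

// Illustrative sketch, not part of this PR: reproduces the switch the restored
// code uses for parameters.enableStreamingEngine. Matching is case-insensitive
// and limited to true/false, unlike strconv.ParseBool, which also accepts 1/0/t/f.
package main

import (
    "fmt"
    "strings"
)

func parseStreamingEngineParam(raw string) (bool, error) {
    switch strings.ToLower(raw) {
    case "true":
        return true, nil
    case "false":
        return false, nil
    default:
        return false, fmt.Errorf("expected value to be true or false but got value `%s`", raw)
    }
}

func main() {
    // "True" and "false" parse cleanly; "1" is rejected with an error.
    for _, v := range []string{"True", "false", "1"} {
        b, err := parseStreamingEngineParam(v)
        fmt.Println(v, b, err)
    }
}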
@@ -311,24 +311,39 @@ func resourceDataflowFlexTemplateJobCreate(d *schema.ResourceData, meta interfac
func resourceDataflowFlexJobSetupEnv(d *schema.ResourceData, config *transport_tpg.Config) (dataflow.FlexTemplateRuntimeEnvironment, map[string]string, error) {

updatedParameters := tpgresource.ExpandStringMap(d, "parameters")
- if err := hasIllegalParametersErr(d); err != nil {
- return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, err
- }

additionalExperiments := tpgresource.ConvertStringSet(d.Get("additional_experiments").(*schema.Set))

var autoscalingAlgorithm string
autoscalingAlgorithm, updatedParameters = dataflowFlexJobTypeTransferVar("autoscaling_algorithm", "autoscalingAlgorithm", updatedParameters, d)

- numWorkers, err := parseInt64("num_workers", d)
- if err != nil {
- return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, err
- }
+ var numWorkers int
+ if p, ok := d.GetOk("parameters.numWorkers"); ok {
+ number, err := strconv.Atoi(p.(string))
+ if err != nil {
+ return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, fmt.Errorf("parameters.numWorkers must have a valid integer assigned to it, current value is %s", p.(string))
+ }
+ delete(updatedParameters, "numWorkers")
+ numWorkers = number
+ } else {
+ if v, ok := d.GetOk("num_workers"); ok {
+ numWorkers = v.(int)
+ }
+ }

- maxNumWorkers, err := parseInt64("max_workers", d)
- if err != nil {
- return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, err
- }
+ var maxNumWorkers int
+ if p, ok := d.GetOk("parameters.maxNumWorkers"); ok {
+ number, err := strconv.Atoi(p.(string))
+ if err != nil {
+ return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, fmt.Errorf("parameters.maxNumWorkers must have a valid integer assigned to it, current value is %s", p.(string))
+ }
+ delete(updatedParameters, "maxNumWorkers")
+ maxNumWorkers = number
+ } else {
+ if v, ok := d.GetOk("max_workers"); ok {
+ maxNumWorkers = v.(int)
+ }
+ }

network, updatedParameters := dataflowFlexJobTypeTransferVar("network", "network", updatedParameters, d)

@@ -346,10 +361,23 @@ func resourceDataflowFlexJobSetupEnv(d *schema.ResourceData, config *transport_t

ipConfiguration, updatedParameters := dataflowFlexJobTypeTransferVar("ip_configuration", "ipConfiguration", updatedParameters, d)

- enableStreamingEngine, err := parseBool("enable_streaming_engine", d)
- if err != nil {
- return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, err
- }
+ var enableStreamingEngine bool
+ if p, ok := d.GetOk("parameters.enableStreamingEngine"); ok {
+ delete(updatedParameters, "enableStreamingEngine")
+ e := strings.ToLower(p.(string))
+ switch e {
+ case "true":
+ enableStreamingEngine = true
+ case "false":
+ enableStreamingEngine = false
+ default:
+ return dataflow.FlexTemplateRuntimeEnvironment{}, nil, fmt.Errorf("error when handling parameters.enableStreamingEngine value: expected value to be true or false but got value `%s`", e)
+ }
+ } else {
+ if v, ok := d.GetOk("enable_streaming_engine"); ok {
+ enableStreamingEngine = v.(bool)
+ }
+ }

sdkContainerImage, updatedParameters := dataflowFlexJobTypeTransferVar("sdk_container_image", "sdkContainerImage", updatedParameters, d)

Expand All @@ -358,8 +386,8 @@ func resourceDataflowFlexJobSetupEnv(d *schema.ResourceData, config *transport_t
env := dataflow.FlexTemplateRuntimeEnvironment{
AdditionalUserLabels: tpgresource.ExpandStringMap(d, "effective_labels"),
AutoscalingAlgorithm: autoscalingAlgorithm,
- NumWorkers: numWorkers,
- MaxWorkers: maxNumWorkers,
+ NumWorkers: int64(numWorkers),
+ MaxWorkers: int64(maxNumWorkers),
Network: network,
ServiceAccountEmail: serviceAccountEmail,
Subnetwork: subnetwork,
@@ -813,44 +841,4 @@ func dataflowFlexJobTypeParameterOverride(ename, pname string, d *schema.Resourc
return nil
}

- func hasIllegalParametersErr(d *schema.ResourceData) error {
- pKey := "parameters"
- errFmt := "%s must not include Dataflow options, found: %s"
- for k := range ResourceDataflowFlexTemplateJob().Schema {
- if _, notOk := d.GetOk(fmt.Sprintf("%s.%s", pKey, k)); notOk {
- return fmt.Errorf(errFmt, pKey, k)
- }
- kk := tpgresource.SnakeToPascalCase(k)
- kk = strings.ToLower(kk)
- if _, notOk := d.GetOk(fmt.Sprintf("%s.%s", pKey, kk)); notOk {
- return fmt.Errorf(errFmt, pKey, kk)
- }
- }
- return nil
- }
-
- func parseInt64(name string, d *schema.ResourceData) (int64, error) {
- v, ok := d.GetOk(name)
- if !ok {
- return 0, nil
- }
- vv, err := strconv.ParseInt(fmt.Sprint(v), 10, 64)
- if err != nil {
- return 0, fmt.Errorf("illegal value assigned to %s, got: %s", name, v)
- }
- return vv, nil
- }
-
- func parseBool(name string, d *schema.ResourceData) (bool, error) {
- v, ok := d.GetOk(name)
- if !ok {
- return false, nil
- }
- vv, err := strconv.ParseBool(fmt.Sprint(v))
- if err != nil {
- return false, fmt.Errorf("illegal value assigned to %s, got: %s", name, v)
- }
- return vv, nil
- }
-
<% end -%>
@@ -583,11 +583,19 @@ func TestAccDataflowFlexTemplateJob_enableStreamingEngine(t *testing.T) {
Steps: []resource.TestStep{
{
Config: testAccDataflowFlexTemplateJob_enableStreamingEngine_param(job, bucket, topic),
- ExpectError: regexp.MustCompile("must not include Dataflow options"),
+ Check: resource.ComposeTestCheckFunc(
+ // Is set
+ resource.TestCheckResourceAttr("google_dataflow_flex_template_job.flex_job", "parameters.enableStreamingEngine", "true"),
+ // Is not set
+ resource.TestCheckNoResourceAttr("google_dataflow_flex_template_job.flex_job", "enable_streaming_engine"),
+ ),
},
{
Config: testAccDataflowFlexTemplateJob_enableStreamingEngine_field(job, bucket, topic),
Check: resource.ComposeTestCheckFunc(
+ // Now is unset
+ resource.TestCheckNoResourceAttr("google_dataflow_flex_template_job.flex_job", "parameters.enableStreamingEngine"),
+ // Now is set
resource.TestCheckResourceAttr("google_dataflow_flex_template_job.flex_job", "enable_streaming_engine", "true"),
),
},