Add support for Stored Procedure for Apache Spark #17028

Merged
.changelog/9793.txt (3 additions, 0 deletions)
@@ -0,0 +1,3 @@
```release-note:enhancement
bigquery: add `spark_options` field to `google_bigquery_routine` resource
```
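
For context, a hedged sketch of how the new field might be exercised from Terraform configuration. The resource name, field names, and the `PYTHON` language value come from this diff; the dataset, the Spark-enabled `google_bigquery_connection`, the runtime version, and the PySpark body are illustrative assumptions, not part of this change.

```hcl
# Sketch only: a PySpark stored procedure using the new spark_options block.
# The dataset and the Spark-enabled connection are assumed to be defined
# elsewhere in the configuration; names and runtime_version are placeholders.
resource "google_bigquery_routine" "pyspark_proc" {
  dataset_id      = google_bigquery_dataset.test.dataset_id
  routine_id      = "pyspark_proc"
  routine_type    = "PROCEDURE"
  language        = "PYTHON" # newly accepted by the validator below
  definition_body = <<-EOS
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("pyspark-proc").getOrCreate()
    spark.sql("SELECT 1").show()
  EOS

  spark_options {
    connection      = google_bigquery_connection.spark.name
    runtime_version = "2.1"
  }
}
```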
google/services/bigquery/resource_bigquery_routine.go (301 additions, 2 deletions)
@@ -147,8 +147,8 @@ imported JAVASCRIPT libraries.`,
"language": {
Type: schema.TypeString,
Optional: true,
- ValidateFunc: verify.ValidateEnum([]string{"SQL", "JAVASCRIPT", ""}),
- Description: `The language of the routine. Possible values: ["SQL", "JAVASCRIPT"]`,
+ ValidateFunc: verify.ValidateEnum([]string{"SQL", "JAVASCRIPT", "PYTHON", "JAVA", "SCALA", ""}),
+ Description: `The language of the routine. Possible values: ["SQL", "JAVASCRIPT", "PYTHON", "JAVA", "SCALA"]`,
},
"return_table_type": {
Type: schema.TypeString,
@@ -176,6 +176,90 @@ d the order of values or replaced STRUCT field type with RECORD field type, we c
cannot suppress the recurring diff this causes. As a workaround, we recommend using
the schema as returned by the API.`,
},
"spark_options": {
Type: schema.TypeList,
Optional: true,
Description: `Optional. If language is one of "PYTHON", "JAVA", "SCALA", this field stores the options for spark stored procedure.`,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"archive_uris": {
Type: schema.TypeList,
Computed: true,
Optional: true,
Description: `Archive files to be extracted into the working directory of each executor. For more information about Apache Spark, see Apache Spark.`,
Elem: &schema.Schema{
Type: schema.TypeString,
},
},
"connection": {
Type: schema.TypeString,
Optional: true,
Description: `Fully qualified name of the user-provided Spark connection object.
Format: "projects/{projectId}/locations/{locationId}/connections/{connectionId}"`,
},
"container_image": {
Type: schema.TypeString,
Optional: true,
Description: `Custom container image for the runtime environment.`,
},
"file_uris": {
Type: schema.TypeList,
Computed: true,
Optional: true,
Description: `Files to be placed in the working directory of each executor. For more information about Apache Spark, see Apache Spark.`,
Elem: &schema.Schema{
Type: schema.TypeString,
},
},
"jar_uris": {
Type: schema.TypeList,
Computed: true,
Optional: true,
Description: `JARs to include on the driver and executor CLASSPATH. For more information about Apache Spark, see Apache Spark.`,
Elem: &schema.Schema{
Type: schema.TypeString,
},
},
"main_class": {
Type: schema.TypeString,
Optional: true,
Description: `The fully qualified name of a class in jarUris, for example, com.example.wordcount.
Exactly one of mainClass and main_jar_uri field should be set for Java/Scala language type.`,
},
"main_file_uri": {
Type: schema.TypeString,
Optional: true,
Description: `The main file/jar URI of the Spark application.
Exactly one of the definitionBody field and the mainFileUri field must be set for Python.
Exactly one of mainClass and mainFileUri field should be set for Java/Scala language type.`,
},
"properties": {
Type: schema.TypeMap,
Computed: true,
Optional: true,
Description: `Configuration properties as a set of key/value pairs, which will be passed on to the Spark application.
For more information, see Apache Spark and the procedure option list.
An object containing a list of "key": value pairs. Example: { "name": "wrench", "mass": "1.3kg", "count": "3" }.`,
Elem: &schema.Schema{Type: schema.TypeString},
},
"py_file_uris": {
Type: schema.TypeList,
Computed: true,
Optional: true,
Description: `Python files to be placed on the PYTHONPATH for PySpark application. Supported file types: .py, .egg, and .zip. For more information about Apache Spark, see Apache Spark.`,
Elem: &schema.Schema{
Type: schema.TypeString,
},
},
"runtime_version": {
Type: schema.TypeString,
Optional: true,
Description: `Runtime version. If not specified, the default runtime version is used.`,
},
},
},
},
"creation_time": {
Type: schema.TypeInt,
Computed: true,
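
The field descriptions in the `spark_options` block above encode the entry-point rules: for Python, exactly one of `definition_body` and `main_file_uri` must be set; for Java/Scala, exactly one of `main_class` and `main_file_uri`. A hedged sketch of the jar-driven Scala variant follows; the container image, bucket URIs, class name, and property values are invented placeholders, and the connection and dataset references are assumed to exist elsewhere.

```hcl
# Sketch only: a Scala stored procedure whose logic lives in a jar on GCS,
# so definition_body stays empty and main_class selects the entry point.
resource "google_bigquery_routine" "spark_jar_proc" {
  dataset_id      = google_bigquery_dataset.test.dataset_id
  routine_id      = "spark_jar_proc"
  routine_type    = "PROCEDURE"
  language        = "SCALA"
  definition_body = ""

  spark_options {
    connection      = google_bigquery_connection.spark.name
    runtime_version = "2.1"
    container_image = "gcr.io/my-project/my-spark-image:latest" # hypothetical
    main_class      = "com.example.wordcount.WordCount"         # exactly one of main_class / main_file_uri
    jar_uris        = ["gs://my-bucket/wordcount_spark3.jar"]   # hypothetical
    properties = {
      "spark.executor.instances" = "2" # hypothetical Spark property
    }
  }
}
```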
@@ -267,6 +351,12 @@ func resourceBigQueryRoutineCreate(d *schema.ResourceData, meta interface{}) err
} else if v, ok := d.GetOkExists("determinism_level"); !tpgresource.IsEmptyValue(reflect.ValueOf(determinismLevelProp)) && (ok || !reflect.DeepEqual(v, determinismLevelProp)) {
obj["determinismLevel"] = determinismLevelProp
}
sparkOptionsProp, err := expandBigQueryRoutineSparkOptions(d.Get("spark_options"), d, config)
if err != nil {
return err
} else if v, ok := d.GetOkExists("spark_options"); !tpgresource.IsEmptyValue(reflect.ValueOf(sparkOptionsProp)) && (ok || !reflect.DeepEqual(v, sparkOptionsProp)) {
obj["sparkOptions"] = sparkOptionsProp
}

url, err := tpgresource.ReplaceVars(d, config, "{{BigQueryBasePath}}projects/{{project}}/datasets/{{dataset_id}}/routines")
if err != nil {
@@ -400,6 +490,9 @@ func resourceBigQueryRoutineRead(d *schema.ResourceData, meta interface{}) error
if err := d.Set("determinism_level", flattenBigQueryRoutineDeterminismLevel(res["determinismLevel"], d, config)); err != nil {
return fmt.Errorf("Error reading Routine: %s", err)
}
if err := d.Set("spark_options", flattenBigQueryRoutineSparkOptions(res["sparkOptions"], d, config)); err != nil {
return fmt.Errorf("Error reading Routine: %s", err)
}

return nil
}
@@ -480,6 +573,12 @@ func resourceBigQueryRoutineUpdate(d *schema.ResourceData, meta interface{}) err
} else if v, ok := d.GetOkExists("determinism_level"); !tpgresource.IsEmptyValue(reflect.ValueOf(v)) && (ok || !reflect.DeepEqual(v, determinismLevelProp)) {
obj["determinismLevel"] = determinismLevelProp
}
sparkOptionsProp, err := expandBigQueryRoutineSparkOptions(d.Get("spark_options"), d, config)
if err != nil {
return err
} else if v, ok := d.GetOkExists("spark_options"); !tpgresource.IsEmptyValue(reflect.ValueOf(v)) && (ok || !reflect.DeepEqual(v, sparkOptionsProp)) {
obj["sparkOptions"] = sparkOptionsProp
}

url, err := tpgresource.ReplaceVars(d, config, "{{BigQueryBasePath}}projects/{{project}}/datasets/{{dataset_id}}/routines/{{routine_id}}")
if err != nil {
@@ -727,6 +826,77 @@ func flattenBigQueryRoutineDeterminismLevel(v interface{}, d *schema.ResourceDat
return v
}

func flattenBigQueryRoutineSparkOptions(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
if v == nil {
return nil
}
original := v.(map[string]interface{})
if len(original) == 0 {
return nil
}
transformed := make(map[string]interface{})
transformed["connection"] =
flattenBigQueryRoutineSparkOptionsConnection(original["connection"], d, config)
transformed["runtime_version"] =
flattenBigQueryRoutineSparkOptionsRuntimeVersion(original["runtimeVersion"], d, config)
transformed["container_image"] =
flattenBigQueryRoutineSparkOptionsContainerImage(original["containerImage"], d, config)
transformed["properties"] =
flattenBigQueryRoutineSparkOptionsProperties(original["properties"], d, config)
transformed["main_file_uri"] =
flattenBigQueryRoutineSparkOptionsMainFileUri(original["mainFileUri"], d, config)
transformed["py_file_uris"] =
flattenBigQueryRoutineSparkOptionsPyFileUris(original["pyFileUris"], d, config)
transformed["jar_uris"] =
flattenBigQueryRoutineSparkOptionsJarUris(original["jarUris"], d, config)
transformed["file_uris"] =
flattenBigQueryRoutineSparkOptionsFileUris(original["fileUris"], d, config)
transformed["archive_uris"] =
flattenBigQueryRoutineSparkOptionsArchiveUris(original["archiveUris"], d, config)
transformed["main_class"] =
flattenBigQueryRoutineSparkOptionsMainClass(original["mainClass"], d, config)
return []interface{}{transformed}
}
func flattenBigQueryRoutineSparkOptionsConnection(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenBigQueryRoutineSparkOptionsRuntimeVersion(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenBigQueryRoutineSparkOptionsContainerImage(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenBigQueryRoutineSparkOptionsProperties(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenBigQueryRoutineSparkOptionsMainFileUri(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenBigQueryRoutineSparkOptionsPyFileUris(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenBigQueryRoutineSparkOptionsJarUris(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenBigQueryRoutineSparkOptionsFileUris(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenBigQueryRoutineSparkOptionsArchiveUris(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenBigQueryRoutineSparkOptionsMainClass(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func expandBigQueryRoutineRoutineReference(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {

transformed := make(map[string]interface{})
@@ -852,3 +1022,132 @@ func expandBigQueryRoutineDescription(v interface{}, d tpgresource.TerraformReso
func expandBigQueryRoutineDeterminismLevel(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandBigQueryRoutineSparkOptions(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
l := v.([]interface{})
if len(l) == 0 || l[0] == nil {
return nil, nil
}
raw := l[0]
original := raw.(map[string]interface{})
transformed := make(map[string]interface{})

transformedConnection, err := expandBigQueryRoutineSparkOptionsConnection(original["connection"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedConnection); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["connection"] = transformedConnection
}

transformedRuntimeVersion, err := expandBigQueryRoutineSparkOptionsRuntimeVersion(original["runtime_version"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedRuntimeVersion); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["runtimeVersion"] = transformedRuntimeVersion
}

transformedContainerImage, err := expandBigQueryRoutineSparkOptionsContainerImage(original["container_image"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedContainerImage); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["containerImage"] = transformedContainerImage
}

transformedProperties, err := expandBigQueryRoutineSparkOptionsProperties(original["properties"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedProperties); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["properties"] = transformedProperties
}

transformedMainFileUri, err := expandBigQueryRoutineSparkOptionsMainFileUri(original["main_file_uri"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedMainFileUri); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["mainFileUri"] = transformedMainFileUri
}

transformedPyFileUris, err := expandBigQueryRoutineSparkOptionsPyFileUris(original["py_file_uris"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedPyFileUris); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["pyFileUris"] = transformedPyFileUris
}

transformedJarUris, err := expandBigQueryRoutineSparkOptionsJarUris(original["jar_uris"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedJarUris); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["jarUris"] = transformedJarUris
}

transformedFileUris, err := expandBigQueryRoutineSparkOptionsFileUris(original["file_uris"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedFileUris); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["fileUris"] = transformedFileUris
}

transformedArchiveUris, err := expandBigQueryRoutineSparkOptionsArchiveUris(original["archive_uris"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedArchiveUris); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["archiveUris"] = transformedArchiveUris
}

transformedMainClass, err := expandBigQueryRoutineSparkOptionsMainClass(original["main_class"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedMainClass); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["mainClass"] = transformedMainClass
}

return transformed, nil
}

func expandBigQueryRoutineSparkOptionsConnection(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandBigQueryRoutineSparkOptionsRuntimeVersion(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandBigQueryRoutineSparkOptionsContainerImage(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandBigQueryRoutineSparkOptionsProperties(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (map[string]string, error) {
if v == nil {
return map[string]string{}, nil
}
m := make(map[string]string)
for k, val := range v.(map[string]interface{}) {
m[k] = val.(string)
}
return m, nil
}

func expandBigQueryRoutineSparkOptionsMainFileUri(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandBigQueryRoutineSparkOptionsPyFileUris(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandBigQueryRoutineSparkOptionsJarUris(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandBigQueryRoutineSparkOptionsFileUris(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandBigQueryRoutineSparkOptionsArchiveUris(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandBigQueryRoutineSparkOptionsMainClass(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}