Add support for Stored Procedure for Apache Spark #9793

Merged (8 commits) on Jan 17, 2024
82 changes: 82 additions & 0 deletions mmv1/products/bigquery/Routine.yaml
@@ -52,6 +52,27 @@ examples:
    vars:
      dataset_id: 'dataset_id'
      routine_id: 'routine_id'
  - !ruby/object:Provider::Terraform::Examples
    name: 'big_query_routine_pyspark'
    primary_resource_id: 'pyspark'
    vars:
      dataset_id: 'dataset_id'
      connection_id: 'connection_id'
      routine_id: 'routine_id'
  - !ruby/object:Provider::Terraform::Examples
    name: 'big_query_routine_pyspark_mainfile'
    primary_resource_id: 'pyspark_mainfile'
    vars:
      dataset_id: 'dataset_id'
      connection_id: 'connection_id'
      routine_id: 'routine_id'
  - !ruby/object:Provider::Terraform::Examples
    name: 'big_query_routine_spark_jar'
    primary_resource_id: 'spark_jar'
    vars:
      dataset_id: 'dataset_id'
      connection_id: 'connection_id'
      routine_id: 'routine_id'
properties:
  - !ruby/object:Api::Type::NestedObject
    name: routineReference
@@ -101,6 +122,9 @@ properties:
    values:
      - :SQL
      - :JAVASCRIPT
      - :PYTHON
      - :JAVA
      - :SCALA
  - !ruby/object:Api::Type::Array
    name: 'arguments'
    description: Input/output argument of a function or a stored procedure.
@@ -201,3 +225,61 @@ properties:
      - :DETERMINISM_LEVEL_UNSPECIFIED
      - :DETERMINISTIC
      - :NOT_DETERMINISTIC
  - !ruby/object:Api::Type::NestedObject
    name: 'sparkOptions'
    description: |
      Optional. If language is one of "PYTHON", "JAVA", "SCALA", this field stores the options for the Spark stored procedure.
    properties:
      - !ruby/object:Api::Type::String
        name: 'connection'
        description: |
          Fully qualified name of the user-provided Spark connection object.
          Format: "projects/{projectId}/locations/{locationId}/connections/{connectionId}"
      - !ruby/object:Api::Type::String
        name: 'runtimeVersion'
        description: Runtime version. If not specified, the default runtime version is used.
      - !ruby/object:Api::Type::String
        name: 'containerImage'
        description: Custom container image for the runtime environment.
      - !ruby/object:Api::Type::KeyValuePairs
        name: 'properties'
        description: |
          Configuration properties as a set of key/value pairs, which will be passed on to the Spark application.
          For more information, see Apache Spark and the procedure option list.
          An object containing a list of "key": value pairs. Example: { "name": "wrench", "mass": "1.3kg", "count": "3" }.
        default_from_api: true
      - !ruby/object:Api::Type::String
        name: 'mainFileUri'
        description: |
          The main file/jar URI of the Spark application.
          Exactly one of the definitionBody and mainFileUri fields must be set for Python.
          Exactly one of the mainClass and mainFileUri fields should be set for the Java/Scala language types.
      - !ruby/object:Api::Type::Array
        name: 'pyFileUris'
        description: |
          Python files to be placed on the PYTHONPATH for the PySpark application. Supported file types: .py, .egg, and .zip. For more information, see the Apache Spark documentation.
        item_type: Api::Type::String
        default_from_api: true
      - !ruby/object:Api::Type::Array
        name: 'jarUris'
        description: |
          JARs to include on the driver and executor CLASSPATH. For more information, see the Apache Spark documentation.
        item_type: Api::Type::String
        default_from_api: true
      - !ruby/object:Api::Type::Array
        name: 'fileUris'
        description: |
          Files to be placed in the working directory of each executor. For more information, see the Apache Spark documentation.
        item_type: Api::Type::String
        default_from_api: true
      - !ruby/object:Api::Type::Array
        name: 'archiveUris'
        description: |
          Archive files to be extracted into the working directory of each executor. For more information, see the Apache Spark documentation.
        item_type: Api::Type::String
        default_from_api: true
      - !ruby/object:Api::Type::String
        name: 'mainClass'
        description: |
          The fully qualified name of a class in jarUris, for example, com.example.wordcount.
          Exactly one of the mainClass and mainFileUri fields should be set for the Java/Scala language types.
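
Note on the Java/Scala path: exactly one of main_class and main_file_uri may be set, and none of the examples in this PR exercise main_file_uri with a jar. A minimal hedged sketch of that variant (resource names and the bucket path are placeholders, not part of this change):

# Hypothetical variant (not in this PR): a Scala procedure whose entry point
# is taken from a self-contained jar via main_file_uri instead of main_class.
resource "google_bigquery_routine" "spark_main_jar" {
  dataset_id      = google_bigquery_dataset.test.dataset_id
  routine_id      = "spark_main_jar_routine"
  routine_type    = "PROCEDURE"
  language        = "SCALA"
  definition_body = ""
  spark_options {
    connection      = google_bigquery_connection.test.name
    runtime_version = "2.1"
    # Placeholder path; with main_file_uri set, main_class must be omitted
    # (exactly one of the two is allowed for Java/Scala).
    main_file_uri = "gs://test-bucket/wordcount_main.jar"
  }
}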
41 changes: 41 additions & 0 deletions mmv1/templates/terraform/examples/big_query_routine_pyspark.tf.erb
@@ -0,0 +1,41 @@
resource "google_bigquery_dataset" "test" {
  dataset_id = "<%= ctx[:vars]['dataset_id'] %>"
}

resource "google_bigquery_connection" "test" {
  connection_id = "<%= ctx[:vars]['connection_id'] %>"
  location      = "US"
  spark { }
}

resource "google_bigquery_routine" "<%= ctx[:primary_resource_id] %>" {
  dataset_id      = google_bigquery_dataset.test.dataset_id
  routine_id      = "<%= ctx[:vars]['routine_id'] %>"
  routine_type    = "PROCEDURE"
  language        = "PYTHON"
  definition_body = <<-EOS
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()

    # Load data from BigQuery.
    words = spark.read.format("bigquery") \
      .option("table", "bigquery-public-data:samples.shakespeare") \
      .load()
    words.createOrReplaceTempView("words")

    # Perform word count.
    word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed("sum(word_count)", "sum_word_count")
    word_count.show()
    word_count.printSchema()

    # Saving the data to BigQuery
    word_count.write.format("bigquery") \
      .option("writeMethod", "direct") \
      .save("wordcount_dataset.wordcount_output")
  EOS
  spark_options {
    connection      = google_bigquery_connection.test.name
    runtime_version = "2.1"
  }
}
@@ -0,0 +1,25 @@
resource "google_bigquery_dataset" "test" {
  dataset_id = "<%= ctx[:vars]['dataset_id'] %>"
}

resource "google_bigquery_connection" "test" {
  connection_id = "<%= ctx[:vars]['connection_id'] %>"
  location      = "US"
  spark { }
}

resource "google_bigquery_routine" "<%= ctx[:primary_resource_id] %>" {
  dataset_id      = google_bigquery_dataset.test.dataset_id
  routine_id      = "<%= ctx[:vars]['routine_id'] %>"
  routine_type    = "PROCEDURE"
  language        = "PYTHON"
  definition_body = ""
  spark_options {
    connection      = google_bigquery_connection.test.name
    runtime_version = "2.1"
    main_file_uri   = "gs://test-bucket/main.py"
    py_file_uris    = ["gs://test-bucket/lib.py"]
    file_uris       = ["gs://test-bucket/distribute_in_executor.json"]
    archive_uris    = ["gs://test-bucket/distribute_in_executor.tar.gz"]
  }
}
@@ -0,0 +1,28 @@
resource "google_bigquery_dataset" "test" {
  dataset_id = "<%= ctx[:vars]['dataset_id'] %>"
}

resource "google_bigquery_connection" "test" {
  connection_id = "<%= ctx[:vars]['connection_id'] %>"
  location      = "US"
  spark { }
}

resource "google_bigquery_routine" "<%= ctx[:primary_resource_id] %>" {
  dataset_id      = google_bigquery_dataset.test.dataset_id
  routine_id      = "<%= ctx[:vars]['routine_id'] %>"
  routine_type    = "PROCEDURE"
  language        = "SCALA"
  definition_body = ""
  spark_options {
    connection      = google_bigquery_connection.test.name
    runtime_version = "2.1"
    container_image = "gcr.io/my-project-id/my-spark-image:latest"
    main_class      = "com.google.test.jar.MainClass"
    jar_uris        = ["gs://test-bucket/uberjar_spark_spark3.jar"]
    properties = {
      "spark.dataproc.scaling.version" : "2",
      "spark.reducer.fetchMigratedShuffle.enabled" : "true",
    }
  }
}
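
Beyond provisioning, a routine created this way is invoked like any other BigQuery stored procedure. A rough usage sketch, not part of this PR (the google_bigquery_job resource, the CALL target, and the empty dispositions are assumptions and may need adjusting for your provider version):

# Hypothetical usage (not in this PR): run the procedure with a CALL statement.
resource "google_bigquery_job" "call_spark_proc" {
  job_id = "call_spark_proc_job"

  query {
    # Placeholder project/dataset/routine names.
    query          = "CALL `my-project.my_dataset.my_spark_routine`()"
    use_legacy_sql = false
    # CALL is a scripting statement; explicit dispositions are assumed to be
    # rejected, so both are left empty here.
    create_disposition = ""
    write_disposition  = ""
  }
}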
@@ -80,3 +80,96 @@ resource "google_bigquery_routine" "sproc" {
}
`, dataset, routine)
}

func TestAccBigQueryRoutine_bigQueryRoutineSparkJar_Update(t *testing.T) {
	t.Parallel()

	context := map[string]interface{}{
		"random_suffix": acctest.RandString(t, 10),
	}

	acctest.VcrTest(t, resource.TestCase{
		PreCheck:                 func() { acctest.AccTestPreCheck(t) },
		ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories(t),
		CheckDestroy:             testAccCheckBigQueryRoutineDestroyProducer(t),
		Steps: []resource.TestStep{
			{
				Config: testAccBigQueryRoutine_bigQueryRoutineSparkJar(context),
			},
			{
				ResourceName:      "google_bigquery_routine.spark_jar",
				ImportState:       true,
				ImportStateVerify: true,
			},
			{
				Config: testAccBigQueryRoutine_bigQueryRoutineSparkJar_Update(context),
			},
			{
				ResourceName:      "google_bigquery_routine.spark_jar",
				ImportState:       true,
				ImportStateVerify: true,
			},
		},
	})
}

func testAccBigQueryRoutine_bigQueryRoutineSparkJar(context map[string]interface{}) string {
	return acctest.Nprintf(`
resource "google_bigquery_dataset" "test" {
  dataset_id = "tf_test_dataset_id%{random_suffix}"
}

resource "google_bigquery_connection" "test" {
  connection_id = "tf_test_connection_id%{random_suffix}"
  location      = "US"
  spark { }
}

resource "google_bigquery_routine" "spark_jar" {
  dataset_id      = google_bigquery_dataset.test.dataset_id
  routine_id      = "tf_test_routine_id%{random_suffix}"
  routine_type    = "PROCEDURE"
  language        = "SCALA"
  definition_body = ""
  spark_options {
    connection      = google_bigquery_connection.test.name
    runtime_version = "2.0"
    main_class      = "com.google.test.jar.MainClass"
    jar_uris        = ["gs://test-bucket/testjar_spark_spark3.jar"]
  }
}
`, context)
}

func testAccBigQueryRoutine_bigQueryRoutineSparkJar_Update(context map[string]interface{}) string {
	return acctest.Nprintf(`
resource "google_bigquery_dataset" "test" {
  dataset_id = "tf_test_dataset_id%{random_suffix}"
}

resource "google_bigquery_connection" "test_updated" {
  connection_id = "tf_test_connection_updated_id%{random_suffix}"
  location      = "US"
  spark { }
}

resource "google_bigquery_routine" "spark_jar" {
  dataset_id      = google_bigquery_dataset.test.dataset_id
  routine_id      = "tf_test_routine_id%{random_suffix}"
  routine_type    = "PROCEDURE"
  language        = "SCALA"
  definition_body = ""
  spark_options {
    connection      = google_bigquery_connection.test_updated.name
    runtime_version = "2.1"
    container_image = "gcr.io/my-project-id/my-spark-image:latest"
    main_class      = "com.google.test.jar.MainClassUpdated"
    jar_uris        = ["gs://test-bucket/uberjar_spark_spark3_updated.jar"]
    properties = {
      "spark.dataproc.scaling.version" : "2",
      "spark.reducer.fetchMigratedShuffle.enabled" : "true",
    }
  }
}
`, context)
}