Skip to content

Commit 692bae2

Browse files
authored
Add support for setting Pub/Sub Cloud Storage subscription max_messages and use_topic_schema (#11583)
1 parent e169437 commit 692bae2

File tree

4 files changed

+80
-20
lines changed

4 files changed

+80
-20
lines changed

mmv1/products/pubsub/Subscription.yaml

+8
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,10 @@ properties:
212212
description: |
213213
The maximum bytes that can be written to a Cloud Storage file before a new file is created. Min 1 KB, max 10 GiB.
214214
The maxBytes limit may be exceeded in cases where messages are larger than the limit.
215+
- !ruby/object:Api::Type::Integer
216+
name: 'maxMessages'
217+
description: |
218+
The maximum messages that can be written to a Cloud Storage file before a new file is created. Min 1000 messages.
215219
- !ruby/object:Api::Type::Enum
216220
name: 'state'
217221
description: |
@@ -230,6 +234,10 @@ properties:
230234
name: 'writeMetadata'
231235
description: |
232236
When true, write the subscription name, messageId, publishTime, attributes, and orderingKey as additional fields in the output.
237+
- !ruby/object:Api::Type::Boolean
238+
name: 'useTopicSchema'
239+
description: |
240+
When true, the output Cloud Storage file will be serialized using the topic schema, if it exists.
233241
- !ruby/object:Api::Type::String
234242
name: 'serviceAccountEmail'
235243
description: |

mmv1/templates/terraform/examples/pubsub_subscription_push_cloudstorage.tf.erb

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ resource "google_pubsub_subscription" "<%= ctx[:primary_resource_id] %>" {
2121

2222
max_bytes = 1000
2323
max_duration = "300s"
24+
max_messages = 1000
2425
}
2526
depends_on = [
2627
google_storage_bucket.<%= ctx[:primary_resource_id] %>,

mmv1/templates/terraform/examples/pubsub_subscription_push_cloudstorage_avro.tf.erb

+2
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@ resource "google_pubsub_subscription" "<%= ctx[:primary_resource_id] %>" {
2121

2222
max_bytes = 1000
2323
max_duration = "300s"
24+
max_messages = 1000
2425

2526
avro_config {
2627
write_metadata = true
28+
use_topic_schema = true
2729
}
2830
}
2931
depends_on = [

mmv1/third_party/terraform/services/pubsub/resource_pubsub_subscription_test.go

+69-20
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ func TestAccPubsubSubscriptionBigQuery_serviceAccount(t *testing.T) {
250250
})
251251
}
252252

253-
func TestAccPubsubSubscriptionCloudStorage_update(t *testing.T) {
253+
func TestAccPubsubSubscriptionCloudStorage_updateText(t *testing.T) {
254254
t.Parallel()
255255

256256
bucket := fmt.Sprintf("tf-test-bucket-%s", acctest.RandString(t, 10))
@@ -263,7 +263,7 @@ func TestAccPubsubSubscriptionCloudStorage_update(t *testing.T) {
263263
CheckDestroy: testAccCheckPubsubSubscriptionDestroyProducer(t),
264264
Steps: []resource.TestStep{
265265
{
266-
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "", "", "", 0, "", ""),
266+
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "", "", "", 0, "", 0, "", "text"),
267267
},
268268
{
269269
ResourceName: "google_pubsub_subscription.foo",
@@ -272,7 +272,41 @@ func TestAccPubsubSubscriptionCloudStorage_update(t *testing.T) {
272272
ImportStateVerify: true,
273273
},
274274
{
275-
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "pre-", "-suffix", "YYYY-MM-DD/hh_mm_ssZ", 1000, "300s", ""),
275+
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "pre-", "-suffix", "YYYY-MM-DD/hh_mm_ssZ", 1000, "300s", 1000, "", "text"),
276+
},
277+
{
278+
ResourceName: "google_pubsub_subscription.foo",
279+
ImportStateId: subscriptionShort,
280+
ImportState: true,
281+
ImportStateVerify: true,
282+
},
283+
},
284+
})
285+
}
286+
287+
func TestAccPubsubSubscriptionCloudStorage_updateAvro(t *testing.T) {
288+
t.Parallel()
289+
290+
bucket := fmt.Sprintf("tf-test-bucket-%s", acctest.RandString(t, 10))
291+
topic := fmt.Sprintf("tf-test-topic-%s", acctest.RandString(t, 10))
292+
subscriptionShort := fmt.Sprintf("tf-test-sub-%s", acctest.RandString(t, 10))
293+
294+
acctest.VcrTest(t, resource.TestCase{
295+
PreCheck: func() { acctest.AccTestPreCheck(t) },
296+
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories(t),
297+
CheckDestroy: testAccCheckPubsubSubscriptionDestroyProducer(t),
298+
Steps: []resource.TestStep{
299+
{
300+
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "", "", "", 0, "", 0, "", "avro"),
301+
},
302+
{
303+
ResourceName: "google_pubsub_subscription.foo",
304+
ImportStateId: subscriptionShort,
305+
ImportState: true,
306+
ImportStateVerify: true,
307+
},
308+
{
309+
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "pre-", "-suffix", "YYYY-MM-DD/hh_mm_ssZ", 1000, "300s", 1000, "", "avro"),
276310
},
277311
{
278312
ResourceName: "google_pubsub_subscription.foo",
@@ -297,7 +331,7 @@ func TestAccPubsubSubscriptionCloudStorage_serviceAccount(t *testing.T) {
297331
CheckDestroy: testAccCheckPubsubSubscriptionDestroyProducer(t),
298332
Steps: []resource.TestStep{
299333
{
300-
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "", "", "", 0, "", "gcs-test-sa"),
334+
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "", "", "", 0, "", 0, "gcs-test-sa", "text"),
301335
},
302336
{
303337
ResourceName: "google_pubsub_subscription.foo",
@@ -306,7 +340,7 @@ func TestAccPubsubSubscriptionCloudStorage_serviceAccount(t *testing.T) {
306340
ImportStateVerify: true,
307341
},
308342
{
309-
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "pre-", "-suffix", "YYYY-MM-DD/hh_mm_ssZ", 1000, "300s", ""),
343+
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "pre-", "-suffix", "YYYY-MM-DD/hh_mm_ssZ", 1000, "300s", 1000, "", "text"),
310344
},
311345
{
312346
ResourceName: "google_pubsub_subscription.foo",
@@ -315,7 +349,7 @@ func TestAccPubsubSubscriptionCloudStorage_serviceAccount(t *testing.T) {
315349
ImportStateVerify: true,
316350
},
317351
{
318-
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "", "", "", 0, "", "gcs-test-sa2"),
352+
Config: testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscriptionShort, "", "", "", 0, "", 0, "gcs-test-sa2", "avro"),
319353
},
320354
{
321355
ResourceName: "google_pubsub_subscription.foo",
@@ -597,10 +631,10 @@ resource "google_pubsub_subscription" "foo" {
597631
}
598632

599633
func testAccPubsubSubscriptionBigQuery_basic(dataset, table, topic, subscription string, useTableSchema bool, serviceAccountId string) string {
600-
serivceAccountEmailField := ""
601-
serivceAccountResource := ""
634+
serviceAccountEmailField := ""
635+
serviceAccountResource := ""
602636
if serviceAccountId != "" {
603-
serivceAccountResource = fmt.Sprintf(`
637+
serviceAccountResource = fmt.Sprintf(`
604638
resource "google_service_account" "bq_write_service_account" {
605639
account_id = "%s"
606640
display_name = "BQ Write Service Account"
@@ -617,9 +651,9 @@ resource "google_project_iam_member" "editor" {
617651
role = "roles/bigquery.dataEditor"
618652
member = "serviceAccount:${google_service_account.bq_write_service_account.email}"
619653
}`, serviceAccountId)
620-
serivceAccountEmailField = "service_account_email = google_service_account.bq_write_service_account.email"
654+
serviceAccountEmailField = "service_account_email = google_service_account.bq_write_service_account.email"
621655
} else {
622-
serivceAccountResource = fmt.Sprintf(`
656+
serviceAccountResource = fmt.Sprintf(`
623657
resource "google_project_iam_member" "viewer" {
624658
project = data.google_project.project.project_id
625659
role = "roles/bigquery.metadataViewer"
@@ -679,10 +713,10 @@ resource "google_pubsub_subscription" "foo" {
679713
google_project_iam_member.editor
680714
]
681715
}
682-
`, serivceAccountResource, dataset, table, topic, subscription, useTableSchema, serivceAccountEmailField)
716+
`, serviceAccountResource, dataset, table, topic, subscription, useTableSchema, serviceAccountEmailField)
683717
}
684718

685-
func testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscription, filenamePrefix, filenameSuffix, filenameDatetimeFormat string, maxBytes int, maxDuration string, serviceAccountId string) string {
719+
func testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscription, filenamePrefix, filenameSuffix, filenameDatetimeFormat string, maxBytes int, maxDuration string, maxMessages int, serviceAccountId, outputFormat string) string {
686720
filenamePrefixString := ""
687721
if filenamePrefix != "" {
688722
filenamePrefixString = fmt.Sprintf(`filename_prefix = "%s"`, filenamePrefix)
@@ -703,11 +737,15 @@ func testAccPubsubSubscriptionCloudStorage_basic(bucket, topic, subscription, fi
703737
if maxDuration != "" {
704738
maxDurationString = fmt.Sprintf(`max_duration = "%s"`, maxDuration)
705739
}
740+
maxMessagesString := ""
741+
if maxMessages != 0 {
742+
maxMessagesString = fmt.Sprintf(`max_messages = %d`, maxMessages)
743+
}
706744

707-
serivceAccountEmailField := ""
708-
serivceAccountResource := ""
745+
serviceAccountEmailField := ""
746+
serviceAccountResource := ""
709747
if serviceAccountId != "" {
710-
serivceAccountResource = fmt.Sprintf(`
748+
serviceAccountResource = fmt.Sprintf(`
711749
resource "google_service_account" "storage_write_service_account" {
712750
account_id = "%s"
713751
display_name = "Write Service Account"
@@ -724,14 +762,23 @@ resource "google_project_iam_member" "editor" {
724762
role = "roles/bigquery.dataEditor"
725763
member = "serviceAccount:${google_service_account.storage_write_service_account.email}"
726764
}`, serviceAccountId)
727-
serivceAccountEmailField = "service_account_email = google_service_account.storage_write_service_account.email"
765+
serviceAccountEmailField = "service_account_email = google_service_account.storage_write_service_account.email"
728766
} else {
729-
serivceAccountResource = fmt.Sprintf(`
767+
serviceAccountResource = fmt.Sprintf(`
730768
resource "google_storage_bucket_iam_member" "admin" {
731769
bucket = google_storage_bucket.test.name
732770
role = "roles/storage.admin"
733771
member = "serviceAccount:service-${data.google_project.project.number}@gcp-sa-pubsub.iam.gserviceaccount.com"
734772
}`)
773+
}
774+
outputFormatString := ""
775+
if outputFormat == "avro" {
776+
outputFormatString = `
777+
avro_config {
778+
write_metadata = true
779+
use_topic_schema = true
780+
}
781+
`
735782
}
736783
return fmt.Sprintf(`
737784
data "google_project" "project" { }
@@ -758,15 +805,17 @@ resource "google_pubsub_subscription" "foo" {
758805
%s
759806
%s
760807
%s
761-
%s
808+
%s
809+
%s
810+
%s
762811
}
763812
764813
depends_on = [
765814
google_storage_bucket.test,
766815
google_storage_bucket_iam_member.admin,
767816
]
768817
}
769-
`, bucket, serivceAccountResource, topic, subscription, filenamePrefixString, filenameSuffixString, filenameDatetimeString, maxBytesString, maxDurationString, serivceAccountEmailField)
818+
`, bucket, serviceAccountResource, topic, subscription, filenamePrefixString, filenameSuffixString, filenameDatetimeString, maxBytesString, maxDurationString, maxMessagesString, serviceAccountEmailField, outputFormatString)
770819
}
771820

772821
func testAccPubsubSubscription_topicOnly(topic string) string {

0 commit comments

Comments
 (0)