Skip to content

Add AwsMsk & ConfluentCloud to 'google_pubsub_topic' #9114

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/12765.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
pubsub: added `ingestion_data_source_settings.aws_msk` and `ingestion_data_source_settings.confluent_cloud` fields to `google_pubsub_topic` resource
```
290 changes: 290 additions & 0 deletions google-beta/services/pubsub/resource_pubsub_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,42 @@ equals to this service account number.`,
},
ConflictsWith: []string{},
},
"aws_msk": {
Type: schema.TypeList,
Optional: true,
Description: `Settings for ingestion from Amazon Managed Streaming for Apache Kafka.`,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"aws_role_arn": {
Type: schema.TypeString,
Required: true,
Description: `AWS role ARN to be used for Federated Identity authentication with
MSK. Check the Pub/Sub docs for how to set up this role and the
required permissions that need to be attached to it.`,
},
"cluster_arn": {
Type: schema.TypeString,
Required: true,
Description: `ARN that uniquely identifies the MSK cluster.`,
},
"gcp_service_account": {
Type: schema.TypeString,
Required: true,
Description: `The GCP service account to be used for Federated Identity authentication
with MSK (via a 'AssumeRoleWithWebIdentity' call for the provided
role). The 'awsRoleArn' must be set up with 'accounts.google.com:sub'
equals to this service account number.`,
},
"topic": {
Type: schema.TypeString,
Required: true,
Description: `The name of the MSK topic that Pub/Sub will import from.`,
},
},
},
ConflictsWith: []string{},
},
"azure_event_hubs": {
Type: schema.TypeList,
Optional: true,
Expand Down Expand Up @@ -233,6 +269,43 @@ message. When unset, '\n' is used.`,
},
ConflictsWith: []string{},
},
"confluent_cloud": {
Type: schema.TypeList,
Optional: true,
Description: `Settings for ingestion from Confluent Cloud.`,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"bootstrap_server": {
Type: schema.TypeString,
Required: true,
Description: `The Confluent Cloud bootstrap server. The format is url:port.`,
},
"gcp_service_account": {
Type: schema.TypeString,
Required: true,
Description: `The GCP service account to be used for Federated Identity authentication
with Confluent Cloud.`,
},
"identity_pool_id": {
Type: schema.TypeString,
Required: true,
Description: `Identity pool ID to be used for Federated Identity authentication with Confluent Cloud.`,
},
"topic": {
Type: schema.TypeString,
Required: true,
Description: `Name of the Confluent Cloud topic that Pub/Sub will import from.`,
},
"cluster_id": {
Type: schema.TypeString,
Optional: true,
Description: `The Confluent Cloud cluster ID.`,
},
},
},
ConflictsWith: []string{},
},
"platform_logs_settings": {
Type: schema.TypeList,
Optional: true,
Expand Down Expand Up @@ -883,6 +956,10 @@ func flattenPubsubTopicIngestionDataSourceSettings(v interface{}, d *schema.Reso
flattenPubsubTopicIngestionDataSourceSettingsPlatformLogsSettings(original["platformLogsSettings"], d, config)
transformed["azure_event_hubs"] =
flattenPubsubTopicIngestionDataSourceSettingsAzureEventHubs(original["azureEventHubs"], d, config)
transformed["aws_msk"] =
flattenPubsubTopicIngestionDataSourceSettingsAwsMsk(original["awsMsk"], d, config)
transformed["confluent_cloud"] =
flattenPubsubTopicIngestionDataSourceSettingsConfluentCloud(original["confluentCloud"], d, config)
return []interface{}{transformed}
}
func flattenPubsubTopicIngestionDataSourceSettingsAwsKinesis(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
Expand Down Expand Up @@ -1058,6 +1135,82 @@ func flattenPubsubTopicIngestionDataSourceSettingsAzureEventHubsGcpServiceAccoun
return v
}

func flattenPubsubTopicIngestionDataSourceSettingsAwsMsk(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
if v == nil {
return nil
}
original := v.(map[string]interface{})
if len(original) == 0 {
return nil
}
transformed := make(map[string]interface{})
transformed["cluster_arn"] =
flattenPubsubTopicIngestionDataSourceSettingsAwsMskClusterArn(original["clusterArn"], d, config)
transformed["topic"] =
flattenPubsubTopicIngestionDataSourceSettingsAwsMskTopic(original["topic"], d, config)
transformed["aws_role_arn"] =
flattenPubsubTopicIngestionDataSourceSettingsAwsMskAwsRoleArn(original["awsRoleArn"], d, config)
transformed["gcp_service_account"] =
flattenPubsubTopicIngestionDataSourceSettingsAwsMskGcpServiceAccount(original["gcpServiceAccount"], d, config)
return []interface{}{transformed}
}
func flattenPubsubTopicIngestionDataSourceSettingsAwsMskClusterArn(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenPubsubTopicIngestionDataSourceSettingsAwsMskTopic(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenPubsubTopicIngestionDataSourceSettingsAwsMskAwsRoleArn(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenPubsubTopicIngestionDataSourceSettingsAwsMskGcpServiceAccount(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenPubsubTopicIngestionDataSourceSettingsConfluentCloud(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
if v == nil {
return nil
}
original := v.(map[string]interface{})
if len(original) == 0 {
return nil
}
transformed := make(map[string]interface{})
transformed["bootstrap_server"] =
flattenPubsubTopicIngestionDataSourceSettingsConfluentCloudBootstrapServer(original["bootstrapServer"], d, config)
transformed["cluster_id"] =
flattenPubsubTopicIngestionDataSourceSettingsConfluentCloudClusterId(original["clusterId"], d, config)
transformed["topic"] =
flattenPubsubTopicIngestionDataSourceSettingsConfluentCloudTopic(original["topic"], d, config)
transformed["identity_pool_id"] =
flattenPubsubTopicIngestionDataSourceSettingsConfluentCloudIdentityPoolId(original["identityPoolId"], d, config)
transformed["gcp_service_account"] =
flattenPubsubTopicIngestionDataSourceSettingsConfluentCloudGcpServiceAccount(original["gcpServiceAccount"], d, config)
return []interface{}{transformed}
}
func flattenPubsubTopicIngestionDataSourceSettingsConfluentCloudBootstrapServer(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenPubsubTopicIngestionDataSourceSettingsConfluentCloudClusterId(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenPubsubTopicIngestionDataSourceSettingsConfluentCloudTopic(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenPubsubTopicIngestionDataSourceSettingsConfluentCloudIdentityPoolId(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenPubsubTopicIngestionDataSourceSettingsConfluentCloudGcpServiceAccount(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
return v
}

func flattenPubsubTopicTerraformLabels(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
if v == nil {
return v
Expand Down Expand Up @@ -1194,6 +1347,20 @@ func expandPubsubTopicIngestionDataSourceSettings(v interface{}, d tpgresource.T
transformed["azureEventHubs"] = transformedAzureEventHubs
}

transformedAwsMsk, err := expandPubsubTopicIngestionDataSourceSettingsAwsMsk(original["aws_msk"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedAwsMsk); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["awsMsk"] = transformedAwsMsk
}

transformedConfluentCloud, err := expandPubsubTopicIngestionDataSourceSettingsConfluentCloud(original["confluent_cloud"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedConfluentCloud); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["confluentCloud"] = transformedConfluentCloud
}

return transformed, nil
}

Expand Down Expand Up @@ -1484,6 +1651,129 @@ func expandPubsubTopicIngestionDataSourceSettingsAzureEventHubsGcpServiceAccount
return v, nil
}

func expandPubsubTopicIngestionDataSourceSettingsAwsMsk(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
l := v.([]interface{})
if len(l) == 0 || l[0] == nil {
return nil, nil
}
raw := l[0]
original := raw.(map[string]interface{})
transformed := make(map[string]interface{})

transformedClusterArn, err := expandPubsubTopicIngestionDataSourceSettingsAwsMskClusterArn(original["cluster_arn"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedClusterArn); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["clusterArn"] = transformedClusterArn
}

transformedTopic, err := expandPubsubTopicIngestionDataSourceSettingsAwsMskTopic(original["topic"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedTopic); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["topic"] = transformedTopic
}

transformedAwsRoleArn, err := expandPubsubTopicIngestionDataSourceSettingsAwsMskAwsRoleArn(original["aws_role_arn"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedAwsRoleArn); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["awsRoleArn"] = transformedAwsRoleArn
}

transformedGcpServiceAccount, err := expandPubsubTopicIngestionDataSourceSettingsAwsMskGcpServiceAccount(original["gcp_service_account"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedGcpServiceAccount); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["gcpServiceAccount"] = transformedGcpServiceAccount
}

return transformed, nil
}

func expandPubsubTopicIngestionDataSourceSettingsAwsMskClusterArn(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandPubsubTopicIngestionDataSourceSettingsAwsMskTopic(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandPubsubTopicIngestionDataSourceSettingsAwsMskAwsRoleArn(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandPubsubTopicIngestionDataSourceSettingsAwsMskGcpServiceAccount(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandPubsubTopicIngestionDataSourceSettingsConfluentCloud(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
l := v.([]interface{})
if len(l) == 0 || l[0] == nil {
return nil, nil
}
raw := l[0]
original := raw.(map[string]interface{})
transformed := make(map[string]interface{})

transformedBootstrapServer, err := expandPubsubTopicIngestionDataSourceSettingsConfluentCloudBootstrapServer(original["bootstrap_server"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedBootstrapServer); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["bootstrapServer"] = transformedBootstrapServer
}

transformedClusterId, err := expandPubsubTopicIngestionDataSourceSettingsConfluentCloudClusterId(original["cluster_id"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedClusterId); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["clusterId"] = transformedClusterId
}

transformedTopic, err := expandPubsubTopicIngestionDataSourceSettingsConfluentCloudTopic(original["topic"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedTopic); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["topic"] = transformedTopic
}

transformedIdentityPoolId, err := expandPubsubTopicIngestionDataSourceSettingsConfluentCloudIdentityPoolId(original["identity_pool_id"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedIdentityPoolId); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["identityPoolId"] = transformedIdentityPoolId
}

transformedGcpServiceAccount, err := expandPubsubTopicIngestionDataSourceSettingsConfluentCloudGcpServiceAccount(original["gcp_service_account"], d, config)
if err != nil {
return nil, err
} else if val := reflect.ValueOf(transformedGcpServiceAccount); val.IsValid() && !tpgresource.IsEmptyValue(val) {
transformed["gcpServiceAccount"] = transformedGcpServiceAccount
}

return transformed, nil
}

func expandPubsubTopicIngestionDataSourceSettingsConfluentCloudBootstrapServer(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandPubsubTopicIngestionDataSourceSettingsConfluentCloudClusterId(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandPubsubTopicIngestionDataSourceSettingsConfluentCloudTopic(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandPubsubTopicIngestionDataSourceSettingsConfluentCloudIdentityPoolId(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandPubsubTopicIngestionDataSourceSettingsConfluentCloudGcpServiceAccount(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
return v, nil
}

func expandPubsubTopicEffectiveLabels(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (map[string]string, error) {
if v == nil {
return map[string]string{}, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ fields:
- field: 'ingestion_data_source_settings.aws_kinesis.consumer_arn'
- field: 'ingestion_data_source_settings.aws_kinesis.gcp_service_account'
- field: 'ingestion_data_source_settings.aws_kinesis.stream_arn'
- field: 'ingestion_data_source_settings.aws_msk.aws_role_arn'
- field: 'ingestion_data_source_settings.aws_msk.cluster_arn'
- field: 'ingestion_data_source_settings.aws_msk.gcp_service_account'
- field: 'ingestion_data_source_settings.aws_msk.topic'
- field: 'ingestion_data_source_settings.azure_event_hubs.client_id'
- field: 'ingestion_data_source_settings.azure_event_hubs.event_hub'
- field: 'ingestion_data_source_settings.azure_event_hubs.gcp_service_account'
Expand All @@ -23,6 +27,11 @@ fields:
- field: 'ingestion_data_source_settings.cloud_storage.minimum_object_create_time'
- field: 'ingestion_data_source_settings.cloud_storage.pubsub_avro_format'
- field: 'ingestion_data_source_settings.cloud_storage.text_format.delimiter'
- field: 'ingestion_data_source_settings.confluent_cloud.bootstrap_server'
- field: 'ingestion_data_source_settings.confluent_cloud.cluster_id'
- field: 'ingestion_data_source_settings.confluent_cloud.gcp_service_account'
- field: 'ingestion_data_source_settings.confluent_cloud.identity_pool_id'
- field: 'ingestion_data_source_settings.confluent_cloud.topic'
- field: 'ingestion_data_source_settings.platform_logs_settings.severity'
- field: 'kms_key_name'
- field: 'labels'
Expand Down
Loading
Loading