Skip to content

Commit f9123b0

Browse files
Add AwsMsk & ConfluentCloud to 'google_pubsub_topic' (#12765) (#9114)
[upstream:665eddddbb5fd4ce0519f02944edc546159fb96c] Signed-off-by: Modular Magician <[email protected]>
1 parent 4015c26 commit f9123b0

6 files changed

+630
-0
lines changed

.changelog/12765.txt

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
```release-note:enhancement
2+
pubsub: added `ingestion_data_source_settings.aws_msk` and `ingestion_data_source_settings.confluent_cloud` fields to `google_pubsub_topic` resource
3+
```

google-beta/services/pubsub/resource_pubsub_topic.go

+290
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,42 @@ equals to this service account number.`,
108108
},
109109
ConflictsWith: []string{},
110110
},
111+
"aws_msk": {
112+
Type: schema.TypeList,
113+
Optional: true,
114+
Description: `Settings for ingestion from Amazon Managed Streaming for Apache Kafka.`,
115+
MaxItems: 1,
116+
Elem: &schema.Resource{
117+
Schema: map[string]*schema.Schema{
118+
"aws_role_arn": {
119+
Type: schema.TypeString,
120+
Required: true,
121+
Description: `AWS role ARN to be used for Federated Identity authentication with
122+
MSK. Check the Pub/Sub docs for how to set up this role and the
123+
required permissions that need to be attached to it.`,
124+
},
125+
"cluster_arn": {
126+
Type: schema.TypeString,
127+
Required: true,
128+
Description: `ARN that uniquely identifies the MSK cluster.`,
129+
},
130+
"gcp_service_account": {
131+
Type: schema.TypeString,
132+
Required: true,
133+
Description: `The GCP service account to be used for Federated Identity authentication
134+
with MSK (via a 'AssumeRoleWithWebIdentity' call for the provided
135+
role). The 'awsRoleArn' must be set up with 'accounts.google.com:sub'
136+
equals to this service account number.`,
137+
},
138+
"topic": {
139+
Type: schema.TypeString,
140+
Required: true,
141+
Description: `The name of the MSK topic that Pub/Sub will import from.`,
142+
},
143+
},
144+
},
145+
ConflictsWith: []string{},
146+
},
111147
"azure_event_hubs": {
112148
Type: schema.TypeList,
113149
Optional: true,
@@ -233,6 +269,43 @@ message. When unset, '\n' is used.`,
233269
},
234270
ConflictsWith: []string{},
235271
},
272+
"confluent_cloud": {
273+
Type: schema.TypeList,
274+
Optional: true,
275+
Description: `Settings for ingestion from Confluent Cloud.`,
276+
MaxItems: 1,
277+
Elem: &schema.Resource{
278+
Schema: map[string]*schema.Schema{
279+
"bootstrap_server": {
280+
Type: schema.TypeString,
281+
Required: true,
282+
Description: `The Confluent Cloud bootstrap server. The format is url:port.`,
283+
},
284+
"gcp_service_account": {
285+
Type: schema.TypeString,
286+
Required: true,
287+
Description: `The GCP service account to be used for Federated Identity authentication
288+
with Confluent Cloud.`,
289+
},
290+
"identity_pool_id": {
291+
Type: schema.TypeString,
292+
Required: true,
293+
Description: `Identity pool ID to be used for Federated Identity authentication with Confluent Cloud.`,
294+
},
295+
"topic": {
296+
Type: schema.TypeString,
297+
Required: true,
298+
Description: `Name of the Confluent Cloud topic that Pub/Sub will import from.`,
299+
},
300+
"cluster_id": {
301+
Type: schema.TypeString,
302+
Optional: true,
303+
Description: `The Confluent Cloud cluster ID.`,
304+
},
305+
},
306+
},
307+
ConflictsWith: []string{},
308+
},
236309
"platform_logs_settings": {
237310
Type: schema.TypeList,
238311
Optional: true,
@@ -883,6 +956,10 @@ func flattenPubsubTopicIngestionDataSourceSettings(v interface{}, d *schema.Reso
883956
flattenPubsubTopicIngestionDataSourceSettingsPlatformLogsSettings(original["platformLogsSettings"], d, config)
884957
transformed["azure_event_hubs"] =
885958
flattenPubsubTopicIngestionDataSourceSettingsAzureEventHubs(original["azureEventHubs"], d, config)
959+
transformed["aws_msk"] =
960+
flattenPubsubTopicIngestionDataSourceSettingsAwsMsk(original["awsMsk"], d, config)
961+
transformed["confluent_cloud"] =
962+
flattenPubsubTopicIngestionDataSourceSettingsConfluentCloud(original["confluentCloud"], d, config)
886963
return []interface{}{transformed}
887964
}
888965
func flattenPubsubTopicIngestionDataSourceSettingsAwsKinesis(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
@@ -1058,6 +1135,82 @@ func flattenPubsubTopicIngestionDataSourceSettingsAzureEventHubsGcpServiceAccoun
10581135
return v
10591136
}
10601137

1138+
func flattenPubsubTopicIngestionDataSourceSettingsAwsMsk(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
1139+
if v == nil {
1140+
return nil
1141+
}
1142+
original := v.(map[string]interface{})
1143+
if len(original) == 0 {
1144+
return nil
1145+
}
1146+
transformed := make(map[string]interface{})
1147+
transformed["cluster_arn"] =
1148+
flattenPubsubTopicIngestionDataSourceSettingsAwsMskClusterArn(original["clusterArn"], d, config)
1149+
transformed["topic"] =
1150+
flattenPubsubTopicIngestionDataSourceSettingsAwsMskTopic(original["topic"], d, config)
1151+
transformed["aws_role_arn"] =
1152+
flattenPubsubTopicIngestionDataSourceSettingsAwsMskAwsRoleArn(original["awsRoleArn"], d, config)
1153+
transformed["gcp_service_account"] =
1154+
flattenPubsubTopicIngestionDataSourceSettingsAwsMskGcpServiceAccount(original["gcpServiceAccount"], d, config)
1155+
return []interface{}{transformed}
1156+
}
1157+
func flattenPubsubTopicIngestionDataSourceSettingsAwsMskClusterArn(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
1158+
return v
1159+
}
1160+
1161+
func flattenPubsubTopicIngestionDataSourceSettingsAwsMskTopic(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
1162+
return v
1163+
}
1164+
1165+
func flattenPubsubTopicIngestionDataSourceSettingsAwsMskAwsRoleArn(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
1166+
return v
1167+
}
1168+
1169+
func flattenPubsubTopicIngestionDataSourceSettingsAwsMskGcpServiceAccount(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
1170+
return v
1171+
}
1172+
1173+
func flattenPubsubTopicIngestionDataSourceSettingsConfluentCloud(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
1174+
if v == nil {
1175+
return nil
1176+
}
1177+
original := v.(map[string]interface{})
1178+
if len(original) == 0 {
1179+
return nil
1180+
}
1181+
transformed := make(map[string]interface{})
1182+
transformed["bootstrap_server"] =
1183+
flattenPubsubTopicIngestionDataSourceSettingsConfluentCloudBootstrapServer(original["bootstrapServer"], d, config)
1184+
transformed["cluster_id"] =
1185+
flattenPubsubTopicIngestionDataSourceSettingsConfluentCloudClusterId(original["clusterId"], d, config)
1186+
transformed["topic"] =
1187+
flattenPubsubTopicIngestionDataSourceSettingsConfluentCloudTopic(original["topic"], d, config)
1188+
transformed["identity_pool_id"] =
1189+
flattenPubsubTopicIngestionDataSourceSettingsConfluentCloudIdentityPoolId(original["identityPoolId"], d, config)
1190+
transformed["gcp_service_account"] =
1191+
flattenPubsubTopicIngestionDataSourceSettingsConfluentCloudGcpServiceAccount(original["gcpServiceAccount"], d, config)
1192+
return []interface{}{transformed}
1193+
}
1194+
func flattenPubsubTopicIngestionDataSourceSettingsConfluentCloudBootstrapServer(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
1195+
return v
1196+
}
1197+
1198+
func flattenPubsubTopicIngestionDataSourceSettingsConfluentCloudClusterId(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
1199+
return v
1200+
}
1201+
1202+
func flattenPubsubTopicIngestionDataSourceSettingsConfluentCloudTopic(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
1203+
return v
1204+
}
1205+
1206+
func flattenPubsubTopicIngestionDataSourceSettingsConfluentCloudIdentityPoolId(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
1207+
return v
1208+
}
1209+
1210+
func flattenPubsubTopicIngestionDataSourceSettingsConfluentCloudGcpServiceAccount(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
1211+
return v
1212+
}
1213+
10611214
func flattenPubsubTopicTerraformLabels(v interface{}, d *schema.ResourceData, config *transport_tpg.Config) interface{} {
10621215
if v == nil {
10631216
return v
@@ -1194,6 +1347,20 @@ func expandPubsubTopicIngestionDataSourceSettings(v interface{}, d tpgresource.T
11941347
transformed["azureEventHubs"] = transformedAzureEventHubs
11951348
}
11961349

1350+
transformedAwsMsk, err := expandPubsubTopicIngestionDataSourceSettingsAwsMsk(original["aws_msk"], d, config)
1351+
if err != nil {
1352+
return nil, err
1353+
} else if val := reflect.ValueOf(transformedAwsMsk); val.IsValid() && !tpgresource.IsEmptyValue(val) {
1354+
transformed["awsMsk"] = transformedAwsMsk
1355+
}
1356+
1357+
transformedConfluentCloud, err := expandPubsubTopicIngestionDataSourceSettingsConfluentCloud(original["confluent_cloud"], d, config)
1358+
if err != nil {
1359+
return nil, err
1360+
} else if val := reflect.ValueOf(transformedConfluentCloud); val.IsValid() && !tpgresource.IsEmptyValue(val) {
1361+
transformed["confluentCloud"] = transformedConfluentCloud
1362+
}
1363+
11971364
return transformed, nil
11981365
}
11991366

@@ -1484,6 +1651,129 @@ func expandPubsubTopicIngestionDataSourceSettingsAzureEventHubsGcpServiceAccount
14841651
return v, nil
14851652
}
14861653

1654+
func expandPubsubTopicIngestionDataSourceSettingsAwsMsk(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
1655+
l := v.([]interface{})
1656+
if len(l) == 0 || l[0] == nil {
1657+
return nil, nil
1658+
}
1659+
raw := l[0]
1660+
original := raw.(map[string]interface{})
1661+
transformed := make(map[string]interface{})
1662+
1663+
transformedClusterArn, err := expandPubsubTopicIngestionDataSourceSettingsAwsMskClusterArn(original["cluster_arn"], d, config)
1664+
if err != nil {
1665+
return nil, err
1666+
} else if val := reflect.ValueOf(transformedClusterArn); val.IsValid() && !tpgresource.IsEmptyValue(val) {
1667+
transformed["clusterArn"] = transformedClusterArn
1668+
}
1669+
1670+
transformedTopic, err := expandPubsubTopicIngestionDataSourceSettingsAwsMskTopic(original["topic"], d, config)
1671+
if err != nil {
1672+
return nil, err
1673+
} else if val := reflect.ValueOf(transformedTopic); val.IsValid() && !tpgresource.IsEmptyValue(val) {
1674+
transformed["topic"] = transformedTopic
1675+
}
1676+
1677+
transformedAwsRoleArn, err := expandPubsubTopicIngestionDataSourceSettingsAwsMskAwsRoleArn(original["aws_role_arn"], d, config)
1678+
if err != nil {
1679+
return nil, err
1680+
} else if val := reflect.ValueOf(transformedAwsRoleArn); val.IsValid() && !tpgresource.IsEmptyValue(val) {
1681+
transformed["awsRoleArn"] = transformedAwsRoleArn
1682+
}
1683+
1684+
transformedGcpServiceAccount, err := expandPubsubTopicIngestionDataSourceSettingsAwsMskGcpServiceAccount(original["gcp_service_account"], d, config)
1685+
if err != nil {
1686+
return nil, err
1687+
} else if val := reflect.ValueOf(transformedGcpServiceAccount); val.IsValid() && !tpgresource.IsEmptyValue(val) {
1688+
transformed["gcpServiceAccount"] = transformedGcpServiceAccount
1689+
}
1690+
1691+
return transformed, nil
1692+
}
1693+
1694+
func expandPubsubTopicIngestionDataSourceSettingsAwsMskClusterArn(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
1695+
return v, nil
1696+
}
1697+
1698+
func expandPubsubTopicIngestionDataSourceSettingsAwsMskTopic(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
1699+
return v, nil
1700+
}
1701+
1702+
func expandPubsubTopicIngestionDataSourceSettingsAwsMskAwsRoleArn(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
1703+
return v, nil
1704+
}
1705+
1706+
func expandPubsubTopicIngestionDataSourceSettingsAwsMskGcpServiceAccount(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
1707+
return v, nil
1708+
}
1709+
1710+
func expandPubsubTopicIngestionDataSourceSettingsConfluentCloud(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
1711+
l := v.([]interface{})
1712+
if len(l) == 0 || l[0] == nil {
1713+
return nil, nil
1714+
}
1715+
raw := l[0]
1716+
original := raw.(map[string]interface{})
1717+
transformed := make(map[string]interface{})
1718+
1719+
transformedBootstrapServer, err := expandPubsubTopicIngestionDataSourceSettingsConfluentCloudBootstrapServer(original["bootstrap_server"], d, config)
1720+
if err != nil {
1721+
return nil, err
1722+
} else if val := reflect.ValueOf(transformedBootstrapServer); val.IsValid() && !tpgresource.IsEmptyValue(val) {
1723+
transformed["bootstrapServer"] = transformedBootstrapServer
1724+
}
1725+
1726+
transformedClusterId, err := expandPubsubTopicIngestionDataSourceSettingsConfluentCloudClusterId(original["cluster_id"], d, config)
1727+
if err != nil {
1728+
return nil, err
1729+
} else if val := reflect.ValueOf(transformedClusterId); val.IsValid() && !tpgresource.IsEmptyValue(val) {
1730+
transformed["clusterId"] = transformedClusterId
1731+
}
1732+
1733+
transformedTopic, err := expandPubsubTopicIngestionDataSourceSettingsConfluentCloudTopic(original["topic"], d, config)
1734+
if err != nil {
1735+
return nil, err
1736+
} else if val := reflect.ValueOf(transformedTopic); val.IsValid() && !tpgresource.IsEmptyValue(val) {
1737+
transformed["topic"] = transformedTopic
1738+
}
1739+
1740+
transformedIdentityPoolId, err := expandPubsubTopicIngestionDataSourceSettingsConfluentCloudIdentityPoolId(original["identity_pool_id"], d, config)
1741+
if err != nil {
1742+
return nil, err
1743+
} else if val := reflect.ValueOf(transformedIdentityPoolId); val.IsValid() && !tpgresource.IsEmptyValue(val) {
1744+
transformed["identityPoolId"] = transformedIdentityPoolId
1745+
}
1746+
1747+
transformedGcpServiceAccount, err := expandPubsubTopicIngestionDataSourceSettingsConfluentCloudGcpServiceAccount(original["gcp_service_account"], d, config)
1748+
if err != nil {
1749+
return nil, err
1750+
} else if val := reflect.ValueOf(transformedGcpServiceAccount); val.IsValid() && !tpgresource.IsEmptyValue(val) {
1751+
transformed["gcpServiceAccount"] = transformedGcpServiceAccount
1752+
}
1753+
1754+
return transformed, nil
1755+
}
1756+
1757+
func expandPubsubTopicIngestionDataSourceSettingsConfluentCloudBootstrapServer(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
1758+
return v, nil
1759+
}
1760+
1761+
func expandPubsubTopicIngestionDataSourceSettingsConfluentCloudClusterId(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
1762+
return v, nil
1763+
}
1764+
1765+
func expandPubsubTopicIngestionDataSourceSettingsConfluentCloudTopic(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
1766+
return v, nil
1767+
}
1768+
1769+
func expandPubsubTopicIngestionDataSourceSettingsConfluentCloudIdentityPoolId(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
1770+
return v, nil
1771+
}
1772+
1773+
func expandPubsubTopicIngestionDataSourceSettingsConfluentCloudGcpServiceAccount(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (interface{}, error) {
1774+
return v, nil
1775+
}
1776+
14871777
func expandPubsubTopicEffectiveLabels(v interface{}, d tpgresource.TerraformResourceData, config *transport_tpg.Config) (map[string]string, error) {
14881778
if v == nil {
14891779
return map[string]string{}, nil

google-beta/services/pubsub/resource_pubsub_topic_generated_meta.yaml

+9
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ fields:
1010
- field: 'ingestion_data_source_settings.aws_kinesis.consumer_arn'
1111
- field: 'ingestion_data_source_settings.aws_kinesis.gcp_service_account'
1212
- field: 'ingestion_data_source_settings.aws_kinesis.stream_arn'
13+
- field: 'ingestion_data_source_settings.aws_msk.aws_role_arn'
14+
- field: 'ingestion_data_source_settings.aws_msk.cluster_arn'
15+
- field: 'ingestion_data_source_settings.aws_msk.gcp_service_account'
16+
- field: 'ingestion_data_source_settings.aws_msk.topic'
1317
- field: 'ingestion_data_source_settings.azure_event_hubs.client_id'
1418
- field: 'ingestion_data_source_settings.azure_event_hubs.event_hub'
1519
- field: 'ingestion_data_source_settings.azure_event_hubs.gcp_service_account'
@@ -23,6 +27,11 @@ fields:
2327
- field: 'ingestion_data_source_settings.cloud_storage.minimum_object_create_time'
2428
- field: 'ingestion_data_source_settings.cloud_storage.pubsub_avro_format'
2529
- field: 'ingestion_data_source_settings.cloud_storage.text_format.delimiter'
30+
- field: 'ingestion_data_source_settings.confluent_cloud.bootstrap_server'
31+
- field: 'ingestion_data_source_settings.confluent_cloud.cluster_id'
32+
- field: 'ingestion_data_source_settings.confluent_cloud.gcp_service_account'
33+
- field: 'ingestion_data_source_settings.confluent_cloud.identity_pool_id'
34+
- field: 'ingestion_data_source_settings.confluent_cloud.topic'
2635
- field: 'ingestion_data_source_settings.platform_logs_settings.severity'
2736
- field: 'kms_key_name'
2837
- field: 'labels'

0 commit comments

Comments
 (0)