Skip to content

Commit 647beb9

Browse files
Add active clusters config to domain schema (#6945)
1 parent 6364be5 commit 647beb9

30 files changed

+1314
-348
lines changed

common/domain/attrValidator.go

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -99,25 +99,41 @@ func (d *AttrValidatorImpl) validateDomainReplicationConfigForGlobalDomain(
9999

100100
activeCluster := replicationConfig.ActiveClusterName
101101
clusters := replicationConfig.Clusters
102+
activeClusters := replicationConfig.ActiveClusters
102103

103-
if err := d.validateClusterName(activeCluster); err != nil {
104-
return err
105-
}
106104
for _, clusterConfig := range clusters {
107105
if err := d.validateClusterName(clusterConfig.ClusterName); err != nil {
108106
return err
109107
}
110108
}
111109

112-
activeClusterInClusters := false
113-
for _, clusterConfig := range clusters {
114-
if clusterConfig.ClusterName == activeCluster {
115-
activeClusterInClusters = true
116-
break
110+
isInClusters := func(clusterName string) bool {
111+
for _, clusterConfig := range clusters {
112+
if clusterConfig.ClusterName == clusterName {
113+
return true
114+
}
117115
}
116+
return false
118117
}
119-
if !activeClusterInClusters {
120-
return errActiveClusterNotInClusters
118+
119+
if replicationConfig.IsActiveActive() {
120+
for _, cluster := range activeClusters.ActiveClustersByRegion {
121+
if err := d.validateClusterName(cluster.ActiveClusterName); err != nil {
122+
return err
123+
}
124+
125+
if !isInClusters(cluster.ActiveClusterName) {
126+
return errActiveClusterNotInClusters
127+
}
128+
}
129+
} else {
130+
if err := d.validateClusterName(activeCluster); err != nil {
131+
return err
132+
}
133+
134+
if !isInClusters(activeCluster) {
135+
return errActiveClusterNotInClusters
136+
}
121137
}
122138

123139
return nil

common/domain/handler.go

Lines changed: 58 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -113,9 +113,9 @@ type (
113113
// FailoverEvent is the failover information to be stored for each failover event in domain data
114114
FailoverEvent struct {
115115
EventTime time.Time `json:"eventTime"`
116-
FromCluster string `json:"fromCluster"`
117-
ToCluster string `json:"toCluster"`
118-
FailoverType string `json:"failoverType"`
116+
FromCluster string `json:"fromCluster,omitempty"`
117+
ToCluster string `json:"toCluster,omitempty"`
118+
FailoverType string `json:"failoverType,omitempty"`
119119
}
120120

121121
// FailoverHistory is the history of failovers for a domain limited by the FailoverHistoryMaxSize config
@@ -257,6 +257,7 @@ func (d *handlerImpl) RegisterDomain(
257257
if err != nil {
258258
return err
259259
}
260+
260261
replicationConfig := &persistence.DomainReplicationConfig{
261262
ActiveClusterName: activeClusterName,
262263
Clusters: clusters,
@@ -476,6 +477,7 @@ func (d *handlerImpl) UpdateDomain(
476477

477478
// Update replication config
478479
replicationConfig, replicationConfigChanged, activeClusterChanged, err := d.updateReplicationConfig(
480+
getResponse.Info.Name,
479481
replicationConfig,
480482
updateRequest,
481483
)
@@ -535,14 +537,18 @@ func (d *handlerImpl) UpdateDomain(
535537
gracefulFailoverEndTime = nil
536538
previousFailoverVersion = constants.InitialPreviousFailoverVersion
537539
}
538-
failoverVersion = d.clusterMetadata.GetNextFailoverVersion(
539-
replicationConfig.ActiveClusterName,
540-
failoverVersion,
541-
updateRequest.Name,
542-
)
543-
err = updateFailoverHistory(info, d.config, now, currentActiveCluster, *updateRequest.ActiveClusterName, failoverType)
544-
if err != nil {
545-
d.logger.Warn("failed to update failover history", tag.Error(err))
540+
541+
if !replicationConfig.IsActiveActive() {
542+
failoverVersion = d.clusterMetadata.GetNextFailoverVersion(
543+
replicationConfig.ActiveClusterName,
544+
failoverVersion,
545+
updateRequest.Name,
546+
)
547+
548+
err = updateFailoverHistory(info, d.config, now, currentActiveCluster, *updateRequest.ActiveClusterName, failoverType)
549+
if err != nil {
550+
d.logger.Warn("failed to update failover history", tag.Error(err))
551+
}
546552
}
547553

548554
failoverNotificationVersion = notificationVersion
@@ -950,6 +956,7 @@ func (d *handlerImpl) createResponse(
950956
replicationConfigResult := &types.DomainReplicationConfiguration{
951957
ActiveClusterName: replicationConfig.ActiveClusterName,
952958
Clusters: clusters,
959+
ActiveClusters: replicationConfig.ActiveClusters,
953960
}
954961

955962
return infoResult, configResult, replicationConfigResult
@@ -1207,6 +1214,7 @@ func (d *handlerImpl) updateDeleteBadBinary(
12071214
}
12081215

12091216
func (d *handlerImpl) updateReplicationConfig(
1217+
domainName string,
12101218
config *persistence.DomainReplicationConfig,
12111219
updateRequest *types.UpdateDomainRequest,
12121220
) (*persistence.DomainReplicationConfig, bool, bool, error) {
@@ -1236,7 +1244,44 @@ func (d *handlerImpl) updateReplicationConfig(
12361244
config.ActiveClusterName = *updateRequest.ActiveClusterName
12371245
}
12381246

1239-
// TODO(active-active): handle active-active case here which would be updateRequest.ActiveClusters != nil.
1247+
if updateRequest.ActiveClusters != nil && updateRequest.ActiveClusters.ActiveClustersByRegion != nil {
1248+
existingActiveClusters := config.ActiveClusters
1249+
finalActiveClusters := make(map[string]types.ActiveClusterInfo)
1250+
1251+
// first add the ones that are not touched
1252+
for region, activeCluster := range existingActiveClusters.ActiveClustersByRegion {
1253+
if _, ok := updateRequest.ActiveClusters.ActiveClustersByRegion[region]; !ok {
1254+
finalActiveClusters[region] = activeCluster
1255+
}
1256+
}
1257+
1258+
// then add the ones that are modified
1259+
for region, activeCluster := range updateRequest.ActiveClusters.ActiveClustersByRegion {
1260+
existingActiveCluster, ok := existingActiveClusters.ActiveClustersByRegion[region]
1261+
if !ok {
1262+
// a cluster is being activated on a region that didn't have any active cluster before
1263+
// initialize failover version to the initial failover version of the cluster
1264+
activeCluster.FailoverVersion = d.clusterMetadata.GetNextFailoverVersion(activeCluster.ActiveClusterName, 0, domainName)
1265+
finalActiveClusters[region] = activeCluster
1266+
continue
1267+
}
1268+
1269+
// handle modification of an active cluster change on a region that had an active cluster before
1270+
if existingActiveCluster.ActiveClusterName != activeCluster.ActiveClusterName {
1271+
// a cluster is being deactivated on a region that had an active cluster before
1272+
// set failover version to the next failover version of the newcluster
1273+
activeCluster.FailoverVersion = d.clusterMetadata.GetNextFailoverVersion(activeCluster.ActiveClusterName, existingActiveCluster.FailoverVersion, domainName)
1274+
finalActiveClusters[region] = activeCluster
1275+
} else {
1276+
// no update case, just copy the existing active cluster
1277+
finalActiveClusters[region] = activeCluster
1278+
}
1279+
}
1280+
config.ActiveClusters = &types.ActiveClusters{
1281+
ActiveClustersByRegion: finalActiveClusters,
1282+
}
1283+
activeClusterUpdated = true
1284+
}
12401285

12411286
return config, clusterUpdated, activeClusterUpdated, nil
12421287
}
@@ -1251,7 +1296,7 @@ func (d *handlerImpl) handleGracefulFailover(
12511296
isGlobalDomain bool,
12521297
) (*int64, int64, error) {
12531298
// must update active cluster on a global domain
1254-
if !activeClusterChanged || !isGlobalDomain {
1299+
if !activeClusterChanged || !isGlobalDomain || replicationConfig.IsActiveActive() {
12551300
return nil, 0, errInvalidGracefulFailover
12561301
}
12571302
// must start with the passive -> active cluster

0 commit comments

Comments
 (0)