Skip to content

Add active clusters config to domain schema #6945

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 26 additions & 10 deletions common/domain/attrValidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,25 +99,41 @@ func (d *AttrValidatorImpl) validateDomainReplicationConfigForGlobalDomain(

activeCluster := replicationConfig.ActiveClusterName
clusters := replicationConfig.Clusters
activeClusters := replicationConfig.ActiveClusters

if err := d.validateClusterName(activeCluster); err != nil {
return err
}
for _, clusterConfig := range clusters {
if err := d.validateClusterName(clusterConfig.ClusterName); err != nil {
return err
}
}

activeClusterInClusters := false
for _, clusterConfig := range clusters {
if clusterConfig.ClusterName == activeCluster {
activeClusterInClusters = true
break
isInClusters := func(clusterName string) bool {
for _, clusterConfig := range clusters {
if clusterConfig.ClusterName == clusterName {
return true
}
}
return false
}
if !activeClusterInClusters {
return errActiveClusterNotInClusters

if replicationConfig.IsActiveActive() {
for _, cluster := range activeClusters.ActiveClustersByRegion {
if err := d.validateClusterName(cluster.ActiveClusterName); err != nil {
return err
}

if !isInClusters(cluster.ActiveClusterName) {
return errActiveClusterNotInClusters
}
}
} else {
if err := d.validateClusterName(activeCluster); err != nil {
return err
}

if !isInClusters(activeCluster) {
return errActiveClusterNotInClusters
}
}

return nil
Expand Down
71 changes: 58 additions & 13 deletions common/domain/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,9 @@ type (
// FailoverEvent is the failover information to be stored for each failover event in domain data
FailoverEvent struct {
EventTime time.Time `json:"eventTime"`
FromCluster string `json:"fromCluster"`
ToCluster string `json:"toCluster"`
FailoverType string `json:"failoverType"`
FromCluster string `json:"fromCluster,omitempty"`
ToCluster string `json:"toCluster,omitempty"`
FailoverType string `json:"failoverType,omitempty"`
}

// FailoverHistory is the history of failovers for a domain limited by the FailoverHistoryMaxSize config
Expand Down Expand Up @@ -257,6 +257,7 @@ func (d *handlerImpl) RegisterDomain(
if err != nil {
return err
}

replicationConfig := &persistence.DomainReplicationConfig{
ActiveClusterName: activeClusterName,
Clusters: clusters,
Expand Down Expand Up @@ -476,6 +477,7 @@ func (d *handlerImpl) UpdateDomain(

// Update replication config
replicationConfig, replicationConfigChanged, activeClusterChanged, err := d.updateReplicationConfig(
getResponse.Info.Name,
replicationConfig,
updateRequest,
)
Expand Down Expand Up @@ -535,14 +537,18 @@ func (d *handlerImpl) UpdateDomain(
gracefulFailoverEndTime = nil
previousFailoverVersion = constants.InitialPreviousFailoverVersion
}
failoverVersion = d.clusterMetadata.GetNextFailoverVersion(
replicationConfig.ActiveClusterName,
failoverVersion,
updateRequest.Name,
)
err = updateFailoverHistory(info, d.config, now, currentActiveCluster, *updateRequest.ActiveClusterName, failoverType)
if err != nil {
d.logger.Warn("failed to update failover history", tag.Error(err))

if !replicationConfig.IsActiveActive() {
failoverVersion = d.clusterMetadata.GetNextFailoverVersion(
replicationConfig.ActiveClusterName,
failoverVersion,
updateRequest.Name,
)

err = updateFailoverHistory(info, d.config, now, currentActiveCluster, *updateRequest.ActiveClusterName, failoverType)
if err != nil {
d.logger.Warn("failed to update failover history", tag.Error(err))
}
}

failoverNotificationVersion = notificationVersion
Expand Down Expand Up @@ -950,6 +956,7 @@ func (d *handlerImpl) createResponse(
replicationConfigResult := &types.DomainReplicationConfiguration{
ActiveClusterName: replicationConfig.ActiveClusterName,
Clusters: clusters,
ActiveClusters: replicationConfig.ActiveClusters,
}

return infoResult, configResult, replicationConfigResult
Expand Down Expand Up @@ -1207,6 +1214,7 @@ func (d *handlerImpl) updateDeleteBadBinary(
}

func (d *handlerImpl) updateReplicationConfig(
domainName string,
config *persistence.DomainReplicationConfig,
updateRequest *types.UpdateDomainRequest,
) (*persistence.DomainReplicationConfig, bool, bool, error) {
Expand Down Expand Up @@ -1236,7 +1244,44 @@ func (d *handlerImpl) updateReplicationConfig(
config.ActiveClusterName = *updateRequest.ActiveClusterName
}

// TODO(active-active): handle active-active case here which would be updateRequest.ActiveClusters != nil.
if updateRequest.ActiveClusters != nil && updateRequest.ActiveClusters.ActiveClustersByRegion != nil {
existingActiveClusters := config.ActiveClusters
finalActiveClusters := make(map[string]types.ActiveClusterInfo)

// first add the ones that are not touched
for region, activeCluster := range existingActiveClusters.ActiveClustersByRegion {
if _, ok := updateRequest.ActiveClusters.ActiveClustersByRegion[region]; !ok {
finalActiveClusters[region] = activeCluster
}
}

// then add the ones that are modified
for region, activeCluster := range updateRequest.ActiveClusters.ActiveClustersByRegion {
existingActiveCluster, ok := existingActiveClusters.ActiveClustersByRegion[region]
if !ok {
// a cluster is being activated on a region that didn't have any active cluster before
// initialize failover version to the initial failover version of the cluster
activeCluster.FailoverVersion = d.clusterMetadata.GetNextFailoverVersion(activeCluster.ActiveClusterName, 0, domainName)
finalActiveClusters[region] = activeCluster
continue
}

// handle modification of an active cluster change on a region that had an active cluster before
if existingActiveCluster.ActiveClusterName != activeCluster.ActiveClusterName {
// a cluster is being deactivated on a region that had an active cluster before
// set failover version to the next failover version of the newcluster
activeCluster.FailoverVersion = d.clusterMetadata.GetNextFailoverVersion(activeCluster.ActiveClusterName, existingActiveCluster.FailoverVersion, domainName)
finalActiveClusters[region] = activeCluster
} else {
// no update case, just copy the existing active cluster
finalActiveClusters[region] = activeCluster
}
}
config.ActiveClusters = &types.ActiveClusters{
ActiveClustersByRegion: finalActiveClusters,
}
activeClusterUpdated = true
}

return config, clusterUpdated, activeClusterUpdated, nil
}
Expand All @@ -1251,7 +1296,7 @@ func (d *handlerImpl) handleGracefulFailover(
isGlobalDomain bool,
) (*int64, int64, error) {
// must update active cluster on a global domain
if !activeClusterChanged || !isGlobalDomain {
if !activeClusterChanged || !isGlobalDomain || replicationConfig.IsActiveActive() {
return nil, 0, errInvalidGracefulFailover
}
// must start with the passive -> active cluster
Expand Down
Loading