
Commit 479dc42

active-active prototyping
1 parent 2c2a590 commit 479dc42

92 files changed: +1106 −286 lines


common/activecluster/manager.go

Lines changed: 112 additions & 0 deletions
@@ -0,0 +1,112 @@
// The MIT License (MIT)
//
// Copyright (c) 2017-2020 Uber Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package activecluster

import (
    "context"

    "github.com/uber/cadence/common"
    "github.com/uber/cadence/common/cache"
    "github.com/uber/cadence/common/cluster"
    "github.com/uber/cadence/common/log"
    "github.com/uber/cadence/common/log/tag"
    "github.com/uber/cadence/common/metrics"
)

//go:generate mockgen -package $GOPACKAGE -destination manager_mock.go -self_package github.com/uber/cadence/common/activecluster github.com/uber/cadence/common/activecluster Manager

type Manager interface {
    common.Daemon

    // ActiveCluster returns the active cluster name for a domain.
    // 1. If the domain is local, return the current cluster name.
    // 2. If the domain is an active-passive global domain, return its domain entry's ActiveClusterName.
    // 3. If the domain is an active-active global domain, check the workflow's activeness metadata and return the corresponding active cluster name.
    ActiveCluster(ctx context.Context, domainID, wfID, rID string) (string, error)

    // FailoverVersion returns the failover version of a domain.
    // 1. If the domain is local, return its domain entry's failover version.
    // 2. If the domain is an active-passive global domain, return its domain entry's failover version.
    // 3. If the domain is an active-active domain, check the workflow's activeness metadata and look up the failover version in the EntityActiveRegion lookup table.
    FailoverVersion(ctx context.Context, domainID, wfID, rID string) (int64, error)
}

type DomainIDToDomainFn func(id string) (*cache.DomainCacheEntry, error)

type manager struct {
    domainIDToDomainFn DomainIDToDomainFn
    clusterMetadata    cluster.Metadata
    metricsCl          metrics.Client
    logger             log.Logger
}

func NewManager(
    domainIDToDomainFn DomainIDToDomainFn,
    clusterMetadata cluster.Metadata,
    metricsCl metrics.Client,
    logger log.Logger,
) Manager {
    return &manager{
        domainIDToDomainFn: domainIDToDomainFn,
        clusterMetadata:    clusterMetadata,
        metricsCl:          metricsCl,
        logger:             logger.WithTags(tag.ComponentActiveRegionManager),
    }
}

func (m *manager) ActiveCluster(ctx context.Context, domainID, wfID, rID string) (string, error) {
    d, err := m.domainIDToDomainFn(domainID)
    if err != nil {
        return "", err
    }

    if !d.GetReplicationConfig().IsActiveActive() {
        // Not an active-active domain. Return ActiveClusterName from the domain entry.
        return d.GetReplicationConfig().ActiveClusterName, nil
    }

    // TODO: Remove the fake implementation below and implement properly:
    // - look up the active region for <domain id, wf id, run id> in the executions table, RowType=ActiveCluster
    // - cache this info
    // - add metrics for cache hit/miss
    // - return the cluster name
    if wfID == "wf1" {
        return "cluster0", nil
    }
    if wfID == "wf2" {
        return "cluster1", nil
    }

    return d.GetReplicationConfig().ActiveClusterName, nil
}

func (m *manager) FailoverVersion(ctx context.Context, domainID, wfID, rID string) (int64, error) {
    // TODO: implement this
    return 0, nil
}

func (m *manager) Start() {
}

func (m *manager) Stop() {
}
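The wf1/wf2 branches above are placeholders. As a rough illustration of the read-through cache the TODO describes (keyed by <domainID, wfID, runID> and backed by an executions-table row with RowType=ActiveCluster), here is a minimal standalone sketch. Every name in it is hypothetical; nothing below ships in this commit.

package main

import (
    "context"
    "fmt"
    "sync"
)

// fetchFn stands in for the executions-table read (RowType=ActiveCluster).
type fetchFn func(ctx context.Context, domainID, wfID, rID string) (string, error)

// clusterCache is a read-through cache keyed by <domainID, wfID, runID>.
// A production version would also bound size, expire entries, and emit
// hit/miss metrics, per the TODO.
type clusterCache struct {
    mu    sync.RWMutex
    data  map[string]string
    fetch fetchFn
}

func (c *clusterCache) ActiveCluster(ctx context.Context, domainID, wfID, rID string) (string, error) {
    key := domainID + "/" + wfID + "/" + rID

    c.mu.RLock()
    name, ok := c.data[key]
    c.mu.RUnlock()
    if ok {
        return name, nil // cache hit
    }

    // Cache miss: read from the lookup table, then memoize.
    name, err := c.fetch(ctx, domainID, wfID, rID)
    if err != nil {
        return "", err
    }
    c.mu.Lock()
    c.data[key] = name
    c.mu.Unlock()
    return name, nil
}

func main() {
    cc := &clusterCache{
        data: map[string]string{},
        fetch: func(ctx context.Context, domainID, wfID, rID string) (string, error) {
            return "cluster0", nil // stub for the executions-table read
        },
    }
    name, _ := cc.ActiveCluster(context.Background(), "d1", "wf1", "r1")
    fmt.Println(name) // cluster0
}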

common/activecluster/manager_mock.go

Lines changed: 117 additions & 0 deletions
Some generated files are not rendered by default.

common/cache/domainCache.go

Lines changed: 49 additions & 18 deletions
@@ -97,15 +97,16 @@ type (
 	}

 	DefaultDomainCache struct {
-		status        int32
-		shutdownChan  chan struct{}
-		clusterGroup  string
-		cacheNameToID *atomic.Value
-		cacheByID     *atomic.Value
-		domainManager persistence.DomainManager
-		timeSource    clock.TimeSource
-		scope         metrics.Scope
-		logger        log.Logger
+		status          int32
+		shutdownChan    chan struct{}
+		clusterGroup    string
+		clusterMetadata cluster.Metadata
+		cacheNameToID   *atomic.Value
+		cacheByID       *atomic.Value
+		domainManager   persistence.DomainManager
+		timeSource      clock.TimeSource
+		scope           metrics.Scope
+		logger          log.Logger

 		// refresh lock is used to guarantee at most one
 		// coroutine is doing domain refreshment
@@ -126,11 +127,12 @@ type (

 	// DomainCacheEntry contains the info and config for a domain
 	DomainCacheEntry struct {
-		mu                sync.RWMutex
-		info              *persistence.DomainInfo
-		config            *persistence.DomainConfig
-		replicationConfig *persistence.DomainReplicationConfig
-		configVersion     int64
+		mu                sync.RWMutex
+		info              *persistence.DomainInfo
+		config            *persistence.DomainConfig
+		replicationConfig *persistence.DomainReplicationConfig
+		configVersion     int64
+		// failoverVersion is the failover version of the domain's active cluster
 		failoverVersion             int64
 		isGlobalDomain              bool
 		failoverNotificationVersion int64
@@ -169,6 +171,7 @@ func NewDomainCache(
 		status:          domainCacheInitialized,
 		shutdownChan:    make(chan struct{}),
 		clusterGroup:    getClusterGroupIdentifier(metadata),
+		clusterMetadata: metadata,
 		cacheNameToID:   &atomic.Value{},
 		cacheByID:       &atomic.Value{},
 		domainManager:   domainManager,
@@ -459,6 +462,8 @@ func (c *DefaultDomainCache) refreshDomainsLocked() error {
 	for continuePage {
 		ctx, cancel := context.WithTimeout(context.Background(), domainCachePersistenceTimeout)
 		request.NextPageToken = token
+		// TODO: update the DB layer to support ActiveClusterNames.
+		// Also think about how to support failover overrides and rebalance.
 		response, err := c.domainManager.ListDomains(ctx, request)
 		cancel()
 		if err != nil {
@@ -496,6 +501,7 @@ UpdateLoop:
 			c.logger.Info("Domain notification is not less than metadata notification version", tag.WorkflowDomainName(domain.GetInfo().Name))
 			break UpdateLoop
 		}
+
 		triggerCallback, nextEntry, err := c.updateIDToDomainCache(newCacheByID, domain.info.ID, domain)
 		if err != nil {
 			return err
@@ -506,6 +512,7 @@
 			metrics.DomainTypeTag(nextEntry.isGlobalDomain),
 			metrics.ClusterGroupTag(c.clusterGroup),
 			metrics.ActiveClusterTag(nextEntry.replicationConfig.ActiveClusterName),
+			metrics.IsActiveActiveDomainTag(nextEntry.replicationConfig.IsActiveActive()),
 		).UpdateGauge(metrics.ActiveClusterGauge, 1)

 		c.updateNameToIDCache(newCacheNameToID, nextEntry.info.Name, nextEntry.info.ID)
@@ -625,6 +632,11 @@ func (c *DefaultDomainCache) getDomainByID(
 ) (*DomainCacheEntry, error) {

 	var result *DomainCacheEntry
+	defer func() {
+		if result != nil {
+			c.logger.Debugf("GetDomainByID returning domain %s, failoverVersion: %d", result.info.Name, result.failoverVersion)
+		}
+	}()
 	entry, cacheHit := c.cacheByID.Load().(Cache).Get(id).(*DomainCacheEntry)
 	if cacheHit {
 		entry.mu.RLock()
@@ -695,7 +707,7 @@ func (c *DefaultDomainCache) buildEntryFromRecord(

 	// this is a shallow copy, but since the record is generated by persistence
 	// and only accessible here, it would be fine
-	return &DomainCacheEntry{
+	entry := &DomainCacheEntry{
 		info:              record.Info,
 		config:            record.Config,
 		replicationConfig: record.ReplicationConfig,
@@ -708,6 +720,8 @@ func (c *DefaultDomainCache) buildEntryFromRecord(
 		notificationVersion: record.NotificationVersion,
 		initialized:         true,
 	}
+
+	return entry
 }

 func copyResetBinary(bins types.BadBinaries) types.BadBinaries {
@@ -749,7 +763,12 @@ func (entry *DomainCacheEntry) duplicate() *DomainCacheEntry {
 		ActiveClusterName: entry.replicationConfig.ActiveClusterName,
 	}
 	for _, clusterCfg := range entry.replicationConfig.Clusters {
-		result.replicationConfig.Clusters = append(result.replicationConfig.Clusters, &*clusterCfg)
+		c := *clusterCfg
+		result.replicationConfig.Clusters = append(result.replicationConfig.Clusters, &c)
+	}
+	for _, clusterCfg := range entry.replicationConfig.ActiveClusters {
+		c := *clusterCfg
+		result.replicationConfig.ActiveClusters = append(result.replicationConfig.ActiveClusters, &c)
 	}
 	result.configVersion = entry.configVersion
 	result.failoverVersion = entry.failoverVersion
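Beyond adding the ActiveClusters loop, the duplicate() hunk above quietly fixes an aliasing bug: in Go, &*clusterCfg evaluates back to clusterCfg itself, so the old code appended the original pointers instead of copies. Copying through a local variable yields an independent struct. A standalone demonstration:

package main

import "fmt"

type ClusterReplicationConfig struct{ ClusterName string }

func main() {
    orig := []*ClusterReplicationConfig{{ClusterName: "cluster0"}}

    // Old pattern: &*cfg is the same pointer, so both slices share one struct.
    var aliased []*ClusterReplicationConfig
    for _, cfg := range orig {
        aliased = append(aliased, &*cfg)
    }

    // New pattern: copy into a local, then take the local's address.
    var copied []*ClusterReplicationConfig
    for _, cfg := range orig {
        c := *cfg
        copied = append(copied, &c)
    }

    orig[0].ClusterName = "mutated"
    fmt.Println(aliased[0].ClusterName) // "mutated": the pointer was shared
    fmt.Println(copied[0].ClusterName)  // "cluster0": an independent copy
}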
@@ -821,12 +840,24 @@ func (entry *DomainCacheEntry) IsActiveIn(currentCluster string) (bool, error) {
 	}

 	domainName := entry.GetInfo().Name
-	activeCluster := entry.GetReplicationConfig().ActiveClusterName
-
 	if entry.IsDomainPendingActive() {
 		return false, errors.NewDomainPendingActiveError(domainName, currentCluster)
 	}

+	if len(entry.GetReplicationConfig().ActiveClusters) > 0 {
+		// TODO: optimize this loop by using a map
+		var activeClusters []string
+		for _, cl := range entry.GetReplicationConfig().ActiveClusters {
+			if cl.ClusterName == currentCluster {
+				return true, nil
+			}
+			activeClusters = append(activeClusters, cl.ClusterName)
+		}
+
+		return false, errors.NewDomainNotActiveError(domainName, currentCluster, activeClusters...)
+	}
+
+	activeCluster := entry.GetReplicationConfig().ActiveClusterName
 	if currentCluster != activeCluster {
 		return false, errors.NewDomainNotActiveError(domainName, currentCluster, activeCluster)
 	}
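The net effect of the IsActiveIn change, reduced to a standalone sketch: a non-empty ActiveClusters list now means the domain counts as active in every listed cluster, and the single ActiveClusterName comparison only applies as the fallback. Types are simplified here and the error returns are collapsed to a bool; only the lookup order mirrors the real code.

package main

import "fmt"

type clusterCfg struct{ ClusterName string }

type replicationConfig struct {
    ActiveClusterName string
    ActiveClusters    []*clusterCfg
}

// isActiveIn mirrors the new lookup order: a non-empty ActiveClusters list
// wins; otherwise fall back to comparing against ActiveClusterName.
func isActiveIn(cfg replicationConfig, current string) bool {
    if len(cfg.ActiveClusters) > 0 {
        for _, cl := range cfg.ActiveClusters {
            if cl.ClusterName == current {
                return true
            }
        }
        return false
    }
    return current == cfg.ActiveClusterName
}

func main() {
    activeActive := replicationConfig{ActiveClusters: []*clusterCfg{{"cluster0"}, {"cluster1"}}}
    activePassive := replicationConfig{ActiveClusterName: "cluster0"}
    fmt.Println(isActiveIn(activeActive, "cluster1"))  // true: active in every listed cluster
    fmt.Println(isActiveIn(activePassive, "cluster1")) // false: only active in cluster0
}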

common/domain/handler.go

Lines changed: 3 additions & 0 deletions
@@ -1179,6 +1179,9 @@ func (d *handlerImpl) updateReplicationConfig(
 		activeClusterUpdated = true
 		config.ActiveClusterName = *updateRequest.ActiveClusterName
 	}
+
+	// TODO: handle the active-active case here, i.e. updateRequest.ActiveClusters != nil.
+
 	return config, clusterUpdated, activeClusterUpdated, nil
 }