Skip to content

Add capability to have multiple domains in replication simulation #6923

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ services:
environment:
- "MAX_HEAP_SIZE=256M"
- "HEAP_NEWSIZE=128M"
expose:
- "9042"
ports:
- "9042:9042"
networks:
services-network:
aliases:
Expand Down
38 changes: 21 additions & 17 deletions simulation/replication/replication_simulation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,11 @@ func TestReplicationSimulation(t *testing.T) {
simCfg.MustInitClientsFor(t, clusterName)
}

simCfg.MustRegisterDomain(t)
simTypes.Logf(t, "Registering domains")
for domainName := range simCfg.Domains {
simTypes.Logf(t, "Domain: %s", domainName)
simCfg.MustRegisterDomain(t, domainName)
}

// wait for domain data to be replicated and workers to start.
waitUntilWorkersReady(t)
Expand Down Expand Up @@ -115,14 +119,14 @@ func startWorkflow(
) error {
t.Helper()

simTypes.Logf(t, "Starting workflow: %s on cluster: %s", op.WorkflowID, op.Cluster)
simTypes.Logf(t, "Starting workflow: %s on domain %s on cluster: %s", op.WorkflowID, op.Domain, op.Cluster)

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
resp, err := simCfg.MustGetFrontendClient(t, op.Cluster).StartWorkflowExecution(ctx,
&types.StartWorkflowExecutionRequest{
RequestID: uuid.New(),
Domain: simCfg.Domain.Name,
Domain: op.Domain,
WorkflowID: op.WorkflowID,
WorkflowType: &types.WorkflowType{Name: simTypes.WorkflowName},
TaskList: &types.TaskList{Name: simTypes.TasklistName},
Expand All @@ -135,7 +139,7 @@ func startWorkflow(
return err
}

simTypes.Logf(t, "Started workflow: %s on cluster: %s. RunID: %s", op.WorkflowID, op.Cluster, resp.GetRunID())
simTypes.Logf(t, "Started workflow: %s on domain: %s on cluster: %s. RunID: %s", op.WorkflowID, op.Domain, op.Cluster, resp.GetRunID())

return nil
}
Expand All @@ -147,24 +151,24 @@ func changeActiveClusters(
) error {
t.Helper()

simTypes.Logf(t, "Changing active clusters to: %v", op.NewActiveClusters)
simTypes.Logf(t, "Changing active clusters for domain %s to: %v", op.Domain, op.NewActiveClusters)

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
descResp, err := simCfg.MustGetFrontendClient(t, simCfg.PrimaryCluster).DescribeDomain(ctx, &types.DescribeDomainRequest{Name: common.StringPtr(simCfg.Domain.Name)})
descResp, err := simCfg.MustGetFrontendClient(t, simCfg.PrimaryCluster).DescribeDomain(ctx, &types.DescribeDomainRequest{Name: common.StringPtr(op.Domain)})
if err != nil {
return fmt.Errorf("failed to describe domain %s: %w", simCfg.Domain.Name, err)
return fmt.Errorf("failed to describe domain %s: %w", op.Domain, err)
}

if !simCfg.IsActiveActiveDomain() {
if !simCfg.IsActiveActiveDomain(op.Domain) {
fromCluster := descResp.ReplicationConfiguration.ActiveClusterName
toCluster := op.NewActiveClusters[0]

ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
_, err = simCfg.MustGetFrontendClient(t, simCfg.PrimaryCluster).UpdateDomain(ctx,
&types.UpdateDomainRequest{
Name: simCfg.Domain.Name,
Name: op.Domain,
ActiveClusterName: &toCluster,
FailoverTimeoutInSeconds: op.FailoverTimeout,
})
Expand Down Expand Up @@ -196,7 +200,7 @@ func validate(
defer cancel()
resp, err := simCfg.MustGetFrontendClient(t, op.Cluster).DescribeWorkflowExecution(ctx,
&types.DescribeWorkflowExecutionRequest{
Domain: simCfg.Domain.Name,
Domain: op.Domain,
Execution: &types.WorkflowExecution{
WorkflowID: op.WorkflowID,
},
Expand All @@ -214,7 +218,7 @@ func validate(

// Get history to validate the worker identity that started and completed the workflow
// Some workflows start in cluster0 and complete in cluster1; the history is used to validate that.
history, err := getAllHistory(t, simCfg, op.Cluster, op.WorkflowID)
history, err := getAllHistory(t, simCfg, op.Cluster, op.Domain, op.WorkflowID)
if err != nil {
return err
}
Expand All @@ -227,17 +231,17 @@ func validate(
if err != nil {
return err
}
if op.Want.StartedByWorkersInCluster != "" && startedWorker != simTypes.WorkerIdentityFor(op.Want.StartedByWorkersInCluster) {
return fmt.Errorf("workflow %s started by worker %s, expected %s", op.WorkflowID, startedWorker, simTypes.WorkerIdentityFor(op.Want.StartedByWorkersInCluster))
if op.Want.StartedByWorkersInCluster != "" && startedWorker != simTypes.WorkerIdentityFor(op.Want.StartedByWorkersInCluster, op.Domain) {
return fmt.Errorf("workflow %s started by worker %s, expected %s", op.WorkflowID, startedWorker, simTypes.WorkerIdentityFor(op.Want.StartedByWorkersInCluster, op.Domain))
}

completedWorker, err := lastDecisionTaskWorker(history)
if err != nil {
return err
}

if op.Want.CompletedByWorkersInCluster != "" && completedWorker != simTypes.WorkerIdentityFor(op.Want.CompletedByWorkersInCluster) {
return fmt.Errorf("workflow %s completed by worker %s, expected %s", op.WorkflowID, completedWorker, simTypes.WorkerIdentityFor(op.Want.CompletedByWorkersInCluster))
if op.Want.CompletedByWorkersInCluster != "" && completedWorker != simTypes.WorkerIdentityFor(op.Want.CompletedByWorkersInCluster, op.Domain) {
return fmt.Errorf("workflow %s completed by worker %s, expected %s", op.WorkflowID, completedWorker, simTypes.WorkerIdentityFor(op.Want.CompletedByWorkersInCluster, op.Domain))
}

return nil
Expand Down Expand Up @@ -273,14 +277,14 @@ func waitForOpTime(t *testing.T, op *simTypes.Operation, startTime time.Time) {
simTypes.Logf(t, "Operation time (t + %ds) reached: %v", int(op.At.Seconds()), startTime.Add(op.At))
}

func getAllHistory(t *testing.T, simCfg *simTypes.ReplicationSimulationConfig, clusterName, wfID string) ([]types.HistoryEvent, error) {
func getAllHistory(t *testing.T, simCfg *simTypes.ReplicationSimulationConfig, clusterName, domainName, wfID string) ([]types.HistoryEvent, error) {
frontendCl := simCfg.MustGetFrontendClient(t, clusterName)
var nextPageToken []byte
var history []types.HistoryEvent
for {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
response, err := frontendCl.GetWorkflowExecutionHistory(ctx, &types.GetWorkflowExecutionHistoryRequest{
Domain: simCfg.Domain.Name,
Domain: domainName,
Execution: &types.WorkflowExecution{
WorkflowID: wfID,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,33 @@ clusters:
# primaryCluster is where domain data (e.g. domain registration) is written and from which it replicates to the other clusters.
primaryCluster: "cluster0"

domain:
name: test-domain-aa
activeClusters:
- cluster0
- cluster1
domains:
test-domain-aa:
name: test-domain-aa
activeClusters:
- cluster0
- cluster1

operations:
- op: start_workflow
at: 0s
workflowID: wf1
cluster: cluster0
domain: test-domain-aa
workflowDuration: 60s

- op: start_workflow
at: 0s
workflowID: wf2
cluster: cluster1
domain: test-domain-aa
workflowDuration: 60s

- op: validate
at: 70s
workflowID: wf1
cluster: cluster0
domain: test-domain-aa
want:
status: completed
startedByWorkersInCluster: cluster0
Expand All @@ -43,6 +47,7 @@ operations:
at: 70s
workflowID: wf2
cluster: cluster1
domain: test-domain-aa
want:
status: completed
startedByWorkersInCluster: cluster1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,31 @@ clusters:
primaryCluster: "cluster0"


domain:
name: test-domain
activeClusters:
- cluster0
domains:
test-domain:
name: test-domain
activeClusters:
- cluster0

operations:
- op: start_workflow
at: 0s
workflowID: wf1
cluster: cluster0
domain: test-domain
workflowDuration: 35s

- op: change_active_clusters # failover from cluster0 to cluster1
at: 20s
domain: test-domain
newActiveClusters: ["cluster1"]
# failoverTimeoutSec: 5 # unset means force failover. setting it means graceful failover request

- op: validate
at: 120s # todo: this should work at 40s mark
workflowID: wf1
cluster: cluster1
domain: test-domain
want:
status: completed
startedByWorkersInCluster: cluster0
Expand Down
17 changes: 9 additions & 8 deletions simulation/replication/types/repl_sim_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type ReplicationSimulationConfig struct {
// PrimaryCluster is used for domain registration
PrimaryCluster string `yaml:"primaryCluster"`

Domain ReplicationDomainConfig `yaml:"domain"`
Domains map[string]ReplicationDomainConfig `yaml:"domains"`

Operations []*Operation `yaml:"operations"`
}
Expand All @@ -79,6 +79,7 @@ type Operation struct {
WorkflowID string `yaml:"workflowID"`
WorkflowDuration time.Duration `yaml:"workflowDuration"`

Domain string `yaml:"domain"`
NewActiveClusters []string `yaml:"newActiveClusters"`
FailoverTimeout *int32 `yaml:"failoverTimeoutSec"`

Expand Down Expand Up @@ -155,12 +156,12 @@ func (s *ReplicationSimulationConfig) MustInitClientsFor(t *testing.T, clusterNa
Logf(t, "Initialized clients for cluster %s", clusterName)
}

func (s *ReplicationSimulationConfig) IsActiveActiveDomain() bool {
return len(s.Domain.ActiveClusters) > 1
// IsActiveActiveDomain reports whether the domain stored under domainName in
// s.Domains lists more than one active cluster.
//
// A name that is not present in s.Domains yields the zero-value domain config,
// so the result is false for unknown domains.
func (s *ReplicationSimulationConfig) IsActiveActiveDomain(domainName string) bool {
	domainCfg := s.Domains[domainName]
	return len(domainCfg.ActiveClusters) >= 2
}

func (s *ReplicationSimulationConfig) MustRegisterDomain(t *testing.T) {
Logf(t, "Registering domain: %s", s.Domain.Name)
func (s *ReplicationSimulationConfig) MustRegisterDomain(t *testing.T, domainName string) {
Logf(t, "Registering domain: %s", domainName)
var clusters []*types.ClusterReplicationConfiguration
for name := range s.Clusters {
clusters = append(clusters, &types.ClusterReplicationConfiguration{
Expand All @@ -170,7 +171,7 @@ func (s *ReplicationSimulationConfig) MustRegisterDomain(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := s.MustGetFrontendClient(t, s.PrimaryCluster).RegisterDomain(ctx, &types.RegisterDomainRequest{
Name: s.Domain.Name,
Name: domainName,
Clusters: clusters,
WorkflowExecutionRetentionPeriodInDays: 1,
IsGlobalDomain: true,
Expand All @@ -183,10 +184,10 @@ func (s *ReplicationSimulationConfig) MustRegisterDomain(t *testing.T) {
if _, ok := err.(*shared.DomainAlreadyExistsError); !ok {
require.NoError(t, err, "failed to register domain")
} else {
Logf(t, "Domain already exists: %s", s.Domain.Name)
Logf(t, "Domains already exists: %s", domainName)
}
return
}

Logf(t, "Registered domain: %s", s.Domain.Name)
Logf(t, "Registered domain: %s", domainName)
}
7 changes: 5 additions & 2 deletions simulation/replication/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,11 @@ type WorkflowOutput struct {
Count int
}

func WorkerIdentityFor(clusterName string) string {
return fmt.Sprintf("worker-%s", clusterName)
// WorkerIdentityFor builds the worker identity string for the given cluster
// and domain.
//
// When domainName is empty the result is "worker-<clusterName>"; otherwise it
// is "worker-<domainName>-<clusterName>".
func WorkerIdentityFor(clusterName string, domainName string) string {
	// The suffix is the cluster name, optionally qualified by the domain.
	suffix := clusterName
	if domainName != "" {
		suffix = fmt.Sprintf("%s-%s", domainName, clusterName)
	}
	return fmt.Sprintf("worker-%s", suffix)
}

func Logf(t *testing.T, msg string, args ...interface{}) {
Expand Down
Loading