Skip to content

Commit 21f964a

Browse files
committed
Introduce a kill switch for dual write mode
1 parent 7cb78ca commit 21f964a

16 files changed

+56
-16
lines changed

common/dynamicconfig/constants.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2037,6 +2037,8 @@ const (
20372037
// Allowed filters: DomainName,TasklistName,TasklistType
20382038
MatchingEnableClientAutoConfig
20392039

2040+
EnableNoSQLHistoryTaskDualWriteMode
2041+
20402042
// LastBoolKey must be the last one in this const group
20412043
LastBoolKey
20422044
)
@@ -4371,6 +4373,11 @@ var BoolKeys = map[BoolKey]DynamicBool{
43714373
Description: "MatchingEnableClientAutoConfig is to enable auto config on worker side",
43724374
DefaultValue: false,
43734375
},
4376+
EnableNoSQLHistoryTaskDualWriteMode: {
4377+
KeyName: "history.enableNoSQLHistoryTaskDualWrite",
4378+
Description: "EnableHistoryTaskDualWrite is to enable dual write of history events",
4379+
DefaultValue: false,
4380+
},
43744381
}
43754382

43764383
var FloatKeys = map[FloatKey]DynamicFloat{

common/persistence/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type (
2929
EnableCassandraAllConsistencyLevelDelete dynamicconfig.BoolPropertyFn
3030
PersistenceSampleLoggingRate dynamicconfig.IntPropertyFn
3131
EnableShardIDMetrics dynamicconfig.BoolPropertyFn
32+
EnableHistoryTaskDualWriteMode dynamicconfig.BoolPropertyFn
3233
}
3334
)
3435

@@ -39,5 +40,6 @@ func NewDynamicConfiguration(dc *dynamicconfig.Collection) *DynamicConfiguration
3940
EnableCassandraAllConsistencyLevelDelete: dc.GetBoolProperty(dynamicconfig.EnableCassandraAllConsistencyLevelDelete),
4041
PersistenceSampleLoggingRate: dc.GetIntProperty(dynamicconfig.SampleLoggingRate),
4142
EnableShardIDMetrics: dc.GetBoolProperty(dynamicconfig.EnableShardIDMetrics),
43+
EnableHistoryTaskDualWriteMode: dc.GetBoolProperty(dynamicconfig.EnableNoSQLHistoryTaskDualWriteMode),
4244
}
4345
}

common/persistence/nosql/factory.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ func (f *executionStoreFactory) new(shardID int) (persistence.ExecutionStore, er
162162
if err != nil {
163163
return nil, err
164164
}
165-
pmgr, err := NewExecutionStore(shardID, storeShard.db, f.logger, f.taskSerializer)
165+
pmgr, err := NewExecutionStore(shardID, storeShard.db, f.logger, f.taskSerializer, storeShard.dc)
166166
if err != nil {
167167
return nil, err
168168
}

common/persistence/nosql/nosql_execution_store.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,13 @@ func NewExecutionStore(
4545
db nosqlplugin.DB,
4646
logger log.Logger,
4747
taskSerializer serialization.TaskSerializer,
48+
dc *persistence.DynamicConfiguration,
4849
) (persistence.ExecutionStore, error) {
4950
return &nosqlExecutionStore{
5051
nosqlStore: nosqlStore{
5152
logger: logger,
5253
db: db,
54+
dc: dc,
5355
},
5456
shardID: shardID,
5557
taskSerializer: taskSerializer,

common/persistence/nosql/nosql_execution_store_test.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"go.uber.org/mock/gomock"
3434

3535
"github.com/uber/cadence/common"
36+
"github.com/uber/cadence/common/dynamicconfig"
3637
"github.com/uber/cadence/common/log"
3738
"github.com/uber/cadence/common/persistence"
3839
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
@@ -286,7 +287,9 @@ func TestUpdateWorkflowExecution(t *testing.T) {
286287
controller := gomock.NewController(t)
287288
mockDB := nosqlplugin.NewMockDB(controller)
288289
mockTaskSerializer := serialization.NewMockTaskSerializer(controller)
289-
store, _ := NewExecutionStore(1, mockDB, log.NewNoop(), mockTaskSerializer)
290+
store, _ := NewExecutionStore(1, mockDB, log.NewNoop(), mockTaskSerializer, &persistence.DynamicConfiguration{
291+
EnableHistoryTaskDualWriteMode: func(...dynamicconfig.FilterOption) bool { return false },
292+
})
290293

291294
tc.setupMock(mockDB, 1)
292295

@@ -1509,7 +1512,9 @@ func TestConflictResolveWorkflowExecution(t *testing.T) {
15091512

15101513
mockDB := nosqlplugin.NewMockDB(gomockController)
15111514
mockTaskSerializer := serialization.NewMockTaskSerializer(gomockController)
1512-
store, err := NewExecutionStore(1, mockDB, log.NewNoop(), mockTaskSerializer)
1515+
store, err := NewExecutionStore(1, mockDB, log.NewNoop(), mockTaskSerializer, &persistence.DynamicConfiguration{
1516+
EnableHistoryTaskDualWriteMode: func(...dynamicconfig.FilterOption) bool { return false },
1517+
})
15131518
require.NoError(t, err)
15141519

15151520
tests := []struct {

common/persistence/nosql/nosql_execution_store_util.go

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -256,13 +256,17 @@ func (d *nosqlExecutionStore) prepareTimerTasksForWorkflowTxn(domainID, workflow
256256
ScheduleAttempt: attempt,
257257
Version: task.GetVersion(),
258258
}
259-
data, err := d.taskSerializer.SerializeTask(persistence.HistoryTaskCategoryTimer, task)
260-
if err != nil {
261-
return nil, err
259+
var blob *persistence.DataBlob
260+
if d.dc.EnableHistoryTaskDualWriteMode() {
261+
data, err := d.taskSerializer.SerializeTask(persistence.HistoryTaskCategoryTimer, task)
262+
if err != nil {
263+
return nil, err
264+
}
265+
blob = &data
262266
}
263267
tasks = append(tasks, &nosqlplugin.HistoryMigrationTask{
264268
Timer: nt,
265-
Task: &data,
269+
Task: blob,
266270
})
267271
}
268272

@@ -316,13 +320,17 @@ func (d *nosqlExecutionStore) prepareReplicationTasksForWorkflowTxn(domainID, wo
316320
BranchToken: branchToken,
317321
NewRunBranchToken: newRunBranchToken,
318322
}
319-
data, err := d.taskSerializer.SerializeTask(persistence.HistoryTaskCategoryReplication, task)
320-
if err != nil {
321-
return nil, err
323+
var blob *persistence.DataBlob
324+
if d.dc.EnableHistoryTaskDualWriteMode() {
325+
data, err := d.taskSerializer.SerializeTask(persistence.HistoryTaskCategoryReplication, task)
326+
if err != nil {
327+
return nil, err
328+
}
329+
blob = &data
322330
}
323331
tasks = append(tasks, &nosqlplugin.HistoryMigrationTask{
324332
Replication: nt,
325-
Task: &data,
333+
Task: blob,
326334
})
327335
}
328336

@@ -444,13 +452,17 @@ func (d *nosqlExecutionStore) prepareTransferTasksForWorkflowTxn(domainID, workf
444452
ScheduleID: scheduleID,
445453
Version: task.GetVersion(),
446454
}
447-
data, err := d.taskSerializer.SerializeTask(persistence.HistoryTaskCategoryTransfer, task)
448-
if err != nil {
449-
return nil, err
455+
var blob *persistence.DataBlob
456+
if d.dc.EnableHistoryTaskDualWriteMode() {
457+
data, err := d.taskSerializer.SerializeTask(persistence.HistoryTaskCategoryTransfer, task)
458+
if err != nil {
459+
return nil, err
460+
}
461+
blob = &data
450462
}
451463
tasks = append(tasks, &nosqlplugin.HistoryMigrationTask{
452464
Transfer: t,
453-
Task: &data,
465+
Task: blob,
454466
})
455467
}
456468
return tasks, nil

common/persistence/nosql/nosql_execution_store_util_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232

3333
"github.com/uber/cadence/common"
3434
"github.com/uber/cadence/common/checksum"
35+
"github.com/uber/cadence/common/dynamicconfig"
3536
"github.com/uber/cadence/common/log"
3637
"github.com/uber/cadence/common/persistence"
3738
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
@@ -42,7 +43,7 @@ import (
4243
func newTestNosqlExecutionStoreWithTaskSerializer(db nosqlplugin.DB, logger log.Logger, taskSerializer serialization.TaskSerializer) *nosqlExecutionStore {
4344
return &nosqlExecutionStore{
4445
shardID: 1,
45-
nosqlStore: nosqlStore{logger: logger, db: db},
46+
nosqlStore: nosqlStore{logger: logger, db: db, dc: &persistence.DynamicConfiguration{EnableHistoryTaskDualWriteMode: func(...dynamicconfig.FilterOption) bool { return true }}},
4647
taskSerializer: taskSerializer,
4748
}
4849
}

common/persistence/nosql/nosql_store.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@ package nosql
2222

2323
import (
2424
"github.com/uber/cadence/common/log"
25+
"github.com/uber/cadence/common/persistence"
2526
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
2627
)
2728

2829
// a shared struct for all stores in this package
2930
type nosqlStore struct {
3031
logger log.Logger
3132
db nosqlplugin.DB
33+
dc *persistence.DynamicConfiguration
3234
}
3335

3436
func (nm *nosqlStore) GetName() string {

common/persistence/persistence-tests/persistenceTestBase.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ func NewTestBaseWithNoSQL(t *testing.T, options *TestBaseOptions) *TestBase {
153153
EnableCassandraAllConsistencyLevelDelete: dynamicconfig.GetBoolPropertyFn(true),
154154
PersistenceSampleLoggingRate: dynamicconfig.GetIntPropertyFn(100),
155155
EnableShardIDMetrics: dynamicconfig.GetBoolPropertyFn(true),
156+
EnableHistoryTaskDualWriteMode: dynamicconfig.GetBoolPropertyFn(true),
156157
}
157158
params := TestBaseParams{
158159
DefaultTestCluster: testCluster,
@@ -181,6 +182,7 @@ func NewTestBaseWithSQL(t *testing.T, options *TestBaseOptions) *TestBase {
181182
EnableCassandraAllConsistencyLevelDelete: dynamicconfig.GetBoolPropertyFn(true),
182183
PersistenceSampleLoggingRate: dynamicconfig.GetIntPropertyFn(100),
183184
EnableShardIDMetrics: dynamicconfig.GetBoolPropertyFn(true),
185+
EnableHistoryTaskDualWriteMode: dynamicconfig.GetBoolPropertyFn(true),
184186
}
185187
params := TestBaseParams{
186188
DefaultTestCluster: testCluster,

host/async_wf_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ func (s *AsyncWFIntegrationSuite) SetupSuite() {
9696
EnableCassandraAllConsistencyLevelDelete: dynamicconfig.GetBoolPropertyFn(true),
9797
PersistenceSampleLoggingRate: dynamicconfig.GetIntPropertyFn(100),
9898
EnableShardIDMetrics: dynamicconfig.GetBoolPropertyFn(true),
99+
EnableHistoryTaskDualWriteMode: dynamicconfig.GetBoolPropertyFn(true),
99100
}
100101
params := pt.TestBaseParams{
101102
DefaultTestCluster: s.DefaultTestCluster,

host/integrationbase.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ func (s *IntegrationBase) setupSuite() {
111111
EnableCassandraAllConsistencyLevelDelete: dynamicconfig.GetBoolPropertyFn(true),
112112
PersistenceSampleLoggingRate: dynamicconfig.GetIntPropertyFn(100),
113113
EnableShardIDMetrics: dynamicconfig.GetBoolPropertyFn(true),
114+
EnableHistoryTaskDualWriteMode: dynamicconfig.GetBoolPropertyFn(true),
114115
}
115116
params := pt.TestBaseParams{
116117
DefaultTestCluster: s.DefaultTestCluster,

host/ndc/integration_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ func (s *NDCIntegrationTestSuite) SetupSuite() {
9999
dc := persistence.DynamicConfiguration{
100100
EnableSQLAsyncTransaction: dynamicconfig.GetBoolPropertyFn(false),
101101
EnableCassandraAllConsistencyLevelDelete: dynamicconfig.GetBoolPropertyFn(true),
102+
EnableHistoryTaskDualWriteMode: dynamicconfig.GetBoolPropertyFn(true),
102103
}
103104
params := pt.TestBaseParams{
104105
DefaultTestCluster: s.defaultTestCluster,

host/pinot_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ func (s *PinotIntegrationSuite) SetupSuite() {
111111
EnableCassandraAllConsistencyLevelDelete: dynamicconfig.GetBoolPropertyFn(true),
112112
PersistenceSampleLoggingRate: dynamicconfig.GetIntPropertyFn(100),
113113
EnableShardIDMetrics: dynamicconfig.GetBoolPropertyFn(true),
114+
EnableHistoryTaskDualWriteMode: dynamicconfig.GetBoolPropertyFn(true),
114115
}
115116
params := pt.TestBaseParams{
116117
DefaultTestCluster: s.DefaultTestCluster,

host/workflowidratelimit_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ func (s *WorkflowIDRateLimitIntegrationSuite) SetupSuite() {
7373
EnableCassandraAllConsistencyLevelDelete: dynamicconfig.GetBoolPropertyFn(true),
7474
PersistenceSampleLoggingRate: dynamicconfig.GetIntPropertyFn(100),
7575
EnableShardIDMetrics: dynamicconfig.GetBoolPropertyFn(true),
76+
EnableHistoryTaskDualWriteMode: dynamicconfig.GetBoolPropertyFn(true),
7677
}
7778
params := pt.TestBaseParams{
7879
DefaultTestCluster: s.DefaultTestCluster,

host/workflowsidinternalratelimit_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ func (s *WorkflowIDInternalRateLimitIntegrationSuite) SetupSuite() {
7777
EnableCassandraAllConsistencyLevelDelete: dynamicconfig.GetBoolPropertyFn(true),
7878
PersistenceSampleLoggingRate: dynamicconfig.GetIntPropertyFn(100),
7979
EnableShardIDMetrics: dynamicconfig.GetBoolPropertyFn(true),
80+
EnableHistoryTaskDualWriteMode: dynamicconfig.GetBoolPropertyFn(true),
8081
}
8182
params := pt.TestBaseParams{
8283
DefaultTestCluster: s.DefaultTestCluster,

simulation/matching/matching_simulation_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ func (s *MatchingSimulationSuite) SetupSuite() {
167167
EnableCassandraAllConsistencyLevelDelete: dynamicconfig.GetBoolPropertyFn(true),
168168
PersistenceSampleLoggingRate: dynamicconfig.GetIntPropertyFn(100),
169169
EnableShardIDMetrics: dynamicconfig.GetBoolPropertyFn(true),
170+
EnableHistoryTaskDualWriteMode: dynamicconfig.GetBoolPropertyFn(true),
170171
}
171172
params := pt.TestBaseParams{
172173
DefaultTestCluster: s.DefaultTestCluster,

0 commit comments

Comments
 (0)