Skip to content

Commit 363ecc8

Browse files
committed
Refactor history task key
1 parent 33d8b7d commit 363ecc8

31 files changed

+405
-612
lines changed

common/persistence/nosql/nosql_execution_store.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -723,7 +723,7 @@ func (d *nosqlExecutionStore) getImmediateHistoryTasks(
723723
) (*persistence.GetHistoryTasksResponse, error) {
724724
switch request.TaskCategory.ID() {
725725
case persistence.HistoryTaskCategoryIDTransfer:
726-
tasks, nextPageToken, err := d.db.SelectTransferTasksOrderByTaskID(ctx, d.shardID, request.PageSize, request.NextPageToken, request.InclusiveMinTaskKey.TaskID, request.ExclusiveMaxTaskKey.TaskID)
726+
tasks, nextPageToken, err := d.db.SelectTransferTasksOrderByTaskID(ctx, d.shardID, request.PageSize, request.NextPageToken, request.InclusiveMinTaskKey.GetTaskID(), request.ExclusiveMaxTaskKey.GetTaskID())
727727
if err != nil {
728728
return nil, convertCommonErrors(d.db, "GetImmediateHistoryTasks", err)
729729
}
@@ -749,7 +749,7 @@ func (d *nosqlExecutionStore) getImmediateHistoryTasks(
749749
NextPageToken: nextPageToken,
750750
}, nil
751751
case persistence.HistoryTaskCategoryIDReplication:
752-
tasks, nextPageToken, err := d.db.SelectReplicationTasksOrderByTaskID(ctx, d.shardID, request.PageSize, request.NextPageToken, request.InclusiveMinTaskKey.TaskID, request.ExclusiveMaxTaskKey.TaskID)
752+
tasks, nextPageToken, err := d.db.SelectReplicationTasksOrderByTaskID(ctx, d.shardID, request.PageSize, request.NextPageToken, request.InclusiveMinTaskKey.GetTaskID(), request.ExclusiveMaxTaskKey.GetTaskID())
753753
if err != nil {
754754
return nil, convertCommonErrors(d.db, "GetImmediateHistoryTasks", err)
755755
}
@@ -785,7 +785,7 @@ func (d *nosqlExecutionStore) getScheduledHistoryTasks(
785785
) (*persistence.GetHistoryTasksResponse, error) {
786786
switch request.TaskCategory.ID() {
787787
case persistence.HistoryTaskCategoryIDTimer:
788-
timers, nextPageToken, err := d.db.SelectTimerTasksOrderByVisibilityTime(ctx, d.shardID, request.PageSize, request.NextPageToken, request.InclusiveMinTaskKey.ScheduledTime, request.ExclusiveMaxTaskKey.ScheduledTime)
788+
timers, nextPageToken, err := d.db.SelectTimerTasksOrderByVisibilityTime(ctx, d.shardID, request.PageSize, request.NextPageToken, request.InclusiveMinTaskKey.GetScheduledTime(), request.ExclusiveMaxTaskKey.GetScheduledTime())
789789
if err != nil {
790790
return nil, convertCommonErrors(d.db, "GetScheduledHistoryTasks", err)
791791
}
@@ -836,7 +836,7 @@ func (d *nosqlExecutionStore) completeScheduledHistoryTask(
836836
) error {
837837
switch request.TaskCategory.ID() {
838838
case persistence.HistoryTaskCategoryIDTimer:
839-
err := d.db.DeleteTimerTask(ctx, d.shardID, request.TaskKey.TaskID, request.TaskKey.ScheduledTime)
839+
err := d.db.DeleteTimerTask(ctx, d.shardID, request.TaskKey.GetTaskID(), request.TaskKey.GetScheduledTime())
840840
if err != nil {
841841
return convertCommonErrors(d.db, "CompleteScheduledHistoryTask", err)
842842
}
@@ -852,13 +852,13 @@ func (d *nosqlExecutionStore) completeImmediateHistoryTask(
852852
) error {
853853
switch request.TaskCategory.ID() {
854854
case persistence.HistoryTaskCategoryIDTransfer:
855-
err := d.db.DeleteTransferTask(ctx, d.shardID, request.TaskKey.TaskID)
855+
err := d.db.DeleteTransferTask(ctx, d.shardID, request.TaskKey.GetTaskID())
856856
if err != nil {
857857
return convertCommonErrors(d.db, "CompleteImmediateHistoryTask", err)
858858
}
859859
return nil
860860
case persistence.HistoryTaskCategoryIDReplication:
861-
err := d.db.DeleteReplicationTask(ctx, d.shardID, request.TaskKey.TaskID)
861+
err := d.db.DeleteReplicationTask(ctx, d.shardID, request.TaskKey.GetTaskID())
862862
if err != nil {
863863
return convertCommonErrors(d.db, "CompleteImmediateHistoryTask", err)
864864
}
@@ -888,7 +888,7 @@ func (d *nosqlExecutionStore) rangeCompleteScheduledHistoryTask(
888888
) (*persistence.RangeCompleteHistoryTaskResponse, error) {
889889
switch request.TaskCategory.ID() {
890890
case persistence.HistoryTaskCategoryIDTimer:
891-
err := d.db.RangeDeleteTimerTasks(ctx, d.shardID, request.InclusiveMinTaskKey.ScheduledTime, request.ExclusiveMaxTaskKey.ScheduledTime)
891+
err := d.db.RangeDeleteTimerTasks(ctx, d.shardID, request.InclusiveMinTaskKey.GetScheduledTime(), request.ExclusiveMaxTaskKey.GetScheduledTime())
892892
if err != nil {
893893
return nil, convertCommonErrors(d.db, "RangeCompleteTimerTask", err)
894894
}
@@ -904,12 +904,12 @@ func (d *nosqlExecutionStore) rangeCompleteImmediateHistoryTask(
904904
) (*persistence.RangeCompleteHistoryTaskResponse, error) {
905905
switch request.TaskCategory.ID() {
906906
case persistence.HistoryTaskCategoryIDTransfer:
907-
err := d.db.RangeDeleteTransferTasks(ctx, d.shardID, request.InclusiveMinTaskKey.TaskID, request.ExclusiveMaxTaskKey.TaskID)
907+
err := d.db.RangeDeleteTransferTasks(ctx, d.shardID, request.InclusiveMinTaskKey.GetTaskID(), request.ExclusiveMaxTaskKey.GetTaskID())
908908
if err != nil {
909909
return nil, convertCommonErrors(d.db, "RangeCompleteTransferTask", err)
910910
}
911911
case persistence.HistoryTaskCategoryIDReplication:
912-
err := d.db.RangeDeleteReplicationTasks(ctx, d.shardID, request.ExclusiveMaxTaskKey.TaskID)
912+
err := d.db.RangeDeleteReplicationTasks(ctx, d.shardID, request.ExclusiveMaxTaskKey.GetTaskID())
913913
if err != nil {
914914
return nil, convertCommonErrors(d.db, "RangeCompleteReplicationTask", err)
915915
}

common/persistence/nosql/nosql_execution_store_test.go

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1294,8 +1294,8 @@ func TestRangeCompleteHistoryTask(t *testing.T) {
12941294
name: "success - scheduled timer task",
12951295
request: &persistence.RangeCompleteHistoryTaskRequest{
12961296
TaskCategory: persistence.HistoryTaskCategoryTimer,
1297-
InclusiveMinTaskKey: persistence.HistoryTaskKey{ScheduledTime: time.Unix(0, 0)},
1298-
ExclusiveMaxTaskKey: persistence.HistoryTaskKey{ScheduledTime: time.Unix(0, 0).Add(time.Minute)},
1297+
InclusiveMinTaskKey: persistence.NewHistoryTaskKey(time.Unix(0, 0), 0),
1298+
ExclusiveMaxTaskKey: persistence.NewHistoryTaskKey(time.Unix(0, 0).Add(time.Minute), 0),
12991299
},
13001300
setupMock: func(mockDB *nosqlplugin.MockDB) {
13011301
mockDB.EXPECT().RangeDeleteTimerTasks(ctx, shardID, time.Unix(0, 0), time.Unix(0, 0).Add(time.Minute)).Return(nil)
@@ -1306,8 +1306,8 @@ func TestRangeCompleteHistoryTask(t *testing.T) {
13061306
name: "success - immediate transfer task",
13071307
request: &persistence.RangeCompleteHistoryTaskRequest{
13081308
TaskCategory: persistence.HistoryTaskCategoryTransfer,
1309-
InclusiveMinTaskKey: persistence.HistoryTaskKey{TaskID: 100},
1310-
ExclusiveMaxTaskKey: persistence.HistoryTaskKey{TaskID: 200},
1309+
InclusiveMinTaskKey: persistence.NewImmediateTaskKey(100),
1310+
ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(200),
13111311
},
13121312
setupMock: func(mockDB *nosqlplugin.MockDB) {
13131313
mockDB.EXPECT().RangeDeleteTransferTasks(ctx, shardID, int64(100), int64(200)).Return(nil)
@@ -1318,8 +1318,8 @@ func TestRangeCompleteHistoryTask(t *testing.T) {
13181318
name: "success - immediate replication task",
13191319
request: &persistence.RangeCompleteHistoryTaskRequest{
13201320
TaskCategory: persistence.HistoryTaskCategoryReplication,
1321-
InclusiveMinTaskKey: persistence.HistoryTaskKey{TaskID: 100}, // this is ignored by replication task
1322-
ExclusiveMaxTaskKey: persistence.HistoryTaskKey{TaskID: 200},
1321+
InclusiveMinTaskKey: persistence.NewImmediateTaskKey(100), // this is ignored by replication task
1322+
ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(200),
13231323
},
13241324
setupMock: func(mockDB *nosqlplugin.MockDB) {
13251325
mockDB.EXPECT().RangeDeleteReplicationTasks(ctx, shardID, int64(200)).Return(nil)
@@ -1338,8 +1338,8 @@ func TestRangeCompleteHistoryTask(t *testing.T) {
13381338
name: "database error on timer task",
13391339
request: &persistence.RangeCompleteHistoryTaskRequest{
13401340
TaskCategory: persistence.HistoryTaskCategoryTimer,
1341-
InclusiveMinTaskKey: persistence.HistoryTaskKey{ScheduledTime: time.Unix(0, 0)},
1342-
ExclusiveMaxTaskKey: persistence.HistoryTaskKey{ScheduledTime: time.Unix(0, 0).Add(time.Minute)},
1341+
InclusiveMinTaskKey: persistence.NewHistoryTaskKey(time.Unix(0, 0), 0),
1342+
ExclusiveMaxTaskKey: persistence.NewHistoryTaskKey(time.Unix(0, 0).Add(time.Minute), 0),
13431343
},
13441344
setupMock: func(mockDB *nosqlplugin.MockDB) {
13451345
mockDB.EXPECT().RangeDeleteTimerTasks(ctx, shardID, time.Unix(0, 0), time.Unix(0, 0).Add(time.Minute)).Return(errors.New("db error"))
@@ -1351,8 +1351,8 @@ func TestRangeCompleteHistoryTask(t *testing.T) {
13511351
name: "database error on transfer task",
13521352
request: &persistence.RangeCompleteHistoryTaskRequest{
13531353
TaskCategory: persistence.HistoryTaskCategoryTransfer,
1354-
InclusiveMinTaskKey: persistence.HistoryTaskKey{TaskID: 100},
1355-
ExclusiveMaxTaskKey: persistence.HistoryTaskKey{TaskID: 200},
1354+
InclusiveMinTaskKey: persistence.NewImmediateTaskKey(100),
1355+
ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(200),
13561356
},
13571357
setupMock: func(mockDB *nosqlplugin.MockDB) {
13581358
mockDB.EXPECT().RangeDeleteTransferTasks(ctx, shardID, int64(100), int64(200)).Return(errors.New("db error"))
@@ -1364,8 +1364,8 @@ func TestRangeCompleteHistoryTask(t *testing.T) {
13641364
name: "database error on replication task",
13651365
request: &persistence.RangeCompleteHistoryTaskRequest{
13661366
TaskCategory: persistence.HistoryTaskCategoryReplication,
1367-
InclusiveMinTaskKey: persistence.HistoryTaskKey{TaskID: 100},
1368-
ExclusiveMaxTaskKey: persistence.HistoryTaskKey{TaskID: 200},
1367+
InclusiveMinTaskKey: persistence.NewImmediateTaskKey(100),
1368+
ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(200),
13691369
},
13701370
setupMock: func(mockDB *nosqlplugin.MockDB) {
13711371
mockDB.EXPECT().RangeDeleteReplicationTasks(ctx, shardID, int64(200)).Return(errors.New("db error"))
@@ -1411,8 +1411,8 @@ func TestGetHistoryTasks(t *testing.T) {
14111411
name: "success - get immediate transfer tasks",
14121412
request: &persistence.GetHistoryTasksRequest{
14131413
TaskCategory: persistence.HistoryTaskCategoryTransfer,
1414-
InclusiveMinTaskKey: persistence.HistoryTaskKey{TaskID: 100},
1415-
ExclusiveMaxTaskKey: persistence.HistoryTaskKey{TaskID: 200},
1414+
InclusiveMinTaskKey: persistence.NewImmediateTaskKey(100),
1415+
ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(200),
14161416
PageSize: 10,
14171417
NextPageToken: []byte("next-page-token"),
14181418
},
@@ -1456,8 +1456,8 @@ func TestGetHistoryTasks(t *testing.T) {
14561456
name: "success - get immediate transfer tasks from data blob",
14571457
request: &persistence.GetHistoryTasksRequest{
14581458
TaskCategory: persistence.HistoryTaskCategoryTransfer,
1459-
InclusiveMinTaskKey: persistence.HistoryTaskKey{TaskID: 100},
1460-
ExclusiveMaxTaskKey: persistence.HistoryTaskKey{TaskID: 200},
1459+
InclusiveMinTaskKey: persistence.NewImmediateTaskKey(100),
1460+
ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(200),
14611461
PageSize: 10,
14621462
NextPageToken: []byte("next-page-token"),
14631463
},
@@ -1506,8 +1506,8 @@ func TestGetHistoryTasks(t *testing.T) {
15061506
name: "success - get scheduled timer tasks",
15071507
request: &persistence.GetHistoryTasksRequest{
15081508
TaskCategory: persistence.HistoryTaskCategoryTimer,
1509-
InclusiveMinTaskKey: persistence.HistoryTaskKey{ScheduledTime: time.Unix(0, 0)},
1510-
ExclusiveMaxTaskKey: persistence.HistoryTaskKey{ScheduledTime: time.Unix(0, 0).Add(time.Minute)},
1509+
InclusiveMinTaskKey: persistence.NewHistoryTaskKey(time.Unix(0, 0), 0),
1510+
ExclusiveMaxTaskKey: persistence.NewHistoryTaskKey(time.Unix(0, 0).Add(time.Minute), 0),
15111511
PageSize: 10,
15121512
NextPageToken: []byte("next-page-token"),
15131513
},
@@ -1549,8 +1549,8 @@ func TestGetHistoryTasks(t *testing.T) {
15491549
name: "success - get scheduled timer tasks from data blob",
15501550
request: &persistence.GetHistoryTasksRequest{
15511551
TaskCategory: persistence.HistoryTaskCategoryTimer,
1552-
InclusiveMinTaskKey: persistence.HistoryTaskKey{ScheduledTime: time.Unix(0, 0)},
1553-
ExclusiveMaxTaskKey: persistence.HistoryTaskKey{ScheduledTime: time.Unix(0, 0).Add(time.Minute)},
1552+
InclusiveMinTaskKey: persistence.NewHistoryTaskKey(time.Unix(0, 0), 0),
1553+
ExclusiveMaxTaskKey: persistence.NewHistoryTaskKey(time.Unix(0, 0).Add(time.Minute), 0),
15541554
PageSize: 10,
15551555
NextPageToken: []byte("next-page-token"),
15561556
},
@@ -1596,8 +1596,8 @@ func TestGetHistoryTasks(t *testing.T) {
15961596
name: "success - get immediate replication tasks",
15971597
request: &persistence.GetHistoryTasksRequest{
15981598
TaskCategory: persistence.HistoryTaskCategoryReplication,
1599-
InclusiveMinTaskKey: persistence.HistoryTaskKey{TaskID: 100},
1600-
ExclusiveMaxTaskKey: persistence.HistoryTaskKey{TaskID: 200},
1599+
InclusiveMinTaskKey: persistence.NewImmediateTaskKey(100),
1600+
ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(200),
16011601
PageSize: 10,
16021602
NextPageToken: []byte("next-page-token"),
16031603
},
@@ -1646,8 +1646,8 @@ func TestGetHistoryTasks(t *testing.T) {
16461646
name: "success - get immediate replication tasks from data blob",
16471647
request: &persistence.GetHistoryTasksRequest{
16481648
TaskCategory: persistence.HistoryTaskCategoryReplication,
1649-
InclusiveMinTaskKey: persistence.HistoryTaskKey{TaskID: 100},
1650-
ExclusiveMaxTaskKey: persistence.HistoryTaskKey{TaskID: 200},
1649+
InclusiveMinTaskKey: persistence.NewImmediateTaskKey(100),
1650+
ExclusiveMaxTaskKey: persistence.NewImmediateTaskKey(200),
16511651
PageSize: 10,
16521652
NextPageToken: []byte("next-page-token"),
16531653
},
@@ -1779,7 +1779,7 @@ func TestCompleteHistoryTask(t *testing.T) {
17791779
name: "success - complete scheduled timer task",
17801780
request: &persistence.CompleteHistoryTaskRequest{
17811781
TaskCategory: persistence.HistoryTaskCategoryTimer,
1782-
TaskKey: persistence.HistoryTaskKey{TaskID: 1, ScheduledTime: time.Unix(10, 10)},
1782+
TaskKey: persistence.NewHistoryTaskKey(time.Unix(10, 10), 1),
17831783
},
17841784
setupMock: func(mockDB *nosqlplugin.MockDB) {
17851785
mockDB.EXPECT().DeleteTimerTask(ctx, shardID, int64(1), time.Unix(10, 10)).Return(nil)
@@ -1790,7 +1790,7 @@ func TestCompleteHistoryTask(t *testing.T) {
17901790
name: "success - complete immediate transfer task",
17911791
request: &persistence.CompleteHistoryTaskRequest{
17921792
TaskCategory: persistence.HistoryTaskCategoryTransfer,
1793-
TaskKey: persistence.HistoryTaskKey{TaskID: 2},
1793+
TaskKey: persistence.NewImmediateTaskKey(2),
17941794
},
17951795
setupMock: func(mockDB *nosqlplugin.MockDB) {
17961796
mockDB.EXPECT().DeleteTransferTask(ctx, shardID, int64(2)).Return(nil)
@@ -1801,7 +1801,7 @@ func TestCompleteHistoryTask(t *testing.T) {
18011801
name: "success - complete immediate replication task",
18021802
request: &persistence.CompleteHistoryTaskRequest{
18031803
TaskCategory: persistence.HistoryTaskCategoryReplication,
1804-
TaskKey: persistence.HistoryTaskKey{TaskID: 3},
1804+
TaskKey: persistence.NewImmediateTaskKey(3),
18051805
},
18061806
setupMock: func(mockDB *nosqlplugin.MockDB) {
18071807
mockDB.EXPECT().DeleteReplicationTask(ctx, shardID, int64(3)).Return(nil)
@@ -1820,7 +1820,7 @@ func TestCompleteHistoryTask(t *testing.T) {
18201820
name: "delete timer task error",
18211821
request: &persistence.CompleteHistoryTaskRequest{
18221822
TaskCategory: persistence.HistoryTaskCategoryTimer,
1823-
TaskKey: persistence.HistoryTaskKey{TaskID: 1, ScheduledTime: time.Unix(10, 10)},
1823+
TaskKey: persistence.NewHistoryTaskKey(time.Unix(10, 10), 1),
18241824
},
18251825
setupMock: func(mockDB *nosqlplugin.MockDB) {
18261826
mockDB.EXPECT().DeleteTimerTask(ctx, shardID, int64(1), time.Unix(10, 10)).Return(errors.New("db error"))
@@ -1832,7 +1832,7 @@ func TestCompleteHistoryTask(t *testing.T) {
18321832
name: "delete transfer task error",
18331833
request: &persistence.CompleteHistoryTaskRequest{
18341834
TaskCategory: persistence.HistoryTaskCategoryTransfer,
1835-
TaskKey: persistence.HistoryTaskKey{TaskID: 2},
1835+
TaskKey: persistence.NewImmediateTaskKey(2),
18361836
},
18371837
setupMock: func(mockDB *nosqlplugin.MockDB) {
18381838
mockDB.EXPECT().DeleteTransferTask(ctx, shardID, int64(2)).Return(errors.New("db error"))
@@ -1844,7 +1844,7 @@ func TestCompleteHistoryTask(t *testing.T) {
18441844
name: "delete replication task error",
18451845
request: &persistence.CompleteHistoryTaskRequest{
18461846
TaskCategory: persistence.HistoryTaskCategoryReplication,
1847-
TaskKey: persistence.HistoryTaskKey{TaskID: 3},
1847+
TaskKey: persistence.NewImmediateTaskKey(3),
18481848
},
18491849
setupMock: func(mockDB *nosqlplugin.MockDB) {
18501850
mockDB.EXPECT().DeleteReplicationTask(ctx, shardID, int64(3)).Return(errors.New("db error"))

0 commit comments

Comments
 (0)