Skip to content

Commit e87850b

Browse files
committed
Add task list partition config to schema
1 parent a32159c commit e87850b

33 files changed

+1174
-85
lines changed

.gen/go/sqlblobs/sqlblobs.go

Lines changed: 418 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/log/tag/values.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ var (
225225
StoreOperationCompleteTask = storeOperation("complete-task")
226226
StoreOperationCompleteTasksLessThan = storeOperation("complete-tasks-less-than")
227227
StoreOperationLeaseTaskList = storeOperation("lease-task-list")
228+
StoreOperationGetTaskList = storeOperation("get-task-list")
228229
StoreOperationUpdateTaskList = storeOperation("update-task-list")
229230
StoreOperationListTaskList = storeOperation("list-task-list")
230231
StoreOperationDeleteTaskList = storeOperation("delete-task-list")

common/metrics/defs.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,8 @@ const (
213213
PersistenceGetOrphanTasksScope
214214
// PersistenceLeaseTaskListScope tracks LeaseTaskList calls made by service to persistence layer
215215
PersistenceLeaseTaskListScope
216+
// PersistenceGetTaskListScope tracks GetTaskList calls made by service to persistence layer
217+
PersistenceGetTaskListScope
216218
// PersistenceUpdateTaskListScope tracks PersistenceUpdateTaskListScope calls made by service to persistence layer
217219
PersistenceUpdateTaskListScope
218220
// PersistenceListTaskListScope is the metric scope for persistence.TaskManager.ListTaskList API
@@ -1418,6 +1420,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
14181420
PersistenceCompleteTasksLessThanScope: {operation: "CompleteTasksLessThan"},
14191421
PersistenceGetOrphanTasksScope: {operation: "GetOrphanTasks"},
14201422
PersistenceLeaseTaskListScope: {operation: "LeaseTaskList"},
1423+
PersistenceGetTaskListScope: {operation: "GetTaskList"},
14211424
PersistenceUpdateTaskListScope: {operation: "UpdateTaskList"},
14221425
PersistenceListTaskListScope: {operation: "ListTaskList"},
14231426
PersistenceDeleteTaskListScope: {operation: "DeleteTaskList"},

common/mocks/TaskManager.go

Lines changed: 28 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/persistence/dataManagerInterfaces_mock.go

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/persistence/data_manager_interfaces.go

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -448,14 +448,22 @@ type (
448448

449449
// TaskListInfo describes a state of a task list implementation.
450450
TaskListInfo struct {
451-
DomainID string
452-
Name string
453-
TaskType int
454-
RangeID int64
455-
AckLevel int64
456-
Kind int
457-
Expiry time.Time
458-
LastUpdated time.Time
451+
DomainID string
452+
Name string
453+
TaskType int
454+
RangeID int64
455+
AckLevel int64
456+
Kind int
457+
Expiry time.Time
458+
LastUpdated time.Time
459+
AdaptivePartitionConfig *TaskListPartitionConfig
460+
}
461+
462+
// TaskListPartitionConfig represents the configuration for task list partitions.
463+
TaskListPartitionConfig struct {
464+
Version int64
465+
NumReadPartitions int
466+
NumWritePartitions int
459467
}
460468

461469
// TaskInfo describes either activity or decision task
@@ -1018,6 +1026,17 @@ type (
10181026
TaskListInfo *TaskListInfo
10191027
}
10201028

1029+
GetTaskListRequest struct {
1030+
DomainID string
1031+
DomainName string
1032+
TaskList string
1033+
TaskType int
1034+
}
1035+
1036+
GetTaskListResponse struct {
1037+
TaskListInfo *TaskListInfo
1038+
}
1039+
10211040
// UpdateTaskListRequest is used to update task list implementation information
10221041
UpdateTaskListRequest struct {
10231042
TaskListInfo *TaskListInfo
@@ -1577,6 +1596,7 @@ type (
15771596
GetName() string
15781597
LeaseTaskList(ctx context.Context, request *LeaseTaskListRequest) (*LeaseTaskListResponse, error)
15791598
UpdateTaskList(ctx context.Context, request *UpdateTaskListRequest) (*UpdateTaskListResponse, error)
1599+
GetTaskList(ctx context.Context, request *GetTaskListRequest) (*GetTaskListResponse, error)
15801600
ListTaskList(ctx context.Context, request *ListTaskListRequest) (*ListTaskListResponse, error)
15811601
DeleteTaskList(ctx context.Context, request *DeleteTaskListRequest) error
15821602
GetTaskListSize(ctx context.Context, request *GetTaskListSizeRequest) (*GetTaskListSizeResponse, error)

common/persistence/data_store_interfaces.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ type (
5757
Closeable
5858
GetName() string
5959
LeaseTaskList(ctx context.Context, request *LeaseTaskListRequest) (*LeaseTaskListResponse, error)
60+
GetTaskList(ctx context.Context, request *GetTaskListRequest) (*GetTaskListResponse, error)
6061
UpdateTaskList(ctx context.Context, request *UpdateTaskListRequest) (*UpdateTaskListResponse, error)
6162
ListTaskList(ctx context.Context, request *ListTaskListRequest) (*ListTaskListResponse, error)
6263
DeleteTaskList(ctx context.Context, request *DeleteTaskListRequest) error

common/persistence/nosql/nosql_task_store.go

Lines changed: 53 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -141,13 +141,14 @@ func (t *nosqlTaskStore) LeaseTaskList(
141141
currTL.RangeID++
142142

143143
err = storeShard.db.UpdateTaskList(ctx, &nosqlplugin.TaskListRow{
144-
DomainID: request.DomainID,
145-
TaskListName: request.TaskList,
146-
TaskListType: request.TaskType,
147-
RangeID: currTL.RangeID,
148-
TaskListKind: currTL.TaskListKind,
149-
AckLevel: currTL.AckLevel,
150-
LastUpdatedTime: now,
144+
DomainID: request.DomainID,
145+
TaskListName: request.TaskList,
146+
TaskListType: request.TaskType,
147+
RangeID: currTL.RangeID,
148+
TaskListKind: currTL.TaskListKind,
149+
AckLevel: currTL.AckLevel,
150+
LastUpdatedTime: now,
151+
AdaptivePartitionConfig: currTL.AdaptivePartitionConfig,
151152
}, currTL.RangeID-1)
152153
}
153154
if err != nil {
@@ -161,31 +162,62 @@ func (t *nosqlTaskStore) LeaseTaskList(
161162
return nil, convertCommonErrors(storeShard.db, "LeaseTaskList", err)
162163
}
163164
tli := &persistence.TaskListInfo{
164-
DomainID: request.DomainID,
165-
Name: request.TaskList,
166-
TaskType: request.TaskType,
167-
RangeID: currTL.RangeID,
168-
AckLevel: currTL.AckLevel,
169-
Kind: request.TaskListKind,
170-
LastUpdated: now,
165+
DomainID: request.DomainID,
166+
Name: request.TaskList,
167+
TaskType: request.TaskType,
168+
RangeID: currTL.RangeID,
169+
AckLevel: currTL.AckLevel,
170+
Kind: request.TaskListKind,
171+
LastUpdated: now,
172+
AdaptivePartitionConfig: currTL.AdaptivePartitionConfig,
171173
}
172174
return &persistence.LeaseTaskListResponse{TaskListInfo: tli}, nil
173175
}
174176

177+
func (t *nosqlTaskStore) GetTaskList(
178+
ctx context.Context,
179+
request *persistence.GetTaskListRequest,
180+
) (*persistence.GetTaskListResponse, error) {
181+
storeShard, err := t.GetStoreShardByTaskList(request.DomainID, request.TaskList, request.TaskType)
182+
if err != nil {
183+
return nil, err
184+
}
185+
currTL, err := storeShard.db.SelectTaskList(ctx, &nosqlplugin.TaskListFilter{
186+
DomainID: request.DomainID,
187+
TaskListName: request.TaskList,
188+
TaskListType: request.TaskType,
189+
})
190+
if err != nil {
191+
return nil, convertCommonErrors(storeShard.db, "GetTaskList", err)
192+
}
193+
tli := &persistence.TaskListInfo{
194+
DomainID: request.DomainID,
195+
Name: request.TaskList,
196+
TaskType: request.TaskType,
197+
RangeID: currTL.RangeID,
198+
AckLevel: currTL.AckLevel,
199+
Kind: currTL.TaskListKind,
200+
LastUpdated: currTL.LastUpdatedTime,
201+
AdaptivePartitionConfig: currTL.AdaptivePartitionConfig,
202+
}
203+
return &persistence.GetTaskListResponse{TaskListInfo: tli}, nil
204+
}
205+
175206
func (t *nosqlTaskStore) UpdateTaskList(
176207
ctx context.Context,
177208
request *persistence.UpdateTaskListRequest,
178209
) (*persistence.UpdateTaskListResponse, error) {
179210
tli := request.TaskListInfo
180211
var err error
181212
taskListToUpdate := &nosqlplugin.TaskListRow{
182-
DomainID: tli.DomainID,
183-
TaskListName: tli.Name,
184-
TaskListType: tli.TaskType,
185-
RangeID: tli.RangeID,
186-
TaskListKind: tli.Kind,
187-
AckLevel: tli.AckLevel,
188-
LastUpdatedTime: time.Now(),
213+
DomainID: tli.DomainID,
214+
TaskListName: tli.Name,
215+
TaskListType: tli.TaskType,
216+
RangeID: tli.RangeID,
217+
TaskListKind: tli.Kind,
218+
AckLevel: tli.AckLevel,
219+
LastUpdatedTime: time.Now(),
220+
AdaptivePartitionConfig: tli.AdaptivePartitionConfig,
189221
}
190222
storeShard, err := t.GetStoreShardByTaskList(tli.DomainID, tli.Name, tli.TaskType)
191223
if err != nil {

common/persistence/nosql/nosql_task_store_test.go

Lines changed: 59 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ package nosql
2424

2525
import (
2626
"context"
27+
"errors"
2728
"testing"
2829
"time"
2930

@@ -154,7 +155,8 @@ func TestLeaseTaskList_selectErrNotFound(t *testing.T) {
154155
// We then expect the tasklist to be inserted
155156
db.EXPECT().InsertTaskList(gomock.Any(), gomock.Any()).
156157
DoAndReturn(func(ctx context.Context, taskList *nosqlplugin.TaskListRow) error {
157-
checkTaskListRowExpected(t, getExpectedTaskListRow(), taskList)
158+
tl := getExpectedTaskListRow()
159+
checkTaskListRowExpected(t, tl, taskList)
158160
return nil
159161
})
160162

@@ -242,12 +244,34 @@ func TestLeaseTaskList_RenewUpdateFailed_OtherError(t *testing.T) {
242244
assert.ErrorContains(t, err, assert.AnError.Error())
243245
}
244246

247+
func TestGetTaskList_Success(t *testing.T) {
248+
store, db := setupNoSQLStoreMocks(t)
249+
250+
taskListRow := getExpectedTaskListRow()
251+
db.EXPECT().SelectTaskList(gomock.Any(), getDecisionTaskListFilter()).Return(taskListRow, nil)
252+
resp, err := store.GetTaskList(context.Background(), getValidGetTaskListRequest())
253+
254+
assert.NoError(t, err)
255+
checkTaskListInfoExpected(t, resp.TaskListInfo)
256+
}
257+
258+
func TestGetTaskList_NotFound(t *testing.T) {
259+
store, db := setupNoSQLStoreMocks(t)
260+
261+
db.EXPECT().SelectTaskList(gomock.Any(), getDecisionTaskListFilter()).Return(nil, errors.New("not found"))
262+
db.EXPECT().IsNotFoundError(gomock.Any()).Return(true)
263+
resp, err := store.GetTaskList(context.Background(), getValidGetTaskListRequest())
264+
265+
assert.ErrorAs(t, err, new(*types.EntityNotExistsError))
266+
assert.Nil(t, resp)
267+
}
268+
245269
func TestUpdateTaskList(t *testing.T) {
246270
store, db := setupNoSQLStoreMocks(t)
247271

248272
db.EXPECT().UpdateTaskList(gomock.Any(), gomock.Any(), int64(1)).DoAndReturn(
249273
func(ctx context.Context, taskList *nosqlplugin.TaskListRow, previousRangeID int64) error {
250-
checkTaskListRowExpected(t, getExpectedTaskListRow(), taskList)
274+
checkTaskListRowExpected(t, getExpectedTaskListRowWithPartitionConfig(), taskList)
251275
return nil
252276
},
253277
)
@@ -265,7 +289,7 @@ func TestUpdateTaskList_Sticky(t *testing.T) {
265289

266290
db.EXPECT().UpdateTaskListWithTTL(gomock.Any(), stickyTaskListTTL, gomock.Any(), int64(1)).DoAndReturn(
267291
func(ctx context.Context, ttlSeconds int64, taskList *nosqlplugin.TaskListRow, previousRangeID int64) error {
268-
expectedTaskList := getExpectedTaskListRow()
292+
expectedTaskList := getExpectedTaskListRowWithPartitionConfig()
269293
expectedTaskList.TaskListKind = int(types.TaskListKindSticky)
270294
checkTaskListRowExpected(t, expectedTaskList, taskList)
271295
return nil
@@ -288,7 +312,7 @@ func TestUpdateTaskList_ConditionFailure(t *testing.T) {
288312

289313
db.EXPECT().UpdateTaskList(gomock.Any(), gomock.Any(), int64(1)).DoAndReturn(
290314
func(ctx context.Context, taskList *nosqlplugin.TaskListRow, previousRangeID int64) error {
291-
checkTaskListRowExpected(t, getExpectedTaskListRow(), taskList)
315+
checkTaskListRowExpected(t, getExpectedTaskListRowWithPartitionConfig(), taskList)
292316
return &nosqlplugin.TaskOperationConditionFailure{Details: "test-details"}
293317
},
294318
)
@@ -431,6 +455,15 @@ func getValidLeaseTaskListRequest() *persistence.LeaseTaskListRequest {
431455
}
432456
}
433457

458+
func getValidGetTaskListRequest() *persistence.GetTaskListRequest {
459+
return &persistence.GetTaskListRequest{
460+
DomainID: TestDomainID,
461+
DomainName: TestDomainName,
462+
TaskList: TestTaskListName,
463+
TaskType: int(types.TaskListTypeDecision),
464+
}
465+
}
466+
434467
func checkTaskListInfoExpected(t *testing.T, taskListInfo *persistence.TaskListInfo) {
435468
assert.Equal(t, TestDomainID, taskListInfo.DomainID)
436469
assert.Equal(t, TestTaskListName, taskListInfo.Name)
@@ -484,6 +517,23 @@ func getExpectedTaskListRow() *nosqlplugin.TaskListRow {
484517
}
485518
}
486519

520+
func getExpectedTaskListRowWithPartitionConfig() *nosqlplugin.TaskListRow {
521+
return &nosqlplugin.TaskListRow{
522+
DomainID: TestDomainID,
523+
TaskListName: TestTaskListName,
524+
TaskListType: int(types.TaskListTypeDecision),
525+
RangeID: initialRangeID,
526+
TaskListKind: int(types.TaskListKindNormal),
527+
AckLevel: initialAckLevel,
528+
LastUpdatedTime: time.Now(),
529+
AdaptivePartitionConfig: &persistence.TaskListPartitionConfig{
530+
Version: 1,
531+
NumReadPartitions: 2,
532+
NumWritePartitions: 2,
533+
},
534+
}
535+
}
536+
487537
func checkTaskListRowExpected(t *testing.T, expectedRow *nosqlplugin.TaskListRow, taskList *nosqlplugin.TaskListRow) {
488538
// Check the duration
489539
assert.WithinDuration(t, expectedRow.LastUpdatedTime, taskList.LastUpdatedTime, time.Second)
@@ -502,6 +552,11 @@ func getExpectedTaskListInfo() *persistence.TaskListInfo {
502552
AckLevel: initialAckLevel,
503553
Kind: int(types.TaskListKindNormal),
504554
LastUpdated: time.Now(),
555+
AdaptivePartitionConfig: &persistence.TaskListPartitionConfig{
556+
Version: 1,
557+
NumReadPartitions: 2,
558+
NumWritePartitions: 2,
559+
},
505560
}
506561
}
507562

0 commit comments

Comments
 (0)