Skip to content

Commit 0ba1eba

Browse files
authored
Use Cassandra's ttl to populate Expiry field in persistence.TaskInfo (#6624)
* Use Cassandra's ttl to populate Expiry field in persistence.TaskInfo
1 parent bf9f526 commit 0ba1eba

File tree

5 files changed

+39
-9
lines changed

5 files changed

+39
-9
lines changed

common/persistence/nosql/nosqlplugin/cassandra/tasks.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,7 @@ func (db *cdb) GetTasksCount(ctx context.Context, filter *nosqlplugin.TasksFilte
351351

352352
// SelectTasks return tasks that associated to a tasklist
353353
func (db *cdb) SelectTasks(ctx context.Context, filter *nosqlplugin.TasksFilter) ([]*nosqlplugin.TaskRow, error) {
354-
// Reading tasklist tasks need to be quorum level consistent, otherwise we could loose task
354+
// Reading tasklist tasks need to be quorum level consistent, otherwise we could lose tasks
355355
query := db.session.Query(templateGetTasksQuery,
356356
filter.DomainID,
357357
filter.TaskListName,
@@ -374,8 +374,25 @@ PopulateTasks:
374374
if !ok { // no tasks, but static column record returned
375375
continue
376376
}
377+
378+
// Extract the TTL value
379+
ttlValue, ttlExists := task["ttl"]
380+
381+
// Check if TTL is null or an integer
382+
var ttl *int
383+
if ttlExists && ttlValue != nil {
384+
if ttlInt, ok := ttlValue.(int); ok {
385+
ttl = &ttlInt // TTL is an integer
386+
}
387+
}
388+
377389
t := createTaskInfo(task["task"].(map[string]interface{}))
378390
t.TaskID = taskID.(int64)
391+
392+
if ttl != nil {
393+
t.Expiry = db.timeSrc.Now().Add(time.Duration(*ttl) * time.Second)
394+
}
395+
379396
response = append(response, t)
380397
if len(response) == filter.BatchSize {
381398
break PopulateTasks

common/persistence/nosql/nosqlplugin/cassandra/tasks_cql.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ const (
4949
`domain_id, task_list_name, task_list_type, type, task_id, task) ` +
5050
`VALUES(?, ?, ?, ?, ?, ` + templateTaskType + `) USING TTL ?`
5151

52-
templateGetTasksQuery = `SELECT task_id, task ` +
52+
templateGetTasksQuery = `SELECT task_id, task, TTL(task) AS ttl ` +
5353
`FROM tasks ` +
5454
`WHERE domain_id = ? ` +
5555
`and task_list_name = ? ` +

common/persistence/nosql/nosqlplugin/cassandra/tasks_test.go

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -922,23 +922,25 @@ func TestSelectTasks(t *testing.T) {
922922
"task_id": int64(1),
923923
"task": map[string]interface{}{
924924
"domain_id": &fakeUUID{uuid: "domain1"},
925-
"wofklow_id": "wid1",
925+
"workflow_id": "wid1",
926926
"schedule_id": int64(42),
927927
"created_time": ts,
928928
"run_id": &fakeUUID{uuid: "runid1"},
929929
"partition_config": map[string]string{},
930930
},
931+
"ttl": 2,
931932
},
932933
{
933934
"task_id": int64(2),
934935
"task": map[string]interface{}{
935936
"domain_id": &fakeUUID{uuid: "domain1"},
936-
"worklow_id": "wid1",
937+
"workflow_id": "wid1",
937938
"schedule_id": int64(45),
938939
"created_time": ts,
939940
"run_id": &fakeUUID{uuid: "runid1"},
940941
"partition_config": map[string]string{},
941942
},
943+
"ttl": nil,
942944
},
943945
{
944946
"missing_task_id": int64(1), // missing task_id so this row will be skipped
@@ -947,54 +949,62 @@ func TestSelectTasks(t *testing.T) {
947949
"task_id": int64(3),
948950
"task": map[string]interface{}{
949951
"domain_id": &fakeUUID{uuid: "domain1"},
950-
"worklow_id": "wid1",
952+
"workflow_id": "wid1",
951953
"schedule_id": int64(48),
952954
"created_time": ts,
953955
"run_id": &fakeUUID{uuid: "runid1"},
954956
"partition_config": map[string]string{},
955957
},
958+
"ttl": 4,
956959
},
957960
{
958961
"task_id": int64(4), // this will be skipped because filter.BatchSize is reached
959962
"task": map[string]interface{}{
960963
"domain_id": &fakeUUID{uuid: "domain1"},
961-
"worklow_id": "wid1",
964+
"workflow_id": "wid1",
962965
"schedule_id": int64(51),
963966
"created_time": ts,
964967
"run_id": &fakeUUID{uuid: "runid1"},
965968
"partition_config": map[string]string{},
966969
},
970+
"ttl": 5,
967971
},
968972
},
969973
},
970974
wantTasks: []*nosqlplugin.TaskRow{
971975
{
972976
DomainID: "domain1",
973977
TaskID: 1,
978+
WorkflowID: "wid1",
974979
RunID: "runid1",
975980
ScheduledID: 42,
981+
Expiry: ts.Add(time.Second * 2),
976982
CreatedTime: ts,
977983
PartitionConfig: map[string]string{},
978984
},
979985
{
980986
DomainID: "domain1",
981987
TaskID: 2,
988+
WorkflowID: "wid1",
982989
RunID: "runid1",
983990
ScheduledID: 45,
991+
Expiry: time.Time{},
984992
CreatedTime: ts,
985993
PartitionConfig: map[string]string{},
986994
},
987995
{
988996
DomainID: "domain1",
989997
TaskID: 3,
998+
WorkflowID: "wid1",
990999
RunID: "runid1",
9911000
ScheduledID: 48,
1001+
Expiry: ts.Add(time.Second * 4),
9921002
CreatedTime: ts,
9931003
PartitionConfig: map[string]string{},
9941004
},
9951005
},
9961006
wantQueries: []string{
997-
`SELECT task_id, task FROM tasks WHERE domain_id = domain1 and task_list_name = tasklist1 and task_list_type = 1 and type = 0 and task_id > 0 and task_id <= 100`,
1007+
`SELECT task_id, task, TTL(task) AS ttl FROM tasks WHERE domain_id = domain1 and task_list_name = tasklist1 and task_list_type = 1 and type = 0 and task_id > 0 and task_id <= 100`,
9981008
},
9991009
},
10001010
}
@@ -1017,7 +1027,9 @@ func TestSelectTasks(t *testing.T) {
10171027
client := gocql.NewMockClient(ctrl)
10181028
cfg := &config.NoSQL{}
10191029
logger := testlogger.New(t)
1020-
db := newCassandraDBFromSession(cfg, session, logger, nil, dbWithClient(client))
1030+
timeSrc := clock.NewMockedTimeSourceAt(ts)
1031+
1032+
db := newCassandraDBFromSession(cfg, session, logger, nil, dbWithClient(client), dbWithTimeSource(timeSrc))
10211033

10221034
gotRows, err := db.SelectTasks(context.Background(), tc.filter)
10231035

common/persistence/nosql/nosqlplugin/types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ type (
168168
WorkflowID string
169169
RunID string
170170
ScheduledID int64
171+
Expiry time.Time
171172
CreatedTime time.Time
172173
PartitionConfig map[string]string
173174
}

service/matching/tasklist/task_reader.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ func (tr *taskReader) getTaskBatch(readLevel, maxReadLevel int64) ([]*persistenc
299299
}
300300

301301
func (tr *taskReader) isTaskExpired(t *persistence.TaskInfo) bool {
302-
return t.Expiry.After(epochStartTime) && tr.timeSource.Now().After(t.Expiry)
302+
return !t.Expiry.IsZero() && t.Expiry.After(epochStartTime) && tr.timeSource.Now().After(t.Expiry)
303303
}
304304

305305
func (tr *taskReader) addTasksToBuffer(tasks []*persistence.TaskInfo) bool {

0 commit comments

Comments
 (0)