Skip to content

Update C* queries about execution table with timestamps #6593

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 24 additions & 20 deletions common/persistence/nosql/nosqlplugin/cassandra/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,28 +50,29 @@ func (db *cdb) InsertWorkflowExecutionWithTasks(
shardID := shardCondition.ShardID
domainID := execution.DomainID
workflowID := execution.WorkflowID
timeStamp := db.timeSrc.Now()

batch := db.session.NewBatch(gocql.LoggedBatch).WithContext(ctx)

err := insertOrUpsertWorkflowRequestRow(batch, requests)
err := insertOrUpsertWorkflowRequestRow(batch, requests, timeStamp)
if err != nil {
return err
}
err = createOrUpdateCurrentWorkflow(batch, shardID, domainID, workflowID, currentWorkflowRequest)
err = createOrUpdateCurrentWorkflow(batch, shardID, domainID, workflowID, currentWorkflowRequest, timeStamp)
if err != nil {
return err
}

err = createWorkflowExecutionWithMergeMaps(batch, shardID, domainID, workflowID, execution)
err = createWorkflowExecutionWithMergeMaps(batch, shardID, domainID, workflowID, execution, timeStamp)
if err != nil {
return err
}

createTransferTasks(batch, shardID, domainID, workflowID, transferTasks)
createReplicationTasks(batch, shardID, domainID, workflowID, replicationTasks)
createCrossClusterTasks(batch, shardID, domainID, workflowID, crossClusterTasks)
createTimerTasks(batch, shardID, domainID, workflowID, timerTasks)
assertShardRangeID(batch, shardID, shardCondition.RangeID)
createTransferTasks(batch, shardID, domainID, workflowID, transferTasks, timeStamp)
createReplicationTasks(batch, shardID, domainID, workflowID, replicationTasks, timeStamp)
createCrossClusterTasks(batch, shardID, domainID, workflowID, crossClusterTasks, timeStamp)
createTimerTasks(batch, shardID, domainID, workflowID, timerTasks, timeStamp)
assertShardRangeID(batch, shardID, shardCondition.RangeID, timeStamp)

return executeCreateWorkflowBatchTransaction(ctx, db.session, batch, currentWorkflowRequest, execution, shardCondition)
}
Expand Down Expand Up @@ -129,6 +130,7 @@ func (db *cdb) UpdateWorkflowExecutionWithTasks(
shardID := shardCondition.ShardID
var domainID, workflowID string
var previousNextEventIDCondition int64
timeStamp := db.timeSrc.Now()
if mutatedExecution != nil {
domainID = mutatedExecution.DomainID
workflowID = mutatedExecution.WorkflowID
Expand All @@ -143,41 +145,41 @@ func (db *cdb) UpdateWorkflowExecutionWithTasks(

batch := db.session.NewBatch(gocql.LoggedBatch).WithContext(ctx)

err := insertOrUpsertWorkflowRequestRow(batch, requests)
err := insertOrUpsertWorkflowRequestRow(batch, requests, timeStamp)
if err != nil {
return err
}
err = createOrUpdateCurrentWorkflow(batch, shardID, domainID, workflowID, currentWorkflowRequest)
err = createOrUpdateCurrentWorkflow(batch, shardID, domainID, workflowID, currentWorkflowRequest, timeStamp)
if err != nil {
return err
}

if mutatedExecution != nil {
err = updateWorkflowExecutionAndEventBufferWithMergeAndDeleteMaps(batch, shardID, domainID, workflowID, mutatedExecution)
err = updateWorkflowExecutionAndEventBufferWithMergeAndDeleteMaps(batch, shardID, domainID, workflowID, mutatedExecution, timeStamp)
if err != nil {
return err
}
}

if insertedExecution != nil {
err = createWorkflowExecutionWithMergeMaps(batch, shardID, domainID, workflowID, insertedExecution)
err = createWorkflowExecutionWithMergeMaps(batch, shardID, domainID, workflowID, insertedExecution, timeStamp)
if err != nil {
return err
}
}

if resetExecution != nil {
err = resetWorkflowExecutionAndMapsAndEventBuffer(batch, shardID, domainID, workflowID, resetExecution)
err = resetWorkflowExecutionAndMapsAndEventBuffer(batch, shardID, domainID, workflowID, resetExecution, timeStamp)
if err != nil {
return err
}
}

createTransferTasks(batch, shardID, domainID, workflowID, transferTasks)
createReplicationTasks(batch, shardID, domainID, workflowID, replicationTasks)
createCrossClusterTasks(batch, shardID, domainID, workflowID, crossClusterTasks)
createTimerTasks(batch, shardID, domainID, workflowID, timerTasks)
assertShardRangeID(batch, shardID, shardCondition.RangeID)
createTransferTasks(batch, shardID, domainID, workflowID, transferTasks, timeStamp)
createReplicationTasks(batch, shardID, domainID, workflowID, replicationTasks, timeStamp)
createCrossClusterTasks(batch, shardID, domainID, workflowID, crossClusterTasks, timeStamp)
createTimerTasks(batch, shardID, domainID, workflowID, timerTasks, timeStamp)
assertShardRangeID(batch, shardID, shardCondition.RangeID, timeStamp)

return executeUpdateWorkflowBatchTransaction(ctx, db.session, batch, currentWorkflowRequest, previousNextEventIDCondition, shardCondition)
}
Expand Down Expand Up @@ -648,6 +650,7 @@ func (db *cdb) InsertReplicationDLQTask(ctx context.Context, shardID int, source
defaultVisibilityTimestamp,
defaultVisibilityTimestamp,
task.TaskID,
db.timeSrc.Now(),
).WithContext(ctx)

return query.Exec()
Expand Down Expand Up @@ -724,11 +727,12 @@ func (db *cdb) InsertReplicationTask(ctx context.Context, tasks []*nosqlplugin.R

shardID := shardCondition.ShardID
batch := db.session.NewBatch(gocql.LoggedBatch).WithContext(ctx)
timeStamp := db.timeSrc.Now()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My point of view:
It is incorrect. The field belongs to the ReplicationTask, so an SQL plugin could later support it.
I don't think db should have timeSrc field, this should be filled on the PersistenceManager level and pushed as is to the DB.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

db object was using time.Now() in a few places since the beginning. While we were adding unit tests last year I replaced all time.Now()s with a db.timeSrc.Now() so the generated queries can be validated in the tests. This PR is following same pattern.
What you suggest is valid I think. The timestamp should be determined by the upper layer PersistentManager and the db is purely translating the entity records to DB queries without adding anything else on top. However that requires some refactoring. I don't think is is a source of bug but it's just better isolation of responsibilities. Let's turn that into a github issue and label with #up-for-grab.

for _, task := range tasks {
createReplicationTasks(batch, shardID, task.DomainID, task.WorkflowID, []*nosqlplugin.ReplicationTask{task})
createReplicationTasks(batch, shardID, task.DomainID, task.WorkflowID, []*nosqlplugin.ReplicationTask{task}, timeStamp)
}

assertShardRangeID(batch, shardID, shardCondition.RangeID)
assertShardRangeID(batch, shardID, shardCondition.RangeID, timeStamp)

previous := make(map[string]interface{})
applied, iter, err := db.session.MapExecuteBatchCAS(batch, previous)
Expand Down
52 changes: 35 additions & 17 deletions common/persistence/nosql/nosqlplugin/cassandra/workflow_cql.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ const (
`SET current_run_id = ?, ` +
`execution = {run_id: ?, create_request_id: ?, state: ?, close_status: ?}, ` +
`workflow_last_write_version = ?, ` +
`workflow_state = ? ` +
`workflow_state = ?, ` +
`last_updated_time = ? ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
Expand All @@ -238,12 +239,12 @@ const (
`and workflow_state = ? `

templateInsertWorkflowRequestQuery = `INSERT INTO executions (` +
`shard_id, type, domain_id, workflow_id, run_id, visibility_ts, task_id, current_run_id) ` +
`VALUES(?, ?, ?, ?, ?, ?, ?, ?) IF NOT EXISTS USING TTL ?`
`shard_id, type, domain_id, workflow_id, run_id, visibility_ts, task_id, current_run_id, created_time) ` +
`VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?) IF NOT EXISTS USING TTL ?`

templateUpsertWorkflowRequestQuery = `INSERT INTO executions (` +
`shard_id, type, domain_id, workflow_id, run_id, visibility_ts, task_id, current_run_id) ` +
`VALUES(?, ?, ?, ?, ?, ?, ?, ?) USING TTL ?`
`shard_id, type, domain_id, workflow_id, run_id, visibility_ts, task_id, current_run_id, last_updated_time) ` +
`VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?) USING TTL ?`

templateGetLatestWorkflowRequestQuery = `SELECT current_run_id ` +
`FROM executions ` +
Expand All @@ -256,31 +257,32 @@ const (
`LIMIT 1`

templateCreateCurrentWorkflowExecutionQuery = `INSERT INTO executions (` +
`shard_id, type, domain_id, workflow_id, run_id, visibility_ts, task_id, current_run_id, execution, workflow_last_write_version, workflow_state) ` +
`VALUES(?, ?, ?, ?, ?, ?, ?, ?, {run_id: ?, create_request_id: ?, state: ?, close_status: ?}, ?, ?) IF NOT EXISTS USING TTL 0 `
`shard_id, type, domain_id, workflow_id, run_id, visibility_ts, task_id, current_run_id, execution, workflow_last_write_version, workflow_state, created_time) ` +
`VALUES(?, ?, ?, ?, ?, ?, ?, ?, {run_id: ?, create_request_id: ?, state: ?, close_status: ?}, ?, ?, ?) IF NOT EXISTS USING TTL 0 `

templateCreateWorkflowExecutionWithVersionHistoriesQuery = `INSERT INTO executions (` +
`shard_id, domain_id, workflow_id, run_id, type, execution, next_event_id, visibility_ts, task_id, version_histories, version_histories_encoding, checksum, workflow_last_write_version, workflow_state) ` +
`VALUES(?, ?, ?, ?, ?, ` + templateWorkflowExecutionType + `, ?, ?, ?, ?, ?, ` + templateChecksumType + `, ?, ?) IF NOT EXISTS `
`shard_id, domain_id, workflow_id, run_id, type, execution, next_event_id, visibility_ts, task_id, version_histories, version_histories_encoding, checksum, workflow_last_write_version, workflow_state, created_time) ` +
`VALUES(?, ?, ?, ?, ?, ` + templateWorkflowExecutionType + `, ?, ?, ?, ?, ?, ` + templateChecksumType + `, ?, ?, ?) IF NOT EXISTS `

templateCreateTransferTaskQuery = `INSERT INTO executions (` +
`shard_id, type, domain_id, workflow_id, run_id, transfer, visibility_ts, task_id) ` +
`VALUES(?, ?, ?, ?, ?, ` + templateTransferTaskType + `, ?, ?)`
`shard_id, type, domain_id, workflow_id, run_id, transfer, visibility_ts, task_id, created_time) ` +
`VALUES(?, ?, ?, ?, ?, ` + templateTransferTaskType + `, ?, ?, ?)`

templateCreateCrossClusterTaskQuery = `INSERT INTO executions (` +
`shard_id, type, domain_id, workflow_id, run_id, cross_cluster, visibility_ts, task_id) ` +
`VALUES(?, ?, ?, ?, ?, ` + templateCrossClusterTaskType + `, ?, ?)`
`shard_id, type, domain_id, workflow_id, run_id, cross_cluster, visibility_ts, task_id, created_time) ` +
`VALUES(?, ?, ?, ?, ?, ` + templateCrossClusterTaskType + `, ?, ?, ?)`

templateCreateReplicationTaskQuery = `INSERT INTO executions (` +
`shard_id, type, domain_id, workflow_id, run_id, replication, visibility_ts, task_id) ` +
`VALUES(?, ?, ?, ?, ?, ` + templateReplicationTaskType + `, ?, ?)`
`shard_id, type, domain_id, workflow_id, run_id, replication, visibility_ts, task_id, created_time) ` +
`VALUES(?, ?, ?, ?, ?, ` + templateReplicationTaskType + `, ?, ?, ?)`

templateCreateTimerTaskQuery = `INSERT INTO executions (` +
`shard_id, type, domain_id, workflow_id, run_id, timer, visibility_ts, task_id) ` +
`VALUES(?, ?, ?, ?, ?, ` + templateTimerTaskType + `, ?, ?)`
`shard_id, type, domain_id, workflow_id, run_id, timer, visibility_ts, task_id, created_time) ` +
`VALUES(?, ?, ?, ?, ?, ` + templateTimerTaskType + `, ?, ?, ?)`

templateUpdateLeaseQuery = `UPDATE executions ` +
`SET range_id = ? ` +
`, last_updated_time = ? ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
Expand Down Expand Up @@ -341,6 +343,7 @@ const (
`, checksum = ` + templateChecksumType +
`, workflow_last_write_version = ? ` +
`, workflow_state = ? ` +
`, last_updated_time = ? ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
Expand All @@ -352,6 +355,7 @@ const (

templateUpdateActivityInfoQuery = `UPDATE executions ` +
`SET activity_map[ ? ] = ` + templateActivityInfoType + ` ` +
`, last_updated_time = ? ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
Expand All @@ -362,6 +366,7 @@ const (

templateResetActivityInfoQuery = `UPDATE executions ` +
`SET activity_map = ? ` +
`, last_updated_time = ? ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
Expand All @@ -372,6 +377,7 @@ const (

templateUpdateTimerInfoQuery = `UPDATE executions ` +
`SET timer_map[ ? ] = ` + templateTimerInfoType + ` ` +
`, last_updated_time = ? ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
Expand All @@ -382,6 +388,7 @@ const (

templateResetTimerInfoQuery = `UPDATE executions ` +
`SET timer_map = ? ` +
`, last_updated_time = ? ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
Expand All @@ -392,6 +399,7 @@ const (

templateUpdateChildExecutionInfoQuery = `UPDATE executions ` +
`SET child_executions_map[ ? ] = ` + templateChildExecutionInfoType + ` ` +
`, last_updated_time = ? ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
Expand All @@ -402,6 +410,7 @@ const (

templateResetChildExecutionInfoQuery = `UPDATE executions ` +
`SET child_executions_map = ?` +
`, last_updated_time = ? ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
Expand All @@ -412,6 +421,7 @@ const (

templateUpdateRequestCancelInfoQuery = `UPDATE executions ` +
`SET request_cancel_map[ ? ] = ` + templateRequestCancelInfoType + ` ` +
`, last_updated_time = ? ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
Expand All @@ -422,6 +432,7 @@ const (

templateResetRequestCancelInfoQuery = `UPDATE executions ` +
`SET request_cancel_map = ?` +
`, last_updated_time = ? ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
Expand All @@ -432,6 +443,7 @@ const (

templateUpdateSignalInfoQuery = `UPDATE executions ` +
`SET signal_map[ ? ] = ` + templateSignalInfoType + ` ` +
`, last_updated_time = ? ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
Expand All @@ -442,6 +454,7 @@ const (

templateResetSignalInfoQuery = `UPDATE executions ` +
`SET signal_map = ? ` +
`, last_updated_time = ? ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
Expand All @@ -452,6 +465,7 @@ const (

templateUpdateSignalRequestedQuery = `UPDATE executions ` +
`SET signal_requested = signal_requested + ? ` +
`, last_updated_time = ? ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
Expand All @@ -462,6 +476,7 @@ const (

templateResetSignalRequestedQuery = `UPDATE executions ` +
`SET signal_requested = ? ` +
`, last_updated_time = ? ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
Expand All @@ -472,6 +487,7 @@ const (

templateAppendBufferedEventsQuery = `UPDATE executions ` +
`SET buffered_events_list = buffered_events_list + ? ` +
`, last_updated_time = ? ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
Expand All @@ -482,6 +498,7 @@ const (

templateDeleteBufferedEventsQuery = `UPDATE executions ` +
`SET buffered_events_list = [] ` +
`, last_updated_time = ? ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
Expand Down Expand Up @@ -553,6 +570,7 @@ const (

templateDeleteWorkflowExecutionSignalRequestedQuery = `UPDATE executions ` +
`SET signal_requested = signal_requested - ? ` +
`, last_updated_time = ? ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
Expand Down
Loading
Loading