Skip to content

Commit aab5a6f

Browse files
author
Tamer Eldeeb
authored
Store status for closed workflow executions (#164)
This is a first step to being able to query closed workflows by status in visibility APIs. Issue #123
1 parent 37331d8 commit aab5a6f

File tree

4 files changed

+24
-0
lines changed

4 files changed

+24
-0
lines changed

common/persistence/cassandraPersistence.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ const (
7171
`decision_task_timeout: ?, ` +
7272
`execution_context: ?, ` +
7373
`state: ?, ` +
74+
`close_status: ?, ` +
7475
`next_event_id: ?, ` +
7576
`last_processed_event: ?, ` +
7677
`start_time: ?, ` +
@@ -645,6 +646,7 @@ func (d *cassandraPersistence) CreateWorkflowExecutionWithinBatch(request *Creat
645646
request.DecisionTimeoutValue,
646647
request.ExecutionContext,
647648
WorkflowStateCreated,
649+
WorkflowCloseStatusNone,
648650
request.NextEventID,
649651
request.LastProcessedEvent,
650652
cqlNowTimestamp,
@@ -720,6 +722,7 @@ func (d *cassandraPersistence) UpdateWorkflowExecution(request *UpdateWorkflowEx
720722
executionInfo.DecisionTimeoutValue,
721723
executionInfo.ExecutionContext,
722724
executionInfo.State,
725+
executionInfo.CloseStatus,
723726
executionInfo.NextEventID,
724727
executionInfo.LastProcessedEvent,
725728
executionInfo.StartTimestamp,
@@ -830,6 +833,7 @@ func (d *cassandraPersistence) DeleteWorkflowExecution(request *DeleteWorkflowEx
830833
info.DecisionTimeoutValue,
831834
info.ExecutionContext,
832835
info.State,
836+
info.CloseStatus,
833837
info.NextEventID,
834838
info.LastProcessedEvent,
835839
info.StartTimestamp,

common/persistence/dataInterfaces.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,17 @@ const (
2020
WorkflowStateCompleted
2121
)
2222

23+
// Workflow execution close status
24+
const (
25+
WorkflowCloseStatusNone = iota
26+
WorkflowCloseStatusCompleted
27+
WorkflowCloseStatusFailed
28+
WorkflowCloseStatusCanceled
29+
WorkflowCloseStatusTerminated
30+
WorkflowCloseStatusContinuedAsNew
31+
WorkflowCloseStatusTimedOut
32+
)
33+
2334
// Types of task lists
2435
const (
2536
TaskListTypeDecision = iota
@@ -83,6 +94,7 @@ type (
8394
DecisionTimeoutValue int32
8495
ExecutionContext []byte
8596
State int
97+
CloseStatus int
8698
NextEventID int64
8799
LastProcessedEvent int64
88100
StartTimestamp time.Time

schema/workflow_test.cql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ CREATE TYPE workflow_execution (
2525
decision_task_timeout int,
2626
execution_context blob,
2727
state int, -- enum WorkflowState {Created, Running, Completed}
28+
close_status int, -- enum WorkflowCloseStatus {None, Completed, Failed, Canceled, Terminated, ContinuedAsNew, TimedOut}
2829
next_event_id bigint,
2930
last_processed_event bigint,
3031
start_time timestamp,

service/history/mutableStateBuilder.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ func newMutableStateBuilder(logger bark.Logger) *mutableStateBuilder {
6969
s.executionInfo = &persistence.WorkflowExecutionInfo{
7070
NextEventID: firstEventID,
7171
State: persistence.WorkflowStateCreated,
72+
CloseStatus: persistence.WorkflowCloseStatusNone,
7273
LastProcessedEvent: emptyEventID,
7374
}
7475

@@ -330,6 +331,7 @@ func (e *mutableStateBuilder) AddWorkflowExecutionStartedEvent(domainID string,
330331
e.executionInfo.DecisionTimeoutValue = request.GetTaskStartToCloseTimeoutSeconds()
331332

332333
e.executionInfo.State = persistence.WorkflowStateCreated
334+
e.executionInfo.CloseStatus = persistence.WorkflowCloseStatusNone
333335
e.executionInfo.LastProcessedEvent = emptyEventID
334336
e.executionInfo.CreateRequestID = request.GetRequestId()
335337
e.executionInfo.DecisionScheduleID = emptyEventID
@@ -591,6 +593,7 @@ func (e *mutableStateBuilder) AddCompletedWorkflowEvent(decisionCompletedEventID
591593
}
592594

593595
e.executionInfo.State = persistence.WorkflowStateCompleted
596+
e.executionInfo.CloseStatus = persistence.WorkflowCloseStatusCompleted
594597
return e.hBuilder.AddCompletedWorkflowEvent(decisionCompletedEventID, attributes)
595598
}
596599

@@ -602,6 +605,7 @@ func (e *mutableStateBuilder) AddFailWorkflowEvent(decisionCompletedEventID int6
602605
}
603606

604607
e.executionInfo.State = persistence.WorkflowStateCompleted
608+
e.executionInfo.CloseStatus = persistence.WorkflowCloseStatusFailed
605609
return e.hBuilder.AddFailWorkflowEvent(decisionCompletedEventID, attributes)
606610
}
607611

@@ -628,6 +632,7 @@ func (e *mutableStateBuilder) AddWorkflowExecutionCanceledEvent(decisionTaskComp
628632
}
629633

630634
e.executionInfo.State = persistence.WorkflowStateCompleted
635+
e.executionInfo.CloseStatus = persistence.WorkflowCloseStatusCanceled
631636
return e.hBuilder.AddWorkflowExecutionCanceledEvent(decisionTaskCompletedEventID, attributes)
632637
}
633638

@@ -735,6 +740,7 @@ func (e *mutableStateBuilder) AddWorkflowExecutionTerminatedEvent(
735740
}
736741

737742
e.executionInfo.State = persistence.WorkflowStateCompleted
743+
e.executionInfo.CloseStatus = persistence.WorkflowCloseStatusTerminated
738744
return e.hBuilder.AddWorkflowExecutionTerminatedEvent(request)
739745
}
740746

@@ -759,6 +765,7 @@ func (e *mutableStateBuilder) AddContinueAsNewEvent(decisionCompletedEventID int
759765
}
760766

761767
e.executionInfo.State = persistence.WorkflowStateCompleted
768+
e.executionInfo.CloseStatus = persistence.WorkflowCloseStatusContinuedAsNew
762769
newExecution := workflow.WorkflowExecution{
763770
WorkflowId: common.StringPtr(e.executionInfo.WorkflowID),
764771
RunId: common.StringPtr(newRunID),

0 commit comments

Comments
 (0)