Skip to content

Commit 89bae56

Browse files
author
Tamer Eldeeb
authored
Enable GetWorkflowExecutionHistory after mutable state is deleted (#236)
In this change we keep the length of the history in the visibility record. This is useful in itself, but also allows reading the execution history even after deleting its mutable state. Issue #206
1 parent b8d3a81 commit 89bae56

File tree

10 files changed

+193
-16
lines changed

10 files changed

+193
-16
lines changed

.gen/go/shared/shared.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1768,6 +1768,7 @@ func (p *WorkflowExecution) String() string {
17681768
// - StartTime
17691769
// - CloseTime
17701770
// - CloseStatus
1771+
// - HistoryLength
17711772
type WorkflowExecutionInfo struct {
17721773
// unused fields # 1 to 9
17731774
Execution *WorkflowExecution `thrift:"execution,10" db:"execution" json:"execution,omitempty"`
@@ -1779,6 +1780,8 @@ type WorkflowExecutionInfo struct {
17791780
CloseTime *int64 `thrift:"closeTime,40" db:"closeTime" json:"closeTime,omitempty"`
17801781
// unused fields # 41 to 49
17811782
CloseStatus *WorkflowExecutionCloseStatus `thrift:"closeStatus,50" db:"closeStatus" json:"closeStatus,omitempty"`
1783+
// unused fields # 51 to 59
1784+
HistoryLength *int64 `thrift:"historyLength,60" db:"historyLength" json:"historyLength,omitempty"`
17821785
}
17831786

17841787
func NewWorkflowExecutionInfo() *WorkflowExecutionInfo {
@@ -1820,6 +1823,13 @@ func (p *WorkflowExecutionInfo) GetCloseStatus() WorkflowExecutionCloseStatus {
18201823
}
18211824
return *p.CloseStatus
18221825
}
1826+
var WorkflowExecutionInfo_HistoryLength_DEFAULT int64
1827+
func (p *WorkflowExecutionInfo) GetHistoryLength() int64 {
1828+
if !p.IsSetHistoryLength() {
1829+
return WorkflowExecutionInfo_HistoryLength_DEFAULT
1830+
}
1831+
return *p.HistoryLength
1832+
}
18231833
func (p *WorkflowExecutionInfo) IsSetExecution() bool {
18241834
return p.Execution != nil
18251835
}
@@ -1840,6 +1850,10 @@ func (p *WorkflowExecutionInfo) IsSetCloseStatus() bool {
18401850
return p.CloseStatus != nil
18411851
}
18421852

1853+
func (p *WorkflowExecutionInfo) IsSetHistoryLength() bool {
1854+
return p.HistoryLength != nil
1855+
}
1856+
18431857
func (p *WorkflowExecutionInfo) Read(iprot thrift.TProtocol) error {
18441858
if _, err := iprot.ReadStructBegin(); err != nil {
18451859
return thrift.PrependError(fmt.Sprintf("%T read error: ", p), err)
@@ -1873,6 +1887,10 @@ func (p *WorkflowExecutionInfo) Read(iprot thrift.TProtocol) error {
18731887
if err := p.ReadField50(iprot); err != nil {
18741888
return err
18751889
}
1890+
case 60:
1891+
if err := p.ReadField60(iprot); err != nil {
1892+
return err
1893+
}
18761894
default:
18771895
if err := iprot.Skip(fieldTypeId); err != nil {
18781896
return err
@@ -1932,6 +1950,15 @@ func (p *WorkflowExecutionInfo) ReadField50(iprot thrift.TProtocol) error {
19321950
return nil
19331951
}
19341952

1953+
func (p *WorkflowExecutionInfo) ReadField60(iprot thrift.TProtocol) error {
1954+
if v, err := iprot.ReadI64(); err != nil {
1955+
return thrift.PrependError("error reading field 60: ", err)
1956+
} else {
1957+
p.HistoryLength = &v
1958+
}
1959+
return nil
1960+
}
1961+
19351962
func (p *WorkflowExecutionInfo) Write(oprot thrift.TProtocol) error {
19361963
if err := oprot.WriteStructBegin("WorkflowExecutionInfo"); err != nil {
19371964
return thrift.PrependError(fmt.Sprintf("%T write struct begin error: ", p), err) }
@@ -1941,6 +1968,7 @@ func (p *WorkflowExecutionInfo) Write(oprot thrift.TProtocol) error {
19411968
if err := p.writeField30(oprot); err != nil { return err }
19421969
if err := p.writeField40(oprot); err != nil { return err }
19431970
if err := p.writeField50(oprot); err != nil { return err }
1971+
if err := p.writeField60(oprot); err != nil { return err }
19441972
}
19451973
if err := oprot.WriteFieldStop(); err != nil {
19461974
return thrift.PrependError("write field stop error: ", err) }
@@ -2011,6 +2039,18 @@ func (p *WorkflowExecutionInfo) writeField50(oprot thrift.TProtocol) (err error)
20112039
return err
20122040
}
20132041

2042+
func (p *WorkflowExecutionInfo) writeField60(oprot thrift.TProtocol) (err error) {
2043+
if p.IsSetHistoryLength() {
2044+
if err := oprot.WriteFieldBegin("historyLength", thrift.I64, 60); err != nil {
2045+
return thrift.PrependError(fmt.Sprintf("%T write field begin error 60:historyLength: ", p), err) }
2046+
if err := oprot.WriteI64(int64(*p.HistoryLength)); err != nil {
2047+
return thrift.PrependError(fmt.Sprintf("%T.historyLength (60) field write error: ", p), err) }
2048+
if err := oprot.WriteFieldEnd(); err != nil {
2049+
return thrift.PrependError(fmt.Sprintf("%T write field end error 60:historyLength: ", p), err) }
2050+
}
2051+
return err
2052+
}
2053+
20142054
func (p *WorkflowExecutionInfo) String() string {
20152055
if p == nil {
20162056
return "<nil>"

common/mocks/VisibilityManager.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,29 @@ type VisibilityManager struct {
2828
mock.Mock
2929
}
3030

31+
// GetClosedWorkflowExecution provides a mock function with given fields: request
32+
func (_m *VisibilityManager) GetClosedWorkflowExecution(request *persistence.GetClosedWorkflowExecutionRequest) (*persistence.GetClosedWorkflowExecutionResponse, error) {
33+
ret := _m.Called(request)
34+
35+
var r0 *persistence.GetClosedWorkflowExecutionResponse
36+
if rf, ok := ret.Get(0).(func(*persistence.GetClosedWorkflowExecutionRequest) *persistence.GetClosedWorkflowExecutionResponse); ok {
37+
r0 = rf(request)
38+
} else {
39+
if ret.Get(0) != nil {
40+
r0 = ret.Get(0).(*persistence.GetClosedWorkflowExecutionResponse)
41+
}
42+
}
43+
44+
var r1 error
45+
if rf, ok := ret.Get(1).(func(*persistence.GetClosedWorkflowExecutionRequest) error); ok {
46+
r1 = rf(request)
47+
} else {
48+
r1 = ret.Error(1)
49+
}
50+
51+
return r0, r1
52+
}
53+
3154
// ListClosedWorkflowExecutions provides a mock function with given fields: request
3255
func (_m *VisibilityManager) ListClosedWorkflowExecutions(request *persistence.ListWorkflowExecutionsRequest) (*persistence.ListWorkflowExecutionsResponse, error) {
3356
ret := _m.Called(request)

common/persistence/cassandraVisibilityPersistence.go

Lines changed: 52 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ const (
4949
`AND run_id = ?`
5050

5151
templateCreateWorkflowExecutionClosed = `INSERT INTO closed_executions (` +
52-
`domain_id, domain_partition, workflow_id, run_id, start_time, close_time, workflow_type_name, status) ` +
53-
`VALUES (?, ?, ?, ?, ?, ?, ?, ?) using TTL ?`
52+
`domain_id, domain_partition, workflow_id, run_id, start_time, close_time, workflow_type_name, status, history_length) ` +
53+
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) using TTL ?`
5454

5555
templateGetOpenWorkflowExecutions = `SELECT workflow_id, run_id, start_time, workflow_type_name ` +
5656
`FROM open_executions ` +
@@ -59,7 +59,7 @@ const (
5959
`AND start_time >= ? ` +
6060
`AND start_time <= ? `
6161

62-
templateGetClosedWorkflowExecutions = `SELECT workflow_id, run_id, start_time, close_time, workflow_type_name, status ` +
62+
templateGetClosedWorkflowExecutions = `SELECT workflow_id, run_id, start_time, close_time, workflow_type_name, status, history_length ` +
6363
`FROM closed_executions ` +
6464
`WHERE domain_id = ? ` +
6565
`AND domain_partition IN (?) ` +
@@ -74,7 +74,7 @@ const (
7474
`AND start_time <= ? ` +
7575
`AND workflow_type_name = ? `
7676

77-
templateGetClosedWorkflowExecutionsByType = `SELECT workflow_id, run_id, start_time, close_time, workflow_type_name, status ` +
77+
templateGetClosedWorkflowExecutionsByType = `SELECT workflow_id, run_id, start_time, close_time, workflow_type_name, status, history_length ` +
7878
`FROM closed_executions ` +
7979
`WHERE domain_id = ? ` +
8080
`AND domain_partition = ? ` +
@@ -90,21 +90,28 @@ const (
9090
`AND start_time <= ? ` +
9191
`AND workflow_id = ? `
9292

93-
templateGetClosedWorkflowExecutionsByID = `SELECT workflow_id, run_id, start_time, close_time, workflow_type_name, status ` +
93+
templateGetClosedWorkflowExecutionsByID = `SELECT workflow_id, run_id, start_time, close_time, workflow_type_name, status, history_length ` +
9494
`FROM closed_executions ` +
9595
`WHERE domain_id = ? ` +
9696
`AND domain_partition = ? ` +
9797
`AND start_time >= ? ` +
9898
`AND start_time <= ? ` +
9999
`AND workflow_id = ? `
100100

101-
templateGetClosedWorkflowExecutionsByStatus = `SELECT workflow_id, run_id, start_time, close_time, workflow_type_name, status ` +
101+
templateGetClosedWorkflowExecutionsByStatus = `SELECT workflow_id, run_id, start_time, close_time, workflow_type_name, status, history_length ` +
102102
`FROM closed_executions ` +
103103
`WHERE domain_id = ? ` +
104104
`AND domain_partition = ? ` +
105105
`AND start_time >= ? ` +
106106
`AND start_time <= ? ` +
107107
`AND status = ? `
108+
109+
templateGetClosedWorkflowExecution = `SELECT workflow_id, run_id, start_time, close_time, workflow_type_name, status, history_length ` +
110+
`FROM closed_executions ` +
111+
`WHERE domain_id = ? ` +
112+
`AND domain_partition = ? ` +
113+
`AND workflow_id = ? ` +
114+
`AND run_id = ? ALLOW FILTERING `
108115
)
109116

110117
type (
@@ -183,6 +190,7 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionClosed(
183190
common.UnixNanoToCQLTimestamp(request.CloseTimestamp),
184191
request.WorkflowTypeName,
185192
request.Status,
193+
request.HistoryLength,
186194
retention,
187195
)
188196

@@ -446,6 +454,41 @@ func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutionsByStatus(
446454
return response, nil
447455
}
448456

457+
func (v *cassandraVisibilityPersistence) GetClosedWorkflowExecution(
458+
request *GetClosedWorkflowExecutionRequest) (*GetClosedWorkflowExecutionResponse, error) {
459+
execution := request.Execution
460+
query := v.session.Query(templateGetClosedWorkflowExecution,
461+
request.DomainUUID,
462+
domainPartition,
463+
execution.GetWorkflowId(),
464+
execution.GetRunId())
465+
466+
iter := query.Iter()
467+
if iter == nil {
468+
return nil, &workflow.InternalServiceError{
469+
Message: "GetClosedWorkflowExecution operation failed. Not able to create query iterator.",
470+
}
471+
}
472+
473+
wfexecution, has := readClosedWorkflowExecutionRecord(iter)
474+
if !has {
475+
return nil, &workflow.EntityNotExistsError{
476+
Message: fmt.Sprintf("Workflow execution not found. WorkflowId: %v, RunId: %v",
477+
execution.GetWorkflowId(), execution.GetRunId()),
478+
}
479+
}
480+
481+
if err := iter.Close(); err != nil {
482+
return nil, &workflow.InternalServiceError{
483+
Message: fmt.Sprintf("GetClosedWorkflowExecution operation failed. Error: %v", err),
484+
}
485+
}
486+
487+
return &GetClosedWorkflowExecutionResponse{
488+
Execution: wfexecution,
489+
}, nil
490+
}
491+
449492
func readOpenWorkflowExecutionRecord(iter *gocql.Iter) (*workflow.WorkflowExecutionInfo, bool) {
450493
var workflowID string
451494
var runID gocql.UUID
@@ -475,7 +518,8 @@ func readClosedWorkflowExecutionRecord(iter *gocql.Iter) (*workflow.WorkflowExec
475518
var startTime time.Time
476519
var closeTime time.Time
477520
var status workflow.WorkflowExecutionCloseStatus
478-
if iter.Scan(&workflowID, &runID, &startTime, &closeTime, &typeName, &status) {
521+
var historyLength int64
522+
if iter.Scan(&workflowID, &runID, &startTime, &closeTime, &typeName, &status, &historyLength) {
479523
execution := workflow.NewWorkflowExecution()
480524
execution.WorkflowId = common.StringPtr(workflowID)
481525
execution.RunId = common.StringPtr(runID.String())
@@ -489,6 +533,7 @@ func readClosedWorkflowExecutionRecord(iter *gocql.Iter) (*workflow.WorkflowExec
489533
record.CloseTime = common.Int64Ptr(closeTime.UnixNano())
490534
record.Type = wfType
491535
record.CloseStatus = workflow.WorkflowExecutionCloseStatusPtr(status)
536+
record.HistoryLength = common.Int64Ptr(historyLength)
492537
return record, true
493538
}
494539
return nil, false

common/persistence/cassandraVisibilityPersistence_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,3 +404,45 @@ func (s *visibilityPersistenceSuite) TestFilteringByCloseStatus() {
404404
s.Equal(1, len(resp.Executions))
405405
s.Equal(workflowExecution2.GetWorkflowId(), resp.Executions[0].Execution.GetWorkflowId())
406406
}
407+
408+
func (s *visibilityPersistenceSuite) TestGetClosedExecution() {
409+
testDomainUUID := uuid.New()
410+
411+
workflowExecution := gen.WorkflowExecution{
412+
WorkflowId: common.StringPtr("visibility-workflow-test"),
413+
RunId: common.StringPtr("a3dbc7bf-deb1-4946-b57c-cf0615ea553f"),
414+
}
415+
416+
startTime := time.Now().Add(time.Second * -5).UnixNano()
417+
err0 := s.VisibilityMgr.RecordWorkflowExecutionStarted(&RecordWorkflowExecutionStartedRequest{
418+
DomainUUID: testDomainUUID,
419+
Execution: workflowExecution,
420+
WorkflowTypeName: "visibility-workflow",
421+
StartTimestamp: startTime,
422+
})
423+
s.Nil(err0)
424+
425+
_, err1 := s.VisibilityMgr.GetClosedWorkflowExecution(&GetClosedWorkflowExecutionRequest{
426+
DomainUUID: testDomainUUID,
427+
Execution: workflowExecution,
428+
})
429+
s.NotNil(err1)
430+
431+
err2 := s.VisibilityMgr.RecordWorkflowExecutionClosed(&RecordWorkflowExecutionClosedRequest{
432+
DomainUUID: testDomainUUID,
433+
Execution: workflowExecution,
434+
WorkflowTypeName: "visibility-workflow",
435+
StartTimestamp: startTime,
436+
CloseTimestamp: time.Now().UnixNano(),
437+
HistoryLength: 3,
438+
})
439+
s.Nil(err2)
440+
441+
resp, err3 := s.VisibilityMgr.GetClosedWorkflowExecution(&GetClosedWorkflowExecutionRequest{
442+
DomainUUID: testDomainUUID,
443+
Execution: workflowExecution,
444+
})
445+
s.Nil(err3)
446+
s.Equal(workflowExecution.GetWorkflowId(), resp.Execution.GetExecution().GetWorkflowId())
447+
s.Equal(int64(3), resp.Execution.GetHistoryLength())
448+
}

common/persistence/visibilityInterfaces.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ type (
4747
StartTimestamp int64
4848
CloseTimestamp int64
4949
Status s.WorkflowExecutionCloseStatus
50+
HistoryLength int64
5051
RetentionSeconds int64
5152
}
5253

@@ -91,6 +92,17 @@ type (
9192
Status s.WorkflowExecutionCloseStatus
9293
}
9394

95+
// GetClosedWorkflowExecutionRequest is used retrieve the record for a specific execution
96+
GetClosedWorkflowExecutionRequest struct {
97+
DomainUUID string
98+
Execution s.WorkflowExecution
99+
}
100+
101+
// GetClosedWorkflowExecutionResponse is the response to GetClosedWorkflowExecutionRequest
102+
GetClosedWorkflowExecutionResponse struct {
103+
Execution *s.WorkflowExecutionInfo
104+
}
105+
94106
// VisibilityManager is used to manage the visibility store
95107
VisibilityManager interface {
96108
RecordWorkflowExecutionStarted(request *RecordWorkflowExecutionStartedRequest) error
@@ -102,5 +114,6 @@ type (
102114
ListOpenWorkflowExecutionsByWorkflowID(request *ListWorkflowExecutionsByWorkflowIDRequest) (*ListWorkflowExecutionsResponse, error)
103115
ListClosedWorkflowExecutionsByWorkflowID(request *ListWorkflowExecutionsByWorkflowIDRequest) (*ListWorkflowExecutionsResponse, error)
104116
ListClosedWorkflowExecutionsByStatus(request *ListClosedWorkflowExecutionsByStatusRequest) (*ListWorkflowExecutionsResponse, error)
117+
GetClosedWorkflowExecution(request *GetClosedWorkflowExecutionRequest) (*GetClosedWorkflowExecutionResponse, error)
105118
}
106119
)

idl/github.com/uber/cadence/shared.thrift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ struct WorkflowExecutionInfo {
174174
30: optional i64 (js.type = "Long") startTime
175175
40: optional i64 (js.type = "Long") closeTime
176176
50: optional WorkflowExecutionCloseStatus closeStatus
177+
60: optional i64 (js.type = "Long") historyLength
177178
}
178179

179180
struct ScheduleActivityTaskDecisionAttributes {

schema/visibility/schema.cql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ CREATE TABLE closed_executions (
2525
close_time timestamp,
2626
status int, -- enum WorkflowExecutionCloseStatus {COMPLETED, FAILED, CANCELED, TERMINATED, CONTINUED_AS_NEW, TIMED_OUT}
2727
workflow_type_name text,
28+
history_length bigint,
2829
PRIMARY KEY ((domain_id, domain_partition), start_time, run_id)
2930
) WITH CLUSTERING ORDER BY (start_time DESC)
3031
AND COMPACTION = {

schema/visibility/versioned/v0.1/base.cql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ CREATE TABLE closed_executions (
2525
close_time timestamp,
2626
status int, -- enum WorkflowExecutionCloseStatus {COMPLETED, FAILED, CANCELED, TERMINATED, CONTINUED_AS_NEW, TIMED_OUT}
2727
workflow_type_name text,
28+
history_length bigint,
2829
PRIMARY KEY ((domain_id, domain_partition), start_time, run_id)
2930
) WITH CLUSTERING ORDER BY (start_time DESC)
3031
AND COMPACTION = {

service/frontend/handler.go

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -558,11 +558,7 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
558558
return nil, errWorkflowIDNotSet
559559
}
560560

561-
if !getRequest.GetExecution().IsSetRunId() {
562-
return nil, errRunIDNotSet
563-
}
564-
565-
if uuid.Parse(getRequest.GetExecution().GetRunId()) == nil {
561+
if getRequest.GetExecution().IsSetRunId() && uuid.Parse(getRequest.GetExecution().GetRunId()) == nil {
566562
return nil, errInvalidRunID
567563
}
568564

@@ -587,11 +583,25 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
587583
DomainUUID: common.StringPtr(info.ID),
588584
Execution: getRequest.GetExecution(),
589585
})
590-
if err != nil {
591-
return nil, wrapError(err)
586+
if err == nil {
587+
token.nextEventID = response.GetEventId()
588+
token.runID = response.GetRunId()
589+
} else {
590+
if _, ok := err.(*gen.EntityNotExistsError); !ok || !getRequest.GetExecution().IsSetRunId() {
591+
return nil, wrapError(err)
592+
}
593+
// It is possible that we still have the events in the table even though the mutable state is gone
594+
// Get the nextEventID from visibility store if we still have it.
595+
visibilityResp, err := wh.visibitiltyMgr.GetClosedWorkflowExecution(&persistence.GetClosedWorkflowExecutionRequest{
596+
DomainUUID: info.ID,
597+
Execution: *getRequest.GetExecution(),
598+
})
599+
if err != nil {
600+
return nil, wrapError(err)
601+
}
602+
token.nextEventID = visibilityResp.Execution.GetHistoryLength()
603+
token.runID = visibilityResp.Execution.GetExecution().GetRunId()
592604
}
593-
token.nextEventID = response.GetEventId()
594-
token.runID = response.GetRunId()
595605
}
596606

597607
we := gen.WorkflowExecution{

0 commit comments

Comments
 (0)