Skip to content

Commit 1490221

Browse files
authored
Support client-side specification of consistency for DescribeWorkflowExecution and GetWorkflowExecutionHistory (#6789)
* feat(frontend): Support client requested consistency levels Some clients require strong consistency for reads on the DescribeWorkflowExecution and GetWorkflowExecutionHistory rpcs, and are willing to accept the latency trade-off. This adds internal support for endpoints to define a consistency level. * Add tests and remove QueryWorkflowStrongConsistency from configuration * make pr
1 parent 281693e commit 1490221

File tree

7 files changed

+533
-316
lines changed

7 files changed

+533
-316
lines changed

common/types/shared.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1808,8 +1808,9 @@ func (v *DescribeTaskListResponse) GetTaskListStatus() (o *TaskListStatus) {
18081808

18091809
// DescribeWorkflowExecutionRequest is an internal type (TBD...)
18101810
type DescribeWorkflowExecutionRequest struct {
1811-
Domain string `json:"domain,omitempty"`
1812-
Execution *WorkflowExecution `json:"execution,omitempty"`
1811+
Domain string `json:"domain,omitempty"`
1812+
Execution *WorkflowExecution `json:"execution,omitempty"`
1813+
QueryConsistencyLevel *QueryConsistencyLevel `json:"queryConsistencyLevel,omitempty"`
18131814
}
18141815

18151816
// GetDomain is an internal getter (TBD...)
@@ -1828,6 +1829,14 @@ func (v *DescribeWorkflowExecutionRequest) GetExecution() (o *WorkflowExecution)
18281829
return
18291830
}
18301831

1832+
// GetQueryConsistencyLevel is an internal getter (TBD...)
1833+
func (v *DescribeWorkflowExecutionRequest) GetQueryConsistencyLevel() (o QueryConsistencyLevel) {
1834+
if v != nil && v.QueryConsistencyLevel != nil {
1835+
return *v.QueryConsistencyLevel
1836+
}
1837+
return
1838+
}
1839+
18311840
// DescribeWorkflowExecutionResponse is an internal type (TBD...)
18321841
type DescribeWorkflowExecutionResponse struct {
18331842
ExecutionConfiguration *WorkflowExecutionConfiguration `json:"executionConfiguration,omitempty"`
@@ -2599,6 +2608,7 @@ type GetWorkflowExecutionHistoryRequest struct {
25992608
WaitForNewEvent bool `json:"waitForNewEvent,omitempty"`
26002609
HistoryEventFilterType *HistoryEventFilterType `json:"HistoryEventFilterType,omitempty"`
26012610
SkipArchival bool `json:"skipArchival,omitempty"`
2611+
QueryConsistencyLevel *QueryConsistencyLevel `json:"queryConsistencyLevel,omitempty"`
26022612
}
26032613

26042614
// GetDomain is an internal getter (TBD...)
@@ -2657,6 +2667,14 @@ func (v *GetWorkflowExecutionHistoryRequest) GetSkipArchival() (o bool) {
26572667
return
26582668
}
26592669

2670+
// GetQueryConsistencyLevel is an internal getter (TBD...)
2671+
func (v *GetWorkflowExecutionHistoryRequest) GetQueryConsistencyLevel() (o QueryConsistencyLevel) {
2672+
if v != nil && v.QueryConsistencyLevel != nil {
2673+
return *v.QueryConsistencyLevel
2674+
}
2675+
return
2676+
}
2677+
26602678
// GetWorkflowExecutionHistoryResponse is an internal type (TBD...)
26612679
type GetWorkflowExecutionHistoryResponse struct {
26622680
History *History `json:"history,omitempty"`

service/frontend/templates/clusterredirection.tmpl

Lines changed: 18 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@ import (
1212
frontendcfg "github.com/uber/cadence/service/frontend/config"
1313
)
1414

15-
{{$nonFowradingAPIs := list "Health" "DeprecateDomain" "DescribeDomain" "ListDomains" "RegisterDomain" "UpdateDomain" "GetSearchAttributes" "GetClusterInfo" "DiagnoseWorkflowExecution"}}
15+
{{$nonForwardingAPIs := list "Health" "DeprecateDomain" "DescribeDomain" "ListDomains" "RegisterDomain" "UpdateDomain" "GetSearchAttributes" "GetClusterInfo" "DiagnoseWorkflowExecution"}}
1616
{{$domainIDAPIs := list "RecordActivityTaskHeartbeat" "RespondActivityTaskCanceled" "RespondActivityTaskCompleted" "RespondActivityTaskFailed" "RespondDecisionTaskCompleted" "RespondDecisionTaskFailed" "RespondQueryTaskCompleted"}}
1717
{{$queryTaskTokenAPIs := list "RespondQueryTaskCompleted"}}
18-
{{$specialCaseAPIs := list "QueryWorkflow"}}
18+
{{$readAPIsWithStrongConsistency := list "QueryWorkflow" "DescribeWorkflowExecution" "GetWorkflowExecutionHistory"}}
1919

2020
type (
2121
// ClusterRedirectionHandlerImpl is simple wrapper over frontend service, doing redirection based on policy for global domains not being active in current cluster
@@ -55,13 +55,24 @@ func NewAPIHandler(
5555
}
5656

5757
{{range $method := .Interface.Methods}}
58-
{{- if not (has $method.Name $specialCaseAPIs)}}
5958
func (handler *clusterRedirectionHandler) {{$method.Declaration}} {
60-
{{- if has $method.Name $nonFowradingAPIs}}
59+
{{- if has $method.Name $nonForwardingAPIs}}
6160
return handler.frontendHandler.{{$method.Call}}
6261
{{- else}}
63-
var apiName = "{{$method.Name}}"
64-
var cluster string
62+
var (
63+
apiName = "{{$method.Name}}"
64+
cluster string
65+
requestedConsistencyLevel types.QueryConsistencyLevel = types.QueryConsistencyLevelEventual
66+
)
67+
68+
{{- if has $method.Name $readAPIsWithStrongConsistency}}
69+
// Only autoforward strong consistent queries, this is done for two reasons:
70+
// 1. Query is meant to be fast, autoforwarding all queries will increase latency.
71+
// 2. If eventual consistency was requested then the results from running out of local dc will be fine.
72+
if {{(index $method.Params 1).Name}}.GetQueryConsistencyLevel() == types.QueryConsistencyLevelStrong {
73+
requestedConsistencyLevel = types.QueryConsistencyLevelStrong
74+
}
75+
{{- end}}
6576

6677
{{$policyMethod := "WithDomainNameRedirect"}}
6778
{{$domain := printf "%s.GetDomain()" (index $method.Params 1).Name}}
@@ -94,7 +105,7 @@ func (handler *clusterRedirectionHandler) {{$method.Declaration}} {
94105
}
95106
{{- end}}
96107

97-
err = handler.redirectionPolicy.{{$policyMethod}}(ctx, {{$domain}}, apiName, func(targetDC string) error {
108+
err = handler.redirectionPolicy.{{$policyMethod}}(ctx, {{$domain}}, apiName, requestedConsistencyLevel, func(targetDC string) error {
98109
cluster = targetDC
99110
switch {
100111
case targetDC == handler.currentClusterName:
@@ -110,38 +121,3 @@ func (handler *clusterRedirectionHandler) {{$method.Declaration}} {
110121
{{- end}}
111122
}
112123
{{end}}
113-
{{end}}
114-
115-
func (handler *clusterRedirectionHandler) QueryWorkflow(
116-
ctx context.Context,
117-
request *types.QueryWorkflowRequest,
118-
) (resp *types.QueryWorkflowResponse, retError error) {
119-
var apiName = "QueryWorkflow"
120-
var err error
121-
var cluster string
122-
123-
// Only autoforward strong consistent queries, this is done for two reasons:
124-
// 1. Query is meant to be fast, autoforwarding all queries will increase latency.
125-
// 2. If eventual consistency was requested then the results from running out of local dc will be fine.
126-
if request.GetQueryConsistencyLevel() == types.QueryConsistencyLevelStrong {
127-
apiName = "QueryWorkflowStrongConsistency"
128-
}
129-
scope, startTime := handler.beforeCall(metrics.DCRedirectionQueryWorkflowScope)
130-
defer func() {
131-
handler.afterCall(recover(), scope, startTime, request.GetDomain(), "", cluster, &retError)
132-
}()
133-
134-
err = handler.redirectionPolicy.WithDomainNameRedirect(ctx, request.GetDomain(), apiName, func(targetDC string) error {
135-
cluster = targetDC
136-
switch {
137-
case targetDC == handler.currentClusterName:
138-
resp, err = handler.frontendHandler.QueryWorkflow(ctx, request)
139-
default:
140-
remoteClient := handler.GetRemoteFrontendClient(targetDC)
141-
resp, err = remoteClient.QueryWorkflow(ctx, request, handler.callOptions...)
142-
}
143-
return err
144-
})
145-
146-
return resp, err
147-
}

0 commit comments

Comments
 (0)