Skip to content

Commit dbfed08

Browse files
Domain Deprecation: Implement workflow listing and termination activities (#6819)
* GitButler Workspace Commit This is a merge commit the virtual branches in your workspace. Due to GitButler managing multiple virtual branches, you cannot switch back and forth between git branches and virtual branches easily. If you switch to another branch, GitButler will need to be reinitialized. If you commit on this branch, GitButler will throw it away. Here are the branches that are currently applied: - domain-deprecation/list_terminate_activities (refs/gitbutler/domain-deprecation/list_terminate_activities) branch head: 4bfa8c5 - service/worker/domaindeprecation/workflow.go - config/dynamicconfig/development_es.yaml - service/worker/domaindeprecation/module.go - service/worker/domaindeprecation/activities.go - docker/docker-compose-es.yml - docker/docker-compose-es-v7.yml For more information about what we're doing here, check out our docs: https://docs.gitbutler.com/features/virtual-branches/integration-branch * [domain-deletion]Add activities to list and terminate workflows * [domain-deletion]Add activities to list and terminate workflows * Replace the entityNotExistsError with cadence customer error * Add rate limiter * Fix workflow tests * Add listAndTerminate activity * Update comments * Run batch terminate workflow as child workflow * Update child workflow ID --------- Co-authored-by: GitButler <[email protected]>
1 parent 5f3ce46 commit dbfed08

File tree

12 files changed

+205
-60
lines changed

12 files changed

+205
-60
lines changed

service/worker/batcher/batcher.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func New(params *BootstrapParams) *Batcher {
8989
// Start starts the scanner
9090
func (s *Batcher) Start() error {
9191
// start worker for batch operation workflows
92-
ctx := context.WithValue(context.Background(), batcherContextKey, s)
92+
ctx := context.WithValue(context.Background(), BatcherContextKey, s)
9393
workerOpts := worker.Options{
9494
MetricsScope: s.tallyScope,
9595
BackgroundActivityContext: ctx,

service/worker/batcher/workflow.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ type (
4545
)
4646

4747
const (
48-
batcherContextKey contextKey = "batcherContext"
48+
BatcherContextKey contextKey = "batcherContext"
4949
// BatcherTaskListName is the tasklist name
5050
BatcherTaskListName = "cadence-sys-batcher-tasklist"
5151
// BatchWFTypeName is the workflow type
@@ -183,7 +183,7 @@ func setDefaultParams(params BatchParams) BatchParams {
183183

184184
// BatchActivity is activity for processing batch operation
185185
func BatchActivity(ctx context.Context, batchParams BatchParams) (HeartBeatDetails, error) {
186-
batcher := ctx.Value(batcherContextKey).(*Batcher)
186+
batcher := ctx.Value(BatcherContextKey).(*Batcher)
187187
client := batcher.clientBean.GetFrontendClient()
188188
var adminClient admin.Client
189189
if batchParams.BatchType == BatchTypeReplicate {
@@ -284,7 +284,7 @@ func BatchActivity(ctx context.Context, batchParams BatchParams) (HeartBeatDetai
284284
func getHeartBeatDetails(ctx context.Context) (hbd HeartBeatDetails, ok bool) {
285285
if activity.HasHeartbeatDetails(ctx) {
286286
if err := activity.GetHeartbeatDetails(ctx, &hbd); err != nil {
287-
batcher := ctx.Value(batcherContextKey).(*Batcher)
287+
batcher := ctx.Value(BatcherContextKey).(*Batcher)
288288
batcher.metricsClient.IncCounter(metrics.BatcherScope, metrics.BatcherProcessorFailures)
289289
getActivityLogger(ctx).Error("Failed to recover from last heartbeat, start over from beginning", tag.Error(err))
290290
return HeartBeatDetails{}, false
@@ -305,7 +305,7 @@ func startTaskProcessor(
305305
client frontend.Client,
306306
adminClient admin.Client,
307307
) {
308-
batcher := ctx.Value(batcherContextKey).(*Batcher)
308+
batcher := ctx.Value(BatcherContextKey).(*Batcher)
309309
for {
310310
select {
311311
case <-ctx.Done():
@@ -461,7 +461,7 @@ func isDone(ctx context.Context) bool {
461461
}
462462

463463
func getActivityLogger(ctx context.Context) log.Logger {
464-
batcher := ctx.Value(batcherContextKey).(*Batcher)
464+
batcher := ctx.Value(BatcherContextKey).(*Batcher)
465465
wfInfo := activity.GetInfo(ctx)
466466
return batcher.logger.WithTags(
467467
tag.WorkflowID(wfInfo.WorkflowExecution.ID),

service/worker/batcher/workflow_retry_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func (s *workflowRetrySuite) SetupTest() {
7272
mockResource.FrontendClient.EXPECT().RequestCancelWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
7373
mockResource.FrontendClient.EXPECT().DescribeWorkflowExecution(gomock.Any(), gomock.Any()).Return(&types.DescribeWorkflowExecutionResponse{}, nil).AnyTimes()
7474

75-
ctx := context.WithValue(context.Background(), batcherContextKey, batcher)
75+
ctx := context.WithValue(context.Background(), BatcherContextKey, batcher)
7676
workerOpts := worker.Options{
7777
MetricsScope: tally.TestScope(nil),
7878
BackgroundActivityContext: ctx,

service/worker/batcher/workflow_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func (s *workflowSuite) SetupTest() {
7777

7878
mockResource.RemoteAdminClient.EXPECT().ResendReplicationTasks(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
7979

80-
ctx := context.WithValue(context.Background(), batcherContextKey, batcher)
80+
ctx := context.WithValue(context.Background(), BatcherContextKey, batcher)
8181
workerOpts := worker.Options{
8282
MetricsScope: tally.TestScope(nil),
8383
BackgroundActivityContext: ctx,

service/worker/domaindeprecation/activities.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,38 +25,38 @@ import (
2525
"errors"
2626
"fmt"
2727

28+
"go.uber.org/cadence"
29+
2830
"github.com/uber/cadence/common/log/tag"
2931
"github.com/uber/cadence/common/types"
3032
)
3133

3234
// DisableArchivalActivity disables archival for the domain
33-
func (w *domainDeprecator) DisableArchivalActivity(ctx context.Context, domainName string) error {
35+
func (w *domainDeprecator) DisableArchivalActivity(ctx context.Context, params DomainActivityParams) error {
3436
client := w.clientBean.GetFrontendClient()
3537
disabled := types.ArchivalStatusDisabled
3638

3739
describeRequest := &types.DescribeDomainRequest{
38-
Name: &domainName,
40+
Name: &params.DomainName,
3941
}
4042
domainResp, err := client.DescribeDomain(ctx, describeRequest)
4143
if err != nil {
4244
var entityNotExistsError *types.EntityNotExistsError
4345
if errors.As(err, &entityNotExistsError) {
44-
return types.EntityNotExistsError{Message: errDomainDoesNotExistNonRetryable}
46+
return cadence.NewCustomError(ErrDomainDoesNotExistNonRetryable)
4547
}
46-
4748
return fmt.Errorf("failed to describe domain: %v", err)
48-
4949
}
5050

5151
// Check if archival is already disabled
5252
if *domainResp.Configuration.VisibilityArchivalStatus == disabled &&
5353
*domainResp.Configuration.HistoryArchivalStatus == disabled {
54-
w.logger.Info("Archival is already disabled for domain", tag.WorkflowDomainName(domainName))
54+
w.logger.Info("Archival is already disabled for domain", tag.WorkflowDomainName(params.DomainName))
5555
return nil
5656
}
5757

5858
updateRequest := &types.UpdateDomainRequest{
59-
Name: domainName,
59+
Name: params.DomainName,
6060
HistoryArchivalStatus: &disabled,
6161
VisibilityArchivalStatus: &disabled,
6262
SecurityToken: w.cfg.AdminOperationToken(),
@@ -68,19 +68,19 @@ func (w *domainDeprecator) DisableArchivalActivity(ctx context.Context, domainNa
6868

6969
if *updateResp.Configuration.VisibilityArchivalStatus != disabled ||
7070
*updateResp.Configuration.HistoryArchivalStatus != disabled {
71-
return fmt.Errorf("failed to disable archival for domain %s", domainName)
71+
return fmt.Errorf("failed to disable archival for domain %s", params.DomainName)
7272
}
7373

74-
w.logger.Info("Disabled archival for domain", tag.WorkflowDomainName(domainName))
74+
w.logger.Info("Disabled archival for domain", tag.WorkflowDomainName(params.DomainName))
7575
return nil
7676
}
7777

7878
// DeprecateDomainActivity deprecates the domain
79-
func (w *domainDeprecator) DeprecateDomainActivity(ctx context.Context, domainName string) error {
79+
func (w *domainDeprecator) DeprecateDomainActivity(ctx context.Context, params DomainActivityParams) error {
8080
client := w.clientBean.GetFrontendClient()
8181

8282
err := client.DeprecateDomain(ctx, &types.DeprecateDomainRequest{
83-
Name: domainName,
83+
Name: params.DomainName,
8484
SecurityToken: w.cfg.AdminOperationToken(),
8585
})
8686
if err != nil {

service/worker/domaindeprecation/activities_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,9 @@ func TestDisableArchivalActivity(t *testing.T) {
129129
for _, tt := range tests {
130130
t.Run(tt.name, func(t *testing.T) {
131131
tt.setupMocks()
132-
err := deprecator.DisableArchivalActivity(context.Background(), testDomain)
132+
err := deprecator.DisableArchivalActivity(context.Background(), DomainActivityParams{
133+
DomainName: testDomain,
134+
})
133135
if tt.expectedError != nil {
134136
assert.Error(t, err)
135137
} else {
@@ -163,14 +165,14 @@ func TestDeprecateDomainActivity(t *testing.T) {
163165
expectedError error
164166
}{
165167
{
166-
name: "Success - Deprecate domain",
168+
name: "Success",
167169
setupMocks: func() {
168170
mockClient.EXPECT().DeprecateDomain(gomock.Any(), gomock.Any()).Return(nil)
169171
},
170172
expectedError: nil,
171173
},
172174
{
173-
name: "Error - Deprecate domain",
175+
name: "Error",
174176
setupMocks: func() {
175177
mockClient.EXPECT().DeprecateDomain(gomock.Any(), gomock.Any()).Return(assert.AnError)
176178
},
@@ -181,7 +183,9 @@ func TestDeprecateDomainActivity(t *testing.T) {
181183
for _, tt := range tests {
182184
t.Run(tt.name, func(t *testing.T) {
183185
tt.setupMocks()
184-
err := deprecator.DeprecateDomainActivity(context.Background(), testDomain)
186+
err := deprecator.DeprecateDomainActivity(context.Background(), DomainActivityParams{
187+
DomainName: testDomain,
188+
})
185189
if tt.expectedError != nil {
186190
assert.Error(t, err)
187191
} else {
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// The MIT License (MIT)
2+
3+
// Copyright (c) 2017-2020 Uber Technologies Inc.
4+
5+
// Permission is hereby granted, free of charge, to any person obtaining a copy
6+
// of this software and associated documentation files (the "Software"), to deal
7+
// in the Software without restriction, including without limitation the rights
8+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
// copies of the Software, and to permit persons to whom the Software is
10+
// furnished to do so, subject to the following conditions:
11+
//
12+
// The above copyright notice and this permission notice shall be included in all
13+
// copies or substantial portions of the Software.
14+
//
15+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
// SOFTWARE.
22+
23+
package domaindeprecation
24+
25+
// DomainActivityParams contains the domain name parameter used by domain-related activities.
26+
type DomainActivityParams struct {
27+
DomainName string `json:"domain_name"`
28+
}

service/worker/domaindeprecation/helpers.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,26 @@
2222

2323
package domaindeprecation
2424

25-
// Error reason strings used for Cadence non-retryable errors
25+
import "time"
26+
2627
const (
27-
errDomainDoesNotExistNonRetryable = "domain does not exist"
28+
// ErrDomainDoesNotExistNonRetryable is error reason used for Cadence non-retryable errors
29+
ErrDomainDoesNotExistNonRetryable = "domain does not exist"
30+
// ErrAccessDeniedNonRetryable is permission error that require manual intervention
31+
ErrAccessDeniedNonRetryable = "AccessDeniedError"
32+
// ErrWorkflowAlreadyCompletedNonRetryable is error that indicates workflow execution already completed
33+
ErrWorkflowAlreadyCompletedNonRetryable = "WorkflowExecutionAlreadyCompletedError"
34+
35+
// DefaultRPS is the default RPS
36+
DefaultRPS = 50
37+
// DefaultConcurrency is the default concurrency
38+
DefaultConcurrency = 5
39+
// DefaultPageSize is the default page size
40+
DefaultPageSize = 1000
41+
// DefaultAttemptsOnRetryableError is the default value for AttemptsOnRetryableError
42+
DefaultAttemptsOnRetryableError = 50
43+
// DefaultActivityHeartBeatTimeout is the default value for ActivityHeartBeatTimeout
44+
DefaultActivityHeartBeatTimeout = time.Second * 10
45+
// DefaultMaxActivityRetries is the default value for MaxActivityRetries
46+
DefaultMaxActivityRetries = 4
2847
)

service/worker/domaindeprecation/module.go

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ import (
3636
"github.com/uber/cadence/common/constants"
3737
"github.com/uber/cadence/common/dynamicconfig/dynamicproperties"
3838
"github.com/uber/cadence/common/log"
39+
"github.com/uber/cadence/common/metrics"
40+
"github.com/uber/cadence/service/worker/batcher"
3941
)
4042

4143
type (
@@ -51,18 +53,20 @@ type (
5153
}
5254

5355
domainDeprecator struct {
54-
cfg Config
55-
svcClient workflowserviceclient.Interface
56-
clientBean client.Bean
57-
worker worker.Worker
58-
tally tally.Scope
59-
logger log.Logger
56+
cfg Config
57+
svcClient workflowserviceclient.Interface
58+
clientBean client.Bean
59+
metricsClient metrics.Client
60+
worker worker.Worker
61+
tally tally.Scope
62+
logger log.Logger
6063
}
6164

6265
Params struct {
6366
Config Config
6467
ServiceClient workflowserviceclient.Interface
6568
ClientBean client.Bean
69+
MetricsClient metrics.Client
6670
Tally tally.Scope
6771
Logger log.Logger
6872
}
@@ -71,27 +75,42 @@ type (
7175
// New creates a new domain deprecation workflow.
7276
func New(params Params) DomainDeprecationWorker {
7377
return &domainDeprecator{
74-
cfg: params.Config,
75-
svcClient: params.ServiceClient,
76-
clientBean: params.ClientBean,
77-
tally: params.Tally,
78-
logger: params.Logger,
78+
cfg: params.Config,
79+
svcClient: params.ServiceClient,
80+
clientBean: params.ClientBean,
81+
metricsClient: params.MetricsClient,
82+
tally: params.Tally,
83+
logger: params.Logger,
7984
}
8085
}
8186

8287
// Start starts the worker
8388
func (w *domainDeprecator) Start() error {
89+
batcherParams := &batcher.BootstrapParams{
90+
Config: batcher.Config{
91+
AdminOperationToken: w.cfg.AdminOperationToken,
92+
},
93+
ServiceClient: w.svcClient,
94+
ClientBean: w.clientBean,
95+
MetricsClient: w.metricsClient,
96+
Logger: w.logger,
97+
TallyScope: w.tally,
98+
}
99+
batcherInstance := batcher.New(batcherParams)
100+
101+
ctx := context.WithValue(context.Background(), batcher.BatcherContextKey, batcherInstance)
102+
84103
workerOpts := worker.Options{
85104
MetricsScope: w.tally,
86-
BackgroundActivityContext: context.Background(),
105+
BackgroundActivityContext: ctx,
87106
Tracer: opentracing.GlobalTracer(),
88107
MaxConcurrentActivityTaskPollers: 10,
89108
MaxConcurrentDecisionTaskPollers: 10,
90109
}
91110
newWorker := worker.New(w.svcClient, constants.SystemLocalDomainName, domainDeprecationTaskListName, workerOpts)
92111
newWorker.RegisterWorkflowWithOptions(w.DomainDeprecationWorkflow, workflow.RegisterOptions{Name: domainDeprecationWorkflowTypeName})
93-
newWorker.RegisterActivityWithOptions(w.DisableArchivalActivity, activity.RegisterOptions{Name: disableArchivalActivity})
94-
newWorker.RegisterActivityWithOptions(w.DeprecateDomainActivity, activity.RegisterOptions{Name: deprecateDomainActivity})
112+
newWorker.RegisterActivityWithOptions(w.DisableArchivalActivity, activity.RegisterOptions{Name: disableArchivalActivity, EnableAutoHeartbeat: true})
113+
newWorker.RegisterActivityWithOptions(w.DeprecateDomainActivity, activity.RegisterOptions{Name: deprecateDomainActivity, EnableAutoHeartbeat: true})
95114
w.worker = newWorker
96115
return newWorker.Start()
97116
}

0 commit comments

Comments
 (0)