Skip to content

Commit 8da3ba4

Browse files
authored
Deprecate shard level task scheduler (#6691)
1 parent 1fe9376 commit 8da3ba4

File tree

11 files changed

+6
-238
lines changed

11 files changed

+6
-238
lines changed

common/dynamicconfig/constants.go

Lines changed: 4 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -919,23 +919,15 @@ const (
919919
// Default value: 200
920920
// Allowed filters: N/A
921921
TaskSchedulerWorkerCount
922-
// TaskSchedulerShardWorkerCount is the number of worker per shard in task scheduler
923-
// KeyName: history.taskSchedulerShardWorkerCount
924-
// Value type: Int
925-
// Default value: 0
926-
// Allowed filters: N/A
922+
// TaskSchedulerShardWorkerCount is deprecated
927923
TaskSchedulerShardWorkerCount
928924
// TaskSchedulerQueueSize is the size of task channel for host level task scheduler
929925
// KeyName: history.taskSchedulerQueueSize
930926
// Value type: Int
931927
// Default value: 10000
932928
// Allowed filters: N/A
933929
TaskSchedulerQueueSize
934-
// TaskSchedulerShardQueueSize is the size of task channel for shard level task scheduler
935-
// KeyName: history.taskSchedulerShardQueueSize
936-
// Value type: Int
937-
// Default value: 200
938-
// Allowed filters: N/A
930+
// TaskSchedulerShardQueueSize is deprecated
939931
TaskSchedulerShardQueueSize
940932
// TaskSchedulerDispatcherCount is the number of task dispatcher in task scheduler (only applies to host level task scheduler)
941933
// KeyName: history.taskSchedulerDispatcherCount
@@ -3377,7 +3369,7 @@ var IntKeys = map[IntKey]DynamicInt{
33773369
},
33783370
TaskSchedulerShardWorkerCount: {
33793371
KeyName: "history.taskSchedulerShardWorkerCount",
3380-
Description: "TaskSchedulerShardWorkerCount is the number of worker per shard in task scheduler",
3372+
Description: "Deprecated",
33813373
DefaultValue: 0,
33823374
},
33833375
TaskSchedulerQueueSize: {
@@ -3387,7 +3379,7 @@ var IntKeys = map[IntKey]DynamicInt{
33873379
},
33883380
TaskSchedulerShardQueueSize: {
33893381
KeyName: "history.taskSchedulerShardQueueSize",
3390-
Description: "TaskSchedulerShardQueueSize is the size of task channel for shard level task scheduler",
3382+
Description: "Deprecated",
33913383
DefaultValue: 200,
33923384
},
33933385
TaskSchedulerDispatcherCount: {

service/history/config/config.go

Lines changed: 0 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -91,9 +91,7 @@ type Config struct {
9191
TaskProcessRPS dynamicconfig.IntPropertyFnWithDomainFilter
9292
TaskSchedulerType dynamicconfig.IntPropertyFn
9393
TaskSchedulerWorkerCount dynamicconfig.IntPropertyFn
94-
TaskSchedulerShardWorkerCount dynamicconfig.IntPropertyFn
9594
TaskSchedulerQueueSize dynamicconfig.IntPropertyFn
96-
TaskSchedulerShardQueueSize dynamicconfig.IntPropertyFn
9795
TaskSchedulerDispatcherCount dynamicconfig.IntPropertyFn
9896
TaskSchedulerRoundRobinWeights dynamicconfig.MapPropertyFn
9997
TaskSchedulerGlobalDomainRPS dynamicconfig.IntPropertyFnWithDomainFilter
@@ -371,9 +369,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, i
371369
TaskProcessRPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.TaskProcessRPS),
372370
TaskSchedulerType: dc.GetIntProperty(dynamicconfig.TaskSchedulerType),
373371
TaskSchedulerWorkerCount: dc.GetIntProperty(dynamicconfig.TaskSchedulerWorkerCount),
374-
TaskSchedulerShardWorkerCount: dc.GetIntProperty(dynamicconfig.TaskSchedulerShardWorkerCount),
375372
TaskSchedulerQueueSize: dc.GetIntProperty(dynamicconfig.TaskSchedulerQueueSize),
376-
TaskSchedulerShardQueueSize: dc.GetIntProperty(dynamicconfig.TaskSchedulerShardQueueSize),
377373
TaskSchedulerDispatcherCount: dc.GetIntProperty(dynamicconfig.TaskSchedulerDispatcherCount),
378374
TaskSchedulerRoundRobinWeights: dc.GetMapProperty(dynamicconfig.TaskSchedulerRoundRobinWeights),
379375
TaskSchedulerGlobalDomainRPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.TaskSchedulerGlobalDomainRPS),

service/history/config/config_test.go

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -94,11 +94,9 @@ func TestNewConfig(t *testing.T) {
9494
"TaskProcessRPS": {dynamicconfig.TaskProcessRPS, 30},
9595
"TaskSchedulerType": {dynamicconfig.TaskSchedulerType, 31},
9696
"TaskSchedulerWorkerCount": {dynamicconfig.TaskSchedulerWorkerCount, 32},
97-
"TaskSchedulerShardWorkerCount": {dynamicconfig.TaskSchedulerShardWorkerCount, 33},
9897
"TaskSchedulerQueueSize": {dynamicconfig.TaskSchedulerQueueSize, 34},
9998
"TaskSchedulerDispatcherCount": {dynamicconfig.TaskSchedulerDispatcherCount, 35},
10099
"TaskSchedulerRoundRobinWeights": {dynamicconfig.TaskSchedulerRoundRobinWeights, map[string]interface{}{"key": 1}},
101-
"TaskSchedulerShardQueueSize": {dynamicconfig.TaskSchedulerShardQueueSize, 36},
102100
"TaskCriticalRetryCount": {dynamicconfig.TaskCriticalRetryCount, 37},
103101
"ActiveTaskRedispatchInterval": {dynamicconfig.ActiveTaskRedispatchInterval, time.Second},
104102
"StandbyTaskRedispatchInterval": {dynamicconfig.StandbyTaskRedispatchInterval, time.Second},

service/history/engine/engineimpl/history_engine.go

Lines changed: 0 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -376,10 +376,6 @@ func (e *historyEngineImpl) Stop() {
376376
replicationTaskProcessor.Stop()
377377
}
378378

379-
if e.queueTaskProcessor != nil {
380-
e.queueTaskProcessor.StopShardProcessor(e.shard)
381-
}
382-
383379
e.failoverMarkerNotifier.Stop()
384380

385381
// unset the failover callback

service/history/engine/testdata/engine_for_tests.go

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -129,7 +129,6 @@ func NewEngineForTest(t *testing.T, newEngineFn NewEngineFn) *EngineForTest {
129129
replicatonTaskFetchers.EXPECT().GetFetchers().Return([]replication.TaskFetcher{replicationTaskFetcher}).AnyTimes()
130130

131131
queueTaskProcessor := task.NewMockProcessor(controller)
132-
queueTaskProcessor.EXPECT().StopShardProcessor(gomock.Any()).Return().Times(1)
133132

134133
failoverCoordinator := failover.NewMockCoordinator(controller)
135134
wfIDCache := workflowcache.NewMockWFCache(controller)

service/history/task/interface.go

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -89,7 +89,6 @@ type (
8989
// Processor is the worker pool for processing Tasks
9090
Processor interface {
9191
common.Daemon
92-
StopShardProcessor(shard.Context)
9392
Submit(Task) error
9493
TrySubmit(Task) (bool, error)
9594
}

service/history/task/interface_mock.go

Lines changed: 0 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

service/history/task/processor.go

Lines changed: 1 addition & 125 deletions
Original file line number | Diff line number | Diff line change
@@ -34,19 +34,16 @@ import (
3434
"github.com/uber/cadence/common/metrics"
3535
"github.com/uber/cadence/common/task"
3636
"github.com/uber/cadence/service/history/config"
37-
"github.com/uber/cadence/service/history/shard"
3837
)
3938

4039
type processorImpl struct {
4140
sync.RWMutex
4241

4342
priorityAssigner PriorityAssigner
4443
hostScheduler task.Scheduler
45-
shardSchedulers map[shard.Context]task.Scheduler
4644

4745
status int32
4846
options *task.SchedulerOptions[int]
49-
shardOptions *task.SchedulerOptions[int]
5047
logger log.Logger
5148
metricsClient metrics.Client
5249
timeSource clock.TimeSource
@@ -96,29 +93,11 @@ func NewProcessor(
9693
}
9794
logger.Debug("Host level task scheduler is created", tag.Dynamic("scheduler_options", options.String()))
9895

99-
var shardOptions *task.SchedulerOptions[int]
100-
if config.TaskSchedulerShardWorkerCount() > 0 {
101-
shardOptions, err = task.NewSchedulerOptions[int](
102-
config.TaskSchedulerType(),
103-
config.TaskSchedulerShardQueueSize(),
104-
config.TaskSchedulerShardWorkerCount,
105-
1,
106-
taskToChannelKeyFn,
107-
channelKeyToWeightFn,
108-
)
109-
if err != nil {
110-
return nil, err
111-
}
112-
logger.Debug("Shard level task scheduler is enabled", tag.Dynamic("scheduler_options", shardOptions.String()))
113-
}
114-
11596
return &processorImpl{
11697
priorityAssigner: priorityAssigner,
11798
hostScheduler: hostScheduler,
118-
shardSchedulers: make(map[shard.Context]task.Scheduler),
11999
status: common.DaemonStatusInitialized,
120100
options: options,
121-
shardOptions: shardOptions,
122101
logger: logger,
123102
metricsClient: metricsClient,
124103
timeSource: timeSource,
@@ -142,56 +121,13 @@ func (p *processorImpl) Stop() {
142121

143122
p.hostScheduler.Stop()
144123

145-
p.Lock()
146-
defer p.Unlock()
147-
148-
for shard, scheduler := range p.shardSchedulers {
149-
delete(p.shardSchedulers, shard)
150-
scheduler.Stop()
151-
}
152-
153124
p.logger.Info("Queue task processor stopped.")
154125
}
155126

156-
func (p *processorImpl) StopShardProcessor(shard shard.Context) {
157-
p.Lock()
158-
scheduler, ok := p.shardSchedulers[shard]
159-
if !ok {
160-
p.Unlock()
161-
return
162-
}
163-
164-
delete(p.shardSchedulers, shard)
165-
p.Unlock()
166-
167-
// don't hold the lock while stopping the scheduler
168-
scheduler.Stop()
169-
}
170-
171127
func (p *processorImpl) Submit(task Task) error {
172128
if err := p.priorityAssigner.Assign(task); err != nil {
173129
return err
174130
}
175-
176-
submitted, err := p.hostScheduler.TrySubmit(task)
177-
if err != nil {
178-
return err
179-
}
180-
181-
if submitted {
182-
return nil
183-
}
184-
185-
shardScheduler, err := p.getOrCreateShardTaskScheduler(task.GetShard())
186-
if err != nil {
187-
return err
188-
}
189-
190-
if shardScheduler != nil {
191-
return shardScheduler.Submit(task)
192-
}
193-
194-
// if shard level scheduler is disabled
195131
return p.hostScheduler.Submit(task)
196132
}
197133

@@ -200,67 +136,7 @@ func (p *processorImpl) TrySubmit(task Task) (bool, error) {
200136
return false, err
201137
}
202138

203-
submitted, err := p.hostScheduler.TrySubmit(task)
204-
if err != nil {
205-
return false, err
206-
}
207-
208-
if submitted {
209-
return true, nil
210-
}
211-
212-
shardScheduler, err := p.getOrCreateShardTaskScheduler(task.GetShard())
213-
if err != nil {
214-
return false, err
215-
}
216-
217-
if shardScheduler != nil {
218-
return shardScheduler.TrySubmit(task)
219-
}
220-
221-
// if shard level scheduler is disabled
222-
return false, nil
223-
}
224-
225-
func (p *processorImpl) getOrCreateShardTaskScheduler(shard shard.Context) (task.Scheduler, error) {
226-
if p.shardOptions == nil {
227-
return nil, nil
228-
}
229-
230-
p.RLock()
231-
if scheduler, ok := p.shardSchedulers[shard]; ok {
232-
p.RUnlock()
233-
return scheduler, nil
234-
}
235-
p.RUnlock()
236-
237-
p.Lock()
238-
if scheduler, ok := p.shardSchedulers[shard]; ok {
239-
p.Unlock()
240-
return scheduler, nil
241-
}
242-
243-
if !p.isRunning() {
244-
p.Unlock()
245-
return nil, errTaskProcessorNotRunning
246-
}
247-
248-
scheduler, err := createTaskScheduler(p.shardOptions, p.logger.WithTags(tag.ShardID(shard.GetShardID())), p.metricsClient, p.timeSource)
249-
if err != nil {
250-
p.Unlock()
251-
return nil, err
252-
}
253-
254-
p.shardSchedulers[shard] = scheduler
255-
p.Unlock()
256-
257-
// don't hold the lock while starting the scheduler
258-
scheduler.Start()
259-
p.logger.Debug("Shard level task scheduler is started",
260-
tag.ShardID(shard.GetShardID()),
261-
tag.Dynamic("scheduler_options", p.shardOptions.String()),
262-
)
263-
return scheduler, nil
139+
return p.hostScheduler.TrySubmit(task)
264140
}
265141

266142
func (p *processorImpl) isRunning() bool {

service/history/task/processor_test.go

Lines changed: 1 addition & 66 deletions
Original file line number | Diff line number | Diff line change
@@ -99,88 +99,24 @@ func (s *queueTaskProcessorSuite) TestIsRunning() {
9999
s.False(s.processor.isRunning())
100100
}
101101

102-
func (s *queueTaskProcessorSuite) TestGetOrCreateShardTaskScheduler_ProcessorNotRunning() {
103-
scheduler, err := s.processor.getOrCreateShardTaskScheduler(s.mockShard)
104-
s.Equal(errTaskProcessorNotRunning, err)
105-
s.Nil(scheduler)
106-
}
107-
108-
func (s *queueTaskProcessorSuite) TestGetOrCreateShardTaskScheduler_ShardProcessorAlreadyExists() {
109-
mockScheduler := task.NewMockScheduler(s.controller)
110-
mockScheduler.EXPECT().Stop().Times(1)
111-
s.processor.shardSchedulers[s.mockShard] = mockScheduler
112-
113-
s.processor.Start()
114-
defer s.processor.Stop()
115-
scheduler, err := s.processor.getOrCreateShardTaskScheduler(s.mockShard)
116-
s.NoError(err)
117-
s.Equal(mockScheduler, scheduler)
118-
}
119-
120-
func (s *queueTaskProcessorSuite) TestGetOrCreateShardTaskScheduler_ShardProcessorNotExist() {
121-
s.Empty(s.processor.shardSchedulers)
122-
123-
s.processor.Start()
124-
defer s.processor.Stop()
125-
scheduler, err := s.processor.getOrCreateShardTaskScheduler(s.mockShard)
126-
s.NoError(err)
127-
128-
s.Len(s.processor.shardSchedulers, 1)
129-
scheduler.Stop()
130-
}
131-
132-
func (s *queueTaskProcessorSuite) TestStopShardProcessor() {
133-
s.Empty(s.processor.shardSchedulers)
134-
s.processor.StopShardProcessor(s.mockShard)
135-
136-
mockScheduler := task.NewMockScheduler(s.controller)
137-
mockScheduler.EXPECT().Stop().Times(1)
138-
s.processor.shardSchedulers[s.mockShard] = mockScheduler
139-
140-
s.processor.StopShardProcessor(s.mockShard)
141-
s.Empty(s.processor.shardSchedulers)
142-
}
143-
144102
func (s *queueTaskProcessorSuite) TestStartStop() {
145103
mockScheduler := task.NewMockScheduler(s.controller)
146104
mockScheduler.EXPECT().Start().Times(1)
147105
mockScheduler.EXPECT().Stop().Times(1)
148106
s.processor.hostScheduler = mockScheduler
149107

150-
for i := 0; i != 10; i++ {
151-
mockShard := shard.NewTestContext(
152-
s.T(),
153-
s.controller,
154-
&persistence.ShardInfo{
155-
ShardID: 10,
156-
RangeID: 1,
157-
},
158-
config.NewForTest(),
159-
)
160-
mockShardScheduler := task.NewMockScheduler(s.controller)
161-
mockShardScheduler.EXPECT().Stop().Times(1)
162-
s.processor.shardSchedulers[mockShard] = mockShardScheduler
163-
}
164-
165108
s.processor.Start()
166109
s.processor.Stop()
167-
168-
s.Empty(s.processor.shardSchedulers)
169110
}
170111

171112
func (s *queueTaskProcessorSuite) TestSubmit() {
172113
mockTask := NewMockTask(s.controller)
173-
mockTask.EXPECT().GetShard().Return(s.mockShard).Times(1)
174114
s.mockPriorityAssigner.EXPECT().Assign(NewMockTaskMatcher(mockTask)).Return(nil).Times(1)
175115

176116
mockScheduler := task.NewMockScheduler(s.controller)
177-
mockScheduler.EXPECT().TrySubmit(NewMockTaskMatcher(mockTask)).Return(false, nil).Times(1)
178-
179-
mockShardScheduler := task.NewMockScheduler(s.controller)
180-
mockShardScheduler.EXPECT().Submit(NewMockTaskMatcher(mockTask)).Return(nil).Times(1)
117+
mockScheduler.EXPECT().Submit(NewMockTaskMatcher(mockTask)).Return(nil).Times(1)
181118

182119
s.processor.hostScheduler = mockScheduler
183-
s.processor.shardSchedulers[s.mockShard] = mockShardScheduler
184120

185121
err := s.processor.Submit(mockTask)
186122
s.NoError(err)
@@ -220,7 +156,6 @@ func (s *queueTaskProcessorSuite) TestNewSchedulerOptions_UnknownSchedulerType()
220156

221157
func (s *queueTaskProcessorSuite) newTestQueueTaskProcessor() *processorImpl {
222158
config := config.NewForTest()
223-
config.TaskSchedulerShardWorkerCount = dynamicconfig.GetIntPropertyFn(1)
224159
processor, err := NewProcessor(
225160
s.mockPriorityAssigner,
226161
config,

0 commit comments

Comments
 (0)