
Commit 18819e3

Optimize DynamicRateLimiter to not constantly re-evaluate RPS
While dynamic config lookups are relatively cheap, they're certainly not free, and they perform several allocations that further contribute to GC times. Making matters worse, quotas.RateLimiter has some strange TTL logic such that the result of evaluating the dynamic config value isn't used more than once a minute unless it's lower than the current value.

Delete quotas.RateLimiter in favor of clock.Ratelimiter and move the TTL handling into DynamicRateLimiter. Reduce the TTL to a more reasonable value and only evaluate the RPS function once that time has elapsed. Remove the logic that allowed a rate change to bypass the TTL when it was lower than the current rate; that behavior required evaluating the RPS value on every call. Instead, the shortened TTL means we reliably pick up changes within a few seconds regardless of the direction of the change.

The main place the TTL logic appears to matter is the task dispatch limiter within Matching. Each poller includes an RPS limit, and we previously attempted to update the RPS on every request; this is the only place that explicitly provided a TTL to quotas.RateLimiter (60s) rather than relying on the default. Change the Matching rate limiter to use DynamicRateLimiter so that it, too, only updates according to its TTL. This is a change in behavior and makes Matching less responsive to limits specified in user requests, but it still complies with the advertised "take most recent value" behavior.
1 parent 3c2837b commit 18819e3
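For illustration, here is a minimal usage sketch of the reworked constructor. It is not part of this commit's diff: rpsFromDynamicConfig is a hypothetical stand-in for a dynamic config lookup, and the import path simply mirrors the one used in the diff below.

package main

import (
	"time"

	"github.com/uber/cadence/common/quotas"
)

// rpsFromDynamicConfig is a placeholder for a real dynamic config lookup.
func rpsFromDynamicConfig() float64 { return 100 }

func main() {
	// The RPS function is evaluated at construction time and then at most once
	// per TTL, instead of on every Allow/Wait/Reserve call.
	limiter := quotas.NewDynamicRateLimiter(
		rpsFromDynamicConfig,
		quotas.WithTTL(5*time.Second), // optional; this matches the new default
	)
	if limiter.Allow() {
		// proceed with the rate-limited work
	}
}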

File tree

19 files changed: 367 additions & 240 deletions


common/clock/ratelimiter.go

Lines changed: 10 additions & 0 deletions
@@ -70,6 +70,8 @@ type (
 		SetBurst(newBurst int)
 		// SetLimit sets the Limit value
 		SetLimit(newLimit rate.Limit)
+
+		SetLimitAndBurst(newLimit rate.Limit, newBurst int)
 		// Tokens returns the number of immediately-allowable events when called.
 		// Values >= 1 will lead to Allow returning true.
 		//
@@ -338,6 +340,14 @@ func (r *ratelimiter) SetLimit(newLimit rate.Limit) {
 	r.limiter.SetLimitAt(r.latestNow, newLimit)
 }
 
+func (r *ratelimiter) SetLimitAndBurst(newLimit rate.Limit, newBurst int) {
+	r.mut.Lock()
+	defer r.mut.Unlock()
+	// See SetLimit and SetBurst for more information
+	r.limiter.SetLimitAt(r.latestNow, newLimit)
+	r.limiter.SetBurstAt(r.latestNow, newBurst)
+}
+
 func (r *ratelimiter) Tokens() float64 {
 	now, unlock := r.lockNow()
 	defer unlock()
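A hedged sketch of how the new combined setter might be called (the concrete values are made up): it applies both the limit and the burst against the same locked "latestNow", rather than through two independent SetLimit and SetBurst calls.

package main

import (
	"golang.org/x/time/rate"

	"github.com/uber/cadence/common/clock"
)

func main() {
	// Construct a limiter at 100 RPS with a burst of 100 (arbitrary values).
	rl := clock.NewRatelimiter(rate.Limit(100), 100)

	// Update limit and burst together under a single lock acquisition.
	rl.SetLimitAndBurst(rate.Limit(50), 50)

	_ = rl.Allow()
}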

common/dynamicconfig/quotas/fallbackdynamicratelimiterfactory.go

Lines changed: 4 additions & 10 deletions
@@ -49,15 +49,9 @@ type fallbackDynamicRateLimiterFactory struct {
 // GetLimiter returns a new Limiter for the given domain
 func (f fallbackDynamicRateLimiterFactory) GetLimiter(domain string) quotas.Limiter {
 	return quotas.NewDynamicRateLimiter(func() float64 {
-		return limitWithFallback(
-			float64(f.primary(domain)),
-			float64(f.secondary()))
+		if primaryLimit := f.primary(domain); primaryLimit > 0 {
+			return float64(primaryLimit)
+		}
+		return float64(f.secondary())
 	})
 }
-
-func limitWithFallback(primary, secondary float64) float64 {
-	if primary > 0 {
-		return primary
-	}
-	return secondary
-}
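The inlined closure keeps the same fallback rule as the deleted limitWithFallback helper. A standalone sketch of that rule, with made-up numbers, for clarity:

package main

import "fmt"

// pick mirrors the fallback logic: a positive primary (domain-specific) limit
// wins, otherwise the secondary limit applies.
func pick(primary, secondary float64) float64 {
	if primary > 0 {
		return primary
	}
	return secondary
}

func main() {
	fmt.Println(pick(200, 1000)) // 200: the domain override is set
	fmt.Println(pick(0, 1000))   // 1000: fall back to the secondary limit
}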

common/quotas/dynamicratelimiter.go

Lines changed: 68 additions & 13 deletions
@@ -24,48 +24,103 @@ package quotas
 
 import (
 	"context"
+	"math"
+	"sync/atomic"
+	"time"
 
 	"golang.org/x/time/rate"
 
 	"github.com/uber/cadence/common/clock"
 )
 
+const _ttl = time.Second * 5
+const _minBurst = 1
+
 // DynamicRateLimiter implements a dynamic config wrapper around the rate limiter,
 // checks for updates to the dynamic config and updates the rate limiter accordingly
 type DynamicRateLimiter struct {
-	rps RPSFunc
-	rl  *RateLimiter
+	rps            RPSFunc
+	rl             clock.Ratelimiter
+	timeSource     clock.TimeSource
+	ttl            time.Duration
+	lastUpdateTime atomic.Value
+	minBurst       int
+}
+
+type DynamicRateLimiterOpt = func(limiter *DynamicRateLimiter)
+
+func WithTTL(ttl time.Duration) DynamicRateLimiterOpt {
+	return func(limiter *DynamicRateLimiter) {
+		limiter.ttl = ttl
+	}
+}
+
+func WithMinBurst(minBurst int) DynamicRateLimiterOpt {
+	return func(limiter *DynamicRateLimiter) {
+		limiter.minBurst = minBurst
+	}
+}
+
+func WithTimeSource(timeSource clock.TimeSource) DynamicRateLimiterOpt {
+	return func(limiter *DynamicRateLimiter) {
+		limiter.timeSource = timeSource
+	}
 }
 
 // NewDynamicRateLimiter returns a rate limiter which handles dynamic config
-func NewDynamicRateLimiter(rps RPSFunc) *DynamicRateLimiter {
-	initialRps := rps()
-	rl := NewRateLimiter(&initialRps, _defaultRPSTTL, _burstSize)
-	return &DynamicRateLimiter{rps, rl}
+func NewDynamicRateLimiter(rps RPSFunc, opts ...DynamicRateLimiterOpt) Limiter {
+	res := &DynamicRateLimiter{
+		rps:        rps,
+		timeSource: clock.NewRealTimeSource(),
+		ttl:        _ttl,
+		minBurst:   _minBurst,
+	}
+	for _, opt := range opts {
+		opt(res)
+	}
+	res.lastUpdateTime.Store(res.timeSource.Now())
+	res.rl = clock.NewRatelimiter(res.getLimitAndBurst())
+	return res
 }
 
 // Allow immediately returns with true or false indicating if a rate limit
 // token is available or not
 func (d *DynamicRateLimiter) Allow() bool {
-	rps := d.rps()
-	d.rl.UpdateMaxDispatch(&rps)
+	d.maybeRefreshRps()
 	return d.rl.Allow()
 }
 
 // Wait waits up till deadline for a rate limit token
 func (d *DynamicRateLimiter) Wait(ctx context.Context) error {
-	rps := d.rps()
-	d.rl.UpdateMaxDispatch(&rps)
+	d.maybeRefreshRps()
 	return d.rl.Wait(ctx)
 }
 
 // Reserve reserves a rate limit token
 func (d *DynamicRateLimiter) Reserve() clock.Reservation {
-	rps := d.rps()
-	d.rl.UpdateMaxDispatch(&rps)
+	d.maybeRefreshRps()
 	return d.rl.Reserve()
 }
 
 func (d *DynamicRateLimiter) Limit() rate.Limit {
-	return rate.Limit(d.rps())
+	d.maybeRefreshRps()
+	return d.rl.Limit()
+}
+
+func (d *DynamicRateLimiter) maybeRefreshRps() {
+	now := d.timeSource.Now()
+	lastUpdated := d.lastUpdateTime.Load().(time.Time)
+	if now.After(lastUpdated.Add(d.ttl-1)) && d.lastUpdateTime.CompareAndSwap(lastUpdated, now) {
+		d.rl.SetLimitAndBurst(d.getLimitAndBurst())
+	}
+}
+
+func (d *DynamicRateLimiter) getLimitAndBurst() (rate.Limit, int) {
+	rps := d.rps()
+	burst := max(int(math.Ceil(rps)), d.minBurst)
+	// If we have 0 rps we have to zero out the burst to immediately cut off new permits
+	if rps == 0 {
+		burst = 0
+	}
+	return rate.Limit(rps), burst
 }
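The TTL gate in maybeRefreshRps is the heart of the change. The standalone sketch below is not Cadence code (ttlGate and its methods are made up); it reproduces the same atomic.Value plus CompareAndSwap pattern to show why concurrent callers trigger at most one refresh per TTL window.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// ttlGate lets many callers attempt a refresh, but at most one wins the
// CompareAndSwap per TTL window and runs the (comparatively expensive) work.
type ttlGate struct {
	ttl         time.Duration
	lastUpdated atomic.Value // holds a time.Time
}

func newTTLGate(ttl time.Duration) *ttlGate {
	g := &ttlGate{ttl: ttl}
	g.lastUpdated.Store(time.Now())
	return g
}

func (g *ttlGate) tryRefresh(refresh func()) {
	now := time.Now()
	last := g.lastUpdated.Load().(time.Time)
	// Only the caller that swaps the timestamp forward performs the refresh.
	if now.After(last.Add(g.ttl)) && g.lastUpdated.CompareAndSwap(last, now) {
		refresh()
	}
}

func main() {
	g := newTTLGate(50 * time.Millisecond)
	refreshes := 0
	for i := 0; i < 100; i++ {
		g.tryRefresh(func() { refreshes++ })
		time.Sleep(time.Millisecond)
	}
	fmt.Println("refreshes:", refreshes) // a handful at most, not one per call
}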

common/quotas/global/collection/internal/shadowed_test.go

Lines changed: 1 addition & 1 deletion
@@ -189,7 +189,7 @@ func TestShadowed(t *testing.T) {
 		})
 	})
 	t.Run("limit", func(t *testing.T) {
-		l := NewShadowedLimiter(&allowlimiter{}, quotas.NewSimpleRateLimiter(t, 0))
+		l := NewShadowedLimiter(&allowlimiter{}, clock.NewRatelimiter(rate.Limit(0), 1))
 		assert.Equal(t, rate.Inf, l.Limit(), "should return the primary limit, not shadowed")
 	})
 }
