Skip to content

Commit 6c0af78

Browse files
committed
Optimize DynamicRateLimiter to not constantly re-evaluate RPS
While dynamic config lookups are relatively cheap, they're certainly not free and perform several allocations further contributing to GC times. Making matters worse, quotas.RateLimiter has some strange TTL logic such that the result of evaluating the dynamic config value isn't used more than once a minute unless it's lower than the current value. Delete quotas.RateLimiter in favor of clock.RateLimiter and move the TTL to DynamicRateLimiter. Reduce the TTL to a more reasonable value and only evaluate the function if the time has elapsed. Remove the logic allowing the rate change to bypass the TTL if its lower than the current rate. This requires evaluating the RPS value constantly. Instead we've shortened the TTL such that we'll reliably pick up changes within a few seconds regardless of the direction of the change. The main place that the TTL logic seems to be relevant is the Task Dispatch limiter within Matching. Each poller includes an RPS limit and we'd attempt to update the RPS on each request. This is the only place that explicitly provided a TTL to quotas.RateLimiter (60s) rather than relying on the default. Change the Matching Rate limiter to use DynamicRateLimiter so that it also only updates according to its TTL. This is a change in behavior and will make Matching less responsive to changes specified by user requests. It still complies with the "take most recent value" behavior that is advertised.
1 parent dbfed08 commit 6c0af78

20 files changed

+264
-257
lines changed

common/clock/ratelimiter.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ type (
7070
SetBurst(newBurst int)
7171
// SetLimit sets the Limit value
7272
SetLimit(newLimit rate.Limit)
73+
74+
SetLimitAndBurst(newLimit rate.Limit, newBurst int)
7375
// Tokens returns the number of immediately-allowable events when called.
7476
// Values >= 1 will lead to Allow returning true.
7577
//
@@ -205,7 +207,7 @@ func NewRatelimiter(lim rate.Limit, burst int) Ratelimiter {
205207
}
206208
}
207209

208-
func NewMockRatelimiter(ts TimeSource, lim rate.Limit, burst int) Ratelimiter {
210+
func NewRateLimiterWithTimeSource(ts TimeSource, lim rate.Limit, burst int) Ratelimiter {
209211
return &ratelimiter{
210212
timesource: ts,
211213
limiter: rate.NewLimiter(lim, burst),
@@ -338,6 +340,14 @@ func (r *ratelimiter) SetLimit(newLimit rate.Limit) {
338340
r.limiter.SetLimitAt(r.latestNow, newLimit)
339341
}
340342

343+
func (r *ratelimiter) SetLimitAndBurst(newLimit rate.Limit, newBurst int) {
344+
r.mut.Lock()
345+
defer r.mut.Unlock()
346+
// See SetLimit and SetBurst for more information
347+
r.limiter.SetLimitAt(r.latestNow, newLimit)
348+
r.limiter.SetBurstAt(r.latestNow, newBurst)
349+
}
350+
341351
func (r *ratelimiter) Tokens() float64 {
342352
now, unlock := r.lockNow()
343353
defer unlock()

common/clock/ratelimiter_bench_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ func BenchmarkLimiter(b *testing.B) {
161161
})
162162
b.Run("mocked timesource", func(b *testing.B) {
163163
ts := NewMockedTimeSource()
164-
rl := NewMockRatelimiter(ts, rate.Every(normalLimit), burst)
164+
rl := NewRateLimiterWithTimeSource(ts, rate.Every(normalLimit), burst)
165165
runner(b, func(i int) bool {
166166
// adjusted by eye, to try to very roughly match the above values for the final runs.
167167
// anything not extremely higher or lower is probably fine.
@@ -260,7 +260,7 @@ func BenchmarkLimiter(b *testing.B) {
260260
})
261261
b.Run("mocked timesource", func(b *testing.B) {
262262
ts := NewMockedTimeSource()
263-
rl := NewMockRatelimiter(ts, rate.Every(normalLimit), burst)
263+
rl := NewRateLimiterWithTimeSource(ts, rate.Every(normalLimit), burst)
264264
runWrapped(b, rl, runner, func() {
265265
// any value should work, but smaller than normalLimit revealed
266266
// some issues in the past, where time-thrashing would lead to
@@ -322,7 +322,7 @@ func BenchmarkLimiter(b *testing.B) {
322322
})
323323
b.Run("mocked timesource", func(b *testing.B) {
324324
ts := NewMockedTimeSource()
325-
rl := NewMockRatelimiter(ts, limit, burst)
325+
rl := NewRateLimiterWithTimeSource(ts, limit, burst)
326326
ctx := &timesourceContext{
327327
ts: ts,
328328
deadline: time.Now().Add(timeout),

common/clock/ratelimiter_comparison_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -316,8 +316,8 @@ func assertRatelimitersBehaveSimilarly(t *testing.T, burst int, minWaitDuration
316316

317317
actual := rate.NewLimiter(limit, burst)
318318
wrapped := NewRatelimiter(limit, burst)
319-
mocked := NewMockRatelimiter(ts, limit, burst)
320-
compressed := NewMockRatelimiter(compressedTS, limit, burst)
319+
mocked := NewRateLimiterWithTimeSource(ts, limit, burst)
320+
compressed := NewRateLimiterWithTimeSource(compressedTS, limit, burst)
321321

322322
// record "to be executed" closures on the time-compressed ratelimiter too,
323323
// so it can be checked at a sped up rate.

common/clock/ratelimiter_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func assertRatelimiterBasicsWork(t *testing.T, makeTimesource func() MockedTimeS
6767
if ts == nil {
6868
return NewRatelimiter(limit, burst), time.Sleep
6969
}
70-
return NewMockRatelimiter(ts, limit, burst), ts.Advance
70+
return NewRateLimiterWithTimeSource(ts, limit, burst), ts.Advance
7171
}
7272

7373
now := func(ts MockedTimeSource) time.Time {
@@ -466,7 +466,7 @@ func TestRatelimiterCoverage(t *testing.T) {
466466
})
467467
t.Run("mock limiter constructor", func(t *testing.T) {
468468
// covered by fuzz testing, but this gets it to 100% without fuzz.
469-
_ = NewMockRatelimiter(NewMockedTimeSource(), 1, 1)
469+
_ = NewRateLimiterWithTimeSource(NewMockedTimeSource(), 1, 1)
470470
})
471471
}
472472

common/dynamicconfig/quotas/fallbackdynamicratelimiterfactory.go

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,15 +49,9 @@ type fallbackDynamicRateLimiterFactory struct {
4949
// GetLimiter returns a new Limiter for the given domain
5050
func (f fallbackDynamicRateLimiterFactory) GetLimiter(domain string) quotas.Limiter {
5151
return quotas.NewDynamicRateLimiter(func() float64 {
52-
return limitWithFallback(
53-
float64(f.primary(domain)),
54-
float64(f.secondary()))
52+
if primaryLimit := f.primary(domain); primaryLimit > 0 {
53+
return float64(primaryLimit)
54+
}
55+
return float64(f.secondary())
5556
})
5657
}
57-
58-
func limitWithFallback(primary, secondary float64) float64 {
59-
if primary > 0 {
60-
return primary
61-
}
62-
return secondary
63-
}

common/quotas/dynamicratelimiter.go

Lines changed: 67 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,48 +24,102 @@ package quotas
2424

2525
import (
2626
"context"
27+
"math"
28+
"sync/atomic"
29+
"time"
2730

2831
"golang.org/x/time/rate"
2932

3033
"github.com/uber/cadence/common/clock"
3134
)
3235

36+
const _ttl = time.Second * 5
37+
const _minBurst = 1
38+
3339
// DynamicRateLimiter implements a dynamic config wrapper around the rate limiter,
3440
// checks for updates to the dynamic config and updates the rate limiter accordingly
3541
type DynamicRateLimiter struct {
36-
rps RPSFunc
37-
rl *RateLimiter
42+
rps RPSFunc
43+
rl clock.Ratelimiter
44+
timeSource clock.TimeSource
45+
ttl time.Duration
46+
lastUpdateTime atomic.Pointer[time.Time]
47+
minBurst int
48+
}
49+
50+
type DynamicRateLimiterOpts struct {
51+
TTL time.Duration
52+
MinBurst int
53+
TimeSource clock.TimeSource
54+
}
55+
56+
var _defaultOpts = DynamicRateLimiterOpts{
57+
TTL: _ttl,
58+
MinBurst: _minBurst,
59+
TimeSource: clock.NewRealTimeSource(),
3860
}
3961

4062
// NewDynamicRateLimiter returns a rate limiter which handles dynamic config
41-
func NewDynamicRateLimiter(rps RPSFunc) *DynamicRateLimiter {
42-
initialRps := rps()
43-
rl := NewRateLimiter(&initialRps, _defaultRPSTTL, _burstSize)
44-
return &DynamicRateLimiter{rps, rl}
63+
func NewDynamicRateLimiter(rps RPSFunc) Limiter {
64+
return NewDynamicRateLimiterWithOpts(rps, _defaultOpts)
65+
}
66+
67+
func NewDynamicRateLimiterWithOpts(rps RPSFunc, opts DynamicRateLimiterOpts) Limiter {
68+
ts := opts.TimeSource
69+
if ts == nil {
70+
ts = _defaultOpts.TimeSource
71+
}
72+
res := &DynamicRateLimiter{
73+
rps: rps,
74+
timeSource: ts,
75+
ttl: opts.TTL,
76+
minBurst: opts.MinBurst,
77+
}
78+
now := res.timeSource.Now()
79+
res.lastUpdateTime.Store(&now)
80+
lim, burst := res.getLimitAndBurst()
81+
res.rl = clock.NewRateLimiterWithTimeSource(res.timeSource, lim, burst)
82+
return res
4583
}
4684

4785
// Allow immediately returns with true or false indicating if a rate limit
4886
// token is available or not
4987
func (d *DynamicRateLimiter) Allow() bool {
50-
rps := d.rps()
51-
d.rl.UpdateMaxDispatch(&rps)
88+
d.maybeRefreshRps()
5289
return d.rl.Allow()
5390
}
5491

5592
// Wait waits up till deadline for a rate limit token
5693
func (d *DynamicRateLimiter) Wait(ctx context.Context) error {
57-
rps := d.rps()
58-
d.rl.UpdateMaxDispatch(&rps)
94+
d.maybeRefreshRps()
5995
return d.rl.Wait(ctx)
6096
}
6197

6298
// Reserve reserves a rate limit token
6399
func (d *DynamicRateLimiter) Reserve() clock.Reservation {
64-
rps := d.rps()
65-
d.rl.UpdateMaxDispatch(&rps)
100+
d.maybeRefreshRps()
66101
return d.rl.Reserve()
67102
}
68103

69104
func (d *DynamicRateLimiter) Limit() rate.Limit {
70-
return rate.Limit(d.rps())
105+
d.maybeRefreshRps()
106+
return d.rl.Limit()
107+
}
108+
109+
func (d *DynamicRateLimiter) maybeRefreshRps() {
110+
now := d.timeSource.Now()
111+
lastUpdated := d.lastUpdateTime.Load()
112+
if now.After(lastUpdated.Add(d.ttl-1)) && d.lastUpdateTime.CompareAndSwap(lastUpdated, &now) {
113+
d.rl.SetLimitAndBurst(d.getLimitAndBurst())
114+
}
115+
}
116+
117+
func (d *DynamicRateLimiter) getLimitAndBurst() (rate.Limit, int) {
118+
rps := d.rps()
119+
burst := max(int(math.Ceil(rps)), d.minBurst)
120+
// If we have 0 rps we have to zero out the burst to immediately cut off new permits
121+
if rps == 0 {
122+
burst = 0
123+
}
124+
return rate.Limit(rps), burst
71125
}

common/quotas/global/collection/internal/counted_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ import (
3939
func TestUsage(t *testing.T) {
4040
t.Run("tracks allow", func(t *testing.T) {
4141
ts := clock.NewMockedTimeSource()
42-
counted := NewCountedLimiter(clock.NewMockRatelimiter(ts, 1, 1))
42+
counted := NewCountedLimiter(clock.NewRateLimiterWithTimeSource(ts, 1, 1))
4343

4444
assert.True(t, counted.Allow(), "should match wrapped limiter")
4545
assert.Equal(t, UsageMetrics{1, 0, 0}, counted.Collect())
@@ -49,7 +49,7 @@ func TestUsage(t *testing.T) {
4949
})
5050
t.Run("tracks wait", func(t *testing.T) {
5151
ts := clock.NewMockedTimeSource()
52-
counted := NewCountedLimiter(clock.NewMockRatelimiter(ts, 1, 1))
52+
counted := NewCountedLimiter(clock.NewRateLimiterWithTimeSource(ts, 1, 1))
5353

5454
// consume the available token
5555
requireQuickly(t, 100*time.Millisecond, func() {
@@ -82,7 +82,7 @@ func TestUsage(t *testing.T) {
8282
})
8383
t.Run("tracks reserve", func(t *testing.T) {
8484
ts := clock.NewMockedTimeSource()
85-
lim := NewCountedLimiter(clock.NewMockRatelimiter(ts, 1, 1))
85+
lim := NewCountedLimiter(clock.NewRateLimiterWithTimeSource(ts, 1, 1))
8686

8787
r := lim.Reserve()
8888
assert.True(t, r.Allow(), "should have used the available burst")
@@ -99,7 +99,7 @@ func TestUsage(t *testing.T) {
9999
// largely for coverage
100100
t.Run("supports Limit", func(t *testing.T) {
101101
rps := rate.Limit(1)
102-
lim := NewCountedLimiter(clock.NewMockRatelimiter(clock.NewMockedTimeSource(), rps, 1))
102+
lim := NewCountedLimiter(clock.NewRateLimiterWithTimeSource(clock.NewMockedTimeSource(), rps, 1))
103103
assert.Equal(t, rps, lim.Limit())
104104
})
105105
}
@@ -152,15 +152,15 @@ func TestRegression_ReserveCountsCorrectly(t *testing.T) {
152152
t.Run("counted", func(t *testing.T) {
153153
// "base" counting-limiter should count correctly
154154
ts := clock.NewMockedTimeSource()
155-
wrapped := clock.NewMockRatelimiter(ts, 1, 100)
155+
wrapped := clock.NewRateLimiterWithTimeSource(ts, 1, 100)
156156
lim := NewCountedLimiter(wrapped)
157157

158158
run(t, lim, ts.Advance, lim.Collect)
159159
})
160160
t.Run("shadowed", func(t *testing.T) {
161161
// "shadowed" should call the primary correctly at the very least
162162
ts := clock.NewMockedTimeSource()
163-
wrapped := clock.NewMockRatelimiter(ts, 1, 100)
163+
wrapped := clock.NewRateLimiterWithTimeSource(ts, 1, 100)
164164
counted := NewCountedLimiter(wrapped)
165165
lim := NewShadowedLimiter(counted, allowlimiter{})
166166

@@ -170,7 +170,7 @@ func TestRegression_ReserveCountsCorrectly(t *testing.T) {
170170
// "fallback" uses a different implementation, but it should count exactly the same.
171171
// TODO: ideally it would actually be the same code, but that's a bit awkward due to needing different interfaces.
172172
ts := clock.NewMockedTimeSource()
173-
wrapped := clock.NewMockRatelimiter(ts, 1, 100)
173+
wrapped := clock.NewRateLimiterWithTimeSource(ts, 1, 100)
174174
l := NewFallbackLimiter(allowlimiter{})
175175
l.Update(1) // allows using primary, else it calls the fallback
176176
l.primary = wrapped // cheat, just swap it out

common/quotas/global/collection/internal/shadowed_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,8 @@ func TestShadowed(t *testing.T) {
8484
if test.shadow {
8585
shadowBurst = 1
8686
}
87-
primary := NewCountedLimiter(clock.NewMockRatelimiter(ts, rate.Every(time.Second), primaryBurst))
88-
shadow := NewCountedLimiter(clock.NewMockRatelimiter(ts, rate.Every(time.Second), shadowBurst))
87+
primary := NewCountedLimiter(clock.NewRateLimiterWithTimeSource(ts, rate.Every(time.Second), primaryBurst))
88+
shadow := NewCountedLimiter(clock.NewRateLimiterWithTimeSource(ts, rate.Every(time.Second), shadowBurst))
8989
s := NewShadowedLimiter(primary, shadow)
9090

9191
assert.Equalf(t, test.allowed, s.Allow(), "should match primary behavior: %v", test.allowed)
@@ -101,8 +101,8 @@ func TestShadowed(t *testing.T) {
101101
if test.shadow {
102102
shadowBurst = 1
103103
}
104-
primary := NewCountedLimiter(clock.NewMockRatelimiter(ts, rate.Every(time.Second), primaryBurst))
105-
shadow := NewCountedLimiter(clock.NewMockRatelimiter(ts, rate.Every(time.Second), shadowBurst))
104+
primary := NewCountedLimiter(clock.NewRateLimiterWithTimeSource(ts, rate.Every(time.Second), primaryBurst))
105+
shadow := NewCountedLimiter(clock.NewRateLimiterWithTimeSource(ts, rate.Every(time.Second), shadowBurst))
106106
s := NewShadowedLimiter(primary, shadow)
107107

108108
res := s.Reserve()
@@ -162,8 +162,8 @@ func TestShadowed(t *testing.T) {
162162
t.Run("only returns primary behavior", func(t *testing.T) {
163163
t.Run("allowed", func(t *testing.T) {
164164
ts := clock.NewMockedTimeSource()
165-
primary := NewCountedLimiter(clock.NewMockRatelimiter(ts, 1, 1)) // allows an event
166-
shadow := NewCountedLimiter(clock.NewMockRatelimiter(ts, 0, 0)) // always rejects
165+
primary := NewCountedLimiter(clock.NewRateLimiterWithTimeSource(ts, 1, 1)) // allows an event
166+
shadow := NewCountedLimiter(clock.NewRateLimiterWithTimeSource(ts, 0, 0)) // always rejects
167167
s := NewShadowedLimiter(primary, shadow)
168168
res := s.Reserve()
169169
res.Used(res.Allow())
@@ -174,8 +174,8 @@ func TestShadowed(t *testing.T) {
174174
})
175175
t.Run("rejected", func(t *testing.T) {
176176
ts := clock.NewMockedTimeSource()
177-
primary := NewCountedLimiter(clock.NewMockRatelimiter(ts, 0, 0)) // always rejects
178-
shadow := NewCountedLimiter(clock.NewMockRatelimiter(ts, 1, 1)) // allow an event
177+
primary := NewCountedLimiter(clock.NewRateLimiterWithTimeSource(ts, 0, 0)) // always rejects
178+
shadow := NewCountedLimiter(clock.NewRateLimiterWithTimeSource(ts, 1, 1)) // allow an event
179179
s := NewShadowedLimiter(primary, shadow)
180180
res := s.Reserve()
181181
res.Used(res.Allow())
@@ -189,7 +189,7 @@ func TestShadowed(t *testing.T) {
189189
})
190190
})
191191
t.Run("limit", func(t *testing.T) {
192-
l := NewShadowedLimiter(&allowlimiter{}, quotas.NewSimpleRateLimiter(t, 0))
192+
l := NewShadowedLimiter(&allowlimiter{}, clock.NewRatelimiter(rate.Limit(0), 1))
193193
assert.Equal(t, rate.Inf, l.Limit(), "should return the primary limit, not shadowed")
194194
})
195195
}

0 commit comments

Comments
 (0)