Skip to content

Optimize DynamicRateLimiter to not constantly re-evaluate RPS #6842

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion common/clock/ratelimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ type (
SetBurst(newBurst int)
// SetLimit sets the Limit value
SetLimit(newLimit rate.Limit)

SetLimitAndBurst(newLimit rate.Limit, newBurst int)
// Tokens returns the number of immediately-allowable events when called.
// Values >= 1 will lead to Allow returning true.
//
Expand Down Expand Up @@ -205,7 +207,7 @@ func NewRatelimiter(lim rate.Limit, burst int) Ratelimiter {
}
}

func NewMockRatelimiter(ts TimeSource, lim rate.Limit, burst int) Ratelimiter {
func NewRateLimiterWithTimeSource(ts TimeSource, lim rate.Limit, burst int) Ratelimiter {
return &ratelimiter{
timesource: ts,
limiter: rate.NewLimiter(lim, burst),
Expand Down Expand Up @@ -338,6 +340,14 @@ func (r *ratelimiter) SetLimit(newLimit rate.Limit) {
r.limiter.SetLimitAt(r.latestNow, newLimit)
}

func (r *ratelimiter) SetLimitAndBurst(newLimit rate.Limit, newBurst int) {
r.mut.Lock()
defer r.mut.Unlock()
// See SetLimit and SetBurst for more information
r.limiter.SetLimitAt(r.latestNow, newLimit)
r.limiter.SetBurstAt(r.latestNow, newBurst)
}

func (r *ratelimiter) Tokens() float64 {
now, unlock := r.lockNow()
defer unlock()
Expand Down
6 changes: 3 additions & 3 deletions common/clock/ratelimiter_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func BenchmarkLimiter(b *testing.B) {
})
b.Run("mocked timesource", func(b *testing.B) {
ts := NewMockedTimeSource()
rl := NewMockRatelimiter(ts, rate.Every(normalLimit), burst)
rl := NewRateLimiterWithTimeSource(ts, rate.Every(normalLimit), burst)
runner(b, func(i int) bool {
// adjusted by eye, to try to very roughly match the above values for the final runs.
// anything not extremely higher or lower is probably fine.
Expand Down Expand Up @@ -260,7 +260,7 @@ func BenchmarkLimiter(b *testing.B) {
})
b.Run("mocked timesource", func(b *testing.B) {
ts := NewMockedTimeSource()
rl := NewMockRatelimiter(ts, rate.Every(normalLimit), burst)
rl := NewRateLimiterWithTimeSource(ts, rate.Every(normalLimit), burst)
runWrapped(b, rl, runner, func() {
// any value should work, but smaller than normalLimit revealed
// some issues in the past, where time-thrashing would lead to
Expand Down Expand Up @@ -322,7 +322,7 @@ func BenchmarkLimiter(b *testing.B) {
})
b.Run("mocked timesource", func(b *testing.B) {
ts := NewMockedTimeSource()
rl := NewMockRatelimiter(ts, limit, burst)
rl := NewRateLimiterWithTimeSource(ts, limit, burst)
ctx := &timesourceContext{
ts: ts,
deadline: time.Now().Add(timeout),
Expand Down
4 changes: 2 additions & 2 deletions common/clock/ratelimiter_comparison_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,8 @@ func assertRatelimitersBehaveSimilarly(t *testing.T, burst int, minWaitDuration

actual := rate.NewLimiter(limit, burst)
wrapped := NewRatelimiter(limit, burst)
mocked := NewMockRatelimiter(ts, limit, burst)
compressed := NewMockRatelimiter(compressedTS, limit, burst)
mocked := NewRateLimiterWithTimeSource(ts, limit, burst)
compressed := NewRateLimiterWithTimeSource(compressedTS, limit, burst)

// record "to be executed" closures on the time-compressed ratelimiter too,
// so it can be checked at a sped up rate.
Expand Down
4 changes: 2 additions & 2 deletions common/clock/ratelimiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func assertRatelimiterBasicsWork(t *testing.T, makeTimesource func() MockedTimeS
if ts == nil {
return NewRatelimiter(limit, burst), time.Sleep
}
return NewMockRatelimiter(ts, limit, burst), ts.Advance
return NewRateLimiterWithTimeSource(ts, limit, burst), ts.Advance
}

now := func(ts MockedTimeSource) time.Time {
Expand Down Expand Up @@ -466,7 +466,7 @@ func TestRatelimiterCoverage(t *testing.T) {
})
t.Run("mock limiter constructor", func(t *testing.T) {
// covered by fuzz testing, but this gets it to 100% without fuzz.
_ = NewMockRatelimiter(NewMockedTimeSource(), 1, 1)
_ = NewRateLimiterWithTimeSource(NewMockedTimeSource(), 1, 1)
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,9 @@ type fallbackDynamicRateLimiterFactory struct {
// GetLimiter returns a new Limiter for the given domain
func (f fallbackDynamicRateLimiterFactory) GetLimiter(domain string) quotas.Limiter {
return quotas.NewDynamicRateLimiter(func() float64 {
return limitWithFallback(
float64(f.primary(domain)),
float64(f.secondary()))
if primaryLimit := f.primary(domain); primaryLimit > 0 {
return float64(primaryLimit)
}
return float64(f.secondary())
Comment on lines +52 to +55
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kinda hate this behavior, but yea. reasonable rewrite 👍

})
}

func limitWithFallback(primary, secondary float64) float64 {
if primary > 0 {
return primary
}
return secondary
}
80 changes: 67 additions & 13 deletions common/quotas/dynamicratelimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,48 +24,102 @@ package quotas

import (
"context"
"math"
"sync/atomic"
"time"

"golang.org/x/time/rate"

"github.com/uber/cadence/common/clock"
)

const _ttl = time.Second * 5
const _minBurst = 1

// DynamicRateLimiter implements a dynamic config wrapper around the rate limiter,
// checks for updates to the dynamic config and updates the rate limiter accordingly
type DynamicRateLimiter struct {
rps RPSFunc
rl *RateLimiter
rps RPSFunc
rl clock.Ratelimiter
timeSource clock.TimeSource
ttl time.Duration
lastUpdateTime atomic.Pointer[time.Time]
minBurst int
}

type DynamicRateLimiterOpts struct {
TTL time.Duration
MinBurst int
TimeSource clock.TimeSource
}

var _defaultOpts = DynamicRateLimiterOpts{
TTL: _ttl,
MinBurst: _minBurst,
TimeSource: clock.NewRealTimeSource(),
}

// NewDynamicRateLimiter returns a rate limiter which handles dynamic config
func NewDynamicRateLimiter(rps RPSFunc) *DynamicRateLimiter {
initialRps := rps()
rl := NewRateLimiter(&initialRps, _defaultRPSTTL, _burstSize)
return &DynamicRateLimiter{rps, rl}
func NewDynamicRateLimiter(rps RPSFunc) Limiter {
return NewDynamicRateLimiterWithOpts(rps, _defaultOpts)
}

func NewDynamicRateLimiterWithOpts(rps RPSFunc, opts DynamicRateLimiterOpts) Limiter {
ts := opts.TimeSource
if ts == nil {
ts = _defaultOpts.TimeSource
}
res := &DynamicRateLimiter{
rps: rps,
timeSource: ts,
ttl: opts.TTL,
minBurst: opts.MinBurst,
}
now := res.timeSource.Now()
res.lastUpdateTime.Store(&now)
lim, burst := res.getLimitAndBurst()
res.rl = clock.NewRateLimiterWithTimeSource(res.timeSource, lim, burst)
return res
}

// Allow immediately returns with true or false indicating if a rate limit
// token is available or not
func (d *DynamicRateLimiter) Allow() bool {
rps := d.rps()
d.rl.UpdateMaxDispatch(&rps)
d.maybeRefreshRps()
return d.rl.Allow()
}

// Wait waits up till deadline for a rate limit token
func (d *DynamicRateLimiter) Wait(ctx context.Context) error {
rps := d.rps()
d.rl.UpdateMaxDispatch(&rps)
d.maybeRefreshRps()
return d.rl.Wait(ctx)
}

// Reserve reserves a rate limit token
func (d *DynamicRateLimiter) Reserve() clock.Reservation {
rps := d.rps()
d.rl.UpdateMaxDispatch(&rps)
d.maybeRefreshRps()
return d.rl.Reserve()
}

func (d *DynamicRateLimiter) Limit() rate.Limit {
return rate.Limit(d.rps())
d.maybeRefreshRps()
return d.rl.Limit()
}

func (d *DynamicRateLimiter) maybeRefreshRps() {
now := d.timeSource.Now()
lastUpdated := d.lastUpdateTime.Load()
if now.After(lastUpdated.Add(d.ttl-1)) && d.lastUpdateTime.CompareAndSwap(lastUpdated, &now) {
d.rl.SetLimitAndBurst(d.getLimitAndBurst())
}
}

func (d *DynamicRateLimiter) getLimitAndBurst() (rate.Limit, int) {
rps := d.rps()
burst := max(int(math.Ceil(rps)), d.minBurst)
// If we have 0 rps we have to zero out the burst to immediately cut off new permits
if rps == 0 {
burst = 0
}
return rate.Limit(rps), burst
}
14 changes: 7 additions & 7 deletions common/quotas/global/collection/internal/counted_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
func TestUsage(t *testing.T) {
t.Run("tracks allow", func(t *testing.T) {
ts := clock.NewMockedTimeSource()
counted := NewCountedLimiter(clock.NewMockRatelimiter(ts, 1, 1))
counted := NewCountedLimiter(clock.NewRateLimiterWithTimeSource(ts, 1, 1))

assert.True(t, counted.Allow(), "should match wrapped limiter")
assert.Equal(t, UsageMetrics{1, 0, 0}, counted.Collect())
Expand All @@ -49,7 +49,7 @@ func TestUsage(t *testing.T) {
})
t.Run("tracks wait", func(t *testing.T) {
ts := clock.NewMockedTimeSource()
counted := NewCountedLimiter(clock.NewMockRatelimiter(ts, 1, 1))
counted := NewCountedLimiter(clock.NewRateLimiterWithTimeSource(ts, 1, 1))

// consume the available token
requireQuickly(t, 100*time.Millisecond, func() {
Expand Down Expand Up @@ -82,7 +82,7 @@ func TestUsage(t *testing.T) {
})
t.Run("tracks reserve", func(t *testing.T) {
ts := clock.NewMockedTimeSource()
lim := NewCountedLimiter(clock.NewMockRatelimiter(ts, 1, 1))
lim := NewCountedLimiter(clock.NewRateLimiterWithTimeSource(ts, 1, 1))

r := lim.Reserve()
assert.True(t, r.Allow(), "should have used the available burst")
Expand All @@ -99,7 +99,7 @@ func TestUsage(t *testing.T) {
// largely for coverage
t.Run("supports Limit", func(t *testing.T) {
rps := rate.Limit(1)
lim := NewCountedLimiter(clock.NewMockRatelimiter(clock.NewMockedTimeSource(), rps, 1))
lim := NewCountedLimiter(clock.NewRateLimiterWithTimeSource(clock.NewMockedTimeSource(), rps, 1))
assert.Equal(t, rps, lim.Limit())
})
}
Expand Down Expand Up @@ -152,15 +152,15 @@ func TestRegression_ReserveCountsCorrectly(t *testing.T) {
t.Run("counted", func(t *testing.T) {
// "base" counting-limiter should count correctly
ts := clock.NewMockedTimeSource()
wrapped := clock.NewMockRatelimiter(ts, 1, 100)
wrapped := clock.NewRateLimiterWithTimeSource(ts, 1, 100)
lim := NewCountedLimiter(wrapped)

run(t, lim, ts.Advance, lim.Collect)
})
t.Run("shadowed", func(t *testing.T) {
// "shadowed" should call the primary correctly at the very least
ts := clock.NewMockedTimeSource()
wrapped := clock.NewMockRatelimiter(ts, 1, 100)
wrapped := clock.NewRateLimiterWithTimeSource(ts, 1, 100)
counted := NewCountedLimiter(wrapped)
lim := NewShadowedLimiter(counted, allowlimiter{})

Expand All @@ -170,7 +170,7 @@ func TestRegression_ReserveCountsCorrectly(t *testing.T) {
// "fallback" uses a different implementation, but it should count exactly the same.
// TODO: ideally it would actually be the same code, but that's a bit awkward due to needing different interfaces.
ts := clock.NewMockedTimeSource()
wrapped := clock.NewMockRatelimiter(ts, 1, 100)
wrapped := clock.NewRateLimiterWithTimeSource(ts, 1, 100)
l := NewFallbackLimiter(allowlimiter{})
l.Update(1) // allows using primary, else it calls the fallback
l.primary = wrapped // cheat, just swap it out
Expand Down
18 changes: 9 additions & 9 deletions common/quotas/global/collection/internal/shadowed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ func TestShadowed(t *testing.T) {
if test.shadow {
shadowBurst = 1
}
primary := NewCountedLimiter(clock.NewMockRatelimiter(ts, rate.Every(time.Second), primaryBurst))
shadow := NewCountedLimiter(clock.NewMockRatelimiter(ts, rate.Every(time.Second), shadowBurst))
primary := NewCountedLimiter(clock.NewRateLimiterWithTimeSource(ts, rate.Every(time.Second), primaryBurst))
shadow := NewCountedLimiter(clock.NewRateLimiterWithTimeSource(ts, rate.Every(time.Second), shadowBurst))
s := NewShadowedLimiter(primary, shadow)

assert.Equalf(t, test.allowed, s.Allow(), "should match primary behavior: %v", test.allowed)
Expand All @@ -101,8 +101,8 @@ func TestShadowed(t *testing.T) {
if test.shadow {
shadowBurst = 1
}
primary := NewCountedLimiter(clock.NewMockRatelimiter(ts, rate.Every(time.Second), primaryBurst))
shadow := NewCountedLimiter(clock.NewMockRatelimiter(ts, rate.Every(time.Second), shadowBurst))
primary := NewCountedLimiter(clock.NewRateLimiterWithTimeSource(ts, rate.Every(time.Second), primaryBurst))
shadow := NewCountedLimiter(clock.NewRateLimiterWithTimeSource(ts, rate.Every(time.Second), shadowBurst))
s := NewShadowedLimiter(primary, shadow)

res := s.Reserve()
Expand Down Expand Up @@ -162,8 +162,8 @@ func TestShadowed(t *testing.T) {
t.Run("only returns primary behavior", func(t *testing.T) {
t.Run("allowed", func(t *testing.T) {
ts := clock.NewMockedTimeSource()
primary := NewCountedLimiter(clock.NewMockRatelimiter(ts, 1, 1)) // allows an event
shadow := NewCountedLimiter(clock.NewMockRatelimiter(ts, 0, 0)) // always rejects
primary := NewCountedLimiter(clock.NewRateLimiterWithTimeSource(ts, 1, 1)) // allows an event
shadow := NewCountedLimiter(clock.NewRateLimiterWithTimeSource(ts, 0, 0)) // always rejects
s := NewShadowedLimiter(primary, shadow)
res := s.Reserve()
res.Used(res.Allow())
Expand All @@ -174,8 +174,8 @@ func TestShadowed(t *testing.T) {
})
t.Run("rejected", func(t *testing.T) {
ts := clock.NewMockedTimeSource()
primary := NewCountedLimiter(clock.NewMockRatelimiter(ts, 0, 0)) // always rejects
shadow := NewCountedLimiter(clock.NewMockRatelimiter(ts, 1, 1)) // allow an event
primary := NewCountedLimiter(clock.NewRateLimiterWithTimeSource(ts, 0, 0)) // always rejects
shadow := NewCountedLimiter(clock.NewRateLimiterWithTimeSource(ts, 1, 1)) // allow an event
s := NewShadowedLimiter(primary, shadow)
res := s.Reserve()
res.Used(res.Allow())
Expand All @@ -189,7 +189,7 @@ func TestShadowed(t *testing.T) {
})
})
t.Run("limit", func(t *testing.T) {
l := NewShadowedLimiter(&allowlimiter{}, quotas.NewSimpleRateLimiter(t, 0))
l := NewShadowedLimiter(&allowlimiter{}, clock.NewRatelimiter(rate.Limit(0), 1))
assert.Equal(t, rate.Inf, l.Limit(), "should return the primary limit, not shadowed")
})
}
Loading