Commit 04add2d
Ratelimiter polish / fix: improve zero -> nonzero filling behavior for new ratelimiters (#6280)
# Motivation

The global ratelimiter system was exhibiting some weird request-rejection at very low RPS usage. On our dashboards it looks like this:

<img width="927" alt="Screenshot 2024-09-11 at 18 55 09" src="https://github.com/user-attachments/assets/8236d945-0f8f-45f9-9a9c-5c908f386ae4">

Previously I thought this was just due to undesirably-low weights, and #6238 addressed that (and is still a useful addition). After that was rolled out, behavior improved, but small numbers of rejections still occurred... which should not have happened, because the "boosting" logic should have meant that the global limits were *at least* identical to the local ones, and likely larger.

Which drove me to re-read the details and think harder. And then I found this PR's issue.

# Issue and fix

What was happening is that the initial `rate.NewLimiter(0, 0)` detail was "leaking" into limits after the first update, so a request that occurred immediately afterward would likely be rejected, regardless of the configured limit.

This happens because `(0, 0)` creates a zero-burst limit on the "primary" limiter, and the shadowed `.Allow()` calls were advancing that limiter's internal "now" value... and then when the limit and burst were increased, the limiter had to fill from zero tokens. This put it in a worse position than local / fallback limiters, which start from `(local, local)` with a zero "now" value, so their next `.Allow()` is basically guaranteed to fill the token bucket due to many years "elapsing".

So the fix has two parts:

1. Avoid advancing the zero-valued limiter's internal time until a reasonable limit/burst has been set. This is done by simply not calling it while in startup mode.
2. Avoid advancing limiters' time when setting limit and burst. This means that after an idle period -> `Update()` -> `Allow()`, tokens will fill as if the new values had been set all along, and the setters can be called in any order.

The underlying `rate.Limiter` does *not* do the second: it advances time when setting these values... but that seems undesirable. It means old values are preferred (which is reasonable, since they were set when that time passed), *and* it means that the order in which you set burst and limit has a significant impact on the outcome, even with the same values and the same timing: time passes only on the first call, so the second sees essentially zero elapsed time and has no immediate effect at all (unless it lowers the burst). I can only see that latter part as surprising, and definitely worth avoiding.

# Alternative approach

Part 2 seems worth keeping. But part 1 has a relatively clear alternative: don't create the "primary" limiter until the first `Update()`.

Because the limiter is currently atomic-oriented, this can't be done safely without adding atomics or locks everywhere... so I didn't do that. If I were to do this, I would just switch to a mutex; the `rate.Limiter` already uses them internally, so it should be near zero cost. I'm happy to build that if someone prefers, I just didn't bother this time.
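To make the "Issue and fix" section concrete, here is a minimal, standalone sketch of the underlying `golang.org/x/time/rate` behavior described above (illustrative values only, not Cadence code):

```go
package main

import (
	"fmt"

	"golang.org/x/time/rate"
)

func main() {
	// an "unconfigured" limiter, like the global ratelimiter's primary starts out:
	lim := rate.NewLimiter(0, 0)

	// the stock setters advance the limiter's internal time, so after this pair
	// of calls the bucket is pinned at ~zero tokens as of "now"...
	lim.SetLimit(10)
	lim.SetBurst(10)
	fmt.Println(lim.Allow()) // false: the bucket must refill from empty at 10 tokens/s

	// ...whereas a limiter created directly with the same values still has its
	// zero-valued initial time, so its first use fills the bucket completely
	// ("many years" of refill have elapsed).
	fresh := rate.NewLimiter(10, 10)
	fmt.Println(fresh.Allow()) // true
}
```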
1 parent e5bd91e commit 04add2d

File tree: 4 files changed, +129 -21 lines changed

common/clock/ratelimiter.go

Lines changed: 28 additions & 6 deletions
```diff
@@ -198,13 +198,17 @@ func NewRatelimiter(lim rate.Limit, burst int) Ratelimiter {
 	return &ratelimiter{
 		timesource: NewRealTimeSource(),
 		limiter:    rate.NewLimiter(lim, burst),
+		// intentionally zero, matches rate.Limiter and helps fill the bucket if it is changed before any use.
+		latestNow: time.Time{},
 	}
 }
 
 func NewMockRatelimiter(ts TimeSource, lim rate.Limit, burst int) Ratelimiter {
 	return &ratelimiter{
 		timesource: ts,
 		limiter:    rate.NewLimiter(lim, burst),
+		// intentionally zero, matches rate.Limiter and helps fill the bucket if it is changed before any use.
+		latestNow: time.Time{},
 	}
 }
 
@@ -303,15 +307,33 @@ func (r *ratelimiter) Limit() rate.Limit {
 }
 
 func (r *ratelimiter) SetBurst(newBurst int) {
-	now, unlock := r.lockNow()
-	defer unlock()
-	r.limiter.SetBurstAt(now, newBurst)
+	r.mut.Lock()
+	defer r.mut.Unlock()
+	// setting burst/limit does not advance time, unlike the underlying limiter.
+	//
+	// this allows calling them in any order, and the next request
+	// will fill the token bucket to match elapsed time.
+	//
+	// this prefers new burst/limit values over past values,
+	// as they are assumed to be "better", and in particular ensures the first
+	// time-advancing call fills with the full values (starting from 0 time, like
+	// the underlying limiter does).
+	r.limiter.SetBurstAt(r.latestNow, newBurst)
 }
 
 func (r *ratelimiter) SetLimit(newLimit rate.Limit) {
-	now, unlock := r.lockNow()
-	defer unlock()
-	r.limiter.SetLimitAt(now, newLimit)
+	r.mut.Lock()
+	defer r.mut.Unlock()
+	// setting burst/limit does not advance time, unlike the underlying limiter.
+	//
+	// this allows calling them in any order, and the next request
+	// will fill the token bucket to match elapsed time.
+	//
+	// this prefers new burst/limit values over past values,
+	// as they are assumed to be "better", and in particular ensures the first
+	// time-advancing call fills with the full values (starting from 0 time, like
+	// the underlying limiter does).
+	r.limiter.SetLimitAt(r.latestNow, newLimit)
 }
 
 func (r *ratelimiter) Tokens() float64 {
```
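For contrast, a minimal sketch of the intended behavior after this change, assuming the `clock.NewRatelimiter` constructor and setters shown in the diff above:

```go
package main

import (
	"fmt"

	"golang.org/x/time/rate"

	"github.com/uber/cadence/common/clock"
)

func main() {
	// starts "unconfigured", with a zero latestNow, matching rate.Limiter's
	// zero-valued initial time.
	rl := clock.NewRatelimiter(0, 0)

	// unlike the underlying rate.Limiter, these setters do not advance time,
	// so they can be called in either order with the same outcome.
	rl.SetLimit(rate.Limit(10))
	rl.SetBurst(10)

	// the first time-advancing call then fills the bucket as if (10, 10) had
	// been set all along, so a request right after an update is not rejected.
	fmt.Println(rl.Allow()) // true
}
```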

common/quotas/global/collection/collection_test.go

Lines changed: 16 additions & 10 deletions
```diff
@@ -207,14 +207,17 @@ func TestCollectionSubmitsDataAndUpdates(t *testing.T) {
 	defer cancel()
 	require.NoError(t, c.OnStart(ctx))
 
-	// generate some data
+	// generate some data.
+	// these start with the collections' limits, i.e. 1 token, so only one request is allowed.
 	someLimiter := c.For("something")
 	res := someLimiter.Reserve()
 	assert.True(t, res.Allow(), "first request should have been allowed")
 	res.Used(true)
 	assert.False(t, someLimiter.Allow(), "second request on the same domain should have been rejected")
 	assert.NoError(t, c.For("other").Wait(ctx), "request on a different domain should be allowed")
 
+	// all limiters are now drained, and will take ~1s to recover normally.
+
 	// prep for the calls
 	called := make(chan struct{}, 1)
 	aggs.EXPECT().Update(gomock.Any(), gomock.Any(), map[shared.GlobalKey]rpc.Calls{
@@ -231,31 +234,34 @@
 			return rpc.UpdateResult{
 				Weights: map[shared.GlobalKey]rpc.UpdateEntry{
 					"test:something": {Weight: 1, UsedRPS: 2}, // should recover a token in 100ms
-					// "test:other": // not returned, should not change weight
+					// "test:other": // not returned, should not change weight/rps and stay at 1s
 				},
 				Err: nil,
 			}
 		})
 
 	mts.BlockUntil(1)        // need to have created timer in background updater
-	mts.Advance(time.Second) // trigger the update
+	mts.Advance(time.Second) // trigger the update. also fills all ratelimiters.
 
 	// wait until the calls occur
 	select {
 	case <-called:
-	case <-time.After(time.Second):
-		t.Fatal("did not make an rpc call after 1s")
+	case <-time.After(time.Second / 2):
+		// keep total wait shorter than 1s to avoid refilling the slow token, just in case
+		t.Fatal("did not make an rpc call after 1/2s")
 	}
-	// panic if more calls occur
+	// crash if more calls occur
 	close(called)
 
-	// wait for the updates to be sent to the ratelimiters, and for "something"'s 100ms token to recover
+	// wait for the updates to be sent to the ratelimiters, and for at least one of "something"'s 100ms tokens to recover
	time.Sleep(150 * time.Millisecond)
 
 	// and make sure updates occurred
-	assert.False(t, c.For("other").Allow(), "should be no recovered tokens yet on the slow limit")
-	assert.True(t, c.For("something").Allow(), "should have allowed one request on the fast limit")  // should use weight, not target rps
-	assert.False(t, c.For("something").Allow(), "should not have allowed a second request on the fast limit") // should use weight, not target rps
+	assert.False(t, c.For("other").Allow(), "should be no recovered tokens yet on the slow limit")  // less than 1 second == no tokens
+	assert.True(t, c.For("something").Allow(), "should have allowed one request on the fast limit") // over 100ms (got updated rate) == at least one token
+	// specifically: because this is the first update to this limiter, it should now allow 10 requests, because the token bucket should be full.
+	// just check once though, no need to be precise here.
+	assert.True(t, c.For("something").Allow(), "after the initial update, the fast limiter should have extra tokens available")
 
 	assert.NoError(t, c.OnStop(ctx))
 
```
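As an aside, the timing this test relies on falls out of the token-bucket refill interval; here is a quick sketch of the arithmetic (the 10 RPS / 1 RPS values come from the test comments above, not from any new behavior):

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// "test:something" is updated to ~10 RPS; "test:other" stays at the 1 RPS default.
	fast, slow := rate.Limit(10), rate.Limit(1)

	// a token-bucket limiter refills one token every 1/limit seconds:
	fastInterval := time.Duration(float64(time.Second) / float64(fast))
	slowInterval := time.Duration(float64(time.Second) / float64(slow))
	fmt.Println(fastInterval, slowInterval) // 100ms 1s

	// so sleeping 150ms recovers at least one "fast" token but no "slow" token,
	// which is exactly what the assertions above check.
}
```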

common/quotas/global/collection/internal/fallback.go

Lines changed: 18 additions & 2 deletions
```diff
@@ -119,7 +119,14 @@ const (
 // between the two regardless of which is being used.
 func NewFallbackLimiter(fallback quotas.Limiter) *FallbackLimiter {
 	l := &FallbackLimiter{
-		// 0 allows no requests, will be unused until we receive an update.
+		// start from 0 as a default, the limiter is unused until it is updated.
+		//
+		// caution: it's important to not call any time-advancing methods on this until
+		// after the first update, so the token bucket will fill properly.
+		//
+		// this (partially) mimics how new ratelimiters start with a full token bucket,
+		// but there does not seem to be any way to perfectly mimic it without using locks,
+		// and hopefully precision is not needed.
 		primary:  clock.NewRatelimiter(0, 0),
 		fallback: fallback,
 	}
@@ -221,7 +228,16 @@ func (b *FallbackLimiter) FallbackLimit() rate.Limit {
 }
 
 func (b *FallbackLimiter) both() quotas.Limiter {
-	if b.useFallback() {
+	starting, failing := b.mode()
+	if starting {
+		// don't touch the primary until an update occurs,
+		// to allow the token bucket to fill properly.
+		return b.fallback
+	}
+	if failing {
+		// keep shadowing calls, so the token buckets are similar.
+		// this prevents allowing a full burst when recovering, which seems
+		// reasonable as things were apparently unhealthy.
 		return NewShadowedLimiter(b.fallback, b.primary)
 	}
 	return NewShadowedLimiter(b.primary, b.fallback)
```
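The shadowing used in `both()` can be sketched roughly as follows; this is a simplified stand-in for `NewShadowedLimiter`, not its actual implementation. Decisions come from the first limiter, while the second sees the same call so its token bucket drains at a similar rate:

```go
package main

import (
	"fmt"

	"golang.org/x/time/rate"
)

// limiter is a minimal stand-in for the quotas.Limiter interface.
type limiter interface {
	Allow() bool
}

// shadowed mirrors every Allow() onto a second limiter whose result is
// ignored, keeping both token buckets roughly in sync.
type shadowed struct {
	primary limiter // decides the result
	shadow  limiter // drained in lockstep, result ignored
}

func (s shadowed) Allow() bool {
	s.shadow.Allow() // intentionally ignored
	return s.primary.Allow()
}

func main() {
	fallback := rate.NewLimiter(10, 10)
	primary := rate.NewLimiter(10, 10)

	// while "failing": decisions come from the fallback, but the primary keeps
	// draining too, so recovery does not hand out a surprise full burst.
	lim := shadowed{primary: fallback, shadow: primary}
	fmt.Println(lim.Allow()) // true, and both buckets spent a token
}
```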

common/quotas/global/collection/internal/fallback_test.go

Lines changed: 67 additions & 3 deletions
```diff
@@ -37,6 +37,71 @@ import (
 	"github.com/uber/cadence/common/quotas"
 )
 
+func TestFallbackRegression(t *testing.T) {
+	t.Run("primary should start with fallback limit", func(t *testing.T) {
+		// checks for an issue in earlier versions, where a newly-enabled primary limit would start with an empty token bucket,
+		// unfairly rejecting requests that other limiters would allow (as they are created with a full bucket at their target rate).
+		//
+		// this can be spotted by doing this sequence:
+		// - create a new limiter, fallback of N per second
+		//   - this starts the "primary" limiter with limit=0, burst=0.
+		// - Allow() a request
+		//   - the limiters here may disagree. this is fine, primary is not used yet.
+		//   - however: this advanced the primary's internal "now" to now. (this now happens only after update)
+		// - update the limit to match the fallback
+		//   - primary now has limit==N, burst==N, but tokens=0 and now=now.
+		// - Allow() a request
+		//   - fallback allows due to having N-1 tokens
+		//   - primary rejects due to 0 tokens
+		//     ^ this is the problem.
+		//     ^ this could also happen if calling `ratelimiter.Limit()` sets "now" in the future, though this seems unlikely.
+		//
+		// this uses real time because sleeping is not necessary, and it ensures
+		// that any time-advancing calls that occur too early will lead to ~zero tokens.
+
+		limit := rate.Limit(10) // enough to accept all requests
+		rl := NewFallbackLimiter(clock.NewRatelimiter(limit, int(limit)))
+
+		// sanity check: this may be fine to change, but it will mean the test needs to be rewritten because the token bucket may not be empty.
+		orig := rl.primary
+		assert.Zero(t, orig.Burst(), "sanity check: default primary ratelimiter's burst is not zero, this test likely needs a rewrite")
+
+		// simulate: call while still starting up, it should not touch the primary limiter.
+		allowed := rl.Allow()
+		before, starting, failing := rl.Collect()
+		assert.True(t, allowed, "first request should have been allowed, on the fallback limiter")
+		assert.True(t, starting, "should be true == in starting mode")
+		assert.False(t, failing, "should be false == not failing")
+		assert.Equal(t, UsageMetrics{
+			Allowed:  1,
+			Rejected: 0,
+			Idle:     0,
+		}, before)
+
+		// update: this should set limits, and either fill the bucket or cause it to be filled on the next request
+		rl.Update(limit)
+
+		// call again: should be allowed, as this is the first time-touching request since it was created,
+		// and the token bucket should have filled to match the first-update value.
+		allowed = rl.Allow()
+		after, starting, failing := rl.Collect()
+		assert.True(t, allowed, "second request should have been allowed, on the primary limiter")
+		assert.False(t, starting, "should be false == not in starting mode (using global)")
+		assert.False(t, failing, "should be false == not failing")
+		assert.Equal(t, UsageMetrics{
+			Allowed:  1,
+			Rejected: 0,
+			Idle:     0,
+		}, after)
+		assert.InDeltaf(t,
+			// Tokens() advances time, so this will not be precise.
+			rl.primary.Tokens(), int(limit)-1, 0.1,
+			"should have ~%v tokens: %v from the initial fill, minus 1 for the allow call",
+			int(limit)-1, int(limit),
+		)
+	})
+}
+
 func TestLimiter(t *testing.T) {
 	t.Run("uses fallback initially", func(t *testing.T) {
 		m := quotas.NewMockLimiter(gomock.NewController(t))
@@ -93,9 +158,8 @@
 		require.False(t, failing, "should not be using fallback")
 		require.False(t, startup, "should not be starting up, has had an update")
 
-		// bucket starts out empty / with whatever contents it had before (zero).
-		// this is possibly surprising, so it's asserted.
-		require.False(t, lim.Allow(), "rate.Limiter should reject requests until filled")
+		// the bucket will fill from time 0 on the first update, ensuring the first request is allowed
+		require.True(t, lim.Allow(), "rate.Limiter should start with a full bucket")
 
 		// fail enough times to trigger a fallback
 		for i := 0; i < maxFailedUpdates; i++ {
```
