Skip to content

Commit f791f5b

Browse files
authored
Re-enqueue 429 requests if there are multiple query-schedulers (#5496)
* Re-enqueue 429 requests if there are multiple query-schedulers Signed-off-by: Xiaochao Dong (@damnever) <[email protected]> * Add feature flag Signed-off-by: Xiaochao Dong (@damnever) <[email protected]> --------- Signed-off-by: Xiaochao Dong (@damnever) <[email protected]>
1 parent bf8c040 commit f791f5b

File tree

4 files changed

+26
-10
lines changed

4 files changed

+26
-10
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
* [FEATURE] Store Gateway: Add `-store-gateway.sharding-ring.keep-instance-in-the-ring-on-shutdown` to skip unregistering instance from the ring in shutdown. #5421
2727
* [FEATURE] Ruler: Support for filtering rules in the API. #5417
2828
* [FEATURE] Compactor: Add `-compactor.ring.tokens-file-path` to store generated tokens locally. #5432
29+
* [FEATURE] Query Frontend: Add `-frontend.retry-on-too-many-outstanding-requests` to re-enqueue 429 requests if there are multiple query-schedulers available. #5496
2930
* [ENHANCEMENT] Distributor/Ingester: Add span on push path #5319
3031
* [ENHANCEMENT] Support object storage backends for runtime configuration file. #5292
3132
* [ENHANCEMENT] Query Frontend: Reject subquery with too small step size. #5323

docs/configuration/config-file-reference.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3532,6 +3532,11 @@ grpc_client_config:
35323532
# CLI flag: -frontend.grpc-client-config.tls-insecure-skip-verify
35333533
[tls_insecure_skip_verify: <boolean> | default = false]
35343534
3535+
# When multiple query-schedulers are available, re-enqueue queries that were
3536+
# rejected due to too many outstanding requests.
3537+
# CLI flag: -frontend.retry-on-too-many-outstanding-requests
3538+
[retry_on_too_many_outstanding_requests: <boolean> | default = false]
3539+
35353540
# Name of network interface to read address from. This address is sent to
35363541
# query-scheduler and querier, which uses it to send the query response back to
35373542
# query-frontend.

pkg/frontend/v2/frontend.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,11 @@ import (
3030

3131
// Config for a Frontend.
3232
type Config struct {
33-
SchedulerAddress string `yaml:"scheduler_address"`
34-
DNSLookupPeriod time.Duration `yaml:"scheduler_dns_lookup_period"`
35-
WorkerConcurrency int `yaml:"scheduler_worker_concurrency"`
36-
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
33+
SchedulerAddress string `yaml:"scheduler_address"`
34+
DNSLookupPeriod time.Duration `yaml:"scheduler_dns_lookup_period"`
35+
WorkerConcurrency int `yaml:"scheduler_worker_concurrency"`
36+
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
37+
RetryOnTooManyOutstandingRequests bool `yaml:"retry_on_too_many_outstanding_requests"`
3738

3839
// Used to find local IP address, that is sent to scheduler and querier-worker.
3940
InfNames []string `yaml:"instance_interface_names"`
@@ -47,6 +48,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
4748
f.StringVar(&cfg.SchedulerAddress, "frontend.scheduler-address", "", "DNS hostname used for finding query-schedulers.")
4849
f.DurationVar(&cfg.DNSLookupPeriod, "frontend.scheduler-dns-lookup-period", 10*time.Second, "How often to resolve the scheduler-address, in order to look for new query-scheduler instances.")
4950
f.IntVar(&cfg.WorkerConcurrency, "frontend.scheduler-worker-concurrency", 5, "Number of concurrent workers forwarding queries to single query-scheduler.")
51+
f.BoolVar(&cfg.RetryOnTooManyOutstandingRequests, "frontend.retry-on-too-many-outstanding-requests", false, "When multiple query-schedulers are available, re-enqueue queries that were rejected due to too many outstanding requests.")
5052

5153
cfg.InfNames = []string{"eth0", "en0"}
5254
f.Var((*flagext.StringSlice)(&cfg.InfNames), "frontend.instance-interface-names", "Name of network interface to read address from. This address is sent to query-scheduler and querier, which uses it to send the query response back to query-frontend.")
@@ -86,6 +88,8 @@ type frontendRequest struct {
8688

8789
enqueue chan enqueueResult
8890
response chan *frontendv2pb.QueryResultRequest
91+
92+
retryOnTooManyOutstandingRequests bool
8993
}
9094

9195
type enqueueStatus int
@@ -192,6 +196,8 @@ func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest)
192196
// even if this goroutine goes away due to client context cancellation.
193197
enqueue: make(chan enqueueResult, 1),
194198
response: make(chan *frontendv2pb.QueryResultRequest, 1),
199+
200+
retryOnTooManyOutstandingRequests: f.cfg.RetryOnTooManyOutstandingRequests && f.schedulerWorkers.getWorkersCount() > 1,
195201
}
196202

197203
f.requests.put(freq)

pkg/frontend/v2/frontend_scheduler_worker.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -296,12 +296,16 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro
296296
}
297297

298298
case schedulerpb.TOO_MANY_REQUESTS_PER_TENANT:
299-
req.enqueue <- enqueueResult{status: waitForResponse}
300-
req.response <- &frontendv2pb.QueryResultRequest{
301-
HttpResponse: &httpgrpc.HTTPResponse{
302-
Code: http.StatusTooManyRequests,
303-
Body: []byte("too many outstanding requests"),
304-
},
299+
if req.retryOnTooManyOutstandingRequests {
300+
req.enqueue <- enqueueResult{status: failed}
301+
} else {
302+
req.enqueue <- enqueueResult{status: waitForResponse}
303+
req.response <- &frontendv2pb.QueryResultRequest{
304+
HttpResponse: &httpgrpc.HTTPResponse{
305+
Code: http.StatusTooManyRequests,
306+
Body: []byte("too many outstanding requests"),
307+
},
308+
}
305309
}
306310
}
307311

0 commit comments

Comments
 (0)