Fix queriers shuffle-sharding blast radius containment #3901

Merged · 27 commits · Mar 9, 2021

Commits
85f8a15
Added forget timeout support to queues
pracucci Mar 2, 2021
97a06ef
Added notify shutdown rpc to query-frontend and query-scheduler proto
pracucci Mar 3, 2021
69766c5
Querier worker notifies shutdown to query-frontend/scheduler
pracucci Mar 3, 2021
a7a9b31
Log when query-frontend/scheduler receives a shutdown notification
pracucci Mar 3, 2021
c3833e8
Added config option to configure the forget timeout
pracucci Mar 3, 2021
ad200c7
Fixed re-connect while in forget waiting period
pracucci Mar 3, 2021
ad49aaf
Fixed unit tests
pracucci Mar 3, 2021
ed4afa9
Fixed GetNextRequestForQuerier() when a resharding happen after foget…
pracucci Mar 3, 2021
c4d9312
Update pkg/frontend/v1/frontend.go
pracucci Mar 4, 2021
5888f62
Update pkg/scheduler/queue/user_queues.go
pracucci Mar 4, 2021
874b482
Update pkg/scheduler/queue/user_queues.go
pracucci Mar 4, 2021
8cdd60a
Update pkg/scheduler/scheduler.go
pracucci Mar 4, 2021
ee4130e
Update pkg/querier/worker/frontend_processor.go
pracucci Mar 4, 2021
182c979
Updated comment based on review feedback
pracucci Mar 4, 2021
dd358ed
Updated comment based on review feedback
pracucci Mar 4, 2021
2505912
Updated generated doc
pracucci Mar 4, 2021
b39bd26
Added name to services
pracucci Mar 4, 2021
326435a
Moved forgetCheckPeriod where it's used
pracucci Mar 4, 2021
47d1ce2
Added queues forget timeout unit tests
pracucci Mar 4, 2021
5b8b6a7
Added RequestQueue unit test
pracucci Mar 4, 2021
cb30035
Renamed querier forget timeout into delay
pracucci Mar 5, 2021
8cf703e
Added timeout to the notify shutdown notification
pracucci Mar 5, 2021
ab55057
Updated doc
pracucci Mar 5, 2021
42979eb
Added CHANGELOG entry
pracucci Mar 5, 2021
84160d8
Update pkg/scheduler/scheduler.go
pracucci Mar 8, 2021
1ca9855
Update pkg/frontend/v1/frontend.go
pracucci Mar 8, 2021
d56be11
Updated doc
pracucci Mar 9, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -6,6 +6,7 @@
* [ENHANCEMENT] Ruler: added the following metrics when ruler sharding is enabled: #3916
* `cortex_ruler_clients`
* `cortex_ruler_client_request_duration_seconds`
* [ENHANCEMENT] Query-frontend/scheduler: added querier forget delay (`-query-frontend.querier-forget-delay` and `-query-scheduler.querier-forget-delay`) to mitigate the blast radius in the event queriers crash because of a repeatedly sent "query of death" when shuffle-sharding is enabled. #3901

## 1.8.0 in progress

14 changes: 14 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -589,6 +589,13 @@ query_scheduler:
# CLI flag: -query-scheduler.max-outstanding-requests-per-tenant
[max_outstanding_requests_per_tenant: <int> | default = 100]

# If a querier disconnects without sending notification about graceful
# shutdown, the query-scheduler will keep the querier in the tenant's shard
# until the forget delay has passed. This feature is useful to reduce the
# blast radius when shuffle-sharding is enabled.
# CLI flag: -query-scheduler.querier-forget-delay
[querier_forget_delay: <duration> | default = 0s]
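For example, a `query_scheduler` block enabling a 1-minute forget delay might look like the following sketch (the delay value is illustrative, not a recommendation from this PR):

```yaml
query_scheduler:
  max_outstanding_requests_per_tenant: 100
  # Keep a crashed querier in the tenant's shard for 1 minute before
  # replacing it, to contain the blast radius of a "query of death".
  querier_forget_delay: 1m
```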

# This configures the gRPC client used to report errors back to the
# query-frontend.
grpc_client_config:
@@ -1322,6 +1329,13 @@ The `query_frontend_config` configures the Cortex query-frontend.
# CLI flag: -querier.max-outstanding-requests-per-tenant
[max_outstanding_per_tenant: <int> | default = 100]

# If a querier disconnects without sending notification about graceful shutdown,
# the query-frontend will keep the querier in the tenant's shard until the
# forget delay has passed. This feature is useful to reduce the blast radius
# when shuffle-sharding is enabled.
# CLI flag: -query-frontend.querier-forget-delay
[querier_forget_delay: <duration> | default = 0s]
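A hedged sketch of the corresponding query-frontend setting, assuming the `frontend` top-level config block name (the delay value is illustrative):

```yaml
frontend:
  max_outstanding_per_tenant: 100
  # Delay before a crashed querier is removed from a tenant's shard.
  querier_forget_delay: 1m
```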

# DNS hostname used for finding query-schedulers.
# CLI flag: -frontend.scheduler-address
[scheduler_address: <string> | default = ""]
9 changes: 9 additions & 0 deletions docs/guides/shuffle-sharding.md
@@ -125,6 +125,15 @@ Note that this distribution happens in query-frontend, or query-scheduler if used.

_The maximum number of queriers can be overridden on a per-tenant basis in the limits overrides configuration._

#### The impact of "query of death"

In the event a tenant repeatedly sends a "query of death" that causes queriers to crash or be killed because of out-of-memory, each crashed querier gets disconnected from the query-frontend or query-scheduler, and a new querier is immediately assigned to the tenant's shard. This practically invalidates the assumption that shuffle-sharding can be used to contain the blast radius of a query of death.

To mitigate this, Cortex allows you to configure a delay between when a querier disconnects because of a crash and when the crashed querier is actually removed from the tenant's shard (and another healthy querier is added as a replacement). A delay of 1 minute may be a reasonable trade-off:

- Query-frontend: `-query-frontend.querier-forget-delay=1m`
- Query-scheduler: `-query-scheduler.querier-forget-delay=1m`

### Store-gateway shuffle sharding

The Cortex store-gateway -- used by the [blocks storage](../blocks-storage/_index.md) -- by default spreads each tenant's blocks across all running store-gateways.
5 changes: 4 additions & 1 deletion pkg/frontend/config.go
@@ -70,7 +70,10 @@ func InitFrontend(cfg CombinedFrontendConfig, limits v1.Limits, grpcListenPort i

default:
// No scheduler = use original frontend.
fr := v1.New(cfg.FrontendV1, limits, log, reg)
fr, err := v1.New(cfg.FrontendV1, limits, log, reg)
if err != nil {
return nil, nil, nil, err
}
return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr), fr, nil, nil
}
}
56 changes: 45 additions & 11 deletions pkg/frontend/v1/frontend.go
@@ -2,7 +2,6 @@ package v1

import (
"context"
"errors"
"flag"
"fmt"
"net/http"
@@ -11,6 +10,7 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/httpgrpc"
@@ -31,12 +31,14 @@

// Config for a Frontend.
type Config struct {
MaxOutstandingPerTenant int `yaml:"max_outstanding_per_tenant"`
MaxOutstandingPerTenant int `yaml:"max_outstanding_per_tenant"`
QuerierForgetDelay time.Duration `yaml:"querier_forget_delay"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxOutstandingPerTenant, "querier.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per frontend; requests beyond this error with HTTP 429.")
f.DurationVar(&cfg.QuerierForgetDelay, "query-frontend.querier-forget-delay", 0, "If a querier disconnects without sending notification about graceful shutdown, the query-frontend will keep the querier in the tenant's shard until the forget delay has passed. This feature is useful to reduce the blast radius when shuffle-sharding is enabled.")
}

type Limits interface {
@@ -56,6 +58,10 @@ type Frontend struct {
requestQueue *queue.RequestQueue
activeUsers *util.ActiveUsersCleanupService

// Subservices manager.
subservices *services.Manager
subservicesWatcher *services.FailureWatcher

// Metrics.
queueLength *prometheus.GaugeVec
discardedRequests *prometheus.CounterVec
@@ -74,8 +80,7 @@ type request struct {
}

// New creates a new frontend. Frontend implements service, and must be started and stopped.
func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Registerer) *Frontend {

func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Registerer) (*Frontend, error) {
f := &Frontend{
cfg: cfg,
log: log,
@@ -95,26 +100,48 @@ func New(cfg Config, limits Limits, log log.Logger, registerer prometheus.Regist
}),
}

f.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, f.queueLength, f.discardedRequests)
f.requestQueue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, cfg.QuerierForgetDelay, f.queueLength, f.discardedRequests)
f.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(f.cleanupInactiveUserMetrics)

var err error
f.subservices, err = services.NewManager(f.requestQueue, f.activeUsers)
if err != nil {
return nil, err
}

f.numClients = promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{
Name: "cortex_query_frontend_connected_clients",
Help: "Number of worker clients currently connected to the frontend.",
}, f.requestQueue.GetConnectedQuerierWorkersMetric)

f.Service = services.NewIdleService(f.starting, f.stopping)
return f
f.Service = services.NewBasicService(f.starting, f.running, f.stopping)
return f, nil
}

func (f *Frontend) starting(ctx context.Context) error {
return services.StartAndAwaitRunning(ctx, f.activeUsers)
f.subservicesWatcher.WatchManager(f.subservices)

if err := services.StartManagerAndAwaitHealthy(ctx, f.subservices); err != nil {
return errors.Wrap(err, "unable to start frontend subservices")
}

return nil
}

func (f *Frontend) running(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case err := <-f.subservicesWatcher.Chan():
return errors.Wrap(err, "frontend subservice failed")
}
}
}

func (f *Frontend) stopping(_ error) error {
// Stops new requests and errors out any pending requests.
f.requestQueue.Stop()
return services.StopAndAwaitTerminated(context.Background(), f.activeUsers)
// This will also stop the request queue, which stops accepting new requests and errors out any pending requests.
return services.StopManagerAndAwaitStopped(context.Background(), f.subservices)
}

func (f *Frontend) cleanupInactiveUserMetrics(user string) {
@@ -258,6 +285,13 @@ func (f *Frontend) Process(server frontendv1pb.Frontend_ProcessServer) error {
}
}

func (f *Frontend) NotifyClientShutdown(_ context.Context, req *frontendv1pb.NotifyClientShutdownRequest) (*frontendv1pb.NotifyClientShutdownResponse, error) {
level.Info(f.log).Log("msg", "received shutdown notification from querier", "querier", req.GetClientID())
f.requestQueue.NotifyQuerierShutdown(req.GetClientID())

return &frontendv1pb.NotifyClientShutdownResponse{}, nil
}

func getQuerierID(server frontendv1pb.Frontend_ProcessServer) (string, error) {
err := server.Send(&frontendv1pb.FrontendToClient{
Type: frontendv1pb.GET_ID,
5 changes: 3 additions & 2 deletions pkg/frontend/v1/frontend_test.go
@@ -128,7 +128,7 @@ func TestFrontendCheckReady(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
f := &Frontend{
log: log.NewNopLogger(),
requestQueue: queue.NewRequestQueue(5,
requestQueue: queue.NewRequestQueue(5, 0,
prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"user"}),
prometheus.NewCounterVec(prometheus.CounterOpts{}, []string{"user"}),
),
@@ -243,7 +243,8 @@ func testFrontend(t *testing.T, config Config, handler http.Handler, test func(a
httpListen, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)

v1 := New(config, limits{}, logger, reg)
v1, err := New(config, limits{}, logger, reg)
require.NoError(t, err)
require.NotNil(t, v1)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), v1))
defer func() {