
Commit 868898a

pracucci and jtlisi authored
Add "wait ring stability" to store-gateway and fix cold start issue (#4271)
* Add wait ring stability to store-gateway and improve sharding sync logic
* Fixed typo in comments
* Added TestStoreGateway_InitialSyncWithWaitRingStability
* Improved TestStoreGateway_InitialSyncWithWaitRingStability doc
* Removed TestStoreGateway_BlocksSharding because superseded by TestStoreGateway_InitialSyncWithWaitRingStability
* TestStoreGateway_InitialSyncWithWaitRingStability seed needs to be initialised and logged on a per-test-case basis, because test ordering is unstable
* Added TestStoreGateway_BlocksSyncWithDefaultSharding_RingTopologyChangedAfterScaleUp
* Fix linter issue
* Update pkg/storegateway/sharding_strategy.go
* Fix integration tests
* Updated doc
* Fix TestGettingStartedWithGossipedRing integration test
* Improved filterBlocksByRingSharding() logic
* Added CHANGELOG entry

Signed-off-by: Marco Pracucci <[email protected]>
Co-authored-by: Jacob Lisi <[email protected]>
1 parent 9aa910f commit 868898a

18 files changed: +449 −157 lines

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
@@ -40,6 +40,7 @@
     * `memberlist_client_messages_to_broadcast_dropped_total`
 * [ENHANCEMENT] Alertmanager: Added `-alertmanager.max-dispatcher-aggregation-groups` option to control max number of active dispatcher groups in Alertmanager (per tenant, also overrideable). When the limit is reached, Dispatcher produces log message and increases `cortex_alertmanager_dispatcher_aggregation_group_limit_reached_total` metric. #4254
 * [ENHANCEMENT] Alertmanager: Added `-alertmanager.max-alerts-count` and `-alertmanager.max-alerts-size-bytes` to control max number of alerts and total size of alerts that a single user can have in Alertmanager's memory. Adding more alerts will fail with a log message and incrementing `cortex_alertmanager_alerts_insert_limited_total` metric (per-user). These limits can be overrided by using per-tenant overrides. Current values are tracked in `cortex_alertmanager_alerts_limiter_current_alerts` and `cortex_alertmanager_alerts_limiter_current_alerts_size_bytes` metrics. #4253
+* [ENHANCEMENT] Store-gateway: added `-store-gateway.sharding-ring.wait-stability-min-duration` and `-store-gateway.sharding-ring.wait-stability-max-duration` support to store-gateway, to wait for ring stability at startup. #4271
 * [BUGFIX] Purger: fix `Invalid null value in condition for column range` caused by `nil` value in range for WriteBatch query. #4128
 * [BUGFIX] Ingester: fixed infrequent panic caused by a race condition between TSDB mmap-ed head chunks truncation and queries. #4176
 * [BUGFIX] Alertmanager: fix Alertmanager status page if clustering via gossip is disabled or sharding is enabled. #4184
@@ -49,6 +50,7 @@
 * [BUGFIX] Ingester: fix issue where runtime limits erroneously override default limits. #4246
 * [BUGFIX] Ruler: fix startup in single-binary mode when the new `ruler_storage` is used. #4252
 * [BUGFIX] Querier: fix queries failing with "at least 1 healthy replica required, could only find 0" error right after scaling up store-gateways until they're ACTIVE in the ring. #4263
+* [BUGFIX] Store-gateway: when blocks sharding is enabled, do not load all blocks in each store-gateway in case of a cold startup, but load only blocks owned by the store-gateway replica. #4271
 * [BUGFIX] Memberlist: fix to setting the default configuration value for `-memberlist.retransmit-factor` when not provided. This should improve propagation delay of the ring state (including, but not limited to, tombstones). Note that if the configuration is already explicitly given, this fix has no effect. #4269

 ## Blocksconvert

docs/blocks-storage/compactor.md

Lines changed: 1 addition & 1 deletion
@@ -223,7 +223,7 @@ compactor:
 [wait_stability_min_duration: <duration> | default = 1m]

 # Maximum time to wait for ring stability at startup. If the compactor ring
-# keep changing after this period of time, the compactor will start anyway.
+# keeps changing after this period of time, the compactor will start anyway.
 # CLI flag: -compactor.ring.wait-stability-max-duration
 [wait_stability_max_duration: <duration> | default = 5m]

docs/blocks-storage/store-gateway.md

Lines changed: 18 additions & 0 deletions
@@ -81,6 +81,14 @@ The store-gateway replication optionally supports [zone-awareness](../guides/zon
 2. Enable blocks zone-aware replication via the `-store-gateway.sharding-ring.zone-awareness-enabled` CLI flag (or its respective YAML config option). Please be aware this configuration option should be set to store-gateways, queriers and rulers.
 3. Rollout store-gateways, queriers and rulers to apply the new configuration

+### Waiting for stable ring at startup
+
+In the event of a cluster cold start or scale up of 2+ store-gateway instances at the same time we may end up in a situation where each new store-gateway instance starts at a slightly different time and thus each one runs the initial blocks sync based on a different state of the ring. For example, in case of a cold start, the first store-gateway joining the ring may load all blocks since the sharding logic runs based on the current state of the ring, which is 1 single store-gateway.
+
+To reduce the likelihood this could happen, the store-gateway waits for a stable ring at startup. A ring is considered stable if no instance is added/removed to the ring for at least `-store-gateway.sharding-ring.wait-stability-min-duration`. If the ring keep getting changed after `-store-gateway.sharding-ring.wait-stability-max-duration`, the store-gateway will stop waiting for a stable ring and will proceed starting up normally.
+
+To disable this waiting logic, you can start the store-gateway with `-store-gateway.sharding-ring.wait-stability-min-duration=0`.
+
 ## Blocks index-header

 The [index-header](./binary-index-header.md) is a subset of the block index which the store-gateway downloads from the object storage and keeps on the local disk in order to speed up queries.
@@ -250,6 +258,16 @@ store_gateway:
 # CLI flag: -store-gateway.sharding-ring.zone-awareness-enabled
 [zone_awareness_enabled: <boolean> | default = false]

+# Minimum time to wait for ring stability at startup. 0 to disable.
+# CLI flag: -store-gateway.sharding-ring.wait-stability-min-duration
+[wait_stability_min_duration: <duration> | default = 1m]
+
+# Maximum time to wait for ring stability at startup. If the store-gateway
+# ring keeps changing after this period of time, the store-gateway will
+# start anyway.
+# CLI flag: -store-gateway.sharding-ring.wait-stability-max-duration
+[wait_stability_max_duration: <duration> | default = 5m]
+
 # Name of network interface to read address from.
 # CLI flag: -store-gateway.sharding-ring.instance-interface-names
 [instance_interface_names: <list of string> | default = [eth0 en0]]

docs/blocks-storage/store-gateway.template

Lines changed: 8 additions & 0 deletions
@@ -81,6 +81,14 @@ The store-gateway replication optionally supports [zone-awareness](../guides/zon
 2. Enable blocks zone-aware replication via the `-store-gateway.sharding-ring.zone-awareness-enabled` CLI flag (or its respective YAML config option). Please be aware this configuration option should be set to store-gateways, queriers and rulers.
 3. Rollout store-gateways, queriers and rulers to apply the new configuration

+### Waiting for stable ring at startup
+
+In the event of a cluster cold start or scale up of 2+ store-gateway instances at the same time we may end up in a situation where each new store-gateway instance starts at a slightly different time and thus each one runs the initial blocks sync based on a different state of the ring. For example, in case of a cold start, the first store-gateway joining the ring may load all blocks since the sharding logic runs based on the current state of the ring, which is 1 single store-gateway.
+
+To reduce the likelihood this could happen, the store-gateway waits for a stable ring at startup. A ring is considered stable if no instance is added/removed to the ring for at least `-store-gateway.sharding-ring.wait-stability-min-duration`. If the ring keep getting changed after `-store-gateway.sharding-ring.wait-stability-max-duration`, the store-gateway will stop waiting for a stable ring and will proceed starting up normally.
+
+To disable this waiting logic, you can start the store-gateway with `-store-gateway.sharding-ring.wait-stability-min-duration=0`.
+
 ## Blocks index-header

 The [index-header](./binary-index-header.md) is a subset of the block index which the store-gateway downloads from the object storage and keeps on the local disk in order to speed up queries.

docs/configuration/config-file-reference.md

Lines changed: 11 additions & 1 deletion
@@ -5175,7 +5175,7 @@ sharding_ring:
 [wait_stability_min_duration: <duration> | default = 1m]

 # Maximum time to wait for ring stability at startup. If the compactor ring
-# keep changing after this period of time, the compactor will start anyway.
+# keeps changing after this period of time, the compactor will start anyway.
 # CLI flag: -compactor.ring.wait-stability-max-duration
 [wait_stability_max_duration: <duration> | default = 5m]
@@ -5260,6 +5260,16 @@ sharding_ring:
 # CLI flag: -store-gateway.sharding-ring.zone-awareness-enabled
 [zone_awareness_enabled: <boolean> | default = false]

+# Minimum time to wait for ring stability at startup. 0 to disable.
+# CLI flag: -store-gateway.sharding-ring.wait-stability-min-duration
+[wait_stability_min_duration: <duration> | default = 1m]
+
+# Maximum time to wait for ring stability at startup. If the store-gateway
+# ring keeps changing after this period of time, the store-gateway will start
+# anyway.
+# CLI flag: -store-gateway.sharding-ring.wait-stability-max-duration
+[wait_stability_max_duration: <duration> | default = 5m]
+
 # Name of network interface to read address from.
 # CLI flag: -store-gateway.sharding-ring.instance-interface-names
 [instance_interface_names: <list of string> | default = [eth0 en0]]
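For reference, the two new flags map to the following YAML fragment under the store-gateway's `sharding_ring` block. This is a hypothetical minimal snippet assembled from the reference above (the `sharding_enabled` key and nesting are assumptions about the surrounding config); the values shown are the documented defaults.

```yaml
store_gateway:
  sharding_enabled: true
  sharding_ring:
    # Minimum time with no instances added/removed before the initial
    # blocks sync starts. Set to 0 to disable the wait entirely.
    wait_stability_min_duration: 1m
    # Stop waiting and start anyway after this long.
    wait_stability_max_duration: 5m
```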

integration/backward_compatibility_test.go

Lines changed: 18 additions & 4 deletions
@@ -26,10 +26,10 @@ var (
 		"quay.io/cortexproject/cortex:v1.3.0": preCortex14Flags,
 		"quay.io/cortexproject/cortex:v1.4.0": preCortex16Flags,
 		"quay.io/cortexproject/cortex:v1.5.0": preCortex16Flags,
-		"quay.io/cortexproject/cortex:v1.6.0": nil,
-		"quay.io/cortexproject/cortex:v1.7.0": nil,
-		"quay.io/cortexproject/cortex:v1.8.0": nil,
-		"quay.io/cortexproject/cortex:v1.9.0": nil,
+		"quay.io/cortexproject/cortex:v1.6.0": preCortex110Flags,
+		"quay.io/cortexproject/cortex:v1.7.0": preCortex110Flags,
+		"quay.io/cortexproject/cortex:v1.8.0": preCortex110Flags,
+		"quay.io/cortexproject/cortex:v1.9.0": preCortex110Flags,
 	}
 )

@@ -42,13 +42,27 @@ func preCortex14Flags(flags map[string]string) map[string]string {
 		"-store-gateway.sharding-ring.replication-factor": "",
 		// Query-scheduler has been introduced in 1.6.0
 		"-frontend.scheduler-dns-lookup-period": "",
+		// Store-gateway "wait ring stability" has been introduced in 1.10.0
+		"-store-gateway.sharding-ring.wait-stability-min-duration": "",
+		"-store-gateway.sharding-ring.wait-stability-max-duration": "",
 	})
 }

 func preCortex16Flags(flags map[string]string) map[string]string {
 	return e2e.MergeFlagsWithoutRemovingEmpty(flags, map[string]string{
 		// Query-scheduler has been introduced in 1.6.0
 		"-frontend.scheduler-dns-lookup-period": "",
+		// Store-gateway "wait ring stability" has been introduced in 1.10.0
+		"-store-gateway.sharding-ring.wait-stability-min-duration": "",
+		"-store-gateway.sharding-ring.wait-stability-max-duration": "",
+	})
+}
+
+func preCortex110Flags(flags map[string]string) map[string]string {
+	return e2e.MergeFlagsWithoutRemovingEmpty(flags, map[string]string{
+		// Store-gateway "wait ring stability" has been introduced in 1.10.0
+		"-store-gateway.sharding-ring.wait-stability-min-duration": "",
+		"-store-gateway.sharding-ring.wait-stability-max-duration": "",
 	})
 }
5468

integration/e2ecortex/services.go

Lines changed: 6 additions & 0 deletions
@@ -121,6 +121,9 @@ func NewStoreGatewayWithConfigFile(name, consulAddress, configFile string, flags
 		"-store-gateway.sharding-ring.store":              "consul",
 		"-store-gateway.sharding-ring.consul.hostname":    consulAddress,
 		"-store-gateway.sharding-ring.replication-factor": "1",
+		// Startup quickly.
+		"-store-gateway.sharding-ring.wait-stability-min-duration": "0",
+		"-store-gateway.sharding-ring.wait-stability-max-duration": "0",
 	}, flags))...),
 	e2e.NewHTTPReadinessProbe(httpPort, "/ready", 200, 299),
 	httpPort,
@@ -301,6 +304,9 @@ func NewSingleBinary(name string, flags map[string]string, image string, otherPo
 		"-ingester.concurrent-flushes":   "10",
 		"-ingester.max-transfer-retries": "10",
 		"-ingester.num-tokens":           "512",
+		// Startup quickly.
+		"-store-gateway.sharding-ring.wait-stability-min-duration": "0",
+		"-store-gateway.sharding-ring.wait-stability-max-duration": "0",
 	}, flags))...),
 	e2e.NewHTTPReadinessProbe(httpPort, "/ready", 200, 299),
 	httpPort,

integration/getting_started_with_gossiped_ring_test.go

Lines changed: 11 additions & 9 deletions
@@ -33,15 +33,17 @@ func TestGettingStartedWithGossipedRing(t *testing.T) {
 	// We don't care for storage part too much here. Both Cortex instances will write new blocks to /tmp, but that's fine.
 	flags := map[string]string{
 		// decrease timeouts to make test faster. should still be fine with two instances only
-		"-ingester.join-after":                       "0s", // join quickly
-		"-ingester.observe-period":                   "5s", // to avoid conflicts in tokens
-		"-blocks-storage.bucket-store.sync-interval": "1s", // sync continuously
-		"-blocks-storage.backend":                    "s3",
-		"-blocks-storage.s3.bucket-name":             bucketName,
-		"-blocks-storage.s3.access-key-id":           e2edb.MinioAccessKey,
-		"-blocks-storage.s3.secret-access-key":       e2edb.MinioSecretKey,
-		"-blocks-storage.s3.endpoint":                fmt.Sprintf("%s-minio-9000:9000", networkName),
-		"-blocks-storage.s3.insecure":                "true",
+		"-ingester.join-after":                                     "0s", // join quickly
+		"-ingester.observe-period":                                 "5s", // to avoid conflicts in tokens
+		"-blocks-storage.bucket-store.sync-interval":               "1s", // sync continuously
+		"-blocks-storage.backend":                                  "s3",
+		"-blocks-storage.s3.bucket-name":                           bucketName,
+		"-blocks-storage.s3.access-key-id":                         e2edb.MinioAccessKey,
+		"-blocks-storage.s3.secret-access-key":                     e2edb.MinioSecretKey,
+		"-blocks-storage.s3.endpoint":                              fmt.Sprintf("%s-minio-9000:9000", networkName),
+		"-blocks-storage.s3.insecure":                              "true",
+		"-store-gateway.sharding-ring.wait-stability-min-duration": "0", // start quickly
+		"-store-gateway.sharding-ring.wait-stability-max-duration": "0", // start quickly
 	}

 	// This cortex will fail to join the cluster configured in yaml file. That's fine.

pkg/compactor/compactor.go

Lines changed: 1 addition & 1 deletion
@@ -400,7 +400,7 @@ func (c *Compactor) starting(ctx context.Context) error {

 	// In the event of a cluster cold start or scale up of 2+ compactor instances at the same
 	// time, we may end up in a situation where each new compactor instance starts at a slightly
-	// different time and thus each one starts with on a different state of the ring. It's better
+	// different time and thus each one starts with a different state of the ring. It's better
 	// to just wait the ring stability for a short time.
 	if c.compactorCfg.ShardingRing.WaitStabilityMinDuration > 0 {
 		minWaiting := c.compactorCfg.ShardingRing.WaitStabilityMinDuration

pkg/compactor/compactor_ring.go

Lines changed: 1 addition & 1 deletion
@@ -51,7 +51,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {

 	// Wait stability flags.
 	f.DurationVar(&cfg.WaitStabilityMinDuration, "compactor.ring.wait-stability-min-duration", time.Minute, "Minimum time to wait for ring stability at startup. 0 to disable.")
-	f.DurationVar(&cfg.WaitStabilityMaxDuration, "compactor.ring.wait-stability-max-duration", 5*time.Minute, "Maximum time to wait for ring stability at startup. If the compactor ring keep changing after this period of time, the compactor will start anyway.")
+	f.DurationVar(&cfg.WaitStabilityMaxDuration, "compactor.ring.wait-stability-max-duration", 5*time.Minute, "Maximum time to wait for ring stability at startup. If the compactor ring keeps changing after this period of time, the compactor will start anyway.")

 	// Instance flags
 	cfg.InstanceInterfaceNames = []string{"eth0", "en0"}

pkg/storegateway/bucket_store_inmemory_server.go

Lines changed: 10 additions & 0 deletions
@@ -3,8 +3,10 @@ package storegateway
 import (
 	"context"

+	"github.com/gogo/protobuf/types"
 	"github.com/pkg/errors"
 	"github.com/prometheus/prometheus/storage"
+	"github.com/thanos-io/thanos/pkg/store/hintspb"
 	"github.com/thanos-io/thanos/pkg/store/storepb"
 )

@@ -19,6 +21,7 @@ type bucketStoreSeriesServer struct {

 	SeriesSet []*storepb.Series
 	Warnings  storage.Warnings
+	Hints     hintspb.SeriesResponseHints
 }

 func newBucketStoreSeriesServer(ctx context.Context) *bucketStoreSeriesServer {
@@ -30,6 +33,13 @@ func (s *bucketStoreSeriesServer) Send(r *storepb.SeriesResponse) error {
 		s.Warnings = append(s.Warnings, errors.New(r.GetWarning()))
 	}

+	if rawHints := r.GetHints(); rawHints != nil {
+		// We expect only 1 hints entry so we just keep 1.
+		if err := types.UnmarshalAny(rawHints, &s.Hints); err != nil {
+			return errors.Wrap(err, "failed to unmarshal series hints")
+		}
+	}
+
 	if recvSeries := r.GetSeries(); recvSeries != nil {
 		// Thanos uses a pool for the chunks and may use other pools in the future.
 		// Given we need to retain the reference after the pooled slices are recycled,

pkg/storegateway/bucket_stores_test.go

Lines changed: 1 addition & 1 deletion
@@ -589,7 +589,7 @@ func (u *userShardingStrategy) FilterUsers(ctx context.Context, userIDs []string
 	return u.users
 }

-func (u *userShardingStrategy) FilterBlocks(ctx context.Context, userID string, metas map[ulid.ULID]*thanos_metadata.Meta, synced *extprom.TxGaugeVec) error {
+func (u *userShardingStrategy) FilterBlocks(ctx context.Context, userID string, metas map[ulid.ULID]*thanos_metadata.Meta, loaded map[ulid.ULID]struct{}, synced *extprom.TxGaugeVec) error {
 	if util.StringsContain(u.users, userID) {
 		return nil
 	}

pkg/storegateway/gateway.go

Lines changed: 18 additions & 2 deletions
@@ -231,6 +231,22 @@ func (g *StoreGateway) starting(ctx context.Context) (err error) {
 			return err
 		}
 		level.Info(g.logger).Log("msg", "store-gateway is JOINING in the ring")
+
+		// In the event of a cluster cold start or scale up of 2+ store-gateway instances at the same
+		// time, we may end up in a situation where each new store-gateway instance starts at a slightly
+		// different time and thus each one starts with a different state of the ring. It's better
+		// to just wait the ring stability for a short time.
+		if g.gatewayCfg.ShardingRing.WaitStabilityMinDuration > 0 {
+			minWaiting := g.gatewayCfg.ShardingRing.WaitStabilityMinDuration
+			maxWaiting := g.gatewayCfg.ShardingRing.WaitStabilityMaxDuration
+
+			level.Info(g.logger).Log("msg", "waiting until store-gateway ring topology is stable", "min_waiting", minWaiting.String(), "max_waiting", maxWaiting.String())
+			if err := ring.WaitRingStability(ctx, g.ring, BlocksOwnerSync, minWaiting, maxWaiting); err != nil {
+				level.Warn(g.logger).Log("msg", "store-gateway is ring topology is not stable after the max waiting time, proceeding anyway")
+			} else {
+				level.Info(g.logger).Log("msg", "store-gateway is ring topology is stable")
+			}
+		}
 	}

 	// At this point, if sharding is enabled, the instance is registered with some tokens
@@ -271,7 +287,7 @@ func (g *StoreGateway) running(ctx context.Context) error {
 	defer syncTicker.Stop()

 	if g.gatewayCfg.ShardingEnabled {
-		ringLastState, _ = g.ring.GetAllHealthy(BlocksSync) // nolint:errcheck
+		ringLastState, _ = g.ring.GetAllHealthy(BlocksOwnerSync) // nolint:errcheck
 		ringTicker := time.NewTicker(util.DurationWithJitter(g.gatewayCfg.ShardingRing.RingCheckPeriod, 0.2))
 		defer ringTicker.Stop()
 		ringTickerChan = ringTicker.C
@@ -284,7 +300,7 @@ func (g *StoreGateway) running(ctx context.Context) error {
 	case <-ringTickerChan:
 		// We ignore the error because in case of error it will return an empty
 		// replication set which we use to compare with the previous state.
-		currRingState, _ := g.ring.GetAllHealthy(BlocksSync) // nolint:errcheck
+		currRingState, _ := g.ring.GetAllHealthy(BlocksOwnerSync) // nolint:errcheck

 		if ring.HasReplicationSetChanged(ringLastState, currRingState) {
 			ringLastState = currRingState
