
Commit ed87699

Improve the code to compare once more and skip if no op
1 parent 8d283d7 commit ed87699
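
Summary of the change: waitRingStability() now guards itself (it returns immediately when sharding is disabled or the minimum wait is zero), syncStores() no longer waits for stability nor returns the ring state, and on a ring change the gateway waits for the ring to stabilize, reads the ring state once more, and compares it again with the last known state so the sync is skipped entirely when the topology ended up unchanged. The Go sketch below illustrates that "compare once more and skip if no-op" pattern; all types and helpers in it (ringState, gateway, onRingTick, hasChanged) are hypothetical stand-ins, not the code from this commit.

// Minimal sketch of the "compare once more and skip if no-op" pattern this
// commit introduces. All types and helpers below are hypothetical stand-ins,
// not the actual Cortex code.
package main

import "fmt"

// ringState stands in for ring.ReplicationSet.
type ringState string

// hasChanged stands in for ring.HasReplicationSetTokensOrZonesChanged.
func hasChanged(last, curr ringState) bool { return last != curr }

// gateway holds just enough state to illustrate the ring-change branch.
type gateway struct {
	lastState   ringState
	currentRing func() ringState // stand-in for getRingState()
	waitStable  func()           // stand-in for waitRingStability()
	shouldWait  bool             // stand-in for shouldWaitRingStability()
}

// onRingTick mirrors the new ring-change handling in StoreGateway.running().
func (g *gateway) onRingTick() {
	curr := g.currentRing()
	if !hasChanged(g.lastState, curr) {
		return // ring topology unchanged: nothing to do
	}
	if g.shouldWait {
		g.waitStable()
		// Compare once more after waiting: the ring may have settled back to
		// the previous topology, in which case the sync would be a no-op.
		curr = g.currentRing()
		if !hasChanged(g.lastState, curr) {
			return
		}
	}
	g.lastState = curr
	fmt.Println("syncing TSDB blocks: ring topology changed")
}

func main() {
	g := &gateway{
		lastState:   "A",
		currentRing: func() ringState { return "B" },
		waitStable:  func() {},
		shouldWait:  true,
	}
	g.onRingTick() // A -> B, waits, still B: syncs and records "B"
	g.onRingTick() // still "B": skipped as a no-op
}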

2 files changed: +35, -19 lines

pkg/storegateway/gateway.go

Lines changed: 34 additions & 17 deletions
@@ -233,7 +233,7 @@ func (g *StoreGateway) starting(ctx context.Context) (err error) {
 		// time, we may end up in a situation where each new store-gateway instance starts at a slightly
 		// different time and thus each one starts with a different state of the ring. It's better
 		// to just wait the ring stability for a short time.
-		if g.gatewayCfg.ShardingRing.WaitStabilityMinDuration > 0 {
+		if g.shouldWaitRingStability() {
 			g.waitRingStability(ctx, syncReasonInitial)
 		}
 	}
@@ -276,7 +276,7 @@ func (g *StoreGateway) running(ctx context.Context) error {
 	defer syncTicker.Stop()
 
 	if g.gatewayCfg.ShardingEnabled {
-		ringLastState, _ = g.ring.GetAllHealthy(BlocksOwnerSync) // nolint:errcheck
+		ringLastState = g.getRingState(BlocksOwnerSync)
 		ringTicker := time.NewTicker(util.DurationWithJitter(g.gatewayCfg.ShardingRing.RingCheckPeriod, 0.2))
 		defer ringTicker.Stop()
 		ringTickerChan = ringTicker.C
@@ -285,15 +285,25 @@ func (g *StoreGateway) running(ctx context.Context) error {
 	for {
 		select {
 		case <-syncTicker.C:
-			g.syncStores(ctx, ringLastState, syncReasonPeriodic)
+			if g.shouldWaitRingStability() {
+				g.waitRingStability(ctx, syncReasonPeriodic)
+			}
+			g.syncStores(ctx, syncReasonPeriodic)
 		case <-ringTickerChan:
-			// We ignore the error because in case of error it will return an empty
-			// replication set which we use to compare with the previous state.
-			currRingState, _ := g.ring.GetAllHealthy(BlocksOwnerSync) // nolint:errcheck
+			currRingState := g.getRingState(BlocksOwnerSync)
 
-			// Ignore address when comparing to avoid block re-sync if tokens are persisted with tokens_file_path
 			if ring.HasReplicationSetTokensOrZonesChanged(ringLastState, currRingState) {
-				ringLastState = g.syncStores(ctx, currRingState, syncReasonRingChange)
+				if g.shouldWaitRingStability() {
+					g.waitRingStability(ctx, syncReasonRingChange)
+					currRingState = g.getRingState(BlocksOwnerSync)
+					if ring.HasReplicationSetTokensOrZonesChanged(ringLastState, currRingState) {
+						ringLastState = currRingState
+						g.syncStores(ctx, syncReasonRingChange)
+					}
+				} else {
+					ringLastState = currRingState
+					g.syncStores(ctx, syncReasonRingChange)
+				}
 			}
 		case <-ctx.Done():
 			return nil
@@ -310,13 +320,7 @@ func (g *StoreGateway) stopping(_ error) error {
 	return nil
 }
 
-func (g *StoreGateway) syncStores(ctx context.Context, currRingState ring.ReplicationSet, reason string) ring.ReplicationSet {
-	ringLastState := currRingState
-	if g.gatewayCfg.ShardingEnabled && g.gatewayCfg.ShardingRing.WaitStabilityMinDuration > 0 {
-		g.waitRingStability(ctx, reason)
-		ringLastState, _ = g.ring.GetAllHealthy(BlocksOwnerSync)
-	}
-
+func (g *StoreGateway) syncStores(ctx context.Context, reason string) {
 	level.Info(g.logger).Log("msg", "synchronizing TSDB blocks for all users", "reason", reason)
 	g.bucketSync.WithLabelValues(reason).Inc()
 
@@ -325,8 +329,6 @@ func (g *StoreGateway) syncStores(ctx context.Context, currRingState ring.Replic
 	} else {
 		level.Info(g.logger).Log("msg", "successfully synchronized TSDB blocks for all users", "reason", reason)
 	}
-
-	return ringLastState
 }
 
 func (g *StoreGateway) Series(req *storepb.SeriesRequest, srv storegatewaypb.StoreGateway_SeriesServer) error {
@@ -370,6 +372,10 @@ func (g *StoreGateway) waitRingStability(ctx context.Context, reason string) {
 	minWaiting := g.gatewayCfg.ShardingRing.WaitStabilityMinDuration
 	maxWaiting := g.gatewayCfg.ShardingRing.WaitStabilityMaxDuration
 
+	if !g.gatewayCfg.ShardingEnabled || minWaiting <= 0 {
+		return
+	}
+
 	level.Info(g.logger).Log("msg", "waiting until store-gateway ring topology is stable", "min_waiting", minWaiting.String(), "max_waiting", maxWaiting.String(), "reason", reason)
 	if err := ring.WaitRingTokensAndZonesStability(ctx, g.ring, BlocksOwnerSync, minWaiting, maxWaiting); err != nil {
 		level.Warn(g.logger).Log("msg", "store-gateway ring topology is not stable after the max waiting time, proceeding anyway", "reason", reason)
@@ -378,6 +384,17 @@ func (g *StoreGateway) waitRingStability(ctx context.Context, reason string) {
 	}
 }
 
+func (g *StoreGateway) getRingState(op ring.Operation) ring.ReplicationSet {
+	// We ignore the error because in case of error it will return an empty
+	// replication set which we use to compare with the previous state.
+	ringState, _ := g.ring.GetAllHealthy(op) // nolint:errcheck
+	return ringState
+}
+
+func (g *StoreGateway) shouldWaitRingStability() bool {
+	return g.gatewayCfg.ShardingEnabled && g.gatewayCfg.ShardingRing.WaitStabilityMinDuration > 0
+}
+
 func createBucketClient(cfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, reg prometheus.Registerer) (objstore.Bucket, error) {
 	bucketClient, err := bucket.NewClient(context.Background(), cfg.Bucket, "store-gateway", logger, reg)
 	if err != nil {

pkg/storegateway/gateway_test.go

Lines changed: 1 addition & 2 deletions
@@ -795,8 +795,7 @@ func TestStoreGateway_syncStoresShouldWaitRingStability(t *testing.T) {
 	}))
 
 	syncCtx, cancelCtx := context.WithTimeout(ctx, 3*time.Second)
-	ringState, _ := g.ring.GetAllHealthy(BlocksOwnerSync)
-	g.syncStores(syncCtx, ringState, "test")
+	g.syncStores(syncCtx, "test")
 	cancelCtx()
 
 	// No blocks should be synced
