Skip to content

Commit 1e78d22

Browse files
committed
Retry logic for S3 bucket client (cortexproject#5135)
* Added retry logic for S3 bucket client and updated visit marker error handling Signed-off-by: Alex Le <[email protected]> * Updated CHANGELOG Signed-off-by: Alex Le <[email protected]> * Updated docs Signed-off-by: Alex Le <[email protected]> * Changed default max retry Signed-off-by: Alex Le <[email protected]> * Update doc Signed-off-by: Alex Le <[email protected]> * Revert not related bug fix from s3 retry logic change Signed-off-by: Alex Le <[email protected]> * Hard coded S3 retry config Signed-off-by: Alex Le <[email protected]> * Revert unnecessary change Signed-off-by: Alex Le <[email protected]> * clean ws Signed-off-by: Alex Le <[email protected]> * Revised retry config Signed-off-by: Alex Le <[email protected]> * trigger workflow Signed-off-by: Alex Le <[email protected]> * trigger workflow Signed-off-by: Alex Le <[email protected]> --------- Signed-off-by: Alex Le <[email protected]>
1 parent 9da5574 commit 1e78d22

File tree

2 files changed

+120
-2
lines changed

2 files changed

+120
-2
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
* [ENHANCEMENT] Push reduce one hash operation of Labels. #4945 #5114
1818
* [ENHANCEMENT] Alertmanager: Added `-alertmanager.enabled-tenants` and `-alertmanager.disabled-tenants` to explicitly enable or disable alertmanager for specific tenants. #5116
1919
* [ENHANCEMENT] Upgraded Docker base images to `alpine:3.17`. #5132
20+
* [ENHANCEMENT] Add retry logic to S3 bucket client. #5135
2021
* [FEATURE] Querier/Query Frontend: support Prometheus /api/v1/status/buildinfo API. #4978
2122
* [FEATURE] Ingester: Add active series to all_user_stats page. #4972
2223
* [FEATURE] Ingester: Added `-blocks-storage.tsdb.head-chunks-write-queue-size` allowing to configure the size of the in-memory queue used before flushing chunks to the disk . #5000

pkg/storage/bucket/s3/bucket_client.go

Lines changed: 119 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,39 @@
11
package s3
22

33
import (
4+
"context"
5+
"io"
6+
"time"
7+
48
"github.com/go-kit/log"
59
"github.com/prometheus/common/model"
610
"github.com/thanos-io/objstore"
711
"github.com/thanos-io/objstore/providers/s3"
12+
13+
"github.com/cortexproject/cortex/pkg/util/backoff"
814
)
915

16+
// Hard-coded retry settings applied to every S3 bucket client built by this
// package. They are constants because nothing reassigns them; keeping them
// immutable avoids accidental mutation of package-level state.
const (
	// defaultOperationRetries is the value passed as the backoff MaxRetries
	// for a single bucket operation.
	defaultOperationRetries = 5
	// defaultRetryMinBackoff is the value passed as the backoff MinBackoff.
	defaultRetryMinBackoff = 5 * time.Second
	// defaultRetryMaxBackoff is the value passed as the backoff MaxBackoff.
	defaultRetryMaxBackoff = 1 * time.Minute
)
19+
1020
// NewBucketClient creates a new S3 bucket client.
//
// It converts cfg with newS3Config, builds the underlying bucket with
// s3.NewBucketWithConfig, and returns that bucket wrapped in a
// BucketWithRetries configured with the package default retry settings.
// Any error from the config conversion or bucket construction is returned
// unchanged, together with a nil bucket.
1121
func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucket, error) {
1222
s3Cfg, err := newS3Config(cfg)
1323
if err != nil {
1424
return nil, err
1525
}
1626

17-
return s3.NewBucketWithConfig(logger, s3Cfg, name)
27+
bucket, err := s3.NewBucketWithConfig(logger, s3Cfg, name)
28+
if err != nil {
29+
return nil, err
30+
}
31+
return &BucketWithRetries{
32+
bucket: bucket,
33+
operationRetries: defaultOperationRetries,
34+
retryMinBackoff: defaultRetryMinBackoff,
35+
retryMaxBackoff: defaultRetryMaxBackoff,
36+
}, nil
1837
}
1938

2039
// NewBucketReaderClient creates a new S3 bucket client
@@ -24,7 +43,16 @@ func NewBucketReaderClient(cfg Config, name string, logger log.Logger) (objstore
2443
return nil, err
2544
}
2645

27-
return s3.NewBucketWithConfig(logger, s3Cfg, name)
46+
bucket, err := s3.NewBucketWithConfig(logger, s3Cfg, name)
47+
if err != nil {
48+
return nil, err
49+
}
50+
return &BucketWithRetries{
51+
bucket: bucket,
52+
operationRetries: defaultOperationRetries,
53+
retryMinBackoff: defaultRetryMinBackoff,
54+
retryMaxBackoff: defaultRetryMaxBackoff,
55+
}, nil
2856
}
2957

3058
func newS3Config(cfg Config) (s3.Config, error) {
@@ -62,3 +90,92 @@ func newS3Config(cfg Config) (s3.Config, error) {
6290
AWSSDKAuth: cfg.AccessKeyID == "",
6391
}, nil
6492
}
93+
94+
type BucketWithRetries struct {
95+
bucket objstore.Bucket
96+
operationRetries int
97+
retryMinBackoff time.Duration
98+
retryMaxBackoff time.Duration
99+
}
100+
101+
func (b *BucketWithRetries) retry(ctx context.Context, f func() error) error {
102+
var lastErr error
103+
retries := backoff.New(ctx, backoff.Config{
104+
MinBackoff: b.retryMinBackoff,
105+
MaxBackoff: b.retryMaxBackoff,
106+
MaxRetries: b.operationRetries,
107+
})
108+
for retries.Ongoing() {
109+
lastErr = f()
110+
if lastErr == nil {
111+
return nil
112+
}
113+
if b.bucket.IsObjNotFoundErr(lastErr) {
114+
return lastErr
115+
}
116+
retries.Wait()
117+
}
118+
return lastErr
119+
}
120+
121+
func (b *BucketWithRetries) Name() string {
122+
return b.bucket.Name()
123+
}
124+
125+
func (b *BucketWithRetries) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
126+
return b.retry(ctx, func() error {
127+
return b.bucket.Iter(ctx, dir, f, options...)
128+
})
129+
}
130+
131+
func (b *BucketWithRetries) Get(ctx context.Context, name string) (reader io.ReadCloser, err error) {
132+
err = b.retry(ctx, func() error {
133+
reader, err = b.bucket.Get(ctx, name)
134+
return err
135+
})
136+
return
137+
}
138+
139+
func (b *BucketWithRetries) GetRange(ctx context.Context, name string, off, length int64) (closer io.ReadCloser, err error) {
140+
err = b.retry(ctx, func() error {
141+
closer, err = b.bucket.GetRange(ctx, name, off, length)
142+
return err
143+
})
144+
return
145+
}
146+
147+
func (b *BucketWithRetries) Exists(ctx context.Context, name string) (exists bool, err error) {
148+
err = b.retry(ctx, func() error {
149+
exists, err = b.bucket.Exists(ctx, name)
150+
return err
151+
})
152+
return
153+
}
154+
155+
func (b *BucketWithRetries) Upload(ctx context.Context, name string, r io.Reader) error {
156+
return b.retry(ctx, func() error {
157+
return b.bucket.Upload(ctx, name, r)
158+
})
159+
}
160+
161+
func (b *BucketWithRetries) Attributes(ctx context.Context, name string) (attributes objstore.ObjectAttributes, err error) {
162+
err = b.retry(ctx, func() error {
163+
attributes, err = b.bucket.Attributes(ctx, name)
164+
return err
165+
})
166+
return
167+
}
168+
169+
func (b *BucketWithRetries) Delete(ctx context.Context, name string) error {
170+
return b.retry(ctx, func() error {
171+
return b.bucket.Delete(ctx, name)
172+
})
173+
}
174+
175+
func (b *BucketWithRetries) IsObjNotFoundErr(err error) bool {
176+
return b.bucket.IsObjNotFoundErr(err)
177+
}
178+
179+
func (b *BucketWithRetries) Close() error {
180+
return b.bucket.Close()
181+
}

0 commit comments

Comments
 (0)