Skip to content

Commit 0c7c441

Browse files
committed
Fix S3 BucketWithRetries upload empty content issue
1 parent 9bc9fbf commit 0c7c441

File tree

2 files changed

+108
-1
lines changed

2 files changed

+108
-1
lines changed

pkg/storage/bucket/s3/bucket_client.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package s3
22

33
import (
4+
"bytes"
45
"context"
56
"io"
67
"time"
@@ -153,8 +154,18 @@ func (b *BucketWithRetries) Exists(ctx context.Context, name string) (exists boo
153154
}
154155

155156
func (b *BucketWithRetries) Upload(ctx context.Context, name string, r io.Reader) error {
157+
// Convert Reader to Seeker
158+
var buf bytes.Buffer
159+
if _, err := buf.ReadFrom(r); err != nil {
160+
return err
161+
}
162+
s := bytes.NewReader(buf.Bytes())
156163
return b.retry(ctx, func() error {
157-
return b.bucket.Upload(ctx, name, r)
164+
err := b.bucket.Upload(ctx, name, s)
165+
if _, err := s.Seek(0, io.SeekStart); err != nil {
166+
return err
167+
}
168+
return err
158169
})
159170
}
160171

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package s3
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"fmt"
7+
"io"
8+
"testing"
9+
"time"
10+
11+
"github.com/stretchr/testify/require"
12+
"github.com/thanos-io/objstore"
13+
)
14+
15+
func TestBucketWithRetries_Upload(t *testing.T) {
16+
t.Parallel()
17+
18+
m := mockBucket{
19+
MaxFailCount: 3,
20+
}
21+
b := BucketWithRetries{
22+
bucket: &m,
23+
operationRetries: 5,
24+
retryMinBackoff: 10 * time.Millisecond,
25+
retryMaxBackoff: time.Second,
26+
}
27+
28+
input := []byte("test input")
29+
err := b.Upload(context.Background(), "dummy", bytes.NewReader(input))
30+
require.NoError(t, err)
31+
require.Equal(t, input, m.uploadedContent)
32+
}
33+
34+
type mockBucket struct {
35+
MaxFailCount int
36+
uploadedContent []byte
37+
}
38+
39+
// Upload mocks objstore.Bucket.Upload()
40+
func (m *mockBucket) Upload(ctx context.Context, name string, r io.Reader) error {
41+
var buf bytes.Buffer
42+
if _, err := buf.ReadFrom(r); err != nil {
43+
return err
44+
}
45+
m.uploadedContent = buf.Bytes()
46+
if m.MaxFailCount > 0 {
47+
m.MaxFailCount--
48+
return fmt.Errorf("failed upload: %d", m.MaxFailCount)
49+
}
50+
return nil
51+
}
52+
53+
// Delete mocks objstore.Bucket.Delete()
54+
func (m *mockBucket) Delete(ctx context.Context, name string) error {
55+
return nil
56+
}
57+
58+
// Name mocks objstore.Bucket.Name()
59+
func (m *mockBucket) Name() string {
60+
return "mock"
61+
}
62+
63+
// Iter mocks objstore.Bucket.Iter()
64+
func (m *mockBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
65+
return nil
66+
}
67+
68+
// Get mocks objstore.Bucket.Get()
69+
func (m *mockBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
70+
return nil, nil
71+
}
72+
73+
// GetRange mocks objstore.Bucket.GetRange()
74+
func (m *mockBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
75+
return nil, nil
76+
}
77+
78+
// Exists mocks objstore.Bucket.Exists()
79+
func (m *mockBucket) Exists(ctx context.Context, name string) (bool, error) {
80+
return false, nil
81+
}
82+
83+
// IsObjNotFoundErr mocks objstore.Bucket.IsObjNotFoundErr()
84+
func (m *mockBucket) IsObjNotFoundErr(err error) bool {
85+
return false
86+
}
87+
88+
// ObjectSize mocks objstore.Bucket.Attributes()
89+
func (m *mockBucket) Attributes(ctx context.Context, name string) (objstore.ObjectAttributes, error) {
90+
return objstore.ObjectAttributes{Size: 0, LastModified: time.Now()}, nil
91+
}
92+
93+
// Close mocks objstore.Bucket.Close()
94+
func (m *mockBucket) Close() error {
95+
return nil
96+
}

0 commit comments

Comments
 (0)