Skip to content

Commit fb0d69c

Browse files
committed
Feat s3 transfer manager v2 PutObject (#2733)
* recommit transfer manager v2 files * change pool to store slice pointer * add integ test for putobject * update go mod * minor changes for v0.1.0 * update tags * update tags * update integ test dependency version * change err var name * update go mod * change input/output type comment * minor change --------- Co-authored-by: Tianyi Wang <[email protected]> rebase from main rebase branch from main
1 parent 1572db0 commit fb0d69c

File tree

3 files changed

+287
-0
lines changed

3 files changed

+287
-0
lines changed

service/internal/integrationtest/go.mod

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,8 @@ replace github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2 => ../../../
192192

193193
replace github.com/aws/aws-sdk-go-v2/service/elasticsearchservice => ../../../service/elasticsearchservice/
194194

195+
replace github.com/aws/aws-sdk-go-v2/service/elastictranscoder => ../../../service/elastictranscoder/
196+
195197
replace github.com/aws/aws-sdk-go-v2/service/emr => ../../../service/emr/
196198

197199
replace github.com/aws/aws-sdk-go-v2/service/eventbridge => ../../../service/eventbridge/
@@ -289,3 +291,5 @@ replace github.com/aws/aws-sdk-go-v2/service/waf => ../../../service/waf/
289291
replace github.com/aws/aws-sdk-go-v2/service/wafregional => ../../../service/wafregional/
290292

291293
replace github.com/aws/aws-sdk-go-v2/service/workspaces => ../../../service/workspaces/
294+
295+
replace github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager => ../../../feature/s3/transfermanager
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
//go:build integration
2+
// +build integration
3+
4+
package s3transfermanager
5+
6+
import (
7+
"bytes"
8+
"strings"
9+
"testing"
10+
)
11+
12+
func TestInteg_PutObject(t *testing.T) {
13+
cases := map[string]putObjectTestData{
14+
"seekable body": {Body: strings.NewReader("hello world"), ExpectBody: []byte("hello world")},
15+
"empty string body": {Body: strings.NewReader(""), ExpectBody: []byte("")},
16+
"multipart upload body": {Body: bytes.NewReader(largeObjectBuf), ExpectBody: largeObjectBuf},
17+
}
18+
19+
for name, c := range cases {
20+
t.Run(name, func(t *testing.T) {
21+
testPutObject(t, setupMetadata.Buckets.Source.Name, c)
22+
})
23+
}
24+
}
Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
//go:build integration
2+
// +build integration
3+
4+
package s3transfermanager
5+
6+
import (
7+
"bytes"
8+
"context"
9+
"crypto/rand"
10+
"crypto/tls"
11+
"flag"
12+
"fmt"
13+
"io"
14+
"io/ioutil"
15+
"net/http"
16+
"os"
17+
"strings"
18+
"testing"
19+
20+
"github.com/aws/aws-sdk-go-v2/aws"
21+
"github.com/aws/aws-sdk-go-v2/aws/arn"
22+
tm "github.com/aws/aws-sdk-go-v2/feature/s3/transfermanager"
23+
"github.com/aws/aws-sdk-go-v2/service/internal/integrationtest"
24+
"github.com/aws/aws-sdk-go-v2/service/internal/integrationtest/s3shared"
25+
"github.com/aws/aws-sdk-go-v2/service/s3"
26+
"github.com/aws/aws-sdk-go-v2/service/sts"
27+
)
28+
29+
var setupMetadata = struct {
30+
AccountID string
31+
Region string
32+
Buckets struct {
33+
Source struct {
34+
Name string
35+
ARN string
36+
}
37+
}
38+
}{}
39+
40+
// s3 client to use for integ testing
41+
var s3Client *s3.Client
42+
43+
// s3TransferManagerClient to use for integ testing
44+
var s3TransferManagerClient *tm.Client
45+
46+
// sts client to use for integ testing
47+
var stsClient *sts.Client
48+
49+
// http client setting to use for integ testing
50+
var httpClient *http.Client
51+
52+
var region = "us-west-2"
53+
54+
// large object buffer to test multipart upload
55+
var largeObjectBuf []byte
56+
57+
// TestMain executes at start of package tests
58+
func TestMain(m *testing.M) {
59+
flag.Parse()
60+
flag.CommandLine.Visit(func(f *flag.Flag) {
61+
if !(f.Name == "run" || f.Name == "test.run") {
62+
return
63+
}
64+
value := f.Value.String()
65+
if value == `NONE` {
66+
os.Exit(0)
67+
}
68+
})
69+
70+
var result int
71+
defer func() {
72+
if r := recover(); r != nil {
73+
fmt.Fprintln(os.Stderr, "S3 TransferManager integration tests panic,", r)
74+
result = 1
75+
}
76+
os.Exit(result)
77+
}()
78+
79+
var verifyTLS bool
80+
var s3Endpoint string
81+
82+
flag.StringVar(&s3Endpoint, "s3-endpoint", "", "integration endpoint for S3")
83+
84+
flag.StringVar(&setupMetadata.AccountID, "account", "", "integration account id")
85+
flag.BoolVar(&verifyTLS, "verify-tls", true, "verify server TLS certificate")
86+
flag.Parse()
87+
88+
httpClient = &http.Client{
89+
Transport: &http.Transport{
90+
TLSClientConfig: &tls.Config{InsecureSkipVerify: verifyTLS},
91+
},
92+
}
93+
94+
cfg, err := integrationtest.LoadConfigWithDefaultRegion(region)
95+
if err != nil {
96+
fmt.Fprintf(os.Stderr, "Error occurred while loading config with region %v, %v", region, err)
97+
result = 1
98+
return
99+
}
100+
101+
// assign the http client
102+
cfg.HTTPClient = httpClient
103+
104+
// create a s3 client
105+
s3cfg := cfg.Copy()
106+
if len(s3Endpoint) != 0 {
107+
s3cfg.EndpointResolver = aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) {
108+
return aws.Endpoint{
109+
URL: s3Endpoint,
110+
PartitionID: "aws",
111+
SigningName: "s3",
112+
SigningRegion: region,
113+
SigningMethod: "s3v4",
114+
}, nil
115+
})
116+
}
117+
118+
// build s3 client from config
119+
s3Client = s3.NewFromConfig(s3cfg)
120+
121+
// build s3 transfermanager client from config
122+
s3TransferManagerClient = tm.NewFromConfig(s3Client, s3cfg)
123+
124+
// build sts client from config
125+
stsClient = sts.NewFromConfig(cfg)
126+
127+
// context
128+
ctx := context.Background()
129+
130+
setupMetadata.AccountID, err = getAccountID(ctx)
131+
if err != nil {
132+
fmt.Fprintf(os.Stderr, "failed to get integration aws account id: %v\n", err)
133+
result = 1
134+
return
135+
}
136+
137+
bucketCleanup, err := setupBuckets(ctx)
138+
defer bucketCleanup()
139+
if err != nil {
140+
fmt.Fprintf(os.Stderr, "failed to setup integration test buckets: %v\n", err)
141+
result = 1
142+
return
143+
}
144+
145+
largeObjectBuf = make([]byte, 20*1024*1024)
146+
_, err = rand.Read(largeObjectBuf)
147+
if err != nil {
148+
fmt.Fprintf(os.Stderr, "failed to generate large object for multipart upload: %v\n", err)
149+
result = 1
150+
return
151+
}
152+
153+
result = m.Run()
154+
}
155+
156+
// getAccountID retrieves account id
157+
func getAccountID(ctx context.Context) (string, error) {
158+
if len(setupMetadata.AccountID) != 0 {
159+
return setupMetadata.AccountID, nil
160+
}
161+
identity, err := stsClient.GetCallerIdentity(ctx, nil)
162+
if err != nil {
163+
return "", fmt.Errorf("error fetching caller identity, %w", err)
164+
}
165+
return *identity.Account, nil
166+
}
167+
168+
// setupBuckets creates buckets needed for integration test
169+
func setupBuckets(ctx context.Context) (func(), error) {
170+
var cleanups []func()
171+
172+
cleanup := func() {
173+
for i := range cleanups {
174+
cleanups[i]()
175+
}
176+
}
177+
178+
bucketCreates := []struct {
179+
name *string
180+
arn *string
181+
}{
182+
{name: &setupMetadata.Buckets.Source.Name, arn: &setupMetadata.Buckets.Source.ARN},
183+
}
184+
185+
for _, bucket := range bucketCreates {
186+
*bucket.name = s3shared.GenerateBucketName()
187+
188+
if err := s3shared.SetupBucket(ctx, s3Client, *bucket.name); err != nil {
189+
return cleanup, err
190+
}
191+
192+
// Compute ARN
193+
bARN := arn.ARN{
194+
Partition: "aws",
195+
Service: "s3",
196+
Region: region,
197+
AccountID: setupMetadata.AccountID,
198+
Resource: fmt.Sprintf("bucket_name:%s", *bucket.name),
199+
}.String()
200+
201+
*bucket.arn = bARN
202+
203+
bucketName := *bucket.name
204+
cleanups = append(cleanups, func() {
205+
if err := s3shared.CleanupBucket(ctx, s3Client, bucketName); err != nil {
206+
fmt.Fprintln(os.Stderr, err)
207+
}
208+
})
209+
}
210+
211+
return cleanup, nil
212+
}
213+
214+
type putObjectTestData struct {
215+
Body io.Reader
216+
ExpectBody []byte
217+
ExpectError string
218+
}
219+
220+
func testPutObject(t *testing.T, bucket string, testData putObjectTestData, opts ...func(options *tm.Options)) {
221+
key := integrationtest.UniqueID()
222+
223+
_, err := s3TransferManagerClient.PutObject(context.Background(),
224+
&tm.PutObjectInput{
225+
Bucket: bucket,
226+
Key: key,
227+
Body: testData.Body,
228+
}, opts...)
229+
if err != nil {
230+
if len(testData.ExpectError) == 0 {
231+
t.Fatalf("expect no error, got %v", err)
232+
}
233+
if e, a := testData.ExpectError, err.Error(); !strings.Contains(a, e) {
234+
t.Fatalf("expect error to contain %v, got %v", e, a)
235+
}
236+
} else {
237+
if e := testData.ExpectError; len(e) != 0 {
238+
t.Fatalf("expect error: %v, got none", e)
239+
}
240+
}
241+
242+
if len(testData.ExpectError) != 0 {
243+
return
244+
}
245+
246+
resp, err := s3Client.GetObject(context.Background(),
247+
&s3.GetObjectInput{
248+
Bucket: aws.String(bucket),
249+
Key: aws.String(key),
250+
})
251+
if err != nil {
252+
t.Fatalf("expect no error, got %v", err)
253+
}
254+
255+
b, _ := ioutil.ReadAll(resp.Body)
256+
if e, a := testData.ExpectBody, b; !bytes.EqualFold(e, a) {
257+
t.Errorf("expect %s, got %s", e, a)
258+
}
259+
}

0 commit comments

Comments
 (0)