Skip to content

Commit beb022f

Browse files
ensure chunk ordering is correct (#55)
Co-authored-by: Luke Lombardi <[email protected]>
1 parent e240318 commit beb022f

File tree

1 file changed

+28
-27
lines changed

1 file changed

+28
-27
lines changed

pkg/s3_client.go

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,12 @@ func (c *S3Client) Head(ctx context.Context, key string) (bool, *s3.HeadObjectOu
8787
return true, output, nil
8888
}
8989

90+
type chunkResult struct {
91+
index int
92+
data []byte
93+
err error
94+
}
95+
9096
func (c *S3Client) DownloadIntoBuffer(ctx context.Context, key string, buffer *bytes.Buffer) error {
9197
ok, head, err := c.Head(ctx, key)
9298
if err != nil || !ok {
@@ -98,26 +104,19 @@ func (c *S3Client) DownloadIntoBuffer(ctx context.Context, key string, buffer *b
98104
}
99105

100106
numChunks := int((size + c.DownloadChunkSize - 1) / c.DownloadChunkSize)
101-
chunks := make([][]byte, numChunks)
107+
chunkCh := make(chan chunkResult, numChunks)
102108
sem := make(chan struct{}, c.DownloadConcurrency)
103-
var wg sync.WaitGroup
104109

105110
ctx, cancel := context.WithCancel(ctx)
106111
defer cancel()
107112

108-
errCh := make(chan error, 1)
109-
113+
var wg sync.WaitGroup
110114
for i := 0; i < numChunks; i++ {
111115
wg.Add(1)
112-
113116
go func(i int) {
117+
defer wg.Done()
114118
sem <- struct{}{}
115119
defer func() { <-sem }()
116-
defer wg.Done()
117-
118-
if ctx.Err() != nil {
119-
return
120-
}
121120

122121
start := int64(i) * c.DownloadChunkSize
123122
end := start + c.DownloadChunkSize - 1
@@ -132,41 +131,43 @@ func (c *S3Client) DownloadIntoBuffer(ctx context.Context, key string, buffer *b
132131
Range: &rangeHeader,
133132
})
134133
if err != nil {
135-
select {
136-
case errCh <- fmt.Errorf("range request failed for %s: %w", rangeHeader, err):
137-
cancel()
138-
default:
139-
}
134+
chunkCh <- chunkResult{i, nil, fmt.Errorf("range request failed for %s: %w", rangeHeader, err)}
135+
cancel()
140136
return
141137
}
142138
defer resp.Body.Close()
143139

144140
part := make([]byte, end-start+1)
145141
n, err := io.ReadFull(resp.Body, part)
146142
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
147-
select {
148-
case errCh <- fmt.Errorf("error reading range %s: %w", rangeHeader, err):
149-
cancel()
150-
default:
151-
}
143+
chunkCh <- chunkResult{i, nil, fmt.Errorf("error reading range %s: %w", rangeHeader, err)}
144+
cancel()
152145
return
153146
}
154-
155-
chunks[i] = part[:n]
147+
chunkCh <- chunkResult{i, part[:n], nil}
156148
}(i)
157149
}
158150

159-
wg.Wait()
160-
close(errCh)
151+
go func() {
152+
wg.Wait()
153+
close(chunkCh)
154+
}()
161155

162-
if err, ok := <-errCh; ok {
163-
return err
156+
chunks := make([][]byte, numChunks)
157+
var errs []error
158+
for res := range chunkCh {
159+
if res.err != nil {
160+
errs = append(errs, res.err)
161+
}
162+
chunks[res.index] = res.data
163+
}
164+
if len(errs) > 0 {
165+
return fmt.Errorf("download errors: %v", errs)
164166
}
165167

166168
buffer.Reset()
167169
for _, chunk := range chunks {
168170
buffer.Write(chunk)
169171
}
170-
171172
return nil
172173
}

0 commit comments

Comments
 (0)