Skip to content

Commit 0c44c24

Browse files
committed
add tar stream for wavefile
1 parent 3485489 commit 0c44c24

File tree

3 files changed

+59
-23
lines changed

3 files changed

+59
-23
lines changed

pkg/remote/connparse/connparse.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ func (c *Connection) GetFullURI() string {
5757
return c.Scheme + "://" + c.GetPathWithHost()
5858
}
5959

60+
func (c *Connection) GetSchemeAndHost() string {
61+
return c.Scheme + "://" + c.Host
62+
}
63+
6064
func ParseURIAndReplaceCurrentHost(ctx context.Context, uri string) (*Connection, error) {
6165
conn, err := ParseURI(uri)
6266
if err != nil {

pkg/remote/fileshare/wavefs/wavefs.go

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ import (
99
"encoding/base64"
1010
"errors"
1111
"fmt"
12+
"io"
1213
"io/fs"
14+
"log"
1315
"path"
1416
"strings"
1517
"time"
@@ -101,15 +103,15 @@ func (c WaveClient) Read(ctx context.Context, conn *connparse.Connection, data w
101103
}
102104

103105
func (c WaveClient) ReadTarStream(ctx context.Context, conn *connparse.Connection, opts *wshrpc.FileCopyOpts) <-chan wshrpc.RespOrErrorUnion[iochantypes.Packet] {
104-
pathPrefix, err := cleanPath(conn.Path)
105-
if err != nil {
106-
return wshutil.SendErrCh[iochantypes.Packet](fmt.Errorf("error cleaning path: %w", err))
107-
}
106+
log.Printf("ReadTarStream: conn: %v, opts: %v\n", conn, opts)
108107
list, err := c.ListEntries(ctx, conn, nil)
109108
if err != nil {
110109
return wshutil.SendErrCh[iochantypes.Packet](fmt.Errorf("error listing blockfiles: %w", err))
111110
}
112111

112+
pathPrefix := getPathPrefix(conn)
113+
schemeAndHost := conn.GetSchemeAndHost() + "/"
114+
113115
timeout := time.Millisecond * 100
114116
if opts.Timeout > 0 {
115117
timeout = time.Duration(opts.Timeout) * time.Millisecond
@@ -127,6 +129,7 @@ func (c WaveClient) ReadTarStream(ctx context.Context, conn *connparse.Connectio
127129
rtn <- wshutil.RespErr[iochantypes.Packet](readerCtx.Err())
128130
return
129131
}
132+
file.Mode = 0644
130133

131134
if err = writeHeader(fileutil.ToFsFileInfo(file), file.Path); err != nil {
132135
rtn <- wshutil.RespErr[iochantypes.Packet](fmt.Errorf("error writing tar header: %w", err))
@@ -136,7 +139,11 @@ func (c WaveClient) ReadTarStream(ctx context.Context, conn *connparse.Connectio
136139
continue
137140
}
138141

139-
_, dataBuf, err := filestore.WFS.ReadFile(ctx, conn.Host, file.Path)
142+
log.Printf("ReadTarStream: reading file: %s\n", file.Path)
143+
144+
internalPath := strings.TrimPrefix(file.Path, schemeAndHost)
145+
146+
_, dataBuf, err := filestore.WFS.ReadFile(ctx, conn.Host, internalPath)
140147
if err != nil {
141148
rtn <- wshutil.RespErr[iochantypes.Packet](fmt.Errorf("error reading blockfile: %w", err))
142149
return
@@ -168,10 +175,14 @@ func (c WaveClient) ListEntriesStream(ctx context.Context, conn *connparse.Conne
168175
}
169176

170177
func (c WaveClient) ListEntries(ctx context.Context, conn *connparse.Connection, opts *wshrpc.FileListOpts) ([]*wshrpc.FileInfo, error) {
178+
log.Printf("ListEntries: conn: %v, opts: %v\n", conn, opts)
171179
zoneId := conn.Host
172180
if zoneId == "" {
173181
return nil, fmt.Errorf("zoneid not found in connection")
174182
}
183+
if opts == nil {
184+
opts = &wshrpc.FileListOpts{}
185+
}
175186
prefix, err := cleanPath(conn.Path)
176187
if err != nil {
177188
return nil, fmt.Errorf("error cleaning path: %w", err)
@@ -432,13 +443,19 @@ func (c WaveClient) CopyRemote(ctx context.Context, srcConn, destConn *connparse
432443
if zoneId == "" {
433444
return fmt.Errorf("zoneid not found in connection")
434445
}
446+
destPrefix := getPathPrefix(destConn)
447+
destPrefix = strings.TrimPrefix(destPrefix, destConn.GetSchemeAndHost()+"/")
448+
log.Printf("CopyRemote: srcConn: %v, destConn: %v, destPrefix: %s\n", srcConn, destConn, destPrefix)
435449
readCtx, cancel := context.WithCancelCause(ctx)
436450
ioch := srcClient.ReadTarStream(readCtx, srcConn, opts)
437451
err := tarcopy.TarCopyDest(readCtx, cancel, ioch, func(next *tar.Header, reader *tar.Reader) error {
438452
if next.Typeflag == tar.TypeDir {
439453
return nil
440454
}
441-
fileName, err := cleanPath(path.Join(destConn.Path, next.Name))
455+
fileName, err := cleanPath(path.Join(destPrefix, next.Name))
456+
if err != nil {
457+
return fmt.Errorf("error cleaning path: %w", err)
458+
}
442459
_, err = filestore.WFS.Stat(ctx, zoneId, fileName)
443460
if err != nil {
444461
if !errors.Is(err, fs.ErrNotExist) {
@@ -449,12 +466,15 @@ func (c WaveClient) CopyRemote(ctx context.Context, srcConn, destConn *connparse
449466
return fmt.Errorf("error making blockfile: %w", err)
450467
}
451468
}
469+
log.Printf("CopyRemote: writing file: %s; size: %d\n", fileName, next.Size)
452470
dataBuf := make([]byte, next.Size)
453-
n, err := reader.Read(dataBuf)
471+
_, err = reader.Read(dataBuf)
454472
if err != nil {
455-
return fmt.Errorf("error reading tar data: %w", err)
473+
if !errors.Is(err, io.EOF) {
474+
return fmt.Errorf("error reading tar data: %w", err)
475+
}
456476
}
457-
err = filestore.WFS.WriteFile(ctx, zoneId, fileName, dataBuf[:n])
477+
err = filestore.WFS.WriteFile(ctx, zoneId, fileName, dataBuf)
458478
if err != nil {
459479
return fmt.Errorf("error writing to blockfile: %w", err)
460480
}
@@ -535,3 +555,13 @@ func cleanPath(path string) (string, error) {
535555
func (c WaveClient) GetConnectionType() string {
536556
return connparse.ConnectionTypeWave
537557
}
558+
559+
func getPathPrefix(conn *connparse.Connection) string {
560+
fullUri := conn.GetFullURI()
561+
pathPrefix := fullUri
562+
lastSlash := strings.LastIndex(fullUri, "/")
563+
if lastSlash > 10 && lastSlash < len(fullUri)-1 {
564+
pathPrefix = fullUri[:lastSlash+1]
565+
}
566+
return pathPrefix
567+
}

pkg/util/tarcopy/tarcopy.go

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,19 @@ import (
1717
"github.com/wavetermdev/waveterm/pkg/wshrpc"
1818
)
1919

20+
const (
21+
maxRetries = 5
22+
retryDelay = 10 * time.Millisecond
23+
)
24+
2025
func TarCopySrc(ctx context.Context, chunkSize int, pathPrefix string) (outputChan chan wshrpc.RespOrErrorUnion[iochantypes.Packet], writeHeader func(fi fs.FileInfo, file string) error, writer io.Writer, close func()) {
2126
pipeReader, pipeWriter := io.Pipe()
2227
tarWriter := tar.NewWriter(pipeWriter)
2328
rtnChan := iochan.ReaderChan(ctx, pipeReader, wshrpc.FileChunkSize, func() {
24-
for {
29+
for retries := 0; retries < maxRetries; retries++ {
2530
if err := pipeReader.Close(); err != nil {
2631
log.Printf("TarCopySrc: error closing pipe reader: %v, trying again in 10ms\n", err)
27-
time.Sleep(time.Millisecond * 10)
32+
time.Sleep(retryDelay)
2833
continue
2934
}
3035
break
@@ -52,18 +57,18 @@ func TarCopySrc(ctx context.Context, chunkSize int, pathPrefix string) (outputCh
5257
}
5358
return nil
5459
}, tarWriter, func() {
55-
for {
60+
for retries := 0; retries < maxRetries; retries++ {
5661
if err := tarWriter.Close(); err != nil {
5762
log.Printf("TarCopySrc: error closing tar writer: %v, trying again in 10ms\n", err)
58-
time.Sleep(time.Millisecond * 10)
63+
time.Sleep(retryDelay)
5964
continue
6065
}
6166
break
6267
}
63-
for {
68+
for retries := 0; retries < maxRetries; retries++ {
6469
if err := pipeWriter.Close(); err != nil {
6570
log.Printf("TarCopySrc: error closing pipe writer: %v, trying again in 10ms\n", err)
66-
time.Sleep(time.Millisecond * 10)
71+
time.Sleep(retryDelay)
6772
continue
6873
}
6974
break
@@ -74,10 +79,10 @@ func TarCopySrc(ctx context.Context, chunkSize int, pathPrefix string) (outputCh
7479
func TarCopyDest(ctx context.Context, cancel context.CancelCauseFunc, ch <-chan wshrpc.RespOrErrorUnion[iochantypes.Packet], readNext func(next *tar.Header, reader *tar.Reader) error) error {
7580
pipeReader, pipeWriter := io.Pipe()
7681
iochan.WriterChan(ctx, pipeWriter, ch, func() {
77-
for {
82+
for retries := 0; retries < maxRetries; retries++ {
7883
if err := pipeWriter.Close(); err != nil {
7984
log.Printf("TarCopyDest: error closing pipe writer: %v, trying again in 10ms\n", err)
80-
time.Sleep(time.Millisecond * 10)
85+
time.Sleep(retryDelay)
8186
continue
8287
}
8388
cancel(nil)
@@ -86,10 +91,10 @@ func TarCopyDest(ctx context.Context, cancel context.CancelCauseFunc, ch <-chan
8691
}, cancel)
8792
tarReader := tar.NewReader(pipeReader)
8893
defer func() {
89-
for {
94+
for retries := 0; retries < maxRetries; retries++ {
9095
if err := pipeReader.Close(); err != nil {
9196
log.Printf("error closing pipe reader: %v, trying again in 10ms\n", err)
92-
time.Sleep(time.Millisecond * 10)
97+
time.Sleep(retryDelay)
9398
continue
9499
}
95100
cancel(nil)
@@ -99,16 +104,13 @@ func TarCopyDest(ctx context.Context, cancel context.CancelCauseFunc, ch <-chan
99104
for {
100105
select {
101106
case <-ctx.Done():
102-
if ctx.Err() != nil {
103-
return context.Cause(ctx)
104-
}
105107
return nil
106108
default:
107109
next, err := tarReader.Next()
108110
if err != nil {
109111
// Do one more check for context error before returning
110112
if ctx.Err() != nil {
111-
return context.Cause(ctx)
113+
return nil
112114
}
113115
if errors.Is(err, io.EOF) {
114116
return nil

0 commit comments

Comments
 (0)