Skip to content

Commit 3485489

Browse files
committed
Add wavefs tar stream
1 parent 2db3666 commit 3485489

File tree

3 files changed

+147
-6
lines changed

3 files changed

+147
-6
lines changed

pkg/remote/fileshare/wavefs/wavefs.go

Lines changed: 96 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,22 @@
44
package wavefs
55

66
import (
7+
"archive/tar"
78
"context"
89
"encoding/base64"
910
"errors"
1011
"fmt"
1112
"io/fs"
1213
"path"
1314
"strings"
15+
"time"
1416

1517
"github.com/wavetermdev/waveterm/pkg/filestore"
1618
"github.com/wavetermdev/waveterm/pkg/remote/connparse"
1719
"github.com/wavetermdev/waveterm/pkg/remote/fileshare/fstype"
20+
"github.com/wavetermdev/waveterm/pkg/util/fileutil"
1821
"github.com/wavetermdev/waveterm/pkg/util/iochan/iochantypes"
22+
"github.com/wavetermdev/waveterm/pkg/util/tarcopy"
1923
"github.com/wavetermdev/waveterm/pkg/util/wavefileutil"
2024
"github.com/wavetermdev/waveterm/pkg/waveobj"
2125
"github.com/wavetermdev/waveterm/pkg/wps"
@@ -97,7 +101,54 @@ func (c WaveClient) Read(ctx context.Context, conn *connparse.Connection, data w
97101
}
98102

99103
func (c WaveClient) ReadTarStream(ctx context.Context, conn *connparse.Connection, opts *wshrpc.FileCopyOpts) <-chan wshrpc.RespOrErrorUnion[iochantypes.Packet] {
100-
return nil
104+
pathPrefix, err := cleanPath(conn.Path)
105+
if err != nil {
106+
return wshutil.SendErrCh[iochantypes.Packet](fmt.Errorf("error cleaning path: %w", err))
107+
}
108+
list, err := c.ListEntries(ctx, conn, nil)
109+
if err != nil {
110+
return wshutil.SendErrCh[iochantypes.Packet](fmt.Errorf("error listing blockfiles: %w", err))
111+
}
112+
113+
timeout := time.Millisecond * 100
114+
if opts.Timeout > 0 {
115+
timeout = time.Duration(opts.Timeout) * time.Millisecond
116+
}
117+
readerCtx, cancel := context.WithTimeout(context.Background(), timeout)
118+
rtn, writeHeader, fileWriter, tarClose := tarcopy.TarCopySrc(readerCtx, wshrpc.FileChunkSize, pathPrefix)
119+
120+
go func() {
121+
defer func() {
122+
tarClose()
123+
cancel()
124+
}()
125+
for _, file := range list {
126+
if readerCtx.Err() != nil {
127+
rtn <- wshutil.RespErr[iochantypes.Packet](readerCtx.Err())
128+
return
129+
}
130+
131+
if err = writeHeader(fileutil.ToFsFileInfo(file), file.Path); err != nil {
132+
rtn <- wshutil.RespErr[iochantypes.Packet](fmt.Errorf("error writing tar header: %w", err))
133+
return
134+
}
135+
if file.IsDir {
136+
continue
137+
}
138+
139+
_, dataBuf, err := filestore.WFS.ReadFile(ctx, conn.Host, file.Path)
140+
if err != nil {
141+
rtn <- wshutil.RespErr[iochantypes.Packet](fmt.Errorf("error reading blockfile: %w", err))
142+
return
143+
}
144+
if _, err = fileWriter.Write(dataBuf); err != nil {
145+
rtn <- wshutil.RespErr[iochantypes.Packet](fmt.Errorf("error writing tar data: %w", err))
146+
return
147+
}
148+
}
149+
}()
150+
151+
return rtn
101152
}
102153

103154
func (c WaveClient) ListEntriesStream(ctx context.Context, conn *connparse.Connection, opts *wshrpc.FileListOpts) <-chan wshrpc.RespOrErrorUnion[wshrpc.CommandRemoteListEntriesRtnData] {
@@ -377,6 +428,50 @@ func (c WaveClient) CopyInternal(ctx context.Context, srcConn, destConn *connpar
377428
}
378429

379430
func (c WaveClient) CopyRemote(ctx context.Context, srcConn, destConn *connparse.Connection, srcClient fstype.FileShareClient, opts *wshrpc.FileCopyOpts) error {
431+
zoneId := destConn.Host
432+
if zoneId == "" {
433+
return fmt.Errorf("zoneid not found in connection")
434+
}
435+
readCtx, cancel := context.WithCancelCause(ctx)
436+
ioch := srcClient.ReadTarStream(readCtx, srcConn, opts)
437+
err := tarcopy.TarCopyDest(readCtx, cancel, ioch, func(next *tar.Header, reader *tar.Reader) error {
438+
if next.Typeflag == tar.TypeDir {
439+
return nil
440+
}
441+
fileName, err := cleanPath(path.Join(destConn.Path, next.Name))
442+
_, err = filestore.WFS.Stat(ctx, zoneId, fileName)
443+
if err != nil {
444+
if !errors.Is(err, fs.ErrNotExist) {
445+
return fmt.Errorf("error getting blockfile info: %w", err)
446+
}
447+
err := filestore.WFS.MakeFile(ctx, zoneId, fileName, nil, wshrpc.FileOpts{})
448+
if err != nil {
449+
return fmt.Errorf("error making blockfile: %w", err)
450+
}
451+
}
452+
dataBuf := make([]byte, next.Size)
453+
n, err := reader.Read(dataBuf)
454+
if err != nil {
455+
return fmt.Errorf("error reading tar data: %w", err)
456+
}
457+
err = filestore.WFS.WriteFile(ctx, zoneId, fileName, dataBuf[:n])
458+
if err != nil {
459+
return fmt.Errorf("error writing to blockfile: %w", err)
460+
}
461+
wps.Broker.Publish(wps.WaveEvent{
462+
Event: wps.Event_BlockFile,
463+
Scopes: []string{waveobj.MakeORef(waveobj.OType_Block, zoneId).String()},
464+
Data: &wps.WSFileEventData{
465+
ZoneId: zoneId,
466+
FileName: fileName,
467+
FileOp: wps.FileOp_Invalidate,
468+
},
469+
})
470+
return nil
471+
})
472+
if err != nil {
473+
return fmt.Errorf("error copying tar stream: %w", err)
474+
}
380475
return nil
381476
}
382477

pkg/util/fileutil/fileutil.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@ import (
1313
"path/filepath"
1414
"regexp"
1515
"strings"
16+
"time"
1617

1718
"github.com/wavetermdev/waveterm/pkg/wavebase"
19+
"github.com/wavetermdev/waveterm/pkg/wshrpc"
1820
)
1921

2022
func FixPath(path string) (string, error) {
@@ -164,3 +166,47 @@ func IsInitScriptPath(input string) bool {
164166

165167
return true
166168
}
169+
170+
type FsFileInfo struct {
171+
NameInternal string
172+
ModeInternal os.FileMode
173+
SizeInternal int64
174+
ModTimeInternal int64
175+
IsDirInternal bool
176+
}
177+
178+
func (f FsFileInfo) Name() string {
179+
return f.NameInternal
180+
}
181+
182+
func (f FsFileInfo) Size() int64 {
183+
return f.SizeInternal
184+
}
185+
186+
func (f FsFileInfo) Mode() os.FileMode {
187+
return f.ModeInternal
188+
}
189+
190+
func (f FsFileInfo) ModTime() time.Time {
191+
return time.Unix(0, f.ModTimeInternal)
192+
}
193+
194+
func (f FsFileInfo) IsDir() bool {
195+
return f.IsDirInternal
196+
}
197+
198+
func (f FsFileInfo) Sys() interface{} {
199+
return nil
200+
}
201+
202+
var _ fs.FileInfo = FsFileInfo{}
203+
204+
func ToFsFileInfo(fi *wshrpc.FileInfo) FsFileInfo {
205+
return FsFileInfo{
206+
NameInternal: fi.Name,
207+
ModeInternal: fi.Mode,
208+
SizeInternal: fi.Size,
209+
ModTimeInternal: fi.ModTime,
210+
IsDirInternal: fi.IsDir,
211+
}
212+
}

pkg/util/iochan/iochan.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func ReaderChan(ctx context.Context, r io.Reader, chunkSize int64, callback func
5656
}
5757

5858
// WriterChan reads from a channel and writes the data to an io.Writer
59-
func WriterChan(ctx context.Context, w io.Writer, ch <-chan wshrpc.RespOrErrorUnion[iochantypes.Packet], callback func(), errCallback func(error)) {
59+
func WriterChan(ctx context.Context, w io.Writer, ch <-chan wshrpc.RespOrErrorUnion[iochantypes.Packet], callback func(), cancel context.CancelCauseFunc) {
6060
sha256Hash := sha256.New()
6161
go func() {
6262
defer func() {
@@ -72,23 +72,23 @@ func WriterChan(ctx context.Context, w io.Writer, ch <-chan wshrpc.RespOrErrorUn
7272
return
7373
}
7474
if resp.Error != nil {
75-
errCallback(resp.Error)
75+
cancel(resp.Error)
7676
return
7777
}
7878
if _, err := sha256Hash.Write(resp.Response.Data); err != nil {
79-
errCallback(fmt.Errorf("WriterChan: error writing to sha256 hash: %v", err))
79+
cancel(fmt.Errorf("WriterChan: error writing to sha256 hash: %v", err))
8080
return
8181
}
8282
// The checksum is sent as the last packet
8383
if resp.Response.Checksum != nil {
8484
localChecksum := sha256Hash.Sum(nil)
8585
if !bytes.Equal(localChecksum, resp.Response.Checksum) {
86-
errCallback(fmt.Errorf("WriterChan: checksum mismatch"))
86+
cancel(fmt.Errorf("WriterChan: checksum mismatch"))
8787
}
8888
return
8989
}
9090
if _, err := w.Write(resp.Response.Data); err != nil {
91-
errCallback(fmt.Errorf("WriterChan: write error: %v", err))
91+
cancel(fmt.Errorf("WriterChan: write error: %v", err))
9292
return
9393
}
9494
}

0 commit comments

Comments
 (0)