Skip to content

Commit d36bcd5

Browse files
javfggmgigi96
andauthored
Ceph driver fixes (#4200)
* Do not cd to user home if user homes are disabled * Simplify uploads * Stat shadow folders before creating them * Clean up chunked upload * Fix panic on shutdown * Changelog * Check type before cache eviction Co-authored-by: Gianmaria Del Monte <[email protected]> * Continue on stat permission denied --------- Co-authored-by: Gianmaria Del Monte <[email protected]>
1 parent 6161ac3 commit d36bcd5

File tree

6 files changed

+41
-455
lines changed

6 files changed

+41
-455
lines changed
+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
Enhancement: Multiple fixes for Ceph driver
2+
3+
* Avoid usage/creation of user homes when they are disabled in the config
4+
* Simplify the regular uploads (not chunked)
5+
* Avoid creation of shadow folders at the root if they are already there
6+
* Clean up the chunked upload
7+
* Fix panic on shutdown
8+
9+
https://github.com/cs3org/reva/pull/4200

pkg/storage/fs/cephfs/cephfs.go

+7-4
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func init() {
6262
registry.Register("cephfs", New)
6363
}
6464

65-
// New returns an implementation to of the storage.FS interface that talk to
65+
// New returns an implementation to of the storage.FS interface that talks to
6666
// a ceph filesystem.
6767
func New(ctx context.Context, m map[string]interface{}) (fs storage.FS, err error) {
6868
var o Options
@@ -81,9 +81,12 @@ func New(ctx context.Context, m map[string]interface{}) (fs storage.FS, err erro
8181
}
8282

8383
for _, dir := range []string{o.ShadowFolder, o.UploadFolder} {
84-
err = adminConn.adminMount.MakeDir(dir, dirPermFull)
85-
if err != nil && err.Error() != errFileExists {
86-
return nil, errors.New("cephfs: can't initialise system dir " + dir + ":" + err.Error())
84+
_, err := adminConn.adminMount.Statx(dir, cephfs2.StatxMask(cephfs2.StatxIno), 0)
85+
if err != nil {
86+
err = adminConn.adminMount.MakeDir(dir, dirPermFull)
87+
if err != nil && err.Error() != errFileExists {
88+
return nil, errors.New("cephfs: can't initialise system dir " + dir + ":" + err.Error())
89+
}
8790
}
8891
}
8992

pkg/storage/fs/cephfs/chunking.go

+1-121
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,7 @@ func (c *ChunkHandler) getChunkFolderName(i *ChunkBLOBInfo) (path string, err er
108108
}
109109

110110
func (c *ChunkHandler) saveChunk(path string, r io.ReadCloser) (finish bool, chunk string, err error) {
111-
var chunkInfo *ChunkBLOBInfo
112-
113-
chunkInfo, err = GetChunkBLOBInfo(path)
111+
chunkInfo, err := GetChunkBLOBInfo(path)
114112
if err != nil {
115113
err = fmt.Errorf("error getting chunk info from path: %s", path)
116114
return
@@ -223,122 +221,4 @@ func (c *ChunkHandler) WriteChunk(fn string, r io.ReadCloser) (string, string, e
223221
}
224222

225223
return chunkInfo.Path, chunk, nil
226-
227-
// TODO(labkode): implement old chunking
228-
229-
/*
230-
req2 := &provider.StartWriteSessionRequest{}
231-
res2, err := client.StartWriteSession(ctx, req2)
232-
if err != nil {
233-
logger.Error(ctx, err)
234-
w.WriteHeader(http.StatusInternalServerError)
235-
return
236-
}
237-
238-
if res2.Status.Code != rpc.Code_CODE_OK {
239-
logger.Println(ctx, res2.Status)
240-
w.WriteHeader(http.StatusInternalServerError)
241-
return
242-
}
243-
244-
sessID := res2.SessionId
245-
logger.Build().Str("sessID", sessID).Msg(ctx, "got write session id")
246-
247-
stream, err := client.Write(ctx)
248-
if err != nil {
249-
logger.Error(ctx, err)
250-
w.WriteHeader(http.StatusInternalServerError)
251-
return
252-
}
253-
254-
buffer := make([]byte, 1024*1024*3)
255-
var offset uint64
256-
var numChunks uint64
257-
258-
for {
259-
n, err := fd.Read(buffer)
260-
if n > 0 {
261-
req := &provider.WriteRequest{Data: buffer, Length: uint64(n), SessionId: sessID, Offset: offset}
262-
err = stream.Send(req)
263-
if err != nil {
264-
logger.Error(ctx, err)
265-
w.WriteHeader(http.StatusInternalServerError)
266-
return
267-
}
268-
269-
numChunks++
270-
offset += uint64(n)
271-
}
272-
273-
if err == io.EOF {
274-
break
275-
}
276-
277-
if err != nil {
278-
logger.Error(ctx, err)
279-
w.WriteHeader(http.StatusInternalServerError)
280-
return
281-
}
282-
}
283-
284-
res3, err := stream.CloseAndRecv()
285-
if err != nil {
286-
logger.Error(ctx, err)
287-
w.WriteHeader(http.StatusInternalServerError)
288-
return
289-
}
290-
291-
if res3.Status.Code != rpc.Code_CODE_OK {
292-
logger.Println(ctx, err)
293-
w.WriteHeader(http.StatusInternalServerError)
294-
return
295-
}
296-
297-
req4 := &provider.FinishWriteSessionRequest{Filename: chunkInfo.path, SessionId: sessID}
298-
res4, err := client.FinishWriteSession(ctx, req4)
299-
if err != nil {
300-
logger.Error(ctx, err)
301-
w.WriteHeader(http.StatusInternalServerError)
302-
return
303-
}
304-
305-
if res4.Status.Code != rpc.Code_CODE_OK {
306-
logger.Println(ctx, res4.Status)
307-
w.WriteHeader(http.StatusInternalServerError)
308-
return
309-
}
310-
311-
req.Filename = chunkInfo.path
312-
res, err = client.Stat(ctx, req)
313-
if err != nil {
314-
logger.Error(ctx, err)
315-
w.WriteHeader(http.StatusInternalServerError)
316-
return
317-
}
318-
319-
if res.Status.Code != rpc.Code_CODE_OK {
320-
logger.Println(ctx, res.Status)
321-
w.WriteHeader(http.StatusInternalServerError)
322-
return
323-
}
324-
325-
md2 := res.Metadata
326-
327-
w.Header().Add("Content-Type", md2.Mime)
328-
w.Header().Set("ETag", md2.Etag)
329-
w.Header().Set("OC-FileId", md2.Id)
330-
w.Header().Set("OC-ETag", md2.Etag)
331-
t := time.Unix(int64(md2.Mtime), 0)
332-
lastModifiedString := t.Format(time.RFC1123Z)
333-
w.Header().Set("Last-Modified", lastModifiedString)
334-
w.Header().Set("X-OC-MTime", "accepted")
335-
336-
if md == nil {
337-
w.WriteHeader(http.StatusCreated)
338-
return
339-
}
340-
341-
w.WriteHeader(http.StatusNoContent)
342-
return
343-
*/
344224
}

pkg/storage/fs/cephfs/connections.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,11 @@ func newCache() (c *connections, err error) {
6363
MaxCost: usrLimit,
6464
BufferItems: 64,
6565
OnEvict: func(item *ristretto.Item) {
66-
v := item.Value.(cacheVal)
67-
v.perm.Destroy()
68-
_ = v.mount.Unmount()
69-
_ = v.mount.Release()
66+
if v, ok := item.Value.(*cacheVal); ok {
67+
v.perm.Destroy()
68+
_ = v.mount.Unmount()
69+
_ = v.mount.Release()
70+
}
7071
},
7172
})
7273
if err != nil {
@@ -212,7 +213,7 @@ func newConn(user *User) *cacheVal {
212213
return destroyCephConn(mount, perm)
213214
}
214215

215-
if user != nil {
216+
if user != nil && !user.fs.conf.DisableHome {
216217
if err = mount.ChangeDir(user.fs.conf.Root); err != nil {
217218
return destroyCephConn(mount, perm)
218219
}

0 commit comments

Comments
 (0)