Skip to content

Commit 3191540

Browse files
author
nmalhotra
committed
fix/32/pr-ports : Pulled changes from PR 4517
Pulled changes from ipfs/kubo#4517, on top of, ipfs#45. Change added to unblock the `waitPub()` call. With the elimination of stateSync cause a `updateChildEntry` to happen for `stateFlushed` as well, causing it to propogate upwards to the parent(s) [fullSync] and force a publish to happen, hence unblocking `waitPub`.
1 parent 1bbc52d commit 3191540

File tree

6 files changed

+138
-115
lines changed

6 files changed

+138
-115
lines changed

fd.go

+94-74
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,16 @@ import (
77
mod "github.com/ipfs/go-unixfs/mod"
88

99
context "context"
10+
11+
ipld "github.com/ipfs/go-ipld-format"
12+
)
13+
14+
type state uint8
15+
16+
const (
17+
stateFlushed state = iota
18+
stateDirty
19+
stateClosed
1020
)
1121

1222
// One `File` can have many `FileDescriptor`s associated to it
@@ -31,14 +41,31 @@ type FileDescriptor interface {
3141
}
3242

3343
type fileDescriptor struct {
34-
inode *File
35-
mod *mod.DagModifier
36-
perms int
37-
sync bool
38-
hasChanges bool
39-
40-
// TODO: Where is this variable set?
41-
closed bool
44+
inode *File
45+
mod *mod.DagModifier
46+
flags Flags
47+
48+
state state
49+
}
50+
51+
func (fi *fileDescriptor) checkWrite() error {
52+
if fi.state == stateClosed {
53+
return ErrClosed
54+
}
55+
if !fi.flags.Write {
56+
return fmt.Errorf("file is read-only")
57+
}
58+
return nil
59+
}
60+
61+
func (fi *fileDescriptor) checkRead() error {
62+
if fi.state == stateClosed {
63+
return ErrClosed
64+
}
65+
if !fi.flags.Read {
66+
return fmt.Errorf("file is write-only")
67+
}
68+
return nil
4269
}
4370

4471
// Size returns the size of the file referred to by this descriptor
@@ -48,69 +75,52 @@ func (fi *fileDescriptor) Size() (int64, error) {
4875

4976
// Truncate truncates the file to size
5077
func (fi *fileDescriptor) Truncate(size int64) error {
51-
if fi.perms == OpenReadOnly {
52-
return fmt.Errorf("cannot call truncate on readonly file descriptor")
78+
if err := fi.checkWrite(); err != nil {
79+
return fmt.Errorf("truncate failed: %s", err)
5380
}
54-
fi.hasChanges = true
81+
fi.state = stateDirty
5582
return fi.mod.Truncate(size)
5683
}
5784

5885
// Write writes the given data to the file at its current offset
5986
func (fi *fileDescriptor) Write(b []byte) (int, error) {
60-
if fi.perms == OpenReadOnly {
61-
return 0, fmt.Errorf("cannot write on not writeable descriptor")
87+
if err := fi.checkWrite(); err != nil {
88+
return 0, fmt.Errorf("write failed: %s", err)
6289
}
63-
fi.hasChanges = true
90+
fi.state = stateDirty
6491
return fi.mod.Write(b)
6592
}
6693

6794
// Read reads into the given buffer from the current offset
6895
func (fi *fileDescriptor) Read(b []byte) (int, error) {
69-
if fi.perms == OpenWriteOnly {
70-
return 0, fmt.Errorf("cannot read on write-only descriptor")
96+
if err := fi.checkRead(); err != nil {
97+
return 0, fmt.Errorf("read failed: %s", err)
7198
}
7299
return fi.mod.Read(b)
73100
}
74101

75102
// Read reads into the given buffer from the current offset
76103
func (fi *fileDescriptor) CtxReadFull(ctx context.Context, b []byte) (int, error) {
77-
if fi.perms == OpenWriteOnly {
78-
return 0, fmt.Errorf("cannot read on write-only descriptor")
104+
if err := fi.checkRead(); err != nil {
105+
return 0, fmt.Errorf("read failed: %s", err)
79106
}
80107
return fi.mod.CtxReadFull(ctx, b)
81108
}
82109

83110
// Close flushes, then propogates the modified dag node up the directory structure
84111
// and signals a republish to occur
85112
func (fi *fileDescriptor) Close() error {
86-
defer func() {
87-
switch fi.perms {
88-
case OpenReadOnly:
89-
fi.inode.desclock.RUnlock()
90-
case OpenWriteOnly, OpenReadWrite:
91-
fi.inode.desclock.Unlock()
92-
}
93-
// TODO: `closed` should be set here.
94-
}()
95-
96-
if fi.closed {
97-
panic("attempted to close file descriptor twice!")
113+
if fi.state == stateClosed {
114+
return ErrClosed
98115
}
99-
100-
if fi.hasChanges {
101-
err := fi.mod.Sync()
102-
if err != nil {
103-
return err
104-
}
105-
106-
fi.hasChanges = false
107-
108-
// explicitly stay locked for flushUp call,
109-
// it will manage the lock for us
110-
return fi.flushUp(fi.sync)
116+
if fi.flags.Write {
117+
defer fi.inode.desclock.Unlock()
118+
} else if fi.flags.Read {
119+
defer fi.inode.desclock.RUnlock()
111120
}
112-
113-
return nil
121+
err := fi.flushUp(fi.flags.Sync)
122+
fi.state = stateClosed
123+
return err
114124
}
115125

116126
// Flush generates a new version of the node of the underlying
@@ -126,47 +136,57 @@ func (fi *fileDescriptor) Flush() error {
126136
// If `fullSync` is set the changes are propagated upwards
127137
// (the `Up` part of `flushUp`).
128138
func (fi *fileDescriptor) flushUp(fullSync bool) error {
129-
nd, err := fi.mod.GetNode()
130-
if err != nil {
131-
return err
132-
}
139+
var nd ipld.Node
140+
switch fi.state {
141+
case stateDirty:
142+
// calls mod.Sync internally.
143+
var err error
144+
nd, err = fi.mod.GetNode()
145+
if err != nil {
146+
return err
147+
}
148+
err = fi.inode.dagService.Add(context.TODO(), nd)
149+
if err != nil {
150+
return err
151+
}
152+
fi.inode.nodeLock.Lock()
153+
fi.inode.node = nd
154+
fi.inode.nodeLock.Unlock()
155+
fallthrough
156+
case stateFlushed:
157+
if !fullSync {
158+
return nil
159+
}
133160

134-
err = fi.inode.dagService.Add(context.TODO(), nd)
135-
if err != nil {
136-
return err
137-
}
138-
// TODO: Very similar logic to the update process in
139-
// `Directory`, the logic should be unified, both structures
140-
// (`File` and `Directory`) are backed by a IPLD node with
141-
// a UnixFS format that is the actual target of the update
142-
// (regenerating it and adding it to the DAG service).
143-
144-
fi.inode.nodeLock.Lock()
145-
fi.inode.node = nd
146-
// TODO: Create a `SetNode` method.
147-
name := fi.inode.name
148-
parent := fi.inode.parent
149-
// TODO: Can the parent be modified? Do we need to do this inside the lock?
150-
fi.inode.nodeLock.Unlock()
151-
// TODO: Maybe all this logic should happen in `File`.
152-
153-
if fullSync {
154-
return parent.updateChildEntry(child{name, nd})
155-
}
161+
fi.inode.nodeLock.Lock()
162+
nd = fi.inode.node
163+
parent := fi.inode.parent
164+
name := fi.inode.name
165+
fi.inode.nodeLock.Unlock()
156166

157-
return nil
167+
if err := parent.updateChildEntry(child{name, nd}); err != nil {
168+
return err
169+
}
170+
fi.state = stateFlushed
171+
return nil
172+
default:
173+
panic("invalid state")
174+
}
158175
}
159176

160177
// Seek implements io.Seeker
161178
func (fi *fileDescriptor) Seek(offset int64, whence int) (int64, error) {
179+
if fi.state == stateClosed {
180+
return 0, fmt.Errorf("seek failed: %s", ErrClosed)
181+
}
162182
return fi.mod.Seek(offset, whence)
163183
}
164184

165185
// Write At writes the given bytes at the offset 'at'
166186
func (fi *fileDescriptor) WriteAt(b []byte, at int64) (int, error) {
167-
if fi.perms == OpenReadOnly {
168-
return 0, fmt.Errorf("cannot write on not writeable descriptor")
187+
if err := fi.checkWrite(); err != nil {
188+
return 0, fmt.Errorf("write-at failed: %s", err)
169189
}
170-
fi.hasChanges = true
190+
fi.state = stateDirty
171191
return fi.mod.WriteAt(b, at)
172192
}

file.go

+21-20
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ type File struct {
2626
// entire DAG of nodes that comprise the file.
2727
// TODO: Rename, there should be an explicit term for these root nodes
2828
// of a particular sub-DAG that abstract an upper layer's entity.
29-
node ipld.Node
29+
node ipld.Node
3030

3131
// Lock around the `node` that represents this file, necessary because
3232
// there may be many `FileDescriptor`s operating on this `File`.
@@ -52,13 +52,25 @@ func NewFile(name string, node ipld.Node, parent parent, dserv ipld.DAGService)
5252
return fi, nil
5353
}
5454

55-
const (
56-
OpenReadOnly = iota
57-
OpenWriteOnly
58-
OpenReadWrite
59-
)
55+
func (fi *File) Open(flags Flags) (_ FileDescriptor, _retErr error) {
56+
if flags.Write {
57+
fi.desclock.Lock()
58+
defer func() {
59+
if _retErr != nil {
60+
fi.desclock.Unlock()
61+
}
62+
}()
63+
} else if flags.Read {
64+
fi.desclock.RLock()
65+
defer func() {
66+
if _retErr != nil {
67+
fi.desclock.Unlock()
68+
}
69+
}()
70+
} else {
71+
return nil, fmt.Errorf("file opened for neither reading nor writing")
72+
}
6073

61-
func (fi *File) Open(flags int, sync bool) (FileDescriptor, error) {
6274
fi.nodeLock.RLock()
6375
node := fi.node
6476
fi.nodeLock.RUnlock()
@@ -86,16 +98,6 @@ func (fi *File) Open(flags int, sync bool) (FileDescriptor, error) {
8698
// Ok as well.
8799
}
88100

89-
switch flags {
90-
case OpenReadOnly:
91-
fi.desclock.RLock()
92-
case OpenWriteOnly, OpenReadWrite:
93-
fi.desclock.Lock()
94-
default:
95-
// TODO: support other modes
96-
return nil, fmt.Errorf("mode not supported")
97-
}
98-
99101
dmod, err := mod.NewDagModifier(context.TODO(), node, fi.dagService, chunker.DefaultSplitter)
100102
// TODO: Remove the use of the `chunker` package here, add a new `NewDagModifier` in
101103
// `go-unixfs` with the `DefaultSplitter` already included.
@@ -106,8 +108,7 @@ func (fi *File) Open(flags int, sync bool) (FileDescriptor, error) {
106108

107109
return &fileDescriptor{
108110
inode: fi,
109-
perms: flags,
110-
sync: sync,
111+
flags: flags,
111112
mod: dmod,
112113
}, nil
113114
}
@@ -153,7 +154,7 @@ func (fi *File) GetNode() (ipld.Node, error) {
153154
// a file without flushing?)
154155
func (fi *File) Flush() error {
155156
// open the file in fullsync mode
156-
fd, err := fi.Open(OpenWriteOnly, true)
157+
fd, err := fi.Open(Flags{Write: true, Sync: true})
157158
if err != nil {
158159
return err
159160
}

0 commit comments

Comments
 (0)