Skip to content

Commit 1548c8a

Browse files
committed
bitswap: clear wantlists when GetBlocks calls are cancelled
License: MIT Signed-off-by: Jeromy <[email protected]>
1 parent feb653b commit 1548c8a

File tree

11 files changed

+117
-89
lines changed

11 files changed

+117
-89
lines changed

exchange/bitswap/bitswap.go

+43-7
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
2323
bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
2424
notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications"
25-
wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
2625
flags "github.com/ipfs/go-ipfs/flags"
2726
"github.com/ipfs/go-ipfs/thirdparty/delay"
2827
loggables "github.com/ipfs/go-ipfs/thirdparty/loggables"
@@ -88,7 +87,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
8887
notifications: notif,
8988
engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
9089
network: network,
91-
findKeys: make(chan *wantlist.Entry, sizeBatchRequestChan),
90+
findKeys: make(chan *blockRequest, sizeBatchRequestChan),
9291
process: px,
9392
newBlocks: make(chan blocks.Block, HasBlockBufferSize),
9493
provideKeys: make(chan key.Key, provideKeysBufferSize),
@@ -131,7 +130,7 @@ type Bitswap struct {
131130
notifications notifications.PubSub
132131

133132
// send keys to a worker to find and connect to providers for them
134-
findKeys chan *wantlist.Entry
133+
findKeys chan *blockRequest
135134

136135
engine *decision.Engine
137136

@@ -148,8 +147,8 @@ type Bitswap struct {
148147
}
149148

150149
type blockRequest struct {
151-
key key.Key
152-
ctx context.Context
150+
Key key.Key
151+
Ctx context.Context
153152
}
154153

155154
// GetBlock attempts to retrieve a particular block from peers within the
@@ -235,13 +234,50 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan blocks
235234
// NB: Optimization. Assumes that providers of key[0] are likely to
236235
// be able to provide for all keys. This currently holds true in most
237236
// every situation. Later, this assumption may not hold as true.
238-
req := &wantlist.Entry{
237+
req := &blockRequest{
239238
Key: keys[0],
240239
Ctx: ctx,
241240
}
241+
242+
remaining := make(map[key.Key]struct{})
243+
for _, k := range keys {
244+
remaining[k] = struct{}{}
245+
}
246+
247+
out := make(chan blocks.Block)
248+
go func() {
249+
ctx, cancel := context.WithCancel(ctx)
250+
defer cancel()
251+
defer close(out)
252+
defer func() {
253+
var toCancel []key.Key
254+
for k, _ := range remaining {
255+
toCancel = append(toCancel, k)
256+
}
257+
bs.CancelWants(toCancel)
258+
}()
259+
for {
260+
select {
261+
case blk, ok := <-promise:
262+
if !ok {
263+
return
264+
}
265+
266+
delete(remaining, blk.Key())
267+
select {
268+
case out <- blk:
269+
case <-ctx.Done():
270+
return
271+
}
272+
case <-ctx.Done():
273+
return
274+
}
275+
}
276+
}()
277+
242278
select {
243279
case bs.findKeys <- req:
244-
return promise, nil
280+
return out, nil
245281
case <-ctx.Done():
246282
return nil, ctx.Err()
247283
}

exchange/bitswap/decision/bench_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,6 @@ func BenchmarkTaskQueuePush(b *testing.B) {
2121
}
2222
b.ResetTimer()
2323
for i := 0; i < b.N; i++ {
24-
q.Push(wantlist.Entry{Key: key.Key(i), Priority: math.MaxInt32}, peers[i%len(peers)])
24+
q.Push(&wantlist.Entry{Key: key.Key(i), Priority: math.MaxInt32}, peers[i%len(peers)])
2525
}
2626
}

exchange/bitswap/decision/engine.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
104104
return e
105105
}
106106

107-
func (e *Engine) WantlistForPeer(p peer.ID) (out []wl.Entry) {
107+
func (e *Engine) WantlistForPeer(p peer.ID) (out []*wl.Entry) {
108108
e.lock.Lock()
109109
partner, ok := e.ledgerMap[p]
110110
if ok {
@@ -218,7 +218,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
218218

219219
for _, entry := range m.Wantlist() {
220220
if entry.Cancel {
221-
log.Debugf("cancel %s", entry.Key)
221+
log.Debugf("%s cancel %s", p, entry.Key)
222222
l.CancelWant(entry.Key)
223223
e.peerRequestQueue.Remove(entry.Key, p)
224224
} else {

exchange/bitswap/decision/ledger.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func (l *ledger) CancelWant(k key.Key) {
7979
l.wantList.Remove(k)
8080
}
8181

82-
func (l *ledger) WantListContains(k key.Key) (wl.Entry, bool) {
82+
func (l *ledger) WantListContains(k key.Key) (*wl.Entry, bool) {
8383
return l.wantList.Contains(k)
8484
}
8585

exchange/bitswap/decision/peer_request_queue.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
type peerRequestQueue interface {
1414
// Pop returns the next peerRequestTask. Returns nil if the peerRequestQueue is empty.
1515
Pop() *peerRequestTask
16-
Push(entry wantlist.Entry, to peer.ID)
16+
Push(entry *wantlist.Entry, to peer.ID)
1717
Remove(k key.Key, p peer.ID)
1818

1919
// NB: cannot expose simply expose taskQueue.Len because trashed elements
@@ -45,7 +45,7 @@ type prq struct {
4545
}
4646

4747
// Push currently adds a new peerRequestTask to the end of the list
48-
func (tl *prq) Push(entry wantlist.Entry, to peer.ID) {
48+
func (tl *prq) Push(entry *wantlist.Entry, to peer.ID) {
4949
tl.lock.Lock()
5050
defer tl.lock.Unlock()
5151
partner, ok := tl.partners[to]
@@ -166,7 +166,7 @@ func (tl *prq) thawRound() {
166166
}
167167

168168
type peerRequestTask struct {
169-
Entry wantlist.Entry
169+
Entry *wantlist.Entry
170170
Target peer.ID
171171

172172
// A callback to signal that this task has been completed

exchange/bitswap/decision/peer_request_queue_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func TestPushPop(t *testing.T) {
4141
for _, index := range rand.Perm(len(alphabet)) { // add blocks for all letters
4242
letter := alphabet[index]
4343
t.Log(partner.String())
44-
prq.Push(wantlist.Entry{Key: key.Key(letter), Priority: math.MaxInt32 - index}, partner)
44+
prq.Push(&wantlist.Entry{Key: key.Key(letter), Priority: math.MaxInt32 - index}, partner)
4545
}
4646
for _, consonant := range consonants {
4747
prq.Remove(key.Key(consonant), partner)
@@ -78,10 +78,10 @@ func TestPeerRepeats(t *testing.T) {
7878
// Have each push some blocks
7979

8080
for i := 0; i < 5; i++ {
81-
prq.Push(wantlist.Entry{Key: key.Key(i)}, a)
82-
prq.Push(wantlist.Entry{Key: key.Key(i)}, b)
83-
prq.Push(wantlist.Entry{Key: key.Key(i)}, c)
84-
prq.Push(wantlist.Entry{Key: key.Key(i)}, d)
81+
prq.Push(&wantlist.Entry{Key: key.Key(i)}, a)
82+
prq.Push(&wantlist.Entry{Key: key.Key(i)}, b)
83+
prq.Push(&wantlist.Entry{Key: key.Key(i)}, c)
84+
prq.Push(&wantlist.Entry{Key: key.Key(i)}, d)
8585
}
8686

8787
// now, pop off four entries, there should be one from each

exchange/bitswap/message/message.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func newMsg(full bool) *impl {
6464
}
6565

6666
type Entry struct {
67-
wantlist.Entry
67+
*wantlist.Entry
6868
Cancel bool
6969
}
7070

@@ -120,7 +120,7 @@ func (m *impl) addEntry(k key.Key, priority int, cancel bool) {
120120
e.Cancel = cancel
121121
} else {
122122
m.wantlist[k] = Entry{
123-
Entry: wantlist.Entry{
123+
Entry: &wantlist.Entry{
124124
Key: k,
125125
Priority: priority,
126126
},

exchange/bitswap/wantlist/wantlist.go

+29-33
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@ import (
77
"sync"
88

99
key "github.com/ipfs/go-ipfs/blocks/key"
10-
11-
"gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
1210
)
1311

1412
type ThreadSafe struct {
@@ -18,19 +16,17 @@ type ThreadSafe struct {
1816

1917
// not threadsafe
2018
type Wantlist struct {
21-
set map[key.Key]Entry
19+
set map[key.Key]*Entry
2220
}
2321

2422
type Entry struct {
2523
Key key.Key
2624
Priority int
2725

28-
Ctx context.Context
29-
cancel func()
3026
RefCnt int
3127
}
3228

33-
type entrySlice []Entry
29+
type entrySlice []*Entry
3430

3531
func (es entrySlice) Len() int { return len(es) }
3632
func (es entrySlice) Swap(i, j int) { es[i], es[j] = es[j], es[i] }
@@ -44,41 +40,41 @@ func NewThreadSafe() *ThreadSafe {
4440

4541
func New() *Wantlist {
4642
return &Wantlist{
47-
set: make(map[key.Key]Entry),
43+
set: make(map[key.Key]*Entry),
4844
}
4945
}
5046

51-
func (w *ThreadSafe) Add(k key.Key, priority int) {
47+
func (w *ThreadSafe) Add(k key.Key, priority int) bool {
5248
w.lk.Lock()
5349
defer w.lk.Unlock()
54-
w.Wantlist.Add(k, priority)
50+
return w.Wantlist.Add(k, priority)
5551
}
5652

57-
func (w *ThreadSafe) AddEntry(e Entry) {
53+
func (w *ThreadSafe) AddEntry(e *Entry) bool {
5854
w.lk.Lock()
5955
defer w.lk.Unlock()
60-
w.Wantlist.AddEntry(e)
56+
return w.Wantlist.AddEntry(e)
6157
}
6258

63-
func (w *ThreadSafe) Remove(k key.Key) {
59+
func (w *ThreadSafe) Remove(k key.Key) bool {
6460
w.lk.Lock()
6561
defer w.lk.Unlock()
66-
w.Wantlist.Remove(k)
62+
return w.Wantlist.Remove(k)
6763
}
6864

69-
func (w *ThreadSafe) Contains(k key.Key) (Entry, bool) {
65+
func (w *ThreadSafe) Contains(k key.Key) (*Entry, bool) {
7066
w.lk.RLock()
7167
defer w.lk.RUnlock()
7268
return w.Wantlist.Contains(k)
7369
}
7470

75-
func (w *ThreadSafe) Entries() []Entry {
71+
func (w *ThreadSafe) Entries() []*Entry {
7672
w.lk.RLock()
7773
defer w.lk.RUnlock()
7874
return w.Wantlist.Entries()
7975
}
8076

81-
func (w *ThreadSafe) SortedEntries() []Entry {
77+
func (w *ThreadSafe) SortedEntries() []*Entry {
8278
w.lk.RLock()
8379
defer w.lk.RUnlock()
8480
return w.Wantlist.SortedEntries()
@@ -94,58 +90,58 @@ func (w *Wantlist) Len() int {
9490
return len(w.set)
9591
}
9692

97-
func (w *Wantlist) Add(k key.Key, priority int) {
93+
func (w *Wantlist) Add(k key.Key, priority int) bool {
9894
if e, ok := w.set[k]; ok {
9995
e.RefCnt++
100-
return
96+
return false
10197
}
10298

103-
ctx, cancel := context.WithCancel(context.Background())
104-
w.set[k] = Entry{
99+
w.set[k] = &Entry{
105100
Key: k,
106101
Priority: priority,
107-
Ctx: ctx,
108-
cancel: cancel,
109102
RefCnt: 1,
110103
}
104+
105+
return true
111106
}
112107

113-
func (w *Wantlist) AddEntry(e Entry) {
114-
if _, ok := w.set[e.Key]; ok {
115-
return
108+
func (w *Wantlist) AddEntry(e *Entry) bool {
109+
if ex, ok := w.set[e.Key]; ok {
110+
ex.RefCnt++
111+
return false
116112
}
117113
w.set[e.Key] = e
114+
return true
118115
}
119116

120-
func (w *Wantlist) Remove(k key.Key) {
117+
func (w *Wantlist) Remove(k key.Key) bool {
121118
e, ok := w.set[k]
122119
if !ok {
123-
return
120+
return false
124121
}
125122

126123
e.RefCnt--
127124
if e.RefCnt <= 0 {
128125
delete(w.set, k)
129-
if e.cancel != nil {
130-
e.cancel()
131-
}
126+
return true
132127
}
128+
return false
133129
}
134130

135-
func (w *Wantlist) Contains(k key.Key) (Entry, bool) {
131+
func (w *Wantlist) Contains(k key.Key) (*Entry, bool) {
136132
e, ok := w.set[k]
137133
return e, ok
138134
}
139135

140-
func (w *Wantlist) Entries() []Entry {
136+
func (w *Wantlist) Entries() []*Entry {
141137
var es entrySlice
142138
for _, e := range w.set {
143139
es = append(es, e)
144140
}
145141
return es
146142
}
147143

148-
func (w *Wantlist) SortedEntries() []Entry {
144+
func (w *Wantlist) SortedEntries() []*Entry {
149145
var es entrySlice
150146
for _, e := range w.set {
151147
es = append(es, e)

0 commit comments

Comments
 (0)