Skip to content
This repository was archived by the owner on Feb 1, 2023. It is now read-only.

Commit c47eb62

Browse files
committed
feat(sessions): use all of wantBudget
As soon as peers appear, consume all of the want budget
1 parent d414b10 commit c47eb62

File tree

2 files changed

+20
-9
lines changed

2 files changed

+20
-9
lines changed

session/session.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -341,8 +341,16 @@ func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) {
341341
s.fetchcnt++
342342
s.notif.Publish(blk)
343343

344-
if next := s.tofetch.Pop(); next.Defined() {
345-
s.wantBlocks(ctx, []cid.Cid{next})
344+
toAdd := s.wantBudget()
345+
if toAdd > s.tofetch.Len() {
346+
toAdd = s.tofetch.Len()
347+
}
348+
if toAdd > 0 {
349+
var keys []cid.Cid
350+
for i := 0; i < toAdd; i++ {
351+
keys = append(keys, s.tofetch.Pop())
352+
}
353+
s.wantBlocks(ctx, keys)
346354
}
347355

348356
s.pastWants.Push(c)

session/session_test.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,11 @@ func TestSessionGetBlocks(t *testing.T) {
9999
receivedBlocks = append(receivedBlocks, receivedBlock)
100100
cancelBlock := <-cancelReqs
101101
newCancelReqs = append(newCancelReqs, cancelBlock)
102-
wantBlock := <-wantReqs
103-
newBlockReqs = append(newBlockReqs, wantBlock)
102+
select {
103+
case wantBlock := <-wantReqs:
104+
newBlockReqs = append(newBlockReqs, wantBlock)
105+
default:
106+
}
104107
}
105108

106109
// verify new peers were recorded
@@ -122,22 +125,22 @@ func TestSessionGetBlocks(t *testing.T) {
122125
t.Fatal("did not cancel each block once it was received")
123126
}
124127
// new session reqs should be targeted
125-
totalEnqueued := 0
128+
var newCidsRequested []cid.Cid
126129
for _, w := range newBlockReqs {
127130
if len(w.peers) == 0 {
128131
t.Fatal("should not have broadcast again after initial broadcast")
129132
}
130-
totalEnqueued += len(w.cids)
133+
newCidsRequested = append(newCidsRequested, w.cids...)
131134
}
132135

133136
// full new round of cids should be requested
134-
if totalEnqueued != broadcastLiveWantsLimit {
137+
if len(newCidsRequested) != broadcastLiveWantsLimit {
135138
t.Fatal("new blocks were not requested")
136139
}
137140

138141
// receive remaining blocks
139142
for i, p := range peers {
140-
session.ReceiveBlockFrom(p, blks[testutil.IndexOf(blks, newBlockReqs[i].cids[0])])
143+
session.ReceiveBlockFrom(p, blks[testutil.IndexOf(blks, newCidsRequested[i])])
141144
receivedBlock := <-getBlocksCh
142145
receivedBlocks = append(receivedBlocks, receivedBlock)
143146
cancelBlock := <-cancelReqs
@@ -199,7 +202,7 @@ func TestSessionFindMorePeers(t *testing.T) {
199202

200203
// verify a broadcast was made
201204
receivedWantReq := <-wantReqs
202-
if len(receivedWantReq.cids) != broadcastLiveWantsLimit {
205+
if len(receivedWantReq.cids) < broadcastLiveWantsLimit {
203206
t.Fatal("did not rebroadcast whole live list")
204207
}
205208
if receivedWantReq.peers != nil {

0 commit comments

Comments
 (0)