@@ -2,7 +2,6 @@ package session
2
2
3
3
import (
4
4
"context"
5
- "math/rand"
6
5
"time"
7
6
8
7
lru "github.com/hashicorp/golang-lru"
@@ -13,16 +12,13 @@ import (
13
12
logging "github.com/ipfs/go-log"
14
13
loggables "github.com/libp2p/go-libp2p-loggables"
15
14
peer "github.com/libp2p/go-libp2p-peer"
15
+
16
+ bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
16
17
)
17
18
18
19
const (
19
- minReceivedToSplit = 2
20
- maxSplit = 32
21
- maxAcceptableDupes = 0.4
22
- minDuplesToTryLessSplits = 0.2
23
- initialSplit = 2
24
- broadcastLiveWantsLimit = 4
25
- targetedLiveWantsLimit = 32
20
+ broadcastLiveWantsLimit = 4
21
+ targetedLiveWantsLimit = 32
26
22
)
27
23
28
24
// WantManager is an interface that can be used to request blocks
@@ -41,6 +37,14 @@ type PeerManager interface {
41
37
RecordPeerResponse (peer.ID , cid.Cid )
42
38
}
43
39
40
+ // RequestSplitter provides an interface for splitting
41
+ // a request for Cids up among peers.
42
+ type RequestSplitter interface {
43
+ SplitRequest ([]peer.ID , []cid.Cid ) []* bssrs.PartialRequest
44
+ RecordDuplicateBlock ()
45
+ RecordUniqueBlock ()
46
+ }
47
+
44
48
type interestReq struct {
45
49
c cid.Cid
46
50
resp chan bool
@@ -60,6 +64,7 @@ type Session struct {
60
64
ctx context.Context
61
65
wm WantManager
62
66
pm PeerManager
67
+ srs RequestSplitter
63
68
64
69
// channels
65
70
incoming chan blkRecv
@@ -70,17 +75,14 @@ type Session struct {
70
75
tickDelayReqs chan time.Duration
71
76
72
77
// do not touch outside run loop
73
- tofetch * cidQueue
74
- interest * lru.Cache
75
- pastWants * cidQueue
76
- liveWants map [cid.Cid ]time.Time
77
- tick * time.Timer
78
- baseTickDelay time.Duration
79
- latTotal time.Duration
80
- fetchcnt int
81
- receivedCount int
82
- split int
83
- duplicateReceivedCount int
78
+ tofetch * cidQueue
79
+ interest * lru.Cache
80
+ pastWants * cidQueue
81
+ liveWants map [cid.Cid ]time.Time
82
+ tick * time.Timer
83
+ baseTickDelay time.Duration
84
+ latTotal time.Duration
85
+ fetchcnt int
84
86
// identifiers
85
87
notif notifications.PubSub
86
88
uuid logging.Loggable
@@ -89,7 +91,7 @@ type Session struct {
89
91
90
92
// New creates a new bitswap session whose lifetime is bounded by the
91
93
// given context.
92
- func New (ctx context.Context , id uint64 , wm WantManager , pm PeerManager ) * Session {
94
+ func New (ctx context.Context , id uint64 , wm WantManager , pm PeerManager , srs RequestSplitter ) * Session {
93
95
s := & Session {
94
96
liveWants : make (map [cid.Cid ]time.Time ),
95
97
newReqs : make (chan []cid.Cid ),
@@ -102,7 +104,7 @@ func New(ctx context.Context, id uint64, wm WantManager, pm PeerManager) *Sessio
102
104
ctx : ctx ,
103
105
wm : wm ,
104
106
pm : pm ,
105
- split : initialSplit ,
107
+ srs : srs ,
106
108
incoming : make (chan blkRecv ),
107
109
notif : notifications .New (),
108
110
uuid : loggables .Uuid ("GetBlockRequest" ),
@@ -230,7 +232,7 @@ func (s *Session) run(ctx context.Context) {
230
232
select {
231
233
case blk := <- s .incoming :
232
234
if blk .counterMessage {
233
- s .updateReceiveCounters (ctx , blk . blk )
235
+ s .updateReceiveCounters (ctx , blk )
234
236
} else {
235
237
s .handleIncomingBlock (ctx , blk )
236
238
}
@@ -357,22 +359,16 @@ func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) {
357
359
}
358
360
}
359
361
360
- func (s * Session ) duplicateRatio () float64 {
361
- return float64 (s .duplicateReceivedCount ) / float64 (s .receivedCount )
362
- }
363
- func (s * Session ) updateReceiveCounters (ctx context.Context , blk blocks.Block ) {
364
- if s .pastWants .Has (blk .Cid ()) {
365
- s .receivedCount ++
366
- s .duplicateReceivedCount ++
367
- if (s .receivedCount > minReceivedToSplit ) && (s .duplicateRatio () > maxAcceptableDupes ) && (s .split < maxSplit ) {
368
- s .split ++
362
+ func (s * Session ) updateReceiveCounters (ctx context.Context , blk blkRecv ) {
363
+ ks := blk .blk .Cid ()
364
+ if s .pastWants .Has (ks ) {
365
+ s .srs .RecordDuplicateBlock ()
366
+ if blk .from != "" {
367
+ s .pm .RecordPeerResponse (blk .from , ks )
369
368
}
370
369
} else {
371
- if s .cidIsWanted (blk .Cid ()) {
372
- s .receivedCount ++
373
- if (s .split > 1 ) && (s .duplicateRatio () < minDuplesToTryLessSplits ) {
374
- s .split --
375
- }
370
+ if s .cidIsWanted (ks ) {
371
+ s .srs .RecordUniqueBlock ()
376
372
}
377
373
}
378
374
}
@@ -384,12 +380,10 @@ func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) {
384
380
}
385
381
peers := s .pm .GetOptimizedPeers ()
386
382
if len (peers ) > 0 {
387
- splitRequests := split (ks , peers , s .split )
388
- for i , currentKeys := range splitRequests .ks {
389
- currentPeers := splitRequests .peers [i ]
390
- // right now we're requesting each block from every peer, but soon, maybe not
391
- s .pm .RecordPeerRequests (currentPeers , currentKeys )
392
- s .wm .WantBlocks (ctx , currentKeys , currentPeers , s .id )
383
+ splitRequests := s .srs .SplitRequest (peers , ks )
384
+ for _ , splitRequest := range splitRequests {
385
+ s .pm .RecordPeerRequests (splitRequest .Peers , splitRequest .Keys )
386
+ s .wm .WantBlocks (ctx , splitRequest .Keys , splitRequest .Peers , s .id )
393
387
}
394
388
} else {
395
389
s .pm .RecordPeerRequests (nil , ks )
@@ -415,34 +409,6 @@ type splitRec struct {
415
409
peers [][]peer.ID
416
410
}
417
411
418
- func split (ks []cid.Cid , peers []peer.ID , split int ) * splitRec {
419
- peerSplit := split
420
- if len (peers ) < peerSplit {
421
- peerSplit = len (peers )
422
- }
423
- keySplit := split
424
- if len (ks ) < keySplit {
425
- keySplit = len (ks )
426
- }
427
- if keySplit > peerSplit {
428
- keySplit = peerSplit
429
- }
430
- out := & splitRec {
431
- ks : make ([][]cid.Cid , keySplit ),
432
- peers : make ([][]peer.ID , peerSplit ),
433
- }
434
- for i , c := range ks {
435
- pos := i % keySplit
436
- out .ks [pos ] = append (out .ks [pos ], c )
437
- }
438
- peerOrder := rand .Perm (len (peers ))
439
- for i , po := range peerOrder {
440
- pos := i % peerSplit
441
- out .peers [pos ] = append (out .peers [pos ], peers [po ])
442
- }
443
- return out
444
- }
445
-
446
412
func (s * Session ) wantBudget () int {
447
413
live := len (s .liveWants )
448
414
var budget int
0 commit comments