Skip to content
This repository was archived by the owner on Feb 1, 2023. It is now read-only.

Commit 4bfa7b7

Browse files
committed
refactor(sessions): extract request splitting
Move the job of splitting requests to its own package
1 parent c47eb62 commit 4bfa7b7

File tree

7 files changed

+356
-96
lines changed

7 files changed

+356
-96
lines changed

bitswap.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99
"sync/atomic"
1010
"time"
1111

12+
bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
13+
1214
decision "github.com/ipfs/go-bitswap/decision"
1315
bsgetter "github.com/ipfs/go-bitswap/getter"
1416
bsmsg "github.com/ipfs/go-bitswap/message"
@@ -103,12 +105,15 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
103105
}
104106

105107
wm := bswm.New(ctx)
106-
sessionFactory := func(ctx context.Context, id uint64, pm bssession.PeerManager) bssm.Session {
107-
return bssession.New(ctx, id, wm, pm)
108+
sessionFactory := func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter) bssm.Session {
109+
return bssession.New(ctx, id, wm, pm, srs)
108110
}
109111
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.PeerManager {
110112
return bsspm.New(ctx, id, network)
111113
}
114+
sessionRequestSplitterFactory := func(ctx context.Context) bssession.RequestSplitter {
115+
return bssrs.New(ctx)
116+
}
112117

113118
bs := &Bitswap{
114119
blockstore: bstore,
@@ -121,7 +126,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
121126
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
122127
wm: wm,
123128
pm: bspm.New(ctx, peerQueueFactory),
124-
sm: bssm.New(ctx, sessionFactory, sessionPeerManagerFactory),
129+
sm: bssm.New(ctx, sessionFactory, sessionPeerManagerFactory, sessionRequestSplitterFactory),
125130
counters: new(counters),
126131
dupMetric: dupHist,
127132
allMetric: allHist,

session/session.go

+34-76
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package session
22

33
import (
44
"context"
5-
"math/rand"
65
"time"
76

87
lru "github.com/hashicorp/golang-lru"
@@ -13,16 +12,13 @@ import (
1312
logging "github.com/ipfs/go-log"
1413
loggables "github.com/libp2p/go-libp2p-loggables"
1514
peer "github.com/libp2p/go-libp2p-peer"
15+
16+
bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
1617
)
1718

1819
const (
19-
minReceivedToSplit = 2
20-
maxSplit = 32
21-
maxAcceptableDupes = 0.4
22-
minDuplesToTryLessSplits = 0.2
23-
initialSplit = 2
24-
broadcastLiveWantsLimit = 4
25-
targetedLiveWantsLimit = 32
20+
broadcastLiveWantsLimit = 4
21+
targetedLiveWantsLimit = 32
2622
)
2723

2824
// WantManager is an interface that can be used to request blocks
@@ -41,6 +37,14 @@ type PeerManager interface {
4137
RecordPeerResponse(peer.ID, cid.Cid)
4238
}
4339

40+
// RequestSplitter provides an interface for splitting
41+
// a request for Cids up among peers.
42+
type RequestSplitter interface {
43+
SplitRequest([]peer.ID, []cid.Cid) []*bssrs.PartialRequest
44+
RecordDuplicateBlock()
45+
RecordUniqueBlock()
46+
}
47+
4448
type interestReq struct {
4549
c cid.Cid
4650
resp chan bool
@@ -60,6 +64,7 @@ type Session struct {
6064
ctx context.Context
6165
wm WantManager
6266
pm PeerManager
67+
srs RequestSplitter
6368

6469
// channels
6570
incoming chan blkRecv
@@ -70,17 +75,14 @@ type Session struct {
7075
tickDelayReqs chan time.Duration
7176

7277
// do not touch outside run loop
73-
tofetch *cidQueue
74-
interest *lru.Cache
75-
pastWants *cidQueue
76-
liveWants map[cid.Cid]time.Time
77-
tick *time.Timer
78-
baseTickDelay time.Duration
79-
latTotal time.Duration
80-
fetchcnt int
81-
receivedCount int
82-
split int
83-
duplicateReceivedCount int
78+
tofetch *cidQueue
79+
interest *lru.Cache
80+
pastWants *cidQueue
81+
liveWants map[cid.Cid]time.Time
82+
tick *time.Timer
83+
baseTickDelay time.Duration
84+
latTotal time.Duration
85+
fetchcnt int
8486
// identifiers
8587
notif notifications.PubSub
8688
uuid logging.Loggable
@@ -89,7 +91,7 @@ type Session struct {
8991

9092
// New creates a new bitswap session whose lifetime is bounded by the
9193
// given context.
92-
func New(ctx context.Context, id uint64, wm WantManager, pm PeerManager) *Session {
94+
func New(ctx context.Context, id uint64, wm WantManager, pm PeerManager, srs RequestSplitter) *Session {
9395
s := &Session{
9496
liveWants: make(map[cid.Cid]time.Time),
9597
newReqs: make(chan []cid.Cid),
@@ -102,7 +104,7 @@ func New(ctx context.Context, id uint64, wm WantManager, pm PeerManager) *Sessio
102104
ctx: ctx,
103105
wm: wm,
104106
pm: pm,
105-
split: initialSplit,
107+
srs: srs,
106108
incoming: make(chan blkRecv),
107109
notif: notifications.New(),
108110
uuid: loggables.Uuid("GetBlockRequest"),
@@ -230,7 +232,7 @@ func (s *Session) run(ctx context.Context) {
230232
select {
231233
case blk := <-s.incoming:
232234
if blk.counterMessage {
233-
s.updateReceiveCounters(ctx, blk.blk)
235+
s.updateReceiveCounters(ctx, blk)
234236
} else {
235237
s.handleIncomingBlock(ctx, blk)
236238
}
@@ -357,22 +359,13 @@ func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) {
357359
}
358360
}
359361

360-
func (s *Session) duplicateRatio() float64 {
361-
return float64(s.duplicateReceivedCount) / float64(s.receivedCount)
362-
}
363-
func (s *Session) updateReceiveCounters(ctx context.Context, blk blocks.Block) {
364-
if s.pastWants.Has(blk.Cid()) {
365-
s.receivedCount++
366-
s.duplicateReceivedCount++
367-
if (s.receivedCount > minReceivedToSplit) && (s.duplicateRatio() > maxAcceptableDupes) && (s.split < maxSplit) {
368-
s.split++
369-
}
362+
func (s *Session) updateReceiveCounters(ctx context.Context, blk blkRecv) {
363+
ks := blk.blk.Cid()
364+
if s.pastWants.Has(ks) {
365+
s.srs.RecordDuplicateBlock()
370366
} else {
371-
if s.cidIsWanted(blk.Cid()) {
372-
s.receivedCount++
373-
if (s.split > 1) && (s.duplicateRatio() < minDuplesToTryLessSplits) {
374-
s.split--
375-
}
367+
if s.cidIsWanted(ks) {
368+
s.srs.RecordUniqueBlock()
376369
}
377370
}
378371
}
@@ -384,12 +377,10 @@ func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) {
384377
}
385378
peers := s.pm.GetOptimizedPeers()
386379
if len(peers) > 0 {
387-
splitRequests := split(ks, peers, s.split)
388-
for i, currentKeys := range splitRequests.ks {
389-
currentPeers := splitRequests.peers[i]
390-
// right now we're requesting each block from every peer, but soon, maybe not
391-
s.pm.RecordPeerRequests(currentPeers, currentKeys)
392-
s.wm.WantBlocks(ctx, currentKeys, currentPeers, s.id)
380+
splitRequests := s.srs.SplitRequest(peers, ks)
381+
for _, splitRequest := range splitRequests {
382+
s.pm.RecordPeerRequests(splitRequest.Peers, splitRequest.Keys)
383+
s.wm.WantBlocks(ctx, splitRequest.Keys, splitRequest.Peers, s.id)
393384
}
394385
} else {
395386
s.pm.RecordPeerRequests(nil, ks)
@@ -410,39 +401,6 @@ func (s *Session) resetTick() {
410401
}
411402
}
412403

413-
type splitRec struct {
414-
ks [][]cid.Cid
415-
peers [][]peer.ID
416-
}
417-
418-
func split(ks []cid.Cid, peers []peer.ID, split int) *splitRec {
419-
peerSplit := split
420-
if len(peers) < peerSplit {
421-
peerSplit = len(peers)
422-
}
423-
keySplit := split
424-
if len(ks) < keySplit {
425-
keySplit = len(ks)
426-
}
427-
if keySplit > peerSplit {
428-
keySplit = peerSplit
429-
}
430-
out := &splitRec{
431-
ks: make([][]cid.Cid, keySplit),
432-
peers: make([][]peer.ID, peerSplit),
433-
}
434-
for i, c := range ks {
435-
pos := i % keySplit
436-
out.ks[pos] = append(out.ks[pos], c)
437-
}
438-
peerOrder := rand.Perm(len(peers))
439-
for i, po := range peerOrder {
440-
pos := i % peerSplit
441-
out.peers[pos] = append(out.peers[pos], peers[po])
442-
}
443-
return out
444-
}
445-
446404
func (s *Session) wantBudget() int {
447405
live := len(s.liveWants)
448406
var budget int

session/session_test.go

+15-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/ipfs/go-block-format"
1010

11+
bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
1112
"github.com/ipfs/go-bitswap/testutil"
1213
cid "github.com/ipfs/go-cid"
1314
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
@@ -57,15 +58,26 @@ func (fpm *fakePeerManager) RecordPeerResponse(p peer.ID, c cid.Cid) {
5758
fpm.lk.Unlock()
5859
}
5960

61+
type fakeRequestSplitter struct {
62+
}
63+
64+
func (frs *fakeRequestSplitter) SplitRequest(peers []peer.ID, keys []cid.Cid) []*bssrs.PartialRequest {
65+
return []*bssrs.PartialRequest{&bssrs.PartialRequest{Peers: peers, Keys: keys}}
66+
}
67+
68+
func (frs *fakeRequestSplitter) RecordDuplicateBlock() {}
69+
func (frs *fakeRequestSplitter) RecordUniqueBlock() {}
70+
6071
func TestSessionGetBlocks(t *testing.T) {
6172
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
6273
defer cancel()
6374
wantReqs := make(chan wantReq, 1)
6475
cancelReqs := make(chan wantReq, 1)
6576
fwm := &fakeWantManager{wantReqs, cancelReqs}
6677
fpm := &fakePeerManager{}
78+
frs := &fakeRequestSplitter{}
6779
id := testutil.GenerateSessionID()
68-
session := New(ctx, id, fwm, fpm)
80+
session := New(ctx, id, fwm, fpm, frs)
6981
blockGenerator := blocksutil.NewBlockGenerator()
7082
blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
7183
var cids []cid.Cid
@@ -165,8 +177,9 @@ func TestSessionFindMorePeers(t *testing.T) {
165177
cancelReqs := make(chan wantReq, 1)
166178
fwm := &fakeWantManager{wantReqs, cancelReqs}
167179
fpm := &fakePeerManager{}
180+
frs := &fakeRequestSplitter{}
168181
id := testutil.GenerateSessionID()
169-
session := New(ctx, id, fwm, fpm)
182+
session := New(ctx, id, fwm, fpm, frs)
170183
session.SetBaseTickDelay(200 * time.Microsecond)
171184
blockGenerator := blocksutil.NewBlockGenerator()
172185
blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)

sessionmanager/sessionmanager.go

+18-10
Original file line numberDiff line numberDiff line change
@@ -23,20 +23,26 @@ type Session interface {
2323
type sesTrk struct {
2424
session Session
2525
pm bssession.PeerManager
26+
srs bssession.RequestSplitter
2627
}
2728

2829
// SessionFactory generates a new session for the SessionManager to track.
29-
type SessionFactory func(ctx context.Context, id uint64, pm bssession.PeerManager) Session
30+
type SessionFactory func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter) Session
31+
32+
// RequestSplitterFactory generates a new request splitter for a session.
33+
type RequestSplitterFactory func(ctx context.Context) bssession.RequestSplitter
3034

3135
// PeerManagerFactory generates a new peer manager for a session.
3236
type PeerManagerFactory func(ctx context.Context, id uint64) bssession.PeerManager
3337

3438
// SessionManager is responsible for creating, managing, and dispatching to
3539
// sessions.
3640
type SessionManager struct {
37-
ctx context.Context
38-
sessionFactory SessionFactory
39-
peerManagerFactory PeerManagerFactory
41+
ctx context.Context
42+
sessionFactory SessionFactory
43+
peerManagerFactory PeerManagerFactory
44+
requestSplitterFactory RequestSplitterFactory
45+
4046
// Sessions
4147
sessLk sync.Mutex
4248
sessions []sesTrk
@@ -47,11 +53,12 @@ type SessionManager struct {
4753
}
4854

4955
// New creates a new SessionManager.
50-
func New(ctx context.Context, sessionFactory SessionFactory, peerManagerFactory PeerManagerFactory) *SessionManager {
56+
func New(ctx context.Context, sessionFactory SessionFactory, peerManagerFactory PeerManagerFactory, requestSplitterFactory RequestSplitterFactory) *SessionManager {
5157
return &SessionManager{
52-
ctx: ctx,
53-
sessionFactory: sessionFactory,
54-
peerManagerFactory: peerManagerFactory,
58+
ctx: ctx,
59+
sessionFactory: sessionFactory,
60+
peerManagerFactory: peerManagerFactory,
61+
requestSplitterFactory: requestSplitterFactory,
5562
}
5663
}
5764

@@ -62,8 +69,9 @@ func (sm *SessionManager) NewSession(ctx context.Context) exchange.Fetcher {
6269
sessionctx, cancel := context.WithCancel(ctx)
6370

6471
pm := sm.peerManagerFactory(sessionctx, id)
65-
session := sm.sessionFactory(sessionctx, id, pm)
66-
tracked := sesTrk{session, pm}
72+
srs := sm.requestSplitterFactory(sessionctx)
73+
session := sm.sessionFactory(sessionctx, id, pm, srs)
74+
tracked := sesTrk{session, pm, srs}
6775
sm.sessLk.Lock()
6876
sm.sessions = append(sm.sessions, tracked)
6977
sm.sessLk.Unlock()

0 commit comments

Comments
 (0)