Skip to content

Commit 5ad493b

Browse files
committed
Gossipsub v2.0: Handle IANNOUNCE and send INEED
1 parent fc928e5 commit 5ad493b

File tree

4 files changed

+943
-1
lines changed

4 files changed

+943
-1
lines changed

acache.go

Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
package pubsub
2+
3+
import (
4+
"container/list"
5+
"fmt"
6+
"sync"
7+
"time"
8+
9+
"github.com/libp2p/go-libp2p/core/peer"
10+
)
11+
12+
type IneedMeta struct {
13+
pid peer.ID
14+
mid string
15+
}
16+
17+
type sendList struct {
18+
// Timeout
19+
t time.Duration
20+
// List of message ids
21+
l *list.List
22+
// Elements in l indexed by message ids.
23+
es map[string]*list.Element
24+
}
25+
26+
type sendListEntry struct {
27+
meta *IneedMeta
28+
// Send time
29+
sendTime time.Time
30+
// Timeout time
31+
expiryTime time.Time
32+
}
33+
34+
func newSendList(timeout time.Duration) *sendList {
35+
return &sendList{
36+
t: timeout,
37+
l: list.New(),
38+
es: make(map[string]*list.Element),
39+
}
40+
}
41+
42+
// Front returns the first message id in the list.
43+
func (sl *sendList) Front() *sendListEntry {
44+
e := sl.l.Front()
45+
if e != nil {
46+
entry := e.Value.(sendListEntry)
47+
return &entry
48+
} else {
49+
return nil
50+
}
51+
}
52+
53+
// Push pushes the message id and the peer to the list with send time set to now and expiry time set to now plus timeout.
54+
func (sl *sendList) Push(meta *IneedMeta) {
55+
// there shouldn't already be a message id in the list
56+
if _, ok := sl.es[meta.mid]; ok {
57+
panic(fmt.Errorf("there is already a message id in the list: %s", meta.mid))
58+
}
59+
// push to the back and remember the element
60+
sl.es[meta.mid] = sl.l.PushBack(sendListEntry{
61+
meta: meta,
62+
sendTime: time.Now(),
63+
expiryTime: time.Now().Add(sl.t),
64+
})
65+
}
66+
67+
// Remove removes the message id from the list.
68+
func (sl *sendList) Remove(mid string) {
69+
// there shouldn already be a message id in the list
70+
if _, ok := sl.es[mid]; !ok {
71+
panic(fmt.Errorf("there is no message id in the list to remove: %s", mid))
72+
}
73+
// remove it from both the list and the indexing map
74+
sl.l.Remove(sl.es[mid])
75+
delete(sl.es, mid)
76+
}
77+
78+
// Has checks if the message id is in the list.
79+
func (sl *sendList) Has(mid string) bool {
80+
_, ok := sl.es[mid]
81+
return ok
82+
}
83+
84+
type AnnounceCache struct {
85+
lk sync.RWMutex
86+
// Queues indexed by messages ids containing the peers from whom we already receive IANNOUNCE, but not yet send INEED.
87+
m map[string][]peer.ID
88+
// List of pairs of peers and message ids that we already send INEED, but the timeout hasn't occured and the message is not received yet.
89+
// There is supposed to be at most one element per message id in the list at any time.
90+
sl *sendList
91+
// Channel to wake up the background routine and try to send INEED.
92+
c chan<- *IneedMeta
93+
// Channel used to notify a request to send INEED from the cache.
94+
R <-chan *IneedMeta
95+
// Channel used to notify a timeout of INEED from the cache.
96+
T <-chan *IneedMeta
97+
// Used to indicate that the cache is stopped
98+
stopped chan struct{}
99+
}
100+
101+
func NewAnnounceCache(timeout time.Duration) *AnnounceCache {
102+
c := make(chan *IneedMeta)
103+
R := make(chan *IneedMeta)
104+
T := make(chan *IneedMeta)
105+
ac := &AnnounceCache{
106+
c: c,
107+
R: R,
108+
T: T,
109+
m: make(map[string][]peer.ID),
110+
sl: newSendList(timeout),
111+
112+
stopped: make(chan struct{}),
113+
}
114+
go ac.background(c, R, T)
115+
116+
return ac
117+
}
118+
119+
func (ac *AnnounceCache) background(c <-chan *IneedMeta, R chan<- *IneedMeta, T chan<- *IneedMeta) {
120+
timer := time.NewTimer(0)
121+
for {
122+
select {
123+
case <-ac.stopped:
124+
return
125+
case meta := <-c:
126+
ac.lk.Lock()
127+
if !ac.sl.Has(meta.mid) {
128+
// If there is no INEED on flight, just send INEED right away by putting it in the list
129+
ac.sl.Push(meta)
130+
// Send the meta data to the cache user, so they can send INEED using that
131+
select {
132+
case R <- meta:
133+
case <-ac.stopped:
134+
ac.lk.Unlock()
135+
return
136+
}
137+
} else {
138+
ac.m[meta.mid] = append(ac.m[meta.mid], meta.pid)
139+
}
140+
case <-timer.C:
141+
ac.lk.Lock()
142+
}
143+
entry := ac.sl.Front()
144+
for entry != nil && entry.expiryTime.Before(time.Now()) {
145+
// If the ongoing INEED times out
146+
mid := entry.meta.mid
147+
148+
// Remove it from the list
149+
ac.sl.Remove(mid)
150+
151+
// Notify the cache user that the ongoing INEED times out
152+
select {
153+
case T <- entry.meta:
154+
case <-ac.stopped:
155+
ac.lk.Unlock()
156+
return
157+
}
158+
159+
// If there is another peer waiting for INEED
160+
if len(ac.m[mid]) > 0 {
161+
meta := &IneedMeta{
162+
pid: ac.m[mid][0],
163+
mid: mid,
164+
}
165+
ac.m[mid] = ac.m[mid][1:]
166+
ac.sl.Push(meta)
167+
168+
// Send the meta data to the cache user, so they can send INEED using that
169+
select {
170+
case R <- meta:
171+
case <-ac.stopped:
172+
ac.lk.Unlock()
173+
return
174+
}
175+
} else {
176+
delete(ac.m, mid)
177+
}
178+
179+
// Look at the next entry
180+
entry = ac.sl.Front()
181+
}
182+
timer.Stop()
183+
if entry = ac.sl.Front(); entry != nil {
184+
// If there still the next entry, wake this background routine correspondingly
185+
timer.Reset(time.Until(entry.expiryTime))
186+
}
187+
ac.lk.Unlock()
188+
}
189+
}
190+
191+
func (ac *AnnounceCache) Add(mid string, pid peer.ID) {
192+
meta := &IneedMeta{
193+
mid: mid,
194+
pid: pid,
195+
}
196+
select {
197+
case ac.c <- meta:
198+
case <-ac.stopped:
199+
}
200+
}
201+
202+
// Clear clears all the pending IANNOUNCE and remove the ongoing INEED from the list so the the timeout
203+
// will not be triggered
204+
func (ac *AnnounceCache) Clear(mid string) {
205+
ac.lk.Lock()
206+
defer ac.lk.Unlock()
207+
208+
// Clear the cache for the given message id
209+
ac.m[mid] = []peer.ID{}
210+
if ac.sl.Has(mid) {
211+
ac.sl.Remove(mid)
212+
}
213+
}
214+
215+
func (ac *AnnounceCache) Stop() {
216+
close(ac.stopped)
217+
}

0 commit comments

Comments
 (0)