Skip to content

Commit 50ccc5c

Browse files
authored
fix(IDONTWANT)!: Do not IDONTWANT your sender (#609)
We were sending IDONTWANT to the sender of the received message. This is pointless, as the sender should not repeat a message it already sent. The sender could also have tracked that it had sent this peer the message (we don't do this currently, and it's probably not necessary). @ppopth
1 parent 95a070a commit 50ccc5c

File tree

5 files changed

+81
-5
lines changed

5 files changed

+81
-5
lines changed

floodsub.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func (fs *FloodSubRouter) AcceptFrom(peer.ID) AcceptStatus {
7171
return AcceptAll
7272
}
7373

74-
func (fs *FloodSubRouter) PreValidation([]*Message) {}
74+
func (fs *FloodSubRouter) PreValidation(from peer.ID, msgs []*Message) {}
7575

7676
func (fs *FloodSubRouter) HandleRPC(rpc *RPC) {}
7777

gossipsub.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -707,7 +707,7 @@ func (gs *GossipSubRouter) AcceptFrom(p peer.ID) AcceptStatus {
707707
// PreValidation sends the IDONTWANT control messages to all the mesh
708708
// peers. They need to be sent right before the validation because they
709709
// should be seen by the peers as soon as possible.
710-
func (gs *GossipSubRouter) PreValidation(msgs []*Message) {
710+
func (gs *GossipSubRouter) PreValidation(from peer.ID, msgs []*Message) {
711711
tmids := make(map[string][]string)
712712
for _, msg := range msgs {
713713
if len(msg.GetData()) < gs.params.IDontWantMessageThreshold {
@@ -724,6 +724,10 @@ func (gs *GossipSubRouter) PreValidation(msgs []*Message) {
724724
shuffleStrings(mids)
725725
// send IDONTWANT to all the mesh peers
726726
for p := range gs.mesh[topic] {
727+
if p == from {
728+
// We don't send IDONTWANT to the peer that sent us the messages
729+
continue
730+
}
727731
// send to only peers that support IDONTWANT
728732
if gs.feature(GossipSubFeatureIdontwant, gs.peers[p]) {
729733
idontwant := []*pb.ControlIDontWant{{MessageIDs: mids}}

gossipsub_test.go

+72
Original file line numberDiff line numberDiff line change
@@ -2815,6 +2815,78 @@ func TestGossipsubIdontwantReceive(t *testing.T) {
28152815
<-ctx.Done()
28162816
}
28172817

2818+
type mockRawTracer struct {
2819+
onRecvRPC func(*RPC)
2820+
}
2821+
2822+
func (m *mockRawTracer) RecvRPC(rpc *RPC) {
2823+
if m.onRecvRPC != nil {
2824+
m.onRecvRPC(rpc)
2825+
}
2826+
}
2827+
2828+
func (m *mockRawTracer) AddPeer(p peer.ID, proto protocol.ID) {}
2829+
func (m *mockRawTracer) DeliverMessage(msg *Message) {}
2830+
func (m *mockRawTracer) DropRPC(rpc *RPC, p peer.ID) {}
2831+
func (m *mockRawTracer) DuplicateMessage(msg *Message) {}
2832+
func (m *mockRawTracer) Graft(p peer.ID, topic string) {}
2833+
func (m *mockRawTracer) Join(topic string) {}
2834+
func (m *mockRawTracer) Leave(topic string) {}
2835+
func (m *mockRawTracer) Prune(p peer.ID, topic string) {}
2836+
func (m *mockRawTracer) RejectMessage(msg *Message, reason string) {}
2837+
func (m *mockRawTracer) RemovePeer(p peer.ID) {}
2838+
func (m *mockRawTracer) SendRPC(rpc *RPC, p peer.ID) {}
2839+
func (m *mockRawTracer) ThrottlePeer(p peer.ID) {}
2840+
func (m *mockRawTracer) UndeliverableMessage(msg *Message) {}
2841+
func (m *mockRawTracer) ValidateMessage(msg *Message) {}
2842+
2843+
var _ RawTracer = &mockRawTracer{}
2844+
2845+
func TestGossipsubNoIDONTWANTToMessageSender(t *testing.T) {
2846+
ctx, cancel := context.WithCancel(context.Background())
2847+
defer cancel()
2848+
hosts := getDefaultHosts(t, 3)
2849+
denseConnect(t, hosts)
2850+
2851+
psubs := make([]*PubSub, 2)
2852+
2853+
receivedIDONTWANT := make(chan struct{})
2854+
psubs[0] = getGossipsub(ctx, hosts[0], WithRawTracer(&mockRawTracer{
2855+
onRecvRPC: func(rpc *RPC) {
2856+
if len(rpc.GetControl().GetIdontwant()) > 0 {
2857+
close(receivedIDONTWANT)
2858+
}
2859+
},
2860+
}))
2861+
psubs[1] = getGossipsub(ctx, hosts[1])
2862+
2863+
topicString := "foobar"
2864+
var topics []*Topic
2865+
for _, ps := range psubs {
2866+
topic, err := ps.Join(topicString)
2867+
if err != nil {
2868+
t.Fatal(err)
2869+
}
2870+
topics = append(topics, topic)
2871+
2872+
_, err = ps.Subscribe(topicString)
2873+
if err != nil {
2874+
t.Fatal(err)
2875+
}
2876+
}
2877+
time.Sleep(time.Second)
2878+
2879+
msg := make([]byte, GossipSubIDontWantMessageThreshold+1)
2880+
topics[0].Publish(ctx, msg)
2881+
2882+
select {
2883+
case <-receivedIDONTWANT:
2884+
t.Fatal("IDONTWANT should not be sent to the message sender")
2885+
case <-time.After(time.Second):
2886+
}
2887+
2888+
}
2889+
28182890
// Test that non-mesh peers will not get IDONTWANT
28192891
func TestGossipsubIdontwantNonMesh(t *testing.T) {
28202892
ctx, cancel := context.WithCancel(context.Background())

pubsub.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ type PubSubRouter interface {
203203
AcceptFrom(peer.ID) AcceptStatus
204204
// PreValidation is invoked on messages in the RPC envelope right before pushing it to
205205
// the validation pipeline
206-
PreValidation([]*Message)
206+
PreValidation(from peer.ID, msgs []*Message)
207207
// HandleRPC is invoked to process control messages in the RPC envelope.
208208
// It is invoked after subscriptions and payload messages have been processed.
209209
HandleRPC(*RPC)
@@ -1106,7 +1106,7 @@ func (p *PubSub) handleIncomingRPC(rpc *RPC) {
11061106
toPush = append(toPush, msg)
11071107
}
11081108
}
1109-
p.rt.PreValidation(toPush)
1109+
p.rt.PreValidation(rpc.from, toPush)
11101110
for _, msg := range toPush {
11111111
p.pushMsg(msg)
11121112
}

randomsub.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ func (rs *RandomSubRouter) AcceptFrom(peer.ID) AcceptStatus {
9494
return AcceptAll
9595
}
9696

97-
func (rs *RandomSubRouter) PreValidation([]*Message) {}
97+
func (rs *RandomSubRouter) PreValidation(from peer.ID, msgs []*Message) {}
9898

9999
func (rs *RandomSubRouter) HandleRPC(rpc *RPC) {}
100100

0 commit comments

Comments
 (0)