Skip to content

Commit a15e72b

Browse files
committed
feat(gossipsub): introduce backpressure
1 parent add1ff6 commit a15e72b

File tree

7 files changed

+582
-400
lines changed

7 files changed

+582
-400
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

protocols/gossipsub/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ sha2 = "0.10.8"
3737
smallvec = "1.11.2"
3838
tracing = "0.1.37"
3939
void = "1.0.2"
40+
async-channel = "1.9.0"
4041

4142
# Metrics dependencies
4243
prometheus-client = { workspace = true }

protocols/gossipsub/src/behaviour.rs

Lines changed: 56 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,6 @@ use libp2p_swarm::{
4545
THandlerOutEvent, ToSwarm,
4646
};
4747

48-
use crate::backoff::BackoffStorage;
49-
use crate::config::{Config, ValidationMode};
5048
use crate::gossip_promises::GossipPromises;
5149
use crate::handler::{Handler, HandlerEvent, HandlerIn};
5250
use crate::mcache::MessageCache;
@@ -62,6 +60,11 @@ use crate::types::{
6260
SubscriptionAction,
6361
};
6462
use crate::types::{PeerConnections, PeerKind, RpcOut};
63+
use crate::{backoff::BackoffStorage, types::RpcSender};
64+
use crate::{
65+
config::{Config, ValidationMode},
66+
types::rpc_channel,
67+
};
6568
use crate::{rpc_proto::proto, TopicScoreParams};
6669
use crate::{PublishError, SubscriptionError, ValidationError};
6770
use instant::SystemTime;
@@ -332,6 +335,9 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
332335

333336
/// Keep track of a set of internal metrics relating to gossipsub.
334337
metrics: Option<Metrics>,
338+
339+
/// Connection handler message queue channels.
340+
handler_send_queues: HashMap<PeerId, RpcSender>,
335341
}
336342

337343
impl<D, F> Behaviour<D, F>
@@ -471,6 +477,7 @@ where
471477
config,
472478
subscription_filter,
473479
data_transform,
480+
handler_send_queues: Default::default(),
474481
})
475482
}
476483
}
@@ -537,7 +544,8 @@ where
537544
for peer in self.peer_topics.keys().copied().collect::<Vec<_>>() {
538545
tracing::debug!(%peer, "Sending SUBSCRIBE to peer");
539546
let event = RpcOut::Subscribe(topic_hash.clone());
540-
self.send_message(peer, event);
547+
self.send_message(peer, event)
548+
.expect("Subscribe messages should be always sent");
541549
}
542550

543551
// call JOIN(topic)
@@ -564,7 +572,8 @@ where
564572
for peer in self.peer_topics.keys().copied().collect::<Vec<_>>() {
565573
tracing::debug!(%peer, "Sending UNSUBSCRIBE to peer");
566574
let event = RpcOut::Unsubscribe(topic_hash.clone());
567-
self.send_message(peer, event);
575+
self.send_message(peer, event)
576+
.expect("Subscribe messages should be always sent");
568577
}
569578

570579
// call LEAVE(topic)
@@ -711,9 +720,18 @@ where
711720
}
712721

713722
// Send to peers we know are subscribed to the topic.
723+
let mut errors = 0;
714724
for peer_id in recipient_peers.iter() {
715725
tracing::trace!(peer=%peer_id, "Sending message to peer");
716-
self.send_message(*peer_id, RpcOut::Publish(raw_message.clone()));
726+
if self
727+
.send_message(*peer_id, RpcOut::Publish(raw_message.clone()))
728+
.is_err()
729+
{
730+
errors += 1;
731+
}
732+
}
733+
if errors == recipient_peers.len() {
734+
return Err(PublishError::InsufficientPeers);
717735
}
718736

719737
tracing::debug!(message=%msg_id, "Published message");
@@ -1311,7 +1329,7 @@ where
13111329
);
13121330
} else {
13131331
tracing::debug!(peer=%peer_id, "IWANT: Sending cached messages to peer");
1314-
self.send_message(*peer_id, RpcOut::Forward(msg));
1332+
let _ = self.send_message(*peer_id, RpcOut::Forward(msg));
13151333
}
13161334
}
13171335
}
@@ -1469,7 +1487,8 @@ where
14691487
.map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe))
14701488
.collect::<Vec<_>>()
14711489
{
1472-
self.send_message(*peer_id, RpcOut::Control(action));
1490+
self.send_message(*peer_id, RpcOut::Control(action))
1491+
.expect("PRUNE messages should always be sent");
14731492
}
14741493
// Send the prune messages to the peer
14751494
tracing::debug!(
@@ -1970,6 +1989,7 @@ where
19701989
.collect::<Vec<_>>()
19711990
{
19721991
self.send_message(*propagation_source, RpcOut::Control(action))
1992+
.expect("GRAFT messages should always be sent");
19731993
}
19741994

19751995
// Notify the application of the subscriptions
@@ -2520,7 +2540,8 @@ where
25202540

25212541
// send the control messages
25222542
for msg in control_msgs.chain(prunes).collect::<Vec<_>>() {
2523-
self.send_message(peer, RpcOut::Control(msg));
2543+
self.send_message(peer, RpcOut::Control(msg))
2544+
.expect("PRUNE messages should always be sent");
25242545
}
25252546
}
25262547

@@ -2534,7 +2555,8 @@ where
25342555
self.config.do_px() && !no_px.contains(peer),
25352556
false,
25362557
);
2537-
self.send_message(*peer, RpcOut::Control(prune));
2558+
self.send_message(*peer, RpcOut::Control(prune))
2559+
.expect("PRUNE messages should always be sent");
25382560

25392561
// inform the handler
25402562
peer_removed_from_mesh(
@@ -2606,7 +2628,7 @@ where
26062628

26072629
for peer in recipient_peers.iter() {
26082630
tracing::debug!(%peer, message=%msg_id, "Sending message to peer");
2609-
self.send_message(*peer, event.clone());
2631+
let _ = self.send_message(*peer, event.clone());
26102632
}
26112633
tracing::debug!("Completed forwarding message");
26122634
Ok(true)
@@ -2720,7 +2742,7 @@ where
27202742
fn flush_control_pool(&mut self) {
27212743
for (peer, controls) in self.control_pool.drain().collect::<Vec<_>>() {
27222744
for msg in controls {
2723-
self.send_message(peer, RpcOut::Control(msg));
2745+
let _ = self.send_message(peer, RpcOut::Control(msg));
27242746
}
27252747
}
27262748

@@ -2730,19 +2752,24 @@ where
27302752

27312753
/// Send a [`RpcOut`] message to a peer. This will wrap the message in an arc if it
27322754
/// is not already an arc.
2733-
fn send_message(&mut self, peer_id: PeerId, rpc: RpcOut) {
2755+
fn send_message(&mut self, peer_id: PeerId, rpc: RpcOut) -> Result<(), ()> {
2756+
let sender = self
2757+
.handler_send_queues
2758+
.get_mut(&peer_id)
2759+
.expect("Peerid should exist");
2760+
2761+
if sender.try_send(rpc.clone()).is_err() {
2762+
tracing::debug!(peer=%peer_id, "Dropping message as peer is full");
2763+
return Err(());
2764+
}
2765+
27342766
if let Some(m) = self.metrics.as_mut() {
27352767
if let RpcOut::Publish(ref message) | RpcOut::Forward(ref message) = rpc {
27362768
// register bytes sent on the internal metrics.
27372769
m.msg_sent(&message.topic, message.raw_protobuf_len());
27382770
}
27392771
}
2740-
2741-
self.events.push_back(ToSwarm::NotifyHandler {
2742-
peer_id,
2743-
event: HandlerIn::Message(rpc),
2744-
handler: NotifyHandler::Any,
2745-
});
2772+
Ok(())
27462773
}
27472774

27482775
fn on_connection_established(
@@ -2811,7 +2838,8 @@ where
28112838
tracing::debug!(peer=%peer_id, "New peer connected");
28122839
// We need to send our subscriptions to the newly-connected node.
28132840
for topic_hash in self.mesh.clone().into_keys() {
2814-
self.send_message(peer_id, RpcOut::Subscribe(topic_hash));
2841+
self.send_message(peer_id, RpcOut::Subscribe(topic_hash))
2842+
.expect("Subscribe messages should be always sent");
28152843
}
28162844
}
28172845

@@ -2939,6 +2967,7 @@ where
29392967
}
29402968

29412969
self.connected_peers.remove(&peer_id);
2970+
self.handler_send_queues.remove(&peer_id);
29422971

29432972
if let Some((peer_score, ..)) = &mut self.peer_score {
29442973
peer_score.remove_peer(&peer_id);
@@ -2998,21 +3027,25 @@ where
29983027
fn handle_established_inbound_connection(
29993028
&mut self,
30003029
_: ConnectionId,
3001-
_: PeerId,
3030+
peer_id: PeerId,
30023031
_: &Multiaddr,
30033032
_: &Multiaddr,
30043033
) -> Result<THandler<Self>, ConnectionDenied> {
3005-
Ok(Handler::new(self.config.protocol_config()))
3034+
let (sender, receiver) = rpc_channel(self.config.connection_handler_queue_len());
3035+
self.handler_send_queues.insert(peer_id, sender);
3036+
Ok(Handler::new(self.config.protocol_config(), receiver))
30063037
}
30073038

30083039
fn handle_established_outbound_connection(
30093040
&mut self,
30103041
_: ConnectionId,
3011-
_: PeerId,
3042+
peer_id: PeerId,
30123043
_: &Multiaddr,
30133044
_: Endpoint,
30143045
) -> Result<THandler<Self>, ConnectionDenied> {
3015-
Ok(Handler::new(self.config.protocol_config()))
3046+
let (sender, receiver) = rpc_channel(self.config.connection_handler_queue_len());
3047+
self.handler_send_queues.insert(peer_id, sender);
3048+
Ok(Handler::new(self.config.protocol_config(), receiver))
30163049
}
30173050

30183051
fn on_connection_handler_event(

0 commit comments

Comments
 (0)