merge queue: embarking unstable (56a9bef) and #6526 together #6527

Closed
wants to merge 2 commits

19 changes: 9 additions & 10 deletions beacon_node/lighthouse_network/src/rpc/codec.rs
@@ -28,7 +28,7 @@ const CONTEXT_BYTES_LEN: usize = 4;

/* Inbound Codec */

pub struct SSZSnappyInboundCodec<E> {
pub struct SSZSnappyInboundCodec<E: EthSpec> {
protocol: ProtocolId,
inner: Uvi<usize>,
len: Option<usize>,
@@ -143,7 +143,7 @@ impl<E: EthSpec> Encoder<RpcResponse<E>> for SSZSnappyInboundCodec<E> {

// Decoder for inbound streams: Decodes RPC requests from peers
impl<E: EthSpec> Decoder for SSZSnappyInboundCodec<E> {
type Item = RequestType;
type Item = RequestType<E>;
type Error = RPCError;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
@@ -195,7 +195,7 @@ impl<E: EthSpec> Decoder for SSZSnappyInboundCodec<E> {
}

/* Outbound Codec: Codec for initiating RPC requests */
pub struct SSZSnappyOutboundCodec<E> {
pub struct SSZSnappyOutboundCodec<E: EthSpec> {
inner: Uvi<usize>,
len: Option<usize>,
protocol: ProtocolId,
@@ -322,10 +322,10 @@ impl<E: EthSpec> SSZSnappyOutboundCodec<E> {
}

// Encoder for outbound streams: Encodes RPC Requests to peers
impl<E: EthSpec> Encoder<RequestType> for SSZSnappyOutboundCodec<E> {
impl<E: EthSpec> Encoder<RequestType<E>> for SSZSnappyOutboundCodec<E> {
type Error = RPCError;

fn encode(&mut self, item: RequestType, dst: &mut BytesMut) -> Result<(), Self::Error> {
fn encode(&mut self, item: RequestType<E>, dst: &mut BytesMut) -> Result<(), Self::Error> {
let bytes = match item {
RequestType::Status(req) => req.as_ssz_bytes(),
RequestType::Goodbye(req) => req.as_ssz_bytes(),
@@ -549,11 +549,11 @@ fn handle_length(
/// Decodes an `InboundRequest` from the byte stream.
/// `decoded_buffer` should be an ssz-encoded bytestream with
// length = length-prefix received in the beginning of the stream.
fn handle_rpc_request(
fn handle_rpc_request<E: EthSpec>(
versioned_protocol: SupportedProtocol,
decoded_buffer: &[u8],
spec: &ChainSpec,
) -> Result<Option<RequestType>, RPCError> {
) -> Result<Option<RequestType<E>>, RPCError> {
match versioned_protocol {
SupportedProtocol::StatusV1 => Ok(Some(RequestType::Status(
StatusMessage::from_ssz_bytes(decoded_buffer)?,
@@ -1035,7 +1035,6 @@ mod tests {
BlobsByRangeRequest {
start_slot: 0,
count: 10,
max_blobs_per_block: Spec::max_blobs_per_block(),
}
}

@@ -1181,7 +1180,7 @@
}

/// Verifies that requests we send are encoded in a way that we would correctly decode too.
fn encode_then_decode_request(req: RequestType, fork_name: ForkName, spec: &ChainSpec) {
fn encode_then_decode_request(req: RequestType<Spec>, fork_name: ForkName, spec: &ChainSpec) {
let fork_context = Arc::new(fork_context(fork_name));
let max_packet_size = max_rpc_size(&fork_context, spec.max_chunk_size as usize);
let protocol = ProtocolId::new(req.versioned_protocol(), Encoding::SSZSnappy);
@@ -1778,7 +1777,7 @@ mod tests {
fn test_encode_then_decode_request() {
let chain_spec = Spec::default_spec();

let requests: &[RequestType] = &[
let requests: &[RequestType<Spec>] = &[
RequestType::Ping(ping_message()),
RequestType::Status(status_message()),
RequestType::Goodbye(GoodbyeReason::Fault),
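
Taken together, the codec.rs hunks bound the codec structs by EthSpec and make handle_rpc_request generic, so decoding yields a spec-aware RequestType<E> and test call sites name the spec explicitly. A minimal sketch of that shape, using simplified stand-in types rather than the real lighthouse_network API (the trait, variants, error type, and byte tags below are assumptions for illustration only):

use std::marker::PhantomData;

// Stand-in for the real EthSpec trait, which carries per-network constants.
trait EthSpec {}

enum RequestType<E: EthSpec> {
    Ping(u64),
    MetaData(PhantomData<E>),
}

#[derive(Debug)]
struct RpcError(&'static str);

// The spec parameter threads through decoding, so the returned request is
// already tied to a spec without the codec storing spec values at runtime.
fn handle_rpc_request<E: EthSpec>(
    decoded_buffer: &[u8],
) -> Result<Option<RequestType<E>>, RpcError> {
    match decoded_buffer.first() {
        Some(0) => Ok(Some(RequestType::Ping(decoded_buffer.len() as u64))),
        Some(1) => Ok(Some(RequestType::MetaData(PhantomData))),
        Some(_) => Err(RpcError("unsupported protocol")),
        None => Ok(None),
    }
}

struct MainnetEthSpec;
impl EthSpec for MainnetEthSpec {}

fn main() {
    // Call sites choose the spec with a turbofish, mirroring RequestType<Spec> in the tests.
    let req = handle_rpc_request::<MainnetEthSpec>(&[0, 1, 2]).unwrap();
    assert!(matches!(req, Some(RequestType::Ping(3))));
}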
14 changes: 6 additions & 8 deletions beacon_node/lighthouse_network/src/rpc/handler.rs
@@ -20,7 +20,6 @@ use slog::{crit, debug, trace};
use smallvec::SmallVec;
use std::{
collections::{hash_map::Entry, VecDeque},
marker::PhantomData,
pin::Pin,
sync::Arc,
task::{Context, Poll},
@@ -97,7 +96,7 @@
events_out: SmallVec<[HandlerEvent<Id, E>; 4]>,

/// Queue of outbound substreams to open.
dial_queue: SmallVec<[(Id, RequestType); 4]>,
dial_queue: SmallVec<[(Id, RequestType<E>); 4]>,

/// Current number of concurrent outbound substreams being opened.
dial_negotiated: u32,
@@ -207,7 +206,7 @@ pub enum OutboundSubstreamState<E: EthSpec> {
/// The framed negotiated substream.
substream: Box<OutboundFramed<Stream, E>>,
/// Keeps track of the actual request sent.
request: RequestType,
request: RequestType<E>,
},
/// Closing an outbound substream>
Closing(Box<OutboundFramed<Stream, E>>),
@@ -275,7 +274,7 @@
}

/// Opens an outbound substream with a request.
fn send_request(&mut self, id: Id, req: RequestType) {
fn send_request(&mut self, id: Id, req: RequestType<E>) {
match self.state {
HandlerState::Active => {
self.dial_queue.push((id, req));
@@ -331,7 +330,7 @@
type ToBehaviour = HandlerEvent<Id, E>;
type InboundProtocol = RPCProtocol<E>;
type OutboundProtocol = OutboundRequestContainer<E>;
type OutboundOpenInfo = (Id, RequestType); // Keep track of the id and the request
type OutboundOpenInfo = (Id, RequestType<E>); // Keep track of the id and the request
type InboundOpenInfo = ();

fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
@@ -789,7 +788,7 @@
req: req.clone(),
fork_context: self.fork_context.clone(),
max_rpc_size: self.listen_protocol().upgrade().max_rpc_size,
phantom: PhantomData,
},
(),
)
@@ -907,7 +905,7 @@
fn on_fully_negotiated_outbound(
&mut self,
substream: OutboundFramed<Stream, E>,
(id, request): (Id, RequestType),
(id, request): (Id, RequestType<E>),
) {
self.dial_negotiated -= 1;
// Reset any io-retries counter.
@@ -963,7 +961,7 @@
}
fn on_dial_upgrade_error(
&mut self,
request_info: (Id, RequestType),
request_info: (Id, RequestType<E>),
error: StreamUpgradeError<RPCError>,
) {
let (id, req) = request_info;
28 changes: 17 additions & 11 deletions beacon_node/lighthouse_network/src/rpc/methods.rs
@@ -8,6 +8,7 @@ use ssz_derive::{Decode, Encode};
use ssz_types::{typenum::U256, VariableList};
use std::collections::BTreeMap;
use std::fmt::Display;
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::Arc;
use strum::IntoStaticStr;
@@ -93,19 +94,27 @@ pub struct Ping {
variant_attributes(derive(Clone, Debug, PartialEq, Serialize),)
)]
#[derive(Clone, Debug, PartialEq)]
pub struct MetadataRequest;
pub struct MetadataRequest<E: EthSpec> {
_phantom_data: PhantomData<E>,
}

impl MetadataRequest {
impl<E: EthSpec> MetadataRequest<E> {
pub fn new_v1() -> Self {
Self::V1(MetadataRequestV1 {})
Self::V1(MetadataRequestV1 {
_phantom_data: PhantomData,
})
}

pub fn new_v2() -> Self {
Self::V2(MetadataRequestV2 {})
Self::V2(MetadataRequestV2 {
_phantom_data: PhantomData,
})
}

pub fn new_v3() -> Self {
Self::V3(MetadataRequestV3 {})
Self::V3(MetadataRequestV3 {
_phantom_data: PhantomData,
})
}
}

@@ -315,14 +324,11 @@ pub struct BlobsByRangeRequest {

/// The number of slots from the start slot.
pub count: u64,

/// maximum number of blobs in a single block.
pub max_blobs_per_block: usize,
}

impl BlobsByRangeRequest {
pub fn max_blobs_requested(&self) -> u64 {
self.count.saturating_mul(self.max_blobs_per_block as u64)
pub fn max_blobs_requested<E: EthSpec>(&self) -> u64 {
self.count.saturating_mul(E::max_blobs_per_block() as u64)
}
}

@@ -338,7 +344,7 @@ pub struct DataColumnsByRangeRequest {
}

impl DataColumnsByRangeRequest {
pub fn max_requested(&self) -> u64 {
pub fn max_requested<E: EthSpec>(&self) -> u64 {
self.count.saturating_mul(self.columns.len() as u64)
}

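
The methods.rs hunks show the two halves of the change: requests that used to carry spec-dependent values (max_blobs_per_block) now read them from the EthSpec type parameter at the call site, and field-less requests such as the metadata ones gain a PhantomData<E> so they can name a spec at all. A self-contained sketch of both patterns, with a simplified stand-in trait and an illustrative blob limit (the real values come from the consensus spec, not from this sketch):

use std::marker::PhantomData;

// Simplified stand-in for the real EthSpec trait.
trait EthSpec {
    fn max_blobs_per_block() -> usize;
}

struct MainnetEthSpec;
impl EthSpec for MainnetEthSpec {
    // Illustrative value only.
    fn max_blobs_per_block() -> usize {
        6
    }
}

// A request with no fields can still be tied to a spec via PhantomData.
struct MetadataRequest<E: EthSpec> {
    _phantom_data: PhantomData<E>,
}

struct BlobsByRangeRequest {
    start_slot: u64,
    count: u64,
}

impl BlobsByRangeRequest {
    // The blob limit now comes from the type parameter instead of being
    // carried as a field in every request.
    fn max_blobs_requested<E: EthSpec>(&self) -> u64 {
        self.count.saturating_mul(E::max_blobs_per_block() as u64)
    }
}

fn main() {
    let req = BlobsByRangeRequest { start_slot: 0, count: 10 };
    assert_eq!(req.max_blobs_requested::<MainnetEthSpec>(), 60);
    let _md = MetadataRequest::<MainnetEthSpec> { _phantom_data: PhantomData };
}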
10 changes: 5 additions & 5 deletions beacon_node/lighthouse_network/src/rpc/mod.rs
@@ -61,7 +61,7 @@ pub enum RPCSend<Id, E: EthSpec> {
///
/// The `Id` is given by the application making the request. These
/// go over *outbound* connections.
Request(Id, RequestType),
Request(Id, RequestType<E>),
/// A response sent from Lighthouse.
///
/// The `SubstreamId` must correspond to the RPC-given ID of the original request received from the
@@ -79,7 +79,7 @@ pub enum RPCReceived<Id, E: EthSpec> {
///
/// The `SubstreamId` is given by the `RPCHandler` as it identifies this request with the
/// *inbound* substream over which it is managed.
Request(Request),
Request(Request<E>),
/// A response received from the outside.
///
/// The `Id` corresponds to the application given ID of the original request sent to the
@@ -113,10 +113,10 @@ impl RequestId {

/// An Rpc Request.
#[derive(Debug, Clone)]
pub struct Request {
pub struct Request<E: EthSpec> {
pub id: RequestId,
pub substream_id: SubstreamId,
pub r#type: RequestType,
pub r#type: RequestType<E>,
}

impl<E: EthSpec, Id: std::fmt::Debug> std::fmt::Display for RPCSend<Id, E> {
@@ -221,7 +221,7 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
/// Submits an RPC request.
///
/// The peer must be connected for this to succeed.
pub fn send_request(&mut self, peer_id: PeerId, request_id: Id, req: RequestType) {
pub fn send_request(&mut self, peer_id: PeerId, request_id: Id, req: RequestType<E>) {
let event = if let Some(self_limiter) = self.self_limiter.as_mut() {
match self_limiter.allows(peer_id, request_id, req) {
Ok(event) => event,
8 changes: 3 additions & 5 deletions beacon_node/lighthouse_network/src/rpc/outbound.rs
@@ -7,7 +7,6 @@ use futures::future::BoxFuture;
use futures::prelude::{AsyncRead, AsyncWrite};
use futures::{FutureExt, SinkExt};
use libp2p::core::{OutboundUpgrade, UpgradeInfo};
use std::marker::PhantomData;
use std::sync::Arc;
use tokio_util::{
codec::Framed,
@@ -20,14 +19,13 @@ use types::{EthSpec, ForkContext};
// `OutboundUpgrade`

#[derive(Debug, Clone)]
pub struct OutboundRequestContainer<E> {
pub req: RequestType,
pub struct OutboundRequestContainer<E: EthSpec> {
pub req: RequestType<E>,
pub fork_context: Arc<ForkContext>,
pub max_rpc_size: usize,
pub phantom: PhantomData<E>,
}

impl<E> UpgradeInfo for OutboundRequestContainer<E> {
impl<E: EthSpec> UpgradeInfo for OutboundRequestContainer<E> {
type Info = ProtocolId;
type InfoIter = Vec<Self::Info>;

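
In outbound.rs the phantom: PhantomData<E> field and its import go away because req: RequestType<E> now mentions E directly, which already satisfies Rust's rule that every declared type parameter must appear in the type's body. A rough before/after sketch under simplified assumed types (the field names and request variants here are illustrative, not the real lighthouse_network definitions):

use std::marker::PhantomData;

trait EthSpec {}

enum RequestType<E: EthSpec> {
    Ping(u64),
    MetaData(PhantomData<E>),
}

// Before: no field referred to E, so a marker field was required to keep
// the compiler happy about the otherwise-unused type parameter.
struct OutboundRequestContainerOld<E: EthSpec> {
    req_kind: u8,
    max_rpc_size: usize,
    phantom: PhantomData<E>,
}

// After: `req` already uses E, so the marker field (and its import) can be dropped.
struct OutboundRequestContainer<E: EthSpec> {
    req: RequestType<E>,
    max_rpc_size: usize,
}

struct MainnetEthSpec;
impl EthSpec for MainnetEthSpec {}

fn main() {
    let _container = OutboundRequestContainer::<MainnetEthSpec> {
        req: RequestType::Ping(7),
        max_rpc_size: 1 << 20,
    };
}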
14 changes: 7 additions & 7 deletions beacon_node/lighthouse_network/src/rpc/protocol.rs
@@ -686,7 +686,7 @@ pub fn rpc_data_column_limits<E: EthSpec>() -> RpcLimits {
// The inbound protocol reads the request, decodes it and returns the stream to the protocol
// handler to respond to once ready.

pub type InboundOutput<TSocket, E> = (RequestType, InboundFramed<TSocket, E>);
pub type InboundOutput<TSocket, E> = (RequestType<E>, InboundFramed<TSocket, E>);
pub type InboundFramed<TSocket, E> =
Framed<std::pin::Pin<Box<TimeoutStream<Compat<TSocket>>>>, SSZSnappyInboundCodec<E>>;

@@ -754,7 +754,7 @@
}

#[derive(Debug, Clone, PartialEq)]
pub enum RequestType {
pub enum RequestType<E: EthSpec> {
Status(StatusMessage),
Goodbye(GoodbyeReason),
BlocksByRange(OldBlocksByRangeRequest),
@@ -768,11 +768,11 @@ pub enum RequestType {
LightClientFinalityUpdate,
LightClientUpdatesByRange(LightClientUpdatesByRangeRequest),
Ping(Ping),
MetaData(MetadataRequest),
MetaData(MetadataRequest<E>),
}

/// Implements the encoding per supported protocol for `RPCRequest`.
impl RequestType {
impl<E: EthSpec> RequestType<E> {
/* These functions are used in the handler for stream management */

/// Maximum number of responses expected for this request.
@@ -782,10 +782,10 @@ impl RequestType {
RequestType::Goodbye(_) => 0,
RequestType::BlocksByRange(req) => *req.count(),
RequestType::BlocksByRoot(req) => req.block_roots().len() as u64,
RequestType::BlobsByRange(req) => req.max_blobs_requested(),
RequestType::BlobsByRange(req) => req.max_blobs_requested::<E>(),
RequestType::BlobsByRoot(req) => req.blob_ids.len() as u64,
RequestType::DataColumnsByRoot(req) => req.data_column_ids.len() as u64,
RequestType::DataColumnsByRange(req) => req.max_requested(),
RequestType::DataColumnsByRange(req) => req.max_requested::<E>(),
RequestType::Ping(_) => 1,
RequestType::MetaData(_) => 1,
RequestType::LightClientBootstrap(_) => 1,
@@ -1027,7 +1027,7 @@ impl std::error::Error for RPCError {
}
}

impl std::fmt::Display for RequestType {
impl<E: EthSpec> std::fmt::Display for RequestType<E> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
RequestType::Status(status) => write!(f, "Status Message: {}", status),
3 changes: 2 additions & 1 deletion beacon_node/lighthouse_network/src/rpc/rate_limiter.rs
@@ -9,6 +9,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use tokio::time::Interval;
use types::EthSpec;

/// Nanoseconds since a given time.
// Maintained as u64 to reduce footprint
@@ -261,7 +262,7 @@ pub trait RateLimiterItem {
fn max_responses(&self) -> u64;
}

impl RateLimiterItem for super::RequestType {
impl<E: EthSpec> RateLimiterItem for super::RequestType<E> {
fn protocol(&self) -> Protocol {
self.versioned_protocol().protocol()
}
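
With RequestType now generic, the RateLimiterItem impl in rate_limiter.rs becomes a generic impl block that covers every spec at once, and response counts for blob requests can be derived from the spec type. A small sketch of that shape with stand-in types (the Protocol variants, trait methods, and blob limit below are simplified assumptions, not the real definitions):

use std::marker::PhantomData;

trait EthSpec {
    fn max_blobs_per_block() -> usize;
}

#[derive(Debug, PartialEq)]
enum Protocol {
    Ping,
    BlobsByRange,
}

enum RequestType<E: EthSpec> {
    Ping,
    BlobsByRange { count: u64, _spec: PhantomData<E> },
}

trait RateLimiterItem {
    fn protocol(&self) -> Protocol;
    fn max_responses(&self) -> u64;
}

// One generic impl covers RequestType<E> for every spec E.
impl<E: EthSpec> RateLimiterItem for RequestType<E> {
    fn protocol(&self) -> Protocol {
        match self {
            RequestType::Ping => Protocol::Ping,
            RequestType::BlobsByRange { .. } => Protocol::BlobsByRange,
        }
    }

    fn max_responses(&self) -> u64 {
        match self {
            RequestType::Ping => 1,
            RequestType::BlobsByRange { count, .. } => {
                count.saturating_mul(E::max_blobs_per_block() as u64)
            }
        }
    }
}

struct MainnetEthSpec;
impl EthSpec for MainnetEthSpec {
    fn max_blobs_per_block() -> usize {
        6
    }
}

fn main() {
    let req: RequestType<MainnetEthSpec> = RequestType::BlobsByRange {
        count: 4,
        _spec: PhantomData,
    };
    assert_eq!(req.protocol(), Protocol::BlobsByRange);
    assert_eq!(req.max_responses(), 24);
}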
12 changes: 6 additions & 6 deletions beacon_node/lighthouse_network/src/rpc/self_limiter.rs
@@ -19,16 +19,16 @@ use super::{

/// A request that was rate limited or waiting on rate limited requests for the same peer and
/// protocol.
struct QueuedRequest<Id: ReqId> {
req: RequestType,
struct QueuedRequest<Id: ReqId, E: EthSpec> {
req: RequestType<E>,
request_id: Id,
}

pub(crate) struct SelfRateLimiter<Id: ReqId, E: EthSpec> {
/// Requests queued for sending per peer. This requests are stored when the self rate
/// limiter rejects them. Rate limiting is based on a Peer and Protocol basis, therefore
/// are stored in the same way.
delayed_requests: HashMap<(PeerId, Protocol), VecDeque<QueuedRequest<Id>>>,
delayed_requests: HashMap<(PeerId, Protocol), VecDeque<QueuedRequest<Id, E>>>,
/// The delay required to allow a peer's outbound request per protocol.
next_peer_request: DelayQueue<(PeerId, Protocol)>,
/// Rate limiter for our own requests.
@@ -70,7 +70,7 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
&mut self,
peer_id: PeerId,
request_id: Id,
req: RequestType,
req: RequestType<E>,
) -> Result<BehaviourAction<Id, E>, Error> {
let protocol = req.versioned_protocol().protocol();
// First check that there are not already other requests waiting to be sent.
@@ -101,9 +101,9 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
limiter: &mut RateLimiter,
peer_id: PeerId,
request_id: Id,
req: RequestType,
req: RequestType<E>,
log: &Logger,
) -> Result<BehaviourAction<Id, E>, (QueuedRequest<Id>, Duration)> {
) -> Result<BehaviourAction<Id, E>, (QueuedRequest<Id, E>, Duration)> {
match limiter.allows(&peer_id, &req) {
Ok(()) => Ok(BehaviourAction::NotifyHandler {
peer_id,