Skip to content

Commit 9aefb55

Browse files
Fix BlobsByRange by reverting PR6462 (#6526)
* Revert "Remove generic E from RequestId (#6462)" This reverts commit 772929f.
1 parent 56a9bef commit 9aefb55

File tree

15 files changed

+68
-69
lines changed

15 files changed

+68
-69
lines changed

beacon_node/lighthouse_network/src/rpc/codec.rs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ const CONTEXT_BYTES_LEN: usize = 4;
2828

2929
/* Inbound Codec */
3030

31-
pub struct SSZSnappyInboundCodec<E> {
31+
pub struct SSZSnappyInboundCodec<E: EthSpec> {
3232
protocol: ProtocolId,
3333
inner: Uvi<usize>,
3434
len: Option<usize>,
@@ -143,7 +143,7 @@ impl<E: EthSpec> Encoder<RpcResponse<E>> for SSZSnappyInboundCodec<E> {
143143

144144
// Decoder for inbound streams: Decodes RPC requests from peers
145145
impl<E: EthSpec> Decoder for SSZSnappyInboundCodec<E> {
146-
type Item = RequestType;
146+
type Item = RequestType<E>;
147147
type Error = RPCError;
148148

149149
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
@@ -195,7 +195,7 @@ impl<E: EthSpec> Decoder for SSZSnappyInboundCodec<E> {
195195
}
196196

197197
/* Outbound Codec: Codec for initiating RPC requests */
198-
pub struct SSZSnappyOutboundCodec<E> {
198+
pub struct SSZSnappyOutboundCodec<E: EthSpec> {
199199
inner: Uvi<usize>,
200200
len: Option<usize>,
201201
protocol: ProtocolId,
@@ -322,10 +322,10 @@ impl<E: EthSpec> SSZSnappyOutboundCodec<E> {
322322
}
323323

324324
// Encoder for outbound streams: Encodes RPC Requests to peers
325-
impl<E: EthSpec> Encoder<RequestType> for SSZSnappyOutboundCodec<E> {
325+
impl<E: EthSpec> Encoder<RequestType<E>> for SSZSnappyOutboundCodec<E> {
326326
type Error = RPCError;
327327

328-
fn encode(&mut self, item: RequestType, dst: &mut BytesMut) -> Result<(), Self::Error> {
328+
fn encode(&mut self, item: RequestType<E>, dst: &mut BytesMut) -> Result<(), Self::Error> {
329329
let bytes = match item {
330330
RequestType::Status(req) => req.as_ssz_bytes(),
331331
RequestType::Goodbye(req) => req.as_ssz_bytes(),
@@ -549,11 +549,11 @@ fn handle_length(
549549
/// Decodes an `InboundRequest` from the byte stream.
550550
/// `decoded_buffer` should be an ssz-encoded bytestream with
551551
// length = length-prefix received in the beginning of the stream.
552-
fn handle_rpc_request(
552+
fn handle_rpc_request<E: EthSpec>(
553553
versioned_protocol: SupportedProtocol,
554554
decoded_buffer: &[u8],
555555
spec: &ChainSpec,
556-
) -> Result<Option<RequestType>, RPCError> {
556+
) -> Result<Option<RequestType<E>>, RPCError> {
557557
match versioned_protocol {
558558
SupportedProtocol::StatusV1 => Ok(Some(RequestType::Status(
559559
StatusMessage::from_ssz_bytes(decoded_buffer)?,
@@ -1035,7 +1035,6 @@ mod tests {
10351035
BlobsByRangeRequest {
10361036
start_slot: 0,
10371037
count: 10,
1038-
max_blobs_per_block: Spec::max_blobs_per_block(),
10391038
}
10401039
}
10411040

@@ -1181,7 +1180,7 @@ mod tests {
11811180
}
11821181

11831182
/// Verifies that requests we send are encoded in a way that we would correctly decode too.
1184-
fn encode_then_decode_request(req: RequestType, fork_name: ForkName, spec: &ChainSpec) {
1183+
fn encode_then_decode_request(req: RequestType<Spec>, fork_name: ForkName, spec: &ChainSpec) {
11851184
let fork_context = Arc::new(fork_context(fork_name));
11861185
let max_packet_size = max_rpc_size(&fork_context, spec.max_chunk_size as usize);
11871186
let protocol = ProtocolId::new(req.versioned_protocol(), Encoding::SSZSnappy);
@@ -1778,7 +1777,7 @@ mod tests {
17781777
fn test_encode_then_decode_request() {
17791778
let chain_spec = Spec::default_spec();
17801779

1781-
let requests: &[RequestType] = &[
1780+
let requests: &[RequestType<Spec>] = &[
17821781
RequestType::Ping(ping_message()),
17831782
RequestType::Status(status_message()),
17841783
RequestType::Goodbye(GoodbyeReason::Fault),

beacon_node/lighthouse_network/src/rpc/handler.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ use slog::{crit, debug, trace};
2020
use smallvec::SmallVec;
2121
use std::{
2222
collections::{hash_map::Entry, VecDeque},
23-
marker::PhantomData,
2423
pin::Pin,
2524
sync::Arc,
2625
task::{Context, Poll},
@@ -97,7 +96,7 @@ where
9796
events_out: SmallVec<[HandlerEvent<Id, E>; 4]>,
9897

9998
/// Queue of outbound substreams to open.
100-
dial_queue: SmallVec<[(Id, RequestType); 4]>,
99+
dial_queue: SmallVec<[(Id, RequestType<E>); 4]>,
101100

102101
/// Current number of concurrent outbound substreams being opened.
103102
dial_negotiated: u32,
@@ -207,7 +206,7 @@ pub enum OutboundSubstreamState<E: EthSpec> {
207206
/// The framed negotiated substream.
208207
substream: Box<OutboundFramed<Stream, E>>,
209208
/// Keeps track of the actual request sent.
210-
request: RequestType,
209+
request: RequestType<E>,
211210
},
212211
/// Closing an outbound substream>
213212
Closing(Box<OutboundFramed<Stream, E>>),
@@ -275,7 +274,7 @@ where
275274
}
276275

277276
/// Opens an outbound substream with a request.
278-
fn send_request(&mut self, id: Id, req: RequestType) {
277+
fn send_request(&mut self, id: Id, req: RequestType<E>) {
279278
match self.state {
280279
HandlerState::Active => {
281280
self.dial_queue.push((id, req));
@@ -331,7 +330,7 @@ where
331330
type ToBehaviour = HandlerEvent<Id, E>;
332331
type InboundProtocol = RPCProtocol<E>;
333332
type OutboundProtocol = OutboundRequestContainer<E>;
334-
type OutboundOpenInfo = (Id, RequestType); // Keep track of the id and the request
333+
type OutboundOpenInfo = (Id, RequestType<E>); // Keep track of the id and the request
335334
type InboundOpenInfo = ();
336335

337336
fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
@@ -789,7 +788,6 @@ where
789788
req: req.clone(),
790789
fork_context: self.fork_context.clone(),
791790
max_rpc_size: self.listen_protocol().upgrade().max_rpc_size,
792-
phantom: PhantomData,
793791
},
794792
(),
795793
)
@@ -907,7 +905,7 @@ where
907905
fn on_fully_negotiated_outbound(
908906
&mut self,
909907
substream: OutboundFramed<Stream, E>,
910-
(id, request): (Id, RequestType),
908+
(id, request): (Id, RequestType<E>),
911909
) {
912910
self.dial_negotiated -= 1;
913911
// Reset any io-retries counter.
@@ -963,7 +961,7 @@ where
963961
}
964962
fn on_dial_upgrade_error(
965963
&mut self,
966-
request_info: (Id, RequestType),
964+
request_info: (Id, RequestType<E>),
967965
error: StreamUpgradeError<RPCError>,
968966
) {
969967
let (id, req) = request_info;

beacon_node/lighthouse_network/src/rpc/methods.rs

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use ssz_derive::{Decode, Encode};
88
use ssz_types::{typenum::U256, VariableList};
99
use std::collections::BTreeMap;
1010
use std::fmt::Display;
11+
use std::marker::PhantomData;
1112
use std::ops::Deref;
1213
use std::sync::Arc;
1314
use strum::IntoStaticStr;
@@ -93,19 +94,27 @@ pub struct Ping {
9394
variant_attributes(derive(Clone, Debug, PartialEq, Serialize),)
9495
)]
9596
#[derive(Clone, Debug, PartialEq)]
96-
pub struct MetadataRequest;
97+
pub struct MetadataRequest<E: EthSpec> {
98+
_phantom_data: PhantomData<E>,
99+
}
97100

98-
impl MetadataRequest {
101+
impl<E: EthSpec> MetadataRequest<E> {
99102
pub fn new_v1() -> Self {
100-
Self::V1(MetadataRequestV1 {})
103+
Self::V1(MetadataRequestV1 {
104+
_phantom_data: PhantomData,
105+
})
101106
}
102107

103108
pub fn new_v2() -> Self {
104-
Self::V2(MetadataRequestV2 {})
109+
Self::V2(MetadataRequestV2 {
110+
_phantom_data: PhantomData,
111+
})
105112
}
106113

107114
pub fn new_v3() -> Self {
108-
Self::V3(MetadataRequestV3 {})
115+
Self::V3(MetadataRequestV3 {
116+
_phantom_data: PhantomData,
117+
})
109118
}
110119
}
111120

@@ -315,14 +324,11 @@ pub struct BlobsByRangeRequest {
315324

316325
/// The number of slots from the start slot.
317326
pub count: u64,
318-
319-
/// maximum number of blobs in a single block.
320-
pub max_blobs_per_block: usize,
321327
}
322328

323329
impl BlobsByRangeRequest {
324-
pub fn max_blobs_requested(&self) -> u64 {
325-
self.count.saturating_mul(self.max_blobs_per_block as u64)
330+
pub fn max_blobs_requested<E: EthSpec>(&self) -> u64 {
331+
self.count.saturating_mul(E::max_blobs_per_block() as u64)
326332
}
327333
}
328334

@@ -338,7 +344,7 @@ pub struct DataColumnsByRangeRequest {
338344
}
339345

340346
impl DataColumnsByRangeRequest {
341-
pub fn max_requested(&self) -> u64 {
347+
pub fn max_requested<E: EthSpec>(&self) -> u64 {
342348
self.count.saturating_mul(self.columns.len() as u64)
343349
}
344350

beacon_node/lighthouse_network/src/rpc/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ pub enum RPCSend<Id, E: EthSpec> {
6161
///
6262
/// The `Id` is given by the application making the request. These
6363
/// go over *outbound* connections.
64-
Request(Id, RequestType),
64+
Request(Id, RequestType<E>),
6565
/// A response sent from Lighthouse.
6666
///
6767
/// The `SubstreamId` must correspond to the RPC-given ID of the original request received from the
@@ -79,7 +79,7 @@ pub enum RPCReceived<Id, E: EthSpec> {
7979
///
8080
/// The `SubstreamId` is given by the `RPCHandler` as it identifies this request with the
8181
/// *inbound* substream over which it is managed.
82-
Request(Request),
82+
Request(Request<E>),
8383
/// A response received from the outside.
8484
///
8585
/// The `Id` corresponds to the application given ID of the original request sent to the
@@ -113,10 +113,10 @@ impl RequestId {
113113

114114
/// An Rpc Request.
115115
#[derive(Debug, Clone)]
116-
pub struct Request {
116+
pub struct Request<E: EthSpec> {
117117
pub id: RequestId,
118118
pub substream_id: SubstreamId,
119-
pub r#type: RequestType,
119+
pub r#type: RequestType<E>,
120120
}
121121

122122
impl<E: EthSpec, Id: std::fmt::Debug> std::fmt::Display for RPCSend<Id, E> {
@@ -221,7 +221,7 @@ impl<Id: ReqId, E: EthSpec> RPC<Id, E> {
221221
/// Submits an RPC request.
222222
///
223223
/// The peer must be connected for this to succeed.
224-
pub fn send_request(&mut self, peer_id: PeerId, request_id: Id, req: RequestType) {
224+
pub fn send_request(&mut self, peer_id: PeerId, request_id: Id, req: RequestType<E>) {
225225
let event = if let Some(self_limiter) = self.self_limiter.as_mut() {
226226
match self_limiter.allows(peer_id, request_id, req) {
227227
Ok(event) => event,

beacon_node/lighthouse_network/src/rpc/outbound.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ use futures::future::BoxFuture;
77
use futures::prelude::{AsyncRead, AsyncWrite};
88
use futures::{FutureExt, SinkExt};
99
use libp2p::core::{OutboundUpgrade, UpgradeInfo};
10-
use std::marker::PhantomData;
1110
use std::sync::Arc;
1211
use tokio_util::{
1312
codec::Framed,
@@ -20,14 +19,13 @@ use types::{EthSpec, ForkContext};
2019
// `OutboundUpgrade`
2120

2221
#[derive(Debug, Clone)]
23-
pub struct OutboundRequestContainer<E> {
24-
pub req: RequestType,
22+
pub struct OutboundRequestContainer<E: EthSpec> {
23+
pub req: RequestType<E>,
2524
pub fork_context: Arc<ForkContext>,
2625
pub max_rpc_size: usize,
27-
pub phantom: PhantomData<E>,
2826
}
2927

30-
impl<E> UpgradeInfo for OutboundRequestContainer<E> {
28+
impl<E: EthSpec> UpgradeInfo for OutboundRequestContainer<E> {
3129
type Info = ProtocolId;
3230
type InfoIter = Vec<Self::Info>;
3331

beacon_node/lighthouse_network/src/rpc/protocol.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -686,7 +686,7 @@ pub fn rpc_data_column_limits<E: EthSpec>() -> RpcLimits {
686686
// The inbound protocol reads the request, decodes it and returns the stream to the protocol
687687
// handler to respond to once ready.
688688

689-
pub type InboundOutput<TSocket, E> = (RequestType, InboundFramed<TSocket, E>);
689+
pub type InboundOutput<TSocket, E> = (RequestType<E>, InboundFramed<TSocket, E>);
690690
pub type InboundFramed<TSocket, E> =
691691
Framed<std::pin::Pin<Box<TimeoutStream<Compat<TSocket>>>>, SSZSnappyInboundCodec<E>>;
692692

@@ -754,7 +754,7 @@ where
754754
}
755755

756756
#[derive(Debug, Clone, PartialEq)]
757-
pub enum RequestType {
757+
pub enum RequestType<E: EthSpec> {
758758
Status(StatusMessage),
759759
Goodbye(GoodbyeReason),
760760
BlocksByRange(OldBlocksByRangeRequest),
@@ -768,11 +768,11 @@ pub enum RequestType {
768768
LightClientFinalityUpdate,
769769
LightClientUpdatesByRange(LightClientUpdatesByRangeRequest),
770770
Ping(Ping),
771-
MetaData(MetadataRequest),
771+
MetaData(MetadataRequest<E>),
772772
}
773773

774774
/// Implements the encoding per supported protocol for `RPCRequest`.
775-
impl RequestType {
775+
impl<E: EthSpec> RequestType<E> {
776776
/* These functions are used in the handler for stream management */
777777

778778
/// Maximum number of responses expected for this request.
@@ -782,10 +782,10 @@ impl RequestType {
782782
RequestType::Goodbye(_) => 0,
783783
RequestType::BlocksByRange(req) => *req.count(),
784784
RequestType::BlocksByRoot(req) => req.block_roots().len() as u64,
785-
RequestType::BlobsByRange(req) => req.max_blobs_requested(),
785+
RequestType::BlobsByRange(req) => req.max_blobs_requested::<E>(),
786786
RequestType::BlobsByRoot(req) => req.blob_ids.len() as u64,
787787
RequestType::DataColumnsByRoot(req) => req.data_column_ids.len() as u64,
788-
RequestType::DataColumnsByRange(req) => req.max_requested(),
788+
RequestType::DataColumnsByRange(req) => req.max_requested::<E>(),
789789
RequestType::Ping(_) => 1,
790790
RequestType::MetaData(_) => 1,
791791
RequestType::LightClientBootstrap(_) => 1,
@@ -1027,7 +1027,7 @@ impl std::error::Error for RPCError {
10271027
}
10281028
}
10291029

1030-
impl std::fmt::Display for RequestType {
1030+
impl<E: EthSpec> std::fmt::Display for RequestType<E> {
10311031
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
10321032
match self {
10331033
RequestType::Status(status) => write!(f, "Status Message: {}", status),

beacon_node/lighthouse_network/src/rpc/rate_limiter.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use std::pin::Pin;
99
use std::task::{Context, Poll};
1010
use std::time::{Duration, Instant};
1111
use tokio::time::Interval;
12+
use types::EthSpec;
1213

1314
/// Nanoseconds since a given time.
1415
// Maintained as u64 to reduce footprint
@@ -261,7 +262,7 @@ pub trait RateLimiterItem {
261262
fn max_responses(&self) -> u64;
262263
}
263264

264-
impl RateLimiterItem for super::RequestType {
265+
impl<E: EthSpec> RateLimiterItem for super::RequestType<E> {
265266
fn protocol(&self) -> Protocol {
266267
self.versioned_protocol().protocol()
267268
}

beacon_node/lighthouse_network/src/rpc/self_limiter.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,16 @@ use super::{
1919

2020
/// A request that was rate limited or waiting on rate limited requests for the same peer and
2121
/// protocol.
22-
struct QueuedRequest<Id: ReqId> {
23-
req: RequestType,
22+
struct QueuedRequest<Id: ReqId, E: EthSpec> {
23+
req: RequestType<E>,
2424
request_id: Id,
2525
}
2626

2727
pub(crate) struct SelfRateLimiter<Id: ReqId, E: EthSpec> {
2828
/// Requests queued for sending per peer. This requests are stored when the self rate
2929
/// limiter rejects them. Rate limiting is based on a Peer and Protocol basis, therefore
3030
/// are stored in the same way.
31-
delayed_requests: HashMap<(PeerId, Protocol), VecDeque<QueuedRequest<Id>>>,
31+
delayed_requests: HashMap<(PeerId, Protocol), VecDeque<QueuedRequest<Id, E>>>,
3232
/// The delay required to allow a peer's outbound request per protocol.
3333
next_peer_request: DelayQueue<(PeerId, Protocol)>,
3434
/// Rate limiter for our own requests.
@@ -70,7 +70,7 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
7070
&mut self,
7171
peer_id: PeerId,
7272
request_id: Id,
73-
req: RequestType,
73+
req: RequestType<E>,
7474
) -> Result<BehaviourAction<Id, E>, Error> {
7575
let protocol = req.versioned_protocol().protocol();
7676
// First check that there are not already other requests waiting to be sent.
@@ -101,9 +101,9 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
101101
limiter: &mut RateLimiter,
102102
peer_id: PeerId,
103103
request_id: Id,
104-
req: RequestType,
104+
req: RequestType<E>,
105105
log: &Logger,
106-
) -> Result<BehaviourAction<Id, E>, (QueuedRequest<Id>, Duration)> {
106+
) -> Result<BehaviourAction<Id, E>, (QueuedRequest<Id, E>, Duration)> {
107107
match limiter.allows(&peer_id, &req) {
108108
Ok(()) => Ok(BehaviourAction::NotifyHandler {
109109
peer_id,

0 commit comments

Comments
 (0)