Skip to content

Commit dbbfd59

Browse files
committed
immediately acknowledge GRPC subscriptions
1 parent 21a0637 commit dbbfd59

File tree

4 files changed

+53
-14
lines changed

4 files changed

+53
-14
lines changed

linera-core/src/notifier.rs

+18-4
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,27 @@ impl<N> Default for ChannelNotifier<N> {
2828
}
2929

3030
impl<N> ChannelNotifier<N> {
31-
/// Creates a subscription given a collection of ChainIds and a sender to the client.
32-
pub fn subscribe(&self, chain_ids: Vec<ChainId>) -> UnboundedReceiver<N> {
33-
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
31+
fn add_sender(&self, chain_ids: Vec<ChainId>, sender: &UnboundedSender<N>) {
3432
for id in chain_ids {
3533
let mut senders = self.inner.entry(id).or_default();
36-
senders.push(tx.clone());
34+
senders.push(sender.clone());
3735
}
36+
}
37+
38+
/// Creates a subscription given a collection of ChainIds and a sender to the client.
39+
pub fn subscribe(&self, chain_ids: Vec<ChainId>) -> UnboundedReceiver<N> {
40+
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
41+
self.add_sender(chain_ids, &tx);
42+
rx
43+
}
44+
45+
/// Creates a subscription given a collection of ChainIds and a sender to the client.
46+
/// Immediately posts a first notification as a ACK.
47+
pub fn subscribe_with_ack(&self, chain_ids: Vec<ChainId>, ack: N) -> UnboundedReceiver<N> {
48+
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
49+
self.add_sender(chain_ids, &tx);
50+
tx.send(ack)
51+
.expect("pushing to a new channel should succeed");
3852
rx
3953
}
4054
}

linera-rpc/src/grpc/client.rs

+12-3
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use linera_core::{
1717
};
1818
use linera_version::VersionInfo;
1919
use tonic::{Code, IntoRequest, Request, Status};
20-
use tracing::{debug, error, info, instrument};
20+
use tracing::{debug, error, info, instrument, warn};
2121
#[cfg(not(web))]
2222
use {
2323
super::GrpcProtoConversionError,
@@ -259,7 +259,7 @@ impl ValidatorNode for GrpcClient {
259259
// terminates after unexpected or fatal errors.
260260
let notification_stream = endlessly_retrying_notification_stream
261261
.map(|result| {
262-
Notification::try_from(result?).map_err(|err| {
262+
Option::<Notification>::try_from(result?).map_err(|err| {
263263
let message = format!("Could not deserialize notification: {}", err);
264264
tonic::Status::new(Code::Internal, message)
265265
})
@@ -279,7 +279,16 @@ impl ValidatorNode for GrpcClient {
279279
true
280280
})
281281
})
282-
.filter_map(|result| future::ready(result.ok()));
282+
.filter_map(|result| {
283+
future::ready(match result {
284+
Ok(notification @ Some(_)) => notification,
285+
Ok(None) => None,
286+
Err(err) => {
287+
warn!("{}", err);
288+
None
289+
}
290+
})
291+
});
283292

284293
Ok(Box::pin(notification_stream))
285294
}

linera-rpc/src/grpc/conversions.rs

+17-6
Original file line numberDiff line numberDiff line change
@@ -143,14 +143,18 @@ impl TryFrom<Notification> for api::Notification {
143143
}
144144
}
145145

146-
impl TryFrom<api::Notification> for Notification {
146+
impl TryFrom<api::Notification> for Option<Notification> {
147147
type Error = GrpcProtoConversionError;
148148

149149
fn try_from(notification: api::Notification) -> Result<Self, Self::Error> {
150-
Ok(Self {
151-
chain_id: try_proto_convert(notification.chain_id)?,
152-
reason: bincode::deserialize(&notification.reason)?,
153-
})
150+
if notification.chain_id.is_none() && notification.reason.is_empty() {
151+
Ok(None)
152+
} else {
153+
Ok(Some(Notification {
154+
chain_id: try_proto_convert(notification.chain_id)?,
155+
reason: bincode::deserialize(&notification.reason)?,
156+
}))
157+
}
154158
}
155159
}
156160

@@ -903,6 +907,13 @@ pub mod tests {
903907
hash: CryptoHash::new(&Foo("".into())),
904908
},
905909
};
906-
round_trip_check::<_, api::Notification>(notification);
910+
let message = api::Notification::try_from(notification.clone()).unwrap();
911+
assert_eq!(
912+
Some(notification),
913+
Option::<Notification>::try_from(message).unwrap()
914+
);
915+
916+
let ack = api::Notification::default();
917+
assert_eq!(None, Option::<Notification>::try_from(ack).unwrap());
907918
}
908919
}

linera-service/src/proxy/grpc.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,12 @@ where
406406
.into_iter()
407407
.map(ChainId::try_from)
408408
.collect::<Result<Vec<ChainId>, _>>()?;
409-
let rx = self.0.notifier.subscribe(chain_ids);
409+
// The empty notification seems to be needed in some cases to force
410+
// completion of HTTP2 headers.
411+
let rx = self
412+
.0
413+
.notifier
414+
.subscribe_with_ack(chain_ids, Ok(Notification::default()));
410415
Ok(Response::new(UnboundedReceiverStream::new(rx)))
411416
}
412417

0 commit comments

Comments
 (0)