Skip to content

Commit 4497c48

Browse files
committed
immediately acknowledge GRPC subscriptions
1 parent 58490f0 commit 4497c48

File tree

4 files changed

+53
-14
lines changed

4 files changed

+53
-14
lines changed

linera-core/src/notifier.rs

+18-4
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,27 @@ impl<N> Default for ChannelNotifier<N> {
2828
}
2929

3030
impl<N> ChannelNotifier<N> {
31-
/// Creates a subscription given a collection of ChainIds and a sender to the client.
32-
pub fn subscribe(&self, chain_ids: Vec<ChainId>) -> UnboundedReceiver<N> {
33-
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
31+
fn add_sender(&self, chain_ids: Vec<ChainId>, sender: &UnboundedSender<N>) {
3432
for id in chain_ids {
3533
let mut senders = self.inner.entry(id).or_default();
36-
senders.push(tx.clone());
34+
senders.push(sender.clone());
3735
}
36+
}
37+
38+
/// Creates a subscription given a collection of ChainIds and a sender to the client.
39+
pub fn subscribe(&self, chain_ids: Vec<ChainId>) -> UnboundedReceiver<N> {
40+
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
41+
self.add_sender(chain_ids, &tx);
42+
rx
43+
}
44+
45+
/// Creates a subscription given a collection of ChainIds and a sender to the client.
46+
/// Immediately posts a first notification as a ACK.
47+
pub fn subscribe_with_ack(&self, chain_ids: Vec<ChainId>, ack: N) -> UnboundedReceiver<N> {
48+
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
49+
self.add_sender(chain_ids, &tx);
50+
tx.send(ack)
51+
.expect("pushing to a new channel should succeed");
3852
rx
3953
}
4054
}

linera-rpc/src/grpc/client.rs

+12-3
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use linera_core::{
1616
};
1717
use linera_version::VersionInfo;
1818
use tonic::{Code, IntoRequest, Request, Status};
19-
use tracing::{debug, error, info, instrument};
19+
use tracing::{debug, error, info, instrument, warn};
2020
#[cfg(not(web))]
2121
use {
2222
super::GrpcProtoConversionError,
@@ -258,7 +258,7 @@ impl ValidatorNode for GrpcClient {
258258
// terminates after unexpected or fatal errors.
259259
let notification_stream = endlessly_retrying_notification_stream
260260
.map(|result| {
261-
Notification::try_from(result?).map_err(|err| {
261+
Option::<Notification>::try_from(result?).map_err(|err| {
262262
let message = format!("Could not deserialize notification: {}", err);
263263
tonic::Status::new(Code::Internal, message)
264264
})
@@ -278,7 +278,16 @@ impl ValidatorNode for GrpcClient {
278278
true
279279
})
280280
})
281-
.filter_map(|result| future::ready(result.ok()));
281+
.filter_map(|result| {
282+
future::ready(match result {
283+
Ok(notification @ Some(_)) => notification,
284+
Ok(None) => None,
285+
Err(err) => {
286+
warn!("{}", err);
287+
None
288+
}
289+
})
290+
});
282291

283292
Ok(Box::pin(notification_stream))
284293
}

linera-rpc/src/grpc/conversions.rs

+17-6
Original file line numberDiff line numberDiff line change
@@ -137,14 +137,18 @@ impl TryFrom<Notification> for api::Notification {
137137
}
138138
}
139139

140-
impl TryFrom<api::Notification> for Notification {
140+
impl TryFrom<api::Notification> for Option<Notification> {
141141
type Error = GrpcProtoConversionError;
142142

143143
fn try_from(notification: api::Notification) -> Result<Self, Self::Error> {
144-
Ok(Self {
145-
chain_id: try_proto_convert(notification.chain_id)?,
146-
reason: bincode::deserialize(&notification.reason)?,
147-
})
144+
if notification.chain_id.is_none() && notification.reason.is_empty() {
145+
Ok(None)
146+
} else {
147+
Ok(Some(Notification {
148+
chain_id: try_proto_convert(notification.chain_id)?,
149+
reason: bincode::deserialize(&notification.reason)?,
150+
}))
151+
}
148152
}
149153
}
150154

@@ -858,6 +862,13 @@ pub mod tests {
858862
hash: CryptoHash::new(&Foo("".into())),
859863
},
860864
};
861-
round_trip_check::<_, api::Notification>(notification);
865+
let message = api::Notification::try_from(notification.clone()).unwrap();
866+
assert_eq!(
867+
Some(notification),
868+
Option::<Notification>::try_from(message).unwrap()
869+
);
870+
871+
let ack = api::Notification::default();
872+
assert_eq!(None, Option::<Notification>::try_from(ack).unwrap());
862873
}
863874
}

linera-service/src/proxy/grpc.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,12 @@ where
404404
.into_iter()
405405
.map(ChainId::try_from)
406406
.collect::<Result<Vec<ChainId>, _>>()?;
407-
let rx = self.0.notifier.subscribe(chain_ids);
407+
// The empty notification seems to be needed in some cases to force
408+
// completion of HTTP2 headers.
409+
let rx = self
410+
.0
411+
.notifier
412+
.subscribe_with_ack(chain_ids, Ok(Notification::default()));
408413
Ok(Response::new(UnboundedReceiverStream::new(rx)))
409414
}
410415

0 commit comments

Comments
 (0)