Skip to content

Commit f293c2e

Browse files
authored
Simplify notification handling in client.rs (#2469)
* Simplify notification handling in client.rs * Revert stricter check, since it fails in the tests. * Remove pub; move trait bounds to where they are needed.
1 parent ea0d59a commit f293c2e

File tree

1 file changed

+66
-54
lines changed

1 file changed

+66
-54
lines changed

linera-core/src/client.rs

+66-54
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,57 @@ impl<P, S: Storage + Clone> Client<P, S> {
209209
}
210210
}
211211

212+
impl<P, S> Client<P, S>
213+
where
214+
P: LocalValidatorNodeProvider + Sync + 'static,
215+
S: Storage + Sync + Send + Clone + 'static,
216+
{
217+
async fn download_certificates(
218+
&self,
219+
nodes: &[(ValidatorName, P::Node)],
220+
chain_id: ChainId,
221+
height: BlockHeight,
222+
) -> Result<Box<ChainInfo>, LocalNodeError> {
223+
let mut notifications = Vec::<Notification>::new();
224+
let info = self
225+
.local_node
226+
.download_certificates(nodes, chain_id, height, &mut notifications)
227+
.await?;
228+
self.notifier.handle_notifications(&notifications);
229+
Ok(info)
230+
}
231+
232+
async fn try_process_certificates(
233+
&self,
234+
name: &ValidatorName,
235+
node: &impl LocalValidatorNode,
236+
chain_id: ChainId,
237+
certificates: Vec<Certificate>,
238+
) -> Option<Box<ChainInfo>> {
239+
let mut notifications = Vec::<Notification>::new();
240+
let result = self
241+
.local_node
242+
.try_process_certificates(name, node, chain_id, certificates, &mut notifications)
243+
.await;
244+
self.notifier.handle_notifications(&notifications);
245+
result
246+
}
247+
248+
async fn handle_certificate(
249+
&self,
250+
certificate: Certificate,
251+
blobs: Vec<Blob>,
252+
) -> Result<ChainInfoResponse, LocalNodeError> {
253+
let mut notifications = Vec::<Notification>::new();
254+
let result = self
255+
.local_node
256+
.handle_certificate(certificate, blobs, &mut notifications)
257+
.await;
258+
self.notifier.handle_notifications(&notifications);
259+
result
260+
}
261+
}
262+
212263
/// Policies for automatically handling incoming messages.
213264
#[derive(Clone, Debug)]
214265
pub struct MessagePolicy {
@@ -769,13 +820,6 @@ where
769820
.copy())
770821
}
771822

772-
#[tracing::instrument(level = "trace", skip(notifications))]
773-
/// Notifies subscribers and clears the `notifications`.
774-
fn handle_notifications(&self, notifications: &mut Vec<Notification>) {
775-
self.client.notifier.handle_notifications(notifications);
776-
notifications.clear();
777-
}
778-
779823
#[tracing::instrument(level = "trace")]
780824
/// Obtains the public key associated to the current identity.
781825
pub async fn public_key(&self) -> Result<PublicKey, ChainClientError> {
@@ -791,13 +835,10 @@ where
791835
// network.
792836
let next_block_height = self.state().next_block_height;
793837
let nodes = self.validator_nodes().await?;
794-
let mut notifications = vec![];
795838
let mut info = self
796839
.client
797-
.local_node
798-
.download_certificates(&nodes, self.chain_id, next_block_height, &mut notifications)
840+
.download_certificates(&nodes, self.chain_id, next_block_height)
799841
.await?;
800-
self.handle_notifications(&mut notifications);
801842
if info.next_block_height == next_block_height {
802843
// Check that our local node has the expected block hash.
803844
ensure!(
@@ -806,17 +847,13 @@ where
806847
);
807848
}
808849
let ownership = &info.manager.ownership;
809-
let keys: std::collections::HashSet<_> =
810-
self.state().known_key_pairs.keys().cloned().collect();
850+
let keys: HashSet<_> = self.state().known_key_pairs.keys().cloned().collect();
811851
if ownership.all_owners().any(|owner| !keys.contains(owner)) {
812852
// For chains with any owner other than ourselves, we could be missing recent
813853
// certificates created by other owners. Further synchronize blocks from the network.
814854
// This is a best-effort that depends on network conditions.
815855
let nodes = self.validator_nodes().await?;
816-
info = self
817-
.synchronize_chain_state(&nodes, self.chain_id, &mut notifications)
818-
.await?;
819-
self.handle_notifications(&mut notifications);
856+
info = self.synchronize_chain_state(&nodes, self.chain_id).await?;
820857
}
821858
self.update_from_info(&info);
822859
Ok(info)
@@ -1019,12 +1056,9 @@ where
10191056
.validator_node_provider
10201057
.make_nodes(remote_committee)?
10211058
.collect();
1022-
let mut notifications = vec![];
10231059
self.client
1024-
.local_node
1025-
.download_certificates(&nodes, block.chain_id, block.height, &mut notifications)
1060+
.download_certificates(&nodes, block.chain_id, block.height)
10261061
.await?;
1027-
self.handle_notifications(&mut notifications);
10281062
// Process the received operations. Download required hashed certificate values if necessary.
10291063
if let Err(err) = self.process_certificate(certificate.clone(), vec![]).await {
10301064
match &err {
@@ -1186,11 +1220,9 @@ where
11861220
.validator_node_provider
11871221
.make_nodes(&local_committee)?
11881222
.collect();
1189-
let mut notifications = vec![];
11901223
// Synchronize the state of the admin chain from the network.
1191-
self.synchronize_chain_state(&nodes, self.admin_id(), &mut notifications)
1224+
self.synchronize_chain_state(&nodes, self.admin_id())
11921225
.await?;
1193-
self.handle_notifications(&mut notifications);
11941226
let node_client = self.client.local_node.clone();
11951227
// Now we should have a complete view of all committees in the system.
11961228
let (committees, max_epoch) = self.known_committees().await?;
@@ -1285,14 +1317,11 @@ where
12851317
certificate: Certificate,
12861318
blobs: Vec<Blob>,
12871319
) -> Result<(), LocalNodeError> {
1288-
let mut notifications = vec![];
12891320
let info = self
12901321
.client
1291-
.local_node
1292-
.handle_certificate(certificate, blobs, &mut notifications)
1322+
.handle_certificate(certificate, blobs)
12931323
.await?
12941324
.info;
1295-
self.handle_notifications(&mut notifications);
12961325
self.update_from_info(&info);
12971326
Ok(())
12981327
}
@@ -1358,29 +1387,22 @@ where
13581387
&self,
13591388
validators: &[(ValidatorName, impl LocalValidatorNode)],
13601389
chain_id: ChainId,
1361-
notifications: &mut impl Extend<Notification>,
13621390
) -> Result<Box<ChainInfo>, ChainClientError> {
13631391
let mut futures = vec![];
13641392

13651393
for (name, node) in validators {
13661394
let client = self.clone();
1367-
let mut notifications = vec![];
13681395
futures.push(async move {
1369-
(
1370-
client
1371-
.try_synchronize_chain_state_from(name, node, chain_id, &mut notifications)
1372-
.await,
1373-
notifications,
1374-
)
1396+
client
1397+
.try_synchronize_chain_state_from(name, node, chain_id)
1398+
.await
13751399
});
13761400
}
13771401

1378-
for (result, new_notifications) in future::join_all(futures).await {
1402+
for result in future::join_all(futures).await {
13791403
if let Err(e) = result {
13801404
error!(?e, "Error synchronizing chain state");
13811405
}
1382-
1383-
notifications.extend(new_notifications);
13841406
}
13851407

13861408
self.client
@@ -1390,15 +1412,14 @@ where
13901412
.map_err(Into::into)
13911413
}
13921414

1393-
#[tracing::instrument(level = "trace", skip(self, name, node, chain_id, notifications))]
1415+
#[tracing::instrument(level = "trace", skip(self, name, node, chain_id))]
13941416
/// Downloads any certificates from the specified validator that we are missing for the given
13951417
/// chain, and processes them.
13961418
pub async fn try_synchronize_chain_state_from(
13971419
&self,
13981420
name: &ValidatorName,
13991421
node: &impl LocalValidatorNode,
14001422
chain_id: ChainId,
1401-
notifications: &mut impl Extend<Notification>,
14021423
) -> Result<(), ChainClientError> {
14031424
let local_info = self.client.local_node.local_chain_info(chain_id).await?;
14041425
let range = BlockHeightRange {
@@ -1431,8 +1452,7 @@ where
14311452
if !certificates.is_empty()
14321453
&& self
14331454
.client
1434-
.local_node
1435-
.try_process_certificates(name, node, chain_id, certificates, notifications)
1455+
.try_process_certificates(name, node, chain_id, certificates)
14361456
.await
14371457
.is_none()
14381458
{
@@ -1485,11 +1505,7 @@ where
14851505
}
14861506
let hash = cert.hash();
14871507
let mut blobs = vec![];
1488-
while let Err(original_err) = self
1489-
.client
1490-
.local_node
1491-
.handle_certificate(*cert.clone(), blobs, notifications)
1492-
.await
1508+
while let Err(original_err) = self.client.handle_certificate(*cert.clone(), blobs).await
14931509
{
14941510
if let LocalNodeError::WorkerError(WorkerError::BlobsNotFound(blob_ids)) =
14951511
&original_err
@@ -2878,7 +2894,6 @@ where
28782894
}
28792895
}
28802896
Reason::NewBlock { height, .. } => {
2881-
let mut notifications = vec![];
28822897
let chain_id = notification.chain_id;
28832898
if self
28842899
.local_next_block_height(chain_id, &mut local_node)
@@ -2889,13 +2904,12 @@ where
28892904
return;
28902905
}
28912906
if let Err(error) = self
2892-
.try_synchronize_chain_state_from(&name, &node, chain_id, &mut notifications)
2907+
.try_synchronize_chain_state_from(&name, &node, chain_id)
28932908
.await
28942909
{
28952910
error!("Fail to process notification: {error}");
28962911
return;
28972912
}
2898-
self.handle_notifications(&mut notifications);
28992913
let local_height = self
29002914
.local_next_block_height(chain_id, &mut local_node)
29012915
.await;
@@ -2904,7 +2918,6 @@ where
29042918
}
29052919
}
29062920
Reason::NewRound { height, round } => {
2907-
let mut notifications = vec![];
29082921
let chain_id = notification.chain_id;
29092922
if let Some(info) = self.local_chain_info(chain_id, &mut local_node).await {
29102923
if (info.next_block_height, info.manager.current_round) >= (height, round) {
@@ -2913,13 +2926,12 @@ where
29132926
}
29142927
}
29152928
if let Err(error) = self
2916-
.try_synchronize_chain_state_from(&name, &node, chain_id, &mut notifications)
2929+
.try_synchronize_chain_state_from(&name, &node, chain_id)
29172930
.await
29182931
{
29192932
error!("Fail to process notification: {error}");
29202933
return;
29212934
}
2922-
self.handle_notifications(&mut notifications);
29232935
let Some(info) = self.local_chain_info(chain_id, &mut local_node).await else {
29242936
error!("Fail to read local chain info for {chain_id}");
29252937
return;

0 commit comments

Comments
 (0)