Skip to content

Commit 6be86a9

Browse files
authored
Faucet has a chain listener (#2410)
* Faucet has a chain listener * Decouple ChainListener from ChainClient for notifications * Faucet uses ChainClients * Updated tests * Formatting * ChainClients created from Iterator instead of Context * Update CLI.md
1 parent 3959fa9 commit 6be86a9

File tree

11 files changed

+173
-53
lines changed

11 files changed

+173
-53
lines changed

CLI.md

+7
Original file line numberDiff line numberDiff line change
@@ -525,6 +525,13 @@ Run a GraphQL service that exposes a faucet where users can claim tokens. This g
525525
Default value: `8080`
526526
* `--amount <AMOUNT>` — The number of tokens to send to each new chain
527527
* `--limit-rate-until <LIMIT_RATE_UNTIL>` — The end timestamp: The faucet will rate-limit the token supply so it runs out of money no earlier than this
528+
* `--listener-skip-process-inbox` — Do not create blocks automatically to receive incoming messages. Instead, wait for an explicit mutation `processInbox`
529+
* `--listener-delay-before-ms <DELAY_BEFORE_MS>` — Wait before processing any notification (useful for testing)
530+
531+
Default value: `0`
532+
* `--listener-delay-after-ms <DELAY_AFTER_MS>` — Wait after processing any notification (useful for rate limiting)
533+
534+
Default value: `0`
528535

529536

530537

linera-client/src/chain_clients.rs

+35-10
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ use linera_base::identifiers::ChainId;
88
use linera_core::client::ChainClient;
99
use linera_storage::Storage;
1010

11-
use crate::error::{self, Error};
11+
use crate::{
12+
chain_listener::ClientContext,
13+
error::{self, Error},
14+
};
1215

1316
pub type ClientMapInner<P, S> = BTreeMap<ChainId, ChainClient<P, S>>;
1417
pub struct ChainClients<P, S>(pub Arc<Mutex<ClientMapInner<P, S>>>)
@@ -24,18 +27,10 @@ where
2427
}
2528
}
2629

27-
impl<P, S> Default for ChainClients<P, S>
28-
where
29-
S: Storage,
30-
{
31-
fn default() -> Self {
32-
Self(Arc::new(Mutex::new(BTreeMap::new())))
33-
}
34-
}
35-
3630
impl<P, S> ChainClients<P, S>
3731
where
3832
S: Storage,
33+
P: 'static,
3934
{
4035
async fn client(&self, chain_id: &ChainId) -> Option<ChainClient<P, S>> {
4136
Some(self.0.lock().await.get(chain_id)?.clone())
@@ -54,4 +49,34 @@ where
5449
pub async fn map_lock(&self) -> MutexGuard<ClientMapInner<P, S>> {
5550
self.0.lock().await
5651
}
52+
53+
pub async fn add_client(&self, client: ChainClient<P, S>) {
54+
self.0.lock().await.insert(client.chain_id(), client);
55+
}
56+
57+
pub async fn request_client(
58+
&self,
59+
chain_id: ChainId,
60+
context: Arc<Mutex<impl ClientContext<ValidatorNodeProvider = P, Storage = S>>>,
61+
) -> ChainClient<P, S> {
62+
let mut guard = self.0.lock().await;
63+
match guard.get(&chain_id) {
64+
Some(client) => client.clone(),
65+
None => {
66+
let context = context.lock().await;
67+
let client = context.make_chain_client(chain_id);
68+
guard.insert(chain_id, client.clone());
69+
client
70+
}
71+
}
72+
}
73+
74+
pub async fn from_clients(chains: impl IntoIterator<Item = ChainClient<P, S>>) -> Self {
75+
let chain_clients = Self(Default::default());
76+
for chain_client in chains {
77+
let mut map_guard = chain_clients.map_lock().await;
78+
map_guard.insert(chain_client.chain_id(), chain_client);
79+
}
80+
chain_clients
81+
}
5782
}

linera-client/src/chain_listener.rs

+32-16
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// Copyright (c) Zefchain Labs, Inc.
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use std::{collections::btree_map, sync::Arc, time::Duration};
4+
use std::{collections::HashSet, sync::Arc, time::Duration};
55

66
use async_trait::async_trait;
77
use futures::{
@@ -81,6 +81,14 @@ pub trait ClientContext {
8181
&mut self,
8282
client: &ChainClient<Self::ValidatorNodeProvider, Self::Storage>,
8383
) -> Result<(), Error>;
84+
85+
fn clients(&self) -> Vec<ChainClient<Self::ValidatorNodeProvider, Self::Storage>> {
86+
let mut clients = vec![];
87+
for chain_id in &self.wallet().chain_ids() {
88+
clients.push(self.make_chain_client(*chain_id));
89+
}
90+
clients
91+
}
8492
}
8593

8694
/// A `ChainListener` is a process that listens to notifications from validators and reacts
@@ -91,6 +99,7 @@ where
9199
{
92100
config: ChainListenerConfig,
93101
clients: ChainClients<P, S>,
102+
listening: Arc<Mutex<HashSet<ChainId>>>,
94103
}
95104

96105
impl<P, S> ChainListener<P, S>
@@ -101,7 +110,11 @@ where
101110
{
102111
/// Creates a new chain listener given client chains.
103112
pub fn new(config: ChainListenerConfig, clients: ChainClients<P, S>) -> Self {
104-
Self { config, clients }
113+
Self {
114+
config,
115+
clients,
116+
listening: Default::default(),
117+
}
105118
}
106119

107120
/// Runs the chain listener.
@@ -117,6 +130,7 @@ where
117130
context.clone(),
118131
storage.clone(),
119132
self.config.clone(),
133+
self.listening.clone(),
120134
);
121135
}
122136
}
@@ -128,13 +142,15 @@ where
128142
context: Arc<Mutex<C>>,
129143
storage: S,
130144
config: ChainListenerConfig,
145+
listening: Arc<Mutex<HashSet<ChainId>>>,
131146
) where
132147
C: ClientContext<ValidatorNodeProvider = P, Storage = S> + Send + 'static,
133148
{
134149
let _handle = linera_base::task::spawn(
135150
async move {
136151
if let Err(err) =
137-
Self::run_client_stream(chain_id, clients, context, storage, config).await
152+
Self::run_client_stream(chain_id, clients, context, storage, config, listening)
153+
.await
138154
{
139155
error!("Stream for chain {} failed: {}", chain_id, err);
140156
}
@@ -150,24 +166,23 @@ where
150166
context: Arc<Mutex<C>>,
151167
storage: S,
152168
config: ChainListenerConfig,
169+
listening: Arc<Mutex<HashSet<ChainId>>>,
153170
) -> Result<(), Error>
154171
where
155172
C: ClientContext<ValidatorNodeProvider = P, Storage = S> + Send + 'static,
156173
{
157-
let client = {
158-
let mut map_guard = clients.map_lock().await;
159-
let context_guard = context.lock().await;
160-
let btree_map::Entry::Vacant(entry) = map_guard.entry(chain_id) else {
161-
// For every entry in the client map we are already listening to notifications, so
162-
// there's nothing to do. This can happen if we download a child before the parent
163-
// chain, and then process the OpenChain message in the parent.
164-
return Ok(());
165-
};
166-
let client = context_guard.make_chain_client(chain_id);
167-
entry.insert(client.clone());
168-
client
169-
};
174+
let mut guard = listening.lock().await;
175+
if guard.contains(&chain_id) {
176+
// If we are already listening to notifications, there's nothing to do.
177+
// This can happen if we download a child before the parent
178+
// chain, and then process the OpenChain message in the parent.
179+
return Ok(());
180+
}
181+
// If the client is not present, we can request it.
182+
let client = clients.request_client(chain_id, context.clone()).await;
170183
let (listener, _listen_handle, mut local_stream) = client.listen().await?;
184+
guard.insert(chain_id);
185+
drop(guard);
171186
client.synchronize_from_validators().await?;
172187
drop(linera_base::task::spawn(listener.in_current_span()));
173188
let mut timeout = storage.clock().current_time();
@@ -268,6 +283,7 @@ where
268283
context.clone(),
269284
storage.clone(),
270285
config.clone(),
286+
listening.clone(),
271287
);
272288
}
273289
}

linera-client/src/client_options.rs

+4
Original file line numberDiff line numberDiff line change
@@ -689,6 +689,10 @@ pub enum ClientCommand {
689689
/// no earlier than this.
690690
#[arg(long)]
691691
limit_rate_until: Option<DateTime<Utc>>,
692+
693+
/// Configuration for the faucet chain listener.
694+
#[command(flatten)]
695+
config: ChainListenerConfig,
692696
},
693697

694698
/// Publish bytecode.

linera-client/src/unit_tests/chain_listener.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use linera_views::memory::MemoryStore;
3131
use rand::SeedableRng as _;
3232

3333
use crate::{
34-
chain_listener::{self, ChainListener, ChainListenerConfig, ClientContext as _},
34+
chain_listener::{self, ChainClients, ChainListener, ChainListenerConfig, ClientContext as _},
3535
config::{CommitteeConfig, GenesisConfig, ValidatorConfig},
3636
wallet::{UserChain, Wallet},
3737
Error,
@@ -169,8 +169,9 @@ async fn test_chain_listener() -> anyhow::Result<()> {
169169
context
170170
.update_wallet_for_new_chain(chain_id0, Some(key_pair), clock.current_time())
171171
.await?;
172+
let chain_clients = ChainClients::from_clients(context.clients()).await;
172173
let context = Arc::new(Mutex::new(context));
173-
let listener = ChainListener::new(config, Default::default());
174+
let listener = ChainListener::new(config, chain_clients);
174175
listener.run(context, storage).await;
175176

176177
// Transfer ownership of chain 0 to the chain listener and some other key. The listener will

linera-service/src/faucet.rs

+45-14
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,15 @@ use linera_base::{
1313
identifiers::{ChainId, MessageId},
1414
ownership::ChainOwnership,
1515
};
16-
use linera_client::{chain_listener::ClientContext, config::GenesisConfig};
17-
use linera_core::{client::ChainClient, data_types::ClientOutcome, node::ValidatorNodeProvider};
16+
use linera_client::{
17+
chain_clients::ChainClients,
18+
chain_listener::{ChainListener, ChainListenerConfig, ClientContext},
19+
config::GenesisConfig,
20+
};
21+
use linera_core::{
22+
data_types::ClientOutcome,
23+
node::{ValidatorNode, ValidatorNodeProvider},
24+
};
1825
use linera_execution::committee::ValidatorName;
1926
use linera_storage::{Clock as _, Storage};
2027
use serde::Deserialize;
@@ -33,15 +40,17 @@ where
3340
S: Storage,
3441
{
3542
genesis_config: Arc<GenesisConfig>,
36-
client: Arc<Mutex<ChainClient<P, S>>>,
43+
clients: ChainClients<P, S>,
44+
chain_id: ChainId,
3745
}
3846

3947
/// The root GraphQL mutation type.
4048
pub struct MutationRoot<P, S, C>
4149
where
4250
S: Storage,
4351
{
44-
client: Arc<Mutex<ChainClient<P, S>>>,
52+
clients: ChainClients<P, S>,
53+
chain_id: ChainId,
4554
context: Arc<Mutex<C>>,
4655
amount: Amount,
4756
end_timestamp: Timestamp,
@@ -84,7 +93,7 @@ where
8493

8594
/// Returns the current committee's validators.
8695
async fn current_validators(&self) -> Result<Vec<Validator>, Error> {
87-
let client = self.client.lock().await;
96+
let client = self.clients.try_client_lock(&self.chain_id).await?;
8897
let committee = client.local_committee().await?;
8998
Ok(committee
9099
.validators()
@@ -119,7 +128,7 @@ where
119128
C: ClientContext<ValidatorNodeProvider = P, Storage = S> + Send + 'static,
120129
{
121130
async fn do_claim(&self, public_key: PublicKey) -> Result<ClaimOutcome, Error> {
122-
let client = self.client.lock().await;
131+
let client = self.clients.try_client_lock(&self.chain_id).await?;
123132

124133
if self.start_timestamp < self.end_timestamp {
125134
let local_time = client.storage_client().clock().current_time();
@@ -149,7 +158,7 @@ where
149158
let result = client
150159
.open_chain(ownership, ApplicationPermissions::default(), self.amount)
151160
.await;
152-
self.context.lock().await.update_wallet(&*client).await?;
161+
self.context.lock().await.update_wallet(&client).await?;
153162
let (message_id, certificate) = match result? {
154163
ClientOutcome::Committed(result) => result,
155164
ClientOutcome::WaitForTimeout(timeout) => {
@@ -189,9 +198,12 @@ pub struct FaucetService<P, S, C>
189198
where
190199
S: Storage,
191200
{
192-
client: Arc<Mutex<ChainClient<P, S>>>,
201+
clients: ChainClients<P, S>,
202+
chain_id: ChainId,
193203
context: Arc<Mutex<C>>,
194204
genesis_config: Arc<GenesisConfig>,
205+
config: ChainListenerConfig,
206+
storage: S,
195207
port: NonZeroU16,
196208
amount: Amount,
197209
end_timestamp: Timestamp,
@@ -205,9 +217,12 @@ where
205217
{
206218
fn clone(&self) -> Self {
207219
Self {
208-
client: self.client.clone(),
220+
clients: self.clients.clone(),
221+
chain_id: self.chain_id,
209222
context: self.context.clone(),
210223
genesis_config: self.genesis_config.clone(),
224+
config: self.config.clone(),
225+
storage: self.storage.clone(),
211226
port: self.port,
212227
amount: self.amount,
213228
end_timestamp: self.end_timestamp,
@@ -220,25 +235,35 @@ where
220235
impl<P, S, C> FaucetService<P, S, C>
221236
where
222237
P: ValidatorNodeProvider + Send + Sync + Clone + 'static,
238+
<<P as ValidatorNodeProvider>::Node as ValidatorNode>::NotificationStream: Send,
223239
S: Storage + Clone + Send + Sync + 'static,
224240
C: ClientContext<ValidatorNodeProvider = P, Storage = S> + Send + 'static,
225241
{
226242
/// Creates a new instance of the faucet service.
243+
#[allow(clippy::too_many_arguments)]
227244
pub async fn new(
228245
port: NonZeroU16,
229-
client: ChainClient<P, S>,
246+
chain_id: ChainId,
230247
context: C,
231248
amount: Amount,
232249
end_timestamp: Timestamp,
233250
genesis_config: Arc<GenesisConfig>,
251+
config: ChainListenerConfig,
252+
storage: S,
234253
) -> anyhow::Result<Self> {
254+
let clients = ChainClients::<P, S>::from_clients(context.clients()).await;
255+
let context = Arc::new(Mutex::new(context));
256+
let client = clients.try_client_lock(&chain_id).await?;
235257
let start_timestamp = client.storage_client().clock().current_time();
236258
client.process_inbox().await?;
237259
let start_balance = client.local_balance().await?;
238260
Ok(Self {
239-
client: Arc::new(Mutex::new(client)),
240-
context: Arc::new(Mutex::new(context)),
261+
clients,
262+
chain_id,
263+
context,
241264
genesis_config,
265+
config,
266+
storage,
242267
port,
243268
amount,
244269
end_timestamp,
@@ -249,7 +274,8 @@ where
249274

250275
pub fn schema(&self) -> Schema<QueryRoot<P, S>, MutationRoot<P, S, C>, EmptySubscription> {
251276
let mutation_root = MutationRoot {
252-
client: self.client.clone(),
277+
clients: self.clients.clone(),
278+
chain_id: self.chain_id,
253279
context: self.context.clone(),
254280
amount: self.amount,
255281
end_timestamp: self.end_timestamp,
@@ -258,7 +284,8 @@ where
258284
};
259285
let query_root = QueryRoot {
260286
genesis_config: self.genesis_config.clone(),
261-
client: self.client.clone(),
287+
clients: self.clients.clone(),
288+
chain_id: self.chain_id,
262289
};
263290
Schema::build(query_root, mutation_root, EmptySubscription).finish()
264291
}
@@ -277,6 +304,10 @@ where
277304

278305
info!("GraphiQL IDE: http://localhost:{}", port);
279306

307+
ChainListener::new(self.config.clone(), self.clients.clone())
308+
.run(self.context.clone(), self.storage.clone())
309+
.await;
310+
280311
axum::serve(
281312
tokio::net::TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))).await?,
282313
app,

0 commit comments

Comments
 (0)