Skip to content

Commit db1cbe7

Browse files
authored
Initial commit for the block exporter. (#3746)
## Motivation Initial work for #3661. ## Proposal - Created a new service implementing the `NotifierService` from linera-rpc to receive redirected notifications about new blocks from the chain workers, similar to linera-proxy(closes #3662). - A new CLI parsable type `BlockExporterConfig` to define the configuration options for the linera-exporter from a toml file(closes #3664). - Extended `linera net up` to optionally run a block exporter with a optional specified configuration(closes #3663). - Added a new view object `BlockExporterStateView` to track and update the latest information about the blocks from the workers(closes #3665). - This PR also partially addresses #3666, without the topological sorting part. ## Test Plan new unit and integration tests + CI
1 parent e04728f commit db1cbe7

File tree

15 files changed

+815
-6
lines changed

15 files changed

+815
-6
lines changed

Diff for: CLI.md

+3
Original file line numberDiff line numberDiff line change
@@ -995,6 +995,9 @@ Start a Local Linera Network
995995
* `--faucet-amount <FAUCET_AMOUNT>` — The number of tokens to send to each new chain created by the faucet
996996

997997
Default value: `1000`
998+
* `--block-exporters <BLOCK_EXPORTERS>` — The number of block exporters per validator in the local test network. Default is 0
999+
1000+
Default value: `0`
9981001

9991002

10001003

Diff for: Cargo.lock

+2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Diff for: linera-client/src/config.rs

+37-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ use linera_execution::{
1919
committee::{Committee, ValidatorState},
2020
ResourceControlPolicy,
2121
};
22-
use linera_rpc::config::{ValidatorInternalNetworkConfig, ValidatorPublicNetworkConfig};
22+
use linera_rpc::config::{
23+
ExporterServiceConfig, ValidatorInternalNetworkConfig, ValidatorPublicNetworkConfig,
24+
};
2325
use linera_storage::Storage;
2426
use serde::{Deserialize, Serialize};
2527

@@ -244,3 +246,37 @@ impl GenesisConfig {
244246
CryptoHash::new(self)
245247
}
246248
}
249+
250+
/// The configuration file for the linera-exporter.
251+
#[derive(Serialize, Deserialize, Debug, Clone)]
252+
pub struct BlockExporterConfig {
253+
/// The server configuration for the linera-exporter.
254+
pub service_config: ExporterServiceConfig,
255+
256+
/// The configuration file for the export destinations.
257+
#[serde(default)]
258+
pub destination_config: DestinationConfig,
259+
260+
/// Identity for the block exporter state.
261+
pub id: u32,
262+
}
263+
264+
/// Configuration file for the exports.
265+
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
266+
pub struct DestinationConfig {
267+
/// The destination URIs to export to.
268+
pub destinations: Vec<Destination>,
269+
}
270+
271+
// Each destination has an ID and a configuration.
272+
pub type DestinationId = u16;
273+
274+
/// The uri to provide export services to.
275+
#[allow(dead_code)]
276+
#[derive(Serialize, Deserialize, Debug, Clone)]
277+
pub struct Destination {
278+
/// The host name of the target destination (IP or hostname).
279+
pub endpoint: String,
280+
/// The port number of the target destination.
281+
pub port: u16,
282+
}

Diff for: linera-rpc/src/config.rs

+23
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,10 @@ pub struct ValidatorInternalNetworkPreConfig<P> {
140140
pub host: String,
141141
/// The port the proxy listens on the internal network.
142142
pub port: u16,
143+
/// The server configurations for the linera-exporter.
144+
/// They can be used as optional locations to forward notifications to destinations other than
145+
/// the proxy, by the workers.
146+
pub block_exporters: Vec<ExporterServiceConfig>,
143147
/// The port of the proxy's metrics endpoint.
144148
pub metrics_port: u16,
145149
}
@@ -152,6 +156,7 @@ impl<P> ValidatorInternalNetworkPreConfig<P> {
152156
shards: self.shards.clone(),
153157
host: self.host.clone(),
154158
port: self.port,
159+
block_exporters: self.block_exporters.clone(),
155160
metrics_port: self.metrics_port,
156161
}
157162
}
@@ -161,6 +166,15 @@ impl ValidatorInternalNetworkConfig {
161166
pub fn proxy_address(&self) -> String {
162167
format!("{}://{}:{}", self.protocol.scheme(), self.host, self.port)
163168
}
169+
170+
pub fn exporter_addresses(&self) -> Vec<String> {
171+
self.block_exporters
172+
.iter()
173+
.map(|ExporterServiceConfig { host, port }| {
174+
format!("{}://{}:{}", self.protocol.scheme(), host, port)
175+
})
176+
.collect::<Vec<_>>()
177+
}
164178
}
165179

166180
impl ValidatorPublicNetworkConfig {
@@ -273,6 +287,15 @@ impl<P> ValidatorInternalNetworkPreConfig<P> {
273287
}
274288
}
275289

290+
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
291+
/// The server configuration for the linera-exporter.
292+
pub struct ExporterServiceConfig {
293+
/// The host name of the server (IP or hostname).
294+
pub host: String,
295+
/// The port for the server to listen on.
296+
pub port: u16,
297+
}
298+
276299
#[test]
277300
fn cross_chain_config_to_args() {
278301
let config = CrossChainConfig::default();

Diff for: linera-rpc/src/grpc/server.rs

+31-2
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@ use futures::{
1818
use linera_base::{data_types::Blob, identifiers::ChainId};
1919
use linera_core::{
2020
node::NodeError,
21-
worker::{NetworkActions, Notification, WorkerError, WorkerState},
21+
worker::{NetworkActions, Notification, Reason, WorkerError, WorkerState},
2222
JoinSetExt as _, TaskHandle,
2323
};
2424
use linera_storage::Storage;
2525
use rand::Rng;
2626
use tokio::{sync::oneshot, task::JoinSet};
2727
use tokio_util::sync::CancellationToken;
28-
use tonic::{Request, Response, Status};
28+
use tonic::{transport::Channel, Request, Response, Status};
2929
use tower::{builder::ServiceBuilder, Layer, Service};
3030
use tracing::{debug, error, info, instrument, trace, warn};
3131
#[cfg(with_metrics)]
@@ -240,6 +240,7 @@ where
240240
Self::forward_notifications(
241241
state.nickname().to_string(),
242242
internal_network.proxy_address(),
243+
internal_network.exporter_addresses(),
243244
notification_receiver,
244245
)
245246
});
@@ -293,6 +294,7 @@ where
293294
async fn forward_notifications(
294295
nickname: String,
295296
proxy_address: String,
297+
exporter_addresses: Vec<String>,
296298
mut receiver: Receiver<Notification>,
297299
) {
298300
let channel = tonic::transport::Channel::from_shared(proxy_address.clone())
@@ -302,7 +304,20 @@ where
302304
.max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
303305
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
304306

307+
let mut exporter_clients: Vec<NotifierServiceClient<Channel>> = exporter_addresses
308+
.iter()
309+
.map(|address| {
310+
let channel = tonic::transport::Channel::from_shared(address.clone())
311+
.expect("Exporter URI should be valid")
312+
.connect_lazy();
313+
NotifierServiceClient::new(channel)
314+
.max_encoding_message_size(GRPC_MAX_MESSAGE_SIZE)
315+
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE)
316+
})
317+
.collect::<Vec<_>>();
318+
305319
while let Some(notification) = receiver.next().await {
320+
let reason = &notification.reason;
306321
let notification: api::Notification = match notification.clone().try_into() {
307322
Ok(notification) => notification,
308323
Err(error) => {
@@ -319,6 +334,20 @@ where
319334
"could not send notification",
320335
)
321336
}
337+
338+
if let Reason::NewBlock { height: _, hash: _ } = reason {
339+
for exporter_client in &mut exporter_clients {
340+
let request = tonic::Request::new(notification.clone());
341+
if let Err(error) = exporter_client.notify(request).await {
342+
error!(
343+
%error,
344+
nickname,
345+
?notification,
346+
"could not send notification",
347+
)
348+
}
349+
}
350+
}
322351
}
323352
}
324353

Diff for: linera-service/Cargo.toml

+6
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ async-lock.workspace = true
6060
async-trait.workspace = true
6161
async-tungstenite.workspace = true
6262
axum = { workspace = true, features = ["ws"] }
63+
bcs.workspace = true
64+
bincode.workspace = true
6365
cargo_toml.workspace = true
6466
cfg-if.workspace = true
6567
chrono = { workspace = true, features = ["clock"] }
@@ -174,6 +176,10 @@ name = "linera-benchmark"
174176
path = "src/benchmark.rs"
175177
required-features = ["benchmark"]
176178

179+
[[bin]]
180+
name = "linera-exporter"
181+
path = "src/linera-exporter/main.rs"
182+
177183
[[bench]]
178184
name = "transfers"
179185
harness = false

0 commit comments

Comments
 (0)