Skip to content

Commit 5f42a9b

Browse files
authored
NetworkMonitorBuilder - starting the monitor after rocket has launched (#754)
* NetworkMonitorBuilder - starting the monitor after rocket has launched * Removed unused import
1 parent 1811df9 commit 5f42a9b

File tree

2 files changed

+133
-102
lines changed

2 files changed

+133
-102
lines changed

validator-api/src/main.rs

+37-24
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ extern crate rocket;
77
use crate::cache::ValidatorCacheRefresher;
88
use crate::config::Config;
99
use crate::network_monitor::tested_network::good_topology::parse_topology_file;
10-
use crate::network_monitor::{new_monitor_runnables, NetworkMonitorRunnables};
10+
use crate::network_monitor::NetworkMonitorBuilder;
1111
use crate::nymd_client::Client;
1212
use crate::storage::NodeStatusStorage;
1313
use ::config::{defaults::DEFAULT_VALIDATOR_API_PORT, NymConfig};
@@ -16,10 +16,13 @@ use cache::ValidatorCache;
1616
use clap::{App, Arg, ArgMatches};
1717
use coconut::InternalSignRequest;
1818
use log::info;
19+
use rocket::fairing::AdHoc;
1920
use rocket::http::Method;
2021
use rocket::{Ignite, Rocket};
2122
use rocket_cors::{AllowedHeaders, AllowedOrigins, Cors};
2223
use std::process;
24+
use std::sync::Arc;
25+
use tokio::sync::Notify;
2326
use url::Url;
2427

2528
pub(crate) mod cache;
@@ -220,10 +223,16 @@ fn setup_cors() -> Result<Cors> {
220223
Ok(cors)
221224
}
222225

223-
async fn setup_network_monitor(
224-
config: &Config,
226+
fn setup_liftoff_notify(notify: Arc<Notify>) -> AdHoc {
227+
AdHoc::on_liftoff("Liftoff notifier", |_| {
228+
Box::pin(async move { notify.notify_one() })
229+
})
230+
}
231+
232+
fn setup_network_monitor<'a>(
233+
config: &'a Config,
225234
rocket: &Rocket<Ignite>,
226-
) -> Option<NetworkMonitorRunnables> {
235+
) -> Option<NetworkMonitorBuilder<'a>> {
227236
if !config.get_network_monitor_enabled() {
228237
return None;
229238
}
@@ -236,19 +245,16 @@ async fn setup_network_monitor(
236245
let v6_topology = parse_topology_file(config.get_v6_good_topology_file());
237246
network_monitor::check_if_up_to_date(&v4_topology, &v6_topology);
238247

239-
Some(
240-
new_monitor_runnables(
241-
config,
242-
v4_topology,
243-
v6_topology,
244-
node_status_storage,
245-
validator_cache,
246-
)
247-
.await,
248-
)
248+
Some(NetworkMonitorBuilder::new(
249+
config,
250+
v4_topology,
251+
v6_topology,
252+
node_status_storage,
253+
validator_cache,
254+
))
249255
}
250256

251-
async fn setup_rocket(config: &Config) -> Result<Rocket<Ignite>> {
257+
async fn setup_rocket(config: &Config, liftoff_notify: Arc<Notify>) -> Result<Rocket<Ignite>> {
252258
// let's build our rocket!
253259
let rocket_config = rocket::config::Config {
254260
// TODO: probably the port should be configurable?
@@ -257,6 +263,7 @@ async fn setup_rocket(config: &Config) -> Result<Rocket<Ignite>> {
257263
};
258264
let rocket = rocket::custom(rocket_config)
259265
.attach(setup_cors()?)
266+
.attach(setup_liftoff_notify(liftoff_notify))
260267
.attach(ValidatorCache::stage())
261268
.attach(InternalSignRequest::stage(config.keypair()));
262269

@@ -296,10 +303,11 @@ async fn main() -> Result<()> {
296303

297304
let matches = parse_args();
298305
let config = override_config(config, &matches);
306+
let liftoff_notify = Arc::new(Notify::new());
299307

300308
// let's build our rocket!
301-
let rocket = setup_rocket(&config).await?;
302-
let monitor_runnables = setup_network_monitor(&config, &rocket).await;
309+
let rocket = setup_rocket(&config, Arc::clone(&liftoff_notify)).await?;
310+
let monitor_builder = setup_network_monitor(&config, &rocket);
303311

304312
let validator_cache = rocket.state::<ValidatorCache>().unwrap().clone();
305313

@@ -327,19 +335,24 @@ async fn main() -> Result<()> {
327335
tokio::spawn(async move { validator_cache_refresher.run().await });
328336
}
329337

330-
if let Some(runnables) = monitor_runnables {
338+
// launch the rocket!
339+
let shutdown_handle = rocket.shutdown();
340+
tokio::spawn(rocket.launch());
341+
342+
// to finish building our monitor, we need to have rocket up and running so that we could
343+
// obtain our bandwidth credential
344+
if let Some(monitor_builder) = monitor_builder {
331345
info!("Starting network monitor...");
332-
// spawn network monitor!
346+
// wait for rocket's liftoff stage
347+
liftoff_notify.notified().await;
348+
349+
// we're ready to go! spawn the network monitor!
350+
let runnables = monitor_builder.build().await;
333351
runnables.spawn_tasks();
334352
} else {
335353
info!("Network monitoring is disabled.");
336354
}
337355

338-
// and launch the rocket
339-
let shutdown_handle = rocket.shutdown();
340-
341-
tokio::spawn(rocket.launch());
342-
343356
wait_for_interrupt().await;
344357
shutdown_handle.notify();
345358

validator-api/src/network_monitor/mod.rs

+96-78
Original file line numberDiff line numberDiff line change
@@ -31,99 +31,117 @@ pub(crate) mod monitor;
3131
pub(crate) mod test_packet;
3232
pub(crate) mod tested_network;
3333

34-
pub(crate) struct NetworkMonitorRunnables {
35-
monitor: Monitor,
36-
packet_receiver: PacketReceiver,
34+
pub(crate) struct NetworkMonitorBuilder<'a> {
35+
config: &'a Config,
36+
tested_network: TestedNetwork,
37+
node_status_storage: NodeStatusStorage,
38+
validator_cache: ValidatorCache,
3739
}
3840

39-
impl NetworkMonitorRunnables {
40-
// TODO: note, that is not exactly doing what we want, because when
41-
// `ReceivedProcessor` is constructed, it already spawns a future
42-
// this needs to be refactored!
43-
pub(crate) fn spawn_tasks(self) {
44-
let mut packet_receiver = self.packet_receiver;
45-
let mut monitor = self.monitor;
46-
tokio::spawn(async move { packet_receiver.run().await });
47-
tokio::spawn(async move { monitor.run().await });
41+
impl<'a> NetworkMonitorBuilder<'a> {
42+
pub(crate) fn new(
43+
config: &'a Config,
44+
v4_topology: NymTopology,
45+
v6_topology: NymTopology,
46+
node_status_storage: NodeStatusStorage,
47+
validator_cache: ValidatorCache,
48+
) -> Self {
49+
let tested_network = TestedNetwork::new_good(v4_topology, v6_topology);
50+
51+
NetworkMonitorBuilder {
52+
config,
53+
tested_network,
54+
node_status_storage,
55+
validator_cache,
56+
}
4857
}
49-
}
5058

51-
pub(crate) async fn new_monitor_runnables(
52-
config: &Config,
53-
v4_topology: NymTopology,
54-
v6_topology: NymTopology,
55-
node_status_storage: NodeStatusStorage,
56-
validator_cache: ValidatorCache,
57-
) -> NetworkMonitorRunnables {
58-
// TODO: in the future I guess this should somehow change to distribute the load
59-
let tested_mix_gateway = v4_topology.gateways()[0].clone();
60-
info!(
61-
"* gateway for testing mixnodes: {}",
62-
tested_mix_gateway.identity_key.to_base58_string()
63-
);
59+
pub(crate) async fn build(self) -> NetworkMonitorRunnables {
60+
// TODO: in the future I guess this should somehow change to distribute the load
61+
let tested_mix_gateway = self.tested_network.main_v4_gateway().clone();
62+
info!(
63+
"* gateway for testing mixnodes: {}",
64+
tested_mix_gateway.identity_key.to_base58_string()
65+
);
6466

65-
let tested_network = TestedNetwork::new_good(v4_topology, v6_topology);
67+
// TODO: those keys change constant throughout the whole execution of the monitor.
68+
// and on top of that, they are used with ALL the gateways -> presumably this should change
69+
// in the future
70+
let mut rng = rand::rngs::OsRng;
6671

67-
// TODO: those keys change constant throughout the whole execution of the monitor.
68-
// and on top of that, they are used with ALL the gateways -> presumably this should change
69-
// in the future
70-
let mut rng = rand::rngs::OsRng;
72+
let identity_keypair = Arc::new(identity::KeyPair::new(&mut rng));
73+
let encryption_keypair = Arc::new(encryption::KeyPair::new(&mut rng));
7174

72-
let identity_keypair = Arc::new(identity::KeyPair::new(&mut rng));
73-
let encryption_keypair = Arc::new(encryption::KeyPair::new(&mut rng));
75+
let test_mixnode_sender = Recipient::new(
76+
*identity_keypair.public_key(),
77+
*encryption_keypair.public_key(),
78+
tested_mix_gateway.identity_key,
79+
);
7480

75-
let test_mixnode_sender = Recipient::new(
76-
*identity_keypair.public_key(),
77-
*encryption_keypair.public_key(),
78-
tested_mix_gateway.identity_key,
79-
);
81+
let (gateway_status_update_sender, gateway_status_update_receiver) = mpsc::unbounded();
82+
let (received_processor_sender_channel, received_processor_receiver_channel) =
83+
mpsc::unbounded();
8084

81-
let (gateway_status_update_sender, gateway_status_update_receiver) = mpsc::unbounded();
82-
let (received_processor_sender_channel, received_processor_receiver_channel) =
83-
mpsc::unbounded();
85+
let packet_preparer = new_packet_preparer(
86+
self.validator_cache,
87+
self.tested_network.clone(),
88+
test_mixnode_sender,
89+
*identity_keypair.public_key(),
90+
*encryption_keypair.public_key(),
91+
);
8492

85-
let packet_preparer = new_packet_preparer(
86-
validator_cache,
87-
tested_network.clone(),
88-
test_mixnode_sender,
89-
*identity_keypair.public_key(),
90-
*encryption_keypair.public_key(),
91-
);
93+
let bandwidth_credential =
94+
TEMPORARY_obtain_bandwidth_credential(self.config, identity_keypair.public_key()).await;
9295

93-
let bandwidth_credential =
94-
TEMPORARY_obtain_bandwidth_credential(config, identity_keypair.public_key()).await;
96+
let packet_sender = new_packet_sender(
97+
self.config,
98+
gateway_status_update_sender,
99+
Arc::clone(&identity_keypair),
100+
bandwidth_credential,
101+
self.config.get_gateway_sending_rate(),
102+
);
95103

96-
let packet_sender = new_packet_sender(
97-
config,
98-
gateway_status_update_sender,
99-
Arc::clone(&identity_keypair),
100-
bandwidth_credential,
101-
config.get_gateway_sending_rate(),
102-
);
104+
let received_processor = new_received_processor(
105+
received_processor_receiver_channel,
106+
Arc::clone(&encryption_keypair),
107+
);
108+
let summary_producer = new_summary_producer(self.config.get_detailed_report());
109+
let packet_receiver = new_packet_receiver(
110+
gateway_status_update_receiver,
111+
received_processor_sender_channel,
112+
);
103113

104-
let received_processor = new_received_processor(
105-
received_processor_receiver_channel,
106-
Arc::clone(&encryption_keypair),
107-
);
108-
let summary_producer = new_summary_producer(config.get_detailed_report());
109-
let packet_receiver = new_packet_receiver(
110-
gateway_status_update_receiver,
111-
received_processor_sender_channel,
112-
);
114+
let monitor = monitor::Monitor::new(
115+
self.config,
116+
packet_preparer,
117+
packet_sender,
118+
received_processor,
119+
summary_producer,
120+
self.node_status_storage,
121+
self.tested_network,
122+
);
113123

114-
let monitor = monitor::Monitor::new(
115-
config,
116-
packet_preparer,
117-
packet_sender,
118-
received_processor,
119-
summary_producer,
120-
node_status_storage,
121-
tested_network,
122-
);
124+
NetworkMonitorRunnables {
125+
monitor,
126+
packet_receiver,
127+
}
128+
}
129+
}
123130

124-
NetworkMonitorRunnables {
125-
monitor,
126-
packet_receiver,
131+
pub(crate) struct NetworkMonitorRunnables {
132+
monitor: Monitor,
133+
packet_receiver: PacketReceiver,
134+
}
135+
136+
impl NetworkMonitorRunnables {
137+
// TODO: note, that is not exactly doing what we want, because when
138+
// `ReceivedProcessor` is constructed, it already spawns a future
139+
// this needs to be refactored!
140+
pub(crate) fn spawn_tasks(self) {
141+
let mut packet_receiver = self.packet_receiver;
142+
let mut monitor = self.monitor;
143+
tokio::spawn(async move { packet_receiver.run().await });
144+
tokio::spawn(async move { monitor.run().await });
127145
}
128146
}
129147

0 commit comments

Comments
 (0)