Skip to content

[Product Data] Client-side stats collection #5107

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 23 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions common/client-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ nym-gateway-requests = { path = "../gateway-requests" }
nym-metrics = { path = "../nym-metrics" }
nym-nonexhaustive-delayqueue = { path = "../nonexhaustive-delayqueue" }
nym-sphinx = { path = "../nymsphinx" }
nym-statistics-common = { path = "../statistics" }
nym-pemstore = { path = "../pemstore" }
nym-topology = { path = "../topology", features = ["serializable"] }
nym-validator-client = { path = "../client-libs/validator-client", default-features = false }
Expand Down
63 changes: 46 additions & 17 deletions common/client-core/src/client/base_client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
// Copyright 2022-2023 - Nym Technologies SA <[email protected]>
// SPDX-License-Identifier: Apache-2.0

use super::packet_statistics_control::PacketStatisticsReporter;
use super::received_buffer::ReceivedBufferMessage;
use super::statistics_control::StatisticsControl;
use super::topology_control::geo_aware_provider::GeoAwareTopologyProvider;
use crate::client::base_client::storage::helpers::store_client_keys;
use crate::client::base_client::storage::MixnetClientStorage;
Expand All @@ -12,7 +12,6 @@ use crate::client::key_manager::persistence::KeyStore;
use crate::client::key_manager::ClientKeys;
use crate::client::mix_traffic::transceiver::{GatewayReceiver, GatewayTransceiver, RemoteGateway};
use crate::client::mix_traffic::{BatchMixMessageSender, MixTrafficController};
use crate::client::packet_statistics_control::PacketStatisticsControl;
use crate::client::real_messages_control;
use crate::client::real_messages_control::RealMessagesController;
use crate::client::received_buffer::{
Expand Down Expand Up @@ -49,16 +48,20 @@ use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::addressing::nodes::NodeIdentity;
use nym_sphinx::params::PacketType;
use nym_sphinx::receiver::{ReconstructedMessage, SphinxMessageReceiver};
use nym_statistics_common::clients::ClientStatsSender;
use nym_statistics_common::StatsReportingConfig;
use nym_task::connections::{ConnectionCommandReceiver, ConnectionCommandSender, LaneQueueLengths};
use nym_task::{TaskClient, TaskHandle};
use nym_topology::provider_trait::TopologyProvider;
use nym_topology::HardcodedTopologyProvider;
use nym_validator_client::{nyxd::contract_traits::DkgQueryClient, UserAgent};
use rand::rngs::OsRng;
use sha2::Digest;
use std::fmt::Debug;
use std::os::raw::c_int as RawFd;
use std::path::Path;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use url::Url;

#[cfg(all(
Expand Down Expand Up @@ -184,6 +187,7 @@ pub struct BaseClientBuilder<'a, C, S: MixnetClientStorage> {
custom_gateway_transceiver: Option<Box<dyn GatewayTransceiver + Send>>,
shutdown: Option<TaskClient>,
user_agent: Option<UserAgent>,
stats_reporting_config: Option<StatsReportingConfig>,

setup_method: GatewaySetup,
}
Expand All @@ -207,6 +211,7 @@ where
custom_gateway_transceiver: None,
shutdown: None,
user_agent: None,
stats_reporting_config: None,
setup_method: GatewaySetup::MustLoad { gateway_id: None },
}
}
Expand Down Expand Up @@ -250,6 +255,12 @@ where
self
}

#[must_use]
pub fn with_statistics_reporting(mut self, config: StatsReportingConfig) -> Self {
self.stats_reporting_config = Some(config);
self
}

pub fn with_stored_topology<P: AsRef<Path>>(
mut self,
file: P,
Expand All @@ -273,7 +284,7 @@ where
self_address: Recipient,
topology_accessor: TopologyAccessor,
mix_tx: BatchMixMessageSender,
stats_tx: PacketStatisticsReporter,
stats_tx: ClientStatsSender,
shutdown: TaskClient,
) {
info!("Starting loop cover traffic stream...");
Expand Down Expand Up @@ -306,7 +317,7 @@ where
client_connection_rx: ConnectionCommandReceiver,
shutdown: TaskClient,
packet_type: PacketType,
stats_tx: PacketStatisticsReporter,
stats_tx: ClientStatsSender,
) {
info!("Starting real traffic stream...");

Expand Down Expand Up @@ -335,7 +346,7 @@ where
reply_key_storage: SentReplyKeys,
reply_controller_sender: ReplyControllerSender,
shutdown: TaskClient,
packet_statistics_control: PacketStatisticsReporter,
metrics_reporter: ClientStatsSender,
) {
info!("Starting received messages buffer controller...");
let controller: ReceivedMessagesBufferController<SphinxMessageReceiver> =
Expand All @@ -345,7 +356,7 @@ where
mixnet_receiver,
reply_key_storage,
reply_controller_sender,
packet_statistics_control,
metrics_reporter,
);
controller.start_with_shutdown(shutdown)
}
Expand Down Expand Up @@ -586,11 +597,19 @@ where
Ok(())
}

fn start_packet_statistics_control(shutdown: TaskClient) -> PacketStatisticsReporter {
info!("Starting packet statistics control...");
let (packet_statistics_control, packet_stats_reporter) = PacketStatisticsControl::new();
packet_statistics_control.start_with_shutdown(shutdown);
packet_stats_reporter
fn start_statistics_control(
stats_reporting_config: Option<StatsReportingConfig>,
client_stats_id: String,
input_sender: Sender<InputMessage>,
shutdown: TaskClient,
) -> ClientStatsSender {
info!("Starting statistics control...");
StatisticsControl::create_and_start_with_shutdown(
stats_reporting_config,
client_stats_id,
input_sender.clone(),
shutdown.with_suffix("controller"),
)
}

fn start_mix_traffic_controller(
Expand Down Expand Up @@ -720,6 +739,17 @@ where
self.user_agent.clone(),
);

let client_stats_id = format!(
"{:x}",
sha2::Sha256::digest(self_address.identity().to_bytes())
);
let stats_reporter = Self::start_statistics_control(
self.stats_reporting_config,
client_stats_id,
input_sender.clone(),
shutdown.fork("statistics_control"),
);

// needs to be started as the first thing to block if required waiting for the gateway
Self::start_topology_refresher(
topology_provider,
Expand All @@ -731,9 +761,6 @@ where
)
.await?;

let packet_stats_reporter =
Self::start_packet_statistics_control(shutdown.fork("packet_statistics_control"));

let gateway_packet_router = PacketRouter::new(
ack_sender,
mixnet_messages_sender,
Expand Down Expand Up @@ -765,7 +792,7 @@ where
reply_storage.key_storage(),
reply_controller_sender.clone(),
shutdown.fork("received_messages_buffer"),
packet_stats_reporter.clone(),
stats_reporter.clone(),
);

// The message_sender is the transmitter for any component generating sphinx packets
Expand Down Expand Up @@ -804,7 +831,7 @@ where
client_connection_rx,
shutdown.fork("real_traffic_controller"),
self.config.debug.traffic.packet_type,
packet_stats_reporter.clone(),
stats_reporter.clone(),
);

if !self
Expand All @@ -819,7 +846,7 @@ where
self_address,
shared_topology_accessor.clone(),
message_sender,
packet_stats_reporter,
stats_reporter.clone(),
shutdown.fork("cover_traffic_stream"),
);
}
Expand Down Expand Up @@ -847,6 +874,7 @@ where
topology_accessor: shared_topology_accessor,
gateway_connection: GatewayConnection { gateway_ws_fd },
},
stats_reporter,
task_handle: shutdown,
})
}
Expand All @@ -858,6 +886,7 @@ pub struct BaseClient {
pub client_input: ClientInputStatus,
pub client_output: ClientOutputStatus,
pub client_state: ClientState,
pub stats_reporter: ClientStatsSender,

pub task_handle: TaskHandle,
}
12 changes: 6 additions & 6 deletions common/client-core/src/client/cover_traffic_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// SPDX-License-Identifier: Apache-2.0

use crate::client::mix_traffic::BatchMixMessageSender;
use crate::client::packet_statistics_control::{PacketStatisticsEvent, PacketStatisticsReporter};
use crate::client::topology_control::TopologyAccessor;
use crate::{config, spawn_future};
use futures::task::{Context, Poll};
Expand All @@ -13,6 +12,7 @@ use nym_sphinx::addressing::clients::Recipient;
use nym_sphinx::cover::generate_loop_cover_packet;
use nym_sphinx::params::{PacketSize, PacketType};
use nym_sphinx::utils::sample_poisson_duration;
use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};
use rand::{rngs::OsRng, CryptoRng, Rng};
use std::pin::Pin;
use std::sync::Arc;
Expand Down Expand Up @@ -63,7 +63,7 @@ where

packet_type: PacketType,

stats_tx: PacketStatisticsReporter,
stats_tx: ClientStatsSender,
}

impl<R> Stream for LoopCoverTrafficStream<R>
Expand Down Expand Up @@ -109,7 +109,7 @@ impl LoopCoverTrafficStream<OsRng> {
topology_access: TopologyAccessor,
traffic_config: config::Traffic,
cover_config: config::CoverTraffic,
stats_tx: PacketStatisticsReporter,
stats_tx: ClientStatsSender,
) -> Self {
let rng = OsRng;

Expand Down Expand Up @@ -198,9 +198,9 @@ impl LoopCoverTrafficStream<OsRng> {
}
}
} else {
self.stats_tx.report(PacketStatisticsEvent::CoverPacketSent(
cover_traffic_packet_size.size(),
));
self.stats_tx.report(
PacketStatisticsEvent::CoverPacketSent(cover_traffic_packet_size.size()).into(),
);
}

// TODO: I'm not entirely sure whether this is really required, because I'm not 100%
Expand Down
2 changes: 1 addition & 1 deletion common/client-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ pub(crate) mod helpers;
pub mod inbound_messages;
pub mod key_manager;
pub mod mix_traffic;
pub mod packet_statistics_control;
pub mod real_messages_control;
pub mod received_buffer;
pub mod replies;
pub mod statistics_control;
pub mod topology_control;
pub(crate) mod transmission_buffer;
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
// Copyright 2021 - Nym Technologies SA <[email protected]>
// SPDX-License-Identifier: Apache-2.0

use crate::client::packet_statistics_control::{PacketStatisticsEvent, PacketStatisticsReporter};

use super::action_controller::{AckActionSender, Action};
use nym_statistics_common::clients::{packet_statistics::PacketStatisticsEvent, ClientStatsSender};

use futures::StreamExt;
use log::*;
use nym_gateway_client::AcknowledgementReceiver;
Expand All @@ -19,15 +19,15 @@ pub(super) struct AcknowledgementListener {
ack_key: Arc<AckKey>,
ack_receiver: AcknowledgementReceiver,
action_sender: AckActionSender,
stats_tx: PacketStatisticsReporter,
stats_tx: ClientStatsSender,
}

impl AcknowledgementListener {
pub(super) fn new(
ack_key: Arc<AckKey>,
ack_receiver: AcknowledgementReceiver,
action_sender: AckActionSender,
stats_tx: PacketStatisticsReporter,
stats_tx: ClientStatsSender,
) -> Self {
AcknowledgementListener {
ack_key,
Expand All @@ -40,7 +40,7 @@ impl AcknowledgementListener {
async fn on_ack(&mut self, ack_content: Vec<u8>) {
trace!("Received an ack");
self.stats_tx
.report(PacketStatisticsEvent::AckReceived(ack_content.len()));
.report(PacketStatisticsEvent::AckReceived(ack_content.len()).into());

let frag_id = match recover_identifier(&self.ack_key, &ack_content)
.map(FragmentIdentifier::try_from_bytes)
Expand All @@ -57,13 +57,13 @@ impl AcknowledgementListener {
if frag_id == COVER_FRAG_ID {
trace!("Received an ack for a cover message - no need to do anything");
self.stats_tx
.report(PacketStatisticsEvent::CoverAckReceived(ack_content.len()));
.report(PacketStatisticsEvent::CoverAckReceived(ack_content.len()).into());
return;
}

trace!("Received {} from the mix network", frag_id);
self.stats_tx
.report(PacketStatisticsEvent::RealAckReceived(ack_content.len()));
.report(PacketStatisticsEvent::RealAckReceived(ack_content.len()).into());
self.action_sender
.unbounded_send(Action::new_remove(frag_id))
.unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use self::{
sent_notification_listener::SentNotificationListener,
};
use crate::client::inbound_messages::InputMessageReceiver;
use crate::client::packet_statistics_control::PacketStatisticsReporter;
use crate::client::real_messages_control::message_handler::MessageHandler;
use crate::client::replies::reply_controller::ReplyControllerSender;
use crate::spawn_future;
Expand All @@ -24,6 +23,7 @@ use nym_sphinx::{
chunking::fragment::{Fragment, FragmentIdentifier},
Delay as SphinxDelay,
};
use nym_statistics_common::clients::ClientStatsSender;
use rand::{CryptoRng, Rng};
use std::{
sync::{Arc, Weak},
Expand Down Expand Up @@ -209,7 +209,7 @@ where
connectors: AcknowledgementControllerConnectors,
message_handler: MessageHandler<R>,
reply_controller_sender: ReplyControllerSender,
stats_tx: PacketStatisticsReporter,
stats_tx: ClientStatsSender,
) -> Self {
let (retransmission_tx, retransmission_rx) = mpsc::unbounded();

Expand Down
4 changes: 2 additions & 2 deletions common/client-core/src/client/real_messages_control/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::client::replies::reply_controller;
use crate::config;
pub(crate) use acknowledgement_control::{AckActionSender, Action};

use super::packet_statistics_control::PacketStatisticsReporter;
use nym_statistics_common::clients::ClientStatsSender;

pub(crate) mod acknowledgement_control;
pub(crate) mod message_handler;
Expand Down Expand Up @@ -145,7 +145,7 @@ impl RealMessagesController<OsRng> {
reply_controller_receiver: ReplyControllerReceiver,
lane_queue_lengths: LaneQueueLengths,
client_connection_rx: ConnectionCommandReceiver,
stats_tx: PacketStatisticsReporter,
stats_tx: ClientStatsSender,
) -> Self {
let rng = OsRng;

Expand Down
Loading
Loading