Skip to content

Fix stats bug & remove HM caching #5495

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
set -e

user_rust_log_preference=$RUST_LOG
export ENVIRONMENT=${ENVIRONMENT:-"sandbox"}
export ENVIRONMENT=${ENVIRONMENT:-"mainnet"}
export NYM_API_CLIENT_TIMEOUT=60
export EXPLORER_CLIENT_TIMEOUT=60
export NODE_STATUS_API_TESTRUN_REFRESH_INTERVAL=120
Expand Down
3 changes: 0 additions & 3 deletions nym-node-status-api/nym-node-status-api/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,6 @@ pub(crate) struct Cli {
env = "NYM_NODE_STATUS_API_MAX_AGENT_COUNT"
)]
pub(crate) max_agent_count: i64,

#[clap(long, default_value = "", env = "NYM_NODE_STATUS_API_HM_URL")]
pub(crate) hm_url: String,
}

fn parse_duration(arg: &str) -> Result<std::time::Duration, std::num::ParseIntError> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ pub(crate) async fn get_daily_stats(pool: &DbPool, offset: i64) -> anyhow::Resul
WHERE nym_node_daily_mixing_stats.node_id IS NULL
)
GROUP BY date_utc
ORDER BY date_utc DESC
ORDER BY date_utc ASC
LIMIT 30
OFFSET ?
"#,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,10 @@ async fn get_stats(
Query(MixStatsQueryParams { offset }): Query<MixStatsQueryParams>,
State(state): State<AppState>,
) -> HttpResult<Json<Vec<DailyStats>>> {
let offset = offset.unwrap_or(0);
let offset: usize = offset
.unwrap_or(0)
.try_into()
.map_err(|_| HttpError::invalid_input("Offset must be non-negative"))?;
let last_30_days = state
.cache()
.get_mixnode_stats(state.db_pool(), offset)
Expand Down
10 changes: 1 addition & 9 deletions nym-node-status-api/nym-node-status-api/src/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,10 @@ pub(crate) async fn start_http_api(
nym_http_cache_ttl: u64,
agent_key_list: Vec<PublicKey>,
agent_max_count: i64,
hm_url: String,
) -> anyhow::Result<ShutdownHandles> {
let router_builder = RouterBuilder::with_default_routes();

let state = AppState::new(
db_pool,
nym_http_cache_ttl,
agent_key_list,
agent_max_count,
hm_url,
)
.await;
let state = AppState::new(db_pool, nym_http_cache_ttl, agent_key_list, agent_max_count).await;
let router = router_builder.with_state(state);

let bind_addr = format!("0.0.0.0:{}", http_port);
Expand Down
120 changes: 19 additions & 101 deletions nym-node-status-api/nym-node-status-api/src/http/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@ impl AppState {
cache_ttl: u64,
agent_key_list: Vec<PublicKey>,
agent_max_count: i64,
hm_url: String,
) -> Self {
Self {
db_pool,
cache: HttpCache::new(cache_ttl, hm_url).await,
cache: HttpCache::new(cache_ttl).await,
agent_key_list,
agent_max_count,
}
Expand All @@ -52,108 +51,25 @@ impl AppState {
}
}

#[derive(Debug, Clone)]
struct HistoricMixingStats {
historic_stats: Vec<DailyStats>,
}

impl HistoricMixingStats {
/// Collect historic stats only on initialization. From this point onwards,
/// the service will collect its own stats.
async fn init(hm_url: String) -> Self {
tracing::info!("Fetching historic mixnode stats from {}", hm_url);

let target_url = format!("{}/v2/mixnodes/stats", hm_url);
if let Ok(response) = reqwest::get(&target_url)
.await
.and_then(|res| res.error_for_status())
.inspect_err(|err| tracing::error!("Failed to fetch cache from HM: {}", err))
{
if let Ok(mut daily_stats) = response.json::<Vec<DailyStats>>().await {
// sorting required for seamless comparison later (descending, newest first)
daily_stats.sort_by(|left, right| right.date_utc.cmp(&left.date_utc));

tracing::info!(
"Successfully fetched {} historic entries from {}",
daily_stats.len(),
hm_url
);
return Self {
historic_stats: daily_stats,
};
}
};

tracing::warn!("Failed to get historic daily stats from {}", hm_url);
Self {
historic_stats: Vec::new(),
}
}

/// polyfill with historical data obtained from Harbour Master
fn merge_with_historic_stats(&self, mut new_stats: Vec<DailyStats>) -> Vec<DailyStats> {
// newest first
new_stats.sort_by(|left, right| right.date_utc.cmp(&left.date_utc));

// historic stats are only used for dates when we don't have new data
let oldest_date_in_new_stats = new_stats
.last()
.map(|day| day.date_utc.to_owned())
.unwrap_or(String::from("1900-01-01"));

// given 2 arrays
// index historic_stats new_stats
// 0 30-01 31-01
// 1 29-01 30-01
// 2 28-01
// ...
// N 01-01
// cutoff point would be at historic_stats[1]
// (first date smaller than oldest we've already got)
if let Some(cutoff) = self
.historic_stats
.iter()
.position(|elem| elem.date_utc < oldest_date_in_new_stats)
{
// missing data = (all historic data) - (however many days we already have)
let missing_data = self.historic_stats.iter().skip(cutoff).cloned();

// extend new data with missing days
tracing::debug!(
"Polyfilled with {} historic records from {:?} to {:?}",
missing_data.len(),
self.historic_stats.last(),
self.historic_stats.get(cutoff)
);
new_stats.extend(missing_data);

// oldest first
new_stats.into_iter().rev().collect::<Vec<_>>()
} else {
// if all historic data is older than what we've got, don't use it
new_stats
}
}
}

static GATEWAYS_LIST_KEY: &str = "gateways";
static MIXNODES_LIST_KEY: &str = "mixnodes";
static MIXSTATS_LIST_KEY: &str = "mixstats";
static SUMMARY_HISTORY_LIST_KEY: &str = "summary-history";
static SESSION_STATS_LIST_KEY: &str = "session-stats";

const MIXNODE_STATS_HISTORY_DAYS: usize = 30;

#[derive(Debug, Clone)]
pub(crate) struct HttpCache {
gateways: Cache<String, Arc<RwLock<Vec<Gateway>>>>,
mixnodes: Cache<String, Arc<RwLock<Vec<Mixnode>>>>,
mixstats: Cache<String, Arc<RwLock<Vec<DailyStats>>>>,
history: Cache<String, Arc<RwLock<Vec<SummaryHistory>>>>,
session_stats: Cache<String, Arc<RwLock<Vec<SessionStats>>>>,
mixnode_historic_daily_stats: HistoricMixingStats,
}

impl HttpCache {
pub async fn new(ttl_seconds: u64, hm_url: String) -> Self {
pub async fn new(ttl_seconds: u64) -> Self {
HttpCache {
gateways: Cache::builder()
.max_capacity(2)
Expand All @@ -175,7 +91,6 @@ impl HttpCache {
.max_capacity(2)
.time_to_live(Duration::from_secs(ttl_seconds))
.build(),
mixnode_historic_daily_stats: HistoricMixingStats::init(hm_url).await,
}
}

Expand Down Expand Up @@ -285,24 +200,27 @@ impl HttpCache {
.await
}

pub async fn get_mixnode_stats(&self, db: &DbPool, offset: i64) -> Vec<DailyStats> {
pub async fn get_mixnode_stats(&self, db: &DbPool, offset: usize) -> Vec<DailyStats> {
match self.mixstats.get(MIXSTATS_LIST_KEY).await {
Some(guard) => {
let read_lock = guard.read().await;
read_lock.to_vec()
let mut stats = read_lock.to_vec();

stats.truncate(MIXNODE_STATS_HISTORY_DAYS + offset);
stats.into_iter().skip(offset).collect()
}
None => {
let new_node_stats = crate::db::queries::get_daily_stats(db, offset)
let mut new_node_stats = crate::db::queries::get_daily_stats(db, 0)
.await
.unwrap_or_default();
// for every day that's missing, fill it with cached historic data
let mut mixnode_stats = self
.mixnode_historic_daily_stats
.merge_with_historic_stats(new_node_stats);
mixnode_stats.truncate(30);

self.upsert_mixnode_stats(mixnode_stats.clone()).await;
mixnode_stats
.unwrap_or_default()
.into_iter()
.rev()
.collect::<Vec<_>>();
// cache result without offset
self.upsert_mixnode_stats(new_node_stats.clone()).await;

new_node_stats.truncate(MIXNODE_STATS_HISTORY_DAYS + offset);
new_node_stats.into_iter().skip(offset).collect()
}
}
}
Expand Down
1 change: 0 additions & 1 deletion nym-node-status-api/nym-node-status-api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ async fn main() -> anyhow::Result<()> {
args.nym_http_cache_ttl,
agent_key_list.to_owned(),
args.max_agent_count,
args.hm_url,
)
.await
.expect("Failed to start server");
Expand Down
Loading