Skip to content

feature: periodically remove stale gateway messages #5312

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion common/gateway-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ license.workspace = true
[dependencies]
bincode = { workspace = true }
defguard_wireguard_rs = { workspace = true }
log = { workspace = true }
sqlx = { workspace = true, features = [
"runtime-tokio-rustls",
"sqlite",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/*
 * Copyright 2025 - Nym Technologies SA <[email protected]>
 * SPDX-License-Identifier: GPL-3.0-only
 */

-- Record when each message was persisted so a periodic clean-up task can
-- purge entries older than a configured cutoff (see InboxManager::remove_stale).
-- Existing rows are backfilled with the time the migration runs (CURRENT_TIMESTAMP).
-- NOTE(review): `TIMESTAMP WITHOUT TIME ZONE` is a PostgreSQL type name; SQLite
-- accepts it (NUMERIC affinity) -- confirm it decodes to the intended sqlx/time type.
ALTER TABLE message_store
ADD COLUMN timestamp TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP;
67 changes: 28 additions & 39 deletions common/gateway-storage/src/inboxes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
// SPDX-License-Identifier: GPL-3.0-only

use crate::models::StoredMessage;
use time::OffsetDateTime;
use tracing::debug;

#[derive(Clone)]
pub(crate) struct InboxManager {
pub struct InboxManager {
connection_pool: sqlx::SqlitePool,
/// Maximum number of messages that can be obtained from the database per operation.
/// It is used to prevent out of memory errors in the case of client receiving a lot of data while
Expand Down Expand Up @@ -71,44 +73,22 @@ impl InboxManager {
// get 1 additional message to check whether there will be more to grab
// next time
let limit = self.retrieval_limit + 1;
let mut res = if let Some(start_after) = start_after {
sqlx::query_as!(
StoredMessage,
r#"
SELECT
id as "id!",
client_address_bs58 as "client_address_bs58!",
content as "content!"
FROM message_store
WHERE client_address_bs58 = ? AND id > ?
ORDER BY id ASC
LIMIT ?;
"#,
client_address_bs58,
start_after,
limit
)
.fetch_all(&self.connection_pool)
.await?
} else {
sqlx::query_as!(
StoredMessage,
r#"
SELECT
id as "id!",
client_address_bs58 as "client_address_bs58!",
content as "content!"
FROM message_store
WHERE client_address_bs58 = ?
ORDER BY id ASC
LIMIT ?;
"#,
client_address_bs58,
limit
)
.fetch_all(&self.connection_pool)
.await?
};
let start_after = start_after.unwrap_or(-1);

let mut res: Vec<StoredMessage> = sqlx::query_as(
r#"
SELECT id, client_address_bs58, content, timestamp
FROM message_store
WHERE client_address_bs58 = ? AND id > ?
ORDER BY id ASC
LIMIT ?;
"#,
)
.bind(client_address_bs58)
.bind(start_after)
.bind(limit)
.fetch_all(&self.connection_pool)
.await?;

if res.len() > self.retrieval_limit as usize {
res.truncate(self.retrieval_limit as usize);
Expand Down Expand Up @@ -146,4 +126,13 @@ impl InboxManager {
.await?;
Ok(())
}

/// Deletes every stored message whose `timestamp` is strictly older than
/// the provided cutoff.
///
/// Logs (at debug level) how many rows were removed. Returns any error
/// surfaced by the underlying database operation.
pub async fn remove_stale(&self, cutoff: OffsetDateTime) -> Result<(), sqlx::Error> {
    let result = sqlx::query!("DELETE FROM message_store WHERE timestamp < ?", cutoff)
        .execute(&self.connection_pool)
        .await?;
    let affected = result.rows_affected();
    debug!("Removed {affected} stale messages");
    Ok(())
}
}
4 changes: 2 additions & 2 deletions common/gateway-storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

use bandwidth::BandwidthManager;
use clients::{ClientManager, ClientType};
use inboxes::InboxManager;
use models::{
Client, PersistedBandwidth, PersistedSharedKeys, RedemptionProposal, StoredMessage,
VerifiedTicket, WireguardPeer,
Expand Down Expand Up @@ -31,6 +30,7 @@ mod tickets;
mod wireguard_peers;

pub use error::GatewayStorageError;
pub use inboxes::InboxManager;

// note that clone here is fine as upon cloning the same underlying pool will be used
#[derive(Clone)]
Expand All @@ -53,7 +53,7 @@ impl GatewayStorage {
&self.shared_key_manager
}

pub(crate) fn inbox_manager(&self) -> &InboxManager {
/// Returns a reference to the manager responsible for clients' stored-message inboxes.
pub fn inbox_manager(&self) -> &InboxManager {
    &self.inbox_manager
}

Expand Down
2 changes: 2 additions & 0 deletions common/gateway-storage/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,13 @@ impl TryFrom<PersistedSharedKeys> for SharedGatewayKey {
}
}

/// A single client message persisted in the `message_store` table.
#[derive(FromRow)]
pub struct StoredMessage {
    // Primary key; retrieval pages through messages in ascending `id` order,
    // so it doubles as the "start after" cursor.
    pub id: i64,
    // Recipient address; populated by the row mapping but not read directly
    // from Rust code, hence the dead_code allowance.
    #[allow(dead_code)]
    pub client_address_bs58: String,
    // Raw message payload bytes.
    pub content: Vec<u8>,
    // When the message was stored; consulted when purging stale messages.
    pub timestamp: OffsetDateTime,
}

#[derive(Debug, Clone, FromRow)]
Expand Down
63 changes: 6 additions & 57 deletions gateway/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
// Copyright 2024 - Nym Technologies SA <[email protected]>
// SPDX-License-Identifier: GPL-3.0-only

use nym_network_defaults::TICKETBOOK_VALIDITY_DAYS;
use std::net::SocketAddr;
use std::time::Duration;
use url::Url;

// TODO: can we move those away?
pub const DEFAULT_CLIENT_BANDWIDTH_MAX_FLUSHING_RATE: Duration = Duration::from_millis(5);
pub const DEFAULT_CLIENT_BANDWIDTH_MAX_DELTA_FLUSHING_AMOUNT: i64 = 512 * 1024; // 512kB

#[derive(Debug)]
pub struct Config {
pub gateway: Gateway,
Expand Down Expand Up @@ -96,18 +91,13 @@ pub struct Debug {
/// Defines a maximum change in client bandwidth before it gets flushed to the persistent storage.
pub client_bandwidth_max_delta_flushing_amount: i64,

pub zk_nym_tickets: ZkNymTicketHandlerDebug,
}
/// Specifies how often the clean-up task should check for stale data.
pub stale_messages_cleaner_run_interval: Duration,

impl Default for Debug {
fn default() -> Self {
Debug {
client_bandwidth_max_flushing_rate: DEFAULT_CLIENT_BANDWIDTH_MAX_FLUSHING_RATE,
client_bandwidth_max_delta_flushing_amount:
DEFAULT_CLIENT_BANDWIDTH_MAX_DELTA_FLUSHING_AMOUNT,
zk_nym_tickets: Default::default(),
}
}
/// Specifies maximum age of stored messages before they are removed from the storage
pub stale_messages_max_age: Duration,

pub zk_nym_tickets: ZkNymTicketHandlerDebug,
}

#[derive(Debug, Clone)]
Expand All @@ -131,44 +121,3 @@ pub struct ZkNymTicketHandlerDebug {
/// That's required as nym-apis will purge all ticket information for tickets older than maximum validity.
pub maximum_time_between_redemption: Duration,
}

impl ZkNymTicketHandlerDebug {
    pub const DEFAULT_REVOCATION_BANDWIDTH_PENALTY: f32 = 10.0;
    pub const DEFAULT_PENDING_POLLER: Duration = Duration::from_secs(300);
    pub const DEFAULT_MINIMUM_API_QUORUM: f32 = 0.8;
    pub const DEFAULT_MINIMUM_REDEMPTION_TICKETS: usize = 100;

    // use min(4/5 of max validity, validity - 1), while making sure the result
    // is no lower than 1 day (enforced by the assert below)
    // ASSUMPTION: our validity period is AT LEAST 2 days
    //
    // this could have been a constant, but it's more readable as a function
    pub const fn default_maximum_time_between_redemption() -> Duration {
        // 4/5 of the full validity window, expressed in seconds
        let desired_secs = TICKETBOOK_VALIDITY_DAYS * (86400 * 4) / 5;
        // full validity minus one day, expressed in seconds
        let desired_secs_alt = (TICKETBOOK_VALIDITY_DAYS - 1) * 86400;

        // can't use `min` in const context
        let target_secs = if desired_secs < desired_secs_alt {
            desired_secs
        } else {
            desired_secs_alt
        };

        // guard the stated assumption: with validity >= 2 days the result
        // always exceeds one day
        assert!(
            target_secs > 86400,
            "the maximum time between redemption can't be lower than 1 day!"
        );
        Duration::from_secs(target_secs as u64)
    }
}

impl Default for ZkNymTicketHandlerDebug {
fn default() -> Self {
ZkNymTicketHandlerDebug {
revocation_bandwidth_penalty: Self::DEFAULT_REVOCATION_BANDWIDTH_PENALTY,
pending_poller: Self::DEFAULT_PENDING_POLLER,
minimum_api_quorum: Self::DEFAULT_MINIMUM_API_QUORUM,
minimum_redemption_tickets: Self::DEFAULT_MINIMUM_REDEMPTION_TICKETS,
maximum_time_between_redemption: Self::default_maximum_time_between_redemption(),
}
}
}
7 changes: 0 additions & 7 deletions gateway/src/node/mixnet_handling/mod.rs

This file was deleted.

Loading
Loading