Skip to content

Commit e9ccdcb

Browse files
jstuczynmmsinclair
authored andcommitted
Feature/persistent gateway storage (#784)
* Sqlx struct stub * Initial schema * Initial error enum * Managed for persisted shared keys * Initial inbox manager * Comments * Using new database in clients handler * Extending gateway storage API * tokio::main + placeholder values * Removed old client store * Simplified logic of async packet processing * Renamed table + not null restriction * BandwidthManager * Removed sled dependency * Using centralised storage for bandwidth * Dead code removal * WIP connection_handler split and simplification Maybe it doesn't look like it right now, but once completed it will remove bunch of redundant checks for Nones etc * Further more explicit clients handler split * Minor cleanup * Temporary store for active client handles * Fixed error types * Error trait on iv and encrypted address * Authentication and registration moved to the handler * Removal of clients handler * Further logic simplification + returned explicit bandwidth values * Further cleanup and comments * Updated config with relevant changes * Basic bandwidth tracking in client * FreshHandle doc comments + fixed stagger issue * Removed side-effects from .map * More doc comments * Database migration on build * Increased default claimed bandwidth * Renaming * Fixed client determining available bandwidth * Removed dead sql table that might be used in the future * Windows workaround * Comment * Return error rather than cap credential
1 parent 655dd59 commit e9ccdcb

File tree

37 files changed

+1917
-1618
lines changed

37 files changed

+1917
-1618
lines changed

Cargo.lock

+2-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

clients/client-core/src/client/mix_traffic.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ impl MixTrafficController {
4040
async fn on_messages(&mut self, mut mix_packets: Vec<MixPacket>) {
4141
debug_assert!(!mix_packets.is_empty());
4242

43-
let success = if mix_packets.len() == 1 {
43+
let result = if mix_packets.len() == 1 {
4444
let mix_packet = mix_packets.pop().unwrap();
4545
self.gateway_client.send_mix_packet(mix_packet).await
4646
} else {
@@ -49,7 +49,7 @@ impl MixTrafficController {
4949
.await
5050
};
5151

52-
match success {
52+
match result {
5353
Err(e) => {
5454
error!("Failed to send sphinx packet(s) to the gateway! - {:?}", e);
5555
self.consecutive_gateway_failure_count += 1;

common/client-libs/gateway-client/src/client.rs

+15-9
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,7 @@ const DEFAULT_RECONNECTION_BACKOFF: Duration = Duration::from_secs(5);
3636

3737
pub struct GatewayClient {
3838
authenticated: bool,
39-
// TODO: This should be replaced by an actual bandwidth value, with 0 meaning no bandwidth
40-
has_bandwidth: bool,
39+
bandwidth_remaining: i64,
4140
gateway_address: String,
4241
gateway_identity: identity::PublicKey,
4342
local_identity: Arc<identity::KeyPair>,
@@ -72,7 +71,7 @@ impl GatewayClient {
7271
) -> Self {
7372
GatewayClient {
7473
authenticated: false,
75-
has_bandwidth: false,
74+
bandwidth_remaining: 0,
7675
gateway_address,
7776
gateway_identity,
7877
local_identity,
@@ -117,7 +116,7 @@ impl GatewayClient {
117116

118117
GatewayClient {
119118
authenticated: false,
120-
has_bandwidth: false,
119+
bandwidth_remaining: 0,
121120
gateway_address,
122121
gateway_identity,
123122
local_identity,
@@ -474,22 +473,29 @@ impl GatewayClient {
474473
)
475474
.ok_or(GatewayClientError::SerializeCredential)?
476475
.into();
477-
self.has_bandwidth = match self.send_websocket_message(msg).await? {
478-
ServerResponse::Bandwidth { status } => Ok(status),
476+
self.bandwidth_remaining = match self.send_websocket_message(msg).await? {
477+
ServerResponse::Bandwidth { available_total } => Ok(available_total),
479478
ServerResponse::Error { message } => Err(GatewayClientError::GatewayError(message)),
480479
_ => Err(GatewayClientError::UnexpectedResponse),
481480
}?;
482481
Ok(())
483482
}
484483

484+
fn estimate_required_bandwidth(&self, packets: &[MixPacket]) -> i64 {
485+
packets
486+
.iter()
487+
.map(|packet| packet.sphinx_packet().len())
488+
.sum::<usize>() as i64
489+
}
490+
485491
pub async fn batch_send_mix_packets(
486492
&mut self,
487493
packets: Vec<MixPacket>,
488494
) -> Result<(), GatewayClientError> {
489495
if !self.authenticated {
490496
return Err(GatewayClientError::NotAuthenticated);
491497
}
492-
if !self.has_bandwidth {
498+
if self.estimate_required_bandwidth(&packets) < self.bandwidth_remaining {
493499
return Err(GatewayClientError::NotEnoughBandwidth);
494500
}
495501
if !self.connection.is_established() {
@@ -550,7 +556,7 @@ impl GatewayClient {
550556
if !self.authenticated {
551557
return Err(GatewayClientError::NotAuthenticated);
552558
}
553-
if !self.has_bandwidth {
559+
if (mix_packet.sphinx_packet().len() as i64) > self.bandwidth_remaining {
554560
return Err(GatewayClientError::NotEnoughBandwidth);
555561
}
556562
if !self.connection.is_established() {
@@ -598,7 +604,7 @@ impl GatewayClient {
598604
if !self.authenticated {
599605
return Err(GatewayClientError::NotAuthenticated);
600606
}
601-
if !self.has_bandwidth {
607+
if self.bandwidth_remaining <= 0 {
602608
return Err(GatewayClientError::NotEnoughBandwidth);
603609
}
604610
if self.connection.is_partially_delegated() {

common/credentials/src/bandwidth.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::error::Error;
1212
use crate::utils::{obtain_aggregate_signature, prepare_credential_for_spending};
1313
use coconut_interface::{hash_to_scalar, Credential, Parameters, Signature, VerificationKey};
1414

15-
const BANDWIDTH_VALUE: u64 = 1024 * 1024; // 1 MB
15+
const BANDWIDTH_VALUE: u64 = 10 * 1024 * 1024 * 1024; // 10 GB
1616

1717
pub const PUBLIC_ATTRIBUTES: u32 = 1;
1818
pub const PRIVATE_ATTRIBUTES: u32 = 1;

gateway/Cargo.toml

+6-1
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,13 @@ log = "0.4"
2020
pretty_env_logger = "0.4"
2121
rand = "0.7"
2222
serde = { version = "1.0.104", features = ["derive"] }
23-
sled = "0.34"
23+
thiserror = "1"
2424
tokio = { version = "1.4", features = [ "rt-multi-thread", "net", "signal", "fs" ] }
2525
tokio-util = { version = "0.6", features = [ "codec" ] }
2626
tokio-stream = { version = "0.1", features = [ "fs" ] }
2727
tokio-tungstenite = "0.14"
2828
url = { version = "2.2", features = [ "serde" ] }
29+
sqlx = { version = "0.5", features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate"] }
2930

3031
# internal
3132
coconut-interface = { path = "../common/coconut-interface" }
@@ -39,3 +40,7 @@ nymsphinx = { path = "../common/nymsphinx" }
3940
pemstore = { path = "../common/pemstore" }
4041
validator-client = { path = "../common/client-libs/validator-client" }
4142
version-checker = { path = "../common/version-checker" }
43+
44+
[build-dependencies]
45+
tokio = { version = "1.4", features = ["rt-multi-thread", "macros"] }
46+
sqlx = { version = "0.5", features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate"] }

gateway/build.rs

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
use sqlx::{Connection, SqliteConnection};
2+
use std::env;
3+
4+
#[tokio::main]
5+
async fn main() {
6+
let out_dir = env::var("OUT_DIR").unwrap();
7+
let database_path = format!("{}/gateway-example.sqlite", out_dir);
8+
9+
let mut conn = SqliteConnection::connect(&*format!("sqlite://{}?mode=rwc", database_path))
10+
.await
11+
.expect("Failed to create SQLx database connection");
12+
13+
sqlx::migrate!("./migrations")
14+
.run(&mut conn)
15+
.await
16+
.expect("Failed to perform SQLx migrations");
17+
18+
#[cfg(target_family = "unix")]
19+
println!("cargo:rustc-env=DATABASE_URL=sqlite://{}", &database_path);
20+
21+
#[cfg(target_family = "windows")]
22+
// for some strange reason we need to add a leading `/` to the windows path even though it's
23+
// not a valid windows path... but hey, it works...
24+
println!("cargo:rustc-env=DATABASE_URL=sqlite:///{}", &database_path);
25+
}

gateway/gateway-requests/src/authentication/encrypted_address.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::registration::handshake::shared_key::SharedKeys;
66
use crypto::symmetric::stream_cipher;
77
use nymsphinx::params::GatewayEncryptionAlgorithm;
88
use nymsphinx::{DestinationAddressBytes, DESTINATION_ADDRESS_LENGTH};
9+
use thiserror::Error;
910

1011
pub const ENCRYPTED_ADDRESS_SIZE: usize = DESTINATION_ADDRESS_LENGTH;
1112

@@ -16,9 +17,11 @@ pub const ENCRYPTED_ADDRESS_SIZE: usize = DESTINATION_ADDRESS_LENGTH;
1617
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
1718
pub struct EncryptedAddressBytes([u8; ENCRYPTED_ADDRESS_SIZE]);
1819

19-
#[derive(Debug)]
20+
#[derive(Debug, Error)]
2021
pub enum EncryptedAddressConversionError {
21-
DecodeError(bs58::decode::Error),
22+
#[error("Failed to decode the encrypted address - {0}")]
23+
DecodeError(#[from] bs58::decode::Error),
24+
#[error("The decoded address has invalid length")]
2225
StringOfInvalidLengthError,
2326
}
2427

@@ -54,10 +57,7 @@ impl EncryptedAddressBytes {
5457
pub fn try_from_base58_string<S: Into<String>>(
5558
val: S,
5659
) -> Result<Self, EncryptedAddressConversionError> {
57-
let decoded = match bs58::decode(val.into()).into_vec() {
58-
Ok(decoded) => decoded,
59-
Err(err) => return Err(EncryptedAddressConversionError::DecodeError(err)),
60-
};
60+
let decoded = bs58::decode(val.into()).into_vec()?;
6161

6262
if decoded.len() != ENCRYPTED_ADDRESS_SIZE {
6363
return Err(EncryptedAddressConversionError::StringOfInvalidLengthError);

gateway/gateway-requests/src/iv.rs

+9-6
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,25 @@ use crypto::generic_array::{typenum::Unsigned, GenericArray};
55
use crypto::symmetric::stream_cipher::{random_iv, NewCipher, IV as CryptoIV};
66
use nymsphinx::params::GatewayEncryptionAlgorithm;
77
use rand::{CryptoRng, RngCore};
8+
use thiserror::Error;
89

910
type NonceSize = <GatewayEncryptionAlgorithm as NewCipher>::NonceSize;
1011

1112
// I think 'IV' looks better than 'Iv', feel free to change that.
1213
#[allow(clippy::upper_case_acronyms)]
1314
pub struct IV(CryptoIV<GatewayEncryptionAlgorithm>);
1415

15-
#[derive(Debug)]
16+
#[derive(Error, Debug)]
1617
// I think 'IV' looks better than 'Iv', feel free to change that.
1718
#[allow(clippy::upper_case_acronyms)]
1819
pub enum IVConversionError {
19-
DecodeError(bs58::decode::Error),
20+
#[error("Failed to decode the iv - {0}")]
21+
DecodeError(#[from] bs58::decode::Error),
22+
23+
#[error("The decoded bytes iv has invalid length")]
2024
BytesOfInvalidLengthError,
25+
26+
#[error("The decoded string iv has invalid length")]
2127
StringOfInvalidLengthError,
2228
}
2329

@@ -47,10 +53,7 @@ impl IV {
4753
}
4854

4955
pub fn try_from_base58_string<S: Into<String>>(val: S) -> Result<Self, IVConversionError> {
50-
let decoded = match bs58::decode(val.into()).into_vec() {
51-
Ok(decoded) => decoded,
52-
Err(err) => return Err(IVConversionError::DecodeError(err)),
53-
};
56+
let decoded = bs58::decode(val.into()).into_vec()?;
5457

5558
if decoded.len() != NonceSize::to_usize() {
5659
return Err(IVConversionError::StringOfInvalidLengthError);

gateway/gateway-requests/src/registration/handshake/shared_key.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ type EncryptionKeySize = <GatewayEncryptionAlgorithm as NewCipher>::KeySize;
2222
/// Shared key used when computing MAC for messages exchanged between client and its gateway.
2323
pub type MacKey = GenericArray<u8, MacKeySize>;
2424

25-
#[derive(Clone, Debug)]
25+
#[derive(Clone, Copy, Debug)]
2626
pub struct SharedKeys {
2727
encryption_key: CipherKey<GatewayEncryptionAlgorithm>,
2828
mac_key: MacKey,

gateway/gateway-requests/src/types.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ impl fmt::Display for GatewayRequestsError {
8989
}
9090
}
9191

92+
impl std::error::Error for GatewayRequestsError {}
93+
9294
impl From<NymNodeRoutingAddressError> for GatewayRequestsError {
9395
fn from(_: NymNodeRoutingAddressError) -> Self {
9496
GatewayRequestsError::IncorrectlyEncodedAddress
@@ -191,9 +193,8 @@ impl TryInto<String> for ClientControlRequest {
191193
pub enum ServerResponse {
192194
Authenticate { status: bool },
193195
Register { status: bool },
194-
// Maybe we could return the remaining bandwidth?
195-
Bandwidth { status: bool },
196-
Send { status: bool },
196+
Bandwidth { available_total: i64 },
197+
Send { remaining_bandwidth: i64 },
197198
Error { message: String },
198199
}
199200

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright 2021 - Nym Technologies SA <[email protected]>
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
CREATE TABLE shared_keys
7+
(
8+
client_address_bs58 TEXT NOT NULL PRIMARY KEY UNIQUE,
9+
derived_aes128_ctr_blake3_hmac_keys_bs58 TEXT NOT NULL
10+
);
11+
12+
CREATE TABLE message_store
13+
(
14+
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
15+
client_address_bs58 TEXT NOT NULL,
16+
content BLOB NOT NULL
17+
);
18+
19+
CREATE TABLE available_bandwidth
20+
(
21+
client_address_bs58 TEXT NOT NULL PRIMARY KEY UNIQUE,
22+
available INTEGER NOT NULL
23+
);
24+
25+
CREATE INDEX `message_store_index` ON `message_store` (`client_address_bs58`, `content`);

gateway/src/commands/init.rs

+5-11
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,9 @@ pub fn command_args<'a, 'b>() -> clap::App<'a, 'b> {
4444
.takes_value(true),
4545
)
4646
.arg(
47-
Arg::with_name(INBOXES_ARG_NAME)
48-
.long(INBOXES_ARG_NAME)
49-
.help("Directory with inboxes where all packets for the clients are stored")
50-
.takes_value(true)
51-
)
52-
.arg(
53-
Arg::with_name(CLIENTS_LEDGER_ARG_NAME)
54-
.long(CLIENTS_LEDGER_ARG_NAME)
55-
.help("Ledger file containing registered clients")
47+
Arg::with_name(DATASTORE_PATH)
48+
.long(DATASTORE_PATH)
49+
.help("Path to sqlite database containing all gateway persistent data")
5650
.takes_value(true)
5751
)
5852
.arg(
@@ -115,7 +109,7 @@ fn show_bonding_info(config: &Config) {
115109
);
116110
}
117111

118-
pub fn execute(matches: &ArgMatches) {
112+
pub async fn execute(matches: ArgMatches<'static>) {
119113
let id = matches.value_of(ID_ARG_NAME).unwrap();
120114
println!("Initialising gateway {}...", id);
121115

@@ -128,7 +122,7 @@ pub fn execute(matches: &ArgMatches) {
128122

129123
let mut config = Config::new(id);
130124

131-
config = override_config(config, matches);
125+
config = override_config(config, &matches);
132126

133127
// if gateway was already initialised, don't generate new keys
134128
if !already_init {

gateway/src/commands/mod.rs

+3-8
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@ pub(crate) const MIX_PORT_ARG_NAME: &str = "mix-port";
1515
pub(crate) const CLIENTS_PORT_ARG_NAME: &str = "clients-port";
1616
pub(crate) const VALIDATORS_ARG_NAME: &str = "validators";
1717
pub(crate) const ANNOUNCE_HOST_ARG_NAME: &str = "announce-host";
18-
pub(crate) const INBOXES_ARG_NAME: &str = "inboxes";
19-
pub(crate) const CLIENTS_LEDGER_ARG_NAME: &str = "clients-ledger";
18+
pub(crate) const DATASTORE_PATH: &str = "datastore";
2019

2120
fn parse_validators(raw: &str) -> Vec<Url> {
2221
raw.split(',')
@@ -69,12 +68,8 @@ pub(crate) fn override_config(mut config: Config, matches: &ArgMatches) -> Confi
6968
config = config.with_custom_validator_apis(parse_validators(raw_validators));
7069
}
7170

72-
if let Some(inboxes_dir) = matches.value_of(INBOXES_ARG_NAME) {
73-
config = config.with_custom_clients_inboxes(inboxes_dir);
74-
}
75-
76-
if let Some(clients_ledger) = matches.value_of(CLIENTS_LEDGER_ARG_NAME) {
77-
config = config.with_custom_clients_ledger(clients_ledger);
71+
if let Some(datastore_path) = matches.value_of(DATASTORE_PATH) {
72+
config = config.with_custom_persistent_store(datastore_path);
7873
}
7974

8075
config

0 commit comments

Comments
 (0)