Skip to content

Feature/persistent gateway storage #784

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 39 commits into from
Sep 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
91ddf6b
Sqlx struct stub
jstuczyn Sep 21, 2021
56ca1b9
Initial schema
jstuczyn Sep 21, 2021
27e5ac0
Initial error enum
jstuczyn Sep 21, 2021
403bcfb
Managed for persisted shared keys
jstuczyn Sep 21, 2021
4076c5d
Initial inbox manager
jstuczyn Sep 21, 2021
b994d8b
Comments
jstuczyn Sep 21, 2021
f53d084
Using new database in clients handler
jstuczyn Sep 21, 2021
1fe6c08
Extending gateway storage API
jstuczyn Sep 21, 2021
e1c0247
tokio::main + placeholder values
jstuczyn Sep 21, 2021
8e896d5
Removed old client store
jstuczyn Sep 22, 2021
9fce084
Simplified logic of async packet processing
jstuczyn Sep 22, 2021
650b159
Renamed table + not null restriction
jstuczyn Sep 22, 2021
11997ca
BandwidthManager
jstuczyn Sep 22, 2021
34329d7
Removed sled dependency
jstuczyn Sep 22, 2021
f871dc6
Using centralised storage for bandwidth
jstuczyn Sep 22, 2021
d697e4c
Dead code removal
jstuczyn Sep 22, 2021
f36e305
WIP connection_handler split and simplification
jstuczyn Sep 22, 2021
02967b0
Further more explicit clients handler split
jstuczyn Sep 23, 2021
7a87616
Minor cleanup
jstuczyn Sep 23, 2021
48548c3
Temporary store for active client handles
jstuczyn Sep 23, 2021
c9b7b24
Fixed error types
jstuczyn Sep 23, 2021
de1ae7c
Error trait on iv and encrypted address
jstuczyn Sep 23, 2021
c2f462e
Authentication and registration moved to the handler
jstuczyn Sep 23, 2021
48ef3e3
Removal of clients handler
jstuczyn Sep 23, 2021
00c9d20
Further logic simplification + returned explicit bandwidth values
jstuczyn Sep 23, 2021
ce86348
Further cleanup and comments
jstuczyn Sep 23, 2021
61f11b0
Updated config with relevant changes
jstuczyn Sep 23, 2021
54dd567
Basic bandwidth tracking in client
jstuczyn Sep 23, 2021
6974e11
FreshHandle doc comments + fixed stagger issue
jstuczyn Sep 23, 2021
859a690
Removed side-effects from .map
jstuczyn Sep 23, 2021
07ebd97
More doc comments
jstuczyn Sep 23, 2021
ef63ab3
Database migration on build
jstuczyn Sep 23, 2021
5f75b88
Increased default claimed bandwidth
jstuczyn Sep 24, 2021
24b5a16
Renaming
jstuczyn Sep 24, 2021
a0faed3
Fixed client determining available bandwidth
jstuczyn Sep 24, 2021
f44024f
Removed dead sql table that might be used in the future
jstuczyn Sep 24, 2021
3eceb34
Windows workaround
jstuczyn Sep 24, 2021
b932635
Comment
jstuczyn Sep 29, 2021
0b5212e
Return error rather than cap credential
jstuczyn Sep 29, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions clients/client-core/src/client/mix_traffic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl MixTrafficController {
async fn on_messages(&mut self, mut mix_packets: Vec<MixPacket>) {
debug_assert!(!mix_packets.is_empty());

let success = if mix_packets.len() == 1 {
let result = if mix_packets.len() == 1 {
let mix_packet = mix_packets.pop().unwrap();
self.gateway_client.send_mix_packet(mix_packet).await
} else {
Expand All @@ -49,7 +49,7 @@ impl MixTrafficController {
.await
};

match success {
match result {
Err(e) => {
error!("Failed to send sphinx packet(s) to the gateway! - {:?}", e);
self.consecutive_gateway_failure_count += 1;
Expand Down
24 changes: 15 additions & 9 deletions common/client-libs/gateway-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ const DEFAULT_RECONNECTION_BACKOFF: Duration = Duration::from_secs(5);

pub struct GatewayClient {
authenticated: bool,
// TODO: This should be replaced by an actual bandwidth value, with 0 meaning no bandwidth
has_bandwidth: bool,
bandwidth_remaining: i64,
gateway_address: String,
gateway_identity: identity::PublicKey,
local_identity: Arc<identity::KeyPair>,
Expand Down Expand Up @@ -72,7 +71,7 @@ impl GatewayClient {
) -> Self {
GatewayClient {
authenticated: false,
has_bandwidth: false,
bandwidth_remaining: 0,
gateway_address,
gateway_identity,
local_identity,
Expand Down Expand Up @@ -117,7 +116,7 @@ impl GatewayClient {

GatewayClient {
authenticated: false,
has_bandwidth: false,
bandwidth_remaining: 0,
gateway_address,
gateway_identity,
local_identity,
Expand Down Expand Up @@ -474,22 +473,29 @@ impl GatewayClient {
)
.ok_or(GatewayClientError::SerializeCredential)?
.into();
self.has_bandwidth = match self.send_websocket_message(msg).await? {
ServerResponse::Bandwidth { status } => Ok(status),
self.bandwidth_remaining = match self.send_websocket_message(msg).await? {
ServerResponse::Bandwidth { available_total } => Ok(available_total),
ServerResponse::Error { message } => Err(GatewayClientError::GatewayError(message)),
_ => Err(GatewayClientError::UnexpectedResponse),
}?;
Ok(())
}

fn estimate_required_bandwidth(&self, packets: &[MixPacket]) -> i64 {
packets
.iter()
.map(|packet| packet.sphinx_packet().len())
.sum::<usize>() as i64
}

pub async fn batch_send_mix_packets(
&mut self,
packets: Vec<MixPacket>,
) -> Result<(), GatewayClientError> {
if !self.authenticated {
return Err(GatewayClientError::NotAuthenticated);
}
if !self.has_bandwidth {
if self.estimate_required_bandwidth(&packets) < self.bandwidth_remaining {
return Err(GatewayClientError::NotEnoughBandwidth);
}
if !self.connection.is_established() {
Expand Down Expand Up @@ -550,7 +556,7 @@ impl GatewayClient {
if !self.authenticated {
return Err(GatewayClientError::NotAuthenticated);
}
if !self.has_bandwidth {
if (mix_packet.sphinx_packet().len() as i64) > self.bandwidth_remaining {
return Err(GatewayClientError::NotEnoughBandwidth);
}
if !self.connection.is_established() {
Expand Down Expand Up @@ -598,7 +604,7 @@ impl GatewayClient {
if !self.authenticated {
return Err(GatewayClientError::NotAuthenticated);
}
if !self.has_bandwidth {
if self.bandwidth_remaining <= 0 {
return Err(GatewayClientError::NotEnoughBandwidth);
}
if self.connection.is_partially_delegated() {
Expand Down
2 changes: 1 addition & 1 deletion common/credentials/src/bandwidth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::error::Error;
use crate::utils::{obtain_aggregate_signature, prepare_credential_for_spending};
use coconut_interface::{hash_to_scalar, Credential, Parameters, Signature, VerificationKey};

const BANDWIDTH_VALUE: u64 = 1024 * 1024; // 1 MB
const BANDWIDTH_VALUE: u64 = 10 * 1024 * 1024 * 1024; // 10 GB

pub const PUBLIC_ATTRIBUTES: u32 = 1;
pub const PRIVATE_ATTRIBUTES: u32 = 1;
Expand Down
7 changes: 6 additions & 1 deletion gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ log = "0.4"
pretty_env_logger = "0.4"
rand = "0.7"
serde = { version = "1.0.104", features = ["derive"] }
sled = "0.34"
thiserror = "1"
tokio = { version = "1.4", features = [ "rt-multi-thread", "net", "signal", "fs" ] }
tokio-util = { version = "0.6", features = [ "codec" ] }
tokio-stream = { version = "0.1", features = [ "fs" ] }
tokio-tungstenite = "0.14"
url = { version = "2.2", features = [ "serde" ] }
sqlx = { version = "0.5", features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate"] }

# internal
coconut-interface = { path = "../common/coconut-interface" }
Expand All @@ -39,3 +40,7 @@ nymsphinx = { path = "../common/nymsphinx" }
pemstore = { path = "../common/pemstore" }
validator-client = { path = "../common/client-libs/validator-client" }
version-checker = { path = "../common/version-checker" }

[build-dependencies]
tokio = { version = "1.4", features = ["rt-multi-thread", "macros"] }
sqlx = { version = "0.5", features = ["runtime-tokio-rustls", "sqlite", "macros", "migrate"] }
25 changes: 25 additions & 0 deletions gateway/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use sqlx::{Connection, SqliteConnection};
use std::env;

#[tokio::main]
async fn main() {
let out_dir = env::var("OUT_DIR").unwrap();
let database_path = format!("{}/gateway-example.sqlite", out_dir);

let mut conn = SqliteConnection::connect(&*format!("sqlite://{}?mode=rwc", database_path))
.await
.expect("Failed to create SQLx database connection");

sqlx::migrate!("./migrations")
.run(&mut conn)
.await
.expect("Failed to perform SQLx migrations");

#[cfg(target_family = "unix")]
println!("cargo:rustc-env=DATABASE_URL=sqlite://{}", &database_path);

#[cfg(target_family = "windows")]
// for some strange reason we need to add a leading `/` to the windows path even though it's
// not a valid windows path... but hey, it works...
println!("cargo:rustc-env=DATABASE_URL=sqlite:///{}", &database_path);
}
12 changes: 6 additions & 6 deletions gateway/gateway-requests/src/authentication/encrypted_address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::registration::handshake::shared_key::SharedKeys;
use crypto::symmetric::stream_cipher;
use nymsphinx::params::GatewayEncryptionAlgorithm;
use nymsphinx::{DestinationAddressBytes, DESTINATION_ADDRESS_LENGTH};
use thiserror::Error;

pub const ENCRYPTED_ADDRESS_SIZE: usize = DESTINATION_ADDRESS_LENGTH;

Expand All @@ -16,9 +17,11 @@ pub const ENCRYPTED_ADDRESS_SIZE: usize = DESTINATION_ADDRESS_LENGTH;
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
pub struct EncryptedAddressBytes([u8; ENCRYPTED_ADDRESS_SIZE]);

#[derive(Debug)]
#[derive(Debug, Error)]
pub enum EncryptedAddressConversionError {
DecodeError(bs58::decode::Error),
#[error("Failed to decode the encrypted address - {0}")]
DecodeError(#[from] bs58::decode::Error),
#[error("The decoded address has invalid length")]
StringOfInvalidLengthError,
}

Expand Down Expand Up @@ -54,10 +57,7 @@ impl EncryptedAddressBytes {
pub fn try_from_base58_string<S: Into<String>>(
val: S,
) -> Result<Self, EncryptedAddressConversionError> {
let decoded = match bs58::decode(val.into()).into_vec() {
Ok(decoded) => decoded,
Err(err) => return Err(EncryptedAddressConversionError::DecodeError(err)),
};
let decoded = bs58::decode(val.into()).into_vec()?;

if decoded.len() != ENCRYPTED_ADDRESS_SIZE {
return Err(EncryptedAddressConversionError::StringOfInvalidLengthError);
Expand Down
15 changes: 9 additions & 6 deletions gateway/gateway-requests/src/iv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,25 @@ use crypto::generic_array::{typenum::Unsigned, GenericArray};
use crypto::symmetric::stream_cipher::{random_iv, NewCipher, IV as CryptoIV};
use nymsphinx::params::GatewayEncryptionAlgorithm;
use rand::{CryptoRng, RngCore};
use thiserror::Error;

type NonceSize = <GatewayEncryptionAlgorithm as NewCipher>::NonceSize;

// I think 'IV' looks better than 'Iv', feel free to change that.
#[allow(clippy::upper_case_acronyms)]
pub struct IV(CryptoIV<GatewayEncryptionAlgorithm>);

#[derive(Debug)]
#[derive(Error, Debug)]
// I think 'IV' looks better than 'Iv', feel free to change that.
#[allow(clippy::upper_case_acronyms)]
pub enum IVConversionError {
DecodeError(bs58::decode::Error),
#[error("Failed to decode the iv - {0}")]
DecodeError(#[from] bs58::decode::Error),

#[error("The decoded bytes iv has invalid length")]
BytesOfInvalidLengthError,

#[error("The decoded string iv has invalid length")]
StringOfInvalidLengthError,
}

Expand Down Expand Up @@ -47,10 +53,7 @@ impl IV {
}

pub fn try_from_base58_string<S: Into<String>>(val: S) -> Result<Self, IVConversionError> {
let decoded = match bs58::decode(val.into()).into_vec() {
Ok(decoded) => decoded,
Err(err) => return Err(IVConversionError::DecodeError(err)),
};
let decoded = bs58::decode(val.into()).into_vec()?;

if decoded.len() != NonceSize::to_usize() {
return Err(IVConversionError::StringOfInvalidLengthError);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type EncryptionKeySize = <GatewayEncryptionAlgorithm as NewCipher>::KeySize;
/// Shared key used when computing MAC for messages exchanged between client and its gateway.
pub type MacKey = GenericArray<u8, MacKeySize>;

#[derive(Clone, Debug)]
#[derive(Clone, Copy, Debug)]
pub struct SharedKeys {
encryption_key: CipherKey<GatewayEncryptionAlgorithm>,
mac_key: MacKey,
Expand Down
7 changes: 4 additions & 3 deletions gateway/gateway-requests/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ impl fmt::Display for GatewayRequestsError {
}
}

impl std::error::Error for GatewayRequestsError {}

impl From<NymNodeRoutingAddressError> for GatewayRequestsError {
fn from(_: NymNodeRoutingAddressError) -> Self {
GatewayRequestsError::IncorrectlyEncodedAddress
Expand Down Expand Up @@ -191,9 +193,8 @@ impl TryInto<String> for ClientControlRequest {
pub enum ServerResponse {
Authenticate { status: bool },
Register { status: bool },
// Maybe we could return the remaining bandwidth?
Bandwidth { status: bool },
Send { status: bool },
Bandwidth { available_total: i64 },
Send { remaining_bandwidth: i64 },
Error { message: String },
}

Expand Down
25 changes: 25 additions & 0 deletions gateway/migrations/20210921120000_create_initial_tables.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2021 - Nym Technologies SA <[email protected]>
* SPDX-License-Identifier: Apache-2.0
*/

CREATE TABLE shared_keys
(
client_address_bs58 TEXT NOT NULL PRIMARY KEY UNIQUE,
derived_aes128_ctr_blake3_hmac_keys_bs58 TEXT NOT NULL
);

CREATE TABLE message_store
(
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
client_address_bs58 TEXT NOT NULL,
content BLOB NOT NULL
);

CREATE TABLE available_bandwidth
(
client_address_bs58 TEXT NOT NULL PRIMARY KEY UNIQUE,
available INTEGER NOT NULL
);

CREATE INDEX `message_store_index` ON `message_store` (`client_address_bs58`, `content`);
16 changes: 5 additions & 11 deletions gateway/src/commands/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,9 @@ pub fn command_args<'a, 'b>() -> clap::App<'a, 'b> {
.takes_value(true),
)
.arg(
Arg::with_name(INBOXES_ARG_NAME)
.long(INBOXES_ARG_NAME)
.help("Directory with inboxes where all packets for the clients are stored")
.takes_value(true)
)
.arg(
Arg::with_name(CLIENTS_LEDGER_ARG_NAME)
.long(CLIENTS_LEDGER_ARG_NAME)
.help("Ledger file containing registered clients")
Arg::with_name(DATASTORE_PATH)
.long(DATASTORE_PATH)
.help("Path to sqlite database containing all gateway persistent data")
.takes_value(true)
)
.arg(
Expand Down Expand Up @@ -115,7 +109,7 @@ fn show_bonding_info(config: &Config) {
);
}

pub fn execute(matches: &ArgMatches) {
pub async fn execute(matches: ArgMatches<'static>) {
let id = matches.value_of(ID_ARG_NAME).unwrap();
println!("Initialising gateway {}...", id);

Expand All @@ -128,7 +122,7 @@ pub fn execute(matches: &ArgMatches) {

let mut config = Config::new(id);

config = override_config(config, matches);
config = override_config(config, &matches);

// if gateway was already initialised, don't generate new keys
if !already_init {
Expand Down
11 changes: 3 additions & 8 deletions gateway/src/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ pub(crate) const MIX_PORT_ARG_NAME: &str = "mix-port";
pub(crate) const CLIENTS_PORT_ARG_NAME: &str = "clients-port";
pub(crate) const VALIDATORS_ARG_NAME: &str = "validators";
pub(crate) const ANNOUNCE_HOST_ARG_NAME: &str = "announce-host";
pub(crate) const INBOXES_ARG_NAME: &str = "inboxes";
pub(crate) const CLIENTS_LEDGER_ARG_NAME: &str = "clients-ledger";
pub(crate) const DATASTORE_PATH: &str = "datastore";

fn parse_validators(raw: &str) -> Vec<Url> {
raw.split(',')
Expand Down Expand Up @@ -69,12 +68,8 @@ pub(crate) fn override_config(mut config: Config, matches: &ArgMatches) -> Confi
config = config.with_custom_validator_apis(parse_validators(raw_validators));
}

if let Some(inboxes_dir) = matches.value_of(INBOXES_ARG_NAME) {
config = config.with_custom_clients_inboxes(inboxes_dir);
}

if let Some(clients_ledger) = matches.value_of(CLIENTS_LEDGER_ARG_NAME) {
config = config.with_custom_clients_ledger(clients_ledger);
if let Some(datastore_path) = matches.value_of(DATASTORE_PATH) {
config = config.with_custom_persistent_store(datastore_path);
}

config
Expand Down
Loading