Skip to content

Feature/persistent gateway storage #784

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 39 commits into from
Sep 30, 2021
Merged
Changes from 1 commit
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
91ddf6b
Sqlx struct stub
jstuczyn Sep 21, 2021
56ca1b9
Initial schema
jstuczyn Sep 21, 2021
27e5ac0
Initial error enum
jstuczyn Sep 21, 2021
403bcfb
Managed for persisted shared keys
jstuczyn Sep 21, 2021
4076c5d
Initial inbox manager
jstuczyn Sep 21, 2021
b994d8b
Comments
jstuczyn Sep 21, 2021
f53d084
Using new database in clients handler
jstuczyn Sep 21, 2021
1fe6c08
Extending gateway storage API
jstuczyn Sep 21, 2021
e1c0247
tokio::main + placeholder values
jstuczyn Sep 21, 2021
8e896d5
Removed old client store
jstuczyn Sep 22, 2021
9fce084
Simplified logic of async packet processing
jstuczyn Sep 22, 2021
650b159
Renamed table + not null restriction
jstuczyn Sep 22, 2021
11997ca
BandwidthManager
jstuczyn Sep 22, 2021
34329d7
Removed sled dependency
jstuczyn Sep 22, 2021
f871dc6
Using centralised storage for bandwidth
jstuczyn Sep 22, 2021
d697e4c
Dead code removal
jstuczyn Sep 22, 2021
f36e305
WIP connection_handler split and simplification
jstuczyn Sep 22, 2021
02967b0
Further more explicit clients handler split
jstuczyn Sep 23, 2021
7a87616
Minor cleanup
jstuczyn Sep 23, 2021
48548c3
Temporary store for active client handles
jstuczyn Sep 23, 2021
c9b7b24
Fixed error types
jstuczyn Sep 23, 2021
de1ae7c
Error trait on iv and encrypted address
jstuczyn Sep 23, 2021
c2f462e
Authentication and registration moved to the handler
jstuczyn Sep 23, 2021
48ef3e3
Removal of clients handler
jstuczyn Sep 23, 2021
00c9d20
Further logic simplification + returned explicit bandwidth values
jstuczyn Sep 23, 2021
ce86348
Further cleanup and comments
jstuczyn Sep 23, 2021
61f11b0
Updated config with relevant changes
jstuczyn Sep 23, 2021
54dd567
Basic bandwidth tracking in client
jstuczyn Sep 23, 2021
6974e11
FreshHandle doc comments + fixed stagger issue
jstuczyn Sep 23, 2021
859a690
Removed side-effects from .map
jstuczyn Sep 23, 2021
07ebd97
More doc comments
jstuczyn Sep 23, 2021
ef63ab3
Database migration on build
jstuczyn Sep 23, 2021
5f75b88
Increased default claimed bandwidth
jstuczyn Sep 24, 2021
24b5a16
Renaming
jstuczyn Sep 24, 2021
a0faed3
Fixed client determining available bandwidth
jstuczyn Sep 24, 2021
f44024f
Removed dead sql table that might be used in the future
jstuczyn Sep 24, 2021
3eceb34
Windows workaround
jstuczyn Sep 24, 2021
b932635
Comment
jstuczyn Sep 29, 2021
0b5212e
Return error rather than cap credential
jstuczyn Sep 29, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 131 additions & 76 deletions gateway/src/node/client_handling/clients_handler.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright 2020 - Nym Technologies SA <[email protected]>
// SPDX-License-Identifier: Apache-2.0

use crate::node::storage::error::StorageError;
use crate::node::storage::GatewayStorage;
use crate::node::{
client_handling::websocket::message_receiver::MixMessageSender,
storage::{inboxes::ClientStorage, ClientLedger},
Expand Down Expand Up @@ -54,16 +56,14 @@ pub(crate) enum ClientsHandlerResponse {

pub(crate) struct ClientsHandler {
open_connections: HashMap<DestinationAddressBytes, MixMessageSender>,
clients_ledger: ClientLedger,
clients_inbox_storage: ClientStorage,
storage: GatewayStorage,
}

impl ClientsHandler {
pub(crate) fn new(clients_ledger: ClientLedger, clients_inbox_storage: ClientStorage) -> Self {
pub(crate) fn new(storage: GatewayStorage) -> Self {
ClientsHandler {
open_connections: HashMap::new(),
clients_ledger,
clients_inbox_storage,
storage,
}
}

Expand Down Expand Up @@ -93,51 +93,68 @@ impl ClientsHandler {
// i.e. be stored on the disk rather than pushed to the client, reason for this is as follows:
// now we push all stored messages from client's inbox to its websocket connection
// however, say, at the same time there's new message to the client - it gets stored on the disk!
// And only after this method exists, mix receivers will become aware of the client
// And only after this method exits, mix receivers will become aware of the client
// connection going online and being able to forward traffic there.
//
// possible solution: spawn a future to empty inbox in X seconds rather than immediately
// JS: I will most likely do that (with including entries to config, etc.) once the
// basic version is up and running as not to waste time on it now
//
// possible solution2 after a year: just have an atomic flag to indicate stuff should cache messages for few seconds

// NOTE: THIS IGNORES MESSAGE RETRIEVAL LIMIT AND TAKES EVERYTHING!
let all_stored_messages = match self
.clients_inbox_storage
.retrieve_all_client_messages(client_address)
.await
{
Ok(msgs) => msgs,
Err(e) => {
error!(
"failed to retrieve client messages. {:?} inbox might be corrupted now - {:?}",
client_address.as_base58_string(),
e
// TODO: SECURITY (kinda):
// We should stagger reading the messages in a different way, i.e. we read some of them,
// send them all the way back to the client and then read next batch. Otherwise we risk
// being vulnerable to trivial attacks causing gateway crashes.
let mut start_next_after = None;
loop {
let (messages, new_start_next_after) = match self
.storage
.retrieve_messages(client_address, start_next_after)
.await
{
Err(err) => {
error!(
"failed to retrieve client messages - {}. There might be some database corruption.",
err
);
return;
}
};
return;
}
Ok(stored) => stored,
};

let (messages, paths): (Vec<_>, Vec<_>) = all_stored_messages
.into_iter()
.map(|c| c.into_tuple())
.unzip();
let (messages, ids) = messages
.into_iter()
.map(|msg| (msg.content, msg.id))
.unzip();

if comm_channel.unbounded_send(messages).is_err() {
error!("Somehow we failed to stored messages to a fresh client channel - there seem to be a weird bug present!");
} else {
// but if all went well, we can now delete it
if let Err(e) = self.clients_inbox_storage.delete_files(paths).await {
error!(
"Failed to remove client ({:?}) files - {:?}",
client_address.as_base58_string(),
e
);
if comm_channel.unbounded_send(messages).is_err() {
error!("Somehow we failed to stored messages to a fresh client channel - there seem to be a weird bug present!");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Failed to store

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code no longer exists anyway : )

} else {
// after sending the messages, remove them from the storage
// TODO: this kinda relates to the previously mentioned idea of different staggering method
// because technically we don't know if the client received those messages. We only pushed
// them upon the channel that will eventually be read and then sent to the socket
// so technically we can lose packets here
if let Err(err) = self.storage.remove_messages(ids).await {
error!(
"failed to remove old client messages - {}. There might be some database corruption.",
err
);
}
}

// no more messages to grab
if new_start_next_after.is_none() {
break;
} else {
// finally, everything was fine - we retrieved everything, we deleted everything,
// we assume we can now safely delegate client message pushing
self.open_connections.insert(client_address, comm_channel);
start_next_after = new_start_next_after
}
}

// finally, everything was fine - we retrieved everything, we deleted everything,
// we assume we can now safely delegate client message pushing
self.open_connections.insert(client_address, comm_channel);
}

async fn handle_register_request(
Expand All @@ -148,7 +165,7 @@ impl ClientsHandler {
res_channel: ClientsHandlerResponseSender,
) {
debug!(
"Processing register new client request: {:?}",
"Processing register new client request: {}",
address.as_base58_string()
);

Expand All @@ -160,21 +177,13 @@ impl ClientsHandler {
return;
}

if self
.clients_ledger
.insert_shared_key(derived_shared_key, address)
.unwrap()
.is_some()
if let Err(err) = self
.storage
.insert_shared_keys(address, derived_shared_key)
.await
{
info!(
"Client {:?} was already registered before!",
address.as_base58_string()
)
} else if let Err(e) = self.clients_inbox_storage.create_storage_dir(address).await {
error!("We failed to create inbox directory for the client -{:?}\nReverting stored shared key...", e);
// we must revert our changes if this operation failed
self.clients_ledger.remove_shared_key(&address).unwrap();
self.send_error_response("failed to complete issuing shared key", res_channel);
error!("We failed to store client's shared key... - {}", err);
self.send_error_response("Internal gateway storage error", res_channel);
return;
}

Expand All @@ -189,6 +198,57 @@ impl ClientsHandler {
}
}

/// Checks whether the stored shared keys match the received data, i.e. whether the upon decryption
/// the provided encrypted address matches the expected unencrypted address.
///
/// Returns the result of the check alongside the retrieved shared key,
///
/// # Arguments
///
/// * `client_address`: address of the client.
/// * `encrypted_address`: encrypted address of the client, presumably encrypted using the shared keys.
/// * `iv`: iv created for this particular encryption.
async fn verify_stored_shared_key(
&self,
client_address: DestinationAddressBytes,
encrypted_address: EncryptedAddressBytes,
iv: IV,
) -> Result<(bool, Option<SharedKeys>), StorageError> {
let shared_keys = self.storage.get_shared_keys(client_address).await?;

if let Some(shared_keys) = shared_keys {
// the unwrap here is fine as we only ever construct persisted shared keys ourselves when inserting
// data to the storage. The only way it could fail is if we somehow changed implementation without
// performing proper migration
let keys = SharedKeys::try_from_base58_string(
shared_keys.derived_aes128_ctr_blake3_hmac_keys_bs58,
)
.unwrap();
// TODO: SECURITY:
// this is actually what we have been doing in the past, however,
// after looking deeper into implementation it seems that only checks the encryption
// key part of the shared keys. the MAC key might still be wrong
// (though I don't see how could this happen unless client messed with himself
// and I don't think it could lead to any attacks, but somebody smarter should take a look)
Ok((
encrypted_address.verify(&client_address, &keys, &iv),
Some(keys),
))
} else {
Ok((false, None))
}
}

/// A tiny helper function to log any errors that shouldn't really have occurred when sending responses;
fn send_handler_response(
res_channel: ClientsHandlerResponseSender,
response: ClientsHandlerResponse,
) {
if res_channel.send(response).is_err() {
error!("Somehow we failed to send response back to websocket handler - there seem to be a weird bug present!");
}
}

async fn handle_authenticate_request(
&mut self,
address: DestinationAddressBytes,
Expand All @@ -208,31 +268,26 @@ impl ClientsHandler {
return;
}

if self
.clients_ledger
.verify_shared_key(&address, &encrypted_address, &iv)
.unwrap()
match self
.verify_stored_shared_key(address, encrypted_address, iv)
.await
{
// The first unwrap is due to possible db read errors, but I'm not entirely sure when could
// the second one happen.
let shared_key = self
.clients_ledger
.get_shared_key(&address)
.unwrap()
.unwrap();
self.push_stored_messages_to_client_and_save_channel(address, comm_channel)
.await;
if res_channel
.send(ClientsHandlerResponse::Authenticate(Some(shared_key)))
.is_err()
{
error!("Somehow we failed to send response back to websocket handler - there seem to be a weird bug present!");
Err(err) => {
error!("We failed to read client's stored shared key... - {}", err);
self.send_error_response("Internal gateway storage error", res_channel);
return;
}
Ok((false, _)) => {
Self::send_handler_response(res_channel, ClientsHandlerResponse::Authenticate(None))
}
Ok((true, shared_keys)) => {
self.push_stored_messages_to_client_and_save_channel(address, comm_channel)
.await;
Self::send_handler_response(
res_channel,
ClientsHandlerResponse::Authenticate(shared_keys),
)
}
} else if res_channel
.send(ClientsHandlerResponse::Authenticate(None))
.is_err()
{
error!("Somehow we failed to send response back to websocket handler - there seem to be a weird bug present!");
}
}

Expand Down