Skip to content

Feature/persistent gateway storage #784

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 39 commits into from
Sep 30, 2021
Merged
Changes from 1 commit
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
91ddf6b
Sqlx struct stub
jstuczyn Sep 21, 2021
56ca1b9
Initial schema
jstuczyn Sep 21, 2021
27e5ac0
Initial error enum
jstuczyn Sep 21, 2021
403bcfb
Managed for persisted shared keys
jstuczyn Sep 21, 2021
4076c5d
Initial inbox manager
jstuczyn Sep 21, 2021
b994d8b
Comments
jstuczyn Sep 21, 2021
f53d084
Using new database in clients handler
jstuczyn Sep 21, 2021
1fe6c08
Extending gateway storage API
jstuczyn Sep 21, 2021
e1c0247
tokio::main + placeholder values
jstuczyn Sep 21, 2021
8e896d5
Removed old client store
jstuczyn Sep 22, 2021
9fce084
Simplified logic of async packet processing
jstuczyn Sep 22, 2021
650b159
Renamed table + not null restriction
jstuczyn Sep 22, 2021
11997ca
BandwidthManager
jstuczyn Sep 22, 2021
34329d7
Removed sled dependency
jstuczyn Sep 22, 2021
f871dc6
Using centralised storage for bandwidth
jstuczyn Sep 22, 2021
d697e4c
Dead code removal
jstuczyn Sep 22, 2021
f36e305
WIP connection_handler split and simplification
jstuczyn Sep 22, 2021
02967b0
Further more explicit clients handler split
jstuczyn Sep 23, 2021
7a87616
Minor cleanup
jstuczyn Sep 23, 2021
48548c3
Temporary store for active client handles
jstuczyn Sep 23, 2021
c9b7b24
Fixed error types
jstuczyn Sep 23, 2021
de1ae7c
Error trait on iv and encrypted address
jstuczyn Sep 23, 2021
c2f462e
Authentication and registration moved to the handler
jstuczyn Sep 23, 2021
48ef3e3
Removal of clients handler
jstuczyn Sep 23, 2021
00c9d20
Further logic simplification + returned explicit bandwidth values
jstuczyn Sep 23, 2021
ce86348
Further cleanup and comments
jstuczyn Sep 23, 2021
61f11b0
Updated config with relevant changes
jstuczyn Sep 23, 2021
54dd567
Basic bandwidth tracking in client
jstuczyn Sep 23, 2021
6974e11
FreshHandle doc comments + fixed stagger issue
jstuczyn Sep 23, 2021
859a690
Removed side-effects from .map
jstuczyn Sep 23, 2021
07ebd97
More doc comments
jstuczyn Sep 23, 2021
ef63ab3
Database migration on build
jstuczyn Sep 23, 2021
5f75b88
Increased default claimed bandwidth
jstuczyn Sep 24, 2021
24b5a16
Renaming
jstuczyn Sep 24, 2021
a0faed3
Fixed client determining available bandwidth
jstuczyn Sep 24, 2021
f44024f
Removed dead sql table that might be used in the future
jstuczyn Sep 24, 2021
3eceb34
Windows workaround
jstuczyn Sep 24, 2021
b932635
Comment
jstuczyn Sep 29, 2021
0b5212e
Return error rather than cap credential
jstuczyn Sep 29, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions gateway/src/node/storage/inboxes.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2020 - Nym Technologies SA <[email protected]>
// SPDX-License-Identifier: Apache-2.0

use crate::node::storage::models::StoredMessage;
use futures::lock::Mutex;
use futures::StreamExt;
use gateway_requests::DUMMY_MESSAGE_CONTENT;
Expand All @@ -15,6 +16,123 @@ use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use tokio_stream::wrappers::ReadDirStream;

#[derive(Clone)]
pub(crate) struct InboxManager {
connection_pool: sqlx::SqlitePool,
/// Maximum number of messages that can be obtained from the database per operation.
/// It is used to prevent memory overflows in the case of client receiving a lot of data while
/// offline and then loading it all at once when he comes back online.
retrieval_limit: i64,
}

impl InboxManager {
/// Creates new instance of the `InboxManager` with the provided sqlite connection pool.
///
/// # Arguments
///
/// * `connection_pool`: database connection pool to use.
pub(crate) fn new(connection_pool: sqlx::SqlitePool, retrieval_limit: i64) -> Self {
InboxManager {
connection_pool,
retrieval_limit,
}
}

/// Inserts new message to the storage for an offline client for later retrieval.
///
/// # Arguments
///
/// * `client_address_bs58`: base58-encoded address of the client
/// * `content`: raw content of the message to store.
pub(crate) async fn insert_message(
&self,
client_address_bs58: String,
content: Vec<u8>,
) -> Result<(), sqlx::Error> {
sqlx::query!(
"INSERT INTO message_store(client_address_bs58, content) VALUES (?, ?)",
client_address_bs58,
content,
)
.execute(&self.connection_pool)
.await?;
Ok(())
}

/// Retrieves messages stored for the particular client specified by the provided address.
///
/// It also respects the specified retrieval limit. If there are more messages stored than allowed
/// by the limit, it returns id of the last message retrieved to indicate start of the next query.
///
/// # Arguments
///
/// * `client_address_bs58`: base58-encoded address of the client
/// * `start_after`: optional starting id of the messages to grab
///
/// returns the retrieved messages alongside optional id of the last message retrieved.
pub(crate) async fn get_messages(
&self,
client_address_bs58: &str,
start_after: Option<i64>,
) -> Result<(Vec<StoredMessage>, Option<i64>), sqlx::Error> {
// get 1 additional message to check whether there will be more to grab
// next time
let limit = self.retrieval_limit + 1;
let mut res = if let Some(start_after) = start_after {
sqlx::query_as!(
StoredMessage,
r#"
SELECT * FROM message_store
WHERE client_address_bs58 = ? AND id > ?
ORDER BY id ASC
LIMIT ?;
"#,
start_after,
client_address_bs58,
limit
)
.fetch_all(&self.connection_pool)
.await?
} else {
sqlx::query_as!(
StoredMessage,
r#"
SELECT * FROM message_store
WHERE client_address_bs58 = ?
ORDER BY id ASC
LIMIT ?;
"#,
client_address_bs58,
limit
)
.fetch_all(&self.connection_pool)
.await?
};

if res.len() > self.retrieval_limit as usize {
res.truncate(self.retrieval_limit as usize);
// assuming retrieval_limit > 0, unwrap will not fail
let start_after = res.last().unwrap().id;
Ok((res, Some(start_after)))
//
} else {
Ok((res, None))
}
}

/// Removes message with the specified id
///
/// # Arguments
///
/// * `id`: id of the message to remove
pub(crate) async fn remove_message(&self, id: i64) -> Result<(), sqlx::Error> {
sqlx::query!("DELETE FROM message_store WHERE id = ?", id)
.execute(&self.connection_pool)
.await?;
Ok(())
}
}

fn dummy_message() -> ClientFile {
ClientFile {
content: DUMMY_MESSAGE_CONTENT.to_vec(),
Expand Down