Skip to content

bitcoind RPC: Make mempool syncing more efficient #465

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 77 additions & 12 deletions src/chain/bitcoind_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ use std::sync::Arc;
pub struct BitcoindRpcClient {
rpc_client: Arc<RpcClient>,
latest_mempool_timestamp: AtomicU64,
mempool_entries_cache: tokio::sync::Mutex<HashMap<Txid, MempoolEntry>>,
mempool_txs_cache: tokio::sync::Mutex<HashMap<Txid, (Transaction, u64)>>,
}

impl BitcoindRpcClient {
Expand All @@ -42,7 +44,9 @@ impl BitcoindRpcClient {

let latest_mempool_timestamp = AtomicU64::new(0);

Self { rpc_client, latest_mempool_timestamp }
let mempool_entries_cache = tokio::sync::Mutex::new(HashMap::new());
let mempool_txs_cache = tokio::sync::Mutex::new(HashMap::new());
Self { rpc_client, latest_mempool_timestamp, mempool_entries_cache, mempool_txs_cache }
}

pub(crate) fn rpc_client(&self) -> Arc<RpcClient> {
Expand Down Expand Up @@ -122,23 +126,68 @@ impl BitcoindRpcClient {
.map(|resp| resp.0)
}

pub(crate) async fn get_mempool_entry(&self, txid: Txid) -> std::io::Result<MempoolEntry> {
pub(crate) async fn get_mempool_entry(
&self, txid: Txid,
) -> std::io::Result<Option<MempoolEntry>> {
let txid_hex = bitcoin::consensus::encode::serialize_hex(&txid);
let txid_json = serde_json::json!(txid_hex);
self.rpc_client
match self
.rpc_client
.call_method::<GetMempoolEntryResponse>("getmempoolentry", &[txid_json])
.await
.map(|resp| MempoolEntry { txid, height: resp.height, time: resp.time })
{
Ok(resp) => Ok(Some(MempoolEntry { txid, height: resp.height, time: resp.time })),
Err(e) => match e.into_inner() {
Some(inner) => {
let rpc_error_res: Result<Box<RpcError>, _> = inner.downcast();

match rpc_error_res {
Ok(rpc_error) => {
// Check if it's the 'not found' error code.
if rpc_error.code == -5 {
Ok(None)
} else {
Err(std::io::Error::new(std::io::ErrorKind::Other, rpc_error))
}
},
Err(_) => Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Failed to process getmempoolentry response",
)),
}
},
None => Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Failed to process getmempoolentry response",
)),
},
}
}

pub(crate) async fn get_mempool_entries(&self) -> std::io::Result<Vec<MempoolEntry>> {
pub(crate) async fn update_mempool_entries_cache(&self) -> std::io::Result<()> {
let mempool_txids = self.get_raw_mempool().await?;
let mut mempool_entries = Vec::with_capacity(mempool_txids.len());

let mut mempool_entries_cache = self.mempool_entries_cache.lock().await;
mempool_entries_cache.retain(|txid, _| mempool_txids.contains(txid));

if let Some(difference) = mempool_txids.len().checked_sub(mempool_entries_cache.capacity())
{
mempool_entries_cache.reserve(difference)
}

for txid in mempool_txids {
let entry = self.get_mempool_entry(txid).await?;
mempool_entries.push(entry);
if mempool_entries_cache.contains_key(&txid) {
continue;
}

if let Some(entry) = self.get_mempool_entry(txid).await? {
mempool_entries_cache.insert(txid, entry.clone());
}
}
Ok(mempool_entries)

mempool_entries_cache.shrink_to_fit();

Ok(())
}

/// Get mempool transactions, alongside their first-seen unix timestamps.
Expand All @@ -152,10 +201,20 @@ impl BitcoindRpcClient {
let prev_mempool_time = self.latest_mempool_timestamp.load(Ordering::Relaxed);
let mut latest_time = prev_mempool_time;

let mempool_entries = self.get_mempool_entries().await?;
let mut txs_to_emit = Vec::new();
self.update_mempool_entries_cache().await?;

let mempool_entries_cache = self.mempool_entries_cache.lock().await;
let mut mempool_txs_cache = self.mempool_txs_cache.lock().await;
mempool_txs_cache.retain(|txid, _| mempool_entries_cache.contains_key(txid));

for entry in mempool_entries {
if let Some(difference) =
mempool_entries_cache.len().checked_sub(mempool_txs_cache.capacity())
{
mempool_txs_cache.reserve(difference)
}

let mut txs_to_emit = Vec::with_capacity(mempool_entries_cache.len());
for (txid, entry) in mempool_entries_cache.iter() {
if entry.time > latest_time {
latest_time = entry.time;
}
Expand All @@ -171,8 +230,14 @@ impl BitcoindRpcClient {
continue;
}

if let Some((cached_tx, cached_time)) = mempool_txs_cache.get(txid) {
txs_to_emit.push((cached_tx.clone(), *cached_time));
continue;
}

match self.get_raw_transaction(&entry.txid).await {
Ok(Some(tx)) => {
mempool_txs_cache.insert(entry.txid, (tx.clone(), entry.time));
txs_to_emit.push((tx, entry.time));
},
Ok(None) => {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should the case of having a txid among the mempool entries but not being able to find its full tx hex be handled somehow?

Copy link
Collaborator Author

@tnull tnull Feb 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, we handle it by skipping :)

Note that we used to log/error out in some of these races, but its exactly the point to get rid of that behavior in this PR. In fact, turns out you'll regularly run into these races between getrawmempool/getmempoolentry and getmempoolentry/getrawtransaction, since entries are dropped often from the mempool. So we can just skip processing them and will re-detect them if they would reappear in the mempool.

Expand Down
31 changes: 27 additions & 4 deletions src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,14 @@ impl ChainSource {
));
}

log_info!(
logger,
"Starting initial synchronization of chain listeners. This might take a while..",
);

loop {
let mut locked_header_cache = header_cache.lock().await;
let now = SystemTime::now();
match synchronize_listeners(
bitcoind_rpc_client.as_ref(),
config.network,
Expand All @@ -329,6 +335,11 @@ impl ChainSource {
{
Ok(chain_tip) => {
{
log_info!(
logger,
"Finished synchronizing listeners in {}ms",
now.elapsed().unwrap().as_millis()
);
*latest_chain_tip.write().unwrap() = Some(chain_tip);
let unix_time_secs_opt = SystemTime::now()
.duration_since(UNIX_EPOCH)
Expand Down Expand Up @@ -374,6 +385,8 @@ impl ChainSource {
fee_rate_update_interval
.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

log_info!(logger, "Starting continuous polling for chain updates.");

// Start the polling loop.
loop {
tokio::select! {
Expand Down Expand Up @@ -692,13 +705,15 @@ impl ChainSource {
&mut *locked_header_cache,
&chain_listener,
);
let mut chain_polling_interval =
tokio::time::interval(Duration::from_secs(CHAIN_POLLING_INTERVAL_SECS));
chain_polling_interval
.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

let now = SystemTime::now();
match spv_client.poll_best_tip().await {
Ok((ChainTip::Better(tip), true)) => {
log_trace!(
logger,
"Finished polling best tip in {}ms",
now.elapsed().unwrap().as_millis()
);
*latest_chain_tip.write().unwrap() = Some(tip);
},
Ok(_) => {},
Expand All @@ -711,11 +726,19 @@ impl ChainSource {
}

let cur_height = channel_manager.current_best_block().height;

let now = SystemTime::now();
match bitcoind_rpc_client
.get_mempool_transactions_and_timestamp_at_height(cur_height)
.await
{
Ok(unconfirmed_txs) => {
log_trace!(
logger,
"Finished polling mempool of size {} in {}ms",
unconfirmed_txs.len(),
now.elapsed().unwrap().as_millis()
);
let _ = onchain_wallet.apply_unconfirmed_txs(unconfirmed_txs);
},
Err(e) => {
Expand Down
Loading