Skip to content

Commit df2fce6

Browse files
committed
Cache mempool entries and transactions to avoid retransmissions
Previously, we would retransmit entry and transaction data whenever polling for mempool data. Here we introduce simple caches for both, so most data is only retrieved in bulk on the first iteration after startup.
1 parent 11008ee commit df2fce6

File tree

1 file changed

+37
-9
lines changed

1 file changed

+37
-9
lines changed

src/chain/bitcoind_rpc.rs

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ use std::sync::Arc;
3030
pub struct BitcoindRpcClient {
3131
rpc_client: Arc<RpcClient>,
3232
latest_mempool_timestamp: AtomicU64,
33+
mempool_entries_cache: tokio::sync::Mutex<HashMap<Txid, MempoolEntry>>,
34+
mempool_txs_cache: tokio::sync::Mutex<HashMap<Txid, (Transaction, u64)>>,
3335
}
3436

3537
impl BitcoindRpcClient {
@@ -42,7 +44,9 @@ impl BitcoindRpcClient {
4244

4345
let latest_mempool_timestamp = AtomicU64::new(0);
4446

45-
Self { rpc_client, latest_mempool_timestamp }
47+
let mempool_entries_cache = tokio::sync::Mutex::new(HashMap::new());
48+
let mempool_txs_cache = tokio::sync::Mutex::new(HashMap::new());
49+
Self { rpc_client, latest_mempool_timestamp, mempool_entries_cache, mempool_txs_cache }
4650
}
4751

4852
pub(crate) fn rpc_client(&self) -> Arc<RpcClient> {
@@ -160,16 +164,30 @@ impl BitcoindRpcClient {
160164
}
161165
}
162166

163-
pub(crate) async fn get_mempool_entries(&self) -> std::io::Result<Vec<MempoolEntry>> {
167+
pub(crate) async fn update_mempool_entries_cache(&self) -> std::io::Result<()> {
164168
let mempool_txids = self.get_raw_mempool().await?;
165-
let mut mempool_entries = Vec::with_capacity(mempool_txids.len());
169+
170+
let mut mempool_entries_cache = self.mempool_entries_cache.lock().await;
171+
mempool_entries_cache.retain(|txid, _| mempool_txids.contains(txid));
172+
173+
if let Some(difference) = mempool_txids.len().checked_sub(mempool_entries_cache.capacity())
174+
{
175+
mempool_entries_cache.reserve(difference)
176+
}
177+
166178
for txid in mempool_txids {
167-
// Push any entries that haven't been dropped since `getrawmempool`
179+
if mempool_entries_cache.contains_key(&txid) {
180+
continue;
181+
}
182+
168183
if let Some(entry) = self.get_mempool_entry(txid).await? {
169-
mempool_entries.push(entry);
184+
mempool_entries_cache.insert(txid, entry.clone());
170185
}
171186
}
172-
Ok(mempool_entries)
187+
188+
mempool_entries_cache.shrink_to_fit();
189+
190+
Ok(())
173191
}
174192

175193
/// Get mempool transactions, alongside their first-seen unix timestamps.
@@ -183,10 +201,14 @@ impl BitcoindRpcClient {
183201
let prev_mempool_time = self.latest_mempool_timestamp.load(Ordering::Relaxed);
184202
let mut latest_time = prev_mempool_time;
185203

186-
let mempool_entries = self.get_mempool_entries().await?;
187-
let mut txs_to_emit = Vec::new();
204+
self.update_mempool_entries_cache().await?;
205+
206+
let mempool_entries_cache = self.mempool_entries_cache.lock().await;
207+
let mut mempool_txs_cache = self.mempool_txs_cache.lock().await;
208+
mempool_txs_cache.retain(|txid, _| mempool_entries_cache.contains_key(txid));
188209

189-
for entry in mempool_entries {
210+
let mut txs_to_emit = Vec::with_capacity(mempool_entries_cache.len());
211+
for (txid, entry) in mempool_entries_cache.iter() {
190212
if entry.time > latest_time {
191213
latest_time = entry.time;
192214
}
@@ -202,8 +224,14 @@ impl BitcoindRpcClient {
202224
continue;
203225
}
204226

227+
if let Some((cached_tx, cached_time)) = mempool_txs_cache.get(txid) {
228+
txs_to_emit.push((cached_tx.clone(), *cached_time));
229+
continue;
230+
}
231+
205232
match self.get_raw_transaction(&entry.txid).await {
206233
Ok(Some(tx)) => {
234+
mempool_txs_cache.insert(entry.txid, (tx.clone(), entry.time));
207235
txs_to_emit.push((tx, entry.time));
208236
},
209237
Ok(None) => {

0 commit comments

Comments
 (0)