Skip to content

Commit edd5cd5

Browse files
committed
Move persist into async part of the sweeper
Prepares for making the kv store async. Otherwise it might be necessary to use block_on in the sweeper. For block_on, a runtime would be needed.
1 parent 27b7347 commit edd5cd5

File tree

1 file changed

+38
-27
lines changed

1 file changed

+38
-27
lines changed

lightning/src/util/sweep.rs

Lines changed: 38 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -382,8 +382,10 @@ where
382382
output_spender: O, change_destination_source: D, kv_store: K, logger: L,
383383
) -> Self {
384384
let outputs = Vec::new();
385-
let sweeper_state =
386-
Mutex::new(SweeperState { persistent: PersistentSweeperState { outputs, best_block } });
385+
let sweeper_state = Mutex::new(SweeperState {
386+
persistent: PersistentSweeperState { outputs, best_block },
387+
dirty: false,
388+
});
387389
Self {
388390
sweeper_state,
389391
pending_sweep: AtomicBool::new(false),
@@ -445,7 +447,7 @@ where
445447

446448
state_lock.persistent.outputs.push(output_info);
447449
}
448-
self.persist_state(&*state_lock).await.map_err(|e| {
450+
self.flush_state(&mut state_lock).await.map_err(|e| {
449451
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
450452
})
451453
}
@@ -473,7 +475,19 @@ where
473475
return Ok(());
474476
}
475477

476-
let result = self.regenerate_and_broadcast_spend_if_necessary_internal().await;
478+
let result = {
479+
self.regenerate_and_broadcast_spend_if_necessary_internal().await?;
480+
481+
// If there is still dirty state, we need to persist it.
482+
let mut sweeper_state = self.sweeper_state.lock().unwrap();
483+
if sweeper_state.dirty {
484+
self.flush_state(&mut sweeper_state).await.map_err(|e| {
485+
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
486+
})
487+
} else {
488+
Ok(())
489+
}
490+
};
477491

478492
// Release the pending sweep flag again, regardless of result.
479493
self.pending_sweep.store(false, Ordering::Release);
@@ -567,7 +581,7 @@ where
567581
output_info.status.broadcast(cur_hash, cur_height, spending_tx.clone());
568582
}
569583

570-
self.persist_state(&sweeper_state).await.map_err(|e| {
584+
self.flush_state(&mut sweeper_state).await.map_err(|e| {
571585
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
572586
})?;
573587

@@ -595,9 +609,12 @@ where
595609
}
596610
true
597611
});
612+
613+
sweeper_state.dirty = true;
598614
}
599615

600-
async fn persist_state(&self, sweeper_state: &SweeperState) -> Result<(), io::Error> {
616+
/// Flushes the current state to the persistence layer and marks the state as clean.
617+
async fn flush_state(&self, sweeper_state: &mut SweeperState) -> Result<(), io::Error> {
601618
self.kv_store
602619
.write(
603620
OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
@@ -617,6 +634,9 @@ where
617634
);
618635
e
619636
})
637+
.map(|_| {
638+
sweeper_state.dirty = false;
639+
})
620640
}
621641

622642
fn spend_outputs(
@@ -649,13 +669,17 @@ where
649669
}
650670
}
651671
}
672+
673+
sweeper_state.dirty = true;
652674
}
653675

654676
fn best_block_updated_internal(
655677
&self, sweeper_state: &mut SweeperState, header: &Header, height: u32,
656678
) {
657679
sweeper_state.persistent.best_block = BestBlock::new(header.block_hash(), height);
658680
self.prune_confirmed_outputs(sweeper_state);
681+
682+
sweeper_state.dirty = true;
659683
}
660684
}
661685

@@ -679,12 +703,8 @@ where
679703
assert_eq!(state_lock.persistent.best_block.height, height - 1,
680704
"Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");
681705

682-
self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
683-
self.best_block_updated_internal(&mut *state_lock, header, height);
684-
685-
// let _ = self.persist_state(&*state_lock).map_err(|e| {
686-
// log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
687-
// });
706+
self.transactions_confirmed_internal(&mut state_lock, header, txdata, height);
707+
self.best_block_updated_internal(&mut state_lock, header, height);
688708
}
689709

690710
fn block_disconnected(&self, header: &Header, height: u32) {
@@ -706,9 +726,7 @@ where
706726
}
707727
}
708728

709-
// self.persist_state(&*state_lock).unwrap_or_else(|e| {
710-
// log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
711-
// });
729+
state_lock.dirty = true;
712730
}
713731
}
714732

@@ -728,9 +746,6 @@ where
728746
) {
729747
let mut state_lock = self.sweeper_state.lock().unwrap();
730748
self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
731-
// self.persist_state(&*state_lock).unwrap_or_else(|e| {
732-
// log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
733-
// });
734749
}
735750

736751
fn transaction_unconfirmed(&self, txid: &Txid) {
@@ -753,18 +768,13 @@ where
753768
.filter(|o| o.status.confirmation_height() >= Some(unconf_height))
754769
.for_each(|o| o.status.unconfirmed());
755770

756-
// self.persist_state(&*state_lock).unwrap_or_else(|e| {
757-
// log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
758-
// });
771+
state_lock.dirty = true;
759772
}
760773
}
761774

762775
fn best_block_updated(&self, header: &Header, height: u32) {
763776
let mut state_lock = self.sweeper_state.lock().unwrap();
764-
self.best_block_updated_internal(&mut *state_lock, header, height);
765-
// let _ = self.persist_state(&*state_lock).map_err(|e| {
766-
// log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
767-
// });
777+
self.best_block_updated_internal(&mut state_lock, header, height);
768778
}
769779

770780
fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
@@ -793,6 +803,7 @@ where
793803
#[derive(Debug)]
794804
struct SweeperState {
795805
persistent: PersistentSweeperState,
806+
dirty: bool,
796807
}
797808

798809
#[derive(Debug, Clone)]
@@ -857,7 +868,7 @@ where
857868
}
858869
}
859870

860-
let sweeper_state = Mutex::new(SweeperState { persistent: state });
871+
let sweeper_state = Mutex::new(SweeperState { persistent: state, dirty: false });
861872
Ok(Self {
862873
sweeper_state,
863874
pending_sweep: AtomicBool::new(false),
@@ -906,7 +917,7 @@ where
906917
}
907918
}
908919

909-
let sweeper_state = Mutex::new(SweeperState { persistent: state });
920+
let sweeper_state = Mutex::new(SweeperState { persistent: state, dirty: false });
910921
Ok((
911922
best_block,
912923
OutputSweeper {

0 commit comments

Comments
 (0)