Skip to content

Commit 3d49db7

Browse files
committed
try async kvstore in sweeper
1 parent edd5cd5 commit 3d49db7

File tree

1 file changed

+44
-26
lines changed

1 file changed

+44
-26
lines changed

lightning/src/util/sweep.rs

Lines changed: 44 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -430,24 +430,35 @@ where
430430
return Ok(());
431431
}
432432

433-
let mut state_lock = self.sweeper_state.lock().unwrap();
434-
for descriptor in relevant_descriptors {
435-
let output_info = TrackedSpendableOutput {
436-
descriptor,
437-
channel_id,
438-
status: OutputSpendStatus::PendingInitialBroadcast {
439-
delayed_until_height: delay_until_height,
440-
},
441-
};
442-
443-
let mut outputs = state_lock.persistent.outputs.iter();
444-
if outputs.find(|o| o.descriptor == output_info.descriptor).is_some() {
445-
continue;
433+
let encoded;
434+
let flush_fut;
435+
{
436+
let mut state_lock = self.sweeper_state.lock().unwrap();
437+
for descriptor in relevant_descriptors {
438+
let output_info = TrackedSpendableOutput {
439+
descriptor,
440+
channel_id,
441+
status: OutputSpendStatus::PendingInitialBroadcast {
442+
delayed_until_height: delay_until_height,
443+
},
444+
};
445+
446+
let mut outputs = state_lock.persistent.outputs.iter();
447+
if outputs.find(|o| o.descriptor == output_info.descriptor).is_some() {
448+
continue;
449+
}
450+
451+
state_lock.persistent.outputs.push(output_info);
446452
}
453+
encoded = state_lock.persistent.encode();
447454

448-
state_lock.persistent.outputs.push(output_info);
455+
// Not safe, because not yet persisted...
456+
state_lock.dirty = false;
457+
458+
// Hopefully this fixates the ordering?
459+
flush_fut = self.flush_state(encoded);
449460
}
450-
self.flush_state(&mut state_lock).await.map_err(|e| {
461+
flush_fut.await.map_err(|e| {
451462
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
452463
})
453464
}
@@ -481,9 +492,12 @@ where
481492
// If there is still dirty state, we need to persist it.
482493
let mut sweeper_state = self.sweeper_state.lock().unwrap();
483494
if sweeper_state.dirty {
484-
self.flush_state(&mut sweeper_state).await.map_err(|e| {
485-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
486-
})
495+
// TODO: Move outside lock.
496+
// self.flush_state(&mut sweeper_state).await.map_err(|e| {
497+
// log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
498+
// })
499+
500+
Ok(())
487501
} else {
488502
Ok(())
489503
}
@@ -533,6 +547,8 @@ where
533547
self.change_destination_source.get_change_destination_script().await?;
534548

535549
// Sweep the outputs.
550+
let flush_fut;
551+
let encoded;
536552
{
537553
let mut sweeper_state = self.sweeper_state.lock().unwrap();
538554

@@ -581,13 +597,18 @@ where
581597
output_info.status.broadcast(cur_hash, cur_height, spending_tx.clone());
582598
}
583599

584-
self.flush_state(&mut sweeper_state).await.map_err(|e| {
585-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
586-
})?;
600+
encoded = sweeper_state.persistent.encode();
601+
sweeper_state.dirty = false;
602+
603+
flush_fut = self.flush_state(encoded);
587604

588605
self.broadcaster.broadcast_transactions(&[&spending_tx]);
589606
}
590607

608+
flush_fut.await.map_err(|e| {
609+
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
610+
})?;
611+
591612
Ok(())
592613
}
593614

@@ -614,13 +635,13 @@ where
614635
}
615636

616637
/// Flushes the current state to the persistence layer and marks the state as clean.
617-
async fn flush_state(&self, sweeper_state: &mut SweeperState) -> Result<(), io::Error> {
638+
async fn flush_state(&self, sweeper_state_encoded: Vec<u8>) -> Result<(), io::Error> {
618639
self.kv_store
619640
.write(
620641
OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
621642
OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
622643
OUTPUT_SWEEPER_PERSISTENCE_KEY,
623-
&sweeper_state.persistent.encode(),
644+
&sweeper_state_encoded,
624645
)
625646
.await
626647
.map_err(|e| {
@@ -634,9 +655,6 @@ where
634655
);
635656
e
636657
})
637-
.map(|_| {
638-
sweeper_state.dirty = false;
639-
})
640658
}
641659

642660
fn spend_outputs(

0 commit comments

Comments
 (0)