Skip to content

Commit 27a8ad2

Browse files
committed
rewrite sync bg processor
1 parent 8aae34e commit 27a8ad2

File tree

3 files changed

+76
-72
lines changed

3 files changed

+76
-72
lines changed

lightning-background-processor/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ bitcoin-io = { version = "0.1.2", default-features = false }
2525
lightning = { version = "0.2.0", path = "../lightning", default-features = false }
2626
lightning-rapid-gossip-sync = { version = "0.2.0", path = "../lightning-rapid-gossip-sync", default-features = false }
2727
lightning-liquidity = { version = "0.2.0", path = "../lightning-liquidity", default-features = false }
28+
futures = "0.3.31"
2829

2930
[dev-dependencies]
3031
tokio = { version = "1.35", features = [ "macros", "rt", "rt-multi-thread", "sync", "time" ] }

lightning-background-processor/src/lib.rs

Lines changed: 30 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ use lightning::util::persist::{KVStore, Persister};
4343
use lightning::util::sweep::OutputSweeper;
4444
#[cfg(feature = "std")]
4545
use lightning::util::sweep::OutputSweeperSync;
46+
use lightning::util::wakers::Sleep;
4647
#[cfg(feature = "std")]
4748
use lightning::util::wakers::Sleeper;
4849
use lightning_rapid_gossip_sync::RapidGossipSync;
@@ -991,7 +992,7 @@ impl BackgroundProcessor {
991992
D: 'static + Deref,
992993
O: 'static + Deref,
993994
K: 'static + Deref,
994-
OS: 'static + Deref<Target = OutputSweeperSync<T, D, F, CF, K, L, O>> + Send,
995+
OS: 'static + Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>> + Send,
995996
>(
996997
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
997998
onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
@@ -1009,79 +1010,33 @@ impl BackgroundProcessor {
10091010
OM::Target: AOnionMessenger,
10101011
PM::Target: APeerManager,
10111012
LM::Target: ALiquidityManager,
1012-
D::Target: ChangeDestinationSourceSync,
1013+
D::Target: ChangeDestinationSource,
10131014
O::Target: 'static + OutputSpender,
10141015
K::Target: 'static + KVStore,
10151016
{
10161017
let stop_thread = Arc::new(AtomicBool::new(false));
10171018
let stop_thread_clone = stop_thread.clone();
10181019
let handle = thread::spawn(move || -> Result<(), std::io::Error> {
1019-
let event_handler = |event| {
1020-
let network_graph = gossip_sync.network_graph();
1021-
if let Some(network_graph) = network_graph {
1022-
handle_network_graph_update(network_graph, &event)
1023-
}
1024-
if let Some(ref scorer) = scorer {
1025-
use std::time::SystemTime;
1026-
let duration_since_epoch = SystemTime::now()
1027-
.duration_since(SystemTime::UNIX_EPOCH)
1028-
.expect("Time should be sometime after 1970");
1029-
if update_scorer(scorer, &event, duration_since_epoch) {
1030-
log_trace!(logger, "Persisting scorer after update");
1031-
if let Err(e) = persister.persist_scorer(&scorer) {
1032-
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
1033-
}
1034-
}
1035-
}
1036-
event_handler.handle_event(event)
1037-
};
1038-
define_run_body!(
1020+
let fut = process_events_async(
10391021
persister,
1022+
|event| async { event_handler.handle_event(event) },
10401023
chain_monitor,
1041-
chain_monitor.process_pending_events(&event_handler),
10421024
channel_manager,
1043-
channel_manager.get_cm().process_pending_events(&event_handler),
10441025
onion_messenger,
1045-
if let Some(om) = &onion_messenger {
1046-
om.get_om().process_pending_events(&event_handler)
1047-
},
1048-
peer_manager,
10491026
gossip_sync,
1050-
{
1051-
if let Some(ref sweeper) = sweeper {
1052-
let _ = sweeper.regenerate_and_broadcast_spend_if_necessary();
1053-
}
1054-
},
1027+
peer_manager,
1028+
liquidity_manager,
1029+
sweeper,
10551030
logger,
10561031
scorer,
1057-
stop_thread.load(Ordering::Acquire),
1058-
{
1059-
let sleeper = match (onion_messenger.as_ref(), liquidity_manager.as_ref()) {
1060-
(Some(om), Some(lm)) => Sleeper::from_four_futures(
1061-
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
1062-
&chain_monitor.get_update_future(),
1063-
&om.get_om().get_update_future(),
1064-
&lm.get_lm().get_pending_msgs_future(),
1065-
),
1066-
(Some(om), None) => Sleeper::from_three_futures(
1067-
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
1068-
&chain_monitor.get_update_future(),
1069-
&om.get_om().get_update_future(),
1070-
),
1071-
(None, Some(lm)) => Sleeper::from_three_futures(
1072-
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
1073-
&chain_monitor.get_update_future(),
1074-
&lm.get_lm().get_pending_msgs_future(),
1075-
),
1076-
(None, None) => Sleeper::from_two_futures(
1077-
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
1078-
&chain_monitor.get_update_future(),
1079-
),
1080-
};
1081-
sleeper.wait_timeout(Duration::from_millis(100));
1032+
move |dur: Duration| {
1033+
let stop_thread_clone = stop_thread.clone();
1034+
1035+
Box::pin(async move {
1036+
Sleep::new(dur).await;
1037+
stop_thread_clone.load(Ordering::Acquire)
1038+
})
10821039
},
1083-
|_| Instant::now(),
1084-
|time: &Instant, dur| time.elapsed().as_secs() > dur,
10851040
false,
10861041
|| {
10871042
use std::time::SystemTime;
@@ -1091,7 +1046,10 @@ impl BackgroundProcessor {
10911046
.expect("Time should be sometime after 1970"),
10921047
)
10931048
},
1094-
)
1049+
);
1050+
1051+
// TODO: Implement simple executor in utils.
1052+
futures::executor::block_on(fut).map_err(Into::into)
10951053
});
10961054
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
10971055
}
@@ -1935,7 +1893,7 @@ mod tests {
19351893
nodes[0].p2p_gossip_sync(),
19361894
nodes[0].peer_manager.clone(),
19371895
Some(Arc::clone(&nodes[0].liquidity_manager)),
1938-
Some(nodes[0].sweeper.clone()),
1896+
Some(nodes[0].sweeper.sweeper_async().clone()),
19391897
nodes[0].logger.clone(),
19401898
Some(nodes[0].scorer.clone()),
19411899
);
@@ -2030,7 +1988,7 @@ mod tests {
20301988
nodes[0].no_gossip_sync(),
20311989
nodes[0].peer_manager.clone(),
20321990
Some(Arc::clone(&nodes[0].liquidity_manager)),
2033-
Some(nodes[0].sweeper.clone()),
1991+
Some(nodes[0].sweeper.sweeper_async().clone()),
20341992
nodes[0].logger.clone(),
20351993
Some(nodes[0].scorer.clone()),
20361994
);
@@ -2074,7 +2032,7 @@ mod tests {
20742032
nodes[0].no_gossip_sync(),
20752033
nodes[0].peer_manager.clone(),
20762034
Some(Arc::clone(&nodes[0].liquidity_manager)),
2077-
Some(nodes[0].sweeper.clone()),
2035+
Some(nodes[0].sweeper.sweeper_async().clone()),
20782036
nodes[0].logger.clone(),
20792037
Some(nodes[0].scorer.clone()),
20802038
);
@@ -2145,7 +2103,7 @@ mod tests {
21452103
nodes[0].p2p_gossip_sync(),
21462104
nodes[0].peer_manager.clone(),
21472105
Some(Arc::clone(&nodes[0].liquidity_manager)),
2148-
Some(nodes[0].sweeper.clone()),
2106+
Some(nodes[0].sweeper.sweeper_async().clone()),
21492107
nodes[0].logger.clone(),
21502108
Some(nodes[0].scorer.clone()),
21512109
);
@@ -2176,7 +2134,7 @@ mod tests {
21762134
nodes[0].no_gossip_sync(),
21772135
nodes[0].peer_manager.clone(),
21782136
Some(Arc::clone(&nodes[0].liquidity_manager)),
2179-
Some(nodes[0].sweeper.clone()),
2137+
Some(nodes[0].sweeper.sweeper_async().clone()),
21802138
nodes[0].logger.clone(),
21812139
Some(nodes[0].scorer.clone()),
21822140
);
@@ -2224,7 +2182,7 @@ mod tests {
22242182
nodes[0].no_gossip_sync(),
22252183
nodes[0].peer_manager.clone(),
22262184
Some(Arc::clone(&nodes[0].liquidity_manager)),
2227-
Some(nodes[0].sweeper.clone()),
2185+
Some(nodes[0].sweeper.sweeper_async().clone()),
22282186
nodes[0].logger.clone(),
22292187
Some(nodes[0].scorer.clone()),
22302188
);
@@ -2288,7 +2246,7 @@ mod tests {
22882246
nodes[0].no_gossip_sync(),
22892247
nodes[0].peer_manager.clone(),
22902248
Some(Arc::clone(&nodes[0].liquidity_manager)),
2291-
Some(nodes[0].sweeper.clone()),
2249+
Some(nodes[0].sweeper.sweeper_async().clone()),
22922250
nodes[0].logger.clone(),
22932251
Some(nodes[0].scorer.clone()),
22942252
);
@@ -2453,7 +2411,7 @@ mod tests {
24532411
nodes[0].no_gossip_sync(),
24542412
nodes[0].peer_manager.clone(),
24552413
Some(Arc::clone(&nodes[0].liquidity_manager)),
2456-
Some(nodes[0].sweeper.clone()),
2414+
Some(nodes[0].sweeper.sweeper_async().clone()),
24572415
nodes[0].logger.clone(),
24582416
Some(nodes[0].scorer.clone()),
24592417
);
@@ -2484,7 +2442,7 @@ mod tests {
24842442
nodes[0].no_gossip_sync(),
24852443
nodes[0].peer_manager.clone(),
24862444
Some(Arc::clone(&nodes[0].liquidity_manager)),
2487-
Some(nodes[0].sweeper.clone()),
2445+
Some(nodes[0].sweeper.sweeper_async().clone()),
24882446
nodes[0].logger.clone(),
24892447
Some(nodes[0].scorer.clone()),
24902448
);
@@ -2581,7 +2539,7 @@ mod tests {
25812539
nodes[0].rapid_gossip_sync(),
25822540
nodes[0].peer_manager.clone(),
25832541
Some(Arc::clone(&nodes[0].liquidity_manager)),
2584-
Some(nodes[0].sweeper.clone()),
2542+
Some(nodes[0].sweeper.sweeper_async().clone()),
25852543
nodes[0].logger.clone(),
25862544
Some(nodes[0].scorer.clone()),
25872545
);
@@ -2778,7 +2736,7 @@ mod tests {
27782736
nodes[0].no_gossip_sync(),
27792737
nodes[0].peer_manager.clone(),
27802738
Some(Arc::clone(&nodes[0].liquidity_manager)),
2781-
Some(nodes[0].sweeper.clone()),
2739+
Some(nodes[0].sweeper.sweeper_async().clone()),
27822740
nodes[0].logger.clone(),
27832741
Some(nodes[0].scorer.clone()),
27842742
);

lightning/src/util/wakers.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ use std::time::Duration;
2828
use core::future::Future as StdFuture;
2929
use core::pin::Pin;
3030
use core::task::{Context, Poll};
31+
use std::{
32+
sync::atomic::{AtomicBool, Ordering},
33+
thread,
34+
};
3135

3236
/// Used to signal to one of many waiters that the condition they're waiting on has happened.
3337
///
@@ -340,6 +344,47 @@ impl Sleeper {
340344
}
341345
}
342346

347+
pub struct Sleep {
348+
is_done: Arc<AtomicBool>,
349+
waker: Arc<Mutex<Option<Waker>>>,
350+
}
351+
352+
impl Sleep {
353+
pub fn new(duration: Duration) -> Self {
354+
let is_done = Arc::new(AtomicBool::new(false));
355+
let waker: Arc<Mutex<Option<Waker>>> = Arc::new(Mutex::new(None));
356+
357+
let is_done_clone = is_done.clone();
358+
let waker_clone = waker.clone();
359+
360+
thread::spawn(move || {
361+
thread::sleep(duration);
362+
is_done_clone.store(true, Ordering::SeqCst);
363+
364+
if let Some(w) = waker_clone.lock().unwrap().take() {
365+
w.wake();
366+
}
367+
});
368+
369+
Self { is_done, waker }
370+
}
371+
}
372+
373+
impl core::future::Future for Sleep {
374+
type Output = ();
375+
376+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
377+
if self.is_done.load(Ordering::SeqCst) {
378+
Poll::Ready(())
379+
} else {
380+
let mut waker_lock = self.waker.lock().unwrap();
381+
// Store latest waker in case the task is moved or re-polled
382+
*waker_lock = Some(cx.waker().clone());
383+
Poll::Pending
384+
}
385+
}
386+
}
387+
343388
#[cfg(test)]
344389
mod tests {
345390
use super::*;

0 commit comments

Comments
 (0)