From 5ff4051d9361d1699be383edcc71d371896e9552 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 31 Dec 2025 15:03:06 +0000 Subject: [PATCH 1/5] Simplify `Sleeper` init in sync `lightning-background-processor` Rather than `match`ing on on several optional objects (with another one to come in a future commit), build an iterator over the futures using the fact that an `Option` is an iterator. --- lightning-background-processor/src/lib.rs | 33 +++++---------- lightning/src/util/wakers.rs | 50 ++++++----------------- 2 files changed, 23 insertions(+), 60 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index aae738ab1c1..36b563f5925 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -1635,28 +1635,17 @@ impl BackgroundProcessor { log_trace!(logger, "Terminating background processor."); break; } - let sleeper = match (onion_messenger.as_ref(), liquidity_manager.as_ref()) { - (Some(om), Some(lm)) => Sleeper::from_four_futures( - &channel_manager.get_cm().get_event_or_persistence_needed_future(), - &chain_monitor.get_update_future(), - &om.get_om().get_update_future(), - &lm.get_lm().get_pending_msgs_or_needs_persist_future(), - ), - (Some(om), None) => Sleeper::from_three_futures( - &channel_manager.get_cm().get_event_or_persistence_needed_future(), - &chain_monitor.get_update_future(), - &om.get_om().get_update_future(), - ), - (None, Some(lm)) => Sleeper::from_three_futures( - &channel_manager.get_cm().get_event_or_persistence_needed_future(), - &chain_monitor.get_update_future(), - &lm.get_lm().get_pending_msgs_or_needs_persist_future(), - ), - (None, None) => Sleeper::from_two_futures( - &channel_manager.get_cm().get_event_or_persistence_needed_future(), - &chain_monitor.get_update_future(), - ), - }; + let om_fut = onion_messenger.as_ref().map(|om| om.get_om().get_update_future()); + let lm_fut = liquidity_manager + .as_ref() + .map(|lm| lm.get_lm().get_pending_msgs_or_needs_persist_future()); + let always_futures = [ + channel_manager.get_cm().get_event_or_persistence_needed_future(), + chain_monitor.get_update_future(), + ]; + let futures = always_futures.into_iter().chain(om_fut).chain(lm_fut); + let sleeper = Sleeper::from_futures(futures); + let batch_delay = if channel_manager.get_cm().needs_pending_htlc_processing() { batch_delay.get() } else { diff --git a/lightning/src/util/wakers.rs b/lightning/src/util/wakers.rs index a84d90960d8..17edadfd822 100644 --- a/lightning/src/util/wakers.rs +++ b/lightning/src/util/wakers.rs @@ -253,37 +253,13 @@ impl Sleeper { pub fn from_single_future(future: &Future) -> Self { Self { notifiers: vec![Arc::clone(&future.state)] } } - /// Constructs a new sleeper from two futures, allowing blocking on both at once. - pub fn from_two_futures(fut_a: &Future, fut_b: &Future) -> Self { - Self { notifiers: vec![Arc::clone(&fut_a.state), Arc::clone(&fut_b.state)] } - } - /// Constructs a new sleeper from three futures, allowing blocking on all three at once. - /// - // Note that this is the common case - a ChannelManager, a ChainMonitor, and an - // OnionMessenger. - pub fn from_three_futures(fut_a: &Future, fut_b: &Future, fut_c: &Future) -> Self { - let notifiers = - vec![Arc::clone(&fut_a.state), Arc::clone(&fut_b.state), Arc::clone(&fut_c.state)]; - Self { notifiers } - } - /// Constructs a new sleeper from four futures, allowing blocking on all four at once. - /// - // Note that this is another common case - a ChannelManager, a ChainMonitor, an - // OnionMessenger, and a LiquidityManager. - pub fn from_four_futures( - fut_a: &Future, fut_b: &Future, fut_c: &Future, fut_d: &Future, - ) -> Self { - let notifiers = vec![ - Arc::clone(&fut_a.state), - Arc::clone(&fut_b.state), - Arc::clone(&fut_c.state), - Arc::clone(&fut_d.state), - ]; - Self { notifiers } + /// Constructs an iterator of futures, allowing blocking on all at once. + pub fn from_futures>(futures: I) -> Self { + Self { notifiers: futures.into_iter().map(|f| Arc::clone(&f.state)).collect() } } /// Constructs a new sleeper on many futures, allowing blocking on all at once. pub fn new(futures: Vec) -> Self { - Self { notifiers: futures.into_iter().map(|f| Arc::clone(&f.state)).collect() } + Self::from_futures(futures) } /// Prepares to go into a wait loop body, creating a condition variable which we can block on /// and an `Arc>>` which gets set to the waking `Future`'s state prior to the @@ -506,15 +482,13 @@ mod tests { // Wait on the other thread to finish its sleep, note that the leak only happened if we // actually have to sleep here, not if we immediately return. - Sleeper::from_two_futures(&future_a, &future_b).wait(); + Sleeper::from_futures([future_a, future_b]).wait(); join_handle.join().unwrap(); // then drop the notifiers and make sure the future states are gone. mem::drop(notifier_a); mem::drop(notifier_b); - mem::drop(future_a); - mem::drop(future_b); assert!(future_state_a.upgrade().is_none() && future_state_b.upgrade().is_none()); } @@ -736,18 +710,18 @@ mod tests { // Set both notifiers as woken without sleeping yet. notifier_a.notify(); notifier_b.notify(); - Sleeper::from_two_futures(¬ifier_a.get_future(), ¬ifier_b.get_future()).wait(); + Sleeper::from_futures([notifier_a.get_future(), notifier_b.get_future()]).wait(); // One future has woken us up, but the other should still have a pending notification. - Sleeper::from_two_futures(¬ifier_a.get_future(), ¬ifier_b.get_future()).wait(); + Sleeper::from_futures([notifier_a.get_future(), notifier_b.get_future()]).wait(); // However once we've slept twice, we should no longer have any pending notifications - assert!(!Sleeper::from_two_futures(¬ifier_a.get_future(), ¬ifier_b.get_future()) + assert!(!Sleeper::from_futures([notifier_a.get_future(), notifier_b.get_future()]) .wait_timeout(Duration::from_millis(10))); // Test ordering somewhat more. notifier_a.notify(); - Sleeper::from_two_futures(¬ifier_a.get_future(), ¬ifier_b.get_future()).wait(); + Sleeper::from_futures([notifier_a.get_future(), notifier_b.get_future()]).wait(); } #[test] @@ -765,7 +739,7 @@ mod tests { // After sleeping one future (not guaranteed which one, however) will have its notification // bit cleared. - Sleeper::from_two_futures(¬ifier_a.get_future(), ¬ifier_b.get_future()).wait(); + Sleeper::from_futures([notifier_a.get_future(), notifier_b.get_future()]).wait(); // By registering a callback on the futures for both notifiers, one will complete // immediately, but one will remain tied to the notifier, and will complete once the @@ -788,8 +762,8 @@ mod tests { notifier_b.notify(); assert!(callback_a.load(Ordering::SeqCst) && callback_b.load(Ordering::SeqCst)); - Sleeper::from_two_futures(¬ifier_a.get_future(), ¬ifier_b.get_future()).wait(); - assert!(!Sleeper::from_two_futures(¬ifier_a.get_future(), ¬ifier_b.get_future()) + Sleeper::from_futures([notifier_a.get_future(), notifier_b.get_future()]).wait(); + assert!(!Sleeper::from_futures([notifier_a.get_future(), notifier_b.get_future()]) .wait_timeout(Duration::from_millis(10))); } From efaadf57f4f550245152e5bf7426c00de631331f Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 31 Dec 2025 02:08:22 +0000 Subject: [PATCH 2/5] Pass a new `Notifier` through to `UtxoFuture`s `P2PGossipSync` is a rather poor design. It currently basically requires two circular `Arc` references, leaving `NetworkGraph`s to leak if LDK is un-loaded: * `P2PGossipSync` owns/holds a reference to the `GossipVerifier` and `GossipVerifier` holds an `Arc` to the `P2PGossipSync` and * `PeerManager` holds a reference to the `P2PGossipSync` (as the gossip message handler) which owns/holds a reference to the `GossipVerifier`, which has a `Deref` (likely an `Arc` in practice) to the `PeerManager`. Instead, we should move towards the same design we have elsewhere - hold a `Notifier` and expose waiting on it to the background processor then poll for completion from there (in this case, as in others by checking for completion when handling `get_and_clear_pending_msg_events` calls). Here we take the first step towards this, adding a shared `Notifier` to `PendingChecks` and piping it through to `UtxoFuture`s so that they can be simply resolved and wake the background processor (once it waits on the new `Notifier`). --- fuzz/src/router.rs | 9 +-- lightning-block-sync/src/gossip.rs | 7 ++- lightning/src/routing/utxo.rs | 91 +++++++++++++++++++++--------- lightning/src/util/test_utils.rs | 3 +- 4 files changed, 75 insertions(+), 35 deletions(-) diff --git a/fuzz/src/router.rs b/fuzz/src/router.rs index af29a0221a9..e6508d06d0e 100644 --- a/fuzz/src/router.rs +++ b/fuzz/src/router.rs @@ -31,6 +31,7 @@ use lightning::types::features::{BlindedHopFeatures, Bolt12InvoiceFeatures}; use lightning::util::config::UserConfig; use lightning::util::hash_tables::*; use lightning::util::ser::LengthReadable; +use lightning::util::wakers::Notifier; use bitcoin::hashes::Hash; use bitcoin::network::Network; @@ -93,7 +94,7 @@ struct FuzzChainSource<'a, 'b, Out: test_logger::Output> { net_graph: &'a NetworkGraph<&'b test_logger::TestLogger>, } impl UtxoLookup for FuzzChainSource<'_, '_, Out> { - fn get_utxo(&self, _chain_hash: &ChainHash, _short_channel_id: u64) -> UtxoResult { + fn get_utxo(&self, _chain_hash: &ChainHash, _scid: u64, notifier: Arc) -> UtxoResult { let input_slice = self.input.get_slice(2); if input_slice.is_none() { return UtxoResult::Sync(Err(UtxoLookupError::UnknownTx)); @@ -107,17 +108,17 @@ impl UtxoLookup for FuzzChainSource<'_, '_, Out> { &[0, _] => UtxoResult::Sync(Err(UtxoLookupError::UnknownChain)), &[1, _] => UtxoResult::Sync(Err(UtxoLookupError::UnknownTx)), &[2, _] => { - let future = UtxoFuture::new(); + let future = UtxoFuture::new(notifier); future.resolve_without_forwarding(self.net_graph, Ok(txo_res)); UtxoResult::Async(future.clone()) }, &[3, _] => { - let future = UtxoFuture::new(); + let future = UtxoFuture::new(notifier); future.resolve_without_forwarding(self.net_graph, Err(UtxoLookupError::UnknownTx)); UtxoResult::Async(future.clone()) }, &[4, _] => { - UtxoResult::Async(UtxoFuture::new()) // the future will never resolve + UtxoResult::Async(UtxoFuture::new(notifier)) // the future will never resolve }, &[..] => UtxoResult::Sync(Ok(txo_res)), } diff --git a/lightning-block-sync/src/gossip.rs b/lightning-block-sync/src/gossip.rs index 596098350c7..2c5dadf57e1 100644 --- a/lightning-block-sync/src/gossip.rs +++ b/lightning-block-sync/src/gossip.rs @@ -14,6 +14,7 @@ use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoLookupError, UtxoResult}; use lightning::util::logger::Logger; use lightning::util::native_async::FutureSpawner; +use lightning::util::wakers::Notifier; use std::collections::VecDeque; use std::future::Future; @@ -273,15 +274,15 @@ where Blocks::Target: UtxoSource, L::Target: Logger, { - fn get_utxo(&self, _chain_hash: &ChainHash, short_channel_id: u64) -> UtxoResult { - let res = UtxoFuture::new(); + fn get_utxo(&self, _chain_hash: &ChainHash, scid: u64, notifier: Arc) -> UtxoResult { + let res = UtxoFuture::new(notifier); let fut = res.clone(); let source = self.source.clone(); let gossiper = Arc::clone(&self.gossiper); let block_cache = Arc::clone(&self.block_cache); let pmw = Arc::clone(&self.peer_manager_wake); self.spawn.spawn(async move { - let res = Self::retrieve_utxo(source, block_cache, short_channel_id).await; + let res = Self::retrieve_utxo(source, block_cache, scid).await; fut.resolve(gossiper.network_graph(), &*gossiper, res); (pmw)(); }); diff --git a/lightning/src/routing/utxo.rs b/lightning/src/routing/utxo.rs index 4299dffb90f..8e0dc113817 100644 --- a/lightning/src/routing/utxo.rs +++ b/lightning/src/routing/utxo.rs @@ -23,6 +23,7 @@ use crate::ln::chan_utils::make_funding_redeemscript_from_slices; use crate::ln::msgs::{self, ErrorAction, LightningError, MessageSendEvent}; use crate::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync}; use crate::util::logger::{Level, Logger}; +use crate::util::wakers::Notifier; use crate::prelude::*; @@ -64,8 +65,14 @@ pub trait UtxoLookup { /// Returns an error if `chain_hash` is for a different chain or if such a transaction output is /// unknown. /// + /// An `async_completion_notifier` is provided which should be [`Notifier::notify`]ed upon + /// resolution of the [`UtxoFuture`] in case this method returns [`UtxoResult::Async`]. + /// /// [`short_channel_id`]: https://github.com/lightning/bolts/blob/master/07-routing-gossip.md#definition-of-short_channel_id - fn get_utxo(&self, chain_hash: &ChainHash, short_channel_id: u64) -> UtxoResult; + fn get_utxo( + &self, chain_hash: &ChainHash, short_channel_id: u64, + async_completion_notifier: Arc, + ) -> UtxoResult; } enum ChannelAnnouncement { @@ -108,6 +115,7 @@ impl ChannelUpdate { } struct UtxoMessages { + notifier: Arc, complete: Option>, channel_announce: Option, latest_node_announce_a: Option, @@ -128,23 +136,25 @@ pub struct UtxoFuture { /// once we have a concrete resolution of a request. pub(crate) struct UtxoResolver(Result); impl UtxoLookup for UtxoResolver { - fn get_utxo(&self, _chain_hash: &ChainHash, _short_channel_id: u64) -> UtxoResult { + fn get_utxo(&self, _hash: &ChainHash, _scid: u64, _notifier: Arc) -> UtxoResult { UtxoResult::Sync(self.0.clone()) } } impl UtxoFuture { /// Builds a new future for later resolution. - #[rustfmt::skip] - pub fn new() -> Self { - Self { state: Arc::new(Mutex::new(UtxoMessages { - complete: None, - channel_announce: None, - latest_node_announce_a: None, - latest_node_announce_b: None, - latest_channel_update_a: None, - latest_channel_update_b: None, - }))} + pub fn new(notifier: Arc) -> Self { + Self { + state: Arc::new(Mutex::new(UtxoMessages { + notifier, + complete: None, + channel_announce: None, + latest_node_announce_a: None, + latest_node_announce_b: None, + latest_channel_update_a: None, + latest_channel_update_b: None, + })), + } } /// Resolves this future against the given `graph` and with the given `result`. @@ -202,6 +212,7 @@ impl UtxoFuture { let (announcement, node_a, node_b, update_a, update_b) = { let mut pending_checks = graph.pending_checks.internal.lock().unwrap(); let mut async_messages = self.state.lock().unwrap(); + async_messages.notifier.notify(); if async_messages.channel_announce.is_none() { // We raced returning to `check_channel_announcement` which hasn't updated @@ -321,14 +332,18 @@ impl PendingChecksContext { /// A set of messages which are pending UTXO lookups for processing. pub(super) struct PendingChecks { internal: Mutex, + pub(super) completion_notifier: Arc, } impl PendingChecks { - #[rustfmt::skip] pub(super) fn new() -> Self { - PendingChecks { internal: Mutex::new(PendingChecksContext { - channels: new_hash_map(), nodes: new_hash_map(), - }) } + PendingChecks { + internal: Mutex::new(PendingChecksContext { + channels: new_hash_map(), + nodes: new_hash_map(), + }), + completion_notifier: Arc::new(Notifier::new()), + } } /// Checks if there is a pending `channel_update` UTXO validation for the given channel, @@ -519,7 +534,8 @@ impl PendingChecks { Ok(None) }, &Some(ref utxo_lookup) => { - match utxo_lookup.get_utxo(&msg.chain_hash, msg.short_channel_id) { + let notifier = Arc::clone(&self.completion_notifier); + match utxo_lookup.get_utxo(&msg.chain_hash, msg.short_channel_id, notifier) { UtxoResult::Sync(res) => handle_result(res), UtxoResult::Async(future) => { let mut pending_checks = self.internal.lock().unwrap(); @@ -636,9 +652,11 @@ mod tests { // `get_utxo` call can read it still resolve properly. let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects(); - let future = UtxoFuture::new(); + let notifier = Arc::new(Notifier::new()); + let future = UtxoFuture::new(Arc::clone(¬ifier)); future.resolve_without_forwarding(&network_graph, Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script })); + assert!(notifier.notify_pending()); *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap(); @@ -652,7 +670,8 @@ mod tests { let (valid_announcement, chain_source, network_graph, good_script, node_a_announce, node_b_announce, ..) = get_test_objects(); - let future = UtxoFuture::new(); + let notifier = Arc::new(Notifier::new()); + let future = UtxoFuture::new(Arc::clone(¬ifier)); *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); assert_eq!( @@ -662,6 +681,7 @@ mod tests { future.resolve_without_forwarding(&network_graph, Ok(TxOut { value: Amount::ZERO, script_pubkey: good_script })); + assert!(notifier.notify_pending()); network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap(); network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap(); @@ -681,7 +701,8 @@ mod tests { // Test an async lookup which returns an incorrect script let (valid_announcement, chain_source, network_graph, ..) = get_test_objects(); - let future = UtxoFuture::new(); + let notifier = Arc::new(Notifier::new()); + let future = UtxoFuture::new(Arc::clone(¬ifier)); *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); assert_eq!( @@ -691,6 +712,7 @@ mod tests { future.resolve_without_forwarding(&network_graph, Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: bitcoin::ScriptBuf::new() })); + assert!(notifier.notify_pending()); assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); } @@ -700,7 +722,8 @@ mod tests { // Test an async lookup which returns an error let (valid_announcement, chain_source, network_graph, ..) = get_test_objects(); - let future = UtxoFuture::new(); + let notifier = Arc::new(Notifier::new()); + let future = UtxoFuture::new(Arc::clone(¬ifier)); *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); assert_eq!( @@ -709,6 +732,7 @@ mod tests { assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx)); + assert!(notifier.notify_pending()); assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); } @@ -720,7 +744,8 @@ mod tests { let (valid_announcement, chain_source, network_graph, good_script, node_a_announce, node_b_announce, chan_update_a, chan_update_b, ..) = get_test_objects(); - let future = UtxoFuture::new(); + let notifier = Arc::new(Notifier::new()); + let future = UtxoFuture::new(Arc::clone(¬ifier)); *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); assert_eq!( @@ -740,8 +765,10 @@ mod tests { assert_eq!(network_graph.update_channel(&chan_update_b).unwrap_err().err, "Awaiting channel_announcement validation to accept channel_update"); + assert!(!notifier.notify_pending()); future.resolve_without_forwarding(&network_graph, Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script })); + assert!(notifier.notify_pending()); assert!(network_graph.read_only().channels() .get(&valid_announcement.contents.short_channel_id).unwrap().one_to_two.is_some()); @@ -762,7 +789,8 @@ mod tests { let (valid_announcement, chain_source, network_graph, good_script, _, _, chan_update_a, chan_update_b, chan_update_c, ..) = get_test_objects(); - let future = UtxoFuture::new(); + let notifier = Arc::new(Notifier::new()); + let future = UtxoFuture::new(Arc::clone(¬ifier)); *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); assert_eq!( @@ -777,8 +805,10 @@ mod tests { assert_eq!(network_graph.update_channel(&chan_update_c).unwrap_err().err, "Awaiting channel_announcement validation to accept channel_update"); + assert!(!notifier.notify_pending()); future.resolve_without_forwarding(&network_graph, Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script })); + assert!(notifier.notify_pending()); assert_eq!(chan_update_a.contents.timestamp, chan_update_b.contents.timestamp); let graph_lock = network_graph.read_only(); @@ -797,7 +827,8 @@ mod tests { // only if the channel_announcement message is identical. let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects(); - let future = UtxoFuture::new(); + let notifier_a = Arc::new(Notifier::new()); + let future = UtxoFuture::new(Arc::clone(¬ifier_a)); *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); assert_eq!( @@ -806,7 +837,8 @@ mod tests { assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 1); // If we make a second request with the same message, the call count doesn't increase... - let future_b = UtxoFuture::new(); + let notifier_b = Arc::new(Notifier::new()); + let future_b = UtxoFuture::new(Arc::clone(¬ifier_b)); *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future_b.clone()); assert_eq!( network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, @@ -827,6 +859,8 @@ mod tests { // Still, if we resolve the original future, the original channel will be accepted. future.resolve_without_forwarding(&network_graph, Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script })); + assert!(notifier_a.notify_pending()); + assert!(!notifier_b.notify_pending()); assert!(!network_graph.read_only().channels() .get(&valid_announcement.contents.short_channel_id).unwrap() .announcement_message.as_ref().unwrap() @@ -842,7 +876,8 @@ mod tests { let (chain_source, network_graph) = get_network(); // We cheat and use a single future for all the lookups to complete them all at once. - let future = UtxoFuture::new(); + let notifier = Arc::new(Notifier::new()); + let future = UtxoFuture::new(Arc::clone(¬ifier)); *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap(); @@ -862,6 +897,7 @@ mod tests { // Once the future completes the "too many checks" flag should reset. future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx)); + assert!(notifier.notify_pending()); assert!(!network_graph.pending_checks.too_many_checks_pending()); } @@ -874,7 +910,8 @@ mod tests { let (chain_source, network_graph) = get_network(); // We cheat and use a single future for all the lookups to complete them all at once. - *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(UtxoFuture::new()); + let notifier = Arc::new(Notifier::new()); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(UtxoFuture::new(notifier)); let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap(); let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap(); diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 50514e0a894..34f5d5fe36e 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -61,6 +61,7 @@ use crate::util::mut_global::MutGlobal; use crate::util::persist::{KVStore, KVStoreSync, MonitorName}; use crate::util::ser::{Readable, ReadableArgs, Writeable, Writer}; use crate::util::test_channel_signer::{EnforcementState, TestChannelSigner}; +use crate::util::wakers::Notifier; use bitcoin::amount::Amount; use bitcoin::block::Block; @@ -2101,7 +2102,7 @@ impl TestChainSource { } impl UtxoLookup for TestChainSource { - fn get_utxo(&self, chain_hash: &ChainHash, _short_channel_id: u64) -> UtxoResult { + fn get_utxo(&self, chain_hash: &ChainHash, _scid: u64, _notifier: Arc) -> UtxoResult { self.get_utxo_call_count.fetch_add(1, Ordering::Relaxed); if self.chain_hash != *chain_hash { return UtxoResult::Sync(Err(UtxoLookupError::UnknownChain)); From 91a16c53ca6fd3a02171b7f05f64df8de4130adb Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 31 Dec 2025 11:39:45 +0000 Subject: [PATCH 3/5] Poll for resolved `UtxoFuture`s rather than resolving on the graph `P2PGossipSync` is a rather poor design. It currently basically requires two circular `Arc` references, leaving `NetworkGraph`s to leak if LDK is un-loaded: * `P2PGossipSync` owns/holds a reference to the `GossipVerifier` and `GossipVerifier` holds an `Arc` to the `P2PGossipSync` and * `PeerManager` holds a reference to the `P2PGossipSync` (as the gossip message handler) which owns/holds a reference to the `GossipVerifier`, which has a `Deref` (likely an `Arc` in practice) to the `PeerManager`. Instead, we should move towards the same design we have elsewhere - hold a `Notifier` and expose waiting on it to the background processor then poll for completion from there (in this case, as in others by checking for completion when handling `get_and_clear_pending_msg_events` calls). Here we do the bulk of this work, moving `UtxoFuture` resolution to a simple function that signals the `Notifier` and stores the result. We then poll to convert the result into forwarded messages in `P2PGossipSync::get_and_clear_pending_message_events`. Note that we still rely on manual wakeups from the gossip validator, but that will be fixed in the next commit. --- fuzz/src/router.rs | 15 +- lightning-block-sync/src/gossip.rs | 2 +- lightning/src/routing/gossip.rs | 66 +++--- lightning/src/routing/utxo.rs | 341 ++++++++++++++--------------- 4 files changed, 204 insertions(+), 220 deletions(-) diff --git a/fuzz/src/router.rs b/fuzz/src/router.rs index e6508d06d0e..2e5b15fc7f4 100644 --- a/fuzz/src/router.rs +++ b/fuzz/src/router.rs @@ -89,11 +89,10 @@ impl InputData { } } -struct FuzzChainSource<'a, 'b, Out: test_logger::Output> { +struct FuzzChainSource { input: Arc, - net_graph: &'a NetworkGraph<&'b test_logger::TestLogger>, } -impl UtxoLookup for FuzzChainSource<'_, '_, Out> { +impl UtxoLookup for FuzzChainSource { fn get_utxo(&self, _chain_hash: &ChainHash, _scid: u64, notifier: Arc) -> UtxoResult { let input_slice = self.input.get_slice(2); if input_slice.is_none() { @@ -109,12 +108,12 @@ impl UtxoLookup for FuzzChainSource<'_, '_, Out> { &[1, _] => UtxoResult::Sync(Err(UtxoLookupError::UnknownTx)), &[2, _] => { let future = UtxoFuture::new(notifier); - future.resolve_without_forwarding(self.net_graph, Ok(txo_res)); + future.resolve(Ok(txo_res)); UtxoResult::Async(future.clone()) }, &[3, _] => { let future = UtxoFuture::new(notifier); - future.resolve_without_forwarding(self.net_graph, Err(UtxoLookupError::UnknownTx)); + future.resolve(Err(UtxoLookupError::UnknownTx)); UtxoResult::Async(future.clone()) }, &[4, _] => { @@ -198,7 +197,7 @@ pub fn do_test(data: &[u8], out: Out) { let our_pubkey = get_pubkey!(); let net_graph = NetworkGraph::new(Network::Bitcoin, &logger); - let chain_source = FuzzChainSource { input: Arc::clone(&input), net_graph: &net_graph }; + let chain_source = FuzzChainSource { input: Arc::clone(&input) }; let mut node_pks = new_hash_map(); let mut scid = 42; @@ -336,9 +335,7 @@ pub fn do_test(data: &[u8], out: Out) { node_pks.insert(get_pubkey_from_node_id!(msg.node_id_1), ()); node_pks.insert(get_pubkey_from_node_id!(msg.node_id_2), ()); let _ = net_graph - .update_channel_from_unsigned_announcement::<&FuzzChainSource<'_, '_, Out>>( - &msg, &None, - ); + .update_channel_from_unsigned_announcement::<&FuzzChainSource>(&msg, &None); }, 2 => { let msg = diff --git a/lightning-block-sync/src/gossip.rs b/lightning-block-sync/src/gossip.rs index 2c5dadf57e1..63045b6cd92 100644 --- a/lightning-block-sync/src/gossip.rs +++ b/lightning-block-sync/src/gossip.rs @@ -283,7 +283,7 @@ where let pmw = Arc::clone(&self.peer_manager_wake); self.spawn.spawn(async move { let res = Self::retrieve_utxo(source, block_cache, scid).await; - fut.resolve(gossiper.network_graph(), &*gossiper, res); + fut.resolve(res); (pmw)(); }); UtxoResult::Async(res) diff --git a/lightning/src/routing/gossip.rs b/lightning/src/routing/gossip.rs index ae317ad1ac3..46ca3322ae7 100644 --- a/lightning/src/routing/gossip.rs +++ b/lightning/src/routing/gossip.rs @@ -378,39 +378,42 @@ where } } - /// Used to broadcast forward gossip messages which were validated async. - /// - /// Note that this will ignore events other than `Broadcast*` or messages with too much excess - /// data. - pub(super) fn forward_gossip_msg(&self, mut ev: MessageSendEvent) { - match &mut ev { - MessageSendEvent::BroadcastChannelAnnouncement { msg, ref mut update_msg } => { - if msg.contents.excess_data.len() > MAX_EXCESS_BYTES_FOR_RELAY { - return; - } - if update_msg.as_ref().map(|msg| msg.contents.excess_data.len()).unwrap_or(0) - > MAX_EXCESS_BYTES_FOR_RELAY - { - *update_msg = None; - } - }, - MessageSendEvent::BroadcastChannelUpdate { msg, .. } => { - if msg.contents.excess_data.len() > MAX_EXCESS_BYTES_FOR_RELAY { - return; - } - }, - MessageSendEvent::BroadcastNodeAnnouncement { msg } => { - if msg.contents.excess_data.len() > MAX_EXCESS_BYTES_FOR_RELAY - || msg.contents.excess_address_data.len() > MAX_EXCESS_BYTES_FOR_RELAY - || msg.contents.excess_data.len() + msg.contents.excess_address_data.len() + /// Walks the list of pending UTXO validations and removes completed ones, adding any messages + /// we should forward as a result to [`Self::pending_events`]. + fn process_completed_checks(&self) { + let msgs = self.network_graph.pending_checks.check_resolved_futures(&*self.network_graph); + let mut pending_events = self.pending_events.lock().unwrap(); + pending_events.reserve(msgs.len()); + for mut message in msgs { + match &mut message { + MessageSendEvent::BroadcastChannelAnnouncement { msg, ref mut update_msg } => { + if msg.contents.excess_data.len() > MAX_EXCESS_BYTES_FOR_RELAY { + continue; + } + if update_msg.as_ref().map(|msg| msg.contents.excess_data.len()).unwrap_or(0) > MAX_EXCESS_BYTES_FOR_RELAY - { - return; - } - }, - _ => return, + { + *update_msg = None; + } + }, + MessageSendEvent::BroadcastChannelUpdate { msg, .. } => { + if msg.contents.excess_data.len() > MAX_EXCESS_BYTES_FOR_RELAY { + continue; + } + }, + MessageSendEvent::BroadcastNodeAnnouncement { msg } => { + if msg.contents.excess_data.len() > MAX_EXCESS_BYTES_FOR_RELAY + || msg.contents.excess_address_data.len() > MAX_EXCESS_BYTES_FOR_RELAY + || msg.contents.excess_data.len() + msg.contents.excess_address_data.len() + > MAX_EXCESS_BYTES_FOR_RELAY + { + continue; + } + }, + _ => continue, + } + pending_events.push(message); } - self.pending_events.lock().unwrap().push(ev); } } @@ -884,6 +887,7 @@ where } fn get_and_clear_pending_msg_events(&self) -> Vec { + self.process_completed_checks(); let mut ret = Vec::new(); let mut pending_events = self.pending_events.lock().unwrap(); core::mem::swap(&mut ret, &mut pending_events); diff --git a/lightning/src/routing/utxo.rs b/lightning/src/routing/utxo.rs index 8e0dc113817..f46160f1f14 100644 --- a/lightning/src/routing/utxo.rs +++ b/lightning/src/routing/utxo.rs @@ -21,7 +21,7 @@ use bitcoin::hex::DisplayHex; use crate::ln::chan_utils::make_funding_redeemscript_from_slices; use crate::ln::msgs::{self, ErrorAction, LightningError, MessageSendEvent}; -use crate::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync}; +use crate::routing::gossip::{NetworkGraph, NodeId}; use crate::util::logger::{Level, Logger}; use crate::util::wakers::Notifier; @@ -157,148 +157,11 @@ impl UtxoFuture { } } - /// Resolves this future against the given `graph` and with the given `result`. - /// - /// This is identical to calling [`UtxoFuture::resolve`] with a dummy `gossip`, disabling - /// forwarding the validated gossip message onwards to peers. - /// - /// Because this may cause the [`NetworkGraph`]'s [`processing_queue_high`] to flip, in order - /// to allow us to interact with peers again, you should call [`PeerManager::process_events`] - /// after this. - /// - /// [`processing_queue_high`]: crate::ln::msgs::RoutingMessageHandler::processing_queue_high - /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events - pub fn resolve_without_forwarding( - &self, graph: &NetworkGraph, result: Result, - ) where - L::Target: Logger, - { - self.do_resolve(graph, result); - } - - /// Resolves this future against the given `graph` and with the given `result`. - /// - /// The given `gossip` is used to broadcast any validated messages onwards to all peers which - /// have available buffer space. - /// - /// Because this may cause the [`NetworkGraph`]'s [`processing_queue_high`] to flip, in order - /// to allow us to interact with peers again, you should call [`PeerManager::process_events`] - /// after this. - /// - /// [`processing_queue_high`]: crate::ln::msgs::RoutingMessageHandler::processing_queue_high - /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events - pub fn resolve< - L: Deref, - G: Deref>, - U: Deref, - GS: Deref>, - >( - &self, graph: &NetworkGraph, gossip: GS, result: Result, - ) where - L::Target: Logger, - U::Target: UtxoLookup, - { - let mut res = self.do_resolve(graph, result); - for msg_opt in res.iter_mut() { - if let Some(msg) = msg_opt.take() { - gossip.forward_gossip_msg(msg); - } - } - } - - #[rustfmt::skip] - fn do_resolve(&self, graph: &NetworkGraph, result: Result) - -> [Option; 5] where L::Target: Logger { - let (announcement, node_a, node_b, update_a, update_b) = { - let mut pending_checks = graph.pending_checks.internal.lock().unwrap(); - let mut async_messages = self.state.lock().unwrap(); - async_messages.notifier.notify(); - - if async_messages.channel_announce.is_none() { - // We raced returning to `check_channel_announcement` which hasn't updated - // `channel_announce` yet. That's okay, we can set the `complete` field which it will - // check once it gets control again. - async_messages.complete = Some(result); - return [None, None, None, None, None]; - } - - let announcement_msg = match async_messages.channel_announce.as_ref().unwrap() { - ChannelAnnouncement::Full(signed_msg) => &signed_msg.contents, - ChannelAnnouncement::Unsigned(msg) => &msg, - }; - - pending_checks.lookup_completed(announcement_msg, &Arc::downgrade(&self.state)); - - (async_messages.channel_announce.take().unwrap(), - async_messages.latest_node_announce_a.take(), - async_messages.latest_node_announce_b.take(), - async_messages.latest_channel_update_a.take(), - async_messages.latest_channel_update_b.take()) - }; - - let mut res = [None, None, None, None, None]; - let mut res_idx = 0; - - // Now that we've updated our internal state, pass the pending messages back through the - // network graph with a different `UtxoLookup` which will resolve immediately. - // Note that we ignore errors as we don't disconnect peers anyway, so there's nothing to do - // with them. - let resolver = UtxoResolver(result); - let (node_id_1, node_id_2) = match &announcement { - ChannelAnnouncement::Full(signed_msg) => (signed_msg.contents.node_id_1, signed_msg.contents.node_id_2), - ChannelAnnouncement::Unsigned(msg) => (msg.node_id_1, msg.node_id_2), - }; - match announcement { - ChannelAnnouncement::Full(signed_msg) => { - if graph.update_channel_from_announcement(&signed_msg, &Some(&resolver)).is_ok() { - res[res_idx] = Some(MessageSendEvent::BroadcastChannelAnnouncement { - msg: signed_msg, update_msg: None, - }); - res_idx += 1; - } - }, - ChannelAnnouncement::Unsigned(msg) => { - let _ = graph.update_channel_from_unsigned_announcement(&msg, &Some(&resolver)); - }, - } - - for announce in core::iter::once(node_a).chain(core::iter::once(node_b)) { - match announce { - Some(NodeAnnouncement::Full(signed_msg)) => { - if graph.update_node_from_announcement(&signed_msg).is_ok() { - res[res_idx] = Some(MessageSendEvent::BroadcastNodeAnnouncement { - msg: signed_msg, - }); - res_idx += 1; - } - }, - Some(NodeAnnouncement::Unsigned(msg)) => { - let _ = graph.update_node_from_unsigned_announcement(&msg); - }, - None => {}, - } - } - - for update in core::iter::once(update_a).chain(core::iter::once(update_b)) { - match update { - Some(ChannelUpdate::Full(signed_msg)) => { - if graph.update_channel(&signed_msg).is_ok() { - res[res_idx] = Some(MessageSendEvent::BroadcastChannelUpdate { - msg: signed_msg, - node_id_1, - node_id_2, - }); - res_idx += 1; - } - }, - Some(ChannelUpdate::Unsigned(msg)) => { - let _ = graph.update_channel_unsigned(&msg); - }, - None => {}, - } - } - - res + /// Resolves this future with the given result. + pub fn resolve(&self, result: Result) { + let mut state = self.state.lock().unwrap(); + state.complete = Some(result); + state.notifier.notify(); } } @@ -307,28 +170,6 @@ struct PendingChecksContext { nodes: HashMap>>>, } -impl PendingChecksContext { - #[rustfmt::skip] - fn lookup_completed(&mut self, - msg: &msgs::UnsignedChannelAnnouncement, completed_state: &Weak> - ) { - if let hash_map::Entry::Occupied(e) = self.channels.entry(msg.short_channel_id) { - if Weak::ptr_eq(e.get(), &completed_state) { - e.remove(); - } - } - - if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(msg.node_id_1) { - e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state)); - if e.get().is_empty() { e.remove(); } - } - if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(msg.node_id_2) { - e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state)); - if e.get().is_empty() { e.remove(); } - } - } -} - /// A set of messages which are pending UTXO lookups for processing. pub(super) struct PendingChecks { internal: Mutex, @@ -597,6 +438,142 @@ impl PendingChecks { false } } + + fn resolve_single_future( + &self, graph: &NetworkGraph, entry: Arc>, + new_messages: &mut Vec, + ) where + L::Target: Logger, + { + let (announcement, result, announce_a, announce_b, update_a, update_b); + { + let mut state = entry.lock().unwrap(); + announcement = if let Some(announcement) = state.channel_announce.take() { + announcement + } else { + // We raced returning to `check_channel_announcement` which hasn't updated + // `channel_announce` yet. That's okay, we can set the `complete` field which it will + // check once it gets control again. + return; + }; + + result = if let Some(result) = state.complete.take() { + result + } else { + debug_assert!(false, "Future should have been resolved"); + return; + }; + + announce_a = state.latest_node_announce_a.take(); + announce_b = state.latest_node_announce_b.take(); + update_a = state.latest_channel_update_a.take(); + update_b = state.latest_channel_update_b.take(); + } + + // Now that we've updated our internal state, pass the pending messages back through the + // network graph with a different `UtxoLookup` which will resolve immediately. + // Note that we ignore errors as we don't disconnect peers anyway, so there's nothing to do + // with them. + let resolver = UtxoResolver(result); + let (node_id_1, node_id_2) = match &announcement { + ChannelAnnouncement::Full(signed_msg) => { + (signed_msg.contents.node_id_1, signed_msg.contents.node_id_2) + }, + ChannelAnnouncement::Unsigned(msg) => (msg.node_id_1, msg.node_id_2), + }; + match announcement { + ChannelAnnouncement::Full(signed_msg) => { + if graph.update_channel_from_announcement(&signed_msg, &Some(&resolver)).is_ok() { + new_messages.push(MessageSendEvent::BroadcastChannelAnnouncement { + msg: signed_msg, + update_msg: None, + }); + } + }, + ChannelAnnouncement::Unsigned(msg) => { + let _ = graph.update_channel_from_unsigned_announcement(&msg, &Some(&resolver)); + }, + } + + for announce in [announce_a, announce_b] { + match announce { + Some(NodeAnnouncement::Full(signed_msg)) => { + if graph.update_node_from_announcement(&signed_msg).is_ok() { + new_messages + .push(MessageSendEvent::BroadcastNodeAnnouncement { msg: signed_msg }); + } + }, + Some(NodeAnnouncement::Unsigned(msg)) => { + let _ = graph.update_node_from_unsigned_announcement(&msg); + }, + None => {}, + } + } + + for update in [update_a, update_b] { + match update { + Some(ChannelUpdate::Full(signed_msg)) => { + if graph.update_channel(&signed_msg).is_ok() { + new_messages.push(MessageSendEvent::BroadcastChannelUpdate { + msg: signed_msg, + node_id_1, + node_id_2, + }); + } + }, + Some(ChannelUpdate::Unsigned(msg)) => { + let _ = graph.update_channel_unsigned(&msg); + }, + None => {}, + } + } + } + + pub(super) fn check_resolved_futures( + &self, graph: &NetworkGraph, + ) -> Vec + where + L::Target: Logger, + { + let mut completed_states = Vec::new(); + { + let mut lck = self.internal.lock().unwrap(); + lck.channels.retain(|_, state| { + if let Some(state) = state.upgrade() { + if state.lock().unwrap().complete.is_some() { + completed_states.push(state); + false + } else { + true + } + } else { + // The UtxoFuture has been dropped, drop the pending-lookup state. + false + } + }); + lck.nodes.retain(|_, lookups| { + lookups.retain(|state| { + if let Some(state) = state.upgrade() { + if state.lock().unwrap().complete.is_some() { + completed_states.push(state); + false + } else { + true + } + } else { + // The UtxoFuture has been dropped, drop the pending-lookup state. + false + } + }); + !lookups.is_empty() + }); + } + let mut res = Vec::with_capacity(completed_states.len() * 5); + for state in completed_states { + self.resolve_single_future(graph, state, &mut res); + } + res + } } #[cfg(test)] @@ -654,9 +631,10 @@ mod tests { let notifier = Arc::new(Notifier::new()); let future = UtxoFuture::new(Arc::clone(¬ifier)); - future.resolve_without_forwarding(&network_graph, - Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script })); + future + .resolve(Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script })); assert!(notifier.notify_pending()); + network_graph.pending_checks.check_resolved_futures(&network_graph); *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap(); @@ -679,9 +657,9 @@ mod tests { "Channel being checked async"); assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); - future.resolve_without_forwarding(&network_graph, - Ok(TxOut { value: Amount::ZERO, script_pubkey: good_script })); + future.resolve(Ok(TxOut { value: Amount::ZERO, script_pubkey: good_script })); assert!(notifier.notify_pending()); + network_graph.pending_checks.check_resolved_futures(&network_graph); network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap(); network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap(); @@ -710,9 +688,10 @@ mod tests { "Channel being checked async"); assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); - future.resolve_without_forwarding(&network_graph, - Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: bitcoin::ScriptBuf::new() })); + let value = Amount::from_sat(1_000_000); + future.resolve(Ok(TxOut { value, script_pubkey: bitcoin::ScriptBuf::new() })); assert!(notifier.notify_pending()); + network_graph.pending_checks.check_resolved_futures(&network_graph); assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); } @@ -731,8 +710,9 @@ mod tests { "Channel being checked async"); assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); - future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx)); + future.resolve(Err(UtxoLookupError::UnknownTx)); assert!(notifier.notify_pending()); + network_graph.pending_checks.check_resolved_futures(&network_graph); assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); } @@ -766,9 +746,10 @@ mod tests { "Awaiting channel_announcement validation to accept channel_update"); assert!(!notifier.notify_pending()); - future.resolve_without_forwarding(&network_graph, - Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script })); + future + .resolve(Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script })); assert!(notifier.notify_pending()); + network_graph.pending_checks.check_resolved_futures(&network_graph); assert!(network_graph.read_only().channels() .get(&valid_announcement.contents.short_channel_id).unwrap().one_to_two.is_some()); @@ -806,9 +787,9 @@ mod tests { "Awaiting channel_announcement validation to accept channel_update"); assert!(!notifier.notify_pending()); - future.resolve_without_forwarding(&network_graph, - Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script })); + future.resolve(Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script })); assert!(notifier.notify_pending()); + network_graph.pending_checks.check_resolved_futures(&network_graph); assert_eq!(chan_update_a.contents.timestamp, chan_update_b.contents.timestamp); let graph_lock = network_graph.read_only(); @@ -857,10 +838,11 @@ mod tests { assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 2); // Still, if we resolve the original future, the original channel will be accepted. - future.resolve_without_forwarding(&network_graph, - Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script })); + future + .resolve(Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script })); assert!(notifier_a.notify_pending()); assert!(!notifier_b.notify_pending()); + network_graph.pending_checks.check_resolved_futures(&network_graph); assert!(!network_graph.read_only().channels() .get(&valid_announcement.contents.short_channel_id).unwrap() .announcement_message.as_ref().unwrap() @@ -896,8 +878,9 @@ mod tests { assert!(network_graph.pending_checks.too_many_checks_pending()); // Once the future completes the "too many checks" flag should reset. - future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx)); + future.resolve(Err(UtxoLookupError::UnknownTx)); assert!(notifier.notify_pending()); + network_graph.pending_checks.check_resolved_futures(&network_graph); assert!(!network_graph.pending_checks.too_many_checks_pending()); } From ad78799b881b3007d3d57b4434bdf2468164ceb6 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 31 Dec 2025 20:26:29 +0000 Subject: [PATCH 4/5] Move to awaiting gossip validation in the background processor `P2PGossipSync` is a rather poor design. It currently basically requires two circular `Arc` references, leaving `NetworkGraph`s to leak if LDK is un-loaded: * `P2PGossipSync` owns/holds a reference to the `GossipVerifier` and `GossipVerifier` holds an `Arc` to the `P2PGossipSync` and * `PeerManager` holds a reference to the `P2PGossipSync` (as the gossip message handler) which owns/holds a reference to the `GossipVerifier`, which has a `Deref` (likely an `Arc` in practice) to the `PeerManager`. Instead, we should move towards the same design we have elsewhere - hold a `Notifier` and expose waiting on it to the background processor then poll for completion from there (in this case, as in others by checking for completion when handling `get_and_clear_pending_msg_events` calls). After the last few commits of setup, here we finally switch to waking the background processor directly when we detect async gossip validation completion, allowing us to drop the circular references in `P2PGossipSync`/`GossipVerifier` entirely. Fixes #3369 --- lightning-background-processor/src/lib.rs | 56 +++++++++++++++++------ lightning-block-sync/src/gossip.rs | 44 ++++-------------- lightning/src/routing/gossip.rs | 12 +++++ 3 files changed, 61 insertions(+), 51 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 36b563f5925..7361d026d5a 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -64,6 +64,7 @@ use lightning::util::persist::{ SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE, }; use lightning::util::sweep::{OutputSweeper, OutputSweeperSync}; +use lightning::util::wakers::Future; #[cfg(feature = "std")] use lightning::util::wakers::Sleeper; use lightning_rapid_gossip_sync::RapidGossipSync; @@ -235,6 +236,14 @@ where GossipSync::None => None, } } + + fn validation_completion_future(&self) -> Option { + match self { + GossipSync::P2P(gossip_sync) => Some(gossip_sync.validation_completion_future()), + GossipSync::Rapid(_) => None, + GossipSync::None => None, + } + } } /// This is not exported to bindings users as the bindings concretize everything and have constructors for us @@ -520,12 +529,14 @@ pub(crate) mod futures_util { C: Future + Unpin, D: Future + Unpin, E: Future + Unpin, + F: Future + Unpin, > { pub a: A, pub b: B, pub c: C, pub d: D, pub e: E, + pub f: F, } pub(crate) enum SelectorOutput { @@ -534,6 +545,7 @@ pub(crate) mod futures_util { C, D, E, + F, } impl< @@ -542,7 +554,8 @@ pub(crate) mod futures_util { C: Future + Unpin, D: Future + Unpin, E: Future + Unpin, - > Future for Selector + F: Future + Unpin, + > Future for Selector { type Output = SelectorOutput; fn poll( @@ -580,6 +593,12 @@ pub(crate) mod futures_util { }, Poll::Pending => {}, } + match Pin::new(&mut self.f).poll(ctx) { + Poll::Ready(()) => { + return Poll::Ready(SelectorOutput::F); + }, + Poll::Pending => {}, + } Poll::Pending } } @@ -606,6 +625,12 @@ pub(crate) mod futures_util { } } + impl + Unpin> From> for OptionalSelector { + fn from(optional_future: Option) -> Self { + Self { optional_future } + } + } + // If we want to poll a future without an async context to figure out if it has completed or // not without awaiting, we need a Waker, which needs a vtable...we fill it with dummy values // but sadly there's a good bit of boilerplate here. @@ -1058,18 +1083,13 @@ where if mobile_interruptable_platform { await_start = Some(sleeper(Duration::from_secs(1))); } - let om_fut = if let Some(om) = onion_messenger.as_ref() { - let fut = om.get_om().get_update_future(); - OptionalSelector { optional_future: Some(fut) } - } else { - OptionalSelector { optional_future: None } - }; - let lm_fut = if let Some(lm) = liquidity_manager.as_ref() { - let fut = lm.get_lm().get_pending_msgs_or_needs_persist_future(); - OptionalSelector { optional_future: Some(fut) } - } else { - OptionalSelector { optional_future: None } - }; + let om_fut: OptionalSelector<_> = + onion_messenger.as_ref().map(|om| om.get_om().get_update_future()).into(); + let lm_fut: OptionalSelector<_> = liquidity_manager + .as_ref() + .map(|lm| lm.get_lm().get_pending_msgs_or_needs_persist_future()) + .into(); + let gv_fut: OptionalSelector<_> = gossip_sync.validation_completion_future().into(); let needs_processing = channel_manager.get_cm().needs_pending_htlc_processing(); let sleep_delay = match (needs_processing, mobile_interruptable_platform) { (true, true) => batch_delay.get().min(Duration::from_millis(100)), @@ -1083,9 +1103,14 @@ where c: chain_monitor.get_update_future(), d: om_fut, e: lm_fut, + f: gv_fut, }; match fut.await { - SelectorOutput::B | SelectorOutput::C | SelectorOutput::D | SelectorOutput::E => {}, + SelectorOutput::B + | SelectorOutput::C + | SelectorOutput::D + | SelectorOutput::E + | SelectorOutput::F => {}, SelectorOutput::A(exit) => { if exit { break; @@ -1639,11 +1664,12 @@ impl BackgroundProcessor { let lm_fut = liquidity_manager .as_ref() .map(|lm| lm.get_lm().get_pending_msgs_or_needs_persist_future()); + let gv_fut = gossip_sync.validation_completion_future(); let always_futures = [ channel_manager.get_cm().get_event_or_persistence_needed_future(), chain_monitor.get_update_future(), ]; - let futures = always_futures.into_iter().chain(om_fut).chain(lm_fut); + let futures = always_futures.into_iter().chain(om_fut).chain(lm_fut).chain(gv_fut); let sleeper = Sleeper::from_futures(futures); let batch_delay = if channel_manager.get_cm().needs_pending_htlc_processing() { diff --git a/lightning-block-sync/src/gossip.rs b/lightning-block-sync/src/gossip.rs index 63045b6cd92..00d321669ca 100644 --- a/lightning-block-sync/src/gossip.rs +++ b/lightning-block-sync/src/gossip.rs @@ -9,10 +9,7 @@ use bitcoin::constants::ChainHash; use bitcoin::hash_types::BlockHash; use bitcoin::transaction::{OutPoint, TxOut}; -use lightning::ln::peer_handler::APeerManager; -use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoLookupError, UtxoResult}; -use lightning::util::logger::Logger; use lightning::util::native_async::FutureSpawner; use lightning::util::wakers::Notifier; @@ -128,46 +125,28 @@ impl< /// value of 1024 should more than suffice), and ensure you have sufficient file descriptors /// available on both Bitcoin Core and your LDK application for each request to hold its own /// connection. -pub struct GossipVerifier< - S: FutureSpawner, - Blocks: Deref + Send + Sync + 'static + Clone, - L: Deref + Send + Sync + 'static, -> where +pub struct GossipVerifier +where Blocks::Target: UtxoSource, - L::Target: Logger, { source: Blocks, - peer_manager_wake: Arc, - gossiper: Arc>, Arc, L>>, spawn: S, block_cache: Arc>>, } const BLOCK_CACHE_SIZE: usize = 5; -impl - GossipVerifier +impl GossipVerifier where Blocks::Target: UtxoSource, - L::Target: Logger, { - /// Constructs a new [`GossipVerifier`]. + /// Constructs a new [`GossipVerifier`] for use in a [`P2PGossipSync`]. /// - /// This is expected to be given to a [`P2PGossipSync`] (initially constructed with `None` for - /// the UTXO lookup) via [`P2PGossipSync::add_utxo_lookup`]. - pub fn new( - source: Blocks, spawn: S, gossiper: Arc>, Arc, L>>, - peer_manager: APM, - ) -> Self - where - APM::Target: APeerManager, - { - let peer_manager_wake = Arc::new(move || peer_manager.as_ref().process_events()); + /// [`P2PGossipSync`]: lightning::routing::gossip::P2PGossipSync + pub fn new(source: Blocks, spawn: S) -> Self { Self { source, spawn, - gossiper, - peer_manager_wake, block_cache: Arc::new(Mutex::new(VecDeque::with_capacity(BLOCK_CACHE_SIZE))), } } @@ -256,11 +235,9 @@ where } } -impl Deref - for GossipVerifier +impl Deref for GossipVerifier where Blocks::Target: UtxoSource, - L::Target: Logger, { type Target = Self; fn deref(&self) -> &Self { @@ -268,23 +245,18 @@ where } } -impl UtxoLookup - for GossipVerifier +impl UtxoLookup for GossipVerifier where Blocks::Target: UtxoSource, - L::Target: Logger, { fn get_utxo(&self, _chain_hash: &ChainHash, scid: u64, notifier: Arc) -> UtxoResult { let res = UtxoFuture::new(notifier); let fut = res.clone(); let source = self.source.clone(); - let gossiper = Arc::clone(&self.gossiper); let block_cache = Arc::clone(&self.block_cache); - let pmw = Arc::clone(&self.peer_manager_wake); self.spawn.spawn(async move { let res = Self::retrieve_utxo(source, block_cache, scid).await; fut.resolve(res); - (pmw)(); }); UtxoResult::Async(res) } diff --git a/lightning/src/routing/gossip.rs b/lightning/src/routing/gossip.rs index 46ca3322ae7..e8fcb7b19d1 100644 --- a/lightning/src/routing/gossip.rs +++ b/lightning/src/routing/gossip.rs @@ -43,6 +43,7 @@ use crate::util::indexed_map::{ use crate::util::logger::{Level, Logger}; use crate::util::scid_utils::{block_from_scid, scid_from_parts, MAX_SCID_BLOCK}; use crate::util::ser::{MaybeReadable, Readable, ReadableArgs, RequiredWrapper, Writeable, Writer}; +use crate::util::wakers::Future; use crate::io; use crate::io_extras::{copy, sink}; @@ -367,6 +368,17 @@ where &self.network_graph } + /// Gets a [`Future`] which will resolve the next time an async validation of gossip data + /// completes. + /// + /// If the [`UtxoLookup`] provided in [`P2PGossipSync::new`] does not return + /// [`UtxoResult::Async`] values, the returned [`Future`] will never resolve + /// + /// [`UtxoResult::Async`]: crate::routing::utxo::UtxoResult::Async + pub fn validation_completion_future(&self) -> Future { + self.network_graph.pending_checks.completion_notifier.get_future() + } + /// Returns true when a full routing table sync should be performed with a peer. fn should_request_full_sync(&self) -> bool { const FULL_SYNCS_TO_REQUEST: usize = 5; From 15ddb3167939edc56f1bc76578e9f32cf51cab97 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 31 Dec 2025 15:27:39 +0000 Subject: [PATCH 5/5] Drop the async-setting of the `P2PGossipSync` `utxo_verifier` Now that we do not rely on circular references for `P2PGossipSync` validation, we no longer need the hacky `P2PGossipSync::add_utxo_lookup` method to add the gossip validator after building the `P2PGossipSync` first. Thus, we remove it here, updating some tests that relied on it. --- lightning/src/routing/gossip.rs | 18 ++++------ lightning/src/routing/router.rs | 12 +++---- lightning/src/routing/test_utils.rs | 53 ++++++++++++++++++++++++++--- 3 files changed, 59 insertions(+), 24 deletions(-) diff --git a/lightning/src/routing/gossip.rs b/lightning/src/routing/gossip.rs index e8fcb7b19d1..040a28cddae 100644 --- a/lightning/src/routing/gossip.rs +++ b/lightning/src/routing/gossip.rs @@ -328,7 +328,10 @@ where L::Target: Logger, { network_graph: G, - utxo_lookup: RwLock>, + #[cfg(any(feature = "_test_utils", test))] + pub(super) utxo_lookup: Option, + #[cfg(not(any(feature = "_test_utils", test)))] + utxo_lookup: Option, full_syncs_requested: AtomicUsize, pending_events: Mutex>, logger: L, @@ -341,25 +344,19 @@ where { /// Creates a new tracker of the actual state of the network of channels and nodes, /// assuming an existing [`NetworkGraph`]. + /// /// UTXO lookup is used to make sure announced channels exist on-chain, channel data is /// correct, and the announcement is signed with channel owners' keys. pub fn new(network_graph: G, utxo_lookup: Option, logger: L) -> Self { P2PGossipSync { network_graph, full_syncs_requested: AtomicUsize::new(0), - utxo_lookup: RwLock::new(utxo_lookup), + utxo_lookup, pending_events: Mutex::new(vec![]), logger, } } - /// Adds a provider used to check new announcements. Does not affect - /// existing announcements unless they are updated. - /// Add, update or remove the provider would replace the current one. - pub fn add_utxo_lookup(&self, utxo_lookup: Option) { - *self.utxo_lookup.write().unwrap() = utxo_lookup; - } - /// Gets a reference to the underlying [`NetworkGraph`] which was provided in /// [`P2PGossipSync::new`]. /// @@ -564,8 +561,7 @@ where fn handle_channel_announcement( &self, _their_node_id: Option, msg: &msgs::ChannelAnnouncement, ) -> Result { - self.network_graph - .update_channel_from_announcement(msg, &*self.utxo_lookup.read().unwrap())?; + self.network_graph.update_channel_from_announcement(msg, &self.utxo_lookup)?; Ok(msg.contents.excess_data.len() <= MAX_EXCESS_BYTES_FOR_RELAY) } diff --git a/lightning/src/routing/router.rs b/lightning/src/routing/router.rs index c06e5174263..40580a09c8c 100644 --- a/lightning/src/routing/router.rs +++ b/lightning/src/routing/router.rs @@ -3943,10 +3943,7 @@ mod tests { ChannelUsage, FixedPenaltyScorer, ProbabilisticScorer, ProbabilisticScoringDecayParameters, ProbabilisticScoringFeeParameters, ScoreLookUp, }; - use crate::routing::test_utils::{ - add_channel, add_or_update_node, build_graph, build_line_graph, get_nodes, - id_to_feature_flags, update_channel, - }; + use crate::routing::test_utils::*; use crate::routing::utxo::UtxoResult; use crate::types::features::{BlindedHopFeatures, ChannelFeatures, InitFeatures, NodeFeatures}; use crate::util::config::UserConfig; @@ -5368,7 +5365,7 @@ mod tests { fn available_amount_while_routing_test() { // Tests whether we choose the correct available channel amount while routing. - let (secp_ctx, network_graph, gossip_sync, chain_monitor, logger) = build_graph(); + let (secp_ctx, network_graph, gossip_sync, chain_monitor, logger) = build_graph_with_gossip_validation(); let (our_privkey, our_id, privkeys, nodes) = get_nodes(&secp_ctx); let scorer = ln_test_utils::TestScorer::new(); let random_seed_bytes = [42; 32]; @@ -5588,11 +5585,10 @@ mod tests { .push_opcode(opcodes::all::OP_PUSHNUM_2) .push_opcode(opcodes::all::OP_CHECKMULTISIG).into_script().to_p2wsh(); + *chain_monitor.utxo_ret.lock().unwrap() = UtxoResult::Sync(Ok(TxOut { value: Amount::from_sat(15), script_pubkey: good_script.clone() })); - gossip_sync.add_utxo_lookup(Some(chain_monitor)); - - add_channel(&gossip_sync, &secp_ctx, &privkeys[0], &privkeys[2], ChannelFeatures::from_le_bytes(id_to_feature_flags(3)), 333); + add_channel_skipping_utxo_update(&gossip_sync, &secp_ctx, &privkeys[0], &privkeys[2], ChannelFeatures::from_le_bytes(id_to_feature_flags(3)), 333); update_channel(&gossip_sync, &secp_ctx, &privkeys[0], UnsignedChannelUpdate { chain_hash: ChainHash::using_genesis_block(Network::Testnet), short_channel_id: 333, diff --git a/lightning/src/routing/test_utils.rs b/lightning/src/routing/test_utils.rs index c5c35c9ce77..a433fa30c5b 100644 --- a/lightning/src/routing/test_utils.rs +++ b/lightning/src/routing/test_utils.rs @@ -10,7 +10,9 @@ // licenses. use crate::routing::gossip::{NetworkGraph, NodeAlias, P2PGossipSync}; +use crate::routing::utxo::UtxoResult; use crate::types::features::{ChannelFeatures, NodeFeatures}; +use crate::ln::chan_utils::make_funding_redeemscript; use crate::ln::msgs::{ChannelAnnouncement, ChannelUpdate, MAX_VALUE_MSAT, NodeAnnouncement, RoutingMessageHandler, SocketAddress, UnsignedChannelAnnouncement, UnsignedChannelUpdate, UnsignedNodeAnnouncement}; use crate::util::test_utils; use crate::util::ser::Writeable; @@ -22,6 +24,7 @@ use bitcoin::hex::FromHex; use bitcoin::network::Network; use bitcoin::secp256k1::{PublicKey,SecretKey}; use bitcoin::secp256k1::{Secp256k1, All}; +use bitcoin::{Amount, TxOut}; #[allow(unused)] use crate::prelude::*; @@ -58,19 +61,34 @@ pub(crate) fn channel_announcement( } // Using the same keys for LN and BTC ids -pub(crate) fn add_channel( +pub(crate) fn add_channel_skipping_utxo_update( gossip_sync: &P2PGossipSync>>, Arc, Arc>, - secp_ctx: &Secp256k1, node_1_privkey: &SecretKey, node_2_privkey: &SecretKey, features: ChannelFeatures, short_channel_id: u64 + secp_ctx: &Secp256k1, node_1_privkey: &SecretKey, node_2_privkey: &SecretKey, features: ChannelFeatures, short_channel_id: u64, ) { let valid_announcement = channel_announcement(node_1_privkey, node_2_privkey, features, short_channel_id, secp_ctx); - let node_1_pubkey = PublicKey::from_secret_key(&secp_ctx, node_1_privkey); + + let node_1_pubkey = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey); match gossip_sync.handle_channel_announcement(Some(node_1_pubkey), &valid_announcement) { Ok(res) => assert!(res), - _ => panic!() + Err(e) => panic!("{:?}", e), }; } +pub(crate) fn add_channel( + gossip_sync: &P2PGossipSync>>, Arc, Arc>, + secp_ctx: &Secp256k1, node_1_privkey: &SecretKey, node_2_privkey: &SecretKey, features: ChannelFeatures, short_channel_id: u64, +) { + gossip_sync.utxo_lookup.as_ref().map(|checker| { + let node_1_pubkey = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey); + let node_2_pubkey = PublicKey::from_secret_key(&secp_ctx, &node_2_privkey); + let script_pubkey = make_funding_redeemscript(&node_1_pubkey, &node_2_pubkey).to_p2wsh(); + *checker.utxo_ret.lock().unwrap() = + UtxoResult::Sync(Ok(TxOut { value: Amount::from_sat(21_000_000_0000_0000), script_pubkey })); + }); + add_channel_skipping_utxo_update(gossip_sync, secp_ctx, node_1_privkey, node_2_privkey, features, short_channel_id); +} + pub(crate) fn add_or_update_node( gossip_sync: &P2PGossipSync>>, Arc, Arc>, secp_ctx: &Secp256k1, node_privkey: &SecretKey, features: NodeFeatures, timestamp: u32 @@ -197,18 +215,43 @@ pub(super) fn build_line_graph() -> ( (secp_ctx, network_graph, gossip_sync, chain_monitor, logger) } +pub(super) fn build_graph_with_gossip_validation() -> ( + Secp256k1, + sync::Arc>>, + P2PGossipSync>>, sync::Arc, sync::Arc>, + sync::Arc, + sync::Arc, +) { + do_build_graph(true) +} + pub(super) fn build_graph() -> ( Secp256k1, sync::Arc>>, P2PGossipSync>>, sync::Arc, sync::Arc>, sync::Arc, sync::Arc, +) { + do_build_graph(false) +} + +fn do_build_graph(with_validation: bool) -> ( + Secp256k1, + sync::Arc>>, + P2PGossipSync>>, sync::Arc, sync::Arc>, + sync::Arc, + sync::Arc, ) { let secp_ctx = Secp256k1::new(); let logger = Arc::new(test_utils::TestLogger::new()); let chain_monitor = Arc::new(test_utils::TestChainSource::new(Network::Testnet)); let network_graph = Arc::new(NetworkGraph::new(Network::Testnet, Arc::clone(&logger))); - let gossip_sync = P2PGossipSync::new(Arc::clone(&network_graph), None, Arc::clone(&logger)); + let checker = if with_validation { + Some(Arc::clone(&chain_monitor)) + } else { + None + }; + let gossip_sync = P2PGossipSync::new(Arc::clone(&network_graph), checker, Arc::clone(&logger)); // Build network from our_id to node6: // // -1(1)2- node0 -1(3)2-