diff --git a/fuzz/src/router.rs b/fuzz/src/router.rs index af29a0221a9..2e5b15fc7f4 100644 --- a/fuzz/src/router.rs +++ b/fuzz/src/router.rs @@ -31,6 +31,7 @@ use lightning::types::features::{BlindedHopFeatures, Bolt12InvoiceFeatures}; use lightning::util::config::UserConfig; use lightning::util::hash_tables::*; use lightning::util::ser::LengthReadable; +use lightning::util::wakers::Notifier; use bitcoin::hashes::Hash; use bitcoin::network::Network; @@ -88,12 +89,11 @@ impl InputData { } } -struct FuzzChainSource<'a, 'b, Out: test_logger::Output> { +struct FuzzChainSource { input: Arc, - net_graph: &'a NetworkGraph<&'b test_logger::TestLogger>, } -impl UtxoLookup for FuzzChainSource<'_, '_, Out> { - fn get_utxo(&self, _chain_hash: &ChainHash, _short_channel_id: u64) -> UtxoResult { +impl UtxoLookup for FuzzChainSource { + fn get_utxo(&self, _chain_hash: &ChainHash, _scid: u64, notifier: Arc) -> UtxoResult { let input_slice = self.input.get_slice(2); if input_slice.is_none() { return UtxoResult::Sync(Err(UtxoLookupError::UnknownTx)); @@ -107,17 +107,17 @@ impl UtxoLookup for FuzzChainSource<'_, '_, Out> { &[0, _] => UtxoResult::Sync(Err(UtxoLookupError::UnknownChain)), &[1, _] => UtxoResult::Sync(Err(UtxoLookupError::UnknownTx)), &[2, _] => { - let future = UtxoFuture::new(); - future.resolve_without_forwarding(self.net_graph, Ok(txo_res)); + let future = UtxoFuture::new(notifier); + future.resolve(Ok(txo_res)); UtxoResult::Async(future.clone()) }, &[3, _] => { - let future = UtxoFuture::new(); - future.resolve_without_forwarding(self.net_graph, Err(UtxoLookupError::UnknownTx)); + let future = UtxoFuture::new(notifier); + future.resolve(Err(UtxoLookupError::UnknownTx)); UtxoResult::Async(future.clone()) }, &[4, _] => { - UtxoResult::Async(UtxoFuture::new()) // the future will never resolve + UtxoResult::Async(UtxoFuture::new(notifier)) // the future will never resolve }, &[..] => UtxoResult::Sync(Ok(txo_res)), } @@ -197,7 +197,7 @@ pub fn do_test(data: &[u8], out: Out) { let our_pubkey = get_pubkey!(); let net_graph = NetworkGraph::new(Network::Bitcoin, &logger); - let chain_source = FuzzChainSource { input: Arc::clone(&input), net_graph: &net_graph }; + let chain_source = FuzzChainSource { input: Arc::clone(&input) }; let mut node_pks = new_hash_map(); let mut scid = 42; @@ -335,9 +335,7 @@ pub fn do_test(data: &[u8], out: Out) { node_pks.insert(get_pubkey_from_node_id!(msg.node_id_1), ()); node_pks.insert(get_pubkey_from_node_id!(msg.node_id_2), ()); let _ = net_graph - .update_channel_from_unsigned_announcement::<&FuzzChainSource<'_, '_, Out>>( - &msg, &None, - ); + .update_channel_from_unsigned_announcement::<&FuzzChainSource>(&msg, &None); }, 2 => { let msg = diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index aae738ab1c1..7361d026d5a 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -64,6 +64,7 @@ use lightning::util::persist::{ SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE, }; use lightning::util::sweep::{OutputSweeper, OutputSweeperSync}; +use lightning::util::wakers::Future; #[cfg(feature = "std")] use lightning::util::wakers::Sleeper; use lightning_rapid_gossip_sync::RapidGossipSync; @@ -235,6 +236,14 @@ where GossipSync::None => None, } } + + fn validation_completion_future(&self) -> Option { + match self { + GossipSync::P2P(gossip_sync) => Some(gossip_sync.validation_completion_future()), + GossipSync::Rapid(_) => None, + GossipSync::None => None, + } + } } /// This is not exported to bindings users as the bindings concretize everything and have constructors for us @@ -520,12 +529,14 @@ pub(crate) mod futures_util { C: Future + Unpin, D: Future + Unpin, E: Future + Unpin, + F: Future + Unpin, > { pub a: A, pub b: B, pub c: C, pub d: D, pub e: E, + pub f: F, } pub(crate) enum SelectorOutput { @@ -534,6 +545,7 @@ pub(crate) mod futures_util { C, D, E, + F, } impl< @@ -542,7 +554,8 @@ pub(crate) mod futures_util { C: Future + Unpin, D: Future + Unpin, E: Future + Unpin, - > Future for Selector + F: Future + Unpin, + > Future for Selector { type Output = SelectorOutput; fn poll( @@ -580,6 +593,12 @@ pub(crate) mod futures_util { }, Poll::Pending => {}, } + match Pin::new(&mut self.f).poll(ctx) { + Poll::Ready(()) => { + return Poll::Ready(SelectorOutput::F); + }, + Poll::Pending => {}, + } Poll::Pending } } @@ -606,6 +625,12 @@ pub(crate) mod futures_util { } } + impl + Unpin> From> for OptionalSelector { + fn from(optional_future: Option) -> Self { + Self { optional_future } + } + } + // If we want to poll a future without an async context to figure out if it has completed or // not without awaiting, we need a Waker, which needs a vtable...we fill it with dummy values // but sadly there's a good bit of boilerplate here. @@ -1058,18 +1083,13 @@ where if mobile_interruptable_platform { await_start = Some(sleeper(Duration::from_secs(1))); } - let om_fut = if let Some(om) = onion_messenger.as_ref() { - let fut = om.get_om().get_update_future(); - OptionalSelector { optional_future: Some(fut) } - } else { - OptionalSelector { optional_future: None } - }; - let lm_fut = if let Some(lm) = liquidity_manager.as_ref() { - let fut = lm.get_lm().get_pending_msgs_or_needs_persist_future(); - OptionalSelector { optional_future: Some(fut) } - } else { - OptionalSelector { optional_future: None } - }; + let om_fut: OptionalSelector<_> = + onion_messenger.as_ref().map(|om| om.get_om().get_update_future()).into(); + let lm_fut: OptionalSelector<_> = liquidity_manager + .as_ref() + .map(|lm| lm.get_lm().get_pending_msgs_or_needs_persist_future()) + .into(); + let gv_fut: OptionalSelector<_> = gossip_sync.validation_completion_future().into(); let needs_processing = channel_manager.get_cm().needs_pending_htlc_processing(); let sleep_delay = match (needs_processing, mobile_interruptable_platform) { (true, true) => batch_delay.get().min(Duration::from_millis(100)), @@ -1083,9 +1103,14 @@ where c: chain_monitor.get_update_future(), d: om_fut, e: lm_fut, + f: gv_fut, }; match fut.await { - SelectorOutput::B | SelectorOutput::C | SelectorOutput::D | SelectorOutput::E => {}, + SelectorOutput::B + | SelectorOutput::C + | SelectorOutput::D + | SelectorOutput::E + | SelectorOutput::F => {}, SelectorOutput::A(exit) => { if exit { break; @@ -1635,28 +1660,18 @@ impl BackgroundProcessor { log_trace!(logger, "Terminating background processor."); break; } - let sleeper = match (onion_messenger.as_ref(), liquidity_manager.as_ref()) { - (Some(om), Some(lm)) => Sleeper::from_four_futures( - &channel_manager.get_cm().get_event_or_persistence_needed_future(), - &chain_monitor.get_update_future(), - &om.get_om().get_update_future(), - &lm.get_lm().get_pending_msgs_or_needs_persist_future(), - ), - (Some(om), None) => Sleeper::from_three_futures( - &channel_manager.get_cm().get_event_or_persistence_needed_future(), - &chain_monitor.get_update_future(), - &om.get_om().get_update_future(), - ), - (None, Some(lm)) => Sleeper::from_three_futures( - &channel_manager.get_cm().get_event_or_persistence_needed_future(), - &chain_monitor.get_update_future(), - &lm.get_lm().get_pending_msgs_or_needs_persist_future(), - ), - (None, None) => Sleeper::from_two_futures( - &channel_manager.get_cm().get_event_or_persistence_needed_future(), - &chain_monitor.get_update_future(), - ), - }; + let om_fut = onion_messenger.as_ref().map(|om| om.get_om().get_update_future()); + let lm_fut = liquidity_manager + .as_ref() + .map(|lm| lm.get_lm().get_pending_msgs_or_needs_persist_future()); + let gv_fut = gossip_sync.validation_completion_future(); + let always_futures = [ + channel_manager.get_cm().get_event_or_persistence_needed_future(), + chain_monitor.get_update_future(), + ]; + let futures = always_futures.into_iter().chain(om_fut).chain(lm_fut).chain(gv_fut); + let sleeper = Sleeper::from_futures(futures); + let batch_delay = if channel_manager.get_cm().needs_pending_htlc_processing() { batch_delay.get() } else { diff --git a/lightning-block-sync/src/gossip.rs b/lightning-block-sync/src/gossip.rs index 596098350c7..00d321669ca 100644 --- a/lightning-block-sync/src/gossip.rs +++ b/lightning-block-sync/src/gossip.rs @@ -9,11 +9,9 @@ use bitcoin::constants::ChainHash; use bitcoin::hash_types::BlockHash; use bitcoin::transaction::{OutPoint, TxOut}; -use lightning::ln::peer_handler::APeerManager; -use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoLookupError, UtxoResult}; -use lightning::util::logger::Logger; use lightning::util::native_async::FutureSpawner; +use lightning::util::wakers::Notifier; use std::collections::VecDeque; use std::future::Future; @@ -127,46 +125,28 @@ impl< /// value of 1024 should more than suffice), and ensure you have sufficient file descriptors /// available on both Bitcoin Core and your LDK application for each request to hold its own /// connection. -pub struct GossipVerifier< - S: FutureSpawner, - Blocks: Deref + Send + Sync + 'static + Clone, - L: Deref + Send + Sync + 'static, -> where +pub struct GossipVerifier +where Blocks::Target: UtxoSource, - L::Target: Logger, { source: Blocks, - peer_manager_wake: Arc, - gossiper: Arc>, Arc, L>>, spawn: S, block_cache: Arc>>, } const BLOCK_CACHE_SIZE: usize = 5; -impl - GossipVerifier +impl GossipVerifier where Blocks::Target: UtxoSource, - L::Target: Logger, { - /// Constructs a new [`GossipVerifier`]. + /// Constructs a new [`GossipVerifier`] for use in a [`P2PGossipSync`]. /// - /// This is expected to be given to a [`P2PGossipSync`] (initially constructed with `None` for - /// the UTXO lookup) via [`P2PGossipSync::add_utxo_lookup`]. - pub fn new( - source: Blocks, spawn: S, gossiper: Arc>, Arc, L>>, - peer_manager: APM, - ) -> Self - where - APM::Target: APeerManager, - { - let peer_manager_wake = Arc::new(move || peer_manager.as_ref().process_events()); + /// [`P2PGossipSync`]: lightning::routing::gossip::P2PGossipSync + pub fn new(source: Blocks, spawn: S) -> Self { Self { source, spawn, - gossiper, - peer_manager_wake, block_cache: Arc::new(Mutex::new(VecDeque::with_capacity(BLOCK_CACHE_SIZE))), } } @@ -255,11 +235,9 @@ where } } -impl Deref - for GossipVerifier +impl Deref for GossipVerifier where Blocks::Target: UtxoSource, - L::Target: Logger, { type Target = Self; fn deref(&self) -> &Self { @@ -267,23 +245,18 @@ where } } -impl UtxoLookup - for GossipVerifier +impl UtxoLookup for GossipVerifier where Blocks::Target: UtxoSource, - L::Target: Logger, { - fn get_utxo(&self, _chain_hash: &ChainHash, short_channel_id: u64) -> UtxoResult { - let res = UtxoFuture::new(); + fn get_utxo(&self, _chain_hash: &ChainHash, scid: u64, notifier: Arc) -> UtxoResult { + let res = UtxoFuture::new(notifier); let fut = res.clone(); let source = self.source.clone(); - let gossiper = Arc::clone(&self.gossiper); let block_cache = Arc::clone(&self.block_cache); - let pmw = Arc::clone(&self.peer_manager_wake); self.spawn.spawn(async move { - let res = Self::retrieve_utxo(source, block_cache, short_channel_id).await; - fut.resolve(gossiper.network_graph(), &*gossiper, res); - (pmw)(); + let res = Self::retrieve_utxo(source, block_cache, scid).await; + fut.resolve(res); }); UtxoResult::Async(res) } diff --git a/lightning/src/routing/gossip.rs b/lightning/src/routing/gossip.rs index ae317ad1ac3..040a28cddae 100644 --- a/lightning/src/routing/gossip.rs +++ b/lightning/src/routing/gossip.rs @@ -43,6 +43,7 @@ use crate::util::indexed_map::{ use crate::util::logger::{Level, Logger}; use crate::util::scid_utils::{block_from_scid, scid_from_parts, MAX_SCID_BLOCK}; use crate::util::ser::{MaybeReadable, Readable, ReadableArgs, RequiredWrapper, Writeable, Writer}; +use crate::util::wakers::Future; use crate::io; use crate::io_extras::{copy, sink}; @@ -327,7 +328,10 @@ where L::Target: Logger, { network_graph: G, - utxo_lookup: RwLock>, + #[cfg(any(feature = "_test_utils", test))] + pub(super) utxo_lookup: Option, + #[cfg(not(any(feature = "_test_utils", test)))] + utxo_lookup: Option, full_syncs_requested: AtomicUsize, pending_events: Mutex>, logger: L, @@ -340,25 +344,19 @@ where { /// Creates a new tracker of the actual state of the network of channels and nodes, /// assuming an existing [`NetworkGraph`]. + /// /// UTXO lookup is used to make sure announced channels exist on-chain, channel data is /// correct, and the announcement is signed with channel owners' keys. pub fn new(network_graph: G, utxo_lookup: Option, logger: L) -> Self { P2PGossipSync { network_graph, full_syncs_requested: AtomicUsize::new(0), - utxo_lookup: RwLock::new(utxo_lookup), + utxo_lookup, pending_events: Mutex::new(vec![]), logger, } } - /// Adds a provider used to check new announcements. Does not affect - /// existing announcements unless they are updated. - /// Add, update or remove the provider would replace the current one. - pub fn add_utxo_lookup(&self, utxo_lookup: Option) { - *self.utxo_lookup.write().unwrap() = utxo_lookup; - } - /// Gets a reference to the underlying [`NetworkGraph`] which was provided in /// [`P2PGossipSync::new`]. /// @@ -367,6 +365,17 @@ where &self.network_graph } + /// Gets a [`Future`] which will resolve the next time an async validation of gossip data + /// completes. + /// + /// If the [`UtxoLookup`] provided in [`P2PGossipSync::new`] does not return + /// [`UtxoResult::Async`] values, the returned [`Future`] will never resolve + /// + /// [`UtxoResult::Async`]: crate::routing::utxo::UtxoResult::Async + pub fn validation_completion_future(&self) -> Future { + self.network_graph.pending_checks.completion_notifier.get_future() + } + /// Returns true when a full routing table sync should be performed with a peer. fn should_request_full_sync(&self) -> bool { const FULL_SYNCS_TO_REQUEST: usize = 5; @@ -378,39 +387,42 @@ where } } - /// Used to broadcast forward gossip messages which were validated async. - /// - /// Note that this will ignore events other than `Broadcast*` or messages with too much excess - /// data. - pub(super) fn forward_gossip_msg(&self, mut ev: MessageSendEvent) { - match &mut ev { - MessageSendEvent::BroadcastChannelAnnouncement { msg, ref mut update_msg } => { - if msg.contents.excess_data.len() > MAX_EXCESS_BYTES_FOR_RELAY { - return; - } - if update_msg.as_ref().map(|msg| msg.contents.excess_data.len()).unwrap_or(0) - > MAX_EXCESS_BYTES_FOR_RELAY - { - *update_msg = None; - } - }, - MessageSendEvent::BroadcastChannelUpdate { msg, .. } => { - if msg.contents.excess_data.len() > MAX_EXCESS_BYTES_FOR_RELAY { - return; - } - }, - MessageSendEvent::BroadcastNodeAnnouncement { msg } => { - if msg.contents.excess_data.len() > MAX_EXCESS_BYTES_FOR_RELAY - || msg.contents.excess_address_data.len() > MAX_EXCESS_BYTES_FOR_RELAY - || msg.contents.excess_data.len() + msg.contents.excess_address_data.len() + /// Walks the list of pending UTXO validations and removes completed ones, adding any messages + /// we should forward as a result to [`Self::pending_events`]. + fn process_completed_checks(&self) { + let msgs = self.network_graph.pending_checks.check_resolved_futures(&*self.network_graph); + let mut pending_events = self.pending_events.lock().unwrap(); + pending_events.reserve(msgs.len()); + for mut message in msgs { + match &mut message { + MessageSendEvent::BroadcastChannelAnnouncement { msg, ref mut update_msg } => { + if msg.contents.excess_data.len() > MAX_EXCESS_BYTES_FOR_RELAY { + continue; + } + if update_msg.as_ref().map(|msg| msg.contents.excess_data.len()).unwrap_or(0) > MAX_EXCESS_BYTES_FOR_RELAY - { - return; - } - }, - _ => return, + { + *update_msg = None; + } + }, + MessageSendEvent::BroadcastChannelUpdate { msg, .. } => { + if msg.contents.excess_data.len() > MAX_EXCESS_BYTES_FOR_RELAY { + continue; + } + }, + MessageSendEvent::BroadcastNodeAnnouncement { msg } => { + if msg.contents.excess_data.len() > MAX_EXCESS_BYTES_FOR_RELAY + || msg.contents.excess_address_data.len() > MAX_EXCESS_BYTES_FOR_RELAY + || msg.contents.excess_data.len() + msg.contents.excess_address_data.len() + > MAX_EXCESS_BYTES_FOR_RELAY + { + continue; + } + }, + _ => continue, + } + pending_events.push(message); } - self.pending_events.lock().unwrap().push(ev); } } @@ -549,8 +561,7 @@ where fn handle_channel_announcement( &self, _their_node_id: Option, msg: &msgs::ChannelAnnouncement, ) -> Result { - self.network_graph - .update_channel_from_announcement(msg, &*self.utxo_lookup.read().unwrap())?; + self.network_graph.update_channel_from_announcement(msg, &self.utxo_lookup)?; Ok(msg.contents.excess_data.len() <= MAX_EXCESS_BYTES_FOR_RELAY) } @@ -884,6 +895,7 @@ where } fn get_and_clear_pending_msg_events(&self) -> Vec { + self.process_completed_checks(); let mut ret = Vec::new(); let mut pending_events = self.pending_events.lock().unwrap(); core::mem::swap(&mut ret, &mut pending_events); diff --git a/lightning/src/routing/router.rs b/lightning/src/routing/router.rs index c06e5174263..40580a09c8c 100644 --- a/lightning/src/routing/router.rs +++ b/lightning/src/routing/router.rs @@ -3943,10 +3943,7 @@ mod tests { ChannelUsage, FixedPenaltyScorer, ProbabilisticScorer, ProbabilisticScoringDecayParameters, ProbabilisticScoringFeeParameters, ScoreLookUp, }; - use crate::routing::test_utils::{ - add_channel, add_or_update_node, build_graph, build_line_graph, get_nodes, - id_to_feature_flags, update_channel, - }; + use crate::routing::test_utils::*; use crate::routing::utxo::UtxoResult; use crate::types::features::{BlindedHopFeatures, ChannelFeatures, InitFeatures, NodeFeatures}; use crate::util::config::UserConfig; @@ -5368,7 +5365,7 @@ mod tests { fn available_amount_while_routing_test() { // Tests whether we choose the correct available channel amount while routing. - let (secp_ctx, network_graph, gossip_sync, chain_monitor, logger) = build_graph(); + let (secp_ctx, network_graph, gossip_sync, chain_monitor, logger) = build_graph_with_gossip_validation(); let (our_privkey, our_id, privkeys, nodes) = get_nodes(&secp_ctx); let scorer = ln_test_utils::TestScorer::new(); let random_seed_bytes = [42; 32]; @@ -5588,11 +5585,10 @@ mod tests { .push_opcode(opcodes::all::OP_PUSHNUM_2) .push_opcode(opcodes::all::OP_CHECKMULTISIG).into_script().to_p2wsh(); + *chain_monitor.utxo_ret.lock().unwrap() = UtxoResult::Sync(Ok(TxOut { value: Amount::from_sat(15), script_pubkey: good_script.clone() })); - gossip_sync.add_utxo_lookup(Some(chain_monitor)); - - add_channel(&gossip_sync, &secp_ctx, &privkeys[0], &privkeys[2], ChannelFeatures::from_le_bytes(id_to_feature_flags(3)), 333); + add_channel_skipping_utxo_update(&gossip_sync, &secp_ctx, &privkeys[0], &privkeys[2], ChannelFeatures::from_le_bytes(id_to_feature_flags(3)), 333); update_channel(&gossip_sync, &secp_ctx, &privkeys[0], UnsignedChannelUpdate { chain_hash: ChainHash::using_genesis_block(Network::Testnet), short_channel_id: 333, diff --git a/lightning/src/routing/test_utils.rs b/lightning/src/routing/test_utils.rs index c5c35c9ce77..a433fa30c5b 100644 --- a/lightning/src/routing/test_utils.rs +++ b/lightning/src/routing/test_utils.rs @@ -10,7 +10,9 @@ // licenses. use crate::routing::gossip::{NetworkGraph, NodeAlias, P2PGossipSync}; +use crate::routing::utxo::UtxoResult; use crate::types::features::{ChannelFeatures, NodeFeatures}; +use crate::ln::chan_utils::make_funding_redeemscript; use crate::ln::msgs::{ChannelAnnouncement, ChannelUpdate, MAX_VALUE_MSAT, NodeAnnouncement, RoutingMessageHandler, SocketAddress, UnsignedChannelAnnouncement, UnsignedChannelUpdate, UnsignedNodeAnnouncement}; use crate::util::test_utils; use crate::util::ser::Writeable; @@ -22,6 +24,7 @@ use bitcoin::hex::FromHex; use bitcoin::network::Network; use bitcoin::secp256k1::{PublicKey,SecretKey}; use bitcoin::secp256k1::{Secp256k1, All}; +use bitcoin::{Amount, TxOut}; #[allow(unused)] use crate::prelude::*; @@ -58,19 +61,34 @@ pub(crate) fn channel_announcement( } // Using the same keys for LN and BTC ids -pub(crate) fn add_channel( +pub(crate) fn add_channel_skipping_utxo_update( gossip_sync: &P2PGossipSync>>, Arc, Arc>, - secp_ctx: &Secp256k1, node_1_privkey: &SecretKey, node_2_privkey: &SecretKey, features: ChannelFeatures, short_channel_id: u64 + secp_ctx: &Secp256k1, node_1_privkey: &SecretKey, node_2_privkey: &SecretKey, features: ChannelFeatures, short_channel_id: u64, ) { let valid_announcement = channel_announcement(node_1_privkey, node_2_privkey, features, short_channel_id, secp_ctx); - let node_1_pubkey = PublicKey::from_secret_key(&secp_ctx, node_1_privkey); + + let node_1_pubkey = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey); match gossip_sync.handle_channel_announcement(Some(node_1_pubkey), &valid_announcement) { Ok(res) => assert!(res), - _ => panic!() + Err(e) => panic!("{:?}", e), }; } +pub(crate) fn add_channel( + gossip_sync: &P2PGossipSync>>, Arc, Arc>, + secp_ctx: &Secp256k1, node_1_privkey: &SecretKey, node_2_privkey: &SecretKey, features: ChannelFeatures, short_channel_id: u64, +) { + gossip_sync.utxo_lookup.as_ref().map(|checker| { + let node_1_pubkey = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey); + let node_2_pubkey = PublicKey::from_secret_key(&secp_ctx, &node_2_privkey); + let script_pubkey = make_funding_redeemscript(&node_1_pubkey, &node_2_pubkey).to_p2wsh(); + *checker.utxo_ret.lock().unwrap() = + UtxoResult::Sync(Ok(TxOut { value: Amount::from_sat(21_000_000_0000_0000), script_pubkey })); + }); + add_channel_skipping_utxo_update(gossip_sync, secp_ctx, node_1_privkey, node_2_privkey, features, short_channel_id); +} + pub(crate) fn add_or_update_node( gossip_sync: &P2PGossipSync>>, Arc, Arc>, secp_ctx: &Secp256k1, node_privkey: &SecretKey, features: NodeFeatures, timestamp: u32 @@ -197,18 +215,43 @@ pub(super) fn build_line_graph() -> ( (secp_ctx, network_graph, gossip_sync, chain_monitor, logger) } +pub(super) fn build_graph_with_gossip_validation() -> ( + Secp256k1, + sync::Arc>>, + P2PGossipSync>>, sync::Arc, sync::Arc>, + sync::Arc, + sync::Arc, +) { + do_build_graph(true) +} + pub(super) fn build_graph() -> ( Secp256k1, sync::Arc>>, P2PGossipSync>>, sync::Arc, sync::Arc>, sync::Arc, sync::Arc, +) { + do_build_graph(false) +} + +fn do_build_graph(with_validation: bool) -> ( + Secp256k1, + sync::Arc>>, + P2PGossipSync>>, sync::Arc, sync::Arc>, + sync::Arc, + sync::Arc, ) { let secp_ctx = Secp256k1::new(); let logger = Arc::new(test_utils::TestLogger::new()); let chain_monitor = Arc::new(test_utils::TestChainSource::new(Network::Testnet)); let network_graph = Arc::new(NetworkGraph::new(Network::Testnet, Arc::clone(&logger))); - let gossip_sync = P2PGossipSync::new(Arc::clone(&network_graph), None, Arc::clone(&logger)); + let checker = if with_validation { + Some(Arc::clone(&chain_monitor)) + } else { + None + }; + let gossip_sync = P2PGossipSync::new(Arc::clone(&network_graph), checker, Arc::clone(&logger)); // Build network from our_id to node6: // // -1(1)2- node0 -1(3)2- diff --git a/lightning/src/routing/utxo.rs b/lightning/src/routing/utxo.rs index 4299dffb90f..f46160f1f14 100644 --- a/lightning/src/routing/utxo.rs +++ b/lightning/src/routing/utxo.rs @@ -21,8 +21,9 @@ use bitcoin::hex::DisplayHex; use crate::ln::chan_utils::make_funding_redeemscript_from_slices; use crate::ln::msgs::{self, ErrorAction, LightningError, MessageSendEvent}; -use crate::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync}; +use crate::routing::gossip::{NetworkGraph, NodeId}; use crate::util::logger::{Level, Logger}; +use crate::util::wakers::Notifier; use crate::prelude::*; @@ -64,8 +65,14 @@ pub trait UtxoLookup { /// Returns an error if `chain_hash` is for a different chain or if such a transaction output is /// unknown. /// + /// An `async_completion_notifier` is provided which should be [`Notifier::notify`]ed upon + /// resolution of the [`UtxoFuture`] in case this method returns [`UtxoResult::Async`]. + /// /// [`short_channel_id`]: https://github.com/lightning/bolts/blob/master/07-routing-gossip.md#definition-of-short_channel_id - fn get_utxo(&self, chain_hash: &ChainHash, short_channel_id: u64) -> UtxoResult; + fn get_utxo( + &self, chain_hash: &ChainHash, short_channel_id: u64, + async_completion_notifier: Arc, + ) -> UtxoResult; } enum ChannelAnnouncement { @@ -108,6 +115,7 @@ impl ChannelUpdate { } struct UtxoMessages { + notifier: Arc, complete: Option>, channel_announce: Option, latest_node_announce_a: Option, @@ -128,166 +136,32 @@ pub struct UtxoFuture { /// once we have a concrete resolution of a request. pub(crate) struct UtxoResolver(Result); impl UtxoLookup for UtxoResolver { - fn get_utxo(&self, _chain_hash: &ChainHash, _short_channel_id: u64) -> UtxoResult { + fn get_utxo(&self, _hash: &ChainHash, _scid: u64, _notifier: Arc) -> UtxoResult { UtxoResult::Sync(self.0.clone()) } } impl UtxoFuture { /// Builds a new future for later resolution. - #[rustfmt::skip] - pub fn new() -> Self { - Self { state: Arc::new(Mutex::new(UtxoMessages { - complete: None, - channel_announce: None, - latest_node_announce_a: None, - latest_node_announce_b: None, - latest_channel_update_a: None, - latest_channel_update_b: None, - }))} - } - - /// Resolves this future against the given `graph` and with the given `result`. - /// - /// This is identical to calling [`UtxoFuture::resolve`] with a dummy `gossip`, disabling - /// forwarding the validated gossip message onwards to peers. - /// - /// Because this may cause the [`NetworkGraph`]'s [`processing_queue_high`] to flip, in order - /// to allow us to interact with peers again, you should call [`PeerManager::process_events`] - /// after this. - /// - /// [`processing_queue_high`]: crate::ln::msgs::RoutingMessageHandler::processing_queue_high - /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events - pub fn resolve_without_forwarding( - &self, graph: &NetworkGraph, result: Result, - ) where - L::Target: Logger, - { - self.do_resolve(graph, result); - } - - /// Resolves this future against the given `graph` and with the given `result`. - /// - /// The given `gossip` is used to broadcast any validated messages onwards to all peers which - /// have available buffer space. - /// - /// Because this may cause the [`NetworkGraph`]'s [`processing_queue_high`] to flip, in order - /// to allow us to interact with peers again, you should call [`PeerManager::process_events`] - /// after this. - /// - /// [`processing_queue_high`]: crate::ln::msgs::RoutingMessageHandler::processing_queue_high - /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events - pub fn resolve< - L: Deref, - G: Deref>, - U: Deref, - GS: Deref>, - >( - &self, graph: &NetworkGraph, gossip: GS, result: Result, - ) where - L::Target: Logger, - U::Target: UtxoLookup, - { - let mut res = self.do_resolve(graph, result); - for msg_opt in res.iter_mut() { - if let Some(msg) = msg_opt.take() { - gossip.forward_gossip_msg(msg); - } + pub fn new(notifier: Arc) -> Self { + Self { + state: Arc::new(Mutex::new(UtxoMessages { + notifier, + complete: None, + channel_announce: None, + latest_node_announce_a: None, + latest_node_announce_b: None, + latest_channel_update_a: None, + latest_channel_update_b: None, + })), } } - #[rustfmt::skip] - fn do_resolve(&self, graph: &NetworkGraph, result: Result) - -> [Option; 5] where L::Target: Logger { - let (announcement, node_a, node_b, update_a, update_b) = { - let mut pending_checks = graph.pending_checks.internal.lock().unwrap(); - let mut async_messages = self.state.lock().unwrap(); - - if async_messages.channel_announce.is_none() { - // We raced returning to `check_channel_announcement` which hasn't updated - // `channel_announce` yet. That's okay, we can set the `complete` field which it will - // check once it gets control again. - async_messages.complete = Some(result); - return [None, None, None, None, None]; - } - - let announcement_msg = match async_messages.channel_announce.as_ref().unwrap() { - ChannelAnnouncement::Full(signed_msg) => &signed_msg.contents, - ChannelAnnouncement::Unsigned(msg) => &msg, - }; - - pending_checks.lookup_completed(announcement_msg, &Arc::downgrade(&self.state)); - - (async_messages.channel_announce.take().unwrap(), - async_messages.latest_node_announce_a.take(), - async_messages.latest_node_announce_b.take(), - async_messages.latest_channel_update_a.take(), - async_messages.latest_channel_update_b.take()) - }; - - let mut res = [None, None, None, None, None]; - let mut res_idx = 0; - - // Now that we've updated our internal state, pass the pending messages back through the - // network graph with a different `UtxoLookup` which will resolve immediately. - // Note that we ignore errors as we don't disconnect peers anyway, so there's nothing to do - // with them. - let resolver = UtxoResolver(result); - let (node_id_1, node_id_2) = match &announcement { - ChannelAnnouncement::Full(signed_msg) => (signed_msg.contents.node_id_1, signed_msg.contents.node_id_2), - ChannelAnnouncement::Unsigned(msg) => (msg.node_id_1, msg.node_id_2), - }; - match announcement { - ChannelAnnouncement::Full(signed_msg) => { - if graph.update_channel_from_announcement(&signed_msg, &Some(&resolver)).is_ok() { - res[res_idx] = Some(MessageSendEvent::BroadcastChannelAnnouncement { - msg: signed_msg, update_msg: None, - }); - res_idx += 1; - } - }, - ChannelAnnouncement::Unsigned(msg) => { - let _ = graph.update_channel_from_unsigned_announcement(&msg, &Some(&resolver)); - }, - } - - for announce in core::iter::once(node_a).chain(core::iter::once(node_b)) { - match announce { - Some(NodeAnnouncement::Full(signed_msg)) => { - if graph.update_node_from_announcement(&signed_msg).is_ok() { - res[res_idx] = Some(MessageSendEvent::BroadcastNodeAnnouncement { - msg: signed_msg, - }); - res_idx += 1; - } - }, - Some(NodeAnnouncement::Unsigned(msg)) => { - let _ = graph.update_node_from_unsigned_announcement(&msg); - }, - None => {}, - } - } - - for update in core::iter::once(update_a).chain(core::iter::once(update_b)) { - match update { - Some(ChannelUpdate::Full(signed_msg)) => { - if graph.update_channel(&signed_msg).is_ok() { - res[res_idx] = Some(MessageSendEvent::BroadcastChannelUpdate { - msg: signed_msg, - node_id_1, - node_id_2, - }); - res_idx += 1; - } - }, - Some(ChannelUpdate::Unsigned(msg)) => { - let _ = graph.update_channel_unsigned(&msg); - }, - None => {}, - } - } - - res + /// Resolves this future with the given result. + pub fn resolve(&self, result: Result) { + let mut state = self.state.lock().unwrap(); + state.complete = Some(result); + state.notifier.notify(); } } @@ -296,39 +170,21 @@ struct PendingChecksContext { nodes: HashMap>>>, } -impl PendingChecksContext { - #[rustfmt::skip] - fn lookup_completed(&mut self, - msg: &msgs::UnsignedChannelAnnouncement, completed_state: &Weak> - ) { - if let hash_map::Entry::Occupied(e) = self.channels.entry(msg.short_channel_id) { - if Weak::ptr_eq(e.get(), &completed_state) { - e.remove(); - } - } - - if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(msg.node_id_1) { - e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state)); - if e.get().is_empty() { e.remove(); } - } - if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(msg.node_id_2) { - e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state)); - if e.get().is_empty() { e.remove(); } - } - } -} - /// A set of messages which are pending UTXO lookups for processing. pub(super) struct PendingChecks { internal: Mutex, + pub(super) completion_notifier: Arc, } impl PendingChecks { - #[rustfmt::skip] pub(super) fn new() -> Self { - PendingChecks { internal: Mutex::new(PendingChecksContext { - channels: new_hash_map(), nodes: new_hash_map(), - }) } + PendingChecks { + internal: Mutex::new(PendingChecksContext { + channels: new_hash_map(), + nodes: new_hash_map(), + }), + completion_notifier: Arc::new(Notifier::new()), + } } /// Checks if there is a pending `channel_update` UTXO validation for the given channel, @@ -519,7 +375,8 @@ impl PendingChecks { Ok(None) }, &Some(ref utxo_lookup) => { - match utxo_lookup.get_utxo(&msg.chain_hash, msg.short_channel_id) { + let notifier = Arc::clone(&self.completion_notifier); + match utxo_lookup.get_utxo(&msg.chain_hash, msg.short_channel_id, notifier) { UtxoResult::Sync(res) => handle_result(res), UtxoResult::Async(future) => { let mut pending_checks = self.internal.lock().unwrap(); @@ -581,6 +438,142 @@ impl PendingChecks { false } } + + fn resolve_single_future( + &self, graph: &NetworkGraph, entry: Arc>, + new_messages: &mut Vec, + ) where + L::Target: Logger, + { + let (announcement, result, announce_a, announce_b, update_a, update_b); + { + let mut state = entry.lock().unwrap(); + announcement = if let Some(announcement) = state.channel_announce.take() { + announcement + } else { + // We raced returning to `check_channel_announcement` which hasn't updated + // `channel_announce` yet. That's okay, we can set the `complete` field which it will + // check once it gets control again. + return; + }; + + result = if let Some(result) = state.complete.take() { + result + } else { + debug_assert!(false, "Future should have been resolved"); + return; + }; + + announce_a = state.latest_node_announce_a.take(); + announce_b = state.latest_node_announce_b.take(); + update_a = state.latest_channel_update_a.take(); + update_b = state.latest_channel_update_b.take(); + } + + // Now that we've updated our internal state, pass the pending messages back through the + // network graph with a different `UtxoLookup` which will resolve immediately. + // Note that we ignore errors as we don't disconnect peers anyway, so there's nothing to do + // with them. + let resolver = UtxoResolver(result); + let (node_id_1, node_id_2) = match &announcement { + ChannelAnnouncement::Full(signed_msg) => { + (signed_msg.contents.node_id_1, signed_msg.contents.node_id_2) + }, + ChannelAnnouncement::Unsigned(msg) => (msg.node_id_1, msg.node_id_2), + }; + match announcement { + ChannelAnnouncement::Full(signed_msg) => { + if graph.update_channel_from_announcement(&signed_msg, &Some(&resolver)).is_ok() { + new_messages.push(MessageSendEvent::BroadcastChannelAnnouncement { + msg: signed_msg, + update_msg: None, + }); + } + }, + ChannelAnnouncement::Unsigned(msg) => { + let _ = graph.update_channel_from_unsigned_announcement(&msg, &Some(&resolver)); + }, + } + + for announce in [announce_a, announce_b] { + match announce { + Some(NodeAnnouncement::Full(signed_msg)) => { + if graph.update_node_from_announcement(&signed_msg).is_ok() { + new_messages + .push(MessageSendEvent::BroadcastNodeAnnouncement { msg: signed_msg }); + } + }, + Some(NodeAnnouncement::Unsigned(msg)) => { + let _ = graph.update_node_from_unsigned_announcement(&msg); + }, + None => {}, + } + } + + for update in [update_a, update_b] { + match update { + Some(ChannelUpdate::Full(signed_msg)) => { + if graph.update_channel(&signed_msg).is_ok() { + new_messages.push(MessageSendEvent::BroadcastChannelUpdate { + msg: signed_msg, + node_id_1, + node_id_2, + }); + } + }, + Some(ChannelUpdate::Unsigned(msg)) => { + let _ = graph.update_channel_unsigned(&msg); + }, + None => {}, + } + } + } + + pub(super) fn check_resolved_futures( + &self, graph: &NetworkGraph, + ) -> Vec + where + L::Target: Logger, + { + let mut completed_states = Vec::new(); + { + let mut lck = self.internal.lock().unwrap(); + lck.channels.retain(|_, state| { + if let Some(state) = state.upgrade() { + if state.lock().unwrap().complete.is_some() { + completed_states.push(state); + false + } else { + true + } + } else { + // The UtxoFuture has been dropped, drop the pending-lookup state. + false + } + }); + lck.nodes.retain(|_, lookups| { + lookups.retain(|state| { + if let Some(state) = state.upgrade() { + if state.lock().unwrap().complete.is_some() { + completed_states.push(state); + false + } else { + true + } + } else { + // The UtxoFuture has been dropped, drop the pending-lookup state. + false + } + }); + !lookups.is_empty() + }); + } + let mut res = Vec::with_capacity(completed_states.len() * 5); + for state in completed_states { + self.resolve_single_future(graph, state, &mut res); + } + res + } } #[cfg(test)] @@ -636,9 +629,12 @@ mod tests { // `get_utxo` call can read it still resolve properly. let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects(); - let future = UtxoFuture::new(); - future.resolve_without_forwarding(&network_graph, - Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script })); + let notifier = Arc::new(Notifier::new()); + let future = UtxoFuture::new(Arc::clone(¬ifier)); + future + .resolve(Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script })); + assert!(notifier.notify_pending()); + network_graph.pending_checks.check_resolved_futures(&network_graph); *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap(); @@ -652,7 +648,8 @@ mod tests { let (valid_announcement, chain_source, network_graph, good_script, node_a_announce, node_b_announce, ..) = get_test_objects(); - let future = UtxoFuture::new(); + let notifier = Arc::new(Notifier::new()); + let future = UtxoFuture::new(Arc::clone(¬ifier)); *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); assert_eq!( @@ -660,8 +657,9 @@ mod tests { "Channel being checked async"); assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); - future.resolve_without_forwarding(&network_graph, - Ok(TxOut { value: Amount::ZERO, script_pubkey: good_script })); + future.resolve(Ok(TxOut { value: Amount::ZERO, script_pubkey: good_script })); + assert!(notifier.notify_pending()); + network_graph.pending_checks.check_resolved_futures(&network_graph); network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap(); network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap(); @@ -681,7 +679,8 @@ mod tests { // Test an async lookup which returns an incorrect script let (valid_announcement, chain_source, network_graph, ..) = get_test_objects(); - let future = UtxoFuture::new(); + let notifier = Arc::new(Notifier::new()); + let future = UtxoFuture::new(Arc::clone(¬ifier)); *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); assert_eq!( @@ -689,8 +688,10 @@ mod tests { "Channel being checked async"); assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); - future.resolve_without_forwarding(&network_graph, - Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: bitcoin::ScriptBuf::new() })); + let value = Amount::from_sat(1_000_000); + future.resolve(Ok(TxOut { value, script_pubkey: bitcoin::ScriptBuf::new() })); + assert!(notifier.notify_pending()); + network_graph.pending_checks.check_resolved_futures(&network_graph); assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); } @@ -700,7 +701,8 @@ mod tests { // Test an async lookup which returns an error let (valid_announcement, chain_source, network_graph, ..) = get_test_objects(); - let future = UtxoFuture::new(); + let notifier = Arc::new(Notifier::new()); + let future = UtxoFuture::new(Arc::clone(¬ifier)); *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); assert_eq!( @@ -708,7 +710,9 @@ mod tests { "Channel being checked async"); assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); - future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx)); + future.resolve(Err(UtxoLookupError::UnknownTx)); + assert!(notifier.notify_pending()); + network_graph.pending_checks.check_resolved_futures(&network_graph); assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); } @@ -720,7 +724,8 @@ mod tests { let (valid_announcement, chain_source, network_graph, good_script, node_a_announce, node_b_announce, chan_update_a, chan_update_b, ..) = get_test_objects(); - let future = UtxoFuture::new(); + let notifier = Arc::new(Notifier::new()); + let future = UtxoFuture::new(Arc::clone(¬ifier)); *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); assert_eq!( @@ -740,8 +745,11 @@ mod tests { assert_eq!(network_graph.update_channel(&chan_update_b).unwrap_err().err, "Awaiting channel_announcement validation to accept channel_update"); - future.resolve_without_forwarding(&network_graph, - Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script })); + assert!(!notifier.notify_pending()); + future + .resolve(Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script })); + assert!(notifier.notify_pending()); + network_graph.pending_checks.check_resolved_futures(&network_graph); assert!(network_graph.read_only().channels() .get(&valid_announcement.contents.short_channel_id).unwrap().one_to_two.is_some()); @@ -762,7 +770,8 @@ mod tests { let (valid_announcement, chain_source, network_graph, good_script, _, _, chan_update_a, chan_update_b, chan_update_c, ..) = get_test_objects(); - let future = UtxoFuture::new(); + let notifier = Arc::new(Notifier::new()); + let future = UtxoFuture::new(Arc::clone(¬ifier)); *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); assert_eq!( @@ -777,8 +786,10 @@ mod tests { assert_eq!(network_graph.update_channel(&chan_update_c).unwrap_err().err, "Awaiting channel_announcement validation to accept channel_update"); - future.resolve_without_forwarding(&network_graph, - Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script })); + assert!(!notifier.notify_pending()); + future.resolve(Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script })); + assert!(notifier.notify_pending()); + network_graph.pending_checks.check_resolved_futures(&network_graph); assert_eq!(chan_update_a.contents.timestamp, chan_update_b.contents.timestamp); let graph_lock = network_graph.read_only(); @@ -797,7 +808,8 @@ mod tests { // only if the channel_announcement message is identical. let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects(); - let future = UtxoFuture::new(); + let notifier_a = Arc::new(Notifier::new()); + let future = UtxoFuture::new(Arc::clone(¬ifier_a)); *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); assert_eq!( @@ -806,7 +818,8 @@ mod tests { assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 1); // If we make a second request with the same message, the call count doesn't increase... - let future_b = UtxoFuture::new(); + let notifier_b = Arc::new(Notifier::new()); + let future_b = UtxoFuture::new(Arc::clone(¬ifier_b)); *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future_b.clone()); assert_eq!( network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, @@ -825,8 +838,11 @@ mod tests { assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 2); // Still, if we resolve the original future, the original channel will be accepted. - future.resolve_without_forwarding(&network_graph, - Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script })); + future + .resolve(Ok(TxOut { value: Amount::from_sat(1_000_000), script_pubkey: good_script })); + assert!(notifier_a.notify_pending()); + assert!(!notifier_b.notify_pending()); + network_graph.pending_checks.check_resolved_futures(&network_graph); assert!(!network_graph.read_only().channels() .get(&valid_announcement.contents.short_channel_id).unwrap() .announcement_message.as_ref().unwrap() @@ -842,7 +858,8 @@ mod tests { let (chain_source, network_graph) = get_network(); // We cheat and use a single future for all the lookups to complete them all at once. - let future = UtxoFuture::new(); + let notifier = Arc::new(Notifier::new()); + let future = UtxoFuture::new(Arc::clone(¬ifier)); *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap(); @@ -861,7 +878,9 @@ mod tests { assert!(network_graph.pending_checks.too_many_checks_pending()); // Once the future completes the "too many checks" flag should reset. - future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx)); + future.resolve(Err(UtxoLookupError::UnknownTx)); + assert!(notifier.notify_pending()); + network_graph.pending_checks.check_resolved_futures(&network_graph); assert!(!network_graph.pending_checks.too_many_checks_pending()); } @@ -874,7 +893,8 @@ mod tests { let (chain_source, network_graph) = get_network(); // We cheat and use a single future for all the lookups to complete them all at once. - *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(UtxoFuture::new()); + let notifier = Arc::new(Notifier::new()); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(UtxoFuture::new(notifier)); let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap(); let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap(); diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 50514e0a894..34f5d5fe36e 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -61,6 +61,7 @@ use crate::util::mut_global::MutGlobal; use crate::util::persist::{KVStore, KVStoreSync, MonitorName}; use crate::util::ser::{Readable, ReadableArgs, Writeable, Writer}; use crate::util::test_channel_signer::{EnforcementState, TestChannelSigner}; +use crate::util::wakers::Notifier; use bitcoin::amount::Amount; use bitcoin::block::Block; @@ -2101,7 +2102,7 @@ impl TestChainSource { } impl UtxoLookup for TestChainSource { - fn get_utxo(&self, chain_hash: &ChainHash, _short_channel_id: u64) -> UtxoResult { + fn get_utxo(&self, chain_hash: &ChainHash, _scid: u64, _notifier: Arc) -> UtxoResult { self.get_utxo_call_count.fetch_add(1, Ordering::Relaxed); if self.chain_hash != *chain_hash { return UtxoResult::Sync(Err(UtxoLookupError::UnknownChain)); diff --git a/lightning/src/util/wakers.rs b/lightning/src/util/wakers.rs index a84d90960d8..17edadfd822 100644 --- a/lightning/src/util/wakers.rs +++ b/lightning/src/util/wakers.rs @@ -253,37 +253,13 @@ impl Sleeper { pub fn from_single_future(future: &Future) -> Self { Self { notifiers: vec![Arc::clone(&future.state)] } } - /// Constructs a new sleeper from two futures, allowing blocking on both at once. - pub fn from_two_futures(fut_a: &Future, fut_b: &Future) -> Self { - Self { notifiers: vec![Arc::clone(&fut_a.state), Arc::clone(&fut_b.state)] } - } - /// Constructs a new sleeper from three futures, allowing blocking on all three at once. - /// - // Note that this is the common case - a ChannelManager, a ChainMonitor, and an - // OnionMessenger. - pub fn from_three_futures(fut_a: &Future, fut_b: &Future, fut_c: &Future) -> Self { - let notifiers = - vec![Arc::clone(&fut_a.state), Arc::clone(&fut_b.state), Arc::clone(&fut_c.state)]; - Self { notifiers } - } - /// Constructs a new sleeper from four futures, allowing blocking on all four at once. - /// - // Note that this is another common case - a ChannelManager, a ChainMonitor, an - // OnionMessenger, and a LiquidityManager. - pub fn from_four_futures( - fut_a: &Future, fut_b: &Future, fut_c: &Future, fut_d: &Future, - ) -> Self { - let notifiers = vec![ - Arc::clone(&fut_a.state), - Arc::clone(&fut_b.state), - Arc::clone(&fut_c.state), - Arc::clone(&fut_d.state), - ]; - Self { notifiers } + /// Constructs an iterator of futures, allowing blocking on all at once. + pub fn from_futures>(futures: I) -> Self { + Self { notifiers: futures.into_iter().map(|f| Arc::clone(&f.state)).collect() } } /// Constructs a new sleeper on many futures, allowing blocking on all at once. pub fn new(futures: Vec) -> Self { - Self { notifiers: futures.into_iter().map(|f| Arc::clone(&f.state)).collect() } + Self::from_futures(futures) } /// Prepares to go into a wait loop body, creating a condition variable which we can block on /// and an `Arc>>` which gets set to the waking `Future`'s state prior to the @@ -506,15 +482,13 @@ mod tests { // Wait on the other thread to finish its sleep, note that the leak only happened if we // actually have to sleep here, not if we immediately return. - Sleeper::from_two_futures(&future_a, &future_b).wait(); + Sleeper::from_futures([future_a, future_b]).wait(); join_handle.join().unwrap(); // then drop the notifiers and make sure the future states are gone. mem::drop(notifier_a); mem::drop(notifier_b); - mem::drop(future_a); - mem::drop(future_b); assert!(future_state_a.upgrade().is_none() && future_state_b.upgrade().is_none()); } @@ -736,18 +710,18 @@ mod tests { // Set both notifiers as woken without sleeping yet. notifier_a.notify(); notifier_b.notify(); - Sleeper::from_two_futures(¬ifier_a.get_future(), ¬ifier_b.get_future()).wait(); + Sleeper::from_futures([notifier_a.get_future(), notifier_b.get_future()]).wait(); // One future has woken us up, but the other should still have a pending notification. - Sleeper::from_two_futures(¬ifier_a.get_future(), ¬ifier_b.get_future()).wait(); + Sleeper::from_futures([notifier_a.get_future(), notifier_b.get_future()]).wait(); // However once we've slept twice, we should no longer have any pending notifications - assert!(!Sleeper::from_two_futures(¬ifier_a.get_future(), ¬ifier_b.get_future()) + assert!(!Sleeper::from_futures([notifier_a.get_future(), notifier_b.get_future()]) .wait_timeout(Duration::from_millis(10))); // Test ordering somewhat more. notifier_a.notify(); - Sleeper::from_two_futures(¬ifier_a.get_future(), ¬ifier_b.get_future()).wait(); + Sleeper::from_futures([notifier_a.get_future(), notifier_b.get_future()]).wait(); } #[test] @@ -765,7 +739,7 @@ mod tests { // After sleeping one future (not guaranteed which one, however) will have its notification // bit cleared. - Sleeper::from_two_futures(¬ifier_a.get_future(), ¬ifier_b.get_future()).wait(); + Sleeper::from_futures([notifier_a.get_future(), notifier_b.get_future()]).wait(); // By registering a callback on the futures for both notifiers, one will complete // immediately, but one will remain tied to the notifier, and will complete once the @@ -788,8 +762,8 @@ mod tests { notifier_b.notify(); assert!(callback_a.load(Ordering::SeqCst) && callback_b.load(Ordering::SeqCst)); - Sleeper::from_two_futures(¬ifier_a.get_future(), ¬ifier_b.get_future()).wait(); - assert!(!Sleeper::from_two_futures(¬ifier_a.get_future(), ¬ifier_b.get_future()) + Sleeper::from_futures([notifier_a.get_future(), notifier_b.get_future()]).wait(); + assert!(!Sleeper::from_futures([notifier_a.get_future(), notifier_b.get_future()]) .wait_timeout(Duration::from_millis(10))); }