Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 11 additions & 13 deletions fuzz/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use lightning::types::features::{BlindedHopFeatures, Bolt12InvoiceFeatures};
use lightning::util::config::UserConfig;
use lightning::util::hash_tables::*;
use lightning::util::ser::LengthReadable;
use lightning::util::wakers::Notifier;

use bitcoin::hashes::Hash;
use bitcoin::network::Network;
Expand Down Expand Up @@ -88,12 +89,11 @@ impl InputData {
}
}

struct FuzzChainSource<'a, 'b, Out: test_logger::Output> {
struct FuzzChainSource {
input: Arc<InputData>,
net_graph: &'a NetworkGraph<&'b test_logger::TestLogger<Out>>,
}
impl<Out: test_logger::Output> UtxoLookup for FuzzChainSource<'_, '_, Out> {
fn get_utxo(&self, _chain_hash: &ChainHash, _short_channel_id: u64) -> UtxoResult {
impl UtxoLookup for FuzzChainSource {
fn get_utxo(&self, _chain_hash: &ChainHash, _scid: u64, notifier: Arc<Notifier>) -> UtxoResult {
let input_slice = self.input.get_slice(2);
if input_slice.is_none() {
return UtxoResult::Sync(Err(UtxoLookupError::UnknownTx));
Expand All @@ -107,17 +107,17 @@ impl<Out: test_logger::Output> UtxoLookup for FuzzChainSource<'_, '_, Out> {
&[0, _] => UtxoResult::Sync(Err(UtxoLookupError::UnknownChain)),
&[1, _] => UtxoResult::Sync(Err(UtxoLookupError::UnknownTx)),
&[2, _] => {
let future = UtxoFuture::new();
future.resolve_without_forwarding(self.net_graph, Ok(txo_res));
let future = UtxoFuture::new(notifier);
future.resolve(Ok(txo_res));
UtxoResult::Async(future.clone())
},
&[3, _] => {
let future = UtxoFuture::new();
future.resolve_without_forwarding(self.net_graph, Err(UtxoLookupError::UnknownTx));
let future = UtxoFuture::new(notifier);
future.resolve(Err(UtxoLookupError::UnknownTx));
UtxoResult::Async(future.clone())
},
&[4, _] => {
UtxoResult::Async(UtxoFuture::new()) // the future will never resolve
UtxoResult::Async(UtxoFuture::new(notifier)) // the future will never resolve
},
&[..] => UtxoResult::Sync(Ok(txo_res)),
}
Expand Down Expand Up @@ -197,7 +197,7 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {

let our_pubkey = get_pubkey!();
let net_graph = NetworkGraph::new(Network::Bitcoin, &logger);
let chain_source = FuzzChainSource { input: Arc::clone(&input), net_graph: &net_graph };
let chain_source = FuzzChainSource { input: Arc::clone(&input) };

let mut node_pks = new_hash_map();
let mut scid = 42;
Expand Down Expand Up @@ -335,9 +335,7 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
node_pks.insert(get_pubkey_from_node_id!(msg.node_id_1), ());
node_pks.insert(get_pubkey_from_node_id!(msg.node_id_2), ());
let _ = net_graph
.update_channel_from_unsigned_announcement::<&FuzzChainSource<'_, '_, Out>>(
&msg, &None,
);
.update_channel_from_unsigned_announcement::<&FuzzChainSource>(&msg, &None);
},
2 => {
let msg =
Expand Down
87 changes: 51 additions & 36 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ use lightning::util::persist::{
SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
};
use lightning::util::sweep::{OutputSweeper, OutputSweeperSync};
use lightning::util::wakers::Future;
#[cfg(feature = "std")]
use lightning::util::wakers::Sleeper;
use lightning_rapid_gossip_sync::RapidGossipSync;
Expand Down Expand Up @@ -235,6 +236,14 @@ where
GossipSync::None => None,
}
}

fn validation_completion_future(&self) -> Option<Future> {
match self {
GossipSync::P2P(gossip_sync) => Some(gossip_sync.validation_completion_future()),
GossipSync::Rapid(_) => None,
GossipSync::None => None,
}
}
}

/// This is not exported to bindings users as the bindings concretize everything and have constructors for us
Expand Down Expand Up @@ -520,12 +529,14 @@ pub(crate) mod futures_util {
C: Future<Output = ()> + Unpin,
D: Future<Output = ()> + Unpin,
E: Future<Output = ()> + Unpin,
F: Future<Output = ()> + Unpin,
> {
pub a: A,
pub b: B,
pub c: C,
pub d: D,
pub e: E,
pub f: F,
}

pub(crate) enum SelectorOutput {
Expand All @@ -534,6 +545,7 @@ pub(crate) mod futures_util {
C,
D,
E,
F,
}

impl<
Expand All @@ -542,7 +554,8 @@ pub(crate) mod futures_util {
C: Future<Output = ()> + Unpin,
D: Future<Output = ()> + Unpin,
E: Future<Output = ()> + Unpin,
> Future for Selector<A, B, C, D, E>
F: Future<Output = ()> + Unpin,
> Future for Selector<A, B, C, D, E, F>
{
type Output = SelectorOutput;
fn poll(
Expand Down Expand Up @@ -580,6 +593,12 @@ pub(crate) mod futures_util {
},
Poll::Pending => {},
}
match Pin::new(&mut self.f).poll(ctx) {
Poll::Ready(()) => {
return Poll::Ready(SelectorOutput::F);
},
Poll::Pending => {},
}
Poll::Pending
}
}
Expand All @@ -606,6 +625,12 @@ pub(crate) mod futures_util {
}
}

impl<F: Future<Output = ()> + Unpin> From<Option<F>> for OptionalSelector<F> {
fn from(optional_future: Option<F>) -> Self {
Self { optional_future }
}
}

// If we want to poll a future without an async context to figure out if it has completed or
// not without awaiting, we need a Waker, which needs a vtable...we fill it with dummy values
// but sadly there's a good bit of boilerplate here.
Expand Down Expand Up @@ -1058,18 +1083,13 @@ where
if mobile_interruptable_platform {
await_start = Some(sleeper(Duration::from_secs(1)));
}
let om_fut = if let Some(om) = onion_messenger.as_ref() {
let fut = om.get_om().get_update_future();
OptionalSelector { optional_future: Some(fut) }
} else {
OptionalSelector { optional_future: None }
};
let lm_fut = if let Some(lm) = liquidity_manager.as_ref() {
let fut = lm.get_lm().get_pending_msgs_or_needs_persist_future();
OptionalSelector { optional_future: Some(fut) }
} else {
OptionalSelector { optional_future: None }
};
let om_fut: OptionalSelector<_> =
onion_messenger.as_ref().map(|om| om.get_om().get_update_future()).into();
let lm_fut: OptionalSelector<_> = liquidity_manager
.as_ref()
.map(|lm| lm.get_lm().get_pending_msgs_or_needs_persist_future())
.into();
let gv_fut: OptionalSelector<_> = gossip_sync.validation_completion_future().into();
let needs_processing = channel_manager.get_cm().needs_pending_htlc_processing();
let sleep_delay = match (needs_processing, mobile_interruptable_platform) {
(true, true) => batch_delay.get().min(Duration::from_millis(100)),
Expand All @@ -1083,9 +1103,14 @@ where
c: chain_monitor.get_update_future(),
d: om_fut,
e: lm_fut,
f: gv_fut,
};
match fut.await {
SelectorOutput::B | SelectorOutput::C | SelectorOutput::D | SelectorOutput::E => {},
SelectorOutput::B
| SelectorOutput::C
| SelectorOutput::D
| SelectorOutput::E
| SelectorOutput::F => {},
SelectorOutput::A(exit) => {
if exit {
break;
Expand Down Expand Up @@ -1635,28 +1660,18 @@ impl BackgroundProcessor {
log_trace!(logger, "Terminating background processor.");
break;
}
let sleeper = match (onion_messenger.as_ref(), liquidity_manager.as_ref()) {
(Some(om), Some(lm)) => Sleeper::from_four_futures(
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
&chain_monitor.get_update_future(),
&om.get_om().get_update_future(),
&lm.get_lm().get_pending_msgs_or_needs_persist_future(),
),
(Some(om), None) => Sleeper::from_three_futures(
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
&chain_monitor.get_update_future(),
&om.get_om().get_update_future(),
),
(None, Some(lm)) => Sleeper::from_three_futures(
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
&chain_monitor.get_update_future(),
&lm.get_lm().get_pending_msgs_or_needs_persist_future(),
),
(None, None) => Sleeper::from_two_futures(
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
&chain_monitor.get_update_future(),
),
};
let om_fut = onion_messenger.as_ref().map(|om| om.get_om().get_update_future());
let lm_fut = liquidity_manager
.as_ref()
.map(|lm| lm.get_lm().get_pending_msgs_or_needs_persist_future());
let gv_fut = gossip_sync.validation_completion_future();
let always_futures = [
channel_manager.get_cm().get_event_or_persistence_needed_future(),
chain_monitor.get_update_future(),
];
let futures = always_futures.into_iter().chain(om_fut).chain(lm_fut).chain(gv_fut);
let sleeper = Sleeper::from_futures(futures);

let batch_delay = if channel_manager.get_cm().needs_pending_htlc_processing() {
batch_delay.get()
} else {
Expand Down
53 changes: 13 additions & 40 deletions lightning-block-sync/src/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@ use bitcoin::constants::ChainHash;
use bitcoin::hash_types::BlockHash;
use bitcoin::transaction::{OutPoint, TxOut};

use lightning::ln::peer_handler::APeerManager;
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoLookupError, UtxoResult};
use lightning::util::logger::Logger;
use lightning::util::native_async::FutureSpawner;
use lightning::util::wakers::Notifier;

use std::collections::VecDeque;
use std::future::Future;
Expand Down Expand Up @@ -127,46 +125,28 @@ impl<
/// value of 1024 should more than suffice), and ensure you have sufficient file descriptors
/// available on both Bitcoin Core and your LDK application for each request to hold its own
/// connection.
pub struct GossipVerifier<
S: FutureSpawner,
Blocks: Deref + Send + Sync + 'static + Clone,
L: Deref + Send + Sync + 'static,
> where
pub struct GossipVerifier<S: FutureSpawner, Blocks: Deref + Send + Sync + 'static + Clone>
where
Blocks::Target: UtxoSource,
L::Target: Logger,
{
source: Blocks,
peer_manager_wake: Arc<dyn Fn() + Send + Sync>,
gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<L>>, Arc<Self>, L>>,
spawn: S,
block_cache: Arc<Mutex<VecDeque<(u32, Block)>>>,
}

const BLOCK_CACHE_SIZE: usize = 5;

impl<S: FutureSpawner, Blocks: Deref + Send + Sync + Clone, L: Deref + Send + Sync>
GossipVerifier<S, Blocks, L>
impl<S: FutureSpawner, Blocks: Deref + Send + Sync + Clone> GossipVerifier<S, Blocks>
where
Blocks::Target: UtxoSource,
L::Target: Logger,
{
/// Constructs a new [`GossipVerifier`].
/// Constructs a new [`GossipVerifier`] for use in a [`P2PGossipSync`].
///
/// This is expected to be given to a [`P2PGossipSync`] (initially constructed with `None` for
/// the UTXO lookup) via [`P2PGossipSync::add_utxo_lookup`].
pub fn new<APM: Deref + Send + Sync + Clone + 'static>(
source: Blocks, spawn: S, gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<L>>, Arc<Self>, L>>,
peer_manager: APM,
) -> Self
where
APM::Target: APeerManager,
{
let peer_manager_wake = Arc::new(move || peer_manager.as_ref().process_events());
/// [`P2PGossipSync`]: lightning::routing::gossip::P2PGossipSync
pub fn new(source: Blocks, spawn: S) -> Self {
Self {
source,
spawn,
gossiper,
peer_manager_wake,
block_cache: Arc::new(Mutex::new(VecDeque::with_capacity(BLOCK_CACHE_SIZE))),
}
}
Expand Down Expand Up @@ -255,35 +235,28 @@ where
}
}

impl<S: FutureSpawner, Blocks: Deref + Send + Sync + Clone, L: Deref + Send + Sync> Deref
for GossipVerifier<S, Blocks, L>
impl<S: FutureSpawner, Blocks: Deref + Send + Sync + Clone> Deref for GossipVerifier<S, Blocks>
where
Blocks::Target: UtxoSource,
L::Target: Logger,
{
type Target = Self;
fn deref(&self) -> &Self {
self
}
}

impl<S: FutureSpawner, Blocks: Deref + Send + Sync + Clone, L: Deref + Send + Sync> UtxoLookup
for GossipVerifier<S, Blocks, L>
impl<S: FutureSpawner, Blocks: Deref + Send + Sync + Clone> UtxoLookup for GossipVerifier<S, Blocks>
where
Blocks::Target: UtxoSource,
L::Target: Logger,
{
fn get_utxo(&self, _chain_hash: &ChainHash, short_channel_id: u64) -> UtxoResult {
let res = UtxoFuture::new();
fn get_utxo(&self, _chain_hash: &ChainHash, scid: u64, notifier: Arc<Notifier>) -> UtxoResult {
let res = UtxoFuture::new(notifier);
let fut = res.clone();
let source = self.source.clone();
let gossiper = Arc::clone(&self.gossiper);
let block_cache = Arc::clone(&self.block_cache);
let pmw = Arc::clone(&self.peer_manager_wake);
self.spawn.spawn(async move {
let res = Self::retrieve_utxo(source, block_cache, short_channel_id).await;
fut.resolve(gossiper.network_graph(), &*gossiper, res);
(pmw)();
let res = Self::retrieve_utxo(source, block_cache, scid).await;
fut.resolve(res);
});
UtxoResult::Async(res)
}
Expand Down
Loading
Loading