Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions bindings/ldk_node.udl
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ typedef dictionary TorConfig;

typedef interface NodeEntropy;

typedef interface ProbingConfig;

typedef interface ProbingConfigBuilder;

typedef enum WordCount;

[Remote]
Expand Down Expand Up @@ -61,6 +65,7 @@ interface Builder {
[Throws=BuildError]
void set_async_payments_role(AsyncPaymentsRole? role);
void set_wallet_recovery_mode();
void set_probing_config(ProbingConfig config);
[Throws=BuildError]
Node build(NodeEntropy node_entropy);
[Throws=BuildError]
Expand Down
89 changes: 85 additions & 4 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::convert::TryInto;
use std::default::Default;
use std::net::ToSocketAddrs;
use std::path::PathBuf;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex, Once, RwLock};
use std::time::SystemTime;
use std::{fmt, fs};
Expand Down Expand Up @@ -51,6 +52,7 @@ use crate::config::{
default_user_config, may_announce_channel, AnnounceError, AsyncPaymentsRole,
BitcoindRestClientConfig, Config, ElectrumSyncConfig, EsploraSyncConfig, HRNResolverConfig,
TorConfig, DEFAULT_ESPLORA_SERVER_URL, DEFAULT_LOG_FILENAME, DEFAULT_LOG_LEVEL,
DEFAULT_MAX_PROBE_AMOUNT_MSAT, DEFAULT_MIN_PROBE_AMOUNT_MSAT,
};
use crate::connection::ConnectionManager;
use crate::entropy::NodeEntropy;
Expand All @@ -77,6 +79,9 @@ use crate::logger::{log_error, LdkLogger, LogLevel, LogWriter, Logger};
use crate::message_handler::NodeCustomMessageHandler;
use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox;
use crate::peer_store::PeerStore;
use crate::probing::{
HighDegreeStrategy, Prober, ProbingConfig, ProbingStrategy, ProbingStrategyKind, RandomStrategy,
};
use crate::runtime::{Runtime, RuntimeSpawner};
use crate::tx_broadcaster::TransactionBroadcaster;
use crate::types::{
Expand Down Expand Up @@ -293,6 +298,7 @@ pub struct NodeBuilder {
runtime_handle: Option<tokio::runtime::Handle>,
pathfinding_scores_sync_config: Option<PathfindingScoresSyncConfig>,
recovery_mode: bool,
probing_config: Option<ProbingConfig>,
}

impl NodeBuilder {
Expand All @@ -311,16 +317,19 @@ impl NodeBuilder {
let runtime_handle = None;
let pathfinding_scores_sync_config = None;
let recovery_mode = false;
let async_payments_role = None;
let probing_config = None;
Self {
config,
chain_data_source_config,
gossip_source_config,
liquidity_source_config,
log_writer_config,
runtime_handle,
async_payments_role: None,
async_payments_role,
pathfinding_scores_sync_config,
recovery_mode,
probing_config,
}
}

Expand Down Expand Up @@ -626,6 +635,31 @@ impl NodeBuilder {
self
}

/// Configures background probing.
///
/// Use [`ProbingConfigBuilder`] to build the configuration:
Comment thread
randomlogin marked this conversation as resolved.
/// ```no_run
/// # #[cfg(not(feature = "uniffi"))]
/// # {
/// use std::time::Duration;
/// use ldk_node::Builder;
/// use ldk_node::probing::ProbingConfigBuilder;
///
/// let mut builder = Builder::new();
/// builder.set_probing_config(
/// ProbingConfigBuilder::high_degree(100)
/// .interval(Duration::from_secs(30))
/// .build()
/// );
/// # }
/// ```
///
/// [`ProbingConfigBuilder`]: crate::probing::ProbingConfigBuilder
pub fn set_probing_config(&mut self, config: ProbingConfig) -> &mut Self {
	// Remember the probing configuration; it is consumed later by `build`.
	self.probing_config.replace(config);
	self
}

/// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options
/// previously configured.
pub fn build(&self, node_entropy: NodeEntropy) -> Result<Node, BuildError> {
Expand Down Expand Up @@ -797,6 +831,7 @@ impl NodeBuilder {
self.gossip_source_config.as_ref(),
self.liquidity_source_config.as_ref(),
self.pathfinding_scores_sync_config.as_ref(),
self.probing_config.as_ref(),
self.async_payments_role,
self.recovery_mode,
seed_bytes,
Expand Down Expand Up @@ -1097,6 +1132,13 @@ impl ArcedNodeBuilder {
self.inner.write().expect("lock").set_wallet_recovery_mode();
}

/// Configures background probing.
///
/// See [`ProbingConfig`] for details.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make sure the docs on the ArcedNodeBuilder mirror the ones on the regular NodeBuilder.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it does so already. The builder methods do have (short) docs on them for both normal and arced builders.

pub fn set_probing_config(&self, config: Arc<ProbingConfig>) {
self.inner.write().expect("lock").set_probing_config((*config).clone());
}

/// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options
/// previously configured.
pub fn build(&self, node_entropy: Arc<NodeEntropy>) -> Result<Arc<Node>, BuildError> {
Expand Down Expand Up @@ -1240,8 +1282,9 @@ fn build_with_store_internal(
gossip_source_config: Option<&GossipSourceConfig>,
liquidity_source_config: Option<&LiquiditySourceConfig>,
pathfinding_scores_sync_config: Option<&PathfindingScoresSyncConfig>,
async_payments_role: Option<AsyncPaymentsRole>, recovery_mode: bool, seed_bytes: [u8; 64],
runtime: Arc<Runtime>, logger: Arc<Logger>, kv_store: Arc<DynStore>,
probing_config: Option<&ProbingConfig>, async_payments_role: Option<AsyncPaymentsRole>,
recovery_mode: bool, seed_bytes: [u8; 64], runtime: Arc<Runtime>, logger: Arc<Logger>,
kv_store: Arc<DynStore>,
) -> Result<Node, BuildError> {
optionally_install_rustls_cryptoprovider();

Expand Down Expand Up @@ -1639,7 +1682,10 @@ fn build_with_store_internal(
},
}

let scoring_fee_params = ProbabilisticScoringFeeParameters::default();
let mut scoring_fee_params = ProbabilisticScoringFeeParameters::default();
Comment thread
randomlogin marked this conversation as resolved.
if let Some(penalty) = probing_config.and_then(|c| c.diversity_penalty_msat) {
scoring_fee_params.probing_diversity_penalty_msat = penalty;
}
let router = Arc::new(DefaultRouter::new(
Arc::clone(&network_graph),
Arc::clone(&logger),
Expand Down Expand Up @@ -2019,6 +2065,40 @@ fn build_with_store_internal(
_leak_checker.0.push(Arc::downgrade(&wallet) as Weak<dyn Any + Send + Sync>);
}

let prober = probing_config.map(|probing_cfg| {
let strategy: Arc<dyn ProbingStrategy> = match &probing_cfg.kind {
ProbingStrategyKind::HighDegree { top_node_count } => {
Arc::new(HighDegreeStrategy::new(
Arc::clone(&network_graph),
Arc::clone(&channel_manager),
Arc::clone(&router),
*top_node_count,
DEFAULT_MIN_PROBE_AMOUNT_MSAT,
DEFAULT_MAX_PROBE_AMOUNT_MSAT,
probing_cfg.cooldown,
config.probing_liquidity_limit_multiplier,
))
},
ProbingStrategyKind::Random { max_hops } => Arc::new(RandomStrategy::new(
Arc::clone(&network_graph),
Arc::clone(&channel_manager),
*max_hops,
DEFAULT_MIN_PROBE_AMOUNT_MSAT,
DEFAULT_MAX_PROBE_AMOUNT_MSAT,
)),
ProbingStrategyKind::Custom(s) => Arc::clone(s),
};
Arc::new(Prober {
channel_manager: Arc::clone(&channel_manager),
logger: Arc::clone(&logger),
strategy,
interval: probing_cfg.interval,
max_locked_msat: probing_cfg.max_locked_msat,
locked_msat: Arc::new(AtomicU64::new(0)),
inflight_probes: Mutex::new(HashMap::new()),
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, it seems accounting will still be off if we send out probes and then restart, as we'll re-init with an empty map that forgot about the previously-sent probes. Do we think that's acceptable or do we need to somehow persist this map? Seems like persistence would add some considerable complication on top? Thoughts?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially I wanted probing to be without persistence for the sake of simplicity, but now it seems it is really important, because if the node really goes out of the budget then likely this will occur after a restart and eventually it will drain the whole balance.

The corner case is the situation when a probing node:

  1. has a lot of unresolved probes (or small probing budget)
  2. restarts frequently

In that case the budget would be falsely shown as near-zero, whereas in fact there are some probes that were sent and are still unresolved.

If we restart the node more rarely than the time needed for locked in-flight probes to release, we're more or less fine. So if a node restarts more rarely than once in two hours it should be okay (it would not drain the balance completely, but there could be time periods where the effective probing budget is around twice as big as it should be).

There is an option to add to the public API this: https://github.com/lightningdevkit/rust-lightning/blob/03ab73da95af34f3ae3d28d7e83383cfa0b04fc1/lightning/src/ln/channelmanager.rs#L6143
Then we could iterate through recent payments on restart and populate the inflight_probes with max_locked_msat without persisting anything new.

Also, some calculations for when the budget limit is hit: assume probing is done every 500ms, or equivalently it has a frequency of $\nu = 2\,s^{-1}$ probes per second.
Let's assume the probability of a probe to get locked for $T=2$ hours is $p$.
Let's assume node has $PB$ probing budget, for ordinary user-node it is $PB=10A$ (max 10 probes can be fired simultaneously), and for example for a routing node it can be something like $100A-200A$.

That would mean during time $T$ we got locked $A\cdot p \cdot \nu \cdot T$, if it is greater than $PB=10A$ then we have hit the ceiling of the budget. So $A\cdot p \cdot \nu \cdot T \geq 10A$
With the figures from above, if the failure rate of probes is bigger than 0.0007 (0.07%), then we would hit the budget bound whenever the node restarts. And that rate is actually not that big (1 out of 1440 probes locked for 2 hours).

All in all, I think it's worth adding some form of persistence, because for 'ordinary' nodes the cap seems to be easily reached if the settings (frequency, amount) are not tuned properly.

})
});

Ok(Node {
runtime,
stop_sender,
Expand Down Expand Up @@ -2052,6 +2132,7 @@ fn build_with_store_internal(
om_mailbox,
async_payments_role,
hrn_resolver,
prober,
#[cfg(cycle_tests)]
_leak_checker,
})
Expand Down
6 changes: 6 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ const DEFAULT_BDK_WALLET_SYNC_INTERVAL_SECS: u64 = 80;
const DEFAULT_LDK_WALLET_SYNC_INTERVAL_SECS: u64 = 30;
const DEFAULT_FEE_RATE_CACHE_UPDATE_INTERVAL_SECS: u64 = 60 * 10;
const DEFAULT_PROBING_LIQUIDITY_LIMIT_MULTIPLIER: u64 = 3;
/// Default interval between background probe attempts, in seconds.
pub(crate) const DEFAULT_PROBING_INTERVAL_SECS: u64 = 10;
/// Minimum allowed probing interval.
// NOTE(review): presumably enforced when building a `ProbingConfig` — confirm in the probing
// module (not visible here).
pub(crate) const MIN_PROBING_INTERVAL: Duration = Duration::from_millis(100);
/// Default cooldown before the same node may be probed again.
pub(crate) const DEFAULT_PROBED_NODE_COOLDOWN_SECS: u64 = 60 * 60; // 1 hour
/// Default cap on the total amount locked up in in-flight probes at any one time.
pub(crate) const DEFAULT_MAX_PROBE_LOCKED_MSAT: u64 = 100_000_000; // 100k sats
/// Default lower bound on a single probe's amount.
pub(crate) const DEFAULT_MIN_PROBE_AMOUNT_MSAT: u64 = 1_000_000; // 1k sats
/// Default upper bound on a single probe's amount.
pub(crate) const DEFAULT_MAX_PROBE_AMOUNT_MSAT: u64 = 10_000_000; // 10k sats
const DEFAULT_ANCHOR_PER_CHANNEL_RESERVE_SATS: u64 = 25_000;

// The default timeout after which we abort a wallet syncing operation.
Expand Down
37 changes: 28 additions & 9 deletions src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ use crate::payment::asynchronous::static_invoice_store::StaticInvoiceStore;
use crate::payment::store::{
PaymentDetails, PaymentDetailsUpdate, PaymentDirection, PaymentKind, PaymentStatus,
};
use crate::probing::Prober;
use crate::runtime::Runtime;
use crate::types::{
CustomTlvRecord, DynStore, KeysManager, OnionMessenger, PaymentStore, Sweeper, Wallet,
Expand Down Expand Up @@ -509,12 +510,13 @@ where
payment_store: Arc<PaymentStore>,
peer_store: Arc<PeerStore<L>>,
keys_manager: Arc<KeysManager>,
runtime: Arc<Runtime>,
logger: L,
config: Arc<Config>,
static_invoice_store: Option<StaticInvoiceStore>,
onion_messenger: Arc<OnionMessenger>,
om_mailbox: Option<Arc<OnionMessageMailbox>>,
prober: Option<Arc<Prober>>,
runtime: Arc<Runtime>,
logger: L,
config: Arc<Config>,
}

impl<L: Deref + Clone + Sync + Send + 'static> EventHandler<L>
Expand All @@ -530,7 +532,7 @@ where
payment_store: Arc<PaymentStore>, peer_store: Arc<PeerStore<L>>,
keys_manager: Arc<KeysManager>, static_invoice_store: Option<StaticInvoiceStore>,
onion_messenger: Arc<OnionMessenger>, om_mailbox: Option<Arc<OnionMessageMailbox>>,
runtime: Arc<Runtime>, logger: L, config: Arc<Config>,
prober: Option<Arc<Prober>>, runtime: Arc<Runtime>, logger: L, config: Arc<Config>,
) -> Self {
Self {
event_queue,
Expand All @@ -544,12 +546,13 @@ where
payment_store,
peer_store,
keys_manager,
logger,
runtime,
config,
static_invoice_store,
onion_messenger,
om_mailbox,
prober,
runtime,
logger,
config,
}
}

Expand Down Expand Up @@ -1158,8 +1161,24 @@ where

LdkEvent::PaymentPathSuccessful { .. } => {},
LdkEvent::PaymentPathFailed { .. } => {},
LdkEvent::ProbeSuccessful { .. } => {},
LdkEvent::ProbeFailed { .. } => {},
LdkEvent::ProbeSuccessful { path, payment_id, .. } => {
if let Some(prober) = &self.prober {
if let Some(amount) =
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe just move this check into handle_background_probe_successful/handle_background_probe_failed? Then you could keep inflight_probes visibility at the module level, too.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved

prober.inflight_probes.lock().expect("lock").remove(&payment_id)
{
prober.handle_background_probe_successful(&path, amount);
}
}
},
LdkEvent::ProbeFailed { path, payment_id, .. } => {
if let Some(prober) = &self.prober {
if let Some(amount) =
prober.inflight_probes.lock().expect("lock").remove(&payment_id)
{
prober.handle_background_probe_failed(&path, amount);
}
}
},
LdkEvent::HTLCHandlingFailed { failure_type, .. } => {
if let Some(liquidity_source) = self.liquidity_source.as_ref() {
liquidity_source.handle_htlc_handling_failed(failure_type).await;
Expand Down
1 change: 1 addition & 0 deletions src/ffi/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ pub use crate::entropy::{generate_entropy_mnemonic, NodeEntropy, WordCount};
use crate::error::Error;
pub use crate::liquidity::LSPS1OrderStatus;
pub use crate::logger::{LogLevel, LogRecord, LogWriter};
pub use crate::probing::ProbingConfig;
use crate::{hex_utils, SocketAddress, UserChannelId};

uniffi::custom_type!(PublicKey, String, {
Expand Down
17 changes: 17 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,12 @@ pub mod logger;
mod message_handler;
pub mod payment;
mod peer_store;
pub mod probing;
mod runtime;
mod scoring;
mod tx_broadcaster;
mod types;
mod util;
mod wallet;

use std::default::Default;
Expand Down Expand Up @@ -170,6 +172,7 @@ use payment::{
UnifiedPayment,
};
use peer_store::{PeerInfo, PeerStore};
use probing::{run_prober, Prober};
use runtime::Runtime;
pub use tokio;
use types::{
Expand Down Expand Up @@ -239,6 +242,7 @@ pub struct Node {
om_mailbox: Option<Arc<OnionMessageMailbox>>,
async_payments_role: Option<AsyncPaymentsRole>,
hrn_resolver: HRNResolver,
prober: Option<Arc<Prober>>,
#[cfg(cycle_tests)]
_leak_checker: LeakChecker,
}
Expand Down Expand Up @@ -596,11 +600,19 @@ impl Node {
static_invoice_store,
Arc::clone(&self.onion_messenger),
self.om_mailbox.clone(),
self.prober.clone(),
Arc::clone(&self.runtime),
Arc::clone(&self.logger),
Arc::clone(&self.config),
));

if let Some(prober) = self.prober.clone() {
let stop_rx = self.stop_sender.subscribe();
self.runtime.spawn_cancellable_background_task(async move {
run_prober(prober, stop_rx).await;
});
}

// Setup background processing
let background_persister = Arc::clone(&self.kv_store);
let background_event_handler = Arc::clone(&event_handler);
Expand Down Expand Up @@ -1079,6 +1091,11 @@ impl Node {
))
}

/// Returns a reference to the [`Prober`], or `None` if no probing strategy is configured.
pub fn prober(&self) -> Option<&Prober> {
self.prober.as_deref()
}

/// Retrieve a list of known channels.
pub fn list_channels(&self) -> Vec<ChannelDetails> {
self.channel_manager.list_channels().into_iter().map(|c| c.into()).collect()
Expand Down
Loading
Loading