diff --git a/lightning-liquidity/src/lsps4/event.rs b/lightning-liquidity/src/lsps4/event.rs index a10dad682a0..b6f40feeb2e 100644 --- a/lightning-liquidity/src/lsps4/event.rs +++ b/lightning-liquidity/src/lsps4/event.rs @@ -12,10 +12,9 @@ use crate::lsps0::ser::LSPSRequestId; use std::string::String; use std::vec::Vec; +use lightning::ln::types::ChannelId; use lightning_types::payment::PaymentHash; - - use bitcoin::secp256k1::PublicKey; /// An event which an LSPS4 client should take some action in response to. @@ -63,4 +62,22 @@ pub enum LSPS4ServiceEvent { /// The number of channels currently open with the peer when the request is made. channel_count: usize, }, + /// You should splice into an existing channel to add capacity. + /// + /// If the splice fails, fall back to opening a new channel using the same sizing policy + /// as [`OpenChannel`]. + /// + /// [`OpenChannel`]: Self::OpenChannel + SpliceChannel { + /// The node whose channel to splice. + their_network_key: PublicKey, + /// The channel to splice into (the largest usable channel with this peer). + channel_id: ChannelId, + /// The user channel id, for diagnostics. + user_channel_id: u128, + /// Additional capacity needed (msat). The splice-in amount should be at least this. + amt_to_forward_msat: u64, + /// The number of channels currently open with the peer. + channel_count: usize, + }, } diff --git a/lightning-liquidity/src/lsps4/service.rs b/lightning-liquidity/src/lsps4/service.rs index 52b23f31170..8ab023d07c7 100644 --- a/lightning-liquidity/src/lsps4/service.rs +++ b/lightning-liquidity/src/lsps4/service.rs @@ -51,6 +51,11 @@ use std::vec::Vec; const HTLC_EXPIRY_THRESHOLD_SECS: u64 = 45; +/// Cooldown duration for liquidity actions. After a splice or channel open is emitted, +/// suppress further liquidity actions for this peer until the cooldown expires. +/// Set below HTLC_EXPIRY_THRESHOLD_SECS so at most 1 wasted retry occurs before expiry. +const LIQUIDITY_COOLDOWN_SECS: u64 = 30; + /// Action to forward a specific HTLC through a channel #[derive(Debug)] pub(crate) struct HtlcForwardAction { @@ -60,17 +65,32 @@ pub(crate) struct HtlcForwardAction { pub skimmed_fee_msat: u64, } +/// Action to splice into an existing channel to add capacity. +#[derive(Debug)] +pub(crate) struct SpliceAction { + pub channel_id: ChannelId, + pub user_channel_id: u128, + pub amt_to_forward_msat: u64, +} + /// Actions to take for processing HTLCs for a peer #[derive(Debug)] pub(crate) struct HtlcProcessingActions { pub forwards: Vec, pub new_channel_needed_msat: Option, + pub splice_needed: Option, pub channel_count: usize, } impl HtlcProcessingActions { pub fn is_empty(&self) -> bool { - self.forwards.is_empty() && self.new_channel_needed_msat.is_none() + self.forwards.is_empty() + && self.new_channel_needed_msat.is_none() + && self.splice_needed.is_none() + } + + pub fn needs_liquidity_action(&self) -> bool { + self.new_channel_needed_msat.is_some() || self.splice_needed.is_some() } pub fn total_forward_amount(&self) -> u64 { @@ -102,6 +122,10 @@ where htlc_store: HTLCStore, connected_peers: RwLock>, config: LSPS4ServiceConfig, + /// Per-peer timestamped cooldown for liquidity actions (splice or channel open). + /// Prevents 1Hz retry loops from process_pending_htlcs when actions fail. + /// Auto-expires after LIQUIDITY_COOLDOWN_SECS. + liquidity_cooldown: RwLock>, } impl LSPS4ServiceHandler @@ -128,6 +152,7 @@ where config, logger, connected_peers: RwLock::new(HashSet::new()), + liquidity_cooldown: RwLock::new(new_hash_map()), }) } @@ -236,7 +261,7 @@ where vec![htlc.clone()], ); - if actions.new_channel_needed_msat.is_some() { + if actions.needs_liquidity_action() { self.htlc_store.insert(htlc).unwrap(); } @@ -275,6 +300,10 @@ where pub fn channel_ready( &self, counterparty_node_id: &PublicKey, ) -> Result<(), APIError> { + // Clear liquidity cooldown on channel_ready. This fires for both new channels + // and splice completions (splice_locked), allowing immediate retry if needed. + self.liquidity_cooldown.write().unwrap().remove(counterparty_node_id); + let is_connected = self.is_peer_connected(counterparty_node_id); log_info!( @@ -476,6 +505,35 @@ where .any(|c| c.is_usable) } + fn is_liquidity_cooling_down(&self, peer: &PublicKey) -> bool { + self.liquidity_cooldown + .read() + .unwrap() + .get(peer) + .map(|t| t.elapsed().as_secs() < LIQUIDITY_COOLDOWN_SECS) + .unwrap_or(false) + } + + /// Called when a liquidity action (splice or channel open) completes successfully. + /// Clears the cooldown so new actions can be emitted immediately. + pub fn liquidity_action_completed(&self, peer: &PublicKey) { + self.liquidity_cooldown.write().unwrap().remove(peer); + } + + /// Called when a liquidity action fails asynchronously (e.g., ChannelClosed before ready). + /// Clears the cooldown so the timer or next HTLC can retry immediately. + pub fn liquidity_action_failed(&self, peer: &PublicKey) { + self.liquidity_cooldown.write().unwrap().remove(peer); + } + + /// Called when a splice fails asynchronously (SpliceFailed event). + /// Resets the cooldown timestamp instead of clearing it entirely. + /// This allows retry after LIQUIDITY_COOLDOWN_SECS, not immediately, + /// preventing 1Hz loops on persistent async splice failures. + pub fn liquidity_action_reset_cooldown(&self, peer: &PublicKey) { + self.liquidity_cooldown.write().unwrap().insert(*peer, Instant::now()); + } + /// Will update the set of connected peers pub fn peer_disconnected(&self, counterparty_node_id: &PublicKey) { let (was_present, remaining_count) = { @@ -518,30 +576,21 @@ where self.connected_peers.read().unwrap().iter().copied().collect(); for node_id in connected_peers { + // Skip peers with a liquidity action cooling down. + // Prevents 1Hz retry loops. execute_htlc_actions also checks this + // as a safety net for the htlc_intercepted path. + if self.is_liquidity_cooling_down(&node_id) { + continue; + } + let htlcs = self.htlc_store.get_htlcs_by_node_id(&node_id); if htlcs.is_empty() { continue; } if self.has_usable_channel(&node_id) { - // Channel reestablish completed — forward the deferred HTLCs. - log_info!( - self.logger, - "[LSPS4] process_pending_htlcs: forwarding {} HTLCs for peer {} (channel now usable)", - htlcs.len(), - node_id - ); let actions = self.calculate_htlc_actions_for_peer(node_id, htlcs); - if actions.new_channel_needed_msat.is_some() { - // A channel open is already in flight from htlc_intercepted or - // peer_connected. Skip — channel_ready will handle forwarding - // once the new channel is established. - log_info!( - self.logger, - "[LSPS4] process_pending_htlcs: peer {} needs a new channel, \ - skipping (channel open already in flight)", - node_id - ); + if actions.is_empty() { continue; } self.execute_htlc_actions(actions, node_id); @@ -727,29 +776,65 @@ where } if !can_forward { - // No existing channel has sufficient capacity, need to open a new channel + // No existing channel has sufficient capacity. // Calculate total amount needed for remaining HTLCs (including current one) let total_remaining_amount = computed_htlcs .iter() .fold(required_amount, |acc, h| acc.saturating_add(h.amount_to_forward_msat)); + // Prefer splicing into the largest usable channel over opening a new one. + // Use is_channel_ready (not is_usable) so we prefer splice even during + // channel_reestablish. splice_channel() will fail if the channel isn't + // usable yet, and the timer will retry once reestablishment completes. + let splice_candidate = channels + .iter() + .filter(|c| c.is_channel_ready) + .max_by_key(|c| c.channel_value_satoshis); + + if let Some(candidate) = splice_candidate { + return HtlcProcessingActions { + forwards, + new_channel_needed_msat: None, + splice_needed: Some(SpliceAction { + channel_id: candidate.channel_id, + user_channel_id: candidate.user_channel_id, + amt_to_forward_msat: total_remaining_amount, + }), + channel_count, + }; + } + + // No usable channels to splice into - need a new channel. return HtlcProcessingActions { forwards, new_channel_needed_msat: Some(total_remaining_amount), + splice_needed: None, channel_count, }; } } - HtlcProcessingActions { forwards, new_channel_needed_msat: None, channel_count } + HtlcProcessingActions { + forwards, + new_channel_needed_msat: None, + splice_needed: None, + channel_count, + } } /// Execute the actions calculated by calculate_htlc_actions_for_peer pub(crate) fn execute_htlc_actions( &self, actions: HtlcProcessingActions, their_node_id: PublicKey, ) { + let HtlcProcessingActions { + forwards, + new_channel_needed_msat, + splice_needed, + channel_count, + } = actions; + // Execute forwards - for forward_action in actions.forwards { + for forward_action in forwards { // Re-check peer liveness right before forwarding to narrow the // TOCTOU window between the usability check and the actual forward. if !self.is_peer_connected(&their_node_id) { @@ -826,21 +911,55 @@ where } } - // Handle new channel opening - if let Some(channel_size_msat) = actions.new_channel_needed_msat { - log_info!( - self.logger, - "Need a new channel with peer {} for {}msat to forward HTLCs", - their_node_id, - channel_size_msat - ); + // Handle liquidity actions (splice or new channel) + let needs_liquidity = new_channel_needed_msat.is_some() || splice_needed.is_some(); + if needs_liquidity { + if self.is_liquidity_cooling_down(&their_node_id) { + log_info!( + self.logger, + "[LSPS4] Liquidity action suppressed for peer {} (cooling down)", + their_node_id + ); + return; + } + + // Set cooldown BEFORE emitting to prevent re-entrant emit on next tick. + self.liquidity_cooldown.write().unwrap().insert(their_node_id, Instant::now()); let mut event_queue_notifier = self.pending_events.notifier(); - event_queue_notifier.enqueue(crate::events::LiquidityEvent::LSPS4Service(LSPS4ServiceEvent::OpenChannel { - their_network_key: their_node_id, - amt_to_forward_msat: channel_size_msat, - channel_count: actions.channel_count, - })); + + if let Some(splice_action) = splice_needed { + log_info!( + self.logger, + "[LSPS4] Splicing into channel {} with peer {} for {}msat", + splice_action.channel_id, + their_node_id, + splice_action.amt_to_forward_msat + ); + event_queue_notifier.enqueue(crate::events::LiquidityEvent::LSPS4Service( + LSPS4ServiceEvent::SpliceChannel { + their_network_key: their_node_id, + channel_id: splice_action.channel_id, + user_channel_id: splice_action.user_channel_id, + amt_to_forward_msat: splice_action.amt_to_forward_msat, + channel_count, + }, + )); + } else if let Some(channel_size_msat) = new_channel_needed_msat { + log_info!( + self.logger, + "[LSPS4] Need a new channel with peer {} for {}msat to forward HTLCs", + their_node_id, + channel_size_msat + ); + event_queue_notifier.enqueue(crate::events::LiquidityEvent::LSPS4Service( + LSPS4ServiceEvent::OpenChannel { + their_network_key: their_node_id, + amt_to_forward_msat: channel_size_msat, + channel_count, + }, + )); + } } } @@ -917,4 +1036,89 @@ where }, } } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_htlc_processing_actions_is_empty() { + let empty = HtlcProcessingActions { + forwards: vec![], + new_channel_needed_msat: None, + splice_needed: None, + channel_count: 0, + }; + assert!(empty.is_empty()); + + let with_channel = HtlcProcessingActions { + forwards: vec![], + new_channel_needed_msat: Some(100_000), + splice_needed: None, + channel_count: 0, + }; + assert!(!with_channel.is_empty()); + + let with_splice = HtlcProcessingActions { + forwards: vec![], + new_channel_needed_msat: None, + splice_needed: Some(SpliceAction { + channel_id: ChannelId::from_bytes([0; 32]), + user_channel_id: 0, + amt_to_forward_msat: 50_000, + }), + channel_count: 1, + }; + assert!(!with_splice.is_empty()); + } + + #[test] + fn test_needs_liquidity_action() { + let no_action = HtlcProcessingActions { + forwards: vec![], + new_channel_needed_msat: None, + splice_needed: None, + channel_count: 0, + }; + assert!(!no_action.needs_liquidity_action()); + + let needs_channel = HtlcProcessingActions { + forwards: vec![], + new_channel_needed_msat: Some(100_000), + splice_needed: None, + channel_count: 0, + }; + assert!(needs_channel.needs_liquidity_action()); + + let needs_splice = HtlcProcessingActions { + forwards: vec![], + new_channel_needed_msat: None, + splice_needed: Some(SpliceAction { + channel_id: ChannelId::from_bytes([0; 32]), + user_channel_id: 0, + amt_to_forward_msat: 50_000, + }), + channel_count: 1, + }; + assert!(needs_splice.needs_liquidity_action()); + } + + #[test] + fn test_cooldown_auto_expires() { + // Verify the cooldown constant is less than HTLC expiry + assert!(LIQUIDITY_COOLDOWN_SECS < HTLC_EXPIRY_THRESHOLD_SECS); + } + + #[test] + fn test_splice_action_fields() { + let action = SpliceAction { + channel_id: ChannelId::from_bytes([1; 32]), + user_channel_id: 42, + amt_to_forward_msat: 1_000_000, + }; + assert_eq!(action.channel_id, ChannelId::from_bytes([1; 32])); + assert_eq!(action.user_channel_id, 42); + assert_eq!(action.amt_to_forward_msat, 1_000_000); + } } \ No newline at end of file