From e81d9ddb1a6d6ebf7aa0b381622e29d033de5610 Mon Sep 17 00:00:00 2001 From: amackillop Date: Mon, 9 Mar 2026 15:29:37 -0700 Subject: [PATCH] Defer LSPS4 HTLC forwarding until channel usable LDK fires peer_connected after the TCP+Init handshake but before channel_reestablish completes. During this window, channels exist but are not yet usable (is_usable=false). The previous code forwarded HTLCs unconditionally in peer_connected and htlc_intercepted, causing ~10% of payments to fail on reconnect. Distinguish two connected-peer cases in htlc_intercepted and peer_connected: - Channels exist but none usable (reestablish in progress): defer the HTLC for timer-based retry via process_pending_htlcs(). - No channels exist (first payment, JIT open needed): proceed immediately so calculate_htlc_actions_for_peer can emit the OpenChannel event. process_pending_htlcs (5s timer) only retries the reestablish case (channels exist, waiting to become usable). It must not handle the no-channel case to avoid emitting duplicate OpenChannel events while a JIT open is already in flight. Remove the re-check race in htlc_intercepted that could fire concurrently with peer_connected. The webhook + peer_connected path is the single owner of the offline-peer reconnect flow. Blocking inside peer_connected was also considered but rejected: there is no LDK event for "channel usable after reconnect" to wake on, so it would require a spin-wait with arbitrary timeout. A timer-based retry is cleaner and avoids holding the lock. 
--- lightning-liquidity/src/lsps4/service.rs | 134 +++++++++++++++++------ 1 file changed, 100 insertions(+), 34 deletions(-) diff --git a/lightning-liquidity/src/lsps4/service.rs b/lightning-liquidity/src/lsps4/service.rs index e2ceaaf6534..1a5394168a7 100644 --- a/lightning-liquidity/src/lsps4/service.rs +++ b/lightning-liquidity/src/lsps4/service.rs @@ -179,34 +179,25 @@ where payment_hash ); self.htlc_store.insert(htlc).unwrap(); // TODO: handle persistence failures - // Re-check: the peer may have reconnected while we were persisting. - // If so, peer_connected() already ran and found 0 HTLCs (race). - // Process the stored HTLC now instead of relying on the webhook. - if self.is_peer_connected(&counterparty_node_id) { - log_info!( - self.logger, - "[LSPS4] htlc_intercepted: peer {} reconnected during HTLC store, processing immediately", - counterparty_node_id - ); - let htlcs = self.htlc_store.get_htlcs_by_node_id(&counterparty_node_id); - self.process_htlcs_for_peer(counterparty_node_id.clone(), htlcs); - } else { - let mut event_queue_notifier = self.pending_events.notifier(); - event_queue_notifier.enqueue(crate::events::LiquidityEvent::LSPS4Service(LSPS4ServiceEvent::SendWebhook { - counterparty_node_id: counterparty_node_id.clone(), - payment_hash, - })); - } + // Send webhook to wake the peer. When the peer reconnects, + // peer_connected() will find this HTLC in the store and process it. + // The process_pending_htlcs timer also serves as a fallback. 
+ let mut event_queue_notifier = self.pending_events.notifier(); + event_queue_notifier.enqueue(crate::events::LiquidityEvent::LSPS4Service(LSPS4ServiceEvent::SendWebhook { + counterparty_node_id: counterparty_node_id.clone(), + payment_hash, + })); } else { - // Log channel states when taking the "connected" path let channels = self .channel_manager .get_cm() .list_channels_with_counterparty(&counterparty_node_id); let any_usable = channels.iter().any(|ch| ch.is_usable); + let has_channels = !channels.is_empty(); + log_info!( self.logger, - "[LSPS4] htlc_intercepted: peer {} IS in connected_peers (set size: {}), processing immediately. channels: {}, any_usable: {}, payment_hash: {}", + "[LSPS4] htlc_intercepted: peer {} IS in connected_peers (set size: {}), channels: {}, any_usable: {}, payment_hash: {}", counterparty_node_id, connected_count, channels.len(), @@ -224,21 +215,40 @@ where ); } - let actions = - self.calculate_htlc_actions_for_peer(counterparty_node_id, vec![htlc.clone()]); + if has_channels && !any_usable { + // Channels exist but none usable yet (channel_reestablish in progress). + // Defer until process_pending_htlcs picks them up. + log_info!( + self.logger, + "[LSPS4] htlc_intercepted: peer {} has {} channels but none usable, \ + deferring HTLC until channel_reestablish completes. payment_hash: {}", + counterparty_node_id, + channels.len(), + payment_hash + ); + self.htlc_store.insert(htlc).unwrap(); + } else { + // Either channels are usable, or no channels exist (need JIT open). + // calculate_htlc_actions_for_peer handles both: forward through usable + // channels, or emit OpenChannel event when no capacity exists. 
+ let actions = self.calculate_htlc_actions_for_peer( + counterparty_node_id, + vec![htlc.clone()], + ); - if actions.new_channel_needed_msat.is_some() { - self.htlc_store.insert(htlc).unwrap(); // TODO: handle persistence failures - } + if actions.new_channel_needed_msat.is_some() { + self.htlc_store.insert(htlc).unwrap(); + } - log_debug!( - self.logger, - "[LSPS4] htlc_intercepted: calculated actions for peer {}: {:?}", - counterparty_node_id, - actions - ); + log_debug!( + self.logger, + "[LSPS4] htlc_intercepted: calculated actions for peer {}: {:?}", + counterparty_node_id, + actions + ); - self.execute_htlc_actions(actions, counterparty_node_id.clone()); + self.execute_htlc_actions(actions, counterparty_node_id.clone()); + } } } else { log_error!( @@ -305,7 +315,12 @@ where Ok(()) } - /// Will attempt to forward any pending intercepted htlcs to this counterparty. + /// Will attempt to forward any pending intercepted htlcs to this counterparty, + /// but only if a usable channel exists. After reconnect, channels are not usable + /// until channel_reestablish completes. Call [`process_pending_htlcs`] on a timer + /// to retry peers whose channels were not yet usable at connect time. + /// + /// [`process_pending_htlcs`]: Self::process_pending_htlcs pub fn peer_connected(&self, counterparty_node_id: PublicKey) { let fn_start = Instant::now(); let connected_count = { @@ -370,7 +385,20 @@ where } } - self.process_htlcs_for_peer(counterparty_node_id.clone(), htlcs); + if self.has_usable_channel(&counterparty_node_id) || channels.is_empty() { + // Either channels are usable (forward immediately) or no channels exist + // at all (process_htlcs_for_peer will trigger OpenChannel for JIT). + self.process_htlcs_for_peer(counterparty_node_id.clone(), htlcs); + } else { + // Channels exist but none usable: reestablish still in progress. + // Defer until process_pending_htlcs picks them up on the timer. 
+ log_info!( + self.logger, + "[LSPS4] peer_connected: {counterparty_node_id} has {} pending HTLCs but channels \ + not yet usable (reestablish in progress), deferring", + htlcs.len() + ); + } log_info!( self.logger, @@ -378,6 +406,8 @@ where fn_start.elapsed().as_millis(), counterparty_node_id ); + + } /// Handle expired HTLCs. @@ -430,6 +460,14 @@ where self.connected_peers.read().unwrap().contains(counterparty_node_id) } + fn has_usable_channel(&self, counterparty_node_id: &PublicKey) -> bool { + self.channel_manager + .get_cm() + .list_channels_with_counterparty(counterparty_node_id) + .iter() + .any(|c| c.is_usable) + } + /// Will update the set of connected peers pub fn peer_disconnected(&self, counterparty_node_id: &PublicKey) { let (was_present, remaining_count) = { @@ -462,6 +500,34 @@ where } } + /// Attempt to forward pending HTLCs for all connected peers that have usable channels. + /// + /// After reconnect, `peer_connected` fires before `channel_reestablish` completes, + /// so channels are not yet usable. This method should be called on a regular timer + /// (e.g. every few seconds) to pick up HTLCs that were deferred at connect time. + pub fn process_pending_htlcs(&self) { + let connected_peers: Vec<PublicKey> = + self.connected_peers.read().unwrap().iter().copied().collect(); + + for node_id in connected_peers { + let htlcs = self.htlc_store.get_htlcs_by_node_id(&node_id); + if htlcs.is_empty() { + continue; + } + + if self.has_usable_channel(&node_id) { + // Channel reestablish completed — forward the deferred HTLCs. + log_info!( + self.logger, + "[LSPS4] process_pending_htlcs: forwarding {} HTLCs for peer {} (channel now usable)", + htlcs.len(), + node_id + ); + self.process_htlcs_for_peer(node_id, htlcs); + } + } + } + fn handle_register_node_request( &self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey, _params: RegisterNodeRequest, ) -> Result<(), LightningError> {