diff --git a/lightning/src/util/sweep.rs b/lightning/src/util/sweep.rs index e66cb9c63bd..8d539b0a5e6 100644 --- a/lightning/src/util/sweep.rs +++ b/lightning/src/util/sweep.rs @@ -488,12 +488,18 @@ where return Ok(()); } - let result = self.regenerate_and_broadcast_spend_if_necessary_internal().await; - - // Release the pending sweep flag again, regardless of result. - self.pending_sweep.store(false, Ordering::Release); + // Use an RAII guard so the flag is released even if this future is dropped mid-await + // (e.g. cancelled by `tokio::time::timeout` or `select!`). A bare `store(false)` after + // the await would never run on cancellation, leaving the sweeper permanently disabled. + struct PendingSweepGuard<'a>(&'a AtomicBool); + impl<'a> Drop for PendingSweepGuard<'a> { + fn drop(&mut self) { + self.0.store(false, Ordering::Release); + } + } + let _guard = PendingSweepGuard(&self.pending_sweep); - result + self.regenerate_and_broadcast_spend_if_necessary_internal().await } /// Regenerates and broadcasts the spending transaction for any outputs that are pending @@ -1176,3 +1182,151 @@ where Ok((best_block, OutputSweeperSync { sweeper })) } } + +#[cfg(all(test, feature = "std"))] +mod tests { + use super::*; + use crate::chain::transaction::OutPoint; + use crate::sign::{ChangeDestinationSource, OutputSpender}; + use crate::util::async_poll::dummy_waker; + use crate::util::logger::Record; + use crate::util::native_async::MaybeSend; + + use bitcoin::hashes::Hash as _; + use bitcoin::secp256k1::All; + use bitcoin::transaction::Version; + use bitcoin::{Amount, BlockHash, ScriptBuf, Transaction, TxOut, Txid}; + + use core::future as core_future; + use core::pin::pin; + use core::sync::atomic::Ordering; + use core::task::Poll; + + struct DummyBroadcaster; + impl BroadcasterInterface for DummyBroadcaster { + fn broadcast_transactions(&self, _: &[(&Transaction, TransactionType)]) {} + } + + struct DummyFeeEstimator; + impl FeeEstimator for DummyFeeEstimator { + fn get_est_sat_per_1000_weight(&self, _: ConfirmationTarget) -> u32 { + 1000 + } + } + + struct DummyFilter; + impl Filter for DummyFilter { + fn register_tx(&self, _: &Txid, _: &bitcoin::Script) {} + fn register_output(&self, _: WatchedOutput) {} + } + + struct DummyLogger; + impl Logger for DummyLogger { + fn log(&self, _: Record) {} + } + + struct DummyOutputSpender; + impl OutputSpender for DummyOutputSpender { + fn spend_spendable_outputs( + &self, _: &[&SpendableOutputDescriptor], _: Vec, _: ScriptBuf, _: u32, + _: Option, _: &Secp256k1, + ) -> Result { + Ok(Transaction { + version: Version::TWO, + lock_time: LockTime::ZERO, + input: Vec::new(), + output: Vec::new(), + }) + } + } + + struct DummyChangeDestSource; + impl ChangeDestinationSource for DummyChangeDestSource { + fn get_change_destination_script<'a>( + &'a self, + ) -> impl Future> + MaybeSend + 'a { + core_future::ready(Ok(ScriptBuf::new())) + } + } + + struct PendingKVStore; + impl KVStore for PendingKVStore { + fn read( + &self, _: &str, _: &str, _: &str, + ) -> impl Future, io::Error>> + 'static + MaybeSend { + core_future::ready(Err(io::Error::new(io::ErrorKind::NotFound, ""))) + } + fn write( + &self, _: &str, _: &str, _: &str, _: Vec, + ) -> impl Future> + 'static + MaybeSend { + core_future::pending() + } + fn remove( + &self, _: &str, _: &str, _: &str, _: bool, + ) -> impl Future> + 'static + MaybeSend { + core_future::ready(Ok(())) + } + fn list( + &self, _: &str, _: &str, + ) -> impl Future, io::Error>> + 'static + MaybeSend { + core_future::ready(Ok(Vec::new())) + } + } + + #[test] + fn pending_sweep_flag_resets_after_future_drop() { + let best_block = BlockLocator::new(BlockHash::all_zeros(), 1_000); + + let sweeper: OutputSweeper< + DummyBroadcaster, + Box, + DummyFeeEstimator, + DummyFilter, + PendingKVStore, + DummyLogger, + DummyOutputSpender, + > = OutputSweeper::new( + best_block, + DummyBroadcaster, + DummyFeeEstimator, + None, + DummyOutputSpender, + Box::new(DummyChangeDestSource), + PendingKVStore, + DummyLogger, + ); + + // Inject a tracked output directly so the sweep loop has work to do. + let descriptor = SpendableOutputDescriptor::StaticOutput { + outpoint: OutPoint { txid: Txid::all_zeros(), index: 0 }, + output: TxOut { value: Amount::from_sat(100_000), script_pubkey: ScriptBuf::new() }, + channel_keys_id: None, + }; + sweeper.sweeper_state.lock().unwrap().outputs.push(TrackedSpendableOutput { + descriptor, + channel_id: None, + counterparty_node_id: None, + status: OutputSpendStatus::PendingInitialBroadcast { delayed_until_height: None }, + }); + + // Start a sweep, poll once (the persist step stays Pending because our KVStore's + // `write` future is `future::pending()`), then drop the future to mimic + // cancellation - the sort of thing a `tokio::time::timeout` wrapper produces. + { + let mut fut = pin!(sweeper.regenerate_and_broadcast_spend_if_necessary()); + let waker = dummy_waker(); + let mut ctx = task::Context::from_waker(&waker); + assert!(matches!(fut.as_mut().poll(&mut ctx), Poll::Pending)); + } + + // Once the future has been dropped, `pending_sweep` must be cleared. The bug + // is that the flag is only ever cleared after the inner future returns, so a + // dropped future leaves it stuck `true` and every subsequent call to + // `regenerate_and_broadcast_spend_if_necessary` short-circuits with `Ok(())`, + // permanently disabling the sweeper. + assert!( + !sweeper.pending_sweep.load(Ordering::Acquire), + "pending_sweep flag was not reset when the future was dropped", + ); + } +}