Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 159 additions & 5 deletions lightning/src/util/sweep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,12 +488,18 @@ where
return Ok(());
}

let result = self.regenerate_and_broadcast_spend_if_necessary_internal().await;

// Release the pending sweep flag again, regardless of result.
self.pending_sweep.store(false, Ordering::Release);
// Use an RAII guard so the flag is released even if this future is dropped mid-await
// (e.g. cancelled by `tokio::time::timeout` or `select!`). A bare `store(false)` after
// the await would never run on cancellation, leaving the sweeper permanently disabled.
struct PendingSweepGuard<'a>(&'a AtomicBool);
impl<'a> Drop for PendingSweepGuard<'a> {
fn drop(&mut self) {
self.0.store(false, Ordering::Release);
}
}
let _guard = PendingSweepGuard(&self.pending_sweep);

result
self.regenerate_and_broadcast_spend_if_necessary_internal().await
}

/// Regenerates and broadcasts the spending transaction for any outputs that are pending
Expand Down Expand Up @@ -1176,3 +1182,151 @@ where
Ok((best_block, OutputSweeperSync { sweeper }))
}
}

#[cfg(all(test, feature = "std"))]
mod tests {
use super::*;
use crate::chain::transaction::OutPoint;
use crate::sign::{ChangeDestinationSource, OutputSpender};
use crate::util::async_poll::dummy_waker;
use crate::util::logger::Record;
use crate::util::native_async::MaybeSend;

use bitcoin::hashes::Hash as _;
use bitcoin::secp256k1::All;
use bitcoin::transaction::Version;
use bitcoin::{Amount, BlockHash, ScriptBuf, Transaction, TxOut, Txid};

use core::future as core_future;
use core::pin::pin;
use core::sync::atomic::Ordering;
use core::task::Poll;

struct DummyBroadcaster;
impl BroadcasterInterface for DummyBroadcaster {
fn broadcast_transactions(&self, _: &[(&Transaction, TransactionType)]) {}
}

struct DummyFeeEstimator;
impl FeeEstimator for DummyFeeEstimator {
fn get_est_sat_per_1000_weight(&self, _: ConfirmationTarget) -> u32 {
1000
}
}

struct DummyFilter;
impl Filter for DummyFilter {
fn register_tx(&self, _: &Txid, _: &bitcoin::Script) {}
fn register_output(&self, _: WatchedOutput) {}
}

struct DummyLogger;
impl Logger for DummyLogger {
fn log(&self, _: Record) {}
}

struct DummyOutputSpender;
impl OutputSpender for DummyOutputSpender {
fn spend_spendable_outputs(
&self, _: &[&SpendableOutputDescriptor], _: Vec<TxOut>, _: ScriptBuf, _: u32,
_: Option<LockTime>, _: &Secp256k1<All>,
) -> Result<Transaction, ()> {
Ok(Transaction {
version: Version::TWO,
lock_time: LockTime::ZERO,
input: Vec::new(),
output: Vec::new(),
})
}
}

struct DummyChangeDestSource;
impl ChangeDestinationSource for DummyChangeDestSource {
fn get_change_destination_script<'a>(
&'a self,
) -> impl Future<Output = Result<ScriptBuf, ()>> + MaybeSend + 'a {
core_future::ready(Ok(ScriptBuf::new()))
}
}

struct PendingKVStore;
impl KVStore for PendingKVStore {
fn read(
&self, _: &str, _: &str, _: &str,
) -> impl Future<Output = Result<Vec<u8>, io::Error>> + 'static + MaybeSend {
core_future::ready(Err(io::Error::new(io::ErrorKind::NotFound, "")))
}
fn write(
&self, _: &str, _: &str, _: &str, _: Vec<u8>,
) -> impl Future<Output = Result<(), io::Error>> + 'static + MaybeSend {
core_future::pending()
}
fn remove(
&self, _: &str, _: &str, _: &str, _: bool,
) -> impl Future<Output = Result<(), io::Error>> + 'static + MaybeSend {
core_future::ready(Ok(()))
}
fn list(
&self, _: &str, _: &str,
) -> impl Future<Output = Result<Vec<String>, io::Error>> + 'static + MaybeSend {
core_future::ready(Ok(Vec::new()))
}
}

#[test]
fn pending_sweep_flag_resets_after_future_drop() {
let best_block = BlockLocator::new(BlockHash::all_zeros(), 1_000);

let sweeper: OutputSweeper<
DummyBroadcaster,
Box<DummyChangeDestSource>,
DummyFeeEstimator,
DummyFilter,
PendingKVStore,
DummyLogger,
DummyOutputSpender,
> = OutputSweeper::new(
best_block,
DummyBroadcaster,
DummyFeeEstimator,
None,
DummyOutputSpender,
Box::new(DummyChangeDestSource),
PendingKVStore,
DummyLogger,
);

// Inject a tracked output directly so the sweep loop has work to do.
let descriptor = SpendableOutputDescriptor::StaticOutput {
outpoint: OutPoint { txid: Txid::all_zeros(), index: 0 },
output: TxOut { value: Amount::from_sat(100_000), script_pubkey: ScriptBuf::new() },
channel_keys_id: None,
};
sweeper.sweeper_state.lock().unwrap().outputs.push(TrackedSpendableOutput {
descriptor,
channel_id: None,
counterparty_node_id: None,
status: OutputSpendStatus::PendingInitialBroadcast { delayed_until_height: None },
});

// Start a sweep, poll once (the persist step stays Pending because our KVStore's
// `write` future is `future::pending()`), then drop the future to mimic
// cancellation - the sort of thing a `tokio::time::timeout` wrapper produces.
{
let mut fut = pin!(sweeper.regenerate_and_broadcast_spend_if_necessary());
let waker = dummy_waker();
let mut ctx = task::Context::from_waker(&waker);
assert!(matches!(fut.as_mut().poll(&mut ctx), Poll::Pending));
}

// Once the future has been dropped, `pending_sweep` must be cleared. The bug
// is that the flag is only ever cleared after the inner future returns, so a
// dropped future leaves it stuck `true` and every subsequent call to
// `regenerate_and_broadcast_spend_if_necessary` short-circuits with `Ok(())`,
// permanently disabling the sweeper.
assert!(
!sweeper.pending_sweep.load(Ordering::Acquire),
"pending_sweep flag was not reset when the future was dropped",
);
}
}
Loading