diff --git a/dash-spv/src/sync/filters/block_match_tracker.rs b/dash-spv/src/sync/filters/block_match_tracker.rs index 6f9301491..e0d911b47 100644 --- a/dash-spv/src/sync/filters/block_match_tracker.rs +++ b/dash-spv/src/sync/filters/block_match_tracker.rs @@ -92,6 +92,28 @@ impl BlockMatchTracker { } } + /// Like [`Self::track`], but for matches driven by newly derived + /// scripts: the prior processing of this block predates those scripts, + /// so processed records do not cover them and the block must be + /// re-applied to the full candidate set. Never returns + /// `AlreadyProcessed`. + pub(super) fn track_for_new_scripts( + &mut self, + key: &FilterMatchKey, + batch_start: u32, + candidate_wallets: BTreeSet, + ) -> BlockTrackResult { + if self.blocks_remaining.contains_key(key.hash()) { + return BlockTrackResult::InFlight { + wallets: candidate_wallets, + }; + } + self.blocks_remaining.insert(*key.hash(), (key.height(), batch_start)); + BlockTrackResult::NewlyTracked { + wallets: candidate_wallets, + } + } + /// Record that `wallets` have had the block at `(height, hash)` applied /// to their state. Idempotent: existing entries merge, never shrink. pub(super) fn record_processed( @@ -210,6 +232,23 @@ mod tests { tracker.track(&key, 5000, BTreeSet::from([wallet_a, wallet_b])), BlockTrackResult::AlreadyProcessed ); + + // A match on newly derived scripts re-queues the fully-processed + // block anyway: the prior processing predates those scripts. + assert_eq!( + tracker.track_for_new_scripts(&key, 5000, BTreeSet::from([wallet_a])), + BlockTrackResult::NewlyTracked { + wallets: BTreeSet::from([wallet_a]) + } + ); + // While re-queued the block reports InFlight, preserving the + // one-increment-one-decrement pending accounting. + assert_eq!( + tracker.track_for_new_scripts(&key, 5000, BTreeSet::from([wallet_a])), + BlockTrackResult::InFlight { + wallets: BTreeSet::from([wallet_a]) + } + ); } /// `prune_at_or_below` drops every entry at or below the given height diff --git a/dash-spv/src/sync/filters/manager.rs b/dash-spv/src/sync/filters/manager.rs index 2464466b2..a0c806151 100644 --- a/dash-spv/src/sync/filters/manager.rs +++ b/dash-spv/src/sync/filters/manager.rs @@ -665,7 +665,11 @@ impl { @@ -679,6 +683,7 @@ impl {} } } @@ -930,7 +935,7 @@ mod tests { }; use crate::sync::{ManagerIdentifier, SyncManagerProgress}; use dashcore::bip158::BlockFilter; - use dashcore::Header; + use dashcore::{Address, Header}; use dashcore::{Block, Network, Transaction}; use dashcore_hashes::Hash; use key_wallet_manager::test_utils::{ @@ -1341,6 +1346,49 @@ mod tests { assert!(!attr_b.contains(&wallet_a)); } + /// A block that was already processed for a wallet is re-queued by + /// `rescan_batch` when newly derived scripts match it: the prior + /// processing predates those scripts, so the block must be re-applied + /// against the extended pools. Plain scans keep skipping processed + /// blocks. + #[tokio::test] + async fn test_rescan_batch_reprocesses_already_processed_block() { + let address = Address::dummy(Network::Regtest, 33); + let mut manager = create_test_manager().await; + manager.set_state(SyncState::Syncing); + + let (key, filter) = filter_for_address(20, &address); + let mut filters: HashMap = HashMap::new(); + filters.insert(key.clone(), filter); + let mut batch = FiltersBatch::new(0, 99, filters); + batch.mark_verified(); + manager.active_batches.insert(0, batch); + + // The block was downloaded and processed for the wallet before the + // rescan script existed. + manager.tracker.record_processed(20, *key.hash(), &BTreeSet::from([MOCK_WALLET_ID])); + + let mut new_scripts: HashMap> = HashMap::new(); + new_scripts.insert(MOCK_WALLET_ID, HashSet::from([address.script_pubkey()])); + + let events = manager.rescan_batch(0, &new_scripts).await.unwrap(); + + let blocks = events + .iter() + .find_map(|e| match e { + SyncEvent::BlocksNeeded { + blocks, + } => Some(blocks), + _ => None, + }) + .expect("BlocksNeeded event for the re-queued block"); + assert!(blocks.get(&key).expect("re-queued block entry").contains(&MOCK_WALLET_ID)); + + // The re-queued block is accounted exactly once so the batch cannot + // commit before its re-processing completes. + assert_eq!(manager.active_batches.get(&0).unwrap().pending_blocks(), 1); + } + /// `rescan_batch` honours each wallet's own `synced_height`: a new /// address belonging to a wallet that has already advanced past a height /// must not produce a `BlocksNeeded` for that height, even when the diff --git a/dash-spv/src/test_utils/node.rs b/dash-spv/src/test_utils/node.rs index aaf46574b..41dcd9b71 100644 --- a/dash-spv/src/test_utils/node.rs +++ b/dash-spv/src/test_utils/node.rs @@ -6,7 +6,7 @@ use dashcore::{Address, Amount, BlockHash, Transaction, Txid}; use dashcore_rpc::json as rpc_json; use dashcore_rpc::{Auth, Client, RpcApi}; use serde::Deserialize; -use serde_json::Value; +use serde_json::{Map, Value}; use std::collections::HashMap; use std::fs; use std::net::SocketAddr; @@ -316,6 +316,22 @@ impl DashCoreNode { txid } + /// Send DASH to many addresses in a single transaction from the primary + /// wallet, so one transaction carries one output per `(address, amount)` + /// pair. + pub fn send_many(&self, payments: &[(Address, Amount)]) -> Txid { + let client = self.rpc_client(); + let amounts: Map = payments + .iter() + .map(|(address, amount)| (address.to_string(), serde_json::json!(amount.to_dash()))) + .collect(); + let txid: Txid = client + .call("sendmany", &[serde_json::json!(""), Value::Object(amounts)]) + .expect("failed to sendmany"); + tracing::info!("Sent {} outputs in one transaction, txid: {}", payments.len(), txid); + txid + } + /// Send DASH to an address from a specific wallet. pub fn send_to_address_from_wallet( &self, diff --git a/dash-spv/tests/dashd_sync/tests_transaction.rs b/dash-spv/tests/dashd_sync/tests_transaction.rs index ec3b66957..0b99f8a32 100644 --- a/dash-spv/tests/dashd_sync/tests_transaction.rs +++ b/dash-spv/tests/dashd_sync/tests_transaction.rs @@ -5,12 +5,18 @@ use std::time::Duration; use tokio::sync::RwLock; use super::helpers::{ - wait_for_mempool_tx, wait_for_sync, wait_for_wallet_synced, EMPTY_MNEMONIC, SECONDARY_MNEMONIC, + count_wallet_transactions, get_spendable_balance, wait_for_mempool_tx, wait_for_sync, + wait_for_wallet_synced, EMPTY_MNEMONIC, SECONDARY_MNEMONIC, }; use super::setup::{create_and_start_client, TestContext}; use dash_spv::test_utils::{create_test_wallet, TestChain}; use dashcore::address::NetworkUnchecked; +use dashcore::secp256k1::Secp256k1; +use dashcore::PublicKey; use key_wallet::account::ManagedAccountTrait; +use key_wallet::bip32::{ChildNumber, ExtendedPrivKey}; +use key_wallet::gap_limit::DEFAULT_EXTERNAL_GAP_LIMIT; +use key_wallet::mnemonic::{Language, Mnemonic}; use key_wallet::wallet::managed_wallet_info::transaction_builder::{ BuilderError, TransactionBuilder, }; @@ -247,6 +253,79 @@ async fn test_multiple_transactions_across_blocks() { const MEMPOOL_TIMEOUT: Duration = Duration::from_secs(30); +/// Derive the first `count` BIP44 external addresses of `mnemonic` directly, +/// independently of any wallet state, so a test can pay addresses far beyond +/// the pre-generated pool window. +fn derive_external_addresses(mnemonic: &str, count: u32) -> Vec
{ + let mnemonic = Mnemonic::from_phrase(mnemonic, Language::English).expect("mnemonic"); + let seed = mnemonic.to_seed(""); + let secp = Secp256k1::new(); + let master = ExtendedPrivKey::new_master(Network::Regtest, &seed).expect("master key"); + let chain = [ + ChildNumber::from_hardened_idx(44).expect("purpose"), + ChildNumber::from_hardened_idx(1).expect("coin type"), + ChildNumber::from_hardened_idx(0).expect("account"), + ChildNumber::from_normal_idx(0).expect("external branch"), + ]; + (0..count) + .map(|index| { + let mut path = chain.to_vec(); + path.push(ChildNumber::from_normal_idx(index).expect("index")); + let xprv = master.derive_priv(&secp, &path).expect("derive"); + let pk = PublicKey::new(xprv.private_key.public_key(&secp)); + Address::p2pkh(&pk, Network::Regtest) + }) + .collect() +} + +/// A single transaction paying a run of consecutive fresh addresses reaching +/// far beyond the gap window (the shape of a CreateDenominations burst), +/// mined before the client ever starts. A sync from scratch must recognize +/// every output: the block is scanned to fixpoint against the extending +/// pool, so outputs past the initial look-ahead are still credited. +#[tokio::test] +async fn test_burst_payment_beyond_gap_window_synced_from_scratch() { + let Some(ctx) = TestContext::new(TestChain::Minimal).await else { + return; + }; + if !ctx.dashd.supports_mining { + eprintln!("Skipping test (dashd RPC miner not available)"); + return; + } + + let burst = DEFAULT_EXTERNAL_GAP_LIMIT + 21; + let addresses = derive_external_addresses(EMPTY_MNEMONIC, burst); + let per_output = Amount::from_sat(100_000); + let payments: Vec<(Address, Amount)> = + addresses.into_iter().map(|address| (address, per_output)).collect(); + let burst_txid = ctx.dashd.node.send_many(&payments); + + let miner_address = ctx.dashd.node.get_new_address_from_wallet("default"); + ctx.dashd.node.generate_blocks(1, &miner_address); + let funded_height = ctx.dashd.initial_height + 1; + + // Only now create the wallet and start the client, so discovery has to + // climb the whole burst during the historical scan. + let (wallet, wallet_id) = create_test_wallet(EMPTY_MNEMONIC, Network::Regtest); + let mut client_handle = create_and_start_client(&ctx.client_config, Arc::clone(&wallet)).await; + wait_for_sync(&mut client_handle.progress_receiver, funded_height).await; + wait_for_wallet_synced(&wallet, &wallet_id, funded_height).await; + + assert_eq!( + count_wallet_transactions(&wallet, &wallet_id).await, + 1, + "burst tx {} must be discovered", + burst_txid + ); + assert_eq!( + get_spendable_balance(&wallet, &wallet_id).await, + burst as u64 * per_output.to_sat(), + "every burst output must be credited, not only those inside the initial gap window" + ); + + client_handle.stop().await; +} + async fn reserve_first_address(mnemonic: &str) -> Address { let (temp_mgr, temp_id) = create_test_wallet(mnemonic, Network::Regtest);