Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions dash-spv/src/sync/filters/block_match_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,28 @@ impl BlockMatchTracker {
}
}

/// Like [`Self::track`], but for matches driven by newly derived
/// scripts: the prior processing of this block predates those scripts,
/// so processed records do not cover them and the block must be
/// re-applied to the full candidate set. Never returns
/// `AlreadyProcessed`.
pub(super) fn track_for_new_scripts(
&mut self,
key: &FilterMatchKey,
batch_start: u32,
candidate_wallets: BTreeSet<WalletId>,
) -> BlockTrackResult {
if self.blocks_remaining.contains_key(key.hash()) {
return BlockTrackResult::InFlight {
wallets: candidate_wallets,
};
}
self.blocks_remaining.insert(*key.hash(), (key.height(), batch_start));
BlockTrackResult::NewlyTracked {
wallets: candidate_wallets,
}
}

/// Record that `wallets` have had the block at `(height, hash)` applied
/// to their state. Idempotent: existing entries merge, never shrink.
pub(super) fn record_processed(
Expand Down Expand Up @@ -210,6 +232,23 @@ mod tests {
tracker.track(&key, 5000, BTreeSet::from([wallet_a, wallet_b])),
BlockTrackResult::AlreadyProcessed
);

// A match on newly derived scripts re-queues the fully-processed
// block anyway: the prior processing predates those scripts.
assert_eq!(
tracker.track_for_new_scripts(&key, 5000, BTreeSet::from([wallet_a])),
BlockTrackResult::NewlyTracked {
wallets: BTreeSet::from([wallet_a])
}
);
// While re-queued the block reports InFlight, preserving the
// one-increment-one-decrement pending accounting.
assert_eq!(
tracker.track_for_new_scripts(&key, 5000, BTreeSet::from([wallet_a])),
BlockTrackResult::InFlight {
wallets: BTreeSet::from([wallet_a])
}
);
}

/// `prune_at_or_below` drops every entry at or below the given height
Expand Down
52 changes: 50 additions & 2 deletions dash-spv/src/sync/filters/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,11 @@ impl<H: BlockHeaderStorage, FH: FilterHeaderStorage, F: FilterStorage, W: Wallet
self.progress.add_matched(block_to_wallets.len() as u32);
}
for (key, wallets) in block_to_wallets {
match self.tracker.track(&key, batch_start, wallets) {
// Matches here are driven by scripts that did not exist when the
// block was first processed, so a processed record must not
// suppress the re-download: the block has to be re-applied
// against the extended pools.
match self.tracker.track_for_new_scripts(&key, batch_start, wallets) {
BlockTrackResult::NewlyTracked {
wallets,
} => {
Expand All @@ -679,6 +683,7 @@ impl<H: BlockHeaderStorage, FH: FilterHeaderStorage, F: FilterStorage, W: Wallet
// pipeline's pending wallet set via a fresh BlocksNeeded.
blocks_needed.insert(key, wallets);
}
// Never returned by track_for_new_scripts.
BlockTrackResult::AlreadyProcessed => {}
}
}
Expand Down Expand Up @@ -930,7 +935,7 @@ mod tests {
};
use crate::sync::{ManagerIdentifier, SyncManagerProgress};
use dashcore::bip158::BlockFilter;
use dashcore::Header;
use dashcore::{Address, Header};
use dashcore::{Block, Network, Transaction};
use dashcore_hashes::Hash;
use key_wallet_manager::test_utils::{
Expand Down Expand Up @@ -1341,6 +1346,49 @@ mod tests {
assert!(!attr_b.contains(&wallet_a));
}

/// A block that was already processed for a wallet is re-queued by
/// `rescan_batch` when newly derived scripts match it: the prior
/// processing predates those scripts, so the block must be re-applied
/// against the extended pools. Plain scans keep skipping processed
/// blocks.
#[tokio::test]
async fn test_rescan_batch_reprocesses_already_processed_block() {
let address = Address::dummy(Network::Regtest, 33);
let mut manager = create_test_manager().await;
manager.set_state(SyncState::Syncing);

let (key, filter) = filter_for_address(20, &address);
let mut filters: HashMap<FilterMatchKey, BlockFilter> = HashMap::new();
filters.insert(key.clone(), filter);
let mut batch = FiltersBatch::new(0, 99, filters);
batch.mark_verified();
manager.active_batches.insert(0, batch);

// The block was downloaded and processed for the wallet before the
// rescan script existed.
manager.tracker.record_processed(20, *key.hash(), &BTreeSet::from([MOCK_WALLET_ID]));

let mut new_scripts: HashMap<WalletId, HashSet<ScriptBuf>> = HashMap::new();
new_scripts.insert(MOCK_WALLET_ID, HashSet::from([address.script_pubkey()]));

let events = manager.rescan_batch(0, &new_scripts).await.unwrap();

let blocks = events
.iter()
.find_map(|e| match e {
SyncEvent::BlocksNeeded {
blocks,
} => Some(blocks),
_ => None,
})
.expect("BlocksNeeded event for the re-queued block");
assert!(blocks.get(&key).expect("re-queued block entry").contains(&MOCK_WALLET_ID));

// The re-queued block is accounted exactly once so the batch cannot
// commit before its re-processing completes.
assert_eq!(manager.active_batches.get(&0).unwrap().pending_blocks(), 1);
}

/// `rescan_batch` honours each wallet's own `synced_height`: a new
/// address belonging to a wallet that has already advanced past a height
/// must not produce a `BlocksNeeded` for that height, even when the
Expand Down
18 changes: 17 additions & 1 deletion dash-spv/src/test_utils/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use dashcore::{Address, Amount, BlockHash, Transaction, Txid};
use dashcore_rpc::json as rpc_json;
use dashcore_rpc::{Auth, Client, RpcApi};
use serde::Deserialize;
use serde_json::Value;
use serde_json::{Map, Value};
use std::collections::HashMap;
use std::fs;
use std::net::SocketAddr;
Expand Down Expand Up @@ -316,6 +316,22 @@ impl DashCoreNode {
txid
}

/// Send DASH to many addresses in a single transaction from the primary
/// wallet, so one transaction carries one output per `(address, amount)`
/// pair.
pub fn send_many(&self, payments: &[(Address, Amount)]) -> Txid {
let client = self.rpc_client();
let amounts: Map<String, Value> = payments
.iter()
.map(|(address, amount)| (address.to_string(), serde_json::json!(amount.to_dash())))
.collect();
Comment thread
xdustinface marked this conversation as resolved.
let txid: Txid = client
.call("sendmany", &[serde_json::json!(""), Value::Object(amounts)])
.expect("failed to sendmany");
tracing::info!("Sent {} outputs in one transaction, txid: {}", payments.len(), txid);
txid
}

/// Send DASH to an address from a specific wallet.
pub fn send_to_address_from_wallet(
&self,
Expand Down
81 changes: 80 additions & 1 deletion dash-spv/tests/dashd_sync/tests_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,18 @@ use std::time::Duration;
use tokio::sync::RwLock;

use super::helpers::{
wait_for_mempool_tx, wait_for_sync, wait_for_wallet_synced, EMPTY_MNEMONIC, SECONDARY_MNEMONIC,
count_wallet_transactions, get_spendable_balance, wait_for_mempool_tx, wait_for_sync,
wait_for_wallet_synced, EMPTY_MNEMONIC, SECONDARY_MNEMONIC,
};
use super::setup::{create_and_start_client, TestContext};
use dash_spv::test_utils::{create_test_wallet, TestChain};
use dashcore::address::NetworkUnchecked;
use dashcore::secp256k1::Secp256k1;
use dashcore::PublicKey;
use key_wallet::account::ManagedAccountTrait;
use key_wallet::bip32::{ChildNumber, ExtendedPrivKey};
use key_wallet::gap_limit::DEFAULT_EXTERNAL_GAP_LIMIT;
use key_wallet::mnemonic::{Language, Mnemonic};
use key_wallet::wallet::managed_wallet_info::transaction_builder::{
BuilderError, TransactionBuilder,
};
Expand Down Expand Up @@ -247,6 +253,79 @@ async fn test_multiple_transactions_across_blocks() {

const MEMPOOL_TIMEOUT: Duration = Duration::from_secs(30);

/// Derive the first `count` BIP44 external addresses of `mnemonic` directly,
/// independently of any wallet state, so a test can pay addresses far beyond
/// the pre-generated pool window.
fn derive_external_addresses(mnemonic: &str, count: u32) -> Vec<Address> {
let mnemonic = Mnemonic::from_phrase(mnemonic, Language::English).expect("mnemonic");
let seed = mnemonic.to_seed("");
let secp = Secp256k1::new();
let master = ExtendedPrivKey::new_master(Network::Regtest, &seed).expect("master key");
let chain = [
ChildNumber::from_hardened_idx(44).expect("purpose"),
ChildNumber::from_hardened_idx(1).expect("coin type"),
ChildNumber::from_hardened_idx(0).expect("account"),
ChildNumber::from_normal_idx(0).expect("external branch"),
];
(0..count)
.map(|index| {
let mut path = chain.to_vec();
path.push(ChildNumber::from_normal_idx(index).expect("index"));
let xprv = master.derive_priv(&secp, &path).expect("derive");
let pk = PublicKey::new(xprv.private_key.public_key(&secp));
Address::p2pkh(&pk, Network::Regtest)
})
.collect()
}

/// A single transaction paying a run of consecutive fresh addresses reaching
/// far beyond the gap window (the shape of a CreateDenominations burst),
/// mined before the client ever starts. A sync from scratch must recognize
/// every output: the block is scanned to fixpoint against the extending
/// pool, so outputs past the initial look-ahead are still credited.
#[tokio::test]
async fn test_burst_payment_beyond_gap_window_synced_from_scratch() {
let Some(ctx) = TestContext::new(TestChain::Minimal).await else {
return;
};
if !ctx.dashd.supports_mining {
eprintln!("Skipping test (dashd RPC miner not available)");
return;
}

let burst = DEFAULT_EXTERNAL_GAP_LIMIT + 21;
let addresses = derive_external_addresses(EMPTY_MNEMONIC, burst);
let per_output = Amount::from_sat(100_000);
let payments: Vec<(Address, Amount)> =
addresses.into_iter().map(|address| (address, per_output)).collect();
let burst_txid = ctx.dashd.node.send_many(&payments);

let miner_address = ctx.dashd.node.get_new_address_from_wallet("default");
ctx.dashd.node.generate_blocks(1, &miner_address);
let funded_height = ctx.dashd.initial_height + 1;

// Only now create the wallet and start the client, so discovery has to
// climb the whole burst during the historical scan.
let (wallet, wallet_id) = create_test_wallet(EMPTY_MNEMONIC, Network::Regtest);
let mut client_handle = create_and_start_client(&ctx.client_config, Arc::clone(&wallet)).await;
wait_for_sync(&mut client_handle.progress_receiver, funded_height).await;
wait_for_wallet_synced(&wallet, &wallet_id, funded_height).await;

assert_eq!(
count_wallet_transactions(&wallet, &wallet_id).await,
1,
"burst tx {} must be discovered",
burst_txid
);
assert_eq!(
get_spendable_balance(&wallet, &wallet_id).await,
burst as u64 * per_output.to_sat(),
"every burst output must be credited, not only those inside the initial gap window"
);

client_handle.stop().await;
}

async fn reserve_first_address(mnemonic: &str) -> Address {
let (temp_mgr, temp_id) = create_test_wallet(mnemonic, Network::Regtest);

Expand Down
Loading