From b7b44b361fc30c20cb481984e3b7704a55bb2bed Mon Sep 17 00:00:00 2001 From: xdustinface Date: Sun, 14 Dec 2025 10:39:02 +1100 Subject: [PATCH] feat: implement parallel filter matching --- dash-spv/src/sync/filters/matching.rs | 18 +- key-wallet-manager/Cargo.toml | 1 + key-wallet-manager/src/test_utils/wallet.rs | 9 + key-wallet-manager/src/wallet_interface.rs | 5 + .../src/wallet_manager/matching.rs | 169 ++++++++++++++++++ key-wallet-manager/src/wallet_manager/mod.rs | 2 + .../src/wallet_manager/process_block.rs | 7 + .../tests/check_compact_filters_tests.rs | 149 +++++++++++++++ 8 files changed, 356 insertions(+), 4 deletions(-) create mode 100644 key-wallet-manager/src/wallet_manager/matching.rs create mode 100644 key-wallet-manager/tests/check_compact_filters_tests.rs diff --git a/dash-spv/src/sync/filters/matching.rs b/dash-spv/src/sync/filters/matching.rs index 56161a148..21e8b0d34 100644 --- a/dash-spv/src/sync/filters/matching.rs +++ b/dash-spv/src/sync/filters/matching.rs @@ -8,16 +8,16 @@ //! - Efficient filter matching using BIP158 algorithms //! - Block download coordination for matches +use crate::error::{SyncError, SyncResult}; +use crate::network::NetworkManager; +use crate::storage::StorageManager; use dashcore::{ bip158::{BlockFilterReader, Error as Bip158Error}, network::message::NetworkMessage, network::message_blockdata::Inventory, BlockHash, ScriptBuf, }; - -use crate::error::{SyncError, SyncResult}; -use crate::network::NetworkManager; -use crate::storage::StorageManager; +use key_wallet_manager::wallet_manager::{FilterMatchInput, FilterMatchOutput}; impl super::manager::FilterSyncManager { pub async fn check_filter_for_matches< @@ -41,6 +41,16 @@ impl super::manager::FilterSyncManager( + &self, + input_map: FilterMatchInput, + wallet: &W, + ) -> FilterMatchOutput { + wallet.check_compact_filters(input_map).await + } + /// Check if filter matches any of the provided scripts using BIP158 GCS filter. #[allow(dead_code)] fn filter_matches_scripts( diff --git a/key-wallet-manager/Cargo.toml b/key-wallet-manager/Cargo.toml index 255280229..9e7d7d432 100644 --- a/key-wallet-manager/Cargo.toml +++ b/key-wallet-manager/Cargo.toml @@ -25,6 +25,7 @@ serde = { version = "1.0", default-features = false, features = ["derive"], opti async-trait = "0.1" bincode = { version = "=2.0.0-rc.3", optional = true } zeroize = { version = "1.8", features = ["derive"] } +rayon = "1.11" tokio = { version = "1.32", features = ["full"] } [dev-dependencies] diff --git a/key-wallet-manager/src/test_utils/wallet.rs b/key-wallet-manager/src/test_utils/wallet.rs index d15a44074..3e2e120f9 100644 --- a/key-wallet-manager/src/test_utils/wallet.rs +++ b/key-wallet-manager/src/test_utils/wallet.rs @@ -3,6 +3,7 @@ use std::{collections::BTreeMap, sync::Arc}; use dashcore::{Block, Transaction, Txid}; use tokio::sync::Mutex; +use crate::wallet_manager::{FilterMatchInput, FilterMatchOutput}; use crate::{wallet_interface::WalletInterface, BlockProcessingResult}; // Type alias for transaction effects map @@ -74,6 +75,10 @@ impl WalletInterface for MockWallet { let map = self.effects.lock().await; map.get(&tx.txid()).cloned() } + + async fn check_compact_filters(&self, _input: FilterMatchInput) -> FilterMatchOutput { + FilterMatchOutput::default() + } } /// Mock wallet that returns false for filter checks @@ -103,6 +108,10 @@ impl WalletInterface for NonMatchingMockWallet { false } + async fn check_compact_filters(&self, _input: FilterMatchInput) -> FilterMatchOutput { + FilterMatchOutput::default() + } + async fn describe(&self) -> String { "NonMatchingWallet (test implementation)".to_string() } diff --git a/key-wallet-manager/src/wallet_interface.rs b/key-wallet-manager/src/wallet_interface.rs index 445a9cc28..f812c4a4b 100644 --- a/key-wallet-manager/src/wallet_interface.rs +++ b/key-wallet-manager/src/wallet_interface.rs @@ -2,6 +2,7 @@ //! //! This module defines the trait that SPV clients use to interact with wallets. +use crate::wallet_manager::{FilterMatchInput, FilterMatchOutput}; use alloc::string::String; use alloc::vec::Vec; use async_trait::async_trait; @@ -41,6 +42,10 @@ pub trait WalletInterface: Send + Sync + 'static { block_hash: &dashcore::BlockHash, ) -> bool; + /// Check compact filters against watched addresses in batch + /// Returns map of filter keys to match results + async fn check_compact_filters(&self, input: FilterMatchInput) -> FilterMatchOutput; + /// Return the wallet's per-transaction net change and involved addresses if known. /// Returns (net_amount, addresses) where net_amount is received - sent in satoshis. /// If the wallet has no record for the transaction, returns None. diff --git a/key-wallet-manager/src/wallet_manager/matching.rs b/key-wallet-manager/src/wallet_manager/matching.rs new file mode 100644 index 000000000..dfebd165c --- /dev/null +++ b/key-wallet-manager/src/wallet_manager/matching.rs @@ -0,0 +1,169 @@ +use alloc::vec::Vec; +use dashcore::bip158::BlockFilter; +use dashcore::prelude::CoreBlockHeight; +use dashcore::{Address, BlockHash}; +use rayon::prelude::{IntoParallelIterator, ParallelIterator}; +use std::collections::{BTreeSet, HashMap}; + +pub type FilterMatchInput = HashMap; +pub type FilterMatchOutput = BTreeSet; + +#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] +pub struct FilterMatchKey { + height: CoreBlockHeight, + hash: BlockHash, +} + +impl FilterMatchKey { + pub fn new(height: CoreBlockHeight, hash: BlockHash) -> Self { + Self { + height, + hash, + } + } + pub fn height(&self) -> CoreBlockHeight { + self.height + } + pub fn hash(&self) -> &BlockHash { + &self.hash + } +} + +/// Check compact filters for addresses and return the keys that matched. +pub fn check_compact_filters_for_addresses( + input: FilterMatchInput, + addresses: Vec
, +) -> FilterMatchOutput { + let script_pubkey_bytes: Vec> = + addresses.iter().map(|address| address.script_pubkey().to_bytes()).collect(); + + input + .into_par_iter() + .filter_map(|(key, filter)| { + filter + .match_any(key.hash(), script_pubkey_bytes.iter().map(|v| v.as_slice())) + .unwrap_or(false) + .then_some(key) + }) + .collect() +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::Network; + use dashcore::{Block, Transaction}; + + #[test] + fn test_empty_input_returns_empty() { + let result = check_compact_filters_for_addresses(FilterMatchInput::new(), vec![]); + assert!(result.is_empty()); + } + + #[test] + fn test_empty_addresses_returns_empty() { + let address = Address::dummy(Network::Regtest, 1); + let tx = Transaction::dummy(&address, 0..0, &[1]); + let block = Block::dummy(100, vec![tx]); + let filter = BlockFilter::dummy(&block); + let key = FilterMatchKey::new(100, block.block_hash()); + + let mut input = FilterMatchInput::new(); + input.insert(key.clone(), filter); + + let output = check_compact_filters_for_addresses(input, vec![]); + assert!(!output.contains(&key)); + } + + #[test] + fn test_matching_filter() { + let address = Address::dummy(Network::Regtest, 1); + let tx = Transaction::dummy(&address, 0..0, &[1]); + let block = Block::dummy(100, vec![tx]); + let filter = BlockFilter::dummy(&block); + let key = FilterMatchKey::new(100, block.block_hash()); + + let mut input = FilterMatchInput::new(); + input.insert(key.clone(), filter); + + let output = check_compact_filters_for_addresses(input, vec![address]); + assert!(output.contains(&key)); + } + + #[test] + fn test_non_matching_filter() { + let address = Address::dummy(Network::Regtest, 1); + let address_other = Address::dummy(Network::Regtest, 2); + + let tx = Transaction::dummy(&address_other, 0..0, &[1]); + let block = Block::dummy(100, vec![tx]); + let filter = BlockFilter::dummy(&block); + let key = FilterMatchKey::new(100, block.block_hash()); + + let mut input = FilterMatchInput::new(); + input.insert(key.clone(), filter); + + let output = check_compact_filters_for_addresses(input, vec![address]); + assert!(!output.contains(&key)); + } + + #[test] + fn test_batch_mixed_results() { + let unrelated_address = Address::dummy(Network::Regtest, 0); + let address_1 = Address::dummy(Network::Regtest, 1); + let address_2 = Address::dummy(Network::Regtest, 2); + + let tx_1 = Transaction::dummy(&address_1, 0..0, &[1]); + let block_1 = Block::dummy(100, vec![tx_1]); + let filter_1 = BlockFilter::dummy(&block_1); + let key_1 = FilterMatchKey::new(100, block_1.block_hash()); + + let tx_2 = Transaction::dummy(&address_2, 0..0, &[2]); + let block_2 = Block::dummy(100, vec![tx_2]); + let filter_2 = BlockFilter::dummy(&block_2); + let key_2 = FilterMatchKey::new(200, block_2.block_hash()); + + let tx_3 = Transaction::dummy(&unrelated_address, 0..0, &[10]); + let block_3 = Block::dummy(100, vec![tx_3]); + let filter_3 = BlockFilter::dummy(&block_3); + let key_3 = FilterMatchKey::new(300, block_3.block_hash()); + + let mut input = FilterMatchInput::new(); + input.insert(key_1.clone(), filter_1); + input.insert(key_2.clone(), filter_2); + input.insert(key_3.clone(), filter_3); + + let output = check_compact_filters_for_addresses(input, vec![address_1, address_2]); + assert_eq!(output.len(), 2); + assert!(output.contains(&key_1)); + assert!(output.contains(&key_2)); + assert!(!output.contains(&key_3)); + } + + #[test] + fn test_output_sorted_by_height() { + let address = Address::dummy(Network::Regtest, 1); + + // Create blocks at different heights (inserted in non-sorted order) + let heights = [500, 100, 300, 200, 400]; + let mut input = FilterMatchInput::new(); + + for (i, &height) in heights.iter().enumerate() { + let tx = Transaction::dummy(&address, 0..0, &[i as u64]); + let block = Block::dummy(height, vec![tx]); + let filter = BlockFilter::dummy(&block); + let key = FilterMatchKey::new(height, block.block_hash()); + input.insert(key, filter); + } + + let output = check_compact_filters_for_addresses(input, vec![address]); + + // Verify output is sorted by height (ascending) + let heights_out: Vec = output.iter().map(|k| k.height()).collect(); + let mut sorted_heights = heights_out.clone(); + sorted_heights.sort(); + + assert_eq!(heights_out, sorted_heights); + assert_eq!(heights_out, vec![100, 200, 300, 400, 500]); + } +} diff --git a/key-wallet-manager/src/wallet_manager/mod.rs b/key-wallet-manager/src/wallet_manager/mod.rs index f40764d19..0c54ee8d7 100644 --- a/key-wallet-manager/src/wallet_manager/mod.rs +++ b/key-wallet-manager/src/wallet_manager/mod.rs @@ -4,9 +4,11 @@ //! each of which can have multiple accounts. This follows the architecture //! pattern where a manager oversees multiple distinct wallets. +mod matching; mod process_block; mod transaction_building; +pub use crate::wallet_manager::matching::{FilterMatchInput, FilterMatchKey, FilterMatchOutput}; use alloc::collections::BTreeMap; use alloc::string::String; use alloc::vec::Vec; diff --git a/key-wallet-manager/src/wallet_manager/process_block.rs b/key-wallet-manager/src/wallet_manager/process_block.rs index 93690bfa7..57d312727 100644 --- a/key-wallet-manager/src/wallet_manager/process_block.rs +++ b/key-wallet-manager/src/wallet_manager/process_block.rs @@ -1,4 +1,7 @@ use crate::wallet_interface::{BlockProcessingResult, WalletInterface}; +use crate::wallet_manager::matching::{ + check_compact_filters_for_addresses, FilterMatchInput, FilterMatchOutput, +}; use crate::WalletManager; use alloc::string::String; use alloc::vec::Vec; @@ -80,6 +83,10 @@ impl WalletInterface for WalletM hit } + async fn check_compact_filters(&self, input: FilterMatchInput) -> FilterMatchOutput { + check_compact_filters_for_addresses(input, self.monitored_addresses()) + } + async fn transaction_effect(&self, tx: &Transaction) -> Option<(i64, Vec)> { // Aggregate across all managed wallets. If any wallet considers it relevant, // compute net = total_received - total_sent and collect involved addresses. diff --git a/key-wallet-manager/tests/check_compact_filters_tests.rs b/key-wallet-manager/tests/check_compact_filters_tests.rs new file mode 100644 index 000000000..5280f4528 --- /dev/null +++ b/key-wallet-manager/tests/check_compact_filters_tests.rs @@ -0,0 +1,149 @@ +//! Integration tests for WalletInterface::check_compact_filters + +use dashcore::bip158::BlockFilter; +use dashcore::{Address, Block, Network, Transaction}; +use key_wallet::wallet::initialization::WalletAccountCreationOptions; +use key_wallet::wallet::managed_wallet_info::ManagedWalletInfo; +use key_wallet_manager::wallet_interface::WalletInterface; +use key_wallet_manager::wallet_manager::WalletManager; +use key_wallet_manager::wallet_manager::{FilterMatchInput, FilterMatchKey}; + +#[tokio::test] +async fn test_check_compact_filters_empty_input() { + let mut manager = WalletManager::::new(Network::Testnet); + + let _wallet_id = manager + .create_wallet_with_random_mnemonic(WalletAccountCreationOptions::Default) + .expect("Failed to create wallet"); + + let input = FilterMatchInput::new(); + let output = manager.check_compact_filters(input).await; + + assert!(output.is_empty()); +} + +#[tokio::test] +async fn test_check_compact_filters_no_matches() { + let mut manager = WalletManager::::new(Network::Testnet); + + let _wallet_id = manager + .create_wallet_with_random_mnemonic(WalletAccountCreationOptions::Default) + .expect("Failed to create wallet"); + + // Create blocks with transactions to unrelated addresses + let unrelated_1 = Address::dummy(Network::Regtest, 100); + let unrelated_2 = Address::dummy(Network::Regtest, 101); + + let tx1 = Transaction::dummy(&unrelated_1, 0..0, &[1000]); + let block1 = Block::dummy(100, vec![tx1]); + let filter1 = BlockFilter::dummy(&block1); + let key1 = FilterMatchKey::new(100, block1.block_hash()); + + let tx2 = Transaction::dummy(&unrelated_2, 0..0, &[2000]); + let block2 = Block::dummy(200, vec![tx2]); + let filter2 = BlockFilter::dummy(&block2); + let key2 = FilterMatchKey::new(200, block2.block_hash()); + + let mut input = FilterMatchInput::new(); + input.insert(key1, filter1); + input.insert(key2, filter2); + + let output = manager.check_compact_filters(input).await; + + assert!(output.is_empty()); +} + +#[tokio::test] +async fn test_check_compact_filters_batch_mixed_results() { + let mut manager = WalletManager::::new(Network::Testnet); + + let _wallet_id = manager + .create_wallet_with_random_mnemonic(WalletAccountCreationOptions::Default) + .expect("Failed to create wallet"); + + let addresses = manager.monitored_addresses(); + assert!(!addresses.is_empty()); + let wallet_address = &addresses[0]; + + // Block with transaction to wallet address (should match) + let tx_match = Transaction::dummy(wallet_address, 0..0, &[1000]); + let block_match = Block::dummy(100, vec![tx_match]); + let filter_match = BlockFilter::dummy(&block_match); + let key_match = FilterMatchKey::new(100, block_match.block_hash()); + + // Block with transaction to unrelated address (should not match) + let unrelated = Address::dummy(Network::Regtest, 999); + let tx_no_match = Transaction::dummy(&unrelated, 0..0, &[2000]); + let block_no_match = Block::dummy(200, vec![tx_no_match]); + let filter_no_match = BlockFilter::dummy(&block_no_match); + let key_no_match = FilterMatchKey::new(200, block_no_match.block_hash()); + + let mut input = FilterMatchInput::new(); + input.insert(key_match.clone(), filter_match); + input.insert(key_no_match.clone(), filter_no_match); + + let output = manager.check_compact_filters(input).await; + + assert_eq!(output.len(), 1); + assert!(output.contains(&key_match)); + assert!(!output.contains(&key_no_match)); +} + +#[tokio::test] +async fn test_check_compact_filters_multiple_matching_addresses() { + let mut manager = WalletManager::::new(Network::Testnet); + + let _wallet_id = manager + .create_wallet_with_random_mnemonic(WalletAccountCreationOptions::Default) + .expect("Failed to create wallet"); + + let addresses = manager.monitored_addresses(); + assert!(addresses.len() >= 2, "Need at least 2 addresses for this test"); + + // Block with transactions to multiple wallet addresses + let tx1 = Transaction::dummy(&addresses[0], 0..0, &[1000]); + let tx2 = Transaction::dummy(&addresses[1], 0..0, &[2000]); + let block = Block::dummy(100, vec![tx1, tx2]); + let filter = BlockFilter::dummy(&block); + let key = FilterMatchKey::new(100, block.block_hash()); + + let mut input = FilterMatchInput::new(); + input.insert(key.clone(), filter); + + let output = manager.check_compact_filters(input).await; + + assert!(output.contains(&key)); +} + +#[tokio::test] +async fn test_check_compact_filters_all_match() { + let mut manager = WalletManager::::new(Network::Testnet); + + let _wallet_id = manager + .create_wallet_with_random_mnemonic(WalletAccountCreationOptions::Default) + .expect("Failed to create wallet"); + + let addresses = manager.monitored_addresses(); + assert!(addresses.len() >= 2, "Need at least 2 addresses"); + + // Two separate blocks, each with a transaction to a wallet address + let tx1 = Transaction::dummy(&addresses[0], 0..0, &[1000]); + let block1 = Block::dummy(100, vec![tx1]); + let filter1 = BlockFilter::dummy(&block1); + let key1 = FilterMatchKey::new(100, block1.block_hash()); + + let tx2 = Transaction::dummy(&addresses[1], 0..0, &[2000]); + let block2 = Block::dummy(200, vec![tx2]); + let filter2 = BlockFilter::dummy(&block2); + let key2 = FilterMatchKey::new(200, block2.block_hash()); + + let mut input = FilterMatchInput::new(); + input.insert(key1.clone(), filter1); + input.insert(key2.clone(), filter2); + + let output = manager.check_compact_filters(input).await; + + assert_eq!(output.len(), 2); + assert!(output.contains(&key1)); + assert!(output.contains(&key2)); +}