From e9d5aa3f4bb72afd7b6f12e5c4ad1efedbe23ebc Mon Sep 17 00:00:00 2001 From: Boris Oncev Date: Wed, 18 Feb 2026 16:54:35 +0700 Subject: [PATCH 1/2] Wallet sync with the mempool after initial sync --- test/functional/test_runner.py | 1 + test/functional/wallet_scan_mempool.py | 238 ++++++++++++++++++ test/functional/wallet_watch_address.py | 16 +- wallet/wallet-controller/src/lib.rs | 25 ++ .../wallet-controller/src/sync/tests/mod.rs | 4 + .../src/handles_client/mod.rs | 5 + wallet/wallet-node-client/src/mock.rs | 4 + wallet/wallet-node-client/src/node_traits.rs | 1 + .../src/rpc_client/client_impl.rs | 7 + .../src/rpc_client/cold_wallet_client.rs | 4 + 10 files changed, 296 insertions(+), 9 deletions(-) create mode 100644 test/functional/wallet_scan_mempool.py diff --git a/test/functional/test_runner.py b/test/functional/test_runner.py index 6cccb1cd4..b0a1065c3 100755 --- a/test/functional/test_runner.py +++ b/test/functional/test_runner.py @@ -155,6 +155,7 @@ class UnicodeOnWindowsError(ValueError): 'wallet_sweep_delegation.py', 'wallet_recover_accounts.py', 'wallet_mempool_events.py', + 'wallet_scan_mempool.py', 'wallet_tokens.py', 'wallet_tokens_freeze.py', 'wallet_tokens_transfer_from_multisig_addr.py', diff --git a/test/functional/wallet_scan_mempool.py b/test/functional/wallet_scan_mempool.py new file mode 100644 index 000000000..10dcdbbf3 --- /dev/null +++ b/test/functional/wallet_scan_mempool.py @@ -0,0 +1,238 @@ +#!/usr/bin/env python3 +# Copyright (c) 2026 RBB S.r.l +# Copyright (c) 2017-2021 The Bitcoin Core developers +# opensource@mintlayer.org +# SPDX-License-Identifier: MIT +# Licensed under the MIT License; +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://github.com/mintlayer/mintlayer-core/blob/master/LICENSE +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Wallet scan mempool on startup test + +Check that: +* We create 2 wallets with same mnemonic, +* get an address from the first wallet +* send coins to the wallet's address +* sync both wallets with the node +* check balance in both wallets +* close the second wallet +* from the first wallet send coins from Acc 0 to Acc 1 without creating a block +* reopen the second wallet and create a third wallet with the same mnemonic +* they both should get the new Tx from the mempool upon creation/opening +* second wallet can create a new unconfirmed Tx on top of the Tx in mempool +""" + +import asyncio + +from test_framework.mintlayer import (ATOMS_PER_COIN, block_input_data_obj, + make_tx, reward_input) +from test_framework.test_framework import BitcoinTestFramework +from test_framework.util import assert_equal, assert_in +from test_framework.wallet_cli_controller import (DEFAULT_ACCOUNT_INDEX, + WalletCliController) + + +class WalletMempoolScanning(BitcoinTestFramework): + + def set_test_params(self): + self.setup_clean_chain = True + self.num_nodes = 1 + self.extra_args = [ + [ + "--blockprod-min-peers-to-produce-blocks=0", + ] + ] + + def setup_network(self): + self.setup_nodes() + self.sync_all(self.nodes[0:1]) + + def generate_block(self, transactions=[]): + node = self.nodes[0] + + block_input_data = {"PoW": {"reward_destination": "AnyoneCanSpend"}} + block_input_data = block_input_data_obj.encode(block_input_data).to_hex()[2:] + + # create a new block, taking transactions from mempool + block = node.blockprod_generate_block( + block_input_data, transactions, [], "FillSpaceFromMempool" + ) + node.chainstate_submit_block(block) + block_id = node.chainstate_best_block_id() + + # Wait for mempool to sync + self.wait_until( + lambda: node.mempool_local_best_block_id() == block_id, timeout=5 + ) + + return block_id + + def run_test(self): + asyncio.run(self.async_test()) + + async def async_test(self): + node = self.nodes[0] + async with WalletCliController( + node, self.config, self.log + ) as wallet, WalletCliController(node, self.config, self.log) as wallet2: + # new wallet + await wallet.create_wallet() + # create wallet2 with the same mnemonic + mnemonic = await wallet.show_seed_phrase() + assert mnemonic is not None + wallet2_name = "wallet2" + assert_in( + "Wallet recovered successfully", + await wallet2.recover_wallet(mnemonic, name=wallet2_name), + ) + + # check it is on genesis + best_block_height = await wallet.get_best_block_height() + self.log.info(f"best block height = {best_block_height}") + assert_equal(best_block_height, "0") + best_block_height = await wallet2.get_best_block_height() + assert_equal(best_block_height, "0") + + # new address + pub_key_bytes = await wallet.new_public_key() + assert_equal(len(pub_key_bytes), 33) + + # Get chain tip + tip_id = node.chainstate_best_block_id() + + # Submit a valid transaction + token_fee = 1000 + coins_to_send = 1 + token_fee_output = { + "Transfer": [ + {"Coin": token_fee * ATOMS_PER_COIN}, + { + "PublicKey": { + "key": {"Secp256k1Schnorr": {"pubkey_data": pub_key_bytes}} + } + }, + ], + } + tx_fee_output = { + "Transfer": [ + {"Coin": coins_to_send * ATOMS_PER_COIN}, + { + "PublicKey": { + "key": {"Secp256k1Schnorr": {"pubkey_data": pub_key_bytes}} + } + }, + ], + } + encoded_tx, tx_id = make_tx( + [reward_input(tip_id)], [token_fee_output] + [tx_fee_output] * 2, 0 + ) + + self.log.debug(f"Encoded transaction {tx_id}: {encoded_tx}") + + assert_in("No transaction found", await wallet.get_transaction(tx_id)) + + node.mempool_submit_transaction(encoded_tx, {}) + assert node.mempool_contains_tx(tx_id) + + self.generate_block() + assert not node.mempool_contains_tx(tx_id) + + # sync the wallet + assert_in("Success", await wallet.sync()) + assert_in("Success", await wallet2.sync()) + + acc0_address = await wallet.new_address() + + # both wallets have the same balances after syncing the new block + assert_in( + f"Coins amount: {coins_to_send * 2 + token_fee}", + await wallet.get_balance(), + ) + assert_in( + f"Coins amount: {coins_to_send * 2 + token_fee}", + await wallet2.get_balance(), + ) + + # create new account and get an address + assert_in("Success", await wallet.create_new_account()) + assert_in("Success", await wallet2.create_new_account()) + assert_in("Success", await wallet.select_account(1)) + acc1_address = await wallet.new_address() + + # close wallet2 + await wallet2.close_wallet() + + # go back to Acc 0 and send 1 coin to Acc 1 + coins_to_send = 2 + assert_in("Success", await wallet.select_account(DEFAULT_ACCOUNT_INDEX)) + assert_in( + "The transaction was submitted successfully", + await wallet.send_to_address(acc1_address, coins_to_send), + ) + + # check mempool has 1 transaction now + transactions = node.mempool_transactions() + assert_equal(len(transactions), 1) + + # check wallet 1 has it as pending + pending_txs = await wallet.list_pending_transactions() + assert_equal(1, len(pending_txs)) + transfer_tx_id = pending_txs[0] + + # reopen wallet2 and sync it will scan the mempool on first sync + await wallet2.open_wallet(wallet2_name) + assert_in("Success", await wallet2.sync()) + + # check wallet 2 has the new tx from scanning the mempool + pending_txs = await wallet2.list_pending_transactions() + assert_equal(1, len(pending_txs)) + assert_equal(transfer_tx_id, pending_txs[0]) + + assert_in("Success", await wallet.select_account(1)) + # wallet 2 should automatically recover Acc 1 + assert_in("Success", await wallet2.select_account(1)) + + # check both balances have `coins_to_send` coins in-mempool state + assert_in( + f"Coins amount: {coins_to_send}", + await wallet.get_balance(utxo_states=["in-mempool"]), + ) + assert_in( + f"Coins amount: {coins_to_send}", + await wallet2.get_balance(utxo_states=["in-mempool"]), + ) + + # check wallet2 can send 1 coin back to Acc0 from the not yet confirmed tx in mempool + assert_in( + "The transaction was submitted successfully", + await wallet2.send_to_address(acc0_address, 1), + ) + + # close wallet2 and recover a new one with the same mnemonic + await wallet2.close_wallet() + assert_in( + "Wallet recovered successfully", + await wallet2.recover_wallet(mnemonic), + ) + # sync the new wallet2 + assert_in("Success", await wallet2.sync()) + # check wallet 2 has the new tx from scanning the mempool + pending_txs = await wallet2.list_pending_transactions() + assert_equal(2, len(pending_txs)) + assert_in(transfer_tx_id, pending_txs) + + self.generate_block() + + assert_in("Success", await wallet.sync()) + assert_in("Success", await wallet2.sync()) + + +if __name__ == "__main__": + WalletMempoolScanning().main() diff --git a/test/functional/wallet_watch_address.py b/test/functional/wallet_watch_address.py index 525769038..9c38a4990 100644 --- a/test/functional/wallet_watch_address.py +++ b/test/functional/wallet_watch_address.py @@ -143,24 +143,23 @@ async def async_test(self): output = await wallet.send_to_address(address_from_wallet1, 1) assert_in("The transaction was submitted successfully", output) receive_coins_tx_id = output.splitlines()[-1] + tx = await wallet.get_raw_signed_transaction(receive_coins_tx_id) # check in wallet2 await wallet.close_wallet() await wallet.open_wallet('wallet2') + assert_in("Success", await wallet.sync()) # tx is still in mempool assert node.mempool_contains_tx(receive_coins_tx_id) - assert_in("No transaction found", await wallet.get_raw_signed_transaction(receive_coins_tx_id)) + # wallet2 should also have the tx because it scanned the mempool + assert_equal(tx, await wallet.get_raw_signed_transaction(receive_coins_tx_id)) block_id = self.generate_block() assert not node.mempool_contains_tx(receive_coins_tx_id) assert_in("Success", await wallet.sync()) - # after syncing the tx should be found - assert_not_in("No transaction found", await wallet.get_raw_signed_transaction(receive_coins_tx_id)) - - # go back to wallet 1 await wallet.close_wallet() await wallet.open_wallet('wallet1') @@ -176,19 +175,18 @@ async def async_test(self): # go back to wallet 2 await wallet.close_wallet() await wallet.open_wallet('wallet2') + assert_in("Success", await wallet.sync()) # tx is still in mempool assert node.mempool_contains_tx(send_coins_tx_id) - assert_in("No transaction found", await wallet.get_raw_signed_transaction(send_coins_tx_id)) + # wallet2 should again have the tx present + assert_not_in("No transaction found", await wallet.get_raw_signed_transaction(send_coins_tx_id)) block_id = self.generate_block() assert not node.mempool_contains_tx(send_coins_tx_id) assert_in("Success", await wallet.sync()) - # after syncing the tx should be found - assert_not_in("No transaction found", await wallet.get_raw_signed_transaction(send_coins_tx_id)) - output = await wallet.get_standalone_addresses() assert_in(address_from_wallet1, output) if label: diff --git a/wallet/wallet-controller/src/lib.rs b/wallet/wallet-controller/src/lib.rs index 76b6498a1..59b13c01e 100644 --- a/wallet/wallet-controller/src/lib.rs +++ b/wallet/wallet-controller/src/lib.rs @@ -207,6 +207,7 @@ pub struct Controller { wallet_events: W, mempool_events: MempoolEvents, + finished_initial_sync: bool, } impl std::fmt::Debug for Controller { @@ -243,6 +244,7 @@ where staking_started: BTreeSet::new(), wallet_events, mempool_events, + finished_initial_sync: false, }; log::info!("Syncing the wallet..."); @@ -268,6 +270,7 @@ where staking_started: BTreeSet::new(), wallet_events, mempool_events, + finished_initial_sync: false, }) } @@ -1360,6 +1363,28 @@ where } } + // after the first successful sync to the tip fetch all mempool transactions + if !self.finished_initial_sync { + let txs = self.rpc_client.mempool_get_transactions().await; + + match txs { + Ok(txs) => { + if let Err(err) = + self.wallet.add_mempool_transactions(&txs, &self.wallet_events) + { + log::error!("Txs from mempool failed to be added in the wallet because of an error: {err}"); + } else { + self.finished_initial_sync = true; + } + } + Err(err) => { + log::error!("Failed to fetch all transactios from the mempool: {err}"); + tokio::time::sleep(ERROR_DELAY).await; + continue; + } + } + } + let mut delay = Box::pin(tokio::time::sleep(NORMAL_DELAY)); loop { diff --git a/wallet/wallet-controller/src/sync/tests/mod.rs b/wallet/wallet-controller/src/sync/tests/mod.rs index 8082f580c..c90c050f5 100644 --- a/wallet/wallet-controller/src/sync/tests/mod.rs +++ b/wallet/wallet-controller/src/sync/tests/mod.rs @@ -446,6 +446,10 @@ impl NodeInterface for MockNode { unreachable!() } + async fn mempool_get_transactions(&self) -> Result, Self::Error> { + unreachable!() + } + async fn mempool_subscribe_to_events(&self) -> Result { unreachable!() } diff --git a/wallet/wallet-node-client/src/handles_client/mod.rs b/wallet/wallet-node-client/src/handles_client/mod.rs index da5c45598..ba21ab10a 100644 --- a/wallet/wallet-node-client/src/handles_client/mod.rs +++ b/wallet/wallet-node-client/src/handles_client/mod.rs @@ -479,4 +479,9 @@ impl NodeInterface for WalletHandlesClient { let res = self.mempool.call(move |this| this.transaction(&tx_id)).await?; Ok(res) } + + async fn mempool_get_transactions(&self) -> Result, Self::Error> { + let res = self.mempool.call(move |this| this.get_all()).await?; + Ok(res) + } } diff --git a/wallet/wallet-node-client/src/mock.rs b/wallet/wallet-node-client/src/mock.rs index 6a448f408..cbe071e5c 100644 --- a/wallet/wallet-node-client/src/mock.rs +++ b/wallet/wallet-node-client/src/mock.rs @@ -322,6 +322,10 @@ impl NodeInterface for ClonableMockNodeInterface { self.lock().await.mempool_get_transaction(tx_id).await } + async fn mempool_get_transactions(&self) -> Result, Self::Error> { + self.lock().await.mempool_get_transactions().await + } + async fn mempool_subscribe_to_events(&self) -> Result { self.lock().await.mempool_subscribe_to_events().await } diff --git a/wallet/wallet-node-client/src/node_traits.rs b/wallet/wallet-node-client/src/node_traits.rs index f33403936..87c1093e1 100644 --- a/wallet/wallet-node-client/src/node_traits.rs +++ b/wallet/wallet-node-client/src/node_traits.rs @@ -152,6 +152,7 @@ pub trait NodeInterface { &self, tx_id: Id, ) -> Result, Self::Error>; + async fn mempool_get_transactions(&self) -> Result, Self::Error>; async fn mempool_subscribe_to_events(&self) -> Result; async fn get_utxo(&self, outpoint: UtxoOutPoint) -> Result, Self::Error>; diff --git a/wallet/wallet-node-client/src/rpc_client/client_impl.rs b/wallet/wallet-node-client/src/rpc_client/client_impl.rs index 7be819c4d..d7be5f880 100644 --- a/wallet/wallet-node-client/src/rpc_client/client_impl.rs +++ b/wallet/wallet-node-client/src/rpc_client/client_impl.rs @@ -419,6 +419,13 @@ impl NodeInterface for NodeRpcClient { .map(|opt| opt.map(|resp| resp.transaction.take())) } + async fn mempool_get_transactions(&self) -> Result, Self::Error> { + MempoolRpcClient::get_all_transactions(&*self.rpc_client) + .await + .map_err(NodeRpcError::ResponseError) + .map(|txs| txs.into_iter().map(|tx| tx.take()).collect()) + } + async fn mempool_subscribe_to_events(&self) -> Result { let subscription = MempoolRpcClient::subscribe_to_events(&*self.rpc_client) .await diff --git a/wallet/wallet-node-client/src/rpc_client/cold_wallet_client.rs b/wallet/wallet-node-client/src/rpc_client/cold_wallet_client.rs index 4c9eb1825..743bef161 100644 --- a/wallet/wallet-node-client/src/rpc_client/cold_wallet_client.rs +++ b/wallet/wallet-node-client/src/rpc_client/cold_wallet_client.rs @@ -301,6 +301,10 @@ impl NodeInterface for ColdWalletClient { Err(ColdWalletRpcError::NotAvailable) } + async fn mempool_get_transactions(&self) -> Result, Self::Error> { + Err(ColdWalletRpcError::NotAvailable) + } + async fn get_utxo( &self, _outpoint: common::chain::UtxoOutPoint, From 3454e254163cdfa9308f1e689cf6d9c0007edaa4 Mon Sep 17 00:00:00 2001 From: Boris Oncev Date: Sun, 22 Feb 2026 12:55:48 +0700 Subject: [PATCH 2/2] fix comments --- test/functional/wallet_mempool_events.py | 23 +++++------------------ test/functional/wallet_scan_mempool.py | 23 +++++------------------ wallet/wallet-controller/src/lib.rs | 15 ++++++++------- 3 files changed, 18 insertions(+), 43 deletions(-) diff --git a/test/functional/wallet_mempool_events.py b/test/functional/wallet_mempool_events.py index 083a68711..229ee9126 100644 --- a/test/functional/wallet_mempool_events.py +++ b/test/functional/wallet_mempool_events.py @@ -104,21 +104,10 @@ async def async_test(self): tip_id = node.chainstate_best_block_id() # Submit a valid transaction - token_fee = 1000 - coins_to_send = 1 - token_fee_output = { - "Transfer": [ - {"Coin": token_fee * ATOMS_PER_COIN}, - { - "PublicKey": { - "key": {"Secp256k1Schnorr": {"pubkey_data": pub_key_bytes}} - } - }, - ], - } + total_coins = 100 tx_fee_output = { "Transfer": [ - {"Coin": coins_to_send * ATOMS_PER_COIN}, + {"Coin": total_coins * ATOMS_PER_COIN}, { "PublicKey": { "key": {"Secp256k1Schnorr": {"pubkey_data": pub_key_bytes}} @@ -126,9 +115,7 @@ async def async_test(self): }, ], } - encoded_tx, tx_id = make_tx( - [reward_input(tip_id)], [token_fee_output] + [tx_fee_output] * 2, 0 - ) + encoded_tx, tx_id = make_tx([reward_input(tip_id)], [tx_fee_output], 0) self.log.debug(f"Encoded transaction {tx_id}: {encoded_tx}") @@ -148,11 +135,11 @@ async def async_test(self): # both wallets have the same balances after syncing the new block assert_in( - f"Coins amount: {coins_to_send * 2 + token_fee}", + f"Coins amount: {total_coins}", await wallet.get_balance(), ) assert_in( - f"Coins amount: {coins_to_send * 2 + token_fee}", + f"Coins amount: {total_coins}", await wallet2.get_balance(), ) diff --git a/test/functional/wallet_scan_mempool.py b/test/functional/wallet_scan_mempool.py index 10dcdbbf3..655b262a8 100644 --- a/test/functional/wallet_scan_mempool.py +++ b/test/functional/wallet_scan_mempool.py @@ -108,21 +108,10 @@ async def async_test(self): tip_id = node.chainstate_best_block_id() # Submit a valid transaction - token_fee = 1000 - coins_to_send = 1 - token_fee_output = { - "Transfer": [ - {"Coin": token_fee * ATOMS_PER_COIN}, - { - "PublicKey": { - "key": {"Secp256k1Schnorr": {"pubkey_data": pub_key_bytes}} - } - }, - ], - } + total_coins = 100 tx_fee_output = { "Transfer": [ - {"Coin": coins_to_send * ATOMS_PER_COIN}, + {"Coin": total_coins * ATOMS_PER_COIN}, { "PublicKey": { "key": {"Secp256k1Schnorr": {"pubkey_data": pub_key_bytes}} @@ -130,9 +119,7 @@ async def async_test(self): }, ], } - encoded_tx, tx_id = make_tx( - [reward_input(tip_id)], [token_fee_output] + [tx_fee_output] * 2, 0 - ) + encoded_tx, tx_id = make_tx([reward_input(tip_id)], [tx_fee_output], 0) self.log.debug(f"Encoded transaction {tx_id}: {encoded_tx}") @@ -152,11 +139,11 @@ async def async_test(self): # both wallets have the same balances after syncing the new block assert_in( - f"Coins amount: {coins_to_send * 2 + token_fee}", + f"Coins amount: {total_coins}", await wallet.get_balance(), ) assert_in( - f"Coins amount: {coins_to_send * 2 + token_fee}", + f"Coins amount: {total_coins}", await wallet2.get_balance(), ) diff --git a/wallet/wallet-controller/src/lib.rs b/wallet/wallet-controller/src/lib.rs index 59b13c01e..54074d7df 100644 --- a/wallet/wallet-controller/src/lib.rs +++ b/wallet/wallet-controller/src/lib.rs @@ -53,6 +53,7 @@ use types::{ SeedWithPassPhrase, SignatureStats, TransactionToInspect, ValidatedSignatures, WalletInfo, WalletTypeArgsComputed, }; +use utils::set_flag::SetFlag; use wallet_storage::DefaultBackend; use read::ReadOnlyController; @@ -207,7 +208,7 @@ pub struct Controller { wallet_events: W, mempool_events: MempoolEvents, - finished_initial_sync: bool, + finished_initial_sync: SetFlag, } impl std::fmt::Debug for Controller { @@ -244,7 +245,7 @@ where staking_started: BTreeSet::new(), wallet_events, mempool_events, - finished_initial_sync: false, + finished_initial_sync: SetFlag::new(), }; log::info!("Syncing the wallet..."); @@ -270,7 +271,7 @@ where staking_started: BTreeSet::new(), wallet_events, mempool_events, - finished_initial_sync: false, + finished_initial_sync: SetFlag::new(), }) } @@ -1364,7 +1365,7 @@ where } // after the first successful sync to the tip fetch all mempool transactions - if !self.finished_initial_sync { + if !self.finished_initial_sync.test() { let txs = self.rpc_client.mempool_get_transactions().await; match txs { @@ -1372,13 +1373,13 @@ where if let Err(err) = self.wallet.add_mempool_transactions(&txs, &self.wallet_events) { - log::error!("Txs from mempool failed to be added in the wallet because of an error: {err}"); + log::error!("Error adding mempool transactions: {err}"); } else { - self.finished_initial_sync = true; + self.finished_initial_sync.set(); } } Err(err) => { - log::error!("Failed to fetch all transactios from the mempool: {err}"); + log::error!("Failed to fetch all transactions from the mempool: {err}"); tokio::time::sleep(ERROR_DELAY).await; continue; }