Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 83 additions & 58 deletions crates/bitcoind_rpc/src/bip158.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@
//! [0]: https://github.com/bitcoin/bips/blob/master/bip-0157.mediawiki
//! [1]: https://github.com/bitcoin/bips/blob/master/bip-0158.mediawiki

use core::fmt::Debug;

use bdk_core::bitcoin;
use bdk_core::CheckPoint;
use bdk_core::FromBlockHeader;
use bdk_core::ToBlockHash;
use bitcoin::block::Header;
use bitcoin::BlockHash;
use bitcoin::{bip158::BlockFilter, Block, ScriptBuf};
use bitcoincore_rpc;
Expand All @@ -29,22 +34,25 @@ use bitcoincore_rpc::{json::GetBlockHeaderResult, RpcApi};
/// Events contain the updated checkpoint `cp` which may be incorporated into the local chain
/// state to stay in sync with the tip.
#[derive(Debug)]
pub struct FilterIter<'a> {
pub struct FilterIter<'a, D = BlockHash> {
/// RPC client
client: &'a bitcoincore_rpc::Client,
/// SPK inventory
spks: Vec<ScriptBuf>,
/// checkpoint
cp: CheckPoint<BlockHash>,
cp: CheckPoint<D>,
/// Header info, contains the prev and next hashes for each header.
header: Option<GetBlockHeaderResult>,
}

impl<'a> FilterIter<'a> {
impl<'a, D> FilterIter<'a, D>
where
D: ToBlockHash + Clone + Debug,
{
/// Construct [`FilterIter`] with checkpoint, RPC client and SPKs.
pub fn new(
client: &'a bitcoincore_rpc::Client,
cp: CheckPoint,
cp: CheckPoint<D>,
spks: impl IntoIterator<Item = ScriptBuf>,
) -> Self {
Self {
Expand All @@ -69,13 +77,76 @@ impl<'a> FilterIter<'a> {
}
Err(Error::ReorgDepthExceeded)
}

fn try_next_with<F>(&mut self, to_data: F) -> Result<Option<Event<D>>, Error>
where
F: Fn(Header) -> D,
{
let mut cp = self.cp.clone();

let header = match self.header.take() {
Some(header) => header,
// If no header is cached we need to locate a base of the local
// checkpoint from which the scan may proceed.
None => self.find_base()?,
};

let mut next_hash = match header.next_block_hash {
Some(hash) => hash,
None => return Ok(None),
};

let mut next_header = self.client.get_block_header_info(&next_hash)?;

// In case of a reorg, rewind by fetching headers of previous hashes until we find
// one with enough confirmations.
while next_header.confirmations < 0 {
let prev_hash = next_header
.previous_block_hash
.ok_or(Error::ReorgDepthExceeded)?;
let prev_header = self.client.get_block_header_info(&prev_hash)?;
next_header = prev_header;
}

next_hash = next_header.hash;
let next_height: u32 = next_header.height.try_into()?;

cp = cp.insert(
next_height,
to_data(self.client.get_block_header(&next_hash)?),
);

let mut block = None;
let filter = BlockFilter::new(self.client.get_block_filter(&next_hash)?.filter.as_slice());
if filter
.match_any(&next_hash, self.spks.iter().map(ScriptBuf::as_ref))
.map_err(Error::Bip158)?
{
block = Some(self.client.get_block(&next_hash)?);
}

// Store the next header
self.header = Some(next_header);
// Update self.cp
self.cp = cp.clone();

Ok(Some(Event { cp, block }))
}

/// Get the next event with a custom checkpoint data type.
pub fn next_with<F>(&mut self, to_data: F) -> Option<Result<Event<D>, Error>>
where
F: Fn(Header) -> D,
{
self.try_next_with(to_data).transpose()
}
}

/// Event returned by [`FilterIter`].
#[derive(Debug, Clone)]
pub struct Event {
pub struct Event<D = BlockHash> {
/// Checkpoint
pub cp: CheckPoint,
pub cp: CheckPoint<D>,
/// Block, will be `Some(..)` for matching blocks
pub block: Option<Block>,
}
Expand All @@ -92,60 +163,14 @@ impl Event {
}
}

impl Iterator for FilterIter<'_> {
type Item = Result<Event, Error>;
impl<D> Iterator for FilterIter<'_, D>
where
D: ToBlockHash + FromBlockHeader + Clone + Debug,
{
type Item = Result<Event<D>, Error>;

fn next(&mut self) -> Option<Self::Item> {
(|| -> Result<Option<_>, Error> {
let mut cp = self.cp.clone();

let header = match self.header.take() {
Some(header) => header,
// If no header is cached we need to locate a base of the local
// checkpoint from which the scan may proceed.
None => self.find_base()?,
};

let mut next_hash = match header.next_block_hash {
Some(hash) => hash,
None => return Ok(None),
};

let mut next_header = self.client.get_block_header_info(&next_hash)?;

// In case of a reorg, rewind by fetching headers of previous hashes until we find
// one with enough confirmations.
while next_header.confirmations < 0 {
let prev_hash = next_header
.previous_block_hash
.ok_or(Error::ReorgDepthExceeded)?;
let prev_header = self.client.get_block_header_info(&prev_hash)?;
next_header = prev_header;
}

next_hash = next_header.hash;
let next_height: u32 = next_header.height.try_into()?;

cp = cp.insert(next_height, next_hash);

let mut block = None;
let filter =
BlockFilter::new(self.client.get_block_filter(&next_hash)?.filter.as_slice());
if filter
.match_any(&next_hash, self.spks.iter().map(ScriptBuf::as_ref))
.map_err(Error::Bip158)?
{
block = Some(self.client.get_block(&next_hash)?);
}

// Store the next header
self.header = Some(next_header);
// Update self.cp
self.cp = cp.clone();

Ok(Some(Event { cp, block }))
})()
.transpose()
self.try_next_with(D::from_blockheader).transpose()
}
}

Expand Down
69 changes: 46 additions & 23 deletions crates/bitcoind_rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ extern crate alloc;

use alloc::sync::Arc;
use bdk_core::collections::{HashMap, HashSet};
use bdk_core::{BlockId, CheckPoint};
use bdk_core::{BlockId, CheckPoint, FromBlockHeader, ToBlockHash};
use bitcoin::{Block, BlockHash, Transaction, Txid};
use bitcoincore_rpc::{bitcoincore_rpc_json, RpcApi};
use core::ops::Deref;
use std::fmt::Debug;

pub mod bip158;

Expand All @@ -30,13 +31,13 @@ pub use bitcoincore_rpc;
/// Refer to [module-level documentation] for more.
///
/// [module-level documentation]: crate
pub struct Emitter<C> {
pub struct Emitter<C, D = BlockHash> {
client: C,
start_height: u32,

/// The checkpoint of the last-emitted block that is in the best chain. If it is later found
/// that the block is no longer in the best chain, it will be popped off from here.
last_cp: CheckPoint<BlockHash>,
last_cp: CheckPoint<D>,

/// The block result returned from rpc of the last-emitted block. As this result contains the
/// next block's block hash (which we use to fetch the next block), we set this to `None`
Expand All @@ -62,10 +63,11 @@ pub struct Emitter<C> {
/// to start empty (i.e. with no unconfirmed transactions).
pub const NO_EXPECTED_MEMPOOL_TXS: core::iter::Empty<Arc<Transaction>> = core::iter::empty();

impl<C> Emitter<C>
impl<C, D> Emitter<C, D>
where
C: Deref,
C::Target: RpcApi,
D: ToBlockHash + Clone + Debug,
{
/// Construct a new [`Emitter`].
///
Expand All @@ -80,7 +82,7 @@ where
/// If it is known that the wallet is empty, [`NO_EXPECTED_MEMPOOL_TXS`] can be used.
pub fn new(
client: C,
last_cp: CheckPoint<BlockHash>,
last_cp: CheckPoint<D>,
start_height: u32,
expected_mempool_txs: impl IntoIterator<Item = impl Into<Arc<Transaction>>>,
) -> Self {
Expand Down Expand Up @@ -198,9 +200,18 @@ where
Ok(mempool_event)
}

/// Emit the next block height and block (if any).
pub fn next_block(&mut self) -> Result<Option<BlockEvent<Block>>, bitcoincore_rpc::Error> {
if let Some((checkpoint, block)) = poll(self, move |hash, client| client.get_block(hash))? {
/// Emit the next block, using `to_data` to construct checkpoint data from the block.
///
/// This is the alternative to [`next_block`](Self::next_block) when [`FromBlockHeader`] isn't
/// implemented for `D`.
pub fn next_block_with<F>(
&mut self,
to_data: F,
) -> Result<Option<BlockEvent<Block, D>>, bitcoincore_rpc::Error>
where
F: Fn(&Block) -> D,
{
if let Some((checkpoint, block)) = poll(self, to_data)? {
// Stop tracking unconfirmed transactions that have been confirmed in this block.
for tx in &block.txdata {
self.mempool_snapshot.remove(&tx.compute_txid());
Expand All @@ -209,6 +220,14 @@ where
}
Ok(None)
}

/// Emit the next block height and block (if any).
pub fn next_block(&mut self) -> Result<Option<BlockEvent<Block, D>>, bitcoincore_rpc::Error>
where
D: FromBlockHeader,
{
self.next_block_with(|block| D::from_blockheader(block.header))
}
}

/// A new emission from mempool.
Expand All @@ -223,7 +242,7 @@ pub struct MempoolEvent {

/// A newly emitted block from [`Emitter`].
#[derive(Debug)]
pub struct BlockEvent<B> {
pub struct BlockEvent<B = Block, D = BlockHash> {
/// The block.
pub block: B,

Expand All @@ -235,7 +254,7 @@ pub struct BlockEvent<B> {
///
/// This is important as BDK structures require block-to-apply to be connected with another
/// block in the original chain.
pub checkpoint: CheckPoint<BlockHash>,
pub checkpoint: CheckPoint<D>,
}

impl<B> BlockEvent<B> {
Expand Down Expand Up @@ -264,17 +283,17 @@ impl<B> BlockEvent<B> {
}
}

enum PollResponse {
enum PollResponse<D = BlockHash> {
Block(bitcoincore_rpc_json::GetBlockResult),
NoMoreBlocks,
/// Fetched block is not in the best chain.
BlockNotInBestChain,
AgreementFound(bitcoincore_rpc_json::GetBlockResult, CheckPoint<BlockHash>),
AgreementFound(bitcoincore_rpc_json::GetBlockResult, CheckPoint<D>),
/// Force the genesis checkpoint down the receiver's throat.
AgreementPointNotFound(BlockHash),
}

fn poll_once<C>(emitter: &Emitter<C>) -> Result<PollResponse, bitcoincore_rpc::Error>
fn poll_once<C, D>(emitter: &Emitter<C, D>) -> Result<PollResponse<D>, bitcoincore_rpc::Error>
where
C: Deref,
C::Target: RpcApi,
Expand Down Expand Up @@ -328,30 +347,32 @@ where
Ok(PollResponse::AgreementPointNotFound(genesis_hash))
}

fn poll<C, V, F>(
emitter: &mut Emitter<C>,
get_item: F,
) -> Result<Option<(CheckPoint<BlockHash>, V)>, bitcoincore_rpc::Error>
fn poll<C, D, F>(
emitter: &mut Emitter<C, D>,
to_cp_data: F,
) -> Result<Option<(CheckPoint<D>, Block)>, bitcoincore_rpc::Error>
where
C: Deref,
C::Target: RpcApi,
F: Fn(&BlockHash, &C::Target) -> Result<V, bitcoincore_rpc::Error>,
D: ToBlockHash + Clone + Debug,
F: Fn(&Block) -> D,
{
let client = &emitter.client;
loop {
match poll_once(emitter)? {
PollResponse::Block(res) => {
let height = res.height as u32;
let hash = res.hash;
let item = get_item(&hash, &emitter.client)?;
let block = client.get_block(&res.hash)?;
let cp_data = to_cp_data(&block);

let new_cp = emitter
.last_cp
.clone()
.push(height, hash)
.push(height, cp_data)
.expect("must push");
emitter.last_cp = new_cp.clone();
emitter.last_block = Some(res);
return Ok(Some((new_cp, item)));
return Ok(Some((new_cp, block)));
}
PollResponse::NoMoreBlocks => {
emitter.last_block = None;
Expand All @@ -368,7 +389,9 @@ where
continue;
}
PollResponse::AgreementPointNotFound(genesis_hash) => {
emitter.last_cp = CheckPoint::new(0, genesis_hash);
let block = client.get_block(&genesis_hash)?;
let cp_data = to_cp_data(&block);
emitter.last_cp = CheckPoint::new(0, cp_data);
emitter.last_block = None;
continue;
}
Expand Down
4 changes: 2 additions & 2 deletions crates/bitcoind_rpc/tests/test_filter_iter.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use bdk_bitcoind_rpc::bip158::{Error, FilterIter};
use bdk_core::CheckPoint;
use bdk_testenv::{anyhow, bitcoind, TestEnv};
use bitcoin::{Address, Amount, Network, ScriptBuf};
use bitcoin::{Address, Amount, BlockHash, Network, ScriptBuf};
use bitcoincore_rpc::RpcApi;

fn testenv() -> anyhow::Result<TestEnv> {
Expand Down Expand Up @@ -59,7 +59,7 @@ fn filter_iter_error_wrong_network() -> anyhow::Result<()> {
let _ = env.mine_blocks(10, None)?;

// Try to initialize FilterIter with a CP on the wrong network
let cp = CheckPoint::new(0, bitcoin::hashes::Hash::hash(b"wrong-hash"));
let cp = CheckPoint::<BlockHash>::new(0, bitcoin::hashes::Hash::hash(b"wrong-hash"));
let mut iter = FilterIter::new(&env.bitcoind.client, cp, [ScriptBuf::new()]);
assert!(matches!(iter.next(), Some(Err(Error::ReorgDepthExceeded))));

Expand Down
Loading