From 16bbde7d626294a82f49861769c375ce4aef48e1 Mon Sep 17 00:00:00 2001 From: Bernhard Schuster Date: Tue, 17 Mar 2026 15:44:05 +0100 Subject: [PATCH 1/3] y --- Cargo.lock | 1 + Cargo.toml | 2 +- crates/store/Cargo.toml | 5 + crates/store/benches/state_load.rs | 406 +++++++++++++++++++++++++++++ 4 files changed, 413 insertions(+), 1 deletion(-) create mode 100644 crates/store/benches/state_load.rs diff --git a/Cargo.lock b/Cargo.lock index 16ea17404b..bf40da6489 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3188,6 +3188,7 @@ dependencies = [ "tonic-reflection", "tower-http", "tracing", + "tracing-subscriber", "url", ] diff --git a/Cargo.toml b/Cargo.toml index 2156ddf8f8..8ba0915833 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -121,7 +121,7 @@ tonic-reflection = { version = "0.14" } tower = { version = "0.5" } tower-http = { features = ["cors", "trace"], version = "0.6" } tracing = { version = "0.1" } -tracing-subscriber = { features = ["env-filter", "fmt", "json"], version = "0.3" } +tracing-subscriber = { features = ["ansi", "env-filter", "fmt", "json"], version = "0.3" } url = { features = ["serde"], version = "2.5" } # Lints are set to warn for development, which are promoted to errors in CI. diff --git a/crates/store/Cargo.toml b/crates/store/Cargo.toml index 7c8135d97a..e9211f1ac8 100644 --- a/crates/store/Cargo.toml +++ b/crates/store/Cargo.toml @@ -69,6 +69,7 @@ rand = { workspace = true } regex = { version = "1.11" } tempfile = { workspace = true } termtree = "1.0" +tracing-subscriber = { workspace = true } [features] default = ["rocksdb"] @@ -79,6 +80,10 @@ harness = false name = "account_tree" required-features = ["rocksdb"] +[[bench]] +harness = false +name = "state_load" + [package.metadata.cargo-machete] # This is an indirect dependency for which we need to enable optimisations/features # via feature flags. Because we don't use it directly in code, machete diff --git a/crates/store/benches/state_load.rs b/crates/store/benches/state_load.rs new file mode 100644 index 0000000000..decff23aa4 --- /dev/null +++ b/crates/store/benches/state_load.rs @@ -0,0 +1,406 @@ +//! Benchmarks for `State::load` performance at different account scales. +//! +//! Measures the time taken by `State::load` with 1e4, 1e5, and 1e6 **public** accounts in the +//! database. Each account carries vault assets and storage data (both plain values and a storage +//! map), so every phase of loading is exercised: +//! +//! - `load_mmr`: rebuilds the chain MMR from block header commitments +//! - `load_account_tree`: rebuilds the account SMT from the DB +//! - `load_nullifier_tree`: rebuilds the nullifier SMT (empty here) +//! - `verify_tree_consistency`: checks tree roots against the block header +//! - `load_smt_forest`: loads per-account SMT forest for vault witnesses and storage-map proofs — +//! exercised because all accounts are public +//! +//! Span names and elapsed times are printed in **bold yellow** when the bench is run with +//! `--nocapture`. Set `RUST_LOG` to control verbosity (defaults to `miden-store=info`). +//! +//! ```sh +//! RUST_LOG=miden-store=info \ +//! cargo bench --bench state_load -p miden-node-store -- --nocapture +//! ``` + +use std::fmt; +use std::path::PathBuf; + +use criterion::{Criterion, criterion_group, criterion_main}; +use miden_crypto::utils::Serializable; +use miden_node_store::Store; +use miden_node_store::genesis::GenesisBlock; +use miden_node_store::state::State; +use miden_node_utils::clap::StorageOptions; +use miden_protocol::account::auth::{AuthScheme, PublicKeyCommitment}; +use miden_protocol::account::delta::AccountUpdateDetails; +use miden_protocol::account::{ + Account, + AccountBuilder, + AccountComponent, + AccountComponentCode, + AccountComponentMetadata, + AccountDelta, + AccountStorageMode, + AccountType, + StorageMap, + StorageMapKey, + StorageSlot, + StorageSlotName, +}; +use miden_protocol::asset::{Asset, FungibleAsset}; +use miden_protocol::block::account_tree::{AccountTree, account_id_to_smt_key}; +use miden_protocol::block::{ + BlockAccountUpdate, + BlockBody, + BlockHeader, + BlockNoteTree, + BlockNumber, + BlockProof, + ProvenBlock, +}; +use miden_protocol::crypto::dsa::ecdsa_k256_keccak::SecretKey; +use miden_protocol::crypto::merkle::mmr::{Forest, MmrPeaks}; +use miden_protocol::crypto::merkle::smt::{LargeSmt, MemoryStorage, Smt}; +use miden_protocol::testing::account_id::ACCOUNT_ID_PUBLIC_FUNGIBLE_FAUCET; +use miden_protocol::transaction::{OrderedTransactionHeaders, TransactionKernel}; +use miden_protocol::{EMPTY_WORD, Felt, FieldElement, Word}; +use miden_standards::account::auth::AuthSingleSig; +use miden_standards::account::wallets::BasicWallet; +use tempfile::TempDir; +use tracing::Subscriber; +use tracing_subscriber::fmt::format::{FmtSpan, FormatEvent, FormatFields, Writer}; +use tracing_subscriber::fmt::{FmtContext, FormattedFields}; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::registry::LookupSpan; +use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::{EnvFilter, Layer}; + +// CONSTANTS +// ================================================================================================ + +/// Account counts to benchmark: 1e4, 1e5, 1e6. +const ACCOUNT_COUNTS: &[usize] = &[1_000_000, 100_000]; + +/// ANSI escape: bold yellow. +const BOLD_YELLOW: &str = "\x1b[1;33m"; +/// ANSI escape: reset all attributes. +const RESET: &str = "\x1b[0m"; + +// TRACING SETUP +// ================================================================================================ + +/// A `FormatEvent` that renders span names in **bold yellow** and delegates everything else to +/// the standard compact format. +struct BoldYellowSpanFormatter; + +impl FormatEvent for BoldYellowSpanFormatter +where + S: Subscriber + for<'a> LookupSpan<'a>, + N: for<'a> FormatFields<'a> + 'static, +{ + fn format_event( + &self, + ctx: &FmtContext<'_, S, N>, + mut writer: Writer<'_>, + event: &tracing::Event<'_>, + ) -> fmt::Result { + let ansi = writer.has_ansi_escapes(); + let meta = event.metadata(); + + // Level + write!(writer, "{} ", meta.level())?; + + // Span scope: name in bold yellow, stored fields in normal weight. + if let Some(scope) = ctx.event_scope() { + for span in scope.from_root() { + let name = span.metadata().name(); + if ansi { + write!(writer, "{BOLD_YELLOW}{name}{RESET}")?; + } else { + write!(writer, "{name}")?; + } + let ext = span.extensions(); + if let Some(fields) = ext.get::>() { + if !fields.is_empty() { + write!(writer, "{{{}}}", fields.fields)?; + } + } + write!(writer, ": ")?; + } + } + + // Target + write!(writer, "{}: ", meta.target())?; + + // Event fields (timing fields are coloured by BoldYellowFields below). + ctx.format_fields(writer.by_ref(), event)?; + writeln!(writer) + } +} + +/// A `FormatFields` that renders `time.busy` and `time.idle` in **bold yellow**; all other +/// fields are rendered normally. +struct BoldYellowFields; + +impl<'writer> FormatFields<'writer> for BoldYellowFields { + fn format_fields( + &self, + mut writer: Writer<'writer>, + fields: R, + ) -> fmt::Result { + use tracing::field::{Field, Visit}; + + let ansi = writer.has_ansi_escapes(); + + struct Visitor<'w> { + writer: Writer<'w>, + ansi: bool, + first: bool, + } + + impl Visit for Visitor<'_> { + fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) { + let sep = if self.first { "" } else { ", " }; + self.first = false; + let name = field.name(); + let is_timing = name == "time.busy" || name == "time.idle"; + if self.ansi && is_timing { + let _ = write!( + self.writer, + "{sep}{BOLD_YELLOW}{name}{RESET}={BOLD_YELLOW}{value:?}{RESET}", + ); + } else { + let _ = write!(self.writer, "{sep}{name}={value:?}"); + } + } + } + + fields.record(&mut Visitor { + writer: writer.by_ref(), + ansi, + first: true, + }); + Ok(()) + } +} + +/// Installs the global tracing subscriber once. Subsequent calls are silently ignored. +fn init_tracing() { + let filter = std::env::var("RUST_LOG") + .map(EnvFilter::new) + .unwrap_or_else(|_| EnvFilter::new("miden-store=info")); + + let layer = tracing_subscriber::fmt::layer() + .with_span_events(FmtSpan::CLOSE) + .with_ansi(true) + .event_format(BoldYellowSpanFormatter) + .fmt_fields(BoldYellowFields) + .with_filter(filter); + + let _ = tracing_subscriber::registry().with(layer).try_init(); +} + +// ACCOUNT CONSTRUCTION +// ================================================================================================ + +/// Component name shared across all benchmark accounts. +const DATA_COMPONENT_NAME: &str = "bench::wallet::data"; + +fn value_slot_name() -> StorageSlotName { + StorageSlotName::new("bench::wallet::data::balance").expect("valid slot name") +} + +fn map_slot_name() -> StorageSlotName { + StorageSlotName::new("bench::wallet::data::ledger").expect("valid slot name") +} + +/// Builds one public account with: +/// - a `BasicWallet` component (pre-compiled, `LazyLock`-cached) +/// - a data component with one value slot and one two-entry storage map +/// - an `AuthSingleSig` auth component (pre-compiled, `LazyLock`-cached) +/// - one fungible vault asset +/// +/// `wallet_code` is the pre-compiled `BasicWallet` library reused across all accounts so that +/// MASM compilation only happens once (it is backed by a `LazyLock` inside `miden-standards`). +/// All numeric data is derived from the seed so every account holds distinct state. +fn build_public_account( + seed: [u8; 32], + faucet_id: miden_protocol::account::AccountId, + wallet_code: AccountComponentCode, +) -> Account { + let balance_value = Word::from([ + Felt::new(seed[0] as u64 + 1), + Felt::new(seed[1] as u64 + 1), + Felt::new(seed[2] as u64 + 1), + Felt::new(seed[3] as u64 + 1), + ]); + + let storage_map = StorageMap::with_entries([ + ( + StorageMapKey::from_index(seed[4] as u32 + 1), + Word::from([Felt::new(seed[5] as u64 + 1), Felt::ZERO, Felt::ZERO, Felt::ZERO]), + ), + ( + StorageMapKey::from_index(seed[6] as u32 + 128), + Word::from([Felt::new(seed[7] as u64 + 1), Felt::ZERO, Felt::ZERO, Felt::ZERO]), + ), + ]) + .expect("valid storage map entries"); + + // Reuse the pre-compiled wallet code (cloned cheaply — it's an Arc-backed Library). + let data_component = AccountComponent::new( + wallet_code.clone(), + vec![ + StorageSlot::with_value(value_slot_name(), balance_value), + StorageSlot::with_map(map_slot_name(), storage_map), + ], + AccountComponentMetadata::new(DATA_COMPONENT_NAME).with_supports_all_types(), + ) + .expect("data component should be valid"); + + let asset_amount = (seed[8] as u64 + 1) * 100; + let fungible_asset = FungibleAsset::new(faucet_id, asset_amount).expect("valid fungible asset"); + + AccountBuilder::new(seed) + .account_type(AccountType::RegularAccountImmutableCode) + .storage_mode(AccountStorageMode::Public) + .with_component(BasicWallet) + .with_component(data_component) + .with_assets([Asset::Fungible(fungible_asset)]) + .with_auth_component(AuthSingleSig::new( + PublicKeyCommitment::from(EMPTY_WORD), + AuthScheme::Falcon512Rpo, + )) + .build_existing() + .expect("account should build successfully") +} + +// BENCHMARK SETUP +// ================================================================================================ + +/// Creates a fully bootstrapped data directory with `num_accounts` public accounts. +/// +/// Every account carries a vault asset and storage data so that `load_smt_forest` has real work +/// to do. The returned [`TempDir`] must be kept alive for the duration of the benchmark. +fn setup_data_directory(num_accounts: usize) -> (TempDir, PathBuf) { + let temp_dir = TempDir::new().expect("failed to create temp dir"); + let data_dir = temp_dir.path().to_path_buf(); + + let faucet_id = miden_protocol::account::AccountId::try_from(ACCOUNT_ID_PUBLIC_FUNGIBLE_FAUCET) + .expect("valid faucet account id"); + + // Precompile the BasicWallet library once; `AccountComponentCode` is `Clone` (Arc-backed). + let wallet_code: AccountComponentCode = AccountComponent::from(BasicWallet).into(); + + tracing::info!(num_accounts=%num_accounts, "Building accounts...."); + let accounts = Vec::::from_iter((0..num_accounts).map(|i| { + let src = Felt::new(i as u64 + 235230).to_bytes(); + let mut seed = [0u8; 32]; + seed[0..src.len()].copy_from_slice(&src[..]); + build_public_account(seed, faucet_id, wallet_code.clone()) + })); + tracing::info!(num_accounts=%num_accounts, "Building accounts >> DONE"); + + tracing::info!(num_accounts=%num_accounts, "Building genesis block...."); + let genesis_block = build_genesis_block(accounts); + tracing::info!(num_accounts=%num_accounts, "Building genesis block >> DONE"); + + tracing::info!(num_accounts=%num_accounts, "Bootstraping...."); + Store::bootstrap(&genesis_block, &data_dir).expect("Store::bootstrap failed"); + tracing::info!(num_accounts=%num_accounts, "Bootstraping >> DONE"); + + (temp_dir, data_dir) +} + +/// Wraps the account list in a `GenesisBlock` with consistent account-tree and nullifier-tree +/// roots so that `verify_tree_consistency` passes on every `State::load` call. +fn build_genesis_block(accounts: Vec) -> GenesisBlock { + let account_updates: Vec = accounts + .iter() + .map(|account| { + let delta = AccountDelta::try_from(account.clone()) + .expect("full-state delta should always succeed"); + BlockAccountUpdate::new( + account.id(), + account.to_commitment(), + AccountUpdateDetails::Delta(delta), + ) + }) + .collect(); + + let smt_entries = accounts.iter().map(|a| (account_id_to_smt_key(a.id()), a.to_commitment())); + let smt = LargeSmt::with_entries(MemoryStorage::default(), smt_entries) + .expect("failed to build account SMT"); + let account_tree = AccountTree::new(smt).expect("failed to create AccountTree"); + + let secret_key = SecretKey::new(); + let header = BlockHeader::new( + 1_u32, + Word::empty(), + BlockNumber::GENESIS, + MmrPeaks::new(Forest::empty(), Vec::new()) + .expect("empty MmrPeaks is always valid") + .hash_peaks(), + account_tree.root(), + Smt::new().root(), + BlockNoteTree::empty().root(), + Word::empty(), + TransactionKernel.to_commitment(), + secret_key.public_key(), + miden_node_utils::fee::test_fee_params(), + 0_u32, + ); + let signature = secret_key.sign(header.commitment()); + + let body = BlockBody::new_unchecked( + account_updates, + vec![], + vec![], + OrderedTransactionHeaders::new_unchecked(vec![]), + ); + + let proven = ProvenBlock::new_unchecked(header, body, signature, BlockProof::new_dummy()); + GenesisBlock::try_from(proven).expect("synthetic genesis block should be valid") +} + +// BENCHMARK FUNCTIONS +// ================================================================================================ + +fn bench_state_load(c: &mut Criterion) { + init_tracing(); + + let mut group = c.benchmark_group("state_load"); + + for &num_accounts in ACCOUNT_COUNTS { + let (_temp_dir, data_dir) = setup_data_directory(num_accounts); + + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("failed to build Tokio runtime"); + + group.bench_function(format!("{num_accounts}_accounts"), |b| { + b.iter(|| { + let data_dir = data_dir.clone(); + rt.block_on(async move { + let (termination_ask, _rx) = tokio::sync::mpsc::channel(1); + let state = State::load(&data_dir, StorageOptions::default(), termination_ask) + .await + .expect("State::load failed during benchmark"); + // Drop inside the async context so the Db pool shuts down while + // a Tokio runtime is still active. + drop(state); + }) + }); + }); + } + + group.finish(); +} + +criterion_group!( + name = state_load; + config = Criterion::default() + .sample_size(10) + .measurement_time(std::time::Duration::from_secs(600)) + .warm_up_time(std::time::Duration::from_secs(3)); + targets = bench_state_load +); +criterion_main!(state_load); From d25c7ed8209c2c14bfbf90170acbfaa068405785 Mon Sep 17 00:00:00 2001 From: Bernhard Schuster Date: Tue, 17 Mar 2026 17:16:54 +0100 Subject: [PATCH 2/3] fun --- Cargo.lock | 49 ++++++++++ Cargo.toml | 2 + crates/store/Cargo.toml | 3 + crates/store/benches/state_load.rs | 89 +++++++++++------ crates/store/src/account_state_forest/mod.rs | 96 ++++++++++++++++++- crates/store/src/db/mod.rs | 28 +++++- .../store/src/db/models/queries/accounts.rs | 2 +- .../src/db/models/queries/accounts/delta.rs | 2 +- crates/store/src/state/loader.rs | 64 +++++++++---- 9 files changed, 284 insertions(+), 51 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bf40da6489..eaf8e350e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -998,6 +998,19 @@ dependencies = [ "memchr", ] +[[package]] +name = "console" +version = "0.15.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "054ccb5b10f9f2cbf51eb355ca1d05c2d279ce1804688d0db74b4733a5aeafd8" +dependencies = [ + "encode_unicode", + "libc", + "once_cell", + "unicode-width 0.2.2", + "windows-sys 0.59.0", +] + [[package]] name = "const-oid" version = "0.9.6" @@ -1475,6 +1488,12 @@ dependencies = [ "log", ] +[[package]] +name = "encode_unicode" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34aa73646ffb006b8f5147f3dc182bd4bcb190227ce861fc4a4844bf8e3cb2c0" + [[package]] name = "encoding_rs" version = "0.8.35" @@ -2300,6 +2319,19 @@ dependencies = [ "serde_core", ] +[[package]] +name = "indicatif" +version = "0.17.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "183b3088984b400f4cfac3620d5e076c84da5364016b4f49473de574b2586235" +dependencies = [ + "console", + "number_prefix", + "portable-atomic", + "unicode-width 0.2.2", + "web-time", +] + [[package]] name = "inout" version = "0.1.4" @@ -3160,6 +3192,7 @@ dependencies = [ "futures", "hex", "indexmap", + "indicatif", "libsqlite3-sys", "miden-agglayer", "miden-block-prover", @@ -3176,6 +3209,7 @@ dependencies = [ "pretty_assertions", "rand", "rand_chacha", + "rayon", "regex", "serde", "tempfile", @@ -3783,6 +3817,12 @@ dependencies = [ "libc", ] +[[package]] +name = "number_prefix" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3" + [[package]] name = "object" version = "0.37.3" @@ -6433,6 +6473,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets 0.52.6", +] + [[package]] name = "windows-sys" version = "0.60.2" diff --git a/Cargo.toml b/Cargo.toml index 8ba0915833..57ef17ae5c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -100,9 +100,11 @@ pretty_assertions = { version = "1.4" } # prost and protox are from different authors and are _not_ released in # lockstep, nor are they adhering to semver semantics. We keep this # to avoid future breakage. +indicatif = { version = "0.17" } prost = { default-features = false, version = "=0.14.3" } protox = { version = "=0.9.1" } rand = { version = "0.9" } +rayon = { version = "1" } rand_chacha = { default-features = false, version = "0.9" } reqwest = { version = "0.13" } rstest = { version = "0.26" } diff --git a/crates/store/Cargo.toml b/crates/store/Cargo.toml index e9211f1ac8..57a5cb04c3 100644 --- a/crates/store/Cargo.toml +++ b/crates/store/Cargo.toml @@ -39,6 +39,7 @@ miden-protocol = { features = ["std", "testing"], workspace = true } pretty_assertions = { workspace = true } rand = { workspace = true } rand_chacha = { workspace = true } +rayon = { workspace = true } serde = { features = ["derive"], version = "1" } thiserror = { workspace = true } tokio = { features = ["fs", "rt-multi-thread"], workspace = true } @@ -66,6 +67,8 @@ miden-node-utils = { features = ["testing", "tracing-forest"], workspace = miden-protocol = { default-features = true, features = ["testing"], workspace = true } miden-standards = { features = ["testing"], workspace = true } rand = { workspace = true } +indicatif = { workspace = true } +rayon = { workspace = true } regex = { version = "1.11" } tempfile = { workspace = true } termtree = "1.0" diff --git a/crates/store/benches/state_load.rs b/crates/store/benches/state_load.rs index decff23aa4..3bd3f62456 100644 --- a/crates/store/benches/state_load.rs +++ b/crates/store/benches/state_load.rs @@ -23,6 +23,7 @@ use std::fmt; use std::path::PathBuf; use criterion::{Criterion, criterion_group, criterion_main}; +use indicatif::{ProgressBar, ProgressStyle}; use miden_crypto::utils::Serializable; use miden_node_store::Store; use miden_node_store::genesis::GenesisBlock; @@ -63,7 +64,7 @@ use miden_protocol::transaction::{OrderedTransactionHeaders, TransactionKernel}; use miden_protocol::{EMPTY_WORD, Felt, FieldElement, Word}; use miden_standards::account::auth::AuthSingleSig; use miden_standards::account::wallets::BasicWallet; -use tempfile::TempDir; +use rayon::prelude::*; use tracing::Subscriber; use tracing_subscriber::fmt::format::{FmtSpan, FormatEvent, FormatFields, Writer}; use tracing_subscriber::fmt::{FmtContext, FormattedFields}; @@ -76,7 +77,7 @@ use tracing_subscriber::{EnvFilter, Layer}; // ================================================================================================ /// Account counts to benchmark: 1e4, 1e5, 1e6. -const ACCOUNT_COUNTS: &[usize] = &[1_000_000, 100_000]; +const ACCOUNT_COUNTS: &[usize] = &[100_000]; /// ANSI escape: bold yellow. const BOLD_YELLOW: &str = "\x1b[1;33m"; @@ -275,13 +276,23 @@ fn build_public_account( // BENCHMARK SETUP // ================================================================================================ -/// Creates a fully bootstrapped data directory with `num_accounts` public accounts. +/// Returns a stable, reusable data directory for `num_accounts` public accounts. +/// +/// The directory is stored at `target/bench-data-{num_accounts}` relative to the workspace root +/// (via the `CARGO_MANIFEST_DIR` env var). If `miden-store.sqlite3` already exists inside it, +/// the previous bootstrap is reused and account generation is skipped entirely. Otherwise the +/// full setup runs and the result persists for future runs. /// /// Every account carries a vault asset and storage data so that `load_smt_forest` has real work -/// to do. The returned [`TempDir`] must be kept alive for the duration of the benchmark. -fn setup_data_directory(num_accounts: usize) -> (TempDir, PathBuf) { - let temp_dir = TempDir::new().expect("failed to create temp dir"); - let data_dir = temp_dir.path().to_path_buf(); +/// to do. +fn setup_data_directory(num_accounts: usize) -> PathBuf { + let cache_root = PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("../../target/bench-data") + .join(num_accounts.to_string()); + + std::fs::create_dir_all(&cache_root).expect("failed to create bench-data cache directory"); + + let data_dir = cache_root; let faucet_id = miden_protocol::account::AccountId::try_from(ACCOUNT_ID_PUBLIC_FUNGIBLE_FAUCET) .expect("valid faucet account id"); @@ -289,13 +300,35 @@ fn setup_data_directory(num_accounts: usize) -> (TempDir, PathBuf) { // Precompile the BasicWallet library once; `AccountComponentCode` is `Clone` (Arc-backed). let wallet_code: AccountComponentCode = AccountComponent::from(BasicWallet).into(); + // If the database file already exists this directory was fully bootstrapped previously. + let db_path = data_dir.join("miden-store.sqlite3"); + if db_path.exists() { + tracing::info!(num_accounts=%num_accounts, "Reusing cached data directory"); + return data_dir; + } + tracing::info!(num_accounts=%num_accounts, "Building accounts...."); - let accounts = Vec::::from_iter((0..num_accounts).map(|i| { - let src = Felt::new(i as u64 + 235230).to_bytes(); - let mut seed = [0u8; 32]; - seed[0..src.len()].copy_from_slice(&src[..]); - build_public_account(seed, faucet_id, wallet_code.clone()) - })); + let pb = ProgressBar::new(num_accounts as u64); + pb.set_style( + ProgressStyle::with_template( + "{msg} [{bar:50.cyan/blue}] {pos}/{len} accounts ({eta} remaining)", + ) + .unwrap() + .progress_chars("=> "), + ); + pb.set_message(format!("Building {num_accounts} accounts")); + let accounts: Vec = (0..num_accounts) + .into_par_iter() + .map(|i| { + let src = Felt::new(i as u64 + 235230).to_bytes(); + let mut seed = [0u8; 32]; + seed[0..src.len()].copy_from_slice(&src[..]); + let account = build_public_account(seed, faucet_id, wallet_code.clone()); + pb.inc(1); + account + }) + .collect(); + pb.finish_with_message(format!("Built {num_accounts} accounts")); tracing::info!(num_accounts=%num_accounts, "Building accounts >> DONE"); tracing::info!(num_accounts=%num_accounts, "Building genesis block...."); @@ -306,27 +339,29 @@ fn setup_data_directory(num_accounts: usize) -> (TempDir, PathBuf) { Store::bootstrap(&genesis_block, &data_dir).expect("Store::bootstrap failed"); tracing::info!(num_accounts=%num_accounts, "Bootstraping >> DONE"); - (temp_dir, data_dir) + data_dir } /// Wraps the account list in a `GenesisBlock` with consistent account-tree and nullifier-tree /// roots so that `verify_tree_consistency` passes on every `State::load` call. fn build_genesis_block(accounts: Vec) -> GenesisBlock { - let account_updates: Vec = accounts - .iter() + let (account_updates, smt_entries): (Vec, Vec<_>) = accounts + .into_par_iter() .map(|account| { - let delta = AccountDelta::try_from(account.clone()) - .expect("full-state delta should always succeed"); - BlockAccountUpdate::new( - account.id(), - account.to_commitment(), - AccountUpdateDetails::Delta(delta), - ) + let id = account.id(); + let commitment = account.to_commitment(); + + let delta = + AccountDelta::try_from(account).expect("full-state delta should always succeed"); + let update = + BlockAccountUpdate::new(id, commitment, AccountUpdateDetails::Delta(delta)); + let smt_entry = + (account_id_to_smt_key(update.account_id()), update.final_state_commitment()); + (update, smt_entry) }) - .collect(); + .unzip(); - let smt_entries = accounts.iter().map(|a| (account_id_to_smt_key(a.id()), a.to_commitment())); - let smt = LargeSmt::with_entries(MemoryStorage::default(), smt_entries) + let smt = LargeSmt::with_entries(MemoryStorage::default(), smt_entries.into_iter()) .expect("failed to build account SMT"); let account_tree = AccountTree::new(smt).expect("failed to create AccountTree"); @@ -369,7 +404,7 @@ fn bench_state_load(c: &mut Criterion) { let mut group = c.benchmark_group("state_load"); for &num_accounts in ACCOUNT_COUNTS { - let (_temp_dir, data_dir) = setup_data_directory(num_accounts); + let data_dir = setup_data_directory(num_accounts); let rt = tokio::runtime::Builder::new_multi_thread() .enable_all() diff --git a/crates/store/src/account_state_forest/mod.rs b/crates/store/src/account_state_forest/mod.rs index 58026cfe23..c6af818333 100644 --- a/crates/store/src/account_state_forest/mod.rs +++ b/crates/store/src/account_state_forest/mod.rs @@ -1,4 +1,4 @@ -use std::collections::BTreeSet; +use std::collections::{BTreeMap, BTreeSet}; use miden_crypto::hash::rpo::Rpo256; use miden_crypto::merkle::smt::ForestInMemoryBackend; @@ -341,6 +341,100 @@ impl AccountStateForest { /// # Errors /// /// Returns an error if applying a vault delta results in a negative balance. + /// Inserts a brand-new account into the forest directly from raw DB data, without going + /// through a full `Account` or `AccountDelta`. + /// + /// This is the fast path used during `load_smt_forest` at startup: the vault assets and + /// storage map entries are fetched directly from their respective tables and inserted here, + /// avoiding the expensive full-account load + `AccountDelta::try_from` conversion. + /// + /// Semantics are identical to calling `update_account` with a full-state delta: + /// - `insert_account_vault` is called unconditionally (empty vault is still recorded) + /// - `insert_account_storage` is called for every map slot with its entries + pub(crate) fn insert_account( + &mut self, + block_num: BlockNumber, + account_id: AccountId, + assets: &[Asset], + map_entries: &BTreeMap>, + ) -> Result<(), AccountStateForestError> { + // --- vault ----------------------------------------------------------- + { + let prev_root = self.get_latest_vault_root(account_id); + let lineage = Self::vault_lineage_id(account_id); + assert_eq!(prev_root, Self::empty_smt_root(), "account should not be in the forest"); + assert!( + self.forest.latest_version(lineage).is_none(), + "account should not be in the forest" + ); + + let mut entries: Vec<(Word, Word)> = Vec::new(); + for asset in assets { + match asset { + Asset::Fungible(f) => entries.push((f.vault_key().into(), (*f).into())), + Asset::NonFungible(nf) => entries.push((nf.vault_key().into(), (*nf).into())), + } + } + + let num_entries = entries.len(); + let lineage = Self::vault_lineage_id(account_id); + let operations = Self::build_forest_operations(entries); + let new_root = self.apply_forest_updates(lineage, block_num, operations); + + tracing::debug!( + target: crate::COMPONENT, + %account_id, + %block_num, + %new_root, + vault_entries = num_entries, + "Inserted vault into forest" + ); + } + + // --- storage maps ---------------------------------------------------- + for (slot_name, entries) in map_entries { + let prev_root = self.get_latest_storage_map_root(account_id, slot_name); + assert_eq!(prev_root, Self::empty_smt_root(), "account should not be in the forest"); + + let raw_map_entries: Vec<(StorageMapKey, Word)> = entries + .iter() + .filter(|&(_, v)| *v != EMPTY_WORD) + .map(|(&k, &v)| (k, v)) + .collect(); + + if raw_map_entries.is_empty() { + let lineage = Self::storage_lineage_id(account_id, slot_name); + let _new_root = self.apply_forest_updates(lineage, block_num, Vec::new()); + continue; + } + + let hashed_entries: Vec<(Word, Word)> = raw_map_entries + .iter() + .map(|(raw_key, value)| (raw_key.hash().into(), *value)) + .collect(); + + let lineage = Self::storage_lineage_id(account_id, slot_name); + assert!( + self.forest.latest_version(lineage).is_none(), + "account should not be in the forest" + ); + let operations = Self::build_forest_operations(hashed_entries); + let new_root = self.apply_forest_updates(lineage, block_num, operations); + + tracing::debug!( + target: crate::COMPONENT, + %account_id, + %block_num, + ?slot_name, + %new_root, + delta_entries = raw_map_entries.len(), + "Inserted storage map into forest" + ); + } + + Ok(()) + } + pub(crate) fn update_account( &mut self, block_num: BlockNumber, diff --git a/crates/store/src/db/mod.rs b/crates/store/src/db/mod.rs index 43df59d5e3..1184eec5d3 100644 --- a/crates/store/src/db/mod.rs +++ b/crates/store/src/db/mod.rs @@ -10,7 +10,13 @@ use miden_node_proto::generated as proto; use miden_node_utils::limiter::MAX_RESPONSE_PAYLOAD_BYTES; use miden_node_utils::tracing::OpenTelemetrySpanExt; use miden_protocol::Word; -use miden_protocol::account::{AccountHeader, AccountId, AccountStorageHeader, StorageMapKey}; +use miden_protocol::account::{ + AccountHeader, + AccountId, + AccountStorageHeader, + StorageMapKey, + StorageSlotName, +}; use miden_protocol::asset::{Asset, AssetVaultKey, FungibleAsset}; use miden_protocol::block::{BlockHeader, BlockNoteIndex, BlockNumber, SignedBlock}; use miden_protocol::crypto::merkle::SparseMerklePath; @@ -82,6 +88,7 @@ pub type Result = std::result::Result; /// The Store's database. /// /// Extends the underlying [`miden_node_db::Db`] type with functionality specific to the Store. +#[derive(Clone)] pub struct Db { db: miden_node_db::Db, } @@ -429,6 +436,25 @@ impl Db { .await } + /// Loads only the vault assets and storage map entries for a public account. + /// + /// This is the minimal data needed to populate + /// [`crate::account_state_forest::AccountStateForest`] during startup via + /// `load_smt_forest`, avoiding a full account load. + #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)] + pub async fn select_account_forest_data( + &self, + id: AccountId, + ) -> Result<(Vec, BTreeMap>)> { + self.transact("get account forest data", move |conn| { + let assets = queries::select_latest_vault_assets(conn, id)?; + let (_header, map_entries) = + queries::select_latest_account_storage_components(conn, id)?; + Ok((assets, map_entries)) + }) + .await + } + /// Loads public account details for a network account by its full account ID. #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)] pub async fn select_network_account_by_id( diff --git a/crates/store/src/db/models/queries/accounts.rs b/crates/store/src/db/models/queries/accounts.rs index 0e8ae47b60..7ab2203489 100644 --- a/crates/store/src/db/models/queries/accounts.rs +++ b/crates/store/src/db/models/queries/accounts.rs @@ -53,11 +53,11 @@ pub(crate) use at_block::{ }; mod delta; +pub(crate) use delta::select_latest_vault_assets; use delta::{ AccountStateForInsert, PartialAccountState, apply_storage_delta, - select_latest_vault_assets, select_minimal_account_state_headers, select_vault_balances_by_faucet_ids, }; diff --git a/crates/store/src/db/models/queries/accounts/delta.rs b/crates/store/src/db/models/queries/accounts/delta.rs index 4f39352397..d9a5b68fc2 100644 --- a/crates/store/src/db/models/queries/accounts/delta.rs +++ b/crates/store/src/db/models/queries/accounts/delta.rs @@ -198,7 +198,7 @@ pub(super) fn select_vault_balances_by_faucet_ids( /// FROM account_vault_assets /// WHERE account_id = ?1 AND is_latest = 1 /// ``` -pub(super) fn select_latest_vault_assets( +pub(crate) fn select_latest_vault_assets( conn: &mut SqliteConnection, account_id: AccountId, ) -> Result, DatabaseError> { diff --git a/crates/store/src/state/loader.rs b/crates/store/src/state/loader.rs index 20ef9cdc04..bcfc4f6d84 100644 --- a/crates/store/src/state/loader.rs +++ b/crates/store/src/state/loader.rs @@ -361,14 +361,50 @@ pub async fn load_mmr(db: &mut Db) -> Result Result { - use miden_protocol::account::delta::AccountDelta; - - let mut forest = AccountStateForest::new(); let mut cursor = None; + let limit = PUBLIC_ACCOUNT_IDS_PAGE_SIZE.get() * 2; + + let (tx, mut rx) = tokio::sync::mpsc::channel(100); + + let (tx2, mut rx2) = tokio::sync::mpsc::channel(limit); + + let jh1 = tokio::spawn(async move { + let mut buffer = Vec::with_capacity(limit); + loop { + let n = rx.recv_many(&mut buffer, limit).await; + if n == 0 || rx.is_closed() { + break; + } + for account_id in buffer.drain(..).flatten() { + let (assets, map_entries) = db.select_account_forest_data(account_id).await?; + tx2.send((account_id, assets, map_entries)).await.unwrap(); + } + } + Ok::<_, DatabaseError>(()) + }); + + let jh2 = tokio::task::spawn_blocking( + move || -> Result { + let mut forest = AccountStateForest::new(); + + let mut buffer = Vec::with_capacity(limit); + loop { + let n = rx2.blocking_recv_many(&mut buffer, limit); + if n == 0 || rx2.is_closed() { + break; + } + for (account_id, ref assets, ref map_entries) in buffer.drain(..) { + forest.insert_account(block_num, account_id, assets, map_entries)?; + } + } + Ok(forest) + }, + ); + loop { let page = db.select_public_account_ids_paged(PUBLIC_ACCOUNT_IDS_PAGE_SIZE, cursor).await?; @@ -376,28 +412,16 @@ pub async fn load_smt_forest( break; } - // Process each account in this page - for account_id in page.account_ids { - // TODO: Loading the full account from the database is inefficient and will need to - // go away. - let account_info = db.select_account(account_id).await?; - let account = account_info - .details - .ok_or(StateInitializationError::PublicAccountMissingDetails(account_id))?; - - // Convert the full account to a full-state delta - let delta = AccountDelta::try_from(account).map_err(|e| { - StateInitializationError::AccountToDeltaConversionFailed(e.to_string()) - })?; - - forest.update_account(block_num, &delta)?; - } + tx.send(page.account_ids).await; cursor = page.next_cursor; if cursor.is_none() { break; } } + drop(tx); + jh1.await.unwrap()?; + let forest = jh2.await.unwrap()?; Ok(forest) } From 56fab68c16ad9820d87b8b842ce847bd8d6d9f91 Mon Sep 17 00:00:00 2001 From: Bernhard Schuster Date: Tue, 17 Mar 2026 22:30:08 +0100 Subject: [PATCH 3/3] perf of startup investigation --- Cargo.lock | 25 ++++- Cargo.toml | 1 + crates/store/Cargo.toml | 4 +- crates/store/benches/state_load.rs | 35 +++++- crates/store/src/account_state_forest/mod.rs | 73 ++++++++++++ crates/store/src/db/mod.rs | 37 ++++++ .../store/src/db/models/queries/accounts.rs | 23 +++- .../src/db/models/queries/accounts/delta.rs | 97 ++++++++++++++++ crates/store/src/state/loader.rs | 105 ++++++++++-------- 9 files changed, 350 insertions(+), 50 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index eaf8e350e7..5e35d34091 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -152,6 +152,28 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.114", +] + [[package]] name = "async-trait" version = "0.1.89" @@ -3182,6 +3204,7 @@ version = "0.14.0-alpha.5" dependencies = [ "anyhow", "assert_matches", + "async-stream", "build-rs", "criterion", "deadpool", @@ -4711,7 +4734,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.4.15", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 57ef17ae5c..69bd687230 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,6 +78,7 @@ miden-crypto = { version = "0.19.7" } # External dependencies anyhow = { version = "1.0" } +async-stream = { version = "0.3" } assert_matches = { version = "1.5" } async-trait = { version = "0.1" } build-rs = { version = "0.3" } diff --git a/crates/store/Cargo.toml b/crates/store/Cargo.toml index 57a5cb04c3..09774aa5b4 100644 --- a/crates/store/Cargo.toml +++ b/crates/store/Cargo.toml @@ -21,7 +21,9 @@ deadpool-diesel = { features = ["sqlite"], version = "0.6" } diesel = { features = ["numeric", "sqlite"], version = "2.3" } diesel_migrations = { features = ["sqlite"], version = "2.3" } fs-err = { workspace = true } +async-stream = { workspace = true } futures = { workspace = true } +indicatif = { workspace = true } hex = { version = "0.4" } indexmap = { workspace = true } libsqlite3-sys = { workspace = true } @@ -67,7 +69,7 @@ miden-node-utils = { features = ["testing", "tracing-forest"], workspace = miden-protocol = { default-features = true, features = ["testing"], workspace = true } miden-standards = { features = ["testing"], workspace = true } rand = { workspace = true } -indicatif = { workspace = true } + rayon = { workspace = true } regex = { version = "1.11" } tempfile = { workspace = true } diff --git a/crates/store/benches/state_load.rs b/crates/store/benches/state_load.rs index 3bd3f62456..af45476c22 100644 --- a/crates/store/benches/state_load.rs +++ b/crates/store/benches/state_load.rs @@ -25,9 +25,9 @@ use std::path::PathBuf; use criterion::{Criterion, criterion_group, criterion_main}; use indicatif::{ProgressBar, ProgressStyle}; use miden_crypto::utils::Serializable; -use miden_node_store::Store; use miden_node_store::genesis::GenesisBlock; use miden_node_store::state::State; +use miden_node_store::{Db, Store}; use miden_node_utils::clap::StorageOptions; use miden_protocol::account::auth::{AuthScheme, PublicKeyCommitment}; use miden_protocol::account::delta::AccountUpdateDetails; @@ -439,3 +439,36 @@ criterion_group!( targets = bench_state_load ); criterion_main!(state_load); + +// TESTS +// ================================================================================================ + +#[cfg(test)] +mod tests { + use super::*; + + /// For every entry in [`ACCOUNT_COUNTS`], bootstraps (or reuses) the cached data directory + /// and asserts that the database contains exactly that many accounts. + #[test] + fn account_count_matches_setup() { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("failed to build Tokio runtime"); + + for &num_accounts in ACCOUNT_COUNTS { + let data_dir = setup_data_directory(num_accounts); + let db_path = data_dir.join("miden-store.sqlite3"); + + let count = rt.block_on(async { + let db = Db::load(db_path).await.expect("failed to open DB"); + db.count_accounts().await.expect("failed to count accounts") + }); + + assert_eq!( + count, num_accounts, + "expected {num_accounts} accounts in DB, found {count}" + ); + } + } +} diff --git a/crates/store/src/account_state_forest/mod.rs b/crates/store/src/account_state_forest/mod.rs index c6af818333..791f6a7e93 100644 --- a/crates/store/src/account_state_forest/mod.rs +++ b/crates/store/src/account_state_forest/mod.rs @@ -21,6 +21,7 @@ use miden_protocol::crypto::merkle::smt::{ LineageId, RootInfo, SMT_DEPTH, + SmtForestUpdateBatch, SmtUpdateBatch, TreeId, }; @@ -351,6 +352,78 @@ impl AccountStateForest { /// Semantics are identical to calling `update_account` with a full-state delta: /// - `insert_account_vault` is called unconditionally (empty vault is still recorded) /// - `insert_account_storage` is called for every map slot with its entries + /// Inserts a batch of brand-new accounts into the forest. + pub(crate) fn insert_accounts_batch( + &mut self, + block_num: BlockNumber, + batch: BTreeMap< + AccountId, + (Vec, BTreeMap>), + >, + ) -> Result<(), AccountStateForestError> { + let prepared: SmtForestUpdateBatch = batch + .into_iter() + .map(|(account_id, (assets, map_entries))| { + let mut forest_batch = SmtForestUpdateBatch::empty(); + + // vault + { + let entries: Vec<(Word, Word)> = assets + .iter() + .map(|asset| match asset { + Asset::Fungible(f) => (f.vault_key().into(), (*f).into()), + Asset::NonFungible(nf) => (nf.vault_key().into(), (*nf).into()), + }) + .collect(); + let ops = Self::build_forest_operations(entries); + forest_batch + .add_operations(Self::vault_lineage_id(account_id), ops.into_iter()); + } + + // storage maps + for (slot_name, entries) in map_entries { + let hashed: Vec<(Word, Word)> = entries + .iter() + .filter(|&(_, v)| *v != EMPTY_WORD) + .map(|(&k, &v)| (k.hash().into(), v)) + .collect(); + let ops = Self::build_forest_operations(hashed); + forest_batch.add_operations( + Self::storage_lineage_id(account_id, &slot_name), + ops.into_iter(), + ); + } + + forest_batch + }) + .fold(SmtForestUpdateBatch::empty(), |mut a, b| { + for (lineage, batch) in b { + a.add_operations(lineage, batch.into_iter()); + } + a + }); + + // --- sequential application ----------------------------------------- + self.update_forest(block_num, prepared); + + Ok(()) + } + + /// Applies a pre-computed [`SmtForestUpdateBatch`] to the forest in a single sequential pass. + /// + /// Every lineage must be new (no prior version). This is the only method that mutates + /// `self.forest` during bulk loading; all CPU-bound preparation happens before this call. + fn update_forest(&mut self, block_num: BlockNumber, updates: SmtForestUpdateBatch) { + let version = block_num.as_u64(); + // TODO FIXME + for (lineage, batch) in updates.into_iter() { + self.forest + .add_lineage(lineage, version, SmtUpdateBatch::new(batch.into_iter())) + .expect("forest update should succeed"); + } + } + + #[cfg(test)] pub(crate) fn insert_account( &mut self, block_num: BlockNumber, diff --git a/crates/store/src/db/mod.rs b/crates/store/src/db/mod.rs index 1184eec5d3..50066d9ee9 100644 --- a/crates/store/src/db/mod.rs +++ b/crates/store/src/db/mod.rs @@ -436,6 +436,12 @@ impl Db { .await } + /// Returns the total number of accounts in the database with `is_latest = true`. + #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)] + pub async fn count_accounts(&self) -> Result { + self.transact("count accounts", |conn| queries::count_accounts(conn)).await + } + /// Loads only the vault assets and storage map entries for a public account. /// /// This is the minimal data needed to populate @@ -455,6 +461,37 @@ impl Db { .await } + /// Loads vault assets and storage map entries for a batch of public accounts in two queries. + /// + /// Fetches all vault assets and all storage map entries for the given `account_ids` using one + /// `WHERE account_id IN (...)` query each, then groups results by account ID. This replaces + /// `N` calls to [`Self::select_account_forest_data`] with two round-trips regardless of batch + /// size. + #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)] + pub async fn select_account_forest_data_batch( + &self, + account_ids: Vec, + ) -> Result< + BTreeMap, BTreeMap>)>, + > { + self.transact("get account forest data batch", move |conn| { + let assets_by_id = queries::select_latest_vault_assets_batch(conn, &account_ids)?; + let maps_by_id = queries::select_latest_storage_map_entries_batch(conn, &account_ids)?; + + let result = account_ids + .into_iter() + .map(|id| { + let assets = assets_by_id.get(&id).cloned().unwrap_or_default(); + let maps = maps_by_id.get(&id).cloned().unwrap_or_default(); + (id, (assets, maps)) + }) + .collect(); + + Ok(result) + }) + .await + } + /// Loads public account details for a network account by its full account ID. #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)] pub async fn select_network_account_by_id( diff --git a/crates/store/src/db/models/queries/accounts.rs b/crates/store/src/db/models/queries/accounts.rs index 7ab2203489..a50b130e11 100644 --- a/crates/store/src/db/models/queries/accounts.rs +++ b/crates/store/src/db/models/queries/accounts.rs @@ -53,7 +53,6 @@ pub(crate) use at_block::{ }; mod delta; -pub(crate) use delta::select_latest_vault_assets; use delta::{ AccountStateForInsert, PartialAccountState, @@ -61,6 +60,11 @@ use delta::{ select_minimal_account_state_headers, select_vault_balances_by_faucet_ids, }; +pub(crate) use delta::{ + select_latest_storage_map_entries_batch, + select_latest_vault_assets, + select_latest_vault_assets_batch, +}; #[cfg(test)] mod tests; @@ -336,6 +340,23 @@ pub(crate) fn select_account_commitments_paged( Ok(AccountCommitmentsPage { commitments, next_cursor }) } +/// Returns the total number of accounts in the database with `is_latest = true`. +/// +/// # Raw SQL +/// +/// ```sql +/// SELECT COUNT(*) FROM accounts WHERE is_latest = 1 +/// ``` +pub(crate) fn count_accounts(conn: &mut SqliteConnection) -> Result { + use diesel::dsl::count_star; + + let n: i64 = SelectDsl::select(schema::accounts::table, count_star()) + .filter(schema::accounts::is_latest.eq(true)) + .get_result(conn)?; + + Ok(n as usize) +} + /// Page of public account IDs returned by [`select_public_account_ids_paged`]. #[derive(Debug)] pub struct PublicAccountIdsPage { diff --git a/crates/store/src/db/models/queries/accounts/delta.rs b/crates/store/src/db/models/queries/accounts/delta.rs index d9a5b68fc2..bdf5ad84e8 100644 --- a/crates/store/src/db/models/queries/accounts/delta.rs +++ b/crates/store/src/db/models/queries/accounts/delta.rs @@ -219,6 +219,103 @@ pub(crate) fn select_latest_vault_assets( .map_err(Into::into) } +/// Selects the latest vault assets for a batch of accounts in a single query. +/// +/// Returns a map from `AccountId` to its list of assets. +/// +/// # Raw SQL +/// +/// ```sql +/// SELECT account_id, asset +/// FROM account_vault_assets +/// WHERE account_id IN (...) AND is_latest = 1 +/// ``` +pub(crate) fn select_latest_vault_assets_batch( + conn: &mut SqliteConnection, + account_ids: &[AccountId], +) -> Result>, DatabaseError> { + use schema::account_vault_assets as vault; + + if account_ids.is_empty() { + return Ok(BTreeMap::new()); + } + + let id_bytes: Vec> = account_ids.iter().map(AccountId::to_bytes).collect(); + + let entries: Vec<(Vec, Option>)> = + SelectDsl::select(vault::table, (vault::account_id, vault::asset)) + .filter(vault::account_id.eq_any(&id_bytes)) + .filter(vault::is_latest.eq(true)) + .load(conn)?; + + let mut result: BTreeMap> = + account_ids.iter().map(|id| (*id, Vec::new())).collect(); + + for (account_id_bytes, maybe_asset_bytes) in entries { + let Some(asset_bytes) = maybe_asset_bytes else { + continue; + }; + let account_id = AccountId::read_from_bytes(&account_id_bytes)?; + let asset = Asset::read_from_bytes(&asset_bytes)?; + result.entry(account_id).or_default().push(asset); + } + + Ok(result) +} + +/// Selects the latest storage map entries for a batch of accounts in a single query. +/// +/// Returns a map from `AccountId` to its storage map entries (slot name → key → value). +/// +/// # Raw SQL +/// +/// ```sql +/// SELECT account_id, slot_name, key, value +/// FROM account_storage_map_values +/// WHERE account_id IN (...) AND is_latest = 1 +/// ``` +pub(crate) fn select_latest_storage_map_entries_batch( + conn: &mut SqliteConnection, + account_ids: &[AccountId], +) -> Result< + BTreeMap>>, + DatabaseError, +> { + use schema::account_storage_map_values as t; + + if account_ids.is_empty() { + return Ok(BTreeMap::new()); + } + + let id_bytes: Vec> = account_ids.iter().map(AccountId::to_bytes).collect(); + + let rows: Vec<(Vec, String, Vec, Vec)> = + SelectDsl::select(t::table, (t::account_id, t::slot_name, t::key, t::value)) + .filter(t::account_id.eq_any(&id_bytes)) + .filter(t::is_latest.eq(true)) + .load(conn)?; + + let mut result: BTreeMap>> = + account_ids.iter().map(|id| (*id, BTreeMap::new())).collect(); + + for (account_id_bytes, slot_name_str, key_bytes, value_bytes) in rows { + let account_id = AccountId::read_from_bytes(&account_id_bytes)?; + let slot_name: StorageSlotName = slot_name_str.parse().map_err(|_| { + DatabaseError::DataCorrupted(format!("Invalid slot name: {slot_name_str}")) + })?; + let key = StorageMapKey::read_from_bytes(&key_bytes)?; + let value = Word::read_from_bytes(&value_bytes)?; + result + .entry(account_id) + .or_default() + .entry(slot_name) + .or_default() + .insert(key, value); + } + + Ok(result) +} + // HELPER FUNCTIONS // ================================================================================================ diff --git a/crates/store/src/state/loader.rs b/crates/store/src/state/loader.rs index bcfc4f6d84..c2fbc2c4a7 100644 --- a/crates/store/src/state/loader.rs +++ b/crates/store/src/state/loader.rs @@ -364,64 +364,77 @@ pub async fn load_smt_forest( db: &Db, block_num: BlockNumber, ) -> Result { - let mut cursor = None; + use futures::StreamExt; + use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; - let limit = PUBLIC_ACCOUNT_IDS_PAGE_SIZE.get() * 2; + // How many DB fetches to drive concurrently. + const CONCURRENCY: usize = 64; - let (tx, mut rx) = tokio::sync::mpsc::channel(100); + let total = db.count_accounts().await? as u64; - let (tx2, mut rx2) = tokio::sync::mpsc::channel(limit); + let multi = MultiProgress::new(); + let style = + ProgressStyle::with_template("{msg:12} [{bar:50.cyan/blue}] {pos}/{len} ({eta} remaining)") + .unwrap() + .progress_chars("=> "); - let jh1 = tokio::spawn(async move { - let mut buffer = Vec::with_capacity(limit); + let pb_page = multi.add(ProgressBar::new(total)); + pb_page.set_style(style.clone()); + pb_page.set_message("paging"); + + let pb_fetch = multi.add(ProgressBar::new(total)); + pb_fetch.set_style(style.clone()); + pb_fetch.set_message("fetching"); + + let pb_insert = multi.add(ProgressBar::new(total)); + pb_insert.set_style(style); + pb_insert.set_message("inserting"); + + // Build a stream of (account_id, assets, map_entries) by paging through IDs and + // mapping each one to a fetch future, then running up to CONCURRENCY futures at once. + // Each item in the stream is a full page of account IDs. + let page_stream = async_stream::stream! { + let mut cursor = None; loop { - let n = rx.recv_many(&mut buffer, limit).await; - if n == 0 || rx.is_closed() { + let page = db + .select_public_account_ids_paged(PUBLIC_ACCOUNT_IDS_PAGE_SIZE, cursor) + .await?; + let done = page.next_cursor.is_none(); + let n = page.account_ids.len() as u64; + yield Ok::<_, StateInitializationError>(page.account_ids); + pb_page.inc(n); + if done { break; } - for account_id in buffer.drain(..).flatten() { - let (assets, map_entries) = db.select_account_forest_data(account_id).await?; - tx2.send((account_id, assets, map_entries)).await.unwrap(); - } + cursor = page.next_cursor; } - Ok::<_, DatabaseError>(()) - }); - - let jh2 = tokio::task::spawn_blocking( - move || -> Result { - let mut forest = AccountStateForest::new(); - - let mut buffer = Vec::with_capacity(limit); - loop { - let n = rx2.blocking_recv_many(&mut buffer, limit); - if n == 0 || rx2.is_closed() { - break; - } - for (account_id, ref assets, ref map_entries) in buffer.drain(..) { - forest.insert_account(block_num, account_id, assets, map_entries)?; - } - } - Ok(forest) - }, - ); - - loop { - let page = db.select_public_account_ids_paged(PUBLIC_ACCOUNT_IDS_PAGE_SIZE, cursor).await?; - - if page.account_ids.is_empty() { - break; + pb_page.finish_with_message("paged"); + }; + + // Map each page to a single batch fetch future. + let fetch_stream = page_stream.map(|res| { + let pb_fetch = pb_fetch.clone(); + async move { + let account_ids = res?; + let n = account_ids.len() as u64; + let batch = db.select_account_forest_data_batch(account_ids).await?; + pb_fetch.inc(n); + Ok::<_, StateInitializationError>(batch) } + }); - tx.send(page.account_ids).await; + let buffered = fetch_stream.buffered(CONCURRENCY); - cursor = page.next_cursor; - if cursor.is_none() { - break; - } + let mut forest = AccountStateForest::new(); + tokio::pin!(buffered); + while let Some(result) = buffered.next().await { + let batch = result?; + let n = batch.len() as u64; + forest.insert_accounts_batch(block_num, batch)?; + pb_insert.inc(n); } - drop(tx); - jh1.await.unwrap()?; - let forest = jh2.await.unwrap()?; + pb_fetch.finish_with_message("fetched"); + pb_insert.finish_with_message("inserted"); Ok(forest) }