diff --git a/CHANGELOG.md b/CHANGELOG.md index e2bc8f0b3d..7f592ccb3a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,7 @@ - Replaced blocking-in-async LargeSmt and account state forest operations in the store with wrappers using Tokio's `block_in_place()` ([#2076](https://github.com/0xMiden/node/pull/2076)). - [BREAKING] Reworked note proto types for multi-attachment support: `NoteMetadata` now carries `attachment_schemes` (repeated) and `attachments_commitment` instead of a single `attachment`. `Note` and `NetworkNote` gained an `attachments` field. `NoteSyncRecord` now embeds full `NoteMetadata` instead of `NoteMetadataHeader`. Removed `NoteAttachmentKind` enum and `NoteMetadataHeader` message ([#2078](https://github.com/0xMiden/node/pull/2078)). - [BREAKING] Changed `SyncChainMmr` endpoint: the upper end of the block range we're syncing is now the chain tip with the requested finality level. Validator signature is also returned ([#2075](https://github.com/0xMiden/node/pull/2075)). +- Added a `--tx-expiration-delta` parameter in the `ntx-builder` binary to defined expiracy of transaction for network accounts ([#2085](https://github.com/0xMiden/node/pull/2085)). - [BREAKING] Renamed `SubmitProvenTransaction` RPC endpoint to `SubmitProvenTx` ([#2094](https://github.com/0xMiden/node/pull/2094)). - [BREAKING] Renamed `SubmitProvenBatch` RPC endpoint to `SubmitProvenTxBatch` ([#2094](https://github.com/0xMiden/node/pull/2094)). diff --git a/Makefile b/Makefile index 2628151c20..7aaa672afe 100644 --- a/Makefile +++ b/Makefile @@ -108,6 +108,10 @@ install-node: ## Installs node install-validator: ## Installs validator cargo install --path bin/validator --locked +.PHONY: install-ntx-builder +install-ntx-builder: ## Installs ntx-builder + cargo install --path bin/ntx-builder --locked + .PHONY: install-remote-prover install-remote-prover: ## Install remote prover's CLI cargo install --path bin/remote-prover --bin miden-remote-prover --locked diff --git a/bin/ntx-builder/src/actor/execute.rs b/bin/ntx-builder/src/actor/execute.rs index d58ec5b00d..cf81fc89bc 100644 --- a/bin/ntx-builder/src/actor/execute.rs +++ b/bin/ntx-builder/src/actor/execute.rs @@ -30,9 +30,11 @@ use miden_protocol::transaction::{ TransactionArgs, TransactionId, TransactionInputs, + TransactionScript, }; use miden_protocol::vm::FutureMaybeSend; use miden_remote_prover_client::RemoteTransactionProver; +use miden_standards::code_builder::CodeBuilder; use miden_standards::note::AccountTargetNetworkNote; use miden_tx::auth::UnreachableAuth; use miden_tx::{ @@ -74,6 +76,19 @@ pub enum NtxError { type NtxResult = Result; +/// Compiles the tx script that sets the expiration block delta on every network transaction. +/// +/// Called once at builder startup; the resulting [`TransactionScript`] is shared across actors +/// and cloned cheaply (`Arc` internally). +pub fn compile_expiration_tx_script(tx_expiration_delta: u16) -> TransactionScript { + let script_src = format!( + "begin\n push.{tx_expiration_delta}\n exec.::miden::protocol::tx::update_expiration_block_delta\nend", + ); + CodeBuilder::new() + .compile_tx_script(script_src.as_str()) + .expect("expiration tx script should compile") +} + /// The result of a successful transaction execution. /// /// Contains the transaction ID, any notes that failed during filtering, and note scripts fetched @@ -109,10 +124,15 @@ pub struct NtxContext { /// Maximum number of VM execution cycles for network transactions. max_cycles: u32, + + /// Pre-compiled tx script that sets each submitted transaction's expiration block delta. + /// Built once at builder startup and reused for every transaction. + expiration_script: TransactionScript, } impl NtxContext { /// Creates a new [`NtxContext`] instance. + #[expect(clippy::too_many_arguments)] pub fn new( block_producer: BlockProducerClient, validator: ValidatorClient, @@ -121,6 +141,7 @@ impl NtxContext { script_cache: LruCache, db: Db, max_cycles: u32, + expiration_script: TransactionScript, ) -> Self { Self { block_producer, @@ -130,6 +151,7 @@ impl NtxContext { script_cache, db, max_cycles, + expiration_script, } } @@ -198,6 +220,10 @@ impl NtxContext { let notes = notes.into_iter().map(AccountTargetNetworkNote::into_note).collect::>(); + // The expiration script is pre-compiled at builder startup; cloning is cheap as + // TransactionScript wraps an Arc. + let expiration_script = self.expiration_script.clone(); + // VM execution (note filtering + transaction execution) is CPU-intensive and may // not yield between await points. Run it on a dedicated blocking thread while using // the parent runtime handle to drive async store callbacks. @@ -219,8 +245,12 @@ impl NtxContext { async { let (successful_notes, failed_notes) = ctx.filter_notes(&data_store, notes).await?; - let executed_tx = - Box::pin(ctx.execute(&data_store, successful_notes)).await?; + let executed_tx = Box::pin(ctx.execute( + &data_store, + successful_notes, + expiration_script, + )) + .await?; let scripts_to_cache = data_store.take_fetched_scripts(); Ok::<_, NtxError>((executed_tx, failed_notes, scripts_to_cache)) } @@ -317,14 +347,16 @@ impl NtxContext { &self, data_store: &NtxDataStore, notes: InputNotes, + expiration_script: TransactionScript, ) -> NtxResult { let executor = self.create_executor(data_store); + let tx_args = TransactionArgs::default().with_tx_script(expiration_script); Box::pin(executor.execute_transaction( data_store.account.id(), data_store.reference_block.block_num(), notes, - TransactionArgs::default(), + tx_args, )) .await .map_err(NtxError::Execution) diff --git a/bin/ntx-builder/src/actor/mod.rs b/bin/ntx-builder/src/actor/mod.rs index 4609bebecd..224721ccdc 100644 --- a/bin/ntx-builder/src/actor/mod.rs +++ b/bin/ntx-builder/src/actor/mod.rs @@ -7,6 +7,7 @@ use std::time::Duration; use anyhow::Context; use candidate::TransactionCandidate; +pub use execute::compile_expiration_tx_script; use futures::FutureExt; use miden_node_proto::domain::account::NetworkAccountId; use miden_node_utils::ErrorReport; @@ -14,7 +15,7 @@ use miden_node_utils::lru_cache::LruCache; use miden_protocol::Word; use miden_protocol::block::BlockNumber; use miden_protocol::note::{NoteScript, Nullifier}; -use miden_protocol::transaction::TransactionId; +use miden_protocol::transaction::TransactionScript; use miden_remote_prover_client::RemoteTransactionProver; use miden_tx::FailedNote; use tokio::sync::{Notify, Semaphore, mpsc}; @@ -71,7 +72,7 @@ pub struct State { } /// Per-actor configuration knobs. -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone)] pub struct ActorConfig { /// Maximum number of notes per transaction. pub max_notes_per_tx: NonZeroUsize, @@ -81,6 +82,12 @@ pub struct ActorConfig { pub idle_timeout: Duration, /// Maximum number of VM execution cycles for network transactions. pub max_cycles: u32, + /// Pre-compiled tx script that sets each submitted transaction's expiration block delta. + /// Built once at builder startup and shared by all actors. + pub expiration_script: TransactionScript, + /// Same delta encoded in [`Self::expiration_script`], retained so the actor can decide when a + /// submitted transaction must have either landed or been dropped. + pub tx_expiration_delta: u16, } // ACCOUNT ACTOR CONTEXT @@ -133,6 +140,8 @@ impl AccountActorContext { max_note_attempts: 1, idle_timeout: Duration::from_secs(60), max_cycles: 1 << 18, + expiration_script: crate::actor::compile_expiration_tx_script(5), + tx_expiration_delta: 5, }, request_tx, } @@ -147,7 +156,21 @@ impl AccountActorContext { enum ActorMode { NoViableNotes, NotesAvailable, - TransactionInflight(TransactionId), + /// The actor has just submitted a transaction and is waiting for it to either land on-chain or + /// expire before doing any more work for this account. This avoids busy-looping with the same + /// notes between submit and block commit. + /// + /// Carries the commitment of the account state we executed against and the chain tip at + /// submission time, so on each wake-up we can detect cheaply whether the tx landed (the + /// committed account commitment in the DB has moved) or definitely won't (the chain has + /// advanced past the expiration window). + /// + /// Network accounts are only ever updated by ntx-builder transactions, so a commitment change + /// here is equivalent to "some submission for this account has been included in a block." + WaitForNextBlock { + submitted_commitment: Word, + submitted_at: BlockNumber, + }, } // ACCOUNT ACTOR @@ -211,7 +234,7 @@ impl AccountActor { account_id, clients: actor_context.clients.clone(), state: actor_context.state.clone(), - config: actor_context.config, + config: actor_context.config.clone(), notify, request: actor_context.request_tx.clone(), } @@ -251,7 +274,7 @@ impl AccountActor { // Enable or disable transaction execution based on actor mode. let tx_permit_acquisition = match mode { // Disable transaction execution. - ActorMode::NoViableNotes | ActorMode::TransactionInflight(_) => { + ActorMode::NoViableNotes | ActorMode::WaitForNextBlock { .. } => { std::future::pending().boxed() }, // Enable transaction execution. @@ -266,34 +289,17 @@ impl AccountActor { }; tokio::select! { - // Handle coordinator notifications. On notification, re-evaluate state from DB. + // Handle coordinator notifications. Whether we re-evaluate depends on whether + // an in-flight submission still might land. _ = self.notify.notified() => { - match mode { - ActorMode::TransactionInflight(awaited_id) => { - // Check DB: is the inflight tx still pending? - let exists = self - .state - .db - .transaction_exists(awaited_id) - .await - .context("failed to check transaction status")?; - if exists { - mode = ActorMode::NotesAvailable; - } - }, - _ => { - mode = ActorMode::NotesAvailable; - } - } + mode = self.on_notified(mode).await?; }, // Execute transactions. permit = tx_permit_acquisition => { let _permit = permit.context("semaphore closed")?; - // Read the chain state. + // Read the chain state and query the DB for an executable candidate. let chain_state = self.state.chain.get_cloned(); - - // Query DB for latest account and available notes. let tx_candidate = self.select_candidate_from_db( account_id, chain_state, @@ -315,6 +321,47 @@ impl AccountActor { } } + /// Decides the next mode when a coordinator notification arrives. + /// + /// If we are in [`ActorMode::WaitForNextBlock`], we only transition out once either: + /// - the committed account commitment in the DB no longer matches the one we executed against, + /// which means some network transaction for this account has been included in a block, or + /// - the chain has advanced past the expiration window, in which case the submission must be + /// considered dropped. + /// + /// While neither holds we stay in `WaitForNextBlock` so the actor does not re-execute and + /// re-submit the same notes every block. In any other mode we move to + /// [`ActorMode::NotesAvailable`] so the next loop iteration re-queries the DB. + async fn on_notified(&self, mode: ActorMode) -> anyhow::Result { + let ActorMode::WaitForNextBlock { submitted_commitment, submitted_at } = mode else { + return Ok(ActorMode::NotesAvailable); + }; + + let chain_tip = self.state.chain.chain_tip_block_number(); + let blocks_elapsed = chain_tip.as_u32().saturating_sub(submitted_at.as_u32()); + let expired = blocks_elapsed >= u32::from(self.config.tx_expiration_delta); + + if expired { + // Submission can no longer be included; re-evaluate from scratch. + return Ok(ActorMode::NotesAvailable); + } + + let current = self + .state + .db + .account_commitment(self.account_id) + .await + .context("failed to read account commitment")?; + + // The account row goes away only if the network account was removed entirely. Treat that + // as "something changed" so the next iteration re-evaluates (and almost certainly idles). + if current == Some(submitted_commitment) { + Ok(ActorMode::WaitForNextBlock { submitted_commitment, submitted_at }) + } else { + Ok(ActorMode::NotesAvailable) + } + } + /// Selects a transaction candidate by querying the DB. async fn select_candidate_from_db( &self, @@ -366,7 +413,7 @@ impl AccountActor { if self .state .db - .has_committed_account(account_id) + .has_account(account_id) .await .context("failed to check for committed account")? { @@ -379,7 +426,7 @@ impl AccountActor { if self .state .db - .has_committed_account(account_id) + .has_account(account_id) .await .context("failed to check for committed account")? { @@ -407,7 +454,11 @@ impl AccountActor { account_id: NetworkAccountId, tx_candidate: TransactionCandidate, ) -> ActorMode { - let block_num = tx_candidate.chain_tip_header.block_num(); + // Captured before execution so that `WaitForNextBlock` records the state the tx was run + // against even though `tx_candidate` is moved into the executor below. + let submitted_commitment = tx_candidate.account.to_commitment(); + let submitted_at = tx_candidate.chain_tip_header.block_num(); + let block_num = submitted_at; // Execute the selected transaction. let context = execute::NtxContext::new( @@ -418,6 +469,7 @@ impl AccountActor { self.state.script_cache.clone(), self.state.db.clone(), self.config.max_cycles, + self.config.expiration_script.clone(), ); let notes = tx_candidate.notes.clone(); @@ -444,7 +496,7 @@ impl AccountActor { let failed_notes = log_failed_notes(failed); self.mark_notes_failed(&failed_notes, block_num).await; } - ActorMode::TransactionInflight(tx_id) + ActorMode::WaitForNextBlock { submitted_commitment, submitted_at } }, // Transaction execution failed. Err(err) => { diff --git a/bin/ntx-builder/src/builder.rs b/bin/ntx-builder/src/builder.rs index f2df31a69d..98a292e8d7 100644 --- a/bin/ntx-builder/src/builder.rs +++ b/bin/ntx-builder/src/builder.rs @@ -4,19 +4,18 @@ use std::sync::Arc; use anyhow::Context; use futures::Stream; use miden_node_proto::domain::account::NetworkAccountId; -use miden_node_proto::domain::mempool::MempoolEvent; -use miden_protocol::account::delta::AccountUpdateDetails; -use miden_protocol::block::BlockHeader; +use miden_protocol::block::{BlockNumber, SignedBlock}; use tokio::net::TcpListener; use tokio::sync::mpsc; use tokio::task::JoinSet; use tokio_stream::StreamExt; -use tonic::Status; use crate::NtxBuilderConfig; use crate::actor::{AccountActorContext, ActorRequest}; use crate::chain_state::SharedChainState; use crate::clients::StoreClient; +use crate::clients::store::StoreError; +use crate::committed_block::CommittedBlockEffects; use crate::coordinator::Coordinator; use crate::db::Db; use crate::server::NtxBuilderRpcServer; @@ -24,17 +23,16 @@ use crate::server::NtxBuilderRpcServer; // NETWORK TRANSACTION BUILDER // ================================================================================================ -/// A boxed, pinned stream of mempool events with a `'static` lifetime. +/// A boxed, pinned stream of committed blocks coming from the store. /// /// Boxing gives the stream a `'static` lifetime by ensuring it owns all its data, avoiding -/// complex lifetime annotations that would otherwise be required when storing `impl TryStream`. -pub(crate) type MempoolEventStream = - Pin> + Send>>; +/// complex lifetime annotations that would otherwise be required when storing `impl Stream`. +pub(crate) type BlockStream = Pin> + Send>>; /// Network transaction builder component. /// /// The network transaction builder is in charge of building transactions that consume notes -/// against network accounts. These notes are identified and communicated by the block producer. +/// against network accounts. These notes are identified by the store's committed block stream. /// The service maintains a list of unconsumed notes and periodically executes and proves /// transactions that consume them (reaching out to the store to retrieve state as necessary). /// @@ -54,8 +52,13 @@ pub struct NetworkTransactionBuilder { chain_state: Arc, /// Context shared with all account actors. actor_context: AccountActorContext, - /// Stream of mempool events from the block producer. - mempool_events: MempoolEventStream, + /// Stream of committed blocks from the store. + block_stream: BlockStream, + /// The chain tip the catch-up phase must reach before actors are spawned. + catch_up_target: BlockNumber, + /// Highest block number applied to the DB so far. Used during catch-up to decide when to + /// stop draining the stream. + last_applied_block: BlockNumber, /// Database update requests from account actors. /// /// We keep database writes centralized so this is how actors communicate @@ -72,7 +75,9 @@ impl NetworkTransactionBuilder { db: Db, chain_state: Arc, actor_context: AccountActorContext, - mempool_events: MempoolEventStream, + block_stream: BlockStream, + catch_up_target: BlockNumber, + last_applied_block: BlockNumber, actor_request_rx: mpsc::Receiver, ) -> Self { Self { @@ -82,7 +87,9 @@ impl NetworkTransactionBuilder { db, chain_state, actor_context, - mempool_events, + block_stream, + catch_up_target, + last_applied_block, actor_request_rx, } } @@ -93,18 +100,20 @@ impl NetworkTransactionBuilder { /// `GetNetworkNoteStatus` endpoint. /// /// This method: - /// 1. Optionally starts a gRPC server for note error queries - /// 2. Spawns a background task to load existing network accounts from the store - /// 3. Runs the main event loop, processing mempool events and managing actors + /// 1. Starts a gRPC server for note error queries. + /// 2. Catches up to the chain tip by draining the block stream. No actors run during this + /// phase. + /// 3. Spawns a background task to load existing network accounts from the store. + /// 4. Runs the main event loop, processing committed blocks and managing actors. /// /// # Errors /// /// Returns an error if: - /// - The mempool event stream ends unexpectedly + /// - The block stream ends unexpectedly /// - An actor encounters a fatal error /// - The account loader task fails /// - The gRPC server fails - pub async fn run(self, listener: TcpListener) -> anyhow::Result<()> { + pub async fn run(mut self, listener: TcpListener) -> anyhow::Result<()> { let mut join_set = JoinSet::new(); // Start the gRPC server. @@ -113,6 +122,9 @@ impl NetworkTransactionBuilder { server.serve(listener).await.context("ntx-builder gRPC server failed") }); + // Catch up to the chain tip before spawning any actors. + self.catch_up().await?; + join_set.spawn(self.run_event_loop()); // Wait for either the event loop or the gRPC server to complete. @@ -124,6 +136,46 @@ impl NetworkTransactionBuilder { Ok(()) } + /// Drains the block stream until the synced block reaches the catch-up target. + /// + /// During this phase the coordinator does not spawn any actors: we just apply committed-state + /// effects to the local DB and advance the shared chain state. + async fn catch_up(&mut self) -> anyhow::Result<()> { + let target = self.catch_up_target; + + if self.last_applied_block >= target { + tracing::info!( + current = %self.last_applied_block, + %target, + "ntx-builder already at or past chain tip" + ); + return Ok(()); + } + + tracing::info!( + current = %self.last_applied_block, + %target, + "ntx-builder catching up to chain tip before starting actors" + ); + + while self.last_applied_block < target { + let block = self + .block_stream + .next() + .await + .context("block stream ended during catch-up")? + .context("block stream failed during catch-up")?; + self.apply_committed_block(block).await?; + } + + tracing::info!( + tip = %self.last_applied_block, + "ntx-builder catch-up complete, starting actors" + ); + + Ok(()) + } + /// Runs the main event loop. async fn run_event_loop(mut self) -> anyhow::Result<()> { // Spawn a background task to load network accounts from the store. @@ -148,13 +200,13 @@ impl NetworkTransactionBuilder { .spawn_actor(account_id, &self.actor_context); } }, - // Handle mempool events. - event = self.mempool_events.next() => { - let event = event - .context("mempool event stream ended")? - .context("mempool event stream failed")?; + // Handle committed blocks. + block = self.block_stream.next() => { + let block = block + .context("block stream ended")? + .context("block stream failed")?; - self.handle_mempool_event(event).await?; + self.handle_committed_block(block).await?; }, // Handle account batches loaded from the store. // Once all accounts are loaded, the channel closes and this branch @@ -187,6 +239,17 @@ impl NetworkTransactionBuilder { &mut self, account_id: NetworkAccountId, ) -> Result<(), anyhow::Error> { + // Skip accounts already populated by the catch-up phase. + if self + .db + .has_account(account_id) + .await + .context("failed to check for committed account")? + { + self.coordinator.spawn_actor(account_id, &self.actor_context); + return Ok(()); + } + // Fetch account from store and write to DB. let account = self .store @@ -212,58 +275,54 @@ impl NetworkTransactionBuilder { Ok(()) } - /// Handles mempool events by writing to DB first, then notifying actors. - #[tracing::instrument(name = "ntx.builder.handle_mempool_event", skip(self, event))] - async fn handle_mempool_event(&mut self, event: MempoolEvent) -> Result<(), anyhow::Error> { - match &event { - MempoolEvent::TransactionAdded { account_delta, .. } => { - // Write event effects to DB first. - self.coordinator - .write_event(&event) - .await - .context("failed to write TransactionAdded to DB")?; - - // Spawn new actors for newly created network accounts. - if let Some(AccountUpdateDetails::Delta(delta)) = account_delta { - if delta.is_full_state() { - if let Ok(network_id) = NetworkAccountId::try_from(delta.id()) { - self.coordinator.spawn_actor(network_id, &self.actor_context); - } - } - } - let inactive_targets = self.coordinator.send_targeted(&event); - for account_id in inactive_targets { - self.coordinator.spawn_actor(account_id, &self.actor_context); - } - Ok(()) - }, - // Update chain state and notify affected actors. - MempoolEvent::BlockCommitted { header, .. } => { - // Write event effects to DB first. - let result = self - .coordinator - .write_event(&event) - .await - .context("failed to write BlockCommitted to DB")?; + /// Handles a committed block from the live stream: applies effects, updates chain state, and + /// notifies (and possibly respawns) affected actors. + #[tracing::instrument( + name = "ntx.builder.handle_committed_block", + skip(self, block), + fields(block.num = %block.header().block_num()), + )] + async fn handle_committed_block(&mut self, block: SignedBlock) -> Result<(), anyhow::Error> { + let header = block.header().clone(); + let block_num = header.block_num(); + let effects = CommittedBlockEffects::from_signed_block(&block); + let result = self + .coordinator + .apply_block(&effects) + .await + .context("failed to apply committed block to DB")?; - self.update_chain_tip(header.as_ref().clone()); - self.coordinator.notify_accounts(&result.accounts_to_notify); - Ok(()) - }, - // Notify affected actors (reverted account actors will self-cancel when they - // detect their account has been removed from the DB). - MempoolEvent::TransactionsReverted(_) => { - // Write event effects to DB first. - let result = self - .coordinator - .write_event(&event) - .await - .context("failed to write TransactionsReverted to DB")?; + self.chain_state.update_chain_tip(header, self.config.max_block_count); + self.last_applied_block = block_num; - self.coordinator.notify_accounts(&result.accounts_to_notify); - Ok(()) - }, + // Respawn inactive actors targeted by new notes and notify any active actor whose state + // changed. + let inactive_targets = self.coordinator.send_targeted(&effects); + for account_id in inactive_targets { + self.coordinator.spawn_actor(account_id, &self.actor_context); } + self.coordinator.notify_accounts(&result.accounts_to_notify); + + // Also notify every active actor so any actor currently waiting on its own submitted + // transaction wakes up, even if its account wasn't touched by this block (e.g. tx was + // dropped or expired without inclusion). + self.coordinator.notify_all(); + + Ok(()) + } + + /// Applies a committed block during the catch-up phase. Does not notify actors (there are + /// none yet). The in-memory chain state is not touched during catch-up either, since it was + /// initialized to the chain tip we are catching up to. + async fn apply_committed_block(&mut self, block: SignedBlock) -> anyhow::Result<()> { + let block_num = block.header().block_num(); + let effects = CommittedBlockEffects::from_signed_block(&block); + self.coordinator + .apply_block(&effects) + .await + .context("failed to apply committed block during catch-up")?; + self.last_applied_block = block_num; + Ok(()) } /// Processes a request from an account actor. @@ -285,9 +344,138 @@ impl NetworkTransactionBuilder { } Ok(()) } +} + +#[cfg(test)] +mod tests { + use std::path::PathBuf; + + use futures::stream; + use miden_protocol::block::SignedBlock; + use miden_protocol::crypto::merkle::mmr::{Forest, MmrPeaks, PartialMmr}; + use url::Url; + + use super::*; + use crate::NtxBuilderConfig; + use crate::actor::AccountActorContext; + use crate::clients::store::StoreError; + use crate::test_utils::{mock_block_header, mock_signed_block}; + + impl NetworkTransactionBuilder { + /// Test-only accessor for `last_applied_block`. + pub(crate) fn last_applied_block(&self) -> BlockNumber { + self.last_applied_block + } + } + + /// Constructs a `NetworkTransactionBuilder` suitable for testing `catch_up`. Only the fields + /// actually exercised during catch-up (`db`, `coordinator`, `chain_state`, `block_stream`, + /// `catch_up_target`, `last_applied_block`) are populated with real values; everything else + /// uses throwaway placeholders. + async fn builder_for_catch_up_test( + block_stream: BlockStream, + catch_up_target: BlockNumber, + last_applied_block: BlockNumber, + ) -> (NetworkTransactionBuilder, tempfile::TempDir) { + let (db, dir) = Db::test_setup().await; + let url = Url::parse("http://127.0.0.1:1").unwrap(); + let config = NtxBuilderConfig::new( + url.clone(), + url.clone(), + url.clone(), + PathBuf::from("unused.sqlite3"), + ); + let coordinator = Coordinator::new(4, 10, db.clone()); + let store = StoreClient::new(url); + let chain_mmr = PartialMmr::from_peaks(MmrPeaks::new(Forest::new(0), vec![]).unwrap()); + let chain_state = + Arc::new(SharedChainState::new(mock_block_header(0_u32.into()), chain_mmr)); + let actor_context = AccountActorContext::test(&db); + let (_request_tx, actor_request_rx) = mpsc::channel(1); + + let builder = NetworkTransactionBuilder::new( + config, + coordinator, + store, + db, + chain_state, + actor_context, + block_stream, + catch_up_target, + last_applied_block, + actor_request_rx, + ); + (builder, dir) + } + + /// Builds a `BlockStream` from a sequence of `SignedBlock`s. + fn ok_stream(blocks: Vec) -> BlockStream { + Box::pin(stream::iter(blocks.into_iter().map(Ok::<_, StoreError>))) + } + + #[tokio::test] + async fn catch_up_returns_immediately_when_already_at_tip() { + let target = BlockNumber::from(5u32); + // No blocks: if catch_up tried to pull from the stream, it would error. + let stream = ok_stream(vec![]); + + let (mut builder, _dir) = builder_for_catch_up_test(stream, target, target).await; + builder.catch_up().await.expect("catch_up should no-op when already at tip"); + + assert_eq!(builder.last_applied_block(), target); + } + + #[tokio::test] + async fn catch_up_drains_stream_to_target() { + let target = BlockNumber::from(3u32); + let blocks = vec![ + mock_signed_block(BlockNumber::from(1u32), &[], vec![]), + mock_signed_block(BlockNumber::from(2u32), &[], vec![]), + mock_signed_block(BlockNumber::from(3u32), &[], vec![]), + ]; + let stream = ok_stream(blocks); + + let (mut builder, _dir) = + builder_for_catch_up_test(stream, target, BlockNumber::from(0u32)).await; + builder.catch_up().await.expect("catch_up should drain stream up to target"); + + assert_eq!(builder.last_applied_block(), target); + } + + #[tokio::test] + async fn catch_up_errors_when_stream_ends_before_target() { + let target = BlockNumber::from(3u32); + // Stream yields only block 1, then ends. + let stream = ok_stream(vec![mock_signed_block(BlockNumber::from(1u32), &[], vec![])]); + + let (mut builder, _dir) = + builder_for_catch_up_test(stream, target, BlockNumber::from(0u32)).await; + let err = builder.catch_up().await.expect_err("catch_up should fail when stream ends"); + + assert!( + format!("{err:#}").contains("block stream ended during catch-up"), + "unexpected error message: {err:#}" + ); + // The block that did arrive should still have been applied. + assert_eq!(builder.last_applied_block(), BlockNumber::from(1u32)); + } + + #[tokio::test] + async fn catch_up_propagates_stream_error() { + let target = BlockNumber::from(2u32); + // First item is an error; catch_up should surface it without applying anything. + let stream: BlockStream = Box::pin(stream::iter(vec![Err::( + StoreError::MalformedResponse("boom".into()), + )])); + + let (mut builder, _dir) = + builder_for_catch_up_test(stream, target, BlockNumber::from(0u32)).await; + let err = builder.catch_up().await.expect_err("catch_up should propagate stream error"); - /// Updates the chain tip and prunes old blocks from the MMR. - fn update_chain_tip(&mut self, tip: BlockHeader) { - self.chain_state.update_chain_tip(tip, self.config.max_block_count); + assert!( + format!("{err:#}").contains("block stream failed during catch-up"), + "unexpected error message: {err:#}" + ); + assert_eq!(builder.last_applied_block(), BlockNumber::from(0u32)); } } diff --git a/bin/ntx-builder/src/chain_state.rs b/bin/ntx-builder/src/chain_state.rs index 12d5b79c57..6a67e1554c 100644 --- a/bin/ntx-builder/src/chain_state.rs +++ b/bin/ntx-builder/src/chain_state.rs @@ -49,19 +49,6 @@ impl ChainState { /// Updates the chain tip and prunes old blocks from the MMR. fn update_chain_tip(&mut self, tip: BlockHeader, max_block_count: usize) { - // Skip blocks already reflected in the chain state. A `BlockCommitted` event may arrive - // for a block whose state was already loaded from the store during startup: the mempool - // subscription is established first and then the chain tip is fetched, so any block - // committed in that window produces an event for state we have already ingested. - if tip.block_num() <= self.chain_tip_header.block_num() { - tracing::debug!( - event_block = %tip.block_num(), - current_tip = %self.chain_tip_header.block_num(), - "skipping BlockCommitted event for block already in chain state", - ); - return; - } - // Update MMR which lags by one block. let mmr_tip = self.chain_tip_header.clone(); Arc::make_mut(&mut self.chain_mmr).add_block(&mmr_tip, true); diff --git a/bin/ntx-builder/src/clients/block_producer.rs b/bin/ntx-builder/src/clients/block_producer.rs index b761e7dde1..10ff8584a0 100644 --- a/bin/ntx-builder/src/clients/block_producer.rs +++ b/bin/ntx-builder/src/clients/block_producer.rs @@ -1,13 +1,7 @@ -use std::time::Duration; - -use futures::{TryStream, TryStreamExt}; use miden_node_proto::clients::{BlockProducerClient as InnerBlockProducerClient, Builder}; -use miden_node_proto::domain::mempool::MempoolEvent; use miden_node_proto::generated::{self as proto}; -use miden_node_utils::FlattenResult; use miden_protocol::transaction::ProvenTransaction; use miden_protocol::utils::serde::Serializable; -use tokio_stream::StreamExt; use tonic::Status; use tracing::{info, instrument}; use url::Url; @@ -53,45 +47,4 @@ impl BlockProducerClient { Ok(()) } - - #[instrument(target = COMPONENT, name = "ntx.block_producer.client.subscribe_to_mempool", skip_all, err)] - pub async fn subscribe_to_mempool_with_retry( - &self, - ) -> Result + Send + 'static, Status> { - let mut retry_counter = 0; - loop { - match self.subscribe_to_mempool().await { - Err(err) if err.code() == tonic::Code::Unavailable => { - // Exponential backoff with base 500ms and max 30s. - let backoff = Duration::from_millis(500) - .saturating_mul(1 << retry_counter.min(6)) - .min(Duration::from_secs(30)); - - tracing::warn!( - ?backoff, - %retry_counter, - %err, - "connection failed while subscribing to the mempool, retrying" - ); - - retry_counter += 1; - tokio::time::sleep(backoff).await; - }, - result => return result, - } - } - } - - async fn subscribe_to_mempool( - &self, - ) -> Result + Send + 'static, Status> { - let stream = self.client.clone().mempool_subscription(()).await?; - - let stream = stream - .into_inner() - .map_ok(MempoolEvent::try_from) - .map(FlattenResult::flatten_result); - - Ok(stream) - } } diff --git a/bin/ntx-builder/src/clients/mod.rs b/bin/ntx-builder/src/clients/mod.rs index 19814602bb..f4841dabf0 100644 --- a/bin/ntx-builder/src/clients/mod.rs +++ b/bin/ntx-builder/src/clients/mod.rs @@ -1,5 +1,5 @@ mod block_producer; -mod store; +pub(crate) mod store; mod validator; pub use block_producer::BlockProducerClient; diff --git a/bin/ntx-builder/src/clients/store.rs b/bin/ntx-builder/src/clients/store.rs index c901fc69ab..47e2411f97 100644 --- a/bin/ntx-builder/src/clients/store.rs +++ b/bin/ntx-builder/src/clients/store.rs @@ -2,6 +2,7 @@ use std::collections::BTreeSet; use std::ops::RangeInclusive; use std::time::Duration; +use futures::{Stream, StreamExt}; use miden_node_proto::clients::{Builder, StoreNtxBuilderClient}; use miden_node_proto::decode::ConversionResultExt; use miden_node_proto::domain::account::{AccountDetails, AccountResponse, NetworkAccountId}; @@ -22,7 +23,7 @@ use miden_protocol::account::{ StorageSlotName, }; use miden_protocol::asset::{AssetVaultKey, AssetWitness, PartialVault}; -use miden_protocol::block::{BlockHeader, BlockNumber}; +use miden_protocol::block::{BlockHeader, BlockNumber, SignedBlock}; use miden_protocol::crypto::merkle::mmr::{Forest, MmrPeaks, PartialMmr}; use miden_protocol::crypto::merkle::smt::SmtProof; use miden_protocol::note::NoteScript; @@ -62,6 +63,56 @@ impl StoreClient { Self { inner: store } } + /// Opens a block subscription stream starting from `block_from` (inclusive). + /// + /// On `Unavailable` errors the connection is retried with exponential backoff. The returned + /// stream yields decoded [`SignedBlock`]s as they arrive. + #[instrument(target = COMPONENT, name = "store.client.block_subscription_with_retry", skip_all, err)] + pub async fn block_subscription_with_retry( + &self, + block_from: BlockNumber, + ) -> Result> + Send + 'static, StoreError> + { + let mut retry_counter = 0u32; + loop { + match self.block_subscription(block_from).await { + Err(StoreError::GrpcClientError(err)) if err.code() == tonic::Code::Unavailable => { + let backoff = Duration::from_millis(500) + .saturating_mul(1 << retry_counter.min(6)) + .min(Duration::from_secs(30)); + + tracing::warn!( + ?backoff, + %retry_counter, + %err, + "store connection failed while subscribing to blocks, retrying" + ); + + retry_counter += 1; + tokio::time::sleep(backoff).await; + }, + result => return result, + } + } + } + + async fn block_subscription( + &self, + block_from: BlockNumber, + ) -> Result> + Send + 'static, StoreError> + { + let request = proto::store::BlockSubscriptionRequest { block_from: block_from.as_u32() }; + + let stream = self.inner.clone().block_subscription(request).await?.into_inner(); + + Ok(stream.map(|res| { + let signed = res.map_err(StoreError::GrpcClientError)?; + SignedBlock::read_from_bytes(&signed.block).map_err(|err| { + StoreError::DeserializationError(ConversionError::from(err).context("SignedBlock")) + }) + })) + } + /// Returns the block header and MMR peaks at the current chain tip. #[instrument(target = COMPONENT, name = "store.client.get_latest_blockchain_data_with_retry", skip_all, err)] pub async fn get_latest_blockchain_data_with_retry( diff --git a/bin/ntx-builder/src/commands/mod.rs b/bin/ntx-builder/src/commands/mod.rs index e309419caa..f1523b91fc 100644 --- a/bin/ntx-builder/src/commands/mod.rs +++ b/bin/ntx-builder/src/commands/mod.rs @@ -19,10 +19,12 @@ const ENV_TX_PROVER_URL: &str = "MIDEN_NODE_NTX_BUILDER_NTX_PROVER_URL"; const ENV_SCRIPT_CACHE_SIZE: &str = "MIDEN_NODE_NTX_BUILDER_SCRIPT_CACHE_SIZE"; const ENV_MAX_CYCLES: &str = "MIDEN_NODE_NTX_BUILDER_MAX_CYCLES"; const ENV_SQLITE_CONNECTION_POOL_SIZE: &str = "MIDEN_NODE_NTX_BUILDER_SQLITE_CONNECTION_POOL_SIZE"; +const ENV_TX_EXPIRATION_DELTA: &str = "MIDEN_NODE_NTX_BUILDER_TX_EXPIRATION_DELTA"; const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(5 * 60); const DEFAULT_SCRIPT_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(1000).unwrap(); const DEFAULT_MAX_CYCLES: u32 = 1 << 18; +const DEFAULT_TX_EXPIRATION_DELTA: u16 = 5; #[derive(Parser)] #[command(version, about, long_about = None)] @@ -110,6 +112,10 @@ pub enum NtxBuilderCommand { /// OpenTelemetry documentation. See our operator manual for further details. #[arg(long = "enable-otel", default_value_t = false, env = ENV_ENABLE_OTEL, value_name = "BOOL")] enable_otel: bool, + + /// Sets the transaction expiration delta (in blocks). + #[arg(long = "tx-expiration-delta", default_value_t = DEFAULT_TX_EXPIRATION_DELTA, env = ENV_TX_EXPIRATION_DELTA, value_name = "NUM")] + tx_expiration_delta: u16, }, } @@ -128,6 +134,7 @@ impl NtxBuilderCommand { sqlite_connection_pool_size, data_directory, enable_otel: _, + tx_expiration_delta, } = self; let listener = TcpListener::bind(listen) @@ -147,7 +154,8 @@ impl NtxBuilderCommand { .with_idle_timeout(idle_timeout) .with_max_account_crashes(max_account_crashes) .with_max_cycles(max_tx_cycles) - .with_sqlite_connection_pool_size(sqlite_connection_pool_size); + .with_sqlite_connection_pool_size(sqlite_connection_pool_size) + .with_tx_expiration_delta(tx_expiration_delta); config .build() diff --git a/bin/ntx-builder/src/committed_block.rs b/bin/ntx-builder/src/committed_block.rs new file mode 100644 index 0000000000..13aafd1e18 --- /dev/null +++ b/bin/ntx-builder/src/committed_block.rs @@ -0,0 +1,61 @@ +use miden_node_proto::domain::account::NetworkAccountId; +use miden_protocol::account::delta::AccountUpdateDetails; +use miden_protocol::block::{BlockHeader, SignedBlock}; +use miden_protocol::note::Nullifier; +use miden_protocol::transaction::OutputNote; +use miden_standards::note::AccountTargetNetworkNote; + +/// Network-relevant state extracted from a committed [`SignedBlock`]. +/// +/// Produced once per committed block on the ntx-builder side; downstream code (coordinator, DB) +/// applies the contained effects to local state and notifies affected actors. +#[derive(Debug, Clone)] +pub struct CommittedBlockEffects { + pub header: BlockHeader, + pub network_notes: Vec, + pub nullifiers: Vec, + pub network_account_updates: Vec<(NetworkAccountId, AccountUpdateDetails)>, +} + +impl CommittedBlockEffects { + /// Filters the committed block down to the slice the ntx-builder cares about: network notes, + /// network-account updates, and nullifiers. + /// + /// Non-network output notes and non-network account updates are dropped. Private output notes + /// cannot be network notes (which must be public) and are skipped. + pub fn from_signed_block(block: &SignedBlock) -> Self { + let header = block.header().clone(); + let body = block.body(); + + let mut network_notes = Vec::new(); + for batch in body.output_note_batches() { + for (_idx, output_note) in batch { + if let OutputNote::Public(public) = output_note + && let Ok(network_note) = + AccountTargetNetworkNote::new(public.as_note().clone()) + { + network_notes.push(network_note); + } + } + } + + let nullifiers = body.created_nullifiers().to_vec(); + + let network_account_updates = body + .updated_accounts() + .iter() + .filter_map(|update| { + let account_id = update.account_id(); + let network_id = NetworkAccountId::try_from(account_id).ok()?; + Some((network_id, update.details().clone())) + }) + .collect(); + + Self { + header, + network_notes, + nullifiers, + network_account_updates, + } + } +} diff --git a/bin/ntx-builder/src/coordinator.rs b/bin/ntx-builder/src/coordinator.rs index 7fdb6f0d5d..4fbc2a470b 100644 --- a/bin/ntx-builder/src/coordinator.rs +++ b/bin/ntx-builder/src/coordinator.rs @@ -3,19 +3,18 @@ use std::sync::Arc; use miden_node_db::DatabaseError; use miden_node_proto::domain::account::NetworkAccountId; -use miden_node_proto::domain::mempool::MempoolEvent; -use miden_protocol::account::delta::AccountUpdateDetails; use tokio::sync::{Notify, Semaphore}; use tokio::task::JoinSet; use crate::actor::{AccountActor, AccountActorContext}; +use crate::committed_block::CommittedBlockEffects; use crate::db::Db; -// WRITE EVENT RESULT +// APPLY BLOCK RESULT // ================================================================================================ -/// Result of writing a mempool event to the database. -pub struct WriteEventResult { +/// Result of applying a committed block to the database. +pub struct ApplyBlockResult { /// Accounts that should be notified of state changes. pub accounts_to_notify: Vec, } @@ -94,7 +93,7 @@ impl ActorHandle { /// /// The coordinator operates in an event-driven manner: /// 1. Network accounts are registered and actors spawned as needed. -/// 2. Mempool events are written to DB, then actors are notified. +/// 2. Committed block effects are written to DB, then actors are notified. /// 3. Actor completion/failure events are monitored and handled. /// 4. Failed or completed actors are cleaned up from the registry. pub struct Coordinator { @@ -209,6 +208,17 @@ impl Coordinator { } } + /// Notifies every active actor that a new block has been applied. + /// + /// Called once per committed block so that actors waiting on their own submitted transaction + /// (in [`ActorMode::WaitForNextBlock`](crate::actor::ActorMode)) reliably wake up even when + /// the block did not touch their account. + pub fn notify_all(&self) { + for handle in self.actor_registry.values() { + handle.notify(); + } + } + /// Waits for the next actor to complete and handles the outcome. /// /// This method monitors the join set for actor task completion and handles @@ -256,47 +266,39 @@ impl Coordinator { } } - /// Notifies account actors that are affected by a `TransactionAdded` event. + /// Notifies account actors affected by newly created network notes or account updates in a + /// committed block. /// - /// Only actors that are currently active are notified. Since event effects are already - /// persisted in the DB by `write_event()`, actors that spawn later read their state from the - /// DB and do not need predating events. + /// Only actors that are currently active are notified. Since the block effects are already + /// persisted in the DB by `apply_block()`, actors that spawn later read their state from the + /// DB and do not need to replay predating blocks. /// /// Returns account IDs of note targets that do not have active actors (e.g. previously /// deactivated due to sterility). The caller can use this to re-activate actors for those /// accounts. - pub fn send_targeted(&self, event: &MempoolEvent) -> Vec { + pub fn send_targeted(&self, effects: &CommittedBlockEffects) -> Vec { let mut target_account_ids = HashSet::new(); let mut inactive_targets = Vec::new(); - if let MempoolEvent::TransactionAdded { network_notes, account_delta, .. } = event { - // We need to inform the account if it was updated. This lets it know that its own - // transaction has been applied, and in the future also resolves race conditions with - // external network transactions (once these are allowed). - if let Some(AccountUpdateDetails::Delta(delta)) = account_delta { - let account_id = delta.id(); - if account_id.is_network() { - let network_account_id = - account_id.try_into().expect("account is network account"); - if self.actor_registry.contains_key(&network_account_id) { - target_account_ids.insert(network_account_id); - } - } + // Notify any active actor whose network account was updated in this block. + for (account_id, _details) in &effects.network_account_updates { + if self.actor_registry.contains_key(account_id) { + target_account_ids.insert(*account_id); } + } + + // Determine target actors for each newly created network note. + for note in &effects.network_notes { + let account = NetworkAccountId::try_from(note.target_account_id()) + .expect("network note target account should be a network account"); - // Determine target actors for each note. - for note in network_notes { - let account = note.target_account_id(); - let account = NetworkAccountId::try_from(account) - .expect("network note target account should be a network account"); - - if self.actor_registry.contains_key(&account) { - target_account_ids.insert(account); - } else { - inactive_targets.push(account); - } + if self.actor_registry.contains_key(&account) { + target_account_ids.insert(account); + } else { + inactive_targets.push(account); } } + // Notify target actors. for account_id in &target_account_ids { if let Some(handle) = self.actor_registry.get(account_id) { @@ -307,48 +309,16 @@ impl Coordinator { inactive_targets } - /// Writes mempool event effects to the database. + /// Applies a committed block's effects to the database. /// - /// This must be called BEFORE sending notifications to actors. Returns a [`WriteEventResult`] - /// with the accounts to notify and cancel. - pub async fn write_event( + /// This must be called BEFORE sending notifications to actors. Returns an [`ApplyBlockResult`] + /// with the accounts that should be notified. + pub async fn apply_block( &self, - event: &MempoolEvent, - ) -> Result { - match event { - MempoolEvent::TransactionAdded { - id, - nullifiers, - network_notes, - account_delta, - } => { - self.db - .handle_transaction_added( - *id, - account_delta.clone(), - network_notes.clone(), - nullifiers.clone(), - ) - .await?; - Ok(WriteEventResult { accounts_to_notify: Vec::new() }) - }, - MempoolEvent::BlockCommitted { header, txs } => { - let affected_accounts = self - .db - .handle_block_committed( - txs.clone(), - header.block_num(), - header.as_ref().clone(), - ) - .await?; - Ok(WriteEventResult { accounts_to_notify: affected_accounts }) - }, - MempoolEvent::TransactionsReverted(tx_ids) => { - let affected_accounts = - self.db.handle_transactions_reverted(tx_ids.iter().copied().collect()).await?; - Ok(WriteEventResult { accounts_to_notify: affected_accounts }) - }, - } + effects: &CommittedBlockEffects, + ) -> Result { + let affected_accounts = self.db.apply_committed_block(effects.clone()).await?; + Ok(ApplyBlockResult { accounts_to_notify: affected_accounts }) } } @@ -363,7 +333,7 @@ impl Coordinator { #[cfg(test)] mod tests { - use miden_node_proto::domain::mempool::MempoolEvent; + use miden_protocol::block::BlockNumber; use super::*; use crate::actor::AccountActorContext; @@ -392,14 +362,14 @@ mod tests { let note_active = mock_single_target_note(active_id, 10); let note_inactive = mock_single_target_note(inactive_id, 20); - let event = MempoolEvent::TransactionAdded { - id: mock_tx_id(1), - nullifiers: vec![], + let effects = CommittedBlockEffects { + header: mock_block_header(BlockNumber::from(1)), network_notes: vec![note_active, note_inactive], - account_delta: None, + nullifiers: vec![], + network_account_updates: vec![], }; - let inactive_targets = coordinator.send_targeted(&event); + let inactive_targets = coordinator.send_targeted(&effects); assert_eq!(inactive_targets.len(), 1); assert_eq!(inactive_targets[0], inactive_id); diff --git a/bin/ntx-builder/src/db/migrations/2026020900000_setup/up.sql b/bin/ntx-builder/src/db/migrations/2026020900000_setup/up.sql index 46d71689c0..b151cb3345 100644 --- a/bin/ntx-builder/src/db/migrations/2026020900000_setup/up.sql +++ b/bin/ntx-builder/src/db/migrations/2026020900000_setup/up.sql @@ -11,32 +11,15 @@ CREATE TABLE chain_state ( CONSTRAINT chain_state_block_num_is_u32 CHECK (block_num BETWEEN 0 AND 0xFFFFFFFF) ); --- Account states: both committed and inflight. --- Committed rows have transaction_id = NULL. Inflight rows have transaction_id set. --- The auto-incrementing order_id preserves insertion order (VecDeque semantics). +-- Network account states, one row per network account. CREATE TABLE accounts ( - -- Auto-incrementing ID preserves insertion order. - order_id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, -- AccountId serialized bytes (8 bytes). - account_id BLOB NOT NULL, + account_id BLOB PRIMARY KEY, -- Serialized Account state. - account_data BLOB NOT NULL, - -- NULL if this is the committed state; transaction ID if inflight. - transaction_id BLOB -); - --- At most one committed row per account. -CREATE UNIQUE INDEX idx_accounts_committed ON accounts(account_id) WHERE transaction_id IS NULL; --- At most one inflight row per (account, transaction) pair. -CREATE UNIQUE INDEX idx_accounts_inflight ON accounts(account_id, transaction_id) - WHERE transaction_id IS NOT NULL; -CREATE INDEX idx_accounts_account ON accounts(account_id); -CREATE INDEX idx_accounts_tx ON accounts(transaction_id) WHERE transaction_id IS NOT NULL; + account_data BLOB NOT NULL +) WITHOUT ROWID; --- Notes: committed, inflight, and nullified — all in one table. --- created_by = NULL means committed note; non-NULL means created by inflight tx. --- consumed_by = NULL means unconsumed; non-NULL means consumed by inflight tx. --- committed_at = block number when the consuming transaction was committed on-chain. +-- Network notes received from committed blocks, keyed by nullifier. CREATE TABLE notes ( -- Nullifier bytes (32 bytes). Primary key. nullifier BLOB PRIMARY KEY, @@ -52,12 +35,8 @@ CREATE TABLE notes ( last_attempt INTEGER, -- Latest execution error message. NULL if no error recorded. last_error TEXT, - -- NULL if the note came from a committed block; transaction ID if created by inflight tx. - created_by BLOB, - -- NULL if unconsumed; transaction ID of the consuming inflight tx. - consumed_by BLOB, -- Block number at which the note's consuming transaction was committed. - -- NULL while the note is still pending or in-flight; set on block commit. + -- NULL while the note is still pending; set on block commit. committed_at INTEGER, CONSTRAINT notes_attempt_count_non_negative CHECK (attempt_count >= 0), @@ -66,8 +45,6 @@ CREATE TABLE notes ( ) WITHOUT ROWID; CREATE INDEX idx_notes_account ON notes(account_id); -CREATE INDEX idx_notes_created_by ON notes(created_by) WHERE created_by IS NOT NULL; -CREATE INDEX idx_notes_consumed_by ON notes(consumed_by) WHERE consumed_by IS NOT NULL; CREATE INDEX idx_notes_note_id ON notes(note_id) WHERE note_id IS NOT NULL; -- Persistent cache of note scripts, keyed by script root hash. diff --git a/bin/ntx-builder/src/db/mod.rs b/bin/ntx-builder/src/db/mod.rs index a7644f485f..5b8820d61b 100644 --- a/bin/ntx-builder/src/db/mod.rs +++ b/bin/ntx-builder/src/db/mod.rs @@ -6,13 +6,12 @@ use miden_node_db::DatabaseError; use miden_node_proto::domain::account::NetworkAccountId; use miden_protocol::Word; use miden_protocol::account::Account; -use miden_protocol::account::delta::AccountUpdateDetails; use miden_protocol::block::{BlockHeader, BlockNumber}; use miden_protocol::note::{NoteId, NoteScript, Nullifier}; -use miden_protocol::transaction::TransactionId; use miden_standards::note::AccountTargetNetworkNote; use tracing::{info, instrument}; +use crate::committed_block::CommittedBlockEffects; use crate::db::migrations::apply_migrations; use crate::db::models::queries; use crate::{COMPONENT, NoteError}; @@ -94,19 +93,10 @@ impl Db { .await } - /// Returns `true` when an inflight account row exists with the given transaction ID. - pub async fn transaction_exists(&self, tx_id: TransactionId) -> Result { + /// Returns `true` if a row exists for the given account. + pub async fn has_account(&self, account_id: NetworkAccountId) -> Result { self.inner - .query("transaction_exists", move |conn| queries::transaction_exists(conn, &tx_id)) - .await - } - - /// Returns `true` if a committed account state exists for the given account. - pub async fn has_committed_account(&self, account_id: NetworkAccountId) -> Result { - self.inner - .query("has_committed_account", move |conn| { - Ok(queries::get_committed_account(conn, account_id)?.is_some()) - }) + .query("has_account", move |conn| Ok(queries::get_account(conn, account_id)?.is_some())) .await } @@ -127,6 +117,19 @@ impl Db { .await } + /// Returns the commitment of the stored account state, or `None` if no row exists. + /// + /// Used by an account actor in `WaitForNextBlock` to detect when its in-flight submission has + /// been included in a block, at which point the on-chain account commitment differs from the + /// one the tx was executed against. + pub async fn account_commitment(&self, account_id: NetworkAccountId) -> Result> { + self.inner + .query("account_commitment", move |conn| { + Ok(queries::get_account(conn, account_id)?.map(|a| a.to_commitment())) + }) + .await + } + /// Marks notes as failed by incrementing `attempt_count`, setting `last_attempt`, and storing /// the latest error message. pub async fn notes_failed( @@ -149,54 +152,24 @@ impl Db { .await } - /// Handles a `TransactionAdded` mempool event by writing effects to the DB. - pub async fn handle_transaction_added( - &self, - tx_id: TransactionId, - account_delta: Option, - notes: Vec, - nullifiers: Vec, - ) -> Result<()> { - self.inner - .transact("handle_transaction_added", move |conn| { - queries::add_transaction(conn, &tx_id, account_delta.as_ref(), ¬es, &nullifiers) - }) - .await - } - - /// Handles a `BlockCommitted` mempool event by committing transaction effects. + /// Applies a committed block's effects to the database in a single transaction. /// /// Returns the list of affected account IDs that should be notified. - pub async fn handle_block_committed( - &self, - txs: Vec, - block_num: BlockNumber, - header: BlockHeader, - ) -> Result> { - self.inner - .transact("handle_block_committed", move |conn| { - queries::commit_block(conn, &txs, block_num, &header) - }) - .await - } - - /// Handles a `TransactionsReverted` mempool event by undoing transaction effects. - /// - /// Returns all affected account IDs that should be notified. - pub async fn handle_transactions_reverted( + pub async fn apply_committed_block( &self, - tx_ids: Vec, + effects: CommittedBlockEffects, ) -> Result> { self.inner - .transact("handle_transactions_reverted", move |conn| { - queries::revert_transaction(conn, &tx_ids) + .transact("apply_committed_block", move |conn| { + queries::apply_committed_block(conn, &effects) }) .await } - /// Purges all inflight state. Called on startup to get a clean slate. - pub async fn purge_inflight(&self) -> Result<()> { - self.inner.transact("purge_inflight", queries::purge_inflight).await + /// Reads the singleton chain state row, returning the last synced block number and its header + /// if any block has been applied locally. + pub async fn get_chain_state(&self) -> Result> { + self.inner.query("get_chain_state", queries::select_chain_state).await } /// Inserts or replaces the singleton chain state row. @@ -221,7 +194,7 @@ impl Db { ) -> Result<()> { self.inner .transact("sync_account_from_store", move |conn| { - queries::upsert_committed_account(conn, account_id, &account)?; + queries::upsert_account(conn, account_id, &account)?; queries::insert_committed_notes(conn, ¬es)?; Ok(()) }) diff --git a/bin/ntx-builder/src/db/models/account_effect.rs b/bin/ntx-builder/src/db/models/account_effect.rs index 7a6acf0058..37250a342b 100644 --- a/bin/ntx-builder/src/db/models/account_effect.rs +++ b/bin/ntx-builder/src/db/models/account_effect.rs @@ -1,4 +1,3 @@ -use miden_node_proto::domain::account::NetworkAccountId; use miden_protocol::account::delta::AccountUpdateDetails; use miden_protocol::account::{Account, AccountDelta, AccountId}; @@ -28,11 +27,6 @@ impl NetworkAccountEffect { update.protocol_account_id().is_network().then_some(update) } - pub fn network_account_id(&self) -> NetworkAccountId { - // SAFETY: This is a network account by construction. - self.protocol_account_id().try_into().unwrap() - } - fn protocol_account_id(&self) -> AccountId { match self { NetworkAccountEffect::Created(acc) => acc.id(), diff --git a/bin/ntx-builder/src/db/models/conv.rs b/bin/ntx-builder/src/db/models/conv.rs index 0fed2b9593..b6842fa87e 100644 --- a/bin/ntx-builder/src/db/models/conv.rs +++ b/bin/ntx-builder/src/db/models/conv.rs @@ -6,7 +6,6 @@ use miden_protocol::Word; use miden_protocol::account::{Account, AccountId}; use miden_protocol::block::{BlockHeader, BlockNumber}; use miden_protocol::note::{NoteId, NoteScript, Nullifier}; -use miden_protocol::transaction::TransactionId; use miden_protocol::utils::serde::{Deserializable, Serializable}; // SERIALIZATION (domain → DB) @@ -24,10 +23,6 @@ pub fn network_account_id_to_bytes(id: NetworkAccountId) -> Vec { id.inner().to_bytes() } -pub fn transaction_id_to_bytes(id: &TransactionId) -> Vec { - id.to_bytes() -} - pub fn nullifier_to_bytes(nullifier: &Nullifier) -> Vec { nullifier.to_bytes() } @@ -73,3 +68,8 @@ pub fn note_script_to_bytes(script: &NoteScript) -> Vec { pub fn note_script_from_bytes(bytes: &[u8]) -> Result { NoteScript::read_from_bytes(bytes).map_err(|e| DatabaseError::deserialization("note script", e)) } + +pub fn block_header_from_bytes(bytes: &[u8]) -> Result { + BlockHeader::read_from_bytes(bytes) + .map_err(|e| DatabaseError::deserialization("block header", e)) +} diff --git a/bin/ntx-builder/src/db/models/queries/accounts.rs b/bin/ntx-builder/src/db/models/queries/accounts.rs index 79035918e0..e89ce7a580 100644 --- a/bin/ntx-builder/src/db/models/queries/accounts.rs +++ b/bin/ntx-builder/src/db/models/queries/accounts.rs @@ -1,11 +1,9 @@ //! Account-related queries and models. -use diesel::dsl::exists; use diesel::prelude::*; use miden_node_db::DatabaseError; use miden_node_proto::domain::account::NetworkAccountId; use miden_protocol::account::Account; -use miden_protocol::transaction::TransactionId; use crate::db::models::conv as conversions; use crate::db::schema; @@ -13,16 +11,13 @@ use crate::db::schema; // MODELS // ================================================================================================ -/// Row for inserting into the unified `accounts` table. -/// -/// `transaction_id = None` means committed; `Some(tx_id_bytes)` means inflight. +/// Row for inserting into the `accounts` table. #[derive(Debug, Clone, Insertable)] #[diesel(table_name = schema::accounts)] #[diesel(check_for_backend(diesel::sqlite::Sqlite))] pub struct AccountInsert { pub account_id: Vec, pub account_data: Vec, - pub transaction_id: Option>, } /// Row read from `accounts`. @@ -36,54 +31,32 @@ pub struct AccountRow { // QUERIES // ================================================================================================ -/// Inserts or replaces the committed account state (`transaction_id = NULL`). -/// -/// Deletes any existing committed row first, then inserts a fresh one. +/// Inserts or replaces the account state. /// /// # Raw SQL /// /// ```sql -/// DELETE FROM accounts WHERE account_id = ?1 AND transaction_id IS NULL -/// -/// INSERT INTO accounts (account_id, account_data, transaction_id) -/// VALUES (?1, ?2, NULL) +/// INSERT OR REPLACE INTO accounts (account_id, account_data) VALUES (?1, ?2) /// ``` -pub fn upsert_committed_account( +pub fn upsert_account( conn: &mut SqliteConnection, account_id: NetworkAccountId, account: &Account, ) -> Result<(), DatabaseError> { - let account_id_bytes = conversions::network_account_id_to_bytes(account_id); - - // Delete the existing committed row (if any). - diesel::delete( - schema::accounts::table - .filter(schema::accounts::account_id.eq(&account_id_bytes)) - .filter(schema::accounts::transaction_id.is_null()), - ) - .execute(conn)?; - - // Insert the new committed row. let row = AccountInsert { - account_id: account_id_bytes, + account_id: conversions::network_account_id_to_bytes(account_id), account_data: conversions::account_to_bytes(account), - transaction_id: None, }; - diesel::insert_into(schema::accounts::table).values(&row).execute(conn)?; + diesel::replace_into(schema::accounts::table).values(&row).execute(conn)?; Ok(()) } -/// Returns the latest account state: last inflight row (highest `order_id`), or committed if -/// none. +/// Returns the account state, or `None` if no row exists. /// /// # Raw SQL /// /// ```sql -/// SELECT account_data -/// FROM accounts -/// WHERE account_id = ?1 -/// ORDER BY order_id DESC -/// LIMIT 1 +/// SELECT account_data FROM accounts WHERE account_id = ?1 LIMIT 1 /// ``` pub fn get_account( conn: &mut SqliteConnection, @@ -91,37 +64,8 @@ pub fn get_account( ) -> Result, DatabaseError> { let account_id_bytes = conversions::network_account_id_to_bytes(account_id); - // ORDER BY order_id DESC returns the latest inflight first, then committed. - let row: Option = schema::accounts::table - .filter(schema::accounts::account_id.eq(&account_id_bytes)) - .order(schema::accounts::order_id.desc()) - .select(AccountRow::as_select()) - .first(conn) - .optional()?; - - row.map(|AccountRow { account_data, .. }| conversions::account_from_bytes(&account_data)) - .transpose() -} - -/// Returns the committed account state (`transaction_id IS NULL`), ignoring any inflight rows. -/// -/// # Raw SQL -/// -/// ```sql -/// SELECT account_data -/// FROM accounts -/// WHERE account_id = ?1 AND transaction_id IS NULL -/// LIMIT 1 -/// ``` -pub fn get_committed_account( - conn: &mut SqliteConnection, - account_id: NetworkAccountId, -) -> Result, DatabaseError> { - let account_id_bytes = conversions::network_account_id_to_bytes(account_id); - let row: Option = schema::accounts::table .filter(schema::accounts::account_id.eq(&account_id_bytes)) - .filter(schema::accounts::transaction_id.is_null()) .select(AccountRow::as_select()) .first(conn) .optional()?; @@ -129,24 +73,3 @@ pub fn get_committed_account( row.map(|AccountRow { account_data, .. }| conversions::account_from_bytes(&account_data)) .transpose() } - -/// Returns `true` when an inflight account row exists with the given `transaction_id`. -/// -/// # Raw SQL -/// -/// ```sql -/// SELECT EXISTS (SELECT 1 FROM accounts WHERE transaction_id = ?1) -/// ``` -pub fn transaction_exists( - conn: &mut SqliteConnection, - tx_id: &TransactionId, -) -> Result { - let tx_id_bytes = conversions::transaction_id_to_bytes(tx_id); - - let result: bool = diesel::select(exists( - schema::accounts::table.filter(schema::accounts::transaction_id.eq(&tx_id_bytes)), - )) - .get_result(conn)?; - - Ok(result) -} diff --git a/bin/ntx-builder/src/db/models/queries/chain_state.rs b/bin/ntx-builder/src/db/models/queries/chain_state.rs index 9b529cadc5..8f55124e3c 100644 --- a/bin/ntx-builder/src/db/models/queries/chain_state.rs +++ b/bin/ntx-builder/src/db/models/queries/chain_state.rs @@ -20,6 +20,14 @@ pub struct ChainStateInsert { pub block_header: Vec, } +#[derive(Debug, Clone, Queryable, Selectable)] +#[diesel(table_name = schema::chain_state)] +#[diesel(check_for_backend(diesel::sqlite::Sqlite))] +pub struct ChainStateRow { + pub block_num: i64, + pub block_header: Vec, +} + // QUERIES // ================================================================================================ @@ -44,3 +52,27 @@ pub fn upsert_chain_state( diesel::replace_into(schema::chain_state::table).values(&row).execute(conn)?; Ok(()) } + +/// Reads the singleton chain state row, if any. +/// +/// # Raw SQL +/// +/// ```sql +/// SELECT block_num, block_header FROM chain_state WHERE id = 0 +/// ``` +pub fn select_chain_state( + conn: &mut SqliteConnection, +) -> Result, DatabaseError> { + let row: Option = schema::chain_state::table + .find(0i32) + .select(ChainStateRow::as_select()) + .first(conn) + .optional()?; + + row.map(|row| { + let block_num = conversions::block_num_from_i64(row.block_num); + let header = conversions::block_header_from_bytes(&row.block_header)?; + Ok((block_num, header)) + }) + .transpose() +} diff --git a/bin/ntx-builder/src/db/models/queries/mod.rs b/bin/ntx-builder/src/db/models/queries/mod.rs index 1bf74a66ba..5aedd7e479 100644 --- a/bin/ntx-builder/src/db/models/queries/mod.rs +++ b/bin/ntx-builder/src/db/models/queries/mod.rs @@ -5,14 +5,9 @@ use std::collections::HashSet; use diesel::prelude::*; use miden_node_db::DatabaseError; use miden_node_proto::domain::account::NetworkAccountId; -use miden_protocol::account::delta::AccountUpdateDetails; -use miden_protocol::block::{BlockHeader, BlockNumber}; -use miden_protocol::note::Nullifier; -use miden_protocol::transaction::TransactionId; -use miden_protocol::utils::serde::Serializable; -use miden_standards::note::AccountTargetNetworkNote; use super::account_effect::NetworkAccountEffect; +use crate::committed_block::CommittedBlockEffects; use crate::db::models::conv as conversions; use crate::db::schema; @@ -31,309 +26,86 @@ pub use notes::*; #[cfg(test)] mod tests; -// STARTUP QUERIES +// COMMITTED BLOCK HANDLER // ================================================================================================ -/// Purges all inflight state. Called on startup to get a clean state. +/// Applies the network-relevant effects of a committed block to the local DB. /// -/// - Deletes account rows with `transaction_id IS NOT NULL`. -/// - Deletes note rows with `created_by IS NOT NULL`. -/// - Sets `consumed_by = NULL` on notes consumed by inflight transactions. +/// In a single transaction: +/// - Updates or creates network account state from the block's account updates. +/// - Inserts newly created network notes. +/// - Marks notes consumed by the block's nullifiers as committed at this block. +/// - Updates the chain state singleton to point at this block. /// -/// # Raw SQL -/// -/// ```sql -/// DELETE FROM accounts WHERE transaction_id IS NOT NULL -/// -/// DELETE FROM notes WHERE created_by IS NOT NULL -/// -/// UPDATE notes SET consumed_by = NULL WHERE consumed_by IS NOT NULL AND committed_at IS NULL -/// ``` -pub fn purge_inflight(conn: &mut SqliteConnection) -> Result<(), DatabaseError> { - // Delete inflight account rows. - diesel::delete(schema::accounts::table.filter(schema::accounts::transaction_id.is_not_null())) - .execute(conn)?; - - // Delete inflight-created notes. - diesel::delete(schema::notes::table.filter(schema::notes::created_by.is_not_null())) - .execute(conn)?; - - // Un-nullify notes consumed by inflight transactions (skip committed notes). - diesel::update( - schema::notes::table - .filter(schema::notes::consumed_by.is_not_null()) - .filter(schema::notes::committed_at.is_null()), - ) - .set(schema::notes::consumed_by.eq(None::>)) - .execute(conn)?; - - Ok(()) -} - -// MEMPOOL EVENT HANDLERS -// ================================================================================================ - -/// Handles a `TransactionAdded` event by writing effects to the DB. -/// -/// # Raw SQL -/// -/// For account updates (applies delta to latest state and inserts inflight row): -/// -/// ```sql -/// -- Fetch latest account (see latest_account) -/// INSERT INTO accounts (account_id, transaction_id, account_data) -/// VALUES (?1, ?2, ?3) -/// ``` -/// -/// Per note (idempotent via `INSERT OR IGNORE`): -/// -/// ```sql -/// INSERT OR IGNORE INTO notes -/// (nullifier, account_id, note_data, attempt_count, last_attempt, created_by, consumed_by) -/// VALUES (?1, ?2, ?3, 0, NULL, ?4, NULL) -/// ``` -/// -/// Per nullifier (marks notes as consumed): -/// -/// ```sql -/// UPDATE notes -/// SET consumed_by = ?1 -/// WHERE nullifier = ?2 AND consumed_by IS NULL -/// ``` -pub fn add_transaction( +/// Returns the set of network account IDs whose state changed and whose active actors (if any) +/// should be notified to re-evaluate their work. +pub fn apply_committed_block( conn: &mut SqliteConnection, - tx_id: &TransactionId, - account_delta: Option<&AccountUpdateDetails>, - notes: &[AccountTargetNetworkNote], - nullifiers: &[Nullifier], -) -> Result<(), DatabaseError> { - let tx_id_bytes = conversions::transaction_id_to_bytes(tx_id); - - // Process account delta. - if let Some(update) = account_delta.and_then(NetworkAccountEffect::from_protocol) { - let account_id = update.network_account_id(); - match update { - NetworkAccountEffect::Updated(ref account_delta) => { - // Query latest_account, apply delta, insert inflight row. - let current_account = - get_account(conn, account_id)?.expect("account must exist to apply delta"); - let mut updated = current_account; - updated.apply_delta(account_delta).expect( - "network account delta should apply since it was accepted by the mempool", - ); + effects: &CommittedBlockEffects, +) -> Result, DatabaseError> { + let block_num = effects.header.block_num(); + let mut affected_accounts: HashSet = HashSet::new(); - let insert = AccountInsert { - account_id: conversions::network_account_id_to_bytes(account_id), - transaction_id: Some(tx_id_bytes.clone()), - account_data: conversions::account_to_bytes(&updated), - }; - diesel::insert_into(schema::accounts::table).values(&insert).execute(conn)?; + // Apply account updates. + for (network_id, details) in &effects.network_account_updates { + let Some(effect) = NetworkAccountEffect::from_protocol(details) else { + continue; + }; + match effect { + NetworkAccountEffect::Created(account) => { + upsert_account(conn, *network_id, &account)?; }, - NetworkAccountEffect::Created(ref account) => { - let insert = AccountInsert { - account_id: conversions::network_account_id_to_bytes(account_id), - transaction_id: Some(tx_id_bytes.clone()), - account_data: conversions::account_to_bytes(account), - }; - diesel::insert_into(schema::accounts::table).values(&insert).execute(conn)?; + NetworkAccountEffect::Updated(delta) => { + let mut account = get_account(conn, *network_id)?.ok_or_else(|| { + DatabaseError::Io(std::io::Error::other(format!( + "account {network_id} must exist to apply committed delta from block \ + {block_num}" + ))) + })?; + account + .apply_delta(&delta) + .expect("committed account delta should apply cleanly"); + upsert_account(conn, *network_id, &account)?; }, } + affected_accounts.insert(*network_id); } - // Insert notes with created_by = tx_id. - // Uses INSERT OR IGNORE to make this idempotent if the same event is delivered twice - // (the nullifier PK would otherwise cause a constraint violation). - for note in notes { - let insert = NoteInsert { - nullifier: conversions::nullifier_to_bytes(¬e.as_note().nullifier()), - account_id: conversions::network_account_id_to_bytes( - note.target_account_id() - .try_into() - .expect("network note's target account must be a network account"), - ), - note_data: note.as_note().to_bytes(), - note_id: Some(conversions::note_id_to_bytes(¬e.as_note().id())), - attempt_count: 0, - last_attempt: None, - last_error: None, - created_by: Some(tx_id_bytes.clone()), - consumed_by: None, - committed_at: None, - }; - diesel::insert_or_ignore_into(schema::notes::table) - .values(&insert) - .execute(conn)?; + // Insert newly created network notes. + if !effects.network_notes.is_empty() { + for note in &effects.network_notes { + let target_id = NetworkAccountId::try_from(note.target_account_id()) + .expect("network note's target account must be a network account"); + affected_accounts.insert(target_id); + } + insert_committed_notes(conn, &effects.network_notes)?; } - // Mark consumed notes: set consumed_by = tx_id for matching nullifiers. - for nullifier in nullifiers { + // Mark consumed notes as committed. + let block_num_val = conversions::block_num_to_i64(block_num); + for nullifier in &effects.nullifiers { let nullifier_bytes = conversions::nullifier_to_bytes(nullifier); + // Collect affected account ids for matching notes before updating. + let matched_accounts: Vec> = schema::notes::table + .filter(schema::notes::nullifier.eq(&nullifier_bytes)) + .filter(schema::notes::committed_at.is_null()) + .select(schema::notes::account_id) + .load(conn)?; + for account_id_bytes in &matched_accounts { + affected_accounts.insert(conversions::network_account_id_from_bytes(account_id_bytes)?); + } - // Only mark notes that are not already consumed. diesel::update( schema::notes::table .find(&nullifier_bytes) - .filter(schema::notes::consumed_by.is_null()), + .filter(schema::notes::committed_at.is_null()), ) - .set(schema::notes::consumed_by.eq(Some(&tx_id_bytes))) + .set(schema::notes::committed_at.eq(Some(block_num_val))) .execute(conn)?; } - Ok(()) -} - -/// Handles a `BlockCommitted` event by committing transaction effects. -/// -/// # Raw SQL -/// -/// Per committed transaction: -/// -/// ```sql -/// -- Find inflight accounts for this tx -/// SELECT account_id FROM accounts WHERE transaction_id = ?1 -/// -/// -- Delete old committed row -/// DELETE FROM accounts WHERE account_id = ?1 AND transaction_id IS NULL -/// -/// -- Promote inflight row to committed -/// UPDATE accounts SET transaction_id = NULL -/// WHERE account_id = ?1 AND transaction_id = ?2 -/// -/// -- Mark consumed notes as committed -/// UPDATE notes SET committed_at = ?block_num WHERE consumed_by = ?1 -/// -/// -- Promote inflight-created notes to committed -/// UPDATE notes SET created_by = NULL WHERE created_by = ?1 -/// ``` -/// -/// Finally updates chain state (see [`upsert_chain_state`]). -pub fn commit_block( - conn: &mut SqliteConnection, - tx_ids: &[TransactionId], - block_num: BlockNumber, - block_header: &BlockHeader, -) -> Result, DatabaseError> { - let mut affected_accounts = HashSet::new(); - - for tx_id in tx_ids { - let tx_id_bytes = conversions::transaction_id_to_bytes(tx_id); - - // Promote inflight account rows: delete old committed, set transaction_id = NULL. - // Find accounts that have an inflight row for this tx. - let inflight_account_ids: Vec> = schema::accounts::table - .filter(schema::accounts::transaction_id.eq(&tx_id_bytes)) - .select(schema::accounts::account_id) - .load(conn)?; - - for account_id_bytes in &inflight_account_ids { - affected_accounts.insert(conversions::network_account_id_from_bytes(account_id_bytes)?); - - // Delete the old committed row for this account. - diesel::delete( - schema::accounts::table - .filter(schema::accounts::account_id.eq(account_id_bytes)) - .filter(schema::accounts::transaction_id.is_null()), - ) - .execute(conn)?; - - // Promote the inflight row to committed (set transaction_id = NULL). - // Only promote the row for this specific tx. - diesel::update( - schema::accounts::table - .filter(schema::accounts::account_id.eq(account_id_bytes)) - .filter(schema::accounts::transaction_id.eq(&tx_id_bytes)), - ) - .set(schema::accounts::transaction_id.eq(None::>)) - .execute(conn)?; - } - - // Collect accounts of notes consumed by this tx. - let consumed_note_accounts: Vec> = schema::notes::table - .filter(schema::notes::consumed_by.eq(&tx_id_bytes)) - .select(schema::notes::account_id) - .load(conn)?; - for account_id_bytes in &consumed_note_accounts { - affected_accounts.insert(conversions::network_account_id_from_bytes(account_id_bytes)?); - } - - // Mark consumed notes as committed (set committed_at = block_num). - let block_num_val = conversions::block_num_to_i64(block_num); - diesel::update(schema::notes::table.filter(schema::notes::consumed_by.eq(&tx_id_bytes))) - .set(schema::notes::committed_at.eq(Some(block_num_val))) - .execute(conn)?; - - // Promote inflight-created notes to committed (set created_by = NULL). - diesel::update(schema::notes::table.filter(schema::notes::created_by.eq(&tx_id_bytes))) - .set(schema::notes::created_by.eq(None::>)) - .execute(conn)?; - } - - // Update chain state. - upsert_chain_state(conn, block_num, block_header)?; + // Update chain state singleton. + upsert_chain_state(conn, block_num, &effects.header)?; Ok(affected_accounts.into_iter().collect()) } - -/// Handles a `TransactionsReverted` event by undoing transaction effects. -/// -/// Returns all affected account IDs (for notification). Accounts whose creation was fully -/// reverted are included. -/// -/// # Raw SQL -/// -/// Per reverted transaction: -/// -/// ```sql -/// DELETE FROM accounts WHERE transaction_id = ?1 RETURNING account_id -/// -/// DELETE FROM notes WHERE created_by = ?1 -/// -/// UPDATE notes SET consumed_by = NULL WHERE consumed_by = ?1 RETURNING account_id -/// ``` -pub fn revert_transaction( - conn: &mut SqliteConnection, - tx_ids: &[TransactionId], -) -> Result, DatabaseError> { - use diesel::sql_types::Binary; - - let mut affected_accounts = HashSet::new(); - - for tx_id in tx_ids { - let tx_id_bytes = conversions::transaction_id_to_bytes(tx_id); - - // Delete inflight account rows and collect affected account IDs. - let deleted_accounts: Vec = diesel::sql_query( - "DELETE FROM accounts WHERE transaction_id = ?1 RETURNING account_id", - ) - .bind::(&tx_id_bytes) - .load(conn)?; - - for row in &deleted_accounts { - affected_accounts.insert(conversions::network_account_id_from_bytes(&row.account_id)?); - } - - // Delete inflight-created notes (created_by = tx_id). - diesel::delete(schema::notes::table.filter(schema::notes::created_by.eq(&tx_id_bytes))) - .execute(conn)?; - - // Restore consumed notes and collect affected account IDs. - let restored_accounts: Vec = diesel::sql_query( - "UPDATE notes SET consumed_by = NULL WHERE consumed_by = ?1 RETURNING account_id", - ) - .bind::(&tx_id_bytes) - .load(conn)?; - - for row in &restored_accounts { - affected_accounts.insert(conversions::network_account_id_from_bytes(&row.account_id)?); - } - } - - Ok(affected_accounts.into_iter().collect()) -} - -/// Helper row type for `RETURNING account_id` queries. -#[derive(diesel::QueryableByName)] -struct AccountIdRow { - #[diesel(sql_type = diesel::sql_types::Binary)] - account_id: Vec, -} diff --git a/bin/ntx-builder/src/db/models/queries/notes.rs b/bin/ntx-builder/src/db/models/queries/notes.rs index 384994c74f..15762ae9ec 100644 --- a/bin/ntx-builder/src/db/models/queries/notes.rs +++ b/bin/ntx-builder/src/db/models/queries/notes.rs @@ -25,7 +25,7 @@ pub struct NoteRow { pub last_attempt: Option, } -/// Row for inserting into the unified `notes` table. +/// Row for inserting into the `notes` table. #[derive(Debug, Clone, Insertable)] #[diesel(table_name = schema::notes)] #[diesel(check_for_backend(diesel::sqlite::Sqlite))] @@ -37,8 +37,6 @@ pub struct NoteInsert { pub attempt_count: i32, pub last_attempt: Option, pub last_error: Option, - pub created_by: Option>, - pub consumed_by: Option>, pub committed_at: Option, } @@ -51,14 +49,13 @@ pub struct NoteStatusRow { pub last_error: Option, pub attempt_count: i32, pub last_attempt: Option, - pub consumed_by: Option>, pub committed_at: Option, } // QUERIES // ================================================================================================ -/// Batch inserts committed notes (`created_by = NULL`, `consumed_by = NULL`). +/// Batch inserts notes received from a committed block. /// /// # Raw SQL /// @@ -67,8 +64,8 @@ pub struct NoteStatusRow { /// ```sql /// INSERT OR REPLACE INTO notes /// (nullifier, account_id, note_data, note_id, attempt_count, last_attempt, last_error, -/// created_by, consumed_by) -/// VALUES (?1, ?2, ?3, ?4, 0, NULL, NULL, NULL, NULL) +/// committed_at) +/// VALUES (?1, ?2, ?3, ?4, 0, NULL, NULL, NULL) /// ``` pub fn insert_committed_notes( conn: &mut SqliteConnection, @@ -86,8 +83,6 @@ pub fn insert_committed_notes( attempt_count: 0, last_attempt: None, last_error: None, - created_by: None, - consumed_by: None, committed_at: None, }; diesel::replace_into(schema::notes::table).values(&row).execute(conn)?; @@ -97,8 +92,8 @@ pub fn insert_committed_notes( /// Returns notes available for consumption by a given account. /// -/// Queries unconsumed notes (`consumed_by IS NULL`) for the account that have not exceeded the -/// maximum attempt count, then applies backoff and execution hint filtering in Rust. +/// Queries uncommitted notes for the account that have not exceeded the maximum attempt count, +/// then applies backoff and execution hint filtering in Rust. /// /// # Raw SQL /// @@ -107,7 +102,6 @@ pub fn insert_committed_notes( /// FROM notes /// WHERE /// account_id = ?1 -/// AND consumed_by IS NULL /// AND committed_at IS NULL /// AND attempt_count < ?2 /// ``` @@ -120,11 +114,8 @@ pub fn available_notes( ) -> Result, DatabaseError> { let account_id_bytes = conversions::network_account_id_to_bytes(account_id); - // Get unconsumed, uncommitted notes for this account that haven't exceeded the max - // attempt count. let rows: Vec = schema::notes::table .filter(schema::notes::account_id.eq(&account_id_bytes)) - .filter(schema::notes::consumed_by.is_null()) .filter(schema::notes::committed_at.is_null()) .filter(schema::notes::attempt_count.lt(max_attempts as i32)) .select(NoteRow::as_select()) @@ -185,7 +176,7 @@ pub fn notes_failed( /// # Raw SQL /// /// ```sql -/// SELECT note_id, last_error, attempt_count, last_attempt, consumed_by +/// SELECT note_id, last_error, attempt_count, last_attempt, committed_at /// FROM notes /// WHERE note_id = ?1 /// ``` diff --git a/bin/ntx-builder/src/db/models/queries/tests.rs b/bin/ntx-builder/src/db/models/queries/tests.rs index f46b1aafab..47787bacf2 100644 --- a/bin/ntx-builder/src/db/models/queries/tests.rs +++ b/bin/ntx-builder/src/db/models/queries/tests.rs @@ -8,6 +8,7 @@ use miden_protocol::block::BlockNumber; use super::*; use crate::NoteError; +use crate::committed_block::CommittedBlockEffects; use crate::db::models::conv as conversions; use crate::db::{Db, schema}; use crate::test_utils::*; @@ -30,344 +31,110 @@ fn count_notes(conn: &mut SqliteConnection) -> i64 { schema::notes::table.count().get_result(conn).unwrap() } -/// Counts the total number of rows in the `accounts` table. -fn count_accounts(conn: &mut SqliteConnection) -> i64 { - schema::accounts::table.count().get_result(conn).unwrap() -} - -/// Counts inflight account rows. -fn count_inflight_accounts(conn: &mut SqliteConnection) -> i64 { - schema::accounts::table - .filter(schema::accounts::transaction_id.is_not_null()) - .count() - .get_result(conn) - .unwrap() -} - -/// Counts committed account rows. -fn count_committed_accounts(conn: &mut SqliteConnection) -> i64 { - schema::accounts::table - .filter(schema::accounts::transaction_id.is_null()) - .count() - .get_result(conn) - .unwrap() -} - -// PURGE INFLIGHT TESTS +// APPLY COMMITTED BLOCK TESTS // ================================================================================================ #[test] -fn purge_inflight_clears_all_inflight_state() { +fn apply_committed_block_inserts_notes_and_advances_chain_state() { let (conn, _dir) = &mut test_conn(); let account_id = mock_network_account_id(); - let tx_id = mock_tx_id(1); let note = mock_single_target_note(account_id, 10); - // Insert committed account. - upsert_committed_account(conn, account_id, &mock_account(account_id)).unwrap(); + let block_num = BlockNumber::from(1u32); + let effects = CommittedBlockEffects { + header: mock_block_header(block_num), + network_notes: vec![note.clone()], + nullifiers: vec![], + network_account_updates: vec![], + }; - // Insert a transaction (creates inflight account row + note + consumption). - add_transaction(conn, &tx_id, None, std::slice::from_ref(¬e), &[]).unwrap(); + let affected = apply_committed_block(conn, &effects).unwrap(); - assert!(count_inflight_accounts(conn) == 0); // No account delta, so no inflight account. + // Note inserted; note's target account is reported as affected. assert_eq!(count_notes(conn), 1); + assert!(affected.contains(&account_id)); - // Mark note as consumed by another tx. - let tx_id2 = mock_tx_id(2); - add_transaction(conn, &tx_id2, None, &[], &[note.as_note().nullifier()]).unwrap(); - - // Verify consumed_by is set. - let consumed_count: i64 = schema::notes::table - .filter(schema::notes::consumed_by.is_not_null()) - .count() - .get_result(conn) - .unwrap(); - assert_eq!(consumed_count, 1); - - // Purge inflight state. - purge_inflight(conn).unwrap(); - - // Inflight accounts should be gone. - assert_eq!(count_inflight_accounts(conn), 0); - // Committed account should remain. - assert_eq!(count_committed_accounts(conn), 1); - // Inflight-created notes should be gone. - assert_eq!(count_notes(conn), 0); + // Chain state singleton updated. + let stored = select_chain_state(conn).unwrap().unwrap(); + assert_eq!(stored.0, block_num); } -// HANDLE TRANSACTION ADDED TESTS -// ================================================================================================ - #[test] -fn transaction_added_inserts_notes_and_marks_consumed() { +fn apply_committed_block_marks_nullifiers_consumed() { let (conn, _dir) = &mut test_conn(); let account_id = mock_network_account_id(); - let tx_id = mock_tx_id(1); - let note1 = mock_single_target_note(account_id, 10); - let note2 = mock_single_target_note(account_id, 20); - - // Insert committed note first (to test consumption). - insert_committed_notes(conn, std::slice::from_ref(¬e1)).unwrap(); - assert_eq!(count_notes(conn), 1); + let note = mock_single_target_note(account_id, 10); + let note_id = note.as_note().id(); + let nullifier = note.as_note().nullifier(); - // Add transaction that creates note2 and consumes note1. - add_transaction( + // First block: create note. + let block_num1 = BlockNumber::from(1u32); + apply_committed_block( conn, - &tx_id, - None, - std::slice::from_ref(¬e2), - &[note1.as_note().nullifier()], + &CommittedBlockEffects { + header: mock_block_header(block_num1), + network_notes: vec![note.clone()], + nullifiers: vec![], + network_account_updates: vec![], + }, ) .unwrap(); - // Should now have 2 notes total. - assert_eq!(count_notes(conn), 2); - - // note1 should be consumed. - let consumed: Option> = schema::notes::table - .find(conversions::nullifier_to_bytes(¬e1.as_note().nullifier())) - .select(schema::notes::consumed_by) - .first(conn) - .unwrap(); - assert!(consumed.is_some()); - - // note2 should have created_by set. - let created: Option> = schema::notes::table - .find(conversions::nullifier_to_bytes(¬e2.as_note().nullifier())) - .select(schema::notes::created_by) - .first(conn) - .unwrap(); - assert!(created.is_some()); -} - -#[test] -fn transaction_added_is_idempotent_for_notes() { - let (conn, _dir) = &mut test_conn(); - - let account_id = mock_network_account_id(); - let tx_id = mock_tx_id(1); - let note = mock_single_target_note(account_id, 10); - - // Insert the same transaction twice. - add_transaction(conn, &tx_id, None, std::slice::from_ref(¬e), &[]).unwrap(); - add_transaction(conn, &tx_id, None, std::slice::from_ref(¬e), &[]).unwrap(); - - // Should only have one note (INSERT OR IGNORE). - assert_eq!(count_notes(conn), 1); -} - -// HANDLE BLOCK COMMITTED TESTS -// ================================================================================================ - -#[test] -fn block_committed_promotes_inflight_notes_to_committed() { - let (conn, _dir) = &mut test_conn(); - - let account_id = mock_network_account_id(); - let tx_id = mock_tx_id(1); - let note = mock_single_target_note(account_id, 10); - let block_num = BlockNumber::from(1u32); - let header = mock_block_header(block_num); - - // Add a transaction that creates a note. - add_transaction(conn, &tx_id, None, std::slice::from_ref(¬e), &[]).unwrap(); - - // Verify created_by is set. - let created: Option> = schema::notes::table - .find(conversions::nullifier_to_bytes(¬e.as_note().nullifier())) - .select(schema::notes::created_by) - .first(conn) - .unwrap(); - assert!(created.is_some()); - - // Commit the block. - commit_block(conn, &[tx_id], block_num, &header).unwrap(); - - // created_by should now be NULL (promoted to committed). - let created: Option> = schema::notes::table - .find(conversions::nullifier_to_bytes(¬e.as_note().nullifier())) - .select(schema::notes::created_by) - .first(conn) - .unwrap(); - assert!(created.is_none()); -} - -#[test] -fn block_committed_marks_consumed_notes_as_committed() { - let (conn, _dir) = &mut test_conn(); - - let account_id = mock_network_account_id(); - let note = mock_single_target_note(account_id, 10); - let note_id = note.as_note().id(); - - // Insert a committed note. - insert_committed_notes(conn, std::slice::from_ref(¬e)).unwrap(); - assert_eq!(count_notes(conn), 1); - - // Consume it via a transaction. - let tx_id = mock_tx_id(1); - add_transaction(conn, &tx_id, None, &[], &[note.as_note().nullifier()]).unwrap(); - - // Commit the block. - let block_num = BlockNumber::from(1u32); - let header = mock_block_header(block_num); - commit_block(conn, &[tx_id], block_num, &header).unwrap(); + // Second block: consume note. + let block_num2 = BlockNumber::from(2u32); + let affected = apply_committed_block( + conn, + &CommittedBlockEffects { + header: mock_block_header(block_num2), + network_notes: vec![], + nullifiers: vec![nullifier], + network_account_updates: vec![], + }, + ) + .unwrap(); - // Note should still exist but be marked as committed. - assert_eq!(count_notes(conn), 1); + // Note should be marked committed at block 2 (consumed). let row = get_note_status(conn, &conversions::note_id_to_bytes(¬e_id)) .unwrap() .unwrap(); - assert_eq!(row.committed_at, Some(conversions::block_num_to_i64(block_num))); - assert!(row.consumed_by.is_some()); -} + assert_eq!(row.committed_at, Some(conversions::block_num_to_i64(block_num2))); -#[test] -fn block_committed_promotes_inflight_account_to_committed() { - let (conn, _dir) = &mut test_conn(); - - let account_id = mock_network_account_id(); - let account = mock_account(account_id); - - // Insert committed account. - upsert_committed_account(conn, account_id, &account).unwrap(); - assert_eq!(count_committed_accounts(conn), 1); - - // Insert inflight row. - let tx_id = mock_tx_id(1); - let row = AccountInsert { - account_id: conversions::network_account_id_to_bytes(account_id), - transaction_id: Some(conversions::transaction_id_to_bytes(&tx_id)), - account_data: conversions::account_to_bytes(&account), - }; - diesel::insert_into(schema::accounts::table).values(&row).execute(conn).unwrap(); - - assert_eq!(count_inflight_accounts(conn), 1); - assert_eq!(count_committed_accounts(conn), 1); - - // Commit the block. - let block_num = BlockNumber::from(1u32); - let header = mock_block_header(block_num); - commit_block(conn, &[tx_id], block_num, &header).unwrap(); - - // Should have 1 committed and 0 inflight. - assert_eq!(count_committed_accounts(conn), 1); - assert_eq!(count_inflight_accounts(conn), 0); -} - -// GET COMMITTED ACCOUNT TESTS -// ================================================================================================ - -#[test] -fn get_committed_account_ignores_inflight() { - let (conn, _dir) = &mut test_conn(); - - let account_id = mock_network_account_id(); - let account = mock_account(account_id); - - // Insert only an inflight account row (simulating account creation). - let tx_id = mock_tx_id(1); - let row = AccountInsert { - account_id: conversions::network_account_id_to_bytes(account_id), - transaction_id: Some(conversions::transaction_id_to_bytes(&tx_id)), - account_data: conversions::account_to_bytes(&account), - }; - diesel::insert_into(schema::accounts::table).values(&row).execute(conn).unwrap(); - - // get_committed_account should return None (only inflight exists). - let result = get_committed_account(conn, account_id).unwrap(); - assert!(result.is_none()); - - // Commit the block to promote inflight to committed. - let block_num = BlockNumber::from(1u32); - let header = mock_block_header(block_num); - commit_block(conn, &[tx_id], block_num, &header).unwrap(); - - // Now get_committed_account should return the account. - let result = get_committed_account(conn, account_id).unwrap(); - assert!(result.is_some()); -} - -// HANDLE TRANSACTIONS REVERTED TESTS -// ================================================================================================ - -#[test] -fn transactions_reverted_restores_consumed_notes() { - let (conn, _dir) = &mut test_conn(); - - let account_id = mock_network_account_id(); - let note = mock_single_target_note(account_id, 10); - - // Insert committed note. - insert_committed_notes(conn, std::slice::from_ref(¬e)).unwrap(); - - // Consume it via a transaction. - let tx_id = mock_tx_id(1); - add_transaction(conn, &tx_id, None, &[], &[note.as_note().nullifier()]).unwrap(); - - // Verify consumed. - let consumed: Option> = schema::notes::table - .find(conversions::nullifier_to_bytes(¬e.as_note().nullifier())) - .select(schema::notes::consumed_by) - .first(conn) - .unwrap(); - assert!(consumed.is_some()); - - // Revert the transaction. - revert_transaction(conn, &[tx_id]).unwrap(); - - // Note should be un-consumed. - let consumed: Option> = schema::notes::table - .find(conversions::nullifier_to_bytes(¬e.as_note().nullifier())) - .select(schema::notes::consumed_by) - .first(conn) - .unwrap(); - assert!(consumed.is_none()); -} - -#[test] -fn transactions_reverted_deletes_inflight_created_notes() { - let (conn, _dir) = &mut test_conn(); - - let account_id = mock_network_account_id(); - let tx_id = mock_tx_id(1); - let note = mock_single_target_note(account_id, 10); - - // Add transaction that creates a note. - add_transaction(conn, &tx_id, None, std::slice::from_ref(¬e), &[]).unwrap(); - assert_eq!(count_notes(conn), 1); - - // Revert the transaction. - revert_transaction(conn, &[tx_id]).unwrap(); - - // Inflight-created note should be deleted. - assert_eq!(count_notes(conn), 0); + // Account whose note was consumed should be in affected list. + assert!(affected.contains(&account_id)); } #[test] -fn transactions_reverted_reports_reverted_account_creations() { +fn apply_committed_block_advances_chain_state() { let (conn, _dir) = &mut test_conn(); - let account_id = mock_network_account_id(); - let account = mock_account(account_id); - let tx_id = mock_tx_id(1); - - // Insert an inflight account row (simulating account creation by tx). - let row = AccountInsert { - account_id: conversions::network_account_id_to_bytes(account_id), - transaction_id: Some(conversions::transaction_id_to_bytes(&tx_id)), - account_data: conversions::account_to_bytes(&account), - }; - diesel::insert_into(schema::accounts::table).values(&row).execute(conn).unwrap(); + let block_num1 = BlockNumber::from(1u32); + let block_num2 = BlockNumber::from(2u32); - // Revert the transaction, account should be included in affected accounts. - let affected = revert_transaction(conn, &[tx_id]).unwrap(); - assert!(affected.contains(&account_id)); + apply_committed_block( + conn, + &CommittedBlockEffects { + header: mock_block_header(block_num1), + network_notes: vec![], + nullifiers: vec![], + network_account_updates: vec![], + }, + ) + .unwrap(); + apply_committed_block( + conn, + &CommittedBlockEffects { + header: mock_block_header(block_num2), + network_notes: vec![], + nullifiers: vec![], + network_account_updates: vec![], + }, + ) + .unwrap(); - // Account should be gone. - assert_eq!(count_accounts(conn), 0); + let stored = select_chain_state(conn).unwrap().unwrap(); + assert_eq!(stored.0, block_num2); } // AVAILABLE NOTES TESTS @@ -386,36 +153,30 @@ fn available_notes_filters_consumed_and_exceeded_attempts() { insert_committed_notes(conn, &[note_good.clone(), note_consumed.clone(), note_failed.clone()]) .unwrap(); - // Consume one note. - let tx_id = mock_tx_id(1); - add_transaction(conn, &tx_id, None, &[], &[note_consumed.as_note().nullifier()]).unwrap(); + // Mark one as consumed by setting committed_at directly (simulates a later block). + diesel::update( + schema::notes::table + .find(conversions::nullifier_to_bytes(¬e_consumed.as_note().nullifier())), + ) + .set(schema::notes::committed_at.eq(Some(1i64))) + .execute(conn) + .unwrap(); // Mark one note as failed many times (exceed max_attempts=3). let block_num = BlockNumber::from(100u32); - notes_failed( - conn, - &[(note_failed.as_note().nullifier(), test_note_error("test error"))], - block_num, - ) - .unwrap(); - notes_failed( - conn, - &[(note_failed.as_note().nullifier(), test_note_error("test error"))], - block_num, - ) - .unwrap(); - notes_failed( - conn, - &[(note_failed.as_note().nullifier(), test_note_error("test error"))], - block_num, - ) - .unwrap(); + for _ in 0..3 { + notes_failed( + conn, + &[(note_failed.as_note().nullifier(), test_note_error("test error"))], + block_num, + ) + .unwrap(); + } // Query available notes with max_attempts=3. let result = available_notes(conn, account_id, block_num, 3).unwrap(); - // Only note_good should be available (note_consumed is consumed, note_failed exceeded - // attempts). + // Only note_good should be available. assert_eq!(result.len(), 1); assert_eq!(result[0].as_note().nullifier(), note_good.as_note().nullifier()); } @@ -490,20 +251,21 @@ fn get_note_status_returns_latest_error() { insert_committed_notes(conn, std::slice::from_ref(¬e)).unwrap(); // Initially no error, not consumed. - let result = get_note_status(conn, &conversions::note_id_to_bytes(¬e_id)).unwrap(); - assert!(result.is_some()); - let row = result.unwrap(); + let row = get_note_status(conn, &conversions::note_id_to_bytes(¬e_id)) + .unwrap() + .unwrap(); assert!(row.last_error.is_none()); assert_eq!(row.attempt_count, 0); - assert!(row.consumed_by.is_none()); + assert!(row.committed_at.is_none()); // Mark as failed. let block_num = BlockNumber::from(5u32); notes_failed(conn, &[(note.as_note().nullifier(), test_note_error("first error"))], block_num) .unwrap(); - let result = get_note_status(conn, &conversions::note_id_to_bytes(¬e_id)).unwrap(); - let row = result.unwrap(); + let row = get_note_status(conn, &conversions::note_id_to_bytes(¬e_id)) + .unwrap() + .unwrap(); assert_eq!(row.last_error.as_deref(), Some("first error")); assert_eq!(row.attempt_count, 1); @@ -515,8 +277,9 @@ fn get_note_status_returns_latest_error() { ) .unwrap(); - let result = get_note_status(conn, &conversions::note_id_to_bytes(¬e_id)).unwrap(); - let row = result.unwrap(); + let row = get_note_status(conn, &conversions::note_id_to_bytes(¬e_id)) + .unwrap() + .unwrap(); assert_eq!(row.last_error.as_deref(), Some("second error")); assert_eq!(row.attempt_count, 2); } @@ -530,39 +293,6 @@ fn get_note_status_returns_none_for_unknown_note() { assert!(result.is_none()); } -#[test] -fn get_note_status_includes_consumed_by() { - let (conn, _dir) = &mut test_conn(); - - let account_id = mock_network_account_id(); - let note = mock_single_target_note(account_id, 10); - let note_id = note.as_note().id(); - - // Insert as committed note. - insert_committed_notes(conn, &[note]).unwrap(); - - // Initially consumed_by is NULL. - let row = get_note_status(conn, &conversions::note_id_to_bytes(¬e_id)) - .unwrap() - .unwrap(); - assert!(row.consumed_by.is_none()); - - // Simulate consumption by setting consumed_by to a dummy transaction ID. - let dummy_tx_id = vec![42u8; 32]; - diesel::update( - schema::notes::table - .filter(schema::notes::note_id.eq(conversions::note_id_to_bytes(¬e_id))), - ) - .set(schema::notes::consumed_by.eq(Some(&dummy_tx_id))) - .execute(conn) - .unwrap(); - - let row = get_note_status(conn, &conversions::note_id_to_bytes(¬e_id)) - .unwrap() - .unwrap(); - assert_eq!(row.consumed_by, Some(dummy_tx_id)); -} - // CHAIN STATE TESTS // ================================================================================================ @@ -584,11 +314,16 @@ fn upsert_chain_state_updates_singleton() { assert_eq!(row_count, 1); // Should have the latest block number. - let stored_block_num: i64 = schema::chain_state::table - .select(schema::chain_state::block_num) - .first(conn) - .unwrap(); - assert_eq!(stored_block_num, conversions::block_num_to_i64(block_num_2)); + let stored = select_chain_state(conn).unwrap().unwrap(); + assert_eq!(stored.0, block_num_2); +} + +#[test] +fn select_chain_state_returns_none_for_empty_db() { + let (conn, _dir) = &mut test_conn(); + + let result = select_chain_state(conn).unwrap(); + assert!(result.is_none()); } // NOTE SCRIPT TESTS diff --git a/bin/ntx-builder/src/db/schema.rs b/bin/ntx-builder/src/db/schema.rs index 9797dca10a..f9f17fc826 100644 --- a/bin/ntx-builder/src/db/schema.rs +++ b/bin/ntx-builder/src/db/schema.rs @@ -1,11 +1,9 @@ // @generated automatically by Diesel CLI. diesel::table! { - accounts (order_id) { - order_id -> Integer, + accounts (account_id) { account_id -> Binary, account_data -> Binary, - transaction_id -> Nullable, } } @@ -33,8 +31,6 @@ diesel::table! { attempt_count -> Integer, last_attempt -> Nullable, last_error -> Nullable, - created_by -> Nullable, - consumed_by -> Nullable, committed_at -> Nullable, } } diff --git a/bin/ntx-builder/src/lib.rs b/bin/ntx-builder/src/lib.rs index ac4d770c04..3fa7c59e25 100644 --- a/bin/ntx-builder/src/lib.rs +++ b/bin/ntx-builder/src/lib.rs @@ -5,14 +5,14 @@ use std::time::Duration; use actor::{AccountActorContext, ActorConfig, GrpcClients, State}; use anyhow::Context; -use builder::MempoolEventStream; +use builder::BlockStream; use chain_state::SharedChainState; use clients::{BlockProducerClient, StoreClient, ValidatorClient}; use coordinator::Coordinator; use db::Db; -use futures::TryStreamExt; use miden_node_utils::ErrorReport; use miden_node_utils::lru_cache::LruCache; +use miden_protocol::block::BlockNumber; use miden_remote_prover_client::RemoteTransactionProver; use tokio::sync::mpsc; use url::Url; @@ -23,6 +23,7 @@ mod actor; mod builder; mod chain_state; mod clients; +mod committed_block; mod coordinator; pub(crate) mod db; pub mod server; @@ -72,6 +73,13 @@ const DEFAULT_MAX_ACCOUNT_CRASHES: usize = 10; /// `1 << 29` but network transactions should be much cheaper. const DEFAULT_MAX_TX_CYCLES: u32 = 1 << 19; +/// Default number of blocks after which a submitted network transaction expires. +/// +/// Short expiry means: if a tx doesn't get included into a block within this many blocks of +/// the chain tip at submission time, it is dropped. The ntx-builder can then construct a new tx +/// without waiting on the mempool to time out. +const DEFAULT_TX_EXPIRATION_DELTA: u16 = 5; + // CONFIGURATION // ================================================================================================= @@ -130,6 +138,12 @@ pub struct NtxBuilderConfig { /// Defaults to 2^18 cycles. pub max_cycles: u32, + /// Expiration delta (in blocks) applied to every submitted network transaction. + /// + /// Transactions expire if they are not included within this many blocks of the reference + /// block they were executed against. + pub tx_expiration_delta: u16, + /// Path to the SQLite database file used for persistent state. pub database_filepath: PathBuf, @@ -158,11 +172,19 @@ impl NtxBuilderConfig { idle_timeout: DEFAULT_IDLE_TIMEOUT, max_account_crashes: DEFAULT_MAX_ACCOUNT_CRASHES, max_cycles: DEFAULT_MAX_TX_CYCLES, + tx_expiration_delta: DEFAULT_TX_EXPIRATION_DELTA, database_filepath, sqlite_connection_pool_size: miden_node_db::default_connection_pool_size(), } } + /// Sets the transaction expiration delta (in blocks). + #[must_use] + pub fn with_tx_expiration_delta(mut self, delta: u16) -> Self { + self.tx_expiration_delta = delta; + self + } + /// Sets the remote transaction prover URL. /// /// If not set, transactions will be proven locally. @@ -256,14 +278,15 @@ impl NtxBuilderConfig { /// Builds and initializes the network transaction builder. /// - /// This method connects to the store and block producer services, fetches the current - /// chain tip, and subscribes to mempool events. + /// This method connects to the store services, determines the catch-up target block, and + /// opens a committed-block subscription. The catch-up phase itself runs inside + /// [`NetworkTransactionBuilder::run`]. /// /// # Errors /// /// Returns an error if: /// - The store connection fails - /// - The mempool subscription fails (after retries) + /// - The block subscription cannot be opened (after retries) /// - The store contains no blocks (not bootstrapped) pub async fn build(self) -> anyhow::Result { // Set up the database (bootstrap + connection pool). @@ -273,9 +296,6 @@ impl NtxBuilderConfig { ) .await?; - // Purge inflight state from previous run. - db.purge_inflight().await.context("failed to purge inflight state")?; - let script_cache = LruCache::new(self.script_cache_size); let coordinator = Coordinator::new(self.max_concurrent_txs, self.max_account_crashes, db.clone()); @@ -285,25 +305,45 @@ impl NtxBuilderConfig { let validator = ValidatorClient::new(self.validator_url.clone()); let prover = self.tx_prover_url.clone().map(RemoteTransactionProver::new); - // Subscribe to mempool first to ensure we don't miss any events. The subscription - // replays all inflight transactions, so the subscriber's state is fully reconstructed. - let subscription = block_producer - .subscribe_to_mempool_with_retry() - .await - .map_err(|err| anyhow::anyhow!(err)) - .context("failed to subscribe to mempool events")?; - let mempool_events: MempoolEventStream = Box::pin(subscription.into_stream()); - + // Fetch the current chain tip + MMR. This is needed as the catch-up target and as the + // initial in-memory chain state used by actors. let (chain_tip_header, chain_mmr) = store .get_latest_blockchain_data_with_retry() .await? .context("store should contain a latest block")?; + let chain_tip_block_num = chain_tip_header.block_num(); + + // Resume from where we left off. If the DB has no chain state yet, we initialize it + // from the current chain tip. + // Existing DBs resume from their persisted block; the catch-up phase then drains the + // stream until the in-memory chain state reaches the current tip. + let stored_chain_state = + db.get_chain_state().await.context("failed to read chain state")?; + + let (block_from, last_applied_block) = + resume_point(stored_chain_state.as_ref().map(|(num, _)| *num), chain_tip_block_num); + + if stored_chain_state.is_none() { + db.upsert_chain_state(chain_tip_block_num, chain_tip_header.clone()) + .await + .context("failed to upsert chain state")?; + } + + tracing::info!( + %block_from, + %chain_tip_block_num, + "ntx-builder opening block subscription" + ); - // Store the chain tip in the DB. - db.upsert_chain_state(chain_tip_header.block_num(), chain_tip_header.clone()) + let block_stream_inner = store + .block_subscription_with_retry(block_from) .await - .context("failed to upsert chain state")?; + .map_err(|err| anyhow::anyhow!(err)) + .context("failed to subscribe to committed blocks")?; + let block_stream: BlockStream = Box::pin(block_stream_inner); + // Chain state is initialized at the chain tip, actors only start after catch-up, so the + // tip is consistent with the DB by the time they run. let chain_state = Arc::new(SharedChainState::new(chain_tip_header, chain_mmr)); let (request_tx, actor_request_rx) = mpsc::channel(1); @@ -325,6 +365,8 @@ impl NtxBuilderConfig { max_note_attempts: self.max_note_attempts, idle_timeout: self.idle_timeout, max_cycles: self.max_cycles, + expiration_script: actor::compile_expiration_tx_script(self.tx_expiration_delta), + tx_expiration_delta: self.tx_expiration_delta, }, request_tx, }; @@ -336,8 +378,69 @@ impl NtxBuilderConfig { db, chain_state, actor_context, - mempool_events, + block_stream, + chain_tip_block_num, + last_applied_block, actor_request_rx, )) } } + +// HELPERS +// ================================================================================================= + +/// Decides where the ntx-builder should start consuming the block stream from on startup. +/// +/// Returns `(block_from, last_applied_block)`: +/// - `block_from` is the first block number the subscription should yield (inclusive). +/// - `last_applied_block` is the highest block already reflected in the DB. The catch-up phase +/// drains the stream until this reaches the chain tip. +/// +/// If the DB has a persisted chain state, resume from the block after it. Otherwise the DB is +/// fresh and we treat the current chain tip as already applied (the caller is responsible for +/// persisting that state). +fn resume_point(stored: Option, chain_tip: BlockNumber) -> (BlockNumber, BlockNumber) { + match stored { + Some(num) => (num.child(), num), + None => (chain_tip.child(), chain_tip), + } +} + +#[cfg(test)] +mod tests { + use miden_protocol::block::BlockNumber; + + use super::resume_point; + + #[test] + fn resume_point_fresh_db_starts_after_chain_tip() { + let tip = BlockNumber::from(10u32); + + let (block_from, last_applied) = resume_point(None, tip); + + assert_eq!(last_applied, tip); + assert_eq!(block_from, tip.child()); + } + + #[test] + fn resume_point_existing_db_resumes_after_stored_block() { + let tip = BlockNumber::from(10u32); + let stored = BlockNumber::from(7u32); + + let (block_from, last_applied) = resume_point(Some(stored), tip); + + assert_eq!(last_applied, stored); + assert_eq!(block_from, stored.child()); + } + + #[test] + fn resume_point_db_already_at_tip() { + let tip = BlockNumber::from(10u32); + + let (block_from, last_applied) = resume_point(Some(tip), tip); + + assert_eq!(last_applied, tip); + assert_eq!(block_from, tip.child()); + // Catch-up loop terminates immediately because last_applied >= target. + } +} diff --git a/bin/ntx-builder/src/server.rs b/bin/ntx-builder/src/server.rs index 12ef5bfa63..71bc3cd208 100644 --- a/bin/ntx-builder/src/server.rs +++ b/bin/ntx-builder/src/server.rs @@ -86,7 +86,6 @@ impl api_server::Api for NtxBuilderRpcServer { let status = derive_status( row.committed_at.is_some(), - row.consumed_by.is_some(), row.attempt_count as usize, self.max_note_attempts, ); @@ -108,14 +107,11 @@ impl api_server::Api for NtxBuilderRpcServer { /// Derives the lifecycle status of a network note from its DB state. fn derive_status( is_committed: bool, - is_consumed: bool, attempt_count: usize, max_note_attempts: usize, ) -> rpc::NetworkNoteStatus { if is_committed { rpc::NetworkNoteStatus::NullifierCommitted - } else if is_consumed { - rpc::NetworkNoteStatus::NullifierInflight } else if attempt_count >= max_note_attempts { rpc::NetworkNoteStatus::Discarded } else { @@ -131,30 +127,22 @@ mod tests { #[test] fn derive_status_pending() { - assert_eq!(derive_status(false, false, 0, 30), NetworkNoteStatus::Pending); - assert_eq!(derive_status(false, false, 15, 30), NetworkNoteStatus::Pending); - assert_eq!(derive_status(false, false, 29, 30), NetworkNoteStatus::Pending); - } - - #[test] - fn derive_status_processed() { - assert_eq!(derive_status(false, true, 0, 30), NetworkNoteStatus::NullifierInflight); - assert_eq!(derive_status(false, true, 5, 30), NetworkNoteStatus::NullifierInflight); - // consumed_by takes precedence over attempt count - assert_eq!(derive_status(false, true, 30, 30), NetworkNoteStatus::NullifierInflight); + assert_eq!(derive_status(false, 0, 30), NetworkNoteStatus::Pending); + assert_eq!(derive_status(false, 15, 30), NetworkNoteStatus::Pending); + assert_eq!(derive_status(false, 29, 30), NetworkNoteStatus::Pending); } #[test] fn derive_status_discarded() { - assert_eq!(derive_status(false, false, 30, 30), NetworkNoteStatus::Discarded); - assert_eq!(derive_status(false, false, 100, 30), NetworkNoteStatus::Discarded); + assert_eq!(derive_status(false, 30, 30), NetworkNoteStatus::Discarded); + assert_eq!(derive_status(false, 100, 30), NetworkNoteStatus::Discarded); } #[test] fn derive_status_committed() { - assert_eq!(derive_status(true, true, 0, 30), NetworkNoteStatus::NullifierCommitted); - assert_eq!(derive_status(true, true, 5, 30), NetworkNoteStatus::NullifierCommitted); - // committed takes precedence over everything - assert_eq!(derive_status(true, false, 30, 30), NetworkNoteStatus::NullifierCommitted); + assert_eq!(derive_status(true, 0, 30), NetworkNoteStatus::NullifierCommitted); + assert_eq!(derive_status(true, 5, 30), NetworkNoteStatus::NullifierCommitted); + // committed takes precedence over attempt count + assert_eq!(derive_status(true, 30, 30), NetworkNoteStatus::NullifierCommitted); } } diff --git a/bin/ntx-builder/src/test_utils.rs b/bin/ntx-builder/src/test_utils.rs index 1c2c743106..730e9bc673 100644 --- a/bin/ntx-builder/src/test_utils.rs +++ b/bin/ntx-builder/src/test_utils.rs @@ -3,12 +3,14 @@ use miden_node_proto::domain::account::NetworkAccountId; use miden_protocol::Word; use miden_protocol::account::{AccountId, AccountStorageMode, AccountType}; -use miden_protocol::block::BlockNumber; +use miden_protocol::block::{BlockBody, BlockNumber, SignedBlock}; +use miden_protocol::note::Nullifier; use miden_protocol::testing::account_id::{ ACCOUNT_ID_REGULAR_NETWORK_ACCOUNT_IMMUTABLE_CODE, AccountIdBuilder, }; -use miden_protocol::transaction::TransactionId; +use miden_protocol::testing::random_secret_key::random_secret_key; +use miden_protocol::transaction::{OrderedTransactionHeaders, OutputNote, PublicOutputNote}; use miden_standards::note::{AccountTargetNetworkNote, NetworkAccountTarget, NoteExecutionHint}; use miden_standards::testing::note::NoteBuilder; use rand_chacha::ChaCha20Rng; @@ -30,16 +32,6 @@ pub fn mock_network_account_id_seeded(seed: u8) -> NetworkAccountId { NetworkAccountId::try_from(account_id).unwrap() } -/// Creates a unique `TransactionId` from a seed value. -pub fn mock_tx_id(seed: u64) -> TransactionId { - use miden_protocol::testing::account_id::ACCOUNT_ID_PUBLIC_FUNGIBLE_FAUCET; - - let w = |n: u64| Word::try_from([n, 0, 0, 0]).unwrap(); - let faucet_id = AccountId::try_from(ACCOUNT_ID_PUBLIC_FUNGIBLE_FAUCET).unwrap(); - let fee = miden_protocol::asset::FungibleAsset::new(faucet_id, 0).unwrap(); - TransactionId::new(w(seed), w(seed + 1), w(seed + 2), w(seed + 3), fee) -} - /// Creates a `AccountTargetNetworkNote` targeting the given network account. pub fn mock_single_target_note( network_account_id: NetworkAccountId, @@ -59,24 +51,48 @@ pub fn mock_single_target_note( AccountTargetNetworkNote::try_from(note).expect("note should be single-target network note") } -/// Creates a mock `Account` for a network account. -/// -/// Uses `AccountBuilder` with minimal components needed for serialization. -pub fn mock_account(_account_id: NetworkAccountId) -> miden_protocol::account::Account { - use miden_protocol::account::AccountBuilder; - use miden_protocol::testing::noop_auth_component::NoopAuthComponent; - use miden_standards::testing::account_component::MockAccountComponent; - - AccountBuilder::new([0u8; 32]) - .account_type(AccountType::RegularAccountImmutableCode) - .storage_mode(AccountStorageMode::Network) - .with_component(MockAccountComponent::with_slots(vec![])) - .with_auth_component(NoopAuthComponent) - .build_existing() - .unwrap() -} - /// Creates a mock `BlockHeader` for the given block number. pub fn mock_block_header(block_num: BlockNumber) -> miden_protocol::block::BlockHeader { miden_protocol::block::BlockHeader::mock(block_num, None, None, &[], Word::default()) } + +/// Creates a mock [`SignedBlock`] for the given block number containing the provided network notes +/// and nullifiers. +/// +/// The block is built with `SignedBlock::new_unchecked`, so the signature is generated against an +/// independent key (not the validator key in the mock header). Suitable for code paths that only +/// observe the block via [`crate::committed_block::CommittedBlockEffects::from_signed_block`]. +pub fn mock_signed_block( + block_num: BlockNumber, + network_notes: &[AccountTargetNetworkNote], + nullifiers: Vec, +) -> SignedBlock { + let header = mock_block_header(block_num); + + let output_notes: Vec<(usize, OutputNote)> = network_notes + .iter() + .enumerate() + .map(|(idx, note)| { + let public = PublicOutputNote::new(note.as_note().clone()) + .expect("network note should be public"); + (idx, OutputNote::Public(public)) + }) + .collect(); + + let output_note_batches = if output_notes.is_empty() { + Vec::new() + } else { + vec![output_notes] + }; + + let body = BlockBody::new_unchecked( + Vec::new(), + output_note_batches, + nullifiers, + OrderedTransactionHeaders::new_unchecked(Vec::new()), + ); + + let signature = random_secret_key().sign(header.commitment()); + + SignedBlock::new_unchecked(header, body, signature) +} diff --git a/crates/store/src/server/ntx_builder.rs b/crates/store/src/server/ntx_builder.rs index 3b114da5f0..badf1ea119 100644 --- a/crates/store/src/server/ntx_builder.rs +++ b/crates/store/src/server/ntx_builder.rs @@ -7,7 +7,7 @@ use miden_node_proto::domain::account::AccountInfo; use miden_node_proto::errors::ConversionError; use miden_node_proto::generated as proto; use miden_node_proto::generated::rpc::BlockRange; -use miden_node_proto::generated::store::ntx_builder_server; +use miden_node_proto::generated::store::{BlockSubscriptionRequest, ntx_builder_server}; use miden_node_utils::ErrorReport; use miden_protocol::account::{StorageMapKey, StorageSlotName}; use miden_protocol::asset::AssetVaultKey; @@ -25,6 +25,7 @@ use crate::errors::{ GetWitnessesError, }; use crate::server::api::{StoreApi, internal_error, invalid_argument}; +use crate::server::replica::BlockSubscriptionStream; use crate::state::Finality; // NTX BUILDER ENDPOINTS @@ -32,14 +33,16 @@ use crate::state::Finality; #[tonic::async_trait] impl ntx_builder_server::NtxBuilder for StoreApi { - /// Returns block header for the specified block number. - /// - /// If the block number is not provided, block header for the latest block is returned. - async fn get_block_header_by_number( + type BlockSubscriptionStream = BlockSubscriptionStream; + + /// See [`StoreApi::block_subscription_inner`] — same semantics as + /// `StoreReplica::BlockSubscription`. Declared on this service so the ntx-builder can drive + /// its committed-block subscription through a single client. + async fn block_subscription( &self, - request: Request, - ) -> Result, Status> { - self.get_block_header_by_number_inner(request).await + request: Request, + ) -> Result, Status> { + self.block_subscription_inner(request) } /// Returns the chain tip's header and MMR peaks corresponding to that header. diff --git a/crates/store/src/server/replica.rs b/crates/store/src/server/replica.rs index 5f0162262b..7d27c8855f 100644 --- a/crates/store/src/server/replica.rs +++ b/crates/store/src/server/replica.rs @@ -42,34 +42,29 @@ impl Stream for GuardedStream { // STORE REPLICA API // ================================================================================================ -#[tonic::async_trait] -impl store_replica_server::StoreReplica for StoreApi { - type BlockSubscriptionStream = Pin< - Box< - dyn tonic::codegen::tokio_stream::Stream> - + Send - + 'static, - >, - >; - - type ProofSubscriptionStream = Pin< - Box< - dyn tonic::codegen::tokio_stream::Stream> - + Send - + 'static, - >, - >; +/// Boxed, pinned stream of committed blocks. Shared concrete type behind both +/// [`store_replica_server::StoreReplica::BlockSubscriptionStream`] and +/// [`ntx_builder_server::NtxBuilder::BlockSubscriptionStream`]. +pub(super) type BlockSubscriptionStream = Pin< + Box< + dyn tonic::codegen::tokio_stream::Stream> + + Send + + 'static, + >, +>; - /// Streams committed blocks to a replica starting from `from_block_number`. +impl StoreApi { + /// Shared implementation for the `BlockSubscription` RPC, used by both the `StoreReplica` and + /// `NtxBuilder` gRPC services. /// /// Subscribes to the committed-tip watch channel and maintains a sequential counter. On each /// tip advance it emits all blocks from the current position up to the new tip, falling back to /// the block store for any entry not in the in-memory cache. The stream closes only when the /// client disconnects or the server shuts down. - async fn block_subscription( + pub(super) fn block_subscription_inner( &self, request: Request, - ) -> Result, Status> { + ) -> Result, Status> { let permit = Arc::clone(&self.block_subscription_semaphore) .try_acquire_owned() .map_err(|_| Status::resource_exhausted("maximum block subscriptions reached"))?; @@ -84,6 +79,26 @@ impl store_replica_server::StoreReplica for StoreApi { ); Ok(Response::new(Box::pin(GuardedStream { inner: stream, _permit: permit }))) } +} + +#[tonic::async_trait] +impl store_replica_server::StoreReplica for StoreApi { + type BlockSubscriptionStream = BlockSubscriptionStream; + + type ProofSubscriptionStream = Pin< + Box< + dyn tonic::codegen::tokio_stream::Stream> + + Send + + 'static, + >, + >; + + async fn block_subscription( + &self, + request: Request, + ) -> Result, Status> { + self.block_subscription_inner(request) + } /// Streams block proofs to a replica starting from `from_block_number`. /// diff --git a/proto/proto/internal/store.proto b/proto/proto/internal/store.proto index 04fb41bb25..6c3bdde72d 100644 --- a/proto/proto/internal/store.proto +++ b/proto/proto/internal/store.proto @@ -287,9 +287,12 @@ message BlockProof { // Store API for the network transaction builder component service NtxBuilder { - // Retrieves block header by given block number. Optionally, it also returns the MMR path - // and current chain length to authenticate the block's inclusion. - rpc GetBlockHeaderByNumber(rpc.BlockHeaderByNumberRequest) returns (rpc.BlockHeaderByNumberResponse) {} + // Streams committed blocks starting from the given block number (inclusive). + // + // Same semantics as StoreReplica.BlockSubscription (which actual store replicas use); declared + // on this service so the ntx-builder can drive its committed-block subscription through a + // single client. + rpc BlockSubscription(BlockSubscriptionRequest) returns (stream SignedBlock) {} // Returns a paginated list of unconsumed network notes. rpc GetUnconsumedNetworkNotes(UnconsumedNetworkNotesRequest) returns (UnconsumedNetworkNotes) {}