-
Notifications
You must be signed in to change notification settings - Fork 116
chore: ntx-builder subs to block stream + sync before execution #2085
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: next
Are you sure you want to change the base?
Changes from all commits
0362e42
509f4e2
b1a599a
e337b7d
67d113a
1bbb08a
339f0a7
6a31203
c9d19bb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -7,14 +7,15 @@ use std::time::Duration; | |||||||||||||||||||||
|
|
||||||||||||||||||||||
| use anyhow::Context; | ||||||||||||||||||||||
| use candidate::TransactionCandidate; | ||||||||||||||||||||||
| pub use execute::compile_expiration_tx_script; | ||||||||||||||||||||||
| use futures::FutureExt; | ||||||||||||||||||||||
| use miden_node_proto::domain::account::NetworkAccountId; | ||||||||||||||||||||||
| use miden_node_utils::ErrorReport; | ||||||||||||||||||||||
| use miden_node_utils::lru_cache::LruCache; | ||||||||||||||||||||||
| use miden_protocol::Word; | ||||||||||||||||||||||
| use miden_protocol::block::BlockNumber; | ||||||||||||||||||||||
| use miden_protocol::note::{NoteScript, Nullifier}; | ||||||||||||||||||||||
| use miden_protocol::transaction::TransactionId; | ||||||||||||||||||||||
| use miden_protocol::transaction::TransactionScript; | ||||||||||||||||||||||
| use miden_remote_prover_client::RemoteTransactionProver; | ||||||||||||||||||||||
| use miden_tx::FailedNote; | ||||||||||||||||||||||
| use tokio::sync::{Notify, Semaphore, mpsc}; | ||||||||||||||||||||||
|
|
@@ -71,7 +72,7 @@ pub struct State { | |||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| /// Per-actor configuration knobs. | ||||||||||||||||||||||
| #[derive(Debug, Clone, Copy)] | ||||||||||||||||||||||
| #[derive(Debug, Clone)] | ||||||||||||||||||||||
| pub struct ActorConfig { | ||||||||||||||||||||||
| /// Maximum number of notes per transaction. | ||||||||||||||||||||||
| pub max_notes_per_tx: NonZeroUsize, | ||||||||||||||||||||||
|
|
@@ -81,6 +82,12 @@ pub struct ActorConfig { | |||||||||||||||||||||
| pub idle_timeout: Duration, | ||||||||||||||||||||||
| /// Maximum number of VM execution cycles for network transactions. | ||||||||||||||||||||||
| pub max_cycles: u32, | ||||||||||||||||||||||
| /// Pre-compiled tx script that sets each submitted transaction's expiration block delta. | ||||||||||||||||||||||
| /// Built once at builder startup and shared by all actors. | ||||||||||||||||||||||
| pub expiration_script: TransactionScript, | ||||||||||||||||||||||
| /// Same delta encoded in [`Self::expiration_script`], retained so the actor can decide when a | ||||||||||||||||||||||
| /// submitted transaction must have either landed or been dropped. | ||||||||||||||||||||||
| pub tx_expiration_delta: u16, | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // ACCOUNT ACTOR CONTEXT | ||||||||||||||||||||||
|
|
@@ -133,6 +140,8 @@ impl AccountActorContext { | |||||||||||||||||||||
| max_note_attempts: 1, | ||||||||||||||||||||||
| idle_timeout: Duration::from_secs(60), | ||||||||||||||||||||||
| max_cycles: 1 << 18, | ||||||||||||||||||||||
| expiration_script: crate::actor::compile_expiration_tx_script(5), | ||||||||||||||||||||||
| tx_expiration_delta: 5, | ||||||||||||||||||||||
| }, | ||||||||||||||||||||||
| request_tx, | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
@@ -147,7 +156,21 @@ impl AccountActorContext { | |||||||||||||||||||||
| enum ActorMode { | ||||||||||||||||||||||
| NoViableNotes, | ||||||||||||||||||||||
| NotesAvailable, | ||||||||||||||||||||||
| TransactionInflight(TransactionId), | ||||||||||||||||||||||
| /// The actor has just submitted a transaction and is waiting for it to either land on-chain or | ||||||||||||||||||||||
| /// expire before doing any more work for this account. This avoids busy-looping with the same | ||||||||||||||||||||||
| /// notes between submit and block commit. | ||||||||||||||||||||||
| /// | ||||||||||||||||||||||
| /// Carries the commitment of the account state we executed against and the chain tip at | ||||||||||||||||||||||
| /// submission time, so on each wake-up we can detect cheaply whether the tx landed (the | ||||||||||||||||||||||
| /// committed account commitment in the DB has moved) or definitely won't (the chain has | ||||||||||||||||||||||
| /// advanced past the expiration window). | ||||||||||||||||||||||
| /// | ||||||||||||||||||||||
| /// Network accounts are only ever updated by ntx-builder transactions, so a commitment change | ||||||||||||||||||||||
| /// here is equivalent to "some submission for this account has been included in a block." | ||||||||||||||||||||||
| WaitForNextBlock { | ||||||||||||||||||||||
| submitted_commitment: Word, | ||||||||||||||||||||||
| submitted_at: BlockNumber, | ||||||||||||||||||||||
| }, | ||||||||||||||||||||||
|
Comment on lines
+168
to
+173
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we have a problem with pass-through transactions maybe? As in, if the network account for whatever reason just acts as a pass-through then the commitment wouldn't change even though the transaction has been committed? Maybe a better model is
Suggested change
though this might mean we need to track the latest committed transaction ID for a given account? Or we need to allow inspection of the block by actors and our notify model is overly simplistic?
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Something else.. We should probably store the transaction itself so we can apply its delta to the local account without having to hit the database again. |
||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // ACCOUNT ACTOR | ||||||||||||||||||||||
|
|
@@ -211,7 +234,7 @@ impl AccountActor { | |||||||||||||||||||||
| account_id, | ||||||||||||||||||||||
| clients: actor_context.clients.clone(), | ||||||||||||||||||||||
| state: actor_context.state.clone(), | ||||||||||||||||||||||
| config: actor_context.config, | ||||||||||||||||||||||
| config: actor_context.config.clone(), | ||||||||||||||||||||||
| notify, | ||||||||||||||||||||||
| request: actor_context.request_tx.clone(), | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
@@ -251,7 +274,7 @@ impl AccountActor { | |||||||||||||||||||||
| // Enable or disable transaction execution based on actor mode. | ||||||||||||||||||||||
| let tx_permit_acquisition = match mode { | ||||||||||||||||||||||
| // Disable transaction execution. | ||||||||||||||||||||||
| ActorMode::NoViableNotes | ActorMode::TransactionInflight(_) => { | ||||||||||||||||||||||
| ActorMode::NoViableNotes | ActorMode::WaitForNextBlock { .. } => { | ||||||||||||||||||||||
| std::future::pending().boxed() | ||||||||||||||||||||||
| }, | ||||||||||||||||||||||
| // Enable transaction execution. | ||||||||||||||||||||||
|
|
@@ -266,34 +289,17 @@ impl AccountActor { | |||||||||||||||||||||
| }; | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| tokio::select! { | ||||||||||||||||||||||
| // Handle coordinator notifications. On notification, re-evaluate state from DB. | ||||||||||||||||||||||
| // Handle coordinator notifications. Whether we re-evaluate depends on whether | ||||||||||||||||||||||
| // an in-flight submission still might land. | ||||||||||||||||||||||
| _ = self.notify.notified() => { | ||||||||||||||||||||||
| match mode { | ||||||||||||||||||||||
| ActorMode::TransactionInflight(awaited_id) => { | ||||||||||||||||||||||
| // Check DB: is the inflight tx still pending? | ||||||||||||||||||||||
| let exists = self | ||||||||||||||||||||||
| .state | ||||||||||||||||||||||
| .db | ||||||||||||||||||||||
| .transaction_exists(awaited_id) | ||||||||||||||||||||||
| .await | ||||||||||||||||||||||
| .context("failed to check transaction status")?; | ||||||||||||||||||||||
| if exists { | ||||||||||||||||||||||
| mode = ActorMode::NotesAvailable; | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| }, | ||||||||||||||||||||||
| _ => { | ||||||||||||||||||||||
| mode = ActorMode::NotesAvailable; | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| mode = self.on_notified(mode).await?; | ||||||||||||||||||||||
| }, | ||||||||||||||||||||||
| // Execute transactions. | ||||||||||||||||||||||
| permit = tx_permit_acquisition => { | ||||||||||||||||||||||
| let _permit = permit.context("semaphore closed")?; | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Read the chain state. | ||||||||||||||||||||||
| // Read the chain state and query the DB for an executable candidate. | ||||||||||||||||||||||
| let chain_state = self.state.chain.get_cloned(); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Query DB for latest account and available notes. | ||||||||||||||||||||||
| let tx_candidate = self.select_candidate_from_db( | ||||||||||||||||||||||
| account_id, | ||||||||||||||||||||||
| chain_state, | ||||||||||||||||||||||
|
|
@@ -315,6 +321,47 @@ impl AccountActor { | |||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| /// Decides the next mode when a coordinator notification arrives. | ||||||||||||||||||||||
| /// | ||||||||||||||||||||||
| /// If we are in [`ActorMode::WaitForNextBlock`], we only transition out once either: | ||||||||||||||||||||||
| /// - the committed account commitment in the DB no longer matches the one we executed against, | ||||||||||||||||||||||
| /// which means some network transaction for this account has been included in a block, or | ||||||||||||||||||||||
| /// - the chain has advanced past the expiration window, in which case the submission must be | ||||||||||||||||||||||
| /// considered dropped. | ||||||||||||||||||||||
| /// | ||||||||||||||||||||||
| /// While neither holds we stay in `WaitForNextBlock` so the actor does not re-execute and | ||||||||||||||||||||||
| /// re-submit the same notes every block. In any other mode we move to | ||||||||||||||||||||||
| /// [`ActorMode::NotesAvailable`] so the next loop iteration re-queries the DB. | ||||||||||||||||||||||
| async fn on_notified(&self, mode: ActorMode) -> anyhow::Result<ActorMode> { | ||||||||||||||||||||||
| let ActorMode::WaitForNextBlock { submitted_commitment, submitted_at } = mode else { | ||||||||||||||||||||||
| return Ok(ActorMode::NotesAvailable); | ||||||||||||||||||||||
| }; | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| let chain_tip = self.state.chain.chain_tip_block_number(); | ||||||||||||||||||||||
| let blocks_elapsed = chain_tip.as_u32().saturating_sub(submitted_at.as_u32()); | ||||||||||||||||||||||
| let expired = blocks_elapsed >= u32::from(self.config.tx_expiration_delta); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| if expired { | ||||||||||||||||||||||
| // Submission can no longer be included; re-evaluate from scratch. | ||||||||||||||||||||||
| return Ok(ActorMode::NotesAvailable); | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| let current = self | ||||||||||||||||||||||
| .state | ||||||||||||||||||||||
| .db | ||||||||||||||||||||||
| .account_commitment(self.account_id) | ||||||||||||||||||||||
| .await | ||||||||||||||||||||||
| .context("failed to read account commitment")?; | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // The account row goes away only if the network account was removed entirely. Treat that | ||||||||||||||||||||||
| // as "something changed" so the next iteration re-evaluates (and almost certainly idles). | ||||||||||||||||||||||
| if current == Some(submitted_commitment) { | ||||||||||||||||||||||
| Ok(ActorMode::WaitForNextBlock { submitted_commitment, submitted_at }) | ||||||||||||||||||||||
| } else { | ||||||||||||||||||||||
| Ok(ActorMode::NotesAvailable) | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| /// Selects a transaction candidate by querying the DB. | ||||||||||||||||||||||
| async fn select_candidate_from_db( | ||||||||||||||||||||||
| &self, | ||||||||||||||||||||||
|
|
@@ -366,7 +413,7 @@ impl AccountActor { | |||||||||||||||||||||
| if self | ||||||||||||||||||||||
| .state | ||||||||||||||||||||||
| .db | ||||||||||||||||||||||
| .has_committed_account(account_id) | ||||||||||||||||||||||
| .has_account(account_id) | ||||||||||||||||||||||
| .await | ||||||||||||||||||||||
| .context("failed to check for committed account")? | ||||||||||||||||||||||
| { | ||||||||||||||||||||||
|
|
@@ -379,7 +426,7 @@ impl AccountActor { | |||||||||||||||||||||
| if self | ||||||||||||||||||||||
| .state | ||||||||||||||||||||||
| .db | ||||||||||||||||||||||
| .has_committed_account(account_id) | ||||||||||||||||||||||
| .has_account(account_id) | ||||||||||||||||||||||
| .await | ||||||||||||||||||||||
| .context("failed to check for committed account")? | ||||||||||||||||||||||
| { | ||||||||||||||||||||||
|
|
@@ -407,7 +454,11 @@ impl AccountActor { | |||||||||||||||||||||
| account_id: NetworkAccountId, | ||||||||||||||||||||||
| tx_candidate: TransactionCandidate, | ||||||||||||||||||||||
| ) -> ActorMode { | ||||||||||||||||||||||
| let block_num = tx_candidate.chain_tip_header.block_num(); | ||||||||||||||||||||||
| // Captured before execution so that `WaitForNextBlock` records the state the tx was run | ||||||||||||||||||||||
| // against even though `tx_candidate` is moved into the executor below. | ||||||||||||||||||||||
| let submitted_commitment = tx_candidate.account.to_commitment(); | ||||||||||||||||||||||
| let submitted_at = tx_candidate.chain_tip_header.block_num(); | ||||||||||||||||||||||
| let block_num = submitted_at; | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| // Execute the selected transaction. | ||||||||||||||||||||||
| let context = execute::NtxContext::new( | ||||||||||||||||||||||
|
|
@@ -418,6 +469,7 @@ impl AccountActor { | |||||||||||||||||||||
| self.state.script_cache.clone(), | ||||||||||||||||||||||
| self.state.db.clone(), | ||||||||||||||||||||||
| self.config.max_cycles, | ||||||||||||||||||||||
| self.config.expiration_script.clone(), | ||||||||||||||||||||||
| ); | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
| let notes = tx_candidate.notes.clone(); | ||||||||||||||||||||||
|
|
@@ -444,7 +496,7 @@ impl AccountActor { | |||||||||||||||||||||
| let failed_notes = log_failed_notes(failed); | ||||||||||||||||||||||
| self.mark_notes_failed(&failed_notes, block_num).await; | ||||||||||||||||||||||
| } | ||||||||||||||||||||||
| ActorMode::TransactionInflight(tx_id) | ||||||||||||||||||||||
| ActorMode::WaitForNextBlock { submitted_commitment, submitted_at } | ||||||||||||||||||||||
| }, | ||||||||||||||||||||||
| // Transaction execution failed. | ||||||||||||||||||||||
| Err(err) => { | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did not realize this had to be inserted as
masmcode 🤔 I somehow thought its something you specify on the transaction/builder itself.. But I guess it makes sense,