diff --git a/Cargo.lock b/Cargo.lock index 3480eb81e..6b7da367d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3625,20 +3625,15 @@ dependencies = [ "async-trait", "bincode", "lru 0.16.4", - "magicblock-accounts-db", "magicblock-chainlink", - "magicblock-committor-service", - "magicblock-config", "magicblock-core", "magicblock-ledger", "magicblock-magic-program-api", "magicblock-program", - "magicblock-rpc-client", "rand 0.9.4", "solana-account 3.4.0", "solana-hash 3.1.0", "solana-instruction 3.4.0", - "solana-loader-v3-interface 6.1.1", "solana-loader-v4-interface 3.1.0", "solana-pubkey 3.0.0", "solana-sdk-ids 3.1.0", @@ -3656,21 +3651,11 @@ name = "magicblock-accounts" version = "0.11.2" dependencies = [ "async-trait", - "magicblock-account-cloner", - "magicblock-accounts-db", - "magicblock-chainlink", "magicblock-committor-service", - "magicblock-core", - "magicblock-metrics", - "magicblock-program", - "solana-hash 3.1.0", "solana-pubkey 3.0.0", - "solana-transaction", "solana-transaction-error 3.2.0", "thiserror 2.0.18", "tokio", - "tokio-util", - "tracing", "url", ] @@ -3907,8 +3892,10 @@ dependencies = [ "bincode", "borsh", "futures-util", - "lazy_static", "lru 0.16.4", + "magicblock-account-cloner", + "magicblock-accounts-db", + "magicblock-chainlink", "magicblock-committor-program", "magicblock-core", "magicblock-delegation-program-api", @@ -3918,10 +3905,8 @@ dependencies = [ "magicblock-table-mania", "rand 0.9.4", "rusqlite", - "serde_json", "solana-account 3.4.0", "solana-account-decoder", - "solana-address-lookup-table-interface 3.1.0", "solana-commitment-config", "solana-compute-budget-interface", "solana-hash 3.1.0", @@ -3934,19 +3919,15 @@ dependencies = [ "solana-rpc-client-api", "solana-signature 3.4.0", "solana-signer", - "solana-system-program", "solana-transaction", "solana-transaction-error 3.2.0", "solana-transaction-status-client-types", - "static_assertions", "tempfile", "test-kit", "thiserror 2.0.18", "tokio", "tokio-util", "tracing", - "tracing-log", - "tracing-subscriber", ] [[package]] diff --git a/magicblock-account-cloner/Cargo.toml b/magicblock-account-cloner/Cargo.toml index 84e820f99..2c85fdf94 100644 --- a/magicblock-account-cloner/Cargo.toml +++ b/magicblock-account-cloner/Cargo.toml @@ -12,20 +12,15 @@ async-trait = { workspace = true } bincode = { workspace = true } lru = { workspace = true } tracing = { workspace = true } -magicblock-accounts-db = { workspace = true } magicblock-chainlink = { workspace = true } -magicblock-committor-service = { workspace = true } -magicblock-config = { workspace = true } magicblock-core = { workspace = true } magicblock-ledger = { workspace = true } magicblock-magic-program-api = { workspace = true } magicblock-program = { workspace = true } -magicblock-rpc-client = { workspace = true } rand = { workspace = true } solana-account = { workspace = true } solana-hash = { workspace = true } solana-instruction = { workspace = true } -solana-loader-v3-interface = { workspace = true } solana-loader-v4-interface = { workspace = true } solana-pubkey = { workspace = true } solana-sdk-ids = { workspace = true } @@ -35,8 +30,3 @@ solana-sysvar = { workspace = true } solana-transaction = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } - -[dev-dependencies] -magicblock-committor-service = { workspace = true, features = [ - "dev-context-only-utils", -] } diff --git a/magicblock-account-cloner/src/account_cloner.rs b/magicblock-account-cloner/src/account_cloner.rs deleted file mode 100644 index 67437653a..000000000 --- a/magicblock-account-cloner/src/account_cloner.rs +++ /dev/null @@ -1,68 +0,0 @@ -use std::sync::Arc; - -use magicblock_committor_service::{ - error::{CommittorServiceError, CommittorServiceResult}, - BaseIntentCommittor, -}; -use thiserror::Error; -use tokio::sync::oneshot; - -pub type AccountClonerResult = Result; - -#[derive(Debug, Clone, Error)] -pub enum AccountClonerError { - #[error(transparent)] - RecvError(#[from] tokio::sync::oneshot::error::RecvError), - - #[error("JoinError ({0})")] - JoinError(String), - - #[error("CommittorServiceError {0}")] - CommittorServiceError(String), -} - -pub async fn map_committor_request_result( - res: oneshot::Receiver>, - intent_committor: Arc, -) -> AccountClonerResult { - match res.await.map_err(|err| { - // Send request error - AccountClonerError::CommittorServiceError(format!( - "error sending request {err:?}" - )) - })? { - Ok(val) => Ok(val), - Err(err) => { - // Commit error - match err { - CommittorServiceError::TableManiaError(table_mania_err) => { - let Some(sig) = table_mania_err.signature() else { - return Err(AccountClonerError::CommittorServiceError( - format!("{:?}", table_mania_err), - )); - }; - let (logs, cus) = crate::util::get_tx_diagnostics( - &sig, - &intent_committor, - ) - .await; - - let cus_str = cus - .map(|cus| format!("{:?}", cus)) - .unwrap_or("N/A".to_string()); - let logs_str = logs - .map(|logs| format!("{:#?}", logs)) - .unwrap_or("N/A".to_string()); - Err(AccountClonerError::CommittorServiceError(format!( - "{:?}\nCUs: {cus_str}\nLogs: {logs_str}", - table_mania_err - ))) - } - _ => Err(AccountClonerError::CommittorServiceError(format!( - "{:?}", - err - ))), - } - } - } -} diff --git a/magicblock-account-cloner/src/lib.rs b/magicblock-account-cloner/src/lib.rs index 1adb4b4aa..43f13c9e3 100644 --- a/magicblock-account-cloner/src/lib.rs +++ b/magicblock-account-cloner/src/lib.rs @@ -73,10 +73,8 @@ use tracing::*; /// Max data that fits in a single transaction (~63KB) pub const MAX_INLINE_DATA_SIZE: usize = 63 * 1024; -mod account_cloner; mod util; -pub use account_cloner::*; pub use util::derive_buffer_pubkey; /// Keep only a bounded history of sent action tx signatures to avoid diff --git a/magicblock-account-cloner/src/util.rs b/magicblock-account-cloner/src/util.rs index 992508faa..fcfe876f6 100644 --- a/magicblock-account-cloner/src/util.rs +++ b/magicblock-account-cloner/src/util.rs @@ -1,10 +1,5 @@ -use std::sync::Arc; - -use magicblock_committor_service::BaseIntentCommittor; use magicblock_program::validator::validator_authority_id; -use magicblock_rpc_client::MagicblockRpcClient; use solana_pubkey::Pubkey; -use solana_signature::Signature; /// Seed for deriving buffer account PDA const BUFFER_SEED: &[u8] = b"buffer"; @@ -15,16 +10,3 @@ pub fn derive_buffer_pubkey(program_pubkey: &Pubkey) -> (Pubkey, u8) { let seeds: &[&[u8]] = &[BUFFER_SEED, program_pubkey.as_ref()]; Pubkey::find_program_address(seeds, &validator_authority_id()) } - -pub(crate) async fn get_tx_diagnostics( - sig: &Signature, - committor: &Arc, -) -> (Option>, Option) { - if let Ok(Ok(transaction)) = committor.get_transaction(sig).await { - let cus = MagicblockRpcClient::get_cus_from_transaction(&transaction); - let logs = MagicblockRpcClient::get_logs_from_transaction(&transaction); - (logs, cus) - } else { - (None, None) - } -} diff --git a/magicblock-accounts/Cargo.toml b/magicblock-accounts/Cargo.toml index aaf0879d4..5b48785a5 100644 --- a/magicblock-accounts/Cargo.toml +++ b/magicblock-accounts/Cargo.toml @@ -9,26 +9,10 @@ edition.workspace = true [dependencies] async-trait = { workspace = true } -tracing = { workspace = true } -magicblock-account-cloner = { workspace = true } -magicblock-accounts-db = { workspace = true } -magicblock-chainlink = { workspace = true } magicblock-committor-service = { workspace = true } -magicblock-core = { workspace = true } -magicblock-metrics = { workspace = true } -magicblock-program = { workspace = true } -solana-hash = { workspace = true } solana-pubkey = { workspace = true } solana-transaction-error = { workspace = true } -solana-transaction = { workspace = true } tokio = { workspace = true } -tokio-util = { workspace = true } thiserror = { workspace = true } url = { workspace = true } - -[dev-dependencies] -magicblock-committor-service = { workspace = true, features = [ - "dev-context-only-utils", -] } -tokio-util = { workspace = true } diff --git a/magicblock-accounts/src/errors.rs b/magicblock-accounts/src/errors.rs index 32823039b..3a206dffe 100644 --- a/magicblock-accounts/src/errors.rs +++ b/magicblock-accounts/src/errors.rs @@ -1,9 +1,7 @@ use std::collections::HashSet; -use magicblock_account_cloner::AccountClonerError; use magicblock_committor_service::{ - error::CommittorServiceError, service_ext::CommittorServiceExtError, - ChangesetMeta, + error::CommittorServiceError, ChangesetMeta, }; use solana_pubkey::Pubkey; use solana_transaction_error::TransactionError; @@ -23,15 +21,9 @@ pub enum AccountsError { #[error("CommittorSerivceError: {0}")] CommittorSerivceError(#[from] CommittorServiceError), - #[error("CommittorServiceExtError: {0}")] - CommittorServiceExtError(#[from] CommittorServiceExtError), - #[error("TokioOneshotRecvError")] TokioOneshotRecvError(#[from] Box), - #[error("AccountClonerError")] - AccountClonerError(#[from] AccountClonerError), - #[error("InvalidRpcUrl '{0}'")] InvalidRpcUrl(String), diff --git a/magicblock-accounts/src/lib.rs b/magicblock-accounts/src/lib.rs index d11a013b2..c7eac2118 100644 --- a/magicblock-accounts/src/lib.rs +++ b/magicblock-accounts/src/lib.rs @@ -1,6 +1,5 @@ mod config; pub mod errors; -pub mod scheduled_commits_processor; mod traits; pub use config::*; diff --git a/magicblock-accounts/src/scheduled_commits_processor.rs b/magicblock-accounts/src/scheduled_commits_processor.rs deleted file mode 100644 index 2818d0a47..000000000 --- a/magicblock-accounts/src/scheduled_commits_processor.rs +++ /dev/null @@ -1,351 +0,0 @@ -use std::{ - collections::{HashMap, HashSet}, - sync::{Arc, Mutex}, -}; - -use async_trait::async_trait; -use magicblock_account_cloner::ChainlinkCloner; -use magicblock_chainlink::{ProdChainlink, ProdInnerChainlink}; -use magicblock_committor_service::{ - intent_execution_manager::BroadcastedIntentExecutionResult, - intent_executor::ExecutionOutput, BaseIntentCommittor, CommittorService, -}; -use magicblock_core::link::transactions::{ - with_encoded, TransactionSchedulerHandle, -}; -use magicblock_metrics::metrics; -use magicblock_program::{ - magic_scheduled_base_intent::ScheduledIntentBundle, - register_scheduled_commit_sent, SentCommit, TransactionScheduler, -}; -use solana_hash::Hash; -use solana_pubkey::Pubkey; -use solana_transaction::Transaction; -use tokio::{ - sync::{broadcast, oneshot}, - task, -}; -use tokio_util::sync::CancellationToken; -use tracing::{debug, error, info, instrument}; - -use crate::{ - errors::ScheduledCommitsProcessorResult, ScheduledCommitsProcessor, -}; - -const POISONED_MUTEX_MSG: &str = - "Mutex of RemoteScheduledCommitsProcessor.intents_meta_map is poisoned"; - -pub type InnerChainlinkImpl = ProdInnerChainlink; - -pub type ChainlinkImpl = ProdChainlink; - -pub struct ScheduledCommitsProcessorImpl { - committor: Arc, - chainlink: Arc, - cancellation_token: CancellationToken, - intents_meta_map: Arc>>, - transaction_scheduler: TransactionScheduler, -} - -impl ScheduledCommitsProcessorImpl { - pub fn new( - committor: Arc, - chainlink: Arc, - internal_transaction_scheduler: TransactionSchedulerHandle, - ) -> Self { - let result_subscriber = committor.subscribe_for_results(); - let intents_meta_map = Arc::new(Mutex::default()); - let cancellation_token = CancellationToken::new(); - tokio::spawn(Self::result_processor( - result_subscriber, - cancellation_token.clone(), - intents_meta_map.clone(), - internal_transaction_scheduler.clone(), - )); - - Self { - committor, - chainlink, - cancellation_token, - intents_meta_map, - transaction_scheduler: TransactionScheduler::default(), - } - } - - async fn process_undelegation_requests(&self, pubkeys: Vec) { - let mut join_set = task::JoinSet::new(); - for pubkey in pubkeys.into_iter() { - let chainlink = self.chainlink.clone(); - join_set.spawn(async move { - (pubkey, chainlink.undelegation_requested(pubkey).await) - }); - } - let sub_errors = join_set - .join_all() - .await - .into_iter() - .filter_map(|(pubkey, inner_result)| { - if let Err(err) = inner_result { - Some(format!( - "Subscribing to account {} failed: {}", - pubkey, err - )) - } else { - None - } - }) - .collect::>(); - if !sub_errors.is_empty() { - // Instead of aborting the entire commit we log an error here, however - // this means that the undelegated accounts stay in a problematic state - // in the validator and are not synced from chain. - // We could implement a retry mechanism inside of chainlink in the future. - error!( - error_count = sub_errors.len(), - "Failed to subscribe to accounts being undelegated" - ); - } - } - - #[instrument(skip( - result_subscriber, - cancellation_token, - intents_meta_map, - internal_transaction_scheduler - ))] - async fn result_processor( - result_subscriber: oneshot::Receiver< - broadcast::Receiver, - >, - cancellation_token: CancellationToken, - intents_meta_map: Arc>>, - internal_transaction_scheduler: TransactionSchedulerHandle, - ) { - const SUBSCRIPTION_ERR_MSG: &str = - "Failed to get subscription of results of BaseIntents execution"; - - let mut result_receiver = - result_subscriber.await.expect(SUBSCRIPTION_ERR_MSG); - loop { - let execution_result = tokio::select! { - biased; - _ = cancellation_token.cancelled() => { - info!("Shutting down"); - return; - } - execution_result = result_receiver.recv() => { - match execution_result { - Ok(result) => result, - Err(broadcast::error::RecvError::Closed) => { - info!("Intent execution service shut down"); - break; - } - Err(broadcast::error::RecvError::Lagged(skipped)) => { - // SAFETY: This shouldn't happen as our tx execution is faster than Intent execution on Base layer - // If this ever happens it requires investigation - error!(skipped_count = skipped, "Lagging behind intent execution"); - continue; - } - } - } - }; - - let intent_id = execution_result.id; - // Remove intent from metas - let intent_meta = if let Some(intent_meta) = intents_meta_map - .lock() - .expect(POISONED_MUTEX_MSG) - .remove(&intent_id) - { - intent_meta - } else { - // Possible if we have duplicate Intents - // First one will remove id from map and second could fail. - // This should not happen and needs investigation! - error!(intent_id, "Failed to find intent metadata"); - continue; - }; - - Self::process_intent_result( - intent_id, - &internal_transaction_scheduler, - execution_result, - intent_meta, - ) - .await; - } - } - - #[instrument( - skip(internal_transaction_scheduler, result, intent_meta), - fields(intent_id) - )] - async fn process_intent_result( - intent_id: u64, - internal_transaction_scheduler: &TransactionSchedulerHandle, - result: BroadcastedIntentExecutionResult, - mut intent_meta: ScheduledBaseIntentMeta, - ) { - let intent_sent_transaction = - std::mem::take(&mut intent_meta.intent_sent_transaction); - let sent_commit = - Self::build_sent_commit(intent_id, intent_meta, result); - register_scheduled_commit_sent(sent_commit); - let Ok(txn) = with_encoded(intent_sent_transaction) else { - // Unreachable case, all intent transactions are smaller than 64KB by construction - error!("Failed to bincode intent transaction"); - return; - }; - match internal_transaction_scheduler.execute(txn).await { - Ok(()) => { - debug!("Sent commit signaled") - } - Err(err) => { - error!(error = ?err, "Failed to signal sent commit"); - } - } - } - - fn build_sent_commit( - intent_id: u64, - intent_meta: ScheduledBaseIntentMeta, - result: BroadcastedIntentExecutionResult, - ) -> SentCommit { - let error_message = - result.as_ref().err().map(|err| format!("{:?}", err)); - let chain_signatures = match result.inner { - Ok(value) => match value { - ExecutionOutput::SingleStage(signature) => vec![signature], - ExecutionOutput::TwoStage { - commit_signature, - finalize_signature, - } => vec![commit_signature, finalize_signature], - }, - Err(err) => { - error!( - "Failed to commit intent: {}, slot: {}, blockhash: {}. {:?}", - intent_id, intent_meta.slot, intent_meta.blockhash, err - ); - err.signatures() - .map(|(commit, finalize)| { - finalize - .map(|finalize| vec![commit, finalize]) - .unwrap_or(vec![commit]) - }) - .unwrap_or_default() - } - }; - let patched_errors = result - .patched_errors - .iter() - .map(|err| { - info!("Patched intent: {}. error was: {}", intent_id, err); - err.to_string() - }) - .collect(); - - let callbacks_report = result - .callbacks_report - .iter() - .map(|r| match r { - Ok(sig) => { - format!("OK: {sig}") - } - Err(err) => { - error!( - "Callback failed to schedule: {}. error: {}", - intent_id, err - ); - format!("ERR: {err}") - } - }) - .collect(); - - SentCommit { - message_id: intent_id, - slot: intent_meta.slot, - blockhash: intent_meta.blockhash, - payer: intent_meta.payer, - chain_signatures, - included_pubkeys: intent_meta.included_pubkeys, - excluded_pubkeys: vec![], - requested_undelegation: intent_meta.requested_undelegation, - error_message, - patched_errors, - callbacks_scheduling_results: callbacks_report, - } - } -} - -#[async_trait] -impl ScheduledCommitsProcessor for ScheduledCommitsProcessorImpl { - #[instrument(skip(self))] - async fn process(&self) -> ScheduledCommitsProcessorResult<()> { - let intent_bundles = - self.transaction_scheduler.take_scheduled_intent_bundles(); - - if intent_bundles.is_empty() { - return Ok(()); - } - metrics::inc_committor_intents_count_by(intent_bundles.len() as u64); - - // Add metas for intent we schedule - let pubkeys_being_undelegated = { - let mut intent_metas = - self.intents_meta_map.lock().expect(POISONED_MUTEX_MSG); - let mut pubkeys_being_undelegated = HashSet::::new(); - - intent_bundles.iter().for_each(|intent| { - intent_metas - .insert(intent.id, ScheduledBaseIntentMeta::new(intent)); - if let Some(undelegate) = intent.get_undelegate_intent_pubkeys() - { - pubkeys_being_undelegated.extend(undelegate); - } - }); - - pubkeys_being_undelegated.into_iter().collect::>() - }; - - self.process_undelegation_requests(pubkeys_being_undelegated) - .await; - self.committor - .schedule_intent_bundles(intent_bundles) - .await??; - Ok(()) - } - - fn scheduled_commits_len(&self) -> usize { - self.transaction_scheduler.scheduled_actions_len() - } - - fn clear_scheduled_commits(&self) { - self.transaction_scheduler.clear_scheduled_actions(); - } - - fn stop(&self) { - self.cancellation_token.cancel(); - } -} - -struct ScheduledBaseIntentMeta { - slot: u64, - blockhash: Hash, - payer: Pubkey, - included_pubkeys: Vec, - intent_sent_transaction: Transaction, - requested_undelegation: bool, -} - -impl ScheduledBaseIntentMeta { - fn new(intent: &ScheduledIntentBundle) -> Self { - Self { - slot: intent.slot, - blockhash: intent.blockhash, - payer: intent.payer, - included_pubkeys: intent.get_all_committed_pubkeys(), - intent_sent_transaction: intent.sent_transaction.clone(), - requested_undelegation: intent.has_undelegate_intent(), - } - } -} diff --git a/magicblock-api/src/errors.rs b/magicblock-api/src/errors.rs index 86c2c8834..4f80a6b72 100644 --- a/magicblock-api/src/errors.rs +++ b/magicblock-api/src/errors.rs @@ -1,4 +1,5 @@ use magicblock_accounts_db::error::AccountsDbError; +use magicblock_committor_service::service::IntentExecutionServiceError; use solana_pubkey::Pubkey; use thiserror::Error; @@ -15,9 +16,6 @@ pub enum ApiError { #[error("Accounts error: {0}")] AccountsError(Box), - #[error("AccountCloner error: {0}")] - AccountClonerError(Box), - #[error("Ledger error: {0}")] LedgerError(Box), @@ -41,6 +39,9 @@ pub enum ApiError { Box, ), + #[error("IntentExecutionServiceError: {0}")] + IntentExecutionServiceError(#[from] IntentExecutionServiceError), + #[error("Failed to load programs into bank: {0}")] FailedToLoadProgramsIntoBank(String), @@ -118,12 +119,6 @@ impl From for ApiError { } } -impl From for ApiError { - fn from(e: magicblock_account_cloner::AccountClonerError) -> Self { - Self::AccountClonerError(Box::new(e)) - } -} - impl From for ApiError { fn from(e: magicblock_ledger::errors::LedgerError) -> Self { Self::LedgerError(Box::new(e)) diff --git a/magicblock-api/src/magic_sys_adapter.rs b/magicblock-api/src/magic_sys_adapter.rs index 1e13b44a7..3347d4dfa 100644 --- a/magicblock-api/src/magic_sys_adapter.rs +++ b/magicblock-api/src/magic_sys_adapter.rs @@ -1,6 +1,9 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; -use magicblock_committor_service::CommittorService; +use magicblock_committor_service::{ + committor_processor::CommittorProcessor, + intent_executor::task_info_fetcher::TaskInfoFetcherResult, +}; use magicblock_core::{intent::CommittedAccount, traits::MagicSys}; use magicblock_metrics::metrics; use solana_instruction::error::InstructionError; @@ -9,7 +12,8 @@ use tracing::error; #[derive(Clone)] pub struct MagicSysAdapter { - committor_service: Option>, + handle: tokio::runtime::Handle, + committor_processor: Arc, } impl MagicSysAdapter { @@ -19,13 +23,43 @@ impl MagicSysAdapter { const TIMEOUT_ERR: u32 = 0xE000_0001; /// Returned when the fetch of current commit nonces fails. const FETCH_ERR: u32 = 0xE000_0002; - /// Returned when no committor service is configured. - const NO_COMMITTOR_ERR: u32 = 0xE000_0003; const FETCH_TIMEOUT: Duration = Duration::from_secs(30); - pub fn new(committor_service: Option>) -> Self { - Self { committor_service } + pub fn new( + handle: tokio::runtime::Handle, + committor_processor: Arc, + ) -> Self { + Self { + handle, + committor_processor, + } + } + + fn fetch_current_commit_nonces_sync( + &self, + pubkeys: &[Pubkey], + min_context_slot: u64, + ) -> std::sync::mpsc::Receiver>> + { + let (sender, receiver) = std::sync::mpsc::channel(); + let committor_processor = self.committor_processor.clone(); + let pubkeys = pubkeys.to_owned(); + + // This is required to switch from TransactionExecutor runtime + // blocking on it would cause a panic + let _guard = self.handle.enter(); + tokio::spawn(async move { + let result = committor_processor + .fetch_current_commit_nonces(&pubkeys, min_context_slot) + .await; + if let Err(err) = sender.send(result) { + error!(error = ?err, "Failed to send result back"); + } + }); + drop(_guard); + + receiver } } @@ -37,12 +71,6 @@ impl MagicSys for MagicSysAdapter { if commits.is_empty() { return Ok(HashMap::new()); } - let committor_service = - if let Some(committor_service) = &self.committor_service { - Ok(committor_service) - } else { - Err(InstructionError::Custom(Self::NO_COMMITTOR_ERR)) - }?; let min_context_slot = commits .iter() @@ -53,8 +81,8 @@ impl MagicSys for MagicSysAdapter { commits.iter().map(|account| account.pubkey).collect(); let _timer = metrics::start_fetch_commit_nonces_wait_timer(); - let receiver = committor_service - .fetch_current_commit_nonces_sync(&pubkeys, min_context_slot); + let receiver = + self.fetch_current_commit_nonces_sync(&pubkeys, min_context_slot); receiver .recv_timeout(Self::FETCH_TIMEOUT) .map_err(|err| match err { diff --git a/magicblock-api/src/magic_validator.rs b/magicblock-api/src/magic_validator.rs index b815d39ca..53ff59503 100644 --- a/magicblock-api/src/magic_validator.rs +++ b/magicblock-api/src/magic_validator.rs @@ -9,10 +9,6 @@ use std::{ }; use magicblock_account_cloner::ChainlinkCloner; -use magicblock_accounts::{ - scheduled_commits_processor::ScheduledCommitsProcessorImpl, - ScheduledCommitsProcessor, -}; use magicblock_accounts_db::{traits::AccountsBank, AccountsDb}; use magicblock_aperture::{ initialize_aperture, @@ -23,7 +19,9 @@ use magicblock_chainlink::{ ProdInnerChainlink, }; use magicblock_committor_service::{ - config::ChainConfig, BaseIntentCommittor, CommittorService, + committor_processor::CommittorProcessor, + config::ChainConfig, + service::{intent_client::InternalIntentRpcClient, IntentExecutionService}, ComputeBudgetConfig, DEFAULT_ACTIONS_TIMEOUT, }; use magicblock_config::{ @@ -92,13 +90,16 @@ use crate::{ write_validator_keypair_to_ledger, }, magic_sys_adapter::MagicSysAdapter, - tickers::{init_slot_ticker, init_system_metrics_ticker}, + tickers::init_system_metrics_ticker, }; type InnerChainlinkImpl = ProdInnerChainlink; type ChainlinkImpl = ProdChainlink; +type IntentExecutionServiceImpl = + IntentExecutionService>; + // ----------------- // MagicValidator // ----------------- @@ -109,10 +110,8 @@ pub struct MagicValidator { accountsdb: Arc, ledger: Arc, ledger_truncator: LedgerTruncator, - slot_ticker: Option>, - committor_service: Option>, + intent_execution_service: IntentExecutionServiceImpl, replication_service: Option, - scheduled_commits_processor: Option>, chainlink: Arc, rpc_handle: thread::JoinHandle<()>, identity: Pubkey, @@ -212,15 +211,6 @@ impl MagicValidator { let accountsdb = Arc::new(accountsdb); let (mut dispatch, validator_channels) = link(); - let step_start = Instant::now(); - let committor_service = - Self::init_committor_service(&config, ledger.latest_block()) - .await?; - log_timing("startup", "committor_service_init", step_start); - init_magic_sys(Arc::new(MagicSysAdapter::new( - committor_service.clone(), - ))); - let step_start = Instant::now(); let chainlink = Arc::new( Self::init_chainlink( @@ -233,6 +223,27 @@ impl MagicValidator { ); log_timing("startup", "chainlink_init", step_start); + let step_start = Instant::now(); + let committor_processor = { + let processor = + Self::init_committor_processor(&config, ledger.latest_block())?; + Arc::new(processor) + }; + let intent_execution_service = Self::init_intent_execution_service( + &accountsdb, + &chainlink, + &dispatch.transaction_scheduler, + ledger.latest_block(), + &committor_processor, + config.ledger.block_time, + &token, + ); + log_timing("startup", "committor_service_init", step_start); + init_magic_sys(Arc::new(MagicSysAdapter::new( + tokio::runtime::Handle::current(), + committor_processor.clone(), + ))); + let replication_service = if let Some((broker, is_fresh_start)) = broker { let messages_rx = dispatch.replication_messages.take().expect( @@ -290,15 +301,6 @@ impl MagicValidator { ); log_timing("startup", "system_metrics_ticker_start", step_start); - let scheduled_commits_processor = - committor_service.as_ref().map(|committor_service| { - Arc::new(ScheduledCommitsProcessorImpl::new( - committor_service.clone(), - chainlink.clone(), - dispatch.transaction_scheduler.clone(), - )) - }); - let step_start = Instant::now(); load_upgradeable_programs( &accountsdb, @@ -419,10 +421,8 @@ impl MagicValidator { exit, _metrics: (metrics_service, system_metrics_ticker), // NOTE: set during [Self::start] - slot_ticker: None, - committor_service, + intent_execution_service, replication_service, - scheduled_commits_processor, chainlink, token, ledger, @@ -439,35 +439,58 @@ impl MagicValidator { }) } - #[instrument(skip(config, latest_block))] - async fn init_committor_service( + pub fn init_committor_processor( config: &ValidatorParams, latest_block: &LatestBlock, - ) -> ApiResult>> { + ) -> ApiResult { + let authority = config.validator.keypair.insecure_clone(); let committor_persist_path = config.storage.join("committor_service.sqlite"); - debug!(path = %committor_persist_path.display(), "Initializing committor service"); + let base_chain_config = ChainConfig { + rpc_uri: config.rpc_url().to_owned(), + commitment: CommitmentConfig::confirmed(), + compute_budget_config: ComputeBudgetConfig::new( + config.commit.compute_unit_price, + ), + actions_timeout: DEFAULT_ACTIONS_TIMEOUT, + }; + // TODO(thlorenz): if startup roles change, revisit whether this service is needed for that role. let actions_callback_executor = ActionsCallbackService::new( Arc::new(RpcClient::new(config.aperture.listen.http())), config.validator.keypair.insecure_clone(), latest_block.clone(), ); - let committor_service = Some(Arc::new(CommittorService::try_start( - config.validator.keypair.insecure_clone(), + Ok(CommittorProcessor::try_new( + authority, committor_persist_path, - ChainConfig { - rpc_uri: config.rpc_url().to_owned(), - commitment: CommitmentConfig::confirmed(), - compute_budget_config: ComputeBudgetConfig::new( - config.commit.compute_unit_price, - ), - actions_timeout: DEFAULT_ACTIONS_TIMEOUT, - }, + base_chain_config, actions_callback_executor, - )?)); + )?) + } - Ok(committor_service) + fn init_intent_execution_service( + accounts_db: &Arc, + chainlink: &Arc, + transaction_scheduler: &TransactionSchedulerHandle, + latest_block: &LatestBlock, + committor_processor: &Arc, + slot_interval: Duration, + cancellation_token: &CancellationToken, + ) -> IntentExecutionServiceImpl { + let intent_client = InternalIntentRpcClient::new( + accounts_db.clone(), + transaction_scheduler.clone(), + latest_block.clone(), + ); + + IntentExecutionServiceImpl::new( + chainlink.clone(), + intent_client, + committor_processor.clone(), + slot_interval, + cancellation_token.clone(), + ) } #[allow(clippy::too_many_arguments)] @@ -932,15 +955,8 @@ impl MagicValidator { } let step_start = Instant::now(); - self.slot_ticker = Some(init_slot_ticker( - self.accountsdb.clone(), - &self.scheduled_commits_processor, - self.ledger.latest_block().clone(), - self.config.ledger.block_time, - self.transaction_scheduler.clone(), - self.exit.clone(), - )); - log_timing("startup", "slot_ticker_start", step_start); + self.intent_execution_service.start()?; + log_timing("startup", "intent_execution_service_start", step_start); let step_start = Instant::now(); self.ledger_truncator.start(); @@ -993,22 +1009,11 @@ impl MagicValidator { // Ordering is important here // Commitor service shall be stopped last self.token.cancel(); - if let Some(ref scheduled_commits_processor) = - self.scheduled_commits_processor - { - let step_start = Instant::now(); - scheduled_commits_processor.stop(); - log_timing( - "shutdown", - "scheduled_commits_processor_stop", - step_start, - ); - } - if let Some(ref committor_service) = self.committor_service { - let step_start = Instant::now(); - committor_service.stop(); - log_timing("shutdown", "committor_service_stop", step_start); - } + + let step_start = Instant::now(); + // TODO(edwin): handle returned errs, at least log + let _ = self.intent_execution_service.stop().await; + log_timing("shutdown", "intent_execution_service_stop", step_start); let step_start = Instant::now(); self.claim_fees_task.stop().await; @@ -1027,11 +1032,6 @@ impl MagicValidator { let step_start = Instant::now(); let _ = self.rpc_handle.join(); log_timing("shutdown", "rpc_thread_join", step_start); - if let Some(handle) = self.slot_ticker { - let step_start = Instant::now(); - let _ = handle.await; - log_timing("shutdown", "slot_ticker_join", step_start); - } let step_start = Instant::now(); if let Err(err) = self.ledger_truncator.join() { error!(error = ?err, "Ledger truncator did not gracefully exit"); diff --git a/magicblock-api/src/tickers.rs b/magicblock-api/src/tickers.rs index 30e7e8e9a..c011e3f80 100644 --- a/magicblock-api/src/tickers.rs +++ b/magicblock-api/src/tickers.rs @@ -1,87 +1,11 @@ -use std::{ - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, - time::Duration, -}; +use std::{sync::Arc, time::Duration}; -use magicblock_accounts::ScheduledCommitsProcessor; -use magicblock_accounts_db::{traits::AccountsBank, AccountsDb}; -use magicblock_core::link::transactions::{ - with_encoded, TransactionSchedulerHandle, -}; -use magicblock_ledger::{LatestBlock, Ledger}; -use magicblock_magic_program_api as magic_program; +use magicblock_accounts_db::AccountsDb; +use magicblock_ledger::Ledger; use magicblock_metrics::metrics; -use magicblock_program::{instruction_utils::InstructionUtils, MagicContext}; -use solana_account::ReadableAccount; use tokio_util::sync::CancellationToken; use tracing::*; -pub fn init_slot_ticker( - accountsdb: Arc, - committor_processor: &Option>, - latest_block: LatestBlock, - tick_duration: Duration, - transaction_scheduler: TransactionSchedulerHandle, - exit: Arc, -) -> tokio::task::JoinHandle<()> { - let committor_processor = committor_processor.clone(); - - tokio::task::spawn(async move { - while !exit.load(Ordering::Relaxed) { - tokio::time::sleep(tick_duration).await; - - // Handle intents if such feature enabled - let Some(committor_processor) = &committor_processor else { - continue; - }; - - // If accounts were scheduled to be committed, we accept them here - // and processs the commits - let magic_context_acc = accountsdb.get_account(&magic_program::MAGIC_CONTEXT_PUBKEY) - .expect("Validator found to be running without MagicContext account!"); - if MagicContext::has_scheduled_commits(magic_context_acc.data()) { - handle_scheduled_commits( - committor_processor, - &transaction_scheduler, - &latest_block, - ) - .await; - } - } - }) -} - -#[instrument(skip(committor_processor, transaction_scheduler, latest_block))] -async fn handle_scheduled_commits( - committor_processor: &Arc, - transaction_scheduler: &TransactionSchedulerHandle, - latest_block: &LatestBlock, -) { - // 1. Send the transaction to move the scheduled commits from the MagicContext - // to the global ScheduledCommit store - let tx = InstructionUtils::accept_scheduled_commits( - latest_block.load().blockhash, - ); - let Ok(tx) = with_encoded(tx) else { - // Unreachable case, all schedule commit txns are smaller than 64KB by construction - error!("Failed to bincode intent transaction"); - return; - }; - if let Err(err) = transaction_scheduler.execute(tx).await { - error!(error = ?err, "Failed to accept scheduled commits"); - return; - } - - // 2. Process those scheduled commits - // TODO: fix the possible delay here - // https://github.com/magicblock-labs/magicblock-validator/issues/104 - if let Err(err) = committor_processor.process().await { - error!(error = ?err, "Failed to process scheduled commits"); - } -} #[allow(unused_variables)] pub fn init_system_metrics_ticker( tick_duration: Duration, diff --git a/magicblock-committor-service/Cargo.toml b/magicblock-committor-service/Cargo.toml index 11eec877e..e878e7185 100644 --- a/magicblock-committor-service/Cargo.toml +++ b/magicblock-committor-service/Cargo.toml @@ -18,6 +18,12 @@ borsh = { workspace = true } futures-util = { workspace = true } tracing = { workspace = true } lru = { workspace = true } +# TDOO: should be removed +magicblock-account-cloner = { workspace = true } +# TODO: should be removed +magicblock-accounts-db = { workspace = true } +# TODO: should be removed, hidden behinf the trait +magicblock-chainlink = { workspace = true } magicblock-committor-program = { workspace = true, features = [ "no-entrypoint", ] } @@ -30,7 +36,6 @@ magicblock-table-mania = { workspace = true } rusqlite = { workspace = true } solana-account = { workspace = true } solana-account-decoder = { workspace = true } -solana-address-lookup-table-interface = { workspace = true } solana-commitment-config = { workspace = true } solana-compute-budget-interface = { workspace = true } solana-hash = { workspace = true } @@ -43,25 +48,18 @@ solana-rpc-client = { workspace = true } solana-rpc-client-api = { workspace = true } solana-signature = { workspace = true } solana-signer = { workspace = true } -solana-system-program = { workspace = true } solana-transaction = { workspace = true } solana-transaction-error = { workspace = true } solana-transaction-status-client-types = { workspace = true } -static_assertions = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } tokio-util = { workspace = true } [dev-dependencies] solana-signature = { workspace = true, features = ["rand"] } -serde_json = { workspace = true } -lazy_static = { workspace = true } rand = { workspace = true } tempfile = { workspace = true } test-kit = { workspace = true } -tracing-subscriber = { workspace = true } -tracing-log = { workspace = true } [features] default = [] -dev-context-only-utils = [] diff --git a/magicblock-committor-service/src/committor_processor.rs b/magicblock-committor-service/src/committor_processor.rs index 5439037d8..acf5cfb91 100644 --- a/magicblock-committor-service/src/committor_processor.rs +++ b/magicblock-committor-service/src/committor_processor.rs @@ -1,9 +1,10 @@ use std::{ - collections::{HashMap, HashSet}, + collections::{hash_map::Entry, HashMap}, path::Path, - sync::Arc, + sync::{Arc, Mutex}, }; +use futures_util::future::join_all; use magicblock_core::traits::ActionsCallbackScheduler; use magicblock_program::magic_scheduled_base_intent::ScheduledIntentBundle; use magicblock_rpc_client::MagicblockRpcClient; @@ -11,13 +12,12 @@ use magicblock_table_mania::{GarbageCollectorConfig, TableMania}; use solana_keypair::Keypair; use solana_pubkey::Pubkey; use solana_rpc_client::nonblocking::rpc_client::RpcClient; -use solana_signer::Signer; -use tokio::sync::broadcast; -use tracing::{error, instrument}; +use tokio::sync::{broadcast, oneshot, oneshot::error::RecvError}; +use tracing::{error, info, instrument}; use crate::{ config::ChainConfig, - error::CommittorServiceResult, + error::{CommittorServiceError, CommittorServiceResult}, intent_execution_manager::{ db::DummyDB, BroadcastedIntentExecutionResult, IntentExecutionManager, }, @@ -33,14 +33,18 @@ use crate::{ MessageSignatures, }, }; +// use crate::service_ext::CommittorServiceExtError; -pub(crate) struct CommittorProcessor { - pub(crate) magicblock_rpc_client: MagicblockRpcClient, - pub(crate) table_mania: TableMania, - pub(crate) authority: Keypair, +const POISONED_MUTEX_MSG: &str = + "CommittorProcessor pending messages mutex poisoned!"; +type BundleResultListener = oneshot::Sender; + +pub struct CommittorProcessor { + _table_mania: TableMania, persister: IntentPersisterImpl, commits_scheduler: IntentExecutionManager, task_info_fetcher: Arc>, + pending_result_listeners: Arc>>, } impl CommittorProcessor { @@ -91,42 +95,22 @@ impl CommittorProcessor { actions_callback_executor, ); + let result_subscription = commits_scheduler.subscribe_for_results(); + let pending_result_listeners = Arc::new(Mutex::new(HashMap::new())); + tokio::spawn(Self::dispatcher( + result_subscription, + pending_result_listeners.clone(), + )); + Ok(Self { - authority, - magicblock_rpc_client: magic_block_rpc_client, - table_mania, + _table_mania: table_mania, commits_scheduler, persister, task_info_fetcher, + pending_result_listeners, }) } - pub async fn active_lookup_tables(&self) -> Vec { - self.table_mania.active_table_addresses().await - } - - pub async fn released_lookup_tables(&self) -> Vec { - self.table_mania.released_table_addresses().await - } - - pub fn auth_pubkey(&self) -> Pubkey { - self.authority.pubkey() - } - - pub(crate) async fn reserve_pubkeys( - &self, - pubkeys: HashSet, - ) -> CommittorServiceResult<()> { - Ok(self - .table_mania - .reserve_pubkeys(&self.authority, &pubkeys) - .await?) - } - - pub(crate) async fn release_pubkeys(&self, pubkeys: HashSet) { - self.table_mania.release_pubkeys(&pubkeys).await - } - pub fn get_commit_statuses( &self, message_id: u64, @@ -148,7 +132,7 @@ impl CommittorProcessor { } #[instrument(skip(self, intent_bundles))] - pub async fn schedule_intent_bundle( + pub async fn schedule_intent_bundles( &self, intent_bundles: Vec, ) -> CommittorServiceResult<()> { @@ -169,6 +153,45 @@ impl CommittorProcessor { Ok(()) } + pub async fn execute_intent_bundles( + &self, + intent_bundles: Vec, + ) -> CommittorServiceResult> { + // Critical section + let receivers = { + let mut result_listeners = self + .pending_result_listeners + .lock() + .expect(POISONED_MUTEX_MSG); + + intent_bundles + .iter() + .map(|intent| { + let (sender, receiver) = oneshot::channel(); + match result_listeners.entry(intent.id) { + Entry::Vacant(vacant) => { + vacant.insert(sender); + Ok(receiver) + } + Entry::Occupied(_) => { + Err(CommittorServiceError::RepeatingMessageError( + intent.id, + )) + } + } + }) + .collect::, _>>()? + }; + + self.schedule_intent_bundles(intent_bundles).await?; + let results = join_all(receivers.into_iter()) + .await + .into_iter() + .collect::, RecvError>>()?; + + Ok(results) + } + /// Creates a subscription for results of BaseIntent execution pub fn subscribe_for_results( &self, @@ -186,4 +209,48 @@ impl CommittorProcessor { .fetch_current_commit_nonces(pubkeys, min_context_slot) .await } + + /// Dispatch worker + #[instrument(skip(pending_result_listeners, results_subscription))] + async fn dispatcher( + mut results_subscription: broadcast::Receiver< + BroadcastedIntentExecutionResult, + >, + pending_result_listeners: Arc< + Mutex>, + >, + ) { + loop { + let execution_result = match results_subscription.recv().await { + Ok(result) => result, + Err(broadcast::error::RecvError::Closed) => { + info!("Intent execution shutdown"); + break; + } + Err(broadcast::error::RecvError::Lagged(skipped)) => { + // SAFETY: not really feasible to happen as this function is way faster than Intent execution + // requires investigation if ever happens! + error!(skipped, "Dispatcher lag detected"); + continue; + } + }; + + let sender = if let Some(sender) = pending_result_listeners + .lock() + .expect(POISONED_MUTEX_MSG) + .remove(&execution_result.id) + { + sender + } else { + continue; + }; + + if let Err(execution_result) = sender.send(execution_result) { + error!( + intent_id = execution_result.id, + "Failed to send execution result" + ); + } + } + } } diff --git a/magicblock-committor-service/src/compute_budget.rs b/magicblock-committor-service/src/compute_budget.rs index 93a40e8e7..4a6b5e79f 100644 --- a/magicblock-committor-service/src/compute_budget.rs +++ b/magicblock-committor-service/src/compute_budget.rs @@ -1,26 +1,6 @@ use solana_compute_budget_interface::ComputeBudgetInstruction; use solana_instruction::Instruction; -// ----------------- -// Budgets -// ----------------- -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub struct Budget { - base_budget: u32, - per_committee: u32, - compute_unit_price: u64, -} - -impl Default for Budget { - fn default() -> Self { - Self { - base_budget: 80_000, - per_committee: 45_000, - compute_unit_price: 1_000_000, - } - } -} - #[derive(Debug, Clone)] pub struct BufferWithReallocBudget { base_budget: u32, @@ -67,20 +47,9 @@ impl BufferWriteChunkBudget { } } -// ----------------- -// ComputeBudgetConfig -// ----------------- #[derive(Debug, Clone)] pub struct ComputeBudgetConfig { pub compute_unit_price: u64, - pub args_process: Budget, - pub finalize: Budget, - pub buffer_close: Budget, - /// The budget used for processing and process + closing a buffer. - /// Since we mix pure process and process + close instructions, we need to - /// assume the worst case and use the process + close budget for all. - pub buffer_process_and_close: Budget, - pub undelegate: Budget, pub buffer_init: BufferWithReallocBudget, pub buffer_realloc: BufferWithReallocBudget, pub buffer_write: BufferWriteChunkBudget, @@ -90,31 +59,6 @@ impl ComputeBudgetConfig { pub fn new(compute_unit_price: u64) -> Self { Self { compute_unit_price, - args_process: Budget { - compute_unit_price, - base_budget: 80_000, - per_committee: 35_000, - }, - buffer_close: Budget { - compute_unit_price, - base_budget: 10_000, - per_committee: 25_000, - }, - buffer_process_and_close: Budget { - compute_unit_price, - base_budget: 40_000, - per_committee: 45_000, - }, - finalize: Budget { - compute_unit_price, - base_budget: 80_000, - per_committee: 25_000, - }, - undelegate: Budget { - compute_unit_price, - base_budget: 70_000, - per_committee: 35_000, - }, buffer_init: BufferWithReallocBudget { base_budget: 12_000, per_realloc_ix: 6_000, @@ -134,88 +78,6 @@ impl ComputeBudgetConfig { } } -impl ComputeBudgetConfig { - pub fn args_process_budget(&self) -> ComputeBudget { - ComputeBudget::Process(self.args_process) - } - pub fn buffer_close_budget(&self) -> ComputeBudget { - ComputeBudget::Close(self.buffer_close) - } - pub fn buffer_process_and_close_budget(&self) -> ComputeBudget { - ComputeBudget::ProcessAndClose(self.buffer_process_and_close) - } - pub fn finalize_budget(&self) -> ComputeBudget { - ComputeBudget::Finalize(self.finalize) - } - pub fn undelegate_budget(&self) -> ComputeBudget { - ComputeBudget::Undelegate(self.undelegate) - } -} - -// ----------------- -// ComputeBudget -// ----------------- -#[derive(Clone, Debug, PartialEq, Eq)] -pub enum ComputeBudget { - Process(Budget), - Close(Budget), - ProcessAndClose(Budget), - Finalize(Budget), - Undelegate(Budget), -} - -impl ComputeBudget { - fn base_budget(&self) -> u32 { - use ComputeBudget::*; - match self { - Process(budget) => budget.base_budget, - Close(budget) => budget.base_budget, - ProcessAndClose(budget) => budget.base_budget, - Finalize(budget) => budget.base_budget, - Undelegate(budget) => budget.base_budget, - } - } - - fn per_committee(&self) -> u32 { - use ComputeBudget::*; - match self { - Process(budget) => budget.per_committee, - Close(budget) => budget.per_committee, - ProcessAndClose(budget) => budget.per_committee, - Finalize(budget) => budget.per_committee, - Undelegate(budget) => budget.per_committee, - } - } - - pub fn compute_unit_price(&self) -> u64 { - use ComputeBudget::*; - match self { - Process(budget) => budget.compute_unit_price, - Close(budget) => budget.compute_unit_price, - ProcessAndClose(budget) => budget.compute_unit_price, - Finalize(budget) => budget.compute_unit_price, - Undelegate(budget) => budget.compute_unit_price, - } - } - - fn total_budget(&self, committee_count: u32) -> u32 { - self.per_committee() - .checked_mul(committee_count) - .and_then(|product| product.checked_add(self.base_budget())) - .unwrap_or(u32::MAX) - } - - pub fn instructions(&self, committee_count: usize) -> Vec { - let committee_count = - u32::try_from(committee_count).unwrap_or(u32::MAX); - - instructions( - self.total_budget(committee_count), - self.compute_unit_price(), - ) - } -} - fn instructions( compute_budget: u32, compute_unit_price: u64, diff --git a/magicblock-committor-service/src/error.rs b/magicblock-committor-service/src/error.rs index fa13e49b8..2b4ce584b 100644 --- a/magicblock-committor-service/src/error.rs +++ b/magicblock-committor-service/src/error.rs @@ -1,86 +1,21 @@ -use solana_pubkey::Pubkey; -use solana_signature::Signature; -use solana_transaction_error::TransactionError; use thiserror::Error; +use tokio::sync::oneshot::error::RecvError; -use crate::{ - intent_execution_manager::IntentExecutionManagerError, - intent_executor::task_info_fetcher::TaskInfoFetcherError, -}; +use crate::intent_execution_manager::IntentExecutionManagerError; pub type CommittorServiceResult = Result; #[derive(Error, Debug)] pub enum CommittorServiceError { - #[error("CommittorError: {0} ({0:?})")] - CommittorError(#[from] magicblock_committor_program::error::CommittorError), - #[error("CommitPersistError: {0} ({0:?})")] CommitPersistError(#[from] crate::persist::error::CommitPersistError), - #[error("MagicBlockRpcClientError: {0} ({0:?})")] - MagicBlockRpcClientError( - #[from] magicblock_rpc_client::MagicBlockRpcClientError, - ), - - #[error("TableManiaError: {0} ({0:?})")] - TableManiaError(#[from] magicblock_table_mania::error::TableManiaError), - #[error("IntentExecutionManagerError: {0} ({0:?})")] IntentExecutionManagerError(#[from] IntentExecutionManagerError), - #[error("TaskInfoFetcherError: {0} ({0:?})")] - TaskInfoFetcherError(#[from] TaskInfoFetcherError), - - #[error( - "Failed send and confirm transaction to {0} on chain: {1} ({1:?})" - )] - FailedToSendAndConfirmTransaction( - String, - magicblock_rpc_client::MagicBlockRpcClientError, - ), - - #[error("The transaction to {0} was sent and confirmed, but encountered an error: {1} ({1:?})")] - EncounteredTransactionError(String, TransactionError), - - #[error("Failed to send init changeset account: {0} ({0:?})")] - FailedToSendInitChangesetAccount( - Box, - ), - - #[error("Failed to confirm init changeset account: {0} ({0:?})")] - FailedToConfirmInitChangesetAccount( - Box, - ), - #[error("Init transaction '{0}' was not confirmed")] - InitChangesetAccountNotConfirmed(String), - - #[error("Task {0} failed to compile transaction message: {1} ({1:?})")] - FailedToCompileTransactionMessage(String, solana_message::CompileError), - - #[error("Task {0} failed to create transaction: {1} ({1:?})")] - FailedToCreateTransaction(String, solana_signer::SignerError), - - #[error("Could not find commit strategy for bundle {0}")] - CouldNotFindCommitStrategyForBundle(u64), - - #[error("Failed to fetch metadata account for {0}")] - FailedToFetchDelegationMetadata(Pubkey), - - #[error("Failed to deserialize metadata account for {0}, {1:?}")] - FailedToDeserializeDelegationMetadata( - Pubkey, - solana_program::program_error::ProgramError, - ), -} + #[error("RecvError: {0}")] + IntentResultRecvError(#[from] RecvError), -impl CommittorServiceError { - pub fn signature(&self) -> Option { - use CommittorServiceError::*; - match self { - MagicBlockRpcClientError(e) => e.signature(), - FailedToSendAndConfirmTransaction(_, e) => e.signature(), - _ => None, - } - } + #[error("Attempt to schedule already scheduled message id: {0}")] + RepeatingMessageError(u64), } diff --git a/magicblock-committor-service/src/lib.rs b/magicblock-committor-service/src/lib.rs index 9836160f4..48d90930f 100644 --- a/magicblock-committor-service/src/lib.rs +++ b/magicblock-committor-service/src/lib.rs @@ -1,4 +1,4 @@ -mod committor_processor; +pub mod committor_processor; mod compute_budget; pub mod config; mod consts; @@ -6,16 +6,12 @@ pub mod error; pub mod intent_execution_manager; pub mod intent_executor; pub mod persist; -mod pubkeys_provider; -mod service; -pub mod service_ext; -#[cfg(feature = "dev-context-only-utils")] -pub mod stubs; pub mod tasks; pub mod transaction_preparator; pub mod transactions; pub(crate) mod utils; +pub mod service; #[cfg(test)] pub mod test_utils; @@ -24,4 +20,3 @@ pub use config::DEFAULT_ACTIONS_TIMEOUT; pub use magicblock_committor_program::{ ChangedAccount, Changeset, ChangesetMeta, }; -pub use service::{BaseIntentCommittor, CommittorService}; diff --git a/magicblock-committor-service/src/pubkeys_provider.rs b/magicblock-committor-service/src/pubkeys_provider.rs deleted file mode 100644 index 9995c174e..000000000 --- a/magicblock-committor-service/src/pubkeys_provider.rs +++ /dev/null @@ -1,67 +0,0 @@ -use std::collections::HashSet; - -use dlp_api::pda; -use solana_pubkey::Pubkey; -use solana_system_program::id as system_program_id; -use tracing::*; - -/// Returns all accounts needed to process/finalize a commit for the account -/// with the provided `delegated_account`. -/// NOTE: that buffer and chunk accounts are different for each commit and -/// thus are not included -pub fn provide_committee_pubkeys( - committee: &Pubkey, - owner_program: Option<&Pubkey>, -) -> HashSet { - let mut set = HashSet::new(); - set.insert(*committee); - set.insert(pda::delegation_record_pda_from_delegated_account(committee)); - set.insert(pda::delegation_metadata_pda_from_delegated_account( - committee, - )); - set.insert(pda::commit_state_pda_from_delegated_account(committee)); - set.insert(pda::commit_record_pda_from_delegated_account(committee)); - set.insert(pda::undelegate_buffer_pda_from_delegated_account(committee)); - - // NOTE: ideally we'd also include the rent_fee_payer here, but that is - // not known to the validator at the time of cloning since it is - // stored inside the delegation metadata account instead of the - // delegation record - - if let Some(owner_program) = owner_program { - set.insert(pda::program_config_from_program_id(owner_program)); - } else { - warn!(committee = %committee, "No owner program provided for committee"); - } - set -} - -/// Returns common accounts needed for process/finalize transactions, -/// namely the program ids used and the fees vaults and the validator itself. -pub fn provide_common_pubkeys(validator: &Pubkey) -> HashSet { - let mut set = HashSet::new(); - - let deleg_program = dlp_api::id(); - let protocol_fees_vault = pda::fees_vault_pda(); - let validator_fees_vault = - pda::validator_fees_vault_pda_from_validator(validator); - let committor_program = magicblock_committor_program::id(); - - trace!( - validator = %validator, - deleg_program = %deleg_program, - protocol_fees_vault = %protocol_fees_vault, - validator_fees_vault = %validator_fees_vault, - committor_program = %committor_program, - "Common pubkeys loaded" - ); - - set.insert(*validator); - set.insert(system_program_id()); - set.insert(deleg_program); - set.insert(protocol_fees_vault); - set.insert(validator_fees_vault); - set.insert(committor_program); - - set -} diff --git a/magicblock-committor-service/src/service.rs b/magicblock-committor-service/src/service.rs index bbec9d805..f3dd3a6b2 100644 --- a/magicblock-committor-service/src/service.rs +++ b/magicblock-committor-service/src/service.rs @@ -1,570 +1,431 @@ -use std::{collections::HashMap, path::Path, sync::Arc, time::Instant}; - -use magicblock_core::traits::ActionsCallbackScheduler; -use magicblock_program::magic_scheduled_base_intent::ScheduledIntentBundle; -use solana_keypair::Keypair; -use solana_pubkey::Pubkey; -use solana_signature::Signature; -use solana_transaction_status_client_types::EncodedConfirmedTransactionWithStatusMeta; +pub mod intent_client; + +use std::{ + collections::{HashMap, HashSet}, + mem, + sync::{Arc, Mutex}, + time::Duration, +}; + +use intent_client::{ERIntentClient, InternalIntentClientError}; +use magicblock_account_cloner::ChainlinkCloner; +use magicblock_chainlink::{ProdChainlink, ProdInnerChainlink}; +use magicblock_metrics::metrics; +use magicblock_program::{ + magic_scheduled_base_intent::ScheduledIntentBundle, Pubkey, SentCommit, +}; +use solana_hash::Hash; +use solana_transaction::Transaction; use tokio::{ - select, - sync::{ - broadcast, - mpsc::{self, error::TrySendError}, - oneshot, - }, + sync::broadcast, + task, + task::{JoinError, JoinHandle}, }; -use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned}; -use tracing::*; +use tokio_util::sync::CancellationToken; +use tracing::{error, info, instrument}; use crate::{ - committor_processor::CommittorProcessor, - config::ChainConfig, - error::{CommittorServiceError, CommittorServiceResult}, + committor_processor::CommittorProcessor, error::CommittorServiceError, intent_execution_manager::BroadcastedIntentExecutionResult, - persist::{CommitStatusRow, MessageSignatures}, - pubkeys_provider::{provide_committee_pubkeys, provide_common_pubkeys}, + intent_executor::ExecutionOutput, }; -#[derive(Debug)] -pub struct LookupTables { - pub active: Vec, - pub released: Vec, -} +const POISONED_MUTEX_MSG: &str = "ServiceInner intents_meta_map mutex poisoned"; -#[derive(Debug)] -pub enum CommittorMessage { - ReservePubkeysForCommittee { - /// When the request was initiated - initiated: Instant, - /// Called once the pubkeys have been reserved and includes that timestamp - /// at which the request was initiated - respond_to: oneshot::Sender>, - /// The committee whose pubkeys to reserve in a lookup table - /// These pubkeys are used to process/finalize the commit - committee: Pubkey, - /// The owner program of the committee - owner: Pubkey, - }, - ReserveCommonPubkeys { - /// Called once the pubkeys have been reserved - respond_to: oneshot::Sender>, - }, - ReleaseCommonPubkeys { - /// Called once the pubkeys have been released - respond_to: oneshot::Sender<()>, - }, - ScheduleIntentBundle { - /// The [`ScheduleIntentBundle`]s to commit - intent_bundles: Vec, - respond_to: oneshot::Sender>, - }, - GetCommitStatuses { - respond_to: - oneshot::Sender>>, - message_id: u64, - }, - GetCommitSignatures { - respond_to: - oneshot::Sender>>, - commit_id: u64, - pubkey: Pubkey, - }, - GetLookupTables { - respond_to: oneshot::Sender, - }, - GetTransaction { - respond_to: oneshot::Sender< - CommittorServiceResult, - >, - signature: Signature, - }, - SubscribeForResults { - respond_to: oneshot::Sender< - broadcast::Receiver, - >, - }, - FetchCurrentCommitNonces { - respond_to: - oneshot::Sender>>, - pubkeys: Vec, - min_context_slot: u64, - }, - FetchCurrentCommitNoncesSync { - respond_to: std::sync::mpsc::Sender< - CommittorServiceResult>, - >, - pubkeys: Vec, - min_context_slot: u64, - }, -} +pub type InnerChainlinkImpl = ProdInnerChainlink; +pub type ChainlinkImpl = ProdChainlink; -// ----------------- -// CommittorActor -// ----------------- -struct CommittorActor { - receiver: mpsc::Receiver, - processor: Arc, +pub enum IntentExecutionService { + Created(ServiceInner), + Started(JoinHandle<()>), + Stopped, + Error, } -impl CommittorActor { - pub fn try_new( - receiver: mpsc::Receiver, - authority: Keypair, - persist_file: P, - chain_config: ChainConfig, - actions_callback_executor: A, - ) -> CommittorServiceResult - where - P: AsRef, - A: ActionsCallbackScheduler, - { - let processor = Arc::new(CommittorProcessor::try_new( - authority, - persist_file, - chain_config, - actions_callback_executor, - )?); - - Ok(Self { - receiver, +impl IntentExecutionService +where + R: ERIntentClient, + R::Error: Into, +{ + pub fn new( + chainlink: Arc, + intent_rpc_client: R, + processor: Arc, + slot_interval: Duration, + cancellation_token: CancellationToken, + ) -> Self { + Self::Created(ServiceInner::new( + chainlink, + intent_rpc_client, processor, - }) - } - - #[instrument(skip(self))] - async fn handle_msg(&self, msg: CommittorMessage) { - use CommittorMessage::*; - match msg { - ReservePubkeysForCommittee { - initiated, - respond_to, - committee, - owner, - } => { - let processor = self.processor.clone(); - tokio::task::spawn(async move { - let pubkeys = - provide_committee_pubkeys(&committee, Some(&owner)); - // NOTE: we wait here until the reservation is done which causes the - // cloning of a particular account to be blocked. - // This leads to larger delays on the first clone of an account, but also - // ensures that the account could be committed via a lookup table later. - let result = processor - .reserve_pubkeys(pubkeys) - .await - .map(|_| initiated); - if let Err(e) = respond_to.send(result) { - error!(message_type = "ReservePubkeysForCommittee", error = ?e, "Failed to send response"); - } - }); - } - ReserveCommonPubkeys { respond_to } => { - let processor = self.processor.clone(); - tokio::task::spawn(async move { - let pubkeys = - provide_common_pubkeys(&processor.auth_pubkey()); - let reqid = processor.reserve_pubkeys(pubkeys).await; - if let Err(e) = respond_to.send(reqid) { - error!(message_type = "ReserveCommonPubkeys", error = ?e, "Failed to send response"); - } - }); - } - ReleaseCommonPubkeys { respond_to } => { - let processor = self.processor.clone(); - tokio::task::spawn(async move { - let pubkeys = - provide_common_pubkeys(&processor.auth_pubkey()); - processor.release_pubkeys(pubkeys).await; - if let Err(e) = respond_to.send(()) { - error!(message_type = "ReleaseCommonPubkeys", error = ?e, "Failed to send response"); - } - }); - } - ScheduleIntentBundle { - intent_bundles, - respond_to, - } => { - let result = - self.processor.schedule_intent_bundle(intent_bundles).await; - if let Err(e) = respond_to.send(result) { - error!(message_type = "ScheduleBaseIntents", error = ?e, "Failed to send response"); - } - } - GetCommitStatuses { - message_id, - respond_to, - } => { - let commit_statuses = - self.processor.get_commit_statuses(message_id); - if let Err(e) = respond_to.send(commit_statuses) { - error!(message_type = "GetCommitStatuses", error = ?e, "Failed to send response"); - } - } - GetCommitSignatures { - commit_id, - respond_to, - pubkey, - } => { - let sig = - self.processor.get_commit_signature(commit_id, pubkey); - if let Err(e) = respond_to.send(sig) { - error!(message_type = "GetCommitSignatures", error = ?e, "Failed to send response"); - } - } - GetTransaction { - signature, - respond_to, - } => { - let processor = self.processor.clone(); - tokio::task::spawn(async move { - let res = processor - .magicblock_rpc_client - .get_transaction(&signature, None) - .await - .map_err(Into::into); - if let Err(err) = respond_to.send(res) { - error!(message_type = "GetTransaction", error = ?err, "Failed to send response"); - } - }); - } - GetLookupTables { respond_to } => { - let active_tables = self.processor.active_lookup_tables().await; - let released_tables = - self.processor.released_lookup_tables().await; - if let Err(e) = respond_to.send(LookupTables { - active: active_tables, - released: released_tables, - }) { - error!(message_type = "GetLookupTables", error = ?e, "Failed to send response"); - } - } - SubscribeForResults { respond_to } => { - let subscription = self.processor.subscribe_for_results(); - if let Err(err) = respond_to.send(subscription) { - error!(message_type = "SubscribeForResults", error = ?err, "Failed to send response"); - } - } - FetchCurrentCommitNonces { - respond_to, - pubkeys, - min_context_slot, - } => { - let processor = self.processor.clone(); - tokio::spawn(async move { - let result = processor - .fetch_current_commit_nonces(&pubkeys, min_context_slot) - .await; - if let Err(err) = respond_to - .send(result.map_err(CommittorServiceError::from)) - { - error!(message_type = "FetchCurrentCommitNonces", error = ?err, "Failed to send response"); - } - }); - } - FetchCurrentCommitNoncesSync { - respond_to, - pubkeys, - min_context_slot, - } => { - let processor = self.processor.clone(); - tokio::spawn(async move { - let result = processor - .fetch_current_commit_nonces(&pubkeys, min_context_slot) - .await; - if let Err(err) = respond_to - .send(result.map_err(CommittorServiceError::from)) - { - error!(message_type = "FetchCurrentCommitNoncesSync", error = ?err, "Failed to send response"); - } - }); - } - } + slot_interval, + cancellation_token, + )) } - #[instrument(skip(self, cancel_token))] - pub async fn run(&mut self, cancel_token: CancellationToken) { - loop { - select! { - msg = self.receiver.recv() => { - if let Some(msg) = msg { - self.handle_msg(msg).await; - } else { - break; - } - } - _ = cancel_token.cancelled() => { - break; - } - } - } - - info!("Actor shutdown"); + fn take(&mut self) -> Self { + mem::replace(self, Self::Error) } -} -// ----------------- -// CommittorService -// ----------------- -pub struct CommittorService { - sender: mpsc::Sender, - cancel_token: CancellationToken, -} + pub fn start(&mut self) -> Result<(), IntentExecutionServiceError> { + let Self::Created(service) = self.take() else { + return Err(IntentExecutionServiceError::InvalidState( + "service must be in Created state to start".into(), + )); + }; -impl CommittorService { - pub fn try_start( - authority: Keypair, - persist_file: P, - chain_config: ChainConfig, - actions_callback_executor: A, - ) -> CommittorServiceResult - where - P: AsRef, - A: ActionsCallbackScheduler, - { - debug!("Starting committor service"); - let (sender, receiver) = mpsc::channel(1_000); - let cancel_token = CancellationToken::new(); - { - let cancel_token = cancel_token.clone(); - let mut actor = CommittorActor::try_new( - receiver, - authority, - persist_file, - chain_config, - actions_callback_executor, - )?; - tokio::spawn(async move { - actor.run(cancel_token).await; - }); - } - Ok(Self { - sender, - cancel_token, - }) + let handle = service.start(); + *self = Self::Started(handle); + Ok(()) } - pub fn reserve_common_pubkeys( - &self, - ) -> oneshot::Receiver> { - let (tx, rx) = oneshot::channel(); - self.try_send(CommittorMessage::ReserveCommonPubkeys { - respond_to: tx, - }); - rx - } + pub async fn stop(&mut self) -> Result<(), IntentExecutionServiceError> { + let Self::Started(handle) = self.take() else { + return Err(IntentExecutionServiceError::InvalidState( + "service must be in Started state to stop".into(), + )); + }; - pub fn release_common_pubkeys(&self) -> oneshot::Receiver<()> { - let (tx, rx) = oneshot::channel(); - self.try_send(CommittorMessage::ReleaseCommonPubkeys { - respond_to: tx, - }); - rx + handle.await?; + *self = Self::Stopped; + Ok(()) } +} - pub fn get_commit_signatures( - &self, - commit_id: u64, - pubkey: Pubkey, - ) -> oneshot::Receiver>> - { - let (tx, rx) = oneshot::channel(); - self.try_send(CommittorMessage::GetCommitSignatures { - respond_to: tx, - commit_id, - pubkey, - }); - rx - } +pub struct ServiceInner { + /// Chainlink for notifying of undelegations + chainlink: Arc, + /// ER client specific for Intent needs + intent_rpc_client: Arc, + /// Processor of accepted intents + processor: Arc, + /// Time interval to scrape MagicContext(ER slot interval) + // TODO(edwin): can be removed if LatestBlocK moved into magicblock-core + slot_interval: Duration, + cancellation_token: CancellationToken, + /// Meta for ongoing executing intents + intents_meta_map: Arc>>, +} - pub fn get_lookup_tables(&self) -> oneshot::Receiver { - let (tx, rx) = oneshot::channel(); - self.try_send(CommittorMessage::GetLookupTables { respond_to: tx }); - rx +impl ServiceInner +where + R: ERIntentClient, + R::Error: Into, +{ + pub fn new( + chainlink: Arc, + intent_rpc_client: R, + processor: Arc, + slot_interval: Duration, + cancellation_token: CancellationToken, + ) -> Self { + Self { + chainlink, + intent_rpc_client: Arc::new(intent_rpc_client), + processor, + slot_interval, + cancellation_token, + intents_meta_map: Arc::new(Mutex::default()), + } } - pub fn fetch_current_commit_nonces_sync( - &self, - pubkeys: &[Pubkey], - min_context_slot: u64, - ) -> std::sync::mpsc::Receiver>> - { - let (tx, rx) = std::sync::mpsc::channel(); - self.try_send(CommittorMessage::FetchCurrentCommitNoncesSync { - respond_to: tx, - pubkeys: pubkeys.to_vec(), - min_context_slot, - }); - rx + pub fn start(self) -> JoinHandle<()> { + let result_subscriber = self.processor.subscribe_for_results(); + let cancellation_token = self.cancellation_token.clone(); + tokio::spawn(Self::result_processor( + result_subscriber, + cancellation_token, + self.intents_meta_map.clone(), + self.intent_rpc_client.clone(), + )); + + tokio::task::spawn(self.accept_worker()) } - fn try_send(&self, msg: CommittorMessage) { - if let Err(e) = self.sender.try_send(msg) { - match e { - TrySendError::Full(msg) => error!( - "Channel full, failed to send commit message {:?}", - msg - ), - TrySendError::Closed(msg) => error!( - "Channel closed, failed to send commit message {:?}", - msg - ), + async fn accept_worker(self) { + let mut interval = tokio::time::interval(self.slot_interval); + loop { + tokio::select! { + biased; + _ = self.cancellation_token.cancelled() => { + break; + } + _ = interval.tick() => { + let accept_result = self + .intent_rpc_client + .accept_scheduled_intents() + .await; + let intent_bundles = match accept_result { + Ok(value) => value, + Err(err) => { + error!("Failed to accept intents: {}", err); + continue; + } + }; + + if let Err(err) = self.schedule_intent_execution(intent_bundles).await { + error!("Failed to schedule intent execution: {}", err); + } + } } } } -} - -impl BaseIntentCommittor for CommittorService { - fn reserve_pubkeys_for_committee( - &self, - committee: Pubkey, - owner: Pubkey, - ) -> oneshot::Receiver> { - let (tx, rx) = oneshot::channel(); - self.try_send(CommittorMessage::ReservePubkeysForCommittee { - initiated: Instant::now(), - respond_to: tx, - committee, - owner, - }); - rx - } - fn schedule_intent_bundles( + async fn schedule_intent_execution( &self, intent_bundles: Vec, - ) -> oneshot::Receiver> { - let (tx, rx) = oneshot::channel(); - self.try_send(CommittorMessage::ScheduleIntentBundle { - intent_bundles, - respond_to: tx, - }); - rx - } + ) -> Result<(), CommittorServiceError> { + if intent_bundles.is_empty() { + return Ok(()); + } - fn get_commit_statuses( - &self, - message_id: u64, - ) -> oneshot::Receiver>> { - let (tx, rx) = oneshot::channel(); - self.try_send(CommittorMessage::GetCommitStatuses { - respond_to: tx, - message_id, - }); - rx - } + metrics::inc_committor_intents_count_by(intent_bundles.len() as u64); - fn get_commit_signatures( - &self, - commit_id: u64, - pubkey: Pubkey, - ) -> oneshot::Receiver>> - { - let (tx, rx) = oneshot::channel(); - self.try_send(CommittorMessage::GetCommitSignatures { - respond_to: tx, - commit_id, - pubkey, - }); - rx - } + // Add metas for intent we schedule + let pubkeys_being_undelegated = { + let mut intent_metas = + self.intents_meta_map.lock().expect(POISONED_MUTEX_MSG); + let mut pubkeys_being_undelegated = HashSet::::new(); - fn subscribe_for_results( - &self, - ) -> oneshot::Receiver> - { - let (tx, rx) = oneshot::channel(); - self.try_send(CommittorMessage::SubscribeForResults { respond_to: tx }); - rx + intent_bundles.iter().for_each(|intent| { + intent_metas + .insert(intent.id, ScheduledBaseIntentMeta::new(intent)); + if let Some(undelegate) = intent.get_undelegate_intent_pubkeys() + { + pubkeys_being_undelegated.extend(undelegate); + } + }); + + pubkeys_being_undelegated.into_iter().collect::>() + }; + + self.process_undelegation_requests(pubkeys_being_undelegated) + .await; + self.processor + .schedule_intent_bundles(intent_bundles) + .await?; + Ok(()) } - fn get_transaction( - &self, - signature: &Signature, - ) -> oneshot::Receiver< - CommittorServiceResult, - > { - let (tx, rx) = oneshot::channel(); - self.try_send(CommittorMessage::GetTransaction { - respond_to: tx, - signature: *signature, - }); - - rx + async fn process_undelegation_requests(&self, pubkeys: Vec) { + let mut join_set = task::JoinSet::new(); + for pubkey in pubkeys.into_iter() { + let chainlink = self.chainlink.clone(); + join_set.spawn(async move { + (pubkey, chainlink.undelegation_requested(pubkey).await) + }); + } + let sub_errors = join_set + .join_all() + .await + .into_iter() + .filter_map(|(pubkey, inner_result)| { + if let Err(err) = inner_result { + Some(format!( + "Subscribing to account {} failed: {}", + pubkey, err + )) + } else { + None + } + }) + .collect::>(); + if !sub_errors.is_empty() { + // Instead of aborting the entire commit we log an error here, however + // this means that the undelegated accounts stay in a problematic state + // in the validator and are not synced from chain. + // We could implement a retry mechanism inside of chainlink in the future. + error!( + error_count = sub_errors.len(), + "Failed to subscribe to accounts being undelegated" + ); + } } - fn fetch_current_commit_nonces( - &self, - pubkeys: &[Pubkey], - min_context_slot: u64, - ) -> oneshot::Receiver>> { - let (tx, rx) = oneshot::channel(); - self.try_send(CommittorMessage::FetchCurrentCommitNonces { - respond_to: tx, - pubkeys: pubkeys.to_vec(), - min_context_slot, - }); - - rx + #[instrument(skip( + result_subscription, + cancellation_token, + intents_meta_map, + intent_client + ))] + async fn result_processor( + mut result_subscription: broadcast::Receiver< + BroadcastedIntentExecutionResult, + >, + cancellation_token: CancellationToken, + intents_meta_map: Arc>>, + intent_client: Arc, + ) { + loop { + let execution_result = tokio::select! { + biased; + _ = cancellation_token.cancelled() => { + info!("Shutting down"); + return; + } + execution_result = result_subscription.recv() => { + match execution_result { + Ok(result) => result, + Err(broadcast::error::RecvError::Closed) => { + info!("Intent execution service shut down"); + break; + } + Err(broadcast::error::RecvError::Lagged(skipped)) => { + // SAFETY: This shouldn't happen as our tx execution is faster than Intent execution on Base layer + // If this ever happens it requires investigation + error!(skipped_count = skipped, "Lagging behind intent execution"); + continue; + } + } + } + }; + + if let Err(err) = ServiceInner::::process_execution_result( + &intent_client, + execution_result, + &intents_meta_map, + ) + .await + { + error!(error = ?err, "Failed process intent execution results"); + } + } } - fn stop(&self) { - self.cancel_token.cancel(); + async fn process_execution_result( + intent_client: &Arc, + execution_result: BroadcastedIntentExecutionResult, + intents_meta_map: &Arc>>, + ) -> Result<(), IntentExecutionServiceError> { + // Create IntentMeta + let intent_id = execution_result.id; + // Remove intent from metas + let Some(mut intent_meta) = intents_meta_map + .lock() + .expect(POISONED_MUTEX_MSG) + .remove(&intent_id) + else { + // Possible if we have duplicate Intents + // First one will remove id from map and second could fail. + // This should not happen and needs investigation! + error!(intent_id, "Failed to find intent metadata"); + return Ok(()); + }; + + let sent_transaction = + mem::take(&mut intent_meta.intent_sent_transaction); + let sent_commit = ServiceInner::::build_sent_commit( + intent_id, + intent_meta, + execution_result, + ); + intent_client + .notify_commit_sent(sent_transaction, sent_commit) + .await + .map_err(Into::into)?; + + Ok(()) } - fn stopped(&self) -> WaitForCancellationFutureOwned { - self.cancel_token.clone().cancelled_owned() + fn build_sent_commit( + intent_id: u64, + intent_meta: ScheduledBaseIntentMeta, + result: BroadcastedIntentExecutionResult, + ) -> SentCommit { + let error_message = + result.as_ref().err().map(|err| format!("{:?}", err)); + let chain_signatures = match result.inner { + Ok(value) => match value { + ExecutionOutput::SingleStage(signature) => vec![signature], + ExecutionOutput::TwoStage { + commit_signature, + finalize_signature, + } => vec![commit_signature, finalize_signature], + }, + Err(err) => { + error!( + "Failed to commit intent: {}, slot: {}, blockhash: {}. {:?}", + intent_id, intent_meta.slot, intent_meta.blockhash, err + ); + err.signatures() + .map(|(commit, finalize)| { + finalize + .map(|finalize| vec![commit, finalize]) + .unwrap_or(vec![commit]) + }) + .unwrap_or_default() + } + }; + let patched_errors = result + .patched_errors + .iter() + .map(|err| { + info!("Patched intent: {}. error was: {}", intent_id, err); + err.to_string() + }) + .collect(); + + let callbacks_report = result + .callbacks_report + .iter() + .map(|r| match r { + Ok(sig) => { + format!("OK: {sig}") + } + Err(err) => { + error!( + "Callback failed to schedule: {}. error: {}", + intent_id, err + ); + format!("ERR: {err}") + } + }) + .collect(); + + SentCommit { + message_id: intent_id, + slot: intent_meta.slot, + blockhash: intent_meta.blockhash, + payer: intent_meta.payer, + chain_signatures, + included_pubkeys: intent_meta.included_pubkeys, + excluded_pubkeys: vec![], + requested_undelegation: intent_meta.requested_undelegation, + error_message, + patched_errors, + callbacks_scheduling_results: callbacks_report, + } } } -pub trait BaseIntentCommittor: Send + Sync + 'static { - /// Reserves pubkeys used in most commits in a lookup table - fn reserve_pubkeys_for_committee( - &self, - committee: Pubkey, - owner: Pubkey, - ) -> oneshot::Receiver>; - - /// Commits the changeset and returns - fn schedule_intent_bundles( - &self, - intent_bundles: Vec, - ) -> oneshot::Receiver>; - - /// Subscribes for results of BaseIntent execution - fn subscribe_for_results( - &self, - ) -> oneshot::Receiver>; - - /// Gets statuses of accounts that were committed as part of a request with provided message_id - fn get_commit_statuses( - &self, - message_id: u64, - ) -> oneshot::Receiver>>; - - /// Gets signatures for commit of particular accounts - fn get_commit_signatures( - &self, - commit_id: u64, - pubkey: Pubkey, - ) -> oneshot::Receiver>>; - - fn get_transaction( - &self, - signature: &Signature, - ) -> oneshot::Receiver< - CommittorServiceResult, - >; - - fn fetch_current_commit_nonces( - &self, - pubkeys: &[Pubkey], - min_context_slot: u64, - ) -> oneshot::Receiver>>; +struct ScheduledBaseIntentMeta { + slot: u64, + blockhash: Hash, + payer: Pubkey, + included_pubkeys: Vec, + intent_sent_transaction: Transaction, + requested_undelegation: bool, +} - /// Stops Committor service - fn stop(&self); +impl ScheduledBaseIntentMeta { + fn new(intent: &ScheduledIntentBundle) -> Self { + Self { + slot: intent.slot, + blockhash: intent.blockhash, + payer: intent.payer, + included_pubkeys: intent.get_all_committed_pubkeys(), + intent_sent_transaction: intent.sent_transaction.clone(), + requested_undelegation: intent.has_undelegate_intent(), + } + } +} - /// Returns future which resolves once committor `stop` got called - fn stopped(&self) -> WaitForCancellationFutureOwned; +#[derive(thiserror::Error, Debug)] +pub enum IntentExecutionServiceError { + #[error("Invalid state: {0}")] + InvalidState(String), + #[error("JoinError: {0}")] + JoinError(#[from] JoinError), + #[error("IntentRpcClientError: {0}")] + IntentRpcClientError(#[from] InternalIntentClientError), } diff --git a/magicblock-committor-service/src/service/intent_client.rs b/magicblock-committor-service/src/service/intent_client.rs new file mode 100644 index 000000000..65aaa2ca3 --- /dev/null +++ b/magicblock-committor-service/src/service/intent_client.rs @@ -0,0 +1,127 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use magicblock_accounts_db::{traits::AccountsBank, AccountsDb}; +use magicblock_core::{ + link::transactions::{with_encoded, TransactionSchedulerHandle}, + traits::LatestBlockProvider, +}; +use magicblock_program::{ + instruction_utils::InstructionUtils, + magic_scheduled_base_intent::ScheduledIntentBundle, + register_scheduled_commit_sent, MagicContext, SentCommit, + TransactionScheduler, MAGIC_CONTEXT_PUBKEY, +}; +use solana_account::ReadableAccount; +use solana_transaction::Transaction; +use solana_transaction_error::TransactionError; +use tracing::{debug, error}; + +#[async_trait] +pub trait ERIntentClient: Send + Sync + 'static { + type Error: std::error::Error + Send; + + /// Executes `Accept` tx and returns accepted intents + async fn accept_scheduled_intents( + &self, + ) -> Result, Self::Error>; + + /// Processes intent results, submitting them on chain(ER) + async fn notify_commit_sent( + &self, + sent_tx: Transaction, + sent_commit: SentCommit, + ) -> Result<(), Self::Error>; +} + +pub struct InternalIntentRpcClient { + /// Provides access to MagicContext + accounts_db: Arc, + /// Internal endpoint for scheduling ER TXs + transaction_scheduler: TransactionSchedulerHandle, + /// Provides access to ER latest block for TX creation + latest_block_provider: L, +} + +impl InternalIntentRpcClient { + pub fn new( + accounts_db: Arc, + transaction_scheduler: TransactionSchedulerHandle, + latest_block_provider: L, + ) -> Self { + Self { + accounts_db, + transaction_scheduler, + latest_block_provider, + } + } + + /// Sends transaction to move the scheduled commits from the `MagicContext` + /// to the global ScheduledCommit store + async fn send_accept_tx(&self) -> Result<(), InternalIntentClientError> { + let tx = InstructionUtils::accept_scheduled_commits( + self.latest_block_provider.blockhash(), + ); + let encoded_tx = with_encoded(tx).inspect_err(|err| { + error!(error = ?err, "Failed to bincode intent transaction"); + })?; + self.transaction_scheduler + .execute(encoded_tx) + .await + .inspect_err(|err| { + error!(error = ?err, "Failed to accept scheduled commits"); + })?; + + Ok(()) + } +} + +#[async_trait] +impl ERIntentClient for InternalIntentRpcClient { + type Error = InternalIntentClientError; + + async fn accept_scheduled_intents( + &self, + ) -> Result, Self::Error> { + // If accounts were scheduled to be committed, we accept them here + // and processs the commits + let magic_context_acc = + self.accounts_db.get_account(&MAGIC_CONTEXT_PUBKEY).expect( + "Validator found to be running without MagicContext account!", + ); + if !MagicContext::has_scheduled_commits(magic_context_acc.data()) { + return Ok(vec![]); + } + self.send_accept_tx().await?; + + // Return intents from global store + Ok(TransactionScheduler::default().take_scheduled_intent_bundles()) + } + + async fn notify_commit_sent( + &self, + sent_tx: Transaction, + sent_commit: SentCommit, + ) -> Result<(), Self::Error> { + register_scheduled_commit_sent(sent_commit); + let txn = with_encoded(sent_tx).inspect_err(|err| { + // Unreachable case, all intent transactions are smaller than 64KB by construction + error!(error = ?err, "Failed to bincode intent transaction"); + })?; + self.transaction_scheduler + .execute(txn) + .await + .inspect(|_| debug!("Sent commit signaled")) + .inspect_err( + |err| error!(error = ?err, "Failed to signal sent commit"), + )?; + + Ok(()) + } +} + +#[derive(thiserror::Error, Debug)] +pub enum InternalIntentClientError { + #[error("TransactionError: {0}")] + TransactionError(#[from] TransactionError), +} diff --git a/magicblock-committor-service/src/service_ext.rs b/magicblock-committor-service/src/service_ext.rs deleted file mode 100644 index 024d0423e..000000000 --- a/magicblock-committor-service/src/service_ext.rs +++ /dev/null @@ -1,253 +0,0 @@ -use std::{ - collections::{hash_map::Entry, HashMap}, - ops::Deref, - sync::{Arc, Mutex}, - time::Instant, -}; - -use async_trait::async_trait; -use futures_util::future::join_all; -use magicblock_program::magic_scheduled_base_intent::ScheduledIntentBundle; -use solana_pubkey::Pubkey; -use solana_signature::Signature; -use solana_transaction_status_client_types::EncodedConfirmedTransactionWithStatusMeta; -use tokio::sync::{broadcast, oneshot, oneshot::error::RecvError}; -use tokio_util::sync::WaitForCancellationFutureOwned; -use tracing::{error, info, instrument}; - -use crate::{ - error::{CommittorServiceError, CommittorServiceResult}, - intent_execution_manager::BroadcastedIntentExecutionResult, - persist::{CommitStatusRow, MessageSignatures}, - BaseIntentCommittor, -}; - -const POISONED_MUTEX_MSG: &str = - "CommittorServiceExt pending messages mutex poisoned!"; - -#[async_trait] -pub trait BaseIntentCommittorExt: BaseIntentCommittor { - /// Schedules Base Intents and waits for their results - async fn schedule_intent_bundles_waiting( - &self, - intent_bundles: Vec, - ) -> BaseIntentCommitorExtResult>; -} - -type MessageResultListener = oneshot::Sender; -pub struct CommittorServiceExt { - inner: Arc, - pending_messages: Arc>>, -} - -impl CommittorServiceExt { - pub fn new(inner: Arc) -> Self { - let pending_messages = Arc::new(Mutex::new(HashMap::new())); - let results_subscription = inner.subscribe_for_results(); - let committor_stopped = inner.stopped(); - tokio::spawn(Self::dispatcher( - committor_stopped, - results_subscription, - pending_messages.clone(), - )); - - Self { - inner, - pending_messages, - } - } - - #[instrument(skip( - committor_stopped, - pending_message, - results_subscription - ))] - async fn dispatcher( - committor_stopped: WaitForCancellationFutureOwned, - results_subscription: oneshot::Receiver< - broadcast::Receiver, - >, - pending_message: Arc>>, - ) { - let mut results_subscription = results_subscription.await.unwrap(); - - tokio::pin!(committor_stopped); - loop { - let execution_result = tokio::select! { - biased; - _ = &mut committor_stopped => { - info!("Shutting down extension"); - return; - } - execution_result = results_subscription.recv() => { - match execution_result { - Ok(result) => result, - Err(broadcast::error::RecvError::Closed) => { - info!("Intent execution shutdown"); - break; - } - Err(broadcast::error::RecvError::Lagged(skipped)) => { - // SAFETY: not really feasible to happen as this function is way faster than Intent execution - // requires investigation if ever happens! - error!(skipped, "Dispatcher lag detected"); - continue; - } - } - } - }; - - let sender = if let Some(sender) = pending_message - .lock() - .expect(POISONED_MUTEX_MSG) - .remove(&execution_result.id) - { - sender - } else { - continue; - }; - - if let Err(execution_result) = sender.send(execution_result) { - error!( - intent_id = execution_result.id, - "Failed to send execution result" - ); - } - } - } -} - -#[async_trait] -impl BaseIntentCommittorExt - for CommittorServiceExt -{ - async fn schedule_intent_bundles_waiting( - &self, - base_intents: Vec, - ) -> BaseIntentCommitorExtResult> - { - // Critical section - let receivers = { - let mut pending_messages = - self.pending_messages.lock().expect(POISONED_MUTEX_MSG); - - base_intents - .iter() - .map(|intent| { - let (sender, receiver) = oneshot::channel(); - match pending_messages.entry(intent.id) { - Entry::Vacant(vacant) => { - vacant.insert(sender); - Ok(receiver) - } - Entry::Occupied(_) => Err( - CommittorServiceExtError::RepeatingMessageError( - intent.id, - ), - ), - } - }) - .collect::, _>>()? - }; - - self.schedule_intent_bundles(base_intents).await??; - let results = join_all(receivers.into_iter()) - .await - .into_iter() - .collect::, RecvError>>()?; - - Ok(results) - } -} - -impl BaseIntentCommittor for CommittorServiceExt { - fn reserve_pubkeys_for_committee( - &self, - committee: Pubkey, - owner: Pubkey, - ) -> oneshot::Receiver> { - self.inner.reserve_pubkeys_for_committee(committee, owner) - } - - fn schedule_intent_bundles( - &self, - intent_bundles: Vec, - ) -> oneshot::Receiver> { - self.inner.schedule_intent_bundles(intent_bundles) - } - - fn subscribe_for_results( - &self, - ) -> oneshot::Receiver> - { - self.inner.subscribe_for_results() - } - - fn get_commit_statuses( - &self, - message_id: u64, - ) -> oneshot::Receiver>> { - self.inner.get_commit_statuses(message_id) - } - - fn get_commit_signatures( - &self, - commit_id: u64, - pubkey: Pubkey, - ) -> oneshot::Receiver>> - { - self.inner.get_commit_signatures(commit_id, pubkey) - } - - fn get_transaction( - &self, - signature: &Signature, - ) -> oneshot::Receiver< - CommittorServiceResult, - > { - self.inner.get_transaction(signature) - } - - fn fetch_current_commit_nonces( - &self, - pubkeys: &[Pubkey], - min_context_slot: u64, - ) -> oneshot::Receiver>> { - self.inner - .fetch_current_commit_nonces(pubkeys, min_context_slot) - } - - fn stop(&self) { - self.inner.stop(); - } - - fn stopped(&self) -> WaitForCancellationFutureOwned { - self.inner.stopped() - } -} - -impl Deref for CommittorServiceExt { - type Target = Arc; - - fn deref(&self) -> &Self::Target { - &self.inner - } -} - -#[derive(thiserror::Error, Debug)] -pub enum CommittorServiceExtError { - #[error("Attempt to schedule already scheduled message id: {0}")] - RepeatingMessageError(u64), - #[error("RecvError: {0}")] - RecvError(#[from] RecvError), - #[error("CommittorServiceError: {0:?}")] - CommittorServiceError(Box), -} - -impl From for CommittorServiceExtError { - fn from(e: CommittorServiceError) -> Self { - Self::CommittorServiceError(Box::new(e)) - } -} - -pub type BaseIntentCommitorExtResult = - Result; diff --git a/magicblock-committor-service/src/stubs/changeset_committor_stub.rs b/magicblock-committor-service/src/stubs/changeset_committor_stub.rs deleted file mode 100644 index 0b417db47..000000000 --- a/magicblock-committor-service/src/stubs/changeset_committor_stub.rs +++ /dev/null @@ -1,299 +0,0 @@ -use std::{ - collections::HashMap, - sync::{Arc, Mutex}, - time::{Instant, SystemTime, UNIX_EPOCH}, -}; - -use async_trait::async_trait; -use magicblock_program::magic_scheduled_base_intent::{ - CommitType, ScheduledIntentBundle, UndelegateType, -}; -use solana_account::Account; -use solana_pubkey::Pubkey; -use solana_signature::Signature; -use solana_transaction_status_client_types::{ - EncodedConfirmedTransactionWithStatusMeta, EncodedTransaction, - EncodedTransactionWithStatusMeta, -}; -use tokio::sync::{broadcast, oneshot}; -use tokio_util::sync::{CancellationToken, WaitForCancellationFutureOwned}; - -use crate::{ - error::CommittorServiceResult, - intent_execution_manager::BroadcastedIntentExecutionResult, - intent_executor::ExecutionOutput, - persist::{CommitStatusRow, IntentPersisterImpl, MessageSignatures}, - service_ext::{BaseIntentCommitorExtResult, BaseIntentCommittorExt}, - BaseIntentCommittor, -}; - -#[derive(Default)] -pub struct ChangesetCommittorStub { - cancellation_token: CancellationToken, - reserved_pubkeys_for_committee: Arc>>, - #[allow(clippy::type_complexity)] - committed_changesets: Arc>>, - committed_accounts: Arc>>, -} - -impl ChangesetCommittorStub { - #[allow(clippy::len_without_is_empty)] - pub fn len(&self) -> usize { - self.committed_changesets.lock().unwrap().len() - } - - pub fn committed(&self, pubkey: &Pubkey) -> Option { - self.committed_accounts.lock().unwrap().get(pubkey).cloned() - } -} - -impl BaseIntentCommittor for ChangesetCommittorStub { - fn reserve_pubkeys_for_committee( - &self, - committee: Pubkey, - owner: Pubkey, - ) -> oneshot::Receiver> { - let initiated = Instant::now(); - let (tx, rx) = oneshot::channel::>(); - self.reserved_pubkeys_for_committee - .lock() - .unwrap() - .insert(committee, owner); - - tx.send(Ok(initiated)).unwrap_or_else(|_| { - tracing::error!( - message_type = "ReservePubkeysForCommittee", - "Failed to send response" - ); - }); - rx - } - - fn schedule_intent_bundles( - &self, - intent_bundles: Vec, - ) -> oneshot::Receiver> { - let (sender, receiver) = oneshot::channel(); - let _ = sender.send(Ok(())); - - { - let mut committed_accounts = - self.committed_accounts.lock().unwrap(); - intent_bundles.iter().for_each(|intent| { - intent - .get_all_committed_accounts() - .iter() - .for_each(|account| { - committed_accounts - .insert(account.pubkey, account.account.clone()); - }) - }) - } - - { - let mut changesets = self.committed_changesets.lock().unwrap(); - intent_bundles.into_iter().for_each(|intent| { - changesets.insert(intent.id, intent); - }); - } - - receiver - } - - fn subscribe_for_results( - &self, - ) -> oneshot::Receiver> - { - let (_, receiver) = oneshot::channel(); - receiver - } - - fn get_commit_statuses( - &self, - message_id: u64, - ) -> oneshot::Receiver>> { - let (tx, rx) = oneshot::channel(); - - let commit = self - .committed_changesets - .lock() - .unwrap() - .remove(&message_id); - let Some(base_intent) = commit else { - tx.send(Ok(vec![])).unwrap_or_else(|_| { - tracing::error!( - message_type = "GetCommitStatuses", - "Failed to send response" - ); - }); - return rx; - }; - - let status_rows = IntentPersisterImpl::create_commit_rows(&base_intent); - tx.send(Ok(status_rows)).unwrap_or_else(|_| { - tracing::error!( - message_type = "GetCommitStatuses", - "Failed to send response" - ); - }); - - rx - } - - fn get_commit_signatures( - &self, - _commit_id: u64, - _pubkey: Pubkey, - ) -> oneshot::Receiver>> - { - let (tx, rx) = oneshot::channel(); - let message_signature = MessageSignatures { - commit_stage_signature: Signature::new_unique(), - finalize_stage_signature: Some(Signature::new_unique()), - created_at: now(), - }; - - tx.send(Ok(Some(message_signature))).unwrap_or_else(|_| { - tracing::error!( - message_type = "GetCommitSignatures", - "Failed to send response" - ); - }); - - rx - } - - fn get_transaction( - &self, - _: &Signature, - ) -> oneshot::Receiver< - CommittorServiceResult, - > { - let (tx, rx) = oneshot::channel(); - if let Err(_err) = - tx.send(Ok(EncodedConfirmedTransactionWithStatusMeta { - slot: 0, - transaction: EncodedTransactionWithStatusMeta { - transaction: EncodedTransaction::LegacyBinary( - "".to_string(), - ), - meta: None, - version: None, - }, - block_time: None, - })) - { - tracing::error!( - message_type = "GetTransaction", - "Failed to send response" - ); - }; - - rx - } - - fn fetch_current_commit_nonces( - &self, - pubkeys: &[Pubkey], - _min_context_slot: u64, - ) -> oneshot::Receiver>> { - let (tx, rx) = oneshot::channel(); - let nonces = pubkeys.iter().map(|p| (*p, 0u64)).collect(); - tx.send(Ok(nonces)).unwrap_or_else(|_| { - tracing::error!( - message_type = "FetchCurrentCommitNonces", - "Failed to send response" - ); - }); - rx - } - - fn stop(&self) { - self.cancellation_token.cancel(); - } - - fn stopped(&self) -> WaitForCancellationFutureOwned { - self.cancellation_token.clone().cancelled_owned() - } -} - -#[async_trait] -impl BaseIntentCommittorExt for ChangesetCommittorStub { - async fn schedule_intent_bundles_waiting( - &self, - intent_bundles: Vec, - ) -> BaseIntentCommitorExtResult> - { - self.schedule_intent_bundles(intent_bundles.clone()) - .await??; - let res = intent_bundles - .into_iter() - .map(|message| { - let callbacks_report = (0..count_bundle_callbacks(&message)) - .map(|_| Ok(Signature::new_unique())) - .collect(); - BroadcastedIntentExecutionResult { - id: message.id, - inner: Ok(ExecutionOutput::TwoStage { - commit_signature: Signature::new_unique(), - finalize_signature: Signature::new_unique(), - }), - patched_errors: Arc::new(vec![]), - callbacks_report, - } - }) - .collect::>(); - - Ok(res) - } -} - -fn count_bundle_callbacks(bundle: &ScheduledIntentBundle) -> usize { - let ib = &bundle.intent_bundle; - - let from_commit = ib - .commit - .as_ref() - .map(|ct| match ct { - CommitType::Standalone(_) => 0, - CommitType::WithBaseActions { base_actions, .. } => { - base_actions.iter().filter(|a| a.callback.is_some()).count() - } - }) - .unwrap_or(0); - - let from_cau = ib - .commit_and_undelegate - .as_ref() - .map(|cau| { - let from_commit_action = match &cau.commit_action { - CommitType::Standalone(_) => 0, - CommitType::WithBaseActions { base_actions, .. } => { - base_actions.iter().filter(|a| a.callback.is_some()).count() - } - }; - let from_undelegate = match &cau.undelegate_action { - UndelegateType::Standalone => 0, - UndelegateType::WithBaseActions(actions) => { - actions.iter().filter(|a| a.callback.is_some()).count() - } - }; - from_commit_action + from_undelegate - }) - .unwrap_or(0); - - let from_standalone = ib - .standalone_actions - .iter() - .filter(|a| a.callback.is_some()) - .count(); - - from_commit + from_cau + from_standalone -} - -fn now() -> u64 { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("Time went backwards") - .as_secs() -} diff --git a/magicblock-committor-service/src/stubs/mod.rs b/magicblock-committor-service/src/stubs/mod.rs deleted file mode 100644 index 9cfb6e45c..000000000 --- a/magicblock-committor-service/src/stubs/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -mod changeset_committor_stub; -pub use changeset_committor_stub::ChangesetCommittorStub; diff --git a/magicblock-committor-service/src/transactions.rs b/magicblock-committor-service/src/transactions.rs index 84bf3180c..e24f8504c 100644 --- a/magicblock-committor-service/src/transactions.rs +++ b/magicblock-committor-service/src/transactions.rs @@ -1,676 +1,12 @@ use base64::{prelude::BASE64_STANDARD, Engine}; use solana_rpc_client::rpc_client::SerializableTransaction; -use static_assertions::const_assert; /// From agave rpc/src/rpc.rs [MAX_BASE64_SIZE] pub(crate) const MAX_ENCODED_TRANSACTION_SIZE: usize = 1644; -/// How many process and commit buffer instructions fit into a single transaction -#[allow(unused)] // serves as documentation as well -pub const MAX_PROCESS_PER_TX: u8 = 3; - -/// How many process and commit buffer instructions fit into a single transaction -/// when using address lookup tables but not including the buffer account in the -/// lookup table -#[allow(unused)] // serves as documentation as well -pub const MAX_PROCESS_PER_TX_USING_LOOKUP: u8 = 12; - -/// How many close buffer instructions fit into a single transaction -#[allow(unused)] // serves as documentation as well -pub const MAX_CLOSE_PER_TX: u8 = 8; - -/// How many close buffer instructions fit into a single transaction -/// when using address lookup tables but not including the buffer account -/// nor chunk account in the lookup table -#[allow(unused)] // serves as documentation as well -pub const MAX_CLOSE_PER_TX_USING_LOOKUP: u8 = 8; - -/// How many process and commit buffer instructions combined with close buffer instructions -/// fit into a single transaction -#[allow(unused)] // serves as documentation as well -pub const MAX_PROCESS_AND_CLOSE_PER_TX: u8 = 2; - -/// How many process and commit buffer instructions combined with -/// close buffer instructions fit into a single transaction when -/// using lookup tables but not including the buffer account -#[allow(unused)] // serves as documentation as well -pub const MAX_PROCESS_AND_CLOSE_PER_TX_USING_LOOKUP: u8 = 5; - -/// How many finalize instructions fit into a single transaction -#[allow(unused)] // serves as documentation as well -pub const MAX_FINALIZE_PER_TX: u8 = 5; - -/// How many finalize instructions fit into a single transaction -/// when using address lookup tables -#[allow(unused)] // serves as documentation as well -pub const MAX_FINALIZE_PER_TX_USING_LOOKUP: u8 = 48; - -/// How many undelegate instructions fit into a single transaction -/// NOTE: that we assume the rent reimbursement account to be the delegated account -#[allow(unused)] // serves as documentation as well -pub const MAX_UNDELEGATE_PER_TX: u8 = 3; - -/// How many undelegate instructions fit into a single transaction -/// when using address lookup tables -/// NOTE: that we assume the rent reimbursement account to be the delegated account -#[allow(unused)] // serves as documentation as well -pub const MAX_UNDELEGATE_PER_TX_USING_LOOKUP: u8 = 16; - -// Allows us to run undelegate instructions without rechunking them since we know -// that we didn't process more than we also can undelegate -const_assert!(MAX_PROCESS_PER_TX <= MAX_UNDELEGATE_PER_TX,); - -// Allows us to run undelegate instructions using lookup tables without rechunking -// them since we know that we didn't process more than we also can undelegate -const_assert!( - MAX_PROCESS_PER_TX_USING_LOOKUP <= MAX_UNDELEGATE_PER_TX_USING_LOOKUP -); - pub fn serialize_and_encode_base64( transaction: &impl SerializableTransaction, ) -> String { - // SAFETY: runs statically let serialized = bincode::serialize(transaction).unwrap(); BASE64_STANDARD.encode(serialized) } - -#[cfg(test)] -mod test { - use std::collections::HashSet; - - use dlp_api::args::{CommitStateArgs, CommitStateFromBufferArgs}; - use lazy_static::lazy_static; - use magicblock_committor_program::instruction_builder::close_buffer::{ - create_close_ix, CreateCloseIxArgs, - }; - use solana_address_lookup_table_interface::state::LOOKUP_TABLE_MAX_ADDRESSES; - use solana_hash::Hash; - use solana_instruction::Instruction; - use solana_keypair::Keypair; - use solana_message::{ - v0::Message, AddressLookupTableAccount, VersionedMessage, - }; - use solana_pubkey::Pubkey; - use solana_signer::Signer; - use solana_transaction::versioned::VersionedTransaction; - use tracing::info; - - use super::*; - use crate::{ - compute_budget::{Budget, ComputeBudget}, - error::{ - CommittorServiceError::{ - FailedToCompileTransactionMessage, FailedToCreateTransaction, - }, - CommittorServiceResult, - }, - pubkeys_provider::{provide_committee_pubkeys, provide_common_pubkeys}, - test_utils, - }; - - fn get_lookup_tables( - ixs: &[Instruction], - ) -> Vec { - let pubkeys = ixs - .iter() - .flat_map(|ix| ix.accounts.iter().map(|acc| acc.pubkey)) - .collect::>(); - - let lookup_table = AddressLookupTableAccount { - key: Pubkey::default(), - addresses: pubkeys.into_iter().collect(), - }; - vec![lookup_table] - } - - // ----------------- - // Helpers - // ----------------- - #[allow(clippy::enum_variant_names)] - enum TransactionOpts { - NoLookupTable, - UseLookupTable, - } - fn encoded_tx_size( - auth: &Keypair, - ixs: &[Instruction], - opts: &TransactionOpts, - ) -> CommittorServiceResult { - use TransactionOpts::*; - let lookup_tables = match opts { - NoLookupTable => vec![], - UseLookupTable => get_lookup_tables(ixs), - }; - - let versioned_msg = Message::try_compile( - &auth.pubkey(), - ixs, - &lookup_tables, - Hash::default(), - ) - .map_err(|err| { - FailedToCompileTransactionMessage( - "Calculating transaction size".to_string(), - err, - ) - })?; - let versioned_tx = VersionedTransaction::try_new( - VersionedMessage::V0(versioned_msg), - &[&auth], - ) - .map_err(|err| { - FailedToCreateTransaction( - "Calculating transaction size".to_string(), - err, - ) - })?; - - let encoded = serialize_and_encode_base64(&versioned_tx); - Ok(encoded.len()) - } - - // ----------------- - // Process Commitables and Close Buffers - // ----------------- - pub(crate) fn process_commits_ix( - validator_auth: Pubkey, - pubkey: &Pubkey, - delegated_account_owner: &Pubkey, - buffer_pda: &Pubkey, - commit_args: CommitStateFromBufferArgs, - ) -> Instruction { - dlp_api::instruction_builder::commit_state_from_buffer( - validator_auth, - *pubkey, - *delegated_account_owner, - *buffer_pda, - commit_args, - ) - } - - pub(crate) fn close_buffers_ix( - validator_auth: Pubkey, - pubkey: &Pubkey, - commit_id: u64, - ) -> Instruction { - create_close_ix(CreateCloseIxArgs { - authority: validator_auth, - pubkey: *pubkey, - commit_id, - }) - } - - pub(crate) fn process_and_close_ixs( - validator_auth: Pubkey, - pubkey: &Pubkey, - delegated_account_owner: &Pubkey, - buffer_pda: &Pubkey, - commit_id: u64, - commit_args: CommitStateFromBufferArgs, - ) -> Vec { - let process_ix = process_commits_ix( - validator_auth, - pubkey, - delegated_account_owner, - buffer_pda, - commit_args, - ); - let close_ix = close_buffers_ix(validator_auth, pubkey, commit_id); - - vec![process_ix, close_ix] - } - - // ----------------- - // Finalize - // ----------------- - pub(crate) fn finalize_ix( - validator_auth: Pubkey, - pubkey: &Pubkey, - ) -> Instruction { - dlp_api::instruction_builder::finalize(validator_auth, *pubkey) - } - - // These tests statically determine the optimal ix count to fit into a single - // transaction and assert that the const we export in prod match those numbers. - // Thus when an instruction changes and one of those numbers with it a failing - // test alerts us. - // This is less overhead than running those static functions each time at - // startup. - - #[test] - fn test_max_process_per_tx() { - test_utils::init_test_logger(); - assert_eq!(super::MAX_PROCESS_PER_TX, *MAX_PROCESS_PER_TX); - assert_eq!( - super::MAX_PROCESS_PER_TX_USING_LOOKUP, - *MAX_PROCESS_PER_TX_USING_LOOKUP - ); - } - - #[test] - fn test_max_close_per_tx() { - test_utils::init_test_logger(); - assert_eq!(super::MAX_CLOSE_PER_TX, *MAX_CLOSE_PER_TX); - assert_eq!( - super::MAX_CLOSE_PER_TX_USING_LOOKUP, - *MAX_CLOSE_PER_TX_USING_LOOKUP - ); - } - - #[test] - fn test_max_process_and_closes_per_tx() { - test_utils::init_test_logger(); - assert_eq!( - super::MAX_PROCESS_AND_CLOSE_PER_TX, - *MAX_PROCESS_AND_CLOSE_PER_TX - ); - assert_eq!( - super::MAX_PROCESS_AND_CLOSE_PER_TX_USING_LOOKUP, - *MAX_PROCESS_AND_CLOSE_PER_TX_USING_LOOKUP - ); - } - - #[test] - fn test_max_finalize_per_tx() { - test_utils::init_test_logger(); - assert_eq!(super::MAX_FINALIZE_PER_TX, *MAX_FINALIZE_PER_TX); - assert_eq!( - super::MAX_FINALIZE_PER_TX_USING_LOOKUP, - *MAX_FINALIZE_PER_TX_USING_LOOKUP - ); - } - - #[test] - fn test_max_undelegate_per_tx() { - test_utils::init_test_logger(); - assert_eq!(super::MAX_UNDELEGATE_PER_TX, *MAX_UNDELEGATE_PER_TX); - assert_eq!( - super::MAX_UNDELEGATE_PER_TX_USING_LOOKUP, - *MAX_UNDELEGATE_PER_TX_USING_LOOKUP - ); - } - - // ----------------- - // Process Commitables using Args - // ----------------- - #[test] - fn test_log_commit_args_ix_sizes() { - test_utils::init_test_logger(); - // This test is used to investigate the size of the transaction related to - // the amount of committed accounts and their data size. - fn run(auth: &Keypair, ixs: usize) { - let mut tx_lines = vec![]; - use TransactionOpts::*; - for tx_opts in [NoLookupTable, UseLookupTable] { - let mut tx_sizes = vec![]; - for size in [0, 10, 20, 50, 100, 200, 500, 1024] { - let ixs = (0..ixs) - .map(|_| make_ix(auth, size)) - .collect::>(); - - let tx_size = - encoded_tx_size(auth, &ixs, &tx_opts).unwrap(); - tx_sizes.push((size, tx_size)); - } - tx_lines.push(tx_sizes); - } - let sizes = tx_lines - .into_iter() - .map(|line| { - line.into_iter() - .map(|(size, len)| format!("{:4}:{:5}", size, len)) - .collect::>() - .join("|") - }) - .collect::>() - .join("\n"); - info!(instruction_count = ixs, sizes = %sizes, "Transaction size report"); - } - fn make_ix(auth: &Keypair, data_size: usize) -> Instruction { - let data = vec![1; data_size]; - let args = CommitStateArgs { - data, - ..CommitStateArgs::default() - }; - dlp_api::instruction_builder::commit_state( - auth.pubkey(), - Pubkey::new_unique(), - Pubkey::new_unique(), - args, - ) - } - - let auth = &Keypair::new(); - run(auth, 0); - run(auth, 1); - run(auth, 2); - run(auth, 5); - run(auth, 8); - run(auth, 10); - run(auth, 15); - run(auth, 20); - /* - 0 ixs: - 0: 184| 10: 184| 20: 184| 50: 184| 100: 184| 200: 184| 500: 184|1024: 184 - 0: 184| 10: 184| 20: 184| 50: 184| 100: 184| 200: 184| 500: 184|1024: 184 - 1 ixs: - 0: 620| 10: 636| 20: 648| 50: 688| 100: 756| 200: 888| 500: 1288|1024: 1988 - 0: 336| 10: 348| 20: 364| 50: 404| 100: 472| 200: 604| 500: 1004|1024: 1704 - 2 ixs: - 0: 932| 10: 960| 20: 984| 50: 1064| 100: 1200| 200: 1468| 500: 2268|1024: 3664 - 0: 400| 10: 424| 20: 452| 50: 532| 100: 668| 200: 936| 500: 1736|1024: 3132 - 5 ixs: - 0: 1864| 10: 1932| 20: 1996| 50: 2196| 100: 2536| 200: 3204| 500: 5204|1024: 8696 - 0: 588| 10: 652| 20: 720| 50: 920| 100: 1260| 200: 1928| 500: 3928|1024: 7420 - 8 ixs: - 0: 2796| 10: 2904| 20: 3008| 50: 3328| 100: 3872| 200: 4940| 500: 8140|1024:13728 - 0: 776| 10: 880| 20: 988| 50: 1308| 100: 1852| 200: 2920| 500: 6120|1024:11708 - 10 ixs: - 0: 3416| 10: 3552| 20: 3684| 50: 4084| 100: 4764| 200: 6096| 500:10096|1024:17084 - 0: 900| 10: 1032| 20: 1168| 50: 1568| 100: 2248| 200: 3580| 500: 7580|1024:14568 - 15 ixs: - 0: 4972| 10: 5172| 20: 5372| 50: 5972| 100: 6992| 200: 8992| 500:14992|1024:25472 - 0: 1212| 10: 1412| 20: 1612| 50: 2212| 100: 3232| 200: 5232| 500:11232|1024:21712 - 20 ixs: - 0: 6524| 10: 6792| 20: 7056| 50: 7856| 100: 9216| 200:11884| 500:19884|1024:33856 - 0: 1528| 10: 1792| 20: 2060| 50: 2860| 100: 4220| 200: 6888| 500:14888|1024:28860 - - Legend: - - x ixs: - data size/ix: encoded size | ... - data size/ix: encoded size | ... (using lookup tables) - - Given that max transaction size is 1644 bytes, we can see that the max data size is: - - - 1 ixs: slightly larger than 500 bytes - - 2 ixs: slightly larger than 200 bytes - - 5 ixs: slightly larger than 100 bytes - - 8 ixs: slightly larger than 50 bytes - - 10 ixs: slightly larger than 20 bytes - - 15 ixs: slightly larger than 10 bytes - - 20 ixs: no data supported (only lamport changes) - - Also it is clear that using a lookup table makes a huge difference especially if we commit - lots of different accounts. - */ - } - - // ----------------- - // Process Commitables and Close Buffers - // ----------------- - lazy_static! { - pub(crate) static ref MAX_PROCESS_PER_TX: u8 = { - max_chunks_per_transaction("Max process per tx", |auth_pubkey| { - let pubkey = Pubkey::new_unique(); - let delegated_account_owner = Pubkey::new_unique(); - let buffer_pda = Pubkey::new_unique(); - let commit_args = CommitStateFromBufferArgs::default(); - vec![process_commits_ix( - auth_pubkey, - &pubkey, - &delegated_account_owner, - &buffer_pda, - commit_args, - )] - }) - }; - pub(crate) static ref MAX_PROCESS_PER_TX_USING_LOOKUP: u8 = { - max_chunks_per_transaction_using_lookup_table( - "Max process per tx using lookup", - |auth_pubkey, committee, delegated_account_owner| { - let buffer_pda = Pubkey::new_unique(); - let commit_args = CommitStateFromBufferArgs::default(); - vec![process_commits_ix( - auth_pubkey, - &committee, - &delegated_account_owner, - &buffer_pda, - commit_args, - )] - }, - None, - ) - }; - pub(crate) static ref MAX_CLOSE_PER_TX: u8 = { - let commit_id = 0; - max_chunks_per_transaction("Max close per tx", |auth_pubkey| { - let pubkey = Pubkey::new_unique(); - vec![close_buffers_ix(auth_pubkey, &pubkey, commit_id)] - }) - }; - pub(crate) static ref MAX_CLOSE_PER_TX_USING_LOOKUP: u8 = { - let commit_id = 0; - max_chunks_per_transaction_using_lookup_table( - "Max close per tx using lookup", - |auth_pubkey, committee, _| { - vec![close_buffers_ix(auth_pubkey, &committee, commit_id)] - }, - None, - ) - }; - pub(crate) static ref MAX_PROCESS_AND_CLOSE_PER_TX: u8 = { - let commit_id = 0; - max_chunks_per_transaction( - "Max process and close per tx", - |auth_pubkey| { - let pubkey = Pubkey::new_unique(); - let delegated_account_owner = Pubkey::new_unique(); - let buffer_pda = Pubkey::new_unique(); - let commit_args = CommitStateFromBufferArgs::default(); - process_and_close_ixs( - auth_pubkey, - &pubkey, - &delegated_account_owner, - &buffer_pda, - commit_id, - commit_args, - ) - }, - ) - }; - pub(crate) static ref MAX_PROCESS_AND_CLOSE_PER_TX_USING_LOOKUP: u8 = { - let commit_id = 0; - max_chunks_per_transaction_using_lookup_table( - "Max process and close per tx using lookup", - |auth_pubkey, committee, delegated_account_owner| { - let commit_args = CommitStateFromBufferArgs::default(); - let buffer_pda = Pubkey::new_unique(); - process_and_close_ixs( - auth_pubkey, - &committee, - &delegated_account_owner, - &buffer_pda, - commit_id, - commit_args, - ) - }, - None, - ) - }; - } - - // ----------------- - // Finalize - // ----------------- - lazy_static! { - pub(crate) static ref MAX_FINALIZE_PER_TX: u8 = { - max_chunks_per_transaction("Max finalize per tx", |auth_pubkey| { - let pubkey = Pubkey::new_unique(); - vec![finalize_ix(auth_pubkey, &pubkey)] - }) - }; - pub(crate) static ref MAX_FINALIZE_PER_TX_USING_LOOKUP: u8 = { - max_chunks_per_transaction_using_lookup_table( - "Max finalize per tx using lookup", - |auth_pubkey, committee, _| { - vec![finalize_ix(auth_pubkey, &committee)] - }, - Some(40), - ) - }; - } - - // ----------------- - // Undelegate - // ----------------- - lazy_static! { - pub(crate) static ref MAX_UNDELEGATE_PER_TX: u8 = { - max_chunks_per_transaction("Max undelegate per tx", |auth_pubkey| { - let pubkey = Pubkey::new_unique(); - let owner_program = Pubkey::new_unique(); - vec![dlp_api::instruction_builder::undelegate( - auth_pubkey, - pubkey, - owner_program, - auth_pubkey, - )] - }) - }; - pub(crate) static ref MAX_UNDELEGATE_PER_TX_USING_LOOKUP: u8 = { - max_chunks_per_transaction_using_lookup_table( - "Max undelegate per tx using lookup", - |auth_pubkey, committee, owner_program| { - vec![dlp_api::instruction_builder::undelegate( - auth_pubkey, - committee, - owner_program, - auth_pubkey, - )] - }, - None, - ) - }; - } - - // ----------------- - // Max Chunks Per Transaction - // ----------------- - - fn max_chunks_per_transaction Vec>( - label: &str, - create_ixs: F, - ) -> u8 { - info!(test_label = label, "Starting transaction chunking test"); - - let auth = Keypair::new(); - let auth_pubkey = auth.pubkey(); - // NOTE: the size of the budget instructions is always the same, no matter - // which budget we provide - let mut ixs = ComputeBudget::Process(Budget::default()).instructions(1); - let mut chunks = 0_u8; - loop { - ixs.extend(create_ixs(auth_pubkey)); - chunks += 1; - - // SAFETY: runs statically - let versioned_msg = - Message::try_compile(&auth_pubkey, &ixs, &[], Hash::default()) - .unwrap(); - // SAFETY: runs statically - let versioned_tx = VersionedTransaction::try_new( - VersionedMessage::V0(versioned_msg), - &[&auth], - ) - .unwrap(); - let encoded = serialize_and_encode_base64(&versioned_tx); - info!( - chunks = chunks, - size_bytes = encoded.len(), - "Transaction size measured" - ); - if encoded.len() > MAX_ENCODED_TRANSACTION_SIZE { - return chunks - 1; - } - } - } - - fn extend_lookup_table( - lookup_table: &mut AddressLookupTableAccount, - auth_pubkey: Pubkey, - committee: Pubkey, - owner: Option<&Pubkey>, - ) { - let keys = provide_committee_pubkeys(&committee, owner) - .into_iter() - .chain(provide_common_pubkeys(&auth_pubkey)) - .chain(lookup_table.addresses.iter().cloned()) - .collect::>(); - lookup_table.addresses = keys.into_iter().collect(); - assert!( - lookup_table.addresses.len() <= LOOKUP_TABLE_MAX_ADDRESSES, - "Lookup table has too many ({}) addresses", - lookup_table.addresses.len() - ); - } - - fn max_chunks_per_transaction_using_lookup_table< - FI: Fn(Pubkey, Pubkey, Pubkey) -> Vec, - >( - label: &str, - create_ixs: FI, - start_at: Option, - ) -> u8 { - info!(test_label = label, start_at = ?start_at, "Starting lookup table transaction test"); - let auth = Keypair::new(); - let auth_pubkey = auth.pubkey(); - let mut ixs = ComputeBudget::Process(Budget::default()).instructions(1); - let mut chunks = start_at.unwrap_or_default(); - let mut lookup_table = AddressLookupTableAccount { - key: Pubkey::default(), - addresses: vec![], - }; - // If we start at specific chunk size let's prep the ixs and assume - // we are using the same addresses to avoid blowing out the lookup table - if chunks > 0 { - let committee = Pubkey::new_unique(); - let owner_program = Pubkey::new_unique(); - extend_lookup_table( - &mut lookup_table, - auth_pubkey, - committee, - Some(&owner_program), - ); - for _ in 0..chunks { - ixs.extend(create_ixs(auth_pubkey, committee, owner_program)); - } - } - loop { - let committee = Pubkey::new_unique(); - let owner_program = Pubkey::new_unique(); - ixs.extend(create_ixs(auth_pubkey, committee, owner_program)); - - chunks += 1; - extend_lookup_table( - &mut lookup_table, - auth_pubkey, - committee, - Some(&owner_program), - ); - - // SAFETY: runs statically - let versioned_msg = Message::try_compile( - &auth_pubkey, - &ixs, - &[lookup_table.clone()], - Hash::default(), - ) - .unwrap(); - // SAFETY: runs statically - let versioned_tx = VersionedTransaction::try_new( - VersionedMessage::V0(versioned_msg), - &[&auth], - ) - .unwrap(); - let encoded = serialize_and_encode_base64(&versioned_tx); - info!( - chunks = chunks, - size_bytes = encoded.len(), - "Transaction size measured with lookup table" - ); - if encoded.len() > MAX_ENCODED_TRANSACTION_SIZE { - return chunks - 1; - } - } - } -} diff --git a/test-integration/Cargo.lock b/test-integration/Cargo.lock index 2cfd96d55..e5b712eaf 100644 --- a/test-integration/Cargo.lock +++ b/test-integration/Cargo.lock @@ -4182,20 +4182,15 @@ dependencies = [ "async-trait", "bincode", "lru 0.16.2", - "magicblock-accounts-db", "magicblock-chainlink", - "magicblock-committor-service", - "magicblock-config", "magicblock-core", "magicblock-ledger", "magicblock-magic-program-api 0.11.2", "magicblock-program", - "magicblock-rpc-client", "rand 0.9.2", "solana-account", "solana-hash 3.1.0", "solana-instruction", - "solana-loader-v3-interface", "solana-loader-v4-interface", "solana-pubkey 3.0.0", "solana-sdk-ids", @@ -4213,21 +4208,11 @@ name = "magicblock-accounts" version = "0.11.2" dependencies = [ "async-trait", - "magicblock-account-cloner", - "magicblock-accounts-db", - "magicblock-chainlink", "magicblock-committor-service", - "magicblock-core", - "magicblock-metrics", - "magicblock-program", - "solana-hash 3.1.0", "solana-pubkey 3.0.0", - "solana-transaction", "solana-transaction-error", "thiserror 2.0.18", "tokio", - "tokio-util 0.7.17", - "tracing", "url", ] @@ -4454,6 +4439,9 @@ dependencies = [ "borsh", "futures-util", "lru 0.16.2", + "magicblock-account-cloner", + "magicblock-accounts-db", + "magicblock-chainlink", "magicblock-committor-program", "magicblock-core", "magicblock-delegation-program-api 0.3.0 (git+https://github.com/magicblock-labs/delegation-program.git?rev=25386a7c1d406d06b8d07a4d5b0fd37d5e74213b)", @@ -4464,7 +4452,6 @@ dependencies = [ "rusqlite", "solana-account", "solana-account-decoder", - "solana-address-lookup-table-interface", "solana-commitment-config", "solana-compute-budget-interface", "solana-hash 3.1.0", @@ -4477,11 +4464,9 @@ dependencies = [ "solana-rpc-client-api", "solana-signature", "solana-signer", - "solana-system-program", "solana-transaction", "solana-transaction-error", "solana-transaction-status-client-types", - "static_assertions", "thiserror 2.0.18", "tokio", "tokio-util 0.7.17", diff --git a/test-integration/test-committor-service/Cargo.toml b/test-integration/test-committor-service/Cargo.toml index 53e29921d..6035b5ec8 100644 --- a/test-integration/test-committor-service/Cargo.toml +++ b/test-integration/test-committor-service/Cargo.toml @@ -12,9 +12,7 @@ magicblock-core = { workspace = true } magicblock-committor-program = { workspace = true, features = [ "no-entrypoint", ] } -magicblock-committor-service = { workspace = true, features = [ - "dev-context-only-utils", -] } +magicblock-committor-service = { workspace = true } magicblock-delegation-program-api = { workspace = true } magicblock-program = { workspace = true } magicblock-rpc-client = { workspace = true } diff --git a/test-integration/test-committor-service/tests/test_ix_commit_local.rs b/test-integration/test-committor-service/tests/test_ix_commit_local.rs index d18d0575f..14526a18c 100644 --- a/test-integration/test-committor-service/tests/test_ix_commit_local.rs +++ b/test-integration/test-committor-service/tests/test_ix_commit_local.rs @@ -1,23 +1,18 @@ -use std::{ - collections::{HashMap, HashSet}, - sync::Arc, - time::{Duration, Instant}, -}; +use std::{collections::HashMap, sync::Arc}; use borsh::to_vec; use magicblock_committor_service::{ + committor_processor::CommittorProcessor, config::ChainConfig, intent_executor::{error::IntentExecutorError, ExecutionOutput}, persist::CommitStrategy, - service_ext::{BaseIntentCommittorExt, CommittorServiceExt}, - BaseIntentCommittor, CommittorService, ComputeBudgetConfig, + ComputeBudgetConfig, }; use magicblock_core::intent::CommittedAccount; use magicblock_program::magic_scheduled_base_intent::{ CommitAndUndelegate, CommitType, MagicBaseIntent, MagicIntentBundle, ScheduledIntentBundle, UndelegateType, }; -use magicblock_rpc_client::MagicblockRpcClient; use program_flexi_counter::state::FlexiCounter; use solana_account::{Account, ReadableAccount}; use solana_commitment_config::CommitmentConfig; @@ -245,14 +240,15 @@ async fn commit_single_account( fund_validator_auth_and_ensure_validator_fees_vault(&validator_auth).await; // Run each test with and without finalizing - let service = CommittorService::try_start( - validator_auth.insecure_clone(), - ":memory:", - ChainConfig::local(ComputeBudgetConfig::new(1_000_000)), - common::MockActionsCallbackExecutor::default(), - ) - .unwrap(); - let service = CommittorServiceExt::new(Arc::new(service)); + let processor = Arc::new( + CommittorProcessor::try_new( + validator_auth.insecure_clone(), + ":memory:", + ChainConfig::local(ComputeBudgetConfig::new(1_000_000)), + common::MockActionsCallbackExecutor::default(), + ) + .unwrap(), + ); let counter_auth = Keypair::new(); let (pubkey, mut account) = @@ -308,7 +304,7 @@ async fn commit_single_account( // We should always be able to Commit & Finalize 1 account either with Args or Buffers ix_commit_local( - service, + processor, vec![intent], expect_strategies(&[(expected_strategy, 1)]), program_flexi_counter::ID, @@ -327,14 +323,15 @@ async fn commit_book_order_account( fund_validator_auth_and_ensure_validator_fees_vault(&validator_auth).await; // Run each test with and without finalizing - let service = CommittorService::try_start( - validator_auth.insecure_clone(), - ":memory:", - ChainConfig::local(ComputeBudgetConfig::new(1_000_000)), - common::MockActionsCallbackExecutor::default(), - ) - .unwrap(); - let service = CommittorServiceExt::new(Arc::new(service)); + let processor = Arc::new( + CommittorProcessor::try_new( + validator_auth.insecure_clone(), + ":memory:", + ChainConfig::local(ComputeBudgetConfig::new(1_000_000)), + common::MockActionsCallbackExecutor::default(), + ) + .unwrap(), + ); let payer = Keypair::new(); let (order_book_pk, mut order_book_ac) = @@ -387,7 +384,7 @@ async fn commit_book_order_account( }; ix_commit_local( - service, + processor, vec![intent], expect_strategies(&[(expected_strategy, 1)]), program_schedulecommit::ID, @@ -800,14 +797,15 @@ async fn commit_multiple_accounts( let validator_auth = ensure_validator_authority(); fund_validator_auth_and_ensure_validator_fees_vault(&validator_auth).await; - let service = CommittorService::try_start( - validator_auth.insecure_clone(), - ":memory:", - ChainConfig::local(ComputeBudgetConfig::new(1_000_000)), - common::MockActionsCallbackExecutor::default(), - ) - .unwrap(); - let service = CommittorServiceExt::new(Arc::new(service)); + let processor = Arc::new( + CommittorProcessor::try_new( + validator_auth.insecure_clone(), + ":memory:", + ChainConfig::local(ComputeBudgetConfig::new(1_000_000)), + common::MockActionsCallbackExecutor::default(), + ) + .unwrap(), + ); // Create bundles of committed accounts let bundles_of_committees = create_bundles(bundle_size, bytess).await; @@ -850,7 +848,7 @@ async fn commit_multiple_accounts( .collect::>(); ix_commit_local( - service, + processor, intents, expected_strategies, program_flexi_counter::ID, @@ -869,14 +867,15 @@ async fn execute_intent_bundle( let validator_auth = ensure_validator_authority(); fund_validator_auth_and_ensure_validator_fees_vault(&validator_auth).await; - let service = CommittorService::try_start( - validator_auth.insecure_clone(), - ":memory:", - ChainConfig::local(ComputeBudgetConfig::new(1_000_000)), - common::MockActionsCallbackExecutor::default(), - ) - .unwrap(); - let service = CommittorServiceExt::new(Arc::new(service)); + let processor = Arc::new( + CommittorProcessor::try_new( + validator_auth.insecure_clone(), + ":memory:", + ChainConfig::local(ComputeBudgetConfig::new(1_000_000)), + common::MockActionsCallbackExecutor::default(), + ) + .unwrap(), + ); // Create bundles of committed accounts let to_commit = create_and_delegate_accounts(bytess_to_commit); @@ -911,7 +910,7 @@ async fn execute_intent_bundle( intent_bundle, }; ix_commit_local( - service, + processor, vec![intent_bundle], expected_strategies, program_flexi_counter::id(), @@ -947,13 +946,13 @@ async fn execute_intent_bundle( // Test Executor // ----------------- async fn ix_commit_local( - service: CommittorServiceExt, + processor: Arc, intent_bundles: Vec, expected_strategies: ExpectedStrategies, program_id: Pubkey, ) { - let execution_outputs = service - .schedule_intent_bundles_waiting(intent_bundles.clone()) + let execution_outputs = processor + .execute_intent_bundles(intent_bundles.clone()) .await .unwrap() .into_iter() @@ -961,7 +960,6 @@ async fn ix_commit_local( // Assert that all completed assert_eq!(execution_outputs.len(), intent_bundles.len()); - service.release_common_pubkeys().await.unwrap(); let rpc_client = RpcClient::new("http://localhost:7799".to_string()); let mut strategies = ExpectedStrategies::new(); @@ -1061,11 +1059,7 @@ async fn ix_commit_local( }) .collect(); - let statuses = service - .get_commit_statuses(base_intent.id) - .await - .unwrap() - .unwrap(); + let statuses = processor.get_commit_statuses(base_intent.id).unwrap(); debug!( "{}", statuses @@ -1121,76 +1115,6 @@ async fn ix_commit_local( strategies, expected_strategies, "Strategies used do not match expected ones" ); - - let expect_empty_lookup_tables = false; - // changeset.accounts.len() == changeset.accounts_to_undelegate.len(); - if expect_empty_lookup_tables { - let lookup_tables = service.get_lookup_tables().await.unwrap(); - assert!(lookup_tables.active.is_empty()); - - if utils::TEST_TABLE_CLOSE { - let mut closing_tables = lookup_tables.released; - - // Tables deactivate after ~2.5 mins (150secs), but most times - // it takes a lot longer so we allow double the time - const MAX_TIME_TO_CLOSE: Duration = Duration::from_secs(300); - info!( - "Waiting for lookup tables close for up to {} secs", - MAX_TIME_TO_CLOSE.as_secs() - ); - - let start = Instant::now(); - let rpc_client = MagicblockRpcClient::from(rpc_client); - loop { - let accs = rpc_client - .get_multiple_accounts_with_commitment( - &closing_tables, - CommitmentConfig::confirmed(), - None, - ) - .await - .unwrap(); - let closed_pubkeys = accs - .into_iter() - .zip(closing_tables.iter()) - .filter_map(|(acc, pubkey)| { - if acc.is_none() { - Some(*pubkey) - } else { - None - } - }) - .collect::>(); - closing_tables.retain(|pubkey| { - if closed_pubkeys.contains(pubkey) { - debug!("Table {} closed", pubkey); - false - } else { - true - } - }); - if closing_tables.is_empty() { - break; - } - debug!( - "Still waiting for {} released table(s) to close", - closing_tables.len() - ); - if Instant::now() - start > MAX_TIME_TO_CLOSE { - panic!( - "Timed out waiting for tables close after {} seconds. Still open: {}", - MAX_TIME_TO_CLOSE.as_secs(), - closing_tables - .iter() - .map(|x| x.to_string()) - .collect::>() - .join(", ") - ); - } - utils::sleep_millis(10_000).await; - } - } - } } fn validate_account(