From a97fd42716b80d48fd3b666fa6fb69687e99cb9c Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 2 Aug 2023 13:04:00 +0300 Subject: [PATCH 01/27] chainHead/api: Add `chain_head_unstable_continue` method Signed-off-by: Alexandru Vasile --- client/rpc-spec-v2/src/chain_head/api.rs | 13 +++++++++++++ client/rpc-spec-v2/src/chain_head/chain_head.rs | 8 ++++++++ 2 files changed, 21 insertions(+) diff --git a/client/rpc-spec-v2/src/chain_head/api.rs b/client/rpc-spec-v2/src/chain_head/api.rs index c002b75efe03..67905d801dd0 100644 --- a/client/rpc-spec-v2/src/chain_head/api.rs +++ b/client/rpc-spec-v2/src/chain_head/api.rs @@ -119,4 +119,17 @@ pub trait ChainHeadApi { /// This method is unstable and subject to change in the future. #[method(name = "chainHead_unstable_unpin", blocking)] fn chain_head_unstable_unpin(&self, follow_subscription: String, hash: Hash) -> RpcResult<()>; + + /// Resumes a storage fetch started with `chainHead_storage` after it has generated an + /// `operationWaitingForContinue` event. + /// + /// # Unstable + /// + /// This method is unstable and subject to change in the future. + #[method(name = "chainHead_unstable_continue", blocking)] + fn chain_head_unstable_continue( + &self, + follow_subscription: String, + operation_id: String, + ) -> RpcResult<()>; } diff --git a/client/rpc-spec-v2/src/chain_head/chain_head.rs b/client/rpc-spec-v2/src/chain_head/chain_head.rs index 79cf251f1806..e74aa61c916a 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -443,4 +443,12 @@ where Err(_) => Err(ChainHeadRpcError::InvalidBlock.into()), } } + + fn chain_head_unstable_continue( + &self, + follow_subscription: String, + operation_id: String, + ) -> RpcResult<()> { + Ok(()) + } } From fe3404c270bda2ebbc9f62834358a64ad48b5a9c Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 9 Aug 2023 19:34:10 +0300 Subject: [PATCH 02/27] chainHead/subscriptions: Register operations for pagination Signed-off-by: Alexandru Vasile --- Cargo.lock | 1 + client/rpc-spec-v2/Cargo.toml | 1 + .../src/chain_head/subscription/inner.rs | 97 ++++++++++++++++++- 3 files changed, 98 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 3dd5bc52b71c..0267f2f54b52 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9618,6 +9618,7 @@ version = "0.10.0-dev" dependencies = [ "array-bytes", "assert_matches", + "async-channel", "futures", "futures-util", "hex", diff --git a/client/rpc-spec-v2/Cargo.toml b/client/rpc-spec-v2/Cargo.toml index 599596777b7b..c7352b111e0d 100644 --- a/client/rpc-spec-v2/Cargo.toml +++ b/client/rpc-spec-v2/Cargo.toml @@ -36,6 +36,7 @@ tokio = { version = "1.22.0", features = ["sync"] } array-bytes = "6.1" log = "0.4.17" futures-util = { version = "0.3.19", default-features = false } +async-channel = "1.8.0" [dev-dependencies] serde_json = "1.0" diff --git a/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index 9f42be4a2f7f..42004311bb20 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -22,7 +22,7 @@ use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnbound use sp_runtime::traits::Block as BlockT; use std::{ collections::{hash_map::Entry, HashMap}, - sync::Arc, + sync::{atomic::AtomicBool, Arc}, time::{Duration, Instant}, }; @@ -163,6 +163,101 @@ impl PermitOperations { } } +/// The state of one operation. +#[derive(Clone)] +struct OperationState { + /// True if the `chainHead` generated `waitingForContinue` event. + requested_continue: Arc, + /// Send notifications when the user calls `chainHead_continue` method. + send_continue: async_channel::Sender<()>, +} + +impl OperationState { + /// Returns true if `chainHead_continue` is called after the + /// `waitingForContinue` event was emitted for the associated + /// operation ID. + pub fn submit_continue(&self) -> bool { + // `waitingForContinue` not generated. + if !self.requested_continue.load(std::sync::atomic::Ordering::Acquire) { + return false + } + + // Has enough capacity for 1 message. + self.send_continue.try_send(()).is_ok() + } +} + +struct RegisteredOperation { + /// True if the `chainHead` generated `waitingForContinue` event. + requested_continue: Arc, + /// Receive notifications when the user calls `chainHead_continue` method. + recv_continue: async_channel::Receiver<()>, + /// The operation ID of the request. + operation_id: String, + /// Track the operations ID of this subscription. + operations: Arc>>, +} + +impl RegisteredOperation { + /// Wait until the user calls `chainHead_continue`. + pub async fn wait_for_continue(&self) { + self.requested_continue.store(true, std::sync::atomic::Ordering::Release); + let _ = self.recv_continue.recv().await; + } + + /// Get the operation ID. + pub fn operation_id(&self) -> String { + self.operation_id.clone() + } +} + +impl Drop for RegisteredOperation { + fn drop(&mut self) { + let mut operations = self.operations.lock(); + operations.remove(&self.operation_id); + } +} + +/// The ongoing operations of a subscription. +struct Operations { + /// The next operation ID to be generated. + next_operation_id: usize, + /// Track the operations ID of this subscription. + operations: Arc>>, +} + +impl Operations { + /// Register a new operation. + pub fn register_operation(&mut self) -> RegisteredOperation { + let operation_id = self.next_operation_id(); + + // At most one message can be sent. + let (send_continue, recv_continue) = async_channel::bounded(1); + let requested_continue = Arc::new(AtomicBool::new(false)); + + let state = + OperationState { requested_continue: requested_continue.clone(), send_continue }; + + // Cloned operations for removing the current ID on drop. + let operations = self.operations.clone(); + operations.lock().insert(operation_id.clone(), state); + + RegisteredOperation { requested_continue, operation_id, recv_continue, operations } + } + + /// Get the operation ID. + pub fn get_operation(&mut self, id: &str) -> Option { + self.operations.lock().get(id).map(|state| state.clone()) + } + + /// Generate the next operation ID for this subscription. + fn next_operation_id(&mut self) -> String { + let op_id = self.next_operation_id; + self.next_operation_id += 1; + op_id.to_string() + } +} + struct BlockState { /// The state machine of this block. state_machine: BlockStateMachine, From 72b26c3572289b5dad88cc0ad11bc59b80f08233 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 9 Aug 2023 20:20:19 +0300 Subject: [PATCH 03/27] chainHead/subscriptions: Merge limits with registered operation Signed-off-by: Alexandru Vasile --- .../src/chain_head/subscription/inner.rs | 84 ++++++++++--------- 1 file changed, 46 insertions(+), 38 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index 42004311bb20..4c166cb3b50c 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -22,7 +22,7 @@ use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnbound use sp_runtime::traits::Block as BlockT; use std::{ collections::{hash_map::Entry, HashMap}, - sync::{atomic::AtomicBool, Arc}, + sync::{atomic::AtomicBool, Arc, Mutex}, time::{Duration, Instant}, }; @@ -187,6 +187,7 @@ impl OperationState { } } +/// The registered operation passed to the `chainHead` methods. struct RegisteredOperation { /// True if the `chainHead` generated `waitingForContinue` event. requested_continue: Arc, @@ -196,6 +197,8 @@ struct RegisteredOperation { operation_id: String, /// Track the operations ID of this subscription. operations: Arc>>, + /// Permit a number of items to be executed by this operation. + permit: PermitOperations, } impl RegisteredOperation { @@ -209,6 +212,13 @@ impl RegisteredOperation { pub fn operation_id(&self) -> String { self.operation_id.clone() } + + /// Returns the number of reserved elements for this permit. + /// + /// This can be smaller than the number of items requested via [`LimitOperations::reserve()`]. + pub fn num_reserved(&self) -> usize { + self.permit.num_ops + } } impl Drop for RegisteredOperation { @@ -222,13 +232,26 @@ impl Drop for RegisteredOperation { struct Operations { /// The next operation ID to be generated. next_operation_id: usize, + /// Limit the number of ongoing operations. + limits: LimitOperations, /// Track the operations ID of this subscription. operations: Arc>>, } impl Operations { + /// Constructs a new [`Operations`]. + fn new(max_operations: usize) -> Self { + Operations { + next_operation_id: 0, + limits: LimitOperations::new(max_operations), + operations: Default::default(), + } + } + /// Register a new operation. - pub fn register_operation(&mut self) -> RegisteredOperation { + pub fn register_operation(&mut self, to_reserve: usize) -> Option { + let permit = self.limits.reserve_at_most(to_reserve)?; + let operation_id = self.next_operation_id(); // At most one message can be sent. @@ -242,7 +265,13 @@ impl Operations { let operations = self.operations.clone(); operations.lock().insert(operation_id.clone(), state); - RegisteredOperation { requested_continue, operation_id, recv_continue, operations } + Some(RegisteredOperation { + requested_continue, + operation_id, + recv_continue, + operations, + permit, + }) } /// Get the operation ID. @@ -275,10 +304,8 @@ struct SubscriptionState { /// /// This object is cloned between methods. response_sender: TracingUnboundedSender>, - /// Limit the number of ongoing operations. - limits: LimitOperations, - /// The next operation ID. - next_operation_id: usize, + /// The ongoing operations of a subscription. + operations: Operations, /// Track the block hashes available for this subscription. /// /// This implementation assumes: @@ -391,18 +418,11 @@ impl SubscriptionState { timestamp } - /// Generate the next operation ID for this subscription. - fn next_operation_id(&mut self) -> usize { - let op_id = self.next_operation_id; - self.next_operation_id = self.next_operation_id.wrapping_add(1); - op_id - } - - /// Reserves capacity to execute at least one operation and at most the requested items. + /// Register a new operation. /// - /// For more details see [`PermitOperations`]. - fn reserve_at_most(&self, to_reserve: usize) -> Option { - self.limits.reserve_at_most(to_reserve) + /// The registered operation can execute at least one item and at most the requested items. + fn register_operation(&mut self, to_reserve: usize) -> Option { + self.operations.register_operation(to_reserve) } } @@ -413,8 +433,7 @@ pub struct BlockGuard> { hash: Block::Hash, with_runtime: bool, response_sender: TracingUnboundedSender>, - operation_id: String, - permit_operations: PermitOperations, + operation: RegisteredOperation, backend: Arc, } @@ -432,22 +451,14 @@ impl> BlockGuard { hash: Block::Hash, with_runtime: bool, response_sender: TracingUnboundedSender>, - operation_id: usize, - permit_operations: PermitOperations, + operation: RegisteredOperation, backend: Arc, ) -> Result { backend .pin_block(hash) .map_err(|err| SubscriptionManagementError::Custom(err.to_string()))?; - Ok(Self { - hash, - with_runtime, - response_sender, - operation_id: operation_id.to_string(), - permit_operations, - backend, - }) + Ok(Self { hash, with_runtime, response_sender, operation, backend }) } /// The `with_runtime` flag of the subscription. @@ -462,14 +473,14 @@ impl> BlockGuard { /// The operation ID of this method. pub fn operation_id(&self) -> String { - self.operation_id.clone() + self.operation.operation_id() } /// Returns the number of reserved elements for this permit. /// /// This can be smaller than the number of items requested. pub fn num_reserved(&self) -> usize { - self.permit_operations.num_reserved() + self.operation.num_reserved() } } @@ -540,9 +551,8 @@ impl> SubscriptionsInner { with_runtime, tx_stop: Some(tx_stop), response_sender, - limits: LimitOperations::new(self.max_ongoing_operations), - next_operation_id: 0, blocks: Default::default(), + operations: Operations::new(self.max_ongoing_operations), }; entry.insert(state); @@ -726,18 +736,16 @@ impl> SubscriptionsInner { return Err(SubscriptionManagementError::BlockHashAbsent) } - let Some(permit_operations) = sub.reserve_at_most(to_reserve) else { + let Some(operation) = sub.register_operation(to_reserve) else { // Error when the server cannot execute at least one operation. return Err(SubscriptionManagementError::ExceededLimits) }; - let operation_id = sub.next_operation_id(); BlockGuard::new( hash, sub.with_runtime, sub.response_sender.clone(), - operation_id, - permit_operations, + operation, self.backend.clone(), ) } From e28d9825d8a1abca3daa57c8650638e9aacd294c Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 10 Aug 2023 16:51:11 +0300 Subject: [PATCH 04/27] chainHead/subscriptions: Expose the operation state Signed-off-by: Alexandru Vasile --- .../src/chain_head/subscription/inner.rs | 25 +++++++++++++++---- .../src/chain_head/subscription/mod.rs | 8 ++++++ 2 files changed, 28 insertions(+), 5 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index 4c166cb3b50c..c7e0aaaa7441 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -165,7 +165,7 @@ impl PermitOperations { /// The state of one operation. #[derive(Clone)] -struct OperationState { +pub struct OperationState { /// True if the `chainHead` generated `waitingForContinue` event. requested_continue: Arc, /// Send notifications when the user calls `chainHead_continue` method. @@ -209,14 +209,14 @@ impl RegisteredOperation { } /// Get the operation ID. - pub fn operation_id(&self) -> String { + fn operation_id(&self) -> String { self.operation_id.clone() } /// Returns the number of reserved elements for this permit. /// /// This can be smaller than the number of items requested via [`LimitOperations::reserve()`]. - pub fn num_reserved(&self) -> usize { + fn num_reserved(&self) -> usize { self.permit.num_ops } } @@ -274,8 +274,8 @@ impl Operations { }) } - /// Get the operation ID. - pub fn get_operation(&mut self, id: &str) -> Option { + /// Get the associated operation state with the ID. + pub fn get_operation(&self, id: &str) -> Option { self.operations.lock().get(id).map(|state| state.clone()) } @@ -424,6 +424,11 @@ impl SubscriptionState { fn register_operation(&mut self, to_reserve: usize) -> Option { self.operations.register_operation(to_reserve) } + + /// Get the associated operation state with the ID. + pub fn get_operation(&self, id: &str) -> Option { + self.operations.get_operation(id) + } } /// Keeps a specific block pinned while the handle is alive. @@ -482,6 +487,11 @@ impl> BlockGuard { pub fn num_reserved(&self) -> usize { self.operation.num_reserved() } + + /// Wait until the user calls `chainHead_continue`. + pub async fn wait_for_continue(&self) { + self.operation.wait_for_continue().await + } } impl> Drop for BlockGuard { @@ -749,6 +759,11 @@ impl> SubscriptionsInner { self.backend.clone(), ) } + + pub fn get_operation(&mut self, sub_id: &str, id: &str) -> Option { + let state = self.subs.get(sub_id)?; + state.get_operation(id) + } } #[cfg(test)] diff --git a/client/rpc-spec-v2/src/chain_head/subscription/mod.rs b/client/rpc-spec-v2/src/chain_head/subscription/mod.rs index 39618ecfc1b3..b25b1a4913b4 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription/mod.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription/mod.rs @@ -25,6 +25,8 @@ mod error; mod inner; use self::inner::SubscriptionsInner; + +pub use self::inner::OperationState; pub use error::SubscriptionManagementError; pub use inner::{BlockGuard, InsertedSubscriptionData}; @@ -126,4 +128,10 @@ impl> SubscriptionManagement { let mut inner = self.inner.write(); inner.lock_block(sub_id, hash, to_reserve) } + + /// Get the operation state. + pub fn get_operation(&self, sub_id: &str, operation_id: &str) -> Option { + let mut inner = self.inner.write(); + inner.get_operation(sub_id, operation_id) + } } From 1d717dfd8ad8e072d3391d5f6ae982d1dc0840ab Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 10 Aug 2023 16:52:13 +0300 Subject: [PATCH 05/27] chain_head/storage: Generate WaitingForContinue event Signed-off-by: Alexandru Vasile --- .../src/chain_head/chain_head_storage.rs | 165 +++++++++++------- 1 file changed, 104 insertions(+), 61 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs b/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs index 393e4489c8c0..6d0fb8e17691 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs @@ -18,7 +18,7 @@ //! Implementation of the `chainHead_storage` method. -use std::{marker::PhantomData, sync::Arc}; +use std::{collections::VecDeque, marker::PhantomData, sync::Arc}; use sc_client_api::{Backend, ChildInfo, StorageKey, StorageProvider}; use sc_utils::mpsc::TracingUnboundedSender; @@ -39,7 +39,7 @@ use super::{ /// The maximum number of items the `chainHead_storage` can return /// before paginations is required. -const MAX_ITER_ITEMS: usize = 10; +const MAX_ITER_ITEMS: usize = 5; /// The query type of an interation. enum IterQueryType { @@ -53,16 +53,26 @@ enum IterQueryType { pub struct ChainHeadStorage { /// Substrate client. client: Arc, - _phantom: PhantomData<(Block, BE)>, + /// Queue of operations that may require pagination. + iter_operations: VecDeque, + _phandom: PhantomData<(BE, Block)>, } impl ChainHeadStorage { /// Constructs a new [`ChainHeadStorage`]. pub fn new(client: Arc) -> Self { - Self { client, _phantom: PhantomData } + Self { client, iter_operations: VecDeque::new(), _phandom: PhantomData } } } +/// Query to iterate over storage. +struct QueryIter { + /// The next key from which the iteration should continue. + next_key: StorageKey, + /// The type of the query (either value or hash). + ty: IterQueryType, +} + /// Checks if the provided key (main or child key) is valid /// for queries. /// @@ -77,7 +87,7 @@ fn is_key_queryable(key: &[u8]) -> bool { type QueryResult = Result, String>; /// The result of iterating over keys. -type QueryIterResult = Result, String>; +type QueryIterResult = Result<(Vec, Option), String>; impl ChainHeadStorage where @@ -131,58 +141,100 @@ where .unwrap_or_else(|error| QueryResult::Err(error.to_string())) } - /// Handle iterating over (key, value) or (key, hash) pairs. - fn query_storage_iter( + /// Iterate over at most `MAX_ITER_ITEMS` keys. + /// + /// Returns the storage result with a potential next key to resume iteration. + fn query_storage_iter_pagination( &self, + query: QueryIter, hash: Block::Hash, - key: &StorageKey, child_key: Option<&ChildInfo>, - ty: IterQueryType, ) -> QueryIterResult { - let keys_iter = if let Some(child_key) = child_key { - self.client.child_storage_keys(hash, child_key.to_owned(), Some(key), None) + let QueryIter { next_key, ty } = query; + + let mut keys_iter = if let Some(child_key) = child_key { + self.client + .child_storage_keys(hash, child_key.to_owned(), Some(&next_key), None) } else { - self.client.storage_keys(hash, Some(key), None) + self.client.storage_keys(hash, Some(&next_key), None) } - .map_err(|error| error.to_string())?; + .map_err(|err| err.to_string())?; let mut ret = Vec::with_capacity(MAX_ITER_ITEMS); - let mut keys_iter = keys_iter.take(MAX_ITER_ITEMS); - while let Some(key) = keys_iter.next() { + for _ in 0..MAX_ITER_ITEMS { + let Some(key) = keys_iter.next() else { + break + }; + let result = match ty { IterQueryType::Value => self.query_storage_value(hash, &key, child_key), IterQueryType::Hash => self.query_storage_hash(hash, &key, child_key), }?; - if let Some(result) = result { - ret.push(result); + if let Some(value) = result { + ret.push(value); } } - QueryIterResult::Ok(ret) + // Save the next key if any to continue the iteration. + let maybe_next_query = keys_iter.next().map(|next_key| QueryIter { next_key, ty }); + Ok((ret, maybe_next_query)) } - /// Generate the block events for the `chainHead_storage` method. - pub fn generate_events( - &self, + /// Iterate over (key, hash) and (key, value) generating the `WaitingForContinue` event if + /// necessary. + async fn generate_storage_iter_events( + &mut self, block_guard: BlockGuard, hash: Block::Hash, - items: Vec>, child_key: Option, ) { - /// Build and send the opaque error back to the `chainHead_follow` method. - fn send_error( - sender: &TracingUnboundedSender>, - operation_id: String, - error: String, - ) { - let _ = - sender.unbounded_send(FollowEvent::::OperationError(OperationError { - operation_id, - error, - })); + let sender = block_guard.response_sender(); + let operation_id = block_guard.operation_id(); + + while let Some(query) = self.iter_operations.pop_front() { + let result = self.query_storage_iter_pagination(query, hash, child_key.as_ref()); + let (events, maybe_next_query) = match result { + QueryIterResult::Ok(result) => result, + QueryIterResult::Err(error) => { + send_error::(&sender, operation_id.clone(), error.to_string()); + return + }, + }; + + if !events.is_empty() { + // Send back the results of the iteration produced so far. + let _ = sender.unbounded_send(FollowEvent::::OperationStorageItems( + OperationStorageItems { operation_id: operation_id.clone(), items: events }, + )); + } + + if let Some(next_query) = maybe_next_query { + let _ = + sender.unbounded_send(FollowEvent::::OperationWaitingForContinue( + OperationId { operation_id: operation_id.clone() }, + )); + block_guard.wait_for_continue().await; + + // Give a chance for the other operations to advance next time. + self.iter_operations.push_back(next_query); + } } + let _ = + sender.unbounded_send(FollowEvent::::OperationStorageDone(OperationId { + operation_id, + })); + } + + /// Generate the block events for the `chainHead_storage` method. + pub async fn generate_events( + &mut self, + block_guard: BlockGuard, + hash: Block::Hash, + items: Vec>, + child_key: Option, + ) { let sender = block_guard.response_sender(); if let Some(child_key) = child_key.as_ref() { @@ -220,30 +272,12 @@ where return }, }, - StorageQueryType::DescendantsValues => match self.query_storage_iter( - hash, - &item.key, - child_key.as_ref(), - IterQueryType::Value, - ) { - Ok(values) => storage_results.extend(values), - Err(error) => { - send_error::(&sender, block_guard.operation_id(), error); - return - }, - }, - StorageQueryType::DescendantsHashes => match self.query_storage_iter( - hash, - &item.key, - child_key.as_ref(), - IterQueryType::Hash, - ) { - Ok(values) => storage_results.extend(values), - Err(error) => { - send_error::(&sender, block_guard.operation_id(), error); - return - }, - }, + StorageQueryType::DescendantsValues => self + .iter_operations + .push_back(QueryIter { next_key: item.key, ty: IterQueryType::Value }), + StorageQueryType::DescendantsHashes => self + .iter_operations + .push_back(QueryIter { next_key: item.key, ty: IterQueryType::Hash }), _ => continue, }; } @@ -257,9 +291,18 @@ where )); } - let _ = - sender.unbounded_send(FollowEvent::::OperationStorageDone(OperationId { - operation_id: block_guard.operation_id(), - })); + self.generate_storage_iter_events(block_guard, hash, child_key).await } } + +/// Build and send the opaque error back to the `chainHead_follow` method. +fn send_error( + sender: &TracingUnboundedSender>, + operation_id: String, + error: String, +) { + let _ = sender.unbounded_send(FollowEvent::::OperationError(OperationError { + operation_id, + error, + })); +} From 3bbc9bad14b0fc70ce89a73ce45690b1097fa8d1 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 10 Aug 2023 16:52:31 +0300 Subject: [PATCH 06/27] chainHead: Use the continue operation Signed-off-by: Alexandru Vasile --- client/rpc-spec-v2/src/chain_head/chain_head.rs | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/chain_head.rs b/client/rpc-spec-v2/src/chain_head/chain_head.rs index e74aa61c916a..93b6bc99750b 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -349,7 +349,7 @@ where Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()), }; - let storage_client = ChainHeadStorage::::new(self.client.clone()); + let mut storage_client = ChainHeadStorage::::new(self.client.clone()); let operation_id = block_guard.operation_id(); // The number of operations we are allowed to execute. @@ -359,7 +359,7 @@ where items.truncate(num_operations); let fut = async move { - storage_client.generate_events(block_guard, hash, items, child_trie); + storage_client.generate_events(block_guard, hash, items, child_trie).await; }; self.executor @@ -449,6 +449,15 @@ where follow_subscription: String, operation_id: String, ) -> RpcResult<()> { - Ok(()) + let Some(operation) = self.subscriptions.get_operation(&follow_subscription, &operation_id) else { + return Ok(()) + }; + + if !operation.submit_continue() { + // Continue called without generating a `WaitingForContinue` event. + Err(ChainHeadRpcError::InvalidContinue.into()) + } else { + Ok(()) + } } } From 5afe9c0fb724fe11c44b712e2913ca7fa7cf625e Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 10 Aug 2023 18:16:22 +0300 Subject: [PATCH 07/27] chainHead/tests: Adjust testing to the new storage interface Signed-off-by: Alexandru Vasile --- .../src/chain_head/subscription/inner.rs | 12 ++++----- client/rpc-spec-v2/src/chain_head/tests.rs | 26 +++++++++++++------ 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index c7e0aaaa7441..328d5116e1a3 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -876,8 +876,7 @@ mod tests { with_runtime: false, tx_stop: None, response_sender, - next_operation_id: 0, - limits: LimitOperations::new(MAX_OPERATIONS_PER_SUB), + operations: Operations::new(MAX_OPERATIONS_PER_SUB), blocks: Default::default(), }; @@ -906,9 +905,8 @@ mod tests { with_runtime: false, tx_stop: None, response_sender, - next_operation_id: 0, - limits: LimitOperations::new(MAX_OPERATIONS_PER_SUB), blocks: Default::default(), + operations: Operations::new(MAX_OPERATIONS_PER_SUB), }; let hash = H256::random(); @@ -1225,12 +1223,12 @@ mod tests { // One operation is reserved. let permit_one = ops.reserve_at_most(1).unwrap(); - assert_eq!(permit_one.num_reserved(), 1); + assert_eq!(permit_one.num_ops, 1); // Request 2 operations, however there is capacity only for one. let permit_two = ops.reserve_at_most(2).unwrap(); // Number of reserved permits is smaller than provided. - assert_eq!(permit_two.num_reserved(), 1); + assert_eq!(permit_two.num_ops, 1); // Try to reserve operations when there's no space. let permit = ops.reserve_at_most(1); @@ -1241,6 +1239,6 @@ mod tests { // Can reserve again let permit_three = ops.reserve_at_most(1).unwrap(); - assert_eq!(permit_three.num_reserved(), 1); + assert_eq!(permit_three.num_ops, 1); } } diff --git a/client/rpc-spec-v2/src/chain_head/tests.rs b/client/rpc-spec-v2/src/chain_head/tests.rs index 4bda06d3cf01..fe9767a06825 100644 --- a/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/client/rpc-spec-v2/src/chain_head/tests.rs @@ -743,11 +743,16 @@ async fn get_storage_multi_query_iter() { assert_matches!( get_next_event::>(&mut block_sub).await, FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && - res.items.len() == 2 && + res.items.len() == 1 && res.items[0].key == key && - res.items[1].key == key && - res.items[0].result == StorageResultType::Hash(expected_hash) && - res.items[1].result == StorageResultType::Value(expected_value) + res.items[0].result == StorageResultType::Hash(expected_hash) + ); + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && + res.items.len() == 1 && + res.items[0].key == key && + res.items[0].result == StorageResultType::Value(expected_value) ); assert_matches!( get_next_event::>(&mut block_sub).await, @@ -788,11 +793,16 @@ async fn get_storage_multi_query_iter() { assert_matches!( get_next_event::>(&mut block_sub).await, FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && - res.items.len() == 2 && + res.items.len() == 1 && + res.items[0].key == key && + res.items[0].result == StorageResultType::Hash(expected_hash) + ); + assert_matches!( + get_next_event::>(&mut block_sub).await, + FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && + res.items.len() == 1 && res.items[0].key == key && - res.items[1].key == key && - res.items[0].result == StorageResultType::Hash(expected_hash) && - res.items[1].result == StorageResultType::Value(expected_value) + res.items[0].result == StorageResultType::Value(expected_value) ); assert_matches!( get_next_event::>(&mut block_sub).await, From c8e8ed58fc6ee40d9725199efd98d40c9e4f04b6 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 10 Aug 2023 18:25:13 +0300 Subject: [PATCH 08/27] chainHead/config: Make pagination limit configurable Signed-off-by: Alexandru Vasile --- .../rpc-spec-v2/src/chain_head/chain_head.rs | 17 +++++++++++++++- .../src/chain_head/chain_head_storage.rs | 20 +++++++++++-------- 2 files changed, 28 insertions(+), 9 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/chain_head.rs b/client/rpc-spec-v2/src/chain_head/chain_head.rs index 93b6bc99750b..8cc3a11c0397 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -61,6 +61,9 @@ pub struct ChainHeadConfig { pub subscription_max_pinned_duration: Duration, /// The maximum number of ongoing operations per subscription. pub subscription_max_ongoing_operations: usize, + /// The maximum number of items reported by the `chainHead_storage` before + /// pagination is required. + pub operation_max_storage_items: usize, } /// Maximum pinned blocks across all connections. @@ -78,12 +81,17 @@ const MAX_PINNED_DURATION: Duration = Duration::from_secs(60); /// Note: The lower limit imposed by the spec is 16. const MAX_ONGOING_OPERATIONS: usize = 16; +/// The maximum number of items the `chainHead_storage` can return +/// before paginations is required. +const MAX_STORAGE_ITER_ITEMS: usize = 5; + impl Default for ChainHeadConfig { fn default() -> Self { ChainHeadConfig { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: MAX_PINNED_DURATION, subscription_max_ongoing_operations: MAX_ONGOING_OPERATIONS, + operation_max_storage_items: MAX_STORAGE_ITER_ITEMS, } } } @@ -100,6 +108,9 @@ pub struct ChainHead, Block: BlockT, Client> { subscriptions: Arc>, /// The hexadecimal encoded hash of the genesis block. genesis_hash: String, + /// The maximum number of items reported by the `chainHead_storage` before + /// pagination is required. + operation_max_storage_items: usize, /// Phantom member to pin the block type. _phantom: PhantomData, } @@ -124,6 +135,7 @@ impl, Block: BlockT, Client> ChainHead { config.subscription_max_ongoing_operations, backend, )), + operation_max_storage_items: config.operation_max_storage_items, genesis_hash, _phantom: PhantomData, } @@ -349,7 +361,10 @@ where Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()), }; - let mut storage_client = ChainHeadStorage::::new(self.client.clone()); + let mut storage_client = ChainHeadStorage::::new( + self.client.clone(), + self.operation_max_storage_items, + ); let operation_id = block_guard.operation_id(); // The number of operations we are allowed to execute. diff --git a/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs b/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs index 6d0fb8e17691..0d54748d6485 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs @@ -37,10 +37,6 @@ use super::{ FollowEvent, }; -/// The maximum number of items the `chainHead_storage` can return -/// before paginations is required. -const MAX_ITER_ITEMS: usize = 5; - /// The query type of an interation. enum IterQueryType { /// Iterating over (key, value) pairs. @@ -55,13 +51,21 @@ pub struct ChainHeadStorage { client: Arc, /// Queue of operations that may require pagination. iter_operations: VecDeque, + /// The maximum number of items reported by the `chainHead_storage` before + /// pagination is required. + operation_max_storage_items: usize, _phandom: PhantomData<(BE, Block)>, } impl ChainHeadStorage { /// Constructs a new [`ChainHeadStorage`]. - pub fn new(client: Arc) -> Self { - Self { client, iter_operations: VecDeque::new(), _phandom: PhantomData } + pub fn new(client: Arc, operation_max_storage_items: usize) -> Self { + Self { + client, + iter_operations: VecDeque::new(), + operation_max_storage_items, + _phandom: PhantomData, + } } } @@ -160,8 +164,8 @@ where } .map_err(|err| err.to_string())?; - let mut ret = Vec::with_capacity(MAX_ITER_ITEMS); - for _ in 0..MAX_ITER_ITEMS { + let mut ret = Vec::with_capacity(self.operation_max_storage_items); + for _ in 0..self.operation_max_storage_items { let Some(key) = keys_iter.next() else { break }; From 60492c7a7f59720fe29ac2072fd1e7b4da29da6f Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 10 Aug 2023 18:30:45 +0300 Subject: [PATCH 09/27] chainHead/tests: Adjust chainHeadConfig Signed-off-by: Alexandru Vasile --- client/rpc-spec-v2/src/chain_head/tests.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/client/rpc-spec-v2/src/chain_head/tests.rs b/client/rpc-spec-v2/src/chain_head/tests.rs index fe9767a06825..540dec31eefe 100644 --- a/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/client/rpc-spec-v2/src/chain_head/tests.rs @@ -37,6 +37,7 @@ type Block = substrate_test_runtime_client::runtime::Block; const MAX_PINNED_BLOCKS: usize = 32; const MAX_PINNED_SECS: u64 = 60; const MAX_OPERATIONS: usize = 16; +const MAX_PAGINATION_LIMIT: usize = 5; const CHAIN_GENESIS: [u8; 32] = [0; 32]; const INVALID_HASH: [u8; 32] = [1; 32]; const KEY: &[u8] = b":mock"; @@ -84,6 +85,7 @@ async fn setup_api() -> ( global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, }, ) .into_rpc(); @@ -127,6 +129,7 @@ async fn follow_subscription_produces_blocks() { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, }, ) .into_rpc(); @@ -188,6 +191,7 @@ async fn follow_with_runtime() { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, }, ) .into_rpc(); @@ -299,6 +303,7 @@ async fn get_genesis() { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, }, ) .into_rpc(); @@ -508,6 +513,7 @@ async fn call_runtime_without_flag() { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, }, ) .into_rpc(); @@ -1147,6 +1153,7 @@ async fn separate_operation_ids_for_subscriptions() { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, }, ) .into_rpc(); @@ -1227,6 +1234,7 @@ async fn follow_generates_initial_blocks() { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, }, ) .into_rpc(); @@ -1358,6 +1366,7 @@ async fn follow_exceeding_pinned_blocks() { global_max_pinned_blocks: 2, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, }, ) .into_rpc(); @@ -1412,6 +1421,7 @@ async fn follow_with_unpin() { global_max_pinned_blocks: 2, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, }, ) .into_rpc(); @@ -1496,6 +1506,7 @@ async fn follow_prune_best_block() { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, }, ) .into_rpc(); @@ -1656,6 +1667,7 @@ async fn follow_forks_pruned_block() { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, }, ) .into_rpc(); @@ -1773,6 +1785,7 @@ async fn follow_report_multiple_pruned_block() { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, }, ) .into_rpc(); @@ -1981,6 +1994,7 @@ async fn pin_block_references() { global_max_pinned_blocks: 3, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, }, ) .into_rpc(); @@ -2094,6 +2108,7 @@ async fn follow_finalized_before_new_block() { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, }, ) .into_rpc(); @@ -2194,6 +2209,7 @@ async fn ensure_operation_limits_works() { global_max_pinned_blocks: MAX_PINNED_BLOCKS, subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), subscription_max_ongoing_operations: 1, + operation_max_storage_items: MAX_PAGINATION_LIMIT, }, ) .into_rpc(); From 6929ec593bc016fe0bedc5a70874ef6999992bf0 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 10 Aug 2023 18:52:03 +0300 Subject: [PATCH 10/27] chainHead/tests: Check pagination and continue method Signed-off-by: Alexandru Vasile --- client/rpc-spec-v2/src/chain_head/tests.rs | 169 ++++++++++++++++++++- 1 file changed, 168 insertions(+), 1 deletion(-) diff --git a/client/rpc-spec-v2/src/chain_head/tests.rs b/client/rpc-spec-v2/src/chain_head/tests.rs index 540dec31eefe..67f3d45a0177 100644 --- a/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/client/rpc-spec-v2/src/chain_head/tests.rs @@ -25,7 +25,7 @@ use sp_core::{ Blake2Hasher, Hasher, }; use sp_version::RuntimeVersion; -use std::{collections::HashSet, sync::Arc, time::Duration}; +use std::{collections::HashSet, fmt::Debug, sync::Arc, time::Duration}; use substrate_test_runtime::Transfer; use substrate_test_runtime_client::{ prelude::*, runtime, runtime::RuntimeApi, Backend, BlockBuilderExt, Client, @@ -44,6 +44,7 @@ const KEY: &[u8] = b":mock"; const VALUE: &[u8] = b"hello world"; const CHILD_STORAGE_KEY: &[u8] = b"child"; const CHILD_VALUE: &[u8] = b"child value"; +const DOES_NOT_PRODUCE_EVENTS_SECONDS: u64 = 10; async fn get_next_event(sub: &mut RpcSubscription) -> T { let (event, _sub_id) = tokio::time::timeout(std::time::Duration::from_secs(60), sub.next()) @@ -54,6 +55,13 @@ async fn get_next_event(sub: &mut RpcSubscriptio event } +async fn does_not_produce_event( + sub: &mut RpcSubscription, + duration: std::time::Duration, +) { + tokio::time::timeout(duration, sub.next::()).await.unwrap_err(); +} + async fn run_with_timeout(future: F) -> ::Output { tokio::time::timeout(std::time::Duration::from_secs(60 * 10), future) .await @@ -2285,3 +2293,162 @@ async fn ensure_operation_limits_works() { FollowEvent::OperationCallDone(done) if done.operation_id == operation_id && done.output == "0x0000000000000000" ); } + +#[tokio::test] +async fn check_continue_operation() { + let child_info = ChildInfo::new_default(CHILD_STORAGE_KEY); + let builder = TestClientBuilder::new().add_extra_child_storage( + &child_info, + KEY.to_vec(), + CHILD_VALUE.to_vec(), + ); + let backend = builder.backend(); + let mut client = Arc::new(builder.build()); + + // Configure the chainHead with maximum 1 item before asking for pagination. + let api = ChainHead::new( + client.clone(), + backend, + Arc::new(TaskExecutor::default()), + CHAIN_GENESIS, + ChainHeadConfig { + global_max_pinned_blocks: MAX_PINNED_BLOCKS, + subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), + subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: 1, + }, + ) + .into_rpc(); + + let mut sub = api.subscribe("chainHead_unstable_follow", [true]).await.unwrap(); + let sub_id = sub.subscription_id(); + let sub_id = serde_json::to_string(&sub_id).unwrap(); + + // Import a new block with storage changes. + let mut builder = client.new_block(Default::default()).unwrap(); + builder.push_storage_change(b":m".to_vec(), Some(b"a".to_vec())).unwrap(); + builder.push_storage_change(b":mo".to_vec(), Some(b"ab".to_vec())).unwrap(); + builder.push_storage_change(b":moc".to_vec(), Some(b"abc".to_vec())).unwrap(); + builder.push_storage_change(b":mock".to_vec(), Some(b"abcd".to_vec())).unwrap(); + let block = builder.build().unwrap().block; + let block_hash = format!("{:?}", block.header.hash()); + client.import(BlockOrigin::Own, block.clone()).await.unwrap(); + + // Ensure the imported block is propagated and pinned for this subscription. + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::Initialized(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::NewBlock(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::BestBlockChanged(_) + ); + + let invalid_hash = hex_string(&INVALID_HASH); + + // Invalid subscription ID must produce no results. + let _res: () = api + .call("chainHead_unstable_continue", ["invalid_sub_id", &invalid_hash]) + .await + .unwrap(); + + // Invalid operation ID must produce no results. + let _res: () = api.call("chainHead_unstable_continue", [&sub_id, &invalid_hash]).await.unwrap(); + + // Valid call with storage at the key. + let response: MethodResponse = api + .call( + "chainHead_unstable_storage", + rpc_params![ + &sub_id, + &block_hash, + vec![StorageQuery { + key: hex_string(b":m"), + query_type: StorageQueryType::DescendantsValues + }] + ], + ) + .await + .unwrap(); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && + res.items.len() == 1 && + res.items[0].key == hex_string(b":m") && + res.items[0].result == StorageResultType::Value(hex_string(b"a")) + ); + + // Pagination event. + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::OperationWaitingForContinue(res) if res.operation_id == operation_id + ); + + does_not_produce_event::>( + &mut sub, + std::time::Duration::from_secs(DOES_NOT_PRODUCE_EVENTS_SECONDS), + ) + .await; + let _res: () = api.call("chainHead_unstable_continue", [&sub_id, &operation_id]).await.unwrap(); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && + res.items.len() == 1 && + res.items[0].key == hex_string(b":mo") && + res.items[0].result == StorageResultType::Value(hex_string(b"ab")) + ); + + // Pagination event. + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::OperationWaitingForContinue(res) if res.operation_id == operation_id + ); + + does_not_produce_event::>( + &mut sub, + std::time::Duration::from_secs(DOES_NOT_PRODUCE_EVENTS_SECONDS), + ) + .await; + let _res: () = api.call("chainHead_unstable_continue", [&sub_id, &operation_id]).await.unwrap(); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && + res.items.len() == 1 && + res.items[0].key == hex_string(b":moc") && + res.items[0].result == StorageResultType::Value(hex_string(b"abc")) + ); + + // Pagination event. + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::OperationWaitingForContinue(res) if res.operation_id == operation_id + ); + does_not_produce_event::>( + &mut sub, + std::time::Duration::from_secs(DOES_NOT_PRODUCE_EVENTS_SECONDS), + ) + .await; + let _res: () = api.call("chainHead_unstable_continue", [&sub_id, &operation_id]).await.unwrap(); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && + res.items.len() == 1 && + res.items[0].key == hex_string(b":mock") && + res.items[0].result == StorageResultType::Value(hex_string(b"abcd")) + ); + + // Finished. + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id + ); +} From 2791c90274f407f635e8297975dd66f3a003f2ec Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 11 Aug 2023 12:40:15 +0300 Subject: [PATCH 11/27] chainHead/api: Add `chainHead_unstable_stopOperation` method Signed-off-by: Alexandru Vasile --- client/rpc-spec-v2/src/chain_head/api.rs | 14 ++++++++++++++ client/rpc-spec-v2/src/chain_head/chain_head.rs | 8 ++++++++ 2 files changed, 22 insertions(+) diff --git a/client/rpc-spec-v2/src/chain_head/api.rs b/client/rpc-spec-v2/src/chain_head/api.rs index 67905d801dd0..682cd690dd10 100644 --- a/client/rpc-spec-v2/src/chain_head/api.rs +++ b/client/rpc-spec-v2/src/chain_head/api.rs @@ -132,4 +132,18 @@ pub trait ChainHeadApi { follow_subscription: String, operation_id: String, ) -> RpcResult<()>; + + /// Stops an operation started with chainHead_unstable_body, chainHead_unstable_call, or + /// chainHead_unstable_storage. If the operation was still in progress, this interrupts it. If + /// the operation was already finished, this call has no effect. + /// + /// # Unstable + /// + /// This method is unstable and subject to change in the future. + #[method(name = "chainHead_unstable_stopOperation", blocking)] + fn chain_head_unstable_stop_operation( + &self, + follow_subscription: String, + operation_id: String, + ) -> RpcResult<()>; } diff --git a/client/rpc-spec-v2/src/chain_head/chain_head.rs b/client/rpc-spec-v2/src/chain_head/chain_head.rs index 8cc3a11c0397..bc600eae8c00 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -475,4 +475,12 @@ where Ok(()) } } + + fn chain_head_unstable_stop_operation( + &self, + follow_subscription: String, + operation_id: String, + ) -> RpcResult<()> { + Ok(()) + } } From 8bda4779b0d52cd270c8691fca25040ecccdd2fa Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 11 Aug 2023 12:49:02 +0300 Subject: [PATCH 12/27] chainHead/subscription: Add shared atomic state for efficient alloc Signed-off-by: Alexandru Vasile --- .../src/chain_head/subscription/inner.rs | 79 +++++++++++++++---- 1 file changed, 63 insertions(+), 16 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index 328d5116e1a3..1a6647c6bb3e 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -164,10 +164,14 @@ impl PermitOperations { } /// The state of one operation. +/// +/// This is directly exposed to users via `chain_head_unstable_continue` and +/// `chain_head_unstable_stop_operation`. #[derive(Clone)] pub struct OperationState { - /// True if the `chainHead` generated `waitingForContinue` event. - requested_continue: Arc, + /// The shared operation state that holds information about the + /// `waitingForContinue` event and cancellation. + shared_state: Arc, /// Send notifications when the user calls `chainHead_continue` method. send_continue: async_channel::Sender<()>, } @@ -178,19 +182,62 @@ impl OperationState { /// operation ID. pub fn submit_continue(&self) -> bool { // `waitingForContinue` not generated. - if !self.requested_continue.load(std::sync::atomic::Ordering::Acquire) { + if !self.shared_state.requested_continue.load(std::sync::atomic::Ordering::Acquire) { return false } // Has enough capacity for 1 message. self.send_continue.try_send(()).is_ok() } + + /// Stops the operation if `waitingForContinue` event was emitted for the associated + /// operation ID. + /// + /// Returns nothing in accordance with `chainHead_unstable_stopOperation`. + pub fn stop_operation(&self) { + // `waitingForContinue` not generated. + if !self.shared_state.requested_continue.load(std::sync::atomic::Ordering::Acquire) { + return + } + + self.shared_state + .operation_stopped + .store(true, std::sync::atomic::Ordering::Release); + + // Send might not have enough capacity if `submit_continue` was sent first. + // However, the `operation_stopped` boolean was set. + let _ = self.send_continue.try_send(()); + } +} + +/// The shared operation state between the backend [`RegisteredOperation`] and frontend +/// [`RegisteredOperation`]. +struct SharedOperationState { + /// True if the `chainHead` generated `waitingForContinue` event. + requested_continue: AtomicBool, + /// True if the operation was cancelled by the user. + operation_stopped: AtomicBool, +} + +impl SharedOperationState { + /// Constructs a new [`SharedOperationState`]. + /// + /// This is efficiently cloned under a single heap allocation. + fn new() -> Arc { + Arc::new(SharedOperationState { + requested_continue: AtomicBool::new(false), + operation_stopped: AtomicBool::new(false), + }) + } } /// The registered operation passed to the `chainHead` methods. +/// +/// This is used internally by the `chainHead` methods. struct RegisteredOperation { - /// True if the `chainHead` generated `waitingForContinue` event. - requested_continue: Arc, + /// The shared operation state that holds information about the + /// `waitingForContinue` event and cancellation. + shared_state: Arc, /// Receive notifications when the user calls `chainHead_continue` method. recv_continue: async_channel::Receiver<()>, /// The operation ID of the request. @@ -204,10 +251,17 @@ struct RegisteredOperation { impl RegisteredOperation { /// Wait until the user calls `chainHead_continue`. pub async fn wait_for_continue(&self) { - self.requested_continue.store(true, std::sync::atomic::Ordering::Release); + self.shared_state + .requested_continue + .store(true, std::sync::atomic::Ordering::Release); let _ = self.recv_continue.recv().await; } + /// Returns true if the current operation was stopped. + pub fn was_stopped(&self) -> bool { + self.shared_state.operation_stopped.load(std::sync::atomic::Ordering::Acquire) + } + /// Get the operation ID. fn operation_id(&self) -> String { self.operation_id.clone() @@ -256,22 +310,15 @@ impl Operations { // At most one message can be sent. let (send_continue, recv_continue) = async_channel::bounded(1); - let requested_continue = Arc::new(AtomicBool::new(false)); + let shared_state = SharedOperationState::new(); - let state = - OperationState { requested_continue: requested_continue.clone(), send_continue }; + let state = OperationState { send_continue, shared_state: shared_state.clone() }; // Cloned operations for removing the current ID on drop. let operations = self.operations.clone(); operations.lock().insert(operation_id.clone(), state); - Some(RegisteredOperation { - requested_continue, - operation_id, - recv_continue, - operations, - permit, - }) + Some(RegisteredOperation { shared_state, operation_id, recv_continue, operations, permit }) } /// Get the associated operation state with the ID. From 382ce98e3367ffa1cf13a4ccddf9d0f9cbc507b4 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 11 Aug 2023 13:25:43 +0300 Subject: [PATCH 13/27] chainHead: Implement operation stop Signed-off-by: Alexandru Vasile --- .../rpc-spec-v2/src/chain_head/chain_head.rs | 25 +++++++++----- .../src/chain_head/chain_head_storage.rs | 34 +++++++++++++------ .../src/chain_head/subscription/inner.rs | 33 ++++++++---------- 3 files changed, 54 insertions(+), 38 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/chain_head.rs b/client/rpc-spec-v2/src/chain_head/chain_head.rs index bc600eae8c00..03c41cba6407 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -255,6 +255,7 @@ where Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()), }; + let operation = block_guard.operation(); let event = match self.client.block(hash) { Ok(Some(signed_block)) => { let extrinsics = signed_block @@ -264,7 +265,7 @@ where .map(|extrinsic| hex_string(&extrinsic.encode())) .collect(); FollowEvent::::OperationBodyDone(OperationBodyDone { - operation_id: block_guard.operation_id(), + operation_id: operation.operation_id(), value: extrinsics, }) }, @@ -280,14 +281,14 @@ where return Err(ChainHeadRpcError::InvalidBlock.into()) }, Err(error) => FollowEvent::::OperationError(OperationError { - operation_id: block_guard.operation_id(), + operation_id: operation.operation_id(), error: error.to_string(), }), }; let _ = block_guard.response_sender().unbounded_send(event); Ok(MethodResponse::Started(MethodResponseStarted { - operation_id: block_guard.operation_id(), + operation_id: operation.operation_id(), discarded_items: None, })) } @@ -365,10 +366,11 @@ where self.client.clone(), self.operation_max_storage_items, ); - let operation_id = block_guard.operation_id(); + let operation = block_guard.operation(); + let operation_id = operation.operation_id(); // The number of operations we are allowed to execute. - let num_operations = block_guard.num_reserved(); + let num_operations = operation.num_reserved(); let discarded = items.len().saturating_sub(num_operations); let mut items = items; items.truncate(num_operations); @@ -416,26 +418,27 @@ where .into()) } + let operation = block_guard.operation(); let event = self .client .executor() .call(hash, &function, &call_parameters, CallContext::Offchain) .map(|result| { FollowEvent::::OperationCallDone(OperationCallDone { - operation_id: block_guard.operation_id(), + operation_id: operation.operation_id(), output: hex_string(&result), }) }) .unwrap_or_else(|error| { FollowEvent::::OperationError(OperationError { - operation_id: block_guard.operation_id(), + operation_id: operation.operation_id(), error: error.to_string(), }) }); let _ = block_guard.response_sender().unbounded_send(event); Ok(MethodResponse::Started(MethodResponseStarted { - operation_id: block_guard.operation_id(), + operation_id: operation.operation_id(), discarded_items: None, })) } @@ -481,6 +484,12 @@ where follow_subscription: String, operation_id: String, ) -> RpcResult<()> { + let Some(operation) = self.subscriptions.get_operation(&follow_subscription, &operation_id) else { + return Ok(()) + }; + + operation.stop_operation(); + Ok(()) } } diff --git a/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs b/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs index 0d54748d6485..0e960d620523 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs @@ -194,14 +194,18 @@ where child_key: Option, ) { let sender = block_guard.response_sender(); - let operation_id = block_guard.operation_id(); + let operation = block_guard.operation(); while let Some(query) = self.iter_operations.pop_front() { + if operation.was_stopped() { + return + } + let result = self.query_storage_iter_pagination(query, hash, child_key.as_ref()); let (events, maybe_next_query) = match result { QueryIterResult::Ok(result) => result, QueryIterResult::Err(error) => { - send_error::(&sender, operation_id.clone(), error.to_string()); + send_error::(&sender, operation.operation_id(), error.to_string()); return }, }; @@ -209,25 +213,32 @@ where if !events.is_empty() { // Send back the results of the iteration produced so far. let _ = sender.unbounded_send(FollowEvent::::OperationStorageItems( - OperationStorageItems { operation_id: operation_id.clone(), items: events }, + OperationStorageItems { operation_id: operation.operation_id(), items: events }, )); } if let Some(next_query) = maybe_next_query { let _ = sender.unbounded_send(FollowEvent::::OperationWaitingForContinue( - OperationId { operation_id: operation_id.clone() }, + OperationId { operation_id: operation.operation_id() }, )); - block_guard.wait_for_continue().await; - // Give a chance for the other operations to advance next time. + // The operation might be continued or cancelled only after the + // `OperationWaitingForContinue` is generated above. + operation.wait_for_continue().await; + + // Give a chance for the other items to advance next time. self.iter_operations.push_back(next_query); } } + if operation.was_stopped() { + return + } + let _ = sender.unbounded_send(FollowEvent::::OperationStorageDone(OperationId { - operation_id, + operation_id: operation.operation_id(), })); } @@ -240,11 +251,12 @@ where child_key: Option, ) { let sender = block_guard.response_sender(); + let operation = block_guard.operation(); if let Some(child_key) = child_key.as_ref() { if !is_key_queryable(child_key.storage_key()) { let _ = sender.unbounded_send(FollowEvent::::OperationStorageDone( - OperationId { operation_id: block_guard.operation_id() }, + OperationId { operation_id: operation.operation_id() }, )); return } @@ -262,7 +274,7 @@ where Ok(Some(value)) => storage_results.push(value), Ok(None) => continue, Err(error) => { - send_error::(&sender, block_guard.operation_id(), error); + send_error::(&sender, operation.operation_id(), error); return }, } @@ -272,7 +284,7 @@ where Ok(Some(value)) => storage_results.push(value), Ok(None) => continue, Err(error) => { - send_error::(&sender, block_guard.operation_id(), error); + send_error::(&sender, operation.operation_id(), error); return }, }, @@ -289,7 +301,7 @@ where if !storage_results.is_empty() { let _ = sender.unbounded_send(FollowEvent::::OperationStorageItems( OperationStorageItems { - operation_id: block_guard.operation_id(), + operation_id: operation.operation_id(), items: storage_results, }, )); diff --git a/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index 1a6647c6bb3e..b27d253386a8 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -187,6 +187,7 @@ impl OperationState { } // Has enough capacity for 1 message. + // Can fail if the `stop_operation` propagated the stop first. self.send_continue.try_send(()).is_ok() } @@ -234,7 +235,7 @@ impl SharedOperationState { /// The registered operation passed to the `chainHead` methods. /// /// This is used internally by the `chainHead` methods. -struct RegisteredOperation { +pub struct RegisteredOperation { /// The shared operation state that holds information about the /// `waitingForContinue` event and cancellation. shared_state: Arc, @@ -249,12 +250,18 @@ struct RegisteredOperation { } impl RegisteredOperation { - /// Wait until the user calls `chainHead_continue`. + /// Wait until the user calls `chainHead_continue` or the operation + /// is cancelled via `chainHead_stopOperation`. pub async fn wait_for_continue(&self) { self.shared_state .requested_continue .store(true, std::sync::atomic::Ordering::Release); + let _ = self.recv_continue.recv().await; + + self.shared_state + .requested_continue + .store(false, std::sync::atomic::Ordering::Release); } /// Returns true if the current operation was stopped. @@ -263,14 +270,14 @@ impl RegisteredOperation { } /// Get the operation ID. - fn operation_id(&self) -> String { + pub fn operation_id(&self) -> String { self.operation_id.clone() } /// Returns the number of reserved elements for this permit. /// /// This can be smaller than the number of items requested via [`LimitOperations::reserve()`]. - fn num_reserved(&self) -> usize { + pub fn num_reserved(&self) -> usize { self.permit.num_ops } } @@ -523,21 +530,9 @@ impl> BlockGuard { self.response_sender.clone() } - /// The operation ID of this method. - pub fn operation_id(&self) -> String { - self.operation.operation_id() - } - - /// Returns the number of reserved elements for this permit. - /// - /// This can be smaller than the number of items requested. - pub fn num_reserved(&self) -> usize { - self.operation.num_reserved() - } - - /// Wait until the user calls `chainHead_continue`. - pub async fn wait_for_continue(&self) { - self.operation.wait_for_continue().await + /// Get the details of the registered operation. + pub fn operation(&self) -> &RegisteredOperation { + &self.operation } } From 3a5c12afed6831e3365c82fa011f37637e69f10a Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 11 Aug 2023 14:03:43 +0300 Subject: [PATCH 14/27] chainHead/tests: Check that storage ops can be cancelled Signed-off-by: Alexandru Vasile --- client/rpc-spec-v2/src/chain_head/tests.rs | 113 +++++++++++++++++++++ 1 file changed, 113 insertions(+) diff --git a/client/rpc-spec-v2/src/chain_head/tests.rs b/client/rpc-spec-v2/src/chain_head/tests.rs index 67f3d45a0177..00ed9089058e 100644 --- a/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/client/rpc-spec-v2/src/chain_head/tests.rs @@ -2452,3 +2452,116 @@ async fn check_continue_operation() { FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id ); } + +#[tokio::test] +async fn stop_storage_operation() { + let child_info = ChildInfo::new_default(CHILD_STORAGE_KEY); + let builder = TestClientBuilder::new().add_extra_child_storage( + &child_info, + KEY.to_vec(), + CHILD_VALUE.to_vec(), + ); + let backend = builder.backend(); + let mut client = Arc::new(builder.build()); + + // Configure the chainHead with maximum 1 item before asking for pagination. + let api = ChainHead::new( + client.clone(), + backend, + Arc::new(TaskExecutor::default()), + CHAIN_GENESIS, + ChainHeadConfig { + global_max_pinned_blocks: MAX_PINNED_BLOCKS, + subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), + subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: 1, + }, + ) + .into_rpc(); + + let mut sub = api.subscribe("chainHead_unstable_follow", [true]).await.unwrap(); + let sub_id = sub.subscription_id(); + let sub_id = serde_json::to_string(&sub_id).unwrap(); + + // Import a new block with storage changes. + let mut builder = client.new_block(Default::default()).unwrap(); + builder.push_storage_change(b":m".to_vec(), Some(b"a".to_vec())).unwrap(); + builder.push_storage_change(b":mo".to_vec(), Some(b"ab".to_vec())).unwrap(); + let block = builder.build().unwrap().block; + let block_hash = format!("{:?}", block.header.hash()); + client.import(BlockOrigin::Own, block.clone()).await.unwrap(); + + // Ensure the imported block is propagated and pinned for this subscription. + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::Initialized(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::NewBlock(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::BestBlockChanged(_) + ); + + let invalid_hash = hex_string(&INVALID_HASH); + + // Invalid subscription ID must produce no results. + let _res: () = api + .call("chainHead_unstable_stopOperation", ["invalid_sub_id", &invalid_hash]) + .await + .unwrap(); + + // Invalid operation ID must produce no results. + let _res: () = api + .call("chainHead_unstable_stopOperation", [&sub_id, &invalid_hash]) + .await + .unwrap(); + + // Valid call with storage at the key. + let response: MethodResponse = api + .call( + "chainHead_unstable_storage", + rpc_params![ + &sub_id, + &block_hash, + vec![StorageQuery { + key: hex_string(b":m"), + query_type: StorageQueryType::DescendantsValues + }] + ], + ) + .await + .unwrap(); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::OperationStorageItems(res) if res.operation_id == operation_id && + res.items.len() == 1 && + res.items[0].key == hex_string(b":m") && + res.items[0].result == StorageResultType::Value(hex_string(b"a")) + ); + + // Pagination event. + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::OperationWaitingForContinue(res) if res.operation_id == operation_id + ); + + // Stop the operation. + let _res: () = api + .call("chainHead_unstable_stopOperation", [&sub_id, &operation_id]) + .await + .unwrap(); + + does_not_produce_event::>( + &mut sub, + std::time::Duration::from_secs(DOES_NOT_PRODUCE_EVENTS_SECONDS), + ) + .await; +} From 6d825a68f1d8eddca7ccb57d3e64a5404b77ed6c Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 11 Aug 2023 14:40:02 +0300 Subject: [PATCH 15/27] chainHead/storage: Change docs for query_storage_iter_pagination Signed-off-by: Alexandru Vasile --- client/rpc-spec-v2/src/chain_head/chain_head_storage.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs b/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs index 0e960d620523..3ff01a67eaa2 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs @@ -145,7 +145,7 @@ where .unwrap_or_else(|error| QueryResult::Err(error.to_string())) } - /// Iterate over at most `MAX_ITER_ITEMS` keys. + /// Iterate over at most `operation_max_storage_items` keys. /// /// Returns the storage result with a potential next key to resume iteration. fn query_storage_iter_pagination( From cab6202b423665475f634e7e0f4e877667254853 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 16 Aug 2023 12:39:51 +0300 Subject: [PATCH 16/27] chainHead/subscriptions: Fix merge conflicts Signed-off-by: Alexandru Vasile --- .../rpc-spec-v2/src/chain_head/subscription/inner.rs | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index b27d253386a8..f30cb1bc806a 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -17,12 +17,13 @@ // along with this program. If not, see . use futures::channel::oneshot; +use parking_lot::Mutex; use sc_client_api::Backend; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sp_runtime::traits::Block as BlockT; use std::{ collections::{hash_map::Entry, HashMap}, - sync::{atomic::AtomicBool, Arc, Mutex}, + sync::{atomic::AtomicBool, Arc}, time::{Duration, Instant}, }; @@ -154,15 +155,6 @@ struct PermitOperations { _permit: tokio::sync::OwnedSemaphorePermit, } -impl PermitOperations { - /// Returns the number of reserved elements for this permit. - /// - /// This can be smaller than the number of items requested via [`LimitOperations::reserve()`]. - fn num_reserved(&self) -> usize { - self.num_ops - } -} - /// The state of one operation. /// /// This is directly exposed to users via `chain_head_unstable_continue` and From 141ca3f049a56316ea03580697f26f6842d4e8c9 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 23 Aug 2023 14:50:25 +0300 Subject: [PATCH 17/27] chainHead: Replace `async-channel` with `tokio::sync` Signed-off-by: Alexandru Vasile --- Cargo.lock | 1 - client/rpc-spec-v2/Cargo.toml | 1 - .../rpc-spec-v2/src/chain_head/chain_head.rs | 29 ++++++++----------- .../src/chain_head/chain_head_storage.rs | 4 +-- .../src/chain_head/subscription/inner.rs | 12 ++++---- 5 files changed, 20 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0267f2f54b52..3dd5bc52b71c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9618,7 +9618,6 @@ version = "0.10.0-dev" dependencies = [ "array-bytes", "assert_matches", - "async-channel", "futures", "futures-util", "hex", diff --git a/client/rpc-spec-v2/Cargo.toml b/client/rpc-spec-v2/Cargo.toml index c7352b111e0d..599596777b7b 100644 --- a/client/rpc-spec-v2/Cargo.toml +++ b/client/rpc-spec-v2/Cargo.toml @@ -36,7 +36,6 @@ tokio = { version = "1.22.0", features = ["sync"] } array-bytes = "6.1" log = "0.4.17" futures-util = { version = "0.3.19", default-features = false } -async-channel = "1.8.0" [dev-dependencies] serde_json = "1.0" diff --git a/client/rpc-spec-v2/src/chain_head/chain_head.rs b/client/rpc-spec-v2/src/chain_head/chain_head.rs index 03c41cba6407..bae7c84df0ed 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -244,7 +244,7 @@ where follow_subscription: String, hash: Block::Hash, ) -> RpcResult { - let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) { + let mut block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) { Ok(block) => block, Err(SubscriptionManagementError::SubscriptionAbsent) | Err(SubscriptionManagementError::ExceededLimits) => return Ok(MethodResponse::LimitReached), @@ -255,7 +255,8 @@ where Err(_) => return Err(ChainHeadRpcError::InvalidBlock.into()), }; - let operation = block_guard.operation(); + let operation_id = block_guard.operation().operation_id(); + let event = match self.client.block(hash) { Ok(Some(signed_block)) => { let extrinsics = signed_block @@ -265,7 +266,7 @@ where .map(|extrinsic| hex_string(&extrinsic.encode())) .collect(); FollowEvent::::OperationBodyDone(OperationBodyDone { - operation_id: operation.operation_id(), + operation_id: operation_id.clone(), value: extrinsics, }) }, @@ -281,16 +282,13 @@ where return Err(ChainHeadRpcError::InvalidBlock.into()) }, Err(error) => FollowEvent::::OperationError(OperationError { - operation_id: operation.operation_id(), + operation_id: operation_id.clone(), error: error.to_string(), }), }; let _ = block_guard.response_sender().unbounded_send(event); - Ok(MethodResponse::Started(MethodResponseStarted { - operation_id: operation.operation_id(), - discarded_items: None, - })) + Ok(MethodResponse::Started(MethodResponseStarted { operation_id, discarded_items: None })) } fn chain_head_unstable_header( @@ -350,7 +348,7 @@ where .transpose()? .map(ChildInfo::new_default_from_vec); - let block_guard = + let mut block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, items.len()) { Ok(block) => block, Err(SubscriptionManagementError::SubscriptionAbsent) | @@ -396,7 +394,7 @@ where ) -> RpcResult { let call_parameters = Bytes::from(parse_hex_param(call_parameters)?); - let block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) { + let mut block_guard = match self.subscriptions.lock_block(&follow_subscription, hash, 1) { Ok(block) => block, Err(SubscriptionManagementError::SubscriptionAbsent) | Err(SubscriptionManagementError::ExceededLimits) => { @@ -418,29 +416,26 @@ where .into()) } - let operation = block_guard.operation(); + let operation_id = block_guard.operation().operation_id(); let event = self .client .executor() .call(hash, &function, &call_parameters, CallContext::Offchain) .map(|result| { FollowEvent::::OperationCallDone(OperationCallDone { - operation_id: operation.operation_id(), + operation_id: operation_id.clone(), output: hex_string(&result), }) }) .unwrap_or_else(|error| { FollowEvent::::OperationError(OperationError { - operation_id: operation.operation_id(), + operation_id: operation_id.clone(), error: error.to_string(), }) }); let _ = block_guard.response_sender().unbounded_send(event); - Ok(MethodResponse::Started(MethodResponseStarted { - operation_id: operation.operation_id(), - discarded_items: None, - })) + Ok(MethodResponse::Started(MethodResponseStarted { operation_id, discarded_items: None })) } fn chain_head_unstable_unpin( diff --git a/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs b/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs index 3ff01a67eaa2..5e1f38f9a997 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs @@ -189,7 +189,7 @@ where /// necessary. async fn generate_storage_iter_events( &mut self, - block_guard: BlockGuard, + mut block_guard: BlockGuard, hash: Block::Hash, child_key: Option, ) { @@ -245,7 +245,7 @@ where /// Generate the block events for the `chainHead_storage` method. pub async fn generate_events( &mut self, - block_guard: BlockGuard, + mut block_guard: BlockGuard, hash: Block::Hash, items: Vec>, child_key: Option, diff --git a/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index f30cb1bc806a..edb219749c6d 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -165,7 +165,7 @@ pub struct OperationState { /// `waitingForContinue` event and cancellation. shared_state: Arc, /// Send notifications when the user calls `chainHead_continue` method. - send_continue: async_channel::Sender<()>, + send_continue: tokio::sync::mpsc::Sender<()>, } impl OperationState { @@ -232,7 +232,7 @@ pub struct RegisteredOperation { /// `waitingForContinue` event and cancellation. shared_state: Arc, /// Receive notifications when the user calls `chainHead_continue` method. - recv_continue: async_channel::Receiver<()>, + recv_continue: tokio::sync::mpsc::Receiver<()>, /// The operation ID of the request. operation_id: String, /// Track the operations ID of this subscription. @@ -244,7 +244,7 @@ pub struct RegisteredOperation { impl RegisteredOperation { /// Wait until the user calls `chainHead_continue` or the operation /// is cancelled via `chainHead_stopOperation`. - pub async fn wait_for_continue(&self) { + pub async fn wait_for_continue(&mut self) { self.shared_state .requested_continue .store(true, std::sync::atomic::Ordering::Release); @@ -308,7 +308,7 @@ impl Operations { let operation_id = self.next_operation_id(); // At most one message can be sent. - let (send_continue, recv_continue) = async_channel::bounded(1); + let (send_continue, recv_continue) = tokio::sync::mpsc::channel(1); let shared_state = SharedOperationState::new(); let state = OperationState { send_continue, shared_state: shared_state.clone() }; @@ -523,8 +523,8 @@ impl> BlockGuard { } /// Get the details of the registered operation. - pub fn operation(&self) -> &RegisteredOperation { - &self.operation + pub fn operation(&mut self) -> &mut RegisteredOperation { + &mut self.operation } } From 7ccef405dbddb76518c60b7d3bc98abf9fc4c8f1 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 23 Aug 2023 15:31:29 +0300 Subject: [PATCH 18/27] chainHead/subscription: Add comment about the sender/recv continue Signed-off-by: Alexandru Vasile --- client/rpc-spec-v2/src/chain_head/subscription/inner.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/client/rpc-spec-v2/src/chain_head/subscription/inner.rs b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs index edb219749c6d..d6f64acd63f5 100644 --- a/client/rpc-spec-v2/src/chain_head/subscription/inner.rs +++ b/client/rpc-spec-v2/src/chain_head/subscription/inner.rs @@ -249,6 +249,9 @@ impl RegisteredOperation { .requested_continue .store(true, std::sync::atomic::Ordering::Release); + // The sender part of this channel is around for as long as this object exists, + // because it is stored in the `OperationState` of the `operations` field. + // The sender part is removed from tracking when this object is dropped. let _ = self.recv_continue.recv().await; self.shared_state From e48ba56e5cb036022b785c57903a8bd50f4c7a95 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 22 Aug 2023 13:29:08 +0300 Subject: [PATCH 19/27] Cargo: Point trie-db related crates to git merkle branch Signed-off-by: Alexandru Vasile --- Cargo.lock | 77 +++++++++++++------ bin/node/bench/Cargo.toml | 2 +- client/db/Cargo.toml | 2 +- primitives/api/Cargo.toml | 2 +- primitives/core/Cargo.toml | 2 +- primitives/state-machine/Cargo.toml | 4 +- primitives/trie/Cargo.toml | 8 +- test-utils/runtime/Cargo.toml | 2 +- utils/binary-merkle-tree/Cargo.toml | 2 +- .../rpc/state-trie-migration-rpc/Cargo.toml | 2 +- 10 files changed, 68 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3dd5bc52b71c..81a5f1df0103 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -755,7 +755,7 @@ version = "4.0.0-dev" dependencies = [ "array-bytes", "env_logger 0.9.3", - "hash-db", + "hash-db 0.16.0 (git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value)", "log", "sp-core", "sp-runtime", @@ -3114,6 +3114,11 @@ version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8e7d7786361d7425ae2fe4f9e407eb0efaa0840f5212d109cc018c40c35c6ab4" +[[package]] +name = "hash-db" +version = "0.16.0" +source = "git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value#77f19e2ca5fb45bec3e1dbcca82a9023d75c312d" + [[package]] name = "hash256-std-hasher" version = "0.15.2" @@ -3808,7 +3813,7 @@ version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19ea4653859ca2266a86419d3f592d3f22e7a854b482f99180d2498507902048" dependencies = [ - "hash-db", + "hash-db 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)", "hash256-std-hasher", "tiny-keccak", ] @@ -4708,7 +4713,15 @@ version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "808b50db46293432a45e63bc15ea51e0ab4c0a1647b8eb114e31a3e698dd6fbe" dependencies = [ - "hash-db", + "hash-db 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "memory-db" +version = "0.32.0" +source = "git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value#77f19e2ca5fb45bec3e1dbcca82a9023d75c312d" +dependencies = [ + "hash-db 0.16.0 (git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value)", ] [[package]] @@ -5107,7 +5120,7 @@ dependencies = [ "derive_more", "fs_extra", "futures", - "hash-db", + "hash-db 0.16.0 (git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value)", "kitchensink-runtime", "kvdb", "kvdb-rocksdb", @@ -8757,7 +8770,7 @@ version = "0.10.0-dev" dependencies = [ "array-bytes", "criterion", - "hash-db", + "hash-db 0.16.0 (git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value)", "kitchensink-runtime", "kvdb", "kvdb-memorydb", @@ -10404,7 +10417,7 @@ dependencies = [ name = "sp-api" version = "4.0.0-dev" dependencies = [ - "hash-db", + "hash-db 0.16.0 (git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value)", "log", "parity-scale-codec", "scale-info", @@ -10765,7 +10778,7 @@ dependencies = [ "dyn-clonable", "ed25519-zebra", "futures", - "hash-db", + "hash-db 0.16.0 (git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value)", "hash256-std-hasher", "impl-serde", "lazy_static", @@ -11168,7 +11181,7 @@ version = "0.28.0" dependencies = [ "array-bytes", "assert_matches", - "hash-db", + "hash-db 0.16.0 (git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value)", "log", "parity-scale-codec", "parking_lot 0.12.1", @@ -11183,7 +11196,7 @@ dependencies = [ "sp-trie", "thiserror", "tracing", - "trie-db", + "trie-db 0.27.1 (git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value)", ] [[package]] @@ -11290,10 +11303,10 @@ dependencies = [ "ahash 0.8.3", "array-bytes", "criterion", - "hash-db", + "hash-db 0.16.0 (git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value)", "hashbrown 0.13.2", "lazy_static", - "memory-db", + "memory-db 0.32.0 (git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value)", "nohash-hasher", "parity-scale-codec", "parking_lot 0.12.1", @@ -11305,8 +11318,8 @@ dependencies = [ "thiserror", "tracing", "trie-bench", - "trie-db", - "trie-root", + "trie-db 0.27.1 (git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value)", + "trie-root 0.18.0 (git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value)", "trie-standardmap", ] @@ -11645,7 +11658,7 @@ dependencies = [ "sp-runtime", "sp-state-machine", "sp-trie", - "trie-db", + "trie-db 0.27.1 (git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value)", ] [[package]] @@ -11720,7 +11733,7 @@ dependencies = [ "sp-version", "substrate-test-runtime-client", "substrate-wasm-builder", - "trie-db", + "trie-db 0.27.1 (git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value)", ] [[package]] @@ -12340,12 +12353,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4f54b4f9d51d368e62cf7e0730c7c1e18fc658cc84333656bab5b328f44aa964" dependencies = [ "criterion", - "hash-db", + "hash-db 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)", "keccak-hasher", - "memory-db", + "memory-db 0.32.0 (registry+https://github.com/rust-lang/crates.io-index)", "parity-scale-codec", - "trie-db", - "trie-root", + "trie-db 0.27.1 (registry+https://github.com/rust-lang/crates.io-index)", + "trie-root 0.18.0 (registry+https://github.com/rust-lang/crates.io-index)", "trie-standardmap", ] @@ -12355,7 +12368,19 @@ version = "0.27.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "767abe6ffed88a1889671a102c2861ae742726f52e0a5a425b92c9fbfa7e9c85" dependencies = [ - "hash-db", + "hash-db 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)", + "hashbrown 0.13.2", + "log", + "rustc-hex", + "smallvec", +] + +[[package]] +name = "trie-db" +version = "0.27.1" +source = "git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value#77f19e2ca5fb45bec3e1dbcca82a9023d75c312d" +dependencies = [ + "hash-db 0.16.0 (git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value)", "hashbrown 0.13.2", "log", "rustc-hex", @@ -12368,7 +12393,15 @@ version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4ed310ef5ab98f5fa467900ed906cb9232dd5376597e00fd4cba2a449d06c0b" dependencies = [ - "hash-db", + "hash-db 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "trie-root" +version = "0.18.0" +source = "git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value#77f19e2ca5fb45bec3e1dbcca82a9023d75c312d" +dependencies = [ + "hash-db 0.16.0 (git+https://github.com/paritytech/trie.git?branch=lexnv/expose_merkle_value)", ] [[package]] @@ -12377,7 +12410,7 @@ version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "684aafb332fae6f83d7fe10b3fbfdbe39a1b3234c4e2a618f030815838519516" dependencies = [ - "hash-db", + "hash-db 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)", "keccak-hasher", ] diff --git a/bin/node/bench/Cargo.toml b/bin/node/bench/Cargo.toml index 7703f8ed2e4e..2ce17ae94f90 100644 --- a/bin/node/bench/Cargo.toml +++ b/bin/node/bench/Cargo.toml @@ -33,7 +33,7 @@ sc-basic-authorship = { version = "0.10.0-dev", path = "../../../client/basic-au sp-inherents = { version = "4.0.0-dev", path = "../../../primitives/inherents" } sp-timestamp = { version = "4.0.0-dev", default-features = false, path = "../../../primitives/timestamp" } sp-tracing = { version = "10.0.0", path = "../../../primitives/tracing" } -hash-db = "0.16.0" +hash-db = { git = "https://github.com/paritytech/trie.git", branch = "lexnv/expose_merkle_value" } tempfile = "3.1.0" fs_extra = "1" rand = { version = "0.8.5", features = ["small_rng"] } diff --git a/client/db/Cargo.toml b/client/db/Cargo.toml index 1845158dac11..55ef7278abf4 100644 --- a/client/db/Cargo.toml +++ b/client/db/Cargo.toml @@ -16,7 +16,7 @@ targets = ["x86_64-unknown-linux-gnu"] codec = { package = "parity-scale-codec", version = "3.6.1", features = [ "derive", ] } -hash-db = "0.16.0" +hash-db = { git = "https://github.com/paritytech/trie.git", branch = "lexnv/expose_merkle_value" } kvdb = "0.13.0" kvdb-memorydb = "0.13.0" kvdb-rocksdb = { version = "0.19.0", optional = true } diff --git a/primitives/api/Cargo.toml b/primitives/api/Cargo.toml index 2f0fe5d5d93c..1c6a98ee2c73 100644 --- a/primitives/api/Cargo.toml +++ b/primitives/api/Cargo.toml @@ -22,7 +22,7 @@ sp-externalities = { version = "0.19.0", default-features = false, optional = tr sp-version = { version = "22.0.0", default-features = false, path = "../version" } sp-state-machine = { version = "0.28.0", default-features = false, optional = true, path = "../state-machine" } sp-trie = { version = "22.0.0", default-features = false, optional = true, path = "../trie" } -hash-db = { version = "0.16.0", optional = true } +hash-db = { git = "https://github.com/paritytech/trie.git", branch = "lexnv/expose_merkle_value" , optional = true } thiserror = { version = "1.0.30", optional = true } scale-info = { version = "2.1.1", default-features = false, features = ["derive"] } sp-metadata-ir = { version = "0.1.0", default-features = false, optional = true, path = "../metadata-ir" } diff --git a/primitives/core/Cargo.toml b/primitives/core/Cargo.toml index ee4bf8924186..9cd638cdc134 100644 --- a/primitives/core/Cargo.toml +++ b/primitives/core/Cargo.toml @@ -21,7 +21,7 @@ serde = { version = "1.0.163", optional = true, default-features = false, featu bounded-collections = { version = "0.1.8", default-features = false } primitive-types = { version = "0.12.0", default-features = false, features = ["codec", "scale-info"] } impl-serde = { version = "0.4.0", default-features = false, optional = true } -hash-db = { version = "0.16.0", default-features = false } +hash-db = { git = "https://github.com/paritytech/trie.git", branch = "lexnv/expose_merkle_value", default-features = false } hash256-std-hasher = { version = "0.15.2", default-features = false } bs58 = { version = "0.4.0", default-features = false, optional = true } rand = { version = "0.8.5", features = ["small_rng"], optional = true } diff --git a/primitives/state-machine/Cargo.toml b/primitives/state-machine/Cargo.toml index 32be8e518f49..37879a7e46de 100644 --- a/primitives/state-machine/Cargo.toml +++ b/primitives/state-machine/Cargo.toml @@ -15,7 +15,7 @@ targets = ["x86_64-unknown-linux-gnu"] [dependencies] codec = { package = "parity-scale-codec", version = "3.6.1", default-features = false } -hash-db = { version = "0.16.0", default-features = false } +hash-db = { git = "https://github.com/paritytech/trie.git", branch = "lexnv/expose_merkle_value", default-features = false } log = { version = "0.4.17", default-features = false } parking_lot = { version = "0.12.1", optional = true } rand = { version = "0.8.5", optional = true } @@ -27,7 +27,7 @@ sp-externalities = { version = "0.19.0", default-features = false, path = "../ex sp-panic-handler = { version = "8.0.0", optional = true, path = "../panic-handler" } sp-std = { version = "8.0.0", default-features = false, path = "../std" } sp-trie = { version = "22.0.0", default-features = false, path = "../trie" } -trie-db = { version = "0.27.1", default-features = false } +trie-db = { git = "https://github.com/paritytech/trie.git", branch = "lexnv/expose_merkle_value", default-features = false } [dev-dependencies] array-bytes = "6.1" diff --git a/primitives/trie/Cargo.toml b/primitives/trie/Cargo.toml index 546d6786fc63..6e314478d9e8 100644 --- a/primitives/trie/Cargo.toml +++ b/primitives/trie/Cargo.toml @@ -21,16 +21,16 @@ harness = false ahash = { version = "0.8.2", optional = true } codec = { package = "parity-scale-codec", version = "3.6.1", default-features = false } hashbrown = { version = "0.13.2", optional = true } -hash-db = { version = "0.16.0", default-features = false } +hash-db = { git = "https://github.com/paritytech/trie.git", branch = "lexnv/expose_merkle_value", default-features = false } lazy_static = { version = "1.4.0", optional = true } -memory-db = { version = "0.32.0", default-features = false } +memory-db = { git = "https://github.com/paritytech/trie.git", branch = "lexnv/expose_merkle_value", default-features = false } nohash-hasher = { version = "0.2.0", optional = true } parking_lot = { version = "0.12.1", optional = true } scale-info = { version = "2.5.0", default-features = false, features = ["derive"] } thiserror = { version = "1.0.30", optional = true } tracing = { version = "0.1.29", optional = true } -trie-db = { version = "0.27.0", default-features = false } -trie-root = { version = "0.18.0", default-features = false } +trie-db = { git = "https://github.com/paritytech/trie.git", branch = "lexnv/expose_merkle_value", default-features = false } +trie-root = { git = "https://github.com/paritytech/trie.git", branch = "lexnv/expose_merkle_value", default-features = false } sp-core = { version = "21.0.0", default-features = false, path = "../core" } sp-std = { version = "8.0.0", default-features = false, path = "../std" } schnellru = { version = "0.2.1", optional = true } diff --git a/test-utils/runtime/Cargo.toml b/test-utils/runtime/Cargo.toml index 320d0e07dece..ab102b1e037d 100644 --- a/test-utils/runtime/Cargo.toml +++ b/test-utils/runtime/Cargo.toml @@ -40,7 +40,7 @@ pallet-timestamp = { version = "4.0.0-dev", default-features = false, path = ".. sp-consensus-grandpa = { version = "4.0.0-dev", default-features = false, path = "../../primitives/consensus/grandpa", features = ["serde"] } sp-trie = { version = "22.0.0", default-features = false, path = "../../primitives/trie" } sp-transaction-pool = { version = "4.0.0-dev", default-features = false, path = "../../primitives/transaction-pool" } -trie-db = { version = "0.27.0", default-features = false } +trie-db = { git = "https://github.com/paritytech/trie.git", branch = "lexnv/expose_merkle_value", default-features = false } sc-service = { version = "0.10.0-dev", default-features = false, optional = true, features = ["test-helpers"], path = "../../client/service" } sp-state-machine = { version = "0.28.0", default-features = false, path = "../../primitives/state-machine" } sp-externalities = { version = "0.19.0", default-features = false, path = "../../primitives/externalities" } diff --git a/utils/binary-merkle-tree/Cargo.toml b/utils/binary-merkle-tree/Cargo.toml index 4b7b9e53ef87..cba832e3051a 100644 --- a/utils/binary-merkle-tree/Cargo.toml +++ b/utils/binary-merkle-tree/Cargo.toml @@ -11,7 +11,7 @@ homepage = "https://substrate.io" [dependencies] array-bytes = { version = "6.1", optional = true } log = { version = "0.4", default-features = false, optional = true } -hash-db = { version = "0.16.0", default-features = false } +hash-db = { git = "https://github.com/paritytech/trie.git", branch = "lexnv/expose_merkle_value", default-features = false } [dev-dependencies] array-bytes = "6.1" diff --git a/utils/frame/rpc/state-trie-migration-rpc/Cargo.toml b/utils/frame/rpc/state-trie-migration-rpc/Cargo.toml index 9eee52aacba7..a87df85ad862 100644 --- a/utils/frame/rpc/state-trie-migration-rpc/Cargo.toml +++ b/utils/frame/rpc/state-trie-migration-rpc/Cargo.toml @@ -19,7 +19,7 @@ serde = { version = "1", features = ["derive"] } sp-core = { path = "../../../../primitives/core" } sp-state-machine = { path = "../../../../primitives/state-machine" } sp-trie = { path = "../../../../primitives/trie" } -trie-db = "0.27.0" +trie-db = { git = "https://github.com/paritytech/trie.git", branch = "lexnv/expose_merkle_value", default-features = false } jsonrpsee = { version = "0.16.2", features = ["client-core", "server", "macros"] } From 08c9082372a7e50d4d3a80e936e9b669021b2a7d Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 22 Aug 2023 14:02:51 +0300 Subject: [PATCH 20/27] client: Expose merkle value on storageProvider trait Signed-off-by: Alexandru Vasile --- client/api/src/backend.rs | 14 ++++++++++++++ client/service/src/client/client.rs | 16 ++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/client/api/src/backend.rs b/client/api/src/backend.rs index 465e1988478d..c01efb5bdc9f 100644 --- a/client/api/src/backend.rs +++ b/client/api/src/backend.rs @@ -476,6 +476,20 @@ pub trait StorageProvider> { child_info: &ChildInfo, key: &StorageKey, ) -> sp_blockchain::Result>; + + /// Given a block's `Hash` and a key, return the closest merkle value. + fn closest_merkle_value( + &self, + hash: Block::Hash, + key: &StorageKey, + ) -> sp_blockchain::Result>; + + /// Given a block's `Hash`, a key and a child storage key, return the closest merkle value. + fn child_closest_merkle_value( + &self, + hash: Block::Hash, + key: &StorageKey, + ) -> sp_blockchain::Result>; } /// Client backend. diff --git a/client/service/src/client/client.rs b/client/service/src/client/client.rs index d0a46ab2c011..4238ba407996 100644 --- a/client/service/src/client/client.rs +++ b/client/service/src/client/client.rs @@ -1555,6 +1555,22 @@ where .child_storage_hash(child_info, &key.0) .map_err(|e| sp_blockchain::Error::from_state(Box::new(e))) } + + fn closest_merkle_value( + &self, + hash: ::Hash, + key: &StorageKey, + ) -> blockchain::Result::Hash>> { + Ok(None) + } + + fn child_closest_merkle_value( + &self, + hash: ::Hash, + key: &StorageKey, + ) -> blockchain::Result::Hash>> { + Ok(None) + } } impl HeaderMetadata for Client From 40722f3197b2601837c0add032e8a9b904b75f1a Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 22 Aug 2023 14:10:30 +0300 Subject: [PATCH 21/27] primitives/backend: Expose merkle value on Backend trait Signed-off-by: Alexandru Vasile --- primitives/state-machine/src/backend.rs | 12 +++++++++++- primitives/state-machine/src/trie_backend.rs | 12 ++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/primitives/state-machine/src/backend.rs b/primitives/state-machine/src/backend.rs index f3244308a54c..b4e878aa75ea 100644 --- a/primitives/state-machine/src/backend.rs +++ b/primitives/state-machine/src/backend.rs @@ -191,7 +191,17 @@ pub trait Backend: sp_std::fmt::Debug { /// Get keyed storage value hash or None if there is nothing associated. fn storage_hash(&self, key: &[u8]) -> Result, Self::Error>; - /// Get keyed child storage or None if there is nothing associated. + /// Get the merkle value or None if there is nothing associated. + fn closest_merkle_value(&self, key: &[u8]) -> Result, Self::Error>; + + /// Get the child merkle value or None if there is nothing associated. + fn child_closest_merkle_value( + &self, + child_info: &ChildInfo, + key: &[u8], + ) -> Result, Self::Error>; + + /// Get child keyed child storage or None if there is nothing associated. fn child_storage( &self, child_info: &ChildInfo, diff --git a/primitives/state-machine/src/trie_backend.rs b/primitives/state-machine/src/trie_backend.rs index b7940fa8c39d..ded44fda27bc 100644 --- a/primitives/state-machine/src/trie_backend.rs +++ b/primitives/state-machine/src/trie_backend.rs @@ -405,6 +405,18 @@ where self.essence.child_storage(child_info, key) } + fn closest_merkle_value(&self, key: &[u8]) -> Result, Self::Error> { + Ok(None) + } + + fn child_closest_merkle_value( + &self, + child_info: &ChildInfo, + key: &[u8], + ) -> Result, Self::Error> { + Ok(None) + } + fn next_storage_key(&self, key: &[u8]) -> Result, Self::Error> { let (is_cached, mut cache) = access_cache(&self.next_storage_key_cache, Option::take) .map(|cache| (cache.last_key == key, cache)) From 4a358eefe49e5463d5644937e93acc50aed12632 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 22 Aug 2023 14:22:32 +0300 Subject: [PATCH 22/27] primitives/trie: Low level functions for reading the merkle value Signed-off-by: Alexandru Vasile --- primitives/trie/src/lib.rs | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/primitives/trie/src/lib.rs b/primitives/trie/src/lib.rs index 94155458569b..4700d3bf8a41 100644 --- a/primitives/trie/src/lib.rs +++ b/primitives/trie/src/lib.rs @@ -295,6 +295,24 @@ pub fn read_trie_value( + db: &DB, + root: &TrieHash, + key: &[u8], + recorder: Option<&mut dyn TrieRecorder>>, + cache: Option<&mut dyn TrieCache>, +) -> Result>, Box>> +where + DB: hash_db::HashDBRef, +{ + TrieDBBuilder::::new(db, root) + .with_optional_cache(cache) + .with_optional_recorder(recorder) + .build() + .get_closest_merkle_value(key) +} + /// Read a value from the trie with given Query. pub fn read_trie_value_with< L: TrieLayout, @@ -397,6 +415,26 @@ where .get_hash(key) } +/// Read the closest merkle value from the child trie. +pub fn read_child_trie_closest_merkle_value( + keyspace: &[u8], + db: &DB, + root: &TrieHash, + key: &[u8], + recorder: Option<&mut dyn TrieRecorder>>, + cache: Option<&mut dyn TrieCache>, +) -> Result>, Box>> +where + DB: hash_db::HashDBRef, +{ + let db = KeySpacedDB::new(db, keyspace); + TrieDBBuilder::::new(&db, &root) + .with_optional_recorder(recorder) + .with_optional_cache(cache) + .build() + .get_closest_merkle_value(key) +} + /// Read a value from the child trie with given query. pub fn read_child_trie_value_with( keyspace: &[u8], From 7477c3be1b0dafcc3b4e0fd6fda75c76b5a29600 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 22 Aug 2023 14:24:07 +0300 Subject: [PATCH 23/27] primitives/trie: Implement closest merkle value on TrieBackendEssence Signed-off-by: Alexandru Vasile --- .../state-machine/src/trie_backend_essence.rs | 48 +++++++++++++++---- 1 file changed, 38 insertions(+), 10 deletions(-) diff --git a/primitives/state-machine/src/trie_backend_essence.rs b/primitives/state-machine/src/trie_backend_essence.rs index 22c76b56deb0..556a860f9752 100644 --- a/primitives/state-machine/src/trie_backend_essence.rs +++ b/primitives/state-machine/src/trie_backend_essence.rs @@ -32,8 +32,9 @@ use sp_std::{boxed::Box, marker::PhantomData, vec::Vec}; #[cfg(feature = "std")] use sp_trie::recorder::Recorder; use sp_trie::{ - child_delta_trie_root, delta_trie_root, empty_child_trie_root, read_child_trie_hash, - read_child_trie_value, read_trie_value, + child_delta_trie_root, delta_trie_root, empty_child_trie_root, + read_child_trie_closest_merkle_value, read_child_trie_hash, read_child_trie_value, + read_trie_closest_merkle_value, read_trie_value, trie_types::{TrieDBBuilder, TrieError}, DBValue, KeySpacedDB, NodeCodec, Trie, TrieCache, TrieDBRawIterator, TrieRecorder, }; @@ -527,10 +528,7 @@ where /// Returns the hash value pub fn child_storage_hash(&self, child_info: &ChildInfo, key: &[u8]) -> Result> { - let child_root = match self.child_root(child_info)? { - Some(root) => root, - None => return Ok(None), - }; + let Some(child_root) = self.child_root(child_info)? else { return Ok(None) }; let map_e = |e| format!("Trie lookup error: {}", e); @@ -553,10 +551,7 @@ where child_info: &ChildInfo, key: &[u8], ) -> Result> { - let child_root = match self.child_root(child_info)? { - Some(root) => root, - None => return Ok(None), - }; + let Some(child_root) = self.child_root(child_info)? else { return Ok(None) }; let map_e = |e| format!("Trie lookup error: {}", e); @@ -573,6 +568,39 @@ where }) } + /// Get the closest merkle value at given key. + pub fn closest_merkle_value(&self, key: &[u8]) -> Result> { + let map_e = |e| format!("Trie lookup error: {}", e); + + self.with_recorder_and_cache(None, |recorder, cache| { + read_trie_closest_merkle_value::, _>(self, &self.root, key, recorder, cache) + .map_err(map_e) + }) + } + + /// Get the child closest merkle value at given key. + pub fn child_closest_merkle_value( + &self, + child_info: &ChildInfo, + key: &[u8], + ) -> Result> { + let Some(child_root) = self.child_root(child_info)? else { return Ok(None) }; + + let map_e = |e| format!("Trie lookup error: {}", e); + + self.with_recorder_and_cache(Some(child_root), |recorder, cache| { + read_child_trie_closest_merkle_value::, _>( + child_info.keyspace(), + self, + &child_root, + key, + recorder, + cache, + ) + .map_err(map_e) + }) + } + /// Create a raw iterator over the storage. pub fn raw_iter(&self, args: IterArgs) -> Result> { let root = if let Some(child_info) = args.child_info.as_ref() { From b04920546064fd8bfeeca6cd4c388711bc8ca761 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 22 Aug 2023 14:25:55 +0300 Subject: [PATCH 24/27] primitives/trie: Implement closest merkle value on TrieBackend Signed-off-by: Alexandru Vasile --- primitives/state-machine/src/trie_backend.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/primitives/state-machine/src/trie_backend.rs b/primitives/state-machine/src/trie_backend.rs index ded44fda27bc..5bc22be14e33 100644 --- a/primitives/state-machine/src/trie_backend.rs +++ b/primitives/state-machine/src/trie_backend.rs @@ -406,7 +406,7 @@ where } fn closest_merkle_value(&self, key: &[u8]) -> Result, Self::Error> { - Ok(None) + self.essence.closest_merkle_value(key) } fn child_closest_merkle_value( @@ -414,7 +414,7 @@ where child_info: &ChildInfo, key: &[u8], ) -> Result, Self::Error> { - Ok(None) + self.essence.child_closest_merkle_value(child_info, key) } fn next_storage_key(&self, key: &[u8]) -> Result, Self::Error> { From 4f94afcbb672af2b1c8514bffbe06c1398b5f398 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 22 Aug 2023 14:34:28 +0300 Subject: [PATCH 25/27] client: Implement closest merkle value up until `StorageProvider` Signed-off-by: Alexandru Vasile --- client/api/src/backend.rs | 1 + client/db/src/bench.rs | 18 ++++++++++++++++++ client/db/src/lib.rs | 12 ++++++++++++ client/db/src/record_stats_state.rs | 12 ++++++++++++ client/service/src/client/client.rs | 9 +++++++-- 5 files changed, 50 insertions(+), 2 deletions(-) diff --git a/client/api/src/backend.rs b/client/api/src/backend.rs index c01efb5bdc9f..2d02a861c065 100644 --- a/client/api/src/backend.rs +++ b/client/api/src/backend.rs @@ -488,6 +488,7 @@ pub trait StorageProvider> { fn child_closest_merkle_value( &self, hash: Block::Hash, + child_info: &ChildInfo, key: &StorageKey, ) -> sp_blockchain::Result>; } diff --git a/client/db/src/bench.rs b/client/db/src/bench.rs index 9307a63ad444..d8ba45c3e6cc 100644 --- a/client/db/src/bench.rs +++ b/client/db/src/bench.rs @@ -383,6 +383,24 @@ impl StateBackend> for BenchmarkingState { .child_storage_hash(child_info, key) } + fn closest_merkle_value(&self, key: &[u8]) -> Result, Self::Error> { + self.add_read_key(None, key); + self.state.borrow().as_ref().ok_or_else(state_err)?.closest_merkle_value(key) + } + + fn child_closest_merkle_value( + &self, + child_info: &ChildInfo, + key: &[u8], + ) -> Result, Self::Error> { + self.add_read_key(None, key); + self.state + .borrow() + .as_ref() + .ok_or_else(state_err)? + .child_closest_merkle_value(child_info, key) + } + fn exists_storage(&self, key: &[u8]) -> Result { self.add_read_key(None, key); self.state.borrow().as_ref().ok_or_else(state_err)?.exists_storage(key) diff --git a/client/db/src/lib.rs b/client/db/src/lib.rs index aba5b0829b5b..fb3cc172a886 100644 --- a/client/db/src/lib.rs +++ b/client/db/src/lib.rs @@ -215,6 +215,18 @@ impl StateBackend> for RefTrackingState { self.state.child_storage_hash(child_info, key) } + fn closest_merkle_value(&self, key: &[u8]) -> Result, Self::Error> { + self.state.closest_merkle_value(key) + } + + fn child_closest_merkle_value( + &self, + child_info: &ChildInfo, + key: &[u8], + ) -> Result, Self::Error> { + self.state.child_closest_merkle_value(child_info, key) + } + fn exists_storage(&self, key: &[u8]) -> Result { self.state.exists_storage(key) } diff --git a/client/db/src/record_stats_state.rs b/client/db/src/record_stats_state.rs index 005315ce9f45..b7244db5fd8a 100644 --- a/client/db/src/record_stats_state.rs +++ b/client/db/src/record_stats_state.rs @@ -145,6 +145,18 @@ impl>, B: BlockT> StateBackend> self.state.child_storage_hash(child_info, key) } + fn closest_merkle_value(&self, key: &[u8]) -> Result, Self::Error> { + self.state.closest_merkle_value(key) + } + + fn child_closest_merkle_value( + &self, + child_info: &ChildInfo, + key: &[u8], + ) -> Result, Self::Error> { + self.state.child_closest_merkle_value(child_info, key) + } + fn exists_storage(&self, key: &[u8]) -> Result { self.state.exists_storage(key) } diff --git a/client/service/src/client/client.rs b/client/service/src/client/client.rs index 4238ba407996..931ad8140d60 100644 --- a/client/service/src/client/client.rs +++ b/client/service/src/client/client.rs @@ -1561,15 +1561,20 @@ where hash: ::Hash, key: &StorageKey, ) -> blockchain::Result::Hash>> { - Ok(None) + self.state_at(hash)? + .closest_merkle_value(&key.0) + .map_err(|e| sp_blockchain::Error::from_state(Box::new(e))) } fn child_closest_merkle_value( &self, hash: ::Hash, + child_info: &ChildInfo, key: &StorageKey, ) -> blockchain::Result::Hash>> { - Ok(None) + self.state_at(hash)? + .child_closest_merkle_value(child_info, &key.0) + .map_err(|e| sp_blockchain::Error::from_state(Box::new(e))) } } From 253edaec3c88d3c6f239a50aef0779f840ac22ca Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Tue, 22 Aug 2023 14:37:46 +0300 Subject: [PATCH 26/27] chianHead/storage: Query the storage for closest merkle value Signed-off-by: Alexandru Vasile --- .../src/chain_head/chain_head_storage.rs | 35 ++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs b/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs index 5e1f38f9a997..224a50b48778 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head_storage.rs @@ -145,6 +145,31 @@ where .unwrap_or_else(|error| QueryResult::Err(error.to_string())) } + /// Fetch the closest merkle value. + fn query_storage_merkle_value( + &self, + hash: Block::Hash, + key: &StorageKey, + child_key: Option<&ChildInfo>, + ) -> QueryResult { + let result = if let Some(child_key) = child_key { + self.client.child_closest_merkle_value(hash, child_key, key) + } else { + self.client.closest_merkle_value(hash, key) + }; + + result + .map(|opt| { + QueryResult::Ok(opt.map(|storage_data| StorageResult { + key: hex_string(&key.0), + result: StorageResultType::ClosestDescendantMerkleValue(hex_string( + &storage_data.as_ref(), + )), + })) + }) + .unwrap_or_else(|error| QueryResult::Err(error.to_string())) + } + /// Iterate over at most `operation_max_storage_items` keys. /// /// Returns the storage result with a potential next key to resume iteration. @@ -288,13 +313,21 @@ where return }, }, + StorageQueryType::ClosestDescendantMerkleValue => + match self.query_storage_merkle_value(hash, &item.key, child_key.as_ref()) { + Ok(Some(value)) => storage_results.push(value), + Ok(None) => continue, + Err(error) => { + send_error::(&sender, operation.operation_id(), error); + return + }, + }, StorageQueryType::DescendantsValues => self .iter_operations .push_back(QueryIter { next_key: item.key, ty: IterQueryType::Value }), StorageQueryType::DescendantsHashes => self .iter_operations .push_back(QueryIter { next_key: item.key, ty: IterQueryType::Hash }), - _ => continue, }; } From 9ca904ddadf09392e437a67638b83f011cd96df7 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 23 Aug 2023 15:50:06 +0300 Subject: [PATCH 27/27] chainHead/tests: Check merkle values propagate to the chainHead_storage Signed-off-by: Alexandru Vasile --- .../rpc-spec-v2/src/chain_head/chain_head.rs | 11 +- .../rpc-spec-v2/src/chain_head/test_utils.rs | 17 ++ client/rpc-spec-v2/src/chain_head/tests.rs | 146 +++++++++++++++++- 3 files changed, 164 insertions(+), 10 deletions(-) diff --git a/client/rpc-spec-v2/src/chain_head/chain_head.rs b/client/rpc-spec-v2/src/chain_head/chain_head.rs index bae7c84df0ed..7be934531332 100644 --- a/client/rpc-spec-v2/src/chain_head/chain_head.rs +++ b/client/rpc-spec-v2/src/chain_head/chain_head.rs @@ -27,7 +27,7 @@ use crate::{ api::ChainHeadApiServer, chain_head_follow::ChainHeadFollower, error::Error as ChainHeadRpcError, - event::{FollowEvent, MethodResponse, OperationError, StorageQuery, StorageQueryType}, + event::{FollowEvent, MethodResponse, OperationError, StorageQuery}, hex_string, subscription::{SubscriptionManagement, SubscriptionManagementError}, }, @@ -329,19 +329,12 @@ where let items = items .into_iter() .map(|query| { - if query.query_type == StorageQueryType::ClosestDescendantMerkleValue { - // Note: remove this once all types are implemented. - return Err(ChainHeadRpcError::InvalidParam( - "Storage query type not supported".into(), - )) - } - Ok(StorageQuery { key: StorageKey(parse_hex_param(query.key)?), query_type: query.query_type, }) }) - .collect::, _>>()?; + .collect::, ChainHeadRpcError>>()?; let child_trie = child_trie .map(|child_trie| parse_hex_param(child_trie)) diff --git a/client/rpc-spec-v2/src/chain_head/test_utils.rs b/client/rpc-spec-v2/src/chain_head/test_utils.rs index 54c585932a74..628aaa39e505 100644 --- a/client/rpc-spec-v2/src/chain_head/test_utils.rs +++ b/client/rpc-spec-v2/src/chain_head/test_utils.rs @@ -198,6 +198,23 @@ impl< ) -> sp_blockchain::Result> { self.client.child_storage_hash(hash, child_info, key) } + + fn closest_merkle_value( + &self, + hash: ::Hash, + key: &StorageKey, + ) -> sp_blockchain::Result::Hash>> { + self.client.closest_merkle_value(hash, key) + } + + fn child_closest_merkle_value( + &self, + hash: ::Hash, + child_info: &ChildInfo, + key: &StorageKey, + ) -> sp_blockchain::Result::Hash>> { + self.client.child_closest_merkle_value(hash, child_info, key) + } } impl> CallApiAt for ChainHeadMockClient { diff --git a/client/rpc-spec-v2/src/chain_head/tests.rs b/client/rpc-spec-v2/src/chain_head/tests.rs index 00ed9089058e..746a799d225c 100644 --- a/client/rpc-spec-v2/src/chain_head/tests.rs +++ b/client/rpc-spec-v2/src/chain_head/tests.rs @@ -25,7 +25,12 @@ use sp_core::{ Blake2Hasher, Hasher, }; use sp_version::RuntimeVersion; -use std::{collections::HashSet, fmt::Debug, sync::Arc, time::Duration}; +use std::{ + collections::{HashMap, HashSet}, + fmt::Debug, + sync::Arc, + time::Duration, +}; use substrate_test_runtime::Transfer; use substrate_test_runtime_client::{ prelude::*, runtime, runtime::RuntimeApi, Backend, BlockBuilderExt, Client, @@ -2565,3 +2570,142 @@ async fn stop_storage_operation() { ) .await; } + +#[tokio::test] +async fn storage_closest_merkle_value() { + let child_info = ChildInfo::new_default(CHILD_STORAGE_KEY); + let builder = TestClientBuilder::new().add_extra_child_storage( + &child_info, + KEY.to_vec(), + CHILD_VALUE.to_vec(), + ); + let backend = builder.backend(); + let mut client = Arc::new(builder.build()); + + let api = ChainHead::new( + client.clone(), + backend, + Arc::new(TaskExecutor::default()), + CHAIN_GENESIS, + ChainHeadConfig { + global_max_pinned_blocks: MAX_PINNED_BLOCKS, + subscription_max_pinned_duration: Duration::from_secs(MAX_PINNED_SECS), + subscription_max_ongoing_operations: MAX_OPERATIONS, + operation_max_storage_items: MAX_PAGINATION_LIMIT, + }, + ) + .into_rpc(); + + let mut sub = api.subscribe("chainHead_unstable_follow", [true]).await.unwrap(); + let sub_id = sub.subscription_id(); + let sub_id = serde_json::to_string(&sub_id).unwrap(); + + // Import a new block with storage changes. + let mut builder = client.new_block(Default::default()).unwrap(); + builder.push_storage_change(b":a".to_vec(), Some(b"a".to_vec())).unwrap(); + builder.push_storage_change(b":aa".to_vec(), Some(b"aa".to_vec())).unwrap(); + builder.push_storage_change(b":aaa".to_vec(), Some(b"aaa".to_vec())).unwrap(); + builder.push_storage_change(b":ab".to_vec(), Some(b"ab".to_vec())).unwrap(); + builder.push_storage_change(b":b".to_vec(), Some(b"b".to_vec())).unwrap(); + let block = builder.build().unwrap().block; + let block_hash = format!("{:?}", block.header.hash()); + client.import(BlockOrigin::Own, block.clone()).await.unwrap(); + + // Ensure the imported block is propagated and pinned for this subscription. + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::Initialized(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::NewBlock(_) + ); + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::BestBlockChanged(_) + ); + + // Valid call with storage at the keys. + let response: MethodResponse = api + .call( + "chainHead_unstable_storage", + rpc_params![ + &sub_id, + &block_hash, + vec![ + StorageQuery { + key: hex_string(b":a"), + query_type: StorageQueryType::ClosestDescendantMerkleValue + }, + StorageQuery { + key: hex_string(b":aa"), + query_type: StorageQueryType::ClosestDescendantMerkleValue + }, + StorageQuery { + key: hex_string(b":aaa"), + query_type: StorageQueryType::ClosestDescendantMerkleValue + }, + StorageQuery { + key: hex_string(b":ab"), + query_type: StorageQueryType::ClosestDescendantMerkleValue + }, + StorageQuery { + key: hex_string(b":b"), + query_type: StorageQueryType::ClosestDescendantMerkleValue + }, + // // Key not existant, the partial key is present however. + // StorageQuery { + // key: hex_string(b":aac"), + // query_type: StorageQueryType::ClosestDescendantMerkleValue + // }, + // Key not existant, the partial key is present however. + StorageQuery { + key: hex_string(b":abc"), + query_type: StorageQueryType::ClosestDescendantMerkleValue + }, + ] + ], + ) + .await + .unwrap(); + let operation_id = match response { + MethodResponse::Started(started) => started.operation_id, + MethodResponse::LimitReached => panic!("Expected started response"), + }; + + let event = get_next_event::>(&mut sub).await; + let merkle_values: HashMap<_, _> = match event { + FollowEvent::OperationStorageItems(res) => { + assert_eq!(res.operation_id, operation_id); + assert_eq!(res.items.len(), 6); + + res.items + .into_iter() + .map(|res| { + let value = match res.result { + StorageResultType::ClosestDescendantMerkleValue(value) => value, + _ => panic!("Unexpected StorageResultType"), + }; + (res.key, value) + }) + .collect() + }, + _ => panic!("Expected OperationStorageItems event"), + }; + + // Finished. + assert_matches!( + get_next_event::>(&mut sub).await, + FollowEvent::OperationStorageDone(done) if done.operation_id == operation_id + ); + + // assert_eq!( + // merkle_values.get(&hex_string(b":aac")).unwrap(), + // merkle_values.get(&hex_string(b":aa")).unwrap() + // ); + + assert_eq!( + merkle_values.get(&hex_string(b":abc")).unwrap(), + merkle_values.get(&hex_string(b":ab")).unwrap() + ); +}