diff --git a/Cargo.lock b/Cargo.lock index cad6c654044..b12afa8bd93 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4864,6 +4864,7 @@ dependencies = [ "image", "key-wallet", "key-wallet-manager", + "parking_lot", "platform-encryption", "rand 0.8.5", "serde_json", diff --git a/packages/rs-platform-wallet/Cargo.toml b/packages/rs-platform-wallet/Cargo.toml index 71e0e0e9bc8..dcacd15b594 100644 --- a/packages/rs-platform-wallet/Cargo.toml +++ b/packages/rs-platform-wallet/Cargo.toml @@ -47,6 +47,12 @@ image = { version = "0.25", default-features = false, features = ["png", "jpeg", # Security zeroize = "1" +# Sync primitives used by the optional `lock-stats` feature for the +# per-tag breakdown map. Plain `parking_lot::Mutex` (sync, fast) is the +# right shape here — the per-tag map is touched on the lock acquire / +# release path, never across an `.await`. +parking_lot = { version = "0.12", optional = true } + # Shielded pool (optional, behind `shielded` feature) grovedb-commitment-tree = { git = "https://github.com/dashpay/grovedb", rev = "8f25b20d04bfc0e8bdfb3870676d647a0d74918b", optional = true } zip32 = { version = "0.2.0", default-features = false, optional = true } @@ -62,3 +68,12 @@ default = ["bls", "eddsa"] bls = ["key-wallet/bls", "key-wallet-manager/bls"] eddsa = ["key-wallet/eddsa", "key-wallet-manager/eddsa"] shielded = ["dep:grovedb-commitment-tree", "dep:zip32", "dash-sdk/shielded", "dpp/shielded-client"] + +# Off by default. When enabled, the per-wallet `wallet_manager` RwLock is +# wrapped with an `InstrumentedRwLock` that records acquisition counts, +# wait time, and hold time per call site (using `read_at("tag")` / +# `write_at("tag")`). With the feature off the wrapper is a transparent +# type alias for `tokio::sync::RwLock` and there is zero runtime cost. +# See `crate::diagnostics::instrumented_lock` for the API and +# `LockStats` for the snapshot shape. +lock-stats = ["dep:parking_lot"] diff --git a/packages/rs-platform-wallet/src/changeset/core_bridge.rs b/packages/rs-platform-wallet/src/changeset/core_bridge.rs index ddfcfe956e0..de25476a45e 100644 --- a/packages/rs-platform-wallet/src/changeset/core_bridge.rs +++ b/packages/rs-platform-wallet/src/changeset/core_bridge.rs @@ -29,12 +29,17 @@ use key_wallet::transaction_checking::TransactionContext; use key_wallet::Utxo; use key_wallet_manager::{WalletEvent, WalletId, WalletManager}; use tokio::sync::broadcast::error::RecvError; -use tokio::sync::RwLock; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use crate::changeset::changeset::{CoreChangeSet, PlatformWalletChangeSet}; use crate::changeset::traits::PlatformWalletPersistence; +// `InstrumentedRwLockExt` is unused when `lock-stats` is on — the +// wrapper has the same methods inherent and method resolution prefers +// inherent over trait. Suppress the warning so the call sites can +// import the trait unconditionally. +#[allow(unused_imports)] +use crate::diagnostics::{InstrumentedRwLock, InstrumentedRwLockExt}; use crate::wallet::platform_wallet::PlatformWalletInfo; /// Spawn the wallet-event subscriber task. @@ -45,13 +50,17 @@ use crate::wallet::platform_wallet::PlatformWalletInfo; /// [`PlatformWalletPersistence::store`]. Exits when `cancel` fires /// or the upstream broadcast channel closes. pub fn spawn_wallet_event_adapter( - wallet_manager: Arc>>, + wallet_manager: Arc>>, persister: Arc, cancel: CancellationToken, ) -> JoinHandle<()> { tokio::spawn(async move { let mut receiver = { - let guard = wallet_manager.read().await; + // Subscribe-time read; happens once at task start. Tagged + // so the `lock-stats` feature can confirm the one-shot + // nature of this acquisition vs. the per-event probe in + // `is_chain_locked`. + let guard = wallet_manager.read_at("event_adapter::subscribe").await; guard.subscribe_events() }; tracing::debug!("WalletEventAdapter task started"); @@ -108,7 +117,7 @@ pub fn spawn_wallet_event_adapter( /// Project an upstream [`WalletEvent`] into a [`CoreChangeSet`] suitable /// for atomic persistence. async fn build_core_changeset( - wallet_manager: &Arc>>, + wallet_manager: &Arc>>, event: &WalletEvent, ) -> CoreChangeSet { match event { @@ -172,11 +181,18 @@ async fn build_core_changeset( /// Returns `true` when the wallet's stored record for `txid` is in a /// chain-locked block. Used to gate IS-lock projection. async fn is_chain_locked( - wallet_manager: &Arc>>, + wallet_manager: &Arc>>, wallet_id: &WalletId, txid: &dashcore::Txid, ) -> bool { - let guard = wallet_manager.read().await; + // Tagged so the `lock-stats` feature can attribute this site's + // contribution to wallet-manager contention. The event adapter + // touches this lock once per `TransactionInstantLocked` event; + // tagging lets a perf audit distinguish the IS-lock finality + // probe from generic identity / token / address-sync reads. + let guard = wallet_manager + .read_at("event_adapter::is_chain_locked") + .await; let Some(info) = guard.get_wallet_info(wallet_id) else { return false; }; diff --git a/packages/rs-platform-wallet/src/diagnostics/instrumented_lock/mod.rs b/packages/rs-platform-wallet/src/diagnostics/instrumented_lock/mod.rs new file mode 100644 index 00000000000..94414cacf02 --- /dev/null +++ b/packages/rs-platform-wallet/src/diagnostics/instrumented_lock/mod.rs @@ -0,0 +1,594 @@ +//! `InstrumentedRwLock` — opt-in instrumented wrapper around +//! [`tokio::sync::RwLock`]. +//! +//! # Build modes +//! +//! - **`lock-stats` feature OFF (default)** — `InstrumentedRwLock` is +//! a literal type alias for [`tokio::sync::RwLock`]. There is no +//! wrapper struct, no extra `Arc`, and no `Drop` glue. The tagged +//! methods (`read_at` / `write_at` / `try_*_at` / `blocking_*_at`) +//! plus `raw_arc` are provided by zero-cost extension traits +//! ([`InstrumentedRwLockExt`] and [`InstrumentedArcExt`]) whose +//! methods are `#[inline]` and drop the tag at the call site. After +//! inlining, every call collapses to the equivalent inherent +//! [`tokio::sync::RwLock`] method. +//! +//! - **`lock-stats` feature ON** — `InstrumentedRwLock` is a +//! wrapper struct holding `Arc>` plus +//! `Arc<`[`LockStats`]`>`. Each acquisition records the wait time +//! (until the guard resolves) and the hold time (until the guard +//! drops). The tagged methods bucket the acquisition under the +//! given tag; untagged calls bucket into [`UNTAGGED`]. +//! +//! # Why an inner Arc when the feature is on +//! +//! The wrapper has to hand `Arc>` to APIs that +//! take that concrete type literally (e.g. `dash_spv::DashSpvClient::new` +//! which takes `Arc>`). With the feature off the wallet-manager +//! field IS `Arc>`, so [`InstrumentedArcExt::raw_arc`] +//! reduces to `Arc::clone(self)`. With the feature on the wrapper holds +//! its tokio lock as `Arc>` internally so the same +//! `raw_arc()` call extracts the inner Arc. SPV's own acquisitions go +//! through that inner Arc directly and are NOT seen by the wrapper's +//! stats — the intentional trade is that platform-side acquisitions +//! (everything that goes through `wallet_manager.read()` / +//! `wallet_manager.read_at("…")`) are counted, while upstream's own +//! `process_block` write isn't. That's the right shape for "what does +//! platform-wallet contribute to lock pressure?", which is the question +//! this layer was added to answer. +//! +//! # Tagged call sites +//! +//! Untagged calls (`lock.read().await`) bucket into [`UNTAGGED`] — +//! a useful aggregate but not actionable when you're trying to find +//! the specific code path serializing the lock. Tagging individual +//! sites (`lock.read_at("event_adapter::is_chain_locked").await`) is +//! the path to actionable contention numbers. The tag is `&'static str` +//! so it doesn't allocate on the hot path; with the feature off the +//! tag is dropped at the `read_at` boundary and the call collapses +//! into a plain `read().await`. +//! +//! # Snapshot shape +//! +//! With `lock-stats` enabled, calling [`InstrumentedRwLock::stats`] +//! hands back the shared `Arc`. From there +//! [`LockStats::snapshot`] produces a [`Snapshot`] containing the +//! global counters and the per-tag breakdown — clone the snapshot +//! wherever you need to print or log it (e.g. an FFI accessor or a +//! periodic `tracing::info!`). + +#![allow(unused_imports)] // some imports are only used under one cfg branch + +use std::future::Future; +use std::sync::Arc; + +use tokio::sync::{ + RwLock as TokioRwLock, RwLockReadGuard as TokioReadGuard, RwLockWriteGuard as TokioWriteGuard, + TryLockError, +}; + +#[cfg(feature = "lock-stats")] +mod stats; + +#[cfg(feature = "lock-stats")] +pub use stats::{LockStats, SiteStats, Snapshot}; + +#[cfg(feature = "lock-stats")] +use std::time::Instant; + +/// The default tag attributed to acquisitions made through the +/// un-suffixed methods (`read`, `write`, `try_read`, …). Visible in +/// [`LockStats`] snapshots so it's clear which acquisitions came +/// from un-tagged sites. +pub const UNTAGGED: &str = "untagged"; + +// --------------------------------------------------------------------------- +// Extension traits +// +// Defined unconditionally so call sites import them once and stay agnostic +// to the feature flag. The impls differ per cfg branch — see below. +// --------------------------------------------------------------------------- + +/// Tagged-acquisition methods on a `RwLock`-shaped lock. +/// +/// In feature-off mode the impl forwards to the corresponding tokio +/// inherent method and drops the tag. In feature-on mode the impl +/// forwards to the wrapper's inherent method, which records the +/// acquisition under `tag`. +pub trait InstrumentedRwLockExt { + /// Acquire a shared lock, attributing the acquisition to `tag` + /// (when `lock-stats` is enabled). + fn read_at(&self, tag: &'static str) -> impl Future> + Send; + /// Acquire an exclusive lock, attributing the acquisition to `tag`. + fn write_at(&self, tag: &'static str) -> impl Future> + Send; + /// Try to acquire a shared lock without waiting. + fn try_read_at(&self, tag: &'static str) -> Result, TryLockError>; + /// Try to acquire an exclusive lock without waiting. + fn try_write_at(&self, tag: &'static str) -> Result, TryLockError>; + /// Synchronously acquire a shared lock — must NOT be called from a + /// tokio runtime thread (will panic). + fn blocking_read_at(&self, tag: &'static str) -> ReadGuard<'_, T>; + /// Synchronously acquire an exclusive lock — must NOT be called + /// from a tokio runtime thread. + fn blocking_write_at(&self, tag: &'static str) -> WriteGuard<'_, T>; +} + +/// `raw_arc()` extension on a shared handle to the lock. Returns the +/// `Arc>` shape that external APIs take literally +/// (e.g. `dash_spv::DashSpvClient::new`). With the feature off the +/// handle IS the tokio Arc, so this is `Arc::clone(self)`. With the +/// feature on the handle is `Arc` and this extracts the +/// wrapper's inner Arc. +pub trait InstrumentedArcExt { + /// Cheap clone of the underlying `Arc>`. + /// Acquisitions made through the returned Arc bypass the wrapper's + /// stats — see the module-level docs for the rationale. + fn raw_arc(&self) -> Arc>; +} + +// --------------------------------------------------------------------------- +// Feature OFF: type aliases — the wrapper IS tokio's RwLock. +// --------------------------------------------------------------------------- + +#[cfg(not(feature = "lock-stats"))] +mod alias_mode { + use super::*; + + /// Type alias for [`tokio::sync::RwLock`] when `lock-stats` is + /// off. No wrapper struct, no extra `Arc`, no per-method + /// instrumentation cost. + pub type InstrumentedRwLock = TokioRwLock; + + /// Type alias for [`tokio::sync::RwLockReadGuard<'a, T>`] when + /// `lock-stats` is off. + pub type ReadGuard<'a, T> = TokioReadGuard<'a, T>; + + /// Type alias for [`tokio::sync::RwLockWriteGuard<'a, T>`] when + /// `lock-stats` is off. + pub type WriteGuard<'a, T> = TokioWriteGuard<'a, T>; + + impl InstrumentedRwLockExt for TokioRwLock { + #[inline] + fn read_at(&self, _tag: &'static str) -> impl Future> + Send { + self.read() + } + + #[inline] + fn write_at(&self, _tag: &'static str) -> impl Future> + Send { + self.write() + } + + #[inline] + fn try_read_at(&self, _tag: &'static str) -> Result, TryLockError> { + self.try_read() + } + + #[inline] + fn try_write_at(&self, _tag: &'static str) -> Result, TryLockError> { + self.try_write() + } + + #[inline] + fn blocking_read_at(&self, _tag: &'static str) -> ReadGuard<'_, T> { + self.blocking_read() + } + + #[inline] + fn blocking_write_at(&self, _tag: &'static str) -> WriteGuard<'_, T> { + self.blocking_write() + } + } + + impl InstrumentedArcExt for Arc> { + #[inline] + fn raw_arc(&self) -> Arc> { + Arc::clone(self) + } + } +} + +#[cfg(not(feature = "lock-stats"))] +pub use alias_mode::{InstrumentedRwLock, ReadGuard, WriteGuard}; + +// --------------------------------------------------------------------------- +// Feature ON: full wrapper struct with stats. +// --------------------------------------------------------------------------- + +#[cfg(feature = "lock-stats")] +mod struct_mode { + use super::*; + use std::ops::{Deref, DerefMut}; + + /// Wrapper around [`tokio::sync::RwLock`] that records + /// per-call-site acquisition counts plus wait and hold durations. + /// See the module-level docs. + pub struct InstrumentedRwLock { + inner: Arc>, + stats: Arc, + } + + impl InstrumentedRwLock { + /// Construct a new lock holding `value`. + pub fn new(value: T) -> Self { + Self { + inner: Arc::new(TokioRwLock::new(value)), + stats: Arc::new(LockStats::new()), + } + } + + /// Borrow the wrapped tokio lock. Use only for APIs that + /// genuinely need a `&TokioRwLock`; prefer the wrapper's + /// own methods so acquisitions stay attributed. + #[inline] + pub fn raw(&self) -> &TokioRwLock { + &self.inner + } + + /// Cheap clone of the inner `Arc>`. See the + /// [`InstrumentedArcExt::raw_arc`] doc for the trade. + #[inline] + pub fn raw_arc(&self) -> Arc> { + Arc::clone(&self.inner) + } + + /// Shared handle to the per-lock stats snapshot store. + #[inline] + pub fn stats(&self) -> Arc { + Arc::clone(&self.stats) + } + + /// Acquire a shared lock — buckets into [`UNTAGGED`]. + #[inline] + pub async fn read(&self) -> ReadGuard<'_, T> { + self.read_at(UNTAGGED).await + } + + /// Acquire an exclusive lock — buckets into [`UNTAGGED`]. + #[inline] + pub async fn write(&self) -> WriteGuard<'_, T> { + self.write_at(UNTAGGED).await + } + + /// Acquire a shared lock with a per-call-site tag. + pub async fn read_at(&self, tag: &'static str) -> ReadGuard<'_, T> { + let wait_start = Instant::now(); + let inner = self.inner.read().await; + let wait_ns = wait_start.elapsed().as_nanos() as u64; + self.stats.record_read_acquired(tag, wait_ns); + ReadGuard { + inner, + stats: Arc::clone(&self.stats), + tag, + acquired_at: Instant::now(), + } + } + + /// Acquire an exclusive lock with a per-call-site tag. + pub async fn write_at(&self, tag: &'static str) -> WriteGuard<'_, T> { + let wait_start = Instant::now(); + let inner = self.inner.write().await; + let wait_ns = wait_start.elapsed().as_nanos() as u64; + self.stats.record_write_acquired(tag, wait_ns); + WriteGuard { + inner, + stats: Arc::clone(&self.stats), + tag, + acquired_at: Instant::now(), + } + } + + /// Try to acquire a shared lock without waiting. + #[inline] + pub fn try_read(&self) -> Result, TryLockError> { + self.try_read_at(UNTAGGED) + } + + /// Try to acquire an exclusive lock without waiting. + #[inline] + pub fn try_write(&self) -> Result, TryLockError> { + self.try_write_at(UNTAGGED) + } + + /// Tagged variant of [`try_read`](Self::try_read). + pub fn try_read_at(&self, tag: &'static str) -> Result, TryLockError> { + match self.inner.try_read() { + Ok(inner) => { + self.stats.record_read_acquired(tag, 0); + Ok(ReadGuard { + inner, + stats: Arc::clone(&self.stats), + tag, + acquired_at: Instant::now(), + }) + } + Err(e) => { + self.stats.record_read_contended(tag); + Err(e) + } + } + } + + /// Tagged variant of [`try_write`](Self::try_write). + pub fn try_write_at(&self, tag: &'static str) -> Result, TryLockError> { + match self.inner.try_write() { + Ok(inner) => { + self.stats.record_write_acquired(tag, 0); + Ok(WriteGuard { + inner, + stats: Arc::clone(&self.stats), + tag, + acquired_at: Instant::now(), + }) + } + Err(e) => { + self.stats.record_write_contended(tag); + Err(e) + } + } + } + + /// Synchronously acquire a shared lock — must NOT be called from + /// a tokio runtime thread. + #[inline] + pub fn blocking_read(&self) -> ReadGuard<'_, T> { + self.blocking_read_at(UNTAGGED) + } + + /// Synchronously acquire an exclusive lock — must NOT be called + /// from a tokio runtime thread. + #[inline] + pub fn blocking_write(&self) -> WriteGuard<'_, T> { + self.blocking_write_at(UNTAGGED) + } + + /// Tagged variant of [`blocking_read`](Self::blocking_read). + pub fn blocking_read_at(&self, tag: &'static str) -> ReadGuard<'_, T> { + let wait_start = Instant::now(); + let inner = self.inner.blocking_read(); + let wait_ns = wait_start.elapsed().as_nanos() as u64; + self.stats.record_read_acquired(tag, wait_ns); + ReadGuard { + inner, + stats: Arc::clone(&self.stats), + tag, + acquired_at: Instant::now(), + } + } + + /// Tagged variant of [`blocking_write`](Self::blocking_write). + pub fn blocking_write_at(&self, tag: &'static str) -> WriteGuard<'_, T> { + let wait_start = Instant::now(); + let inner = self.inner.blocking_write(); + let wait_ns = wait_start.elapsed().as_nanos() as u64; + self.stats.record_write_acquired(tag, wait_ns); + WriteGuard { + inner, + stats: Arc::clone(&self.stats), + tag, + acquired_at: Instant::now(), + } + } + } + + impl Default for InstrumentedRwLock { + fn default() -> Self { + Self::new(T::default()) + } + } + + impl std::fmt::Debug for InstrumentedRwLock { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("InstrumentedRwLock") + .field("inner", &self.inner) + .finish() + } + } + + /// Shared read guard. `Deref`. Records hold time on + /// `Drop`. + pub struct ReadGuard<'a, T> { + inner: TokioReadGuard<'a, T>, + stats: Arc, + tag: &'static str, + acquired_at: Instant, + } + + impl Deref for ReadGuard<'_, T> { + type Target = T; + + #[inline] + fn deref(&self) -> &T { + &self.inner + } + } + + impl Drop for ReadGuard<'_, T> { + fn drop(&mut self) { + let held_ns = self.acquired_at.elapsed().as_nanos() as u64; + self.stats.record_read_released(self.tag, held_ns); + } + } + + /// Exclusive write guard. `Deref` + `DerefMut`. Records + /// hold time on `Drop`. + pub struct WriteGuard<'a, T> { + inner: TokioWriteGuard<'a, T>, + stats: Arc, + tag: &'static str, + acquired_at: Instant, + } + + impl Deref for WriteGuard<'_, T> { + type Target = T; + + #[inline] + fn deref(&self) -> &T { + &self.inner + } + } + + impl DerefMut for WriteGuard<'_, T> { + #[inline] + fn deref_mut(&mut self) -> &mut T { + &mut self.inner + } + } + + impl Drop for WriteGuard<'_, T> { + fn drop(&mut self) { + let held_ns = self.acquired_at.elapsed().as_nanos() as u64; + self.stats.record_write_released(self.tag, held_ns); + } + } + + // Trait impls so feature-agnostic call sites that import the Ext + // traits keep working. Method resolution prefers inherent methods + // when they exist, so these are mostly redundant in feature-on + // mode — they're here so a generic helper bounded by + // `InstrumentedRwLockExt` can take the wrapper just as easily as + // it can take a raw `TokioRwLock`. + impl InstrumentedRwLockExt for InstrumentedRwLock { + #[inline] + fn read_at(&self, tag: &'static str) -> impl Future> + Send { + InstrumentedRwLock::read_at(self, tag) + } + + #[inline] + fn write_at(&self, tag: &'static str) -> impl Future> + Send { + InstrumentedRwLock::write_at(self, tag) + } + + #[inline] + fn try_read_at(&self, tag: &'static str) -> Result, TryLockError> { + InstrumentedRwLock::try_read_at(self, tag) + } + + #[inline] + fn try_write_at(&self, tag: &'static str) -> Result, TryLockError> { + InstrumentedRwLock::try_write_at(self, tag) + } + + #[inline] + fn blocking_read_at(&self, tag: &'static str) -> ReadGuard<'_, T> { + InstrumentedRwLock::blocking_read_at(self, tag) + } + + #[inline] + fn blocking_write_at(&self, tag: &'static str) -> WriteGuard<'_, T> { + InstrumentedRwLock::blocking_write_at(self, tag) + } + } + + impl InstrumentedArcExt for Arc> { + #[inline] + fn raw_arc(&self) -> Arc> { + InstrumentedRwLock::raw_arc(self) + } + } +} + +#[cfg(feature = "lock-stats")] +pub use struct_mode::{InstrumentedRwLock, ReadGuard, WriteGuard}; + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn read_write_smoke() { + let lock = InstrumentedRwLock::new(0u32); + { + let guard = lock.read().await; + assert_eq!(*guard, 0); + } + { + let mut guard = lock.write().await; + *guard = 42; + } + let guard = lock.read().await; + assert_eq!(*guard, 42); + } + + #[tokio::test] + async fn try_read_contended() { + let lock = InstrumentedRwLock::new(0u32); + let _w = lock.write().await; + assert!(lock.try_read().is_err()); + } + + #[tokio::test] + async fn read_at_smoke() { + // Tagged calls work in both feature modes — feature-off via the + // `InstrumentedRwLockExt` trait, feature-on via inherent methods. + let lock = InstrumentedRwLock::new(0u32); + let guard = lock.read_at("test::tag").await; + assert_eq!(*guard, 0); + } + + #[tokio::test] + async fn raw_arc_smoke() { + // `raw_arc()` works in both modes — feature-off via the + // `InstrumentedArcExt` trait on `Arc`, feature-on + // via the wrapper's inherent method. + let lock = Arc::new(InstrumentedRwLock::new(0u32)); + let raw: Arc> = lock.raw_arc(); + let guard = raw.read().await; + assert_eq!(*guard, 0); + } + + #[cfg(feature = "lock-stats")] + #[tokio::test] + async fn stats_count_and_attribute_to_tag() { + let lock = InstrumentedRwLock::new(0u32); + + // Two reads tagged "ours", one write tagged "theirs". + { + let _r1 = lock.read_at("ours").await; + let _r2 = lock.read_at("ours").await; + } + { + let _w = lock.write_at("theirs").await; + } + + let snap = lock.stats().snapshot(); + let ours = snap.per_tag.get("ours").expect("ours tag present"); + assert_eq!(ours.read_acquired, 2); + assert_eq!(ours.write_acquired, 0); + let theirs = snap.per_tag.get("theirs").expect("theirs tag present"); + assert_eq!(theirs.read_acquired, 0); + assert_eq!(theirs.write_acquired, 1); + assert_eq!(snap.total.read_acquired, 2); + assert_eq!(snap.total.write_acquired, 1); + } + + // Untagged calls go to the UNTAGGED bucket so the snapshot still + // accounts for them — we don't want acquisitions to vanish. + #[cfg(feature = "lock-stats")] + #[tokio::test] + async fn untagged_calls_go_to_untagged_bucket() { + let lock = InstrumentedRwLock::new(0u32); + { + let _r = lock.read().await; + } + let snap = lock.stats().snapshot(); + let untagged = snap.per_tag.get(UNTAGGED).expect("UNTAGGED bucket"); + assert_eq!(untagged.read_acquired, 1); + } + + #[cfg(feature = "lock-stats")] + #[tokio::test] + async fn try_read_failure_records_contention() { + let lock = InstrumentedRwLock::new(0u32); + let _w = lock.write_at("holder").await; + let r = lock.try_read_at("contender"); + assert!(r.is_err()); + let snap = lock.stats().snapshot(); + let contender = snap.per_tag.get("contender").expect("contender tag"); + assert_eq!(contender.read_contended, 1); + assert_eq!(contender.read_acquired, 0); + } +} diff --git a/packages/rs-platform-wallet/src/diagnostics/instrumented_lock/stats.rs b/packages/rs-platform-wallet/src/diagnostics/instrumented_lock/stats.rs new file mode 100644 index 00000000000..d6f5a8dae17 --- /dev/null +++ b/packages/rs-platform-wallet/src/diagnostics/instrumented_lock/stats.rs @@ -0,0 +1,238 @@ +//! Stats counters for [`super::InstrumentedRwLock`]. Only compiled when +//! the `lock-stats` Cargo feature is enabled. +//! +//! # Storage shape +//! +//! - **Total counters** — atomic `u64`s, lock-free, bumped from every +//! acquire / release path. Cheap enough that the bump is in the +//! noise even under heavy contention. +//! - **Per-tag breakdown** — `BTreeMap<&'static str, SiteStats>` behind +//! a `parking_lot::Mutex`. The map is touched only on the lock +//! acquire / release boundary (never across an `.await`), so a sync +//! mutex is appropriate. The acquire path holds the mutex just long +//! enough to look up or insert the entry, then bumps the entry's +//! atomics outside the lock. New tags are inserted lazily on first +//! use; existing tags re-use the entry. + +use std::collections::BTreeMap; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; + +use parking_lot::Mutex; + +/// Aggregate lock-acquisition counters maintained by an +/// [`InstrumentedRwLock`](super::InstrumentedRwLock). +/// +/// Use [`LockStats::snapshot`] to clone out a readable [`Snapshot`]. +/// The live [`LockStats`] keeps its counters as atomics so reads via +/// `snapshot` don't race with writers. +#[derive(Debug)] +pub struct LockStats { + total: SiteCounters, + per_tag: Mutex>>, +} + +impl LockStats { + pub(super) fn new() -> Self { + Self { + total: SiteCounters::new(), + per_tag: Mutex::new(BTreeMap::new()), + } + } + + /// Take a snapshot of the current counters. Cheap enough to call + /// from a debug UI on every refresh; a periodic logger could call + /// it on a 1-second timer without measurable overhead. + pub fn snapshot(&self) -> Snapshot { + let per_tag: BTreeMap<&'static str, SiteStats> = self + .per_tag + .lock() + .iter() + .map(|(tag, counters)| (*tag, counters.snapshot())) + .collect(); + Snapshot { + total: self.total.snapshot(), + per_tag, + } + } + + /// Look up (or insert on first use) the [`SiteCounters`] for a tag. + /// Holds the per-tag mutex only for the lookup; the returned `Arc` + /// lets the caller bump atomics outside the mutex. + fn site(&self, tag: &'static str) -> Arc { + let mut guard = self.per_tag.lock(); + if let Some(existing) = guard.get(tag) { + return Arc::clone(existing); + } + let new = Arc::new(SiteCounters::new()); + guard.insert(tag, Arc::clone(&new)); + new + } + + pub(super) fn record_read_acquired(&self, tag: &'static str, wait_ns: u64) { + self.total.read_acquired.fetch_add(1, Ordering::Relaxed); + self.total + .read_wait_ns_total + .fetch_add(wait_ns, Ordering::Relaxed); + let site = self.site(tag); + site.read_acquired.fetch_add(1, Ordering::Relaxed); + site.read_wait_ns_total + .fetch_add(wait_ns, Ordering::Relaxed); + } + + pub(super) fn record_read_released(&self, tag: &'static str, held_ns: u64) { + self.total + .read_hold_ns_total + .fetch_add(held_ns, Ordering::Relaxed); + let site = self.site(tag); + site.read_hold_ns_total + .fetch_add(held_ns, Ordering::Relaxed); + } + + pub(super) fn record_read_contended(&self, tag: &'static str) { + self.total.read_contended.fetch_add(1, Ordering::Relaxed); + let site = self.site(tag); + site.read_contended.fetch_add(1, Ordering::Relaxed); + } + + pub(super) fn record_write_acquired(&self, tag: &'static str, wait_ns: u64) { + self.total.write_acquired.fetch_add(1, Ordering::Relaxed); + self.total + .write_wait_ns_total + .fetch_add(wait_ns, Ordering::Relaxed); + let site = self.site(tag); + site.write_acquired.fetch_add(1, Ordering::Relaxed); + site.write_wait_ns_total + .fetch_add(wait_ns, Ordering::Relaxed); + } + + pub(super) fn record_write_released(&self, tag: &'static str, held_ns: u64) { + self.total + .write_hold_ns_total + .fetch_add(held_ns, Ordering::Relaxed); + let site = self.site(tag); + site.write_hold_ns_total + .fetch_add(held_ns, Ordering::Relaxed); + } + + pub(super) fn record_write_contended(&self, tag: &'static str) { + self.total.write_contended.fetch_add(1, Ordering::Relaxed); + let site = self.site(tag); + site.write_contended.fetch_add(1, Ordering::Relaxed); + } +} + +/// Live atomic counters for a single bucket (the global "total" plus +/// each per-tag site). +#[derive(Debug)] +struct SiteCounters { + read_acquired: AtomicU64, + write_acquired: AtomicU64, + read_contended: AtomicU64, + write_contended: AtomicU64, + read_wait_ns_total: AtomicU64, + write_wait_ns_total: AtomicU64, + read_hold_ns_total: AtomicU64, + write_hold_ns_total: AtomicU64, +} + +impl SiteCounters { + fn new() -> Self { + Self { + read_acquired: AtomicU64::new(0), + write_acquired: AtomicU64::new(0), + read_contended: AtomicU64::new(0), + write_contended: AtomicU64::new(0), + read_wait_ns_total: AtomicU64::new(0), + write_wait_ns_total: AtomicU64::new(0), + read_hold_ns_total: AtomicU64::new(0), + write_hold_ns_total: AtomicU64::new(0), + } + } + + fn snapshot(&self) -> SiteStats { + SiteStats { + read_acquired: self.read_acquired.load(Ordering::Relaxed), + write_acquired: self.write_acquired.load(Ordering::Relaxed), + read_contended: self.read_contended.load(Ordering::Relaxed), + write_contended: self.write_contended.load(Ordering::Relaxed), + read_wait_ns_total: self.read_wait_ns_total.load(Ordering::Relaxed), + write_wait_ns_total: self.write_wait_ns_total.load(Ordering::Relaxed), + read_hold_ns_total: self.read_hold_ns_total.load(Ordering::Relaxed), + write_hold_ns_total: self.write_hold_ns_total.load(Ordering::Relaxed), + } + } +} + +/// Plain-old-data snapshot of a single bucket (the global total or a +/// single tag). All durations are in nanoseconds; cumulative across +/// every acquisition since the lock was created. +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct SiteStats { + /// Number of times a read guard was successfully acquired. + pub read_acquired: u64, + /// Number of times a write guard was successfully acquired. + pub write_acquired: u64, + /// Number of times a `try_read` returned `Err(TryLockError)`. + pub read_contended: u64, + /// Number of times a `try_write` returned `Err(TryLockError)`. + pub write_contended: u64, + /// Cumulative wait time before read acquisitions resolved, in ns. + pub read_wait_ns_total: u64, + /// Cumulative wait time before write acquisitions resolved, in ns. + pub write_wait_ns_total: u64, + /// Cumulative time read guards were held before drop, in ns. + pub read_hold_ns_total: u64, + /// Cumulative time write guards were held before drop, in ns. + pub write_hold_ns_total: u64, +} + +impl SiteStats { + /// Mean wait time for read acquisitions, in nanoseconds. Returns + /// `None` if no read acquisitions have completed. + pub fn read_wait_ns_mean(&self) -> Option { + if self.read_acquired == 0 { + None + } else { + Some(self.read_wait_ns_total / self.read_acquired) + } + } + + /// Mean wait time for write acquisitions, in nanoseconds. + pub fn write_wait_ns_mean(&self) -> Option { + if self.write_acquired == 0 { + None + } else { + Some(self.write_wait_ns_total / self.write_acquired) + } + } + + /// Mean hold time for read acquisitions, in nanoseconds. + pub fn read_hold_ns_mean(&self) -> Option { + if self.read_acquired == 0 { + None + } else { + Some(self.read_hold_ns_total / self.read_acquired) + } + } + + /// Mean hold time for write acquisitions, in nanoseconds. + pub fn write_hold_ns_mean(&self) -> Option { + if self.write_acquired == 0 { + None + } else { + Some(self.write_hold_ns_total / self.write_acquired) + } + } +} + +/// Snapshot of a [`LockStats`]: aggregate totals plus the per-tag +/// breakdown. Cheap to clone; suitable for shipping through a debug +/// UI or a periodic log line. +#[derive(Debug, Clone, Default)] +pub struct Snapshot { + /// Aggregate counters across every tag (including `UNTAGGED`). + pub total: SiteStats, + /// Per-tag breakdown. Tags that have never been used don't appear. + pub per_tag: BTreeMap<&'static str, SiteStats>, +} diff --git a/packages/rs-platform-wallet/src/diagnostics/mod.rs b/packages/rs-platform-wallet/src/diagnostics/mod.rs new file mode 100644 index 00000000000..e503937a65f --- /dev/null +++ b/packages/rs-platform-wallet/src/diagnostics/mod.rs @@ -0,0 +1,18 @@ +//! Optional runtime diagnostics for platform-wallet. +//! +//! Currently a single submodule: +//! +//! - [`instrumented_lock`] — an `InstrumentedRwLock` newtype wrapping +//! [`tokio::sync::RwLock`] that, when the `lock-stats` Cargo feature +//! is enabled, records per-call-site acquisition counts plus wait / +//! hold durations. With the feature off the wrapper compiles down to +//! the underlying tokio lock (zero added overhead in the hot path). + +pub mod instrumented_lock; + +pub use instrumented_lock::{ + InstrumentedArcExt, InstrumentedRwLock, InstrumentedRwLockExt, ReadGuard, WriteGuard, +}; + +#[cfg(feature = "lock-stats")] +pub use instrumented_lock::{LockStats, SiteStats, Snapshot}; diff --git a/packages/rs-platform-wallet/src/lib.rs b/packages/rs-platform-wallet/src/lib.rs index 93e2d43d1ac..82049eb42bc 100644 --- a/packages/rs-platform-wallet/src/lib.rs +++ b/packages/rs-platform-wallet/src/lib.rs @@ -14,6 +14,7 @@ pub mod broadcaster; pub mod changeset; +pub mod diagnostics; pub mod error; pub mod events; pub mod manager; diff --git a/packages/rs-platform-wallet/src/manager/mod.rs b/packages/rs-platform-wallet/src/manager/mod.rs index 446830cc99d..d96e6e06503 100644 --- a/packages/rs-platform-wallet/src/manager/mod.rs +++ b/packages/rs-platform-wallet/src/manager/mod.rs @@ -13,6 +13,7 @@ use tokio_util::sync::CancellationToken; use key_wallet_manager::WalletManager; use crate::changeset::{spawn_wallet_event_adapter, PlatformWalletPersistence}; +use crate::diagnostics::InstrumentedRwLock; use crate::events::{PlatformEventHandler, PlatformEventManager}; use crate::platform_address_sync::PlatformAddressSyncManager; use crate::spv::SpvRuntime; @@ -27,7 +28,7 @@ use crate::wallet::PlatformWallet; /// [`PlatformEventHandler`]s by reference (no cloning). pub struct PlatformWalletManager { pub(super) sdk: Arc, - pub(super) wallet_manager: Arc>>, + pub(super) wallet_manager: Arc>>, /// Map of registered wallets. Held in an `Arc` so the /// `BalanceUpdateHandler` can hold a clone and look up wallets to /// update their lock-free balance atomics from event-handler @@ -64,7 +65,7 @@ impl PlatformWalletManager

{ // coerce once here and pass clones along instead of re-erasing // at every call site. let dyn_persister: Arc = Arc::clone(&persister) as _; - let wallet_manager = Arc::new(RwLock::new(WalletManager::new(sdk.network))); + let wallet_manager = Arc::new(InstrumentedRwLock::new(WalletManager::new(sdk.network))); let wallets = Arc::new(RwLock::new(std::collections::BTreeMap::new())); let lock_notify = Arc::new(Notify::new()); diff --git a/packages/rs-platform-wallet/src/spv/runtime.rs b/packages/rs-platform-wallet/src/spv/runtime.rs index eecb0e58607..70ba891d030 100644 --- a/packages/rs-platform-wallet/src/spv/runtime.rs +++ b/packages/rs-platform-wallet/src/spv/runtime.rs @@ -15,6 +15,11 @@ use dash_spv::{ClientConfig, DashSpvClient, EventHandler, Hash}; use key_wallet_manager::WalletManager; +// `InstrumentedArcExt` is unused when `lock-stats` is on — the wrapper +// has `raw_arc()` inherent and method resolution prefers it. +// Suppress the warning so the import line stays uniform across modes. +#[allow(unused_imports)] +use crate::diagnostics::{InstrumentedArcExt, InstrumentedRwLock}; use crate::error::PlatformWalletError; use crate::events::PlatformEventManager; use crate::wallet::platform_wallet::PlatformWalletInfo; @@ -28,7 +33,7 @@ type SpvClient = /// handlers by reference (no cloning). pub struct SpvRuntime { event_manager: Arc, - wallet_manager: Arc>>, + wallet_manager: Arc>>, client: RwLock>, /// Cancel token for the `run()` task when it was spawned via /// [`spawn_in_background`]. [`stop`] fires this token and joins @@ -39,7 +44,7 @@ pub struct SpvRuntime { impl SpvRuntime { /// Create a new SPV runtime. pub fn new( - wallet_manager: Arc>>, + wallet_manager: Arc>>, event_manager: Arc, ) -> Self { Self { @@ -73,11 +78,16 @@ impl SpvRuntime { // platform event manager's own handler list). let event_handlers: Vec> = vec![Arc::clone(&self.event_manager) as Arc]; + // Upstream takes `Arc>` literally; hand + // it the inner Arc that lives inside our `InstrumentedRwLock` + // wrapper. SPV's own acquisitions go through this Arc directly + // and are not seen by the wrapper's stats — see the wrapper's + // type-level docs for the rationale. let spv_client = DashSpvClient::new( config, network_manager, storage_manager, - Arc::clone(&self.wallet_manager), + self.wallet_manager.raw_arc(), event_handlers, ) .await diff --git a/packages/rs-platform-wallet/src/wallet/asset_lock/manager.rs b/packages/rs-platform-wallet/src/wallet/asset_lock/manager.rs index 3afc0053612..bcc8f89327b 100644 --- a/packages/rs-platform-wallet/src/wallet/asset_lock/manager.rs +++ b/packages/rs-platform-wallet/src/wallet/asset_lock/manager.rs @@ -6,10 +6,11 @@ use std::sync::Arc; -use tokio::sync::{Notify, RwLock}; +use tokio::sync::Notify; use crate::broadcaster::TransactionBroadcaster; use crate::changeset::changeset::AssetLockChangeSet; +use crate::diagnostics::InstrumentedRwLock; use crate::wallet::persister::WalletPersister; use crate::wallet::platform_wallet::{PlatformWalletInfo, WalletId}; @@ -30,7 +31,7 @@ pub(super) const DEFAULT_FEE_PER_KB: u64 = 1000; pub struct AssetLockManager { pub(super) sdk: Arc, /// The shared wallet manager lock for all mutable wallet state. - pub(super) wallet_manager: Arc>>, + pub(super) wallet_manager: Arc>>, /// Identifies which wallet within the manager this manager operates on. pub(super) wallet_id: WalletId, /// Notified on InstantLock / ChainLock events by SpvEventForwarder. @@ -64,7 +65,7 @@ impl AssetLockManager { /// Create a new `AssetLockManager`. pub(crate) fn new( sdk: Arc, - wallet_manager: Arc>>, + wallet_manager: Arc>>, wallet_id: WalletId, lock_notify: Arc, broadcaster: Arc, diff --git a/packages/rs-platform-wallet/src/wallet/core/wallet.rs b/packages/rs-platform-wallet/src/wallet/core/wallet.rs index 5a29db29002..fe282a69e52 100644 --- a/packages/rs-platform-wallet/src/wallet/core/wallet.rs +++ b/packages/rs-platform-wallet/src/wallet/core/wallet.rs @@ -5,11 +5,11 @@ use std::sync::Arc; use super::balance::WalletBalance; use dashcore::Address as DashAddress; -use tokio::sync::RwLock; use key_wallet_manager::WalletManager; use crate::broadcaster::TransactionBroadcaster; +use crate::diagnostics::InstrumentedRwLock; use crate::error::PlatformWalletError; use crate::wallet::platform_wallet::{PlatformWalletInfo, WalletId}; @@ -24,7 +24,7 @@ use crate::wallet::platform_wallet::{PlatformWalletInfo, WalletId}; /// through a `dyn` vtable. pub struct CoreWallet { pub(crate) sdk: Arc, - pub(crate) wallet_manager: Arc>>, + pub(crate) wallet_manager: Arc>>, pub(crate) wallet_id: WalletId, /// Injected broadcaster — delegates to SPV or DAPI depending on how /// the wallet was constructed by `PlatformWalletManager`. @@ -36,7 +36,7 @@ pub struct CoreWallet { impl CoreWallet { pub(crate) fn new( sdk: Arc, - wallet_manager: Arc>>, + wallet_manager: Arc>>, wallet_id: WalletId, broadcaster: Arc, balance: Arc, diff --git a/packages/rs-platform-wallet/src/wallet/identity/network/identity_handle.rs b/packages/rs-platform-wallet/src/wallet/identity/network/identity_handle.rs index 80619ef8663..a872a94f4d4 100644 --- a/packages/rs-platform-wallet/src/wallet/identity/network/identity_handle.rs +++ b/packages/rs-platform-wallet/src/wallet/identity/network/identity_handle.rs @@ -33,10 +33,10 @@ use key_wallet::dip9::{ use key_wallet::wallet::Wallet; use key_wallet::Network; use key_wallet_manager::WalletManager; -use tokio::sync::RwLock; use zeroize::Zeroizing; use crate::broadcaster::{SpvBroadcaster, TransactionBroadcaster}; +use crate::diagnostics::{InstrumentedRwLock, ReadGuard, WriteGuard}; use crate::error::PlatformWalletError; use crate::wallet::asset_lock::manager::AssetLockManager; use crate::wallet::platform_wallet::{PlatformWalletInfo, WalletId}; @@ -257,7 +257,7 @@ pub(crate) fn derive_identity_auth_key_hash( pub struct IdentityWallet { pub(crate) sdk: Arc, /// Shared wallet manager holding key material and wallet info. - pub(crate) wallet_manager: Arc>>, + pub(crate) wallet_manager: Arc>>, /// Identifier for the wallet within the wallet manager. pub(crate) wallet_id: WalletId, /// Shared asset lock manager for building, broadcasting, and tracking @@ -381,9 +381,7 @@ impl IdentityWallet { /// Access wallet info via `wm.get_wallet_info(&wallet_id)` and key material /// via `wm.get_wallet(&wallet_id)` on the returned guard. The identity /// manager is on the wallet info: `info.identity_manager`. - pub async fn wallet_manager_read( - &self, - ) -> tokio::sync::RwLockReadGuard<'_, WalletManager> { + pub async fn wallet_manager_read(&self) -> ReadGuard<'_, WalletManager> { self.wallet_manager.read().await } @@ -392,9 +390,7 @@ impl IdentityWallet { /// Access wallet info via `wm.get_wallet_info_mut(&wallet_id)` on the /// returned guard. This allows callers to mutate managed identities (e.g. /// adding or updating identities from an external persistence layer). - pub async fn wallet_manager_write( - &self, - ) -> tokio::sync::RwLockWriteGuard<'_, WalletManager> { + pub async fn wallet_manager_write(&self) -> WriteGuard<'_, WalletManager> { self.wallet_manager.write().await } @@ -404,7 +400,7 @@ impl IdentityWallet { /// Useful for synchronous callers that cannot await. pub fn try_wallet_manager_write( &self, - ) -> Option>> { + ) -> Option>> { self.wallet_manager.try_write().ok() } diff --git a/packages/rs-platform-wallet/src/wallet/platform_addresses/provider.rs b/packages/rs-platform-wallet/src/wallet/platform_addresses/provider.rs index 807b549f8a1..72ec4ddf73e 100644 --- a/packages/rs-platform-wallet/src/wallet/platform_addresses/provider.rs +++ b/packages/rs-platform-wallet/src/wallet/platform_addresses/provider.rs @@ -30,12 +30,12 @@ use key_wallet::PlatformP2PKHAddress; use async_trait::async_trait; use key_wallet_manager::WalletManager; +use crate::diagnostics::InstrumentedRwLock; use crate::error::PlatformWalletError; use crate::wallet::platform_wallet::{PlatformWalletInfo, WalletId}; use dash_sdk::platform::address_sync::{ AddressFunds, AddressIndex, AddressProvider, AddressSyncResult, }; -use tokio::sync::RwLock; /// DIP-17 address coordinates used as both the pending-bimap key and /// the SDK sync engine's `Tag`. Having the SDK carry these three @@ -151,7 +151,7 @@ pub(crate) type PerWalletInSyncPlatformAddressState = /// view of pending addresses spanning all wallets. pub(crate) struct PlatformPaymentAddressProvider { /// Shared wallet manager for gap-limit extension. - wallet_manager: Arc>>, + wallet_manager: Arc>>, /// Committed per-wallet tracked state — xpub + `addresses` bimap /// + `found` + `absent` from the last successful sync. `found` /// here is what [`current_balances`](Self::current_balances) @@ -185,7 +185,7 @@ impl PlatformPaymentAddressProvider { /// no key derivation happens here. `wallet_ids` not found in the /// wallet manager are silently skipped. pub(crate) async fn from_wallets( - wallet_manager: Arc>>, + wallet_manager: Arc>>, wallet_ids: impl IntoIterator, ) -> Result { let mut per_wallet: BTreeMap = BTreeMap::new(); @@ -268,7 +268,7 @@ impl PlatformPaymentAddressProvider { /// of sync and the caller needs to reconcile rather than silently /// continue with stale data. pub async fn from_persisted( - wallet_manager: Arc>>, + wallet_manager: Arc>>, per_wallet: BTreeMap, sync_height: u64, sync_timestamp: u64, diff --git a/packages/rs-platform-wallet/src/wallet/platform_addresses/wallet.rs b/packages/rs-platform-wallet/src/wallet/platform_addresses/wallet.rs index 7c618aaf0d5..745319902b4 100644 --- a/packages/rs-platform-wallet/src/wallet/platform_addresses/wallet.rs +++ b/packages/rs-platform-wallet/src/wallet/platform_addresses/wallet.rs @@ -6,6 +6,7 @@ use dpp::address_funds::PlatformAddress; use dpp::fee::Credits; use tokio::sync::RwLock; +use crate::diagnostics::InstrumentedRwLock; use crate::error::PlatformWalletError; use crate::wallet::platform_wallet::{PlatformWalletInfo, WalletId}; use key_wallet_manager::WalletManager; @@ -19,7 +20,7 @@ use super::provider::PlatformPaymentAddressProvider; pub struct PlatformAddressWallet { pub(crate) sdk: Arc, /// The shared wallet manager lock for all mutable wallet state. - pub(crate) wallet_manager: Arc>>, + pub(crate) wallet_manager: Arc>>, /// Identifies which wallet within the manager this sub-wallet operates on. pub(crate) wallet_id: WalletId, /// Single provider covering every platform payment account on the @@ -37,7 +38,7 @@ impl PlatformAddressWallet { /// Call [`initialize`] afterwards to build the unified provider. pub(crate) fn new( sdk: Arc, - wallet_manager: Arc>>, + wallet_manager: Arc>>, wallet_id: WalletId, persister: WalletPersister, ) -> Self { diff --git a/packages/rs-platform-wallet/src/wallet/platform_wallet.rs b/packages/rs-platform-wallet/src/wallet/platform_wallet.rs index 114fb291ec0..ca9f7ff2398 100644 --- a/packages/rs-platform-wallet/src/wallet/platform_wallet.rs +++ b/packages/rs-platform-wallet/src/wallet/platform_wallet.rs @@ -10,7 +10,10 @@ use dpp::prelude::Identifier; use key_wallet::wallet::managed_wallet_info::ManagedWalletInfo; use key_wallet::wallet::Wallet; use key_wallet_manager::WalletManager; -use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; + +use crate::diagnostics::{ + InstrumentedRwLock, ReadGuard as InstrumentedReadGuard, WriteGuard as InstrumentedWriteGuard, +}; use super::asset_lock::manager::AssetLockManager; use super::asset_lock::tracked::TrackedAssetLock; @@ -60,7 +63,7 @@ pub struct PlatformWalletInfo { pub struct PlatformWallet { wallet_id: WalletId, pub(crate) sdk: Arc, - pub(crate) wallet_manager: Arc>>, + pub(crate) wallet_manager: Arc>>, // Sub-wallets that hold a broadcaster are monomorphized with // `SpvBroadcaster` — the only production broadcaster in use. // Swapping this out to another broadcaster is a three-line flip @@ -121,7 +124,7 @@ impl PlatformWallet { } /// Get a reference to the shared wallet manager lock. - pub fn wallet_manager(&self) -> &Arc>> { + pub fn wallet_manager(&self) -> &Arc>> { &self.wallet_manager } @@ -214,7 +217,7 @@ impl PlatformWallet { pub(crate) fn new( sdk: Arc, wallet_id: WalletId, - wallet_manager: Arc>>, + wallet_manager: Arc>>, balance: Arc, lock_notify: Arc, persister: Arc, @@ -424,7 +427,7 @@ impl std::fmt::Debug for PlatformWallet { /// Read guard that locks `WalletManager` and derefs to this wallet's /// `PlatformWalletInfo`. Also provides `.wallet()` for key material access. pub struct WalletStateReadGuard<'a> { - guard: RwLockReadGuard<'a, WalletManager>, + guard: InstrumentedReadGuard<'a, WalletManager>, wallet_id: WalletId, } @@ -449,7 +452,7 @@ impl Deref for WalletStateReadGuard<'_> { /// Write guard that locks `WalletManager` and derefs to this wallet's /// `PlatformWalletInfo` (with `DerefMut`). Also provides `.wallet()`. pub struct WalletStateWriteGuard<'a> { - guard: RwLockWriteGuard<'a, WalletManager>, + guard: InstrumentedWriteGuard<'a, WalletManager>, wallet_id: WalletId, } diff --git a/packages/rs-platform-wallet/src/wallet/tokens/wallet.rs b/packages/rs-platform-wallet/src/wallet/tokens/wallet.rs index 5b8ba807ab8..750b950ab5c 100644 --- a/packages/rs-platform-wallet/src/wallet/tokens/wallet.rs +++ b/packages/rs-platform-wallet/src/wallet/tokens/wallet.rs @@ -18,12 +18,12 @@ use std::sync::Arc; use dpp::balances::credits::TokenAmount; use dpp::prelude::Identifier; -use tokio::sync::RwLock; use dash_sdk::platform::tokens::identity_token_balances::IdentityTokenBalancesQuery; use dash_sdk::platform::FetchMany; use crate::changeset::{Merge, TokenBalanceChangeSet}; +use crate::diagnostics::InstrumentedRwLock; use crate::error::PlatformWalletError; use crate::wallet::platform_wallet::{PlatformWalletInfo, WalletId}; use key_wallet_manager::WalletManager; @@ -40,7 +40,7 @@ type IdentityTokenKey = (Identifier, Identifier); pub struct TokenWallet { pub(crate) sdk: Arc, /// The shared wallet manager lock for all mutable wallet state. - pub(crate) wallet_manager: Arc>>, + pub(crate) wallet_manager: Arc>>, /// Identifies which wallet within the manager this sub-wallet operates on. pub(crate) wallet_id: WalletId, /// Per-wallet persistence handle for queuing changesets. @@ -51,7 +51,7 @@ impl TokenWallet { /// Create a new TokenWallet. pub(crate) fn new( sdk: Arc, - wallet_manager: Arc>>, + wallet_manager: Arc>>, wallet_id: WalletId, persister: crate::wallet::persister::WalletPersister, ) -> Self {