From 952ded1ae795f18f521e9d8864c7ce741351f4c2 Mon Sep 17 00:00:00 2001 From: Quantum Explorer Date: Tue, 28 Apr 2026 18:38:33 +0800 Subject: [PATCH] feat(platform-wallet): instrument wallet_manager RwLock behind lock-stats feature MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds an `InstrumentedRwLock` wrapper around `tokio::sync::RwLock` that, when the new `lock-stats` Cargo feature is enabled, records per-call-site acquisition counts plus wait and hold durations. Off by default — with the feature off the wrapper is a thin newtype whose methods are `#[inline]` delegations and whose guards are zero-overhead. Motivation: SPV holds the `wallet_manager` write lock across the full duration of upstream's `process_block_for_wallets`. Anything platform- side holding a read or write lock on the same mutex blocks SPV. We already audited statically (109 acquisition sites, 52 of them writes) but had no way to measure runtime contention. This wrapper makes that measurable on demand without paying the cost when we're not measuring. Wrapper layout -------------- * `InstrumentedRwLock` holds `inner: Arc>` plus, with `lock-stats` on, an `Arc`. The inner-Arc shape lets us hand a real `Arc>` to upstream APIs that take it literally (e.g. `dash_spv::DashSpvClient::new`) via `raw_arc()`. Acquisitions made through that handed-out Arc bypass the wrapper's stats — the intentional trade is that SPV's own `process_block` write isn't tracked, but every platform-side `wallet_manager.read()` / `.write()` IS. * `read_at("tag") / write_at("tag")` (plus `try_*_at` and `blocking_*_at`) attribute the acquisition to a per-call-site bucket. Untagged calls bucket into `UNTAGGED` so no acquisition vanishes from the snapshot. * `LockStats::snapshot()` produces a `Snapshot { total, per_tag }` of cumulative counters: read/write acquired counts, contention counts (try_* failures), wait_ns_total, hold_ns_total, plus mean helpers (`read_wait_ns_mean`, etc.). Cheap enough to call on every refresh of a debug UI. * Per-tag map lives behind a `parking_lot::Mutex>>` — touched only on the lock acquire / release boundary, never across an `.await`, so a sync mutex is appropriate. * `parking_lot` is an optional dep gated on the feature; nothing pulls it in the default build. Migration & call-site tagging ----------------------------- * All 21 declarations of `Arc>>` across `manager`, `spv`, `wallet`, `changeset` modules switched to `Arc>`. The two structs that named `RwLockReadGuard` / `RwLockWriteGuard` directly (`WalletStateReadGuard` / `WalletStateWriteGuard` and `IdentityWallet::wallet_manager_read` / `_write`) switched to the wrapper guards. * Existing call sites continue to use `lock.read().await` / `.write() .await` — they bucket into `UNTAGGED`. Tagging is opt-in per site. * Two sites tagged as worked examples in `core_bridge::spawn_wallet_event_adapter`: the one-shot `event_adapter::subscribe` read at task start, and the `event_adapter::is_chain_locked` per-event probe that the audit flagged as the primary new contention point introduced by the event-bus migration. Tests ----- 5 unit tests in `diagnostics::instrumented_lock::tests` cover the no-stats path (read/write smoke, contended try_read) and the stats path (per-tag attribution, untagged bucket routing, contention counter increments). Verified: `cargo check --workspace` clean, `cargo check -p platform-wallet --features lock-stats` clean, `cargo test -p platform-wallet --features lock-stats --lib diagnostics` passes 5/5, `cargo fmt --all -- --check` clean. No FFI surface in this PR — `LockStats::snapshot()` is Rust-only for the first cut. Adding an FFI accessor is the natural next step once we know what the iOS debug UI wants from the snapshot shape. Co-Authored-By: Claude Opus 4.7 (1M context) --- Cargo.lock | 1 + packages/rs-platform-wallet/Cargo.toml | 15 + .../src/changeset/core_bridge.rs | 28 +- .../src/diagnostics/instrumented_lock/mod.rs | 594 ++++++++++++++++++ .../diagnostics/instrumented_lock/stats.rs | 238 +++++++ .../rs-platform-wallet/src/diagnostics/mod.rs | 18 + packages/rs-platform-wallet/src/lib.rs | 1 + .../rs-platform-wallet/src/manager/mod.rs | 5 +- .../rs-platform-wallet/src/spv/runtime.rs | 16 +- .../src/wallet/asset_lock/manager.rs | 7 +- .../src/wallet/core/wallet.rs | 6 +- .../identity/network/identity_handle.rs | 14 +- .../src/wallet/platform_addresses/provider.rs | 8 +- .../src/wallet/platform_addresses/wallet.rs | 5 +- .../src/wallet/platform_wallet.rs | 15 +- .../src/wallet/tokens/wallet.rs | 6 +- 16 files changed, 936 insertions(+), 41 deletions(-) create mode 100644 packages/rs-platform-wallet/src/diagnostics/instrumented_lock/mod.rs create mode 100644 packages/rs-platform-wallet/src/diagnostics/instrumented_lock/stats.rs create mode 100644 packages/rs-platform-wallet/src/diagnostics/mod.rs diff --git a/Cargo.lock b/Cargo.lock index cad6c654044..b12afa8bd93 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4864,6 +4864,7 @@ dependencies = [ "image", "key-wallet", "key-wallet-manager", + "parking_lot", "platform-encryption", "rand 0.8.5", "serde_json", diff --git a/packages/rs-platform-wallet/Cargo.toml b/packages/rs-platform-wallet/Cargo.toml index 71e0e0e9bc8..dcacd15b594 100644 --- a/packages/rs-platform-wallet/Cargo.toml +++ b/packages/rs-platform-wallet/Cargo.toml @@ -47,6 +47,12 @@ image = { version = "0.25", default-features = false, features = ["png", "jpeg", # Security zeroize = "1" +# Sync primitives used by the optional `lock-stats` feature for the +# per-tag breakdown map. Plain `parking_lot::Mutex` (sync, fast) is the +# right shape here — the per-tag map is touched on the lock acquire / +# release path, never across an `.await`. +parking_lot = { version = "0.12", optional = true } + # Shielded pool (optional, behind `shielded` feature) grovedb-commitment-tree = { git = "https://github.com/dashpay/grovedb", rev = "8f25b20d04bfc0e8bdfb3870676d647a0d74918b", optional = true } zip32 = { version = "0.2.0", default-features = false, optional = true } @@ -62,3 +68,12 @@ default = ["bls", "eddsa"] bls = ["key-wallet/bls", "key-wallet-manager/bls"] eddsa = ["key-wallet/eddsa", "key-wallet-manager/eddsa"] shielded = ["dep:grovedb-commitment-tree", "dep:zip32", "dash-sdk/shielded", "dpp/shielded-client"] + +# Off by default. When enabled, the per-wallet `wallet_manager` RwLock is +# wrapped with an `InstrumentedRwLock` that records acquisition counts, +# wait time, and hold time per call site (using `read_at("tag")` / +# `write_at("tag")`). With the feature off the wrapper is a transparent +# type alias for `tokio::sync::RwLock` and there is zero runtime cost. +# See `crate::diagnostics::instrumented_lock` for the API and +# `LockStats` for the snapshot shape. +lock-stats = ["dep:parking_lot"] diff --git a/packages/rs-platform-wallet/src/changeset/core_bridge.rs b/packages/rs-platform-wallet/src/changeset/core_bridge.rs index ddfcfe956e0..de25476a45e 100644 --- a/packages/rs-platform-wallet/src/changeset/core_bridge.rs +++ b/packages/rs-platform-wallet/src/changeset/core_bridge.rs @@ -29,12 +29,17 @@ use key_wallet::transaction_checking::TransactionContext; use key_wallet::Utxo; use key_wallet_manager::{WalletEvent, WalletId, WalletManager}; use tokio::sync::broadcast::error::RecvError; -use tokio::sync::RwLock; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use crate::changeset::changeset::{CoreChangeSet, PlatformWalletChangeSet}; use crate::changeset::traits::PlatformWalletPersistence; +// `InstrumentedRwLockExt` is unused when `lock-stats` is on — the +// wrapper has the same methods inherent and method resolution prefers +// inherent over trait. Suppress the warning so the call sites can +// import the trait unconditionally. +#[allow(unused_imports)] +use crate::diagnostics::{InstrumentedRwLock, InstrumentedRwLockExt}; use crate::wallet::platform_wallet::PlatformWalletInfo; /// Spawn the wallet-event subscriber task. @@ -45,13 +50,17 @@ use crate::wallet::platform_wallet::PlatformWalletInfo; /// [`PlatformWalletPersistence::store`]. Exits when `cancel` fires /// or the upstream broadcast channel closes. pub fn spawn_wallet_event_adapter( - wallet_manager: Arc>>, + wallet_manager: Arc>>, persister: Arc, cancel: CancellationToken, ) -> JoinHandle<()> { tokio::spawn(async move { let mut receiver = { - let guard = wallet_manager.read().await; + // Subscribe-time read; happens once at task start. Tagged + // so the `lock-stats` feature can confirm the one-shot + // nature of this acquisition vs. the per-event probe in + // `is_chain_locked`. + let guard = wallet_manager.read_at("event_adapter::subscribe").await; guard.subscribe_events() }; tracing::debug!("WalletEventAdapter task started"); @@ -108,7 +117,7 @@ pub fn spawn_wallet_event_adapter( /// Project an upstream [`WalletEvent`] into a [`CoreChangeSet`] suitable /// for atomic persistence. async fn build_core_changeset( - wallet_manager: &Arc>>, + wallet_manager: &Arc>>, event: &WalletEvent, ) -> CoreChangeSet { match event { @@ -172,11 +181,18 @@ async fn build_core_changeset( /// Returns `true` when the wallet's stored record for `txid` is in a /// chain-locked block. Used to gate IS-lock projection. async fn is_chain_locked( - wallet_manager: &Arc>>, + wallet_manager: &Arc>>, wallet_id: &WalletId, txid: &dashcore::Txid, ) -> bool { - let guard = wallet_manager.read().await; + // Tagged so the `lock-stats` feature can attribute this site's + // contribution to wallet-manager contention. The event adapter + // touches this lock once per `TransactionInstantLocked` event; + // tagging lets a perf audit distinguish the IS-lock finality + // probe from generic identity / token / address-sync reads. + let guard = wallet_manager + .read_at("event_adapter::is_chain_locked") + .await; let Some(info) = guard.get_wallet_info(wallet_id) else { return false; }; diff --git a/packages/rs-platform-wallet/src/diagnostics/instrumented_lock/mod.rs b/packages/rs-platform-wallet/src/diagnostics/instrumented_lock/mod.rs new file mode 100644 index 00000000000..94414cacf02 --- /dev/null +++ b/packages/rs-platform-wallet/src/diagnostics/instrumented_lock/mod.rs @@ -0,0 +1,594 @@ +//! `InstrumentedRwLock` — opt-in instrumented wrapper around +//! [`tokio::sync::RwLock`]. +//! +//! # Build modes +//! +//! - **`lock-stats` feature OFF (default)** — `InstrumentedRwLock` is +//! a literal type alias for [`tokio::sync::RwLock`]. There is no +//! wrapper struct, no extra `Arc`, and no `Drop` glue. The tagged +//! methods (`read_at` / `write_at` / `try_*_at` / `blocking_*_at`) +//! plus `raw_arc` are provided by zero-cost extension traits +//! ([`InstrumentedRwLockExt`] and [`InstrumentedArcExt`]) whose +//! methods are `#[inline]` and drop the tag at the call site. After +//! inlining, every call collapses to the equivalent inherent +//! [`tokio::sync::RwLock`] method. +//! +//! - **`lock-stats` feature ON** — `InstrumentedRwLock` is a +//! wrapper struct holding `Arc>` plus +//! `Arc<`[`LockStats`]`>`. Each acquisition records the wait time +//! (until the guard resolves) and the hold time (until the guard +//! drops). The tagged methods bucket the acquisition under the +//! given tag; untagged calls bucket into [`UNTAGGED`]. +//! +//! # Why an inner Arc when the feature is on +//! +//! The wrapper has to hand `Arc>` to APIs that +//! take that concrete type literally (e.g. `dash_spv::DashSpvClient::new` +//! which takes `Arc>`). With the feature off the wallet-manager +//! field IS `Arc>`, so [`InstrumentedArcExt::raw_arc`] +//! reduces to `Arc::clone(self)`. With the feature on the wrapper holds +//! its tokio lock as `Arc>` internally so the same +//! `raw_arc()` call extracts the inner Arc. SPV's own acquisitions go +//! through that inner Arc directly and are NOT seen by the wrapper's +//! stats — the intentional trade is that platform-side acquisitions +//! (everything that goes through `wallet_manager.read()` / +//! `wallet_manager.read_at("…")`) are counted, while upstream's own +//! `process_block` write isn't. That's the right shape for "what does +//! platform-wallet contribute to lock pressure?", which is the question +//! this layer was added to answer. +//! +//! # Tagged call sites +//! +//! Untagged calls (`lock.read().await`) bucket into [`UNTAGGED`] — +//! a useful aggregate but not actionable when you're trying to find +//! the specific code path serializing the lock. Tagging individual +//! sites (`lock.read_at("event_adapter::is_chain_locked").await`) is +//! the path to actionable contention numbers. The tag is `&'static str` +//! so it doesn't allocate on the hot path; with the feature off the +//! tag is dropped at the `read_at` boundary and the call collapses +//! into a plain `read().await`. +//! +//! # Snapshot shape +//! +//! With `lock-stats` enabled, calling [`InstrumentedRwLock::stats`] +//! hands back the shared `Arc`. From there +//! [`LockStats::snapshot`] produces a [`Snapshot`] containing the +//! global counters and the per-tag breakdown — clone the snapshot +//! wherever you need to print or log it (e.g. an FFI accessor or a +//! periodic `tracing::info!`). + +#![allow(unused_imports)] // some imports are only used under one cfg branch + +use std::future::Future; +use std::sync::Arc; + +use tokio::sync::{ + RwLock as TokioRwLock, RwLockReadGuard as TokioReadGuard, RwLockWriteGuard as TokioWriteGuard, + TryLockError, +}; + +#[cfg(feature = "lock-stats")] +mod stats; + +#[cfg(feature = "lock-stats")] +pub use stats::{LockStats, SiteStats, Snapshot}; + +#[cfg(feature = "lock-stats")] +use std::time::Instant; + +/// The default tag attributed to acquisitions made through the +/// un-suffixed methods (`read`, `write`, `try_read`, …). Visible in +/// [`LockStats`] snapshots so it's clear which acquisitions came +/// from un-tagged sites. +pub const UNTAGGED: &str = "untagged"; + +// --------------------------------------------------------------------------- +// Extension traits +// +// Defined unconditionally so call sites import them once and stay agnostic +// to the feature flag. The impls differ per cfg branch — see below. +// --------------------------------------------------------------------------- + +/// Tagged-acquisition methods on a `RwLock`-shaped lock. +/// +/// In feature-off mode the impl forwards to the corresponding tokio +/// inherent method and drops the tag. In feature-on mode the impl +/// forwards to the wrapper's inherent method, which records the +/// acquisition under `tag`. +pub trait InstrumentedRwLockExt { + /// Acquire a shared lock, attributing the acquisition to `tag` + /// (when `lock-stats` is enabled). + fn read_at(&self, tag: &'static str) -> impl Future> + Send; + /// Acquire an exclusive lock, attributing the acquisition to `tag`. + fn write_at(&self, tag: &'static str) -> impl Future> + Send; + /// Try to acquire a shared lock without waiting. + fn try_read_at(&self, tag: &'static str) -> Result, TryLockError>; + /// Try to acquire an exclusive lock without waiting. + fn try_write_at(&self, tag: &'static str) -> Result, TryLockError>; + /// Synchronously acquire a shared lock — must NOT be called from a + /// tokio runtime thread (will panic). + fn blocking_read_at(&self, tag: &'static str) -> ReadGuard<'_, T>; + /// Synchronously acquire an exclusive lock — must NOT be called + /// from a tokio runtime thread. + fn blocking_write_at(&self, tag: &'static str) -> WriteGuard<'_, T>; +} + +/// `raw_arc()` extension on a shared handle to the lock. Returns the +/// `Arc>` shape that external APIs take literally +/// (e.g. `dash_spv::DashSpvClient::new`). With the feature off the +/// handle IS the tokio Arc, so this is `Arc::clone(self)`. With the +/// feature on the handle is `Arc` and this extracts the +/// wrapper's inner Arc. +pub trait InstrumentedArcExt { + /// Cheap clone of the underlying `Arc>`. + /// Acquisitions made through the returned Arc bypass the wrapper's + /// stats — see the module-level docs for the rationale. + fn raw_arc(&self) -> Arc>; +} + +// --------------------------------------------------------------------------- +// Feature OFF: type aliases — the wrapper IS tokio's RwLock. +// --------------------------------------------------------------------------- + +#[cfg(not(feature = "lock-stats"))] +mod alias_mode { + use super::*; + + /// Type alias for [`tokio::sync::RwLock`] when `lock-stats` is + /// off. No wrapper struct, no extra `Arc`, no per-method + /// instrumentation cost. + pub type InstrumentedRwLock = TokioRwLock; + + /// Type alias for [`tokio::sync::RwLockReadGuard<'a, T>`] when + /// `lock-stats` is off. + pub type ReadGuard<'a, T> = TokioReadGuard<'a, T>; + + /// Type alias for [`tokio::sync::RwLockWriteGuard<'a, T>`] when + /// `lock-stats` is off. + pub type WriteGuard<'a, T> = TokioWriteGuard<'a, T>; + + impl InstrumentedRwLockExt for TokioRwLock { + #[inline] + fn read_at(&self, _tag: &'static str) -> impl Future> + Send { + self.read() + } + + #[inline] + fn write_at(&self, _tag: &'static str) -> impl Future> + Send { + self.write() + } + + #[inline] + fn try_read_at(&self, _tag: &'static str) -> Result, TryLockError> { + self.try_read() + } + + #[inline] + fn try_write_at(&self, _tag: &'static str) -> Result, TryLockError> { + self.try_write() + } + + #[inline] + fn blocking_read_at(&self, _tag: &'static str) -> ReadGuard<'_, T> { + self.blocking_read() + } + + #[inline] + fn blocking_write_at(&self, _tag: &'static str) -> WriteGuard<'_, T> { + self.blocking_write() + } + } + + impl InstrumentedArcExt for Arc> { + #[inline] + fn raw_arc(&self) -> Arc> { + Arc::clone(self) + } + } +} + +#[cfg(not(feature = "lock-stats"))] +pub use alias_mode::{InstrumentedRwLock, ReadGuard, WriteGuard}; + +// --------------------------------------------------------------------------- +// Feature ON: full wrapper struct with stats. +// --------------------------------------------------------------------------- + +#[cfg(feature = "lock-stats")] +mod struct_mode { + use super::*; + use std::ops::{Deref, DerefMut}; + + /// Wrapper around [`tokio::sync::RwLock`] that records + /// per-call-site acquisition counts plus wait and hold durations. + /// See the module-level docs. + pub struct InstrumentedRwLock { + inner: Arc>, + stats: Arc, + } + + impl InstrumentedRwLock { + /// Construct a new lock holding `value`. + pub fn new(value: T) -> Self { + Self { + inner: Arc::new(TokioRwLock::new(value)), + stats: Arc::new(LockStats::new()), + } + } + + /// Borrow the wrapped tokio lock. Use only for APIs that + /// genuinely need a `&TokioRwLock`; prefer the wrapper's + /// own methods so acquisitions stay attributed. + #[inline] + pub fn raw(&self) -> &TokioRwLock { + &self.inner + } + + /// Cheap clone of the inner `Arc>`. See the + /// [`InstrumentedArcExt::raw_arc`] doc for the trade. + #[inline] + pub fn raw_arc(&self) -> Arc> { + Arc::clone(&self.inner) + } + + /// Shared handle to the per-lock stats snapshot store. + #[inline] + pub fn stats(&self) -> Arc { + Arc::clone(&self.stats) + } + + /// Acquire a shared lock — buckets into [`UNTAGGED`]. + #[inline] + pub async fn read(&self) -> ReadGuard<'_, T> { + self.read_at(UNTAGGED).await + } + + /// Acquire an exclusive lock — buckets into [`UNTAGGED`]. + #[inline] + pub async fn write(&self) -> WriteGuard<'_, T> { + self.write_at(UNTAGGED).await + } + + /// Acquire a shared lock with a per-call-site tag. + pub async fn read_at(&self, tag: &'static str) -> ReadGuard<'_, T> { + let wait_start = Instant::now(); + let inner = self.inner.read().await; + let wait_ns = wait_start.elapsed().as_nanos() as u64; + self.stats.record_read_acquired(tag, wait_ns); + ReadGuard { + inner, + stats: Arc::clone(&self.stats), + tag, + acquired_at: Instant::now(), + } + } + + /// Acquire an exclusive lock with a per-call-site tag. + pub async fn write_at(&self, tag: &'static str) -> WriteGuard<'_, T> { + let wait_start = Instant::now(); + let inner = self.inner.write().await; + let wait_ns = wait_start.elapsed().as_nanos() as u64; + self.stats.record_write_acquired(tag, wait_ns); + WriteGuard { + inner, + stats: Arc::clone(&self.stats), + tag, + acquired_at: Instant::now(), + } + } + + /// Try to acquire a shared lock without waiting. + #[inline] + pub fn try_read(&self) -> Result, TryLockError> { + self.try_read_at(UNTAGGED) + } + + /// Try to acquire an exclusive lock without waiting. + #[inline] + pub fn try_write(&self) -> Result, TryLockError> { + self.try_write_at(UNTAGGED) + } + + /// Tagged variant of [`try_read`](Self::try_read). + pub fn try_read_at(&self, tag: &'static str) -> Result, TryLockError> { + match self.inner.try_read() { + Ok(inner) => { + self.stats.record_read_acquired(tag, 0); + Ok(ReadGuard { + inner, + stats: Arc::clone(&self.stats), + tag, + acquired_at: Instant::now(), + }) + } + Err(e) => { + self.stats.record_read_contended(tag); + Err(e) + } + } + } + + /// Tagged variant of [`try_write`](Self::try_write). + pub fn try_write_at(&self, tag: &'static str) -> Result, TryLockError> { + match self.inner.try_write() { + Ok(inner) => { + self.stats.record_write_acquired(tag, 0); + Ok(WriteGuard { + inner, + stats: Arc::clone(&self.stats), + tag, + acquired_at: Instant::now(), + }) + } + Err(e) => { + self.stats.record_write_contended(tag); + Err(e) + } + } + } + + /// Synchronously acquire a shared lock — must NOT be called from + /// a tokio runtime thread. + #[inline] + pub fn blocking_read(&self) -> ReadGuard<'_, T> { + self.blocking_read_at(UNTAGGED) + } + + /// Synchronously acquire an exclusive lock — must NOT be called + /// from a tokio runtime thread. + #[inline] + pub fn blocking_write(&self) -> WriteGuard<'_, T> { + self.blocking_write_at(UNTAGGED) + } + + /// Tagged variant of [`blocking_read`](Self::blocking_read). + pub fn blocking_read_at(&self, tag: &'static str) -> ReadGuard<'_, T> { + let wait_start = Instant::now(); + let inner = self.inner.blocking_read(); + let wait_ns = wait_start.elapsed().as_nanos() as u64; + self.stats.record_read_acquired(tag, wait_ns); + ReadGuard { + inner, + stats: Arc::clone(&self.stats), + tag, + acquired_at: Instant::now(), + } + } + + /// Tagged variant of [`blocking_write`](Self::blocking_write). + pub fn blocking_write_at(&self, tag: &'static str) -> WriteGuard<'_, T> { + let wait_start = Instant::now(); + let inner = self.inner.blocking_write(); + let wait_ns = wait_start.elapsed().as_nanos() as u64; + self.stats.record_write_acquired(tag, wait_ns); + WriteGuard { + inner, + stats: Arc::clone(&self.stats), + tag, + acquired_at: Instant::now(), + } + } + } + + impl Default for InstrumentedRwLock { + fn default() -> Self { + Self::new(T::default()) + } + } + + impl std::fmt::Debug for InstrumentedRwLock { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("InstrumentedRwLock") + .field("inner", &self.inner) + .finish() + } + } + + /// Shared read guard. `Deref`. Records hold time on + /// `Drop`. + pub struct ReadGuard<'a, T> { + inner: TokioReadGuard<'a, T>, + stats: Arc, + tag: &'static str, + acquired_at: Instant, + } + + impl Deref for ReadGuard<'_, T> { + type Target = T; + + #[inline] + fn deref(&self) -> &T { + &self.inner + } + } + + impl Drop for ReadGuard<'_, T> { + fn drop(&mut self) { + let held_ns = self.acquired_at.elapsed().as_nanos() as u64; + self.stats.record_read_released(self.tag, held_ns); + } + } + + /// Exclusive write guard. `Deref` + `DerefMut`. Records + /// hold time on `Drop`. + pub struct WriteGuard<'a, T> { + inner: TokioWriteGuard<'a, T>, + stats: Arc, + tag: &'static str, + acquired_at: Instant, + } + + impl Deref for WriteGuard<'_, T> { + type Target = T; + + #[inline] + fn deref(&self) -> &T { + &self.inner + } + } + + impl DerefMut for WriteGuard<'_, T> { + #[inline] + fn deref_mut(&mut self) -> &mut T { + &mut self.inner + } + } + + impl Drop for WriteGuard<'_, T> { + fn drop(&mut self) { + let held_ns = self.acquired_at.elapsed().as_nanos() as u64; + self.stats.record_write_released(self.tag, held_ns); + } + } + + // Trait impls so feature-agnostic call sites that import the Ext + // traits keep working. Method resolution prefers inherent methods + // when they exist, so these are mostly redundant in feature-on + // mode — they're here so a generic helper bounded by + // `InstrumentedRwLockExt` can take the wrapper just as easily as + // it can take a raw `TokioRwLock`. + impl InstrumentedRwLockExt for InstrumentedRwLock { + #[inline] + fn read_at(&self, tag: &'static str) -> impl Future> + Send { + InstrumentedRwLock::read_at(self, tag) + } + + #[inline] + fn write_at(&self, tag: &'static str) -> impl Future> + Send { + InstrumentedRwLock::write_at(self, tag) + } + + #[inline] + fn try_read_at(&self, tag: &'static str) -> Result, TryLockError> { + InstrumentedRwLock::try_read_at(self, tag) + } + + #[inline] + fn try_write_at(&self, tag: &'static str) -> Result, TryLockError> { + InstrumentedRwLock::try_write_at(self, tag) + } + + #[inline] + fn blocking_read_at(&self, tag: &'static str) -> ReadGuard<'_, T> { + InstrumentedRwLock::blocking_read_at(self, tag) + } + + #[inline] + fn blocking_write_at(&self, tag: &'static str) -> WriteGuard<'_, T> { + InstrumentedRwLock::blocking_write_at(self, tag) + } + } + + impl InstrumentedArcExt for Arc> { + #[inline] + fn raw_arc(&self) -> Arc> { + InstrumentedRwLock::raw_arc(self) + } + } +} + +#[cfg(feature = "lock-stats")] +pub use struct_mode::{InstrumentedRwLock, ReadGuard, WriteGuard}; + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn read_write_smoke() { + let lock = InstrumentedRwLock::new(0u32); + { + let guard = lock.read().await; + assert_eq!(*guard, 0); + } + { + let mut guard = lock.write().await; + *guard = 42; + } + let guard = lock.read().await; + assert_eq!(*guard, 42); + } + + #[tokio::test] + async fn try_read_contended() { + let lock = InstrumentedRwLock::new(0u32); + let _w = lock.write().await; + assert!(lock.try_read().is_err()); + } + + #[tokio::test] + async fn read_at_smoke() { + // Tagged calls work in both feature modes — feature-off via the + // `InstrumentedRwLockExt` trait, feature-on via inherent methods. + let lock = InstrumentedRwLock::new(0u32); + let guard = lock.read_at("test::tag").await; + assert_eq!(*guard, 0); + } + + #[tokio::test] + async fn raw_arc_smoke() { + // `raw_arc()` works in both modes — feature-off via the + // `InstrumentedArcExt` trait on `Arc`, feature-on + // via the wrapper's inherent method. + let lock = Arc::new(InstrumentedRwLock::new(0u32)); + let raw: Arc> = lock.raw_arc(); + let guard = raw.read().await; + assert_eq!(*guard, 0); + } + + #[cfg(feature = "lock-stats")] + #[tokio::test] + async fn stats_count_and_attribute_to_tag() { + let lock = InstrumentedRwLock::new(0u32); + + // Two reads tagged "ours", one write tagged "theirs". + { + let _r1 = lock.read_at("ours").await; + let _r2 = lock.read_at("ours").await; + } + { + let _w = lock.write_at("theirs").await; + } + + let snap = lock.stats().snapshot(); + let ours = snap.per_tag.get("ours").expect("ours tag present"); + assert_eq!(ours.read_acquired, 2); + assert_eq!(ours.write_acquired, 0); + let theirs = snap.per_tag.get("theirs").expect("theirs tag present"); + assert_eq!(theirs.read_acquired, 0); + assert_eq!(theirs.write_acquired, 1); + assert_eq!(snap.total.read_acquired, 2); + assert_eq!(snap.total.write_acquired, 1); + } + + // Untagged calls go to the UNTAGGED bucket so the snapshot still + // accounts for them — we don't want acquisitions to vanish. + #[cfg(feature = "lock-stats")] + #[tokio::test] + async fn untagged_calls_go_to_untagged_bucket() { + let lock = InstrumentedRwLock::new(0u32); + { + let _r = lock.read().await; + } + let snap = lock.stats().snapshot(); + let untagged = snap.per_tag.get(UNTAGGED).expect("UNTAGGED bucket"); + assert_eq!(untagged.read_acquired, 1); + } + + #[cfg(feature = "lock-stats")] + #[tokio::test] + async fn try_read_failure_records_contention() { + let lock = InstrumentedRwLock::new(0u32); + let _w = lock.write_at("holder").await; + let r = lock.try_read_at("contender"); + assert!(r.is_err()); + let snap = lock.stats().snapshot(); + let contender = snap.per_tag.get("contender").expect("contender tag"); + assert_eq!(contender.read_contended, 1); + assert_eq!(contender.read_acquired, 0); + } +} diff --git a/packages/rs-platform-wallet/src/diagnostics/instrumented_lock/stats.rs b/packages/rs-platform-wallet/src/diagnostics/instrumented_lock/stats.rs new file mode 100644 index 00000000000..d6f5a8dae17 --- /dev/null +++ b/packages/rs-platform-wallet/src/diagnostics/instrumented_lock/stats.rs @@ -0,0 +1,238 @@ +//! Stats counters for [`super::InstrumentedRwLock`]. Only compiled when +//! the `lock-stats` Cargo feature is enabled. +//! +//! # Storage shape +//! +//! - **Total counters** — atomic `u64`s, lock-free, bumped from every +//! acquire / release path. Cheap enough that the bump is in the +//! noise even under heavy contention. +//! - **Per-tag breakdown** — `BTreeMap<&'static str, SiteStats>` behind +//! a `parking_lot::Mutex`. The map is touched only on the lock +//! acquire / release boundary (never across an `.await`), so a sync +//! mutex is appropriate. The acquire path holds the mutex just long +//! enough to look up or insert the entry, then bumps the entry's +//! atomics outside the lock. New tags are inserted lazily on first +//! use; existing tags re-use the entry. + +use std::collections::BTreeMap; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; + +use parking_lot::Mutex; + +/// Aggregate lock-acquisition counters maintained by an +/// [`InstrumentedRwLock`](super::InstrumentedRwLock). +/// +/// Use [`LockStats::snapshot`] to clone out a readable [`Snapshot`]. +/// The live [`LockStats`] keeps its counters as atomics so reads via +/// `snapshot` don't race with writers. +#[derive(Debug)] +pub struct LockStats { + total: SiteCounters, + per_tag: Mutex>>, +} + +impl LockStats { + pub(super) fn new() -> Self { + Self { + total: SiteCounters::new(), + per_tag: Mutex::new(BTreeMap::new()), + } + } + + /// Take a snapshot of the current counters. Cheap enough to call + /// from a debug UI on every refresh; a periodic logger could call + /// it on a 1-second timer without measurable overhead. + pub fn snapshot(&self) -> Snapshot { + let per_tag: BTreeMap<&'static str, SiteStats> = self + .per_tag + .lock() + .iter() + .map(|(tag, counters)| (*tag, counters.snapshot())) + .collect(); + Snapshot { + total: self.total.snapshot(), + per_tag, + } + } + + /// Look up (or insert on first use) the [`SiteCounters`] for a tag. + /// Holds the per-tag mutex only for the lookup; the returned `Arc` + /// lets the caller bump atomics outside the mutex. + fn site(&self, tag: &'static str) -> Arc { + let mut guard = self.per_tag.lock(); + if let Some(existing) = guard.get(tag) { + return Arc::clone(existing); + } + let new = Arc::new(SiteCounters::new()); + guard.insert(tag, Arc::clone(&new)); + new + } + + pub(super) fn record_read_acquired(&self, tag: &'static str, wait_ns: u64) { + self.total.read_acquired.fetch_add(1, Ordering::Relaxed); + self.total + .read_wait_ns_total + .fetch_add(wait_ns, Ordering::Relaxed); + let site = self.site(tag); + site.read_acquired.fetch_add(1, Ordering::Relaxed); + site.read_wait_ns_total + .fetch_add(wait_ns, Ordering::Relaxed); + } + + pub(super) fn record_read_released(&self, tag: &'static str, held_ns: u64) { + self.total + .read_hold_ns_total + .fetch_add(held_ns, Ordering::Relaxed); + let site = self.site(tag); + site.read_hold_ns_total + .fetch_add(held_ns, Ordering::Relaxed); + } + + pub(super) fn record_read_contended(&self, tag: &'static str) { + self.total.read_contended.fetch_add(1, Ordering::Relaxed); + let site = self.site(tag); + site.read_contended.fetch_add(1, Ordering::Relaxed); + } + + pub(super) fn record_write_acquired(&self, tag: &'static str, wait_ns: u64) { + self.total.write_acquired.fetch_add(1, Ordering::Relaxed); + self.total + .write_wait_ns_total + .fetch_add(wait_ns, Ordering::Relaxed); + let site = self.site(tag); + site.write_acquired.fetch_add(1, Ordering::Relaxed); + site.write_wait_ns_total + .fetch_add(wait_ns, Ordering::Relaxed); + } + + pub(super) fn record_write_released(&self, tag: &'static str, held_ns: u64) { + self.total + .write_hold_ns_total + .fetch_add(held_ns, Ordering::Relaxed); + let site = self.site(tag); + site.write_hold_ns_total + .fetch_add(held_ns, Ordering::Relaxed); + } + + pub(super) fn record_write_contended(&self, tag: &'static str) { + self.total.write_contended.fetch_add(1, Ordering::Relaxed); + let site = self.site(tag); + site.write_contended.fetch_add(1, Ordering::Relaxed); + } +} + +/// Live atomic counters for a single bucket (the global "total" plus +/// each per-tag site). +#[derive(Debug)] +struct SiteCounters { + read_acquired: AtomicU64, + write_acquired: AtomicU64, + read_contended: AtomicU64, + write_contended: AtomicU64, + read_wait_ns_total: AtomicU64, + write_wait_ns_total: AtomicU64, + read_hold_ns_total: AtomicU64, + write_hold_ns_total: AtomicU64, +} + +impl SiteCounters { + fn new() -> Self { + Self { + read_acquired: AtomicU64::new(0), + write_acquired: AtomicU64::new(0), + read_contended: AtomicU64::new(0), + write_contended: AtomicU64::new(0), + read_wait_ns_total: AtomicU64::new(0), + write_wait_ns_total: AtomicU64::new(0), + read_hold_ns_total: AtomicU64::new(0), + write_hold_ns_total: AtomicU64::new(0), + } + } + + fn snapshot(&self) -> SiteStats { + SiteStats { + read_acquired: self.read_acquired.load(Ordering::Relaxed), + write_acquired: self.write_acquired.load(Ordering::Relaxed), + read_contended: self.read_contended.load(Ordering::Relaxed), + write_contended: self.write_contended.load(Ordering::Relaxed), + read_wait_ns_total: self.read_wait_ns_total.load(Ordering::Relaxed), + write_wait_ns_total: self.write_wait_ns_total.load(Ordering::Relaxed), + read_hold_ns_total: self.read_hold_ns_total.load(Ordering::Relaxed), + write_hold_ns_total: self.write_hold_ns_total.load(Ordering::Relaxed), + } + } +} + +/// Plain-old-data snapshot of a single bucket (the global total or a +/// single tag). All durations are in nanoseconds; cumulative across +/// every acquisition since the lock was created. +#[derive(Debug, Clone, Default, PartialEq, Eq)] +pub struct SiteStats { + /// Number of times a read guard was successfully acquired. + pub read_acquired: u64, + /// Number of times a write guard was successfully acquired. + pub write_acquired: u64, + /// Number of times a `try_read` returned `Err(TryLockError)`. + pub read_contended: u64, + /// Number of times a `try_write` returned `Err(TryLockError)`. + pub write_contended: u64, + /// Cumulative wait time before read acquisitions resolved, in ns. + pub read_wait_ns_total: u64, + /// Cumulative wait time before write acquisitions resolved, in ns. + pub write_wait_ns_total: u64, + /// Cumulative time read guards were held before drop, in ns. + pub read_hold_ns_total: u64, + /// Cumulative time write guards were held before drop, in ns. + pub write_hold_ns_total: u64, +} + +impl SiteStats { + /// Mean wait time for read acquisitions, in nanoseconds. Returns + /// `None` if no read acquisitions have completed. + pub fn read_wait_ns_mean(&self) -> Option { + if self.read_acquired == 0 { + None + } else { + Some(self.read_wait_ns_total / self.read_acquired) + } + } + + /// Mean wait time for write acquisitions, in nanoseconds. + pub fn write_wait_ns_mean(&self) -> Option { + if self.write_acquired == 0 { + None + } else { + Some(self.write_wait_ns_total / self.write_acquired) + } + } + + /// Mean hold time for read acquisitions, in nanoseconds. + pub fn read_hold_ns_mean(&self) -> Option { + if self.read_acquired == 0 { + None + } else { + Some(self.read_hold_ns_total / self.read_acquired) + } + } + + /// Mean hold time for write acquisitions, in nanoseconds. + pub fn write_hold_ns_mean(&self) -> Option { + if self.write_acquired == 0 { + None + } else { + Some(self.write_hold_ns_total / self.write_acquired) + } + } +} + +/// Snapshot of a [`LockStats`]: aggregate totals plus the per-tag +/// breakdown. Cheap to clone; suitable for shipping through a debug +/// UI or a periodic log line. +#[derive(Debug, Clone, Default)] +pub struct Snapshot { + /// Aggregate counters across every tag (including `UNTAGGED`). + pub total: SiteStats, + /// Per-tag breakdown. Tags that have never been used don't appear. + pub per_tag: BTreeMap<&'static str, SiteStats>, +} diff --git a/packages/rs-platform-wallet/src/diagnostics/mod.rs b/packages/rs-platform-wallet/src/diagnostics/mod.rs new file mode 100644 index 00000000000..e503937a65f --- /dev/null +++ b/packages/rs-platform-wallet/src/diagnostics/mod.rs @@ -0,0 +1,18 @@ +//! Optional runtime diagnostics for platform-wallet. +//! +//! Currently a single submodule: +//! +//! - [`instrumented_lock`] — an `InstrumentedRwLock` newtype wrapping +//! [`tokio::sync::RwLock`] that, when the `lock-stats` Cargo feature +//! is enabled, records per-call-site acquisition counts plus wait / +//! hold durations. With the feature off the wrapper compiles down to +//! the underlying tokio lock (zero added overhead in the hot path). + +pub mod instrumented_lock; + +pub use instrumented_lock::{ + InstrumentedArcExt, InstrumentedRwLock, InstrumentedRwLockExt, ReadGuard, WriteGuard, +}; + +#[cfg(feature = "lock-stats")] +pub use instrumented_lock::{LockStats, SiteStats, Snapshot}; diff --git a/packages/rs-platform-wallet/src/lib.rs b/packages/rs-platform-wallet/src/lib.rs index 93e2d43d1ac..82049eb42bc 100644 --- a/packages/rs-platform-wallet/src/lib.rs +++ b/packages/rs-platform-wallet/src/lib.rs @@ -14,6 +14,7 @@ pub mod broadcaster; pub mod changeset; +pub mod diagnostics; pub mod error; pub mod events; pub mod manager; diff --git a/packages/rs-platform-wallet/src/manager/mod.rs b/packages/rs-platform-wallet/src/manager/mod.rs index 446830cc99d..d96e6e06503 100644 --- a/packages/rs-platform-wallet/src/manager/mod.rs +++ b/packages/rs-platform-wallet/src/manager/mod.rs @@ -13,6 +13,7 @@ use tokio_util::sync::CancellationToken; use key_wallet_manager::WalletManager; use crate::changeset::{spawn_wallet_event_adapter, PlatformWalletPersistence}; +use crate::diagnostics::InstrumentedRwLock; use crate::events::{PlatformEventHandler, PlatformEventManager}; use crate::platform_address_sync::PlatformAddressSyncManager; use crate::spv::SpvRuntime; @@ -27,7 +28,7 @@ use crate::wallet::PlatformWallet; /// [`PlatformEventHandler`]s by reference (no cloning). pub struct PlatformWalletManager { pub(super) sdk: Arc, - pub(super) wallet_manager: Arc>>, + pub(super) wallet_manager: Arc>>, /// Map of registered wallets. Held in an `Arc` so the /// `BalanceUpdateHandler` can hold a clone and look up wallets to /// update their lock-free balance atomics from event-handler @@ -64,7 +65,7 @@ impl PlatformWalletManager

{ // coerce once here and pass clones along instead of re-erasing // at every call site. let dyn_persister: Arc = Arc::clone(&persister) as _; - let wallet_manager = Arc::new(RwLock::new(WalletManager::new(sdk.network))); + let wallet_manager = Arc::new(InstrumentedRwLock::new(WalletManager::new(sdk.network))); let wallets = Arc::new(RwLock::new(std::collections::BTreeMap::new())); let lock_notify = Arc::new(Notify::new()); diff --git a/packages/rs-platform-wallet/src/spv/runtime.rs b/packages/rs-platform-wallet/src/spv/runtime.rs index eecb0e58607..70ba891d030 100644 --- a/packages/rs-platform-wallet/src/spv/runtime.rs +++ b/packages/rs-platform-wallet/src/spv/runtime.rs @@ -15,6 +15,11 @@ use dash_spv::{ClientConfig, DashSpvClient, EventHandler, Hash}; use key_wallet_manager::WalletManager; +// `InstrumentedArcExt` is unused when `lock-stats` is on — the wrapper +// has `raw_arc()` inherent and method resolution prefers it. +// Suppress the warning so the import line stays uniform across modes. +#[allow(unused_imports)] +use crate::diagnostics::{InstrumentedArcExt, InstrumentedRwLock}; use crate::error::PlatformWalletError; use crate::events::PlatformEventManager; use crate::wallet::platform_wallet::PlatformWalletInfo; @@ -28,7 +33,7 @@ type SpvClient = /// handlers by reference (no cloning). pub struct SpvRuntime { event_manager: Arc, - wallet_manager: Arc>>, + wallet_manager: Arc>>, client: RwLock>, /// Cancel token for the `run()` task when it was spawned via /// [`spawn_in_background`]. [`stop`] fires this token and joins @@ -39,7 +44,7 @@ pub struct SpvRuntime { impl SpvRuntime { /// Create a new SPV runtime. pub fn new( - wallet_manager: Arc>>, + wallet_manager: Arc>>, event_manager: Arc, ) -> Self { Self { @@ -73,11 +78,16 @@ impl SpvRuntime { // platform event manager's own handler list). let event_handlers: Vec> = vec![Arc::clone(&self.event_manager) as Arc]; + // Upstream takes `Arc>` literally; hand + // it the inner Arc that lives inside our `InstrumentedRwLock` + // wrapper. SPV's own acquisitions go through this Arc directly + // and are not seen by the wrapper's stats — see the wrapper's + // type-level docs for the rationale. let spv_client = DashSpvClient::new( config, network_manager, storage_manager, - Arc::clone(&self.wallet_manager), + self.wallet_manager.raw_arc(), event_handlers, ) .await diff --git a/packages/rs-platform-wallet/src/wallet/asset_lock/manager.rs b/packages/rs-platform-wallet/src/wallet/asset_lock/manager.rs index 3afc0053612..bcc8f89327b 100644 --- a/packages/rs-platform-wallet/src/wallet/asset_lock/manager.rs +++ b/packages/rs-platform-wallet/src/wallet/asset_lock/manager.rs @@ -6,10 +6,11 @@ use std::sync::Arc; -use tokio::sync::{Notify, RwLock}; +use tokio::sync::Notify; use crate::broadcaster::TransactionBroadcaster; use crate::changeset::changeset::AssetLockChangeSet; +use crate::diagnostics::InstrumentedRwLock; use crate::wallet::persister::WalletPersister; use crate::wallet::platform_wallet::{PlatformWalletInfo, WalletId}; @@ -30,7 +31,7 @@ pub(super) const DEFAULT_FEE_PER_KB: u64 = 1000; pub struct AssetLockManager { pub(super) sdk: Arc, /// The shared wallet manager lock for all mutable wallet state. - pub(super) wallet_manager: Arc>>, + pub(super) wallet_manager: Arc>>, /// Identifies which wallet within the manager this manager operates on. pub(super) wallet_id: WalletId, /// Notified on InstantLock / ChainLock events by SpvEventForwarder. @@ -64,7 +65,7 @@ impl AssetLockManager { /// Create a new `AssetLockManager`. pub(crate) fn new( sdk: Arc, - wallet_manager: Arc>>, + wallet_manager: Arc>>, wallet_id: WalletId, lock_notify: Arc, broadcaster: Arc, diff --git a/packages/rs-platform-wallet/src/wallet/core/wallet.rs b/packages/rs-platform-wallet/src/wallet/core/wallet.rs index 5a29db29002..fe282a69e52 100644 --- a/packages/rs-platform-wallet/src/wallet/core/wallet.rs +++ b/packages/rs-platform-wallet/src/wallet/core/wallet.rs @@ -5,11 +5,11 @@ use std::sync::Arc; use super::balance::WalletBalance; use dashcore::Address as DashAddress; -use tokio::sync::RwLock; use key_wallet_manager::WalletManager; use crate::broadcaster::TransactionBroadcaster; +use crate::diagnostics::InstrumentedRwLock; use crate::error::PlatformWalletError; use crate::wallet::platform_wallet::{PlatformWalletInfo, WalletId}; @@ -24,7 +24,7 @@ use crate::wallet::platform_wallet::{PlatformWalletInfo, WalletId}; /// through a `dyn` vtable. pub struct CoreWallet { pub(crate) sdk: Arc, - pub(crate) wallet_manager: Arc>>, + pub(crate) wallet_manager: Arc>>, pub(crate) wallet_id: WalletId, /// Injected broadcaster — delegates to SPV or DAPI depending on how /// the wallet was constructed by `PlatformWalletManager`. @@ -36,7 +36,7 @@ pub struct CoreWallet { impl CoreWallet { pub(crate) fn new( sdk: Arc, - wallet_manager: Arc>>, + wallet_manager: Arc>>, wallet_id: WalletId, broadcaster: Arc, balance: Arc, diff --git a/packages/rs-platform-wallet/src/wallet/identity/network/identity_handle.rs b/packages/rs-platform-wallet/src/wallet/identity/network/identity_handle.rs index 80619ef8663..a872a94f4d4 100644 --- a/packages/rs-platform-wallet/src/wallet/identity/network/identity_handle.rs +++ b/packages/rs-platform-wallet/src/wallet/identity/network/identity_handle.rs @@ -33,10 +33,10 @@ use key_wallet::dip9::{ use key_wallet::wallet::Wallet; use key_wallet::Network; use key_wallet_manager::WalletManager; -use tokio::sync::RwLock; use zeroize::Zeroizing; use crate::broadcaster::{SpvBroadcaster, TransactionBroadcaster}; +use crate::diagnostics::{InstrumentedRwLock, ReadGuard, WriteGuard}; use crate::error::PlatformWalletError; use crate::wallet::asset_lock::manager::AssetLockManager; use crate::wallet::platform_wallet::{PlatformWalletInfo, WalletId}; @@ -257,7 +257,7 @@ pub(crate) fn derive_identity_auth_key_hash( pub struct IdentityWallet { pub(crate) sdk: Arc, /// Shared wallet manager holding key material and wallet info. - pub(crate) wallet_manager: Arc>>, + pub(crate) wallet_manager: Arc>>, /// Identifier for the wallet within the wallet manager. pub(crate) wallet_id: WalletId, /// Shared asset lock manager for building, broadcasting, and tracking @@ -381,9 +381,7 @@ impl IdentityWallet { /// Access wallet info via `wm.get_wallet_info(&wallet_id)` and key material /// via `wm.get_wallet(&wallet_id)` on the returned guard. The identity /// manager is on the wallet info: `info.identity_manager`. - pub async fn wallet_manager_read( - &self, - ) -> tokio::sync::RwLockReadGuard<'_, WalletManager> { + pub async fn wallet_manager_read(&self) -> ReadGuard<'_, WalletManager> { self.wallet_manager.read().await } @@ -392,9 +390,7 @@ impl IdentityWallet { /// Access wallet info via `wm.get_wallet_info_mut(&wallet_id)` on the /// returned guard. This allows callers to mutate managed identities (e.g. /// adding or updating identities from an external persistence layer). - pub async fn wallet_manager_write( - &self, - ) -> tokio::sync::RwLockWriteGuard<'_, WalletManager> { + pub async fn wallet_manager_write(&self) -> WriteGuard<'_, WalletManager> { self.wallet_manager.write().await } @@ -404,7 +400,7 @@ impl IdentityWallet { /// Useful for synchronous callers that cannot await. pub fn try_wallet_manager_write( &self, - ) -> Option>> { + ) -> Option>> { self.wallet_manager.try_write().ok() } diff --git a/packages/rs-platform-wallet/src/wallet/platform_addresses/provider.rs b/packages/rs-platform-wallet/src/wallet/platform_addresses/provider.rs index 807b549f8a1..72ec4ddf73e 100644 --- a/packages/rs-platform-wallet/src/wallet/platform_addresses/provider.rs +++ b/packages/rs-platform-wallet/src/wallet/platform_addresses/provider.rs @@ -30,12 +30,12 @@ use key_wallet::PlatformP2PKHAddress; use async_trait::async_trait; use key_wallet_manager::WalletManager; +use crate::diagnostics::InstrumentedRwLock; use crate::error::PlatformWalletError; use crate::wallet::platform_wallet::{PlatformWalletInfo, WalletId}; use dash_sdk::platform::address_sync::{ AddressFunds, AddressIndex, AddressProvider, AddressSyncResult, }; -use tokio::sync::RwLock; /// DIP-17 address coordinates used as both the pending-bimap key and /// the SDK sync engine's `Tag`. Having the SDK carry these three @@ -151,7 +151,7 @@ pub(crate) type PerWalletInSyncPlatformAddressState = /// view of pending addresses spanning all wallets. pub(crate) struct PlatformPaymentAddressProvider { /// Shared wallet manager for gap-limit extension. - wallet_manager: Arc>>, + wallet_manager: Arc>>, /// Committed per-wallet tracked state — xpub + `addresses` bimap /// + `found` + `absent` from the last successful sync. `found` /// here is what [`current_balances`](Self::current_balances) @@ -185,7 +185,7 @@ impl PlatformPaymentAddressProvider { /// no key derivation happens here. `wallet_ids` not found in the /// wallet manager are silently skipped. pub(crate) async fn from_wallets( - wallet_manager: Arc>>, + wallet_manager: Arc>>, wallet_ids: impl IntoIterator, ) -> Result { let mut per_wallet: BTreeMap = BTreeMap::new(); @@ -268,7 +268,7 @@ impl PlatformPaymentAddressProvider { /// of sync and the caller needs to reconcile rather than silently /// continue with stale data. pub async fn from_persisted( - wallet_manager: Arc>>, + wallet_manager: Arc>>, per_wallet: BTreeMap, sync_height: u64, sync_timestamp: u64, diff --git a/packages/rs-platform-wallet/src/wallet/platform_addresses/wallet.rs b/packages/rs-platform-wallet/src/wallet/platform_addresses/wallet.rs index 7c618aaf0d5..745319902b4 100644 --- a/packages/rs-platform-wallet/src/wallet/platform_addresses/wallet.rs +++ b/packages/rs-platform-wallet/src/wallet/platform_addresses/wallet.rs @@ -6,6 +6,7 @@ use dpp::address_funds::PlatformAddress; use dpp::fee::Credits; use tokio::sync::RwLock; +use crate::diagnostics::InstrumentedRwLock; use crate::error::PlatformWalletError; use crate::wallet::platform_wallet::{PlatformWalletInfo, WalletId}; use key_wallet_manager::WalletManager; @@ -19,7 +20,7 @@ use super::provider::PlatformPaymentAddressProvider; pub struct PlatformAddressWallet { pub(crate) sdk: Arc, /// The shared wallet manager lock for all mutable wallet state. - pub(crate) wallet_manager: Arc>>, + pub(crate) wallet_manager: Arc>>, /// Identifies which wallet within the manager this sub-wallet operates on. pub(crate) wallet_id: WalletId, /// Single provider covering every platform payment account on the @@ -37,7 +38,7 @@ impl PlatformAddressWallet { /// Call [`initialize`] afterwards to build the unified provider. pub(crate) fn new( sdk: Arc, - wallet_manager: Arc>>, + wallet_manager: Arc>>, wallet_id: WalletId, persister: WalletPersister, ) -> Self { diff --git a/packages/rs-platform-wallet/src/wallet/platform_wallet.rs b/packages/rs-platform-wallet/src/wallet/platform_wallet.rs index 114fb291ec0..ca9f7ff2398 100644 --- a/packages/rs-platform-wallet/src/wallet/platform_wallet.rs +++ b/packages/rs-platform-wallet/src/wallet/platform_wallet.rs @@ -10,7 +10,10 @@ use dpp::prelude::Identifier; use key_wallet::wallet::managed_wallet_info::ManagedWalletInfo; use key_wallet::wallet::Wallet; use key_wallet_manager::WalletManager; -use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; + +use crate::diagnostics::{ + InstrumentedRwLock, ReadGuard as InstrumentedReadGuard, WriteGuard as InstrumentedWriteGuard, +}; use super::asset_lock::manager::AssetLockManager; use super::asset_lock::tracked::TrackedAssetLock; @@ -60,7 +63,7 @@ pub struct PlatformWalletInfo { pub struct PlatformWallet { wallet_id: WalletId, pub(crate) sdk: Arc, - pub(crate) wallet_manager: Arc>>, + pub(crate) wallet_manager: Arc>>, // Sub-wallets that hold a broadcaster are monomorphized with // `SpvBroadcaster` — the only production broadcaster in use. // Swapping this out to another broadcaster is a three-line flip @@ -121,7 +124,7 @@ impl PlatformWallet { } /// Get a reference to the shared wallet manager lock. - pub fn wallet_manager(&self) -> &Arc>> { + pub fn wallet_manager(&self) -> &Arc>> { &self.wallet_manager } @@ -214,7 +217,7 @@ impl PlatformWallet { pub(crate) fn new( sdk: Arc, wallet_id: WalletId, - wallet_manager: Arc>>, + wallet_manager: Arc>>, balance: Arc, lock_notify: Arc, persister: Arc, @@ -424,7 +427,7 @@ impl std::fmt::Debug for PlatformWallet { /// Read guard that locks `WalletManager` and derefs to this wallet's /// `PlatformWalletInfo`. Also provides `.wallet()` for key material access. pub struct WalletStateReadGuard<'a> { - guard: RwLockReadGuard<'a, WalletManager>, + guard: InstrumentedReadGuard<'a, WalletManager>, wallet_id: WalletId, } @@ -449,7 +452,7 @@ impl Deref for WalletStateReadGuard<'_> { /// Write guard that locks `WalletManager` and derefs to this wallet's /// `PlatformWalletInfo` (with `DerefMut`). Also provides `.wallet()`. pub struct WalletStateWriteGuard<'a> { - guard: RwLockWriteGuard<'a, WalletManager>, + guard: InstrumentedWriteGuard<'a, WalletManager>, wallet_id: WalletId, } diff --git a/packages/rs-platform-wallet/src/wallet/tokens/wallet.rs b/packages/rs-platform-wallet/src/wallet/tokens/wallet.rs index 5b8ba807ab8..750b950ab5c 100644 --- a/packages/rs-platform-wallet/src/wallet/tokens/wallet.rs +++ b/packages/rs-platform-wallet/src/wallet/tokens/wallet.rs @@ -18,12 +18,12 @@ use std::sync::Arc; use dpp::balances::credits::TokenAmount; use dpp::prelude::Identifier; -use tokio::sync::RwLock; use dash_sdk::platform::tokens::identity_token_balances::IdentityTokenBalancesQuery; use dash_sdk::platform::FetchMany; use crate::changeset::{Merge, TokenBalanceChangeSet}; +use crate::diagnostics::InstrumentedRwLock; use crate::error::PlatformWalletError; use crate::wallet::platform_wallet::{PlatformWalletInfo, WalletId}; use key_wallet_manager::WalletManager; @@ -40,7 +40,7 @@ type IdentityTokenKey = (Identifier, Identifier); pub struct TokenWallet { pub(crate) sdk: Arc, /// The shared wallet manager lock for all mutable wallet state. - pub(crate) wallet_manager: Arc>>, + pub(crate) wallet_manager: Arc>>, /// Identifies which wallet within the manager this sub-wallet operates on. pub(crate) wallet_id: WalletId, /// Per-wallet persistence handle for queuing changesets. @@ -51,7 +51,7 @@ impl TokenWallet { /// Create a new TokenWallet. pub(crate) fn new( sdk: Arc, - wallet_manager: Arc>>, + wallet_manager: Arc>>, wallet_id: WalletId, persister: crate::wallet::persister::WalletPersister, ) -> Self {