From 0697f1344b92a2cf08fe4215e6b48e5ad6bd2f72 Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Thu, 18 Jun 2026 14:50:13 -0700 Subject: [PATCH 1/2] Report module instance memory usage --- crates/core/src/host/host_controller.rs | 18 +++++ crates/core/src/host/instance_env.rs | 2 + crates/core/src/host/v8/mod.rs | 16 +++- .../src/host/wasmtime/wasm_instance_env.rs | 14 +++- crates/core/src/lib.rs | 1 + crates/core/src/replica_context.rs | 2 + crates/engine/src/lib.rs | 1 + crates/engine/src/resource.rs | 79 +++++++++++++++++++ crates/standalone/src/lib.rs | 1 + 9 files changed, 130 insertions(+), 4 deletions(-) create mode 100644 crates/engine/src/resource.rs diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index 425cbd7de07..5a29e505d8b 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -15,6 +15,7 @@ use crate::host::ProcedureCallError; use crate::messages::control_db::{Database, HostType}; use crate::module_host_context::ModuleCreationContext; use crate::replica_context::ReplicaContext; +use crate::resource::{MemoryObserver, ModuleInstanceMemoryTracker}; use crate::subscription::module_subscription_actor::ModuleSubscriptions; use crate::subscription::module_subscription_manager::{spawn_send_worker, SubscriptionManager, TransactionOffset}; use crate::subscription::row_list_builder_pool::BsatnRowListBuilderPool; @@ -108,6 +109,8 @@ pub struct HostController { program_storage: ProgramStorage, /// The [`EnergyMonitor`] used by this controller. energy_monitor: Arc, + /// The [`MemoryObserver`] used by this controller. + memory_observer: Arc, /// Provides persistence services for each replica. persistence: Arc, /// The page pool all databases will use by cloning the ref counted pool. @@ -221,12 +224,14 @@ pub struct CallProcedureReturn { } impl HostController { + #[allow(clippy::too_many_arguments)] pub fn new( data_dir: Arc, default_config: db::Config, runtime_config: HostRuntimeConfig, program_storage: ProgramStorage, energy_monitor: Arc, + memory_observer: Arc, persistence: Arc, db_cores: JobCores, ) -> Self { @@ -235,6 +240,7 @@ impl HostController { default_config, program_storage, energy_monitor, + memory_observer, persistence, runtimes: HostRuntimes::new(Some(&data_dir), runtime_config), data_dir, @@ -653,6 +659,7 @@ async fn make_replica_ctx( replica_id: u64, relational_db: Arc, bsatn_rlb_pool: BsatnRowListBuilderPool, + memory_observer: Arc, ) -> anyhow::Result { let logger = match module_logs { Some(path) => asyncify(move || Arc::new(DatabaseLogger::open_today(path))).await, @@ -680,11 +687,14 @@ async fn make_replica_ctx( } }); + let module_instance_memory_tracker = ModuleInstanceMemoryTracker::new(database.database_identity, memory_observer); + Ok(ReplicaContext { database, replica_id, logger, subscriptions, + module_instance_memory_tracker, }) } @@ -756,6 +766,7 @@ struct ModuleLauncher { on_panic: F, relational_db: Arc, energy_monitor: Arc, + memory_observer: Arc, module_logs: Option, runtimes: Arc, core: AllocatedJobCore, @@ -779,6 +790,7 @@ impl ModuleLauncher { self.replica_id, self.relational_db, self.bsatn_rlb_pool, + self.memory_observer, ) .await .map(Arc::new)?; @@ -886,6 +898,7 @@ impl Host { persistence, page_pool, bsatn_rlb_pool, + memory_observer, .. } = host_controller; let replica_dir = data_dir.replica(replica_id); @@ -979,6 +992,7 @@ impl Host { on_panic: host_controller.unregister_fn(replica_id), relational_db, energy_monitor: energy_monitor.clone(), + memory_observer: memory_observer.clone(), module_logs: match config.storage { db::Storage::Memory => None, db::Storage::Disk => Some(replica_dir.module_logs()), @@ -1008,6 +1022,7 @@ impl Host { on_panic: host_controller.unregister_fn(replica_id), relational_db: relational_db.clone(), energy_monitor: energy_monitor.clone(), + memory_observer: memory_observer.clone(), module_logs: match config.storage { db::Storage::Memory => None, db::Storage::Disk => Some(replica_dir.clone().module_logs()), @@ -1031,6 +1046,7 @@ impl Host { on_panic: host_controller.unregister_fn(replica_id), relational_db: relational_db.clone(), energy_monitor: energy_monitor.clone(), + memory_observer: memory_observer.clone(), module_logs: match config.storage { db::Storage::Memory => None, db::Storage::Disk => Some(replica_dir.module_logs()), @@ -1131,6 +1147,7 @@ impl Host { None, page_pool, )?; + let memory_observer: Arc = Arc::new(()); ModuleLauncher { database, @@ -1142,6 +1159,7 @@ impl Host { on_panic: || log::error!("launch_module on_panic called for temporary publish in-memory instance"), relational_db: Arc::new(db), energy_monitor: Arc::new(NullEnergyMonitor), + memory_observer, module_logs: None, runtimes: runtimes.clone(), core, diff --git a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs index 9c24c5b3a11..a0601f9a3b5 100644 --- a/crates/core/src/host/instance_env.rs +++ b/crates/core/src/host/instance_env.rs @@ -1347,6 +1347,7 @@ mod test { host::Scheduler, messages::control_db::{Database, HostType}, replica_context::ReplicaContext, + resource::ModuleInstanceMemoryTracker, subscription::module_subscription_actor::ModuleSubscriptions, }; use anyhow::{anyhow, Result}; @@ -1380,6 +1381,7 @@ mod test { replica_id: 0, logger, subscriptions: subs, + module_instance_memory_tracker: ModuleInstanceMemoryTracker::new(Identity::ZERO, Arc::new(())), }, runtime, )) diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index 63cab1ba96f..5004ccd29c7 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -89,6 +89,7 @@ use crate::host::{ModuleHost, ReducerCallError, ReducerCallResult, Scheduler}; use crate::messages::control_db::HostType; use crate::module_host_context::ModuleCreationContext; use crate::replica_context::ReplicaContext; +use crate::resource::ModuleInstanceMemoryTracker; use crate::subscription::module_subscription_manager::TransactionOffset; use crate::util::jobs::{AllocatedJobCore, CorePinner, LoadBalanceOnDropGuard}; use crate::worker_metrics::WORKER_METRICS; @@ -965,6 +966,7 @@ pub(in crate::host) struct V8HeapMetrics { external_memory_bytes: IntGauge, native_contexts: IntGauge, detached_contexts: IntGauge, + module_instance_memory_tracker: ModuleInstanceMemoryTracker, /// Previous values observed by this instance. /// @@ -1035,7 +1037,7 @@ impl V8HeapMetrics { } } - fn new(database_identity: &Identity, worker_kind: JsWorkerKind) -> Self { + fn new(database_identity: &Identity, worker_kind: JsWorkerKind, mem_tracker: ModuleInstanceMemoryTracker) -> Self { Self { total_heap_size_bytes: WORKER_METRICS .v8_total_heap_size_bytes @@ -1061,6 +1063,7 @@ impl V8HeapMetrics { detached_contexts: WORKER_METRICS .v8_detached_contexts .with_label_values(database_identity, &worker_kind), + module_instance_memory_tracker: mem_tracker, last_observed: V8HeapSnapshot::default(), } } @@ -1077,6 +1080,11 @@ impl V8HeapMetrics { adjust_gauge(&self.external_memory_bytes, delta.external_memory_bytes); adjust_gauge(&self.native_contexts, delta.native_contexts); adjust_gauge(&self.detached_contexts, delta.detached_contexts); + // Each live V8 isolate reports only the delta from its last heap sample. + // The shared tracker folds those deltas into a database-wide aggregate + // used tor memory-limit enforcement. + self.module_instance_memory_tracker + .adjust_v8_physical(delta.total_physical_size_bytes); } fn observe(&mut self, stats: &v8::HeapStatistics) { @@ -1679,7 +1687,11 @@ where let info = &module_common.info(); let mut instance_common = InstanceCommon::new(&module_common); let replica_ctx: &Arc = module_common.replica_ctx(); - let mut heap_metrics = V8HeapMetrics::new(&info.database_identity, worker_kind); + let mut heap_metrics = V8HeapMetrics::new( + &info.database_identity, + worker_kind, + replica_ctx.module_instance_memory_tracker.clone(), + ); let mut inst = V8Instance { scope, diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index bfff5fc9dc4..3158b852158 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -13,6 +13,7 @@ use crate::host::wasm_common::module_host_actor::{ }; use crate::host::wasm_common::{err_to_errno_and_log, RowIterIdx, RowIters, TimingSpan, TimingSpanIdx, TimingSpanSet}; use crate::host::AbiCall; +use crate::resource::ModuleInstanceMemoryTracker; use crate::subscription::module_subscription_manager::TransactionOffset; use crate::worker_metrics::WORKER_METRICS; use anyhow::{anyhow, Context as _}; @@ -144,6 +145,7 @@ pub(super) struct WasmInstanceEnv { pub(in crate::host) struct WasmMemoryBytesMetric { wasm_memory_bytes: IntGauge, + tracker: ModuleInstanceMemoryTracker, /// Previous value observed by this intance. /// @@ -158,9 +160,10 @@ pub(in crate::host) struct WasmMemoryBytesMetric { } impl WasmMemoryBytesMetric { - fn new(database_identity: Identity) -> Self { + fn new(database_identity: Identity, tracker: ModuleInstanceMemoryTracker) -> Self { Self { wasm_memory_bytes: WORKER_METRICS.wasm_memory_bytes.with_label_values(&database_identity), + tracker, last_observed: 0, } } @@ -176,6 +179,10 @@ impl WasmMemoryBytesMetric { self.wasm_memory_bytes.sub(-delta); } + // Each WASM instance reports only its own delta. + // The shared tracker folds those deltas into a database-wide aggregate + // used for memory-limit enforcement. + self.tracker.adjust_wasm_linear(delta); self.last_observed = memory_usage; } @@ -188,6 +195,8 @@ impl Drop for WasmMemoryBytesMetric { fn drop(&mut self) { // Clean up this instance's metric value by subtracting its part of the usage. self.wasm_memory_bytes.sub(self.last_observed); + // Remove this instance's contribution from the database-wide aggregate. + self.tracker.adjust_wasm_linear(-self.last_observed); } } @@ -200,6 +209,7 @@ impl WasmInstanceEnv { /// Create a new `WasmEnstanceEnv` from the given `InstanceEnv`. pub fn new(instance_env: InstanceEnv) -> Self { let database_identity = *instance_env.database_identity(); + let instance_memory_tracker = instance_env.replica_ctx.module_instance_memory_tracker.clone(); Self { instance_env, module_def: None, @@ -214,7 +224,7 @@ impl WasmInstanceEnv { timing_spans: Default::default(), call_times: CallTimes::new(), chunk_pool: <_>::default(), - linear_memory_size_metric: WasmMemoryBytesMetric::new(database_identity), + linear_memory_size_metric: WasmMemoryBytesMetric::new(database_identity, instance_memory_tracker), } } diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index d630faab09a..df35e6ca813 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -19,6 +19,7 @@ pub mod estimation; pub mod host; pub mod module_host_context; pub mod replica_context; +pub use spacetimedb_engine::resource; pub mod startup; pub mod subscription; pub mod util; diff --git a/crates/core/src/replica_context.rs b/crates/core/src/replica_context.rs index 731288bb459..af54b78ac11 100644 --- a/crates/core/src/replica_context.rs +++ b/crates/core/src/replica_context.rs @@ -4,6 +4,7 @@ use super::database_logger::DatabaseLogger; use crate::db::relational_db::RelationalDB; use crate::error::DBError; use crate::messages::control_db::Database; +use crate::resource::ModuleInstanceMemoryTracker; use crate::subscription::module_subscription_actor::ModuleSubscriptions; use std::io; use std::ops::Deref; @@ -18,6 +19,7 @@ pub struct ReplicaContext { pub replica_id: u64, pub logger: Arc, pub subscriptions: ModuleSubscriptions, + pub module_instance_memory_tracker: ModuleInstanceMemoryTracker, } impl ReplicaContext { diff --git a/crates/engine/src/lib.rs b/crates/engine/src/lib.rs index 09229362bd7..d2d6499fd12 100644 --- a/crates/engine/src/lib.rs +++ b/crates/engine/src/lib.rs @@ -3,6 +3,7 @@ pub mod error; pub mod metrics; pub mod persistence; pub mod relational_db; +pub mod resource; pub mod snapshot; pub mod sql; pub mod update; diff --git a/crates/engine/src/resource.rs b/crates/engine/src/resource.rs new file mode 100644 index 00000000000..33f3aca6498 --- /dev/null +++ b/crates/engine/src/resource.rs @@ -0,0 +1,79 @@ +use spacetimedb_lib::Identity; +use std::sync::{ + atomic::{AtomicU64, Ordering}, + Arc, +}; + +#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] +pub enum DatabaseMemoryType { + /// Module instance memory such as Wasmtime linear memory and V8 physical heap memory. + Instance, +} + +#[derive(Clone, Debug)] +pub struct MemoryObservation { + pub database_identity: Identity, + pub kind: DatabaseMemoryType, + pub bytes: u64, +} + +pub trait MemoryObserver: Send + Sync + 'static { + fn memory_observed(&self, _: MemoryObservation) {} +} + +impl MemoryObserver for () {} + +#[derive(Clone)] +pub struct ModuleInstanceMemoryTracker { + inner: Arc, +} + +struct ModuleInstanceMemoryTrackerInner { + database_identity: Identity, + observer: Arc, + /// Database-wide aggregate across all live module instances. + /// + /// Wasm and V8 instances each keep their own `last_observed` value and + /// report only the delta when they are sampled or dropped, + /// so that we can compare against a single limit for the entire database. + instance_bytes: AtomicU64, +} + +impl ModuleInstanceMemoryTracker { + pub fn new(database_identity: Identity, observer: Arc) -> Self { + Self { + inner: Arc::new(ModuleInstanceMemoryTrackerInner { + database_identity, + observer, + instance_bytes: AtomicU64::new(0), + }), + } + } + + pub fn adjust_wasm_linear(&self, delta: i64) { + self.adjust_instance(delta); + } + + pub fn adjust_v8_physical(&self, total_physical_delta: i64) { + self.adjust_instance(total_physical_delta); + } + + fn adjust_instance(&self, delta: i64) { + let bytes = adjust_atomic_u64(&self.inner.instance_bytes, delta); + self.inner.observer.memory_observed(MemoryObservation { + database_identity: self.inner.database_identity, + kind: DatabaseMemoryType::Instance, + bytes, + }); + } +} + +fn adjust_atomic_u64(value: &AtomicU64, delta: i64) -> u64 { + if delta >= 0 { + let delta = delta as u64; + value.fetch_add(delta, Ordering::Relaxed) + delta + } else { + let delta = delta.unsigned_abs(); + value.fetch_sub(delta, Ordering::Relaxed) - delta + } +} diff --git a/crates/standalone/src/lib.rs b/crates/standalone/src/lib.rs index 309ca3cb814..541dd5b6a39 100644 --- a/crates/standalone/src/lib.rs +++ b/crates/standalone/src/lib.rs @@ -86,6 +86,7 @@ impl StandaloneEnv { HostRuntimeConfig::new(config.wasm, config.v8), program_store.clone(), energy_monitor, + Arc::new(()), persistence_provider, db_cores, ); From 1aea13a2f9773c569c2ca6e0beda4b0fa35f31ce Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Thu, 18 Jun 2026 18:03:53 -0700 Subject: [PATCH 2/2] Add page-level accounting --- crates/core/src/host/host_controller.rs | 1 + .../locking_tx_datastore/committed_state.rs | 54 ++++++++++++++++--- .../src/locking_tx_datastore/datastore.rs | 8 +++ .../src/locking_tx_datastore/mut_tx.rs | 2 + .../src/locking_tx_datastore/replay.rs | 1 + crates/engine/src/relational_db.rs | 21 ++++++++ crates/engine/src/resource.rs | 8 ++- crates/table/src/table.rs | 5 ++ 8 files changed, 90 insertions(+), 10 deletions(-) diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index 5a29e505d8b..c2f813e0bee 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -955,6 +955,7 @@ impl Host { (db, clients) } }; + let db = db.with_memory_observer(memory_observer.clone()); let (mut program, program_needs_init) = match db.program()? { // Launch module with program from existing database. Some(program) => { diff --git a/crates/datastore/src/locking_tx_datastore/committed_state.rs b/crates/datastore/src/locking_tx_datastore/committed_state.rs index ea8fa07db8c..f5ed82f196e 100644 --- a/crates/datastore/src/locking_tx_datastore/committed_state.rs +++ b/crates/datastore/src/locking_tx_datastore/committed_state.rs @@ -75,6 +75,8 @@ pub struct CommittedState { /// Pages are shared between all modules running on a particular host, /// not allocated per-module. pub(super) page_pool: PagePool, + /// Total bytes occupied by physical pages in committed tables. + datastore_page_bytes: u64, /// We track the read sets for each view in the committed state. /// We check each reducer's write set against these read sets. /// Any overlap will trigger a re-evaluation of the affected view, @@ -117,6 +119,7 @@ impl MemoryUsage for CommittedState { blob_store, index_id_map, page_pool: _, + datastore_page_bytes, read_sets, ephemeral_tables, } = self; @@ -125,6 +128,7 @@ impl MemoryUsage for CommittedState { + tables.heap_usage() + blob_store.heap_usage() + index_id_map.heap_usage() + + datastore_page_bytes.heap_usage() + read_sets.heap_usage() + ephemeral_tables.heap_usage() } @@ -195,10 +199,33 @@ impl CommittedState { index_id_map: <_>::default(), read_sets: <_>::default(), page_pool, + datastore_page_bytes: 0, ephemeral_tables: <_>::default(), } } + /// Returns committed datastore table page bytes. + pub fn datastore_page_bytes(&self) -> u64 { + self.datastore_page_bytes + } + + /// Recomputes `datastore_page_bytes` from committed tables. + /// + /// This is only used for bootstrap, snapshot restore, and replay paths. + pub(super) fn rebuild_datastore_page_bytes(&mut self) { + self.datastore_page_bytes = self.tables.values().map(Table::page_bytes).sum(); + } + + /// Called when a new page is added to committed state. + fn add_datastore_page_bytes(&mut self, bytes: u64) { + self.datastore_page_bytes += bytes; + } + + /// Called when a page is removed from committed state. + pub(super) fn sub_datastore_page_bytes(&mut self, bytes: u64) { + self.datastore_page_bytes -= bytes; + } + /// Extremely delicate function to bootstrap the system tables. /// Don't update this unless you know what you're doing. pub(super) fn bootstrap_system_tables(&mut self, database_identity: Identity) -> Result<()> { @@ -339,6 +366,7 @@ impl CommittedState { // This is purely a sanity check to ensure that we are setting the ids correctly. self.assert_system_table_schemas_match()?; + self.rebuild_datastore_page_bytes(); Ok(()) } @@ -621,13 +649,20 @@ impl CommittedState { // we just want to include them in subscriptions and the commitlog. Self::collect_inserts(page_pool, truncates, tx_data, &tx_bs, table_id, tx_table, |_| {}); } else { - let (commit_table, commit_blob_store, page_pool) = - self.get_table_and_blob_store_or_create(table_id, schema); - Self::collect_inserts(page_pool, truncates, tx_data, &tx_bs, table_id, tx_table, |row| { - commit_table - .insert(page_pool, commit_blob_store, row) - .expect("Failed to insert when merging commit"); - }); + let page_bytes_added = { + let (commit_table, commit_blob_store, page_pool) = + self.get_table_and_blob_store_or_create(table_id, schema); + let page_bytes_before = commit_table.page_bytes(); + Self::collect_inserts(page_pool, truncates, tx_data, &tx_bs, table_id, tx_table, |row| { + commit_table + .insert(page_pool, commit_blob_store, row) + .expect("Failed to insert when merging commit"); + }); + let page_bytes_after = commit_table.page_bytes(); + debug_assert!(page_bytes_after >= page_bytes_before); + page_bytes_after - page_bytes_before + }; + self.add_datastore_page_bytes(page_bytes_added); } } } @@ -713,6 +748,7 @@ impl CommittedState { // We don't need to deal with sub-components. // That is, we don't need to add back indices and such. // Instead, there will be separate pending schema changes like `IndexRemoved`. + self.add_datastore_page_bytes(table.page_bytes()); self.tables.insert(table_id, table); // Incase, the table was ephemeral, add it back to that set as well. @@ -725,7 +761,9 @@ impl CommittedState { // We don't need to deal with sub-components. // That is, we don't need to remove indices and such. // Instead, there will be separate pending schema changes like `IndexAdded`. - self.tables.remove(&table_id); + if let Some(table) = self.tables.remove(&table_id) { + self.sub_datastore_page_bytes(table.page_bytes()); + } // Incase, the table was ephemeral, remove it from that set as well. self.ephemeral_tables.remove(&table_id); } diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs index 4cd2ddb96c5..7f675515ebb 100644 --- a/crates/datastore/src/locking_tx_datastore/datastore.rs +++ b/crates/datastore/src/locking_tx_datastore/datastore.rs @@ -198,6 +198,7 @@ impl Locking { .with_label_values(&database_identity, &table_id.into(), &schema.table_name) .set(table_size as i64); } + committed_state.rebuild_datastore_page_bytes(); // Double check that our in-memory system table ids match the on-disk schemas. // committed_state.assert_system_table_schemas_match()?; @@ -238,6 +239,13 @@ impl Locking { committed_state.assert_system_table_schemas_match() } + /// Returns committed datastore table page bytes. + /// + /// This reads the cached committed-state aggregate. + pub fn datastore_page_bytes(&self) -> u64 { + self.committed_state.read().datastore_page_bytes() + } + pub fn take_snapshot_internal( committed_state: &RwLock, repo: &DynSnapshotRepo, diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index 8325058aa10..3a3490164f6 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -1040,6 +1040,8 @@ impl MutTxId { .tables .remove(&table_id) .expect("there should be a schema in the committed state if we reach here"); + self.committed_state_write_lock + .sub_datastore_page_bytes(commit_table.page_bytes()); self.push_schema_change(PendingSchemaChange::TableRemoved(table_id, commit_table)); Ok(()) diff --git a/crates/datastore/src/locking_tx_datastore/replay.rs b/crates/datastore/src/locking_tx_datastore/replay.rs index 8f55d1cf075..d803360551d 100644 --- a/crates/datastore/src/locking_tx_datastore/replay.rs +++ b/crates/datastore/src/locking_tx_datastore/replay.rs @@ -510,6 +510,7 @@ impl<'cs> ReplayCommittedState<'cs> { self.build_missing_tables()?; self.build_indexes()?; self.collect_ephemeral_tables()?; + self.rebuild_datastore_page_bytes(); // Figure out where to pick up for each sequence. build_sequence_state(datastore, self)?; diff --git a/crates/engine/src/relational_db.rs b/crates/engine/src/relational_db.rs index edcaac618f4..11295239503 100644 --- a/crates/engine/src/relational_db.rs +++ b/crates/engine/src/relational_db.rs @@ -2,6 +2,7 @@ use crate::durability::{request_durability, spawn_close as spawn_durability_clos use crate::error::{DBError, RestoreSnapshotError}; use crate::metrics::ExecutionCounters; use crate::metrics::ENGINE_METRICS; +use crate::resource::{DatabaseMemoryType, MemoryObservation, MemoryObserver}; use crate::util::asyncify; use crate::MetricsRecorderQueue; use anyhow::{anyhow, Context}; @@ -114,6 +115,9 @@ pub struct RelationalDB { /// An async queue for recording transaction metrics off the main thread metrics_recorder_queue: Option, + + /// An observer for memory usage changes in this database. + memory_observer: Arc, } /// Perform a snapshot every `SNAPSHOT_FREQUENCY` transactions. @@ -169,9 +173,24 @@ impl RelationalDB { workload_type_to_exec_counters, metrics_recorder_queue, + memory_observer: Arc::new(()), } } + pub fn with_memory_observer(mut self, memory_observer: Arc) -> Self { + self.memory_observer = memory_observer; + self.observe_datastore_pages(); + self + } + + fn observe_datastore_pages(&self) { + self.memory_observer.memory_observed(MemoryObservation { + database_identity: self.database_identity, + kind: DatabaseMemoryType::Datastore, + bytes: self.inner.datastore_page_bytes(), + }); + } + /// Open a database, which may or may not already exist. /// /// # Initialization @@ -830,6 +849,7 @@ impl RelationalDB { }; self.maybe_do_snapshot(&tx_data); + self.observe_datastore_pages(); Ok(Some((tx_offset, tx_data, tx_metrics, reducer))) } @@ -844,6 +864,7 @@ impl RelationalDB { }); self.maybe_do_snapshot(&tx_data); + self.observe_datastore_pages(); (tx_data, tx_metrics, tx) } diff --git a/crates/engine/src/resource.rs b/crates/engine/src/resource.rs index 33f3aca6498..4381800ede7 100644 --- a/crates/engine/src/resource.rs +++ b/crates/engine/src/resource.rs @@ -7,7 +7,11 @@ use std::sync::{ #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] pub enum DatabaseMemoryType { /// Module instance memory such as Wasmtime linear memory and V8 physical heap memory. - Instance, + Module, + /// Memory allocated and managed by the datastore. + /// + /// Currently only page-level memory. + Datastore, } #[derive(Clone, Debug)] @@ -62,7 +66,7 @@ impl ModuleInstanceMemoryTracker { let bytes = adjust_atomic_u64(&self.inner.instance_bytes, delta); self.inner.observer.memory_observed(MemoryObservation { database_identity: self.inner.database_identity, - kind: DatabaseMemoryType::Instance, + kind: DatabaseMemoryType::Module, bytes, }); } diff --git a/crates/table/src/table.rs b/crates/table/src/table.rs index f197951ff9c..b5b26ee746a 100644 --- a/crates/table/src/table.rs +++ b/crates/table/src/table.rs @@ -1622,6 +1622,11 @@ impl Table { (self.num_pages() * PAGE_DATA_SIZE) + (self.blob_store_bytes.0) } + /// Returns the bytes occupied by this table's allocated physical row pages. + pub fn page_bytes(&self) -> u64 { + (self.num_pages() * mem::size_of::()) as u64 + } + /// Reset the internal storage of `self` to be `pages`. /// /// This recomputes the pointer map based on the `pages`,