Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::host::ProcedureCallError;
use crate::messages::control_db::{Database, HostType};
use crate::module_host_context::ModuleCreationContext;
use crate::replica_context::ReplicaContext;
use crate::resource::{MemoryObserver, ModuleInstanceMemoryTracker};
use crate::subscription::module_subscription_actor::ModuleSubscriptions;
use crate::subscription::module_subscription_manager::{spawn_send_worker, SubscriptionManager, TransactionOffset};
use crate::subscription::row_list_builder_pool::BsatnRowListBuilderPool;
Expand Down Expand Up @@ -108,6 +109,8 @@ pub struct HostController {
program_storage: ProgramStorage,
/// The [`EnergyMonitor`] used by this controller.
energy_monitor: Arc<dyn EnergyMonitor>,
/// The [`MemoryObserver`] used by this controller.
memory_observer: Arc<dyn MemoryObserver>,
/// Provides persistence services for each replica.
persistence: Arc<dyn PersistenceProvider>,
/// The page pool all databases will use by cloning the ref counted pool.
Expand Down Expand Up @@ -221,12 +224,14 @@ pub struct CallProcedureReturn {
}

impl HostController {
#[allow(clippy::too_many_arguments)]
pub fn new(
data_dir: Arc<ServerDataDir>,
default_config: db::Config,
runtime_config: HostRuntimeConfig,
program_storage: ProgramStorage,
energy_monitor: Arc<impl EnergyMonitor>,
memory_observer: Arc<dyn MemoryObserver>,
persistence: Arc<dyn PersistenceProvider>,
db_cores: JobCores,
) -> Self {
Expand All @@ -235,6 +240,7 @@ impl HostController {
default_config,
program_storage,
energy_monitor,
memory_observer,
persistence,
runtimes: HostRuntimes::new(Some(&data_dir), runtime_config),
data_dir,
Expand Down Expand Up @@ -653,6 +659,7 @@ async fn make_replica_ctx(
replica_id: u64,
relational_db: Arc<RelationalDB>,
bsatn_rlb_pool: BsatnRowListBuilderPool,
memory_observer: Arc<dyn MemoryObserver>,
) -> anyhow::Result<ReplicaContext> {
let logger = match module_logs {
Some(path) => asyncify(move || Arc::new(DatabaseLogger::open_today(path))).await,
Expand Down Expand Up @@ -680,11 +687,14 @@ async fn make_replica_ctx(
}
});

let module_instance_memory_tracker = ModuleInstanceMemoryTracker::new(database.database_identity, memory_observer);

Ok(ReplicaContext {
database,
replica_id,
logger,
subscriptions,
module_instance_memory_tracker,
})
}

Expand Down Expand Up @@ -756,6 +766,7 @@ struct ModuleLauncher<F> {
on_panic: F,
relational_db: Arc<RelationalDB>,
energy_monitor: Arc<dyn EnergyMonitor>,
memory_observer: Arc<dyn MemoryObserver>,
module_logs: Option<ModuleLogsDir>,
runtimes: Arc<HostRuntimes>,
core: AllocatedJobCore,
Expand All @@ -779,6 +790,7 @@ impl<F: Fn() + Send + Sync + 'static> ModuleLauncher<F> {
self.replica_id,
self.relational_db,
self.bsatn_rlb_pool,
self.memory_observer,
)
.await
.map(Arc::new)?;
Expand Down Expand Up @@ -886,6 +898,7 @@ impl Host {
persistence,
page_pool,
bsatn_rlb_pool,
memory_observer,
..
} = host_controller;
let replica_dir = data_dir.replica(replica_id);
Expand Down Expand Up @@ -942,6 +955,7 @@ impl Host {
(db, clients)
}
};
let db = db.with_memory_observer(memory_observer.clone());
let (mut program, program_needs_init) = match db.program()? {
// Launch module with program from existing database.
Some(program) => {
Expand Down Expand Up @@ -979,6 +993,7 @@ impl Host {
on_panic: host_controller.unregister_fn(replica_id),
relational_db,
energy_monitor: energy_monitor.clone(),
memory_observer: memory_observer.clone(),
module_logs: match config.storage {
db::Storage::Memory => None,
db::Storage::Disk => Some(replica_dir.module_logs()),
Expand Down Expand Up @@ -1008,6 +1023,7 @@ impl Host {
on_panic: host_controller.unregister_fn(replica_id),
relational_db: relational_db.clone(),
energy_monitor: energy_monitor.clone(),
memory_observer: memory_observer.clone(),
module_logs: match config.storage {
db::Storage::Memory => None,
db::Storage::Disk => Some(replica_dir.clone().module_logs()),
Expand All @@ -1031,6 +1047,7 @@ impl Host {
on_panic: host_controller.unregister_fn(replica_id),
relational_db: relational_db.clone(),
energy_monitor: energy_monitor.clone(),
memory_observer: memory_observer.clone(),
module_logs: match config.storage {
db::Storage::Memory => None,
db::Storage::Disk => Some(replica_dir.module_logs()),
Expand Down Expand Up @@ -1131,6 +1148,7 @@ impl Host {
None,
page_pool,
)?;
let memory_observer: Arc<dyn MemoryObserver> = Arc::new(());

ModuleLauncher {
database,
Expand All @@ -1142,6 +1160,7 @@ impl Host {
on_panic: || log::error!("launch_module on_panic called for temporary publish in-memory instance"),
relational_db: Arc::new(db),
energy_monitor: Arc::new(NullEnergyMonitor),
memory_observer,
module_logs: None,
runtimes: runtimes.clone(),
core,
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/host/instance_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1347,6 +1347,7 @@ mod test {
host::Scheduler,
messages::control_db::{Database, HostType},
replica_context::ReplicaContext,
resource::ModuleInstanceMemoryTracker,
subscription::module_subscription_actor::ModuleSubscriptions,
};
use anyhow::{anyhow, Result};
Expand Down Expand Up @@ -1380,6 +1381,7 @@ mod test {
replica_id: 0,
logger,
subscriptions: subs,
module_instance_memory_tracker: ModuleInstanceMemoryTracker::new(Identity::ZERO, Arc::new(())),
},
runtime,
))
Expand Down
16 changes: 14 additions & 2 deletions crates/core/src/host/v8/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ use crate::host::{ModuleHost, ReducerCallError, ReducerCallResult, Scheduler};
use crate::messages::control_db::HostType;
use crate::module_host_context::ModuleCreationContext;
use crate::replica_context::ReplicaContext;
use crate::resource::ModuleInstanceMemoryTracker;
use crate::subscription::module_subscription_manager::TransactionOffset;
use crate::util::jobs::{AllocatedJobCore, CorePinner, LoadBalanceOnDropGuard};
use crate::worker_metrics::WORKER_METRICS;
Expand Down Expand Up @@ -965,6 +966,7 @@ pub(in crate::host) struct V8HeapMetrics {
external_memory_bytes: IntGauge,
native_contexts: IntGauge,
detached_contexts: IntGauge,
module_instance_memory_tracker: ModuleInstanceMemoryTracker,

/// Previous values observed by this instance.
///
Expand Down Expand Up @@ -1035,7 +1037,7 @@ impl V8HeapMetrics {
}
}

fn new(database_identity: &Identity, worker_kind: JsWorkerKind) -> Self {
fn new(database_identity: &Identity, worker_kind: JsWorkerKind, mem_tracker: ModuleInstanceMemoryTracker) -> Self {
Self {
total_heap_size_bytes: WORKER_METRICS
.v8_total_heap_size_bytes
Expand All @@ -1061,6 +1063,7 @@ impl V8HeapMetrics {
detached_contexts: WORKER_METRICS
.v8_detached_contexts
.with_label_values(database_identity, &worker_kind),
module_instance_memory_tracker: mem_tracker,
last_observed: V8HeapSnapshot::default(),
}
}
Expand All @@ -1077,6 +1080,11 @@ impl V8HeapMetrics {
adjust_gauge(&self.external_memory_bytes, delta.external_memory_bytes);
adjust_gauge(&self.native_contexts, delta.native_contexts);
adjust_gauge(&self.detached_contexts, delta.detached_contexts);
// Each live V8 isolate reports only the delta from its last heap sample.
// The shared tracker folds those deltas into a database-wide aggregate
// used for memory-limit enforcement.
self.module_instance_memory_tracker
.adjust_v8_physical(delta.total_physical_size_bytes);
}

fn observe(&mut self, stats: &v8::HeapStatistics) {
Expand Down Expand Up @@ -1679,7 +1687,11 @@ where
let info = &module_common.info();
let mut instance_common = InstanceCommon::new(&module_common);
let replica_ctx: &Arc<ReplicaContext> = module_common.replica_ctx();
let mut heap_metrics = V8HeapMetrics::new(&info.database_identity, worker_kind);
let mut heap_metrics = V8HeapMetrics::new(
&info.database_identity,
worker_kind,
replica_ctx.module_instance_memory_tracker.clone(),
);

let mut inst = V8Instance {
scope,
Expand Down
14 changes: 12 additions & 2 deletions crates/core/src/host/wasmtime/wasm_instance_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::host::wasm_common::module_host_actor::{
};
use crate::host::wasm_common::{err_to_errno_and_log, RowIterIdx, RowIters, TimingSpan, TimingSpanIdx, TimingSpanSet};
use crate::host::AbiCall;
use crate::resource::ModuleInstanceMemoryTracker;
use crate::subscription::module_subscription_manager::TransactionOffset;
use crate::worker_metrics::WORKER_METRICS;
use anyhow::{anyhow, Context as _};
Expand Down Expand Up @@ -144,6 +145,7 @@ pub(super) struct WasmInstanceEnv {

pub(in crate::host) struct WasmMemoryBytesMetric {
wasm_memory_bytes: IntGauge,
tracker: ModuleInstanceMemoryTracker,

/// Previous value observed by this instance.
///
Expand All @@ -158,9 +160,10 @@ pub(in crate::host) struct WasmMemoryBytesMetric {
}

impl WasmMemoryBytesMetric {
fn new(database_identity: Identity) -> Self {
fn new(database_identity: Identity, tracker: ModuleInstanceMemoryTracker) -> Self {
    // Resolve the gauge labelled with this database's identity up front,
    // then assemble the metric around it.
    let wasm_memory_bytes = WORKER_METRICS.wasm_memory_bytes.with_label_values(&database_identity);

    Self {
        wasm_memory_bytes,
        tracker,
        // Nothing has been sampled yet, so there is no prior observation to diff against.
        last_observed: 0,
    }
}
Expand All @@ -176,6 +179,10 @@ impl WasmMemoryBytesMetric {
self.wasm_memory_bytes.sub(-delta);
}

// Each WASM instance reports only its own delta.
// The shared tracker folds those deltas into a database-wide aggregate
// used for memory-limit enforcement.
self.tracker.adjust_wasm_linear(delta);
self.last_observed = memory_usage;
}

Expand All @@ -188,6 +195,8 @@ impl Drop for WasmMemoryBytesMetric {
fn drop(&mut self) {
    // The last value this instance reported is still included in both the
    // gauge and the database-wide aggregate; back it out of each.
    let contribution = self.last_observed;

    // Subtract this instance's share from the metric gauge.
    self.wasm_memory_bytes.sub(contribution);
    // Apply the same amount as a negative delta to the aggregate tracker.
    self.tracker.adjust_wasm_linear(-contribution);
}
}

Expand All @@ -200,6 +209,7 @@ impl WasmInstanceEnv {
/// Create a new `WasmInstanceEnv` from the given `InstanceEnv`.
pub fn new(instance_env: InstanceEnv) -> Self {
let database_identity = *instance_env.database_identity();
let instance_memory_tracker = instance_env.replica_ctx.module_instance_memory_tracker.clone();
Self {
instance_env,
module_def: None,
Expand All @@ -214,7 +224,7 @@ impl WasmInstanceEnv {
timing_spans: Default::default(),
call_times: CallTimes::new(),
chunk_pool: <_>::default(),
linear_memory_size_metric: WasmMemoryBytesMetric::new(database_identity),
linear_memory_size_metric: WasmMemoryBytesMetric::new(database_identity, instance_memory_tracker),
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub mod estimation;
pub mod host;
pub mod module_host_context;
pub mod replica_context;
pub use spacetimedb_engine::resource;
pub mod startup;
pub mod subscription;
pub mod util;
Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/replica_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use super::database_logger::DatabaseLogger;
use crate::db::relational_db::RelationalDB;
use crate::error::DBError;
use crate::messages::control_db::Database;
use crate::resource::ModuleInstanceMemoryTracker;
use crate::subscription::module_subscription_actor::ModuleSubscriptions;
use std::io;
use std::ops::Deref;
Expand All @@ -18,6 +19,7 @@ pub struct ReplicaContext {
pub replica_id: u64,
pub logger: Arc<DatabaseLogger>,
pub subscriptions: ModuleSubscriptions,
pub module_instance_memory_tracker: ModuleInstanceMemoryTracker,
}

impl ReplicaContext {
Expand Down
Loading
Loading