From 61492983d8adb00aeb4625eb5b2c769a4b0d3d9d Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Tue, 16 Dec 2025 14:20:47 -0800 Subject: [PATCH] Make subscribe durable --- crates/core/src/db/relational_db.rs | 12 +++--------- crates/core/src/sql/execute.rs | 10 ++-------- .../src/subscription/module_subscription_actor.rs | 12 +++++------- crates/core/src/subscription/query.rs | 2 +- .../datastore/src/locking_tx_datastore/datastore.rs | 12 ++++++------ crates/datastore/src/locking_tx_datastore/mut_tx.rs | 12 +++++++++++- 6 files changed, 28 insertions(+), 32 deletions(-) diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index e4706cbde88..93bf29b8091 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -827,16 +827,10 @@ impl RelationalDB { } #[tracing::instrument(level = "trace", skip_all)] - pub fn commit_tx_downgrade( - &self, - tx: MutTx, - workload: Workload, - ) -> Result, TxMetrics, Tx)>, DBError> { + pub fn commit_tx_downgrade(&self, tx: MutTx, workload: Workload) -> (Arc, TxMetrics, Tx) { log::trace!("COMMIT MUT TX"); - let Some((tx_data, tx_metrics, tx)) = self.inner.commit_mut_tx_downgrade(tx, workload)? else { - return Ok(None); - }; + let (tx_data, tx_metrics, tx) = self.inner.commit_mut_tx_downgrade(tx, workload); self.maybe_do_snapshot(&tx_data); @@ -845,7 +839,7 @@ impl RelationalDB { durability.request_durability(tx.ctx.reducer_context().cloned(), &tx_data); } - Ok(Some((tx_data, tx_metrics, tx))) + (tx_data, tx_metrics, tx) } /// Get the [`DurableOffset`] of this database, or `None` if this is an diff --git a/crates/core/src/sql/execute.rs b/crates/core/src/sql/execute.rs index 661f6788623..7b0d50ff421 100644 --- a/crates/core/src/sql/execute.rs +++ b/crates/core/src/sql/execute.rs @@ -1,4 +1,3 @@ -use std::sync::Arc; use std::time::Duration; use super::ast::SchemaViewer; @@ -215,7 +214,7 @@ pub async fn run( None => tx, }; - let (tx_data, tx_metrics_mut, tx) = tx.commit_downgrade(Workload::Sql); + let (tx_data, tx_metrics_mut, tx) = db.commit_tx_downgrade(tx, Workload::Sql); let (tx_offset_send, tx_offset) = oneshot::channel(); // Release the tx on drop, so that we record metrics @@ -223,12 +222,7 @@ pub async fn run( let mut tx = scopeguard::guard(tx, |tx| { let (offset, tx_metrics_downgrade, reducer) = db.release_tx(tx); let _ = tx_offset_send.send(offset); - db.report_tx_metrics( - reducer, - Some(Arc::new(tx_data)), - Some(tx_metrics_mut), - Some(tx_metrics_downgrade), - ); + db.report_tx_metrics(reducer, Some(tx_data), Some(tx_metrics_mut), Some(tx_metrics_downgrade)); }); // Compute the header for the result set diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 47af21f9db3..f12a007985d 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -1027,9 +1027,7 @@ impl ModuleSubscriptions { // We'll later ensure tx is released/cleaned up once out of scope. let (read_tx, tx_data, tx_metrics_mut) = match &mut event.status { EventStatus::Committed(db_update) => { - let Some((tx_data, tx_metrics, read_tx)) = stdb.commit_tx_downgrade(tx, Workload::Update)? else { - return Ok(Err(WriteConflict)); - }; + let (tx_data, tx_metrics, read_tx) = stdb.commit_tx_downgrade(tx, Workload::Update); *db_update = DatabaseUpdate::from_writes(&tx_data); (read_tx, tx_data, tx_metrics) } @@ -1102,7 +1100,7 @@ impl ModuleSubscriptions { sender: Identity, ) -> Result<(TxGuard, TransactionOffset), DBError> { Self::_unsubscribe_views(&mut tx, view_collector, sender)?; - let (tx_data, tx_metrics_mut, tx) = tx.commit_downgrade(Workload::Subscribe); + let (tx_data, tx_metrics_mut, tx) = self.relational_db.commit_tx_downgrade(tx, Workload::Subscribe); let opts = GuardTxOptions::from_mut(tx_data, tx_metrics_mut); Ok(self.guard_tx(tx, opts)) } @@ -1136,7 +1134,7 @@ impl ModuleSubscriptions { .materialize_views(tx, view_collector, sender, Workload::Subscribe) .await? } - let (tx_data, tx_metrics_mut, tx) = tx.commit_downgrade(Workload::Subscribe); + let (tx_data, tx_metrics_mut, tx) = self.relational_db.commit_tx_downgrade(tx, Workload::Subscribe); let opts = GuardTxOptions::from_mut(tx_data, tx_metrics_mut); Ok(self.guard_tx(tx, opts)) } @@ -1229,10 +1227,10 @@ impl GuardTxOptions { } } - fn from_mut(tx_data: TxData, tx_metrics_mut: TxMetrics) -> Self { + fn from_mut(tx_data: Arc, tx_metrics_mut: TxMetrics) -> Self { Self { extra_tx_offset_sender: None, - tx_data: Some(Arc::new(tx_data)), + tx_data: Some(tx_data), tx_metrics_mut: tx_metrics_mut.into(), } } diff --git a/crates/core/src/subscription/query.rs b/crates/core/src/subscription/query.rs index 143f0ea96e6..a9048160150 100644 --- a/crates/core/src/subscription/query.rs +++ b/crates/core/src/subscription/query.rs @@ -1087,7 +1087,7 @@ mod tests { } } - let (data, _, tx) = tx.commit_downgrade(Workload::ForTests); + let (data, _, tx) = db.commit_tx_downgrade(tx, Workload::ForTests); let table_id = plan.subscribed_table_id(); // This awful construction to convert `Arc` into `Box`. let table_name = (&**plan.subscribed_table_name()).into(); diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs index 25637031039..f96c083f9c7 100644 --- a/crates/datastore/src/locking_tx_datastore/datastore.rs +++ b/crates/datastore/src/locking_tx_datastore/datastore.rs @@ -950,6 +950,8 @@ impl MutTx for Locking { tx.rollback() } + /// This method only updates the in-memory `committed_state`. + /// For durability, see `RelationalDB::commit_tx`. fn commit_mut_tx(&self, tx: Self::MutTx) -> Result> { Ok(Some(tx.commit())) } @@ -960,12 +962,10 @@ impl Locking { tx.rollback_downgrade(workload) } - pub fn commit_mut_tx_downgrade( - &self, - tx: MutTxId, - workload: Workload, - ) -> Result> { - Ok(Some(tx.commit_downgrade(workload))) + /// This method only updates the in-memory `committed_state`. + /// For durability, see `RelationalDB::commit_tx_downgrade`. + pub fn commit_mut_tx_downgrade(&self, tx: MutTxId, workload: Workload) -> (TxData, TxMetrics, TxId) { + tx.commit_downgrade(workload) } } diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index 1b3aab564fc..0d83d6824be 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -1902,6 +1902,11 @@ impl MutTxId { /// Commits this transaction in memory, applying its changes to the committed state. /// This doesn't handle the persistence layer at all. /// + /// IMPORTANT: This method updates the in-memory state of the database but does not make it durable. + /// That is, the tx will not be persisted to the commitlog. + /// Hence you should be careful when calling this method directly. + /// In most cases you'll want to use `RelationalDB::commit_tx` which makes the tx durable. + /// /// Returns: /// - [`TxData`], the set of inserts and deletes performed by this transaction. /// - [`TxMetrics`], various measurements of the work performed by this transaction. @@ -1947,11 +1952,16 @@ impl MutTxId { /// The lock on the committed state is converted into a read lock, /// and returned as a new read-only transaction. /// + /// IMPORTANT: This method updates the in-memory state of the database but does not make it durable. + /// That is, the tx will not be persisted to the commitlog. + /// Hence you should be careful when calling this method directly. + /// In most cases you'll want to use `RelationalDB::commit_tx_downgrade` which makes the tx durable. + /// /// Returns: /// - [`TxData`], the set of inserts and deletes performed by this transaction. /// - [`TxMetrics`], various measurements of the work performed by this transaction. /// - [`TxId`], a read-only transaction with a shared lock on the committed state. - pub fn commit_downgrade(mut self, workload: Workload) -> (TxData, TxMetrics, TxId) { + pub(super) fn commit_downgrade(mut self, workload: Workload) -> (TxData, TxMetrics, TxId) { let tx_data = self .committed_state_write_lock .merge(self.tx_state, self.read_sets, &self.ctx);