Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 3 additions & 9 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -827,16 +827,10 @@ impl RelationalDB {
}

#[tracing::instrument(level = "trace", skip_all)]
pub fn commit_tx_downgrade(
&self,
tx: MutTx,
workload: Workload,
) -> Result<Option<(Arc<TxData>, TxMetrics, Tx)>, DBError> {
pub fn commit_tx_downgrade(&self, tx: MutTx, workload: Workload) -> (Arc<TxData>, TxMetrics, Tx) {
log::trace!("COMMIT MUT TX");

let Some((tx_data, tx_metrics, tx)) = self.inner.commit_mut_tx_downgrade(tx, workload)? else {
return Ok(None);
};
let (tx_data, tx_metrics, tx) = self.inner.commit_mut_tx_downgrade(tx, workload);

self.maybe_do_snapshot(&tx_data);

Expand All @@ -845,7 +839,7 @@ impl RelationalDB {
durability.request_durability(tx.ctx.reducer_context().cloned(), &tx_data);
}

Ok(Some((tx_data, tx_metrics, tx)))
(tx_data, tx_metrics, tx)
}

/// Get the [`DurableOffset`] of this database, or `None` if this is an
Expand Down
10 changes: 2 additions & 8 deletions crates/core/src/sql/execute.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::sync::Arc;
use std::time::Duration;

use super::ast::SchemaViewer;
Expand Down Expand Up @@ -215,20 +214,15 @@ pub async fn run(
None => tx,
};

let (tx_data, tx_metrics_mut, tx) = tx.commit_downgrade(Workload::Sql);
let (tx_data, tx_metrics_mut, tx) = db.commit_tx_downgrade(tx, Workload::Sql);

let (tx_offset_send, tx_offset) = oneshot::channel();
// Release the tx on drop, so that we record metrics
// and set the transaction offset.
let mut tx = scopeguard::guard(tx, |tx| {
let (offset, tx_metrics_downgrade, reducer) = db.release_tx(tx);
let _ = tx_offset_send.send(offset);
db.report_tx_metrics(
reducer,
Some(Arc::new(tx_data)),
Some(tx_metrics_mut),
Some(tx_metrics_downgrade),
);
db.report_tx_metrics(reducer, Some(tx_data), Some(tx_metrics_mut), Some(tx_metrics_downgrade));
});

// Compute the header for the result set
Expand Down
12 changes: 5 additions & 7 deletions crates/core/src/subscription/module_subscription_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1027,9 +1027,7 @@ impl ModuleSubscriptions {
// We'll later ensure tx is released/cleaned up once out of scope.
let (read_tx, tx_data, tx_metrics_mut) = match &mut event.status {
EventStatus::Committed(db_update) => {
let Some((tx_data, tx_metrics, read_tx)) = stdb.commit_tx_downgrade(tx, Workload::Update)? else {
return Ok(Err(WriteConflict));
};
let (tx_data, tx_metrics, read_tx) = stdb.commit_tx_downgrade(tx, Workload::Update);
*db_update = DatabaseUpdate::from_writes(&tx_data);
(read_tx, tx_data, tx_metrics)
}
Expand Down Expand Up @@ -1102,7 +1100,7 @@ impl ModuleSubscriptions {
sender: Identity,
) -> Result<(TxGuard<impl FnOnce(TxId) + '_>, TransactionOffset), DBError> {
Self::_unsubscribe_views(&mut tx, view_collector, sender)?;
let (tx_data, tx_metrics_mut, tx) = tx.commit_downgrade(Workload::Subscribe);
let (tx_data, tx_metrics_mut, tx) = self.relational_db.commit_tx_downgrade(tx, Workload::Subscribe);
let opts = GuardTxOptions::from_mut(tx_data, tx_metrics_mut);
Ok(self.guard_tx(tx, opts))
}
Expand Down Expand Up @@ -1136,7 +1134,7 @@ impl ModuleSubscriptions {
.materialize_views(tx, view_collector, sender, Workload::Subscribe)
.await?
}
let (tx_data, tx_metrics_mut, tx) = tx.commit_downgrade(Workload::Subscribe);
let (tx_data, tx_metrics_mut, tx) = self.relational_db.commit_tx_downgrade(tx, Workload::Subscribe);
let opts = GuardTxOptions::from_mut(tx_data, tx_metrics_mut);
Ok(self.guard_tx(tx, opts))
}
Expand Down Expand Up @@ -1229,10 +1227,10 @@ impl GuardTxOptions {
}
}

fn from_mut(tx_data: TxData, tx_metrics_mut: TxMetrics) -> Self {
fn from_mut(tx_data: Arc<TxData>, tx_metrics_mut: TxMetrics) -> Self {
Self {
extra_tx_offset_sender: None,
tx_data: Some(Arc::new(tx_data)),
tx_data: Some(tx_data),
tx_metrics_mut: tx_metrics_mut.into(),
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/subscription/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1087,7 +1087,7 @@ mod tests {
}
}

let (data, _, tx) = tx.commit_downgrade(Workload::ForTests);
let (data, _, tx) = db.commit_tx_downgrade(tx, Workload::ForTests);
let table_id = plan.subscribed_table_id();
// This awful construction to convert `Arc<str>` into `Box<str>`.
let table_name = (&**plan.subscribed_table_name()).into();
Expand Down
12 changes: 6 additions & 6 deletions crates/datastore/src/locking_tx_datastore/datastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,8 @@ impl MutTx for Locking {
tx.rollback()
}

/// This method only updates the in-memory `committed_state`.
/// For durability, see `RelationalDB::commit_tx`.
fn commit_mut_tx(&self, tx: Self::MutTx) -> Result<Option<(TxOffset, TxData, TxMetrics, String)>> {
Ok(Some(tx.commit()))
}
Expand All @@ -960,12 +962,10 @@ impl Locking {
tx.rollback_downgrade(workload)
}

pub fn commit_mut_tx_downgrade(
&self,
tx: MutTxId,
workload: Workload,
) -> Result<Option<(TxData, TxMetrics, TxId)>> {
Ok(Some(tx.commit_downgrade(workload)))
/// This method only updates the in-memory `committed_state`.
/// For durability, see `RelationalDB::commit_tx_downgrade`.
pub fn commit_mut_tx_downgrade(&self, tx: MutTxId, workload: Workload) -> (TxData, TxMetrics, TxId) {
tx.commit_downgrade(workload)
}
}

Expand Down
12 changes: 11 additions & 1 deletion crates/datastore/src/locking_tx_datastore/mut_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1902,6 +1902,11 @@ impl MutTxId {
/// Commits this transaction in memory, applying its changes to the committed state.
/// This doesn't handle the persistence layer at all.
///
/// IMPORTANT: This method updates the in-memory state of the database but does not make it durable.
/// That is, the tx will not be persisted to the commitlog.
/// Hence you should be careful when calling this method directly.
/// In most cases you'll want to use `RelationalDB::commit_tx` which makes the tx durable.
///
/// Returns:
/// - [`TxData`], the set of inserts and deletes performed by this transaction.
/// - [`TxMetrics`], various measurements of the work performed by this transaction.
Expand Down Expand Up @@ -1947,11 +1952,16 @@ impl MutTxId {
/// The lock on the committed state is converted into a read lock,
/// and returned as a new read-only transaction.
///
/// IMPORTANT: This method updates the in-memory state of the database but does not make it durable.
/// That is, the tx will not be persisted to the commitlog.
/// Hence you should be careful when calling this method directly.
/// In most cases you'll want to use `RelationalDB::commit_tx_downgrade` which makes the tx durable.
///
/// Returns:
/// - [`TxData`], the set of inserts and deletes performed by this transaction.
/// - [`TxMetrics`], various measurements of the work performed by this transaction.
/// - [`TxId`], a read-only transaction with a shared lock on the committed state.
pub fn commit_downgrade(mut self, workload: Workload) -> (TxData, TxMetrics, TxId) {
pub(super) fn commit_downgrade(mut self, workload: Workload) -> (TxData, TxMetrics, TxId) {
let tx_data = self
.committed_state_write_lock
.merge(self.tx_state, self.read_sets, &self.ctx);
Expand Down
Loading