From 4160812582f461fd84bd91abb2eb471ac335c4be Mon Sep 17 00:00:00 2001 From: ujeenet Date: Fri, 22 May 2026 19:04:02 -0300 Subject: [PATCH] chore(executor): extract atomic + tx modules (#116 step 4) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pulls the transaction primitives out into their own files: - tx.rs (136 LOC) — `PoolTx<'a>` enum + `transaction_pool` user-facing helper. Per-backend `BEGIN` wrapper that hands the caller a backend-tagged transaction handle. - atomic.rs (178 LOC) — Django-shape `atomic(&pool, |tx| ...)` + `on_commit(callback)` + `on_commit_pending()` (issue #44). Uses a `tokio::task_local!` to queue commit-time callbacks. The `atomic!` declarative macro stays `#[macro_export]`-mounted at the crate root, co-located with the `atomic` fn for readability. mod.rs: 4139 → 3852 LOC (-287). Both new files are sibling leaves with zero cross-module dependencies beyond `super::{Pool, ExecError, Dialect}` + the public `PoolTx` enum. 1482 lib tests pass on the sqlite,tenancy litmus. --- crates/rustango/src/sql/executor/atomic.rs | 178 ++++++++++++ crates/rustango/src/sql/executor/mod.rs | 299 +-------------------- crates/rustango/src/sql/executor/tx.rs | 136 ++++++++++ 3 files changed, 320 insertions(+), 293 deletions(-) create mode 100644 crates/rustango/src/sql/executor/atomic.rs create mode 100644 crates/rustango/src/sql/executor/tx.rs diff --git a/crates/rustango/src/sql/executor/atomic.rs b/crates/rustango/src/sql/executor/atomic.rs new file mode 100644 index 0000000..0158eec --- /dev/null +++ b/crates/rustango/src/sql/executor/atomic.rs @@ -0,0 +1,178 @@ +//! `atomic()` + `on_commit()` — Django's `transaction.atomic` + +//! `transaction.on_commit`, rolled into one helper. Issue #44. +//! +//! Extracted from `executor/mod.rs` as part of #116 step 4. The +//! `atomic!` declarative-macro sugar lives at the crate root via +//! `#[macro_export]` regardless of which module declares it; we keep +//! it co-located with [`atomic`] for readability. + +use super::{transaction_pool, ExecError, PoolTx}; +use crate::sql::Pool; + +tokio::task_local! { + /// Active callback queue for the current `atomic` scope. Set by + /// [`atomic`] before running its closure; read by [`on_commit`] + /// from anywhere inside that closure's call tree. + static ON_COMMIT: std::sync::Mutex>>; +} + +/// Closure-scoped transaction with after-commit hooks. Django's +/// [`transaction.atomic`](https://docs.djangoproject.com/en/6.0/topics/db/transactions/#django.db.transaction.atomic) +/// + [`transaction.on_commit`](https://docs.djangoproject.com/en/6.0/topics/db/transactions/#performing-actions-after-commit), +/// rolled into one helper. Auto-commits when `f` returns `Ok`, +/// auto-rolls-back when `f` returns `Err`. Callbacks queued via +/// [`on_commit`] inside `f` fire **only on the commit path** — +/// never on rollback. +/// +/// Without this guarantee, side effects like "send the welcome +/// email" after an `INSERT` can leak: the email goes out, the +/// transaction rolls back, the user record never lands, the email +/// references a phantom user. +/// +/// ```ignore +/// use rustango::sql::{atomic, on_commit, insert_tx}; +/// +/// atomic(&pool, |tx| Box::pin(async move { +/// insert_tx(tx, &user_insert).await?; +/// on_commit(|| { +/// // Sync. For async work, spawn here. +/// tokio::spawn(async move { send_welcome_email(user_id).await }); +/// }); +/// Ok(()) +/// })) +/// .await?; +/// ``` +/// +/// The `Box::pin(async move { … })` wrapping is the cost of an async +/// closure that borrows `tx` mutably across `await` points on stable +/// Rust — `&mut PoolTx<'_>` is lifetime-invariant, and `Pin>` is the standard escape hatch. The [`atomic!`] macro +/// hides the ceremony if you prefer: +/// +/// ```ignore +/// rustango::atomic!(&pool, |tx| { +/// insert_tx(tx, &user_insert).await?; +/// on_commit(|| { /* … */ }); +/// Ok(()) +/// }) +/// .await?; +/// ``` +/// +/// **Inside the closure** `tx` is `&mut PoolTx<'_>` — pass directly +/// to the existing `_tx` helpers (`insert_tx` / `update_tx` / +/// `select_rows_tx_with_related` / ...). Raw `sqlx::query` chains +/// still need the per-backend `PoolTx::Postgres(...)` match (that's +/// the escape hatch); typed ORM ops dispatch internally. +/// +/// **Callbacks fire in registration order**, serially, after the +/// `COMMIT` returns OK. A panicking callback aborts the chain — +/// subsequent callbacks won't run. Wrap in `std::panic::catch_unwind` +/// if you need per-callback resilience. +/// +/// # Errors +/// Returns the first `ExecError` produced by `f`, or a driver error +/// from `BEGIN` / `COMMIT` / `ROLLBACK`. +pub async fn atomic(pool: &Pool, f: F) -> Result +where + F: for<'tx> FnOnce( + &'tx mut PoolTx<'_>, + ) -> std::pin::Pin< + Box> + Send + 'tx>, + >, +{ + let queue = std::sync::Mutex::new(Vec::>::new()); + ON_COMMIT + .scope(queue, async move { + let mut tx = transaction_pool(pool).await?; + match f(&mut tx).await { + Ok(val) => { + tx.commit().await?; + // Drain queue + fire callbacks in registration order. + let callbacks = ON_COMMIT + .with(|q| std::mem::take(&mut *q.lock().expect("on_commit mutex"))); + for cb in callbacks { + cb(); + } + Ok(val) + } + Err(e) => { + // Callbacks drop here when the task-local scope ends. + let _ = tx.rollback().await; + Err(e) + } + } + }) + .await +} + +/// Sugar over [`atomic`] that wraps the body in `Box::pin(async move { … })` +/// so callers don't have to. Identical semantics: +/// +/// ```ignore +/// rustango::atomic!(&pool, |tx| { +/// insert_tx(tx, &q).await?; +/// on_commit(|| spawn_email()); +/// Ok(()) +/// }) +/// .await?; +/// ``` +#[macro_export] +macro_rules! atomic { + ($pool:expr, |$tx:ident| $body:block) => { + async { + // Clone the pool into a local so it stays alive for the + // full future, even when nested inside an outer `async + // move` block (which would otherwise try to move the + // caller's `pool` binding through this scope). `Pool` is + // cheap-clone (Arc-based) so this is a zero-cost + // ergonomic shim. + let __rustango_atomic_pool = ::core::clone::Clone::clone($pool); + $crate::sql::atomic(&__rustango_atomic_pool, |$tx| { + ::std::boxed::Box::pin(async move { $body }) + }) + .await + } + }; +} + +/// Queue `f` to run after the enclosing [`atomic`] block commits. If +/// the transaction rolls back instead, `f` is dropped unfired. +/// +/// `f` is sync (`FnOnce() + Send + 'static`). For async work, spawn +/// from inside: +/// +/// ```ignore +/// on_commit(|| { +/// tokio::spawn(async move { send_email().await }); +/// }); +/// ``` +/// +/// Calling `on_commit` **outside** an `atomic` scope is a programmer +/// error and panics with a clear message — flash-fail beats silently +/// dropping the callback into the void. +pub fn on_commit(f: F) +where + F: FnOnce() + Send + 'static, +{ + ON_COMMIT + .try_with(|q| { + q.lock().expect("on_commit mutex").push(Box::new(f)); + }) + .unwrap_or_else(|_| { + panic!( + "rustango::sql::on_commit called outside an `atomic` block — \ + the callback would never fire. Wrap the caller in \ + `atomic(&pool, |tx| async move {{ ... on_commit(...) ... }})`." + ); + }); +} + +/// Returns the number of callbacks queued in the current `atomic` +/// scope. Useful for tests. Returns 0 when called outside an +/// `atomic` block. +#[must_use] +pub fn on_commit_pending() -> usize { + ON_COMMIT + .try_with(|q| q.lock().expect("on_commit mutex").len()) + .unwrap_or(0) +} diff --git a/crates/rustango/src/sql/executor/mod.rs b/crates/rustango/src/sql/executor/mod.rs index ae18f46..7c62928 100644 --- a/crates/rustango/src/sql/executor/mod.rs +++ b/crates/rustango/src/sql/executor/mod.rs @@ -1822,306 +1822,19 @@ where Ok(out) } -// ------------------------------------------------------------------ transaction - // ==================================================================== -// `transaction_pool` — user-facing bi-dialect transaction helper +// `transaction_pool` + PoolTx — extracted to tx.rs (#116 step 4) // ==================================================================== -// -// Wraps a closure in a per-backend `BEGIN`/`COMMIT`/`ROLLBACK`. Same -// shape as the existing PgPool [`transaction`] helper, but the -// closure receives a backend-tagged enum the caller pattern-matches -// to get a typed connection. sqlx's `Transaction` is generic over -// backend, so we can't hand callers a single "any-DB" transaction -// handle without erasing the bind types — exposing the per-arm enum -// keeps users in control of which backend they're talking to and -// avoids surprise driver mismatches. - -/// A transaction handle borrowed from one of [`Pool`]'s backend arms. -/// Yielded by [`transaction_pool`]'s closure so callers run their -/// queries against the right driver-typed connection. -/// -/// In practice users `match` on the variant and call sqlx-style -/// `.execute(&mut **tx)` against the inner connection. Mixing the -/// two arms in one closure body is fine — Rust ensures the closure -/// runs in exactly one arm per pool variant. -pub enum PoolTx<'a> { - #[cfg(feature = "postgres")] - Postgres(sqlx::Transaction<'a, sqlx::Postgres>), - #[cfg(feature = "mysql")] - Mysql(sqlx::Transaction<'a, sqlx::MySql>), - #[cfg(feature = "sqlite")] - Sqlite(sqlx::Transaction<'a, sqlx::Sqlite>), -} - -impl<'a> PoolTx<'a> { - /// Commit this transaction. Consumes the wrapper. - /// - /// # Errors - /// `sqlx::Error` from the underlying `COMMIT`. - pub async fn commit(self) -> Result<(), sqlx::Error> { - match self { - #[cfg(feature = "postgres")] - PoolTx::Postgres(tx) => tx.commit().await, - #[cfg(feature = "mysql")] - PoolTx::Mysql(tx) => tx.commit().await, - #[cfg(feature = "sqlite")] - PoolTx::Sqlite(tx) => tx.commit().await, - } - } - - /// Roll back this transaction. Consumes the wrapper. Best-effort — - /// drop semantics auto-roll-back too if the caller forgets to - /// invoke this explicitly. - /// - /// # Errors - /// `sqlx::Error` from the underlying `ROLLBACK`. - pub async fn rollback(self) -> Result<(), sqlx::Error> { - match self { - #[cfg(feature = "postgres")] - PoolTx::Postgres(tx) => tx.rollback().await, - #[cfg(feature = "mysql")] - PoolTx::Mysql(tx) => tx.rollback().await, - #[cfg(feature = "sqlite")] - PoolTx::Sqlite(tx) => tx.rollback().await, - } - } - - /// Return the dialect for this transaction's backend — same - /// dispatch as [`super::Pool::dialect`] but sourced from the - /// `PoolTx` variant rather than the pool. Used internally by the - /// `_tx` executor helpers to compile SQL against the right backend. - #[must_use] - pub fn dialect(&self) -> &'static dyn Dialect { - match self { - #[cfg(feature = "postgres")] - PoolTx::Postgres(_) => super::postgres::DIALECT, - #[cfg(feature = "mysql")] - PoolTx::Mysql(_) => super::mysql::DIALECT, - #[cfg(feature = "sqlite")] - PoolTx::Sqlite(_) => super::sqlite::DIALECT, - } - } -} -/// Open a transaction against either backend. Bi-dialect counterpart -/// of `pool.begin().await?`. Caller owns the returned [`PoolTx`] and -/// must call `.commit().await?` (or `.rollback().await?`) before -/// dropping; otherwise sqlx auto-rolls-back on drop. -/// -/// Most user code wants the macro-generated `delete_pool` / -/// `save_pool` / `insert_pool` instead — those already wrap each -/// per-row write in a transaction. `transaction_pool` is for -/// cross-row / cross-table atomicity: -/// -/// ```ignore -/// let mut tx = rustango::sql::transaction_pool(&pool).await?; -/// match &mut tx { -/// #[cfg(feature = "postgres")] -/// rustango::sql::PoolTx::Postgres(t) => { -/// sqlx::query("UPDATE accounts SET balance = balance - $1 WHERE id = $2") -/// .bind(amount).bind(from).execute(&mut **t).await?; -/// } -/// #[cfg(feature = "mysql")] -/// rustango::sql::PoolTx::Mysql(t) => { -/// sqlx::query("UPDATE accounts SET balance = balance - ? WHERE id = ?") -/// .bind(amount).bind(from).execute(&mut **t).await?; -/// } -/// } -/// tx.commit().await?; -/// ``` -/// -/// The match-on-variant ceremony stays explicit because sqlx's -/// `Transaction` is generic over backend — there's no -/// driver-erased connection type to hand callers without losing -/// bind-side type checking. A future batch may add a `tx_pool!` -/// macro to abstract the match for the common per-backend-mirror -/// pattern. -/// -/// # Errors -/// Driver errors from `BEGIN`. -pub async fn transaction_pool(pool: &Pool) -> Result, ExecError> { - match pool { - #[cfg(feature = "postgres")] - Pool::Postgres(pg) => Ok(PoolTx::Postgres(pg.begin().await?)), - #[cfg(feature = "mysql")] - Pool::Mysql(my) => Ok(PoolTx::Mysql(my.begin().await?)), - #[cfg(feature = "sqlite")] - Pool::Sqlite(sq) => Ok(PoolTx::Sqlite(sq.begin().await?)), - } -} +mod tx; +pub use tx::{transaction_pool, PoolTx}; // ==================================================================== -// atomic() + on_commit() — Django transaction.atomic + on_commit (#44) +// atomic() + on_commit() — extracted to atomic.rs (#116 step 4) // ==================================================================== -tokio::task_local! { - /// Active callback queue for the current `atomic` scope. Set by - /// [`atomic`] before running its closure; read by [`on_commit`] - /// from anywhere inside that closure's call tree. - static ON_COMMIT: std::sync::Mutex>>; -} - -/// Closure-scoped transaction with after-commit hooks. Django's -/// [`transaction.atomic`](https://docs.djangoproject.com/en/6.0/topics/db/transactions/#django.db.transaction.atomic) -/// + [`transaction.on_commit`](https://docs.djangoproject.com/en/6.0/topics/db/transactions/#performing-actions-after-commit), -/// rolled into one helper. Auto-commits when `f` returns `Ok`, -/// auto-rolls-back when `f` returns `Err`. Callbacks queued via -/// [`on_commit`] inside `f` fire **only on the commit path** — -/// never on rollback. -/// -/// Without this guarantee, side effects like "send the welcome -/// email" after an `INSERT` can leak: the email goes out, the -/// transaction rolls back, the user record never lands, the email -/// references a phantom user. -/// -/// ```ignore -/// use rustango::sql::{atomic, on_commit, insert_tx}; -/// -/// atomic(&pool, |tx| Box::pin(async move { -/// insert_tx(tx, &user_insert).await?; -/// on_commit(|| { -/// // Sync. For async work, spawn here. -/// tokio::spawn(async move { send_welcome_email(user_id).await }); -/// }); -/// Ok(()) -/// })) -/// .await?; -/// ``` -/// -/// The `Box::pin(async move { … })` wrapping is the cost of an async -/// closure that borrows `tx` mutably across `await` points on stable -/// Rust — `&mut PoolTx<'_>` is lifetime-invariant, and `Pin>` is the standard escape hatch. The [`atomic!`] macro -/// hides the ceremony if you prefer: -/// -/// ```ignore -/// rustango::atomic!(&pool, |tx| { -/// insert_tx(tx, &user_insert).await?; -/// on_commit(|| { /* … */ }); -/// Ok(()) -/// }) -/// .await?; -/// ``` -/// -/// **Inside the closure** `tx` is `&mut PoolTx<'_>` — pass directly -/// to the existing `_tx` helpers (`insert_tx` / `update_tx` / -/// `select_rows_tx_with_related` / ...). Raw `sqlx::query` chains -/// still need the per-backend `PoolTx::Postgres(...)` match (that's -/// the escape hatch); typed ORM ops dispatch internally. -/// -/// **Callbacks fire in registration order**, serially, after the -/// `COMMIT` returns OK. A panicking callback aborts the chain — -/// subsequent callbacks won't run. Wrap in `std::panic::catch_unwind` -/// if you need per-callback resilience. -/// -/// # Errors -/// Returns the first `ExecError` produced by `f`, or a driver error -/// from `BEGIN` / `COMMIT` / `ROLLBACK`. -pub async fn atomic(pool: &Pool, f: F) -> Result -where - F: for<'tx> FnOnce( - &'tx mut PoolTx<'_>, - ) -> std::pin::Pin< - Box> + Send + 'tx>, - >, -{ - let queue = std::sync::Mutex::new(Vec::>::new()); - ON_COMMIT - .scope(queue, async move { - let mut tx = transaction_pool(pool).await?; - match f(&mut tx).await { - Ok(val) => { - tx.commit().await?; - // Drain queue + fire callbacks in registration order. - let callbacks = ON_COMMIT - .with(|q| std::mem::take(&mut *q.lock().expect("on_commit mutex"))); - for cb in callbacks { - cb(); - } - Ok(val) - } - Err(e) => { - // Callbacks drop here when the task-local scope ends. - let _ = tx.rollback().await; - Err(e) - } - } - }) - .await -} - -/// Sugar over [`atomic`] that wraps the body in `Box::pin(async move { … })` -/// so callers don't have to. Identical semantics: -/// -/// ```ignore -/// rustango::atomic!(&pool, |tx| { -/// insert_tx(tx, &q).await?; -/// on_commit(|| spawn_email()); -/// Ok(()) -/// }) -/// .await?; -/// ``` -#[macro_export] -macro_rules! atomic { - ($pool:expr, |$tx:ident| $body:block) => { - async { - // Clone the pool into a local so it stays alive for the - // full future, even when nested inside an outer `async - // move` block (which would otherwise try to move the - // caller's `pool` binding through this scope). `Pool` is - // cheap-clone (Arc-based) so this is a zero-cost - // ergonomic shim. - let __rustango_atomic_pool = ::core::clone::Clone::clone($pool); - $crate::sql::atomic(&__rustango_atomic_pool, |$tx| { - ::std::boxed::Box::pin(async move { $body }) - }) - .await - } - }; -} - -/// Queue `f` to run after the enclosing [`atomic`] block commits. If -/// the transaction rolls back instead, `f` is dropped unfired. -/// -/// `f` is sync (`FnOnce() + Send + 'static`). For async work, spawn -/// from inside: -/// -/// ```ignore -/// on_commit(|| { -/// tokio::spawn(async move { send_email().await }); -/// }); -/// ``` -/// -/// Calling `on_commit` **outside** an `atomic` scope is a programmer -/// error and panics with a clear message — flash-fail beats silently -/// dropping the callback into the void. -pub fn on_commit(f: F) -where - F: FnOnce() + Send + 'static, -{ - ON_COMMIT - .try_with(|q| { - q.lock().expect("on_commit mutex").push(Box::new(f)); - }) - .unwrap_or_else(|_| { - panic!( - "rustango::sql::on_commit called outside an `atomic` block — \ - the callback would never fire. Wrap the caller in \ - `atomic(&pool, |tx| async move {{ ... on_commit(...) ... }})`." - ); - }); -} - -/// Returns the number of callbacks queued in the current `atomic` -/// scope. Useful for tests. Returns 0 when called outside an -/// `atomic` block. -#[must_use] -pub fn on_commit_pending() -> usize { - ON_COMMIT - .try_with(|q| q.lock().expect("on_commit mutex").len()) - .unwrap_or(0) -} +mod atomic; +pub use atomic::{atomic, on_commit, on_commit_pending}; // ==================================================================== // `&Pool` dispatch — bi-dialect executor surface (v0.23.0-batch5) diff --git a/crates/rustango/src/sql/executor/tx.rs b/crates/rustango/src/sql/executor/tx.rs new file mode 100644 index 0000000..77c4621 --- /dev/null +++ b/crates/rustango/src/sql/executor/tx.rs @@ -0,0 +1,136 @@ +//! `PoolTx` enum + `transaction_pool` user-facing helper. +//! +//! Extracted from `executor/mod.rs` as part of #116 step 4. The per- +//! backend `*_tx` operations (`insert_tx`, `update_tx`, …) and +//! `atomic()` consume `PoolTx`; they still live in `mod.rs` for now +//! and will move into `pool/tx`-shaped modules in a later step. + +use super::{Dialect, ExecError}; +use crate::sql::Pool; + +// ==================================================================== +// `transaction_pool` — user-facing bi-dialect transaction helper +// ==================================================================== +// +// Wraps a closure in a per-backend `BEGIN`/`COMMIT`/`ROLLBACK`. Same +// shape as the existing PgPool [`transaction`] helper, but the +// closure receives a backend-tagged enum the caller pattern-matches +// to get a typed connection. sqlx's `Transaction` is generic over +// backend, so we can't hand callers a single "any-DB" transaction +// handle without erasing the bind types — exposing the per-arm enum +// keeps users in control of which backend they're talking to and +// avoids surprise driver mismatches. + +/// A transaction handle borrowed from one of [`Pool`]'s backend arms. +/// Yielded by [`transaction_pool`]'s closure so callers run their +/// queries against the right driver-typed connection. +/// +/// In practice users `match` on the variant and call sqlx-style +/// `.execute(&mut **tx)` against the inner connection. Mixing the +/// two arms in one closure body is fine — Rust ensures the closure +/// runs in exactly one arm per pool variant. +pub enum PoolTx<'a> { + #[cfg(feature = "postgres")] + Postgres(sqlx::Transaction<'a, sqlx::Postgres>), + #[cfg(feature = "mysql")] + Mysql(sqlx::Transaction<'a, sqlx::MySql>), + #[cfg(feature = "sqlite")] + Sqlite(sqlx::Transaction<'a, sqlx::Sqlite>), +} + +impl<'a> PoolTx<'a> { + /// Commit this transaction. Consumes the wrapper. + /// + /// # Errors + /// `sqlx::Error` from the underlying `COMMIT`. + pub async fn commit(self) -> Result<(), sqlx::Error> { + match self { + #[cfg(feature = "postgres")] + PoolTx::Postgres(tx) => tx.commit().await, + #[cfg(feature = "mysql")] + PoolTx::Mysql(tx) => tx.commit().await, + #[cfg(feature = "sqlite")] + PoolTx::Sqlite(tx) => tx.commit().await, + } + } + + /// Roll back this transaction. Consumes the wrapper. Best-effort — + /// drop semantics auto-roll-back too if the caller forgets to + /// invoke this explicitly. + /// + /// # Errors + /// `sqlx::Error` from the underlying `ROLLBACK`. + pub async fn rollback(self) -> Result<(), sqlx::Error> { + match self { + #[cfg(feature = "postgres")] + PoolTx::Postgres(tx) => tx.rollback().await, + #[cfg(feature = "mysql")] + PoolTx::Mysql(tx) => tx.rollback().await, + #[cfg(feature = "sqlite")] + PoolTx::Sqlite(tx) => tx.rollback().await, + } + } + + /// Return the dialect for this transaction's backend — same + /// dispatch as [`crate::sql::Pool::dialect`] but sourced from the + /// `PoolTx` variant rather than the pool. Used internally by the + /// `_tx` executor helpers to compile SQL against the right backend. + #[must_use] + pub fn dialect(&self) -> &'static dyn Dialect { + match self { + #[cfg(feature = "postgres")] + PoolTx::Postgres(_) => crate::sql::postgres::DIALECT, + #[cfg(feature = "mysql")] + PoolTx::Mysql(_) => crate::sql::mysql::DIALECT, + #[cfg(feature = "sqlite")] + PoolTx::Sqlite(_) => crate::sql::sqlite::DIALECT, + } + } +} + +/// Open a transaction against either backend. Bi-dialect counterpart +/// of `pool.begin().await?`. Caller owns the returned [`PoolTx`] and +/// must call `.commit().await?` (or `.rollback().await?`) before +/// dropping; otherwise sqlx auto-rolls-back on drop. +/// +/// Most user code wants the macro-generated `delete_pool` / +/// `save_pool` / `insert_pool` instead — those already wrap each +/// per-row write in a transaction. `transaction_pool` is for +/// cross-row / cross-table atomicity: +/// +/// ```ignore +/// let mut tx = rustango::sql::transaction_pool(&pool).await?; +/// match &mut tx { +/// #[cfg(feature = "postgres")] +/// rustango::sql::PoolTx::Postgres(t) => { +/// sqlx::query("UPDATE accounts SET balance = balance - $1 WHERE id = $2") +/// .bind(amount).bind(from).execute(&mut **t).await?; +/// } +/// #[cfg(feature = "mysql")] +/// rustango::sql::PoolTx::Mysql(t) => { +/// sqlx::query("UPDATE accounts SET balance = balance - ? WHERE id = ?") +/// .bind(amount).bind(from).execute(&mut **t).await?; +/// } +/// } +/// tx.commit().await?; +/// ``` +/// +/// The match-on-variant ceremony stays explicit because sqlx's +/// `Transaction` is generic over backend — there's no +/// driver-erased connection type to hand callers without losing +/// bind-side type checking. A future batch may add a `tx_pool!` +/// macro to abstract the match for the common per-backend-mirror +/// pattern. +/// +/// # Errors +/// Driver errors from `BEGIN`. +pub async fn transaction_pool(pool: &Pool) -> Result, ExecError> { + match pool { + #[cfg(feature = "postgres")] + Pool::Postgres(pg) => Ok(PoolTx::Postgres(pg.begin().await?)), + #[cfg(feature = "mysql")] + Pool::Mysql(my) => Ok(PoolTx::Mysql(my.begin().await?)), + #[cfg(feature = "sqlite")] + Pool::Sqlite(sq) => Ok(PoolTx::Sqlite(sq.begin().await?)), + } +}