From 6e73d879ec91d9be89767a989a07837e5b5a05b2 Mon Sep 17 00:00:00 2001 From: Zenas Date: Sat, 1 Nov 2025 16:18:06 +0400 Subject: [PATCH 1/2] feat(sqlite): make thread_stack_size optional with safe defaults Changes `thread_stack_size` from `usize` to `Option` to address concerns about safety and platform compatibility. Key improvements: - Default to `None`, using Rust std's default stack size (typically 2MB) - Only apply custom stack size when explicitly configured - Safer for user callbacks with unpredictable stack requirements - Platform-agnostic (handles 32-bit vs 64-bit differences automatically) - Marked as an advanced option in documentation with appropriate warnings This addresses the feedback from PR #3885 about hardcoded stack sizes being unsafe due to: 1. Unpredictable stack needs of user-supplied callbacks 2. Platform-specific requirements (32-bit vs 64-bit) 3. Need for conservative defaults Related: #3885 --- sqlx-sqlite/src/connection/establish.rs | 2 ++ sqlx-sqlite/src/connection/worker.rs | 11 +++++-- sqlx-sqlite/src/options/mod.rs | 38 +++++++++++++++++++++++++ 3 files changed, 48 insertions(+), 3 deletions(-) diff --git a/sqlx-sqlite/src/connection/establish.rs b/sqlx-sqlite/src/connection/establish.rs index d811275409..069547c0c7 100644 --- a/sqlx-sqlite/src/connection/establish.rs +++ b/sqlx-sqlite/src/connection/establish.rs @@ -29,6 +29,7 @@ pub struct EstablishParams { busy_timeout: Duration, statement_cache_capacity: usize, log_settings: LogSettings, + pub(crate) thread_stack_size: Option, #[cfg(feature = "load-extension")] extensions: IndexMap>, pub(crate) thread_name: String, @@ -142,6 +143,7 @@ impl EstablishParams { busy_timeout: options.busy_timeout, statement_cache_capacity: options.statement_cache_capacity, log_settings: options.log_settings.clone(), + thread_stack_size: options.thread_stack_size, #[cfg(feature = "load-extension")] extensions, thread_name: (options.thread_name)(thread_id as u64), diff --git a/sqlx-sqlite/src/connection/worker.rs b/sqlx-sqlite/src/connection/worker.rs index 11c8778cc9..1e9a98d225 100644 --- a/sqlx-sqlite/src/connection/worker.rs +++ b/sqlx-sqlite/src/connection/worker.rs @@ -106,9 +106,14 @@ impl ConnectionWorker { pub(crate) async fn establish(params: EstablishParams) -> Result { let (establish_tx, establish_rx) = oneshot::channel(); - thread::Builder::new() - .name(params.thread_name.clone()) - .spawn(move || { + let mut builder = thread::Builder::new().name(params.thread_name.clone()); + + // Only set a custom stack size if explicitly configured + if let Some(stack_size) = params.thread_stack_size { + builder = builder.stack_size(stack_size); + } + + builder.spawn(move || { let (command_tx, command_rx) = flume::bounded(params.command_channel_size); let conn = match params.establish() { diff --git a/sqlx-sqlite/src/options/mod.rs b/sqlx-sqlite/src/options/mod.rs index b2849f243c..ffe871eeea 100644 --- a/sqlx-sqlite/src/options/mod.rs +++ b/sqlx-sqlite/src/options/mod.rs @@ -70,6 +70,7 @@ pub struct SqliteConnectOptions { pub(crate) log_settings: LogSettings, pub(crate) immutable: bool, pub(crate) vfs: Option>, + pub(crate) thread_stack_size: Option, pub(crate) pragmas: IndexMap, Option>>, @@ -204,6 +205,7 @@ impl SqliteConnectOptions { log_settings: Default::default(), immutable: false, vfs: None, + thread_stack_size: None, pragmas, #[cfg(feature = "load-extension")] extensions: Default::default(), @@ -233,6 +235,42 @@ impl SqliteConnectOptions { &self.filename } + /// Set the thread stack size in bytes for the SQLite worker thread. + /// + /// **This is an advanced option.** By default (`None`), SQLx uses the Rust standard library's + /// default stack size (typically 2 MB), which is safe for most use cases including user-supplied + /// callbacks and platform-specific requirements. + /// + /// Only set this if you have a specific reason to do so, such as running in an embedded environment + /// with constrained memory. Be aware that: + /// - User-supplied callbacks (hooks, custom functions) run on this thread and may have unpredictable + /// stack requirements + /// - Different platforms (32-bit vs 64-bit) have different stack size requirements + /// - Setting this too low may cause stack overflow crashes + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx_sqlite::SqliteConnectOptions; + /// # use std::str::FromStr; + /// # fn example() -> Result<(), Box> { + /// let options = SqliteConnectOptions::from_str("sqlite::memory:")? + /// .thread_stack_size(1024 * 1024); // 1 MB - use with caution! + /// # Ok(()) + /// # } + /// ``` + pub fn thread_stack_size(mut self, size: usize) -> Self { + self.thread_stack_size = Some(size); + self + } + + /// Get the current thread stack size in bytes. + /// + /// Returns `None` if using the default stack size from the Rust standard library. + pub fn get_thread_stack_size(&self) -> Option { + self.thread_stack_size + } + /// Set the enforcement of [foreign key constraints](https://www.sqlite.org/pragma.html#pragma_foreign_keys). /// /// SQLx chooses to enable this by default so that foreign keys function as expected, From b732790b956d9ab6dfee2d407bdae3dde62ad9c1 Mon Sep 17 00:00:00 2001 From: Zenas Date: Thu, 27 Nov 2025 17:10:47 +0400 Subject: [PATCH 2/2] chore: run `cargo fmt` --- sqlx-sqlite/src/connection/worker.rs | 422 ++++++++++++++------------- 1 file changed, 212 insertions(+), 210 deletions(-) diff --git a/sqlx-sqlite/src/connection/worker.rs b/sqlx-sqlite/src/connection/worker.rs index 1e9a98d225..253df4c27c 100644 --- a/sqlx-sqlite/src/connection/worker.rs +++ b/sqlx-sqlite/src/connection/worker.rs @@ -114,240 +114,242 @@ impl ConnectionWorker { } builder.spawn(move || { - let (command_tx, command_rx) = flume::bounded(params.command_channel_size); + let (command_tx, command_rx) = flume::bounded(params.command_channel_size); - let conn = match params.establish() { - Ok(conn) => conn, - Err(e) => { - establish_tx.send(Err(e)).ok(); - return; - } - }; - - let shared = Arc::new(WorkerSharedState { - transaction_depth: AtomicUsize::new(0), - cached_statements_size: AtomicUsize::new(0), - // note: must be fair because in `Command::UnlockDb` we unlock the mutex - // and then immediately try to relock it; an unfair mutex would immediately - // grant us the lock even if another task is waiting. - conn: Mutex::new(conn, true), - }); - let mut conn = shared.conn.try_lock().unwrap(); - - if establish_tx - .send(Ok(Self { - command_tx, - shared: Arc::clone(&shared), - })) - .is_err() - { + let conn = match params.establish() { + Ok(conn) => conn, + Err(e) => { + establish_tx.send(Err(e)).ok(); return; } + }; + + let shared = Arc::new(WorkerSharedState { + transaction_depth: AtomicUsize::new(0), + cached_statements_size: AtomicUsize::new(0), + // note: must be fair because in `Command::UnlockDb` we unlock the mutex + // and then immediately try to relock it; an unfair mutex would immediately + // grant us the lock even if another task is waiting. + conn: Mutex::new(conn, true), + }); + let mut conn = shared.conn.try_lock().unwrap(); + + if establish_tx + .send(Ok(Self { + command_tx, + shared: Arc::clone(&shared), + })) + .is_err() + { + return; + } + + // If COMMIT or ROLLBACK is processed but not acknowledged, there would be another + // ROLLBACK sent when the `Transaction` drops. We need to ignore it otherwise we + // would rollback an already completed transaction. + let mut ignore_next_start_rollback = false; + + for (cmd, span) in command_rx { + let _guard = span.enter(); + match cmd { + Command::Prepare { query, tx } => { + tx.send(prepare(&mut conn, query)).ok(); + + // This may issue an unnecessary write on failure, + // but it doesn't matter in the grand scheme of things. + update_cached_statements_size(&conn, &shared.cached_statements_size); + } + Command::Describe { query, tx } => { + tx.send(describe(&mut conn, query)).ok(); + } + Command::Execute { + query, + arguments, + persistent, + tx, + limit, + } => { + let iter = match execute::iter(&mut conn, query, arguments, persistent) { + Ok(iter) => iter, + Err(e) => { + tx.send(Err(e)).ok(); + continue; + } + }; - // If COMMIT or ROLLBACK is processed but not acknowledged, there would be another - // ROLLBACK sent when the `Transaction` drops. We need to ignore it otherwise we - // would rollback an already completed transaction. - let mut ignore_next_start_rollback = false; - - for (cmd, span) in command_rx { - let _guard = span.enter(); - match cmd { - Command::Prepare { query, tx } => { - tx.send(prepare(&mut conn, query)).ok(); - - // This may issue an unnecessary write on failure, - // but it doesn't matter in the grand scheme of things. - update_cached_statements_size( - &conn, - &shared.cached_statements_size, - ); - } - Command::Describe { query, tx } => { - tx.send(describe(&mut conn, query)).ok(); - } - Command::Execute { - query, - arguments, - persistent, - tx, - limit - } => { - let iter = match execute::iter(&mut conn, query, arguments, persistent) - { - Ok(iter) => iter, - Err(e) => { - tx.send(Err(e)).ok(); - continue; - } - }; - - match limit { - None => { - for res in iter { - let has_error = res.is_err(); - if tx.send(res).is_err() || has_error { - break; - } + match limit { + None => { + for res in iter { + let has_error = res.is_err(); + if tx.send(res).is_err() || has_error { + break; } - }, - Some(limit) => { - let mut iter = iter; - let mut rows_returned = 0; - - while let Some(res) = iter.next() { - if let Ok(ok) = &res { - if ok.is_right() { - rows_returned += 1; - if rows_returned >= limit { - drop(iter); - let _ = tx.send(res); - break; - } + } + } + Some(limit) => { + let mut iter = iter; + let mut rows_returned = 0; + + while let Some(res) = iter.next() { + if let Ok(ok) = &res { + if ok.is_right() { + rows_returned += 1; + if rows_returned >= limit { + drop(iter); + let _ = tx.send(res); + break; } } - let has_error = res.is_err(); - if tx.send(res).is_err() || has_error { - break; - } } - }, - } - - update_cached_statements_size(&conn, &shared.cached_statements_size); - } - Command::Begin { tx, statement } => { - let depth = shared.transaction_depth.load(Ordering::Acquire); - - let is_custom_statement = statement.is_some(); - let statement = match statement { - // custom `BEGIN` statements are not allowed if - // we're already in a transaction (we need to - // issue a `SAVEPOINT` instead) - Some(_) if depth > 0 => { - if tx.blocking_send(Err(Error::InvalidSavePointStatement)).is_err() { + let has_error = res.is_err(); + if tx.send(res).is_err() || has_error { break; } - continue; - }, - Some(statement) => statement, - None => begin_ansi_transaction_sql(depth), - }; - let res = - conn.handle - .exec(statement.as_str()) - .and_then(|res| { - if is_custom_statement && !conn.handle.in_transaction() { - return Err(Error::BeginFailed) - } - - shared.transaction_depth.fetch_add(1, Ordering::Release); - - Ok(res) - }); - let res_ok = res.is_ok(); - - if tx.blocking_send(res).is_err() && res_ok { - // The BEGIN was processed but not acknowledged. This means no - // `Transaction` was created and so there is no way to commit / - // rollback this transaction. We need to roll it back - // immediately otherwise it would remain started forever. - if let Err(error) = conn - .handle - .exec(rollback_ansi_transaction_sql(depth + 1).as_str()) - .map(|_| { - shared.transaction_depth.fetch_sub(1, Ordering::Release); - }) - { - // The rollback failed. To prevent leaving the connection - // in an inconsistent state we shutdown this worker which - // causes any subsequent operation on the connection to fail. - tracing::error!(%error, "failed to rollback cancelled transaction"); - break; } } } - Command::Commit { tx } => { - let depth = shared.transaction_depth.load(Ordering::Acquire); - - let res = if depth > 0 { - conn.handle - .exec(commit_ansi_transaction_sql(depth).as_str()) - .map(|_| { - shared.transaction_depth.fetch_sub(1, Ordering::Release); - }) - } else { - Ok(()) - }; - let res_ok = res.is_ok(); - if tx.blocking_send(res).is_err() && res_ok { - // The COMMIT was processed but not acknowledged. This means that - // the `Transaction` doesn't know it was committed and will try to - // rollback on drop. We need to ignore that rollback. - ignore_next_start_rollback = true; - } - } - Command::Rollback { tx } => { - if ignore_next_start_rollback && tx.is_none() { - ignore_next_start_rollback = false; + update_cached_statements_size(&conn, &shared.cached_statements_size); + } + Command::Begin { tx, statement } => { + let depth = shared.transaction_depth.load(Ordering::Acquire); + + let is_custom_statement = statement.is_some(); + let statement = match statement { + // custom `BEGIN` statements are not allowed if + // we're already in a transaction (we need to + // issue a `SAVEPOINT` instead) + Some(_) if depth > 0 => { + if tx + .blocking_send(Err(Error::InvalidSavePointStatement)) + .is_err() + { + break; + } continue; } + Some(statement) => statement, + None => begin_ansi_transaction_sql(depth), + }; + let res = conn.handle.exec(statement.as_str()).and_then(|res| { + if is_custom_statement && !conn.handle.in_transaction() { + return Err(Error::BeginFailed); + } - let depth = shared.transaction_depth.load(Ordering::Acquire); - - let res = if depth > 0 { - conn.handle - .exec(rollback_ansi_transaction_sql(depth).as_str()) - .map(|_| { - shared.transaction_depth.fetch_sub(1, Ordering::Release); - }) - } else { - Ok(()) - }; - - let res_ok = res.is_ok(); - - if let Some(tx) = tx { - if tx.blocking_send(res).is_err() && res_ok { - // The ROLLBACK was processed but not acknowledged. This means - // that the `Transaction` doesn't know it was rolled back and - // will try to rollback again on drop. We need to ignore that - // rollback. - ignore_next_start_rollback = true; - } + shared.transaction_depth.fetch_add(1, Ordering::Release); + + Ok(res) + }); + let res_ok = res.is_ok(); + + if tx.blocking_send(res).is_err() && res_ok { + // The BEGIN was processed but not acknowledged. This means no + // `Transaction` was created and so there is no way to commit / + // rollback this transaction. We need to roll it back + // immediately otherwise it would remain started forever. + if let Err(error) = conn + .handle + .exec(rollback_ansi_transaction_sql(depth + 1).as_str()) + .map(|_| { + shared.transaction_depth.fetch_sub(1, Ordering::Release); + }) + { + // The rollback failed. To prevent leaving the connection + // in an inconsistent state we shutdown this worker which + // causes any subsequent operation on the connection to fail. + tracing::error!(%error, "failed to rollback cancelled transaction"); + break; } } - #[cfg(feature = "deserialize")] - Command::Serialize { schema, tx } => { - tx.send(serialize(&mut conn, schema)).ok(); - } - #[cfg(feature = "deserialize")] - Command::Deserialize { schema, data, read_only, tx } => { - tx.send(deserialize(&mut conn, schema, data, read_only)).ok(); - } - Command::ClearCache { tx } => { - conn.statements.clear(); - update_cached_statements_size(&conn, &shared.cached_statements_size); - tx.send(()).ok(); - } - Command::UnlockDb => { - drop(conn); - conn = futures_executor::block_on(shared.conn.lock()); + } + Command::Commit { tx } => { + let depth = shared.transaction_depth.load(Ordering::Acquire); + + let res = if depth > 0 { + conn.handle + .exec(commit_ansi_transaction_sql(depth).as_str()) + .map(|_| { + shared.transaction_depth.fetch_sub(1, Ordering::Release); + }) + } else { + Ok(()) + }; + let res_ok = res.is_ok(); + + if tx.blocking_send(res).is_err() && res_ok { + // The COMMIT was processed but not acknowledged. This means that + // the `Transaction` doesn't know it was committed and will try to + // rollback on drop. We need to ignore that rollback. + ignore_next_start_rollback = true; } - Command::Ping { tx } => { - tx.send(()).ok(); + } + Command::Rollback { tx } => { + if ignore_next_start_rollback && tx.is_none() { + ignore_next_start_rollback = false; + continue; } - Command::Shutdown { tx } => { - // drop the connection references before sending confirmation - // and ending the command loop - drop(conn); - drop(shared); - let _ = tx.send(()); - return; + + let depth = shared.transaction_depth.load(Ordering::Acquire); + + let res = if depth > 0 { + conn.handle + .exec(rollback_ansi_transaction_sql(depth).as_str()) + .map(|_| { + shared.transaction_depth.fetch_sub(1, Ordering::Release); + }) + } else { + Ok(()) + }; + + let res_ok = res.is_ok(); + + if let Some(tx) = tx { + if tx.blocking_send(res).is_err() && res_ok { + // The ROLLBACK was processed but not acknowledged. This means + // that the `Transaction` doesn't know it was rolled back and + // will try to rollback again on drop. We need to ignore that + // rollback. + ignore_next_start_rollback = true; + } } } + #[cfg(feature = "deserialize")] + Command::Serialize { schema, tx } => { + tx.send(serialize(&mut conn, schema)).ok(); + } + #[cfg(feature = "deserialize")] + Command::Deserialize { + schema, + data, + read_only, + tx, + } => { + tx.send(deserialize(&mut conn, schema, data, read_only)) + .ok(); + } + Command::ClearCache { tx } => { + conn.statements.clear(); + update_cached_statements_size(&conn, &shared.cached_statements_size); + tx.send(()).ok(); + } + Command::UnlockDb => { + drop(conn); + conn = futures_executor::block_on(shared.conn.lock()); + } + Command::Ping { tx } => { + tx.send(()).ok(); + } + Command::Shutdown { tx } => { + // drop the connection references before sending confirmation + // and ending the command loop + drop(conn); + drop(shared); + let _ = tx.send(()); + return; + } } - })?; + } + })?; establish_rx.await.map_err(|_| Error::WorkerCrashed)? }