From 36504e257a3b86e124c3219dd3040d86dcb066b2 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 28 May 2026 22:07:15 +0000 Subject: [PATCH 1/2] fix: recover from poisoned mutex in SqliteStorage A panic in one thread while holding the storage mutex used to poison the lock for every other thread, taking down the whole crawler. Use unwrap_or_else(PoisonError::into_inner) so the connection stays usable after such a panic. Closes #11 https://claude.ai/code/session_012RmdaovmNWZVAim4XxCWwn --- src/core/storage.rs | 4 ++-- tests/core_storage.rs | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/src/core/storage.rs b/src/core/storage.rs index cf51381..eee0119 100644 --- a/src/core/storage.rs +++ b/src/core/storage.rs @@ -38,7 +38,7 @@ impl SqliteStorage { ) -> Result<(), StorageError> { let hash = Self::get_hash(identifier); let json = serde_json::to_string(data)?; - let conn = self.conn.lock().unwrap(); + let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner()); conn.execute( "INSERT OR REPLACE INTO storage (url, identifier, element_data) VALUES (?1, ?2, ?3)", params![self.url, hash, json], @@ -52,7 +52,7 @@ impl SqliteStorage { identifier: &str, ) -> Result>, StorageError> { let hash = Self::get_hash(identifier); - let conn = self.conn.lock().unwrap(); + let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner()); let mut stmt = conn.prepare("SELECT element_data FROM storage WHERE url = ?1 AND identifier = ?2")?; let result: Option = stmt diff --git a/tests/core_storage.rs b/tests/core_storage.rs index a5ed969..64e30d5 100644 --- a/tests/core_storage.rs +++ b/tests/core_storage.rs @@ -102,3 +102,38 @@ fn test_different_identifiers_dont_collide() { Some(&serde_json::Value::String("second".to_string())) ); } + +#[test] +fn test_save_works_after_mutex_poisoning() { + // Trigger a panic in a thread while it holds the storage mutex, + // then verify that subsequent calls still succeed instead of panicking + // on the poisoned lock. + use std::sync::Arc; + use std::thread; + + let dir = tempdir().unwrap(); + let db = dir.path().join("storage.db"); + let storage = + Arc::new(SqliteStorage::new(db.to_str().unwrap(), "https://example.com").unwrap()); + + // Poison the mutex by panicking from a thread that uses it. + let s = storage.clone(); + let _ = thread::spawn(move || { + let _ignored = s.save("poison", &make_data("k", "v")); + panic!("intentional poisoning"); + }) + .join(); + + // The mutex is now poisoned. Storage must still be usable. + let data = make_data("after", "ok"); + storage + .save("after-poison", &data) + .expect("save should recover from poisoned mutex"); + let got = storage + .retrieve("after-poison") + .expect("retrieve should recover from poisoned mutex"); + assert_eq!( + got.unwrap().get("after").cloned(), + Some(serde_json::Value::String("ok".to_string())) + ); +} From f2283586aebeb6d7f0c3f6be2dcc8caee4f0721a Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 28 May 2026 22:23:40 +0000 Subject: [PATCH 2/2] fix: reopen rusqlite connection on poisoned mutex, fix test that didn't poison Address CodeRabbit review on #23: - The previous test held the guard only during s.save() and panicked after the guard was dropped, so the mutex was never poisoned and the recovery path was never exercised. Replace with a unit test in the same module that panics while the guard is still alive and asserts is_poisoned(). - Reusing a rusqlite Connection after a panic mid-operation is not documented as safe. Instead of into_inner()ing the recovered handle, reopen a fresh Connection inside the recovered guard. https://claude.ai/code/session_012RmdaovmNWZVAim4XxCWwn --- src/core/storage.rs | 84 +++++++++++++++++++++++++++++++++++++++---- tests/core_storage.rs | 35 ------------------ 2 files changed, 77 insertions(+), 42 deletions(-) diff --git a/src/core/storage.rs b/src/core/storage.rs index eee0119..402b53d 100644 --- a/src/core/storage.rs +++ b/src/core/storage.rs @@ -1,17 +1,29 @@ use rusqlite::{params, Connection}; use sha2::{Digest, Sha256}; use std::collections::HashMap; -use std::sync::Mutex; +use std::sync::{Mutex, MutexGuard}; /// SQLite-backed storage for adaptive element relocation. pub struct SqliteStorage { conn: Mutex, url: String, + db_path: String, } impl SqliteStorage { /// Create storage backed by SQLite file. URL is normalized to lowercase. pub fn new(db_path: &str, url: &str) -> Result { + let conn = Self::open(db_path)?; + Ok(Self { + conn: Mutex::new(conn), + url: url.to_lowercase(), + db_path: db_path.to_string(), + }) + } + + /// Open and initialize the SQLite connection at `db_path`. Used both at + /// construction and to recover after a poisoned mutex. + fn open(db_path: &str) -> Result { let conn = Connection::open(db_path)?; conn.execute_batch("PRAGMA journal_mode=WAL;")?; conn.execute( @@ -24,10 +36,24 @@ impl SqliteStorage { )", [], )?; - Ok(Self { - conn: Mutex::new(conn), - url: url.to_lowercase(), - }) + Ok(conn) + } + + /// Lock the connection. If the mutex is poisoned (a previous holder + /// panicked), recover by reopening a fresh connection — rusqlite does not + /// guarantee a `Connection` remains usable after a panic mid-operation, + /// so reopening is safer than reusing the recovered handle. + fn locked_conn(&self) -> MutexGuard<'_, Connection> { + match self.conn.lock() { + Ok(guard) => guard, + Err(poison) => { + let mut guard = poison.into_inner(); + if let Ok(fresh) = Self::open(&self.db_path) { + *guard = fresh; + } + guard + } + } } /// Save element data. Uses INSERT OR REPLACE for upsert. @@ -38,7 +64,7 @@ impl SqliteStorage { ) -> Result<(), StorageError> { let hash = Self::get_hash(identifier); let json = serde_json::to_string(data)?; - let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner()); + let conn = self.locked_conn(); conn.execute( "INSERT OR REPLACE INTO storage (url, identifier, element_data) VALUES (?1, ?2, ?3)", params![self.url, hash, json], @@ -52,7 +78,7 @@ impl SqliteStorage { identifier: &str, ) -> Result>, StorageError> { let hash = Self::get_hash(identifier); - let conn = self.conn.lock().unwrap_or_else(|e| e.into_inner()); + let conn = self.locked_conn(); let mut stmt = conn.prepare("SELECT element_data FROM storage WHERE url = ?1 AND identifier = ?2")?; let result: Option = stmt @@ -81,3 +107,47 @@ pub enum StorageError { #[error("JSON error: {0}")] Json(#[from] serde_json::Error), } + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Arc; + use std::thread; + use tempfile::tempdir; + + #[test] + fn storage_recovers_after_mutex_poisoning() { + let dir = tempdir().unwrap(); + let db = dir.path().join("storage.db"); + let storage = + Arc::new(SqliteStorage::new(db.to_str().unwrap(), "https://example.com").unwrap()); + + // Poison the mutex by panicking *while* holding the guard. + let s = storage.clone(); + let _ = thread::spawn(move || { + let _guard = s.conn.lock().unwrap(); + panic!("intentional poisoning while holding the guard"); + }) + .join(); + + // Confirm the recovery path is actually exercised. + assert!( + storage.conn.is_poisoned(), + "mutex should be poisoned after the thread panicked while holding the guard" + ); + + let mut data = HashMap::new(); + data.insert("k".to_string(), serde_json::Value::String("v".to_string())); + storage + .save("after-poison", &data) + .expect("save must recover from poisoned mutex"); + let got = storage + .retrieve("after-poison") + .expect("retrieve must recover from poisoned mutex") + .expect("the row written after recovery should be present"); + assert_eq!( + got.get("k").cloned(), + Some(serde_json::Value::String("v".to_string())) + ); + } +} diff --git a/tests/core_storage.rs b/tests/core_storage.rs index 64e30d5..a5ed969 100644 --- a/tests/core_storage.rs +++ b/tests/core_storage.rs @@ -102,38 +102,3 @@ fn test_different_identifiers_dont_collide() { Some(&serde_json::Value::String("second".to_string())) ); } - -#[test] -fn test_save_works_after_mutex_poisoning() { - // Trigger a panic in a thread while it holds the storage mutex, - // then verify that subsequent calls still succeed instead of panicking - // on the poisoned lock. - use std::sync::Arc; - use std::thread; - - let dir = tempdir().unwrap(); - let db = dir.path().join("storage.db"); - let storage = - Arc::new(SqliteStorage::new(db.to_str().unwrap(), "https://example.com").unwrap()); - - // Poison the mutex by panicking from a thread that uses it. - let s = storage.clone(); - let _ = thread::spawn(move || { - let _ignored = s.save("poison", &make_data("k", "v")); - panic!("intentional poisoning"); - }) - .join(); - - // The mutex is now poisoned. Storage must still be usable. - let data = make_data("after", "ok"); - storage - .save("after-poison", &data) - .expect("save should recover from poisoned mutex"); - let got = storage - .retrieve("after-poison") - .expect("retrieve should recover from poisoned mutex"); - assert_eq!( - got.unwrap().get("after").cloned(), - Some(serde_json::Value::String("ok".to_string())) - ); -}