84 changes: 77 additions & 7 deletions src/core/storage.rs
@@ -1,17 +1,29 @@
use rusqlite::{params, Connection};
use sha2::{Digest, Sha256};
use std::collections::HashMap;
-use std::sync::Mutex;
+use std::sync::{Mutex, MutexGuard};

/// SQLite-backed storage for adaptive element relocation.
pub struct SqliteStorage {
conn: Mutex<Connection>,
url: String,
db_path: String,
}

impl SqliteStorage {
/// Create storage backed by SQLite file. URL is normalized to lowercase.
pub fn new(db_path: &str, url: &str) -> Result<Self, StorageError> {
let conn = Self::open(db_path)?;
Ok(Self {
conn: Mutex::new(conn),
url: url.to_lowercase(),
db_path: db_path.to_string(),
})
}

/// Open and initialize the SQLite connection at `db_path`. Used both at
/// construction and to recover after a poisoned mutex.
fn open(db_path: &str) -> Result<Connection, StorageError> {
let conn = Connection::open(db_path)?;
conn.execute_batch("PRAGMA journal_mode=WAL;")?;
conn.execute(
@@ -24,10 +36,24 @@ impl SqliteStorage {
)",
[],
)?;
-Ok(Self {
-conn: Mutex::new(conn),
-url: url.to_lowercase(),
-})
+Ok(conn)
}

/// Lock the connection. If the mutex is poisoned (a previous holder
/// panicked), recover by reopening a fresh connection — rusqlite does not
/// guarantee a `Connection` remains usable after a panic mid-operation,
/// so reopening is safer than reusing the recovered handle.
fn locked_conn(&self) -> MutexGuard<'_, Connection> {
match self.conn.lock() {
Ok(guard) => guard,
Err(poison) => {
let mut guard = poison.into_inner();
if let Ok(fresh) = Self::open(&self.db_path) {
*guard = fresh;
}
guard
}
}
Comment on lines +46 to +56
⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Silent fallback to poisoned connection undermines recovery.

Lines 51-53 silently fall back to the old (potentially corrupted) connection if Self::open() fails. This contradicts the intent stated in the comment (lines 42-45) that "reopening is safer than reusing the recovered handle" and defeats the purpose of the recovery mechanism. Per the past review and rusqlite documentation, a Connection after a panic mid-operation is not guaranteed to be usable.

If reopening fails, for example because of a filesystem error, the caller receives the old connection with no indication that recovery failed, and later operations run against a handle whose state is unknown.
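The fallback can be reproduced with the standard library alone. The sketch below is an illustration, not the PR's code: `FakeConn`, `poison_mutex`, and `generation_after_silent_fallback` are hypothetical names, and a `generation` counter stands in for "which connection is this". It shows that when the reopen step fails, the caller still gets the pre-panic handle back and has no way to tell.

```rust
use std::sync::Mutex;
use std::thread;

/// Hypothetical stand-in for a database handle (not rusqlite's `Connection`).
#[derive(Debug, PartialEq)]
pub struct FakeConn {
    pub generation: u32,
}

/// Poison `m` by panicking on a scoped thread while the guard is held.
pub fn poison_mutex(m: &Mutex<FakeConn>) {
    thread::scope(|s| {
        // Joining by hand captures the panic so `scope` itself does not panic.
        let _ = s
            .spawn(|| {
                let _guard = m.lock().unwrap();
                panic!("intentional panic while holding the guard");
            })
            .join();
    });
}

/// Mirrors the fallback under review: if `reopen` fails, the old handle is
/// kept and the caller cannot tell that recovery did not happen.
pub fn generation_after_silent_fallback(
    m: &Mutex<FakeConn>,
    reopen: impl Fn() -> Result<FakeConn, String>,
) -> u32 {
    match m.lock() {
        Ok(guard) => guard.generation,
        Err(poisoned) => {
            let mut guard = poisoned.into_inner();
            if let Ok(fresh) = reopen() {
                *guard = fresh;
            }
            // Either the fresh handle or, silently, the pre-panic one.
            guard.generation
        }
    }
}
```

With a `reopen` closure that returns `Err`, the function reports the original generation, which is the case this comment objects to.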

🔧 Proposed fix: propagate the error instead of silent fallback

Change locked_conn to return Result:

-    fn locked_conn(&self) -> MutexGuard<'_, Connection> {
+    fn locked_conn(&self) -> Result<MutexGuard<'_, Connection>, StorageError> {
         match self.conn.lock() {
-            Ok(guard) => guard,
+            Ok(guard) => Ok(guard),
             Err(poison) => {
                 let mut guard = poison.into_inner();
-                if let Ok(fresh) = Self::open(&self.db_path) {
-                    *guard = fresh;
-                }
-                guard
+                let fresh = Self::open(&self.db_path)?;
+                *guard = fresh;
+                Ok(guard)
             }
         }
     }

Update call sites in save and retrieve:

     pub fn save(...) -> Result<(), StorageError> {
         let hash = Self::get_hash(identifier);
         let json = serde_json::to_string(data)?;
-        let conn = self.locked_conn();
+        let conn = self.locked_conn()?;
         conn.execute(...)?;
         Ok(())
     }
     pub fn retrieve(...) -> Result<Option<HashMap<String, serde_json::Value>>, StorageError> {
         let hash = Self::get_hash(identifier);
-        let conn = self.locked_conn();
+        let conn = self.locked_conn()?;
         let mut stmt = conn.prepare(...)?;
         ...
     }
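The shape of the proposed fix can be checked in isolation. This is a minimal std-only sketch under stated assumptions: `Store`, `ReopenError`, `current_handle`, and `poison_for_demo` are hypothetical stand-ins for `SqliteStorage`, `StorageError`, and the `save`/`retrieve` call sites, and a `u32` plays the role of the connection.

```rust
use std::fmt;
use std::sync::{Mutex, MutexGuard};
use std::thread;

/// Hypothetical error type standing in for `StorageError`.
#[derive(Debug, PartialEq)]
pub struct ReopenError;

impl fmt::Display for ReopenError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "could not reopen the handle")
    }
}

impl std::error::Error for ReopenError {}

/// Hypothetical store; the `u32` identifies which handle is in use.
pub struct Store {
    handle: Mutex<u32>,
    reopen_ok: bool,
}

impl Store {
    pub fn new(reopen_ok: bool) -> Self {
        Self { handle: Mutex::new(1), reopen_ok }
    }

    /// Stand-in for `Self::open`: yields handle 2 or fails.
    fn reopen(&self) -> Result<u32, ReopenError> {
        if self.reopen_ok { Ok(2) } else { Err(ReopenError) }
    }

    /// Lock the handle; on poisoning, replace it or propagate the failure.
    fn locked(&self) -> Result<MutexGuard<'_, u32>, ReopenError> {
        match self.handle.lock() {
            Ok(guard) => Ok(guard),
            Err(poisoned) => {
                let mut guard = poisoned.into_inner();
                *guard = self.reopen()?;
                Ok(guard)
            }
        }
    }

    /// Call site in the style of `save`/`retrieve`: `?` surfaces the error.
    pub fn current_handle(&self) -> Result<u32, ReopenError> {
        let guard = self.locked()?;
        Ok(*guard)
    }

    /// Demo helper: panic on a scoped thread while holding the guard.
    pub fn poison_for_demo(&self) {
        thread::scope(|s| {
            let _ = s
                .spawn(|| {
                    let _guard = self.handle.lock().unwrap();
                    panic!("intentional panic while holding the guard");
                })
                .join();
        });
    }
}
```

A store built with `Store::new(false)` returns `Err(ReopenError)` from `current_handle` once poisoned, so the failed recovery is visible to the caller.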
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/core/storage.rs` around lines 46 - 56, the current locked_conn silently
returns the poisoned Connection when Self::open(&self.db_path) fails, which
reuses a handle that may be in an unknown state. Change locked_conn to return
Result<MutexGuard<'_, Connection>, StorageError> and propagate the error instead
of falling back: in the Err(poison) branch, call Self::open(&self.db_path); on
success replace the inner Connection, and on failure return the error. Update
the callers (save and retrieve, which already return Result<_, StorageError>) to
use `?` so that a failed reopen is surfaced rather than hidden.

}

/// Save element data. Uses INSERT OR REPLACE for upsert.
@@ -38,7 +64,7 @@ impl SqliteStorage {
) -> Result<(), StorageError> {
let hash = Self::get_hash(identifier);
let json = serde_json::to_string(data)?;
-let conn = self.conn.lock().unwrap();
+let conn = self.locked_conn();
conn.execute(
"INSERT OR REPLACE INTO storage (url, identifier, element_data) VALUES (?1, ?2, ?3)",
params![self.url, hash, json],
@@ -52,7 +78,7 @@ impl SqliteStorage {
identifier: &str,
) -> Result<Option<HashMap<String, serde_json::Value>>, StorageError> {
let hash = Self::get_hash(identifier);
-let conn = self.conn.lock().unwrap();
+let conn = self.locked_conn();
let mut stmt =
conn.prepare("SELECT element_data FROM storage WHERE url = ?1 AND identifier = ?2")?;
let result: Option<String> = stmt
@@ -81,3 +107,47 @@ pub enum StorageError {
#[error("JSON error: {0}")]
Json(#[from] serde_json::Error),
}

#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use std::thread;
use tempfile::tempdir;

#[test]
fn storage_recovers_after_mutex_poisoning() {
let dir = tempdir().unwrap();
let db = dir.path().join("storage.db");
let storage =
Arc::new(SqliteStorage::new(db.to_str().unwrap(), "https://example.com").unwrap());

// Poison the mutex by panicking *while* holding the guard.
let s = storage.clone();
let _ = thread::spawn(move || {
let _guard = s.conn.lock().unwrap();
panic!("intentional poisoning while holding the guard");
})
.join();

// Confirm the recovery path is actually exercised.
assert!(
storage.conn.is_poisoned(),
"mutex should be poisoned after the thread panicked while holding the guard"
);

let mut data = HashMap::new();
data.insert("k".to_string(), serde_json::Value::String("v".to_string()));
storage
.save("after-poison", &data)
.expect("save must recover from poisoned mutex");
let got = storage
.retrieve("after-poison")
.expect("retrieve must recover from poisoned mutex")
.expect("the row written after recovery should be present");
assert_eq!(
got.get("k").cloned(),
Some(serde_json::Value::String("v".to_string()))
);
}
}
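One detail the test above relies on: `PoisonError::into_inner` hands back the guard but does not clear the mutex's poison flag, so every later `lock()` returns `Err` again and the recovery branch runs on each call. The std-only sketch below illustrates this with a hypothetical `Recovering` type that counts how often the branch is taken; it is an assumption-labelled model, not the storage code itself.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::thread;

/// Hypothetical stand-in; counts how often the recovery branch runs.
pub struct Recovering {
    handle: Mutex<u32>,
    reopens: AtomicUsize,
}

impl Recovering {
    pub fn new() -> Self {
        Self { handle: Mutex::new(0), reopens: AtomicUsize::new(0) }
    }

    /// Lock, and on poisoning replace the handle as a recovery branch would.
    pub fn with_handle<R>(&self, f: impl FnOnce(&mut u32) -> R) -> R {
        let mut guard = match self.handle.lock() {
            Ok(guard) => guard,
            Err(poisoned) => {
                let mut guard = poisoned.into_inner();
                *guard += 1; // pretend a fresh handle was opened
                self.reopens.fetch_add(1, Ordering::SeqCst);
                guard
            }
        };
        f(&mut guard)
    }

    pub fn reopen_count(&self) -> usize {
        self.reopens.load(Ordering::SeqCst)
    }

    pub fn is_poisoned(&self) -> bool {
        self.handle.is_poisoned()
    }

    /// Demo helper: panic on a scoped thread while holding the guard.
    pub fn poison_for_demo(&self) {
        thread::scope(|s| {
            let _ = s
                .spawn(|| {
                    let _guard = self.handle.lock().unwrap();
                    panic!("intentional panic while holding the guard");
                })
                .join();
        });
    }
}
```

Two calls to `with_handle` after poisoning leave `reopen_count()` at 2 and `is_poisoned()` still true. If reopening once is the intent, `Mutex::clear_poison` (available in recent Rust releases) could be called after a successful replacement; whether that is appropriate here is a design decision for the PR author.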