Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 94 additions & 23 deletions src/branch.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::fs::{self, File};
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
Expand All @@ -18,10 +18,19 @@ pub struct Branch {
pub files_dir: PathBuf,
pub tombstones_file: PathBuf,
tombstones: RwLock<HashSet<String>>,
/// How many children have been committed (merged) into this branch.
pub commit_count: AtomicU64,
/// Parent's commit_count at the time this branch was forked.
pub parent_version_at_fork: u64,
}

impl Branch {
pub fn new(name: &str, parent: Option<&str>, storage_path: &Path) -> Result<Self> {
pub fn new(
name: &str,
parent: Option<&str>,
storage_path: &Path,
parent_version_at_fork: u64,
) -> Result<Self> {
let branch_dir = storage_path.join("branches").join(name);
let files_dir = branch_dir.join("files");
let tombstones_file = branch_dir.join("tombstones");
Expand All @@ -39,6 +48,8 @@ impl Branch {
files_dir,
tombstones_file,
tombstones: RwLock::new(tombstones),
commit_count: AtomicU64::new(0),
parent_version_at_fork,
})
}

Expand Down Expand Up @@ -129,23 +140,25 @@ pub struct BranchManager {
pub storage_path: PathBuf,
pub base_path: PathBuf,
pub workspace_path: PathBuf,
branches: RwLock<std::collections::HashMap<String, Branch>>,
branches: RwLock<HashMap<String, Branch>>,
pub epoch: AtomicU64,
/// Notifiers for invalidating kernel cache on commit/abort
/// Maps (branch_name, mountpoint) -> Notifier
notifiers: Mutex<std::collections::HashMap<(String, PathBuf), Arc<Notifier>>>,
notifiers: Mutex<HashMap<(String, PathBuf), Arc<Notifier>>>,
/// Track opened file inodes per branch for cache invalidation
/// Maps branch_name -> Set of inodes
opened_inodes: Mutex<std::collections::HashMap<String, HashSet<u64>>>,
opened_inodes: Mutex<HashMap<String, HashSet<u64>>>,
/// Current branch per mount — single source of truth
mount_branches: RwLock<HashMap<PathBuf, String>>,
}

impl BranchManager {
pub fn new(storage_path: PathBuf, base_path: PathBuf, workspace_path: PathBuf) -> Result<Self> {
fs::create_dir_all(&storage_path)?;

// Always start fresh with just the "main" branch
let mut branches = std::collections::HashMap::new();
let main_branch = Branch::new("main", None, &storage_path)?;
let mut branches = HashMap::new();
let main_branch = Branch::new("main", None, &storage_path, 0)?;
branches.insert("main".to_string(), main_branch);

Ok(Self {
Expand All @@ -154,11 +167,56 @@ impl BranchManager {
workspace_path,
branches: RwLock::new(branches),
epoch: AtomicU64::new(0),
notifiers: Mutex::new(std::collections::HashMap::new()),
opened_inodes: Mutex::new(std::collections::HashMap::new()),
notifiers: Mutex::new(HashMap::new()),
opened_inodes: Mutex::new(HashMap::new()),
mount_branches: RwLock::new(HashMap::new()),
})
}

/// Register a mount's initial branch (called before FUSE spawn).
pub fn set_mount_branch(&self, mountpoint: &Path, branch: &str) {
self.mount_branches
.write()
.insert(mountpoint.to_path_buf(), branch.to_string());
}

/// Read the current branch for a mount.
pub fn get_mount_branch(&self, mountpoint: &Path) -> Option<String> {
self.mount_branches.read().get(mountpoint).cloned()
}

/// Atomically switch a mount's branch, re-keying the notifier map.
pub fn switch_mount_branch(&self, mountpoint: &Path, new_branch: &str) {
// Lock order: mount_branches → notifiers (never reversed)
let mut mb = self.mount_branches.write();
let old_branch = mb.insert(mountpoint.to_path_buf(), new_branch.to_string());

// Re-key notifier from (old, mount) → (new, mount)
if let Some(old) = old_branch {
let mut notifiers = self.notifiers.lock();
if let Some(notifier) = notifiers.remove(&(old.clone(), mountpoint.to_path_buf())) {
notifiers.insert((new_branch.to_string(), mountpoint.to_path_buf()), notifier);
}
log::info!(
"switch_mount_branch: {:?} '{}' -> '{}'",
mountpoint,
old,
new_branch
);
}
}

/// Remove a mount's branch tracking and notifier (used on unmount).
pub fn unregister_mount(&self, mountpoint: &Path) {
let mut mb = self.mount_branches.write();
let old_branch = mb.remove(mountpoint);
if let Some(old) = old_branch {
self.notifiers
.lock()
.remove(&(old, mountpoint.to_path_buf()));
}
}

pub fn create_branch(&self, name: &str, parent: &str) -> Result<()> {
let start = Instant::now();
validate_branch_name(name)?;
Expand All @@ -169,11 +227,12 @@ impl BranchManager {
return Err(BranchError::AlreadyExists(name.to_string()));
}

if !branches.contains_key(parent) {
return Err(BranchError::ParentNotFound(parent.to_string()));
}
let parent_branch = branches
.get(parent)
.ok_or_else(|| BranchError::ParentNotFound(parent.to_string()))?;
let parent_version = parent_branch.commit_count.load(Ordering::SeqCst);

let branch = Branch::new(name, Some(parent), &self.storage_path)?;
let branch = Branch::new(name, Some(parent), &self.storage_path, parent_version)?;
branches.insert(name.to_string(), branch);

let elapsed = start.elapsed();
Expand Down Expand Up @@ -336,16 +395,6 @@ impl BranchManager {
}
}

/// Return the names of branches whose parent is `parent_name`.
pub fn get_children(&self, parent_name: &str) -> Vec<String> {
self.branches
.read()
.values()
.filter(|b| b.parent.as_deref() == Some(parent_name))
.map(|b| b.name.clone())
.collect()
}

pub fn resolve_path(&self, branch_name: &str, rel_path: &str) -> Result<Option<PathBuf>> {
let branches = self.branches.read();

Expand Down Expand Up @@ -405,6 +454,20 @@ impl BranchManager {
.clone()
.ok_or_else(|| BranchError::NotFound(branch_name.to_string()))?;

let child_version_at_fork = branch.parent_version_at_fork;

// First-wins conflict detection: check that the parent hasn't had
// another sibling committed since this branch was forked.
{
let parent = branches
.get(&parent_name)
.ok_or_else(|| BranchError::NotFound(parent_name.to_string()))?;
let current_parent_version = parent.commit_count.load(Ordering::SeqCst);
if current_parent_version != child_version_at_fork {
return Err(BranchError::Conflict(branch_name.to_string()));
}
}

let child_tombstones = branch.get_tombstones();
let child_files_dir = branch.files_dir.clone();

Expand Down Expand Up @@ -437,6 +500,11 @@ impl BranchManager {
num_files += 1;
})?;

// Increment parent's commit_count (first-wins bookkeeping)
if let Some(main_branch) = branches.get("main") {
main_branch.commit_count.fetch_add(1, Ordering::SeqCst);
}

// Remove branch
branches.remove(branch_name);
let branch_dir = self.storage_path.join("branches").join(branch_name);
Expand Down Expand Up @@ -501,6 +569,9 @@ impl BranchManager {
// Write updated tombstones to parent
parent.set_tombstones(parent_tombstones)?;

// Increment parent's commit_count (first-wins bookkeeping)
parent.commit_count.fetch_add(1, Ordering::SeqCst);

// Remove child branch
branches.remove(branch_name);
let branch_dir = self.storage_path.join("branches").join(branch_name);
Expand Down
73 changes: 23 additions & 50 deletions src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ pub enum Request {
name: String,
parent: String,
},
NotifySwitch {
mountpoint: String,
branch: String,
},
GetMountBranch {
mountpoint: String,
},
Expand Down Expand Up @@ -78,10 +74,10 @@ impl Response {
}
}

/// Per-mount state including the FUSE session and current branch
/// Per-mount state (FUSE session handle — kept alive until unmount)
pub struct MountInfo {
#[allow(dead_code)]
session: BackgroundSession,
current_branch: String,
}

pub struct Daemon {
Expand Down Expand Up @@ -145,7 +141,10 @@ impl Daemon {
mountpoint: &Path,
passthrough: bool,
) -> Result<()> {
let fs = BranchFs::new(self.manager.clone(), branch_name.to_string(), passthrough);
// Register the mount branch *before* creating BranchFs so get_branch_name() works
self.manager.set_mount_branch(mountpoint, branch_name);

let fs = BranchFs::new(self.manager.clone(), mountpoint.to_path_buf(), passthrough);
let options = vec![
MountOption::FSName("branchfs".to_string()),
MountOption::DefaultPermissions,
Expand All @@ -157,18 +156,21 @@ impl Daemon {
mountpoint,
);

let session =
fuser::spawn_mount2(fs, mountpoint, &options).map_err(crate::error::BranchError::Io)?;
let session = match fuser::spawn_mount2(fs, mountpoint, &options) {
Ok(s) => s,
Err(e) => {
// Clean up on failure
self.manager.unregister_mount(mountpoint);
return Err(crate::error::BranchError::Io(e));
}
};

// Get the notifier for cache invalidation and register it with the manager
let notifier = Arc::new(session.notifier());
self.manager
.register_notifier(branch_name, mountpoint.to_path_buf(), notifier);

let mount_info = MountInfo {
session,
current_branch: branch_name.to_string(),
};
let mount_info = MountInfo { session };

self.mounts
.lock()
Expand All @@ -178,12 +180,12 @@ impl Daemon {
}

pub fn unmount(&self, mountpoint: &Path) -> Result<()> {
let (should_shutdown, mount_info) = {
let should_shutdown = {
let mut mounts = self.mounts.lock();
if let Some(info) = mounts.remove(mountpoint) {
if mounts.remove(mountpoint).is_some() {
log::info!("Unmounted {:?}", mountpoint);
// The BackgroundSession drop will handle FUSE cleanup
(mounts.is_empty(), Some(info))
mounts.is_empty()
} else {
return Err(crate::error::BranchError::MountNotFound(format!(
"{:?}",
Expand All @@ -192,10 +194,7 @@ impl Daemon {
}
};

if let Some(info) = mount_info {
self.manager
.unregister_notifier(&info.current_branch, mountpoint);
}
self.manager.unregister_mount(mountpoint);

if should_shutdown {
log::info!("All mounts removed, daemon will exit");
Expand All @@ -209,9 +208,8 @@ impl Daemon {
let mut mounts = self.mounts.lock();
let mountpoints: Vec<PathBuf> = mounts.keys().cloned().collect();
for mountpoint in &mountpoints {
if let Some(info) = mounts.remove(mountpoint) {
self.manager
.unregister_notifier(&info.current_branch, mountpoint);
if mounts.remove(mountpoint).is_some() {
self.manager.unregister_mount(mountpoint);
// BackgroundSession dropped here → FUSE unmount
log::info!("Cleaned up mount at {:?}", mountpoint);
}
Expand Down Expand Up @@ -329,35 +327,10 @@ impl Daemon {
Ok(()) => Response::success(),
Err(e) => Response::error(&format!("{}", e)),
},
Request::NotifySwitch { mountpoint, branch } => {
let path = PathBuf::from(&mountpoint);
let mut mounts = self.mounts.lock();
if let Some(ref mut info) = mounts.get_mut(&path) {
// Unregister old notifier
self.manager
.unregister_notifier(&info.current_branch, &path);
// Update tracked branch
let old_branch = std::mem::replace(&mut info.current_branch, branch.clone());
// Register notifier for new branch
let notifier = Arc::new(info.session.notifier());
self.manager
.register_notifier(&branch, path.clone(), notifier);
log::info!(
"Mount {:?} switched from '{}' to '{}'",
path,
old_branch,
branch
);
Response::success()
} else {
Response::error(&format!("Mount not found: {:?}", path))
}
}
Request::GetMountBranch { mountpoint } => {
let path = PathBuf::from(&mountpoint);
let mounts = self.mounts.lock();
if let Some(info) = mounts.get(&path) {
Response::success_with_data(serde_json::json!(info.current_branch))
if let Some(branch) = self.manager.get_mount_branch(&path) {
Response::success_with_data(serde_json::json!(branch))
} else {
Response::error(&format!("Mount not found: {:?}", path))
}
Expand Down
3 changes: 3 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ pub enum BranchError {
#[error("cannot commit/abort non-leaf branch '{0}'")]
NotALeaf(String),

#[error("commit conflict: branch '{0}' is stale (another sibling already committed)")]
Conflict(String),

#[error("io error: {0}")]
Io(#[from] std::io::Error),

Expand Down
Loading