Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 21 additions & 11 deletions src/virtual_fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ pub struct VirtualFs {
advanced_writes: bool,
inode_table: Arc<RwLock<InodeTable>>,
/// Maps file_handle → OpenFile (local fd or lazy remote reference).
open_files: RwLock<HashMap<u64, OpenFile>>,
open_files: Arc<RwLock<HashMap<u64, OpenFile>>>,
next_file_handle: AtomicU64,
uid: u32,
gid: u32,
Expand Down Expand Up @@ -150,18 +150,23 @@ impl VirtualFs {
None
};

// Create open_files before poll task so we can share with it
let open_files: Arc<RwLock<HashMap<u64, OpenFile>>> = Arc::new(RwLock::new(HashMap::new()));

// Spawn remote change polling task (if interval > 0)
let invalidator: Invalidator = Arc::new(Mutex::new(None));
let poll_handle = if config.poll_interval_secs > 0 {
let bg_hub = hub_client.clone();
let bg_inodes = inodes.clone();
let bg_open_files = open_files.clone();
let bg_neg_cache = negative_cache.clone();
let bg_invalidator = invalidator.clone();
let interval = Duration::from_secs(config.poll_interval_secs);

Some(runtime.spawn(Self::poll_remote_changes(
bg_hub,
bg_inodes,
bg_open_files,
bg_neg_cache,
bg_invalidator,
interval,
Expand All @@ -178,7 +183,7 @@ impl VirtualFs {
read_only: config.read_only,
advanced_writes: config.advanced_writes,
inode_table: inodes,
open_files: RwLock::new(HashMap::new()),
open_files,
next_file_handle: AtomicU64::new(1),
uid: config.uid,
gid: config.gid,
Expand Down Expand Up @@ -540,15 +545,7 @@ impl VirtualFs {

/// Check if any open file handle references the given inode.
fn has_open_handles(&self, ino: u64) -> bool {
self.open_files
.read()
.expect("open_files poisoned")
.values()
.any(|of| match of {
OpenFile::Local { ino: i, .. } | OpenFile::Lazy { ino: i, .. } | OpenFile::Streaming { ino: i, .. } => {
*i == ino
}
})
has_open_handles_for(&self.open_files, ino)
}

/// Get or create a per-directory lock for serializing ensure_children_loaded().
Expand Down Expand Up @@ -2868,5 +2865,18 @@ enum ReadTarget {
},
}

/// Check if any open file handle references the given inode.
fn has_open_handles_for(open_files: &RwLock<HashMap<u64, OpenFile>>, ino: u64) -> bool {
open_files
.read()
.expect("open_files poisoned")
.values()
.any(|of| match of {
OpenFile::Local { ino: i, .. } | OpenFile::Lazy { ino: i, .. } | OpenFile::Streaming { ino: i, .. } => {
*i == ino
}
})
}

#[cfg(test)]
mod tests;
42 changes: 30 additions & 12 deletions src/virtual_fs/poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ impl super::VirtualFs {
pub(super) async fn poll_remote_changes(
hub_client: Arc<dyn HubOps>,
inodes: Arc<RwLock<InodeTable>>,
open_files: Arc<RwLock<HashMap<u64, super::OpenFile>>>,
negative_cache: Arc<RwLock<HashMap<String, Instant>>>,
invalidator: Invalidator,
interval: Duration,
Expand Down Expand Up @@ -59,7 +60,14 @@ impl super::VirtualFs {
}
}
}
Self::apply_poll_diff(all_entries, &polled_prefixes, &inodes, &negative_cache, &invalidator);
Self::apply_poll_diff(
all_entries,
&polled_prefixes,
&inodes,
&open_files,
&negative_cache,
&invalidator,
);
}
}

Expand All @@ -74,6 +82,7 @@ impl super::VirtualFs {
remote_entries: Vec<crate::hub_api::TreeEntry>,
polled_prefixes: &HashSet<String>,
inodes: &Arc<RwLock<InodeTable>>,
open_files: &Arc<RwLock<HashMap<u64, super::OpenFile>>>,
negative_cache: &Arc<RwLock<HashMap<String, Instant>>>,
invalidator: &Invalidator,
) {
Expand Down Expand Up @@ -165,19 +174,28 @@ impl super::VirtualFs {
}

for ino in &deletions {
// Re-check dirty status under the write lock: a local write
// may have dirtied this inode between the read-lock snapshot
// and now. Dirty inodes must not be removed, as that would
// discard uncommitted local data (TOCTOU race).
if let Some(entry) = inode_table.get(*ino) {
if entry.is_dirty() {
continue;
}
let parent_ino = entry.parent;
inos_to_invalidate.push(parent_ino);
// Re-check under write lock: inode may have been removed or
// dirtied between the read-lock snapshot and now.
let (parent_ino, name) = match inode_table.get(*ino) {
Some(entry) if entry.is_dirty() => continue,
Some(entry) => (entry.parent, entry.name.clone()),
None => continue,
};
if super::has_open_handles_for(open_files, *ino) {
// Unlink the pathname but keep the inode as orphan (nlink=0)
// so open handles can still read/fstat. release() will clean
// up the orphan. Without this, the file stays visible by name
// and a recreated file at the same path would collide.
inode_table.unlink_one(parent_ino, &name);
info!(
"Remote deletion of ino={}: unlinked path, kept orphan (open handles)",
ino
);
} else {
inode_table.remove(*ino);
}
inos_to_invalidate.push(parent_ino);
inos_to_invalidate.push(*ino);
inode_table.remove(*ino);
}

// Phase 3: New remote entries (files AND directories) -> invalidate parent dir.
Expand Down
116 changes: 110 additions & 6 deletions src/virtual_fs/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2079,7 +2079,14 @@ fn poll_skips_unloaded_directories() {
remote.extend(hub.list_tree(prefix).await.unwrap());
}
let polled: std::collections::HashSet<String> = prefixes.into_iter().collect();
VirtualFs::apply_poll_diff(remote, &polled, &vfs.inode_table, &vfs.negative_cache, &vfs.invalidator);
VirtualFs::apply_poll_diff(
remote,
&polled,
&vfs.inode_table,
&vfs.open_files,
&vfs.negative_cache,
&vfs.invalidator,
);

// Dir "a" should still NOT have children_loaded set (unchanged).
// Root should still be loaded (not invalidated).
Expand Down Expand Up @@ -2119,7 +2126,14 @@ fn poll_invalidates_loaded_dir_with_new_file() {
remote.extend(hub.list_tree(prefix).await.unwrap());
}
let polled: std::collections::HashSet<String> = prefixes.into_iter().collect();
VirtualFs::apply_poll_diff(remote, &polled, &vfs.inode_table, &vfs.negative_cache, &vfs.invalidator);
VirtualFs::apply_poll_diff(
remote,
&polled,
&vfs.inode_table,
&vfs.open_files,
&vfs.negative_cache,
&vfs.invalidator,
);

// Root should be invalidated because it's loaded and has a new file.
assert!(
Expand Down Expand Up @@ -2151,7 +2165,14 @@ fn poll_detects_file_update_in_loaded_dir() {
remote.extend(hub.list_tree(prefix).await.unwrap());
}
let polled: std::collections::HashSet<String> = prefixes.into_iter().collect();
VirtualFs::apply_poll_diff(remote, &polled, &vfs.inode_table, &vfs.negative_cache, &vfs.invalidator);
VirtualFs::apply_poll_diff(
remote,
&polled,
&vfs.inode_table,
&vfs.open_files,
&vfs.negative_cache,
&vfs.invalidator,
);

let inodes = vfs.inode_table.read().unwrap();
let entry = inodes.get(ino).unwrap();
Expand Down Expand Up @@ -2182,14 +2203,80 @@ fn poll_detects_file_deletion_in_loaded_dir() {
remote.extend(hub.list_tree(prefix).await.unwrap());
}
let polled: std::collections::HashSet<String> = prefixes.into_iter().collect();
VirtualFs::apply_poll_diff(remote, &polled, &vfs.inode_table, &vfs.negative_cache, &vfs.invalidator);
VirtualFs::apply_poll_diff(
remote,
&polled,
&vfs.inode_table,
&vfs.open_files,
&vfs.negative_cache,
&vfs.invalidator,
);

// ephemeral.txt should be gone, keeper.txt should remain.
assert_eq!(vfs.lookup(ROOT_INODE, "ephemeral.txt").await.unwrap_err(), libc::ENOENT);
vfs.lookup(ROOT_INODE, "keeper.txt").await.unwrap();
});
}

/// Poll skips deletion of inodes with open file handles.
#[test]
fn poll_skips_deletion_with_open_handles() {
let hub = MockHub::new_repo();
hub.add_file("open.txt", 10, Some("h1"), None);
hub.add_file("closed.txt", 20, Some("h2"), None);
let xet = MockXet::new();
let (rt, vfs) = vfs_repo(&hub, &xet);

rt.block_on(async {
let open_attr = vfs.lookup(ROOT_INODE, "open.txt").await.unwrap();
let _ = vfs.lookup(ROOT_INODE, "closed.txt").await.unwrap();

// Open a handle on open.txt (lazy read handle)
let fh = vfs.open(open_attr.ino, false, false, None).await.unwrap();

// Remove both files remotely
hub.remove_file("open.txt");
hub.remove_file("closed.txt");

let prefixes = vfs.inode_table.read().unwrap().loaded_dir_prefixes();
let mut remote = Vec::new();
for prefix in &prefixes {
remote.extend(hub.list_tree(prefix).await.unwrap());
}
let polled: std::collections::HashSet<String> = prefixes.into_iter().collect();
VirtualFs::apply_poll_diff(
remote,
&polled,
&vfs.inode_table,
&vfs.open_files,
&vfs.negative_cache,
&vfs.invalidator,
);

// closed.txt should be deleted (no open handles)
assert_eq!(vfs.lookup(ROOT_INODE, "closed.txt").await.unwrap_err(), libc::ENOENT);

// open.txt: inode survives as orphan (nlink=0), but path is unlinked
let inodes = vfs.inode_table.read().unwrap();
let entry = inodes.get(open_attr.ino).expect("orphan inode should survive");
assert_eq!(entry.nlink, 0, "inode should be orphaned (nlink=0)");
assert!(
inodes.lookup_child(ROOT_INODE, "open.txt").is_none(),
"open.txt should not be visible by name"
);
drop(inodes);

// Release the handle: release() cleans up the orphan
vfs.release(fh).await.unwrap();

let inodes = vfs.inode_table.read().unwrap();
assert!(
inodes.get(open_attr.ino).is_none(),
"orphan should be removed after release"
);
});
}

/// Poll with multiple loaded directories fetches each independently.
#[test]
fn poll_multiple_loaded_dirs() {
Expand Down Expand Up @@ -2224,7 +2311,14 @@ fn poll_multiple_loaded_dirs() {
remote.extend(hub.list_tree(prefix).await.unwrap());
}
let polled: std::collections::HashSet<String> = prefixes.into_iter().collect();
VirtualFs::apply_poll_diff(remote, &polled, &vfs.inode_table, &vfs.negative_cache, &vfs.invalidator);
VirtualFs::apply_poll_diff(
remote,
&polled,
&vfs.inode_table,
&vfs.open_files,
&vfs.negative_cache,
&vfs.invalidator,
);

// sub/nested.txt should be updated.
let nested_ino = vfs.lookup(sub.ino, "nested.txt").await.unwrap().ino;
Expand Down Expand Up @@ -2257,6 +2351,7 @@ fn poll_failed_prefix_no_spurious_deletion() {
root_entries,
&polled,
&vfs.inode_table,
&vfs.open_files,
&vfs.negative_cache,
&vfs.invalidator,
);
Expand Down Expand Up @@ -2287,7 +2382,14 @@ fn poll_after_invalidation_no_spurious_deletion() {
remote.extend(hub.list_tree(prefix).await.unwrap());
}
let polled: std::collections::HashSet<String> = prefixes.into_iter().collect();
VirtualFs::apply_poll_diff(remote, &polled, &vfs.inode_table, &vfs.negative_cache, &vfs.invalidator);
VirtualFs::apply_poll_diff(
remote,
&polled,
&vfs.inode_table,
&vfs.open_files,
&vfs.negative_cache,
&vfs.invalidator,
);

// Root is now invalidated (children_loaded=false).
// Second poll: root is NOT in loaded_dir_prefixes anymore.
Expand All @@ -2301,6 +2403,7 @@ fn poll_after_invalidation_no_spurious_deletion() {
remote2,
&polled2,
&vfs.inode_table,
&vfs.open_files,
&vfs.negative_cache,
&vfs.invalidator,
);
Expand Down Expand Up @@ -2344,6 +2447,7 @@ fn poll_detects_deleted_subdirectory() {
root_entries,
&polled,
&vfs.inode_table,
&vfs.open_files,
&vfs.negative_cache,
&vfs.invalidator,
);
Expand Down