Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -215,3 +215,59 @@ jobs:
gh pr comment "$PR" --body "$BODY"
echo "Created new comment"
fi

# Data-integrity job: builds the release binaries, then runs the `fsx` integration test.
# Waits for the `lint-test` job and runs on the `hf-mount-ci` runner group.
fsx:
name: Data Integrity (fsx)
runs-on:
group: hf-mount-ci
needs: lint-test
# Token and endpoint target the hub-ci.huggingface.co instance.
env:
HF_TOKEN: ${{ secrets.HF_TOKEN_HUB_CI }}
HF_ENDPOINT: https://hub-ci.huggingface.co
steps:
- uses: actions/checkout@v4

- uses: dtolnay/rust-toolchain@stable

- uses: Swatinem/rust-cache@v2

# Installs FUSE 3 (runtime + dev headers) and appends `user_allow_other` to /etc/fuse.conf,
# which lets non-root users pass the `allow_other` mount option.
- name: Install system deps
run: |
sudo apt-get update
sudo apt-get install -y fuse3 libfuse3-dev
echo 'user_allow_other' | sudo tee -a /etc/fuse.conf

- name: Build release binaries
run: cargo build --release

# The step is cancelled if it runs longer than 10 minutes.
- name: Run fsx (50k random ops)
timeout-minutes: 10
run: cargo test --release --test fsx -- --nocapture

# xfstests job: builds the release binaries, then runs the `xfstests` integration test.
# Waits for the `lint-test` job and runs on the `hf-mount-ci` runner group.
xfstests:
name: xfstests generic
runs-on:
group: hf-mount-ci
needs: lint-test
# Token and endpoint target the hub-ci.huggingface.co instance.
env:
HF_TOKEN: ${{ secrets.HF_TOKEN_HUB_CI }}
HF_ENDPOINT: https://hub-ci.huggingface.co
steps:
- uses: actions/checkout@v4

- uses: dtolnay/rust-toolchain@stable

- uses: Swatinem/rust-cache@v2

# Installs FUSE 3 plus additional build packages (autotools, libaio, ACL, UUID and XFS
# development files — presumably what building xfstests needs; confirm against the test harness),
# and appends `user_allow_other` to /etc/fuse.conf so non-root users can pass `allow_other`.
- name: Install system deps
run: |
sudo apt-get update
sudo apt-get install -y fuse3 libfuse3-dev libtool autoconf automake libaio-dev libacl1-dev uuid-dev xfsprogs xfslibs-dev attr acl bc
echo 'user_allow_other' | sudo tee -a /etc/fuse.conf

- name: Build release binaries
run: cargo build --release

# The step is cancelled if it runs longer than 15 minutes.
- name: Run xfstests generic/quick
timeout-minutes: 15
run: cargo test --release --test xfstests -- --nocapture
10 changes: 5 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ edition = "2024"

[dependencies]
# xet-core crates
xet-client = { git = "https://github.com/huggingface/xet-core.git", rev = "0d7808b" }
xet-core-structures = { git = "https://github.com/huggingface/xet-core.git", rev = "0d7808b" }
xet-data = { git = "https://github.com/huggingface/xet-core.git", rev = "0d7808b" }
xet-runtime = { git = "https://github.com/huggingface/xet-core.git", rev = "0d7808b" }
xet-client = { git = "https://github.com/huggingface/xet-core.git", branch = "adrien/combined-hf-mount" }
xet-core-structures = { git = "https://github.com/huggingface/xet-core.git", branch = "adrien/combined-hf-mount" }
xet-data = { git = "https://github.com/huggingface/xet-core.git", branch = "adrien/combined-hf-mount" }
xet-runtime = { git = "https://github.com/huggingface/xet-core.git", branch = "adrien/combined-hf-mount" }

# External crates
async-trait = "0.1"
Expand Down
38 changes: 23 additions & 15 deletions src/cached_xet_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use xet_client::ClientError;
use xet_client::cas_client::adaptive_concurrency::ConnectionPermit;
use xet_client::cas_client::{Client, ProgressCallback, URLProvider};
use xet_client::cas_types::{
BatchQueryReconstructionResponse, FileRange, HexMerkleHash, QueryReconstructionResponse, XorbReconstructionTerm,
BatchQueryReconstructionResponse, FileRange, HexMerkleHash, QueryReconstructionResponseV2, XorbReconstructionTerm,
};
use xet_core_structures::merklehash::MerkleHash;
use xet_core_structures::metadata_shard::file_structs::MDBFileInfo;
Expand All @@ -24,12 +24,12 @@ const MAX_CACHE_ENTRIES: usize = 4096;
const CACHE_TTL: Duration = Duration::from_secs(59 * 60);

struct CacheEntry {
response: QueryReconstructionResponse,
response: QueryReconstructionResponseV2,
inserted_at: Instant,
}

impl CacheEntry {
fn new(response: QueryReconstructionResponse) -> Self {
fn new(response: QueryReconstructionResponseV2) -> Self {
Self {
response,
inserted_at: Instant::now(),
Expand Down Expand Up @@ -69,7 +69,7 @@ impl CachedXetClient {
}
}

/// Derive a range-scoped `QueryReconstructionResponse` from a cached full-file response.
/// Derive a range-scoped `QueryReconstructionResponseV2` from a cached full-file response.
///
/// The full-file response lists all terms in file order with their unpacked byte lengths.
/// We walk the terms, track cumulative byte offsets, and keep only terms that overlap
Expand All @@ -94,7 +94,7 @@ impl CachedXetClient {
/// TODO: fix P2 — add `chunk_uncompressed_sizes: Vec<u32>` (and `chunk_compressed_sizes`)
/// to `XorbReconstructionTerm` in xet-core/xetcas so that chunk-level trimming can be
/// replicated client-side, reducing over-fetch to zero.
fn derive_range_response(full: &QueryReconstructionResponse, range: FileRange) -> QueryReconstructionResponse {
fn derive_range_response(full: &QueryReconstructionResponseV2, range: FileRange) -> QueryReconstructionResponseV2 {
let mut cur_offset: u64 = 0;
let mut result_terms: Vec<XorbReconstructionTerm> = Vec::new();
let mut offset_into_first: u64 = 0;
Expand All @@ -120,17 +120,17 @@ fn derive_range_response(full: &QueryReconstructionResponse, range: FileRange) -
cur_offset = term_end;
}

let fetch_info = full
.fetch_info
let xorbs = full
.xorbs
.iter()
.filter(|(k, _)| needed_hashes.contains(*k))
.map(|(k, v)| (*k, v.clone()))
.collect();

QueryReconstructionResponse {
QueryReconstructionResponseV2 {
offset_into_first_range: offset_into_first,
terms: result_terms,
fetch_info,
xorbs,
}
}

Expand All @@ -140,7 +140,7 @@ impl Client for CachedXetClient {
&self,
file_id: &MerkleHash,
bytes_range: Option<FileRange>,
) -> Result<Option<QueryReconstructionResponse>> {
) -> Result<Option<QueryReconstructionResponseV2>> {
let key: ReconCacheKey = (*file_id, bytes_range);

// Single-flight action: either wait for an in-flight request or lead the fetch.
Expand All @@ -156,8 +156,8 @@ impl Client for CachedXetClient {
// at chunk granularity) than what derive_range_response produces from the
// full plan, so prefer it when available.
enum CacheResult {
ExactHit(QueryReconstructionResponse),
FullPlan(QueryReconstructionResponse, FileRange),
ExactHit(QueryReconstructionResponseV2),
FullPlan(QueryReconstructionResponseV2, FileRange),
Miss,
}

Expand Down Expand Up @@ -323,6 +323,10 @@ impl Client for CachedXetClient {
.upload_xorb(prefix, serialized_cas_object, progress_callback, upload_permit)
.await
}

/// Forwards `get_file_chunk_hashes` for `file_id` to the wrapped `inner` client.
///
/// No caching is applied at this layer: the value or error produced by `inner`
/// is returned unchanged.
async fn get_file_chunk_hashes(&self, file_id: &MerkleHash) -> Result<Vec<(MerkleHash, u64)>> {
self.inner.get_file_chunk_hashes(file_id).await
}
}

#[cfg(test)]
Expand Down Expand Up @@ -391,7 +395,7 @@ mod tests {
&self,
file_id: &MerkleHash,
bytes_range: Option<FileRange>,
) -> Result<Option<QueryReconstructionResponse>> {
) -> Result<Option<QueryReconstructionResponseV2>> {
let key = (*file_id, bytes_range);
{
let mut calls = self.calls.lock().expect("calls lock poisoned");
Expand All @@ -404,10 +408,10 @@ mod tests {
}

match self.mode {
MockMode::ReturnSome => Ok(Some(QueryReconstructionResponse {
MockMode::ReturnSome => Ok(Some(QueryReconstructionResponseV2 {
offset_into_first_range: bytes_range.map_or(0, |r| r.start),
terms: Vec::new(),
fetch_info: HashMap::new(),
xorbs: HashMap::new(),
})),
MockMode::ReturnNone => Ok(None),
MockMode::ReturnErr => Err(ClientError::Other("boom".to_string())),
Expand Down Expand Up @@ -463,6 +467,10 @@ mod tests {
) -> Result<u64> {
unimplemented!("not needed in these tests")
}

/// Stub that the tests in this module never exercise; calling it panics via `unimplemented!`.
async fn get_file_chunk_hashes(&self, _file_id: &MerkleHash) -> Result<Vec<(MerkleHash, u64)>> {
unimplemented!("not needed in these tests")
}
}

fn hash_for(i: usize) -> MerkleHash {
Expand Down
40 changes: 27 additions & 13 deletions src/nfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,20 +221,34 @@ impl NFSFileSystem for NFSAdapter {
}

async fn write(&self, id: fileid3, offset: u64, data: &[u8]) -> Result<fattr3, nfsstat3> {
// Write requires a handle already in the pool (from create).
let file_handle = self
.handle_pool
.lock()
.expect("handle_pool poisoned")
.get(id)
.ok_or(nfsstat3::NFS3ERR_STALE)?;
// NFS always uses advanced_writes (staging files), so write() is a
// synchronous pwrite() — safe to call from async context.
self.virtual_fs
.write(id, file_handle, offset, data)
// Fast path: try existing pool handle (writable if file was created or already upgraded).
let existing = self.handle_pool.lock().expect("handle_pool poisoned").get(id);
if let Some(fh) = existing {
match self.virtual_fs.write(id, fh, offset, data) {
Ok(_) => {
self.virtual_fs.schedule_flush(id);
return self
.virtual_fs
.getattr(id)
.map(|a| vfs_attr_to_nfs(&a))
.map_err(errno_to_nfs);
}
Err(libc::EBADF) => {
// Handle was opened read-only (from a prior read). Upgrade to writable.
self.evict_handle(id, fh).await;
}
Err(e) => return Err(errno_to_nfs(e)),
}
}

// Slow path: open a writable handle (sparse staging for existing CAS files).
let fh = self
.virtual_fs
.open(id, true, false, None)
.await
.map_err(errno_to_nfs)?;
// NFS has no close/flush RPC, so schedule a debounced flush after
// each write to ensure data eventually gets committed to the Hub.
self.insert_handle(id, fh).await;
self.virtual_fs.write(id, fh, offset, data).map_err(errno_to_nfs)?;
self.virtual_fs.schedule_flush(id);
self.virtual_fs
.getattr(id)
Expand Down
27 changes: 16 additions & 11 deletions src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use clap::Parser;
use tracing::info;
use xet_data::processing::configurations::TranslatorConfig;
use xet_data::processing::data_client::default_config;
use xet_data::processing::{CacheConfig, FileDownloadSession, create_remote_client, get_cache};
use xet_data::processing::{CacheConfig, FileDownloadSession, get_cache};

use crate::cached_xet_client::CachedXetClient;
use crate::hub_api::{HubApiClient, HubTokenRefresher, SourceKind, parse_repo_id, split_path_prefix};
Expand Down Expand Up @@ -296,17 +296,23 @@ pub fn build(source: Source, options: MountOptions, is_nfs: bool) -> MountSetup
Some(get_cache(&config).expect("Failed to create xorb cache"))
};

let raw_client = runtime
.block_on(create_remote_client(
&cas_config,
&uuid::Uuid::new_v4().to_string(),
false,
))
.expect("Failed to create CAS client");
let raw_client: Arc<dyn xet_client::cas_client::Client> = xet_client::cas_client::RemoteClient::new(
&cas_config.session.endpoint,
&cas_config.session.auth,
&uuid::Uuid::new_v4().to_string(),
false,
cas_config.session.custom_headers.clone(),
);
let cached_client = CachedXetClient::new(raw_client);
let download_session = FileDownloadSession::from_client(cached_client.clone(), None, xorb_cache);
let download_session = match xorb_cache {
Some(cache) => FileDownloadSession::with_chunk_cache(cached_client.clone(), cache),
None => FileDownloadSession::from_client(cached_client.clone()),
};
// Separate session without chunk cache for bounded range downloads (seeks, sparse fills).
// The chunk cache fetches full xorbs (~64MB) even for small ranges.
let nocache_session = FileDownloadSession::from_client(cached_client.clone());
let upload_config = if read_only { None } else { Some(cas_config) };
let xet_sessions = XetSessions::new(download_session, upload_config, cached_client);
let xet_sessions = XetSessions::new(download_session, nocache_session, upload_config, cached_client);

let advanced_writes = options.advanced_writes || (is_nfs && !read_only);
// Repos need a staging dir for HTTP download cache (open_readonly),
Expand Down Expand Up @@ -416,7 +422,6 @@ fn build_cas_config(runtime: &tokio::runtime::Runtime, refresher: &Arc<HubTokenR
Arc::new(
default_config(
jwt.cas_url,
None,
Some((jwt.access_token, jwt.exp)),
Some(refresher.clone()),
None,
Expand Down
Loading
Loading