Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions rust/lance-namespace-impls/src/dir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11779,6 +11779,40 @@ mod tests {
);
}

#[tokio::test]
async fn test_manifest_reload_observes_new_version_from_other_namespace() {
let temp_dir = TempStdDir::default();
let temp_path = temp_dir.to_str().unwrap();

let namespace_a = DirectoryNamespaceBuilder::new(temp_path)
.manifest_enabled(true)
.dir_listing_enabled(false)
.build()
.await
.unwrap();
create_scalar_table(&namespace_a, "alpha").await;

let namespace_b = DirectoryNamespaceBuilder::new(temp_path)
.manifest_enabled(true)
.dir_listing_enabled(false)
.build()
.await
.unwrap();
create_scalar_table(&namespace_b, "beta").await;

let response = namespace_a
.list_tables(ListTablesRequest {
id: Some(vec![]),
..Default::default()
})
.await
.unwrap();

let mut tables = response.tables;
tables.sort();
assert_eq!(tables, vec!["alpha", "beta"]);
}

#[tokio::test]
async fn test_migration_not_found_errors_include_table_id() {
let temp_dir = TempStdDir::default();
Expand Down
32 changes: 23 additions & 9 deletions rust/lance-namespace-impls/src/dir/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -670,13 +670,23 @@ pub struct NamespaceInfo {
/// A wrapper around a Dataset that provides concurrent access.
///
/// This can be cloned cheaply. It supports concurrent reads or exclusive writes.
/// The manifest dataset is always kept strongly consistent by reloading on each read.
/// The manifest dataset uses contiguous attached versions and this module never
/// runs old-version cleanup on it, allowing reads to check only the immediate
/// successor manifest before deciding whether a reload is needed.
#[derive(Debug, Clone)]
pub struct DatasetConsistencyWrapper(Arc<RwLock<Dataset>>);

impl DatasetConsistencyWrapper {
/// Create a new wrapper with the given dataset.
pub fn new(dataset: Dataset) -> Self {
debug_assert!(
!dataset
.manifest()
.config
.keys()
.any(|key| key.starts_with("lance.auto_cleanup.")),
"the directory manifest dataset must not enable old-version cleanup"
);
Self(Arc::new(RwLock::new(dataset)))
}

Expand Down Expand Up @@ -728,21 +738,25 @@ impl DatasetConsistencyWrapper {
dataset_uri,
current_version
);
let latest_version = read_guard.latest_version_id().await.map_err(|e| {
// The directory manifest table uses contiguous attached versions and
// does not run old-version cleanup, so the immediate successor probe is
// enough to detect changes without resolving or loading the latest
// manifest on every namespace read.
let has_successor_version = read_guard.has_successor_version().await.map_err(|e| {
lance_core::Error::from(NamespaceError::Internal {
message: format!("Failed to get latest version: {:?}", e),
message: format!("Failed to check dataset staleness: {:?}", e),
})
})?;
log::debug!(
"Reload got latest_version={} for uri={}, current_version={}",
latest_version,
"Reload checked successor_version_exists={} for uri={}, current_version={}",
has_successor_version,
dataset_uri,
current_version
);
drop(read_guard);

// If already up-to-date, return early
if latest_version == current_version {
if !has_successor_version {
log::debug!("Already up-to-date for uri={}", dataset_uri);
return Ok(());
}
Expand All @@ -751,13 +765,13 @@ impl DatasetConsistencyWrapper {
let mut write_guard = self.0.write().await;

// Double-check after acquiring write lock (someone else might have reloaded)
let latest_version = write_guard.latest_version_id().await.map_err(|e| {
let has_successor_version = write_guard.has_successor_version().await.map_err(|e| {
lance_core::Error::from(NamespaceError::Internal {
message: format!("Failed to get latest version: {:?}", e),
message: format!("Failed to check dataset staleness: {:?}", e),
})
})?;

if latest_version != write_guard.version().version {
if has_successor_version {
write_guard.checkout_latest().await.map_err(|e| {
lance_core::Error::from(NamespaceError::Internal {
message: format!("Failed to checkout latest: {:?}", e),
Expand Down
20 changes: 20 additions & 0 deletions rust/lance-table/src/io/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,26 @@ pub trait CommitHandler: Debug + Send + Sync {
default_resolve_version(base_path, version, object_store).await
}

/// Check whether an attached manifest version exists without loading it.
///
/// The default implementation probes the deterministic manifest path for
/// the given naming scheme. Commit handlers with an external source of
/// truth should override this method.
async fn version_exists(
&self,
base_path: &Path,
version: u64,
object_store: &dyn OSObjectStore,
naming_scheme: ManifestNamingScheme,
) -> Result<bool> {
let path = naming_scheme.manifest_path(base_path, version);
match object_store.head(&path).await {
Ok(_) => Ok(true),
Err(ObjectStoreError::NotFound { .. }) => Ok(false),
Err(e) => Err(e.into()),
}
}

/// List detached manifest locations.
///
/// Returns a stream of detached manifest locations in arbitrary order.
Expand Down
25 changes: 25 additions & 0 deletions rust/lance-table/src/io/commit/external_manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,31 @@ impl CommitHandler for ExternalManifestCommitHandler {
.await
}

async fn version_exists(
&self,
base_path: &Path,
version: u64,
object_store: &dyn OSObjectStore,
naming_scheme: ManifestNamingScheme,
) -> Result<bool> {
match self
.external_manifest_store
.get_manifest_location(base_path.as_ref(), version)
.await
{
Ok(_) => Ok(true),
Err(Error::NotFound { .. }) => {
let path = naming_scheme.manifest_path(base_path, version);
match object_store.head(&path).await {
Ok(_) => Ok(true),
Err(ObjectStoreError::NotFound { .. }) => Ok(false),
Err(e) => Err(e.into()),
}
}
Err(e) => Err(e),
}
}

async fn commit(
&self,
manifest: &mut Manifest,
Expand Down
33 changes: 33 additions & 0 deletions rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2235,6 +2235,39 @@ impl Dataset {
.version)
}

/// Return whether the dataset has a newer committed version.
pub async fn is_stale(&self) -> Result<bool> {
let latest_version = self.latest_version_id().await?;
Ok(latest_version != self.manifest.version)
}

/// Return whether the immediate attached successor manifest exists.
///
/// This is a fast contiguous-history probe. It does not resolve the latest
/// version and may return `false` if intermediate manifests have been
/// removed. Callers that need a general freshness check should use
/// [`Self::is_stale`].
#[doc(hidden)]
pub async fn has_successor_version(&self) -> Result<bool> {
let Some(next_version) = self.manifest.version.checked_add(1) else {
return Ok(false);
};
if lance_table::format::is_detached_version(next_version) {
return Ok(false);
}

let exists = self
.commit_handler
.version_exists(
&self.base,
next_version,
self.object_store.inner.as_ref(),
self.manifest_location.naming_scheme,
)
.await?;
Ok(exists)
}

pub fn count_fragments(&self) -> usize {
self.manifest.fragments.len()
}
Expand Down
71 changes: 71 additions & 0 deletions rust/lance/src/dataset/tests/dataset_versioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,77 @@ async fn test_version_id_fast_path() {
assert_eq!(historical.latest_version_id().await.unwrap(), 2);
}

#[rstest]
#[tokio::test]
async fn test_stale_checks_cover_fast_successor_and_latest_version(
#[values(false, true)] enable_v2_manifest_paths: bool,
) {
let expected_scheme = if enable_v2_manifest_paths {
ManifestNamingScheme::V2
} else {
ManifestNamingScheme::V1
};
let test_uri = TempStrDir::default();
let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
"i",
DataType::UInt32,
false,
)]));

let data = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(UInt32Array::from_iter_values(0..5))],
)
.unwrap();
let reader = RecordBatchIterator::new(vec![data].into_iter().map(Ok), schema.clone());

let original = Dataset::write(
reader,
&test_uri,
Some(WriteParams {
enable_v2_manifest_paths,
..Default::default()
}),
)
.await
.unwrap();
assert_eq!(original.manifest_location().naming_scheme, expected_scheme);
assert!(!original.is_stale().await.unwrap());
assert!(!original.has_successor_version().await.unwrap());

let data = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(UInt32Array::from_iter_values(5..10))],
)
.unwrap();
let reader = RecordBatchIterator::new(vec![data].into_iter().map(Ok), schema);
let updated = Dataset::write(
reader,
&test_uri,
Some(WriteParams {
mode: WriteMode::Append,
enable_v2_manifest_paths,
..Default::default()
}),
)
.await
.unwrap();

assert!(original.is_stale().await.unwrap());
assert!(original.has_successor_version().await.unwrap());
assert_eq!(updated.manifest_location().naming_scheme, expected_scheme);
assert!(!updated.is_stale().await.unwrap());
assert!(!updated.has_successor_version().await.unwrap());

let historical = updated.checkout_version(1).await.unwrap();
assert_eq!(
historical.manifest_location().naming_scheme,
expected_scheme
);
assert!(historical.is_stale().await.unwrap());
assert!(historical.has_successor_version().await.unwrap());
}

#[rstest]
#[tokio::test]
async fn test_restore(
Expand Down
26 changes: 26 additions & 0 deletions rust/lance/src/io/commit/external_manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,32 @@ mod test {
assert_eq!(ds.version().version, 6);
assert_eq!(ds.count_rows(None).await.unwrap(), 60);

{
inner_store.lock().await.remove(&(ds.base.to_string(), 6));
}
assert!(
handler
.version_exists(
&ds.base,
6,
ds.object_store.inner.as_ref(),
ds.manifest_location().naming_scheme,
)
.await
.unwrap()
);
assert!(
!handler
.version_exists(
&ds.base,
7,
ds.object_store.inner.as_ref(),
ds.manifest_location().naming_scheme,
)
.await
.unwrap()
);

// Open without external store handler again, should see the newly sync'd commit
let ds = DatasetBuilder::from_uri(ds_uri).load().await.unwrap();
assert_eq!(ds.version().version, 6);
Expand Down
Loading