# Patch summary: expose frag-reuse index cleanup through the Python bindings.
#
# - python/python/lance/dataset.py: adds a `cleanup_frag_reuse_index()` method
#   that forwards to `self._ds.cleanup_frag_reuse_index()` and returns None.
# - python/python/lance/lance/__init__.pyi: adds the matching stub on `_Dataset`.
# - python/python/tests/test_optimize.py: adds `test_cleanup_frag_reuse_index`,
#   which compacts with `defer_index_remap=True`, rebuilds the BTREE index with
#   `replace=True`, runs the cleanup, and expects `num_versions` of
#   `__lance_frag_reuse` to be 0.
# - python/src/dataset.rs: adds the method inside `impl Dataset`. It runs
#   `lance::dataset::index::frag_reuse::cleanup_frag_reuse_index` on a clone of
#   the dataset and stores the clone back into `self.ds` only on success; a
#   `lance::Error` is converted to `PyIOError`.
#
# NOTE(review): line breaks and indentation were reconstructed from language
# conventions, and the `Vec<String>` parameter type in the `drop_columns`
# context line was restored from its usage (`.as_str()` on each element).
# Verify the context lines against the target tree before applying.
diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py
index 831e194f9f3..3f6b553bf5f 100644
--- a/python/python/lance/dataset.py
+++ b/python/python/lance/dataset.py
@@ -4622,6 +4622,10 @@ def migrate_manifest_paths_v2(self):
         """
         self._ds.migrate_manifest_paths_v2()
 
+    def cleanup_frag_reuse_index(self) -> None:
+        """Prune obsolete generations from the ``__lance_frag_reuse`` system index."""
+        self._ds.cleanup_frag_reuse_index()
+
     def delete_config_keys(self, keys: list[str]) -> None:
         """Delete specified configuration keys from the dataset.
 
diff --git a/python/python/lance/lance/__init__.pyi b/python/python/lance/lance/__init__.pyi
index 38d82738063..9af7167940e 100644
--- a/python/python/lance/lance/__init__.pyi
+++ b/python/python/lance/lance/__init__.pyi
@@ -446,6 +446,7 @@ class _Dataset:
     ) -> Tuple[_Dataset, Transaction]: ...
     def validate(self): ...
     def migrate_manifest_paths_v2(self): ...
+    def cleanup_frag_reuse_index(self) -> None: ...
     def drop_columns(self, columns: List[str]): ...
     def add_columns_from_reader(
         self, reader: pa.RecordBatchReader, batch_size: Optional[int] = None
diff --git a/python/python/tests/test_optimize.py b/python/python/tests/test_optimize.py
index ccd889db116..ec3fbe53431 100644
--- a/python/python/tests/test_optimize.py
+++ b/python/python/tests/test_optimize.py
@@ -324,6 +324,49 @@ def test_defer_index_remap(tmp_path: Path):
     assert any(idx.name == "__lance_frag_reuse" for idx in indices)
 
 
+def test_cleanup_frag_reuse_index(tmp_path: Path):
+    """cleanup_frag_reuse_index prunes all reuse generations that every user
+    index has caught up to.
+
+    Setup: 6 fragments, one BTREE scalar index. Compact with
+    defer_index_remap=True so the frag-reuse index is populated. Rebuild the
+    scalar index (create_scalar_index with replace=True) so it is newer than the
+    reuse generation, meaning the generation can be pruned. Then call
+    cleanup_frag_reuse_index and verify that num_versions drops to 0.
+    """
+    base_dir = tmp_path / "dataset"
+    data = pa.table({"i": range(6_000), "val": range(6_000)})
+    dataset = lance.write_dataset(data, base_dir, max_rows_per_file=1_000)
+    dataset.create_scalar_index("i", "BTREE")
+
+    dataset.delete("i < 500")
+    dataset.optimize.compact_files(
+        target_rows_per_fragment=2_000, defer_index_remap=True, num_threads=1
+    )
+
+    dataset = lance.dataset(base_dir)
+    assert any(
+        idx.name == "__lance_frag_reuse" for idx in dataset.describe_indices()
+    ), "precondition: defer_index_remap must have created the frag-reuse index"
+
+    before_stats = dataset.stats.index_stats("__lance_frag_reuse")
+    assert before_stats["num_versions"] >= 1, (
+        "precondition: frag-reuse index must have at least one version before cleanup"
+    )
+
+    dataset.create_scalar_index("i", "BTREE", replace=True)
+    dataset = lance.dataset(base_dir)
+
+    dataset.cleanup_frag_reuse_index()
+
+    dataset = lance.dataset(base_dir)
+    after_stats = dataset.stats.index_stats("__lance_frag_reuse")
+    assert after_stats["num_versions"] == 0, (
+        f"cleanup_frag_reuse_index should have pruned all reuse generations "
+        f"but num_versions={after_stats['num_versions']}"
+    )
+
+
 @pytest.mark.filterwarnings("ignore::DeprecationWarning")
 def test_describe_indices_matches_list_indices_for_frag_reuse(tmp_path: Path):
     """describe_indices() and list_indices() must agree on the index_type
diff --git a/python/src/dataset.rs b/python/src/dataset.rs
index 8bfa81aeae4..e8e864d300a 100644
--- a/python/src/dataset.rs
+++ b/python/src/dataset.rs
@@ -2788,6 +2788,17 @@ impl Dataset {
         Ok(())
     }
 
+    fn cleanup_frag_reuse_index(&mut self) -> PyResult<()> {
+        let mut new_self = self.ds.as_ref().clone();
+        rt().block_on(
+            None,
+            lance::dataset::index::frag_reuse::cleanup_frag_reuse_index(&mut new_self),
+        )?
+        .map_err(|err: lance::Error| PyIOError::new_err(err.to_string()))?;
+        self.ds = Arc::new(new_self);
+        Ok(())
+    }
+
     fn drop_columns(&mut self, columns: Vec<String>) -> PyResult<()> {
         let mut new_self = self.ds.as_ref().clone();
         let columns: Vec<_> = columns.iter().map(|s| s.as_str()).collect();