diff --git a/python/python/lance/lance/optimize.pyi b/python/python/lance/lance/optimize.pyi index 9a26d23c003..c4b6b6546e6 100644 --- a/python/python/lance/lance/optimize.pyi +++ b/python/python/lance/lance/optimize.pyi @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import List +from typing import List, Optional from lance import LanceDataset from lance.fragment import FragmentMetadata @@ -51,5 +51,7 @@ class Compaction: def plan(dataset: "LanceDataset", options: CompactionOptions) -> CompactionPlan: ... @staticmethod def commit( - dataset: "LanceDataset", rewrites: List[RewriteResult] + dataset: "LanceDataset", + rewrites: List[RewriteResult], + options: Optional[CompactionOptions] = None, ) -> CompactionMetrics: ... diff --git a/python/python/tests/test_optimize.py b/python/python/tests/test_optimize.py index ccd889db116..049ce2cc3a5 100644 --- a/python/python/tests/test_optimize.py +++ b/python/python/tests/test_optimize.py @@ -324,6 +324,47 @@ def test_defer_index_remap(tmp_path: Path): assert any(idx.name == "__lance_frag_reuse" for idx in indices) +@pytest.mark.parametrize("use_commit_options", [True, False]) +def test_defer_index_remap_via_commit_options(tmp_path: Path, use_commit_options: bool): + """Compaction.commit respects defer_index_remap passed in options. + + When options={"defer_index_remap": True} is supplied to Compaction.commit + the __lance_frag_reuse system index must appear in describe_indices(). + When the option is omitted (default) no such system index is written. + """ + base_dir = tmp_path / f"dataset_commit_opts_{use_commit_options}" + data = pa.table({"i": range(6_000), "val": range(6_000)}) + dataset = lance.write_dataset(data, base_dir, max_rows_per_file=1_000) + dataset.create_scalar_index("i", "BTREE") + dataset.delete("i < 500") + + plan = Compaction.plan( + dataset, + options=dict(target_rows_per_fragment=2_000, num_threads=1), + ) + rewrites = [task.execute(dataset) for task in plan.tasks] + + if use_commit_options: + Compaction.commit(dataset, rewrites, options={"defer_index_remap": True}) + else: + Compaction.commit(dataset, rewrites) + + dataset = lance.dataset(base_dir) + indices = dataset.describe_indices() + has_frag_reuse = any(idx.name == "__lance_frag_reuse" for idx in indices) + + if use_commit_options: + assert has_frag_reuse, ( + "expected __lance_frag_reuse system index when defer_index_remap=True " + "is passed to Compaction.commit" + ) + else: + assert not has_frag_reuse, ( + "did not expect __lance_frag_reuse system index when options is omitted " + "from Compaction.commit" + ) + + @pytest.mark.filterwarnings("ignore::DeprecationWarning") def test_describe_indices_matches_list_indices_for_frag_reuse(tmp_path: Path): """describe_indices() and list_indices() must agree on the index_type diff --git a/python/src/dataset/optimize.rs b/python/src/dataset/optimize.rs index 321d7157b86..f25fbc05bfd 100644 --- a/python/src/dataset/optimize.rs +++ b/python/src/dataset/optimize.rs @@ -551,26 +551,34 @@ impl PyCompaction { /// new version once committed. /// rewrites : List[RewriteResult] /// The results of the compaction tasks to include in the commit. + /// options : dict, optional + /// Compaction options to apply at commit time. + /// When absent or ``None``, defaults to ``CompactionOptions::default()``. /// /// Returns /// ------- /// CompactionMetrics #[staticmethod] + #[pyo3(signature = (dataset, rewrites, options = None))] pub fn commit( dataset: Bound, rewrites: Vec, + options: Option>, ) -> PyResult { let dataset_ref = unwrap_dataset(dataset)?; let dataset = dataset_ref.borrow().clone(); + let config = dataset.ds.manifest.config.clone(); + let opts = match options { + Some(ref dict) => parse_compaction_options(dict, &config)?, + None => CompactionOptions::default(), + }; let rewrites: Vec = rewrites.into_iter().map(|r| r.0).collect(); let mut new_ds = dataset.ds.as_ref().clone(); - // TODO: pass compaction option from plan and execute time - let options: CompactionOptions = CompactionOptions::default(); let fut = commit_compaction( &mut new_ds, rewrites, Arc::new(DatasetIndexRemapperOptions::default()), - &options, + &opts, ); let metrics = rt() .block_on(None, fut)?