Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions python/python/lance/lance/optimize.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List
from typing import List, Optional

from lance import LanceDataset
from lance.fragment import FragmentMetadata
Expand Down Expand Up @@ -51,5 +51,7 @@ class Compaction:
def plan(dataset: "LanceDataset", options: CompactionOptions) -> CompactionPlan: ...
@staticmethod
def commit(
dataset: "LanceDataset", rewrites: List[RewriteResult]
dataset: "LanceDataset",
rewrites: List[RewriteResult],
options: Optional[CompactionOptions] = None,
) -> CompactionMetrics: ...
41 changes: 41 additions & 0 deletions python/python/tests/test_optimize.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,47 @@ def test_defer_index_remap(tmp_path: Path):
assert any(idx.name == "__lance_frag_reuse" for idx in indices)


@pytest.mark.parametrize("use_commit_options", [True, False])
def test_defer_index_remap_via_commit_options(tmp_path: Path, use_commit_options: bool):
    """Check that ``Compaction.commit`` honours ``defer_index_remap`` in options.

    The test writes a 6,000-row dataset split into 1,000-row files, builds a
    BTREE scalar index on ``i``, deletes some rows, then plans and executes a
    compaction by hand.

    * ``use_commit_options=True``: the commit is called with
      ``options={"defer_index_remap": True}`` and the ``__lance_frag_reuse``
      index is expected to show up in ``describe_indices()``.
    * ``use_commit_options=False``: the commit is called without ``options``
      and that index is expected to be absent.
    """
    row_count = 6_000
    dataset_uri = tmp_path / f"dataset_commit_opts_{use_commit_options}"
    table = pa.table({"i": range(row_count), "val": range(row_count)})

    ds = lance.write_dataset(table, dataset_uri, max_rows_per_file=1_000)
    ds.create_scalar_index("i", "BTREE")
    ds.delete("i < 500")

    # Plan once, then run every task manually so the commit step can be
    # driven separately below.
    plan_options = {"target_rows_per_fragment": 2_000, "num_threads": 1}
    compaction_plan = Compaction.plan(ds, options=plan_options)
    results = []
    for task in compaction_plan.tasks:
        results.append(task.execute(ds))

    # Only pass the ``options`` keyword in the parametrized "True" case; the
    # "False" case must exercise the default of the commit signature.
    commit_kwargs = {}
    if use_commit_options:
        commit_kwargs["options"] = {"defer_index_remap": True}
    Compaction.commit(ds, results, **commit_kwargs)

    # Re-open the dataset from its path before inspecting the indices.
    reopened = lance.dataset(dataset_uri)
    index_names = [idx.name for idx in reopened.describe_indices()]
    frag_reuse_present = "__lance_frag_reuse" in index_names

    if not use_commit_options:
        assert not frag_reuse_present, (
            "did not expect __lance_frag_reuse system index when options is omitted "
            "from Compaction.commit"
        )
        return

    assert frag_reuse_present, (
        "expected __lance_frag_reuse system index when defer_index_remap=True "
        "is passed to Compaction.commit"
    )


@pytest.mark.filterwarnings("ignore::DeprecationWarning")
def test_describe_indices_matches_list_indices_for_frag_reuse(tmp_path: Path):
"""describe_indices() and list_indices() must agree on the index_type
Expand Down
14 changes: 11 additions & 3 deletions python/src/dataset/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,26 +551,34 @@ impl PyCompaction {
/// new version once committed.
/// rewrites : List[RewriteResult]
/// The results of the compaction tasks to include in the commit.
/// options : dict, optional
/// Compaction options to apply at commit time.
/// When absent or ``None``, defaults to ``CompactionOptions::default()``.
///
/// Returns
/// -------
/// CompactionMetrics
#[staticmethod]
#[pyo3(signature = (dataset, rewrites, options = None))]
pub fn commit(
dataset: Bound<PyAny>,
rewrites: Vec<PyRewriteResult>,
options: Option<Bound<PyDict>>,
) -> PyResult<PyCompactionMetrics> {
let dataset_ref = unwrap_dataset(dataset)?;
let dataset = dataset_ref.borrow().clone();
let config = dataset.ds.manifest.config.clone();
let opts = match options {
Some(ref dict) => parse_compaction_options(dict, &config)?,
None => CompactionOptions::default(),
};
let rewrites: Vec<RewriteResult> = rewrites.into_iter().map(|r| r.0).collect();
let mut new_ds = dataset.ds.as_ref().clone();
// TODO: pass compaction option from plan and execute time
let options: CompactionOptions = CompactionOptions::default();
let fut = commit_compaction(
&mut new_ds,
rewrites,
Arc::new(DatasetIndexRemapperOptions::default()),
&options,
&opts,
);
let metrics = rt()
.block_on(None, fut)?
Expand Down
Loading