diff --git a/java/src/main/java/org/lance/index/IndexType.java b/java/src/main/java/org/lance/index/IndexType.java index 3a03934effd..a2352ca6855 100644 --- a/java/src/main/java/org/lance/index/IndexType.java +++ b/java/src/main/java/org/lance/index/IndexType.java @@ -24,6 +24,7 @@ public enum IndexType { MEM_WAL(7), ZONEMAP(8), BLOOM_FILTER(9), + FM(11), VECTOR(100), IVF_FLAT(101), IVF_SQ(102), diff --git a/java/src/main/java/org/lance/index/scalar/FMIndexParams.java b/java/src/main/java/org/lance/index/scalar/FMIndexParams.java new file mode 100644 index 00000000000..b111de461aa --- /dev/null +++ b/java/src/main/java/org/lance/index/scalar/FMIndexParams.java @@ -0,0 +1,73 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lance.index.scalar; + +import org.lance.util.JsonUtils; + +import java.util.HashMap; +import java.util.Map; + +/** + * Builder-style configuration for FM-Index scalar index parameters. + * + *

An FM-Index supports exact substring search (the {@code contains} function) over a string or + * binary column and returns exact row ids. + */ +public final class FMIndexParams { + + private static final String INDEX_TYPE = "fm"; + + private FMIndexParams() {} + + /** + * Create a new builder for FM-Index parameters. + * + * @return a new {@link Builder} + */ + public static Builder builder() { + return new Builder(); + } + + public static final class Builder { + private Integer numSegments; + + /** + * Configure the number of independent index segments to distribute the dataset fragments + * across. Each segment is a complete FM-Index covering a disjoint set of fragments, enabling + * incremental indexing and segment merge. Defaults to a single segment when not set. + * + * @param numSegments number of segments, must be positive + * @return this builder + * @throws IllegalArgumentException if {@code numSegments} is not positive + */ + public Builder numSegments(int numSegments) { + if (numSegments <= 0) { + throw new IllegalArgumentException("numSegments must be positive"); + } + this.numSegments = numSegments; + return this; + } + + /** Build a {@link ScalarIndexParams} instance for an FM-Index. */ + public ScalarIndexParams build() { + Map params = new HashMap<>(); + if (numSegments != null) { + params.put("num_segments", numSegments); + } + + String json = JsonUtils.toJson(params); + return ScalarIndexParams.create(INDEX_TYPE, json); + } + } +} diff --git a/java/src/test/java/org/lance/index/ScalarIndexTest.java b/java/src/test/java/org/lance/index/ScalarIndexTest.java index b993a7e8a5f..abf1593e43f 100644 --- a/java/src/test/java/org/lance/index/ScalarIndexTest.java +++ b/java/src/test/java/org/lance/index/ScalarIndexTest.java @@ -17,6 +17,7 @@ import org.lance.Fragment; import org.lance.TestUtils; import org.lance.WriteParams; +import org.lance.index.scalar.FMIndexParams; import org.lance.index.scalar.ScalarIndexParams; import org.lance.ipc.LanceScanner; import org.lance.ipc.ScanOptions; @@ -318,4 +319,54 @@ public void testCreateZonemapIndex(@TempDir Path tempDir) throws Exception { } } } + + @Test + public void testCreateFMIndexDistributively(@TempDir Path tempDir) throws Exception { + String datasetPath = tempDir.resolve("fm_index_distributed").toString(); + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) { + TestUtils.SimpleTestDataset testDataset = + new TestUtils.SimpleTestDataset(allocator, datasetPath); + testDataset.createEmptyDataset().close(); + // Write two fragments, each with names "Person 0" .. "Person 9". + testDataset.write(1, 10).close(); + try (Dataset dataset = testDataset.write(2, 10)) { + List fragments = dataset.getFragments(); + assertEquals(2, fragments.size()); + + IndexParams indexParams = + IndexParams.builder().setScalarIndexParams(FMIndexParams.builder().build()).build(); + String indexName = "fm_name_index"; + + // Build one uncommitted FM segment per fragment. + List segments = new ArrayList<>(); + for (Fragment fragment : fragments) { + segments.add( + dataset.createIndex( + IndexOptions.builder(Collections.singletonList("name"), IndexType.FM, indexParams) + .withIndexName(indexName) + .withFragmentIds(Collections.singletonList(fragment.getId())) + .build())); + } + + assertFalse( + dataset.listIndexes().contains(indexName), + "Partially created index should not present"); + + // FM segments support merge before commit. + Index merged = dataset.mergeExistingIndexSegments(segments); + + List committed = + dataset.commitExistingIndexSegments( + indexName, "name", Collections.singletonList(merged)); + assertEquals(1, committed.size()); + assertTrue(dataset.listIndexes().contains(indexName)); + + // FM-Index answers exact substring search via `contains`. + assertEquals( + 2, dataset.countIndexedRows(indexName, "contains(name, 'Person 5')", Optional.empty())); + assertEquals( + 20, dataset.countIndexedRows(indexName, "contains(name, 'Person')", Optional.empty())); + } + } + } } diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index e96d9305ce5..b3f1e436c92 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -3005,13 +3005,14 @@ def _prepare_scalar_index_request( "LABEL_LIST", "INVERTED", "FTS", + "FM", "BLOOMFILTER", "RTREE", ]: raise NotImplementedError( ( 'Only "BTREE", "BITMAP", "NGRAM", "ZONEMAP", "LABEL_LIST", ' - '"INVERTED", "BLOOMFILTER" or "RTREE" are supported for ' + '"INVERTED", "FM", "BLOOMFILTER" or "RTREE" are supported for ' f"scalar columns. Received {index_type}", ) ) @@ -3044,6 +3045,17 @@ def _prepare_scalar_index_request( field_type ): raise TypeError(f"NGRAM index column {column} must be a string") + elif index_type == "FM": + if ( + not pa.types.is_string(field_type) + and not pa.types.is_large_string(field_type) + and not pa.types.is_binary(field_type) + and not pa.types.is_large_binary(field_type) + ): + raise TypeError( + f"FM index column {column} must be string, large string, " + "binary, or large binary" + ) elif index_type in ["INVERTED", "FTS"]: value_type = field_type if pa.types.is_list(field_type) or pa.types.is_large_list(field_type): @@ -3093,6 +3105,7 @@ def _is_segment_native_scalar_index_type( "INVERTED", "FTS", "ZONEMAP", + "FM", } @classmethod @@ -3104,6 +3117,7 @@ def _requires_uncommitted_scalar_index( "BTREE", "BITMAP", "ZONEMAP", + "FM", } def create_scalar_index( @@ -3117,6 +3131,7 @@ def create_scalar_index( Literal["FTS"], Literal["NGRAM"], Literal["ZONEMAP"], + Literal["FM"], Literal["BLOOMFILTER"], Literal["RTREE"], IndexConfig, @@ -3190,6 +3205,10 @@ def create_scalar_index( index can conduct full-text searches. For example, a column that contains any word of query string "hello world". The results will be ranked by BM25. + * ``FM``. An FM-Index over a string or binary column that supports exact + substring search (the ``contains`` function) and returns exact row ids. + For distributed builds use :meth:`create_index_uncommitted` with + ``fragment_ids``. * ``BLOOMFILTER``. This inexact index uses a bloom filter. It is small but can only handle filters with equals and not equals and may require more I/O than a btree or bitmap index``` @@ -3208,7 +3227,7 @@ def create_scalar_index( index_type : str The type of the index. One of ``"BTREE"``, ``"BITMAP"``, ``"LABEL_LIST"``, ``"NGRAM"``, ``"ZONEMAP"``, ``"INVERTED"``, - ``"FTS"``, ``"BLOOMFILTER"``, ``"RTREE"``. + ``"FTS"``, ``"FM"``, ``"BLOOMFILTER"``, ``"RTREE"``. name : str, optional The index name. If not provided, it will be generated from the column name. @@ -4038,8 +4057,8 @@ def create_index_uncommitted( Create one segment without publishing it and return its metadata. This is the public distributed-build API for vector, BTREE scalar, - canonical bitmap scalar, INVERTED scalar, and ZONEMAP scalar index - construction. Unlike + canonical bitmap scalar, INVERTED scalar, ZONEMAP scalar, and FM scalar + index construction. Unlike :meth:`create_index`, this method does not publish the index into the dataset manifest. Instead, it writes one segment under ``_indices//`` and returns the resulting @@ -4055,7 +4074,7 @@ def create_index_uncommitted( 4. commit the final segment list with :meth:`commit_existing_index_segments` - BTREE, BITMAP, INVERTED, and ZONEMAP segments may + BTREE, BITMAP, INVERTED, ZONEMAP, and FM segments may be merged with :meth:`merge_existing_index_segments` before commit. Parameters are the same as :meth:`create_index`, with one additional requirement: diff --git a/python/python/tests/test_scalar_index.py b/python/python/tests/test_scalar_index.py index 7ddfbbc0dc8..c967250db42 100644 --- a/python/python/tests/test_scalar_index.py +++ b/python/python/tests/test_scalar_index.py @@ -4054,6 +4054,60 @@ def test_bitmap_uncommitted_segments_can_be_committed_from_python(tmp_path): ) +def test_fm_segment_merge_and_commit(tmp_path): + ds = generate_multi_fragment_dataset( + tmp_path, num_fragments=3, rows_per_fragment=100 + ) + + index_name = "text_fm_segments" + fragment_ids = [fragment.fragment_id for fragment in ds.get_fragments()] + staged_segments = [ + ds.create_index_uncommitted( + column="text", + index_type="FM", + name=index_name, + fragment_ids=[fragment_id], + ) + for fragment_id in fragment_ids + ] + + assert len({segment.uuid for segment in staged_segments}) == len(staged_segments) + for segment, fragment_id in zip(staged_segments, fragment_ids): + assert segment.fragment_ids == {fragment_id} + + merged_segment = ds.merge_existing_index_segments(staged_segments) + assert merged_segment.uuid not in {segment.uuid for segment in staged_segments} + assert merged_segment.fragment_ids == set(fragment_ids) + + ds = ds.commit_existing_index_segments(index_name, "text", [merged_segment]) + descriptions = {index.name: index for index in ds.describe_indices()} + assert descriptions[index_name].index_type == "Fm" + assert len(descriptions[index_name].segments) == 1 + + # FM-Index answers exact substring search via `contains`. + filter_expr = "contains(text, 'frodo')" + without_index = ds.scanner( + filter=filter_expr, + columns=["id", "text"], + use_scalar_index=False, + ).to_table() + with_index = ds.scanner( + filter=filter_expr, + columns=["id", "text"], + use_scalar_index=True, + ).to_table() + + assert with_index.num_rows > 0 + assert with_index.num_rows == without_index.num_rows + assert sorted(with_index["id"].to_pylist()) == sorted( + without_index["id"].to_pylist() + ) + assert ( + "ScalarIndexQuery" + in ds.scanner(filter=filter_expr, use_scalar_index=True).explain_plan() + ) + + def test_zonemap_fragment_ids_parameter_validation(tmp_path): ds = generate_multi_fragment_dataset( tmp_path, num_fragments=2, rows_per_fragment=100