Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions java/src/main/java/org/lance/index/IndexType.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public enum IndexType {
MEM_WAL(7),
ZONEMAP(8),
BLOOM_FILTER(9),
FM(11),
VECTOR(100),
IVF_FLAT(101),
IVF_SQ(102),
Expand Down
73 changes: 73 additions & 0 deletions java/src/main/java/org/lance/index/scalar/FMIndexParams.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.lance.index.scalar;

import org.lance.util.JsonUtils;

import java.util.HashMap;
import java.util.Map;

/**
* Builder-style configuration for FM-Index scalar index parameters.
*
* <p>An FM-Index supports exact substring search (the {@code contains} function) over a string or
* binary column and returns exact row ids.
*/
public final class FMIndexParams {

private static final String INDEX_TYPE = "fm";

private FMIndexParams() {}

/**
* Create a new builder for FM-Index parameters.
*
* @return a new {@link Builder}
*/
public static Builder builder() {
return new Builder();
}

public static final class Builder {
private Integer numSegments;

/**
* Configure the number of independent index segments to distribute the dataset fragments
* across. Each segment is a complete FM-Index covering a disjoint set of fragments, enabling
* incremental indexing and segment merge. Defaults to a single segment when not set.
*
* @param numSegments number of segments, must be positive
* @return this builder
* @throws IllegalArgumentException if {@code numSegments} is not positive
*/
public Builder numSegments(int numSegments) {
if (numSegments <= 0) {
throw new IllegalArgumentException("numSegments must be positive");
}
this.numSegments = numSegments;
return this;
}

/** Build a {@link ScalarIndexParams} instance for an FM-Index. */
public ScalarIndexParams build() {
Map<String, Object> params = new HashMap<>();
if (numSegments != null) {
params.put("num_segments", numSegments);
}

String json = JsonUtils.toJson(params);
return ScalarIndexParams.create(INDEX_TYPE, json);
}
}
}
51 changes: 51 additions & 0 deletions java/src/test/java/org/lance/index/ScalarIndexTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.lance.Fragment;
import org.lance.TestUtils;
import org.lance.WriteParams;
import org.lance.index.scalar.FMIndexParams;
import org.lance.index.scalar.ScalarIndexParams;
import org.lance.ipc.LanceScanner;
import org.lance.ipc.ScanOptions;
Expand Down Expand Up @@ -318,4 +319,54 @@ public void testCreateZonemapIndex(@TempDir Path tempDir) throws Exception {
}
}
}

@Test
public void testCreateFMIndexDistributively(@TempDir Path tempDir) throws Exception {
String datasetPath = tempDir.resolve("fm_index_distributed").toString();
try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
TestUtils.SimpleTestDataset testDataset =
new TestUtils.SimpleTestDataset(allocator, datasetPath);
testDataset.createEmptyDataset().close();
// Write two fragments, each with names "Person 0" .. "Person 9".
testDataset.write(1, 10).close();
try (Dataset dataset = testDataset.write(2, 10)) {
List<Fragment> fragments = dataset.getFragments();
assertEquals(2, fragments.size());

IndexParams indexParams =
IndexParams.builder().setScalarIndexParams(FMIndexParams.builder().build()).build();
String indexName = "fm_name_index";

// Build one uncommitted FM segment per fragment.
List<Index> segments = new ArrayList<>();
for (Fragment fragment : fragments) {
segments.add(
dataset.createIndex(
IndexOptions.builder(Collections.singletonList("name"), IndexType.FM, indexParams)
.withIndexName(indexName)
.withFragmentIds(Collections.singletonList(fragment.getId()))
.build()));
}

assertFalse(
dataset.listIndexes().contains(indexName),
"Partially created index should not present");

// FM segments support merge before commit.
Index merged = dataset.mergeExistingIndexSegments(segments);

List<Index> committed =
dataset.commitExistingIndexSegments(
indexName, "name", Collections.singletonList(merged));
assertEquals(1, committed.size());
assertTrue(dataset.listIndexes().contains(indexName));

// FM-Index answers exact substring search via `contains`.
assertEquals(
2, dataset.countIndexedRows(indexName, "contains(name, 'Person 5')", Optional.empty()));
assertEquals(
20, dataset.countIndexedRows(indexName, "contains(name, 'Person')", Optional.empty()));
}
}
}
}
29 changes: 24 additions & 5 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -3005,13 +3005,14 @@ def _prepare_scalar_index_request(
"LABEL_LIST",
"INVERTED",
"FTS",
"FM",
"BLOOMFILTER",
"RTREE",
]:
raise NotImplementedError(
(
'Only "BTREE", "BITMAP", "NGRAM", "ZONEMAP", "LABEL_LIST", '
'"INVERTED", "BLOOMFILTER" or "RTREE" are supported for '
'"INVERTED", "FM", "BLOOMFILTER" or "RTREE" are supported for '
f"scalar columns. Received {index_type}",
)
)
Expand Down Expand Up @@ -3044,6 +3045,17 @@ def _prepare_scalar_index_request(
field_type
):
raise TypeError(f"NGRAM index column {column} must be a string")
elif index_type == "FM":
if (
not pa.types.is_string(field_type)
and not pa.types.is_large_string(field_type)
and not pa.types.is_binary(field_type)
and not pa.types.is_large_binary(field_type)
):
raise TypeError(
f"FM index column {column} must be string, large string, "
"binary, or large binary"
)
elif index_type in ["INVERTED", "FTS"]:
value_type = field_type
if pa.types.is_list(field_type) or pa.types.is_large_list(field_type):
Expand Down Expand Up @@ -3093,6 +3105,7 @@ def _is_segment_native_scalar_index_type(
"INVERTED",
"FTS",
"ZONEMAP",
"FM",
}

@classmethod
Expand All @@ -3104,6 +3117,7 @@ def _requires_uncommitted_scalar_index(
"BTREE",
"BITMAP",
"ZONEMAP",
"FM",
}

def create_scalar_index(
Expand All @@ -3117,6 +3131,7 @@ def create_scalar_index(
Literal["FTS"],
Literal["NGRAM"],
Literal["ZONEMAP"],
Literal["FM"],
Literal["BLOOMFILTER"],
Literal["RTREE"],
IndexConfig,
Expand Down Expand Up @@ -3190,6 +3205,10 @@ def create_scalar_index(
index can conduct full-text searches. For example, a column that contains any
word
of query string "hello world". The results will be ranked by BM25.
* ``FM``. An FM-Index over a string or binary column that supports exact
substring search (the ``contains`` function) and returns exact row ids.
For distributed builds use :meth:`create_index_uncommitted` with
``fragment_ids``.
* ``BLOOMFILTER``. This inexact index uses a bloom filter. It is small
but can only handle filters with equals and not equals and may require
more I/O than a btree or bitmap index```
Expand All @@ -3208,7 +3227,7 @@ def create_scalar_index(
index_type : str
The type of the index. One of ``"BTREE"``, ``"BITMAP"``,
``"LABEL_LIST"``, ``"NGRAM"``, ``"ZONEMAP"``, ``"INVERTED"``,
``"FTS"``, ``"BLOOMFILTER"``, ``"RTREE"``.
``"FTS"``, ``"FM"``, ``"BLOOMFILTER"``, ``"RTREE"``.
name : str, optional
The index name. If not provided, it will be generated from the
column name.
Expand Down Expand Up @@ -4038,8 +4057,8 @@ def create_index_uncommitted(
Create one segment without publishing it and return its metadata.

This is the public distributed-build API for vector, BTREE scalar,
canonical bitmap scalar, INVERTED scalar, and ZONEMAP scalar index
construction. Unlike
canonical bitmap scalar, INVERTED scalar, ZONEMAP scalar, and FM scalar
index construction. Unlike
:meth:`create_index`, this method does not publish the index into the
dataset manifest. Instead, it writes one segment under
``_indices/<segment_uuid>/`` and returns the resulting
Expand All @@ -4055,7 +4074,7 @@ def create_index_uncommitted(
4. commit the final segment list with
:meth:`commit_existing_index_segments`

BTREE, BITMAP, INVERTED, and ZONEMAP segments may
BTREE, BITMAP, INVERTED, ZONEMAP, and FM segments may
be merged with :meth:`merge_existing_index_segments` before commit.
Parameters are the same as :meth:`create_index`, with one additional
requirement:
Expand Down
54 changes: 54 additions & 0 deletions python/python/tests/test_scalar_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -4054,6 +4054,60 @@ def test_bitmap_uncommitted_segments_can_be_committed_from_python(tmp_path):
)


def test_fm_segment_merge_and_commit(tmp_path):
ds = generate_multi_fragment_dataset(
tmp_path, num_fragments=3, rows_per_fragment=100
)

index_name = "text_fm_segments"
fragment_ids = [fragment.fragment_id for fragment in ds.get_fragments()]
staged_segments = [
ds.create_index_uncommitted(
column="text",
index_type="FM",
name=index_name,
fragment_ids=[fragment_id],
)
for fragment_id in fragment_ids
]

assert len({segment.uuid for segment in staged_segments}) == len(staged_segments)
for segment, fragment_id in zip(staged_segments, fragment_ids):
assert segment.fragment_ids == {fragment_id}

merged_segment = ds.merge_existing_index_segments(staged_segments)
assert merged_segment.uuid not in {segment.uuid for segment in staged_segments}
assert merged_segment.fragment_ids == set(fragment_ids)

ds = ds.commit_existing_index_segments(index_name, "text", [merged_segment])
descriptions = {index.name: index for index in ds.describe_indices()}
assert descriptions[index_name].index_type == "Fm"
assert len(descriptions[index_name].segments) == 1

# FM-Index answers exact substring search via `contains`.
filter_expr = "contains(text, 'frodo')"
without_index = ds.scanner(
filter=filter_expr,
columns=["id", "text"],
use_scalar_index=False,
).to_table()
with_index = ds.scanner(
filter=filter_expr,
columns=["id", "text"],
use_scalar_index=True,
).to_table()

assert with_index.num_rows > 0
assert with_index.num_rows == without_index.num_rows
assert sorted(with_index["id"].to_pylist()) == sorted(
without_index["id"].to_pylist()
)
assert (
"ScalarIndexQuery"
in ds.scanner(filter=filter_expr, use_scalar_index=True).explain_plan()
)


def test_zonemap_fragment_ids_parameter_validation(tmp_path):
ds = generate_multi_fragment_dataset(
tmp_path, num_fragments=2, rows_per_fragment=100
Expand Down
Loading