Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,16 @@ jobs:
RUST_LOG: DEBUG
RUST_BACKTRACE: full

- name: DataFusion Lumina Build Query E2E Test
run: >
cargo test -p paimon-datafusion
--features vortex
vector_search_tests::test_lumina_build_then_vector_search_query
-- --ignored --exact
env:
RUST_LOG: DEBUG
RUST_BACKTRACE: full

- name: DataFusion Integration Test
run: cargo test -p paimon-datafusion --all-targets
env:
Expand Down
41 changes: 41 additions & 0 deletions crates/integrations/datafusion/src/procedures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
//! - `CALL sys.rollback_to(table => '...', snapshot_id => ... | tag => '...')`
//! - `CALL sys.rollback_to_timestamp(table => '...', timestamp => ...)`
//! - `CALL sys.create_tag_from_timestamp(table => '...', tag => '...', timestamp => ...)`
//! - `CALL sys.create_lumina_index(table => '...', index_column => '...' [, index_type => '...'] [, options => 'k1=v1,k2=v2'])`

use std::collections::HashMap;
use std::sync::Arc;
Expand Down Expand Up @@ -147,6 +148,7 @@ pub async fn execute_call(
"create_tag_from_timestamp" => {
proc_create_tag_from_timestamp(ctx, catalog, catalog_name, &args).await
}
"create_lumina_index" => proc_create_lumina_index(ctx, catalog, catalog_name, &args).await,
_ => Err(DataFusionError::Plan(format!(
"Unknown procedure: {proc_name}"
))),
Expand Down Expand Up @@ -505,6 +507,45 @@ async fn proc_create_tag_from_timestamp(
ok_result(ctx)
}

/// Handles `CALL sys.create_lumina_index(...)`.
///
/// Resolves the target table with `get_table`, requires the `index_column` argument, and
/// forwards the optional `index_type` and `options` arguments to the table's Lumina index
/// build builder before executing it. `options` is parsed by `parse_key_value_options`, so a
/// malformed value is reported before the build is executed.
///
/// # Errors
///
/// Propagates errors from the table lookup, from a missing `index_column`, and from option
/// parsing. A failure of the build itself is converted with `to_datafusion_error`.
async fn proc_create_lumina_index(
    ctx: &SessionContext,
    catalog: &Arc<dyn Catalog>,
    catalog_name: &str,
    args: &HashMap<String, String>,
) -> DFResult<DataFrame> {
    let target_table = get_table(catalog, catalog_name, args).await?;
    let column = require_arg(args, "index_column")?;

    let mut index_build = target_table.new_lumina_index_build_builder();
    index_build.with_index_column(column);
    if let Some(kind) = args.get("index_type") {
        index_build.with_index_type(kind);
    }
    if let Some(raw_options) = args.get("options") {
        let parsed = parse_key_value_options(raw_options)?;
        index_build.with_options(parsed);
    }

    let outcome = index_build.execute().await;
    outcome.map_err(to_datafusion_error)?;
    ok_result(ctx)
}

/// Parses a comma-separated list of `key=value` pairs into a map.
///
/// Entries are trimmed and blank entries (for example a trailing comma) are skipped. Each
/// remaining entry is split at its first `=`, so a value may itself contain `=`. Keys and
/// values are trimmed; an empty value is accepted. When a key occurs more than once, the last
/// occurrence wins.
///
/// # Errors
///
/// Returns `DataFusionError::Plan` for the first entry that has no `=` or whose key is empty
/// after trimming.
fn parse_key_value_options(options: &str) -> DFResult<HashMap<String, String>> {
    options
        .split(',')
        .map(str::trim)
        .filter(|entry| !entry.is_empty())
        .map(|entry| {
            let (raw_key, raw_value) = match entry.split_once('=') {
                Some(pair) => pair,
                None => {
                    return Err(DataFusionError::Plan(format!(
                        "Invalid options entry '{entry}'. Expected comma-separated key=value pairs"
                    )));
                }
            };

            let name = raw_key.trim();
            if name.is_empty() {
                return Err(DataFusionError::Plan(
                    "Invalid options entry with empty key".to_string(),
                ));
            }

            Ok((name.to_string(), raw_value.trim().to_string()))
        })
        // Collecting into `Result` stops at the first error; collecting pairs into a map
        // keeps the last value for a repeated key.
        .collect()
}

fn ok_result(ctx: &SessionContext) -> DFResult<DataFrame> {
let schema = Arc::new(Schema::new(vec![Field::new(
"result",
Expand Down
38 changes: 37 additions & 1 deletion crates/integrations/datafusion/tests/procedures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

mod common;

use common::{exec, row_count, setup_sql_context};
use common::{assert_sql_error, exec, row_count, setup_sql_context};

async fn setup_table_with_snapshots() -> (tempfile::TempDir, paimon_datafusion::SQLContext) {
let (tmp, sql_context) = setup_sql_context().await;
Expand Down Expand Up @@ -85,6 +85,42 @@ async fn test_create_tag_with_snapshot_id() {
assert_eq!(count, 1);
}

/// Calling the procedure without `index_column` must fail with the missing-argument error.
#[tokio::test]
async fn test_create_lumina_index_requires_index_column() {
    let (_tmp, sql_context) = setup_table_with_snapshots().await;

    let call_without_column = "CALL sys.create_lumina_index(table => 'test_db.t1')";
    let expected_message = "Missing required argument: 'index_column'";

    assert_sql_error(&sql_context, call_without_column, expected_message).await;
}

/// An `index_type` of `btree` must be rejected as an unsupported Lumina index type.
#[tokio::test]
async fn test_create_lumina_index_rejects_invalid_index_type() {
    let (_tmp, sql_context) = setup_table_with_snapshots().await;

    let call_with_bad_type = "CALL sys.create_lumina_index(table => 'test_db.t1', index_column => 'name', index_type => 'btree')";
    let expected_message = "Unsupported Lumina index type: btree";

    assert_sql_error(&sql_context, call_with_bad_type, expected_message).await;
}

/// An `options` entry without `=` must be rejected with the key=value format hint.
#[tokio::test]
async fn test_create_lumina_index_rejects_invalid_options() {
    let (_tmp, sql_context) = setup_table_with_snapshots().await;

    let call_with_bad_options = "CALL sys.create_lumina_index(table => 'test_db.t1', index_column => 'name', options => 'lumina.index.dimension')";
    let expected_message = "Expected comma-separated key=value pairs";

    assert_sql_error(&sql_context, call_with_bad_options, expected_message).await;
}

#[tokio::test]
async fn test_create_tag_already_exists() {
let (_tmp, sql_context) = setup_table_with_snapshots().await;
Expand Down
214 changes: 213 additions & 1 deletion crates/integrations/datafusion/tests/read_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1017,7 +1017,13 @@ mod fulltext_tests {
mod vector_search_tests {
use std::sync::Arc;

use datafusion::arrow::array::Int32Array;
use datafusion::arrow::array::{ArrayRef, Float32Builder, Int32Array, ListBuilder};
use datafusion::arrow::datatypes::{
DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema,
};
use datafusion::arrow::record_batch::RecordBatch;
use paimon::catalog::Identifier;
use paimon::spec::{ArrayType, DataType, FloatType, IntType, Schema};
use paimon::{Catalog, CatalogOptions, FileSystemCatalog, Options};
use paimon_datafusion::{register_vector_search, SQLContext};

Expand Down Expand Up @@ -1053,6 +1059,69 @@ mod vector_search_tests {
(ctx, tmp)
}

/// Creates a filesystem catalog rooted in a fresh temporary directory and registers it with a
/// new `SQLContext` under the catalog name `paimon`.
///
/// The catalog handle is returned so the caller can create tables directly, and the `TempDir`
/// is returned so the caller decides how long the temporary directory lives.
async fn create_empty_vector_search_context(
) -> (SQLContext, Arc<FileSystemCatalog>, tempfile::TempDir) {
    let warehouse_dir = tempfile::tempdir().expect("Failed to create temp dir");

    let mut catalog_options = Options::new();
    catalog_options.set(
        CatalogOptions::WAREHOUSE,
        format!("file://{}", warehouse_dir.path().display()),
    );
    let fs_catalog =
        Arc::new(FileSystemCatalog::new(catalog_options).expect("Failed to create catalog"));

    let mut sql_context = SQLContext::new();
    sql_context
        .register_catalog("paimon", fs_catalog.clone())
        .await
        .expect("Failed to register catalog");

    (sql_context, fs_catalog, warehouse_dir)
}

/// Builds the schema of the Lumina end-to-end test table: an `id` int column and an
/// `embedding` array-of-float column, together with the table options listed below.
fn build_lumina_table_schema() -> Schema {
    let table_options: std::collections::HashMap<String, String> = [
        ("row-tracking.enabled", "true"),
        ("data-evolution.enabled", "true"),
        ("global-index.enabled", "true"),
        // The end-to-end test writes six rows and expects two index entries of three rows.
        ("global-index.row-count-per-shard", "3"),
        ("lumina.index.dimension", "2"),
        ("lumina.encoding.type", "rawf32"),
    ]
    .iter()
    .map(|&(key, value)| (key.to_string(), value.to_string()))
    .collect();

    let embedding_type = DataType::Array(ArrayType::new(DataType::Float(FloatType::new())));

    Schema::builder()
        .column("id", DataType::Int(IntType::new()))
        .column("embedding", embedding_type)
        .options(table_options)
        .build()
        .expect("Failed to build table schema")
}

/// Builds a two-column Arrow batch: a non-nullable Int32 `id` column and a nullable
/// `embedding` list column whose Float32 items use a nullable field named `element`.
///
/// Every inner vector becomes one non-null list entry. Panics if the batch cannot be
/// assembled from the two columns.
fn build_vector_batch(ids: Vec<i32>, vectors: Vec<Vec<f32>>) -> RecordBatch {
    let item_field = Arc::new(ArrowField::new("element", ArrowDataType::Float32, true));

    let mut embeddings = ListBuilder::new(Float32Builder::new()).with_field(item_field.clone());
    for vector in &vectors {
        // Push all values of this vector, then close the list entry as valid.
        embeddings.values().append_slice(vector);
        embeddings.append(true);
    }

    let columns = vec![
        Arc::new(Int32Array::from(ids)) as ArrayRef,
        Arc::new(embeddings.finish()) as ArrayRef,
    ];
    let batch_schema = Arc::new(ArrowSchema::new(vec![
        ArrowField::new("id", ArrowDataType::Int32, false),
        ArrowField::new("embedding", ArrowDataType::List(item_field), true),
    ]));

    RecordBatch::try_new(batch_schema, columns).expect("Failed to build vector batch")
}

fn extract_ids(batches: &[datafusion::arrow::record_batch::RecordBatch]) -> Vec<i32> {
let mut ids = Vec::new();
for batch in batches {
Expand All @@ -1068,6 +1137,61 @@ mod vector_search_tests {
ids
}

/// Flattens index-metadata batches into
/// `(index_type, row_count, row_range_start, row_range_end, index_field_name)` tuples,
/// sorted by `row_range_start`.
///
/// Panics if a batch lacks one of the five columns or a column has an unexpected array type.
/// Values are read without consulting null bitmaps.
fn extract_index_rows(
    batches: &[datafusion::arrow::record_batch::RecordBatch],
) -> Vec<(String, i64, i64, i64, String)> {
    use datafusion::arrow::array::{Int64Array, StringArray};

    /// Looks up `column` in `batch` and downcasts it to `T`, panicking with a message that
    /// names the expected array type and the column.
    fn typed_column<'a, T: 'static>(
        batch: &'a datafusion::arrow::record_batch::RecordBatch,
        column: &str,
        array_type: &str,
    ) -> &'a T {
        batch
            .column_by_name(column)
            .and_then(|array| array.as_any().downcast_ref::<T>())
            .unwrap_or_else(|| panic!("Expected {array_type} for {column}"))
    }

    let mut rows: Vec<(String, i64, i64, i64, String)> = Vec::new();
    for batch in batches {
        let index_types = typed_column::<StringArray>(batch, "index_type", "StringArray");
        let row_counts = typed_column::<Int64Array>(batch, "row_count", "Int64Array");
        let range_starts = typed_column::<Int64Array>(batch, "row_range_start", "Int64Array");
        let range_ends = typed_column::<Int64Array>(batch, "row_range_end", "Int64Array");
        let field_names = typed_column::<StringArray>(batch, "index_field_name", "StringArray");

        rows.extend((0..batch.num_rows()).map(|i| {
            (
                index_types.value(i).to_string(),
                row_counts.value(i),
                range_starts.value(i),
                range_ends.value(i),
                field_names.value(i).to_string(),
            )
        }));
    }

    // Order by the start of the row range so assertions do not depend on scan order.
    rows.sort_by_key(|row| row.2);
    rows
}

#[tokio::test]
async fn test_vector_search_top3() {
let (ctx, _tmp) = create_vector_search_context().await;
Expand Down Expand Up @@ -1116,4 +1240,92 @@ mod vector_search_tests {
"vector_search without a matching Lumina index should not fall back to a full table scan"
);
}

// End-to-end check: write vectors, build a Lumina index through SQL, inspect the index
// metadata table, then query through `vector_search`.
//
// Manual run with a local Lumina native library:
//   LUMINA_LIB_PATH=/path/to/liblumina_py.so cargo test -p paimon-datafusion \
//     vector_search_tests::test_lumina_build_then_vector_search_query \
//     -- --ignored --exact
#[tokio::test]
#[ignore = "requires LUMINA_LIB_PATH"]
async fn test_lumina_build_then_vector_search_query() {
    let (ctx, catalog, _tmp) = create_empty_vector_search_context().await;

    // Create the table and load it back through the catalog.
    let table_id = Identifier::new("default", "lumina_build_query_e2e");
    catalog
        .create_table(&table_id, build_lumina_table_schema(), false)
        .await
        .expect("Failed to create table");
    let table = catalog
        .get_table(&table_id)
        .await
        .expect("Failed to load table");

    // Write six two-dimensional vectors in a single commit.
    let write_builder = table
        .new_write_builder()
        .with_commit_user("test-user")
        .expect("Failed to configure write builder");
    let mut writer = write_builder
        .new_write()
        .expect("Failed to create table write");
    let vector_batch = build_vector_batch(
        vec![0, 1, 2, 3, 4, 5],
        vec![
            vec![1.0, 0.0],
            vec![0.9, 0.1],
            vec![0.0, 1.0],
            vec![-1.0, 0.0],
            vec![0.0, -1.0],
            vec![0.7, 0.3],
        ],
    );
    writer
        .write_arrow_batch(&vector_batch)
        .await
        .expect("Failed to write vector batch");
    let commit_messages = writer
        .prepare_commit()
        .await
        .expect("Failed to prepare commit");
    write_builder
        .new_commit()
        .commit(commit_messages)
        .await
        .expect("Failed to commit vector data");

    // Build the index through the SQL procedure.
    let build_index_sql = "CALL sys.create_lumina_index(table => 'default.lumina_build_query_e2e', index_column => 'embedding')";
    ctx.sql(build_index_sql)
        .await
        .expect("Lumina index build SQL should parse")
        .collect()
        .await
        .expect("Lumina index build SQL should execute");

    // The metadata table should list two Lumina entries covering rows 0-2 and 3-5.
    let index_metadata_sql = "SELECT index_type, row_count, row_range_start, row_range_end, index_field_name FROM paimon.default.`lumina_build_query_e2e$table_indexes` WHERE index_type = 'lumina'";
    let index_batches = ctx
        .sql(index_metadata_sql)
        .await
        .expect("index metadata SQL should parse")
        .collect()
        .await
        .expect("index metadata query should execute");
    let expected_index_rows: Vec<(String, i64, i64, i64, String)> = vec![
        ("lumina".to_string(), 3, 0, 2, "embedding".to_string()),
        ("lumina".to_string(), 3, 3, 5, "embedding".to_string()),
    ];
    assert_eq!(extract_index_rows(&index_batches), expected_index_rows);

    // Search for the two nearest neighbours of [1.0, 0.0].
    let search_sql = "SELECT id FROM vector_search('paimon.default.lumina_build_query_e2e', 'embedding', '[1.0, 0.0]', 2)";
    let search_batches = ctx
        .sql(search_sql)
        .await
        .expect("vector_search SQL should parse")
        .collect()
        .await
        .expect("vector_search query should execute");
    let ids = extract_ids(&search_batches);

    assert_eq!(ids.len(), 2);
    assert!(ids.contains(&0), "exact vector match should be returned");
    let has_same_direction_neighbor = ids.iter().any(|&id| id == 1 || id == 5);
    assert!(
        has_same_direction_neighbor,
        "one same-direction neighbor should be returned, got {ids:?}"
    );
}
}
18 changes: 18 additions & 0 deletions docs/src/sql.md
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,24 @@ Rollback a table to a specific timestamp:
CALL sys.rollback_to_timestamp(table => 'paimon.my_db.my_table', timestamp => 1234567890000);
```

### create_lumina_index

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: the procedure also accepts the optional index_type argument, while the docs only mention options. It may be worth documenting the default and valid values here so the SQL surface matches the implemented arguments.


Build and commit a Lumina global vector index for a table column:

```sql
CALL sys.create_lumina_index(table => 'paimon.my_db.my_table', index_column => 'embedding');
```

The procedure also accepts an optional `index_type` argument.

Optional Lumina builder settings can be supplied through the `options` argument as comma-separated `key=value` pairs:

```sql
CALL sys.create_lumina_index(
table => 'paimon.my_db.my_table',
index_column => 'embedding',
options => 'lumina.index.dimension=128,lumina.encoding.type=pq'
);
```

## Queries

### Basic Queries
Expand Down
Loading