From d295f7fe96d5dee7c53700ee02a7738a39005e54 Mon Sep 17 00:00:00 2001 From: liujiwen-up Date: Tue, 16 Jun 2026 23:53:30 +0800 Subject: [PATCH] feat(datafusion): add Lumina index build procedure --- .github/workflows/ci.yml | 10 + .../integrations/datafusion/src/procedures.rs | 41 ++++ .../datafusion/tests/procedures.rs | 38 +++- .../datafusion/tests/read_tables.rs | 214 +++++++++++++++++- docs/src/sql.md | 18 ++ 5 files changed, 319 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0e556d4f..46c0a90c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -130,6 +130,16 @@ jobs: RUST_LOG: DEBUG RUST_BACKTRACE: full + - name: DataFusion Lumina Build Query E2E Test + run: > + cargo test -p paimon-datafusion + --features vortex + vector_search_tests::test_lumina_build_then_vector_search_query + -- --ignored --exact + env: + RUST_LOG: DEBUG + RUST_BACKTRACE: full + - name: DataFusion Integration Test run: cargo test -p paimon-datafusion --all-targets env: diff --git a/crates/integrations/datafusion/src/procedures.rs b/crates/integrations/datafusion/src/procedures.rs index 183788ab..91904f58 100644 --- a/crates/integrations/datafusion/src/procedures.rs +++ b/crates/integrations/datafusion/src/procedures.rs @@ -23,6 +23,7 @@ //! - `CALL sys.rollback_to(table => '...', snapshot_id => ... | tag => '...')` //! - `CALL sys.rollback_to_timestamp(table => '...', timestamp => ...)` //! - `CALL sys.create_tag_from_timestamp(table => '...', tag => '...', timestamp => ...)` +//! - `CALL sys.create_lumina_index(table => '...', index_column => '...')` use std::collections::HashMap; use std::sync::Arc; @@ -147,6 +148,7 @@ pub async fn execute_call( "create_tag_from_timestamp" => { proc_create_tag_from_timestamp(ctx, catalog, catalog_name, &args).await } + "create_lumina_index" => proc_create_lumina_index(ctx, catalog, catalog_name, &args).await, _ => Err(DataFusionError::Plan(format!( "Unknown procedure: {proc_name}" ))), @@ -505,6 +507,45 @@ async fn proc_create_tag_from_timestamp( ok_result(ctx) } +async fn proc_create_lumina_index( + ctx: &SessionContext, + catalog: &Arc, + catalog_name: &str, + args: &HashMap, +) -> DFResult { + let table = get_table(catalog, catalog_name, args).await?; + let index_column = require_arg(args, "index_column")?; + let mut builder = table.new_lumina_index_build_builder(); + builder.with_index_column(index_column); + if let Some(index_type) = args.get("index_type") { + builder.with_index_type(index_type); + } + if let Some(options) = args.get("options") { + builder.with_options(parse_key_value_options(options)?); + } + builder.execute().await.map_err(to_datafusion_error)?; + ok_result(ctx) +} + +fn parse_key_value_options(options: &str) -> DFResult> { + let mut parsed = HashMap::new(); + for entry in options.split(',').map(str::trim).filter(|s| !s.is_empty()) { + let (key, value) = entry.split_once('=').ok_or_else(|| { + DataFusionError::Plan(format!( + "Invalid options entry '{entry}'. Expected comma-separated key=value pairs" + )) + })?; + let key = key.trim(); + if key.is_empty() { + return Err(DataFusionError::Plan( + "Invalid options entry with empty key".to_string(), + )); + } + parsed.insert(key.to_string(), value.trim().to_string()); + } + Ok(parsed) +} + fn ok_result(ctx: &SessionContext) -> DFResult { let schema = Arc::new(Schema::new(vec![Field::new( "result", diff --git a/crates/integrations/datafusion/tests/procedures.rs b/crates/integrations/datafusion/tests/procedures.rs index c1c670c3..82aa1982 100644 --- a/crates/integrations/datafusion/tests/procedures.rs +++ b/crates/integrations/datafusion/tests/procedures.rs @@ -17,7 +17,7 @@ mod common; -use common::{exec, row_count, setup_sql_context}; +use common::{assert_sql_error, exec, row_count, setup_sql_context}; async fn setup_table_with_snapshots() -> (tempfile::TempDir, paimon_datafusion::SQLContext) { let (tmp, sql_context) = setup_sql_context().await; @@ -85,6 +85,42 @@ async fn test_create_tag_with_snapshot_id() { assert_eq!(count, 1); } +#[tokio::test] +async fn test_create_lumina_index_requires_index_column() { + let (_tmp, sql_context) = setup_table_with_snapshots().await; + + assert_sql_error( + &sql_context, + "CALL sys.create_lumina_index(table => 'test_db.t1')", + "Missing required argument: 'index_column'", + ) + .await; +} + +#[tokio::test] +async fn test_create_lumina_index_rejects_invalid_index_type() { + let (_tmp, sql_context) = setup_table_with_snapshots().await; + + assert_sql_error( + &sql_context, + "CALL sys.create_lumina_index(table => 'test_db.t1', index_column => 'name', index_type => 'btree')", + "Unsupported Lumina index type: btree", + ) + .await; +} + +#[tokio::test] +async fn test_create_lumina_index_rejects_invalid_options() { + let (_tmp, sql_context) = setup_table_with_snapshots().await; + + assert_sql_error( + &sql_context, + "CALL sys.create_lumina_index(table => 'test_db.t1', index_column => 'name', options => 'lumina.index.dimension')", + "Expected comma-separated key=value pairs", + ) + .await; +} + #[tokio::test] async fn test_create_tag_already_exists() { let (_tmp, sql_context) = setup_table_with_snapshots().await; diff --git a/crates/integrations/datafusion/tests/read_tables.rs b/crates/integrations/datafusion/tests/read_tables.rs index 396e9514..7a3d4f25 100644 --- a/crates/integrations/datafusion/tests/read_tables.rs +++ b/crates/integrations/datafusion/tests/read_tables.rs @@ -1017,7 +1017,13 @@ mod fulltext_tests { mod vector_search_tests { use std::sync::Arc; - use datafusion::arrow::array::Int32Array; + use datafusion::arrow::array::{ArrayRef, Float32Builder, Int32Array, ListBuilder}; + use datafusion::arrow::datatypes::{ + DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, + }; + use datafusion::arrow::record_batch::RecordBatch; + use paimon::catalog::Identifier; + use paimon::spec::{ArrayType, DataType, FloatType, IntType, Schema}; use paimon::{Catalog, CatalogOptions, FileSystemCatalog, Options}; use paimon_datafusion::{register_vector_search, SQLContext}; @@ -1053,6 +1059,69 @@ mod vector_search_tests { (ctx, tmp) } + async fn create_empty_vector_search_context( + ) -> (SQLContext, Arc, tempfile::TempDir) { + let tmp = tempfile::tempdir().expect("Failed to create temp dir"); + let warehouse = format!("file://{}", tmp.path().display()); + let mut options = Options::new(); + options.set(CatalogOptions::WAREHOUSE, warehouse); + let catalog = Arc::new(FileSystemCatalog::new(options).expect("Failed to create catalog")); + + let mut ctx = SQLContext::new(); + ctx.register_catalog("paimon", catalog.clone()) + .await + .expect("Failed to register catalog"); + (ctx, catalog, tmp) + } + + fn build_lumina_table_schema() -> Schema { + let mut options = std::collections::HashMap::new(); + options.insert("row-tracking.enabled".to_string(), "true".to_string()); + options.insert("data-evolution.enabled".to_string(), "true".to_string()); + options.insert("global-index.enabled".to_string(), "true".to_string()); + options.insert( + "global-index.row-count-per-shard".to_string(), + "3".to_string(), + ); + options.insert("lumina.index.dimension".to_string(), "2".to_string()); + options.insert("lumina.encoding.type".to_string(), "rawf32".to_string()); + + Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column( + "embedding", + DataType::Array(ArrayType::new(DataType::Float(FloatType::new()))), + ) + .options(options) + .build() + .expect("Failed to build table schema") + } + + fn build_vector_batch(ids: Vec, vectors: Vec>) -> RecordBatch { + let element_field = Arc::new(ArrowField::new("element", ArrowDataType::Float32, true)); + let mut vector_builder = + ListBuilder::new(Float32Builder::new()).with_field(element_field.clone()); + for vector in vectors { + for value in vector { + vector_builder.values().append_value(value); + } + vector_builder.append(true); + } + + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("id", ArrowDataType::Int32, false), + ArrowField::new("embedding", ArrowDataType::List(element_field), true), + ])); + RecordBatch::try_new( + schema, + vec![ + Arc::new(Int32Array::from(ids)) as ArrayRef, + Arc::new(vector_builder.finish()) as ArrayRef, + ], + ) + .expect("Failed to build vector batch") + } + fn extract_ids(batches: &[datafusion::arrow::record_batch::RecordBatch]) -> Vec { let mut ids = Vec::new(); for batch in batches { @@ -1068,6 +1137,61 @@ mod vector_search_tests { ids } + fn extract_index_rows( + batches: &[datafusion::arrow::record_batch::RecordBatch], + ) -> Vec<(String, i64, i64, i64, String)> { + let mut rows = Vec::new(); + for batch in batches { + let index_type_array = batch + .column_by_name("index_type") + .and_then(|c| { + c.as_any() + .downcast_ref::() + }) + .expect("Expected StringArray for index_type"); + let row_count_array = batch + .column_by_name("row_count") + .and_then(|c| { + c.as_any() + .downcast_ref::() + }) + .expect("Expected Int64Array for row_count"); + let row_range_start_array = batch + .column_by_name("row_range_start") + .and_then(|c| { + c.as_any() + .downcast_ref::() + }) + .expect("Expected Int64Array for row_range_start"); + let row_range_end_array = batch + .column_by_name("row_range_end") + .and_then(|c| { + c.as_any() + .downcast_ref::() + }) + .expect("Expected Int64Array for row_range_end"); + let index_field_name_array = batch + .column_by_name("index_field_name") + .and_then(|c| { + c.as_any() + .downcast_ref::() + }) + .expect("Expected StringArray for index_field_name"); + + for row_index in 0..batch.num_rows() { + rows.push(( + index_type_array.value(row_index).to_string(), + row_count_array.value(row_index), + row_range_start_array.value(row_index), + row_range_end_array.value(row_index), + index_field_name_array.value(row_index).to_string(), + )); + } + } + rows.sort_by_key(|row| row.2); + rows + } + #[tokio::test] async fn test_vector_search_top3() { let (ctx, _tmp) = create_vector_search_context().await; @@ -1116,4 +1240,92 @@ mod vector_search_tests { "vector_search without a matching Lumina index should not fall back to a full table scan" ); } + + // Manual run with a local Lumina native library: + // LUMINA_LIB_PATH=/path/to/liblumina_py.so cargo test -p paimon-datafusion \ + // vector_search_tests::test_lumina_build_then_vector_search_query \ + // -- --ignored --exact + #[tokio::test] + #[ignore = "requires LUMINA_LIB_PATH"] + async fn test_lumina_build_then_vector_search_query() { + let (ctx, catalog, _tmp) = create_empty_vector_search_context().await; + let identifier = Identifier::new("default", "lumina_build_query_e2e"); + catalog + .create_table(&identifier, build_lumina_table_schema(), false) + .await + .expect("Failed to create table"); + let table = catalog + .get_table(&identifier) + .await + .expect("Failed to load table"); + + let write_builder = table + .new_write_builder() + .with_commit_user("test-user") + .expect("Failed to configure write builder"); + let mut table_write = write_builder + .new_write() + .expect("Failed to create table write"); + table_write + .write_arrow_batch(&build_vector_batch( + vec![0, 1, 2, 3, 4, 5], + vec![ + vec![1.0, 0.0], + vec![0.9, 0.1], + vec![0.0, 1.0], + vec![-1.0, 0.0], + vec![0.0, -1.0], + vec![0.7, 0.3], + ], + )) + .await + .expect("Failed to write vector batch"); + let messages = table_write + .prepare_commit() + .await + .expect("Failed to prepare commit"); + write_builder + .new_commit() + .commit(messages) + .await + .expect("Failed to commit vector data"); + + ctx.sql("CALL sys.create_lumina_index(table => 'default.lumina_build_query_e2e', index_column => 'embedding')") + .await + .expect("Lumina index build SQL should parse") + .collect() + .await + .expect("Lumina index build SQL should execute"); + + let index_batches = ctx + .sql("SELECT index_type, row_count, row_range_start, row_range_end, index_field_name FROM paimon.default.`lumina_build_query_e2e$table_indexes` WHERE index_type = 'lumina'") + .await + .expect("index metadata SQL should parse") + .collect() + .await + .expect("index metadata query should execute"); + let index_rows = extract_index_rows(&index_batches); + assert_eq!( + index_rows, + vec![ + ("lumina".to_string(), 3, 0, 2, "embedding".to_string()), + ("lumina".to_string(), 3, 3, 5, "embedding".to_string()), + ] + ); + + let search_batches = ctx + .sql("SELECT id FROM vector_search('paimon.default.lumina_build_query_e2e', 'embedding', '[1.0, 0.0]', 2)") + .await + .expect("vector_search SQL should parse") + .collect() + .await + .expect("vector_search query should execute"); + let ids = extract_ids(&search_batches); + assert_eq!(ids.len(), 2); + assert!(ids.contains(&0), "exact vector match should be returned"); + assert!( + ids.iter().any(|id| matches!(id, 1 | 5)), + "one same-direction neighbor should be returned, got {ids:?}" + ); + } } diff --git a/docs/src/sql.md b/docs/src/sql.md index 6705578f..a4283f6d 100644 --- a/docs/src/sql.md +++ b/docs/src/sql.md @@ -419,6 +419,24 @@ Rollback a table to a specific timestamp: CALL sys.rollback_to_timestamp(table => 'paimon.my_db.my_table', timestamp => 1234567890000); ``` +### create_lumina_index + +Build and commit a Lumina global vector index for a table column: + +```sql +CALL sys.create_lumina_index(table => 'paimon.my_db.my_table', index_column => 'embedding'); +``` + +Optional Lumina builder settings can be supplied as comma-separated `key=value` pairs: + +```sql +CALL sys.create_lumina_index( + table => 'paimon.my_db.my_table', + index_column => 'embedding', + options => 'lumina.index.dimension=128,lumina.encoding.type=pq' +); +``` + ## Queries ### Basic Queries