diff --git a/crates/fluss/tests/integration/admin.rs b/crates/fluss/tests/integration/admin.rs
index ccb71722..fbdb295d 100644
--- a/crates/fluss/tests/integration/admin.rs
+++ b/crates/fluss/tests/integration/admin.rs
@@ -33,55 +33,25 @@ static SHARED_FLUSS_CLUSTER: LazyLock<Arc<RwLock<Option<FlussTestingCluster>>>>
 #[after_all]
 mod admin_test {
     use super::SHARED_FLUSS_CLUSTER;
-    use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder};
+    use crate::integration::fluss_cluster::FlussTestingCluster;
+    use crate::integration::utils::{get_cluster, start_cluster, stop_cluster};
     use fluss::error::FlussError;
     use fluss::metadata::{
         DataTypes, DatabaseDescriptorBuilder, KvFormat, LogFormat, Schema, TableDescriptor,
         TablePath,
     };
     use std::sync::Arc;
-    use std::thread;
 
     fn before_all() {
-        // Create a new tokio runtime in a separate thread
-        let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
-        thread::spawn(move || {
-            let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime");
-            rt.block_on(async {
-                let cluster = FlussTestingClusterBuilder::new("test-admin").build().await;
-                let mut guard = cluster_guard.write();
-                *guard = Some(cluster);
-            });
-        })
-        .join()
-        .expect("Failed to create cluster");
-        // wait for 20 seconds to avoid the error like
-        // CoordinatorEventProcessor is not initialized yet
-        thread::sleep(std::time::Duration::from_secs(20));
+        start_cluster("test-admin", SHARED_FLUSS_CLUSTER.clone());
     }
 
     fn get_fluss_cluster() -> Arc<FlussTestingCluster> {
-        let cluster_guard = SHARED_FLUSS_CLUSTER.read();
-        if cluster_guard.is_none() {
-            panic!("Fluss cluster not initialized. Make sure before_all() was called.");
-        }
-        Arc::new(cluster_guard.as_ref().unwrap().clone())
+        get_cluster(&SHARED_FLUSS_CLUSTER)
     }
 
     fn after_all() {
-        // Create a new tokio runtime in a separate thread
-        let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
-        std::thread::spawn(move || {
-            let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime");
-            rt.block_on(async {
-                let mut guard = cluster_guard.write();
-                if let Some(cluster) = guard.take() {
-                    cluster.stop().await;
-                }
-            });
-        })
-        .join()
-        .expect("Failed to cleanup cluster");
+        stop_cluster(SHARED_FLUSS_CLUSTER.clone());
     }
 
     #[tokio::test]
diff --git a/crates/fluss/tests/integration/kv_table.rs b/crates/fluss/tests/integration/kv_table.rs
new file mode 100644
index 00000000..efd79575
--- /dev/null
+++ b/crates/fluss/tests/integration/kv_table.rs
@@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use parking_lot::RwLock;
+use std::sync::Arc;
+use std::sync::LazyLock;
+
+use crate::integration::fluss_cluster::FlussTestingCluster;
+#[cfg(test)]
+use test_env_helpers::*;
+
+// Module-level shared cluster instance (only for this test file)
+static SHARED_FLUSS_CLUSTER: LazyLock<Arc<RwLock<Option<FlussTestingCluster>>>> =
+    LazyLock::new(|| Arc::new(RwLock::new(None)));
+
+#[cfg(test)]
+#[before_all]
+#[after_all]
+mod kv_table_test {
+    use super::SHARED_FLUSS_CLUSTER;
+    use crate::integration::fluss_cluster::FlussTestingCluster;
+    use crate::integration::utils::{create_table, get_cluster, start_cluster, stop_cluster};
+    use fluss::client::UpsertWriter;
+    use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
+    use fluss::row::{GenericRow, InternalRow};
+    use std::sync::Arc;
+
+    fn before_all() {
+        start_cluster("test_kv_table", SHARED_FLUSS_CLUSTER.clone());
+    }
+
+    fn get_fluss_cluster() -> Arc<FlussTestingCluster> {
+        get_cluster(&SHARED_FLUSS_CLUSTER)
+    }
+
+    fn after_all() {
+        stop_cluster(SHARED_FLUSS_CLUSTER.clone());
+    }
+
+    fn make_key(id: i32) -> GenericRow<'static> {
+        let mut row = GenericRow::new();
+        row.set_field(0, id);
+        row.set_field(1, "");
+        row.set_field(2, 0i64);
+        row
+    }
+
+    #[tokio::test]
+    async fn upsert_delete_and_lookup() {
+        let cluster = get_fluss_cluster();
+        let connection = cluster.get_fluss_connection().await;
+
+        let admin = connection.get_admin().await.expect("Failed to get admin");
+
+        let table_path = TablePath::new("fluss".to_string(), "test_upsert_and_lookup".to_string());
+
+        let table_descriptor = TableDescriptor::builder()
+            .schema(
+                Schema::builder()
+                    .column("id", DataTypes::int())
+                    .column("name", DataTypes::string())
+                    .column("age", DataTypes::bigint())
+                    .primary_key(vec!["id".to_string()])
+                    .build()
+                    .expect("Failed to build schema"),
+            )
+            .build()
+            .expect("Failed to build table");
+
+        create_table(&admin, &table_path, &table_descriptor).await;
+
+        let table = connection
+            .get_table(&table_path)
+            .await
+            .expect("Failed to get table");
+
+        let table_upsert = table.new_upsert().expect("Failed to create upsert");
+        let mut upsert_writer = table_upsert
+            .create_writer()
+            .expect("Failed to create writer");
+
+        let test_data = [(1, "Verso", 32i64), (2, "Noco", 25), (3, "Esquie", 35)];
+
+        // Upsert rows
+        for (id, name, age) in &test_data {
+            let mut row = GenericRow::new();
+            row.set_field(0, *id);
+            row.set_field(1, *name);
+            row.set_field(2, *age);
+            upsert_writer
+                .upsert(&row)
+                .await
+                .expect("Failed to upsert row");
+        }
+
+        // Lookup records
+        let mut lookuper = table
+            .new_lookup()
+            .expect("Failed to create lookup")
+            .create_lookuper()
+            .expect("Failed to create lookuper");
+
+        // Verify lookup results
+        for (id, expected_name, expected_age) in &test_data {
+            let result = lookuper
+                .lookup(&make_key(*id))
+                .await
+                .expect("Failed to lookup");
+            let row = result
+                .get_single_row()
+                .expect("Failed to get row")
+                .expect("Row should exist");
+
+            assert_eq!(row.get_int(0), *id, "id mismatch");
+            assert_eq!(row.get_string(1), *expected_name, "name mismatch");
+            assert_eq!(row.get_long(2), *expected_age, "age mismatch");
+        }
+
+        // Update the record with new age
+        let mut updated_row = GenericRow::new();
+        updated_row.set_field(0, 1);
+        updated_row.set_field(1, "Verso");
+        updated_row.set_field(2, 33i64);
+        upsert_writer
+            .upsert(&updated_row)
+            .await
+            .expect("Failed to upsert updated row");
+
+        // Verify the update
+        let result = lookuper
+            .lookup(&make_key(1))
+            .await
+            .expect("Failed to lookup after update");
+        let found_row = result
+            .get_single_row()
+            .expect("Failed to get row")
+            .expect("Row should exist");
+        assert_eq!(
+            found_row.get_long(2),
+            updated_row.get_long(2),
+            "Age should be updated"
+        );
+        assert_eq!(
+            found_row.get_string(1),
+            updated_row.get_string(1),
+            "Name should remain unchanged"
+        );
+
+        // Delete record with id=1
+        let mut delete_row = GenericRow::new();
+        delete_row.set_field(0, 1);
+        delete_row.set_field(1, "");
+        delete_row.set_field(2, 0i64);
+        upsert_writer
+            .delete(&delete_row)
+            .await
+            .expect("Failed to delete");
+
+        // Verify deletion
+        let result = lookuper
+            .lookup(&make_key(1))
+            .await
+            .expect("Failed to lookup deleted record");
+        assert!(
+            result
+                .get_single_row()
+                .expect("Failed to get row")
+                .is_none(),
+            "Record 1 should not exist after delete"
+        );
+
+        // Verify other records still exist
+        for i in [2, 3] {
+            let result = lookuper
+                .lookup(&make_key(i))
+                .await
+                .expect("Failed to lookup");
+            assert!(
+                result
+                    .get_single_row()
+                    .expect("Failed to get row")
+                    .is_some(),
+                "Record {} should still exist after deleting record 1",
+                i
+            );
+        }
+
+        // Lookup non-existent key
+        let result = lookuper
+            .lookup(&make_key(999))
+            .await
+            .expect("Failed to lookup non-existent key");
+        assert!(
+            result
+                .get_single_row()
+                .expect("Failed to get row")
+                .is_none(),
+            "Non-existent key should return None"
+        );
+
+        admin
+            .drop_table(&table_path, false)
+            .await
+            .expect("Failed to drop table");
+    }
+
+    #[tokio::test]
+    async fn composite_primary_keys() {
+        let cluster = get_fluss_cluster();
+        let connection = cluster.get_fluss_connection().await;
+
+        let admin = connection.get_admin().await.expect("Failed to get admin");
+
+        let table_path = TablePath::new("fluss".to_string(), "test_composite_pk".to_string());
+
+        let table_descriptor = TableDescriptor::builder()
+            .schema(
+                Schema::builder()
+                    .column("region", DataTypes::string())
+                    .column("user_id", DataTypes::int())
+                    .column("score", DataTypes::bigint())
+                    .primary_key(vec!["region".to_string(), "user_id".to_string()])
+                    .build()
+                    .expect("Failed to build schema"),
+            )
+            .build()
+            .expect("Failed to build table");
+
+        create_table(&admin, &table_path, &table_descriptor).await;
+
+        let table = connection
+            .get_table(&table_path)
+            .await
+            .expect("Failed to get table");
+
+        let table_upsert = table.new_upsert().expect("Failed to create upsert");
+        let mut upsert_writer = table_upsert
+            .create_writer()
+            .expect("Failed to create writer");
+
+        // Insert records with composite keys
+        let test_data = [
+            ("US", 1, 100i64),
+            ("US", 2, 200i64),
+            ("EU", 1, 150i64),
+            ("EU", 2, 250i64),
+        ];
+
+        for (region, user_id, score) in &test_data {
+            let mut row = GenericRow::new();
+            row.set_field(0, *region);
+            row.set_field(1, *user_id);
+            row.set_field(2, *score);
+            upsert_writer.upsert(&row).await.expect("Failed to upsert");
+        }
+
+        // Lookup with composite key
+        let mut lookuper = table
+            .new_lookup()
+            .expect("Failed to create lookup")
+            .create_lookuper()
+            .expect("Failed to create lookuper");
+
+        // Lookup (US, 1) - should return score 100
+        let mut key = GenericRow::new();
+        key.set_field(0, "US");
+        key.set_field(1, 1);
+        let result = lookuper.lookup(&key).await.expect("Failed to lookup");
+        let row = result
+            .get_single_row()
+            .expect("Failed to get row")
+            .expect("Row should exist");
+        assert_eq!(row.get_long(2), 100, "Score for (US, 1) should be 100");
+
+        // Lookup (EU, 2) - should return score 250
+        let mut key = GenericRow::new();
+        key.set_field(0, "EU");
+        key.set_field(1, 2);
+        let result = lookuper.lookup(&key).await.expect("Failed to lookup");
+        let row = result
+            .get_single_row()
+            .expect("Failed to get row")
+            .expect("Row should exist");
+        assert_eq!(row.get_long(2), 250, "Score for (EU, 2) should be 250");
+
+        // Update (US, 1) score
+        let mut update_row = GenericRow::new();
+        update_row.set_field(0, "US");
+        update_row.set_field(1, 1);
+        update_row.set_field(2, 500i64);
+        upsert_writer
+            .upsert(&update_row)
+            .await
+            .expect("Failed to update");
+
+        // Verify update
+        let mut key = GenericRow::new();
+        key.set_field(0, "US");
+        key.set_field(1, 1);
+        let result = lookuper.lookup(&key).await.expect("Failed to lookup");
+        let row = result
+            .get_single_row()
+            .expect("Failed to get row")
+            .expect("Row should exist");
+        assert_eq!(
+            row.get_long(2),
+            update_row.get_long(2),
+            "Row score should be updated"
+        );
+
+        admin
+            .drop_table(&table_path, false)
+            .await
+            .expect("Failed to drop table");
+    }
+
+    #[tokio::test]
+    async fn partial_update() {
+        use fluss::row::Datum;
+
+        let cluster = get_fluss_cluster();
+        let connection = cluster.get_fluss_connection().await;
+
+        let admin = connection.get_admin().await.expect("Failed to get admin");
+
+        let table_path = TablePath::new("fluss".to_string(), "test_partial_update".to_string());
+
+        let table_descriptor = TableDescriptor::builder()
+            .schema(
+                Schema::builder()
+                    .column("id", DataTypes::int())
+                    .column("name", DataTypes::string())
+                    .column("age", DataTypes::bigint())
+                    .column("score", DataTypes::bigint())
+                    .primary_key(vec!["id".to_string()])
+                    .build()
+                    .expect("Failed to build schema"),
+            )
+            .build()
+            .expect("Failed to build table");
+
+        create_table(&admin, &table_path, &table_descriptor).await;
+
+        let table = connection
+            .get_table(&table_path)
+            .await
+            .expect("Failed to get table");
+
+        // Insert initial record with all columns
+        let table_upsert = table.new_upsert().expect("Failed to create upsert");
+        let mut upsert_writer = table_upsert
+            .create_writer()
+            .expect("Failed to create writer");
+
+        let mut row = GenericRow::new();
+        row.set_field(0, 1);
+        row.set_field(1, "Verso");
+        row.set_field(2, 32i64);
+        row.set_field(3, 6942i64);
+        upsert_writer
+            .upsert(&row)
+            .await
+            .expect("Failed to upsert initial row");
+
+        // Verify initial record
+        let mut lookuper = table
+            .new_lookup()
+            .expect("Failed to create lookup")
+            .create_lookuper()
+            .expect("Failed to create lookuper");
+
+        let result = lookuper
+            .lookup(&make_key(1))
+            .await
+            .expect("Failed to lookup");
+        let found_row = result
+            .get_single_row()
+            .expect("Failed to get row")
+            .expect("Row should exist");
+
+        assert_eq!(found_row.get_int(0), 1);
+        assert_eq!(found_row.get_string(1), "Verso");
+        assert_eq!(found_row.get_long(2), 32i64);
+        assert_eq!(found_row.get_long(3), 6942i64);
+
+        // Create partial update writer to update only score column
+        let partial_upsert = table_upsert
+            .partial_update_with_column_names(&["id", "score"])
+            .expect("Failed to create TableUpsert with partial update");
+        let mut partial_writer = partial_upsert
+            .create_writer()
+            .expect("Failed to create UpsertWriter with partial write");
+
+        // Update only the score column
+        let mut partial_row = GenericRow::new();
+        partial_row.set_field(0, 1);
+        partial_row.set_field(1, Datum::Null); // not in partial update column
+        partial_row.set_field(2, Datum::Null); // not in partial update column
+        partial_row.set_field(3, 420i64);
+        partial_writer
+            .upsert(&partial_row)
+            .await
+            .expect("Failed to upsert");
+
+        // Verify partial update - name and age should remain unchanged
+        let result = lookuper
+            .lookup(&make_key(1))
+            .await
+            .expect("Failed to lookup after partial update");
+        let found_row = result
+            .get_single_row()
+            .expect("Failed to get row")
+            .expect("Row should exist");
+
+        assert_eq!(found_row.get_int(0), 1, "id should remain 1");
+        assert_eq!(
+            found_row.get_string(1),
+            "Verso",
+            "name should remain unchanged"
+        );
+        assert_eq!(found_row.get_long(2), 32, "age should remain unchanged");
+        assert_eq!(found_row.get_long(3), 420, "score should be updated to 420");
+
+        admin
+            .drop_table(&table_path, false)
+            .await
+            .expect("Failed to drop table");
+    }
+}
diff --git a/crates/fluss/tests/integration/table.rs b/crates/fluss/tests/integration/table.rs
index 4cba4699..ef73b568 100644
--- a/crates/fluss/tests/integration/table.rs
+++ b/crates/fluss/tests/integration/table.rs
@@ -33,8 +33,8 @@ static SHARED_FLUSS_CLUSTER: LazyLock<Arc<RwLock<Option<FlussTestingCluster>>>>
 #[after_all]
 mod table_test {
     use super::SHARED_FLUSS_CLUSTER;
-    use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder};
-    use crate::integration::utils::create_table;
+    use crate::integration::fluss_cluster::FlussTestingCluster;
+    use crate::integration::utils::{create_table, get_cluster, start_cluster, stop_cluster};
     use arrow::array::record_batch;
     use fluss::client::{FlussTable, TableScan};
     use fluss::metadata::{DataTypes, Schema, TableBucket, TableDescriptor, TablePath};
@@ -44,50 +44,18 @@ mod table_test {
     use jiff::Timestamp;
     use std::collections::HashMap;
     use std::sync::Arc;
-    use std::thread;
     use std::time::Duration;
 
     fn before_all() {
-        // Create a new tokio runtime in a separate thread
-        let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
-        thread::spawn(move || {
-            let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime");
-            rt.block_on(async {
-                let cluster = FlussTestingClusterBuilder::new("test_table").build().await;
-                let mut guard = cluster_guard.write();
-                *guard = Some(cluster);
-            });
-        })
-        .join()
-        .expect("Failed to create cluster");
-
-        // wait for 20 seconds to avoid the error like
-        // CoordinatorEventProcessor is not initialized yet
-        thread::sleep(std::time::Duration::from_secs(20));
+        start_cluster("test_table", SHARED_FLUSS_CLUSTER.clone());
    }
 
     fn get_fluss_cluster() -> Arc<FlussTestingCluster> {
-        let cluster_guard = SHARED_FLUSS_CLUSTER.read();
-        if cluster_guard.is_none() {
-            panic!("Fluss cluster not initialized. Make sure before_all() was called.");
-        }
-        Arc::new(cluster_guard.as_ref().unwrap().clone())
+        get_cluster(&SHARED_FLUSS_CLUSTER)
     }
 
     fn after_all() {
-        // Create a new tokio runtime in a separate thread
-        let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
-        thread::spawn(move || {
-            let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime");
-            rt.block_on(async {
-                let mut guard = cluster_guard.write();
-                if let Some(cluster) = guard.take() {
-                    cluster.stop().await;
-                }
-            });
-        })
-        .join()
-        .expect("Failed to cleanup cluster");
+        stop_cluster(SHARED_FLUSS_CLUSTER.clone());
     }
 
     #[tokio::test]
@@ -527,7 +495,7 @@ mod table_test {
         use arrow::array::Int32Array;
 
         let batches = scanner.poll(Duration::from_secs(10)).await.unwrap();
-        let mut all_ids: Vec<i32> = batches
+        let all_ids: Vec<i32> = batches
             .iter()
             .flat_map(|b| {
                 (0..b.num_rows()).map(|i| {
diff --git a/crates/fluss/tests/integration/table_remote_scan.rs b/crates/fluss/tests/integration/table_remote_scan.rs
index 43c89b54..e28a8362 100644
--- a/crates/fluss/tests/integration/table_remote_scan.rs
+++ b/crates/fluss/tests/integration/table_remote_scan.rs
@@ -33,19 +33,20 @@ static SHARED_FLUSS_CLUSTER: LazyLock<Arc<RwLock<Option<FlussTestingCluster>>>>
 mod table_remote_scan_test {
     use super::SHARED_FLUSS_CLUSTER;
     use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder};
-    use crate::integration::utils::create_table;
+    use crate::integration::utils::{
+        create_table, get_cluster, stop_cluster, wait_for_cluster_ready,
+    };
     use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
     use fluss::row::{GenericRow, InternalRow};
     use std::collections::HashMap;
     use std::sync::Arc;
     use std::thread;
-    use std::thread::sleep;
     use std::time::Duration;
     use uuid::Uuid;
 
     fn before_all() {
         // Create a new tokio runtime in a separate thread
-        let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
+        let cluster_lock = SHARED_FLUSS_CLUSTER.clone();
         thread::spawn(move || {
             let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime");
             rt.block_on(async {
@@ -94,32 +95,17 @@ mod table_remote_scan_test {
                     .with_remote_data_dir(temp_dir)
                     .build()
                     .await;
-                let mut guard = cluster_guard.write();
+                wait_for_cluster_ready(&cluster).await;
+                let mut guard = cluster_lock.write();
                 *guard = Some(cluster);
             });
         })
         .join()
         .expect("Failed to create cluster");
-
-        // wait for 20 seconds to avoid the error like
-        // CoordinatorEventProcessor is not initialized yet
-        sleep(Duration::from_secs(20));
     }
 
     fn after_all() {
-        // Create a new tokio runtime in a separate thread
-        let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
-        thread::spawn(move || {
-            let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime");
-            rt.block_on(async {
-                let mut guard = cluster_guard.write();
-                if let Some(cluster) = guard.take() {
-                    cluster.stop().await;
-                }
-            });
-        })
-        .join()
-        .expect("Failed to cleanup cluster");
+        stop_cluster(SHARED_FLUSS_CLUSTER.clone());
     }
 
     #[tokio::test]
@@ -215,10 +201,6 @@ mod table_remote_scan_test {
     }
 
     fn get_fluss_cluster() -> Arc<FlussTestingCluster> {
-        let cluster_guard = SHARED_FLUSS_CLUSTER.read();
-        if cluster_guard.is_none() {
-            panic!("Fluss cluster not initialized. Make sure before_all() was called.");
-        }
-        Arc::new(cluster_guard.as_ref().unwrap().clone())
+        get_cluster(&SHARED_FLUSS_CLUSTER)
     }
 }
diff --git a/crates/fluss/tests/integration/utils.rs b/crates/fluss/tests/integration/utils.rs
index cd1f6ccb..4d0c349f 100644
--- a/crates/fluss/tests/integration/utils.rs
+++ b/crates/fluss/tests/integration/utils.rs
@@ -15,8 +15,43 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder};
 use fluss::client::FlussAdmin;
 use fluss::metadata::{TableDescriptor, TablePath};
+use parking_lot::RwLock;
+use std::sync::Arc;
+use std::time::Duration;
+
+/// Polls the cluster until CoordinatorEventProcessor is initialized and tablet server is available.
+/// Times out after 20 seconds.
+pub async fn wait_for_cluster_ready(cluster: &FlussTestingCluster) {
+    let timeout = Duration::from_secs(20);
+    let poll_interval = Duration::from_millis(500);
+    let start = std::time::Instant::now();
+
+    loop {
+        let connection = cluster.get_fluss_connection().await;
+        if connection.get_admin().await.is_ok()
+            && connection
+                .get_metadata()
+                .get_cluster()
+                .get_one_available_server()
+                .is_some()
+        {
+            return;
+        }
+
+        if start.elapsed() >= timeout {
+            panic!(
+                "Server readiness check timed out after {} seconds. \
+                 CoordinatorEventProcessor may not be initialized or TabletServer may not be available.",
+                timeout.as_secs()
+            );
+        }
+
+        tokio::time::sleep(poll_interval).await;
+    }
+}
 
 pub async fn create_table(
     admin: &FlussAdmin,
@@ -28,3 +63,42 @@ pub async fn create_table(
         .await
         .expect("Failed to create table");
 }
+
+pub fn start_cluster(name: &str, cluster_lock: Arc<RwLock<Option<FlussTestingCluster>>>) {
+    let name = name.to_string();
+    std::thread::spawn(move || {
+        let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime");
+        rt.block_on(async {
+            let cluster = FlussTestingClusterBuilder::new(&name).build().await;
+            wait_for_cluster_ready(&cluster).await;
+            let mut guard = cluster_lock.write();
+            *guard = Some(cluster);
+        });
+    })
+    .join()
+    .expect("Failed to create cluster");
+}
+
+pub fn stop_cluster(cluster_lock: Arc<RwLock<Option<FlussTestingCluster>>>) {
+    std::thread::spawn(move || {
+        let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime");
+        rt.block_on(async {
+            let mut guard = cluster_lock.write();
+            if let Some(cluster) = guard.take() {
+                cluster.stop().await;
+            }
+        });
+    })
+    .join()
+    .expect("Failed to cleanup cluster");
+}
+
+pub fn get_cluster(cluster_lock: &RwLock<Option<FlussTestingCluster>>) -> Arc<FlussTestingCluster> {
+    let guard = cluster_lock.read();
+    Arc::new(
+        guard
+            .as_ref()
+            .expect("Fluss cluster not initialized. Make sure before_all() was called.")
+            .clone(),
+    )
+}
diff --git a/crates/fluss/tests/test_fluss.rs b/crates/fluss/tests/test_fluss.rs
index 65111af2..f3987e62 100644
--- a/crates/fluss/tests/test_fluss.rs
+++ b/crates/fluss/tests/test_fluss.rs
@@ -22,6 +22,7 @@ extern crate fluss;
 
 mod integration {
     mod admin;
     mod fluss_cluster;
+    mod kv_table;
     mod table;
     mod utils;