From eb50c02f67a1a5658041ea6bc81352d1f791e792 Mon Sep 17 00:00:00 2001
From: Keith Lee
Date: Wed, 21 Jan 2026 13:43:11 +0000
Subject: [PATCH 01/11] KvTable integration tests

---
 crates/fluss/tests/integration/kv_table.rs | 516 +++++++++++++++++++++
 crates/fluss/tests/test_fluss.rs           |   1 +
 2 files changed, 517 insertions(+)
 create mode 100644 crates/fluss/tests/integration/kv_table.rs

diff --git a/crates/fluss/tests/integration/kv_table.rs b/crates/fluss/tests/integration/kv_table.rs
new file mode 100644
index 00000000..4b778090
--- /dev/null
+++ b/crates/fluss/tests/integration/kv_table.rs
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use parking_lot::RwLock;
+use std::sync::Arc;
+use std::sync::LazyLock;
+
+use crate::integration::fluss_cluster::FlussTestingCluster;
+#[cfg(test)]
+use test_env_helpers::*;
+
+// Module-level shared cluster instance (only for this test file)
+static SHARED_FLUSS_CLUSTER: LazyLock<Arc<RwLock<Option<FlussTestingCluster>>>> =
+    LazyLock::new(|| Arc::new(RwLock::new(None)));
+
+#[cfg(test)]
+#[before_all]
+#[after_all]
+mod kv_table_test {
+    use super::SHARED_FLUSS_CLUSTER;
+    use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder};
+    use crate::integration::utils::create_table;
+    use fluss::client::UpsertWriter;
+    use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath};
+    use fluss::row::{GenericRow, InternalRow};
+    use std::sync::Arc;
+    use std::thread;
+
+    fn before_all() {
+        // Create a new tokio runtime in a separate thread
+        let cluster_guard = SHARED_FLUSS_CLUSTER.clone();
+        thread::spawn(move || {
+            let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime");
+            rt.block_on(async {
+                let cluster = FlussTestingClusterBuilder::new("test_table").build().await;
+                let mut guard = cluster_guard.write();
+                *guard = Some(cluster);
+            });
+        })
+    .join()
+    .expect("Failed to create cluster");
+
+        // wait for 20 seconds to avoid the error like
+        // CoordinatorEventProcessor is not initialized yet
+        thread::sleep(std::time::Duration::from_secs(20));
+    }
+
+    fn get_fluss_cluster() -> Arc<FlussTestingCluster> {
+        let cluster_guard = SHARED_FLUSS_CLUSTER.read();
+        if cluster_guard.is_none() {
+            panic!("Fluss cluster not initialized. 
Make sure before_all() was called."); + } + Arc::new(cluster_guard.as_ref().unwrap().clone()) + } + + fn after_all() { + let cluster_guard = SHARED_FLUSS_CLUSTER.clone(); + std::thread::spawn(move || { + let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime"); + rt.block_on(async { + let mut guard = cluster_guard.write(); + if let Some(cluster) = guard.take() { + cluster.stop().await; + } + }); + }) + .join() + .expect("Failed to cleanup cluster"); + } + + fn make_key(id: i32) -> GenericRow<'static> { + let mut row = GenericRow::new(); + row.set_field(0, id); + row + } + + #[tokio::test] + async fn upsert_and_lookup() { + let cluster = get_fluss_cluster(); + let connection = cluster.get_fluss_connection().await; + + let admin = connection.get_admin().await.expect("Failed to get admin"); + + let table_path = TablePath::new( + "fluss".to_string(), + "test_upsert_and_lookup".to_string(), + ); + + let table_descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("id", DataTypes::int()) + .column("name", DataTypes::string()) + .column("age", DataTypes::bigint()) + .primary_key(vec!["id".to_string()]) + .build() + .expect("Failed to build schema"), + ) + .build() + .expect("Failed to build table"); + + create_table(&admin, &table_path, &table_descriptor).await; + + let table = connection + .get_table(&table_path) + .await + .expect("Failed to get table"); + + let table_upsert = table.new_upsert().expect("Failed to create upsert"); + let mut upsert_writer = table_upsert + .create_writer() + .expect("Failed to create writer"); + + let test_data = [(1, "Verso", 32i64), (2, "Noco", 25), (3, "Esquie", 35)]; + + // Upsert rows + for (id, name, age) in &test_data { + let mut row = GenericRow::new(); + row.set_field(0, *id); + row.set_field(1, *name); + row.set_field(2, *age); + upsert_writer + .upsert(&row) + .await + .expect("Failed to upsert row"); + } + + // Lookup records + let mut lookuper = table + .new_lookup() + .expect("Failed to create lookup") + .create_lookuper() + .expect("Failed to create lookuper"); + + // Verify lookup results + for (id, expected_name, expected_age) in &test_data { + let result = lookuper + .lookup(&make_key(*id)) + .await + .expect("Failed to lookup"); + let row = result + .get_single_row() + .expect("Failed to get row") + .expect("Row should exist"); + + assert_eq!(row.get_int(0), *id, "id mismatch"); + assert_eq!(row.get_string(1), *expected_name, "name mismatch"); + assert_eq!(row.get_long(2), *expected_age, "age mismatch"); + } + } + + #[tokio::test] + async fn update_existing_record() { + let cluster = get_fluss_cluster(); + let connection = cluster.get_fluss_connection().await; + + let admin = connection.get_admin().await.expect("Failed to get admin"); + + let table_path = + TablePath::new("fluss".to_string(), "test_update_existing_record".to_string()); + + let table_descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("id", DataTypes::int()) + .column("name", DataTypes::string()) + .column("score", DataTypes::bigint()) + .primary_key(vec!["id".to_string()]) + .build() + .expect("Failed to build schema"), + ) + .build() + .expect("Failed to build table"); + + create_table(&admin, &table_path, &table_descriptor).await; + + let table = connection + .get_table(&table_path) + .await + .expect("Failed to get table"); + + let table_upsert = table.new_upsert().expect("Failed to create upsert"); + let mut upsert_writer = table_upsert + .create_writer() + .expect("Failed to create writer"); + + // 
Insert initial record + let mut row = GenericRow::new(); + row.set_field(0, 1); + row.set_field(1, "Flash"); + row.set_field(2, 123456789012i64); + upsert_writer + .upsert(&row) + .await + .expect("Failed to upsert initial row"); + + // Verify initial record + let mut lookuper = table + .new_lookup() + .expect("Failed to create lookup") + .create_lookuper() + .expect("Failed to create lookuper"); + + let result = lookuper + .lookup(&make_key(1)) + .await + .expect("Failed to lookup"); + let found_row = result + .get_single_row() + .expect("Failed to get row") + .expect("Row should exist"); + assert_eq!(found_row.get_long(2), row.get_long(2), "Expected initial score to be 123456789012i64"); + + // Update the record with new score + let mut updated_row = GenericRow::new(); + updated_row.set_field(0, 1); + updated_row.set_field(1, "Flash"); + updated_row.set_field(2, 987654321098i64); + upsert_writer + .upsert(&updated_row) + .await + .expect("Failed to upsert updated row"); + + // Verify the update + let result = lookuper + .lookup(&make_key(1)) + .await + .expect("Failed to lookup after update"); + let found_row = result + .get_single_row() + .expect("Failed to get row") + .expect("Row should exist"); + assert_eq!( + found_row.get_long(2), + updated_row.get_long(2), + "Score should be updated" + ); + assert_eq!( + found_row.get_string(1), + updated_row.get_string(1), + "Name should remain unchanged" + ); + } + + #[tokio::test] + async fn delete_record() { + let cluster = get_fluss_cluster(); + let connection = cluster.get_fluss_connection().await; + + let admin = connection.get_admin().await.expect("Failed to get admin"); + + let table_path = TablePath::new("fluss".to_string(), "test_delete_record".to_string()); + + let table_descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("id", DataTypes::int()) + .column("data", DataTypes::string()) + .primary_key(vec!["id".to_string()]) + .build() + .expect("Failed to build schema"), + ) + .build() + .expect("Failed to build table"); + + create_table(&admin, &table_path, &table_descriptor).await; + + let table = connection + .get_table(&table_path) + .await + .expect("Failed to get table"); + + let table_upsert = table.new_upsert().expect("Failed to create upsert"); + let mut upsert_writer = table_upsert + .create_writer() + .expect("Failed to create writer"); + + // Insert records + for i in 1..=3 { + let mut row = GenericRow::new(); + row.set_field(0, i); + let data = format!("data{}", i); + row.set_field(1, data.as_str()); + upsert_writer.upsert(&row).await.expect("Failed to upsert"); + } + + // Verify records exist + let mut lookuper = table + .new_lookup() + .expect("Failed to create lookup") + .create_lookuper() + .expect("Failed to create lookuper"); + + for i in 1..=3 { + let result = lookuper + .lookup(&make_key(i)) + .await + .expect("Failed to lookup"); + assert!( + result.get_single_row().expect("Failed to get row").is_some(), + "Record {} should exist before delete", + i + ); + } + + // Delete record with id=2 + let mut delete_row = GenericRow::new(); + delete_row.set_field(0, 2); + delete_row.set_field(1, ""); + upsert_writer + .delete(&delete_row) + .await + .expect("Failed to delete"); + + // Verify deletion + let result = lookuper + .lookup(&make_key(2)) + .await + .expect("Failed to lookup deleted record"); + assert!( + result.get_single_row().expect("Failed to get row").is_none(), + "Record 2 should not exist after delete" + ); + + // Verify other records still exist + for i in [1, 3] { + let result = 
lookuper + .lookup(&make_key(i)) + .await + .expect("Failed to lookup"); + assert!( + result.get_single_row().expect("Failed to get row").is_some(), + "Record {} should still exist after deleting record 2", + i + ); + } + } + + #[tokio::test] + async fn test_lookup_non_existent_key() { + let cluster = get_fluss_cluster(); + let connection = cluster.get_fluss_connection().await; + + let admin = connection.get_admin().await.expect("Failed to get admin"); + + let table_path = + TablePath::new("fluss".to_string(), "test_lookup_non_existent".to_string()); + + let table_descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("id", DataTypes::int()) + .column("value", DataTypes::string()) + .primary_key(vec!["id".to_string()]) + .build() + .expect("Failed to build schema"), + ) + .build() + .expect("Failed to build table"); + + create_table(&admin, &table_path, &table_descriptor).await; + + let table = connection + .get_table(&table_path) + .await + .expect("Failed to get table"); + + // Insert one record + let table_upsert = table.new_upsert().expect("Failed to create upsert"); + let mut upsert_writer = table_upsert + .create_writer() + .expect("Failed to create writer"); + + let mut row = GenericRow::new(); + row.set_field(0, 1); + row.set_field(1, "exists"); + upsert_writer.upsert(&row).await.expect("Failed to upsert"); + + // Lookup non-existent key + let mut lookuper = table + .new_lookup() + .expect("Failed to create lookup") + .create_lookuper() + .expect("Failed to create lookuper"); + + let result = lookuper + .lookup(&make_key(999)) + .await + .expect("Failed to lookup non-existent key"); + assert!( + result.get_single_row().expect("Failed to get row").is_none(), + "Non-existent key should return None" + ); + } + + #[tokio::test] + async fn test_multiple_primary_key_columns() { + let cluster = get_fluss_cluster(); + let connection = cluster.get_fluss_connection().await; + + let admin = connection.get_admin().await.expect("Failed to get admin"); + + let table_path = + TablePath::new("fluss".to_string(), "test_composite_pk".to_string()); + + let table_descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("region", DataTypes::string()) + .column("user_id", DataTypes::int()) + .column("score", DataTypes::bigint()) + .primary_key(vec!["region".to_string(), "user_id".to_string()]) + .build() + .expect("Failed to build schema"), + ) + .build() + .expect("Failed to build table"); + + create_table(&admin, &table_path, &table_descriptor).await; + + let table = connection + .get_table(&table_path) + .await + .expect("Failed to get table"); + + let table_upsert = table.new_upsert().expect("Failed to create upsert"); + let mut upsert_writer = table_upsert + .create_writer() + .expect("Failed to create writer"); + + // Insert records with composite keys + let test_data = [ + ("US", 1, 100i64), + ("US", 2, 200i64), + ("EU", 1, 150i64), + ("EU", 2, 250i64), + ]; + + for (region, user_id, score) in &test_data { + let mut row = GenericRow::new(); + row.set_field(0, *region); + row.set_field(1, *user_id); + row.set_field(2, *score); + upsert_writer.upsert(&row).await.expect("Failed to upsert"); + } + + // Lookup with composite key + let mut lookuper = table + .new_lookup() + .expect("Failed to create lookup") + .create_lookuper() + .expect("Failed to create lookuper"); + + // Lookup (US, 1) - should return score 100 + let mut key = GenericRow::new(); + key.set_field(0, "US"); + key.set_field(1, 1); + let result = 
lookuper.lookup(&key).await.expect("Failed to lookup"); + let row = result + .get_single_row() + .expect("Failed to get row") + .expect("Row should exist"); + assert_eq!(row.get_long(2), 100, "Score for (US, 1) should be 100"); + + // Lookup (EU, 2) - should return score 250 + let mut key = GenericRow::new(); + key.set_field(0, "EU"); + key.set_field(1, 2); + let result = lookuper.lookup(&key).await.expect("Failed to lookup"); + let row = result + .get_single_row() + .expect("Failed to get row") + .expect("Row should exist"); + assert_eq!(row.get_long(2), 250, "Score for (EU, 2) should be 250"); + + // Update (US, 1) score + let mut update_row = GenericRow::new(); + update_row.set_field(0, "US"); + update_row.set_field(1, 1); + update_row.set_field(2, 500i64); + upsert_writer + .upsert(&update_row) + .await + .expect("Failed to update"); + + // Verify update + let mut key = GenericRow::new(); + key.set_field(0, "US"); + key.set_field(1, 1); + let result = lookuper.lookup(&key).await.expect("Failed to lookup"); + let row = result + .get_single_row() + .expect("Failed to get row") + .expect("Row should exist"); + assert_eq!( + row.get_long(2), + update_row.get_long(2), + "Row balance should be updated" + ); + } +} diff --git a/crates/fluss/tests/test_fluss.rs b/crates/fluss/tests/test_fluss.rs index 65111af2..f3987e62 100644 --- a/crates/fluss/tests/test_fluss.rs +++ b/crates/fluss/tests/test_fluss.rs @@ -22,6 +22,7 @@ extern crate fluss; mod integration { mod admin; mod fluss_cluster; + mod kv_table; mod table; mod utils; From 9d2f5a33b02a9130941f8a13033f58a6322918b5 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Wed, 21 Jan 2026 14:03:48 +0000 Subject: [PATCH 02/11] KvTable integration tests --- crates/fluss/tests/integration/kv_table.rs | 156 ++++++++++++++++++--- 1 file changed, 139 insertions(+), 17 deletions(-) diff --git a/crates/fluss/tests/integration/kv_table.rs b/crates/fluss/tests/integration/kv_table.rs index 4b778090..a106690f 100644 --- a/crates/fluss/tests/integration/kv_table.rs +++ b/crates/fluss/tests/integration/kv_table.rs @@ -52,8 +52,8 @@ mod kv_table_test { *guard = Some(cluster); }); }) - .join() - .expect("Failed to create cluster"); + .join() + .expect("Failed to create cluster"); // wait for 20 seconds to avoid the error like // CoordinatorEventProcessor is not initialized yet @@ -96,10 +96,7 @@ mod kv_table_test { let admin = connection.get_admin().await.expect("Failed to get admin"); - let table_path = TablePath::new( - "fluss".to_string(), - "test_upsert_and_lookup".to_string(), - ); + let table_path = TablePath::new("fluss".to_string(), "test_upsert_and_lookup".to_string()); let table_descriptor = TableDescriptor::builder() .schema( @@ -171,8 +168,10 @@ mod kv_table_test { let admin = connection.get_admin().await.expect("Failed to get admin"); - let table_path = - TablePath::new("fluss".to_string(), "test_update_existing_record".to_string()); + let table_path = TablePath::new( + "fluss".to_string(), + "test_update_existing_record".to_string(), + ); let table_descriptor = TableDescriptor::builder() .schema( @@ -224,7 +223,11 @@ mod kv_table_test { .get_single_row() .expect("Failed to get row") .expect("Row should exist"); - assert_eq!(found_row.get_long(2), row.get_long(2), "Expected initial score to be 123456789012i64"); + assert_eq!( + found_row.get_long(2), + row.get_long(2), + "Expected initial score to match" + ); // Update the record with new score let mut updated_row = GenericRow::new(); @@ -312,7 +315,10 @@ mod kv_table_test { .await .expect("Failed to 
lookup"); assert!( - result.get_single_row().expect("Failed to get row").is_some(), + result + .get_single_row() + .expect("Failed to get row") + .is_some(), "Record {} should exist before delete", i ); @@ -333,7 +339,10 @@ mod kv_table_test { .await .expect("Failed to lookup deleted record"); assert!( - result.get_single_row().expect("Failed to get row").is_none(), + result + .get_single_row() + .expect("Failed to get row") + .is_none(), "Record 2 should not exist after delete" ); @@ -344,7 +353,10 @@ mod kv_table_test { .await .expect("Failed to lookup"); assert!( - result.get_single_row().expect("Failed to get row").is_some(), + result + .get_single_row() + .expect("Failed to get row") + .is_some(), "Record {} should still exist after deleting record 2", i ); @@ -352,7 +364,7 @@ mod kv_table_test { } #[tokio::test] - async fn test_lookup_non_existent_key() { + async fn lookup_non_existent_key() { let cluster = get_fluss_cluster(); let connection = cluster.get_fluss_connection().await; @@ -403,20 +415,22 @@ mod kv_table_test { .await .expect("Failed to lookup non-existent key"); assert!( - result.get_single_row().expect("Failed to get row").is_none(), + result + .get_single_row() + .expect("Failed to get row") + .is_none(), "Non-existent key should return None" ); } #[tokio::test] - async fn test_multiple_primary_key_columns() { + async fn multiple_primary_keys() { let cluster = get_fluss_cluster(); let connection = cluster.get_fluss_connection().await; let admin = connection.get_admin().await.expect("Failed to get admin"); - let table_path = - TablePath::new("fluss".to_string(), "test_composite_pk".to_string()); + let table_path = TablePath::new("fluss".to_string(), "test_composite_pk".to_string()); let table_descriptor = TableDescriptor::builder() .schema( @@ -513,4 +527,112 @@ mod kv_table_test { "Row balance should be updated" ); } + + #[tokio::test] + async fn partial_update() { + use fluss::row::Datum; + + let cluster = get_fluss_cluster(); + let connection = cluster.get_fluss_connection().await; + + let admin = connection.get_admin().await.expect("Failed to get admin"); + + let table_path = TablePath::new("fluss".to_string(), "test_partial_update".to_string()); + + let table_descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + .column("id", DataTypes::int()) + .column("name", DataTypes::string()) + .column("age", DataTypes::bigint()) + .column("score", DataTypes::bigint()) + .primary_key(vec!["id".to_string()]) + .build() + .expect("Failed to build schema"), + ) + .build() + .expect("Failed to build table"); + + create_table(&admin, &table_path, &table_descriptor).await; + + let table = connection + .get_table(&table_path) + .await + .expect("Failed to get table"); + + // Insert initial record with all columns + let table_upsert = table.new_upsert().expect("Failed to create upsert"); + let mut upsert_writer = table_upsert + .create_writer() + .expect("Failed to create writer"); + + let mut row = GenericRow::new(); + row.set_field(0, 1); + row.set_field(1, "Verso"); + row.set_field(2, 32i64); + row.set_field(3, 6942i64); + upsert_writer + .upsert(&row) + .await + .expect("Failed to upsert initial row"); + + // Verify initial record + let mut lookuper = table + .new_lookup() + .expect("Failed to create lookup") + .create_lookuper() + .expect("Failed to create lookuper"); + + let result = lookuper + .lookup(&make_key(1)) + .await + .expect("Failed to lookup"); + let found_row = result + .get_single_row() + .expect("Failed to get row") + .expect("Row should 
exist"); + + assert_eq!(found_row.get_int(0), 1); + assert_eq!(found_row.get_string(1), "Verso"); + assert_eq!(found_row.get_long(2), 32i64); + assert_eq!(found_row.get_long(3), 6942i64); + + // Create partial update writer to update only score column + let partial_upsert = table_upsert + .partial_update_with_column_names(&["id", "score"]) + .expect("Failed to create TableUpsert with partial update"); + let mut partial_writer = partial_upsert + .create_writer() + .expect("Failed to create UpsertWriter with partial write"); + + // Update only the score column + let mut partial_row = GenericRow::new(); + partial_row.set_field(0, 1); + partial_row.set_field(1, Datum::Null); // not in partial update column + partial_row.set_field(2, Datum::Null); // not in partial update column + partial_row.set_field(3, 420i64); + partial_writer + .upsert(&partial_row) + .await + .expect("Failed to upsert"); + + // Verify partial update - name and age should remain unchanged + let result = lookuper + .lookup(&make_key(1)) + .await + .expect("Failed to lookup after partial update"); + let found_row = result + .get_single_row() + .expect("Failed to get row") + .expect("Row should exist"); + + assert_eq!(found_row.get_int(0), 1, "id should remain 1"); + assert_eq!( + found_row.get_string(1), + "Verso", + "name should remain unchanged" + ); + assert_eq!(found_row.get_long(2), 32, "age should remain unchanged"); + assert_eq!(found_row.get_long(3), 420, "score should be updated to 420"); + } } From d0c38dc4a766ced5c028ba62eb8962f80a07aebc Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Wed, 21 Jan 2026 14:07:02 +0000 Subject: [PATCH 03/11] Remove unnecessary mut to quelch warning --- crates/fluss/tests/integration/table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/fluss/tests/integration/table.rs b/crates/fluss/tests/integration/table.rs index 4cba4699..ffc3b98c 100644 --- a/crates/fluss/tests/integration/table.rs +++ b/crates/fluss/tests/integration/table.rs @@ -527,7 +527,7 @@ mod table_test { use arrow::array::Int32Array; let batches = scanner.poll(Duration::from_secs(10)).await.unwrap(); - let mut all_ids: Vec = batches + let all_ids: Vec = batches .iter() .flat_map(|b| { (0..b.num_rows()).map(|i| { From 552023b27b30c76051f2e1f56b98c9e29bc551d6 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Thu, 22 Jan 2026 09:50:30 +0000 Subject: [PATCH 04/11] Poll server readiness for 20 seconds --- crates/fluss/tests/integration/admin.rs | 9 +++---- crates/fluss/tests/integration/kv_table.rs | 17 +++++------- crates/fluss/tests/integration/table.rs | 11 +++----- .../tests/integration/table_remote_scan.rs | 12 +++------ crates/fluss/tests/integration/utils.rs | 27 +++++++++++++++++++ 5 files changed, 46 insertions(+), 30 deletions(-) diff --git a/crates/fluss/tests/integration/admin.rs b/crates/fluss/tests/integration/admin.rs index ccb71722..84e26f6e 100644 --- a/crates/fluss/tests/integration/admin.rs +++ b/crates/fluss/tests/integration/admin.rs @@ -34,6 +34,7 @@ static SHARED_FLUSS_CLUSTER: LazyLock>>> mod admin_test { use super::SHARED_FLUSS_CLUSTER; use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder}; + use crate::integration::utils::wait_for_cluster_ready; use fluss::error::FlussError; use fluss::metadata::{ DataTypes, DatabaseDescriptorBuilder, KvFormat, LogFormat, Schema, TableDescriptor, @@ -44,20 +45,18 @@ mod admin_test { fn before_all() { // Create a new tokio runtime in a separate thread - let cluster_guard = SHARED_FLUSS_CLUSTER.clone(); + let 
cluster_lock = SHARED_FLUSS_CLUSTER.clone(); thread::spawn(move || { let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime"); rt.block_on(async { let cluster = FlussTestingClusterBuilder::new("test-admin").build().await; - let mut guard = cluster_guard.write(); + wait_for_cluster_ready(&cluster).await; + let mut guard = cluster_lock.write(); *guard = Some(cluster); }); }) .join() .expect("Failed to create cluster"); - // wait for 20 seconds to avoid the error like - // CoordinatorEventProcessor is not initialized yet - thread::sleep(std::time::Duration::from_secs(20)); } fn get_fluss_cluster() -> Arc { diff --git a/crates/fluss/tests/integration/kv_table.rs b/crates/fluss/tests/integration/kv_table.rs index a106690f..d8b1ad97 100644 --- a/crates/fluss/tests/integration/kv_table.rs +++ b/crates/fluss/tests/integration/kv_table.rs @@ -34,7 +34,7 @@ static SHARED_FLUSS_CLUSTER: LazyLock>>> mod kv_table_test { use super::SHARED_FLUSS_CLUSTER; use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder}; - use crate::integration::utils::create_table; + use crate::integration::utils::{create_table, wait_for_cluster_ready}; use fluss::client::UpsertWriter; use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath}; use fluss::row::{GenericRow, InternalRow}; @@ -43,21 +43,18 @@ mod kv_table_test { fn before_all() { // Create a new tokio runtime in a separate thread - let cluster_guard = SHARED_FLUSS_CLUSTER.clone(); + let cluster_lock = SHARED_FLUSS_CLUSTER.clone(); thread::spawn(move || { let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime"); rt.block_on(async { - let cluster = FlussTestingClusterBuilder::new("test_table").build().await; - let mut guard = cluster_guard.write(); + let cluster = FlussTestingClusterBuilder::new("test_kv_table").build().await; + wait_for_cluster_ready(&cluster).await; + let mut guard = cluster_lock.write(); *guard = Some(cluster); }); }) .join() .expect("Failed to create cluster"); - - // wait for 20 seconds to avoid the error like - // CoordinatorEventProcessor is not initialized yet - thread::sleep(std::time::Duration::from_secs(20)); } fn get_fluss_cluster() -> Arc { @@ -424,7 +421,7 @@ mod kv_table_test { } #[tokio::test] - async fn multiple_primary_keys() { + async fn composite_primary_keys() { let cluster = get_fluss_cluster(); let connection = cluster.get_fluss_connection().await; @@ -524,7 +521,7 @@ mod kv_table_test { assert_eq!( row.get_long(2), update_row.get_long(2), - "Row balance should be updated" + "Row score should be updated" ); } diff --git a/crates/fluss/tests/integration/table.rs b/crates/fluss/tests/integration/table.rs index ffc3b98c..9291507f 100644 --- a/crates/fluss/tests/integration/table.rs +++ b/crates/fluss/tests/integration/table.rs @@ -34,7 +34,7 @@ static SHARED_FLUSS_CLUSTER: LazyLock>>> mod table_test { use super::SHARED_FLUSS_CLUSTER; use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder}; - use crate::integration::utils::create_table; + use crate::integration::utils::{create_table, wait_for_cluster_ready}; use arrow::array::record_batch; use fluss::client::{FlussTable, TableScan}; use fluss::metadata::{DataTypes, Schema, TableBucket, TableDescriptor, TablePath}; @@ -49,21 +49,18 @@ mod table_test { fn before_all() { // Create a new tokio runtime in a separate thread - let cluster_guard = SHARED_FLUSS_CLUSTER.clone(); + let cluster_lock = SHARED_FLUSS_CLUSTER.clone(); thread::spawn(move || { let rt = 
tokio::runtime::Runtime::new().expect("Failed to create runtime"); rt.block_on(async { let cluster = FlussTestingClusterBuilder::new("test_table").build().await; - let mut guard = cluster_guard.write(); + wait_for_cluster_ready(&cluster).await; + let mut guard = cluster_lock.write(); *guard = Some(cluster); }); }) .join() .expect("Failed to create cluster"); - - // wait for 20 seconds to avoid the error like - // CoordinatorEventProcessor is not initialized yet - thread::sleep(std::time::Duration::from_secs(20)); } fn get_fluss_cluster() -> Arc { diff --git a/crates/fluss/tests/integration/table_remote_scan.rs b/crates/fluss/tests/integration/table_remote_scan.rs index 43c89b54..73ecfec8 100644 --- a/crates/fluss/tests/integration/table_remote_scan.rs +++ b/crates/fluss/tests/integration/table_remote_scan.rs @@ -33,19 +33,18 @@ static SHARED_FLUSS_CLUSTER: LazyLock>>> mod table_remote_scan_test { use super::SHARED_FLUSS_CLUSTER; use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder}; - use crate::integration::utils::create_table; + use crate::integration::utils::{create_table, wait_for_cluster_ready}; use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath}; use fluss::row::{GenericRow, InternalRow}; use std::collections::HashMap; use std::sync::Arc; use std::thread; - use std::thread::sleep; use std::time::Duration; use uuid::Uuid; fn before_all() { // Create a new tokio runtime in a separate thread - let cluster_guard = SHARED_FLUSS_CLUSTER.clone(); + let cluster_lock = SHARED_FLUSS_CLUSTER.clone(); thread::spawn(move || { let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime"); rt.block_on(async { @@ -94,16 +93,13 @@ mod table_remote_scan_test { .with_remote_data_dir(temp_dir) .build() .await; - let mut guard = cluster_guard.write(); + wait_for_cluster_ready(&cluster).await; + let mut guard = cluster_lock.write(); *guard = Some(cluster); }); }) .join() .expect("Failed to create cluster"); - - // wait for 20 seconds to avoid the error like - // CoordinatorEventProcessor is not initialized yet - sleep(Duration::from_secs(20)); } fn after_all() { diff --git a/crates/fluss/tests/integration/utils.rs b/crates/fluss/tests/integration/utils.rs index cd1f6ccb..17a41d31 100644 --- a/crates/fluss/tests/integration/utils.rs +++ b/crates/fluss/tests/integration/utils.rs @@ -15,8 +15,35 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +use crate::integration::fluss_cluster::FlussTestingCluster; use fluss::client::FlussAdmin; use fluss::metadata::{TableDescriptor, TablePath}; +use std::time::Duration; + +/// Polls the cluster until the CoordinatorEventProcessor is initialized. +/// Times out after 20 seconds. +pub async fn wait_for_cluster_ready(cluster: &FlussTestingCluster) { + let timeout = Duration::from_secs(20); + let poll_interval = Duration::from_millis(500); + let start = std::time::Instant::now(); + + loop { + let connection = cluster.get_fluss_connection().await; + if connection.get_admin().await.is_ok() { + return; + } + + if start.elapsed() >= timeout { + panic!( + "Server readiness check timed out after {} seconds. 
\ + CoordinatorEventProcessor may not be initialized.", + timeout.as_secs() + ); + } + + tokio::time::sleep(poll_interval).await; + } +} pub async fn create_table( admin: &FlussAdmin, From 3a13253b990fd82c6d501d137c9bcc034f44fa89 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Thu, 22 Jan 2026 10:00:22 +0000 Subject: [PATCH 05/11] Refactored integration tests --- crates/fluss/tests/integration/admin.rs | 39 +++-------------- crates/fluss/tests/integration/kv_table.rs | 38 +++------------- crates/fluss/tests/integration/table.rs | 39 +++-------------- .../tests/integration/table_remote_scan.rs | 20 +-------- crates/fluss/tests/integration/utils.rs | 43 ++++++++++++++++++- 5 files changed, 59 insertions(+), 120 deletions(-) diff --git a/crates/fluss/tests/integration/admin.rs b/crates/fluss/tests/integration/admin.rs index 84e26f6e..fbdb295d 100644 --- a/crates/fluss/tests/integration/admin.rs +++ b/crates/fluss/tests/integration/admin.rs @@ -33,54 +33,25 @@ static SHARED_FLUSS_CLUSTER: LazyLock>>> #[after_all] mod admin_test { use super::SHARED_FLUSS_CLUSTER; - use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder}; - use crate::integration::utils::wait_for_cluster_ready; + use crate::integration::fluss_cluster::FlussTestingCluster; + use crate::integration::utils::{get_cluster, start_cluster, stop_cluster}; use fluss::error::FlussError; use fluss::metadata::{ DataTypes, DatabaseDescriptorBuilder, KvFormat, LogFormat, Schema, TableDescriptor, TablePath, }; use std::sync::Arc; - use std::thread; fn before_all() { - // Create a new tokio runtime in a separate thread - let cluster_lock = SHARED_FLUSS_CLUSTER.clone(); - thread::spawn(move || { - let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime"); - rt.block_on(async { - let cluster = FlussTestingClusterBuilder::new("test-admin").build().await; - wait_for_cluster_ready(&cluster).await; - let mut guard = cluster_lock.write(); - *guard = Some(cluster); - }); - }) - .join() - .expect("Failed to create cluster"); + start_cluster("test-admin", SHARED_FLUSS_CLUSTER.clone()); } fn get_fluss_cluster() -> Arc { - let cluster_guard = SHARED_FLUSS_CLUSTER.read(); - if cluster_guard.is_none() { - panic!("Fluss cluster not initialized. 
Make sure before_all() was called."); - } - Arc::new(cluster_guard.as_ref().unwrap().clone()) + get_cluster(&SHARED_FLUSS_CLUSTER) } fn after_all() { - // Create a new tokio runtime in a separate thread - let cluster_guard = SHARED_FLUSS_CLUSTER.clone(); - std::thread::spawn(move || { - let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime"); - rt.block_on(async { - let mut guard = cluster_guard.write(); - if let Some(cluster) = guard.take() { - cluster.stop().await; - } - }); - }) - .join() - .expect("Failed to cleanup cluster"); + stop_cluster(SHARED_FLUSS_CLUSTER.clone()); } #[tokio::test] diff --git a/crates/fluss/tests/integration/kv_table.rs b/crates/fluss/tests/integration/kv_table.rs index d8b1ad97..ecc5453a 100644 --- a/crates/fluss/tests/integration/kv_table.rs +++ b/crates/fluss/tests/integration/kv_table.rs @@ -33,51 +33,23 @@ static SHARED_FLUSS_CLUSTER: LazyLock>>> #[after_all] mod kv_table_test { use super::SHARED_FLUSS_CLUSTER; - use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder}; - use crate::integration::utils::{create_table, wait_for_cluster_ready}; + use crate::integration::fluss_cluster::FlussTestingCluster; + use crate::integration::utils::{create_table, get_cluster, start_cluster, stop_cluster}; use fluss::client::UpsertWriter; use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath}; use fluss::row::{GenericRow, InternalRow}; use std::sync::Arc; - use std::thread; fn before_all() { - // Create a new tokio runtime in a separate thread - let cluster_lock = SHARED_FLUSS_CLUSTER.clone(); - thread::spawn(move || { - let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime"); - rt.block_on(async { - let cluster = FlussTestingClusterBuilder::new("test_kv_table").build().await; - wait_for_cluster_ready(&cluster).await; - let mut guard = cluster_lock.write(); - *guard = Some(cluster); - }); - }) - .join() - .expect("Failed to create cluster"); + start_cluster("test_kv_table", SHARED_FLUSS_CLUSTER.clone()); } fn get_fluss_cluster() -> Arc { - let cluster_guard = SHARED_FLUSS_CLUSTER.read(); - if cluster_guard.is_none() { - panic!("Fluss cluster not initialized. 
Make sure before_all() was called."); - } - Arc::new(cluster_guard.as_ref().unwrap().clone()) + get_cluster(&SHARED_FLUSS_CLUSTER) } fn after_all() { - let cluster_guard = SHARED_FLUSS_CLUSTER.clone(); - std::thread::spawn(move || { - let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime"); - rt.block_on(async { - let mut guard = cluster_guard.write(); - if let Some(cluster) = guard.take() { - cluster.stop().await; - } - }); - }) - .join() - .expect("Failed to cleanup cluster"); + stop_cluster(SHARED_FLUSS_CLUSTER.clone()); } fn make_key(id: i32) -> GenericRow<'static> { diff --git a/crates/fluss/tests/integration/table.rs b/crates/fluss/tests/integration/table.rs index 9291507f..ef73b568 100644 --- a/crates/fluss/tests/integration/table.rs +++ b/crates/fluss/tests/integration/table.rs @@ -33,8 +33,8 @@ static SHARED_FLUSS_CLUSTER: LazyLock>>> #[after_all] mod table_test { use super::SHARED_FLUSS_CLUSTER; - use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder}; - use crate::integration::utils::{create_table, wait_for_cluster_ready}; + use crate::integration::fluss_cluster::FlussTestingCluster; + use crate::integration::utils::{create_table, get_cluster, start_cluster, stop_cluster}; use arrow::array::record_batch; use fluss::client::{FlussTable, TableScan}; use fluss::metadata::{DataTypes, Schema, TableBucket, TableDescriptor, TablePath}; @@ -44,47 +44,18 @@ mod table_test { use jiff::Timestamp; use std::collections::HashMap; use std::sync::Arc; - use std::thread; use std::time::Duration; fn before_all() { - // Create a new tokio runtime in a separate thread - let cluster_lock = SHARED_FLUSS_CLUSTER.clone(); - thread::spawn(move || { - let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime"); - rt.block_on(async { - let cluster = FlussTestingClusterBuilder::new("test_table").build().await; - wait_for_cluster_ready(&cluster).await; - let mut guard = cluster_lock.write(); - *guard = Some(cluster); - }); - }) - .join() - .expect("Failed to create cluster"); + start_cluster("test_table", SHARED_FLUSS_CLUSTER.clone()); } fn get_fluss_cluster() -> Arc { - let cluster_guard = SHARED_FLUSS_CLUSTER.read(); - if cluster_guard.is_none() { - panic!("Fluss cluster not initialized. 
Make sure before_all() was called."); - } - Arc::new(cluster_guard.as_ref().unwrap().clone()) + get_cluster(&SHARED_FLUSS_CLUSTER) } fn after_all() { - // Create a new tokio runtime in a separate thread - let cluster_guard = SHARED_FLUSS_CLUSTER.clone(); - thread::spawn(move || { - let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime"); - rt.block_on(async { - let mut guard = cluster_guard.write(); - if let Some(cluster) = guard.take() { - cluster.stop().await; - } - }); - }) - .join() - .expect("Failed to cleanup cluster"); + stop_cluster(SHARED_FLUSS_CLUSTER.clone()); } #[tokio::test] diff --git a/crates/fluss/tests/integration/table_remote_scan.rs b/crates/fluss/tests/integration/table_remote_scan.rs index 73ecfec8..332c1c84 100644 --- a/crates/fluss/tests/integration/table_remote_scan.rs +++ b/crates/fluss/tests/integration/table_remote_scan.rs @@ -103,19 +103,7 @@ mod table_remote_scan_test { } fn after_all() { - // Create a new tokio runtime in a separate thread - let cluster_guard = SHARED_FLUSS_CLUSTER.clone(); - thread::spawn(move || { - let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime"); - rt.block_on(async { - let mut guard = cluster_guard.write(); - if let Some(cluster) = guard.take() { - cluster.stop().await; - } - }); - }) - .join() - .expect("Failed to cleanup cluster"); + stop_cluster(SHARED_FLUSS_CLUSTER.clone()); } #[tokio::test] @@ -211,10 +199,6 @@ mod table_remote_scan_test { } fn get_fluss_cluster() -> Arc { - let cluster_guard = SHARED_FLUSS_CLUSTER.read(); - if cluster_guard.is_none() { - panic!("Fluss cluster not initialized. Make sure before_all() was called."); - } - Arc::new(cluster_guard.as_ref().unwrap().clone()) + get_cluster(&SHARED_FLUSS_CLUSTER) } } diff --git a/crates/fluss/tests/integration/utils.rs b/crates/fluss/tests/integration/utils.rs index 17a41d31..fbf78d46 100644 --- a/crates/fluss/tests/integration/utils.rs +++ b/crates/fluss/tests/integration/utils.rs @@ -15,9 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -use crate::integration::fluss_cluster::FlussTestingCluster; +use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder}; use fluss::client::FlussAdmin; use fluss::metadata::{TableDescriptor, TablePath}; +use parking_lot::RwLock; +use std::sync::Arc; use std::time::Duration; /// Polls the cluster until the CoordinatorEventProcessor is initialized. @@ -55,3 +57,42 @@ pub async fn create_table( .await .expect("Failed to create table"); } + +pub fn start_cluster(name: &str, cluster_lock: Arc>>) { + let name = name.to_string(); + std::thread::spawn(move || { + let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime"); + rt.block_on(async { + let cluster = FlussTestingClusterBuilder::new(&name).build().await; + wait_for_cluster_ready(&cluster).await; + let mut guard = cluster_lock.write(); + *guard = Some(cluster); + }); + }) + .join() + .expect("Failed to create cluster"); +} + +pub fn stop_cluster(cluster_lock: Arc>>) { + std::thread::spawn(move || { + let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime"); + rt.block_on(async { + let mut guard = cluster_lock.write(); + if let Some(cluster) = guard.take() { + cluster.stop().await; + } + }); + }) + .join() + .expect("Failed to cleanup cluster"); +} + +pub fn get_cluster(cluster_lock: &RwLock>) -> Arc { + let guard = cluster_lock.read(); + Arc::new( + guard + .as_ref() + .expect("Fluss cluster not initialized. 
Make sure before_all() was called.") + .clone(), + ) +} From 866fe945ca0d3cbf4a9a6b89b41f8251b113147d Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Thu, 22 Jan 2026 10:05:45 +0000 Subject: [PATCH 06/11] Refactored integration tests --- crates/fluss/tests/integration/table_remote_scan.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/fluss/tests/integration/table_remote_scan.rs b/crates/fluss/tests/integration/table_remote_scan.rs index 332c1c84..8555db71 100644 --- a/crates/fluss/tests/integration/table_remote_scan.rs +++ b/crates/fluss/tests/integration/table_remote_scan.rs @@ -33,7 +33,7 @@ static SHARED_FLUSS_CLUSTER: LazyLock>>> mod table_remote_scan_test { use super::SHARED_FLUSS_CLUSTER; use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder}; - use crate::integration::utils::{create_table, wait_for_cluster_ready}; + use crate::integration::utils::{create_table, get_cluster, start_cluster, stop_cluster, wait_for_cluster_ready}; use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath}; use fluss::row::{GenericRow, InternalRow}; use std::collections::HashMap; From b971b4bb54ac863cade6e469e42240b91c08ee74 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Thu, 22 Jan 2026 10:05:53 +0000 Subject: [PATCH 07/11] Remove tables at the end of IT --- crates/fluss/tests/integration/kv_table.rs | 30 ++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/crates/fluss/tests/integration/kv_table.rs b/crates/fluss/tests/integration/kv_table.rs index ecc5453a..0afc8e67 100644 --- a/crates/fluss/tests/integration/kv_table.rs +++ b/crates/fluss/tests/integration/kv_table.rs @@ -128,6 +128,11 @@ mod kv_table_test { assert_eq!(row.get_string(1), *expected_name, "name mismatch"); assert_eq!(row.get_long(2), *expected_age, "age mismatch"); } + + admin + .drop_table(&table_path, false) + .await + .expect("Failed to drop table"); } #[tokio::test] @@ -227,6 +232,11 @@ mod kv_table_test { updated_row.get_string(1), "Name should remain unchanged" ); + + admin + .drop_table(&table_path, false) + .await + .expect("Failed to drop table"); } #[tokio::test] @@ -330,6 +340,11 @@ mod kv_table_test { i ); } + + admin + .drop_table(&table_path, false) + .await + .expect("Failed to drop table"); } #[tokio::test] @@ -390,6 +405,11 @@ mod kv_table_test { .is_none(), "Non-existent key should return None" ); + + admin + .drop_table(&table_path, false) + .await + .expect("Failed to drop table"); } #[tokio::test] @@ -495,6 +515,11 @@ mod kv_table_test { update_row.get_long(2), "Row score should be updated" ); + + admin + .drop_table(&table_path, false) + .await + .expect("Failed to drop table"); } #[tokio::test] @@ -603,5 +628,10 @@ mod kv_table_test { ); assert_eq!(found_row.get_long(2), 32, "age should remain unchanged"); assert_eq!(found_row.get_long(3), 420, "score should be updated to 420"); + + admin + .drop_table(&table_path, false) + .await + .expect("Failed to drop table"); } } From 7549c9aefc55e73619c8488b90c10d3b37863d22 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Thu, 22 Jan 2026 10:06:24 +0000 Subject: [PATCH 08/11] Formatting and clippy --- crates/fluss/tests/integration/table_remote_scan.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/crates/fluss/tests/integration/table_remote_scan.rs b/crates/fluss/tests/integration/table_remote_scan.rs index 8555db71..bb09c1e6 100644 --- a/crates/fluss/tests/integration/table_remote_scan.rs +++ b/crates/fluss/tests/integration/table_remote_scan.rs @@ -33,7 +33,9 @@ 
static SHARED_FLUSS_CLUSTER: LazyLock>>> mod table_remote_scan_test { use super::SHARED_FLUSS_CLUSTER; use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder}; - use crate::integration::utils::{create_table, get_cluster, start_cluster, stop_cluster, wait_for_cluster_ready}; + use crate::integration::utils::{ + create_table, get_cluster, start_cluster, stop_cluster, wait_for_cluster_ready, + }; use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath}; use fluss::row::{GenericRow, InternalRow}; use std::collections::HashMap; From 25f43002d2dd62d64b07c754072f6f20b7ae2444 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Thu, 22 Jan 2026 10:10:44 +0000 Subject: [PATCH 09/11] Formatting and clippy --- crates/fluss/tests/integration/table_remote_scan.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/fluss/tests/integration/table_remote_scan.rs b/crates/fluss/tests/integration/table_remote_scan.rs index bb09c1e6..e28a8362 100644 --- a/crates/fluss/tests/integration/table_remote_scan.rs +++ b/crates/fluss/tests/integration/table_remote_scan.rs @@ -34,7 +34,7 @@ mod table_remote_scan_test { use super::SHARED_FLUSS_CLUSTER; use crate::integration::fluss_cluster::{FlussTestingCluster, FlussTestingClusterBuilder}; use crate::integration::utils::{ - create_table, get_cluster, start_cluster, stop_cluster, wait_for_cluster_ready, + create_table, get_cluster, stop_cluster, wait_for_cluster_ready, }; use fluss::metadata::{DataTypes, Schema, TableDescriptor, TablePath}; use fluss::row::{GenericRow, InternalRow}; From 043cf27a36c2ab8b72e662da2db9769649a89b25 Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Thu, 22 Jan 2026 13:49:04 +0000 Subject: [PATCH 10/11] Squash IT test cases --- crates/fluss/tests/integration/kv_table.rs | 221 ++------------------- 1 file changed, 14 insertions(+), 207 deletions(-) diff --git a/crates/fluss/tests/integration/kv_table.rs b/crates/fluss/tests/integration/kv_table.rs index 0afc8e67..efd79575 100644 --- a/crates/fluss/tests/integration/kv_table.rs +++ b/crates/fluss/tests/integration/kv_table.rs @@ -55,11 +55,13 @@ mod kv_table_test { fn make_key(id: i32) -> GenericRow<'static> { let mut row = GenericRow::new(); row.set_field(0, id); + row.set_field(1, ""); + row.set_field(2, 0i64); row } #[tokio::test] - async fn upsert_and_lookup() { + async fn upsert_delete_and_lookup() { let cluster = get_fluss_cluster(); let connection = cluster.get_fluss_connection().await; @@ -129,85 +131,11 @@ mod kv_table_test { assert_eq!(row.get_long(2), *expected_age, "age mismatch"); } - admin - .drop_table(&table_path, false) - .await - .expect("Failed to drop table"); - } - - #[tokio::test] - async fn update_existing_record() { - let cluster = get_fluss_cluster(); - let connection = cluster.get_fluss_connection().await; - - let admin = connection.get_admin().await.expect("Failed to get admin"); - - let table_path = TablePath::new( - "fluss".to_string(), - "test_update_existing_record".to_string(), - ); - - let table_descriptor = TableDescriptor::builder() - .schema( - Schema::builder() - .column("id", DataTypes::int()) - .column("name", DataTypes::string()) - .column("score", DataTypes::bigint()) - .primary_key(vec!["id".to_string()]) - .build() - .expect("Failed to build schema"), - ) - .build() - .expect("Failed to build table"); - - create_table(&admin, &table_path, &table_descriptor).await; - - let table = connection - .get_table(&table_path) - .await - .expect("Failed to get table"); - - let table_upsert = 
table.new_upsert().expect("Failed to create upsert"); - let mut upsert_writer = table_upsert - .create_writer() - .expect("Failed to create writer"); - - // Insert initial record - let mut row = GenericRow::new(); - row.set_field(0, 1); - row.set_field(1, "Flash"); - row.set_field(2, 123456789012i64); - upsert_writer - .upsert(&row) - .await - .expect("Failed to upsert initial row"); - - // Verify initial record - let mut lookuper = table - .new_lookup() - .expect("Failed to create lookup") - .create_lookuper() - .expect("Failed to create lookuper"); - - let result = lookuper - .lookup(&make_key(1)) - .await - .expect("Failed to lookup"); - let found_row = result - .get_single_row() - .expect("Failed to get row") - .expect("Row should exist"); - assert_eq!( - found_row.get_long(2), - row.get_long(2), - "Expected initial score to match" - ); - - // Update the record with new score + // Update the record with new age let mut updated_row = GenericRow::new(); updated_row.set_field(0, 1); - updated_row.set_field(1, "Flash"); - updated_row.set_field(2, 987654321098i64); + updated_row.set_field(1, "Verso"); + updated_row.set_field(2, 33i64); upsert_writer .upsert(&updated_row) .await @@ -225,7 +153,7 @@ mod kv_table_test { assert_eq!( found_row.get_long(2), updated_row.get_long(2), - "Score should be updated" + "Age should be updated" ); assert_eq!( found_row.get_string(1), @@ -233,80 +161,11 @@ mod kv_table_test { "Name should remain unchanged" ); - admin - .drop_table(&table_path, false) - .await - .expect("Failed to drop table"); - } - - #[tokio::test] - async fn delete_record() { - let cluster = get_fluss_cluster(); - let connection = cluster.get_fluss_connection().await; - - let admin = connection.get_admin().await.expect("Failed to get admin"); - - let table_path = TablePath::new("fluss".to_string(), "test_delete_record".to_string()); - - let table_descriptor = TableDescriptor::builder() - .schema( - Schema::builder() - .column("id", DataTypes::int()) - .column("data", DataTypes::string()) - .primary_key(vec!["id".to_string()]) - .build() - .expect("Failed to build schema"), - ) - .build() - .expect("Failed to build table"); - - create_table(&admin, &table_path, &table_descriptor).await; - - let table = connection - .get_table(&table_path) - .await - .expect("Failed to get table"); - - let table_upsert = table.new_upsert().expect("Failed to create upsert"); - let mut upsert_writer = table_upsert - .create_writer() - .expect("Failed to create writer"); - - // Insert records - for i in 1..=3 { - let mut row = GenericRow::new(); - row.set_field(0, i); - let data = format!("data{}", i); - row.set_field(1, data.as_str()); - upsert_writer.upsert(&row).await.expect("Failed to upsert"); - } - - // Verify records exist - let mut lookuper = table - .new_lookup() - .expect("Failed to create lookup") - .create_lookuper() - .expect("Failed to create lookuper"); - - for i in 1..=3 { - let result = lookuper - .lookup(&make_key(i)) - .await - .expect("Failed to lookup"); - assert!( - result - .get_single_row() - .expect("Failed to get row") - .is_some(), - "Record {} should exist before delete", - i - ); - } - - // Delete record with id=2 + // Delete record with id=1 let mut delete_row = GenericRow::new(); - delete_row.set_field(0, 2); + delete_row.set_field(0, 1); delete_row.set_field(1, ""); + delete_row.set_field(2, 0i64); upsert_writer .delete(&delete_row) .await @@ -314,7 +173,7 @@ mod kv_table_test { // Verify deletion let result = lookuper - .lookup(&make_key(2)) + .lookup(&make_key(1)) .await 
.expect("Failed to lookup deleted record"); assert!( @@ -322,11 +181,11 @@ mod kv_table_test { .get_single_row() .expect("Failed to get row") .is_none(), - "Record 2 should not exist after delete" + "Record 1 should not exist after delete" ); // Verify other records still exist - for i in [1, 3] { + for i in [2, 3] { let result = lookuper .lookup(&make_key(i)) .await @@ -336,64 +195,12 @@ mod kv_table_test { .get_single_row() .expect("Failed to get row") .is_some(), - "Record {} should still exist after deleting record 2", + "Record {} should still exist after deleting record 1", i ); } - admin - .drop_table(&table_path, false) - .await - .expect("Failed to drop table"); - } - - #[tokio::test] - async fn lookup_non_existent_key() { - let cluster = get_fluss_cluster(); - let connection = cluster.get_fluss_connection().await; - - let admin = connection.get_admin().await.expect("Failed to get admin"); - - let table_path = - TablePath::new("fluss".to_string(), "test_lookup_non_existent".to_string()); - - let table_descriptor = TableDescriptor::builder() - .schema( - Schema::builder() - .column("id", DataTypes::int()) - .column("value", DataTypes::string()) - .primary_key(vec!["id".to_string()]) - .build() - .expect("Failed to build schema"), - ) - .build() - .expect("Failed to build table"); - - create_table(&admin, &table_path, &table_descriptor).await; - - let table = connection - .get_table(&table_path) - .await - .expect("Failed to get table"); - - // Insert one record - let table_upsert = table.new_upsert().expect("Failed to create upsert"); - let mut upsert_writer = table_upsert - .create_writer() - .expect("Failed to create writer"); - - let mut row = GenericRow::new(); - row.set_field(0, 1); - row.set_field(1, "exists"); - upsert_writer.upsert(&row).await.expect("Failed to upsert"); - // Lookup non-existent key - let mut lookuper = table - .new_lookup() - .expect("Failed to create lookup") - .create_lookuper() - .expect("Failed to create lookuper"); - let result = lookuper .lookup(&make_key(999)) .await From 4eb5de2c67416d8bff83ba9dd9ea2eba920b16ee Mon Sep 17 00:00:00 2001 From: Keith Lee Date: Fri, 23 Jan 2026 07:11:17 +0000 Subject: [PATCH 11/11] Wait for tablet server to be available --- crates/fluss/tests/integration/utils.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/crates/fluss/tests/integration/utils.rs b/crates/fluss/tests/integration/utils.rs index fbf78d46..4d0c349f 100644 --- a/crates/fluss/tests/integration/utils.rs +++ b/crates/fluss/tests/integration/utils.rs @@ -22,7 +22,7 @@ use parking_lot::RwLock; use std::sync::Arc; use std::time::Duration; -/// Polls the cluster until the CoordinatorEventProcessor is initialized. +/// Polls the cluster until CoordinatorEventProcessor is initialized and tablet server is available. /// Times out after 20 seconds. pub async fn wait_for_cluster_ready(cluster: &FlussTestingCluster) { let timeout = Duration::from_secs(20); @@ -31,14 +31,20 @@ pub async fn wait_for_cluster_ready(cluster: &FlussTestingCluster) { loop { let connection = cluster.get_fluss_connection().await; - if connection.get_admin().await.is_ok() { + if connection.get_admin().await.is_ok() + && connection + .get_metadata() + .get_cluster() + .get_one_available_server() + .is_some() + { return; } if start.elapsed() >= timeout { panic!( "Server readiness check timed out after {} seconds. 
\ - CoordinatorEventProcessor may not be initialized.", + CoordinatorEventProcessor may not be initialized or TabletServer may not be available.", timeout.as_secs() ); }