From 87a056f09609150ef089d54aa5b4002f62500f0c Mon Sep 17 00:00:00 2001 From: guoxuanlin Date: Fri, 23 Jan 2026 17:19:25 +0800 Subject: [PATCH 1/2] test: add it test for put & get kv for all supported datatypes --- crates/fluss/tests/integration/kv_table.rs | 246 +++++++++++++++++++++ 1 file changed, 246 insertions(+) diff --git a/crates/fluss/tests/integration/kv_table.rs b/crates/fluss/tests/integration/kv_table.rs index efd7957..850ccfa 100644 --- a/crates/fluss/tests/integration/kv_table.rs +++ b/crates/fluss/tests/integration/kv_table.rs @@ -441,4 +441,250 @@ mod kv_table_test { .await .expect("Failed to drop table"); } + + /// Integration test covering put and get operations for all supported datatypes. + #[tokio::test] + async fn all_supported_datatypes() { + use fluss::row::{Date, Datum, Decimal, Time, TimestampLtz, TimestampNtz}; + + let cluster = get_fluss_cluster(); + let connection = cluster.get_fluss_connection().await; + + let admin = connection.get_admin().await.expect("Failed to get admin"); + + let table_path = + TablePath::new("fluss".to_string(), "test_all_datatypes".to_string()); + + // Create a table with all supported primitive datatypes + let table_descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + // Primary key column + .column("pk_int", DataTypes::int()) + // Boolean type + .column("col_boolean", DataTypes::boolean()) + // Integer types + .column("col_tinyint", DataTypes::tinyint()) + .column("col_smallint", DataTypes::smallint()) + .column("col_int", DataTypes::int()) + .column("col_bigint", DataTypes::bigint()) + // Floating point types + .column("col_float", DataTypes::float()) + .column("col_double", DataTypes::double()) + // String types + .column("col_char", DataTypes::char(10)) + .column("col_string", DataTypes::string()) + // Decimal type + .column("col_decimal", DataTypes::decimal(10, 2)) + // Date and time types + .column("col_date", DataTypes::date()) + .column("col_time", DataTypes::time()) + .column("col_timestamp", DataTypes::timestamp()) + .column("col_timestamp_ltz", DataTypes::timestamp_ltz()) + // Binary types + .column("col_bytes", DataTypes::bytes()) + .column("col_binary", DataTypes::binary(20)) + .primary_key(vec!["pk_int".to_string()]) + .build() + .expect("Failed to build schema"), + ) + .build() + .expect("Failed to build table"); + + create_table(&admin, &table_path, &table_descriptor).await; + + let table = connection + .get_table(&table_path) + .await + .expect("Failed to get table"); + + let table_upsert = table.new_upsert().expect("Failed to create upsert"); + let mut upsert_writer = table_upsert + .create_writer() + .expect("Failed to create writer"); + + // Test data for all datatypes + let pk_int = 1i32; + let col_boolean = true; + let col_tinyint = 127i8; + let col_smallint = 32767i16; + let col_int = 2147483647i32; + let col_bigint = 9223372036854775807i64; + let col_float = 3.14f32; + let col_double = 2.718281828459045f64; + let col_char = "hello"; + let col_string = "world of fluss rust client"; + let col_decimal = Decimal::from_unscaled_long(12345, 10, 2).unwrap(); // 123.45 + let col_date = Date::new(20476); // 2026-01-23 + let col_time = Time::new(36827123); // 10:13:47.123 + let col_timestamp = TimestampNtz::new(1769163227123); // 2026-01-23 10:13:47.123 UTC + let col_timestamp_ltz = TimestampLtz::new(1769163227123); // 2026-01-23 10:13:47.123 UTC + let col_bytes: &[u8] = b"binary data"; + let col_binary: &[u8] = b"fixed binary data!!!"; + + // Upsert a row with all datatypes + let mut row = GenericRow::new(); + row.set_field(0, pk_int); + row.set_field(1, col_boolean); + row.set_field(2, col_tinyint); + row.set_field(3, col_smallint); + row.set_field(4, col_int); + row.set_field(5, col_bigint); + row.set_field(6, col_float); + row.set_field(7, col_double); + row.set_field(8, col_char); + row.set_field(9, col_string); + row.set_field(10, col_decimal.clone()); + row.set_field(11, col_date); + row.set_field(12, col_time); + row.set_field(13, col_timestamp); + row.set_field(14, col_timestamp_ltz); + row.set_field(15, col_bytes); + row.set_field(16, col_binary); + + upsert_writer + .upsert(&row) + .await + .expect("Failed to upsert row with all datatypes"); + + // Lookup the record + let mut lookuper = table + .new_lookup() + .expect("Failed to create lookup") + .create_lookuper() + .expect("Failed to create lookuper"); + + let mut key = GenericRow::new(); + key.set_field(0, pk_int); + + let result = lookuper.lookup(&key).await.expect("Failed to lookup"); + let found_row = result + .get_single_row() + .expect("Failed to get row") + .expect("Row should exist"); + + // Verify all datatypes + assert_eq!(found_row.get_int(0), pk_int, "pk_int mismatch"); + assert_eq!( + found_row.get_boolean(1), + col_boolean, + "col_boolean mismatch" + ); + assert_eq!(found_row.get_byte(2), col_tinyint, "col_tinyint mismatch"); + assert_eq!( + found_row.get_short(3), + col_smallint, + "col_smallint mismatch" + ); + assert_eq!(found_row.get_int(4), col_int, "col_int mismatch"); + assert_eq!(found_row.get_long(5), col_bigint, "col_bigint mismatch"); + assert!( + (found_row.get_float(6) - col_float).abs() < f32::EPSILON, + "col_float mismatch: expected {}, got {}", + col_float, + found_row.get_float(6) + ); + assert!( + (found_row.get_double(7) - col_double).abs() < f64::EPSILON, + "col_double mismatch: expected {}, got {}", + col_double, + found_row.get_double(7) + ); + assert_eq!(found_row.get_char(8, 10), col_char, "col_char mismatch"); + assert_eq!(found_row.get_string(9), col_string, "col_string mismatch"); + assert_eq!( + found_row.get_decimal(10, 10, 2), + col_decimal, + "col_decimal mismatch" + ); + assert_eq!( + found_row.get_date(11).get_inner(), + col_date.get_inner(), + "col_date mismatch" + ); + assert_eq!( + found_row.get_time(12).get_inner(), + col_time.get_inner(), + "col_time mismatch" + ); + assert_eq!( + found_row.get_timestamp_ntz(13, 6).get_millisecond(), + col_timestamp.get_millisecond(), + "col_timestamp mismatch" + ); + assert_eq!( + found_row.get_timestamp_ltz(14, 6).get_epoch_millisecond(), + col_timestamp_ltz.get_epoch_millisecond(), + "col_timestamp_ltz mismatch" + ); + assert_eq!(found_row.get_bytes(15), col_bytes, "col_bytes mismatch"); + assert_eq!( + found_row.get_binary(16, 20), + col_binary, + "col_binary mismatch" + ); + + // Test with null values for nullable columns + let pk_int_2 = 2i32; + let mut row_with_nulls = GenericRow::new(); + row_with_nulls.set_field(0, pk_int_2); + row_with_nulls.set_field(1, Datum::Null); // col_boolean + row_with_nulls.set_field(2, Datum::Null); // col_tinyint + row_with_nulls.set_field(3, Datum::Null); // col_smallint + row_with_nulls.set_field(4, Datum::Null); // col_int + row_with_nulls.set_field(5, Datum::Null); // col_bigint + row_with_nulls.set_field(6, Datum::Null); // col_float + row_with_nulls.set_field(7, Datum::Null); // col_double + row_with_nulls.set_field(8, Datum::Null); // col_char + row_with_nulls.set_field(9, Datum::Null); // col_string + row_with_nulls.set_field(10, Datum::Null); // col_decimal + row_with_nulls.set_field(11, Datum::Null); // col_date + row_with_nulls.set_field(12, Datum::Null); // col_time + row_with_nulls.set_field(13, Datum::Null); // col_timestamp + row_with_nulls.set_field(14, Datum::Null); // col_timestamp_ltz + row_with_nulls.set_field(15, Datum::Null); // col_bytes + row_with_nulls.set_field(16, Datum::Null); // col_binary + + upsert_writer + .upsert(&row_with_nulls) + .await + .expect("Failed to upsert row with nulls"); + + // Lookup row with nulls + let mut key2 = GenericRow::new(); + key2.set_field(0, pk_int_2); + + let result = lookuper.lookup(&key2).await.expect("Failed to lookup"); + let found_row_nulls = result + .get_single_row() + .expect("Failed to get row") + .expect("Row should exist"); + + // Verify all nullable columns are null + assert_eq!(found_row_nulls.get_int(0), pk_int_2, "pk_int mismatch"); + assert!(found_row_nulls.is_null_at(1), "col_boolean should be null"); + assert!(found_row_nulls.is_null_at(2), "col_tinyint should be null"); + assert!(found_row_nulls.is_null_at(3), "col_smallint should be null"); + assert!(found_row_nulls.is_null_at(4), "col_int should be null"); + assert!(found_row_nulls.is_null_at(5), "col_bigint should be null"); + assert!(found_row_nulls.is_null_at(6), "col_float should be null"); + assert!(found_row_nulls.is_null_at(7), "col_double should be null"); + assert!(found_row_nulls.is_null_at(8), "col_char should be null"); + assert!(found_row_nulls.is_null_at(9), "col_string should be null"); + assert!(found_row_nulls.is_null_at(10), "col_decimal should be null"); + assert!(found_row_nulls.is_null_at(11), "col_date should be null"); + assert!(found_row_nulls.is_null_at(12), "col_time should be null"); + assert!(found_row_nulls.is_null_at(13), "col_timestamp should be null"); + assert!( + found_row_nulls.is_null_at(14), + "col_timestamp_ltz should be null" + ); + assert!(found_row_nulls.is_null_at(15), "col_bytes should be null"); + assert!(found_row_nulls.is_null_at(16), "col_binary should be null"); + + admin + .drop_table(&table_path, false) + .await + .expect("Failed to drop table"); + } } From 9d1e0b29d508d2ec0c676145c637585df65be7bc Mon Sep 17 00:00:00 2001 From: guoxuanlin Date: Fri, 23 Jan 2026 22:05:51 +0800 Subject: [PATCH 2/2] cargo fmt --- crates/fluss/tests/integration/kv_table.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/crates/fluss/tests/integration/kv_table.rs b/crates/fluss/tests/integration/kv_table.rs index 850ccfa..3f46f9f 100644 --- a/crates/fluss/tests/integration/kv_table.rs +++ b/crates/fluss/tests/integration/kv_table.rs @@ -452,8 +452,7 @@ mod kv_table_test { let admin = connection.get_admin().await.expect("Failed to get admin"); - let table_path = - TablePath::new("fluss".to_string(), "test_all_datatypes".to_string()); + let table_path = TablePath::new("fluss".to_string(), "test_all_datatypes".to_string()); // Create a table with all supported primitive datatypes let table_descriptor = TableDescriptor::builder() @@ -674,7 +673,10 @@ mod kv_table_test { assert!(found_row_nulls.is_null_at(10), "col_decimal should be null"); assert!(found_row_nulls.is_null_at(11), "col_date should be null"); assert!(found_row_nulls.is_null_at(12), "col_time should be null"); - assert!(found_row_nulls.is_null_at(13), "col_timestamp should be null"); + assert!( + found_row_nulls.is_null_at(13), + "col_timestamp should be null" + ); assert!( found_row_nulls.is_null_at(14), "col_timestamp_ltz should be null"