Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
248 changes: 248 additions & 0 deletions crates/fluss/tests/integration/kv_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,4 +441,252 @@ mod kv_table_test {
.await
.expect("Failed to drop table");
}

/// Integration test covering put and get operations for all supported datatypes.
#[tokio::test]
async fn all_supported_datatypes() {
use fluss::row::{Date, Datum, Decimal, Time, TimestampLtz, TimestampNtz};

let cluster = get_fluss_cluster();
let connection = cluster.get_fluss_connection().await;

let admin = connection.get_admin().await.expect("Failed to get admin");

let table_path = TablePath::new("fluss".to_string(), "test_all_datatypes".to_string());

// Create a table with all supported primitive datatypes
let table_descriptor = TableDescriptor::builder()
.schema(
Schema::builder()
// Primary key column
.column("pk_int", DataTypes::int())
// Boolean type
.column("col_boolean", DataTypes::boolean())
// Integer types
.column("col_tinyint", DataTypes::tinyint())
.column("col_smallint", DataTypes::smallint())
.column("col_int", DataTypes::int())
.column("col_bigint", DataTypes::bigint())
// Floating point types
.column("col_float", DataTypes::float())
.column("col_double", DataTypes::double())
// String types
.column("col_char", DataTypes::char(10))
.column("col_string", DataTypes::string())
// Decimal type
.column("col_decimal", DataTypes::decimal(10, 2))
// Date and time types
.column("col_date", DataTypes::date())
.column("col_time", DataTypes::time())
.column("col_timestamp", DataTypes::timestamp())
.column("col_timestamp_ltz", DataTypes::timestamp_ltz())
// Binary types
.column("col_bytes", DataTypes::bytes())
.column("col_binary", DataTypes::binary(20))
.primary_key(vec!["pk_int".to_string()])
.build()
.expect("Failed to build schema"),
)
.build()
.expect("Failed to build table");

create_table(&admin, &table_path, &table_descriptor).await;

let table = connection
.get_table(&table_path)
.await
.expect("Failed to get table");

let table_upsert = table.new_upsert().expect("Failed to create upsert");
let mut upsert_writer = table_upsert
.create_writer()
.expect("Failed to create writer");

// Test data for all datatypes
let pk_int = 1i32;
let col_boolean = true;
let col_tinyint = 127i8;
let col_smallint = 32767i16;
let col_int = 2147483647i32;
let col_bigint = 9223372036854775807i64;
let col_float = 3.14f32;
let col_double = 2.718281828459045f64;
let col_char = "hello";
let col_string = "world of fluss rust client";
let col_decimal = Decimal::from_unscaled_long(12345, 10, 2).unwrap(); // 123.45
let col_date = Date::new(20476); // 2026-01-23
let col_time = Time::new(36827123); // 10:13:47.123
let col_timestamp = TimestampNtz::new(1769163227123); // 2026-01-23 10:13:47.123 UTC
let col_timestamp_ltz = TimestampLtz::new(1769163227123); // 2026-01-23 10:13:47.123 UTC
let col_bytes: &[u8] = b"binary data";
let col_binary: &[u8] = b"fixed binary data!!!";

// Upsert a row with all datatypes
let mut row = GenericRow::new();
row.set_field(0, pk_int);
row.set_field(1, col_boolean);
row.set_field(2, col_tinyint);
row.set_field(3, col_smallint);
row.set_field(4, col_int);
row.set_field(5, col_bigint);
row.set_field(6, col_float);
row.set_field(7, col_double);
row.set_field(8, col_char);
row.set_field(9, col_string);
row.set_field(10, col_decimal.clone());
row.set_field(11, col_date);
row.set_field(12, col_time);
row.set_field(13, col_timestamp);
row.set_field(14, col_timestamp_ltz);
row.set_field(15, col_bytes);
row.set_field(16, col_binary);

upsert_writer
.upsert(&row)
.await
.expect("Failed to upsert row with all datatypes");

// Lookup the record
let mut lookuper = table
.new_lookup()
.expect("Failed to create lookup")
.create_lookuper()
.expect("Failed to create lookuper");

let mut key = GenericRow::new();
key.set_field(0, pk_int);

let result = lookuper.lookup(&key).await.expect("Failed to lookup");
let found_row = result
.get_single_row()
.expect("Failed to get row")
.expect("Row should exist");

// Verify all datatypes
assert_eq!(found_row.get_int(0), pk_int, "pk_int mismatch");
assert_eq!(
found_row.get_boolean(1),
col_boolean,
"col_boolean mismatch"
);
assert_eq!(found_row.get_byte(2), col_tinyint, "col_tinyint mismatch");
assert_eq!(
found_row.get_short(3),
col_smallint,
"col_smallint mismatch"
);
assert_eq!(found_row.get_int(4), col_int, "col_int mismatch");
assert_eq!(found_row.get_long(5), col_bigint, "col_bigint mismatch");
assert!(
(found_row.get_float(6) - col_float).abs() < f32::EPSILON,
"col_float mismatch: expected {}, got {}",
col_float,
found_row.get_float(6)
);
assert!(
(found_row.get_double(7) - col_double).abs() < f64::EPSILON,
"col_double mismatch: expected {}, got {}",
col_double,
found_row.get_double(7)
);
assert_eq!(found_row.get_char(8, 10), col_char, "col_char mismatch");
assert_eq!(found_row.get_string(9), col_string, "col_string mismatch");
assert_eq!(
found_row.get_decimal(10, 10, 2),
col_decimal,
"col_decimal mismatch"
);
assert_eq!(
found_row.get_date(11).get_inner(),
col_date.get_inner(),
"col_date mismatch"
);
assert_eq!(
found_row.get_time(12).get_inner(),
col_time.get_inner(),
"col_time mismatch"
);
assert_eq!(
found_row.get_timestamp_ntz(13, 6).get_millisecond(),
col_timestamp.get_millisecond(),
"col_timestamp mismatch"
);
assert_eq!(
found_row.get_timestamp_ltz(14, 6).get_epoch_millisecond(),
col_timestamp_ltz.get_epoch_millisecond(),
"col_timestamp_ltz mismatch"
);
assert_eq!(found_row.get_bytes(15), col_bytes, "col_bytes mismatch");
assert_eq!(
found_row.get_binary(16, 20),
col_binary,
"col_binary mismatch"
);

// Test with null values for nullable columns
let pk_int_2 = 2i32;
let mut row_with_nulls = GenericRow::new();
row_with_nulls.set_field(0, pk_int_2);
row_with_nulls.set_field(1, Datum::Null); // col_boolean
row_with_nulls.set_field(2, Datum::Null); // col_tinyint
row_with_nulls.set_field(3, Datum::Null); // col_smallint
row_with_nulls.set_field(4, Datum::Null); // col_int
row_with_nulls.set_field(5, Datum::Null); // col_bigint
row_with_nulls.set_field(6, Datum::Null); // col_float
row_with_nulls.set_field(7, Datum::Null); // col_double
row_with_nulls.set_field(8, Datum::Null); // col_char
row_with_nulls.set_field(9, Datum::Null); // col_string
row_with_nulls.set_field(10, Datum::Null); // col_decimal
row_with_nulls.set_field(11, Datum::Null); // col_date
row_with_nulls.set_field(12, Datum::Null); // col_time
row_with_nulls.set_field(13, Datum::Null); // col_timestamp
row_with_nulls.set_field(14, Datum::Null); // col_timestamp_ltz
row_with_nulls.set_field(15, Datum::Null); // col_bytes
row_with_nulls.set_field(16, Datum::Null); // col_binary

upsert_writer
.upsert(&row_with_nulls)
.await
.expect("Failed to upsert row with nulls");

// Lookup row with nulls
let mut key2 = GenericRow::new();
key2.set_field(0, pk_int_2);

let result = lookuper.lookup(&key2).await.expect("Failed to lookup");
let found_row_nulls = result
.get_single_row()
.expect("Failed to get row")
.expect("Row should exist");

// Verify all nullable columns are null
assert_eq!(found_row_nulls.get_int(0), pk_int_2, "pk_int mismatch");
assert!(found_row_nulls.is_null_at(1), "col_boolean should be null");
assert!(found_row_nulls.is_null_at(2), "col_tinyint should be null");
assert!(found_row_nulls.is_null_at(3), "col_smallint should be null");
assert!(found_row_nulls.is_null_at(4), "col_int should be null");
assert!(found_row_nulls.is_null_at(5), "col_bigint should be null");
assert!(found_row_nulls.is_null_at(6), "col_float should be null");
assert!(found_row_nulls.is_null_at(7), "col_double should be null");
assert!(found_row_nulls.is_null_at(8), "col_char should be null");
assert!(found_row_nulls.is_null_at(9), "col_string should be null");
assert!(found_row_nulls.is_null_at(10), "col_decimal should be null");
assert!(found_row_nulls.is_null_at(11), "col_date should be null");
assert!(found_row_nulls.is_null_at(12), "col_time should be null");
assert!(
found_row_nulls.is_null_at(13),
"col_timestamp should be null"
);
assert!(
found_row_nulls.is_null_at(14),
"col_timestamp_ltz should be null"
);
assert!(found_row_nulls.is_null_at(15), "col_bytes should be null");
assert!(found_row_nulls.is_null_at(16), "col_binary should be null");

admin
.drop_table(&table_path, false)
.await
.expect("Failed to drop table");
}
}
Loading