diff --git a/crates/fluss/src/row/column.rs b/crates/fluss/src/row/column.rs index 615e038..3b12150 100644 --- a/crates/fluss/src/row/column.rs +++ b/crates/fluss/src/row/column.rs @@ -17,8 +17,11 @@ use crate::row::InternalRow; use arrow::array::{ - Array, AsArray, BinaryArray, Decimal128Array, FixedSizeBinaryArray, Float32Array, Float64Array, - Int8Array, Int16Array, Int32Array, Int64Array, RecordBatch, StringArray, + Array, AsArray, BinaryArray, Date32Array, Decimal128Array, FixedSizeBinaryArray, Float32Array, + Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, RecordBatch, StringArray, + Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray, Time64NanosecondArray, + TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, + TimestampSecondArray, }; use arrow::datatypes::{DataType as ArrowDataType, TimeUnit}; use std::sync::Arc; @@ -67,35 +70,110 @@ impl ColumnarRow { ) -> T { let schema = self.record_batch.schema(); let arrow_field = schema.field(pos); - let value = self.get_long(pos); + let column = self.record_batch.column(pos); - match arrow_field.data_type() { - ArrowDataType::Timestamp(time_unit, _) => { - // Convert based on Arrow TimeUnit - let (millis, nanos) = match time_unit { - TimeUnit::Second => (value * 1000, 0), - TimeUnit::Millisecond => (value, 0), - TimeUnit::Microsecond => { - let millis = value / 1000; - let nanos = ((value % 1000) * 1000) as i32; - (millis, nanos) - } - TimeUnit::Nanosecond => { - let millis = value / 1_000_000; - let nanos = (value % 1_000_000) as i32; - (millis, nanos) - } - }; - - if nanos == 0 { - construct_compact(millis) - } else { - // nanos is guaranteed to be in valid range [0, 999_999] by arithmetic - construct_with_nanos(millis, nanos) - .expect("nanos in valid range by construction") + // Read value based on the actual Arrow timestamp type + let value = match arrow_field.data_type() { + ArrowDataType::Timestamp(TimeUnit::Second, _) => column + .as_any() + .downcast_ref::() + .expect("Expected TimestampSecondArray") + .value(self.row_id), + ArrowDataType::Timestamp(TimeUnit::Millisecond, _) => column + .as_any() + .downcast_ref::() + .expect("Expected TimestampMillisecondArray") + .value(self.row_id), + ArrowDataType::Timestamp(TimeUnit::Microsecond, _) => column + .as_any() + .downcast_ref::() + .expect("Expected TimestampMicrosecondArray") + .value(self.row_id), + ArrowDataType::Timestamp(TimeUnit::Nanosecond, _) => column + .as_any() + .downcast_ref::() + .expect("Expected TimestampNanosecondArray") + .value(self.row_id), + other => panic!("Expected Timestamp column at position {pos}, got {other:?}"), + }; + + // Convert based on Arrow TimeUnit + let (millis, nanos) = match arrow_field.data_type() { + ArrowDataType::Timestamp(time_unit, _) => match time_unit { + TimeUnit::Second => (value * 1000, 0), + TimeUnit::Millisecond => (value, 0), + TimeUnit::Microsecond => { + // Use Euclidean division so that nanos is always non-negative, + // even for timestamps before the Unix epoch. + let millis = value.div_euclid(1000); + let nanos = (value.rem_euclid(1000) * 1000) as i32; + (millis, nanos) } + TimeUnit::Nanosecond => { + // Use Euclidean division so that nanos is always in [0, 999_999]. + let millis = value.div_euclid(1_000_000); + let nanos = value.rem_euclid(1_000_000) as i32; + (millis, nanos) + } + }, + _ => unreachable!(), + }; + + if nanos == 0 { + construct_compact(millis) + } else { + // nanos is guaranteed to be in valid range [0, 999_999] by arithmetic + construct_with_nanos(millis, nanos).expect("nanos in valid range by construction") + } + } + + /// Read date value from Arrow Date32Array + fn read_date_from_arrow(&self, pos: usize) -> i32 { + self.record_batch + .column(pos) + .as_any() + .downcast_ref::() + .expect("Expected Date32Array") + .value(self.row_id) + } + + /// Read time value from Arrow Time32/Time64 arrays, converting to milliseconds + fn read_time_from_arrow(&self, pos: usize) -> i32 { + let schema = self.record_batch.schema(); + let arrow_field = schema.field(pos); + let column = self.record_batch.column(pos); + + match arrow_field.data_type() { + ArrowDataType::Time32(TimeUnit::Second) => { + let value = column + .as_any() + .downcast_ref::() + .expect("Expected Time32SecondArray") + .value(self.row_id); + value * 1000 // Convert seconds to milliseconds } - other => panic!("Expected Timestamp column at position {pos}, got {other:?}"), + ArrowDataType::Time32(TimeUnit::Millisecond) => column + .as_any() + .downcast_ref::() + .expect("Expected Time32MillisecondArray") + .value(self.row_id), + ArrowDataType::Time64(TimeUnit::Microsecond) => { + let value = column + .as_any() + .downcast_ref::() + .expect("Expected Time64MicrosecondArray") + .value(self.row_id); + (value / 1000) as i32 // Convert microseconds to milliseconds + } + ArrowDataType::Time64(TimeUnit::Nanosecond) => { + let value = column + .as_any() + .downcast_ref::() + .expect("Expected Time64NanosecondArray") + .value(self.row_id); + (value / 1_000_000) as i32 // Convert nanoseconds to milliseconds + } + other => panic!("Expected Time column at position {pos}, got {other:?}"), } } } @@ -223,11 +301,11 @@ impl InternalRow for ColumnarRow { } fn get_date(&self, pos: usize) -> crate::row::datum::Date { - crate::row::datum::Date::new(self.get_int(pos)) + crate::row::datum::Date::new(self.read_date_from_arrow(pos)) } fn get_time(&self, pos: usize) -> crate::row::datum::Time { - crate::row::datum::Time::new(self.get_int(pos)) + crate::row::datum::Time::new(self.read_time_from_arrow(pos)) } fn get_timestamp_ntz(&self, pos: usize, precision: u32) -> crate::row::datum::TimestampNtz { @@ -253,16 +331,33 @@ impl InternalRow for ColumnarRow { } fn get_char(&self, pos: usize, _length: usize) -> &str { - let array = self - .record_batch - .column(pos) - .as_any() - .downcast_ref::() - .expect("Expected fixed-size binary array for char type"); + let column = self.record_batch.column(pos); + let schema = self.record_batch.schema(); + let arrow_field = schema.field(pos); - let bytes = array.value(self.row_id); - // don't check length, following java client - std::str::from_utf8(bytes).expect("Invalid UTF-8 in char field") + match arrow_field.data_type() { + ArrowDataType::FixedSizeBinary(_) => { + // KV table format: char stored as FixedSizeBinary + let array = column + .as_any() + .downcast_ref::() + .expect("Expected fixed-size binary array for char type"); + let bytes = array.value(self.row_id); + // don't check length, following java client + std::str::from_utf8(bytes).expect("Invalid UTF-8 in char field") + } + ArrowDataType::Utf8 => { + // Log table format: char stored as Utf8 + column + .as_any() + .downcast_ref::() + .expect("Expected String array for char type") + .value(self.row_id) + } + other => panic!( + "Expected FixedSizeBinary or Utf8 for char type at position {pos}, got {other:?}" + ), + } } fn get_string(&self, pos: usize) -> &str { diff --git a/crates/fluss/tests/integration/table.rs b/crates/fluss/tests/integration/table.rs index ef73b56..8dbe4bd 100644 --- a/crates/fluss/tests/integration/table.rs +++ b/crates/fluss/tests/integration/table.rs @@ -570,4 +570,395 @@ mod table_test { // Projected batch should have 1 column (id), not 2 (id, name) assert_eq!(proj_batches[0].num_columns(), 1); } + + /// Integration test covering produce and scan operations for all supported datatypes + /// in log tables. + #[tokio::test] + async fn all_supported_datatypes() { + use fluss::row::{Date, Datum, Decimal, GenericRow, Time, TimestampLtz, TimestampNtz}; + + let cluster = get_fluss_cluster(); + let connection = cluster.get_fluss_connection().await; + + let admin = connection.get_admin().await.expect("Failed to get admin"); + + let table_path = TablePath::new("fluss".to_string(), "test_log_all_datatypes".to_string()); + + // Create a log table with all supported datatypes for append/scan + let table_descriptor = TableDescriptor::builder() + .schema( + Schema::builder() + // Integer types + .column("col_tinyint", DataTypes::tinyint()) + .column("col_smallint", DataTypes::smallint()) + .column("col_int", DataTypes::int()) + .column("col_bigint", DataTypes::bigint()) + // Floating point types + .column("col_float", DataTypes::float()) + .column("col_double", DataTypes::double()) + // Boolean type + .column("col_boolean", DataTypes::boolean()) + // Char type + .column("col_char", DataTypes::char(10)) + // String type + .column("col_string", DataTypes::string()) + // Decimal type + .column("col_decimal", DataTypes::decimal(10, 2)) + // Date type + .column("col_date", DataTypes::date()) + // Time types + .column("col_time_s", DataTypes::time_with_precision(0)) + .column("col_time_ms", DataTypes::time_with_precision(3)) + .column("col_time_us", DataTypes::time_with_precision(6)) + .column("col_time_ns", DataTypes::time_with_precision(9)) + // Timestamp types + .column("col_timestamp_s", DataTypes::timestamp_with_precision(0)) + .column("col_timestamp_ms", DataTypes::timestamp_with_precision(3)) + .column("col_timestamp_us", DataTypes::timestamp_with_precision(6)) + .column("col_timestamp_ns", DataTypes::timestamp_with_precision(9)) + // Timestamp_ltz types + .column( + "col_timestamp_ltz_s", + DataTypes::timestamp_ltz_with_precision(0), + ) + .column( + "col_timestamp_ltz_ms", + DataTypes::timestamp_ltz_with_precision(3), + ) + .column( + "col_timestamp_ltz_us", + DataTypes::timestamp_ltz_with_precision(6), + ) + .column( + "col_timestamp_ltz_ns", + DataTypes::timestamp_ltz_with_precision(9), + ) + // Bytes type + .column("col_bytes", DataTypes::bytes()) + // Timestamp types with negative values (before Unix epoch) + .column( + "col_timestamp_us_neg", + DataTypes::timestamp_with_precision(6), + ) + .column( + "col_timestamp_ns_neg", + DataTypes::timestamp_with_precision(9), + ) + .column( + "col_timestamp_ltz_us_neg", + DataTypes::timestamp_ltz_with_precision(6), + ) + .column( + "col_timestamp_ltz_ns_neg", + DataTypes::timestamp_ltz_with_precision(9), + ) + .build() + .expect("Failed to build schema"), + ) + .build() + .expect("Failed to build table"); + + create_table(&admin, &table_path, &table_descriptor).await; + + let table = connection + .get_table(&table_path) + .await + .expect("Failed to get table"); + + let field_count = table.table_info().schema.columns().len(); + + let append_writer = table + .new_append() + .expect("Failed to create append") + .create_writer(); + + // Test data for all datatypes + let col_tinyint = 127i8; + let col_smallint = 32767i16; + let col_int = 2147483647i32; + let col_bigint = 9223372036854775807i64; + let col_float = 3.14f32; + let col_double = 2.718281828459045f64; + let col_boolean = true; + let col_char = "hello"; + let col_string = "world of fluss rust client"; + let col_decimal = Decimal::from_unscaled_long(12345, 10, 2).unwrap(); // 123.45 + let col_date = Date::new(20476); // 2026-01-23 + let col_time_s = Time::new(36827000); // 10:13:47 + let col_time_ms = Time::new(36827123); // 10:13:47.123 + let col_time_us = Time::new(86399999); // 23:59:59.999 + let col_time_ns = Time::new(1); // 00:00:00.001 + // 2026-01-23 10:13:47 UTC + let col_timestamp_s = TimestampNtz::new(1769163227000); + // 2026-01-23 10:13:47.123 UTC + let col_timestamp_ms = TimestampNtz::new(1769163227123); + // 2026-01-23 10:13:47.123456 UTC + let col_timestamp_us = TimestampNtz::from_millis_nanos(1769163227123, 456000).unwrap(); + // 2026-01-23 10:13:47.123999999 UTC + let col_timestamp_ns = TimestampNtz::from_millis_nanos(1769163227123, 999_999).unwrap(); + let col_timestamp_ltz_s = TimestampLtz::new(1769163227000); + let col_timestamp_ltz_ms = TimestampLtz::new(1769163227123); + let col_timestamp_ltz_us = TimestampLtz::from_millis_nanos(1769163227123, 456000).unwrap(); + let col_timestamp_ltz_ns = TimestampLtz::from_millis_nanos(1769163227123, 999_999).unwrap(); + let col_bytes: Vec = b"binary data".to_vec(); + + // 1960-06-15 08:30:45.123456 UTC (before 1970) + let col_timestamp_us_neg = TimestampNtz::from_millis_nanos(-301234154877, 456000).unwrap(); + // 1960-06-15 08:30:45.123999999 UTC (before 1970) + let col_timestamp_ns_neg = TimestampNtz::from_millis_nanos(-301234154877, 999_999).unwrap(); + let col_timestamp_ltz_us_neg = + TimestampLtz::from_millis_nanos(-301234154877, 456000).unwrap(); + let col_timestamp_ltz_ns_neg = + TimestampLtz::from_millis_nanos(-301234154877, 999_999).unwrap(); + + // Append a row with all datatypes + let mut row = GenericRow::new(); + row.set_field(0, col_tinyint); + row.set_field(1, col_smallint); + row.set_field(2, col_int); + row.set_field(3, col_bigint); + row.set_field(4, col_float); + row.set_field(5, col_double); + row.set_field(6, col_boolean); + row.set_field(7, col_char); + row.set_field(8, col_string); + row.set_field(9, col_decimal.clone()); + row.set_field(10, col_date); + row.set_field(11, col_time_s); + row.set_field(12, col_time_ms); + row.set_field(13, col_time_us); + row.set_field(14, col_time_ns); + row.set_field(15, col_timestamp_s); + row.set_field(16, col_timestamp_ms); + row.set_field(17, col_timestamp_us.clone()); + row.set_field(18, col_timestamp_ns.clone()); + row.set_field(19, col_timestamp_ltz_s); + row.set_field(20, col_timestamp_ltz_ms); + row.set_field(21, col_timestamp_ltz_us.clone()); + row.set_field(22, col_timestamp_ltz_ns.clone()); + row.set_field(23, col_bytes.as_slice()); + row.set_field(24, col_timestamp_us_neg.clone()); + row.set_field(25, col_timestamp_ns_neg.clone()); + row.set_field(26, col_timestamp_ltz_us_neg.clone()); + row.set_field(27, col_timestamp_ltz_ns_neg.clone()); + + append_writer + .append(row) + .await + .expect("Failed to append row with all datatypes"); + + // Append a row with null values for all columns + let mut row_with_nulls = GenericRow::new(); + for i in 0..field_count { + row_with_nulls.set_field(i, Datum::Null); + } + + append_writer + .append(row_with_nulls) + .await + .expect("Failed to append row with nulls"); + + append_writer.flush().await.expect("Failed to flush"); + + // Scan the records + let records = scan_table(&table, |scan| scan).await; + + assert_eq!(records.len(), 2, "Expected 2 records"); + + let found_row = records[0].row(); + assert_eq!(found_row.get_byte(0), col_tinyint, "col_tinyint mismatch"); + assert_eq!( + found_row.get_short(1), + col_smallint, + "col_smallint mismatch" + ); + assert_eq!(found_row.get_int(2), col_int, "col_int mismatch"); + assert_eq!(found_row.get_long(3), col_bigint, "col_bigint mismatch"); + assert!( + (found_row.get_float(4) - col_float).abs() < f32::EPSILON, + "col_float mismatch: expected {}, got {}", + col_float, + found_row.get_float(4) + ); + assert!( + (found_row.get_double(5) - col_double).abs() < f64::EPSILON, + "col_double mismatch: expected {}, got {}", + col_double, + found_row.get_double(5) + ); + assert_eq!( + found_row.get_boolean(6), + col_boolean, + "col_boolean mismatch" + ); + assert_eq!(found_row.get_char(7, 10), col_char, "col_char mismatch"); + assert_eq!(found_row.get_string(8), col_string, "col_string mismatch"); + assert_eq!( + found_row.get_decimal(9, 10, 2), + col_decimal, + "col_decimal mismatch" + ); + assert_eq!( + found_row.get_date(10).get_inner(), + col_date.get_inner(), + "col_date mismatch" + ); + + assert_eq!( + found_row.get_time(11).get_inner(), + col_time_s.get_inner(), + "col_time_s mismatch" + ); + + assert_eq!( + found_row.get_time(12).get_inner(), + col_time_ms.get_inner(), + "col_time_ms mismatch" + ); + + assert_eq!( + found_row.get_time(13).get_inner(), + col_time_us.get_inner(), + "col_time_us mismatch" + ); + + assert_eq!( + found_row.get_time(14).get_inner(), + col_time_ns.get_inner(), + "col_time_ns mismatch" + ); + + assert_eq!( + found_row.get_timestamp_ntz(15, 0).get_millisecond(), + col_timestamp_s.get_millisecond(), + "col_timestamp_s mismatch" + ); + + assert_eq!( + found_row.get_timestamp_ntz(16, 3).get_millisecond(), + col_timestamp_ms.get_millisecond(), + "col_timestamp_ms mismatch" + ); + + let read_ts_us = found_row.get_timestamp_ntz(17, 6); + assert_eq!( + read_ts_us.get_millisecond(), + col_timestamp_us.get_millisecond(), + "col_timestamp_us millis mismatch" + ); + assert_eq!( + read_ts_us.get_nano_of_millisecond(), + col_timestamp_us.get_nano_of_millisecond(), + "col_timestamp_us nanos mismatch" + ); + + let read_ts_ns = found_row.get_timestamp_ntz(18, 9); + assert_eq!( + read_ts_ns.get_millisecond(), + col_timestamp_ns.get_millisecond(), + "col_timestamp_ns millis mismatch" + ); + assert_eq!( + read_ts_ns.get_nano_of_millisecond(), + col_timestamp_ns.get_nano_of_millisecond(), + "col_timestamp_ns nanos mismatch" + ); + + assert_eq!( + found_row.get_timestamp_ltz(19, 0).get_epoch_millisecond(), + col_timestamp_ltz_s.get_epoch_millisecond(), + "col_timestamp_ltz_s mismatch" + ); + + assert_eq!( + found_row.get_timestamp_ltz(20, 3).get_epoch_millisecond(), + col_timestamp_ltz_ms.get_epoch_millisecond(), + "col_timestamp_ltz_ms mismatch" + ); + + let read_ts_ltz_us = found_row.get_timestamp_ltz(21, 6); + assert_eq!( + read_ts_ltz_us.get_epoch_millisecond(), + col_timestamp_ltz_us.get_epoch_millisecond(), + "col_timestamp_ltz_us millis mismatch" + ); + assert_eq!( + read_ts_ltz_us.get_nano_of_millisecond(), + col_timestamp_ltz_us.get_nano_of_millisecond(), + "col_timestamp_ltz_us nanos mismatch" + ); + + let read_ts_ltz_ns = found_row.get_timestamp_ltz(22, 9); + assert_eq!( + read_ts_ltz_ns.get_epoch_millisecond(), + col_timestamp_ltz_ns.get_epoch_millisecond(), + "col_timestamp_ltz_ns millis mismatch" + ); + assert_eq!( + read_ts_ltz_ns.get_nano_of_millisecond(), + col_timestamp_ltz_ns.get_nano_of_millisecond(), + "col_timestamp_ltz_ns nanos mismatch" + ); + assert_eq!(found_row.get_bytes(23), col_bytes, "col_bytes mismatch"); + + // Verify timestamps before Unix epoch (negative timestamps) + let read_ts_us_neg = found_row.get_timestamp_ntz(24, 6); + assert_eq!( + read_ts_us_neg.get_millisecond(), + col_timestamp_us_neg.get_millisecond(), + "col_timestamp_us_neg millis mismatch" + ); + assert_eq!( + read_ts_us_neg.get_nano_of_millisecond(), + col_timestamp_us_neg.get_nano_of_millisecond(), + "col_timestamp_us_neg nanos mismatch" + ); + + let read_ts_ns_neg = found_row.get_timestamp_ntz(25, 9); + assert_eq!( + read_ts_ns_neg.get_millisecond(), + col_timestamp_ns_neg.get_millisecond(), + "col_timestamp_ns_neg millis mismatch" + ); + assert_eq!( + read_ts_ns_neg.get_nano_of_millisecond(), + col_timestamp_ns_neg.get_nano_of_millisecond(), + "col_timestamp_ns_neg nanos mismatch" + ); + + let read_ts_ltz_us_neg = found_row.get_timestamp_ltz(26, 6); + assert_eq!( + read_ts_ltz_us_neg.get_epoch_millisecond(), + col_timestamp_ltz_us_neg.get_epoch_millisecond(), + "col_timestamp_ltz_us_neg millis mismatch" + ); + assert_eq!( + read_ts_ltz_us_neg.get_nano_of_millisecond(), + col_timestamp_ltz_us_neg.get_nano_of_millisecond(), + "col_timestamp_ltz_us_neg nanos mismatch" + ); + + let read_ts_ltz_ns_neg = found_row.get_timestamp_ltz(27, 9); + assert_eq!( + read_ts_ltz_ns_neg.get_epoch_millisecond(), + col_timestamp_ltz_ns_neg.get_epoch_millisecond(), + "col_timestamp_ltz_ns_neg millis mismatch" + ); + assert_eq!( + read_ts_ltz_ns_neg.get_nano_of_millisecond(), + col_timestamp_ltz_ns_neg.get_nano_of_millisecond(), + "col_timestamp_ltz_ns_neg nanos mismatch" + ); + + // Verify row with all nulls (record index 1) + let found_row_nulls = records[1].row(); + for i in 0..field_count { + assert!(found_row_nulls.is_null_at(i), "column {} should be null", i); + } + + admin + .drop_table(&table_path, false) + .await + .expect("Failed to drop table"); + } }