diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml index 4da8bf83..ff4d687d 100644 --- a/bindings/python/Cargo.toml +++ b/bindings/python/Cargo.toml @@ -36,3 +36,4 @@ arrow-schema = "57.0.0" arrow-array = "57.0.0" pyo3-async-runtimes = { version = "0.26.0", features = ["tokio-runtime"] } jiff = { workspace = true } +bigdecimal = "0.4" diff --git a/bindings/python/example/example.py b/bindings/python/example/example.py index f1f20d15..730416bb 100644 --- a/bindings/python/example/example.py +++ b/bindings/python/example/example.py @@ -17,6 +17,8 @@ import asyncio import time +from datetime import date, time as dt_time, datetime +from decimal import Decimal import pandas as pd import pyarrow as pa @@ -45,6 +47,11 @@ async def main(): pa.field("name", pa.string()), pa.field("score", pa.float32()), pa.field("age", pa.int32()), + pa.field("birth_date", pa.date32()), + pa.field("check_in_time", pa.time32("ms")), + pa.field("created_at", pa.timestamp("us")), # TIMESTAMP (NTZ) + pa.field("updated_at", pa.timestamp("us", tz="UTC")), # TIMESTAMP_LTZ + pa.field("salary", pa.decimal128(10, 2)), ] # Create a PyArrow schema @@ -60,7 +67,7 @@ async def main(): admin = await conn.get_admin() # Create a Fluss table - table_path = fluss.TablePath("fluss", "sample_table") + table_path = fluss.TablePath("fluss", "sample_table_types") try: await admin.create_table(table_path, table_descriptor, True) @@ -96,6 +103,11 @@ async def main(): pa.array(["Alice", "Bob", "Charlie"], type=pa.string()), pa.array([95.2, 87.2, 92.1], type=pa.float32()), pa.array([25, 30, 35], type=pa.int32()), + pa.array([date(1999, 5, 15), date(1994, 3, 20), date(1989, 11, 8)], type=pa.date32()), + pa.array([dt_time(9, 0, 0), dt_time(9, 30, 0), dt_time(10, 0, 0)], type=pa.time32("ms")), + pa.array([datetime(2024, 1, 15, 10, 30), datetime(2024, 1, 15, 11, 0), datetime(2024, 1, 15, 11, 30)], type=pa.timestamp("us")), + pa.array([datetime(2024, 1, 15, 10, 30), datetime(2024, 1, 15, 11, 0), datetime(2024, 1, 15, 11, 30)], type=pa.timestamp("us", tz="UTC")), + pa.array([Decimal("75000.00"), Decimal("82000.50"), Decimal("95000.75")], type=pa.decimal128(10, 2)), ], schema=schema, ) @@ -111,6 +123,11 @@ async def main(): pa.array(["David", "Eve"], type=pa.string()), pa.array([88.5, 91.0], type=pa.float32()), pa.array([28, 32], type=pa.int32()), + pa.array([date(1996, 7, 22), date(1992, 12, 1)], type=pa.date32()), + pa.array([dt_time(14, 15, 0), dt_time(8, 45, 0)], type=pa.time32("ms")), + pa.array([datetime(2024, 1, 16, 9, 0), datetime(2024, 1, 16, 9, 30)], type=pa.timestamp("us")), + pa.array([datetime(2024, 1, 16, 9, 0), datetime(2024, 1, 16, 9, 30)], type=pa.timestamp("us", tz="UTC")), + pa.array([Decimal("68000.00"), Decimal("72500.25")], type=pa.decimal128(10, 2)), ], schema=schema, ) @@ -118,15 +135,32 @@ async def main(): append_writer.write_arrow_batch(pa_record_batch) print("Successfully wrote PyArrow RecordBatch") - # Test 3: Append single rows - print("\n--- Testing single row append ---") - # Dict input - await append_writer.append({"id": 8, "name": "Helen", "score": 93.5, "age": 26}) - print("Successfully appended row (dict)") - - # List input - await append_writer.append([9, "Ivan", 90.0, 31]) - print("Successfully appended row (list)") + # Test 3: Append single rows with Date, Time, Timestamp, Decimal + print("\n--- Testing single row append with temporal/decimal types ---") + # Dict input with all types including Date, Time, Timestamp, Decimal + await append_writer.append({ + "id": 8, + "name": "Helen", + "score": 93.5, + "age": 26, + "birth_date": date(1998, 4, 10), + "check_in_time": dt_time(11, 30, 45), + "created_at": datetime(2024, 1, 17, 14, 0, 0), + "updated_at": datetime(2024, 1, 17, 14, 0, 0), + "salary": Decimal("88000.00"), + }) + print("Successfully appended row (dict with Date, Time, Timestamp, Decimal)") + + # List input with all types + await append_writer.append([ + 9, "Ivan", 90.0, 31, + date(1993, 8, 25), + dt_time(16, 45, 0), + datetime(2024, 1, 17, 15, 30, 0), + datetime(2024, 1, 17, 15, 30, 0), + Decimal("91500.50"), + ]) + print("Successfully appended row (list with Date, Time, Timestamp, Decimal)") # Test 4: Write Pandas DataFrame print("\n--- Testing Pandas DataFrame write ---") @@ -136,6 +170,11 @@ async def main(): "name": ["Frank", "Grace"], "score": [89.3, 94.7], "age": [29, 27], + "birth_date": [date(1995, 2, 14), date(1997, 9, 30)], + "check_in_time": [dt_time(10, 0, 0), dt_time(10, 30, 0)], + "created_at": [datetime(2024, 1, 18, 8, 0), datetime(2024, 1, 18, 8, 30)], + "updated_at": [datetime(2024, 1, 18, 8, 0), datetime(2024, 1, 18, 8, 30)], + "salary": [Decimal("79000.00"), Decimal("85500.75")], } ) diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs index 773354e8..b56a29db 100644 --- a/bindings/python/src/table.rs +++ b/bindings/python/src/table.rs @@ -18,12 +18,22 @@ use crate::TOKIO_RUNTIME; use crate::*; use arrow::array::RecordBatch; -use arrow_pyarrow::FromPyArrow; +use arrow_pyarrow::{FromPyArrow, ToPyArrow}; use fluss::client::EARLIEST_OFFSET; use fluss::rpc::message::OffsetSpec; +use pyo3::types::IntoPyDict; use pyo3_async_runtimes::tokio::future_into_py; use std::sync::Arc; +// Time conversion constants +const MILLIS_PER_SECOND: i64 = 1_000; +const MILLIS_PER_MINUTE: i64 = 60_000; +const MILLIS_PER_HOUR: i64 = 3_600_000; +const MICROS_PER_MILLI: i64 = 1_000; +const MICROS_PER_SECOND: i64 = 1_000_000; +const MICROS_PER_DAY: i64 = 86_400_000_000; +const NANOS_PER_MILLI: i64 = 1_000_000; + /// Represents a Fluss table for data operations #[pyclass] pub struct FlussTable { @@ -246,14 +256,29 @@ impl AppendWriter { /// Write Pandas DataFrame data pub fn write_pandas(&self, py: Python, df: Py) -> PyResult<()> { + // Get the expected Arrow schema from the Fluss table + let row_type = self.table_info.get_row_type(); + let expected_schema = fcore::record::to_arrow_schema(row_type) + .map_err(|e| FlussError::new_err(format!("Failed to get table schema: {}", e)))?; + + // Convert Arrow schema to PyArrow schema + let py_schema = expected_schema + .as_ref() + .to_pyarrow(py) + .map_err(|e| FlussError::new_err(format!("Failed to convert schema: {}", e)))?; + // Import pyarrow module let pyarrow = py.import("pyarrow")?; // Get the Table class from pyarrow module let table_class = pyarrow.getattr("Table")?; - // Call Table.from_pandas(df) - from_pandas is a class method - let pa_table = table_class.call_method1("from_pandas", (df,))?; + // Call Table.from_pandas(df, schema=expected_schema) to ensure proper type casting + let pa_table = table_class.call_method( + "from_pandas", + (df,), + Some(&[("schema", py_schema)].into_py_dict(py)?), + )?; // Then call write_arrow with the converted table self.write_arrow(py, pa_table.into()) @@ -473,12 +498,393 @@ fn python_value_to_datum( ))) } } + fcore::metadata::DataType::Decimal(decimal_type) => { + python_decimal_to_datum(value, decimal_type.precision(), decimal_type.scale()) + } + fcore::metadata::DataType::Date(_) => python_date_to_datum(value), + fcore::metadata::DataType::Time(_) => python_time_to_datum(value), + fcore::metadata::DataType::Timestamp(_) => python_datetime_to_timestamp_ntz(value), + fcore::metadata::DataType::TimestampLTz(_) => python_datetime_to_timestamp_ltz(value), _ => Err(FlussError::new_err(format!( "Unsupported data type for row-level operations: {data_type}" ))), } } +/// Cached decimal.Decimal type +/// Uses PyOnceLock for thread-safety and subinterpreter compatibility. +static DECIMAL_TYPE: pyo3::sync::PyOnceLock> = + pyo3::sync::PyOnceLock::new(); + +/// Cached UTC epoch type +static UTC_EPOCH: pyo3::sync::PyOnceLock> = pyo3::sync::PyOnceLock::new(); + +/// Get the cached decimal.Decimal type, importing it once per interpreter. +fn get_decimal_type(py: Python) -> PyResult> { + let ty = DECIMAL_TYPE.get_or_try_init(py, || -> PyResult<_> { + let decimal_mod = py.import("decimal")?; + let decimal_ty = decimal_mod + .getattr("Decimal")? + .downcast_into::()?; + Ok(decimal_ty.unbind()) + })?; + Ok(ty.bind(py).clone()) +} + +/// Get the cached UTC epoch datetime, creating it once per interpreter. +fn get_utc_epoch(py: Python) -> PyResult> { + let epoch = UTC_EPOCH.get_or_try_init(py, || -> PyResult<_> { + let datetime_mod = py.import("datetime")?; + let timezone = datetime_mod.getattr("timezone")?; + let utc = timezone.getattr("utc")?; + let epoch = datetime_mod + .getattr("datetime")? + .call1((1970, 1, 1, 0, 0, 0, 0, &utc))?; + Ok(epoch.unbind()) + })?; + Ok(epoch.bind(py).clone()) +} + +/// Validate that value is a decimal.Decimal instance. +fn ensure_is_decimal(value: &Bound) -> PyResult<()> { + let decimal_ty = get_decimal_type(value.py())?; + if !value.is_instance(&decimal_ty.into_any())? { + return Err(FlussError::new_err(format!( + "Expected decimal.Decimal, got {}", + get_type_name(value) + ))); + } + Ok(()) +} + +/// Convert Python decimal.Decimal to Datum::Decimal. +/// Only accepts decimal.Decimal +fn python_decimal_to_datum( + value: &Bound, + precision: u32, + scale: u32, +) -> PyResult> { + use std::str::FromStr; + + ensure_is_decimal(value)?; + + let decimal_str: String = value.str()?.extract()?; + let bd = bigdecimal::BigDecimal::from_str(&decimal_str).map_err(|e| { + FlussError::new_err(format!("Failed to parse decimal '{}': {}", decimal_str, e)) + })?; + + let decimal = fcore::row::Decimal::from_big_decimal(bd, precision, scale).map_err(|e| { + FlussError::new_err(format!( + "Failed to convert decimal '{}' to DECIMAL({}, {}): {}", + decimal_str, precision, scale, e + )) + })?; + + Ok(fcore::row::Datum::Decimal(decimal)) +} + +/// Convert Python datetime.date to Datum::Date. +fn python_date_to_datum(value: &Bound) -> PyResult> { + use pyo3::types::{PyDate, PyDateAccess, PyDateTime}; + + // Reject datetime.datetime (subclass of date) - use timestamp columns for those + if value.downcast::().is_ok() { + return Err(FlussError::new_err( + "Expected datetime.date, got datetime.datetime. Use a TIMESTAMP column for datetime values.", + )); + } + + let date = value.downcast::().map_err(|_| { + FlussError::new_err(format!( + "Expected datetime.date, got {}", + get_type_name(value) + )) + })?; + + let year = date.get_year(); + let month = date.get_month(); + let day = date.get_day(); + + // Calculate days since Unix epoch (1970-01-01) + let civil_date = jiff::civil::date(year as i16, month as i8, day as i8); + let epoch = jiff::civil::date(1970, 1, 1); + let days_since_epoch = (civil_date - epoch).get_days(); + + Ok(fcore::row::Datum::Date(fcore::row::Date::new( + days_since_epoch, + ))) +} + +/// Convert Python datetime.time to Datum::Time. +/// Uses PyO3's native PyTime type for efficient access. +/// +/// Note: Fluss TIME is always stored as milliseconds since midnight (i32) regardless +/// of the schema's precision setting. This matches the Java Fluss wire protocol. +/// Sub-millisecond precision (microseconds not divisible by 1000) will raise an error +/// to prevent silent data loss and ensure fail-fast behavior. +fn python_time_to_datum(value: &Bound) -> PyResult> { + use pyo3::types::{PyTime, PyTimeAccess}; + + let time = value.downcast::().map_err(|_| { + FlussError::new_err(format!( + "Expected datetime.time, got {}", + get_type_name(value) + )) + })?; + + let hour = time.get_hour() as i32; + let minute = time.get_minute() as i32; + let second = time.get_second() as i32; + let microsecond = time.get_microsecond() as i32; + + // Strict validation: reject sub-millisecond precision + if microsecond % MICROS_PER_MILLI as i32 != 0 { + return Err(FlussError::new_err(format!( + "TIME values with sub-millisecond precision are not supported. \ + Got time with {} microseconds (not divisible by 1000). \ + Fluss stores TIME as milliseconds since midnight. \ + Please round to milliseconds before insertion.", + microsecond + ))); + } + + // Convert to milliseconds since midnight + let millis = hour * MILLIS_PER_HOUR as i32 + + minute * MILLIS_PER_MINUTE as i32 + + second * MILLIS_PER_SECOND as i32 + + microsecond / MICROS_PER_MILLI as i32; + + Ok(fcore::row::Datum::Time(fcore::row::Time::new(millis))) +} + +/// Convert Python datetime-like object to Datum::TimestampNtz. +/// Supports: datetime.datetime (naive preferred), pd.Timestamp, np.datetime64 +fn python_datetime_to_timestamp_ntz(value: &Bound) -> PyResult> { + let (epoch_millis, nano_of_milli) = extract_datetime_components_ntz(value)?; + + let ts = fcore::row::TimestampNtz::from_millis_nanos(epoch_millis, nano_of_milli) + .map_err(|e| FlussError::new_err(format!("Failed to create TimestampNtz: {}", e)))?; + + Ok(fcore::row::Datum::TimestampNtz(ts)) +} + +/// Convert Python datetime-like object to Datum::TimestampLtz. +/// For naive datetimes, assumes UTC. For aware datetimes, converts to UTC. +/// Supports: datetime.datetime, pd.Timestamp, np.datetime64 +fn python_datetime_to_timestamp_ltz(value: &Bound) -> PyResult> { + let (epoch_millis, nano_of_milli) = extract_datetime_components_ltz(value)?; + + let ts = fcore::row::TimestampLtz::from_millis_nanos(epoch_millis, nano_of_milli) + .map_err(|e| FlussError::new_err(format!("Failed to create TimestampLtz: {}", e)))?; + + Ok(fcore::row::Datum::TimestampLtz(ts)) +} + +/// Extract epoch milliseconds for TimestampNtz (wall-clock time, no timezone conversion). +/// Uses integer arithmetic to avoid float precision issues. +/// For clarity, tz-aware datetimes are rejected - use TimestampLtz for those. +fn extract_datetime_components_ntz(value: &Bound) -> PyResult<(i64, i32)> { + use pyo3::types::PyDateTime; + + // Try PyDateTime first + if let Ok(dt) = value.downcast::() { + // Reject tz-aware datetime for NTZ - it's ambiguous what the user wants + let tzinfo = dt.getattr("tzinfo")?; + if !tzinfo.is_none() { + return Err(FlussError::new_err( + "TIMESTAMP (without timezone) requires a naive datetime. \ + Got timezone-aware datetime. Either remove tzinfo or use TIMESTAMP_LTZ column.", + )); + } + return datetime_to_epoch_millis_as_utc(dt); + } + + // Check for pandas Timestamp by verifying module name + if is_pandas_timestamp(value) { + // For NTZ, reject tz-aware pandas Timestamps for consistency with datetime behavior + if let Ok(tz) = value.getattr("tz") { + if !tz.is_none() { + return Err(FlussError::new_err( + "TIMESTAMP (without timezone) requires a naive pd.Timestamp. \ + Got timezone-aware Timestamp. Either use tz_localize(None) or use TIMESTAMP_LTZ column.", + )); + } + } + // Naive pandas Timestamp: .value is nanoseconds since epoch (wall-clock as UTC) + let nanos: i64 = value.getattr("value")?.extract()?; + return Ok(nanos_to_millis_and_submillis(nanos)); + } + + // Try to_pydatetime() for objects that support it + if let Ok(py_dt) = value.call_method0("to_pydatetime") { + if let Ok(dt) = py_dt.downcast::() { + let tzinfo = dt.getattr("tzinfo")?; + if !tzinfo.is_none() { + return Err(FlussError::new_err( + "TIMESTAMP (without timezone) requires a naive datetime. \ + Got timezone-aware value. Use TIMESTAMP_LTZ column instead.", + )); + } + return datetime_to_epoch_millis_as_utc(dt); + } + } + + Err(FlussError::new_err(format!( + "Expected naive datetime.datetime or pd.Timestamp, got {}", + get_type_name(value) + ))) +} + +/// Extract epoch milliseconds for TimestampLtz (instant in time, UTC-based). +/// For naive datetimes, assumes UTC. For aware datetimes, converts to UTC. +fn extract_datetime_components_ltz(value: &Bound) -> PyResult<(i64, i32)> { + use pyo3::types::PyDateTime; + + // Try PyDateTime first + if let Ok(dt) = value.downcast::() { + // Check if timezone-aware + let tzinfo = dt.getattr("tzinfo")?; + if tzinfo.is_none() { + // Naive datetime: assume UTC (treat components as UTC time) + return datetime_to_epoch_millis_as_utc(dt); + } else { + // Aware datetime: use timedelta from epoch to get correct UTC instant + return datetime_to_epoch_millis_utc_aware(dt); + } + } + + // Check for pandas Timestamp + if is_pandas_timestamp(value) { + // pandas Timestamp.value is always nanoseconds since UTC epoch + let nanos: i64 = value.getattr("value")?.extract()?; + return Ok(nanos_to_millis_and_submillis(nanos)); + } + + // Try to_pydatetime() + if let Ok(py_dt) = value.call_method0("to_pydatetime") { + if let Ok(dt) = py_dt.downcast::() { + let tzinfo = dt.getattr("tzinfo")?; + if tzinfo.is_none() { + return datetime_to_epoch_millis_as_utc(dt); + } else { + return datetime_to_epoch_millis_utc_aware(dt); + } + } + } + + Err(FlussError::new_err(format!( + "Expected datetime.datetime or pd.Timestamp, got {}", + get_type_name(value) + ))) +} + +/// Convert datetime components to epoch milliseconds treating them as UTC +fn datetime_to_epoch_millis_as_utc( + dt: &pyo3::Bound<'_, pyo3::types::PyDateTime>, +) -> PyResult<(i64, i32)> { + use pyo3::types::{PyDateAccess, PyTimeAccess}; + + let year = dt.get_year(); + let month = dt.get_month(); + let day = dt.get_day(); + let hour = dt.get_hour(); + let minute = dt.get_minute(); + let second = dt.get_second(); + let microsecond = dt.get_microsecond(); + + // Create jiff civil datetime and convert to UTC timestamp + // Safe casts: hour (0-23), minute (0-59), second (0-59) all fit in i8 + let civil_dt = jiff::civil::date(year as i16, month as i8, day as i8).at( + hour as i8, + minute as i8, + second as i8, + microsecond as i32 * 1000, + ); + + let timestamp = jiff::tz::Offset::UTC + .to_timestamp(civil_dt) + .map_err(|e| FlussError::new_err(format!("Invalid datetime: {}", e)))?; + + let millis = timestamp.as_millisecond(); + let nano_of_milli = (timestamp.subsec_nanosecond() % NANOS_PER_MILLI as i32) as i32; + + Ok((millis, nano_of_milli)) +} + +/// Convert timezone-aware datetime to epoch milliseconds using Python's timedelta. +/// This correctly handles timezone conversions by computing (dt - UTC_EPOCH). +/// The UTC epoch is cached for performance. +fn datetime_to_epoch_millis_utc_aware( + dt: &pyo3::Bound<'_, pyo3::types::PyDateTime>, +) -> PyResult<(i64, i32)> { + use pyo3::types::{PyDelta, PyDeltaAccess}; + + let py = dt.py(); + let epoch = get_utc_epoch(py)?; + + // Compute delta = dt - epoch (this handles timezone conversion correctly) + let delta = dt.call_method1("__sub__", (epoch,))?; + let delta = delta.downcast::()?; + + // Extract components using integer arithmetic + let days = delta.get_days() as i64; + let seconds = delta.get_seconds() as i64; + let microseconds = delta.get_microseconds() as i64; + + // Total milliseconds (note: days can be negative for dates before epoch) + let total_micros = days * MICROS_PER_DAY + seconds * MICROS_PER_SECOND + microseconds; + let millis = total_micros / MICROS_PER_MILLI; + let nano_of_milli = ((total_micros % MICROS_PER_MILLI) * MICROS_PER_MILLI) as i32; + + // Handle negative microseconds remainder + let (millis, nano_of_milli) = if nano_of_milli < 0 { + (millis - 1, nano_of_milli + NANOS_PER_MILLI as i32) + } else { + (millis, nano_of_milli) + }; + + Ok((millis, nano_of_milli)) +} + +/// Convert nanoseconds to (milliseconds, nano_of_millisecond) +fn nanos_to_millis_and_submillis(nanos: i64) -> (i64, i32) { + let millis = nanos / NANOS_PER_MILLI; + let nano_of_milli = (nanos % NANOS_PER_MILLI) as i32; + + // Handle negative nanoseconds correctly (Euclidean remainder) + if nano_of_milli < 0 { + (millis - 1, nano_of_milli + NANOS_PER_MILLI as i32) + } else { + (millis, nano_of_milli) + } +} + +/// Check if value is a pandas Timestamp by examining its type. +fn is_pandas_timestamp(value: &Bound) -> bool { + // Check module and class name to avoid importing pandas + if let Ok(cls) = value.get_type().getattr("__module__") { + if let Ok(module) = cls.extract::<&str>() { + if module.starts_with("pandas") { + if let Ok(name) = value.get_type().getattr("__name__") { + if let Ok(name_str) = name.extract::<&str>() { + return name_str == "Timestamp"; + } + } + } + } + } + false +} + +/// Get type name +fn get_type_name(value: &Bound) -> String { + value + .get_type() + .name() + .map(|s| s.to_string()) + .unwrap_or_else(|_| "unknown".to_string()) +} + /// Scanner for reading log data from a Fluss table #[pyclass] pub struct LogScanner { @@ -621,3 +1027,39 @@ impl LogScanner { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_nanos_to_millis_and_submillis() { + // Simple positive case + assert_eq!(nanos_to_millis_and_submillis(1_500_000), (1, 500_000)); + + // Exact millisecond boundary + assert_eq!(nanos_to_millis_and_submillis(2_000_000), (2, 0)); + + // Zero + assert_eq!(nanos_to_millis_and_submillis(0), (0, 0)); + + // Large value + assert_eq!( + nanos_to_millis_and_submillis(86_400_000_000_000), // 1 day in nanos + (86_400_000, 0) + ); + + // Negative: -1.5 milliseconds should be (-2 millis, +500_000 nanos) + // Because -1_500_000 nanos = -2ms + 500_000ns + assert_eq!(nanos_to_millis_and_submillis(-1_500_000), (-2, 500_000)); + + // Negative exact boundary + assert_eq!(nanos_to_millis_and_submillis(-2_000_000), (-2, 0)); + + // Small negative + assert_eq!(nanos_to_millis_and_submillis(-1), (-1, 999_999)); + + // Negative with sub-millisecond part + assert_eq!(nanos_to_millis_and_submillis(-500_000), (-1, 500_000)); + } +} diff --git a/bindings/python/src/utils.rs b/bindings/python/src/utils.rs index 09e6b5f5..ee32c9c1 100644 --- a/bindings/python/src/utils.rs +++ b/bindings/python/src/utils.rs @@ -59,8 +59,39 @@ impl Utils { ArrowDataType::Binary | ArrowDataType::LargeBinary => DataTypes::bytes(), ArrowDataType::Date32 => DataTypes::date(), ArrowDataType::Date64 => DataTypes::date(), - ArrowDataType::Time32(_) | ArrowDataType::Time64(_) => DataTypes::time(), - ArrowDataType::Timestamp(_, _) => DataTypes::timestamp(), + ArrowDataType::Time32(unit) => match unit { + arrow_schema::TimeUnit::Second => DataTypes::time_with_precision(0), + arrow_schema::TimeUnit::Millisecond => DataTypes::time_with_precision(3), + _ => { + return Err(FlussError::new_err(format!( + "Unsupported Time32 unit: {unit:?}" + ))); + } + }, + ArrowDataType::Time64(unit) => match unit { + arrow_schema::TimeUnit::Microsecond => DataTypes::time_with_precision(6), + arrow_schema::TimeUnit::Nanosecond => DataTypes::time_with_precision(9), + _ => { + return Err(FlussError::new_err(format!( + "Unsupported Time64 unit: {unit:?}" + ))); + } + }, + ArrowDataType::Timestamp(unit, tz) => { + let precision = match unit { + arrow_schema::TimeUnit::Second => 0, + arrow_schema::TimeUnit::Millisecond => 3, + arrow_schema::TimeUnit::Microsecond => 6, + arrow_schema::TimeUnit::Nanosecond => 9, + }; + // Arrow Timestamp with timezone -> Fluss TimestampLtz + // Arrow Timestamp without timezone -> Fluss Timestamp (NTZ) + if tz.is_some() { + DataTypes::timestamp_ltz_with_precision(precision) + } else { + DataTypes::timestamp_with_precision(precision) + } + } ArrowDataType::Decimal128(precision, scale) => { DataTypes::decimal(*precision as u32, *scale as u32) }