From ef67372c8d2c0eaef371e9c86a744bff39b57141 Mon Sep 17 00:00:00 2001 From: Jonathan Date: Mon, 31 Mar 2025 18:37:43 -0400 Subject: [PATCH 1/4] feat: Support `TimestampNs` and TimestampTzNs` in bucket transform --- crates/iceberg/src/transform/bucket.rs | 45 ++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/crates/iceberg/src/transform/bucket.rs b/crates/iceberg/src/transform/bucket.rs index ce39826bb3..1f2bc93b3f 100644 --- a/crates/iceberg/src/transform/bucket.rs +++ b/crates/iceberg/src/transform/bucket.rs @@ -167,6 +167,16 @@ impl TransformFunction for Bucket { .downcast_ref::() .unwrap() .unary(|v| self.bucket_timestamp(v)), + DataType::Time64(TimeUnit::Nanosecond) => input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| self.bucket_time(v)), + DataType::Timestamp(TimeUnit::Nanosecond, _) => input + .as_any() + .downcast_ref::() + .unwrap() + .unary(|v| self.bucket_timestamp(v)), DataType::Utf8 => arrow_array::Int32Array::from_iter( input .as_any() @@ -228,6 +238,9 @@ impl TransformFunction for Bucket { (PrimitiveType::Date, PrimitiveLiteral::Int(v)) => self.bucket_date(*v), (PrimitiveType::Time, PrimitiveLiteral::Long(v)) => self.bucket_time(*v), (PrimitiveType::Timestamp, PrimitiveLiteral::Long(v)) => self.bucket_timestamp(*v), + (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(v)) => self.bucket_timestamp(*v), + (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(v)) => self.bucket_timestamp(*v), + (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(v)) => self.bucket_timestamp(*v), (PrimitiveType::String, PrimitiveLiteral::String(v)) => self.bucket_str(v.as_str()), (PrimitiveType::Uuid, PrimitiveLiteral::UInt128(v)) => { self.bucket_bytes(uuid::Uuid::from_u128(*v).as_ref()) @@ -888,4 +901,36 @@ mod test { Datum::int(32) ); } + + #[test] + fn test_timestamptz_literal() { + let bucket = Bucket::new(100); + assert_eq!( + bucket + .transform_literal(&Datum::timestamptz_micros(1510871468000000)) + .unwrap() + .unwrap(), + Datum::int(7) + ); + } + + #[test] + fn test_timestamp_ns_literal() { + let bucket = Bucket::new(100); + let ns_value = 1510871468000000i64 * 1000; + assert_eq!( + bucket.transform_literal(&Datum::timestamp_nanos(ns_value)).unwrap().unwrap(), + Datum::int(bucket.bucket_timestamp(ns_value)) + ); + } + + #[test] + fn test_timestamptz_ns_literal() { + let bucket = Bucket::new(100); + let ns_value = 1510871468000000i64 * 1000; + assert_eq!( + bucket.transform_literal(&Datum::timestamptz_nanos(ns_value)).unwrap().unwrap(), + Datum::int(bucket.bucket_timestamp(ns_value)) + ); + } } From a6329d2f053cd58db9fb1d4ff51e26dbacffb9b9 Mon Sep 17 00:00:00 2001 From: Jonathan Date: Mon, 31 Mar 2025 18:42:32 -0400 Subject: [PATCH 2/4] fmt --- crates/iceberg/src/transform/bucket.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/crates/iceberg/src/transform/bucket.rs b/crates/iceberg/src/transform/bucket.rs index 1f2bc93b3f..d3c7c376b5 100644 --- a/crates/iceberg/src/transform/bucket.rs +++ b/crates/iceberg/src/transform/bucket.rs @@ -919,8 +919,11 @@ mod test { let bucket = Bucket::new(100); let ns_value = 1510871468000000i64 * 1000; assert_eq!( - bucket.transform_literal(&Datum::timestamp_nanos(ns_value)).unwrap().unwrap(), - Datum::int(bucket.bucket_timestamp(ns_value)) + bucket + .transform_literal(&Datum::timestamp_nanos(ns_value)) + .unwrap() + .unwrap(), + Datum::int(79) ); } @@ -929,8 +932,11 @@ mod test { let bucket = Bucket::new(100); let ns_value = 1510871468000000i64 * 1000; assert_eq!( - bucket.transform_literal(&Datum::timestamptz_nanos(ns_value)).unwrap().unwrap(), - Datum::int(bucket.bucket_timestamp(ns_value)) + bucket + .transform_literal(&Datum::timestamptz_nanos(ns_value)) + .unwrap() + .unwrap(), + Datum::int(79) ); } } From 54d001b30627afe04aa4059a241e3400235b9cbc Mon Sep 17 00:00:00 2001 From: Jonathan Date: Tue, 1 Apr 2025 16:31:23 -0400 Subject: [PATCH 3/4] add test --- crates/iceberg/src/transform/bucket.rs | 31 ++++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/crates/iceberg/src/transform/bucket.rs b/crates/iceberg/src/transform/bucket.rs index d3c7c376b5..1caae2bd52 100644 --- a/crates/iceberg/src/transform/bucket.rs +++ b/crates/iceberg/src/transform/bucket.rs @@ -171,12 +171,12 @@ impl TransformFunction for Bucket { .as_any() .downcast_ref::() .unwrap() - .unary(|v| self.bucket_time(v)), + .unary(|v| self.bucket_time(v / 1000)), DataType::Timestamp(TimeUnit::Nanosecond, _) => input .as_any() .downcast_ref::() .unwrap() - .unary(|v| self.bucket_timestamp(v)), + .unary(|v| self.bucket_timestamp(v / 1000)), DataType::Utf8 => arrow_array::Int32Array::from_iter( input .as_any() @@ -263,6 +263,9 @@ impl TransformFunction for Bucket { #[cfg(test)] mod test { + use std::sync::Arc; + + use arrow_array::{ArrayRef, Int32Array, TimestampMicrosecondArray, TimestampNanosecondArray}; use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime}; use super::Bucket; @@ -939,4 +942,28 @@ mod test { Datum::int(79) ); } + + #[test] + fn test_transform_timestamp_nanos_and_micros_array_equivalence() { + let bucket = Bucket::new(100); + let micros_value = 1510871468000000; + let nanos_value = micros_value * 1000; + + let micro_array = TimestampMicrosecondArray::from_iter_values(vec![micros_value]); + let nano_array = TimestampNanosecondArray::from_iter_values(vec![nanos_value]); + + let transformed_micro: ArrayRef = bucket.transform(Arc::new(micro_array)).unwrap(); + let transformed_nano: ArrayRef = bucket.transform(Arc::new(nano_array)).unwrap(); + + let micro_result = transformed_micro + .as_any() + .downcast_ref::() + .unwrap(); + let nano_result = transformed_nano + .as_any() + .downcast_ref::() + .unwrap(); + + assert_eq!(micro_result.value(0), nano_result.value(0)); + } } From 159890ccb0be1ad67d2ca1a69f3f7b5a1d3cd4da Mon Sep 17 00:00:00 2001 From: Jonathan Date: Tue, 1 Apr 2025 17:34:07 -0400 Subject: [PATCH 4/4] update conversion/test --- crates/iceberg/src/transform/bucket.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/crates/iceberg/src/transform/bucket.rs b/crates/iceberg/src/transform/bucket.rs index 1caae2bd52..b64a4631d0 100644 --- a/crates/iceberg/src/transform/bucket.rs +++ b/crates/iceberg/src/transform/bucket.rs @@ -239,8 +239,12 @@ impl TransformFunction for Bucket { (PrimitiveType::Time, PrimitiveLiteral::Long(v)) => self.bucket_time(*v), (PrimitiveType::Timestamp, PrimitiveLiteral::Long(v)) => self.bucket_timestamp(*v), (PrimitiveType::Timestamptz, PrimitiveLiteral::Long(v)) => self.bucket_timestamp(*v), - (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(v)) => self.bucket_timestamp(*v), - (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(v)) => self.bucket_timestamp(*v), + (PrimitiveType::TimestampNs, PrimitiveLiteral::Long(v)) => { + self.bucket_timestamp(*v / 1000) + } + (PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(v)) => { + self.bucket_timestamp(*v / 1000) + } (PrimitiveType::String, PrimitiveLiteral::String(v)) => self.bucket_str(v.as_str()), (PrimitiveType::Uuid, PrimitiveLiteral::UInt128(v)) => { self.bucket_bytes(uuid::Uuid::from_u128(*v).as_ref()) @@ -926,7 +930,7 @@ mod test { .transform_literal(&Datum::timestamp_nanos(ns_value)) .unwrap() .unwrap(), - Datum::int(79) + Datum::int(7) ); } @@ -939,7 +943,7 @@ mod test { .transform_literal(&Datum::timestamptz_nanos(ns_value)) .unwrap() .unwrap(), - Datum::int(79) + Datum::int(7) ); }