82 changes: 82 additions & 0 deletions crates/iceberg/src/transform/bucket.rs
@@ -167,6 +167,16 @@ impl TransformFunction for Bucket {
.downcast_ref::<arrow_array::TimestampMicrosecondArray>()
.unwrap()
.unary(|v| self.bucket_timestamp(v)),
DataType::Time64(TimeUnit::Nanosecond) => input
Contributor:
In pyiceberg, TimestampNanoType is converted to micros precision before bucketing. This is to ensure that TimestampType and TimestampNanoType are transformed to the same value:

https://github.com/apache/iceberg-python/blob/4d4714a46241d0d89519a2a605dbce27b713a60e/pyiceberg/transforms.py#L322-L340

I think we should add a test to ensure this here too.
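A minimal sketch of such a test, mirroring the existing literal tests in this module (the Datum constructor names are taken from the surrounding code, so treat this as an assumption-laden sketch rather than a final implementation):

```rust
#[test]
fn test_timestamp_ns_matches_micros_bucket() {
    let bucket = Bucket::new(100);
    let micros = 1_510_871_468_000_000_i64;
    // A nanosecond literal should land in the same bucket as the
    // equivalent microsecond literal once it is reduced to micros.
    assert_eq!(
        bucket
            .transform_literal(&Datum::timestamp_micros(micros))
            .unwrap()
            .unwrap(),
        bucket
            .transform_literal(&Datum::timestamp_nanos(micros * 1000))
            .unwrap()
            .unwrap(),
    );
}
```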

Contributor Author:
Good catch, it wasn't performing this conversion. The test is added.

.as_any()
.downcast_ref::<arrow_array::Time64NanosecondArray>()
.unwrap()
.unary(|v| self.bucket_time(v / 1000)),
Contributor:

There was an issue with rounding when performing division for the hour transform (#1146), but since we're dividing by 1000, I don't think the same issue applies here.
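For reference, a minimal standalone illustration of the kind of rounding difference that issue concerns (plain Rust integer division, not code from this PR): `/` truncates toward zero, while `div_euclid` floors, and the two only disagree for negative inputs that are not exact multiples of the divisor.

```rust
fn main() {
    // Truncating division rounds toward zero...
    assert_eq!(-1_500_i64 / 1_000, -1);
    // ...while Euclidean (floor-style) division rounds toward negative infinity.
    assert_eq!((-1_500_i64).div_euclid(1_000), -2);
    // For exact multiples of the divisor the two agree.
    assert_eq!(-2_000_i64 / 1_000, (-2_000_i64).div_euclid(1_000));
}
```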

DataType::Timestamp(TimeUnit::Nanosecond, _) => input
.as_any()
.downcast_ref::<arrow_array::TimestampNanosecondArray>()
.unwrap()
.unary(|v| self.bucket_timestamp(v / 1000)),
DataType::Utf8 => arrow_array::Int32Array::from_iter(
input
.as_any()
@@ -228,6 +238,13 @@ impl TransformFunction for Bucket {
(PrimitiveType::Date, PrimitiveLiteral::Int(v)) => self.bucket_date(*v),
(PrimitiveType::Time, PrimitiveLiteral::Long(v)) => self.bucket_time(*v),
(PrimitiveType::Timestamp, PrimitiveLiteral::Long(v)) => self.bucket_timestamp(*v),
(PrimitiveType::Timestamptz, PrimitiveLiteral::Long(v)) => self.bucket_timestamp(*v),
(PrimitiveType::TimestampNs, PrimitiveLiteral::Long(v)) => {
self.bucket_timestamp(*v / 1000)
}
(PrimitiveType::TimestamptzNs, PrimitiveLiteral::Long(v)) => {
self.bucket_timestamp(*v / 1000)
}
(PrimitiveType::String, PrimitiveLiteral::String(v)) => self.bucket_str(v.as_str()),
(PrimitiveType::Uuid, PrimitiveLiteral::UInt128(v)) => {
self.bucket_bytes(uuid::Uuid::from_u128(*v).as_ref())
@@ -250,6 +267,9 @@ impl TransformFunction for Bucket {

#[cfg(test)]
mod test {
use std::sync::Arc;

use arrow_array::{ArrayRef, Int32Array, TimestampMicrosecondArray, TimestampNanosecondArray};
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime};

use super::Bucket;
@@ -888,4 +908,66 @@ mod test {
Datum::int(32)
);
}

#[test]
fn test_timestamptz_literal() {
let bucket = Bucket::new(100);
assert_eq!(
bucket
.transform_literal(&Datum::timestamptz_micros(1510871468000000))
.unwrap()
.unwrap(),
Datum::int(7)
);
}

#[test]
fn test_timestamp_ns_literal() {
let bucket = Bucket::new(100);
let ns_value = 1510871468000000i64 * 1000;
assert_eq!(
bucket
.transform_literal(&Datum::timestamp_nanos(ns_value))
.unwrap()
.unwrap(),
Datum::int(7)
);
}

#[test]
fn test_timestamptz_ns_literal() {
let bucket = Bucket::new(100);
let ns_value = 1510871468000000i64 * 1000;
assert_eq!(
bucket
.transform_literal(&Datum::timestamptz_nanos(ns_value))
.unwrap()
.unwrap(),
Datum::int(7)
);
}

#[test]
fn test_transform_timestamp_nanos_and_micros_array_equivalence() {
let bucket = Bucket::new(100);
let micros_value = 1510871468000000;
let nanos_value = micros_value * 1000;

let micro_array = TimestampMicrosecondArray::from_iter_values(vec![micros_value]);
let nano_array = TimestampNanosecondArray::from_iter_values(vec![nanos_value]);

let transformed_micro: ArrayRef = bucket.transform(Arc::new(micro_array)).unwrap();
let transformed_nano: ArrayRef = bucket.transform(Arc::new(nano_array)).unwrap();

let micro_result = transformed_micro
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
let nano_result = transformed_nano
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();

assert_eq!(micro_result.value(0), nano_result.value(0));
}
}