Skip to content

Commit 54bd43c

Browse files
committed
[AURON #2130] Implement native function of weekofyear
Signed-off-by: weimingdiit <weimingdiit@gmail.com>
1 parent 3518098 commit 54bd43c

4 files changed

Lines changed: 128 additions & 1 deletion

File tree

native-engine/datafusion-ext-functions/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ pub fn create_auron_ext_function(
7575
"Spark_Month" => Arc::new(spark_dates::spark_month),
7676
"Spark_Day" => Arc::new(spark_dates::spark_day),
7777
"Spark_DayOfWeek" => Arc::new(spark_dates::spark_dayofweek),
78+
"Spark_WeekOfYear" => Arc::new(spark_dates::spark_weekofyear),
7879
"Spark_Quarter" => Arc::new(spark_dates::spark_quarter),
7980
"Spark_Hour" => Arc::new(spark_dates::spark_hour),
8081
"Spark_Minute" => Arc::new(spark_dates::spark_minute),

native-engine/datafusion-ext-functions/src/spark_dates.rs

Lines changed: 95 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use arrow::{
2020
compute::{DatePart, date_part},
2121
datatypes::{DataType, TimeUnit},
2222
};
23-
use chrono::{TimeZone, Utc, prelude::*};
23+
use chrono::{Duration, TimeZone, Utc, prelude::*};
2424
use chrono_tz::Tz;
2525
use datafusion::{
2626
common::{Result, ScalarValue},
@@ -72,6 +72,77 @@ pub fn spark_dayofweek(args: &[ColumnarValue]) -> Result<ColumnarValue> {
7272
Ok(ColumnarValue::Array(Arc::new(dayofweek)))
7373
}
7474

75+
/// `spark_weekofyear(date/timestamp/compatible-string[, timezone])`
76+
///
77+
/// Matches Spark's `weekofyear()` semantics:
78+
/// ISO week numbering, with Monday as the first day of the week,
79+
/// and week 1 defined as the first week with more than 3 days.
80+
///
81+
/// For `Timestamp` inputs, this function interprets epoch milliseconds in the
82+
/// provided timezone (if any) before deriving the calendar date and ISO week.
83+
/// If no timezone is provided, `UTC` is used by default. For `Date` and
84+
/// compatible string inputs, the behavior is unchanged: the value is cast to
85+
/// `Date32` and the ISO week is computed from the resulting date.
86+
pub fn spark_weekofyear(args: &[ColumnarValue]) -> Result<ColumnarValue> {
87+
// First argument as an Arrow array (date/timestamp/string, etc.)
88+
let array = args[0].clone().into_array(1)?;
89+
90+
// Determine timezone (for timestamp inputs). Default to UTC to match
91+
// existing behavior when no timezone is provided.
92+
let default_tz = chrono_tz::UTC;
93+
let tz: Tz = if args.len() > 1 {
94+
match &args[1] {
95+
ColumnarValue::Scalar(ScalarValue::Utf8(Some(s)))
96+
| ColumnarValue::Scalar(ScalarValue::LargeUtf8(Some(s))) => {
97+
s.parse::<Tz>().unwrap_or(default_tz)
98+
}
99+
_ => default_tz,
100+
}
101+
} else {
102+
default_tz
103+
};
104+
105+
match array.data_type() {
106+
// Timestamp inputs: localize epoch milliseconds before computing ISO week
107+
DataType::Timestamp(TimeUnit::Millisecond, _) => {
108+
let ts_arr = array
109+
.as_any()
110+
.downcast_ref::<TimestampMillisecondArray>()
111+
.expect("internal cast to TimestampMillisecondArray must succeed");
112+
113+
let weekofyear = Int32Array::from_iter(ts_arr.iter().map(|opt_ms| {
114+
opt_ms.and_then(|ms| {
115+
tz.timestamp_millis_opt(ms)
116+
.single()
117+
.map(|dt| dt.date_naive().iso_week().week() as i32)
118+
})
119+
}));
120+
121+
Ok(ColumnarValue::Array(Arc::new(weekofyear)))
122+
}
123+
// Non-timestamp inputs: preserve existing Date32-based behavior
124+
_ => {
125+
let input = cast(&array, &DataType::Date32)?;
126+
let input = input
127+
.as_any()
128+
.downcast_ref::<Date32Array>()
129+
.expect("internal cast to Date32 must succeed");
130+
131+
let epoch =
132+
NaiveDate::from_ymd_opt(1970, 1, 1).expect("1970-01-01 must be a valid date");
133+
let weekofyear = Int32Array::from_iter(input.iter().map(|opt_days| {
134+
opt_days.and_then(|days| {
135+
epoch
136+
.checked_add_signed(Duration::days(days as i64))
137+
.map(|date| date.iso_week().week() as i32)
138+
})
139+
}));
140+
141+
Ok(ColumnarValue::Array(Arc::new(weekofyear)))
142+
}
143+
}
144+
}
145+
75146
/// `spark_quarter(date/timestamp/compatible-string)`
76147
///
77148
/// Simulates Spark's `quarter()` function.
@@ -307,6 +378,29 @@ mod tests {
307378
Ok(())
308379
}
309380

381+
#[test]
fn test_spark_weekofyear() -> Result<()> {
    // Table of (days since 1970-01-01, expected ISO week number).
    let cases: Vec<(Option<i32>, Option<i32>)> = vec![
        (Some(0), Some(1)),      // 1970-01-01 -> week 1
        (Some(4017), Some(1)),   // 1980-12-31 -> week 1 of 1981
        (Some(16801), Some(53)), // 2016-01-01 -> week 53 of 2015
        (Some(17167), Some(52)), // 2017-01-01 -> week 52 of 2016
        (Some(14455), Some(31)), // 2009-07-30 -> week 31
        (None, None),            // null propagates
    ];

    let input: ArrayRef = Arc::new(Date32Array::from(
        cases.iter().map(|(days, _)| *days).collect::<Vec<_>>(),
    ));
    let expected_ret: ArrayRef = Arc::new(Int32Array::from(
        cases.iter().map(|(_, week)| *week).collect::<Vec<_>>(),
    ));

    let actual = spark_weekofyear(&[ColumnarValue::Array(input)])?.into_array(1)?;
    assert_eq!(&actual, &expected_ret);
    Ok(())
}
403+
310404
#[test]
311405
fn test_spark_quarter_basic() -> Result<()> {
312406
// Date32 days relative to 1970-01-01:

spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronFunctionSuite.scala

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,35 @@ class AuronFunctionSuite extends AuronQueryTest with BaseAuronSQLSuite {
164164
}
165165
}
166166

167+
test("weekofyear function") {
  // Spark's weekofyear() follows ISO-8601 week numbering; pin a non-UTC
  // session timezone so the timestamp column exercises timezone-aware
  // conversion on the native side as well.
  withSQLConf("spark.sql.session.timeZone" -> "America/Los_Angeles") {
    withTable("t1") {
      // Cover date, timestamp, and date-compatible string inputs.
      sql(
        "create table t1(c1 date, c2 date, c3 date, c4 date, c5 timestamp, c6 string) using parquet")
      sql("""insert into t1 values (
            | date'2009-07-30',
            | date'1980-12-31',
            | date'2016-01-01',
            | date'2017-01-01',
            | timestamp'2016-01-03 23:30:00',
            | '2016-01-01'
            |)""".stripMargin)

      val weekOfYearSql =
        """select
          | weekofyear(c1),
          | weekofyear(c2),
          | weekofyear(c3),
          | weekofyear(c4),
          | weekofyear(c5),
          | weekofyear(c6)
          |from t1
          |""".stripMargin
      // Results must match vanilla Spark and execute via the native operator.
      checkSparkAnswerAndOperator(weekOfYearSql)
    }
  }
}
195+
167196
test("round function with varying scales for intPi") {
168197
withTable("t2") {
169198
sql("CREATE TABLE t2 (c1 INT) USING parquet")

spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeConverters.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -940,6 +940,9 @@ object NativeConverters extends Logging {
940940
case DayOfMonth(child) => buildExtScalarFunction("Spark_Day", child :: Nil, IntegerType)
941941
case DayOfWeek(child) =>
942942
buildExtScalarFunction("Spark_DayOfWeek", child :: Nil, IntegerType)
943+
case WeekOfYear(child) =>
944+
buildExtScalarFunction("Spark_WeekOfYear", child :: Nil, IntegerType)
945+
943946
case Quarter(child) => buildExtScalarFunction("Spark_Quarter", child :: Nil, IntegerType)
944947

945948
case e: Levenshtein =>

0 commit comments

Comments
 (0)