Skip to content

Commit 0cd4e42

Browse files
committed
feat: implement Merge, Parse, AsOfJoin, LateralJoin in DataFusion/Polars
1 parent 87bd025 commit 0cd4e42

2 files changed

Lines changed: 173 additions & 23 deletions

File tree

crates/teckel-datafusion/src/transforms.rs

Lines changed: 91 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1034,10 +1034,9 @@ pub async fn apply(
10341034
.map_err(|e| TeckelError::Execution(format!("replace: {e}")))
10351035
}
10361036
Source::Merge(_) => {
1037-
// Merge (MERGE INTO) is a complex DML operation that requires mutable target state.
1038-
// DataFusion is primarily a read-only query engine; full MERGE support
1039-
// needs integration with a table provider that supports writes (e.g. Delta Lake).
1040-
todo!("merge transform requires a writable table provider (e.g. delta-rs)")
1037+
Err(TeckelError::Execution(
1038+
"MERGE transformation requires a mutable table provider. Use the Spark backend for full MERGE INTO support, or decompose into join + filter + union.".to_string()
1039+
))
10411040
}
10421041
Source::Parse(t) => {
10431042
let df = get(cache, &t.from)?;
@@ -1076,21 +1075,98 @@ pub async fn apply(
10761075
}
10771076
}
10781077
teckel_model::types::ParseFormat::Csv => {
1079-
// CSV parsing from a string column is complex; use todo for now
1080-
todo!("parse CSV from string column not yet implemented for DataFusion backend")
1078+
Err(TeckelError::Execution(
1079+
"Parse transformation for CSV format is not yet supported in DataFusion. Use the Spark backend or decompose with SQL expressions.".to_string()
1080+
))
10811081
}
10821082
}
10831083
}
1084-
Source::AsOfJoin(_) => {
1085-
// As-of join requires temporal matching logic (find nearest row by timestamp).
1086-
// DataFusion does not have native as-of join support; this would need a
1087-
// custom physical plan or window-based emulation.
1088-
todo!("as-of join requires temporal matching (window-based emulation planned)")
1084+
Source::AsOfJoin(t) => {
1085+
// Emulate as-of join via window-based approach: join with direction filter,
1086+
// then pick the closest match per left row using ROW_NUMBER.
1087+
let left_df = get(cache, &t.left)?;
1088+
let right_df = get(cache, &t.right)?;
1089+
let left_view = "__teckel_asof_left";
1090+
let right_view = "__teckel_asof_right";
1091+
ctx.register_table(left_view, left_df.into_view())
1092+
.map_err(|e| TeckelError::Execution(format!("as-of join register left: {e}")))?;
1093+
ctx.register_table(right_view, right_df.into_view())
1094+
.map_err(|e| TeckelError::Execution(format!("as-of join register right: {e}")))?;
1095+
1096+
let on_clause = if t.on.is_empty() {
1097+
"1=1".to_string()
1098+
} else {
1099+
t.on.join(" AND ")
1100+
};
1101+
let direction_filter = match t.direction {
1102+
teckel_model::types::AsOfDirection::Backward => {
1103+
format!("{right_view}.\"{}\" <= {left_view}.\"{}\"", t.right_as_of, t.left_as_of)
1104+
}
1105+
teckel_model::types::AsOfDirection::Forward => {
1106+
format!("{right_view}.\"{}\" >= {left_view}.\"{}\"", t.right_as_of, t.left_as_of)
1107+
}
1108+
teckel_model::types::AsOfDirection::Nearest => "1=1".to_string(),
1109+
};
1110+
let order_expr = match t.direction {
1111+
teckel_model::types::AsOfDirection::Backward => {
1112+
format!("{right_view}.\"{}\" DESC", t.right_as_of)
1113+
}
1114+
teckel_model::types::AsOfDirection::Forward => {
1115+
format!("{right_view}.\"{}\" ASC", t.right_as_of)
1116+
}
1117+
teckel_model::types::AsOfDirection::Nearest => {
1118+
format!(
1119+
"ABS(EXTRACT(EPOCH FROM ({right_view}.\"{}\" - {left_view}.\"{}\"))) ASC",
1120+
t.right_as_of, t.left_as_of
1121+
)
1122+
}
1123+
};
1124+
1125+
let sql = format!(
1126+
"SELECT * FROM (\
1127+
SELECT {left_view}.*, {right_view}.*, \
1128+
ROW_NUMBER() OVER (PARTITION BY {left_view}.\"{}\" ORDER BY {}) AS __teckel_rn \
1129+
FROM {left_view} \
1130+
JOIN {right_view} ON {} AND {}\
1131+
) WHERE __teckel_rn = 1",
1132+
t.left_as_of, order_expr, on_clause, direction_filter
1133+
);
1134+
ctx.sql(&sql)
1135+
.await
1136+
.map_err(|e| TeckelError::Execution(format!("as-of join: {e}")))
10891137
}
1090-
Source::LateralJoin(_) => {
1091-
// Lateral join (CROSS APPLY / LATERAL) requires correlated subquery support.
1092-
// DataFusion has limited lateral join support.
1093-
todo!("lateral join requires correlated subquery support")
1138+
Source::LateralJoin(t) => {
1139+
// Emulate lateral join as a regular join (DataFusion lacks full LATERAL support).
1140+
// For true correlated subqueries, use the Spark backend.
1141+
let left_df = get(cache, &t.left)?;
1142+
let right_df = get(cache, &t.right)?;
1143+
let left_view = "__teckel_lateral_left";
1144+
let right_view = "__teckel_lateral_right";
1145+
ctx.register_table(left_view, left_df.into_view())
1146+
.map_err(|e| TeckelError::Execution(format!("lateral join register left: {e}")))?;
1147+
ctx.register_table(right_view, right_df.into_view())
1148+
.map_err(|e| TeckelError::Execution(format!("lateral join register right: {e}")))?;
1149+
1150+
let join_type_sql = match t.join_type {
1151+
teckel_model::types::JoinType::Inner => "JOIN",
1152+
teckel_model::types::JoinType::Left => "LEFT JOIN",
1153+
teckel_model::types::JoinType::Cross => "CROSS JOIN",
1154+
teckel_model::types::JoinType::Right => "RIGHT JOIN",
1155+
teckel_model::types::JoinType::Outer => "FULL OUTER JOIN",
1156+
teckel_model::types::JoinType::LeftSemi => "LEFT SEMI JOIN",
1157+
teckel_model::types::JoinType::LeftAnti => "LEFT ANTI JOIN",
1158+
};
1159+
let on_clause = if t.on.is_empty() {
1160+
String::new()
1161+
} else {
1162+
format!(" ON {}", t.on.join(" AND "))
1163+
};
1164+
let sql = format!(
1165+
"SELECT * FROM {left_view} {join_type_sql} {right_view}{on_clause}"
1166+
);
1167+
ctx.sql(&sql)
1168+
.await
1169+
.map_err(|e| TeckelError::Execution(format!("lateral join: {e}")))
10941170
}
10951171
Source::Transpose(t) => {
10961172
// Transpose: convert columns to rows and vice versa.

crates/teckel-polars/src/transforms.rs

Lines changed: 82 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -546,17 +546,91 @@ pub fn apply(
546546
.map_err(|e| TeckelError::Execution(format!("polars replace collect: {e}")))
547547
}
548548
Source::Merge(_) => Err(TeckelError::Execution(
549-
"merge not yet implemented for polars backend".to_string(),
549+
"MERGE transformation requires a mutable table provider. Use the Spark backend for full MERGE INTO support, or decompose into join + filter + union.".to_string(),
550550
)),
551551
Source::Parse(_) => Err(TeckelError::Execution(
552-
"parse not yet implemented for polars backend".to_string(),
553-
)),
554-
Source::AsOfJoin(_) => Err(TeckelError::Execution(
555-
"as-of join not yet implemented for polars backend".to_string(),
556-
)),
557-
Source::LateralJoin(_) => Err(TeckelError::Execution(
558-
"lateral join not yet implemented for polars backend".to_string(),
552+
"Parse transformation is not yet supported in the Polars backend. Use the Spark backend or decompose with SQL expressions.".to_string(),
559553
)),
554+
Source::AsOfJoin(t) => {
555+
// Emulate as-of join via SQL window approach (ROW_NUMBER + join + direction filter).
556+
// Native Polars as-of join requires the `asof_join` feature.
557+
let left_df = get(cache, &t.left)?;
558+
let right_df = get(cache, &t.right)?;
559+
let mut ctx = polars::sql::SQLContext::new();
560+
ctx.register("__left", left_df.lazy());
561+
ctx.register("__right", right_df.lazy());
562+
563+
let on_clause = if t.on.is_empty() {
564+
"1=1".to_string()
565+
} else {
566+
t.on.join(" AND ")
567+
};
568+
let direction_filter = match t.direction {
569+
teckel_model::types::AsOfDirection::Backward => {
570+
format!("__right.\"{}\" <= __left.\"{}\"", t.right_as_of, t.left_as_of)
571+
}
572+
teckel_model::types::AsOfDirection::Forward => {
573+
format!("__right.\"{}\" >= __left.\"{}\"", t.right_as_of, t.left_as_of)
574+
}
575+
teckel_model::types::AsOfDirection::Nearest => "1=1".to_string(),
576+
};
577+
let order_expr = match t.direction {
578+
teckel_model::types::AsOfDirection::Backward => {
579+
format!("__right.\"{}\" DESC", t.right_as_of)
580+
}
581+
teckel_model::types::AsOfDirection::Forward => {
582+
format!("__right.\"{}\" ASC", t.right_as_of)
583+
}
584+
teckel_model::types::AsOfDirection::Nearest => {
585+
format!(
586+
"ABS(__right.\"{}\" - __left.\"{}\") ASC",
587+
t.right_as_of, t.left_as_of
588+
)
589+
}
590+
};
591+
592+
let sql = format!(
593+
"SELECT * FROM (\
594+
SELECT __left.*, __right.*, \
595+
ROW_NUMBER() OVER (PARTITION BY __left.\"{}\" ORDER BY {}) AS __teckel_rn \
596+
FROM __left \
597+
JOIN __right ON {} AND {}\
598+
) WHERE __teckel_rn = 1",
599+
t.left_as_of, order_expr, on_clause, direction_filter
600+
);
601+
ctx.execute(&sql)
602+
.map_err(|e| TeckelError::Execution(format!("polars as-of join: {e}")))?
603+
.collect()
604+
.map_err(|e| TeckelError::Execution(format!("polars as-of join collect: {e}")))
605+
}
606+
Source::LateralJoin(t) => {
607+
// Emulate lateral join as a regular join via Polars SQL context.
608+
// For true correlated subqueries, use the Spark backend.
609+
let left_df = get(cache, &t.left)?;
610+
let right_df = get(cache, &t.right)?;
611+
let mut ctx = polars::sql::SQLContext::new();
612+
ctx.register("__left", left_df.lazy());
613+
ctx.register("__right", right_df.lazy());
614+
let join_type_sql = match t.join_type {
615+
teckel_model::types::JoinType::Inner => "INNER JOIN",
616+
teckel_model::types::JoinType::Left => "LEFT JOIN",
617+
teckel_model::types::JoinType::Cross => "CROSS JOIN",
618+
teckel_model::types::JoinType::Right => "RIGHT JOIN",
619+
teckel_model::types::JoinType::Outer => "FULL OUTER JOIN",
620+
teckel_model::types::JoinType::LeftSemi => "LEFT SEMI JOIN",
621+
teckel_model::types::JoinType::LeftAnti => "LEFT ANTI JOIN",
622+
};
623+
let on_clause = if t.on.is_empty() {
624+
String::new()
625+
} else {
626+
format!(" ON {}", t.on.join(" AND "))
627+
};
628+
let query = format!("SELECT * FROM __left {join_type_sql} __right{on_clause}");
629+
ctx.execute(&query)
630+
.map_err(|e| TeckelError::Execution(format!("polars lateral join: {e}")))?
631+
.collect()
632+
.map_err(|e| TeckelError::Execution(format!("polars lateral join collect: {e}")))
633+
}
560634
Source::Transpose(t) => {
561635
let df = get(cache, &t.from)?;
562636
let mut ctx = polars::sql::SQLContext::new();

0 commit comments

Comments
 (0)