diff --git a/rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs b/rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs index 1429f99e5a7f8..c0aaffec860cd 100644 --- a/rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs +++ b/rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs @@ -434,6 +434,87 @@ impl RewriteRules for MemberRules { "?out_join_hints", ), ), + // Merge a join between two (view) CubeScans on a dimension that + // resolves to the same underlying cube member into a single + // CubeScan, but ONLY under an aggregate whose GROUP BY is exactly + // that shared join key. The merged scan becomes a multi-fact query + // (FULL OUTER stitched over the group-by key by the planner). + // Gating on the aggregate means ungrouped queries (e.g. SELECT *) + // and queries grouping by a non-join-key dimension are not merged. + transforming_rewrite( + "push-down-aggregate-shared-member-join", + aggregate( + join( + cube_scan( + "?left_alias_to_cube", + "?left_members", + "?left_filters", + "?left_orders", + "CubeScanLimit:None", + "CubeScanOffset:None", + "?left_split", + "CubeScanCanPushdownJoin:true", + "CubeScanWrapped:false", + "CubeScanUngrouped:true", + "?left_join_hints", + ), + cube_scan( + "?right_alias_to_cube", + "?right_members", + "?right_filters", + "?right_orders", + "CubeScanLimit:None", + "CubeScanOffset:None", + "?right_split", + "CubeScanCanPushdownJoin:true", + "CubeScanWrapped:false", + "CubeScanUngrouped:true", + "?right_join_hints", + ), + "?left_on", + "?right_on", + "?join_type", + "?join_constraint", + "?null_equals_null", + ), + "?group_expr", + "?aggr_expr", + "?agg_split", + ), + aggregate( + cube_scan( + "?out_alias_to_cube", + cube_scan_members("?left_members", "?right_members"), + cube_scan_filters("?left_filters", "?right_filters"), + cube_scan_order_empty_tail(), + "CubeScanLimit:None", + "CubeScanOffset:None", + "CubeScanSplit:false", + "CubeScanCanPushdownJoin:true", + "CubeScanWrapped:false", + "CubeScanUngrouped:true", + "?out_join_hints", + ), + "?group_expr", + "?aggr_expr", + "?agg_split", + ), + self.push_down_aggregate_shared_member_join( + "?left_alias_to_cube", + "?right_alias_to_cube", + "?out_alias_to_cube", + "?left_members", + "?right_members", + "?left_on", + "?right_on", + "?join_type", + "?left_join_hints", + "?right_join_hints", + "?out_join_hints", + "?left_filters", + "?group_expr", + ), + ), ]; rules.extend(self.member_pushdown_rules()); @@ -2866,6 +2947,267 @@ impl MemberRules { } } + #[allow(clippy::too_many_arguments)] + fn push_down_aggregate_shared_member_join( + &self, + left_alias_to_cube_var: &'static str, + right_alias_to_cube_var: &'static str, + out_alias_to_cube_var: &'static str, + left_members_var: &'static str, + right_members_var: &'static str, + left_on_var: &'static str, + right_on_var: &'static str, + join_type_var: &'static str, + left_join_hints_var: &'static str, + right_join_hints_var: &'static str, + out_join_hints_var: &'static str, + left_filters_var: &'static str, + group_expr_var: &'static str, + ) -> impl Fn(&mut CubeEGraph, &mut Subst) -> bool { + let left_alias_to_cube_var = var!(left_alias_to_cube_var); + let right_alias_to_cube_var = var!(right_alias_to_cube_var); + let out_alias_to_cube_var = var!(out_alias_to_cube_var); + let left_members_var = var!(left_members_var); + let right_members_var = var!(right_members_var); + let left_on_var = var!(left_on_var); + let right_on_var = var!(right_on_var); + let join_type_var = var!(join_type_var); + let left_join_hints_var = var!(left_join_hints_var); + let right_join_hints_var = var!(right_join_hints_var); + let out_join_hints_var = var!(out_join_hints_var); + let left_filters_var = var!(left_filters_var); + let group_expr_var = var!(group_expr_var); + let meta_context = self.meta_context.clone(); + move |egraph, subst| { + fn dimension_member_name( + egraph: &mut CubeEGraph, + members_id: Id, + column: &Column, + ) -> Option { + match egraph[members_id].data.find_member_by_column(column) { + Some(((_, Member::Dimension { name, .. }, _), _)) + | Some(((_, Member::TimeDimension { name, .. }, _), _)) => Some(name.clone()), + _ => None, + } + } + + let resolve_underlying = |member_name: &str| -> String { + meta_context + .find_dimension_with_name(member_name) + .and_then(|dim| dim.alias_member.clone()) + .unwrap_or_else(|| member_name.to_string()) + }; + + // The join must be on dimensions that resolve to the same + // underlying cube member on both sides (a shared key). + let left_join_ons = var_iter!(egraph[subst[left_on_var]], JoinLeftOn) + .cloned() + .collect::>(); + let right_join_ons = var_iter!(egraph[subst[right_on_var]], JoinRightOn) + .cloned() + .collect::>(); + + let mut matched: Option<( + String, + String, + Vec, + Vec, + Vec, + Vec, + )> = None; + 'pairs: for left_on in left_join_ons.iter() { + for right_on in right_join_ons.iter() { + if left_on.is_empty() || left_on.len() != right_on.len() { + continue; + } + let mut left_cube_name: Option = None; + let mut right_cube_name: Option = None; + let mut left_keys: Vec = vec![]; + let mut right_keys: Vec = vec![]; + let mut all_match = true; + for (left_column, right_column) in left_on.iter().zip(right_on.iter()) { + let Some(left_name) = + dimension_member_name(egraph, subst[left_members_var], left_column) + else { + all_match = false; + break; + }; + let Some(right_name) = + dimension_member_name(egraph, subst[right_members_var], right_column) + else { + all_match = false; + break; + }; + if resolve_underlying(&left_name) != resolve_underlying(&right_name) { + all_match = false; + break; + } + left_cube_name = left_name.split('.').next().map(|s| s.to_string()); + right_cube_name = right_name.split('.').next().map(|s| s.to_string()); + left_keys.push(left_name); + right_keys.push(right_name); + } + if all_match { + if let (Some(left_cube_name), Some(right_cube_name)) = + (left_cube_name, right_cube_name) + { + matched = Some(( + left_cube_name, + right_cube_name, + left_keys, + right_keys, + left_on.clone(), + right_on.clone(), + )); + break 'pairs; + } + } + } + } + + let Some(( + left_cube, + right_cube, + shared_left_keys, + shared_right_keys, + matched_left_cols, + matched_right_cols, + )) = matched + else { + return false; + }; + + // The join key must be fully within the GROUP BY dimensions: every + // group-by column must be one of the join-key columns, and every + // join-key pair must be grouped. This is what makes the multi-fact + // stitch over the group-by key match the requested join. + let Some(group_referenced_expr) = + &egraph.index(subst[group_expr_var]).data.referenced_expr + else { + return false; + }; + let group_cols = referenced_columns(group_referenced_expr); + if group_cols.is_empty() { + return false; + } + let join_key_cols: HashSet = matched_left_cols + .iter() + .chain(matched_right_cols.iter()) + .map(|c| c.flat_name()) + .collect(); + if !group_cols.iter().all(|c| join_key_cols.contains(c)) { + return false; + } + let group_set: HashSet<&String> = group_cols.iter().collect(); + for (left_col, right_col) in matched_left_cols.iter().zip(matched_right_cols.iter()) { + if !group_set.contains(&left_col.flat_name()) + && !group_set.contains(&right_col.flat_name()) + { + return false; + } + } + + // Re-introduce INNER/LEFT/RIGHT semantics on top of the FULL OUTER + // multi-fact stitch by requiring the join key of each "must be + // present" side to be set (FULL adds nothing). + let mut require_left = false; + let mut require_right = false; + if let Some(join_type) = var_list_iter!(egraph[subst[join_type_var]], JoinJoinType) + .cloned() + .next() + { + match join_type.0 { + datafusion::prelude::JoinType::Inner => { + require_left = true; + require_right = true; + } + datafusion::prelude::JoinType::Left => require_left = true, + datafusion::prelude::JoinType::Right => require_right = true, + _ => {} + } + } + + let mut presence_members: Vec = vec![]; + if require_left { + presence_members.extend(shared_left_keys.iter().cloned()); + } + if require_right { + presence_members.extend(shared_right_keys.iter().cloned()); + } + + if !presence_members.is_empty() { + let mut acc = subst[left_filters_var]; + for name in presence_members { + let member = egraph.add(LogicalPlanLanguage::FilterMemberMember( + crate::compile::rewrite::FilterMemberMember(name), + )); + let op = egraph.add(LogicalPlanLanguage::FilterMemberOp( + crate::compile::rewrite::FilterMemberOp("set".to_string()), + )); + let values = egraph.add(LogicalPlanLanguage::FilterMemberValues( + crate::compile::rewrite::FilterMemberValues(vec![]), + )); + let filter_member = + egraph.add(LogicalPlanLanguage::FilterMember([member, op, values])); + acc = egraph.add(LogicalPlanLanguage::CubeScanFilters(vec![ + filter_member, + acc, + ])); + } + subst.insert(left_filters_var, acc); + } + + for left_alias_to_cube in + var_iter!(egraph[subst[left_alias_to_cube_var]], CubeScanAliasToCube) + { + for right_alias_to_cube in + var_iter!(egraph[subst[right_alias_to_cube_var]], CubeScanAliasToCube) + { + for left_join_hints in + var_iter!(egraph[subst[left_join_hints_var]], CubeScanJoinHints) + { + for right_join_hints in + var_iter!(egraph[subst[right_join_hints_var]], CubeScanJoinHints) + { + let out_alias_to_cube = CubeScanAliasToCube( + left_alias_to_cube + .iter() + .chain(right_alias_to_cube.iter()) + .cloned() + .collect(), + ); + + let out_join_hints = CubeScanJoinHints( + left_join_hints + .iter() + .chain(right_join_hints.iter()) + .cloned() + .chain(iter::once(vec![left_cube.clone(), right_cube.clone()])) + .collect(), + ); + + subst.insert( + out_alias_to_cube_var, + egraph.add(LogicalPlanLanguage::CubeScanAliasToCube( + out_alias_to_cube, + )), + ); + + subst.insert( + out_join_hints_var, + egraph.add(LogicalPlanLanguage::CubeScanJoinHints(out_join_hints)), + ); + + return true; + } + } + } + } + + false + } + } + fn push_down_cross_join_to_cubescan_rewrite( &self, name: &str, diff --git a/rust/cubesql/cubesql/src/compile/test/mod.rs b/rust/cubesql/cubesql/src/compile/test/mod.rs index 20e63e584b5f8..cfefbc79da11d 100644 --- a/rust/cubesql/cubesql/src/compile/test/mod.rs +++ b/rust/cubesql/cubesql/src/compile/test/mod.rs @@ -33,6 +33,8 @@ pub mod test_cube_join; #[cfg(test)] pub mod test_cube_join_grouped; #[cfg(test)] +pub mod test_cube_join_views; +#[cfg(test)] pub mod test_cube_scan; #[cfg(test)] pub mod test_df_execution; diff --git a/rust/cubesql/cubesql/src/compile/test/test_cube_join_views.rs b/rust/cubesql/cubesql/src/compile/test/test_cube_join_views.rs new file mode 100644 index 0000000000000..64373622fd2b8 --- /dev/null +++ b/rust/cubesql/cubesql/src/compile/test/test_cube_join_views.rs @@ -0,0 +1,267 @@ +use cubeclient::models::{V1CubeMetaType, V1LoadRequestQuery, V1LoadRequestQueryFilterItem}; +use pretty_assertions::assert_eq; + +use crate::{ + compile::{ + rewrite::rewriter::Rewriter, + test::{ + convert_select_to_query_plan_with_meta, convert_sql_to_cube_query, get_test_session, + get_test_tenant_ctx_with_meta, init_testing_logger, utils::LogicalPlanTestUtils, + }, + CompilationError, DatabaseProtocol, + }, + transport::{CubeMeta, CubeMetaDimension, CubeMetaMeasure}, +}; + +/// Two views that both expose the same underlying `customers.customer_city` +/// dimension (via `aliasMember`). `orders_view` carries an `orders` measure +/// while `customers_view` carries a `customers` measure, so a query that +/// touches both is a multi-fact query joined on the shared key. +fn views_meta() -> Vec { + let dimension = |name: &str, alias: &str| CubeMetaDimension { + name: name.to_string(), + r#type: "string".to_string(), + alias_member: Some(alias.to_string()), + ..CubeMetaDimension::default() + }; + let measure = |name: &str, alias: &str, agg: &str| CubeMetaMeasure { + name: name.to_string(), + title: None, + short_title: None, + description: None, + r#type: "number".to_string(), + agg_type: Some(agg.to_string()), + meta: None, + alias_member: Some(alias.to_string()), + format: None, + format_description: None, + currency: None, + }; + + vec![ + CubeMeta { + name: "customers_view".to_string(), + description: None, + title: None, + r#type: V1CubeMetaType::View, + dimensions: vec![ + dimension("customers_view.customer_city", "customers.customer_city"), + // A second dimension that is NOT a join key, used to test that a + // query grouping by it (instead of the join key) is not merged. + dimension("customers_view.status", "customers.status"), + ], + measures: vec![measure( + "customers_view.avg_age", + "customers.avg_age", + "avg", + )], + segments: vec![], + joins: None, + folders: None, + nested_folders: None, + hierarchies: None, + meta: None, + }, + CubeMeta { + name: "orders_view".to_string(), + description: None, + title: None, + r#type: V1CubeMetaType::View, + dimensions: vec![dimension( + "orders_view.customer_city", + "customers.customer_city", + )], + measures: vec![measure("orders_view.revenue", "orders.revenue", "sum")], + segments: vec![], + joins: None, + folders: None, + nested_folders: None, + hierarchies: None, + meta: None, + }, + ] +} + +fn set_filter(member: &str) -> V1LoadRequestQueryFilterItem { + V1LoadRequestQueryFilterItem { + member: Some(member.to_string()), + operator: Some("set".to_string()), + values: None, + or: None, + and: None, + } +} + +/// The motivating query: a grouped (multi-fact) `LEFT JOIN` selecting a +/// dimension and measures from each view, joined on the shared `customer_city` +/// which is also the GROUP BY key. The two view scans are merged into a single +/// grouped CubeScan, and the left join key gets a `set` filter to recover +/// LEFT-join semantics on top of the FULL OUTER multi-fact stitch. +#[tokio::test] +async fn test_group_by_left_join_two_views_on_shared_member() { + if !Rewriter::sql_push_down_enabled() { + return; + } + init_testing_logger(); + + let logical_plan = convert_select_to_query_plan_with_meta( + r#" + SELECT c.customer_city, measure(o.revenue), measure(c.avg_age) + FROM customers_view c + LEFT JOIN orders_view o ON o.customer_city = c.customer_city + GROUP BY 1 + "# + .to_string(), + views_meta(), + ) + .await + .as_logical_plan(); + + assert_eq!( + logical_plan.find_cube_scan().request, + V1LoadRequestQuery { + measures: Some(vec![ + "orders_view.revenue".to_string(), + "customers_view.avg_age".to_string(), + ]), + dimensions: Some(vec!["customers_view.customer_city".to_string()]), + segments: Some(vec![]), + order: Some(vec![]), + filters: Some(vec![set_filter("customers_view.customer_city")]), + join_hints: Some(vec![vec![ + "customers_view".to_string(), + "orders_view".to_string(), + ]]), + ..Default::default() + } + ) +} + +/// Same shape but `INNER JOIN`: both sides must be present, so the merged scan +/// carries a `set` filter on the join key of each side. +#[tokio::test] +async fn test_group_by_inner_join_two_views_on_shared_member() { + if !Rewriter::sql_push_down_enabled() { + return; + } + init_testing_logger(); + + let logical_plan = convert_select_to_query_plan_with_meta( + r#" + SELECT c.customer_city, measure(o.revenue), measure(c.avg_age) + FROM customers_view c + INNER JOIN orders_view o ON o.customer_city = c.customer_city + GROUP BY 1 + "# + .to_string(), + views_meta(), + ) + .await + .as_logical_plan(); + + assert_eq!( + logical_plan.find_cube_scan().request, + V1LoadRequestQuery { + measures: Some(vec![ + "orders_view.revenue".to_string(), + "customers_view.avg_age".to_string(), + ]), + dimensions: Some(vec!["customers_view.customer_city".to_string()]), + segments: Some(vec![]), + order: Some(vec![]), + filters: Some(vec![ + set_filter("orders_view.customer_city"), + set_filter("customers_view.customer_city"), + ]), + join_hints: Some(vec![vec![ + "customers_view".to_string(), + "orders_view".to_string(), + ]]), + ..Default::default() + } + ) +} + +/// Ungrouped query (`SELECT *`): the shared-member merge only applies to +/// grouped queries, so an ungrouped join is not merged and is rejected the +/// same way any other unsupported cube join is. +#[tokio::test] +async fn test_ungrouped_join_two_views_on_shared_member_is_not_merged() { + if !Rewriter::sql_push_down_enabled() { + return; + } + init_testing_logger(); + + let meta = get_test_tenant_ctx_with_meta(views_meta()); + let query = convert_sql_to_cube_query( + &r#" + SELECT * + FROM customers_view + LEFT JOIN orders_view + ON (orders_view.customer_city = customers_view.customer_city) + "# + .to_string(), + meta.clone(), + get_test_session(DatabaseProtocol::PostgreSQL, meta).await, + ) + .await; + + let error = query.unwrap_err(); + assert!(matches!(error, CompilationError::Rewrite(..))); +} + +/// The join is over a dimension (`customer_city`) that is not in the GROUP BY +/// (the query groups by `status` instead). The merge requires the join key to +/// be the group-by key, so this is not merged and is rejected. +#[tokio::test] +async fn test_group_by_join_dimension_not_in_group_by_is_not_merged() { + if !Rewriter::sql_push_down_enabled() { + return; + } + init_testing_logger(); + + let meta = get_test_tenant_ctx_with_meta(views_meta()); + let query = convert_sql_to_cube_query( + &r#" + SELECT c.status, measure(o.revenue), measure(c.avg_age) + FROM customers_view c + LEFT JOIN orders_view o ON o.customer_city = c.customer_city + GROUP BY 1 + "# + .to_string(), + meta.clone(), + get_test_session(DatabaseProtocol::PostgreSQL, meta).await, + ) + .await; + + let error = query.unwrap_err(); + assert!(matches!(error, CompilationError::Rewrite(..))); +} + +/// The merge only fires when the join key is fully within dimensions. Joining +/// the two views on a measure (`o.revenue = c.avg_age`) is not a shared-member +/// dimension join, so the scans are not merged and the query is rejected. +#[tokio::test] +async fn test_join_two_views_on_measure_is_not_merged() { + if !Rewriter::sql_push_down_enabled() { + return; + } + init_testing_logger(); + + let meta = get_test_tenant_ctx_with_meta(views_meta()); + let query = convert_sql_to_cube_query( + &r#" + SELECT c.customer_city, measure(o.revenue) + FROM customers_view c + LEFT JOIN orders_view o ON (o.revenue = c.avg_age) + GROUP BY 1 + "# + .to_string(), + meta.clone(), + get_test_session(DatabaseProtocol::PostgreSQL, meta).await, + ) + .await; + + let error = query.unwrap_err(); + assert!(matches!(error, CompilationError::Rewrite(..))); +}