Skip to content
Open
175 changes: 173 additions & 2 deletions rust/cubesql/cubesql/src/compile/rewrite/rules/members.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,8 @@ impl RewriteRules for MemberRules {
"?left_join_hints",
"?right_join_hints",
"?out_join_hints",
"?join_type",
"?left_filters",
),
),
];
Expand Down Expand Up @@ -2792,6 +2794,8 @@ impl MemberRules {
left_join_hints_var: &'static str,
right_join_hints_var: &'static str,
out_join_hints_var: &'static str,
join_type_var: &'static str,
left_filters_var: &'static str,
) -> impl Fn(&mut CubeEGraph, &mut Subst) -> bool {
let left_alias_to_cube_var = var!(left_alias_to_cube_var);
let right_alias_to_cube_var = var!(right_alias_to_cube_var);
Expand All @@ -2803,18 +2807,185 @@ impl MemberRules {
let left_join_hints_var = var!(left_join_hints_var);
let right_join_hints_var = var!(right_join_hints_var);
let out_join_hints_var = var!(out_join_hints_var);
let join_type_var = var!(join_type_var);
let left_filters_var = var!(left_filters_var);
let meta_context = self.meta_context.clone();
move |egraph, subst| {
let Some((left_cube, right_cube)) = is_proper_cube_join_condition(
// Maps a join column onto the member it refers to within the given members
// e-class and yields that member's name. Only `Member::Dimension` and
// `Member::TimeDimension` qualify as shared join keys; any other member
// kind, or a column that matches no member at all, yields `None`.
fn dimension_member_name(
    egraph: &mut CubeEGraph,
    members_id: Id,
    column: &Column,
) -> Option<String> {
    // Bail out right away when the column does not map to any member.
    let ((_, member, _), _) = egraph[members_id].data.find_member_by_column(column)?;
    match member {
        Member::Dimension { name, .. } | Member::TimeDimension { name, .. } => Some(name.clone()),
        _ => None,
    }
}

// Two ways to recognize a joinable pair of CubeScans:
// 1. The classic `left.__cubeJoinField = right.__cubeJoinField`
// condition that comes from the data model join graph.
// 2. A join between two CubeScans (typically views) on a
// dimension that resolves to the *same underlying cube member*
// — e.g. `orders_view.city = customers_view.city` where both
// `city` dimensions are aliases of the same `cube.dimension`.
// Such a join is on the same shared key, so the two scans can
// be merged into a single CubeScan exactly like any other
// cube-to-cube join, letting the query planner treat the result
// as a (multi-fact) query over the combined members.
let cubes = is_proper_cube_join_condition(
egraph,
subst,
left_members_var,
left_on_var,
right_members_var,
right_on_var,
) else {
);
let mut shared_left_keys: Vec<String> = vec![];
let mut shared_right_keys: Vec<String> = vec![];
let (cubes, shared_member_join) = match cubes {
Some(cubes) => (Some(cubes), false),
None => {
// A view dimension keeps the original `cube.dimension` path
// in `alias_member`; for non-view members we fall back to
// the member name itself.
let resolve_underlying = |member_name: &str| -> String {
meta_context
.find_dimension_with_name(member_name)
.and_then(|dim| dim.alias_member.clone())
.unwrap_or_else(|| member_name.to_string())
};

let left_join_ons = var_iter!(egraph[subst[left_on_var]], JoinLeftOn)
.cloned()
.collect::<Vec<_>>();
let right_join_ons = var_iter!(egraph[subst[right_on_var]], JoinRightOn)
.cloned()
.collect::<Vec<_>>();

let mut found = None;
'pairs: for left_on in left_join_ons.iter() {
for right_on in right_join_ons.iter() {
// We can only merge when the *whole* join key is
// fully within dimensions: every column pair must
// resolve to a dimension (or time dimension) on both
// sides and to the same underlying cube member. A
// join key that touches a measure/segment/etc. (or
// mixes underlying members) is rejected, leaving the
// join to the normal (non-merged) handling.
if left_on.is_empty() || left_on.len() != right_on.len() {
continue;
}
let mut left_cube_name: Option<String> = None;
let mut right_cube_name: Option<String> = None;
let mut left_keys: Vec<String> = vec![];
let mut right_keys: Vec<String> = vec![];
let mut all_match = true;
for (left_column, right_column) in left_on.iter().zip(right_on.iter()) {
let Some(left_name) = dimension_member_name(
egraph,
subst[left_members_var],
left_column,
) else {
all_match = false;
break;
};
let Some(right_name) = dimension_member_name(
egraph,
subst[right_members_var],
right_column,
) else {
all_match = false;
break;
};
if resolve_underlying(&left_name) != resolve_underlying(&right_name)
{
all_match = false;
break;
}
left_cube_name = left_name.split('.').next().map(|s| s.to_string());
right_cube_name =
right_name.split('.').next().map(|s| s.to_string());
left_keys.push(left_name);
right_keys.push(right_name);
}
if all_match {
if let (Some(left_cube_name), Some(right_cube_name)) =
(left_cube_name, right_cube_name)
{
found = Some((left_cube_name, right_cube_name));
shared_left_keys = left_keys;
shared_right_keys = right_keys;
break 'pairs;
}
}
}
}
(found, true)
}
};
let Some((left_cube, right_cube)) = cubes else {
return false;
};

// For a join between two views on a shared cube member, Tesseract
// renders the merged multi-fact scan as a FULL OUTER JOIN over the
// shared key. Re-introduce the requested INNER/LEFT/RIGHT semantics
// by requiring the join key of each "must be present" side to be
// set (FULL adds nothing).
if shared_member_join {
let mut require_left = false;
let mut require_right = false;
if let Some(join_type) = var_list_iter!(egraph[subst[join_type_var]], JoinJoinType)
.cloned()
.next()
{
match join_type.0 {
datafusion::prelude::JoinType::Inner => {
require_left = true;
require_right = true;
}
datafusion::prelude::JoinType::Left => require_left = true,
datafusion::prelude::JoinType::Right => require_right = true,
_ => {}
}
}

let mut presence_members: Vec<String> = vec![];
if require_left {
presence_members.extend(shared_left_keys.iter().cloned());
}
if require_right {
presence_members.extend(shared_right_keys.iter().cloned());
}

if !presence_members.is_empty() {
let mut acc = subst[left_filters_var];
for name in presence_members {
let member = egraph.add(LogicalPlanLanguage::FilterMemberMember(
crate::compile::rewrite::FilterMemberMember(name),
));
let op = egraph.add(LogicalPlanLanguage::FilterMemberOp(
crate::compile::rewrite::FilterMemberOp("set".to_string()),
));
let values = egraph.add(LogicalPlanLanguage::FilterMemberValues(
crate::compile::rewrite::FilterMemberValues(vec![]),
));
let filter_member =
egraph.add(LogicalPlanLanguage::FilterMember([member, op, values]));
acc = egraph.add(LogicalPlanLanguage::CubeScanFilters(vec![
filter_member,
acc,
]));
}
subst.insert(left_filters_var, acc);
}
}

for left_alias_to_cube in
var_iter!(egraph[subst[left_alias_to_cube_var]], CubeScanAliasToCube)
{
Expand Down
2 changes: 2 additions & 0 deletions rust/cubesql/cubesql/src/compile/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub mod test_cube_join;
#[cfg(test)]
pub mod test_cube_join_grouped;
#[cfg(test)]
pub mod test_cube_join_views;
#[cfg(test)]
pub mod test_cube_scan;
#[cfg(test)]
pub mod test_df_execution;
Expand Down
Loading
Loading