Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 58 additions & 2 deletions datafusion/optimizer/src/optimize_projections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -780,12 +780,12 @@ fn split_join_requirements(
// The mark column is synthetic (produced by the join itself),
// so discard it and route only to the left child.
let (left_indices, _mark) = indices.split_off(left_len);
(left_indices, RequiredIndices::new())
(left_indices, RequiredIndices::new().append(&[0]))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this code was added in #21265 from @buraksenn

@buraksenn do you have some time to help review this proposed change?

I think it would help if we can also document in comments why [0] is being appended

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like we only need to add the column in some cases -- why does this code add it unconditionally?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I traced the error there and that change guaranteed the column would be non-empty. Typing that out makes me think I shouldn't assume they are empty? So I should only deal with the empty case.

}
JoinType::RightMark => {
// Same as LeftMark, but for the right child.
let (right_indices, _mark) = indices.split_off(right_len);
(RequiredIndices::new(), right_indices)
(RequiredIndices::new().append(&[0]), right_indices)
}
// All requirements can be re-routed to left child directly.
JoinType::LeftAnti | JoinType::LeftSemi => (indices, RequiredIndices::new()),
Expand Down Expand Up @@ -2462,6 +2462,62 @@ mod tests {
)
}

// Stacked filter-less LeftMark joins (from `= ANY` / `<> ALL`) must keep
// each `mark` qualified so they don't collide.
#[test]
fn optimize_projections_stacked_mark_joins_keep_qualified_mark() -> Result<()> {
let person = test_table_scan_with_name("person")?;

let aliased_scan = |table: &str, alias: &str| -> Result<LogicalPlan> {
LogicalPlanBuilder::from(test_table_scan_with_name(table)?)
.project(vec![col(format!("{table}.a"))])?
.alias(alias)?
.build()
};

let plan = LogicalPlanBuilder::from(person)
.join_on(
aliased_scan("s1", "__correlated_sq_1")?,
JoinType::LeftMark,
vec![lit(true)],
)?
.join_on(
aliased_scan("s2", "__correlated_sq_2")?,
JoinType::LeftMark,
vec![lit(true)],
)?
.join_on(
aliased_scan("s3", "__correlated_sq_3")?,
JoinType::LeftMark,
vec![lit(true)],
)?
.filter(
col("__correlated_sq_1.mark")
.or(col("__correlated_sq_2.mark"))
.and(not(col("__correlated_sq_3.mark"))),
)?
.project(vec![col("person.a")])?
.build()?;

assert_optimized_plan_equal!(
plan,
@r"
Projection: person.a
Filter: (__correlated_sq_1.mark OR __correlated_sq_2.mark) AND NOT __correlated_sq_3.mark
LeftMark Join: Filter: Boolean(true)
LeftMark Join: Filter: Boolean(true)
LeftMark Join: Filter: Boolean(true)
TableScan: person projection=[a]
SubqueryAlias: __correlated_sq_1
TableScan: s1 projection=[a]
SubqueryAlias: __correlated_sq_2
TableScan: s2 projection=[a]
SubqueryAlias: __correlated_sq_3
TableScan: s3 projection=[a]
"
)
}

fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}

fn optimize(plan: LogicalPlan) -> Result<LogicalPlan> {
Expand Down
19 changes: 19 additions & 0 deletions datafusion/sqllogictest/test_files/subquery.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1598,6 +1598,25 @@ logical_plan
21)----------Projection: column1 AS v
22)------------Values: (Int64(5)), (Int64(NULL))

# same-table `= ANY` / `<> ALL` must plan without

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

without the code change in this PR, this test fails as expected

cargo test --profile=ci --test sqllogictests
...

Completed 475 test files in 9 seconds                                                                                                                                              External error: 2 errors in file /Users/andrewlamb/Software/datafusion3/datafusion/sqllogictest/test_files/subquery.slt

1. query failed: DataFusion error: Optimizer rule 'optimize_projections' failed
caused by
Schema error: Schema contains duplicate unqualified field name mark
[SQL] select id from set_cmp_self where age = any(select age from set_cmp_self);
at /Users/andrewlamb/Software/datafusion3/datafusion/sqllogictest/test_files/subquery.slt:1606


2. query failed: DataFusion error: Optimizer rule 'optimize_projections' failed
caused by
Schema error: Schema contains duplicate unqualified field name mark
[SQL] select id from set_cmp_self where age <> all(select age from set_cmp_self);
at /Users/andrewlamb/Software/datafusion3/datafusion/sqllogictest/test_files/subquery.slt:1613

# "duplicate unqualified field name mark".
statement ok
create table set_cmp_self(id int, age int) as values (1, 20), (2, 30), (3, 40);

query I rowsort
select id from set_cmp_self where age = any(select age from set_cmp_self);
----
1
2
3

query I
select id from set_cmp_self where age <> all(select age from set_cmp_self);
----

statement count 0
drop table set_cmp_self;

# correlated_recursive_scalar_subquery_with_level_3_exists_subquery_referencing_level1_relation
query TT
explain select c_custkey from customer
Expand Down
Loading