Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2073,6 +2073,7 @@ impl LogicalPlan {
filter,
join_constraint,
join_type,
null_aware,
..
}) => {
let join_expr: Vec<String> =
Expand All @@ -2081,6 +2082,8 @@ impl LogicalPlan {
.as_ref()
.map(|expr| format!(" Filter: {expr}"))
.unwrap_or_else(|| "".to_string());
let null_aware_expr =
if *null_aware { " null_aware" } else { "" };
let join_type = if filter.is_none()
&& keys.is_empty()
&& *join_type == JoinType::Inner
Expand All @@ -2100,15 +2103,17 @@ impl LogicalPlan {
filter_expr
)?;
}
write!(f, "{null_aware_expr}")?;
Ok(())
}
JoinConstraint::Using => {
write!(
f,
"{} Join: Using {}{}",
"{} Join: Using {}{}{}",
join_type,
join_expr.join(", "),
filter_expr,
null_aware_expr,
)
}
}
Expand Down
9 changes: 8 additions & 1 deletion datafusion/physical-plan/src/joins/hash_join/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1141,6 +1141,8 @@ impl DisplayAs for HashJoinExec {
let display_fetch = self
.fetch
.map_or_else(String::new, |f| format!(", fetch={f}"));
let display_null_aware =
if self.null_aware { ", null_aware" } else { "" };
let on = self
.on
.iter()
Expand All @@ -1149,14 +1151,15 @@ impl DisplayAs for HashJoinExec {
.join(", ");
write!(
f,
"HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}{}{}",
"HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}{}{}{}",
self.mode,
self.join_type,
on,
display_filter,
display_projections,
display_null_equality,
display_fetch,
display_null_aware,
)
}
DisplayFormatType::TreeRender => {
Expand All @@ -1179,6 +1182,10 @@ impl DisplayAs for HashJoinExec {
writeln!(f, "NullsEqual: true")?;
}

if self.null_aware {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a test for this:

-- in slt
explain format tree
select...

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. I added the test to explain_tree.slt.

writeln!(f, "null_aware")?;
}

if let Some(filter) = self.filter.as_ref() {
writeln!(f, "filter={filter}")?;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,14 +374,14 @@ FROM left_parquet l
WHERE l.id NOT IN (SELECT r.id FROM right_parquet r);
----
logical_plan
01)LeftAnti Join: l.id = __correlated_sq_1.id
01)LeftAnti Join: l.id = __correlated_sq_1.id null_aware
02)--SubqueryAlias: l
03)----TableScan: left_parquet projection=[id, data]
04)--SubqueryAlias: __correlated_sq_1
05)----SubqueryAlias: r
06)------TableScan: right_parquet projection=[id]
physical_plan
01)HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(id@0, id@0)]
01)HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(id@0, id@0)], null_aware
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet
03)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ]

Expand Down Expand Up @@ -469,15 +469,15 @@ ORDER BY l.id LIMIT 2;
----
logical_plan
01)Sort: l.id ASC NULLS LAST, fetch=2
02)--LeftAnti Join: l.id = __correlated_sq_1.id
02)--LeftAnti Join: l.id = __correlated_sq_1.id null_aware
03)----SubqueryAlias: l
04)------TableScan: left_parquet projection=[id, data]
05)----SubqueryAlias: __correlated_sq_1
06)------SubqueryAlias: r
07)--------TableScan: right_parquet projection=[id]
physical_plan
01)SortExec: TopK(fetch=2), expr=[id@0 ASC NULLS LAST], preserve_partitioning=[false]
02)--HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(id@0, id@0)]
02)--HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(id@0, id@0)], null_aware
03)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]}, projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ]
04)----DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]}, projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ] AND DynamicFilter [ empty ]

Expand Down
25 changes: 25 additions & 0 deletions datafusion/sqllogictest/test_files/explain_tree.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1100,6 +1100,31 @@ physical_plan
24)-----------------------------│ format: csv │
25)-----------------------------└───────────────────────────┘

# Query with null-aware anti join (NOT IN subquery).
query TT
explain select int_col from table1 where int_col not in (select int_col from table2);
----
physical_plan
01)┌───────────────────────────┐
02)│ HashJoinExec │
03)│ -------------------- │
04)│ join_type: LeftAnti │
05)│ │
06)│ null_aware ├──────────────┐
07)│ │ │
08)│ on: │ │
09)│ (int_col = int_col) │ │
10)└─────────────┬─────────────┘ │
11)┌─────────────┴─────────────┐┌─────────────┴─────────────┐
12)│ DataSourceExec ││ DataSourceExec │
13)│ -------------------- ││ -------------------- │
14)│ files: 1 ││ files: 1 │
15)│ format: csv ││ format: parquet │
16)│ ││ │
17)│ ││ predicate: │
18)│ ││ DynamicFilter [ empty ] │
19)└───────────────────────────┘└───────────────────────────┘

# Query with nested loop join.
query TT
explain select int_col from table1 where exists (select count(*) from table2);
Expand Down
2 changes: 1 addition & 1 deletion datafusion/sqllogictest/test_files/joins.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1989,7 +1989,7 @@ where join_t1.t1_id + 12 not in
(select join_t2.t2_id + 1 from join_t2 where join_t1.t1_int > 0)
----
logical_plan
01)LeftAnti Join: CAST(join_t1.t1_id AS Int64) + Int64(12) = __correlated_sq_1.join_t2.t2_id + Int64(1) Filter: join_t1.t1_int > UInt32(0)
01)LeftAnti Join: CAST(join_t1.t1_id AS Int64) + Int64(12) = __correlated_sq_1.join_t2.t2_id + Int64(1) Filter: join_t1.t1_int > UInt32(0) null_aware
02)--TableScan: join_t1 projection=[t1_id, t1_name, t1_int]
03)--SubqueryAlias: __correlated_sq_1
04)----Projection: CAST(join_t2.t2_id AS Int64) + Int64(1)
Expand Down
8 changes: 4 additions & 4 deletions datafusion/sqllogictest/test_files/null_aware_anti_join.slt
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ query TT
EXPLAIN SELECT * FROM outer_table WHERE id NOT IN (SELECT id FROM inner_table_no_null);
----
logical_plan
01)LeftAnti Join: outer_table.id = __correlated_sq_1.id
01)LeftAnti Join: outer_table.id = __correlated_sq_1.id null_aware
02)--TableScan: outer_table projection=[id, value]
03)--SubqueryAlias: __correlated_sq_1
04)----TableScan: inner_table_no_null projection=[id]
physical_plan
01)HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(id@0, id@0)]
01)HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(id@0, id@0)], null_aware
02)--DataSourceExec: partitions=1, partition_sizes=[1]
03)--DataSourceExec: partitions=1, partition_sizes=[1]

Expand Down Expand Up @@ -193,12 +193,12 @@ query TT
EXPLAIN SELECT * FROM outer_table WHERE id NOT IN (SELECT id FROM inner_table_with_null);
----
logical_plan
01)LeftAnti Join: outer_table.id = __correlated_sq_1.id
01)LeftAnti Join: outer_table.id = __correlated_sq_1.id null_aware
02)--TableScan: outer_table projection=[id, value]
03)--SubqueryAlias: __correlated_sq_1
04)----TableScan: inner_table_with_null projection=[id]
physical_plan
01)HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(id@0, id@0)]
01)HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(id@0, id@0)], null_aware
02)--DataSourceExec: partitions=1, partition_sizes=[1]
03)--DataSourceExec: partitions=1, partition_sizes=[1]

Expand Down
4 changes: 2 additions & 2 deletions datafusion/sqllogictest/test_files/tpch/plans/q16.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ logical_plan
02)--Projection: part.p_brand, part.p_type, part.p_size, count(alias1) AS supplier_cnt
03)----Aggregate: groupBy=[[part.p_brand, part.p_type, part.p_size]], aggr=[[count(alias1)]]
04)------Aggregate: groupBy=[[part.p_brand, part.p_type, part.p_size, partsupp.ps_suppkey AS alias1]], aggr=[[]]
05)--------LeftAnti Join: partsupp.ps_suppkey = __correlated_sq_1.s_suppkey
05)--------LeftAnti Join: partsupp.ps_suppkey = __correlated_sq_1.s_suppkey null_aware
06)----------Projection: partsupp.ps_suppkey, part.p_brand, part.p_type, part.p_size
07)------------Inner Join: partsupp.ps_partkey = part.p_partkey
08)--------------TableScan: partsupp projection=[ps_partkey, ps_suppkey]
Expand All @@ -74,7 +74,7 @@ physical_plan
07)------------AggregateExec: mode=FinalPartitioned, gby=[p_brand@0 as p_brand, p_type@1 as p_type, p_size@2 as p_size, alias1@3 as alias1], aggr=[]
08)--------------RepartitionExec: partitioning=Hash([p_brand@0, p_type@1, p_size@2, alias1@3], 4), input_partitions=4
09)----------------AggregateExec: mode=Partial, gby=[p_brand@1 as p_brand, p_type@2 as p_type, p_size@3 as p_size, ps_suppkey@0 as alias1], aggr=[]
10)------------------HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(ps_suppkey@0, s_suppkey@0)]
10)------------------HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(ps_suppkey@0, s_suppkey@0)], null_aware
11)--------------------CoalescePartitionsExec
12)----------------------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(ps_partkey@0, p_partkey@0)], projection=[ps_suppkey@1, p_brand@3, p_type@4, p_size@5]
13)------------------------RepartitionExec: partitioning=Hash([ps_partkey@0], 4), input_partitions=4
Expand Down
Loading