diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index c6160bddd4..d43c4bfafd 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -2034,6 +2034,7 @@ impl PhysicalPlanner { Ok(JoinType::FullOuter) => DFJoinType::Full, Ok(JoinType::LeftSemi) => DFJoinType::LeftSemi, Ok(JoinType::LeftAnti) => DFJoinType::LeftAnti, + Ok(JoinType::Existence) => DFJoinType::LeftMark, Err(_) => { return Err(GeneralError(format!( "Unsupported join type: {join_type:?}" diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index 7f50aa928c..37d5f99c51 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -397,6 +397,7 @@ enum JoinType { FullOuter = 3; LeftSemi = 4; LeftAnti = 5; + Existence = 6; } enum BuildSide { diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala index 8cbf7c9189..9a3789b548 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/operators.scala @@ -1893,6 +1893,7 @@ trait CometHashJoin { case FullOuter => JoinType.FullOuter case LeftSemi => JoinType.LeftSemi case LeftAnti => JoinType.LeftAnti + case ExistenceJoin(_) => JoinType.Existence case _ => // Spark doesn't support other join types withFallbackReason(join, s"Unsupported join type ${join.joinType}") @@ -2009,6 +2010,11 @@ case class CometHashJoinExec( override def withNewChildrenInternal(newLeft: SparkPlan, newRight: SparkPlan): SparkPlan = this.copy(left = newLeft, right = newRight) + override def producedAttributes: AttributeSet = joinType match { + case ExistenceJoin(exists) => AttributeSet(exists) + case _ => AttributeSet.empty + } + override def stringArgs: Iterator[Any] = Iterator(leftKeys, rightKeys, joinType, buildSide, condition, left, right) @@ -2150,6 +2156,11 @@ case class 
CometBroadcastHashJoinExec( override def withNewChildrenInternal(newLeft: SparkPlan, newRight: SparkPlan): SparkPlan = this.copy(left = newLeft, right = newRight) + override def producedAttributes: AttributeSet = joinType match { + case ExistenceJoin(exists) => AttributeSet(exists) + case _ => AttributeSet.empty + } + override def stringArgs: Iterator[Any] = Iterator(leftKeys, rightKeys, joinType, condition, buildSide, left, right) @@ -2231,6 +2242,7 @@ object CometSortMergeJoinExec extends CometOperatorSerde[SortMergeJoinExec] { case FullOuter => JoinType.FullOuter case LeftSemi => JoinType.LeftSemi case LeftAnti => JoinType.LeftAnti + case ExistenceJoin(_) => JoinType.Existence case _ => // Spark doesn't support other join types withFallbackReason(join, s"Unsupported join type ${join.joinType}") @@ -2340,6 +2352,11 @@ case class CometSortMergeJoinExec( override def withNewChildrenInternal(newLeft: SparkPlan, newRight: SparkPlan): SparkPlan = this.copy(left = newLeft, right = newRight) + override def producedAttributes: AttributeSet = joinType match { + case ExistenceJoin(exists) => AttributeSet(exists) + case _ => AttributeSet.empty + } + override def stringArgs: Iterator[Any] = Iterator(leftKeys, rightKeys, joinType, condition, left, right) diff --git a/spark/src/test/resources/sql-tests/join/existence_join.sql b/spark/src/test/resources/sql-tests/join/existence_join.sql new file mode 100644 index 0000000000..c2dbbbf525 --- /dev/null +++ b/spark/src/test/resources/sql-tests/join/existence_join.sql @@ -0,0 +1,164 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. 
You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- Tests for ExistenceJoin: produced when EXISTS / NOT EXISTS is combined +-- with another predicate via OR, preventing rewrite to LeftSemi / LeftAnti. +-- The basic EXISTS-with-OR query is hinted to each physical join strategy +-- (BHJ, SHJ, SMJ) to exercise CometBroadcastHashJoinExec, CometHashJoinExec, +-- and CometSortMergeJoinExec with joinType = ExistenceJoin; the empty-build +-- case runs under BHJ and SMJ, and the remaining cases use BHJ only. + +-- ============================================================ +-- Setup: covers NULLs, duplicates, empty build side +-- ============================================================ + +statement +CREATE TABLE ex_left(id int, k int, region string) USING parquet + +statement +INSERT INTO ex_left VALUES + (1, 1, 'US'), + (2, 2, 'EU'), + (3, NULL, 'US'), + (4, 4, 'EU'), + (5, 5, 'EU') + +statement +CREATE TABLE ex_right(id int, k int) USING parquet + +statement +INSERT INTO ex_right VALUES (10, 1), (11, 2), (12, 2), (13, NULL) + +statement +CREATE TABLE ex_right_no_nulls(id int, k int) USING parquet + +statement +INSERT INTO ex_right_no_nulls VALUES (10, 1), (11, 5) + +statement +CREATE TABLE ex_right_empty(id int, k int) USING parquet + +statement +CREATE TABLE ex_right_dups(id int, k int) USING parquet + +statement +INSERT INTO ex_right_dups VALUES (10, 1), (11, 1), (12, 1), (13, 2) + +-- ============================================================ +-- EXISTS with OR: BHJ build-right +-- ============================================================ + +query +SELECT /*+ BROADCAST(ex_right) */ * FROM ex_left l +WHERE l.region = 'US' OR EXISTS (SELECT 1 FROM ex_right r
WHERE r.k = l.k) +ORDER BY l.id + +-- ============================================================ +-- EXISTS with OR: SHJ build-right +-- ============================================================ + +query +SELECT /*+ SHUFFLE_HASH(ex_right) */ * FROM ex_left l +WHERE l.region = 'US' OR EXISTS (SELECT 1 FROM ex_right r WHERE r.k = l.k) +ORDER BY l.id + +-- ============================================================ +-- EXISTS with OR: SMJ +-- ============================================================ + +query +SELECT /*+ MERGE(ex_right) */ * FROM ex_left l +WHERE l.region = 'US' OR EXISTS (SELECT 1 FROM ex_right r WHERE r.k = l.k) +ORDER BY l.id + +-- ============================================================ +-- Empty build: every left row is unmatched, only OR-arm rows survive +-- ============================================================ + +query +SELECT /*+ BROADCAST(ex_right_empty) */ * FROM ex_left l +WHERE l.region = 'US' OR EXISTS (SELECT 1 FROM ex_right_empty r WHERE r.k = l.k) +ORDER BY l.id + +query +SELECT /*+ MERGE(ex_right_empty) */ * FROM ex_left l +WHERE l.region = 'US' OR EXISTS (SELECT 1 FROM ex_right_empty r WHERE r.k = l.k) +ORDER BY l.id + +-- ============================================================ +-- Right side has no NULL: NULL-keyed left row reaches the marker +-- evaluation but cannot match (NULL = anything is NULL → false), +-- so its exists tag is false. +-- ============================================================ + +query +SELECT /*+ BROADCAST(ex_right_no_nulls) */ * FROM ex_left l +WHERE l.region = 'US' OR EXISTS (SELECT 1 FROM ex_right_no_nulls r WHERE r.k = l.k) +ORDER BY l.id + +-- ============================================================ +-- NOT EXISTS combined with OR: also lowers to ExistenceJoin +-- (the optimizer flips the marker via NOT in the filter). 
+-- ============================================================ + +query +SELECT /*+ BROADCAST(ex_right) */ * FROM ex_left l +WHERE l.region = 'US' OR NOT EXISTS (SELECT 1 FROM ex_right r WHERE r.k = l.k) +ORDER BY l.id + +-- ============================================================ +-- Build with duplicate keys: marker is "at least one match", so duplicates +-- on the right must not multiply the output. +-- ============================================================ + +query +SELECT /*+ BROADCAST(ex_right_dups) */ * FROM ex_left l +WHERE l.region = 'US' OR EXISTS (SELECT 1 FROM ex_right_dups r WHERE r.k = l.k) +ORDER BY l.id + +-- ============================================================ +-- Marker used inside a more complex predicate (NOT exists OR ...). +-- ============================================================ + +query +SELECT /*+ BROADCAST(ex_right) */ id, k, region FROM ex_left l +WHERE l.id > 1 + AND (l.region = 'US' OR NOT EXISTS (SELECT 1 FROM ex_right r WHERE r.k = l.k)) +ORDER BY l.id + +-- ============================================================ +-- Multi-column correlation +-- ============================================================ + +statement +CREATE TABLE ex_left_multi(id int, k1 int, k2 int) USING parquet + +statement +INSERT INTO ex_left_multi VALUES (1, 1, 100), (2, 2, 200), (3, 1, 300) + +statement +CREATE TABLE ex_right_multi(k1 int, k2 int) USING parquet + +statement +INSERT INTO ex_right_multi VALUES (1, 100), (2, 999) + +query +SELECT /*+ BROADCAST(ex_right_multi) */ * FROM ex_left_multi l +WHERE l.id > 0 + AND (l.k1 = 1 + OR EXISTS (SELECT 1 FROM ex_right_multi r WHERE r.k1 = l.k1 AND r.k2 = l.k2)) +ORDER BY l.id diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q10/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q10/extended.txt new file mode 100644 index 0000000000..3d8ef408c2 --- /dev/null +++ 
b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q10/extended.txt @@ -0,0 +1,57 @@ +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometHashAggregate + +- CometExchange + +- CometHashAggregate + +- CometProject + +- CometBroadcastHashJoin + :- CometProject + : +- CometBroadcastHashJoin + : :- CometProject + : : +- CometFilter + : : +- CometBroadcastHashJoin + : : :- CometBroadcastHashJoin + : : : :- CometBroadcastHashJoin + : : : : :- CometFilter + : : : : : +- CometNativeScan parquet spark_catalog.default.customer + : : : : +- CometBroadcastExchange + : : : : +- CometProject + : : : : +- CometBroadcastHashJoin + : : : : :- CometNativeScan parquet spark_catalog.default.store_sales + : : : : : +- CometSubqueryBroadcast + : : : : : +- CometBroadcastExchange + : : : : : +- CometProject + : : : : : +- CometFilter + : : : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : : : +- CometBroadcastExchange + : : : : +- CometProject + : : : : +- CometFilter + : : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : : +- CometBroadcastExchange + : : : +- CometProject + : : : +- CometBroadcastHashJoin + : : : :- CometNativeScan parquet spark_catalog.default.web_sales + : : : : +- ReusedSubquery + : : : +- CometBroadcastExchange + : : : +- CometProject + : : : +- CometFilter + : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometBroadcastHashJoin + : : :- CometNativeScan parquet spark_catalog.default.catalog_sales + : : : +- ReusedSubquery + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometFilter + : : +- CometNativeScan parquet spark_catalog.default.date_dim + : +- CometBroadcastExchange + : +- CometProject + : +- CometFilter + : +- CometNativeScan parquet spark_catalog.default.customer_address + +- CometBroadcastExchange + +- CometProject + +- CometFilter + +- CometNativeScan parquet 
spark_catalog.default.customer_demographics + +Comet accelerated 51 out of 54 eligible operators (94%). Final plan contains 1 transitions between Spark and Comet. \ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q35/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q35/extended.txt new file mode 100644 index 0000000000..3d8ef408c2 --- /dev/null +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q35/extended.txt @@ -0,0 +1,57 @@ +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometHashAggregate + +- CometExchange + +- CometHashAggregate + +- CometProject + +- CometBroadcastHashJoin + :- CometProject + : +- CometBroadcastHashJoin + : :- CometProject + : : +- CometFilter + : : +- CometBroadcastHashJoin + : : :- CometBroadcastHashJoin + : : : :- CometBroadcastHashJoin + : : : : :- CometFilter + : : : : : +- CometNativeScan parquet spark_catalog.default.customer + : : : : +- CometBroadcastExchange + : : : : +- CometProject + : : : : +- CometBroadcastHashJoin + : : : : :- CometNativeScan parquet spark_catalog.default.store_sales + : : : : : +- CometSubqueryBroadcast + : : : : : +- CometBroadcastExchange + : : : : : +- CometProject + : : : : : +- CometFilter + : : : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : : : +- CometBroadcastExchange + : : : : +- CometProject + : : : : +- CometFilter + : : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : : +- CometBroadcastExchange + : : : +- CometProject + : : : +- CometBroadcastHashJoin + : : : :- CometNativeScan parquet spark_catalog.default.web_sales + : : : : +- ReusedSubquery + : : : +- CometBroadcastExchange + : : : +- CometProject + : : : +- CometFilter + : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometBroadcastHashJoin + : : :- CometNativeScan parquet 
spark_catalog.default.catalog_sales + : : : +- ReusedSubquery + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometFilter + : : +- CometNativeScan parquet spark_catalog.default.date_dim + : +- CometBroadcastExchange + : +- CometProject + : +- CometFilter + : +- CometNativeScan parquet spark_catalog.default.customer_address + +- CometBroadcastExchange + +- CometProject + +- CometFilter + +- CometNativeScan parquet spark_catalog.default.customer_demographics + +Comet accelerated 51 out of 54 eligible operators (94%). Final plan contains 1 transitions between Spark and Comet. \ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q45/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q45/extended.txt new file mode 100644 index 0000000000..7f5a5b390d --- /dev/null +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4-spark3_5/q45/extended.txt @@ -0,0 +1,44 @@ +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometHashAggregate + +- CometExchange + +- CometHashAggregate + +- CometProject + +- CometFilter + +- CometBroadcastHashJoin + :- CometProject + : +- CometBroadcastHashJoin + : :- CometProject + : : +- CometBroadcastHashJoin + : : :- CometProject + : : : +- CometBroadcastHashJoin + : : : :- CometProject + : : : : +- CometBroadcastHashJoin + : : : : :- CometFilter + : : : : : +- CometNativeScan parquet spark_catalog.default.web_sales + : : : : : +- CometSubqueryBroadcast + : : : : : +- CometBroadcastExchange + : : : : : +- CometProject + : : : : : +- CometFilter + : : : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : : : +- CometBroadcastExchange + : : : : +- CometFilter + : : : : +- CometNativeScan parquet spark_catalog.default.customer + : : : +- CometBroadcastExchange + : : : +- CometProject + : : : +- CometFilter + : : : +- CometNativeScan parquet spark_catalog.default.customer_address + : : +- 
CometBroadcastExchange + : : +- CometProject + : : +- CometFilter + : : +- CometNativeScan parquet spark_catalog.default.date_dim + : +- CometBroadcastExchange + : +- CometProject + : +- CometFilter + : +- CometNativeScan parquet spark_catalog.default.item + +- CometBroadcastExchange + +- CometProject + +- CometFilter + +- CometNativeScan parquet spark_catalog.default.item + +Comet accelerated 40 out of 41 eligible operators (97%). Final plan contains 1 transitions between Spark and Comet. \ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q10/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q10/extended.txt index 07af300183..3d8ef408c2 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q10/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q10/extended.txt @@ -1,61 +1,57 @@ -TakeOrderedAndProject -+- HashAggregate [COMET: Spark Final aggregate without Comet Partial requires compatible intermediate buffer formats] - +- Exchange - +- HashAggregate - +- Project - +- BroadcastHashJoin - :- Project - : +- BroadcastHashJoin - : :- Project - : : +- Filter - : : +- BroadcastHashJoin - : : :- BroadcastHashJoin [COMET: Unsupported join type ExistenceJoin(exists#1)] - : : : :- CometNativeColumnarToRow - : : : : +- CometBroadcastHashJoin - : : : : :- CometFilter - : : : : : +- CometNativeScan parquet spark_catalog.default.customer - : : : : +- CometBroadcastExchange - : : : : +- CometProject - : : : : +- CometBroadcastHashJoin - : : : : :- CometNativeScan parquet spark_catalog.default.store_sales - : : : : : +- CometSubqueryBroadcast - : : : : : +- CometBroadcastExchange - : : : : : +- CometProject - : : : : : +- CometFilter - : : : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : : : +- CometBroadcastExchange - : : : : +- CometProject - : : : : +- CometFilter - : : : : +- CometNativeScan parquet 
spark_catalog.default.date_dim - : : : +- BroadcastExchange - : : : +- CometNativeColumnarToRow - : : : +- CometProject - : : : +- CometBroadcastHashJoin - : : : :- CometNativeScan parquet spark_catalog.default.web_sales - : : : : +- ReusedSubquery - : : : +- CometBroadcastExchange - : : : +- CometProject - : : : +- CometFilter - : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : +- BroadcastExchange - : : +- CometNativeColumnarToRow - : : +- CometProject - : : +- CometBroadcastHashJoin - : : :- CometNativeScan parquet spark_catalog.default.catalog_sales - : : : +- ReusedSubquery - : : +- CometBroadcastExchange - : : +- CometProject - : : +- CometFilter - : : +- CometNativeScan parquet spark_catalog.default.date_dim - : +- BroadcastExchange - : +- CometNativeColumnarToRow - : +- CometProject - : +- CometFilter - : +- CometNativeScan parquet spark_catalog.default.customer_address - +- BroadcastExchange - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometHashAggregate + +- CometExchange + +- CometHashAggregate + +- CometProject + +- CometBroadcastHashJoin + :- CometProject + : +- CometBroadcastHashJoin + : :- CometProject + : : +- CometFilter + : : +- CometBroadcastHashJoin + : : :- CometBroadcastHashJoin + : : : :- CometBroadcastHashJoin + : : : : :- CometFilter + : : : : : +- CometNativeScan parquet spark_catalog.default.customer + : : : : +- CometBroadcastExchange + : : : : +- CometProject + : : : : +- CometBroadcastHashJoin + : : : : :- CometNativeScan parquet spark_catalog.default.store_sales + : : : : : +- CometSubqueryBroadcast + : : : : : +- CometBroadcastExchange + : : : : : +- CometProject + : : : : : +- CometFilter + : : : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : : : +- CometBroadcastExchange + : : : : +- CometProject + : : : : +- CometFilter + : : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : : +- CometBroadcastExchange + : : : +- CometProject 
+ : : : +- CometBroadcastHashJoin + : : : :- CometNativeScan parquet spark_catalog.default.web_sales + : : : : +- ReusedSubquery + : : : +- CometBroadcastExchange + : : : +- CometProject + : : : +- CometFilter + : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometBroadcastHashJoin + : : :- CometNativeScan parquet spark_catalog.default.catalog_sales + : : : +- ReusedSubquery + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometFilter + : : +- CometNativeScan parquet spark_catalog.default.date_dim + : +- CometBroadcastExchange + : +- CometProject + : +- CometFilter + : +- CometNativeScan parquet spark_catalog.default.customer_address + +- CometBroadcastExchange +- CometProject +- CometFilter +- CometNativeScan parquet spark_catalog.default.customer_demographics -Comet accelerated 35 out of 54 eligible operators (64%). Final plan contains 5 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 51 out of 54 eligible operators (94%). Final plan contains 1 transitions between Spark and Comet. 
\ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q35/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q35/extended.txt index 07af300183..3d8ef408c2 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q35/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q35/extended.txt @@ -1,61 +1,57 @@ -TakeOrderedAndProject -+- HashAggregate [COMET: Spark Final aggregate without Comet Partial requires compatible intermediate buffer formats] - +- Exchange - +- HashAggregate - +- Project - +- BroadcastHashJoin - :- Project - : +- BroadcastHashJoin - : :- Project - : : +- Filter - : : +- BroadcastHashJoin - : : :- BroadcastHashJoin [COMET: Unsupported join type ExistenceJoin(exists#1)] - : : : :- CometNativeColumnarToRow - : : : : +- CometBroadcastHashJoin - : : : : :- CometFilter - : : : : : +- CometNativeScan parquet spark_catalog.default.customer - : : : : +- CometBroadcastExchange - : : : : +- CometProject - : : : : +- CometBroadcastHashJoin - : : : : :- CometNativeScan parquet spark_catalog.default.store_sales - : : : : : +- CometSubqueryBroadcast - : : : : : +- CometBroadcastExchange - : : : : : +- CometProject - : : : : : +- CometFilter - : : : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : : : +- CometBroadcastExchange - : : : : +- CometProject - : : : : +- CometFilter - : : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : : +- BroadcastExchange - : : : +- CometNativeColumnarToRow - : : : +- CometProject - : : : +- CometBroadcastHashJoin - : : : :- CometNativeScan parquet spark_catalog.default.web_sales - : : : : +- ReusedSubquery - : : : +- CometBroadcastExchange - : : : +- CometProject - : : : +- CometFilter - : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : +- BroadcastExchange - : : +- CometNativeColumnarToRow - : : +- CometProject - : : +- 
CometBroadcastHashJoin - : : :- CometNativeScan parquet spark_catalog.default.catalog_sales - : : : +- ReusedSubquery - : : +- CometBroadcastExchange - : : +- CometProject - : : +- CometFilter - : : +- CometNativeScan parquet spark_catalog.default.date_dim - : +- BroadcastExchange - : +- CometNativeColumnarToRow - : +- CometProject - : +- CometFilter - : +- CometNativeScan parquet spark_catalog.default.customer_address - +- BroadcastExchange - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometHashAggregate + +- CometExchange + +- CometHashAggregate + +- CometProject + +- CometBroadcastHashJoin + :- CometProject + : +- CometBroadcastHashJoin + : :- CometProject + : : +- CometFilter + : : +- CometBroadcastHashJoin + : : :- CometBroadcastHashJoin + : : : :- CometBroadcastHashJoin + : : : : :- CometFilter + : : : : : +- CometNativeScan parquet spark_catalog.default.customer + : : : : +- CometBroadcastExchange + : : : : +- CometProject + : : : : +- CometBroadcastHashJoin + : : : : :- CometNativeScan parquet spark_catalog.default.store_sales + : : : : : +- CometSubqueryBroadcast + : : : : : +- CometBroadcastExchange + : : : : : +- CometProject + : : : : : +- CometFilter + : : : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : : : +- CometBroadcastExchange + : : : : +- CometProject + : : : : +- CometFilter + : : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : : +- CometBroadcastExchange + : : : +- CometProject + : : : +- CometBroadcastHashJoin + : : : :- CometNativeScan parquet spark_catalog.default.web_sales + : : : : +- ReusedSubquery + : : : +- CometBroadcastExchange + : : : +- CometProject + : : : +- CometFilter + : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometBroadcastHashJoin + : : :- CometNativeScan parquet spark_catalog.default.catalog_sales + : : : +- ReusedSubquery + : : +- CometBroadcastExchange 
+ : : +- CometProject + : : +- CometFilter + : : +- CometNativeScan parquet spark_catalog.default.date_dim + : +- CometBroadcastExchange + : +- CometProject + : +- CometFilter + : +- CometNativeScan parquet spark_catalog.default.customer_address + +- CometBroadcastExchange +- CometProject +- CometFilter +- CometNativeScan parquet spark_catalog.default.customer_demographics -Comet accelerated 35 out of 54 eligible operators (64%). Final plan contains 5 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 51 out of 54 eligible operators (94%). Final plan contains 1 transitions between Spark and Comet. \ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q45/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q45/extended.txt index f95c69368f..7f5a5b390d 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q45/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q45/extended.txt @@ -1,45 +1,44 @@ -TakeOrderedAndProject -+- HashAggregate [COMET: Spark Final aggregate without Comet Partial requires compatible intermediate buffer formats] - +- Exchange - +- HashAggregate - +- Project - +- Filter - +- BroadcastHashJoin [COMET: Unsupported join type ExistenceJoin(exists#1)] - :- CometNativeColumnarToRow - : +- CometProject - : +- CometBroadcastHashJoin - : :- CometProject - : : +- CometBroadcastHashJoin - : : :- CometProject - : : : +- CometBroadcastHashJoin - : : : :- CometProject - : : : : +- CometBroadcastHashJoin - : : : : :- CometFilter - : : : : : +- CometNativeScan parquet spark_catalog.default.web_sales - : : : : : +- CometSubqueryBroadcast - : : : : : +- CometBroadcastExchange - : : : : : +- CometProject - : : : : : +- CometFilter - : : : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : : : +- CometBroadcastExchange - : : : : +- CometFilter - : : : : +- CometNativeScan parquet 
spark_catalog.default.customer - : : : +- CometBroadcastExchange - : : : +- CometProject - : : : +- CometFilter - : : : +- CometNativeScan parquet spark_catalog.default.customer_address - : : +- CometBroadcastExchange - : : +- CometProject - : : +- CometFilter - : : +- CometNativeScan parquet spark_catalog.default.date_dim - : +- CometBroadcastExchange - : +- CometProject - : +- CometFilter - : +- CometNativeScan parquet spark_catalog.default.item - +- BroadcastExchange - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometHashAggregate + +- CometExchange + +- CometHashAggregate + +- CometProject + +- CometFilter + +- CometBroadcastHashJoin + :- CometProject + : +- CometBroadcastHashJoin + : :- CometProject + : : +- CometBroadcastHashJoin + : : :- CometProject + : : : +- CometBroadcastHashJoin + : : : :- CometProject + : : : : +- CometBroadcastHashJoin + : : : : :- CometFilter + : : : : : +- CometNativeScan parquet spark_catalog.default.web_sales + : : : : : +- CometSubqueryBroadcast + : : : : : +- CometBroadcastExchange + : : : : : +- CometProject + : : : : : +- CometFilter + : : : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : : : +- CometBroadcastExchange + : : : : +- CometFilter + : : : : +- CometNativeScan parquet spark_catalog.default.customer + : : : +- CometBroadcastExchange + : : : +- CometProject + : : : +- CometFilter + : : : +- CometNativeScan parquet spark_catalog.default.customer_address + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometFilter + : : +- CometNativeScan parquet spark_catalog.default.date_dim + : +- CometBroadcastExchange + : +- CometProject + : +- CometFilter + : +- CometNativeScan parquet spark_catalog.default.item + +- CometBroadcastExchange +- CometProject +- CometFilter +- CometNativeScan parquet spark_catalog.default.item -Comet accelerated 32 out of 41 eligible operators (78%). Final plan contains 2 transitions between Spark and Comet. 
\ No newline at end of file +Comet accelerated 40 out of 41 eligible operators (97%). Final plan contains 1 transitions between Spark and Comet. \ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark3_5/q35/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark3_5/q35/extended.txt new file mode 100644 index 0000000000..3d8ef408c2 --- /dev/null +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7-spark3_5/q35/extended.txt @@ -0,0 +1,57 @@ +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometHashAggregate + +- CometExchange + +- CometHashAggregate + +- CometProject + +- CometBroadcastHashJoin + :- CometProject + : +- CometBroadcastHashJoin + : :- CometProject + : : +- CometFilter + : : +- CometBroadcastHashJoin + : : :- CometBroadcastHashJoin + : : : :- CometBroadcastHashJoin + : : : : :- CometFilter + : : : : : +- CometNativeScan parquet spark_catalog.default.customer + : : : : +- CometBroadcastExchange + : : : : +- CometProject + : : : : +- CometBroadcastHashJoin + : : : : :- CometNativeScan parquet spark_catalog.default.store_sales + : : : : : +- CometSubqueryBroadcast + : : : : : +- CometBroadcastExchange + : : : : : +- CometProject + : : : : : +- CometFilter + : : : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : : : +- CometBroadcastExchange + : : : : +- CometProject + : : : : +- CometFilter + : : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : : +- CometBroadcastExchange + : : : +- CometProject + : : : +- CometBroadcastHashJoin + : : : :- CometNativeScan parquet spark_catalog.default.web_sales + : : : : +- ReusedSubquery + : : : +- CometBroadcastExchange + : : : +- CometProject + : : : +- CometFilter + : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometBroadcastHashJoin + : : :- CometNativeScan parquet 
spark_catalog.default.catalog_sales + : : : +- ReusedSubquery + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometFilter + : : +- CometNativeScan parquet spark_catalog.default.date_dim + : +- CometBroadcastExchange + : +- CometProject + : +- CometFilter + : +- CometNativeScan parquet spark_catalog.default.customer_address + +- CometBroadcastExchange + +- CometProject + +- CometFilter + +- CometNativeScan parquet spark_catalog.default.customer_demographics + +Comet accelerated 51 out of 54 eligible operators (94%). Final plan contains 1 transitions between Spark and Comet. \ No newline at end of file diff --git a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q35/extended.txt b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q35/extended.txt index 07af300183..3d8ef408c2 100644 --- a/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q35/extended.txt +++ b/spark/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q35/extended.txt @@ -1,61 +1,57 @@ -TakeOrderedAndProject -+- HashAggregate [COMET: Spark Final aggregate without Comet Partial requires compatible intermediate buffer formats] - +- Exchange - +- HashAggregate - +- Project - +- BroadcastHashJoin - :- Project - : +- BroadcastHashJoin - : :- Project - : : +- Filter - : : +- BroadcastHashJoin - : : :- BroadcastHashJoin [COMET: Unsupported join type ExistenceJoin(exists#1)] - : : : :- CometNativeColumnarToRow - : : : : +- CometBroadcastHashJoin - : : : : :- CometFilter - : : : : : +- CometNativeScan parquet spark_catalog.default.customer - : : : : +- CometBroadcastExchange - : : : : +- CometProject - : : : : +- CometBroadcastHashJoin - : : : : :- CometNativeScan parquet spark_catalog.default.store_sales - : : : : : +- CometSubqueryBroadcast - : : : : : +- CometBroadcastExchange - : : : : : +- CometProject - : : : : : +- CometFilter - : : : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : : : +- CometBroadcastExchange - 
: : : : +- CometProject - : : : : +- CometFilter - : : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : : +- BroadcastExchange - : : : +- CometNativeColumnarToRow - : : : +- CometProject - : : : +- CometBroadcastHashJoin - : : : :- CometNativeScan parquet spark_catalog.default.web_sales - : : : : +- ReusedSubquery - : : : +- CometBroadcastExchange - : : : +- CometProject - : : : +- CometFilter - : : : +- CometNativeScan parquet spark_catalog.default.date_dim - : : +- BroadcastExchange - : : +- CometNativeColumnarToRow - : : +- CometProject - : : +- CometBroadcastHashJoin - : : :- CometNativeScan parquet spark_catalog.default.catalog_sales - : : : +- ReusedSubquery - : : +- CometBroadcastExchange - : : +- CometProject - : : +- CometFilter - : : +- CometNativeScan parquet spark_catalog.default.date_dim - : +- BroadcastExchange - : +- CometNativeColumnarToRow - : +- CometProject - : +- CometFilter - : +- CometNativeScan parquet spark_catalog.default.customer_address - +- BroadcastExchange - +- CometNativeColumnarToRow +CometNativeColumnarToRow ++- CometTakeOrderedAndProject + +- CometHashAggregate + +- CometExchange + +- CometHashAggregate + +- CometProject + +- CometBroadcastHashJoin + :- CometProject + : +- CometBroadcastHashJoin + : :- CometProject + : : +- CometFilter + : : +- CometBroadcastHashJoin + : : :- CometBroadcastHashJoin + : : : :- CometBroadcastHashJoin + : : : : :- CometFilter + : : : : : +- CometNativeScan parquet spark_catalog.default.customer + : : : : +- CometBroadcastExchange + : : : : +- CometProject + : : : : +- CometBroadcastHashJoin + : : : : :- CometNativeScan parquet spark_catalog.default.store_sales + : : : : : +- CometSubqueryBroadcast + : : : : : +- CometBroadcastExchange + : : : : : +- CometProject + : : : : : +- CometFilter + : : : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : : : +- CometBroadcastExchange + : : : : +- CometProject + : : : : +- CometFilter + : : : : +- CometNativeScan parquet 
spark_catalog.default.date_dim + : : : +- CometBroadcastExchange + : : : +- CometProject + : : : +- CometBroadcastHashJoin + : : : :- CometNativeScan parquet spark_catalog.default.web_sales + : : : : +- ReusedSubquery + : : : +- CometBroadcastExchange + : : : +- CometProject + : : : +- CometFilter + : : : +- CometNativeScan parquet spark_catalog.default.date_dim + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometBroadcastHashJoin + : : :- CometNativeScan parquet spark_catalog.default.catalog_sales + : : : +- ReusedSubquery + : : +- CometBroadcastExchange + : : +- CometProject + : : +- CometFilter + : : +- CometNativeScan parquet spark_catalog.default.date_dim + : +- CometBroadcastExchange + : +- CometProject + : +- CometFilter + : +- CometNativeScan parquet spark_catalog.default.customer_address + +- CometBroadcastExchange +- CometProject +- CometFilter +- CometNativeScan parquet spark_catalog.default.customer_demographics -Comet accelerated 35 out of 54 eligible operators (64%). Final plan contains 5 transitions between Spark and Comet. \ No newline at end of file +Comet accelerated 51 out of 54 eligible operators (94%). Final plan contains 1 transitions between Spark and Comet. 
\ No newline at end of file diff --git a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala index 29c9a7833f..0a4e867462 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala @@ -25,7 +25,7 @@ import org.scalatest.Tag import org.apache.spark.sql.CometTestBase import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometBroadcastHashJoinExec, CometSortMergeJoinExec} +import org.apache.spark.sql.comet.{CometBroadcastExchangeExec, CometBroadcastHashJoinExec, CometHashJoinExec, CometSortMergeJoinExec} import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec import org.apache.spark.sql.internal.SQLConf @@ -702,4 +702,53 @@ class CometJoinSuite extends CometTestBase { } } } + + test("ExistenceJoin via BroadcastHashJoin (EXISTS combined with OR)") { + withSQLConf( + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10MB", + SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "10MB") { + withParquetTable((0 until 100).map(i => (i, if (i % 3 == 0) "US" else "EU")), "tbl_a") { + withParquetTable((0 until 30).map(i => (i, i + 1)), "tbl_b") { + val df = sql("SELECT * FROM tbl_a a " + + "WHERE a._2 = 'US' OR EXISTS (SELECT /*+ BROADCAST(b) */ 1 FROM tbl_b b WHERE b._1 = a._1)") + checkSparkAnswerAndOperator( + df, + Seq(classOf[CometBroadcastExchangeExec], classOf[CometBroadcastHashJoinExec])) + } + } + } + } + + test("ExistenceJoin via ShuffledHashJoin (EXISTS combined with OR)") { + withSQLConf( + SQLConf.PREFER_SORTMERGEJOIN.key -> "false", + "spark.sql.join.forceApplyShuffledHashJoin" -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + withParquetTable((0 until 100).map(i => (i, if (i % 3 == 0) "US" 
else "EU")), "tbl_a") { + withParquetTable((0 until 30).map(i => (i, i + 1)), "tbl_b") { + val df = sql( + "SELECT * FROM tbl_a a " + + "WHERE a._2 = 'US' OR EXISTS (SELECT 1 FROM tbl_b b WHERE b._1 = a._1)") + checkSparkAnswerAndOperator(df, Seq(classOf[CometHashJoinExec])) + } + } + } + } + + test("ExistenceJoin via SortMergeJoin (EXISTS combined with OR)") { + withSQLConf( + SQLConf.PREFER_SORTMERGEJOIN.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + withParquetTable((0 until 100).map(i => (i, if (i % 3 == 0) "US" else "EU")), "tbl_a") { + withParquetTable((0 until 30).map(i => (i, i + 1)), "tbl_b") { + val df = sql( + "SELECT * FROM tbl_a a " + + "WHERE a._2 = 'US' OR EXISTS (SELECT 1 FROM tbl_b b WHERE b._1 = a._1)") + checkSparkAnswerAndOperator(df, Seq(classOf[CometSortMergeJoinExec])) + } + } + } + } } diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometExistenceJoinBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometExistenceJoinBenchmark.scala new file mode 100644 index 0000000000..e9cb2f9de6 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometExistenceJoinBenchmark.scala @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.benchmark + +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.internal.SQLConf + +import org.apache.comet.{CometConf, CometSparkSessionExtensions} + +/** + * Benchmark to measure performance of Comet's ExistenceJoin support across the three join + * physical operators (BHJ, SHJ, SMJ). To run this benchmark: + * {{{ + * SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometExistenceJoinBenchmark + * }}} + * Results will be written to "spark/benchmarks/CometExistenceJoinBenchmark-**results.txt". + */ +object CometExistenceJoinBenchmark extends CometBenchmarkBase { + + override def getSparkSession: SparkSession = { + val conf = new SparkConf() + .setAppName("CometExistenceJoinBenchmark") + .set("spark.master", "local[5]") + .setIfMissing("spark.driver.memory", "3g") + .setIfMissing("spark.executor.memory", "3g") + .set( + "spark.shuffle.manager", + "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager") + + val sparkSession = SparkSession.builder + .config(conf) + .withExtensions(new CometSparkSessionExtensions) + .getOrCreate() + + sparkSession.conf.set(CometConf.COMET_ENABLED.key, "false") + sparkSession.conf.set(CometConf.COMET_EXEC_ENABLED.key, "false") + sparkSession.conf.set(SQLConf.ANSI_ENABLED.key, "false") + sparkSession.conf.set("spark.sql.shuffle.partitions", "2") + + sparkSession + } + + override def runCometBenchmark(mainArgs: Array[String]): Unit = { + val probeRows = 1024 * 1024 + val buildRows = 10000 + + withTempPath { dir => + withTempTable("probe", "build") { + spark + .range(probeRows) + .selectExpr("id AS k", "CASE WHEN id % 3 = 0 THEN 'US' ELSE 'EU' END AS region") + .write + .parquet(s"${dir.getAbsolutePath}/probe") + spark + .range(buildRows) + .selectExpr("id * 7 AS k") + .write + 
.parquet(s"${dir.getAbsolutePath}/build") + + spark.read.parquet(s"${dir.getAbsolutePath}/probe").createOrReplaceTempView("probe") + spark.read.parquet(s"${dir.getAbsolutePath}/build").createOrReplaceTempView("build") + + val query = + "SELECT count(*) FROM probe p " + + "WHERE p.region = 'US' OR EXISTS (SELECT 1 FROM build b WHERE b.k = p.k)" + + withSQLConf( + CometConf.COMET_ENABLED.key -> "true", + CometConf.COMET_EXEC_ENABLED.key -> "true") { + spark.sql(query).explain() + } + + runBenchmark("ExistenceJoin - BroadcastHashJoin") { + runExpressionBenchmark( + "exists OR predicate (BHJ)", + probeRows, + query, + Map( + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10MB", + SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "10MB")) + } + + runBenchmark("ExistenceJoin - ShuffledHashJoin") { + runExpressionBenchmark( + "exists OR predicate (SHJ)", + probeRows, + query, + Map( + SQLConf.PREFER_SORTMERGEJOIN.key -> "false", + "spark.sql.join.forceApplyShuffledHashJoin" -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1")) + } + + runBenchmark("ExistenceJoin - SortMergeJoin") { + runExpressionBenchmark( + "exists OR predicate (SMJ)", + probeRows, + query, + Map( + SQLConf.PREFER_SORTMERGEJOIN.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1")) + } + } + } + } +}