Skip to content

Commit a047144

Browse files
author
guihuawen
committed
[AURON #2090] Convert CoalesceExec to native implementation. #2090
1 parent 471a865 commit a047144

File tree

9 files changed

+430
-1
lines changed

9 files changed

+430
-1
lines changed

spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,9 @@ class ShimsImpl extends Shims with Logging {
283283
override def createNativeFilterExec(condition: Expression, child: SparkPlan): NativeFilterBase =
284284
NativeFilterExec(condition, child)
285285

286+
/** Implements [[Shims.createNativeCoalesceExec]]: builds the concrete native coalesce operator. */
override def createNativeCoalesceExec(numPartitions: Int, child: SparkPlan): NativeCoalesceBase =
  NativeCoalesceExec(numPartitions, child)
288+
286289
override def createNativeGenerateExec(
287290
generator: Generator,
288291
requiredChildOutput: Seq[Attribute],
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.sql.execution.auron.plan
18+
19+
import org.apache.spark.sql.catalyst.expressions.Attribute
20+
import org.apache.spark.sql.execution.SparkPlan
21+
22+
import org.apache.auron.sparkver
23+
24+
/**
 * Concrete native coalesce operator: reduces the child plan's partition count to
 * `numPartitions` via the shared [[NativeCoalesceBase]] implementation.
 *
 * @param numPartitions target number of output partitions
 * @param child         the plan whose output partitions are merged
 */
case class NativeCoalesceExec(numPartitions: Int, child: SparkPlan)
    // Pass the constructor arguments directly; the original ascribed types here
    // (`numPartitions: Int, child: SparkPlan`) compiled as type ascriptions, which
    // is legal but misleading.
    extends NativeCoalesceBase(numPartitions, child) {

  // Spark 3.2+ uses the single-child transform hook.
  @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0 / 4.1")
  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
    copy(child = newChild)

  // Spark 3.0/3.1 predate withNewChildInternal and rebuild via withNewChildren.
  @sparkver("3.0 / 3.1")
  override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan =
    copy(child = newChildren.head)

  // Coalescing only merges partitions; the row schema is unchanged.
  override def output: Seq[Attribute] =
    child.output
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.auron
18+
19+
import org.apache.spark.sql.{AuronQueryTest, Row}
20+
import org.apache.spark.sql.execution.auron.plan.NativeCoalesceExec
21+
22+
/** Verifies that a SQL COALESCE hint is planned as the native coalesce operator. */
class AuronNativeCoalesceExecSuite extends AuronQueryTest with BaseAuronSQLSuite {
  import testImplicits._

  test("test CoalesceExec to native") {
    withSQLConf("spark.auron.enable.coalesce" -> "true") {
      val input = Seq((1, 2, "test test")).toDF("c1", "c2", "part")
      input.createOrReplaceTempView("coalesce_table1")

      val df = spark.sql("select /*+ coalesce(2)*/ a.c1, a.c2 from coalesce_table1 a ")

      // The query result must be unchanged by the native conversion.
      checkAnswer(df, Seq(Row(1, 2)))

      // The executed physical plan must contain the native coalesce node.
      val nativeNode = collectFirst(df.queryExecution.executedPlan) {
        case n: NativeCoalesceExec => n
      }
      assert(nativeNode.isDefined)
    }
  }
}

spark-extension/src/main/java/org/apache/auron/spark/configuration/SparkAuronConfiguration.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,11 @@ public class SparkAuronConfiguration extends AuronConfiguration {
411411
.withDescription("Enable AggregateExec operation conversion to native Auron implementations.")
412412
.withDefaultValue(true);
413413

414+
// Enables replacing Spark's CoalesceExec with the native implementation (default: true).
// NOTE(review): the identifier misspells "COALESCE" as "COALESEC". It is also referenced
// as ENABLE_COALESEC from AuronConverters.enableCoalesec, so renaming only here would
// break the build — rename both sides together in a follow-up change.
public static final ConfigOption<Boolean> ENABLE_COALESEC = new SQLConfOption<>(Boolean.class)
        .withKey("auron.enable.coalesce")
        .withCategory("Operator Supports")
        .withDescription("Enable CoalesceExec operation conversion to native Auron implementations.")
        .withDefaultValue(true);
414419
public static final ConfigOption<Boolean> ENABLE_EXPAND = new SQLConfOption<>(Boolean.class)
415420
.withKey("auron.enable.expand")
416421
.withCategory("Operator Supports")

spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConvertStrategy.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,8 @@ object AuronConvertStrategy extends Logging {
165165
e.setTagValue(convertStrategyTag, AlwaysConvert)
166166
case e: GenerateExec if isNative(e.child) =>
167167
e.setTagValue(convertStrategyTag, AlwaysConvert)
168+
case e: CoalesceExec if isNative(e.child) =>
169+
e.setTagValue(convertStrategyTag, AlwaysConvert)
168170
case e: ObjectHashAggregateExec if isNative(e.child) =>
169171
e.setTagValue(convertStrategyTag, AlwaysConvert)
170172
case e: LocalTableScanExec =>

spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ object AuronConverters extends Logging {
9595
SparkAuronConfiguration.ENABLE_TAKE_ORDERED_AND_PROJECT.get()
9696
def enableCollectLimit: Boolean = SparkAuronConfiguration.ENABLE_COLLECT_LIMIT.get()
9797
def enableAggr: Boolean = SparkAuronConfiguration.ENABLE_AGGR.get()
98+
// Whether CoalesceExec → native conversion is enabled (spark.auron.enable.coalesce).
// NOTE(review): "Coalesec" misspells "Coalesce"; kept to match ENABLE_COALESEC — rename both in one change.
def enableCoalesec: Boolean = SparkAuronConfiguration.ENABLE_COALESEC.get()
9899
def enableExpand: Boolean = SparkAuronConfiguration.ENABLE_EXPAND.get()
99100
def enableWindow: Boolean = SparkAuronConfiguration.ENABLE_WINDOW.get()
100101
def enableWindowGroupLimit: Boolean = SparkAuronConfiguration.ENABLE_WINDOW_GROUP_LIMIT.get()
@@ -201,6 +202,10 @@ object AuronConverters extends Logging {
201202
}
202203
convertedAgg
203204

205+
case e: CoalesceExec if enableCoalesec => // coalesec
206+
val convertedCoalesce = tryConvert(e, convertCoalesceExec)
207+
convertedCoalesce
208+
204209
case e: ObjectHashAggregateExec if enableAggr => // object hash aggregate
205210
val convertedAgg = tryConvert(e, convertObjectHashAggregateExec)
206211
if (!e.getTagValue(convertibleTag).contains(true)) {
@@ -308,6 +313,8 @@ object AuronConverters extends Logging {
308313
"Conversion disabled: spark.auron.enable.local.table.scan=false."
309314
case _: DataWritingCommandExec if !enableDataWriting =>
310315
"Conversion disabled: spark.auron.enable.data.writing=false."
316+
case _: CoalesceExec if !enableCoalesec =>
317+
"Conversion disabled: spark.auron.enable.coalesce=false."
311318
case _ =>
312319
s"${exec.getClass.getSimpleName} is not supported yet."
313320
}
@@ -745,6 +752,10 @@ object AuronConverters extends Logging {
745752
Shims.get.createNativeCollectLimitExec(limit, offset, exec.child)
746753
}
747754

755+
/** Replaces a Spark [[CoalesceExec]] with the shim-provided native coalesce operator. */
def convertCoalesceExec(exec: CoalesceExec): SparkPlan = {
  val targetPartitions = exec.numPartitions
  Shims.get.createNativeCoalesceExec(targetPartitions, exec.child)
}
758+
748759
def convertHashAggregateExec(exec: HashAggregateExec): SparkPlan = {
749760
// split non-trivial children exprs in partial-agg to a ProjectExec
750761
// for enabling filter-project optimization in native side

spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeRDD.scala

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import org.apache.spark.Partitioner
2626
import org.apache.spark.SparkContext
2727
import org.apache.spark.TaskContext
2828
import org.apache.spark.internal.Logging
29-
import org.apache.spark.rdd.RDD
29+
import org.apache.spark.rdd.{CoalescedRDDPartition, RDD}
3030
import org.apache.spark.sql.catalyst.InternalRow
3131

3232
import org.apache.auron.metric.SparkMetricNode
@@ -78,6 +78,32 @@ class NativeRDD(
7878
}
7979
}
8080

81+
/**
 * A [[NativeRDD]] that merges partitions of its first parent RDD, mirroring Spark's
 * CoalescedRDD: each output partition reads all of its assigned parent partitions
 * in sequence (see `compute`, which expects [[CoalescedRDDPartition]] splits).
 *
 * NOTE(review): `nativePlan` is supplied as `(_, _) => null` and `metrics` as an empty
 * SparkMetricNode — presumably the native plan/metrics come from the parent RDD rather
 * than this wrapper; confirm no caller invokes `nativePlan` on instances of this class.
 */
class CoalesceNativeRDD(
    @transient private val rddSparkContext: SparkContext,
    rddDependencies: Seq[Dependency[_]],
    partitions: Array[Partition],
    friendlyName: String)
    extends NativeRDD(
      rddSparkContext,
      // No metrics of its own; empty node with no children.
      metrics = SparkMetricNode(Map.empty, Seq(), None),
      rddPartitions = partitions,
      // Coalescing drops any parent partitioner.
      rddPartitioner = None,
      rddDependencies,
      rddShuffleReadFull = false,
      // Sentinel: this RDD computes rows itself instead of building a native plan.
      nativePlan = (_, _) => null,
      friendlyName)
    with Logging
    with Serializable {

  // The coalesced partition layout passed in by the caller.
  override protected def getPartitions: Array[Partition] = partitions

  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
    // Concatenate the iterators of every parent partition assigned to this split.
    split.asInstanceOf[CoalescedRDDPartition].parents.iterator.flatMap { parentPartition =>
      firstParent[InternalRow].iterator(parentPartition, context)
    }
  }
}
106+
81107
class EmptyNativeRDD(@transient private val rddSparkContext: SparkContext)
82108
extends NativeRDD(
83109
rddSparkContext = rddSparkContext,

spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,8 @@ abstract class Shims {
139139
offset: Int,
140140
child: SparkPlan): NativeCollectLimitBase
141141

142+
/** Builds the shim-specific native coalesce operator for the given target partition count. */
def createNativeCoalesceExec(numPartitions: Int, child: SparkPlan): NativeCoalesceBase
143+
142144
def createNativeParquetInsertIntoHiveTableExec(
143145
cmd: InsertIntoHiveTable,
144146
child: SparkPlan): NativeParquetInsertIntoHiveTableBase

0 commit comments

Comments
 (0)