From 10906f5daa0220689f84fb934e06eb56d6032749 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Fri, 6 Feb 2026 16:15:27 +0800 Subject: [PATCH 1/5] test test --- .../scala/org/apache/spark/sql/Dataset.scala | 4 +- .../analysis/DeduplicateRelations.scala | 2 +- .../ExtractDistributedSequenceID.scala | 8 +++- .../expressions/DistributedSequenceID.scala | 38 ++++++++++++++++++- .../sql/catalyst/optimizer/Optimizer.scala | 2 +- .../logical/pythonLogicalOperators.scala | 8 ++-- .../apache/spark/sql/classic/Dataset.scala | 2 +- .../spark/sql/execution/SparkStrategies.scala | 4 +- .../AttachDistributedSequenceExec.scala | 37 +++--------------- 9 files changed, 62 insertions(+), 43 deletions(-) diff --git a/sql/api/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/api/src/main/scala/org/apache/spark/sql/Dataset.scala index 0f1fe314c3500..50906f0cdbf17 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2038,7 +2038,9 @@ abstract class Dataset[T] extends Serializable { * @since 4.2.0 */ def zipWithIndex(indexColName: String): DataFrame = { - select(col("*"), Column.internalFn("distributed_sequence_id").alias(indexColName)) + select( + col("*"), + Column.internalFn("distributed_sequence_id", functions.lit("NONE")).alias(indexColName)) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala index b8da376bead6f..2a2440117e401 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala @@ -441,7 +441,7 @@ object DeduplicateRelations extends Rule[LogicalPlan] { newVersion.copyTagsFrom(oldVersion) Seq((oldVersion, newVersion)) - case oldVersion @ AttachDistributedSequence(sequenceAttr, _) + case oldVersion @ AttachDistributedSequence(sequenceAttr, _, _) if oldVersion.producedAttributes.intersect(conflictingAttributes).nonEmpty => val newVersion = oldVersion.copy(sequenceAttr = sequenceAttr.newInstance()) newVersion.copyTagsFrom(oldVersion) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ExtractDistributedSequenceID.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ExtractDistributedSequenceID.scala index bf6ab8e50616c..3430f4ba12eed 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ExtractDistributedSequenceID.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ExtractDistributedSequenceID.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{AttachDistributedSequence, L import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreePattern.DISTRIBUTED_SEQUENCE_ID import org.apache.spark.sql.types.LongType +import org.apache.spark.unsafe.types.UTF8String /** * Extracts [[DistributedSequenceID]] in logical plans, and replace it to @@ -34,8 +35,13 @@ object ExtractDistributedSequenceID extends Rule[LogicalPlan] { plan.resolveOperatorsUpWithPruning(_.containsPattern(DISTRIBUTED_SEQUENCE_ID)) { case plan: LogicalPlan if plan.resolved && plan.expressions.exists(_.exists(_.isInstanceOf[DistributedSequenceID])) => + val storageLevel = plan.expressions.flatMap(_.collectFirst { + case id: DistributedSequenceID => + 
id.storageLevel.eval().asInstanceOf[UTF8String].toString + }).head val attr = AttributeReference("distributed_sequence_id", LongType, nullable = false)() - val newPlan = plan.withNewChildren(plan.children.map(AttachDistributedSequence(attr, _))) + val newPlan = plan.withNewChildren( + plan.children.map(AttachDistributedSequence(attr, _, storageLevel))) .transformExpressions { case _: DistributedSequenceID => attr } Project(plan.output, newPlan) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DistributedSequenceID.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DistributedSequenceID.scala index 5a0bff990e68a..d7af4a8706909 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DistributedSequenceID.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DistributedSequenceID.scala @@ -18,7 +18,9 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.trees.TreePattern.{DISTRIBUTED_SEQUENCE_ID, TreePattern} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, LongType} +import org.apache.spark.storage.StorageLevelMapper /** * Returns increasing 64-bit integers consecutive from 0. @@ -26,10 +28,42 @@ import org.apache.spark.sql.types.{DataType, LongType} * * @note this expression is dedicated for Pandas API on Spark to use. */ -case class DistributedSequenceID() extends LeafExpression with Unevaluable with NonSQLExpression { +case class DistributedSequenceID(storageLevel: Expression) + extends LeafExpression with Unevaluable with NonSQLExpression { + + // This constructor is dedicated for Pandas API on Spark. + // Get the storageLevel according to pandas_on_Spark.compute.default_index_cache. + def this() = this( + // Before `compute.default_index_cache` is explicitly set via + // `ps.set_option`, `SQLConf.get` can not get its value (as well as its default value); + // after `ps.set_option`, `SQLConf.get` can get its value: + // + // In [1]: import pyspark.pandas as ps + // In [2]: ps.get_option("compute.default_index_cache") + // Out[2]: 'MEMORY_AND_DISK_SER' + // In [3]: spark.conf.get("pandas_on_Spark.compute.default_index_cache") + // ... + // Py4JJavaError: An error occurred while calling o40.get. + // : java.util.NoSuchElementException: pandas_on_Spark.compute.distributed_sequence_... + // at org.apache.spark.sql.errors.QueryExecutionErrors$.noSuchElementExceptionError... + // at org.apache.spark.sql.internal.SQLConf.$anonfun$getConfString$3(SQLConf.scala:4766) + // ... 
+ // In [4]: ps.set_option("compute.default_index_cache", "NONE") + // In [5]: spark.conf.get("pandas_on_Spark.compute.default_index_cache") + // Out[5]: '"NONE"' + // In [6]: ps.set_option("compute.default_index_cache", "DISK_ONLY") + // In [7]: spark.conf.get("pandas_on_Spark.compute.default_index_cache") + // Out[7]: '"DISK_ONLY"' + // The string is double quoted because of JSON ser/deser for pandas API on Spark + Literal( + SQLConf.get.getConfString("pandas_on_Spark.compute.default_index_cache", + StorageLevelMapper.MEMORY_AND_DISK_SER.name() + ).stripPrefix("\"").stripSuffix("\"") + ) + ) override def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression = { - DistributedSequenceID() + DistributedSequenceID(storageLevel) } override def nullable: Boolean = false diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index fe15819bd44a7..125db2752b209 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1068,7 +1068,7 @@ object ColumnPruning extends Rule[LogicalPlan] { a.copy(child = Expand(newProjects, newOutput, grandChild)) // Prune and drop AttachDistributedSequence if the produced attribute is not referred. - case p @ Project(_, a @ AttachDistributedSequence(_, grandChild)) + case p @ Project(_, a @ AttachDistributedSequence(_, grandChild, _)) if !p.references.contains(a.sequenceAttr) => p.copy(child = prunedChild(grandChild, p.references)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala index bcfcae2ee16c9..aa5cf2ce184b3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala @@ -367,12 +367,14 @@ case class ArrowEvalPythonUDTF( /** * A logical plan that adds a new long column with the name `name` that - * increases one by one. This is for 'distributed-sequence' default index - * in pandas API on Spark. + * increases one by one. + * This is used in both 'distributed-sequence' index in pandas API on Spark + * and 'DataFrame.zipWithIndex'. */ case class AttachDistributedSequence( sequenceAttr: Attribute, - child: LogicalPlan) extends UnaryNode { + child: LogicalPlan, + storageLevel: String) extends UnaryNode { override val producedAttributes: AttributeSet = AttributeSet(sequenceAttr) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala index 088df782a541c..66c600185ed98 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala @@ -2062,7 +2062,7 @@ class Dataset[T] private[sql]( * This is for 'distributed-sequence' default index in pandas API on Spark. 
*/ private[sql] def withSequenceColumn(name: String) = { - select(Column(DistributedSequenceID()).alias(name), col("*")) + select(Column(new DistributedSequenceID()).alias(name), col("*")) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 5efad83bcba78..1e2ceeb2ae691 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -969,8 +969,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.python.MapInPandasExec(func, output, planLater(child), isBarrier, profile) :: Nil case logical.MapInArrow(func, output, child, isBarrier, profile) => execution.python.MapInArrowExec(func, output, planLater(child), isBarrier, profile) :: Nil - case logical.AttachDistributedSequence(attr, child) => - execution.python.AttachDistributedSequenceExec(attr, planLater(child)) :: Nil + case logical.AttachDistributedSequence(attr, child, storageLevel) => + execution.python.AttachDistributedSequenceExec(attr, planLater(child), storageLevel) :: Nil case logical.PythonWorkerLogs(jsonAttr) => execution.python.PythonWorkerLogsExec(jsonAttr) :: Nil case logical.MapElements(f, _, _, objAttr, child) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala index e27bde38a6f5f..9c20f020328c3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.python +import java.util.Locale + import org.apache.spark.internal.LogKeys.{RDD_ID, SPARK_PLAN_ID} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -24,8 +26,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.storage.{StorageLevel, StorageLevelMapper} +import org.apache.spark.storage.StorageLevel /** * A physical plan that adds a new long column with `sequenceAttr` that @@ -34,7 +35,8 @@ import org.apache.spark.storage.{StorageLevel, StorageLevelMapper} */ case class AttachDistributedSequenceExec( sequenceAttr: Attribute, - child: SparkPlan) + child: SparkPlan, + storageLevel: String) extends UnaryExecNode { override def producedAttributes: AttributeSet = AttributeSet(sequenceAttr) @@ -47,34 +49,7 @@ case class AttachDistributedSequenceExec( override protected def doExecute(): RDD[InternalRow] = { val childRDD = child.execute() - // before `compute.default_index_cache` is explicitly set via - // `ps.set_option`, `SQLConf.get` can not get its value (as well as its default value); - // after `ps.set_option`, `SQLConf.get` can get its value: - // - // In [1]: import pyspark.pandas as ps - // In [2]: ps.get_option("compute.default_index_cache") - // Out[2]: 'MEMORY_AND_DISK_SER' - // In [3]: spark.conf.get("pandas_on_Spark.compute.default_index_cache") - // ... - // Py4JJavaError: An error occurred while calling o40.get. 
- // : java.util.NoSuchElementException: pandas_on_Spark.compute.distributed_sequence_... - // at org.apache.spark.sql.errors.QueryExecutionErrors$.noSuchElementExceptionError... - // at org.apache.spark.sql.internal.SQLConf.$anonfun$getConfString$3(SQLConf.scala:4766) - // ... - // In [4]: ps.set_option("compute.default_index_cache", "NONE") - // In [5]: spark.conf.get("pandas_on_Spark.compute.default_index_cache") - // Out[5]: '"NONE"' - // In [6]: ps.set_option("compute.default_index_cache", "DISK_ONLY") - // In [7]: spark.conf.get("pandas_on_Spark.compute.default_index_cache") - // Out[7]: '"DISK_ONLY"' - - // The string is double quoted because of JSON ser/deser for pandas API on Spark - val storageLevel = SQLConf.get.getConfString( - "pandas_on_Spark.compute.default_index_cache", - StorageLevelMapper.MEMORY_AND_DISK_SER.name() - ).stripPrefix("\"").stripSuffix("\"") - - val cachedRDD = storageLevel match { + val cachedRDD = storageLevel.toUpperCase(Locale.ROOT) match { // zipWithIndex launches a Spark job only if #partition > 1 case _ if childRDD.getNumPartitions <= 1 => childRDD From afd73c805d7c3255beb041a30ec1d6adc9d2b4fb Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Fri, 6 Feb 2026 18:24:42 +0800 Subject: [PATCH 2/5] test --- .../spark/sql/catalyst/optimizer/ColumnPruningSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala index 266c369894eca..5a44a105bfe8d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala @@ -460,7 +460,7 @@ class ColumnPruningSuite extends PlanTest { test("SPARK-36559 Prune and drop distributed-sequence if the produced column is not referred") { val input = LocalRelation($"a".int, $"b".int, $"c".int) - val plan1 = AttachDistributedSequence($"d".int, input).select($"a") + val plan1 = AttachDistributedSequence($"d".int, input, "NONE").select($"a") val correctAnswer1 = Project(Seq($"a"), input).analyze comparePlans(Optimize.execute(plan1.analyze), correctAnswer1) } From 0373a5a7bb515be07cfe184f62492b6904929e37 Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Fri, 6 Feb 2026 21:02:05 +0800 Subject: [PATCH 3/5] regen --- .../query-tests/queries/zipWithIndex.json | 22 ++++++++++++++++++ .../queries/zipWithIndex.proto.bin | Bin 644 -> 837 bytes .../queries/zipWithIndex_custom_column.json | 22 ++++++++++++++++++ .../zipWithIndex_custom_column.proto.bin | Bin 647 -> 840 bytes 4 files changed, 44 insertions(+) diff --git a/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex.json b/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex.json index 9c0278561a61b..0ced35dee02d4 100644 --- a/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex.json +++ b/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex.json @@ -37,6 +37,28 @@ "expr": { "unresolvedFunction": { "functionName": "distributed_sequence_id", + "arguments": [{ + "literal": { + "string": "NONE" + }, + "common": { + "origin": { + "jvmOrigin": { + "stackTrace": [{ + "classLoaderName": "app", + "declaringClass": "org.apache.spark.sql.Dataset", + "methodName": "zipWithIndex", + "fileName": "Dataset.scala" + }, { + "classLoaderName": "app", + "declaringClass": 
"org.apache.spark.sql.PlanGenerationTestSuite", + "methodName": "~~trimmed~anonfun~~", + "fileName": "PlanGenerationTestSuite.scala" + }] + } + } + } + }], "isInternal": true }, "common": { diff --git a/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex.proto.bin b/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex.proto.bin index 2109d63291ee6786452591c01f13b81273613bfa..ee9a4f07cb4949684aae3e66dc9d2d812220af0a 100644 GIT binary patch delta 72 zcmZo+J<7(;#U#KewU2Eg`*Z!PEJkygxh6A9-DczxPsuDUDauSLElEv@FHS8iP0dSA cjn7OG+RMnrmc`=d@8>%CJmc2Qf{ewC02t#Iy#N3J delta 61 zcmX@g*22op#U#Ke^_OKL`*YPP%tm(@xh^qENpp#(WEPhcWhRxDq^86drxuo`<|U`b RXQpT{PM*NDb#f=u9soGe6m$vV(Vr>#xh0#1Ai7Z delta 61 zcmX@X*3Qb##U#Ke)xbKD{kiINW~2LzT$dQ7q`AaXGK))!GLuS6Qd8oKQwvK|^O95J RGgCAeC(AN!o!rT^9{?X=6O;e| From c4e7d72ef3c9b2ebe3fe0e2e29456eda4a54522b Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Sat, 7 Feb 2026 09:40:28 +0800 Subject: [PATCH 4/5] test --- python/pyspark/sql/internal.py | 4 +- .../scala/org/apache/spark/sql/Dataset.scala | 4 +- .../ExtractDistributedSequenceID.scala | 10 ++- .../expressions/DistributedSequenceID.scala | 38 ++--------- .../logical/pythonLogicalOperators.scala | 2 +- .../optimizer/ColumnPruningSuite.scala | 2 +- .../query-tests/queries/zipWithIndex.json | 22 ------ .../queries/zipWithIndex.proto.bin | Bin 837 -> 644 bytes .../queries/zipWithIndex_custom_column.json | 22 ------ .../zipWithIndex_custom_column.proto.bin | Bin 840 -> 647 bytes .../apache/spark/sql/classic/Dataset.scala | 2 +- .../spark/sql/execution/SparkStrategies.scala | 4 +- .../AttachDistributedSequenceExec.scala | 64 ++++++++++++++---- 13 files changed, 68 insertions(+), 106 deletions(-) diff --git a/python/pyspark/sql/internal.py b/python/pyspark/sql/internal.py index 3007b28b00441..dd9ebbcdc1822 100644 --- a/python/pyspark/sql/internal.py +++ b/python/pyspark/sql/internal.py @@ -104,7 +104,9 @@ def distributed_id() -> Column: @staticmethod def distributed_sequence_id() -> Column: - return InternalFunction._invoke_internal_function_over_columns("distributed_sequence_id") + return InternalFunction._invoke_internal_function_over_columns( + "distributed_sequence_id", F.lit(True) + ) @staticmethod def collect_top_k(col: Column, num: int, reverse: bool) -> Column: diff --git a/sql/api/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/api/src/main/scala/org/apache/spark/sql/Dataset.scala index 50906f0cdbf17..0f1fe314c3500 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2038,9 +2038,7 @@ abstract class Dataset[T] extends Serializable { * @since 4.2.0 */ def zipWithIndex(indexColName: String): DataFrame = { - select( - col("*"), - Column.internalFn("distributed_sequence_id", functions.lit("NONE")).alias(indexColName)) + select(col("*"), Column.internalFn("distributed_sequence_id").alias(indexColName)) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ExtractDistributedSequenceID.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ExtractDistributedSequenceID.scala index 3430f4ba12eed..fe26122f3ac13 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ExtractDistributedSequenceID.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ExtractDistributedSequenceID.scala @@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.plans.logical.{AttachDistributedSequence, L import 
org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreePattern.DISTRIBUTED_SEQUENCE_ID import org.apache.spark.sql.types.LongType -import org.apache.spark.unsafe.types.UTF8String /** * Extracts [[DistributedSequenceID]] in logical plans, and replace it to @@ -35,13 +34,12 @@ object ExtractDistributedSequenceID extends Rule[LogicalPlan] { plan.resolveOperatorsUpWithPruning(_.containsPattern(DISTRIBUTED_SEQUENCE_ID)) { case plan: LogicalPlan if plan.resolved && plan.expressions.exists(_.exists(_.isInstanceOf[DistributedSequenceID])) => - val storageLevel = plan.expressions.flatMap(_.collectFirst { - case id: DistributedSequenceID => - id.storageLevel.eval().asInstanceOf[UTF8String].toString - }).head + val cache = plan.expressions.exists(_.exists(e => + e.isInstanceOf[DistributedSequenceID] && + e.asInstanceOf[DistributedSequenceID].cache.eval().asInstanceOf[Boolean])) val attr = AttributeReference("distributed_sequence_id", LongType, nullable = false)() val newPlan = plan.withNewChildren( - plan.children.map(AttachDistributedSequence(attr, _, storageLevel))) + plan.children.map(AttachDistributedSequence(attr, _, cache))) .transformExpressions { case _: DistributedSequenceID => attr } Project(plan.output, newPlan) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DistributedSequenceID.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DistributedSequenceID.scala index d7af4a8706909..962ba984539eb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DistributedSequenceID.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DistributedSequenceID.scala @@ -18,9 +18,7 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.trees.TreePattern.{DISTRIBUTED_SEQUENCE_ID, TreePattern} -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, LongType} -import org.apache.spark.storage.StorageLevelMapper /** * Returns increasing 64-bit integers consecutive from 0. @@ -28,42 +26,14 @@ import org.apache.spark.storage.StorageLevelMapper * * @note this expression is dedicated for Pandas API on Spark to use. */ -case class DistributedSequenceID(storageLevel: Expression) +case class DistributedSequenceID(cache: Expression) extends LeafExpression with Unevaluable with NonSQLExpression { - // This constructor is dedicated for Pandas API on Spark. - // Get the storageLevel according to pandas_on_Spark.compute.default_index_cache. - def this() = this( - // Before `compute.default_index_cache` is explicitly set via - // `ps.set_option`, `SQLConf.get` can not get its value (as well as its default value); - // after `ps.set_option`, `SQLConf.get` can get its value: - // - // In [1]: import pyspark.pandas as ps - // In [2]: ps.get_option("compute.default_index_cache") - // Out[2]: 'MEMORY_AND_DISK_SER' - // In [3]: spark.conf.get("pandas_on_Spark.compute.default_index_cache") - // ... - // Py4JJavaError: An error occurred while calling o40.get. - // : java.util.NoSuchElementException: pandas_on_Spark.compute.distributed_sequence_... - // at org.apache.spark.sql.errors.QueryExecutionErrors$.noSuchElementExceptionError... - // at org.apache.spark.sql.internal.SQLConf.$anonfun$getConfString$3(SQLConf.scala:4766) - // ... 
- // In [4]: ps.set_option("compute.default_index_cache", "NONE") - // In [5]: spark.conf.get("pandas_on_Spark.compute.default_index_cache") - // Out[5]: '"NONE"' - // In [6]: ps.set_option("compute.default_index_cache", "DISK_ONLY") - // In [7]: spark.conf.get("pandas_on_Spark.compute.default_index_cache") - // Out[7]: '"DISK_ONLY"' - // The string is double quoted because of JSON ser/deser for pandas API on Spark - Literal( - SQLConf.get.getConfString("pandas_on_Spark.compute.default_index_cache", - StorageLevelMapper.MEMORY_AND_DISK_SER.name() - ).stripPrefix("\"").stripSuffix("\"") - ) - ) + // This argument indicate whether this expression is from Pandas API on Spark. + def this() = this(Literal(false)) override def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression = { - DistributedSequenceID(storageLevel) + DistributedSequenceID(cache) } override def nullable: Boolean = false diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala index aa5cf2ce184b3..db22a0781c0e0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala @@ -374,7 +374,7 @@ case class ArrowEvalPythonUDTF( case class AttachDistributedSequence( sequenceAttr: Attribute, child: LogicalPlan, - storageLevel: String) extends UnaryNode { + cache: Boolean = false) extends UnaryNode { override val producedAttributes: AttributeSet = AttributeSet(sequenceAttr) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala index 5a44a105bfe8d..266c369894eca 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala @@ -460,7 +460,7 @@ class ColumnPruningSuite extends PlanTest { test("SPARK-36559 Prune and drop distributed-sequence if the produced column is not referred") { val input = LocalRelation($"a".int, $"b".int, $"c".int) - val plan1 = AttachDistributedSequence($"d".int, input, "NONE").select($"a") + val plan1 = AttachDistributedSequence($"d".int, input).select($"a") val correctAnswer1 = Project(Seq($"a"), input).analyze comparePlans(Optimize.execute(plan1.analyze), correctAnswer1) } diff --git a/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex.json b/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex.json index 0ced35dee02d4..9c0278561a61b 100644 --- a/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex.json +++ b/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex.json @@ -37,28 +37,6 @@ "expr": { "unresolvedFunction": { "functionName": "distributed_sequence_id", - "arguments": [{ - "literal": { - "string": "NONE" - }, - "common": { - "origin": { - "jvmOrigin": { - "stackTrace": [{ - "classLoaderName": "app", - "declaringClass": "org.apache.spark.sql.Dataset", - "methodName": "zipWithIndex", - "fileName": "Dataset.scala" - }, { - "classLoaderName": "app", - "declaringClass": "org.apache.spark.sql.PlanGenerationTestSuite", - "methodName": "~~trimmed~anonfun~~", - "fileName": "PlanGenerationTestSuite.scala" - }] - } - } - } - 
}], "isInternal": true }, "common": { diff --git a/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex.proto.bin b/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex.proto.bin index ee9a4f07cb4949684aae3e66dc9d2d812220af0a..2109d63291ee6786452591c01f13b81273613bfa 100644 GIT binary patch delta 61 zcmX@g*22op#U#Ke^_OKL`*YPP%tm(@xh^qENpp#(WEPhcWhRxDq^86drxuo`<|U`b RXQpT{PM*NDb#f=u9soGe6m%CJmc2Qf{ewC02t#Iy#N3J diff --git a/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex_custom_column.json b/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex_custom_column.json index c6c302df2560c..80229004d7a8f 100644 --- a/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex_custom_column.json +++ b/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex_custom_column.json @@ -37,28 +37,6 @@ "expr": { "unresolvedFunction": { "functionName": "distributed_sequence_id", - "arguments": [{ - "literal": { - "string": "NONE" - }, - "common": { - "origin": { - "jvmOrigin": { - "stackTrace": [{ - "classLoaderName": "app", - "declaringClass": "org.apache.spark.sql.Dataset", - "methodName": "zipWithIndex", - "fileName": "Dataset.scala" - }, { - "classLoaderName": "app", - "declaringClass": "org.apache.spark.sql.PlanGenerationTestSuite", - "methodName": "~~trimmed~anonfun~~", - "fileName": "PlanGenerationTestSuite.scala" - }] - } - } - } - }], "isInternal": true }, "common": { diff --git a/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex_custom_column.proto.bin b/sql/connect/common/src/test/resources/query-tests/queries/zipWithIndex_custom_column.proto.bin index 885707a9cf6c9729cb43f6368eb2cfd8f69882cb..0b60290a424193af15af4d2eb751c39e7ab9709f 100644 GIT binary patch delta 61 zcmX@X*3Qb##U#Ke)xbKD{kiINW~2LzT$dQ7q`AaXGK))!GLuS6Qd8oKQwvK|^O95J RGgCAeC(AN!o!rT^9{?X=6O;e| delta 71 zcmZo?J;BD##U#Keb&zc$`*ZypEJh2Mxh6A9-DczxPsuDUDauSLElEv@FHS8iP0dSA bjn7OG+RMnrmc`=d@8>$vV(Vr>#xh0#1Ai7Z diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala index 66c600185ed98..17d4640f22fad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/Dataset.scala @@ -2062,7 +2062,7 @@ class Dataset[T] private[sql]( * This is for 'distributed-sequence' default index in pandas API on Spark. 
*/ private[sql] def withSequenceColumn(name: String) = { - select(Column(new DistributedSequenceID()).alias(name), col("*")) + select(Column(DistributedSequenceID(Literal(true))).alias(name), col("*")) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 1e2ceeb2ae691..5c393b1db227e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -969,8 +969,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.python.MapInPandasExec(func, output, planLater(child), isBarrier, profile) :: Nil case logical.MapInArrow(func, output, child, isBarrier, profile) => execution.python.MapInArrowExec(func, output, planLater(child), isBarrier, profile) :: Nil - case logical.AttachDistributedSequence(attr, child, storageLevel) => - execution.python.AttachDistributedSequenceExec(attr, planLater(child), storageLevel) :: Nil + case logical.AttachDistributedSequence(attr, child, cache) => + execution.python.AttachDistributedSequenceExec(attr, planLater(child), cache) :: Nil case logical.PythonWorkerLogs(jsonAttr) => execution.python.PythonWorkerLogsExec(jsonAttr) :: Nil case logical.MapElements(f, _, _, objAttr, child) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala index 9c20f020328c3..507b632f55653 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.execution.python -import java.util.Locale - import org.apache.spark.internal.LogKeys.{RDD_ID, SPARK_PLAN_ID} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -26,17 +24,21 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} -import org.apache.spark.storage.StorageLevel +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.storage.{StorageLevel, StorageLevelMapper} /** * A physical plan that adds a new long column with `sequenceAttr` that - * increases one by one. This is for 'distributed-sequence' default index - * in pandas API on Spark. + * increases one by one. + * This is for 'distributed-sequence' default index in pandas API on Spark, + * and 'DataFrame.zipWithIndex' + * When cache is true, the underlying RDD will be cached according to + * PS config "pandas_on_Spark.compute.default_index_cache". 
*/ case class AttachDistributedSequenceExec( sequenceAttr: Attribute, child: SparkPlan, - storageLevel: String) + cache: Boolean) extends UnaryExecNode { override def producedAttributes: AttributeSet = AttributeSet(sequenceAttr) @@ -47,24 +49,60 @@ case class AttachDistributedSequenceExec( @transient private var cached: RDD[InternalRow] = _ - override protected def doExecute(): RDD[InternalRow] = { - val childRDD = child.execute() - val cachedRDD = storageLevel.toUpperCase(Locale.ROOT) match { + // cache the underlying RDD according to + // PS config "pandas_on_Spark.compute.default_index_cache" + private def cacheRDD(rdd: RDD[InternalRow]): RDD[InternalRow] = { + // before `compute.default_index_cache` is explicitly set via + // `ps.set_option`, `SQLConf.get` can not get its value (as well as its default value); + // after `ps.set_option`, `SQLConf.get` can get its value: + // + // In [1]: import pyspark.pandas as ps + // In [2]: ps.get_option("compute.default_index_cache") + // Out[2]: 'MEMORY_AND_DISK_SER' + // In [3]: spark.conf.get("pandas_on_Spark.compute.default_index_cache") + // ... + // Py4JJavaError: An error occurred while calling o40.get. + // : java.util.NoSuchElementException: pandas_on_Spark.compute.distributed_sequence_... + // at org.apache.spark.sql.errors.QueryExecutionErrors$.noSuchElementExceptionError... + // at org.apache.spark.sql.internal.SQLConf.$anonfun$getConfString$3(SQLConf.scala:4766) + // ... + // In [4]: ps.set_option("compute.default_index_cache", "NONE") + // In [5]: spark.conf.get("pandas_on_Spark.compute.default_index_cache") + // Out[5]: '"NONE"' + // In [6]: ps.set_option("compute.default_index_cache", "DISK_ONLY") + // In [7]: spark.conf.get("pandas_on_Spark.compute.default_index_cache") + // Out[7]: '"DISK_ONLY"' + + // The string is double quoted because of JSON ser/deser for pandas API on Spark + val storageLevel = SQLConf.get.getConfString( + "pandas_on_Spark.compute.default_index_cache", + StorageLevelMapper.MEMORY_AND_DISK_SER.name() + ).stripPrefix("\"").stripSuffix("\"") + + storageLevel match { // zipWithIndex launches a Spark job only if #partition > 1 - case _ if childRDD.getNumPartitions <= 1 => childRDD + case _ if rdd.getNumPartitions <= 1 => rdd - case "NONE" => childRDD + case "NONE" => rdd case "LOCAL_CHECKPOINT" => // localcheckpointing is unreliable so should not eagerly release it in 'cleanupResources' - childRDD.map(_.copy()).localCheckpoint() + rdd.map(_.copy()).localCheckpoint() .setName(s"Temporary RDD locally checkpointed in AttachDistributedSequenceExec($id)") case _ => - cached = childRDD.map(_.copy()).persist(StorageLevel.fromString(storageLevel)) + cached = rdd.map(_.copy()).persist(StorageLevel.fromString(storageLevel)) .setName(s"Temporary RDD cached in AttachDistributedSequenceExec($id)") cached } + } + + override protected def doExecute(): RDD[InternalRow] = { + val childRDD: RDD[InternalRow] = child.execute() + + // if cache is true, the underlying rdd is cached according to + // PS config "pandas_on_Spark.compute.default_index_cache" + val cachedRDD = if (cache) this.cacheRDD(childRDD) else childRDD cachedRDD.zipWithIndex().mapPartitions { iter => val unsafeProj = UnsafeProjection.create(output, output) From f258667f42e461fe3bb192f73fabe57648144b5d Mon Sep 17 00:00:00 2001 From: Ruifeng Zheng Date: Sat, 7 Feb 2026 09:49:07 +0800 Subject: [PATCH 5/5] nit --- .../spark/sql/catalyst/expressions/DistributedSequenceID.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DistributedSequenceID.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DistributedSequenceID.scala
index 962ba984539eb..cd71ee8580525 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DistributedSequenceID.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DistributedSequenceID.scala
@@ -29,7 +29,8 @@ import org.apache.spark.sql.types.{DataType, LongType}
 case class DistributedSequenceID(cache: Expression)
   extends LeafExpression with Unevaluable with NonSQLExpression {
 
-  // This argument indicate whether this expression is from Pandas API on Spark.
+  // This argument indicates whether the underlying RDD should be cached
+  // according to PS config "pandas_on_Spark.compute.default_index_cache".
   def this() = this(Literal(false))
 
   override def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression = {
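
Note on the final shape of the series (PATCH 4/5 and 5/5): the storage-level string argument introduced in PATCH 1/5 is replaced by a boolean `cache` argument on DistributedSequenceID. ExtractDistributedSequenceID folds that flag into AttachDistributedSequence(attr, child, cache), and AttachDistributedSequenceExec consults the pandas-on-Spark option only when the flag is true, so Dataset.zipWithIndex (default constructor, Literal(false)) never persists the child RDD, while the pandas API on Spark default index (Literal(true)) keeps the config-driven caching and the zipWithIndex Connect golden plans stay unchanged. The Scala sketch below is not part of the patches; it only mirrors the storage-level resolution performed in AttachDistributedSequenceExec.cacheRDD, and the object and method names are illustrative.

import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.storage.{StorageLevel, StorageLevelMapper}

object IndexCacheSketch {
  // Resolve the effective storage level for the distributed-sequence index cache.
  // Returns None when no explicit persist() should be issued; "NONE" and
  // "LOCAL_CHECKPOINT" are handled separately by the physical node.
  def resolveIndexCacheLevel(): Option[StorageLevel] = {
    // The option value is JSON-serialized by the pandas API on Spark, hence the
    // surrounding double quotes are stripped before parsing.
    val name = SQLConf.get.getConfString(
      "pandas_on_Spark.compute.default_index_cache",
      StorageLevelMapper.MEMORY_AND_DISK_SER.name()
    ).stripPrefix("\"").stripSuffix("\"")

    name match {
      case "NONE" | "LOCAL_CHECKPOINT" => None
      case other => Some(StorageLevel.fromString(other))
    }
  }
}

Deferring the SQLConf lookup to execution of the physical node, rather than evaluating it when the expression is constructed as PATCH 1/5 did, keeps DistributedSequenceID cheap to build and independent of whether the pandas-on-Spark option has been set yet.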