
[SPARK-55395][SQL] Disable RDD cache in DataFrame.zipWithIndex #54178

Open
zhengruifeng wants to merge 5 commits into apache:master from zhengruifeng:zip_with_index_cache

Conversation

zhengruifeng (Contributor) commented Feb 6, 2026

What changes were proposed in this pull request?

Disable the RDD cache in DataFrame.zipWithIndex.

Why are the changes needed?

When AttachDistributedSequence was first introduced for the Pandas API on Spark in 93cec49, the underlying RDD was always locally checkpointed (cached) to avoid recomputation.
We then hit serious executor memory issues, so in 4279090 we made the storage level configurable and released the cached data after each stage via AQE.

Since we are reusing AttachDistributedSequence to implement DataFrame.zipWithIndex, we start with a no-cache version to be more conservative; it will be easy to enable caching in the future if necessary.

Moreover, there is a chance to further optimize the no-cache version: #54169

This PR disables the RDD cache in DistributedSequenceID by default; the PS callsites explicitly set cache=True.
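
For intuition, here is a minimal sketch of the mechanism being toggled, assuming nothing about Spark's actual AttachDistributedSequence internals (all names below are illustrative): attaching a global sequence id takes two passes over the input, one to count rows per partition and one to assign offset-adjusted ids, so without a cache the input lineage is computed twice.

```python
def attach_sequence_ids(rdd, cache=False):
    # Illustrative two-pass scheme, not Spark's implementation.
    if cache:
        rdd.localCheckpoint()  # mark the RDD for local checkpointing
        rdd.count()            # materialize it so pass 2 reads cached blocks
    # Pass 1: count rows in each partition to derive per-partition offsets.
    counts = rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()
    offsets = [0]
    for c in counts[:-1]:
        offsets.append(offsets[-1] + c)
    # Pass 2: assign ids; recomputes the lineage unless cached above.
    def attach(split, it):
        for i, row in enumerate(it):
            yield (offsets[split] + i, row)
    return rdd.mapPartitionsWithIndex(attach)
```

With the new default, DataFrame.zipWithIndex takes the no-cache path, accepting a possible recomputation of the input in exchange for lower executor memory pressure, while the PS callsites keep the cached behavior.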

Does this PR introduce any user-facing change?

No

How was this patch tested?

CI

Was this patch authored or co-authored using generative AI tooling?

No

github-actions bot commented Feb 6, 2026

JIRA Issue Information

=== Improvement SPARK-55395 ===
Summary: Disable RDD cache in DataFrame.zipWithIndex
Assignee: None
Status: Open
Affected: ["4.2.0"]


This comment was automatically generated by GitHub Actions

@zhengruifeng zhengruifeng marked this pull request as ready for review February 6, 2026 13:06
zhengruifeng (Contributor, Author) commented Feb 7, 2026

on second thought, let me use a bool parameter cache to replace the storage level, to simplify the code

```diff
 @staticmethod
 def distributed_sequence_id() -> Column:
-    return InternalFunction._invoke_internal_function_over_columns("distributed_sequence_id")
+    return InternalFunction._invoke_internal_function_over_columns(
```
zhengruifeng (Contributor, Author) commented: this is only used in PS
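
Since the added line in the hunk above is cut off, here is a hedged guess at the completed helper, assuming the bool cache parameter from the earlier comment is forwarded as a literal column (the actual signature in the patch may differ):

```python
@staticmethod
def distributed_sequence_id(cache: bool = False) -> Column:
    # Hedged completion: forward the cache flag as a literal column argument.
    return InternalFunction._invoke_internal_function_over_columns(
        "distributed_sequence_id", lit(cache)
    )
```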

```diff
  */
 private[sql] def withSequenceColumn(name: String) = {
-  select(Column(DistributedSequenceID()).alias(name), col("*"))
+  select(Column(DistributedSequenceID(Literal(true))).alias(name), col("*"))
```
zhengruifeng (Contributor, Author) commented: this should also be a place for PS on PySpark classic
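
For the PySpark-classic side hinted at here, a hypothetical equivalent might look like the following (with_sequence_column and the cache keyword are assumed for illustration, not taken from the patch):

```python
from pyspark.sql import functions as sf

def with_sequence_column(sdf, name):
    # Hypothetical mirror of the Scala withSequenceColumn above: PS callsites
    # opt in to caching, matching DistributedSequenceID(Literal(true)).
    seq = InternalFunction.distributed_sequence_id(cache=True)  # assumed API
    return sdf.select(seq.alias(name), sf.col("*"))
```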
