Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
27e07a7
feat: add spark.comet.cache.serializer.enabled config
andygrove Jun 2, 2026
0a9c233
feat: add CometCachedBatch and column stats helper
andygrove Jun 2, 2026
82517f7
fix: use ByteArray.compareBinary for version-safe string stats ordering
andygrove Jun 2, 2026
cf3a73c
test: exercise CometCachedBatch and document setRowCount ordering
andygrove Jun 2, 2026
6c80ef0
feat: add CometCachedBatchSerializer skeleton with schema routing
andygrove Jun 2, 2026
ec936f9
refactor: simplify CometCachedBatchSerializer schema routing and imports
andygrove Jun 2, 2026
fe21619
feat: encode cached batches as compressed Arrow IPC with stats
andygrove Jun 2, 2026
bf522db
feat: support TimestampNTZ in cache stats and harden build test
andygrove Jun 2, 2026
29cb511
feat: decode Comet cached batches with column pruning and row fallback
andygrove Jun 2, 2026
4f4aa16
test: cover columnar cache read path and clarify selectedIndices errors
andygrove Jun 2, 2026
3c1cd5f
feat: pass already-Arrow batches through CometSparkToColumnarExec wit…
andygrove Jun 2, 2026
826249f
refactor: import CometVector and clarify passthrough test/metric
andygrove Jun 2, 2026
4960cfb
feat: install Comet cache serializer from CometDriverPlugin when enabled
andygrove Jun 2, 2026
bec8c30
refactor: use StaticSQLConf.SPARK_CACHE_SERIALIZER and log serializer…
andygrove Jun 2, 2026
7ba47f4
test: end-to-end Comet cache serializer correctness, pruning, spill, …
andygrove Jun 2, 2026
9dc0f7d
test: assert cached values (not just counts) for filter and timestamp…
andygrove Jun 2, 2026
40184ce
fix: copy UTF8String for cache stats to avoid use-after-free on strin…
andygrove Jun 2, 2026
d1af3a3
chore: open as draft pull request [skip ci]
andygrove Jun 2, 2026
bedaa77
refactor: dedup cache decode path and precompute stat orderedness
andygrove Jun 2, 2026
920c57f
chore: keep draft CI skipped after cleanup [skip ci]
andygrove Jun 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions spark/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,19 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

val COMET_CACHE_SERIALIZER_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.cache.serializer.enabled")
.category(CATEGORY_EXEC)
.doc(
"When enabled, Comet installs a CachedBatchSerializer that stores Spark's in-memory " +
"table cache as compressed Arrow IPC. Repeated scans of cached data are then read " +
"natively without a per-read conversion. Schemas Comet cannot handle transparently " +
"fall back to Spark's default cache serializer. This sets " +
"spark.sql.cache.serializer for the session unless that property is already set " +
"to a non-default value. Disabled by default.")
.booleanConf
.createWithDefault(false)

val COMET_EXEC_ENABLED: ConfigEntry[Boolean] = conf(s"$COMET_EXEC_CONFIG_PREFIX.enabled")
.category(CATEGORY_EXEC)
.doc(
Expand Down
25 changes: 24 additions & 1 deletion spark/src/main/scala/org/apache/spark/Plugins.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD, EXECUTOR_MEMORY_OVERHEAD_FACTOR}
import org.apache.spark.sql.internal.StaticSQLConf

import org.apache.comet.CometConf.{COMET_METRICS_ENABLED, COMET_ONHEAP_ENABLED}
import org.apache.comet.CometConf.{COMET_CACHE_SERIALIZER_ENABLED, COMET_METRICS_ENABLED, COMET_ONHEAP_ENABLED}
import org.apache.comet.CometSparkSessionExtensions

/**
Expand Down Expand Up @@ -57,6 +57,9 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl
// register CometSparkSessionExtensions if it isn't already registered
CometDriverPlugin.registerCometSessionExtension(sc.conf)

// Install the Comet cache serializer if requested
CometDriverPlugin.setCacheSerializerIfEnabled(sc.conf)

// Register Comet metrics
CometDriverPlugin.registerCometMetrics(sc)

Expand Down Expand Up @@ -104,6 +107,26 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl
}

object CometDriverPlugin extends Logging {
private[spark] val COMET_CACHE_SERIALIZER =
"org.apache.spark.sql.comet.CometCachedBatchSerializer"

/**
* If the Comet cache serializer is enabled, install it as Spark's cache serializer. This is a
* static SQL config, so it must be set on the SparkConf before the session is created. A
* user-provided non-default serializer is respected and not overridden.
*/
private[spark] def setCacheSerializerIfEnabled(conf: SparkConf): Unit = {
if (conf.getBoolean(COMET_CACHE_SERIALIZER_ENABLED.key, defaultValue = false)) {
val key = StaticSQLConf.SPARK_CACHE_SERIALIZER.key
val default = StaticSQLConf.SPARK_CACHE_SERIALIZER.defaultValueString
val existing = conf.get(key, default)
if (existing == default) {
logInfo(s"Setting $key=$COMET_CACHE_SERIALIZER")
conf.set(key, COMET_CACHE_SERIALIZER)
}
}
}

def registerCometMetrics(sc: SparkContext): Unit = {
if (sc.getConf.getBoolean(
COMET_METRICS_ENABLED.key,
Expand Down
Loading