Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 31 additions & 4 deletions docs/source/user-guide/latest/expressions.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,9 @@ Comet focuses acceleration on mainstream relational, string, datetime, math, and
expressions. The following function families are **not currently planned** for native acceleration (they are not on the 1.0 roadmap): specialized functionality with narrow real-world analytics use and high implementation cost. They fall back to Spark and may be reconsidered based on demand:

- **Probabilistic sketches and approximate top-k** (`kll_sketch_*`, `hll_*`, `theta_*`, `count_min_sketch`, `bitmap_*`, `approx_top_k*`): specialized data structures with exact-correctness traps.
- **XML / XPath** (`from_xml`, `to_xml`, `schema_of_xml`, `xpath*`): legacy text format, rare in accelerated workloads.
- **Geospatial** (`st_*`): brand-new Spark 4.1 functionality, specialized.
- **Avro / Protobuf codecs** (`from_avro`, `to_avro`, `from_protobuf`, `to_protobuf`, `schema_of_avro`): format conversion belongs at the IO layer, not expression evaluation.
- **JVM reflection** (`java_method`, `reflect`): niche, and they invoke arbitrary JVM methods (a security concern).
- **CSV functions** (`from_csv`, `to_csv`, `schema_of_csv`): row-level CSV parsing and formatting in expressions is niche and better handled at the data source layer.
- **UTF-8 validation** (`is_valid_utf8`, `make_valid_utf8`, `validate_utf8`, `try_validate_utf8`): niche Spark 4.x string-validation helpers.
- **File metadata** (`input_file_name`, `input_file_block_start`, `input_file_block_length`): require scan-internal per-row file information, outside the expression layer.
- **Miscellaneous niche** (`histogram_numeric`, `version`, `sentences`, `quote`): low-value or specialized functions with little benefit from native acceleration.
Expand Down Expand Up @@ -220,6 +218,16 @@ The type-name conversion functions (`bigint`, `binary`, `boolean`, `date`, `deci

---

## csv_funcs

| Function | Status | Notes |
| --- | --- | --- |
| `from_csv` | ✅ | |
| `schema_of_csv` | ✅ | |
| `to_csv` | ✅ | |

---

## datetime_funcs

| Function | Status | Notes |
Expand Down Expand Up @@ -339,9 +347,9 @@ expression-level). The `outer` variants are wired but marked `Incompatible`; the
| `from_json` | ✅ | Falls back by default; opt-in via allowIncompatible ([audit](../../contributor-guide/expression-audits/json_funcs.md#from_json)) |
| `get_json_object` | ✅ | Some inputs need allowIncompatible ([audit](../../contributor-guide/expression-audits/json_funcs.md#get_json_object)) |
| `json_array_length` | ✅ | Single-quoted/trailing JSON needs allowIncompatible ([audit](../../contributor-guide/expression-audits/json_funcs.md#json_array_length)) |
| `json_object_keys` | 🔜 | [#3161](https://github.com/apache/datafusion-comet/issues/3161) |
| `json_object_keys` | | |
| `json_tuple` | 🔜 | [#3160](https://github.com/apache/datafusion-comet/issues/3160) |
| `schema_of_json` | 🔜 | [#3163](https://github.com/apache/datafusion-comet/issues/3163) |
| `schema_of_json` | | |
| `to_json` | ✅ | Options and map/array inputs fall back ([audit](../../contributor-guide/expression-audits/json_funcs.md#to_json)) |

---
Expand Down Expand Up @@ -639,6 +647,25 @@ fall back to Spark.

---

## xml_funcs

| Function | Status | Notes |
| --- | --- | --- |
| `from_xml` | ✅ | Spark 4.0+ |
| `schema_of_xml` | ✅ | Spark 4.0+ |
| `to_xml` | ✅ | Spark 4.0+ |
| `xpath` | ✅ | |
| `xpath_boolean` | ✅ | |
| `xpath_double` | ✅ | |
| `xpath_float` | ✅ | |
| `xpath_int` | ✅ | |
| `xpath_long` | ✅ | |
| `xpath_number` | ✅ | Alias of `xpath_double` |
| `xpath_short` | ✅ | |
| `xpath_string` | ✅ | |

---

## Beyond SQL functions

Comet also accelerates a number of Catalyst expressions that have no Spark SQL function name and therefore do not appear in the tables above. These arise from the DataFrame API, from SQL syntax other than function calls, or from the query optimizer. They include:
Expand Down
21 changes: 19 additions & 2 deletions spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
import org.apache.spark.sql.catalyst.expressions.xml.{XPathBoolean, XPathDouble, XPathFloat, XPathInt, XPathList, XPathLong, XPathShort, XPathString}
import org.apache.spark.sql.comet.DecimalPrecision
import org.apache.spark.sql.execution.{ScalarSubquery, SparkPlan}
import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
Expand Down Expand Up @@ -273,7 +274,22 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim {
classOf[Cast] -> CometCast)

private val jsonExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
classOf[LengthOfJsonArray] -> CometLengthOfJsonArray)
classOf[LengthOfJsonArray] -> CometLengthOfJsonArray,
classOf[SchemaOfJson] -> CometSchemaOfJson,
classOf[JsonObjectKeys] -> CometJsonObjectKeys)

private val csvExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] =
Map(classOf[CsvToStructs] -> CometCsvToStructs, classOf[SchemaOfCsv] -> CometSchemaOfCsv)

private val xpathExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
classOf[XPathBoolean] -> CometXPathBoolean,
classOf[XPathShort] -> CometXPathShort,
classOf[XPathInt] -> CometXPathInt,
classOf[XPathLong] -> CometXPathLong,
classOf[XPathFloat] -> CometXPathFloat,
classOf[XPathDouble] -> CometXPathDouble,
classOf[XPathString] -> CometXPathString,
classOf[XPathList] -> CometXPathList)

private[comet] val miscExpressions: Map[Class[_ <: Expression], CometExpressionSerde[_]] = Map(
// TODO PromotePrecision
Expand Down Expand Up @@ -301,7 +317,8 @@ object QueryPlanSerde extends Logging with CometExprShim with CometTypeShim {
mathExpressions ++ hashExpressions ++ stringExpressions ++
conditionalExpressions ++ mapExpressions ++ predicateExpressions ++
structExpressions ++ bitwiseExpressions ++ miscExpressions ++ arrayExpressions ++
temporalExpressions ++ conversionExpressions ++ urlExpressions ++ jsonExpressions
temporalExpressions ++ conversionExpressions ++ urlExpressions ++ jsonExpressions ++
csvExpressions ++ xpathExpressions

/**
* Mapping of Spark aggregate expression class to Comet expression handler.
Expand Down
26 changes: 26 additions & 0 deletions spark/src/main/scala/org/apache/comet/serde/csv.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.serde

import org.apache.spark.sql.catalyst.expressions.{CsvToStructs, SchemaOfCsv}

object CometCsvToStructs extends CometCodegenDispatch[CsvToStructs]

object CometSchemaOfCsv extends CometCodegenDispatch[SchemaOfCsv]
6 changes: 5 additions & 1 deletion spark/src/main/scala/org/apache/comet/serde/json.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

package org.apache.comet.serde

import org.apache.spark.sql.catalyst.expressions.{Attribute, LengthOfJsonArray}
import org.apache.spark.sql.catalyst.expressions.{Attribute, JsonObjectKeys, LengthOfJsonArray, SchemaOfJson}

import org.apache.comet.CometConf
import org.apache.comet.serde.ExprOuterClass.Expr
Expand All @@ -46,3 +46,7 @@ object CometLengthOfJsonArray extends CometCodegenDispatch[LengthOfJsonArray] {
super.convert(expr, inputs, binding)
}
}

object CometSchemaOfJson extends CometCodegenDispatch[SchemaOfJson]

object CometJsonObjectKeys extends CometCodegenDispatch[JsonObjectKeys]
38 changes: 38 additions & 0 deletions spark/src/main/scala/org/apache/comet/serde/xpath.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.serde

import org.apache.spark.sql.catalyst.expressions.xml.{XPathBoolean, XPathDouble, XPathFloat, XPathInt, XPathList, XPathLong, XPathShort, XPathString}

object CometXPathBoolean extends CometCodegenDispatch[XPathBoolean]

object CometXPathShort extends CometCodegenDispatch[XPathShort]

object CometXPathInt extends CometCodegenDispatch[XPathInt]

object CometXPathLong extends CometCodegenDispatch[XPathLong]

object CometXPathFloat extends CometCodegenDispatch[XPathFloat]

object CometXPathDouble extends CometCodegenDispatch[XPathDouble]

object CometXPathString extends CometCodegenDispatch[XPathString]

object CometXPathList extends CometCodegenDispatch[XPathList]
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ trait CometExprShim extends CommonStringExprs with CometExprShim4x {
inputs: Seq[Attribute],
binding: Boolean): Option[Expr] = {
expr match {
// RuntimeReplaceable structured-text functions (schema_of_csv/json, json_object_keys,
// xpath_*, schema_of_xml) and from_xml/to_xml route through the codegen dispatcher; see
// CometExprShim4x.convertStructuredText. Guarded so non-structured-text Invoke/StaticInvoke
// nodes still reach their existing handlers below.
case e if isStructuredTextDispatch(e) =>
convertStructuredText(e, inputs, binding)

case knc: KnownNotContainsNull =>
// On Spark 4.0, array_compact rewrites to KnownNotContainsNull(ArrayFilter(IsNotNull)).
// Strip the wrapper and serialize the inner ArrayFilter as spark_array_compact.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ trait CometExprShim extends CommonStringExprs with CometExprShim4x {
inputs: Seq[Attribute],
binding: Boolean): Option[Expr] = {
expr match {
// RuntimeReplaceable structured-text functions (schema_of_csv/json, json_object_keys,
// xpath_*, schema_of_xml) and from_xml/to_xml route through the codegen dispatcher; see
// CometExprShim4x.convertStructuredText. Guarded so non-structured-text Invoke/StaticInvoke
// nodes still reach their existing handlers below.
case e if isStructuredTextDispatch(e) =>
convertStructuredText(e, inputs, binding)

case knc: KnownNotContainsNull =>
// On Spark 4.0, array_compact rewrites to KnownNotContainsNull(ArrayFilter(IsNotNull)).
// Strip the wrapper and serialize the inner ArrayFilter as spark_array_compact.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ trait CometExprShim extends CommonStringExprs with CometExprShim4x {
inputs: Seq[Attribute],
binding: Boolean): Option[Expr] = {
expr match {
// RuntimeReplaceable structured-text functions (schema_of_csv/json, json_object_keys,
// xpath_*, schema_of_xml) and from_xml/to_xml route through the codegen dispatcher; see
// CometExprShim4x.convertStructuredText. Guarded so non-structured-text Invoke/StaticInvoke
// nodes still reach their existing handlers below.
case e if isStructuredTextDispatch(e) =>
convertStructuredText(e, inputs, binding)

case knc: KnownNotContainsNull =>
// On Spark 4.0, array_compact rewrites to KnownNotContainsNull(ArrayFilter(IsNotNull)).
// Strip the wrapper and serialize the inner ArrayFilter as spark_array_compact.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@

package org.apache.comet.shims

import org.apache.spark.sql.catalyst.expressions.{Attribute, DayName, Expression, MonthName}
import org.apache.spark.sql.catalyst.expressions.{Attribute, DayName, Expression, Literal, MonthName, StructsToXml, XmlToStructs}
import org.apache.spark.sql.catalyst.expressions.csv.SchemaOfCsvEvaluator
import org.apache.spark.sql.catalyst.expressions.json.{JsonExpressionUtils, SchemaOfJsonEvaluator}
import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, StaticInvoke}
import org.apache.spark.sql.catalyst.expressions.xml.{XmlExpressionEvalUtils, XPathEvaluator}

import org.apache.comet.serde.CometScalaUDF
import org.apache.comet.serde.ExprOuterClass.Expr
import org.apache.comet.serde.QueryPlanSerde.{exprToProtoInternal, optExprWithFallbackReason, scalarFunctionExprToProtoWithReturnType}

Expand Down Expand Up @@ -56,4 +61,35 @@ trait CometExprShim4x {
optExprWithFallbackReason(nameExpr, m, m.child)
case _ => None
}

// Spark 4.x lowers the RuntimeReplaceable structured-text functions to an evaluator-backed
// `Invoke` (`schema_of_csv`, `schema_of_json`, `xpath_*`) or `StaticInvoke`
// (`json_object_keys`, `schema_of_xml`) before Comet sees the plan, so the original expression
// class never reaches the serde map. `from_xml` / `to_xml` stay as plain expressions. None of
// these have a native (rust) implementation, so they route through the codegen dispatcher.
//
// [[isStructuredTextDispatch]] is a cheap structural predicate used as the shim match guard so
// the dispatch (which binds, runs `canHandle`, and closure-serializes) only happens once, in
// [[convertStructuredText]]. The single `XPathEvaluator` test covers all eight `xpath_*`
// functions, which lower to its subclasses.
protected def isStructuredTextDispatch(expr: Expression): Boolean = expr match {
case _: XmlToStructs | _: StructsToXml => true
case i: Invoke if i.functionName == "evaluate" =>
i.targetObject match {
case Literal(value, _) =>
value.isInstanceOf[SchemaOfCsvEvaluator] || value.isInstanceOf[SchemaOfJsonEvaluator] ||
value.isInstanceOf[XPathEvaluator]
case _ => false
}
case s: StaticInvoke =>
(s.staticObject == classOf[JsonExpressionUtils] && s.functionName == "jsonObjectKeys") ||
(s.staticObject == XmlExpressionEvalUtils.getClass && s.functionName == "schemaOfXml")
case _ => false
}

protected def convertStructuredText(
expr: Expression,
inputs: Seq[Attribute],
binding: Boolean): Option[Expr] =
CometScalaUDF.emitJvmCodegenDispatch(expr, inputs, binding)
}
34 changes: 34 additions & 0 deletions spark/src/test/resources/sql-tests/expressions/csv/csv.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

-- CSV structured-text functions (from_csv, schema_of_csv). These have no native (rust)
-- implementation; they extend Spark's CodegenFallback and stay native via the codegen
-- dispatcher.

statement
CREATE TABLE test_csv(s STRING) USING parquet

statement
INSERT INTO test_csv VALUES ('1,abc'), ('2,def'), (''), (NULL)

-- column argument
query
SELECT from_csv(s, 'a INT, b STRING') FROM test_csv

-- literal argument
query
SELECT schema_of_csv('1,abc')
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

-- With the codegen dispatcher disabled, from_csv has no native path and falls back to Spark.

-- Config: spark.comet.exec.scalaUDF.codegen.enabled=false

statement
CREATE TABLE test_csv_fallback(s STRING) USING parquet

statement
INSERT INTO test_csv_fallback VALUES ('1,abc'), ('2,def'), (''), (NULL)

query expect_fallback(spark.comet.exec.scalaUDF.codegen.enabled)
SELECT from_csv(s, 'a INT, b STRING') FROM test_csv_fallback
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

-- json_object_keys has no native (rust) implementation; it extends Spark's CodegenFallback and
-- stays native via the codegen dispatcher.

statement
CREATE TABLE test_json_object_keys(s STRING) USING parquet

statement
INSERT INTO test_json_object_keys VALUES
('{"a":1,"b":2}'), ('{"x":true}'), ('{}'), (NULL)

query
SELECT json_object_keys(s) FROM test_json_object_keys
Loading
Loading