From d9800455cd316617fec81df36d508826f4c6b175 Mon Sep 17 00:00:00 2001 From: Felipe Pessoto Date: Mon, 11 May 2026 17:44:46 -0700 Subject: [PATCH 1/2] Add unserializable type detection in VeloxBackend --- .../gluten/substrait/rel/LocalFilesNode.java | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java index bc42dd302e5..83fe9c0bb30 100644 --- a/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java +++ b/gluten-substrait/src/main/java/org/apache/gluten/substrait/rel/LocalFilesNode.java @@ -23,6 +23,9 @@ import io.substrait.proto.NamedStruct; import io.substrait.proto.ReadRel; import io.substrait.proto.Type; +import org.apache.spark.sql.types.ArrayType; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.MapType; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; @@ -111,6 +114,18 @@ private NamedStruct buildNamedStruct() { Type.Struct.Builder structBuilder = Type.Struct.newBuilder(); for (StructField field : fileSchema.fields()) { + // A column that contains a VariantType cannot be encoded in a Substrait NamedStruct + // because ConverterUtils#getTypeNode has no representation for it. If such a column + // is actually projected from this scan, validation in VeloxBackend rejects the plan and + // the whole stage falls back to vanilla Spark; we never reach this code path. If the + // column is *not* projected, however, it still ends up here because the file schema + // contains every data column of the relation regardless of projection. Skipping the + // field is safe: the native reader looks up requested columns by name, so an entry + // that is not requested is unused, and an entry that is missing from the file schema + // simply means the native reader will not see/validate that column. + if (containsUnsupportedFileSchemaType(field.dataType())) { + continue; + } structBuilder.addTypes( ConverterUtils.getTypeNode(field.dataType(), field.nullable()).toProtobuf()); namedStructBuilder.addNames(ConverterUtils.normalizeColName(field.name())); @@ -120,6 +135,33 @@ private NamedStruct buildNamedStruct() { return namedStructBuilder.build(); } + // Returns true if `dt` is, or recursively contains, a Spark type that has no Substrait + // representation in ConverterUtils#getTypeNode. Today this is only Spark 4.0's VariantType, + // matched by name to remain source-compatible with shims that target older Spark versions. + // Package-private for unit tests. + static boolean containsUnsupportedFileSchemaType(DataType dt) { + if ("variant".equals(dt.typeName())) { + return true; + } + if (dt instanceof StructType) { + for (StructField f : ((StructType) dt).fields()) { + if (containsUnsupportedFileSchemaType(f.dataType())) { + return true; + } + } + return false; + } + if (dt instanceof ArrayType) { + return containsUnsupportedFileSchemaType(((ArrayType) dt).elementType()); + } + if (dt instanceof MapType) { + MapType m = (MapType) dt; + return containsUnsupportedFileSchemaType(m.keyType()) + || containsUnsupportedFileSchemaType(m.valueType()); + } + return false; + } + @Override public List preferredLocations() { return this.preferredLocations; From 0e8965dc756963cc8fb239785774ac30e62c9510 Mon Sep 17 00:00:00 2001 From: Felipe Pessoto Date: Mon, 11 May 2026 17:47:28 -0700 Subject: [PATCH 2/2] Add unit tests for LocalFilesNode unsupported types This suite tests the `LocalFilesNode.containsUnsupportedFileSchemaType` method for various data types, ensuring correct identification of unsupported types like 'variant'. It includes tests for primitive types, nested structures, arrays, and maps. --- .../rel/LocalFilesNodeUnsupportedTypeSuite | 174 ++++++++++++++++++ 1 file changed, 174 insertions(+) create mode 100644 gluten-substrait/src/test/scala/org/apache/gluten/substrait/rel/LocalFilesNodeUnsupportedTypeSuite diff --git a/gluten-substrait/src/test/scala/org/apache/gluten/substrait/rel/LocalFilesNodeUnsupportedTypeSuite b/gluten-substrait/src/test/scala/org/apache/gluten/substrait/rel/LocalFilesNodeUnsupportedTypeSuite new file mode 100644 index 00000000000..dbb32e41ba1 --- /dev/null +++ b/gluten-substrait/src/test/scala/org/apache/gluten/substrait/rel/LocalFilesNodeUnsupportedTypeSuite @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.substrait.rel + +import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat + +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ + +import io.substrait.proto.ReadRel +import org.scalatest.funsuite.AnyFunSuiteLike + +import java.util.{Collections => JCollections} + +/** + * Unit tests for `LocalFilesNode.containsUnsupportedFileSchemaType`. The method is package-private + * (visible-for-testing) so we can invoke it directly from this suite without spinning up a full + * SparkSession. + * + * VariantType only exists in Spark 4.0+, so the leaf case is exercised through a stub `DataType` + * whose `typeName` matches the production code's `typeName == "variant"` check. This keeps the + * suite source-compatible with shims that target older Spark versions. + */ +class LocalFilesNodeUnsupportedTypeSuite extends AnyFunSuiteLike { + + // Stub DataType that pretends to be Spark's VariantType for the typeName-based detection. + private object FakeVariantType extends DataType { + override def defaultSize: Int = 16 + override def asNullable: DataType = this + override def typeName: String = "variant" + } + + private def check(dt: DataType): Boolean = + LocalFilesNode.containsUnsupportedFileSchemaType(dt) + + test("returns true for a bare variant type") { + assert(check(FakeVariantType)) + } + + test("returns true for a struct that contains a variant field") { + val schema = StructType( + Seq( + StructField("id", IntegerType, nullable = false), + StructField("data", FakeVariantType, nullable = true))) + assert(check(schema)) + } + + test("returns true for a deeply nested struct that contains a variant field") { + val schema = StructType( + Seq( + StructField( + "outer", + StructType(Seq(StructField("inner", FakeVariantType, nullable = true))), + nullable = true))) + assert(check(schema)) + } + + test("returns true for an array whose element contains variant") { + val arr = ArrayType(FakeVariantType, containsNull = true) + assert(check(arr)) + val arrOfStruct = ArrayType( + StructType(Seq(StructField("v", FakeVariantType, nullable = true))), + containsNull = true) + assert(check(arrOfStruct)) + } + + test("returns true for a map whose key or value contains variant") { + val mapValueVariant = MapType(StringType, FakeVariantType, valueContainsNull = true) + assert(check(mapValueVariant)) + val mapKeyVariant = MapType(FakeVariantType, IntegerType, valueContainsNull = false) + assert(check(mapKeyVariant)) + } + + test("returns false for primitive types and supported nested types without variant") { + Seq( + IntegerType, + LongType, + ShortType, + ByteType, + FloatType, + DoubleType, + BooleanType, + StringType, + BinaryType, + DateType, + TimestampType, + DecimalType(10, 2) + ).foreach(dt => assert(!check(dt), s"$dt should be supported")) + + val supportedNested = StructType( + Seq( + StructField("a", IntegerType), + StructField("b", ArrayType(StringType)), + StructField("c", MapType(StringType, StructType(Seq(StructField("x", DoubleType))))), + StructField("d", StructType(Seq(StructField("y", BinaryType)))) + )) + assert(!check(supportedNested)) + assert(!check(ArrayType(IntegerType, containsNull = true))) + assert(!check(MapType(StringType, LongType))) + } + + // Integration tests that exercise buildNamedStruct() indirectly via toProtobuf(), + // verifying names-types synchronisation when variant fields are skipped. + + private def createLocalFilesNode(): LocalFilesNode = { + new LocalFilesNode( + 0, + JCollections.singletonList("file:///fake/path"), + JCollections.singletonList(java.lang.Long.valueOf(0L)), + JCollections.singletonList(java.lang.Long.valueOf(1024L)), + JCollections.emptyList(), + JCollections.emptyList(), + JCollections.singletonList(JCollections.emptyMap()), + JCollections.singletonList(JCollections.emptyMap()), + ReadFileFormat.ParquetReadFormat, + JCollections.emptyList(), + JCollections.emptyMap(), + JCollections.singletonList(JCollections.emptyMap()) + ) + } + + test("buildNamedStruct omits variant fields and keeps names in sync with types") { + SQLConf.withExistingConf(new SQLConf()) { + val schema = StructType( + Seq( + StructField("id", IntegerType, nullable = false), + StructField("data", FakeVariantType, nullable = true), + StructField("ts", TimestampType, nullable = true))) + + val node = createLocalFilesNode() + node.setFileSchema(schema) + + val proto = node.toProtobuf().asInstanceOf[ReadRel.LocalFiles] + val ns = proto.getItems(0).getSchema + + assert(ns.getNamesCount === 2, "expected 2 names (variant field skipped)") + assert(ns.getStruct.getTypesCount === 2, "expected 2 types (in sync with names)") + assert(ns.getNames(0) === "id") + assert(ns.getNames(1) === "ts") + } + } + + test("buildNamedStruct produces empty struct when all fields are unsupported") { + SQLConf.withExistingConf(new SQLConf()) { + val schema = StructType( + Seq( + StructField("v1", FakeVariantType, nullable = true), + StructField("v2", FakeVariantType, nullable = true))) + + val node = createLocalFilesNode() + node.setFileSchema(schema) + + val proto = node.toProtobuf().asInstanceOf[ReadRel.LocalFiles] + val ns = proto.getItems(0).getSchema + + assert(ns.getNamesCount === 0) + assert(ns.getStruct.getTypesCount === 0) + } + } +}