diff --git a/docs/content/spark/quick-start.md b/docs/content/spark/quick-start.md
index 7a4341a4a0e3..79268feab191 100644
--- a/docs/content/spark/quick-start.md
+++ b/docs/content/spark/quick-start.md
@@ -358,6 +358,11 @@ All Spark's data types are available in package `org.apache.spark.sql.types`.
VarBinaryType, BinaryType |
true |
+
+ VariantType(Spark4.0+) |
+ VariantType |
+ true |
+
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java b/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
index dd4b48a17e1a..cd7884bc64fc 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java
@@ -211,7 +211,8 @@ public BaseVariantReader reader() {
}
public static RowType variantShreddingSchema(DataType dataType) {
- return variantShreddingSchema(dataType, true, false);
+ return VariantMetadataUtils.addVariantMetadata(
+ variantShreddingSchema(dataType, true, false));
}
/**
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/variant/Variant.java b/paimon-common/src/main/java/org/apache/paimon/data/variant/Variant.java
index 5bc19972f90b..ad44bddeb670 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/variant/Variant.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/variant/Variant.java
@@ -37,6 +37,8 @@
*/
public interface Variant {
+ byte VARIANT_SPEC_VERSION = (byte) 1;
+
String METADATA = "metadata";
String VALUE = "value";
diff --git a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantMetadataUtils.java b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantMetadataUtils.java
index 735ea08af3f4..a0a6208035b0 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantMetadataUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantMetadataUtils.java
@@ -67,6 +67,15 @@ public static boolean isVariantRowType(DataType dataType) {
return true;
}
+ /** Add metadata to the top-level fields to mark it s a shredding schema for writers. */
+ public static RowType addVariantMetadata(RowType rowType) {
+ List fields = new ArrayList<>();
+ for (DataField f : rowType.getFields()) {
+ fields.add((f.newDescription(METADATA_KEY)));
+ }
+ return rowType.copy(fields);
+ }
+
/** Extract the path from variant metadata description. */
public static String path(String description) {
return splitDescription(description)[0];
diff --git a/paimon-format/pom.xml b/paimon-format/pom.xml
index 4f7f11965abd..75a49da92493 100644
--- a/paimon-format/pom.xml
+++ b/paimon-format/pom.xml
@@ -351,7 +351,6 @@ under the License.
org.apache.parquet:parquet-format-structures
org.apache.parquet:parquet-jackson
commons-pool:commons-pool
- commons-pool:commons-pool
org.locationtech.jts:jts-core
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
index 19d1beb678fe..bfb6297fae3f 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
@@ -19,6 +19,7 @@
package org.apache.paimon.format.parquet;
import org.apache.paimon.data.variant.Variant;
+import org.apache.paimon.data.variant.VariantMetadataUtils;
import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
@@ -213,20 +214,30 @@ public static Type convertToParquetType(String name, DataType type, int fieldId,
.withId(fieldId);
case ROW:
RowType rowType = (RowType) type;
- return new GroupType(repetition, name, convertToParquetTypes(rowType))
+ Types.GroupBuilder groupTypeBuilder = Types.buildGroup(repetition);
+ if (VariantMetadataUtils.isVariantRowType(rowType)) {
+ groupTypeBuilder.as(
+ LogicalTypeAnnotation.variantType(Variant.VARIANT_SPEC_VERSION));
+ }
+ return groupTypeBuilder
+ .addFields(convertToParquetTypes(rowType))
+ .named(name)
.withId(fieldId);
case VARIANT:
return Types.buildGroup(repetition)
+ .as(LogicalTypeAnnotation.variantType(Variant.VARIANT_SPEC_VERSION))
.addField(
Types.primitive(
PrimitiveType.PrimitiveTypeName.BINARY,
Type.Repetition.REQUIRED)
- .named(Variant.VALUE))
+ .named(Variant.VALUE)
+ .withId(0))
.addField(
Types.primitive(
PrimitiveType.PrimitiveTypeName.BINARY,
Type.Repetition.REQUIRED)
- .named(Variant.METADATA))
+ .named(Variant.METADATA)
+ .withId(1))
.named(name)
.withId(fieldId);
default:
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java
index b523c3754884..7fec5aee9fc4 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java
@@ -49,8 +49,8 @@ public static RowType variantFileType(Type fileType) {
return (RowType) ParquetSchemaConverter.convertToPaimonField(fileType).type();
} else {
List dataFields = new ArrayList<>();
- dataFields.add(new DataField(0, VALUE, DataTypes.BYTES()));
- dataFields.add(new DataField(1, METADATA, DataTypes.BYTES()));
+ dataFields.add(new DataField(0, VALUE, DataTypes.BYTES().notNull()));
+ dataFields.add(new DataField(1, METADATA, DataTypes.BYTES().notNull()));
return new RowType(dataFields);
}
}
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/writer/InferVariantShreddingWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/writer/InferVariantShreddingWriteTest.java
index f07c70784aec..52a1618c02ce 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/writer/InferVariantShreddingWriteTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/writer/InferVariantShreddingWriteTest.java
@@ -45,10 +45,14 @@
import org.apache.paimon.types.RowType;
import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Type;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.util.ArrayList;
@@ -545,9 +549,43 @@ protected void verifyShreddingSchema(RowType... expectShreddedTypes) throws IOEx
fileIO, file, fileIO.getFileSize(file), new Options())) {
MessageType schema = reader.getFooter().getFileMetaData().getSchema();
for (int i = 0; i < expectShreddedTypes.length; i++) {
- assertThat(VariantUtils.variantFileType(schema.getType(i)))
+ assertThat(
+ VariantMetadataUtils.addVariantMetadata(
+ VariantUtils.variantFileType(schema.getType(i))))
.isEqualTo(variantShreddingSchema(expectShreddedTypes[i]));
}
}
}
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testVariantTypeAnnotation(boolean inferShredding) throws Exception {
+ Options options = new Options();
+ options.set(
+ CoreOptions.VARIANT_INFER_SHREDDING_SCHEMA.key(), String.valueOf(inferShredding));
+ ParquetFileFormat format = createFormat(options);
+ RowType writeType = DataTypes.ROW(DataTypes.FIELD(0, "v", DataTypes.VARIANT()));
+
+ FormatWriterFactory factory = format.createWriterFactory(writeType);
+ writeRows(
+ factory,
+ GenericRow.of(GenericVariant.fromJson("{\"name\":\"Alice\"}")),
+ GenericRow.of(GenericVariant.fromJson("{\"name\":\"Bob\"}")));
+
+ // Verify that the Parquet schema contains LogicalTypeAnnotation.variantType
+ try (ParquetFileReader reader =
+ ParquetUtil.getParquetReader(
+ fileIO, file, fileIO.getFileSize(file), new Options())) {
+ MessageType schema = reader.getFooter().getFileMetaData().getSchema();
+ Type variantField = schema.getType(0);
+
+ // The variant field should be a group type
+ assertThat(variantField.isPrimitive()).isFalse();
+ LogicalTypeAnnotation logicalType = variantField.getLogicalTypeAnnotation();
+
+ // The variant type should have variant annotation
+ assertThat(logicalType)
+ .isInstanceOf(LogicalTypeAnnotation.VariantLogicalTypeAnnotation.class);
+ }
+ }
}