From 8e7488d20fef3479a46a72033a3a6020580bd13f Mon Sep 17 00:00:00 2001 From: zouxxyy Date: Thu, 22 Jan 2026 22:41:03 +0800 Subject: [PATCH] [variant] Annotate Variant columns with Variant logical type annotation --- docs/content/spark/quick-start.md | 5 +++ .../data/variant/PaimonShreddingUtils.java | 3 +- .../apache/paimon/data/variant/Variant.java | 2 + .../data/variant/VariantMetadataUtils.java | 9 +++++ paimon-format/pom.xml | 1 - .../parquet/ParquetSchemaConverter.java | 17 ++++++-- .../paimon/format/parquet/VariantUtils.java | 4 +- .../InferVariantShreddingWriteTest.java | 40 ++++++++++++++++++- 8 files changed, 73 insertions(+), 8 deletions(-) diff --git a/docs/content/spark/quick-start.md b/docs/content/spark/quick-start.md index 7a4341a4a0e3..79268feab191 100644 --- a/docs/content/spark/quick-start.md +++ b/docs/content/spark/quick-start.md @@ -358,6 +358,11 @@ All Spark's data types are available in package `org.apache.spark.sql.types`. VarBinaryType, BinaryType true + + VariantType(Spark4.0+) + VariantType + true + diff --git a/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java b/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java index dd4b48a17e1a..cd7884bc64fc 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java @@ -211,7 +211,8 @@ public BaseVariantReader reader() { } public static RowType variantShreddingSchema(DataType dataType) { - return variantShreddingSchema(dataType, true, false); + return VariantMetadataUtils.addVariantMetadata( + variantShreddingSchema(dataType, true, false)); } /** diff --git a/paimon-common/src/main/java/org/apache/paimon/data/variant/Variant.java b/paimon-common/src/main/java/org/apache/paimon/data/variant/Variant.java index 5bc19972f90b..ad44bddeb670 100644 --- 
a/paimon-common/src/main/java/org/apache/paimon/data/variant/Variant.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/variant/Variant.java @@ -37,6 +37,8 @@ */ public interface Variant { + byte VARIANT_SPEC_VERSION = (byte) 1; + String METADATA = "metadata"; String VALUE = "value"; diff --git a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantMetadataUtils.java b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantMetadataUtils.java index 735ea08af3f4..a0a6208035b0 100644 --- a/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantMetadataUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/data/variant/VariantMetadataUtils.java @@ -67,6 +67,15 @@ public static boolean isVariantRowType(DataType dataType) { return true; } + /** Add metadata to the top-level fields to mark it as a shredding schema for writers. */ + public static RowType addVariantMetadata(RowType rowType) { + List fields = new ArrayList<>(); + for (DataField f : rowType.getFields()) { + fields.add((f.newDescription(METADATA_KEY))); + } + return rowType.copy(fields); + } + /** Extract the path from variant metadata description. */ public static String path(String description) { return splitDescription(description)[0]; diff --git a/paimon-format/pom.xml b/paimon-format/pom.xml index 4f7f11965abd..75a49da92493 100644 --- a/paimon-format/pom.xml +++ b/paimon-format/pom.xml @@ -351,7 +351,6 @@ under the License. 
org.apache.parquet:parquet-format-structures org.apache.parquet:parquet-jackson commons-pool:commons-pool - commons-pool:commons-pool org.locationtech.jts:jts-core diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java index 19d1beb678fe..bfb6297fae3f 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java @@ -19,6 +19,7 @@ package org.apache.paimon.format.parquet; import org.apache.paimon.data.variant.Variant; +import org.apache.paimon.data.variant.VariantMetadataUtils; import org.apache.paimon.table.SpecialFields; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataField; @@ -213,20 +214,30 @@ public static Type convertToParquetType(String name, DataType type, int fieldId, .withId(fieldId); case ROW: RowType rowType = (RowType) type; - return new GroupType(repetition, name, convertToParquetTypes(rowType)) + Types.GroupBuilder groupTypeBuilder = Types.buildGroup(repetition); + if (VariantMetadataUtils.isVariantRowType(rowType)) { + groupTypeBuilder.as( + LogicalTypeAnnotation.variantType(Variant.VARIANT_SPEC_VERSION)); + } + return groupTypeBuilder + .addFields(convertToParquetTypes(rowType)) + .named(name) .withId(fieldId); case VARIANT: return Types.buildGroup(repetition) + .as(LogicalTypeAnnotation.variantType(Variant.VARIANT_SPEC_VERSION)) .addField( Types.primitive( PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.REQUIRED) - .named(Variant.VALUE)) + .named(Variant.VALUE) + .withId(0)) .addField( Types.primitive( PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.REQUIRED) - .named(Variant.METADATA)) + .named(Variant.METADATA) + .withId(1)) .named(name) .withId(fieldId); default: diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java index b523c3754884..7fec5aee9fc4 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java @@ -49,8 +49,8 @@ public static RowType variantFileType(Type fileType) { return (RowType) ParquetSchemaConverter.convertToPaimonField(fileType).type(); } else { List dataFields = new ArrayList<>(); - dataFields.add(new DataField(0, VALUE, DataTypes.BYTES())); - dataFields.add(new DataField(1, METADATA, DataTypes.BYTES())); + dataFields.add(new DataField(0, VALUE, DataTypes.BYTES().notNull())); + dataFields.add(new DataField(1, METADATA, DataTypes.BYTES().notNull())); return new RowType(dataFields); } } diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/writer/InferVariantShreddingWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/writer/InferVariantShreddingWriteTest.java index f07c70784aec..52a1618c02ce 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/writer/InferVariantShreddingWriteTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/writer/InferVariantShreddingWriteTest.java @@ -45,10 +45,14 @@ import org.apache.paimon.types.RowType; import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.util.ArrayList; @@ -545,9 +549,43 @@ protected void verifyShreddingSchema(RowType... 
expectShreddedTypes) throws IOEx fileIO, file, fileIO.getFileSize(file), new Options())) { MessageType schema = reader.getFooter().getFileMetaData().getSchema(); for (int i = 0; i < expectShreddedTypes.length; i++) { - assertThat(VariantUtils.variantFileType(schema.getType(i))) + assertThat( + VariantMetadataUtils.addVariantMetadata( + VariantUtils.variantFileType(schema.getType(i)))) .isEqualTo(variantShreddingSchema(expectShreddedTypes[i])); } } } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testVariantTypeAnnotation(boolean inferShredding) throws Exception { + Options options = new Options(); + options.set( + CoreOptions.VARIANT_INFER_SHREDDING_SCHEMA.key(), String.valueOf(inferShredding)); + ParquetFileFormat format = createFormat(options); + RowType writeType = DataTypes.ROW(DataTypes.FIELD(0, "v", DataTypes.VARIANT())); + + FormatWriterFactory factory = format.createWriterFactory(writeType); + writeRows( + factory, + GenericRow.of(GenericVariant.fromJson("{\"name\":\"Alice\"}")), + GenericRow.of(GenericVariant.fromJson("{\"name\":\"Bob\"}"))); + + // Verify that the Parquet schema contains LogicalTypeAnnotation.variantType + try (ParquetFileReader reader = + ParquetUtil.getParquetReader( + fileIO, file, fileIO.getFileSize(file), new Options())) { + MessageType schema = reader.getFooter().getFileMetaData().getSchema(); + Type variantField = schema.getType(0); + + // The variant field should be a group type + assertThat(variantField.isPrimitive()).isFalse(); + LogicalTypeAnnotation logicalType = variantField.getLogicalTypeAnnotation(); + + // The variant type should have variant annotation + assertThat(logicalType) + .isInstanceOf(LogicalTypeAnnotation.VariantLogicalTypeAnnotation.class); + } + } }