Skip to content

Commit 623d30d

Browse files
authored
[variant] Annotate Variant columns with Variant logical type annotation (#7110)
1 parent 8e5378a commit 623d30d

8 files changed

Lines changed: 73 additions & 8 deletions

File tree

docs/content/spark/quick-start.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,11 @@ All Spark's data types are available in package `org.apache.spark.sql.types`.
358358
<td><code>VarBinaryType</code>, <code>BinaryType</code></td>
359359
<td>true</td>
360360
</tr>
361+
<tr>
362+
<td><code>VariantType(Spark4.0+)</code></td>
363+
<td><code>VariantType</code></td>
364+
<td>true</td>
365+
</tr>
361366
</tbody>
362367
</table>
363368

paimon-common/src/main/java/org/apache/paimon/data/variant/PaimonShreddingUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,8 @@ public BaseVariantReader reader() {
211211
}
212212

213213
public static RowType variantShreddingSchema(DataType dataType) {
214-
return variantShreddingSchema(dataType, true, false);
214+
return VariantMetadataUtils.addVariantMetadata(
215+
variantShreddingSchema(dataType, true, false));
215216
}
216217

217218
/**

paimon-common/src/main/java/org/apache/paimon/data/variant/Variant.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
*/
3838
public interface Variant {
3939

40+
byte VARIANT_SPEC_VERSION = (byte) 1;
41+
4042
String METADATA = "metadata";
4143

4244
String VALUE = "value";

paimon-common/src/main/java/org/apache/paimon/data/variant/VariantMetadataUtils.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,15 @@ public static boolean isVariantRowType(DataType dataType) {
6767
return true;
6868
}
6969

70+
/** Add metadata to the top-level fields to mark it s a shredding schema for writers. */
71+
public static RowType addVariantMetadata(RowType rowType) {
72+
List<DataField> fields = new ArrayList<>();
73+
for (DataField f : rowType.getFields()) {
74+
fields.add((f.newDescription(METADATA_KEY)));
75+
}
76+
return rowType.copy(fields);
77+
}
78+
7079
/** Extract the path from variant metadata description. */
7180
public static String path(String description) {
7281
return splitDescription(description)[0];

paimon-format/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,6 @@ under the License.
351351
<include>org.apache.parquet:parquet-format-structures</include>
352352
<include>org.apache.parquet:parquet-jackson</include>
353353
<include>commons-pool:commons-pool</include>
354-
<include>commons-pool:commons-pool</include>
355354
<include>org.locationtech.jts:jts-core</include>
356355

357356
<!-- compress -->

paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.paimon.format.parquet;
2020

2121
import org.apache.paimon.data.variant.Variant;
22+
import org.apache.paimon.data.variant.VariantMetadataUtils;
2223
import org.apache.paimon.table.SpecialFields;
2324
import org.apache.paimon.types.ArrayType;
2425
import org.apache.paimon.types.DataField;
@@ -213,20 +214,30 @@ public static Type convertToParquetType(String name, DataType type, int fieldId,
213214
.withId(fieldId);
214215
case ROW:
215216
RowType rowType = (RowType) type;
216-
return new GroupType(repetition, name, convertToParquetTypes(rowType))
217+
Types.GroupBuilder<GroupType> groupTypeBuilder = Types.buildGroup(repetition);
218+
if (VariantMetadataUtils.isVariantRowType(rowType)) {
219+
groupTypeBuilder.as(
220+
LogicalTypeAnnotation.variantType(Variant.VARIANT_SPEC_VERSION));
221+
}
222+
return groupTypeBuilder
223+
.addFields(convertToParquetTypes(rowType))
224+
.named(name)
217225
.withId(fieldId);
218226
case VARIANT:
219227
return Types.buildGroup(repetition)
228+
.as(LogicalTypeAnnotation.variantType(Variant.VARIANT_SPEC_VERSION))
220229
.addField(
221230
Types.primitive(
222231
PrimitiveType.PrimitiveTypeName.BINARY,
223232
Type.Repetition.REQUIRED)
224-
.named(Variant.VALUE))
233+
.named(Variant.VALUE)
234+
.withId(0))
225235
.addField(
226236
Types.primitive(
227237
PrimitiveType.PrimitiveTypeName.BINARY,
228238
Type.Repetition.REQUIRED)
229-
.named(Variant.METADATA))
239+
.named(Variant.METADATA)
240+
.withId(1))
230241
.named(name)
231242
.withId(fieldId);
232243
default:

paimon-format/src/main/java/org/apache/paimon/format/parquet/VariantUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ public static RowType variantFileType(Type fileType) {
4949
return (RowType) ParquetSchemaConverter.convertToPaimonField(fileType).type();
5050
} else {
5151
List<DataField> dataFields = new ArrayList<>();
52-
dataFields.add(new DataField(0, VALUE, DataTypes.BYTES()));
53-
dataFields.add(new DataField(1, METADATA, DataTypes.BYTES()));
52+
dataFields.add(new DataField(0, VALUE, DataTypes.BYTES().notNull()));
53+
dataFields.add(new DataField(1, METADATA, DataTypes.BYTES().notNull()));
5454
return new RowType(dataFields);
5555
}
5656
}

paimon-format/src/test/java/org/apache/paimon/format/parquet/writer/InferVariantShreddingWriteTest.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,14 @@
4545
import org.apache.paimon.types.RowType;
4646

4747
import org.apache.parquet.hadoop.ParquetFileReader;
48+
import org.apache.parquet.schema.LogicalTypeAnnotation;
4849
import org.apache.parquet.schema.MessageType;
50+
import org.apache.parquet.schema.Type;
4951
import org.junit.jupiter.api.BeforeEach;
5052
import org.junit.jupiter.api.Test;
5153
import org.junit.jupiter.api.io.TempDir;
54+
import org.junit.jupiter.params.ParameterizedTest;
55+
import org.junit.jupiter.params.provider.ValueSource;
5256

5357
import java.io.IOException;
5458
import java.util.ArrayList;
@@ -545,9 +549,43 @@ protected void verifyShreddingSchema(RowType... expectShreddedTypes) throws IOEx
545549
fileIO, file, fileIO.getFileSize(file), new Options())) {
546550
MessageType schema = reader.getFooter().getFileMetaData().getSchema();
547551
for (int i = 0; i < expectShreddedTypes.length; i++) {
548-
assertThat(VariantUtils.variantFileType(schema.getType(i)))
552+
assertThat(
553+
VariantMetadataUtils.addVariantMetadata(
554+
VariantUtils.variantFileType(schema.getType(i))))
549555
.isEqualTo(variantShreddingSchema(expectShreddedTypes[i]));
550556
}
551557
}
552558
}
559+
560+
@ParameterizedTest
561+
@ValueSource(booleans = {false, true})
562+
public void testVariantTypeAnnotation(boolean inferShredding) throws Exception {
563+
Options options = new Options();
564+
options.set(
565+
CoreOptions.VARIANT_INFER_SHREDDING_SCHEMA.key(), String.valueOf(inferShredding));
566+
ParquetFileFormat format = createFormat(options);
567+
RowType writeType = DataTypes.ROW(DataTypes.FIELD(0, "v", DataTypes.VARIANT()));
568+
569+
FormatWriterFactory factory = format.createWriterFactory(writeType);
570+
writeRows(
571+
factory,
572+
GenericRow.of(GenericVariant.fromJson("{\"name\":\"Alice\"}")),
573+
GenericRow.of(GenericVariant.fromJson("{\"name\":\"Bob\"}")));
574+
575+
// Verify that the Parquet schema contains LogicalTypeAnnotation.variantType
576+
try (ParquetFileReader reader =
577+
ParquetUtil.getParquetReader(
578+
fileIO, file, fileIO.getFileSize(file), new Options())) {
579+
MessageType schema = reader.getFooter().getFileMetaData().getSchema();
580+
Type variantField = schema.getType(0);
581+
582+
// The variant field should be a group type
583+
assertThat(variantField.isPrimitive()).isFalse();
584+
LogicalTypeAnnotation logicalType = variantField.getLogicalTypeAnnotation();
585+
586+
// The variant type should have variant annotation
587+
assertThat(logicalType)
588+
.isInstanceOf(LogicalTypeAnnotation.VariantLogicalTypeAnnotation.class);
589+
}
590+
}
553591
}

0 commit comments

Comments
 (0)