From 6f871d77dbdfbdd304ad2b17309044019d87b2fa Mon Sep 17 00:00:00 2001 From: forwardxu Date: Sat, 28 Mar 2026 21:44:09 +0800 Subject: [PATCH] [lake/lance] Add Map type support for Lance --- .../lance/tiering/ShadedArrowBatchWriter.java | 8 + .../lake/lance/utils/ArrowDataConverter.java | 22 +++ .../lake/lance/utils/LanceArrowUtils.java | 18 ++ .../testutils/FlinkLanceTieringTestBase.java | 17 ++ .../lance/tiering/LanceTieringITCase.java | 66 ++++++++ .../lake/lance/tiering/LanceTieringTest.java | 158 ++++++++++++++++++ .../lake/lance/utils/LanceArrowUtilsTest.java | 109 ++++++++++++ 7 files changed, 398 insertions(+) diff --git a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/ShadedArrowBatchWriter.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/ShadedArrowBatchWriter.java index 5ab4ca5470..9a2c936040 100644 --- a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/ShadedArrowBatchWriter.java +++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/ShadedArrowBatchWriter.java @@ -25,6 +25,7 @@ import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector; import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorSchemaRoot; import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector; +import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.MapVector; import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.StructVector; import org.apache.fluss.types.RowType; import org.apache.fluss.utils.ArrowUtils; @@ -108,6 +109,13 @@ private void initFieldVector(FieldVector fieldVector) { if (dataVector != null) { initFieldVector(dataVector); } + } else if (fieldVector instanceof MapVector) { + MapVector mapVector = (MapVector) fieldVector; + mapVector.allocateNew(); + FieldVector dataVector = mapVector.getDataVector(); + if (dataVector != null) { + initFieldVector(dataVector); + } } else if (fieldVector instanceof StructVector) { StructVector structVector = (StructVector) fieldVector; structVector.allocateNew(); diff --git a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java index 2c1e7dcd65..d9104e0978 100644 --- a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java +++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/ArrowDataConverter.java @@ -77,6 +77,28 @@ private static void copyVectorData( org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector shadedVector, FieldVector nonShadedVector) { + // Check MapVector before ListVector since MapVector extends ListVector. + // In Lance, Map is represented as List>, so the non-shaded + // side is a ListVector. + if (shadedVector + instanceof + org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.MapVector) { + if (!(nonShadedVector instanceof ListVector)) { + throw new IllegalArgumentException( + String.format( + "Shaded vector is MapVector but non-shaded vector is %s, expected ListVector.", + nonShadedVector.getClass().getSimpleName())); + } + // MapVector extends ListVector, so we can reuse the ListVector copy logic. + // The memory layout is compatible: MapVector -> StructVector(entries) -> [key, value] + // maps to ListVector -> StructVector(element) -> [key, value] + copyListVectorData( + (org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.MapVector) + shadedVector, + (ListVector) nonShadedVector); + return; + } + if (shadedVector instanceof org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector) { diff --git a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java index e0d8a0b3ab..ee02e80ebc 100644 --- a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java +++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/utils/LanceArrowUtils.java @@ -31,6 +31,7 @@ import org.apache.fluss.types.FloatType; import org.apache.fluss.types.IntType; import org.apache.fluss.types.LocalZonedTimestampType; +import org.apache.fluss.types.MapType; import org.apache.fluss.types.RowType; import org.apache.fluss.types.SmallIntType; import org.apache.fluss.types.StringType; @@ -133,6 +134,19 @@ private static Field toArrowField( for (DataField field : rowType.getFields()) { children.add(toArrowField(field.getName(), field.getType(), tableProperties)); } + } else if (logicalType instanceof MapType) { + // Lance 0.33.0 does not support Arrow Map type natively. + // We convert MapType to List> which is the equivalent + // representation that Lance supports. + MapType mapType = (MapType) logicalType; + Field keyField = toArrowField("key", mapType.getKeyType(), tableProperties); + Field valueField = toArrowField("value", mapType.getValueType(), tableProperties); + FieldType structFieldType = new FieldType(false, ArrowType.Struct.INSTANCE, null); + List structChildren = new ArrayList<>(); + structChildren.add(keyField); + structChildren.add(valueField); + Field structField = new Field("element", structFieldType, structChildren); + children = Collections.singletonList(structField); } return new Field(fieldName, fieldType, children); } @@ -202,6 +216,10 @@ private static ArrowType toArrowType(DataType dataType) { return ArrowType.List.INSTANCE; } else if (dataType instanceof RowType) { return ArrowType.Struct.INSTANCE; + } else if (dataType instanceof MapType) { + // Lance 0.33.0 does not support Arrow Map type natively. + // We use List type as the Arrow representation for Map. + return ArrowType.List.INSTANCE; } else { throw new UnsupportedOperationException( String.format( diff --git a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java index 08c3ef2235..38bc34a3aa 100644 --- a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java +++ b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/testutils/FlinkLanceTieringTestBase.java @@ -265,6 +265,23 @@ protected long createLogTableWithArrayOfRowType(TablePath tablePath) throws Exce return createTable(tablePath, tableBuilder.build()); } + protected long createLogTableWithMapType(TablePath tablePath) throws Exception { + Schema.Builder schemaBuilder = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .column("attributes", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT())); + + TableDescriptor.Builder tableBuilder = + TableDescriptor.builder() + .distributedBy(1, "id") + .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true") + .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500)); + + tableBuilder.schema(schemaBuilder.build()); + return createTable(tablePath, tableBuilder.build()); + } + protected void writeRows(TablePath tablePath, List rows, boolean append) throws Exception { try (Table table = conn.getTable(tablePath)) { diff --git a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringITCase.java b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringITCase.java index 29aaef965a..501f7090d4 100644 --- a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringITCase.java +++ b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringITCase.java @@ -25,6 +25,7 @@ import org.apache.fluss.metadata.TablePath; import org.apache.fluss.row.BinaryString; import org.apache.fluss.row.GenericArray; +import org.apache.fluss.row.GenericMap; import org.apache.fluss.row.GenericRow; import org.apache.fluss.server.zk.data.lake.LakeTable; @@ -414,4 +415,69 @@ private String buildExpectedTsvForArrayOfRowTable(int rowCount) { } return sb.toString(); } + + @Test + void testTieringWithMapType() throws Exception { + // Test: Log table with Map type + TablePath t1 = TablePath.of(DEFAULT_DB, "logTableWithMap"); + long t1Id = createLogTableWithMapType(t1); + TableBucket t1Bucket = new TableBucket(t1Id, 0); + + // Create map data + for (int i = 0; i < 10; i++) { + java.util.LinkedHashMap map1 = new java.util.LinkedHashMap<>(); + map1.put(BinaryString.fromString("age"), 25); + map1.put(BinaryString.fromString("score"), 100); + + java.util.LinkedHashMap map2 = new java.util.LinkedHashMap<>(); + map2.put(BinaryString.fromString("age"), 30); + map2.put(BinaryString.fromString("score"), 95); + + java.util.LinkedHashMap map3 = new java.util.LinkedHashMap<>(); + map3.put(BinaryString.fromString("age"), 35); + map3.put(BinaryString.fromString("score"), 88); + + writeRows( + t1, + Arrays.asList( + row(1, "Alice", new GenericMap(map1)), + row(2, "Bob", new GenericMap(map2)), + row(3, "Charlie", new GenericMap(map3))), + true); + } + + // then start tiering job + JobClient jobClient = buildTieringJob(execEnv); + + // check the status of replica after synced + assertReplicaStatus(t1Bucket, 30); + + LanceConfig config1 = + LanceConfig.from( + lanceConf.toMap(), + Collections.emptyMap(), + t1.getDatabaseName(), + t1.getTableName()); + + // check data in lance using TSV string comparison + String expectedTsv1 = buildExpectedTsvForMapTable(30); + checkDataInLance(config1, expectedTsv1); + checkSnapshotPropertyInLance(config1, Collections.singletonMap(t1Bucket, 30L)); + + jobClient.cancel().get(); + } + + private String buildExpectedTsvForMapTable(int rowCount) { + StringBuilder sb = new StringBuilder(); + sb.append("id\tname\tattributes\n"); + for (int i = 0; i < rowCount / 3; i++) { + sb.append( + "1\tAlice\t[{\"key\":\"score\",\"value\":100},{\"key\":\"age\",\"value\":25}]\n"); + sb.append( + "2\tBob\t[{\"key\":\"score\",\"value\":95},{\"key\":\"age\",\"value\":30}]\n"); + sb.append( + "3\tCharlie\t[{\"key\":\"score\",\"value\":88},{\"key\":\"age\",\"value\":35}]\n"); + } + return sb.toString(); + } } diff --git a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java index 64dae36962..7919c9b183 100644 --- a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java +++ b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/tiering/LanceTieringTest.java @@ -39,6 +39,7 @@ import org.apache.fluss.record.LogRecord; import org.apache.fluss.row.BinaryString; import org.apache.fluss.row.GenericArray; +import org.apache.fluss.row.GenericMap; import org.apache.fluss.row.GenericRow; import org.apache.fluss.types.DataTypes; import org.apache.fluss.utils.types.Tuple2; @@ -520,4 +521,161 @@ private void verifyNestedRowRecords(VectorSchemaRoot root, List expec assertThat(root.getVector(2)).isNotNull(); } } + + @ParameterizedTest + @MethodSource("tieringWriteArgs") + void testTieringWriteTableWithMapType(boolean isPartitioned) throws Exception { + int bucketNum = 3; + TablePath tablePath = TablePath.of("lance", "mapTable"); + Map customProperties = new HashMap<>(); + customProperties.put("lance.batch_size", "256"); + LanceConfig config = + LanceConfig.from( + configuration.toMap(), + customProperties, + tablePath.getDatabaseName(), + tablePath.getTableName()); + Schema schema = createMapTable(config); + + TableDescriptor descriptor = + TableDescriptor.builder() + .schema(schema) + .distributedBy(bucketNum) + .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true) + .customProperties(customProperties) + .build(); + TableInfo tableInfo = + TableInfo.of(tablePath, 0, 1, descriptor, DEFAULT_REMOTE_DATA_DIR, 1L, 1L); + + List lanceWriteResults = new ArrayList<>(); + SimpleVersionedSerializer writeResultSerializer = + lanceLakeTieringFactory.getWriteResultSerializer(); + SimpleVersionedSerializer committableSerializer = + lanceLakeTieringFactory.getCommittableSerializer(); + + Map, List> recordsByBucket = new HashMap<>(); + Map partitionIdAndName = + isPartitioned + ? new HashMap() { + { + put(1L, "p1"); + put(2L, "p2"); + put(3L, "p3"); + } + } + : Collections.singletonMap(null, null); + + // First, write data with map types + for (int bucket = 0; bucket < bucketNum; bucket++) { + for (Map.Entry entry : partitionIdAndName.entrySet()) { + String partition = entry.getValue(); + try (LakeWriter lakeWriter = + createLakeWriter(tablePath, bucket, partition, tableInfo)) { + Tuple2 partitionBucket = Tuple2.of(partition, bucket); + Tuple2, List> writeAndExpectRecords = + genMapLogRecords(bucket, 10); + List writtenRecords = writeAndExpectRecords.f0; + List expectRecords = writeAndExpectRecords.f1; + recordsByBucket.put(partitionBucket, expectRecords); + for (LogRecord logRecord : writtenRecords) { + lakeWriter.write(logRecord); + } + // serialize/deserialize writeResult + LanceWriteResult lanceWriteResult = lakeWriter.complete(); + byte[] serialized = writeResultSerializer.serialize(lanceWriteResult); + lanceWriteResults.add( + writeResultSerializer.deserialize( + writeResultSerializer.getVersion(), serialized)); + } + } + } + + // Second, commit data + try (LakeCommitter lakeCommitter = + createLakeCommitter(tablePath, tableInfo)) { + // serialize/deserialize committable + LanceCommittable lanceCommittable = lakeCommitter.toCommittable(lanceWriteResults); + byte[] serialized = committableSerializer.serialize(lanceCommittable); + lanceCommittable = + committableSerializer.deserialize( + committableSerializer.getVersion(), serialized); + Map snapshotProperties = + Collections.singletonMap(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY, "offsets"); + LakeCommitResult commitResult = + lakeCommitter.commit(lanceCommittable, snapshotProperties); + // lance dataset version starts from 1 + assertThat(commitResult.getCommittedSnapshotId()).isEqualTo(2); + } + + try (Dataset dataset = + Dataset.open( + new RootAllocator(), + config.getDatasetUri(), + LanceConfig.genReadOptionFromConfig(config))) { + ArrowReader reader = dataset.newScan().scanBatches(); + VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot(); + + // Verify data can be read back + for (int bucket = 0; bucket < 3; bucket++) { + for (String partition : partitionIdAndName.values()) { + reader.loadNextBatch(); + Tuple2 partitionBucket = Tuple2.of(partition, bucket); + List expectRecords = recordsByBucket.get(partitionBucket); + verifyMapRecords(readerRoot, expectRecords); + } + } + assertThat(reader.loadNextBatch()).isFalse(); + } + } + + private Schema createMapTable(LanceConfig config) { + Schema.Builder schemaBuilder = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column("name", DataTypes.STRING()) + .column("attributes", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT())); + Schema schema = schemaBuilder.build(); + WriteParams params = LanceConfig.genWriteParamsFromConfig(config); + LanceDatasetAdapter.createDataset( + config.getDatasetUri(), LanceArrowUtils.toArrowSchema(schema.getRowType()), params); + + return schema; + } + + private Tuple2, List> genMapLogRecords(int bucket, int numRecords) { + List logRecords = new ArrayList<>(); + for (int i = 0; i < numRecords; i++) { + GenericRow genericRow = new GenericRow(3); + genericRow.setField(0, i); + genericRow.setField(1, BinaryString.fromString("user" + bucket + "_" + i)); + + // Create map data + java.util.LinkedHashMap mapData = + new java.util.LinkedHashMap<>(); + mapData.put(BinaryString.fromString("age"), 20 + bucket); + mapData.put(BinaryString.fromString("score"), 100 + i); + genericRow.setField(2, new GenericMap(mapData)); + + LogRecord logRecord = + new GenericRecord( + i, System.currentTimeMillis(), ChangeType.APPEND_ONLY, genericRow); + logRecords.add(logRecord); + } + return Tuple2.of(logRecords, logRecords); + } + + private void verifyMapRecords(VectorSchemaRoot root, List expectRecords) { + assertThat(root.getRowCount()).isEqualTo(expectRecords.size()); + for (int i = 0; i < expectRecords.size(); i++) { + LogRecord expectRecord = expectRecords.get(i); + // check id column + assertThat((int) (root.getVector(0).getObject(i))) + .isEqualTo(expectRecord.getRow().getInt(0)); + // check name column + assertThat(((VarCharVector) root.getVector(1)).getObject(i).toString()) + .isEqualTo(expectRecord.getRow().getString(1).toString()); + // For map, verify that the map vector is not null and has correct structure + assertThat(root.getVector(2)).isNotNull(); + } + } } diff --git a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/utils/LanceArrowUtilsTest.java b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/utils/LanceArrowUtilsTest.java index ef224ca7ab..d96df488e7 100644 --- a/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/utils/LanceArrowUtilsTest.java +++ b/fluss-lake/fluss-lake-lance/src/test/java/org/apache/fluss/lake/lance/utils/LanceArrowUtilsTest.java @@ -298,4 +298,113 @@ void testToArrowSchemaWithRowContainingArray() { assertThat(tagsChildren.get(0).getName()).isEqualTo("element"); assertThat(tagsChildren.get(0).getType()).isEqualTo(ArrowType.Utf8.INSTANCE); } + + @Test + void testToArrowSchemaWithMapType() { + // Create a RowType with a Map field + RowType rowType = + DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.INT()), + DataTypes.FIELD( + "attributes", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))); + + Schema arrowSchema = LanceArrowUtils.toArrowSchema(rowType); + + // Verify the schema has 2 fields + assertThat(arrowSchema.getFields()).hasSize(2); + + // Verify the id field + Field idField = arrowSchema.getFields().get(0); + assertThat(idField.getName()).isEqualTo("id"); + assertThat(idField.getType()).isInstanceOf(ArrowType.Int.class); + + // Lance does not support Arrow Map type, Map is converted to List> + Field mapField = arrowSchema.getFields().get(1); + assertThat(mapField.getName()).isEqualTo("attributes"); + assertThat(mapField.getType()).isInstanceOf(ArrowType.List.class); + + // Verify the element struct children + List mapChildren = mapField.getChildren(); + assertThat(mapChildren).hasSize(1); + + Field elementField = mapChildren.get(0); + assertThat(elementField.getName()).isEqualTo("element"); + assertThat(elementField.getType()).isEqualTo(ArrowType.Struct.INSTANCE); + + // Verify the key and value fields + List entryChildren = elementField.getChildren(); + assertThat(entryChildren).hasSize(2); + + Field keyField = entryChildren.get(0); + assertThat(keyField.getName()).isEqualTo("key"); + assertThat(keyField.getType()).isEqualTo(ArrowType.Utf8.INSTANCE); + + Field valueField = entryChildren.get(1); + assertThat(valueField.getName()).isEqualTo("value"); + assertThat(valueField.getType()).isInstanceOf(ArrowType.Int.class); + } + + @Test + void testToArrowSchemaWithMapOfComplexValueType() { + // Create a Map field with Row as the value type + RowType rowType = + DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.INT()), + DataTypes.FIELD( + "contacts", + DataTypes.MAP( + DataTypes.STRING(), + DataTypes.ROW( + DataTypes.FIELD("phone", DataTypes.STRING()), + DataTypes.FIELD("email", DataTypes.STRING()))))); + + Schema arrowSchema = LanceArrowUtils.toArrowSchema(rowType); + + // In Lance, Map is represented as List> + Field mapField = arrowSchema.getFields().get(1); + assertThat(mapField.getName()).isEqualTo("contacts"); + assertThat(mapField.getType()).isInstanceOf(ArrowType.List.class); + + // Verify the element struct + Field elementField = mapField.getChildren().get(0); + assertThat(elementField.getName()).isEqualTo("element"); + assertThat(elementField.getChildren()).hasSize(2); + + // Verify the value is a struct type + Field valueField = elementField.getChildren().get(1); + assertThat(valueField.getName()).isEqualTo("value"); + assertThat(valueField.getType()).isEqualTo(ArrowType.Struct.INSTANCE); + assertThat(valueField.getChildren()).hasSize(2); + } + + @Test + void testToArrowSchemaWithRowContainingMap() { + // Create a RowType with a nested Row containing a Map + RowType rowType = + DataTypes.ROW( + DataTypes.FIELD("id", DataTypes.INT()), + DataTypes.FIELD( + "data", + DataTypes.ROW( + DataTypes.FIELD( + "props", + DataTypes.MAP( + DataTypes.STRING(), DataTypes.STRING())), + DataTypes.FIELD("count", DataTypes.INT())))); + + Schema arrowSchema = LanceArrowUtils.toArrowSchema(rowType); + + // Verify the data struct field + Field dataField = arrowSchema.getFields().get(1); + assertThat(dataField.getName()).isEqualTo("data"); + assertThat(dataField.getType()).isEqualTo(ArrowType.Struct.INSTANCE); + + // Verify the props map field within the struct (represented as List in Lance) + List dataChildren = dataField.getChildren(); + assertThat(dataChildren).hasSize(2); + + Field propsField = dataChildren.get(0); + assertThat(propsField.getName()).isEqualTo("props"); + assertThat(propsField.getType()).isInstanceOf(ArrowType.List.class); + } }