@@ -25,6 +25,7 @@
import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector;
import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector;
import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.MapVector;
import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.StructVector;
import org.apache.fluss.types.RowType;
import org.apache.fluss.utils.ArrowUtils;
@@ -108,6 +109,13 @@ private void initFieldVector(FieldVector fieldVector) {
            if (dataVector != null) {
                initFieldVector(dataVector);
            }
        } else if (fieldVector instanceof MapVector) {
            // MapVector extends ListVector; its data vector is the entries
            // StructVector, so the existing recursion below also allocates the
            // key/value children.
            MapVector mapVector = (MapVector) fieldVector;
            mapVector.allocateNew();
            FieldVector dataVector = mapVector.getDataVector();
            if (dataVector != null) {
                initFieldVector(dataVector);
            }
        } else if (fieldVector instanceof StructVector) {
            StructVector structVector = (StructVector) fieldVector;
            structVector.allocateNew();
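As context for the new branch above, here is a minimal, self-contained sketch (written against the non-shaded Arrow Java API; the `attributes` field and all names are illustrative, not part of this PR) of the vector tree that `initFieldVector` now walks: a map's `getDataVector()` is its entries `Struct<key, value>`, so recursing into it is enough to allocate the children.

```java
import java.util.Arrays;
import java.util.Collections;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;

public class MapVectorTreeSketch {
    public static void main(String[] args) {
        Field key = new Field("key", FieldType.notNullable(ArrowType.Utf8.INSTANCE), null);
        Field value = new Field("value", FieldType.nullable(new ArrowType.Int(32, true)), null);
        Field entries =
                new Field(
                        "entries",
                        FieldType.notNullable(ArrowType.Struct.INSTANCE),
                        Arrays.asList(key, value));
        Field map =
                new Field(
                        "attributes",
                        FieldType.nullable(new ArrowType.Map(false)),
                        Collections.singletonList(entries));
        try (BufferAllocator allocator = new RootAllocator();
                FieldVector vector = map.createVector(allocator)) {
            MapVector mapVector = (MapVector) vector;
            mapVector.allocateNew();
            // The data vector of a map is the entries StructVector; recursing into
            // it (as initFieldVector does) also reaches the key/value children.
            System.out.println(mapVector.getDataVector().getField());
        }
    }
}
```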
@@ -77,6 +77,28 @@ private static void copyVectorData(
            org.apache.fluss.shaded.arrow.org.apache.arrow.vector.FieldVector shadedVector,
            FieldVector nonShadedVector) {

        // Check MapVector before ListVector since MapVector extends ListVector.
        // In Lance, Map is represented as List<Struct<key, value>>, so the non-shaded
        // side is a ListVector.
        if (shadedVector
                instanceof
                org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.MapVector) {
            if (!(nonShadedVector instanceof ListVector)) {
                throw new IllegalArgumentException(
                        String.format(
                                "Shaded vector is MapVector but non-shaded vector is %s, expected ListVector.",
                                nonShadedVector.getClass().getSimpleName()));
            }
            // MapVector extends ListVector, so we can reuse the ListVector copy logic.
            // The memory layout is compatible: MapVector -> StructVector(entries) -> [key, value]
            // maps to ListVector -> StructVector(element) -> [key, value]
            copyListVectorData(
                    (org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.MapVector)
                            shadedVector,
                    (ListVector) nonShadedVector);
            return;
        }

        if (shadedVector
                instanceof
                org.apache.fluss.shaded.arrow.org.apache.arrow.vector.complex.ListVector) {
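Two facts in the comments above carry the weight here: `MapVector` subclasses `ListVector`, and both share the LIST physical layout (a validity buffer and an offsets buffer over one child data vector), which is what makes the list copy path safe to reuse. A small sketch against the standard, non-shaded Arrow Java API (names are illustrative, not from this PR):

```java
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.MapVector;

public class MapIsAListSketch {
    public static void main(String[] args) {
        try (BufferAllocator allocator = new RootAllocator();
                MapVector mapVector = MapVector.empty("attributes", allocator, false)) {
            // MapVector extends ListVector, so the order of instanceof checks
            // matters: an unordered chain would route maps into the list branch.
            System.out.println(mapVector instanceof ListVector); // true
            // Logically it is still a map (an ArrowType.Map), but physically it
            // is a LIST-layout vector over a struct-of-entries child.
            System.out.println(mapVector.getField().getType());
        }
    }
}
```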
@@ -31,6 +31,7 @@
import org.apache.fluss.types.FloatType;
import org.apache.fluss.types.IntType;
import org.apache.fluss.types.LocalZonedTimestampType;
import org.apache.fluss.types.MapType;
import org.apache.fluss.types.RowType;
import org.apache.fluss.types.SmallIntType;
import org.apache.fluss.types.StringType;
@@ -133,6 +134,19 @@ private static Field toArrowField(
            for (DataField field : rowType.getFields()) {
                children.add(toArrowField(field.getName(), field.getType(), tableProperties));
            }
        } else if (logicalType instanceof MapType) {
            // Lance 0.33.0 does not support Arrow Map type natively.
            // We convert MapType to List<Struct<key, value>> which is the equivalent
            // representation that Lance supports.
            MapType mapType = (MapType) logicalType;
            Field keyField = toArrowField("key", mapType.getKeyType(), tableProperties);
            Field valueField = toArrowField("value", mapType.getValueType(), tableProperties);
            FieldType structFieldType = new FieldType(false, ArrowType.Struct.INSTANCE, null);
            List<Field> structChildren = new ArrayList<>();
            structChildren.add(keyField);
            structChildren.add(valueField);
            Field structField = new Field("element", structFieldType, structChildren);
            children = Collections.singletonList(structField);
        }
        return new Field(fieldName, fieldType, children);
    }
@@ -202,6 +216,10 @@ private static ArrowType toArrowType(DataType dataType) {
            return ArrowType.List.INSTANCE;
        } else if (dataType instanceof RowType) {
            return ArrowType.Struct.INSTANCE;
        } else if (dataType instanceof MapType) {
            // Lance 0.33.0 does not support Arrow Map type natively.
            // We use List type as the Arrow representation for Map.
            return ArrowType.List.INSTANCE;
        } else {
            throw new UnsupportedOperationException(
                    String.format(
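Concretely, under this conversion a Fluss `MAP<STRING, INT>` column such as the `attributes` column used in the tests below comes out as the following Arrow field. This is a sketch with the non-shaded Arrow pojo classes; the key/value nullability shown is an assumption, since in the PR it is derived from the Fluss key and value types:

```java
import java.util.Arrays;
import java.util.Collections;

import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;

public class MapAsListSchemaSketch {
    public static void main(String[] args) {
        Field key = new Field("key", FieldType.notNullable(ArrowType.Utf8.INSTANCE), null);
        Field value = new Field("value", FieldType.nullable(new ArrowType.Int(32, true)), null);
        // The struct itself is non-nullable, matching FieldType(false, Struct, null) above.
        Field element =
                new Field(
                        "element",
                        new FieldType(false, ArrowType.Struct.INSTANCE, null),
                        Arrays.asList(key, value));
        // MAP<STRING, INT> is emitted as List<Struct<key, value>>.
        Field attributes =
                new Field(
                        "attributes",
                        FieldType.nullable(ArrowType.List.INSTANCE),
                        Collections.singletonList(element));
        // Prints the nested field, roughly: attributes: List<element: Struct<key, value>>
        System.out.println(attributes);
    }
}
```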
@@ -265,6 +265,23 @@ protected long createLogTableWithArrayOfRowType(TablePath tablePath) throws Exception {
        return createTable(tablePath, tableBuilder.build());
    }

    protected long createLogTableWithMapType(TablePath tablePath) throws Exception {
        Schema.Builder schemaBuilder =
                Schema.newBuilder()
                        .column("id", DataTypes.INT())
                        .column("name", DataTypes.STRING())
                        .column("attributes", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()));

        TableDescriptor.Builder tableBuilder =
                TableDescriptor.builder()
                        .distributedBy(1, "id")
                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500));

        tableBuilder.schema(schemaBuilder.build());
        return createTable(tablePath, tableBuilder.build());
    }

    protected void writeRows(TablePath tablePath, List<InternalRow> rows, boolean append)
            throws Exception {
        try (Table table = conn.getTable(tablePath)) {
@@ -25,6 +25,7 @@
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.BinaryString;
import org.apache.fluss.row.GenericArray;
import org.apache.fluss.row.GenericMap;
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.server.zk.data.lake.LakeTable;

@@ -414,4 +415,69 @@ private String buildExpectedTsvForArrayOfRowTable(int rowCount) {
        }
        return sb.toString();
    }

    @Test
    void testTieringWithMapType() throws Exception {
        // Test: log table with a MAP column
        TablePath t1 = TablePath.of(DEFAULT_DB, "logTableWithMap");
        long t1Id = createLogTableWithMapType(t1);
        TableBucket t1Bucket = new TableBucket(t1Id, 0);

        // Write 10 batches of 3 rows each (30 rows in total)
        for (int i = 0; i < 10; i++) {
            java.util.LinkedHashMap<BinaryString, Integer> map1 = new java.util.LinkedHashMap<>();
            map1.put(BinaryString.fromString("age"), 25);
            map1.put(BinaryString.fromString("score"), 100);

            java.util.LinkedHashMap<BinaryString, Integer> map2 = new java.util.LinkedHashMap<>();
            map2.put(BinaryString.fromString("age"), 30);
            map2.put(BinaryString.fromString("score"), 95);

            java.util.LinkedHashMap<BinaryString, Integer> map3 = new java.util.LinkedHashMap<>();
            map3.put(BinaryString.fromString("age"), 35);
            map3.put(BinaryString.fromString("score"), 88);

            writeRows(
                    t1,
                    Arrays.asList(
                            row(1, "Alice", new GenericMap(map1)),
                            row(2, "Bob", new GenericMap(map2)),
                            row(3, "Charlie", new GenericMap(map3))),
                    true);
        }

        // then start the tiering job
        JobClient jobClient = buildTieringJob(execEnv);

        // check the replica status once it has synced
        assertReplicaStatus(t1Bucket, 30);

        LanceConfig config1 =
                LanceConfig.from(
                        lanceConf.toMap(),
                        Collections.emptyMap(),
                        t1.getDatabaseName(),
                        t1.getTableName());

        // check data in Lance using TSV string comparison
        String expectedTsv1 = buildExpectedTsvForMapTable(30);
        checkDataInLance(config1, expectedTsv1);
        checkSnapshotPropertyInLance(config1, Collections.singletonMap(t1Bucket, 30L));

        jobClient.cancel().get();
    }

    private String buildExpectedTsvForMapTable(int rowCount) {
        StringBuilder sb = new StringBuilder();
        sb.append("id\tname\tattributes\n");
        // Each write batch above appended the same three rows, so the expected
        // output repeats them rowCount / 3 times. Note that the entry order in
        // the serialized map need not match LinkedHashMap insertion order.
        for (int i = 0; i < rowCount / 3; i++) {
            sb.append(
                    "1\tAlice\t[{\"key\":\"score\",\"value\":100},{\"key\":\"age\",\"value\":25}]\n");
            sb.append(
                    "2\tBob\t[{\"key\":\"score\",\"value\":95},{\"key\":\"age\",\"value\":30}]\n");
            sb.append(
                    "3\tCharlie\t[{\"key\":\"score\",\"value\":88},{\"key\":\"age\",\"value\":35}]\n");
        }
        return sb.toString();
    }
}
@@ -39,6 +39,7 @@
import org.apache.fluss.record.LogRecord;
import org.apache.fluss.row.BinaryString;
import org.apache.fluss.row.GenericArray;
import org.apache.fluss.row.GenericMap;
import org.apache.fluss.row.GenericRow;
import org.apache.fluss.types.DataTypes;
import org.apache.fluss.utils.types.Tuple2;
@@ -520,4 +521,161 @@ private void verifyNestedRowRecords(VectorSchemaRoot root, List<LogRecord> expectRecords) {
            assertThat(root.getVector(2)).isNotNull();
        }
    }

    @ParameterizedTest
    @MethodSource("tieringWriteArgs")
    void testTieringWriteTableWithMapType(boolean isPartitioned) throws Exception {
        int bucketNum = 3;
        TablePath tablePath = TablePath.of("lance", "mapTable");
        Map<String, String> customProperties = new HashMap<>();
        customProperties.put("lance.batch_size", "256");
        LanceConfig config =
                LanceConfig.from(
                        configuration.toMap(),
                        customProperties,
                        tablePath.getDatabaseName(),
                        tablePath.getTableName());
        Schema schema = createMapTable(config);

        TableDescriptor descriptor =
                TableDescriptor.builder()
                        .schema(schema)
                        .distributedBy(bucketNum)
                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
                        .customProperties(customProperties)
                        .build();
        TableInfo tableInfo =
                TableInfo.of(tablePath, 0, 1, descriptor, DEFAULT_REMOTE_DATA_DIR, 1L, 1L);

        List<LanceWriteResult> lanceWriteResults = new ArrayList<>();
        SimpleVersionedSerializer<LanceWriteResult> writeResultSerializer =
                lanceLakeTieringFactory.getWriteResultSerializer();
        SimpleVersionedSerializer<LanceCommittable> committableSerializer =
                lanceLakeTieringFactory.getCommittableSerializer();

        Map<Tuple2<String, Integer>, List<LogRecord>> recordsByBucket = new HashMap<>();
        Map<Long, String> partitionIdAndName =
                isPartitioned
                        ? new HashMap<Long, String>() {
                            {
                                put(1L, "p1");
                                put(2L, "p2");
                                put(3L, "p3");
                            }
                        }
                        : Collections.singletonMap(null, null);

        // First, write data with map columns
        for (int bucket = 0; bucket < bucketNum; bucket++) {
            for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
                String partition = entry.getValue();
                try (LakeWriter<LanceWriteResult> lakeWriter =
                        createLakeWriter(tablePath, bucket, partition, tableInfo)) {
                    Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
                    Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords =
                            genMapLogRecords(bucket, 10);
                    List<LogRecord> writtenRecords = writeAndExpectRecords.f0;
                    List<LogRecord> expectRecords = writeAndExpectRecords.f1;
                    recordsByBucket.put(partitionBucket, expectRecords);
                    for (LogRecord logRecord : writtenRecords) {
                        lakeWriter.write(logRecord);
                    }
                    // round-trip the write result through its serializer
                    LanceWriteResult lanceWriteResult = lakeWriter.complete();
                    byte[] serialized = writeResultSerializer.serialize(lanceWriteResult);
                    lanceWriteResults.add(
                            writeResultSerializer.deserialize(
                                    writeResultSerializer.getVersion(), serialized));
                }
            }
        }

        // Second, commit the data
        try (LakeCommitter<LanceWriteResult, LanceCommittable> lakeCommitter =
                createLakeCommitter(tablePath, tableInfo)) {
            // round-trip the committable through its serializer
            LanceCommittable lanceCommittable = lakeCommitter.toCommittable(lanceWriteResults);
            byte[] serialized = committableSerializer.serialize(lanceCommittable);
            lanceCommittable =
                    committableSerializer.deserialize(
                            committableSerializer.getVersion(), serialized);
            Map<String, String> snapshotProperties =
                    Collections.singletonMap(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY, "offsets");
            LakeCommitResult commitResult =
                    lakeCommitter.commit(lanceCommittable, snapshotProperties);
            // Lance dataset versions start at 1 (the empty dataset created in
            // createMapTable), so the first commit produces version 2.
            assertThat(commitResult.getCommittedSnapshotId()).isEqualTo(2);
        }

        try (Dataset dataset =
                Dataset.open(
                        new RootAllocator(),
                        config.getDatasetUri(),
                        LanceConfig.genReadOptionFromConfig(config))) {
            ArrowReader reader = dataset.newScan().scanBatches();
            VectorSchemaRoot readerRoot = reader.getVectorSchemaRoot();

            // Verify the data can be read back
            for (int bucket = 0; bucket < bucketNum; bucket++) {
                for (String partition : partitionIdAndName.values()) {
                    reader.loadNextBatch();
                    Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
                    List<LogRecord> expectRecords = recordsByBucket.get(partitionBucket);
                    verifyMapRecords(readerRoot, expectRecords);
                }
            }
            assertThat(reader.loadNextBatch()).isFalse();
        }
    }

    private Schema createMapTable(LanceConfig config) {
        Schema.Builder schemaBuilder =
                Schema.newBuilder()
                        .column("id", DataTypes.INT())
                        .column("name", DataTypes.STRING())
                        .column("attributes", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()));
        Schema schema = schemaBuilder.build();
        WriteParams params = LanceConfig.genWriteParamsFromConfig(config);
        LanceDatasetAdapter.createDataset(
                config.getDatasetUri(), LanceArrowUtils.toArrowSchema(schema.getRowType()), params);

        return schema;
    }

    private Tuple2<List<LogRecord>, List<LogRecord>> genMapLogRecords(int bucket, int numRecords) {
        List<LogRecord> logRecords = new ArrayList<>();
        for (int i = 0; i < numRecords; i++) {
            GenericRow genericRow = new GenericRow(3);
            genericRow.setField(0, i);
            genericRow.setField(1, BinaryString.fromString("user" + bucket + "_" + i));

            // create the map column data
            java.util.LinkedHashMap<BinaryString, Integer> mapData =
                    new java.util.LinkedHashMap<>();
            mapData.put(BinaryString.fromString("age"), 20 + bucket);
            mapData.put(BinaryString.fromString("score"), 100 + i);
            genericRow.setField(2, new GenericMap(mapData));

            LogRecord logRecord =
                    new GenericRecord(
                            i, System.currentTimeMillis(), ChangeType.APPEND_ONLY, genericRow);
            logRecords.add(logRecord);
        }
        // written and expected records are identical for append-only data
        return Tuple2.of(logRecords, logRecords);
    }

    private void verifyMapRecords(VectorSchemaRoot root, List<LogRecord> expectRecords) {
        assertThat(root.getRowCount()).isEqualTo(expectRecords.size());
        for (int i = 0; i < expectRecords.size(); i++) {
            LogRecord expectRecord = expectRecords.get(i);
            // check id column
            assertThat((int) (root.getVector(0).getObject(i)))
                    .isEqualTo(expectRecord.getRow().getInt(0));
            // check name column
            assertThat(((VarCharVector) root.getVector(1)).getObject(i).toString())
                    .isEqualTo(expectRecord.getRow().getString(1).toString());
            // for the map column, only verify that the vector is present (non-null)
            assertThat(root.getVector(2)).isNotNull();
        }
    }
}
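If entry-level assertions are ever wanted in `verifyMapRecords`, here is a hedged sketch of a helper (assuming, per the Map-to-List conversion above, that the column reads back as a non-shaded `ListVector` whose `getObject` yields one `Map<String, Object>` per `Struct<key, value>` entry; the class and method names are hypothetical, not part of this PR):

```java
import java.util.List;
import java.util.Map;

import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.ListVector;

public class MapColumnAssertions {
    /** Reads the entries of the map-as-list column at {@code column}, row {@code row}. */
    @SuppressWarnings("unchecked")
    static List<Map<String, Object>> mapEntries(VectorSchemaRoot root, int column, int row) {
        ListVector attributes = (ListVector) root.getVector(column);
        // Each element is a Struct<key, value>, surfaced by Arrow as one Map per entry
        // (keys come back as Text, so compare via toString()).
        return (List<Map<String, Object>>) attributes.getObject(row);
    }
}
```

A call like `mapEntries(root, 2, i)` would then let the test compare `entry.get("key")` and `entry.get("value")` against the written `GenericMap`.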