diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapWithKeyTimeAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapWithKeyTimeAgg.java new file mode 100644 index 000000000000..2a6f5992f512 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapWithKeyTimeAgg.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.mergetree.compact.aggregate; + +import org.apache.paimon.data.GenericMap; +import org.apache.paimon.data.InternalArray; +import org.apache.paimon.data.InternalMap; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.RowType; + +import java.util.HashMap; +import java.util.Map; + +/** Aggregator for merging maps with key and timestamp. */ +public class FieldMergeMapWithKeyTimeAgg extends FieldAggregator { + + private static final long serialVersionUID = 1L; + + private final InternalArray.ElementGetter keyGetter; + private final InternalArray.ElementGetter valueGetter; + private final int timestampFieldIndex; + + public FieldMergeMapWithKeyTimeAgg(String name, MapType dataType) { + super(name, dataType); + this.keyGetter = InternalArray.createElementGetter(dataType.getKeyType()); + this.valueGetter = InternalArray.createElementGetter(dataType.getValueType()); + + if (!(dataType.getValueType() instanceof RowType)) { + throw new IllegalArgumentException("Value type must be ROW"); + } + RowType rowType = (RowType) dataType.getValueType(); + if (rowType.getFieldCount() < 2) { + throw new IllegalArgumentException("ROW type must have at least 2 fields"); + } + + // 验证时间戳字段类型为STRING + if (!rowType.getTypeAt(1).equals(DataTypes.STRING())) { + throw new IllegalArgumentException("Timestamp field must be STRING"); + } + + // 移除了未使用的valueFieldIndex + this.timestampFieldIndex = 1; + } + + @Override + public Object agg(Object accumulator, Object inputField) { + if (accumulator == null) { + return inputField; + } + if (inputField == null) { + return accumulator; + } + + InternalMap accMap = (InternalMap) accumulator; + InternalMap inputMap = (InternalMap) inputField; + + Map resultMap = new HashMap<>(); + putToMap(resultMap, accMap); + + mergeInputMap(resultMap, inputMap); + + return new GenericMap(resultMap); + } + + private void putToMap(Map map, InternalMap data) { + InternalArray keyArray = data.keyArray(); + InternalArray valueArray = data.valueArray(); + for (int i = 0; i < keyArray.size(); i++) { + Object key = keyGetter.getElementOrNull(keyArray, i); + Object value = valueGetter.getElementOrNull(valueArray, i); + map.put(key, value); + } + } + + private void mergeInputMap(Map resultMap, InternalMap inputMap) { + InternalArray keyArray = inputMap.keyArray(); + InternalArray valueArray = inputMap.valueArray(); + + for (int i = 0; i < keyArray.size(); i++) { + Object key = keyGetter.getElementOrNull(keyArray, i); + InternalRow newRow = (InternalRow) valueGetter.getElementOrNull(valueArray, i); + + if (newRow == null) { + resultMap.remove(key); + continue; + } + + if (newRow.isNullAt(timestampFieldIndex)) { + continue; + } + + Object existingValue = resultMap.get(key); + if (existingValue == null) { + resultMap.put(key, newRow); + } else { + InternalRow existingRow = (InternalRow) existingValue; + if (existingRow.isNullAt(timestampFieldIndex)) { + resultMap.put(key, newRow); + } else { + String newTs = newRow.getString(timestampFieldIndex).toString(); + String existingTs = existingRow.getString(timestampFieldIndex).toString(); + if (newTs.compareTo(existingTs) > 0) { + resultMap.put(key, newRow); + } + } + } + } + } + + @Override + public Object retract(Object accumulator, Object retractField) { + return null; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMergeMapWithKeyTimeAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMergeMapWithKeyTimeAggFactory.java new file mode 100644 index 000000000000..2530eb7ec73d --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMergeMapWithKeyTimeAggFactory.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.mergetree.compact.aggregate.factory; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.mergetree.compact.aggregate.FieldMergeMapWithKeyTimeAgg; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.RowType; + +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** Factory for {@link FieldMergeMapWithKeyTimeAgg}. */ +public class FieldMergeMapWithKeyTimeAggFactory implements FieldAggregatorFactory { + + public static final String NAME = "merge_map_with_keytime"; + + @Override + public FieldMergeMapWithKeyTimeAgg create( + DataType fieldType, CoreOptions options, String field) { + checkArgument( + fieldType instanceof MapType, + "Data type for field '%s' must be 'MAP' but was '%s'", + field, + fieldType); + + MapType mapType = (MapType) fieldType; + DataType valueType = mapType.getValueType(); + + checkArgument( + valueType instanceof RowType, + "Value type of MAP for field '%s' must be ROW but was '%s'", + field, + valueType); + + RowType rowType = (RowType) valueType; + checkArgument( + rowType.getFieldCount() >= 2, + "ROW type for field '%s' must have at least 2 fields, but found %s", + field, + rowType.getFieldCount()); + + checkArgument( + DataTypes.STRING().equals(rowType.getTypeAt(1)), + "The second field (timestamp) of ROW in field '%s' must be STRING, but was '%s'", + field, + rowType.getTypeAt(1)); + + return new FieldMergeMapWithKeyTimeAgg(NAME, mapType); + } + + @Override + public String identifier() { + return NAME; + } +} diff --git a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 3bfb3480de1b..07ce9ff71e9d 100644 --- a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -28,6 +28,7 @@ org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastValueAggFactory org.apache.paimon.mergetree.compact.aggregate.factory.FieldListaggAggFactory org.apache.paimon.mergetree.compact.aggregate.factory.FieldMaxAggFactory org.apache.paimon.mergetree.compact.aggregate.factory.FieldMergeMapAggFactory +org.apache.paimon.mergetree.compact.aggregate.factory.FieldMergeMapWithKeyTimeAggFactory org.apache.paimon.mergetree.compact.aggregate.factory.FieldMinAggFactory org.apache.paimon.mergetree.compact.aggregate.factory.FieldNestedUpdateAggFactory org.apache.paimon.mergetree.compact.aggregate.factory.FieldNestedPartialUpdateAggFactory diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java index 8541954be827..007946696ef0 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java @@ -71,6 +71,7 @@ import java.io.IOException; import java.math.BigDecimal; +import java.util.AbstractMap; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -1215,4 +1216,94 @@ private Map toJavaMap(Object data) { } return result; } + + @Test + public void testFieldMergeMapWithKeyTimeAgg() { + MapType mapType = + DataTypes.MAP( + DataTypes.STRING(), + DataTypes.ROW( + DataTypes.FIELD(0, "actual_value", DataTypes.STRING()), + DataTypes.FIELD(1, "dbsync_ts", DataTypes.STRING()))); + FieldMergeMapWithKeyTimeAgg agg = new FieldMergeMapWithKeyTimeAgg("test", mapType); + + GenericMap map1 = + createTestMap( + createEntry("key1", "A", "17682882903686900100"), + createEntry("key2", "B", "17682882903686900100")); + GenericMap map2 = + createTestMap( + createEntry("key1", "A1", "17682882903686900200"), + createEntry("key3", "C", "17682882903686900200")); + GenericMap map3 = createTestMap(createEntry("key2", "B2", "17682882903686900050")); + + Object acc = agg.agg(null, map1); + assertTestMap(acc, createExpectedEntry("key1", "A"), createExpectedEntry("key2", "B")); + + acc = agg.agg(acc, map2); + assertTestMap( + acc, + createExpectedEntry("key1", "A1"), + createExpectedEntry("key2", "B"), + createExpectedEntry("key3", "C")); + + acc = agg.agg(acc, map3); + assertTestMap( + acc, + createExpectedEntry("key1", "A1"), + createExpectedEntry("key2", "B"), + createExpectedEntry("key3", "C")); + } + + private Map.Entry createEntry(String key, String value, String ts) { + return new AbstractMap.SimpleEntry<>( + BinaryString.fromString(key), + GenericRow.of( + value == null ? null : BinaryString.fromString(value), + ts == null ? null : BinaryString.fromString(ts))); + } + + private Map.Entry createExpectedEntry(String key, String value) { + return new AbstractMap.SimpleEntry<>(key, value); + } + + @SafeVarargs + private final GenericMap createTestMap(Map.Entry... entries) { + Map map = new HashMap<>(); + for (Map.Entry entry : entries) { + map.put(entry.getKey(), entry.getValue()); + } + return new GenericMap(map); + } + + @SafeVarargs + private final void assertTestMap(Object mapObj, Map.Entry... expected) { + InternalMap map = (InternalMap) mapObj; + Map actual = new HashMap<>(); + + InternalArray keyArray = map.keyArray(); + InternalArray valueArray = map.valueArray(); + + for (int i = 0; i < map.size(); i++) { + BinaryString keyBinary = keyArray.getString(i); + String key = keyBinary.toString(); + + InternalRow row = valueArray.getRow(i, 2); + + String value = null; + if (!row.isNullAt(0)) { + BinaryString valueBinary = row.getString(0); + value = valueBinary != null ? valueBinary.toString() : null; + } + actual.put(key, value); + } + + Map expectedMap = new HashMap<>(); + for (Map.Entry e : expected) { + expectedMap.put(e.getKey(), e.getValue()); + } + + // 断言 + assertThat(actual).containsExactlyInAnyOrderEntriesOf(expectedMap); + } }