From d923ce12f93d93fc3c0e100b718c3eeb92efc291 Mon Sep 17 00:00:00 2001 From: yingying Date: Thu, 22 Jan 2026 16:49:12 +0800 Subject: [PATCH 1/3] new aggregate function:merge_map_with_keytime --- .../FieldMergeMapWithKeyTimeAgg.java | 137 ++++++++++++++++++ .../FieldMergeMapWithKeyTimeAggFactory.java | 73 ++++++++++ .../org.apache.paimon.factories.Factory | 1 + .../aggregate/FieldAggregatorTest.java | 112 ++++++++++++++ 4 files changed, 323 insertions(+) create mode 100644 paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapWithKeyTimeAgg.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMergeMapWithKeyTimeAggFactory.java diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapWithKeyTimeAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapWithKeyTimeAgg.java new file mode 100644 index 000000000000..c36b1983bdfc --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapWithKeyTimeAgg.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.mergetree.compact.aggregate; + +import org.apache.paimon.data.GenericMap; +import org.apache.paimon.data.InternalArray; +import org.apache.paimon.data.InternalMap; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.RowType; + +import java.util.HashMap; +import java.util.Map; + +/** Aggregator for merging maps with key and timestamp. */ +public class FieldMergeMapWithKeyTimeAgg extends FieldAggregator { + + private static final long serialVersionUID = 1L; + + private final InternalArray.ElementGetter keyGetter; + private final InternalArray.ElementGetter valueGetter; + private final int timestampFieldIndex; + + public FieldMergeMapWithKeyTimeAgg(String name, MapType dataType) { + super(name, dataType); + this.keyGetter = InternalArray.createElementGetter(dataType.getKeyType()); + this.valueGetter = InternalArray.createElementGetter(dataType.getValueType()); + + if (!(dataType.getValueType() instanceof RowType)) { + throw new IllegalArgumentException("Value type must be ROW"); + } + RowType rowType = (RowType) dataType.getValueType(); + if (rowType.getFieldCount() < 2) { + throw new IllegalArgumentException("ROW type must have at least 2 fields"); + } + + // 验证时间戳字段类型为STRING + if (!rowType.getTypeAt(1).equals(DataTypes.STRING())) { + throw new IllegalArgumentException("Timestamp field must be STRING"); + } + + // 移除了未使用的valueFieldIndex + this.timestampFieldIndex = 1; + } + + @Override + public Object agg(Object accumulator, Object inputField) { + if (accumulator == null) { + return inputField; + } + if (inputField == null) { + return accumulator; + } + + InternalMap accMap = (InternalMap) accumulator; + InternalMap inputMap = (InternalMap) inputField; + + // 将累加器转换为可修改的Map + Map resultMap = new HashMap<>(); + putToMap(resultMap, accMap); + + // 合并新数据 + mergeInputMap(resultMap, inputMap); + + return new GenericMap(resultMap); + } + + private void putToMap(Map map, InternalMap data) { + InternalArray keyArray = data.keyArray(); + InternalArray valueArray = data.valueArray(); + for (int i = 0; i < keyArray.size(); i++) { + Object key = keyGetter.getElementOrNull(keyArray, i); + Object value = valueGetter.getElementOrNull(valueArray, i); + map.put(key, value); + } + } + + private void mergeInputMap(Map resultMap, InternalMap inputMap) { + InternalArray keyArray = inputMap.keyArray(); + InternalArray valueArray = inputMap.valueArray(); + + for (int i = 0; i < keyArray.size(); i++) { + Object key = keyGetter.getElementOrNull(keyArray, i); + InternalRow newRow = (InternalRow) valueGetter.getElementOrNull(valueArray, i); + + if (newRow == null) { + resultMap.remove(key); + continue; + } + + // 跳过value/time为null的情况(时间戳为null) + if (newRow.isNullAt(timestampFieldIndex)) { + continue; + } + + Object existingValue = resultMap.get(key); + if (existingValue == null) { + resultMap.put(key, newRow); + } else { + InternalRow existingRow = (InternalRow) existingValue; + // 防御:检查现有行时间戳是否有效 + if (existingRow.isNullAt(timestampFieldIndex)) { + resultMap.put(key, newRow); + } else { + String newTs = newRow.getString(timestampFieldIndex).toString(); + String existingTs = existingRow.getString(timestampFieldIndex).toString(); + if (newTs.compareTo(existingTs) > 0) { + resultMap.put(key, newRow); + } + } + } + } + } + + @Override + public Object retract(Object accumulator, Object retractField) { + // 在合并树场景中通常不需要实现 + return null; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMergeMapWithKeyTimeAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMergeMapWithKeyTimeAggFactory.java new file mode 100644 index 000000000000..2530eb7ec73d --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMergeMapWithKeyTimeAggFactory.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.mergetree.compact.aggregate.factory; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.mergetree.compact.aggregate.FieldMergeMapWithKeyTimeAgg; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.MapType; +import org.apache.paimon.types.RowType; + +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** Factory for {@link FieldMergeMapWithKeyTimeAgg}. */ +public class FieldMergeMapWithKeyTimeAggFactory implements FieldAggregatorFactory { + + public static final String NAME = "merge_map_with_keytime"; + + @Override + public FieldMergeMapWithKeyTimeAgg create( + DataType fieldType, CoreOptions options, String field) { + checkArgument( + fieldType instanceof MapType, + "Data type for field '%s' must be 'MAP' but was '%s'", + field, + fieldType); + + MapType mapType = (MapType) fieldType; + DataType valueType = mapType.getValueType(); + + checkArgument( + valueType instanceof RowType, + "Value type of MAP for field '%s' must be ROW but was '%s'", + field, + valueType); + + RowType rowType = (RowType) valueType; + checkArgument( + rowType.getFieldCount() >= 2, + "ROW type for field '%s' must have at least 2 fields, but found %s", + field, + rowType.getFieldCount()); + + checkArgument( + DataTypes.STRING().equals(rowType.getTypeAt(1)), + "The second field (timestamp) of ROW in field '%s' must be STRING, but was '%s'", + field, + rowType.getTypeAt(1)); + + return new FieldMergeMapWithKeyTimeAgg(NAME, mapType); + } + + @Override + public String identifier() { + return NAME; + } +} diff --git a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 3bfb3480de1b..07ce9ff71e9d 100644 --- a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -28,6 +28,7 @@ org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastValueAggFactory org.apache.paimon.mergetree.compact.aggregate.factory.FieldListaggAggFactory org.apache.paimon.mergetree.compact.aggregate.factory.FieldMaxAggFactory org.apache.paimon.mergetree.compact.aggregate.factory.FieldMergeMapAggFactory +org.apache.paimon.mergetree.compact.aggregate.factory.FieldMergeMapWithKeyTimeAggFactory org.apache.paimon.mergetree.compact.aggregate.factory.FieldMinAggFactory org.apache.paimon.mergetree.compact.aggregate.factory.FieldNestedUpdateAggFactory org.apache.paimon.mergetree.compact.aggregate.factory.FieldNestedPartialUpdateAggFactory diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java index 8541954be827..f0979b615a27 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java @@ -71,6 +71,7 @@ import java.io.IOException; import java.math.BigDecimal; +import java.util.AbstractMap; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -1215,4 +1216,115 @@ private Map toJavaMap(Object data) { } return result; } + + // ... 原有代码保持不变 ... + @Test + public void testFieldMergeMapWithKeyTimeAgg() { + // 创建聚合函数 + MapType mapType = + DataTypes.MAP( + DataTypes.STRING(), + DataTypes.ROW( + DataTypes.FIELD(0, "actual_value", DataTypes.STRING()), + DataTypes.FIELD(1, "dbsync_ts", DataTypes.STRING()))); + FieldMergeMapWithKeyTimeAgg agg = new FieldMergeMapWithKeyTimeAgg("test", mapType); + + // 测试数据准备 + GenericMap map1 = + createTestMap( + createEntry("key1", "A", "17682882903686900100"), + createEntry("key2", "B", "17682882903686900100")); + GenericMap map2 = + createTestMap( + createEntry("key1", "A1", "17682882903686900200"), + createEntry("key3", "C", "17682882903686900200")); + GenericMap map3 = createTestMap(createEntry("key2", "B2", "17682882903686900050")); + GenericMap map4 = + createTestMap( + createEntry("key3", null, null) // 删除标记 + ); + + // 执行测试 + Object acc = agg.agg(null, map1); + assertTestMap(acc, createExpectedEntry("key1", "A"), createExpectedEntry("key2", "B")); + + acc = agg.agg(acc, map2); + assertTestMap( + acc, + createExpectedEntry("key1", "A1"), + createExpectedEntry("key2", "B"), + createExpectedEntry("key3", "C")); + + acc = agg.agg(acc, map3); + assertTestMap( + acc, + createExpectedEntry("key1", "A1"), + createExpectedEntry("key2", "B"), // 未改变 + createExpectedEntry("key3", "C")); + + acc = agg.agg(acc, map4); + assertTestMap( + acc, createExpectedEntry("key1", "A1"), createExpectedEntry("key2", "B") // key3 被删除 + ); + } + + // 辅助方法:创建测试条目 + private Map.Entry createEntry(String key, String value, String ts) { + return new AbstractMap.SimpleEntry<>( + BinaryString.fromString(key), + GenericRow.of( + value == null ? null : BinaryString.fromString(value), + ts == null ? null : BinaryString.fromString(ts))); + } + + // 辅助方法:创建预期条目 + private Map.Entry createExpectedEntry(String key, String value) { + return new AbstractMap.SimpleEntry<>(key, value); + } + + @SafeVarargs + private final GenericMap createTestMap(Map.Entry... entries) { + Map map = new HashMap<>(); + for (Map.Entry entry : entries) { + map.put(entry.getKey(), entry.getValue()); + } + return new GenericMap(map); + } + + // 修复后的断言方法 + @SafeVarargs + private final void assertTestMap(Object mapObj, Map.Entry... expected) { + InternalMap map = (InternalMap) mapObj; + Map actual = new HashMap<>(); + + // 使用 InternalArray 替代 ArrayData + InternalArray keyArray = map.keyArray(); + InternalArray valueArray = map.valueArray(); + + for (int i = 0; i < map.size(); i++) { + // 获取键(BinaryString) + BinaryString keyBinary = keyArray.getString(i); + String key = keyBinary.toString(); + + // 获取值行(InternalRow) + InternalRow row = valueArray.getRow(i, 2); + + // 处理可能为 null 的值 + String value = null; + if (!row.isNullAt(0)) { + BinaryString valueBinary = row.getString(0); + value = valueBinary != null ? valueBinary.toString() : null; + } + actual.put(key, value); + } + + // 构建预期结果 + Map expectedMap = new HashMap<>(); + for (Map.Entry e : expected) { + expectedMap.put(e.getKey(), e.getValue()); + } + + // 断言 + assertThat(actual).containsExactlyInAnyOrderEntriesOf(expectedMap); + } } From d1dbb727413c4b0d3868ff3e72003e4e9a6488c0 Mon Sep 17 00:00:00 2001 From: yingying Date: Thu, 22 Jan 2026 17:42:33 +0800 Subject: [PATCH 2/3] new aggregate function:merge_map_with_keytime-fix bug --- .../FieldMergeMapWithKeyTimeAgg.java | 5 ----- .../aggregate/FieldAggregatorTest.java | 20 ------------------- 2 files changed, 25 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapWithKeyTimeAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapWithKeyTimeAgg.java index c36b1983bdfc..2a6f5992f512 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapWithKeyTimeAgg.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapWithKeyTimeAgg.java @@ -72,11 +72,9 @@ public Object agg(Object accumulator, Object inputField) { InternalMap accMap = (InternalMap) accumulator; InternalMap inputMap = (InternalMap) inputField; - // 将累加器转换为可修改的Map Map resultMap = new HashMap<>(); putToMap(resultMap, accMap); - // 合并新数据 mergeInputMap(resultMap, inputMap); return new GenericMap(resultMap); @@ -105,7 +103,6 @@ private void mergeInputMap(Map resultMap, InternalMap inputMap) continue; } - // 跳过value/time为null的情况(时间戳为null) if (newRow.isNullAt(timestampFieldIndex)) { continue; } @@ -115,7 +112,6 @@ private void mergeInputMap(Map resultMap, InternalMap inputMap) resultMap.put(key, newRow); } else { InternalRow existingRow = (InternalRow) existingValue; - // 防御:检查现有行时间戳是否有效 if (existingRow.isNullAt(timestampFieldIndex)) { resultMap.put(key, newRow); } else { @@ -131,7 +127,6 @@ private void mergeInputMap(Map resultMap, InternalMap inputMap) @Override public Object retract(Object accumulator, Object retractField) { - // 在合并树场景中通常不需要实现 return null; } } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java index f0979b615a27..331f993759dd 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java @@ -1217,10 +1217,8 @@ private Map toJavaMap(Object data) { return result; } - // ... 原有代码保持不变 ... @Test public void testFieldMergeMapWithKeyTimeAgg() { - // 创建聚合函数 MapType mapType = DataTypes.MAP( DataTypes.STRING(), @@ -1229,7 +1227,6 @@ public void testFieldMergeMapWithKeyTimeAgg() { DataTypes.FIELD(1, "dbsync_ts", DataTypes.STRING()))); FieldMergeMapWithKeyTimeAgg agg = new FieldMergeMapWithKeyTimeAgg("test", mapType); - // 测试数据准备 GenericMap map1 = createTestMap( createEntry("key1", "A", "17682882903686900100"), @@ -1239,12 +1236,7 @@ public void testFieldMergeMapWithKeyTimeAgg() { createEntry("key1", "A1", "17682882903686900200"), createEntry("key3", "C", "17682882903686900200")); GenericMap map3 = createTestMap(createEntry("key2", "B2", "17682882903686900050")); - GenericMap map4 = - createTestMap( - createEntry("key3", null, null) // 删除标记 - ); - // 执行测试 Object acc = agg.agg(null, map1); assertTestMap(acc, createExpectedEntry("key1", "A"), createExpectedEntry("key2", "B")); @@ -1262,13 +1254,8 @@ public void testFieldMergeMapWithKeyTimeAgg() { createExpectedEntry("key2", "B"), // 未改变 createExpectedEntry("key3", "C")); - acc = agg.agg(acc, map4); - assertTestMap( - acc, createExpectedEntry("key1", "A1"), createExpectedEntry("key2", "B") // key3 被删除 - ); } - // 辅助方法:创建测试条目 private Map.Entry createEntry(String key, String value, String ts) { return new AbstractMap.SimpleEntry<>( BinaryString.fromString(key), @@ -1277,7 +1264,6 @@ private Map.Entry createEntry(String key, String valu ts == null ? null : BinaryString.fromString(ts))); } - // 辅助方法:创建预期条目 private Map.Entry createExpectedEntry(String key, String value) { return new AbstractMap.SimpleEntry<>(key, value); } @@ -1291,25 +1277,20 @@ private final GenericMap createTestMap(Map.Entry... e return new GenericMap(map); } - // 修复后的断言方法 @SafeVarargs private final void assertTestMap(Object mapObj, Map.Entry... expected) { InternalMap map = (InternalMap) mapObj; Map actual = new HashMap<>(); - // 使用 InternalArray 替代 ArrayData InternalArray keyArray = map.keyArray(); InternalArray valueArray = map.valueArray(); for (int i = 0; i < map.size(); i++) { - // 获取键(BinaryString) BinaryString keyBinary = keyArray.getString(i); String key = keyBinary.toString(); - // 获取值行(InternalRow) InternalRow row = valueArray.getRow(i, 2); - // 处理可能为 null 的值 String value = null; if (!row.isNullAt(0)) { BinaryString valueBinary = row.getString(0); @@ -1318,7 +1299,6 @@ private final void assertTestMap(Object mapObj, Map.Entry... exp actual.put(key, value); } - // 构建预期结果 Map expectedMap = new HashMap<>(); for (Map.Entry e : expected) { expectedMap.put(e.getKey(), e.getValue()); From 26eb8844e31c89d5d471a48b9c91373e02bf6662 Mon Sep 17 00:00:00 2001 From: yingying Date: Thu, 22 Jan 2026 18:32:49 +0800 Subject: [PATCH 3/3] new aggregate function:merge_map_with_keytime-fix bug --- .../mergetree/compact/aggregate/FieldAggregatorTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java index 331f993759dd..007946696ef0 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java @@ -1251,9 +1251,8 @@ public void testFieldMergeMapWithKeyTimeAgg() { assertTestMap( acc, createExpectedEntry("key1", "A1"), - createExpectedEntry("key2", "B"), // 未改变 + createExpectedEntry("key2", "B"), createExpectedEntry("key3", "C")); - } private Map.Entry createEntry(String key, String value, String ts) {