Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.mergetree.compact.aggregate;

import org.apache.paimon.data.GenericMap;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;

import java.util.HashMap;
import java.util.Map;

/**
 * Aggregator that merges two maps key by key, where every map value is a row that carries a
 * timestamp, keeping for each key the row with the greatest timestamp.
 *
 * <p>The map value type must be a {@code ROW} with at least two fields; the second field (index 1)
 * is the timestamp and must be of type {@code STRING}.
 *
 * <p>Timestamps are compared with {@link String#compareTo(String)}, i.e. lexicographically. Numeric
 * timestamps of different lengths are therefore not ordered by their numeric value.
 *
 * <p>Merge rules applied to every entry of the input map:
 *
 * <ul>
 *   <li>a {@code null} value row removes the key from the result;
 *   <li>a row whose timestamp is {@code null} is ignored;
 *   <li>otherwise the row replaces the current one when the key is absent, the current value is
 *       {@code null}, the current timestamp is {@code null}, or the input timestamp is strictly
 *       greater than the current one (ties keep the current row).
 * </ul>
 */
public class FieldMergeMapWithKeyTimeAgg extends FieldAggregator {

    private static final long serialVersionUID = 1L;

    private final InternalArray.ElementGetter keyGetter;
    private final InternalArray.ElementGetter valueGetter;
    private final int timestampFieldIndex;

    /**
     * Creates the aggregator and validates the shape of the map value type.
     *
     * @param name name passed to the {@link FieldAggregator} super constructor
     * @param dataType map type whose value type must be {@code ROW<value, timestamp, ...>}
     * @throws IllegalArgumentException if the value type is not a {@code ROW}, has fewer than two
     *     fields, or its second field is not {@code STRING}
     */
    public FieldMergeMapWithKeyTimeAgg(String name, MapType dataType) {
        super(name, dataType);
        this.keyGetter = InternalArray.createElementGetter(dataType.getKeyType());
        this.valueGetter = InternalArray.createElementGetter(dataType.getValueType());

        if (!(dataType.getValueType() instanceof RowType)) {
            throw new IllegalArgumentException("Value type must be ROW<value, timestamp>");
        }
        RowType rowType = (RowType) dataType.getValueType();
        if (rowType.getFieldCount() < 2) {
            throw new IllegalArgumentException("ROW type must have at least 2 fields");
        }

        // The timestamp field must be of type STRING.
        if (!rowType.getTypeAt(1).equals(DataTypes.STRING())) {
            throw new IllegalArgumentException("Timestamp field must be STRING");
        }

        this.timestampFieldIndex = 1;
    }

    /**
     * Merges {@code inputField} into {@code accumulator}.
     *
     * @param accumulator current merged map, or {@code null} if nothing was accumulated yet
     * @param inputField map to merge in, or {@code null}
     * @return the other argument unchanged when one of them is {@code null}; otherwise a new
     *     {@link GenericMap} holding the merged entries
     */
    @Override
    public Object agg(Object accumulator, Object inputField) {
        if (accumulator == null) {
            return inputField;
        }
        if (inputField == null) {
            return accumulator;
        }

        InternalMap accMap = (InternalMap) accumulator;
        InternalMap inputMap = (InternalMap) inputField;

        Map<Object, Object> resultMap = new HashMap<>();
        putToMap(resultMap, accMap);

        mergeInputMap(resultMap, inputMap);

        return new GenericMap(resultMap);
    }

    /** Copies every entry of {@code data} into {@code map} without inspecting the values. */
    private void putToMap(Map<Object, Object> map, InternalMap data) {
        InternalArray keyArray = data.keyArray();
        InternalArray valueArray = data.valueArray();
        for (int i = 0; i < keyArray.size(); i++) {
            Object key = keyGetter.getElementOrNull(keyArray, i);
            Object value = valueGetter.getElementOrNull(valueArray, i);
            map.put(key, value);
        }
    }

    /** Applies the merge rules described on the class to every entry of {@code inputMap}. */
    private void mergeInputMap(Map<Object, Object> resultMap, InternalMap inputMap) {
        InternalArray keyArray = inputMap.keyArray();
        InternalArray valueArray = inputMap.valueArray();

        for (int i = 0; i < keyArray.size(); i++) {
            Object key = keyGetter.getElementOrNull(keyArray, i);
            InternalRow newRow = (InternalRow) valueGetter.getElementOrNull(valueArray, i);

            // A null value row removes the key.
            if (newRow == null) {
                resultMap.remove(key);
                continue;
            }

            // Rows without a timestamp cannot be ordered and are skipped.
            if (newRow.isNullAt(timestampFieldIndex)) {
                continue;
            }

            InternalRow existingRow = (InternalRow) resultMap.get(key);
            if (existingRow == null || isNewer(newRow, existingRow)) {
                resultMap.put(key, newRow);
            }
        }
    }

    /**
     * Returns whether {@code candidate} should replace {@code current}: either {@code current} has
     * no timestamp, or the timestamp of {@code candidate} is strictly greater in string order.
     * The timestamp of {@code candidate} must not be null.
     */
    private boolean isNewer(InternalRow candidate, InternalRow current) {
        if (current.isNullAt(timestampFieldIndex)) {
            return true;
        }
        String candidateTs = candidate.getString(timestampFieldIndex).toString();
        String currentTs = current.getString(timestampFieldIndex).toString();
        return candidateTs.compareTo(currentTs) > 0;
    }

    /**
     * Retraction is not supported by this aggregator.
     *
     * <p>Returning {@code null} here would silently discard the whole accumulated map, so the
     * call fails explicitly instead.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Object retract(Object accumulator, Object retractField) {
        throw new UnsupportedOperationException(
                "Aggregate function 'merge_map_with_keytime' does not support retraction."
                        + " If you allow this function to ignore retraction messages,"
                        + " you can configure 'fields.${field_name}.ignore-retract'='true'.");
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.mergetree.compact.aggregate.factory;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.mergetree.compact.aggregate.FieldMergeMapWithKeyTimeAgg;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;

import static org.apache.paimon.utils.Preconditions.checkArgument;

/**
 * Factory producing {@link FieldMergeMapWithKeyTimeAgg} instances, registered under the identifier
 * {@link #NAME}.
 */
public class FieldMergeMapWithKeyTimeAggFactory implements FieldAggregatorFactory {

    public static final String NAME = "merge_map_with_keytime";

    /**
     * Checks that {@code fieldType} is a {@code MAP} whose value type is a {@code ROW} with at
     * least two fields and a {@code STRING} second field, then builds the aggregator.
     *
     * @param fieldType declared type of the aggregated field
     * @param options table options; not read by this factory
     * @param field field name, used only in validation messages
     * @return a new aggregator constructed with {@link #NAME} and the validated map type
     */
    @Override
    public FieldMergeMapWithKeyTimeAgg create(
            DataType fieldType, CoreOptions options, String field) {
        boolean isMap = fieldType instanceof MapType;
        checkArgument(
                isMap, "Data type for field '%s' must be 'MAP' but was '%s'", field, fieldType);

        MapType map = (MapType) fieldType;
        DataType entryValueType = map.getValueType();
        boolean valueIsRow = entryValueType instanceof RowType;
        checkArgument(
                valueIsRow,
                "Value type of MAP for field '%s' must be ROW but was '%s'",
                field,
                entryValueType);

        RowType entryRow = (RowType) entryValueType;
        int arity = entryRow.getFieldCount();
        checkArgument(
                arity >= 2,
                "ROW type for field '%s' must have at least 2 fields, but found %s",
                field,
                arity);

        // Index 1 of the value row is the timestamp column.
        DataType timestampType = entryRow.getTypeAt(1);
        checkArgument(
                DataTypes.STRING().equals(timestampType),
                "The second field (timestamp) of ROW in field '%s' must be STRING, but was '%s'",
                field,
                timestampType);

        return new FieldMergeMapWithKeyTimeAgg(NAME, map);
    }

    /** Returns the identifier under which this factory is looked up. */
    @Override
    public String identifier() {
        return NAME;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastValueAggFactory
org.apache.paimon.mergetree.compact.aggregate.factory.FieldListaggAggFactory
org.apache.paimon.mergetree.compact.aggregate.factory.FieldMaxAggFactory
org.apache.paimon.mergetree.compact.aggregate.factory.FieldMergeMapAggFactory
org.apache.paimon.mergetree.compact.aggregate.factory.FieldMergeMapWithKeyTimeAggFactory
org.apache.paimon.mergetree.compact.aggregate.factory.FieldMinAggFactory
org.apache.paimon.mergetree.compact.aggregate.factory.FieldNestedUpdateAggFactory
org.apache.paimon.mergetree.compact.aggregate.factory.FieldNestedPartialUpdateAggFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@

import java.io.IOException;
import java.math.BigDecimal;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -1215,4 +1216,94 @@ private Map<Object, Object> toJavaMap(Object data) {
}
return result;
}

/** Checks that merging keeps, for each key, the row carrying the greatest timestamp string. */
@Test
public void testFieldMergeMapWithKeyTimeAgg() {
    MapType type =
            DataTypes.MAP(
                    DataTypes.STRING(),
                    DataTypes.ROW(
                            DataTypes.FIELD(0, "actual_value", DataTypes.STRING()),
                            DataTypes.FIELD(1, "dbsync_ts", DataTypes.STRING())));
    FieldMergeMapWithKeyTimeAgg aggregator = new FieldMergeMapWithKeyTimeAgg("test", type);

    GenericMap first =
            createTestMap(
                    createEntry("key1", "A", "17682882903686900100"),
                    createEntry("key2", "B", "17682882903686900100"));
    GenericMap second =
            createTestMap(
                    createEntry("key1", "A1", "17682882903686900200"),
                    createEntry("key3", "C", "17682882903686900200"));
    GenericMap third = createTestMap(createEntry("key2", "B2", "17682882903686900050"));

    // With no accumulator yet, the first input is taken over unchanged.
    Object merged = aggregator.agg(null, first);
    assertTestMap(merged, createExpectedEntry("key1", "A"), createExpectedEntry("key2", "B"));

    // key1 has a later timestamp and is replaced; key3 is new and is added.
    merged = aggregator.agg(merged, second);
    assertTestMap(
            merged,
            createExpectedEntry("key1", "A1"),
            createExpectedEntry("key2", "B"),
            createExpectedEntry("key3", "C"));

    // key2 arrives with an earlier timestamp, so the accumulated row is kept.
    merged = aggregator.agg(merged, third);
    assertTestMap(
            merged,
            createExpectedEntry("key1", "A1"),
            createExpectedEntry("key2", "B"),
            createExpectedEntry("key3", "C"));
}

/**
 * Builds one map entry whose key is {@code key} and whose value is a two-field row of
 * ({@code value}, {@code ts}); a {@code null} argument becomes a null row field.
 */
private Map.Entry<BinaryString, InternalRow> createEntry(String key, String value, String ts) {
    BinaryString valueField = null;
    if (value != null) {
        valueField = BinaryString.fromString(value);
    }

    BinaryString timestampField = null;
    if (ts != null) {
        timestampField = BinaryString.fromString(ts);
    }

    InternalRow row = GenericRow.of(valueField, timestampField);
    return new AbstractMap.SimpleEntry<>(BinaryString.fromString(key), row);
}

/** Pairs a key with the value string expected for it in the merged map. */
private Map.Entry<String, String> createExpectedEntry(String key, String value) {
    Map.Entry<String, String> pair = new AbstractMap.SimpleEntry<>(key, value);
    return pair;
}

/** Collects the given entries into a {@link GenericMap}; a later duplicate key overwrites. */
@SafeVarargs
private final GenericMap createTestMap(Map.Entry<BinaryString, InternalRow>... entries) {
    Map<BinaryString, InternalRow> backing = new HashMap<>();
    for (int idx = 0; idx < entries.length; idx++) {
        Map.Entry<BinaryString, InternalRow> current = entries[idx];
        backing.put(current.getKey(), current.getValue());
    }
    return new GenericMap(backing);
}

/**
 * Asserts that {@code mapObj}, read as a map from string keys to two-field rows, contains exactly
 * the expected key to first-row-field pairs, in any order.
 */
@SafeVarargs
private final void assertTestMap(Object mapObj, Map.Entry<String, String>... expected) {
    InternalMap internal = (InternalMap) mapObj;
    InternalArray keys = internal.keyArray();
    InternalArray rows = internal.valueArray();

    // Flatten the internal map into plain Java strings: key -> first field of the value row.
    Map<String, String> found = new HashMap<>();
    int entryCount = internal.size();
    for (int idx = 0; idx < entryCount; idx++) {
        String key = keys.getString(idx).toString();
        InternalRow row = rows.getRow(idx, 2);

        String value;
        if (row.isNullAt(0)) {
            value = null;
        } else {
            BinaryString raw = row.getString(0);
            value = raw == null ? null : raw.toString();
        }
        found.put(key, value);
    }

    Map<String, String> wanted = new HashMap<>();
    for (Map.Entry<String, String> pair : expected) {
        wanted.put(pair.getKey(), pair.getValue());
    }

    // Assert that both maps hold the same entries.
    assertThat(found).containsExactlyInAnyOrderEntriesOf(wanted);
}
}