diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunction.java b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunction.java
index 996b3d3e3c..92180006fc 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunction.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunction.java
@@ -63,6 +63,7 @@ public final class AggFunction implements Serializable {
                 parameters == null || parameters.isEmpty()
                         ? Collections.emptyMap()
                         : Collections.unmodifiableMap(new HashMap<>(parameters));
+        validate();
     }
 
     /**
@@ -94,6 +95,31 @@ public String getParameter(String key) {
         return parameters.get(key);
     }
 
+    /**
+     * Reads the parameter stored under {@code key} as a strictly parsed boolean.
+     *
+     * <p>Only the literals "true" and "false" are accepted, ignoring case.
+     *
+     * @param key name of the parameter to look up
+     * @param defaultValue value to return when no such parameter is set
+     * @return the parsed flag, or {@code defaultValue} if the parameter is absent
+     * @throws IllegalArgumentException if the stored value is neither "true" nor "false"
+     */
+    public boolean getBooleanParameter(String key, boolean defaultValue) {
+        String raw = parameters.get(key);
+        if (raw == null) {
+            return defaultValue;
+        }
+        boolean parsedTrue = raw.equalsIgnoreCase("true");
+        if (parsedTrue || raw.equalsIgnoreCase("false")) {
+            return parsedTrue;
+        }
+        throw new IllegalArgumentException(
+                String.format(
+                        "Parameter '%s' for aggregation function '%s' must be 'true' or 'false'",
+                        key, type));
+    }
+
     /**
      * Checks if this function has any parameters.
*
diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctionType.java b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctionType.java
index 9cf148ad2e..467d2190c1 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctionType.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctionType.java
@@ -49,11 +49,18 @@ public enum AggFunctionType {
 
     // Boolean aggregation
     BOOL_AND,
-    BOOL_OR;
+    BOOL_OR,
+
+    // Collection aggregation
+    COLLECT,
+    MERGE_MAP;
 
     /** Parameter name for delimiter used in LISTAGG and STRING_AGG functions. */
     public static final String PARAM_DELIMITER = "delimiter";
 
+    /** Parameter name for distinct flag used in COLLECT function. */
+    public static final String PARAM_DISTINCT = "distinct";
+
     /**
      * Returns the set of supported parameter names for this aggregation function.
      *
@@ -65,6 +72,9 @@ public Set<String> getSupportedParameters() {
             case STRING_AGG:
                 // LISTAGG and STRING_AGG support optional "delimiter" parameter
                 return Collections.singleton(PARAM_DELIMITER);
+            case COLLECT:
+                // COLLECT supports optional "distinct" parameter
+                return Collections.singleton(PARAM_DISTINCT);
             default:
                 // All other functions do not accept parameters
                 return Collections.emptySet();
@@ -105,6 +115,16 @@ public void validateParameter(String parameterName, String parameterValue) {
                     }
                 }
                 break;
+            case COLLECT:
+                // "distinct" must be a boolean literal; the literal-first comparison also rejects null.
+                if (PARAM_DISTINCT.equals(parameterName)
+                        && !"true".equalsIgnoreCase(parameterValue)
+                        && !"false".equalsIgnoreCase(parameterValue)) {
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "Parameter '%s' for aggregation function '%s' must be 'true' or 'false'",
+                                    parameterName, this));
+                }
+                break;
             default:
                 // No validation needed for other functions (they don't have parameters)
                 break;
diff --git a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctions.java b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctions.java
index
7cc7024d0f..bd2c94f05b 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/AggFunctions.java
@@ -271,6 +271,56 @@ public static AggFunction BOOL_OR() {
         return new AggFunction(AggFunctionType.BOOL_OR, null);
     }
 
+    // ===================================================================================
+    // Collection Aggregation Functions
+    // ===================================================================================
+
+    /**
+     * Creates a COLLECT aggregation function that collects multiple elements into an array.
+     *
+     * <p>Supported data types: ARRAY&lt;T&gt;
+     *
+     * <p>Null handling: Null values are included in the collection
+     *
+     * @return a COLLECT aggregation function
+     */
+    public static AggFunction COLLECT() {
+        return new AggFunction(AggFunctionType.COLLECT, null);
+    }
+
+    /**
+     * Creates a COLLECT aggregation function with distinct behavior.
+     *
+     * <p>Collects multiple elements into an array, optionally removing duplicates.
+     *
+     * <p>Supported data types: ARRAY&lt;T&gt;
+     *
+     * <p>Null handling: Null values are included in the collection
+     *
+     * @param distinct whether to remove duplicates
+     * @return a COLLECT aggregation function
+     */
+    public static AggFunction COLLECT(boolean distinct) {
+        Map<String, String> params = new HashMap<>();
+        params.put(AggFunctionType.PARAM_DISTINCT, String.valueOf(distinct));
+        return new AggFunction(AggFunctionType.COLLECT, params);
+    }
+
+    /**
+     * Creates a MERGE_MAP aggregation function that merges multiple maps.
+     *
+     * <p>Merges maps by combining their key-value pairs. For duplicate keys, the latest value wins.
+     *
+     * <p>Supported data types: MAP&lt;K, V&gt;
+     *
+     * <p>Null handling: Null keys are not allowed; null values are allowed
+     *
+     * @return a MERGE_MAP aggregation function
+     */
+    public static AggFunction MERGE_MAP() {
+        return new AggFunction(AggFunctionType.MERGE_MAP, null);
+    }
+
     // ===================================================================================
     // Internal Factory Methods
     // ===================================================================================
diff --git a/fluss-common/src/main/java/org/apache/fluss/row/InternalArrayUtils.java b/fluss-common/src/main/java/org/apache/fluss/row/InternalArrayUtils.java
new file mode 100644
index 0000000000..a90f319af1
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/row/InternalArrayUtils.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row;
+
+import org.apache.fluss.row.BinaryRow.BinaryRowFormat;
+import org.apache.fluss.row.array.AlignedArray;
+import org.apache.fluss.row.array.CompactedArray;
+import org.apache.fluss.row.array.IndexedArray;
+import org.apache.fluss.row.serializer.ArraySerializer;
+import org.apache.fluss.types.DataType;
+
+/** Utility methods for {@link InternalArray}.
*/
+public final class InternalArrayUtils {
+
+    private InternalArrayUtils() {}
+
+    /**
+     * Copies {@code array} into an {@code Object[]}, turning {@link BinaryString} elements into
+     * {@link String}; all other elements (including nulls) are copied as returned by the getter.
+     */
+    public static Object[] toObjectArray(InternalArray array, DataType elementType) {
+        Object[] copied = new Object[array.size()];
+        InternalArray.ElementGetter getter = InternalArray.createElementGetter(elementType);
+        for (int idx = 0; idx < array.size(); idx++) {
+            Object value = getter.getElementOrNull(array, idx);
+            copied[idx] = value instanceof BinaryString ? value.toString() : value;
+        }
+        return copied;
+    }
+
+    /**
+     * Converts {@code array} to a {@link BinaryArray} through an {@link ArraySerializer} created
+     * for the given element type and row format.
+     */
+    public static BinaryArray toBinaryArray(
+            InternalArray array, DataType elementType, BinaryRowFormat rowFormat) {
+        return new ArraySerializer(elementType, rowFormat).toBinaryArray(array);
+    }
+
+    /**
+     * Concatenates two arrays into one binary array, {@code left} elements first.
+     *
+     * <p>Returns {@code null} only if both inputs are {@code null}; if one input is {@code null}
+     * or empty, the other one is converted on its own.
+     */
+    public static BinaryArray concatToBinaryArray(
+            InternalArray left,
+            InternalArray right,
+            DataType elementType,
+            BinaryRowFormat rowFormat) {
+        if (left == null) {
+            return right == null ? null : toBinaryArray(right, elementType, rowFormat);
+        }
+        if (right == null) {
+            return toBinaryArray(left, elementType, rowFormat);
+        }
+        if (left.size() == 0) {
+            return toBinaryArray(right, elementType, rowFormat);
+        }
+        if (right.size() == 0) {
+            return toBinaryArray(left, elementType, rowFormat);
+        }
+
+        int leftSize = left.size();
+        int totalSize = leftSize + right.size();
+        BinaryArray result = newBinaryArray(elementType, rowFormat);
+        BinaryArrayWriter writer =
+                new BinaryArrayWriter(
+                        result, totalSize, BinaryArray.calculateFixLengthPartSize(elementType));
+        InternalArray.ElementGetter getter = InternalArray.createElementGetter(elementType);
+        BinaryWriter.ValueWriter valueWriter =
+                BinaryWriter.createValueWriter(elementType, rowFormat);
+        for (int pos = 0; pos < totalSize; pos++) {
+            InternalArray source = pos < leftSize ? left : right;
+            int idx = pos < leftSize ? pos : pos - leftSize;
+            if (source.isNullAt(idx)) {
+                writer.setNullAt(pos, elementType);
+            } else {
+                valueWriter.writeValue(writer, pos, getter.getElementOrNull(source, idx));
+            }
+        }
+        writer.complete();
+        return result;
+    }
+
+    /** Creates the empty binary array implementation that matches {@code rowFormat}. */
+    private static BinaryArray newBinaryArray(DataType elementType, BinaryRowFormat rowFormat) {
+        switch (rowFormat) {
+            case COMPACTED:
+                return new CompactedArray(elementType);
+            case INDEXED:
+                return new IndexedArray(elementType);
+            case ALIGNED:
+                return new AlignedArray();
+            default:
+                throw new IllegalArgumentException("Unsupported row format: " + rowFormat);
+        }
+    }
+}
diff --git a/fluss-common/src/main/java/org/apache/fluss/row/InternalMapUtils.java b/fluss-common/src/main/java/org/apache/fluss/row/InternalMapUtils.java
new file mode 100644
index 0000000000..da98bd21cf
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/row/InternalMapUtils.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row;
+
+import org.apache.fluss.types.DataType;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Utility methods for {@link InternalMap}.
*/
+public final class InternalMapUtils {
+
+    private InternalMapUtils() {}
+
+    /**
+     * Copies the entries of {@code map} into a {@link HashMap}; {@link BinaryString} keys and
+     * values are converted to {@link String}, and a repeated key keeps its last value.
+     */
+    public static Map<Object, Object> toJavaMap(
+            InternalMap map, DataType keyType, DataType valueType) {
+        Map<Object, Object> result = new HashMap<>();
+        InternalArray keyArray = map.keyArray();
+        InternalArray valueArray = map.valueArray();
+        InternalArray.ElementGetter keyGetter = InternalArray.createElementGetter(keyType);
+        InternalArray.ElementGetter valueGetter = InternalArray.createElementGetter(valueType);
+        for (int i = 0; i < keyArray.size(); i++) {
+            Object key = keyGetter.getElementOrNull(keyArray, i);
+            Object value = valueGetter.getElementOrNull(valueArray, i);
+            result.put(
+                    key instanceof BinaryString ? key.toString() : key,
+                    value instanceof BinaryString ? value.toString() : value);
+        }
+        return result;
+    }
+}
diff --git a/fluss-common/src/test/java/org/apache/fluss/row/InternalArrayUtilsTest.java b/fluss-common/src/test/java/org/apache/fluss/row/InternalArrayUtilsTest.java
new file mode 100644
index 0000000000..c22b35fa18
--- /dev/null
+++ b/fluss-common/src/test/java/org/apache/fluss/row/InternalArrayUtilsTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.row;
+
+import org.apache.fluss.row.BinaryRow.BinaryRowFormat;
+import org.apache.fluss.row.array.CompactedArray;
+import org.apache.fluss.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link InternalArrayUtils}. */
+public class InternalArrayUtilsTest {
+
+    @Test
+    public void testToObjectArrayConvertsBinaryString() {
+        Object[] binaryStrings = {BinaryString.fromString("a"), BinaryString.fromString("b")};
+        GenericArray source = new GenericArray(binaryStrings);
+
+        Object[] converted = InternalArrayUtils.toObjectArray(source, DataTypes.STRING());
+
+        // BinaryString elements are expected back as java.lang.String.
+        assertThat(converted).containsExactly("a", "b");
+    }
+
+    @Test
+    public void testConcatToBinaryArrayCompacted() {
+        InternalArray head = GenericArray.of(1, 2);
+        InternalArray tail = GenericArray.of(3);
+
+        BinaryArray joined =
+                InternalArrayUtils.concatToBinaryArray(
+                        head, tail, DataTypes.INT(), BinaryRowFormat.COMPACTED);
+
+        assertThat(joined).isInstanceOf(CompactedArray.class);
+        assertThat((Integer[]) joined.toObjectArray(DataTypes.INT())).containsExactly(1, 2, 3);
+    }
+
+    @Test
+    public void testConcatToBinaryArrayEmptyLeft() {
+        InternalArray head = new GenericArray(new Object[0]);
+        InternalArray tail = GenericArray.of(7);
+
+        BinaryArray joined =
+                InternalArrayUtils.concatToBinaryArray(
+                        head, tail, DataTypes.INT(), BinaryRowFormat.COMPACTED);
+
+        assertThat((Integer[]) joined.toObjectArray(DataTypes.INT())).containsExactly(7);
+    }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/AggregationContext.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/AggregationContext.java
index 7cec63a4e8..d4350a85f2 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/AggregationContext.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/AggregationContext.java
@@ -23,6 +23,7 @@ import
org.apache.fluss.metadata.AggFunctions;
 import org.apache.fluss.metadata.KvFormat;
 import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.row.BinaryRow.BinaryRowFormat;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.encode.RowEncoder;
 import org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldAggregatorFactory;
@@ -220,6 +221,14 @@ public static AggregationContext create(Schema schema, KvFormat kvFormat) {
         // Create aggregators
         FieldAggregator[] aggregators = createAggregators(schema);
 
+        // Tell every aggregator which binary row format corresponds to the table's KV format.
+        BinaryRowFormat rowFormat = BinaryRowFormat.INDEXED;
+        if (kvFormat == KvFormat.COMPACTED) {
+            rowFormat = BinaryRowFormat.COMPACTED;
+        }
+        for (FieldAggregator aggregator : aggregators) {
+            aggregator.setRowFormat(rowFormat);
+        }
         // Create row encoder
         RowEncoder rowEncoder = RowEncoder.create(kvFormat, rowType);
 
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldCollectAggFactory.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldCollectAggFactory.java
new file mode 100644
index 0000000000..91e430e653
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldCollectAggFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.rowmerger.aggregate.factory;
+
+/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+import org.apache.fluss.metadata.AggFunction;
+import org.apache.fluss.metadata.AggFunctionType;
+import org.apache.fluss.server.kv.rowmerger.aggregate.functions.FieldCollectAgg;
+import org.apache.fluss.types.ArrayType;
+import org.apache.fluss.types.DataType;
+
+import static org.apache.fluss.utils.Preconditions.checkArgument;
+
+/** Factory for {@link FieldCollectAgg}. */
+public class FieldCollectAggFactory implements FieldAggregatorFactory {
+
+    @Override
+    public FieldCollectAgg create(DataType fieldType, AggFunction aggFunction) {
+        checkArgument(
+                fieldType instanceof ArrayType,
+                "Data type for collect column must be 'ArrayType' but was '%s'.",
+                fieldType);
+        ArrayType arrayType = (ArrayType) fieldType;
+
+        // The "distinct" parameter is optional; duplicates are kept when it is not set.
+        boolean dedup = aggFunction.getBooleanParameter(AggFunctionType.PARAM_DISTINCT, false);
+        return new FieldCollectAgg(arrayType, dedup);
+    }
+
+    @Override
+    public String identifier() {
+        return AggFunctionType.COLLECT.toString();
+    }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldMergeMapAggFactory.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldMergeMapAggFactory.java
new file mode 100644
index 0000000000..f4dfd20ef8
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/factory/FieldMergeMapAggFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.kv.rowmerger.aggregate.factory; + +/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +import org.apache.fluss.metadata.AggFunction; +import org.apache.fluss.metadata.AggFunctionType; +import org.apache.fluss.server.kv.rowmerger.aggregate.functions.FieldMergeMapAgg; +import org.apache.fluss.types.DataType; +import org.apache.fluss.types.MapType; + +import static org.apache.fluss.utils.Preconditions.checkArgument; + +/** Factory for {@link FieldMergeMapAgg}. 
*/
+public class FieldMergeMapAggFactory implements FieldAggregatorFactory {
+
+    @Override
+    public FieldMergeMapAgg create(DataType fieldType, AggFunction aggFunction) {
+        checkArgument(
+                fieldType instanceof MapType,
+                "Data type for merge_map column must be 'MapType' but was '%s'.",
+                fieldType);
+        MapType mapType = (MapType) fieldType;
+        return new FieldMergeMapAgg(mapType);
+    }
+
+    @Override
+    public String identifier() {
+        return AggFunctionType.MERGE_MAP.toString();
+    }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/functions/FieldAggregator.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/functions/FieldAggregator.java
index f6ec77a494..a50ed7d60e 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/functions/FieldAggregator.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/functions/FieldAggregator.java
@@ -22,6 +22,7 @@
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
  * additional information regarding copyright ownership. */
 
+import org.apache.fluss.row.BinaryRow.BinaryRowFormat;
 import org.apache.fluss.types.DataType;
 import org.apache.fluss.types.DataTypeRoot;
 
@@ -34,6 +35,7 @@ public abstract class FieldAggregator implements Serializable {
 
     protected final DataType fieldType;
     protected final DataTypeRoot typeRoot;
+    protected BinaryRowFormat rowFormat = BinaryRowFormat.COMPACTED;
 
     public FieldAggregator(DataType dataType) {
         this.fieldType = dataType;
@@ -61,6 +63,15 @@ public Object aggReversed(Object accumulator, Object inputField) {
         return agg(inputField, accumulator);
     }
 
+    /**
+     * Sets the binary row format that is available to subclasses through {@code rowFormat}.
+     *
+     * @param rowFormat the row format to use; the field holds COMPACTED until this is called
+     */
+    public void setRowFormat(BinaryRowFormat rowFormat) {
+        this.rowFormat = rowFormat;
+    }
+
     /** Resets the aggregator to a clean start state.
*/
     public void reset() {}
 }
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/functions/FieldCollectAgg.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/functions/FieldCollectAgg.java
new file mode 100644
index 0000000000..ddaaab22a3
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/functions/FieldCollectAgg.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.kv.rowmerger.aggregate.functions;
+
+/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+import org.apache.fluss.row.GenericArray;
+import org.apache.fluss.row.InternalArray;
+import org.apache.fluss.row.InternalArrayUtils;
+import org.apache.fluss.types.ArrayType;
+import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypeRoot;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Collect elements into an ARRAY.
+ *
+ * <p>Without {@code distinct}, the accumulator and the input are concatenated. With {@code
+ * distinct}, every element is kept once in order of first occurrence; {@code byte[]} elements
+ * are compared by content and at most one {@code null} element is kept.
+ */
+public class FieldCollectAgg extends FieldAggregator {
+
+    private static final long serialVersionUID = 1L;
+
+    private final boolean distinct;
+    private final DataType elementType;
+    private final DataTypeRoot elementTypeRoot;
+    private final InternalArray.ElementGetter elementGetter;
+
+    /**
+     * Creates a COLLECT aggregator for an array column.
+     *
+     * @param dataType array type of the aggregated column
+     * @param distinct whether duplicate elements are dropped from the result
+     */
+    public FieldCollectAgg(ArrayType dataType, boolean distinct) {
+        super(dataType);
+        this.distinct = distinct;
+        this.elementType = dataType.getElementType();
+        this.elementTypeRoot = dataType.getElementType().getTypeRoot();
+        this.elementGetter = InternalArray.createElementGetter(elementType);
+    }
+
+    /**
+     * Merges {@code inputField} into {@code accumulator}.
+     *
+     * @param accumulator the array collected so far, or {@code null}
+     * @param inputField the incoming array, or {@code null}
+     * @return {@code null} if both arguments are {@code null}, otherwise the collected array; in
+     *     distinct mode the result is de-duplicated even when only one side is present
+     */
+    @Override
+    public Object agg(Object accumulator, Object inputField) {
+        if (accumulator == null && inputField == null) {
+            return null;
+        }
+
+        if (!distinct) {
+            if (accumulator == null) {
+                return inputField;
+            }
+            if (inputField == null) {
+                return accumulator;
+            }
+            // No distinct: encode directly to binary to avoid extra heap allocations/copies.
+            return InternalArrayUtils.concatToBinaryArray(
+                    (InternalArray) accumulator, (InternalArray) inputField, elementType, rowFormat);
+        }
+
+        // Distinct: a lone side is de-duplicated as well, because a single array may itself
+        // contain repeated elements (e.g. the first value written for a key).
+        InternalArray accumulatorArray = (InternalArray) accumulator;
+        InternalArray inputArray = (InternalArray) inputField;
+        int expectedSize = sizeOf(accumulatorArray) + sizeOf(inputArray);
+        List<Object> collected = new ArrayList<>(expectedSize);
+        Set<Object> seen = new HashSet<>((int) (expectedSize / 0.75f) + 1);
+        collectDistinct(collected, seen, accumulatorArray);
+        collectDistinct(collected, seen, inputArray);
+        return new GenericArray(collected.toArray());
+    }
+
+    @Override
+    public Object aggReversed(Object accumulator, Object inputField) {
+        // For collect, order doesn't matter for distinct sets
+        // For non-distinct, we just append, so reversed is same as normal
+        // NOTE(review): unlike the base implementation this does not swap the arguments, so the
+        // accumulator's elements always come first; confirm no caller relies on element order.
+        return agg(accumulator, inputField);
+    }
+
+    /**
+     * Appends every element of {@code array} that has not been seen yet to {@code collected}.
+     *
+     * @param collected result elements in order of first occurrence
+     * @param seen keys of the elements collected so far; may contain {@code null}
+     * @param array source array, or {@code null} to collect nothing
+     */
+    private void collectDistinct(List<Object> collected, Set<Object> seen, InternalArray array) {
+        if (array == null) {
+            return;
+        }
+        boolean binaryElements =
+                elementTypeRoot == DataTypeRoot.BINARY || elementTypeRoot == DataTypeRoot.BYTES;
+        for (int i = 0; i < array.size(); i++) {
+            Object element = elementGetter.getElementOrNull(array, i);
+            // byte[] only has identity equals/hashCode, so wrap it to compare by content.
+            Object key =
+                    binaryElements && element != null
+                            ? new ByteArrayWrapper((byte[]) element)
+                            : element;
+            if (seen.add(key)) {
+                collected.add(element);
+            }
+        }
+    }
+
+    private static int sizeOf(InternalArray array) {
+        return array == null ? 0 : array.size();
+    }
+
+    /** Gives {@code byte[]} content-based {@code equals}/{@code hashCode} for use in a set. */
+    private static final class ByteArrayWrapper {
+        private final byte[] bytes;
+
+        private ByteArrayWrapper(byte[] bytes) {
+            this.bytes = bytes;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof ByteArrayWrapper)) {
+                return false;
+            }
+            ByteArrayWrapper other = (ByteArrayWrapper) obj;
+            return Arrays.equals(bytes, other.bytes);
+        }
+
+        @Override
+        public int hashCode() {
+            return Arrays.hashCode(bytes);
+        }
+    }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/functions/FieldMergeMapAgg.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/functions/FieldMergeMapAgg.java
new file mode 100644
index
0000000000..a378fdf852 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/rowmerger/aggregate/functions/FieldMergeMapAgg.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.kv.rowmerger.aggregate.functions; + +/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache + * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for + * additional information regarding copyright ownership. */ + +import org.apache.fluss.row.GenericMap; +import org.apache.fluss.row.InternalArray; +import org.apache.fluss.row.InternalMap; +import org.apache.fluss.types.MapType; + +import java.util.HashMap; +import java.util.Map; + +/** Merge two maps. 
*/
+public class FieldMergeMapAgg extends FieldAggregator {
+
+    private static final long serialVersionUID = 1L;
+    private final InternalArray.ElementGetter keyGetter;
+    private final InternalArray.ElementGetter valueGetter;
+
+    public FieldMergeMapAgg(MapType dataType) {
+        super(dataType);
+        this.keyGetter = InternalArray.createElementGetter(dataType.getKeyType());
+        this.valueGetter = InternalArray.createElementGetter(dataType.getValueType());
+    }
+
+    /** Merges both maps; for a key present in both, the value from {@code inputField} wins. */
+    @Override
+    public Object agg(Object accumulator, Object inputField) {
+        if (accumulator == null || inputField == null) {
+            return accumulator == null ? inputField : accumulator;
+        }
+        InternalMap accumulatorMap = (InternalMap) accumulator;
+        InternalMap inputMap = (InternalMap) inputField;
+        if (accumulatorMap.size() == 0) {
+            return inputField;
+        }
+        if (inputMap.size() == 0) {
+            return accumulator;
+        }
+
+        int expectedSize = accumulatorMap.size() + inputMap.size();
+        Map<Object, Object> resultMap = new HashMap<>((int) (expectedSize / 0.75f) + 1);
+        putToMap(resultMap, accumulatorMap);
+        putToMap(resultMap, inputMap);
+        return new GenericMap(resultMap);
+    }
+
+    /** Copies all entries of {@code mapData} into {@code map}, overwriting existing keys. */
+    private void putToMap(Map<Object, Object> map, InternalMap mapData) {
+        InternalArray keyArray = mapData.keyArray();
+        InternalArray valueArray = mapData.valueArray();
+        for (int i = 0; i < keyArray.size(); i++) {
+            Object key = keyGetter.getElementOrNull(keyArray, i);
+            Object value = valueGetter.getElementOrNull(valueArray, i);
+            map.put(key, value);
+        }
+    }
+}
diff --git a/fluss-server/src/main/resources/META-INF/services/org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldAggregatorFactory b/fluss-server/src/main/resources/META-INF/services/org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldAggregatorFactory
index f3a27da5f7..a4feec83f8 100644
--- a/fluss-server/src/main/resources/META-INF/services/org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldAggregatorFactory
+++
b/fluss-server/src/main/resources/META-INF/services/org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldAggregatorFactory @@ -28,3 +28,5 @@ org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldBoolAndAggFactory org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldBoolOrAggFactory org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldListaggAggFactory org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldStringAggFactory +org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldCollectAggFactory +org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldMergeMapAggFactory diff --git a/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/aggregate/FieldAggregatorParameterizedTest.java b/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/aggregate/FieldAggregatorParameterizedTest.java index 37a4a3d46a..62fe457ba5 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/aggregate/FieldAggregatorParameterizedTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/rowmerger/aggregate/FieldAggregatorParameterizedTest.java @@ -25,7 +25,14 @@ import org.apache.fluss.record.BinaryValue; import org.apache.fluss.record.TestingSchemaGetter; import org.apache.fluss.row.BinaryRow; +import org.apache.fluss.row.BinaryString; import org.apache.fluss.row.Decimal; +import org.apache.fluss.row.GenericArray; +import org.apache.fluss.row.GenericMap; +import org.apache.fluss.row.InternalArray; +import org.apache.fluss.row.InternalArrayUtils; +import org.apache.fluss.row.InternalMap; +import org.apache.fluss.row.InternalMapUtils; import org.apache.fluss.row.TimestampLtz; import org.apache.fluss.row.TimestampNtz; import org.apache.fluss.server.kv.rowmerger.AggregateRowMerger; @@ -44,6 +51,9 @@ import java.time.LocalDateTime; import java.time.LocalTime; import java.time.ZoneOffset; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import java.util.stream.Stream; import 
static org.apache.fluss.testutils.DataTestUtils.compactedRow; @@ -624,6 +634,140 @@ void testBoolOrWithNull() { assertThat(merged.row.getBoolean(1)).isFalse(); } + // =================================================================================== + // Collection Aggregation Tests + // =================================================================================== + + @Test + void testCollectAggregationNonDistinct() { + Schema schema = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column( + "values", + DataTypes.ARRAY(DataTypes.INT()), + AggFunctions.COLLECT(false)) + .primaryKey("id") + .build(); + + TableConfig tableConfig = new TableConfig(new Configuration()); + AggregateRowMerger merger = createMerger(schema, tableConfig); + + BinaryRow row1 = compactedRow(schema.getRowType(), new Object[] {1, GenericArray.of(1, 2)}); + BinaryRow row2 = + compactedRow(schema.getRowType(), new Object[] {1, GenericArray.of(2, null, 3)}); + + BinaryValue merged = merger.merge(toBinaryValue(row1), toBinaryValue(row2)); + InternalArray resultArray = merged.row.getArray(1); + Object[] actual = InternalArrayUtils.toObjectArray(resultArray, DataTypes.INT()); + assertThat(actual).containsExactly(1, 2, 2, null, 3); + } + + @Test + void testCollectAggregationDistinct() { + Schema schema = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column( + "values", + DataTypes.ARRAY(DataTypes.INT()), + AggFunctions.COLLECT(true)) + .primaryKey("id") + .build(); + + TableConfig tableConfig = new TableConfig(new Configuration()); + AggregateRowMerger merger = createMerger(schema, tableConfig); + + BinaryRow row1 = + compactedRow(schema.getRowType(), new Object[] {1, GenericArray.of(1, 2, null)}); + BinaryRow row2 = + compactedRow(schema.getRowType(), new Object[] {1, GenericArray.of(2, 3, null)}); + + BinaryValue merged = merger.merge(toBinaryValue(row1), toBinaryValue(row2)); + InternalArray resultArray = merged.row.getArray(1); + Object[] actual = 
InternalArrayUtils.toObjectArray(resultArray, DataTypes.INT()); + assertThat(actual).containsExactlyInAnyOrder(1, 2, 3, null); + } + + @Test + void testCollectAggregationDistinctBytes() { + Schema schema = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column( + "values", + DataTypes.ARRAY(DataTypes.BYTES()), + AggFunctions.COLLECT(true)) + .primaryKey("id") + .build(); + + TableConfig tableConfig = new TableConfig(new Configuration()); + AggregateRowMerger merger = createMerger(schema, tableConfig); + + byte[] v1 = new byte[] {1, 2}; + byte[] v2 = new byte[] {3}; + byte[] v3 = new byte[] {1, 2}; + byte[] v4 = new byte[] {4}; + + BinaryRow row1 = + compactedRow(schema.getRowType(), new Object[] {1, GenericArray.of(v1, v2)}); + BinaryRow row2 = + compactedRow(schema.getRowType(), new Object[] {1, GenericArray.of(v3, null, v4)}); + + BinaryValue merged = merger.merge(toBinaryValue(row1), toBinaryValue(row2)); + InternalArray resultArray = merged.row.getArray(1); + Object[] actual = InternalArrayUtils.toObjectArray(resultArray, DataTypes.BYTES()); + + // Null values are included in collect, distinct keeps a single null. 
+ assertThat(actual).hasSize(4); + assertThat(Arrays.toString((byte[]) actual[0])).isEqualTo(Arrays.toString(v1)); + assertThat(Arrays.toString((byte[]) actual[1])).isEqualTo(Arrays.toString(v2)); + assertThat(actual[2]).isNull(); + assertThat(Arrays.toString((byte[]) actual[3])).isEqualTo(Arrays.toString(v4)); + } + + @Test + void testMergeMapAggregation() { + Schema schema = + Schema.newBuilder() + .column("id", DataTypes.INT()) + .column( + "attributes", + DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()), + AggFunctions.MERGE_MAP()) + .primaryKey("id") + .build(); + + TableConfig tableConfig = new TableConfig(new Configuration()); + AggregateRowMerger merger = createMerger(schema, tableConfig); + + BinaryRow row1 = + compactedRow( + schema.getRowType(), + new Object[] { + 1, + GenericMap.of( + 1, + BinaryString.fromString("a"), + 2, + BinaryString.fromString("b")) + }); + BinaryRow row2 = + compactedRow( + schema.getRowType(), + new Object[] {1, GenericMap.of(2, BinaryString.fromString("c"), 3, null)}); + + BinaryValue merged = merger.merge(toBinaryValue(row1), toBinaryValue(row2)); + InternalMap resultMap = merged.row.getMap(1); + Map actual = + InternalMapUtils.toJavaMap(resultMap, DataTypes.INT(), DataTypes.STRING()); + Map expected = new HashMap<>(); + expected.put(1, "a"); + expected.put(2, "c"); + expected.put(3, null); + assertThat(actual).isEqualTo(expected); + } + // =================================================================================== // Listagg Aggregation Tests // =================================================================================== diff --git a/website/docs/table-design/merge-engines/aggregation.md b/website/docs/table-design/merge-engines/aggregation.md index c416c8a399..c609280b14 100644 --- a/website/docs/table-design/merge-engines/aggregation.md +++ b/website/docs/table-design/merge-engines/aggregation.md @@ -934,6 +934,113 @@ TableDescriptor.builder() +### collect + +Collects array elements into a single array. 
+ +- **Supported Data Types**: `ARRAY` +- **Behavior**: Concatenates array elements; with `distinct=true`, duplicates are removed +- **Null Handling**: Null elements are retained; null inputs are ignored +- **Parameters**: `distinct` (`true` or `false`, default is `false`) + +**Example:** + + + +```sql +CREATE TABLE test_collect ( + id BIGINT, + tags ARRAY<INT>, + PRIMARY KEY (id) NOT ENFORCED +) WITH ( + 'table.merge-engine' = 'aggregation', + 'fields.tags.agg' = 'collect', + 'fields.tags.distinct' = 'true' +); + +INSERT INTO test_collect VALUES + (1, ARRAY[1, 2, 2]), + (1, ARRAY[2, 3, NULL]); + +SELECT * FROM test_collect; +-- Result (order is not guaranteed with distinct): +-- (1, [1, 2, 3, NULL]) +``` + + + + +```java +Schema schema = Schema.newBuilder() + .column("id", DataTypes.BIGINT()) + .column("tags", DataTypes.ARRAY(DataTypes.INT()), AggFunctions.COLLECT(true)) + .primaryKey("id") + .build(); + +TableDescriptor.builder() + .schema(schema) + .property("table.merge-engine", "aggregation") + .build(); + +// Input: (1, [1, 2, 2]), (1, [2, 3, null]) +// Result: (1, [1, 2, 3, null]) -- order not guaranteed when distinct is true +``` + + + + +### merge_map + +Merges map values by combining key-value pairs. For duplicate keys, the latest value wins.
+ +- **Supported Data Types**: `MAP` +- **Behavior**: Merges maps, overriding earlier values for duplicate keys +- **Null Handling**: Null keys are not allowed; null values are allowed + +**Example:** + + + +```sql +CREATE TABLE test_merge_map ( + id BIGINT, + attributes MAP<INT, STRING>, + PRIMARY KEY (id) NOT ENFORCED +) WITH ( + 'table.merge-engine' = 'aggregation', + 'fields.attributes.agg' = 'merge_map' +); + +INSERT INTO test_merge_map VALUES + (1, MAP[1, 'a', 2, 'b']), + (1, MAP[2, 'c', 3, NULL]); + +SELECT * FROM test_merge_map; +-- Result: (1, {1='a', 2='c', 3=NULL}) +``` + + + + +```java +Schema schema = Schema.newBuilder() + .column("id", DataTypes.BIGINT()) + .column("attributes", DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()), AggFunctions.MERGE_MAP()) + .primaryKey("id") + .build(); + +TableDescriptor.builder() + .schema(schema) + .property("table.merge-engine", "aggregation") + .build(); + +// Input: (1, {1='a', 2='b'}), (1, {2='c', 3=null}) +// Result: (1, {1='a', 2='c', 3=null}) +``` + + + + ## Delete Behavior The aggregation merge engine provides limited support for delete operations. You can configure the behavior using the `'table.delete.behavior'` option: