Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public final class AggFunction implements Serializable {
parameters == null || parameters.isEmpty()
? Collections.emptyMap()
: Collections.unmodifiableMap(new HashMap<>(parameters));
validate();
}

/**
Expand Down Expand Up @@ -94,6 +95,28 @@ public String getParameter(String key) {
return parameters.get(key);
}

/**
 * Reads the parameter stored under {@code key} and interprets it as a strict boolean.
 *
 * <p>Only the literals {@code "true"} and {@code "false"} are accepted, compared
 * case-insensitively. Unlike {@link Boolean#parseBoolean(String)}, any other text is rejected
 * instead of being silently treated as {@code false}.
 *
 * @param key the parameter key to look up
 * @param defaultValue the value to return when no parameter is stored under {@code key}
 * @return {@code true} or {@code false} according to the stored text, or {@code defaultValue}
 *     if the parameter is absent
 * @throws IllegalArgumentException if the stored value is neither "true" nor "false"
 */
public boolean getBooleanParameter(String key, boolean defaultValue) {
    final String raw = parameters.get(key);
    if (raw == null) {
        // Parameter not configured: fall back to the caller-supplied default.
        return defaultValue;
    }
    if ("true".equalsIgnoreCase(raw)) {
        return true;
    }
    if ("false".equalsIgnoreCase(raw)) {
        return false;
    }
    final String message =
            String.format(
                    "Parameter '%s' for aggregation function '%s' must be 'true' or 'false'",
                    key, type);
    throw new IllegalArgumentException(message);
}

/**
* Checks if this function has any parameters.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,18 @@ public enum AggFunctionType {

// Boolean aggregation
BOOL_AND,
BOOL_OR;
BOOL_OR,

// Collection aggregation
COLLECT,
MERGE_MAP;

/** Parameter name for delimiter used in LISTAGG and STRING_AGG functions. */
public static final String PARAM_DELIMITER = "delimiter";

/** Parameter name for distinct flag used in COLLECT function. */
public static final String PARAM_DISTINCT = "distinct";

/**
* Returns the set of supported parameter names for this aggregation function.
*
Expand All @@ -65,6 +72,9 @@ public Set<String> getSupportedParameters() {
case STRING_AGG:
// LISTAGG and STRING_AGG support optional "delimiter" parameter
return Collections.singleton(PARAM_DELIMITER);
case COLLECT:
// COLLECT supports optional "distinct" parameter
return Collections.singleton(PARAM_DISTINCT);
default:
// All other functions do not accept parameters
return Collections.emptySet();
Expand Down Expand Up @@ -105,6 +115,18 @@ public void validateParameter(String parameterName, String parameterValue) {
}
}
break;
case COLLECT:
if (PARAM_DISTINCT.equals(parameterName)) {
if (parameterValue == null
|| (!parameterValue.equalsIgnoreCase("true")
&& !parameterValue.equalsIgnoreCase("false"))) {
throw new IllegalArgumentException(
String.format(
"Parameter '%s' for aggregation function '%s' must be 'true' or 'false'",
parameterName, this));
}
}
break;
default:
// No validation needed for other functions (they don't have parameters)
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,56 @@ public static AggFunction BOOL_OR() {
return new AggFunction(AggFunctionType.BOOL_OR, null);
}

// ===================================================================================
// Collection Aggregation Functions
// ===================================================================================

/**
 * Creates a parameterless COLLECT aggregation function, which collects multiple elements into
 * an array.
 *
 * <p>Supported data types: ARRAY&lt;T&gt;
 *
 * <p>Null handling: Null values are included in the collection
 *
 * @return a COLLECT aggregation function without any parameters
 */
public static AggFunction COLLECT() {
    // A null parameter map means "no parameters" for this function.
    final Map<String, String> noParameters = null;
    return new AggFunction(AggFunctionType.COLLECT, noParameters);
}

/**
 * Creates a COLLECT aggregation function with an explicit distinct setting.
 *
 * <p>Collects multiple elements into an array, optionally removing duplicates. The flag is
 * stored as the string {@code "true"} or {@code "false"} under the
 * {@link AggFunctionType#PARAM_DISTINCT} parameter key.
 *
 * <p>Supported data types: ARRAY&lt;T&gt;
 *
 * <p>Null handling: Null values are included in the collection
 *
 * @param distinct whether to remove duplicates
 * @return a COLLECT aggregation function carrying the distinct parameter
 */
public static AggFunction COLLECT(boolean distinct) {
    Map<String, String> params = new HashMap<>();
    // Use the shared constant rather than a string literal so this factory cannot drift from
    // the key that AggFunctionType declares as supported and validates for COLLECT.
    params.put(AggFunctionType.PARAM_DISTINCT, String.valueOf(distinct));
    return new AggFunction(AggFunctionType.COLLECT, params);
}

/**
 * Creates a parameterless MERGE_MAP aggregation function, which merges multiple maps.
 *
 * <p>Maps are merged by combining their key-value pairs. For duplicate keys, the latest value
 * wins.
 *
 * <p>Supported data types: MAP&lt;K, V&gt;
 *
 * <p>Null handling: Null keys are not allowed; null values are allowed
 *
 * @return a MERGE_MAP aggregation function without any parameters
 */
public static AggFunction MERGE_MAP() {
    // A null parameter map means "no parameters" for this function.
    final Map<String, String> noParameters = null;
    return new AggFunction(AggFunctionType.MERGE_MAP, noParameters);
}

// ===================================================================================
// Internal Factory Methods
// ===================================================================================
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.row;

import org.apache.fluss.row.BinaryRow.BinaryRowFormat;
import org.apache.fluss.row.array.AlignedArray;
import org.apache.fluss.row.array.CompactedArray;
import org.apache.fluss.row.array.IndexedArray;
import org.apache.fluss.row.serializer.ArraySerializer;
import org.apache.fluss.types.DataType;

/**
 * Static helpers for converting and concatenating {@link InternalArray} instances.
 *
 * <p>This class only hosts static methods and cannot be instantiated.
 */
public final class InternalArrayUtils {

    private InternalArrayUtils() {}

    /**
     * Copies the elements of {@code array} into a newly allocated {@code Object[]}.
     *
     * <p>Elements are read through an {@link InternalArray.ElementGetter} created for {@code
     * elementType}. An element that is a {@link BinaryString} is replaced by the result of its
     * {@code toString()}; every other element is stored exactly as returned by the getter.
     *
     * @param array the array to read from
     * @param elementType the data type used to create the element getter
     * @return a new object array with one entry per element of {@code array}
     */
    public static Object[] toObjectArray(InternalArray array, DataType elementType) {
        final int length = array.size();
        final Object[] objects = new Object[length];
        final InternalArray.ElementGetter getter = InternalArray.createElementGetter(elementType);
        for (int idx = 0; idx < length; idx++) {
            final Object value = getter.getElementOrNull(array, idx);
            objects[idx] = (value instanceof BinaryString) ? value.toString() : value;
        }
        return objects;
    }

    /**
     * Converts {@code array} to a {@link BinaryArray} by delegating to an {@link
     * ArraySerializer} built for the given element type and row format.
     *
     * @param array the array to convert
     * @param elementType the data type of the array elements
     * @param rowFormat the binary row format handed to the serializer
     * @return whatever {@link ArraySerializer#toBinaryArray} returns for {@code array}
     */
    public static BinaryArray toBinaryArray(
            InternalArray array, DataType elementType, BinaryRowFormat rowFormat) {
        return new ArraySerializer(elementType, rowFormat).toBinaryArray(array);
    }

    /**
     * Concatenates {@code left} and {@code right} into one {@link BinaryArray}, left elements
     * first.
     *
     * <p>Short-circuits, evaluated in this order:
     *
     * <ul>
     *   <li>both inputs {@code null}: returns {@code null};
     *   <li>exactly one input {@code null}: returns the other input converted via {@link
     *       #toBinaryArray};
     *   <li>{@code left} empty: returns {@code right} converted via {@link #toBinaryArray};
     *   <li>{@code right} empty: returns {@code left} converted via {@link #toBinaryArray}.
     * </ul>
     *
     * <p>Otherwise a new array matching {@code rowFormat} is written element by element; null
     * elements of either input are written as nulls at the corresponding position.
     *
     * @param left the array whose elements come first, may be {@code null}
     * @param right the array whose elements come last, may be {@code null}
     * @param elementType the data type of the elements of both arrays
     * @param rowFormat the binary row format selecting the concrete result array class
     * @return the concatenated array, or {@code null} if both inputs are {@code null}
     * @throws IllegalArgumentException if {@code rowFormat} is not one of the handled formats
     */
    public static BinaryArray concatToBinaryArray(
            InternalArray left,
            InternalArray right,
            DataType elementType,
            BinaryRowFormat rowFormat) {
        if (left == null) {
            return right == null ? null : toBinaryArray(right, elementType, rowFormat);
        }
        if (right == null) {
            return toBinaryArray(left, elementType, rowFormat);
        }

        final int leftSize = left.size();
        if (leftSize == 0) {
            return toBinaryArray(right, elementType, rowFormat);
        }
        final int rightSize = right.size();
        if (rightSize == 0) {
            return toBinaryArray(left, elementType, rowFormat);
        }

        final BinaryArray merged = newEmptyArray(elementType, rowFormat);
        final BinaryArrayWriter writer =
                new BinaryArrayWriter(
                        merged,
                        leftSize + rightSize,
                        BinaryArray.calculateFixLengthPartSize(elementType));
        final InternalArray.ElementGetter getter = InternalArray.createElementGetter(elementType);
        final BinaryWriter.ValueWriter valueWriter =
                BinaryWriter.createValueWriter(elementType, rowFormat);

        // Left elements occupy positions [0, leftSize); right elements follow directly after.
        appendElements(left, leftSize, 0, elementType, getter, valueWriter, writer);
        appendElements(right, rightSize, leftSize, elementType, getter, valueWriter, writer);

        writer.complete();
        return merged;
    }

    /** Instantiates the empty {@link BinaryArray} implementation matching {@code rowFormat}. */
    private static BinaryArray newEmptyArray(DataType elementType, BinaryRowFormat rowFormat) {
        switch (rowFormat) {
            case COMPACTED:
                return new CompactedArray(elementType);
            case INDEXED:
                return new IndexedArray(elementType);
            case ALIGNED:
                return new AlignedArray();
            default:
                throw new IllegalArgumentException("Unsupported row format: " + rowFormat);
        }
    }

    /**
     * Writes the first {@code count} elements of {@code source} into {@code writer}, starting at
     * position {@code offset} and preserving null elements.
     */
    private static void appendElements(
            InternalArray source,
            int count,
            int offset,
            DataType elementType,
            InternalArray.ElementGetter getter,
            BinaryWriter.ValueWriter valueWriter,
            BinaryArrayWriter writer) {
        for (int i = 0; i < count; i++) {
            final int target = offset + i;
            if (source.isNullAt(i)) {
                writer.setNullAt(target, elementType);
            } else {
                valueWriter.writeValue(writer, target, getter.getElementOrNull(source, i));
            }
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.row;

import org.apache.fluss.types.DataType;

import java.util.HashMap;
import java.util.Map;

/**
 * Static helpers for working with {@link InternalMap} instances.
 *
 * <p>This class only hosts static methods and cannot be instantiated.
 */
public final class InternalMapUtils {

    private InternalMapUtils() {}

    /**
     * Copies the entries of {@code map} into a new {@link HashMap}.
     *
     * <p>Keys and values are read pairwise, by index, from the map's key array and value array
     * using element getters created for {@code keyType} and {@code valueType}. Any key or value
     * that is a {@link BinaryString} is replaced by the result of its {@code toString()}. If
     * two entries yield equal keys, the later entry overwrites the earlier one.
     *
     * @param map the internal map to convert
     * @param keyType the data type used to create the key getter
     * @param valueType the data type used to create the value getter
     * @return a new mutable map holding the converted entries
     */
    public static Map<Object, Object> toJavaMap(
            InternalMap map, DataType keyType, DataType valueType) {
        final InternalArray keys = map.keyArray();
        final InternalArray values = map.valueArray();
        final InternalArray.ElementGetter keyReader = InternalArray.createElementGetter(keyType);
        final InternalArray.ElementGetter valueReader =
                InternalArray.createElementGetter(valueType);

        final Map<Object, Object> javaMap = new HashMap<>();
        final int entryCount = keys.size();
        for (int pos = 0; pos < entryCount; pos++) {
            final Object rawKey = keyReader.getElementOrNull(keys, pos);
            final Object rawValue = valueReader.getElementOrNull(values, pos);
            javaMap.put(unwrapBinaryString(rawKey), unwrapBinaryString(rawValue));
        }
        return javaMap;
    }

    /** Returns {@code element.toString()} for a {@link BinaryString}, else the element as is. */
    private static Object unwrapBinaryString(Object element) {
        return (element instanceof BinaryString) ? element.toString() : element;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.row;

import org.apache.fluss.row.BinaryRow.BinaryRowFormat;
import org.apache.fluss.row.array.CompactedArray;
import org.apache.fluss.types.DataTypes;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

/** Unit tests for the conversion and concatenation helpers in {@link InternalArrayUtils}. */
public class InternalArrayUtilsTest {

    /** {@link BinaryString} elements must come back as their {@code toString()} values. */
    @Test
    public void testToObjectArrayConvertsBinaryString() {
        Object[] binaryStrings = {BinaryString.fromString("a"), BinaryString.fromString("b")};

        Object[] converted =
                InternalArrayUtils.toObjectArray(
                        new GenericArray(binaryStrings), DataTypes.STRING());

        assertThat(converted).containsExactly("a", "b");
    }

    /** Two non-empty arrays are joined, left first, into a {@link CompactedArray}. */
    @Test
    public void testConcatToBinaryArrayCompacted() {
        BinaryArray concatenated =
                InternalArrayUtils.concatToBinaryArray(
                        GenericArray.of(1, 2),
                        GenericArray.of(3),
                        DataTypes.INT(),
                        BinaryRowFormat.COMPACTED);

        assertThat(concatenated).isInstanceOf(CompactedArray.class);
        Integer[] elements = (Integer[]) concatenated.toObjectArray(DataTypes.INT());
        assertThat(elements).containsExactly(1, 2, 3);
    }

    /** An empty left side yields just the elements of the right side. */
    @Test
    public void testConcatToBinaryArrayEmptyLeft() {
        InternalArray emptyLeft = new GenericArray(new Object[0]);
        InternalArray singleRight = GenericArray.of(7);

        BinaryArray concatenated =
                InternalArrayUtils.concatToBinaryArray(
                        emptyLeft, singleRight, DataTypes.INT(), BinaryRowFormat.COMPACTED);

        Integer[] elements = (Integer[]) concatenated.toObjectArray(DataTypes.INT());
        assertThat(elements).containsExactly(7);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.fluss.metadata.AggFunctions;
import org.apache.fluss.metadata.KvFormat;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.row.BinaryRow.BinaryRowFormat;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.encode.RowEncoder;
import org.apache.fluss.server.kv.rowmerger.aggregate.factory.FieldAggregatorFactory;
Expand Down Expand Up @@ -220,6 +221,13 @@ public static AggregationContext create(Schema schema, KvFormat kvFormat) {

// Create aggregators
FieldAggregator[] aggregators = createAggregators(schema);
BinaryRowFormat rowFormat =
kvFormat == KvFormat.COMPACTED
? BinaryRowFormat.COMPACTED
: BinaryRowFormat.INDEXED;
for (FieldAggregator aggregator : aggregators) {
aggregator.setRowFormat(rowFormat);
}

// Create row encoder
RowEncoder rowEncoder = RowEncoder.create(kvFormat, rowType);
Expand Down
Loading
Loading