Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,7 @@ set(DUCKDB_SRC_FILES
src/duckdb/ub_src_main.cpp
src/duckdb/ub_src_main_buffered_data.cpp
src/duckdb/ub_src_main_chunk_scan_state.cpp
src/duckdb/ub_src_main_profiler.cpp
src/duckdb/ub_src_main_relation.cpp
src/duckdb/ub_src_main_secret.cpp
src/duckdb/ub_src_main_settings.cpp
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@
#include "duckdb/common/vector_operations/vector_operations.hpp"
#include "duckdb/planner/expression/bound_aggregate_expression.hpp"
#include "duckdb/function/aggregate/distributive_function_utils.hpp"
#include "duckdb/function/aggregate_state_layout.hpp"
#include "duckdb/function/function_set.hpp"

namespace duckdb {

namespace {

//! Per-group state for what is presumably the product aggregate (name-based; see ProductReduce) - TODO confirm
//! NOTE(review): this span declares STATE_TYPE twice (StructStateType<bool, double> and
//! OptionalStateType<double>) and carries both an `empty` and an `is_set` flag. It looks like the
//! before/after lines of a change ended up side by side - confirm which set of members is intended.
struct ProductState {
//! Field names that pair with the StructStateType<bool, double> layout below
static constexpr const char *STATE_NAMES[] = {"empty", "val"};
using STATE_TYPE = StructStateType<bool, double>;
using STATE_TYPE = OptionalStateType<double>;

//! Flag paired with the "empty" entry of STATE_NAMES
bool empty;
//! The double payload of the state
double val;
//! Flag that presumably pairs with OptionalStateType<double> - TODO confirm
bool is_set;
};

struct ProductReduce {
Expand Down
14 changes: 7 additions & 7 deletions src/duckdb/extension/core_functions/aggregate/holistic/mad.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ template <typename MEDIAN_TYPE>
struct MedianAbsoluteDeviationOperation : QuantileOperation {
template <class T, class STATE>
static void Finalize(STATE &state, T &target, AggregateFinalizeData &finalize_data) {
if (state.v.empty()) {
if (state.linked_list.total_capacity == 0) {
finalize_data.ReturnNull();
return;
}
Expand All @@ -183,11 +183,12 @@ struct MedianAbsoluteDeviationOperation : QuantileOperation {
auto &bind_data = finalize_data.input.bind_data->Cast<QuantileBindData>();
D_ASSERT(bind_data.quantiles.size() == 1);
const auto &q = bind_data.quantiles[0];
QuantileInterpolator<false> interp(q, state.v.size(), false);
const auto med = interp.template Operation<INPUT_TYPE, MEDIAN_TYPE>(state.v.data(), finalize_data.result);
auto &flattened = FlattenedQuantileValues<INPUT_TYPE>::Flatten(finalize_data, state.linked_list);
QuantileInterpolator<false> interp(q, state.linked_list.total_capacity, false);
const auto med = interp.template Operation<INPUT_TYPE, MEDIAN_TYPE>(flattened.Data(), finalize_data.result);

MadAccessor<INPUT_TYPE, T, MEDIAN_TYPE> accessor(med);
target = interp.template Operation<INPUT_TYPE, T>(state.v.data(), finalize_data.result, accessor);
target = interp.template Operation<INPUT_TYPE, T>(flattened.Data(), finalize_data.result, accessor);
}

template <class STATE, class INPUT_TYPE, class RESULT_TYPE>
Expand Down Expand Up @@ -269,10 +270,9 @@ unique_ptr<FunctionData> BindMAD(BindAggregateFunctionInput &input) {
template <typename INPUT_TYPE, typename MEDIAN_TYPE, typename TARGET_TYPE>
AggregateFunction GetTypedMedianAbsoluteDeviationAggregateFunction(const LogicalType &input_type,
const LogicalType &target_type) {
using STATE = QuantileState<INPUT_TYPE, QuantileStandardType>;
using STATE = QuantileState<INPUT_TYPE>;
using OP = MedianAbsoluteDeviationOperation<MEDIAN_TYPE>;
auto fun = AggregateFunction::UnaryAggregateDestructor<STATE, INPUT_TYPE, TARGET_TYPE, OP,
AggregateDestructorType::LEGACY>(input_type, target_type);
auto fun = QuantileBufferingAggregate<STATE, TARGET_TYPE, OP>(input_type, target_type);
fun.SetBindCallback(BindMAD);
fun.SetOrderDependent(AggregateOrderDependent::NOT_ORDER_DEPENDENT);
#ifndef DUCKDB_SMALLER_BINARY
Expand Down
102 changes: 50 additions & 52 deletions src/duckdb/extension/core_functions/aggregate/holistic/quantile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
#include "duckdb/common/types/timestamp.hpp"
#include "duckdb/common/serializer/serializer.hpp"
#include "duckdb/common/serializer/deserializer.hpp"
#include "duckdb/function/aggregate/sort_key_helpers.hpp"
#include "duckdb/function/aggregate/list_aggregate.hpp"
#include "duckdb/function/create_sort_key.hpp"

namespace duckdb {

Expand Down Expand Up @@ -155,19 +156,20 @@ string_t QuantileCast::Operation(const string_t &src, Vector &result) {
//===--------------------------------------------------------------------===//
// Scalar Quantile
//===--------------------------------------------------------------------===//
template <bool DISCRETE, class TYPE_OP = QuantileStandardType>
template <bool DISCRETE>
struct QuantileScalarOperation : public QuantileOperation {
template <class T, class STATE>
static void Finalize(STATE &state, T &target, AggregateFinalizeData &finalize_data) {
if (state.v.empty()) {
if (state.linked_list.total_capacity == 0) {
finalize_data.ReturnNull();
return;
}
D_ASSERT(finalize_data.input.bind_data);
auto &bind_data = finalize_data.input.bind_data->Cast<QuantileBindData>();
D_ASSERT(bind_data.quantiles.size() == 1);
QuantileInterpolator<DISCRETE> interp(bind_data.quantiles[0], state.v.size(), bind_data.desc);
target = interp.template Operation<typename STATE::InputType, T>(state.v.data(), finalize_data.result);
auto &flattened = FlattenedQuantileValues<typename STATE::InputType>::Flatten(finalize_data, state.linked_list);
QuantileInterpolator<DISCRETE> interp(bind_data.quantiles[0], state.linked_list.total_capacity, bind_data.desc);
target = interp.template Operation<typename STATE::InputType, T>(flattened.Data(), finalize_data.result);
}

template <class STATE, class INPUT_TYPE, class RESULT_TYPE>
Expand Down Expand Up @@ -226,23 +228,34 @@ struct QuantileScalarOperation : public QuantileOperation {
}
};

//! Update callback that stores, for every input row, its sort key in the linked list of the matching
//! state - rows whose input is NULL are skipped
static void QuantileSortKeyUpdate(Vector inputs[], AggregateInputData &aggr_input_data, idx_t input_count,
                                  Vector &states, idx_t count) {
	// only a single input column is expected - it is the only one read below
	D_ASSERT(input_count == 1);

	// encode the input column into a BLOB vector of sort keys (ascending, NULLs last)
	Vector encoded_keys(LogicalType::BLOB);
	CreateSortKeyHelpers::CreateSortKeyWithValidity(
	    *inputs, encoded_keys, OrderModifiers(OrderType::ASCENDING, OrderByNullType::NULLS_LAST), count);

	// hand the encoded keys to the list update in place of the raw input
	ListUpdateFunction<true>(&encoded_keys, aggr_input_data, 1, states, count);
}

struct QuantileScalarFallback : QuantileOperation {
template <class INPUT_TYPE, class STATE, class OP>
static void Execute(STATE &state, const INPUT_TYPE &key, AggregateInputData &input_data) {
state.AddElement(key, input_data);
//! The fallback buffers sort keys instead of the input values
static LogicalType GetElementType(AggregateInputData &) {
return LogicalType::BLOB;
}

template <class STATE>
static void Finalize(STATE &state, AggregateFinalizeData &finalize_data) {
if (state.v.empty()) {
if (state.linked_list.total_capacity == 0) {
finalize_data.ReturnNull();
return;
}
D_ASSERT(finalize_data.input.bind_data);
auto &bind_data = finalize_data.input.bind_data->Cast<QuantileBindData>();
D_ASSERT(bind_data.quantiles.size() == 1);
QuantileInterpolator<true> interp(bind_data.quantiles[0], state.v.size(), bind_data.desc);
auto interpolation_result = interp.InterpolateInternal<string_t>(state.v.data());
auto &flattened = FlattenedQuantileValues<string_t>::Flatten(finalize_data, state.linked_list);
QuantileInterpolator<true> interp(bind_data.quantiles[0], state.linked_list.total_capacity, bind_data.desc);
auto interpolation_result = interp.InterpolateInternal<string_t>(flattened.Data());
CreateSortKeyHelpers::DecodeSortKey(interpolation_result, finalize_data.result, finalize_data.result_idx,
OrderModifiers(OrderType::ASCENDING, OrderByNullType::NULLS_LAST));
}
Expand All @@ -255,7 +268,7 @@ template <class CHILD_TYPE, bool DISCRETE>
struct QuantileListOperation : QuantileOperation {
template <class T, class STATE>
static void Finalize(STATE &state, T &target, AggregateFinalizeData &finalize_data) {
if (state.v.empty()) {
if (state.linked_list.total_capacity == 0) {
finalize_data.ReturnNull();
return;
}
Expand All @@ -268,15 +281,16 @@ struct QuantileListOperation : QuantileOperation {
ListVector::Reserve(finalize_data.result, ridx + bind_data.quantiles.size());
auto rdata = FlatVector::GetDataMutable<CHILD_TYPE>(result);

auto v_t = state.v.data();
auto &flattened = FlattenedQuantileValues<typename STATE::InputType>::Flatten(finalize_data, state.linked_list);
auto v_t = flattened.Data();
D_ASSERT(v_t);

auto &entry = target;
entry.offset = ridx;
idx_t lower = 0;
for (const auto &q : bind_data.order) {
const auto &quantile = bind_data.quantiles[q];
QuantileInterpolator<DISCRETE> interp(quantile, state.v.size(), bind_data.desc);
QuantileInterpolator<DISCRETE> interp(quantile, state.linked_list.total_capacity, bind_data.desc);
interp.begin = lower;
rdata[ridx + q] = interp.template Operation<typename STATE::InputType, CHILD_TYPE>(v_t, result);
lower = interp.FRN;
Expand Down Expand Up @@ -335,14 +349,14 @@ struct QuantileListOperation : QuantileOperation {
};

struct QuantileListFallback : QuantileOperation {
template <class INPUT_TYPE, class STATE, class OP>
static void Execute(STATE &state, const INPUT_TYPE &key, AggregateInputData &input_data) {
state.AddElement(key, input_data);
//! The fallback buffers sort keys instead of the input values
static LogicalType GetElementType(AggregateInputData &) {
return LogicalType::BLOB;
}

template <class T, class STATE>
static void Finalize(STATE &state, T &target, AggregateFinalizeData &finalize_data) {
if (state.v.empty()) {
if (state.linked_list.total_capacity == 0) {
finalize_data.ReturnNull();
return;
}
Expand All @@ -354,16 +368,16 @@ struct QuantileListFallback : QuantileOperation {
auto ridx = ListVector::GetListSize(finalize_data.result);
ListVector::Reserve(finalize_data.result, ridx + bind_data.quantiles.size());

D_ASSERT(state.v.data());
auto &flattened = FlattenedQuantileValues<string_t>::Flatten(finalize_data, state.linked_list);

auto &entry = target;
entry.offset = ridx;
idx_t lower = 0;
for (const auto &q : bind_data.order) {
const auto &quantile = bind_data.quantiles[q];
QuantileInterpolator<true> interp(quantile, state.v.size(), bind_data.desc);
QuantileInterpolator<true> interp(quantile, state.linked_list.total_capacity, bind_data.desc);
interp.begin = lower;
auto interpolation_result = interp.InterpolateInternal<string_t>(state.v.data());
auto interpolation_result = interp.InterpolateInternal<string_t>(flattened.Data());
CreateSortKeyHelpers::DecodeSortKey(interpolation_result, result, ridx + q,
OrderModifiers(OrderType::ASCENDING, OrderByNullType::NULLS_LAST));
lower = interp.FRN;
Expand Down Expand Up @@ -398,20 +412,19 @@ AggregateFunction GetDiscreteQuantileTemplated(const LogicalType &type) {
case PhysicalType::INTERVAL:
return OP::template GetFunction<interval_t>(type);
case PhysicalType::VARCHAR:
return OP::template GetFunction<string_t, QuantileStringType>(type);
return OP::template GetFunction<string_t>(type);
#endif
default:
return OP::GetFallback(type);
}
}

struct ScalarDiscreteQuantile {
template <typename INPUT_TYPE, class TYPE_OP = QuantileStandardType>
template <typename INPUT_TYPE>
static AggregateFunction GetFunction(const LogicalType &type) {
using STATE = QuantileState<INPUT_TYPE, TYPE_OP>;
using STATE = QuantileState<INPUT_TYPE>;
using OP = QuantileScalarOperation<true>;
auto fun = AggregateFunction::UnaryAggregateDestructor<STATE, INPUT_TYPE, INPUT_TYPE, OP,
AggregateDestructorType::LEGACY>(type, type);
auto fun = QuantileBufferingAggregate<STATE, INPUT_TYPE, OP>(type, type);
#ifndef DUCKDB_SMALLER_BINARY
fun.SetWindowBatchCallback(OP::Window<STATE, INPUT_TYPE, INPUT_TYPE>);
fun.SetWindowInitCallback(OP::WindowInit<STATE, INPUT_TYPE>);
Expand All @@ -420,36 +433,24 @@ struct ScalarDiscreteQuantile {
}

static AggregateFunction GetFallback(const LogicalType &type) {
using STATE = QuantileState<string_t, QuantileStringType>;
using STATE = QuantileState<string_t>;
using OP = QuantileScalarFallback;

AggregateFunction fun({type}, type, AggregateFunction::StateSize<STATE>,
AggregateFunction::StateInitialize<STATE, OP, AggregateDestructorType::LEGACY>,
AggregateSortKeyHelpers::UnaryUpdate<STATE, OP>,
AggregateFunction::StateCombine<STATE, OP>,
QuantileSortKeyUpdate, ListCombineFunction<OP>,
AggregateFunction::StateVoidFinalize<STATE, OP>, nullptr, nullptr,
AggregateFunction::StateDestroy<STATE, OP>);
return fun;
}
};

//! Builds the aggregate for the list-returning quantile variants: takes a single argument of
//! `input_type` and returns LIST(`child_type`), wiring up the templated state callbacks for STATE/OP
template <class STATE, class INPUT_TYPE, class RESULT_TYPE, class OP>
static AggregateFunction QuantileListAggregate(const LogicalType &input_type, const LogicalType &child_type) { // NOLINT
	// the aggregate returns a list whose elements are of the requested child type
	LogicalType list_type = LogicalType::LIST(child_type);

	AggregateFunction aggregate(
	    {input_type}, list_type,
	    // state management
	    AggregateFunction::StateSize<STATE>,
	    AggregateFunction::StateInitialize<STATE, OP, AggregateDestructorType::LEGACY>,
	    // update / combine / finalize
	    AggregateFunction::UnaryScatterUpdate<STATE, INPUT_TYPE, OP>,
	    AggregateFunction::StateCombine<STATE, OP>,
	    AggregateFunction::StateFinalize<STATE, RESULT_TYPE, OP>,
	    FunctionNullHandling::DEFAULT_NULL_HANDLING,
	    // no cluster update and no bind callback; the destructor releases the state
	    AggregateFunction::NoClusterUpdate(),
	    AggregateFunction::NoBind(),
	    AggregateFunction::StateDestroy<STATE, OP>);
	return aggregate;
}

struct ListDiscreteQuantile {
template <typename INPUT_TYPE, class TYPE_OP = QuantileStandardType>
template <typename INPUT_TYPE>
static AggregateFunction GetFunction(const LogicalType &type) {
using STATE = QuantileState<INPUT_TYPE, TYPE_OP>;
using STATE = QuantileState<INPUT_TYPE>;
using OP = QuantileListOperation<INPUT_TYPE, true>;
auto fun = QuantileListAggregate<STATE, INPUT_TYPE, list_entry_t, OP>(type, type);
auto fun = QuantileBufferingAggregate<STATE, list_entry_t, OP>(type, LogicalType::LIST(type));
fun.SetOrderDependent(AggregateOrderDependent::NOT_ORDER_DEPENDENT);
#ifndef DUCKDB_SMALLER_BINARY
fun.SetWindowBatchCallback(OP::template Window<STATE, INPUT_TYPE, list_entry_t>);
Expand All @@ -459,13 +460,12 @@ struct ListDiscreteQuantile {
}

static AggregateFunction GetFallback(const LogicalType &type) {
using STATE = QuantileState<string_t, QuantileStringType>;
using STATE = QuantileState<string_t>;
using OP = QuantileListFallback;

AggregateFunction fun({type}, LogicalType::LIST(type), AggregateFunction::StateSize<STATE>,
AggregateFunction::StateInitialize<STATE, OP, AggregateDestructorType::LEGACY>,
AggregateSortKeyHelpers::UnaryUpdate<STATE, OP>,
AggregateFunction::StateCombine<STATE, OP>,
QuantileSortKeyUpdate, ListCombineFunction<OP>,
AggregateFunction::StateFinalize<STATE, list_entry_t, OP>, nullptr, nullptr,
AggregateFunction::StateDestroy<STATE, OP>);
return fun;
Expand Down Expand Up @@ -539,11 +539,9 @@ AggregateFunction GetContinuousQuantileTemplated(const LogicalType &type) {
struct ScalarContinuousQuantile {
template <typename INPUT_TYPE, typename TARGET_TYPE>
static AggregateFunction GetFunction(const LogicalType &input_type, const LogicalType &target_type) {
using STATE = QuantileState<INPUT_TYPE, QuantileStandardType>;
using STATE = QuantileState<INPUT_TYPE>;
using OP = QuantileScalarOperation<false>;
auto fun =
AggregateFunction::UnaryAggregateDestructor<STATE, INPUT_TYPE, TARGET_TYPE, OP,
AggregateDestructorType::LEGACY>(input_type, target_type);
auto fun = QuantileBufferingAggregate<STATE, TARGET_TYPE, OP>(input_type, target_type);
fun.SetOrderDependent(AggregateOrderDependent::NOT_ORDER_DEPENDENT);
#ifndef DUCKDB_SMALLER_BINARY
fun.SetWindowBatchCallback(OP::template Window<STATE, INPUT_TYPE, TARGET_TYPE>);
Expand All @@ -556,9 +554,9 @@ struct ScalarContinuousQuantile {
struct ListContinuousQuantile {
template <typename INPUT_TYPE, typename TARGET_TYPE>
static AggregateFunction GetFunction(const LogicalType &input_type, const LogicalType &target_type) {
using STATE = QuantileState<INPUT_TYPE, QuantileStandardType>;
using STATE = QuantileState<INPUT_TYPE>;
using OP = QuantileListOperation<TARGET_TYPE, false>;
auto fun = QuantileListAggregate<STATE, INPUT_TYPE, list_entry_t, OP>(input_type, target_type);
auto fun = QuantileBufferingAggregate<STATE, list_entry_t, OP>(input_type, LogicalType::LIST(target_type));
fun.SetOrderDependent(AggregateOrderDependent::NOT_ORDER_DEPENDENT);
#ifndef DUCKDB_SMALLER_BINARY
fun.SetWindowBatchCallback(OP::template Window<STATE, INPUT_TYPE, list_entry_t>);
Expand Down
Loading
Loading