diff --git a/cpp/src/arrow/util/CMakeLists.txt b/cpp/src/arrow/util/CMakeLists.txt index a41b63f07b3e..6706861429a9 100644 --- a/cpp/src/arrow/util/CMakeLists.txt +++ b/cpp/src/arrow/util/CMakeLists.txt @@ -118,6 +118,17 @@ add_arrow_test(threading-utility-test test_common.cc thread_pool_test.cc) +add_arrow_test(pfor-test + SOURCES + pfor/pfor_test.cc + pfor/pfor.cc + pfor/pfor_wrapper.cc) + +add_arrow_benchmark(pfor/pfor_benchmark + EXTRA_SOURCES + pfor/pfor.cc + pfor/pfor_wrapper.cc) + add_arrow_benchmark(bit_block_counter_benchmark) add_arrow_benchmark(bit_util_benchmark) add_arrow_benchmark(bitmap_reader_benchmark) diff --git a/cpp/src/arrow/util/pfor/pfor.cc b/cpp/src/arrow/util/pfor/pfor.cc new file mode 100644 index 000000000000..e9e7372e7154 --- /dev/null +++ b/cpp/src/arrow/util/pfor/pfor.cc @@ -0,0 +1,330 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Core PFOR (Patched Frame of Reference) compression implementation +// +// Adapted from the Snowflake PFOR encoder (PforEncoder.{hpp,cpp}). +// Key differences from the Snowflake implementation: +// - Vector size: 1024 (not 2048) +// - Max exceptions: int16 (not uint8) +// - Exception values: original integers (not FOR offsets) +// - Bit packing: Arrow's BitWriter/unpack (not Snowflake's BitPacker) + +#include "arrow/util/pfor/pfor.h" + +#include +#include +#include +#include + +#include "arrow/util/bit_stream_utils_internal.h" +#include "arrow/util/bit_util.h" +#include "arrow/util/bpacking_internal.h" +#include "arrow/util/endian.h" +#include "arrow/util/logging.h" +#include "arrow/util/macros.h" +#include "arrow/util/span.h" +#include "arrow/util/ubsan.h" + +namespace arrow { +namespace util { +namespace pfor { + +static_assert(ARROW_LITTLE_ENDIAN, + "PFOR serialization assumes little-endian byte order"); + +// ---------------------------------------------------------------------- +// FindOptimalBitWidth: histogram-based cost model + +template +BitWidthResult PforCompression::FindOptimalBitWidth(const UnsignedT* deltas, + int32_t num_elements) { + constexpr uint8_t max_bits = PforTypeTraits::kMaxBitWidth; + constexpr int32_t position_bits = 16; + constexpr int32_t value_bits = sizeof(T) * 8; + + // Build histogram: histogram[b] = count of deltas requiring exactly b bits + std::array histogram{}; + for (int32_t i = 0; i < num_elements; ++i) { + uint8_t bits = PforTypeTraits::BitsRequired(deltas[i]); + histogram[bits]++; + } + + // Evaluate each candidate bit width + int64_t best_cost = std::numeric_limits::max(); + uint8_t best_bit_width = max_bits; + int16_t best_num_exceptions = 0; + + int64_t exceptions_above = num_elements; + + for (uint8_t b = 0; b <= max_bits; ++b) { + exceptions_above -= histogram[b]; + + if (exceptions_above > std::numeric_limits::max()) { + continue; + } + + int64_t packing_cost = static_cast(num_elements) * b; + int64_t exception_cost = exceptions_above * (position_bits + value_bits); + int64_t total_cost = packing_cost + exception_cost; + + if (total_cost < best_cost) { + best_cost = total_cost; + best_bit_width = b; + best_num_exceptions = static_cast(exceptions_above); + } + } + + return {best_bit_width, best_num_exceptions}; +} + +// ---------------------------------------------------------------------- +// EncodeVector + +template +PforEncodedVector PforCompression::EncodeVector(const T* values, + int32_t num_elements) { + ARROW_DCHECK(num_elements > 0); + + // Step 1: Find min (frame of reference) + T min_val = values[0]; + for (int32_t i = 1; i < num_elements; ++i) { + if (values[i] < min_val) min_val = values[i]; + } + + // Step 2: Compute unsigned deltas + const auto unsigned_min = static_cast(min_val); + std::vector deltas(num_elements); + for (int32_t i = 0; i < num_elements; ++i) { + deltas[i] = static_cast(values[i]) - unsigned_min; + } + + // Step 3: Find optimal bit width + auto [bit_width, num_exceptions] = + FindOptimalBitWidth(deltas.data(), num_elements); + + // Step 4: Collect exceptions and replace with placeholder (0) + PforEncodedVector result; + result.info.frame_of_reference = min_val; + result.info.bit_width = bit_width; + result.info.num_exceptions = num_exceptions; + + if (num_exceptions > 0) { + result.exception_positions.reserve(num_exceptions); + result.exception_values.reserve(num_exceptions); + + UnsignedT mask = (bit_width >= PforTypeTraits::kMaxBitWidth) + ? static_cast(-1) + : (static_cast(1) << bit_width) - 1; + + for (int32_t i = 0; i < num_elements; ++i) { + if (deltas[i] > mask) { + result.exception_positions.push_back(static_cast(i)); + result.exception_values.push_back(values[i]); + deltas[i] = 0; + } + } + } + + // Step 5: Bit-pack the deltas + if (bit_width > 0) { + int64_t packed_size = + bit_util::BytesForBits(static_cast(num_elements) * bit_width); + result.packed_values.resize(static_cast(packed_size), 0); + + bit_util::BitWriter writer(result.packed_values.data(), + static_cast(packed_size)); + for (int32_t i = 0; i < num_elements; ++i) { + writer.PutValue(static_cast(deltas[i]), bit_width); + } + writer.Flush(); + } + + return result; +} + +// ---------------------------------------------------------------------- +// DecodeVector + +template +Result PforCompression::DecodeVector(T* values, + arrow::util::span data, + int32_t num_elements) { + // Step 1: Read vector info + ARROW_ASSIGN_OR_RAISE(auto info, PforVectorInfo::Load(data)); + const uint8_t* read_ptr = data.data() + PforVectorInfo::kStoredSize; + + if (info.bit_width > PforTypeTraits::kMaxBitWidth) { + return Status::Invalid("PFOR bit_width out of range: ", info.bit_width); + } + if (info.num_exceptions < 0) { + return Status::Invalid("PFOR num_exceptions negative: ", info.num_exceptions); + } + + // Step 2: Handle constant data (bit_width == 0, no exceptions) + if (info.bit_width == 0 && info.num_exceptions == 0) { + std::fill(values, values + num_elements, info.frame_of_reference); + return PforVectorInfo::kStoredSize; + } + + // Step 3: Unpack bit-packed deltas and add FOR + if (info.bit_width > 0) { + std::vector unsigned_values(num_elements); + const auto unsigned_for = static_cast(info.frame_of_reference); + + // Arrow's unpack handles arbitrary sizes: SIMD for complete batches, + // then unpack_exact for the remainder. + arrow::internal::unpack(read_ptr, unsigned_values.data(), + static_cast(num_elements), info.bit_width); + + // Add FOR and convert to signed output via SafeCopy +#pragma GCC unroll PforConstants::kLoopUnrolls +#pragma GCC ivdep + for (int32_t i = 0; i < num_elements; ++i) { + unsigned_values[i] += unsigned_for; + values[i] = util::SafeCopy(unsigned_values[i]); + } + + int64_t packed_size = + bit_util::BytesForBits(static_cast(num_elements) * info.bit_width); + read_ptr += packed_size; + } else { + // bit_width == 0 but has exceptions - fill with FOR + std::fill(values, values + num_elements, info.frame_of_reference); + } + + // Step 4: Patch exceptions (stored as original values) + if (info.num_exceptions > 0) { + const uint8_t* positions_ptr = read_ptr; + read_ptr += info.num_exceptions * sizeof(int16_t); + + const uint8_t* values_ptr = read_ptr; + read_ptr += info.num_exceptions * sizeof(T); + +#pragma GCC unroll PforConstants::kLoopUnrolls +#pragma GCC ivdep + for (int16_t i = 0; i < info.num_exceptions; ++i) { + int16_t pos; + std::memcpy(&pos, positions_ptr + i * sizeof(int16_t), sizeof(int16_t)); + + T value; + std::memcpy(&value, values_ptr + i * sizeof(T), sizeof(T)); + + values[pos] = value; + } + } + + return static_cast(read_ptr - data.data()); +} + +// ---------------------------------------------------------------------- +// Serialization helpers + +// ---------------------------------------------------------------------- +// PforEncodedVectorView::LoadView + +template +Result> PforEncodedVectorView::LoadView( + arrow::util::span data, int32_t num_elements) { + ARROW_ASSIGN_OR_RAISE(auto info, PforVectorInfo::Load(data)); + + PforEncodedVectorView view; + view.info = info; + view.num_elements = num_elements; + + const uint8_t* ptr = data.data() + PforVectorInfo::kStoredSize; + + // packed_values: zero-copy span into the buffer + int64_t packed_size = 0; + if (info.bit_width > 0) { + packed_size = + bit_util::BytesForBits(static_cast(num_elements) * info.bit_width); + view.packed_values = arrow::util::span(ptr, packed_size); + ptr += packed_size; + } + + // Exception positions and values: copy into aligned storage + if (info.num_exceptions > 0) { + view.exception_positions.resize(info.num_exceptions); + std::memcpy(view.exception_positions.data(), ptr, + info.num_exceptions * sizeof(int16_t)); + ptr += info.num_exceptions * sizeof(int16_t); + + view.exception_values.resize(info.num_exceptions); + std::memcpy(view.exception_values.data(), ptr, + info.num_exceptions * sizeof(T)); + } + + return view; +} + +template struct PforEncodedVectorView; +template struct PforEncodedVectorView; + +// ---------------------------------------------------------------------- +// Serialization helpers + +template +int64_t PforCompression::SerializedVectorSize(const PforEncodedVector& vec, + int32_t num_elements) { + int64_t size = PforVectorInfo::kStoredSize; + if (vec.info.bit_width > 0) { + size += bit_util::BytesForBits(static_cast(num_elements) * vec.info.bit_width); + } + size += vec.info.num_exceptions * static_cast(sizeof(int16_t)); + size += vec.info.num_exceptions * static_cast(sizeof(T)); + return size; +} + +template +int64_t PforCompression::SerializeVector(const PforEncodedVector& vec, + int32_t num_elements, + arrow::util::span dest) { + uint8_t* write_ptr = dest.data(); + + // Write vector info + vec.info.Store(arrow::util::span(write_ptr, PforVectorInfo::kStoredSize)); + write_ptr += PforVectorInfo::kStoredSize; + + // Write packed values + if (vec.info.bit_width > 0 && !vec.packed_values.empty()) { + std::memcpy(write_ptr, vec.packed_values.data(), vec.packed_values.size()); + write_ptr += vec.packed_values.size(); + } + + // Write exception positions + if (vec.info.num_exceptions > 0) { + std::memcpy(write_ptr, vec.exception_positions.data(), + vec.info.num_exceptions * sizeof(int16_t)); + write_ptr += vec.info.num_exceptions * sizeof(int16_t); + + // Write exception values (original integers) + std::memcpy(write_ptr, vec.exception_values.data(), + vec.info.num_exceptions * sizeof(T)); + write_ptr += vec.info.num_exceptions * sizeof(T); + } + + return static_cast(write_ptr - dest.data()); +} + +// Explicit template instantiations +template class PforCompression; +template class PforCompression; + +} // namespace pfor +} // namespace util +} // namespace arrow diff --git a/cpp/src/arrow/util/pfor/pfor.h b/cpp/src/arrow/util/pfor/pfor.h new file mode 100644 index 000000000000..d2160bac8cd6 --- /dev/null +++ b/cpp/src/arrow/util/pfor/pfor.h @@ -0,0 +1,180 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Core PFOR (Patched Frame of Reference) compression algorithm +// +// PFOR compresses integer columns by: +// 1. Subtracting the minimum value (Frame of Reference) +// 2. Choosing an optimal bit width via a cost model +// 3. Bit-packing the deltas at the chosen width +// 4. Storing outlier values (exceptions) separately + +#pragma once + +#include +#include +#include + +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/util/pfor/pfor_constants.h" +#include "arrow/util/span.h" + +namespace arrow { +namespace util { +namespace pfor { + +// ---------------------------------------------------------------------- +// Per-vector metadata + +/// \brief PFOR vector metadata stored at the start of each compressed vector. +/// +/// For INT32 (7 bytes): [frame_of_reference(4B)] [bit_width(1B)] [num_exceptions(2B)] +/// For INT64 (11 bytes): [frame_of_reference(8B)] [bit_width(1B)] [num_exceptions(2B)] +template +struct PforVectorInfo { + T frame_of_reference = 0; + uint8_t bit_width = 0; + int16_t num_exceptions = 0; + + /// \brief Store this info to a byte buffer (little-endian) + void Store(arrow::util::span dest) const { + uint8_t* ptr = dest.data(); + std::memcpy(ptr, &frame_of_reference, sizeof(T)); + ptr[sizeof(T)] = bit_width; + std::memcpy(ptr + sizeof(T) + 1, &num_exceptions, sizeof(int16_t)); + } + + /// \brief Load this info from a byte buffer (little-endian) + static Result Load(arrow::util::span src) { + if (src.size() < static_cast(kStoredSize)) { + return Status::Invalid("PFOR vector info buffer too small: ", src.size(), + " < ", kStoredSize); + } + PforVectorInfo info; + const uint8_t* ptr = src.data(); + std::memcpy(&info.frame_of_reference, ptr, sizeof(T)); + info.bit_width = ptr[sizeof(T)]; + std::memcpy(&info.num_exceptions, ptr + sizeof(T) + 1, sizeof(int16_t)); + return info; + } + + /// \brief Serialized size in bytes + static constexpr int64_t kStoredSize = PforTypeTraits::kVectorInfoSize; +}; + +// ---------------------------------------------------------------------- +// Encoded vector representation + +/// \brief A PFOR-encoded vector with all its data sections +template +struct PforEncodedVector { + PforVectorInfo info; + std::vector packed_values; + std::vector exception_positions; + std::vector exception_values; +}; + +// ---------------------------------------------------------------------- +// Zero-copy encoded vector view + +/// \brief A zero-copy view over a serialized PFOR vector +/// +/// The packed_values span points directly into the compressed buffer. +/// Exception positions and values are copied into aligned storage. +template +struct PforEncodedVectorView { + PforVectorInfo info; + int32_t num_elements = 0; + arrow::util::span packed_values; + std::vector exception_positions; + std::vector exception_values; + + /// \brief Create a zero-copy view from a serialized vector buffer + /// + /// \param[in] data span over the serialized vector data + /// \param[in] num_elements number of elements in this vector + /// \return the view, or an error if the buffer is too small + static Result LoadView( + arrow::util::span data, int32_t num_elements); +}; + +// ---------------------------------------------------------------------- +// Cost model result + +/// \brief Result of the optimal bit width search +struct BitWidthResult { + uint8_t bit_width = 0; + int16_t num_exceptions = 0; +}; + +// ---------------------------------------------------------------------- +// Core compression/decompression + +/// \brief PFOR compression and decompression algorithms +/// +/// \tparam T the integer type (int32_t or int64_t) +template +class PforCompression { + public: + using UnsignedT = typename PforTypeTraits::UnsignedType; + + /// \brief Find the optimal bit width using the cost model + /// + /// Evaluates every candidate bit width and selects the one that + /// minimizes total encoded size (packing cost + exception cost). + /// + /// \param[in] deltas unsigned deltas after FOR subtraction + /// \param[in] num_elements number of elements + /// \return the optimal bit width and exception count + static BitWidthResult FindOptimalBitWidth(const UnsignedT* deltas, + int32_t num_elements); + + /// \brief Encode a single vector of integers + /// + /// \param[in] values input integer values + /// \param[in] num_elements number of elements (up to vector_size) + /// \return the encoded vector with all sections + static PforEncodedVector EncodeVector(const T* values, int32_t num_elements); + + /// \brief Decode a single vector from compressed data + /// + /// \param[out] values output buffer for decoded integers + /// \param[in] data span over the compressed vector data + /// \param[in] num_elements number of elements in this vector + /// \return number of bytes consumed from data, or error + static Result DecodeVector(T* values, arrow::util::span data, + int32_t num_elements); + + /// \brief Calculate the serialized size of an encoded vector + static int64_t SerializedVectorSize(const PforEncodedVector& vec, + int32_t num_elements); + + /// \brief Serialize an encoded vector to a byte buffer + /// + /// \param[in] vec the encoded vector + /// \param[in] num_elements number of elements + /// \param[out] dest output buffer (must be large enough) + /// \return number of bytes written + static int64_t SerializeVector(const PforEncodedVector& vec, + int32_t num_elements, + arrow::util::span dest); +}; + +} // namespace pfor +} // namespace util +} // namespace arrow diff --git a/cpp/src/arrow/util/pfor/pfor_benchmark.cc b/cpp/src/arrow/util/pfor/pfor_benchmark.cc new file mode 100644 index 000000000000..aa07025dd88f --- /dev/null +++ b/cpp/src/arrow/util/pfor/pfor_benchmark.cc @@ -0,0 +1,327 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// PFOR encoding/decoding benchmarks. +// +// Data distributions are inspired by Snowflake's NumericComprBenchmark.cpp, +// covering the key archetypes that exercise PFOR's cost model differently: +// - Constant: bit_width=0, best case +// - Sequential: small range, ideal FOR +// - SmallRange: clustered random, good FOR compression +// - HighBaseSmallRange: high absolute values, small delta range (timestamps) +// - WithOutliers: tests exception handling path +// - Random: worst case, full bit-width +// - TPC-DS DateSk/StoreSk/ItemSk/Quantity: realistic surrogate key distributions + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "benchmark/benchmark.h" + +#include "arrow/util/pfor/pfor.h" +#include "arrow/util/pfor/pfor_wrapper.h" + +namespace arrow::util::pfor { +namespace { + +// ====================================================================== +// Data Generators + +using Int32Gen = std::vector (*)(int64_t); +using Int64Gen = std::vector (*)(int64_t); + +template +std::vector GenConstant(int64_t n) { + return std::vector(n, static_cast(42)); +} + +template +std::vector GenSequential(int64_t n) { + std::vector v(n); + std::iota(v.begin(), v.end(), static_cast(0)); + return v; +} + +template +std::vector GenSmallRange(int64_t n) { + std::vector v(n); + std::mt19937_64 rng(12345); + std::uniform_int_distribution dist(100000, 200000); + for (auto& x : v) x = dist(rng); + return v; +} + +template +std::vector GenHighBaseSmallRange(int64_t n) { + std::vector v(n); + const T kBase = static_cast(1704067200); + std::mt19937_64 rng(12345); + std::uniform_int_distribution dist(0, 1000); + for (auto& x : v) x = kBase + dist(rng); + return v; +} + +template +std::vector GenWithOutliers(int64_t n) { + std::vector v(n); + std::mt19937_64 rng(42); + std::uniform_int_distribution small_dist(1000, 1255); + for (auto& x : v) x = small_dist(rng); + std::uniform_int_distribution pos_dist(0, n - 1); + int num_outliers = std::max(static_cast(1), n / 100); + for (int i = 0; i < num_outliers; ++i) { + v[pos_dist(rng)] = static_cast(std::numeric_limits::max() / 2 + i); + } + return v; +} + +template +std::vector GenRandom(int64_t n) { + std::vector v(n); + std::mt19937_64 rng(99); + std::uniform_int_distribution dist(std::numeric_limits::min(), + std::numeric_limits::max()); + for (auto& x : v) x = dist(rng); + return v; +} + +template +std::vector GenTpcdsSoldDateSk(int64_t n) { + std::vector v(n); + const T kBase = 2450815; + std::mt19937_64 rng(12345); + std::uniform_int_distribution dist(0, 1820); + for (auto& x : v) x = kBase + dist(rng); + return v; +} + +template +std::vector GenTpcdsStoreSk(int64_t n) { + std::vector v(n); + std::mt19937_64 rng(12345); + std::uniform_int_distribution dist(1, 1000); + for (auto& x : v) x = dist(rng); + return v; +} + +template +std::vector GenTpcdsItemSk(int64_t n) { + std::vector v(n); + const T kMax = 100000; + std::mt19937_64 rng(12345); + std::exponential_distribution exp_dist(0.00005); + for (auto& x : v) { + T val = static_cast(exp_dist(rng)); + x = std::min(static_cast(val + 1), kMax); + } + return v; +} + +template +std::vector GenTpcdsQuantity(int64_t n) { + std::vector v(n); + std::mt19937_64 rng(12345); + std::uniform_int_distribution small_dist(1, 10); + std::uniform_int_distribution large_dist(11, 100); + std::uniform_int_distribution chance(0, 99); + for (auto& x : v) { + x = (chance(rng) < 90) ? small_dist(rng) : large_dist(rng); + } + return v; +} + +// ====================================================================== +// Benchmark Core + +template +void BM_PforEncodeImpl(benchmark::State& state, + std::vector (*generator)(int64_t)) { + const int64_t num_values = state.range(0); + auto values = generator(num_values); + + size_t max_size = PforWrapper::GetMaxCompressedSize(num_values); + std::vector compressed(max_size); + + for (auto _ : state) { + size_t comp_size = max_size; + PforWrapper::Encode(values.data(), num_values, + compressed.data(), &comp_size); + benchmark::DoNotOptimize(comp_size); + benchmark::ClobberMemory(); + } + + state.SetBytesProcessed(state.iterations() * num_values * + static_cast(sizeof(T))); + state.SetItemsProcessed(state.iterations() * num_values); + + // Report compression ratio + size_t comp_size = max_size; + PforWrapper::Encode(values.data(), num_values, compressed.data(), &comp_size); + state.counters["CompRatio%"] = benchmark::Counter( + 100.0 * static_cast(comp_size) / + static_cast(num_values * sizeof(T))); +} + +template +void BM_PforDecodeImpl(benchmark::State& state, + std::vector (*generator)(int64_t)) { + const int64_t num_values = state.range(0); + auto values = generator(num_values); + + size_t max_size = PforWrapper::GetMaxCompressedSize(num_values); + std::vector compressed(max_size); + size_t comp_size = max_size; + PforWrapper::Encode(values.data(), num_values, compressed.data(), &comp_size); + + std::vector decoded(num_values); + + for (auto _ : state) { + PforWrapper::Decode(decoded.data(), num_values, + compressed.data(), comp_size); + benchmark::ClobberMemory(); + } + + state.SetBytesProcessed(state.iterations() * num_values * + static_cast(sizeof(T))); + state.SetItemsProcessed(state.iterations() * num_values); +} + +// ====================================================================== +// Non-template wrappers to avoid comma-in-macro issues with BENCHMARK_CAPTURE + +void BM_PforEncodeInt32(benchmark::State& state, Int32Gen gen) { + BM_PforEncodeImpl(state, gen); +} +void BM_PforDecodeInt32(benchmark::State& state, Int32Gen gen) { + BM_PforDecodeImpl(state, gen); +} +void BM_PforEncodeInt64(benchmark::State& state, Int64Gen gen) { + BM_PforEncodeImpl(state, gen); +} +void BM_PforDecodeInt64(benchmark::State& state, Int64Gen gen) { + BM_PforDecodeImpl(state, gen); +} + +// ====================================================================== +// Benchmark sizes: 1K, 10K, 100K, 1M + +static void CustomArgs(benchmark::internal::Benchmark* b) { + for (int64_t n : {1024, 10240, 102400, 1048576}) { + b->Arg(n); + } +} + +// ====================================================================== +// INT32 Encode + +BENCHMARK_CAPTURE(BM_PforEncodeInt32, Constant, &GenConstant) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforEncodeInt32, Sequential, &GenSequential) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforEncodeInt32, SmallRange, &GenSmallRange) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforEncodeInt32, HighBaseSmallRange, + &GenHighBaseSmallRange) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforEncodeInt32, WithOutliers, &GenWithOutliers) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforEncodeInt32, Random, &GenRandom) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforEncodeInt32, TpcdsSoldDateSk, + &GenTpcdsSoldDateSk) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforEncodeInt32, TpcdsStoreSk, &GenTpcdsStoreSk) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforEncodeInt32, TpcdsItemSk, &GenTpcdsItemSk) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforEncodeInt32, TpcdsQuantity, + &GenTpcdsQuantity) + ->Apply(CustomArgs); + +// INT32 Decode + +BENCHMARK_CAPTURE(BM_PforDecodeInt32, Constant, &GenConstant) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforDecodeInt32, Sequential, &GenSequential) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforDecodeInt32, SmallRange, &GenSmallRange) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforDecodeInt32, HighBaseSmallRange, + &GenHighBaseSmallRange) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforDecodeInt32, WithOutliers, &GenWithOutliers) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforDecodeInt32, Random, &GenRandom) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforDecodeInt32, TpcdsSoldDateSk, + &GenTpcdsSoldDateSk) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforDecodeInt32, TpcdsStoreSk, &GenTpcdsStoreSk) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforDecodeInt32, TpcdsItemSk, &GenTpcdsItemSk) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforDecodeInt32, TpcdsQuantity, + &GenTpcdsQuantity) + ->Apply(CustomArgs); + +// ====================================================================== +// INT64 Encode + +BENCHMARK_CAPTURE(BM_PforEncodeInt64, Constant, &GenConstant) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforEncodeInt64, Sequential, &GenSequential) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforEncodeInt64, SmallRange, &GenSmallRange) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforEncodeInt64, HighBaseSmallRange, + &GenHighBaseSmallRange) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforEncodeInt64, WithOutliers, &GenWithOutliers) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforEncodeInt64, Random, &GenRandom) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforEncodeInt64, TpcdsSoldDateSk, + &GenTpcdsSoldDateSk) + ->Apply(CustomArgs); + +// INT64 Decode + +BENCHMARK_CAPTURE(BM_PforDecodeInt64, Constant, &GenConstant) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforDecodeInt64, Sequential, &GenSequential) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforDecodeInt64, SmallRange, &GenSmallRange) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforDecodeInt64, HighBaseSmallRange, + &GenHighBaseSmallRange) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforDecodeInt64, WithOutliers, &GenWithOutliers) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforDecodeInt64, Random, &GenRandom) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforDecodeInt64, TpcdsSoldDateSk, + &GenTpcdsSoldDateSk) + ->Apply(CustomArgs); + +} // namespace +} // namespace arrow::util::pfor diff --git a/cpp/src/arrow/util/pfor/pfor_constants.h b/cpp/src/arrow/util/pfor/pfor_constants.h new file mode 100644 index 000000000000..18e63b962e4c --- /dev/null +++ b/cpp/src/arrow/util/pfor/pfor_constants.h @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Constants for PFOR (Patched Frame of Reference) compression + +#pragma once + +#include +#include + +namespace arrow { +namespace util { +namespace pfor { + +/// \brief Constants used throughout PFOR compression +class PforConstants { + public: + /// Number of elements compressed together as a unit. + static constexpr int64_t kPforVectorSize = 1024; + + /// log2(kPforVectorSize) + static constexpr uint8_t kDefaultLogVectorSize = 10; + + /// Minimum allowed log vector size + static constexpr uint8_t kMinLogVectorSize = 3; + + /// Maximum allowed log vector size + static constexpr uint8_t kMaxLogVectorSize = 15; + + /// Type used to store vector data offsets (supports pages up to 4GB) + using OffsetType = uint32_t; + + /// Type used to store exception positions within a compressed vector. + using PositionType = int16_t; + + /// Page header size in bytes. + static constexpr int64_t kHeaderSize = 7; + + /// Packing mode: FOR + bit-packing (currently the only mode). + static constexpr uint8_t kPackingModeForBitPack = 0; + + /// Loop unroll factor for compiler hints in decode loops. + static constexpr int64_t kLoopUnrolls = 4; +}; + +/// \brief Type traits for PFOR integer types +template +struct PforTypeTraits {}; + +template <> +struct PforTypeTraits { + using UnsignedType = uint32_t; + static constexpr uint8_t kMaxBitWidth = 32; + static constexpr uint8_t kValueByteWidth = 4; + + /// PforVectorInfo size: 4B FOR + 1B bitWidth + 2B numExceptions = 7 bytes + static constexpr int64_t kVectorInfoSize = 7; + + static uint8_t BitsRequired(uint32_t value) { + if (value == 0) return 0; + return static_cast(32 - __builtin_clz(value)); + } +}; + +template <> +struct PforTypeTraits { + using UnsignedType = uint64_t; + static constexpr uint8_t kMaxBitWidth = 64; + static constexpr uint8_t kValueByteWidth = 8; + + /// PforVectorInfo size: 8B FOR + 1B bitWidth + 2B numExceptions = 11 bytes + static constexpr int64_t kVectorInfoSize = 11; + + static uint8_t BitsRequired(uint64_t value) { + if (value == 0) return 0; + return static_cast(64 - __builtin_clzll(value)); + } +}; + +} // namespace pfor +} // namespace util +} // namespace arrow diff --git a/cpp/src/arrow/util/pfor/pfor_test.cc b/cpp/src/arrow/util/pfor/pfor_test.cc new file mode 100644 index 000000000000..665a85ad714d --- /dev/null +++ b/cpp/src/arrow/util/pfor/pfor_test.cc @@ -0,0 +1,435 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include +#include +#include +#include +#include +#include + +#include "arrow/testing/gtest_util.h" +#include "arrow/util/pfor/pfor.h" +#include "arrow/util/pfor/pfor_wrapper.h" +#include "arrow/util/span.h" + +namespace arrow::util::pfor { + +// ====================================================================== +// Constants Tests + +TEST(PforConstantsTest, VectorSizeIsPowerOfTwo) { + EXPECT_EQ(PforConstants::kPforVectorSize, 1024); + EXPECT_EQ(1 << PforConstants::kDefaultLogVectorSize, + PforConstants::kPforVectorSize); +} + +TEST(PforConstantsTest, VectorInfoSizes) { + EXPECT_EQ(PforTypeTraits::kVectorInfoSize, 7); + EXPECT_EQ(PforTypeTraits::kVectorInfoSize, 11); +} + +// ====================================================================== +// BitsRequired Tests + +TEST(PforBitsRequiredTest, Int32) { + EXPECT_EQ(PforTypeTraits::BitsRequired(0), 0); + EXPECT_EQ(PforTypeTraits::BitsRequired(1), 1); + EXPECT_EQ(PforTypeTraits::BitsRequired(2), 2); + EXPECT_EQ(PforTypeTraits::BitsRequired(3), 2); + EXPECT_EQ(PforTypeTraits::BitsRequired(255), 8); + EXPECT_EQ(PforTypeTraits::BitsRequired(256), 9); + EXPECT_EQ(PforTypeTraits::BitsRequired(0xFFFFFFFF), 32); +} + +TEST(PforBitsRequiredTest, Int64) { + EXPECT_EQ(PforTypeTraits::BitsRequired(0), 0); + EXPECT_EQ(PforTypeTraits::BitsRequired(1), 1); + EXPECT_EQ(PforTypeTraits::BitsRequired(0xFFFFFFFFFFFFFFFFULL), 64); +} + +// ====================================================================== +// VectorInfo Serialization Tests + +TEST(PforVectorInfoTest, Int32RoundTrip) { + PforVectorInfo info; + info.frame_of_reference = -42; + info.bit_width = 17; + info.num_exceptions = 300; + + uint8_t buf[7]; + info.Store(arrow::util::span(buf, 7)); + ASSERT_OK_AND_ASSIGN(auto loaded, + PforVectorInfo::Load(arrow::util::span(buf, 7))); + + EXPECT_EQ(loaded.frame_of_reference, -42); + EXPECT_EQ(loaded.bit_width, 17); + EXPECT_EQ(loaded.num_exceptions, 300); +} + +TEST(PforVectorInfoTest, Int64RoundTrip) { + PforVectorInfo info; + info.frame_of_reference = -123456789012345LL; + info.bit_width = 48; + info.num_exceptions = 30000; + + uint8_t buf[11]; + info.Store(arrow::util::span(buf, 11)); + ASSERT_OK_AND_ASSIGN(auto loaded, + PforVectorInfo::Load(arrow::util::span(buf, 11))); + + EXPECT_EQ(loaded.frame_of_reference, -123456789012345LL); + EXPECT_EQ(loaded.bit_width, 48); + EXPECT_EQ(loaded.num_exceptions, 30000); +} + +// ====================================================================== +// Cost Model Tests + +TEST(PforCostModelTest, AllIdentical) { + // All deltas are 0 => bit_width should be 0, no exceptions + std::vector deltas(100, 0); + auto result = PforCompression::FindOptimalBitWidth(deltas.data(), 100); // NOLINT + EXPECT_EQ(result.bit_width, 0); + EXPECT_EQ(result.num_exceptions, 0); +} + +TEST(PforCostModelTest, SingleOutlier) { + // 99 values fit in 3 bits, 1 outlier needs 16 bits + std::vector deltas(100, 5); // all fit in 3 bits + deltas[50] = 50000; // outlier: 16 bits + auto result = PforCompression::FindOptimalBitWidth(deltas.data(), 100); + // Cost at bit_width=3: 100*3 + 1*(16+32) = 300 + 48 = 348 + // Cost at bit_width=16: 100*16 + 0 = 1600 + // 348 < 1600, so should pick 3 with 1 exception + EXPECT_EQ(result.bit_width, 3); + EXPECT_EQ(result.num_exceptions, 1); +} + +TEST(PforCostModelTest, NoOutliers) { + // All values fit in 8 bits + std::vector deltas(100); + for (int32_t i = 0; i < 100; ++i) deltas[i] = i * 2; + auto result = PforCompression::FindOptimalBitWidth(deltas.data(), 100); + EXPECT_EQ(result.num_exceptions, 0); + EXPECT_LE(result.bit_width, 8); +} + +// ====================================================================== +// Vector Encode/Decode Round-Trip Tests + +TEST(PforVectorTest, Int32SimpleSequence) { + std::vector values(64); + std::iota(values.begin(), values.end(), 100); + + auto encoded = PforCompression::EncodeVector(values.data(), 64); + EXPECT_EQ(encoded.info.frame_of_reference, 100); + EXPECT_EQ(encoded.info.num_exceptions, 0); + + // Serialize then decode + size_t serialized_size = + PforCompression::SerializedVectorSize(encoded, 64); + std::vector buffer(serialized_size); + PforCompression::SerializeVector(encoded, 64, buffer); + + std::vector decoded(64); + ASSERT_OK(PforCompression::DecodeVector(decoded.data(), buffer, 64)); + + EXPECT_EQ(values, decoded); +} + +TEST(PforVectorTest, Int32WithOutlier) { + std::vector values = {100, 102, 101, 103, 100, 99, 50000, 104}; + + auto encoded = PforCompression::EncodeVector(values.data(), 8); + EXPECT_EQ(encoded.info.frame_of_reference, 99); + EXPECT_GT(encoded.info.num_exceptions, 0); + + size_t serialized_size = + PforCompression::SerializedVectorSize(encoded, 8); + std::vector buffer(serialized_size); + PforCompression::SerializeVector(encoded, 8, buffer); + + std::vector decoded(8); + ASSERT_OK(PforCompression::DecodeVector(decoded.data(), buffer, 8)); + + EXPECT_EQ(values, decoded); +} + +TEST(PforVectorTest, Int32AllIdentical) { + std::vector values(100, 42); + + auto encoded = PforCompression::EncodeVector(values.data(), 100); + EXPECT_EQ(encoded.info.bit_width, 0); + EXPECT_EQ(encoded.info.num_exceptions, 0); + + size_t serialized_size = + PforCompression::SerializedVectorSize(encoded, 100); + std::vector buffer(serialized_size); + PforCompression::SerializeVector(encoded, 100, buffer); + + std::vector decoded(100); + ASSERT_OK(PforCompression::DecodeVector(decoded.data(), buffer, 100)); + + EXPECT_EQ(values, decoded); +} + +TEST(PforVectorTest, Int32NegativeValues) { + std::vector values = {-100, -50, -200, -1, -150}; + + auto encoded = PforCompression::EncodeVector(values.data(), 5); + EXPECT_EQ(encoded.info.frame_of_reference, -200); + + size_t serialized_size = + PforCompression::SerializedVectorSize(encoded, 5); + std::vector buffer(serialized_size); + PforCompression::SerializeVector(encoded, 5, buffer); + + std::vector decoded(5); + ASSERT_OK(PforCompression::DecodeVector(decoded.data(), buffer, 5)); + + EXPECT_EQ(values, decoded); +} + +TEST(PforVectorTest, Int32MinMaxEdge) { + std::vector values = {std::numeric_limits::min(), + std::numeric_limits::max(), 0, -1, 1}; + + auto encoded = PforCompression::EncodeVector(values.data(), 5); + + size_t serialized_size = + PforCompression::SerializedVectorSize(encoded, 5); + std::vector buffer(serialized_size); + PforCompression::SerializeVector(encoded, 5, buffer); + + std::vector decoded(5); + ASSERT_OK(PforCompression::DecodeVector(decoded.data(), buffer, 5)); + + EXPECT_EQ(values, decoded); +} + +TEST(PforVectorTest, Int64SimpleSequence) { + std::vector values(64); + std::iota(values.begin(), values.end(), 1000000000LL); + + auto encoded = PforCompression::EncodeVector(values.data(), 64); + + size_t serialized_size = + PforCompression::SerializedVectorSize(encoded, 64); + std::vector buffer(serialized_size); + PforCompression::SerializeVector(encoded, 64, buffer); + + std::vector decoded(64); + ASSERT_OK(PforCompression::DecodeVector(decoded.data(), buffer, 64)); + + EXPECT_EQ(values, decoded); +} + +TEST(PforVectorTest, Int64WithOutlier) { + std::vector values(100, 5000000LL); + values[42] = 999999999999LL; // Outlier + + auto encoded = PforCompression::EncodeVector(values.data(), 100); + EXPECT_GT(encoded.info.num_exceptions, 0); + + size_t serialized_size = + PforCompression::SerializedVectorSize(encoded, 100); + std::vector buffer(serialized_size); + PforCompression::SerializeVector(encoded, 100, buffer); + + std::vector decoded(100); + ASSERT_OK(PforCompression::DecodeVector(decoded.data(), buffer, 100)); + + EXPECT_EQ(values, decoded); +} + +// ====================================================================== +// Page-Level Wrapper Tests + +TEST(PforWrapperTest, Int32SmallPage) { + std::vector values = {10, 20, 30, 40, 50}; + + int64_t max_size = PforWrapper::GetMaxCompressedSize(5); + std::vector compressed(max_size); + int64_t comp_size = max_size; + + PforWrapper::Encode(values.data(), 5, compressed.data(), &comp_size); + EXPECT_GT(comp_size, 0); + + std::vector decoded(5); + ASSERT_OK(PforWrapper::Decode(decoded.data(), 5, compressed.data(), comp_size)); + + EXPECT_EQ(values, decoded); +} + +TEST(PforWrapperTest, Int32ExactOneVector) { + std::vector values(1024); + std::iota(values.begin(), values.end(), 0); + + int64_t max_size = PforWrapper::GetMaxCompressedSize(1024); + std::vector compressed(max_size); + int64_t comp_size = max_size; + + PforWrapper::Encode(values.data(), 1024, compressed.data(), &comp_size); + + std::vector decoded(1024); + ASSERT_OK(PforWrapper::Decode(decoded.data(), 1024, compressed.data(), comp_size)); + + EXPECT_EQ(values, decoded); +} + +TEST(PforWrapperTest, Int32MultipleVectors) { + // 2.5 vectors worth of data + const int32_t n = 2560; + std::vector values(n); + std::mt19937 rng(42); + std::uniform_int_distribution dist(0, 1000); + for (auto& v : values) v = dist(rng); + + int64_t max_size = PforWrapper::GetMaxCompressedSize(n); + std::vector compressed(max_size); + int64_t comp_size = max_size; + + PforWrapper::Encode(values.data(), n, compressed.data(), &comp_size); + + std::vector decoded(n); + ASSERT_OK(PforWrapper::Decode(decoded.data(), n, compressed.data(), comp_size)); + + EXPECT_EQ(values, decoded); +} + +TEST(PforWrapperTest, Int32WithOutliers) { + std::vector values(1024, 100); + // Sprinkle outliers + values[0] = -999999; + values[100] = 888888; + values[500] = 777777; + values[1023] = -123456; + + int64_t max_size = PforWrapper::GetMaxCompressedSize(1024); + std::vector compressed(max_size); + int64_t comp_size = max_size; + + PforWrapper::Encode(values.data(), 1024, compressed.data(), &comp_size); + + std::vector decoded(1024); + ASSERT_OK(PforWrapper::Decode(decoded.data(), 1024, compressed.data(), comp_size)); + + EXPECT_EQ(values, decoded); +} + +TEST(PforWrapperTest, Int64MultipleVectors) { + const int32_t n = 3000; + std::vector values(n); + std::mt19937 rng(123); + std::uniform_int_distribution dist(0, 100000); + for (auto& v : values) v = dist(rng); + // Add outliers + values[0] = 9999999999999LL; + values[1500] = -9999999999999LL; + + int64_t max_size = PforWrapper::GetMaxCompressedSize(n); + std::vector compressed(max_size); + int64_t comp_size = max_size; + + PforWrapper::Encode(values.data(), n, compressed.data(), &comp_size); + + std::vector decoded(n); + ASSERT_OK(PforWrapper::Decode(decoded.data(), n, compressed.data(), comp_size)); + + EXPECT_EQ(values, decoded); +} + +TEST(PforWrapperTest, Int32SingleElement) { + std::vector values = {42}; + + int64_t max_size = PforWrapper::GetMaxCompressedSize(1); + std::vector compressed(max_size); + int64_t comp_size = max_size; + + PforWrapper::Encode(values.data(), 1, compressed.data(), &comp_size); + + std::vector decoded(1); + ASSERT_OK(PforWrapper::Decode(decoded.data(), 1, compressed.data(), comp_size)); + + EXPECT_EQ(values, decoded); +} + +TEST(PforWrapperTest, Int32AllZeros) { + std::vector values(1024, 0); + + int64_t max_size = PforWrapper::GetMaxCompressedSize(1024); + std::vector compressed(max_size); + int64_t comp_size = max_size; + + PforWrapper::Encode(values.data(), 1024, compressed.data(), &comp_size); + + // Should compress very well (bit_width = 0) + EXPECT_LT(comp_size, 100); + + std::vector decoded(1024); + ASSERT_OK(PforWrapper::Decode(decoded.data(), 1024, compressed.data(), comp_size)); + + EXPECT_EQ(values, decoded); +} + +TEST(PforWrapperTest, Int32LargeRandom) { + const int32_t n = 10000; + std::vector values(n); + std::mt19937 rng(99); + std::uniform_int_distribution dist( + std::numeric_limits::min(), + std::numeric_limits::max()); + for (auto& v : values) v = dist(rng); + + int64_t max_size = PforWrapper::GetMaxCompressedSize(n); + std::vector compressed(max_size); + int64_t comp_size = max_size; + + PforWrapper::Encode(values.data(), n, compressed.data(), &comp_size); + + std::vector decoded(n); + ASSERT_OK(PforWrapper::Decode(decoded.data(), n, compressed.data(), comp_size)); + + EXPECT_EQ(values, decoded); +} + +// ====================================================================== +// Compression Ratio Test + +TEST(PforCompressionRatioTest, ClusteredDataCompresses) { + // Data clustered around 1000 with one outlier + std::vector values(1024); + std::mt19937 rng(42); + std::uniform_int_distribution dist(1000, 1255); + for (auto& v : values) v = dist(rng); + values[500] = 999999; // One outlier + + int64_t max_size = PforWrapper::GetMaxCompressedSize(1024); + std::vector compressed(max_size); + int64_t comp_size = max_size; + + PforWrapper::Encode(values.data(), 1024, compressed.data(), &comp_size); + + // PLAIN would be 4096 bytes. PFOR should be much smaller. + size_t plain_size = 1024 * sizeof(int32_t); + EXPECT_LT(comp_size, plain_size / 2); +} + +} // namespace arrow::util::pfor diff --git a/cpp/src/arrow/util/pfor/pfor_wrapper.cc b/cpp/src/arrow/util/pfor/pfor_wrapper.cc new file mode 100644 index 000000000000..15ae79cb5610 --- /dev/null +++ b/cpp/src/arrow/util/pfor/pfor_wrapper.cc @@ -0,0 +1,218 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// PFOR page-level wrapper implementation +// +// Page layout: +// [Header 7B] [Offset Array: numVectors * 4B] [Vector 0] [Vector 1] ... +// +// Each vector: +// [PforVectorInfo] [PackedValues] [ExceptionPositions] [ExceptionValues] + +#include "arrow/util/pfor/pfor_wrapper.h" + +#include +#include +#include + +#include "arrow/util/bit_util.h" +#include "arrow/util/logging.h" +#include "arrow/util/span.h" + +namespace arrow { +namespace util { +namespace pfor { + +// ---------------------------------------------------------------------- +// Header serialization + +template +void PforWrapper::StoreHeader(arrow::util::span dest, + const PforHeader& header) { + uint8_t* ptr = dest.data(); + ptr[0] = header.packing_mode; + ptr[1] = header.log_vector_size; + ptr[2] = header.value_byte_width; + std::memcpy(ptr + 3, &header.num_elements, sizeof(int32_t)); +} + +template +typename PforWrapper::PforHeader PforWrapper::LoadHeader( + arrow::util::span src) { + PforHeader header; + const uint8_t* ptr = src.data(); + header.packing_mode = ptr[0]; + header.log_vector_size = ptr[1]; + header.value_byte_width = ptr[2]; + std::memcpy(&header.num_elements, ptr + 3, sizeof(int32_t)); + return header; +} + +// ---------------------------------------------------------------------- +// Encode + +template +void PforWrapper::Encode(const T* values, int32_t num_values, int32_t vector_size, + char* comp, int64_t* comp_size) { + ARROW_DCHECK(num_values > 0); + ARROW_DCHECK(comp != nullptr); + ARROW_DCHECK(comp_size != nullptr); + ARROW_DCHECK((vector_size & (vector_size - 1)) == 0); + + const int32_t num_vectors = + (num_values + vector_size - 1) / vector_size; + + // Compute log2(vector_size) + uint8_t log_vector_size = 0; + for (int32_t v = vector_size; v > 1; v >>= 1) ++log_vector_size; + + auto* dest = reinterpret_cast(comp); + + // Step 1: Write header + PforHeader header; + header.packing_mode = PforConstants::kPackingModeForBitPack; + header.log_vector_size = log_vector_size; + header.value_byte_width = sizeof(T); + header.num_elements = num_values; + StoreHeader(arrow::util::span(dest, PforConstants::kHeaderSize), header); + uint8_t* write_ptr = dest + PforConstants::kHeaderSize; + + // Step 2: Reserve space for offset array + uint8_t* offset_array_start = write_ptr; + write_ptr += num_vectors * sizeof(uint32_t); + + // Step 3: Encode each vector and build offset array + const uint8_t* data_start = offset_array_start; + + for (int32_t v = 0; v < num_vectors; ++v) { + // Record offset (from start of offset array) + uint32_t offset = static_cast(write_ptr - data_start); + std::memcpy(offset_array_start + v * sizeof(uint32_t), &offset, sizeof(uint32_t)); + + // Determine elements in this vector + int32_t start_idx = v * vector_size; + int32_t elements_in_vector = + std::min(vector_size, num_values - start_idx); + + // Encode vector + auto encoded = PforCompression::EncodeVector( + values + start_idx, elements_in_vector); + + // Serialize to output + int64_t bytes_written = PforCompression::SerializeVector( + encoded, elements_in_vector, + arrow::util::span(write_ptr, dest + *comp_size - write_ptr)); + write_ptr += bytes_written; + } + + *comp_size = static_cast(write_ptr - dest); +} + +template +void PforWrapper::Encode(const T* values, int32_t num_values, char* comp, + int64_t* comp_size) { + Encode(values, num_values, kVectorSize, comp, comp_size); +} + +// ---------------------------------------------------------------------- +// Decode + +template +Status PforWrapper::Decode(T* values, int32_t num_values, const char* comp, + int64_t comp_size) { + if (num_values <= 0) { + return Status::Invalid("PFOR num_values must be positive: ", num_values); + } + if (comp == nullptr) { + return Status::Invalid("PFOR compressed data pointer is null"); + } + if (comp_size < PforConstants::kHeaderSize) { + return Status::Invalid("PFOR compressed buffer too small for header: ", comp_size, + " < ", PforConstants::kHeaderSize); + } + + const auto* src = reinterpret_cast(comp); + + // Step 1: Read header + PforHeader header = LoadHeader( + arrow::util::span(src, PforConstants::kHeaderSize)); + + if (header.packing_mode != PforConstants::kPackingModeForBitPack) { + return Status::Invalid("PFOR unsupported packing mode: ", header.packing_mode); + } + if (header.value_byte_width != sizeof(T)) { + return Status::Invalid("PFOR value_byte_width mismatch: ", header.value_byte_width, + " vs expected ", sizeof(T)); + } + + const int32_t vector_size = 1 << header.log_vector_size; + const int32_t num_vectors = + (header.num_elements + vector_size - 1) / vector_size; + + // Step 2: Read offset array + const uint8_t* offset_array_start = src + PforConstants::kHeaderSize; + + // Step 3: Decode each vector + for (int32_t v = 0; v < num_vectors; ++v) { + uint32_t offset; + std::memcpy(&offset, offset_array_start + v * sizeof(uint32_t), + sizeof(uint32_t)); + + const uint8_t* vector_data = offset_array_start + offset; + + int32_t start_idx = v * vector_size; + int32_t elements_in_vector = + std::min(vector_size, header.num_elements - start_idx); + + ARROW_RETURN_NOT_OK(PforCompression::DecodeVector( + values + start_idx, + arrow::util::span(vector_data, src + comp_size - vector_data), + elements_in_vector)); + } + + return Status::OK(); +} + +// ---------------------------------------------------------------------- +// GetMaxCompressedSize + +template +int64_t PforWrapper::GetMaxCompressedSize(int32_t num_values, int32_t vector_size) { + const int32_t num_vectors = + (num_values + vector_size - 1) / vector_size; + + // Header + offset array + int64_t size = PforConstants::kHeaderSize + + num_vectors * static_cast(sizeof(uint32_t)); + + // Worst case per vector: full bit width + all exceptions + int64_t max_vector_size = PforVectorInfo::kStoredSize + + vector_size * static_cast(sizeof(T)) // packed at full width + + vector_size * static_cast(sizeof(int16_t)) // exception positions + + vector_size * static_cast(sizeof(T)); // exception values + + size += num_vectors * max_vector_size; + return size; +} + +// Explicit template instantiations +template class PforWrapper; +template class PforWrapper; + +} // namespace pfor +} // namespace util +} // namespace arrow diff --git a/cpp/src/arrow/util/pfor/pfor_wrapper.h b/cpp/src/arrow/util/pfor/pfor_wrapper.h new file mode 100644 index 000000000000..5512af4f37b4 --- /dev/null +++ b/cpp/src/arrow/util/pfor/pfor_wrapper.h @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// High-level wrapper interface for PFOR compression +// +// Handles page-level serialization: header, offset array, and vectors. + +#pragma once + +#include +#include + +#include "arrow/status.h" +#include "arrow/util/pfor/pfor.h" +#include "arrow/util/span.h" + +namespace arrow { +namespace util { +namespace pfor { + +/// \class PforWrapper +/// \brief High-level interface for PFOR page-level compression +/// +/// Manages the page layout: [Header 7B] [Offset Array] [Vector 0] [Vector 1] ... +/// +/// \tparam T the integer type (int32_t or int64_t) +template +class PforWrapper { + public: + /// \brief Encode integer values into a PFOR-compressed page + /// + /// \param[in] values pointer to input integers + /// \param[in] num_values total number of values + /// \param[in] vector_size number of elements per vector (must be a power of 2, + /// in [2^kMinLogVectorSize, 2^kMaxLogVectorSize]) + /// \param[out] comp pointer to output buffer (caller must ensure sufficient size) + /// \param[in,out] comp_size input: available buffer size; output: bytes written + static void Encode(const T* values, int32_t num_values, int32_t vector_size, + char* comp, int64_t* comp_size); + + /// Convenience overload with default vector_size = kPforVectorSize + static void Encode(const T* values, int32_t num_values, char* comp, + int64_t* comp_size); + + /// \brief Decode a PFOR-compressed page + /// + /// \param[out] values pointer to output buffer + /// \param[in] num_values number of values to decode (from page context) + /// \param[in] comp pointer to compressed data + /// \param[in] comp_size size of compressed data + /// \return Status::OK on success, or an error if the data is malformed + static Status Decode(T* values, int32_t num_values, const char* comp, + int64_t comp_size); + + /// \brief Get the maximum compressed size for a given number of values + /// + /// \param[in] num_values number of integer values + /// \param[in] vector_size number of elements per vector + /// \return maximum possible compressed page size in bytes + static int64_t GetMaxCompressedSize( + int32_t num_values, + int32_t vector_size = static_cast(PforConstants::kPforVectorSize)); + + private: + /// \brief Page header structure (7 bytes) + struct PforHeader { + uint8_t packing_mode; // 0 = FOR + bit-packing + uint8_t log_vector_size; // log2(vector_size) + uint8_t value_byte_width; // sizeof(T): 4 or 8 + int32_t num_elements; // total element count + }; + + static constexpr int32_t kVectorSize = + static_cast(PforConstants::kPforVectorSize); + + static void StoreHeader(arrow::util::span dest, const PforHeader& header); + static PforHeader LoadHeader(arrow::util::span src); +}; + +} // namespace pfor +} // namespace util +} // namespace arrow diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index 79b837f755c3..b39e020b1ab7 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -862,7 +862,8 @@ class ColumnReaderImplBase { case Encoding::RLE: case Encoding::DELTA_BINARY_PACKED: case Encoding::DELTA_BYTE_ARRAY: - case Encoding::DELTA_LENGTH_BYTE_ARRAY: { + case Encoding::DELTA_LENGTH_BYTE_ARRAY: + case Encoding::PFOR: { auto decoder = MakeTypedDecoder(encoding, descr_, pool_); current_decoder_ = decoder.get(); decoders_[static_cast(encoding)] = std::move(decoder); diff --git a/cpp/src/parquet/decoder.cc b/cpp/src/parquet/decoder.cc index 3ce2323d29a1..f080fc14925f 100644 --- a/cpp/src/parquet/decoder.cc +++ b/cpp/src/parquet/decoder.cc @@ -45,6 +45,7 @@ #include "arrow/util/logging_internal.h" #include "arrow/util/rle_encoding_internal.h" #include "arrow/util/spaced_internal.h" +#include "arrow/util/pfor/pfor_wrapper.h" #include "arrow/util/ubsan.h" #include "arrow/visit_data_inline.h" @@ -2323,6 +2324,79 @@ class ByteStreamSplitDecoder : public ByteStreamSplitDecoderBase +class PforDecoder : public TypedDecoderImpl { + public: + using T = typename DType::c_type; + + explicit PforDecoder(const ColumnDescriptor* descr, + MemoryPool* pool = ::arrow::default_memory_pool()) + : TypedDecoderImpl(descr, Encoding::PFOR), pool_(pool) {} + + void SetData(int num_values, const uint8_t* data, int len) override { + this->num_values_ = num_values; + data_ = data; + data_len_ = len; + values_decoded_ = 0; + } + + int Decode(T* buffer, int max_values) override { + int values_to_decode = std::min(max_values, this->num_values_); + if (values_to_decode == 0) return 0; + + // Decode all values on first call, cache for subsequent calls + if (decoded_values_.empty() && this->num_values_ > 0) { + decoded_values_.resize(this->num_values_); + arrow::util::pfor::PforWrapper::Decode( + decoded_values_.data(), static_cast(this->num_values_), + reinterpret_cast(data_), static_cast(data_len_)); + } + + std::memcpy(buffer, decoded_values_.data() + values_decoded_, + values_to_decode * sizeof(T)); + values_decoded_ += values_to_decode; + this->num_values_ -= values_to_decode; + return values_to_decode; + } + + int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, + typename EncodingTraits::Accumulator* out) override { + if (null_count != 0) { + ParquetException::NYI("PFOR DecodeArrow with null slots"); + } + std::vector values(num_values); + int decoded_count = Decode(values.data(), num_values); + PARQUET_THROW_NOT_OK(out->AppendValues(values.data(), decoded_count)); + return decoded_count; + } + + int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, + typename EncodingTraits::DictAccumulator* out) override { + if (null_count != 0) { + ParquetException::NYI("PFOR DecodeArrow with null slots"); + } + std::vector values(num_values); + int decoded_count = Decode(values.data(), num_values); + PARQUET_THROW_NOT_OK(out->Reserve(decoded_count)); + for (int i = 0; i < decoded_count; ++i) { + PARQUET_THROW_NOT_OK(out->Append(values[i])); + } + return decoded_count; + } + + private: + MemoryPool* pool_; + const uint8_t* data_ = nullptr; + int data_len_ = 0; + int values_decoded_ = 0; + std::vector decoded_values_; +}; + } // namespace // ---------------------------------------------------------------------- @@ -2399,6 +2473,15 @@ std::unique_ptr MakeDecoder(Type::type type_num, Encoding::type encodin return std::make_unique(descr); } throw ParquetException("RLE encoding only supports BOOLEAN"); + } else if (encoding == Encoding::PFOR) { + switch (type_num) { + case Type::INT32: + return std::make_unique>(descr, pool); + case Type::INT64: + return std::make_unique>(descr, pool); + default: + throw ParquetException("PFOR decoder only supports INT32 and INT64"); + } } else { ParquetException::NYI("Selected encoding is not supported"); } diff --git a/cpp/src/parquet/encoder.cc b/cpp/src/parquet/encoder.cc index 04f079ce70cd..a346bbbc2bfd 100644 --- a/cpp/src/parquet/encoder.cc +++ b/cpp/src/parquet/encoder.cc @@ -41,6 +41,7 @@ #include "arrow/util/logging_internal.h" #include "arrow/util/rle_encoding_internal.h" #include "arrow/util/spaced_internal.h" +#include "arrow/util/pfor/pfor_wrapper.h" #include "arrow/util/ubsan.h" #include "arrow/visit_data_inline.h" @@ -1751,6 +1752,82 @@ std::shared_ptr RleBooleanEncoder::FlushValues() { } // namespace +// ---------------------------------------------------------------------- +// PFOR Encoder + +template +class PforEncoder : public EncoderImpl, virtual public TypedEncoder { + public: + using T = typename DType::c_type; + using TypedEncoder::Put; + + explicit PforEncoder(const ColumnDescriptor* descr, MemoryPool* pool) + : EncoderImpl(descr, Encoding::PFOR, pool), pool_(pool) {} + + std::shared_ptr FlushValues() override { + if (values_.empty()) { + PARQUET_ASSIGN_OR_THROW(auto empty_buf, ::arrow::AllocateBuffer(0, pool_)); + return std::move(empty_buf); + } + + const uint32_t num_values = static_cast(values_.size()); + size_t max_size = + arrow::util::pfor::PforWrapper::GetMaxCompressedSize(num_values); + PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateResizableBuffer( + static_cast(max_size), pool_)); + + size_t comp_size = max_size; + arrow::util::pfor::PforWrapper::Encode( + values_.data(), num_values, + reinterpret_cast(buffer->mutable_data()), &comp_size); + + PARQUET_THROW_NOT_OK(buffer->Resize(static_cast(comp_size))); + values_.clear(); + return std::move(buffer); + } + + int64_t EstimatedDataEncodedSize() override { + return static_cast(values_.size() * sizeof(T)); + } + + void Put(const ::arrow::Array& values) override { + const auto& data = *values.data(); + if (data.length > std::numeric_limits::max()) { + throw ParquetException("Array cannot be longer than ", + std::numeric_limits::max()); + } + if (values.null_count() == 0) { + Put(data.template GetValues(1), static_cast(data.length)); + } else { + PutSpaced(data.template GetValues(1), static_cast(data.length), + data.GetValues(0, 0), data.offset); + } + } + + void Put(const T* buffer, int num_values) override { + values_.insert(values_.end(), buffer, buffer + num_values); + } + + void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, + int64_t valid_bits_offset) override { + if (valid_bits != NULLPTR) { + PARQUET_ASSIGN_OR_THROW(auto buffer, + ::arrow::AllocateBuffer(num_values * sizeof(T), pool_)); + T* dest = reinterpret_cast(buffer->mutable_data()); + int num_valid = + ::arrow::util::internal::SpacedCompress(src, num_values, valid_bits, + valid_bits_offset, dest); + Put(dest, num_valid); + } else { + Put(src, num_values); + } + } + + private: + MemoryPool* pool_; + std::vector values_; +}; + // ---------------------------------------------------------------------- // Factory function @@ -1850,6 +1927,15 @@ std::unique_ptr MakeEncoder(Type::type type_num, Encoding::type encodin throw ParquetException( "DELTA_BYTE_ARRAY only supports BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY"); } + } else if (encoding == Encoding::PFOR) { + switch (type_num) { + case Type::INT32: + return std::make_unique>(descr, pool); + case Type::INT64: + return std::make_unique>(descr, pool); + default: + throw ParquetException("PFOR encoder only supports INT32 and INT64"); + } } else { ParquetException::NYI("Selected encoding is not supported"); } diff --git a/cpp/src/parquet/types.cc b/cpp/src/parquet/types.cc index fb4eb92a7544..755a7035eabf 100644 --- a/cpp/src/parquet/types.cc +++ b/cpp/src/parquet/types.cc @@ -259,6 +259,8 @@ std::string EncodingToString(Encoding::type t) { return "RLE_DICTIONARY"; case Encoding::BYTE_STREAM_SPLIT: return "BYTE_STREAM_SPLIT"; + case Encoding::PFOR: + return "PFOR"; default: return "UNKNOWN"; } diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h index 7e8a18fc94d6..a8001518b6db 100644 --- a/cpp/src/parquet/types.h +++ b/cpp/src/parquet/types.h @@ -538,8 +538,9 @@ struct Encoding { DELTA_BYTE_ARRAY = 7, RLE_DICTIONARY = 8, BYTE_STREAM_SPLIT = 9, + PFOR = 11, // Should always be last element (except UNKNOWN) - UNDEFINED = 10, + UNDEFINED = 12, UNKNOWN = 999 }; };