From 5df605f77bdd2c3858b56d4073b4c936f181a7d5 Mon Sep 17 00:00:00 2001 From: Prateek Gaur Date: Mon, 20 Apr 2026 23:58:37 +0000 Subject: [PATCH 01/12] Add PFOR core algorithm and tests Implements the PFOR (Patched Frame of Reference) integer compression algorithm as a standalone utility library in arrow/util/pfor/. Includes: - Cost model for optimal bit width selection (histogram-based) - Vector-level encode/decode with FOR + bit-packing + exceptions - Page-level wrapper with header, offset array, and multi-vector layout - Comprehensive unit tests covering edge cases and round-trips --- cpp/src/arrow/util/CMakeLists.txt | 6 + cpp/src/arrow/util/pfor/pfor.cc | 291 +++++++++++++++ cpp/src/arrow/util/pfor/pfor.h | 146 ++++++++ cpp/src/arrow/util/pfor/pfor_constants.h | 96 +++++ cpp/src/arrow/util/pfor/pfor_test.cc | 431 +++++++++++++++++++++++ cpp/src/arrow/util/pfor/pfor_wrapper.cc | 184 ++++++++++ cpp/src/arrow/util/pfor/pfor_wrapper.h | 83 +++++ 7 files changed, 1237 insertions(+) create mode 100644 cpp/src/arrow/util/pfor/pfor.cc create mode 100644 cpp/src/arrow/util/pfor/pfor.h create mode 100644 cpp/src/arrow/util/pfor/pfor_constants.h create mode 100644 cpp/src/arrow/util/pfor/pfor_test.cc create mode 100644 cpp/src/arrow/util/pfor/pfor_wrapper.cc create mode 100644 cpp/src/arrow/util/pfor/pfor_wrapper.h diff --git a/cpp/src/arrow/util/CMakeLists.txt b/cpp/src/arrow/util/CMakeLists.txt index a41b63f07b3e..3fc6f60d9045 100644 --- a/cpp/src/arrow/util/CMakeLists.txt +++ b/cpp/src/arrow/util/CMakeLists.txt @@ -118,6 +118,12 @@ add_arrow_test(threading-utility-test test_common.cc thread_pool_test.cc) +add_arrow_test(pfor-test + SOURCES + pfor/pfor_test.cc + pfor/pfor.cc + pfor/pfor_wrapper.cc) + add_arrow_benchmark(bit_block_counter_benchmark) add_arrow_benchmark(bit_util_benchmark) add_arrow_benchmark(bitmap_reader_benchmark) diff --git a/cpp/src/arrow/util/pfor/pfor.cc b/cpp/src/arrow/util/pfor/pfor.cc new file mode 100644 index 000000000000..54bb131d1964 --- 
/dev/null +++ b/cpp/src/arrow/util/pfor/pfor.cc @@ -0,0 +1,291 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Core PFOR (Patched Frame of Reference) compression implementation +// +// Adapted from the Snowflake PFOR encoder (PforEncoder.{hpp,cpp}). 
+// Key differences from the Snowflake implementation: +// - Vector size: 1024 (not 2048) +// - Max exceptions: uint16 (not uint8) +// - Exception values: original integers (not FOR offsets) +// - Bit packing: Arrow's BitWriter/unpack (not Snowflake's BitPacker) + +#include "arrow/util/pfor/pfor.h" + +#include +#include +#include +#include + +#include "arrow/util/bit_stream_utils_internal.h" +#include "arrow/util/bit_util.h" +#include "arrow/util/bpacking_internal.h" +#include "arrow/util/logging.h" + +namespace arrow { +namespace util { +namespace pfor { + +// ---------------------------------------------------------------------- +// FindOptimalBitWidth: histogram-based cost model + +template +BitWidthResult PforCompression::FindOptimalBitWidth(const UnsignedT* deltas, + uint32_t num_elements) { + constexpr uint8_t max_bits = PforTypeTraits::kMaxBitWidth; + constexpr uint8_t position_bits = 16; // uint16_t for exception position + constexpr uint8_t value_bits = sizeof(T) * 8; + + // Build histogram: histogram[b] = count of deltas requiring exactly b bits + std::array histogram{}; // Support up to 64 bits + for (uint32_t i = 0; i < num_elements; ++i) { + uint8_t bits = PforTypeTraits::BitsRequired(deltas[i]); + histogram[bits]++; + } + + // Evaluate each candidate bit width + uint64_t best_cost = std::numeric_limits::max(); + uint8_t best_bit_width = max_bits; + uint16_t best_num_exceptions = 0; + + uint64_t exceptions_above = num_elements; // All start as potential exceptions + + for (uint8_t b = 0; b <= max_bits; ++b) { + exceptions_above -= histogram[b]; + + if (exceptions_above > PforConstants::kMaxExceptions) { + continue; + } + + uint64_t packing_cost = static_cast(num_elements) * b; + uint64_t exception_cost = exceptions_above * (position_bits + value_bits); + uint64_t total_cost = packing_cost + exception_cost; + + if (total_cost < best_cost) { + best_cost = total_cost; + best_bit_width = b; + best_num_exceptions = static_cast(exceptions_above); + } + } + + 
return {best_bit_width, best_num_exceptions}; +} + +// ---------------------------------------------------------------------- +// EncodeVector + +template +PforEncodedVector PforCompression::EncodeVector(const T* values, + uint32_t num_elements) { + ARROW_DCHECK(num_elements > 0); + + // Step 1: Find min (frame of reference) + T min_val = values[0]; + for (uint32_t i = 1; i < num_elements; ++i) { + if (values[i] < min_val) min_val = values[i]; + } + + // Step 2: Compute unsigned deltas + const auto unsigned_min = static_cast(min_val); + std::vector deltas(num_elements); + for (uint32_t i = 0; i < num_elements; ++i) { + deltas[i] = static_cast(values[i]) - unsigned_min; + } + + // Step 3: Find optimal bit width + auto [bit_width, num_exceptions] = + FindOptimalBitWidth(deltas.data(), num_elements); + + // Step 4: Collect exceptions and replace with placeholder (0) + PforEncodedVector result; + result.info.frame_of_reference = min_val; + result.info.bit_width = bit_width; + result.info.num_exceptions = num_exceptions; + + if (num_exceptions > 0) { + result.exception_positions.reserve(num_exceptions); + result.exception_values.reserve(num_exceptions); + + UnsignedT mask = (bit_width >= PforTypeTraits::kMaxBitWidth) + ? 
static_cast<UnsignedT>(-1)
+                         : (static_cast<UnsignedT>(1) << bit_width) - 1;
+
+    for (uint32_t i = 0; i < num_elements; ++i) {
+      if (deltas[i] > mask) {
+        result.exception_positions.push_back(static_cast<uint16_t>(i));
+        result.exception_values.push_back(values[i]);  // Store ORIGINAL value
+        deltas[i] = 0;                                 // Placeholder
+      }
+    }
+  }
+
+  // Step 5: Bit-pack the deltas
+  if (bit_width > 0) {
+    size_t packed_size = static_cast<size_t>(
+        bit_util::BytesForBits(static_cast<int64_t>(num_elements) * bit_width));
+    result.packed_values.resize(packed_size, 0);
+
+    bit_util::BitWriter writer(result.packed_values.data(),
+                               static_cast<int>(packed_size));
+    for (uint32_t i = 0; i < num_elements; ++i) {
+      writer.PutValue(static_cast<uint64_t>(deltas[i]), bit_width);
+    }
+    writer.Flush();
+  }
+
+  return result;
+}
+
+// ----------------------------------------------------------------------
+// DecodeVector
+
+// Decodes one serialized vector from `data` into `values` and returns the
+// number of bytes consumed.
+//
+// Sections are read in this order: vector info, bit-packed deltas, exception
+// positions (uint16_t each), exception values (T each).
+//
+// The size of `data` is not passed in, so reads cannot be bounded here; the
+// caller must supply a complete vector. Exception positions that are not
+// smaller than `num_elements` are skipped instead of being written through,
+// so a corrupt position cannot write past the end of `values`.
+template <typename T>
+size_t PforCompression<T>::DecodeVector(T* values, const uint8_t* data,
+                                        uint32_t num_elements) {
+  // Step 1: Read vector info
+  const auto info = PforVectorInfo<T>::Load(data);
+  const uint8_t* read_ptr = data + PforVectorInfo<T>::kSerializedSize;
+
+  // Step 2: Handle constant data (bit_width == 0, no exceptions)
+  if (info.bit_width == 0 && info.num_exceptions == 0) {
+    std::fill(values, values + num_elements, info.frame_of_reference);
+    return PforVectorInfo<T>::kSerializedSize;
+  }
+
+  // Step 3: Unpack bit-packed deltas and add FOR
+  if (info.bit_width > 0) {
+    // Full batches of 32 values go through arrow::internal::unpack; the tail
+    // (fewer than 32 values) goes through BitReader.
+    constexpr int kBatchSize = 32;
+    const uint32_t full_batches = num_elements / kBatchSize;
+    const uint32_t remainder = num_elements % kBatchSize;
+
+    // 32 values of bit_width bits occupy exactly 4 * bit_width bytes, so every
+    // batch, and the tail after the last batch, starts on a byte boundary.
+    const size_t batch_bytes = static_cast<size_t>(kBatchSize) * info.bit_width / 8;
+    const size_t packed_size = static_cast<size_t>(
+        bit_util::BytesForBits(static_cast<int64_t>(num_elements) * info.bit_width));
+
+    UnsignedT* unsigned_values = reinterpret_cast<UnsignedT*>(values);
+    const auto unsigned_for = static_cast<UnsignedT>(info.frame_of_reference);
+
+    // Advance the input pointer per batch and keep the offset argument at 0,
+    // rather than passing an ever-growing bit offset from the buffer start.
+    for (uint32_t batch = 0; batch < full_batches; ++batch) {
+      arrow::internal::unpack(read_ptr + batch * batch_bytes,
+                              unsigned_values + batch * kBatchSize, kBatchSize,
+                              info.bit_width, 0);
+    }
+
+    // Unpack remainder using BitReader
+    if (remainder > 0) {
+      // Start the reader at the first byte after the full batches instead of
+      // decoding and discarding every value that precedes the tail.
+      const size_t tail_offset = full_batches * batch_bytes;
+      bit_util::BitReader reader(read_ptr + tail_offset,
+                                 static_cast<int>(packed_size - tail_offset));
+      for (uint32_t i = full_batches * kBatchSize; i < num_elements; ++i) {
+        uint64_t val = 0;
+        reader.GetValue(info.bit_width, &val);
+        unsigned_values[i] = static_cast<UnsignedT>(val);
+      }
+    }
+
+    // Add FOR to all values
+    for (uint32_t i = 0; i < num_elements; ++i) {
+      unsigned_values[i] += unsigned_for;
+    }
+
+    read_ptr += packed_size;
+  } else {
+    // bit_width == 0 but has exceptions - fill with FOR
+    std::fill(values, values + num_elements, info.frame_of_reference);
+  }
+
+  // Step 4: Patch exceptions (stored as original values)
+  if (info.num_exceptions > 0) {
+    const uint8_t* positions_ptr = read_ptr;
+    read_ptr += info.num_exceptions * sizeof(uint16_t);
+
+    const uint8_t* values_ptr = read_ptr;
+    read_ptr += info.num_exceptions * sizeof(T);
+
+    for (uint16_t i = 0; i < info.num_exceptions; ++i) {
+      uint16_t pos;
+      std::memcpy(&pos, positions_ptr + i * sizeof(uint16_t), sizeof(uint16_t));
+
+      T value;
+      std::memcpy(&value, values_ptr + i * sizeof(T), sizeof(T));
+
+      // `pos` comes from the encoded stream; never index outside `values`.
+      if (pos < num_elements) {
+        values[pos] = value;
+      }
+    }
+  }
+
+  return static_cast<size_t>(read_ptr - data);
+}
+
+// ----------------------------------------------------------------------
+// Serialization helpers
+
+template <typename T>
+size_t PforCompression<T>::SerializedVectorSize(const PforEncodedVector<T>& vec,
+                                                uint32_t num_elements) {
+  size_t size = PforVectorInfo<T>::kSerializedSize;
+  if (vec.info.bit_width > 0) {
+    size += static_cast<size_t>(
+        bit_util::BytesForBits(static_cast<int64_t>(num_elements) * vec.info.bit_width));
+  }
+  size += vec.info.num_exceptions * sizeof(uint16_t);  // 
positions + size += vec.info.num_exceptions * sizeof(T); // values + return size; +} + +template +size_t PforCompression::SerializeVector(const PforEncodedVector& vec, + uint32_t num_elements, + uint8_t* dest) { + uint8_t* write_ptr = dest; + + // Write vector info + vec.info.Store(write_ptr); + write_ptr += PforVectorInfo::kSerializedSize; + + // Write packed values + if (vec.info.bit_width > 0 && !vec.packed_values.empty()) { + std::memcpy(write_ptr, vec.packed_values.data(), vec.packed_values.size()); + write_ptr += vec.packed_values.size(); + } + + // Write exception positions + if (vec.info.num_exceptions > 0) { + std::memcpy(write_ptr, vec.exception_positions.data(), + vec.info.num_exceptions * sizeof(uint16_t)); + write_ptr += vec.info.num_exceptions * sizeof(uint16_t); + + // Write exception values (original integers) + std::memcpy(write_ptr, vec.exception_values.data(), + vec.info.num_exceptions * sizeof(T)); + write_ptr += vec.info.num_exceptions * sizeof(T); + } + + return static_cast(write_ptr - dest); +} + +// Explicit template instantiations +template class PforCompression; +template class PforCompression; + +} // namespace pfor +} // namespace util +} // namespace arrow diff --git a/cpp/src/arrow/util/pfor/pfor.h b/cpp/src/arrow/util/pfor/pfor.h new file mode 100644 index 000000000000..af25cbbedfc1 --- /dev/null +++ b/cpp/src/arrow/util/pfor/pfor.h @@ -0,0 +1,146 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Core PFOR (Patched Frame of Reference) compression algorithm +// +// PFOR compresses integer columns by: +// 1. Subtracting the minimum value (Frame of Reference) +// 2. Choosing an optimal bit width via a cost model +// 3. Bit-packing the deltas at the chosen width +// 4. Storing outlier values (exceptions) separately + +#pragma once + +#include +#include +#include + +#include "arrow/util/pfor/pfor_constants.h" + +namespace arrow { +namespace util { +namespace pfor { + +// ---------------------------------------------------------------------- +// Per-vector metadata + +/// \brief PFOR vector metadata stored at the start of each compressed vector. 
+///
+/// For INT32 (7 bytes): [frame_of_reference(4B)] [bit_width(1B)] [num_exceptions(2B)]
+/// For INT64 (11 bytes): [frame_of_reference(8B)] [bit_width(1B)] [num_exceptions(2B)]
+///
+/// Multi-byte fields are written and read one byte at a time, least
+/// significant byte first, so the serialized form does not depend on the
+/// byte order of the host.
+template <typename T>
+struct PforVectorInfo {
+  T frame_of_reference = 0;
+  uint8_t bit_width = 0;
+  uint16_t num_exceptions = 0;
+
+  /// \brief Store this info to a byte buffer (little-endian)
+  ///
+  /// \param[out] dest buffer with room for at least kSerializedSize bytes
+  void Store(uint8_t* dest) const {
+    using UnsignedT = typename PforTypeTraits<T>::UnsignedType;
+    // Shift the unsigned representation so negative references serialize as
+    // their two's-complement bit pattern.
+    const auto for_bits = static_cast<UnsignedT>(frame_of_reference);
+    for (size_t i = 0; i < sizeof(T); ++i) {
+      dest[i] = static_cast<uint8_t>(for_bits >> (8 * i));
+    }
+    dest[sizeof(T)] = bit_width;
+    dest[sizeof(T) + 1] = static_cast<uint8_t>(num_exceptions & 0xFF);
+    dest[sizeof(T) + 2] = static_cast<uint8_t>(num_exceptions >> 8);
+  }
+
+  /// \brief Load this info from a byte buffer (little-endian)
+  ///
+  /// \param[in] src buffer holding at least kSerializedSize bytes
+  /// \return the decoded vector info
+  static PforVectorInfo Load(const uint8_t* src) {
+    using UnsignedT = typename PforTypeTraits<T>::UnsignedType;
+    UnsignedT for_bits = 0;
+    for (size_t i = 0; i < sizeof(T); ++i) {
+      for_bits |= static_cast<UnsignedT>(src[i]) << (8 * i);
+    }
+
+    PforVectorInfo info;
+    // Copy the bit pattern back into the (possibly signed) reference value.
+    std::memcpy(&info.frame_of_reference, &for_bits, sizeof(T));
+    info.bit_width = src[sizeof(T)];
+    info.num_exceptions =
+        static_cast<uint16_t>(src[sizeof(T) + 1] | (src[sizeof(T) + 2] << 8));
+    return info;
+  }
+
+  /// \brief Serialized size in bytes
+  static constexpr uint8_t kSerializedSize = PforTypeTraits<T>::kVectorInfoSize;
+};
+
+// ----------------------------------------------------------------------
+// Encoded vector representation
+
+/// \brief A PFOR-encoded vector with all its data sections
+template <typename T>
+struct PforEncodedVector {
+  PforVectorInfo<T> info;
+  std::vector<uint8_t> packed_values;
+  std::vector<uint16_t> exception_positions;
+  std::vector<T> exception_values;
+};
+
+// ----------------------------------------------------------------------
+// Cost model result
+
+/// \brief Result of the optimal bit width search
+struct BitWidthResult {
+  uint8_t bit_width = 0;
+  uint16_t num_exceptions = 0;
+};
+
+// ----------------------------------------------------------------------
+// Core compression/decompression
+
+/// \brief PFOR compression and decompression algorithms
+///
+/// \tparam T the integer type (int32_t or int64_t)
+template <typename T>
+class PforCompression {
+ public:
+  using UnsignedT = typename PforTypeTraits<T>::UnsignedType;
+
+  /// \brief Find 
the optimal bit width using the cost model + /// + /// Evaluates every candidate bit width and selects the one that + /// minimizes total encoded size (packing cost + exception cost). + /// + /// \param[in] deltas unsigned deltas after FOR subtraction + /// \param[in] num_elements number of elements + /// \return the optimal bit width and exception count + static BitWidthResult FindOptimalBitWidth(const UnsignedT* deltas, + uint32_t num_elements); + + /// \brief Encode a single vector of integers + /// + /// \param[in] values input integer values + /// \param[in] num_elements number of elements (up to vector_size) + /// \return the encoded vector with all sections + static PforEncodedVector EncodeVector(const T* values, uint32_t num_elements); + + /// \brief Decode a single vector from compressed data + /// + /// \param[out] values output buffer for decoded integers + /// \param[in] data pointer to the start of the vector data + /// \param[in] num_elements number of elements in this vector + /// \return number of bytes consumed from data + static size_t DecodeVector(T* values, const uint8_t* data, uint32_t num_elements); + + /// \brief Calculate the serialized size of an encoded vector + static size_t SerializedVectorSize(const PforEncodedVector& vec, + uint32_t num_elements); + + /// \brief Serialize an encoded vector to a byte buffer + /// + /// \param[in] vec the encoded vector + /// \param[in] num_elements number of elements + /// \param[out] dest output buffer (must be large enough) + /// \return number of bytes written + static size_t SerializeVector(const PforEncodedVector& vec, + uint32_t num_elements, uint8_t* dest); +}; + +} // namespace pfor +} // namespace util +} // namespace arrow diff --git a/cpp/src/arrow/util/pfor/pfor_constants.h b/cpp/src/arrow/util/pfor/pfor_constants.h new file mode 100644 index 000000000000..1820ede324f0 --- /dev/null +++ b/cpp/src/arrow/util/pfor/pfor_constants.h @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation 
(ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Constants for PFOR (Patched Frame of Reference) compression + +#pragma once + +#include +#include + +namespace arrow { +namespace util { +namespace pfor { + +/// \brief Constants used throughout PFOR compression +class PforConstants { + public: + /// Number of elements compressed together as a unit. + static constexpr uint32_t kPforVectorSize = 1024; + + /// log2(kPforVectorSize) + static constexpr uint8_t kDefaultLogVectorSize = 10; + + /// Minimum allowed log vector size + static constexpr uint8_t kMinLogVectorSize = 3; + + /// Maximum allowed log vector size + static constexpr uint8_t kMaxLogVectorSize = 15; + + /// Type used to store vector data offsets (supports pages up to 4GB) + using OffsetType = uint32_t; + + /// Type used to store exception positions within a compressed vector. + using PositionType = uint16_t; + + /// Maximum number of exceptions per vector (uint16 limit). + static constexpr uint16_t kMaxExceptions = 65535; + + /// Page header size in bytes. + static constexpr uint8_t kHeaderSize = 7; + + /// Packing mode: FOR + bit-packing (currently the only mode). 
+  static constexpr uint8_t kPackingModeForBitPack = 0;
+};
+
+/// \brief Type traits for PFOR integer types
+///
+/// Specialized below for int32_t and int64_t; the primary template is empty.
+template <typename T>
+struct PforTypeTraits {};
+
+template <>
+struct PforTypeTraits<int32_t> {
+  using UnsignedType = uint32_t;
+  static constexpr uint8_t kMaxBitWidth = 32;
+  static constexpr uint8_t kValueByteWidth = 4;
+
+  /// PforVectorInfo size: 4B FOR + 1B bitWidth + 2B numExceptions = 7 bytes
+  static constexpr uint8_t kVectorInfoSize = 7;
+
+  /// \brief Number of bits needed to represent `value` (0 when value == 0)
+  static uint8_t BitsRequired(uint32_t value) {
+    if (value == 0) return 0;
+#if defined(__GNUC__) || defined(__clang__)
+    return static_cast<uint8_t>(32 - __builtin_clz(value));
+#else
+    // Portable fallback for compilers without the GCC/Clang builtin.
+    uint8_t bits = 0;
+    for (; value != 0; value >>= 1) ++bits;
+    return bits;
+#endif
+  }
+};
+
+template <>
+struct PforTypeTraits<int64_t> {
+  using UnsignedType = uint64_t;
+  static constexpr uint8_t kMaxBitWidth = 64;
+  static constexpr uint8_t kValueByteWidth = 8;
+
+  /// PforVectorInfo size: 8B FOR + 1B bitWidth + 2B numExceptions = 11 bytes
+  static constexpr uint8_t kVectorInfoSize = 11;
+
+  /// \brief Number of bits needed to represent `value` (0 when value == 0)
+  static uint8_t BitsRequired(uint64_t value) {
+    if (value == 0) return 0;
+#if defined(__GNUC__) || defined(__clang__)
+    return static_cast<uint8_t>(64 - __builtin_clzll(value));
+#else
+    // Portable fallback for compilers without the GCC/Clang builtin.
+    uint8_t bits = 0;
+    for (; value != 0; value >>= 1) ++bits;
+    return bits;
+#endif
+  }
+};
+
+}  // namespace pfor
+}  // namespace util
+}  // namespace arrow
diff --git a/cpp/src/arrow/util/pfor/pfor_test.cc b/cpp/src/arrow/util/pfor/pfor_test.cc
new file mode 100644
index 000000000000..c9e6f77f05ab
--- /dev/null
+++ b/cpp/src/arrow/util/pfor/pfor_test.cc
@@ -0,0 +1,431 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include +#include +#include +#include +#include +#include + +#include "arrow/util/pfor/pfor.h" +#include "arrow/util/pfor/pfor_wrapper.h" + +namespace arrow::util::pfor { + +// ====================================================================== +// Constants Tests + +TEST(PforConstantsTest, VectorSizeIsPowerOfTwo) { + EXPECT_EQ(PforConstants::kPforVectorSize, 1024u); + EXPECT_EQ(1u << PforConstants::kDefaultLogVectorSize, + PforConstants::kPforVectorSize); +} + +TEST(PforConstantsTest, VectorInfoSizes) { + EXPECT_EQ(PforTypeTraits::kVectorInfoSize, 7); + EXPECT_EQ(PforTypeTraits::kVectorInfoSize, 11); +} + +// ====================================================================== +// BitsRequired Tests + +TEST(PforBitsRequiredTest, Int32) { + EXPECT_EQ(PforTypeTraits::BitsRequired(0), 0); + EXPECT_EQ(PforTypeTraits::BitsRequired(1), 1); + EXPECT_EQ(PforTypeTraits::BitsRequired(2), 2); + EXPECT_EQ(PforTypeTraits::BitsRequired(3), 2); + EXPECT_EQ(PforTypeTraits::BitsRequired(255), 8); + EXPECT_EQ(PforTypeTraits::BitsRequired(256), 9); + EXPECT_EQ(PforTypeTraits::BitsRequired(0xFFFFFFFF), 32); +} + +TEST(PforBitsRequiredTest, Int64) { + EXPECT_EQ(PforTypeTraits::BitsRequired(0), 0); + EXPECT_EQ(PforTypeTraits::BitsRequired(1), 1); + EXPECT_EQ(PforTypeTraits::BitsRequired(0xFFFFFFFFFFFFFFFFULL), 64); +} + +// ====================================================================== +// VectorInfo Serialization Tests + +TEST(PforVectorInfoTest, Int32RoundTrip) { + PforVectorInfo info; + info.frame_of_reference = -42; + info.bit_width = 17; + info.num_exceptions = 300; + + uint8_t buf[7]; + info.Store(buf); + auto loaded = PforVectorInfo::Load(buf); + + EXPECT_EQ(loaded.frame_of_reference, -42); + EXPECT_EQ(loaded.bit_width, 17); + EXPECT_EQ(loaded.num_exceptions, 300); +} + +TEST(PforVectorInfoTest, Int64RoundTrip) { + PforVectorInfo info; + 
info.frame_of_reference = -123456789012345LL; + info.bit_width = 48; + info.num_exceptions = 65000; + + uint8_t buf[11]; + info.Store(buf); + auto loaded = PforVectorInfo::Load(buf); + + EXPECT_EQ(loaded.frame_of_reference, -123456789012345LL); + EXPECT_EQ(loaded.bit_width, 48); + EXPECT_EQ(loaded.num_exceptions, 65000); +} + +// ====================================================================== +// Cost Model Tests + +TEST(PforCostModelTest, AllIdentical) { + // All deltas are 0 => bit_width should be 0, no exceptions + std::vector deltas(100, 0); + auto result = PforCompression::FindOptimalBitWidth(deltas.data(), 100); + EXPECT_EQ(result.bit_width, 0); + EXPECT_EQ(result.num_exceptions, 0); +} + +TEST(PforCostModelTest, SingleOutlier) { + // 99 values fit in 3 bits, 1 outlier needs 16 bits + std::vector deltas(100, 5); // all fit in 3 bits + deltas[50] = 50000; // outlier: 16 bits + auto result = PforCompression::FindOptimalBitWidth(deltas.data(), 100); + // Cost at bit_width=3: 100*3 + 1*(16+32) = 300 + 48 = 348 + // Cost at bit_width=16: 100*16 + 0 = 1600 + // 348 < 1600, so should pick 3 with 1 exception + EXPECT_EQ(result.bit_width, 3); + EXPECT_EQ(result.num_exceptions, 1); +} + +TEST(PforCostModelTest, NoOutliers) { + // All values fit in 8 bits + std::vector deltas(100); + for (uint32_t i = 0; i < 100; ++i) deltas[i] = i * 2; + auto result = PforCompression::FindOptimalBitWidth(deltas.data(), 100); + EXPECT_EQ(result.num_exceptions, 0); + EXPECT_LE(result.bit_width, 8); +} + +// ====================================================================== +// Vector Encode/Decode Round-Trip Tests + +TEST(PforVectorTest, Int32SimpleSequence) { + std::vector values(64); + std::iota(values.begin(), values.end(), 100); + + auto encoded = PforCompression::EncodeVector(values.data(), 64); + EXPECT_EQ(encoded.info.frame_of_reference, 100); + EXPECT_EQ(encoded.info.num_exceptions, 0); + + // Serialize then decode + size_t serialized_size = + 
PforCompression::SerializedVectorSize(encoded, 64); + std::vector buffer(serialized_size); + PforCompression::SerializeVector(encoded, 64, buffer.data()); + + std::vector decoded(64); + PforCompression::DecodeVector(decoded.data(), buffer.data(), 64); + + EXPECT_EQ(values, decoded); +} + +TEST(PforVectorTest, Int32WithOutlier) { + std::vector values = {100, 102, 101, 103, 100, 99, 50000, 104}; + + auto encoded = PforCompression::EncodeVector(values.data(), 8); + EXPECT_EQ(encoded.info.frame_of_reference, 99); + EXPECT_GT(encoded.info.num_exceptions, 0u); + + size_t serialized_size = + PforCompression::SerializedVectorSize(encoded, 8); + std::vector buffer(serialized_size); + PforCompression::SerializeVector(encoded, 8, buffer.data()); + + std::vector decoded(8); + PforCompression::DecodeVector(decoded.data(), buffer.data(), 8); + + EXPECT_EQ(values, decoded); +} + +TEST(PforVectorTest, Int32AllIdentical) { + std::vector values(100, 42); + + auto encoded = PforCompression::EncodeVector(values.data(), 100); + EXPECT_EQ(encoded.info.bit_width, 0); + EXPECT_EQ(encoded.info.num_exceptions, 0); + + size_t serialized_size = + PforCompression::SerializedVectorSize(encoded, 100); + std::vector buffer(serialized_size); + PforCompression::SerializeVector(encoded, 100, buffer.data()); + + std::vector decoded(100); + PforCompression::DecodeVector(decoded.data(), buffer.data(), 100); + + EXPECT_EQ(values, decoded); +} + +TEST(PforVectorTest, Int32NegativeValues) { + std::vector values = {-100, -50, -200, -1, -150}; + + auto encoded = PforCompression::EncodeVector(values.data(), 5); + EXPECT_EQ(encoded.info.frame_of_reference, -200); + + size_t serialized_size = + PforCompression::SerializedVectorSize(encoded, 5); + std::vector buffer(serialized_size); + PforCompression::SerializeVector(encoded, 5, buffer.data()); + + std::vector decoded(5); + PforCompression::DecodeVector(decoded.data(), buffer.data(), 5); + + EXPECT_EQ(values, decoded); +} + +TEST(PforVectorTest, 
Int32MinMaxEdge) { + std::vector values = {std::numeric_limits::min(), + std::numeric_limits::max(), 0, -1, 1}; + + auto encoded = PforCompression::EncodeVector(values.data(), 5); + + size_t serialized_size = + PforCompression::SerializedVectorSize(encoded, 5); + std::vector buffer(serialized_size); + PforCompression::SerializeVector(encoded, 5, buffer.data()); + + std::vector decoded(5); + PforCompression::DecodeVector(decoded.data(), buffer.data(), 5); + + EXPECT_EQ(values, decoded); +} + +TEST(PforVectorTest, Int64SimpleSequence) { + std::vector values(64); + std::iota(values.begin(), values.end(), 1000000000LL); + + auto encoded = PforCompression::EncodeVector(values.data(), 64); + + size_t serialized_size = + PforCompression::SerializedVectorSize(encoded, 64); + std::vector buffer(serialized_size); + PforCompression::SerializeVector(encoded, 64, buffer.data()); + + std::vector decoded(64); + PforCompression::DecodeVector(decoded.data(), buffer.data(), 64); + + EXPECT_EQ(values, decoded); +} + +TEST(PforVectorTest, Int64WithOutlier) { + std::vector values(100, 5000000LL); + values[42] = 999999999999LL; // Outlier + + auto encoded = PforCompression::EncodeVector(values.data(), 100); + EXPECT_GT(encoded.info.num_exceptions, 0u); + + size_t serialized_size = + PforCompression::SerializedVectorSize(encoded, 100); + std::vector buffer(serialized_size); + PforCompression::SerializeVector(encoded, 100, buffer.data()); + + std::vector decoded(100); + PforCompression::DecodeVector(decoded.data(), buffer.data(), 100); + + EXPECT_EQ(values, decoded); +} + +// ====================================================================== +// Page-Level Wrapper Tests + +TEST(PforWrapperTest, Int32SmallPage) { + std::vector values = {10, 20, 30, 40, 50}; + + size_t max_size = PforWrapper::GetMaxCompressedSize(5); + std::vector compressed(max_size); + size_t comp_size = max_size; + + PforWrapper::Encode(values.data(), 5, compressed.data(), &comp_size); + EXPECT_GT(comp_size, 0u); + + 
std::vector decoded(5); + PforWrapper::Decode(decoded.data(), 5, compressed.data(), comp_size); + + EXPECT_EQ(values, decoded); +} + +TEST(PforWrapperTest, Int32ExactOneVector) { + std::vector values(1024); + std::iota(values.begin(), values.end(), 0); + + size_t max_size = PforWrapper::GetMaxCompressedSize(1024); + std::vector compressed(max_size); + size_t comp_size = max_size; + + PforWrapper::Encode(values.data(), 1024, compressed.data(), &comp_size); + + std::vector decoded(1024); + PforWrapper::Decode(decoded.data(), 1024, compressed.data(), comp_size); + + EXPECT_EQ(values, decoded); +} + +TEST(PforWrapperTest, Int32MultipleVectors) { + // 2.5 vectors worth of data + const uint32_t n = 2560; + std::vector values(n); + std::mt19937 rng(42); + std::uniform_int_distribution dist(0, 1000); + for (auto& v : values) v = dist(rng); + + size_t max_size = PforWrapper::GetMaxCompressedSize(n); + std::vector compressed(max_size); + size_t comp_size = max_size; + + PforWrapper::Encode(values.data(), n, compressed.data(), &comp_size); + + std::vector decoded(n); + PforWrapper::Decode(decoded.data(), n, compressed.data(), comp_size); + + EXPECT_EQ(values, decoded); +} + +TEST(PforWrapperTest, Int32WithOutliers) { + std::vector values(1024, 100); + // Sprinkle outliers + values[0] = -999999; + values[100] = 888888; + values[500] = 777777; + values[1023] = -123456; + + size_t max_size = PforWrapper::GetMaxCompressedSize(1024); + std::vector compressed(max_size); + size_t comp_size = max_size; + + PforWrapper::Encode(values.data(), 1024, compressed.data(), &comp_size); + + std::vector decoded(1024); + PforWrapper::Decode(decoded.data(), 1024, compressed.data(), comp_size); + + EXPECT_EQ(values, decoded); +} + +TEST(PforWrapperTest, Int64MultipleVectors) { + const uint32_t n = 3000; + std::vector values(n); + std::mt19937 rng(123); + std::uniform_int_distribution dist(0, 100000); + for (auto& v : values) v = dist(rng); + // Add outliers + values[0] = 9999999999999LL; + 
values[1500] = -9999999999999LL; + + size_t max_size = PforWrapper::GetMaxCompressedSize(n); + std::vector compressed(max_size); + size_t comp_size = max_size; + + PforWrapper::Encode(values.data(), n, compressed.data(), &comp_size); + + std::vector decoded(n); + PforWrapper::Decode(decoded.data(), n, compressed.data(), comp_size); + + EXPECT_EQ(values, decoded); +} + +TEST(PforWrapperTest, Int32SingleElement) { + std::vector values = {42}; + + size_t max_size = PforWrapper::GetMaxCompressedSize(1); + std::vector compressed(max_size); + size_t comp_size = max_size; + + PforWrapper::Encode(values.data(), 1, compressed.data(), &comp_size); + + std::vector decoded(1); + PforWrapper::Decode(decoded.data(), 1, compressed.data(), comp_size); + + EXPECT_EQ(values, decoded); +} + +TEST(PforWrapperTest, Int32AllZeros) { + std::vector values(1024, 0); + + size_t max_size = PforWrapper::GetMaxCompressedSize(1024); + std::vector compressed(max_size); + size_t comp_size = max_size; + + PforWrapper::Encode(values.data(), 1024, compressed.data(), &comp_size); + + // Should compress very well (bit_width = 0) + EXPECT_LT(comp_size, 100u); + + std::vector decoded(1024); + PforWrapper::Decode(decoded.data(), 1024, compressed.data(), comp_size); + + EXPECT_EQ(values, decoded); +} + +TEST(PforWrapperTest, Int32LargeRandom) { + const uint32_t n = 10000; + std::vector values(n); + std::mt19937 rng(99); + std::uniform_int_distribution dist( + std::numeric_limits::min(), + std::numeric_limits::max()); + for (auto& v : values) v = dist(rng); + + size_t max_size = PforWrapper::GetMaxCompressedSize(n); + std::vector compressed(max_size); + size_t comp_size = max_size; + + PforWrapper::Encode(values.data(), n, compressed.data(), &comp_size); + + std::vector decoded(n); + PforWrapper::Decode(decoded.data(), n, compressed.data(), comp_size); + + EXPECT_EQ(values, decoded); +} + +// ====================================================================== +// Compression Ratio Test + 
+TEST(PforCompressionRatioTest, ClusteredDataCompresses) { + // Data clustered around 1000 with one outlier + std::vector values(1024); + std::mt19937 rng(42); + std::uniform_int_distribution dist(1000, 1255); + for (auto& v : values) v = dist(rng); + values[500] = 999999; // One outlier + + size_t max_size = PforWrapper::GetMaxCompressedSize(1024); + std::vector compressed(max_size); + size_t comp_size = max_size; + + PforWrapper::Encode(values.data(), 1024, compressed.data(), &comp_size); + + // PLAIN would be 4096 bytes. PFOR should be much smaller. + size_t plain_size = 1024 * sizeof(int32_t); + EXPECT_LT(comp_size, plain_size / 2); +} + +} // namespace arrow::util::pfor diff --git a/cpp/src/arrow/util/pfor/pfor_wrapper.cc b/cpp/src/arrow/util/pfor/pfor_wrapper.cc new file mode 100644 index 000000000000..5556a5e0bf53 --- /dev/null +++ b/cpp/src/arrow/util/pfor/pfor_wrapper.cc @@ -0,0 +1,184 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// PFOR page-level wrapper implementation +// +// Page layout: +// [Header 7B] [Offset Array: numVectors * 4B] [Vector 0] [Vector 1] ... 
+// +// Each vector: +// [PforVectorInfo] [PackedValues] [ExceptionPositions] [ExceptionValues] + +#include "arrow/util/pfor/pfor_wrapper.h" + +#include +#include +#include + +#include "arrow/util/bit_util.h" +#include "arrow/util/logging.h" + +namespace arrow { +namespace util { +namespace pfor { + +// ---------------------------------------------------------------------- +// Header serialization + +template +void PforWrapper::StoreHeader(uint8_t* dest, const PforHeader& header) { + dest[0] = header.packing_mode; + dest[1] = header.log_vector_size; + dest[2] = header.value_byte_width; + uint32_t le_num = header.num_elements; // Assume LE platform + std::memcpy(dest + 3, &le_num, sizeof(uint32_t)); +} + +template +typename PforWrapper::PforHeader PforWrapper::LoadHeader(const uint8_t* src) { + PforHeader header; + header.packing_mode = src[0]; + header.log_vector_size = src[1]; + header.value_byte_width = src[2]; + std::memcpy(&header.num_elements, src + 3, sizeof(uint32_t)); + return header; +} + +// ---------------------------------------------------------------------- +// Encode + +template +void PforWrapper::Encode(const T* values, uint32_t num_values, char* comp, + size_t* comp_size) { + ARROW_DCHECK(num_values > 0); + ARROW_DCHECK(comp != nullptr); + ARROW_DCHECK(comp_size != nullptr); + + const uint32_t vector_size = kVectorSize; + const uint32_t num_vectors = + (num_values + vector_size - 1) / vector_size; + + auto* dest = reinterpret_cast(comp); + + // Step 1: Write header + PforHeader header; + header.packing_mode = PforConstants::kPackingModeForBitPack; + header.log_vector_size = PforConstants::kDefaultLogVectorSize; + header.value_byte_width = sizeof(T); + header.num_elements = num_values; + StoreHeader(dest, header); + uint8_t* write_ptr = dest + PforConstants::kHeaderSize; + + // Step 2: Reserve space for offset array + uint8_t* offset_array_start = write_ptr; + write_ptr += num_vectors * sizeof(uint32_t); + + // Step 3: Encode each vector and build 
offset array + const uint8_t* data_start = offset_array_start; // Offsets relative to offset array start + + for (uint32_t v = 0; v < num_vectors; ++v) { + // Record offset (from start of offset array) + uint32_t offset = static_cast(write_ptr - data_start); + std::memcpy(offset_array_start + v * sizeof(uint32_t), &offset, sizeof(uint32_t)); + + // Determine elements in this vector + uint32_t start_idx = v * vector_size; + uint32_t elements_in_vector = + std::min(vector_size, num_values - start_idx); + + // Encode vector + auto encoded = PforCompression::EncodeVector( + values + start_idx, elements_in_vector); + + // Serialize to output + size_t bytes_written = PforCompression::SerializeVector( + encoded, elements_in_vector, write_ptr); + write_ptr += bytes_written; + } + + *comp_size = static_cast(write_ptr - dest); +} + +// ---------------------------------------------------------------------- +// Decode + +template +void PforWrapper::Decode(T* values, uint32_t num_values, const char* comp, + size_t comp_size) { + ARROW_DCHECK(num_values > 0); + ARROW_DCHECK(comp != nullptr); + + const auto* src = reinterpret_cast(comp); + + // Step 1: Read header + PforHeader header = LoadHeader(src); + ARROW_DCHECK(header.packing_mode == PforConstants::kPackingModeForBitPack); + ARROW_DCHECK(header.value_byte_width == sizeof(T)); + + const uint32_t vector_size = 1u << header.log_vector_size; + const uint32_t num_vectors = + (header.num_elements + vector_size - 1) / vector_size; + + // Step 2: Read offset array + const uint8_t* offset_array_start = src + PforConstants::kHeaderSize; + + // Step 3: Decode each vector + for (uint32_t v = 0; v < num_vectors; ++v) { + uint32_t offset; + std::memcpy(&offset, offset_array_start + v * sizeof(uint32_t), + sizeof(uint32_t)); + + const uint8_t* vector_data = offset_array_start + offset; + + uint32_t start_idx = v * vector_size; + uint32_t elements_in_vector = + std::min(vector_size, header.num_elements - start_idx); + + 
PforCompression::DecodeVector( + values + start_idx, vector_data, elements_in_vector); + } +} + +// ---------------------------------------------------------------------- +// GetMaxCompressedSize + +template +size_t PforWrapper::GetMaxCompressedSize(uint32_t num_values) { + const uint32_t vector_size = kVectorSize; + const uint32_t num_vectors = + (num_values + vector_size - 1) / vector_size; + + // Header + offset array + size_t size = PforConstants::kHeaderSize + num_vectors * sizeof(uint32_t); + + // Worst case per vector: full bit width + all exceptions + size_t max_vector_size = PforVectorInfo::kSerializedSize + + vector_size * sizeof(T) // packed at full width + + vector_size * sizeof(uint16_t) // exception positions + + vector_size * sizeof(T); // exception values + + size += num_vectors * max_vector_size; + return size; +} + +// Explicit template instantiations +template class PforWrapper; +template class PforWrapper; + +} // namespace pfor +} // namespace util +} // namespace arrow diff --git a/cpp/src/arrow/util/pfor/pfor_wrapper.h b/cpp/src/arrow/util/pfor/pfor_wrapper.h new file mode 100644 index 000000000000..4a9b6435a5c8 --- /dev/null +++ b/cpp/src/arrow/util/pfor/pfor_wrapper.h @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +// High-level wrapper interface for PFOR compression +// +// Handles page-level serialization: header, offset array, and vectors. + +#pragma once + +#include +#include + +#include "arrow/util/pfor/pfor.h" + +namespace arrow { +namespace util { +namespace pfor { + +/// \class PforWrapper +/// \brief High-level interface for PFOR page-level compression +/// +/// Manages the page layout: [Header 7B] [Offset Array] [Vector 0] [Vector 1] ... +/// +/// \tparam T the integer type (int32_t or int64_t) +template +class PforWrapper { + public: + /// \brief Encode integer values into a PFOR-compressed page + /// + /// \param[in] values pointer to input integers + /// \param[in] num_values total number of values + /// \param[out] comp pointer to output buffer (caller must ensure sufficient size) + /// \param[in,out] comp_size input: available buffer size; output: bytes written + static void Encode(const T* values, uint32_t num_values, char* comp, + size_t* comp_size); + + /// \brief Decode a PFOR-compressed page + /// + /// \param[out] values pointer to output buffer + /// \param[in] num_values number of values to decode (from page context) + /// \param[in] comp pointer to compressed data + /// \param[in] comp_size size of compressed data + static void Decode(T* values, uint32_t num_values, const char* comp, + size_t comp_size); + + /// \brief Get the maximum compressed size for a given number of values + /// + /// \param[in] num_values number of integer values + /// \return maximum possible compressed page size in bytes + static size_t GetMaxCompressedSize(uint32_t num_values); + + private: + /// \brief Page header structure (7 bytes) + struct PforHeader { + uint8_t packing_mode; // 0 = FOR + bit-packing + uint8_t log_vector_size; // log2(vector_size) + uint8_t value_byte_width; // sizeof(T): 4 or 8 + uint32_t num_elements; // total element count + }; + + static 
constexpr uint32_t kVectorSize = PforConstants::kPforVectorSize; + + static void StoreHeader(uint8_t* dest, const PforHeader& header); + static PforHeader LoadHeader(const uint8_t* src); +}; + +} // namespace pfor +} // namespace util +} // namespace arrow From 248e21e3d57ec3e75072de7ff3ba3b3f9cd57788 Mon Sep 17 00:00:00 2001 From: Prateek Gaur Date: Tue, 21 Apr 2026 00:00:07 +0000 Subject: [PATCH 02/12] Integrate PFOR encoding into parquet encoder/decoder Adds PFOR = 11 to the Encoding enum and wires it into the parquet read/write pipeline: - PforEncoder in encoder.cc (buffers values, calls PforWrapper::Encode) - PforDecoder in decoder.cc (decodes all values on first access) - PFOR case in column_reader.cc InitializeDataDecoder - Encoding string mapping in types.cc Supports INT32 and INT64 column types. --- cpp/src/parquet/column_reader.cc | 3 +- cpp/src/parquet/decoder.cc | 83 ++++++++++++++++++++++++++++++ cpp/src/parquet/encoder.cc | 86 ++++++++++++++++++++++++++++++++ cpp/src/parquet/types.cc | 2 + cpp/src/parquet/types.h | 3 +- 5 files changed, 175 insertions(+), 2 deletions(-) diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc index 79b837f755c3..b39e020b1ab7 100644 --- a/cpp/src/parquet/column_reader.cc +++ b/cpp/src/parquet/column_reader.cc @@ -862,7 +862,8 @@ class ColumnReaderImplBase { case Encoding::RLE: case Encoding::DELTA_BINARY_PACKED: case Encoding::DELTA_BYTE_ARRAY: - case Encoding::DELTA_LENGTH_BYTE_ARRAY: { + case Encoding::DELTA_LENGTH_BYTE_ARRAY: + case Encoding::PFOR: { auto decoder = MakeTypedDecoder(encoding, descr_, pool_); current_decoder_ = decoder.get(); decoders_[static_cast(encoding)] = std::move(decoder); diff --git a/cpp/src/parquet/decoder.cc b/cpp/src/parquet/decoder.cc index 3ce2323d29a1..f080fc14925f 100644 --- a/cpp/src/parquet/decoder.cc +++ b/cpp/src/parquet/decoder.cc @@ -45,6 +45,7 @@ #include "arrow/util/logging_internal.h" #include "arrow/util/rle_encoding_internal.h" #include 
"arrow/util/spaced_internal.h" +#include "arrow/util/pfor/pfor_wrapper.h" #include "arrow/util/ubsan.h" #include "arrow/visit_data_inline.h" @@ -2323,6 +2324,79 @@ class ByteStreamSplitDecoder : public ByteStreamSplitDecoderBase +class PforDecoder : public TypedDecoderImpl { + public: + using T = typename DType::c_type; + + explicit PforDecoder(const ColumnDescriptor* descr, + MemoryPool* pool = ::arrow::default_memory_pool()) + : TypedDecoderImpl(descr, Encoding::PFOR), pool_(pool) {} + + void SetData(int num_values, const uint8_t* data, int len) override { + this->num_values_ = num_values; + data_ = data; + data_len_ = len; + values_decoded_ = 0; + } + + int Decode(T* buffer, int max_values) override { + int values_to_decode = std::min(max_values, this->num_values_); + if (values_to_decode == 0) return 0; + + // Decode all values on first call, cache for subsequent calls + if (decoded_values_.empty() && this->num_values_ > 0) { + decoded_values_.resize(this->num_values_); + arrow::util::pfor::PforWrapper::Decode( + decoded_values_.data(), static_cast(this->num_values_), + reinterpret_cast(data_), static_cast(data_len_)); + } + + std::memcpy(buffer, decoded_values_.data() + values_decoded_, + values_to_decode * sizeof(T)); + values_decoded_ += values_to_decode; + this->num_values_ -= values_to_decode; + return values_to_decode; + } + + int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, + typename EncodingTraits::Accumulator* out) override { + if (null_count != 0) { + ParquetException::NYI("PFOR DecodeArrow with null slots"); + } + std::vector values(num_values); + int decoded_count = Decode(values.data(), num_values); + PARQUET_THROW_NOT_OK(out->AppendValues(values.data(), decoded_count)); + return decoded_count; + } + + int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits, + int64_t valid_bits_offset, + typename EncodingTraits::DictAccumulator* out) override { + if (null_count != 0) { + 
ParquetException::NYI("PFOR DecodeArrow with null slots"); + } + std::vector values(num_values); + int decoded_count = Decode(values.data(), num_values); + PARQUET_THROW_NOT_OK(out->Reserve(decoded_count)); + for (int i = 0; i < decoded_count; ++i) { + PARQUET_THROW_NOT_OK(out->Append(values[i])); + } + return decoded_count; + } + + private: + MemoryPool* pool_; + const uint8_t* data_ = nullptr; + int data_len_ = 0; + int values_decoded_ = 0; + std::vector decoded_values_; +}; + } // namespace // ---------------------------------------------------------------------- @@ -2399,6 +2473,15 @@ std::unique_ptr MakeDecoder(Type::type type_num, Encoding::type encodin return std::make_unique(descr); } throw ParquetException("RLE encoding only supports BOOLEAN"); + } else if (encoding == Encoding::PFOR) { + switch (type_num) { + case Type::INT32: + return std::make_unique>(descr, pool); + case Type::INT64: + return std::make_unique>(descr, pool); + default: + throw ParquetException("PFOR decoder only supports INT32 and INT64"); + } } else { ParquetException::NYI("Selected encoding is not supported"); } diff --git a/cpp/src/parquet/encoder.cc b/cpp/src/parquet/encoder.cc index 04f079ce70cd..a346bbbc2bfd 100644 --- a/cpp/src/parquet/encoder.cc +++ b/cpp/src/parquet/encoder.cc @@ -41,6 +41,7 @@ #include "arrow/util/logging_internal.h" #include "arrow/util/rle_encoding_internal.h" #include "arrow/util/spaced_internal.h" +#include "arrow/util/pfor/pfor_wrapper.h" #include "arrow/util/ubsan.h" #include "arrow/visit_data_inline.h" @@ -1751,6 +1752,82 @@ std::shared_ptr RleBooleanEncoder::FlushValues() { } // namespace +// ---------------------------------------------------------------------- +// PFOR Encoder + +template +class PforEncoder : public EncoderImpl, virtual public TypedEncoder { + public: + using T = typename DType::c_type; + using TypedEncoder::Put; + + explicit PforEncoder(const ColumnDescriptor* descr, MemoryPool* pool) + : EncoderImpl(descr, Encoding::PFOR, pool), 
pool_(pool) {} + + std::shared_ptr FlushValues() override { + if (values_.empty()) { + PARQUET_ASSIGN_OR_THROW(auto empty_buf, ::arrow::AllocateBuffer(0, pool_)); + return std::move(empty_buf); + } + + const uint32_t num_values = static_cast(values_.size()); + size_t max_size = + arrow::util::pfor::PforWrapper::GetMaxCompressedSize(num_values); + PARQUET_ASSIGN_OR_THROW(auto buffer, ::arrow::AllocateResizableBuffer( + static_cast(max_size), pool_)); + + size_t comp_size = max_size; + arrow::util::pfor::PforWrapper::Encode( + values_.data(), num_values, + reinterpret_cast(buffer->mutable_data()), &comp_size); + + PARQUET_THROW_NOT_OK(buffer->Resize(static_cast(comp_size))); + values_.clear(); + return std::move(buffer); + } + + int64_t EstimatedDataEncodedSize() override { + return static_cast(values_.size() * sizeof(T)); + } + + void Put(const ::arrow::Array& values) override { + const auto& data = *values.data(); + if (data.length > std::numeric_limits::max()) { + throw ParquetException("Array cannot be longer than ", + std::numeric_limits::max()); + } + if (values.null_count() == 0) { + Put(data.template GetValues(1), static_cast(data.length)); + } else { + PutSpaced(data.template GetValues(1), static_cast(data.length), + data.GetValues(0, 0), data.offset); + } + } + + void Put(const T* buffer, int num_values) override { + values_.insert(values_.end(), buffer, buffer + num_values); + } + + void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits, + int64_t valid_bits_offset) override { + if (valid_bits != NULLPTR) { + PARQUET_ASSIGN_OR_THROW(auto buffer, + ::arrow::AllocateBuffer(num_values * sizeof(T), pool_)); + T* dest = reinterpret_cast(buffer->mutable_data()); + int num_valid = + ::arrow::util::internal::SpacedCompress(src, num_values, valid_bits, + valid_bits_offset, dest); + Put(dest, num_valid); + } else { + Put(src, num_values); + } + } + + private: + MemoryPool* pool_; + std::vector values_; +}; + // 
---------------------------------------------------------------------- // Factory function @@ -1850,6 +1927,15 @@ std::unique_ptr MakeEncoder(Type::type type_num, Encoding::type encodin throw ParquetException( "DELTA_BYTE_ARRAY only supports BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY"); } + } else if (encoding == Encoding::PFOR) { + switch (type_num) { + case Type::INT32: + return std::make_unique>(descr, pool); + case Type::INT64: + return std::make_unique>(descr, pool); + default: + throw ParquetException("PFOR encoder only supports INT32 and INT64"); + } } else { ParquetException::NYI("Selected encoding is not supported"); } diff --git a/cpp/src/parquet/types.cc b/cpp/src/parquet/types.cc index fb4eb92a7544..755a7035eabf 100644 --- a/cpp/src/parquet/types.cc +++ b/cpp/src/parquet/types.cc @@ -259,6 +259,8 @@ std::string EncodingToString(Encoding::type t) { return "RLE_DICTIONARY"; case Encoding::BYTE_STREAM_SPLIT: return "BYTE_STREAM_SPLIT"; + case Encoding::PFOR: + return "PFOR"; default: return "UNKNOWN"; } diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h index 7e8a18fc94d6..a8001518b6db 100644 --- a/cpp/src/parquet/types.h +++ b/cpp/src/parquet/types.h @@ -538,8 +538,9 @@ struct Encoding { DELTA_BYTE_ARRAY = 7, RLE_DICTIONARY = 8, BYTE_STREAM_SPLIT = 9, + PFOR = 11, // Should always be last element (except UNKNOWN) - UNDEFINED = 10, + UNDEFINED = 12, UNKNOWN = 999 }; }; From f6ead7e596547b5a9e7bf597efb05138b0669ba5 Mon Sep 17 00:00:00 2001 From: Prateek Gaur Date: Tue, 21 Apr 2026 00:40:10 +0000 Subject: [PATCH 03/12] Add PFOR encoding benchmark Benchmarks encode/decode throughput for int32/int64 across 10 data distributions inspired by Snowflake's NumericComprBenchmark: constant, sequential, small range, high-base-small-range (timestamps), with outliers (exception path), random, TPC-DS date/store/item/quantity keys. Each distribution runs at 1K/10K/100K/1M elements. Reports bytes/s, items/s, and compression ratio. 
--- cpp/src/arrow/util/CMakeLists.txt | 5 + cpp/src/arrow/util/pfor/pfor_benchmark.cc | 327 ++++++++++++++++++++++ 2 files changed, 332 insertions(+) create mode 100644 cpp/src/arrow/util/pfor/pfor_benchmark.cc diff --git a/cpp/src/arrow/util/CMakeLists.txt b/cpp/src/arrow/util/CMakeLists.txt index 3fc6f60d9045..6706861429a9 100644 --- a/cpp/src/arrow/util/CMakeLists.txt +++ b/cpp/src/arrow/util/CMakeLists.txt @@ -124,6 +124,11 @@ add_arrow_test(pfor-test pfor/pfor.cc pfor/pfor_wrapper.cc) +add_arrow_benchmark(pfor/pfor_benchmark + EXTRA_SOURCES + pfor/pfor.cc + pfor/pfor_wrapper.cc) + add_arrow_benchmark(bit_block_counter_benchmark) add_arrow_benchmark(bit_util_benchmark) add_arrow_benchmark(bitmap_reader_benchmark) diff --git a/cpp/src/arrow/util/pfor/pfor_benchmark.cc b/cpp/src/arrow/util/pfor/pfor_benchmark.cc new file mode 100644 index 000000000000..aa07025dd88f --- /dev/null +++ b/cpp/src/arrow/util/pfor/pfor_benchmark.cc @@ -0,0 +1,327 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// PFOR encoding/decoding benchmarks. 
+// +// Data distributions are inspired by Snowflake's NumericComprBenchmark.cpp, +// covering the key archetypes that exercise PFOR's cost model differently: +// - Constant: bit_width=0, best case +// - Sequential: small range, ideal FOR +// - SmallRange: clustered random, good FOR compression +// - HighBaseSmallRange: high absolute values, small delta range (timestamps) +// - WithOutliers: tests exception handling path +// - Random: worst case, full bit-width +// - TPC-DS DateSk/StoreSk/ItemSk/Quantity: realistic surrogate key distributions + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "benchmark/benchmark.h" + +#include "arrow/util/pfor/pfor.h" +#include "arrow/util/pfor/pfor_wrapper.h" + +namespace arrow::util::pfor { +namespace { + +// ====================================================================== +// Data Generators + +using Int32Gen = std::vector (*)(int64_t); +using Int64Gen = std::vector (*)(int64_t); + +template +std::vector GenConstant(int64_t n) { + return std::vector(n, static_cast(42)); +} + +template +std::vector GenSequential(int64_t n) { + std::vector v(n); + std::iota(v.begin(), v.end(), static_cast(0)); + return v; +} + +template +std::vector GenSmallRange(int64_t n) { + std::vector v(n); + std::mt19937_64 rng(12345); + std::uniform_int_distribution dist(100000, 200000); + for (auto& x : v) x = dist(rng); + return v; +} + +template +std::vector GenHighBaseSmallRange(int64_t n) { + std::vector v(n); + const T kBase = static_cast(1704067200); + std::mt19937_64 rng(12345); + std::uniform_int_distribution dist(0, 1000); + for (auto& x : v) x = kBase + dist(rng); + return v; +} + +template +std::vector GenWithOutliers(int64_t n) { + std::vector v(n); + std::mt19937_64 rng(42); + std::uniform_int_distribution small_dist(1000, 1255); + for (auto& x : v) x = small_dist(rng); + std::uniform_int_distribution pos_dist(0, n - 1); + int num_outliers = std::max(static_cast(1), n / 100); + for (int i = 0; 
i < num_outliers; ++i) { + v[pos_dist(rng)] = static_cast(std::numeric_limits::max() / 2 + i); + } + return v; +} + +template +std::vector GenRandom(int64_t n) { + std::vector v(n); + std::mt19937_64 rng(99); + std::uniform_int_distribution dist(std::numeric_limits::min(), + std::numeric_limits::max()); + for (auto& x : v) x = dist(rng); + return v; +} + +template +std::vector GenTpcdsSoldDateSk(int64_t n) { + std::vector v(n); + const T kBase = 2450815; + std::mt19937_64 rng(12345); + std::uniform_int_distribution dist(0, 1820); + for (auto& x : v) x = kBase + dist(rng); + return v; +} + +template +std::vector GenTpcdsStoreSk(int64_t n) { + std::vector v(n); + std::mt19937_64 rng(12345); + std::uniform_int_distribution dist(1, 1000); + for (auto& x : v) x = dist(rng); + return v; +} + +template +std::vector GenTpcdsItemSk(int64_t n) { + std::vector v(n); + const T kMax = 100000; + std::mt19937_64 rng(12345); + std::exponential_distribution exp_dist(0.00005); + for (auto& x : v) { + T val = static_cast(exp_dist(rng)); + x = std::min(static_cast(val + 1), kMax); + } + return v; +} + +template +std::vector GenTpcdsQuantity(int64_t n) { + std::vector v(n); + std::mt19937_64 rng(12345); + std::uniform_int_distribution small_dist(1, 10); + std::uniform_int_distribution large_dist(11, 100); + std::uniform_int_distribution chance(0, 99); + for (auto& x : v) { + x = (chance(rng) < 90) ? 
small_dist(rng) : large_dist(rng); + } + return v; +} + +// ====================================================================== +// Benchmark Core + +template +void BM_PforEncodeImpl(benchmark::State& state, + std::vector (*generator)(int64_t)) { + const int64_t num_values = state.range(0); + auto values = generator(num_values); + + size_t max_size = PforWrapper::GetMaxCompressedSize(num_values); + std::vector compressed(max_size); + + for (auto _ : state) { + size_t comp_size = max_size; + PforWrapper::Encode(values.data(), num_values, + compressed.data(), &comp_size); + benchmark::DoNotOptimize(comp_size); + benchmark::ClobberMemory(); + } + + state.SetBytesProcessed(state.iterations() * num_values * + static_cast(sizeof(T))); + state.SetItemsProcessed(state.iterations() * num_values); + + // Report compression ratio + size_t comp_size = max_size; + PforWrapper::Encode(values.data(), num_values, compressed.data(), &comp_size); + state.counters["CompRatio%"] = benchmark::Counter( + 100.0 * static_cast(comp_size) / + static_cast(num_values * sizeof(T))); +} + +template +void BM_PforDecodeImpl(benchmark::State& state, + std::vector (*generator)(int64_t)) { + const int64_t num_values = state.range(0); + auto values = generator(num_values); + + size_t max_size = PforWrapper::GetMaxCompressedSize(num_values); + std::vector compressed(max_size); + size_t comp_size = max_size; + PforWrapper::Encode(values.data(), num_values, compressed.data(), &comp_size); + + std::vector decoded(num_values); + + for (auto _ : state) { + PforWrapper::Decode(decoded.data(), num_values, + compressed.data(), comp_size); + benchmark::ClobberMemory(); + } + + state.SetBytesProcessed(state.iterations() * num_values * + static_cast(sizeof(T))); + state.SetItemsProcessed(state.iterations() * num_values); +} + +// ====================================================================== +// Non-template wrappers to avoid comma-in-macro issues with BENCHMARK_CAPTURE + +void 
BM_PforEncodeInt32(benchmark::State& state, Int32Gen gen) { + BM_PforEncodeImpl(state, gen); +} +void BM_PforDecodeInt32(benchmark::State& state, Int32Gen gen) { + BM_PforDecodeImpl(state, gen); +} +void BM_PforEncodeInt64(benchmark::State& state, Int64Gen gen) { + BM_PforEncodeImpl(state, gen); +} +void BM_PforDecodeInt64(benchmark::State& state, Int64Gen gen) { + BM_PforDecodeImpl(state, gen); +} + +// ====================================================================== +// Benchmark sizes: 1K, 10K, 100K, 1M + +static void CustomArgs(benchmark::internal::Benchmark* b) { + for (int64_t n : {1024, 10240, 102400, 1048576}) { + b->Arg(n); + } +} + +// ====================================================================== +// INT32 Encode + +BENCHMARK_CAPTURE(BM_PforEncodeInt32, Constant, &GenConstant) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforEncodeInt32, Sequential, &GenSequential) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforEncodeInt32, SmallRange, &GenSmallRange) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforEncodeInt32, HighBaseSmallRange, + &GenHighBaseSmallRange) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforEncodeInt32, WithOutliers, &GenWithOutliers) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforEncodeInt32, Random, &GenRandom) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforEncodeInt32, TpcdsSoldDateSk, + &GenTpcdsSoldDateSk) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforEncodeInt32, TpcdsStoreSk, &GenTpcdsStoreSk) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforEncodeInt32, TpcdsItemSk, &GenTpcdsItemSk) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforEncodeInt32, TpcdsQuantity, + &GenTpcdsQuantity) + ->Apply(CustomArgs); + +// INT32 Decode + +BENCHMARK_CAPTURE(BM_PforDecodeInt32, Constant, &GenConstant) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforDecodeInt32, Sequential, &GenSequential) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforDecodeInt32, SmallRange, &GenSmallRange) + ->Apply(CustomArgs); 
+BENCHMARK_CAPTURE(BM_PforDecodeInt32, HighBaseSmallRange, + &GenHighBaseSmallRange) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforDecodeInt32, WithOutliers, &GenWithOutliers) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforDecodeInt32, Random, &GenRandom) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforDecodeInt32, TpcdsSoldDateSk, + &GenTpcdsSoldDateSk) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforDecodeInt32, TpcdsStoreSk, &GenTpcdsStoreSk) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforDecodeInt32, TpcdsItemSk, &GenTpcdsItemSk) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforDecodeInt32, TpcdsQuantity, + &GenTpcdsQuantity) + ->Apply(CustomArgs); + +// ====================================================================== +// INT64 Encode + +BENCHMARK_CAPTURE(BM_PforEncodeInt64, Constant, &GenConstant) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforEncodeInt64, Sequential, &GenSequential) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforEncodeInt64, SmallRange, &GenSmallRange) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforEncodeInt64, HighBaseSmallRange, + &GenHighBaseSmallRange) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforEncodeInt64, WithOutliers, &GenWithOutliers) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforEncodeInt64, Random, &GenRandom) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforEncodeInt64, TpcdsSoldDateSk, + &GenTpcdsSoldDateSk) + ->Apply(CustomArgs); + +// INT64 Decode + +BENCHMARK_CAPTURE(BM_PforDecodeInt64, Constant, &GenConstant) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforDecodeInt64, Sequential, &GenSequential) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforDecodeInt64, SmallRange, &GenSmallRange) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforDecodeInt64, HighBaseSmallRange, + &GenHighBaseSmallRange) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforDecodeInt64, WithOutliers, &GenWithOutliers) + ->Apply(CustomArgs); +BENCHMARK_CAPTURE(BM_PforDecodeInt64, Random, &GenRandom) + ->Apply(CustomArgs); 
+BENCHMARK_CAPTURE(BM_PforDecodeInt64, TpcdsSoldDateSk, + &GenTpcdsSoldDateSk) + ->Apply(CustomArgs); + +} // namespace +} // namespace arrow::util::pfor From 087372d488075b022f0d84843cd396f313a849c0 Mon Sep 17 00:00:00 2001 From: Prateek Gaur Date: Wed, 3 Jun 2026 17:47:14 +0000 Subject: [PATCH 04/12] Use signed integer types consistently per Arrow style guide --- cpp/src/arrow/util/pfor/pfor.cc | 117 +++++++++++------------ cpp/src/arrow/util/pfor/pfor.h | 27 +++--- cpp/src/arrow/util/pfor/pfor_constants.h | 13 +-- cpp/src/arrow/util/pfor/pfor_test.cc | 62 ++++++------ cpp/src/arrow/util/pfor/pfor_wrapper.cc | 56 +++++------ cpp/src/arrow/util/pfor/pfor_wrapper.h | 15 +-- 6 files changed, 142 insertions(+), 148 deletions(-) diff --git a/cpp/src/arrow/util/pfor/pfor.cc b/cpp/src/arrow/util/pfor/pfor.cc index 54bb131d1964..08d7490b66e8 100644 --- a/cpp/src/arrow/util/pfor/pfor.cc +++ b/cpp/src/arrow/util/pfor/pfor.cc @@ -20,7 +20,7 @@ // Adapted from the Snowflake PFOR encoder (PforEncoder.{hpp,cpp}). 
// Key differences from the Snowflake implementation: // - Vector size: 1024 (not 2048) -// - Max exceptions: uint16 (not uint8) +// - Max exceptions: int16 (not uint8) // - Exception values: original integers (not FOR offsets) // - Bit packing: Arrow's BitWriter/unpack (not Snowflake's BitPacker) @@ -45,40 +45,40 @@ namespace pfor { template BitWidthResult PforCompression::FindOptimalBitWidth(const UnsignedT* deltas, - uint32_t num_elements) { + int32_t num_elements) { constexpr uint8_t max_bits = PforTypeTraits::kMaxBitWidth; - constexpr uint8_t position_bits = 16; // uint16_t for exception position - constexpr uint8_t value_bits = sizeof(T) * 8; + constexpr int32_t position_bits = 16; + constexpr int32_t value_bits = sizeof(T) * 8; // Build histogram: histogram[b] = count of deltas requiring exactly b bits - std::array histogram{}; // Support up to 64 bits - for (uint32_t i = 0; i < num_elements; ++i) { + std::array histogram{}; + for (int32_t i = 0; i < num_elements; ++i) { uint8_t bits = PforTypeTraits::BitsRequired(deltas[i]); histogram[bits]++; } // Evaluate each candidate bit width - uint64_t best_cost = std::numeric_limits::max(); + int64_t best_cost = std::numeric_limits::max(); uint8_t best_bit_width = max_bits; - uint16_t best_num_exceptions = 0; + int16_t best_num_exceptions = 0; - uint64_t exceptions_above = num_elements; // All start as potential exceptions + int64_t exceptions_above = num_elements; for (uint8_t b = 0; b <= max_bits; ++b) { exceptions_above -= histogram[b]; - if (exceptions_above > PforConstants::kMaxExceptions) { + if (exceptions_above > std::numeric_limits::max()) { continue; } - uint64_t packing_cost = static_cast(num_elements) * b; - uint64_t exception_cost = exceptions_above * (position_bits + value_bits); - uint64_t total_cost = packing_cost + exception_cost; + int64_t packing_cost = static_cast(num_elements) * b; + int64_t exception_cost = exceptions_above * (position_bits + value_bits); + int64_t total_cost = packing_cost + 
exception_cost; if (total_cost < best_cost) { best_cost = total_cost; best_bit_width = b; - best_num_exceptions = static_cast(exceptions_above); + best_num_exceptions = static_cast(exceptions_above); } } @@ -90,19 +90,19 @@ BitWidthResult PforCompression::FindOptimalBitWidth(const UnsignedT* deltas, template PforEncodedVector PforCompression::EncodeVector(const T* values, - uint32_t num_elements) { + int32_t num_elements) { ARROW_DCHECK(num_elements > 0); // Step 1: Find min (frame of reference) T min_val = values[0]; - for (uint32_t i = 1; i < num_elements; ++i) { + for (int32_t i = 1; i < num_elements; ++i) { if (values[i] < min_val) min_val = values[i]; } // Step 2: Compute unsigned deltas const auto unsigned_min = static_cast(min_val); std::vector deltas(num_elements); - for (uint32_t i = 0; i < num_elements; ++i) { + for (int32_t i = 0; i < num_elements; ++i) { deltas[i] = static_cast(values[i]) - unsigned_min; } @@ -124,24 +124,24 @@ PforEncodedVector PforCompression::EncodeVector(const T* values, ? 
static_cast(-1) : (static_cast(1) << bit_width) - 1; - for (uint32_t i = 0; i < num_elements; ++i) { + for (int32_t i = 0; i < num_elements; ++i) { if (deltas[i] > mask) { - result.exception_positions.push_back(static_cast(i)); - result.exception_values.push_back(values[i]); // Store ORIGINAL value - deltas[i] = 0; // Placeholder + result.exception_positions.push_back(static_cast(i)); + result.exception_values.push_back(values[i]); + deltas[i] = 0; } } } // Step 5: Bit-pack the deltas if (bit_width > 0) { - size_t packed_size = static_cast( - bit_util::BytesForBits(static_cast(num_elements) * bit_width)); - result.packed_values.resize(packed_size, 0); + int64_t packed_size = + bit_util::BytesForBits(static_cast(num_elements) * bit_width); + result.packed_values.resize(static_cast(packed_size), 0); bit_util::BitWriter writer(result.packed_values.data(), static_cast(packed_size)); - for (uint32_t i = 0; i < num_elements; ++i) { + for (int32_t i = 0; i < num_elements; ++i) { writer.PutValue(static_cast(deltas[i]), bit_width); } writer.Flush(); @@ -154,30 +154,29 @@ PforEncodedVector PforCompression::EncodeVector(const T* values, // DecodeVector template -size_t PforCompression::DecodeVector(T* values, const uint8_t* data, - uint32_t num_elements) { +int64_t PforCompression::DecodeVector(T* values, const uint8_t* data, + int32_t num_elements) { // Step 1: Read vector info auto info = PforVectorInfo::Load(data); - const uint8_t* read_ptr = data + PforVectorInfo::kSerializedSize; + const uint8_t* read_ptr = data + PforVectorInfo::kStoredSize; // Step 2: Handle constant data (bit_width == 0, no exceptions) if (info.bit_width == 0 && info.num_exceptions == 0) { std::fill(values, values + num_elements, info.frame_of_reference); - return PforVectorInfo::kSerializedSize; + return PforVectorInfo::kStoredSize; } // Step 3: Unpack bit-packed deltas and add FOR if (info.bit_width > 0) { - // Use SIMD-optimized unpack for batches of 32 (uint32) or 32 (uint64) constexpr int 
kBatchSize = 32; - uint32_t full_batches = num_elements / kBatchSize; - uint32_t remainder = num_elements % kBatchSize; + int32_t full_batches = num_elements / kBatchSize; + int32_t remainder = num_elements % kBatchSize; UnsignedT* unsigned_values = reinterpret_cast(values); const auto unsigned_for = static_cast(info.frame_of_reference); // Unpack full batches using SIMD - for (uint32_t batch = 0; batch < full_batches; ++batch) { + for (int32_t batch = 0; batch < full_batches; ++batch) { arrow::internal::unpack(read_ptr, unsigned_values + batch * kBatchSize, kBatchSize, info.bit_width, batch * kBatchSize * info.bit_width); @@ -185,15 +184,14 @@ size_t PforCompression::DecodeVector(T* values, const uint8_t* data, // Unpack remainder using BitReader if (remainder > 0) { - size_t packed_size = static_cast( - bit_util::BytesForBits(static_cast(num_elements) * info.bit_width)); + int64_t packed_size = + bit_util::BytesForBits(static_cast(num_elements) * info.bit_width); bit_util::BitReader reader(read_ptr, static_cast(packed_size)); - // Skip past the full batches - for (uint32_t i = 0; i < full_batches * kBatchSize; ++i) { + for (int32_t i = 0; i < full_batches * kBatchSize; ++i) { uint64_t val; reader.GetValue(info.bit_width, &val); } - for (uint32_t i = full_batches * kBatchSize; i < num_elements; ++i) { + for (int32_t i = full_batches * kBatchSize; i < num_elements; ++i) { uint64_t val; reader.GetValue(info.bit_width, &val); unsigned_values[i] = static_cast(val); @@ -201,12 +199,12 @@ size_t PforCompression::DecodeVector(T* values, const uint8_t* data, } // Add FOR to all values - for (uint32_t i = 0; i < num_elements; ++i) { + for (int32_t i = 0; i < num_elements; ++i) { unsigned_values[i] += unsigned_for; } - size_t packed_size = static_cast( - bit_util::BytesForBits(static_cast(num_elements) * info.bit_width)); + int64_t packed_size = + bit_util::BytesForBits(static_cast(num_elements) * info.bit_width); read_ptr += packed_size; } else { // bit_width == 0 but has 
exceptions - fill with FOR @@ -216,14 +214,14 @@ size_t PforCompression::DecodeVector(T* values, const uint8_t* data, // Step 4: Patch exceptions (stored as original values) if (info.num_exceptions > 0) { const uint8_t* positions_ptr = read_ptr; - read_ptr += info.num_exceptions * sizeof(uint16_t); + read_ptr += info.num_exceptions * sizeof(int16_t); const uint8_t* values_ptr = read_ptr; read_ptr += info.num_exceptions * sizeof(T); - for (uint16_t i = 0; i < info.num_exceptions; ++i) { - uint16_t pos; - std::memcpy(&pos, positions_ptr + i * sizeof(uint16_t), sizeof(uint16_t)); + for (int16_t i = 0; i < info.num_exceptions; ++i) { + int16_t pos; + std::memcpy(&pos, positions_ptr + i * sizeof(int16_t), sizeof(int16_t)); T value; std::memcpy(&value, values_ptr + i * sizeof(T), sizeof(T)); @@ -232,34 +230,33 @@ size_t PforCompression::DecodeVector(T* values, const uint8_t* data, } } - return static_cast(read_ptr - data); + return static_cast(read_ptr - data); } // ---------------------------------------------------------------------- // Serialization helpers template -size_t PforCompression::SerializedVectorSize(const PforEncodedVector& vec, - uint32_t num_elements) { - size_t size = PforVectorInfo::kSerializedSize; +int64_t PforCompression::SerializedVectorSize(const PforEncodedVector& vec, + int32_t num_elements) { + int64_t size = PforVectorInfo::kStoredSize; if (vec.info.bit_width > 0) { - size += static_cast( - bit_util::BytesForBits(static_cast(num_elements) * vec.info.bit_width)); + size += bit_util::BytesForBits(static_cast(num_elements) * vec.info.bit_width); } - size += vec.info.num_exceptions * sizeof(uint16_t); // positions - size += vec.info.num_exceptions * sizeof(T); // values + size += vec.info.num_exceptions * static_cast(sizeof(int16_t)); + size += vec.info.num_exceptions * static_cast(sizeof(T)); return size; } template -size_t PforCompression::SerializeVector(const PforEncodedVector& vec, - uint32_t num_elements, - uint8_t* dest) { +int64_t 
PforCompression::SerializeVector(const PforEncodedVector& vec, + int32_t num_elements, + uint8_t* dest) { uint8_t* write_ptr = dest; // Write vector info vec.info.Store(write_ptr); - write_ptr += PforVectorInfo::kSerializedSize; + write_ptr += PforVectorInfo::kStoredSize; // Write packed values if (vec.info.bit_width > 0 && !vec.packed_values.empty()) { @@ -270,8 +267,8 @@ size_t PforCompression::SerializeVector(const PforEncodedVector& vec, // Write exception positions if (vec.info.num_exceptions > 0) { std::memcpy(write_ptr, vec.exception_positions.data(), - vec.info.num_exceptions * sizeof(uint16_t)); - write_ptr += vec.info.num_exceptions * sizeof(uint16_t); + vec.info.num_exceptions * sizeof(int16_t)); + write_ptr += vec.info.num_exceptions * sizeof(int16_t); // Write exception values (original integers) std::memcpy(write_ptr, vec.exception_values.data(), @@ -279,7 +276,7 @@ size_t PforCompression::SerializeVector(const PforEncodedVector& vec, write_ptr += vec.info.num_exceptions * sizeof(T); } - return static_cast(write_ptr - dest); + return static_cast(write_ptr - dest); } // Explicit template instantiations diff --git a/cpp/src/arrow/util/pfor/pfor.h b/cpp/src/arrow/util/pfor/pfor.h index af25cbbedfc1..47c85bbeb548 100644 --- a/cpp/src/arrow/util/pfor/pfor.h +++ b/cpp/src/arrow/util/pfor/pfor.h @@ -46,14 +46,13 @@ template struct PforVectorInfo { T frame_of_reference = 0; uint8_t bit_width = 0; - uint16_t num_exceptions = 0; + int16_t num_exceptions = 0; /// \brief Store this info to a byte buffer (little-endian) void Store(uint8_t* dest) const { std::memcpy(dest, &frame_of_reference, sizeof(T)); dest[sizeof(T)] = bit_width; - uint16_t le_exceptions = num_exceptions; // Assume LE platform - std::memcpy(dest + sizeof(T) + 1, &le_exceptions, sizeof(uint16_t)); + std::memcpy(dest + sizeof(T) + 1, &num_exceptions, sizeof(int16_t)); } /// \brief Load this info from a byte buffer (little-endian) @@ -61,12 +60,12 @@ struct PforVectorInfo { PforVectorInfo info; 
std::memcpy(&info.frame_of_reference, src, sizeof(T)); info.bit_width = src[sizeof(T)]; - std::memcpy(&info.num_exceptions, src + sizeof(T) + 1, sizeof(uint16_t)); + std::memcpy(&info.num_exceptions, src + sizeof(T) + 1, sizeof(int16_t)); return info; } /// \brief Serialized size in bytes - static constexpr uint8_t kSerializedSize = PforTypeTraits::kVectorInfoSize; + static constexpr int64_t kStoredSize = PforTypeTraits::kVectorInfoSize; }; // ---------------------------------------------------------------------- @@ -77,7 +76,7 @@ template struct PforEncodedVector { PforVectorInfo info; std::vector packed_values; - std::vector exception_positions; + std::vector exception_positions; std::vector exception_values; }; @@ -87,7 +86,7 @@ struct PforEncodedVector { /// \brief Result of the optimal bit width search struct BitWidthResult { uint8_t bit_width = 0; - uint16_t num_exceptions = 0; + int16_t num_exceptions = 0; }; // ---------------------------------------------------------------------- @@ -110,14 +109,14 @@ class PforCompression { /// \param[in] num_elements number of elements /// \return the optimal bit width and exception count static BitWidthResult FindOptimalBitWidth(const UnsignedT* deltas, - uint32_t num_elements); + int32_t num_elements); /// \brief Encode a single vector of integers /// /// \param[in] values input integer values /// \param[in] num_elements number of elements (up to vector_size) /// \return the encoded vector with all sections - static PforEncodedVector EncodeVector(const T* values, uint32_t num_elements); + static PforEncodedVector EncodeVector(const T* values, int32_t num_elements); /// \brief Decode a single vector from compressed data /// @@ -125,11 +124,11 @@ class PforCompression { /// \param[in] data pointer to the start of the vector data /// \param[in] num_elements number of elements in this vector /// \return number of bytes consumed from data - static size_t DecodeVector(T* values, const uint8_t* data, uint32_t num_elements); + 
static int64_t DecodeVector(T* values, const uint8_t* data, int32_t num_elements); /// \brief Calculate the serialized size of an encoded vector - static size_t SerializedVectorSize(const PforEncodedVector& vec, - uint32_t num_elements); + static int64_t SerializedVectorSize(const PforEncodedVector& vec, + int32_t num_elements); /// \brief Serialize an encoded vector to a byte buffer /// @@ -137,8 +136,8 @@ class PforCompression { /// \param[in] num_elements number of elements /// \param[out] dest output buffer (must be large enough) /// \return number of bytes written - static size_t SerializeVector(const PforEncodedVector& vec, - uint32_t num_elements, uint8_t* dest); + static int64_t SerializeVector(const PforEncodedVector& vec, + int32_t num_elements, uint8_t* dest); }; } // namespace pfor diff --git a/cpp/src/arrow/util/pfor/pfor_constants.h b/cpp/src/arrow/util/pfor/pfor_constants.h index 1820ede324f0..244330357c2f 100644 --- a/cpp/src/arrow/util/pfor/pfor_constants.h +++ b/cpp/src/arrow/util/pfor/pfor_constants.h @@ -30,7 +30,7 @@ namespace pfor { class PforConstants { public: /// Number of elements compressed together as a unit. - static constexpr uint32_t kPforVectorSize = 1024; + static constexpr int64_t kPforVectorSize = 1024; /// log2(kPforVectorSize) static constexpr uint8_t kDefaultLogVectorSize = 10; @@ -45,13 +45,10 @@ class PforConstants { using OffsetType = uint32_t; /// Type used to store exception positions within a compressed vector. - using PositionType = uint16_t; - - /// Maximum number of exceptions per vector (uint16 limit). - static constexpr uint16_t kMaxExceptions = 65535; + using PositionType = int16_t; /// Page header size in bytes. - static constexpr uint8_t kHeaderSize = 7; + static constexpr int64_t kHeaderSize = 7; /// Packing mode: FOR + bit-packing (currently the only mode). 
static constexpr uint8_t kPackingModeForBitPack = 0; @@ -68,7 +65,7 @@ struct PforTypeTraits { static constexpr uint8_t kValueByteWidth = 4; /// PforVectorInfo size: 4B FOR + 1B bitWidth + 2B numExceptions = 7 bytes - static constexpr uint8_t kVectorInfoSize = 7; + static constexpr int64_t kVectorInfoSize = 7; static uint8_t BitsRequired(uint32_t value) { if (value == 0) return 0; @@ -83,7 +80,7 @@ struct PforTypeTraits { static constexpr uint8_t kValueByteWidth = 8; /// PforVectorInfo size: 8B FOR + 1B bitWidth + 2B numExceptions = 11 bytes - static constexpr uint8_t kVectorInfoSize = 11; + static constexpr int64_t kVectorInfoSize = 11; static uint8_t BitsRequired(uint64_t value) { if (value == 0) return 0; diff --git a/cpp/src/arrow/util/pfor/pfor_test.cc b/cpp/src/arrow/util/pfor/pfor_test.cc index c9e6f77f05ab..698e83b53c84 100644 --- a/cpp/src/arrow/util/pfor/pfor_test.cc +++ b/cpp/src/arrow/util/pfor/pfor_test.cc @@ -33,8 +33,8 @@ namespace arrow::util::pfor { // Constants Tests TEST(PforConstantsTest, VectorSizeIsPowerOfTwo) { - EXPECT_EQ(PforConstants::kPforVectorSize, 1024u); - EXPECT_EQ(1u << PforConstants::kDefaultLogVectorSize, + EXPECT_EQ(PforConstants::kPforVectorSize, 1024); + EXPECT_EQ(1 << PforConstants::kDefaultLogVectorSize, PforConstants::kPforVectorSize); } @@ -84,7 +84,7 @@ TEST(PforVectorInfoTest, Int64RoundTrip) { PforVectorInfo info; info.frame_of_reference = -123456789012345LL; info.bit_width = 48; - info.num_exceptions = 65000; + info.num_exceptions = 30000; uint8_t buf[11]; info.Store(buf); @@ -92,7 +92,7 @@ TEST(PforVectorInfoTest, Int64RoundTrip) { EXPECT_EQ(loaded.frame_of_reference, -123456789012345LL); EXPECT_EQ(loaded.bit_width, 48); - EXPECT_EQ(loaded.num_exceptions, 65000); + EXPECT_EQ(loaded.num_exceptions, 30000); } // ====================================================================== @@ -101,7 +101,7 @@ TEST(PforVectorInfoTest, Int64RoundTrip) { TEST(PforCostModelTest, AllIdentical) { // All deltas are 0 => bit_width 
should be 0, no exceptions std::vector deltas(100, 0); - auto result = PforCompression::FindOptimalBitWidth(deltas.data(), 100); + auto result = PforCompression::FindOptimalBitWidth(deltas.data(), 100); // NOLINT EXPECT_EQ(result.bit_width, 0); EXPECT_EQ(result.num_exceptions, 0); } @@ -121,7 +121,7 @@ TEST(PforCostModelTest, SingleOutlier) { TEST(PforCostModelTest, NoOutliers) { // All values fit in 8 bits std::vector deltas(100); - for (uint32_t i = 0; i < 100; ++i) deltas[i] = i * 2; + for (int32_t i = 0; i < 100; ++i) deltas[i] = i * 2; auto result = PforCompression::FindOptimalBitWidth(deltas.data(), 100); EXPECT_EQ(result.num_exceptions, 0); EXPECT_LE(result.bit_width, 8); @@ -155,7 +155,7 @@ TEST(PforVectorTest, Int32WithOutlier) { auto encoded = PforCompression::EncodeVector(values.data(), 8); EXPECT_EQ(encoded.info.frame_of_reference, 99); - EXPECT_GT(encoded.info.num_exceptions, 0u); + EXPECT_GT(encoded.info.num_exceptions, 0); size_t serialized_size = PforCompression::SerializedVectorSize(encoded, 8); @@ -242,7 +242,7 @@ TEST(PforVectorTest, Int64WithOutlier) { values[42] = 999999999999LL; // Outlier auto encoded = PforCompression::EncodeVector(values.data(), 100); - EXPECT_GT(encoded.info.num_exceptions, 0u); + EXPECT_GT(encoded.info.num_exceptions, 0); size_t serialized_size = PforCompression::SerializedVectorSize(encoded, 100); @@ -261,12 +261,12 @@ TEST(PforVectorTest, Int64WithOutlier) { TEST(PforWrapperTest, Int32SmallPage) { std::vector values = {10, 20, 30, 40, 50}; - size_t max_size = PforWrapper::GetMaxCompressedSize(5); + int64_t max_size = PforWrapper::GetMaxCompressedSize(5); std::vector compressed(max_size); - size_t comp_size = max_size; + int64_t comp_size = max_size; PforWrapper::Encode(values.data(), 5, compressed.data(), &comp_size); - EXPECT_GT(comp_size, 0u); + EXPECT_GT(comp_size, 0); std::vector decoded(5); PforWrapper::Decode(decoded.data(), 5, compressed.data(), comp_size); @@ -278,9 +278,9 @@ TEST(PforWrapperTest, 
Int32ExactOneVector) { std::vector values(1024); std::iota(values.begin(), values.end(), 0); - size_t max_size = PforWrapper::GetMaxCompressedSize(1024); + int64_t max_size = PforWrapper::GetMaxCompressedSize(1024); std::vector compressed(max_size); - size_t comp_size = max_size; + int64_t comp_size = max_size; PforWrapper::Encode(values.data(), 1024, compressed.data(), &comp_size); @@ -292,15 +292,15 @@ TEST(PforWrapperTest, Int32ExactOneVector) { TEST(PforWrapperTest, Int32MultipleVectors) { // 2.5 vectors worth of data - const uint32_t n = 2560; + const int32_t n = 2560; std::vector values(n); std::mt19937 rng(42); std::uniform_int_distribution dist(0, 1000); for (auto& v : values) v = dist(rng); - size_t max_size = PforWrapper::GetMaxCompressedSize(n); + int64_t max_size = PforWrapper::GetMaxCompressedSize(n); std::vector compressed(max_size); - size_t comp_size = max_size; + int64_t comp_size = max_size; PforWrapper::Encode(values.data(), n, compressed.data(), &comp_size); @@ -318,9 +318,9 @@ TEST(PforWrapperTest, Int32WithOutliers) { values[500] = 777777; values[1023] = -123456; - size_t max_size = PforWrapper::GetMaxCompressedSize(1024); + int64_t max_size = PforWrapper::GetMaxCompressedSize(1024); std::vector compressed(max_size); - size_t comp_size = max_size; + int64_t comp_size = max_size; PforWrapper::Encode(values.data(), 1024, compressed.data(), &comp_size); @@ -331,7 +331,7 @@ TEST(PforWrapperTest, Int32WithOutliers) { } TEST(PforWrapperTest, Int64MultipleVectors) { - const uint32_t n = 3000; + const int32_t n = 3000; std::vector values(n); std::mt19937 rng(123); std::uniform_int_distribution dist(0, 100000); @@ -340,9 +340,9 @@ TEST(PforWrapperTest, Int64MultipleVectors) { values[0] = 9999999999999LL; values[1500] = -9999999999999LL; - size_t max_size = PforWrapper::GetMaxCompressedSize(n); + int64_t max_size = PforWrapper::GetMaxCompressedSize(n); std::vector compressed(max_size); - size_t comp_size = max_size; + int64_t comp_size = max_size; 
PforWrapper::Encode(values.data(), n, compressed.data(), &comp_size); @@ -355,9 +355,9 @@ TEST(PforWrapperTest, Int64MultipleVectors) { TEST(PforWrapperTest, Int32SingleElement) { std::vector values = {42}; - size_t max_size = PforWrapper::GetMaxCompressedSize(1); + int64_t max_size = PforWrapper::GetMaxCompressedSize(1); std::vector compressed(max_size); - size_t comp_size = max_size; + int64_t comp_size = max_size; PforWrapper::Encode(values.data(), 1, compressed.data(), &comp_size); @@ -370,14 +370,14 @@ TEST(PforWrapperTest, Int32SingleElement) { TEST(PforWrapperTest, Int32AllZeros) { std::vector values(1024, 0); - size_t max_size = PforWrapper::GetMaxCompressedSize(1024); + int64_t max_size = PforWrapper::GetMaxCompressedSize(1024); std::vector compressed(max_size); - size_t comp_size = max_size; + int64_t comp_size = max_size; PforWrapper::Encode(values.data(), 1024, compressed.data(), &comp_size); // Should compress very well (bit_width = 0) - EXPECT_LT(comp_size, 100u); + EXPECT_LT(comp_size, 100); std::vector decoded(1024); PforWrapper::Decode(decoded.data(), 1024, compressed.data(), comp_size); @@ -386,7 +386,7 @@ TEST(PforWrapperTest, Int32AllZeros) { } TEST(PforWrapperTest, Int32LargeRandom) { - const uint32_t n = 10000; + const int32_t n = 10000; std::vector values(n); std::mt19937 rng(99); std::uniform_int_distribution dist( @@ -394,9 +394,9 @@ TEST(PforWrapperTest, Int32LargeRandom) { std::numeric_limits::max()); for (auto& v : values) v = dist(rng); - size_t max_size = PforWrapper::GetMaxCompressedSize(n); + int64_t max_size = PforWrapper::GetMaxCompressedSize(n); std::vector compressed(max_size); - size_t comp_size = max_size; + int64_t comp_size = max_size; PforWrapper::Encode(values.data(), n, compressed.data(), &comp_size); @@ -417,9 +417,9 @@ TEST(PforCompressionRatioTest, ClusteredDataCompresses) { for (auto& v : values) v = dist(rng); values[500] = 999999; // One outlier - size_t max_size = PforWrapper::GetMaxCompressedSize(1024); + int64_t 
max_size = PforWrapper::GetMaxCompressedSize(1024); std::vector compressed(max_size); - size_t comp_size = max_size; + int64_t comp_size = max_size; PforWrapper::Encode(values.data(), 1024, compressed.data(), &comp_size); diff --git a/cpp/src/arrow/util/pfor/pfor_wrapper.cc b/cpp/src/arrow/util/pfor/pfor_wrapper.cc index 5556a5e0bf53..f41ba4738c7c 100644 --- a/cpp/src/arrow/util/pfor/pfor_wrapper.cc +++ b/cpp/src/arrow/util/pfor/pfor_wrapper.cc @@ -44,8 +44,7 @@ void PforWrapper::StoreHeader(uint8_t* dest, const PforHeader& header) { dest[0] = header.packing_mode; dest[1] = header.log_vector_size; dest[2] = header.value_byte_width; - uint32_t le_num = header.num_elements; // Assume LE platform - std::memcpy(dest + 3, &le_num, sizeof(uint32_t)); + std::memcpy(dest + 3, &header.num_elements, sizeof(int32_t)); } template @@ -54,7 +53,7 @@ typename PforWrapper::PforHeader PforWrapper::LoadHeader(const uint8_t* sr header.packing_mode = src[0]; header.log_vector_size = src[1]; header.value_byte_width = src[2]; - std::memcpy(&header.num_elements, src + 3, sizeof(uint32_t)); + std::memcpy(&header.num_elements, src + 3, sizeof(int32_t)); return header; } @@ -62,14 +61,14 @@ typename PforWrapper::PforHeader PforWrapper::LoadHeader(const uint8_t* sr // Encode template -void PforWrapper::Encode(const T* values, uint32_t num_values, char* comp, - size_t* comp_size) { +void PforWrapper::Encode(const T* values, int32_t num_values, char* comp, + int64_t* comp_size) { ARROW_DCHECK(num_values > 0); ARROW_DCHECK(comp != nullptr); ARROW_DCHECK(comp_size != nullptr); - const uint32_t vector_size = kVectorSize; - const uint32_t num_vectors = + const int32_t vector_size = kVectorSize; + const int32_t num_vectors = (num_values + vector_size - 1) / vector_size; auto* dest = reinterpret_cast(comp); @@ -88,16 +87,16 @@ void PforWrapper::Encode(const T* values, uint32_t num_values, char* comp, write_ptr += num_vectors * sizeof(uint32_t); // Step 3: Encode each vector and build offset array - 
const uint8_t* data_start = offset_array_start; // Offsets relative to offset array start + const uint8_t* data_start = offset_array_start; - for (uint32_t v = 0; v < num_vectors; ++v) { + for (int32_t v = 0; v < num_vectors; ++v) { // Record offset (from start of offset array) uint32_t offset = static_cast(write_ptr - data_start); std::memcpy(offset_array_start + v * sizeof(uint32_t), &offset, sizeof(uint32_t)); // Determine elements in this vector - uint32_t start_idx = v * vector_size; - uint32_t elements_in_vector = + int32_t start_idx = v * vector_size; + int32_t elements_in_vector = std::min(vector_size, num_values - start_idx); // Encode vector @@ -105,20 +104,20 @@ void PforWrapper::Encode(const T* values, uint32_t num_values, char* comp, values + start_idx, elements_in_vector); // Serialize to output - size_t bytes_written = PforCompression::SerializeVector( + int64_t bytes_written = PforCompression::SerializeVector( encoded, elements_in_vector, write_ptr); write_ptr += bytes_written; } - *comp_size = static_cast(write_ptr - dest); + *comp_size = static_cast(write_ptr - dest); } // ---------------------------------------------------------------------- // Decode template -void PforWrapper::Decode(T* values, uint32_t num_values, const char* comp, - size_t comp_size) { +void PforWrapper::Decode(T* values, int32_t num_values, const char* comp, + int64_t comp_size) { ARROW_DCHECK(num_values > 0); ARROW_DCHECK(comp != nullptr); @@ -129,23 +128,23 @@ void PforWrapper::Decode(T* values, uint32_t num_values, const char* comp, ARROW_DCHECK(header.packing_mode == PforConstants::kPackingModeForBitPack); ARROW_DCHECK(header.value_byte_width == sizeof(T)); - const uint32_t vector_size = 1u << header.log_vector_size; - const uint32_t num_vectors = + const int32_t vector_size = 1 << header.log_vector_size; + const int32_t num_vectors = (header.num_elements + vector_size - 1) / vector_size; // Step 2: Read offset array const uint8_t* offset_array_start = src + 
PforConstants::kHeaderSize; // Step 3: Decode each vector - for (uint32_t v = 0; v < num_vectors; ++v) { + for (int32_t v = 0; v < num_vectors; ++v) { uint32_t offset; std::memcpy(&offset, offset_array_start + v * sizeof(uint32_t), sizeof(uint32_t)); const uint8_t* vector_data = offset_array_start + offset; - uint32_t start_idx = v * vector_size; - uint32_t elements_in_vector = + int32_t start_idx = v * vector_size; + int32_t elements_in_vector = std::min(vector_size, header.num_elements - start_idx); PforCompression::DecodeVector( @@ -157,19 +156,20 @@ void PforWrapper::Decode(T* values, uint32_t num_values, const char* comp, // GetMaxCompressedSize template -size_t PforWrapper::GetMaxCompressedSize(uint32_t num_values) { - const uint32_t vector_size = kVectorSize; - const uint32_t num_vectors = +int64_t PforWrapper::GetMaxCompressedSize(int32_t num_values) { + const int32_t vector_size = kVectorSize; + const int32_t num_vectors = (num_values + vector_size - 1) / vector_size; // Header + offset array - size_t size = PforConstants::kHeaderSize + num_vectors * sizeof(uint32_t); + int64_t size = PforConstants::kHeaderSize + + num_vectors * static_cast(sizeof(uint32_t)); // Worst case per vector: full bit width + all exceptions - size_t max_vector_size = PforVectorInfo::kSerializedSize - + vector_size * sizeof(T) // packed at full width - + vector_size * sizeof(uint16_t) // exception positions - + vector_size * sizeof(T); // exception values + int64_t max_vector_size = PforVectorInfo::kStoredSize + + vector_size * static_cast(sizeof(T)) // packed at full width + + vector_size * static_cast(sizeof(int16_t)) // exception positions + + vector_size * static_cast(sizeof(T)); // exception values size += num_vectors * max_vector_size; return size; diff --git a/cpp/src/arrow/util/pfor/pfor_wrapper.h b/cpp/src/arrow/util/pfor/pfor_wrapper.h index 4a9b6435a5c8..2cb8d8573187 100644 --- a/cpp/src/arrow/util/pfor/pfor_wrapper.h +++ b/cpp/src/arrow/util/pfor/pfor_wrapper.h @@ -45,8 
+45,8 @@ class PforWrapper { /// \param[in] num_values total number of values /// \param[out] comp pointer to output buffer (caller must ensure sufficient size) /// \param[in,out] comp_size input: available buffer size; output: bytes written - static void Encode(const T* values, uint32_t num_values, char* comp, - size_t* comp_size); + static void Encode(const T* values, int32_t num_values, char* comp, + int64_t* comp_size); /// \brief Decode a PFOR-compressed page /// @@ -54,14 +54,14 @@ class PforWrapper { /// \param[in] num_values number of values to decode (from page context) /// \param[in] comp pointer to compressed data /// \param[in] comp_size size of compressed data - static void Decode(T* values, uint32_t num_values, const char* comp, - size_t comp_size); + static void Decode(T* values, int32_t num_values, const char* comp, + int64_t comp_size); /// \brief Get the maximum compressed size for a given number of values /// /// \param[in] num_values number of integer values /// \return maximum possible compressed page size in bytes - static size_t GetMaxCompressedSize(uint32_t num_values); + static int64_t GetMaxCompressedSize(int32_t num_values); private: /// \brief Page header structure (7 bytes) @@ -69,10 +69,11 @@ class PforWrapper { uint8_t packing_mode; // 0 = FOR + bit-packing uint8_t log_vector_size; // log2(vector_size) uint8_t value_byte_width; // sizeof(T): 4 or 8 - uint32_t num_elements; // total element count + int32_t num_elements; // total element count }; - static constexpr uint32_t kVectorSize = PforConstants::kPforVectorSize; + static constexpr int32_t kVectorSize = + static_cast(PforConstants::kPforVectorSize); static void StoreHeader(uint8_t* dest, const PforHeader& header); static PforHeader LoadHeader(const uint8_t* src); From 90d1b565e6066814bcab37fe9d82dbb3e900a7a7 Mon Sep 17 00:00:00 2001 From: Prateek Gaur Date: Wed, 3 Jun 2026 17:54:17 +0000 Subject: [PATCH 05/12] Use arrow::util::span for buffer parameters in 
Store/Load/Decode/Serialize --- cpp/src/arrow/util/pfor/pfor.cc | 16 ++++++----- cpp/src/arrow/util/pfor/pfor.h | 27 ++++++++++-------- cpp/src/arrow/util/pfor/pfor_test.cc | 9 +++--- cpp/src/arrow/util/pfor/pfor_wrapper.cc | 37 +++++++++++++++---------- cpp/src/arrow/util/pfor/pfor_wrapper.h | 5 ++-- 5 files changed, 56 insertions(+), 38 deletions(-) diff --git a/cpp/src/arrow/util/pfor/pfor.cc b/cpp/src/arrow/util/pfor/pfor.cc index 08d7490b66e8..ef853659292f 100644 --- a/cpp/src/arrow/util/pfor/pfor.cc +++ b/cpp/src/arrow/util/pfor/pfor.cc @@ -35,6 +35,7 @@ #include "arrow/util/bit_util.h" #include "arrow/util/bpacking_internal.h" #include "arrow/util/logging.h" +#include "arrow/util/span.h" namespace arrow { namespace util { @@ -154,11 +155,12 @@ PforEncodedVector PforCompression::EncodeVector(const T* values, // DecodeVector template -int64_t PforCompression::DecodeVector(T* values, const uint8_t* data, +int64_t PforCompression::DecodeVector(T* values, + arrow::util::span data, int32_t num_elements) { // Step 1: Read vector info auto info = PforVectorInfo::Load(data); - const uint8_t* read_ptr = data + PforVectorInfo::kStoredSize; + const uint8_t* read_ptr = data.data() + PforVectorInfo::kStoredSize; // Step 2: Handle constant data (bit_width == 0, no exceptions) if (info.bit_width == 0 && info.num_exceptions == 0) { @@ -230,7 +232,7 @@ int64_t PforCompression::DecodeVector(T* values, const uint8_t* data, } } - return static_cast(read_ptr - data); + return static_cast(read_ptr - data.data()); } // ---------------------------------------------------------------------- @@ -251,11 +253,11 @@ int64_t PforCompression::SerializedVectorSize(const PforEncodedVector& vec template int64_t PforCompression::SerializeVector(const PforEncodedVector& vec, int32_t num_elements, - uint8_t* dest) { - uint8_t* write_ptr = dest; + arrow::util::span dest) { + uint8_t* write_ptr = dest.data(); // Write vector info - vec.info.Store(write_ptr); + 
vec.info.Store(arrow::util::span(write_ptr, PforVectorInfo::kStoredSize)); write_ptr += PforVectorInfo::kStoredSize; // Write packed values @@ -276,7 +278,7 @@ int64_t PforCompression::SerializeVector(const PforEncodedVector& vec, write_ptr += vec.info.num_exceptions * sizeof(T); } - return static_cast(write_ptr - dest); + return static_cast(write_ptr - dest.data()); } // Explicit template instantiations diff --git a/cpp/src/arrow/util/pfor/pfor.h b/cpp/src/arrow/util/pfor/pfor.h index 47c85bbeb548..93c7624d56e8 100644 --- a/cpp/src/arrow/util/pfor/pfor.h +++ b/cpp/src/arrow/util/pfor/pfor.h @@ -30,6 +30,7 @@ #include #include "arrow/util/pfor/pfor_constants.h" +#include "arrow/util/span.h" namespace arrow { namespace util { @@ -49,18 +50,20 @@ struct PforVectorInfo { int16_t num_exceptions = 0; /// \brief Store this info to a byte buffer (little-endian) - void Store(uint8_t* dest) const { - std::memcpy(dest, &frame_of_reference, sizeof(T)); - dest[sizeof(T)] = bit_width; - std::memcpy(dest + sizeof(T) + 1, &num_exceptions, sizeof(int16_t)); + void Store(arrow::util::span dest) const { + uint8_t* ptr = dest.data(); + std::memcpy(ptr, &frame_of_reference, sizeof(T)); + ptr[sizeof(T)] = bit_width; + std::memcpy(ptr + sizeof(T) + 1, &num_exceptions, sizeof(int16_t)); } /// \brief Load this info from a byte buffer (little-endian) - static PforVectorInfo Load(const uint8_t* src) { + static PforVectorInfo Load(arrow::util::span src) { PforVectorInfo info; - std::memcpy(&info.frame_of_reference, src, sizeof(T)); - info.bit_width = src[sizeof(T)]; - std::memcpy(&info.num_exceptions, src + sizeof(T) + 1, sizeof(int16_t)); + const uint8_t* ptr = src.data(); + std::memcpy(&info.frame_of_reference, ptr, sizeof(T)); + info.bit_width = ptr[sizeof(T)]; + std::memcpy(&info.num_exceptions, ptr + sizeof(T) + 1, sizeof(int16_t)); return info; } @@ -121,10 +124,11 @@ class PforCompression { /// \brief Decode a single vector from compressed data /// /// \param[out] values output buffer 
for decoded integers - /// \param[in] data pointer to the start of the vector data + /// \param[in] data span over the compressed vector data /// \param[in] num_elements number of elements in this vector /// \return number of bytes consumed from data - static int64_t DecodeVector(T* values, const uint8_t* data, int32_t num_elements); + static int64_t DecodeVector(T* values, arrow::util::span data, + int32_t num_elements); /// \brief Calculate the serialized size of an encoded vector static int64_t SerializedVectorSize(const PforEncodedVector& vec, @@ -137,7 +141,8 @@ class PforCompression { /// \param[out] dest output buffer (must be large enough) /// \return number of bytes written static int64_t SerializeVector(const PforEncodedVector& vec, - int32_t num_elements, uint8_t* dest); + int32_t num_elements, + arrow::util::span dest); }; } // namespace pfor diff --git a/cpp/src/arrow/util/pfor/pfor_test.cc b/cpp/src/arrow/util/pfor/pfor_test.cc index 698e83b53c84..156313b08ee2 100644 --- a/cpp/src/arrow/util/pfor/pfor_test.cc +++ b/cpp/src/arrow/util/pfor/pfor_test.cc @@ -26,6 +26,7 @@ #include "arrow/util/pfor/pfor.h" #include "arrow/util/pfor/pfor_wrapper.h" +#include "arrow/util/span.h" namespace arrow::util::pfor { @@ -72,8 +73,8 @@ TEST(PforVectorInfoTest, Int32RoundTrip) { info.num_exceptions = 300; uint8_t buf[7]; - info.Store(buf); - auto loaded = PforVectorInfo::Load(buf); + info.Store(arrow::util::span(buf, 7)); + auto loaded = PforVectorInfo::Load(arrow::util::span(buf, 7)); EXPECT_EQ(loaded.frame_of_reference, -42); EXPECT_EQ(loaded.bit_width, 17); @@ -87,8 +88,8 @@ TEST(PforVectorInfoTest, Int64RoundTrip) { info.num_exceptions = 30000; uint8_t buf[11]; - info.Store(buf); - auto loaded = PforVectorInfo::Load(buf); + info.Store(arrow::util::span(buf, 11)); + auto loaded = PforVectorInfo::Load(arrow::util::span(buf, 11)); EXPECT_EQ(loaded.frame_of_reference, -123456789012345LL); EXPECT_EQ(loaded.bit_width, 48); diff --git 
a/cpp/src/arrow/util/pfor/pfor_wrapper.cc b/cpp/src/arrow/util/pfor/pfor_wrapper.cc index f41ba4738c7c..638c1d9c1226 100644 --- a/cpp/src/arrow/util/pfor/pfor_wrapper.cc +++ b/cpp/src/arrow/util/pfor/pfor_wrapper.cc @@ -31,6 +31,7 @@ #include "arrow/util/bit_util.h" #include "arrow/util/logging.h" +#include "arrow/util/span.h" namespace arrow { namespace util { @@ -40,20 +41,24 @@ namespace pfor { // Header serialization template -void PforWrapper::StoreHeader(uint8_t* dest, const PforHeader& header) { - dest[0] = header.packing_mode; - dest[1] = header.log_vector_size; - dest[2] = header.value_byte_width; - std::memcpy(dest + 3, &header.num_elements, sizeof(int32_t)); +void PforWrapper::StoreHeader(arrow::util::span dest, + const PforHeader& header) { + uint8_t* ptr = dest.data(); + ptr[0] = header.packing_mode; + ptr[1] = header.log_vector_size; + ptr[2] = header.value_byte_width; + std::memcpy(ptr + 3, &header.num_elements, sizeof(int32_t)); } template -typename PforWrapper::PforHeader PforWrapper::LoadHeader(const uint8_t* src) { +typename PforWrapper::PforHeader PforWrapper::LoadHeader( + arrow::util::span src) { PforHeader header; - header.packing_mode = src[0]; - header.log_vector_size = src[1]; - header.value_byte_width = src[2]; - std::memcpy(&header.num_elements, src + 3, sizeof(int32_t)); + const uint8_t* ptr = src.data(); + header.packing_mode = ptr[0]; + header.log_vector_size = ptr[1]; + header.value_byte_width = ptr[2]; + std::memcpy(&header.num_elements, ptr + 3, sizeof(int32_t)); return header; } @@ -79,7 +84,7 @@ void PforWrapper::Encode(const T* values, int32_t num_values, char* comp, header.log_vector_size = PforConstants::kDefaultLogVectorSize; header.value_byte_width = sizeof(T); header.num_elements = num_values; - StoreHeader(dest, header); + StoreHeader(arrow::util::span(dest, PforConstants::kHeaderSize), header); uint8_t* write_ptr = dest + PforConstants::kHeaderSize; // Step 2: Reserve space for offset array @@ -105,7 +110,8 @@ void 
PforWrapper::Encode(const T* values, int32_t num_values, char* comp, // Serialize to output int64_t bytes_written = PforCompression::SerializeVector( - encoded, elements_in_vector, write_ptr); + encoded, elements_in_vector, + arrow::util::span(write_ptr, dest + *comp_size - write_ptr)); write_ptr += bytes_written; } @@ -124,7 +130,8 @@ void PforWrapper::Decode(T* values, int32_t num_values, const char* comp, const auto* src = reinterpret_cast(comp); // Step 1: Read header - PforHeader header = LoadHeader(src); + PforHeader header = LoadHeader( + arrow::util::span(src, PforConstants::kHeaderSize)); ARROW_DCHECK(header.packing_mode == PforConstants::kPackingModeForBitPack); ARROW_DCHECK(header.value_byte_width == sizeof(T)); @@ -148,7 +155,9 @@ void PforWrapper::Decode(T* values, int32_t num_values, const char* comp, std::min(vector_size, header.num_elements - start_idx); PforCompression::DecodeVector( - values + start_idx, vector_data, elements_in_vector); + values + start_idx, + arrow::util::span(vector_data, src + comp_size - vector_data), + elements_in_vector); } } diff --git a/cpp/src/arrow/util/pfor/pfor_wrapper.h b/cpp/src/arrow/util/pfor/pfor_wrapper.h index 2cb8d8573187..542f371efa5c 100644 --- a/cpp/src/arrow/util/pfor/pfor_wrapper.h +++ b/cpp/src/arrow/util/pfor/pfor_wrapper.h @@ -25,6 +25,7 @@ #include #include "arrow/util/pfor/pfor.h" +#include "arrow/util/span.h" namespace arrow { namespace util { @@ -75,8 +76,8 @@ class PforWrapper { static constexpr int32_t kVectorSize = static_cast(PforConstants::kPforVectorSize); - static void StoreHeader(uint8_t* dest, const PforHeader& header); - static PforHeader LoadHeader(const uint8_t* src); + static void StoreHeader(arrow::util::span dest, const PforHeader& header); + static PforHeader LoadHeader(arrow::util::span src); }; } // namespace pfor From 8a5450a38d3f253bedad1c0225469d79a3262176 Mon Sep 17 00:00:00 2001 From: Prateek Gaur Date: Wed, 3 Jun 2026 18:01:33 +0000 Subject: [PATCH 06/12] Return 
Result/Status on decode paths instead of ARROW_DCHECK --- cpp/src/arrow/util/pfor/pfor.cc | 16 ++++++--- cpp/src/arrow/util/pfor/pfor.h | 14 +++++--- cpp/src/arrow/util/pfor/pfor_test.cc | 45 +++++++++++++------------ cpp/src/arrow/util/pfor/pfor_wrapper.cc | 32 +++++++++++++----- cpp/src/arrow/util/pfor/pfor_wrapper.h | 6 ++-- 5 files changed, 73 insertions(+), 40 deletions(-) diff --git a/cpp/src/arrow/util/pfor/pfor.cc b/cpp/src/arrow/util/pfor/pfor.cc index ef853659292f..354ea7cf81fc 100644 --- a/cpp/src/arrow/util/pfor/pfor.cc +++ b/cpp/src/arrow/util/pfor/pfor.cc @@ -35,6 +35,7 @@ #include "arrow/util/bit_util.h" #include "arrow/util/bpacking_internal.h" #include "arrow/util/logging.h" +#include "arrow/util/macros.h" #include "arrow/util/span.h" namespace arrow { @@ -155,13 +156,20 @@ PforEncodedVector PforCompression::EncodeVector(const T* values, // DecodeVector template -int64_t PforCompression::DecodeVector(T* values, - arrow::util::span data, - int32_t num_elements) { +Result PforCompression::DecodeVector(T* values, + arrow::util::span data, + int32_t num_elements) { // Step 1: Read vector info - auto info = PforVectorInfo::Load(data); + ARROW_ASSIGN_OR_RAISE(auto info, PforVectorInfo::Load(data)); const uint8_t* read_ptr = data.data() + PforVectorInfo::kStoredSize; + if (info.bit_width > PforTypeTraits::kMaxBitWidth) { + return Status::Invalid("PFOR bit_width out of range: ", info.bit_width); + } + if (info.num_exceptions < 0) { + return Status::Invalid("PFOR num_exceptions negative: ", info.num_exceptions); + } + // Step 2: Handle constant data (bit_width == 0, no exceptions) if (info.bit_width == 0 && info.num_exceptions == 0) { std::fill(values, values + num_elements, info.frame_of_reference); diff --git a/cpp/src/arrow/util/pfor/pfor.h b/cpp/src/arrow/util/pfor/pfor.h index 93c7624d56e8..258bdbf74efd 100644 --- a/cpp/src/arrow/util/pfor/pfor.h +++ b/cpp/src/arrow/util/pfor/pfor.h @@ -29,6 +29,8 @@ #include #include +#include "arrow/result.h" 
+#include "arrow/status.h" #include "arrow/util/pfor/pfor_constants.h" #include "arrow/util/span.h" @@ -58,7 +60,11 @@ struct PforVectorInfo { } /// \brief Load this info from a byte buffer (little-endian) - static PforVectorInfo Load(arrow::util::span src) { + static Result Load(arrow::util::span src) { + if (src.size() < static_cast(kStoredSize)) { + return Status::Invalid("PFOR vector info buffer too small: ", src.size(), + " < ", kStoredSize); + } PforVectorInfo info; const uint8_t* ptr = src.data(); std::memcpy(&info.frame_of_reference, ptr, sizeof(T)); @@ -126,9 +132,9 @@ class PforCompression { /// \param[out] values output buffer for decoded integers /// \param[in] data span over the compressed vector data /// \param[in] num_elements number of elements in this vector - /// \return number of bytes consumed from data - static int64_t DecodeVector(T* values, arrow::util::span data, - int32_t num_elements); + /// \return number of bytes consumed from data, or error + static Result DecodeVector(T* values, arrow::util::span data, + int32_t num_elements); /// \brief Calculate the serialized size of an encoded vector static int64_t SerializedVectorSize(const PforEncodedVector& vec, diff --git a/cpp/src/arrow/util/pfor/pfor_test.cc b/cpp/src/arrow/util/pfor/pfor_test.cc index 156313b08ee2..5b9673445f0c 100644 --- a/cpp/src/arrow/util/pfor/pfor_test.cc +++ b/cpp/src/arrow/util/pfor/pfor_test.cc @@ -24,6 +24,7 @@ #include #include +#include "arrow/testing/gtest_util.h" #include "arrow/util/pfor/pfor.h" #include "arrow/util/pfor/pfor_wrapper.h" #include "arrow/util/span.h" @@ -143,10 +144,10 @@ TEST(PforVectorTest, Int32SimpleSequence) { size_t serialized_size = PforCompression::SerializedVectorSize(encoded, 64); std::vector buffer(serialized_size); - PforCompression::SerializeVector(encoded, 64, buffer.data()); + PforCompression::SerializeVector(encoded, 64, buffer); std::vector decoded(64); - PforCompression::DecodeVector(decoded.data(), buffer.data(), 64); + 
ASSERT_OK(PforCompression::DecodeVector(decoded.data(), buffer, 64)); EXPECT_EQ(values, decoded); } @@ -161,10 +162,10 @@ TEST(PforVectorTest, Int32WithOutlier) { size_t serialized_size = PforCompression::SerializedVectorSize(encoded, 8); std::vector buffer(serialized_size); - PforCompression::SerializeVector(encoded, 8, buffer.data()); + PforCompression::SerializeVector(encoded, 8, buffer); std::vector decoded(8); - PforCompression::DecodeVector(decoded.data(), buffer.data(), 8); + ASSERT_OK(PforCompression::DecodeVector(decoded.data(), buffer, 8)); EXPECT_EQ(values, decoded); } @@ -179,10 +180,10 @@ TEST(PforVectorTest, Int32AllIdentical) { size_t serialized_size = PforCompression::SerializedVectorSize(encoded, 100); std::vector buffer(serialized_size); - PforCompression::SerializeVector(encoded, 100, buffer.data()); + PforCompression::SerializeVector(encoded, 100, buffer); std::vector decoded(100); - PforCompression::DecodeVector(decoded.data(), buffer.data(), 100); + ASSERT_OK(PforCompression::DecodeVector(decoded.data(), buffer, 100)); EXPECT_EQ(values, decoded); } @@ -196,10 +197,10 @@ TEST(PforVectorTest, Int32NegativeValues) { size_t serialized_size = PforCompression::SerializedVectorSize(encoded, 5); std::vector buffer(serialized_size); - PforCompression::SerializeVector(encoded, 5, buffer.data()); + PforCompression::SerializeVector(encoded, 5, buffer); std::vector decoded(5); - PforCompression::DecodeVector(decoded.data(), buffer.data(), 5); + ASSERT_OK(PforCompression::DecodeVector(decoded.data(), buffer, 5)); EXPECT_EQ(values, decoded); } @@ -213,10 +214,10 @@ TEST(PforVectorTest, Int32MinMaxEdge) { size_t serialized_size = PforCompression::SerializedVectorSize(encoded, 5); std::vector buffer(serialized_size); - PforCompression::SerializeVector(encoded, 5, buffer.data()); + PforCompression::SerializeVector(encoded, 5, buffer); std::vector decoded(5); - PforCompression::DecodeVector(decoded.data(), buffer.data(), 5); + 
ASSERT_OK(PforCompression::DecodeVector(decoded.data(), buffer, 5)); EXPECT_EQ(values, decoded); } @@ -230,10 +231,10 @@ TEST(PforVectorTest, Int64SimpleSequence) { size_t serialized_size = PforCompression::SerializedVectorSize(encoded, 64); std::vector buffer(serialized_size); - PforCompression::SerializeVector(encoded, 64, buffer.data()); + PforCompression::SerializeVector(encoded, 64, buffer); std::vector decoded(64); - PforCompression::DecodeVector(decoded.data(), buffer.data(), 64); + ASSERT_OK(PforCompression::DecodeVector(decoded.data(), buffer, 64)); EXPECT_EQ(values, decoded); } @@ -248,10 +249,10 @@ TEST(PforVectorTest, Int64WithOutlier) { size_t serialized_size = PforCompression::SerializedVectorSize(encoded, 100); std::vector buffer(serialized_size); - PforCompression::SerializeVector(encoded, 100, buffer.data()); + PforCompression::SerializeVector(encoded, 100, buffer); std::vector decoded(100); - PforCompression::DecodeVector(decoded.data(), buffer.data(), 100); + ASSERT_OK(PforCompression::DecodeVector(decoded.data(), buffer, 100)); EXPECT_EQ(values, decoded); } @@ -270,7 +271,7 @@ TEST(PforWrapperTest, Int32SmallPage) { EXPECT_GT(comp_size, 0); std::vector decoded(5); - PforWrapper::Decode(decoded.data(), 5, compressed.data(), comp_size); + ASSERT_OK(PforWrapper::Decode(decoded.data(), 5, compressed.data(), comp_size)); EXPECT_EQ(values, decoded); } @@ -286,7 +287,7 @@ TEST(PforWrapperTest, Int32ExactOneVector) { PforWrapper::Encode(values.data(), 1024, compressed.data(), &comp_size); std::vector decoded(1024); - PforWrapper::Decode(decoded.data(), 1024, compressed.data(), comp_size); + ASSERT_OK(PforWrapper::Decode(decoded.data(), 1024, compressed.data(), comp_size)); EXPECT_EQ(values, decoded); } @@ -306,7 +307,7 @@ TEST(PforWrapperTest, Int32MultipleVectors) { PforWrapper::Encode(values.data(), n, compressed.data(), &comp_size); std::vector decoded(n); - PforWrapper::Decode(decoded.data(), n, compressed.data(), comp_size); + 
ASSERT_OK(PforWrapper::Decode(decoded.data(), n, compressed.data(), comp_size)); EXPECT_EQ(values, decoded); } @@ -326,7 +327,7 @@ TEST(PforWrapperTest, Int32WithOutliers) { PforWrapper::Encode(values.data(), 1024, compressed.data(), &comp_size); std::vector decoded(1024); - PforWrapper::Decode(decoded.data(), 1024, compressed.data(), comp_size); + ASSERT_OK(PforWrapper::Decode(decoded.data(), 1024, compressed.data(), comp_size)); EXPECT_EQ(values, decoded); } @@ -348,7 +349,7 @@ TEST(PforWrapperTest, Int64MultipleVectors) { PforWrapper::Encode(values.data(), n, compressed.data(), &comp_size); std::vector decoded(n); - PforWrapper::Decode(decoded.data(), n, compressed.data(), comp_size); + ASSERT_OK(PforWrapper::Decode(decoded.data(), n, compressed.data(), comp_size)); EXPECT_EQ(values, decoded); } @@ -363,7 +364,7 @@ TEST(PforWrapperTest, Int32SingleElement) { PforWrapper::Encode(values.data(), 1, compressed.data(), &comp_size); std::vector decoded(1); - PforWrapper::Decode(decoded.data(), 1, compressed.data(), comp_size); + ASSERT_OK(PforWrapper::Decode(decoded.data(), 1, compressed.data(), comp_size)); EXPECT_EQ(values, decoded); } @@ -381,7 +382,7 @@ TEST(PforWrapperTest, Int32AllZeros) { EXPECT_LT(comp_size, 100); std::vector decoded(1024); - PforWrapper::Decode(decoded.data(), 1024, compressed.data(), comp_size); + ASSERT_OK(PforWrapper::Decode(decoded.data(), 1024, compressed.data(), comp_size)); EXPECT_EQ(values, decoded); } @@ -402,7 +403,7 @@ TEST(PforWrapperTest, Int32LargeRandom) { PforWrapper::Encode(values.data(), n, compressed.data(), &comp_size); std::vector decoded(n); - PforWrapper::Decode(decoded.data(), n, compressed.data(), comp_size); + ASSERT_OK(PforWrapper::Decode(decoded.data(), n, compressed.data(), comp_size)); EXPECT_EQ(values, decoded); } diff --git a/cpp/src/arrow/util/pfor/pfor_wrapper.cc b/cpp/src/arrow/util/pfor/pfor_wrapper.cc index 638c1d9c1226..8aefd32871e4 100644 --- a/cpp/src/arrow/util/pfor/pfor_wrapper.cc +++ 
b/cpp/src/arrow/util/pfor/pfor_wrapper.cc @@ -122,18 +122,32 @@ void PforWrapper::Encode(const T* values, int32_t num_values, char* comp, // Decode template -void PforWrapper::Decode(T* values, int32_t num_values, const char* comp, - int64_t comp_size) { - ARROW_DCHECK(num_values > 0); - ARROW_DCHECK(comp != nullptr); +Status PforWrapper::Decode(T* values, int32_t num_values, const char* comp, + int64_t comp_size) { + if (num_values <= 0) { + return Status::Invalid("PFOR num_values must be positive: ", num_values); + } + if (comp == nullptr) { + return Status::Invalid("PFOR compressed data pointer is null"); + } + if (comp_size < PforConstants::kHeaderSize) { + return Status::Invalid("PFOR compressed buffer too small for header: ", comp_size, + " < ", PforConstants::kHeaderSize); + } const auto* src = reinterpret_cast(comp); // Step 1: Read header PforHeader header = LoadHeader( arrow::util::span(src, PforConstants::kHeaderSize)); - ARROW_DCHECK(header.packing_mode == PforConstants::kPackingModeForBitPack); - ARROW_DCHECK(header.value_byte_width == sizeof(T)); + + if (header.packing_mode != PforConstants::kPackingModeForBitPack) { + return Status::Invalid("PFOR unsupported packing mode: ", header.packing_mode); + } + if (header.value_byte_width != sizeof(T)) { + return Status::Invalid("PFOR value_byte_width mismatch: ", header.value_byte_width, + " vs expected ", sizeof(T)); + } const int32_t vector_size = 1 << header.log_vector_size; const int32_t num_vectors = @@ -154,11 +168,13 @@ void PforWrapper::Decode(T* values, int32_t num_values, const char* comp, int32_t elements_in_vector = std::min(vector_size, header.num_elements - start_idx); - PforCompression::DecodeVector( + ARROW_RETURN_NOT_OK(PforCompression::DecodeVector( values + start_idx, arrow::util::span(vector_data, src + comp_size - vector_data), - elements_in_vector); + elements_in_vector)); } + + return Status::OK(); } // ---------------------------------------------------------------------- diff --git 
a/cpp/src/arrow/util/pfor/pfor_wrapper.h b/cpp/src/arrow/util/pfor/pfor_wrapper.h index 542f371efa5c..003a8b4693e6 100644 --- a/cpp/src/arrow/util/pfor/pfor_wrapper.h +++ b/cpp/src/arrow/util/pfor/pfor_wrapper.h @@ -24,6 +24,7 @@ #include #include +#include "arrow/status.h" #include "arrow/util/pfor/pfor.h" #include "arrow/util/span.h" @@ -55,8 +56,9 @@ class PforWrapper { /// \param[in] num_values number of values to decode (from page context) /// \param[in] comp pointer to compressed data /// \param[in] comp_size size of compressed data - static void Decode(T* values, int32_t num_values, const char* comp, - int64_t comp_size); + /// \return Status::OK on success, or an error if the data is malformed + static Status Decode(T* values, int32_t num_values, const char* comp, + int64_t comp_size); /// \brief Get the maximum compressed size for a given number of values /// From eb49c6b6d0fff66e3b2e853c0edbfd1353319dd8 Mon Sep 17 00:00:00 2001 From: Prateek Gaur Date: Wed, 3 Jun 2026 18:03:23 +0000 Subject: [PATCH 07/12] Add static_assert(ARROW_LITTLE_ENDIAN) and replace reinterpret_cast with SafeCopy --- cpp/src/arrow/util/pfor/pfor.cc | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/util/pfor/pfor.cc b/cpp/src/arrow/util/pfor/pfor.cc index 354ea7cf81fc..41d94c3aba9f 100644 --- a/cpp/src/arrow/util/pfor/pfor.cc +++ b/cpp/src/arrow/util/pfor/pfor.cc @@ -34,14 +34,19 @@ #include "arrow/util/bit_stream_utils_internal.h" #include "arrow/util/bit_util.h" #include "arrow/util/bpacking_internal.h" +#include "arrow/util/endian.h" #include "arrow/util/logging.h" #include "arrow/util/macros.h" #include "arrow/util/span.h" +#include "arrow/util/ubsan.h" namespace arrow { namespace util { namespace pfor { +static_assert(ARROW_LITTLE_ENDIAN, + "PFOR serialization assumes little-endian byte order"); + // ---------------------------------------------------------------------- // FindOptimalBitWidth: histogram-based cost model @@ -182,12 
+187,12 @@ Result PforCompression::DecodeVector(T* values, int32_t full_batches = num_elements / kBatchSize; int32_t remainder = num_elements % kBatchSize; - UnsignedT* unsigned_values = reinterpret_cast(values); + std::vector unsigned_values(num_elements); const auto unsigned_for = static_cast(info.frame_of_reference); // Unpack full batches using SIMD for (int32_t batch = 0; batch < full_batches; ++batch) { - arrow::internal::unpack(read_ptr, unsigned_values + batch * kBatchSize, + arrow::internal::unpack(read_ptr, unsigned_values.data() + batch * kBatchSize, kBatchSize, info.bit_width, batch * kBatchSize * info.bit_width); } @@ -208,9 +213,10 @@ Result PforCompression::DecodeVector(T* values, } } - // Add FOR to all values + // Add FOR and convert to signed output via SafeCopy for (int32_t i = 0; i < num_elements; ++i) { unsigned_values[i] += unsigned_for; + values[i] = util::SafeCopy(unsigned_values[i]); } int64_t packed_size = From 6f3e91cfa4341310597d892e6c2db091e8cefc13 Mon Sep 17 00:00:00 2001 From: Prateek Gaur Date: Wed, 3 Jun 2026 18:04:20 +0000 Subject: [PATCH 08/12] Use single-call unpack() instead of manual batch loop + BitReader remainder --- cpp/src/arrow/util/pfor/pfor.cc | 30 ++++-------------------------- 1 file changed, 4 insertions(+), 26 deletions(-) diff --git a/cpp/src/arrow/util/pfor/pfor.cc b/cpp/src/arrow/util/pfor/pfor.cc index 41d94c3aba9f..1a50a788e7f6 100644 --- a/cpp/src/arrow/util/pfor/pfor.cc +++ b/cpp/src/arrow/util/pfor/pfor.cc @@ -183,35 +183,13 @@ Result PforCompression::DecodeVector(T* values, // Step 3: Unpack bit-packed deltas and add FOR if (info.bit_width > 0) { - constexpr int kBatchSize = 32; - int32_t full_batches = num_elements / kBatchSize; - int32_t remainder = num_elements % kBatchSize; - std::vector unsigned_values(num_elements); const auto unsigned_for = static_cast(info.frame_of_reference); - // Unpack full batches using SIMD - for (int32_t batch = 0; batch < full_batches; ++batch) { - 
arrow::internal::unpack(read_ptr, unsigned_values.data() + batch * kBatchSize, - kBatchSize, info.bit_width, - batch * kBatchSize * info.bit_width); - } - - // Unpack remainder using BitReader - if (remainder > 0) { - int64_t packed_size = - bit_util::BytesForBits(static_cast(num_elements) * info.bit_width); - bit_util::BitReader reader(read_ptr, static_cast(packed_size)); - for (int32_t i = 0; i < full_batches * kBatchSize; ++i) { - uint64_t val; - reader.GetValue(info.bit_width, &val); - } - for (int32_t i = full_batches * kBatchSize; i < num_elements; ++i) { - uint64_t val; - reader.GetValue(info.bit_width, &val); - unsigned_values[i] = static_cast(val); - } - } + // Arrow's unpack handles arbitrary sizes: SIMD for complete batches, + // then unpack_exact for the remainder. + arrow::internal::unpack(read_ptr, unsigned_values.data(), + static_cast(num_elements), info.bit_width); // Add FOR and convert to signed output via SafeCopy for (int32_t i = 0; i < num_elements; ++i) { From 8501f9a50ddfdad7ce5ad044bb97b74392645cc5 Mon Sep 17 00:00:00 2001 From: Prateek Gaur Date: Wed, 3 Jun 2026 18:05:32 +0000 Subject: [PATCH 09/12] Add pragma GCC unroll/ivdep to decode loops for better vectorization --- cpp/src/arrow/util/pfor/pfor.cc | 4 ++++ cpp/src/arrow/util/pfor/pfor_constants.h | 3 +++ 2 files changed, 7 insertions(+) diff --git a/cpp/src/arrow/util/pfor/pfor.cc b/cpp/src/arrow/util/pfor/pfor.cc index 1a50a788e7f6..7028be94ffdf 100644 --- a/cpp/src/arrow/util/pfor/pfor.cc +++ b/cpp/src/arrow/util/pfor/pfor.cc @@ -192,6 +192,8 @@ Result PforCompression::DecodeVector(T* values, static_cast(num_elements), info.bit_width); // Add FOR and convert to signed output via SafeCopy +#pragma GCC unroll PforConstants::kLoopUnrolls +#pragma GCC ivdep for (int32_t i = 0; i < num_elements; ++i) { unsigned_values[i] += unsigned_for; values[i] = util::SafeCopy(unsigned_values[i]); @@ -213,6 +215,8 @@ Result PforCompression::DecodeVector(T* values, const uint8_t* values_ptr = read_ptr; 
read_ptr += info.num_exceptions * sizeof(T); +#pragma GCC unroll PforConstants::kLoopUnrolls +#pragma GCC ivdep for (int16_t i = 0; i < info.num_exceptions; ++i) { int16_t pos; std::memcpy(&pos, positions_ptr + i * sizeof(int16_t), sizeof(int16_t)); diff --git a/cpp/src/arrow/util/pfor/pfor_constants.h b/cpp/src/arrow/util/pfor/pfor_constants.h index 244330357c2f..18e63b962e4c 100644 --- a/cpp/src/arrow/util/pfor/pfor_constants.h +++ b/cpp/src/arrow/util/pfor/pfor_constants.h @@ -52,6 +52,9 @@ class PforConstants { /// Packing mode: FOR + bit-packing (currently the only mode). static constexpr uint8_t kPackingModeForBitPack = 0; + + /// Loop unroll factor for compiler hints in decode loops. + static constexpr int64_t kLoopUnrolls = 4; }; /// \brief Type traits for PFOR integer types From 4c6b0e2f63062c19253b200e8000e03fe931f279 Mon Sep 17 00:00:00 2001 From: Prateek Gaur Date: Wed, 3 Jun 2026 18:06:43 +0000 Subject: [PATCH 10/12] Add PforEncodedVectorView for zero-copy decode path --- cpp/src/arrow/util/pfor/pfor.cc | 44 +++++++++++++++++++++++++++++++++ cpp/src/arrow/util/pfor/pfor.h | 24 ++++++++++++++++++ 2 files changed, 68 insertions(+) diff --git a/cpp/src/arrow/util/pfor/pfor.cc b/cpp/src/arrow/util/pfor/pfor.cc index 7028be94ffdf..e9e7372e7154 100644 --- a/cpp/src/arrow/util/pfor/pfor.cc +++ b/cpp/src/arrow/util/pfor/pfor.cc @@ -234,6 +234,50 @@ Result PforCompression::DecodeVector(T* values, // ---------------------------------------------------------------------- // Serialization helpers +// ---------------------------------------------------------------------- +// PforEncodedVectorView::LoadView + +template +Result> PforEncodedVectorView::LoadView( + arrow::util::span data, int32_t num_elements) { + ARROW_ASSIGN_OR_RAISE(auto info, PforVectorInfo::Load(data)); + + PforEncodedVectorView view; + view.info = info; + view.num_elements = num_elements; + + const uint8_t* ptr = data.data() + PforVectorInfo::kStoredSize; + + // packed_values: zero-copy span into 
the buffer + int64_t packed_size = 0; + if (info.bit_width > 0) { + packed_size = + bit_util::BytesForBits(static_cast(num_elements) * info.bit_width); + view.packed_values = arrow::util::span(ptr, packed_size); + ptr += packed_size; + } + + // Exception positions and values: copy into aligned storage + if (info.num_exceptions > 0) { + view.exception_positions.resize(info.num_exceptions); + std::memcpy(view.exception_positions.data(), ptr, + info.num_exceptions * sizeof(int16_t)); + ptr += info.num_exceptions * sizeof(int16_t); + + view.exception_values.resize(info.num_exceptions); + std::memcpy(view.exception_values.data(), ptr, + info.num_exceptions * sizeof(T)); + } + + return view; +} + +template struct PforEncodedVectorView; +template struct PforEncodedVectorView; + +// ---------------------------------------------------------------------- +// Serialization helpers + template int64_t PforCompression::SerializedVectorSize(const PforEncodedVector& vec, int32_t num_elements) { diff --git a/cpp/src/arrow/util/pfor/pfor.h b/cpp/src/arrow/util/pfor/pfor.h index 258bdbf74efd..d2160bac8cd6 100644 --- a/cpp/src/arrow/util/pfor/pfor.h +++ b/cpp/src/arrow/util/pfor/pfor.h @@ -89,6 +89,30 @@ struct PforEncodedVector { std::vector exception_values; }; +// ---------------------------------------------------------------------- +// Zero-copy encoded vector view + +/// \brief A zero-copy view over a serialized PFOR vector +/// +/// The packed_values span points directly into the compressed buffer. +/// Exception positions and values are copied into aligned storage. 
+template +struct PforEncodedVectorView { + PforVectorInfo info; + int32_t num_elements = 0; + arrow::util::span packed_values; + std::vector exception_positions; + std::vector exception_values; + + /// \brief Create a zero-copy view from a serialized vector buffer + /// + /// \param[in] data span over the serialized vector data + /// \param[in] num_elements number of elements in this vector + /// \return the view, or an error if the buffer is too small + static Result LoadView( + arrow::util::span data, int32_t num_elements); +}; + // ---------------------------------------------------------------------- // Cost model result From 3b608b2944f6050be7ac084a6a5e37459780a4af Mon Sep 17 00:00:00 2001 From: Prateek Gaur Date: Wed, 3 Jun 2026 18:08:35 +0000 Subject: [PATCH 11/12] Make vector_size configurable on encode path with default kPforVectorSize --- cpp/src/arrow/util/pfor/pfor_wrapper.cc | 21 +++++++++++++++------ cpp/src/arrow/util/pfor/pfor_wrapper.h | 11 ++++++++++- 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/util/pfor/pfor_wrapper.cc b/cpp/src/arrow/util/pfor/pfor_wrapper.cc index 8aefd32871e4..15ae79cb5610 100644 --- a/cpp/src/arrow/util/pfor/pfor_wrapper.cc +++ b/cpp/src/arrow/util/pfor/pfor_wrapper.cc @@ -66,22 +66,26 @@ typename PforWrapper::PforHeader PforWrapper::LoadHeader( // Encode template -void PforWrapper::Encode(const T* values, int32_t num_values, char* comp, - int64_t* comp_size) { +void PforWrapper::Encode(const T* values, int32_t num_values, int32_t vector_size, + char* comp, int64_t* comp_size) { ARROW_DCHECK(num_values > 0); ARROW_DCHECK(comp != nullptr); ARROW_DCHECK(comp_size != nullptr); + ARROW_DCHECK(vector_size > 0 && (vector_size & (vector_size - 1)) == 0); - const int32_t vector_size = kVectorSize; const int32_t num_vectors = (num_values + vector_size - 1) / vector_size; + // Compute log2(vector_size); exact because vector_size is a power of two + uint8_t log_vector_size = 0; + for (int32_t v = vector_size; v > 1; v >>= 1) ++log_vector_size; + auto* dest =
reinterpret_cast(comp); // Step 1: Write header PforHeader header; header.packing_mode = PforConstants::kPackingModeForBitPack; - header.log_vector_size = PforConstants::kDefaultLogVectorSize; + header.log_vector_size = log_vector_size; header.value_byte_width = sizeof(T); header.num_elements = num_values; StoreHeader(arrow::util::span(dest, PforConstants::kHeaderSize), header); @@ -118,6 +122,12 @@ void PforWrapper::Encode(const T* values, int32_t num_values, char* comp, *comp_size = static_cast(write_ptr - dest); } +template +void PforWrapper::Encode(const T* values, int32_t num_values, char* comp, + int64_t* comp_size) { + Encode(values, num_values, kVectorSize, comp, comp_size); +} + // ---------------------------------------------------------------------- // Decode @@ -181,8 +191,7 @@ Status PforWrapper::Decode(T* values, int32_t num_values, const char* comp, // GetMaxCompressedSize template -int64_t PforWrapper::GetMaxCompressedSize(int32_t num_values) { - const int32_t vector_size = kVectorSize; +int64_t PforWrapper::GetMaxCompressedSize(int32_t num_values, int32_t vector_size) { const int32_t num_vectors = (num_values + vector_size - 1) / vector_size; diff --git a/cpp/src/arrow/util/pfor/pfor_wrapper.h b/cpp/src/arrow/util/pfor/pfor_wrapper.h index 003a8b4693e6..5512af4f37b4 100644 --- a/cpp/src/arrow/util/pfor/pfor_wrapper.h +++ b/cpp/src/arrow/util/pfor/pfor_wrapper.h @@ -45,8 +45,14 @@ class PforWrapper { /// /// \param[in] values pointer to input integers /// \param[in] num_values total number of values + /// \param[in] vector_size number of elements per vector (must be a power of 2, + /// in [2^kMinLogVectorSize, 2^kMaxLogVectorSize]) /// \param[out] comp pointer to output buffer (caller must ensure sufficient size) /// \param[in,out] comp_size input: available buffer size; output: bytes written + static void Encode(const T* values, int32_t num_values, int32_t vector_size, + char* comp, int64_t* comp_size); + + /// \brief Convenience overload that encodes with the default
vector size (kVectorSize) static void Encode(const T* values, int32_t num_values, char* comp, int64_t* comp_size); @@ -63,8 +69,11 @@ class PforWrapper { /// \brief Get the maximum compressed size for a given number of values /// /// \param[in] num_values number of integer values + /// \param[in] vector_size number of elements per vector (defaults to kPforVectorSize) /// \return maximum possible compressed page size in bytes - static int64_t GetMaxCompressedSize(int32_t num_values); + static int64_t GetMaxCompressedSize( + int32_t num_values, + int32_t vector_size = static_cast(PforConstants::kPforVectorSize)); private: /// \brief Page header structure (7 bytes) From 98acdc2ce0ea88d055a5a508e164d6702d448a70 Mon Sep 17 00:00:00 2001 From: Prateek Gaur Date: Wed, 3 Jun 2026 19:23:55 +0000 Subject: [PATCH 12/12] Fix pfor_test.cc: unwrap Result<> from PforVectorInfo::Load() Load() now returns Result after the Status/Result refactoring. Use ASSERT_OK_AND_ASSIGN to properly unwrap the result in tests. --- cpp/src/arrow/util/pfor/pfor_test.cc | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/util/pfor/pfor_test.cc b/cpp/src/arrow/util/pfor/pfor_test.cc index 5b9673445f0c..665a85ad714d 100644 --- a/cpp/src/arrow/util/pfor/pfor_test.cc +++ b/cpp/src/arrow/util/pfor/pfor_test.cc @@ -75,7 +75,8 @@ TEST(PforVectorInfoTest, Int32RoundTrip) { uint8_t buf[7]; info.Store(arrow::util::span(buf, 7)); - auto loaded = PforVectorInfo::Load(arrow::util::span(buf, 7)); + ASSERT_OK_AND_ASSIGN(auto loaded, + PforVectorInfo::Load(arrow::util::span(buf, 7))); EXPECT_EQ(loaded.frame_of_reference, -42); EXPECT_EQ(loaded.bit_width, 17); @@ -90,7 +91,8 @@ TEST(PforVectorInfoTest, Int64RoundTrip) { uint8_t buf[11]; info.Store(arrow::util::span(buf, 11)); - auto loaded = PforVectorInfo::Load(arrow::util::span(buf, 11)); + ASSERT_OK_AND_ASSIGN(auto loaded, + PforVectorInfo::Load(arrow::util::span(buf, 11))); EXPECT_EQ(loaded.frame_of_reference, -123456789012345LL);
EXPECT_EQ(loaded.bit_width, 48);