Skip to content
Open
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ add_library(
src/config.cpp
src/cuda_event.cpp
src/integrations/cudf/bloom_filter.cu
src/integrations/cudf/pack.cpp
src/integrations/cudf/partition.cpp
src/integrations/cudf/utils.cpp
src/memory/buffer.cpp
Expand Down
150 changes: 150 additions & 0 deletions cpp/include/rapidsmpf/integrations/cudf/pack.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/**
* SPDX-FileCopyrightText: Copyright (c) 2024-2026, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*/
#pragma once

#include <memory>

#include <cudf/table/table_view.hpp>
#include <rmm/cuda_stream_view.hpp>

#include <rapidsmpf/memory/buffer.hpp>
#include <rapidsmpf/memory/memory_reservation.hpp>
#include <rapidsmpf/memory/memory_type.hpp>
#include <rapidsmpf/memory/packed_data.hpp>

namespace rapidsmpf {


/**
* @brief Pack a cudf table view into a contiguous buffer using chunked packing.
*
* This function serializes the given table view into a `PackedData` object
* using a bounce buffer for chunked transfer. This is useful when packing to
* host memory to avoid allocating temporary device memory for the entire table.
*
* NOTE(review): cudf::chunked_pack requires a staging buffer of at least 1 MiB;
* presumably the same minimum applies to the size of @p bounce_buffer — confirm.
*
* @param table The table view to pack.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param bounce_buffer Device buffer used as intermediate storage during chunked packing.
* It is taken by non-const reference and serves as scratch space, so callers should not
* rely on its previous contents after the call.
* @param pack_temp_mr Temporary memory resource used for packing.
* @param reservation Memory reservation to use for allocating the packed data buffer.
* @return A unique pointer to the packed data containing the serialized table. The
* caller owns the returned object.
*
* @throws rapidsmpf::reservation_error If the allocation size exceeds the reservation.
*
* @see cudf::chunked_pack
*/
[[nodiscard]] std::unique_ptr<PackedData> chunked_pack(
cudf::table_view const& table,
rmm::cuda_stream_view stream,
rmm::device_buffer& bounce_buffer,
rmm::device_async_resource_ref pack_temp_mr,
MemoryReservation& reservation
);

namespace detail {

/**
* @brief Pack a cudf table view into a contiguous device buffer.
*
* Uses cudf::pack(). Returns a `PackedData` with a `Buffer` backed by
* `rmm::device_buffer`. The memory is allocated using the provided reservation.
*
* @param table The table view to pack.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param reservation Device memory reservation. Must have memory type DEVICE.
* Taken by non-const reference; presumably the allocation is charged against it —
* TODO confirm against the implementation.
* @return A unique pointer to the packed data containing the serialized table. The
* caller owns the returned object.
*
* @throws std::invalid_argument If the reservation's memory type is not DEVICE.
* @throws rapidsmpf::reservation_error If the allocation size exceeds the reservation.
*
* @see cudf::pack
*/
[[nodiscard]] std::unique_ptr<PackedData> pack_device(
cudf::table_view const& table,
rmm::cuda_stream_view stream,
MemoryReservation& reservation
);

/**
* @brief Pack a cudf table view into a contiguous pinned host buffer.
*
* Uses cudf::pack() with a pinned memory resource. Returns a `PackedData` with
* a `Buffer` backed by a pinned `HostBuffer`. The memory is allocated using
* the provided reservation.
*
* NOTE(review): pinned memory can be disabled (see
* BufferResource::is_pinned_memory_available()); what happens here in that case is
* not stated — confirm and document.
*
* @param table The table view to pack.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param reservation Pinned host memory reservation. Must have memory type PINNED_HOST.
* @return A unique pointer to the packed data containing the serialized table. The
* caller owns the returned object.
*
* @throws std::invalid_argument If the reservation's memory type is not PINNED_HOST.
* @throws rapidsmpf::reservation_error If the allocation size exceeds the reservation.
*
* @see cudf::pack
*/
[[nodiscard]] std::unique_ptr<PackedData> pack_pinned_host(
cudf::table_view const& table,
rmm::cuda_stream_view stream,
MemoryReservation& reservation
);

/**
* @brief Pack a cudf table view into a contiguous host buffer.
*
* Uses cudf::chunked_pack() with a device bounce buffer when available,
* otherwise a pinned bounce buffer. Returns a `PackedData` with a `Buffer`
* backed by a `HostBuffer`. The memory is allocated using the provided reservation.
*
* Algorithm:
* 1. Special case: empty tables return immediately with empty packed data.
* 2. Fast path for small tables (< 1MB): pack directly on device and copy to host.
* 3. Estimate the table size (est_size), with a minimum of 1MB.
* 4. Try to reserve device memory for est_size with overbooking allowed.
* 5. If available device memory (reservation - overbooking) >= 1MB,
* use chunked packing with the device bounce buffer.
* 6. Otherwise, if pinned memory is available, retry with pinned memory (steps 4-5).
* 7. If all attempts fail, throw an error.
*
* NOTE(review): step 7 states that an error is thrown when no bounce buffer can be
* obtained, but the exception type is not listed under @throws below — confirm the
* type against the implementation and document it.
*
* @param table The table view to pack.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param reservation Host memory reservation. Must have memory type HOST.
* @return A unique pointer to the packed data containing the serialized table. The
* caller owns the returned object.
*
* @throws std::invalid_argument If the reservation's memory type is not HOST.
* @throws rapidsmpf::reservation_error If the allocation size exceeds the reservation.
*
* @see cudf::chunked_pack
*/
[[nodiscard]] std::unique_ptr<PackedData> pack_host(
cudf::table_view const& table,
rmm::cuda_stream_view stream,
MemoryReservation& reservation
);

} // namespace detail

/**
* @brief Pack a cudf table view into a contiguous buffer.
*
* This function serializes the given table view into a `PackedData` object
* with the data buffer residing in the memory type of the provided reservation.
* The memory for the packed data is allocated using the provided reservation.
*
* NOTE(review): presumably this dispatches on the reservation's memory type to
* detail::pack_device, detail::pack_pinned_host or detail::pack_host — confirm
* against the implementation.
*
* @param table The table view to pack.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param reservation Memory reservation to use for allocating the packed data buffer.
* @return A unique pointer to the packed data containing the serialized table. The
* caller owns the returned object.
*
* @throws rapidsmpf::reservation_error If the allocation size exceeds the reservation.
*
* @see cudf::pack
*/
[[nodiscard]] std::unique_ptr<PackedData> pack(
cudf::table_view const& table,
rmm::cuda_stream_view stream,
MemoryReservation& reservation
);

} // namespace rapidsmpf
38 changes: 38 additions & 0 deletions cpp/include/rapidsmpf/memory/buffer_resource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,22 @@ class BufferResource {
*/
[[nodiscard]] rmm::host_async_resource_ref pinned_mr();

/**
* @brief Get the RMM pinned host memory resource as a device resource reference.
*
* NOTE(review): the behavior when pinned memory is disabled is not visible from this
* declaration; callers presumably should check is_pinned_memory_available() first —
* confirm.
*
* @return Reference to the RMM resource used for pinned host allocations.
*/
[[nodiscard]] rmm::device_async_resource_ref pinned_mr_as_device();

/**
 * @brief Report whether a pinned memory resource is configured.
 *
 * @return false when the stored pinned memory resource compares equal to
 * `PinnedMemoryResource::Disabled`, true otherwise.
 */
[[nodiscard]] bool is_pinned_memory_available() const noexcept {
    bool const pinned_enabled = pinned_mr_ != PinnedMemoryResource::Disabled;
    return pinned_enabled;
}

/**
* @brief Retrieves the memory availability function for a given memory type.
*
Expand Down Expand Up @@ -311,6 +327,28 @@ class BufferResource {
std::unique_ptr<rmm::device_buffer> data, rmm::cuda_stream_view stream
);

/**
* @brief Move host buffer data into a Buffer.
*
* This operation is cheap; no copy is performed. The resulting Buffer resides in
* host memory.
*
* If @p stream differs from the device buffer's current stream:
* - @p stream is synchronized with the device buffer's current stream, and
* - the device buffer's current stream is updated to @p stream.
*
* NOTE(review): the paragraph above speaks of a "device buffer" although @p data is
* a host buffer; it looks carried over from the `rmm::device_buffer` overload —
* confirm the stream semantics that actually apply to `HostBuffer`.
*
* @param data Unique pointer to the host buffer.
* @param stream CUDA stream associated with the new Buffer. Use or synchronize with
* this stream when operating on the Buffer.
* @param host_mem_type The memory type of the underlying host buffer @p data.
* @return Unique pointer to the resulting Buffer.
*/
std::unique_ptr<Buffer> move(
std::unique_ptr<HostBuffer> data,
rmm::cuda_stream_view stream,
MemoryType host_mem_type
);

/**
* @brief Move a Buffer to the memory type specified by the reservation.
*
Expand Down
2 changes: 1 addition & 1 deletion cpp/include/rapidsmpf/memory/host_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ class HostBuffer {
static HostBuffer from_rmm_device_buffer(
std::unique_ptr<rmm::device_buffer> pinned_host_buffer,
rmm::cuda_stream_view stream,
PinnedMemoryResource& mr
rmm::host_async_resource_ref mr
);

private:
Expand Down
49 changes: 49 additions & 0 deletions cpp/include/rapidsmpf/memory/memory_type.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include <ranges>
#include <span>

#include <rapidsmpf/utils/misc.hpp>

namespace rapidsmpf {

/// @brief Enum representing the type of memory sorted in decreasing order of preference.
Expand Down Expand Up @@ -66,6 +68,53 @@ static_assert(std::ranges::equal(
leq_memory_types(static_cast<MemoryType>(-1)), std::ranges::empty_view<MemoryType>{}
));

/**
 * @brief List the memory types that can be accessed from the device.
 *
 * The result is the leading two entries of `MEMORY_TYPES`; the count is a template
 * argument so it is checked against the array's static extent at compile time.
 *
 * @return A span over the device-accessible memory types.
 */
constexpr std::span<MemoryType const> device_accessible_memory_types() noexcept {
    auto const every_type = std::span{MEMORY_TYPES};
    return every_type.first<2>();
}

static_assert(std::ranges::equal(
device_accessible_memory_types(),
std::array{MemoryType::DEVICE, MemoryType::PINNED_HOST}
Comment on lines +71 to +82
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Why not just return a std::array of the two memory types?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to make sure that if we meddle with the MEMORY_TYPES array in the future, we fail statically if something's not right.

));

/**
 * @brief Tell whether memory of the given type can be accessed from the device.
 *
 * @param mem_type The memory type to look up.
 * @return true if @p mem_type is one of `device_accessible_memory_types()`,
 * false otherwise.
 */
constexpr bool is_device_accessible(MemoryType mem_type) noexcept {
    auto const device_types = device_accessible_memory_types();
    return contains(device_types, mem_type);
}

/**
 * @brief Get the memory types that are host accessible.
 *
 * The result is the trailing two entries of `MEMORY_TYPES`; the count is a template
 * argument so it is checked against the array's static extent at compile time.
 *
 * Marked `noexcept` for consistency with `device_accessible_memory_types()` and
 * because the `noexcept` predicate `is_host_accessible()` calls it.
 *
 * @return A span of memory types that are host accessible.
 */
constexpr std::span<MemoryType const> host_accessible_memory_types() noexcept {
    return std::span{MEMORY_TYPES}.last<2>();
}

// Compile-time guard: the build fails if the last two entries of MEMORY_TYPES (what
// host_accessible_memory_types() returns) are no longer PINNED_HOST, HOST in that order.
static_assert(std::ranges::equal(
host_accessible_memory_types(), std::array{MemoryType::PINNED_HOST, MemoryType::HOST}
));

/**
 * @brief Tell whether memory of the given type can be accessed from the host.
 *
 * @param mem_type The memory type to look up.
 * @return true if @p mem_type is one of `host_accessible_memory_types()`,
 * false otherwise.
 */
constexpr bool is_host_accessible(MemoryType mem_type) noexcept {
    auto const host_types = host_accessible_memory_types();
    return contains(host_types, mem_type);
}

/**
* @brief Get the name of a MemoryType.
*
Expand Down
Loading