23 commits
b145819
Add OrderScheme partitioning metadata
rjzamora Feb 10, 2026
5fb6d2f
Merge branch 'main' into orderscheme-metadata
rjzamora Feb 17, 2026
fec05ee
Merge branch 'main' into orderscheme-metadata
rjzamora Feb 25, 2026
6c08a84
Merge branch 'main' into orderscheme-metadata
rjzamora Feb 26, 2026
215a757
Merge branch 'main' into orderscheme-metadata
rjzamora Mar 4, 2026
5461cc2
Merge branch 'main' into orderscheme-metadata
rjzamora Mar 5, 2026
1011476
Merge branch 'main' into orderscheme-metadata
rjzamora Mar 11, 2026
0818a8a
Merge branch 'main' into orderscheme-metadata
rjzamora Apr 13, 2026
103930a
update
rjzamora Apr 13, 2026
fbd5009
Merge remote-tracking branch 'upstream/main' into orderscheme-metadata
rjzamora Apr 14, 2026
f736ef9
address code review and clean up
rjzamora Apr 14, 2026
7a2e9ba
rewrite OrderScheme API
rjzamora Apr 15, 2026
760605b
address review
rjzamora Apr 16, 2026
7b1bfa5
Merge remote-tracking branch 'upstream/main' into orderscheme-metadata
rjzamora Apr 16, 2026
ba63956
make boundaries spillable
rjzamora Apr 16, 2026
8efb567
spilling test
rjzamora Apr 16, 2026
76f3bb1
Merge branch 'main' into orderscheme-metadata
rjzamora Apr 17, 2026
82d3730
Merge remote-tracking branch 'upstream/main' into orderscheme-metadata
rjzamora Apr 20, 2026
40de4e8
fix spillablemessages
rjzamora Apr 20, 2026
1609f66
accept suggestion
rjzamora Apr 21, 2026
6bb185b
simplify
rjzamora Apr 21, 2026
3544505
heavy revisions
rjzamora Apr 21, 2026
2bd1a26
avoid unnecessary aliases
rjzamora Apr 21, 2026
225 changes: 221 additions & 4 deletions cpp/include/rapidsmpf/streaming/cudf/channel_metadata.hpp
@@ -1,17 +1,23 @@
/**
- * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES. All rights
- * reserved. SPDX-License-Identifier: Apache-2.0
+ * SPDX-FileCopyrightText: Copyright (c) 2025-2026, NVIDIA CORPORATION & AFFILIATES.
+ * SPDX-License-Identifier: Apache-2.0
*/

#pragma once

#include <cstdint>
#include <memory>
#include <optional>
#include <vector>

#include <cuda_runtime_api.h>

#include <cudf/types.hpp>

#include <rapidsmpf/memory/content_description.hpp>
#include <rapidsmpf/memory/memory_reservation.hpp>
#include <rapidsmpf/streaming/core/message.hpp>
#include <rapidsmpf/streaming/cudf/table_chunk.hpp>

namespace rapidsmpf::streaming {

@@ -31,6 +37,75 @@ struct HashScheme {
bool operator==(HashScheme const&) const = default;
};

/**
* @brief A single sort key: column index, sort direction, and null placement.
*/
struct OrderKey {
cudf::size_type column_index; ///< Column to sort on.
cudf::order order; ///< ASCENDING or DESCENDING.
cudf::null_order null_order; ///< BEFORE or AFTER.

/**
* @brief Equality comparison.
* @return True if all fields are equal.
*/
bool operator==(OrderKey const&) const = default;
};

/**
* @brief Order-based partitioning scheme for sorted/range-partitioned data.
*
* Data is partitioned by value ranges based on predetermined boundaries.
* For N partitions, there are N-1 boundary rows:
* - Partition 0: values < boundaries[0]
* - Partition i (0 < i < N-1): boundaries[i-1] <= values < boundaries[i]
* - Partition N-1: values >= boundaries[N-2]
*
* `keys[i]` is the i-th sort column; ordering is lexicographic by `keys[0]`,
* then `keys[1]`, and so on.
*
* When `boundaries` is set, its columns must align with `keys`
* (same count and compatible dtypes). Mismatched dtypes are a usage error.
*
* `strict_boundary`: when true, every row in a chunk belongs to a single partition's
* half-open key range (partition keys do not straddle chunk interiors). When false,
* a chunk may contain keys spanning multiple partitions (e.g. before a shuffle).
*/
struct OrderScheme {
std::vector<OrderKey> keys; ///< Sort keys (column, order, null_order per entry).
std::unique_ptr<TableChunk> boundaries; ///< N-1 boundary rows for N partitions.
/// See struct-level note on `strict_boundary` semantics.
bool strict_boundary{false};

/**
* @brief Deep-copy this scheme into a new one.
*
* Copies vectors and `strict_boundary`. If `boundaries` is non-null, copies
* the underlying `TableChunk` via `TableChunk::copy(reservation)`.
*
* @param reservation Memory reservation for the boundary copy.
* @return A new independent `OrderScheme`.
*/
[[nodiscard]] OrderScheme clone(MemoryReservation& reservation) const;

/**
* @brief Shallow metadata equality (not semantic boundary value equality).
*
* Returns true when `keys` and `strict_boundary` match, and boundary tables
* are consistent in the weak sense: both absent, or both present with the
* same shape from `TableChunk::shape()` (not `table_view()`, which requires an
* available device table).
* Cell values inside `boundaries` are intentionally not compared (that would
* require a device comparison API with stream and memory resource). Do not
* use `operator==` to assert that two schemes have identical range boundaries.
* A future API may add deep boundary comparison with explicit stream and MR.
*
* @param other The OrderScheme to compare against.
* @return True under the shallow rules above.
*/
bool operator==(OrderScheme const& other) const;
};
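
The range rule documented on `OrderScheme` (N-1 boundaries, half-open ranges, lower bound inclusive) can be sketched on plain integers. This is only an illustration under simplifying assumptions: a single ascending key, no nulls, and host-side `int` values in place of a `TableChunk`. The helper name `partition_of` is hypothetical and is not part of rapidsmpf.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper (not part of rapidsmpf): map a key to its partition
// index under the half-open range rule described for OrderScheme.
// `boundaries` must be sorted ascending and holds N-1 values for N partitions.
inline std::size_t partition_of(std::vector<int> const& boundaries, int value) {
    // upper_bound returns the first boundary strictly greater than `value`,
    // so its offset equals the number of boundaries that are <= value.
    auto it = std::upper_bound(boundaries.begin(), boundaries.end(), value);
    return static_cast<std::size_t>(it - boundaries.begin());
}

// Example: two boundaries define three partitions.
inline void partition_of_example() {
    std::vector<int> const boundaries{10, 20};
    assert(partition_of(boundaries, 5) == 0);   // 5 < boundaries[0]
    assert(partition_of(boundaries, 10) == 1);  // boundaries[0] <= 10 < boundaries[1]
    assert(partition_of(boundaries, 25) == 2);  // 25 >= boundaries[1]
}
```

A value equal to a boundary lands in the partition to its right, which matches the `boundaries[i-1] <= values < boundaries[i]` wording in the doc comment.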

/**
* @brief Partitioning specification for a single hierarchical level.
*
@@ -40,6 +115,7 @@ struct HashScheme {
* - `none()`: No partitioning information at this level.
* - `inherit()`: Partitioning is inherited from the parent level unchanged.
* - `from_hash(h)`: Explicit hash partitioning with the given scheme.
* - `from_order(o)`: Explicit order/range partitioning with the given scheme.
*/
struct PartitioningSpec {
/**
@@ -49,10 +125,12 @@ struct PartitioningSpec {
NONE, ///< No partitioning information at this level.
INHERIT, ///< Partitioning is inherited from parent level unchanged.
HASH, ///< Hash partitioning.
ORDER, ///< Order/range partitioning.
};

Type type = Type::NONE; ///< The type of partitioning.
std::optional<HashScheme> hash; ///< Valid only when type == HASH.
std::optional<OrderScheme> order; ///< Valid only when type == ORDER.
Comment on lines +128 to +133
Contributor

question: Can both `hash` and `order` be set on the same `PartitioningSpec`?

Member Author

I suppose you could, but hash is ignored unless type is HASH and order is ignored unless type is ORDER. In the absence of Cython, I would never design it this way, but I remember we landed on this after I struggled with HashScheme.
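
To make the "ignored unless `type` matches" behaviour concrete, here is a minimal sketch using stand-in types. `HashSchemeSketch`, `OrderSchemeSketch`, `SpecSketch`, and the `active_*` accessors are all hypothetical names invented for this example; the real `HashScheme` and `OrderScheme` carry different fields, and the header in this PR does not define such accessors.

```cpp
#include <cassert>
#include <optional>
#include <utility>
#include <vector>

// Stand-in types for illustration only (assumed fields, not the real ones).
struct HashSchemeSketch {
    int modulus;
};

struct OrderSchemeSketch {
    std::vector<int> key_columns;
};

struct SpecSketch {
    enum class Type { NONE, INHERIT, HASH, ORDER };

    Type type = Type::NONE;
    std::optional<HashSchemeSketch> hash;    // meaningful only when type == HASH
    std::optional<OrderSchemeSketch> order;  // meaningful only when type == ORDER

    static SpecSketch from_hash(HashSchemeSketch h) {
        SpecSketch s;
        s.type = Type::HASH;
        s.hash = std::move(h);
        return s;
    }
};

// Hypothetical accessors: consult `type` first, so a populated but inactive
// optional is never observed by callers.
inline HashSchemeSketch const* active_hash(SpecSketch const& s) {
    return (s.type == SpecSketch::Type::HASH && s.hash) ? &*s.hash : nullptr;
}

inline OrderSchemeSketch const* active_order(SpecSketch const& s) {
    return (s.type == SpecSketch::Type::ORDER && s.order) ? &*s.order : nullptr;
}

// Example: both optionals populated, but only the one selected by `type` counts.
inline void spec_sketch_example() {
    SpecSketch s = SpecSketch::from_hash(HashSchemeSketch{4});
    s.order = OrderSchemeSketch{std::vector<int>{0}};
    assert(active_hash(s) != nullptr);
    assert(active_order(s) == nullptr);
}
```

A `std::variant` would rule out the inconsistent state by construction; the reply above suggests the tag-plus-optionals layout was chosen for the Cython bindings.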


/**
* @brief Create a spec indicating no partitioning information.
@@ -67,7 +145,7 @@ struct PartitioningSpec {
* @return A PartitioningSpec with type INHERIT.
*/
static PartitioningSpec inherit() {
- return {.type = Type::INHERIT, .hash = std::nullopt};
+ return {.type = Type::INHERIT, .hash = std::nullopt, .order = std::nullopt};
}

/**
@@ -76,9 +154,28 @@ struct PartitioningSpec {
* @return A PartitioningSpec with type HASH.
*/
static PartitioningSpec from_hash(HashScheme h) {
- return {.type = Type::HASH, .hash = std::move(h)};
+ return {.type = Type::HASH, .hash = std::move(h), .order = std::nullopt};
}

/**
* @brief Create a spec for order/range partitioning.
* @param o The order scheme to use. `o.keys` must be non-empty; otherwise
* throws `std::invalid_argument`.
* @return A PartitioningSpec with type ORDER.
*/
static PartitioningSpec from_order(OrderScheme o);

/**
* @brief Deep-copy this spec, cloning any ORDER boundaries via `reservation`.
*
* NONE/INHERIT/HASH arms are trivially cheap; ORDER delegates to
* `OrderScheme::clone(reservation)`.
*
* @param reservation Memory reservation forwarded to `OrderScheme::clone`.
* @return A new independent `PartitioningSpec`.
*/
[[nodiscard]] PartitioningSpec clone(MemoryReservation& reservation) const;

/**
* @brief Equality comparison.
* @return True if both specs are equal.
@@ -106,6 +203,13 @@ struct Partitioning {
/// Distribution within a rank (corresponds to local/single communicator).
PartitioningSpec local;

/**
* @brief Deep-copy, cloning any ORDER boundaries via `reservation`.
* @param reservation Memory reservation forwarded through the clone chain.
* @return A new independent `Partitioning`.
*/
[[nodiscard]] Partitioning clone(MemoryReservation& reservation) const;

/**
* @brief Equality comparison.
* @return True if both partitionings are equal.
@@ -141,13 +245,126 @@ struct ChannelMetadata {
partitioning(std::move(partitioning)),
duplicated(duplicated) {}

/**
* @brief Deep-copy, cloning any ORDER boundaries via `reservation`.
* @param reservation Memory reservation forwarded through the clone chain.
* @return A new independent `ChannelMetadata`.
*/
[[nodiscard]] ChannelMetadata clone(MemoryReservation& reservation) const;

/**
* @brief Equality comparison.
* @return True if both metadata objects are equal.
*/
bool operator==(ChannelMetadata const&) const = default;
};

/**
* @brief Construct an `OrderScheme` in one step (e.g. from Python bindings).
*
* @param keys Sort keys (must be non-empty).
* @param boundaries Optional boundary rows; may be null.
* @param strict_boundary See `OrderScheme::strict_boundary`.
* @return A fully initialized `OrderScheme`.
*/
[[nodiscard]] OrderScheme make_order_scheme(
std::vector<OrderKey> keys,
std::unique_ptr<TableChunk> boundaries,
bool strict_boundary
);

/**
* @brief Set `spec` to `PartitioningSpec::none()`.
* @param spec Target spec.
*/
void partitioning_spec_set_none(PartitioningSpec& spec);

/**
* @brief Set `spec` to `PartitioningSpec::inherit()`.
* @param spec Target spec.
*/
void partitioning_spec_set_inherit(PartitioningSpec& spec);

/**
* @brief Set `spec` to hash partitioning, moving `hash_scheme` into place.
* @param spec Target spec.
* @param hash_scheme Hash scheme to move into `spec`.
*/
void partitioning_spec_set_hash(PartitioningSpec& spec, HashScheme hash_scheme);

/**
* @brief Set `spec` to ORDER partitioning, moving boundaries out of `src`.
* @param spec Target spec.
* @param src Source order scheme (keys copied, boundaries moved).
*/
void partitioning_spec_set_order(PartitioningSpec& spec, OrderScheme& src);

/**
* @brief Non-null pointer to the `OrderScheme` inside an ORDER spec.
* @param spec Spec whose `type` must be `ORDER`.
* @return Address of the stored `OrderScheme`.
*/
[[nodiscard]] OrderScheme* partitioning_spec_order_scheme_ptr(PartitioningSpec& spec);

/**
* @brief Row count of boundary table from `shape()` (works even when not
* device-available).
* @param scheme Scheme whose `boundaries` must be non-null.
* @return Number of boundary rows.
*/
[[nodiscard]] cudf::size_type order_scheme_boundary_row_count(OrderScheme const* scheme);

/**
* @brief Device `table_view` of boundary rows; `boundaries` must be device-resident.
* @param scheme Scheme with non-null, available `boundaries`.
* @return View over boundary columns.
*/
[[nodiscard]] cudf::table_view order_scheme_boundaries_table_view(
OrderScheme const* scheme
);

/**
* @brief CUDA stream for boundary `TableChunk`; `boundaries` must be device-resident.
* @param scheme Scheme with non-null, available `boundaries`.
* @return Raw stream handle for the boundary table.
*/
[[nodiscard]] cudaStream_t order_scheme_boundaries_cuda_stream(OrderScheme const* scheme);

/**
* @brief Shallow-clone `partitioning` into a new `ChannelMetadata`.
*
* Hash specs are copied; ORDER specs copy keys and move boundary `TableChunk`
* ownership out of `partitioning` (source ORDER specs lose boundaries).
*
* @param local_count Local chunk count for the new metadata.
* @param partitioning Partitioning to shallow-clone (ORDER boundaries may be moved out).
* @param duplicated Whether data is duplicated across workers.
* @return Newly allocated `ChannelMetadata` owning the cloned partitioning.
*/
[[nodiscard]] std::unique_ptr<ChannelMetadata> make_channel_metadata(
std::uint64_t local_count, Partitioning& partitioning, bool duplicated
);

/**
* @brief Consume a `Message` and return its `ChannelMetadata` payload.
*
* @param msg Message holding `ChannelMetadata`; consumed / emptied by `release`.
* @return Newly allocated `ChannelMetadata` moved from the message payload.
*/
[[nodiscard]] std::unique_ptr<ChannelMetadata> channel_metadata_from_message(Message msg);

/**
* @brief `ContentDescription` for a `ChannelMetadata` message payload.
*
* For now this is non-spillable with zero tracked sizes: ORDER boundaries are
* expected to stay device-resident on metadata paths. Spill accounting for
* embedded boundaries can be added later without changing the Python API.
*
* @param m Channel metadata to describe.
* @return Content description with spillability off and zero-sized content.
*/
ContentDescription get_content_description(ChannelMetadata const& m);

/**
* @brief Wrap a `ChannelMetadata` into a `Message`.
*