2 changes: 1 addition & 1 deletion cpp/include/rapidsmpf/bootstrap/backend.hpp
@@ -21,7 +21,7 @@ enum class BackendType {
* @brief Automatically detect the best backend based on environment.
*
* Detection order:
* 1. File-based (if RRUN_COORD_DIR set by rrun)
* 1. File-based (if RRUN_COORD_DIR or RRUN_ROOT_ADDRESS set by rrun)
* 2. Slurm/PMIx (if SLURM environment detected)
* 3. File-based (default fallback)
*/
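A minimal standalone sketch of the documented detection order (hypothetical: the real `detect_backend` lives in `cpp/src/bootstrap/bootstrap.cpp`, and the `SLURM_JOB_ID` check here is an assumption about how Slurm detection might be keyed):

```cpp
#include <cassert>
#include <cstdlib>
#include <optional>
#include <string>

enum class BackendType { FILE, SLURM };

std::optional<std::string> getenv_optional(char const* name) {
    char const* value = std::getenv(name);
    return value ? std::optional<std::string>{value} : std::nullopt;
}

BackendType detect_backend() {
    // 1. File-based if rrun is coordinating (explicit configuration wins).
    if (getenv_optional("RRUN_COORD_DIR") || getenv_optional("RRUN_ROOT_ADDRESS")) {
        return BackendType::FILE;
    }
    // 2. Slurm/PMIx if a Slurm job environment is detected (assumed variable).
    if (getenv_optional("SLURM_JOB_ID")) {
        return BackendType::SLURM;
    }
    // 3. File-based default fallback.
    return BackendType::FILE;
}
```

Note that step 1 and step 3 return the same backend; the difference is that in step 1 rrun has explicitly configured the coordination, while step 3 is a last-resort default.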
12 changes: 11 additions & 1 deletion cpp/include/rapidsmpf/bootstrap/slurm_backend.hpp
@@ -31,7 +31,7 @@ namespace rapidsmpf::bootstrap::detail {
*
* Usage:
* ```bash
* # Multiple (4) tasks per node, one task per GPU, two nodes.
* # Passthrough: multiple (4) tasks per node, one task per GPU, two nodes.
* srun \
* --mpi=pmix \
* --nodes=2 \
@@ -40,6 +40,16 @@ namespace rapidsmpf::bootstrap::detail {
* --gpus-per-task=1 \
* --gres=gpu:4 \
* rrun ./benchmarks/bench_shuffle -C ucxx
*
* # Hybrid mode: one task per node, 4 GPUs per task, two nodes.
* srun \
* --mpi=pmix \
* --nodes=2 \
* --ntasks-per-node=1 \
* --cpus-per-task=144 \
* --gpus-per-task=4 \
* --gres=gpu:4 \
* rrun -n 4 ./benchmarks/bench_shuffle -C ucxx
* ```
Comment on lines +44 to 53
Contributor:

Can you add some documentation on why I might want to use hybrid launch mode? IIUC, it is because then I only need to ensure I launch the correct number of processes per node, and need not worry about binding via Slurm (the rrun launch takes care of that).

But one thing I don't understand: why can't we just launch in passthrough mode and then apply bindings to all the processes via rrun's topology detection?

What, concretely, is hybrid mode buying us?

Member Author:

In passthrough mode we can do that, although in a somewhat limited manner. Take CPUs as a resource, for example; you have two options:

  1. Set up the job itself to partition the CPUs: I believe this works as expected if the cluster is properly configured, but it puts the burden on the user to define the resources appropriately, which requires knowing a priori how many CPUs are available and dividing them evenly across tasks; or
  2. Just pass through all CPUs: in this case rrun can determine the CPUs to bind to; however, the GPU index is not known (each task always sees its GPU as index 0), so we cannot partition the CPUs per GPU, only bind to the whole CPU socket/NUMA node. Note that CPU partitioning is not currently implemented in rrun, but I intend to add it. We could technically do the partitioning based on the Slurm local ID, but that would also require a different specialization.
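To make option 2 concrete, the per-GPU CPU partitioning I have in mind could look roughly like this (hypothetical sketch; `cpus_for_gpu` is not part of rrun, and an even split of CPUs across GPUs is assumed):

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Assign a contiguous, even slice of the node's CPUs to each local GPU rank.
// Hypothetical helper: rrun does not implement this yet.
std::vector<int> cpus_for_gpu(int local_rank, int gpus_per_node, int ncpus) {
    int const per_gpu = ncpus / gpus_per_node;  // even division assumed
    std::vector<int> cpus;
    cpus.reserve(static_cast<std::size_t>(per_gpu));
    for (int c = local_rank * per_gpu; c < (local_rank + 1) * per_gpu; ++c) {
        cpus.push_back(c);
    }
    return cpus;
}
```

With 144 CPUs and 4 GPUs per node, local rank 1 would bind to CPUs 36–71. In passthrough mode this is exactly the computation we cannot do, because every task sees its GPU as index 0 and the true local rank-to-GPU mapping is hidden.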

However, there are a few more reasons why I would like to have hybrid mode:

  1. If we have another process coordinating work submission (analogous to a Dask/Distributed client, as we previously discussed), launching a single task per node greatly simplifies coordination, since it can all be prepared via the hybrid mode implementation.
  2. I'm leaning towards using PMIx in other future specializations as a sort of distributed KV store. It seems like a good fit for extending rrun to other distributed systems (e.g., when using SSH). I know this will require more scaffolding in rrun (like setting up a PMIx server), but PMIx is probably a better alternative than writing our own KV store for launching purposes, and may also let us avoid file-based synchronization. This existing Slurm implementation therefore also serves as a test ground for future PMIx use.

Contributor:

however, the GPU index is not known (since the GPU always appears as index 0 to each task) and thus we cannot partition the CPUs per GPU but only bind to the whole CPU socket/NUMA node.

Can we not map the CUDA runtime device ID to its NVML counterpart and thereby determine the physical CPUs to bind to?

Member Author:

I think not. This is an intentional isolation layer provided by Slurm (or rather enroot/pyxis): only the resources actually available to the allocation are visible at runtime, and that includes NVML. NVML reports a single device being available, so you cannot determine the device's true index or "order".

*/
class SlurmBackend : public Backend {
5 changes: 3 additions & 2 deletions cpp/src/bootstrap/bootstrap.cpp
@@ -28,8 +28,9 @@ namespace {
*/
BackendType detect_backend() {
// Check for rrun coordination first (explicit configuration takes priority).
// If RRUN_COORD_DIR is set, rrun is coordinating and we should use FILE backend.
if (getenv_optional("RRUN_COORD_DIR")) {
// If RRUN_COORD_DIR or RRUN_ROOT_ADDRESS is set, rrun is coordinating and we
// should use FILE backend (with or without pre-coordinated address).
if (getenv_optional("RRUN_COORD_DIR") || getenv_optional("RRUN_ROOT_ADDRESS")) {
return BackendType::FILE;
}

107 changes: 103 additions & 4 deletions cpp/src/bootstrap/ucxx.cpp
@@ -9,10 +9,15 @@
#ifdef RAPIDSMPF_HAVE_UCXX

#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <fstream>
#include <memory>
#include <sstream>
#include <string>

#include <cuda_device_runtime_api.h>
#include <unistd.h> // for unsetenv

#include <rapidsmpf/bootstrap/bootstrap.hpp>
#include <rapidsmpf/bootstrap/ucxx.hpp>
@@ -23,6 +28,36 @@

namespace rapidsmpf::bootstrap {

namespace {
// Hex encoding for binary-safe address transmission
std::string hex_encode(std::string_view input) {
static constexpr const char* hex_chars = "0123456789abcdef";
std::string result;
result.reserve(input.size() * 2);
for (char ch : input) {
auto c = static_cast<unsigned char>(ch);
result.push_back(hex_chars[c >> 4]);
result.push_back(hex_chars[c & 0x0F]);
}
return result;
}

std::string hex_decode(std::string_view const& input) {
std::string result;
result.reserve(input.size() / 2);
for (size_t i = 0; i < input.size(); i += 2) {
auto high = static_cast<unsigned char>(
(input[i] >= 'a') ? (input[i] - 'a' + 10) : (input[i] - '0')
);
auto low = static_cast<unsigned char>(
(input[i + 1] >= 'a') ? (input[i + 1] - 'a' + 10) : (input[i + 1] - '0')
);
result.push_back(static_cast<char>((high << 4) | low));
}
return result;
}
} // namespace
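For illustration, the helpers above round-trip arbitrary bytes such as a UCX worker address blob. A standalone copy (not part of the diff; the lowercase-only nibble decoding matches what `hex_encode` produces):

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <string_view>

std::string hex_encode(std::string_view input) {
    static constexpr char const* hex_chars = "0123456789abcdef";
    std::string result;
    result.reserve(input.size() * 2);
    for (char ch : input) {
        auto c = static_cast<unsigned char>(ch);
        result.push_back(hex_chars[c >> 4]);
        result.push_back(hex_chars[c & 0x0F]);
    }
    return result;
}

std::string hex_decode(std::string_view input) {
    auto nibble = [](char c) {
        // Lowercase hex only, matching hex_encode's output.
        return static_cast<unsigned char>(c >= 'a' ? c - 'a' + 10 : c - '0');
    };
    std::string result;
    result.reserve(input.size() / 2);
    for (std::size_t i = 0; i + 1 < input.size(); i += 2) {
        result.push_back(
            static_cast<char>((nibble(input[i]) << 4) | nibble(input[i + 1]))
        );
    }
    return result;
}
```

The encoding is binary-safe because every byte, including NUL and non-ASCII values, maps to two printable characters, which is what makes the address safe to pass through an environment variable.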

std::shared_ptr<ucxx::UCXX> create_ucxx_comm(
std::shared_ptr<ProgressThread> progress_thread,
BackendType type,
@@ -35,8 +70,69 @@ std::shared_ptr<ucxx::UCXX> create_ucxx_comm(

std::shared_ptr<ucxx::UCXX> comm;

// Root rank: Create listener and publish address via put() for non-root ranks.
if (ctx.rank == 0) {
auto precomputed_address_encoded = getenv_optional("RRUN_ROOT_ADDRESS");
auto address_file = getenv_optional("RRUN_ROOT_ADDRESS_FILE");

// Path 1: Early address mode for root rank in Slurm hybrid mode.
// Rank 0 is launched first to create its address and write it to a file.
// Parent will coordinate with other parents via PMIx, then launch worker ranks
// with RRUN_ROOT_ADDRESS set. No PMIx put/barrier/get bootstrap coordination.
if (ctx.rank == 0 && address_file.has_value()) {
auto ucxx_initialized_rank =
ucxx::init(nullptr, ctx.nranks, std::nullopt, options);
comm = std::make_shared<ucxx::UCXX>(
std::move(ucxx_initialized_rank), options, progress_thread
);

auto listener_address = comm->listener_address();
auto root_worker_address_str =
std::get<std::shared_ptr<::ucxx::Address>>(listener_address.address)
->getStringView();

std::string encoded_address = hex_encode(root_worker_address_str);
// Write to a temp file then rename so the reader never sees partial content.
std::string const temp_path = *address_file + ".tmp";
std::ofstream addr_file(temp_path);
if (!addr_file) {
throw std::runtime_error(
"Failed to write root address to file: " + temp_path
);
}
addr_file << encoded_address << std::endl;
addr_file.close();
if (std::rename(temp_path.c_str(), address_file->c_str()) != 0) {
std::remove(temp_path.c_str());
throw std::runtime_error(
"Failed to rename root address file to: " + *address_file
);
}

auto verbose = getenv_optional("RAPIDSMPF_VERBOSE");
if (verbose && *verbose == "1") {
std::cerr << "[rank 0] Wrote address to " << *address_file
<< ", skipping bootstrap coordination" << std::endl;
}

// Unset now that bootstrap is complete; the variable is no longer used.
unsetenv("RRUN_ROOT_ADDRESS_FILE");
}
// Path 2: Slurm hybrid mode for non-root ranks.
// Parent process already coordinated the root address via PMIx and provided it
// via RRUN_ROOT_ADDRESS environment variable (hex-encoded).
else if (precomputed_address_encoded.has_value() && ctx.rank != 0)
{
std::string precomputed_address = hex_decode(*precomputed_address_encoded);
auto root_worker_address = ::ucxx::createAddressFromString(precomputed_address);
auto ucxx_initialized_rank =
ucxx::init(nullptr, ctx.nranks, root_worker_address, options);
comm = std::make_shared<ucxx::UCXX>(
std::move(ucxx_initialized_rank), options, progress_thread
);
}
// Path 3: Normal bootstrap mode for root rank.
// Create listener and publish address via put() for non-root ranks to retrieve.
else if (ctx.rank == 0)
{
auto ucxx_initialized_rank =
ucxx::init(nullptr, ctx.nranks, std::nullopt, options);
comm = std::make_shared<ucxx::UCXX>(
@@ -49,8 +145,11 @@ std::shared_ptr<ucxx::UCXX> create_ucxx_comm(
std::get<std::shared_ptr<::ucxx::Address>>(listener_address.address)
->getStringView());
sync(ctx);
} else {
// Non-root ranks: Retrieve root address via get() and connect.
}
// Path 4: Normal bootstrap mode for non-root ranks.
// Retrieve root address via get() and connect.
else
{
sync(ctx);

auto root_worker_address_str =
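The root-address publication in Path 1 hinges on a write-temp-then-rename pattern so the parent never observes a partially written file. Isolated, the pattern looks like this (sketch; `publish_atomically` is an illustrative name, and atomicity assumes the temp file and destination are on the same filesystem):

```cpp
#include <cassert>
#include <cstdio>
#include <fstream>
#include <stdexcept>
#include <string>

// Write content to path + ".tmp", then rename over path. On POSIX, rename(2)
// within one filesystem is atomic, so a concurrent reader sees either the old
// file state or the complete new content, never a partial write.
void publish_atomically(std::string const& path, std::string const& content) {
    std::string const temp_path = path + ".tmp";
    std::ofstream out(temp_path);
    if (!out) {
        throw std::runtime_error("Failed to write to file: " + temp_path);
    }
    out << content << "\n";
    out.close();
    if (std::rename(temp_path.c_str(), path.c_str()) != 0) {
        std::remove(temp_path.c_str());  // clean up the temp file on failure
        throw std::runtime_error("Failed to rename file to: " + path);
    }
}
```

The companion reader (here, the rrun parent waiting for `RRUN_ROOT_ADDRESS_FILE` to appear) can then simply poll for the final path's existence, with no length or checksum protocol needed.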