From a24050cb68c1284182b1d6ad53351b31cdadf9cc Mon Sep 17 00:00:00 2001
From: zdevito
Date: Mon, 8 Dec 2025 10:31:30 -0800
Subject: [PATCH] Remove ncclConfig_t bindings

These were never used, but they complicate how we have to generate bindings
to nccl.

Differential Revision: [D88656649](https://our.internmc.facebook.com/intern/diff/D88656649/)

[ghstack-poisoned]
---
 monarch_messages/src/worker.rs    |  9 ---
 monarch_tensor_worker/src/comm.rs | 13 +----
 monarch_tensor_worker/src/lib.rs  | 13 ++---
 nccl-sys/build.rs                 |  1 -
 nccl-sys/src/lib.rs               |  7 ---
 torch-sys-cuda/src/bridge.h       | 10 ----
 torch-sys-cuda/src/bridge.rs      |  5 --
 torch-sys-cuda/src/nccl.rs        | 92 +++----------------------------
 8 files changed, 16 insertions(+), 134 deletions(-)

diff --git a/monarch_messages/src/worker.rs b/monarch_messages/src/worker.rs
index 5958b521fb..3223b25fc1 100644
--- a/monarch_messages/src/worker.rs
+++ b/monarch_messages/src/worker.rs
@@ -38,7 +38,6 @@ use pyo3::types::PyTuple;
 use serde::Deserialize;
 use serde::Serialize;
 use thiserror::Error;
-use torch_sys_cuda::nccl::NcclConfig;
 use torch_sys_cuda::nccl::ReduceOp;
 use torch_sys_cuda::nccl::UniqueId;
 use torch_sys2::BorrowError;
@@ -800,10 +799,6 @@ pub enum WorkerMessage {
         /// will be ordered with respect to other operations scheduled on this
         /// stream.
         stream: StreamRef,
-        /// Configuration for the new communicator. If None, we will not pass a
-        /// config object to nccl, which means that the created communicator
-        /// will inherit its parent's config.
-        config: Option<NcclConfig>,
     },
 
     /// Create a new communicator on each rank in `ranks`, capable of
@@ -816,10 +811,6 @@ pub enum WorkerMessage {
         /// will be ordered with respect to other operations scheduled on this
         /// stream.
         stream: StreamRef,
-        /// Configuration for the new communicator. If None, we will not pass a
-        /// config object to nccl, which means that the created communicator
-        /// will inherit its parent's config.
-        config: Option<NcclConfig>,
     },
 
     SendTensor {
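The removed doc comment's "inherit its parent's config" wording describes `ncclCommSplit` itself: per the NCCL documentation, passing a null `ncclConfig_t*` makes the child communicator reuse the parent's configuration, so dropping the field loses no behavior that was ever exercised. A minimal sketch of that call shape against the `nccl-sys` bindings (the helper is hypothetical, and the exact generated signature and `ncclResult_t` newtype may differ):

```rust
use std::mem::MaybeUninit;

use nccl_sys::ncclComm_t;
use nccl_sys::ncclCommSplit;

/// SAFETY: `parent` must be a valid, initialized communicator, and every
/// rank of `parent` must make this call collectively.
unsafe fn split_inheriting_config(parent: ncclComm_t, color: i32, rank: i32) -> ncclComm_t {
    let mut child = MaybeUninit::uninit();
    // The final argument is `*mut ncclConfig_t`; null means "inherit the
    // parent's configuration", so no Rust-side config type is required.
    let result = ncclCommSplit(parent, color, rank, child.as_mut_ptr(), std::ptr::null_mut());
    assert_eq!(result.0, 0, "ncclCommSplit failed");
    child.assume_init()
}
```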
diff --git a/monarch_tensor_worker/src/comm.rs b/monarch_tensor_worker/src/comm.rs
index 9e285f5777..7247935605 100644
--- a/monarch_tensor_worker/src/comm.rs
+++ b/monarch_tensor_worker/src/comm.rs
@@ -25,7 +25,6 @@ use tokio::task::spawn_blocking;
 use torch_sys_cuda::cuda::Event;
 use torch_sys_cuda::cuda::Stream;
 use torch_sys_cuda::nccl::Communicator;
-use torch_sys_cuda::nccl::NcclConfig;
 use torch_sys_cuda::nccl::NcclError;
 use torch_sys_cuda::nccl::NcclStatus;
 use torch_sys_cuda::nccl::ReduceOp;
@@ -90,14 +89,10 @@ pub enum CommMessage {
     Group(Vec<CommMessage>, Stream, #[reply] OncePortHandle<()>),
 
-    SplitAll(
-        Option<NcclConfig>,
-        #[reply] OncePortHandle<ActorHandle<NcclCommActor>>,
-    ),
+    SplitAll(#[reply] OncePortHandle<ActorHandle<NcclCommActor>>),
 
     SplitFrom(
         Vec<i32>,
-        Option<NcclConfig>,
         #[reply] OncePortHandle<Option<ActorHandle<NcclCommActor>>>,
     ),
 }
 
@@ -224,11 +219,10 @@ impl CommMessageHandler for NcclCommActor {
     async fn split_all(
         &mut self,
         cx: &hyperactor::Context<Self>,
-        nccl_config: Option<NcclConfig>,
     ) -> Result<ActorHandle<NcclCommActor>> {
         let comm = self.comm.clone();
 
-        let split_comm = spawn_blocking(move || comm.lock().split_all(nccl_config))
+        let split_comm = spawn_blocking(move || comm.lock().split_all())
             .await
             .unwrap()?;
 
@@ -241,11 +235,10 @@ impl CommMessageHandler for NcclCommActor {
         &mut self,
         cx: &hyperactor::Context<Self>,
         ranks: Vec<i32>,
-        nccl_config: Option<NcclConfig>,
     ) -> Result<Option<ActorHandle<NcclCommActor>>> {
         let comm = self.comm.clone();
 
-        let split_comm = spawn_blocking(move || comm.lock().split_from(ranks, nccl_config))
+        let split_comm = spawn_blocking(move || comm.lock().split_from(ranks))
             .await
             .unwrap()?;
 
diff --git a/monarch_tensor_worker/src/lib.rs b/monarch_tensor_worker/src/lib.rs
index 303732a3f0..99146664c0 100644
--- a/monarch_tensor_worker/src/lib.rs
+++ b/monarch_tensor_worker/src/lib.rs
@@ -90,7 +90,6 @@ use sorted_vec::SortedVec;
 use stream::StreamActor;
 use stream::StreamMessageClient;
 use stream::StreamParams;
-use torch_sys_cuda::nccl::NcclConfig;
 use torch_sys_cuda::nccl::ReduceOp;
 use torch_sys_cuda::nccl::UniqueId;
 use torch_sys2::CudaDevice;
@@ -340,7 +339,7 @@ impl WorkerMessageHandler for WorkerActor {
         for _ in 0..sorted_streams.len() {
             // Do the split in this event loop, to provide a deterministic
             // order.
-            splits.push(comm.split_all(cx, None).await?);
+            splits.push(comm.split_all(cx).await?);
         }
         let _: Vec<()> = try_join_all(
             sorted_streams
@@ -371,7 +370,7 @@ impl WorkerMessageHandler for WorkerActor {
             .comm
             .as_ref()
             .context("tried to call Reduce before BackendNetworkInit")?;
-        let comm = global_comm.split_all(cx, None).await?;
+        let comm = global_comm.split_all(cx).await?;
         self.send_recv_comms
             .insert((from_stream, to_stream), Arc::new(comm));
         Ok(())
@@ -803,7 +802,6 @@ impl WorkerMessageHandler for WorkerActor {
         dims: Vec<String>,
         device_mesh: Ref,
         stream_ref: StreamRef,
-        config: Option<NcclConfig>,
     ) -> Result<()> {
         let global_comm = self
             .comm
@@ -833,7 +831,6 @@ impl WorkerMessageHandler for WorkerActor {
                             .into_iter()
                             .map(|v| v.clone().try_into())
                             .collect::<Result<Vec<_>, _>>()?,
-                        config,
                     )
                     .await?
                     .context("split comm should include self rank")?;
@@ -842,7 +839,7 @@ impl WorkerMessageHandler for WorkerActor {
             None => {
                 // This rank is not in the group to be split off. We still need to
                 // participate in the commSplit call, however.
-                global_comm.split_from(cx, vec![], config).await?;
+                global_comm.split_from(cx, vec![]).await?;
             }
         }
         Ok(())
@@ -853,7 +850,6 @@ impl WorkerMessageHandler for WorkerActor {
         cx: &hyperactor::Context<Self>,
         remote_process_group_ref: Ref,
         stream_ref: StreamRef,
-        config: Option<NcclConfig>,
     ) -> Result<()> {
         ensure!(
             self.streams.contains_key(&stream_ref),
@@ -888,7 +884,6 @@ impl WorkerMessageHandler for WorkerActor {
                             .into_iter()
                             .map(|v| v.clone().try_into())
                             .collect::<Result<Vec<_>, _>>()?,
-                        config,
                     )
                     .await?
                     .context("split comm should include self rank")?;
@@ -897,7 +892,7 @@ impl WorkerMessageHandler for WorkerActor {
             None => {
                 // This rank is not in the group to be split off. We still need to
                 // participate in the commSplit call, however.
-                global_comm.split_from(cx, vec![], config).await?;
+                global_comm.split_from(cx, vec![]).await?;
             }
         }
         Ok(())
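The handler hunks above keep the existing concurrency shape: the communicator sits behind a shared lock, and the potentially blocking `ncclCommSplit` work is pushed onto tokio's blocking pool; only the config argument disappears. A self-contained sketch of that pattern, with a stand-in `Communicator` and `std::sync::Mutex` rather than monarch's real types:

```rust
use std::sync::Arc;
use std::sync::Mutex;

use tokio::task::spawn_blocking;

struct Communicator; // stand-in for torch_sys_cuda::nccl::Communicator

impl Communicator {
    // After this patch, split_all takes no config argument.
    fn split_all(&mut self) -> Result<Communicator, String> {
        Ok(Communicator) // placeholder for the actual ncclCommSplit call
    }
}

async fn split(comm: Arc<Mutex<Communicator>>) -> Result<Communicator, String> {
    // Move the Arc into the closure so it is 'static, then run the blocking
    // NCCL call off the async executor, exactly as the handlers above do.
    spawn_blocking(move || comm.lock().unwrap().split_all())
        .await
        .expect("blocking split task panicked")
}

#[tokio::main]
async fn main() {
    let comm = Arc::new(Mutex::new(Communicator));
    let _child = split(comm).await.unwrap();
}
```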
diff --git a/nccl-sys/build.rs b/nccl-sys/build.rs
index 1bb333fed9..3e9d1fe511 100644
--- a/nccl-sys/build.rs
+++ b/nccl-sys/build.rs
@@ -72,7 +72,6 @@ fn main() {
         .allowlist_type("ncclDataType_t")
         .allowlist_type("ncclRedOp_t")
         .allowlist_type("ncclScalarResidence_t")
-        .allowlist_type("ncclConfig_t")
         .allowlist_type("ncclSimInfo_t")
         .allowlist_var("NCCL_SPLIT_NOCOLOR")
         .allowlist_var("NCCL_MAJOR")
diff --git a/nccl-sys/src/lib.rs b/nccl-sys/src/lib.rs
index 76dacd19f5..2879054f87 100644
--- a/nccl-sys/src/lib.rs
+++ b/nccl-sys/src/lib.rs
@@ -15,13 +15,6 @@ unsafe impl ExternType for CUstream_st {
     type Kind = cxx::kind::Opaque;
 }
 
-/// SAFETY: bindings
-/// Trivial because this is POD struct
-unsafe impl ExternType for ncclConfig_t {
-    type Id = type_id!("ncclConfig_t");
-    type Kind = cxx::kind::Trivial;
-}
-
 /// SAFETY: bindings
 unsafe impl ExternType for ncclComm {
     type Id = type_id!("ncclComm");
diff --git a/torch-sys-cuda/src/bridge.h b/torch-sys-cuda/src/bridge.h
index 7452f48164..0e20d34e57 100644
--- a/torch-sys-cuda/src/bridge.h
+++ b/torch-sys-cuda/src/bridge.h
@@ -8,16 +8,6 @@
 
 #pragma once
 
-#include <nccl.h> // @manual
-
 namespace monarch {
 
-/// This function exists because ncclConfig initialization requires the use of
-/// a macro. We cannot reference the macro directly from Rust code, so we wrap
-/// the macro use in a function and bind that to Rust instead.
-inline ncclConfig_t make_nccl_config() {
-  ncclConfig_t ret = NCCL_CONFIG_INITIALIZER;
-  return ret;
-}
-
 } // namespace monarch
diff --git a/torch-sys-cuda/src/bridge.rs b/torch-sys-cuda/src/bridge.rs
index f6a1024280..41fbcb54d0 100644
--- a/torch-sys-cuda/src/bridge.rs
+++ b/torch-sys-cuda/src/bridge.rs
@@ -11,10 +11,5 @@ pub(crate) mod ffi {
     unsafe extern "C++" {
         include!("monarch/torch-sys-cuda/src/bridge.h");
-
-        // nccl helpers
-        #[namespace = ""]
-        type ncclConfig_t = nccl_sys::ncclConfig_t;
-        fn make_nccl_config() -> ncclConfig_t;
     }
 }
 
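Two pieces of scaffolding go away together here: the C++ `make_nccl_config` shim existed only because `NCCL_CONFIG_INITIALIZER` is a macro that cxx cannot invoke from Rust, and the `ExternType` impl in nccl-sys existed only so the resulting `ncclConfig_t` could cross the bridge by value. For reference, this is the general cxx pattern being deleted, shown for a hypothetical C++ POD `FooConfig` rather than the real type:

```rust
use cxx::ExternType;
use cxx::type_id;

// Assumed C++ side: `struct FooConfig { int blocking; };` with an
// identical, trivially-copyable layout.
#[repr(C)]
pub struct FooConfig {
    pub blocking: i32,
}

/// SAFETY: `FooConfig` is plain old data with the same definition on both
/// sides of the bridge, so cxx may pass it by value (`kind::Trivial`).
unsafe impl ExternType for FooConfig {
    type Id = type_id!("FooConfig");
    type Kind = cxx::kind::Trivial;
}
```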
diff --git a/torch-sys-cuda/src/nccl.rs b/torch-sys-cuda/src/nccl.rs
index 137967759c..5bc3882329 100644
--- a/torch-sys-cuda/src/nccl.rs
+++ b/torch-sys-cuda/src/nccl.rs
@@ -6,7 +6,6 @@
  * LICENSE file in the root directory of this source tree.
  */
 
-use std::ffi::CString;
 use std::fmt;
 use std::fmt::Write;
 use std::hash::Hasher;
@@ -26,7 +25,6 @@ use torch_sys2::TensorCell;
 use torch_sys2::factory_float_tensor;
 use torch_sys2::is_float8_type;
 
-use crate::bridge::ffi::make_nccl_config;
 use crate::cuda::CudaError;
 use crate::cuda::Stream;
 use crate::cuda::set_device;
@@ -100,60 +98,6 @@ pub enum NcclStatus {
     InProgress,
 }
 
-/// Rust version of ncclConfig_t. See nccl documentation for what each field
-/// means:
-/// https://docs.nvidia.com/deeplearning/nccl/user-guide/docs/api/types.html#ncclconfig-t
-///
-/// Note that we don't validate field values; we rely on nccl to do that.
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct NcclConfig {
-    pub blocking: bool,
-    pub cga_cluster_size: u8,
-    pub min_ctas: u8,
-    pub max_ctas: u8,
-    pub net_name: Option<String>,
-    pub split_share: bool,
-}
-
-impl Default for NcclConfig {
-    fn default() -> Self {
-        NcclConfig {
-            blocking: true,
-            cga_cluster_size: 4,
-            min_ctas: 1,
-            max_ctas: 32,
-            net_name: None,
-            split_share: false,
-        }
-    }
-}
-
-impl From<NcclConfig> for ncclConfig_t {
-    fn from(config: NcclConfig) -> Self {
-        let mut ret = make_nccl_config();
-        ret.blocking = config.blocking.into();
-        ret.cgaClusterSize = config.cga_cluster_size.into();
-        ret.minCTAs = config.min_ctas.into();
-        ret.maxCTAs = config.max_ctas.into();
-        if let Some(net_name) = config.net_name {
-            let c_string = CString::new(net_name)
-                .expect("failed to create CString")
-                .into_boxed_c_str();
-
-            // Just leak the string to avoid complicated ownership issues. I'm
-            // not aware of anywhere where we actually want to specify the
-            // network module name in configuration instead of letting nccl just
-            // choose it for us. If this happens + we are creating tons of
-            // config objects, we can revisit this.
-            let ptr = Box::leak(c_string).as_ptr();
-            ret.netName = ptr;
-        }
-        ret.splitShare = config.split_share.into();
-
-        ret
-    }
-}
-
 fn nccl_check(result: ncclResult_t) -> Result<NcclStatus, NcclError> {
     match result.0 {
         0 => Ok(NcclStatus::Success),
@@ -383,9 +327,9 @@ impl Communicator {
 
     /// Split off a new communicator from this one, preserving the same world
     /// size.
-    pub fn split_all(&mut self, config: Option<NcclConfig>) -> Result<Communicator, NcclError> {
+    pub fn split_all(&mut self) -> Result<Communicator, NcclError> {
         let ranks = (0..self.global_world_size).collect();
-        Ok(self.split_from(ranks, config)?.unwrap())
+        Ok(self.split_from(ranks)?.unwrap())
     }
 
     /// Split off a new communicator from this one. Only `ranks` will be present
@@ -397,7 +341,6 @@ impl Communicator {
     pub fn split_from(
         &mut self,
         mut ranks: Vec<i32>,
-        config: Option<NcclConfig>,
     ) -> Result<Option<Communicator>, NcclError> {
         ranks.sort();
         for rank in &ranks {
@@ -411,34 +354,17 @@ impl Communicator {
             Err(_) => NCCL_SPLIT_NOCOLOR,
         };
 
-        let config = config.map(ncclConfig_t::from);
         let mut new = MaybeUninit::uninit();
         // SAFETY: intended use of C function
         let new = unsafe {
-            // This rather awkward duplication is intentional; we are passing in
-            // `config` as a pointer, which is only guaranteed to be valid for
-            // the duration of `Some(mut config)` match arm.
-            match config {
-                Some(mut config) => {
-                    nccl_check(ncclCommSplit(
-                        self.inner,
-                        color,
-                        self.rank,
-                        new.as_mut_ptr(),
-                        &mut config,
-                    ))?;
-                }
-                None => {
-                    nccl_check(ncclCommSplit(
-                        self.inner,
-                        color,
-                        self.rank,
-                        new.as_mut_ptr(),
-                        std::ptr::null_mut(),
-                    ))?;
-                }
-            }
+            nccl_check(ncclCommSplit(
+                self.inner,
+                color,
+                self.rank,
+                new.as_mut_ptr(),
+                std::ptr::null_mut(),
+            ))?;
             new.assume_init()
         };
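The deleted "awkward duplication" comment is about pointer validity: the owned `ncclConfig_t` only exists inside the `Some(mut config)` match arm, so the pointer handed to NCCL must be created and consumed within that arm, and the `None` case needs its own call with a null pointer. With configs gone, only the null-pointer call survives, which is the simplification above. A stand-alone sketch of the same borrow structure, with a stub in place of `ncclCommSplit`:

```rust
// Stub standing in for the FFI call; it only inspects the pointer.
fn take_config(ptr: *mut i32) {
    if ptr.is_null() {
        println!("null config: inherit the parent's settings");
    } else {
        // SAFETY: callers only pass pointers that are valid when non-null.
        println!("explicit config: {}", unsafe { *ptr });
    }
}

fn split(config: Option<i32>) {
    match config {
        // `config` lives only in this arm, so a pointer to it must be
        // formed and used here; it cannot outlive the arm.
        Some(mut config) => take_config(&mut config),
        None => take_config(std::ptr::null_mut()),
    }
}

fn main() {
    split(Some(4));
    split(None);
}
```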