Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -347,9 +347,10 @@ list(APPEND NVFUSER_SRCS
${NVFUSER_SRCS_DIR}/preseg_passes/remove_empty.cpp
${NVFUSER_SRCS_DIR}/preseg_passes/reorder_sharded_axis.cpp
${NVFUSER_SRCS_DIR}/preseg_passes/segment_inplace_update.cpp
${NVFUSER_SRCS_DIR}/host_ir/allocate_and_deallocate.cpp
${NVFUSER_SRCS_DIR}/host_ir/assign_streams.cpp
${NVFUSER_SRCS_DIR}/host_ir/pass/convert_op_to_communication.cpp
${NVFUSER_SRCS_DIR}/host_ir/pass/stream_parallel_type.cpp
${NVFUSER_SRCS_DIR}/host_ir/allocate_and_deallocate.cpp
${NVFUSER_SRCS_DIR}/preseg_passes/translate_no_reduction_matmul_to_mul_squeeze.cpp
${NVFUSER_SRCS_DIR}/preseg_passes/translate_repeat_to_expand.cpp
${NVFUSER_SRCS_DIR}/preseg_passes/translate_scatter_accumulate.cpp
Expand Down
1 change: 0 additions & 1 deletion csrc/host_ir/allocate_and_deallocate.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
// clang-format on
#pragma once

#include "host_ir/container.h"
#include "optimization_pass.h"

namespace nvfuser::hir {
Expand Down
63 changes: 63 additions & 0 deletions csrc/host_ir/assign_streams.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// clang-format off
/*
* SPDX-FileCopyrightText: Copyright (c) 2025-present NVIDIA CORPORATION & AFFILIATES.
* All rights reserved.
* SPDX-License-Identifier: BSD-3-Clause
*/
// clang-format on

#include "host_ir/assign_streams.h"

#include "host_ir/container.h"
#include "ir/builder.h"

namespace nvfuser::hir {

// Wraps every top-level hir::ForLoop with stream bookkeeping so that its
// iterations can run on per-iteration "worker" streams:
//   1. Before the loop: capture the current ("main") stream.
//   2. At the start of each iteration: switch to a worker stream keyed by the
//      loop index and synchronize it with the main stream.
//   3. After the loop: emit a joining loop that synchronizes every worker
//      stream before execution continues.
void AssignStreams::runPass(Fusion* fusion) {
  // This pass only operates on host IR; anything else is a caller error.
  auto* hic = dynamic_cast<HostIrContainer*>(fusion);
  NVF_CHECK(hic != nullptr);
  FusionGuard fg(hic);

  for (auto it = hic->topLevel().exprs().begin();
       it != hic->topLevel().exprs().end();) {
    // Saved up front: new expressions are inserted around `it`, and iteration
    // resumes after the current loop (and its newly inserted join loop).
    auto next_it = std::next(it);

    auto* for_loop = dynamic_cast<hir::ForLoop*>(*it);
    if (for_loop == nullptr) {
      it = next_it;
      continue;
    }

    // We should check that the loop is stream-parallel. This is not necessary
    // at this moment because all loops are stream-parallel. This is also hard
    // to do because hir::ForLoop doesn't point to the source IterDomain.

    // Before the loop: capture the stream that is current at this point.
    auto* get_current_stream = IrBuilder::create<GetCurrentStream>();
    Stream* main_stream = get_current_stream->stream();
    hic->topLevel().insert(it, get_current_stream);

    // At the beginning of each iteration: set stream and synchronize with main
    // stream. Both inserts target the loop's original first expression, so
    // the intended order is SetCurrentStream, then Synchronize (assumes
    // Scope::insert keeps `old_begin` valid — TODO confirm).
    auto* worker_stream = IrBuilder::create<Stream>(for_loop->index());
    auto* set_stream = IrBuilder::create<SetCurrentStream>(worker_stream);
    auto* sync_main = IrBuilder::create<Synchronize>(main_stream);
    auto old_begin = for_loop->body().exprs().begin();
    for_loop->body().insert(old_begin, set_stream);
    for_loop->body().insert(old_begin, sync_main);

    // After the loop: create a joining loop to synchronize all worker streams.
    // It reuses the original loop's index and bounds so it visits the same
    // iteration space.
    auto* join_loop = IrBuilder::create<hir::ForLoop>(
        for_loop->index(), for_loop->start(), for_loop->stop());

    // In the joining loop: synchronize each worker stream. Stream is keyed by
    // the index value, so this names the same stream as each iteration above.
    auto* join_worker_stream = IrBuilder::create<Stream>(join_loop->index());
    auto* sync_worker = IrBuilder::create<Synchronize>(join_worker_stream);
    join_loop->body().push_back(sync_worker);

    // Insert join_loop after the current for_loop.
    hic->topLevel().insert(next_it, join_loop);
    it = next_it;
  }
}

} // namespace nvfuser::hir
26 changes: 26 additions & 0 deletions csrc/host_ir/assign_streams.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// clang-format off
/*
* SPDX-FileCopyrightText: Copyright (c) 2025-present NVIDIA CORPORATION & AFFILIATES.
* All rights reserved.
* SPDX-License-Identifier: BSD-3-Clause
*/
// clang-format on
#pragma once

#include "optimization_pass.h"

namespace nvfuser::hir {

// A host IR pass that assigns streams to stream-parallel loops. For each
// top-level hir::ForLoop it switches every iteration onto a per-iteration
// worker stream and appends a joining loop that synchronizes those streams
// after the original loop.
class AssignStreams : public OptimizationPass<AssignStreams> {
  // The CRTP base invokes the protected runPass()/name() below.
  friend class OptimizationPass<AssignStreams>;

 protected:
  // Mutates `fusion` in place; `fusion` must be a HostIrContainer.
  static void runPass(Fusion* fusion);

  // Identifier for this pass within the OptimizationPass framework.
  static constexpr std::string_view name() {
    return "AssignStreams";
  }
};

} // namespace nvfuser::hir
21 changes: 7 additions & 14 deletions csrc/host_ir/evaluator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,12 @@ HostIrEvaluator::HostIrEvaluator(
communicator_(communicator),
params_(params),
expr_evaluator_(),
my_local_device_index_(communicator_ ? communicator_->local_rank() : 0),
my_local_device_index_(
communicator_ == nullptr ? 0 : communicator_->local_rank()),
ipc_handle_cache_(expr_evaluator_),
multicast_handle_cache_() {
const DeviceIdxType device_index =
(communicator_ != nullptr && communicator_->is_available())
? communicator_->deviceId()
: 0;
communicator_ == nullptr ? 0 : communicator_->deviceId();
if (isDebugDumpEnabled(DebugDumpOption::HostIr) && device_index == 0) {
container_->print(debug());
}
Expand Down Expand Up @@ -147,16 +146,10 @@ c10::cuda::CUDAStream HostIrEvaluator::getCUDAStream(Stream* stream) {
NVF_ERROR(value.hasValue() && value.is<int64_t>());
stream_key = value.as<int64_t>();
}
if (streams_.find(stream_key) == streams_.end()) {
auto i = (communicator_ != nullptr && communicator_->is_available())
? communicator_->deviceId()
: 0;
streams_.insert(
{stream_key,
c10::cuda::getStreamFromPool(
/*isHighPriority=*/false, static_cast<c10::DeviceIndex>(i))});
}
return streams_.at(stream_key);

auto [it, inserted] =
streams_.try_emplace(stream_key, c10::cuda::getStreamFromPool());
return it->second;
}

void HostIrEvaluator::handle(SetCurrentStream* set_current_stream) {
Expand Down
2 changes: 1 addition & 1 deletion csrc/host_ir/ir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ std::string Stream::toString(int indent_size) const {
std::stringstream ss;
indent(ss, indent_size) << "Stream ";
if (index() == nullptr) {
ss << name();
ss << static_cast<const void*>(this);
} else {
ss << index()->toInlineString();
}
Expand Down
1 change: 0 additions & 1 deletion csrc/host_ir/ir.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
#include "ir/base_nodes.h"
#include "ir/builder.h"
#include "multidevice/communication.h"
#include "scheduler/heuristic.h"

namespace nvfuser {
// This works around a circular dependency: compiled_kernel.h ==>
Expand Down
2 changes: 2 additions & 0 deletions csrc/host_ir/passes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
#include "host_ir/passes.h"

#include "host_ir/allocate_and_deallocate.h"
#include "host_ir/assign_streams.h"

namespace nvfuser::hir {

// Runs the host IR optimization passes on `hic`, in order: allocation /
// deallocation insertion first, then stream assignment.
void runPasses(HostIrContainer& hic) {
  OptimizationPass<hir::AllocateAndDeallocate>::runPass(&hic);
  OptimizationPass<hir::AssignStreams>::runPass(&hic);
}

} // namespace nvfuser::hir
2 changes: 1 addition & 1 deletion csrc/ir/internal_nodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
namespace nvfuser {

class ViewTransform;
class Scope;
class IrCloner;
struct AnalyzeViewResult;

Expand Down Expand Up @@ -2506,6 +2505,7 @@ class Scope {
return std::ssize(exprs_);
}

// Returns an iterator pointing to the inserted expression.
Iterator insert(Iterator pos, Expr* expr);

Iterator push_back(Expr* e) {
Expand Down
1 change: 0 additions & 1 deletion csrc/multidevice/communicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include "multidevice/c10d_mock.h"
#endif

#include "exceptions.h"
#include "multidevice/multidevice.h"
#include "visibility.h"

Expand Down
4 changes: 1 addition & 3 deletions tests/cpp/test_multidevice_lower_communication_cuda.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -330,9 +330,7 @@ INSTANTIATE_TEST_SUITE_P(
testing::Values(
2 * 1024 * 1024LL, // 2 MB
8 * 1024 * 1024LL, // 8 MB
32 * 1024 * 1024LL, // 32 MB
128 * 1024 * 1024LL, // 128 MB
256 * 1024 * 1024LL // 256 MB
32 * 1024 * 1024LL // 32 MB
),
testing::Values(
CommunicationProtocol::kMemcpy,
Expand Down
10 changes: 4 additions & 6 deletions tests/cpp/test_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,10 @@

namespace nvfuser {

class StreamTest : public NVFuserTest {
public:
StreamTest() {
EnableOptionsGuard::getCurOptions().set(EnableOption::HostIrLowering);
}
};
// The tests in this file verify building blocks for stream parallelism, e.g.,
// sharding propagation and KernelExecutor. End-to-end tests have been moved to
// tests/python/direct/test_stream.py because the Python API is sufficient.
using StreamTest = NVFuserTest;

TEST_F(StreamTest, AddPerStream) {
constexpr int64_t c = 3;
Expand Down
6 changes: 3 additions & 3 deletions tests/python/direct/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from nvfuser_direct import FusionDefinition, ParallelType, DataType


def test_matmul(nvfuser_direct_test):
def test_matmul():
c = 3

with FusionDefinition() as fd:
Expand Down Expand Up @@ -46,7 +46,7 @@ def test_matmul(nvfuser_direct_test):
assert event.input_shapes == [[5, 7], [7, 2], [5, 2]]


def test_two_matmuls_inlinable(nvfuser_direct_test):
def test_two_matmuls_inlinable():
c = 3

with FusionDefinition() as fd:
Expand Down Expand Up @@ -97,7 +97,7 @@ def test_two_matmuls_inlinable(nvfuser_direct_test):
assert event.input_shapes[0][0] == 2


def test_two_matmuls_not_inlinable(nvfuser_direct_test):
def test_two_matmuls_not_inlinable():
c = 3

with FusionDefinition() as fd:
Expand Down
9 changes: 4 additions & 5 deletions tests/python/multidevice/benchmark_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,11 @@ def wrapper(*args, **kwargs):

# Returns two functors, the first with profiler off and the second with profiler
# on. The first functor is usually used for warmup and the second for actual
# benchmarking. This way, one
# can collect stats of the first few non-warmup benchmark iterations using
# benchmarking. This way, one can collect stats of the first few non-warmup
# benchmark iterations using
# ```bash
# mpirun -np 1 nsys profile --capture-range=cudaProfilerApi --capture-range-end=repeat:<iterations> pytest tests/python/multidevice/<test_file>.py -k <filter> --only-mpi : -np <processes - 1> pytest tests/python/multidevice/<test_file>.py -k <filter> --only-mpi
# nsys profile --capture-range=cudaProfilerApi --capture-range-end=repeat:<iterations> mpirun -np <processes> pytest tests/python/multidevice/<test_file>.py -k <filter> --only-mpi
# ```
# and then display the stats using e.g. `nsys stats --report=cuda_gpu_kern_sum
# report1.nsys-rep`.
# and then display the stats using `nsys stats`.
def get_benchmark_fns(func):
    # Both functors come from the same helper; only the profiler flag differs.
    warmup_fn = get_benchmark_fn(func, profile=False)
    profiled_fn = get_benchmark_fn(func, profile=True)
    return warmup_fn, profiled_fn
Loading