Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ extern "C" __aicore__ __attribute__((always_inline)) void kernel_entry(__gm__ in
constexpr int kRows = 128;
constexpr int kCols = 128;
using DynShapeDim5 = Shape<1, 1, 1, kRows, kCols>;
using DynStridDim5 = Stride<1, 1, 1, kCols, 1>;
using DynStridDim5 = pto::Stride<1, 1, 1, kCols, 1>;
using GlobalData = GlobalTensor<float, DynShapeDim5, DynStridDim5>;
using TileData = Tile<TileType::Vec, float, kRows, kCols, BLayout::RowMajor, -1, -1>;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ extern "C" __aicore__ __attribute__((always_inline)) void kernel_entry(__gm__ in
constexpr int kRows = 128;
constexpr int kCols = 128;
using DynShapeDim5 = Shape<1, 1, 1, kRows, kCols>;
using DynStridDim5 = Stride<1, 1, 1, kCols, 1>;
using DynStridDim5 = pto::Stride<1, 1, 1, kCols, 1>;
using GlobalData = GlobalTensor<float, DynShapeDim5, DynStridDim5>;
using TileData = Tile<TileType::Vec, float, kRows, kCols, BLayout::RowMajor, -1, -1>;

Expand Down
17 changes: 15 additions & 2 deletions examples/workers/l3/allreduce_distributed/test_allreduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,29 @@
from .main import run


@pytest.mark.platforms(["a2a3sim", "a2a3", "a5sim", "a5"])
@pytest.mark.runtime("tensormap_and_ringbuffer")
@pytest.mark.device_count(2)
def test_allreduce_distributed(st_platform, st_device_ids):
assert len(st_device_ids) == 2
rc = run([int(d) for d in st_device_ids], platform=st_platform)
assert rc == 0


# >2-rank cases live in a separate function so a5 can be dropped via the
# function-level platforms mark (the harness deselects by that mark, not by
# per-param marks). a5 onboard CI exposes only 2 NPUs, and a device_count(N>2)
# job aborts the whole resource phase — which would also take down the 2-rank
# case above. Still covered on a2a3 hardware and both sims.
@pytest.mark.platforms(["a2a3sim", "a2a3", "a5sim"])
@pytest.mark.runtime("tensormap_and_ringbuffer")
@pytest.mark.parametrize(
"n_devices",
[
pytest.param(2, marks=pytest.mark.device_count(2)),
pytest.param(4, marks=pytest.mark.device_count(4)),
],
)
def test_allreduce_distributed(st_platform, st_device_ids, n_devices):
def test_allreduce_distributed_multi_rank(st_platform, st_device_ids, n_devices):
assert len(st_device_ids) == n_devices
rc = run([int(d) for d in st_device_ids], platform=st_platform)
assert rc == 0
244 changes: 152 additions & 92 deletions src/a2a3/platform/onboard/host/comm_hccl.cpp

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions src/a2a3/platform/onboard/host/device_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,11 +355,11 @@ int DeviceRunner::ensure_acl_ready(int device_id) {
return -1;
}

// aclInit is process-wide; CANN returns 100002 if it has already been
// initialized (possibly by another owner), which we treat as success.
constexpr int kAclRepeatInit = 100002;
// aclInit is process-wide; CANN returns ACL_ERROR_REPEAT_INITIALIZE if it
// has already been initialized (possibly by another owner), which we
// treat as success.
aclError aRet = aclInit(nullptr);
if (aRet != ACL_SUCCESS && static_cast<int>(aRet) != kAclRepeatInit) {
if (aRet != ACL_SUCCESS && static_cast<int>(aRet) != ACL_ERROR_REPEAT_INITIALIZE) {
LOG_ERROR("aclInit failed: %d", static_cast<int>(aRet));
return static_cast<int>(aRet);
}
Expand Down
9 changes: 7 additions & 2 deletions src/a2a3/platform/onboard/host/device_runner.h
Original file line number Diff line number Diff line change
Expand Up @@ -429,9 +429,14 @@ class DeviceRunner {

/**
* Destroy a stream previously returned by create_comm_stream().
* Tolerates a nullptr stream (returns 0).
* Tolerates a nullptr stream.
*
* @return 0 on success, error code on failure.
* Best-effort: any failure from aclrtSynchronizeStream /
* aclrtDestroyStream is logged but not propagated, since leaking a
* stream at teardown is strictly better than blocking device
* finalization.
*
* @return Always 0.
*/
int destroy_comm_stream(void *stream);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
* -----------------------------------------------------------------------------------------------------------
*/

#ifndef SRC_A2A3_RUNTIME_TENSORMAP_AND_RINGBUFFER_RUNTIME_AICORE_COMPLETION_MAILBOX_H_
#define SRC_A2A3_RUNTIME_TENSORMAP_AND_RINGBUFFER_RUNTIME_AICORE_COMPLETION_MAILBOX_H_
#pragma once

#include <stdint.h>

Expand Down Expand Up @@ -79,5 +78,3 @@ struct AICoreCompletionMailbox {
static_assert(
sizeof(AICoreCompletionMailbox) % PTO2_ALIGN_SIZE == 0, "AICoreCompletionMailbox size must be cache-line aligned"
);

#endif // SRC_A2A3_RUNTIME_TENSORMAP_AND_RINGBUFFER_RUNTIME_AICORE_COMPLETION_MAILBOX_H_
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
* -----------------------------------------------------------------------------------------------------------
*/

#ifndef SRC_A2A3_RUNTIME_TENSORMAP_AND_RINGBUFFER_RUNTIME_BACKEND_SDMA_SDMA_COMPLETION_KERNEL_H_
#define SRC_A2A3_RUNTIME_TENSORMAP_AND_RINGBUFFER_RUNTIME_BACKEND_SDMA_SDMA_COMPLETION_KERNEL_H_
#pragma once

#include <stdint.h>

Expand Down Expand Up @@ -142,5 +141,3 @@ send_request_entry(AsyncCtx &ctx, SdmaRequestDescriptor<DstTensor, SrcTensor, Sc
pto2::detail::defer_flush(ctx);
return true;
}

#endif // SRC_A2A3_RUNTIME_TENSORMAP_AND_RINGBUFFER_RUNTIME_BACKEND_SDMA_SDMA_COMPLETION_KERNEL_H_
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
* -----------------------------------------------------------------------------------------------------------
*/

#ifndef SRC_A2A3_RUNTIME_TENSORMAP_AND_RINGBUFFER_RUNTIME_BACKEND_SDMA_SDMA_COMPLETION_SCHEDULER_H_
#define SRC_A2A3_RUNTIME_TENSORMAP_AND_RINGBUFFER_RUNTIME_BACKEND_SDMA_SDMA_COMPLETION_SCHEDULER_H_
#pragma once

#include <cstddef>
#include <cstdint>
Expand Down Expand Up @@ -65,5 +64,3 @@ inline void retire_sdma_event_record(uint64_t record_addr) {
__atomic_store_n(channel_info, packed, __ATOMIC_RELEASE);
cache_flush_range(const_cast<const void *>(reinterpret_cast<volatile void *>(channel_info)), sizeof(uint64_t));
}

#endif // SRC_A2A3_RUNTIME_TENSORMAP_AND_RINGBUFFER_RUNTIME_BACKEND_SDMA_SDMA_COMPLETION_SCHEDULER_H_
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
* -----------------------------------------------------------------------------------------------------------
*/

#ifndef PTO_ASYNC_KERNEL_API_H
#define PTO_ASYNC_KERNEL_API_H
#pragma once

#include <stdint.h>

Expand Down Expand Up @@ -98,6 +97,10 @@ inline __aicore__ void defer_flush(AsyncCtx &ctx) {
inline __aicore__ AsyncCtx get_async_ctx(__gm__ int64_t *args) {
__gm__ LocalContext *lc =
reinterpret_cast<__gm__ LocalContext *>(static_cast<uintptr_t>(args[PAYLOAD_LOCAL_CONTEXT_INDEX]));
// Field-by-field copy is mandatory: CCE rejects `AsyncCtx ctx = lc->async_ctx;`
// because there is no implicit constructor that crosses the __gm__ address
// space into Local Memory. When a new field is added to AsyncCtx, mirror it
// below or this kernel path will silently see zero for that field.
AsyncCtx ctx{};
ctx.completion_count = lc->async_ctx.completion_count;
ctx.completion_error_code = lc->async_ctx.completion_error_code;
Expand Down Expand Up @@ -153,5 +156,3 @@ save_expected_notification_counter(AsyncCtx &ctx, volatile __gm__ void *counter_
(void)register_completion_condition(ctx, token);
pto2::detail::defer_flush(ctx);
}

#endif // PTO_ASYNC_KERNEL_API_H
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
* -----------------------------------------------------------------------------------------------------------
*/

#ifndef PTO_ASYNC_WAIT_H
#define PTO_ASYNC_WAIT_H
#pragma once

#include <atomic>
#include <cstddef>
Expand Down Expand Up @@ -367,5 +366,3 @@ struct AsyncWaitList {
#endif
);
};

#endif // PTO_ASYNC_WAIT_H
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
* -----------------------------------------------------------------------------------------------------------
*/

#ifndef SRC_A2A3_RUNTIME_TENSORMAP_AND_RINGBUFFER_RUNTIME_PTO_COMPLETION_TOKEN_H_
#define SRC_A2A3_RUNTIME_TENSORMAP_AND_RINGBUFFER_RUNTIME_PTO_COMPLETION_TOKEN_H_
#pragma once

#include <stdint.h>

Expand Down Expand Up @@ -41,5 +40,3 @@ struct CompletionPollResult {
CompletionPollState state{CompletionPollState::PENDING};
int32_t error_code{PTO2_ERROR_NONE};
};

#endif // SRC_A2A3_RUNTIME_TENSORMAP_AND_RINGBUFFER_RUNTIME_PTO_COMPLETION_TOKEN_H_
17 changes: 17 additions & 0 deletions src/a5/platform/onboard/host/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ list(APPEND HOST_RUNTIME_SOURCES
"${CMAKE_CURRENT_SOURCE_DIR}/../../src/host/l2_perf_collector.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/../../src/host/pmu_collector.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/../../src/host/tensor_dump_collector.cpp"
"${CMAKE_CURRENT_SOURCE_DIR}/comm_hccl.cpp"
)
if(DEFINED CUSTOM_SOURCE_DIRS)
foreach(SRC_DIR ${CUSTOM_SOURCE_DIRS})
Expand Down Expand Up @@ -80,6 +81,10 @@ target_include_directories(host_runtime
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}
${CMAKE_CURRENT_SOURCE_DIR}/../../include
# Shared platform_comm headers (comm.h / comm_context.h) live in
# src/common so a5 (HCCL) and a5/a2a3 sim (POSIX-shm) can use the
# same contract.
${CMAKE_CURRENT_SOURCE_DIR}/../../../../common
${CMAKE_CUSTOM_INCLUDE_DIRS}
PRIVATE
${ASCEND_HOME_PATH}/include
Expand All @@ -95,13 +100,25 @@ target_link_directories(host_runtime
${ASCEND_HOME_PATH}/runtime/lib64
)

# CANN 9.x exposes the working non-V2 HCCL entry points through libhcomm.
# Link it explicitly so comm_hccl.cpp can follow the same initialization path
# as the pto-isa communication tests.
find_library(HCOMM_LIB NAMES hcomm PATHS ${ASCEND_HOME_PATH}/lib64 NO_DEFAULT_PATH)
if(HCOMM_LIB)
set(HCCL_LINK_TARGETS ${HCOMM_LIB})
message(STATUS "Using HCCL library: ${HCOMM_LIB}")
else()
message(FATAL_ERROR "libhcomm not found under ${ASCEND_HOME_PATH}/lib64")
endif()

# Link against CANN runtime libraries
# ascend_hal is dynamically loaded at runtime via dlopen in device_runner
# when performance profiling is enabled
target_link_libraries(host_runtime
PRIVATE
runtime
ascendcl
${HCCL_LINK_TARGETS}
dl
)

Expand Down
Loading