From 1c6e2e93d0d9f57c8d020ad7c86fa0720db142ec Mon Sep 17 00:00:00 2001 From: wolegechu Date: Tue, 20 Jan 2026 10:07:30 +0800 Subject: [PATCH] chore(benchmarks): enhance safetensors loading strategies with host pack and 2D H2D support --- .../materialization/benchmarks/README.md | 47 +- ...afetensors_load_strategy_benchmark_main.cc | 1291 +++++++++++++++-- ...n2.5-32b-safetensors-loading-strategies.md | 221 ++- 3 files changed, 1415 insertions(+), 144 deletions(-) diff --git a/core/store/materialization/benchmarks/README.md b/core/store/materialization/benchmarks/README.md index 38a5b285..0c59cbe6 100644 --- a/core/store/materialization/benchmarks/README.md +++ b/core/store/materialization/benchmarks/README.md @@ -42,14 +42,17 @@ fallocate -l 8G /tmp/tensorcast_disk_bench.bin 通过 `--mode=...` 选择: - - `loader`:对比加载策略 A/B/C: + - `loader`:对比加载策略 A/B/C 以及 `host_pack` 变体: - A:把 payload 先整块读到 GPU “文件缓冲”(可选 GPU 内 pack 到最终布局)。 - B:先规划 segment,再 VMM reserve+map,按需把片段泵到最终地址。 - C:基于 B 的“已知完整计划”,按 checkpoint tensor 分组批处理:对非 contiguous slice(典型是 2D 的 `axis=1` 切片)改为顺序读大块 + GPU pack,避免海量碎片化小 IO;并对同源 slice 做读一次 + D2D 复制去重。 + - host_pack(`--strategy=host_pack` / `hp`):和 C 做同一件事(把 `axis=1` 的“每行一段”从碎片化读拉回到顺序读),但把 pack 从 GPU(`cudaMemcpy2DAsync` D2D)挪到 host:先顺序读 row-block 到 host staging,再在 CPU 上把列切片 pack 到 pinned host buffer,最后用 1D `cudaMemcpyAsync(H2D)` 写入最终 output(H2D 只传 `bytes(output)` 而不是 `bytes(disk)`)。 - `safetensors_disk_baseline`:把所有 safetensors payload bytes 按顺序 disk→GPU 读取(不做 per-tensor 规划)。 - `disk_baseline`:把单个大文件按顺序 disk→GPU 读取。 - `disk_fragmentation`:按固定 stride 读取大量小段(模拟碎片化/随机访问)。 - `gpu_peer_baseline`:测一次 GPU0→GPU1 的 `cudaMemcpyPeerAsync`。 +- `h2d_baseline`:测 pinned host→GPU 的 1D H2D 吞吐(`cudaMemcpyAsync`)。 +- `h2d_2d_baseline`:测 pinned host(strided 2D source)→GPU(packed 2D dst)的 H2D 吞吐(`cudaMemcpy2DAsync`;用于模拟 `axis=1` 列切片的典型形态)。 - `safetensors_hot_host_baseline`:**显式预热 OS page cache**(先顺序读取全部 payload 一次),再测 disk(buffered)→pinned host 吞吐。 - `safetensors_hot_disk_baseline`:**显式预热 OS page cache** 后,再测 disk(buffered)→pinned host→GPU 的端到端吞吐(含 H2D)。 - `safetensors_dram_mirror_host_baseline`:显式申请一块大 DRAM 缓冲,把 payload 拷入 DRAM(不计入测量),再测 **DRAM(userspace)→pinned host bounce buffer** 的吞吐;用于和 `safetensors_hot_host_baseline`(page cache→userspace)互相印证。 @@ -153,6 +156,24 @@ bazel run //core/store/materialization/benchmarks:safetensors_load_strategy_benc 说明: - `--strategy_c_staging_bytes`:用于“顺序读大块→GPU staging→GPU pack”的 staging 缓冲大小;越大通常越能减少每个大 tensor 的 chunk 次数,但也会占用更多显存。 +### Loader:Host pack(CPU pack + 1D H2D,对照 Strategy C) + +该模式用于和 Strategy C 对照,验证“把 pack 从 GPU copy engine 挪到 CPU”是否划算(尤其在热 cache / P2P source 很快、H2D 成为瓶颈时)。 + +```bash +bazel run //core/store/materialization/benchmarks:safetensors_load_strategy_benchmark -- \ + --mode=loader \ + --strategy=host_pack \ + --safetensors_dir=/path/to/shards_dir \ + --device_id=0 \ + --use_pinned_host_buffer=true \ + --strategy_c_staging_bytes=$((1024*1024*1024)) \ + --load_plan_json_path=/path/to/loading-meta.json +``` + +说明: +- 该策略仍然会顺序读取“row-block 超集”(因此 `bytes(disk)` 与 C 类似),但 `bytes(h2d)` 会更接近 `bytes(output)`。 + ### Loader:一次跑 A 再跑 B(可选:抽样校验正确性) `--run_both_strategies` 会在一次运行中先跑 A 再跑 B,并启用 A vs B 的抽样对拍: @@ -307,7 +328,7 @@ bazel run //core/store/materialization/benchmarks:safetensors_load_strategy_benc - `--pinned_numa_node=N`:把 pinned host slab 绑定到 NUMA node N(best-effort) - `--pinned_numa_node=-2`:自动从 CUDA device 的 `/sys/bus/pci/devices//numa_node` 推断(TP>1 需要 `--h2d_per_gpu_pinned_pool=true`) - `--pinned_numa_prefault=true`:在 `cudaHostRegister` 前触页,避免“pin 时隐式 fault”导致的不可控 NUMA 放置 - - `--h2d_per_gpu_pinned_pool=true`:`h2d_baseline` 为每张 GPU 分配独立 pinned host pool(便于做 NUMA-local 对照) + - `--h2d_per_gpu_pinned_pool=true`:`h2d_baseline/h2d_2d_baseline` 为每张 GPU 分配独立 pinned host pool(便于做 NUMA-local 对照) ```bash bazel run //core/store/materialization/benchmarks:safetensors_load_strategy_benchmark -- \ @@ -320,6 +341,28 @@ bazel run //core/store/materialization/benchmarks:safetensors_load_strategy_benc --use_pinned_host_buffer=true ``` +### H2D 2D 基线(pinned host(strided) → GPU(packed)) + +该模式用 `cudaMemcpy2DAsync(..., cudaMemcpyHostToDevice, ...)` 测“2D 形态”的 H2D 吞吐,常用于模拟 `axis=1` 列切片: + +- 源(host):row-major 2D tensor 的“整行 bytes”作为 `srcPitch`(`src_pitch_bytes = full_cols * elem_bytes`) +- 目标(GPU):按 slice 列宽紧凑存储,`dstPitch = widthBytes = slice_cols * elem_bytes` +- copy 参数:`widthBytes = slice_cols * elem_bytes`,`height = rows` + +默认参数近似 Qwen2.5-32B 的 MLP gate/up 权重(fp16,`[rows=5120, cols=27648]`,TP=4 时每 rank `slice_cols=6912`): + +```bash +bazel run //core/store/materialization/benchmarks:safetensors_load_strategy_benchmark -- \ + --mode=h2d_2d_baseline \ + --tp_world_size=1 --tp_devices=0 \ + --h2d_bench_bytes=$((8*1024*1024*1024)) \ + --h2d_2d_width_bytes=13824 \ + --h2d_2d_height=5120 \ + --h2d_2d_src_pitch_bytes=55296 \ + --h2d_2d_dst_pitch_bytes=13824 \ + --use_pinned_host_buffer=true +``` + ## Strategy D:预物化(materialize once, load many) 该策略面向“重复加载同一个模型(相同 TP 配置 / 相同计划文件)”的场景:第一次用 Strategy C 生成 output layout 后落盘成连续文件;后续加载可用顺序 I/O 读回,避免 Strategy C 为 `axis=1` 付出的 1.3–2.0× 读放大。 diff --git a/core/store/materialization/benchmarks/safetensors_load_strategy_benchmark_main.cc b/core/store/materialization/benchmarks/safetensors_load_strategy_benchmark_main.cc index 44701737..eeb12917 100644 --- a/core/store/materialization/benchmarks/safetensors_load_strategy_benchmark_main.cc +++ b/core/store/materialization/benchmarks/safetensors_load_strategy_benchmark_main.cc @@ -114,6 +114,7 @@ enum class BenchMode : uint8_t { kSafetensorsDiskBaseline = 4, kSafetensorsHostBaseline = 5, kH2dBaseline = 6, + kH2d2dBaseline = 16, kSafetensorsODirectHostBaseline = 7, kSafetensorsODirectDiskBaseline = 8, kSafetensorsHotHostBaseline = 11, @@ -124,7 +125,7 @@ enum class BenchMode : uint8_t { kMaterializedDiskBaseline = 14, kSafetensorsDramMirrorHostBaseline = 15, }; -enum class StrategyKind : uint8_t { kA_Eager = 0, kB_LazyCommit = 1, kC_BatchedOptimal = 2 }; +enum class StrategyKind : uint8_t { kA_Eager = 0, kB_LazyCommit = 1, kC_BatchedOptimal = 2, kC_HostPack = 3 }; enum class NcclOp : uint8_t { kBroadcast = 0, kSendrecv = 1 }; constexpr uint64_t kCorrectnessSampleMaxBytes = 4ull * 1024ull * 1024ull; @@ -210,6 +211,32 @@ class NullPositionedSink final : public PositionedSink { uint64_t bytes_written_ = 0; }; +class HostBufferSink final : public PositionedSink { + public: + HostBufferSink(gsl::not_null base, uint64_t total_size) : base_(base), total_size_(total_size) {} + + absl::Status write_at(uint64_t offset, const void* src, size_t bytes) override { + if (bytes == 0) { + return absl::OkStatus(); + } + if (total_size_ > 0 && offset + static_cast(bytes) > total_size_) { + return absl::OutOfRangeError("HostBufferSink: write exceeds total_size"); + } + std::memcpy(base_.get() + offset, src, bytes); + bytes_written_ += static_cast(bytes); + return absl::OkStatus(); + } + + [[nodiscard]] uint64_t bytes_written() const { + return bytes_written_; + } + + private: + gsl::not_null base_; + uint64_t total_size_ = 0; + uint64_t bytes_written_ = 0; +}; + struct RunResult { StrategyKind strategy{}; PhaseTimesSec t{}; @@ -295,6 +322,13 @@ struct LoaderConfig { // H2D microbench params (pinned host -> GPU) uint64_t h2d_bench_bytes = 8ull * 1024ull * 1024ull * 1024ull; // 8 GiB bool h2d_per_gpu_pinned_pool = false; + + // H2D 2D microbench params (pinned host strided source -> packed GPU dst; typical for axis=1 slices). + // Defaults approximate Qwen2.5-32B MLP gate/up weight ([rows=5120, cols=27648], fp16; TP=4 slice width=6912). + uint64_t h2d_2d_width_bytes = 13824; // 6912 * 2 + uint64_t h2d_2d_height = 5120; + uint64_t h2d_2d_src_pitch_bytes = 55296; // 27648 * 2 + uint64_t h2d_2d_dst_pitch_bytes = 13824; // packed }; // Forward declarations (used by Strategy D helpers). @@ -780,6 +814,9 @@ absl::StatusOr parse_mode(std::string_view s) { if (s == "h2d_baseline") { return BenchMode::kH2dBaseline; } + if (s == "h2d_2d_baseline") { + return BenchMode::kH2d2dBaseline; + } if (s == "safetensors_o_direct_host_baseline") { return BenchMode::kSafetensorsODirectHostBaseline; } @@ -989,6 +1026,9 @@ absl::StatusOr parse_strategy(std::string_view s) { if (s == "c" || s == "C" || s == "batched_optimal" || s == "optimal") { return StrategyKind::kC_BatchedOptimal; } + if (s == "hp" || s == "host_pack" || s == "c_host_pack") { + return StrategyKind::kC_HostPack; + } return absl::InvalidArgumentError(absl::StrCat("Unknown --strategy: ", s)); } @@ -3512,15 +3552,20 @@ absl::StatusOr> run_strategy_c_with_state( return std::make_pair(r, std::move(s)); } -absl::StatusOr> run_strategy_b_with_state( +absl::StatusOr> run_strategy_c_host_pack_with_state( const LoaderConfig& cfg, const std::vector& shards) { RunResult r; - r.strategy = StrategyKind::kB_LazyCommit; - StrategyBState s; + r.strategy = StrategyKind::kC_HostPack; + StrategyCState s; + StrategyCStats st; const absl::Time t_total = absl::Now(); const auto sched_before = loader::get_gpu_scheduler_stats(cfg.device_id); + if (!cfg.use_pinned_host_buffer) { + return absl::FailedPreconditionError("Strategy host_pack requires --use_pinned_host_buffer=true"); + } + // Meta double open_meta = 0.0; auto meta_or = load_metas_from_safetensors(shards, &open_meta); @@ -3528,180 +3573,808 @@ absl::StatusOr> run_strategy_b_with_state( return meta_or.status(); } s.metas = std::move(meta_or->first); - const uint64_t total_payload_bytes = meta_or->second; r.t.open_meta = open_meta; - if (!cfg.use_pinned_host_buffer) { - return absl::FailedPreconditionError("Strategy B requires --use_pinned_host_buffer=true"); - } - + // Plan const absl::Time t_get = absl::Now(); if (!cfg.load_plan_json_path.empty()) { - RankLoadPlan plan; - TC_ASSIGN_OR_RETURN(plan, build_rank_load_plan(cfg, s.metas)); - r.selected_tensors = plan.unique_tensors; - r.selected_copies = plan.copies; - r.output_bytes = plan.output_bytes; - s.dst_bytes = plan.output_bytes; - s.segments = std::move(plan.segments); + uint64_t unique_tensors = 0; + TC_ASSIGN_OR_RETURN(s.copies, parse_plan_copy_instances_for_rank(cfg, s.metas, &unique_tensors)); + r.selected_tensors = unique_tensors; + r.selected_copies = s.copies.size(); } else { auto selected_or = select_tensors_all(s.metas); if (!selected_or.ok()) { return selected_or.status(); } - s.selected = *selected_or; - r.selected_tensors = s.selected.size(); - r.selected_copies = s.selected.size(); - - // Strategy B uses disk-only direct shard reads. To maximize disk locality, order - // requests by on-disk source offset (i.e., shard file order) instead of tensor - // name order. - std::sort(s.selected.begin(), s.selected.end(), [](const TensorMeta* a, const TensorMeta* b) { + std::vector selected = *selected_or; + std::sort(selected.begin(), selected.end(), [](const TensorMeta* a, const TensorMeta* b) { if (a->offset != b->offset) { return a->offset < b->offset; } return a->name < b->name; }); - - uint64_t dst_cursor = 0; - s.segments.clear(); - s.tensor_plans.clear(); - s.tensor_plans.reserve(s.selected.size()); - for (const TensorMeta* tm : s.selected) { - auto segs_or = plan_tensor_segments(*tm, cfg, dst_cursor); - if (!segs_or.ok()) { - return segs_or.status(); + s.copies.clear(); + s.copies.reserve(selected.size()); + for (const TensorMeta* tm : selected) { + PlannedCopyInstance inst; + inst.meta = tm; + inst.ckpt_name = tm->name; + inst.dst_param = tm->name; + inst.slices = {}; + inst.bytes = tm->size; + inst.contiguous = true; + inst.contiguous_src_offset = tm->offset; + inst.src_sort_key = tm->offset; + if (cfg.tp_world_size > 1) { + TpContiguousSlice slice; + TC_ASSIGN_OR_RETURN(slice, compute_tp_contiguous_slice(*tm, cfg, cfg.tp_rank)); + inst.bytes = slice.bytes; + inst.contiguous_src_offset = tm->offset + slice.offset_in_tensor; + inst.src_sort_key = inst.contiguous_src_offset; } - const auto& segs = *segs_or; - uint64_t tensor_bytes = 0; - for (const auto& seg : segs) { - s.segments.push_back(seg); - tensor_bytes += seg.bytes; - dst_cursor += seg.bytes; + s.copies.push_back(std::move(inst)); + } + r.selected_tensors = selected.size(); + r.selected_copies = selected.size(); + std::sort(s.copies.begin(), s.copies.end(), [](const PlannedCopyInstance& a, const PlannedCopyInstance& b) { + if (a.src_sort_key != b.src_sort_key) { + return a.src_sort_key < b.src_sort_key; } - TensorSlicePlan tp; - tp.name = tm->name; - tp.dst_offset = dst_cursor - tensor_bytes; - tp.bytes = tensor_bytes; - if (tm->shape.size() >= 2) { - tp.rows = tm->shape[0]; - tp.cols = tm->shape[1]; + if (a.ckpt_name != b.ckpt_name) { + return a.ckpt_name < b.ckpt_name; } - tp.elem_size = tm->elem_size; - s.tensor_plans.push_back(std::move(tp)); - } - s.dst_bytes = dst_cursor; - r.output_bytes = dst_cursor; + return a.dst_param < b.dst_param; + }); } - r.t.get_calls_total = seconds_since(t_get); - { - auto [merged_segments, planner_stats] = merge_adjacent_segments_by_src(std::move(s.segments)); - s.segments = std::move(merged_segments); - r.planned_segments = planner_stats.segments_post_merge; - r.planner_segments_pre_merge = planner_stats.segments_pre_merge; - r.planner_segments_merged = (planner_stats.segments_pre_merge >= planner_stats.segments_post_merge) - ? (planner_stats.segments_pre_merge - planner_stats.segments_post_merge) - : 0; - r.planner_src_runs = planner_stats.src_runs_post_merge; - r.planner_src_run_avg_bytes = planner_stats.src_run_avg_bytes; - r.planner_src_run_max_bytes = planner_stats.src_run_max_bytes; + + uint64_t dst_cursor = 0; + for (auto& c : s.copies) { + c.dst_offset = dst_cursor; + dst_cursor += c.bytes; } + s.output_bytes = dst_cursor; + r.output_bytes = dst_cursor; + r.t.get_calls_total = seconds_since(t_get); - // commit: reserve target VA, then pump disk ranges into final mapped addresses. + // Output VMM allocation. const absl::Time t_commit = absl::Now(); TC_RETURN_IF_ERROR(tensorcast::cuda::set_device(cfg.device_id)); - TC_RETURN_IF_ERROR(s.vmm.reserve_and_map(s.dst_bytes, cfg.device_id)); + TC_RETURN_IF_ERROR(s.vmm.reserve_and_map(s.output_bytes, cfg.device_id)); r.res.vmm_reserved_bytes = s.vmm.reserved_bytes(); r.res.vmm_mapped_bytes = s.vmm.mapped_bytes(); r.res.vmm_granularity_bytes = s.vmm.granularity_bytes(); MultiSafetensorsSource backing_src(shards); - // Convert segment list into remapped segments and pump ranges in destination space. - std::vector remap; - remap.reserve(s.segments.size()); - for (const auto& seg : s.segments) { - if (seg.bytes == 0) { - continue; - } - remap.push_back( - RemappedSource::Segment{ - .dst_offset = seg.dst_offset, - .src_offset = seg.src_offset, - .end_offset = seg.dst_offset + seg.bytes, - }); - } - RemappedSource src(gsl::not_null(&backing_src), std::move(remap)); - - loader::GpuMemorySink sink( - loader::GpuMemorySink::Options{ - .gpu_base_ptr = gsl::not_null{s.vmm.base_ptr()}, - .total_size = s.dst_bytes, - .chunk_size = 128 * 1024 * 1024, - .device_id = cfg.device_id, - .allocation = nullptr, - .gpu_sched_enabled = cfg.gpu_sched_enabled, - .gpu_sched_limit_bytes = cfg.gpu_sched_limit_bytes, - .gpu_sched_limit_copies = cfg.gpu_sched_limit_copies, - }); - - const int io_threads = std::max(1, cfg.io_threads); BounceBufferPlan bb; TC_ASSIGN_OR_RETURN(bb, plan_bounce_buffer(cfg)); std::unique_ptr pool_ptr; TC_ASSIGN_OR_RETURN(pool_ptr, make_bounce_buffer_pool(cfg, bb, &r.res)); + const int io_threads = std::max(1, cfg.io_threads); - uint64_t total_requested_bytes = 0; - uint64_t planned_ranges = 0; - auto ranges_or = build_pump_ranges_for_copy(s.segments, io_threads, &total_requested_bytes, &planned_ranges); - if (!ranges_or.ok()) { - return ranges_or.status(); - } - const auto& ranges = *ranges_or; - r.planned_ranges = planned_ranges; - r.bytes.disk_read_bytes = total_requested_bytes; - r.bytes.h2d_bytes = total_requested_bytes; - r.output_bytes = total_requested_bytes; - (void)total_payload_bytes; + // Group copies by checkpoint tensor to enable batched decisions. + struct TensorGroup { + const TensorMeta* meta = nullptr; + std::vector idx; + bool staged = false; + }; - TC_RETURN_IF_ERROR( - loader::pump_ranges(src, sink, *pool_ptr, ranges, io_threads, pump_benchmark_runtime().blocking_executor())); - TC_RETURN_IF_ERROR(sink.close()); - r.t.commit = seconds_since(t_commit); - const auto sched_after = loader::get_gpu_scheduler_stats(cfg.device_id); - if (sched_after.waits >= sched_before.waits) { - r.gpu_sched_waits = sched_after.waits - sched_before.waits; + absl::flat_hash_map group_index; + group_index.reserve(s.copies.size()); + std::vector groups; + groups.reserve(s.copies.size()); + for (size_t i = 0; i < s.copies.size(); ++i) { + const TensorMeta* tm = s.copies[i].meta; + auto [it, inserted] = group_index.emplace(tm, groups.size()); + if (inserted) { + TensorGroup g; + g.meta = tm; + groups.push_back(std::move(g)); + } + groups[it->second].idx.push_back(i); } - if (sched_after.wait_sec >= sched_before.wait_sec) { - r.gpu_sched_wait_sec = sched_after.wait_sec - sched_before.wait_sec; + + // Decide staged tensors; allocate host staging + pinned pack buffers if needed. + bool needs_staging = false; + for (auto& g : groups) { + const TensorMeta& tm = *g.meta; + bool any_non_contig = false; + bool axes_ok = true; + for (const size_t ci : g.idx) { + if (!s.copies[ci].contiguous) { + any_non_contig = true; + } + for (const auto& sl : s.copies[ci].slices) { + if (sl.axis < 0 || sl.axis > 1) { + axes_ok = false; + } + } + } + bool can_pack_2d = false; + if (tm.shape.size() == 2 && axes_ok) { + TC_ASSIGN_OR_RETURN(can_pack_2d, is_row_major_contiguous(tm)); + } + g.staged = any_non_contig && can_pack_2d; + if (g.staged) { + needs_staging = true; + } } - r.t.total_ready = seconds_since(t_total); - return std::make_pair(r, std::move(s)); -} + std::unique_ptr host_staging(nullptr, &free); + uint64_t host_staging_bytes = 0; -absl::Status check_correctness_samples(const LoaderConfig& cfg, const StrategyAState& a, const StrategyBState& b) { - if (cfg.check_correctness_samples == 0) { - return absl::OkStatus(); - } - if (b.vmm.base_ptr() == nullptr) { - return absl::FailedPreconditionError("Strategy B VMM buffer not available for correctness check"); - } - if (a.output == nullptr && !a.gpu_payload) { - return absl::FailedPreconditionError("Strategy A buffers not available for correctness check"); - } + std::unique_ptr host_pack_pool; + uint8_t* host_pack_buf0 = nullptr; + uint8_t* host_pack_buf1 = nullptr; + size_t host_pack_buf_bytes = 0; + cudaStream_t h2d_stream = nullptr; + cudaEvent_t h2d_done[2]{nullptr, nullptr}; + bool h2d_used[2]{false, false}; - std::vector names; - names.reserve(std::min(cfg.check_correctness_samples, b.tensor_plans.size())); - for (const auto& tp : b.tensor_plans) { - names.push_back(tp.name); - if (names.size() >= cfg.check_correctness_samples) { - break; + if (needs_staging) { + s.staging_bytes = std::max(1, cfg.strategy_c_staging_bytes); + host_staging_bytes = s.staging_bytes; + if (host_staging_bytes > static_cast(std::numeric_limits::max())) { + return absl::InvalidArgumentError("host_pack: staging_bytes overflows size_t"); + } + host_staging.reset(reinterpret_cast(aligned_alloc(4096, static_cast(host_staging_bytes)))); + if (host_staging == nullptr) { + return absl::ResourceExhaustedError("host_pack: failed to allocate host staging buffer"); } - } - std::sort(names.begin(), names.end()); - const auto* a_payload_base = a.gpu_payload ? static_cast(a.gpu_payload->get()) : nullptr; + // Compute an upper bound on the max packed output bytes per chunk, used to size pinned pack buffers. + uint64_t max_packed_chunk_bytes = 0; + for (const auto& g : groups) { + if (!g.staged) { + continue; + } + const TensorMeta& tm = *g.meta; + if (tm.shape.size() != 2 || tm.elem_size == 0) { + continue; + } + + // Union row range across copies in this tensor. + Slice2D union_slice; + union_slice.row_start = std::numeric_limits::max(); + union_slice.row_size = 0; + union_slice.col_start = 0; + union_slice.col_size = tm.shape[1]; + for (const size_t ci : g.idx) { + Slice2D sl; + TC_ASSIGN_OR_RETURN(sl, extract_slice_2d(tm, s.copies[ci].slices)); + union_slice.row_start = std::min(union_slice.row_start, sl.row_start); + union_slice.row_size = std::max(union_slice.row_size, sl.row_start + sl.row_size); + } + if (union_slice.row_start == std::numeric_limits::max()) { + union_slice.row_start = 0; + union_slice.row_size = 0; + } + const int64_t row_start = std::max(0, union_slice.row_start); + const int64_t row_end = std::min(tm.shape[0], union_slice.row_size); + if (row_end <= row_start) { + continue; + } + + const uint64_t cols = static_cast(tm.shape[1]); + const uint64_t elem = static_cast(tm.elem_size); + const uint64_t row_bytes_u64 = cols * elem; + if (row_bytes_u64 == 0) { + continue; + } + const uint64_t staging_bytes = s.staging_bytes; + uint64_t max_rows_per_chunk = (staging_bytes / row_bytes_u64); + if (max_rows_per_chunk == 0) { + max_rows_per_chunk = 1; + } + for (const size_t ci : g.idx) { + Slice2D sl; + TC_ASSIGN_OR_RETURN(sl, extract_slice_2d(tm, s.copies[ci].slices)); + if (sl.row_size <= 0 || sl.col_size <= 0) { + continue; + } + const uint64_t rows_u64 = std::min(max_rows_per_chunk, static_cast(sl.row_size)); + const uint64_t col_bytes = static_cast(sl.col_size) * elem; + if (col_bytes == 0) { + continue; + } + if (rows_u64 > (std::numeric_limits::max() / col_bytes)) { + return absl::InvalidArgumentError("host_pack: packed bytes overflow"); + } + max_packed_chunk_bytes = std::max(max_packed_chunk_bytes, rows_u64 * col_bytes); + } + } + + constexpr uint64_t kAlign = common::memory::PinnedBufferPool::kDirectIOAlignment; + const uint64_t pack_chunk_u64 = + std::max(kAlign, ((max_packed_chunk_bytes + kAlign - 1) / kAlign) * kAlign); + if (pack_chunk_u64 > static_cast(std::numeric_limits::max())) { + return absl::InvalidArgumentError("host_pack: pinned pack chunk bytes overflow size_t"); + } + const size_t pack_chunk_bytes = static_cast(pack_chunk_u64); + if (pack_chunk_bytes > (std::numeric_limits::max() / 2u)) { + return absl::InvalidArgumentError("host_pack: pinned pack pool bytes overflow size_t"); + } + const size_t pack_total_bytes = pack_chunk_bytes * 2; + + common::memory::PinnedBufferPool::Options pool_opts; + pool_opts.name = "host_pack"; + pool_opts.prefault = cfg.pinned_numa_prefault; + TC_ASSIGN_OR_RETURN(pool_opts.numa_node, resolve_pinned_numa_node_for_device(cfg, cfg.device_id)); + host_pack_pool = + std::make_unique(pack_total_bytes, pack_chunk_bytes, std::move(pool_opts)); + const auto slabs = host_pack_pool->list_slabs(); + if (slabs.empty()) { + return absl::InternalError("host_pack: pinned pack pool has no slabs"); + } + size_t pinned_total = 0; + for (const auto& slab : slabs) { + pinned_total += slab.bytes; + } + r.res.pinned_host_bytes += pinned_total; + + host_pack_buf_bytes = host_pack_pool->slice_bytes(); + if (slabs[0].bytes < (host_pack_buf_bytes * 2)) { + return absl::InternalError("host_pack: pinned slab smaller than 2 slices"); + } + auto* base = reinterpret_cast(slabs[0].base.get()); + host_pack_buf0 = base; + host_pack_buf1 = base + host_pack_buf_bytes; + + TC_RETURN_IF_ERROR(tensorcast::cuda::stream_create_with_flags(&h2d_stream, cudaStreamNonBlocking)); + TC_RETURN_IF_ERROR(tensorcast::cuda::event_create_with_flags(&h2d_done[0], cudaEventDisableTiming)); + TC_RETURN_IF_ERROR(tensorcast::cuda::event_create_with_flags(&h2d_done[1], cudaEventDisableTiming)); + } + + // Stream for D2D copies (dedup) and 2D pack on GPU (if needed). + cudaStream_t pack_stream = nullptr; + TC_RETURN_IF_ERROR(tensorcast::cuda::stream_create_with_flags(&pack_stream, cudaStreamNonBlocking)); + double commit_sec = 0.0; + double pack_sec = 0.0; + + auto flush_direct = [&](std::vector& pending, std::vector& d2d_ops) -> absl::Status { + if (pending.empty()) { + if (!d2d_ops.empty()) { + return absl::InternalError("Internal error: d2d_ops not empty while pending segments empty"); + } + return absl::OkStatus(); + } + auto [merged, planner_stats] = merge_adjacent_segments_by_src(std::move(pending)); + pending.clear(); + pending = std::move(merged); + uint64_t max_dst_end = 0; + for (const auto& seg : pending) { + const uint64_t end = seg.dst_offset + static_cast(seg.bytes); + max_dst_end = std::max(max_dst_end, end); + } + if (max_dst_end > s.output_bytes) { + return absl::OutOfRangeError("Strategy host_pack: direct segment write exceeds output buffer bounds"); + } + r.planned_segments += planner_stats.segments_post_merge; + r.planner_segments_pre_merge += planner_stats.segments_pre_merge; + r.planner_segments_merged += (planner_stats.segments_pre_merge >= planner_stats.segments_post_merge) + ? (planner_stats.segments_pre_merge - planner_stats.segments_post_merge) + : 0; + r.planner_src_runs += planner_stats.src_runs_post_merge; + r.planner_src_run_max_bytes = std::max(r.planner_src_run_max_bytes, planner_stats.src_run_max_bytes); + + std::vector remap; + remap.reserve(pending.size()); + for (const auto& seg : pending) { + remap.push_back( + RemappedSource::Segment{ + .dst_offset = seg.dst_offset, + .src_offset = seg.src_offset, + .end_offset = seg.dst_offset + seg.bytes, + }); + } + RemappedSource src(gsl::not_null(&backing_src), std::move(remap)); + + loader::GpuMemorySink sink( + loader::GpuMemorySink::Options{ + .gpu_base_ptr = gsl::not_null{s.vmm.base_ptr()}, + // Strategy C performs sparse writes into the final output buffer; do + // not enforce "must write exactly total_size bytes" on close(). + .total_size = 0, + .chunk_size = 128 * 1024 * 1024, + .device_id = cfg.device_id, + .allocation = nullptr, + .gpu_sched_enabled = cfg.gpu_sched_enabled, + .gpu_sched_limit_bytes = cfg.gpu_sched_limit_bytes, + .gpu_sched_limit_copies = cfg.gpu_sched_limit_copies, + }); + + uint64_t total_requested_bytes = 0; + uint64_t planned_ranges = 0; + auto ranges_or = build_pump_ranges_for_copy(pending, io_threads, &total_requested_bytes, &planned_ranges); + if (!ranges_or.ok()) { + return ranges_or.status(); + } + const auto& ranges = *ranges_or; + r.planned_ranges += planned_ranges; + r.bytes.disk_read_bytes += total_requested_bytes; + r.bytes.h2d_bytes += total_requested_bytes; + + if (auto* adapter = dynamic_cast(pool_ptr.get()); adapter != nullptr) { + TC_RETURN_IF_ERROR(adapter->get_buffer()->reset_for_new_production()); + } + + const absl::Time t = absl::Now(); + TC_RETURN_IF_ERROR( + loader::pump_ranges(src, sink, *pool_ptr, ranges, io_threads, pump_benchmark_runtime().blocking_executor())); + TC_RETURN_IF_ERROR(sink.close()); + commit_sec += seconds_since(t); + + if (!d2d_ops.empty()) { + const absl::Time t_d2d = absl::Now(); + auto* base = static_cast(s.vmm.base_ptr()); + for (const auto& op : d2d_ops) { + if (op.bytes == 0) { + continue; + } + TC_RETURN_IF_ERROR( + tensorcast::cuda::memcpy_async( + base + op.dst_dst_offset, base + op.src_dst_offset, op.bytes, cudaMemcpyDeviceToDevice, pack_stream)); + r.bytes.d2d_bytes += static_cast(op.bytes); + } + TC_RETURN_IF_ERROR(tensorcast::cuda::stream_synchronize(pack_stream)); + pack_sec += seconds_since(t_d2d); + d2d_ops.clear(); + } + pending.clear(); + return absl::OkStatus(); + }; + + std::sort(groups.begin(), groups.end(), [](const TensorGroup& a, const TensorGroup& b) { + return a.meta->offset < b.meta->offset; + }); + + std::vector pending_direct; + std::vector pending_d2d; + pending_direct.reserve(s.copies.size()); + pending_d2d.reserve(s.copies.size()); + + size_t next_host_pack_buf = 0; + + // Execute in on-disk tensor order to preserve sequential access. + for (auto& g : groups) { + const TensorMeta& tm = *g.meta; + if (g.staged) { + ++st.staged_tensors; + TC_RETURN_IF_ERROR(flush_direct(pending_direct, pending_d2d)); + + if (!needs_staging || host_staging == nullptr || host_staging_bytes == 0) { + return absl::InternalError("host_pack: staged tensor but host staging buffers are not initialized"); + } + + // Compute union row range for staged reads. + Slice2D union_slice; + union_slice.row_start = std::numeric_limits::max(); + union_slice.row_size = 0; + union_slice.col_start = 0; + union_slice.col_size = tm.shape[1]; + for (const size_t ci : g.idx) { + Slice2D sl; + TC_ASSIGN_OR_RETURN(sl, extract_slice_2d(tm, s.copies[ci].slices)); + union_slice.row_start = std::min(union_slice.row_start, sl.row_start); + union_slice.row_size = std::max(union_slice.row_size, sl.row_start + sl.row_size); + } + if (union_slice.row_start == std::numeric_limits::max()) { + union_slice.row_start = 0; + union_slice.row_size = 0; + } + const int64_t row_start = std::max(0, union_slice.row_start); + const int64_t row_end = std::min(tm.shape[0], union_slice.row_size); + if (row_end <= row_start) { + continue; + } + const uint64_t cols = static_cast(tm.shape[1]); + const uint64_t elem = static_cast(tm.elem_size); + const uint64_t row_bytes_u64 = cols * elem; + if (row_bytes_u64 == 0 || row_bytes_u64 > static_cast(std::numeric_limits::max())) { + return absl::InvalidArgumentError(absl::StrCat("Row byte size overflow for tensor ", tm.name)); + } + const uint64_t staging_bytes = host_staging_bytes; + uint64_t max_rows_per_chunk = (staging_bytes / row_bytes_u64); + if (max_rows_per_chunk == 0) { + max_rows_per_chunk = 1; + } + + auto* out_base = static_cast(s.vmm.base_ptr()); + auto* staging_base = host_staging.get(); + for (int64_t row = row_start; row < row_end;) { + const int64_t chunk_rows = std::min(static_cast(max_rows_per_chunk), row_end - row); + const uint64_t chunk_bytes = static_cast(chunk_rows) * row_bytes_u64; + if (chunk_bytes > host_staging_bytes) { + return absl::InternalError("host_pack: computed chunk_bytes exceeds host staging buffer"); + } + + std::vector remap; + remap.push_back( + RemappedSource::Segment{ + .dst_offset = 0, + .src_offset = tm.offset + static_cast(row) * row_bytes_u64, + .end_offset = chunk_bytes, + }); + RemappedSource src(gsl::not_null(&backing_src), std::move(remap)); + + HostBufferSink sink(gsl::not_null{staging_base}, chunk_bytes); + + const auto ranges = split_even_ranges(/*base=*/0, chunk_bytes, io_threads); + r.planned_ranges += ranges.size(); + r.planned_segments += 1; + r.bytes.disk_read_bytes += chunk_bytes; + + if (auto* adapter = dynamic_cast(pool_ptr.get()); adapter != nullptr) { + TC_RETURN_IF_ERROR(adapter->get_buffer()->reset_for_new_production()); + } + + const absl::Time t = absl::Now(); + TC_RETURN_IF_ERROR( + loader::pump_ranges( + src, sink, *pool_ptr, ranges, io_threads, pump_benchmark_runtime().blocking_executor())); + commit_sec += seconds_since(t); + ++st.staged_reads; + + // Host pack: staged host buffer -> pinned host -> H2D into final output layout. + for (const size_t ci : g.idx) { + const PlannedCopyInstance& copy = s.copies[ci]; + Slice2D sl; + TC_ASSIGN_OR_RETURN(sl, extract_slice_2d(tm, copy.slices)); + + const int64_t copy_row_start = sl.row_start; + const int64_t copy_row_end = sl.row_start + sl.row_size; + const int64_t chunk_row_start = row; + const int64_t chunk_row_end = row + chunk_rows; + const int64_t inter_start = std::max(copy_row_start, chunk_row_start); + const int64_t inter_end = std::min(copy_row_end, chunk_row_end); + if (inter_end <= inter_start || sl.col_size <= 0 || sl.row_size <= 0) { + continue; + } + + const uint64_t inter_rows = static_cast(inter_end - inter_start); + const uint64_t col_bytes_u64 = static_cast(sl.col_size) * elem; + if (col_bytes_u64 == 0 || col_bytes_u64 > static_cast(std::numeric_limits::max())) { + return absl::InvalidArgumentError("host_pack: col_bytes overflow"); + } + if (inter_rows > (std::numeric_limits::max() / col_bytes_u64)) { + return absl::InvalidArgumentError("host_pack: packed bytes overflow"); + } + const uint64_t packed_bytes_u64 = inter_rows * col_bytes_u64; + if (packed_bytes_u64 == 0) { + continue; + } + if (packed_bytes_u64 > static_cast(host_pack_buf_bytes)) { + return absl::OutOfRangeError("host_pack: packed_bytes exceeds pinned host pack buffer"); + } + + const uint64_t src_row_off = static_cast(inter_start - chunk_row_start); + const uint64_t dst_row_off = static_cast(inter_start - copy_row_start); + const uint64_t src_col_off = static_cast(sl.col_start) * elem; + + const uint8_t* src_ptr = staging_base + src_row_off * row_bytes_u64 + src_col_off; + uint8_t* dst_ptr = out_base + copy.dst_offset + dst_row_off * col_bytes_u64; + + // Wait for the selected pack buffer to be free before overwriting. + const size_t buf_idx = next_host_pack_buf % 2; + next_host_pack_buf += 1; + uint8_t* pack_buf = (buf_idx == 0) ? host_pack_buf0 : host_pack_buf1; + if (pack_buf == nullptr || h2d_done[buf_idx] == nullptr || h2d_stream == nullptr) { + return absl::InternalError("host_pack: pinned pack buffers or CUDA resources are not initialized"); + } + if (h2d_used[buf_idx]) { + bool ready = false; + TC_RETURN_IF_ERROR(tensorcast::cuda::event_query(h2d_done[buf_idx], &ready)); + if (!ready) { + const absl::Time t_wait = absl::Now(); + TC_RETURN_IF_ERROR(tensorcast::cuda::event_synchronize(h2d_done[buf_idx])); + commit_sec += seconds_since(t_wait); + } + } + + // CPU pack (row-by-row memcpy) into pinned host buffer. + const absl::Time t_pack = absl::Now(); + const size_t col_bytes = static_cast(col_bytes_u64); + const size_t src_pitch = static_cast(row_bytes_u64); + const size_t rows = static_cast(inter_rows); + for (size_t rr = 0; rr < rows; ++rr) { + const uint8_t* src_row = src_ptr + rr * src_pitch; + std::memcpy(pack_buf + rr * col_bytes, src_row, col_bytes); + } + pack_sec += seconds_since(t_pack); + + TC_RETURN_IF_ERROR( + tensorcast::cuda::memcpy_async( + dst_ptr, pack_buf, static_cast(packed_bytes_u64), cudaMemcpyHostToDevice, h2d_stream)); + TC_RETURN_IF_ERROR(tensorcast::cuda::event_record(h2d_done[buf_idx], h2d_stream)); + h2d_used[buf_idx] = true; + r.bytes.h2d_bytes += packed_bytes_u64; + } + + row += chunk_rows; + } + continue; + } + + // Direct (contiguous) reads; deduplicate identical src slices within a tensor. + absl::flat_hash_map> by_key; + by_key.reserve(g.idx.size()); + for (const size_t ci : g.idx) { + const PlannedCopyInstance& copy = s.copies[ci]; + if (!copy.contiguous) { + // Fallback to segment expansion for non-2D/non-contiguous slices. + uint64_t copy_bytes = 0; + uint64_t src_key = 0; + std::vector segs; + TC_ASSIGN_OR_RETURN( + segs, build_segments_for_slices(*copy.meta, copy.slices, copy.dst_offset, ©_bytes, &src_key)); + pending_direct.insert(pending_direct.end(), segs.begin(), segs.end()); + ++st.fallback_copies; + continue; + } + const std::string k = std::format("{}:{}", copy.contiguous_src_offset, copy.bytes); + by_key[k].push_back(ci); + } + + for (auto& it : by_key) { + const std::vector& copies = it.second; + if (copies.empty()) { + continue; + } + const PlannedCopyInstance& primary = s.copies[copies[0]]; + pending_direct.push_back( + SegmentCopy{ + .src_offset = primary.contiguous_src_offset, + .dst_offset = primary.dst_offset, + .bytes = static_cast(primary.bytes), + }); + ++st.direct_primary_reads; + + for (size_t i = 1; i < copies.size(); ++i) { + const PlannedCopyInstance& other = s.copies[copies[i]]; + pending_d2d.push_back( + D2dCopyOp{ + .src_dst_offset = primary.dst_offset, + .dst_dst_offset = other.dst_offset, + .bytes = static_cast(primary.bytes), + }); + ++st.direct_dedup_copies; + } + } + } + + TC_RETURN_IF_ERROR(flush_direct(pending_direct, pending_d2d)); + + if (h2d_stream != nullptr) { + const absl::Time t_wait = absl::Now(); + TC_RETURN_IF_ERROR(tensorcast::cuda::stream_synchronize(h2d_stream)); + commit_sec += seconds_since(t_wait); + } + + const auto sched_after = loader::get_gpu_scheduler_stats(cfg.device_id); + if (sched_after.waits >= sched_before.waits) { + r.gpu_sched_waits = sched_after.waits - sched_before.waits; + } + if (sched_after.wait_sec >= sched_before.wait_sec) { + r.gpu_sched_wait_sec = sched_after.wait_sec - sched_before.wait_sec; + } + + if (h2d_done[0] != nullptr) { + (void)tensorcast::cuda::event_destroy(h2d_done[0]); + } + if (h2d_done[1] != nullptr) { + (void)tensorcast::cuda::event_destroy(h2d_done[1]); + } + if (h2d_stream != nullptr) { + (void)tensorcast::cuda::stream_destroy(h2d_stream); + } + + r.t.commit = commit_sec; + r.t.pack = pack_sec; + r.t.total_ready = seconds_since(t_total); + (void)t_commit; + + (void)tensorcast::cuda::stream_destroy(pack_stream); + + r.c_staged_tensors = st.staged_tensors; + r.c_staged_reads = st.staged_reads; + r.c_direct_primary_reads = st.direct_primary_reads; + r.c_direct_dedup_copies = st.direct_dedup_copies; + r.c_fallback_copies = st.fallback_copies; + + return std::make_pair(r, std::move(s)); +} + +absl::StatusOr> run_strategy_b_with_state( + const LoaderConfig& cfg, + const std::vector& shards) { + RunResult r; + r.strategy = StrategyKind::kB_LazyCommit; + StrategyBState s; + const absl::Time t_total = absl::Now(); + const auto sched_before = loader::get_gpu_scheduler_stats(cfg.device_id); + + // Meta + double open_meta = 0.0; + auto meta_or = load_metas_from_safetensors(shards, &open_meta); + if (!meta_or.ok()) { + return meta_or.status(); + } + s.metas = std::move(meta_or->first); + const uint64_t total_payload_bytes = meta_or->second; + r.t.open_meta = open_meta; + + if (!cfg.use_pinned_host_buffer) { + return absl::FailedPreconditionError("Strategy B requires --use_pinned_host_buffer=true"); + } + + const absl::Time t_get = absl::Now(); + if (!cfg.load_plan_json_path.empty()) { + RankLoadPlan plan; + TC_ASSIGN_OR_RETURN(plan, build_rank_load_plan(cfg, s.metas)); + r.selected_tensors = plan.unique_tensors; + r.selected_copies = plan.copies; + r.output_bytes = plan.output_bytes; + s.dst_bytes = plan.output_bytes; + s.segments = std::move(plan.segments); + } else { + auto selected_or = select_tensors_all(s.metas); + if (!selected_or.ok()) { + return selected_or.status(); + } + s.selected = *selected_or; + r.selected_tensors = s.selected.size(); + r.selected_copies = s.selected.size(); + + // Strategy B uses disk-only direct shard reads. To maximize disk locality, order + // requests by on-disk source offset (i.e., shard file order) instead of tensor + // name order. + std::sort(s.selected.begin(), s.selected.end(), [](const TensorMeta* a, const TensorMeta* b) { + if (a->offset != b->offset) { + return a->offset < b->offset; + } + return a->name < b->name; + }); + + uint64_t dst_cursor = 0; + s.segments.clear(); + s.tensor_plans.clear(); + s.tensor_plans.reserve(s.selected.size()); + for (const TensorMeta* tm : s.selected) { + auto segs_or = plan_tensor_segments(*tm, cfg, dst_cursor); + if (!segs_or.ok()) { + return segs_or.status(); + } + const auto& segs = *segs_or; + uint64_t tensor_bytes = 0; + for (const auto& seg : segs) { + s.segments.push_back(seg); + tensor_bytes += seg.bytes; + dst_cursor += seg.bytes; + } + TensorSlicePlan tp; + tp.name = tm->name; + tp.dst_offset = dst_cursor - tensor_bytes; + tp.bytes = tensor_bytes; + if (tm->shape.size() >= 2) { + tp.rows = tm->shape[0]; + tp.cols = tm->shape[1]; + } + tp.elem_size = tm->elem_size; + s.tensor_plans.push_back(std::move(tp)); + } + s.dst_bytes = dst_cursor; + r.output_bytes = dst_cursor; + } + r.t.get_calls_total = seconds_since(t_get); + { + auto [merged_segments, planner_stats] = merge_adjacent_segments_by_src(std::move(s.segments)); + s.segments = std::move(merged_segments); + r.planned_segments = planner_stats.segments_post_merge; + r.planner_segments_pre_merge = planner_stats.segments_pre_merge; + r.planner_segments_merged = (planner_stats.segments_pre_merge >= planner_stats.segments_post_merge) + ? (planner_stats.segments_pre_merge - planner_stats.segments_post_merge) + : 0; + r.planner_src_runs = planner_stats.src_runs_post_merge; + r.planner_src_run_avg_bytes = planner_stats.src_run_avg_bytes; + r.planner_src_run_max_bytes = planner_stats.src_run_max_bytes; + } + + // commit: reserve target VA, then pump disk ranges into final mapped addresses. + const absl::Time t_commit = absl::Now(); + TC_RETURN_IF_ERROR(tensorcast::cuda::set_device(cfg.device_id)); + TC_RETURN_IF_ERROR(s.vmm.reserve_and_map(s.dst_bytes, cfg.device_id)); + r.res.vmm_reserved_bytes = s.vmm.reserved_bytes(); + r.res.vmm_mapped_bytes = s.vmm.mapped_bytes(); + r.res.vmm_granularity_bytes = s.vmm.granularity_bytes(); + + MultiSafetensorsSource backing_src(shards); + + // Convert segment list into remapped segments and pump ranges in destination space. + std::vector remap; + remap.reserve(s.segments.size()); + for (const auto& seg : s.segments) { + if (seg.bytes == 0) { + continue; + } + remap.push_back( + RemappedSource::Segment{ + .dst_offset = seg.dst_offset, + .src_offset = seg.src_offset, + .end_offset = seg.dst_offset + seg.bytes, + }); + } + RemappedSource src(gsl::not_null(&backing_src), std::move(remap)); + + loader::GpuMemorySink sink( + loader::GpuMemorySink::Options{ + .gpu_base_ptr = gsl::not_null{s.vmm.base_ptr()}, + .total_size = s.dst_bytes, + .chunk_size = 128 * 1024 * 1024, + .device_id = cfg.device_id, + .allocation = nullptr, + .gpu_sched_enabled = cfg.gpu_sched_enabled, + .gpu_sched_limit_bytes = cfg.gpu_sched_limit_bytes, + .gpu_sched_limit_copies = cfg.gpu_sched_limit_copies, + }); + + const int io_threads = std::max(1, cfg.io_threads); + BounceBufferPlan bb; + TC_ASSIGN_OR_RETURN(bb, plan_bounce_buffer(cfg)); + std::unique_ptr pool_ptr; + TC_ASSIGN_OR_RETURN(pool_ptr, make_bounce_buffer_pool(cfg, bb, &r.res)); + + uint64_t total_requested_bytes = 0; + uint64_t planned_ranges = 0; + auto ranges_or = build_pump_ranges_for_copy(s.segments, io_threads, &total_requested_bytes, &planned_ranges); + if (!ranges_or.ok()) { + return ranges_or.status(); + } + const auto& ranges = *ranges_or; + r.planned_ranges = planned_ranges; + r.bytes.disk_read_bytes = total_requested_bytes; + r.bytes.h2d_bytes = total_requested_bytes; + r.output_bytes = total_requested_bytes; + (void)total_payload_bytes; + + TC_RETURN_IF_ERROR( + loader::pump_ranges(src, sink, *pool_ptr, ranges, io_threads, pump_benchmark_runtime().blocking_executor())); + TC_RETURN_IF_ERROR(sink.close()); + r.t.commit = seconds_since(t_commit); + const auto sched_after = loader::get_gpu_scheduler_stats(cfg.device_id); + if (sched_after.waits >= sched_before.waits) { + r.gpu_sched_waits = sched_after.waits - sched_before.waits; + } + if (sched_after.wait_sec >= sched_before.wait_sec) { + r.gpu_sched_wait_sec = sched_after.wait_sec - sched_before.wait_sec; + } + + r.t.total_ready = seconds_since(t_total); + return std::make_pair(r, std::move(s)); +} + +absl::Status check_correctness_samples(const LoaderConfig& cfg, const StrategyAState& a, const StrategyBState& b) { + if (cfg.check_correctness_samples == 0) { + return absl::OkStatus(); + } + if (b.vmm.base_ptr() == nullptr) { + return absl::FailedPreconditionError("Strategy B VMM buffer not available for correctness check"); + } + if (a.output == nullptr && !a.gpu_payload) { + return absl::FailedPreconditionError("Strategy A buffers not available for correctness check"); + } + + std::vector names; + names.reserve(std::min(cfg.check_correctness_samples, b.tensor_plans.size())); + for (const auto& tp : b.tensor_plans) { + names.push_back(tp.name); + if (names.size() >= cfg.check_correctness_samples) { + break; + } + } + std::sort(names.begin(), names.end()); + + const auto* a_payload_base = a.gpu_payload ? static_cast(a.gpu_payload->get()) : nullptr; const auto* a_out_base = a.output ? static_cast(a.output->get()) : nullptr; const auto* b_base = static_cast(b.vmm.base_ptr()); @@ -4611,6 +5284,279 @@ absl::Status run_h2d_baseline(const LoaderConfig& cfg) { return absl::OkStatus(); } +absl::Status run_h2d_2d_baseline(const LoaderConfig& cfg) { + if (cfg.h2d_bench_bytes == 0) { + return absl::InvalidArgumentError("--h2d_bench_bytes must be > 0 for mode=h2d_2d_baseline"); + } + if (!cfg.use_pinned_host_buffer) { + return absl::InvalidArgumentError("--use_pinned_host_buffer=true is required for mode=h2d_2d_baseline"); + } + if (cfg.tp_world_size <= 0) { + return absl::InvalidArgumentError("--tp_world_size must be > 0 for mode=h2d_2d_baseline"); + } + if (cfg.h2d_2d_width_bytes == 0) { + return absl::InvalidArgumentError("--h2d_2d_width_bytes must be > 0 for mode=h2d_2d_baseline"); + } + if (cfg.h2d_2d_height == 0) { + return absl::InvalidArgumentError("--h2d_2d_height must be > 0 for mode=h2d_2d_baseline"); + } + if (cfg.h2d_2d_src_pitch_bytes < cfg.h2d_2d_width_bytes) { + return absl::InvalidArgumentError("--h2d_2d_src_pitch_bytes must be >= --h2d_2d_width_bytes"); + } + if (cfg.h2d_2d_dst_pitch_bytes < cfg.h2d_2d_width_bytes) { + return absl::InvalidArgumentError("--h2d_2d_dst_pitch_bytes must be >= --h2d_2d_width_bytes"); + } + + const uint64_t width_bytes = cfg.h2d_2d_width_bytes; + const uint64_t height = cfg.h2d_2d_height; + const uint64_t src_pitch = cfg.h2d_2d_src_pitch_bytes; + const uint64_t dst_pitch = cfg.h2d_2d_dst_pitch_bytes; + + if (height > (std::numeric_limits::max() / width_bytes)) { + return absl::InvalidArgumentError("h2d_2d_baseline: width_bytes * height overflow"); + } + if (height > (std::numeric_limits::max() / src_pitch)) { + return absl::InvalidArgumentError("h2d_2d_baseline: src_pitch * height overflow"); + } + if (height > (std::numeric_limits::max() / dst_pitch)) { + return absl::InvalidArgumentError("h2d_2d_baseline: dst_pitch * height overflow"); + } + const uint64_t bytes_per_call = width_bytes * height; + const uint64_t src_bytes = src_pitch * height; + const uint64_t dst_bytes = dst_pitch * height; + + if (bytes_per_call == 0 || src_bytes == 0 || dst_bytes == 0) { + return absl::InvalidArgumentError("h2d_2d_baseline: computed bytes are zero"); + } + if (src_bytes > static_cast(std::numeric_limits::max()) || + dst_bytes > static_cast(std::numeric_limits::max()) || + src_pitch > static_cast(std::numeric_limits::max()) || + dst_pitch > static_cast(std::numeric_limits::max()) || + width_bytes > static_cast(std::numeric_limits::max())) { + return absl::InvalidArgumentError("h2d_2d_baseline: size_t overflow"); + } + + std::vector device_ids; + TC_ASSIGN_OR_RETURN(device_ids, build_tp_device_ids(cfg)); + + struct HostPool { + std::shared_ptr pool; + const uint8_t* base = nullptr; + size_t bytes = 0; + int numa_node = -1; + }; + + std::vector host_pools; + host_pools.reserve(cfg.h2d_per_gpu_pinned_pool ? static_cast(cfg.tp_world_size) : 1u); + + const uint64_t src_bytes_per_rank = src_bytes; + uint64_t src_pool_bytes_u64 = src_bytes_per_rank; + if (!cfg.h2d_per_gpu_pinned_pool) { + const uint64_t tp_u64 = static_cast(cfg.tp_world_size); + if (tp_u64 > 0 && src_bytes_per_rank > (std::numeric_limits::max() / tp_u64)) { + return absl::InvalidArgumentError("h2d_2d_baseline: src_bytes * tp_world_size overflow"); + } + src_pool_bytes_u64 = src_bytes_per_rank * tp_u64; + } + if (src_pool_bytes_u64 > static_cast(std::numeric_limits::max())) { + return absl::InvalidArgumentError("h2d_2d_baseline: src_pool_bytes overflows size_t"); + } + const size_t src_pool_bytes = static_cast(src_pool_bytes_u64); + + auto build_one_pool = [&](int device_id) -> absl::StatusOr { + common::memory::PinnedBufferPool::Options pool_opts; + pool_opts.name = "h2d_2d_baseline"; + pool_opts.prefault = cfg.pinned_numa_prefault; + TC_ASSIGN_OR_RETURN(pool_opts.numa_node, resolve_pinned_numa_node_for_device(cfg, device_id)); + + auto pool = std::make_shared( + src_pool_bytes, static_cast(src_bytes_per_rank), std::move(pool_opts)); + const auto slabs = pool->list_slabs(); + if (slabs.empty()) { + return absl::InternalError("h2d_2d_baseline: no pinned slabs"); + } + for (const auto& slab : slabs) { + std::memset(slab.base.get(), 0, slab.bytes); + } + const uint8_t* host_base = reinterpret_cast(slabs[0].base.get()); + const size_t host_bytes = slabs[0].bytes; + if (host_bytes < src_pool_bytes) { + return absl::InternalError("h2d_2d_baseline: pinned host slab smaller than src_pool_bytes"); + } + return HostPool{ + .pool = std::move(pool), + .base = host_base, + .bytes = host_bytes, + .numa_node = pool_opts.numa_node, + }; + }; + + if (cfg.pinned_numa_node == -2 && !cfg.h2d_per_gpu_pinned_pool && cfg.tp_world_size > 1) { + return absl::InvalidArgumentError( + "--pinned_numa_node=-2 (auto) requires --h2d_per_gpu_pinned_pool=true when tp_world_size>1"); + } + if (cfg.h2d_per_gpu_pinned_pool) { + for (int rank = 0; rank < cfg.tp_world_size; ++rank) { + HostPool host; + TC_ASSIGN_OR_RETURN(host, build_one_pool(device_ids[static_cast(rank)])); + host_pools.push_back(std::move(host)); + } + } else { + HostPool host; + TC_ASSIGN_OR_RETURN(host, build_one_pool(/*device_id=*/cfg.device_id)); + host_pools.push_back(std::move(host)); + } + + const uint64_t target_bytes_per_gpu = cfg.h2d_bench_bytes; + const uint64_t iters = std::max(1, (target_bytes_per_gpu + bytes_per_call - 1) / bytes_per_call); + if (iters > (std::numeric_limits::max() / bytes_per_call)) { + return absl::InvalidArgumentError("h2d_2d_baseline: bytes_per_call * iters overflow"); + } + const uint64_t bytes_per_gpu = bytes_per_call * iters; + + struct PerGpuCtx { + int device_id = -1; + std::unique_ptr dst; + cudaStream_t stream = nullptr; + cudaEvent_t done_event = nullptr; + double sec = 0.0; // completion time since t0 + }; + + std::vector gpus(static_cast(cfg.tp_world_size)); + for (int rank = 0; rank < cfg.tp_world_size; ++rank) { + auto& ctx = gpus[static_cast(rank)]; + ctx.device_id = device_ids[static_cast(rank)]; + TC_RETURN_IF_ERROR(tensorcast::cuda::set_device(ctx.device_id)); + ctx.dst = std::make_unique(); + TC_RETURN_IF_ERROR(ctx.dst->allocate(static_cast(dst_bytes), ctx.device_id)); + TC_RETURN_IF_ERROR(tensorcast::cuda::stream_create_with_flags(&ctx.stream, cudaStreamNonBlocking)); + TC_RETURN_IF_ERROR(tensorcast::cuda::event_create_with_flags(&ctx.done_event, cudaEventDisableTiming)); + } + + const absl::Time t0 = absl::Now(); + for (uint64_t iter = 0; iter < iters; ++iter) { + for (int rank = 0; rank < cfg.tp_world_size; ++rank) { + auto& ctx = gpus[static_cast(rank)]; + TC_RETURN_IF_ERROR(tensorcast::cuda::set_device(ctx.device_id)); + const HostPool& host = host_pools[cfg.h2d_per_gpu_pinned_pool ? static_cast(rank) : 0u]; + const uint64_t rank_off_u64 = + cfg.h2d_per_gpu_pinned_pool ? 0ull : (static_cast(rank) * src_bytes_per_rank); + if (!cfg.h2d_per_gpu_pinned_pool) { + const uint64_t host_bytes_u64 = static_cast(host.bytes); + if (rank_off_u64 > host_bytes_u64 || (host_bytes_u64 - rank_off_u64) < src_bytes_per_rank) { + return absl::InternalError("h2d_2d_baseline: computed src offset exceeds host slab size"); + } + } + const void* src = host.base + static_cast(rank_off_u64); + void* dst_ptr = ctx.dst->get(); + SC_RETURN_IF_CUDA_ERROR(cudaMemcpy2DAsync( + dst_ptr, + static_cast(dst_pitch), + src, + static_cast(src_pitch), + static_cast(width_bytes), + static_cast(height), + cudaMemcpyHostToDevice, + ctx.stream)); + } + } + for (int rank = 0; rank < cfg.tp_world_size; ++rank) { + auto& ctx = gpus[static_cast(rank)]; + TC_RETURN_IF_ERROR(tensorcast::cuda::set_device(ctx.device_id)); + TC_RETURN_IF_ERROR(tensorcast::cuda::event_record(ctx.done_event, ctx.stream)); + } + + std::vector done(static_cast(cfg.tp_world_size), false); + int remaining = cfg.tp_world_size; + while (remaining > 0) { + for (int rank = 0; rank < cfg.tp_world_size; ++rank) { + if (done[static_cast(rank)]) { + continue; + } + auto& ctx = gpus[static_cast(rank)]; + TC_RETURN_IF_ERROR(tensorcast::cuda::set_device(ctx.device_id)); + bool ready = false; + TC_RETURN_IF_ERROR(tensorcast::cuda::event_query(ctx.done_event, &ready)); + if (!ready) { + continue; + } + done[static_cast(rank)] = true; + remaining -= 1; + ctx.sec = seconds_since(t0); + } + if (remaining > 0) { + absl::SleepFor(absl::Milliseconds(1)); + } + } + + double makespan = 0.0; + for (const auto& ctx : gpus) { + makespan = std::max(makespan, ctx.sec); + } + + for (auto& ctx : gpus) { + if (ctx.device_id >= 0) { + (void)tensorcast::cuda::set_device(ctx.device_id); + } + if (ctx.stream != nullptr) { + (void)tensorcast::cuda::stream_synchronize(ctx.stream); + } + if (ctx.done_event != nullptr) { + (void)tensorcast::cuda::event_destroy(ctx.done_event); + ctx.done_event = nullptr; + } + if (ctx.stream != nullptr) { + (void)tensorcast::cuda::stream_destroy(ctx.stream); + ctx.stream = nullptr; + } + ctx.dst.reset(); + } + + const double per_gpu_gib = static_cast(bytes_per_gpu) / (1024.0 * 1024.0 * 1024.0); + const double agg_gib = per_gpu_gib * static_cast(cfg.tp_world_size); + const double agg_gib_s = agg_gib / std::max(1e-9, makespan); + std::string per_gpu_str; + for (size_t i = 0; i < gpus.size(); ++i) { + if (i != 0) { + absl::StrAppend(&per_gpu_str, " "); + } + const auto& ctx = gpus[i]; + const double gib_s = per_gpu_gib / std::max(1e-9, ctx.sec); + absl::StrAppend(&per_gpu_str, std::format("gpu{}={:.3f}GiB/s", ctx.device_id, gib_s)); + } + std::string host_pool_str; + for (size_t i = 0; i < host_pools.size(); ++i) { + if (i != 0) { + absl::StrAppend(&host_pool_str, " "); + } + absl::StrAppend(&host_pool_str, std::format("pool{}=numa{}", i, host_pools[i].numa_node)); + } + + LOG(INFO) << std::format( + "h2d_2d_baseline tp_world_size={} tp_devices={} target_bytes_per_gpu={} bytes_per_gpu={} iters={} bytes_per_call={} width_bytes={} height={} src_pitch={} dst_pitch={} makespan_sec={:.6f} agg_GiB/s={:.3f} per_gpu=[{}] pinned=true pinned_numa_node={} pinned_numa_prefault={} h2d_per_gpu_pinned_pool={} host_pools=[{}] src_bytes={} dst_bytes={}", + cfg.tp_world_size, + join_device_ids(device_ids), + static_cast(target_bytes_per_gpu), + static_cast(bytes_per_gpu), + static_cast(iters), + static_cast(bytes_per_call), + static_cast(width_bytes), + static_cast(height), + static_cast(src_pitch), + static_cast(dst_pitch), + makespan, + agg_gib_s, + per_gpu_str, + cfg.pinned_numa_node, + cfg.pinned_numa_prefault, + cfg.h2d_per_gpu_pinned_pool, + host_pool_str, + static_cast(src_bytes), + static_cast(dst_bytes)); + + return absl::OkStatus(); +} + class OdirectMultiFileSource final : public SeekableSource { public: explicit OdirectMultiFileSource(std::vector paths) : paths_(std::move(paths)) {} @@ -5344,6 +6290,9 @@ absl::Status log_run_result(const LoaderConfig& cfg, const RunResult& r) { case StrategyKind::kC_BatchedOptimal: strategy = "C_batched_optimal"; break; + case StrategyKind::kC_HostPack: + strategy = "C_host_pack"; + break; } BounceBufferPlan bb; TC_ASSIGN_OR_RETURN(bb, plan_bounce_buffer(cfg)); @@ -5364,6 +6313,14 @@ absl::Status log_run_result(const LoaderConfig& cfg, const RunResult& r) { static_cast(r.c_direct_primary_reads), static_cast(r.c_direct_dedup_copies), static_cast(r.c_fallback_copies)); + } else if (r.strategy == StrategyKind::kC_HostPack) { + planner_stats = std::format( + " staged_tensors={} staged_reads={} direct_primary_reads={} direct_dedup_copies={} fallback_copies={}", + static_cast(r.c_staged_tensors), + static_cast(r.c_staged_reads), + static_cast(r.c_direct_primary_reads), + static_cast(r.c_direct_dedup_copies), + static_cast(r.c_fallback_copies)); } LOG(INFO) << std::format( "result strategy={} tp=(rank={}/{} mode=row_only) selection={} tensors copies={} segments={} ranges={}{} " @@ -5476,13 +6433,13 @@ ABSL_FLAG( std::string, mode, "loader", - "One of: loader, disk_baseline, disk_fragmentation, gpu_peer_baseline, safetensors_disk_baseline, safetensors_host_baseline, safetensors_hot_disk_baseline, safetensors_hot_host_baseline, safetensors_dram_mirror_host_baseline, materialize_d, materialized_disk_baseline, h2d_baseline, safetensors_o_direct_host_baseline, safetensors_o_direct_disk_baseline, nccl_baseline, nccl_launch_tax"); + "One of: loader, disk_baseline, disk_fragmentation, gpu_peer_baseline, safetensors_disk_baseline, safetensors_host_baseline, safetensors_hot_disk_baseline, safetensors_hot_host_baseline, safetensors_dram_mirror_host_baseline, materialize_d, materialized_disk_baseline, h2d_baseline, h2d_2d_baseline, safetensors_o_direct_host_baseline, safetensors_o_direct_disk_baseline, nccl_baseline, nccl_launch_tax"); ABSL_FLAG( std::string, strategy, "a", - "Strategy for loader mode: a|b|c. Use --run_both_strategies to run both (A+B only)."); + "Strategy for loader mode: a|b|c|host_pack (aliases: hp,c_host_pack). Use --run_both_strategies to run both (A+B only)."); ABSL_FLAG(bool, run_both_strategies, false, "If true, run A then B in one invocation (enables A vs B checks)."); ABSL_FLAG(std::string, safetensors_dir, "", "Directory containing one or more .safetensors shards."); @@ -5614,12 +6571,36 @@ ABSL_FLAG( "Stride between segments for disk_fragmentation."); ABSL_FLAG(uint64_t, gpu_peer_bytes, 1024ull * 1024ull * 1024ull, "Bytes to copy for gpu_peer_baseline."); -ABSL_FLAG(uint64_t, h2d_bench_bytes, 8ull * 1024ull * 1024ull * 1024ull, "Bytes to copy for h2d_baseline."); +ABSL_FLAG( + uint64_t, + h2d_bench_bytes, + 8ull * 1024ull * 1024ull * 1024ull, + "Bytes to copy per GPU for h2d_baseline and h2d_2d_baseline."); +ABSL_FLAG( + uint64_t, + h2d_2d_width_bytes, + 13824, + "For h2d_2d_baseline: bytes per row to copy (widthBytes). Defaults to 6912 fp16 elems (Qwen2.5-32B TP=4 MLP gate/up)."); +ABSL_FLAG( + uint64_t, + h2d_2d_height, + 5120, + "For h2d_2d_baseline: number of rows to copy (height). Defaults to hidden size 5120 (Qwen2.5-32B)."); +ABSL_FLAG( + uint64_t, + h2d_2d_src_pitch_bytes, + 55296, + "For h2d_2d_baseline: source pitch in bytes (srcPitch). Defaults to 27648 fp16 elems (Qwen2.5-32B MLP gate/up)."); +ABSL_FLAG( + uint64_t, + h2d_2d_dst_pitch_bytes, + 13824, + "For h2d_2d_baseline: destination pitch in bytes (dstPitch). Default equals h2d_2d_width_bytes (packed output)."); ABSL_FLAG( bool, h2d_per_gpu_pinned_pool, false, - "For h2d_baseline: allocate a separate pinned host pool per GPU (recommended with pinned_numa_node=-2)."); + "For h2d_baseline/h2d_2d_baseline: allocate a separate pinned host pool per GPU (recommended with pinned_numa_node=-2)."); int main(int argc, char** argv) { absl::ParseCommandLine(argc, argv); @@ -5695,6 +6676,10 @@ int main(int argc, char** argv) { cfg.gpu_peer_bytes = absl::GetFlag(FLAGS_gpu_peer_bytes); cfg.h2d_bench_bytes = absl::GetFlag(FLAGS_h2d_bench_bytes); cfg.h2d_per_gpu_pinned_pool = absl::GetFlag(FLAGS_h2d_per_gpu_pinned_pool); + cfg.h2d_2d_width_bytes = absl::GetFlag(FLAGS_h2d_2d_width_bytes); + cfg.h2d_2d_height = absl::GetFlag(FLAGS_h2d_2d_height); + cfg.h2d_2d_src_pitch_bytes = absl::GetFlag(FLAGS_h2d_2d_src_pitch_bytes); + cfg.h2d_2d_dst_pitch_bytes = absl::GetFlag(FLAGS_h2d_2d_dst_pitch_bytes); cfg.safetensors_dir = absl::GetFlag(FLAGS_safetensors_dir); cfg.load_plan_json_path = absl::GetFlag(FLAGS_load_plan_json_path); @@ -5781,6 +6766,14 @@ int main(int argc, char** argv) { } return 0; } + if (cfg.mode == BenchMode::kH2d2dBaseline) { + auto st = tensorcast::store::loader::run_h2d_2d_baseline(cfg); + if (!st.ok()) { + LOG(ERROR) << st; + return 1; + } + return 0; + } if (cfg.mode == BenchMode::kMaterializedDiskBaseline) { auto st = tensorcast::store::loader::run_materialized_disk_baseline(cfg); if (!st.ok()) { @@ -5996,6 +6989,22 @@ int main(int argc, char** argv) { return 0; } + if (cfg.strategy == tensorcast::store::loader::StrategyKind::kC_HostPack) { + auto c_or = tensorcast::store::loader::run_strategy_c_host_pack_with_state(cfg, shards); + if (!c_or.ok()) { + LOG(ERROR) << c_or.status(); + return 1; + } + auto& [c_res, c_state] = *c_or; + auto st = tensorcast::store::loader::log_run_result(cfg, c_res); + if (!st.ok()) { + LOG(ERROR) << st; + return 1; + } + (void)c_state; + return 0; + } + auto b_or = tensorcast::store::loader::run_strategy_b_with_state(cfg, shards); if (!b_or.ok()) { LOG(ERROR) << b_or.status(); diff --git a/docs/benchmarks/20260118-qwen2.5-32b-safetensors-loading-strategies.md b/docs/benchmarks/20260118-qwen2.5-32b-safetensors-loading-strategies.md index 68b7035a..e729ee48 100644 --- a/docs/benchmarks/20260118-qwen2.5-32b-safetensors-loading-strategies.md +++ b/docs/benchmarks/20260118-qwen2.5-32b-safetensors-loading-strategies.md @@ -1,7 +1,7 @@ # Qwen2.5-32B-Instruct safetensors 加载策略基准测试报告(A/B/C/D) 日期:2026-01-18 -代码版本:`8b1f583` +代码版本:`0e739b0`(含本报告相关的本地改动) 目标:在 `core/store/materialization/benchmarks/safetensors_load_strategy_benchmark` 中,用 vLLM 生成的 `loading-meta.json`(TP=1/2/4)评估加载策略(A/B/C),并给出一个“更贴近理论最优”的派生策略 D(预物化:materialize once, load many),分析端到端耗时、阶段瓶颈,以及与“当前硬件/软件路径上限”的差距。 --- @@ -63,6 +63,17 @@ 对比 B 的关键变化: - B 的瓶颈在“碎片化读”,C 将其变成“顺序读 + 少量 2D D2D pack”,显著提高磁盘→GPU有效带宽。 +### 2.3b Strategy C_host_pack(对照:把 pack 放在 Host) + +核心思路:沿用 C 的“按 checkpoint tensor 分组 + 顺序读 row-block”的决策,但把 pack 从 GPU copy engine 挪到 host: +- disk→host staging(连续 row-block) +- CPU 把每行 segment pack 到 pinned host buffer(只保留最终需要的列切片 bytes) +- 1D `cudaMemcpyAsync(H2D)` 一次性写入 output + +预期收益与代价: +- 收益:H2D 只传 `bytes(output)`(而不是 row-block 的 `bytes(disk)`),对 TP=4 这类读放大场景尤其明显。 +- 代价:CPU 侧 pack 变成额外的 DRAM→pinned memcpy(量级≈`bytes(output)`),在“source 很快(热 cache / DRAM / P2P)”的情况下,可能把瓶颈推到 CPU/DRAM/调度上。 + ### 2.4 Strategy D(预物化:materialize once, load many) 核心思路:对固定的 `loading-meta.json`(固定 TP 配置、固定切片/复制关系),第一次用 C 生成 output layout 后把结果落盘为每 rank 一个连续文件;后续加载直接顺序读 `bytes(output)`,避免 C 的 1.3–2.0× 读放大,也避免 B 的碎片化小读。 @@ -110,6 +121,11 @@ - 说明: - 该模式会先把 payload 从 safetensors 读入一块用户态 DRAM(不计时),再测 DRAM→pinned host 的拷贝吞吐。 - DRAM→pinned 明显高于 page cache→userspace(10.883 GiB/s),说明热读基线主要受 `pread(2)` 路径/内核 copy/系统调用与框架调度开销影响;而不是 CPU 内存带宽本身已经打满。 + - 更具体地讲,`safetensors_hot_host_baseline` 虽然“数据已在 DRAM(page cache)里”,但每个 chunk 仍然要走一次 `pread(2)`:VFS 层处理、file offset/页缓存查找、`copy_to_iter/copy_to_user` 的内核→用户拷贝(通常按页迭代)以及 syscall 边界的固定开销;这些都会把“纯 memcpy”能达到的吞吐拉下来。 + - `copy_to_user` 的“安全拷贝”路径通常也更难吃满带宽:要处理潜在 page fault、SMAP 的 uaccess 开关(`stac/clac`)、对象大小检查(`__check_object_size`)等;同时内核态拷贝通常不会走到用户态 `memcpy` 的最激进向量化/写合并实现(例如 AVX512 + non-temporal store 的组合)。 + - page cache 的底层是普通 page(常见为 4KiB):对 61GiB 这种超大拷贝,会带来更高的 per-page 迭代与 TLB 压力;而 `dram_mirror_host_baseline` 的源 buffer 是匿名 `mmap`,在部分系统配置下可能以 THP(2MiB)承载,进一步降低 TLB 开销(因此更接近“理想 userspace memcpy”)。 + - NUMA 也会显著放大波动:page cache 页的物理放置由 warmup 时的线程/CPU 决定,而 pinned slab 的放置由 `--pinned_numa_node` 决定;两者若跨 socket,会把“page cache→pinned”的热读吞吐明显拉低(这也是为什么 H2D 并发测量中仅控 pinned NUMA 往往不够,还需要配合 CPU affinity)。 + - 另外,`dram_mirror_host_baseline` 测得的 18.462 GiB/s 也不是“机器 DRAM 带宽上限”,而是“本 benchmark 的 pump/buffer pool 框架 + memcpy”在当前线程数/NUMA/实现下的可达值;它更多用来衡量 **pread 热路径相对 userspace memcpy 的额外开销**。 2) **pinned host→GPU(纯 H2D)**(不读磁盘) - `--mode=h2d_baseline`(日志:`/tmp/tensorcast_h2d_numa_tp1_default.log`) @@ -152,6 +168,27 @@ 结论:NUMA 对 “多 GPU 并发 H2D” 的影响可以非常大;在评估策略 A/B/C 的上限时,必须把 “pinned host buffer 的 NUMA 放置 + 线程/进程亲和性” 当作一级变量。 +2d) **H2D 2D(模拟 `axis=1` 列切片):`cudaMemcpy2DAsync(H2D)` 的有效带宽** + +动机:用户提出“增大 pinned memory,让 pinned memory 能放下完整 tensor,然后用 `cudaMemcpy2DAsync` 一步到位写入 device 上的最终布局,去掉 pack”。为了回答“2D H2D 本身会不会比 1D H2D 慢”,新增了一个小基准: + +- `--mode=h2d_2d_baseline`:用 `cudaMemcpy2DAsync(dst, dstPitch, src, srcPitch, widthBytes, height, cudaMemcpyHostToDevice, stream)` 测 **有效吞吐** + - `effective_bytes = widthBytes * height * iters`(注意:不包含 `srcPitch-widthBytes` 的 padding,因为 DMA 实际只搬运 widthBytes×height) + - 典型 `axis=1` 形态:`srcPitch = full_cols * elem_bytes`,`widthBytes = slice_cols * elem_bytes`,`dstPitch = widthBytes`(packed output) + +结果(TP=1,GPU0;`--h2d_bench_bytes=8GiB`): +- 典型 dense(`[rows=5120, cols=5120]`,TP=4 slice_cols=1280,fp16):`width=2560 srcPitch=10240 height=5120` → **50.310 GiB/s**(日志:`/tmp/tensorcast_h2d_2d_axis1_tp1_dense_w2560.log`) +- 典型 QKV(`[rows=5120, cols=15360]`,TP=4 slice_cols=3840,fp16):`width=7680 srcPitch=30720 height=5120` → **51.076 GiB/s**(日志:`/tmp/tensorcast_h2d_2d_axis1_tp1_qkv_w7680.log`) +- 典型 MLP gate/up(`[rows=5120, cols=27648]`,TP=4 slice_cols=6912,fp16):`width=13824 srcPitch=55296 height=5120` → **50.970 GiB/s**(日志:`/tmp/tensorcast_h2d_2d_axis1_tp1_default.log`) + - 对照(很窄的 width 会掉速;`--h2d_bench_bytes=2GiB`): + - `width=256 srcPitch=10240 height=5120` → **29.703 GiB/s**(日志:`/tmp/tensorcast_h2d_2d_axis1_tp1_w256.log`) + - `width=512 srcPitch=10240 height=5120` → **33.535 GiB/s**(日志:`/tmp/tensorcast_h2d_2d_axis1_tp1_w512.log`) + +解读: +- 在这些“真实模型常见的 axis=1 slice 宽度(2.5–13.5KiB/row)”下,`cudaMemcpy2DAsync(H2D)` 的有效带宽与 `cudaMemcpyAsync(H2D)` 基本一致(≈50–51 GiB/s),因此 **“2D H2D”本身不是性能短板**。 +- 但你的担心在 “width 很小” 时确实成立:`cudaMemcpy2DAsync(H2D)` 内部即使只提交一次 API,也需要在 DMA 侧处理 `height` 行的地址步进与映射/描述符;当 `widthBytes` 小到不足以摊平 per-row 开销时,有效带宽会明显低于 1D 大块 H2D。 +- 但这并不等价于“端到端会大幅变快”:在本报告的 TP=4 Strategy C 中,`T(pack)` 已经只有 **0.022s/5.990s(<0.4%)**,即使彻底去掉 pack,理论收益也非常有限;要显著缩短 makespan 仍然需要优先从“减少总读量/减少碎片化/更强 overlap”入手。 + 3) **热读端到端:磁盘(buffered)→pinned host→GPU(合并链路,包含 H2D)** - `--mode=safetensors_hot_disk_baseline`(日志:`/tmp/tensorcast_safetensors_hot_disk_baseline.log`) - payload bytes:61.027 GiB @@ -252,6 +289,30 @@ - A 的 pack 已被彻底“去瓶颈化”(3.36s → 0.02s),但 A 仍然读全 payload,所以端到端仍主要受 IO 限制;不同 run 的 `T(open_copy)` 会受 page cache/系统状态影响而波动。 - C 依然是 TP=4 下整体最快的策略。 +### 5.3b TP=4(Strategy C,多 rank 并发,NUMA 对照) + +目的:验证 **NUMA** 是否会显著影响 TP=4 多 rank 并发的 `T(commit)` / makespan,并评估“是否值得进一步做集中读取 + 分发”的优化方向。 + +说明(本次对照的控制条件): +- 先用 `safetensors_hot_host_baseline` 显式预热 page cache(日志:`/tmp/tensorcast_warm_page_cache_before_tp4_c.log`)。 +- 随后启动 4 个进程(rank0..3)并发跑 Strategy C(每个 rank 用对应 `--device_id`),以 `max(T(total_ready))` 作为 makespan。 +- 因为并发测试受系统噪声影响较大,下面结论主要看趋势;更严谨的版本应重复多次取分布(P50/P95)。 + +结果(越小越好): +- **默认**(不控 NUMA):makespan `6.921s`,max `T(commit)=3.794s`(日志:`/tmp/tensorcast_tp4_c_concurrent_default_rank{0,1,2,3}.log`) +- **仅 pinned slab NUMA auto**(`--pinned_numa_node=-2 --pinned_numa_prefault=true`):makespan `6.979s`,max `T(commit)=3.896s`(日志:`/tmp/tensorcast_tp4_c_concurrent_numa_rank{0,1,2,3}.log`) + - 结论:**仅做 pinned host NUMA 放置,并未改善 makespan**(甚至略差);说明对 Strategy C 这条链路,仅“内存放置”不足以稳定收益。 +- **强制 pinned slab 固定在 NUMA0**:makespan `7.870s`(日志:`/tmp/tensorcast_tp4_c_concurrent_bind0_rank{0,1,2,3}.log`) +- **强制 pinned slab 固定在 NUMA1**:makespan `7.085s`(日志:`/tmp/tensorcast_tp4_c_concurrent_bind1_rank{0,1,2,3}.log`) + - 结论:把所有 rank 的 pinned slab 固定到单一 NUMA node 会明显变差(尤其 NUMA0),符合预期:至少有一部分 GPU/CPU 会被迫走跨 socket 路径。 +- **CPU affinity(taskset)+ pinned slab NUMA auto**:makespan `6.853s`,max `T(commit)=3.611s`(日志:`/tmp/tensorcast_tp4_c_concurrent_taskset_numa_rank{0,1,2,3}.log`) + - 对照:仅 taskset(不控 pinned NUMA)makespan `7.018s`(日志:`/tmp/tensorcast_tp4_c_concurrent_taskset_default_rank{0,1,2,3}.log`) + - 结论:**NUMA 的收益需要“线程/CPU 亲和性 + pinned 内存放置”同时成立**,否则很容易被 page cache 放置、线程调度与跨 socket copy 抵消。 + +初步判断(是否值得做“集中读取 + 分发”): +- 在“热 cache + 本地并发”条件下,仅靠 NUMA 放置/绑核可以带来 **~1–2%(makespan)到 ~6%(commit)** 的改进量级; +- 但 Strategy C 的核心瓶颈(TP=4 下约 `~2×` 读放大)并不会因 NUMA 调整消失;若目标是更大幅度降低 makespan,仍需要从 **减少总读量/避免多 rank 重复读** 的方向(例如集中读取 + NVLink 分发,或 Strategy D 预物化)入手。 + ### 5.4 TP=4(Strategy D:预物化 + 冷读加载) 说明:D 分成“首次生成”和“后续加载”两阶段;其中后续加载可无限复用。 @@ -275,6 +336,29 @@ - 每 rank `~1.49 GiB/s`(`/tmp/tensorcast_d_load_tp4_concurrent_rank{0,1,2,3}.log`) - 聚合 `~5.96 GiB/s`,已几乎贴近冷读端到端上限 `5.981 GiB/s` +### 5.5 补充:Strategy C_host_pack(对照实验:Host pack vs GPU staging pack) + +说明: +- 目的:验证“把 pack 从 GPU 挪到 host,以减少 `bytes(h2d)`”是否能缩短 makespan。 +- 控制变量:同一组参数(`io_threads=4`,`bbuf_size_kb=262144`,`buffer_chunks=8`,`strategy_c_staging_bytes=1GiB`),并先用 `safetensors_hot_host_baseline` 显式预热 page cache。 +- 日志目录(新增,对照实验保存到数据集目录下):`/mnt/host0/Qwen2.5-32B-Instruct/tensorcast_bench_logs/20260119_host_pack_compare/` + +结果(越小越好;makespan 取各 rank 的 `T(total_ready)` 最大值): + +- TP=2(2 ranks 并发): + - C(GPU staging + D2D pack):makespan **6.822s**(`tp2_c_rank{0,1}.log`) + - `bytes(h2d)=40.514GiB`,`T(pack)=0.016s` + - C_host_pack(CPU pack + 1D H2D):makespan **9.883s**(`tp2_host_pack_rank{0,1}.log`) + - `bytes(h2d)=30.514GiB`(降低 25%),但 `T(pack)=1.518–1.755s`(CPU pack 成为新瓶颈) + +- TP=4(4 ranks 并发): + - C(GPU staging + D2D pack):makespan **8.160s**(`tp4_c_rank{0,1,2,3}.log`) + - `bytes(h2d)=30.258GiB`,`T(pack)=0.009–0.011s` + - C_host_pack(CPU pack + 1D H2D):makespan **8.897s**(`tp4_host_pack_rank{0,1,2,3}.log`) + - `bytes(h2d)=15.258GiB`(约 2× 降低),但 `T(pack)=0.900–0.937s` + +初步结论:在 Qwen2.5-32B 这组 loading-meta 下,**GPU pack 足够快**(C 的 `T(pack)` < 0.02s),因此 “减少 H2D 字节数” 带来的 `T(commit)` 改善不足以抵消 CPU pack 的额外开销;C_host_pack 目前整体更慢。 + --- ## 6. 瓶颈与上限差距分析(按策略/阶段) @@ -430,6 +514,26 @@ TP>1 时,C 的主瓶颈回到 **“顺序/大块 IO 链路带宽”**(这是 这也解释了 D “理论最优”的意义:它把问题从“访问形态导致的 IOPS/碎片化/读放大”收敛为“顺序读带宽”问题,并且在本机上已经打到冷读带宽的天花板。 +### 6.5 Strategy C_host_pack:H2D 减少了,但 CPU pack 取代 GPU pack 成为瓶颈 + +以本次对照实验(`/mnt/host0/Qwen2.5-32B-Instruct/tensorcast_bench_logs/20260119_host_pack_compare/`)为例: + +- TP=4: + - C:`bytes(h2d)=30.258GiB`,但 `T(pack)≈0.01s`(GPU `cudaMemcpy2DAsync(D2D)` pack 基本“免费”) + - C_host_pack:`bytes(h2d)=15.258GiB`(理论上 H2D 时间可减半),但引入 `T(pack)≈0.90–0.94s` + - 观察到 `T(commit)` 确实下降(约 `4.78–4.81s` → `4.20–4.45s`),但节省量(`~0.4–0.6s`)小于新增 CPU pack(`~0.9s`),因此 makespan 反而上升。 + +- TP=2: + - C_host_pack 只减少了 `~10GiB` 的 H2D(40.5→30.5GiB),但 CPU pack 需要搬 `~30.5GiB`,`T(pack)=1.5–1.8s`,因此退化更明显。 + +瓶颈判断(为什么会这样): +- 在当前实现里,CPU pack 本质是“DRAM→pinned 的 memcpy(按行拷贝)”,吞吐落在 `~16–20 GiB/s`,接近 `safetensors_dram_mirror_host_baseline` 测到的 userspace memcpy 上限量级;因此它很容易成为新增的主瓶颈。 +- 反过来,GPU pack(C 的 staged 路径)非常快:因为它在 GPU 内完成(HBM + copy engine),且通过 `cudaMemcpy2DAsync` 合并了大量小拷贝调用,成本几乎可以忽略。 + +工程含义: +- **默认仍推荐 Strategy C(GPU staging + D2D pack)**:它把最重的“scatter/pack”留在 GPU 内完成,通常更稳。 +- C_host_pack 更像一个“特殊条件下的 trade-off/兜底”:当 staging 显存不足、或 H2D/NUMA 异常导致 PCIe 变成硬瓶颈时,才可能出现净收益。 + --- ## 7. 结论与建议 @@ -440,7 +544,9 @@ TP>1 时,C 的主瓶颈回到 **“顺序/大块 IO 链路带宽”**(这是 - **B**:严重偏离上限(~25×),根因是 axis=1 引发的碎片化小读。 - **A**:读全 payload 导致“理论下限”就偏高;但 A 的 pack 已通过 `cudaMemcpy2DAsync` 合并达到近似最优(TP=4 pack 3.36s → 0.02s)。 - **C**:综合最优,且关键路径已回到带宽上限附近;仍有约 1.3–1.6× 的进一步优化空间(主要是 overlap 与 CPU/同步开销)。 + - **C_host_pack(对照)**:能显著降低 `bytes(h2d)`,但在本模型的 TP=2/4 实测中,CPU pack 变成新瓶颈,整体更慢;更适合作为“显存不足/H2D 异常时的 trade-off/兜底”。 - **D**(预物化):如果你会重复加载同一个 `loading-meta.json`(相同 TP/同一份计划),D 的“后续加载”可以把问题直接收敛为“顺序读带宽”,并且在本机上已经基本打到冷读端到端上限;代价是首次生成需要额外的 D2H+落盘成本与磁盘空间。 + - **集中读取 + NVLink 分发/去重(在线版的“减少总读量”)**:在“单 NVMe + 多 rank 并发冷读”场景,理论最大收益基本等于 Strategy C 的总读放大(TP=2 约 `1.33×`,TP=4 约 `1.98×`),上限接近 D 的后续加载(`~10.2s`);但在热 cache/本机并发条件下,单一 reader 可能丢掉“多进程并行 pread”带来的 host 侧吞吐,需要引入按 socket 分区的 reader(NUMA-aware)与更强 overlap 才能稳定获益。 ### 7.2 如果目标是进一步逼近上限(优先级建议) 1) **面向 C:做 IO 与 pack 的更强重叠**(双 staging buffer,减少每个 chunk 的同步点) @@ -492,6 +598,107 @@ bazel run //core/store/materialization/benchmarks:safetensors_load_strategy_benc ``` 日志:`/tmp/tensorcast_h2d_numa_tp4_default.log`、`/tmp/tensorcast_h2d_numa_tp4_bind0.log`、`/tmp/tensorcast_h2d_numa_tp4_bind1.log` +H2D 2D 基线(模拟 `axis=1` 列切片;`cudaMemcpy2DAsync(H2D)`): +```bash +# 典型 MLP gate/up([rows=5120, cols=27648], fp16; TP=4 slice_cols=6912) +bazel run //core/store/materialization/benchmarks:safetensors_load_strategy_benchmark -- \ + --mode=h2d_2d_baseline \ + --tp_world_size=1 --tp_devices=0 \ + --h2d_bench_bytes=$((8*1024*1024*1024)) \ + --h2d_2d_width_bytes=13824 --h2d_2d_height=5120 \ + --h2d_2d_src_pitch_bytes=55296 --h2d_2d_dst_pitch_bytes=13824 + +# 典型 QKV([rows=5120, cols=15360], fp16; TP=4 slice_cols=3840) +bazel run //core/store/materialization/benchmarks:safetensors_load_strategy_benchmark -- \ + --mode=h2d_2d_baseline \ + --tp_world_size=1 --tp_devices=0 \ + --h2d_bench_bytes=$((8*1024*1024*1024)) \ + --h2d_2d_width_bytes=7680 --h2d_2d_height=5120 \ + --h2d_2d_src_pitch_bytes=30720 --h2d_2d_dst_pitch_bytes=7680 + +# 典型 dense([rows=5120, cols=5120], fp16; TP=4 slice_cols=1280) +bazel run //core/store/materialization/benchmarks:safetensors_load_strategy_benchmark -- \ + --mode=h2d_2d_baseline \ + --tp_world_size=1 --tp_devices=0 \ + --h2d_bench_bytes=$((8*1024*1024*1024)) \ + --h2d_2d_width_bytes=2560 --h2d_2d_height=5120 \ + --h2d_2d_src_pitch_bytes=10240 --h2d_2d_dst_pitch_bytes=2560 +``` +日志:`/tmp/tensorcast_h2d_2d_axis1_tp1_default.log`、`/tmp/tensorcast_h2d_2d_axis1_tp1_qkv_w7680.log`、`/tmp/tensorcast_h2d_2d_axis1_tp1_dense_w2560.log` + +TP=4 Strategy C 多 rank 并发(NUMA 对照;建议先热读预热 page cache): +```bash +# 先预热 page cache(确保后续 loader 处于“热读”条件) +./bazel-bin/core/store/materialization/benchmarks/safetensors_load_strategy_benchmark \\ + --mode=safetensors_hot_host_baseline \\ + --safetensors_dir=/mnt/host0/Qwen2.5-32B-Instruct \\ + --io_threads=4 --bbuf_size_kb=262144 --buffer_chunks=8 --use_pinned_host_buffer=true + +# 默认:不控 NUMA +for r in 0 1 2 3; do + ./bazel-bin/core/store/materialization/benchmarks/safetensors_load_strategy_benchmark \\ + --mode=loader --strategy=c \\ + --safetensors_dir=/mnt/host0/Qwen2.5-32B-Instruct \\ + --load_plan_json_path=/tmp/vllm-loading-meta-Qwen2.5-32B-Instruct-20260117-211516/tp4/loading-meta.json \\ + --tp_world_size=4 --tp_rank=$r --device_id=$r \\ + --io_threads=4 --bbuf_size_kb=262144 --buffer_chunks=8 --use_pinned_host_buffer=true \\ + --strategy_c_staging_bytes=$((1024*1024*1024)) \\ + --pinned_numa_node=-1 --pinned_numa_prefault=false \\ + 2>&1 | tee /tmp/tensorcast_tp4_c_concurrent_default_rank${r}.log & +done +wait + +# 对照:C_host_pack(日志保存到模型目录内;本次对照实验用这个目录) +LOGDIR=/mnt/host0/Qwen2.5-32B-Instruct/tensorcast_bench_logs/20260119_host_pack_compare +for r in 0 1 2 3; do + ./bazel-bin/core/store/materialization/benchmarks/safetensors_load_strategy_benchmark \\ + --mode=loader --strategy=host_pack \\ + --safetensors_dir=/mnt/host0/Qwen2.5-32B-Instruct \\ + --load_plan_json_path=/tmp/vllm-loading-meta-Qwen2.5-32B-Instruct-20260117-211516/tp4/loading-meta.json \\ + --tp_world_size=4 --tp_rank=$r --device_id=$r \\ + --io_threads=4 --bbuf_size_kb=262144 --buffer_chunks=8 --use_pinned_host_buffer=true \\ + --strategy_c_staging_bytes=$((1024*1024*1024)) \\ + --pinned_numa_node=-2 --pinned_numa_prefault=true \\ + 2>&1 | tee ${LOGDIR}/tp4_host_pack_rank${r}.log & +done +wait + +# NUMA-aware(auto)+ CPU affinity(按 nvidia-smi topo -m:GPU0/1/2 → CPUs 0-31,GPU3 → CPUs 32-63) +taskset -c 0-31 ./bazel-bin/core/store/materialization/benchmarks/safetensors_load_strategy_benchmark \\ + --mode=loader --strategy=c --safetensors_dir=/mnt/host0/Qwen2.5-32B-Instruct \\ + --load_plan_json_path=/tmp/vllm-loading-meta-Qwen2.5-32B-Instruct-20260117-211516/tp4/loading-meta.json \\ + --tp_world_size=4 --tp_rank=0 --device_id=0 \\ + --io_threads=4 --bbuf_size_kb=262144 --buffer_chunks=8 --use_pinned_host_buffer=true \\ + --strategy_c_staging_bytes=$((1024*1024*1024)) \\ + --pinned_numa_node=-2 --pinned_numa_prefault=true \\ + 2>&1 | tee /tmp/tensorcast_tp4_c_concurrent_taskset_numa_rank0.log & +taskset -c 0-31 ./bazel-bin/core/store/materialization/benchmarks/safetensors_load_strategy_benchmark \\ + --mode=loader --strategy=c --safetensors_dir=/mnt/host0/Qwen2.5-32B-Instruct \\ + --load_plan_json_path=/tmp/vllm-loading-meta-Qwen2.5-32B-Instruct-20260117-211516/tp4/loading-meta.json \\ + --tp_world_size=4 --tp_rank=1 --device_id=1 \\ + --io_threads=4 --bbuf_size_kb=262144 --buffer_chunks=8 --use_pinned_host_buffer=true \\ + --strategy_c_staging_bytes=$((1024*1024*1024)) \\ + --pinned_numa_node=-2 --pinned_numa_prefault=true \\ + 2>&1 | tee /tmp/tensorcast_tp4_c_concurrent_taskset_numa_rank1.log & +taskset -c 0-31 ./bazel-bin/core/store/materialization/benchmarks/safetensors_load_strategy_benchmark \\ + --mode=loader --strategy=c --safetensors_dir=/mnt/host0/Qwen2.5-32B-Instruct \\ + --load_plan_json_path=/tmp/vllm-loading-meta-Qwen2.5-32B-Instruct-20260117-211516/tp4/loading-meta.json \\ + --tp_world_size=4 --tp_rank=2 --device_id=2 \\ + --io_threads=4 --bbuf_size_kb=262144 --buffer_chunks=8 --use_pinned_host_buffer=true \\ + --strategy_c_staging_bytes=$((1024*1024*1024)) \\ + --pinned_numa_node=-2 --pinned_numa_prefault=true \\ + 2>&1 | tee /tmp/tensorcast_tp4_c_concurrent_taskset_numa_rank2.log & +taskset -c 32-63 ./bazel-bin/core/store/materialization/benchmarks/safetensors_load_strategy_benchmark \\ + --mode=loader --strategy=c --safetensors_dir=/mnt/host0/Qwen2.5-32B-Instruct \\ + --load_plan_json_path=/tmp/vllm-loading-meta-Qwen2.5-32B-Instruct-20260117-211516/tp4/loading-meta.json \\ + --tp_world_size=4 --tp_rank=3 --device_id=3 \\ + --io_threads=4 --bbuf_size_kb=262144 --buffer_chunks=8 --use_pinned_host_buffer=true \\ + --strategy_c_staging_bytes=$((1024*1024*1024)) \\ + --pinned_numa_node=-2 --pinned_numa_prefault=true \\ + 2>&1 | tee /tmp/tensorcast_tp4_c_concurrent_taskset_numa_rank3.log & +wait +``` + 基线(全 payload 顺序读到 GPU): ```bash bazel run //core/store/materialization/benchmarks:safetensors_load_strategy_benchmark -- \ @@ -560,6 +767,18 @@ bazel run //core/store/materialization/benchmarks:safetensors_load_strategy_benc ``` 日志:`/tmp/tensorcast_vllm_tp4_c_rank1.log` +TP=4 Strategy C_host_pack(对照:CPU pack + 1D H2D;示例取 rank1): +```bash +bazel run //core/store/materialization/benchmarks:safetensors_load_strategy_benchmark -- \ + --mode=loader --strategy=host_pack \ + --safetensors_dir=/mnt/host0/Qwen2.5-32B-Instruct \ + --load_plan_json_path=/tmp/vllm-loading-meta-Qwen2.5-32B-Instruct-20260117-211516/tp4/loading-meta.json \ + --tp_world_size=4 --tp_rank=1 --device_id=1 \ + --io_threads=4 --bbuf_size_kb=262144 --buffer_chunks=8 --use_pinned_host_buffer=true \ + --strategy_c_staging_bytes=$((1024*1024*1024)) +``` +日志(本次对照实验保存到模型目录内):`/mnt/host0/Qwen2.5-32B-Instruct/tensorcast_bench_logs/20260119_host_pack_compare/tp4_host_pack_rank1.log` + Strategy D:TP=4 首次生成(materialize_d,以 rank0 为例) ```bash bazel run //core/store/materialization/benchmarks:safetensors_load_strategy_benchmark -- \