Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
2afb712
chore(byte-artifact): add verbose logs for batch_set observability
zhou-yuhan Apr 26, 2026
0327b09
feat(byte-artifact): remove full-pack mirror for RDMA-based batch_set
zhou-yuhan Apr 26, 2026
908628f
feat(byte-artifact): enable composite batch stage into final bodies f…
zhou-yuhan Apr 26, 2026
197bb7b
feat(byte-artifact): remove source-side full-pack slab and use direct…
zhou-yuhan Apr 27, 2026
36475ba
feat(byte-artifact): reuse stable HOST_SHARED MRs for put source views
zhou-yuhan Apr 27, 2026
72bbc10
chore(byte-artifact): downgrade high-frequency observability logs to …
zhou-yuhan Apr 27, 2026
431b559
docs: design weight broadcast scheduling
FernanDAlumin Apr 29, 2026
83a1584
docs: plan weight broadcast scheduling
FernanDAlumin Apr 29, 2026
9b839ec
feat(sdk): add transport scheduling group context
FernanDAlumin Apr 29, 2026
0ef3978
feat(sdk): derive prefetch transport group hints
FernanDAlumin Apr 29, 2026
d3e1cf7
docs: align broadcast plan with ignored proto outputs
FernanDAlumin Apr 29, 2026
c7d619c
feat(proto): add materialize replica transport hints
FernanDAlumin Apr 29, 2026
f4c34ab
feat(sdk): forward prefetch transport hints to daemon
FernanDAlumin Apr 29, 2026
9529a15
feat(daemon): map materialize transport hints
FernanDAlumin Apr 29, 2026
f232f05
feat(plan): propagate transport group hints
FernanDAlumin Apr 29, 2026
ff787da
docs: document weight prefetch transport groups
FernanDAlumin Apr 29, 2026
e9187e3
chore(communicator): simplify transfer progress formatting
FernanDAlumin Apr 29, 2026
7780e04
docs: design tree broadcast phase2
FernanDAlumin Apr 29, 2026
749cdb1
docs: plan tree broadcast phase2
FernanDAlumin Apr 29, 2026
8ac8c82
feat(global-store): add broadcast state repository
FernanDAlumin Apr 29, 2026
c7a0561
fix(global-store): enforce broadcast repository invariants
FernanDAlumin Apr 29, 2026
1f3c281
fix(global-store): harden broadcast edge transitions
FernanDAlumin Apr 29, 2026
8db320a
feat(global-store): add broadcast session rpc
FernanDAlumin Apr 29, 2026
462308c
fix(global-store): align broadcast rpc contract
FernanDAlumin Apr 29, 2026
9d78a4c
fix(global-store): stabilize broadcast create proto fields
FernanDAlumin Apr 29, 2026
323075c
fix(global-store): harden broadcast session creation
FernanDAlumin Apr 29, 2026
dbea72e
feat(global-store): route broadcast transports through tree edges
FernanDAlumin Apr 29, 2026
c7a3fb8
fix(global-store): harden broadcast transport lifecycle
FernanDAlumin Apr 29, 2026
5cd916c
fix(global-store): ignore stale broadcast edge failures
FernanDAlumin Apr 29, 2026
7ab9aeb
fix(global-store): retry ineligible broadcast parents
FernanDAlumin Apr 29, 2026
e7fcdef
fix(global-store): recycle stale broadcast request ids
FernanDAlumin Apr 29, 2026
a785ff9
fix(global-store): reject exhausted broadcast replays
FernanDAlumin Apr 29, 2026
72fc42c
fix(global-store): preserve legacy transport fingerprints
FernanDAlumin Apr 29, 2026
bf58013
fix(global-store): stop cancelled broadcast advancement
FernanDAlumin Apr 29, 2026
c2aae7b
feat(materialize): propagate broadcast session hints
FernanDAlumin Apr 29, 2026
abdc148
fix(api): export broadcast context at top level
FernanDAlumin Apr 29, 2026
c21cce7
fix(api): propagate broadcast context through materialization
FernanDAlumin Apr 29, 2026
cb84e57
feat(core): enforce broadcast transport parent hints
FernanDAlumin Apr 29, 2026
6c1f667
fix(core): update global store client test stubs
FernanDAlumin Apr 29, 2026
7c80302
fix(core): preserve broadcast transport compatibility
FernanDAlumin Apr 29, 2026
f21a48d
feat(daemon): expose broadcast session creation
FernanDAlumin Apr 29, 2026
d1f0b92
test: cover broadcast tree dissemination
FernanDAlumin Apr 29, 2026
e1a1f70
fix(core): harden broadcast tree materialization
FernanDAlumin Apr 30, 2026
2f5d6fd
style: format C++ sources
FernanDAlumin Apr 30, 2026
89f405b
fix: Recover torch and pyyaml versions in pyproject.toml
FernanDAlumin May 6, 2026
a9663a5
style: fix ruff I001 import ordering
FernanDAlumin May 6, 2026
35dcccf
style: format global store repositories
FernanDAlumin May 6, 2026
21577ff
fix(types): resolve pyright errors and __all__ warnings
FernanDAlumin May 6, 2026
4c8be02
fix(types): narrow cursor type in BroadcastRepository
FernanDAlumin May 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 177 additions & 4 deletions core/communicator/engine/engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1013,15 +1013,23 @@ StagingWindow::StageFn MakeStageFunction(
v1::RdmaConfig::StagedRdmaBackend staged_backend,
bool use_direct,
ibv_mr* direct_mr,
std::shared_ptr<void> direct_keepalive,
std::shared_ptr<RdmaSourceStageProfile> source_stage_profile) {
if (use_direct) {
const uint64_t base_addr = tensor->get_uint64_addr();
const uint64_t tensor_bytes = tensor->get_bytes();
return [tensor, ledger, request_key = std::move(request_key), base_addr, tensor_bytes, direct_mr](
return [tensor,
ledger,
request_key = std::move(request_key),
base_addr,
tensor_bytes,
direct_mr,
direct_keepalive = std::move(direct_keepalive)](
uint64_t offset, uint32_t bytes, uint32_t /*segment_idx*/) -> absl::StatusOr<StageLease> {
if (offset + bytes > tensor_bytes) {
return absl::OutOfRangeError("direct RDMA stage exceeded tensor bounds");
}
(void)direct_keepalive;
void* device_ptr = reinterpret_cast<void*>(base_addr + offset);
StageLease::Metadata metadata;
metadata.transport = StageTransport::kRdma;
Expand Down Expand Up @@ -1956,6 +1964,72 @@ absl::Status Communicator::wait_for_tensor_reads_to_drain(const std::string& ten
return absl::OkStatus();
}

// Looks up the stable-backed source view registered for `tensor_key`.
//
// Takes `stable_source_views_mu_` for the duration of the lookup and returns
// a shared reference to the stored state, or nullptr when no view has been
// registered under that key.
std::shared_ptr<Communicator::StableLocalBackingSourceViewState> Communicator::lookup_stable_local_backing_source_view(
    const std::string& tensor_key) const {
  absl::MutexLock lock(&stable_source_views_mu_);
  const auto entry = stable_source_views_.find(tensor_key);
  if (entry != stable_source_views_.end()) {
    return entry->second;
  }
  return nullptr;
}

// Ensures a memory region on `dev` that covers the address range of `view`.
//
// Steps:
//   1. Find the backing state for `view->backing.backing_id` in
//      `stable_local_backings_` (under `stable_local_backings_mu_`).
//   2. Acquire a use handle on that backing; the handle is returned to the
//      caller in `backing_use`.
//   3. Ask the backing state to ensure the chunk covering
//      [view->addr, view->addr + view->bytes) for the device's rail.
//
// `tensor_key` is only used for the diagnostic log line.
//
// Errors:
//   InvalidArgument     - `view` or `dev` is null.
//   FailedPrecondition  - the backing is not in `stable_local_backings_`, or
//                         `acquire_use()` returned null.
//   Internal            - the ensured chunk carries a null MR.
//   Any status returned by `ensure_chunk` is propagated unchanged.
absl::StatusOr<Communicator::StableSourceViewMrEnsureResult> Communicator::ensure_stable_local_backing_source_view_mr(
    const std::string& tensor_key,
    const std::shared_ptr<StableLocalBackingSourceViewState>& view,
    const net_dev_t& dev) {
  if (view == nullptr) {
    return absl::InvalidArgumentError("stable-backed source view is required");
  }
  if (dev == nullptr) {
    return absl::InvalidArgumentError("stable-backed source view requires an RDMA device");
  }

  const auto& backing_ref = view->backing;

  // Only the map lookup happens under the lock; the state is used afterwards
  // through its own shared reference.
  std::shared_ptr<StableLocalBackingState> backing_state;
  {
    absl::MutexLock lock(&stable_local_backings_mu_);
    if (const auto found = stable_local_backings_.find(backing_ref.backing_id);
        found != stable_local_backings_.end()) {
      backing_state = found->second;
    }
  }
  if (backing_state == nullptr) {
    return absl::FailedPreconditionError("stable-backed source view backing is not active");
  }

  auto use_handle = backing_state->acquire_use();
  if (use_handle == nullptr) {
    return absl::FailedPreconditionError("stable-backed source view backing is retiring");
  }

  // A configured slot count of 0 is treated as 1.
  const uint32_t slots_per_chunk = std::max<uint32_t>(1, stable_local_mr_reuse_chunk_slots_);
  // Prewarm flags are sampled before the chunk is ensured.
  const bool was_prewarm_requested = backing_state->prewarm_requested_enabled();
  const bool was_prewarm_complete = backing_state->prewarm_complete_for_test();

  auto ensured = backing_state->ensure_chunk(
      dev, dev->get_rail_id(), backing_ref.slot_bytes, slots_per_chunk, view->addr, view->bytes);
  if (!ensured.ok()) {
    return ensured.status();
  }
  const auto& chunk = *ensured;
  if (chunk.mr == nullptr) {
    return absl::InternalError("stable-backed source view chunk MR is null");
  }

  VLOG(2) << "communicator.read_plan_stable_source_prepare"
          << " tensor_key=" << tensor_key << " backing_id=" << backing_ref.backing_id
          << " rail_id=" << chunk.rail_id << " nic=" << chunk.nic_name
          << " chunk_index=" << chunk.chunk_index << " cache_hit=" << chunk.cache_hit
          << " waited_on_inflight=" << chunk.waited_on_inflight << " registered_now=" << chunk.registered_now
          << " prewarm_requested=" << was_prewarm_requested << " prewarm_complete=" << was_prewarm_complete;

  return StableSourceViewMrEnsureResult{
      .mr = chunk.mr,
      .backing_use = use_handle,
      .backing_id = backing_ref.backing_id,
      .chunk_index = chunk.chunk_index,
      .cache_hit = chunk.cache_hit,
      .waited_on_inflight = chunk.waited_on_inflight,
      .registered_now = chunk.registered_now,
      .prewarm_requested = was_prewarm_requested,
      .prewarm_complete = was_prewarm_complete,
  };
}

std::shared_ptr<Communicator::TransferProgressState> Communicator::create_transfer_progress_state(
std::string transfer_id,
std::string request_key,
Expand Down Expand Up @@ -2033,7 +2107,7 @@ uint64_t Communicator::add_transfer_progress_bytes(
const double progress_percent =
state->total_bytes > 0 ? static_cast<double>(done) * 100.0 / static_cast<double>(state->total_bytes) : 100.0;
LOG(INFO) << std::format(
"[xfer_progress] side={} transport={} state=progress peer={} request={} bar=[{}] {:5.1f}% "
"[xfer_progress] side={} transport={} state=progress peer={} request={} bar=[{}] {:.1f}% "
"done_gib={:.3f}/{:.3f} rate_inst_gibps={:.3f} rate_avg_gibps={:.3f}",
state->side,
state->transport,
Expand Down Expand Up @@ -2077,7 +2151,7 @@ void Communicator::finish_transfer_progress(
const std::string phase = status.ok() ? "done" : "failed";
const std::string status_text = status.ok() ? std::string() : truncate_token(status.message(), 120);
const std::string line = std::format(
"[xfer_progress] side={} transport={} state={} peer={} request={} bar=[{}] {:5.1f}% "
"[xfer_progress] side={} transport={} state={} peer={} request={} bar=[{}] {:.1f}% "
"done_gib={:.3f}/{:.3f} rate_inst_gibps={:.3f} rate_avg_gibps={:.3f}{}",
state->side,
state->transport,
Expand Down Expand Up @@ -2729,6 +2803,7 @@ absl::StatusOr<std::shared_ptr<RdmaReadSession>> Communicator::admit_read_plan_r
v1::RdmaConfig::StagedRdmaBackend staged_backend = v1::RdmaConfig::STAGED_RDMA_BACKEND_HOST_PINNED;
bool direct_eligible = false;
ibv_mr* direct_mr = nullptr;
std::shared_ptr<void> direct_keepalive;
};

std::vector<PlannedSourceSlice> resolved_slices;
Expand All @@ -2752,6 +2827,7 @@ absl::StatusOr<std::shared_ptr<RdmaReadSession>> Communicator::admit_read_plan_r
}
read_guards->push_back(*read_guard_or);
tensor->wait_read_ready();
auto stable_source_view = lookup_stable_local_backing_source_view(tensor_key);

auto dev = task.request.rail_id >= 0 ? tensor->get_dev_by_rail(task.request.rail_id) : nullptr;
if (dev == nullptr) {
Expand All @@ -2762,6 +2838,15 @@ absl::StatusOr<std::shared_ptr<RdmaReadSession>> Communicator::admit_read_plan_r
}
}
const bool tensor_on_cpu = tensor->get_mem_type() == COMMUNICATE_ENGINE_DEV_CPU;
if (dev == nullptr && tensor_on_cpu && stable_source_view != nullptr) {
const int requested_rail = task.request.rail_id >= 0
? task.request.rail_id
: (session->dev != nullptr ? session->dev->get_rail_id() : -1);
dev = get_net_dev(COMMUNICATE_ENGINE_DEV_CPU, 0, tensor_key, requested_rail);
if (dev != nullptr) {
tensor->add_dev(dev);
}
}
if (tensor_on_cpu && session->dev != nullptr &&
(dev == nullptr || session->dev->get_name() != dev->get_name() ||
session->dev->get_rail_id() != dev->get_rail_id())) {
Expand All @@ -2770,6 +2855,9 @@ absl::StatusOr<std::shared_ptr<RdmaReadSession>> Communicator::admit_read_plan_r
<< " selected_nic=" << (dev != nullptr ? dev->get_name() : "<none>")
<< " session_rail=" << session->dev->get_rail_id() << " session_nic=" << session->dev->get_name();
dev = session->dev;
if (stable_source_view != nullptr && tensor->get_dev_by_rail(dev->get_rail_id()) == nullptr) {
tensor->add_dev(dev);
}
}
if (dev == nullptr) {
return absl::FailedPreconditionError(absl::StrCat("read_plan tensor missing RDMA device: ", tensor_key));
Expand Down Expand Up @@ -2805,6 +2893,7 @@ absl::StatusOr<std::shared_ptr<RdmaReadSession>> Communicator::admit_read_plan_r
const bool direct_requested = tensor->direct_rdma_enabled();
bool direct_eligible = false;
ibv_mr* direct_mr = nullptr;
std::shared_ptr<void> direct_keepalive;
DirectFallbackReason fallback_reason = DirectFallbackReason::kNone;
if (direct_requested) {
const bool direct_mem_supported = tensor_on_cpu || tensor->get_mem_type() == COMMUNICATE_ENGINE_DEV_GPU;
Expand All @@ -2813,7 +2902,18 @@ absl::StatusOr<std::shared_ptr<RdmaReadSession>> Communicator::admit_read_plan_r
} else if (tensor->needs_staging()) {
fallback_reason = DirectFallbackReason::kNeedsStaging;
} else {
if (tensor_on_cpu) {
if (tensor_on_cpu && stable_source_view != nullptr) {
auto ensure_result_or = ensure_stable_local_backing_source_view_mr(tensor_key, stable_source_view, dev);
if (!ensure_result_or.ok()) {
return ensure_result_or.status();
}
direct_mr = ensure_result_or->mr;
direct_keepalive = std::move(ensure_result_or->backing_use);
direct_eligible = direct_mr != nullptr;
if (!direct_eligible) {
fallback_reason = DirectFallbackReason::kMrUnavailable;
}
} else if (tensor_on_cpu) {
auto ensure_result_or = ensure_tensor_registered_on_dev(tensor, dev);
if (!ensure_result_or.ok()) {
fallback_reason = DirectFallbackReason::kMrUnavailable;
Expand Down Expand Up @@ -2863,6 +2963,7 @@ absl::StatusOr<std::shared_ptr<RdmaReadSession>> Communicator::admit_read_plan_r
.staged_backend = staged_backend,
.direct_eligible = direct_eligible,
.direct_mr = direct_mr,
.direct_keepalive = direct_keepalive,
});
if (source.bytes > std::numeric_limits<uint64_t>::max() - total_bytes) {
return absl::InvalidArgumentError("read_plan source byte count overflow");
Expand Down Expand Up @@ -2927,6 +3028,7 @@ absl::StatusOr<std::shared_ptr<RdmaReadSession>> Communicator::admit_read_plan_r
source.staged_backend,
use_direct,
source.direct_mr,
source.direct_keepalive,
session->source_stage_profile),
.chunk_size = chunk_size,
.zero_copy = use_direct,
Expand Down Expand Up @@ -3274,6 +3376,72 @@ absl::Status Communicator::deactivate_stable_local_backing(std::string_view back
return absl::OkStatus();
}

// Registers `view` as a directly readable source for `view.tensor_key`.
//
// After validating the view and resolving its chunk inside the (already
// active) backing, this creates a CPU PartitionTensor over
// [view.addr, view.addr + view.bytes) with no device attached, marks it
// read-ready with direct RDMA enabled, registers it in `store_`, and records
// the view state in `stable_source_views_` under `view.tensor_key`.
//
// Errors:
//   FailedPrecondition - RDMA is disabled / has no context, or the backing id
//                        is not present in `stable_local_backings_`.
//   InvalidArgument    - empty tensor key, zero addr/bytes, a backing that is
//                        not a CPU HOST_SHARED region with slot geometry, or a
//                        null keepalive.
//   Any status from `merge_activation_backing` or `resolve_chunk_for_region`
//   is propagated unchanged.
absl::Status Communicator::register_stable_local_backing_source_view(const StableLocalBackingSourceView& view) {
  const bool rdma_available = enable_rdma_ && rdma_context_ != nullptr;
  if (!rdma_available) {
    return absl::FailedPreconditionError("stable-backed source views require RDMA");
  }
  if (view.tensor_key.empty()) {
    return absl::InvalidArgumentError("stable-backed source view requires tensor_key");
  }
  const bool empty_range = view.addr == 0 || view.bytes == 0;
  if (empty_range) {
    return absl::InvalidArgumentError("stable-backed source view requires non-empty address range");
  }

  const auto& backing = view.backing;
  const bool is_cpu_host_shared = backing.kind == StableLocalBackingKind::kHostSharedRegion &&
                                  backing.dev_type == COMMUNICATE_ENGINE_DEV_CPU;
  const bool has_geometry = !backing.backing_id.empty() && backing.backing_base_addr != 0 &&
                            backing.backing_bytes != 0 && backing.slot_bytes != 0;
  if (!is_cpu_host_shared || !has_geometry) {
    return absl::InvalidArgumentError("stable-backed source view requires CPU HOST_SHARED backing with slot geometry");
  }
  if (view.keepalive == nullptr) {
    return absl::InvalidArgumentError("stable-backed source view requires keepalive");
  }

  // Only the map lookup is done under the backing lock.
  std::shared_ptr<StableLocalBackingState> backing_state;
  {
    absl::MutexLock lock(&stable_local_backings_mu_);
    if (const auto found = stable_local_backings_.find(backing.backing_id); found != stable_local_backings_.end()) {
      backing_state = found->second;
    }
  }
  if (backing_state == nullptr) {
    return absl::FailedPreconditionError("stable-backed source view backing is not active");
  }

  if (auto merged = backing_state->merge_activation_backing(backing, nullptr); !merged.ok()) {
    return merged;
  }

  // A configured slot count of 0 is treated as 1.
  const uint32_t slots_per_chunk = std::max<uint32_t>(1, stable_local_mr_reuse_chunk_slots_);
  auto region_chunk =
      backing_state->resolve_chunk_for_region(backing.slot_bytes, slots_per_chunk, view.addr, view.bytes);
  if (!region_chunk.ok()) {
    return region_chunk.status();
  }

  // The tensor starts without a device.
  auto source_tensor = std::make_shared<PartitionTensor>(
      view.tensor_key,
      view.addr,
      view.bytes,
      COMMUNICATE_ENGINE_DEV_CPU,
      /*dev=*/nullptr);
  source_tensor->set_read_ready();
  source_tensor->set_direct_rdma_enabled(true);
  store_.register_tensor(source_tensor);

  auto view_state = std::make_shared<StableLocalBackingSourceViewState>(StableLocalBackingSourceViewState{
      .addr = view.addr,
      .bytes = view.bytes,
      .backing = view.backing,
      .keepalive = view.keepalive,
  });
  {
    absl::MutexLock lock(&stable_source_views_mu_);
    stable_source_views_[view.tensor_key] = std::move(view_state);
  }

  VLOG(2) << "stable_local_backing.source_view_register"
          << " key=" << view.tensor_key << " backing_id=" << backing.backing_id << " addr=0x" << std::hex
          << view.addr << std::dec << " bytes=" << view.bytes << " slot_bytes=" << backing.slot_bytes
          << " chunk_index=" << region_chunk->chunk_index << " chunk_bytes=" << region_chunk->chunk_bytes;
  return absl::OkStatus();
}

// Test-only probe: true when RDMA is enabled and an RDMA context exists.
bool Communicator::stable_local_backing_supported_for_test() const {
  if (!enable_rdma_) {
    return false;
  }
  return rdma_context_ != nullptr;
}
Expand Down Expand Up @@ -4743,6 +4911,7 @@ absl::Status Communicator::handle_rdma_read_request(
staged_backend,
use_direct,
direct_mr,
nullptr,
session->source_stage_profile);
session->window =
std::make_unique<StagingWindow>(*ledger_ptr, stage_fn, total_bytes, chunk_size, start_offset, window_segments);
Expand Down Expand Up @@ -5058,6 +5227,10 @@ absl::Status Communicator::unregister_tensor(const std::string& tensor_key) {
} else {
store_.unregister_tensor(tensor_key);
}
{
absl::MutexLock lock(&stable_source_views_mu_);
stable_source_views_.erase(tensor_key);
}

{
absl::MutexLock lock(&tensor_read_mu_);
Expand Down
38 changes: 38 additions & 0 deletions core/communicator/engine/engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,16 @@ class Communicator {

absl::Status deactivate_stable_local_backing(std::string_view backing_id);

// Input to register_stable_local_backing_source_view(): one tensor-keyed
// address range that is served from a stable local backing.
struct StableLocalBackingSourceView {
  // Key the view's tensor is registered under; must be non-empty.
  std::string tensor_key;
  // Start address of the view; registration rejects 0.
  uint64_t addr{0};
  // Size of the view in bytes; registration rejects 0.
  uint64_t bytes{0};
  // Backing the range belongs to. Registration requires a CPU HOST_SHARED
  // region with a non-empty id and non-zero base address, size and slot size.
  tensorcast::store::StableLocalBackingRef backing;
  // Opaque handle stored alongside the view; registration rejects null.
  // NOTE(review): presumably keeps the underlying memory alive - confirm.
  std::shared_ptr<void> keepalive;
};

// Registers `view` as a direct-RDMA-enabled CPU source tensor under
// `view.tensor_key` and records its stable-backing view state.
// Returns FailedPrecondition when RDMA is unavailable or the backing id is
// not currently active, and InvalidArgument when the view is malformed
// (empty key, zero addr/bytes, non CPU HOST_SHARED backing, null keepalive).
absl::Status register_stable_local_backing_source_view(const StableLocalBackingSourceView& view);

// Test-only stable-backing introspection.
bool stable_local_backing_supported_for_test() const;
bool stable_local_backing_active_for_test(std::string_view backing_id) const;
Expand Down Expand Up @@ -294,6 +304,25 @@ class Communicator {
double gate_wait_ms = 0.0;
};

// Outcome of ensure_stable_local_backing_source_view_mr().
struct StableSourceViewMrEnsureResult {
  // Memory region of the chunk covering the view; non-null on success.
  struct ibv_mr* mr{nullptr};
  // Use handle acquired on the backing state before the chunk was ensured.
  std::shared_ptr<void> backing_use;
  // Id of the backing the view belongs to.
  std::string backing_id;
  // Index of the ensured chunk, as reported by the backing state.
  uint64_t chunk_index{0};
  // Chunk diagnostics copied from the backing state's ensure result.
  bool cache_hit{false};
  bool waited_on_inflight{false};
  bool registered_now{false};
  // Prewarm flags of the backing state, sampled before the chunk was ensured.
  bool prewarm_requested{false};
  bool prewarm_complete{false};
};

// Per-tensor record kept in `stable_source_views_` for a registered
// stable-backed source view. Fields are copied from the
// StableLocalBackingSourceView passed at registration time.
struct StableLocalBackingSourceViewState {
  // Start address of the view.
  uint64_t addr{0};
  // Size of the view in bytes.
  uint64_t bytes{0};
  // Backing the view belongs to; its id and slot size are used when ensuring
  // the covering chunk MR.
  tensorcast::store::StableLocalBackingRef backing;
  // Opaque handle supplied by the registrant.
  // NOTE(review): presumably keeps the underlying memory alive - confirm.
  std::shared_ptr<void> keepalive;
};

struct TensorReadState {
int inflight = 0;
bool retiring = false;
Expand All @@ -303,6 +332,12 @@ class Communicator {
absl::StatusOr<std::shared_ptr<void>> acquire_tensor_read_lease(const std::string& tensor_key);
void release_tensor_read_lease(const std::string& tensor_key);
absl::Status wait_for_tensor_reads_to_drain(const std::string& tensor_key, absl::Duration timeout);
// Returns the stable-backed source view state registered for `tensor_key`,
// or nullptr when none is registered. Acquires `stable_source_views_mu_`.
std::shared_ptr<StableLocalBackingSourceViewState> lookup_stable_local_backing_source_view(
const std::string& tensor_key) const;
// Ensures the chunk MR on `dev` that covers `view` and returns it together
// with a use handle on the backing. Fails with InvalidArgument for a null
// `view`/`dev`, FailedPrecondition when the backing is not active or no use
// handle can be acquired, and Internal when the ensured chunk has a null MR.
// `tensor_key` is only used for logging.
absl::StatusOr<StableSourceViewMrEnsureResult> ensure_stable_local_backing_source_view_mr(
const std::string& tensor_key,
const std::shared_ptr<StableLocalBackingSourceViewState>& view,
const transport::net_dev_t& dev);

struct MtcpReadTask {
channel_t channel;
Expand Down Expand Up @@ -457,6 +492,9 @@ class Communicator {
mutable absl::Mutex stable_local_backings_mu_;
absl::flat_hash_map<std::string, std::shared_ptr<StableLocalBackingState>> stable_local_backings_
ABSL_GUARDED_BY(stable_local_backings_mu_);
// Stable-backed source views keyed by tensor key. Entries are inserted by
// register_stable_local_backing_source_view() and erased by
// unregister_tensor(); all access goes through stable_source_views_mu_.
mutable absl::Mutex stable_source_views_mu_;
absl::flat_hash_map<std::string, std::shared_ptr<StableLocalBackingSourceViewState>> stable_source_views_
ABSL_GUARDED_BY(stable_source_views_mu_);

// Serialize channel creation to avoid duplicate control connections to same peer
mutable absl::Mutex create_channel_mu_;
Expand Down
Loading
Loading