Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions core/common/memory/pinned_memory_authority.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,11 @@ absl::Status PinnedMemoryAuthority::validate_and_build_pools() {
pools_.resize(classes_.size());
if (classes_.size() == 1) {
const auto& cls = classes_.front();
PinnedBufferPool::Options options{
.name = std::string(cls.name),
.register_on_create = !cfg_.defer_host_registration,
};
PinnedBufferPool::Options options;
options.name = std::string(cls.name);
options.numa_node = cls.numa_node;
options.prefault = cls.numa_prefault;
options.register_on_create = !cfg_.defer_host_registration;
pools_.front() = std::make_shared<PinnedBufferPool>(static_cast<size_t>(cls.pool_bytes), cls.slice_bytes, options);
return absl::OkStatus();
}
Expand Down
2 changes: 2 additions & 0 deletions core/common/memory/pinned_memory_authority.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ class PinnedMemoryAuthority {
// Phase 1 fixed-allocation: fully preallocated capacity of this class pool.
uint64_t pool_bytes = 0;
bool rdma_preregister = false;
int numa_node = -1;
bool numa_prefault = false;
};

struct Config {
Expand Down
2 changes: 1 addition & 1 deletion core/store/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ Implementation: `try_evict_gpu_memory_impl()` in store_engine.cc. CPU VS memory
- Content-addressed `mi2:...` artifact IDs require an authoritative source binding (Global Store routing, managed shared-disk location, or daemon-local disk import). Core no longer accepts string path hints.
- Per-GPU transfer concurrency is limited to 1 active session by design to reduce VRAM fragmentation and pressure.
- Canonical tensor indices are expected in schema version `"v3"`; the engine emits v3 descriptors and rejects older schemas on write paths.
- View replica registration still publishes canonical residency to Global Store. The engine issues a best-effort `record_view_residency` call so the daemon can start wiring the future RPC; until Global Store implements it the call returns `UNIMPLEMENTED` and is logged at `VLOG(1)`.
- View replica registration publishes minimal view residency to Global Store via `record_view_residency`. This residency record is queryable even when only `view_id`, `view_size`, and optional `view_data_hash` are known; explicit view registration paths still carry the full `view_spec_json` and coverage metadata.

For broader architectural context, see docs/architecture.md and docs/state-management.md.
- Verification metadata: canonical replicas reuse `verification.json`. Views persist per-view metadata under `verification.view_<sanitized_view_id>.json`; each record carries the `byte_space_id` so canonical metadata is never reused for a view. Every persisted payload now embeds a `metadata_signature` (canonical SHA-256 of the serialized payload). The loader re-reads the on-disk JSON on every materialization, validates the signature, and compares the payload fingerprint against any cached entry before reuse. Tampered or truncated files trigger `DataLoss` (cache is invalidated) and force regeneration, while cached entries are only reused when the file is absent and a fresh persistence will rewrite it. P2P senders may still provide inline `verification_json`; the backend `ingest_from_p2p()` path now flows through the runtime pipeline, which performs fast KEY_POINTS verification of the loaded replica (CPU/GPU). Verification failure returns a `DataLoss` error and aborts materialization. All metadata reads/writes are serialized through `VerificationMetadataGuard`, persisted via an atomic write helper (`open` → `write` → `fsync` → `rename` + directory `fsync`), and accompanied by structured `verification_metadata_write_{succeeded,failed}` logs that surface artifact, byte-space, guard wait, and write durations.
Expand Down
1 change: 1 addition & 0 deletions core/store/communication_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ struct P2PSource {
std::vector<std::string> memory_keys;
std::vector<size_t> buf_sizes;
bool enable_checksum = true;
bool source_is_view = false;
Location location;

// Optional verification metadata (JSON) passed from the sender side, e.g.,
Expand Down
96 changes: 87 additions & 9 deletions core/store/components/global_store_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,57 @@ const char* status_to_cstr(global_store::Status s) {
}
}

std::optional<ViewTransportMetadata> convert_view_transport_metadata(
const global_store::RequestReplicaTransportResponse& response) {
if (!response.has_view_transport_metadata()) {
return std::nullopt;
}
const auto& proto = response.view_transport_metadata();
if (proto.view_id().empty()) {
return std::nullopt;
}
ViewTransportMetadata metadata;
metadata.view_id = proto.view_id();
metadata.view_size_bytes = proto.view_size_bytes();
if (proto.has_view_data_hash() && !proto.view_data_hash().empty()) {
metadata.view_data_hash = proto.view_data_hash();
}
return metadata;
}

global_store::TransportRouteKind normalize_transport_route_kind(
global_store::TransportRouteKind route_kind,
const RemoteReplicaInfo& remote_replica,
std::optional<std::string_view> requested_view_id) {
if (route_kind != global_store::TRANSPORT_ROUTE_KIND_UNSPECIFIED) {
return route_kind;
}
if (requested_view_id.has_value() && remote_replica.view_id.has_value() &&
*remote_replica.view_id == std::string(*requested_view_id)) {
return global_store::TRANSPORT_ROUTE_KIND_RESIDENT_VIEW;
}
return global_store::TRANSPORT_ROUTE_KIND_CANONICAL;
}

std::optional<ViewTransportMetadata> normalize_view_transport_metadata(
const global_store::RequestReplicaTransportResponse& response,
const RemoteReplicaInfo& remote_replica,
std::optional<std::string_view> requested_view_id) {
auto metadata = convert_view_transport_metadata(response);
if (metadata.has_value()) {
return metadata;
}
if (requested_view_id.has_value() && remote_replica.view_id.has_value() &&
*remote_replica.view_id == std::string(*requested_view_id)) {
return ViewTransportMetadata{
.view_id = *remote_replica.view_id,
.view_size_bytes = remote_replica.memory_size,
.view_data_hash = std::nullopt,
};
}
return std::nullopt;
}

global_store::TransportCompletionOutcome to_proto_transport_completion_outcome(TransportCompletionOutcome outcome) {
switch (outcome) {
case TransportCompletionOutcome::kSuccess:
Expand Down Expand Up @@ -1087,14 +1138,27 @@ absl::Status GlobalStoreClient::record_view_residency(
std::string_view view_id,
uint64_t view_size_bytes,
std::optional<std::string_view> view_data_hash) {
(void)canonical_artifact_id;
(void)view_id;
(void)view_size_bytes;
(void)view_data_hash;
// A dedicated RPC for view metadata will be introduced for view-residency signals.
// Until that lands, treat this as a best-effort noop so core plumbing can wire
// the call sites without coupling to server availability.
return absl::UnimplementedError("Global Store view residency RPC not yet implemented");
if (!is_connected()) {
return absl::FailedPreconditionError("GlobalStoreClient not connected");
}
if (canonical_artifact_id.empty()) {
return absl::InvalidArgumentError("record_view_residency requires canonical_artifact_id");
}
if (view_id.empty()) {
return absl::InvalidArgumentError("record_view_residency requires view_id");
}
if (view_size_bytes == 0) {
return absl::InvalidArgumentError("record_view_residency requires view_size_bytes > 0");
}

ViewStateUpdate update;
update.artifact_id = std::string(canonical_artifact_id);
update.view_id = std::string(view_id);
update.view_size_bytes = view_size_bytes;
if (view_data_hash.has_value() && !view_data_hash->empty()) {
update.view_data_hash = std::string(*view_data_hash);
}
return update_artifact_view_state(update);
}

absl::Status GlobalStoreClient::update_artifact_view_state(const ViewStateUpdate& update) {
Expand Down Expand Up @@ -2218,6 +2282,10 @@ absl::Status GlobalStoreClient::unregister_replica(std::string_view artifact_id,
}

if (response.status() != global_store::STATUS_OK) {
if (response.status() == global_store::STATUS_NOT_FOUND) {
return absl::NotFoundError(
absl::StrFormat("UnregisterReplica target not found: artifact_id=%s replica_id=%s", artifact_id, replica_id));
}
return absl::InternalError(absl::StrFormat("UnregisterReplica failed with status: %d", response.status()));
}

Expand Down Expand Up @@ -2436,7 +2504,10 @@ absl::StatusOr<TransportSession> GlobalStoreClient::request_replica_transport(
TransportSession session;
session.transport_id = response.transport_id();
session.remote_replica = convert_from_proto_memory_info(response.remote_memory_info());
session.remote_replica.grpc_port = response.source_grpc_port();
session.start_time = absl::Now();
session.route_kind = normalize_transport_route_kind(response.route_kind(), session.remote_replica, std::nullopt);
session.view_transport_metadata = normalize_view_transport_metadata(response, session.remote_replica, std::nullopt);

LOG(INFO) << "Started P2P transport " << session.transport_id << " from " << session.remote_replica.node_id;

Expand Down Expand Up @@ -2535,7 +2606,10 @@ absl::StatusOr<TransportSession> GlobalStoreClient::request_view_transport(
TransportSession session;
session.transport_id = response.transport_id();
session.remote_replica = convert_from_proto_memory_info(response.remote_memory_info());
session.remote_replica.grpc_port = response.source_grpc_port();
session.start_time = absl::Now();
session.route_kind = normalize_transport_route_kind(response.route_kind(), session.remote_replica, view_id);
session.view_transport_metadata = normalize_view_transport_metadata(response, session.remote_replica, view_id);

LOG(INFO) << "Started view transport " << session.transport_id << " from " << session.remote_replica.node_id
<< " view_id=" << view_id;
Expand Down Expand Up @@ -3281,9 +3355,13 @@ absl::StatusOr<KeyMapping> GlobalStoreClient::resolve_key_mapping_with_options(
if (!status.ok()) {
return status;
}
if (response.status() != global_store::STATUS_OK) {
if (response.status() == global_store::STATUS_NOT_FOUND) {
return absl::NotFoundError("key not found");
}
if (response.status() != global_store::STATUS_OK) {
return absl::InternalError(
absl::StrFormat("ResolveKeyMapping failed with global-store status=%d", static_cast<int>(response.status())));
}

KeyMapping out{
.artifact_id = response.artifact_id(),
Expand Down
9 changes: 9 additions & 0 deletions core/store/components/global_store_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ struct RemoteReplicaInfo {
std::string endpoint_id;
std::string node_address;
uint32_t node_port;
uint32_t grpc_port{0};
uint64_t memory_size;
common::memory::MemoryLocation memory_type;
uint32_t device_id;
Expand All @@ -120,11 +121,19 @@ struct RemoteReplicaInfo {
std::optional<std::string> view_id;
};

struct ViewTransportMetadata {
std::string view_id;
uint64_t view_size_bytes{0};
std::optional<std::string> view_data_hash;
};

// Transport session for P2P transfers
struct TransportSession {
std::string transport_id;
RemoteReplicaInfo remote_replica;
absl::Time start_time;
global_store::TransportRouteKind route_kind{global_store::TRANSPORT_ROUTE_KIND_UNSPECIFIED};
std::optional<ViewTransportMetadata> view_transport_metadata;
};

struct TransportSchedulingGroupHint {
Expand Down
2 changes: 2 additions & 0 deletions core/store/materialization/control/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ sc_cc_library(
"//core/store:global_store_client",
"//core/store:worker_identity",
"//core/store/materialization/contracts:loading_spec",
"//proto/tensorcast/daemon/v2:daemon_grpc_cc",
"@abseil-cpp//absl/cleanup",
"@abseil-cpp//absl/log:absl_log",
"@abseil-cpp//absl/strings",
"@gsl",
Expand Down
Loading
Loading