Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
0bdfebd
feat(serving): add artifact-centered boundary facades
wolegechu May 17, 2026
6cef671
feat(serving): add lifecycle runtime integration
wolegechu May 20, 2026
64ad84d
feat(serving): add runtime diagnostics and parity helpers
wolegechu May 20, 2026
5832891
fix: select external preload authority by member
wolegechu May 20, 2026
1388bf6
feat: refine artifact-centered serving realization
wolegechu May 23, 2026
3c00d12
docs: add unified artifact realization kernel
wolegechu May 23, 2026
6ff19d9
feat: complete unified artifact realization
wolegechu May 25, 2026
74f7b30
fix: fix ci lint
wolegechu May 25, 2026
67e9d36
fix: remove stale mypy ignore
wolegechu May 25, 2026
a472c00
fix: fix python ci and update 0120
wolegechu May 25, 2026
751a18f
Merge branch 'main' into codex/0121-unified-artifact-realization
wolegechu May 25, 2026
7a71023
fix: fix python ci and update 0120
wolegechu May 25, 2026
9cb73d4
fix: clean native runtime before pytest teardown
wolegechu May 25, 2026
a1cc0bd
feat: artifact-centered runtime realization cleanup
wolegechu May 25, 2026
a7f7448
chore: merge main into artifact runtime realization branch
wolegechu May 26, 2026
3250201
fix: reconcile artifact runtime paths after main merge
wolegechu May 26, 2026
e640bc5
fix: align runtime publication schema handling
wolegechu May 26, 2026
db451dd
chore: clean up artifact runtime lifecycle imports
wolegechu May 26, 2026
ed1caa5
fix: centralize artifact runtime source subjects
wolegechu May 26, 2026
f184281
refactor: complete artifact runtime ownership cleanup
wolegechu May 26, 2026
ce67471
Merge branch 'main' into codex/0120-artifact-runtime-realization-revi…
wolegechu May 26, 2026
aa57ad4
fix: tighten artifact runtime validation gates
wolegechu May 26, 2026
27dd9a9
chore: update
wolegechu May 26, 2026
fdbbd86
fix: implement artifact realization review fixes
wolegechu May 26, 2026
3d73aa4
fix(daemon): clean up runtime binding leases
wolegechu May 26, 2026
1d6631f
fix: align artifact realization consistency
wolegechu May 26, 2026
db2e4f0
fix: keep local ready finalize source selection optional
wolegechu May 26, 2026
5e9e0f9
ci: preserve pytest result before native teardown
wolegechu May 26, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion .github/workflows/ci-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,23 @@ jobs:

- name: Run Python tests
run: |
uv run pytest tests/python/ -v --tb=short
uv run python - <<'PY'
import contextlib
import os
import sys

import pytest

exit_code = pytest.main(["tests/python/", "-v", "--tb=short"])
if "tensorcast._C" in sys.modules:
with contextlib.suppress(Exception):
from tensorcast._c_ext import get_c_ext

get_c_ext().shutdown_native_runtime()
sys.stdout.flush()
sys.stderr.flush()
os._exit(int(exit_code))
PY
env:
LD_LIBRARY_PATH: ${{ github.workspace }}/tensorcast/lib:${{ env.LD_LIBRARY_PATH }}
TENSORCAST_CUDA_BACKEND: fake
Expand Down
15 changes: 15 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
default_install_hook_types:
- pre-commit
- commit-msg
- pre-push
exclude: ^.github/actions/assigner/dist
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
Expand Down Expand Up @@ -90,6 +91,20 @@ repos:
- --use-current-year
- repo: local
hooks:
- id: pyright
name: pyright (tensorcast)
language: system
entry: env UV_NO_SYNC=1 uv run pyright ./tensorcast
pass_filenames: false
files: ^(tensorcast/.*\.py|pyproject\.toml)$
stages: [pre-push]
- id: mypy
name: mypy (tensorcast)
language: system
entry: env UV_NO_SYNC=1 uv run mypy ./tensorcast
pass_filenames: false
files: ^(tensorcast/.*\.py|pyproject\.toml)$
stages: [pre-push]
- id: webui-prettier
name: webui-prettier-check
language: system
Expand Down
8 changes: 8 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ ruff check .
ruff format . --check
```

The pre-push hook also runs the CI-matching package type checks through the
project `uv` environment:

```bash
pyright ./tensorcast
mypy ./tensorcast
```

If you modify protocol buffers, regenerate Python stubs and C++ headers:

```bash
Expand Down
4 changes: 4 additions & 0 deletions daemon/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -1683,6 +1683,7 @@ sc_cc_library(
"//proto/tensorcast/global_store/v1:global_store_grpc_cc",
"@abseil-cpp//absl/strings",
"@abseil-cpp//absl/types:span",
"@protobuf",
],
)

Expand Down Expand Up @@ -1965,14 +1966,17 @@ cc_test(
name = "pid_monitor_unwatch_integration_test",
srcs = ["state/pid_monitor_unwatch_integration_test.cc"],
deps = [
":handle_lease_registry_lib",
":ipc_region_registry_lib",
":lifecycle_kernel_lib",
":lip_manager_lib",
":pid_monitor_lib",
":ref_tracker_hdr",
":registration_manager_lib",
":session_lifecycle_lib",
":session_manager_hdr",
"//core/store:device_registry",
"//core/store:store_engine",
"@catch2//:catch2_main",
],
)
Expand Down
1 change: 1 addition & 0 deletions daemon/service/controllers/materialization_controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,7 @@ grpc::Status MaterializationController::prefetch_serving_binding(
status->set_message(failed ? "serving binding set materialization failed" : "serving binding set is local-ready");
status->set_progress(1.0);
status->mutable_result()->PackFrom(set_result);
attach_controller_realization_plan_span_attrs(rctx, realization_plan);
rctx.mark_success();
return grpc::Status::OK;
}
Expand Down
160 changes: 136 additions & 24 deletions daemon/service/controllers/materialization_policy_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <algorithm>
#include <cstdint>
#include <format>
#include <limits>
#include <optional>
#include <string>
#include <string_view>
Expand All @@ -18,6 +19,9 @@
#include "core/common/artifact_hash.h"
#include "core/store/materialization/dataplane/view/view_identity.h"
#include "daemon/service/rpc_context.h"
#include "google/protobuf/io/coded_stream.h"
#include "google/protobuf/io/zero_copy_stream_impl_lite.h"
#include "google/protobuf/message_lite.h"

namespace tensorcast::daemon::materialization_policy {

Expand All @@ -30,6 +34,7 @@ using store::loader::ViewOp;
constexpr std::string_view kGroupRealizationTransportKind = "group_realization_transport";
constexpr std::string_view kGroupRealizationChildTransportRequestProfile =
"tensorcast.group_realization.child_transport_request.v1";
constexpr std::string_view kControllerSourceSelectionDigestProfile = "tensorcast.controller.source_selection_digest.v1";

store::loading::SourceLocalityHint to_source_locality(v2::SourceLocality locality) {
switch (locality) {
Expand Down Expand Up @@ -375,20 +380,124 @@ std::vector<std::string> acquire_group_barriers_for(const v2::GroupRealizationAc
return barriers;
}

std::string serialize_deterministic(const google::protobuf::MessageLite& message) {
std::string output;
{
google::protobuf::io::StringOutputStream string_stream(&output);
google::protobuf::io::CodedOutputStream coded_stream(&string_stream);
coded_stream.SetSerializationDeterministic(true);
if (!message.SerializeToCodedStream(&coded_stream) || coded_stream.HadError()) {
return message.SerializeAsString();
}
}
return output;
}

void append_big_endian_u64(std::vector<uint8_t>* out, uint64_t value) {
for (int shift = 56; shift >= 0; shift -= 8) {
out->push_back(static_cast<uint8_t>((value >> shift) & 0xffU));
}
}

void append_digest_part(std::vector<uint8_t>* out, std::string_view part) {
append_big_endian_u64(out, static_cast<uint64_t>(part.size()));
out->insert(out->end(), part.begin(), part.end());
}

std::string sha256_hex_for_parts(const std::vector<std::string_view>& parts) {
uint64_t total_size = 0;
for (std::string_view part : parts) {
total_size += 8U + static_cast<uint64_t>(part.size());
}
std::vector<uint8_t> payload;
if (total_size <= static_cast<uint64_t>(std::numeric_limits<size_t>::max())) {
payload.reserve(static_cast<size_t>(total_size));
}
for (std::string_view part : parts) {
append_digest_part(&payload, part);
}
const std::vector<uint8_t> digest = common::sha256_digest_bytes(absl::MakeConstSpan(payload));
return absl::BytesToHexString(std::string(reinterpret_cast<const char*>(digest.data()), digest.size()));
}

std::string artifact_profile_for(std::string_view artifact_id) {
if (artifact_id.starts_with("msa1:")) {
return "mounted_source";
}
if (artifact_id.starts_with("cgid:")) {
return "byte_artifact";
}
return "durable_artifact";
}

std::string authority_scope_for(std::string_view artifact_id) {
if (artifact_id.starts_with("msa1:")) {
return "daemon_local_mounted_source";
}
return "daemon_mediated_durable";
}

std::optional<std::string> requested_generation_hint_for(const v2::GroupRealizationOptions* group_realization) {
if (group_realization == nullptr || !group_realization->enabled()) {
return std::nullopt;
}
if (group_realization->version().value_case() != v2::VersionReference::kKeyReference) {
return std::nullopt;
}
const v2::KeyVersionReference& key_ref = group_realization->version().key_reference();
if (!key_ref.has_expected_generation()) {
return std::nullopt;
}
return std::to_string(key_ref.expected_generation());
}

std::optional<std::string> requested_version_set_id_for(const v2::GroupRealizationOptions* group_realization) {
if (group_realization == nullptr || !group_realization->enabled()) {
return std::nullopt;
}
if (group_realization->version().value_case() != v2::VersionReference::kExplicitVersionSet) {
return std::nullopt;
}
const std::string& version_set_id = group_realization->version().explicit_version_set().version_set_id();
if (version_set_id.empty()) {
return std::nullopt;
}
return version_set_id;
}

std::optional<std::string> selection_digest_for(
const v2::GroupRealizationOptions* group_realization,
const GroupRealizationBeginContext* begin_context,
const tensorcast::common::v1::ArtifactSelection& selection) {
if (begin_context != nullptr && !begin_context->selection_hash.empty()) {
return absl::BytesToHexString(begin_context->selection_hash);
}
if (!selection.selection_hash().empty()) {
return absl::BytesToHexString(selection.selection_hash());
}
if (group_realization != nullptr && group_realization->enabled()) {
const tensorcast::common::v1::ArtifactSelection& effective_selection =
begin_context != nullptr && !begin_context->part_selection.artifact_id().empty() ? begin_context->part_selection
: selection;
if (effective_selection.artifact_id().empty()) {
return std::nullopt;
}
return std::nullopt;

const std::string serialized_selection = serialize_deterministic(effective_selection);
const std::string selection_identity = begin_context != nullptr && !begin_context->selection_hash.empty()
? begin_context->selection_hash
: effective_selection.selection_hash();
std::string generation_hint;
if (begin_context != nullptr && begin_context->key_generation != 0) {
generation_hint = std::to_string(begin_context->key_generation);
} else if (std::optional<std::string> requested_generation = requested_generation_hint_for(group_realization);
requested_generation.has_value()) {
generation_hint = *requested_generation;
}
const std::string profile = artifact_profile_for(effective_selection.artifact_id());
const std::string scope = authority_scope_for(effective_selection.artifact_id());
return sha256_hex_for_parts({
kControllerSourceSelectionDigestProfile,
serialized_selection,
effective_selection.logical_layout_hash(),
selection_identity,
profile,
scope,
generation_hint,
});
}

std::optional<std::string> operation_id_for(const v2::MaterializeIntoTargetRequest& request) {
Expand Down Expand Up @@ -896,7 +1005,7 @@ absl::StatusOr<ControllerRealizationPlan> build_controller_realization_plan_impl
.group_barriers = group_barriers_for(group_realization),
.version_set_id = group_begin_context != nullptr && !group_begin_context->version_set.version_set_id().empty()
? std::optional<std::string>(group_begin_context->version_set.version_set_id())
: std::nullopt,
: requested_version_set_id_for(group_realization),
.transaction_id = group_begin_context != nullptr && !group_begin_context->transaction_id.empty()
? std::optional<std::string>(group_begin_context->transaction_id)
: std::nullopt,
Expand Down Expand Up @@ -956,11 +1065,15 @@ absl::StatusOr<ControllerRealizationPlan> build_prefetch_target_set_realization_
: "same_daemon_session",
.collective_policy = prefetch_collective_policy_for(request, member_count),
.group_barriers = group_barriers_for(request.has_group_realization() ? &request.group_realization() : nullptr),
.version_set_id = std::nullopt,
.version_set_id =
requested_version_set_id_for(request.has_group_realization() ? &request.group_realization() : nullptr),
.transaction_id = std::nullopt,
.source_selection_digest = !request.source().artifact_selection_digest().empty()
? std::optional<std::string>(request.source().artifact_selection_digest())
: selection_digest_for(nullptr, nullptr, request.source_selection()),
: selection_digest_for(
request.has_group_realization() ? &request.group_realization() : nullptr,
nullptr,
request.source_selection()),
};
plan.lifecycle = ControllerRealizationLifecyclePlan{
.capability = "target_set",
Expand Down Expand Up @@ -1020,11 +1133,15 @@ absl::StatusOr<ControllerRealizationPlan> build_prefetch_member_realization_plan
: "same_daemon_session",
.collective_policy = prefetch_collective_policy_for(request, member_count),
.group_barriers = group_barriers_for(request.has_group_realization() ? &request.group_realization() : nullptr),
.version_set_id = std::nullopt,
.version_set_id =
requested_version_set_id_for(request.has_group_realization() ? &request.group_realization() : nullptr),
.transaction_id = std::nullopt,
.source_selection_digest = !request.source().artifact_selection_digest().empty()
? std::optional<std::string>(request.source().artifact_selection_digest())
: selection_digest_for(nullptr, nullptr, request.source_selection()),
: selection_digest_for(
request.has_group_realization() ? &request.group_realization() : nullptr,
nullptr,
request.source_selection()),
};
plan.lifecycle = ControllerRealizationLifecyclePlan{
.capability = "retained_binding",
Expand Down Expand Up @@ -1259,7 +1376,7 @@ absl::StatusOr<v2::CollectivePolicy> resolve_collective_policy(
const ExecutionTopologyContext& execution_topology) {
const bool has_collective_group = execution_topology.collective_load_group.has_value();
if (requested == v2::CollectivePolicy::COLLECTIVE_POLICY_UNSPECIFIED) {
return has_collective_group ? v2::CollectivePolicy::COLLECTIVE_POLICY_REQUIRE_COLLECTIVE
return has_collective_group ? v2::CollectivePolicy::COLLECTIVE_POLICY_COLLECTIVE_FIRST
: v2::CollectivePolicy::COLLECTIVE_POLICY_DISABLE_COLLECTIVE;
}
if (requested == v2::CollectivePolicy::COLLECTIVE_POLICY_DISABLE_COLLECTIVE && has_collective_group) {
Expand Down Expand Up @@ -1367,13 +1484,11 @@ absl::StatusOr<ControllerRealizationPlan> build_controller_realization_plan(
.group_barriers = group_barriers_for(group_realization),
.version_set_id = group_begin_context != nullptr && !group_begin_context->version_set.version_set_id().empty()
? std::optional<std::string>(group_begin_context->version_set.version_set_id())
: std::nullopt,
: requested_version_set_id_for(group_realization),
.transaction_id = group_begin_context != nullptr && !group_begin_context->transaction_id.empty()
? std::optional<std::string>(group_begin_context->transaction_id)
: std::nullopt,
.source_selection_digest = !resolved_selection.selection_hash().empty()
? std::optional<std::string>(absl::BytesToHexString(resolved_selection.selection_hash()))
: std::nullopt,
.source_selection_digest = selection_digest_for(group_realization, group_begin_context, resolved_selection),
};
plan.lifecycle = ControllerRealizationLifecyclePlan{
.capability = target_kind,
Expand Down Expand Up @@ -1439,9 +1554,8 @@ absl::StatusOr<ControllerRealizationPlan> build_controller_realization_plan(cons
.group_barriers = {},
.version_set_id = std::nullopt,
.transaction_id = std::nullopt,
.source_selection_digest =
request.has_initial_selection() && !request.initial_selection().selection_hash().empty()
? std::optional<std::string>(absl::BytesToHexString(request.initial_selection().selection_hash()))
.source_selection_digest = request.has_initial_selection()
? selection_digest_for(nullptr, nullptr, request.initial_selection())
: std::nullopt,
};
const bool daemon_owned = request.ownership() == v2::BindingOwnership::BINDING_OWNERSHIP_DAEMON;
Expand Down Expand Up @@ -1967,9 +2081,7 @@ absl::StatusOr<ControllerRealizationPlan> build_controller_realization_plan(
.group_barriers = group_barriers_for(group_realization),
.version_set_id = std::nullopt,
.transaction_id = std::nullopt,
.source_selection_digest = !scope.selection().selection_hash().empty()
? std::optional<std::string>(absl::BytesToHexString(scope.selection().selection_hash()))
: std::nullopt,
.source_selection_digest = selection_digest_for(group_realization, nullptr, scope.selection()),
};
plan.lifecycle = ControllerRealizationLifecyclePlan{
.capability = "publication",
Expand Down
Loading
Loading