4 changes: 3 additions & 1 deletion .github/actions/docker-build/action.yml
@@ -80,7 +80,9 @@ runs:
ECR_HOSTNAME: ${{ inputs.aws_account_id }}.dkr.ecr.${{ inputs.aws_default_region }}.amazonaws.com
run: |
sudo apt-get update && sudo apt-get install -y git build-essential protobuf-compiler libclang-dev
-curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain stable
+curl --retry 5 --retry-delay 2 --fail --show-error -sSL -o /tmp/rustup-init.sh https://sh.rustup.rs
+sh /tmp/rustup-init.sh -y --default-toolchain stable
+rm -f /tmp/rustup-init.sh
. "$HOME/.cargo/env"
echo "$HOME/.cargo/bin" >> "$GITHUB_PATH"
cargo install cbindgen
10 changes: 7 additions & 3 deletions .github/actions/install-vcluster-cli/action.yml
@@ -23,10 +23,14 @@ runs:
aarch64) VCLUSTER_ARCH="arm64" ;;
*) echo "Unsupported architecture: ${ARCH}"; exit 1 ;;
esac
-curl -sL -o /tmp/vcluster \
+TMP_BIN="$(mktemp -p /tmp vcluster.XXXXXX)"
+curl --retry 5 --retry-delay 2 \
+--connect-timeout 10 --max-time 120 \
+--fail --show-error -sL \
+-o "${TMP_BIN}" \
"https://github.com/loft-sh/vcluster/releases/download/${{ inputs.vcluster_version }}/vcluster-linux-${VCLUSTER_ARCH}"
-sudo mv /tmp/vcluster /usr/local/bin/vcluster
-sudo chmod +x /usr/local/bin/vcluster
+sudo install -m 0755 "${TMP_BIN}" /usr/local/bin/vcluster
+rm -f "${TMP_BIN}"
vcluster version
fi
echo "::endgroup::"
2 changes: 1 addition & 1 deletion .github/workflows/nightly-ci.yml
@@ -258,7 +258,7 @@ jobs:
PROTOC_VER="30.2"
PROTOC_ZIP="protoc-${PROTOC_VER}-linux-x86_64.zip"
PROTOC_SHA256="327e9397c6fb3ea2a542513a3221334c6f76f7aa524a7d2561142b67b312a01f"
-curl -fsSLO "$PB_REL/download/v${PROTOC_VER}/${PROTOC_ZIP}"
+curl --retry 5 --retry-delay 2 -fsSLO "$PB_REL/download/v${PROTOC_VER}/${PROTOC_ZIP}"
echo "${PROTOC_SHA256} ${PROTOC_ZIP}" | sha256sum -c -
unzip "${PROTOC_ZIP}" -d $HOME/.local
rm "${PROTOC_ZIP}"
4 changes: 2 additions & 2 deletions .github/workflows/pre-merge.yml
@@ -81,7 +81,7 @@ jobs:
PROTOC_VER="30.2"
PROTOC_ZIP="protoc-${PROTOC_VER}-linux-x86_64.zip"
PROTOC_SHA256="327e9397c6fb3ea2a542513a3221334c6f76f7aa524a7d2561142b67b312a01f"
-curl -fsSLO "$PB_REL/download/v${PROTOC_VER}/${PROTOC_ZIP}"
+curl --retry 5 --retry-delay 2 -fsSLO "$PB_REL/download/v${PROTOC_VER}/${PROTOC_ZIP}"
echo "${PROTOC_SHA256} ${PROTOC_ZIP}" | sha256sum -c -
unzip "${PROTOC_ZIP}" -d $HOME/.local
rm "${PROTOC_ZIP}"
@@ -140,7 +140,7 @@ jobs:
PROTOC_VER="30.2"
PROTOC_ZIP="protoc-${PROTOC_VER}-linux-x86_64.zip"
PROTOC_SHA256="327e9397c6fb3ea2a542513a3221334c6f76f7aa524a7d2561142b67b312a01f"
-curl -fsSLO "$PB_REL/download/v${PROTOC_VER}/${PROTOC_ZIP}"
+curl --retry 5 --retry-delay 2 -fsSLO "$PB_REL/download/v${PROTOC_VER}/${PROTOC_ZIP}"
echo "${PROTOC_SHA256} ${PROTOC_ZIP}" | sha256sum -c -
unzip "${PROTOC_ZIP}" -d $HOME/.local
rm "${PROTOC_ZIP}"
10 changes: 8 additions & 2 deletions .github/workflows/release.yml
@@ -176,7 +176,10 @@ jobs:
env:
CRANE_VERSION: v0.20.2
run: |
-curl -sL "https://github.com/google/go-containerregistry/releases/download/${CRANE_VERSION}/go-containerregistry_Linux_x86_64.tar.gz" \
+curl --retry 5 --retry-delay 2 \
+--connect-timeout 10 --max-time 120 \
+--fail --show-error -sL \
+"https://github.com/google/go-containerregistry/releases/download/${CRANE_VERSION}/go-containerregistry_Linux_x86_64.tar.gz" \
| tar -xzf - crane
sudo mv crane /usr/local/bin/
crane version
@@ -440,7 +443,10 @@ jobs:
env:
CRANE_VERSION: v0.20.2
run: |
-curl -sL "https://github.com/google/go-containerregistry/releases/download/${CRANE_VERSION}/go-containerregistry_Linux_x86_64.tar.gz" \
+curl --retry 5 --retry-delay 2 \
+--connect-timeout 10 --max-time 120 \
+--fail --show-error -sL \
+"https://github.com/google/go-containerregistry/releases/download/${CRANE_VERSION}/go-containerregistry_Linux_x86_64.tar.gz" \
| tar -xzf - crane
sudo mv crane /usr/local/bin/
crane version
6 changes: 4 additions & 2 deletions components/src/dynamo/planner/config/planner_config.py
@@ -52,7 +52,8 @@ class PlannerConfig(BaseModel):
         "kubernetes", "virtual", "global-planner"
     ] = SLAPlannerDefaults.environment
     namespace: str = Field(
-        default_factory=lambda: os.environ.get("DYN_NAMESPACE", "dynamo")
+        default_factory=lambda: os.environ.get("DYN_NAMESPACE", "dynamo"),
+        exclude=True,
     )
     backend: Literal["vllm", "sglang", "trtllm", "mocker"] = SLAPlannerDefaults.backend
     mode: Literal["disagg", "prefill", "decode", "agg"] = SLAPlannerDefaults.mode
@@ -109,7 +110,8 @@ class PlannerConfig(BaseModel):
         default_factory=lambda: os.environ.get(
             "PROMETHEUS_ENDPOINT",
             "http://prometheus-kube-prometheus-prometheus.monitoring.svc.cluster.local:9090",
-        )
+        ),
+        exclude=True,
     )
     metric_reporting_prometheus_port: int = Field(
         default_factory=lambda: int(os.environ.get("PLANNER_PROMETHEUS_PORT", 0))
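Note on the `exclude=True` additions in `planner_config.py`: the diff does not state the motivation, but one observable effect is that an excluded field stays readable on the instance while being dropped from serialized output. A minimal sketch of that behavior, assuming Pydantic v2 (`model_dump`); `ExampleConfig` is a hypothetical stand-in and not the real `PlannerConfig`:

```python
import os

from pydantic import BaseModel, Field


class ExampleConfig(BaseModel):
    """Hypothetical stand-in for PlannerConfig, reduced to two fields."""

    # Same pattern as the diff: default comes from the environment at
    # instantiation time, and the field is excluded from serialization.
    namespace: str = Field(
        default_factory=lambda: os.environ.get("DYN_NAMESPACE", "dynamo"),
        exclude=True,
    )
    # An ordinary field, kept for contrast.
    backend: str = "vllm"


config = ExampleConfig()

# The excluded field is still available as an attribute...
namespace_value = config.namespace

# ...but it does not appear in the dumped dictionary.
dumped = config.model_dump()
```

Under these assumptions, `dumped` contains only `backend`, so a config serialized on one machine would not carry that machine's `DYN_NAMESPACE` value along with it.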
50 changes: 50 additions & 0 deletions components/src/dynamo/sglang/request_handlers/handler_base.py
@@ -24,6 +24,10 @@
 )

 import sglang as sgl
+from sglang.srt.managers.io_struct import (
+    DestroyWeightsUpdateGroupReqInput,
+    InitWeightsUpdateGroupReqInput,
+)

 from dynamo._core import Context
 from dynamo.common.utils.input_params import InputParamManager
@@ -733,6 +737,12 @@ def _priority_kwargs(self, priority: Any) -> Dict[str, Any]:
             return {"priority": normalized}
         return {}

+    def _weight_update_unsupported_response(self) -> dict:
+        return {
+            "success": False,
+            "message": "weight update control not supported on this worker",
+        }
+
     async def release_memory_occupation(self, body: dict) -> dict:
         """Release GPU memory occupation and unregister from discovery.

@@ -857,6 +867,30 @@ async def update_weights_from_disk(self, body: dict) -> dict:
             "num_paused_requests": num_paused_requests,
         }

+    async def init_weights_update_group(self, body: dict) -> dict:
+        """Initialize distributed weight-update NCCL group on the worker."""
+        if self.engine is None:
+            return self._weight_update_unsupported_response()
+
+        req = InitWeightsUpdateGroupReqInput(**body)
+        (
+            success,
+            message,
+        ) = await self.engine.tokenizer_manager.init_weights_update_group(req, None)
+        return {"success": success, "message": message}
+
+    async def destroy_weights_update_group(self, body: dict) -> dict:
+        """Destroy distributed weight-update NCCL group on the worker."""
+        if self.engine is None:
+            return self._weight_update_unsupported_response()
+
+        req = DestroyWeightsUpdateGroupReqInput(**body)
+        (
+            success,
+            message,
+        ) = await self.engine.tokenizer_manager.destroy_weights_update_group(req, None)
+        return {"success": success, "message": message}
+
     async def update_weights_from_tensor(self, body: dict) -> dict:
         """Update model weights from tensors without restarting the server."""
         from sglang.srt.managers.io_struct import UpdateWeightsFromTensorReqInput
@@ -980,6 +1014,15 @@ async def session_control(self, request, context=None):
             result = {"status": "error", "message": f"Unknown action: {action}"}
         yield result

+    async def get_weight_version(self, body: dict) -> dict:
+        """Return the active weight version currently served by the worker."""
+        _ = body
+        if self.engine is None:
+            return self._weight_update_unsupported_response()
+        return {
+            "weight_version": self.engine.tokenizer_manager.server_args.weight_version
+        }
+
     def register_engine_routes(self, runtime: DistributedRuntime) -> None:
         """Register all engine routes for this handler.

@@ -994,6 +1037,12 @@ def register_engine_routes(self, runtime: DistributedRuntime) -> None:
         runtime.register_engine_route(
             "resume_memory_occupation", self.resume_memory_occupation
         )
+        runtime.register_engine_route(
+            "init_weights_update_group", self.init_weights_update_group
+        )
+        runtime.register_engine_route(
+            "destroy_weights_update_group", self.destroy_weights_update_group
+        )
         runtime.register_engine_route(
             "update_weights_from_disk", self.update_weights_from_disk
         )
@@ -1009,6 +1058,7 @@ def register_engine_routes(self, runtime: DistributedRuntime) -> None:
         runtime.register_engine_route(
             "update_weight_version", self.update_weight_version
         )
+        runtime.register_engine_route("get_weight_version", self.get_weight_version)
         if getattr(self.config, "dynamo_args", None) and getattr(
             self.config.dynamo_args, "enable_rl", False
         ):
72 changes: 72 additions & 0 deletions components/src/dynamo/sglang/tests/test_sglang_handler_base.py
@@ -0,0 +1,72 @@
+# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+import types
+
+import pytest
+
+from dynamo.sglang.request_handlers.handler_base import BaseWorkerHandler
+
+pytestmark = [
+    pytest.mark.unit,
+    pytest.mark.sglang,
+    pytest.mark.gpu_0,
+    pytest.mark.pre_merge,
+]
+
+
+class _DummyRuntime:
+    def __init__(self):
+        self.routes = {}
+
+    def register_engine_route(self, name, handler):
+        self.routes[name] = handler
+
+
+class _DummyWorkerHandler(BaseWorkerHandler):
+    async def generate(self, request, context):
+        if False:
+            yield request, context
+
+
+def test_register_engine_routes_includes_weight_update_routes():
+    handler = _DummyWorkerHandler.__new__(_DummyWorkerHandler)
+    runtime = _DummyRuntime()
+
+    handler.register_engine_routes(runtime)
+
+    assert "init_weights_update_group" in runtime.routes
+    assert "destroy_weights_update_group" in runtime.routes
+    assert "get_weight_version" in runtime.routes
+
+
+@pytest.mark.asyncio
+async def test_get_weight_version_reads_active_version_from_server_args():
+    handler = _DummyWorkerHandler.__new__(_DummyWorkerHandler)
+    handler.engine = types.SimpleNamespace(
+        tokenizer_manager=types.SimpleNamespace(
+            server_args=types.SimpleNamespace(weight_version=17)
+        )
+    )
+
+    result = await handler.get_weight_version({})
+
+    assert result == {"weight_version": 17}
+
+
+@pytest.mark.asyncio
+async def test_weight_update_routes_return_unsupported_without_engine():
+    handler = _DummyWorkerHandler.__new__(_DummyWorkerHandler)
+    handler.engine = None
+
+    init_result = await handler.init_weights_update_group({})
+    destroy_result = await handler.destroy_weights_update_group({})
+    version_result = await handler.get_weight_version({})
+
+    expected = {
+        "success": False,
+        "message": "weight update control not supported on this worker",
+    }
+    assert init_result == expected
+    assert destroy_result == expected
+    assert version_result == expected
2 changes: 2 additions & 0 deletions deploy/operator/internal/consts/consts.go
@@ -27,6 +27,8 @@ const (
DynamoNixlPort = 19090
DynamoNixlPortName = "nixl"

+DynamoFPMBasePort = 20380
+
MpiRunSshPort = 2222

// Default security context values
@@ -773,6 +773,7 @@ func TestDynamoComponentDeploymentReconciler_generateLeaderWorkerSet(t *testing.
{Name: "CONTAINER_NAME", Value: commonconsts.MainContainerName},
{Name: commonconsts.DynamoComponentEnvVar, Value: commonconsts.ComponentTypeWorker},
{Name: commonconsts.DynamoDiscoveryBackendEnvVar, Value: "kubernetes"},
+{Name: "DYN_FORWARDPASS_METRIC_PORT", Value: "20380"},
{Name: "DYN_HEALTH_CHECK_ENABLED", Value: "false"},
{Name: commonconsts.DynamoNamespaceEnvVar, Value: "default-test-lws-deploy"},
{Name: "DYN_PARENT_DGD_K8S_NAME", Value: "test-lws-deploy"},
@@ -916,6 +917,7 @@ func TestDynamoComponentDeploymentReconciler_generateLeaderWorkerSet(t *testing.
{Name: "CONTAINER_NAME", Value: commonconsts.MainContainerName},
{Name: commonconsts.DynamoComponentEnvVar, Value: commonconsts.ComponentTypeWorker},
{Name: commonconsts.DynamoDiscoveryBackendEnvVar, Value: "kubernetes"},
+{Name: "DYN_FORWARDPASS_METRIC_PORT", Value: "20380"},
{Name: "DYN_HEALTH_CHECK_ENABLED", Value: "false"},
{Name: commonconsts.DynamoNamespaceEnvVar, Value: "default-test-lws-deploy"},
{Name: "DYN_PARENT_DGD_K8S_NAME", Value: "test-lws-deploy"},
@@ -1307,6 +1307,12 @@ func (r *DynamoGraphDeploymentRequestReconciler) createProfilingJob(ctx context.
Value: string(dgdr.UID),
},
}
+if r.Config.Infrastructure.PrometheusEndpoint != "" {
+profilerEnv = append(profilerEnv, corev1.EnvVar{
+Name: "PROMETHEUS_ENDPOINT",
+Value: r.Config.Infrastructure.PrometheusEndpoint,
+})
+}

// Build volume mounts
volumeMounts := []corev1.VolumeMount{
4 changes: 4 additions & 0 deletions deploy/operator/internal/dynamo/component_worker.go
@@ -107,6 +107,10 @@ func (w *WorkerDefaults) GetBaseContainer(context ComponentContext) (corev1.Cont
Name: "NIXL_TELEMETRY_PROMETHEUS_PORT",
Value: fmt.Sprintf("%d", commonconsts.DynamoNixlPort),
},
+{
+Name: "DYN_FORWARDPASS_METRIC_PORT",
+Value: fmt.Sprintf("%d", commonconsts.DynamoFPMBasePort),
+},
}...)

if context.WorkerHashSuffix != "" {
17 changes: 17 additions & 0 deletions deploy/operator/internal/dynamo/graph_test.go
@@ -2160,6 +2160,10 @@ func TestGenerateGrovePodCliqueSet(t *testing.T) {
Name: "NIXL_TELEMETRY_PROMETHEUS_PORT",
Value: "19090",
},
+{
+Name: "DYN_FORWARDPASS_METRIC_PORT",
+Value: "20380",
+},
{
Name: "DYN_PARENT_DGD_K8S_NAME",
Value: "test-dynamo-graph-deployment",
@@ -2374,6 +2378,10 @@ func TestGenerateGrovePodCliqueSet(t *testing.T) {
Name: "NIXL_TELEMETRY_PROMETHEUS_PORT",
Value: "19090",
},
+{
+Name: "DYN_FORWARDPASS_METRIC_PORT",
+Value: "20380",
+},
{
Name: "DYN_PARENT_DGD_K8S_NAME",
Value: "test-dynamo-graph-deployment",
@@ -3187,6 +3195,10 @@ func TestGenerateGrovePodCliqueSet(t *testing.T) {
Name: "NIXL_TELEMETRY_PROMETHEUS_PORT",
Value: "19090",
},
+{
+Name: "DYN_FORWARDPASS_METRIC_PORT",
+Value: "20380",
+},
{
Name: "DYN_PARENT_DGD_K8S_NAME",
Value: "test-dynamo-graph-deployment",
@@ -3388,6 +3400,10 @@ func TestGenerateGrovePodCliqueSet(t *testing.T) {
Name: "NIXL_TELEMETRY_PROMETHEUS_PORT",
Value: "19090",
},
+{
+Name: "DYN_FORWARDPASS_METRIC_PORT",
+Value: "20380",
+},
{
Name: "DYN_PARENT_DGD_K8S_NAME",
Value: "test-dynamo-graph-deployment",
@@ -5635,6 +5651,7 @@ func TestGenerateBasePodSpec_Worker(t *testing.T) {
{Name: "CONTAINER_NAME", Value: commonconsts.MainContainerName},
{Name: commonconsts.DynamoComponentEnvVar, Value: "worker"},
{Name: commonconsts.DynamoDiscoveryBackendEnvVar, Value: "kubernetes"},
+{Name: "DYN_FORWARDPASS_METRIC_PORT", Value: "20380"},
{Name: "DYN_HEALTH_CHECK_ENABLED", Value: "false"},
{Name: commonconsts.DynamoNamespaceEnvVar, Value: "default-test-deployment"},
{Name: "DYN_PARENT_DGD_K8S_NAME", Value: "test-deployment"},
1 change: 1 addition & 0 deletions recipes/README.md
@@ -68,6 +68,7 @@ These recipes are under active development and may require additional setup step
| Model | Framework | Mode | GPUs | Deployment | Notes |
|-------|-----------|------|------|------------|-------|
| **[GLM-5-NVFP4](glm-5-nvfp4/sglang/disagg/)** | SGLang | Disagg Prefill/Decode | 20x GB200 | ✅ | NVFP4, EAGLE speculative decoding, TP16 decode + TP4 prefill. Requires [custom container build](glm-5-nvfp4/). |
+| **[Nemotron-3-Nano-Omni-NVFP4](nemotron-3-nano-omni/vllm/agg/)** | vLLM | Aggregated | 1x GPU | ✅ | Multimodal text/image/video/audio serving. Requires [custom container build](nemotron-3-nano-omni/). |
| **[nvidia/Kimi-K2.5-NVFP4](kimi-k2.5/trtllm/agg/nvidia/)** | TensorRT-LLM | Aggregated | 8x B200 | ✅ | Text only — MoE model, TP8×EP8, reasoning + tool calling. Vision input not yet functional. |
| **[DeepSeek-V4-Flash](deepseek-v4/deepseek-v4-flash/vllm/agg/)** | vLLM | Aggregated | 4x B200 | ✅ | Text only — MoE model (284B / 13B active), DP=4 + EP, FP8 KV cache, reasoning + tool calling. Requires [custom container build](deepseek-v4/container/). |
| **[DeepSeek-V4-Flash](deepseek-v4/deepseek-v4-flash/sglang/agg/)** | SGLang | Aggregated | 4x B200 | ✅ | Text only — MoE model (284B / 13B active), TP=4, MXFP4 MoE via FlashInfer, EAGLE MTP (3 steps / 4 draft tokens), reasoning + tool calling. Prebuilt image available; optional [custom container build](deepseek-v4/container/). |