Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/actions/docker-build/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ runs:
ECR_HOSTNAME: ${{ inputs.aws_account_id }}.dkr.ecr.${{ inputs.aws_default_region }}.amazonaws.com
run: |
sudo apt-get update && sudo apt-get install -y git build-essential protobuf-compiler libclang-dev
curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain stable
curl --retry 5 --retry-delay 2 --fail --show-error -sSL -o /tmp/rustup-init.sh https://sh.rustup.rs
sh /tmp/rustup-init.sh -y --default-toolchain stable
rm -f /tmp/rustup-init.sh
. "$HOME/.cargo/env"
echo "$HOME/.cargo/bin" >> "$GITHUB_PATH"
cargo install cbindgen
Expand Down
10 changes: 7 additions & 3 deletions .github/actions/install-vcluster-cli/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@ runs:
aarch64) VCLUSTER_ARCH="arm64" ;;
*) echo "Unsupported architecture: ${ARCH}"; exit 1 ;;
esac
curl -sL -o /tmp/vcluster \
TMP_BIN="$(mktemp -p /tmp vcluster.XXXXXX)"
curl --retry 5 --retry-delay 2 \
--connect-timeout 10 --max-time 120 \
--fail --show-error -sL \
-o "${TMP_BIN}" \
"https://github.com/loft-sh/vcluster/releases/download/${{ inputs.vcluster_version }}/vcluster-linux-${VCLUSTER_ARCH}"
sudo mv /tmp/vcluster /usr/local/bin/vcluster
sudo chmod +x /usr/local/bin/vcluster
sudo install -m 0755 "${TMP_BIN}" /usr/local/bin/vcluster
rm -f "${TMP_BIN}"
vcluster version
fi
echo "::endgroup::"
2 changes: 1 addition & 1 deletion .github/workflows/nightly-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ jobs:
PROTOC_VER="30.2"
PROTOC_ZIP="protoc-${PROTOC_VER}-linux-x86_64.zip"
PROTOC_SHA256="327e9397c6fb3ea2a542513a3221334c6f76f7aa524a7d2561142b67b312a01f"
curl -fsSLO "$PB_REL/download/v${PROTOC_VER}/${PROTOC_ZIP}"
curl --retry 5 --retry-delay 2 -fsSLO "$PB_REL/download/v${PROTOC_VER}/${PROTOC_ZIP}"
echo "${PROTOC_SHA256} ${PROTOC_ZIP}" | sha256sum -c -
unzip "${PROTOC_ZIP}" -d $HOME/.local
rm "${PROTOC_ZIP}"
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/pre-merge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ jobs:
PROTOC_VER="30.2"
PROTOC_ZIP="protoc-${PROTOC_VER}-linux-x86_64.zip"
PROTOC_SHA256="327e9397c6fb3ea2a542513a3221334c6f76f7aa524a7d2561142b67b312a01f"
curl -fsSLO "$PB_REL/download/v${PROTOC_VER}/${PROTOC_ZIP}"
curl --retry 5 --retry-delay 2 -fsSLO "$PB_REL/download/v${PROTOC_VER}/${PROTOC_ZIP}"
echo "${PROTOC_SHA256} ${PROTOC_ZIP}" | sha256sum -c -
unzip "${PROTOC_ZIP}" -d $HOME/.local
rm "${PROTOC_ZIP}"
Expand Down Expand Up @@ -140,7 +140,7 @@ jobs:
PROTOC_VER="30.2"
PROTOC_ZIP="protoc-${PROTOC_VER}-linux-x86_64.zip"
PROTOC_SHA256="327e9397c6fb3ea2a542513a3221334c6f76f7aa524a7d2561142b67b312a01f"
curl -fsSLO "$PB_REL/download/v${PROTOC_VER}/${PROTOC_ZIP}"
curl --retry 5 --retry-delay 2 -fsSLO "$PB_REL/download/v${PROTOC_VER}/${PROTOC_ZIP}"
echo "${PROTOC_SHA256} ${PROTOC_ZIP}" | sha256sum -c -
unzip "${PROTOC_ZIP}" -d $HOME/.local
rm "${PROTOC_ZIP}"
Expand Down
10 changes: 8 additions & 2 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,10 @@ jobs:
env:
CRANE_VERSION: v0.20.2
run: |
curl -sL "https://github.com/google/go-containerregistry/releases/download/${CRANE_VERSION}/go-containerregistry_Linux_x86_64.tar.gz" \
curl --retry 5 --retry-delay 2 \
--connect-timeout 10 --max-time 120 \
--fail --show-error -sL \
"https://github.com/google/go-containerregistry/releases/download/${CRANE_VERSION}/go-containerregistry_Linux_x86_64.tar.gz" \
| tar -xzf - crane
sudo mv crane /usr/local/bin/
crane version
Expand Down Expand Up @@ -440,7 +443,10 @@ jobs:
env:
CRANE_VERSION: v0.20.2
run: |
curl -sL "https://github.com/google/go-containerregistry/releases/download/${CRANE_VERSION}/go-containerregistry_Linux_x86_64.tar.gz" \
curl --retry 5 --retry-delay 2 \
--connect-timeout 10 --max-time 120 \
--fail --show-error -sL \
"https://github.com/google/go-containerregistry/releases/download/${CRANE_VERSION}/go-containerregistry_Linux_x86_64.tar.gz" \
| tar -xzf - crane
sudo mv crane /usr/local/bin/
crane version
Expand Down
10 changes: 9 additions & 1 deletion components/src/dynamo/frontend/sglang_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
preprocess_chat_request,
)
from .utils import (
FrontendRoundRobinRouter,
PreprocessError,
extract_mm_urls,
handle_engine_error,
Expand Down Expand Up @@ -624,9 +625,16 @@ async def chat_engine_factory(
kv_router_config=self.router_config.kv_router_config,
)
else:
router = await generate_endpoint.client(
client = await generate_endpoint.client(
router_mode=self.router_config.router_mode
)
if self.router_config.router_mode == RouterMode.RoundRobin:
router = FrontendRoundRobinRouter(
client,
f"{namespace_name}.{component_name}.{endpoint_name}",
)
else:
router = client

preprocess_pool = None
preprocess_workers = self.config.preprocess_workers
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

import pytest

from dynamo.frontend.utils import FrontendRoundRobinRouter

pytestmark = [pytest.mark.unit, pytest.mark.gpu_0, pytest.mark.pre_merge]


class _FakeClient:
def __init__(self, instance_sequences):
self._instance_sequences = list(instance_sequences)
self._instance_idx = 0
self.direct_calls = []

def instance_ids(self):
if self._instance_idx < len(self._instance_sequences):
ids = self._instance_sequences[self._instance_idx]
self._instance_idx += 1
return ids
return self._instance_sequences[-1]

async def wait_for_instances(self):
return [9, 10]

async def direct(self, request, instance_id, annotated=True):
self.direct_calls.append((request, instance_id, annotated))
return {
"instance": str(instance_id),
"request": request,
"annotated": annotated,
}


@pytest.mark.asyncio
async def test_frontend_round_robin_router_balances_sorted_instance_ids():
    """Four requests over a stable three-instance pool wrap around in sorted order."""
    client = _FakeClient([[20, 10, 30] for _ in range(4)])
    router = FrontendRoundRobinRouter(client, "dynamo.backend.generate")

    results = [
        await router.generate({"seq": idx}, annotated=False) for idx in range(4)
    ]

    chosen = [item["instance"] for item in results]
    assert chosen == ["10", "20", "30", "10"]

    # The third element of each recorded call is the forwarded ``annotated`` flag.
    forwarded_flags = [annotated for _, _, annotated in client.direct_calls]
    assert forwarded_flags == [False, False, False, False]


@pytest.mark.asyncio
async def test_frontend_round_robin_router_refreshes_membership_each_request():
    """A backend that joins between requests is picked up on the next call."""
    # Membership grows from two instances to three after the first request.
    client = _FakeClient([[2, 1], [3, 2, 1], [3, 2, 1]])
    router = FrontendRoundRobinRouter(client, "dynamo.backend.generate")

    replies = []
    for seq in range(3):
        replies.append(await router.generate({"seq": seq}, annotated=False))
    first, second, third = replies

    assert first["instance"] == "1"
    assert second["instance"] == "2"
    assert third["instance"] == "3"


@pytest.mark.asyncio
async def test_frontend_round_robin_router_waits_for_instances_when_empty():
    """An empty membership snapshot falls back to ``wait_for_instances``."""
    no_members = _FakeClient([[]])
    router = FrontendRoundRobinRouter(no_members, "dynamo.backend.generate")

    reply = await router.generate({"seq": 0}, annotated=False)

    # The fake client's wait_for_instances reports [9, 10]; the first pick is 9.
    assert reply["instance"] == "9"


@pytest.mark.asyncio
async def test_frontend_round_robin_router_raises_when_no_instances_ever_appear():
    """If waiting also yields no instances, the router raises ``RuntimeError``."""
    client = _FakeClient([[]])
    # Override the fake's waiter so that it, too, comes back empty-handed.
    client.wait_for_instances = _empty_instances
    router = FrontendRoundRobinRouter(client, "dynamo.backend.generate")

    expected_message = "No active backend instances available"
    with pytest.raises(RuntimeError, match=expected_message):
        await router.generate({"seq": 0}, annotated=False)


@pytest.mark.asyncio
async def test_frontend_round_robin_router_rejects_unexpected_kwargs():
    """Keyword arguments other than ``annotated`` are rejected with ``TypeError``."""
    single_backend = _FakeClient([[1]])
    router = FrontendRoundRobinRouter(single_backend, "dynamo.backend.generate")

    extra = {"foo": 1}
    with pytest.raises(TypeError, match="Unsupported kwargs"):
        await router.generate({"seq": 0}, annotated=False, **extra)


async def _empty_instances():
return []
58 changes: 58 additions & 0 deletions components/src/dynamo/frontend/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@

"""Shared utilities for frontend chat processors (vLLM, SGLang)."""

import asyncio
import logging
import os
import uuid
from typing import Any

_MASK_64_BITS = (1 << 64) - 1
logger = logging.getLogger(__name__)


def random_uuid() -> str:
Expand All @@ -33,6 +36,61 @@ def __init__(self, error_dict: dict[str, Any]):
super().__init__(str(error_dict))


class FrontendRoundRobinRouter:
    """Frontend-managed round-robin over the current runtime client membership.

    This avoids sticky routing behavior in the opaque runtime round-robin client by
    selecting an instance in Python and sending the request via ``Client.direct``.

    Membership is re-read from the client on every request and sorted, and a
    monotonically increasing cursor (taken modulo the current membership size)
    picks the target instance.

    Setting the ``DYN_FRONTEND_ROUTING_DEBUG`` environment variable to ``1``,
    ``true``, ``yes`` or ``on`` (case-insensitive) at construction time logs every
    routing decision at INFO level.
    """

    def __init__(self, client: Any, endpoint_name: str):
        """Wrap ``client``; ``endpoint_name`` is used only in logs and error text."""
        self._client = client
        self._endpoint_name = endpoint_name
        self._cursor = 0
        self._lock = asyncio.Lock()
        self._debug = os.getenv("DYN_FRONTEND_ROUTING_DEBUG", "").lower() in {
            "1",
            "true",
            "yes",
            "on",
        }

    async def generate(self, request: dict[str, Any], **kwargs: Any):
        """Send ``request`` to the next instance in round-robin order.

        Args:
            request: Payload handed unchanged to ``client.direct``.
            **kwargs: Only ``annotated`` is accepted. It is forwarded to
                ``client.direct`` when supplied; when omitted, the client's own
                default for ``annotated`` applies.

        Returns:
            Whatever ``client.direct`` returns for the selected instance.

        Raises:
            TypeError: If any keyword other than ``annotated`` is passed.
            RuntimeError: If the client reports no instances, even after
                ``wait_for_instances``.
        """
        # Forward ``annotated`` only when the caller actually passed it, so an
        # omitted argument does not override the client's default with None.
        direct_kwargs: dict[str, Any] = {}
        if "annotated" in kwargs:
            direct_kwargs["annotated"] = kwargs.pop("annotated")
        if kwargs:
            raise TypeError(
                f"Unsupported kwargs for frontend round-robin router: {sorted(kwargs)}"
            )

        instance_ids = list(self._client.instance_ids())
        if not instance_ids:
            # Nothing known yet: ask the client to wait for membership.
            instance_ids = list(await self._client.wait_for_instances())
        if not instance_ids:
            raise RuntimeError(
                f"No active backend instances available for {self._endpoint_name}"
            )

        # Sort so the rotation order does not depend on the order in which the
        # client happens to list its instances.
        instance_ids = sorted(instance_ids)
        async with self._lock:
            instance_id = instance_ids[self._cursor % len(instance_ids)]
            self._cursor += 1

        if self._debug:
            logger.info(
                "Frontend routing selected endpoint=%s instance=%s instances=%s annotated=%s",
                self._endpoint_name,
                instance_id,
                instance_ids,
                direct_kwargs.get("annotated"),
            )

        return await self._client.direct(
            request,
            instance_id=instance_id,
            **direct_kwargs,
        )


# Content part types that carry media URLs, mapped to the key used in the
# multimodal data dict sent to the backend handler.
_MEDIA_CONTENT_TYPES = ("image_url", "audio_url", "video_url")
Expand Down
10 changes: 9 additions & 1 deletion components/src/dynamo/frontend/vllm_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

from .prepost import StreamingPostProcessor, preprocess_chat_request
from .utils import (
FrontendRoundRobinRouter,
extract_mm_urls,
handle_engine_error,
make_internal_error,
Expand Down Expand Up @@ -800,9 +801,16 @@ async def chat_engine_factory(
kv_router_config=self.router_config.kv_router_config,
)
else:
router = await generate_endpoint.client(
client = await generate_endpoint.client(
router_mode=self.router_config.router_mode
)
if self.router_config.router_mode == RouterMode.RoundRobin:
router = FrontendRoundRobinRouter(
client,
f"{namespace_name}.{component_name}.{endpoint_name}",
)
else:
router = client

block_size = self.config.kv_cache_block_size or 16

Expand Down
6 changes: 4 additions & 2 deletions components/src/dynamo/planner/config/planner_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ class PlannerConfig(BaseModel):
"kubernetes", "virtual", "global-planner"
] = SLAPlannerDefaults.environment
namespace: str = Field(
default_factory=lambda: os.environ.get("DYN_NAMESPACE", "dynamo")
default_factory=lambda: os.environ.get("DYN_NAMESPACE", "dynamo"),
exclude=True,
)
backend: Literal["vllm", "sglang", "trtllm", "mocker"] = SLAPlannerDefaults.backend
mode: Literal["disagg", "prefill", "decode", "agg"] = SLAPlannerDefaults.mode
Expand Down Expand Up @@ -109,7 +110,8 @@ class PlannerConfig(BaseModel):
default_factory=lambda: os.environ.get(
"PROMETHEUS_ENDPOINT",
"http://prometheus-kube-prometheus-prometheus.monitoring.svc.cluster.local:9090",
)
),
exclude=True,
)
metric_reporting_prometheus_port: int = Field(
default_factory=lambda: int(os.environ.get("PLANNER_PROMETHEUS_PORT", 0))
Expand Down
2 changes: 2 additions & 0 deletions deploy/operator/internal/consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ const (
DynamoNixlPort = 19090
DynamoNixlPortName = "nixl"

DynamoFPMBasePort = 20380

MpiRunSshPort = 2222

// Default security context values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,7 @@ func TestDynamoComponentDeploymentReconciler_generateLeaderWorkerSet(t *testing.
{Name: "CONTAINER_NAME", Value: commonconsts.MainContainerName},
{Name: commonconsts.DynamoComponentEnvVar, Value: commonconsts.ComponentTypeWorker},
{Name: commonconsts.DynamoDiscoveryBackendEnvVar, Value: "kubernetes"},
{Name: "DYN_FORWARDPASS_METRIC_PORT", Value: "20380"},
{Name: "DYN_HEALTH_CHECK_ENABLED", Value: "false"},
{Name: commonconsts.DynamoNamespaceEnvVar, Value: "default-test-lws-deploy"},
{Name: "DYN_PARENT_DGD_K8S_NAME", Value: "test-lws-deploy"},
Expand Down Expand Up @@ -916,6 +917,7 @@ func TestDynamoComponentDeploymentReconciler_generateLeaderWorkerSet(t *testing.
{Name: "CONTAINER_NAME", Value: commonconsts.MainContainerName},
{Name: commonconsts.DynamoComponentEnvVar, Value: commonconsts.ComponentTypeWorker},
{Name: commonconsts.DynamoDiscoveryBackendEnvVar, Value: "kubernetes"},
{Name: "DYN_FORWARDPASS_METRIC_PORT", Value: "20380"},
{Name: "DYN_HEALTH_CHECK_ENABLED", Value: "false"},
{Name: commonconsts.DynamoNamespaceEnvVar, Value: "default-test-lws-deploy"},
{Name: "DYN_PARENT_DGD_K8S_NAME", Value: "test-lws-deploy"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1307,6 +1307,12 @@ func (r *DynamoGraphDeploymentRequestReconciler) createProfilingJob(ctx context.
Value: string(dgdr.UID),
},
}
if r.Config.Infrastructure.PrometheusEndpoint != "" {
profilerEnv = append(profilerEnv, corev1.EnvVar{
Name: "PROMETHEUS_ENDPOINT",
Value: r.Config.Infrastructure.PrometheusEndpoint,
})
}

// Build volume mounts
volumeMounts := []corev1.VolumeMount{
Expand Down
4 changes: 4 additions & 0 deletions deploy/operator/internal/dynamo/component_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ func (w *WorkerDefaults) GetBaseContainer(context ComponentContext) (corev1.Cont
Name: "NIXL_TELEMETRY_PROMETHEUS_PORT",
Value: fmt.Sprintf("%d", commonconsts.DynamoNixlPort),
},
{
Name: "DYN_FORWARDPASS_METRIC_PORT",
Value: fmt.Sprintf("%d", commonconsts.DynamoFPMBasePort),
},
}...)

if context.WorkerHashSuffix != "" {
Expand Down
Loading
Loading