Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/actions/docker-build/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ runs:
ECR_HOSTNAME: ${{ inputs.aws_account_id }}.dkr.ecr.${{ inputs.aws_default_region }}.amazonaws.com
run: |
sudo apt-get update && sudo apt-get install -y git build-essential protobuf-compiler libclang-dev
curl https://sh.rustup.rs -sSf | sh -s -- -y --default-toolchain stable
curl --retry 5 --retry-delay 2 --fail --show-error -sSL -o /tmp/rustup-init.sh https://sh.rustup.rs
sh /tmp/rustup-init.sh -y --default-toolchain stable
rm -f /tmp/rustup-init.sh
. "$HOME/.cargo/env"
echo "$HOME/.cargo/bin" >> "$GITHUB_PATH"
cargo install cbindgen
Expand Down
10 changes: 7 additions & 3 deletions .github/actions/install-vcluster-cli/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@ runs:
aarch64) VCLUSTER_ARCH="arm64" ;;
*) echo "Unsupported architecture: ${ARCH}"; exit 1 ;;
esac
curl -sL -o /tmp/vcluster \
TMP_BIN="$(mktemp -p /tmp vcluster.XXXXXX)"
curl --retry 5 --retry-delay 2 \
--connect-timeout 10 --max-time 120 \
--fail --show-error -sL \
-o "${TMP_BIN}" \
"https://github.com/loft-sh/vcluster/releases/download/${{ inputs.vcluster_version }}/vcluster-linux-${VCLUSTER_ARCH}"
sudo mv /tmp/vcluster /usr/local/bin/vcluster
sudo chmod +x /usr/local/bin/vcluster
sudo install -m 0755 "${TMP_BIN}" /usr/local/bin/vcluster
rm -f "${TMP_BIN}"
vcluster version
fi
echo "::endgroup::"
2 changes: 1 addition & 1 deletion .github/workflows/nightly-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ jobs:
PROTOC_VER="30.2"
PROTOC_ZIP="protoc-${PROTOC_VER}-linux-x86_64.zip"
PROTOC_SHA256="327e9397c6fb3ea2a542513a3221334c6f76f7aa524a7d2561142b67b312a01f"
curl -fsSLO "$PB_REL/download/v${PROTOC_VER}/${PROTOC_ZIP}"
curl --retry 5 --retry-delay 2 -fsSLO "$PB_REL/download/v${PROTOC_VER}/${PROTOC_ZIP}"
echo "${PROTOC_SHA256} ${PROTOC_ZIP}" | sha256sum -c -
unzip "${PROTOC_ZIP}" -d $HOME/.local
rm "${PROTOC_ZIP}"
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/pre-merge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ jobs:
PROTOC_VER="30.2"
PROTOC_ZIP="protoc-${PROTOC_VER}-linux-x86_64.zip"
PROTOC_SHA256="327e9397c6fb3ea2a542513a3221334c6f76f7aa524a7d2561142b67b312a01f"
curl -fsSLO "$PB_REL/download/v${PROTOC_VER}/${PROTOC_ZIP}"
curl --retry 5 --retry-delay 2 -fsSLO "$PB_REL/download/v${PROTOC_VER}/${PROTOC_ZIP}"
echo "${PROTOC_SHA256} ${PROTOC_ZIP}" | sha256sum -c -
unzip "${PROTOC_ZIP}" -d $HOME/.local
rm "${PROTOC_ZIP}"
Expand Down Expand Up @@ -140,7 +140,7 @@ jobs:
PROTOC_VER="30.2"
PROTOC_ZIP="protoc-${PROTOC_VER}-linux-x86_64.zip"
PROTOC_SHA256="327e9397c6fb3ea2a542513a3221334c6f76f7aa524a7d2561142b67b312a01f"
curl -fsSLO "$PB_REL/download/v${PROTOC_VER}/${PROTOC_ZIP}"
curl --retry 5 --retry-delay 2 -fsSLO "$PB_REL/download/v${PROTOC_VER}/${PROTOC_ZIP}"
echo "${PROTOC_SHA256} ${PROTOC_ZIP}" | sha256sum -c -
unzip "${PROTOC_ZIP}" -d $HOME/.local
rm "${PROTOC_ZIP}"
Expand Down
10 changes: 8 additions & 2 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,10 @@ jobs:
env:
CRANE_VERSION: v0.20.2
run: |
curl -sL "https://github.com/google/go-containerregistry/releases/download/${CRANE_VERSION}/go-containerregistry_Linux_x86_64.tar.gz" \
curl --retry 5 --retry-delay 2 \
--connect-timeout 10 --max-time 120 \
--fail --show-error -sL \
"https://github.com/google/go-containerregistry/releases/download/${CRANE_VERSION}/go-containerregistry_Linux_x86_64.tar.gz" \
| tar -xzf - crane
sudo mv crane /usr/local/bin/
crane version
Expand Down Expand Up @@ -440,7 +443,10 @@ jobs:
env:
CRANE_VERSION: v0.20.2
run: |
curl -sL "https://github.com/google/go-containerregistry/releases/download/${CRANE_VERSION}/go-containerregistry_Linux_x86_64.tar.gz" \
curl --retry 5 --retry-delay 2 \
--connect-timeout 10 --max-time 120 \
--fail --show-error -sL \
"https://github.com/google/go-containerregistry/releases/download/${CRANE_VERSION}/go-containerregistry_Linux_x86_64.tar.gz" \
| tar -xzf - crane
sudo mv crane /usr/local/bin/
crane version
Expand Down
75 changes: 72 additions & 3 deletions components/src/dynamo/frontend/sglang_prepost.py
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,69 @@ def _try_parse_json_array(text: str) -> list | None:
return None


def _decode_single_token(tokenizer, token_id: int) -> str | None:
try:
return tokenizer.decode([token_id], skip_special_tokens=False)
except Exception as exc:
logger.exception(
"Failed to decode token for chat logprobs: token_id=%s tokenizer=%s",
token_id,
type(tokenizer).__name__,
)
raise RuntimeError(
f"Failed to decode token_id={token_id} with tokenizer={type(tokenizer).__name__}"
) from exc


def _build_chat_logprobs(
    *,
    tokenizer,
    token_ids: list[int],
    log_probs: list[float] | None,
    top_logprobs: list[list[dict[str, Any]]] | None,
) -> dict[str, Any] | None:
    """Map engine-level logprob arrays onto the OpenAI chat ``logprobs`` shape.

    Args:
        tokenizer: Tokenizer used to turn token ids into text.
        token_ids: Sampled token ids; entries beyond its length are dropped.
        log_probs: Per-token log probabilities; a missing/empty list means
            logprobs were not requested.
        top_logprobs: Optional per-position alternative candidates, each a
            dict that may carry ``token``, ``token_id`` and ``logprob`` keys.

    Returns:
        ``{"content": [...]}`` with one entry per logprob position, or
        ``None`` when ``log_probs`` is falsy.
    """
    if not log_probs:
        return None

    entries: list[dict[str, Any]] = []
    for pos, lp in enumerate(log_probs):
        # Never emit more entries than we have token ids for.
        if pos >= len(token_ids):
            break

        text = _decode_single_token(tokenizer, token_ids[pos])

        alternatives: list[dict[str, Any]] = []
        if top_logprobs and pos < len(top_logprobs):
            for candidate in top_logprobs[pos]:
                alt_text = candidate.get("token")
                # Fall back to decoding the candidate's id when no text came
                # from the engine.
                if alt_text is None and candidate.get("token_id") is not None:
                    alt_text = _decode_single_token(tokenizer, candidate["token_id"])
                alternatives.append(
                    {
                        "token": alt_text,
                        "logprob": candidate.get("logprob"),
                        "bytes": (
                            None
                            if alt_text is None
                            else list(alt_text.encode("utf-8"))
                        ),
                    }
                )

        entries.append(
            {
                "token": text,
                "logprob": float(lp),
                "bytes": None if text is None else list(text.encode("utf-8")),
                "top_logprobs": alternatives,
            }
        )

    return {"content": entries}


class SglangStreamingPostProcessor:
"""Streaming post-processor using SGLang parsers and HF tokenizer detokenization.

Expand Down Expand Up @@ -621,6 +684,12 @@ def process_output(self, engine_response: dict[str, Any]) -> dict[str, Any] | No
raw_ids = engine_response.get("token_ids")
token_ids = raw_ids if isinstance(raw_ids, list) else list(raw_ids or [])
finish_reason = engine_response.get("finish_reason")
logprobs = _build_chat_logprobs(
tokenizer=self.tokenizer,
token_ids=token_ids,
log_probs=engine_response.get("log_probs"),
top_logprobs=engine_response.get("top_logprobs"),
)

delta_text = self._incremental_decode(token_ids) if token_ids else ""

Expand All @@ -630,14 +699,14 @@ def process_output(self, engine_response: dict[str, Any]) -> dict[str, Any] | No
"index": 0,
"delta": {"role": "assistant", "content": delta_text},
"finish_reason": finish_reason,
"logprobs": None,
"logprobs": logprobs,
}
elif finish_reason:
return {
"index": 0,
"delta": {},
"finish_reason": finish_reason,
"logprobs": None,
"logprobs": logprobs,
}
return None

Expand Down Expand Up @@ -854,7 +923,7 @@ def process_output(self, engine_response: dict[str, Any]) -> dict[str, Any] | No
"index": 0,
"delta": delta if has_content else {},
"finish_reason": effective_finish,
"logprobs": None,
"logprobs": logprobs,
}

return None
26 changes: 26 additions & 0 deletions components/src/dynamo/frontend/tests/test_sglang_processor_unit.py
Original file line number Diff line number Diff line change
Expand Up @@ -1416,6 +1416,32 @@ def test_fast_path_content_output(self, tokenizer):
assert choice["index"] == 0
assert choice["logprobs"] is None

def test_fast_path_includes_logprobs(self, tokenizer):
    """Fast path maps engine log_probs/top_logprobs into OpenAI chat logprobs."""
    processor = SglangStreamingPostProcessor(
        tokenizer=tokenizer, tool_call_parser=None, reasoning_parser=None
    )
    ids = tokenizer.encode("Hello")
    # One alternative candidate per position, mirroring the sampled token.
    top = [
        [{"token": tokenizer.decode([tid]), "logprob": -0.1}] for tid in ids
    ]
    result = processor.process_output(
        {
            "token_ids": ids,
            "log_probs": [-0.1 for _ in ids],
            "top_logprobs": top,
            "finish_reason": None,
        }
    )

    assert result is not None
    logprobs = result["logprobs"]
    assert logprobs is not None
    entries = logprobs["content"]
    assert len(entries) == len(ids)
    first = entries[0]
    assert first["logprob"] == -0.1
    assert "top_logprobs" in first
    assert "bytes" in first


# ---------------------------------------------------------------------------
# SglangStreamingPostProcessor: reasoning parsing
Expand Down
6 changes: 4 additions & 2 deletions components/src/dynamo/planner/config/planner_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ class PlannerConfig(BaseModel):
"kubernetes", "virtual", "global-planner"
] = SLAPlannerDefaults.environment
namespace: str = Field(
default_factory=lambda: os.environ.get("DYN_NAMESPACE", "dynamo")
default_factory=lambda: os.environ.get("DYN_NAMESPACE", "dynamo"),
exclude=True,
)
backend: Literal["vllm", "sglang", "trtllm", "mocker"] = SLAPlannerDefaults.backend
mode: Literal["disagg", "prefill", "decode", "agg"] = SLAPlannerDefaults.mode
Expand Down Expand Up @@ -109,7 +110,8 @@ class PlannerConfig(BaseModel):
default_factory=lambda: os.environ.get(
"PROMETHEUS_ENDPOINT",
"http://prometheus-kube-prometheus-prometheus.monitoring.svc.cluster.local:9090",
)
),
exclude=True,
)
metric_reporting_prometheus_port: int = Field(
default_factory=lambda: int(os.environ.get("PLANNER_PROMETHEUS_PORT", 0))
Expand Down
Loading
Loading