Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions src/winml/modelkit/commands/perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,16 @@ def _resolve_device_ep(self) -> None:
self._resolved_device = resolved_device
self._resolved_ep = resolved_ep

@property
def resolved_device(self) -> str | None:
    """Concrete device driving the build/inference (``None`` until resolved).

    Read-only accessor for ``self._resolved_device``, the attribute that
    ``_resolve_device_ep`` assigns.

    Returns:
        The stored device value, or ``None`` if it has not been resolved.
    """
    return self._resolved_device

@property
def resolved_ep(self) -> EPNameOrAlias | None:
    """Concrete EP driving the build/inference (``None`` until resolved).

    Read-only accessor for ``self._resolved_ep``, the attribute that
    ``_resolve_device_ep`` assigns.

    Returns:
        The stored execution-provider value, or ``None`` if it has not been
        resolved.
    """
    return self._resolved_ep

@property
def _is_composite(self) -> bool:
"""Composite models orchestrate multiple sub-sessions (e.g. CLIP/SigLIP).
Expand Down Expand Up @@ -1810,11 +1820,17 @@ def perf(
# Op-tracing (additive to existing benchmark)
# =================================================================
if op_tracing:
from ..optracing import is_qnn_profiling_available
from ..optracing import is_profiling_available

if not is_qnn_profiling_available():
console.print("[red]Error:[/red] Op-tracing requires onnxruntime-qnn")
console.print("Install with: [bold]pip install onnxruntime-qnn[/bold]")
if not is_profiling_available(
benchmark.resolved_ep, benchmark.resolved_device, op_tracing
):
console.print(
"[red]Error:[/red] Op-tracing is only supported for the QNN EP "
"on NPU at the 'basic' level "
f"(resolved EP={benchmark.resolved_ep}, "
f"device={benchmark.resolved_device}, level={op_tracing})."
)
raise SystemExit(1)

from ..optracing import (
Expand Down Expand Up @@ -1860,10 +1876,9 @@ def perf(
# Display and save
display_op_trace_report(trace_result, console)

model_slug = hf_model.replace("/", "_").replace("\\", "_")
if is_onnx:
model_slug = model_path.stem
trace_output = output_dir / f"{model_slug}_op_trace.json"
# Mirror the benchmark report path so the two files sit side by side:
# a/b.json -> a/b_op_trace.json.
trace_output = output.with_name(f"{output.stem}_op_trace{output.suffix}")
write_op_trace_json(trace_result, trace_output)
console.print(f"[green]Op-trace saved to:[/green] {trace_output}")

Expand Down
45 changes: 37 additions & 8 deletions src/winml/modelkit/optracing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,49 @@

from __future__ import annotations

from typing import TYPE_CHECKING

from .base import OpTracer
from .registry import get_tracer, register_tracer
from .report import display_op_trace_report, write_op_trace_json
from .result import OperatorMetrics, OpTraceResult


def is_qnn_profiling_available() -> bool:
"""Check if QNN EP is available for op-tracing."""
try:
import onnxruntime as ort
if TYPE_CHECKING:
from ..utils.constants import EPNameOrAlias

# The single EP / device / tracing-level combination op-tracing currently
# supports. Expanded as more tracers land.
_SUPPORTED_EP = "QNNExecutionProvider"
_SUPPORTED_DEVICE = "npu"
_SUPPORTED_LEVEL = "basic"


def is_profiling_available(
    resolved_ep: EPNameOrAlias | None,
    resolved_device: str | None,
    op_tracing: str | None,
) -> bool:
    """Check whether op-tracing is supported for a resolved EP/device/level.

    Op-tracing is currently limited to the QNN EP on NPU at the ``"basic"``
    level; every other combination is unsupported.

    Args:
        resolved_ep: Concrete EP the benchmark resolved to (full name or alias),
            or ``None`` when no EP was resolved.
        resolved_device: Concrete device the benchmark resolved to
            (e.g. ``"npu"``); compared case-insensitively.
        op_tracing: Requested tracing level (e.g. ``"basic"``), or ``None``;
            compared case-insensitively.

    Returns:
        ``True`` only for the QNN + NPU + ``"basic"`` combination.
    """
    # An unresolved EP can never match. Reject it before normalization:
    # NOTE(review): whether ``normalize_ep_name`` accepts ``None`` is not
    # visible from here, so do not rely on it.
    if resolved_ep is None:
        return False

    # Imported lazily, matching the TYPE_CHECKING-only import of the same
    # module at file level.
    from ..utils.constants import normalize_ep_name

    return (
        normalize_ep_name(resolved_ep) == _SUPPORTED_EP
        and (resolved_device or "").lower() == _SUPPORTED_DEVICE
        and (op_tracing or "").lower() == _SUPPORTED_LEVEL
    )


__all__ = [
Expand All @@ -28,7 +57,7 @@ def is_qnn_profiling_available() -> bool:
"OperatorMetrics",
"display_op_trace_report",
"get_tracer",
"is_qnn_profiling_available",
"is_profiling_available",
"register_tracer",
"write_op_trace_json",
]
49 changes: 19 additions & 30 deletions src/winml/modelkit/optracing/qnn/profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
(detail) post-processing.
5. Return a structured ``OpTraceResult``.
"""

from __future__ import annotations

import contextlib
Expand All @@ -23,6 +24,7 @@

import numpy as np

from ...winml import add_ep_for_device
from ..base import OpTracer
from ..result import OperatorMetrics, OpTraceResult
from .csv_parser import parse_qnn_profiling_csv
Expand Down Expand Up @@ -125,14 +127,16 @@ def run(self, iterations: int = 5, warmup: int = 2) -> OpTraceResult:
csv_path = self.output_dir / "profiling_output.csv"
options = self._build_session_options(ort)
provider_options = self._build_provider_options(csv_path)
if not add_ep_for_device(
options, "QNNExecutionProvider", ort.OrtHardwareDeviceType.NPU, provider_options
):
raise RuntimeError("Failed to add QNNExecutionProvider for NPU device.")

# CWD must be output_dir so schematic.bin lands there.
with _working_directory(self.output_dir):
session = ort.InferenceSession(
str(self.onnx_path),
sess_options=options,
providers=["QNNExecutionProvider"],
provider_options=provider_options,
)

inputs = self._generate_inputs(session)
Expand All @@ -158,33 +162,26 @@ def run(self, iterations: int = 5, warmup: int = 2) -> OpTraceResult:
def _build_session_options(self, ort_module: Any) -> Any:
"""Create ``ort.SessionOptions`` with profiling config entries."""
options = ort_module.SessionOptions()
options.add_session_config_entry(
"session.disable_cpu_ep_fallback", "1"
)
options.add_session_config_entry("session.disable_cpu_ep_fallback", "1")
options.add_session_config_entry("ep.context_enable", "1")
options.add_session_config_entry("ep.context_embed_mode", "0")
return options

def _build_provider_options(
self, csv_path: Path
) -> list[dict[str, str]]:
def _build_provider_options(self, csv_path: Path) -> dict[str, str]:
"""Build QNN EP provider options dict.

- ``basic`` mode uses ``profiling_level=detailed`` (per-op cycles).
- ``detail`` mode uses ``profiling_level=optrace`` (full QHAS).
"""
profiling_level = "optrace" if self.level == "detail" else "detailed"

return [
{
"backend_path": "QnnHtp.dll",
"htp_performance_mode": "high_performance",
"htp_graph_finalization_optimization_mode": "3",
"enable_htp_fp16_precision": "1",
"profiling_level": profiling_level,
"profiling_file_path": str(csv_path),
}
]
return {
"htp_performance_mode": "high_performance",
"htp_graph_finalization_optimization_mode": "3",
"enable_htp_fp16_precision": "1",
"profiling_level": profiling_level,
"profiling_file_path": str(csv_path),
}

# ------------------------------------------------------------------
# Input generation
Expand All @@ -204,9 +201,7 @@ def _generate_inputs(session: Any) -> dict[str, np.ndarray]:
# Result collection
# ------------------------------------------------------------------

def _collect_results(
self, csv_path: Path, iterations: int
) -> OpTraceResult:
def _collect_results(self, csv_path: Path, iterations: int) -> OpTraceResult:
"""Parse profiling artifacts into an ``OpTraceResult``."""
artifacts: dict[str, str] = {}
qnn_log = Path(str(csv_path) + "_qnn.log")
Expand Down Expand Up @@ -260,15 +255,11 @@ def _try_qhas(
import json as _json

if schematic is None or not schematic.is_file():
logger.info(
"No schematic found; falling back to CSV for detail mode"
)
logger.info("No schematic found; falling back to CSV for detail mode")
return None

qhas_output = self.output_dir / "qhas_output.json"
result_path = run_qhas_viewer(
qnn_log, schematic, qhas_output, sdk_root=find_qnn_sdk()
)
result_path = run_qhas_viewer(qnn_log, schematic, qhas_output, sdk_root=find_qnn_sdk())

if result_path is None or not result_path.is_file():
logger.info("QHAS viewer did not produce output; falling back")
Expand Down Expand Up @@ -329,9 +320,7 @@ def _from_csv(
op_path=op["name"],
op_id=op["op_id"],
duration_us=op["cycles"] * cycle_to_us,
percent_of_total=(
op["cycles"] / total_cycles * 100 if total_cycles > 0 else 0
),
percent_of_total=(op["cycles"] / total_cycles * 100 if total_cycles > 0 else 0),
)
for op in parsed["operators"]
]
Expand Down
42 changes: 38 additions & 4 deletions tests/unit/optracing/test_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,45 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
# --------------------------------------------------------------------------
"""Test QNN EP detection for op-tracing."""
"""Test op-tracing support detection for the resolved EP/device/level."""

from winml.modelkit.optracing import is_qnn_profiling_available
import pytest

from winml.modelkit.optracing import is_profiling_available

def test_is_qnn_profiling_available_returns_bool():
result = is_qnn_profiling_available()

def test_is_profiling_available_returns_bool():
    """The detection helper answers with a plain ``bool``."""
    outcome = is_profiling_available("QNNExecutionProvider", "npu", "basic")
    assert isinstance(outcome, bool)


def test_supported_combination():
    """QNN EP on NPU at the ``basic`` level is reported as supported."""
    supported = is_profiling_available("QNNExecutionProvider", "npu", "basic")
    assert supported is True


def test_qnn_alias_is_normalized():
    """The short ``qnn`` alias is accepted in place of the full EP name."""
    # The benchmark may carry the user's EP alias verbatim; it must still match.
    supported = is_profiling_available("qnn", "npu", "basic")
    assert supported is True


def test_device_is_case_insensitive():
    """An upper-case device name still matches the supported device."""
    supported = is_profiling_available("QNNExecutionProvider", "NPU", "basic")
    assert supported is True


def test_level_is_case_insensitive():
    """An upper-case tracing level still matches the supported level."""
    supported = is_profiling_available("QNNExecutionProvider", "npu", "BASIC")
    assert supported is True


@pytest.mark.parametrize(
    ("ep", "device", "level"),
    [
        ("CPUExecutionProvider", "npu", "basic"),  # wrong EP
        ("QNNExecutionProvider", "gpu", "basic"),  # wrong device
        ("QNNExecutionProvider", "npu", "detail"),  # unsupported level
        (None, "npu", "basic"),  # no EP
        ("QNNExecutionProvider", None, "basic"),  # no device
        ("QNNExecutionProvider", "npu", None),  # no level
    ],
)
def test_unsupported_combinations(ep, device, level):
    """Any deviation from QNN + NPU + ``basic`` is reported as unsupported."""
    verdict = is_profiling_available(ep, device, level)
    assert verdict is False
17 changes: 8 additions & 9 deletions tests/unit/optracing/test_qnn_profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,14 @@ def test_qnn_profiler_creates_session_options():


def test_qnn_profiler_provider_options_basic():
"""Verify provider options for basic mode (profiling_level=detailed)."""
"""Verify provider options for basic mode (profiling_level=detailed).

The QNN backend/device is now selected by ``add_ep_for_device``, so the
options dict no longer carries ``backend_path``.
"""
profiler = QNNProfiler(Path("model.onnx"), output_dir=Path("out"), level="basic")
opts = profiler._build_provider_options(Path("out/profiling.csv"))
po = profiler._build_provider_options(Path("out/profiling.csv"))

assert len(opts) == 1
po = opts[0]
assert po["backend_path"] == "QnnHtp.dll"
assert po["htp_performance_mode"] == "high_performance"
assert po["htp_graph_finalization_optimization_mode"] == "3"
assert po["enable_htp_fp16_precision"] == "1"
Expand All @@ -69,11 +70,9 @@ def test_qnn_profiler_provider_options_basic():
def test_qnn_profiler_provider_options_detail():
    """Verify provider options for detail mode (profiling_level=optrace)."""
    profiler = QNNProfiler(Path("model.onnx"), output_dir=Path("out"), level="detail")
    # ``_build_provider_options`` returns a flat dict, so it is used directly
    # rather than indexed as a one-element list.
    po = profiler._build_provider_options(Path("out/profiling.csv"))

    assert po["profiling_level"] == "optrace"
    # The options dict no longer carries ``backend_path``.
    assert "backend_path" not in po


# =====================================================================
Expand Down Expand Up @@ -182,7 +181,7 @@ def write_csv_on_del():
# Verify session creation was called correctly via builder methods.
profiler._build_session_options(mock_ort)
po = profiler._build_provider_options(output_dir / "profiling_output.csv")
assert po[0]["profiling_level"] == "detailed"
assert po["profiling_level"] == "detailed"

# Now test the CSV parsing path directly.
result = profiler._from_csv(
Expand Down
Loading