Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 62 additions & 133 deletions src/winml/modelkit/optracing/qnn/csv_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
execute time in cycles/US). NODE SUB-EVENT rows carry per-operator
cycle counts. UNKNOWN SUB-EVENT rows (compile stages) are ignored.

Multiple inference samples are separated by ROOT
"Accelerator (execute) time (cycles)" boundaries.
Multiple inference samples are delimited by the ROOT
"Number of HVX threads used" marker (the first ROOT metric of each
inference); every sample carries its own ROOT metadata.
"""

from __future__ import annotations

import csv
Expand All @@ -35,26 +37,26 @@
_TOKEN_SUFFIX = re.compile(r"_token_\d+(?:_\d+)?")


def parse_qnn_profiling_csv(csv_path: str | Path) -> list[dict[str, Any]]:
    """Parse a QNN basic-mode profiling CSV into a list of per-sample records.

    Returns one entry per inference sample::

        [
            {
                "metadata": {hvx_threads, accel_execute_cycles, accel_execute_us},
                "samples": [{name, op_id, cycles}, ...],
            },
            ...
        ]

    Each sample carries its *own* ROOT metadata so per-operator durations can
    be derived against the accelerator cycle counts of the same inference
    (the cycle->US factor varies slightly between samples). Operator
    aggregation across samples is left to the caller.

    Args:
        csv_path: Location of the profiling CSV to read.

    Returns:
        The per-sample records, in the order they appear in the CSV.
    """
    # Reading and splitting are delegated to the module's private helpers;
    # this entry point only chains them.
    rows = _read_csv(csv_path)
    return _extract_samples(rows)


# ---------------------------------------------------------------------------
Expand All @@ -70,103 +72,75 @@ def _read_csv(csv_path: str | Path) -> list[dict[str, str]]:
return list(reader)


def _extract_samples(rows: list[dict[str, str]]) -> list[dict[str, Any]]:
    """Split the CSV rows into per-sample records.

    A sample begins at the ROOT ``Number of HVX threads used`` marker and
    runs until the next such marker (or end-of-file). Every ROOT metric seen
    inside that window (HVX threads, accelerator execute cycles/US) is stored
    alongside the NODE rows of the same window, so each sample carries its
    *own* metadata.

    Args:
        rows: CSV rows as dicts keyed by column header. Missing columns are
            treated as empty strings; values are whitespace-stripped.

    Returns:
        A list of ``{"metadata": {...}, "samples": [op, ...]}`` dicts.
        ``accel_execute_cycles`` and ``accel_execute_us`` default to ``0``
        when the matching ROOT row is absent from the sample. Samples that
        produced no operator rows are dropped.

    Raises:
        ValueError: If the ``Time`` value of a recognised ROOT metric row is
            not an integer literal.
    """
    samples: list[dict[str, Any]] = []
    current: dict[str, Any] | None = None

    for row in rows:
        event_level = row.get("Event Level", "").strip()
        event_id = row.get("Event Identifier", "").strip()
        message = row.get("Message", "").strip()
        time_val = row.get("Time", "").strip()
        unit = row.get("Unit of Measurement", "").strip()

        # Sample boundary: a new HVX-threads marker starts a fresh sample.
        if (
            event_level == "ROOT"
            and event_id == "Number of HVX threads used"
            and unit == "COUNT"
        ):
            # Close the previous sample, keeping it only if it has operators.
            if current is not None and current["samples"]:
                samples.append(current)
            current = {
                "metadata": {
                    "hvx_threads": int(time_val),
                    "accel_execute_cycles": 0,
                    "accel_execute_us": 0,
                },
                "samples": [],
            }
            continue

        if current is None:
            # Rows before the first HVX marker belong to no sample; skip them.
            continue

        meta = current["metadata"]

        if (
            event_level == "ROOT"
            and event_id == "Accelerator (execute) time (cycles)"
            and unit == "CYCLES"
        ):
            meta["accel_execute_cycles"] = int(time_val)
            continue

        if (
            event_level == "ROOT"
            and event_id == "Accelerator (execute) time"
            and unit == "US"
        ):
            meta["accel_execute_us"] = int(time_val)
            continue

        # Collect NODE SUB-EVENT rows with CYCLES unit.
        if message == "NODE" and event_level == "SUB-EVENT" and unit == "CYCLES":
            parsed = _parse_node_event(event_id, time_val)
            if parsed is not None:
                current["samples"].append(parsed)

    # Flush the last sample.
    if current is not None and current["samples"]:
        samples.append(current)

    return samples


def _parse_node_event(
event_id: str, time_val: str
) -> dict[str, Any] | None:
def _parse_node_event(event_id: str, time_val: str) -> dict[str, Any] | None:
"""Parse a single NODE SUB-EVENT identifier into name/op_id/cycles."""
m = _OP_PATTERN.match(event_id)
if m is None:
Expand All @@ -180,48 +154,3 @@ def _parse_node_event(
name = _TOKEN_SUFFIX.sub("", raw_name)

return {"name": name, "op_id": op_id, "cycles": cycles}


def _aggregate_operators(
    samples: list[list[dict[str, Any]]],
) -> list[dict[str, Any]]:
    """Average operator cycles across samples and sort by cycles desc.

    Operators are keyed by ``op_id`` so identically-named ops in
    different positions are kept separate. The reported ``name`` is the one
    seen on the first occurrence of each ``op_id``.

    Args:
        samples: Per-sample operator lists; every operator dict must provide
            ``name``, ``op_id`` and ``cycles``.

    Returns:
        One ``{"name", "op_id", "cycles"}`` dict per distinct ``op_id``,
        where ``cycles`` is the mean over the samples in which that operator
        appears (true division, so always a float). Entries are ordered by
        mean cycles, highest first; ties keep first-seen order because the
        sort is stable. An empty input yields an empty list.
    """
    if not samples:
        return []

    # op_id -> [name, summed cycles, number of occurrences]
    stats: dict[int, list[Any]] = {}
    for sample in samples:
        for op in sample:
            entry = stats.setdefault(op["op_id"], [op["name"], 0, 0])
            entry[1] += op["cycles"]
            entry[2] += 1

    # Divide by the per-operator occurrence count, not len(samples): an
    # operator may be missing from some samples.
    aggregated: list[dict[str, Any]] = [
        {"name": name, "op_id": op_id, "cycles": total / count}
        for op_id, (name, total, count) in stats.items()
    ]

    aggregated.sort(key=lambda op: op["cycles"], reverse=True)
    return aggregated
Loading
Loading