From fa956572db1d784904e8d80f89a51d97f11b0e39 Mon Sep 17 00:00:00 2001 From: wcwxy <26245345+ChaoWao@users.noreply.github.com> Date: Wed, 27 May 2026 09:37:05 +0800 Subject: [PATCH] Refactor: drop fanout from L2PerfRecord hot path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The L2 swimlane per-task commit on AICPU was copying up to 128*8B = 1 KB of fanout edges plus walking the producer's fanout linked list, every task, on the scheduler completion critical path. The fanout edges are already the static DAG and are reconstructed offline by dep_gen replay into deps.json — so the device-side hot path was paying GM-bandwidth and cache-miss cost to duplicate information host tooling already has. Scope is a2a3 only; a5 is untouched. Device side: - L2PerfRecord drops fanout[128] / fanout_count (~1088 B -> 64 B per record). - l2_perf_aicpu_complete_record drops the trailing fanout / fanout_count parameters; the impl no longer touches them. - scheduler_completion drops the fanout_arr build + linked-list walk; host_build_graph/aicpu_executor drops the same pattern at all four call sites. Host side: - l2_perf_collector::export_swimlane_json emits "fanout": [] and "fanout_count": 0 per task to keep the JSON schema shape stable, and drops the top-level "version" field, which had drifted into a duplicate of L2PerfLevel (see in-flight PR #856 for the misaligned guard cleanup on the consumer side). Downstream tools: - swimlane_converter already preferred deps.json over task["fanout"]; it now reads the version-free schema and treats empty fanout as the expected steady state. - sched_overhead_analysis no longer gates phase parsing on the dropped "version" field — it gates on presence of aicpu_scheduler_phases, which is the right key. Tests and comments: - dep_gen tests drop the now-vacuous "fanout subset-of deps" gate and the auto-add of --enable-l2-swimlane that only existed to feed that gate. 
- _swimlane_validate drops the version assertion. - profiling_levels.md, dep_gen.h, dep_gen_replay.h, pto_orchestrator comments updated to reflect deps.json as the sole source of truth for fanout. Verified on a2a3sim: test_l2_swimlane, test_l2_swimlane_mixed, test_dep_gen, test_dep_gen_chain all pass with --enable-l2-swimlane --enable-dep-gen. --- simpler_setup/tools/README.md | 23 +++- .../tools/sched_overhead_analysis.py | 10 +- simpler_setup/tools/swimlane_converter.py | 109 ++++++++++-------- .../include/aicpu/l2_perf_collector_aicpu.h | 9 +- src/a2a3/platform/include/common/dep_gen.h | 6 +- .../include/common/l2_perf_profiling.h | 19 ++- .../src/aicpu/l2_perf_collector_aicpu.cpp | 16 +-- .../platform/src/host/l2_perf_collector.cpp | 20 +--- .../host_build_graph/aicpu/aicpu_executor.cpp | 40 +------ .../docs/profiling_levels.md | 7 +- .../host/dep_gen_replay.h | 10 +- .../runtime/pto_orchestrator.cpp | 7 +- .../scheduler/scheduler_completion.cpp | 16 +-- .../dfx/dep_gen/test_dep_gen.py | 63 +++------- .../dfx/dep_gen/test_dep_gen_chain.py | 5 - .../dfx/l2_swimlane/_swimlane_validate.py | 3 +- 16 files changed, 139 insertions(+), 224 deletions(-) diff --git a/simpler_setup/tools/README.md b/simpler_setup/tools/README.md index d2446cb10..49817aa53 100644 --- a/simpler_setup/tools/README.md +++ b/simpler_setup/tools/README.md @@ -47,8 +47,20 @@ python -m simpler_setup.tools.swimlane_converter outputs/_/l2_perf_rec # Verbose mode (for debugging) python -m simpler_setup.tools.swimlane_converter outputs/_/l2_perf_records.json -v + +# Reuse a deps.json captured in an earlier dep_gen run (different output dir) +python -m simpler_setup.tools.swimlane_converter outputs/_/l2_perf_records.json \ + --deps-json outputs/_/deps.json ``` +> Dependency arrows in the Perfetto trace come from `deps.json` (dep_gen +> replay). 
The device hot path no longer records fanout, so the typical +> workflow is **two runs**: a one-time `--enable-dep-gen` capture per +> topology to produce `deps.json`, then any number of +> `--enable-l2-swimlane` runs that consume it. If no `deps.json` is found +> alongside the perf JSON (and `--deps-json` isn't passed), the trace +> still renders but has no arrows; the converter prints a warning. + ### Command-Line Options | Option | Short | Description | @@ -57,6 +69,7 @@ python -m simpler_setup.tools.swimlane_converter outputs/_/l2_perf_rec | `--output` | `-o` | Output JSON file (default: outputs/merged_swimlane_``.json) | | `--kernel-config` | `-k` | Path to kernel_config.py, used for function name mapping | | `--func-names` | | Path to func_id_names_*.json (SceneTest format) for function name mapping | +| `--deps-json` | | Path to a dep_gen `deps.json` (defaults to sibling of input). Without one, no dependency arrows are drawn. | | `--verbose` | `-v` | Enable verbose output | ### Outputs @@ -154,7 +167,7 @@ Output is emitted in three parts: - **Part 2: AICPU scheduler loop breakdown** — per-scheduler-thread loop statistics, per-phase (scan / complete / dispatch / idle) time ratios, pop_hit / pop_miss totals, and (when deps.json is available) per-thread fanout / fanin aggregates - **Part 3: Tail OH distribution & cause analysis** — Tail OH quantile distribution (P10–P99), correlation between scheduler loop iteration time and Tail OH, and data-driven insights into the dominant phase -The perf JSON must be a v2 capture with non-empty `aicpu_scheduler_phases` (rerun the case with `--enable-l2-swimlane` if the tool reports the field is missing). +The perf JSON must have non-empty `aicpu_scheduler_phases` (rerun the case with `--enable-l2-swimlane` if the tool reports the field is missing). 
--- @@ -270,7 +283,6 @@ The analysis tools share the same input format - the `l2_perf_records_*.json` fi ```json { - "version": 1, "tasks": [ { "task_id": 0, @@ -279,14 +291,15 @@ The analysis tools share the same input format - the `l2_perf_records_*.json` fi "core_type": "aic", "start_time_us": 100.0, "end_time_us": 250.5, - "duration_us": 150.5, - "fanout": [1, 2], - "fanout_count": 2 + "duration_us": 150.5 } ] } ``` +Dependency edges come from `deps.json` (dep_gen replay) at post-process time — +not from the perf JSON. See [`swimlane_converter --deps-json`](#swimlane_converter). + ### Kernel Config Format To display meaningful function names in the output, provide a `kernel_config.py` file: diff --git a/simpler_setup/tools/sched_overhead_analysis.py b/simpler_setup/tools/sched_overhead_analysis.py index d9a5653e9..5ab4e7b36 100644 --- a/simpler_setup/tools/sched_overhead_analysis.py +++ b/simpler_setup/tools/sched_overhead_analysis.py @@ -10,7 +10,7 @@ """Scheduler overhead analysis for PTO2. Inputs: - 1. Per-task perf profiling data (l2_perf_records_*.json), v2 schema with + 1. Per-task perf profiling data (l2_perf_records_*.json) with ``aicpu_scheduler_phases`` populated by ``--enable-l2-swimlane``. 2. deps.json (optional, dep_gen replay output) colocated with the perf JSON, used to derive per-thread fanout / fanin DAG stats. @@ -153,18 +153,16 @@ def auto_select_l2_perf_records_json(): def parse_scheduler_from_json_phases(data): - """Extract scheduler Phase breakdown from l2_perf_records JSON (version >= 2). + """Extract scheduler Phase breakdown from l2_perf_records JSON. Computes per-thread loop counts, task counts, and phase totals - from aicpu_scheduler_phases records. + from aicpu_scheduler_phases records (present at l2_perf_level >= 3). Returns: dict: Thread data keyed by thread index, with per-phase us / pct, pop_hit / pop_miss, loops, completed, tasks_per_loop. Returns empty dict if phase data is not available. 
""" - if data.get("version", 1) < 2: - return {} phases_by_thread = data.get("aicpu_scheduler_phases", []) if not phases_by_thread: return {} @@ -487,7 +485,7 @@ def run_analysis( # noqa: PLR0912, PLR0915 else: pop_hit = pop_miss = 0 pop_hit_rate = 0.0 - print(" Pop: (no per-emit pop deltas in input — needs --enable-l2-swimlane on a v2 JSON capture)") + print(" Pop: (no per-emit pop deltas in input — needs --enable-l2-swimlane)") print() print("=" * 90) diff --git a/simpler_setup/tools/swimlane_converter.py b/simpler_setup/tools/swimlane_converter.py index e042bb7a6..5841617cb 100644 --- a/simpler_setup/tools/swimlane_converter.py +++ b/simpler_setup/tools/swimlane_converter.py @@ -92,8 +92,7 @@ def read_perf_data(filepath): filepath: Path to input JSON file Returns: - dict: Parsed performance data with keys: - - version + dict: Parsed performance data with key: - tasks (list) Raises: @@ -102,50 +101,44 @@ def read_perf_data(filepath): with open(filepath) as f: data = json.load(f) - # Validate required fields - required_fields = ["version", "tasks"] - for field in required_fields: - if field not in data: - raise ValueError(f"Missing required field: {field}") - - # Validate version - if data["version"] not in [1, 2, 3, 4]: - raise ValueError(f"Unsupported version: {data['version']} (expected 1, 2, 3, or 4)") + if "tasks" not in data: + raise ValueError("Missing required field: tasks") return data -def load_deps_json(perf_records_path): - """Load deps.json (dep_gen replay output) co-located with ``l2_perf_records.json``. +def load_deps_json(deps_path): + """Load a dep_gen replay output (``deps.json``). - deps.json supersedes ``task["fanout"]``: fanout is sealed at the moment the - producer's L2PerfRecord retires, so consumers submitted after a fast producer - completes can never get attributed to it. dep_gen's replay reconstructs the - complete graph by replaying every captured ``submit_task`` through a host - PTO2TensorMap. 
+ deps.json is the sole source of truth for the task graph in this tool: + the device hot path no longer records per-task fanout (see PR #863). The + typical workflow is a dep_gen run once per topology (``--enable-dep-gen``) + to produce ``deps.json``, then any number of ``--enable-l2-swimlane`` runs + that join their per-task timing against that captured graph. Returns: - dict[int, list[int]] mapping ``pred_raw → [succ_raw, ...]`` (i.e. the - same shape as ``task["fanout"]``), or ``None`` if no deps.json is present. - Tasks with no successors are absent from the dict (mirrors ``defaultdict`` - semantics on lookup miss). + dict[int, list[int]] mapping ``pred_raw → [succ_raw, ...]``, or + ``None`` if the file is missing, unreadable, or not v2-shaped. Tasks + with no successors are absent from the dict (``defaultdict``-like + lookup-miss semantics). """ - deps_path = Path(perf_records_path).parent / "deps.json" + deps_path = Path(deps_path) if not deps_path.exists(): return None try: with deps_path.open() as f: data = json.load(f) except (OSError, ValueError) as e: - print(f"Warning: failed to read {deps_path}: {e}; falling back to fanout", file=sys.stderr) + print(f"Warning: failed to read {deps_path}: {e}", file=sys.stderr) return None edges = data.get("edges") if not isinstance(edges, list): + print(f"Warning: {deps_path} has no 'edges' array", file=sys.stderr) return None version = data.get("version") if version != 2: print( - f"Warning: deps.json version={version!r}; only v2 is supported. 
Falling back to fanout[].", + f"Warning: {deps_path} version={version!r}; only v2 is supported.", file=sys.stderr, ) return None @@ -396,7 +389,6 @@ def generate_chrome_trace_json( # noqa: PLR0912, PLR0915 tasks: List of task dicts with fields: - task_id, func_id, core_id, core_type - start_time_us, end_time_us, duration_us - - fanout, fanout_count - dispatch_time_us (optional, AICPU dispatch timestamp) - finish_time_us (optional, AICPU finish timestamp) output_path: Path to output JSON file @@ -477,9 +469,6 @@ def generate_chrome_trace_json( # noqa: PLR0912, PLR0915 ts = task["start_time_us"] dur = task["duration_us"] - # Build fanout hint string (packed ids → rXtY / tY for readability) - fanout_str = "[" + ", ".join(format_task_display(x) for x in task["fanout"]) + "]" - # Get function name if available func_id = task["func_id"] tdisp = format_task_display(task["task_id"]) @@ -489,6 +478,14 @@ def generate_chrome_trace_json( # noqa: PLR0912, PLR0915 else: task_name = f"func_{_func_id_to_letter(func_id)}({tdisp})" + # Build fanout hint string (packed ids → rXtY / tY for readability) + # from deps.json — the device hot path no longer carries fanout. + fanout_str = ( + "[" + + ", ".join(format_task_display(x) for x in (deps_edges.get(task["task_id"], []) if deps_edges else [])) + + "]" + ) + events.append( { "args": { @@ -620,9 +617,9 @@ def generate_chrome_trace_json( # noqa: PLR0912, PLR0915 task_to_aicpu_event_id[(task["task_id"], task["core_id"])] = event_id event_id += 1 - # Flow events (Flow events "s" and "f" for dependencies). When deps.json - # was produced by dep_gen replay, prefer its edges over task["fanout"] — - # fanout is the truncated, race-prone view (see load_deps_json's docstring). + # Flow events (Flow events "s" and "f" for dependencies). Edges come from + # deps.json (dep_gen replay); without one we emit no flow events at all, + # since the device hot path no longer carries fanout (PR #863). 
# Edges where the predecessor's end_time outlives the successor's start_time # are flagged as happens-before violations and emitted with a distinct flow # name so Perfetto colors them differently from clean dependency arrows. @@ -631,11 +628,7 @@ def generate_chrome_trace_json( # noqa: PLR0912, PLR0915 task_map[t["task_id"]].append(t) flow_id = 0 hb_violation_count = 0 - - def _succs_for(task): - if deps_edges is not None: - return deps_edges.get(task["task_id"], []) - return task["fanout"] + edges_by_pred = deps_edges or {} for task in tasks: src_tid = core_to_tid[task["core_id"]] @@ -646,7 +639,7 @@ def _succs_for(task): # Use a small offset (0.01 us) for visual clarity flow_start_us = src_ts_end - 0.01 - for succ_task_id in _succs_for(task): + for succ_task_id in edges_by_pred.get(task["task_id"], []): if succ_task_id not in task_map: if verbose: print( @@ -699,8 +692,10 @@ def _succs_for(task): flow_id += 1 if verbose: - edge_source = "deps.json" if deps_edges is not None else "task.fanout" - print(f" Flow events: {flow_id} edges (source: {edge_source})") + if deps_edges is not None: + print(f" Flow events: {flow_id} edges (source: deps.json)") + else: + print(" Flow events: 0 (no deps.json — re-run dep_gen and pass --deps-json to add arrows)") if hb_violation_count > 0: print(f" Happens-before violations: {hb_violation_count} edge(s) flagged as 'hb_violation'") @@ -841,7 +836,7 @@ def _succs_for(task): src_tid = task_to_aicpu_tid.get((task["task_id"], task["core_id"]), core_to_tid[task["core_id"]]) src_aicpu_eid = task_to_aicpu_event_id.get((task["task_id"], task["core_id"])) - for succ_task_id in _succs_for(task): + for succ_task_id in edges_by_pred.get(task["task_id"], []): if succ_task_id not in task_map: continue @@ -1108,6 +1103,13 @@ def _build_parser(): "--func-names", help="Path to func_id_names_*.json (SceneTest format) for func_id to function name mapping", ) + parser.add_argument( + "--deps-json", + help=( + "Path to a dep_gen replay deps.json 
(defaults to sibling of the perf JSON). " + "Without one the trace has no dependency arrows — re-run with --enable-dep-gen first." + ), + ) parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output") return parser @@ -1145,11 +1147,10 @@ def _resolve_output_path(args, input_path): def _print_verbose_data_info(data, verbose): - """Print verbose summary of loaded performance data including v2 phase counts.""" + """Print verbose summary of loaded performance data including phase counts.""" if not verbose: return print("\n=== Performance Data ===") - print(f" Version: {data['version']}") print(f" Task Count: {len(data['tasks'])}") if data["tasks"]: start_times = [t["start_time_us"] for t in data["tasks"]] @@ -1158,8 +1159,6 @@ def _print_verbose_data_info(data, verbose): max_time = max(end_times) print(f" Time Range: {min_time:.3f} us - {max_time:.3f} us (span: {max_time - min_time:.3f} us)") print() - if data["version"] != 2: - return scheduler_phases = data.get("aicpu_scheduler_phases") orchestrator_phases = data.get("aicpu_orchestrator_phases") core_to_thread = data.get("core_to_thread") @@ -1227,9 +1226,17 @@ def main(): output_path = _resolve_output_path(args, input_path) - deps_edges = load_deps_json(input_path) - if args.verbose and deps_edges is not None: - print(f" Using deps.json edges ({sum(len(v) for v in deps_edges.values())} total)") + deps_path = Path(args.deps_json) if args.deps_json else Path(input_path).parent / "deps.json" + deps_edges = load_deps_json(deps_path) + if deps_edges is not None: + if args.verbose: + print(f" Using deps.json edges ({sum(len(v) for v in deps_edges.values())} total) from {deps_path}") + else: + print( + f"Warning: no usable deps.json at {deps_path}; Perfetto trace will have no dependency arrows. 
" + f"Run a dep_gen capture (--enable-dep-gen) and pass --deps-json to add them.", + file=sys.stderr, + ) generate_chrome_trace_json( data["tasks"], @@ -1250,13 +1257,15 @@ def main(): print_task_statistics(data["tasks"], func_names) - # The deep-dive reads only the perf JSON and (optionally) the colocated - # deps.json — sibling auto-discovery happens inside run_sched_overhead_analysis. + # The deep-dive reads the perf JSON plus deps.json (for per-thread + # fanout / fanin aggregates). Forward the resolved deps path so an + # explicit --deps-json overrides sibling auto-discovery there too. print("\n=== Scheduler Overhead Deep Dive ===") deep_dive_rc = run_sched_overhead_analysis( input_path, print_sources=True, perf_data=data, + deps_json_path=deps_path if deps_edges is not None else None, ) if deep_dive_rc != 0: print( diff --git a/src/a2a3/platform/include/aicpu/l2_perf_collector_aicpu.h b/src/a2a3/platform/include/aicpu/l2_perf_collector_aicpu.h index a49bbd9b5..54ebecf24 100644 --- a/src/a2a3/platform/include/aicpu/l2_perf_collector_aicpu.h +++ b/src/a2a3/platform/include/aicpu/l2_perf_collector_aicpu.h @@ -73,8 +73,9 @@ void l2_perf_aicpu_init(int worker_count); * `dual_issue_slots[expected_reg_task_id % PLATFORM_L2_AICORE_RING_SIZE]`, * validates the task_id match, fills all AICPU-side fields, commits into * the current records buffer, and rotates the records buffer internally - * once it fills up. Callers must pre-extract fanout into a plain uint64_t - * array (platform layer cannot depend on runtime linked-list types). + * once it fills up. Fanout edges live in the static DAG (deps.json from + * dep_gen) and are joined by the host's swimlane converter post-run, so + * this commit path does not touch fanout. 
* * Per-core counter accounting: * total_record_count++ — every commit attempt (success or failure) @@ -95,12 +96,10 @@ void l2_perf_aicpu_init(int worker_count); * @param core_type Core type (AIC/AIV) * @param dispatch_time AICPU timestamp when task was dispatched * @param finish_time AICPU timestamp when task completion was observed - * @param fanout Pre-extracted successor task ID array (nullptr if none) - * @param fanout_count Number of entries in fanout array (0 if none) */ int l2_perf_aicpu_complete_record( int core_id, int thread_idx, uint32_t expected_reg_task_id, uint64_t task_id, uint32_t func_id, CoreType core_type, - uint64_t dispatch_time, uint64_t finish_time, const uint64_t *fanout, int32_t fanout_count + uint64_t dispatch_time, uint64_t finish_time ); /** diff --git a/src/a2a3/platform/include/common/dep_gen.h b/src/a2a3/platform/include/common/dep_gen.h index 167b8d241..091fd349a 100644 --- a/src/a2a3/platform/include/common/dep_gen.h +++ b/src/a2a3/platform/include/common/dep_gen.h @@ -15,9 +15,9 @@ * * Captures the inputs to every Orchestrator::submit_task call into a streaming * ring of DepGenRecord. The host side replays these records offline to - * reconstruct the full task dependency graph (deps.json), bypassing the race - * window in L2PerfRecord::fanout[] (where an early-finishing producer would - * have its record sealed before later-submitted consumers can register). + * reconstruct the full task dependency graph (deps.json). deps.json is the + * sole source of truth for fanout edges; the L2 swimlane hot path no longer + * carries fanout to keep AICPU off the per-task GM-store critical path. 
* Streaming buffer design mirrors PMU / L2Perf / TensorDump (single source of * algorithmic truth in src/a2a3/platform/include/host/profiling_common/profiler_base.h): diff --git a/src/a2a3/platform/include/common/l2_perf_profiling.h b/src/a2a3/platform/include/common/l2_perf_profiling.h index 7f5c01355..128e418e6 100644 --- a/src/a2a3/platform/include/common/l2_perf_profiling.h +++ b/src/a2a3/platform/include/common/l2_perf_profiling.h @@ -60,11 +60,6 @@ #include "common/core_type.h" #include "common/platform_config.h" -// Maximum number of successor tasks per L2PerfRecord (matches Task::fanout) -#ifndef RUNTIME_MAX_FANOUT -#define RUNTIME_MAX_FANOUT 128 -#endif - // ============================================================================= // L2 perf_level — granularity ladder for the L2 swimlane profiler. // @@ -82,7 +77,7 @@ enum class L2PerfLevel : uint32_t { DISABLED = 0, // No collection at all AICORE_TIMING = 1, // AICore per-task start/end timestamps + task record buffer - AICPU_TIMING = 2, // + AICPU dispatch/finish timestamps + fanout dependency list + AICPU_TIMING = 2, // + AICPU dispatch/finish timestamps SCHED_PHASES = 3, // + scheduler main-loop phase records (SCHED_COMPLETE/DISPATCH/IDLE_WAIT) ORCH_PHASES = 4, // + orchestrator phase records }; @@ -92,7 +87,13 @@ enum class L2PerfLevel : uint32_t { // ============================================================================= /** - * Single task execution record + * Single task execution record. + * + * Fanout edges live in the static DAG (deps.json from dep_gen) — not in + * this record. Keeping fanout out of the hot AICPU commit path avoids a + * per-task ~1 KB GM store + a linked-list walk on the scheduler's + * critical fanin tail. The host swimlane export omits the fanout + * fields; `swimlane_converter.py` joins deps.json at post-process time.
*/ struct L2PerfRecord { // Timing information (device clock timestamps) @@ -111,10 +112,6 @@ struct L2PerfRecord { uint64_t task_id; uint32_t func_id; // Kernel function identifier CoreType core_type; // Core type (AIC/AIV) - - // Dependency relationship (fanout only) - uint64_t fanout[RUNTIME_MAX_FANOUT]; // Successor task task_id array - int32_t fanout_count; // Number of successor tasks } __attribute__((aligned(64))); static_assert(sizeof(L2PerfRecord) % 64 == 0, "L2PerfRecord must be 64-byte aligned for optimal cache performance"); diff --git a/src/a2a3/platform/src/aicpu/l2_perf_collector_aicpu.cpp b/src/a2a3/platform/src/aicpu/l2_perf_collector_aicpu.cpp index e4f9d1c68..cc81ab919 100644 --- a/src/a2a3/platform/src/aicpu/l2_perf_collector_aicpu.cpp +++ b/src/a2a3/platform/src/aicpu/l2_perf_collector_aicpu.cpp @@ -226,7 +226,7 @@ static void switch_records_buffer(int core_id, int thread_idx) { int l2_perf_aicpu_complete_record( int core_id, int thread_idx, uint32_t expected_reg_task_id, uint64_t task_id, uint32_t func_id, CoreType core_type, - uint64_t dispatch_time, uint64_t finish_time, const uint64_t *fanout, int32_t fanout_count + uint64_t dispatch_time, uint64_t finish_time ) { if (core_id < 0 || core_id >= PLATFORM_MAX_CORES) { return -1; @@ -288,23 +288,15 @@ int l2_perf_aicpu_complete_record( record->func_id = func_id; record->core_type = core_type; - // AICPU_TIMING and above: dispatch/finish timing and fanout dependency info + // AICPU_TIMING and above: dispatch/finish timing. Fanout edges live in + // the static DAG (deps.json) and are joined by the host post-run, so they + // are not written here. if (g_l2_perf_level >= L2PerfLevel::AICPU_TIMING) { record->dispatch_time = dispatch_time; record->finish_time = finish_time; - if (fanout != nullptr && fanout_count > 0) { - int32_t n = (fanout_count > RUNTIME_MAX_FANOUT) ? 
RUNTIME_MAX_FANOUT : fanout_count; - for (int32_t i = 0; i < n; i++) { - record->fanout[i] = fanout[i]; - } - record->fanout_count = n; - } else { - record->fanout_count = 0; - } } else { record->dispatch_time = 0; record->finish_time = 0; - record->fanout_count = 0; } uint32_t new_count = count + 1; diff --git a/src/a2a3/platform/src/host/l2_perf_collector.cpp b/src/a2a3/platform/src/host/l2_perf_collector.cpp index 6b0744aa5..745dab8b6 100644 --- a/src/a2a3/platform/src/host/l2_perf_collector.cpp +++ b/src/a2a3/platform/src/host/l2_perf_collector.cpp @@ -592,9 +592,10 @@ int L2PerfCollector::export_swimlane_json() { } // Step 7: Write JSON data - int version = static_cast(l2_perf_level_); + // Fanout fields are no longer emitted — the device-side hot path no + // longer carries them. Downstream (swimlane_converter.py) joins fanout + // from the sibling deps.json (dep_gen output). outfile << "{\n"; - outfile << " \"version\": " << version << ",\n"; outfile << " \"tasks\": [\n"; for (size_t i = 0; i < tagged_records.size(); ++i) { @@ -620,18 +621,9 @@ int L2PerfCollector::export_swimlane_json() { outfile << " \"end_time_us\": " << std::fixed << std::setprecision(3) << end_us << ",\n"; outfile << " \"duration_us\": " << std::fixed << std::setprecision(3) << duration_us << ",\n"; outfile << " \"dispatch_time_us\": " << std::fixed << std::setprecision(3) << dispatch_us << ",\n"; - outfile << " \"finish_time_us\": " << std::fixed << std::setprecision(3) << finish_us << ",\n"; - outfile << " \"fanout\": ["; - int safe_fanout_count = - (record.fanout_count >= 0 && record.fanout_count <= RUNTIME_MAX_FANOUT) ?
record.fanout_count : 0; - for (int j = 0; j < safe_fanout_count; ++j) { - outfile << record.fanout[j]; - if (j < safe_fanout_count - 1) { - outfile << ", "; - } - } - outfile << "],\n"; - outfile << " \"fanout_count\": " << record.fanout_count << "\n"; + outfile << " \"finish_time_us\": " << std::fixed << std::setprecision(3) << finish_us << "\n"; + // Fanout is no longer carried on the device hot path — dep_gen replay + // (deps.json) is the sole source of truth, joined in by tooling. outfile << " }"; if (i < tagged_records.size() - 1) { outfile << ","; diff --git a/src/a2a3/runtime/host_build_graph/aicpu/aicpu_executor.cpp b/src/a2a3/runtime/host_build_graph/aicpu/aicpu_executor.cpp index b89d99e1a..d0d36ceae 100644 --- a/src/a2a3/runtime/host_build_graph/aicpu/aicpu_executor.cpp +++ b/src/a2a3/runtime/host_build_graph/aicpu/aicpu_executor.cpp @@ -739,18 +739,10 @@ int AicpuExecutor::resolve_and_dispatch(Runtime &runtime, int thread_idx, const if (prev_running_id != AICPU_TASK_INVALID) { Task *prev_task = &runtime.tasks[prev_running_id]; - uint64_t fanout_arr[RUNTIME_MAX_FANOUT]; - int fanout_count = 0; - if (l2_perf_level >= L2PerfLevel::AICPU_TIMING) { - for (int i = 0; i < prev_task->fanout_count; i++) { - fanout_arr[i] = static_cast(prev_task->fanout[i]); - } - fanout_count = prev_task->fanout_count; - } if (l2_perf_aicpu_complete_record( core_id, thread_idx, static_cast(prev_running_id), static_cast(prev_running_id), prev_task->func_id, h->core_type, - dispatch_timestamps_[core_id], finish_ts, fanout_arr, fanout_count + dispatch_timestamps_[core_id], finish_ts ) != 0) { LOG_ERROR( "Core %d: l2_perf_aicpu_complete_record failed for implicit task %d", core_id, @@ -764,18 +756,10 @@ int AicpuExecutor::resolve_and_dispatch(Runtime &runtime, int thread_idx, const finish_ts = (l2_perf_level >= L2PerfLevel::AICPU_TIMING) ? 
get_sys_cnt_aicpu() : 0; Task *task = &runtime.tasks[completed_task_id]; - uint64_t fanout_arr[RUNTIME_MAX_FANOUT]; - int fanout_count = 0; - if (l2_perf_level >= L2PerfLevel::AICPU_TIMING) { - for (int i = 0; i < task->fanout_count; i++) { - fanout_arr[i] = static_cast(task->fanout[i]); - } - fanout_count = task->fanout_count; - } if (l2_perf_aicpu_complete_record( core_id, thread_idx, static_cast(completed_task_id), static_cast(completed_task_id), task->func_id, h->core_type, - dispatch_timestamps_[core_id], finish_ts, fanout_arr, fanout_count + dispatch_timestamps_[core_id], finish_ts ) != 0) { LOG_ERROR( "Core %d: l2_perf_aicpu_complete_record failed for task %d", core_id, completed_task_id @@ -860,18 +844,10 @@ int AicpuExecutor::resolve_and_dispatch(Runtime &runtime, int thread_idx, const if (l2_perf_enabled) { uint64_t finish_ts = (l2_perf_level >= L2PerfLevel::AICPU_TIMING) ? get_sys_cnt_aicpu() : 0; Task *prev_task = &runtime.tasks[prev_running_id]; - uint64_t fanout_arr[RUNTIME_MAX_FANOUT]; - int fanout_count = 0; - if (l2_perf_level >= L2PerfLevel::AICPU_TIMING) { - for (int i = 0; i < prev_task->fanout_count; i++) { - fanout_arr[i] = static_cast(prev_task->fanout[i]); - } - fanout_count = prev_task->fanout_count; - } if (l2_perf_aicpu_complete_record( core_id, thread_idx, static_cast(prev_running_id), static_cast(prev_running_id), prev_task->func_id, h->core_type, - dispatch_timestamps_[core_id], finish_ts, fanout_arr, fanout_count + dispatch_timestamps_[core_id], finish_ts ) != 0) { LOG_ERROR( "Core %d: l2_perf_aicpu_complete_record failed for implicit task %d", core_id, @@ -911,18 +887,10 @@ int AicpuExecutor::resolve_and_dispatch(Runtime &runtime, int thread_idx, const if (l2_perf_enabled) { uint64_t finish_ts = (l2_perf_level >= L2PerfLevel::AICPU_TIMING) ? 
get_sys_cnt_aicpu() : 0; Task *task = &runtime.tasks[completed_task_id]; - uint64_t fanout_arr[RUNTIME_MAX_FANOUT]; - int fanout_count = 0; - if (l2_perf_level >= L2PerfLevel::AICPU_TIMING) { - for (int i = 0; i < task->fanout_count; i++) { - fanout_arr[i] = static_cast(task->fanout[i]); - } - fanout_count = task->fanout_count; - } if (l2_perf_aicpu_complete_record( core_id, thread_idx, static_cast(completed_task_id), static_cast(completed_task_id), task->func_id, h->core_type, - dispatch_timestamps_[core_id], finish_ts, fanout_arr, fanout_count + dispatch_timestamps_[core_id], finish_ts ) != 0) { LOG_ERROR( "Core %d: l2_perf_aicpu_complete_record failed for task %d", core_id, completed_task_id diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/docs/profiling_levels.md b/src/a2a3/runtime/tensormap_and_ringbuffer/docs/profiling_levels.md index 34e17cc86..566249ac7 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/docs/profiling_levels.md +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/docs/profiling_levels.md @@ -254,10 +254,13 @@ header just like on onboard. | ----- | -------- | | 0 | Nothing (disabled) | | 1 | AICore timing only (start/end/task_id/func_id/core_type) | -| 2 | + dispatch_time, finish_time, fanout | +| 2 | + dispatch_time, finish_time | | 3 | + Scheduler phases (`SCHED_*`) | | 4 | + Orchestrator phases (full) | +Fanout edges are no longer carried on the device hot path — `swimlane_converter.py` +joins them from the sibling `deps.json` (produced by dep_gen) at post-process time. + Bare `--enable-l2-swimlane` = level 4 (backward compatible). ### Level gating in AICPU code @@ -270,7 +273,7 @@ content it depends on instead of relying on magic numbers: // Cheap binary check, available immediately after kernel entry. if (is_l2_swimlane_enabled()) { ... } -// AICPU dispatch/finish timestamps + fanout. +// AICPU dispatch/finish timestamps. 
// Granular checks below require l2_perf_aicpu_init to have already run // (so the level has been promoted from the shared-memory header). if (get_l2_perf_level() >= L2PerfLevel::AICPU_TIMING) { ... } diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/host/dep_gen_replay.h b/src/a2a3/runtime/tensormap_and_ringbuffer/host/dep_gen_replay.h index b41c212a7..ea39bf7ea 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/host/dep_gen_replay.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/host/dep_gen_replay.h @@ -24,12 +24,10 @@ * device run completes, so going through the filesystem would just be * extra I/O and an extra file in the output directory. * - * deps.json supersedes ``L2PerfRecord::fanout[]`` for tools that need the - * *complete* dependency graph: fanout is sealed when a producer finishes, so - * consumers submitted after a fast producer retires never get attributed to - * it (the race window that motivated dep_gen). Replay sees every submit and - * so reconstructs the graph the runtime would have built if no producer ever - * raced ahead. + * deps.json is the sole source of truth for fanout: the L2 swimlane hot + * path no longer records ``L2PerfRecord::fanout[]`` (taking the per-task + * 1 KB GM store off the scheduler critical path). Replay sees every + * submit and reconstructs the complete dependency graph. * * Output format (deps.json, v2): * diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp index 813f1f846..d1c039785 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp @@ -554,10 +554,9 @@ static TaskOutputTensors submit_task_common( // dep_gen capture point: snapshot the orch submit_task inputs while the // tensormap is still in its pre-lookup state for this task. 
Replay reads - // these records offline to reconstruct the complete dep graph, sidestepping - // the race window in L2PerfRecord::fanout[] where an early-finishing - // producer's record gets sealed before later-submitted consumers can - // register themselves. + // these records offline to reconstruct the complete dep graph — the sole + // source of truth for fanout now that the swimlane hot path no longer + // records it. if (is_dep_gen_enabled()) { const void *tensor_ptrs[MAX_TENSOR_ARGS]; // TensorArgType is `enum class : int32_t` (4 bytes); the on-disk record diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_completion.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_completion.cpp index 140f000cb..152af452e 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_completion.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_completion.cpp @@ -154,24 +154,12 @@ void SchedulerContext::complete_slot_task( #if PTO2_SCHED_PROFILING uint64_t t_perf_start = get_sys_cnt_aicpu(); #endif - uint64_t finish_ts = 0; - uint64_t fanout_arr[RUNTIME_MAX_FANOUT]; - int32_t fanout_n = 0; - - if (l2_perf_level_ >= L2PerfLevel::AICPU_TIMING) { - finish_ts = get_sys_cnt_aicpu(); - PTO2DepListEntry *cur = slot_state.fanout_head; - while (cur != nullptr && fanout_n < RUNTIME_MAX_FANOUT) { - fanout_arr[fanout_n++] = cur->slot_state->task->task_id.raw; - cur = cur->next; - } - } + uint64_t finish_ts = (l2_perf_level_ >= L2PerfLevel::AICPU_TIMING) ? 
get_sys_cnt_aicpu() : 0; int32_t perf_slot_idx = static_cast(subslot); if (l2_perf_aicpu_complete_record( core_id, thread_idx, static_cast(expected_reg_task_id), slot_state.task->task_id.raw, - slot_state.task->kernel_id[perf_slot_idx], hank[core_id].core_type, dispatch_ts, finish_ts, fanout_arr, - fanout_n + slot_state.task->kernel_id[perf_slot_idx], hank[core_id].core_type, dispatch_ts, finish_ts ) != 0) { LOG_ERROR( "Core %d: l2_perf_aicpu_complete_record failed for task 0x%" PRIx64, core_id, diff --git a/tests/st/a2a3/tensormap_and_ringbuffer/dfx/dep_gen/test_dep_gen.py b/tests/st/a2a3/tensormap_and_ringbuffer/dfx/dep_gen/test_dep_gen.py index b7ad064ae..35a9d3276 100644 --- a/tests/st/a2a3/tensormap_and_ringbuffer/dfx/dep_gen/test_dep_gen.py +++ b/tests/st/a2a3/tensormap_and_ringbuffer/dfx/dep_gen/test_dep_gen.py @@ -9,27 +9,20 @@ # ----------------------------------------------------------------------------------------------------------- """dep_gen capture + replay sim test. -Re-runs the ``vector_example`` orchestration with ``--enable-dep-gen`` (and, -in standalone mode, auto-adds ``--enable-l2-swimlane`` for the fanout ⊆ deps -gate). Verifies the end-to-end dep_gen pipeline on a2a3sim: +Re-runs the ``vector_example`` orchestration with ``--enable-dep-gen``. +Verifies the end-to-end dep_gen pipeline on a2a3sim: - 1. ``/deps.json`` is produced by the host replay - (PTO2TensorMap replay → JSON edge list), and contains exactly the - 6 edges documented in example_orchestration.cpp. The capture path - (host collector drains the device ring buffer into memory and feeds - the replay directly — no submit_trace.bin on disk) is exercised - implicitly: if it broke, deps.json would be empty or wrong. - 2. **Validation gate** (when l2_perf_records.json is present, i.e. - ``--enable-l2-swimlane`` was also enabled): every edge in - L2PerfRecord::fanout[] also appears in deps.json. 
deps may have - MORE edges than fanout (race-window edges fanout missed); we never - assert symmetry — that's the entire reason dep_gen exists. + ``/deps.json`` is produced by the host replay + (PTO2TensorMap replay → JSON edge list), and contains exactly the + 6 edges documented in example_orchestration.cpp. The capture path + (host collector drains the device ring buffer into memory and feeds + the replay directly — no submit_trace.bin on disk) is exercised + implicitly: if it broke, deps.json would be empty or wrong. -Pytest entry: needs ``--enable-dep-gen`` (capture+replay assertions) and -``--enable-l2-swimlane`` (fanout ⊆ deps gate). Standalone entry: pass -``--enable-dep-gen`` and the swimlane flag is added automatically so a -plain ``python test_dep_gen_capture.py -p a2a3sim --enable-dep-gen`` -exercises the full gate. +deps.json is now the sole source of truth for fanout edges — the device +hot path no longer records L2PerfRecord::fanout[], so there is no +"fanout ⊆ deps" cross-check to run. swimlane_converter.py joins +deps.json into the Perfetto trace at post-process time. Compute correctness is delegated to the upstream ``vector_example`` test — this case re-uses the same orchestration to keep coverage focused on the @@ -128,11 +121,8 @@ def test_run(self, st_platform, st_worker, request): def _post_validate(self, case): """Skips if no per-case output_prefix dir exists (e.g. selector skipped this case at pytest level). When the dir + deps.json are - present, assert: - - - deps.json contains the 6 edges documented in example_orchestration.cpp - - if l2_perf_records.json is also present (--enable-l2-swimlane on), - every fanout edge it records is a subset of the deps.json edge set + present, assert that deps.json contains the 6 edges documented in + example_orchestration.cpp. 
""" case_name = case["name"] safe_label = _sanitize_for_filename(f"TestDepGen_{case_name}") @@ -214,24 +204,6 @@ def _post_validate(self, case): f"edge {e.get('pred')}->{e.get('succ')} (source={source}) missing consumer_shape/start_offset/strides" ) - # ---- fanout ⊆ deps validation gate ---- - perf = out_dir / "l2_perf_records.json" - if perf.exists(): - with perf.open() as f: - pdata = json.load(f) - fanout_edges = set() - for task in pdata.get("tasks", []): - src = int(task["task_id"]) - for succ in task.get("fanout", []): - fanout_edges.add((src, int(succ))) - missing_in_deps = fanout_edges - deps_edges - assert not missing_in_deps, ( - f"fanout ⊆ deps gate FAILED: edges present in l2_perf_records.json " - f"fanout[] but absent from deps.json: {missing_in_deps}. " - f"This is a replay-side regression — the replay should be a " - f"superset of the runtime's fanout view." - ) - # ---- Tool smoke: deps_to_graph ---- # Exit-code-only check; we don't validate the HTML content. A schema # change that breaks the viewer fires here in the same CI step that @@ -257,11 +229,4 @@ def _post_validate(self, case): if __name__ == "__main__": - # ``_post_validate`` is invoked by the SceneTestCase framework after each - # case runs (pytest path AND standalone). Standalone main just adds the - # swimlane flag so the fanout ⊆ deps gate runs by default — both flags - # compose cleanly and the gate is the most informative assertion the test - # produces, so don't make the user remember to ask for it. 
- if "--enable-dep-gen" in sys.argv and "--enable-l2-swimlane" not in sys.argv: - sys.argv.append("--enable-l2-swimlane") SceneTestCase.run_module(__name__) diff --git a/tests/st/a2a3/tensormap_and_ringbuffer/dfx/dep_gen/test_dep_gen_chain.py b/tests/st/a2a3/tensormap_and_ringbuffer/dfx/dep_gen/test_dep_gen_chain.py index ad05dae7e..90910bab6 100644 --- a/tests/st/a2a3/tensormap_and_ringbuffer/dfx/dep_gen/test_dep_gen_chain.py +++ b/tests/st/a2a3/tensormap_and_ringbuffer/dfx/dep_gen/test_dep_gen_chain.py @@ -31,7 +31,6 @@ """ import json -import sys import torch from simpler.task_interface import ArgDirection as D @@ -209,8 +208,4 @@ def _post_validate(self, case): if __name__ == "__main__": - # Standalone entry: auto-add the swimlane flag so fanout ⊆ deps gate runs - # alongside the chain assertions, matching test_dep_gen.py's convention. - if "--enable-dep-gen" in sys.argv and "--enable-l2-swimlane" not in sys.argv: - sys.argv.append("--enable-l2-swimlane") SceneTestCase.run_module(__name__) diff --git a/tests/st/a2a3/tensormap_and_ringbuffer/dfx/l2_swimlane/_swimlane_validate.py b/tests/st/a2a3/tensormap_and_ringbuffer/dfx/l2_swimlane/_swimlane_validate.py index 035461dc4..43e4d7daa 100644 --- a/tests/st/a2a3/tensormap_and_ringbuffer/dfx/l2_swimlane/_swimlane_validate.py +++ b/tests/st/a2a3/tensormap_and_ringbuffer/dfx/l2_swimlane/_swimlane_validate.py @@ -30,7 +30,7 @@ from simpler_setup.scene_test import _outputs_dir, _sanitize_for_filename -_REQUIRED_TASK_FIELDS = ("task_id", "func_id", "core_id", "core_type", "start_time_us", "end_time_us", "fanout") +_REQUIRED_TASK_FIELDS = ("task_id", "func_id", "core_id", "core_type", "start_time_us", "end_time_us") def validate_perf_artifact(case_label: str, *, expected_task_count: int | None = None) -> None: @@ -53,7 +53,6 @@ def validate_perf_artifact(case_label: str, *, expected_task_count: int | None = with perf.open() as f: data = json.load(f) - assert data.get("version") in (1, 2, 3, 4), f"unexpected version: 
{data.get('version')}" tasks = data.get("tasks") assert isinstance(tasks, list), "tasks field missing or not a list" assert len(tasks) > 0, f"perf records empty under {perf}"