diff --git a/simpler_setup/tools/README.md b/simpler_setup/tools/README.md index d2446cb10..49817aa53 100644 --- a/simpler_setup/tools/README.md +++ b/simpler_setup/tools/README.md @@ -47,8 +47,20 @@ python -m simpler_setup.tools.swimlane_converter outputs/_/l2_perf_rec # Verbose mode (for debugging) python -m simpler_setup.tools.swimlane_converter outputs/_/l2_perf_records.json -v + +# Reuse a deps.json captured in an earlier dep_gen run (different output dir) +python -m simpler_setup.tools.swimlane_converter outputs/_/l2_perf_records.json \ + --deps-json outputs/_/deps.json ``` +> Dependency arrows in the Perfetto trace come from `deps.json` (dep_gen +> replay). The device hot path no longer records fanout, so the typical +> workflow is **two runs**: a one-time `--enable-dep-gen` capture per +> topology to produce `deps.json`, then any number of +> `--enable-l2-swimlane` runs that consume it. If no `deps.json` is found +> alongside the perf JSON (and `--deps-json` isn't passed), the trace +> still renders but has no arrows; the converter prints a warning. + ### Command-Line Options | Option | Short | Description | @@ -57,6 +69,7 @@ python -m simpler_setup.tools.swimlane_converter outputs/_/l2_perf_rec | `--output` | `-o` | Output JSON file (default: outputs/merged_swimlane_``.json) | | `--kernel-config` | `-k` | Path to kernel_config.py, used for function name mapping | | `--func-names` | | Path to func_id_names_*.json (SceneTest format) for function name mapping | +| `--deps-json` | | Path to a dep_gen `deps.json` (defaults to sibling of input). Without one, no dependency arrows are drawn. 
| | `--verbose` | `-v` | Enable verbose output | ### Outputs @@ -154,7 +167,7 @@ Output is emitted in three parts: - **Part 2: AICPU scheduler loop breakdown** — per-scheduler-thread loop statistics, per-phase (scan / complete / dispatch / idle) time ratios, pop_hit / pop_miss totals, and (when deps.json is available) per-thread fanout / fanin aggregates - **Part 3: Tail OH distribution & cause analysis** — Tail OH quantile distribution (P10–P99), correlation between scheduler loop iteration time and Tail OH, and data-driven insights into the dominant phase -The perf JSON must be a v2 capture with non-empty `aicpu_scheduler_phases` (rerun the case with `--enable-l2-swimlane` if the tool reports the field is missing). +The perf JSON must have non-empty `aicpu_scheduler_phases` (rerun the case with `--enable-l2-swimlane` if the tool reports the field is missing). --- @@ -270,7 +283,6 @@ The analysis tools share the same input format - the `l2_perf_records_*.json` fi ```json { - "version": 1, "tasks": [ { "task_id": 0, @@ -279,14 +291,15 @@ The analysis tools share the same input format - the `l2_perf_records_*.json` fi "core_type": "aic", "start_time_us": 100.0, "end_time_us": 250.5, - "duration_us": 150.5, - "fanout": [1, 2], - "fanout_count": 2 + "duration_us": 150.5 } ] } ``` +Dependency edges come from `deps.json` (dep_gen replay) at post-process time — +not from the perf JSON. See [`swimlane_converter --deps-json`](#swimlane_converter). + ### Kernel Config Format To display meaningful function names in the output, provide a `kernel_config.py` file: diff --git a/simpler_setup/tools/sched_overhead_analysis.py b/simpler_setup/tools/sched_overhead_analysis.py index d9a5653e9..5ab4e7b36 100644 --- a/simpler_setup/tools/sched_overhead_analysis.py +++ b/simpler_setup/tools/sched_overhead_analysis.py @@ -10,7 +10,7 @@ """Scheduler overhead analysis for PTO2. Inputs: - 1. Per-task perf profiling data (l2_perf_records_*.json), v2 schema with + 1. 
Per-task perf profiling data (l2_perf_records_*.json) with ``aicpu_scheduler_phases`` populated by ``--enable-l2-swimlane``. 2. deps.json (optional, dep_gen replay output) colocated with the perf JSON, used to derive per-thread fanout / fanin DAG stats. @@ -153,18 +153,16 @@ def auto_select_l2_perf_records_json(): def parse_scheduler_from_json_phases(data): - """Extract scheduler Phase breakdown from l2_perf_records JSON (version >= 2). + """Extract scheduler Phase breakdown from l2_perf_records JSON. Computes per-thread loop counts, task counts, and phase totals - from aicpu_scheduler_phases records. + from aicpu_scheduler_phases records (present at l2_perf_level >= 3). Returns: dict: Thread data keyed by thread index, with per-phase us / pct, pop_hit / pop_miss, loops, completed, tasks_per_loop. Returns empty dict if phase data is not available. """ - if data.get("version", 1) < 2: - return {} phases_by_thread = data.get("aicpu_scheduler_phases", []) if not phases_by_thread: return {} @@ -487,7 +485,7 @@ def run_analysis( # noqa: PLR0912, PLR0915 else: pop_hit = pop_miss = 0 pop_hit_rate = 0.0 - print(" Pop: (no per-emit pop deltas in input — needs --enable-l2-swimlane on a v2 JSON capture)") + print(" Pop: (no per-emit pop deltas in input — needs --enable-l2-swimlane)") print() print("=" * 90) diff --git a/simpler_setup/tools/swimlane_converter.py b/simpler_setup/tools/swimlane_converter.py index e042bb7a6..5841617cb 100644 --- a/simpler_setup/tools/swimlane_converter.py +++ b/simpler_setup/tools/swimlane_converter.py @@ -92,8 +92,7 @@ def read_perf_data(filepath): filepath: Path to input JSON file Returns: - dict: Parsed performance data with keys: - - version + dict: Parsed performance data with key: - tasks (list) Raises: @@ -102,50 +101,44 @@ def read_perf_data(filepath): with open(filepath) as f: data = json.load(f) - # Validate required fields - required_fields = ["version", "tasks"] - for field in required_fields: - if field not in data: - raise 
ValueError(f"Missing required field: {field}") - - # Validate version - if data["version"] not in [1, 2, 3, 4]: - raise ValueError(f"Unsupported version: {data['version']} (expected 1, 2, 3, or 4)") + if "tasks" not in data: + raise ValueError("Missing required field: tasks") return data -def load_deps_json(perf_records_path): - """Load deps.json (dep_gen replay output) co-located with ``l2_perf_records.json``. +def load_deps_json(deps_path): + """Load a dep_gen replay output (``deps.json``). - deps.json supersedes ``task["fanout"]``: fanout is sealed at the moment the - producer's L2PerfRecord retires, so consumers submitted after a fast producer - completes can never get attributed to it. dep_gen's replay reconstructs the - complete graph by replaying every captured ``submit_task`` through a host - PTO2TensorMap. + deps.json is the sole source of truth for the task graph in this tool: + the device hot path no longer records per-task fanout (see PR #863). The + typical workflow is a dep_gen run once per topology (``--enable-dep-gen``) + to produce ``deps.json``, then any number of ``--enable-l2-swimlane`` runs + that join their per-task timing against that captured graph. Returns: - dict[int, list[int]] mapping ``pred_raw → [succ_raw, ...]`` (i.e. the - same shape as ``task["fanout"]``), or ``None`` if no deps.json is present. - Tasks with no successors are absent from the dict (mirrors ``defaultdict`` - semantics on lookup miss). + dict[int, list[int]] mapping ``pred_raw → [succ_raw, ...]``, or + ``None`` if the file is missing, unreadable, or not v2-shaped. Tasks + with no successors are absent from the dict (``defaultdict``-like + lookup-miss semantics). 
""" - deps_path = Path(perf_records_path).parent / "deps.json" + deps_path = Path(deps_path) if not deps_path.exists(): return None try: with deps_path.open() as f: data = json.load(f) except (OSError, ValueError) as e: - print(f"Warning: failed to read {deps_path}: {e}; falling back to fanout", file=sys.stderr) + print(f"Warning: failed to read {deps_path}: {e}", file=sys.stderr) return None edges = data.get("edges") if not isinstance(edges, list): + print(f"Warning: {deps_path} has no 'edges' array", file=sys.stderr) return None version = data.get("version") if version != 2: print( - f"Warning: deps.json version={version!r}; only v2 is supported. Falling back to fanout[].", + f"Warning: {deps_path} version={version!r}; only v2 is supported.", file=sys.stderr, ) return None @@ -396,7 +389,6 @@ def generate_chrome_trace_json( # noqa: PLR0912, PLR0915 tasks: List of task dicts with fields: - task_id, func_id, core_id, core_type - start_time_us, end_time_us, duration_us - - fanout, fanout_count - dispatch_time_us (optional, AICPU dispatch timestamp) - finish_time_us (optional, AICPU finish timestamp) output_path: Path to output JSON file @@ -477,9 +469,6 @@ def generate_chrome_trace_json( # noqa: PLR0912, PLR0915 ts = task["start_time_us"] dur = task["duration_us"] - # Build fanout hint string (packed ids → rXtY / tY for readability) - fanout_str = "[" + ", ".join(format_task_display(x) for x in task["fanout"]) + "]" - # Get function name if available func_id = task["func_id"] tdisp = format_task_display(task["task_id"]) @@ -489,6 +478,14 @@ def generate_chrome_trace_json( # noqa: PLR0912, PLR0915 else: task_name = f"func_{_func_id_to_letter(func_id)}({tdisp})" + # Build fanout hint string (packed ids → rXtY / tY for readability) + # from deps.json — the device hot path no longer carries fanout. 
+ fanout_str = ( + "[" + + ", ".join(format_task_display(x) for x in (deps_edges.get(task["task_id"], []) if deps_edges else [])) + + "]" + ) + events.append( { "args": { @@ -620,9 +617,9 @@ def generate_chrome_trace_json( # noqa: PLR0912, PLR0915 task_to_aicpu_event_id[(task["task_id"], task["core_id"])] = event_id event_id += 1 - # Flow events (Flow events "s" and "f" for dependencies). When deps.json - # was produced by dep_gen replay, prefer its edges over task["fanout"] — - # fanout is the truncated, race-prone view (see load_deps_json's docstring). + # Flow events (Flow events "s" and "f" for dependencies). Edges come from + # deps.json (dep_gen replay); without one we emit no flow events at all, + # since the device hot path no longer carries fanout (PR #863). # Edges where the predecessor's end_time outlives the successor's start_time # are flagged as happens-before violations and emitted with a distinct flow # name so Perfetto colors them differently from clean dependency arrows. 
@@ -631,11 +628,7 @@ def generate_chrome_trace_json( # noqa: PLR0912, PLR0915 task_map[t["task_id"]].append(t) flow_id = 0 hb_violation_count = 0 - - def _succs_for(task): - if deps_edges is not None: - return deps_edges.get(task["task_id"], []) - return task["fanout"] + edges_by_pred = deps_edges or {} for task in tasks: src_tid = core_to_tid[task["core_id"]] @@ -646,7 +639,7 @@ def _succs_for(task): # Use a small offset (0.01 us) for visual clarity flow_start_us = src_ts_end - 0.01 - for succ_task_id in _succs_for(task): + for succ_task_id in edges_by_pred.get(task["task_id"], []): if succ_task_id not in task_map: if verbose: print( @@ -699,8 +692,10 @@ def _succs_for(task): flow_id += 1 if verbose: - edge_source = "deps.json" if deps_edges is not None else "task.fanout" - print(f" Flow events: {flow_id} edges (source: {edge_source})") + if deps_edges is not None: + print(f" Flow events: {flow_id} edges (source: deps.json)") + else: + print(" Flow events: 0 (no deps.json — re-run dep_gen and pass --deps-json to add arrows)") if hb_violation_count > 0: print(f" Happens-before violations: {hb_violation_count} edge(s) flagged as 'hb_violation'") @@ -841,7 +836,7 @@ def _succs_for(task): src_tid = task_to_aicpu_tid.get((task["task_id"], task["core_id"]), core_to_tid[task["core_id"]]) src_aicpu_eid = task_to_aicpu_event_id.get((task["task_id"], task["core_id"])) - for succ_task_id in _succs_for(task): + for succ_task_id in edges_by_pred.get(task["task_id"], []): if succ_task_id not in task_map: continue @@ -1108,6 +1103,13 @@ def _build_parser(): "--func-names", help="Path to func_id_names_*.json (SceneTest format) for func_id to function name mapping", ) + parser.add_argument( + "--deps-json", + help=( + "Path to a dep_gen replay deps.json (defaults to sibling of the perf JSON). " + "Without one the trace has no dependency arrows — re-run with --enable-dep-gen first." 
+ ), + ) parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output") return parser @@ -1145,11 +1147,10 @@ def _resolve_output_path(args, input_path): def _print_verbose_data_info(data, verbose): - """Print verbose summary of loaded performance data including v2 phase counts.""" + """Print verbose summary of loaded performance data including phase counts.""" if not verbose: return print("\n=== Performance Data ===") - print(f" Version: {data['version']}") print(f" Task Count: {len(data['tasks'])}") if data["tasks"]: start_times = [t["start_time_us"] for t in data["tasks"]] @@ -1158,8 +1159,6 @@ def _print_verbose_data_info(data, verbose): max_time = max(end_times) print(f" Time Range: {min_time:.3f} us - {max_time:.3f} us (span: {max_time - min_time:.3f} us)") print() - if data["version"] != 2: - return scheduler_phases = data.get("aicpu_scheduler_phases") orchestrator_phases = data.get("aicpu_orchestrator_phases") core_to_thread = data.get("core_to_thread") @@ -1227,9 +1226,17 @@ def main(): output_path = _resolve_output_path(args, input_path) - deps_edges = load_deps_json(input_path) - if args.verbose and deps_edges is not None: - print(f" Using deps.json edges ({sum(len(v) for v in deps_edges.values())} total)") + deps_path = Path(args.deps_json) if args.deps_json else Path(input_path).parent / "deps.json" + deps_edges = load_deps_json(deps_path) + if deps_edges is not None: + if args.verbose: + print(f" Using deps.json edges ({sum(len(v) for v in deps_edges.values())} total) from {deps_path}") + else: + print( + f"Warning: no usable deps.json at {deps_path}; Perfetto trace will have no dependency arrows. 
" + f"Run a dep_gen capture (--enable-dep-gen) and pass --deps-json to add them.", + file=sys.stderr, + ) generate_chrome_trace_json( data["tasks"], @@ -1250,13 +1257,15 @@ def main(): print_task_statistics(data["tasks"], func_names) - # The deep-dive reads only the perf JSON and (optionally) the colocated - # deps.json — sibling auto-discovery happens inside run_sched_overhead_analysis. + # The deep-dive reads the perf JSON plus deps.json (for per-thread + # fanout / fanin aggregates). Forward the resolved deps path so an + # explicit --deps-json overrides sibling auto-discovery there too. print("\n=== Scheduler Overhead Deep Dive ===") deep_dive_rc = run_sched_overhead_analysis( input_path, print_sources=True, perf_data=data, + deps_json_path=deps_path if deps_edges is not None else None, ) if deep_dive_rc != 0: print( diff --git a/src/a2a3/platform/include/aicpu/l2_perf_collector_aicpu.h b/src/a2a3/platform/include/aicpu/l2_perf_collector_aicpu.h index a49bbd9b5..54ebecf24 100644 --- a/src/a2a3/platform/include/aicpu/l2_perf_collector_aicpu.h +++ b/src/a2a3/platform/include/aicpu/l2_perf_collector_aicpu.h @@ -73,8 +73,9 @@ void l2_perf_aicpu_init(int worker_count); * `dual_issue_slots[expected_reg_task_id % PLATFORM_L2_AICORE_RING_SIZE]`, * validates the task_id match, fills all AICPU-side fields, commits into * the current records buffer, and rotates the records buffer internally - * once it fills up. Callers must pre-extract fanout into a plain uint64_t - * array (platform layer cannot depend on runtime linked-list types). + * once it fills up. Fanout edges live in the static DAG (deps.json from + * dep_gen) and are joined by the host's swimlane converter post-run, so + * this commit path does not touch fanout. 
* * Per-core counter accounting: * total_record_count++ — every commit attempt (success or failure) @@ -95,12 +96,10 @@ void l2_perf_aicpu_init(int worker_count); * @param core_type Core type (AIC/AIV) * @param dispatch_time AICPU timestamp when task was dispatched * @param finish_time AICPU timestamp when task completion was observed - * @param fanout Pre-extracted successor task ID array (nullptr if none) - * @param fanout_count Number of entries in fanout array (0 if none) */ int l2_perf_aicpu_complete_record( int core_id, int thread_idx, uint32_t expected_reg_task_id, uint64_t task_id, uint32_t func_id, CoreType core_type, - uint64_t dispatch_time, uint64_t finish_time, const uint64_t *fanout, int32_t fanout_count + uint64_t dispatch_time, uint64_t finish_time ); /** diff --git a/src/a2a3/platform/include/common/dep_gen.h b/src/a2a3/platform/include/common/dep_gen.h index 167b8d241..091fd349a 100644 --- a/src/a2a3/platform/include/common/dep_gen.h +++ b/src/a2a3/platform/include/common/dep_gen.h @@ -15,9 +15,9 @@ * * Captures the inputs to every Orchestrator::submit_task call into a streaming * ring of DepGenRecord. The host side replays these records offline to - * reconstruct the full task dependency graph (deps.json), bypassing the race - * window in L2PerfRecord::fanout[] (where an early-finishing producer would - * have its record sealed before later-submitted consumers can register). + * reconstruct the full task dependency graph (deps.json). deps.json is the + * sole source of truth for fanout edges; the L2 swimlane hot path no longer + * carries fanout to keep AICPU off the per-task GM-store critical path. 
* * Streaming buffer design mirrors PMU / L2Perf / TensorDump (single source of * algorithmic truth in src/a2a3/platform/include/host/profiling_common/profiler_base.h): diff --git a/src/a2a3/platform/include/common/l2_perf_profiling.h b/src/a2a3/platform/include/common/l2_perf_profiling.h index 7f5c01355..128e418e6 100644 --- a/src/a2a3/platform/include/common/l2_perf_profiling.h +++ b/src/a2a3/platform/include/common/l2_perf_profiling.h @@ -60,11 +60,6 @@ #include "common/core_type.h" #include "common/platform_config.h" -// Maximum number of successor tasks per L2PerfRecord (matches Task::fanout) -#ifndef RUNTIME_MAX_FANOUT -#define RUNTIME_MAX_FANOUT 128 -#endif - // ============================================================================= // L2 perf_level — granularity ladder for the L2 swimlane profiler. // @@ -82,7 +77,7 @@ enum class L2PerfLevel : uint32_t { DISABLED = 0, // No collection at all AICORE_TIMING = 1, // AICore per-task start/end timestamps + task record buffer - AICPU_TIMING = 2, // + AICPU dispatch/finish timestamps + fanout dependency list + AICPU_TIMING = 2, // + AICPU dispatch/finish timestamps SCHED_PHASES = 3, // + scheduler main-loop phase records (SCHED_COMPLETE/DISPATCH/IDLE_WAIT) ORCH_PHASES = 4, // + orchestrator phase records }; @@ -92,7 +87,13 @@ enum class L2PerfLevel : uint32_t { // ============================================================================= /** - * Single task execution record + * Single task execution record. + * + * Fanout edges live in the static DAG (deps.json from dep_gen) — not in + * this record. Keeping fanout out of the hot AICPU commit path avoids a + * per-task ~1 KB GM store + a linked-list walk on the scheduler's + * critical fanin tail. The host swimlane export omits the fanout + * fields; `swimlane_converter.py` joins deps.json at post-process time. 
*/ struct L2PerfRecord { // Timing information (device clock timestamps) @@ -111,10 +112,6 @@ struct L2PerfRecord { uint64_t task_id; uint32_t func_id; // Kernel function identifier CoreType core_type; // Core type (AIC/AIV) - - // Dependency relationship (fanout only) - uint64_t fanout[RUNTIME_MAX_FANOUT]; // Successor task task_id array - int32_t fanout_count; // Number of successor tasks } __attribute__((aligned(64))); static_assert(sizeof(L2PerfRecord) % 64 == 0, "L2PerfRecord must be 64-byte aligned for optimal cache performance"); diff --git a/src/a2a3/platform/src/aicpu/l2_perf_collector_aicpu.cpp b/src/a2a3/platform/src/aicpu/l2_perf_collector_aicpu.cpp index e4f9d1c68..cc81ab919 100644 --- a/src/a2a3/platform/src/aicpu/l2_perf_collector_aicpu.cpp +++ b/src/a2a3/platform/src/aicpu/l2_perf_collector_aicpu.cpp @@ -226,7 +226,7 @@ static void switch_records_buffer(int core_id, int thread_idx) { int l2_perf_aicpu_complete_record( int core_id, int thread_idx, uint32_t expected_reg_task_id, uint64_t task_id, uint32_t func_id, CoreType core_type, - uint64_t dispatch_time, uint64_t finish_time, const uint64_t *fanout, int32_t fanout_count + uint64_t dispatch_time, uint64_t finish_time ) { if (core_id < 0 || core_id >= PLATFORM_MAX_CORES) { return -1; @@ -288,23 +288,15 @@ int l2_perf_aicpu_complete_record( record->func_id = func_id; record->core_type = core_type; - // AICPU_TIMING and above: dispatch/finish timing and fanout dependency info + // AICPU_TIMING and above: dispatch/finish timing. Fanout edges live in + // the static DAG (deps.json) and are joined by the host post-run, so they + // are not written here. if (g_l2_perf_level >= L2PerfLevel::AICPU_TIMING) { record->dispatch_time = dispatch_time; record->finish_time = finish_time; - if (fanout != nullptr && fanout_count > 0) { - int32_t n = (fanout_count > RUNTIME_MAX_FANOUT) ? 
RUNTIME_MAX_FANOUT : fanout_count; - for (int32_t i = 0; i < n; i++) { - record->fanout[i] = fanout[i]; - } - record->fanout_count = n; - } else { - record->fanout_count = 0; - } } else { record->dispatch_time = 0; record->finish_time = 0; - record->fanout_count = 0; } uint32_t new_count = count + 1; diff --git a/src/a2a3/platform/src/host/l2_perf_collector.cpp b/src/a2a3/platform/src/host/l2_perf_collector.cpp index 6b0744aa5..745dab8b6 100644 --- a/src/a2a3/platform/src/host/l2_perf_collector.cpp +++ b/src/a2a3/platform/src/host/l2_perf_collector.cpp @@ -592,9 +592,10 @@ int L2PerfCollector::export_swimlane_json() { } // Step 7: Write JSON data - int version = static_cast(l2_perf_level_); + // Fanout fields are no longer emitted — the device-side hot path no + // longer carries them. Downstream (swimlane_converter.py) joins fanout + // from the sibling deps.json (dep_gen output). outfile << "{\n"; - outfile << " \"version\": " << version << ",\n"; outfile << " \"tasks\": [\n"; for (size_t i = 0; i < tagged_records.size(); ++i) { @@ -620,18 +621,9 @@ int L2PerfCollector::export_swimlane_json() { outfile << " \"end_time_us\": " << std::fixed << std::setprecision(3) << end_us << ",\n"; outfile << " \"duration_us\": " << std::fixed << std::setprecision(3) << duration_us << ",\n"; outfile << " \"dispatch_time_us\": " << std::fixed << std::setprecision(3) << dispatch_us << ",\n"; - outfile << " \"finish_time_us\": " << std::fixed << std::setprecision(3) << finish_us << ",\n"; - outfile << " \"fanout\": ["; - int safe_fanout_count = - (record.fanout_count >= 0 && record.fanout_count <= RUNTIME_MAX_FANOUT) ? 
record.fanout_count : 0; - for (int j = 0; j < safe_fanout_count; ++j) { - outfile << record.fanout[j]; - if (j < safe_fanout_count - 1) { - outfile << ", "; - } - } - outfile << "],\n"; - outfile << " \"fanout_count\": " << record.fanout_count << "\n"; + outfile << " \"finish_time_us\": " << std::fixed << std::setprecision(3) << finish_us << "\n"; + // Fanout is no longer carried on the device hot path — dep_gen replay + // (deps.json) is the sole source of truth, joined in by tooling. outfile << " }"; if (i < tagged_records.size() - 1) { outfile << ","; diff --git a/src/a2a3/runtime/host_build_graph/aicpu/aicpu_executor.cpp b/src/a2a3/runtime/host_build_graph/aicpu/aicpu_executor.cpp index b89d99e1a..d0d36ceae 100644 --- a/src/a2a3/runtime/host_build_graph/aicpu/aicpu_executor.cpp +++ b/src/a2a3/runtime/host_build_graph/aicpu/aicpu_executor.cpp @@ -739,18 +739,10 @@ int AicpuExecutor::resolve_and_dispatch(Runtime &runtime, int thread_idx, const if (prev_running_id != AICPU_TASK_INVALID) { Task *prev_task = &runtime.tasks[prev_running_id]; - uint64_t fanout_arr[RUNTIME_MAX_FANOUT]; - int fanout_count = 0; - if (l2_perf_level >= L2PerfLevel::AICPU_TIMING) { - for (int i = 0; i < prev_task->fanout_count; i++) { - fanout_arr[i] = static_cast(prev_task->fanout[i]); - } - fanout_count = prev_task->fanout_count; - } if (l2_perf_aicpu_complete_record( core_id, thread_idx, static_cast(prev_running_id), static_cast(prev_running_id), prev_task->func_id, h->core_type, - dispatch_timestamps_[core_id], finish_ts, fanout_arr, fanout_count + dispatch_timestamps_[core_id], finish_ts ) != 0) { LOG_ERROR( "Core %d: l2_perf_aicpu_complete_record failed for implicit task %d", core_id, @@ -764,18 +756,10 @@ int AicpuExecutor::resolve_and_dispatch(Runtime &runtime, int thread_idx, const finish_ts = (l2_perf_level >= L2PerfLevel::AICPU_TIMING) ? 
get_sys_cnt_aicpu() : 0; Task *task = &runtime.tasks[completed_task_id]; - uint64_t fanout_arr[RUNTIME_MAX_FANOUT]; - int fanout_count = 0; - if (l2_perf_level >= L2PerfLevel::AICPU_TIMING) { - for (int i = 0; i < task->fanout_count; i++) { - fanout_arr[i] = static_cast(task->fanout[i]); - } - fanout_count = task->fanout_count; - } if (l2_perf_aicpu_complete_record( core_id, thread_idx, static_cast(completed_task_id), static_cast(completed_task_id), task->func_id, h->core_type, - dispatch_timestamps_[core_id], finish_ts, fanout_arr, fanout_count + dispatch_timestamps_[core_id], finish_ts ) != 0) { LOG_ERROR( "Core %d: l2_perf_aicpu_complete_record failed for task %d", core_id, completed_task_id @@ -860,18 +844,10 @@ int AicpuExecutor::resolve_and_dispatch(Runtime &runtime, int thread_idx, const if (l2_perf_enabled) { uint64_t finish_ts = (l2_perf_level >= L2PerfLevel::AICPU_TIMING) ? get_sys_cnt_aicpu() : 0; Task *prev_task = &runtime.tasks[prev_running_id]; - uint64_t fanout_arr[RUNTIME_MAX_FANOUT]; - int fanout_count = 0; - if (l2_perf_level >= L2PerfLevel::AICPU_TIMING) { - for (int i = 0; i < prev_task->fanout_count; i++) { - fanout_arr[i] = static_cast(prev_task->fanout[i]); - } - fanout_count = prev_task->fanout_count; - } if (l2_perf_aicpu_complete_record( core_id, thread_idx, static_cast(prev_running_id), static_cast(prev_running_id), prev_task->func_id, h->core_type, - dispatch_timestamps_[core_id], finish_ts, fanout_arr, fanout_count + dispatch_timestamps_[core_id], finish_ts ) != 0) { LOG_ERROR( "Core %d: l2_perf_aicpu_complete_record failed for implicit task %d", core_id, @@ -911,18 +887,10 @@ int AicpuExecutor::resolve_and_dispatch(Runtime &runtime, int thread_idx, const if (l2_perf_enabled) { uint64_t finish_ts = (l2_perf_level >= L2PerfLevel::AICPU_TIMING) ? 
get_sys_cnt_aicpu() : 0; Task *task = &runtime.tasks[completed_task_id]; - uint64_t fanout_arr[RUNTIME_MAX_FANOUT]; - int fanout_count = 0; - if (l2_perf_level >= L2PerfLevel::AICPU_TIMING) { - for (int i = 0; i < task->fanout_count; i++) { - fanout_arr[i] = static_cast(task->fanout[i]); - } - fanout_count = task->fanout_count; - } if (l2_perf_aicpu_complete_record( core_id, thread_idx, static_cast(completed_task_id), static_cast(completed_task_id), task->func_id, h->core_type, - dispatch_timestamps_[core_id], finish_ts, fanout_arr, fanout_count + dispatch_timestamps_[core_id], finish_ts ) != 0) { LOG_ERROR( "Core %d: l2_perf_aicpu_complete_record failed for task %d", core_id, completed_task_id diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/docs/profiling_levels.md b/src/a2a3/runtime/tensormap_and_ringbuffer/docs/profiling_levels.md index 34e17cc86..566249ac7 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/docs/profiling_levels.md +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/docs/profiling_levels.md @@ -254,10 +254,13 @@ header just like on onboard. | ----- | -------- | | 0 | Nothing (disabled) | | 1 | AICore timing only (start/end/task_id/func_id/core_type) | -| 2 | + dispatch_time, finish_time, fanout | +| 2 | + dispatch_time, finish_time | | 3 | + Scheduler phases (`SCHED_*`) | | 4 | + Orchestrator phases (full) | +Fanout edges are no longer carried on the device hot path — `swimlane_converter.py` +joins them from the sibling `deps.json` (produced by dep_gen) at post-process time. + Bare `--enable-l2-swimlane` = level 4 (backward compatible). ### Level gating in AICPU code @@ -270,7 +273,7 @@ content it depends on instead of relying on magic numbers: // Cheap binary check, available immediately after kernel entry. if (is_l2_swimlane_enabled()) { ... } -// AICPU dispatch/finish timestamps + fanout. +// AICPU dispatch/finish timestamps. 
// Granular checks below require l2_perf_aicpu_init to have already run // (so the level has been promoted from the shared-memory header). if (get_l2_perf_level() >= L2PerfLevel::AICPU_TIMING) { ... } diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/host/dep_gen_replay.h b/src/a2a3/runtime/tensormap_and_ringbuffer/host/dep_gen_replay.h index b41c212a7..ea39bf7ea 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/host/dep_gen_replay.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/host/dep_gen_replay.h @@ -24,12 +24,10 @@ * device run completes, so going through the filesystem would just be * extra I/O and an extra file in the output directory. * - * deps.json supersedes ``L2PerfRecord::fanout[]`` for tools that need the - * *complete* dependency graph: fanout is sealed when a producer finishes, so - * consumers submitted after a fast producer retires never get attributed to - * it (the race window that motivated dep_gen). Replay sees every submit and - * so reconstructs the graph the runtime would have built if no producer ever - * raced ahead. + * deps.json is the sole source of truth for fanout: the L2 swimlane hot + * path no longer records ``L2PerfRecord::fanout[]`` (taking the per-task + * 1 KB GM store off the scheduler critical path). Replay sees every + * submit and reconstructs the complete dependency graph. * * Output format (deps.json, v2): * diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp index 813f1f846..d1c039785 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp @@ -554,10 +554,9 @@ static TaskOutputTensors submit_task_common( // dep_gen capture point: snapshot the orch submit_task inputs while the // tensormap is still in its pre-lookup state for this task. 
Replay reads - // these records offline to reconstruct the complete dep graph, sidestepping - // the race window in L2PerfRecord::fanout[] where an early-finishing - // producer's record gets sealed before later-submitted consumers can - // register themselves. + // these records offline to reconstruct the complete dep graph — the sole + // source of truth for fanout now that the swimlane hot path no longer + // records it. if (is_dep_gen_enabled()) { const void *tensor_ptrs[MAX_TENSOR_ARGS]; // TensorArgType is `enum class : int32_t` (4 bytes); the on-disk record diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_completion.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_completion.cpp index 140f000cb..152af452e 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_completion.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/scheduler/scheduler_completion.cpp @@ -154,24 +154,12 @@ void SchedulerContext::complete_slot_task( #if PTO2_SCHED_PROFILING uint64_t t_perf_start = get_sys_cnt_aicpu(); #endif - uint64_t finish_ts = 0; - uint64_t fanout_arr[RUNTIME_MAX_FANOUT]; - int32_t fanout_n = 0; - - if (l2_perf_level_ >= L2PerfLevel::AICPU_TIMING) { - finish_ts = get_sys_cnt_aicpu(); - PTO2DepListEntry *cur = slot_state.fanout_head; - while (cur != nullptr && fanout_n < RUNTIME_MAX_FANOUT) { - fanout_arr[fanout_n++] = cur->slot_state->task->task_id.raw; - cur = cur->next; - } - } + uint64_t finish_ts = (l2_perf_level_ >= L2PerfLevel::AICPU_TIMING) ? 
get_sys_cnt_aicpu() : 0; int32_t perf_slot_idx = static_cast(subslot); if (l2_perf_aicpu_complete_record( core_id, thread_idx, static_cast(expected_reg_task_id), slot_state.task->task_id.raw, - slot_state.task->kernel_id[perf_slot_idx], hank[core_id].core_type, dispatch_ts, finish_ts, fanout_arr, - fanout_n + slot_state.task->kernel_id[perf_slot_idx], hank[core_id].core_type, dispatch_ts, finish_ts ) != 0) { LOG_ERROR( "Core %d: l2_perf_aicpu_complete_record failed for task 0x%" PRIx64, core_id, diff --git a/tests/st/a2a3/tensormap_and_ringbuffer/dfx/dep_gen/test_dep_gen.py b/tests/st/a2a3/tensormap_and_ringbuffer/dfx/dep_gen/test_dep_gen.py index b7ad064ae..35a9d3276 100644 --- a/tests/st/a2a3/tensormap_and_ringbuffer/dfx/dep_gen/test_dep_gen.py +++ b/tests/st/a2a3/tensormap_and_ringbuffer/dfx/dep_gen/test_dep_gen.py @@ -9,27 +9,20 @@ # ----------------------------------------------------------------------------------------------------------- """dep_gen capture + replay sim test. -Re-runs the ``vector_example`` orchestration with ``--enable-dep-gen`` (and, -in standalone mode, auto-adds ``--enable-l2-swimlane`` for the fanout ⊆ deps -gate). Verifies the end-to-end dep_gen pipeline on a2a3sim: +Re-runs the ``vector_example`` orchestration with ``--enable-dep-gen``. +Verifies the end-to-end dep_gen pipeline on a2a3sim: - 1. ``/deps.json`` is produced by the host replay - (PTO2TensorMap replay → JSON edge list), and contains exactly the - 6 edges documented in example_orchestration.cpp. The capture path - (host collector drains the device ring buffer into memory and feeds - the replay directly — no submit_trace.bin on disk) is exercised - implicitly: if it broke, deps.json would be empty or wrong. - 2. **Validation gate** (when l2_perf_records.json is present, i.e. - ``--enable-l2-swimlane`` was also enabled): every edge in - L2PerfRecord::fanout[] also appears in deps.json. 
deps may have - MORE edges than fanout (race-window edges fanout missed); we never - assert symmetry — that's the entire reason dep_gen exists. + ``/deps.json`` is produced by the host replay + (PTO2TensorMap replay → JSON edge list), and contains exactly the + 6 edges documented in example_orchestration.cpp. The capture path + (host collector drains the device ring buffer into memory and feeds + the replay directly — no submit_trace.bin on disk) is exercised + implicitly: if it broke, deps.json would be empty or wrong. -Pytest entry: needs ``--enable-dep-gen`` (capture+replay assertions) and -``--enable-l2-swimlane`` (fanout ⊆ deps gate). Standalone entry: pass -``--enable-dep-gen`` and the swimlane flag is added automatically so a -plain ``python test_dep_gen_capture.py -p a2a3sim --enable-dep-gen`` -exercises the full gate. +deps.json is now the sole source of truth for fanout edges — the device +hot path no longer records L2PerfRecord::fanout[], so there is no +"fanout ⊆ deps" cross-check to run. swimlane_converter.py joins +deps.json into the Perfetto trace at post-process time. Compute correctness is delegated to the upstream ``vector_example`` test — this case re-uses the same orchestration to keep coverage focused on the @@ -128,11 +121,8 @@ def test_run(self, st_platform, st_worker, request): def _post_validate(self, case): """Skips if no per-case output_prefix dir exists (e.g. selector skipped this case at pytest level). When the dir + deps.json are - present, assert: - - - deps.json contains the 6 edges documented in example_orchestration.cpp - - if l2_perf_records.json is also present (--enable-l2-swimlane on), - every fanout edge it records is a subset of the deps.json edge set + present, assert that deps.json contains the 6 edges documented in + example_orchestration.cpp. 
""" case_name = case["name"] safe_label = _sanitize_for_filename(f"TestDepGen_{case_name}") @@ -214,24 +204,6 @@ def _post_validate(self, case): f"edge {e.get('pred')}->{e.get('succ')} (source={source}) missing consumer_shape/start_offset/strides" ) - # ---- fanout ⊆ deps validation gate ---- - perf = out_dir / "l2_perf_records.json" - if perf.exists(): - with perf.open() as f: - pdata = json.load(f) - fanout_edges = set() - for task in pdata.get("tasks", []): - src = int(task["task_id"]) - for succ in task.get("fanout", []): - fanout_edges.add((src, int(succ))) - missing_in_deps = fanout_edges - deps_edges - assert not missing_in_deps, ( - f"fanout ⊆ deps gate FAILED: edges present in l2_perf_records.json " - f"fanout[] but absent from deps.json: {missing_in_deps}. " - f"This is a replay-side regression — the replay should be a " - f"superset of the runtime's fanout view." - ) - # ---- Tool smoke: deps_to_graph ---- # Exit-code-only check; we don't validate the HTML content. A schema # change that breaks the viewer fires here in the same CI step that @@ -257,11 +229,4 @@ def _post_validate(self, case): if __name__ == "__main__": - # ``_post_validate`` is invoked by the SceneTestCase framework after each - # case runs (pytest path AND standalone). Standalone main just adds the - # swimlane flag so the fanout ⊆ deps gate runs by default — both flags - # compose cleanly and the gate is the most informative assertion the test - # produces, so don't make the user remember to ask for it. 
- if "--enable-dep-gen" in sys.argv and "--enable-l2-swimlane" not in sys.argv: - sys.argv.append("--enable-l2-swimlane") SceneTestCase.run_module(__name__) diff --git a/tests/st/a2a3/tensormap_and_ringbuffer/dfx/dep_gen/test_dep_gen_chain.py b/tests/st/a2a3/tensormap_and_ringbuffer/dfx/dep_gen/test_dep_gen_chain.py index ad05dae7e..90910bab6 100644 --- a/tests/st/a2a3/tensormap_and_ringbuffer/dfx/dep_gen/test_dep_gen_chain.py +++ b/tests/st/a2a3/tensormap_and_ringbuffer/dfx/dep_gen/test_dep_gen_chain.py @@ -31,7 +31,6 @@ """ import json -import sys import torch from simpler.task_interface import ArgDirection as D @@ -209,8 +208,4 @@ def _post_validate(self, case): if __name__ == "__main__": - # Standalone entry: auto-add the swimlane flag so fanout ⊆ deps gate runs - # alongside the chain assertions, matching test_dep_gen.py's convention. - if "--enable-dep-gen" in sys.argv and "--enable-l2-swimlane" not in sys.argv: - sys.argv.append("--enable-l2-swimlane") SceneTestCase.run_module(__name__) diff --git a/tests/st/a2a3/tensormap_and_ringbuffer/dfx/l2_swimlane/_swimlane_validate.py b/tests/st/a2a3/tensormap_and_ringbuffer/dfx/l2_swimlane/_swimlane_validate.py index 035461dc4..43e4d7daa 100644 --- a/tests/st/a2a3/tensormap_and_ringbuffer/dfx/l2_swimlane/_swimlane_validate.py +++ b/tests/st/a2a3/tensormap_and_ringbuffer/dfx/l2_swimlane/_swimlane_validate.py @@ -30,7 +30,7 @@ from simpler_setup.scene_test import _outputs_dir, _sanitize_for_filename -_REQUIRED_TASK_FIELDS = ("task_id", "func_id", "core_id", "core_type", "start_time_us", "end_time_us", "fanout") +_REQUIRED_TASK_FIELDS = ("task_id", "func_id", "core_id", "core_type", "start_time_us", "end_time_us") def validate_perf_artifact(case_label: str, *, expected_task_count: int | None = None) -> None: @@ -53,7 +53,6 @@ def validate_perf_artifact(case_label: str, *, expected_task_count: int | None = with perf.open() as f: data = json.load(f) - assert data.get("version") in (1, 2, 3, 4), f"unexpected version: 
{data.get('version')}" tasks = data.get("tasks") assert isinstance(tasks, list), "tasks field missing or not a list" assert len(tasks) > 0, f"perf records empty under {perf}"