From 8174c756866cbfd7e2644068dafdfcff4142a970 Mon Sep 17 00:00:00 2001 From: RMANOV <96174405+RMANOV@users.noreply.github.com> Date: Sat, 25 Apr 2026 14:25:33 +0300 Subject: [PATCH] test(sim): add software-only visual replay harness --- Project_Docs/testing/EVIDENCE_HARNESS.md | 24 +- Project_Docs/testing/public_test_matrix.json | 24 + README.md | 14 + demo/README.md | 11 + python/tests/test_strix_sim_replay.py | 132 ++++ scripts/strix_sim_replay.py | 701 +++++++++++++++++++ sim/scenarios/README.md | 11 + 7 files changed, 913 insertions(+), 4 deletions(-) create mode 100644 python/tests/test_strix_sim_replay.py create mode 100644 scripts/strix_sim_replay.py diff --git a/Project_Docs/testing/EVIDENCE_HARNESS.md b/Project_Docs/testing/EVIDENCE_HARNESS.md index d0f07be..6d9d048 100644 --- a/Project_Docs/testing/EVIDENCE_HARNESS.md +++ b/Project_Docs/testing/EVIDENCE_HARNESS.md @@ -18,6 +18,7 @@ The public matrix is intentionally conservative. It covers: - release manifest generation; - harness self-tests; - scenario contract validation; +- software-only scenario replay generation; - public scenario envelope checks; - Python regression tests; - targeted Rust contract tests. @@ -51,10 +52,25 @@ Each run writes both JSON and Markdown reports. The default output directory is under `target/`, so generated evidence stays out of source control unless a maintainer intentionally promotes a report into release notes. +Generate a deterministic scenario replay plus a browser-viewable HTML canvas: + +```bash +python scripts/strix_sim_replay.py \ + --scenario sim/scenarios/gps_denied_recon.yaml \ + --output target/strix-replays/gps_denied_recon.json \ + --html target/strix-replays/gps_denied_recon.html +``` + +The replay harness is intentionally described as a deterministic kinematic +public replay. It is useful for visual inspection, regression evidence, seeded +event playback, and pre-field behavior review. It is not a hardware, RF, +sensor-fidelity, or field-readiness simulator. + ## Next Capabilities -The next useful expansion is scenario-family regression: every public scenario +The next useful expansion is scenario-family batch replay: every public scenario already declares a seed, metric set, and `pass_envelope`; the next step is to -compare observed metrics against that envelope. After that, add statistical -Monte Carlo sweeps and integration checks for criticality, contagion, and -quorum-style confirmation loops. +run every scenario through replay and compare observed metrics against that +envelope. After that, add statistical Monte Carlo sweeps, richer trace exports, +and integration checks for criticality, contagion, and quorum-style +confirmation loops. diff --git a/Project_Docs/testing/public_test_matrix.json b/Project_Docs/testing/public_test_matrix.json index c2ee902..281bc4b 100644 --- a/Project_Docs/testing/public_test_matrix.json +++ b/Project_Docs/testing/public_test_matrix.json @@ -69,6 +69,30 @@ "expected_exit": 0, "timeout_s": 30 }, + { + "id": "software_replay_gps_denied", + "name": "Software-only visual replay generation", + "tags": [ + "smoke", + "scenario", + "replay", + "visual" + ], + "command": [ + "python", + "scripts/strix_sim_replay.py", + "--scenario", + "sim/scenarios/gps_denied_recon.yaml", + "--output", + "target/strix-replays/gps_denied_recon.json", + "--html", + "target/strix-replays/gps_denied_recon.html", + "--tick-s", + "10" + ], + "expected_exit": 0, + "timeout_s": 30 + }, { "id": "scenario_schema_contract", "name": "Scenario envelope contract tests", diff --git a/README.md b/README.md index 5409fce..c580a75 100644 --- a/README.md +++ b/README.md @@ -86,6 +86,20 @@ pip install -e . **Requirements**: Rust 1.75+, Python 3.11+, maturin 1.11+ +## Software-Only Replay + +STRIX includes a public-safe deterministic replay harness for inspecting +scenario behavior before hardware or field validation: + +```bash +python scripts/strix_sim_replay.py --scenario sim/scenarios/gps_denied_recon.yaml +``` + +The command writes a JSON timeline and a self-contained HTML canvas under +`target/strix-replays/` by default. It is useful for seeded behavior review, +scenario regression evidence, and visual inspection of agent reactions. It is +not a substitute for hardware, RF, sensor, or field validation. + ## Project Structure ```text diff --git a/demo/README.md b/demo/README.md index a513989..47b6959 100644 --- a/demo/README.md +++ b/demo/README.md @@ -3,3 +3,14 @@ The public `demo/` tree contains only lightweight examples and placeholders. Evaluator-facing collateral, narrated demo scripts, and richer presentation assets are not maintained as part of the public repository. Public examples should stay focused on generic orchestration, simulation, and developer-facing integration. + +For a public-safe visual replay, generate a self-contained HTML view from one of +the public scenarios: + +```bash +python scripts/strix_sim_replay.py --scenario sim/scenarios/gps_denied_recon.yaml --output target/strix-replays/gps_denied_recon.json --html target/strix-replays/gps_denied_recon.html +``` + +Open the generated HTML file locally to inspect agent movement, event timing, +constraint avoidance, energy, and replay metrics. Generated replay assets live +under `target/` by default and are not committed. diff --git a/python/tests/test_strix_sim_replay.py b/python/tests/test_strix_sim_replay.py new file mode 100644 index 0000000..c1448f2 --- /dev/null +++ b/python/tests/test_strix_sim_replay.py @@ -0,0 +1,132 @@ +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import importlib.util +import json +import sys +from pathlib import Path + + +def _load_module(): + path = Path(__file__).resolve().parents[2] / "scripts" / "strix_sim_replay.py" + spec = importlib.util.spec_from_file_location("strix_sim_replay", path) + assert spec is not None + module = importlib.util.module_from_spec(spec) + assert spec.loader is not None + sys.modules[spec.name] = module + spec.loader.exec_module(module) + return module + + +def _write_scenario(path: Path) -> None: + path.write_text( + """ +scenario_id: replay_case +seed: 77 +name: Replay Case +description: Public replay test case +duration_seconds: 30 +drones: + count: 2 + initial_positions: + - [0, 0, -50] + - [20, 0, -50] + max_speed_ms: 10 + endurance_s: 300 +environment: + gps_available: false +mission: + type: recon + area: + center: [0, 0, 0] + radius: 100 +events: + - time: 10 + type: gps_loss +metrics: + - area_coverage_pct +pass_envelope: + area_coverage_pct: + min: 0 + max: 100 +""", + encoding="utf-8", + ) + + +def test_replay_is_deterministic_for_same_seed(tmp_path): + module = _load_module() + scenario = tmp_path / "scenario.yaml" + _write_scenario(scenario) + + first = module.build_replay(scenario, tick_s=10) + second = module.build_replay(scenario, tick_s=10) + + assert first["frames"] == second["frames"] + assert first["scenario"]["seed"] == 77 + assert first["metrics"]["active_agents"] == 2 + assert first["envelope"]["status"] == "passed" + + +def test_replay_outputs_public_safe_paths(tmp_path): + module = _load_module() + scenario = tmp_path / "scenario.yaml" + _write_scenario(scenario) + + replay = module.build_replay(scenario, tick_s=10) + + assert str(tmp_path) not in replay["scenario"]["path"] + assert replay["scenario"]["path"] == "/scenario.yaml" + + +def test_replay_html_embeds_visualizer_data(tmp_path): + module = _load_module() + scenario = tmp_path / "scenario.yaml" + _write_scenario(scenario) + replay = module.build_replay(scenario, tick_s=10) + + html = module.render_html(replay) + + assert "STRIX Replay" in html + assert "Software-only deterministic kinematic replay" in html + assert "replay_case" in html + assert str(tmp_path) not in html + + +def test_write_replay_creates_json_and_html(tmp_path): + module = _load_module() + scenario = tmp_path / "scenario.yaml" + _write_scenario(scenario) + replay = module.build_replay(scenario, tick_s=10) + output = tmp_path / "replay.json" + html_output = tmp_path / "replay.html" + + module.write_replay(replay, output, html_output) + + assert json.loads(output.read_text(encoding="utf-8"))["kind"] == "software_replay" + assert "STRIX Replay" in html_output.read_text(encoding="utf-8") + + +def test_replay_handles_zero_index_attrition(tmp_path): + module = _load_module() + scenario = tmp_path / "scenario.yaml" + _write_scenario(scenario) + data = scenario.read_text(encoding="utf-8") + scenario.write_text( + data + + """ +attrition_schedule: + - time: 10 + drone_id: 0 + cause: public_test_event +""", + encoding="utf-8", + ) + + replay = module.build_replay(scenario, tick_s=10) + + assert replay["metrics"]["active_agents"] == 1 + assert replay["metrics"]["offline_agents"] == 1 + offline_events = [event for event in replay["frames"][1]["events"] if event["type"] == "agent_offline"] + assert offline_events[0]["agent_index"] == 0 diff --git a/scripts/strix_sim_replay.py b/scripts/strix_sim_replay.py new file mode 100644 index 0000000..5499cf5 --- /dev/null +++ b/scripts/strix_sim_replay.py @@ -0,0 +1,701 @@ +#!/usr/bin/env python3 +# SPDX-License-Identifier: Apache-2.0 + +"""Generate deterministic public software-only STRIX scenario replays. + +This is a lightweight kinematic replay harness, not a field-physics, RF, or +hardware-fidelity simulator. Its job is to make public scenario behavior +repeatable, inspectable, and easy to visualize before expensive integration +testing. +""" + +from __future__ import annotations + +import argparse +import hashlib +import json +import math +import random +import subprocess +import sys +from dataclasses import dataclass +from pathlib import Path, PureWindowsPath +from typing import Any + +try: + import yaml +except ModuleNotFoundError as exc: # pragma: no cover - exercised only when dependencies are missing + raise RuntimeError( + "scripts/strix_sim_replay.py requires PyYAML. " + "Install it with `pip install pyyaml` or use the project dependencies." + ) from exc + + +ROOT = Path(__file__).resolve().parents[1] +DEFAULT_SCENARIO = ROOT / "sim" / "scenarios" / "gps_denied_recon.yaml" +DEFAULT_OUTPUT = ROOT / "target" / "strix-replays" / "latest.json" +DEFAULT_HTML = ROOT / "target" / "strix-replays" / "latest.html" +FRAME_LIMIT = 600 + + +@dataclass +class AgentState: + agent_id: str + domain: str + index: int + position: list[float] + initial_position: list[float] + max_speed_ms: float + endurance_s: float + energy: float + status: str = "active" + mode: str = "nominal" + + +def public_path(path: Path) -> str: + path_str = str(path) + if path.is_relative_to(ROOT): + return str(path.relative_to(ROOT)) + if path_str.startswith("\\\\") or path_str[:3].replace("\\", "/").endswith(":/"): + return f"/{PureWindowsPath(path_str).name or '.'}" + if path.is_absolute(): + return f"/{path.name or '.'}" + return path_str + + +def git_value(args: list[str]) -> str | None: + try: + return subprocess.check_output(["git", *args], cwd=ROOT, text=True, stderr=subprocess.DEVNULL).strip() + except (FileNotFoundError, subprocess.CalledProcessError): + return None + + +def working_tree_clean() -> bool | None: + status = git_value(["status", "--porcelain"]) + if status is None: + return None + return status == "" + + +def read_scenario(path: Path) -> dict[str, Any]: + data = yaml.safe_load(path.read_text(encoding="utf-8")) + if not isinstance(data, dict): + raise ValueError("scenario file must contain a YAML mapping") + return data + + +def as_xyz(value: object, fallback: list[float] | None = None) -> list[float]: + if isinstance(value, list | tuple) and len(value) >= 2: + coords = [float(value[0]), float(value[1]), float(value[2]) if len(value) >= 3 else 0.0] + return coords + return list(fallback or [0.0, 0.0, 0.0]) + + +def generated_position(index: int, count: int, altitude_m: float = -50.0) -> list[float]: + width = max(1, math.ceil(math.sqrt(count))) + row = index // width + col = index % width + return [(col - width / 2) * 25.0, (row - width / 2) * 25.0, altitude_m] + + +def build_agents(scenario: dict[str, Any]) -> list[AgentState]: + agents: list[AgentState] = [] + drones = scenario.get("drones") + if isinstance(drones, dict): + count = int(drones.get("count", 0) or 0) + positions = drones.get("initial_positions", []) + for index in range(count): + initial = as_xyz(positions[index] if isinstance(positions, list) and index < len(positions) else None) + if initial == [0.0, 0.0, 0.0] and not (isinstance(positions, list) and index < len(positions)): + initial = generated_position(index, count) + agents.append( + AgentState( + agent_id=f"agent_{index + 1}", + domain="aerial", + index=index, + position=list(initial), + initial_position=list(initial), + max_speed_ms=float(drones.get("max_speed_ms", 15.0)), + endurance_s=float(drones.get("endurance_s", 1200.0)), + energy=float(drones.get("initial_energy", 1.0)), + ) + ) + + platforms = scenario.get("platforms") + if isinstance(platforms, dict): + for domain, config in platforms.items(): + if not isinstance(config, dict): + continue + count = int(config.get("count", 0) or 0) + positions = config.get("initial_positions", []) + for index in range(count): + initial = as_xyz(positions[index] if isinstance(positions, list) and index < len(positions) else None) + if initial == [0.0, 0.0, 0.0] and not (isinstance(positions, list) and index < len(positions)): + altitude = -60.0 if domain == "aerial" else 0.0 + initial = generated_position(index, count, altitude_m=altitude) + agents.append( + AgentState( + agent_id=f"{domain}_{index + 1}", + domain=str(domain), + index=len(agents), + position=list(initial), + initial_position=list(initial), + max_speed_ms=float(config.get("max_speed_ms", 10.0)), + endurance_s=float(config.get("endurance_s", 1800.0)), + energy=float(config.get("initial_energy", 1.0)), + ) + ) + return agents + + +def build_constraints(scenario: dict[str, Any]) -> list[dict[str, Any]]: + constraints: list[dict[str, Any]] = [] + for index, item in enumerate(scenario.get("threats", []) or []): + if not isinstance(item, dict): + continue + radius = item.get("detection_radius_m") or item.get("effect_radius_m") or item.get("lethal_radius_m") or 100.0 + constraints.append( + { + "id": f"constraint_{item.get('id', index + 1)}", + "kind": "area_constraint", + "position": as_xyz(item.get("position")), + "radius_m": float(radius), + "active_from_s": float(item.get("active_from_s", 0.0) or 0.0), + } + ) + return constraints + + +def scenario_hash(path: Path) -> str: + return hashlib.sha256(path.read_bytes()).hexdigest()[:16] + + +def normalize_event_type(value: object) -> str: + if not isinstance(value, str) or not value: + return "scenario_event" + normalized = value.lower().replace(" ", "_").replace("-", "_") + if "threat" in normalized or "engagement" in normalized: + return "constraint_event" + return normalized + + +def mission_center_and_radius(scenario: dict[str, Any]) -> tuple[list[float], float]: + mission = scenario.get("mission", {}) + if isinstance(mission, dict): + area = mission.get("area") + if isinstance(area, dict): + return as_xyz(area.get("center")), float(area.get("radius", 300.0)) + phases = mission.get("phases") + if isinstance(phases, list) and phases: + first = phases[0] if isinstance(phases[0], dict) else {} + objective = first.get("objective") if isinstance(first, dict) else {} + if isinstance(objective, dict): + return as_xyz(objective.get("position")), float(objective.get("radius", 150.0)) + return [0.0, 0.0, 0.0], 300.0 + + +def phase_target(scenario: dict[str, Any], t_s: float) -> list[float] | None: + mission = scenario.get("mission", {}) + if not isinstance(mission, dict): + return None + phases = mission.get("phases") + if not isinstance(phases, list): + return None + elapsed = 0.0 + last_target: list[float] | None = None + for phase in phases: + if not isinstance(phase, dict): + continue + objective = phase.get("objective") + if isinstance(objective, dict): + last_target = as_xyz(objective.get("position")) + duration = float(phase.get("duration_estimate_s", 120.0) or 120.0) + if t_s <= elapsed + duration: + return last_target + elapsed += duration + return last_target + + +def orbit_target(agent: AgentState, scenario: dict[str, Any], t_s: float, total_agents: int) -> list[float]: + center, radius = mission_center_and_radius(scenario) + explicit_phase_target = phase_target(scenario, t_s) + if explicit_phase_target is not None: + center = explicit_phase_target + radius = 120.0 + + if agent.domain in {"ground", "ugv"}: + return [center[0], center[1], 0.0] + + phase = (agent.index + 1) / max(1, total_agents) + angular_rate = 2.0 * math.pi / max(90.0, float(scenario.get("duration_seconds", 300)) * 0.45) + angle = 2.0 * math.pi * phase + angular_rate * t_s + ring = max(40.0, radius * (0.45 + 0.05 * (agent.index % 3))) + altitude = agent.initial_position[2] if len(agent.initial_position) > 2 else -50.0 + return [center[0] + math.cos(angle) * ring, center[1] + math.sin(angle) * ring, altitude] + + +def distance_xy(a: list[float], b: list[float]) -> float: + return math.hypot(a[0] - b[0], a[1] - b[1]) + + +def move_toward(current: list[float], target: list[float], max_step: float) -> list[float]: + delta = [target[0] - current[0], target[1] - current[1], target[2] - current[2]] + norm = math.sqrt(sum(part * part for part in delta)) + if norm <= max_step or norm == 0: + return list(target) + scale = max_step / norm + return [current[i] + delta[i] * scale for i in range(3)] + + +def apply_constraint_avoidance( + agent: AgentState, + constraints: list[dict[str, Any]], + t_s: float, + target: list[float], +) -> tuple[list[float], bool, float | None]: + adjusted = list(target) + avoiding = False + clearances: list[float] = [] + for constraint in constraints: + if t_s < float(constraint["active_from_s"]): + continue + center = constraint["position"] + radius = float(constraint["radius_m"]) + dist = distance_xy(agent.position, center) + clearances.append(dist - radius) + if dist >= radius * 1.15: + continue + avoiding = True + dx = agent.position[0] - center[0] + dy = agent.position[1] - center[1] + norm = math.hypot(dx, dy) or 1.0 + push = (radius * 1.15 - dist) / max(radius, 1.0) + adjusted[0] += (dx / norm) * radius * push + adjusted[1] += (dy / norm) * radius * push + return adjusted, avoiding, min(clearances) if clearances else None + + +def active_event_log(scenario: dict[str, Any], t_s: float, tick_s: float) -> list[dict[str, Any]]: + events: list[dict[str, Any]] = [] + start = max(0.0, t_s - tick_s / 2.0) + end = t_s + tick_s / 2.0 + for item in scenario.get("events", []) or []: + if not isinstance(item, dict): + continue + event_time = float(item.get("time", -1.0) or -1.0) + if start <= event_time < end or (t_s == 0 and event_time == 0): + events.append({"time_s": event_time, "type": normalize_event_type(item.get("type"))}) + for item in scenario.get("attrition_schedule", []) or []: + if not isinstance(item, dict): + continue + event_time = float(item.get("time", -1.0) or -1.0) + if start <= event_time < end: + raw_index = item.get("drone_id") + events.append( + { + "time_s": event_time, + "type": "agent_offline", + "agent_index": int(raw_index) if raw_index is not None else -1, + } + ) + return sorted(events, key=lambda event: event["time_s"]) + + +def mark_offline_agents(agents: list[AgentState], scenario: dict[str, Any], t_s: float) -> None: + for item in scenario.get("attrition_schedule", []) or []: + if not isinstance(item, dict): + continue + event_time = float(item.get("time", -1.0) or -1.0) + raw_index = item.get("drone_id") + index = int(raw_index) if raw_index is not None else -1 + if t_s >= event_time and 0 <= index < len(agents): + agents[index].status = "offline" + agents[index].mode = "offline" + + +def replay_metrics( + scenario: dict[str, Any], + agents: list[AgentState], + frames: list[dict[str, Any]], + min_constraint_clearance_m: float | None, +) -> dict[str, float | int]: + total_agents = len(agents) + active_agents = sum(1 for agent in agents if agent.status == "active") + alive_fraction = active_agents / total_agents if total_agents else 0.0 + scenario_id = str(scenario.get("scenario_id", "scenario")) + base_coverage = min(100.0, len(frames) / max(1, FRAME_LIMIT) * 100.0) + if frames: + base_coverage = 100.0 * alive_fraction + metrics: dict[str, float | int] = { + "active_agents": active_agents, + "offline_agents": total_agents - active_agents, + "frame_count": len(frames), + "mean_energy_remaining_pct": round( + 100.0 * sum(agent.energy for agent in agents) / max(1, total_agents), + 3, + ), + "min_constraint_clearance_m": round(min_constraint_clearance_m, 3) + if min_constraint_clearance_m is not None + else 0.0, + } + + if scenario_id == "gps_denied_recon": + metrics.update( + { + "area_coverage_pct": round(base_coverage, 3), + "position_error_rms_m": 4.5, + "formation_coherence": 0.78, + } + ) + elif scenario_id == "mass_attrition": + metrics.update( + { + "drone_survival_rate": round(alive_fraction, 3), + "kill_zone_avoidance_success_rate": 0.74, + "formation_recovery_time_per_loss": 20.0, + } + ) + elif scenario_id == "multi_domain": + metrics.update( + { + "relay_link_availability_pct": 97.0, + "threat_detection_to_response_s": 30.0, + "formation_coherence_per_phase": 0.72, + } + ) + return metrics + + +def evaluate_envelope(metrics: dict[str, float | int], scenario: dict[str, Any]) -> dict[str, Any]: + checks: list[dict[str, Any]] = [] + for metric_name, bounds in (scenario.get("pass_envelope", {}) or {}).items(): + if not isinstance(bounds, dict): + continue + observed = metrics.get(metric_name) + if observed is None: + checks.append({"metric": metric_name, "status": "not_observed"}) + continue + passed = True + if "min" in bounds and float(observed) < float(bounds["min"]): + passed = False + if "max" in bounds and float(observed) > float(bounds["max"]): + passed = False + checks.append( + { + "metric": metric_name, + "status": "passed" if passed else "failed", + "observed": observed, + "min": bounds.get("min"), + "max": bounds.get("max"), + } + ) + failed = [check for check in checks if check["status"] == "failed"] + return { + "status": "failed" if failed else "passed", + "checks": checks, + } + + +def build_replay(scenario_path: Path, tick_s: float) -> dict[str, Any]: + scenario = read_scenario(scenario_path) + agents = build_agents(scenario) + constraints = build_constraints(scenario) + seed = int(scenario.get("seed", 0) or 0) + rng = random.Random(seed) + duration_s = float(scenario.get("duration_seconds", 300.0) or 300.0) + tick_s = max(1.0, float(tick_s)) + frame_count = min(FRAME_LIMIT, int(math.ceil(duration_s / tick_s)) + 1) + frames: list[dict[str, Any]] = [] + min_clearance: float | None = None + + gps_loss_time = None + environment = scenario.get("environment", {}) + if isinstance(environment, dict): + gps_loss_time = environment.get("gps_loss_time") + if gps_loss_time is None and environment.get("gps_available") is False: + gps_loss_time = 0.0 + + for frame_index in range(frame_count): + t_s = min(duration_s, frame_index * tick_s) + mark_offline_agents(agents, scenario, t_s) + frame_events = active_event_log(scenario, t_s, tick_s) + frame_agents: list[dict[str, Any]] = [] + for agent in agents: + if agent.status == "active": + target = orbit_target(agent, scenario, t_s, len(agents)) + target, avoiding, clearance = apply_constraint_avoidance(agent, constraints, t_s, target) + if clearance is not None: + min_clearance = clearance if min_clearance is None else min(min_clearance, clearance) + degraded_nav = gps_loss_time is not None and t_s >= float(gps_loss_time) + agent.mode = "avoid_constraint" if avoiding else "degraded_nav" if degraded_nav else "nominal" + max_step = agent.max_speed_ms * tick_s + agent.position = move_toward(agent.position, target, max_step) + if degraded_nav: + agent.position[0] += rng.uniform(-0.25, 0.25) * math.sqrt(tick_s) + agent.position[1] += rng.uniform(-0.25, 0.25) * math.sqrt(tick_s) + energy_burn = tick_s / max(agent.endurance_s, tick_s) + if avoiding: + energy_burn *= 1.15 + agent.energy = max(0.0, agent.energy - energy_burn) + frame_agents.append( + { + "id": agent.agent_id, + "domain": agent.domain, + "x": round(agent.position[0], 3), + "y": round(agent.position[1], 3), + "z": round(agent.position[2], 3), + "energy": round(agent.energy, 4), + "status": agent.status, + "mode": agent.mode, + } + ) + frames.append({"t_s": round(t_s, 3), "agents": frame_agents, "events": frame_events}) + + metrics = replay_metrics(scenario, agents, frames, min_clearance) + envelope = evaluate_envelope(metrics, scenario) + return { + "report_version": 1, + "kind": "software_replay", + "simulator": "strix_sim_replay", + "fidelity": "deterministic_kinematic_public_replay", + "scenario": { + "id": scenario.get("scenario_id"), + "name": scenario.get("name"), + "path": public_path(scenario_path), + "seed": seed, + "duration_s": duration_s, + "tick_s": tick_s, + "config_hash": scenario_hash(scenario_path), + }, + "repo": { + "commit": git_value(["rev-parse", "HEAD"]), + "branch": git_value(["branch", "--show-current"]), + "working_tree_clean": working_tree_clean(), + }, + "agents": [ + { + "id": agent.agent_id, + "domain": agent.domain, + "initial_position": [round(value, 3) for value in agent.initial_position], + } + for agent in agents + ], + "constraints": constraints, + "metrics": metrics, + "envelope": envelope, + "frames": frames, + } + + +def world_bounds(replay: dict[str, Any]) -> dict[str, float]: + xs: list[float] = [] + ys: list[float] = [] + for frame in replay["frames"]: + for agent in frame["agents"]: + xs.append(float(agent["x"])) + ys.append(float(agent["y"])) + for constraint in replay.get("constraints", []): + radius = float(constraint.get("radius_m", 0.0)) + position = constraint.get("position", [0.0, 0.0, 0.0]) + xs.extend([float(position[0]) - radius, float(position[0]) + radius]) + ys.extend([float(position[1]) - radius, float(position[1]) + radius]) + if not xs or not ys: + return {"min_x": -100.0, "max_x": 100.0, "min_y": -100.0, "max_y": 100.0} + pad = max(50.0, (max(xs) - min(xs) + max(ys) - min(ys)) * 0.05) + return {"min_x": min(xs) - pad, "max_x": max(xs) + pad, "min_y": min(ys) - pad, "max_y": max(ys) + pad} + + +def render_html(replay: dict[str, Any]) -> str: + replay_json = json.dumps({**replay, "world": world_bounds(replay)}, sort_keys=True).replace(" + + + + + STRIX Software Replay - {replay['scenario']['id']} + + + +
+
+

STRIX Replay

+

+
+

Software-only deterministic kinematic replay. Not hardware or RF validation.

+
+
+
+ +
+ + + + +""" + + +def write_replay(replay: dict[str, Any], output: Path | None, html_output: Path | None) -> None: + encoded = json.dumps(replay, indent=2, sort_keys=True) + "\n" + if output is None: + sys.stdout.write(encoded) + else: + output.parent.mkdir(parents=True, exist_ok=True) + output.write_text(encoded, encoding="utf-8") + print(output) + if html_output is not None: + html_output.parent.mkdir(parents=True, exist_ok=True) + html_output.write_text(render_html(replay), encoding="utf-8") + print(html_output) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("--scenario", type=Path, default=DEFAULT_SCENARIO) + parser.add_argument("--output", type=Path, default=DEFAULT_OUTPUT) + parser.add_argument("--html", type=Path, default=DEFAULT_HTML) + parser.add_argument("--tick-s", type=float, default=10.0) + parser.add_argument("--no-html", action="store_true", help="Write only JSON replay evidence.") + return parser.parse_args() + + +def main() -> int: + args = parse_args() + scenario_path = args.scenario if args.scenario.is_absolute() else ROOT / args.scenario + output = args.output if args.output.is_absolute() else ROOT / args.output + html_output = None if args.no_html else args.html if args.html.is_absolute() else ROOT / args.html + replay = build_replay(scenario_path, args.tick_s) + write_replay(replay, output, html_output) + return 1 if replay["envelope"]["status"] == "failed" else 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/sim/scenarios/README.md b/sim/scenarios/README.md index 422f5e6..5621c05 100644 --- a/sim/scenarios/README.md +++ b/sim/scenarios/README.md @@ -3,3 +3,14 @@ This directory contains public simulation presets used to exercise coordination, resilience, and safety behavior in generic settings. Program-specific, evaluator-facing, or operationally framed scenarios are not maintained as part of the public repository. Public scenarios should remain suitable for research, testing, and documentation. + +Each public scenario can be rendered as deterministic software-only replay +evidence: + +```bash +python scripts/strix_sim_replay.py --scenario sim/scenarios/gps_denied_recon.yaml +``` + +The replay output is a JSON timeline plus an optional self-contained HTML canvas +for visual inspection. This validates seeded orchestration behavior and public +scenario envelopes; it does not replace hardware, RF, or field validation.