From 27cf8545589601e439d010737709e4de4399fe49 Mon Sep 17 00:00:00 2001 From: Kshitiz Godara Date: Mon, 1 Jun 2026 17:23:33 +0000 Subject: [PATCH] feat(scripts): add automated QEMU ISO install verification Add scripts/auto_iso_install_qemu.py to drive an unattended Azure Linux ISO install in QEMU over the serial console and verify the installed system boots and logs in. Lets contributors iterate on the installer (Anaconda, kickstart, GRUB, LUKS) without clicking through the TUI, and gives CI a reusable smoke-test entry point. Supports both standard and LUKS-encrypted installs, parses the Anaconda hub at runtime to locate the Root password spoke, handles first-boot and post-relabel LUKS unlocks, verifies login via an echoed marker, captures `systemd-analyze` output, then powers off. Usage: python3 scripts/auto_iso_install_qemu.py -i -d --- scripts/auto_iso_install_qemu.py | 523 +++++++++++++++++++++++++++++++ 1 file changed, 523 insertions(+) create mode 100755 scripts/auto_iso_install_qemu.py diff --git a/scripts/auto_iso_install_qemu.py b/scripts/auto_iso_install_qemu.py new file mode 100755 index 00000000000..03e15dc2e48 --- /dev/null +++ b/scripts/auto_iso_install_qemu.py @@ -0,0 +1,523 @@ +#!/usr/bin/env python3 +"""Automate Azure Linux ISO install in QEMU over serial console. + +This script boots an installer ISO, watches serial output, responds to known +prompts, waits for first boot, logs in, and verifies the installed system is up. + +Example: + python3 scripts/auto_iso_install_qemu.py \ + --iso /path/to/azl4-iso-installer.iso \ + --disk ./azl4-test.qcow2 \ + --log ./azl4-install.log +""" + +from __future__ import annotations + +import argparse +import os +import platform +import re +import shlex +import shutil +import signal +import subprocess +import sys +import time +from dataclasses import dataclass, field +from typing import List, Optional, Pattern + + +# --- Constants / compiled regexes --- + +ANSI_ESCAPE_RE = re.compile(r"\x1B\[[0-?]*[ -/]*[@-~]") +CPR_REPLY_RE = re.compile(r"(?:\x1B\[)?\d+;\d+R") +DSR_QUERY_RE = re.compile(r"\x1B\[[0-9;]*n") + + +# --- Data structures --- + +@dataclass +class Responder: + """One-shot pattern->reply rule.""" + pattern: Pattern[str] + reply: str + note: str + max_hits: int = 1 + hits: int = 0 + + def can_fire(self) -> bool: + return self.hits < self.max_hits + + +@dataclass +class SequentialPrompts: + """Track a sequence of prompts that must be answered in order.""" + prompts: list[tuple[Pattern[str], str, str]] # (pattern, reply, cooldown_key) + note_prefix: str + entries: int = 0 + + @property + def done(self) -> bool: + return self.entries >= len(self.prompts) + + @property + def current(self) -> tuple[Pattern[str], str, str]: + return self.prompts[self.entries] + + +@dataclass +class State: + """Mutable automation state for the main loop.""" + installed: bool = False + logged_in: bool = False + login_verified: bool = False + login_verification_sent: bool = False + boot_analyze_done: bool = False + boot_analyze_sent: bool = False + spoke_selected: bool = False + begin_attempts: int = 0 + root_spoke_choice: Optional[str] = None + hub_begin_key: str = "b" + root_spoke_lookup_notice_sent: bool = False + awaiting_login_password: bool = False + anaconda_started: bool = False + install_started: bool = False + post_install_enter_sent: bool = False + poweroff_sent: bool = False + login_password_sent: bool = False + boot_luks_unlock_count: int = 0 + last_sent: dict[str, float] = field(default_factory=dict) + + +# --- Utility functions --- + +def run_checked(cmd: List[str]) -> None: + proc = subprocess.run(cmd, capture_output=True, text=True) + if proc.returncode != 0: + raise RuntimeError( + f"Command failed ({proc.returncode}): {' '.join(shlex.quote(c) for c in cmd)}\n" + f"stdout:\n{proc.stdout}\nstderr:\n{proc.stderr}" + ) + + +def ensure_tools() -> None: + machine = platform.machine() + if machine not in ("x86_64", "AMD64"): + raise RuntimeError( + f"This script only supports x86_64 hosts (detected: {machine}). " + f"aarch64 QEMU ISO boot is not currently supported." + ) + for tool in ("qemu-system-x86_64", "qemu-img"): + if subprocess.run(["bash", "-lc", f"command -v {tool}"], capture_output=True).returncode != 0: + raise RuntimeError(f"Required tool not found: {tool}") + + +def ensure_disk(path: str, size: str) -> bool: + """Create disk if missing. Returns True if disk already existed.""" + if os.path.exists(path): + return True + os.makedirs(os.path.dirname(path) or ".", exist_ok=True) + run_checked(["qemu-img", "create", "-f", "qcow2", path, size]) + return False + + +def send_line(proc: subprocess.Popen, line: str) -> None: + if proc.stdin is None: + return + proc.stdin.write((line + "\n").encode("utf-8")) + proc.stdin.flush() + + +def maybe_send( + proc: subprocess.Popen, + buf: str, + pattern: Pattern[str], + reply: str, + note: str, + st: State, + key: str, + cooldown: float = 2.0, +) -> bool: + """Send reply if pattern matches buf and cooldown has elapsed.""" + if not pattern.search(buf): + return False + now = time.time() + if now - st.last_sent.get(key, 0.0) < cooldown: + return False + send_line(proc, reply) + st.last_sent[key] = now + print(f"\n[AUTO] {note}: sent {reply!r}") + return True + + +def advance_seq(proc: subprocess.Popen, buf: str, seq: SequentialPrompts, st: State, cooldown: float = 2.0) -> bool: + """Try to advance a sequential prompt sequence by one step.""" + if seq.done: + return False + pat, reply, key = seq.current + if maybe_send(proc, buf, pat, reply, f"{seq.note_prefix} ({seq.entries + 1}/{len(seq.prompts)})", st, key, cooldown): + seq.entries += 1 + return True + return False + + +def sanitize(text: str) -> str: + """Strip ANSI escapes and normalize line endings for regex matching.""" + clean = ANSI_ESCAPE_RE.sub("", text) + clean = CPR_REPLY_RE.sub("", clean) + return clean.replace("\r", "\n") + + +def strip_cpr_for_display(text: str, carry: str) -> tuple[str, str]: + """Remove CPR/DSR noise from live output, handling chunk boundaries.""" + merged = carry + text + stripped = DSR_QUERY_RE.sub("", merged) + stripped = CPR_REPLY_RE.sub("", stripped) + partial = re.search(r"\x1B\[[0-9;]*$", stripped) + if partial: + return stripped[:partial.start()], partial.group(0) + return stripped, "" + + +def find_root_password_spoke_choice(screen_text: str) -> Optional[str]: + """Return the numeric choice for the Root password spoke from hub text.""" + spoke_entry_re = re.compile( + r"(?mi)(\d+)\)\s*\[[^\]]*\]\s*(.*?)" + r"(?=(?:\s+\d+\)\s*\[[^\]]*\]\s*)|\n|$)", + ) + for m in spoke_entry_re.finditer(screen_text): + if re.search(r"\broot\s+password\b", m.group(2), re.IGNORECASE): + return m.group(1) + # Fallback: look left of "Root password" for nearest "N)". + idx = screen_text.lower().find("root password") + if idx != -1: + nums = re.findall(r"(\d+)\)", screen_text[max(0, idx - 120):idx]) + if nums: + return nums[-1] + return None + + +def find_hub_begin_key(screen_text: str) -> str: + m = re.search(r"(?mi)\['([^']+)'\s+to\s+begin\s+installation", screen_text) + return m.group(1) if m else "b" + + +def hub_root_password_is_complete(screen_text: str) -> bool: + return bool( + re.search(r"(?mi)root\s+password\s+is\s+set", screen_text) + or re.search(r"(?mi)\[x\]\s*Root\s+password\b", screen_text) + ) + + +# --- Patterns --- + +P = { + "post_install_enter": re.compile( + r"Press\s+Enter\s+to\s+(?:reboot(?:\s+into\s+the\s+installed\s+system)?|quit)\s*: ?", re.IGNORECASE + ), + "login": re.compile(r"login:\s*$", re.IGNORECASE), + "login_pw": re.compile(r"password:\s*$", re.IGNORECASE), + "hub": re.compile(r"Please make (?:your choice from above|a selection from the above)", re.IGNORECASE), + "root_pw": re.compile(r"(?m)^.*password\s*:\s*$", re.IGNORECASE), + "root_pw_confirm": re.compile(r"(?mi)^.*((confirm|again).*(password|passphrase)|(password|passphrase).*(confirm|again)).*$"), + "weak_pw": re.compile(r"(use (it|this) anyway|weak password)", re.IGNORECASE), + "luks_pw": re.compile(r"(?mi)^\s*Passphrase\s*:\s*$"), + "luks_pw_confirm": re.compile(r"(?mi)^\s*Passphrase\s*\(confirm\)\s*:\s*$"), + "anaconda_started": re.compile(r"Starting installer", re.IGNORECASE), + "boot_luks": re.compile(r"(?i)(?:please\s+)?enter\s+passphrase\s+for\s+(?:disk\s+)?\S+\s*:"), +} + +SUCCESS_PATTERNS = [ + ("installed", re.compile(r"Installation complete\.\s*Press", re.IGNORECASE)), + ("installed", re.compile(r"Created UEFI boot entry: Azure Linux", re.IGNORECASE)), + ("logged_in", re.compile(r"\b(root|azurelinux)@[^\n]*[#\$]\s*$", re.IGNORECASE)), + ("login_verified", re.compile(r"__AZL_INSTALL_SUCCESS__")), + ("boot_analyze_done", re.compile(r"__AZL_ANALYZE_DONE__")), +] + +FAILURE_PATTERNS = [ + re.compile(r"BootloaderInstallationError", re.IGNORECASE), + re.compile(r"Traceback \(most recent call last\)", re.IGNORECASE), + re.compile(r"kernel panic", re.IGNORECASE), +] + +INSTALL_PROGRESS_PATTERNS = [ + re.compile(r"Running pre-installation", re.IGNORECASE), + re.compile(r"Running installation", re.IGNORECASE), + re.compile(r"Installing", re.IGNORECASE), + re.compile(r"Creating efi", re.IGNORECASE), +] + + +# --- Main --- + +def main() -> int: + parser = argparse.ArgumentParser(description="Automate Azure Linux ISO installation in QEMU") + parser.add_argument("-i", "--iso", required=True, help="Path to installer ISO") + parser.add_argument("-d", "--disk", required=True, help="QCOW2 disk path") + parser.add_argument("-s", "--disk-size", default="10G", help="Disk size if not exists") + parser.add_argument("-m", "--memory", default="4096", help="RAM in MiB") + parser.add_argument("-c", "--cpus", default="2", help="vCPU count") + parser.add_argument("-t", "--timeout", type=int, default=600, help="Timeout in seconds") + parser.add_argument("-u", "--username", default="root", help="Login username") + parser.add_argument("-p", "--password", default="Azl4Install!234", help="Root/login password") + parser.add_argument("-L", "--luks-passphrase", default=None, help="LUKS passphrase (defaults to --password)") + parser.add_argument("-C", "--install-choice", choices=["1", "2"], default="1", help="Installer menu choice") + parser.add_argument("-R", "--root-password-spoke", default=None, help="Explicit spoke number if hub parsing is ambiguous") + parser.add_argument("-l", "--log", default="qemu-install.log", help="Serial log file") + parser.add_argument("-r", "--raw-log", default=None, help="Raw serial log (defaults to .raw)") + parser.add_argument("-q", "--quiet", action="store_true", + help="Suppress live QEMU serial output to stdout (logs still written to file)") + parser.add_argument("--ovmf-code", default="/usr/share/edk2/ovmf/OVMF_CODE.fd", help="OVMF code path") + parser.add_argument("--ovmf-vars", default="/usr/share/edk2/ovmf/OVMF_VARS.fd", help="OVMF vars template") + args = parser.parse_args() + + luks_passphrase = args.luks_passphrase or args.password + + if not os.path.exists(args.iso): + print(f"ISO not found: {args.iso}", file=sys.stderr) + return 2 + + ensure_tools() + + disk_existed = ensure_disk(args.disk, args.disk_size) + if disk_existed: + print( + f"WARNING: Disk already exists: {args.disk}\n" + f" QEMU may boot from disk instead of ISO. " + f"Delete the disk to force a fresh install.", + file=sys.stderr, + ) + + vars_copy = os.path.abspath(os.path.splitext(args.disk)[0] + "-OVMF_VARS.fd") + if not os.path.exists(vars_copy): + os.makedirs(os.path.dirname(vars_copy) or ".", exist_ok=True) + shutil.copy2(args.ovmf_vars, vars_copy) + + qemu_cmd = [ + "qemu-system-x86_64", "-enable-kvm", "-machine", "q35", "-cpu", "host", + "-smp", str(args.cpus), "-m", str(args.memory), + "-drive", f"if=pflash,format=raw,readonly=on,file={args.ovmf_code}", + "-drive", f"if=pflash,format=raw,file={vars_copy}", + "-drive", f"file={os.path.abspath(args.disk)},if=virtio,format=qcow2", + "-cdrom", os.path.abspath(args.iso), + "-boot", "once=d", "-serial", "stdio", "-monitor", "none", "-display", "none", + ] + + responders = [ + Responder(re.compile(r"Select installation type"), args.install_choice, "Installer menu selection"), + Responder(re.compile(r"grub>\s*$", re.IGNORECASE), "normal", "GRUB rescue fallback"), + ] + + # Sequential prompt sequences. + root_pw_seq = SequentialPrompts( + prompts=[ + (P["root_pw"], args.password, "root_pw_first"), + (P["root_pw_confirm"], args.password, "root_pw_second"), + ], + note_prefix="Root password", + ) + luks_pw_seq = SequentialPrompts( + prompts=[ + (P["luks_pw"], luks_passphrase, "luks_pw_first"), + (P["luks_pw_confirm"], luks_passphrase, "luks_pw_second"), + ], + note_prefix="LUKS passphrase", + ) + + st = State() + start = time.time() + qemu: subprocess.Popen | None = None + + print("Launching QEMU:") + print(" " + " ".join(shlex.quote(p) for p in qemu_cmd)) + + raw_log_path = args.raw_log or f"{args.log}.raw" + + with open(args.log, "w", encoding="utf-8", buffering=1) as logf, \ + open(raw_log_path, "w", encoding="utf-8", buffering=1) as raw_logf: + try: + qemu = subprocess.Popen( + qemu_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, bufsize=0, + ) + assert qemu.stdout is not None + os.set_blocking(qemu.stdout.fileno(), False) + + buf = "" + display_carry = "" + last_status = 0.0 + + while True: + if qemu.poll() is not None: + break + if time.time() - start > args.timeout: + raise TimeoutError(f"Timed out after {args.timeout}s") + + try: + chunk = os.read(qemu.stdout.fileno(), 4096) + except BlockingIOError: + chunk = b"" + + if not chunk: + time.sleep(0.1) + if time.time() - last_status > 20: + sys.stdout.write(f"\n[STATUS] running... {int(time.time() - start)}s\n") + last_status = time.time() + continue + + text = chunk.decode("utf-8", errors="replace") + raw_logf.write(text) + display_text, display_carry = strip_cpr_for_display(text, display_carry) + if not args.quiet: + sys.stdout.write(display_text) + sys.stdout.flush() + logf.write(sanitize(text)) + buf = (buf + text)[-12000:] + + clean = sanitize(buf[-4000:]) + + # Check for fatal errors. + for pat in FAILURE_PATTERNS: + if pat.search(clean): + raise RuntimeError(f"Detected failure: {pat.pattern}") + + # Track state transitions. + if not st.anaconda_started and P["anaconda_started"].search(buf[-2000:]): + st.anaconda_started = True + + if not st.install_started: + for pat in INSTALL_PROGRESS_PATTERNS: + if pat.search(clean): + st.install_started = True + print("\n[AUTO] Installation progress detected.") + break + + # Success detection. Gate logged_in shell prompt match to after the + # login password was actually sent, so we don't match transient root + # shells in the installer environment. + for attr, pat in SUCCESS_PATTERNS: + if attr == "logged_in" and not st.login_password_sent: + continue + if pat.search(clean): + setattr(st, attr, True) + + # --- Pre-install automation --- + if not st.installed: + hub_window = sanitize(buf[-2000:]) + + # Hub spoke selection. + if not st.spoke_selected and P["hub"].search(hub_window): + if st.root_spoke_choice is None: + st.root_spoke_choice = args.root_password_spoke or find_root_password_spoke_choice(hub_window) + st.hub_begin_key = find_hub_begin_key(hub_window) + if st.root_spoke_choice is not None: + if maybe_send(qemu, hub_window, P["hub"], st.root_spoke_choice, + f"Hub: select Root password spoke ({st.root_spoke_choice})", st, "hub_select_spoke"): + st.spoke_selected = True + elif not st.root_spoke_lookup_notice_sent: + print("\n[AUTO] Hub detected, waiting to parse Root password spoke...") + st.root_spoke_lookup_notice_sent = True + + # Root password entry (only after spoke selected). + if st.spoke_selected and not root_pw_seq.done: + advance_seq(qemu, buf[-1200:], root_pw_seq, st) + + # LUKS passphrase during install (option 2, after anaconda starts). + if args.install_choice == "2" and st.anaconda_started and not luks_pw_seq.done: + advance_seq(qemu, buf[-1200:], luks_pw_seq, st) + + # Weak password confirmation. + maybe_send(qemu, buf[-1200:], P["weak_pw"], "yes", "Weak password accept", st, "weak_pw_confirm") + + # Begin installation once root password is set. + if root_pw_seq.done and not st.install_started and P["hub"].search(hub_window): + if hub_root_password_is_complete(hub_window): + if maybe_send(qemu, hub_window, P["hub"], st.hub_begin_key, + f"Hub: begin installation ({st.hub_begin_key})", st, "hub_continue", cooldown=4.0): + st.begin_attempts += 1 + elif st.begin_attempts == 0: + print("\n[AUTO] Hub: Root password incomplete; waiting...") + + # Generic responders (install menu, grub rescue). + for r in responders: + if r.can_fire() and r.pattern.search(buf): + send_line(qemu, r.reply) + r.hits += 1 + sys.stdout.write(f"\n[AUTO] {r.note}: sent {r.reply!r}\n") + + # --- Post-install automation --- + if not st.post_install_enter_sent and P["post_install_enter"].search(sanitize(buf[-2000:])): + send_line(qemu, "") + st.post_install_enter_sent = True + print("\n[AUTO] Post-install prompt: sent Enter") + + # Boot LUKS unlock (first boot + after SELinux relabel reboot). + if args.install_choice == "2" and st.installed and not st.login_verified: + if maybe_send(qemu, sanitize(buf[-1200:]), P["boot_luks"], luks_passphrase, + f"Boot LUKS unlock (#{st.boot_luks_unlock_count + 1})", st, + f"boot_luks_{st.boot_luks_unlock_count}", cooldown=5.0): + st.boot_luks_unlock_count += 1 + + # Console login. + if maybe_send(qemu, buf[-1200:], P["login"], args.username, "Login username", st, "login_username"): + st.awaiting_login_password = True + if st.awaiting_login_password: + if maybe_send(qemu, buf[-1200:], P["login_pw"], args.password, "Login password", st, "login_password"): + st.awaiting_login_password = False + st.login_password_sent = True + + # Verification and shutdown. + if st.logged_in and not st.login_verification_sent: + send_line(qemu, "echo __AZL_INSTALL_SUCCESS__") + st.login_verification_sent = True + if st.login_verified and not st.boot_analyze_sent: + # Collect boot timing once the system is up. + send_line( + qemu, + "echo __AZL_ANALYZE_BEGIN__; systemd-analyze; " + "systemd-analyze blame | head -n 20; " + "echo __AZL_ANALYZE_DONE__", + ) + st.boot_analyze_sent = True + if st.boot_analyze_done and not st.poweroff_sent: + send_line(qemu, "poweroff") + st.poweroff_sent = True + + # Periodic status. + if time.time() - last_status > 20: + sys.stdout.write(f"\n[STATUS] running... {int(time.time() - start)}s\n") + last_status = time.time() + + rc = qemu.returncode if qemu.returncode is not None else -1 + if rc not in (0, 1): + raise RuntimeError(f"QEMU exited with unexpected code {rc}") + if not st.installed: + msg = "WARNING: install completion not detected; check log." + if disk_existed: + msg += ( + f"\n The disk '{args.disk}' already existed before this run." + f"\n QEMU likely booted from the existing disk instead of the ISO." + f"\n Delete the disk and re-run to test a fresh ISO install." + ) + print(msg, file=sys.stderr) + return 1 + if not st.login_verified: + print("WARNING: login verification not detected; check log.", file=sys.stderr) + return 1 + print(f"\nSUCCESS: automated install finished. Serial log: {args.log}") + return 0 + + except Exception as exc: + print(f"\nERROR: {exc}", file=sys.stderr) + if qemu and qemu.poll() is None: + qemu.send_signal(signal.SIGTERM) + try: + qemu.wait(timeout=10) + except subprocess.TimeoutExpired: + qemu.kill() + return 1 + + +if __name__ == "__main__": + raise SystemExit(main())