# This workflow validates KEK update files in pull requests to ensure they have
# valid cryptographic signatures and expected payloads before merging.
#
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: BSD-2-Clause-Patent
name: Validate KEK Updates

on:
  pull_request:
    branches: [ "main" ]
    paths:
      - 'PostSignedObjects/KEK/**/*.bin'
      - 'PreSignedObjects/KEK/**/*.bin'

# Least-privilege token grants: contents to check out the PR, issues /
# pull-requests write so actions/github-script can post the failure comment.
permissions:
  contents: read
  issues: write
  pull-requests: write

jobs:
  validate-kek:
    name: Validate KEK Update Files
    runs-on: ubuntu-latest

    steps:
      - name: Checkout PR
        uses: actions/checkout@v4
        with:
          fetch-depth: 0  # Need full history to compare with base branch

      - name: Set up Python
        uses: actions/setup-python@v6
        with:
          # Quoted so YAML does not coerce the version number to a float.
          python-version: '3.12'
          cache: 'pip'
          cache-dependency-path: pip-requirements.txt

      - name: Install Pip Dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r pip-requirements.txt

      - name: Get Changed KEK Files
        id: changed-files
        run: |
          # Get list of changed .bin files in KEK directories
          git fetch origin ${{ github.base_ref }}
          CHANGED_FILES=$(git diff --name-only --diff-filter=AM origin/${{ github.base_ref }}...HEAD | grep -E '(PostSignedObjects|PreSignedObjects)/KEK/.*\.bin$' || echo "")

          if [ -z "$CHANGED_FILES" ]; then
            echo "No KEK files changed"
            echo "has_changes=false" >> $GITHUB_OUTPUT
          else
            echo "Changed KEK files:"
            echo "$CHANGED_FILES"
            echo "has_changes=true" >> $GITHUB_OUTPUT
            # Save changed files to a temporary file
            echo "$CHANGED_FILES" > changed_kek_files.txt
          fi

      - name: Validate Changed KEK Files
        if: steps.changed-files.outputs.has_changes == 'true'
        run: |
          VALIDATION_FAILED=0
          VALIDATION_RESULTS_DIR="kek_validation_results"
          mkdir -p "$VALIDATION_RESULTS_DIR"

          echo "## KEK Validation Results" >> $GITHUB_STEP_SUMMARY
          echo "" >> $GITHUB_STEP_SUMMARY

          while IFS= read -r file; do
            if [ -f "$file" ]; then
              echo "Validating: $file"
              BASENAME=$(basename "$file" .bin)
              OUTPUT_JSON="$VALIDATION_RESULTS_DIR/${BASENAME}_validation.json"

              # Run validation and capture exit code
              if python scripts/validate_kek.py "$file" -o "$OUTPUT_JSON" -q; then
                # Parse JSON to check both signature and payload
                SIGNATURE_VALID=$(jq -r '.result.valid' "$OUTPUT_JSON")
                PAYLOAD_VALID=$(jq -r '.result.payload_hash_valid' "$OUTPUT_JSON")

                if [ "$SIGNATURE_VALID" = "true" ] && [ "$PAYLOAD_VALID" = "true" ]; then
                  echo "✅ **PASS**: \`$file\`" >> $GITHUB_STEP_SUMMARY
                  echo "  - Cryptographic Signature: ✅ VALID" >> $GITHUB_STEP_SUMMARY
                  echo "  - Expected Payload: ✅ True" >> $GITHUB_STEP_SUMMARY
                elif [ "$SIGNATURE_VALID" = "true" ] && [ "$PAYLOAD_VALID" = "false" ]; then
                  echo "⚠️ **WARNING**: \`$file\`" >> $GITHUB_STEP_SUMMARY
                  echo "  - Cryptographic Signature: ✅ VALID" >> $GITHUB_STEP_SUMMARY
                  echo "  - Expected Payload: ⚠️ False (non-standard payload)" >> $GITHUB_STEP_SUMMARY
                  PAYLOAD_HASH=$(jq -r '.result.payload_hash' "$OUTPUT_JSON")
                  echo "  - Payload Hash: \`$PAYLOAD_HASH\`" >> $GITHUB_STEP_SUMMARY
                  # Don't fail on payload mismatch, just warn
                else
                  echo "❌ **FAIL**: \`$file\`" >> $GITHUB_STEP_SUMMARY
                  echo "  - Cryptographic Signature: ❌ INVALID" >> $GITHUB_STEP_SUMMARY
                  echo "  - Expected Payload: $([ "$PAYLOAD_VALID" = "true" ] && echo "✅ True" || echo "⚠️ False")" >> $GITHUB_STEP_SUMMARY
                  VALIDATION_FAILED=1
                fi
              else
                echo "❌ **FAIL**: \`$file\` - Validation script failed" >> $GITHUB_STEP_SUMMARY
                VALIDATION_FAILED=1
              fi
              echo "" >> $GITHUB_STEP_SUMMARY
            fi
          done < changed_kek_files.txt

          # Exit with error if any validation failed; the artifact upload
          # happens in a dedicated follow-up step (guarded by always()).
          if [ $VALIDATION_FAILED -eq 1 ]; then
            echo "::error::One or more KEK files have invalid cryptographic signatures"
            exit 1
          fi

      - name: Upload Validation Results
        # always() is required: without it this step is skipped when the
        # validate step exits non-zero, which is exactly when the JSON
        # results are most needed for debugging.
        if: always() && steps.changed-files.outputs.has_changes == 'true'
        uses: actions/upload-artifact@v4
        with:
          name: kek-validation-results
          path: kek_validation_results/
          retention-days: 30

      - name: Comment on PR
        if: steps.changed-files.outputs.has_changes == 'true' && failure()
        uses: actions/github-script@v7
        with:
          script: |
            github.rest.issues.createComment({
              issue_number: context.issue.number,
              owner: context.repo.owner,
              repo: context.repo.repo,
              body: '❌ **KEK Validation Failed**\n\nOne or more KEK update files have invalid cryptographic signatures. Please review the validation results in the workflow run details.'
            })
@@ -31,25 +32,37 @@ # Attach external signature using receipt python auth_var_tool.py sign --receipt-file MyVar.receipt.json --signature-file MyVar.bin.p7 + # Verify a signed authenticated variable + python auth_var_tool.py verify MyVar.authvar.bin MyVar 8be4df61-93ca-11d2-aa0d-00e098032b8c "NV,BS,RT,AT,AP" -v + # Describe an existing signed variable python auth_var_tool.py describe signed_variable.bin """ import argparse import datetime +import hashlib import io import json import logging import os +import re import sys import uuid from getpass import getpass +from cryptography import x509 +from cryptography.exceptions import InvalidSignature +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import ec, padding, rsa from cryptography.hazmat.primitives.serialization import pkcs12 from edk2toollib.uefi.authenticated_variables_structure_support import ( EfiVariableAuthentication2, EfiVariableAuthentication2Builder, ) +from pyasn1.codec.der import decoder, encoder +from pyasn1_modules import rfc2315 # Puts the script into debug mode, may be enabled via argparse ENABLE_DEBUG = False @@ -92,6 +105,253 @@ def _parse_timestamp(timestamp_str: str = None) -> datetime.datetime: return datetime.datetime.now(datetime.timezone.utc) +def _get_hash_algorithm_from_oid(oid: str) -> hashes.HashAlgorithm | None: + """Map OID to cryptography hash algorithm.""" + oid_map = { + '2.16.840.1.101.3.4.2.1': hashes.SHA256(), + '2.16.840.1.101.3.4.2.2': hashes.SHA384(), + '2.16.840.1.101.3.4.2.3': hashes.SHA512(), + '1.3.14.3.2.26': hashes.SHA1(), + } + return oid_map.get(oid) + + +def _extract_certificates_from_pkcs7(pkcs7_data: bytes) -> list: + """Extract X.509 certificates from PKCS7 data.""" + certificates = [] + try: + # Try to decode as ContentInfo first + try: + content_info, _ = decoder.decode(pkcs7_data, asn1Spec=rfc2315.ContentInfo()) + signed_data, _ = decoder.decode( + 
bytes(content_info['content']), + asn1Spec=rfc2315.SignedData() + ) + except Exception: + # If that fails, try decoding directly as SignedData + signed_data, _ = decoder.decode(pkcs7_data, asn1Spec=rfc2315.SignedData()) + + # Extract certificates if present + if signed_data['certificates'].hasValue(): + for cert_choice in signed_data['certificates']: + cert_der = encoder.encode(cert_choice['certificate']) + cert = x509.load_der_x509_certificate(cert_der, default_backend()) + certificates.append(cert) + + except Exception as e: + logger.debug(f"Failed to extract certificates: {e}") + + return certificates + + +def _verify_pkcs7_signature(pkcs7_data: bytes, certificates: list, external_data: bytes) -> dict: + """Verify PKCS7 detached signature against external data. + + Returns: + dict: Verification results with 'verified' boolean and list of 'signers' + """ + results = { + 'verified': False, + 'signers': [], + 'errors': [] + } + + try: + # Decode PKCS7 structure + try: + content_info, _ = decoder.decode(pkcs7_data, asn1Spec=rfc2315.ContentInfo()) + signed_data, _ = decoder.decode( + bytes(content_info['content']), + asn1Spec=rfc2315.SignedData() + ) + except Exception: + signed_data, _ = decoder.decode(pkcs7_data, asn1Spec=rfc2315.SignedData()) + + # Verify each signer + for signer_idx, signer_info in enumerate(signed_data['signerInfos']): + signer_result = { + 'index': signer_idx, + 'verified': False, + 'error': None + } + + try: + # Get digest algorithm + digest_alg_oid = str(signer_info['digestAlgorithm']['algorithm']) + hash_algorithm = _get_hash_algorithm_from_oid(digest_alg_oid) + + if not hash_algorithm: + signer_result['error'] = f"Unsupported digest algorithm: {digest_alg_oid}" + results['errors'].append(signer_result['error']) + results['signers'].append(signer_result) + continue + + # Get the encrypted digest (signature) + encrypted_digest = bytes(signer_info['encryptedDigest']) + + # Find the signer's certificate + serial_number = 
int(signer_info['issuerAndSerialNumber']['serialNumber']) + signer_cert = None + for cert in certificates: + if cert.serial_number == serial_number: + signer_cert = cert + break + + if not signer_cert: + signer_result['error'] = f"Signer certificate not found (serial: {serial_number})" + results['errors'].append(signer_result['error']) + results['signers'].append(signer_result) + continue + + # Determine what data to verify + if signer_info['authenticatedAttributes'].hasValue(): + # With authenticated attributes, sign the attributes + authenticated_attrs = signer_info['authenticatedAttributes'] + attrs_der = encoder.encode(authenticated_attrs) + # Replace IMPLICIT tag [0] (0xA0) with SET OF tag (0x31) + if attrs_der[0:1] == b'\xa0': + attrs_der = b'\x31' + attrs_der[1:] + data_to_verify = attrs_der + else: + # No authenticated attributes - verify external data directly + data_to_verify = external_data + + # Verify signature + public_key = signer_cert.public_key() + + if isinstance(public_key, rsa.RSAPublicKey): + public_key.verify( + encrypted_digest, + data_to_verify, + padding.PKCS1v15(), + hash_algorithm + ) + signer_result['verified'] = True + elif isinstance(public_key, ec.EllipticCurvePublicKey): + public_key.verify( + encrypted_digest, + data_to_verify, + ec.ECDSA(hash_algorithm) + ) + signer_result['verified'] = True + else: + signer_result['error'] = f"Unsupported key type: {type(public_key)}" + results['errors'].append(signer_result['error']) + + except InvalidSignature: + signer_result['error'] = "Signature verification failed - invalid signature" + results['errors'].append(signer_result['error']) + except Exception as e: + signer_result['error'] = f"Verification error: {str(e)}" + results['errors'].append(signer_result['error']) + + results['signers'].append(signer_result) + + # Overall verification passes if all signers verified + results['verified'] = all(s['verified'] for s in results['signers']) and len(results['signers']) > 0 + + except Exception 
as e: + results['errors'].append(f"PKCS7 parsing error: {str(e)}") + + return results + + +def verify_variable(args: argparse.Namespace) -> int: + """Verifies the cryptographic signature of an authenticated variable. + + This command validates that: + 1. The PKCS7 signature structure is valid + 2. The signature cryptographically verifies against the signable data + 3. The signing certificate is present in the signature + + Parameters + ---------- + args : argparse.Namespace + Command-line arguments including: + - authvar_file: Path to the signed authenticated variable file + - var_name: Variable name used during signing + - var_guid: Variable GUID used during signing + - attributes: Variable attributes used during signing + - verbose: Enable detailed output + + Returns: + ------- + int + 0 if verification succeeds, 1 if verification fails + """ + try: + # Parse the authenticated variable + logger.info(f"Verifying authenticated variable: {args.authvar_file}") + + with open(args.authvar_file, 'rb') as f: + auth_var = EfiVariableAuthentication2(decodefs=f) + + # Reconstruct the signable data using the builder + signing_time = auth_var.time.get_datetime() + builder = EfiVariableAuthentication2Builder( + name=args.var_name, + guid=uuid.UUID(args.var_guid), + attributes=args.attributes, + payload=auth_var.payload, + efi_time=signing_time + ) + signable_data = builder.get_digest() + + if args.verbose: + logger.info(f"Variable Name: {args.var_name}") + logger.info(f"Variable GUID: {args.var_guid}") + logger.info(f"Attributes: {args.attributes}") + logger.info(f"Signing Time: {signing_time}") + logger.info(f"Payload Size: {len(auth_var.payload)} bytes") + logger.info(f"Signable Data SHA256: {hashlib.sha256(signable_data).hexdigest()}") + + # Extract PKCS7 signature (cert_data from edk2toollib is the PKCS7 data) + pkcs7_data = auth_var.auth_info.cert_data + + # Extract certificates + certificates = _extract_certificates_from_pkcs7(pkcs7_data) + + if args.verbose: + 
logger.info(f"\nCertificates found: {len(certificates)}") + for i, cert in enumerate(certificates, 1): + logger.info(f" Certificate {i}:") + logger.info(f" Subject: {cert.subject.rfc4514_string()}") + logger.info(f" Issuer: {cert.issuer.rfc4514_string()}") + logger.info(f" Valid: {cert.not_valid_before_utc} to {cert.not_valid_after_utc}") + + # Verify the signature + verification_result = _verify_pkcs7_signature(pkcs7_data, certificates, signable_data) + + # Display results + if args.verbose: + logger.info("") + logger.info("Signature Verification Results:") + for signer in verification_result['signers']: + logger.info(f" Signer {signer['index'] + 1}:") + if signer['verified']: + logger.info(" Status: VERIFIED") + else: + logger.info(" Status: FAILED") + if signer['error']: + logger.info(f" Error: {signer['error']}") + + if verification_result['verified']: + logger.info("[+] Authenticated variable signature is VALID") + return 0 + else: + logger.error("[-] Authenticated variable signature verification FAILED") + for error in verification_result['errors']: + logger.error(f" - {error}") + return 1 + + except Exception as e: + logger.error(f"Failed to verify authenticated variable: {e}") + if args.verbose: + import traceback + traceback.print_exc() + return 1 + + def format_variable(args: argparse.Namespace) -> int: """Formats a variable for signing by generating signable data and a receipt file. @@ -389,6 +649,73 @@ def _attach_signature_from_receipt(args: argparse.Namespace) -> int: return 0 +def _convert_hex_strings_to_readable(content: str) -> str: + """Convert hex-encoded strings in describe output to human-readable format. + + This function searches for patterns like "value=0x131a..." in the content + and attempts to decode them as UTF-8 strings. Common encodings in certificates + include PrintableString (0x13), UTF8String (0x0c), and IA5String (0x16). 
+ + Parameters + ---------- + content : str + The original text content from auth_var.print() + + Returns: + ------- + str + The content with hex strings converted to readable format where possible + """ + + # Pattern to match hex value lines like " value=0x131a444f204e4f54..." + pattern = r'([ ]*value=)(0x[0-9a-fA-F]+)' + + def decode_hex_value(match: 're.Match[str]') -> str: + indent = match.group(1) + hex_string = match.group(2) + + try: + # Remove "0x" prefix and convert to bytes + hex_bytes = bytes.fromhex(hex_string[2:]) + + # Check if this looks like an ASN.1 encoded string + # Common prefixes: 0x13 (PrintableString), 0x0c (UTF8String), 0x16 (IA5String) + if len(hex_bytes) >= 2 and hex_bytes[0] in [0x13, 0x0c, 0x16]: + # Second byte is the length + length = hex_bytes[1] + if len(hex_bytes) >= 2 + length: + # Extract the string content + string_data = hex_bytes[2:2+length] + try: + # Attempt to decode as UTF-8 + decoded = string_data.decode('utf-8') + # Only replace if it looks like printable text + if decoded.isprintable() or all(c in '\t\n\r' or c.isprintable() for c in decoded): + return f'{indent}{hex_string} ("{decoded}")' + except (UnicodeDecodeError, AttributeError): + # Decoding failed; ignore and try the next decoding strategy. + pass + + # If it's not ASN.1 encoded, try direct UTF-8 decode + # (in case it's just raw string data) + try: + decoded = hex_bytes.decode('utf-8') + if decoded.isprintable() or all(c in '\t\n\r' or c.isprintable() for c in decoded): + return f'{indent}{hex_string} ("{decoded}")' + except UnicodeDecodeError: + # Ignore decode errors: not all byte sequences are valid UTF-8, so skip these cases. + pass + + except (ValueError, IndexError) as e: + # Failed to decode hex string as ASN.1 or UTF-8; this is expected for some values. 
+ logging.debug(f"Failed to decode hex string '{hex_string}': {e}") + + # If decoding fails, return original + return match.group(0) + + return re.sub(pattern, decode_hex_value, content) + + def describe_variable(args: argparse.Namespace) -> int: """Parses and describes an authenticated variable structure. @@ -410,9 +737,20 @@ def describe_variable(args: argparse.Namespace) -> int: name = os.path.basename(args.signed_payload) output_file = os.path.join(args.output_dir, f"{name}.authvar.txt") + # First write to a buffer to capture the output + buffer = io.StringIO() + auth_var.print(outfs=buffer) + + # Get the content and convert hex strings to readable format + content = buffer.getvalue() + readable_content = _convert_hex_strings_to_readable(content) + + # Write the converted content to the output file with open(output_file, "w") as f: - auth_var.print(outfs=f) + f.write(readable_content) + payload_hash = hashlib.sha256(auth_var.payload).hexdigest() + logger.info(f"Payload SHA256: {payload_hash}") logger.info(f"Output: {output_file}") return 0 @@ -567,6 +905,49 @@ def setup_describe_parser(subparsers: argparse._SubParsersAction) -> argparse._S return subparsers +def setup_verify_parser(subparsers: argparse._SubParsersAction) -> argparse._SubParsersAction: + """Sets up the verify parser. 
+ + :param subparsers: - sub parser from argparse to add options to + + :returns: subparser + """ + verify_parser = subparsers.add_parser( + "verify", + help="Verifies the cryptographic signature of an authenticated variable" + ) + verify_parser.set_defaults(function=verify_variable) + + verify_parser.add_argument( + "authvar_file", + type=typecheck_file_exists, + help="Path to the signed authenticated variable file (.authvar.bin)" + ) + + verify_parser.add_argument( + "var_name", + help="Variable name that was used during signing (e.g., 'KEK', 'db', 'PK')" + ) + + verify_parser.add_argument( + "var_guid", + help="Variable GUID that was used during signing (e.g., '8be4df61-93ca-11d2-aa0d-00e098032b8c')" + ) + + verify_parser.add_argument( + "attributes", + help="Comma-separated list of attributes used during signing (e.g., 'NV,BS,RT,AT,AP')" + ) + + verify_parser.add_argument( + "-v", "--verbose", + action="store_true", + help="Enable verbose output with detailed verification information" + ) + + return subparsers + + def parse_args() -> argparse.Namespace: """Parses arguments from the command line.""" parser = argparse.ArgumentParser( @@ -582,9 +963,10 @@ def parse_args() -> argparse.Namespace: "--debug", action="store_true", default=False, help="enables debug printing for deep inspection" ) - subparsers = setup_format_parser(subparsers) - subparsers = setup_sign_parser(subparsers) - subparsers = setup_describe_parser(subparsers) + setup_format_parser(subparsers) + setup_sign_parser(subparsers) + setup_describe_parser(subparsers) + setup_verify_parser(subparsers) args = parser.parse_args() # Create output directory if it doesn't exist (after parsing args) diff --git a/scripts/validate_kek.py b/scripts/validate_kek.py new file mode 100644 index 0000000..36a2fb6 --- /dev/null +++ b/scripts/validate_kek.py @@ -0,0 +1,408 @@ +"""Validate KEK update file(s) and generate a JSON report. 
"""Validate KEK update file(s) and generate a JSON report.

This script validates authenticated variable files - either a single file or all
files in a specified folder - and generates a JSON report with validation results.
"""

import argparse
import hashlib
import json
import logging
import sys
from datetime import datetime, timezone
from pathlib import Path

# Make auth_var_tool importable regardless of the invocation directory.
sys.path.insert(0, str(Path(__file__).parent))
from auth_var_tool import verify_variable
from edk2toollib.uefi.authenticated_variables_structure_support import EfiVariableAuthentication2

# Standard KEK parameters
KEK_NAME = "KEK"
KEK_GUID = "8be4df61-93ca-11d2-aa0d-00e098032b8c"
KEK_ATTRIBUTES = "NV,BS,RT,AT,AP"

# Expected payload hash for Microsoft 2023 KEK (EFI Signature List with x.509)
EXPECTED_PAYLOAD_HASH = "5b85333c009d7ea55cbb6f11a5c2ff45ee1091a968504c929aed25c84674962f"


def _validate_one_file(kek_file: Path, label: str = None, quiet: bool = False) -> dict:
    """Run payload-hash and signature checks on a single KEK update file.

    Shared implementation for both single-file and folder validation modes
    (previously duplicated verbatim in both code paths).

    Args:
        kek_file: Path to the KEK update file.
        label: Optional display label for log output (defaults to the file name).
        quiet: If True, suppress INFO output from the verification process.

    Returns:
        dict: Per-file validation result.
    """
    logging.info(f"Validating: {label if label is not None else kek_file.name}")

    file_result = {
        "filename": kek_file.name,
        "path": str(kek_file),
        "valid": False,
        "payload_hash_valid": False,
        "error": None,
        "warnings": [],
        "details": {}
    }

    try:
        # Parse the authenticated variable and check the payload digest
        # against the expected Microsoft 2023 KEK payload hash.
        with open(kek_file, 'rb') as f:
            auth_var = EfiVariableAuthentication2(decodefs=f)
        payload = auth_var.payload
        payload_hash = hashlib.sha256(payload).hexdigest()

        file_result["payload_hash"] = payload_hash
        file_result["payload_size"] = len(payload)
        file_result["payload_hash_valid"] = (payload_hash.lower() == EXPECTED_PAYLOAD_HASH.lower())

        if not file_result["payload_hash_valid"]:
            warning_msg = f"Payload hash mismatch: expected {EXPECTED_PAYLOAD_HASH}, got {payload_hash}"
            file_result["warnings"].append(warning_msg)
            logging.warning("  [!] Payload hash mismatch!")
            logging.warning(f"      Expected: {EXPECTED_PAYLOAD_HASH}")
            logging.warning(f"      Got:      {payload_hash}")

        # Delegate cryptographic verification to auth_var_tool.verify_variable,
        # driven through a synthesized argparse.Namespace.
        verify_args = argparse.Namespace(
            authvar_file=str(kek_file),
            var_name=KEK_NAME,
            var_guid=KEK_GUID,
            attributes=KEK_ATTRIBUTES,
            verbose=False
        )

        original_level = logging.root.level
        if quiet:
            # Temporarily raise the root logger level to suppress INFO chatter
            # from verify_variable while still surfacing errors.
            logging.root.setLevel(logging.ERROR)
        try:
            # verify_variable returns 0 for success, 1 for failure
            file_result["valid"] = (verify_variable(verify_args) == 0)
        finally:
            if quiet:
                logging.root.setLevel(original_level)

        if not file_result["valid"]:
            file_result["warnings"].append("Signature verification failed")

        file_result["details"] = {"verified": file_result["valid"]}

        sig_status = "VALID" if file_result["valid"] else "INVALID"
        payload_status = "True" if file_result["payload_hash_valid"] else "False"
        logging.info(f"  Cryptographic Signature: {sig_status}")
        logging.info(f"  Expected Payload: {payload_status}\n")

    except Exception as e:
        file_result["error"] = str(e)
        logging.error(f"  [X] ERROR: {e}\n")

    return file_result


def validate_single_kek(kek_file: Path, quiet: bool = False) -> dict:
    """Validate a single KEK update file.

    Args:
        kek_file: Path to KEK update file
        quiet: If True, suppress validation output from the verification process

    Returns:
        dict: Validation result for the file
    """
    return _validate_one_file(kek_file, quiet=quiet)


def validate_kek_folder(
    folder_path: Path,
    output_file: Path = None,
    quiet: bool = False,
    recursive: bool = False
) -> dict:
    """Validate all .bin files in the specified folder.

    Args:
        folder_path: Path to folder containing KEK update files
        output_file: Optional path to output JSON file
        quiet: If True, suppress validation output from the verification process
        recursive: If True, process subdirectories recursively

    Returns:
        dict: Validation results
    """
    results = {
        "validation_date": datetime.now(timezone.utc).isoformat(),
        "folder": str(folder_path),
        "parameters": {
            "var_name": KEK_NAME,
            "var_guid": KEK_GUID,
            "attributes": KEK_ATTRIBUTES
        },
        "files": {},
        "by_manufacturer": {}
    }

    # Find all .bin files (recursively if requested)
    if recursive:
        bin_files = sorted(folder_path.rglob("*.bin"))
    else:
        bin_files = sorted(folder_path.glob("*.bin"))

    if not bin_files:
        logging.warning(f"No .bin files found in {folder_path}")
        # Initialize empty summary for consistency
        results["summary"] = {
            "total": 0,
            "valid": 0,
            "invalid": 0,
            "manufacturers": 0
        }
        return results

    logging.info(f"Found {len(bin_files)} files to validate\n")

    # Validate each file
    for bin_file in bin_files:
        # Determine manufacturer (first component of the path relative to
        # the base folder; files directly in the base folder group as "root").
        relative_path = bin_file.relative_to(folder_path)
        manufacturer = relative_path.parts[0] if len(relative_path.parts) > 1 else "root"

        file_result = _validate_one_file(bin_file, label=str(relative_path), quiet=quiet)
        file_result["relative_path"] = str(relative_path)
        file_result["manufacturer"] = manufacturer

        results["files"][str(relative_path)] = file_result

        # Add to manufacturer grouping
        mfr_entry = results["by_manufacturer"].setdefault(
            manufacturer, {"files": [], "valid": 0, "invalid": 0}
        )
        mfr_entry["files"].append(str(relative_path))
        if file_result["valid"]:
            mfr_entry["valid"] += 1
        else:
            mfr_entry["invalid"] += 1

    # Generate summary
    valid_count = sum(1 for r in results["files"].values() if r["valid"])
    invalid_count = len(results["files"]) - valid_count

    results["summary"] = {
        "total": len(results["files"]),
        "valid": valid_count,
        "invalid": invalid_count,
        "manufacturers": len(results["by_manufacturer"])
    }

    logging.info(f"\n{'='*60}")
    logging.info("SUMMARY:")
    logging.info(f"  Total Files: {results['summary']['total']}")
    logging.info(f"  Valid:       {results['summary']['valid']}")
    logging.info(f"  Invalid:     {results['summary']['invalid']}")
    if recursive:
        logging.info(f"  Manufacturers: {results['summary']['manufacturers']}")
        logging.info("")
        logging.info("By Manufacturer:")
        for mfr, data in sorted(results["by_manufacturer"].items()):
            logging.info(
                f"  {mfr:30s} Total: {len(data['files']):3d}  Valid: {data['valid']:3d}  Invalid: {data['invalid']:3d}"
            )
    logging.info(f"{'='*60}")

    # Save to file if requested
    if output_file:
        with open(output_file, 'w') as f:
            json.dump(results, f, indent=2)
        logging.info(f"\nResults saved to: {output_file}")

    return results


def main() -> int:
    """Main entry point for validating KEK update file(s)."""
    parser = argparse.ArgumentParser(
        description="Validate KEK update file(s) - single file or folder"
    )
    parser.add_argument(
        "path",
        type=Path,
        help="Path to a KEK update file (.bin) or folder containing KEK update files"
    )
    parser.add_argument(
        "-o", "--output",
        type=Path,
        default=None,
        help="Path to output JSON file (default: _validation_results.json, always generated)"
    )
    parser.add_argument(
        "-r", "--recursive",
        action="store_true",
        help="Process subdirectories recursively (only applicable for folders)"
    )
    parser.add_argument(
        "-v", "--verbose",
        action="store_true",
        help="Enable verbose logging"
    )
    parser.add_argument(
        "-q", "--quiet",
        action="store_true",
        help="Suppress validation output (show only summary)"
    )

    args = parser.parse_args()

    # Setup logging
    log_level = logging.DEBUG if args.verbose else logging.INFO
    logging.basicConfig(
        level=log_level,
        format='%(message)s'
    )

    # Validate path exists
    if not args.path.exists():
        logging.error(f"Path not found: {args.path}")
        return 1

    if args.path.is_file():
        # Single-file mode: require the .bin extension used by KEK updates.
        if not args.path.suffix == '.bin':
            logging.error(f"File must have .bin extension: {args.path}")
            return 1

        # Determine output file
        if args.output is None:
            output_file = args.path.parent / f"{args.path.stem}_validation_results.json"
        else:
            output_file = args.output

        # Validate the single file
        file_result = validate_single_kek(args.path, quiet=args.quiet)

        # Create results structure
        results = {
            "validation_date": datetime.now(timezone.utc).isoformat(),
            "file": str(args.path),
            "parameters": {
                "var_name": KEK_NAME,
                "var_guid": KEK_GUID,
                "attributes": KEK_ATTRIBUTES
            },
            "result": file_result
        }

        # Save to file
        with open(output_file, 'w') as f:
            json.dump(results, f, indent=2)
        logging.info(f"Results saved to: {output_file}")

        # Exit code reflects signature validity only; a payload-hash mismatch
        # is reported as a warning (consistent with the CI workflow).
        return 0 if file_result["valid"] else 1

    elif args.path.is_dir():
        # Folder mode: determine output file, then validate every .bin file.
        if args.output is None:
            output_file = args.path.parent / f"{args.path.name}_validation_results.json"
        else:
            output_file = args.output

        results = validate_kek_folder(args.path, output_file, quiet=args.quiet, recursive=args.recursive)

        # Return exit code based on results
        if results["summary"]["invalid"] > 0:
            return 1
        return 0

    else:
        logging.error(f"Invalid path type: {args.path}")
        return 1


if __name__ == "__main__":
    sys.exit(main())