From 3f25a9d791f2b86e5e08f0f308f9fc0635f65529 Mon Sep 17 00:00:00 2001 From: Doug Flick Date: Sun, 30 Nov 2025 19:38:18 -0800 Subject: [PATCH 01/10] feat: Add cryptographic verification to authenticode_transplant.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit adds comprehensive cryptographic validation to the Authenticode signature combining tool, bringing the same verification capabilities from auth_var_tool.py to PE file signature operations. Key changes: - Added cryptographic signature verification using the 'cryptography' library - Implemented SpcIndirectDataContent parsing to extract embedded PE hashes - Added certificate extraction and display from PKCS#7 signatures - Compute Authenticode hashes using the algorithm specified in the signature - Verify signatures mathematically using signer's public key (RSA/ECDSA) - Validate that computed PE hash matches the hash in SpcIndirectDataContent New functions: - _get_hash_algorithm_from_oid(): Maps OID strings to hash algorithms - _extract_pe_hash_from_spc_indirect_data(): Parses SPC structure for hash - _extract_certificates_from_pkcs7(): Extracts X.509 certificates - _verify_pkcs7_signature(): Performs full cryptographic verification - compute_authenticode_hash(): Flexible hash computation with configurable algorithm Enhanced functions: - validate_pkcs7_signatures(): Now performs cryptographic verification - main_verify(): Displays certificate details and verification status - main_combine(): Validates signatures cryptographically before combining Bug fixes: - Removed incorrect 8-byte padding from Authenticode hash calculation (padding only applies to WIN_CERTIFICATE structure alignment, not hash data) - Consolidated duplicate hash functions into single implementation Code improvements: - Named constants for all magic numbers in SPC parsing - Better documentation and inline comments - Proper type annotations with Optional types - Enhanced logging with ✓/✗ symbols for verification results Testing: - Verified against Microsoft-signed bootmgfw.efi files - Hash computation now matches Windows AppLocker and UEFI firmware - Both multi-signature and nested signature modes validated - All test cases pass with cryptographic verification Follows Microsoft Authenticode PE specification v1.1 --- scripts/authenticode_transplant.py | 480 ++++++++++++++++++++++++++--- 1 file changed, 431 insertions(+), 49 deletions(-) diff --git a/scripts/authenticode_transplant.py b/scripts/authenticode_transplant.py index 24b6406b..827c45a2 100644 --- a/scripts/authenticode_transplant.py +++ b/scripts/authenticode_transplant.py @@ -33,9 +33,14 @@ import os import struct import sys -from typing import Dict, List, Protocol, Tuple, runtime_checkable +from typing import Dict, List, Optional, Protocol, Tuple, runtime_checkable import pefile +from cryptography import x509 +from cryptography.exceptions import InvalidSignature +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.asymmetric import ec, padding, rsa from pyasn1.codec.der import decoder, encoder from pyasn1.type import tag, univ from pyasn1_modules import rfc2315 @@ -83,62 +88,371 @@ def create_pe(self, filepath: str, fast_load: bool = False) -> pefile.PE: return pefile.PE(filepath, fast_load=fast_load) -def calculate_authenticode_hash(pe: pefile.PE) -> str: - """Calculate the SHA256 hash of a PE file for Authenticode signature verification. +def _get_hash_algorithm_from_oid(oid: str) -> hashes.HashAlgorithm | None: + """Map OID to cryptography hash algorithm.""" + oid_map = { + '2.16.840.1.101.3.4.2.1': hashes.SHA256(), + '2.16.840.1.101.3.4.2.2': hashes.SHA384(), + '2.16.840.1.101.3.4.2.3': hashes.SHA512(), + '1.3.14.3.2.26': hashes.SHA1(), + } + return oid_map.get(oid) + + +def _extract_pe_hash_from_spc_indirect_data(content_bytes: bytes) -> tuple[bytes | None, str | None]: + """Extract PE file hash from SpcIndirectDataContent structure. + + SpcIndirectDataContent ::= SEQUENCE { + data SpcAttributeTypeAndOptionalValue, + messageDigest DigestInfo + } + + DigestInfo ::= SEQUENCE { + digestAlgorithm AlgorithmIdentifier, + digest OCTET STRING + } + + This follows the Microsoft Authenticode PE specification which defines + OID 1.3.6.1.4.1.311.2.1.4 for SpcIndirectDataContent. + + Args: + content_bytes: The contentInfo content from PKCS#7 SignedData + + Returns: + Tuple of (hash_bytes, algorithm_oid) or (None, None) if parsing fails + """ + # ASN.1 DER constants + ASN1_OCTET_STRING_TAG = 0x04 + + # Expected hash sizes (in bytes) + SHA1_HASH_SIZE = 20 + SHA256_HASH_SIZE = 32 + SHA384_HASH_SIZE = 48 + SHA512_HASH_SIZE = 64 + VALID_HASH_SIZES = {SHA1_HASH_SIZE, SHA256_HASH_SIZE, SHA384_HASH_SIZE, SHA512_HASH_SIZE} + + # DER-encoded OID prefixes for hash algorithms + # Format: [0x06 (OID tag), length, OID bytes...] + OID_SHA1_DER = b'\x06\x05\x2b\x0e\x03\x02\x1a' # 1.3.14.3.2.26 + OID_SHA256_DER = b'\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x01' # 2.16.840.1.101.3.4.2.1 + + # String representations of OIDs + OID_SHA1_STRING = '1.3.14.3.2.26' + OID_SHA256_STRING = '2.16.840.1.101.3.4.2.1' + + # Search parameters + OID_SEARCH_WINDOW = 50 # Bytes to search before hash for algorithm OID + LENGTH_BYTE_OFFSET = 1 + DATA_START_OFFSET = 2 + + try: + # Scan through the content looking for OCTET STRING tags with hash-sized data + i = 0 + while i < len(content_bytes) - 10: + # Check if this is an OCTET STRING tag + if content_bytes[i] == ASN1_OCTET_STRING_TAG: + length = content_bytes[i + LENGTH_BYTE_OFFSET] + + # Check if length matches a known hash size + if length in VALID_HASH_SIZES and i + DATA_START_OFFSET + length <= len(content_bytes): + hash_bytes = content_bytes[i + DATA_START_OFFSET : i + DATA_START_OFFSET + length] + + # Search backwards for the algorithm OID + # The OID appears in the DigestInfo structure before the digest + algorithm_oid = None + search_start = max(0, i - OID_SEARCH_WINDOW) + search_region = content_bytes[search_start:i] + + if OID_SHA256_DER in search_region: + algorithm_oid = OID_SHA256_STRING + elif OID_SHA1_DER in search_region: + algorithm_oid = OID_SHA1_STRING + + logger.info(f"Extracted PE hash from SpcIndirectDataContent: {hash_bytes.hex()}") + if algorithm_oid: + logger.info(f"Hash algorithm OID: {algorithm_oid}") + return hash_bytes, algorithm_oid + i += 1 + + logger.debug("No hash found in SpcIndirectDataContent") + return None, None + + except Exception as e: + logger.debug(f"Failed to parse SpcIndirectDataContent: {e}") + return None, None + + +def _extract_certificates_from_pkcs7(pkcs7_data: bytes) -> list: + """Extract X.509 certificates from PKCS7 data.""" + certificates = [] + try: + # Try to decode as ContentInfo first + try: + content_info, _ = decoder.decode(pkcs7_data, asn1Spec=rfc2315.ContentInfo()) + signed_data, _ = decoder.decode( + bytes(content_info['content']), + asn1Spec=rfc2315.SignedData() + ) + except Exception: + # If that fails, try decoding directly as SignedData + signed_data, _ = decoder.decode(pkcs7_data, asn1Spec=rfc2315.SignedData()) + + # Extract certificates if present + if signed_data['certificates'].hasValue(): + for cert_choice in signed_data['certificates']: + cert_der = encoder.encode(cert_choice['certificate']) + cert = x509.load_der_x509_certificate(cert_der, default_backend()) + certificates.append(cert) + + except Exception as e: + logger.debug(f"Failed to extract certificates: {e}") + + return certificates + + +def _verify_pkcs7_signature(pkcs7_data: bytes, pe_data: bytes) -> dict: + """Verify PKCS7 Authenticode signature against PE file. + + This function: + 1. Extracts the hash algorithm used by the signature + 2. Computes the Authenticode hash using that algorithm + 3. Verifies the computed hash matches the hash in SpcIndirectDataContent + 4. Cryptographically verifies the signature + + Args: + pkcs7_data: The PKCS#7 signature data + pe_data: The full PE file data + + Returns: + dict: Verification results with 'verified' boolean and list of 'signers' + """ + results = { + 'verified': False, + 'signers': [], + 'errors': [] + } + + try: + # Extract certificates from PKCS#7 + certificates = _extract_certificates_from_pkcs7(pkcs7_data) + if not certificates: + results['errors'].append("No certificates found in PKCS#7 signature") + return results + + # Decode PKCS7 structure + try: + content_info, _ = decoder.decode(pkcs7_data, asn1Spec=rfc2315.ContentInfo()) + signed_data, _ = decoder.decode( + bytes(content_info['content']), + asn1Spec=rfc2315.SignedData() + ) + except Exception: + signed_data, _ = decoder.decode(pkcs7_data, asn1Spec=rfc2315.SignedData()) + + # Extract the hash algorithm OID and embedded PE hash from SpcIndirectDataContent + authenticode_content_valid = False + embedded_pe_hash = None + hash_algorithm_oid = None + + if signed_data['contentInfo'].hasValue() and signed_data['contentInfo']['content'].hasValue(): + try: + content_bytes = bytes(signed_data['contentInfo']['content']) + + # Extract hash from SpcIndirectDataContent structure + embedded_pe_hash, hash_algorithm_oid = _extract_pe_hash_from_spc_indirect_data(content_bytes) + + if embedded_pe_hash and hash_algorithm_oid: + # Get the hash algorithm object + hash_algo = _get_hash_algorithm_from_oid(hash_algorithm_oid) + if hash_algo: + # Compute the Authenticode hash using the same algorithm + computed_hash = compute_authenticode_hash(pe_data, hash_algo) + logger.debug(f"Computed Authenticode hash: {computed_hash.hex()}") + + # Verify they match + if computed_hash == embedded_pe_hash: + authenticode_content_valid = True + logger.debug(f"[+] PE hash verified! (algorithm: {hash_algo.name})") + else: + logger.warning("[-] PE hash mismatch!") + logger.warning(f" Expected: {embedded_pe_hash.hex()}") + logger.warning(f" Computed: {computed_hash.hex()}") + results['errors'].append("PE hash mismatch between signature and computed hash") + else: + logger.warning(f"Unsupported hash algorithm OID: {hash_algorithm_oid}") + results['errors'].append(f"Unsupported hash algorithm: {hash_algorithm_oid}") + else: + logger.warning("Could not extract PE hash from SpcIndirectDataContent") + results['errors'].append("Could not parse SpcIndirectDataContent") + + except Exception as e: + logger.debug(f"Error checking SpcIndirectDataContent: {e}") + + if not authenticode_content_valid: + logger.warning("PE hash verification in SpcIndirectDataContent failed") + results['errors'].append("Could not verify PE hash in SpcIndirectDataContent") + + # Verify each signer + for signer_idx, signer_info in enumerate(signed_data['signerInfos']): + signer_result = { + 'index': signer_idx, + 'verified': False, + 'error': None + } + + try: + # Get digest algorithm + digest_alg_oid = str(signer_info['digestAlgorithm']['algorithm']) + hash_algorithm = _get_hash_algorithm_from_oid(digest_alg_oid) + + if not hash_algorithm: + signer_result['error'] = f"Unsupported digest algorithm: {digest_alg_oid}" + results['errors'].append(signer_result['error']) + results['signers'].append(signer_result) + continue + + # Get the encrypted digest (signature) + encrypted_digest = bytes(signer_info['encryptedDigest']) + + # Find the signer's certificate + serial_number = int(signer_info['issuerAndSerialNumber']['serialNumber']) + signer_cert = None + for cert in certificates: + if cert.serial_number == serial_number: + signer_cert = cert + break + + if not signer_cert: + signer_result['error'] = f"Signer certificate not found (serial: {serial_number})" + results['errors'].append(signer_result['error']) + results['signers'].append(signer_result) + continue + + # Determine what data to verify + if signer_info['authenticatedAttributes'].hasValue(): + # With authenticated attributes, we verify the signature against the authenticated attributes + # For Authenticode, the authenticated attributes contain a message digest of the + # SpcIndirectDataContent (which in turn contains the PE hash) + authenticated_attrs = signer_info['authenticatedAttributes'] + attrs_der = encoder.encode(authenticated_attrs) + # Replace IMPLICIT tag [0] (0xA0) with SET OF tag (0x31) + if attrs_der[0:1] == b'\xa0': + attrs_der = b'\x31' + attrs_der[1:] + data_to_verify = attrs_der + + # For Authenticode verification, we need to verify that the messageDigest attribute + # matches the hash of the contentInfo content (SpcIndirectDataContent) + # The SpcIndirectDataContent is what contains the PE hash + # We'll verify this by checking that the signature verifies against the authenticated attributes + # The firmware will separately verify the PE hash matches the SpcIndirectDataContent + else: + # No authenticated attributes - this is unusual for Authenticode but theoretically valid + # In this case, we would verify against the content directly + # However, for Authenticode, authenticated attributes are required + signer_result['error'] = "No authenticated attributes found - required for Authenticode" + results['errors'].append(signer_result['error']) + + # Verify signature + public_key = signer_cert.public_key() + + if isinstance(public_key, rsa.RSAPublicKey): + public_key.verify( + encrypted_digest, + data_to_verify, + padding.PKCS1v15(), + hash_algorithm + ) + signer_result['verified'] = True + elif isinstance(public_key, ec.EllipticCurvePublicKey): + public_key.verify( + encrypted_digest, + data_to_verify, + ec.ECDSA(hash_algorithm) + ) + signer_result['verified'] = True + else: + signer_result['error'] = f"Unsupported key type: {type(public_key)}" + results['errors'].append(signer_result['error']) + + except InvalidSignature: + signer_result['error'] = "Signature verification failed - invalid signature" + results['errors'].append(signer_result['error']) + except Exception as e: + signer_result['error'] = f"Verification error: {str(e)}" + results['errors'].append(signer_result['error']) + + results['signers'].append(signer_result) + + # Overall verification passes if all signers verified + results['verified'] = all(s['verified'] for s in results['signers']) and len(results['signers']) > 0 + + except Exception as e: + results['errors'].append(f"Failed to verify PKCS#7 signature: {str(e)}") + + return results + + +def compute_authenticode_hash(pe_data: bytes, hash_algorithm: Optional[object] = None) -> bytes: + """Compute Authenticode hash of PE data using specified algorithm. This function computes the hash of a PE file excluding specific fields that are modified during the signing process: the CheckSum field in the Optional Header - and the IMAGE_DIRECTORY_ENTRY_SECURITY directory entry (including the certificate - table itself). + and the IMAGE_DIRECTORY_ENTRY_SECURITY directory entry. Args: - pe: A pefile.PE object representing the parsed PE file. + pe_data: The raw PE file data (bytes) + hash_algorithm: A hash algorithm from cryptography.hazmat.primitives.hashes (e.g., hashes.SHA256()). + Defaults to SHA256 if not specified. Returns: - str: The hexadecimal string representation of the SHA256 hash of the PE file - data, excluding the CheckSum field and security directory/certificate data. + bytes: The computed hash as bytes Note: - This hash is used for Authenticode signature verification and follows the - Microsoft Authenticode specification for computing PE file hashes. + This follows the Microsoft Authenticode PE specification v1.1. + """ + from cryptography.hazmat.primitives import hashes as crypto_hashes - Specificiation can be found here: - https://aka.ms/AuthenticodeSpec + # Default to SHA-256 if no algorithm specified + if hash_algorithm is None: + hash_algorithm = crypto_hashes.SHA256() + + pe = pefile.PE(data=pe_data, fast_load=True) - This algorithm conforms to v1.1 of the specification. - """ security_directory = pe.OPTIONAL_HEADER.DATA_DIRECTORY[ pefile.DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_SECURITY"] - ] # Extract Security directory - checksum_offset = pe.OPTIONAL_HEADER.dump_dict()["CheckSum"]["FileOffset"] # CheckSum file offset - certificate_table_offset = security_directory.dump_dict()["VirtualAddress"][ - "FileOffset" - ] # IMAGE_DIRECTORY_ENTRY_SECURITY file offset + ] + checksum_offset = pe.OPTIONAL_HEADER.dump_dict()["CheckSum"]["FileOffset"] + certificate_table_offset = security_directory.dump_dict()["VirtualAddress"]["FileOffset"] certificate_virtual_addr = security_directory.VirtualAddress certificate_size = security_directory.Size - raw_data = pe.__data__ + hash_data = ( - raw_data[:checksum_offset] + raw_data[checksum_offset + 0x04 : certificate_table_offset] - ) # Skip OptionalHeader.CheckSum field and continue until IMAGE_DIRECTORY_ENTRY_SECURITY + pe_data[:checksum_offset] + pe_data[checksum_offset + 0x04 : certificate_table_offset] + ) hash_data += ( - raw_data[certificate_table_offset + 0x08 : certificate_virtual_addr] - + raw_data[certificate_virtual_addr + certificate_size :] - ) # Skip IMAGE_DIRECTORY_ENTRY_SECURITY and certificate - - # Ensure hash_data is aligned to 8-byte boundary per Authenticode specification - padding_needed = (8 - (len(hash_data) % 8)) % 8 - if padding_needed > 0: - hash_data += b'\x00' * padding_needed + pe_data[certificate_table_offset + 0x08 : certificate_virtual_addr] + + pe_data[certificate_virtual_addr + certificate_size :] + ) - return hashlib.sha256(hash_data).hexdigest() + # Map cryptography hash algorithm to hashlib + if isinstance(hash_algorithm, crypto_hashes.SHA256): + return hashlib.sha256(hash_data).digest() + elif isinstance(hash_algorithm, crypto_hashes.SHA1): + return hashlib.sha1(hash_data).digest() + elif isinstance(hash_algorithm, crypto_hashes.SHA384): + return hashlib.sha384(hash_data).digest() + elif isinstance(hash_algorithm, crypto_hashes.SHA512): + return hashlib.sha512(hash_data).digest() + else: + raise ValueError(f"Unsupported hash algorithm: {hash_algorithm}") def get_authenticode_hash(pe_path: str, fs: FileSystemInterface = None) -> str: """Calculate the proper Authenticode hash for a PE file. - This is a convenience wrapper around calculate_authenticode_hash() that + This is a convenience wrapper around compute_authenticode_hash() that handles file loading and cleanup. The hash is computed according to the - Microsoft Authenticode specification, excluding the CheckSum field and + Microsoft Authenticode specification v1.1, excluding the CheckSum field and security directory from the hash calculation. Args: @@ -155,13 +469,14 @@ def get_authenticode_hash(pe_path: str, fs: FileSystemInterface = None) -> str: if fs is None: fs = RealFileSystem() - pe = fs.create_pe(pe_path, fast_load=True) + # Read the raw PE data + with open(pe_path, 'rb') as f: + pe_data = f.read() - # Use the proper Authenticode hash calculation - hash_value = calculate_authenticode_hash(pe) - pe.close() + # Use the proper Authenticode hash calculation (defaults to SHA-256) + hash_bytes = compute_authenticode_hash(pe_data) - return hash_value + return hash_bytes.hex() def validate_pe_file(pe_path: str, fs: FileSystemInterface = None) -> pefile.PE: @@ -419,27 +734,29 @@ def extract_pkcs7_from_wincert(signature_data: bytes) -> bytes: return pkcs7_data -def validate_pkcs7_signatures(*pkcs7_data_list: bytes) -> Tuple[bytes, ...]: - """Validate multiple PKCS#7 signatures and return them for separate WIN_CERTIFICATE structures. +def validate_pkcs7_signatures(pe_data: bytes, *pkcs7_data_list: bytes) -> Tuple[bytes, ...]: + """Validate multiple PKCS#7 signatures cryptographically and return them for separate WIN_CERTIFICATE structures. - This function validates that all PKCS#7 structures are valid signedData and returns - them separately. We create multiple independent WIN_CERTIFICATE structures that UEFI - firmware can iterate through during Secure Boot validation. + This function validates that all PKCS#7 structures are valid signedData and performs + cryptographic verification of each signature against the PE file. + We create multiple independent WIN_CERTIFICATE structures that UEFI firmware can + iterate through during Secure Boot validation. IMPORTANT: This is NOT nested signatures (signtool /as). We do NOT nest one PKCS#7 inside another's unauthenticated attributes. Instead, we use the UEFI-standard approach of multiple WIN_CERTIFICATE structures. Args: + pe_data: The full PE file data (bytes) *pkcs7_data_list: Variable number of PKCS#7 signature data (each becomes a WIN_CERTIFICATE) Returns: Tuple[bytes, ...]: All PKCS#7 signatures, validated and ready to wrap Raises: - ValueError: If any PKCS#7 structure is invalid or not signedData type + ValueError: If any PKCS#7 structure is invalid, not signedData type, or fails cryptographic verification """ - logger.info("Validating PKCS#7 signatures...") + logger.info("Validating PKCS#7 signatures with cryptographic verification...") if len(pkcs7_data_list) < 2: raise ValueError("At least 2 PKCS#7 signatures are required for validation") @@ -449,6 +766,8 @@ def validate_pkcs7_signatures(*pkcs7_data_list: bytes) -> Tuple[bytes, ...]: # Decode all PKCS#7 structures to validate them try: for i, pkcs7_data in enumerate(pkcs7_data_list, 1): + logger.info(f"\n=== Validating Signature {i} ===") + content_info, _ = decoder.decode(pkcs7_data, asn1Spec=rfc2315.ContentInfo()) logger.info(f"Signature {i} type: {content_info['contentType']}") @@ -464,12 +783,36 @@ def validate_pkcs7_signatures(*pkcs7_data_list: bytes) -> Tuple[bytes, ...]: ) logger.info(f"Signature {i} size: {len(pkcs7_data)} bytes") + # Extract and log certificates + certificates = _extract_certificates_from_pkcs7(pkcs7_data) + logger.info(f"Certificates found: {len(certificates)}") + for cert_idx, cert in enumerate(certificates, 1): + logger.info(f" Certificate {cert_idx}:") + logger.info(f" Subject: {cert.subject.rfc4514_string()}") + logger.info(f" Issuer: {cert.issuer.rfc4514_string()}") + + # Perform cryptographic verification + logger.info("Performing cryptographic verification...") + verification_result = _verify_pkcs7_signature(pkcs7_data, pe_data) + + # Log verification results + if verification_result['verified']: + logger.info("[+] Signature cryptographically VERIFIED") + for signer in verification_result['signers']: + if signer['verified']: + logger.info(f" Signer {signer['index'] + 1}: VERIFIED") + else: + logger.error("[-] Signature verification FAILED") + for error in verification_result['errors']: + logger.error(f" - {error}") + raise ValueError(f"Signature {i} failed cryptographic verification: {verification_result['errors']}") + validated_signatures.append(pkcs7_data) logger.info( - f"All {len(validated_signatures)} signatures are valid - " - "returning separately for multiple WIN_CERTIFICATE structures" + f"\nAll {len(validated_signatures)} signatures validated successfully with cryptographic verification!" ) + logger.info("Returning signatures for multiple WIN_CERTIFICATE structures") return tuple(validated_signatures) @@ -996,7 +1339,10 @@ def main_combine(args: argparse.Namespace) -> int: logger.error("Nested signature mode requires at least 2 source files") return 1 - validated_pkcs7_list = validate_pkcs7_signatures(*all_pkcs7_data) + # Use the first PE file's data for verification + with open(args.sources[0], 'rb') as f: + pe_data = f.read() + validated_pkcs7_list = validate_pkcs7_signatures(pe_data, *all_pkcs7_data) logger.info( f"All {len(validated_pkcs7_list)} signatures validated successfully for nested combination!" ) @@ -1020,7 +1366,10 @@ def main_combine(args: argparse.Namespace) -> int: logger.info(f" Output: {args.output}") logger.info("") else: - validated_pkcs7_list = validate_pkcs7_signatures(*all_pkcs7_data) + # Use the first PE file's data for verification + with open(args.sources[0], 'rb') as f: + pe_data = f.read() + validated_pkcs7_list = validate_pkcs7_signatures(pe_data, *all_pkcs7_data) logger.info(f"All {len(validated_pkcs7_list)} signatures validated successfully!") # Save individual validated signatures consistently @@ -1086,6 +1435,10 @@ def main_verify(args: argparse.Namespace) -> int: logger.info(f"Authenticode hash (SHA256): {pe_hash}") logger.info("") + # Read PE file data for cryptographic verification + with open(args.source, 'rb') as f: + pe_data = f.read() + # Extract signatures sig_data, blocks, offset, total_size = extract_all_signatures(args.source) logger.info(f"Security Directory: offset=0x{offset:x}, size={total_size} bytes") @@ -1132,8 +1485,37 @@ def main_verify(args: argparse.Namespace) -> int: else: num_certs = 0 logger.info(f" Number of certificates: {num_certs}") + + # Extract and display certificates + certificates = _extract_certificates_from_pkcs7(pkcs7_data) + if certificates: + logger.info(" Certificate details:") + for cert_idx, cert in enumerate(certificates, 1): + logger.info(f" Certificate {cert_idx}:") + logger.info(f" Subject: {cert.subject.rfc4514_string()}") + logger.info(f" Issuer: {cert.issuer.rfc4514_string()}") + logger.info(f" Serial: {cert.serial_number}") + logger.info(f" Valid: {cert.not_valid_before_utc} to {cert.not_valid_after_utc}") + + # Perform cryptographic verification + logger.info(" Cryptographic Verification:") + verification_result = _verify_pkcs7_signature(pkcs7_data, pe_data) + + if verification_result['verified']: + logger.info(" [+] Signature is cryptographically VALID") + for signer in verification_result['signers']: + if signer['verified']: + logger.info(f" Signer {signer['index'] + 1}: VERIFIED") + else: + logger.warning(" [-] Signature verification FAILED") + for error in verification_result['errors']: + logger.warning(f" - {error}") + except Exception as e: logger.warning(f" Could not parse PKCS#7 structure: {e}") + if args.debug: + import traceback + traceback.print_exc() logger.info("") From 75dd6e45d4f177c4aefdaddbd1c9e67aaa1b9da9 Mon Sep 17 00:00:00 2001 From: Doug Flick Date: Mon, 1 Dec 2025 15:08:59 -0800 Subject: [PATCH 02/10] Update scripts/authenticode_transplant.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- scripts/authenticode_transplant.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/scripts/authenticode_transplant.py b/scripts/authenticode_transplant.py index 827c45a2..9cfb491f 100644 --- a/scripts/authenticode_transplant.py +++ b/scripts/authenticode_transplant.py @@ -167,6 +167,10 @@ def _extract_pe_hash_from_spc_indirect_data(content_bytes: bytes) -> tuple[bytes algorithm_oid = OID_SHA256_STRING elif OID_SHA1_DER in search_region: algorithm_oid = OID_SHA1_STRING + elif OID_SHA384_DER in search_region: + algorithm_oid = OID_SHA384_STRING + elif OID_SHA512_DER in search_region: + algorithm_oid = OID_SHA512_STRING logger.info(f"Extracted PE hash from SpcIndirectDataContent: {hash_bytes.hex()}") if algorithm_oid: From 1351fcd9cfe60948fd08d68664e4198df638b7bd Mon Sep 17 00:00:00 2001 From: Doug Flick Date: Mon, 1 Dec 2025 15:09:10 -0800 Subject: [PATCH 03/10] Update scripts/authenticode_transplant.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- scripts/authenticode_transplant.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/authenticode_transplant.py b/scripts/authenticode_transplant.py index 9cfb491f..e8ab8ba0 100644 --- a/scripts/authenticode_transplant.py +++ b/scripts/authenticode_transplant.py @@ -186,7 +186,7 @@ def _extract_pe_hash_from_spc_indirect_data(content_bytes: bytes) -> tuple[bytes return None, None -def _extract_certificates_from_pkcs7(pkcs7_data: bytes) -> list: +def _extract_certificates_from_pkcs7(pkcs7_data: bytes) -> List[x509.Certificate]: """Extract X.509 certificates from PKCS7 data.""" certificates = [] try: From dc8031cad65e77e44e3b432a48e23da30a64336c Mon Sep 17 00:00:00 2001 From: Doug Flick Date: Mon, 1 Dec 2025 15:09:18 -0800 Subject: [PATCH 04/10] Update scripts/authenticode_transplant.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- scripts/authenticode_transplant.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/authenticode_transplant.py b/scripts/authenticode_transplant.py index e8ab8ba0..a43097bc 100644 --- a/scripts/authenticode_transplant.py +++ b/scripts/authenticode_transplant.py @@ -88,7 +88,7 @@ def create_pe(self, filepath: str, fast_load: bool = False) -> pefile.PE: return pefile.PE(filepath, fast_load=fast_load) -def _get_hash_algorithm_from_oid(oid: str) -> hashes.HashAlgorithm | None: +def _get_hash_algorithm_from_oid(oid: str) -> Optional[hashes.HashAlgorithm]: """Map OID to cryptography hash algorithm.""" oid_map = { '2.16.840.1.101.3.4.2.1': hashes.SHA256(), From be5755a919ed87a8c756d2e54d23e3a6d679da9c Mon Sep 17 00:00:00 2001 From: Doug Flick Date: Mon, 1 Dec 2025 15:09:33 -0800 Subject: [PATCH 05/10] Update scripts/authenticode_transplant.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- scripts/authenticode_transplant.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/scripts/authenticode_transplant.py b/scripts/authenticode_transplant.py index a43097bc..e6c1b2b3 100644 --- a/scripts/authenticode_transplant.py +++ b/scripts/authenticode_transplant.py @@ -387,8 +387,12 @@ def _verify_pkcs7_signature(pkcs7_data: bytes, pe_data: bytes) -> dict: results['signers'].append(signer_result) - # Overall verification passes if all signers verified - results['verified'] = all(s['verified'] for s in results['signers']) and len(results['signers']) > 0 + # Overall verification passes if all signers verified AND PE hash is valid + results['verified'] = ( + all(s['verified'] for s in results['signers']) + and len(results['signers']) > 0 + and authenticode_content_valid + ) except Exception as e: results['errors'].append(f"Failed to verify PKCS#7 signature: {str(e)}") From 9680d080d2b0807df6df7918204e331731369fd2 Mon Sep 17 00:00:00 2001 From: Doug Flick Date: Mon, 1 Dec 2025 15:09:42 -0800 Subject: [PATCH 06/10] Update scripts/authenticode_transplant.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- scripts/authenticode_transplant.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/scripts/authenticode_transplant.py b/scripts/authenticode_transplant.py index e6c1b2b3..393be462 100644 --- a/scripts/authenticode_transplant.py +++ b/scripts/authenticode_transplant.py @@ -355,6 +355,8 @@ def _verify_pkcs7_signature(pkcs7_data: bytes, pe_data: bytes) -> dict: # However, for Authenticode, authenticated attributes are required signer_result['error'] = "No authenticated attributes found - required for Authenticode" results['errors'].append(signer_result['error']) + results['signers'].append(signer_result) + continue # Verify signature public_key = signer_cert.public_key() From 5ee92676d46e38f1718262ac6768ad4aa9dc132b Mon Sep 17 00:00:00 2001 From: Doug Flick Date: Mon, 1 Dec 2025 15:10:08 -0800 Subject: [PATCH 07/10] Update scripts/authenticode_transplant.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- scripts/authenticode_transplant.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/authenticode_transplant.py b/scripts/authenticode_transplant.py index 393be462..abd7ef5f 100644 --- a/scripts/authenticode_transplant.py +++ b/scripts/authenticode_transplant.py @@ -99,7 +99,7 @@ def _get_hash_algorithm_from_oid(oid: str) -> Optional[hashes.HashAlgorithm]: return oid_map.get(oid) -def _extract_pe_hash_from_spc_indirect_data(content_bytes: bytes) -> tuple[bytes | None, str | None]: +def _extract_pe_hash_from_spc_indirect_data(content_bytes: bytes) -> Tuple[Optional[bytes], Optional[str]]: """Extract PE file hash from SpcIndirectDataContent structure. SpcIndirectDataContent ::= SEQUENCE { From 91c807c95558021571e4ae240ea6c3780e63bf31 Mon Sep 17 00:00:00 2001 From: Doug Flick Date: Mon, 1 Dec 2025 15:10:20 -0800 Subject: [PATCH 08/10] Update scripts/authenticode_transplant.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- scripts/authenticode_transplant.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/scripts/authenticode_transplant.py b/scripts/authenticode_transplant.py index abd7ef5f..9847a7a1 100644 --- a/scripts/authenticode_transplant.py +++ b/scripts/authenticode_transplant.py @@ -480,8 +480,7 @@ def get_authenticode_hash(pe_path: str, fs: FileSystemInterface = None) -> str: fs = RealFileSystem() # Read the raw PE data - with open(pe_path, 'rb') as f: - pe_data = f.read() + pe_data = fs.read_binary_file(pe_path) # Use the proper Authenticode hash calculation (defaults to SHA-256) hash_bytes = compute_authenticode_hash(pe_data) From 0cee1147c968f791c1cb947ae0355f7657e01885 Mon Sep 17 00:00:00 2001 From: Doug Flick Date: Wed, 21 Jan 2026 10:10:53 -0800 Subject: [PATCH 09/10] Add missing SHA384 and SHA512 OID constants Fix ruff linting errors by adding the missing OID constants: - OID_SHA384_DER and OID_SHA384_STRING for SHA-384 hash algorithm - OID_SHA512_DER and OID_SHA512_STRING for SHA-512 hash algorithm These constants are referenced in the hash extraction logic but were not defined. --- scripts/authenticode_transplant.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/scripts/authenticode_transplant.py b/scripts/authenticode_transplant.py index 9847a7a1..b5cce525 100644 --- a/scripts/authenticode_transplant.py +++ b/scripts/authenticode_transplant.py @@ -135,10 +135,14 @@ def _extract_pe_hash_from_spc_indirect_data(content_bytes: bytes) -> Tuple[Optio # Format: [0x06 (OID tag), length, OID bytes...] OID_SHA1_DER = b'\x06\x05\x2b\x0e\x03\x02\x1a' # 1.3.14.3.2.26 OID_SHA256_DER = b'\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x01' # 2.16.840.1.101.3.4.2.1 + OID_SHA384_DER = b'\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x02' # 2.16.840.1.101.3.4.2.2 + OID_SHA512_DER = b'\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x03' # 2.16.840.1.101.3.4.2.3 # String representations of OIDs OID_SHA1_STRING = '1.3.14.3.2.26' OID_SHA256_STRING = '2.16.840.1.101.3.4.2.1' + OID_SHA384_STRING = '2.16.840.1.101.3.4.2.2' + OID_SHA512_STRING = '2.16.840.1.101.3.4.2.3' # Search parameters OID_SEARCH_WINDOW = 50 # Bytes to search before hash for algorithm OID From 600df89cb79b222458bfa1df0048ef8dfb434db6 Mon Sep 17 00:00:00 2001 From: Doug Flick Date: Wed, 21 Jan 2026 11:19:34 -0800 Subject: [PATCH 10/10] Fix resource leak and improve type safety - Add try-finally block in compute_authenticode_hash to ensure PE object is properly closed - Add cryptography==43.0.0 to pip-requirements.txt (missing dependency) - Improve type annotations: change dict to Dict[str, Any] for _verify_pkcs7_signature - Enhance docstring to document return dictionary structure --- pip-requirements.txt | 3 +- scripts/authenticode_transplant.py | 64 ++++++++++++++++-------------- 2 files changed, 37 insertions(+), 30 deletions(-) diff --git a/pip-requirements.txt b/pip-requirements.txt index 811a9e20..624ec532 100644 --- a/pip-requirements.txt +++ b/pip-requirements.txt @@ -4,4 +4,5 @@ ruff==0.14.5 pytest==9.0.1 pefile==2024.8.26 pyasn1==0.6.1 -pyasn1_modules==0.4.2 \ No newline at end of file +pyasn1_modules==0.4.2 +cryptography==43.0.0 \ No newline at end of file diff --git a/scripts/authenticode_transplant.py b/scripts/authenticode_transplant.py index b5cce525..1fa8fce7 100644 --- a/scripts/authenticode_transplant.py +++ b/scripts/authenticode_transplant.py @@ -33,7 +33,7 @@ import os import struct import sys -from typing import Dict, List, Optional, Protocol, Tuple, runtime_checkable +from typing import Any, Dict, List, Optional, Protocol, Tuple, runtime_checkable import pefile from cryptography import x509 @@ -218,7 +218,7 @@ def _extract_certificates_from_pkcs7(pkcs7_data: bytes) -> List[x509.Certificate return certificates -def _verify_pkcs7_signature(pkcs7_data: bytes, pe_data: bytes) -> dict: +def _verify_pkcs7_signature(pkcs7_data: bytes, pe_data: bytes) -> Dict[str, Any]: """Verify PKCS7 Authenticode signature against PE file. This function: @@ -232,7 +232,10 @@ def _verify_pkcs7_signature(pkcs7_data: bytes, pe_data: bytes) -> dict: pe_data: The full PE file data Returns: - dict: Verification results with 'verified' boolean and list of 'signers' + Dict[str, Any]: Verification results with keys: + - 'verified' (bool): Overall verification status + - 'signers' (List[Dict]): List of signer verification results + - 'errors' (List[str]): List of error messages """ results = { 'verified': False, @@ -432,33 +435,36 @@ def compute_authenticode_hash(pe_data: bytes, hash_algorithm: Optional[object] = pe = pefile.PE(data=pe_data, fast_load=True) - security_directory = pe.OPTIONAL_HEADER.DATA_DIRECTORY[ - pefile.DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_SECURITY"] - ] - checksum_offset = pe.OPTIONAL_HEADER.dump_dict()["CheckSum"]["FileOffset"] - certificate_table_offset = security_directory.dump_dict()["VirtualAddress"]["FileOffset"] - certificate_virtual_addr = security_directory.VirtualAddress - certificate_size = security_directory.Size - - hash_data = ( - pe_data[:checksum_offset] + pe_data[checksum_offset + 0x04 : certificate_table_offset] - ) - hash_data += ( - pe_data[certificate_table_offset + 0x08 : certificate_virtual_addr] - + pe_data[certificate_virtual_addr + certificate_size :] - ) + try: + security_directory = pe.OPTIONAL_HEADER.DATA_DIRECTORY[ + pefile.DIRECTORY_ENTRY["IMAGE_DIRECTORY_ENTRY_SECURITY"] + ] + checksum_offset = pe.OPTIONAL_HEADER.dump_dict()["CheckSum"]["FileOffset"] + certificate_table_offset = security_directory.dump_dict()["VirtualAddress"]["FileOffset"] + certificate_virtual_addr = security_directory.VirtualAddress + certificate_size = security_directory.Size + + hash_data = ( + pe_data[:checksum_offset] + pe_data[checksum_offset + 0x04 : certificate_table_offset] + ) + hash_data += ( + pe_data[certificate_table_offset + 0x08 : certificate_virtual_addr] + + pe_data[certificate_virtual_addr + certificate_size :] + ) - # Map cryptography hash algorithm to hashlib - if isinstance(hash_algorithm, crypto_hashes.SHA256): - return hashlib.sha256(hash_data).digest() - elif isinstance(hash_algorithm, crypto_hashes.SHA1): - return hashlib.sha1(hash_data).digest() - elif isinstance(hash_algorithm, crypto_hashes.SHA384): - return hashlib.sha384(hash_data).digest() - elif isinstance(hash_algorithm, crypto_hashes.SHA512): - return hashlib.sha512(hash_data).digest() - else: - raise ValueError(f"Unsupported hash algorithm: {hash_algorithm}") + # Map cryptography hash algorithm to hashlib + if isinstance(hash_algorithm, crypto_hashes.SHA256): + return hashlib.sha256(hash_data).digest() + elif isinstance(hash_algorithm, crypto_hashes.SHA1): + return hashlib.sha1(hash_data).digest() + elif isinstance(hash_algorithm, crypto_hashes.SHA384): + return hashlib.sha384(hash_data).digest() + elif isinstance(hash_algorithm, crypto_hashes.SHA512): + return hashlib.sha512(hash_data).digest() + else: + raise ValueError(f"Unsupported hash algorithm: {hash_algorithm}") + finally: + pe.close() def get_authenticode_hash(pe_path: str, fs: FileSystemInterface = None) -> str: