diff --git a/CHANGELOG.md b/CHANGELOG.md
index e049a095..71a1b90e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -81,6 +81,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Fixed
 
+- eliminate false positives for valid ExecuTorch FlatBuffers binaries and file-type validation on public `.pte` models
 - eliminate Keras ZIP false positives for safe built-in and allowlisted serialized objects such as `Add` and `NotEqual`
 - **security:** remove `dill.load` / `dill.loads` from the pickle safe-global allowlist so recursive dill deserializers stay flagged as dangerous loader entry points
 - **security:** add exact dangerous helper coverage for validated torch and NumPy refs such as `numpy.f2py.crackfortran.getlincoef`, `torch._dynamo.guards.GuardBuilder.get`, and `torch.utils.collect_env.run`
diff --git a/modelaudit/scanners/executorch_scanner.py b/modelaudit/scanners/executorch_scanner.py
index 102369ea..40a98c38 100644
--- a/modelaudit/scanners/executorch_scanner.py
+++ b/modelaudit/scanners/executorch_scanner.py
@@ -7,6 +7,7 @@ from typing import Any, ClassVar
 
 from ..utils import sanitize_archive_path
+from ..utils.file.detection import _is_executorch_binary_signature, _is_valid_executorch_binary
 from .base import BaseScanner, IssueSeverity, ScanResult
 from .pickle_scanner import PickleScanner
 
 
@@ -50,7 +51,19 @@ def scan(self, path: str) -> ScanResult:
         file_size = self.get_file_size(path)
         result.metadata["file_size"] = file_size
 
-        header = self._read_header(path)
+        header = self._read_header(path, length=8)
+        if _is_executorch_binary_signature(header) and _is_valid_executorch_binary(path):
+            result.add_check(
+                name="ExecuTorch Binary Format Validation",
+                passed=True,
+                message="Valid ExecuTorch binary program format detected",
+                location=path,
+                details={"path": path, "format": "executorch_binary"},
+            )
+            result.bytes_scanned = file_size
+            result.finish(success=True)
+            return result
+
         if not header.startswith(b"PK"):
             result.add_check(
                 name="ExecuTorch Archive Format Validation",
diff --git a/modelaudit/utils/file/detection.py b/modelaudit/utils/file/detection.py
index 33793ca5..1c25e7a3 100644
--- a/modelaudit/utils/file/detection.py
+++ b/modelaudit/utils/file/detection.py
@@ -293,6 +293,62 @@ def _is_lightgbm_signature(prefix: bytes) -> bool:
     return (starts_with_tree or "tree=" in preview) and header_hits >= 3 and tree_hits >= 2 and not xgboost_like
 
 
+def _is_executorch_binary_signature(prefix: bytes) -> bool:
+    """Recognize versioned ExecuTorch FlatBuffers binaries by their file identifier."""
+    return len(prefix) >= 8 and prefix[4:6] == b"ET" and prefix[6:8].isdigit()
+
+
+def _is_valid_executorch_binary(path: str | Path) -> bool:
+    """Validate the minimal FlatBuffers structure for ExecuTorch binaries."""
+    file_path = Path(path)
+    if not file_path.is_file():
+        return False
+
+    try:
+        file_size = file_path.stat().st_size
+        if file_size < 16:
+            return False
+
+        with file_path.open("rb") as f:
+            header = f.read(8)
+            if not _is_executorch_binary_signature(header):
+                return False
+
+            root_table_offset = struct.unpack("<I", header[:4])[0]
+            if root_table_offset + 4 > file_size:
+                return False
+
+            f.seek(root_table_offset)
+            table_header = f.read(4)
+            if len(table_header) != 4:
+                return False
+
+            vtable_back_offset = struct.unpack("<i", table_header)[0]
+            if vtable_back_offset <= 0 or vtable_back_offset > root_table_offset:
+                return False
+
+            vtable_offset = root_table_offset - vtable_back_offset
+            if vtable_offset < 8 or vtable_offset + 4 > file_size:
+                return False
+
+            f.seek(vtable_offset)
+            vtable_header = f.read(4)
+            if len(vtable_header) != 4:
+                return False
+
+            vtable_size, object_size = struct.unpack("<HH", vtable_header)
+            if vtable_size < 4 or object_size < 4:
+                return False
+            if vtable_offset + vtable_size > file_size:
+                return False
+            if root_table_offset + object_size > file_size:
+                return False
+    except (OSError, struct.error):
+        return False
+
+    return True
+
+
 def _is_zlib_header(prefix: bytes) -> bool:
     if len(prefix) < 2:
         return False
@@ -416,6 +472,9 @@ def detect_file_format_from_magic(path: str) -> str:
     magic8 = header[:8]
     magic16 = header[:16]
 
+    if _is_executorch_binary_signature(magic8) and _is_valid_executorch_binary(file_path):
+        return "executorch"
+
     # Try the new pattern matching approach first
     format_result = detect_format_from_magic_bytes(magic4, magic8, magic16)
     if format_result == "zip" and file_path.suffix.lower() == ".mar" and is_torchserve_mar_archive(path):
@@ -949,9 +1008,9 @@ def validate_file_type(path: str) -> bool:
     if ext_format == "nemo" and header_format == "tar":
         return True
 
-    # ExecuTorch files should be zip archives
+    # ExecuTorch files may be ZIP archives or valid FlatBuffers binaries.
     if ext_format == "executorch":
-        return header_format == "zip"
+        return header_format == "zip" or _is_valid_executorch_binary(path)
 
     # Keras files can be either ZIP (Keras 3.x) or HDF5 (legacy Keras)
     if ext_format == "keras":
diff --git a/tests/conftest.py b/tests/conftest.py
index 848f456b..599cd7ba 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -102,6 +102,7 @@ def pytest_runtest_setup(item):
         "test_mxnet_scanner.py",  # MXNet scanner tests
         "test_tf_metagraph_scanner.py",  # TensorFlow MetaGraph scanner tests
         "test_torchserve_mar_scanner.py",  # TorchServe .mar scanner tests
+        "test_executorch_scanner.py",  # ExecuTorch scanner tests
         "test_telemetry.py",  # telemetry payload and availability tests
         "test_telemetry_decoupling.py",  # telemetry failure-isolation tests
         "test_debug_command.py",  # debug output telemetry flags
diff --git a/tests/scanners/test_executorch_scanner.py b/tests/scanners/test_executorch_scanner.py
index b026e287..19a611ae 100644
--- a/tests/scanners/test_executorch_scanner.py
+++ b/tests/scanners/test_executorch_scanner.py
@@ -6,6 +6,13 @@
 from modelaudit.scanners.executorch_scanner import ExecuTorchScanner
 
 
+def create_executorch_binary(tmp_path: Path, *, identifier: bytes = b"ET12") -> Path:
+    binary_path = tmp_path / "program.pte"
+    # Minimal valid FlatBuffer with the ExecuTorch file identifier.
+    binary_path.write_bytes(b"\x0c\x00\x00\x00" + identifier + b"\x04\x00\x04\x00\x04\x00\x00\x00")
+    return binary_path
+
+
 def create_executorch_archive(tmp_path: Path, *, malicious: bool = False) -> Path:
     zip_path = tmp_path / "model.ptl"
     with zipfile.ZipFile(zip_path, "w") as z:
@@ -22,7 +29,7 @@ def __reduce__(self):
     return zip_path
 
 
-def test_executorch_scanner_can_handle(tmp_path):
+def test_executorch_scanner_can_handle(tmp_path: Path) -> None:
     path = create_executorch_archive(tmp_path)
     assert ExecuTorchScanner.can_handle(str(path))
     other = tmp_path / "model.h5"
@@ -30,7 +37,7 @@ def test_executorch_scanner_can_handle(tmp_path):
     assert not ExecuTorchScanner.can_handle(str(other))
 
 
-def test_executorch_scanner_safe_model(tmp_path):
+def test_executorch_scanner_safe_model(tmp_path: Path) -> None:
     path = create_executorch_archive(tmp_path)
     scanner = ExecuTorchScanner()
     result = scanner.scan(str(path))
@@ -40,7 +47,7 @@ def test_executorch_scanner_safe_model(tmp_path):
     assert not critical
 
 
-def test_executorch_scanner_malicious(tmp_path):
+def test_executorch_scanner_malicious(tmp_path: Path) -> None:
     path = create_executorch_archive(tmp_path, malicious=True)
     scanner = ExecuTorchScanner()
     result = scanner.scan(str(path))
@@ -48,10 +55,41 @@ def test_executorch_scanner_malicious(tmp_path):
     assert any("eval" in i.message.lower() for i in result.issues)
 
 
-def test_executorch_scanner_invalid_zip(tmp_path):
+def test_executorch_scanner_invalid_zip(tmp_path: Path) -> None:
     file_path = tmp_path / "bad.ptl"
     file_path.write_bytes(b"not zip")
     scanner = ExecuTorchScanner()
     result = scanner.scan(str(file_path))
     assert not result.success
     assert any("executorch" in i.message.lower() for i in result.issues)
+
+
+def test_executorch_scanner_accepts_binary_program_header(tmp_path: Path) -> None:
+    file_path = create_executorch_binary(tmp_path)
+    scanner = ExecuTorchScanner()
+    result = scanner.scan(str(file_path))
+    assert result.success is True
+    assert result.bytes_scanned == file_path.stat().st_size
+    assert not any("not a valid executorch archive" in issue.message.lower() for issue in result.issues)
+    assert not any("file type validation failed" in issue.message.lower() for issue in result.issues)
+
+
+def test_executorch_scanner_accepts_versioned_binary_program_header(tmp_path: Path) -> None:
+    file_path = create_executorch_binary(tmp_path, identifier=b"ET13")
+    scanner = ExecuTorchScanner()
+    result = scanner.scan(str(file_path))
+
+    assert result.success is True
+    assert result.bytes_scanned == file_path.stat().st_size
+    assert not result.issues
+
+
+def test_executorch_scanner_rejects_invalid_binary_signature_match(tmp_path: Path) -> None:
+    file_path = tmp_path / "fake-program.pte"
+    file_path.write_bytes(b"JUNKET12notflatbufferatall")
+
+    scanner = ExecuTorchScanner()
+    result = scanner.scan(str(file_path))
+
+    assert result.success is False
+    assert any(issue.rule_code == "S104" for issue in result.issues)
diff --git a/tests/utils/file/test_filetype.py b/tests/utils/file/test_filetype.py
index 8d5a92f3..f372dde8 100644
--- a/tests/utils/file/test_filetype.py
+++ b/tests/utils/file/test_filetype.py
@@ -256,6 +256,22 @@ def test_detect_torch7_formats_by_signature(tmp_path: Path) -> None:
     assert detect_file_format_from_magic(str(torch7_path)) == "torch7"
     assert validate_file_type(str(torch7_path)) is True
 
+
+def test_detect_executorch_binary_requires_valid_flatbuffer_structure(tmp_path: Path) -> None:
+    executorch_path = tmp_path / "program.pte"
+    executorch_path.write_bytes(b"\x0c\x00\x00\x00ET13\x04\x00\x04\x00\x04\x00\x00\x00")
+
+    assert detect_file_format(str(executorch_path)) == "executorch"
+    assert detect_file_format_from_magic(str(executorch_path)) == "executorch"
+    assert validate_file_type(str(executorch_path)) is True
+
+    fake_executorch_path = tmp_path / "fake-program.pte"
+    fake_executorch_path.write_bytes(b"JUNKET12notflatbufferatall")
+
+    assert detect_file_format(str(fake_executorch_path)) == "executorch"
+    assert detect_file_format_from_magic(str(fake_executorch_path)) == "unknown"
+    assert validate_file_type(str(fake_executorch_path)) is False
+
 
     fake_torch7 = tmp_path / "fake.t7"
     fake_torch7.write_text("not torch7")
     assert detect_file_format(str(fake_torch7)) == "unknown"
@@ -561,6 +577,18 @@ def test_validate_file_type(tmp_path):
         mar.writestr("weights.bin", b"weights")
         mar.writestr("handler.py", b"def handle(data, context):\n    return data\n")
     assert validate_file_type(str(mar_path)) is True
+
+    # ExecuTorch binaries require a valid FlatBuffers layout in addition to the file identifier.
+    executorch_path = tmp_path / "program.pte"
+    executorch_path.write_bytes(b"\x0c\x00\x00\x00ET13\x04\x00\x04\x00\x04\x00\x00\x00")
+    assert detect_file_format_from_magic(str(executorch_path)) == "executorch"
+    assert validate_file_type(str(executorch_path)) is True
+
+    invalid_executorch_path = tmp_path / "invalid-program.pte"
+    invalid_executorch_path.write_bytes(b"\x0c\x00\x00\x00ETAA\x04\x00\x04\x00\x04\x00\x00\x00")
+    assert detect_file_format_from_magic(str(invalid_executorch_path)) == "unknown"
+    assert validate_file_type(str(invalid_executorch_path)) is False
+
     # Llamafile wrappers validate by extension with scanner-level marker checks.
     llamafile_path = tmp_path / "model.llamafile"
     llamafile_path.write_bytes(b"\x7fELF" + b"\x00" * 32 + b"llamafile")