Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

- eliminate false positives for valid ExecuTorch FlatBuffers binaries and file-type validation on public `.pte` models
- eliminate Keras ZIP false positives for safe built-in and allowlisted serialized objects such as `Add` and `NotEqual`
- **security:** remove `dill.load` / `dill.loads` from the pickle safe-global allowlist so recursive dill deserializers stay flagged as dangerous loader entry points
- **security:** add exact dangerous helper coverage for validated torch and NumPy refs such as `numpy.f2py.crackfortran.getlincoef`, `torch._dynamo.guards.GuardBuilder.get`, and `torch.utils.collect_env.run`
Expand Down
15 changes: 14 additions & 1 deletion modelaudit/scanners/executorch_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import Any, ClassVar

from ..utils import sanitize_archive_path
from ..utils.file.detection import _is_executorch_binary_signature, _is_valid_executorch_binary
from .base import BaseScanner, IssueSeverity, ScanResult
from .pickle_scanner import PickleScanner

Expand Down Expand Up @@ -50,7 +51,19 @@ def scan(self, path: str) -> ScanResult:
file_size = self.get_file_size(path)
result.metadata["file_size"] = file_size

header = self._read_header(path)
header = self._read_header(path, length=8)
if _is_executorch_binary_signature(header) and _is_valid_executorch_binary(path):
result.add_check(
name="ExecuTorch Binary Format Validation",
passed=True,
message="Valid ExecuTorch binary program format detected",
location=path,
details={"path": path, "format": "executorch_binary"},
)
result.bytes_scanned = file_size
result.finish(success=True)
return result

if not header.startswith(b"PK"):
result.add_check(
name="ExecuTorch Archive Format Validation",
Expand Down
63 changes: 61 additions & 2 deletions modelaudit/utils/file/detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,62 @@ def _is_lightgbm_signature(prefix: bytes) -> bool:
return (starts_with_tree or "tree=" in preview) and header_hits >= 3 and tree_hits >= 2 and not xgboost_like


def _is_executorch_binary_signature(prefix: bytes) -> bool:
"""Recognize versioned ExecuTorch FlatBuffers binaries by their file identifier."""
return len(prefix) >= 8 and prefix[4:6] == b"ET" and prefix[6:8].isdigit()


def _is_valid_executorch_binary(path: str | Path) -> bool:
    """Check that *path* has the minimal FlatBuffers layout of an ExecuTorch binary.

    The file must be a regular file of at least 16 bytes whose first eight
    bytes pass ``_is_executorch_binary_signature``. Beyond that, three pieces
    of structure are bounds-checked against the file size:

    * the little-endian uint32 at offset 0 (root table position, >= 12);
    * the little-endian int32 at the root table (distance back to the vtable,
      which must be positive and land at offset 8 or later);
    * the two little-endian uint16 values at the vtable (vtable size and
      table size, each >= 4), which must both fit inside the file.

    Only a vtable located *before* the root table is accepted. Any ``OSError``
    or ``struct.error`` raised while reading is treated as "not valid".

    Returns:
        True when every check passes, otherwise False.
    """
    candidate = Path(path)
    if not candidate.is_file():
        return False

    try:
        total_bytes = candidate.stat().st_size
        if total_bytes < 16:
            return False

        with candidate.open("rb") as stream:
            prefix = stream.read(8)
            if not _is_executorch_binary_signature(prefix):
                return False

            # Bytes 0-3: where the root table starts; it needs room for its
            # own 4-byte header before end of file.
            (table_pos,) = struct.unpack("<I", prefix[:4])
            if not 12 <= table_pos <= total_bytes - 4:
                return False

            stream.seek(table_pos)
            raw_back_offset = stream.read(4)
            if len(raw_back_offset) != 4:
                return False

            # The root table opens with a signed distance back to its vtable.
            (back_offset,) = struct.unpack("<i", raw_back_offset)
            if not 0 < back_offset <= table_pos:
                return False

            # The vtable may not overlap the 8-byte offset + identifier header.
            vtable_pos = table_pos - back_offset
            if vtable_pos < 8 or vtable_pos + 4 > total_bytes:
                return False

            stream.seek(vtable_pos)
            raw_vtable = stream.read(4)
            if len(raw_vtable) != 4:
                return False

            vtable_len, table_len = struct.unpack("<HH", raw_vtable)
            if min(vtable_len, table_len) < 4:
                return False

            vtable_fits = vtable_pos + vtable_len <= total_bytes
            table_fits = table_pos + table_len <= total_bytes
            if not (vtable_fits and table_fits):
                return False
    except (OSError, struct.error):
        return False

    return True


def _is_zlib_header(prefix: bytes) -> bool:
if len(prefix) < 2:
return False
Expand Down Expand Up @@ -416,6 +472,9 @@ def detect_file_format_from_magic(path: str) -> str:
magic8 = header[:8]
magic16 = header[:16]

if _is_executorch_binary_signature(magic8) and _is_valid_executorch_binary(file_path):
return "executorch"

# Try the new pattern matching approach first
format_result = detect_format_from_magic_bytes(magic4, magic8, magic16)
if format_result == "zip" and file_path.suffix.lower() == ".mar" and is_torchserve_mar_archive(path):
Expand Down Expand Up @@ -949,9 +1008,9 @@ def validate_file_type(path: str) -> bool:
if ext_format == "nemo" and header_format == "tar":
return True

# ExecuTorch files should be zip archives
# ExecuTorch files may be ZIP archives or valid FlatBuffers binaries.
if ext_format == "executorch":
return header_format == "zip"
return header_format == "zip" or _is_valid_executorch_binary(path)

# Keras files can be either ZIP (Keras 3.x) or HDF5 (legacy Keras)
if ext_format == "keras":
Expand Down
1 change: 1 addition & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ def pytest_runtest_setup(item):
"test_mxnet_scanner.py", # MXNet scanner tests
"test_tf_metagraph_scanner.py", # TensorFlow MetaGraph scanner tests
"test_torchserve_mar_scanner.py", # TorchServe .mar scanner tests
"test_executorch_scanner.py", # ExecuTorch scanner tests
"test_telemetry.py", # telemetry payload and availability tests
"test_telemetry_decoupling.py", # telemetry failure-isolation tests
"test_debug_command.py", # debug output telemetry flags
Expand Down
46 changes: 42 additions & 4 deletions tests/scanners/test_executorch_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@
from modelaudit.scanners.executorch_scanner import ExecuTorchScanner


def create_executorch_binary(tmp_path: Path, *, identifier: bytes = b"ET12") -> Path:
    """Write a minimal 16-byte ExecuTorch-style FlatBuffer and return its path.

    The file is always named ``program.pte`` inside *tmp_path*; *identifier*
    supplies the four bytes written at offsets 4-7.
    """
    root_offset = b"\x0c\x00\x00\x00"  # little-endian 12: position of the root table
    vtable_header = b"\x04\x00\x04\x00"  # two little-endian uint16 values, both 4
    root_table = b"\x04\x00\x00\x00"  # little-endian 4: distance back to the vtable
    target = tmp_path / "program.pte"
    target.write_bytes(b"".join((root_offset, identifier, vtable_header, root_table)))
    return target


def create_executorch_archive(tmp_path: Path, *, malicious: bool = False) -> Path:
zip_path = tmp_path / "model.ptl"
with zipfile.ZipFile(zip_path, "w") as z:
Expand All @@ -22,15 +29,15 @@ def __reduce__(self):
return zip_path


def test_executorch_scanner_can_handle(tmp_path):
def test_executorch_scanner_can_handle(tmp_path: Path) -> None:
path = create_executorch_archive(tmp_path)
assert ExecuTorchScanner.can_handle(str(path))
other = tmp_path / "model.h5"
other.write_bytes(b"data")
assert not ExecuTorchScanner.can_handle(str(other))


def test_executorch_scanner_safe_model(tmp_path):
def test_executorch_scanner_safe_model(tmp_path: Path) -> None:
path = create_executorch_archive(tmp_path)
scanner = ExecuTorchScanner()
result = scanner.scan(str(path))
Expand All @@ -40,18 +47,49 @@ def test_executorch_scanner_safe_model(tmp_path):
assert not critical


def test_executorch_scanner_malicious(tmp_path):
def test_executorch_scanner_malicious(tmp_path: Path) -> None:
path = create_executorch_archive(tmp_path, malicious=True)
scanner = ExecuTorchScanner()
result = scanner.scan(str(path))
assert any(i.severity == IssueSeverity.CRITICAL for i in result.issues)
assert any("eval" in i.message.lower() for i in result.issues)


def test_executorch_scanner_invalid_zip(tmp_path):
def test_executorch_scanner_invalid_zip(tmp_path: Path) -> None:
file_path = tmp_path / "bad.ptl"
file_path.write_bytes(b"not zip")
scanner = ExecuTorchScanner()
result = scanner.scan(str(file_path))
assert not result.success
assert any("executorch" in i.message.lower() for i in result.issues)


def test_executorch_scanner_accepts_binary_program_header(tmp_path: Path) -> None:
    """A structurally valid binary program scans successfully without format complaints."""
    program = create_executorch_binary(tmp_path)

    outcome = ExecuTorchScanner().scan(str(program))

    assert outcome.success is True
    assert outcome.bytes_scanned == program.stat().st_size
    # Neither the archive-format check nor file-type validation may fire.
    messages = [issue.message.lower() for issue in outcome.issues]
    assert all("not a valid executorch archive" not in text for text in messages)
    assert all("file type validation failed" not in text for text in messages)


def test_executorch_scanner_accepts_versioned_binary_program_header(tmp_path: Path) -> None:
    """A binary carrying the ``ET13`` identifier scans cleanly with no issues at all."""
    program = create_executorch_binary(tmp_path, identifier=b"ET13")

    outcome = ExecuTorchScanner().scan(str(program))

    assert outcome.success is True
    assert outcome.bytes_scanned == program.stat().st_size
    assert not outcome.issues


def test_executorch_scanner_rejects_invalid_binary_signature_match(tmp_path: Path) -> None:
    """An ``ET12`` identifier alone is not enough: a non-FlatBuffers payload must fail with S104."""
    impostor = tmp_path / "fake-program.pte"
    # Bytes 4-7 spell "ET12", but bytes 0-3 ("JUNK") are not a usable root table offset.
    impostor.write_bytes(b"JUNKET12notflatbufferatall")

    outcome = ExecuTorchScanner().scan(str(impostor))

    assert outcome.success is False
    rule_codes = [issue.rule_code for issue in outcome.issues]
    assert "S104" in rule_codes
28 changes: 28 additions & 0 deletions tests/utils/file/test_filetype.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,22 @@ def test_detect_torch7_formats_by_signature(tmp_path: Path) -> None:
assert detect_file_format_from_magic(str(torch7_path)) == "torch7"
assert validate_file_type(str(torch7_path)) is True


def test_detect_executorch_binary_requires_valid_flatbuffer_structure(tmp_path: Path) -> None:
executorch_path = tmp_path / "program.pte"
executorch_path.write_bytes(b"\x0c\x00\x00\x00ET13\x04\x00\x04\x00\x04\x00\x00\x00")

assert detect_file_format(str(executorch_path)) == "executorch"
assert detect_file_format_from_magic(str(executorch_path)) == "executorch"
assert validate_file_type(str(executorch_path)) is True

fake_executorch_path = tmp_path / "fake-program.pte"
fake_executorch_path.write_bytes(b"JUNKET12notflatbufferatall")

assert detect_file_format(str(fake_executorch_path)) == "executorch"
assert detect_file_format_from_magic(str(fake_executorch_path)) == "unknown"
assert validate_file_type(str(fake_executorch_path)) is False

fake_torch7 = tmp_path / "fake.t7"
fake_torch7.write_text("not torch7")
assert detect_file_format(str(fake_torch7)) == "unknown"
Expand Down Expand Up @@ -561,6 +577,18 @@ def test_validate_file_type(tmp_path):
mar.writestr("weights.bin", b"weights")
mar.writestr("handler.py", b"def handle(data, context):\n return data\n")
assert validate_file_type(str(mar_path)) is True

# ExecuTorch binaries require a valid FlatBuffers layout in addition to the file identifier.
executorch_path = tmp_path / "program.pte"
executorch_path.write_bytes(b"\x0c\x00\x00\x00ET13\x04\x00\x04\x00\x04\x00\x00\x00")
assert detect_file_format_from_magic(str(executorch_path)) == "executorch"
assert validate_file_type(str(executorch_path)) is True

invalid_executorch_path = tmp_path / "invalid-program.pte"
invalid_executorch_path.write_bytes(b"\x0c\x00\x00\x00ETAA\x04\x00\x04\x00\x04\x00\x00\x00")
assert detect_file_format_from_magic(str(invalid_executorch_path)) == "unknown"
assert validate_file_type(str(invalid_executorch_path)) is False

# Llamafile wrappers validate by extension with scanner-level marker checks.
llamafile_path = tmp_path / "model.llamafile"
llamafile_path.write_bytes(b"\x7fELF" + b"\x00" * 32 + b"llamafile")
Expand Down
Loading