From c311b2511935b06404a09feb14caa9edff35bf44 Mon Sep 17 00:00:00 2001 From: mldangelo Date: Fri, 13 Mar 2026 18:13:37 -0400 Subject: [PATCH 1/5] fix: stop suppressing mixed-case pickle modules --- CHANGELOG.md | 1 + modelaudit/scanners/pickle_scanner.py | 23 ++--- tests/scanners/test_pickle_scanner.py | 121 +++++++++++++++++++++++++- 3 files changed, 126 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 85b98db6..517a0702 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -121,6 +121,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- **security:** stop treating mixed-case valid pickle module names as implausible, so import and reduce checks no longer bypass on names like `PIL` or attacker-chosen `EvilPkg` - **security:** scan OCI layer members based on registered file extensions so embedded ONNX, Keras H5, and other real-path scanners are no longer skipped inside tar layers - **security:** resolve bare-module TorchServe handler references like `custom_handler` to concrete archive members so malicious handler source is no longer skipped by static analysis - **security:** compare archive entry paths against the intended extraction root without following base-directory symlinks diff --git a/modelaudit/scanners/pickle_scanner.py b/modelaudit/scanners/pickle_scanner.py index e0ee2b2e..3e8c69e6 100644 --- a/modelaudit/scanners/pickle_scanner.py +++ b/modelaudit/scanners/pickle_scanner.py @@ -1740,25 +1740,14 @@ def _is_plausible_python_module(name: str) -> bool: Legitimate module names follow Python identifier rules: - Each dotted segment is a valid Python identifier (letters, digits, underscores; cannot start with a digit). - - Conventionally all-lowercase, though private/internal modules may use - a leading underscore. - Names that contain uppercase letters, start with digits, or include - characters outside ``[a-z0-9_.]`` are almost certainly **not** real - modules -- they are more likely DataFrame column names, user labels, - or other data strings that ended up as pickle GLOBAL arguments - (e.g. ``PEDRA_2020``). - - The check is intentionally conservative: a handful of legitimate but - unusual module names (e.g. ``PIL``, ``Cython``) are covered by - ``ML_SAFE_GLOBALS`` and will pass the allowlist before this function - is ever consulted. + Keep obviously malformed names rejected so arbitrary data strings are less + likely to be treated as imports, while allowing valid mixed-case segments + such as ``PIL``. Returns: True if *name* plausibly refers to a real Python module. """ - import re - if not name: return False @@ -1766,14 +1755,12 @@ def _is_plausible_python_module(name: str) -> bool: if " " in name or "\t" in name: return False - # Split on dots; each segment must be a valid Python identifier that - # looks like a conventional module name (lowercase + digits + _). + # Split on dots; each segment must be a valid Python identifier. segments = name.split(".") if not segments or any(s == "" for s in segments): return False - _MODULE_SEGMENT_RE = re.compile(r"^[a-z_][a-z0-9_]*$") - return all(_MODULE_SEGMENT_RE.match(seg) for seg in segments) + return all(seg.isidentifier() for seg in segments) def _is_safe_ml_global(mod: str, func: str) -> bool: diff --git a/tests/scanners/test_pickle_scanner.py b/tests/scanners/test_pickle_scanner.py index e9fc309c..4640ae30 100644 --- a/tests/scanners/test_pickle_scanner.py +++ b/tests/scanners/test_pickle_scanner.py @@ -18,7 +18,7 @@ EXECUTABLE_SIGNATURES, ) from modelaudit.scanners.base import CheckStatus, IssueSeverity, ScanResult -from modelaudit.scanners.pickle_scanner import PickleScanner +from modelaudit.scanners.pickle_scanner import PickleScanner, _is_plausible_python_module from tests.assets.generators.generate_advanced_pickle_tests import ( generate_memo_based_attack, generate_multiple_pickle_attack, @@ -645,6 +645,43 @@ def _craft_global_reduce_pickle(module: str, func: str) -> bytes: call_ops = b"(" + b"t" + b"R" + b"." return proto + global_op + call_ops + @staticmethod + def _craft_stack_global_reduce_pickle(module: str, func: str) -> bytes: + """Craft protocol-4 payload with STACK_GLOBAL + REDUCE.""" + + def short_binunicode(value: str) -> bytes: + encoded = value.encode() + return b"\x8c" + bytes([len(encoded)]) + encoded + + return b"\x80\x04" + short_binunicode(module) + short_binunicode(func) + b"\x93(tR." + + @staticmethod + def _craft_memoized_stack_global_reduce_pickle(module: str, func: str) -> bytes: + """Craft protocol-4 payload that recalls a memoized STACK_GLOBAL before REDUCE.""" + + def short_binunicode(value: str) -> bytes: + encoded = value.encode() + return b"\x8c" + bytes([len(encoded)]) + encoded + + payload = bytearray(b"\x80\x04") + payload += short_binunicode(module) + payload += short_binunicode(func) + payload += b"\x93" # STACK_GLOBAL + payload += b"\x94" # MEMOIZE index 0 + payload += b"0" # POP + payload += b"h\x00" # BINGET 0 + payload += b"(" # MARK + payload += b"t" # TUPLE + payload += b"R" # REDUCE + payload += b"." + return bytes(payload) + + @staticmethod + def _craft_global_import_only_pickle(module: str, func: str) -> bytes: + """Craft minimal pickle that only imports a GLOBAL and stops.""" + + return b"\x80\x02" + b"c" + f"{module}\n{func}\n".encode() + b"." + def _scan_bytes(self, data: bytes) -> ScanResult: import os import tempfile @@ -659,6 +696,88 @@ def _scan_bytes(self, data: bytes) -> ScanResult: finally: os.unlink(path) + def test_plausible_module_allows_mixed_case_identifiers(self) -> None: + assert _is_plausible_python_module("EvilPkg") + assert _is_plausible_python_module("PIL") + assert _is_plausible_python_module("MyOrg.InternalPkg") + + def test_plausible_module_rejects_malformed_names(self) -> None: + assert not _is_plausible_python_module("foo bar") + assert not _is_plausible_python_module("foo..bar") + assert not _is_plausible_python_module("foo/bar") + assert not _is_plausible_python_module("!!!") + + def test_mixed_case_global_reduce_is_not_suppressed(self) -> None: + result = self._scan_bytes(self._craft_global_reduce_pickle("EvilPkg", "thing")) + + reduce_checks = [c for c in result.checks if c.name == "REDUCE Opcode Safety Check"] + assert any(c.status == CheckStatus.FAILED and "EvilPkg.thing" in c.message for c in reduce_checks), ( + f"Expected failed REDUCE check for mixed-case module, got: {[c.message for c in reduce_checks]}" + ) + assert not any("implausible module name 'EvilPkg'" in c.message for c in reduce_checks), ( + "Mixed-case module names should not be classified as implausible" + ) + + def test_pil_global_reduce_is_not_suppressed(self) -> None: + """Legitimate mixed-case modules like PIL should no longer be treated as implausible.""" + result = self._scan_bytes(self._craft_global_reduce_pickle("PIL", "Image")) + + reduce_checks = [c for c in result.checks if c.name == "REDUCE Opcode Safety Check"] + assert any(c.status == CheckStatus.FAILED and "PIL.Image" in c.message for c in reduce_checks), ( + f"Expected failed REDUCE check for PIL.Image, got: {[c.message for c in reduce_checks]}" + ) + assert not any("implausible module name 'PIL'" in c.message for c in reduce_checks), ( + "PIL should no longer be classified as an implausible module" + ) + + def test_mixed_case_stack_global_reduce_is_not_suppressed(self) -> None: + result = self._scan_bytes(self._craft_stack_global_reduce_pickle("EvilPkg", "thing")) + + reduce_checks = [c for c in result.checks if c.name == "REDUCE Opcode Safety Check"] + assert any(c.status == CheckStatus.FAILED and "EvilPkg.thing" in c.message for c in reduce_checks), ( + "Expected failed REDUCE check for STACK_GLOBAL mixed-case module, " + f"got: {[c.message for c in reduce_checks]}" + ) + + def test_mixed_case_memoized_stack_global_reduce_is_not_suppressed(self) -> None: + result = self._scan_bytes(self._craft_memoized_stack_global_reduce_pickle("EvilPkg", "thing")) + + reduce_checks = [c for c in result.checks if c.name == "REDUCE Opcode Safety Check"] + assert any(c.status == CheckStatus.FAILED and "EvilPkg.thing" in c.message for c in reduce_checks), ( + "Expected failed REDUCE check for memoized mixed-case STACK_GLOBAL, " + f"got: {[c.message for c in reduce_checks]}" + ) + + def test_mixed_case_import_only_payload_still_flags_import(self) -> None: + result = self._scan_bytes(self._craft_global_import_only_pickle("Builtins", "eval")) + + import_issues = [issue for issue in result.issues if "Suspicious reference Builtins.eval" in issue.message] + assert import_issues, ( + "Expected suspicious import-only detection for mixed-case dangerous global, " + f"got: {[i.message for i in result.issues]}" + ) + + def test_mixed_case_reduce_in_later_stream_is_not_suppressed(self) -> None: + import io + + buf = io.BytesIO() + pickle.dump({"safe": True}, buf, protocol=2) + buf.write(self._craft_global_reduce_pickle("EvilPkg", "thing")) + + result = self._scan_bytes(buf.getvalue()) + reduce_checks = [c for c in result.checks if c.name == "REDUCE Opcode Safety Check"] + assert any(c.status == CheckStatus.FAILED and "EvilPkg.thing" in c.message for c in reduce_checks), ( + f"Expected later-stream REDUCE check for mixed-case module, got: {[c.message for c in reduce_checks]}" + ) + + def test_malformed_module_reduce_stays_implausible(self) -> None: + result = self._scan_bytes(self._craft_global_reduce_pickle("foo..bar", "thing")) + + reduce_checks = [c for c in result.checks if c.name == "REDUCE Opcode Safety Check"] + assert any( + c.status == CheckStatus.PASSED and "implausible module name 'foo..bar'" in c.message for c in reduce_checks + ), f"Expected malformed module to remain implausible, got: {[c.message for c in reduce_checks]}" + # ------------------------------------------------------------------ # Fix 1: pkgutil trampoline — must be CRITICAL # ------------------------------------------------------------------ From 22b88aa3650f305d8a7afce6e5279e6cc9af546c Mon Sep 17 00:00:00 2001 From: mldangelo Date: Sun, 15 Mar 2026 09:44:05 -0400 Subject: [PATCH 2/5] fix: restore pickle plausibility filtering Keep mixed-case module names like PIL analyzable without reintroducing uppercase data-label false positives such as PEDRA_2020. Also preserve streaming security exit codes by separating operational errors from security findings. --- modelaudit/core.py | 2 +- modelaudit/models.py | 2 +- modelaudit/scanners/pickle_scanner.py | 26 ++++++++------------- tests/scanners/test_pickle_scanner.py | 16 ++++++++++--- tests/test_cli.py | 19 +++++++++++++++ tests/test_streaming_scan.py | 33 ++++++++++++++++++++++++++- 6 files changed, 75 insertions(+), 23 deletions(-) diff --git a/modelaudit/core.py b/modelaudit/core.py index 32e892b2..2cba862d 100644 --- a/modelaudit/core.py +++ b/modelaudit/core.py @@ -1602,7 +1602,7 @@ def scan_model_streaming( scan_result_dict = { "bytes_scanned": scan_result.bytes_scanned, "files_scanned": 1, # Each scan_result represents one file - "has_errors": scan_result.has_errors, + "has_errors": not scan_result.success, "success": scan_result.success, "issues": [issue.__dict__ for issue in (scan_result.issues or [])], "checks": [check.__dict__ for check in (scan_result.checks or [])], diff --git a/modelaudit/models.py b/modelaudit/models.py index 0077629f..d64a095e 100644 --- a/modelaudit/models.py +++ b/modelaudit/models.py @@ -460,7 +460,7 @@ def aggregate_scan_result_direct(self, scan_result: Any) -> None: self.bytes_scanned += scan_result.bytes_scanned self.files_scanned += 1 # Each ScanResult represents one file scan - if scan_result.has_errors: + if not scan_result.success: self.has_errors = True # Update success status - only set to False for operational errors diff --git a/modelaudit/scanners/pickle_scanner.py b/modelaudit/scanners/pickle_scanner.py index 7572d50b..d3eb8788 100644 --- a/modelaudit/scanners/pickle_scanner.py +++ b/modelaudit/scanners/pickle_scanner.py @@ -1877,12 +1877,13 @@ def _is_plausible_python_module(name: str) -> bool: Check whether *name* looks like a real Python module/package path. Legitimate module names follow Python identifier rules: - - Each dotted segment is a valid Python identifier (letters, digits, - underscores; cannot start with a digit). + - Each dotted segment is an ASCII Python identifier. + - Segments normally contain lowercase characters, with a short explicit + allowlist for case-sensitive imports such as ``PIL``. Keep obviously malformed names rejected so arbitrary data strings are less - likely to be treated as imports, while allowing valid mixed-case segments - such as ``PIL``. + likely to be treated as imports, while still allowing valid mixed-case + segments such as ``EvilPkg`` and ``MyOrg.InternalPkg``. Returns: True if *name* plausibly refers to a real Python module. @@ -1894,12 +1895,12 @@ def _is_plausible_python_module(name: str) -> bool: if " " in name or "\t" in name: return False - # Split on dots; each segment must be a valid Python identifier. + # Split on dots; each segment must be an ASCII Python identifier. segments = name.split(".") - if not segments or any(s == "" for s in segments): + if not segments or any(s == "" or not s.isascii() or not s.isidentifier() for s in segments): return False - return all(seg.isidentifier() for seg in segments) + return all(any(char.islower() for char in seg) or seg in _CASE_SENSITIVE_IMPORT_SEGMENTS for seg in segments) _CASE_SENSITIVE_IMPORT_SEGMENTS = frozenset({"PIL", "Cython"}) @@ -1973,16 +1974,7 @@ def _is_resolved_import_target(mod: str, func: str) -> bool: def _is_plausible_import_only_module(mod: str) -> bool: """Return True when a module path looks importable without matching common data labels.""" - if not mod: - return False - - segments = mod.split(".") - if not segments or any(segment == "" or not segment.isidentifier() for segment in segments): - return False - - return all( - any(char.islower() for char in segment) or segment in _CASE_SENSITIVE_IMPORT_SEGMENTS for segment in segments - ) + return _is_plausible_python_module(mod) def _classify_import_reference( diff --git a/tests/scanners/test_pickle_scanner.py b/tests/scanners/test_pickle_scanner.py index 3b643b61..e5e40c7f 100644 --- a/tests/scanners/test_pickle_scanner.py +++ b/tests/scanners/test_pickle_scanner.py @@ -783,7 +783,7 @@ def _craft_memoized_stack_global_reduce_pickle(module: str, func: str) -> bytes: def _craft_global_import_only_pickle(module: str, func: str) -> bytes: """Craft minimal pickle that only imports a GLOBAL and stops.""" - return b"\x80\x02" + b"c" + f"{module}\n{func}\n".encode() + b"." + return TestPickleScannerBlocklistHardening._craft_global_only_pickle(module, func) @staticmethod def _craft_global_only_pickle(module: str, func: str) -> bytes: @@ -960,6 +960,7 @@ def test_plausible_module_rejects_malformed_names(self) -> None: assert not _is_plausible_python_module("foo..bar") assert not _is_plausible_python_module("foo/bar") assert not _is_plausible_python_module("!!!") + assert not _is_plausible_python_module("PEDRA_2020") def test_mixed_case_global_reduce_is_not_suppressed(self) -> None: result = self._scan_bytes(self._craft_global_reduce_pickle("EvilPkg", "thing")) @@ -977,8 +978,8 @@ def test_pil_global_reduce_is_not_suppressed(self) -> None: result = self._scan_bytes(self._craft_global_reduce_pickle("PIL", "Image")) reduce_checks = [c for c in result.checks if c.name == "REDUCE Opcode Safety Check"] - assert any(c.status == CheckStatus.FAILED and "PIL.Image" in c.message for c in reduce_checks), ( - f"Expected failed REDUCE check for PIL.Image, got: {[c.message for c in reduce_checks]}" + assert any("PIL.Image" in c.message for c in reduce_checks), ( + f"Expected REDUCE analysis to resolve PIL.Image, got: {[c.message for c in reduce_checks]}" ) assert not any("implausible module name 'PIL'" in c.message for c in reduce_checks), ( "PIL should no longer be classified as an implausible module" @@ -1056,6 +1057,15 @@ def test_malformed_module_reduce_stays_implausible(self) -> None: c.status == CheckStatus.PASSED and "implausible module name 'foo..bar'" in c.message for c in reduce_checks ), f"Expected malformed module to remain implausible, got: {[c.message for c in reduce_checks]}" + def test_uppercase_data_label_reduce_stays_implausible(self) -> None: + result = self._scan_bytes(self._craft_global_reduce_pickle("PEDRA_2020", "thing")) + + reduce_checks = [c for c in result.checks if c.name == "REDUCE Opcode Safety Check"] + assert any( + c.status == CheckStatus.PASSED and "implausible module name 'PEDRA_2020'" in c.message + for c in reduce_checks + ), f"Expected uppercase data label to remain implausible, got: {[c.message for c in reduce_checks]}" + @staticmethod def _structural_tamper_checks(result: ScanResult) -> list: return [issue for issue in result.issues if issue.details.get("tamper_type") is not None] diff --git a/tests/test_cli.py b/tests/test_cli.py index 1136f89b..9ef8cc4a 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,6 +1,7 @@ import json import os import re +from pathlib import Path from unittest.mock import patch import pytest @@ -1419,6 +1420,24 @@ def __reduce__(self): ) +def test_exit_code_security_issues_streaming_local_directory(tmp_path): + """Streaming local scans should still exit 1 for real security findings.""" + sample_dir = Path(__file__).resolve().parent / "assets" / "samples" / "pickles" + safe_sample = sample_dir / "safe_data.pkl" + malicious_sample = sample_dir / "malicious_system_call.pkl" + + streamed_dir = tmp_path / "streamed" + streamed_dir.mkdir() + (streamed_dir / safe_sample.name).write_bytes(safe_sample.read_bytes()) + (streamed_dir / malicious_sample.name).write_bytes(malicious_sample.read_bytes()) + + runner = CliRunner() + result = runner.invoke(cli, ["scan", "--stream", str(streamed_dir)]) + + assert result.exit_code == 1, f"Expected exit code 1, got {result.exit_code}. Output: {result.output}" + assert not any(streamed_dir.iterdir()), "Streaming scan should delete files after scanning" + + def test_exit_code_scan_errors(tmp_path): """Test exit code 2 when errors occur during scanning.""" runner = CliRunner() diff --git a/tests/test_streaming_scan.py b/tests/test_streaming_scan.py index afc68a07..0e54cf4e 100644 --- a/tests/test_streaming_scan.py +++ b/tests/test_streaming_scan.py @@ -8,7 +8,7 @@ import pytest from modelaudit.core import scan_model_directory_or_file, scan_model_streaming -from modelaudit.scanners.base import ScanResult +from modelaudit.scanners.base import IssueSeverity, ScanResult from modelaudit.utils.helpers.secure_hasher import compute_aggregate_hash @@ -206,6 +206,37 @@ def file_generator(): assert result.files_scanned == 2 +def test_scan_model_streaming_critical_findings_do_not_set_operational_errors(temp_test_files): + """Security findings should preserve exit-code semantics in streaming mode.""" + + def file_generator(): + for i, file_path in enumerate(temp_test_files): + is_last = i == len(temp_test_files) - 1 + yield (file_path, is_last) + + finding = ScanResult(scanner_name="test_scanner") + finding.bytes_scanned = 128 + finding.add_check( + name="Dangerous Pickle Check", + passed=False, + message="Detected malicious payload", + severity=IssueSeverity.CRITICAL, + location=str(temp_test_files[0]), + ) + finding.finish(success=True) + + with patch("modelaudit.core.scan_file", return_value=finding): + result = scan_model_streaming( + file_generator=file_generator(), + timeout=30, + delete_after_scan=False, + ) + + assert result.success is True + assert result.has_errors is False + assert result.failed_checks >= 1 + + @pytest.mark.slow def test_scan_model_streaming_timeout(): """Test that timeout is respected in streaming mode.""" From 4dccb4acadebf82741b3383ce74366690b45541a Mon Sep 17 00:00:00 2001 From: mldangelo Date: Sun, 15 Mar 2026 11:56:47 -0400 Subject: [PATCH 3/5] test: fix merge fallout in streaming tests --- tests/test_cli.py | 1 + tests/test_streaming_scan.py | 6 +++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/test_cli.py b/tests/test_cli.py index 7926b805..9cbf1ce4 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1419,6 +1419,7 @@ def __reduce__(self): f"Expected 'error', 'warning', or 'critical' in output, but got: {result.output}" ) + def test_exit_code_security_issues_streaming_local_directory(tmp_path: Path) -> None: """Streaming local-directory scans should keep security findings as exit code 1.""" import pickle diff --git a/tests/test_streaming_scan.py b/tests/test_streaming_scan.py index dc9e45c8..8ed45963 100644 --- a/tests/test_streaming_scan.py +++ b/tests/test_streaming_scan.py @@ -119,7 +119,10 @@ def file_generator(): assert result.files_scanned == 3 assert result.content_hash is not None -def test_scan_model_streaming_critical_findings_do_not_set_operational_errors(temp_test_files) -> None: + +def test_scan_model_streaming_critical_findings_do_not_set_operational_errors( + temp_test_files: list[Path], +) -> None: """Security findings in streaming mode should still return the security exit code.""" def file_generator(): @@ -227,6 +230,7 @@ def file_generator(): # Should have scanned 2 files (1st and 3rd) assert result.files_scanned == 2 + @pytest.mark.slow def test_scan_model_streaming_timeout(): """Test that timeout is respected in streaming mode.""" From 729df0d3b69cfc4886cda4fa2db46c80a80913ec Mon Sep 17 00:00:00 2001 From: mldangelo Date: Mon, 16 Mar 2026 03:48:12 -0700 Subject: [PATCH 4/5] test: strengthen mixed-case pickle plausibility regressions --- modelaudit/scanners/pickle_scanner.py | 2 +- tests/scanners/test_pickle_scanner.py | 26 ++++++++++++++++++++++++++ tests/test_cli.py | 1 + tests/test_streaming_scan.py | 15 ++++++++++++++- 4 files changed, 42 insertions(+), 2 deletions(-) diff --git a/modelaudit/scanners/pickle_scanner.py b/modelaudit/scanners/pickle_scanner.py index c4da1dc9..343ee82b 100644 --- a/modelaudit/scanners/pickle_scanner.py +++ b/modelaudit/scanners/pickle_scanner.py @@ -1996,7 +1996,7 @@ def _is_plausible_python_module(name: str) -> bool: return all(any(char.islower() for char in seg) or seg in _CASE_SENSITIVE_IMPORT_SEGMENTS for seg in segments) -_CASE_SENSITIVE_IMPORT_SEGMENTS = frozenset({"PIL", "Cython"}) +_CASE_SENSITIVE_IMPORT_SEGMENTS: frozenset[str] = frozenset({"PIL", "Cython"}) IMPORT_ONLY_ALWAYS_DANGEROUS_GLOBALS = frozenset( { ("dill", "load"), diff --git a/tests/scanners/test_pickle_scanner.py b/tests/scanners/test_pickle_scanner.py index 689d6e27..3acada28 100644 --- a/tests/scanners/test_pickle_scanner.py +++ b/tests/scanners/test_pickle_scanner.py @@ -997,6 +997,9 @@ def test_mixed_case_stack_global_reduce_is_not_suppressed(self) -> None: "Expected failed REDUCE check for STACK_GLOBAL mixed-case module, " f"got: {[c.message for c in reduce_checks]}" ) + assert not any("implausible module name 'EvilPkg'" in c.message for c in reduce_checks), ( + "Mixed-case STACK_GLOBAL paths should not be suppressed as implausible" + ) def test_mixed_case_memoized_stack_global_reduce_is_not_suppressed(self) -> None: result = self._scan_bytes(self._craft_memoized_stack_global_reduce_pickle("EvilPkg", "thing")) @@ -1006,6 +1009,9 @@ def test_mixed_case_memoized_stack_global_reduce_is_not_suppressed(self) -> None "Expected failed REDUCE check for memoized mixed-case STACK_GLOBAL, " f"got: {[c.message for c in reduce_checks]}" ) + assert not any("implausible module name 'EvilPkg'" in c.message for c in reduce_checks), ( + "Memoized mixed-case STACK_GLOBAL paths should not be suppressed as implausible" + ) def test_mixed_case_import_only_payload_still_flags_import(self) -> None: result = self._scan_bytes(self._craft_global_import_only_pickle("Builtins", "eval")) @@ -1016,6 +1022,26 @@ def test_mixed_case_import_only_payload_still_flags_import(self) -> None: f"got: {[i.message for i in result.issues]}" ) + benign_result = self._scan_bytes(self._craft_global_import_only_pickle("EvilPkg", "thing")) + benign_checks = [ + check + for check in benign_result.checks + if check.name == "Global Module Reference Check" + and check.details.get("import_reference") == "EvilPkg.thing" + and check.details.get("import_only") is True + ] + assert benign_checks, f"Expected import-only analysis for EvilPkg.thing: {benign_result.checks}" + assert all(check.severity == IssueSeverity.WARNING for check in benign_checks), ( + f"Mixed-case unknown imports should not be escalated as dangerous: {benign_checks}" + ) + assert all(check.details.get("classification") == "unknown_third_party" for check in benign_checks), ( + f"Expected mixed-case benign counterpart to stay unknown_third_party: {benign_checks}" + ) + assert not any( + check.severity == IssueSeverity.CRITICAL and check.details.get("import_reference") == "EvilPkg.thing" + for check in benign_result.checks + ), f"Unexpected critical mixed-case import finding for EvilPkg.thing: {benign_result.checks}" + def test_mixed_case_unknown_import_only_is_flagged(self) -> None: """Mixed-case unknown import-only refs should now reach the import-only warning path.""" result = self._scan_bytes(self._craft_global_import_only_pickle("EvilPkg", "thing")) diff --git a/tests/test_cli.py b/tests/test_cli.py index 9cbf1ce4..524063dc 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1437,6 +1437,7 @@ def __reduce__(self): result = runner.invoke(cli, ["scan", "--stream", "--format", "text", str(tmp_path)]) assert result.exit_code == 1, f"Expected exit code 1, got {result.exit_code}. Output: {result.output}" + assert "posix.system" in result.output, f"Expected malicious finding in output, got: {result.output}" assert not evil_pickle_path.exists() diff --git a/tests/test_streaming_scan.py b/tests/test_streaming_scan.py index 8ed45963..a39eb782 100644 --- a/tests/test_streaming_scan.py +++ b/tests/test_streaming_scan.py @@ -128,8 +128,17 @@ def test_scan_model_streaming_critical_findings_do_not_set_operational_errors( def file_generator(): yield (temp_test_files[0], True) + finding = ScanResult(scanner_name="test_scanner") + finding.bytes_scanned = 1024 + finding.success = True + finding.add_issue( + "Detected malicious payload", + severity=IssueSeverity.CRITICAL, + location=str(temp_test_files[0]), + ) + with patch("modelaudit.core.scan_file") as mock_scan: - mock_scan.return_value = create_mock_scan_result(with_critical_issue=True) + mock_scan.return_value = finding result = scan_model_streaming( file_generator=file_generator(), @@ -139,7 +148,11 @@ def file_generator(): assert result.files_scanned == 1 assert len(result.issues) == 1 + assert result.issues[0].message == "Detected malicious payload" + assert result.issues[0].severity == IssueSeverity.CRITICAL + assert result.issues[0].location == str(temp_test_files[0]) assert result.has_errors is False + assert result.success is True assert determine_exit_code(result) == 1 From a7efd558f0e47e3a38f1d88496e57d836e4e1087 Mon Sep 17 00:00:00 2001 From: mldangelo Date: Mon, 16 Mar 2026 11:00:36 -0700 Subject: [PATCH 5/5] test: make streaming CLI pickle assertion platform-safe --- tests/test_cli.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_cli.py b/tests/test_cli.py index 524063dc..3430af52 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1425,6 +1425,7 @@ def test_exit_code_security_issues_streaming_local_directory(tmp_path: Path) -> import pickle evil_pickle_path = tmp_path / "malicious.pkl" + expected_global = f"{os.system.__module__}.system" class MaliciousClass: def __reduce__(self): @@ -1437,7 +1438,7 @@ def __reduce__(self): result = runner.invoke(cli, ["scan", "--stream", "--format", "text", str(tmp_path)]) assert result.exit_code == 1, f"Expected exit code 1, got {result.exit_code}. Output: {result.output}" - assert "posix.system" in result.output, f"Expected malicious finding in output, got: {result.output}" + assert expected_global in result.output, f"Expected malicious finding in output, got: {result.output}" assert not evil_pickle_path.exists()