diff --git a/.gitignore b/.gitignore index 950f24e6..63ac6f38 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,5 @@ # Mutant files -e2e_projects/**/mutants +e2e_projects/**/mutants* /mutants tests/data/**/*.py.meta diff --git a/README.rst b/README.rst index c54962cb..4d2eaac6 100644 --- a/README.rst +++ b/README.rst @@ -53,8 +53,7 @@ it will try to figure out where the code to mutate is. You can stop the mutation run at any time and mutmut will restart where you -left off. It will continue where it left off, and re-test functions that were -modified since last run. +left off. To work with the results, use `mutmut browse` where you can see the mutants, retest them when you've updated your tests. @@ -209,6 +208,25 @@ to failing tests. debug=true +Disable setproctitle (macOS) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Mutmut uses ``setproctitle`` to show the current mutant name in the process +list, which is helpful for monitoring long runs. However, ``setproctitle`` +uses CoreFoundation APIs on macOS that are not fork-safe, causing segfaults +in child processes. + +By default, mutmut automatically disables ``setproctitle`` on macOS and +enables it on other platforms. If you need to override this (e.g. to enable it on +macOS at your own risk, or to disable it on other platforms), set ``use_setproctitle``: + +.. code-block:: toml + + # pyproject.toml + [tool.mutmut] + use_setproctitle = false + + Whitelisting ~~~~~~~~~~~~ @@ -226,6 +244,30 @@ whitelist lines are: to continue, but it's slower. +Skipping Entire Functions +~~~~~~~~~~~~~~~~~~~~~~~~~ + +Similarly, you can skip an entire function from mutation using +``# pragma: no mutate function``: + +.. code-block:: python + + def complex_algorithm(): # pragma: no mutate function + # This function won't be mutated at all + return some_complex_calculation() + +Both syntax styles are supported: + +- ``# pragma: no mutate function`` +- ``# pragma: no mutate: function`` + +This is useful for functions that: + +- Have complex side effects that make mutation testing impractical +- Are performance-critical and you want to avoid trampoline overhead +- Are known to cause issues with the mutation testing framework + + Modifying pytest arguments ~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/docker/Dockerfile.test b/docker/Dockerfile.test index dee6efe2..f215fbbe 100644 --- a/docker/Dockerfile.test +++ b/docker/Dockerfile.test @@ -1,14 +1,21 @@ -FROM python:3.10.19-slim-trixie AS base +ARG PYTHON_VERSION=3.10 + +FROM python:${PYTHON_VERSION}-slim-trixie AS base WORKDIR /mutmut COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/ -ENV UV_PROJECT_ENVIRONMENT=/opt/venv +ARG PYTHON_VERSION +ENV UV_PROJECT_ENVIRONMENT=/opt/venv \ + UV_PYTHON_PREFERENCE=only-system \ + UV_PYTHON=${PYTHON_VERSION} -COPY . . +COPY pyproject.toml uv.lock ./ -RUN uv sync --group dev +RUN uv sync --group dev --no-install-project + +COPY . . ENTRYPOINT ["uv", "run", "pytest"] CMD ["--verbose"] diff --git a/e2e_projects/my_lib/src/my_lib/__init__.py b/e2e_projects/my_lib/src/my_lib/__init__.py index 97278b19..1a8132fa 100644 --- a/e2e_projects/my_lib/src/my_lib/__init__.py +++ b/e2e_projects/my_lib/src/my_lib/__init__.py @@ -1,10 +1,15 @@ from collections.abc import Callable +from enum import Enum from functools import cache -from typing import Union +from typing import AsyncGenerator, Union import ctypes import asyncio +def my_decorator(func): # pragma: no mutate: function + return func + + def hello() -> str: return "Hello from my-lib!" @@ -14,6 +19,13 @@ def badly_tested() -> str: def untested() -> str: return "Mutants for this method should survive" +def skip_this_function() -> int: # pragma: no mutate: function + return 1 + 2 * 3 + +def also_skip_this_function() -> str: # pragma: no mutate function + return "should" + " not" + " mutate" + + def make_greeter(name: Union[str, None]) -> Callable[[], str]: def hi(): if name: @@ -88,6 +100,30 @@ def from_coords(coords) -> 'Point': def coords(self): return self.x, self.y + @staticmethod + def skip_static_decorator_pragma(a: int, b: int) -> int: # pragma: no mutate: function + return a + b * 2 + + @classmethod + def skip_class_decorator_pragma(cls, value: int) -> "Point": # pragma: no mutate: function + return cls(value + 1, value * 2) + + def skip_instance_method_pragma(self) -> int: # pragma: no mutate: function + return self.x + self.y * 2 + + @staticmethod # pragma: no mutate: function + def pragma_on_staticmethod_decorator(a: int, b: int) -> int: + return a + b * 2 + + @classmethod # pragma: no mutate: function + def pragma_on_classmethod_decorator(cls, value: int) -> "Point": + return cls(value + 1, value * 2) + + @my_decorator + @classmethod + def skip_multi_decorator(cls, value: int) -> "Point": + return cls(value + 1, value * 2) + def escape_sequences(): return "foo" \ @@ -111,3 +147,59 @@ def func_with_star(a, /, b, *, c, **kwargs): def func_with_arbitrary_args_clone(*args, **kwargs): pass # pragma: no mutate def func_with_arbitrary_args(*args, **kwargs): return len(args) + len(kwargs) + + +class Color(Enum): + RED = 1 + GREEN = 2 + BLUE = 3 + + async def async_get(self) -> int: + await asyncio.sleep(0.001) + return self.value + + @staticmethod + async def async_get_all() -> AsyncGenerator[Color, None]: + """return type hint here is "wrong" (it's technically AsyncGenerator[int, None]) + but using Color in this context allows us to have a forward reference to the Color class + that doesn't require quoting the class name (eg. "Color") in the type hint + that we otherwise would not be able to have in py3.10, and allows us to test that + trampoline templates are resilient to forward references when using the external trampoline + pattern. + """ + for i in (Color.RED, Color.GREEN, Color.BLUE): + await asyncio.sleep(0.001) + yield i + + def is_primary(self) -> bool: + return self in (Color.RED, Color.GREEN, Color.BLUE) + + def darken(self) -> int: + return self.value - 1 + + @staticmethod + def from_name(name: str) -> "Color": + return Color[name.upper()] + + @classmethod + def default(cls) -> "Color": + return cls.RED + + +class SkipThisClass: # pragma: no mutate: class + def method_one(self) -> int: + return 1 + 2 + + def method_two(self) -> str: + return "hello" + " world" + + @staticmethod + def static_method() -> int: + return 3 * 4 + + +class AlsoSkipThisClass: # pragma: no mutate class + VALUE = 10 + 20 + + def compute(self) -> int: + return self.VALUE * 2 diff --git a/e2e_projects/my_lib/tests/test_my_lib.py b/e2e_projects/my_lib/tests/test_my_lib.py index 3febc7af..3e757f72 100644 --- a/e2e_projects/my_lib/tests/test_my_lib.py +++ b/e2e_projects/my_lib/tests/test_my_lib.py @@ -31,6 +31,28 @@ def test_point(): def test_point_from_coords(): assert Point.from_coords((1, 2)).x == 1 + +def test_point_skip_static_decorator_pragma(): + assert Point.skip_static_decorator_pragma(3, 4) == 11 + + +def test_point_skip_class_decorator_pragma(): + p = Point.skip_class_decorator_pragma(5) + assert p.x == 6 + assert p.y == 10 + + +def test_point_skip_instance_method_pragma(): + p = Point(3, 4) + assert p.skip_instance_method_pragma() == 11 + + +def test_point_skip_multi_decorator(): + p = Point.skip_multi_decorator(5) + assert p.x == 6 + assert p.y == 10 + + def test_fibonacci(): assert fibonacci(1) == 1 assert cached_fibonacci(1) == 1 @@ -66,3 +88,73 @@ def test_signature_functions_are_callable(): def test_signature_is_coroutine(): assert asyncio.iscoroutinefunction(async_consumer) + + +# Tests for enum mutation +def test_color_enum_values(): + assert Color.RED.value == 1 + assert Color.GREEN.value == 2 + assert Color.BLUE.value == 3 + + +def test_color_is_primary(): + assert Color.RED.is_primary() is True + + +def test_color_darken(): + assert Color.GREEN.darken() > 0 + + +def test_color_from_name(): + assert Color.from_name("red") == Color.RED + assert Color.from_name("BLUE") == Color.BLUE + + +def test_color_default(): + assert Color.default() == Color.RED + + +def test_skip_this_function(): + assert skip_this_function() == 7 + + +def test_also_skip_this_function(): + assert also_skip_this_function() == "should not mutate" + + +def test_skip_this_class(): + obj = SkipThisClass() + assert obj.method_one() == 3 + assert obj.method_two() == "hello world" + assert SkipThisClass.static_method() == 12 + + +def test_also_skip_this_class(): + obj = AlsoSkipThisClass() + assert obj.VALUE == 30 + assert obj.compute() == 60 + + +def test_pragma_on_staticmethod_decorator(): + assert Point.pragma_on_staticmethod_decorator(3, 4) == 11 + + +def test_pragma_on_classmethod_decorator(): + p = Point.pragma_on_classmethod_decorator(5) + assert p.x == 6 + assert p.y == 10 + + +@pytest.mark.asyncio +async def test_color_async_get(): + assert await Color.RED.async_get() == 1 + assert await Color.GREEN.async_get() == 2 + assert await Color.BLUE.async_get() == 3 + + +@pytest.mark.asyncio +async def test_color_async_get_all(): + results = [] + async for color in Color.async_get_all(): + results.append(color) + assert results == [Color.RED, Color.GREEN, Color.BLUE] diff --git a/scripts/run_tests.sh b/scripts/run_tests.sh index 1bc2086e..e6cade67 100755 --- a/scripts/run_tests.sh +++ b/scripts/run_tests.sh @@ -1,5 +1,70 @@ #!/bin/bash set -e cd "$(dirname "$0")/.." -docker build -t mutmut -f ./docker/Dockerfile.test . -docker run --rm -t -v "$(pwd)":/mutmut mutmut "$@" + +usage() { + echo "Usage: $0 [--py 3.10,3.12,3.14] [--ff] [-- pytest args...]" + echo " --py Comma-separated Python versions to test." + echo " Default: 3.10" + echo " --ff Stop on first failure instead of running all versions." + echo "" + echo " Everything after '--' is forwarded to pytest." + exit 1 +} + +PY_VERSIONS="3.10" +FAIL_FAST=false + +while [[ $# -gt 0 ]]; do + case "$1" in + --py) + PY_VERSIONS="$2" + shift 2 + ;; + --ff) + FAIL_FAST=true + shift + ;; + -h|--help) + usage + ;; + --) + shift + break + ;; + *) + break + ;; + esac +done + +IFS=',' read -r -a VERSIONS <<< "$PY_VERSIONS" +RESULTS=() +EXIT_CODE=0 + +print_results() { + echo "" + echo "=== Results ===" + for RESULT in "${RESULTS[@]}"; do + echo " $RESULT" + done +} + +for VER in "${VERSIONS[@]}"; do + IMAGE_NAME="mutmut-test-${VER}" + docker build -t "$IMAGE_NAME" --build-arg "PYTHON_VERSION=$VER" -f ./docker/Dockerfile.test . + if docker run --rm -t -v "$(pwd)":/mutmut "$IMAGE_NAME" "$@"; then + RESULTS+=("Python $VER: PASSED") + else + EXIT_CODE=1 + RESULTS+=("Python $VER: FAILED") + if [[ "$FAIL_FAST" == true ]]; then + print_results + exit 1 + fi + fi +done + +print_results + +exit $EXIT_CODE diff --git a/src/mutmut/__init__.py b/src/mutmut/__init__.py index f77effc2..d50ba1b6 100644 --- a/src/mutmut/__init__.py +++ b/src/mutmut/__init__.py @@ -1,31 +1,42 @@ from __future__ import annotations import importlib.metadata +import warnings from collections import defaultdict -from typing import TYPE_CHECKING -if TYPE_CHECKING: - from mutmut.__main__ import Config +from mutmut.configuration import Config __version__ = importlib.metadata.version("mutmut") -duration_by_test: defaultdict[str, float] = defaultdict(float) stats_time: float | None = None -config: Config | None = None +duration_by_test: dict[str, float] = defaultdict(float) +tests_by_mangled_function_name: dict[str, set[str]] = defaultdict(set) _stats: set[str] = set() -tests_by_mangled_function_name: defaultdict[str, set[str]] = defaultdict(set) _covered_lines: dict[str, set[int]] | None = None +def __getattr__(name: str) -> object: + match name: + case "config": + warnings.warn( + "mutmut.config is deprecated as of 3.4.1, use mutmut.configuration.Config.get() instead", + FutureWarning, + stacklevel=2, + ) + return Config.get() + case _: + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") + + def _reset_globals() -> None: - global duration_by_test, stats_time, config, _stats, tests_by_mangled_function_name + global duration_by_test, stats_time, _stats, tests_by_mangled_function_name global _covered_lines duration_by_test.clear() stats_time = None - config = None + Config.reset() _stats = set() tests_by_mangled_function_name = defaultdict(set) _covered_lines = None diff --git a/src/mutmut/__main__.py b/src/mutmut/__main__.py index 4076ef90..50fb0a59 100644 --- a/src/mutmut/__main__.py +++ b/src/mutmut/__main__.py @@ -8,8 +8,9 @@ from typing import TYPE_CHECKING from typing import Any -from mutmut.type_checking import TypeCheckingError -from mutmut.type_checking import run_type_checker +from mutmut.utils.file_utils import change_cwd +from mutmut.utils.format_utils import get_mutant_name +from mutmut.utils.format_utils import strip_prefix if platform.system() == "Windows": print( @@ -24,15 +25,12 @@ import json import resource import shutil -import signal import subprocess import warnings from abc import ABC from collections import defaultdict -from configparser import ConfigParser -from configparser import NoOptionError -from configparser import NoSectionError -from contextlib import contextmanager +from collections.abc import Callable +from collections.abc import Sequence from dataclasses import dataclass from dataclasses import field from datetime import datetime @@ -41,7 +39,6 @@ from io import TextIOBase from json import JSONDecodeError from math import ceil -from multiprocessing import Lock from multiprocessing import Pool from multiprocessing import set_start_method from os import makedirs @@ -49,21 +46,24 @@ from os.path import isdir from os.path import isfile from pathlib import Path -from signal import SIGTERM from threading import Thread from time import process_time -from time import sleep +from types import TracebackType import click import libcst as cst from rich.text import Text -from setproctitle import setproctitle import mutmut from mutmut.code_coverage import gather_coverage from mutmut.code_coverage import get_covered_lines_for_file -from mutmut.file_mutation import mutate_file_contents -from mutmut.trampoline_templates import CLASS_NAME_SEPARATOR +from mutmut.configuration import Config +from mutmut.mutation.data import SourceFileMutationData +from mutmut.mutation.file_mutation import filter_mutants_with_type_checker +from mutmut.mutation.file_mutation import mutate_file_contents +from mutmut.mutation.trampoline_templates import CLASS_NAME_SEPARATOR +from mutmut.threading.timeout import register_timeout +from mutmut.utils.safe_setproctitle import safe_setproctitle as setproctitle if TYPE_CHECKING: from coverage import Coverage @@ -113,37 +113,11 @@ exit_code_to_emoji = {exit_code: emoji_by_status[status] for exit_code, status in status_by_exit_code.items()} -def guess_paths_to_mutate() -> list[Path]: - """Guess the path to source code to mutate""" - this_dir = os.getcwd().split(os.sep)[-1] - if isdir("lib"): - return [Path("lib")] - elif isdir("src"): - return [Path("src")] - elif isdir(this_dir): - return [Path(this_dir)] - elif isdir(this_dir.replace("-", "_")): - return [Path(this_dir.replace("-", "_"))] - elif isdir(this_dir.replace(" ", "_")): - return [Path(this_dir.replace(" ", "_"))] - elif isdir(this_dir.replace("-", "")): - return [Path(this_dir.replace("-", ""))] - elif isdir(this_dir.replace(" ", "")): - return [Path(this_dir.replace(" ", ""))] - if isfile(this_dir + ".py"): - return [Path(this_dir + ".py")] - raise FileNotFoundError( - "Could not figure out where the code to mutate is. " - 'Please specify it by adding "paths_to_mutate=code_dir" in setup.cfg to the [mutmut] section.' - ) - - def record_trampoline_hit(name: str) -> None: assert not name.startswith("src."), "Failed trampoline hit. Module name starts with `src.`, which is invalid" - assert mutmut.config is not None - if mutmut.config.max_stack_depth != -1: + if Config.get().max_stack_depth != -1: f = inspect.currentframe() - c = mutmut.config.max_stack_depth + c = Config.get().max_stack_depth while c and f: filename = f.f_code.co_filename if "pytest" in filename or "hammett" in filename or "unittest" in filename: @@ -157,9 +131,8 @@ def record_trampoline_hit(name: str) -> None: mutmut._stats.add(name) -def walk_all_files() -> Iterable[tuple[str, str]]: - assert mutmut.config is not None - for path in mutmut.config.paths_to_mutate: +def walk_all_files() -> Iterator[tuple[str, str]]: + for path in Config.get().paths_to_mutate: if not isdir(path): if isfile(path): yield "", str(path) @@ -169,7 +142,7 @@ def walk_all_files() -> Iterable[tuple[str, str]]: yield root, filename -def walk_source_files() -> Iterable[Path]: +def walk_source_files() -> Iterator[Path]: for root, filename in walk_all_files(): if filename.endswith(".py"): yield Path(root) / filename @@ -253,8 +226,8 @@ def create_file_mutants(path: Path) -> FileMutationResult: print(path) output_path = Path("mutants") / path makedirs(output_path.parent, exist_ok=True) - assert mutmut.config is not None - if mutmut.config.should_ignore_for_mutation(path): + + if Config.get().should_ignore_for_mutation(path): shutil.copy(path, output_path) return FileMutationResult(ignored=True) else: @@ -279,15 +252,13 @@ def setup_source_paths() -> None: def store_lines_covered_by_tests() -> None: - assert mutmut.config is not None - if mutmut.config.mutate_only_covered_lines: + if Config.get().mutate_only_covered_lines: mutmut._covered_lines = gather_coverage(PytestRunner(), list(walk_source_files())) def copy_also_copy_files() -> None: - assert mutmut.config is not None - assert isinstance(mutmut.config.also_copy, list) - for path in mutmut.config.also_copy: + assert isinstance(Config.get().also_copy, list) + for path in Config.get().also_copy: print(" also copying", path) path = Path(path) destination = Path("mutants") / path @@ -351,17 +322,7 @@ def create_mutants_for_file(filename: Path, output_path: Path) -> FileMutationRe return FileMutationResult(warnings=warnings) -def get_mutant_name(relative_source_path: Path, mutant_method_name: str) -> str: - module_name = str(relative_source_path)[: -len(relative_source_path.suffix)].replace(os.sep, ".") - module_name = strip_prefix(module_name, prefix="src.") - - # FYI, we currently use "mutant_name" inconsistently, for both the whole identifier including the path and only the mangled method name - mutant_name = f"{module_name}.{mutant_method_name}" - mutant_name = mutant_name.replace(".__init__.", ".") - return mutant_name - - -def write_all_mutants_to_file(*, out: Any, source: str, filename: str | Path) -> Any: +def write_all_mutants_to_file(*, out: TextIOBase, source: str, filename: Path) -> Sequence[str]: result, mutant_names = mutate_file_contents( str(filename), source, get_covered_lines_for_file(str(filename), mutmut._covered_lines) ) @@ -370,167 +331,10 @@ def write_all_mutants_to_file(*, out: Any, source: str, filename: str | Path) -> return mutant_names -class SourceFileMutationData: - def __init__(self, *, path: Path) -> None: - self.estimated_time_of_tests_by_mutant: dict[str, float] = {} - self.path = path - self.meta_path = Path("mutants") / (str(path) + ".meta") - self.key_by_pid: dict[int, str] = {} - self.exit_code_by_key: dict[str, int | None] = {} - self.durations_by_key: dict[str, float] = {} - self.type_check_error_by_key: dict[str, str] = {} - self.start_time_by_pid: dict[int, datetime] = {} - - def load(self) -> None: - try: - with open(self.meta_path) as f: - meta = json.load(f) - except FileNotFoundError: - return - - self.exit_code_by_key = meta.pop("exit_code_by_key") - self.durations_by_key = meta.pop("durations_by_key") - self.estimated_time_of_tests_by_mutant = meta.pop("estimated_durations_by_key") - self.type_check_error_by_key = meta.pop("type_check_error_by_key") - assert not meta, f"Meta file {self.meta_path} contains unexpected keys: {set(meta.keys())}" - - def register_pid(self, *, pid: int, key: str) -> None: - self.key_by_pid[pid] = key - with START_TIMES_BY_PID_LOCK: - self.start_time_by_pid[pid] = datetime.now() - - def register_result(self, *, pid: int, exit_code: int) -> None: - assert self.key_by_pid[pid] in self.exit_code_by_key - key = self.key_by_pid[pid] - self.exit_code_by_key[key] = exit_code - self.durations_by_key[key] = (datetime.now() - self.start_time_by_pid[pid]).total_seconds() - # TODO: maybe rate limit this? Saving on each result can slow down mutation testing a lot if the test run is fast. - del self.key_by_pid[pid] - with START_TIMES_BY_PID_LOCK: - del self.start_time_by_pid[pid] - self.save() - - def stop_children(self) -> None: - for pid in self.key_by_pid.keys(): - os.kill(pid, SIGTERM) - - def save(self) -> None: - with open(self.meta_path, "w") as f: - json.dump( - dict( - exit_code_by_key=self.exit_code_by_key, - durations_by_key=self.durations_by_key, - type_check_error_by_key=self.type_check_error_by_key, - estimated_durations_by_key=self.estimated_time_of_tests_by_mutant, - ), - f, - indent=4, - ) - - -def filter_mutants_with_type_checker() -> dict[str, FailedTypeCheckMutant]: - assert mutmut.config is not None - with change_cwd(Path("mutants")): - errors = run_type_checker(mutmut.config.type_check_command) - errors_by_path = group_by_path(errors) - - mutants_to_skip: dict[str, FailedTypeCheckMutant] = {} - - for path, errors_of_file in errors_by_path.items(): - with open(path, encoding="utf-8") as file: - source = file.read() - wrapper = cst.MetadataWrapper(cst.parse_module(source)) - visitor = MutatedMethodsCollector(path) - wrapper.visit(visitor) - mutated_methods = visitor.found_mutants - - for error in errors_of_file: - assert error.file_path == visitor.file - mutant = next( - (m for m in mutated_methods if m.line_number_start <= error.line_number <= m.line_number_end), None - ) - if mutant is None: - raise Exception( - f"Could not find mutant for type error {error.file_path}:{error.line_number} ({error.error_description}). " - "Probably, a code mutation influenced types in unexpected locations. " - "If your project normally has no type errors and uses mypy/pyrefly, please file an issue with steps to reproduce on github." - ) - - mutant_name = get_mutant_name(path.relative_to(Path(".").absolute()), mutant.function_name) - - mutants_to_skip[mutant_name] = FailedTypeCheckMutant( - method_location=mutant, - name=mutant_name, - error=error, - ) - - return mutants_to_skip - - -def group_by_path(errors: list[TypeCheckingError]) -> dict[Path, list[TypeCheckingError]]: - grouped: dict[Path, list[TypeCheckingError]] = defaultdict(list) - - for error in errors: - grouped[error.file_path].append(error) - - return grouped - - -@dataclass -class MutatedMethodLocation: - file: Path - function_name: str - line_number_start: int - line_number_end: int - - -@dataclass -class FailedTypeCheckMutant: - method_location: MutatedMethodLocation - name: str - error: TypeCheckingError - - -class MutatedMethodsCollector(cst.CSTVisitor): - METADATA_DEPENDENCIES = (cst.metadata.PositionProvider,) - - def __init__(self, file: Path): - self.file = file - self.found_mutants: list[MutatedMethodLocation] = [] - - def visit_FunctionDef(self, node: cst.FunctionDef) -> bool: - name = node.name.value - if is_mutated_method_name(name): - range = self.get_metadata(cst.metadata.PositionProvider, node) - self.found_mutants.append( - MutatedMethodLocation( - file=self.file, - function_name=name, - line_number_start=range.start.line, - line_number_end=range.end.line, - ) - ) - - # do not continue visting children of this function - # mutated methods are not nested within other methods - return False - - -def is_mutated_method_name(name: str) -> bool: - return name.startswith(("x_", "xǁ")) and "__mutmut" in name - - -def unused(*_: Any) -> None: +def unused(*_: object) -> None: pass -def strip_prefix(s: str, *, prefix: str, strict: bool = False) -> str: - if s.startswith(prefix): - return s[len(prefix) :] - assert strict is False, f"String '{s}' does not start with prefix '{prefix}'" - return s - - class TestRunner(ABC): def run_stats(self, *, tests: Iterable[str]) -> int: raise NotImplementedError() @@ -551,16 +355,6 @@ def list_all_tests(self) -> ListAllTestsResult: raise NotImplementedError() -@contextmanager -def change_cwd(path: str | Path) -> Iterator[None]: - old_cwd = os.path.abspath(os.getcwd()) - os.chdir(path) - try: - yield - finally: - os.chdir(old_cwd) - - def collected_test_names() -> set[str]: return set(mutmut.duration_by_test.keys()) @@ -590,25 +384,23 @@ def new_tests(self) -> set[str]: class PytestRunner(TestRunner): def __init__(self) -> None: - assert mutmut.config is not None - self._pytest_add_cli_args: list[str] = mutmut.config.pytest_add_cli_args - self._pytest_add_cli_args_test_selection: list[str] = mutmut.config.pytest_add_cli_args_test_selection + self._pytest_add_cli_args: list[str] = Config.get().pytest_add_cli_args + self._pytest_add_cli_args_test_selection: list[str] = Config.get().pytest_add_cli_args_test_selection # tests_dir is a special case of a test selection option, # so also use pytest_add_cli_args_test_selection for the implementation - self._pytest_add_cli_args_test_selection += mutmut.config.tests_dir + self._pytest_add_cli_args_test_selection += Config.get().tests_dir # noinspection PyMethodMayBeStatic def execute_pytest(self, params: list[str], **kwargs: Any) -> int: import pytest params = ["--rootdir=.", "--tb=native"] + params + self._pytest_add_cli_args - assert mutmut.config is not None - if mutmut.config.debug: + if Config.get().debug: params = ["-vv"] + params print("python -m pytest ", " ".join([f'"{param}"' for param in params])) exit_code = int(pytest.main(params, **kwargs)) - if mutmut.config.debug: + if Config.get().debug: print(" exit code", exit_code) if exit_code == 4: raise BadTestExecutionCommandsException(params) @@ -670,8 +462,7 @@ def pytest_deselected(self, items: Any) -> None: collector = TestsCollector() - assert mutmut.config is not None - tests_dir = mutmut.config.tests_dir # noqa: F841 + tests_dir = Config.get().tests_dir pytest_args = ["-x", "-q", "--collect-only"] + self._pytest_add_cli_args_test_selection with change_cwd("mutants"): @@ -754,7 +545,7 @@ def orig_function_and_class_names_from_key(mutant_name: str) -> tuple[str, str | spinner = itertools.cycle("⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏") -def status_printer() -> Any: +def status_printer() -> Callable[..., None]: """Manage the printing and in-place updating of a line of characters .. note:: @@ -850,10 +641,14 @@ def run_forced_fail_test(runner: TestRunner) -> None: class CatchOutput: - def __init__(self, callback: Any = lambda s: None, spinner_title: str | None = None) -> None: + def __init__( + self, + callback: Callable[[str], None] = lambda s: None, + spinner_title: str | None = None, + ) -> None: self.strings: list[str] = [] self.spinner_title = spinner_title or "" - if mutmut.config is not None and mutmut.config.debug: + if Config.get().debug: self.spinner_title += "\n" class StdOutRedirect(TextIOBase): @@ -879,8 +674,7 @@ def start(self) -> None: print_status(self.spinner_title) sys.stdout = self.redirect sys.stderr = self.redirect - assert mutmut.config is not None - if mutmut.config.debug: + if Config.get().debug: self.stop() def dump_output(self) -> None: @@ -893,113 +687,17 @@ def __enter__(self) -> CatchOutput: self.start() return self - def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: self.stop() if self.spinner_title: print() -@dataclass -class Config: - also_copy: list[Path] - do_not_mutate: list[str] - max_stack_depth: int - debug: bool - paths_to_mutate: list[Path] - pytest_add_cli_args: list[str] - pytest_add_cli_args_test_selection: list[str] - tests_dir: list[str] - mutate_only_covered_lines: bool - type_check_command: list[str] - - def should_ignore_for_mutation(self, path: Path | str) -> bool: - if not str(path).endswith(".py"): - return True - for p in self.do_not_mutate: - if fnmatch.fnmatch(str(path), p): - return True - return False - - -def config_reader() -> Any: - path = Path("pyproject.toml") - if path.exists(): - if sys.version_info >= (3, 11): - from tomllib import loads - else: - # noinspection PyPackageRequirements - from toml import loads - data = loads(path.read_text("utf-8")) - - try: - config = data["tool"]["mutmut"] - except KeyError: - pass - else: - - def _toml_reader(key: str, default: Any) -> Any: - try: - result = config[key] - except KeyError: - return default - return result - - return _toml_reader - - config_parser = ConfigParser() - config_parser.read("setup.cfg") - - def _cfg_reader(key: str, default: Any) -> Any: - try: - result: Any = config_parser.get("mutmut", key) - except (NoOptionError, NoSectionError): - return default - if isinstance(default, list): - if "\n" in result: - result = [x for x in result.split("\n") if x] - else: - result = [result] - elif isinstance(default, bool): - result = result.lower() in ("1", "t", "true") - elif isinstance(default, int): - result = int(result) - return result - - return _cfg_reader - - -def ensure_config_loaded() -> None: - if mutmut.config is None: - mutmut.config = load_config() - - -def load_config() -> Config: - s = config_reader() - - return Config( - do_not_mutate=s("do_not_mutate", []), - also_copy=[Path(y) for y in s("also_copy", [])] - + [ - Path("tests/"), - Path("test/"), - Path("setup.cfg"), - Path("pyproject.toml"), - Path("pytest.ini"), - Path(".coveragerc"), - Path(".gitignore"), - ] - + list(Path(".").glob("test*.py")), - max_stack_depth=s("max_stack_depth", -1), - debug=s("debug", False), - mutate_only_covered_lines=s("mutate_only_covered_lines", False), - paths_to_mutate=[Path(y) for y in s("paths_to_mutate", [])] or guess_paths_to_mutate(), - tests_dir=s("tests_dir", []), - pytest_add_cli_args=s("pytest_add_cli_args", []), - pytest_add_cli_args_test_selection=s("pytest_add_cli_args_test_selection", []), - type_check_command=s("type_check_command", []), - ) - - @click.group() @click.version_option() def cli() -> None: @@ -1026,8 +724,7 @@ def run_stats_collection(runner: TestRunner, tests: Iterable[str] | None = None) print( "Stopping early, because we could not find any test case for any mutant. It seems that the selected tests do not cover any code that we mutated." ) - assert mutmut.config is not None - if not mutmut.config.debug: + if not Config.get().debug: print("You can set debug=true to see the executed test names in the output above.") else: print("In the last pytest run above, you can see which tests we executed.") @@ -1126,13 +823,12 @@ def save_cicd_stats(source_file_mutation_data_by_path: dict[str, SourceFileMutat # exports CI/CD stats to block pull requests from merging if mutation score is too low, or used in other ways in CI/CD pipelines @cli.command() def export_cicd_stats() -> None: - ensure_config_loaded() - assert mutmut.config is not None + Config.ensure_loaded() source_file_mutation_data_by_path: dict[str, SourceFileMutationData] = {} for path in walk_source_files(): - if mutmut.config.should_ignore_for_mutation(path): + if Config.get().should_ignore_for_mutation(path): continue meta_path = Path("mutants") / (str(path) + ".meta") @@ -1156,12 +852,14 @@ def export_cicd_stats() -> None: def collect_source_file_mutation_data( *, mutant_names: tuple[str, ...] | list[str] -) -> tuple[list[tuple[SourceFileMutationData, str, int | None]], dict[str, SourceFileMutationData]]: - assert mutmut.config is not None +) -> tuple[ + list[tuple[SourceFileMutationData, str, int | None]], + dict[str, SourceFileMutationData], +]: source_file_mutation_data_by_path: dict[str, SourceFileMutationData] = {} for path in walk_source_files(): - if mutmut.config.should_ignore_for_mutation(path): + if Config.get().should_ignore_for_mutation(path): continue assert str(path) not in source_file_mutation_data_by_path m = SourceFileMutationData(path=path) @@ -1194,7 +892,7 @@ def estimated_worst_case_time(mutant_name: str) -> float: @click.argument("mutant_names", required=False, nargs=-1) def print_time_estimates(mutant_names: tuple[str, ...]) -> None: assert isinstance(mutant_names, (tuple, list)), mutant_names - ensure_config_loaded() + Config.ensure_loaded() runner = PytestRunner() runner.prepare_main_test_run() @@ -1214,12 +912,12 @@ def print_time_estimates(mutant_names: tuple[str, ...]) -> None: @cli.command() @click.argument("mutant_name", required=True, nargs=1) -def tests_for_mutant(mutant_name: str) -> None: +def tests_for_mutant(mutant_name: tuple[str, ...]) -> None: if not load_stats(): print("Failed to load stats. Please run mutmut first to collect stats.") exit(1) - tests = tests_for_mutant_names([mutant_name]) + tests = tests_for_mutant_names(mutant_name) for test in sorted(tests): print(test) @@ -1231,34 +929,12 @@ def stop_all_children(mutants: list[tuple[SourceFileMutationData, str, int | Non # used to copy the global mutmut.config to subprocesses set_start_method("fork") -START_TIMES_BY_PID_LOCK = Lock() - - -def timeout_checker(mutants: list[tuple[SourceFileMutationData, str, int | None]]) -> Any: - def inner_timeout_checker() -> None: - while True: - sleep(1) - - now = datetime.now() - for m, mutant_name, result in mutants: - # copy dict inside lock, so it is not modified by another process while we iterate it - with START_TIMES_BY_PID_LOCK: - start_times_by_pid = dict(m.start_time_by_pid) - for pid, start_time in start_times_by_pid.items(): - run_time = now - start_time - if run_time.total_seconds() > (m.estimated_time_of_tests_by_mutant[mutant_name] + 1) * 15: - try: - os.kill(pid, signal.SIGXCPU) - except ProcessLookupError: - pass - - return inner_timeout_checker @cli.command() @click.option("--max-children", type=int) @click.argument("mutant_names", required=False, nargs=-1) -def run(mutant_names: tuple[str, ...], *, max_children: int | None) -> None: +def run(mutant_names: tuple[str, ...] | list[str], *, max_children: int | None) -> None: assert isinstance(mutant_names, (tuple, list)), mutant_names _run(mutant_names, max_children) @@ -1268,8 +944,7 @@ def _run(mutant_names: tuple[str, ...] | list[str], max_children: int | None) -> # TODO: run no-ops once in a while to detect if we get false negatives # TODO: we should be able to get information on which tests killed mutants, which means we can get a list of tests and how many mutants each test kills. Those that kill zero mutants are redundant! os.environ["MUTANT_UNDER_TEST"] = "mutant_generation" - ensure_config_loaded() - assert mutmut.config is not None + Config.ensure_loaded() if max_children is None: max_children = os.cpu_count() or 4 @@ -1288,7 +963,7 @@ def _run(mutant_names: tuple[str, ...] | list[str], max_children: int | None) -> f" done in {round(time.total_seconds() * 1000)}ms ({stats.mutated} files mutated, {stats.ignored} ignored, {stats.unmodified} unmodified)", ) - if mutmut.config.type_check_command: + if Config.get().type_check_command: with CatchOutput(spinner_title="Filtering mutations with type checker"): mutants_caught_by_type_checker = filter_mutants_with_type_checker() else: @@ -1324,8 +999,7 @@ def _run(mutant_names: tuple[str, ...] | list[str], max_children: int | None) -> def read_one_child_exit_status() -> None: pid, wait_status = os.wait() exit_code = os.waitstatus_to_exitcode(wait_status) - assert mutmut.config is not None - if mutmut.config.debug: + if Config.get().debug: print(" worker exit code", exit_code) source_file_mutation_data_by_pid[pid].register_result(pid=pid, exit_code=exit_code) @@ -1335,48 +1009,37 @@ def read_one_child_exit_status() -> None: # Run estimated fast mutants first, calculated as the estimated time for a surviving mutant. mutants = sorted(mutants, key=lambda x: estimated_worst_case_time(x[1])) - - gc.freeze() - start = datetime.now() try: + gc.freeze() print("Running mutation testing") - # Calculate times of tests - for m, mutant_name, result in mutants: + # Now do mutation + for mutation_data, mutant_name, result in mutants: mutant_name = mutant_name.replace("__init__.", "") tests = mutmut.tests_by_mangled_function_name.get(mangled_name_from_mutant_name(mutant_name), set()) estimated_time_of_tests = sum(mutmut.duration_by_test[test_name] for test_name in tests) - m.estimated_time_of_tests_by_mutant[mutant_name] = estimated_time_of_tests - - Thread(target=timeout_checker(mutants), daemon=True).start() - - # Now do mutation - for m, mutant_name, result in mutants: + mutation_data.estimated_time_of_tests_by_mutant[mutant_name] = estimated_time_of_tests print_stats(source_file_mutation_data_by_path) - mutant_name = mutant_name.replace("__init__.", "") - # Rerun mutant if it's explicitly mentioned, but otherwise let the result stand if not mutant_names and result is not None: continue - tests = mutmut.tests_by_mangled_function_name.get(mangled_name_from_mutant_name(mutant_name), set()) - if not tests: - m.exit_code_by_key[mutant_name] = 33 - m.save() + mutation_data.exit_code_by_key[mutant_name] = 33 + mutation_data.save() continue failed_type_check_mutant = mutants_caught_by_type_checker.get(mutant_name) if failed_type_check_mutant: - m.exit_code_by_key[mutant_name] = 37 - m.type_check_error_by_key[mutant_name] = failed_type_check_mutant.error.error_description - m.save() + mutation_data.exit_code_by_key[mutant_name] = 37 + mutation_data.type_check_error_by_key[mutant_name] = failed_type_check_mutant.error.error_description + mutation_data.save() continue pid = os.fork() - if not pid: + if pid == 0: # In the child os.environ["MUTANT_UNDER_TEST"] = mutant_name setproctitle(f"mutmut: {mutant_name}") @@ -1386,21 +1049,22 @@ def read_one_child_exit_status() -> None: if not sorted_tests: os._exit(33) - estimated_time_of_tests = m.estimated_time_of_tests_by_mutant[mutant_name] - cpu_time_limit = ceil((estimated_time_of_tests + 1) * 30 + process_time()) + cpu_time_limit_s = ceil((estimated_time_of_tests + 1) * 30 + process_time()) # signal SIGXCPU after . One second later signal SIGKILL if it is still running - resource.setrlimit(resource.RLIMIT_CPU, (cpu_time_limit, cpu_time_limit + 1)) + resource.setrlimit(resource.RLIMIT_CPU, (cpu_time_limit_s, cpu_time_limit_s + 1)) with CatchOutput(): - test_result = runner.run_tests(mutant_name=mutant_name, tests=sorted_tests) + result = runner.run_tests(mutant_name=mutant_name, tests=sorted_tests) - if test_result != 0: + if result != 0: pass - os._exit(test_result) + os._exit(result) else: # in the parent - source_file_mutation_data_by_pid[pid] = m - m.register_pid(pid=pid, key=mutant_name) + wall_time_limit_s = (estimated_time_of_tests + 1) * 15 + register_timeout(pid=pid, timeout_s=wall_time_limit_s) + source_file_mutation_data_by_pid[pid] = mutation_data + mutation_data.register_pid(pid=pid, key=mutant_name) running_children += 1 if running_children >= max_children: @@ -1418,12 +1082,14 @@ def read_one_child_exit_status() -> None: except KeyboardInterrupt: print("Stopping...") stop_all_children(mutants) + finally: + gc.unfreeze() - t = datetime.now() - start + elapsed_time = datetime.now() - start print_stats(source_file_mutation_data_by_path, force_output=True) print() - print(f"{count_tried / t.total_seconds():.2f} mutations/second") + print(f"{count_tried / elapsed_time.total_seconds():.2f} mutations/second") if mutant_names: print() @@ -1455,7 +1121,7 @@ def tests_for_mutant_names(mutant_names: tuple[str, ...] | list[str]) -> set[str @cli.command() @click.option("--all", default=False) def results(all: bool) -> None: - ensure_config_loaded() + Config.ensure_loaded() for path in walk_source_files(): if not str(path).endswith(".py"): continue @@ -1511,9 +1177,8 @@ def read_mutant_function(module: cst.Module, mutant_name: str) -> cst.FunctionDe def find_mutant(mutant_name: str) -> SourceFileMutationData: - assert mutmut.config is not None for path in walk_source_files(): - if mutmut.config.should_ignore_for_mutation(path): + if Config.get().should_ignore_for_mutation(path): continue m = SourceFileMutationData(path=path) @@ -1524,7 +1189,11 @@ def find_mutant(mutant_name: str) -> SourceFileMutationData: raise FileNotFoundError(f"Could not find mutant {mutant_name}") -def get_diff_for_mutant(mutant_name: str, source: str | None = None, path: Path | None = None) -> str: +def get_diff_for_mutant( + mutant_name: str, + source: str | None = None, + path: Path | str | None = None, +) -> str: if path is None: m = find_mutant(mutant_name) path = m.path @@ -1555,7 +1224,7 @@ def get_diff_for_mutant(mutant_name: str, source: str | None = None, path: Path @cli.command() @click.argument("mutant_name") def show(mutant_name: str) -> None: - ensure_config_loaded() + Config.ensure_loaded() print(get_diff_for_mutant(mutant_name)) return @@ -1564,7 +1233,7 @@ def show(mutant_name: str) -> None: @click.argument("mutant_name") def apply(mutant_name: str) -> None: # try: - ensure_config_loaded() + Config.ensure_loaded() apply_mutant(mutant_name) # except FileNotFoundError as e: # print(e) @@ -1595,7 +1264,7 @@ def apply_mutant(mutant_name: str) -> None: @cli.command() @click.option("--show-killed", is_flag=True, default=False, help="Display mutants killed by tests and type checker.") def browse(show_killed: bool) -> None: - ensure_config_loaded() + Config.ensure_loaded() from rich.syntax import Syntax from textual.app import App @@ -1605,8 +1274,8 @@ def browse(show_killed: bool) -> None: from textual.widgets import Footer from textual.widgets import Static - class ResultBrowser(App): # type: ignore[type-arg] - loading_id: str | None = None + class ResultBrowser(App[None]): + loading_id = None CSS_PATH = "result_browser_layout.tcss" BINDINGS = [ ("q", "quit()", "Quit"), @@ -1623,6 +1292,7 @@ class ResultBrowser(App): # type: ignore[type-arg] cursor_type = "row" source_file_mutation_data_and_stat_by_path: dict[str, tuple[SourceFileMutationData, Stat]] = {} + path_by_name: dict[str, Path] = {} def compose(self) -> Iterable[Any]: with Container(classes="container"): @@ -1649,13 +1319,12 @@ def on_mount(self) -> None: self.populate_files_table() def read_data(self) -> None: - ensure_config_loaded() - assert mutmut.config is not None + Config.ensure_loaded() self.source_file_mutation_data_and_stat_by_path = {} self.path_by_name: dict[str, Path] = {} for p in walk_source_files(): - if mutmut.config.should_ignore_for_mutation(p): + if Config.get().should_ignore_for_mutation(p): continue source_file_mutation_data = SourceFileMutationData(path=p) source_file_mutation_data.load() @@ -1745,7 +1414,7 @@ def on_data_table_row_highlighted(self, event: Any) -> None: diff_view.update("") def load_thread() -> None: - ensure_config_loaded() + Config.ensure_loaded() try: d = get_diff_for_mutant(event.row_key.value, path=path) if event.row_key.value == self.loading_id: @@ -1756,10 +1425,14 @@ def load_thread() -> None: t = Thread(target=load_thread) t.start() - def retest(self, pattern: str) -> None: + def retest(self, pattern: str | None) -> None: + if pattern is None: + return self._run_subprocess_command("run", [pattern]) - def view_tests(self, mutant_name: str) -> None: + def view_tests(self, mutant_name: str | None) -> None: + if mutant_name is None: + return self._run_subprocess_command("tests-for-mutant", [mutant_name]) def _run_subprocess_command(self, command: str, args: list[str]) -> None: @@ -1778,14 +1451,13 @@ def get_mutant_name_from_selection(self) -> str | None: # noinspection PyTypeChecker mutants_table: DataTable[Any] = self.query_one("#mutants") # type: ignore[assignment] if mutants_table.cursor_row is None: - return + return None - return str(mutants_table.get_row_at(mutants_table.cursor_row)[0]) + result: str = mutants_table.get_row_at(mutants_table.cursor_row)[0] + return result def action_retest_mutant(self) -> None: - name = self.get_mutant_name_from_selection() - if name is not None: - self.retest(name) + self.retest(self.get_mutant_name_from_selection()) def action_retest_function(self) -> None: name = self.get_mutant_name_from_selection() @@ -1798,7 +1470,7 @@ def action_retest_module(self) -> None: self.retest(name.rpartition(".")[0] + ".*") def action_apply_mutant(self) -> None: - ensure_config_loaded() + Config.ensure_loaded() # noinspection PyTypeChecker mutants_table: DataTable[Any] = self.query_one("#mutants") # type: ignore[assignment] if mutants_table.cursor_row is None: diff --git a/src/mutmut/code_coverage.py b/src/mutmut/code_coverage.py index 17e43f31..bbf401b7 100644 --- a/src/mutmut/code_coverage.py +++ b/src/mutmut/code_coverage.py @@ -22,11 +22,11 @@ def get_covered_lines_for_file(filename: str, covered_lines: dict[str, set[int]] return None abs_filename = str((Path("mutants") / filename).absolute()) - lines = None + lines: set[int] = set() if abs_filename in covered_lines: - lines = covered_lines[abs_filename] + lines = set(covered_lines[abs_filename]) - return lines or set() + return lines # Gathers coverage for the given source files and @@ -52,11 +52,8 @@ def gather_coverage(runner: TestRunner, source_files: Iterable[Path]) -> dict[st for filename in source_files: abs_filename = str((mutants_path / filename).absolute()) - lines = coverage_data.lines(abs_filename) - if lines is None: - # file was not imported during test run, e.g. because test selection excluded this file - lines = [] - covered_lines[abs_filename] = set(lines) + lines = set(coverage_data.lines(abs_filename) or []) + covered_lines[abs_filename] = lines _unload_modules_not_in(modules) diff --git a/src/mutmut/configuration.py b/src/mutmut/configuration.py new file mode 100644 index 00000000..b8fa224a --- /dev/null +++ b/src/mutmut/configuration.py @@ -0,0 +1,162 @@ +from __future__ import annotations + +import fnmatch +import os +import platform +import sys +from collections.abc import Callable +from configparser import ConfigParser +from configparser import NoOptionError +from configparser import NoSectionError +from dataclasses import dataclass +from os.path import isdir +from os.path import isfile +from pathlib import Path +from typing import Any + + +def _config_reader() -> Callable[[str, Any], Any]: + path = Path("pyproject.toml") + if path.exists(): + if sys.version_info >= (3, 11): + from tomllib import loads + else: + # noinspection PyPackageRequirements + from toml import loads + data = loads(path.read_text("utf-8")) + + try: + config = data["tool"]["mutmut"] + except KeyError: + pass + else: + + def toml_conf(key: str, default: Any) -> Any: + try: + result = config[key] + except KeyError: + return default + return result + + return toml_conf + + config_parser = ConfigParser() + config_parser.read("setup.cfg") + + def setup_cfg_conf(key: str, default: Any) -> Any: + try: + result = config_parser.get("mutmut", key) + except (NoOptionError, NoSectionError): + return default + if isinstance(default, list): + if "\n" in result: + return [x for x in result.split("\n") if x] + else: + return [result] + elif isinstance(default, bool): + return result.lower() in ("1", "t", "true") + elif isinstance(default, int): + return int(result) + return result + + return setup_cfg_conf + + +def _guess_paths_to_mutate() -> list[str]: + """Guess the path to source code to mutate + + :rtype: str + """ + this_dir = os.getcwd().split(os.sep)[-1] + if isdir("lib"): + return ["lib"] + elif isdir("src"): + return ["src"] + elif isdir(this_dir): + return [this_dir] + elif isdir(this_dir.replace("-", "_")): + return [this_dir.replace("-", "_")] + elif isdir(this_dir.replace(" ", "_")): + return [this_dir.replace(" ", "_")] + elif isdir(this_dir.replace("-", "")): + return [this_dir.replace("-", "")] + elif isdir(this_dir.replace(" ", "")): + return [this_dir.replace(" ", "")] + if isfile(this_dir + ".py"): + return [this_dir + ".py"] + raise FileNotFoundError( + "Could not figure out where the code to mutate is. " + 'Please specify it by adding "paths_to_mutate=code_dir" in setup.cfg to the [mutmut] section.' + ) + + +def _load_config() -> Config: + s = _config_reader() + + return Config( + do_not_mutate=s("do_not_mutate", []), + also_copy=[Path(y) for y in s("also_copy", [])] + + [ + Path("tests/"), + Path("test/"), + Path("setup.cfg"), + Path("pyproject.toml"), + ] + + list(Path(".").glob("test*.py")), + max_stack_depth=s("max_stack_depth", -1), + debug=s("debug", False), + mutate_only_covered_lines=s("mutate_only_covered_lines", False), + paths_to_mutate=[Path(y) for y in s("paths_to_mutate", [])] or [Path(p) for p in _guess_paths_to_mutate()], + tests_dir=s("tests_dir", []), + pytest_add_cli_args=s("pytest_add_cli_args", []), + pytest_add_cli_args_test_selection=s("pytest_add_cli_args_test_selection", []), + type_check_command=s("type_check_command", []), + use_setproctitle=s( + "use_setproctitle", not platform.system() == "Darwin" + ), # False on Mac, true otherwise as default (https://github.com/boxed/mutmut/pull/450#issuecomment-4002571055) + ) + + +_config: Config | None = None + + +@dataclass +class Config: + also_copy: list[Path] + do_not_mutate: list[str] + max_stack_depth: int + debug: bool + paths_to_mutate: list[Path] + pytest_add_cli_args: list[str] + pytest_add_cli_args_test_selection: list[str] + tests_dir: list[str] + mutate_only_covered_lines: bool + type_check_command: list[str] + use_setproctitle: bool + + def should_ignore_for_mutation(self, path: Path | str) -> bool: + path_str = str(path) + if not path_str.endswith(".py"): + return True + for p in self.do_not_mutate: + if fnmatch.fnmatch(path_str, p): + return True + return False + + @staticmethod + def ensure_loaded() -> None: + global _config + if _config is None: + _config = _load_config() + + @staticmethod + def get() -> Config: + global _config + Config.ensure_loaded() + assert _config is not None + return _config + + @staticmethod + def reset() -> None: + global _config + _config = None diff --git a/src/mutmut/mutation/__init__.py b/src/mutmut/mutation/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/mutmut/mutation/data.py b/src/mutmut/mutation/data.py new file mode 100644 index 00000000..46a1f51b --- /dev/null +++ b/src/mutmut/mutation/data.py @@ -0,0 +1,61 @@ +import json +import os +import signal +from datetime import datetime +from pathlib import Path + + +class SourceFileMutationData: + def __init__(self, *, path: Path | str) -> None: + self.estimated_time_of_tests_by_mutant: dict[str, float] = {} + self.path = path + self.meta_path = Path("mutants") / (str(path) + ".meta") + self.key_by_pid: dict[int, str] = {} + self.exit_code_by_key: dict[str, int | None] = {} + self.durations_by_key: dict[str, float] = {} + self.start_time_by_pid: dict[int, datetime] = {} + self.type_check_error_by_key: dict[str, str | None] = {} + + def load(self) -> None: + try: + with open(self.meta_path) as f: + meta = json.load(f) + except FileNotFoundError: + return + + self.exit_code_by_key = meta.pop("exit_code_by_key") + self.type_check_error_by_key = meta.pop("type_check_error_by_key", {}) + self.durations_by_key = meta.pop("durations_by_key") + self.estimated_time_of_tests_by_mutant = meta.pop("estimated_durations_by_key") + assert not meta, f"Meta file {self.meta_path} constains unexpected keys: {set(meta.keys())}" + + def register_pid(self, *, pid: int, key: str) -> None: + self.key_by_pid[pid] = key + self.start_time_by_pid[pid] = datetime.now() + + def register_result(self, *, pid: int, exit_code: int) -> None: + assert self.key_by_pid[pid] in self.exit_code_by_key + key = self.key_by_pid[pid] + self.exit_code_by_key[key] = exit_code + self.durations_by_key[key] = (datetime.now() - self.start_time_by_pid[pid]).total_seconds() + # TODO: maybe rate limit this? Saving on each result can slow down mutation testing a lot if the test run is fast. + del self.key_by_pid[pid] + del self.start_time_by_pid[pid] + self.save() + + def stop_children(self) -> None: + for pid in self.key_by_pid.keys(): + os.kill(pid, signal.SIGTERM) + + def save(self) -> None: + with open(self.meta_path, "w") as f: + json.dump( + { + "exit_code_by_key": self.exit_code_by_key, + "type_check_error_by_key": self.type_check_error_by_key, + "durations_by_key": self.durations_by_key, + "estimated_durations_by_key": self.estimated_time_of_tests_by_mutant, + }, + f, + indent=4, + ) diff --git a/src/mutmut/mutation/enum_mutation.py b/src/mutmut/mutation/enum_mutation.py new file mode 100644 index 00000000..3b3c6317 --- /dev/null +++ b/src/mutmut/mutation/enum_mutation.py @@ -0,0 +1,34 @@ +"""Enum class detection and method type classification for mutation handling.""" + +import libcst as cst + +# Known enum base class names from the standard library +ENUM_BASE_CLASSES = frozenset({"Enum", "IntEnum", "Flag", "IntFlag", "StrEnum"}) + + +def is_enum_class(node: cst.ClassDef) -> bool: + """Check if a ClassDef inherits from any known enum base class. + + Works for: + - class Color(Enum): ... + - class Permission(Flag): ... + - class Status(enum.Enum): ... (Attribute access) + + Limitations: + - Cannot detect aliased imports: from enum import Enum as E + - Cannot detect custom enum base classes + """ + for base_arg in node.bases: + base = base_arg.value + + # Case 1: Simple name like `Enum`, `Flag`, `IntEnum` + if isinstance(base, cst.Name): + if base.value in ENUM_BASE_CLASSES: + return True + + # Case 2: Attribute access like `enum.Enum`, `enum.Flag` + elif isinstance(base, cst.Attribute): + if isinstance(base.attr, cst.Name) and base.attr.value in ENUM_BASE_CLASSES: + return True + + return False diff --git a/src/mutmut/file_mutation.py b/src/mutmut/mutation/file_mutation.py similarity index 51% rename from src/mutmut/file_mutation.py rename to src/mutmut/mutation/file_mutation.py index e3217282..252d2470 100644 --- a/src/mutmut/file_mutation.py +++ b/src/mutmut/mutation/file_mutation.py @@ -5,18 +5,31 @@ from collections.abc import Mapping from collections.abc import Sequence from dataclasses import dataclass +from pathlib import Path from typing import Union +from typing import cast import libcst as cst import libcst.matchers as m from libcst.metadata import MetadataWrapper from libcst.metadata import PositionProvider -from mutmut.node_mutation import OPERATORS_TYPE -from mutmut.node_mutation import mutation_operators -from mutmut.trampoline_templates import create_trampoline_lookup -from mutmut.trampoline_templates import mangle_function_name -from mutmut.trampoline_templates import trampoline_impl +from mutmut.configuration import Config +from mutmut.mutation.enum_mutation import is_enum_class +from mutmut.mutation.mutators import OPERATORS_TYPE +from mutmut.mutation.mutators import MethodType +from mutmut.mutation.mutators import get_method_type +from mutmut.mutation.mutators import mutation_operators +from mutmut.mutation.pragma_handling import parse_pragma_lines +from mutmut.mutation.trampoline_templates import build_enum_trampoline +from mutmut.mutation.trampoline_templates import build_mutants_dict_and_name +from mutmut.mutation.trampoline_templates import mangle_function_name +from mutmut.mutation.trampoline_templates import trampoline_impl +from mutmut.type_checking import TypeCheckingError +from mutmut.type_checking import run_type_checker +from mutmut.utils.file_utils import change_cwd +from mutmut.utils.format_utils import get_mutant_name +from mutmut.utils.format_utils import is_mutated_method_name NEVER_MUTATE_FUNCTION_NAMES = {"__getattribute__", "__setattr__", "__new__"} NEVER_MUTATE_FUNCTION_CALLS = {"len", "isinstance"} @@ -26,29 +39,39 @@ class Mutation: original_node: cst.CSTNode mutated_node: cst.CSTNode - contained_by_top_level_function: cst.CSTNode | None + contained_by_top_level_function: cst.FunctionDef | None def mutate_file_contents(filename: str, code: str, covered_lines: set[int] | None = None) -> tuple[str, Sequence[str]]: """Create mutations for `code` and merge them to a single mutated file with trampolines. :return: A tuple of (mutated code, list of mutant function names)""" - module, mutations = create_mutations(code, covered_lines) + module, mutations, ignored_classes, ignored_functions = create_mutations(code, covered_lines) - return combine_mutations_to_source(module, mutations) + mutated_code, mutant_names = combine_mutations_to_source(module, mutations, ignored_classes, ignored_functions) + # TODO: implement function hashing to skip testing unchanged functions -def create_mutations(code: str, covered_lines: set[int] | None = None) -> tuple[cst.Module, list[Mutation]]: - """Parse the code and create mutations.""" - ignored_lines = pragma_no_mutate_lines(code) + return mutated_code, mutant_names + + +def create_mutations( + code: str, covered_lines: set[int] | None = None +) -> tuple[cst.Module, list[Mutation], set[str], set[str]]: + """Parse the code and create mutations. + + :return: A tuple of (module, mutations, ignored_classes, ignored_functions)""" + ignored_lines, ignored_class_lines, ignored_function_lines = parse_pragma_lines(code) module = cst.parse_module(code) metadata_wrapper = MetadataWrapper(module) - visitor = MutationVisitor(mutation_operators, ignored_lines, covered_lines) + visitor = MutationVisitor( + mutation_operators, ignored_lines, covered_lines, ignored_class_lines, ignored_function_lines + ) module = metadata_wrapper.visit(visitor) - return module, visitor.mutations + return module, visitor.mutations, visitor.ignored_classes, visitor.ignored_functions class OuterFunctionProvider(cst.BatchableMetadataProvider[cst.CSTNode | None]): @@ -68,7 +91,7 @@ def bar(): def __init__(self) -> None: super().__init__() - def visit_Module(self, node: cst.Module) -> bool | None: + def visit_Module(self, node: cst.Module) -> bool: for child in node.body: if isinstance(child, cst.FunctionDef): # mark all nodes inside the function to belong to this function @@ -103,11 +126,22 @@ class MutationVisitor(cst.CSTVisitor): METADATA_DEPENDENCIES = (PositionProvider, OuterFunctionProvider) - def __init__(self, operators: OPERATORS_TYPE, ignore_lines: set[int], covered_lines: set[int] | None = None): + def __init__( + self, + operators: OPERATORS_TYPE, + ignore_lines: set[int], + covered_lines: set[int] | None = None, + ignored_class_lines: set[int] | None = None, + ignored_function_lines: set[int] | None = None, + ): self.mutations: list[Mutation] = [] self._operators = operators self._ignored_lines = ignore_lines self._covered_lines = covered_lines + self._ignored_class_lines = ignored_class_lines or set() + self._ignored_function_lines = ignored_function_lines or set() + self.ignored_classes: set[str] = set() + self.ignored_functions: set[str] = set() def on_visit(self, node: cst.CSTNode) -> bool: if self._skip_node_and_children(node): @@ -126,7 +160,7 @@ def _create_mutations(self, node: cst.CSTNode) -> None: mutation = Mutation( original_node=node, mutated_node=mutated_node, - contained_by_top_level_function=self.get_metadata(OuterFunctionProvider, node, None), + contained_by_top_level_function=self.get_metadata(OuterFunctionProvider, node, None), # type: ignore ) self.mutations.append(mutation) @@ -146,6 +180,20 @@ def _should_mutate_node(self, node: cst.CSTNode) -> bool: return True def _skip_node_and_children(self, node: cst.CSTNode) -> bool: + # Check if this is a class with pragma: no mutate class + if isinstance(node, cst.ClassDef): + position = self.get_metadata(PositionProvider, node, None) + if position and position.start.line in self._ignored_class_lines: + self.ignored_classes.add(node.name.value) + return True + + # Check if this is a function with pragma: no mutate function + if isinstance(node, cst.FunctionDef): + position = self.get_metadata(PositionProvider, node, None) + if position and position.start.line in self._ignored_function_lines: + self.ignored_functions.add(node.name.value) + return True + if ( isinstance(node, cst.Call) and isinstance(node.func, cst.Name) @@ -172,7 +220,14 @@ def _skip_node_and_children(self, node: cst.CSTNode) -> bool: # 1) copying them for the trampoline setup can cause side effects (e.g. multiple @app.post("/foo") definitions) # 2) decorators are executed when the function is defined, so we don't want to mutate their arguments and cause exceptions # 3) @property decorators break the trampoline signature assignment (which expects it to be a function) - if isinstance(node, (cst.FunctionDef, cst.ClassDef)) and len(node.decorators): + # Exception: @staticmethod and @classmethod are allowed because they are predictable and it's easy to set up trampolines for them + if isinstance(node, cst.FunctionDef) and len(node.decorators): + if len(node.decorators) == 1: + decorator = node.decorators[0].decorator + if isinstance(decorator, cst.Name) and decorator.value in ("staticmethod", "classmethod"): + return False + return True + if isinstance(node, cst.ClassDef) and len(node.decorators): return True return False @@ -185,12 +240,21 @@ def _skip_node_and_children(self, node: cst.CSTNode) -> bool: trampoline_impl_cst[-1] = trampoline_impl_cst[-1].with_changes(leading_lines=[cst.EmptyLine(), cst.EmptyLine()]) -def combine_mutations_to_source(module: cst.Module, mutations: Sequence[Mutation]) -> tuple[str, Sequence[str]]: +def combine_mutations_to_source( + module: cst.Module, + mutations: Sequence[Mutation], + ignored_classes: set[str] | None = None, + ignored_functions: set[str] | None = None, +) -> tuple[str, Sequence[str]]: """Create mutated functions and trampolines for all mutations and compile them to a single source code. :param module: The original parsed module :param mutations: Mutations that should be applied. + :param ignored_classes: Class names to skip transformation for (e.g., enums with pragma: no mutate class) + :param ignored_functions: Function names to skip transformation for (pragma: no mutate function) :return: Mutated code and list of mutation names""" + ignored_classes = ignored_classes or set() + ignored_functions = ignored_functions or set() # copy start of the module (in particular __future__ imports) result: list[MODULE_STATEMENT] = get_statements_until_func_or_class(module.body) @@ -222,19 +286,38 @@ def combine_mutations_to_source(module: cst.Module, mutations: Sequence[Mutation if not isinstance(cls.body, cst.IndentedBlock): # we don't mutate single-line classes, e.g. `class A: a = 1; b = 2` result.append(cls) + elif is_enum_class(cls): + external_nodes, modified_cls, enum_mutant_names = enum_trampoline_arrangement( + cls, mutations_within_function + ) + result.extend(external_nodes) + result.append(modified_cls) + mutation_names.extend(enum_mutant_names) else: + external_nodes_for_class: list[MODULE_STATEMENT] = [] mutated_body = [] for method in cls.body.body: method_mutants = mutations_within_function.get(method) if not isinstance(method, cst.FunctionDef) or not method_mutants: mutated_body.append(method) continue - nodes, mutant_names = function_trampoline_arrangement( - method, method_mutants, class_name=cls.name.value - ) - mutated_body.extend(nodes) - mutation_names.extend(mutant_names) + method_type = get_method_type(method) + if method_type in (MethodType.STATICMETHOD, MethodType.CLASSMETHOD): + ext_nodes, assignment, method_mutant_names = _external_method_injection( + method, method_mutants, cls.name.value, method_type + ) + external_nodes_for_class.extend(ext_nodes) + mutated_body.append(assignment) + mutation_names.extend(method_mutant_names) + else: + nodes, mutant_names = function_trampoline_arrangement( + method, method_mutants, class_name=cls.name.value + ) + mutated_body.extend(nodes) + mutation_names.extend(mutant_names) + + result.extend(external_nodes_for_class) result.append(cls.with_changes(body=cls.body.with_changes(body=mutated_body))) else: result.append(statement) @@ -243,6 +326,59 @@ def combine_mutations_to_source(module: cst.Module, mutations: Sequence[Mutation return mutated_module.code, mutation_names +def _external_method_injection( + method: cst.FunctionDef, mutants: Sequence[Mutation], class_name: str, method_type: MethodType +) -> tuple[Sequence[MODULE_STATEMENT], cst.SimpleStatementLine, Sequence[str]]: + """Create external trampoline for a method using external injection pattern. + + This moves mutation code outside the class and uses a simple assignment + inside the class body. Works for staticmethod, classmethod, and instance methods. + + :param method: The method to create external trampoline for + :param mutants: The mutations for this method + :param class_name: The containing class name + :param method_type: MethodType.STATICMETHOD, MethodType.CLASSMETHOD, or MethodType.INSTANCE + :return: A tuple of (external_nodes, class_body_assignment, mutant_names) + """ + external_nodes: list[MODULE_STATEMENT] = [] + mutant_names: list[str] = [] + method_name = method.name.value + prefix = f"_{class_name}_{method_name}" + mangled_name = mangle_function_name(name=method_name, class_name=class_name) + "__mutmut" + + stringify = StringifyAnnotations() + + orig_func = method.with_changes(name=cst.Name(f"{prefix}_orig"), decorators=[]) + orig_func = cast(cst.FunctionDef, orig_func.visit(stringify)) + external_nodes.append(orig_func) + + for i, mutant in enumerate(mutants): + mutant_func_name = f"{prefix}_mutant_{i + 1}" + full_mutant_name = f"{mangled_name}_{i + 1}" + mutant_names.append(full_mutant_name) + + mutated = method.with_changes(name=cst.Name(mutant_func_name), decorators=[]) + mutated = cast(cst.FunctionDef, deep_replace(mutated, mutant.original_node, mutant.mutated_node)) + mutated = cast(cst.FunctionDef, mutated.visit(stringify)) + external_nodes.append(mutated) + trampoline_code = build_enum_trampoline( + class_name=class_name, method_name=method_name, mutant_names=mutant_names, method_type=method_type + ) + trampoline_nodes = list(cst.parse_module(trampoline_code).body) + external_nodes.extend(trampoline_nodes) + + if method_type == MethodType.STATICMETHOD: + assignment_code = f"{method_name} = staticmethod({prefix}_trampoline)" + elif method_type == MethodType.CLASSMETHOD: + assignment_code = f"{method_name} = classmethod({prefix}_trampoline)" + else: + assignment_code = f"{method_name} = {prefix}_trampoline" + + assignment = cast(cst.SimpleStatementLine, cst.parse_statement(assignment_code)) + + return external_nodes, assignment, mutant_names + + def function_trampoline_arrangement( function: cst.FunctionDef, mutants: Iterable[Mutation], class_name: str | None ) -> tuple[Sequence[MODULE_STATEMENT], Sequence[str]]: @@ -266,16 +402,19 @@ def function_trampoline_arrangement( for i, mutant in enumerate(mutants): mutant_name = f"{mangled_name}_{i + 1}" mutant_names.append(mutant_name) - mutated_method_base = function.with_changes(name=cst.Name(mutant_name)) - mutated_method_result = deep_replace(mutated_method_base, mutant.original_node, mutant.mutated_node) - nodes.append(mutated_method_result) # type: ignore[arg-type] - - mutants_dict = list( - cst.parse_module(create_trampoline_lookup(orig_name=name, mutants=mutant_names, class_name=class_name)).body + mutated_method = function.with_changes(name=cst.Name(mutant_name)) + mutated_method = cast(cst.FunctionDef, deep_replace(mutated_method, mutant.original_node, mutant.mutated_node)) + nodes.append(mutated_method) + + # trampoline that forwards the calls + mutants_dict_code = build_mutants_dict_and_name( + orig_name=name, + class_name=class_name, + mutants=mutant_names, ) - mutants_dict[0] = mutants_dict[0].with_changes(leading_lines=[cst.EmptyLine()]) - - nodes.extend(mutants_dict) + mutants_dict_nodes = list(cst.parse_module(mutants_dict_code).body) + mutants_dict_nodes[0] = mutants_dict_nodes[0].with_changes(leading_lines=[cst.EmptyLine()]) + nodes.extend(mutants_dict_nodes) return nodes, mutant_names @@ -324,13 +463,13 @@ def _get_local_name(func_name: str) -> cst.BaseExpression: ], ) # for non-async functions, simply return the value or generator - result_statement = cst.SimpleStatementLine([cst.Return(result)]) + result_statement: cst.BaseStatement = cst.SimpleStatementLine([cst.Return(result)]) if function.asynchronous: is_generator = _is_generator(function) if is_generator: # async for i in _mutmut_trampoline(...): yield i - result_statement = cst.For( # type: ignore[assignment] + result_statement = cst.For( target=cst.Name("i"), iter=result, body=cst.IndentedBlock([cst.SimpleStatementLine([cst.Expr(cst.Yield(cst.Name("i")))])]), @@ -354,6 +493,53 @@ def _get_local_name(func_name: str) -> cst.BaseExpression: ) +def enum_trampoline_arrangement( + cls: cst.ClassDef, mutations_by_method: Mapping[cst.CSTNode, Sequence[Mutation]] +) -> tuple[Sequence[MODULE_STATEMENT], cst.ClassDef, Sequence[str]]: + """Create external functions and minimal enum class for enum mutation. + + This pattern moves all mutation-related code OUTSIDE the enum class body, + avoiding the enum metaclass conflict that occurs when class-level attributes + are added. The enum class only contains simple method assignments. + + :param cls: The enum class definition + :param mutations_by_method: Mapping of method nodes to their mutations + :return: A tuple of (external_nodes, modified_class, mutant_names) + """ + external_nodes: list[MODULE_STATEMENT] = [] + mutant_names: list[str] = [] + new_body: list[cst.BaseStatement | cst.BaseSmallStatement] = [] + class_name = cls.name.value + + for item in cls.body.body: + if not isinstance(item, cst.FunctionDef): + new_body.append(item) + continue + + method = item + method_mutants = mutations_by_method.get(method) + + if not method_mutants: + new_body.append(method) + continue + + method_type = get_method_type(method) + if method_type is None: + new_body.append(method) + continue + + ext_nodes, assignment, method_mutant_names = _external_method_injection( + method, method_mutants, class_name, method_type + ) + external_nodes.extend(ext_nodes) + new_body.append(assignment) + mutant_names.extend(method_mutant_names) + + modified_cls = cls.with_changes(body=cls.body.with_changes(body=new_body)) + + return external_nodes, modified_cls, mutant_names + + def get_statements_until_func_or_class(statements: Sequence[MODULE_STATEMENT]) -> list[MODULE_STATEMENT]: """Get all statements until we encounter the first function or class definition""" result: list[MODULE_STATEMENT] = [] @@ -383,9 +569,11 @@ def pragma_no_mutate_lines(source: str) -> set[int]: } -def deep_replace(tree: cst.CSTNode, old_node: cst.CSTNode, new_node: cst.CSTNode) -> cst.CSTNode: +def deep_replace( + tree: cst.CSTNode, old_node: cst.CSTNode, new_node: cst.CSTNode +) -> cst.CSTNode | cst.RemovalSentinel | cst.FlattenSentinel[cst.CSTNode]: """Like the CSTNode.deep_replace method, except that we only replace up to one occurrence of old_node.""" - return tree.visit(ChildReplacementTransformer(old_node, new_node)) # type: ignore[return-value] + return tree.visit(ChildReplacementTransformer(old_node, new_node)) class ChildReplacementTransformer(cst.CSTTransformer): @@ -407,6 +595,26 @@ def on_leave(self, original_node: cst.CSTNode, updated_node: cst.CSTNode) -> cst return updated_node +class StringifyAnnotations(cst.CSTTransformer): + """Convert type annotations to string literals to avoid NameError from forward references. + + When methods are extracted from a class body and placed before the class definition, + annotations referencing the class (e.g., -> AsyncGenerator[Clazz, None]) would fail. + This transformer turns them into string literals (e.g., -> "AsyncGenerator[Clazz, None]"). + + This allows for mutations to be placed anywhere in the file, improving the resilience of + the mutation process without breaking type checking. + """ + + _empty_module = cst.parse_module("") + + def leave_Annotation(self, original_node: cst.Annotation, updated_node: cst.Annotation) -> cst.Annotation: + if isinstance(updated_node.annotation, (cst.SimpleString, cst.ConcatenatedString, cst.FormattedString)): + return updated_node + source = self._empty_module.code_for_node(updated_node.annotation) + return updated_node.with_changes(annotation=cst.SimpleString(f'"{source}"')) + + def _is_generator(function: cst.FunctionDef) -> bool: """Return True if the function has yield statement(s).""" visitor = IsGeneratorVisitor(function) @@ -423,7 +631,6 @@ def __init__(self, original_function: cst.FunctionDef): self.original_function: cst.FunctionDef = original_function def visit_FunctionDef(self, node: cst.FunctionDef) -> bool | None: - # do not recurse into inner function definitions if self.original_function != node: return False return None @@ -431,3 +638,90 @@ def visit_FunctionDef(self, node: cst.FunctionDef) -> bool | None: def visit_Yield(self, node: cst.Yield) -> bool: self.is_generator = True return False + + +@dataclass +class MutatedMethodLocation: + file: Path + function_name: str + line_number_start: int + line_number_end: int + + +@dataclass +class FailedTypeCheckMutant: + method_location: MutatedMethodLocation + name: str + error: TypeCheckingError + + +class MutatedMethodsCollector(cst.CSTVisitor): + METADATA_DEPENDENCIES = (cst.metadata.PositionProvider,) + + def __init__(self, file: Path): + self.file = file + self.found_mutants: list[MutatedMethodLocation] = [] + + def visit_FunctionDef(self, node: cst.FunctionDef) -> bool: + name = node.name.value + if is_mutated_method_name(name): + range = self.get_metadata(cst.metadata.PositionProvider, node) + self.found_mutants.append( + MutatedMethodLocation( + file=self.file, + function_name=name, + line_number_start=range.start.line, + line_number_end=range.end.line, + ) + ) + + # do not continue visting children of this function + # mutated methods are not nested within other methods + return False + + +def group_by_path(errors: list[TypeCheckingError]) -> dict[Path, list[TypeCheckingError]]: + grouped: dict[Path, list[TypeCheckingError]] = defaultdict(list) + + for error in errors: + grouped[error.file_path].append(error) + + return grouped + + +def filter_mutants_with_type_checker() -> dict[str, FailedTypeCheckMutant]: + with change_cwd(Path("mutants")): + errors = run_type_checker(Config.get().type_check_command) + errors_by_path = group_by_path(errors) + + mutants_to_skip: dict[str, FailedTypeCheckMutant] = {} + + for path, errors_of_file in errors_by_path.items(): + with open(path, encoding="utf-8") as file: + source = file.read() + wrapper = cst.MetadataWrapper(cst.parse_module(source)) + visitor = MutatedMethodsCollector(path) + wrapper.visit(visitor) + mutated_methods = visitor.found_mutants + + for error in errors_of_file: + assert error.file_path == visitor.file + mutant = next( + (m for m in mutated_methods if m.line_number_start <= error.line_number <= m.line_number_end), None + ) + if mutant is None: + raise Exception( + f"Could not find mutant for type error {error.file_path}:{error.line_number} ({error.error_description}). " + "Probably, a code mutation influenced types in unexpected locations. " + "If your project normally has no type errors and uses mypy/pyrefly, please file an issue with steps to reproduce on github." + ) + + mutant_name = get_mutant_name(path.relative_to(Path(".").absolute()), mutant.function_name) + + mutants_to_skip[mutant_name] = FailedTypeCheckMutant( + method_location=mutant, + name=mutant_name, + error=error, + ) + + return mutants_to_skip diff --git a/src/mutmut/node_mutation.py b/src/mutmut/mutation/mutators.py similarity index 88% rename from src/mutmut/node_mutation.py rename to src/mutmut/mutation/mutators.py index bf4b1f1d..0429976f 100644 --- a/src/mutmut/node_mutation.py +++ b/src/mutmut/mutation/mutators.py @@ -4,6 +4,7 @@ from collections.abc import Callable from collections.abc import Iterable from collections.abc import Sequence +from enum import Enum from typing import Any from typing import cast @@ -12,7 +13,7 @@ OPERATORS_TYPE = Sequence[ tuple[ - type[cst.CSTNode], + type, Callable[[Any], Iterable[cst.CSTNode]], ] ] @@ -21,6 +22,41 @@ NON_ESCAPE_SEQUENCE = re.compile(r"((? MethodType | None: + """Determine the method type based on decorators. + + Returns: + MethodType.STATICMETHOD - for @staticmethod + MethodType.CLASSMETHOD - for @classmethod + MethodType.INSTANCE - for no decorators (regular instance method) + None - for other/multiple decorators (should be skipped) + """ + if not method.decorators: + return MethodType.INSTANCE + + if len(method.decorators) != 1: + # Multiple decorators - skip + return None + + decorator = method.decorators[0].decorator + if isinstance(decorator, cst.Name): + if decorator.value == "staticmethod": + return MethodType.STATICMETHOD + elif decorator.value == "classmethod": + return MethodType.CLASSMETHOD + + # Other decorator - skip + return None + + def operator_number(node: cst.BaseNumber) -> Iterable[cst.BaseNumber]: if isinstance(node, cst.Integer | cst.Float): yield node.with_changes(value=repr(node.evaluated_value + 1)) @@ -233,11 +269,10 @@ def operator_assignment( if not node.value: # do not mutate `a: sometype` to an assignment `a: sometype = ""` return - mutated_value: cst.BaseExpression if m.matches(node.value, m.Name("None")): mutated_value = cst.SimpleString('""') else: - mutated_value = cst.Name("None") + mutated_value = cst.Name("None") # type: ignore[assignment] yield node.with_changes(value=mutated_value) @@ -251,8 +286,8 @@ def operator_match(node: cst.Match) -> Iterable[cst.CSTNode]: # Operators that should be called on specific node types mutation_operators: OPERATORS_TYPE = [ - (cst.BaseNumber, operator_number), # type: ignore[type-abstract] - (cst.BaseString, operator_string), # type: ignore[type-abstract] + (cst.BaseNumber, operator_number), + (cst.BaseString, operator_string), (cst.Name, operator_name), (cst.Assign, operator_assignment), (cst.AnnAssign, operator_assignment), @@ -263,8 +298,8 @@ def operator_match(node: cst.Match) -> Iterable[cst.CSTNode]: (cst.Call, operator_symmetric_string_methods_swap), (cst.Call, operator_unsymmetrical_string_methods_swap), (cst.Lambda, operator_lambda), - (cst.CSTNode, operator_keywords), # type: ignore[type-abstract] - (cst.CSTNode, operator_swap_op), # type: ignore[type-abstract] + (cst.CSTNode, operator_keywords), + (cst.CSTNode, operator_swap_op), (cst.Match, operator_match), ] @@ -279,3 +314,5 @@ def _simple_mutation_mapping( # TODO: detect regexes and mutate them in nasty ways? Maybe mutate all strings as if they are regexes + +# TODO: implement removal of inner decorators diff --git a/src/mutmut/mutation/pragma_handling.py b/src/mutmut/mutation/pragma_handling.py new file mode 100644 index 00000000..76d7137d --- /dev/null +++ b/src/mutmut/mutation/pragma_handling.py @@ -0,0 +1,41 @@ +"""Pragma comment parsing for mutation control.""" + + +def parse_pragma_lines(source: str) -> tuple[set[int], set[int], set[int]]: + """Parse all pragma: no mutate variants. + + Each set is mutually exclusive. + + Supported pragmas: + - ``# pragma: no mutate`` - skip this line only + - ``# pragma: no mutate class`` - skip entire class + - ``# pragma: no mutate: class`` - skip entire class (alternative syntax) + - ``# pragma: no mutate function`` - skip entire function + - ``# pragma: no mutate: function`` - skip entire function (alternative syntax) + + :return: A tuple of (no_mutate_lines, class_lines, function_lines) + """ + no_mutate_lines: set[int] = set() + class_lines: set[int] = set() + function_lines: set[int] = set() + + for i, line in enumerate(source.split("\n")): + if "# pragma:" not in line: + continue + + pragma_content = line.partition("# pragma:")[-1] + line_num = i + 1 + + if "no mutate" not in pragma_content: + continue + + # Check for specific variants first (more specific matches) + if "no mutate class" in pragma_content or "no mutate: class" in pragma_content: + class_lines.add(line_num) + elif "no mutate function" in pragma_content or "no mutate: function" in pragma_content: + function_lines.add(line_num) + else: + # Generic "no mutate" (not class or function) + no_mutate_lines.add(line_num) + + return no_mutate_lines, class_lines, function_lines diff --git a/src/mutmut/mutation/trampoline_templates.py b/src/mutmut/mutation/trampoline_templates.py new file mode 100644 index 00000000..1ed17139 --- /dev/null +++ b/src/mutmut/mutation/trampoline_templates.py @@ -0,0 +1,159 @@ +from mutmut.mutation.mutators import MethodType + +CLASS_NAME_SEPARATOR = "ǁ" + +GENERATED_MARKER = "# type: ignore # mutmut generated" + + +def _mark_generated(code: str) -> str: + """Append the generated marker comment to every code line in a block.""" + lines = [] + for line in code.splitlines(): + stripped = line.strip() + if stripped and not stripped.startswith("#"): + line = f"{line} {GENERATED_MARKER}" + lines.append(line) + return "\n".join(lines) + + +def mangle_function_name(*, name: str, class_name: str | None) -> str: + assert CLASS_NAME_SEPARATOR not in name + if class_name: + assert CLASS_NAME_SEPARATOR not in class_name + prefix = f"x{CLASS_NAME_SEPARATOR}{class_name}{CLASS_NAME_SEPARATOR}" + else: + prefix = "x_" + return f"{prefix}{name}" + + +def build_mutants_dict_and_name( + *, + orig_name: str, + mutants: list[str], + class_name: str | None, +) -> str: + """Generate the mutants dictionary and __name__ fix for a function trampoline. + + :param mutants: List of mutant function names (mangled) + :param class_name: The containing class name, or None for top-level functions + :param orig_name: The original function name + :return: String containing the mutants dict and __name__ assignment + """ + mangled_name = mangle_function_name(name=orig_name, class_name=class_name) + + type_annotation = "ClassVar[MutantDict]" if class_name is not None else "MutantDict" + mutants_dict_entries = ",\n".join(f" {repr(m)}: {m}" for m in mutants) + mutants_dict = f"{mangled_name}__mutmut_mutants : {type_annotation} = {{\n{mutants_dict_entries}\n}}" + + return _mark_generated(f""" +{mutants_dict} + +{mangled_name}__mutmut_orig.__name__ = '{mangled_name}' +""") + + +def build_enum_trampoline( + *, class_name: str, method_name: str, mutant_names: list[str], method_type: MethodType +) -> str: + """Generate external trampoline code for enum methods. + + This pattern moves all mutation-related code OUTSIDE the enum class body, + avoiding the enum metaclass conflict. The enum class only contains a simple + assignment like `method_name = _ClassName_method_trampoline`. + + :param class_name: The enum class name + :param method_name: The method being mutated + :param mutant_names: List of mutant function names (mangled) + :param method_type: 'instance', 'static', or 'classmethod' + :return: String containing the external functions and mutants dict + """ + prefix = f"_{class_name}_{method_name}" + mangled_name = mangle_function_name(name=method_name, class_name=class_name) + + # Build mutants dict + mutants_dict_entries = ",\n".join(f" {repr(m)}: {prefix}_mutant_{i + 1}" for i, m in enumerate(mutant_names)) + mutants_dict = f"{prefix}_mutants: MutantDict = {{\n{mutants_dict_entries}\n}}" + + orig_name_fix = f"{prefix}_orig.__name__ = '{mangled_name}'" + + # Build trampoline based on method type + if method_type == MethodType.STATICMETHOD: + trampoline = f""" + +def {prefix}_trampoline(*args, **kwargs): + return _mutmut_trampoline({prefix}_orig, {prefix}_mutants, args, kwargs) + +{prefix}_trampoline.__name__ = '{method_name}' +""" + elif method_type == MethodType.CLASSMETHOD: + trampoline = f""" + +def {prefix}_trampoline(cls, *args, **kwargs): + return _mutmut_trampoline({prefix}_orig, {prefix}_mutants, args, kwargs, cls) + +{prefix}_trampoline.__name__ = '{method_name}' +""" + else: # instance method + trampoline = f""" + +def {prefix}_trampoline(self, *args, **kwargs): + return _mutmut_trampoline({prefix}_orig, {prefix}_mutants, args, kwargs, self) + +{prefix}_trampoline.__name__ = '{method_name}' +""" + + return _mark_generated(f"\n\n{orig_name_fix}\n\n{mutants_dict}\n\n{trampoline}") + + +# noinspection PyUnresolvedReferences +# language=python +trampoline_impl = _mark_generated(""" +from collections.abc import Sequence +from typing import Annotated +from typing import Callable +from typing import ClassVar +from typing import TypeVar + +TReturn = TypeVar('TReturn') +MutantDict = Annotated[dict[str, Callable[..., TReturn]], "Mutant"] + + +def _mutmut_trampoline(orig: Callable[..., TReturn], mutants: MutantDict, call_args: Sequence, call_kwargs: dict, self_arg = None) -> TReturn: + \"""Forward call to original or mutated function, depending on the environment\""" + import os + mutant_under_test = os.environ.get('MUTANT_UNDER_TEST', '') + if not mutant_under_test: + # No mutant being tested - call original function + if self_arg is not None and not hasattr(orig, '__self__'): + return orig(self_arg, *call_args, **call_kwargs) + else: + return orig(*call_args, **call_kwargs) + if mutant_under_test == 'fail': + from mutmut.__main__ import MutmutProgrammaticFailException + raise MutmutProgrammaticFailException('Failed programmatically') + elif mutant_under_test == 'stats': + from mutmut.__main__ import record_trampoline_hit + record_trampoline_hit(orig.__module__ + '.' + orig.__name__) + # Check if orig is a bound method (has __self__) or plain function + if self_arg is not None and not hasattr(orig, '__self__'): + result = orig(self_arg, *call_args, **call_kwargs) + else: + result = orig(*call_args, **call_kwargs) + return result + prefix = orig.__module__ + '.' + orig.__name__ + '__mutmut_' + if not mutant_under_test.startswith(prefix): + # Check if orig is a bound method (has __self__) or plain function + if self_arg is not None and not hasattr(orig, '__self__'): + result = orig(self_arg, *call_args, **call_kwargs) + else: + result = orig(*call_args, **call_kwargs) + return result + mutant_name = mutant_under_test.rpartition('.')[-1] + if self_arg is not None: + # call to a class method where self is not bound + result = mutants[mutant_name](self_arg, *call_args, **call_kwargs) + else: + result = mutants[mutant_name](*call_args, **call_kwargs) + return result + +""") diff --git a/src/mutmut/threading/__init__.py b/src/mutmut/threading/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/mutmut/threading/timeout.py b/src/mutmut/threading/timeout.py new file mode 100644 index 00000000..a8acf4f3 --- /dev/null +++ b/src/mutmut/threading/timeout.py @@ -0,0 +1,56 @@ +import heapq +import os +import signal +import time +from threading import Condition +from threading import Thread + +_timeout_heap: list[tuple[float, int]] = [] # (timeout_timestamp, pid) +_heap_lock = Condition() +_checker_started = False + + +def register_timeout(pid: int, timeout_s: float) -> None: + """Register a timeout for a given PID. + + Starts the timeout checker thread if not already started. + + On timeout sends SIGXCPU to the process. + + Args: + pid: The process ID to register the timeout for. + timeout_s: The number of seconds until the timeout occurs. + """ + global _checker_started + if not _checker_started: + _checker_started = True + Thread(target=_timeout_checker_thread, name=f"{os.getpid()}-mutmut-timeout-checker", daemon=True).start() + + deadline = time.time() + timeout_s + with _heap_lock: + heapq.heappush(_timeout_heap, (deadline, pid)) + _heap_lock.notify() + + +def _timeout_checker_thread() -> None: + """Thread function that checks for timeouts and terminates processes. + + We make a trade-off here in the name of simplicity by not exposing a + mechanism to cancel timeouts, which saves us an O(n) operation on each + timeout we would cancel. Instead, we let expired entries for already-terminated + processes remain in the heap until they reach the top and are popped off. + The downside is a bit of memory bloat but each tuple is ~72 bytes so + even with 10,000 backed up timeouts it's less than 1MB. + """ + while True: + with _heap_lock: + while not _timeout_heap: + _heap_lock.wait() + now = time.time() + while _timeout_heap and _timeout_heap[0][0] <= now: + _, pid = heapq.heappop(_timeout_heap) + try: + os.kill(pid, signal.SIGXCPU) + except ProcessLookupError: + pass # Process already terminated + time.sleep(1) diff --git a/src/mutmut/trampoline_templates.py b/src/mutmut/trampoline_templates.py deleted file mode 100644 index 0b051938..00000000 --- a/src/mutmut/trampoline_templates.py +++ /dev/null @@ -1,63 +0,0 @@ -CLASS_NAME_SEPARATOR = "ǁ" - - -def create_trampoline_lookup(*, orig_name: str, mutants: list[str], class_name: str | None) -> str: - mangled_name = mangle_function_name(name=orig_name, class_name=class_name) - - mutants_dict = ( - f"{mangled_name}__mutmut_mutants : ClassVar[MutantDict] = {{ # type: ignore\n" - + ", \n ".join(f"{repr(m)}: {m}" for m in mutants) - + "\n}" - ) - return f""" -{mutants_dict} -{mangled_name}__mutmut_orig.__name__ = '{mangled_name}' -""" - - -def mangle_function_name(*, name: str, class_name: str | None) -> str: - assert CLASS_NAME_SEPARATOR not in name - if class_name: - assert CLASS_NAME_SEPARATOR not in class_name - prefix = f"x{CLASS_NAME_SEPARATOR}{class_name}{CLASS_NAME_SEPARATOR}" - else: - prefix = "x_" - return f"{prefix}{name}" - - -# noinspection PyUnresolvedReferences -# language=python -trampoline_impl = """ -from typing import Annotated -from typing import Callable -from typing import ClassVar - -MutantDict = Annotated[dict[str, Callable], "Mutant"] # type: ignore - - -def _mutmut_trampoline(orig, mutants, call_args, call_kwargs, self_arg = None): # type: ignore - \"""Forward call to original or mutated function, depending on the environment\""" - import os # type: ignore - mutant_under_test = os.environ['MUTANT_UNDER_TEST'] # type: ignore - if mutant_under_test == 'fail': # type: ignore - from mutmut.__main__ import MutmutProgrammaticFailException # type: ignore - raise MutmutProgrammaticFailException('Failed programmatically') # type: ignore - elif mutant_under_test == 'stats': # type: ignore - from mutmut.__main__ import record_trampoline_hit # type: ignore - record_trampoline_hit(orig.__module__ + '.' + orig.__name__) # type: ignore - # (for class methods, orig is bound and thus does not need the explicit self argument) - result = orig(*call_args, **call_kwargs) # type: ignore - return result # type: ignore - prefix = orig.__module__ + '.' + orig.__name__ + '__mutmut_' # type: ignore - if not mutant_under_test.startswith(prefix): # type: ignore - result = orig(*call_args, **call_kwargs) # type: ignore - return result # type: ignore - mutant_name = mutant_under_test.rpartition('.')[-1] # type: ignore - if self_arg is not None: # type: ignore - # call to a class method where self is not bound - result = mutants[mutant_name](self_arg, *call_args, **call_kwargs) # type: ignore - else: - result = mutants[mutant_name](*call_args, **call_kwargs) # type: ignore - return result # type: ignore - -""" diff --git a/src/mutmut/utils/__init__.py b/src/mutmut/utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/mutmut/utils/file_utils.py b/src/mutmut/utils/file_utils.py new file mode 100644 index 00000000..d3e73be8 --- /dev/null +++ b/src/mutmut/utils/file_utils.py @@ -0,0 +1,14 @@ +import os +from collections.abc import Iterator +from contextlib import contextmanager +from pathlib import Path + + +@contextmanager +def change_cwd(path: Path | str) -> Iterator[None]: + old_cwd = Path(os.getcwd()).resolve() + os.chdir(path) + try: + yield + finally: + os.chdir(old_cwd) diff --git a/src/mutmut/utils/format_utils.py b/src/mutmut/utils/format_utils.py new file mode 100644 index 00000000..a1228719 --- /dev/null +++ b/src/mutmut/utils/format_utils.py @@ -0,0 +1,55 @@ +"""Utility functions for mutmut name formatting and key generation.""" + +import os +from pathlib import Path + +from mutmut.mutation.trampoline_templates import CLASS_NAME_SEPARATOR + + +def make_mutant_key(func_name: str, class_name: str | None = None) -> str: + """Create a consistent key for identifying a function/method for mutation tracking. + + :param func_name: The function or method name + :param class_name: The containing class name, or None for top-level functions + :return: A key string like "xǁMyClassǁmethod" for methods or "x_foo" for functions + """ + if class_name: + return f"x{CLASS_NAME_SEPARATOR}{class_name}{CLASS_NAME_SEPARATOR}{func_name}" + else: + return f"x_{func_name}" + + +def parse_mutant_key(key: str) -> tuple[str, str | None]: + """Parse a mutant key back into function name and optional class name. + + :param key: A key string like "xǁMyClassǁmethod" or "x_foo" + :return: A tuple of (func_name, class_name) where class_name is None for top-level functions + """ + if CLASS_NAME_SEPARATOR in key: + class_name = key[key.index(CLASS_NAME_SEPARATOR) + 1 : key.rindex(CLASS_NAME_SEPARATOR)] + func_name = key[key.rindex(CLASS_NAME_SEPARATOR) + 1 :] + return func_name, class_name + else: + assert key.startswith("x_"), f"Invalid key format: {key}" + return key[2:], None + + +def is_mutated_method_name(name: str) -> bool: + return name.startswith(("x_", "xǁ")) and "__mutmut" in name + + +def strip_prefix(s: str, *, prefix: str, strict: bool = False) -> str: + if s.startswith(prefix): + return s[len(prefix) :] + assert strict is False, f"String '{s}' does not start with prefix '{prefix}'" + return s + + +def get_mutant_name(relative_source_path: Path, mutant_method_name: str) -> str: + module_name = str(relative_source_path)[: -len(relative_source_path.suffix)].replace(os.sep, ".") + module_name = strip_prefix(module_name, prefix="src.") + + # FYI, we currently use "mutant_name" inconsistently, for both the whole identifier including the path and only the mangled method name + mutant_name = f"{module_name}.{mutant_method_name}" + mutant_name = mutant_name.replace(".__init__.", ".") + return mutant_name diff --git a/src/mutmut/utils/safe_setproctitle.py b/src/mutmut/utils/safe_setproctitle.py new file mode 100644 index 00000000..427ba09a --- /dev/null +++ b/src/mutmut/utils/safe_setproctitle.py @@ -0,0 +1,24 @@ +"""Safe wrapper for setproctitle that handles fork-unsafe behavior on macOS. + +setproctitle uses CoreFoundation APIs on macOS which aren't fork-safe. +Calling setproctitle after fork() causes segfaults in the child process. + +This module provides a safe_setproctitle() function that: +- Works normally on Linux +- Is a no-op on macOS to avoid crashes after fork() + +Related: https://github.com/boxed/mutmut/pull/450#issuecomment-4002571055 +""" + +from mutmut.configuration import Config + +if Config.get().use_setproctitle: + from setproctitle import setproctitle as _setproctitle + + def safe_setproctitle(title: str) -> None: + """Set the process title.""" + _setproctitle(title) +else: + + def safe_setproctitle(title: str) -> None: + """No-op on macOS where setproctitle crashes after fork.""" diff --git a/tests/data/test_generation/__init__.py b/tests/data/test_generation/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/e2e/conftest.py b/tests/e2e/conftest.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/e2e/e2e_utils.py b/tests/e2e/e2e_utils.py index 30becf80..c7a6ae71 100644 --- a/tests/e2e/e2e_utils.py +++ b/tests/e2e/e2e_utils.py @@ -8,8 +8,8 @@ import mutmut from mutmut.__main__ import SourceFileMutationData from mutmut.__main__ import _run -from mutmut.__main__ import ensure_config_loaded from mutmut.__main__ import walk_source_files +from mutmut.configuration import Config @contextmanager @@ -25,11 +25,12 @@ def change_cwd(path): def read_all_stats_for_project(project_path: Path) -> dict[str, dict]: """Create a single dict from all mutant results in *.meta files""" with change_cwd(project_path): - ensure_config_loaded() + Config.reset() + Config.ensure_loaded() stats = {} for p in walk_source_files(): - if mutmut.config.should_ignore_for_mutation(p): # type: ignore + if Config.get().should_ignore_for_mutation(p): continue data = SourceFileMutationData(path=p) data.load() @@ -46,19 +47,25 @@ def read_json_file(path: Path): def write_json_file(path: Path, data: Any): with open(path, "w") as file: json.dump(data, file, indent=2) + file.write("\n") # ensure newline at end of file for POSIX compliance + + +REPO_ROOT = Path(__file__).resolve().parent.parent.parent +E2E_PROJECTS = REPO_ROOT / "e2e_projects" def run_mutmut_on_project(project: str) -> dict: """Runs mutmut on this project and verifies that the results stay the same for all mutations.""" mutmut._reset_globals() - project_path = Path("..").parent / "e2e_projects" / project + project_path = E2E_PROJECTS / project mutants_path = project_path / "mutants" shutil.rmtree(mutants_path, ignore_errors=True) # mutmut run with change_cwd(project_path): + mutmut._reset_globals() _run([], None) return read_all_stats_for_project(project_path) diff --git a/tests/e2e/test_e2e_my_lib.py b/tests/e2e/test_e2e_my_lib.py index c2aac7e1..8875ee81 100644 --- a/tests/e2e/test_e2e_my_lib.py +++ b/tests/e2e/test_e2e_my_lib.py @@ -64,6 +64,23 @@ def test_my_lib_result_snapshot(): "my_lib.xǁPointǁto_origin__mutmut_3": 0, "my_lib.xǁPointǁto_origin__mutmut_4": 0, "my_lib.xǁPointǁ__len____mutmut_1": 33, + "my_lib.xǁPointǁfrom_coords__mutmut_1": 1, + "my_lib.xǁPointǁfrom_coords__mutmut_2": 0, + "my_lib.xǁPointǁfrom_coords__mutmut_3": 1, + "my_lib.xǁPointǁfrom_coords__mutmut_4": 1, + "my_lib.xǁPointǁfrom_coords__mutmut_5": 1, + "my_lib.xǁPointǁfrom_coords__mutmut_6": 1, + "my_lib.xǁPointǁpragma_on_staticmethod_decorator__mutmut_1": 1, + "my_lib.xǁPointǁpragma_on_staticmethod_decorator__mutmut_2": 1, + "my_lib.xǁPointǁpragma_on_staticmethod_decorator__mutmut_3": 1, + "my_lib.xǁPointǁpragma_on_classmethod_decorator__mutmut_1": 1, + "my_lib.xǁPointǁpragma_on_classmethod_decorator__mutmut_2": 1, + "my_lib.xǁPointǁpragma_on_classmethod_decorator__mutmut_3": 1, + "my_lib.xǁPointǁpragma_on_classmethod_decorator__mutmut_4": 1, + "my_lib.xǁPointǁpragma_on_classmethod_decorator__mutmut_5": 1, + "my_lib.xǁPointǁpragma_on_classmethod_decorator__mutmut_6": 1, + "my_lib.xǁPointǁpragma_on_classmethod_decorator__mutmut_7": 1, + "my_lib.xǁPointǁpragma_on_classmethod_decorator__mutmut_8": 1, "my_lib.x_escape_sequences__mutmut_1": 1, "my_lib.x_escape_sequences__mutmut_2": 0, "my_lib.x_escape_sequences__mutmut_3": 1, @@ -79,6 +96,14 @@ def test_my_lib_result_snapshot(): "my_lib.x_func_with_star__mutmut_2": 1, "my_lib.x_func_with_star__mutmut_3": 1, "my_lib.x_func_with_arbitrary_args__mutmut_1": 1, + "my_lib.xǁColorǁasync_get__mutmut_1": 1, + "my_lib.xǁColorǁasync_get__mutmut_2": 0, + "my_lib.xǁColorǁasync_get_all__mutmut_1": 1, + "my_lib.xǁColorǁasync_get_all__mutmut_2": 0, + "my_lib.xǁColorǁis_primary__mutmut_1": 1, + "my_lib.xǁColorǁdarken__mutmut_1": 0, + "my_lib.xǁColorǁdarken__mutmut_2": 1, + "my_lib.xǁColorǁfrom_name__mutmut_1": 1, } } ) diff --git a/tests/mutation/test_enum_handling.py b/tests/mutation/test_enum_handling.py new file mode 100644 index 00000000..3b30b77e --- /dev/null +++ b/tests/mutation/test_enum_handling.py @@ -0,0 +1,132 @@ +"""Tests for enum class detection and handling.""" + +import libcst as cst + +from mutmut.mutation.enum_mutation import ENUM_BASE_CLASSES +from mutmut.mutation.enum_mutation import is_enum_class +from mutmut.mutation.mutators import MethodType +from mutmut.mutation.mutators import get_method_type + + +class TestEnumBaseClasses: + """Tests for ENUM_BASE_CLASSES constant.""" + + def test_contains_standard_enum_types(self): + assert "Enum" in ENUM_BASE_CLASSES + assert "IntEnum" in ENUM_BASE_CLASSES + assert "Flag" in ENUM_BASE_CLASSES + assert "IntFlag" in ENUM_BASE_CLASSES + assert "StrEnum" in ENUM_BASE_CLASSES + + def test_is_frozenset(self): + assert isinstance(ENUM_BASE_CLASSES, frozenset) + + +class TestIsEnumClass: + """Tests for is_enum_class function.""" + + def test_simple_enum(self): + code = "class Color(Enum): pass" + module = cst.parse_module(code) + cls = module.body[0] + assert is_enum_class(cls) + + def test_int_enum(self): + code = "class Priority(IntEnum): pass" + module = cst.parse_module(code) + cls = module.body[0] + assert is_enum_class(cls) + + def test_flag_enum(self): + code = "class Permission(Flag): pass" + module = cst.parse_module(code) + cls = module.body[0] + assert is_enum_class(cls) + + def test_attribute_access_enum(self): + code = "class Status(enum.Enum): pass" + module = cst.parse_module(code) + cls = module.body[0] + assert is_enum_class(cls) + + def test_regular_class(self): + code = "class MyClass: pass" + module = cst.parse_module(code) + cls = module.body[0] + assert not is_enum_class(cls) + + def test_class_with_other_base(self): + code = "class MyClass(SomeBase): pass" + module = cst.parse_module(code) + cls = module.body[0] + assert not is_enum_class(cls) + + def test_class_with_multiple_bases(self): + code = "class MyClass(Base1, Enum): pass" + module = cst.parse_module(code) + cls = module.body[0] + assert is_enum_class(cls) + + +class TestGetMethodType: + """Tests for get_method_type function.""" + + def test_instance_method(self): + code = """ +class Foo: + def method(self): + pass +""" + module = cst.parse_module(code) + cls = module.body[0] + method = cls.body.body[0] + assert get_method_type(method) == MethodType.INSTANCE + + def test_staticmethod(self): + code = """ +class Foo: + @staticmethod + def method(): + pass +""" + module = cst.parse_module(code) + cls = module.body[0] + method = cls.body.body[0] + assert get_method_type(method) == MethodType.STATICMETHOD + + def test_classmethod(self): + code = """ +class Foo: + @classmethod + def method(cls): + pass +""" + module = cst.parse_module(code) + cls = module.body[0] + method = cls.body.body[0] + assert get_method_type(method) == MethodType.CLASSMETHOD + + def test_other_single_decorator(self): + code = """ +class Foo: + @property + def method(self): + pass +""" + module = cst.parse_module(code) + cls = module.body[0] + method = cls.body.body[0] + assert get_method_type(method) is None + + def test_multiple_decorators(self): + code = """ +class Foo: + @staticmethod + @some_decorator + def method(): + pass +""" + module = cst.parse_module(code) + cls = module.body[0] + method = cls.body.body[0] + assert get_method_type(method) is None diff --git a/tests/test_mutation.py b/tests/mutation/test_mutation.py similarity index 71% rename from tests/test_mutation.py rename to tests/mutation/test_mutation.py index 7c9dd4d9..509cc9bf 100644 --- a/tests/test_mutation.py +++ b/tests/mutation/test_mutation.py @@ -5,20 +5,19 @@ import libcst as cst import pytest -import mutmut -from mutmut.__main__ import CLASS_NAME_SEPARATOR from mutmut.__main__ import CatchOutput from mutmut.__main__ import MutmutProgrammaticFailException from mutmut.__main__ import get_diff_for_mutant from mutmut.__main__ import orig_function_and_class_names_from_key from mutmut.__main__ import run_forced_fail_test -from mutmut.file_mutation import create_mutations -from mutmut.file_mutation import mutate_file_contents -from mutmut.trampoline_templates import mangle_function_name +from mutmut.mutation.file_mutation import create_mutations +from mutmut.mutation.file_mutation import mutate_file_contents +from mutmut.mutation.trampoline_templates import CLASS_NAME_SEPARATOR +from mutmut.mutation.trampoline_templates import mangle_function_name def mutants_for_source(source: str, covered_lines: set[int] | None = None) -> list[str]: - module, mutated_nodes = create_mutations(source, covered_lines) + module, mutated_nodes, _, _ = create_mutations(source, covered_lines) mutants: list[str] = [module.deep_replace(m.original_node, m.mutated_node).code for m in mutated_nodes] # type: ignore return mutants @@ -432,6 +431,280 @@ def foo(): # pragma: no mutate assert mutants +def test_pragma_no_mutate_class(): + """Test that pragma: no mutate class skips entire class from mutation.""" + source = """ +class Foo: # pragma: no mutate class + def method(self): + return 1 + 1 +""".strip() + mutated_code = mutated_module(source) + # Should not have any mutant versions or trampoline attributes + assert "xǁFooǁmethod__mutmut" not in mutated_code + # Original class should be preserved unchanged + assert "def method(self):" in mutated_code + assert "return 1 + 1" in mutated_code + + +def test_pragma_no_mutate_class_with_colon(): + """Test that pragma: no mutate: class also works (alternative syntax).""" + source = """ +class Bar: # pragma: no mutate: class + def method(self): + return 2 + 2 +""".strip() + mutated_code = mutated_module(source) + assert "xǁBarǁmethod__mutmut" not in mutated_code + assert "def method(self):" in mutated_code + + +def test_pragma_no_mutate_class_does_not_affect_other_classes(): + """Test that pragma: no mutate class only affects the annotated class.""" + source = """ +class Skipped: # pragma: no mutate class + def method(self): + return 1 + +class Mutated: + def method(self): + return 1 +""".strip() + mutated_code = mutated_module(source) + # Skipped class should not have mutants + assert "xǁSkippedǁmethod__mutmut" not in mutated_code + # Mutated class should have mutants + assert "xǁMutatedǁmethod__mutmut_orig" in mutated_code + + +def test_pragma_no_mutate_vs_no_mutate_class(): + """Test that regular pragma: no mutate does NOT skip entire class (only that line).""" + source = """ +class Foo: # pragma: no mutate + def method(self): + return 1 + 1 +""".strip() + mutated_code = mutated_module(source) + # Regular pragma should NOT skip the class - methods should still be mutated + assert "xǁFooǁmethod__mutmut" in mutated_code + + +def test_pragma_no_mutate_class_for_enum(): + """Test the enum use case - pragma prevents trampoline attribute injection.""" + source = """ +from enum import Enum + +class Color(Enum): # pragma: no mutate class + RED = 1 + GREEN = 2 + + def describe(self): + return self.name.lower() +""".strip() + mutated_code = mutated_module(source) + # No mutant attributes should be added to the enum class + assert "__mutmut_mutants" not in mutated_code + assert "xǁColorǁdescribe__mutmut" not in mutated_code + # Original enum should be preserved + assert "class Color(Enum):" in mutated_code + assert "RED = 1" in mutated_code + + +def test_pragma_no_mutate_function(): + """Test that pragma: no mutate function skips entire function from mutation.""" + source = """ +def foo(): # pragma: no mutate function + return 1 + 1 +""".strip() + mutated_code = mutated_module(source) + # Should not have any mutant versions or trampoline + assert "x_foo__mutmut" not in mutated_code + assert "__mutmut_mutants" not in mutated_code + # Original function should be preserved + assert "def foo():" in mutated_code + assert "return 1 + 1" in mutated_code + + +def test_pragma_no_mutate_function_with_colon(): + """Test that pragma: no mutate: function also works (alternative syntax).""" + source = """ +def bar(): # pragma: no mutate: function + return 2 + 2 +""".strip() + mutated_code = mutated_module(source) + assert "x_bar__mutmut" not in mutated_code + assert "def bar():" in mutated_code + + +def test_pragma_no_mutate_function_does_not_affect_other_functions(): + """Test that pragma: no mutate function only affects the annotated function.""" + source = """ +def skipped(): # pragma: no mutate function + return 1 + +def mutated(): + return 1 +""".strip() + mutated_code = mutated_module(source) + # Skipped function should not have mutants + assert "x_skipped__mutmut" not in mutated_code + # Mutated function should have mutants + assert "x_mutated__mutmut_orig" in mutated_code + + +def test_pragma_no_mutate_vs_no_mutate_function(): + """Test that regular pragma: no mutate does NOT skip entire function.""" + source = """ +def foo(): # pragma: no mutate + return 1 + 1 +""".strip() + mutated_code = mutated_module(source) + # Regular pragma should NOT skip the function - it should still be mutated + assert "x_foo__mutmut" in mutated_code + + +def test_enum_mutation_uses_external_injection(): + """Test that enum classes use external injection pattern to avoid metaclass conflicts.""" + source = """ +from enum import Enum + +class Color(Enum): + RED = 1 + GREEN = 2 + + def describe(self): + return self.name.lower() +""".strip() + mutated_code = mutated_module(source) + # Should NOT have mutant attributes injected INTO the class body (breaks enums) + # The mutant dict should be OUTSIDE the class (before the class definition) + assert "_Color_describe_mutants" in mutated_code + # External trampoline function should exist + assert "_Color_describe_trampoline" in mutated_code + # The method inside the class should be a simple assignment + assert "describe = _Color_describe_trampoline" in mutated_code + # Ensure no ClassVar inside the class (which would break enum) + # Split to get just the class body + class_start = mutated_code.find("class Color(Enum):") + assert class_start > mutated_code.find("_Color_describe_mutants") # mutants dict is BEFORE class + + +def test_enum_mutation_with_staticmethod(): + """Test that @staticmethod in enum classes works correctly.""" + source = """ +from enum import Enum + +class Color(Enum): + RED = 1 + + @staticmethod + def helper(): + return 1 + 1 +""".strip() + mutated_code = mutated_module(source) + # Should have external trampoline + assert "_Color_helper_trampoline" in mutated_code + # Assignment should use staticmethod wrapper + assert "helper = staticmethod(_Color_helper_trampoline)" in mutated_code + + +def test_enum_mutation_with_classmethod(): + """Test that @classmethod in enum classes works correctly.""" + source = """ +from enum import Enum + +class Color(Enum): + RED = 1 + + @classmethod + def from_string(cls): + return 1 + 1 +""".strip() + mutated_code = mutated_module(source) + # Should have external trampoline + assert "_Color_from_string_trampoline" in mutated_code + # Assignment should use classmethod wrapper + assert "from_string = classmethod(_Color_from_string_trampoline)" in mutated_code + + +def test_enum_mutation_preserves_enum_members(): + """Test that enum members are preserved when methods are mutated.""" + source = """ +from enum import Enum + +class Status(Enum): + PENDING = 'pending' + ACTIVE = 'active' + DONE = 'done' + + def is_active(self): + return self == Status.ACTIVE +""".strip() + mutated_code = mutated_module(source) + # Enum members should be unchanged + assert "PENDING = 'pending'" in mutated_code + assert "ACTIVE = 'active'" in mutated_code + assert "DONE = 'done'" in mutated_code + # But method should be mutated externally + assert "_Status_is_active_trampoline" in mutated_code + + +def test_regular_class_staticmethod_mutation(): + """Test that @staticmethod in regular classes is now mutated using external injection.""" + source = """ +class Calculator: + @staticmethod + def add(a, b): + return a + b +""".strip() + mutated_code = mutated_module(source) + # Should use external injection pattern + assert "_Calculator_add_trampoline" in mutated_code + assert "_Calculator_add_orig" in mutated_code + # Assignment should use staticmethod wrapper + assert "add = staticmethod(_Calculator_add_trampoline)" in mutated_code + + +def test_regular_class_classmethod_mutation(): + """Test that @classmethod in regular classes is now mutated using external injection.""" + source = """ +class Factory: + @classmethod + def create(cls, value): + return value + 1 +""".strip() + mutated_code = mutated_module(source) + # Should use external injection pattern + assert "_Factory_create_trampoline" in mutated_code + assert "_Factory_create_orig" in mutated_code + # Assignment should use classmethod wrapper + assert "create = classmethod(_Factory_create_trampoline)" in mutated_code + + +def test_regular_class_mixed_methods(): + """Test that regular classes correctly handle mix of instance, static, and class methods.""" + source = """ +class MyClass: + def instance_method(self): + return 1 + 1 + + @staticmethod + def static_method(): + return 2 + 2 + + @classmethod + def class_method(cls): + return 3 + 3 +""".strip() + mutated_code = mutated_module(source) + # Instance method uses internal trampoline (inside class) + assert "xǁMyClassǁinstance_method__mutmut_orig" in mutated_code + # Static and class methods use external injection + assert "_MyClass_static_method_trampoline" in mutated_code + assert "_MyClass_class_method_trampoline" in mutated_code + assert "static_method = staticmethod(_MyClass_static_method_trampoline)" in mutated_code + assert "class_method = classmethod(_MyClass_class_method_trampoline)" in mutated_code + + def test_mutate_only_covered_lines_none(): source = """def foo():\n return 1+1\n""".strip() mutants = mutants_for_source(source, covered_lines=set()) @@ -644,7 +917,6 @@ def foo(): @patch.object(CatchOutput, "stop") @patch.object(CatchOutput, "start") def test_run_forced_fail_test_with_failing_test(_start, _stop, _dump_output, capfd): - mutmut._reset_globals() runner = _mocked_runner_run_forced_failed(return_value=1) run_forced_fail_test(runner) @@ -663,7 +935,6 @@ def test_run_forced_fail_test_with_failing_test(_start, _stop, _dump_output, cap @patch.object(CatchOutput, "stop") @patch.object(CatchOutput, "start") def test_run_forced_fail_test_with_mutmut_programmatic_fail_exception(_start, _stop, _dump_output, capfd): - mutmut._reset_globals() runner = _mocked_runner_run_forced_failed(side_effect=MutmutProgrammaticFailException()) run_forced_fail_test(runner) @@ -678,7 +949,6 @@ def test_run_forced_fail_test_with_mutmut_programmatic_fail_exception(_start, _s @patch.object(CatchOutput, "stop") @patch.object(CatchOutput, "start") def test_run_forced_fail_test_with_all_tests_passing(_start, _stop, _dump_output, capfd): - mutmut._reset_globals() runner = _mocked_runner_run_forced_failed(return_value=0) with pytest.raises(SystemExit) as error: diff --git a/tests/mutation/test_mutation_runtime.py b/tests/mutation/test_mutation_runtime.py new file mode 100644 index 00000000..d015659d --- /dev/null +++ b/tests/mutation/test_mutation_runtime.py @@ -0,0 +1,80 @@ +"""Runtime integration tests that use exec() to verify mutated code works at runtime. + +These tests sit between unit tests and E2E tests: they generate mutated code +via mutate_file_contents and then exec() it to verify runtime behavior. +""" + +import os + +from mutmut.mutation.file_mutation import mutate_file_contents + + +def test_enum_mutation_runtime_execution(): + """Test that mutated enum code can actually be executed and mutants activated.""" + source = """ +from enum import Enum + +class Color(Enum): + RED = 1 + GREEN = 2 + + def describe(self): + return self.name.lower() +""".strip() + + mutated_code, mutant_names = mutate_file_contents("test.py", source) + assert len(mutant_names) > 0, "Should have at least one mutant" + + old_env = os.environ.get("MUTANT_UNDER_TEST") + try: + os.environ["MUTANT_UNDER_TEST"] = "none" + namespace = {"__name__": "test_module"} + exec(mutated_code, namespace) + Color = namespace["Color"] + + assert Color.RED.value == 1 + assert Color.GREEN.value == 2 + + assert Color.RED.describe() == "red" + + mutant_name = "test_module." + mutant_names[0] + os.environ["MUTANT_UNDER_TEST"] = mutant_name + + assert Color.RED.describe() == "RED" + finally: + if old_env is not None: + os.environ["MUTANT_UNDER_TEST"] = old_env + elif "MUTANT_UNDER_TEST" in os.environ: + del os.environ["MUTANT_UNDER_TEST"] + + +def test_regular_class_staticmethod_runtime(): + """Test that staticmethod mutation in regular classes works at runtime.""" + source = """ +class Calculator: + @staticmethod + def add(a, b): + return a + b +""".strip() + + mutated_code, mutant_names = mutate_file_contents("test.py", source) + assert len(mutant_names) > 0, "Should have at least one mutant" + + old_env = os.environ.get("MUTANT_UNDER_TEST") + try: + os.environ["MUTANT_UNDER_TEST"] = "none" + namespace = {"__name__": "test_module"} + exec(mutated_code, namespace) + Calculator = namespace["Calculator"] + + assert Calculator.add(2, 3) == 5 + + mutant_name = "test_module." + mutant_names[0] + os.environ["MUTANT_UNDER_TEST"] = mutant_name + + assert Calculator.add(5, 3) == 2 + finally: + if old_env is not None: + os.environ["MUTANT_UNDER_TEST"] = old_env + elif "MUTANT_UNDER_TEST" in os.environ: + del os.environ["MUTANT_UNDER_TEST"] diff --git a/tests/mutation/test_pragma_handling.py b/tests/mutation/test_pragma_handling.py new file mode 100644 index 00000000..1df6c21e --- /dev/null +++ b/tests/mutation/test_pragma_handling.py @@ -0,0 +1,97 @@ +"""Tests for pragma comment parsing.""" + +from mutmut.mutation.pragma_handling import parse_pragma_lines + + +class TestParsePragmaLines: + """Tests for parse_pragma_lines function.""" + + def test_no_pragmas(self): + source = """def foo(): + return 1 + 1 +""" + no_mutate, class_lines, function_lines = parse_pragma_lines(source) + assert no_mutate == set() + assert class_lines == set() + assert function_lines == set() + + def test_simple_no_mutate(self): + source = """def foo(): + return 1 + 1 # pragma: no mutate +""" + no_mutate, class_lines, function_lines = parse_pragma_lines(source) + assert no_mutate == {2} + assert class_lines == set() + assert function_lines == set() + + def test_no_mutate_class(self): + source = """class Foo: # pragma: no mutate class + def method(self): + return 1 + 1 +""" + no_mutate, class_lines, function_lines = parse_pragma_lines(source) + assert no_mutate == set() + assert class_lines == {1} + assert function_lines == set() + + def test_no_mutate_class_with_colon(self): + source = """class Foo: # pragma: no mutate: class + def method(self): + return 1 + 1 +""" + no_mutate, class_lines, function_lines = parse_pragma_lines(source) + assert no_mutate == set() + assert class_lines == {1} + assert function_lines == set() + + def test_no_mutate_function(self): + source = """def foo(): # pragma: no mutate function + return 1 + 1 +""" + no_mutate, class_lines, function_lines = parse_pragma_lines(source) + assert no_mutate == set() + assert class_lines == set() + assert function_lines == {1} + + def test_no_mutate_function_with_colon(self): + source = """def foo(): # pragma: no mutate: function + return 1 + 1 +""" + no_mutate, class_lines, function_lines = parse_pragma_lines(source) + assert no_mutate == set() + assert class_lines == set() + assert function_lines == {1} + + def test_mixed_pragmas(self): + source = """class Skipped: # pragma: no mutate class + def method(self): + return 1 + 1 + +def skipped_func(): # pragma: no mutate function + return 2 + 2 + +def mutated(): + return 3 + 3 # pragma: no mutate +""" + no_mutate, class_lines, function_lines = parse_pragma_lines(source) + assert no_mutate == {9} + assert class_lines == {1} + assert function_lines == {5} + + def test_pragma_no_cover_with_no_mutate(self): + source = """def foo(): + return 1 + 1 # pragma: no cover, no mutate +""" + no_mutate, class_lines, function_lines = parse_pragma_lines(source) + assert no_mutate == {2} + assert class_lines == set() + assert function_lines == set() + + def test_other_pragma_ignored(self): + source = """def foo(): + return 1 + 1 # pragma: no cover +""" + no_mutate, class_lines, function_lines = parse_pragma_lines(source) + assert no_mutate == set() + assert class_lines == set() + assert function_lines == set() diff --git a/tests/test_configuration.py b/tests/test_configuration.py new file mode 100644 index 00000000..041798b4 --- /dev/null +++ b/tests/test_configuration.py @@ -0,0 +1,347 @@ +from pathlib import Path + +import pytest + +from mutmut.configuration import Config +from mutmut.configuration import _config_reader +from mutmut.configuration import _guess_paths_to_mutate +from mutmut.configuration import _load_config + + +@pytest.fixture(autouse=True) +def reset_config(): + """Reset config singleton before and after each test.""" + Config.reset() + yield + Config.reset() + + +@pytest.fixture +def in_tmp_dir(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + """Change to a temporary directory for the duration of the test.""" + monkeypatch.chdir(tmp_path) + return tmp_path + + +class TestConfigSingleton: + def test_get_loads_config(self, in_tmp_dir: Path): + (in_tmp_dir / "src").mkdir() + config = Config.get() + assert config is not None + assert isinstance(config, Config) + + def test_get_returns_same_instance(self, in_tmp_dir: Path): + (in_tmp_dir / "src").mkdir() + config1 = Config.get() + config2 = Config.get() + assert config1 is config2 + + def test_reset_clears_singleton(self, in_tmp_dir: Path): + (in_tmp_dir / "src").mkdir() + config1 = Config.get() + Config.reset() + config2 = Config.get() + assert config1 is not config2 + + def test_ensure_loaded_is_idempotent(self, in_tmp_dir: Path): + (in_tmp_dir / "src").mkdir() + Config.ensure_loaded() + config1 = Config.get() + Config.ensure_loaded() + config2 = Config.get() + assert config1 is config2 + + +class TestShouldIgnoreForMutation: + def test_ignores_non_python_files(self): + config = Config( + also_copy=[], + do_not_mutate=[], + max_stack_depth=-1, + debug=False, + paths_to_mutate=[], + pytest_add_cli_args=[], + pytest_add_cli_args_test_selection=[], + tests_dir=[], + mutate_only_covered_lines=False, + type_check_command=[], + use_setproctitle=False, + ) + assert config.should_ignore_for_mutation("foo.txt") is True + assert config.should_ignore_for_mutation("foo.js") is True + assert config.should_ignore_for_mutation("foo") is True + + def test_does_not_ignore_python_files(self): + config = Config( + also_copy=[], + do_not_mutate=[], + max_stack_depth=-1, + debug=False, + paths_to_mutate=[], + pytest_add_cli_args=[], + pytest_add_cli_args_test_selection=[], + tests_dir=[], + mutate_only_covered_lines=False, + type_check_command=[], + use_setproctitle=False, + ) + assert config.should_ignore_for_mutation("foo.py") is False + assert config.should_ignore_for_mutation("src/foo.py") is False + + def test_respects_do_not_mutate_exact_match(self): + config = Config( + also_copy=[], + do_not_mutate=["foo.py"], + max_stack_depth=-1, + debug=False, + paths_to_mutate=[], + pytest_add_cli_args=[], + pytest_add_cli_args_test_selection=[], + tests_dir=[], + mutate_only_covered_lines=False, + type_check_command=[], + use_setproctitle=False, + ) + assert config.should_ignore_for_mutation("foo.py") is True + assert config.should_ignore_for_mutation("bar.py") is False + + def test_respects_do_not_mutate_glob_pattern(self): + config = Config( + also_copy=[], + do_not_mutate=["**/test_*.py", "src/ignore_*.py"], + max_stack_depth=-1, + debug=False, + paths_to_mutate=[], + pytest_add_cli_args=[], + pytest_add_cli_args_test_selection=[], + tests_dir=[], + mutate_only_covered_lines=False, + type_check_command=[], + use_setproctitle=False, + ) + assert config.should_ignore_for_mutation("tests/test_foo.py") is True + assert config.should_ignore_for_mutation("src/ignore_me.py") is True + assert config.should_ignore_for_mutation("src/keep_me.py") is False + + def test_accepts_path_objects(self): + config = Config( + also_copy=[], + do_not_mutate=["foo.py"], + max_stack_depth=-1, + debug=False, + paths_to_mutate=[], + pytest_add_cli_args=[], + pytest_add_cli_args_test_selection=[], + tests_dir=[], + mutate_only_covered_lines=False, + type_check_command=[], + use_setproctitle=False, + ) + assert config.should_ignore_for_mutation(Path("foo.py")) is True + assert config.should_ignore_for_mutation(Path("bar.py")) is False + + +class TestConfigReaderPyprojectToml: + def test_reads_from_pyproject_toml(self, in_tmp_dir: Path): + (in_tmp_dir / "pyproject.toml").write_text(""" +[tool.mutmut] +debug = true +max_stack_depth = 10 +paths_to_mutate = ["src", "lib"] +do_not_mutate = ["**/migrations/*"] +""") + reader = _config_reader() + assert reader("debug", False) is True + assert reader("max_stack_depth", -1) == 10 + assert reader("paths_to_mutate", []) == ["src", "lib"] + assert reader("do_not_mutate", []) == ["**/migrations/*"] + + def test_returns_default_for_missing_key(self, in_tmp_dir: Path): + (in_tmp_dir / "pyproject.toml").write_text(""" +[tool.mutmut] +debug = true +""") + reader = _config_reader() + assert reader("nonexistent", "default_value") == "default_value" + assert reader("max_stack_depth", -1) == -1 + + def test_handles_missing_mutmut_section(self, in_tmp_dir: Path): + (in_tmp_dir / "pyproject.toml").write_text(""" +[tool.other] +foo = "bar" +""") + # Should fall through to setup.cfg reader + reader = _config_reader() + assert reader("debug", False) is False + + +class TestConfigReaderSetupCfg: + def test_reads_from_setup_cfg(self, in_tmp_dir: Path): + (in_tmp_dir / "setup.cfg").write_text(""" +[mutmut] +debug = true +max_stack_depth = 5 +paths_to_mutate = src +""") + reader = _config_reader() + assert reader("debug", False) is True + assert reader("max_stack_depth", -1) == 5 + assert reader("paths_to_mutate", []) == ["src"] + + def test_parses_multiline_list(self, in_tmp_dir: Path): + (in_tmp_dir / "setup.cfg").write_text(""" +[mutmut] +do_not_mutate = + **/migrations/* + **/test_*.py + src/generated.py +""") + reader = _config_reader() + assert reader("do_not_mutate", []) == [ + "**/migrations/*", + "**/test_*.py", + "src/generated.py", + ] + + def test_parses_bool_values(self, in_tmp_dir: Path): + (in_tmp_dir / "setup.cfg").write_text(""" +[mutmut] +debug = true +""") + reader = _config_reader() + assert reader("debug", False) is True + + (in_tmp_dir / "setup.cfg").write_text(""" +[mutmut] +debug = 1 +""") + reader = _config_reader() + assert reader("debug", False) is True + + (in_tmp_dir / "setup.cfg").write_text(""" +[mutmut] +debug = false +""") + reader = _config_reader() + assert reader("debug", False) is False + + def test_parses_int_values(self, in_tmp_dir: Path): + (in_tmp_dir / "setup.cfg").write_text(""" +[mutmut] +max_stack_depth = 42 +""") + reader = _config_reader() + assert reader("max_stack_depth", -1) == 42 + + def test_returns_default_for_missing_section(self, in_tmp_dir: Path): + (in_tmp_dir / "setup.cfg").write_text(""" +[other] +foo = bar +""") + reader = _config_reader() + assert reader("debug", False) is False + + def test_returns_default_for_missing_key(self, in_tmp_dir: Path): + (in_tmp_dir / "setup.cfg").write_text(""" +[mutmut] +debug = true +""") + reader = _config_reader() + assert reader("nonexistent", "default") == "default" + + +class TestConfigReaderPriority: + def test_pyproject_toml_takes_precedence(self, in_tmp_dir: Path): + (in_tmp_dir / "pyproject.toml").write_text(""" +[tool.mutmut] +debug = true +""") + (in_tmp_dir / "setup.cfg").write_text(""" +[mutmut] +debug = false +""") + reader = _config_reader() + assert reader("debug", False) is True + + +class TestGuessPathsToMutate: + def test_guesses_lib_directory(self, in_tmp_dir: Path): + (in_tmp_dir / "lib").mkdir() + assert _guess_paths_to_mutate() == ["lib"] + + def test_guesses_src_directory(self, in_tmp_dir: Path): + (in_tmp_dir / "src").mkdir() + assert _guess_paths_to_mutate() == ["src"] + + def test_prefers_lib_over_src(self, in_tmp_dir: Path): + (in_tmp_dir / "lib").mkdir() + (in_tmp_dir / "src").mkdir() + assert _guess_paths_to_mutate() == ["lib"] + + def test_guesses_directory_matching_cwd_name(self, in_tmp_dir: Path): + # tmp_path has a random name, create a subdir matching it + dir_name = in_tmp_dir.name + (in_tmp_dir / dir_name).mkdir() + assert _guess_paths_to_mutate() == [dir_name] + + def test_guesses_py_file_matching_cwd_name(self, in_tmp_dir: Path): + dir_name = in_tmp_dir.name + (in_tmp_dir / f"{dir_name}.py").touch() + assert _guess_paths_to_mutate() == [f"{dir_name}.py"] + + def test_raises_when_cannot_guess(self, in_tmp_dir: Path): + with pytest.raises(FileNotFoundError, match="Could not figure out"): + _guess_paths_to_mutate() + + +class TestLoadConfig: + def test_loads_all_config_values(self, in_tmp_dir: Path): + (in_tmp_dir / "pyproject.toml").write_text(""" +[tool.mutmut] +debug = true +max_stack_depth = 10 +paths_to_mutate = ["src"] +do_not_mutate = ["**/test_*.py"] +tests_dir = ["tests/unit"] +pytest_add_cli_args = ["-x", "--tb=short"] +pytest_add_cli_args_test_selection = ["--no-header"] +also_copy = ["fixtures"] +mutate_only_covered_lines = true +type_check_command = ["mypy", "--strict"] +""") + (in_tmp_dir / "src").mkdir() + + config = _load_config() + + assert config.debug is True + assert config.max_stack_depth == 10 + assert config.paths_to_mutate == [Path("src")] + assert config.do_not_mutate == ["**/test_*.py"] + assert config.tests_dir == ["tests/unit"] + assert config.pytest_add_cli_args == ["-x", "--tb=short"] + assert config.pytest_add_cli_args_test_selection == ["--no-header"] + assert Path("fixtures") in config.also_copy + assert config.mutate_only_covered_lines is True + assert config.type_check_command == ["mypy", "--strict"] + + def test_uses_defaults_when_no_config(self, in_tmp_dir: Path): + (in_tmp_dir / "src").mkdir() + + config = _load_config() + + assert config.debug is False + assert config.max_stack_depth == -1 + assert config.paths_to_mutate == [Path("src")] + assert config.do_not_mutate == [] + assert config.mutate_only_covered_lines is False + assert config.type_check_command == [] + + def test_also_copy_includes_defaults(self, in_tmp_dir: Path): + (in_tmp_dir / "src").mkdir() + + config = _load_config() + + assert Path("tests/") in config.also_copy + assert Path("test/") in config.also_copy + assert Path("setup.cfg") in config.also_copy + assert Path("pyproject.toml") in config.also_copy diff --git a/tests/test_generation_error_handling.py b/tests/test_generation_error_handling.py index 2c601f31..f556e392 100644 --- a/tests/test_generation_error_handling.py +++ b/tests/test_generation_error_handling.py @@ -6,19 +6,14 @@ import mutmut.__main__ from mutmut.__main__ import InvalidGeneratedSyntaxException from mutmut.__main__ import create_mutants +from mutmut.configuration import Config source_dir = Path(__file__).parent / "data" / "test_generation" source_dir = source_dir.relative_to(Path.cwd()) -class MockConfig: - def should_ignore_for_mutation(self, path: Path) -> bool: - return False - - def test_mutant_generation_raises_exception_on_invalid_syntax(monkeypatch): mutmut._reset_globals() - mutmut.config = MockConfig() shutil.rmtree("mutants", ignore_errors=True) @@ -30,7 +25,7 @@ def test_mutant_generation_raises_exception_on_invalid_syntax(monkeypatch): source_dir / "invalid_syntax.py", ] monkeypatch.setattr(mutmut.__main__, "walk_source_files", lambda: source_files) - monkeypatch.setattr("mutmut.config.should_ignore_for_mutation", lambda _path: False) + monkeypatch.setattr(Config.get(), "should_ignore_for_mutation", lambda _path: False) # should raise an exception, because we copy the invalid_syntax.py file and then verify # if it is valid syntax diff --git a/tests/test_mutation regression.py b/tests/test_mutation regression.py index 6d567025..d4de6c74 100644 --- a/tests/test_mutation regression.py +++ b/tests/test_mutation regression.py @@ -1,8 +1,8 @@ import libcst as cst from inline_snapshot import snapshot -from mutmut.file_mutation import create_trampoline_wrapper -from mutmut.file_mutation import mutate_file_contents +from mutmut.mutation.file_mutation import create_trampoline_wrapper +from mutmut.mutation.file_mutation import mutate_file_contents def _get_trampoline_wrapper(source: str, mangled_name: str, class_name: str | None = None) -> str: @@ -90,37 +90,53 @@ def add(self, value): import lib lib.foo() -from typing import Annotated -from typing import Callable -from typing import ClassVar - -MutantDict = Annotated[dict[str, Callable], "Mutant"] # type: ignore - - -def _mutmut_trampoline(orig, mutants, call_args, call_kwargs, self_arg = None): # type: ignore - """Forward call to original or mutated function, depending on the environment""" - import os # type: ignore - mutant_under_test = os.environ['MUTANT_UNDER_TEST'] # type: ignore - if mutant_under_test == 'fail': # type: ignore - from mutmut.__main__ import MutmutProgrammaticFailException # type: ignore - raise MutmutProgrammaticFailException('Failed programmatically') # type: ignore - elif mutant_under_test == 'stats': # type: ignore - from mutmut.__main__ import record_trampoline_hit # type: ignore - record_trampoline_hit(orig.__module__ + '.' + orig.__name__) # type: ignore - # (for class methods, orig is bound and thus does not need the explicit self argument) - result = orig(*call_args, **call_kwargs) # type: ignore - return result # type: ignore - prefix = orig.__module__ + '.' + orig.__name__ + '__mutmut_' # type: ignore - if not mutant_under_test.startswith(prefix): # type: ignore - result = orig(*call_args, **call_kwargs) # type: ignore - return result # type: ignore - mutant_name = mutant_under_test.rpartition('.')[-1] # type: ignore - if self_arg is not None: # type: ignore +from collections.abc import Sequence # type: ignore # mutmut generated +from typing import Annotated # type: ignore # mutmut generated +from typing import Callable # type: ignore # mutmut generated +from typing import ClassVar # type: ignore # mutmut generated +from typing import TypeVar # type: ignore # mutmut generated + +TReturn = TypeVar('TReturn') # type: ignore # mutmut generated +MutantDict = Annotated[dict[str, Callable[..., TReturn]], "Mutant"] # type: ignore # mutmut generated + + +def _mutmut_trampoline(orig: Callable[..., TReturn], mutants: MutantDict, call_args: Sequence, call_kwargs: dict, self_arg = None) -> TReturn: # type: ignore # mutmut generated + """Forward call to original or mutated function, depending on the environment""" # type: ignore # mutmut generated + import os # type: ignore # mutmut generated + mutant_under_test = os.environ.get('MUTANT_UNDER_TEST', '') # type: ignore # mutmut generated + if not mutant_under_test: # type: ignore # mutmut generated + # No mutant being tested - call original function + if self_arg is not None and not hasattr(orig, '__self__'): # type: ignore # mutmut generated + return orig(self_arg, *call_args, **call_kwargs) # type: ignore # mutmut generated + else: # type: ignore # mutmut generated + return orig(*call_args, **call_kwargs) # type: ignore # mutmut generated + if mutant_under_test == 'fail': # type: ignore # mutmut generated + from mutmut.__main__ import MutmutProgrammaticFailException # type: ignore # mutmut generated + raise MutmutProgrammaticFailException('Failed programmatically') # type: ignore # mutmut generated + elif mutant_under_test == 'stats': # type: ignore # mutmut generated + from mutmut.__main__ import record_trampoline_hit # type: ignore # mutmut generated + record_trampoline_hit(orig.__module__ + '.' + orig.__name__) # type: ignore # mutmut generated + # Check if orig is a bound method (has __self__) or plain function + if self_arg is not None and not hasattr(orig, '__self__'): # type: ignore # mutmut generated + result = orig(self_arg, *call_args, **call_kwargs) # type: ignore # mutmut generated + else: # type: ignore # mutmut generated + result = orig(*call_args, **call_kwargs) # type: ignore # mutmut generated + return result # type: ignore # mutmut generated + prefix = orig.__module__ + '.' + orig.__name__ + '__mutmut_' # type: ignore # mutmut generated + if not mutant_under_test.startswith(prefix): # type: ignore # mutmut generated + # Check if orig is a bound method (has __self__) or plain function + if self_arg is not None and not hasattr(orig, '__self__'): # type: ignore # mutmut generated + result = orig(self_arg, *call_args, **call_kwargs) # type: ignore # mutmut generated + else: # type: ignore # mutmut generated + result = orig(*call_args, **call_kwargs) # type: ignore # mutmut generated + return result # type: ignore # mutmut generated + mutant_name = mutant_under_test.rpartition('.')[-1] # type: ignore # mutmut generated + if self_arg is not None: # type: ignore # mutmut generated # call to a class method where self is not bound - result = mutants[mutant_name](self_arg, *call_args, **call_kwargs) # type: ignore - else: - result = mutants[mutant_name](*call_args, **call_kwargs) # type: ignore - return result # type: ignore + result = mutants[mutant_name](self_arg, *call_args, **call_kwargs) # type: ignore # mutmut generated + else: # type: ignore # mutmut generated + result = mutants[mutant_name](*call_args, **call_kwargs) # type: ignore # mutmut generated + return result # type: ignore # mutmut generated def foo(a: list[int], b): args = [a, b]# type: ignore @@ -136,11 +152,12 @@ def x_foo__mutmut_1(a: list[int], b): def x_foo__mutmut_2(a: list[int], b): return a[0] >= b -x_foo__mutmut_mutants : ClassVar[MutantDict] = { # type: ignore -'x_foo__mutmut_1': x_foo__mutmut_1, \n\ - 'x_foo__mutmut_2': x_foo__mutmut_2 -} -x_foo__mutmut_orig.__name__ = 'x_foo' +x_foo__mutmut_mutants : MutantDict = { # type: ignore # mutmut generated + 'x_foo__mutmut_1': x_foo__mutmut_1, # type: ignore # mutmut generated + 'x_foo__mutmut_2': x_foo__mutmut_2 # type: ignore # mutmut generated +} # type: ignore # mutmut generated + +x_foo__mutmut_orig.__name__ = 'x_foo' # type: ignore # mutmut generated def bar(): args = []# type: ignore @@ -153,10 +170,11 @@ def x_bar__mutmut_orig(): def x_bar__mutmut_1(): yield 2 -x_bar__mutmut_mutants : ClassVar[MutantDict] = { # type: ignore -'x_bar__mutmut_1': x_bar__mutmut_1 -} -x_bar__mutmut_orig.__name__ = 'x_bar' +x_bar__mutmut_mutants : MutantDict = { # type: ignore # mutmut generated + 'x_bar__mutmut_1': x_bar__mutmut_1 # type: ignore # mutmut generated +} # type: ignore # mutmut generated + +x_bar__mutmut_orig.__name__ = 'x_bar' # type: ignore # mutmut generated class Adder: def __init__(self, amount): @@ -168,10 +186,11 @@ def xǁAdderǁ__init____mutmut_orig(self, amount): def xǁAdderǁ__init____mutmut_1(self, amount): self.amount = None \n\ - xǁAdderǁ__init____mutmut_mutants : ClassVar[MutantDict] = { # type: ignore - 'xǁAdderǁ__init____mutmut_1': xǁAdderǁ__init____mutmut_1 - } - xǁAdderǁ__init____mutmut_orig.__name__ = 'xǁAdderǁ__init__' + xǁAdderǁ__init____mutmut_mutants : ClassVar[MutantDict] = { # type: ignore # mutmut generated + 'xǁAdderǁ__init____mutmut_1': xǁAdderǁ__init____mutmut_1 # type: ignore # mutmut generated + } # type: ignore # mutmut generated + \n\ + xǁAdderǁ__init____mutmut_orig.__name__ = 'xǁAdderǁ__init__' # type: ignore # mutmut generated def add(self, value): args = [value]# type: ignore @@ -184,10 +203,11 @@ def xǁAdderǁadd__mutmut_orig(self, value): def xǁAdderǁadd__mutmut_1(self, value): return self.amount - value \n\ - xǁAdderǁadd__mutmut_mutants : ClassVar[MutantDict] = { # type: ignore - 'xǁAdderǁadd__mutmut_1': xǁAdderǁadd__mutmut_1 - } - xǁAdderǁadd__mutmut_orig.__name__ = 'xǁAdderǁadd' + xǁAdderǁadd__mutmut_mutants : ClassVar[MutantDict] = { # type: ignore # mutmut generated + 'xǁAdderǁadd__mutmut_1': xǁAdderǁadd__mutmut_1 # type: ignore # mutmut generated + } # type: ignore # mutmut generated + \n\ + xǁAdderǁadd__mutmut_orig.__name__ = 'xǁAdderǁadd' # type: ignore # mutmut generated print(Adder(1).add(2))\ ''') diff --git a/tests/utils/__init__.py b/tests/utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/utils/test_safe_setproctitle.py b/tests/utils/test_safe_setproctitle.py new file mode 100644 index 00000000..7b528c38 --- /dev/null +++ b/tests/utils/test_safe_setproctitle.py @@ -0,0 +1,86 @@ +"""Tests for safe_setproctitle module. + +This test verifies that setproctitle crashes on macOS Python 3.14+ after fork +when CoreFoundation has been loaded (which happens during normal mutmut operation). +When setproctitle is fixed upstream, this test will fail and we can remove the workaround. +""" + +import os +import platform +import signal + +import pytest +from setproctitle import setproctitle + +from mutmut.utils.safe_setproctitle import safe_setproctitle + +# Only run this test on macOS +IS_MACOS = platform.system() == "Darwin" + + +@pytest.mark.skipif(not IS_MACOS, reason="setproctitle only crashes after fork on macOS") +def test_setproctitle_crashes_after_fork_with_corefoundation_loaded(): + """Verify setproctitle segfaults after fork when CoreFoundation is loaded. + + This test exists to detect when setproctitle is fixed upstream. + If this test FAILS, it means setproctitle no longer crashes and we can + remove the workaround in safe_setproctitle.py. + + The crash only happens when CoreFoundation has been loaded before fork. + We trigger this by calling setproctitle once in the parent before forking. + """ + # Import and call setproctitle in the parent first - this loads CoreFoundation + + setproctitle("parent process") + + pid = os.fork() + + if pid == 0: + # Child process - call setproctitle again + try: + setproctitle("child process") + # If we get here, setproctitle didn't crash - exit with success + os._exit(0) + except Exception: + # If it raises a Python exception instead of segfaulting + os._exit(1) + else: + # Parent process - wait for child and check exit status + _, status = os.waitpid(pid, 0) + + if os.WIFSIGNALED(status): + exit_signal = os.WTERMSIG(status) + assert exit_signal == signal.SIGSEGV, ( + f"Expected SIGSEGV (11), got signal {exit_signal}. setproctitle crash behavior may have changed." + ) + else: + exit_code = os.WEXITSTATUS(status) + pytest.fail( + f"setproctitle did NOT crash (exit code {exit_code}). " + "The library may have been fixed! Consider removing the " + "workaround in safe_setproctitle.py" + ) + + +@pytest.mark.skipif(not IS_MACOS, reason="safe_setproctitle workaround only applies to macOS") +def test_safe_setproctitle_does_not_crash_after_fork(): + """Verify our safe_setproctitle wrapper doesn't crash after fork.""" + pid = os.fork() + + if pid == 0: + # Child process - use our safe wrapper + try: + safe_setproctitle("test title") + os._exit(0) # Success + except Exception: + os._exit(1) # Failed with exception + else: + # Parent process + _, status = os.waitpid(pid, 0) + + if os.WIFSIGNALED(status): + exit_signal = os.WTERMSIG(status) + pytest.fail(f"safe_setproctitle crashed with signal {exit_signal}! The workaround is not working.") + else: + exit_code = os.WEXITSTATUS(status) + assert exit_code == 0, f"safe_setproctitle failed with exit code {exit_code}"