From 8e6dc1fbe66cbba4a8cbaf97b77a1f665ef80164 Mon Sep 17 00:00:00 2001 From: Brian Sam-Bodden Date: Sat, 16 May 2026 10:44:01 -0700 Subject: [PATCH] feat(cli): support multiple analyze paths --- src/mfcqi/cli/commands/analyze.py | 41 ++++++++++++-- src/mfcqi/cli/commands/analyze_helpers.py | 65 +++++++++++++++++++++-- tests/integration/test_cli_integration.py | 48 +++++++++++++++++ 3 files changed, 145 insertions(+), 9 deletions(-) diff --git a/src/mfcqi/cli/commands/analyze.py b/src/mfcqi/cli/commands/analyze.py index 7eb34f7..948ead9 100644 --- a/src/mfcqi/cli/commands/analyze.py +++ b/src/mfcqi/cli/commands/analyze.py @@ -21,8 +21,32 @@ console = Console() +def _parse_analysis_paths(raw_paths: tuple[str, ...]) -> list[Path]: + """Expand comma-separated CLI path arguments and validate each path.""" + paths: list[Path] = [] + for raw_path in raw_paths: + for path_part in raw_path.split(","): + path_text = path_part.strip() + if path_text: + path = Path(path_text) + if not path.exists(): + raise click.BadParameter(f"Path does not exist: {path_text}", param_hint="PATH") + paths.append(path) + + if not paths: + raise click.BadParameter("At least one path is required", param_hint="PATH") + + return paths + + +def _quality_gate_config_root(paths: list[Path]) -> Path: + """Choose a stable location for resolving quality gate config files.""" + first_path = paths[0] + return first_path if first_path.is_dir() else first_path.parent + + @click.command() -@click.argument("path", type=click.Path(exists=True, path_type=Path)) +@click.argument("paths", nargs=-1, type=str, required=True) @click.option( "--model", help="Specific model to use (e.g., claude-3-5-sonnet, gpt-4o, ollama:codellama:7b)" ) @@ -49,7 +73,7 @@ @click.pass_context def analyze( ctx: click.Context, - path: Path, + paths: tuple[str, ...], model: str | None, provider: str | None, skip_llm: bool, @@ -78,11 +102,18 @@ def analyze( llm_handler = LLMHandler(config_manager, ollama_endpoint) calculator = MFCQICalculator() + analysis_paths = _parse_analysis_paths(paths) + analysis_target: Path | list[Path] = ( + analysis_paths[0] if len(analysis_paths) == 1 else analysis_paths + ) + analysis_path_label = ( + str(analysis_paths[0]) if len(analysis_paths) == 1 else ", ".join(map(str, analysis_paths)) + ) # Calculate base metrics try: detailed_metrics, tool_outputs, _elapsed = calculate_metrics( - path, + analysis_target, calculator, need_tool_outputs=not should_skip_llm, silent=silent, @@ -99,7 +130,7 @@ def analyze( if not should_skip_llm: try: llm_result = get_llm_recommendations( - str(path), + analysis_path_label, detailed_metrics, tool_outputs, llm_handler, @@ -133,7 +164,7 @@ def analyze( ) # Find quality gate config - config_path = find_quality_gate_config(path) + config_path = find_quality_gate_config(_quality_gate_config_root(analysis_paths)) if config_path: gate_config = QualityGateConfig.from_file(config_path) else: diff --git a/src/mfcqi/cli/commands/analyze_helpers.py b/src/mfcqi/cli/commands/analyze_helpers.py index 9ef07e8..e37b693 100644 --- a/src/mfcqi/cli/commands/analyze_helpers.py +++ b/src/mfcqi/cli/commands/analyze_helpers.py @@ -1,6 +1,7 @@ """Helper functions for the analyze command.""" import json +from collections.abc import Sequence from pathlib import Path from typing import Any @@ -20,7 +21,7 @@ def calculate_metrics( - path: Path, + path: Path | Sequence[Path], calculator: MFCQICalculator, need_tool_outputs: bool, silent: bool, @@ -42,18 +43,24 @@ def calculate_metrics( task = progress.add_task("🔍 Analyzing codebase...", total=None) start_time = time.time() - if need_tool_outputs: + paths = [path] if isinstance(path, Path) else list(path) + + if len(paths) > 1: + detailed_metrics, tool_outputs = _calculate_metrics_for_multiple_paths( + paths, calculator, need_tool_outputs + ) + elif need_tool_outputs: progress.update( task, description="📊 Calculating metrics...", ) - detailed_data = calculator.get_detailed_metrics_with_tool_outputs(path) + detailed_data = calculator.get_detailed_metrics_with_tool_outputs(paths[0]) detailed_metrics = detailed_data.get("metrics", {}) detailed_metrics["mfcqi_score"] = detailed_data.get("mfcqi_score", 0.0) tool_outputs = detailed_data.get("tool_outputs", {}) else: progress.update(task, description="📊 Calculating metrics...") - detailed_metrics = calculator.get_detailed_metrics(path) + detailed_metrics = calculator.get_detailed_metrics(paths[0]) tool_outputs = {} elapsed = time.time() - start_time @@ -68,6 +75,56 @@ def calculate_metrics( return detailed_metrics, tool_outputs, elapsed +def _calculate_metrics_for_multiple_paths( + paths: Sequence[Path], + calculator: MFCQICalculator, + need_tool_outputs: bool, +) -> tuple[dict[str, Any], dict[str, Any]]: + """Calculate metrics for multiple paths and average shared numeric results.""" + metric_sets: list[dict[str, Any]] = [] + merged_tool_outputs: dict[str, Any] = {} + + for current_path in paths: + if need_tool_outputs: + detailed_data = calculator.get_detailed_metrics_with_tool_outputs(current_path) + metrics = detailed_data.get("metrics", {}) + metrics["mfcqi_score"] = detailed_data.get("mfcqi_score", 0.0) + _merge_tool_outputs(merged_tool_outputs, detailed_data.get("tool_outputs", {})) + else: + metrics = calculator.get_detailed_metrics(current_path) + metric_sets.append(metrics) + + return _average_metric_sets(metric_sets), merged_tool_outputs + + +def _average_metric_sets(metric_sets: Sequence[dict[str, Any]]) -> dict[str, Any]: + """Average numeric metric values across multiple analysis results.""" + averaged: dict[str, Any] = {} + metric_names = {name for metrics in metric_sets for name in metrics} + + for metric_name in metric_names: + values = [ + metrics[metric_name] + for metrics in metric_sets + if isinstance(metrics.get(metric_name), (int, float)) + ] + if values: + averaged[metric_name] = sum(values) / len(values) + + return averaged + + +def _merge_tool_outputs(target: dict[str, Any], source: dict[str, Any]) -> None: + """Merge tool outputs collected from separate path analyses.""" + for key, value in source.items(): + if isinstance(value, list): + target.setdefault(key, []).extend(value) + elif isinstance(value, (int, float)): + target[key] = max(target.get(key, value), value) + else: + target[key] = value + + def get_llm_recommendations( path: str, detailed_metrics: dict[str, Any], diff --git a/tests/integration/test_cli_integration.py b/tests/integration/test_cli_integration.py index f10bf97..6a41bf9 100644 --- a/tests/integration/test_cli_integration.py +++ b/tests/integration/test_cli_integration.py @@ -2,6 +2,7 @@ Integration tests for CLI with different LLM providers. """ +import json import tempfile from pathlib import Path @@ -76,6 +77,53 @@ def test_analyze_json_output(self): assert result.exit_code == 0 # Should produce valid JSON output + def test_analyze_comma_separated_paths_json_output(self): + """Test analyze supports comma-separated path arguments.""" + first_file = self.temp_dir / "first.py" + second_file = self.temp_dir / "second.py" + first_file.write_text("def first():\n return 1\n") + second_file.write_text("def second():\n return 2\n") + + result = self.runner.invoke( + cli, + [ + "analyze", + f"{first_file},{second_file}", + "--skip-llm", + "--format", + "json", + ], + ) + + assert result.exit_code == 0 + output = json.loads(result.output) + assert "mfcqi_score" in output + assert "metrics" in output + + def test_analyze_space_separated_paths_json_output(self): + """Test analyze supports multiple path arguments.""" + first_file = self.temp_dir / "first.py" + second_file = self.temp_dir / "second.py" + first_file.write_text("def first():\n return 1\n") + second_file.write_text("def second():\n return 2\n") + + result = self.runner.invoke( + cli, + [ + "analyze", + str(first_file), + str(second_file), + "--skip-llm", + "--format", + "json", + ], + ) + + assert result.exit_code == 0 + output = json.loads(result.output) + assert "mfcqi_score" in output + assert "metrics" in output + def test_analyze_with_output_file(self): """Test analyze command with output file.""" output_file = self.temp_dir / "report.json"