Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 36 additions & 5 deletions src/mfcqi/cli/commands/analyze.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,32 @@
console = Console()


def _parse_analysis_paths(raw_paths: tuple[str, ...]) -> list[Path]:
"""Expand comma-separated CLI path arguments and validate each path."""
paths: list[Path] = []
for raw_path in raw_paths:
for path_part in raw_path.split(","):
path_text = path_part.strip()
if path_text:
path = Path(path_text)
if not path.exists():
raise click.BadParameter(f"Path does not exist: {path_text}", param_hint="PATH")
paths.append(path)

if not paths:
raise click.BadParameter("At least one path is required", param_hint="PATH")

return paths


def _quality_gate_config_root(paths: list[Path]) -> Path:
"""Choose a stable location for resolving quality gate config files."""
first_path = paths[0]
return first_path if first_path.is_dir() else first_path.parent


@click.command()
@click.argument("path", type=click.Path(exists=True, path_type=Path))
@click.argument("paths", nargs=-1, type=str, required=True)
@click.option(
"--model", help="Specific model to use (e.g., claude-3-5-sonnet, gpt-4o, ollama:codellama:7b)"
)
Expand All @@ -49,7 +73,7 @@
@click.pass_context
def analyze(
ctx: click.Context,
path: Path,
paths: tuple[str, ...],
model: str | None,
provider: str | None,
skip_llm: bool,
Expand Down Expand Up @@ -78,11 +102,18 @@ def analyze(
llm_handler = LLMHandler(config_manager, ollama_endpoint)

calculator = MFCQICalculator()
analysis_paths = _parse_analysis_paths(paths)
analysis_target: Path | list[Path] = (
analysis_paths[0] if len(analysis_paths) == 1 else analysis_paths
)
analysis_path_label = (
str(analysis_paths[0]) if len(analysis_paths) == 1 else ", ".join(map(str, analysis_paths))
)

# Calculate base metrics
try:
detailed_metrics, tool_outputs, _elapsed = calculate_metrics(
path,
analysis_target,
calculator,
need_tool_outputs=not should_skip_llm,
silent=silent,
Expand All @@ -99,7 +130,7 @@ def analyze(
if not should_skip_llm:
try:
llm_result = get_llm_recommendations(
str(path),
analysis_path_label,
detailed_metrics,
tool_outputs,
llm_handler,
Expand Down Expand Up @@ -133,7 +164,7 @@ def analyze(
)

# Find quality gate config
config_path = find_quality_gate_config(path)
config_path = find_quality_gate_config(_quality_gate_config_root(analysis_paths))
if config_path:
gate_config = QualityGateConfig.from_file(config_path)
else:
Expand Down
65 changes: 61 additions & 4 deletions src/mfcqi/cli/commands/analyze_helpers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Helper functions for the analyze command."""

import json
from collections.abc import Sequence
from pathlib import Path
from typing import Any

Expand All @@ -20,7 +21,7 @@


def calculate_metrics(
path: Path,
path: Path | Sequence[Path],
calculator: MFCQICalculator,
need_tool_outputs: bool,
silent: bool,
Expand All @@ -42,18 +43,24 @@ def calculate_metrics(
task = progress.add_task("🔍 Analyzing codebase...", total=None)
start_time = time.time()

if need_tool_outputs:
paths = [path] if isinstance(path, Path) else list(path)

if len(paths) > 1:
detailed_metrics, tool_outputs = _calculate_metrics_for_multiple_paths(
paths, calculator, need_tool_outputs
)
elif need_tool_outputs:
progress.update(
task,
description="📊 Calculating metrics...",
)
detailed_data = calculator.get_detailed_metrics_with_tool_outputs(path)
detailed_data = calculator.get_detailed_metrics_with_tool_outputs(paths[0])
detailed_metrics = detailed_data.get("metrics", {})
detailed_metrics["mfcqi_score"] = detailed_data.get("mfcqi_score", 0.0)
tool_outputs = detailed_data.get("tool_outputs", {})
else:
progress.update(task, description="📊 Calculating metrics...")
detailed_metrics = calculator.get_detailed_metrics(path)
detailed_metrics = calculator.get_detailed_metrics(paths[0])
tool_outputs = {}

elapsed = time.time() - start_time
Expand All @@ -68,6 +75,56 @@ def calculate_metrics(
return detailed_metrics, tool_outputs, elapsed


def _calculate_metrics_for_multiple_paths(
paths: Sequence[Path],
calculator: MFCQICalculator,
need_tool_outputs: bool,
) -> tuple[dict[str, Any], dict[str, Any]]:
"""Calculate metrics for multiple paths and average shared numeric results."""
metric_sets: list[dict[str, Any]] = []
merged_tool_outputs: dict[str, Any] = {}

for current_path in paths:
if need_tool_outputs:
detailed_data = calculator.get_detailed_metrics_with_tool_outputs(current_path)
metrics = detailed_data.get("metrics", {})
metrics["mfcqi_score"] = detailed_data.get("mfcqi_score", 0.0)
_merge_tool_outputs(merged_tool_outputs, detailed_data.get("tool_outputs", {}))
else:
metrics = calculator.get_detailed_metrics(current_path)
metric_sets.append(metrics)

return _average_metric_sets(metric_sets), merged_tool_outputs


def _average_metric_sets(metric_sets: Sequence[dict[str, Any]]) -> dict[str, Any]:
"""Average numeric metric values across multiple analysis results."""
averaged: dict[str, Any] = {}
metric_names = {name for metrics in metric_sets for name in metrics}

for metric_name in metric_names:
values = [
metrics[metric_name]
for metrics in metric_sets
if isinstance(metrics.get(metric_name), (int, float))
]
if values:
averaged[metric_name] = sum(values) / len(values)

return averaged


def _merge_tool_outputs(target: dict[str, Any], source: dict[str, Any]) -> None:
"""Merge tool outputs collected from separate path analyses."""
for key, value in source.items():
if isinstance(value, list):
target.setdefault(key, []).extend(value)
elif isinstance(value, (int, float)):
target[key] = max(target.get(key, value), value)
else:
target[key] = value


def get_llm_recommendations(
path: str,
detailed_metrics: dict[str, Any],
Expand Down
48 changes: 48 additions & 0 deletions tests/integration/test_cli_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Integration tests for CLI with different LLM providers.
"""

import json
import tempfile
from pathlib import Path

Expand Down Expand Up @@ -76,6 +77,53 @@ def test_analyze_json_output(self):
assert result.exit_code == 0
# Should produce valid JSON output

def test_analyze_comma_separated_paths_json_output(self):
"""Test analyze supports comma-separated path arguments."""
first_file = self.temp_dir / "first.py"
second_file = self.temp_dir / "second.py"
first_file.write_text("def first():\n return 1\n")
second_file.write_text("def second():\n return 2\n")

result = self.runner.invoke(
cli,
[
"analyze",
f"{first_file},{second_file}",
"--skip-llm",
"--format",
"json",
],
)

assert result.exit_code == 0
output = json.loads(result.output)
assert "mfcqi_score" in output
assert "metrics" in output

def test_analyze_space_separated_paths_json_output(self):
"""Test analyze supports multiple path arguments."""
first_file = self.temp_dir / "first.py"
second_file = self.temp_dir / "second.py"
first_file.write_text("def first():\n return 1\n")
second_file.write_text("def second():\n return 2\n")

result = self.runner.invoke(
cli,
[
"analyze",
str(first_file),
str(second_file),
"--skip-llm",
"--format",
"json",
],
)

assert result.exit_code == 0
output = json.loads(result.output)
assert "mfcqi_score" in output
assert "metrics" in output

def test_analyze_with_output_file(self):
"""Test analyze command with output file."""
output_file = self.temp_dir / "report.json"
Expand Down
Loading