Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 45 additions & 5 deletions src/mfcqi/calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ def __init__(
# Cache for metrics to avoid recreating them
self._cached_metrics: dict[str, Any] | None = None
self._cached_codebase: Path | None = None
self.last_metric_statuses: dict[str, dict[str, Any]] = {}

def calculate(self, codebase: Path) -> float:
"""Calculate MFCQI score using geometric mean formula.
Expand All @@ -110,20 +111,23 @@ def calculate(self, codebase: Path) -> float:
MFCQI score between 0.0 and 1.0
"""
if not codebase.exists() or (not codebase.is_dir() and not codebase.is_file()):
self.last_metric_statuses = {}
return 0.0

# Check if codebase has any Python files (excluding .venv, etc.)
py_files = get_python_files(codebase)
if not py_files:
self.last_metric_statuses = {}
return 0.0

# Determine final metrics based on complexity analysis
final_metrics = self._determine_applicable_metrics(codebase)

# Extract and normalize all metrics
normalized_scores = []
self.last_metric_statuses = {}

for _metric_name, metric in final_metrics.items():
for metric_name, metric in final_metrics.items():
try:
# Extract raw metric value
raw_value = metric.extract(codebase)
Expand All @@ -135,10 +139,12 @@ def calculate(self, codebase: Path) -> float:
normalized_value = max(0.0, min(1.0, normalized_value))

normalized_scores.append(normalized_value)
self._record_metric_status(metric_name, "ok", raw_value, normalized_value)

except Exception:
except Exception as e:
# If metric fails, use 0.0 (worst score)
normalized_scores.append(0.0)
self._record_metric_status(metric_name, "failed", None, 0.0, str(e))

# Calculate geometric mean
return self._calculate_geometric_mean(normalized_scores)
Expand Down Expand Up @@ -223,6 +229,7 @@ def get_detailed_metrics(self, codebase: Path) -> dict[str, float]:
Dictionary with metric names and their normalized scores
"""
results = {}
self.last_metric_statuses = {}

if not codebase.exists() or (not codebase.is_dir() and not codebase.is_file()):
# Return zeros for included metrics
Expand All @@ -241,17 +248,40 @@ def get_detailed_metrics(self, codebase: Path) -> dict[str, float]:
normalized_value = metric.normalize(raw_value)
normalized_value = max(0.0, min(1.0, normalized_value))
results[metric_name] = normalized_value
self._record_metric_status(metric_name, "ok", raw_value, normalized_value)
except Exception as e:
import logging

logging.warning(f"Failed to calculate metric {metric_name}: {e}")
results[metric_name] = 0.0
self._record_metric_status(metric_name, "failed", None, 0.0, str(e))

# Calculate overall MFCQI score
results["mfcqi_score"] = self.calculate(codebase)
results["mfcqi_score"] = self._calculate_geometric_mean(
[score for score in results.values() if isinstance(score, (int, float))]
)

return results

def _record_metric_status(
self,
metric_name: str,
status: str,
raw_value: Any,
normalized_value: float,
error: str | None = None,
) -> None:
"""Record structured status for the most recent metric calculation."""
metric_status: dict[str, Any] = {
"status": status,
"normalized_value": normalized_value,
}
if raw_value is not None:
metric_status["raw_value"] = raw_value
if error:
metric_status["error"] = error
self.last_metric_statuses[metric_name] = metric_status

def get_detailed_metrics_with_tool_outputs(self, codebase: Path) -> dict[str, Any]:
"""Get detailed metrics WITH raw tool outputs for LLM context.

Expand All @@ -274,11 +304,13 @@ def get_detailed_metrics_with_tool_outputs(self, codebase: Path) -> dict[str, An
applicable_metrics = self._determine_applicable_metrics(codebase)

# Calculate each metric AND collect tool outputs
self.last_metric_statuses = {}
for metric_name, metric in applicable_metrics.items():
try:
raw_value = metric.extract(codebase)
normalized_value = metric.normalize(raw_value)
results[metric_name] = max(0.0, min(1.0, normalized_value))
self._record_metric_status(metric_name, "ok", raw_value, results[metric_name])

# Get the actual Bandit issues if available for security metric
if (
Expand All @@ -304,11 +336,19 @@ def get_detailed_metrics_with_tool_outputs(self, codebase: Path) -> dict[str, An
# Log metric extraction failure (graceful degradation to 0.0)
logger.debug(f"Metric '{metric_name}' extraction failed: {e}. Using 0.0")
results[metric_name] = 0.0
self._record_metric_status(metric_name, "failed", None, 0.0, str(e))

# Calculate overall score
mfcqi_score = self.calculate(codebase)
mfcqi_score = self._calculate_geometric_mean(
[score for score in results.values() if isinstance(score, (int, float))]
)

return {"mfcqi_score": mfcqi_score, "metrics": results, "tool_outputs": tool_outputs}
return {
"mfcqi_score": mfcqi_score,
"metrics": results,
"tool_outputs": tool_outputs,
"metric_statuses": self.last_metric_statuses,
}

def _get_complex_functions(self, codebase: Path, limit: int = 10) -> list[dict[str, Any]]:
"""Get the most complex functions in the codebase.
Expand Down
33 changes: 29 additions & 4 deletions src/mfcqi/cli/commands/analyze_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,12 @@ def calculate_metrics(
detailed_data = calculator.get_detailed_metrics_with_tool_outputs(paths[0])
detailed_metrics = detailed_data.get("metrics", {})
detailed_metrics["mfcqi_score"] = detailed_data.get("mfcqi_score", 0.0)
detailed_metrics["_metric_statuses"] = detailed_data.get("metric_statuses", {})
tool_outputs = detailed_data.get("tool_outputs", {})
else:
progress.update(task, description="📊 Calculating metrics...")
detailed_metrics = calculator.get_detailed_metrics(paths[0])
detailed_metrics = dict(calculator.get_detailed_metrics(paths[0]))
detailed_metrics["_metric_statuses"] = getattr(calculator, "last_metric_statuses", {})
tool_outputs = {}

elapsed = time.time() - start_time
Expand Down Expand Up @@ -89,12 +91,16 @@ def _calculate_metrics_for_multiple_paths(
detailed_data = calculator.get_detailed_metrics_with_tool_outputs(current_path)
metrics = detailed_data.get("metrics", {})
metrics["mfcqi_score"] = detailed_data.get("mfcqi_score", 0.0)
metrics["_metric_statuses"] = detailed_data.get("metric_statuses", {})
_merge_tool_outputs(merged_tool_outputs, detailed_data.get("tool_outputs", {}))
else:
metrics = calculator.get_detailed_metrics(current_path)
metrics = dict(calculator.get_detailed_metrics(current_path))
metrics["_metric_statuses"] = getattr(calculator, "last_metric_statuses", {})
metric_sets.append(metrics)

return _average_metric_sets(metric_sets), merged_tool_outputs
averaged_metrics = _average_metric_sets(metric_sets)
averaged_metrics["_metric_statuses"] = _merge_metric_statuses(metric_sets)
return averaged_metrics, merged_tool_outputs


def _average_metric_sets(metric_sets: Sequence[dict[str, Any]]) -> dict[str, Any]:
Expand All @@ -114,6 +120,20 @@ def _average_metric_sets(metric_sets: Sequence[dict[str, Any]]) -> dict[str, Any
return averaged


def _merge_metric_statuses(metric_sets: Sequence[dict[str, Any]]) -> dict[str, list[Any]]:
"""Merge metric statuses collected from separate path analyses."""
merged: dict[str, list[Any]] = {}

for metrics in metric_sets:
statuses = metrics.get("_metric_statuses", {})
if not isinstance(statuses, dict):
continue
for metric_name, status in statuses.items():
merged.setdefault(metric_name, []).append(status)

return merged


def _merge_tool_outputs(target: dict[str, Any], source: dict[str, Any]) -> None:
"""Merge tool outputs collected from separate path analyses."""
for key, value in source.items():
Expand Down Expand Up @@ -176,7 +196,12 @@ def prepare_analysis_result(detailed_metrics: dict[str, Any]) -> dict[str, Any]:

return {
"mfcqi_score": cqi_score,
"metric_scores": {k: v for k, v in detailed_metrics.items() if k != "mfcqi_score"},
"metric_scores": {
k: v
for k, v in detailed_metrics.items()
if k != "mfcqi_score" and not k.startswith("_")
},
"metric_statuses": detailed_metrics.get("_metric_statuses", {}),
"diagnostics": [],
"recommendations": [],
"model_used": "metrics-only",
Expand Down
1 change: 1 addition & 0 deletions src/mfcqi/cli/utils/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def format_json_output(analysis_result: dict[str, Any]) -> dict[str, Any]:
"metrics": analysis_result.get("metric_scores", {}),
"recommendations": analysis_result.get("recommendations", []),
"model_used": analysis_result.get("model_used", "metrics-only"),
"metric_statuses": analysis_result.get("metric_statuses", {}),
"diagnostics_count": len(analysis_result.get("diagnostics", [])),
"timestamp": analysis_result.get("timestamp"),
"version": __version__,
Expand Down
4 changes: 4 additions & 0 deletions tests/test_mfcqi_calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,8 @@ def test():
# Should still return a score (0.0 for failed metric)
result = calculator.calculate(Path(tmpdir))
assert 0.0 <= result <= 1.0
assert calculator.last_metric_statuses["cyclomatic_complexity"]["status"] == "failed"
assert "Test error" in calculator.last_metric_statuses["cyclomatic_complexity"]["error"]


def test_get_detailed_metrics_invalid_codebase():
Expand Down Expand Up @@ -499,6 +501,8 @@ def test():
assert isinstance(result, dict)
assert "maintainability_index" in result
assert result["maintainability_index"] == 0.0
assert calculator.last_metric_statuses["maintainability_index"]["status"] == "failed"
assert "Test" in calculator.last_metric_statuses["maintainability_index"]["error"]


def test_paradigm_detection_exception_falls_back_to_complexity():
Expand Down
5 changes: 5 additions & 0 deletions tests/test_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ def test_format_json_output():
"recommendations": ["Improve documentation", "Reduce complexity"],
"model_used": "claude-3-5-sonnet",
"diagnostics": [{"severity": "warning", "message": "Test warning"}],
"metric_statuses": {
"cyclomatic_complexity": {"status": "ok", "raw_value": 1.0},
"maintainability_index": {"status": "failed", "error": "tool failed"},
},
"timestamp": "2025-01-01T00:00:00",
}

Expand All @@ -27,6 +31,7 @@ def test_format_json_output():
assert result["recommendations"] == analysis_result["recommendations"]
assert result["model_used"] == "claude-3-5-sonnet"
assert result["diagnostics_count"] == 1
assert result["metric_statuses"] == analysis_result["metric_statuses"]
assert result["timestamp"] == "2025-01-01T00:00:00"
assert "version" in result

Expand Down
Loading