From 1179d6ef470158bd4147c29ecb717de29bce485e Mon Sep 17 00:00:00 2001 From: Brian Sam-Bodden Date: Sat, 16 May 2026 13:30:42 -0700 Subject: [PATCH] fix(metrics): expose calculation status --- src/mfcqi/calculator.py | 50 ++++++++++++++++++++--- src/mfcqi/cli/commands/analyze_helpers.py | 33 +++++++++++++-- src/mfcqi/cli/utils/output.py | 1 + tests/test_mfcqi_calculator.py | 4 ++ tests/test_output.py | 5 +++ 5 files changed, 84 insertions(+), 9 deletions(-) diff --git a/src/mfcqi/calculator.py b/src/mfcqi/calculator.py index 0b922ef..46f4b1c 100644 --- a/src/mfcqi/calculator.py +++ b/src/mfcqi/calculator.py @@ -99,6 +99,7 @@ def __init__( # Cache for metrics to avoid recreating them self._cached_metrics: dict[str, Any] | None = None self._cached_codebase: Path | None = None + self.last_metric_statuses: dict[str, dict[str, Any]] = {} def calculate(self, codebase: Path) -> float: """Calculate MFCQI score using geometric mean formula. @@ -110,11 +111,13 @@ def calculate(self, codebase: Path) -> float: MFCQI score between 0.0 and 1.0 """ if not codebase.exists() or (not codebase.is_dir() and not codebase.is_file()): + self.last_metric_statuses = {} return 0.0 # Check if codebase has any Python files (excluding .venv, etc.) py_files = get_python_files(codebase) if not py_files: + self.last_metric_statuses = {} return 0.0 # Determine final metrics based on complexity analysis @@ -122,8 +125,9 @@ def calculate(self, codebase: Path) -> float: # Extract and normalize all metrics normalized_scores = [] + self.last_metric_statuses = {} - for _metric_name, metric in final_metrics.items(): + for metric_name, metric in final_metrics.items(): try: # Extract raw metric value raw_value = metric.extract(codebase) @@ -135,10 +139,12 @@ def calculate(self, codebase: Path) -> float: normalized_value = max(0.0, min(1.0, normalized_value)) normalized_scores.append(normalized_value) + self._record_metric_status(metric_name, "ok", raw_value, normalized_value) - except Exception: + except Exception as e: # If metric fails, use 0.0 (worst score) normalized_scores.append(0.0) + self._record_metric_status(metric_name, "failed", None, 0.0, str(e)) # Calculate geometric mean return self._calculate_geometric_mean(normalized_scores) @@ -223,6 +229,7 @@ def get_detailed_metrics(self, codebase: Path) -> dict[str, float]: Dictionary with metric names and their normalized scores """ results = {} + self.last_metric_statuses = {} if not codebase.exists() or (not codebase.is_dir() and not codebase.is_file()): # Return zeros for included metrics @@ -241,17 +248,40 @@ def get_detailed_metrics(self, codebase: Path) -> dict[str, float]: normalized_value = metric.normalize(raw_value) normalized_value = max(0.0, min(1.0, normalized_value)) results[metric_name] = normalized_value + self._record_metric_status(metric_name, "ok", raw_value, normalized_value) except Exception as e: import logging logging.warning(f"Failed to calculate metric {metric_name}: {e}") results[metric_name] = 0.0 + self._record_metric_status(metric_name, "failed", None, 0.0, str(e)) # Calculate overall MFCQI score - results["mfcqi_score"] = self.calculate(codebase) + results["mfcqi_score"] = self._calculate_geometric_mean( + [score for score in results.values() if isinstance(score, (int, float))] + ) return results + def _record_metric_status( + self, + metric_name: str, + status: str, + raw_value: Any, + normalized_value: float, + error: str | None = None, + ) -> None: + """Record structured status for the most recent metric calculation.""" + metric_status: dict[str, Any] = { + "status": status, + "normalized_value": normalized_value, + } + if raw_value is not None: + metric_status["raw_value"] = raw_value + if error: + metric_status["error"] = error + self.last_metric_statuses[metric_name] = metric_status + def get_detailed_metrics_with_tool_outputs(self, codebase: Path) -> dict[str, Any]: """Get detailed metrics WITH raw tool outputs for LLM context. @@ -274,11 +304,13 @@ def get_detailed_metrics_with_tool_outputs(self, codebase: Path) -> dict[str, An applicable_metrics = self._determine_applicable_metrics(codebase) # Calculate each metric AND collect tool outputs + self.last_metric_statuses = {} for metric_name, metric in applicable_metrics.items(): try: raw_value = metric.extract(codebase) normalized_value = metric.normalize(raw_value) results[metric_name] = max(0.0, min(1.0, normalized_value)) + self._record_metric_status(metric_name, "ok", raw_value, results[metric_name]) # Get the actual Bandit issues if available for security metric if ( @@ -304,11 +336,19 @@ def get_detailed_metrics_with_tool_outputs(self, codebase: Path) -> dict[str, An # Log metric extraction failure (graceful degradation to 0.0) logger.debug(f"Metric '{metric_name}' extraction failed: {e}. Using 0.0") results[metric_name] = 0.0 + self._record_metric_status(metric_name, "failed", None, 0.0, str(e)) # Calculate overall score - mfcqi_score = self.calculate(codebase) + mfcqi_score = self._calculate_geometric_mean( + [score for score in results.values() if isinstance(score, (int, float))] + ) - return {"mfcqi_score": mfcqi_score, "metrics": results, "tool_outputs": tool_outputs} + return { + "mfcqi_score": mfcqi_score, + "metrics": results, + "tool_outputs": tool_outputs, + "metric_statuses": self.last_metric_statuses, + } def _get_complex_functions(self, codebase: Path, limit: int = 10) -> list[dict[str, Any]]: """Get the most complex functions in the codebase. diff --git a/src/mfcqi/cli/commands/analyze_helpers.py b/src/mfcqi/cli/commands/analyze_helpers.py index e37b693..80e2c6a 100644 --- a/src/mfcqi/cli/commands/analyze_helpers.py +++ b/src/mfcqi/cli/commands/analyze_helpers.py @@ -57,10 +57,12 @@ def calculate_metrics( detailed_data = calculator.get_detailed_metrics_with_tool_outputs(paths[0]) detailed_metrics = detailed_data.get("metrics", {}) detailed_metrics["mfcqi_score"] = detailed_data.get("mfcqi_score", 0.0) + detailed_metrics["_metric_statuses"] = detailed_data.get("metric_statuses", {}) tool_outputs = detailed_data.get("tool_outputs", {}) else: progress.update(task, description="📊 Calculating metrics...") - detailed_metrics = calculator.get_detailed_metrics(paths[0]) + detailed_metrics = dict(calculator.get_detailed_metrics(paths[0])) + detailed_metrics["_metric_statuses"] = getattr(calculator, "last_metric_statuses", {}) tool_outputs = {} elapsed = time.time() - start_time @@ -89,12 +91,16 @@ def _calculate_metrics_for_multiple_paths( detailed_data = calculator.get_detailed_metrics_with_tool_outputs(current_path) metrics = detailed_data.get("metrics", {}) metrics["mfcqi_score"] = detailed_data.get("mfcqi_score", 0.0) + metrics["_metric_statuses"] = detailed_data.get("metric_statuses", {}) _merge_tool_outputs(merged_tool_outputs, detailed_data.get("tool_outputs", {})) else: - metrics = calculator.get_detailed_metrics(current_path) + metrics = dict(calculator.get_detailed_metrics(current_path)) + metrics["_metric_statuses"] = getattr(calculator, "last_metric_statuses", {}) metric_sets.append(metrics) - return _average_metric_sets(metric_sets), merged_tool_outputs + averaged_metrics = _average_metric_sets(metric_sets) + averaged_metrics["_metric_statuses"] = _merge_metric_statuses(metric_sets) + return averaged_metrics, merged_tool_outputs def _average_metric_sets(metric_sets: Sequence[dict[str, Any]]) -> dict[str, Any]: @@ -114,6 +120,20 @@ def _average_metric_sets(metric_sets: Sequence[dict[str, Any]]) -> dict[str, Any return averaged +def _merge_metric_statuses(metric_sets: Sequence[dict[str, Any]]) -> dict[str, list[Any]]: + """Merge metric statuses collected from separate path analyses.""" + merged: dict[str, list[Any]] = {} + + for metrics in metric_sets: + statuses = metrics.get("_metric_statuses", {}) + if not isinstance(statuses, dict): + continue + for metric_name, status in statuses.items(): + merged.setdefault(metric_name, []).append(status) + + return merged + + def _merge_tool_outputs(target: dict[str, Any], source: dict[str, Any]) -> None: """Merge tool outputs collected from separate path analyses.""" for key, value in source.items(): @@ -176,7 +196,12 @@ def prepare_analysis_result(detailed_metrics: dict[str, Any]) -> dict[str, Any]: return { "mfcqi_score": cqi_score, - "metric_scores": {k: v for k, v in detailed_metrics.items() if k != "mfcqi_score"}, + "metric_scores": { + k: v + for k, v in detailed_metrics.items() + if k != "mfcqi_score" and not k.startswith("_") + }, + "metric_statuses": detailed_metrics.get("_metric_statuses", {}), "diagnostics": [], "recommendations": [], "model_used": "metrics-only", diff --git a/src/mfcqi/cli/utils/output.py b/src/mfcqi/cli/utils/output.py index 3cb3d6b..fea4817 100644 --- a/src/mfcqi/cli/utils/output.py +++ b/src/mfcqi/cli/utils/output.py @@ -37,6 +37,7 @@ def format_json_output(analysis_result: dict[str, Any]) -> dict[str, Any]: "metrics": analysis_result.get("metric_scores", {}), "recommendations": analysis_result.get("recommendations", []), "model_used": analysis_result.get("model_used", "metrics-only"), + "metric_statuses": analysis_result.get("metric_statuses", {}), "diagnostics_count": len(analysis_result.get("diagnostics", [])), "timestamp": analysis_result.get("timestamp"), "version": __version__, diff --git a/tests/test_mfcqi_calculator.py b/tests/test_mfcqi_calculator.py index 7da9eb7..c8a7eb7 100644 --- a/tests/test_mfcqi_calculator.py +++ b/tests/test_mfcqi_calculator.py @@ -438,6 +438,8 @@ def test(): # Should still return a score (0.0 for failed metric) result = calculator.calculate(Path(tmpdir)) assert 0.0 <= result <= 1.0 + assert calculator.last_metric_statuses["cyclomatic_complexity"]["status"] == "failed" + assert "Test error" in calculator.last_metric_statuses["cyclomatic_complexity"]["error"] def test_get_detailed_metrics_invalid_codebase(): @@ -499,6 +501,8 @@ def test(): assert isinstance(result, dict) assert "maintainability_index" in result assert result["maintainability_index"] == 0.0 + assert calculator.last_metric_statuses["maintainability_index"]["status"] == "failed" + assert "Test" in calculator.last_metric_statuses["maintainability_index"]["error"] def test_paradigm_detection_exception_falls_back_to_complexity(): diff --git a/tests/test_output.py b/tests/test_output.py index defb61e..7d1f8e8 100644 --- a/tests/test_output.py +++ b/tests/test_output.py @@ -17,6 +17,10 @@ def test_format_json_output(): "recommendations": ["Improve documentation", "Reduce complexity"], "model_used": "claude-3-5-sonnet", "diagnostics": [{"severity": "warning", "message": "Test warning"}], + "metric_statuses": { + "cyclomatic_complexity": {"status": "ok", "raw_value": 1.0}, + "maintainability_index": {"status": "failed", "error": "tool failed"}, + }, "timestamp": "2025-01-01T00:00:00", } @@ -27,6 +31,7 @@ def test_format_json_output(): assert result["recommendations"] == analysis_result["recommendations"] assert result["model_used"] == "claude-3-5-sonnet" assert result["diagnostics_count"] == 1 + assert result["metric_statuses"] == analysis_result["metric_statuses"] assert result["timestamp"] == "2025-01-01T00:00:00" assert "version" in result