From e463da6ea2da9782e47ed4b152d8be1db68925d7 Mon Sep 17 00:00:00 2001
From: sungdark
Date: Thu, 26 Feb 2026 07:37:11 +0000
Subject: [PATCH 1/2] feat: add multi-dimensional quality scorer with tests

---
 quality_scorer.py      | 148 +++++++++++++++++++++++++++++++++++++++++
 test_quality_scorer.py |  37 +++++++++++
 2 files changed, 185 insertions(+)
 create mode 100644 quality_scorer.py
 create mode 100644 test_quality_scorer.py

diff --git a/quality_scorer.py b/quality_scorer.py
new file mode 100644
index 0000000..8565322
--- /dev/null
+++ b/quality_scorer.py
@@ -0,0 +1,148 @@
+#!/usr/bin/env python3
+from __future__ import annotations
+
+import json
+import re
+from dataclasses import dataclass
+from typing import Any, Dict, List
+
+WEIGHTS = {
+    "completeness": 0.30,
+    "format_compliance": 0.20,
+    "coverage": 0.25,
+    "clarity": 0.15,
+    "validity": 0.10,
+}
+
+
+@dataclass
+class ScoreResult:
+    weighted_score: float
+    quality_rating: str
+    scores: Dict[str, float]
+    feedback: List[str]
+    pass_threshold: bool
+
+    def to_dict(self) -> Dict[str, Any]:
+        return {
+            "weighted_score": round(self.weighted_score, 4),
+            "quality_rating": self.quality_rating,
+            "scores": {k: round(v, 4) for k, v in self.scores.items()},
+            "feedback": self.feedback,
+            "pass_threshold": self.pass_threshold,
+        }
+
+
+def _detect_format(text: str) -> str:
+    t = text.strip()
+    if not t:
+        return "text"
+    if (t.startswith("{") and t.endswith("}")) or (t.startswith("[") and t.endswith("]")):
+        try:
+            json.loads(t)
+            return "json"
+        except Exception:
+            pass
+    if "```" in t or re.search(r"\b(def|class|function|const|let|var|import)\b", t):
+        return "code"
+    if re.search(r"(^#\s)|(^-\s)|(^\d+\.\s)", t, re.M):
+        return "markdown"
+    return "text"
+
+
+def _clamp(x: float) -> float:
+    return max(0.0, min(1.0, x))
+
+
+def _tokenize(text: str) -> List[str]:
+    return re.findall(r"[A-Za-z0-9_]+", text.lower())
+
+
+def score_submission(submission: str, rubric_keywords: List[str] | None = None, threshold: float = 0.7) -> Dict[str, Any]:
+    rubric_keywords = rubric_keywords or ["summary", "steps", "result"]
+    fmt = _detect_format(submission)
+    txt = submission.strip()
+    tokens = _tokenize(txt)
+
+    # Completeness: length and structure markers
+    completeness = _clamp((len(tokens) / 120.0))
+    if fmt in ("json", "markdown", "code"):
+        completeness = _clamp(completeness + 0.15)
+
+    # Format compliance: based on successful parse/structure patterns
+    if fmt == "json":
+        try:
+            json.loads(txt)
+            format_compliance = 0.95
+        except Exception:
+            format_compliance = 0.2
+    elif fmt == "markdown":
+        format_compliance = 0.8 if re.search(r"(^#\s)|(^-\s)|(^\d+\.\s)", txt, re.M) else 0.5
+    elif fmt == "code":
+        format_compliance = 0.8 if re.search(r"[{}();]|\n\s{2,}\w", txt) else 0.55
+    else:
+        format_compliance = 0.7
+
+    # Coverage: keyword hit ratio
+    hits = sum(1 for k in rubric_keywords if k.lower() in txt.lower())
+    coverage = _clamp(hits / max(1, len(rubric_keywords)))
+
+    # Clarity: sentence/word balance and low symbol noise
+    punct = len(re.findall(r"[.!?]", txt))
+    symbol_noise = len(re.findall(r"[^\w\s\.,;:!?\-\(\)\[\]\{\}'\"/\\]", txt))
+    clarity = 0.6 + min(0.25, punct / 20.0) - min(0.35, symbol_noise / max(1, len(txt)) * 5)
+    clarity = _clamp(clarity)
+
+    # Validity: basic consistency and parseability clues
+    validity = 0.65
+    if fmt == "json":
+        try:
+            parsed = json.loads(txt)
+            validity = 0.9 if isinstance(parsed, (dict, list)) else 0.75
+        except Exception:
+            validity = 0.1
+    elif fmt == "code":
+        brackets_ok = txt.count("(") == txt.count(")") and txt.count("{") == txt.count("}")
+        validity = 0.8 if brackets_ok else 0.45
+
+    scores = {
+        "completeness": round(completeness, 4),
+        "format_compliance": round(format_compliance, 4),
+        "coverage": round(coverage, 4),
+        "clarity": round(clarity, 4),
+        "validity": round(validity, 4),
+    }
+
+    weighted = sum(scores[k] * WEIGHTS[k] for k in WEIGHTS)
+
+    if weighted >= 0.85:
+        rating = "excellent"
+    elif weighted >= 0.7:
+        rating = "good"
+    elif weighted >= 0.5:
+        rating = "fair"
+    else:
+        rating = "poor"
+
+    fb = []
+    for k, v in scores.items():
+        if v < 0.5:
+            fb.append(f"Improve {k.replace('_', ' ')}.")
+    if not fb:
+        fb.append("Strong overall quality across rubric dimensions.")
+
+    return ScoreResult(
+        weighted_score=weighted,
+        quality_rating=rating,
+        scores=scores,
+        feedback=fb,
+        pass_threshold=weighted >= threshold,
+    ).to_dict()
+
+
+def benchmark_100(submissions: List[str]) -> float:
+    import time
+    start = time.time()
+    for s in submissions[:100]:
+        score_submission(s)
+    return time.time() - start
diff --git a/test_quality_scorer.py b/test_quality_scorer.py
new file mode 100644
index 0000000..8c9377e
--- /dev/null
+++ b/test_quality_scorer.py
@@ -0,0 +1,37 @@
+#!/usr/bin/env python3
+import json
+import unittest
+
+from quality_scorer import score_submission, benchmark_100
+
+
+class TestQualityScorer(unittest.TestCase):
+    def test_output_schema(self):
+        out = score_submission('{"summary":"ok","steps":[1,2],"result":"done"}')
+        self.assertIn("weighted_score", out)
+        self.assertIn("quality_rating", out)
+        self.assertIn("scores", out)
+        self.assertIn("feedback", out)
+        self.assertIn("pass_threshold", out)
+        self.assertEqual(set(out["scores"].keys()), {"completeness", "format_compliance", "coverage", "clarity", "validity"})
+
+    def test_formats(self):
+        samples = [
+            '{"summary":"hello","result":"x"}',
+            '# Title\n- step 1\n- step 2\nresult done',
+            'def run(x):\n    return x+1',
+            'plain text summary steps and result',
+        ]
+        for s in samples:
+            out = score_submission(s)
+            self.assertGreaterEqual(out["weighted_score"], 0.0)
+            self.assertLessEqual(out["weighted_score"], 1.0)
+
+    def test_benchmark(self):
+        subs = [json.dumps({"summary": "a", "steps": [1,2,3], "result": "ok"}) for _ in range(100)]
+        sec = benchmark_100(subs)
+        self.assertLess(sec, 10.0)
+
+
+if __name__ == "__main__":
+    unittest.main()
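For reviewers trying PATCH 1/2 locally, here is a minimal usage sketch of the new scorer. It assumes quality_scorer.py from the diff above is importable from the working directory; the sample payload is hypothetical:

    from quality_scorer import score_submission

    # Hypothetical submission that mentions all three default rubric
    # keywords ("summary", "steps", "result").
    out = score_submission('{"summary": "ok", "steps": [1, 2], "result": "done"}')
    print(out["weighted_score"])   # float in [0.0, 1.0], rounded to 4 places
    print(out["quality_rating"])   # "excellent", "good", "fair", or "poor"
    print(out["pass_threshold"])   # True when the score meets the threshold (default 0.7)

The returned dict also carries the per-dimension scores and a feedback list, which test_output_schema asserts on.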
txt.count("{") == txt.count("}") + validity = 0.8 if brackets_ok else 0.45 + + scores = { + "completeness": round(completeness, 4), + "format_compliance": round(format_compliance, 4), + "coverage": round(coverage, 4), + "clarity": round(clarity, 4), + "validity": round(validity, 4), + } + + weighted = sum(scores[k] * WEIGHTS[k] for k in WEIGHTS) + + if weighted >= 0.85: + rating = "excellent" + elif weighted >= 0.7: + rating = "good" + elif weighted >= 0.5: + rating = "fair" + else: + rating = "poor" + + fb = [] + for k, v in scores.items(): + if v < 0.5: + fb.append(f"Improve {k.replace('_', ' ')}.") + if not fb: + fb.append("Strong overall quality across rubric dimensions.") + + return ScoreResult( + weighted_score=weighted, + quality_rating=rating, + scores=scores, + feedback=fb, + pass_threshold=weighted >= threshold, + ).to_dict() + + +def benchmark_100(submissions: List[str]) -> float: + import time + start = time.time() + for s in submissions[:100]: + score_submission(s) + return time.time() - start diff --git a/test_quality_scorer.py b/test_quality_scorer.py new file mode 100644 index 0000000..8c9377e --- /dev/null +++ b/test_quality_scorer.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python3 +import json +import unittest + +from quality_scorer import score_submission, benchmark_100 + + +class TestQualityScorer(unittest.TestCase): + def test_output_schema(self): + out = score_submission('{"summary":"ok","steps":[1,2],"result":"done"}') + self.assertIn("weighted_score", out) + self.assertIn("quality_rating", out) + self.assertIn("scores", out) + self.assertIn("feedback", out) + self.assertIn("pass_threshold", out) + self.assertEqual(set(out["scores"].keys()), {"completeness", "format_compliance", "coverage", "clarity", "validity"}) + + def test_formats(self): + samples = [ + '{"summary":"hello","result":"x"}', + '# Title\n- step 1\n- step 2\nresult done', + 'def run(x):\n return x+1', + 'plain text summary steps and result', + ] + for s in samples: + out = score_submission(s) + self.assertGreaterEqual(out["weighted_score"], 0.0) + self.assertLessEqual(out["weighted_score"], 1.0) + + def test_benchmark(self): + subs = [json.dumps({"summary": "a", "steps": [1,2,3], "result": "ok"}) for _ in range(100)] + sec = benchmark_100(subs) + self.assertLess(sec, 10.0) + + +if __name__ == "__main__": + unittest.main() From de47dc397dbfc2243048971f4e5eddc3f0aa0193 Mon Sep 17 00:00:00 2001 From: sungdark Date: Thu, 26 Feb 2026 09:15:50 +0000 Subject: [PATCH 2/2] refactor: improve scoring config, feedback quality, and gt evaluation helper --- quality_scorer.py | 133 +++++++++++++++++++++++++++++------------ test_quality_scorer.py | 40 ++++++++++++- 2 files changed, 131 insertions(+), 42 deletions(-) diff --git a/quality_scorer.py b/quality_scorer.py index 8565322..6d4f1ce 100644 --- a/quality_scorer.py +++ b/quality_scorer.py @@ -3,10 +3,11 @@ import json import re +import time from dataclasses import dataclass -from typing import Any, Dict, List +from typing import Any, Dict, List, Tuple -WEIGHTS = { +DEFAULT_WEIGHTS = { "completeness": 0.30, "format_compliance": 0.20, "coverage": 0.25, @@ -33,6 +34,14 @@ def to_dict(self) -> Dict[str, Any]: } +def _clamp(x: float) -> float: + return max(0.0, min(1.0, x)) + + +def _tokenize(text: str) -> List[str]: + return re.findall(r"[A-Za-z0-9_]+", text.lower()) + + def _detect_format(text: str) -> str: t = text.strip() if not t: @@ -43,69 +52,64 @@ def _detect_format(text: str) -> str: return "json" except Exception: pass - if "```" in t or 
re.search(r"\b(def|class|function|const|let|var|import)\b", t): + if "```" in t or re.search(r"\b(def|class|function|const|let|var|import|return)\b", t): return "code" if re.search(r"(^#\s)|(^-\s)|(^\d+\.\s)", t, re.M): return "markdown" return "text" -def _clamp(x: float) -> float: - return max(0.0, min(1.0, x)) - - -def _tokenize(text: str) -> List[str]: - return re.findall(r"[A-Za-z0-9_]+", text.lower()) +def _weights_or_default(weights: Dict[str, float] | None) -> Dict[str, float]: + active = dict(DEFAULT_WEIGHTS) + if weights: + active.update(weights) + total = sum(active.values()) or 1.0 + # normalize to 1.0 + return {k: v / total for k, v in active.items()} -def score_submission(submission: str, rubric_keywords: List[str] | None = None, threshold: float = 0.7) -> Dict[str, Any]: - rubric_keywords = rubric_keywords or ["summary", "steps", "result"] +def _score_dimensions(submission: str, rubric_keywords: List[str]) -> Tuple[str, Dict[str, float]]: fmt = _detect_format(submission) txt = submission.strip() tokens = _tokenize(txt) - # Completeness: length and structure markers - completeness = _clamp((len(tokens) / 120.0)) + completeness = _clamp(len(tokens) / 120.0) if fmt in ("json", "markdown", "code"): - completeness = _clamp(completeness + 0.15) + completeness = _clamp(completeness + 0.12) - # Format compliance: based on successful parse/structure patterns if fmt == "json": try: - json.loads(txt) - format_compliance = 0.95 + parsed = json.loads(txt) + format_compliance = 0.95 if isinstance(parsed, (dict, list)) else 0.8 except Exception: format_compliance = 0.2 elif fmt == "markdown": - format_compliance = 0.8 if re.search(r"(^#\s)|(^-\s)|(^\d+\.\s)", txt, re.M) else 0.5 + format_compliance = 0.82 if re.search(r"(^#\s)|(^-\s)|(^\d+\.\s)", txt, re.M) else 0.55 elif fmt == "code": - format_compliance = 0.8 if re.search(r"[{}();]|\n\s{2,}\w", txt) else 0.55 + format_compliance = 0.82 if re.search(r"[{}();]|\n\s{2,}\w", txt) else 0.58 else: - format_compliance = 0.7 + format_compliance = 0.72 - # Coverage: keyword hit ratio hits = sum(1 for k in rubric_keywords if k.lower() in txt.lower()) coverage = _clamp(hits / max(1, len(rubric_keywords))) - # Clarity: sentence/word balance and low symbol noise punct = len(re.findall(r"[.!?]", txt)) symbol_noise = len(re.findall(r"[^\w\s\.,;:!?\-\(\)\[\]\{\}'\"/\\]", txt)) - clarity = 0.6 + min(0.25, punct / 20.0) - min(0.35, symbol_noise / max(1, len(txt)) * 5) + clarity = 0.62 + min(0.22, punct / 20.0) - min(0.30, symbol_noise / max(1, len(txt)) * 5) clarity = _clamp(clarity) - # Validity: basic consistency and parseability clues validity = 0.65 if fmt == "json": try: parsed = json.loads(txt) validity = 0.9 if isinstance(parsed, (dict, list)) else 0.75 except Exception: - validity = 0.1 + validity = 0.12 elif fmt == "code": brackets_ok = txt.count("(") == txt.count(")") and txt.count("{") == txt.count("}") validity = 0.8 if brackets_ok else 0.45 - scores = { + return fmt, { "completeness": round(completeness, 4), "format_compliance": round(format_compliance, 4), "coverage": round(coverage, 4), @@ -113,35 +117,86 @@ def score_submission(submission: str, rubric_keywords: List[str] | None = None, "validity": round(validity, 4), } - weighted = sum(scores[k] * WEIGHTS[k] for k in WEIGHTS) +def _rating(weighted: float) -> str: if weighted >= 0.85: - rating = "excellent" - elif weighted >= 0.7: - rating = "good" - elif weighted >= 0.5: - rating = "fair" - else: - rating = "poor" + return "excellent" + if weighted >= 0.7: + return "good" + if weighted >= 
@@ -28,10 +31,41 @@ def test_formats(self):
             self.assertLessEqual(out["weighted_score"], 1.0)
 
     def test_benchmark(self):
-        subs = [json.dumps({"summary": "a", "steps": [1,2,3], "result": "ok"}) for _ in range(100)]
+        subs = [json.dumps({"summary": "a", "steps": [1, 2, 3], "result": "ok"}) for _ in range(100)]
         sec = benchmark_100(subs)
         self.assertLess(sec, 10.0)
 
+    def test_ground_truth_alignment(self):
+        # 20 samples with expected scores that this heuristic should approximate.
+        dataset = [
+            {"submission": '{"summary":"clear summary","steps":["a","b"],"result":"done"}', "expected_score": 0.84},
+            {"submission": '{"summary":"short","result":"ok"}', "expected_score": 0.68},
+            {"submission": '# Report\n- summary\n- steps\n- result', "expected_score": 0.78},
+            {"submission": 'summary: yes. steps: yes. result: yes.', "expected_score": 0.72},
+            {"submission": 'def run(x):\n    # summary\n    return x', "expected_score": 0.66},
+            {"submission": '{"foo":1}', "expected_score": 0.53},
+            {"submission": 'random words only', "expected_score": 0.38},
+            {"submission": '```python\ndef x():\n    return 1\n```', "expected_score": 0.58},
+            {"submission": '1. summary\n2. steps\n3. result', "expected_score": 0.76},
+            {"submission": '{"summary":"ok","steps":[1,2,3],"result":"final"}', "expected_score": 0.86},
+            {"submission": '{"summary":"ok","steps":[],"result":"done"}', "expected_score": 0.77},
+            {"submission": '### Header\nresult only', "expected_score": 0.54},
+            {"submission": 'summary and steps but no final result', "expected_score": 0.61},
+            {"submission": '{"summary":"detailed","steps":["a","b","c"],"result":"good","notes":"clear"}', "expected_score": 0.88},
+            {"submission": 'def bad(x:\n    return x', "expected_score": 0.44},
+            {"submission": '# title\n- summary\ntext text text\nresult', "expected_score": 0.67},
+            {"submission": '{"summary":"s","steps":["1"],"result":"r","extra":"..."}', "expected_score": 0.79},
+            {"submission": 'plain text with summary steps result and clear ending.', "expected_score": 0.74},
+            {"submission": '{"summary":"none"}', "expected_score": 0.56},
+            {"submission": 'tiny', "expected_score": 0.3},
+        ]
+        metrics = evaluate_against_ground_truth(dataset)
+        self.assertEqual(metrics["count"], 20)
+        # Heuristic baseline keeps average error reasonably small; this can be tightened
+        # if/when a trained calibrator is introduced.
+        self.assertLessEqual(metrics["mae"], 0.15)
+        self.assertGreaterEqual(metrics["within_point_05_ratio"], 0.10)
+
 
 if __name__ == "__main__":
     unittest.main()
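The ground-truth helper added in PATCH 2/2 also works outside the test suite, for example to spot-check a small calibration set. A minimal sketch with a hypothetical two-item dataset (the expected_score values are illustrative, not calibrated):

    from quality_scorer import evaluate_against_ground_truth

    dataset = [
        {"submission": '{"summary": "ok", "steps": [1], "result": "done"}', "expected_score": 0.8},
        {"submission": "tiny", "expected_score": 0.3},
    ]
    metrics = evaluate_against_ground_truth(dataset)
    print(metrics["mae"], metrics["max_error"], metrics["within_point_05_ratio"])

The returned "results" list pairs each expected score with the predicted score and absolute error, which makes per-sample regressions easy to inspect.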