Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
330 changes: 330 additions & 0 deletions quality_scorer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,330 @@
#!/usr/bin/env python3
"""
Multi-Dimensional Quality Scoring Algorithm for Structured Outputs.

Scores structured submissions (JSON, markdown, code, text) against a rubric,
returning a 0–1 weighted score with per-dimension feedback.

Dimensions & Weights:
- Completeness 0.30
- Format Compliance 0.20
- Coverage 0.25
- Clarity 0.15
- Validity 0.10
"""

from __future__ import annotations

import json
import re
import math
from dataclasses import dataclass, field, asdict
from typing import Any, Optional

# ── Weights ─────────────────────────────────────────────────────────────────

# Per-dimension weights for the aggregate score; they sum to 1.0 so the
# weighted total stays inside [0, 1].
WEIGHTS = dict(
    completeness=0.30,
    format_compliance=0.20,
    coverage=0.25,
    clarity=0.15,
    validity=0.10,
)

# (minimum weighted score, label) pairs, ordered highest threshold first;
# the first threshold the score meets determines the quality rating.
QUALITY_BANDS = [
    (0.90, "excellent"),
    (0.75, "good"),
    (0.55, "acceptable"),
    (0.35, "poor"),
    (0.00, "rejected"),
]

# Default minimum weighted score considered a passing submission.
DEFAULT_PASS_THRESHOLD = 0.55


# ── Format Detection ────────────────────────────────────────────────────────

def detect_format(content: str) -> str:
    """Auto-detect submission format: json, markdown, code, or text."""
    text = content.strip()

    # JSON: must start like a JSON container *and* actually parse.
    if text[:1] in ("{", "["):
        try:
            json.loads(text)
        except (json.JSONDecodeError, ValueError):
            pass
        else:
            return "json"

    # Markdown: require at least two of {ATX heading, list bullet, code fence}.
    markdown_hits = 0
    if re.search(r"^#{1,6}\s", text, re.MULTILINE):
        markdown_hits += 1
    if re.search(r"^[-*+]\s", text, re.MULTILINE):
        markdown_hits += 1
    if "```" in text:
        markdown_hits += 1
    if markdown_hits >= 2:
        return "markdown"

    # Code: a definition/import keyword at line start, or any line ending in
    # a brace or semicolon.
    keyword_hit = re.search(
        r"^(def |class |import |from |function |const |let |var |#include)",
        text,
        re.MULTILINE,
    )
    if keyword_hit or re.search(r"[{};]$", text, re.MULTILINE):
        return "code"

    return "text"


# ── Rubric ──────────────────────────────────────────────────────────────────

@dataclass
class Rubric:
    """Describes what a correct submission looks like.

    All fields are optional; an empty Rubric produces lenient scoring.
    Field and keyword matching in the scorers is case-insensitive
    substring search against the raw submission text.
    """
    required_fields: list[str] = field(default_factory=list)  # JSON keys or section headings
    expected_format: Optional[str] = None  # json | markdown | code | text
    topic_keywords: list[str] = field(default_factory=list)  # coverage keywords
    min_length: int = 0  # inclusive lower bound on raw character count
    max_length: int = 100_000  # inclusive upper bound on raw character count
    expected_sections: list[str] = field(default_factory=list)  # for markdown headings
    schema: Optional[dict] = None  # simple JSON schema check: key -> type name ("str", "int", ...)
    pass_threshold: float = DEFAULT_PASS_THRESHOLD  # minimum weighted score to pass


# ── Scorers ─────────────────────────────────────────────────────────────────

def _score_completeness(content: str, fmt: str, rubric: Rubric) -> tuple[float, list[str]]:
"""How many required fields/sections are present."""
feedback: list[str] = []
if not rubric.required_fields:
# No rubric fields → score on basic non-emptiness & length
length_ok = rubric.min_length <= len(content) <= rubric.max_length
score = 1.0 if (content.strip() and length_ok) else 0.4
if not content.strip():
feedback.append("Submission is empty.")
if not length_ok:
feedback.append(f"Length {len(content)} outside [{rubric.min_length}, {rubric.max_length}].")
return score, feedback

found = 0
lower = content.lower()
for fld in rubric.required_fields:
if fld.lower() in lower:
found += 1
else:
feedback.append(f"Missing required field/section: '{fld}'.")
score = found / len(rubric.required_fields) if rubric.required_fields else 1.0
return score, feedback


def _score_format_compliance(content: str, fmt: str, rubric: Rubric) -> tuple[float, list[str]]:
"""Does the format match expectations?"""
feedback: list[str] = []
expected = rubric.expected_format
if expected is None:
# No specific format required → lenient
return 0.9, []

if fmt == expected:
score = 1.0
else:
score = 0.3
feedback.append(f"Expected format '{expected}', detected '{fmt}'.")

# Extra checks per format
if expected == "json" and fmt == "json":
try:
parsed = json.loads(content.strip())
# Schema check
if rubric.schema and isinstance(parsed, dict):
for key, typ in rubric.schema.items():
if key not in parsed:
score -= 0.1
feedback.append(f"JSON missing key '{key}'.")
elif typ and not isinstance(parsed[key], {"str": str, "int": int, "float": float, "list": list, "dict": dict, "bool": bool}.get(typ, object)):
score -= 0.05
feedback.append(f"Key '{key}' expected type {typ}.")
except (json.JSONDecodeError, ValueError):
score = 0.2
feedback.append("Invalid JSON.")

if expected == "markdown" and fmt == "markdown":
if rubric.expected_sections:
found_sections = re.findall(r"^#{1,6}\s+(.+)", content, re.MULTILINE)
found_lower = {s.strip().lower() for s in found_sections}
for sec in rubric.expected_sections:
if sec.lower() not in found_lower:
score -= 0.08
feedback.append(f"Missing markdown section: '{sec}'.")

return max(score, 0.0), feedback


def _score_coverage(content: str, fmt: str, rubric: Rubric) -> tuple[float, list[str]]:
"""How well does content cover the expected topics?"""
feedback: list[str] = []
if not rubric.topic_keywords:
# Heuristic: sentence count as proxy
sentences = re.split(r"[.!?]+", content)
sentences = [s for s in sentences if len(s.strip()) > 10]
if len(sentences) >= 5:
return 1.0, []
elif len(sentences) >= 2:
return 0.7, ["Content is brief; consider expanding."]
return 0.4, ["Very little substantive content."]

lower = content.lower()
hit = sum(1 for kw in rubric.topic_keywords if kw.lower() in lower)
score = hit / len(rubric.topic_keywords)
missed = [kw for kw in rubric.topic_keywords if kw.lower() not in lower]
if missed:
feedback.append(f"Missing topic coverage: {', '.join(missed[:5])}.")
return score, feedback


def _score_clarity(content: str, fmt: str, rubric: Rubric) -> tuple[float, list[str]]:
"""Readability & structure heuristics."""
feedback: list[str] = []
score = 1.0

# Average sentence length (proxy for readability)
sentences = [s.strip() for s in re.split(r"[.!?\n]+", content) if s.strip()]
if sentences:
avg_words = sum(len(s.split()) for s in sentences) / len(sentences)
if avg_words > 40:
score -= 0.2
feedback.append("Sentences are very long; consider breaking them up.")
elif avg_words > 30:
score -= 0.1
feedback.append("Some sentences are long.")

# Repeated words (sign of low quality / filler)
words = re.findall(r"\b\w{4,}\b", content.lower())
if words:
from collections import Counter
counts = Counter(words)
top_freq = counts.most_common(1)[0][1] if counts else 0
if top_freq > len(words) * 0.15 and len(words) > 20:
score -= 0.15
feedback.append("High word repetition detected.")

# Structure: has paragraphs / sections?
if len(content) > 500 and "\n" not in content:
score -= 0.15
feedback.append("Large block of text with no paragraph breaks.")

return max(score, 0.0), feedback


def _score_validity(content: str, fmt: str, rubric: Rubric) -> tuple[float, list[str]]:
"""Is the content parseable / well-formed?"""
feedback: list[str] = []
score = 1.0

if fmt == "json":
try:
json.loads(content.strip())
except (json.JSONDecodeError, ValueError) as e:
score = 0.1
feedback.append(f"JSON parse error: {e}")
elif fmt == "code":
# Check for balanced braces / brackets
opens = content.count("{") + content.count("[") + content.count("(")
closes = content.count("}") + content.count("]") + content.count(")")
if opens != closes:
score -= 0.3
feedback.append(f"Unbalanced delimiters: {opens} opens vs {closes} closes.")
# Check for syntax patterns that suggest incomplete code
if content.rstrip().endswith(","):
score -= 0.1
feedback.append("Code appears truncated (trailing comma).")
elif fmt == "markdown":
# Unclosed code fences
fences = content.count("```")
if fences % 2 != 0:
score -= 0.2
feedback.append("Unclosed code fence in markdown.")

# Universal: non-empty
if not content.strip():
return 0.0, ["Empty submission."]

# Encoding sanity
garbage = sum(1 for c in content if ord(c) > 0xFFFF or (ord(c) < 32 and c not in "\n\r\t"))
if garbage > len(content) * 0.05:
score -= 0.3
feedback.append("High proportion of non-printable characters.")

return max(score, 0.0), feedback


# ── Main Scorer ─────────────────────────────────────────────────────────────

# Dispatch table: dimension name -> scorer callable. Keys must stay in sync
# with WEIGHTS so every dimension is both scored and weighted.
SCORERS = dict(
    completeness=_score_completeness,
    format_compliance=_score_format_compliance,
    coverage=_score_coverage,
    clarity=_score_clarity,
    validity=_score_validity,
)


@dataclass
class ScoreResult:
    """Aggregate result of scoring one submission."""
    weighted_score: float  # overall score in [0, 1], rounded to 4 decimals
    quality_rating: str  # label from QUALITY_BANDS ("excellent" ... "rejected")
    scores: dict[str, float]  # per-dimension scores, keyed like WEIGHTS
    feedback: list[str]  # human-readable issues collected from all scorers
    pass_threshold: bool  # True when weighted_score >= rubric.pass_threshold
    detected_format: str  # result of detect_format(): json/markdown/code/text


def score_submission(content: str, rubric: Optional[Rubric] = None) -> ScoreResult:
    """Score a structured submission against a rubric.

    Args:
        content: The raw submission text (JSON, markdown, code, or text).
        rubric: Optional rubric describing expectations. Defaults to lenient.

    Returns:
        ScoreResult with weighted_score (0-1), quality_rating, per-dimension
        scores, feedback list, and pass/fail.
    """
    active_rubric = rubric if rubric is not None else Rubric()
    fmt = detect_format(content)

    per_dim: dict[str, float] = {}
    notes: list[str] = []
    for dimension, scorer in SCORERS.items():
        value, fb = scorer(content, fmt, active_rubric)
        # Clamp each dimension into [0, 1] before weighting.
        per_dim[dimension] = round(min(max(value, 0.0), 1.0), 4)
        notes.extend(fb)

    total = sum(per_dim[name] * weight for name, weight in WEIGHTS.items())
    total = round(min(max(total, 0.0), 1.0), 4)

    # Bands are ordered highest-first; take the first threshold we clear.
    rating = next(
        (label for floor, label in QUALITY_BANDS if total >= floor), "rejected"
    )

    return ScoreResult(
        weighted_score=total,
        quality_rating=rating,
        scores=per_dim,
        feedback=notes,
        pass_threshold=total >= active_rubric.pass_threshold,
        detected_format=fmt,
    )


def score_submission_dict(content: str, rubric: Optional[Rubric] = None) -> dict[str, Any]:
    """Same as score_submission but returns a plain dict."""
    return asdict(score_submission(content, rubric))


# ── Batch Scoring ───────────────────────────────────────────────────────────

def score_batch(submissions: list[str], rubric: Optional[Rubric] = None) -> list[dict[str, Any]]:
    """Score a list of submissions. Designed to handle 100 in <10s."""
    # Each item is scored independently against the same (shared) rubric.
    return [score_submission_dict(item, rubric) for item in submissions]
Loading