From 6cd4ab52bd8eee409940faead623cec326921296 Mon Sep 17 00:00:00 2001 From: nguyenduc071912 Date: Wed, 27 May 2026 09:59:01 +0700 Subject: [PATCH] Add structured output quality scoring --- .gitignore | 3 + README.md | 62 +++++++++ app.py | 30 ++++- quality_scoring.py | 286 ++++++++++++++++++++++++++++++++++++++++ test_quality_scoring.py | 112 ++++++++++++++++ 5 files changed, 492 insertions(+), 1 deletion(-) create mode 100644 .gitignore create mode 100644 quality_scoring.py create mode 100644 test_quality_scoring.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..dd720c4 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +__pycache__/ +*.py[cod] +data/ diff --git a/README.md b/README.md index 63e71e4..e076190 100644 --- a/README.md +++ b/README.md @@ -37,6 +37,46 @@ curl -X POST http://localhost:8080/api/repurpose \ curl http://localhost:8080/api/usage -H "X-API-Key: cs_your_key" ``` +### Score Submission Quality +```bash +curl -X POST http://localhost:8080/api/quality-score \ + -H "X-API-Key: cs_your_key" \ + -H "Content-Type: application/json" \ + -d '{ + "submission": { + "title": "Launch checklist", + "summary": "A clear release checklist for a small SaaS launch.", + "steps": ["validate metrics", "publish docs", "notify customers"] + }, + "rubric": { + "required_fields": ["title", "summary", "steps"], + "required_keywords": ["release", "customers"], + "expected_format": "json", + "min_words": 10, + "pass_threshold": 0.7 + } + }' +``` + +The quality scorer auto-detects JSON, markdown, code, and plain text submissions. It returns: + +```json +{ + "weighted_score": 0.94, + "quality_rating": "excellent", + "scores": { + "completeness": 1.0, + "format_compliance": 1.0, + "coverage": 1.0, + "clarity": 0.75, + "validity": 1.0 + }, + "feedback": ["Detected submission format: json."], + "pass_threshold": true, + "detected_format": "json" +} +``` + ## 🎯 Platforms | Platform | Output | @@ -48,6 +88,28 @@ curl http://localhost:8080/api/usage -H "X-API-Key: cs_your_key" | `video_script` | 60s script with B-roll suggestions | | `summary` | 2-3 sentence summary | +## Quality Scoring + +`quality_scoring.py` implements deterministic 0-1 scoring across five weighted dimensions: + +| Dimension | Weight | +|-----------|--------| +| Completeness | 0.30 | +| Format Compliance | 0.20 | +| Coverage | 0.25 | +| Clarity | 0.15 | +| Validity | 0.10 | + +The scorer supports optional rubric keys: + +| Key | Purpose | +|-----|---------| +| `required_fields` | Required JSON keys or text labels | +| `required_keywords` | Required coverage terms | +| `min_words` | Completeness length baseline | +| `expected_format` | One of `json`, `markdown`, `code`, `text` | +| `pass_threshold` | Minimum weighted score for pass/fail | + ## 💰 Pricing | Plan | Price | Requests/mo | Platforms | diff --git a/app.py b/app.py index 99235a3..d44bcae 100644 --- a/app.py +++ b/app.py @@ -9,7 +9,7 @@ import json from datetime import datetime from pathlib import Path -from typing import Optional +from typing import Any, Optional from fastapi import FastAPI, HTTPException, Depends, Header from fastapi.middleware.cors import CORSMiddleware @@ -17,6 +17,7 @@ from pydantic import BaseModel, Field from middleware import validate_api_key, track_usage, get_usage_stats, get_or_create_key, PLANS +from quality_scoring import score_submission app = FastAPI( title="ContentSplit", @@ -57,6 +58,26 @@ class RepurposeResponse(BaseModel): created_at: str +class QualityScoreRequest(BaseModel): + submission: Any = Field(..., description="Submission content as JSON, markdown, code, or text") + rubric: dict[str, Any] = Field( + default_factory=dict, + description=( + "Optional scoring rubric. Supported keys: required_fields, " + "required_keywords, min_words, expected_format, pass_threshold" + ), + ) + + +class QualityScoreResponse(BaseModel): + weighted_score: float + quality_rating: str + scores: dict[str, float] + feedback: list[str] + pass_threshold: bool + detected_format: str + + # ── Content Generation (using prompts, model-agnostic) ──────────────────── PLATFORM_PROMPTS = { @@ -376,6 +397,13 @@ async def repurpose_content(req: RepurposeRequest, user: dict = Depends(validate ) +@app.post("/api/quality-score", response_model=QualityScoreResponse) +async def quality_score(req: QualityScoreRequest, user: dict = Depends(validate_api_key)): + """Score structured submissions against a configurable rubric.""" + track_usage(user.get("key", "anonymous")) + return score_submission(req.submission, req.rubric) + + @app.get("/api/platforms") async def list_platforms(): """List available target platforms.""" diff --git a/quality_scoring.py b/quality_scoring.py new file mode 100644 index 0000000..058bedd --- /dev/null +++ b/quality_scoring.py @@ -0,0 +1,286 @@ +""" +Quality scoring for structured submissions. + +The scorer is intentionally deterministic so it can run quickly without an +external AI dependency and produce stable feedback in tests. +""" + +from __future__ import annotations + +import ast +import json +import re +from dataclasses import dataclass +from enum import Enum +from typing import Any + + +class SubmissionFormat(str, Enum): + JSON = "json" + MARKDOWN = "markdown" + CODE = "code" + TEXT = "text" + + +DIMENSION_WEIGHTS = { + "completeness": 0.30, + "format_compliance": 0.20, + "coverage": 0.25, + "clarity": 0.15, + "validity": 0.10, +} + +QUALITY_RATINGS = ( + (0.85, "excellent"), + (0.70, "good"), + (0.50, "fair"), + (0.0, "needs_improvement"), +) + +DEFAULT_RUBRIC = { + "required_fields": [], + "required_keywords": [], + "min_words": 20, + "expected_format": None, + "pass_threshold": 0.70, +} + + +@dataclass(frozen=True) +class SubmissionProfile: + raw_text: str + detected_format: SubmissionFormat + parsed_json: Any | None + word_count: int + line_count: int + headings_count: int + bullet_count: int + code_signal_count: int + + +def score_submission(submission: Any, rubric: dict[str, Any] | None = None) -> dict[str, Any]: + """Score a submission against a rubric and return a weighted 0-1 result.""" + merged_rubric = _normalize_rubric(rubric) + profile = _profile_submission(submission) + + scores = { + "completeness": _score_completeness(profile, merged_rubric), + "format_compliance": _score_format_compliance(profile, merged_rubric), + "coverage": _score_coverage(profile, merged_rubric), + "clarity": _score_clarity(profile), + "validity": _score_validity(profile), + } + weighted_score = round( + sum(scores[dimension] * weight for dimension, weight in DIMENSION_WEIGHTS.items()), + 4, + ) + + return { + "weighted_score": weighted_score, + "quality_rating": _quality_rating(weighted_score), + "scores": scores, + "feedback": _build_feedback(profile, merged_rubric, scores), + "pass_threshold": weighted_score >= merged_rubric["pass_threshold"], + "detected_format": profile.detected_format.value, + } + + +def _normalize_rubric(rubric: dict[str, Any] | None) -> dict[str, Any]: + normalized = {**DEFAULT_RUBRIC, **(rubric or {})} + normalized["required_fields"] = _as_lower_list(normalized.get("required_fields", [])) + normalized["required_keywords"] = _as_lower_list(normalized.get("required_keywords", [])) + normalized["min_words"] = max(1, int(normalized.get("min_words") or DEFAULT_RUBRIC["min_words"])) + normalized["pass_threshold"] = float(normalized.get("pass_threshold") or DEFAULT_RUBRIC["pass_threshold"]) + + expected_format = normalized.get("expected_format") + if expected_format: + normalized["expected_format"] = str(expected_format).lower() + + return normalized + + +def _as_lower_list(value: Any) -> list[str]: + if value is None: + return [] + if isinstance(value, str): + return [value.lower()] + return [str(item).lower() for item in value] + + +def _profile_submission(submission: Any) -> SubmissionProfile: + parsed_json = submission if isinstance(submission, (dict, list)) else None + raw_text = json.dumps(submission, sort_keys=True) if parsed_json is not None else str(submission or "") + + if parsed_json is None: + try: + parsed_json = json.loads(raw_text) + except (TypeError, json.JSONDecodeError): + parsed_json = None + + detected_format = _detect_format(raw_text, parsed_json) + words = re.findall(r"[A-Za-z0-9_'-]+", raw_text) + lines = [line for line in raw_text.splitlines() if line.strip()] + + return SubmissionProfile( + raw_text=raw_text, + detected_format=detected_format, + parsed_json=parsed_json, + word_count=len(words), + line_count=len(lines), + headings_count=len(re.findall(r"^\s{0,3}#{1,6}\s+", raw_text, re.MULTILINE)), + bullet_count=len(re.findall(r"^\s*[-*+]\s+", raw_text, re.MULTILINE)), + code_signal_count=len(re.findall(r"\b(def|class|function|const|let|var|import|return)\b|[{};]", raw_text)), + ) + + +def _detect_format(raw_text: str, parsed_json: Any | None) -> SubmissionFormat: + if parsed_json is not None: + return SubmissionFormat.JSON + + stripped = raw_text.strip() + if not stripped: + return SubmissionFormat.TEXT + + markdown_signals = [ + bool(re.search(r"^\s{0,3}#{1,6}\s+", raw_text, re.MULTILINE)), + bool(re.search(r"^\s*[-*+]\s+", raw_text, re.MULTILINE)), + "```" in raw_text, + bool(re.search(r"\[[^\]]+\]\([^)]+\)", raw_text)), + ] + if sum(markdown_signals) >= 2: + return SubmissionFormat.MARKDOWN + + try: + ast.parse(raw_text) + if re.search(r"\b(def|class|import|return)\b", raw_text): + return SubmissionFormat.CODE + except SyntaxError: + pass + + if re.search(r"\b(function|const|let|var|return)\b", raw_text) and re.search(r"[{};]", raw_text): + return SubmissionFormat.CODE + + return SubmissionFormat.TEXT + + +def _score_completeness(profile: SubmissionProfile, rubric: dict[str, Any]) -> float: + required_fields = rubric["required_fields"] + if required_fields: + if isinstance(profile.parsed_json, dict): + present = sum(1 for field in required_fields if field in _flatten_json_keys(profile.parsed_json)) + else: + lower_text = profile.raw_text.lower() + present = sum(1 for field in required_fields if field in lower_text) + field_score = present / len(required_fields) + else: + field_score = 1.0 + + length_score = min(1.0, profile.word_count / rubric["min_words"]) + return round((field_score * 0.7) + (length_score * 0.3), 4) + + +def _score_format_compliance(profile: SubmissionProfile, rubric: dict[str, Any]) -> float: + expected_format = rubric.get("expected_format") + if expected_format: + return 1.0 if profile.detected_format.value == expected_format else 0.25 + + if profile.detected_format == SubmissionFormat.JSON: + return 1.0 if profile.parsed_json is not None else 0.0 + if profile.detected_format == SubmissionFormat.MARKDOWN: + return min(1.0, 0.45 + profile.headings_count * 0.25 + profile.bullet_count * 0.05) + if profile.detected_format == SubmissionFormat.CODE: + return 0.9 if _is_parseable_python(profile.raw_text) else 0.75 + return 0.85 if profile.word_count >= 10 else 0.45 + + +def _score_coverage(profile: SubmissionProfile, rubric: dict[str, Any]) -> float: + keywords = rubric["required_keywords"] + if not keywords: + return 1.0 + + lower_text = profile.raw_text.lower() + matched = sum(1 for keyword in keywords if keyword in lower_text) + return round(matched / len(keywords), 4) + + +def _score_clarity(profile: SubmissionProfile) -> float: + if profile.word_count == 0: + return 0.0 + + avg_words_per_line = profile.word_count / max(1, profile.line_count) + structure_bonus = min(0.25, (profile.headings_count * 0.08) + (profile.bullet_count * 0.03)) + + if avg_words_per_line <= 25: + readability = 0.75 + elif avg_words_per_line <= 40: + readability = 0.60 + else: + readability = 0.45 + + return round(min(1.0, readability + structure_bonus), 4) + + +def _score_validity(profile: SubmissionProfile) -> float: + if not profile.raw_text.strip(): + return 0.0 + if profile.detected_format == SubmissionFormat.JSON: + return 1.0 if profile.parsed_json is not None else 0.0 + if profile.detected_format == SubmissionFormat.CODE: + return 0.95 if _is_parseable_python(profile.raw_text) else 0.75 + return 0.9 + + +def _flatten_json_keys(value: Any, prefix: str = "") -> set[str]: + keys: set[str] = set() + if isinstance(value, dict): + for key, child in value.items(): + normalized_key = str(key).lower() + dotted = f"{prefix}.{normalized_key}" if prefix else normalized_key + keys.add(normalized_key) + keys.add(dotted) + keys.update(_flatten_json_keys(child, dotted)) + elif isinstance(value, list): + for item in value: + keys.update(_flatten_json_keys(item, prefix)) + return keys + + +def _is_parseable_python(raw_text: str) -> bool: + try: + ast.parse(raw_text) + except SyntaxError: + return False + return True + + +def _quality_rating(weighted_score: float) -> str: + for threshold, rating in QUALITY_RATINGS: + if weighted_score >= threshold: + return rating + return "needs_improvement" + + +def _build_feedback(profile: SubmissionProfile, rubric: dict[str, Any], scores: dict[str, float]) -> list[str]: + feedback: list[str] = [f"Detected submission format: {profile.detected_format.value}."] + + if scores["completeness"] < 0.7: + feedback.append("Completeness is low; add the required fields and enough detail to satisfy the rubric.") + else: + feedback.append("Completeness is strong against the configured rubric.") + + if scores["coverage"] < 1.0 and rubric["required_keywords"]: + lower_text = profile.raw_text.lower() + missing = [keyword for keyword in rubric["required_keywords"] if keyword not in lower_text] + feedback.append(f"Missing required coverage keywords: {', '.join(missing)}.") + + expected_format = rubric.get("expected_format") + if expected_format and profile.detected_format.value != expected_format: + feedback.append(f"Expected {expected_format} but detected {profile.detected_format.value}.") + + if scores["clarity"] < 0.65: + feedback.append("Clarity can improve with shorter lines, headings, or bullet points.") + + if scores["validity"] < 0.8: + feedback.append("Validity is weak; check syntax and structural correctness.") + + return feedback diff --git a/test_quality_scoring.py b/test_quality_scoring.py new file mode 100644 index 0000000..58d2c91 --- /dev/null +++ b/test_quality_scoring.py @@ -0,0 +1,112 @@ +import time +import unittest + +from quality_scoring import score_submission + + +class QualityScoringTests(unittest.TestCase): + def test_scores_json_submission_with_required_fields(self): + result = score_submission( + { + "title": "Launch checklist", + "summary": "A clear release checklist for a small SaaS launch.", + "steps": ["validate metrics", "publish docs", "notify customers"], + }, + { + "required_fields": ["title", "summary", "steps"], + "required_keywords": ["release", "customers"], + "expected_format": "json", + "min_words": 10, + }, + ) + + self.assertEqual(result["detected_format"], "json") + self.assertGreaterEqual(result["weighted_score"], 0.85) + self.assertEqual(result["quality_rating"], "excellent") + self.assertTrue(result["pass_threshold"]) + self.assertEqual(set(result["scores"]), {"completeness", "format_compliance", "coverage", "clarity", "validity"}) + + def test_penalizes_missing_keywords_and_wrong_format(self): + result = score_submission( + "Short plain answer without the requested terms.", + { + "required_fields": ["title", "summary"], + "required_keywords": ["pricing", "onboarding", "retention"], + "expected_format": "json", + "min_words": 30, + }, + ) + + self.assertEqual(result["detected_format"], "text") + self.assertLess(result["scores"]["coverage"], 1.0) + self.assertLess(result["scores"]["format_compliance"], 0.5) + self.assertFalse(result["pass_threshold"]) + self.assertTrue(any("Missing required coverage keywords" in item for item in result["feedback"])) + + def test_detects_markdown_and_rewards_structure(self): + submission = """# Release Notes + +- Added onboarding checklist +- Improved pricing page copy +- Clarified retention metrics + +This release improves the activation path and makes the upgrade flow easier to understand. +""" + result = score_submission( + submission, + { + "required_keywords": ["onboarding", "pricing", "retention"], + "expected_format": "markdown", + "min_words": 20, + }, + ) + + self.assertEqual(result["detected_format"], "markdown") + self.assertGreaterEqual(result["scores"]["clarity"], 0.8) + self.assertTrue(result["pass_threshold"]) + + def test_detects_code_and_validity(self): + code = """def normalize_score(value): + if value < 0: + return 0 + if value > 1: + return 1 + return value +""" + result = score_submission(code, {"expected_format": "code", "required_keywords": ["return"], "min_words": 5}) + + self.assertEqual(result["detected_format"], "code") + self.assertGreaterEqual(result["scores"]["validity"], 0.9) + self.assertTrue(result["pass_threshold"]) + + def test_scores_100_submissions_under_10_seconds(self): + submissions = [ + { + "title": f"Scorecard {index}", + "summary": "Structured output covering completeness, clarity, and validity.", + "details": ["coverage", "format compliance", "feedback"], + } + for index in range(100) + ] + + started = time.perf_counter() + results = [ + score_submission( + submission, + { + "required_fields": ["title", "summary", "details"], + "required_keywords": ["coverage", "validity"], + "expected_format": "json", + "min_words": 12, + }, + ) + for submission in submissions + ] + elapsed = time.perf_counter() - started + + self.assertEqual(len(results), 100) + self.assertLess(elapsed, 10) + + +if __name__ == "__main__": + unittest.main()