diff --git a/.test/CUSTOMIZATION_GUIDE.md b/.test/CUSTOMIZATION_GUIDE.md new file mode 100644 index 00000000..0ffc2302 --- /dev/null +++ b/.test/CUSTOMIZATION_GUIDE.md @@ -0,0 +1,267 @@ +# Customization Guide: Steering Skill Optimization with `--focus` + +## 1. Overview + +The `--focus` flag lets you steer skill optimization with natural language. It uses an LLM to make targeted adjustments to `manifest.yaml` and `ground_truth.yaml` before GEPA runs, so the optimizer prioritizes what matters to you. + +### Quick Start + +```bash +# Single focus area +uv run python .test/scripts/optimize.py databricks-asset-bundles \ + --focus "prefix all catalogs with customer_ prefix" + +# Multiple focus areas +uv run python .test/scripts/optimize.py databricks-asset-bundles \ + --focus "prefix all catalogs with customer_ prefix" \ + --focus "always use serverless compute" + +# Focus areas from a file +uv run python .test/scripts/optimize.py databricks-asset-bundles \ + --focus-file my_focus_areas.txt + +# Dry run to see what would change +uv run python .test/scripts/optimize.py databricks-asset-bundles \ + --focus "prefix all catalogs with customer_ prefix" --dry-run + +# Combined with presets +uv run python .test/scripts/optimize.py databricks-asset-bundles \ + --focus "use DLT for all pipeline examples" --preset quick +``` + +### What Happens + +1. The LLM reads SKILL.md, `manifest.yaml`, and `ground_truth.yaml` +2. It adds `[FOCUS]`-prefixed guidelines to the manifest +3. It adjusts relevant existing test cases (expectations, patterns, guidelines) +4. It generates 2-3 new test cases targeting the focus area +5. GEPA optimization then runs with these enhanced evaluation criteria + +--- + +## 2. How Each `ground_truth.yaml` Field Impacts Optimization + +### `outputs.response` - Reference Answer + +**What it is:** The ideal response the judge compares agent output against. + +**How it steers optimization:** The quality judge uses this as the gold standard. 
If the reference response includes specific patterns (e.g., parameterized catalogs), the optimizer learns to produce those patterns. + +**Example focus prompt:** `"All examples should use variable substitution for catalog names"` + +**Before:** +```yaml +outputs: + response: | + catalog: my_catalog +``` + +**After:** +```yaml +outputs: + response: | + catalog: ${var.catalog_prefix}_catalog +``` + +### `expectations.expected_facts` - Substring Assertions + +**What it is:** Exact substrings that must appear in the response. Checked deterministically (case-insensitive). + +**How it steers optimization:** Failed facts tell the optimizer exactly what content is missing. Adding facts about your focus area forces the optimizer to include that content. + +**Example focus prompt:** `"All catalog values must use the catalog prefix variable"` + +**Before:** +```yaml +expected_facts: + - "Defines variables with default values" +``` + +**After:** +```yaml +expected_facts: + - "Defines variables with default values" + - "All catalog values use ${var.catalog_prefix} variable" +``` + +### `expectations.expected_patterns` - Regex Patterns + +**What it is:** Regular expressions checked with `re.findall(pattern, response, re.IGNORECASE)`. Each has a `min_count` (minimum number of matches required) and a `description`. + +**How it steers optimization:** Pattern failures are binary and precise. Adding patterns for your focus area creates hard requirements the optimizer must satisfy. 
+ +**Example focus prompt:** `"Prefix all catalogs with a configurable prefix variable"` + +**Before:** +```yaml +expected_patterns: + - pattern: 'catalog:' + min_count: 2 + description: Defines catalog variable +``` + +**After:** +```yaml +expected_patterns: + - pattern: 'catalog:' + min_count: 2 + description: Defines catalog variable + - pattern: '\$\{var\.catalog_prefix\}' + min_count: 1 + description: Uses catalog prefix variable +``` + +### `expectations.guidelines` - LLM Judge Rules + +**What it is:** Natural-language evaluation criteria passed to the LLM judge. The judge scores how well the response follows each guideline. + +**How it steers optimization:** Guidelines are the most flexible steering mechanism. They influence the quality composite (20% of total), the dedicated guideline adherence weight (10% of total), and the effectiveness delta (30% of total). + +**Example focus prompt:** `"Must parameterize catalog names with a prefix variable"` + +**Before:** +```yaml +guidelines: + - "Must define variables at root level with defaults" +``` + +**After:** +```yaml +guidelines: + - "Must define variables at root level with defaults" + - "Must parameterize catalog names with a prefix variable" +``` + +### `metadata.tags` - Categorization + +**What it is:** Tags for organizing and filtering test cases. No direct impact on optimization scoring. + +**How it steers optimization:** Tags help identify which test cases were generated or adjusted by focus. Focus-generated test cases get tags matching the focus area. + +--- + +## 3. How Each `manifest.yaml` Field Impacts Optimization + +### `scorers.default_guidelines` - Global Guidelines + +**What it is:** Guidelines applied to ALL test cases that don't define their own guidelines. These are merged with per-test-case guidelines by the quality judge. + +**How it steers optimization:** Adding `[FOCUS]` guidelines here affects every evaluation, not just specific test cases. This is the broadest way to steer optimization. 
+ +**What `--focus` does:** Prepends `[FOCUS]` to each new guideline and appends to the list. Duplicates are skipped. + +**Before:** +```yaml +default_guidelines: + - "Response must address the user's request completely" + - "YAML examples must be valid and properly indented" +``` + +**After:** +```yaml +default_guidelines: + - "Response must address the user's request completely" + - "YAML examples must be valid and properly indented" + - "[FOCUS] All catalog references must use a configurable prefix variable" + - "[FOCUS] Variable substitution syntax ${var.prefix} must be demonstrated" +``` + +### `quality_gates` - Pass/Fail Thresholds + +**What it is:** Minimum score thresholds for each scorer. If a score falls below the gate, the test case fails. + +**How it steers optimization:** Higher thresholds make the optimizer work harder to satisfy that criterion. `--focus` can only make thresholds stricter (higher), never looser. + +**Before:** +```yaml +quality_gates: + pattern_adherence: 0.9 + execution_success: 0.8 +``` + +**After (if focus demands stricter pattern checking):** +```yaml +quality_gates: + pattern_adherence: 0.95 + execution_success: 0.8 +``` + +--- + +## 4. 
Prompting Examples + +### Scenario: Customer wants all catalogs prefixed + +```bash +--focus "When creating DABs, prefix all catalogs and schemas with a customer-specific prefix using variables" +``` + +**What changes:** +- **manifest.yaml**: Adds `[FOCUS] All catalog/schema references must use ${var.prefix}_catalog pattern` +- **ground_truth.yaml**: Existing multi-env test cases get new `expected_patterns` for `${var.prefix}` syntax; 2-3 new test cases about prefix configuration + +### Scenario: Customer wants DLT examples in DABs + +```bash +--focus "Include Delta Live Tables (DLT) pipeline examples in all DABs configurations" +``` + +**What changes:** +- **manifest.yaml**: Adds `[FOCUS] DABs examples should include DLT pipeline resource definitions` +- **ground_truth.yaml**: Existing pipeline test cases get DLT-specific patterns; new test cases cover DLT pipeline YAML configuration + +### Scenario: Customer wants stricter SQL validation + +```bash +--focus "All SQL examples must use parameterized queries, never string interpolation" +``` + +**What changes:** +- **manifest.yaml**: Adds `[FOCUS] SQL examples must use parameterized queries with bind variables` +- **quality_gates**: `pattern_adherence` may increase (e.g., 0.9 -> 0.95) +- **ground_truth.yaml**: SQL-related test cases get patterns checking for parameterized syntax + +--- + +## 5. 
Reviewing and Rolling Back Changes + +### Identifying Focus-Generated Content + +- **Guidelines**: Look for the `[FOCUS]` prefix in `manifest.yaml` `default_guidelines` +- **Test cases**: Check `metadata.source: generated_from_focus` in `ground_truth.yaml` +- **Adjusted responses**: Check `metadata._focus_original_response` for the pre-focus original + +### Rolling Back + +**Remove focus guidelines from manifest:** +```bash +# Edit manifest.yaml, delete lines starting with "[FOCUS]" +grep -v "^\s*- \"\[FOCUS\]" .test/skills/<skill>/manifest.yaml > tmp && mv tmp .test/skills/<skill>/manifest.yaml +``` + +**Remove focus-generated test cases:** +```python +# In Python +import yaml +with open(".test/skills/<skill>/ground_truth.yaml") as f: + data = yaml.safe_load(f) +data["test_cases"] = [ + tc for tc in data["test_cases"] + if tc.get("metadata", {}).get("source") != "generated_from_focus" +] +with open(".test/skills/<skill>/ground_truth.yaml", "w") as f: + yaml.dump(data, f, default_flow_style=False, sort_keys=False) +``` + +**Restore original responses (for adjusted test cases):** +```python +for tc in data["test_cases"]: + original = tc.get("metadata", {}).pop("_focus_original_response", None) + if original: + tc["outputs"]["response"] = original +``` + +**Or use git:** +```bash +git checkout -- .test/skills/<skill>/manifest.yaml .test/skills/<skill>/ground_truth.yaml +``` diff --git a/.test/README.md b/.test/README.md index f9ae7c9c..e99c418a 100644 --- a/.test/README.md +++ b/.test/README.md @@ -187,7 +187,7 @@ uv run python .test/scripts/optimize.py [options] |------|---------|---------|---------| | `--gen-model` | `GEPA_GEN_LM` | `databricks/databricks-claude-sonnet-4-6` | Generates responses in proxy evaluator | | `--reflection-lm` | `GEPA_REFLECTION_LM` | `databricks/databricks-claude-opus-4-6` | GEPA's reflection/mutation model | -| `--judge-model` | `GEPA_JUDGE_LM` | `databricks/databricks-claude-sonnet-4-6` | MLflow quality/regression judges | +| `--judge-model` | `GEPA_JUDGE_LM` | 
`databricks/databricks-claude-sonnet-4-6` | MLflow quality judge | Proxy evaluator models use [litellm provider prefixes](https://docs.litellm.ai/docs/providers): `databricks/`, `openai/`, `anthropic/`. @@ -277,8 +277,8 @@ test_cases: | Field | Required | Description | |-------|----------|-------------| | `inputs.prompt` | Yes | The user question | -| `expectations.expected_facts` | Yes | Facts the response must contain (checked by quality judge) | -| `expectations.expected_patterns` | No | Regex patterns checked deterministically | +| `expectations.expected_facts` | Yes | Facts the response must contain (checked by quality judge + deterministic substring match) | +| `expectations.expected_patterns` | No | Regex patterns checked deterministically (feeds `fact_coverage`/`pattern_adherence` scores) | | `expectations.guidelines` | No | Soft rules for the quality judge | | `expectations.trace_expectations` | No | Agent behavioral validation (only with `--agent-eval`) | | `outputs.response` | No | Reference answer for judge comparison | @@ -311,6 +311,80 @@ Without `--tool-modules`, all skills are included regardless. Available modules: --- +## Evaluation & Scoring + +### SkillBench evaluator (default) + +Each candidate skill is evaluated per-task using a WITH vs WITHOUT comparison: + +1. **Generate WITH-skill response** — LLM generates with SKILL.md in context +2. **Generate WITHOUT-skill response** — LLM generates without skill (cached) +3. **Three focused judges** — each returns categorical `"excellent"` / `"acceptable"` / `"poor"` verdicts: + - **Correctness judge** (WITH + WITHOUT) — facts, API references, code syntax accuracy + - **Completeness judge** (WITH + WITHOUT) — all parts addressed, expected info present + - **Guideline adherence judge** (WITH only) — Databricks-specific patterns and practices + - **Regression judge** (conditional) — fires only when effectiveness delta < -0.05 +4. 
**Deterministic assertions** (0 LLM calls) — `assertions.py` checks `expected_facts` (substring match) and `expected_patterns` (regex match) against both responses + +**Cost per task:** 5 LLM calls (correctness×2 + completeness×2 + guideline_adherence×1). WITHOUT calls are cached, so subsequent iterations cost only 3 calls. + +**Scoring weights:** + +| Component | Weight | Source | +|-----------|--------|--------| +| Effectiveness delta | 30% | Mean of (correctness_delta + completeness_delta) | +| Quality composite | 20% | Mean of (correctness + completeness + guideline_adherence) WITH scores | +| Fact/pattern coverage | 15% | Deterministic assertions from `assertions.py` | +| Guideline adherence | 10% | Dedicated weight for Databricks patterns | +| Token efficiency | 10% | Smaller candidates score higher | +| Structure | 5% | Syntax validation (Python, SQL, no hallucinated APIs) | +| Regression penalty | -10% | Explicit penalty when regression_judge detects harm | + +**Categorical-to-float conversion:** `excellent=1.0`, `acceptable=0.6`, `poor=0.0`. The nonlinear scale incentivizes GEPA to push from "acceptable" to "excellent" (0.4 gap). + +### How GEPA uses evaluation feedback + +GEPA's reflection LM reads `side_info` rendered as markdown headers. 
Key fields: + +- **`Judge_correctness_with`** / **`Judge_correctness_without`** — per-dimension accuracy feedback with categorical verdicts +- **`Judge_completeness_with`** / **`Judge_completeness_without`** — per-dimension coverage feedback +- **`Judge_guideline_adherence`** — pattern compliance feedback (WITH only) +- **`Judge_effectiveness`** — per-dimension deltas (`correctness_delta`, `completeness_delta`, `overall_delta`) +- **`Regression_Analysis`** — specific "what to fix" guidance (only when regression detected) +- **`Missing_Facts`** / **`Missing_Patterns`** — exact list of what the skill should add (from assertions) +- **`Passed_Facts`** / **`Passed_Patterns`** — what the skill already covers +- **`scores`** — feeds GEPA's multi-objective Pareto frontier (`correctness_with`, `completeness_with`, `guideline_adherence`, `quality_composite`, etc.) + +This gives GEPA three independent, actionable signals. A mutation that fixes correctness but doesn't help completeness shows clear movement on one dimension, guiding the next mutation. + +### Why three judges (not one, not five)? + +The previous single quality judge collapsed 5 criteria into one 0.0–1.0 score. When a mutation improved correctness but hurt completeness, the score barely moved — GEPA couldn't distinguish which dimension improved. Three judges cover the core evaluation dimensions without excessive cost: + +1. **Correctness** → fix errors (API syntax, deprecated patterns) +2. **Completeness** → add missing content +3. **Guideline adherence** → align with Databricks patterns + `--focus` areas + +Deterministic assertions in `assertions.py` remain for precise, structured `Missing_Facts` lists at zero LLM cost. 
+ +### Agent evaluator (`--agent-eval`) + +Runs a real Claude Code agent and adds tool-call scoring: + +| Component | Weight | +|-----------|--------| +| Content quality | 20% | +| Skill effectiveness | 20% | +| Tool call correctness | 20% | +| Behavioral compliance | 15% | +| Execution success | 10% | +| Tool call efficiency | 10% | +| Token efficiency | 5% | + +The agent evaluator also uses `assertions.py` for structured `Missing_Facts`/`Missing_Patterns` feedback. Tool-call judges use MLflow's `ToolCallCorrectness`/`ToolCallEfficiency` when available, falling back to deterministic trace scorers. + +--- + ## Project Structure ``` @@ -325,8 +399,9 @@ Without `--tool-modules`, all skills are included regardless. Available modules: │ ├── runner.py # Multi-pass GEPA orchestrator │ ├── skillbench_evaluator.py # Fast proxy evaluator (WITH vs WITHOUT) │ ├── agent_evaluator.py # Real Claude Code agent evaluator +│ ├── assertions.py # Deterministic fact/pattern assertions (zero LLM cost) │ ├── assessment_fetcher.py # MLflow assessment injection -│ ├── judges.py # MLflow judge factories + fallback chain +│ ├── judges.py # MLflow quality judge factory + fallback chain │ ├── config.py # Presets, model registration │ ├── splitter.py # Train/val dataset splitting │ ├── tools.py # MCP tool description extraction diff --git a/.test/TECHNICAL.md b/.test/TECHNICAL.md index 191a8dfc..b9fa727b 100644 --- a/.test/TECHNICAL.md +++ b/.test/TECHNICAL.md @@ -14,7 +14,7 @@ For setup instructions and CLI usage, see [README.md](README.md). 
- [Agent Evaluator](#agent-evaluator) - [GEPA Optimization Loop](#gepa-optimization-loop) - [Multi-Pass Optimization](#multi-pass-optimization) -- [MLflow Judges](#mlflow-judges) +- [Judges & Assertions](#judges--assertions) - [MLflow Assessment Injection](#mlflow-assessment-injection) - [MLflow Tracing Integration](#mlflow-tracing-integration) - [Component Scaling](#component-scaling) @@ -83,23 +83,35 @@ The WITHOUT-skill response is **cached by prompt hash** — since the model and ## Proxy Evaluator (SkillBench) -The default evaluator (`skillbench_evaluator.py`) uses `litellm.completion` to generate responses and MLflow judges to score them. It's fast (~5 LLM calls per task per iteration) but doesn't test actual tool usage. +The default evaluator (`skillbench_evaluator.py`) uses `litellm.completion` to generate responses, an MLflow quality judge to score them, and deterministic assertions to check fact/pattern coverage. It's fast (~3 LLM calls per task per iteration) but doesn't test actual tool usage. ### Per-task evaluation flow 1. **Generate WITH-skill response** — `litellm.completion` with skill + tool descriptions as system context, temperature=0 2. **Generate WITHOUT-skill response** — Same prompt, no skill. Cached by prompt hash (computed once, reused across all GEPA iterations) -3. **Judge scoring** — `quality_judge` scores both responses against `expected_facts`, `expected_patterns`, and `guidelines`. WITHOUT-skill judge results are also cached. -4. **Compute composite score** — Weighted combination of effectiveness delta, absolute quality, structure validation, and token efficiency +3. **Multi-judge scoring** — Three focused judges score responses with categorical `"excellent"` / `"acceptable"` / `"poor"` verdicts: + - **Correctness judge** — scores WITH + WITHOUT (WITHOUT cached). Evaluates factual accuracy, API correctness, code syntax. + - **Completeness judge** — scores WITH + WITHOUT (WITHOUT cached). 
Evaluates question coverage, expected facts/patterns presence. + - **Guideline adherence judge** — scores WITH only (meaningless without skill). Evaluates Databricks patterns, conventions, `--focus` areas. + - **Regression judge** — fires only when effectiveness delta < -0.05. Identifies what the skill broke. +4. **Deterministic assertions** — `assertions.py` runs binary pass/fail checks for each `expected_fact` (substring match) and `expected_pattern` (regex match) on both WITH and WITHOUT responses. Zero LLM cost. +5. **Compute composite score** — Weighted combination of per-dimension effectiveness deltas, quality composite, fact/pattern coverage, guideline adherence, structure validation, token efficiency, and regression penalty. + +**Cost per task:** 5 LLM calls initially (correctness×2 + completeness×2 + guideline_adherence×1). After caching WITHOUT calls: 3 LLM calls per task on subsequent iterations. + +**Categorical-to-float conversion:** `excellent=1.0`, `acceptable=0.6`, `poor=0.0`. Nonlinear: "acceptable" contributes positively (above 0.5) but the 0.4 gap to "excellent" incentivizes GEPA to push higher. 
### Proxy scoring weights | Weight | Dimension | Source | |--------|-----------|--------| -| **40%** | Skill Effectiveness | `quality_with - quality_without` (the delta) | -| **30%** | Absolute Quality | `quality_with` score from judge | +| **30%** | Effectiveness Delta | Mean of (correctness_delta + completeness_delta) | +| **20%** | Quality Composite | Mean of (correctness + completeness + guideline_adherence) WITH scores | +| **15%** | Fact/Pattern Coverage | Deterministic assertions from `assertions.py` | +| **10%** | Guideline Adherence | Dedicated weight for Databricks patterns | | **5%** | Structure | Python/SQL syntax validation | -| **25%** | Token Efficiency | Smaller = higher score (bonus up to 1.15x) | +| **10%** | Token Efficiency | Smaller = higher score (bonus up to 1.15x) | +| **-10%** | Regression Penalty | Explicit penalty when regression_judge detects harm | ### Rate limiting @@ -209,35 +221,66 @@ The key insight: because `side_info` contains **full judge rationale** (not trun ```python side_info = { "Task": "Create a metric view for order analytics...", - "Judge_quality_with": { - "score": 0.65, - "rationale": "Correctly uses CREATE OR REPLACE VIEW but misses " - "MEASURE() wrapping. Pattern adherence: 2/3. Fact coverage: 3/5." - }, - "Judge_quality_without": { - "score": 0.2, - "rationale": "Without the skill, invented non-existent CREATE METRIC VIEW " - "syntax. Only 1/5 expected facts present." 
- }, + + # Per-dimension judge feedback — GEPA sees each as a separate section + "Judge_correctness_with": {"verdict": "acceptable", "score": 0.6, "rationale": "..."}, + "Judge_correctness_without": {"verdict": "poor", "score": 0.0, "rationale": "..."}, + "Judge_completeness_with": {"verdict": "excellent", "score": 1.0, "rationale": "..."}, + "Judge_completeness_without":{"verdict": "poor", "score": 0.0, "rationale": "..."}, + "Judge_guideline_adherence": {"verdict": "acceptable", "score": 0.6, "rationale": "..."}, + + # Per-dimension effectiveness deltas "Judge_effectiveness": { "verdict": "improved", - "delta": 0.45, + "correctness_delta": +0.6, + "completeness_delta": +1.0, + "overall_delta": +0.8, + }, + + # Regression analysis (only when regression detected) + "Regression_Analysis": {"rationale": "..."}, # from regression_judge + + # Assertion-based structured feedback — GEPA renders each as a markdown header + "Missing_Facts": ["Missing: MEASURE() function for querying metric views"], + "Missing_Patterns": ["Found 0 matches (need >=1)"], # pattern_MEASURE\( + "Passed_Facts": [ + "Found: Uses CREATE OR REPLACE VIEW with WITH METRICS LANGUAGE YAML", + "Found: Defines measures with name and expr using aggregate functions", + ], + "Passed_Patterns": ["Found 1 matches (need >=1)"], # WITH METRICS LANGUAGE YAML + # skill_md_specific_info — only shown when reflecting on the skill component + "skill_md_specific_info": { + "Assertion_Diagnostics": "NEEDS_SKILL: fact — 'MEASURE() function for querying metric views'", + "Regressions": "", }, - "Scores": { - "effectiveness": 0.45, - "quality_with": 0.65, + "scores": { + "correctness_with": 0.6, + "correctness_without": 0.0, + "completeness_with": 1.0, + "completeness_without": 0.0, + "guideline_adherence": 0.6, + "quality_composite": 0.73, # mean of (0.6 + 1.0 + 0.6) / 3 + "correctness_delta": 0.6, + "completeness_delta": 1.0, + "skill_effectiveness": 0.8, # mean of deltas + "effectiveness_verdict": "improved", + 
"regression_penalty": 0.0, + "fact_coverage": 0.67, # 2/3 facts passed + "pattern_adherence": 0.5, # 1/2 patterns passed "structure": 1.0, - "efficiency": 0.92, - "composite": 0.71 + "token_efficiency": 0.92, + "final": 0.71, }, - "Tokens": {"candidate_total": 1198, "original_total": 1234, "budget": 2000}, + "token_counts": {"candidate_total": 1198, "original_total": 1234, "budget": 2000}, # If MLflow assessments were injected: - "Real_world_assessments": [ + "real_world_assessments": [ {"name": "ToolCallCorrectness", "value": "no", "rationale": "Agent used Bash instead of execute_sql"} ] } ``` +GEPA renders each top-level key as a markdown header. The **key names are the headers** — so `Missing_Facts` becomes `## Missing_Facts` followed by a bulleted list. This gives the reflection LM precise, actionable information instead of having to parse prose rationale. + --- ## Multi-Pass Optimization @@ -282,30 +325,56 @@ Both scores come from the same evaluator, same judges, same prompts, same cached --- -## MLflow Judges +## Judges & Assertions + +The framework uses two complementary scoring mechanisms: -The framework uses [MLflow's `make_judge`](https://mlflow.org/docs/latest/llms/llm-evaluate/index.html) to score responses (`judges.py`). +### 1. Multi-judge architecture (LLM-based) -### Judge types +Built with [MLflow's `make_judge`](https://mlflow.org/docs/latest/genai/eval-monitor/scorers/llm-judge/custom-judges/) (`judges.py`). 
Three focused custom judges evaluate independent dimensions: -| Judge | What it does | Returns | -|-------|-------------|---------| -| `quality_judge` | Scores a response against expected facts, patterns, and guidelines | `float` (0.0-1.0) + rationale | -| `regression_judge` | Identifies specific ways the skill harms responses | `bool` + rationale | -| `effectiveness_judge` | Compares WITH vs WITHOUT responses | `verdict` (improved/same/regressed) + rationale | +| Judge | Focus | Feedback type | Runs on | GEPA signal | +|-------|-------|--------------|---------|-------------| +| **Correctness** | Facts, API references, code syntax | `Literal["excellent", "acceptable", "poor"]` | WITH + WITHOUT | `Judge_correctness_with` / `Judge_correctness_without` | +| **Completeness** | All parts addressed, expected info | `Literal["excellent", "acceptable", "poor"]` | WITH + WITHOUT | `Judge_completeness_with` / `Judge_completeness_without` | +| **Guideline adherence** | Databricks patterns, conventions | `Literal["excellent", "acceptable", "poor"]` | WITH only | `Judge_guideline_adherence` | +| **Regression** | What the skill broke (conditional) | `bool` | Conditional | `Regression_Analysis` | -During optimization, only `quality_judge` is called (effectiveness is derived from the quality delta). The other judges are available for standalone use. +Each judge returns a categorical verdict with detailed rationale. Verdicts are converted to floats: `excellent=1.0`, `acceptable=0.6`, `poor=0.0`. -### Judge prompt construction +**Why categorical over numeric?** Categorical `Literal` types produce more reliable, consistent judgments than continuous 0.0–1.0 scores. They're also compatible with [MemAlign](https://mlflow.org/docs/latest/genai/eval-monitor/scorers/llm-judge/custom-judges/#alignment) for judge calibration with human feedback. 
-The quality judge receives: -- The response to score -- `expected_facts` from the test case -- `expected_patterns` with descriptions -- `guidelines` from the test case and manifest -- Deduplicated guidelines from all test cases for the skill +**Why three judges (not one, not five)?** The previous single quality judge collapsed 5 criteria into one score. When a mutation improved correctness but hurt completeness, the score barely moved — GEPA couldn't distinguish which dimension improved. Three judges cover core evaluation dimensions: +1. **Correctness** → GEPA can target: fix API errors, update deprecated patterns +2. **Completeness** → GEPA can target: add missing content, cover more question parts +3. **Guideline adherence** → GEPA can target: align with Databricks conventions, `--focus` areas -It returns a 0.0–1.0 score with a detailed written rationale explaining which facts were present/missing and which patterns matched/failed. +Five judges would cost 7+ calls/task — too expensive for iterative GEPA. + +**Guideline injection:** +- **Correctness judge** receives only correctness-related guidelines (filtered by keywords: api, syntax, correct, deprecated, modern) +- **Guideline adherence judge** receives ALL guidelines: `default_guidelines` from manifest + per-test `guidelines` + `[FOCUS]` guidelines from `--focus` +- This makes `--focus` areas directly evaluable — the judge checks whether the response follows them + +### 2. Deterministic assertions (zero LLM cost) + +`assertions.py` runs binary pass/fail checks against the response: + +| Assertion type | How it works | Example | +|---------------|-------------|---------| +| **Fact** | Case-insensitive substring match | `"MEASURE() function"` → found/missing | +| **Pattern** | Regex with `min_count`/`max_count` | `MEASURE\(` with `min_count: 1` | + +`run_all_assertions()` checks both WITH and WITHOUT responses. 
`summarize_failures()` classifies each assertion: + +- **POSITIVE** — fails without skill, passes with (skill is helping) +- **REGRESSION** — passes without skill, fails with (skill is confusing the agent) +- **NEEDS_SKILL** — fails both with and without (skill must add this content) +- **NEUTRAL** — same result either way (agent already knows this) + +### Effectiveness scoring + +Effectiveness is derived per-dimension: `correctness_delta = correctness_with - correctness_without` and `completeness_delta = completeness_with - completeness_without`, then averaged. This gives GEPA two separate signals about WHERE improvement happened, enabling targeted mutations rather than generic ones. --- @@ -403,10 +472,13 @@ Example: `--include-tools --tool-modules sql serving` (3 components: `skill_md` | Weight | Dimension | What it measures | |--------|-----------|-----------------| -| **40%** | Skill Effectiveness | `quality_with - quality_without` — the skill's unique contribution | -| **30%** | Absolute Quality | `quality_with` — overall response quality with skill present | +| **30%** | Effectiveness Delta | Mean of (correctness_delta + completeness_delta) — per-dimension skill contribution | +| **20%** | Quality Composite | Mean of (correctness + completeness + guideline_adherence) WITH scores | +| **15%** | Fact/Pattern Coverage | Deterministic assertions — `fact_coverage` and `pattern_adherence` | +| **10%** | Guideline Adherence | Dedicated weight for Databricks patterns and conventions | | **5%** | Structure | Python/SQL syntax validity (deterministic) | -| **25%** | Token Efficiency | Token count vs original — smaller skills save context window | +| **10%** | Token Efficiency | Token count vs original — smaller skills save context window | +| **-10%** | Regression Penalty | Explicit penalty when regression_judge detects skill-caused harm | ### Agent evaluator @@ -493,13 +565,16 @@ Optimizing both simultaneously creates a **confounding variable problem**: ▼ ▼ 
skillbench_evaluator.py agent_evaluator.py (fast proxy: litellm + (real Claude Code via - MLflow judges) Claude Agent SDK) - │ │ - ▼ ▼ - judges.py executor.py - (quality_judge, (ClaudeSDKClient, - regression_judge, event streaming, - model fallback) TraceMetrics builder) + 3 focused judges + Claude Agent SDK + + assertions) assertions) + │ │ │ │ + ▼ ▼ ▼ ▼ + judges.py assertions.py executor.py assertions.py + (correctness_judge, (ClaudeSDKClient, + completeness_judge, event streaming, + guideline_adherence_judge, TraceMetrics builder) + regression_judge, + model fallback) │ │ ▼ ▼ MLflow make_judge MLflow Tracing diff --git a/.test/scripts/_common.py b/.test/scripts/_common.py index e0078c54..6c9f14d1 100644 --- a/.test/scripts/_common.py +++ b/.test/scripts/_common.py @@ -5,12 +5,21 @@ - setup_path(): Add skill_test to Python path - create_cli_context(): Create CLIContext with proper base_path """ + +import os import sys from pathlib import Path +# Fix protobuf v6 C-extension incompatibility with MLflow. +# The cpp FieldDescriptor is missing `is_repeated`, causing json_format to fail. +# Force pure-Python implementation unless the user has explicitly set it. 
+if "PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION" not in os.environ: + os.environ["PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION"] = "python" + # Load .env file if present try: from dotenv import load_dotenv + load_dotenv() except ImportError: pass # python-dotenv not installed, skip diff --git a/.test/scripts/optimize.py b/.test/scripts/optimize.py index 512cf914..26de631c 100644 --- a/.test/scripts/optimize.py +++ b/.test/scripts/optimize.py @@ -52,7 +52,8 @@ def main(): help="Optimize all skills that have ground_truth.yaml", ) parser.add_argument( - "--preset", "-p", + "--preset", + "-p", choices=["quick", "standard", "thorough"], default="standard", help="GEPA optimization preset (default: standard)", @@ -61,7 +62,7 @@ def main(): "--gen-model", default=None, help="LLM model for generative evaluation (default: GEPA_GEN_LM env or " - "databricks/databricks-claude-sonnet-4-6).", + "databricks/databricks-claude-sonnet-4-6).", ) parser.add_argument( "--reflection-lm", @@ -87,7 +88,7 @@ def main(): "--apply-last", action="store_true", help="Apply the last saved optimization result without re-running " - "(reads from .test/skills//optimized_SKILL.md)", + "(reads from .test/skills//optimized_SKILL.md)", ) parser.add_argument( "--include-tools", @@ -145,19 +146,19 @@ def main(): "--agent-eval", action="store_true", help="Hybrid mode: use real Claude Code agent for baseline + validation, " - "proxy for GEPA iterations.", + "proxy for GEPA iterations.", ) parser.add_argument( "--agent-eval-full", action="store_true", help="Full agent mode: use real Claude Code agent for ALL GEPA iterations " - "(slow but most accurate).", + "(slow but most accurate).", ) parser.add_argument( "--agent-model", default=None, help="Model for agent execution (e.g., databricks-claude-sonnet-4-6). 
" - "Defaults to ANTHROPIC_MODEL env var.", + "Defaults to ANTHROPIC_MODEL env var.", ) parser.add_argument( "--agent-timeout", @@ -175,7 +176,7 @@ def main(): default=None, metavar="EXPERIMENT_ID", help="MLflow experiment ID with ToolCallCorrectness/ToolCallEfficiency assessments. " - "Injects real-world behavioral feedback into GEPA's reflection context.", + "Injects real-world behavioral feedback into GEPA's reflection context.", ) parser.add_argument( @@ -192,6 +193,20 @@ def main(): dest="requirements", help="Inline requirement for test case generation (repeatable).", ) + parser.add_argument( + "--focus", + action="append", + default=None, + dest="focus_areas", + help="Natural-language focus area to steer optimization (repeatable). " + "E.g., --focus 'prefix all catalogs with customer_ prefix'", + ) + parser.add_argument( + "--focus-file", + type=str, + default=None, + help="File with focus areas (one per line). Combined with --focus args.", + ) args = parser.parse_args() @@ -199,7 +214,11 @@ def main(): parser.error("Either provide a skill name or use --all") from skill_test.optimize.runner import optimize_skill - from skill_test.optimize.review import review_optimization, apply_optimization, load_last_result + from skill_test.optimize.review import ( + review_optimization, + apply_optimization, + load_last_result, + ) # Handle requirements-driven example generation if args.generate_from or args.requirements: @@ -212,16 +231,19 @@ def main(): print(f"Error: requirements file not found: {req_path}") sys.exit(1) requirements.extend( - line.strip() for line in req_path.read_text().splitlines() + line.strip() + for line in req_path.read_text().splitlines() if line.strip() and not line.strip().startswith("#") ) if args.requirements: requirements.extend(args.requirements) if requirements: from generate_examples import run_generation + gen_model = args.gen_model if gen_model is None: from skill_test.optimize.config import DEFAULT_GEN_LM + gen_model = DEFAULT_GEN_LM 
run_generation( skill_name=args.skill_name, @@ -231,6 +253,39 @@ def main(): ) print() + # Collect focus areas from --focus and --focus-file + focus_areas: list[str] | None = None + if args.focus_areas or args.focus_file: + focus_areas = [] + if args.focus_areas: + focus_areas.extend(args.focus_areas) + if args.focus_file: + fp = Path(args.focus_file) + if not fp.exists(): + print(f"Error: focus file not found: {fp}") + sys.exit(1) + focus_areas.extend( + line.strip() + for line in fp.read_text().splitlines() + if line.strip() and not line.strip().startswith("#") + ) + + # Apply focus areas before optimization + if focus_areas: + from focus import apply_focus + from skill_test.optimize.config import DEFAULT_GEN_LM + + focus_gen_model = args.gen_model or DEFAULT_GEN_LM + if args.all: + # Defer per-skill focus application to the loop below + pass + elif args.skill_name: + apply_focus( + skill_name=args.skill_name, + focus_areas=focus_areas, + gen_model=focus_gen_model, + ) + # Handle --apply-last: load saved result and apply without re-running if args.apply_last: if not args.skill_name: @@ -238,12 +293,18 @@ def main(): result = load_last_result(args.skill_name) if result is None: print(f"No saved optimization found for '{args.skill_name}'.") - print(f"Run optimization first: uv run python .test/scripts/optimize.py {args.skill_name}") + print( + f"Run optimization first: uv run python .test/scripts/optimize.py {args.skill_name}" + ) sys.exit(1) print(f"Applying saved optimization for '{args.skill_name}':") - print(f" Score: {result.original_score:.3f} -> {result.optimized_score:.3f} " - f"({result.improvement:+.3f})") - print(f" Tokens: {result.original_token_count:,} -> {result.optimized_token_count:,}") + print( + f" Score: {result.original_score:.3f} -> {result.optimized_score:.3f} " + f"({result.improvement:+.3f})" + ) + print( + f" Tokens: {result.original_token_count:,} -> {result.optimized_token_count:,}" + ) try: apply_optimization(result) sys.exit(0) @@ 
-277,6 +338,7 @@ def main(): mlflow_experiment=args.mlflow_experiment, mlflow_assessment_experiment=args.mlflow_assessments, max_per_skill=args.max_per_skill, + focus_areas=focus_areas, ) review_optimization(result) if args.apply and not args.dry_run: @@ -291,15 +353,29 @@ def main(): skill_names = [ d.name for d in sorted(skills_dir.iterdir()) - if d.is_dir() and (d / "ground_truth.yaml").exists() and not d.name.startswith("_") + if d.is_dir() + and (d / "ground_truth.yaml").exists() + and not d.name.startswith("_") ] - print(f"Found {len(skill_names)} skills to optimize: {', '.join(skill_names)}\n") + print( + f"Found {len(skill_names)} skills to optimize: {', '.join(skill_names)}\n" + ) results = [] for name in skill_names: print(f"\n{'=' * 60}") print(f" Optimizing: {name}") print(f"{'=' * 60}") + # Apply focus per-skill in --all mode + if focus_areas: + from focus import apply_focus + from skill_test.optimize.config import DEFAULT_GEN_LM + + apply_focus( + skill_name=name, + focus_areas=focus_areas, + gen_model=args.gen_model or DEFAULT_GEN_LM, + ) try: result = optimize_skill( skill_name=name, @@ -322,11 +398,14 @@ def main(): agent_timeout=args.agent_timeout, mlflow_experiment=args.mlflow_experiment, mlflow_assessment_experiment=args.mlflow_assessments, + focus_areas=focus_areas, ) review_optimization(result) if args.apply and not args.dry_run: apply_optimization(result) - results.append({"skill": name, "success": True, "improvement": result.improvement}) + results.append( + {"skill": name, "success": True, "improvement": result.improvement} + ) except Exception as e: print(f" ERROR: {e}") results.append({"skill": name, "success": False, "error": str(e)}) @@ -364,6 +443,7 @@ def main(): agent_model=args.agent_model, agent_timeout=args.agent_timeout, mlflow_experiment=args.mlflow_experiment, + focus_areas=focus_areas, ) review_optimization(result) if args.apply and not args.dry_run: diff --git a/.test/skills/databricks-zerobus-ingest/ground_truth.yaml 
b/.test/skills/databricks-zerobus-ingest/ground_truth.yaml index 89cbfa9c..3a8eb79b 100644 --- a/.test/skills/databricks-zerobus-ingest/ground_truth.yaml +++ b/.test/skills/databricks-zerobus-ingest/ground_truth.yaml @@ -18,6 +18,8 @@ test_cases: - Calls stream.ingest_record() to send data - Calls ack.wait_for_ack() to confirm durability - Closes stream in a finally block + - Installs databricks-sdk>=0.97.0 and databricks-zerobus-ingest-sdk>=1.1.0 + - Uses %pip install to install required packages before importing expected_patterns: - pattern: from zerobus\.sdk min_count: 1 @@ -29,13 +31,26 @@ test_cases: min_count: 1 - pattern: (finally|close) min_count: 1 + - pattern: databricks-zerobus-ingest-sdk>=1\.1\.0 + min_count: 1 + description: Must reference the latest Zerobus SDK version 1.1.0 or higher + - pattern: databricks-sdk>=0\.97\.0 + min_count: 1 + description: Must reference the latest Databricks SDK version 0.97.0 or higher guidelines: - Must import from zerobus.sdk.sync or zerobus.sdk.aio - Must include proper stream cleanup with try/finally - Must show the full init -> stream -> ingest -> ack -> close pattern + - Must specify databricks-sdk>=0.97.0 in the pip install command + - Must specify databricks-zerobus-ingest-sdk>=1.1.0 in the pip install command + - Must include a protobuf runtime compatibility check before installing grpcio-tools metadata: category: happy_path difficulty: easy + tags: + - sdk-versioning + - installation + - compatibility - id: databricks-zerobus-ingest_002 inputs: prompt: How do I set up authentication and find the server endpoint for Zerobus @@ -75,6 +90,8 @@ test_cases: - Uses RecordType.PROTOBUF for Protobuf serialization - Requires a compiled .proto file matching the target table schema - Uses SerializeToString() to convert Protobuf messages to bytes + - Installs databricks-zerobus-ingest-sdk>=1.1.0 for async support + - Checks google.protobuf runtime version before installing grpcio-tools for compatibility expected_patterns: 
- pattern: zerobus\.sdk\.aio min_count: 1 @@ -84,13 +101,27 @@ test_cases: min_count: 1 - pattern: await min_count: 1 + - pattern: databricks-zerobus-ingest-sdk>=1\.1\.0 + min_count: 1 + description: Must use the latest Zerobus SDK version 1.1.0 or higher + - pattern: google\.protobuf|grpcio-tools + min_count: 1 + description: Must reference protobuf or grpcio-tools for Protobuf serialization guidelines: - Must use async/await patterns consistently - Must reference Protobuf schema generation from UC table - Must include proper async stream cleanup + - Must specify databricks-zerobus-ingest-sdk>=1.1.0 in the pip install command + - Must include a runtime protobuf version check to select the correct grpcio-tools + version + - Must handle compatibility between protobuf 5.26.x/5.29.x and grpcio-tools 1.62.0 metadata: category: happy_path difficulty: medium + tags: + - sdk-versioning + - protobuf-compatibility + - async - id: databricks-zerobus-ingest_004 inputs: prompt: Generate a Protobuf schema from a Unity Catalog table for Zerobus ingestion @@ -103,6 +134,8 @@ test_cases: - Maps UC column types to Protobuf types - Generates a .proto file with correct syntax - Requires protoc compilation to produce language stubs + - Uses databricks-sdk>=0.97.0 to fetch Unity Catalog table schema + - Checks protobuf runtime version before installing grpcio-tools to ensure compatibility expected_patterns: - pattern: \.proto min_count: 1 @@ -110,13 +143,27 @@ test_cases: min_count: 1 - pattern: message min_count: 1 + - pattern: databricks-sdk>=0\.97\.0 + min_count: 1 + description: Must use the latest Databricks SDK version 0.97.0 or higher + - pattern: google\.protobuf\.__version__|runtime_version + min_count: 1 + description: Must check the runtime protobuf version for grpcio-tools compatibility guidelines: - Must show the type mapping from UC types to Protobuf types - Must include protoc compilation step - Must produce valid proto3 syntax + - Must specify databricks-sdk>=0.97.0 in 
the pip install command + - Must include the smart grpcio-tools installation that checks the runtime protobuf + version + - Must handle protobuf 5.26.x and 5.29.x by pinning grpcio-tools==1.62.0 metadata: category: happy_path difficulty: medium + tags: + - sdk-versioning + - protobuf-compatibility + - schema-generation - id: databricks-zerobus-ingest_005 inputs: prompt: Implement retry and reconnection logic for a Zerobus producer @@ -129,18 +176,29 @@ test_cases: - Implement exponential backoff for retries - Reinitialize the stream on connection failures - Max throughput is 100 MB/s and 15,000 rows/s per stream + - Uses databricks-zerobus-ingest-sdk>=1.1.0 which includes improved retry primitives + - Installs compatible SDK versions before implementing retry logic expected_patterns: - pattern: (retry|backoff|reconnect) min_count: 1 - pattern: create_stream min_count: 1 + - pattern: databricks-zerobus-ingest-sdk>=1\.1\.0 + min_count: 1 + description: Must use the latest Zerobus SDK version 1.1.0 or higher guidelines: - Must implement exponential backoff, not fixed-interval retries - Must reinitialize stream on disconnection, not just retry the same call - Must mention at-least-once semantics and idempotency considerations + - Must specify databricks-zerobus-ingest-sdk>=1.1.0 in the pip install command + - Must specify databricks-sdk>=0.97.0 in the pip install command metadata: category: error_handling difficulty: hard + tags: + - sdk-versioning + - retry-logic + - resilience - id: databricks-zerobus-ingest_006 inputs: prompt: Build a Zerobus Ingest client in Go that sends JSON records @@ -153,6 +211,8 @@ test_cases: - Uses gRPC for transport - Implements proper error handling and stream cleanup - Uses JSON record type + - References the latest SDK versions compatible with Go (databricks-zerobus-ingest-sdk>=1.1.0 + equivalent for Go) expected_patterns: - pattern: (go|golang|Go) min_count: 1 @@ -162,9 +222,15 @@ test_cases: - Must use idiomatic Go patterns (error 
handling, defer for cleanup) - Must reference the multi-language client documentation - Must include proper gRPC connection setup + - Must reference the latest available Zerobus SDK version (1.1.0+) for Go + - Must note any Go-specific version compatibility requirements metadata: category: multi_language difficulty: medium + tags: + - sdk-versioning + - go + - multi-language - id: databricks-zerobus-ingest_007 inputs: prompt: What are the limits and constraints of Zerobus Ingest? @@ -179,6 +245,7 @@ test_cases: - At-least-once delivery semantics - Table must be a managed Delta table in Unity Catalog - Zerobus does not create or alter tables + - 'SDK version requirements: databricks-sdk>=0.97.0 and databricks-zerobus-ingest-sdk>=1.1.0' expected_patterns: - pattern: 100.?MB min_count: 1 @@ -188,9 +255,15 @@ test_cases: - Must list concrete throughput limits - Must mention single-AZ durability caveat - Must mention that table management is the user's responsibility + - Must mention the minimum SDK version requirements for production use + - Must note that older SDK versions may lack features or have known issues metadata: category: knowledge difficulty: easy + tags: + - sdk-versioning + - limits + - constraints - id: grp_20260212_153137 inputs: prompt: Build a simple python producer that ingests JSON records into a Delta @@ -433,3 +506,311 @@ test_cases: fixed_by_commit: null fix_description: null trace_run_id: null +- id: databricks-zerobus-ingest_focus_00 + inputs: + prompt: Show me how to install the correct versions of databricks-sdk and databricks-zerobus-ingest-sdk + with protobuf runtime compatibility handling + outputs: + response: "Here is the recommended installation sequence for Zerobus Ingest that\ + \ handles protobuf runtime compatibility across Databricks Runtime versions:\n\ + \n```python\n# Step 1: Install core SDKs with pinned minimum versions\n%pip\ + \ install databricks-sdk>=0.97.0 databricks-zerobus-ingest-sdk>=1.1.0\n\n# Step\ + \ 2: Smart 
grpcio-tools installation based on runtime protobuf version\nimport\ + \ google.protobuf\nruntime_version = google.protobuf.__version__\nprint(f\"\ + Runtime protobuf version: {runtime_version}\")\n\nif runtime_version.startswith(\"\ + 5.26\") or runtime_version.startswith(\"5.29\"):\n # These protobuf versions\ + \ require a pinned grpcio-tools to avoid descriptor conflicts\n %pip install\ + \ grpcio-tools==1.62.0\nelse:\n # Newer or older protobuf versions can use\ + \ the latest grpcio-tools\n %pip install grpcio-tools\n\nprint(\"Installation\ + \ complete. Restart the Python kernel if running in a notebook.\")\n```\n\n\ + **Why these versions?**\n\n| Package | Minimum Version | Reason |\n|---|---|---|\n\ + | `databricks-sdk` | `>=0.97.0` | Required for latest Unity Catalog APIs and\ + \ token refresh fixes |\n| `databricks-zerobus-ingest-sdk` | `>=1.1.0` | Includes\ + \ stream stability improvements and ACK reliability fixes |\n| `grpcio-tools`\ + \ | `==1.62.0` (conditional) | Avoids protobuf descriptor pool conflicts on\ + \ DBR 14.x/15.x |\n\n**Verification:**\n```python\nimport databricks.sdk\nimport\ + \ zerobus\nprint(f\"databricks-sdk version: {databricks.sdk.__version__}\")\n\ + print(f\"zerobus-ingest-sdk version: {zerobus.__version__}\")\n```\n\n**Notes:**\n\ + - These packages are NOT pre-installed on Databricks clusters.\n- After `%pip\ + \ install`, restart the Python interpreter (or use `dbutils.library.restartPython()`)\ + \ before importing.\n- For production clusters, pin exact versions in your `requirements.txt`\ + \ to ensure reproducibility." 
+ execution_success: true + expectations: + expected_patterns: + - pattern: databricks-sdk>=0\.97\.0 + min_count: 1 + description: Must specify databricks-sdk>=0.97.0 + - pattern: databricks-zerobus-ingest-sdk>=1\.1\.0 + min_count: 1 + description: Must specify databricks-zerobus-ingest-sdk>=1.1.0 + - pattern: google\.protobuf\.__version__|runtime_version + min_count: 1 + description: Must check the runtime protobuf version + - pattern: grpcio-tools==1\.62\.0 + min_count: 1 + description: Must pin grpcio-tools==1.62.0 for protobuf 5.26.x/5.29.x + - pattern: 5\.26|5\.29 + min_count: 1 + description: Must handle protobuf versions 5.26.x and 5.29.x specifically + - pattern: '%pip install' + min_count: 1 + description: Must use %pip install for Databricks notebook compatibility + expected_facts: + - Installs databricks-sdk>=0.97.0 + - Installs databricks-zerobus-ingest-sdk>=1.1.0 + - Checks google.protobuf.__version__ before installing grpcio-tools + - Pins grpcio-tools==1.62.0 for protobuf 5.26.x and 5.29.x runtimes + - Uses latest grpcio-tools for other protobuf versions + - These packages are NOT pre-installed on Databricks clusters + - Recommends restarting the Python kernel after installation + guidelines: + - Must specify databricks-sdk>=0.97.0 as the minimum version + - Must specify databricks-zerobus-ingest-sdk>=1.1.0 as the minimum version + - Must include the conditional grpcio-tools installation based on runtime protobuf + version + - Must handle protobuf 5.26.x and 5.29.x by pinning grpcio-tools==1.62.0 + - Must note that packages are not pre-installed and a kernel restart may be needed + - Must provide a verification step to confirm installed versions + metadata: + category: setup + difficulty: easy + source: generated_from_focus + focus: '- ensure the latest databricks-sdk is being used like 0.97.0 + + - ensure the latest databricks zerobus sdk is being using 1.1.0 + + - make sure compatibility across runtimes' + tags: + - sdk-versioning + - installation 
+ - protobuf-compatibility + - grpcio-tools + - runtime-compatibility +- id: databricks-zerobus-ingest_focus_01 + inputs: + prompt: Write a production-ready Python Zerobus Ingest producer using the latest + SDK versions (databricks-sdk 0.97.0, zerobus-ingest-sdk 1.1.0) with Protobuf + serialization, proper version checks, and error handling + outputs: + response: "Here is a production-ready Zerobus Ingest producer using the latest\ + \ SDK versions with Protobuf serialization:\n\n## Step 1: Install Dependencies\n\ + \n```python\n# Install latest compatible SDK versions\n%pip install databricks-sdk>=0.97.0\ + \ databricks-zerobus-ingest-sdk>=1.1.0\n\n# Smart grpcio-tools installation\n\ + import google.protobuf\nruntime_version = google.protobuf.__version__\nprint(f\"\ + Runtime protobuf version: {runtime_version}\")\n\nif runtime_version.startswith(\"\ + 5.26\") or runtime_version.startswith(\"5.29\"):\n %pip install grpcio-tools==1.62.0\n\ + else:\n %pip install grpcio-tools\n```\n\n## Step 2: Define Protobuf Schema\ + \ (events.proto)\n\n```protobuf\nsyntax = \"proto3\";\npackage events;\n\nmessage\ + \ SensorEvent {\n string device_id = 1;\n double temperature = 2;\n int64\ + \ event_timestamp = 3;\n string status = 4;\n}\n```\n\nCompile with:\n```bash\n\ + python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. 
events.proto\n\ + ```\n\n## Step 3: Production Producer\n\n```python\nimport time\nimport logging\n\ + from zerobus.sdk.sync import ZerobusSdk, RecordType\nfrom events_pb2 import\ + \ SensorEvent\n\nlogging.basicConfig(level=logging.INFO)\nlogger = logging.getLogger(__name__)\n\ + \n# Configuration\nSERVER_ENDPOINT = \".zerobus.databricks.com:443\"\ + \nCLIENT_ID = \"\"\nCLIENT_SECRET = \"\"\ + \nTABLE_FQN = \"catalog.schema.sensor_events\"\n\nMAX_RETRIES = 5\nBASE_BACKOFF_SECONDS\ + \ = 1.0\nMAX_BACKOFF_SECONDS = 60.0\n\ndef create_sdk_client():\n \"\"\"\ + Initialize the Zerobus SDK with service principal credentials.\"\"\"\n return\ + \ ZerobusSdk(\n server_endpoint=SERVER_ENDPOINT,\n client_id=CLIENT_ID,\n\ + \ client_secret=CLIENT_SECRET,\n )\n\ndef ingest_with_retry(records:\ + \ list, max_retries: int = MAX_RETRIES):\n \"\"\"Ingest records with exponential\ + \ backoff retry on failure.\"\"\"\n attempt = 0\n backoff = BASE_BACKOFF_SECONDS\n\ + \n while attempt <= max_retries:\n sdk = None\n stream = None\n\ + \ try:\n sdk = create_sdk_client()\n stream = sdk.create_stream(\n\ + \ table=TABLE_FQN,\n record_type=RecordType.PROTOBUF,\n\ + \ )\n logger.info(f\"Stream created. Ingesting {len(records)}\ + \ records (attempt {attempt + 1}).\")\n\n acks = []\n \ + \ for record in records:\n proto_msg = SensorEvent(\n \ + \ device_id=record[\"device_id\"],\n temperature=record[\"\ + temperature\"],\n event_timestamp=record[\"event_timestamp\"\ + ],\n status=record[\"status\"],\n )\n \ + \ ack = stream.ingest_record(proto_msg.SerializeToString())\n \ + \ acks.append(ack)\n\n # Wait for all ACKs to confirm\ + \ durability\n for ack in acks:\n ack.wait_for_ack()\n\ + \n stream.flush()\n logger.info(f\"Successfully ingested\ + \ {len(records)} records.\")\n return # Success\n\n except\ + \ Exception as e:\n logger.warning(f\"Ingestion attempt {attempt\ + \ + 1} failed: {e}\")\n attempt += 1\n if attempt > max_retries:\n\ + \ logger.error(\"Max retries exceeded. 
Raising exception.\")\n\ + \ raise\n sleep_time = min(backoff * (2 ** (attempt\ + \ - 1)), MAX_BACKOFF_SECONDS)\n logger.info(f\"Retrying in {sleep_time:.1f}s...\"\ + )\n time.sleep(sleep_time)\n finally:\n if stream:\n\ + \ try:\n stream.close()\n except\ + \ Exception:\n pass\n if sdk:\n \ + \ try:\n sdk.close()\n except Exception:\n\ + \ pass\n\n# Example usage\nif __name__ == \"__main__\":\n\ + \ sample_records = [\n {\"device_id\": \"sensor-001\", \"temperature\"\ + : 22.5, \"event_timestamp\": int(time.time()), \"status\": \"ok\"},\n \ + \ {\"device_id\": \"sensor-002\", \"temperature\": 35.1, \"event_timestamp\"\ + : int(time.time()), \"status\": \"warning\"},\n ]\n ingest_with_retry(sample_records)\n\ + ```\n\n**Key production considerations:**\n- **SDK versions**: `databricks-sdk>=0.97.0`\ + \ and `databricks-zerobus-ingest-sdk>=1.1.0` are required for stability.\n-\ + \ **Protobuf compatibility**: Always check `google.protobuf.__version__` before\ + \ installing `grpcio-tools`.\n- **At-least-once delivery**: Zerobus guarantees\ + \ at-least-once semantics; design your target table to handle duplicate records\ + \ (e.g., use a deduplication key).\n- **Stream limits**: Max 100 MB/s and 15,000\ + \ rows/s per stream.\n- **Cleanup**: Always close streams and SDK clients in\ + \ `finally` blocks to avoid resource leaks." 
+ execution_success: true + expectations: + expected_patterns: + - pattern: databricks-sdk>=0\.97\.0 + min_count: 1 + description: Must specify databricks-sdk>=0.97.0 + - pattern: databricks-zerobus-ingest-sdk>=1\.1\.0 + min_count: 1 + description: Must specify databricks-zerobus-ingest-sdk>=1.1.0 + - pattern: RecordType\.PROTOBUF + min_count: 1 + description: Must use RecordType.PROTOBUF for Protobuf serialization + - pattern: SerializeToString + min_count: 1 + description: Must call SerializeToString() to serialize Protobuf messages + - pattern: wait_for_ack + min_count: 1 + description: Must call wait_for_ack() to confirm durability + - pattern: exponential|backoff|2 \*\* + min_count: 1 + description: Must implement exponential backoff + - pattern: finally + min_count: 1 + description: Must use finally block for cleanup + - pattern: google\.protobuf\.__version__|runtime_version + min_count: 1 + description: Must check runtime protobuf version + - pattern: grpcio-tools==1\.62\.0 + min_count: 1 + description: Must pin grpcio-tools for protobuf 5.26.x/5.29.x + expected_facts: + - Installs databricks-sdk>=0.97.0 and databricks-zerobus-ingest-sdk>=1.1.0 + - Checks google.protobuf.__version__ before installing grpcio-tools + - Pins grpcio-tools==1.62.0 for protobuf 5.26.x and 5.29.x + - Uses RecordType.PROTOBUF for Protobuf serialization + - Calls SerializeToString() to convert Protobuf messages to bytes + - Calls wait_for_ack() to confirm durability + - Implements exponential backoff for retries + - Closes stream and SDK in a finally block + - Mentions at-least-once delivery semantics + - Mentions max 100 MB/s and 15,000 rows/s per stream limits + guidelines: + - Must specify databricks-sdk>=0.97.0 in the pip install command + - Must specify databricks-zerobus-ingest-sdk>=1.1.0 in the pip install command + - Must include the conditional grpcio-tools installation based on runtime protobuf + version + - Must use RecordType.PROTOBUF and call SerializeToString() + - Must 
implement exponential backoff (not fixed-interval) for retries + - Must close stream and SDK client in finally blocks + - Must call wait_for_ack() to confirm record durability + - Must mention at-least-once delivery semantics and idempotency considerations + metadata: + category: happy_path + difficulty: hard + source: generated_from_focus + focus: '- ensure the latest databricks-sdk is being used like 0.97.0 + + - ensure the latest databricks zerobus sdk is being using 1.1.0 + + - make sure compatibility across runtimes' + tags: + - sdk-versioning + - protobuf-compatibility + - production-ready + - retry-logic + - grpcio-tools + - protobuf +- id: databricks-zerobus-ingest_focus_02 + inputs: + prompt: I'm getting a protobuf descriptor pool conflict when using Zerobus Ingest + with grpcio-tools on Databricks Runtime 14.x. How do I fix this? + outputs: + response: "This is a known compatibility issue between certain protobuf runtime\ + \ versions (5.26.x and 5.29.x shipped with DBR 14.x/15.x) and newer `grpcio-tools`\ + \ releases. 
Here is how to diagnose and fix it:\n\n## Diagnosis\n\n```python\n\ + import google.protobuf\nprint(f\"Protobuf runtime version: {google.protobuf.__version__}\"\ + )\n# If this prints 5.26.x or 5.29.x, you need the pinned grpcio-tools\n```\n\ + \n## Fix: Smart grpcio-tools Installation\n\n```python\n# First, ensure you\ + \ have the latest compatible core SDKs\n%pip install databricks-sdk>=0.97.0\ + \ databricks-zerobus-ingest-sdk>=1.1.0\n\n# Then install grpcio-tools based\ + \ on the detected protobuf version\nimport google.protobuf\nruntime_version\ + \ = google.protobuf.__version__\nprint(f\"Runtime protobuf version: {runtime_version}\"\ + )\n\nif runtime_version.startswith(\"5.26\") or runtime_version.startswith(\"\ + 5.29\"):\n # Pin grpcio-tools to avoid descriptor pool conflicts\n %pip\ + \ install grpcio-tools==1.62.0\n print(\"Installed grpcio-tools==1.62.0 for\ + \ protobuf compatibility\")\nelse:\n # Use latest grpcio-tools for other\ + \ protobuf versions\n %pip install grpcio-tools\n print(f\"Installed latest\ + \ grpcio-tools for protobuf {runtime_version}\")\n```\n\n## Why This Happens\n\ + \n| DBR Version | Protobuf Version | Compatible grpcio-tools |\n|---|---|---|\n\ + | 14.x | 5.26.x | `==1.62.0` |\n| 15.x | 5.29.x | `==1.62.0` |\n| 13.x and earlier\ + \ | 3.x / 4.x | Latest |\n| 16.x+ | 6.x+ | Latest |\n\nThe conflict occurs because\ + \ `grpcio-tools` bundles its own protobuf descriptor pool, which clashes with\ + \ the runtime version when they are incompatible. 
Pinning `grpcio-tools==1.62.0`\ + \ uses a bundled protobuf that is compatible with the 5.26.x/5.29.x runtime.\n\ + \n## Verify the Fix\n\n```python\n# After installation and kernel restart, verify\ + \ no import errors\nfrom grpc_tools import protoc\nfrom zerobus.sdk.sync import\ + \ ZerobusSdk, RecordType\nprint(\"All imports successful - no descriptor pool\ + \ conflicts\")\n```\n\n## Additional Tips\n\n- Always restart the Python kernel\ + \ (`dbutils.library.restartPython()`) after `%pip install` in notebooks.\n-\ + \ Pin exact versions in `requirements.txt` for production cluster init scripts\ + \ to ensure reproducibility.\n- If you are using `databricks-zerobus-ingest-sdk>=1.1.0`,\ + \ the SDK itself does not require `grpcio-tools` at runtime — only for compiling\ + \ `.proto` files during development." + execution_success: true + expectations: + expected_patterns: + - pattern: google\.protobuf\.__version__|runtime_version + min_count: 1 + description: Must check the runtime protobuf version + - pattern: 5\.26|5\.29 + min_count: 1 + description: Must identify protobuf 5.26.x and 5.29.x as the affected versions + - pattern: grpcio-tools==1\.62\.0 + min_count: 1 + description: Must recommend pinning grpcio-tools==1.62.0 as the fix + - pattern: databricks-sdk>=0\.97\.0 + min_count: 1 + description: Must reference the latest databricks-sdk version + - pattern: databricks-zerobus-ingest-sdk>=1\.1\.0 + min_count: 1 + description: Must reference the latest zerobus-ingest-sdk version + - pattern: descriptor.pool|descriptor pool + min_count: 1 + description: Must explain the descriptor pool conflict + - pattern: restartPython|restart.*kernel|kernel.*restart + min_count: 1 + description: Must recommend restarting the Python kernel after installation + expected_facts: + - Protobuf 5.26.x and 5.29.x shipped with DBR 14.x/15.x cause descriptor pool + conflicts + - The fix is to pin grpcio-tools==1.62.0 for protobuf 5.26.x and 5.29.x + - Checks 
google.protobuf.__version__ to detect the runtime version + - Installs databricks-sdk>=0.97.0 and databricks-zerobus-ingest-sdk>=1.1.0 + - Recommends restarting the Python kernel after pip install + - grpcio-tools is only needed for compiling .proto files, not at runtime for the + SDK + guidelines: + - Must identify protobuf 5.26.x and 5.29.x as the root cause of the conflict + - Must recommend pinning grpcio-tools==1.62.0 as the specific fix + - Must include the conditional installation pattern that checks the runtime version + - Must specify databricks-sdk>=0.97.0 and databricks-zerobus-ingest-sdk>=1.1.0 + - Must recommend restarting the Python kernel after installation + - Must explain why the conflict occurs (bundled vs runtime protobuf descriptor + pool) + - Should provide a verification step to confirm the fix worked + metadata: + category: error_handling + difficulty: medium + source: generated_from_focus + focus: '- ensure the latest databricks-sdk is being used like 0.97.0 + + - ensure the latest databricks zerobus sdk is being using 1.1.0 + + - make sure compatibility across runtimes' + tags: + - sdk-versioning + - protobuf-compatibility + - grpcio-tools + - troubleshooting + - databricks-runtime + - descriptor-pool diff --git a/.test/skills/databricks-zerobus-ingest/manifest.yaml b/.test/skills/databricks-zerobus-ingest/manifest.yaml index 930f50c1..294dfc79 100644 --- a/.test/skills/databricks-zerobus-ingest/manifest.yaml +++ b/.test/skills/databricks-zerobus-ingest/manifest.yaml @@ -1,6 +1,7 @@ skill_name: databricks-zerobus-ingest tool_modules: [] -description: Test cases for Zerobus Ingest skill - gRPC-based data ingestion into Delta tables +description: Test cases for Zerobus Ingest skill - gRPC-based data ingestion into + Delta tables triggers: - zerobus - zerobus ingest @@ -21,12 +22,26 @@ scorers: - guidelines_from_expectations default_guidelines: - Response must use the zerobus.sdk package (not fabricated imports) - - Code must follow the 
pattern: SDK init -> create stream -> ingest records -> handle ACKs -> close + - 'Code must follow the pattern: SDK init -> create stream -> ingest records -> + handle ACKs -> close' - Must include proper stream cleanup (try/finally or context manager) - - Must use correct classes (ZerobusSdk, StreamConfigurationOptions, TableProperties, RecordType) + - Must use correct classes (ZerobusSdk, StreamConfigurationOptions, TableProperties, + RecordType) - Must reference at-least-once delivery semantics when discussing reliability - Protobuf examples must use protoc-generated stubs, not hand-rolled messages - Server endpoint format must match the documented cloud-specific patterns + - '[FOCUS] Must use databricks-sdk>=0.97.0 (not older versions like 0.85.0) in all + pip install commands and requirements' + - '[FOCUS] Must use databricks-zerobus-ingest-sdk>=1.1.0 (not older versions like + 0.2.0) in all pip install commands and requirements' + - '[FOCUS] Must include the smart protobuf/grpcio-tools compatibility check before + installing grpcio-tools to avoid runtime version conflicts' + - '[FOCUS] Must print or check the runtime protobuf version before installing grpcio-tools + and select a compatible grpcio-tools version accordingly' + - '[FOCUS] Must not hardcode a single grpcio-tools version without first inspecting + the runtime google.protobuf version' + - '[FOCUS] When specifying SDK versions in code comments or documentation, must + reflect the latest versions (databricks-sdk>=0.97.0, databricks-zerobus-ingest-sdk>=1.1.0)' trace_expectations: tool_limits: Bash: 10 @@ -40,5 +55,5 @@ scorers: - databricks-skills/databricks-zerobus-ingest/SKILL.md quality_gates: syntax_valid: 1.0 - pattern_adherence: 0.9 + pattern_adherence: 0.95 execution_success: 0.8 diff --git a/.test/src/skill_test/optimize/agent_evaluator.py b/.test/src/skill_test/optimize/agent_evaluator.py index bea8d146..132acaee 100644 --- a/.test/src/skill_test/optimize/agent_evaluator.py +++ 
b/.test/src/skill_test/optimize/agent_evaluator.py @@ -31,6 +31,7 @@ tool_count as tool_count_scorer, tool_sequence as tool_sequence_scorer, ) +from .assertions import run_all_assertions, summarize_failures from .judges import ( JudgeFeedback, create_skill_quality_judge, @@ -342,6 +343,17 @@ def __call__( score_without = _safe_parse_score(quality_without_fb.value) effectiveness_delta = score_with - score_without + # Deterministic fact/pattern assertions (zero LLM cost) + with_assertion_results = run_all_assertions(with_response, expectations) + without_assertion_results = run_all_assertions(without_response, expectations) + + fact_results = [r for r in with_assertion_results if r.assertion_type == "fact"] + pattern_results = [r for r in with_assertion_results if r.assertion_type == "pattern"] + fact_score = sum(1 for r in fact_results if r.passed) / len(fact_results) if fact_results else 1.0 + pattern_score = sum(1 for r in pattern_results if r.passed) / len(pattern_results) if pattern_results else 1.0 + + failure_summary = summarize_failures(with_assertion_results, without_assertion_results) + # Phase 4: Tool-call judges (MLflow or fallback) tool_scores = _run_mlflow_tool_judges( with_trace, @@ -389,7 +401,7 @@ def __call__( side_info: dict[str, Any] = {} if prompt: - side_info["Task"] = prompt[:200] + side_info["Task"] = prompt[:500] side_info["Judge_quality_with"] = { "score": score_with, @@ -406,6 +418,18 @@ def __call__( "delta": effectiveness_delta, } + # Assertion-based structured feedback + side_info["Missing_Facts"] = [r.rationale for r in fact_results if not r.passed] + side_info["Missing_Patterns"] = [r.rationale for r in pattern_results if not r.passed] + side_info["Passed_Facts"] = [r.rationale for r in fact_results if r.passed] + side_info["Passed_Patterns"] = [r.rationale for r in pattern_results if r.passed] + + if failure_summary.get("Error") or failure_summary.get("Regressions"): + side_info["skill_md_specific_info"] = { + "Assertion_Diagnostics": 
failure_summary.get("Error", ""), + "Regressions": failure_summary.get("Regressions", ""), + } + # Agent-specific details side_info["agent_trace"] = { "total_tool_calls": with_trace.get("tools", {}).get("total_calls", 0), @@ -424,15 +448,17 @@ def __call__( # Expected vs Actual reference_answer = example.get("answer", "") if reference_answer: - side_info["Expected"] = reference_answer[:500] + side_info["Expected"] = reference_answer[:2000] if with_response: - side_info["Actual"] = with_response[:500] + side_info["Actual"] = with_response[:2000] - # Score breakdown + # Score breakdown (scores dict feeds GEPA's Pareto frontier) side_info["scores"] = { "quality_with": score_with, "quality_without": score_without, "skill_effectiveness": effectiveness_delta, + "fact_coverage": fact_score, + "pattern_adherence": pattern_score, "tool_correctness": tool_correctness, "tool_efficiency": tool_efficiency, "behavioral": behavioral_score, diff --git a/.test/src/skill_test/optimize/alignment.py b/.test/src/skill_test/optimize/alignment.py index 6151f3f2..a7b8587b 100644 --- a/.test/src/skill_test/optimize/alignment.py +++ b/.test/src/skill_test/optimize/alignment.py @@ -97,3 +97,29 @@ def align_judge( except Exception as e: logger.warning("MemAlign alignment failed for %s: %s", skill_name, e) return judge + + +def align_judges( + skill_name: str, + judges: dict[str, Any], + reflection_lm: str = "openai:/gpt-4o-mini", +) -> dict[str, Any]: + """Align multiple judges with human feedback using MemAlign. + + Convenience wrapper that calls ``align_judge`` on each judge in the dict. + Judges that can't be aligned (insufficient traces) are returned unchanged. + + Args: + skill_name: Name of the skill to load traces for. + judges: Dict mapping judge names to judge instances + (e.g. ``{"correctness": cj, "completeness": cmj, "guideline_adherence": gj}``). + reflection_lm: LLM for MemAlign's reflection step. + + Returns: + Dict with same keys, values are aligned judges where possible. 
+ """ + aligned: dict[str, Any] = {} + for name, judge in judges.items(): + logger.info("Aligning judge '%s' for skill '%s'", name, skill_name) + aligned[name] = align_judge(skill_name, judge, reflection_lm=reflection_lm) + return aligned diff --git a/.test/src/skill_test/optimize/judges.py b/.test/src/skill_test/optimize/judges.py index dfc748a3..247aad67 100644 --- a/.test/src/skill_test/optimize/judges.py +++ b/.test/src/skill_test/optimize/judges.py @@ -1,11 +1,18 @@ """MLflow judge factories for skill evaluation. -Replaces the 6 separate judge calls and binary assertion layer with three -focused judges that provide both scores AND rich rationale for GEPA's -reflection LM. +Multi-judge architecture with three focused custom ``make_judge`` instances: -Judges: - quality_judge — Scores a single response (0.0-1.0) against expectations. + correctness_judge — Are facts, API references, and code syntax accurate? + completeness_judge — Are all parts of the question addressed? + guideline_adherence_judge — Does the response follow Databricks-specific patterns? + +Each judge uses categorical ``Literal["excellent", "acceptable", "poor"]`` +feedback types for more reliable, alignable judgments. Scores are converted +to floats via ``CATEGORICAL_SCORES``. + +Legacy judges (kept for backward compat): + _create_legacy_quality_judge — Single 0.0-1.0 quality scorer (renamed from + ``create_skill_quality_judge``). effectiveness_judge — Compares WITH vs WITHOUT responses, returns verdict. regression_judge — Identifies specific ways a skill harms responses. 
@@ -31,7 +38,7 @@ import os import time from dataclasses import dataclass -from typing import Any +from typing import Any, Literal from mlflow.genai.judges import make_judge @@ -244,7 +251,235 @@ def _safe_parse_score(raw_value: Any) -> float: # --------------------------------------------------------------------------- -# Quality judge — primary scorer for a single response +# Categorical scoring +# --------------------------------------------------------------------------- + +CATEGORICAL_SCORES: dict[str, float] = { + "excellent": 1.0, + "acceptable": 0.6, + "poor": 0.0, +} + + +def _categorical_to_float(verdict: str | float) -> float: + """Convert a categorical verdict to a float score. + + Handles ``Literal["excellent", "acceptable", "poor"]`` from the new + multi-judge architecture as well as raw floats from legacy judges. + """ + if isinstance(verdict, (int, float)): + return max(0.0, min(1.0, float(verdict))) + key = str(verdict).strip().lower() + return CATEGORICAL_SCORES.get(key, 0.0) + + +# --------------------------------------------------------------------------- +# Correctness judge — facts, API references, code syntax accuracy +# --------------------------------------------------------------------------- + +_CORRECTNESS_KEYWORDS = {"api", "syntax", "correct", "deprecated", "modern", "function", "parameter", "error", "version"} + +_CORRECTNESS_INSTRUCTIONS = """\ +You are an expert evaluator for Databricks skill documentation CORRECTNESS. +Focus ONLY on whether facts, API references, and code syntax are accurate. + +## What to evaluate + +1. **Factual accuracy** — are stated facts correct? +2. **API accuracy** — are function names, parameters, and return types correct? +3. **Code syntax** — is the code syntactically valid and runnable? +4. **Currency** — are APIs current (not deprecated)? + +Do NOT evaluate completeness or style — only correctness. 
+ +## Expected Information + +{{ expectations }} + +## Input + +Question: {{ inputs }} +Response: {{ outputs }} + +## Instructions + +Return one of exactly: "excellent", "acceptable", "poor". + +- "excellent" = all facts, APIs, and syntax are correct +- "acceptable" = mostly correct with minor inaccuracies that don't break functionality +- "poor" = contains significant factual errors, wrong APIs, or broken syntax + +Provide detailed rationale explaining: +- Specific factual errors found (or confirming correctness) +- API references that are wrong or deprecated +- Syntax issues in code examples +""" + + +def create_correctness_judge( + skill_guidelines: list[str] | None = None, + judge_model: str | None = None, +) -> Any: + """Create a correctness-focused judge with categorical feedback. + + Args: + skill_guidelines: Optional guidelines — only correctness-related ones + (matching keywords like api, syntax, correct, deprecated) are injected. + judge_model: LLM model for the judge. + """ + instructions = _CORRECTNESS_INSTRUCTIONS + if skill_guidelines: + # Filter for correctness-related guidelines + filtered = [ + g for g in skill_guidelines + if any(kw in g.lower() for kw in _CORRECTNESS_KEYWORDS) + ] + if filtered: + principles = "\n".join(f"- {g}" for g in filtered) + instructions += f"\n\n## Domain-Specific Correctness Principles\n{principles}\n" + + model_uri, inference_params = _to_judge_model_and_params(judge_model or DEFAULT_JUDGE_LM) + return make_judge( + name="skill_correctness", + model=model_uri, + instructions=instructions, + feedback_value_type=Literal["excellent", "acceptable", "poor"], + inference_params=inference_params, + ) + + +# --------------------------------------------------------------------------- +# Completeness judge — all parts addressed, all expected info present +# --------------------------------------------------------------------------- + +_COMPLETENESS_INSTRUCTIONS = """\ +You are an expert evaluator for Databricks skill 
documentation COMPLETENESS. +Focus ONLY on whether all parts of the question are addressed and all expected +information is present. + +## What to evaluate + +1. **Question coverage** — are all parts of the question answered? +2. **Expected facts** — are all expected facts present? +3. **Expected patterns** — are all expected code patterns demonstrated? +4. **Depth** — is the response detailed enough to be actionable? + +Do NOT evaluate correctness or style — only completeness. + +## Expected Information + +{{ expectations }} + +## Input + +Question: {{ inputs }} +Response: {{ outputs }} + +## Instructions + +Return one of exactly: "excellent", "acceptable", "poor". + +- "excellent" = all parts addressed, all expected facts and patterns present +- "acceptable" = most parts addressed, minor gaps in coverage +- "poor" = significant parts of the question unanswered or major facts missing + +Provide detailed rationale explaining: +- Which parts of the question are addressed vs unanswered +- Which expected facts are present vs missing +- Which expected patterns are demonstrated vs absent +""" + + +def create_completeness_judge( + judge_model: str | None = None, +) -> Any: + """Create a completeness-focused judge with categorical feedback. + + Args: + judge_model: LLM model for the judge. + """ + model_uri, inference_params = _to_judge_model_and_params(judge_model or DEFAULT_JUDGE_LM) + return make_judge( + name="skill_completeness", + model=model_uri, + instructions=_COMPLETENESS_INSTRUCTIONS, + feedback_value_type=Literal["excellent", "acceptable", "poor"], + inference_params=inference_params, + ) + + +# --------------------------------------------------------------------------- +# Guideline adherence judge — Databricks patterns and practices +# --------------------------------------------------------------------------- + +_GUIDELINE_ADHERENCE_INSTRUCTIONS = """\ +You are an expert evaluator for Databricks skill documentation GUIDELINE ADHERENCE. 
+Focus ONLY on whether the response follows Databricks-specific patterns, +conventions, and guidelines. + +## What to evaluate + +1. **Pattern adherence** — does the response follow expected code patterns? +2. **Convention compliance** — does it use Databricks-specific conventions? +3. **Best practice alignment** — does it follow recommended practices? +4. **Guideline compliance** — does it adhere to the specific guidelines listed below? + +Do NOT evaluate general correctness or completeness — only guideline adherence. + +## Expected Information + +{{ expectations }} + +## Input + +Question: {{ inputs }} +Response: {{ outputs }} + +## Instructions + +Return one of exactly: "excellent", "acceptable", "poor". + +- "excellent" = follows all guidelines and patterns precisely +- "acceptable" = follows most guidelines with minor deviations +- "poor" = ignores or violates important guidelines + +Provide detailed rationale explaining: +- Which guidelines are followed vs violated +- Which patterns are correctly demonstrated vs missing +- Specific deviations from expected practices +""" + + +def create_guideline_adherence_judge( + skill_guidelines: list[str] | None = None, + judge_model: str | None = None, +) -> Any: + """Create a guideline adherence judge with categorical feedback. + + Receives ALL guidelines (default_guidelines + per-test guidelines + + [FOCUS] guidelines from ``--focus``), making focus areas directly evaluable. + + Args: + skill_guidelines: All guidelines to evaluate against. + judge_model: LLM model for the judge. 
+ """ + instructions = _GUIDELINE_ADHERENCE_INSTRUCTIONS + if skill_guidelines: + principles = "\n".join(f"- {g}" for g in skill_guidelines) + instructions += f"\n\n## Required Guidelines\n{principles}\n" + + model_uri, inference_params = _to_judge_model_and_params(judge_model or DEFAULT_JUDGE_LM) + return make_judge( + name="skill_guideline_adherence", + model=model_uri, + instructions=instructions, + feedback_value_type=Literal["excellent", "acceptable", "poor"], + inference_params=inference_params, + ) + + +# --------------------------------------------------------------------------- +# Legacy quality judge (renamed, kept for backward compatibility) # --------------------------------------------------------------------------- _QUALITY_INSTRUCTIONS = """\ @@ -286,11 +521,13 @@ def _safe_parse_score(raw_value: Any) -> float: """ -def create_skill_quality_judge( +def _create_legacy_quality_judge( skill_guidelines: list[str] | None = None, judge_model: str | None = None, ) -> Any: - """Create a universal quality judge for scoring responses. + """Create a universal quality judge for scoring responses (legacy). + + Renamed from ``create_skill_quality_judge``. Kept for backward compatibility. 
Args: skill_guidelines: Optional per-skill evaluation principles from @@ -313,6 +550,10 @@ def create_skill_quality_judge( ) +# Backward-compatible alias +create_skill_quality_judge = _create_legacy_quality_judge + + # --------------------------------------------------------------------------- # Effectiveness judge — WITH vs WITHOUT comparison # --------------------------------------------------------------------------- diff --git a/.test/src/skill_test/optimize/review.py b/.test/src/skill_test/optimize/review.py index e4d3f6c7..bb4fee78 100644 --- a/.test/src/skill_test/optimize/review.py +++ b/.test/src/skill_test/optimize/review.py @@ -161,22 +161,30 @@ def review_optimization(result: OptimizationResult) -> None: # Aggregate judge-based scores from per-task side_info task_count = 0 - sum_with = 0.0 - sum_without = 0.0 + sum_corr_w = 0.0 + sum_comp_w = 0.0 + sum_guide = 0.0 sum_eff = 0.0 per_task_lines: list[str] = [] for task_id in sorted(si.keys()): info = si[task_id] scores = info.get("scores", {}) - pw = scores.get("quality_with", 0.0) - pwo = scores.get("quality_without", 0.0) + corr_w = scores.get("correctness_with", 0.0) + comp_w = scores.get("completeness_with", 0.0) + guide = scores.get("guideline_adherence", 0.0) eff = scores.get("skill_effectiveness", 0.0) - sum_with += pw - sum_without += pwo + sum_corr_w += corr_w + sum_comp_w += comp_w + sum_guide += guide sum_eff += eff task_count += 1 + # Get categorical verdicts for display + corr_verdict = info.get("Judge_correctness_with", {}).get("verdict", "?") + comp_verdict = info.get("Judge_completeness_with", {}).get("verdict", "?") + guide_verdict = info.get("Judge_guideline_adherence", {}).get("verdict", "?") + # Build per-task notes error = info.get("Error", "") notes = [] @@ -187,14 +195,18 @@ def review_optimization(result: OptimizationResult) -> None: if not notes: notes.append("OK") note_str = f" [{'; '.join(notes)}]" - per_task_lines.append(f" {task_id:<30s} WITH {pw:.2f} WITHOUT {pwo:.2f} delta 
{eff:+.2f}{note_str}") + per_task_lines.append( + f" {task_id:<30s} corr={corr_verdict:<10s} comp={comp_verdict:<10s} " + f"guide={guide_verdict:<10s} delta {eff:+.2f}{note_str}" + ) if task_count > 0: - agg_with = sum_with / task_count - agg_without = sum_without / task_count + agg_corr = sum_corr_w / task_count + agg_comp = sum_comp_w / task_count + agg_guide = sum_guide / task_count agg_eff = sum_eff / task_count else: - agg_with = agg_without = agg_eff = 0.0 + agg_corr = agg_comp = agg_guide = agg_eff = 0.0 # Score summary improvement_sign = "+" if result.improvement >= 0 else "" @@ -203,8 +215,9 @@ def review_optimization(result: OptimizationResult) -> None: f"({improvement_sign}{result.improvement:.3f})" ) print(f" Skill Effectiveness: {agg_eff:.2f}") - print(f" Quality (with): {agg_with:.2f}") - print(f" Quality (without): {agg_without:.2f} (baseline)") + print(f" Correctness (with): {agg_corr:.2f}") + print(f" Completeness (with): {agg_comp:.2f}") + print(f" Guideline Adherence: {agg_guide:.2f}") # Token counts reduction_sign = "+" if result.token_reduction_pct >= 0 else "" diff --git a/.test/src/skill_test/optimize/runner.py b/.test/src/skill_test/optimize/runner.py index 3129634f..a5017e8b 100644 --- a/.test/src/skill_test/optimize/runner.py +++ b/.test/src/skill_test/optimize/runner.py @@ -29,7 +29,12 @@ create_skillbench_evaluator, build_skillbench_background, ) -from .splitter import create_gepa_datasets, generate_bootstrap_tasks, to_gepa_instances, create_cross_skill_dataset +from .splitter import ( + create_gepa_datasets, + generate_bootstrap_tasks, + to_gepa_instances, + create_cross_skill_dataset, +) from .tools import ( extract_tool_descriptions, tools_to_gepa_components, @@ -69,13 +74,25 @@ def _compute_diff_summary(original: str, optimized: str) -> str: """Generate a human-readable summary of changes.""" original_lines = original.splitlines(keepends=True) optimized_lines = optimized.splitlines(keepends=True) - diff = 
list(difflib.unified_diff(original_lines, optimized_lines, fromfile="original", tofile="optimized", n=1)) + diff = list( + difflib.unified_diff( + original_lines, + optimized_lines, + fromfile="original", + tofile="optimized", + n=1, + ) + ) if not diff: return "No changes" - added = sum(1 for line in diff if line.startswith("+") and not line.startswith("+++")) - removed = sum(1 for line in diff if line.startswith("-") and not line.startswith("---")) + added = sum( + 1 for line in diff if line.startswith("+") and not line.startswith("+++") + ) + removed = sum( + 1 for line in diff if line.startswith("-") and not line.startswith("---") + ) parts = [] if added: @@ -85,7 +102,11 @@ def _compute_diff_summary(original: str, optimized: str) -> str: changed_sections = set() for line in diff: - content = line[1:].strip() if line.startswith(("+", "-")) and not line.startswith(("+++", "---")) else "" + content = ( + line[1:].strip() + if line.startswith(("+", "-")) and not line.startswith(("+++", "---")) + else "" + ) if content.startswith("#"): changed_sections.add(content) @@ -130,9 +151,29 @@ def _log_detailed_judge_metrics( agent_validation_si: dict[str, dict] | None, ): """Log detailed per-task judge metrics, aggregates, and rationales to the active MLflow run.""" - # Score keys we expect from skillbench judges - JUDGE_SCORE_KEYS = ["quality_with", "quality_without", "skill_effectiveness", "structure", "token_efficiency"] - AGENT_SCORE_KEYS = ["tool_correctness", "tool_efficiency", "behavioral", "execution_success"] + # Score keys we expect from skillbench judges (multi-judge architecture) + JUDGE_SCORE_KEYS = [ + "correctness_with", + "correctness_without", + "completeness_with", + "completeness_without", + "guideline_adherence", + "quality_composite", + "correctness_delta", + "completeness_delta", + "skill_effectiveness", + "regression_penalty", + "fact_coverage", + "pattern_adherence", + "structure", + "token_efficiency", + ] + AGENT_SCORE_KEYS = [ + 
"tool_correctness", + "tool_efficiency", + "behavioral", + "execution_success", + ] # --- A. Per-task skillbench scores --- metrics = {} @@ -239,6 +280,8 @@ def optimize_skill( mlflow_assessment_experiment: str | None = None, # Cross-skill dataset max_per_skill: int | None = None, + # Focus areas for steering optimization + focus_areas: list[str] | None = None, # Deprecated params kept for backward compat mode: str = "static", task_lm: str | None = None, @@ -300,7 +343,9 @@ def optimize_skill( # Build read-only tool context string (for skill optimization) if tool_components: - tool_context_str = "\n\n".join(tool_components[k] for k in sorted(tool_components)) + tool_context_str = "\n\n".join( + tool_components[k] for k in sorted(tool_components) + ) # 2. Build seed_candidate (multi-component dict) seed_candidate: dict[str, str] = {} @@ -333,11 +378,17 @@ def optimize_skill( # 3. Load datasets if tools_only: # Cross-skill dataset for tool optimization - train = create_cross_skill_dataset(max_per_skill=max_per_skill or 5, tool_modules=tool_modules) + train = create_cross_skill_dataset( + max_per_skill=max_per_skill or 5, tool_modules=tool_modules + ) val = None if train: - source_skills = {t.get("metadata", {}).get("source_skill", "?") for t in train} - print(f"Cross-skill dataset: {len(train)} tasks from {len(source_skills)} skill(s)") + source_skills = { + t.get("metadata", {}).get("source_skill", "?") for t in train + } + print( + f"Cross-skill dataset: {len(train)} tasks from {len(source_skills)} skill(s)" + ) else: # Fall back to single-skill dataset try: @@ -359,13 +410,19 @@ def optimize_skill( assessment_summary = None assessment_by_task: dict[str, list] = {} if mlflow_assessment_experiment: - from .assessment_fetcher import fetch_assessments, summarize_assessment_patterns, match_assessments_to_tasks + from .assessment_fetcher import ( + fetch_assessments, + summarize_assessment_patterns, + match_assessments_to_tasks, + ) records = 
fetch_assessments(mlflow_assessment_experiment, skill_name=skill_name) if records: assessment_summary = summarize_assessment_patterns(records) assessment_by_task = match_assessments_to_tasks(records, train) - print(f"MLflow assessments: {len(records)} traces, {len(assessment_by_task)} tasks matched") + print( + f"MLflow assessments: {len(records)} traces, {len(assessment_by_task)} tasks matched" + ) if assessment_summary: print(f" {assessment_summary.splitlines()[0]}") else: @@ -383,7 +440,9 @@ def optimize_skill( print("Evaluator: skillbench (judge-driven)") if not effective_gen_model: - raise ValueError("SkillBench evaluator requires a gen_model. Pass --gen-model or set GEPA_GEN_LM env var.") + raise ValueError( + "SkillBench evaluator requires a gen_model. Pass --gen-model or set GEPA_GEN_LM env var." + ) evaluator = create_skillbench_evaluator( skill_name, gen_model=effective_gen_model, @@ -421,7 +480,9 @@ def optimize_skill( evaluator = agent_evaluator print("Mode: agent-eval-full (agent for ALL iterations)") else: - print("Mode: agent-eval hybrid (proxy for GEPA, agent for baseline + validation)") + print( + "Mode: agent-eval hybrid (proxy for GEPA, agent for baseline + validation)" + ) # 5. 
Get config (scaled by component count) num_components = len(seed_candidate) @@ -488,7 +549,9 @@ def _refiner_lm_with_fallback(prompt): for comp, tokens in original_token_counts.items(): print(f" {comp}: {tokens:,} tokens") if tool_context_str: - print(f"Tool context (read-only): {count_tokens(tool_context_str):,} tokens") + print( + f"Tool context (read-only): {count_tokens(tool_context_str):,} tokens" + ) print(f"Train tasks: {len(train)}") print(f"Val tasks: {len(val) if val else 'None (single-task mode)'}") print(f"Generation model: {effective_gen_model}") @@ -517,6 +580,7 @@ def _refiner_lm_with_fallback(prompt): baseline_side_info=si_by_id, token_budget=token_budget, assessment_summary=assessment_summary, + focus_areas=focus_areas, ) print(f"\nBackground preview:\n{background[:500]}...") @@ -525,8 +589,10 @@ def _refiner_lm_with_fallback(prompt): dry_run_agent_si = None if agent_evaluator: print(f"\nAgent baseline ({len(train)} tasks)...") - dry_run_agent_score, agent_per_task, dry_run_agent_si, _ = _evaluate_on_tasks( - agent_evaluator, seed_candidate, train, label="Agent baseline" + dry_run_agent_score, agent_per_task, dry_run_agent_si, _ = ( + _evaluate_on_tasks( + agent_evaluator, seed_candidate, train, label="Agent baseline" + ) ) print(f"Agent baseline score: {dry_run_agent_score:.3f}") for task_id, score in agent_per_task.items(): @@ -571,20 +637,33 @@ def _refiner_lm_with_fallback(prompt): baseline_side_info=si_by_id, token_budget=token_budget, assessment_summary=assessment_summary, + focus_areas=focus_areas, ) objective = ( f"Refine and improve the existing '{skill_name}' skill. " - "Score is based on SKILL EFFECTIVENESS (35%) and TOKEN EFFICIENCY (25%). " - "Judge rationale in side_info explains exactly what failed. " + "Score: EFFECTIVENESS (30%) + QUALITY_COMPOSITE (20%) + FACT_PATTERN (15%) " + "+ GUIDELINE_ADHERENCE (10%) + EFFICIENCY (10%) + STRUCTURE (5%) - REGRESSION_PENALTY (10%). 
" + "Three judges evaluate independently: CORRECTNESS (facts/API/syntax), " + "COMPLETENESS (coverage), GUIDELINE_ADHERENCE (patterns). " + "Use per-dimension deltas in Judge_effectiveness to see WHERE improvement happened. " + "Use Missing_Facts and Missing_Patterns in side_info to see exactly what content to add. " "Focus on what the agent would otherwise get wrong. " "Be concise — remove redundant examples and verbose explanations." ) + if focus_areas: + focus_text = "\n".join(f"- {f}" for f in focus_areas) + objective += ( + f"\n\nUSER PRIORITY — The user has asked to prioritize:\n{focus_text}\n" + "Weight these priorities heavily when deciding what to add, change, or emphasize." + ) # 6b. Agent baseline scoring (hybrid mode: before GEPA loop) if agent_evaluator and not agent_eval_full: print(f"\n Agent baseline scoring ({len(train)} tasks)...") - agent_baseline_score, agent_baseline_per_task, agent_baseline_si, _ = _evaluate_on_tasks( - agent_evaluator, seed_candidate, train, label="Agent baseline" + agent_baseline_score, agent_baseline_per_task, agent_baseline_si, _ = ( + _evaluate_on_tasks( + agent_evaluator, seed_candidate, train, label="Agent baseline" + ) ) print(f" Agent baseline score: {agent_baseline_score:.3f}") for task_id, score in agent_baseline_per_task.items(): @@ -609,7 +688,11 @@ def _refiner_lm_with_fallback(prompt): ) # estimate_pass_duration expects the model name string, not the callable - _est_reflection_lm = _reflection_model_name if _reflection_model_name else str(reflection_lm or DEFAULT_GEN_LM) + _est_reflection_lm = ( + _reflection_model_name + if _reflection_model_name + else str(reflection_lm or DEFAULT_GEN_LM) + ) est_secs = estimate_pass_duration( config.engine.max_metric_calls, _est_reflection_lm, @@ -624,7 +707,9 @@ def _refiner_lm_with_fallback(prompt): ) for pass_num in range(1, max_passes + 1): - print(f"\n --- Pass {pass_num}/{max_passes} (best score so far: {best_score:.4f}) ---") + print( + f"\n --- Pass 
{pass_num}/{max_passes} (best score so far: {best_score:.4f}) ---" + ) pass_config = copy.deepcopy(config) @@ -644,10 +729,14 @@ def _refiner_lm_with_fallback(prompt): total_metric_calls += result.total_metric_calls or 0 candidate = result.best_candidate - pass_score, _, pass_si_by_id, _ = _evaluate_on_tasks(evaluator, candidate, train, label=f"Pass {pass_num}") + pass_score, _, pass_si_by_id, _ = _evaluate_on_tasks( + evaluator, candidate, train, label=f"Pass {pass_num}" + ) improvement = pass_score - best_score - print(f" Pass {pass_num} score: {pass_score:.4f} (delta: {'+' if improvement >= 0 else ''}{improvement:.4f})") + print( + f" Pass {pass_num} score: {pass_score:.4f} (delta: {'+' if improvement >= 0 else ''}{improvement:.4f})" + ) if pass_score > best_score + improvement_threshold: best = dict(candidate) @@ -674,7 +763,9 @@ def _refiner_lm_with_fallback(prompt): val_scores: dict[str, float] = {} if val: - _, val_scores, _, _ = _evaluate_on_tasks(evaluator, best, val, label="Validation") + _, val_scores, _, _ = _evaluate_on_tasks( + evaluator, best, val, label="Validation" + ) token_reduction_pct = ( (total_original_tokens - optimized_token_count) / total_original_tokens * 100 @@ -690,8 +781,8 @@ def _refiner_lm_with_fallback(prompt): if agent_evaluator and not agent_eval_full: print(f"\n Agent validation scoring ({len(train)} tasks on best candidate)...") - agent_validation_score, agent_val_per_task, agent_validation_si, _ = _evaluate_on_tasks( - agent_evaluator, best, train, label="Agent validation" + agent_validation_score, agent_val_per_task, agent_validation_si, _ = ( + _evaluate_on_tasks(agent_evaluator, best, train, label="Agent validation") ) print(f" Agent validation score: {agent_validation_score:.3f}") for task_id, score in agent_val_per_task.items(): diff --git a/.test/src/skill_test/optimize/skillbench_evaluator.py b/.test/src/skill_test/optimize/skillbench_evaluator.py index 131efe51..476e4351 100644 --- 
a/.test/src/skill_test/optimize/skillbench_evaluator.py +++ b/.test/src/skill_test/optimize/skillbench_evaluator.py @@ -1,18 +1,24 @@ """SkillBench evaluator: measure skill effectiveness via WITH vs WITHOUT comparison. Evaluates skills by measuring agent performance WITH the skill vs WITHOUT it -on real tasks. Uses MLflow judges as the primary scoring mechanism — judges -provide both scores AND rich rationale for GEPA's reflection LM. +on real tasks. Uses three focused MLflow judges (correctness, completeness, +guideline adherence) as the primary scoring mechanism — each judge provides +categorical verdicts AND rich rationale for GEPA's reflection LM. Phase 1: WITH-SKILL -- LLM generates response with SKILL.md in context Phase 2: WITHOUT-SKILL -- LLM generates response with NO skill (cached once) - Phase 3: JUDGE -- quality_judge scores both, effectiveness derived from delta + Phase 3: JUDGES -- correctness + completeness (WITH+WITHOUT), guideline_adherence (WITH only), + regression (conditional on delta < -0.05) + Phase 4: ASSERTIONS -- deterministic fact/pattern checking (zero LLM cost) Scoring weights: - 40% Skill Effectiveness (quality_with - quality_without delta) - 30% Absolute Quality (quality_with score from judge) + 30% Effectiveness Delta (mean of correctness_delta + completeness_delta) + 20% Quality Composite (mean of correctness + completeness + guideline_adherence WITH scores) + 15% Fact/Pattern Coverage (deterministic assertions from assertions.py) + 10% Guideline Adherence (dedicated weight for practices) 5% Structure (syntax validity) - 25% Token Efficiency (smaller candidates score higher) + 10% Token Efficiency (smaller candidates score higher) + 10% Regression Penalty (explicit penalty when regression_judge fires) """ from __future__ import annotations @@ -27,12 +33,15 @@ from mlflow.entities import Feedback from ..scorers.universal import python_syntax, sql_syntax, no_hallucinated_apis +from .assertions import run_all_assertions, 
summarize_failures from .judges import ( JudgeFeedback, - create_skill_quality_judge, + _categorical_to_float, + create_correctness_judge, + create_completeness_judge, + create_guideline_adherence_judge, create_regression_judge, run_judge_safe, - _safe_parse_score, completion_with_fallback, ) from .utils import count_tokens @@ -129,13 +138,17 @@ def _effectiveness_score(verdict: str | float) -> float: class SkillBenchEvaluator: - """GEPA-compatible evaluator using judges for scoring + diagnostics. + """GEPA-compatible evaluator using three focused judges for scoring + diagnostics. + + Uses correctness, completeness, and guideline adherence judges with + categorical ``Literal["excellent", "acceptable", "poor"]`` feedback types. + Produces decomposed signals for GEPA's reflection LM. Args: gen_model: LLM model for generating responses. Required. original_token_counts: Token counts of original artifacts for efficiency scoring. token_budget: Hard token ceiling; candidates exceeding this are penalized. - skill_guidelines: Deduplicated guidelines from ground_truth.yaml for the quality judge. + skill_guidelines: Deduplicated guidelines from ground_truth.yaml for judges. judge_model: LLM model for judges. Defaults to GEPA_JUDGE_LM env or databricks/databricks-claude-sonnet-4-6. """ @@ -151,19 +164,33 @@ def __init__( assessment_by_task: dict[str, list] | None = None, ): if not gen_model: - raise ValueError("SkillBench evaluator requires a gen_model. Pass --gen-model or set GEPA_GEN_LM env var.") + raise ValueError( + "SkillBench evaluator requires a gen_model. Pass --gen-model or set GEPA_GEN_LM env var." 
+ ) self.gen_model = gen_model self._baseline_response_cache: dict[str, str] = {} - self._baseline_judge_cache: dict[str, JudgeFeedback] = {} + # Per-judge baseline caches (WITHOUT responses are stable across iterations) + self._baseline_correctness_cache: dict[str, JudgeFeedback] = {} + self._baseline_completeness_cache: dict[str, JudgeFeedback] = {} self._original_token_counts = original_token_counts or {} self._total_original_tokens = sum(self._original_token_counts.values()) self._token_budget = token_budget self._tool_context = tool_context or "" self._assessment_by_task = assessment_by_task or {} - # Create judge instances with configurable model - self._quality_judge = create_skill_quality_judge(skill_guidelines, judge_model=judge_model) - self._regression_judge = create_regression_judge(judge_model=judge_model) + # Create three focused judge instances + self._correctness_judge = create_correctness_judge( + skill_guidelines, judge_model=judge_model + ) + self._completeness_judge = create_completeness_judge( + judge_model=judge_model + ) + self._guideline_adherence_judge = create_guideline_adherence_judge( + skill_guidelines, judge_model=judge_model + ) + self._regression_judge = create_regression_judge( + judge_model=judge_model + ) def _generate_response(self, prompt: str, skill_context: str | None = None) -> str: """Generate a response with or without skill context.""" @@ -225,7 +252,9 @@ def __call__( # Decode expectations expectations: dict[str, Any] = {} - expectations_json = example.get("additional_context", {}).get("expectations", "") + expectations_json = example.get("additional_context", {}).get( + "expectations", "" + ) if expectations_json: try: expectations = json.loads(expectations_json) @@ -241,7 +270,7 @@ def __call__( # Phase 2: Generate WITHOUT-skill response (cached) without_response = self._get_baseline_response(prompt) - # Phase 3: Judge-driven scoring + # Phase 3: Multi-judge scoring facts = expectations.get("expected_facts", []) 
patterns = expectations.get("expected_patterns", []) guidelines = expectations.get("guidelines", []) @@ -251,55 +280,126 @@ def __call__( facts_str = "\n".join(f"- {f}" for f in facts) if facts else "None specified" patterns_str = ( "\n".join( - f"- {p}" if isinstance(p, str) else f"- {p.get('description', p.get('pattern', ''))}" for p in patterns + f"- {p}" + if isinstance(p, str) + else f"- {p.get('description', p.get('pattern', ''))}" + for p in patterns ) if patterns else "None specified" ) - guidelines_str = "\n".join(f"- {g}" for g in guidelines) if guidelines else "None specified" - - expectations_text = ( - f"Expected facts:\n{facts_str}\n\nExpected patterns:\n{patterns_str}\n\nGuidelines:\n{guidelines_str}" + guidelines_str = ( + "\n".join(f"- {g}" for g in guidelines) if guidelines else "None specified" ) + expectations_text = f"Expected facts:\n{facts_str}\n\nExpected patterns:\n{patterns_str}\n\nGuidelines:\n{guidelines_str}" + # make_judge requires expectations as dict, inputs/outputs as Any. - # The template renders {{ expectations }} as the dict's string repr, - # so we pack our formatted text into a single-key dict. 
expectations_dict = {"criteria": expectations_text} - # Quality judge: score WITH response - quality_with_fb = run_judge_safe( - self._quality_judge, + baseline_key = _prompt_hash(prompt) + + # --- Correctness judge: WITH + WITHOUT (WITHOUT cached) --- + correctness_with_fb = run_judge_safe( + self._correctness_judge, inputs=prompt, outputs=with_response, expectations=expectations_dict, - name="quality_with", + name="correctness_with", ) + if baseline_key not in self._baseline_correctness_cache: + self._baseline_correctness_cache[baseline_key] = run_judge_safe( + self._correctness_judge, + inputs=prompt, + outputs=without_response, + expectations=expectations_dict, + name="correctness_without", + ) + correctness_without_fb = self._baseline_correctness_cache[baseline_key] - # Quality judge: score WITHOUT response (cached — baseline never changes) - baseline_key = _prompt_hash(prompt) - if baseline_key not in self._baseline_judge_cache: - self._baseline_judge_cache[baseline_key] = run_judge_safe( - self._quality_judge, + # --- Completeness judge: WITH + WITHOUT (WITHOUT cached) --- + completeness_with_fb = run_judge_safe( + self._completeness_judge, + inputs=prompt, + outputs=with_response, + expectations=expectations_dict, + name="completeness_with", + ) + if baseline_key not in self._baseline_completeness_cache: + self._baseline_completeness_cache[baseline_key] = run_judge_safe( + self._completeness_judge, inputs=prompt, outputs=without_response, expectations=expectations_dict, - name="quality_without", + name="completeness_without", ) - quality_without_fb = self._baseline_judge_cache[baseline_key] + completeness_without_fb = self._baseline_completeness_cache[baseline_key] + + # --- Guideline adherence judge: WITH only (meaningless without skill) --- + guideline_adherence_fb = run_judge_safe( + self._guideline_adherence_judge, + inputs=prompt, + outputs=with_response, + expectations=expectations_dict, + name="guideline_adherence", + ) - # Parse scores - score_with 
= _safe_parse_score(quality_with_fb.value) - score_without = _safe_parse_score(quality_without_fb.value) - effectiveness_delta = score_with - score_without + # Convert categorical verdicts to float scores + correctness_with = _categorical_to_float(correctness_with_fb.value) + correctness_without = _categorical_to_float(correctness_without_fb.value) + completeness_with = _categorical_to_float(completeness_with_fb.value) + completeness_without = _categorical_to_float(completeness_without_fb.value) + guideline_adherence_score = _categorical_to_float(guideline_adherence_fb.value) - # Derive effectiveness verdict from quality delta (no LLM call needed) + # Per-dimension effectiveness deltas + correctness_delta = correctness_with - correctness_without + completeness_delta = completeness_with - completeness_without + effectiveness_delta = (correctness_delta + completeness_delta) / 2.0 + + # Quality composite: mean of all three WITH scores + quality_composite = (correctness_with + completeness_with + guideline_adherence_score) / 3.0 + + # Derive effectiveness verdict if effectiveness_delta > 0.05: - effectiveness_verdict = 1.0 # improved + effectiveness_verdict = "improved" elif effectiveness_delta < -0.05: - effectiveness_verdict = 0.0 # regressed + effectiveness_verdict = "regressed" else: - effectiveness_verdict = 0.5 # same + effectiveness_verdict = "same" + + # --- Regression judge: conditional on delta < -0.05 --- + regression_penalty = 0.0 + regression_fb = None + if effectiveness_delta < -0.05: + comparison_input = ( + f"QUESTION:\n{prompt}\n\n" + f"WITH-SKILL RESPONSE:\n{with_response}\n\n" + f"WITHOUT-SKILL RESPONSE:\n{without_response}" + ) + regression_fb = run_judge_safe( + self._regression_judge, + inputs=comparison_input, + expectations=expectations_dict, + name="regression", + ) + # bool/yes → 1.0 (regression found), no → 0.0 + reg_val = regression_fb.value + if isinstance(reg_val, bool): + regression_penalty = 1.0 if reg_val else 0.0 + elif 
isinstance(reg_val, str) and reg_val.strip().lower() in ("yes", "true"): + regression_penalty = 1.0 + + # Phase 4: Deterministic fact/pattern assertions (zero LLM cost) + with_results = run_all_assertions(with_response, expectations) + without_results = run_all_assertions(without_response, expectations) + + fact_results = [r for r in with_results if r.assertion_type == "fact"] + pattern_results = [r for r in with_results if r.assertion_type == "pattern"] + fact_score = sum(1 for r in fact_results if r.passed) / len(fact_results) if fact_results else 1.0 + pattern_score = sum(1 for r in pattern_results if r.passed) / len(pattern_results) if pattern_results else 1.0 + + # GEPA-friendly diagnostics from assertion comparison + failure_summary = summarize_failures(with_results, without_results) # Structure validation on the skill itself structure = _run_structure_scorers(skill_md) if skill_md else 1.0 @@ -320,8 +420,17 @@ def __call__( else: efficiency = 1.0 - # Weighted final score - final_score = 0.40 * max(0.0, effectiveness_delta) + 0.30 * score_with + 0.05 * structure + 0.25 * efficiency + # Weighted final score with new multi-judge weights + fact_pattern = 0.5 * fact_score + 0.5 * pattern_score + final_score = max(0.0, min(1.0, + 0.30 * effectiveness_delta + + 0.20 * quality_composite + + 0.15 * fact_pattern + + 0.10 * guideline_adherence_score + + 0.05 * structure + + 0.10 * efficiency + - 0.10 * regression_penalty + )) # Build side info with FULL judge rationale (not truncated!) 
reference_answer = example.get("answer", "") @@ -330,36 +439,82 @@ def __call__( # Task context if prompt: - side_info["Task"] = prompt[:200] + side_info["Task"] = prompt[:500] - # Full judge feedback — the critical fix for GEPA optimization - side_info["Judge_quality_with"] = { - "score": score_with, - "rationale": quality_with_fb.rationale, + # Per-dimension judge feedback — GEPA sees each as a separate section + side_info["Judge_correctness_with"] = { + "verdict": str(correctness_with_fb.value), + "score": correctness_with, + "rationale": correctness_with_fb.rationale, + } + side_info["Judge_correctness_without"] = { + "verdict": str(correctness_without_fb.value), + "score": correctness_without, + "rationale": correctness_without_fb.rationale, + } + side_info["Judge_completeness_with"] = { + "verdict": str(completeness_with_fb.value), + "score": completeness_with, + "rationale": completeness_with_fb.rationale, + } + side_info["Judge_completeness_without"] = { + "verdict": str(completeness_without_fb.value), + "score": completeness_without, + "rationale": completeness_without_fb.rationale, } - side_info["Judge_quality_without"] = { - "score": score_without, - "rationale": quality_without_fb.rationale, + side_info["Judge_guideline_adherence"] = { + "verdict": str(guideline_adherence_fb.value), + "score": guideline_adherence_score, + "rationale": guideline_adherence_fb.rationale, } + + # Per-dimension effectiveness deltas side_info["Judge_effectiveness"] = { - "verdict": ( - "improved" if effectiveness_verdict == 1.0 else "regressed" if effectiveness_verdict == 0.0 else "same" - ), - "delta": effectiveness_delta, + "verdict": effectiveness_verdict, + "correctness_delta": correctness_delta, + "completeness_delta": completeness_delta, + "overall_delta": effectiveness_delta, } + # Regression analysis (only when regression detected) + if regression_fb and regression_penalty > 0: + side_info["Regression_Analysis"] = { + "rationale": regression_fb.rationale, + } + + # 
Assertion-based structured feedback — GEPA renders each key as a markdown header + side_info["Missing_Facts"] = [r.rationale for r in fact_results if not r.passed] + side_info["Missing_Patterns"] = [r.rationale for r in pattern_results if not r.passed] + side_info["Passed_Facts"] = [r.rationale for r in fact_results if r.passed] + side_info["Passed_Patterns"] = [r.rationale for r in pattern_results if r.passed] + + # skill_md_specific_info — shown ONLY when reflecting on the skill component + if failure_summary.get("Error") or failure_summary.get("Regressions"): + side_info["skill_md_specific_info"] = { + "Assertion_Diagnostics": failure_summary.get("Error", ""), + "Regressions": failure_summary.get("Regressions", ""), + } + # Expected vs Actual for GEPA reflection if reference_answer: - side_info["Expected"] = reference_answer[:500] + side_info["Expected"] = reference_answer[:2000] if with_response: - side_info["Actual"] = with_response[:500] + side_info["Actual"] = with_response[:2000] - # Score breakdown + # Score breakdown (scores dict feeds GEPA's Pareto frontier) side_info["scores"] = { - "quality_with": score_with, - "quality_without": score_without, + "correctness_with": correctness_with, + "correctness_without": correctness_without, + "completeness_with": completeness_with, + "completeness_without": completeness_without, + "guideline_adherence": guideline_adherence_score, + "quality_composite": quality_composite, + "correctness_delta": correctness_delta, + "completeness_delta": completeness_delta, "skill_effectiveness": effectiveness_delta, - "effectiveness_verdict": effectiveness_verdict, + "regression_penalty": regression_penalty, + "fact_coverage": fact_score, + "pattern_adherence": pattern_score, "structure": structure, "token_efficiency": efficiency, "final": final_score, @@ -376,52 +531,82 @@ def __call__( # Inject matched real-world assessments from MLflow traces if self._assessment_by_task: task_id = example.get("additional_context", 
{}).get("task_id", "") - matched = self._assessment_by_task.get(task_id) or self._assessment_by_task.get(_prompt_hash(prompt), []) + matched = self._assessment_by_task.get( + task_id + ) or self._assessment_by_task.get(_prompt_hash(prompt), []) if matched: side_info["real_world_assessments"] = [ - {"name": a.name, "value": a.value, "rationale": a.rationale} for a in matched + {"name": a.name, "value": a.value, "rationale": a.rationale} + for a in matched ] - # Derive diagnostic labels from judge verdicts for backward compat - if effectiveness_delta < -0.05: + # Derive diagnostic labels from assertions + judge verdicts + # Find weakest dimension for targeted GEPA feedback + weakest_dim = "correctness" if correctness_with <= completeness_with else "completeness" + weakest_score = min(correctness_with, completeness_with) + + if failure_summary.get("Error"): + # Assertions detected specific NEEDS_SKILL/REGRESSION items + side_info["Error"] = failure_summary["Error"] + elif effectiveness_delta < -0.05: + # Per-dimension regression info + regressed_dims = [] + if correctness_delta < -0.05: + regressed_dims.append(f"correctness({correctness_delta:+.2f})") + if completeness_delta < -0.05: + regressed_dims.append(f"completeness({completeness_delta:+.2f})") + dims_str = ", ".join(regressed_dims) if regressed_dims else f"overall({effectiveness_delta:+.2f})" side_info["Error"] = ( - f"REGRESSION: skill_effectiveness delta={effectiveness_delta:.2f} " - f"(with={score_with:.2f}, without={score_without:.2f})" + f"REGRESSION: {dims_str}. " + f"correctness: {correctness_with:.2f} (was {correctness_without:.2f}), " + f"completeness: {completeness_with:.2f} (was {completeness_without:.2f})" ) - side_info["skill_md_specific_info"] = { - "Regressions": quality_with_fb.rationale, - } - elif score_with < 0.5: + elif weakest_score < 0.6: side_info["Error"] = ( - f"NEEDS_SKILL: quality_with={score_with:.2f}, missing content. 
Judge: {quality_with_fb.rationale[:200]}" + f"NEEDS_SKILL: weakest dimension is {weakest_dim}={weakest_score:.2f}. " + f"correctness={correctness_with:.2f}, completeness={completeness_with:.2f}, " + f"guideline_adherence={guideline_adherence_score:.2f}" ) return final_score, side_info def _collect_skill_guidelines(skill_name: str) -> list[str]: - """Collect and deduplicate all guidelines from a skill's ground_truth.yaml.""" + """Collect and deduplicate guidelines from ground_truth.yaml and manifest.yaml.""" from pathlib import Path import yaml - gt_path = Path(".test/skills") / skill_name / "ground_truth.yaml" - if not gt_path.exists(): - return [] - - try: - with open(gt_path) as f: - data = yaml.safe_load(f) or {} - except Exception: - return [] - seen: set[str] = set() guidelines: list[str] = [] - for tc in data.get("test_cases", []): - for g in tc.get("expectations", {}).get("guidelines", []): - g_norm = g.strip() - if g_norm and g_norm not in seen: - seen.add(g_norm) - guidelines.append(g_norm) + + # Collect from ground_truth.yaml test cases + gt_path = Path(".test/skills") / skill_name / "ground_truth.yaml" + if gt_path.exists(): + try: + with open(gt_path) as f: + data = yaml.safe_load(f) or {} + for tc in data.get("test_cases", []): + for g in tc.get("expectations", {}).get("guidelines", []): + g_norm = g.strip() + if g_norm and g_norm not in seen: + seen.add(g_norm) + guidelines.append(g_norm) + except Exception: + pass + + # Collect from manifest.yaml default_guidelines (includes [FOCUS] guidelines) + manifest_path = Path(".test/skills") / skill_name / "manifest.yaml" + if manifest_path.exists(): + try: + with open(manifest_path) as f: + manifest = yaml.safe_load(f) or {} + for g in manifest.get("scorers", {}).get("default_guidelines", []): + g_norm = g.strip() + if g_norm and g_norm not in seen: + seen.add(g_norm) + guidelines.append(g_norm) + except Exception: + pass return guidelines @@ -484,6 +669,7 @@ def build_skillbench_background( 
baseline_side_info: dict[str, dict] | None = None, token_budget: int | None = None, assessment_summary: str | None = None, + focus_areas: list[str] | None = None, ) -> str: """Build concise GEPA reflection context for SkillBench optimization. @@ -493,7 +679,9 @@ def build_skillbench_background( baseline_desc = "" if baseline_scores: mean_score = sum(baseline_scores.values()) / len(baseline_scores) - baseline_desc = f"\nBASELINE: mean {mean_score:.3f} across {len(baseline_scores)} tasks." + baseline_desc = ( + f"\nBASELINE: mean {mean_score:.3f} across {len(baseline_scores)} tasks." + ) if baseline_side_info: needs_skill_ids = [] @@ -511,14 +699,16 @@ def build_skillbench_background( components_desc = "" if component_names and any(c.startswith("tools_") for c in component_names): - tool_modules = [c.replace("tools_", "") for c in component_names if c.startswith("tools_")] + tool_modules = [ + c.replace("tools_", "") for c in component_names if c.startswith("tools_") + ] components_desc = ( f"\nAlso optimizing MCP tool descriptions for: {', '.join(tool_modules)}. " "Keep docstrings accurate and concise — every token counts toward the budget." ) token_desc = ( - f"\nTOKEN EFFICIENCY (25% of score): Current artifacts total {original_token_count:,} tokens. " + f"\nTOKEN EFFICIENCY (15% of score): Current artifacts total {original_token_count:,} tokens. " "Smaller candidates score HIGHER. Be ruthlessly concise." ) if token_budget: @@ -528,16 +718,34 @@ def build_skillbench_background( if assessment_summary: assessment_desc = f"\n\n{assessment_summary}" + focus_desc = "" + if focus_areas: + focus_items = "\n".join(f" - {f}" for f in focus_areas) + focus_desc = ( + f"\n\nUSER FOCUS PRIORITIES:\n{focus_items}\n" + "These are high-priority areas the user wants the skill to emphasize. " + "Weight these heavily in your optimization decisions." 
+ ) + return ( f"You are refining SKILL.md for '{skill_name}'.\n" - "The skill is scored by MLflow judges that evaluate how much it HELPS an agent.\n" - "Judge rationale in side_info explains exactly WHAT failed and WHY.\n" - "Use Judge_quality_with to see missing facts/patterns.\n" - "Use Judge_effectiveness to see if the skill helped or hurt.\n" + "The skill is scored by THREE focused MLflow judges:\n" + " 1. CORRECTNESS — facts, API references, code syntax accuracy\n" + " 2. COMPLETENESS — all parts addressed, all expected info present\n" + " 3. GUIDELINE ADHERENCE — Databricks-specific patterns and practices\n" + "Each judge returns 'excellent', 'acceptable', or 'poor' with rationale.\n\n" + "Judge rationale in side_info explains exactly WHAT failed and WHY per dimension.\n" + "Use Judge_correctness_with/without for accuracy feedback.\n" + "Use Judge_completeness_with/without for coverage feedback.\n" + "Use Judge_guideline_adherence for pattern compliance feedback.\n" + "Use Judge_effectiveness for per-dimension deltas (correctness_delta, completeness_delta).\n" + "Missing_Facts and Missing_Patterns show exact pass/fail for each expected assertion.\n" + "Passed_Facts and Passed_Patterns show what the skill already covers.\n" "Focus on: specific API syntax, version requirements, non-obvious patterns.\n" "Do NOT add generic knowledge the agent already has." 
f"{baseline_desc}" f"{components_desc}" f"{token_desc}" f"{assessment_desc}" + f"{focus_desc}" ) diff --git a/databricks-builder-app/server/app.py b/databricks-builder-app/server/app.py index 8ff1b4cb..abfcf1b0 100644 --- a/databricks-builder-app/server/app.py +++ b/databricks-builder-app/server/app.py @@ -22,7 +22,7 @@ from starlette.exceptions import HTTPException as StarletteHTTPException from starlette.middleware.cors import CORSMiddleware -from .db import is_postgres_configured, is_dynamic_token_mode, run_migrations, init_database, start_token_refresh, stop_token_refresh +from .db import is_postgres_configured, is_dynamic_token_mode, run_migrations, init_database, start_token_refresh, stop_token_refresh, start_matview_refresh, stop_matview_refresh from .routers import agent_router, clusters_router, config_router, conversations_router, projects_router, skills_router, warehouses_router from .services.backup_manager import start_backup_worker, stop_backup_worker from .services.skills_manager import copy_skills_to_app @@ -62,6 +62,9 @@ async def lifespan(app: FastAPI): # Run migrations in background thread (non-blocking) asyncio.create_task(asyncio.to_thread(run_migrations)) + # Start hourly materialized view refresh (daily_revenue_by_region) + await start_matview_refresh() + # Start backup worker start_backup_worker() except Exception as e: @@ -80,7 +83,8 @@ async def lifespan(app: FastAPI): logger.info('Shutting down application...') - # Stop token refresh if running + # Stop background tasks + await stop_matview_refresh() await stop_token_refresh() stop_backup_worker() diff --git a/databricks-builder-app/server/db/__init__.py b/databricks-builder-app/server/db/__init__.py index b161714e..822185d3 100644 --- a/databricks-builder-app/server/db/__init__.py +++ b/databricks-builder-app/server/db/__init__.py @@ -11,17 +11,20 @@ is_postgres_configured, run_migrations, session_scope, + start_matview_refresh, start_token_refresh, + stop_matview_refresh, 
stop_token_refresh, test_database_connection, ) -from .models import Base, Conversation, Execution, Message, Project +from .models import Base, Conversation, Execution, Message, Order, Project __all__ = [ 'Base', 'Conversation', 'Execution', 'Message', + 'Order', 'Project', 'create_tables', 'get_engine', @@ -33,7 +36,9 @@ 'is_postgres_configured', 'run_migrations', 'session_scope', + 'start_matview_refresh', 'start_token_refresh', + 'stop_matview_refresh', 'stop_token_refresh', 'test_database_connection', ] diff --git a/databricks-builder-app/server/db/database.py b/databricks-builder-app/server/db/database.py index 9c157e90..1f3be0a7 100644 --- a/databricks-builder-app/server/db/database.py +++ b/databricks-builder-app/server/db/database.py @@ -45,6 +45,12 @@ # Token refresh interval (50 minutes - tokens expire after 1 hour) TOKEN_REFRESH_INTERVAL_SECONDS = 50 * 60 +# Materialized view refresh state +_matview_refresh_task: Optional[asyncio.Task] = None + +# Materialized view refresh interval (1 hour) +MATVIEW_REFRESH_INTERVAL_SECONDS = 60 * 60 + # Cached resolved hostaddr for DNS workaround _resolved_hostaddr: Optional[str] = None @@ -513,6 +519,90 @@ async def test_database_connection() -> Optional[str]: return str(e) +async def _refresh_materialized_view(view_name: str = "daily_revenue_by_region") -> bool: + """Refresh a materialized view using CONCURRENTLY (non-blocking). + + Uses CONCURRENTLY so the view remains readable during refresh. + Requires a UNIQUE index on the materialized view. 
+ + Args: + view_name: Name of the materialized view to refresh + + Returns: + True if refresh succeeded, False otherwise + """ + if _engine is None: + logger.warning("Cannot refresh materialized view: database engine not initialized") + return False + + try: + from sqlalchemy import text + + async with _engine.begin() as conn: + await conn.execute( + text(f"REFRESH MATERIALIZED VIEW CONCURRENTLY {view_name}") + ) + logger.info(f"Materialized view '{view_name}' refreshed successfully") + return True + except Exception as e: + logger.error(f"Failed to refresh materialized view '{view_name}': {e}") + return False + + +async def _matview_refresh_loop(): + """Background task to refresh materialized views every hour. + + Runs an initial refresh on startup (after a short delay to allow + migrations to complete), then refreshes every MATVIEW_REFRESH_INTERVAL_SECONDS. + """ + # Wait 30 seconds on startup to allow migrations to complete + await asyncio.sleep(30) + + while True: + try: + await _refresh_materialized_view("daily_revenue_by_region") + except asyncio.CancelledError: + logger.info("Materialized view refresh task cancelled") + break + except Exception as e: + logger.error(f"Error in materialized view refresh loop: {e}") + + try: + await asyncio.sleep(MATVIEW_REFRESH_INTERVAL_SECONDS) + except asyncio.CancelledError: + logger.info("Materialized view refresh task cancelled") + break + + +async def start_matview_refresh(): + """Start the background materialized view refresh task.""" + global _matview_refresh_task + + if _matview_refresh_task is not None: + logger.warning("Materialized view refresh task already running") + return + + _matview_refresh_task = asyncio.create_task(_matview_refresh_loop()) + logger.info( + f"Started materialized view refresh background task " + f"(interval: {MATVIEW_REFRESH_INTERVAL_SECONDS}s)" + ) + + +async def stop_matview_refresh(): + """Stop the background materialized view refresh task.""" + global _matview_refresh_task + + if 
_matview_refresh_task is not None: + _matview_refresh_task.cancel() + try: + await _matview_refresh_task + except asyncio.CancelledError: + pass + _matview_refresh_task = None + logger.info("Stopped materialized view refresh background task") + + def run_migrations() -> None: """Run Alembic migrations programmatically. diff --git a/databricks-builder-app/server/db/models.py b/databricks-builder-app/server/db/models.py index 46ff02f8..4114772d 100644 --- a/databricks-builder-app/server/db/models.py +++ b/databricks-builder-app/server/db/models.py @@ -1,10 +1,11 @@ -"""Database models for Projects, Conversations, and Messages.""" +"""Database models for Projects, Conversations, Messages, and Orders.""" import uuid from datetime import datetime, timezone +from decimal import Decimal from typing import Any, List, Optional -from sqlalchemy import Boolean, DateTime, ForeignKey, Index, LargeBinary, String, Text +from sqlalchemy import Boolean, DateTime, ForeignKey, Index, LargeBinary, Numeric, String, Text from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship @@ -217,3 +218,41 @@ def to_dict(self) -> dict[str, Any]: 'created_at': self.created_at.isoformat() if self.created_at else None, 'updated_at': self.updated_at.isoformat() if self.updated_at else None, } + + +class Order(Base): + """Order model - represents a customer order with revenue data. + + Used as the source table for the daily_revenue_by_region materialized view. 
+ """ + + __tablename__ = 'orders' + + id: Mapped[str] = mapped_column(String(50), primary_key=True, default=generate_uuid) + region: Mapped[str] = mapped_column(String(100), nullable=False, index=True) + amount: Mapped[Decimal] = mapped_column(Numeric(precision=12, scale=2), nullable=False) + currency: Mapped[str] = mapped_column(String(3), nullable=False, default='USD') + status: Mapped[str] = mapped_column( + String(20), nullable=False, default='completed' + ) # completed, pending, cancelled, refunded + customer_id: Mapped[Optional[str]] = mapped_column(String(50), nullable=True) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), default=utc_now, nullable=False + ) + + __table_args__ = ( + Index('ix_orders_region_created_at', 'region', 'created_at'), + Index('ix_orders_status', 'status'), + ) + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary.""" + return { + 'id': self.id, + 'region': self.region, + 'amount': float(self.amount) if self.amount else 0.0, + 'currency': self.currency, + 'status': self.status, + 'customer_id': self.customer_id, + 'created_at': self.created_at.isoformat() if self.created_at else None, + }