-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmodel_selection.py
More file actions
344 lines (287 loc) · 13.3 KB
/
Copy pathmodel_selection.py
File metadata and controls
344 lines (287 loc) · 13.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
#!/usr/bin/env python3
"""
model_selection.py — Automatische Modellauswahl für Issues
Dieses Modul implementiert die Logik zur Auswahl des besten KI-Modells
basierend auf Issue-Typ, Risiko, Kosten und historischen Daten.
Verwendung:
from model_selection import select_model
model = select_model(issue_text, labels, touched_files, repo_type)
"""
from __future__ import annotations
import re
from typing import Dict, List, Optional, Tuple
from model_catalog import (
OPENCODE_FREE_MODELS,
OPENCODE_LOW_STRENGTH_MODELS,
OPENCODE_MEDIUM_STRENGTH_MODELS,
)
# ────────────────────────────────────────────────────────────────────────────────
# Konfiguration und Konstanten
# ────────────────────────────────────────────────────────────────────────────────
# Issue-Kategorien
ISSUE_CATEGORIES = {
"docs-only": ["docs", "documentation", "readme", "license", "md", "rst"],
"tests": ["test", "pytest", "unittest", "spec"],
"python": ["python", "py", "django", "flask"],
"r": ["r", "rscript", "tidyverse", "shiny"],
"dashboard/ui": ["dashboard", "ui", "frontend", "react", "vue"],
"provider-integration": ["provider", "api", "integration", "codex", "mistral"],
"refactor": ["refactor", "cleanup", "restructure"],
"ci-failure": ["ci", "github actions", "travis", "circleci"],
"low-code-repo": ["low-code", "no-code", "config", "yaml", "json"],
}
# Risiko- und Stärke-Zuordnung
RISK_MAP = {
"docs-only": "low",
"tests": "medium",
"python": "medium",
"r": "medium",
"dashboard/ui": "high",
"provider-integration": "high",
"refactor": "high",
"ci-failure": "high",
"low-code-repo": "low",
}
STRENGTH_MAP = {
"low": ["mistral-small", "deepseek-coder:6.7b", "qwen-coder",
*OPENCODE_LOW_STRENGTH_MODELS],
"medium": ["mistral-medium", "claude-sonnet-3.5", "gpt-4o-mini",
*OPENCODE_MEDIUM_STRENGTH_MODELS],
"high": ["mistral-large", "claude-sonnet-4", "gpt-4o"],
}
# Modell-Kosten-Tiers (relativ)
COST_TIERS = {
"mistral-small": "cheap",
"deepseek-coder:6.7b": "cheap",
"qwen-coder": "cheap",
**{model: "cheap" for model in OPENCODE_FREE_MODELS},
"mistral-medium": "medium",
"claude-sonnet-3.5": "medium",
"gpt-4o-mini": "medium",
"mistral-large": "expensive",
"claude-sonnet-4": "expensive",
"gpt-4o": "expensive",
}
# Standard-Modell-Reihenfolge für Eskalation
MODEL_ESCALATION = [
*OPENCODE_FREE_MODELS,
"mistral-small",
"mistral-medium",
"mistral-large",
"claude-sonnet-3.5",
"claude-sonnet-4",
"gpt-4o-mini",
"gpt-4o",
]
# ────────────────────────────────────────────────────────────────────────────────
# Hilfsfunktionen
# ────────────────────────────────────────────────────────────────────────────────
def normalize_text(text: str) -> str:
"""Normalisiert Text für die Suche (Kleinbuchstaben, ohne Sonderzeichen)."""
return re.sub(r'[^a-z0-9\s-]', '', text.lower())
def extract_keywords(text: str) -> List[str]:
"""Extrahiert relevante Keywords aus einem Text."""
normalized = normalize_text(text)
return re.findall(r'\b\w{3,}\b', normalized)
def match_issue_category(keywords: List[str], labels: List[str], files: List[str]) -> str:
"""Klassifiziert ein Issue basierend auf Keywords, Labels und Dateien."""
for category, indicators in ISSUE_CATEGORIES.items():
# Prüfe Labels
if any(label.lower() in indicators for label in labels):
return category
# Prüfe Dateiendungen
if any(any(f.endswith(f".{ext}") for ext in indicators) for f in files):
return category
# Prüfe Keywords
if any(keyword in indicators for keyword in keywords):
return category
return "general" # Fallback
def get_risk_level(category: str) -> str:
"""Gibt das Risiko-Level für eine Issue-Kategorie zurück."""
return RISK_MAP.get(category, "medium") # Fallback: medium
def get_strength_tier(risk: str) -> List[str]:
"""Gibt die Modell-Stärke-Tier für ein Risiko-Level zurück."""
return STRENGTH_MAP.get(risk, STRENGTH_MAP["medium"]) # Fallback: medium
def get_cost_tier(model: str) -> str:
"""Gibt das Kosten-Tier für ein Modell zurück."""
return COST_TIERS.get(model, "medium") # Fallback: medium
def filter_models_by_cost(models: List[str], max_cost: str) -> List[str]:
"""Filtert Modelle nach maximalem Kosten-Tier."""
cost_order = ["cheap", "medium", "expensive"]
max_index = cost_order.index(max_cost)
return [m for m in models if cost_order.index(COST_TIERS.get(m, "medium")) <= max_index]
def _looks_like_repo_path(value: str) -> bool:
"""Return True for conservative repo-relative path candidates."""
if not value or any(char.isspace() for char in value):
return False
if value.startswith(("/", "http://", "https://")):
return False
if not re.fullmatch(r"[A-Za-z0-9_./-]+", value):
return False
return "/" in value or bool(re.search(r"\.[A-Za-z0-9]+$", value))
def _extract_path_candidates(text: str) -> List[str]:
paths: List[str] = []
for raw in re.split(r"[,;\s]+", text):
candidate = raw.strip().strip("`'\"()[]{}")
candidate = candidate.removeprefix("-").removeprefix("*").strip()
if _looks_like_repo_path(candidate) and candidate not in paths:
paths.append(candidate)
return paths
def extract_touched_files_from_issue_body(issue_body: str) -> List[str]:
"""Extract clear `Touches:` file hints from an issue body.
Supported patterns:
- `Touches: scripts/foo.py, tests/test_foo.py`
- `Touches:` followed by indented or bulleted path lines.
The parser is intentionally conservative. If no explicit `Touches:` marker
is present, or the following text does not look like repo-relative paths, it
returns an empty list.
"""
if not issue_body:
return []
paths: List[str] = []
lines = issue_body.splitlines()
for index, line in enumerate(lines):
match = re.match(r"^\s*touches\s*:\s*(.*)$", line, re.IGNORECASE)
if not match:
continue
paths.extend(_extract_path_candidates(match.group(1)))
for continuation in lines[index + 1:]:
if not continuation.strip():
break
if not re.match(r"^\s+(?:[-*]\s*)?|^\s*[-*]\s+", continuation):
break
next_paths = _extract_path_candidates(continuation)
if not next_paths:
break
for path in next_paths:
if path not in paths:
paths.append(path)
break
return paths
# ────────────────────────────────────────────────────────────────────────────────
# Hauptfunktionen
# ────────────────────────────────────────────────────────────────────────────────
def classify_issue(issue_text: str, labels: List[str], touched_files: List[str], repo_type: str) -> str:
"""
Klassifiziert ein Issue basierend auf Text, Labels, betroffenen Dateien und Repo-Typ.
Args:
issue_text: Der Text des Issues.
labels: Die Labels des Issues.
touched_files: Die betroffenen Dateien.
repo_type: Der Typ des Repositories (z.B. "python", "r", "docs").
Returns:
Die Issue-Kategorie (z.B. "docs-only", "tests", "python" etc.).
"""
keywords = extract_keywords(issue_text)
category = match_issue_category(keywords, labels, touched_files)
# Repo-Typ als Fallback oder zur Verfeinerung
if category == "general" and repo_type in ISSUE_CATEGORIES:
return repo_type
return category
def estimate_risk_and_strength(issue_category: str) -> Tuple[str, List[str]]:
"""
Schätzt das Risiko und die benötigte Modell-Stärke für eine Issue-Kategorie.
Args:
issue_category: Die Issue-Kategorie.
Returns:
Ein Tupel aus (Risiko-Level, Liste der passenden Modelle).
"""
risk = get_risk_level(issue_category)
strength_tier = get_strength_tier(risk)
return risk, strength_tier
def select_model(
issue_text: str,
labels: List[str],
touched_files: List[str],
repo_type: str,
max_cost_tier: str = "expensive",
manual_overrides: Optional[Dict[str, str]] = None,
run_history: Optional[List[Dict[str, str]]] = None,
) -> Dict[str, str]:
"""
Wählt das beste Modell für ein Issue basierend auf Kategorie, Risiko, Kosten und Verlauf.
Args:
issue_text: Der Text des Issues.
labels: Die Labels des Issues.
touched_files: Die betroffenen Dateien.
repo_type: Der Typ des Repositories.
max_cost_tier: Das maximale Kosten-Tier ("cheap", "medium", "expensive").
manual_overrides: Manuelle Übersteuerungen (z.B. {"model": "claude-sonnet-4"}).
run_history: Verlauf früherer Runs (für Eskalation).
Returns:
Ein Dictionary mit:
- "model": Das ausgewählte Modell.
- "reason": Der Grund für die Auswahl.
- "cost_tier": Das Kosten-Tier.
- "fallback_plan": Mögliche Eskalationsmodelle.
"""
# 1. Issue klassifizieren
issue_category = classify_issue(issue_text, labels, touched_files, repo_type)
# 2. Risiko und Stärke schätzen
risk, strength_tier = estimate_risk_and_strength(issue_category)
# 3. Modelle nach Kosten filtern
affordable_models = filter_models_by_cost(strength_tier, max_cost_tier)
# 4. Manuelle Übersteuerungen anwenden
if manual_overrides and "model" in manual_overrides:
selected_model = manual_overrides["model"]
reason = f"Manuell übersteuert: {selected_model}"
else:
# 5. Eskalation basierend auf Verlauf
if run_history:
last_run = run_history[-1]
if last_run.get("status") in ["no-change", "failed"]:
# Eskalation: Nächststärkeres Modell wählen
current_index = MODEL_ESCALATION.index(last_run["model"])
if current_index + 1 < len(MODEL_ESCALATION):
selected_model = MODEL_ESCALATION[current_index + 1]
reason = f"Eskalation nach fehlgeschlagenem Run: {selected_model}"
else:
selected_model = affordable_models[0] # Fallback
reason = f"Maximale Eskalation erreicht; nutze {selected_model}"
else:
selected_model = affordable_models[0] # Standard
reason = f"Erstversuch mit günstigstem passendem Modell: {selected_model}"
else:
selected_model = affordable_models[0] # Standard
reason = f"Erstversuch mit günstigstem passendem Modell: {selected_model}"
# 6. Metadaten zusammenstellen
cost_tier = get_cost_tier(selected_model)
fallback_plan = [m for m in MODEL_ESCALATION if m != selected_model][:2] # Nächste 2 Modelle
return {
"model": selected_model,
"reason": reason,
"risk": risk,
"category": issue_category,
"cost_tier": cost_tier,
"fallback_plan": fallback_plan,
}
# ────────────────────────────────────────────────────────────────────────────────
# CLI-Integration (für solve_issues.py)
# ────────────────────────────────────────────────────────────────────────────────
def select_model_for_issue(
issue: Dict[str, str],
repo_type: str,
max_cost_tier: str = "expensive",
manual_overrides: Optional[Dict[str, str]] = None,
run_history: Optional[List[Dict[str, str]]] = None,
) -> Dict[str, str]:
"""
Wählt ein Modell für ein GitHub-Issue aus (CLI-Adapter).
Args:
issue: Das GitHub-Issue als Dictionary.
repo_type: Der Typ des Repositories.
max_cost_tier: Das maximale Kosten-Tier.
manual_overrides: Manuelle Übersteuerungen.
run_history: Verlauf früherer Runs.
Returns:
Das Ergebnis der Modellauswahl.
"""
return select_model(
issue_text=issue.get("body", ""),
labels=issue.get("labels", []),
touched_files=extract_touched_files_from_issue_body(issue.get("body", "")),
repo_type=repo_type,
max_cost_tier=max_cost_tier,
manual_overrides=manual_overrides,
run_history=run_history,
)