-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathcodec_skill_registry.py
More file actions
401 lines (344 loc) · 16.9 KB
/
codec_skill_registry.py
File metadata and controls
401 lines (344 loc) · 16.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
"""CODEC Skill Registry — lazy-loads skill modules on demand.
At startup, only parses skill files with `ast` to extract metadata
(SKILL_NAME, SKILL_DESCRIPTION, SKILL_TRIGGERS, SKILL_MCP_EXPOSE).
The actual module import happens on first invocation and is cached.
Load-time AST safety check (closes audit finding D-1, CRITICAL — see
docs/audits/PHASE-1-SECURITY.md). Two-stage defense per `load()`:
1. Trusted-manifest check. `<skills_dir>/.manifest.json` (generated by
`tools/generate_skill_manifest.py`) lists sha256 of every approved
built-in skill source. If the file's hash matches an entry, the skill
is a known-good built-in and `exec_module` proceeds. This is what lets
legitimately-dangerous built-ins (`calculator`, `system`, `file_write`,
`pilot`, ...) continue to work — they're hash-pinned at PR-review time.
2. AST safety gate. Files NOT in the manifest (or in skill dirs without
a manifest, e.g. `~/.codec/skills/`) run through `is_dangerous_skill_code`.
Any dangerous pattern → refuse + emit `skill_load_blocked` audit event.
An attacker who drops a malicious `<x>.py` via any path (D-2 forge,
D-3 save_skill, D-4 file_write, D-5 path-traversal) won't match the
manifest (we never trust a hash we didn't sign off on) AND will trip the
AST gate. Fail-closed.
"""
import ast
import hashlib
import importlib.util
import json
import logging
import os
import re
from pathlib import Path
from typing import Any, Dict, List, Optional, Set
from codec_audit import log_event
from codec_config import is_dangerous_skill_code
# Module logger; all messages from this file are emitted under the "codec" name.
log = logging.getLogger("codec")
# Name of the trusted-manifest file inside each skills directory. The registry
# joins this onto its own `skills_dir` when loading the sha256 allow-list.
TRUSTED_MANIFEST_FILENAME = ".manifest.json"
# A-4: user trigger overrides. Module-level so tests can monkeypatch it; the
# canonical registry now honors this everywhere (the legacy codec_core loader
# read the same file but only affected the voice path).
# The "~" is expanded once, at import time.
CUSTOM_TRIGGERS_PATH = os.path.expanduser("~/.codec/custom_triggers.json")
def _extract_metadata(filepath: str) -> Optional[Dict[str, Any]]:
"""Parse a skill .py file with ast to extract module-level metadata
without executing it. Returns a dict or None if the file is not a
valid skill (missing SKILL_TRIGGERS or run function)."""
try:
with open(filepath, "r", encoding="utf-8", errors="ignore") as f:
source = f.read()
tree = ast.parse(source, filename=filepath)
except (SyntaxError, OSError) as e:
log.warning("Skill metadata parse error (%s): %s", filepath, e)
return None
meta: Dict[str, Any] = {}
has_run_func = False
for node in ast.iter_child_nodes(tree):
# Detect top-level: SKILL_NAME = "..." etc.
if isinstance(node, ast.Assign):
for target in node.targets:
if isinstance(target, ast.Name) and target.id in (
"SKILL_NAME",
"SKILL_DESCRIPTION",
"SKILL_TRIGGERS",
"SKILL_MCP_EXPOSE",
# Phase 2 Step 6 — declarative auto-fire trigger (Q3).
# AST extraction; validation happens in codec_triggers.
"SKILL_OBSERVATION_TRIGGER",
# Phase 2 Step 5 §X — skill-flag injection override.
"SKILL_NEEDS_OBSERVATION",
# re-audit — marks a high-power skill that requires consent
# before firing from chat / refusal over MCP (codec_consent).
"SKILL_DESTRUCTIVE",
):
try:
meta[target.id] = ast.literal_eval(node.value)
except (ValueError, TypeError):
pass
# Detect def run(...)
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
if node.name == "run":
has_run_func = True
if not has_run_func:
return None
return meta
class SkillRegistry:
    """Registry that stores skill metadata eagerly and imports modules lazily.

    Usage:
        registry = SkillRegistry(skills_dir)
        registry.scan()                  # fast — AST parse only
        meta = registry.all_metadata()   # {name: metadata dict}
        mod = registry.load("weather")   # first call imports; subsequent calls return cache
    """

    def __init__(self, skills_dir: str):
        """Create an empty registry for ``skills_dir``; no I/O happens here."""
        self.skills_dir = skills_dir
        # name -> metadata dict (always populated after scan)
        self._meta: Dict[str, Dict[str, Any]] = {}
        # name -> file path
        self._paths: Dict[str, str] = {}
        # name -> loaded module (populated on demand)
        self._modules: Dict[str, Any] = {}
        # sha256-hex of every trusted built-in skill source (loaded from
        # `<skills_dir>/.manifest.json` at scan() time; empty when the file
        # is missing — e.g. user-installed skills dir at ~/.codec/skills/).
        self._trusted_hashes: Set[str] = set()
        # name -> user-overridden trigger list (A-4: loaded from
        # ~/.codec/custom_triggers.json at scan() time). Before A-4, only the
        # legacy codec_core.load_skills honored these — so custom triggers
        # worked on the voice path but NOT chat/MCP. Wiring them into the
        # canonical registry makes them consistent everywhere.
        self._custom_triggers: Dict[str, List[str]] = {}

    def scan(self) -> int:
        """Scan the skills directory and extract metadata via AST.

        Clears previously discovered metadata and paths (already-imported
        modules stay cached), reloads the trusted manifest and the user
        trigger overrides, then registers every ``*.py`` file that does not
        start with ``_`` and for which metadata extraction succeeds.

        Returns:
            The number of skills discovered (0 if the directory is missing).
        """
        self._meta.clear()
        self._paths.clear()
        # Do NOT clear _modules — keep any already-loaded modules cached
        if not os.path.isdir(self.skills_dir):
            return 0
        # Reload the trusted-skill manifest on every scan so manifest edits
        # take effect without restarting the registry.
        self._trusted_hashes = self._load_trusted_manifest()
        # Reload user trigger overrides too (A-4) — same refresh-on-scan policy.
        self._custom_triggers = self._load_custom_triggers()
        for fname in sorted(os.listdir(self.skills_dir)):
            if not fname.endswith(".py") or fname.startswith("_"):
                continue
            filepath = os.path.join(self.skills_dir, fname)
            meta = _extract_metadata(filepath)
            if meta is None:
                continue
            name = meta.get("SKILL_NAME", fname[:-3])
            if not isinstance(name, str):
                # A non-string SKILL_NAME (e.g. a list) is unhashable or
                # unusable as a lookup key; fall back to the file stem so a
                # single malformed skill cannot abort discovery of the rest.
                name = fname[:-3]
            self._meta[name] = meta
            self._paths[name] = filepath
        log.info(
            "Skill registry: %d skills discovered (metadata only), %d in trusted manifest",
            len(self._meta), len(self._trusted_hashes),
        )
        return len(self._meta)

    def _load_trusted_manifest(self) -> Set[str]:
        """Read `<skills_dir>/.manifest.json` and return the set of trusted
        sha256 hex strings.

        Returns an empty set if the file is missing or malformed —
        user-installed skill dirs (e.g. ~/.codec/skills/) have no manifest by
        design, so they get the AST check on every load. A missing file is
        silent; an unreadable or malformed one is logged as a warning.
        """
        manifest_path = os.path.join(self.skills_dir, TRUSTED_MANIFEST_FILENAME)
        try:
            with open(manifest_path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except FileNotFoundError:
            return set()
        except (OSError, json.JSONDecodeError) as e:
            log.warning(
                "Skill registry: trusted manifest at %s unreadable (%s); "
                "treating as empty — all skills will run the AST safety check",
                manifest_path, e,
            )
            return set()
        # Guard the top-level shape too: valid JSON that is not an object
        # (e.g. a list) has no .get and must be treated as "no manifest".
        skills = data.get("skills") if isinstance(data, dict) else None
        if not isinstance(skills, dict):
            log.warning(
                "Skill registry: manifest %s has no 'skills' dict; treating as empty",
                manifest_path,
            )
            return set()
        return {v for v in skills.values() if isinstance(v, str)}

    # ── Metadata access (no import needed) ──────────────────────────────

    def names(self) -> List[str]:
        """Return the discovered skill names in registration order."""
        return list(self._meta.keys())

    def get_meta(self, name: str) -> Optional[Dict[str, Any]]:
        """Return the raw metadata dict for ``name``, or None if unknown."""
        return self._meta.get(name)

    def all_metadata(self) -> Dict[str, Dict[str, Any]]:
        """Return a shallow copy of the name -> metadata mapping."""
        return dict(self._meta)

    def _load_custom_triggers(self) -> Dict[str, List[str]]:
        """Load user trigger overrides from custom_triggers.json (A-4).

        Format: {"<skill_name>": {"triggers": [...]}}. Returns {name: triggers}
        for entries with a non-empty list; each trigger is coerced with
        ``str()``. Tolerates a missing/malformed file by returning {}.
        Path is the module-level CUSTOM_TRIGGERS_PATH (patchable in tests).
        """
        path = CUSTOM_TRIGGERS_PATH
        try:
            with open(path, "r", encoding="utf-8") as f:
                raw = json.load(f)
        except (FileNotFoundError, OSError, json.JSONDecodeError):
            return {}
        if not isinstance(raw, dict):
            return {}
        out: Dict[str, List[str]] = {}
        for name, entry in raw.items():
            if isinstance(entry, dict):
                trigs = entry.get("triggers")
                if isinstance(trigs, list) and trigs:
                    out[name] = [str(t) for t in trigs]
        return out

    def _effective_triggers(self, name: str) -> List[str]:
        """A skill's triggers: the user override from custom_triggers.json if
        present, else the AST-extracted SKILL_TRIGGERS (A-4)."""
        if name in self._custom_triggers:
            return self._custom_triggers[name]
        return self._meta.get(name, {}).get("SKILL_TRIGGERS", [])

    def get_triggers(self, name: str) -> List[str]:
        """Return the effective trigger list for ``name`` ([] if unknown)."""
        return self._effective_triggers(name)

    def get_description(self, name: str) -> str:
        """Return SKILL_DESCRIPTION, falling back to the skill name itself."""
        meta = self._meta.get(name, {})
        return meta.get("SKILL_DESCRIPTION", name)

    def get_mcp_expose(self, name: str) -> Optional[bool]:
        """Return SKILL_MCP_EXPOSE, or None when the skill does not declare it."""
        meta = self._meta.get(name, {})
        return meta.get("SKILL_MCP_EXPOSE", None)

    def get_destructive(self, name: str) -> bool:
        """re-audit — True if the skill declares SKILL_DESTRUCTIVE=True (a
        high-power op that needs consent on chat / refusal over MCP). Defaults
        False when undeclared. Read by codec_consent.is_destructive_skill."""
        return bool(self._meta.get(name, {}).get("SKILL_DESTRUCTIVE", False))

    def get_observation_trigger(self, name: str) -> Optional[Dict[str, Any]]:
        """Phase 2 Step 6 — return the SKILL_OBSERVATION_TRIGGER dict
        for a skill, or None if not declared. Validation happens in
        codec_triggers; this just surfaces what AST extracted."""
        meta = self._meta.get(name, {})
        return meta.get("SKILL_OBSERVATION_TRIGGER", None)

    # ── Lazy module loading ─────────────────────────────────────────────

    def load(self, name: str) -> Optional[Any]:
        """Import and cache the skill module. Returns the module or None.

        Load-time defense (closes D-1): two-stage check before execution.

        Stage 1 — Trusted manifest. If the file's sha256 is in
        `<skills_dir>/.manifest.json`, the skill is a known-good built-in
        (hash-pinned at PR-review time); skip the AST gate.

        Stage 2 — AST safety gate. Files NOT in the manifest run through
        `is_dangerous_skill_code`. Any dangerous pattern → refuse + emit
        `skill_load_blocked` audit event. Fail-safe on any error inside
        the check itself (e.g. validator raised, file unreadable).

        The module body is then compiled from the exact source that was
        hashed / checked (the raw bytes for a trusted file, the UTF-8-decoded
        text for a gated file) instead of letting the import loader read the
        path again. That way a file swapped after the check, a planted
        `__pycache__` bytecode file, or an encoding cookie that makes the
        interpreter decode the bytes differently from the checker cannot
        cause unverified code to run.

        Args:
            name: Skill name as registered by ``scan()``.

        Returns:
            The (cached) module object, or None if the skill is unknown,
            unreadable, refused by the gate, or raised during import.
        """
        if name in self._modules:
            return self._modules[name]
        filepath = self._paths.get(name)
        if not filepath:
            log.warning("Skill '%s' not found in registry", name)
            return None

        # Read raw bytes for hashing + decode separately for the AST check.
        # Fail-safe on any I/O problem.
        try:
            raw = Path(filepath).read_bytes()
        except Exception as e:
            log.warning(
                "Skill '%s' source unreadable (%s) — refusing load",
                name, e,
            )
            return None

        file_hash = hashlib.sha256(raw).hexdigest()
        trusted = file_hash in self._trusted_hashes
        # What will actually be compiled: the hash-pinned bytes by default.
        verified_source: Any = raw

        if not trusted:
            # Decode for AST check; fail-safe on UnicodeDecodeError.
            try:
                src = raw.decode("utf-8")
            except UnicodeDecodeError as e:
                log.warning(
                    "Skill '%s' not valid UTF-8 (%s) — refusing load",
                    name, e,
                )
                return None
            # AST safety gate. Broad except by design: an exception here must
            # NOT fall through to execution — that would defeat the gate.
            try:
                dangerous, reason = is_dangerous_skill_code(src)
            except Exception as e:
                log.warning(
                    "Skill '%s' AST check raised %s — refusing fail-safe",
                    name, e,
                )
                return None
            if dangerous:
                log.warning(
                    "Skill '%s' refused at load time (not in trusted manifest): %s",
                    name, reason,
                )
                try:
                    log_event(
                        "skill_load_blocked",
                        source="codec-skill-registry",
                        message=f"Refused skill {name}: {reason}",
                        level="warning",
                        outcome="error",
                        extra={
                            "skill_name": name,
                            "skill_path": filepath,
                            "reason": reason,
                            "file_sha256": file_hash,
                        },
                    )
                except Exception:
                    # Audit failure must not mask the refusal.
                    pass
                return None
            # Execute the very text the gate inspected. Compiling a str
            # ignores any "coding:" cookie, so the interpreter cannot
            # re-decode the bytes into something the gate never saw.
            verified_source = src

        try:
            mod_name = os.path.basename(filepath)[:-3]
            spec = importlib.util.spec_from_file_location(mod_name, filepath)
            if spec is None:
                raise ImportError(f"no import spec for {filepath}")
            # module_from_spec still gives the module its usual attributes
            # (__name__, __file__, __spec__, ...); only the source of the
            # code object changes.
            mod = importlib.util.module_from_spec(spec)
            code = compile(verified_source, filepath, "exec", dont_inherit=True)
            exec(code, mod.__dict__)
            self._modules[name] = mod
            log.info(
                "Lazy-loaded skill: %s (%s)",
                name, "trusted manifest" if trusted else "passed AST gate",
            )
            return mod
        except Exception as e:
            log.warning("Skill import error (%s): %s", name, e)
            return None

    def is_loaded(self, name: str) -> bool:
        """Return True if the skill's module is already in the import cache."""
        return name in self._modules

    # ── Convenience: run a skill by name ────────────────────────────────

    def run(self, name: str, task: str, *args, **kwargs) -> Optional[str]:
        """Load (if needed) and execute a skill's run() function.

        Pass ``sandboxed=True`` (consumed here, never forwarded to the skill)
        to execute the skill file through ``codec_sandbox.run_skill_sandboxed``;
        ``app`` / ``ctx`` are taken from the keyword of that name or, failing
        that, from the first / second extra positional argument. If
        ``codec_sandbox`` cannot be imported the skill runs directly in this
        process instead.

        Returns:
            The sandbox result (also returned when the sandbox reports a
            failure, after logging a warning), or the value returned by the
            module's ``run(task, *args, **kwargs)``; None when the module
            cannot be loaded or has no ``run`` attribute.
        """
        filepath = self._paths.get(name)
        sandboxed = kwargs.pop("sandboxed", False)
        if sandboxed and filepath:
            try:
                from codec_sandbox import run_skill_sandboxed
                ok, result = run_skill_sandboxed(
                    filepath, task,
                    app=kwargs.get("app", args[0] if args else ""),
                    ctx=kwargs.get("ctx", args[1] if len(args) > 1 else ""),
                )
                if not ok:
                    log.warning("Sandboxed skill %s failed: %s", name, result)
                return result
            except ImportError:
                log.warning("codec_sandbox not available, running skill directly")
        mod = self.load(name)
        if mod is None or not hasattr(mod, "run"):
            return None
        return mod.run(task, *args, **kwargs)

    # ── Trigger matching (replaces check_skill) ─────────────────────────

    def match_trigger(self, task: str) -> Optional[str]:
        """Return the name of the first skill whose triggers match, or None.
        Uses word-boundary matching to avoid false positives from substrings."""
        matches = self.match_all_triggers(task)
        return matches[0] if matches else None

    def match_all_triggers(self, task: str) -> List[str]:
        """Return all skill names whose triggers match, sorted by specificity
        (longest matching trigger first; ties keep registration order).

        Matching is case-insensitive on both sides and anchored on word
        boundaries. Non-string and empty triggers are ignored: the former
        would raise inside ``re.escape`` and break matching for every skill,
        the latter would match any utterance.
        """
        low = task.lower()
        scored = []
        for name in self._meta:
            best = 0
            for trig in self._effective_triggers(name):  # A-4: honors custom_triggers
                if not isinstance(trig, str) or not trig:
                    continue
                # The task was lowercased above, so the trigger must be too;
                # otherwise "Weather" could never match "weather".
                pattern = r'\b' + re.escape(trig.lower()) + r'\b'
                if re.search(pattern, low):
                    best = max(best, len(trig))
            if best:
                scored.append((best, name))
        # list.sort is stable, so equal scores keep discovery order.
        scored.sort(key=lambda x: x[0], reverse=True)
        return [name for _, name in scored]