-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathaudit.py
More file actions
executable file
·302 lines (261 loc) · 11.1 KB
/
audit.py
File metadata and controls
executable file
·302 lines (261 loc) · 11.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
#!/usr/bin/env python3
"""
Scan installed CC skills against session history and classify by usage.
Exit codes: 0 success, 1 error.
JSON output (--json flag): written to stdout.
"""
import argparse
import json
import re
import sys
from collections import Counter
from datetime import datetime, timedelta, timezone
from pathlib import Path
def parse_args():
p = argparse.ArgumentParser()
p.add_argument("--days", type=int, default=60, help="lookback window in days")
p.add_argument("--json", action="store_true", help="emit JSON to stdout")
p.add_argument("--dead-threshold", type=int, default=0,
help="max invocations to classify as dead (default 0)")
p.add_argument("--situational-threshold", type=int, default=2,
help="max invocations to classify as situational (default 2)")
return p.parse_args()
def load_protected_skills():
"""Skills mentioned in ~/.claude/CLAUDE.md should never be overridden."""
claude_md = Path.home() / ".claude" / "CLAUDE.md"
if not claude_md.exists():
return set()
text = claude_md.read_text(errors="replace")
# match /skill-name patterns inside backticks or slash-command references
found = set()
for m in re.finditer(r"`/([a-zA-Z0-9_-]+)`", text):
found.add(m.group(1))
for m in re.finditer(r"/([a-zA-Z0-9_-]+)", text):
found.add(m.group(1))
return found
def installed_skills():
"""Return dict of skill_name -> source_label for all enabled installed skills."""
settings_path = Path.home() / ".claude" / "settings.json"
if not settings_path.exists():
return {}
try:
settings = json.loads(settings_path.read_text())
except Exception as e:
print(f"ERROR reading settings.json: {e}", file=sys.stderr)
sys.exit(1)
result = {}
# user skills
for p in (Path.home() / ".claude" / "skills").glob("*/SKILL.md"):
result[p.parent.name] = "user"
# plugin skills (only enabled plugins)
enabled = settings.get("enabledPlugins", {})
for plugin_id, on in enabled.items():
if not on:
continue
try:
name, marketplace = plugin_id.split("@", 1)
except ValueError:
continue
plugin_root = Path.home() / ".claude" / "plugins" / "cache" / marketplace / name
if not plugin_root.exists():
continue
for p in plugin_root.rglob("SKILL.md"):
# skip agent/IDE sub-directories
if any(seg in str(p) for seg in ["/.agents/", "/.roo/", "/.junie/", "/.kiro/"]):
continue
result[p.parent.name] = f"plugin:{plugin_id}"
return result
def collect_usage(days: int):
"""Count invocations per skill name from session jsonl files."""
proj_root = Path.home() / ".claude" / "projects"
if not proj_root.exists():
return Counter()
cutoff = datetime.now(timezone.utc) - timedelta(days=days)
counts: Counter = Counter()
for jsonl_path in proj_root.rglob("*.jsonl"):
try:
mtime = datetime.fromtimestamp(jsonl_path.stat().st_mtime, tz=timezone.utc)
except OSError:
continue
if mtime < cutoff:
continue
try:
with open(jsonl_path, errors="replace") as fh:
for raw in fh:
raw = raw.strip()
if not raw:
continue
try:
d = json.loads(raw)
except json.JSONDecodeError:
continue
msg_type = d.get("type")
# slash-command signal: <command-name>/foo</command-name> in user/system content
if msg_type in ("user", "system"):
msg = d.get("message", {}) if msg_type == "user" else {}
content = msg.get("content", "") if msg else d.get("content", "")
if isinstance(content, list):
content = " ".join(
b.get("text", "") for b in content if isinstance(b, dict)
)
if isinstance(content, str):
for m in re.finditer(r"<command-name>/([a-zA-Z0-9_-]+)", content):
counts[m.group(1)] += 1
# Skill tool signal: tool_use blocks with name=="Skill"
elif msg_type == "assistant":
msg = d.get("message", {})
content = msg.get("content", [])
if isinstance(content, list):
for block in content:
if not isinstance(block, dict):
continue
if block.get("type") == "tool_use" and block.get("name") == "Skill":
skill_name = block.get("input", {}).get("skill", "")
# strip plugin: prefix and leading slash
skill_name = skill_name.lstrip("/").split(":")[-1]
if skill_name:
counts[skill_name] += 1
except OSError:
continue
return counts
def read_skill_description(name: str, skills: dict) -> str:
"""Read the description frontmatter from a skill's SKILL.md, or return empty string."""
source = skills.get(name, "user")
settings_path = Path.home() / ".claude" / "settings.json"
if source == "user":
skill_md = Path.home() / ".claude" / "skills" / name / "SKILL.md"
else:
# plugin:name@marketplace
try:
plugin_id = source.split(":", 1)[1]
pname, marketplace = plugin_id.split("@", 1)
plugin_root = Path.home() / ".claude" / "plugins" / "cache" / marketplace / pname
skill_md = next(plugin_root.rglob(f"{name}/SKILL.md"), None)
if skill_md is None:
return ""
except Exception:
return ""
if not skill_md or not skill_md.exists():
return ""
try:
text = skill_md.read_text(errors="replace")
m = re.search(
r"^description:\s*[>|]?\s*\n?(.*?)(?=\n\w|\n---|\Z)",
text, re.DOTALL | re.MULTILINE
)
if m:
return re.sub(r"\s+", " ", m.group(1)).strip()
except OSError:
pass
return ""
def estimate_tokens(skill_names, skills: dict) -> int:
"""Estimate tokens removed per turn by pruning skill_names."""
total = 0
for name in skill_names:
desc = read_skill_description(name, skills)
# "- name: description\n"
line = f"- {name}: {desc}\n" if desc else f"- {name}\n"
total += len(line) // 4
return total
def main():
args = parse_args()
skills = installed_skills()
if not skills:
if args.json:
print(json.dumps({
"installed_count": 0,
"dead_count": 0,
"situational_count": 0,
"kept_count": 0,
"estimated_tokens_saved": 0,
"dead": [],
"situational": [],
"kept": [],
}))
else:
print("No installed skills found.")
return
usage = collect_usage(args.days)
has_history = bool(usage)
protected = load_protected_skills()
dead = []
situational = []
kept = []
# plugin_id -> {skills: [...], all_dead: bool}
plugins: dict = {}
for name in sorted(skills):
source = skills[name]
count = usage.get(name, 0)
is_plugin = source.startswith("plugin:")
if is_plugin:
plugin_id = source.split(":", 1)[1]
if plugin_id not in plugins:
plugins[plugin_id] = {"skills": [], "plugin_id": plugin_id}
plugins[plugin_id]["skills"].append({"name": name, "uses": count})
elif name in protected:
kept.append({"name": name, "uses": count, "source": source, "protected": True})
elif count <= args.dead_threshold:
dead.append({"name": name, "uses": count, "source": source})
elif count <= args.situational_threshold:
situational.append({"name": name, "uses": count, "source": source})
else:
kept.append({"name": name, "uses": count, "source": source})
# classify each plugin: all_dead = every skill has 0 uses
plugin_summaries = []
for pid, pdata in sorted(plugins.items()):
total_uses = sum(s["uses"] for s in pdata["skills"])
all_dead = total_uses == 0
plugin_summaries.append({
"plugin_id": pid,
"skill_count": len(pdata["skills"]),
"total_uses": total_uses,
"all_dead": all_dead,
"skills": pdata["skills"],
})
disableable_plugins = [p for p in plugin_summaries if p["all_dead"]]
tokens_saved = estimate_tokens(
[s["name"] for s in dead] + [s["name"] for s in situational],
skills
)
if args.json:
print(json.dumps({
"installed_count": len(skills),
"has_history": has_history,
"dead_count": len(dead),
"situational_count": len(situational),
"kept_count": len(kept),
"plugin_count": sum(len(p["skills"]) for p in plugin_summaries),
"disableable_plugin_count": len(disableable_plugins),
"estimated_tokens_saved": tokens_saved,
"dead": dead,
"situational": situational,
"kept": kept,
"plugins": plugin_summaries,
"disableable_plugins": disableable_plugins,
}, indent=2))
else:
user_skill_count = len(skills) - sum(len(p["skills"]) for p in plugin_summaries)
print(f"Installed skills: {len(skills)}")
print(f" User skills: {user_skill_count}")
print(f" Plugin skills: {sum(len(p['skills']) for p in plugin_summaries)} across {len(plugin_summaries)} plugin(s)")
if not has_history:
print(f" (No session history found — all skills appear unused. Install age may be <{args.days}d.)")
print(f"Dead (0 uses in {args.days}d): {len(dead)}")
print(f"Situational (1-{args.situational_threshold} uses): {len(situational)}")
print(f"Kept (>{args.situational_threshold} uses): {len(kept)}")
print(f"Est. tokens saved/turn: ~{tokens_saved}")
if dead:
print("\nDead user skills:")
for s in dead:
print(f" {s['name']:<40} [{s['source']}]")
if situational:
print("\nSituational user skills:")
for s in situational:
print(f" {s['name']:<40} {s['uses']:>2} uses [{s['source']}]")
if plugin_summaries:
print(f"\nPlugins ({len(disableable_plugins)} fully unused, disableable):")
for p in plugin_summaries:
status = "ALL DEAD — can disable" if p["all_dead"] else f"{p['total_uses']} uses across {p['skill_count']} skills"
print(f" {p['plugin_id']:<45} {status}")
if __name__ == "__main__":
main()