-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmemory_hygiene.py
More file actions
executable file
·298 lines (250 loc) · 10.4 KB
/
memory_hygiene.py
File metadata and controls
executable file
·298 lines (250 loc) · 10.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
#!/usr/bin/env python3
"""
Memory Hygiene — Deadness Scoring for Aurora
Scores memory files on staleness, dead platform references, size, and economic
relevance. Outputs archival candidates so Aurora (or the orchestrator) can act.
Usage:
python3 memory_hygiene.py # Full report
python3 memory_hygiene.py --json # Machine-readable output
python3 memory_hygiene.py --auto-archive # Move dead files to archive/
Scoring (0-100, higher = healthier):
- Recency: files not modified in 7+ days lose points
- Dead references: mentions of known-dead platforms penalised
- Size efficiency: oversized files penalised
- Economic relevance: files linked to active revenue streams score higher
"""
import json
import os
import re
import sys
import shutil
from datetime import datetime, timezone
from pathlib import Path
BASE_DIR = Path(__file__).parent
MEMORY_DIR = BASE_DIR / "memory"
ARCHIVE_DIR = BASE_DIR / "archive"
LOG_FILE = BASE_DIR / "logs" / "memory_hygiene.jsonl"
# Known-dead platforms/services — update as reality changes
DEAD_PLATFORMS = {
"reddit": "shadow-banned",
"x/twitter": "$0 credits",
"twitter": "$0 credits",
"x premium": "$0 credits (X is dead for Aurora)",
"x strategy": "$0 credits (X is dead for Aurora)",
"fiverr": "GUI-only, no API",
"gumroad": "KYC blocked",
"proxies.sx": "maintainer inactive, $SX not USDC",
"proxies_sx": "maintainer inactive",
"clawgig": "suspended",
"clawtasks": "suspended",
"nullpath": "500 error",
"ubounty": "browser-only",
"bountycaster": "needs badges",
}
# Filename patterns that indicate dead content (checked against filename, not content)
DEAD_FILENAME_PATTERNS = ["x-strategy", "twitter", "reddit"]
# Active platforms — files referencing these score higher
ACTIVE_PLATFORMS = {
"superteam", "agentpact", "moltlaunch", "near",
"github", "telegram", "email", "gmail",
"dev.to", "hashnode", "paragraph",
"base", "solana", "usdc",
}
# Files that should never be archived
PROTECTED_FILES = {"MEMORY.md"}
# Thresholds
STALE_DAYS = 7 # Files older than this lose recency points
VERY_STALE_DAYS = 14 # Files older than this are strong archive candidates
LARGE_FILE_TOKENS = 3000 # Files estimated above this are "oversized"
ARCHIVE_THRESHOLD = 35 # Score below this → recommend archival
def estimate_tokens(text):
"""Rough token estimate (4 chars per token)."""
return len(text) // 4
def score_file(filepath, content, now):
"""Score a single memory file. Returns dict with score and reasons."""
rel_path = str(filepath.relative_to(MEMORY_DIR))
mtime = datetime.fromtimestamp(filepath.stat().st_mtime, tz=timezone.utc)
age_days = (now - mtime).total_seconds() / 86400
tokens = estimate_tokens(content)
content_lower = content.lower()
score = 100
reasons = []
strengths = []
# --- Recency (0-30 points at risk) ---
if age_days > VERY_STALE_DAYS:
penalty = min(30, int(age_days - VERY_STALE_DAYS))
score -= penalty
reasons.append(f"Very stale: {age_days:.0f} days old (-{penalty})")
elif age_days > STALE_DAYS:
penalty = min(15, int((age_days - STALE_DAYS) * 2))
score -= penalty
reasons.append(f"Stale: {age_days:.0f} days old (-{penalty})")
else:
strengths.append(f"Fresh: {age_days:.1f} days old")
# --- Dead platform references (0-30 points at risk) ---
dead_found = []
# Check filename against known-dead patterns
rel_lower = rel_path.lower()
for pattern in DEAD_FILENAME_PATTERNS:
if pattern in rel_lower:
dead_found.append(f"filename matches '{pattern}' (dead platform)")
# Check content for dead platform mentions
for platform, reason in DEAD_PLATFORMS.items():
count = len(re.findall(r'\b' + re.escape(platform) + r'\b', content_lower))
if count >= 2:
dead_found.append(f"{platform} ({reason}, {count} refs)")
if dead_found:
penalty = min(30, len(dead_found) * 10)
score -= penalty
reasons.append(f"Dead platform refs: {', '.join(dead_found)} (-{penalty})")
# --- Active platform references (bonus up to +10) ---
active_found = []
for platform in ACTIVE_PLATFORMS:
if platform in content_lower:
active_found.append(platform)
if active_found:
bonus = min(10, len(active_found) * 3)
score = min(100, score + bonus)
strengths.append(f"Active refs: {', '.join(active_found[:5])} (+{bonus})")
# --- Size efficiency (0-15 points at risk) ---
if tokens > LARGE_FILE_TOKENS:
penalty = min(15, (tokens - LARGE_FILE_TOKENS) // 200)
score -= penalty
reasons.append(f"Oversized: ~{tokens} tokens (-{penalty})")
elif tokens > 1500:
pass # Normal size
else:
strengths.append(f"Compact: ~{tokens} tokens")
# --- Economic relevance (0-15 points at risk) ---
economic_keywords = ["revenue", "income", "paid", "earning", "bounty", "gig",
"client", "invoice", "usdc", "payment", "contract"]
econ_hits = sum(1 for kw in economic_keywords if kw in content_lower)
if econ_hits == 0:
score -= 10
reasons.append("No economic relevance (-10)")
elif econ_hits >= 3:
strengths.append(f"Economically relevant ({econ_hits} keywords)")
# --- Protection ---
if rel_path in PROTECTED_FILES:
score = max(score, ARCHIVE_THRESHOLD + 1) # Never recommend archiving
strengths.append("Protected file")
score = max(0, min(100, score))
recommendation = "keep"
if score < ARCHIVE_THRESHOLD:
recommendation = "archive"
elif score < 50:
recommendation = "review"
return {
"file": rel_path,
"score": score,
"recommendation": recommendation,
"age_days": round(age_days, 1),
"tokens": tokens,
"reasons": reasons,
"strengths": strengths,
"last_modified": mtime.isoformat(),
}
def run_hygiene(memory_dir=None):
"""Run hygiene check on all memory files. Returns list of scored results."""
mem_dir = memory_dir or MEMORY_DIR
if not mem_dir.exists():
return []
now = datetime.now(timezone.utc)
results = []
for f in sorted(mem_dir.rglob("*")):
if f.is_file():
try:
content = f.read_text(encoding="utf-8")
except (OSError, UnicodeDecodeError):
continue
results.append(score_file(f, content, now))
results.sort(key=lambda x: x["score"])
return results
def format_report(results):
"""Format hygiene results for display."""
if not results:
return "No memory files found."
lines = ["# Memory Hygiene Report", ""]
archive_candidates = [r for r in results if r["recommendation"] == "archive"]
review_candidates = [r for r in results if r["recommendation"] == "review"]
healthy = [r for r in results if r["recommendation"] == "keep"]
if archive_candidates:
lines.append(f"## ARCHIVE ({len(archive_candidates)} files)")
for r in archive_candidates:
lines.append(f" {r['file']}: score {r['score']}/100 ({r['age_days']:.0f} days, ~{r['tokens']} tok)")
for reason in r["reasons"]:
lines.append(f" ! {reason}")
lines.append("")
if review_candidates:
lines.append(f"## REVIEW ({len(review_candidates)} files)")
for r in review_candidates:
lines.append(f" {r['file']}: score {r['score']}/100 ({r['age_days']:.0f} days, ~{r['tokens']} tok)")
for reason in r["reasons"]:
lines.append(f" ! {reason}")
lines.append("")
if healthy:
lines.append(f"## HEALTHY ({len(healthy)} files)")
for r in healthy:
lines.append(f" {r['file']}: score {r['score']}/100")
lines.append("")
total_tokens = sum(r["tokens"] for r in results)
archivable_tokens = sum(r["tokens"] for r in archive_candidates)
lines.append(f"Total memory: ~{total_tokens} tokens")
if archivable_tokens:
lines.append(f"Archivable: ~{archivable_tokens} tokens ({archivable_tokens * 100 // total_tokens}% of total)")
return "\n".join(lines)
def auto_archive(results):
"""Move archive-recommended files to archive/."""
ARCHIVE_DIR.mkdir(parents=True, exist_ok=True)
archived = []
for r in results:
if r["recommendation"] == "archive" and r["file"] not in PROTECTED_FILES:
src = MEMORY_DIR / r["file"]
dst = ARCHIVE_DIR / r["file"]
if src.exists():
dst.parent.mkdir(parents=True, exist_ok=True)
shutil.move(str(src), str(dst))
archived.append(r["file"])
return archived
def log_hygiene(results):
"""Log hygiene run to audit trail."""
LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
entry = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"files_checked": len(results),
"archive_candidates": len([r for r in results if r["recommendation"] == "archive"]),
"review_candidates": len([r for r in results if r["recommendation"] == "review"]),
"total_tokens": sum(r["tokens"] for r in results),
}
try:
with open(LOG_FILE, "a") as f:
f.write(json.dumps(entry) + "\n")
except OSError:
pass
def get_hygiene_warning(results):
"""Generate a short warning string for inclusion in wake prompt.
Returns None if everything is healthy."""
archive_candidates = [r for r in results if r["recommendation"] == "archive"]
if not archive_candidates:
return None
archivable_tokens = sum(r["tokens"] for r in archive_candidates)
file_list = ", ".join(r["file"] for r in archive_candidates[:5])
return (
f"MEMORY HYGIENE: {len(archive_candidates)} file(s) should be archived "
f"(~{archivable_tokens} tokens). Candidates: {file_list}. "
f"Run `python3 memory_hygiene.py` for details, or "
f"`python3 memory_hygiene.py --auto-archive` to clean up."
)
if __name__ == "__main__":
results = run_hygiene()
log_hygiene(results)
if "--json" in sys.argv:
print(json.dumps(results, indent=2))
elif "--auto-archive" in sys.argv:
archived = auto_archive(results)
if archived:
print(f"Archived {len(archived)} files: {', '.join(archived)}")
else:
print("No files to archive.")
else:
print(format_report(results))