-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhostess_memory.py
More file actions
149 lines (123 loc) · 4.84 KB
/
hostess_memory.py
File metadata and controls
149 lines (123 loc) · 4.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
"""
PHASE 56.5: Hostess Memory Module.
Manages Hostess interactions with decay visualization for 3D tree.
@status: active
@phase: 96
@depends: asyncio, dataclasses
@used_by: hostess_agent.py, chat_handler.py
"""
from dataclasses import dataclass, field
from typing import Dict, List, Optional
import time
import asyncio
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
@dataclass
class InteractionRecord:
    """Memory entry describing the Hostess interactions for one file.

    ``HostessMemory`` stores one of these per ``file_id`` in its
    ``interaction_tree`` and mutates it in place on repeat visits and during
    the periodic decay pass.
    """

    # Key of the file this entry belongs to (also the dict key in the tree).
    file_id: str
    # Query text supplied when the interaction was recorded.
    query: str
    # Response text; a 150-character preview is used for context injection.
    response: str
    # Seconds since the epoch (``time.time()``) of the latest recorded visit.
    timestamp: float
    # Number of interactions recorded for this file.
    count: int = 1
    # Freshness weight; starts at 1.0 and is exported as node opacity.
    decay_factor: float = 1.0
class HostessMemory:
    """Per-user memory of Hostess interactions with time-based decay.

    Each file the user interacts with gets one ``InteractionRecord`` in
    ``interaction_tree`` (keyed by ``file_id``). A background asyncio task,
    started with :meth:`start` and stopped with :meth:`stop`, periodically
    lowers each record's ``decay_factor`` according to its age and prunes
    records that have faded below ``PRUNE_THRESHOLD``.
    """

    # Tunables for the decay model; override on a subclass or instance.
    DECAY_INTERVAL_SECONDS = 3600  # how often the background pass runs
    SECONDS_PER_DAY = 86400
    DECAY_PER_DAY = 0.05  # weight lost per whole day of age
    MIN_DECAY = 0.1  # floor applied to decay_factor
    PRUNE_THRESHOLD = 0.15  # records below this weight are dropped
    REVISIT_BOOST = 0.1  # weight regained when a file is revisited
    PREVIEW_CHARS = 150  # response characters kept in context previews

    def __init__(self, user_id: str, qdrant_client=None):
        """Create an empty memory for ``user_id``.

        Args:
            user_id: Identifier used in log messages.
            qdrant_client: Optional client object; it is stored on the
                instance but not used by any method of this class.
        """
        self.user_id = user_id
        self.interaction_tree: Dict[str, InteractionRecord] = {}
        self.qdrant_client = qdrant_client
        self._decay_task: Optional[asyncio.Task] = None
        logger.info("[HostessMemory] Initialized for user %s", user_id)

    async def start(self):
        """Start the periodic decay task (no-op if it is already running).

        Must be called from within a running event loop.
        """
        task = self._decay_task
        if task is not None and not task.done():
            # A second task would orphan the first one: it would keep running
            # and could never be cancelled through stop().
            return
        self._decay_task = asyncio.create_task(self._periodic_decay())
        logger.info("[HostessMemory] Decay task started for %s", self.user_id)

    async def stop(self):
        """Cancel the decay task, wait for it to finish, and forget it."""
        task = self._decay_task
        if task is None:
            return
        # Reset first so a later start() always creates a fresh task.
        self._decay_task = None
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        logger.info("[HostessMemory] Decay task stopped for %s", self.user_id)

    def _apply_decay(self, now: Optional[float] = None) -> List[str]:
        """Run one decay pass and return the ``file_id``s that were pruned.

        Args:
            now: Reference time in epoch seconds; defaults to ``time.time()``.

        Age is measured in whole days from epoch seconds, so wall-clock
        changes such as DST do not skew it. The resulting weight is clamped
        to ``[MIN_DECAY, 1.0]``; the upper clamp matters when a timestamp
        lies in the future (clock skew), which would otherwise push the
        weight above 1.0.
        """
        if now is None:
            now = time.time()
        pruned: List[str] = []
        for file_id, record in self.interaction_tree.items():
            age_days = int((now - record.timestamp) // self.SECONDS_PER_DAY)
            weight = 1.0 - (age_days * self.DECAY_PER_DAY)
            record.decay_factor = min(1.0, max(self.MIN_DECAY, weight))
            if record.decay_factor < self.PRUNE_THRESHOLD:
                pruned.append(file_id)
        # Delete after the loop: a dict must not change size while iterated.
        for file_id in pruned:
            del self.interaction_tree[file_id]
        return pruned

    async def _periodic_decay(self):
        """Apply a decay pass every ``DECAY_INTERVAL_SECONDS`` until cancelled."""
        while True:
            try:
                await asyncio.sleep(self.DECAY_INTERVAL_SECONDS)
                for file_id in self._apply_decay():
                    logger.debug(
                        "[HostessMemory] Pruned %s for %s", file_id, self.user_id
                    )
            except asyncio.CancelledError:
                # Cancellation is the normal shutdown path; end the loop.
                break
            except Exception as e:
                # Keep the loop alive; one failed pass must not stop decay.
                logger.exception("[HostessMemory] Decay error: %s", e)

    def record_interaction(self, file_id: str, query: str, response: str):
        """Record an interaction with ``file_id``.

        A first visit creates a new record. A repeat visit bumps the visit
        count, refreshes the timestamp, restores some decay weight (capped
        at 1.0), and replaces the stored query/response so context previews
        show the latest exchange rather than the first one.
        """
        now = time.time()
        record = self.interaction_tree.get(file_id)
        if record is None:
            record = InteractionRecord(
                file_id=file_id,
                query=query,
                response=response,
                timestamp=now,
            )
            self.interaction_tree[file_id] = record
        else:
            record.count += 1
            record.timestamp = now
            record.query = query
            record.response = response
            record.decay_factor = min(
                1.0, record.decay_factor + self.REVISIT_BOOST
            )
        logger.debug(
            "[HostessMemory] Recorded interaction for %s (%sx)",
            file_id,
            record.count,
        )

    def get_recent_context(self, limit: int = 5) -> str:
        """Return the most recent interactions as newline-separated text.

        Args:
            limit: Maximum number of records to include; values <= 0 yield
                an empty string.

        Returns:
            One line per record, newest first, formatted as
            ``File <id> (<count>x): <response preview>...``; ``""`` when
            there is nothing to report.
        """
        if limit <= 0:
            # A negative slice bound would silently drop the *oldest* records
            # instead of limiting the result.
            return ""
        newest_first = sorted(
            self.interaction_tree.items(),
            key=lambda item: item[1].timestamp,
            reverse=True,
        )[:limit]
        return "\n".join(
            f"File {file_id} ({record.count}x): "
            f"{record.response[:self.PREVIEW_CHARS]}..."
            for file_id, record in newest_first
        )

    def get_visual_tree_data(self) -> Dict:
        """Export the memory as ``{"nodes": [...]}`` for the frontend.

        Each node carries the file id, a label built from the last path
        segment and the visit count, a size that grows with the visit count
        (capped at 10), and the decay weight as opacity.
        """
        nodes = [
            {
                "id": file_id,
                "label": f"{file_id.split('/')[-1]} ({record.count}x)",
                "size": min(10, record.count * 2),
                "opacity": record.decay_factor,
            }
            for file_id, record in self.interaction_tree.items()
        ]
        return {"nodes": nodes}

    def clear(self):
        """Remove every stored interaction."""
        self.interaction_tree.clear()
        logger.info(
            "[HostessMemory] Cleared all interactions for %s", self.user_id
        )

    @property
    def stats(self) -> Dict:
        """Summary counters: tracked files, total visits, mean decay weight.

        ``avg_decay`` is 0 when the memory is empty.
        """
        records = list(self.interaction_tree.values())
        return {
            "total_interactions": len(records),
            "total_visits": sum(r.count for r in records),
            "avg_decay": (
                sum(r.decay_factor for r in records) / len(records)
                if records
                else 0
            ),
        }