-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmemory_engine.py
More file actions
296 lines (235 loc) · 10 KB
/
memory_engine.py
File metadata and controls
296 lines (235 loc) · 10 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Memory Engine for AI Agent Memory Management
Handles memory storage, recall, and intelligent memory management
"""
import os
import asyncio
from typing import List, Dict, Any, Optional
import json
import hashlib
from datetime import datetime, timedelta
import re
from database import agent_db
from encryption_engine import encryption_engine
class MemoryEngine:
    """Store, recall and summarise agent memories.

    Persistence is delegated to the module-level ``agent_db`` object and
    content encryption to ``encryption_engine``; this class adds input
    validation, redaction of sensitive patterns, automatic tagging and
    relevance scoring on top of them.
    """

    # Maximum number of tags persisted with a single memory.
    MAX_TAGS = 10
    # Maximum number of tags derived automatically from the content.
    MAX_AUTO_TAGS = 5

    # Technology keywords; a space becomes "_" in the emitted tag.
    _TECH_KEYWORDS = (
        'ai', 'machine learning', 'deep learning', 'neural network',
        'llm', 'gpt', 'claude', 'chatbot', 'nlp', 'computer vision',
        'api', 'database', 'web', 'mobile', 'cloud', 'docker',
    )
    # Programming languages; emitted as "lang_<name>".
    _LANGUAGES = (
        'python', 'javascript', 'java', 'go', 'rust', 'cpp',
        'typescript', 'swift', 'kotlin', 'php', 'ruby',
    )
    # Action tag -> word stems that trigger it.
    _ACTION_STEMS = {
        'error': ('error', 'exception', 'fail', 'bug'),
        'debug': ('debug', 'trace', 'log'),
        'feature': ('feature', 'implement', 'add', 'create'),
        'fix': ('fix', 'resolve', 'solve', 'patch'),
        'optimization': ('optimize', 'performance', 'speed', 'improve'),
    }

    def __init__(self):
        """Read size and TTL limits from the environment.

        ``MAX_MEMORY_LENGTH`` (default 10000) caps the characters kept per
        memory; ``DEFAULT_MEMORY_TTL`` (default 86400 seconds, i.e. 24 hours)
        is used when a caller does not pass a TTL.
        """
        self.max_memory_length = int(os.getenv("MAX_MEMORY_LENGTH", "10000"))
        self.default_ttl = int(os.getenv("DEFAULT_MEMORY_TTL", "86400"))

    async def store_memory(self, agent_id: str, session_id: str, context: str,
                           tags: Optional[List[str]] = None,
                           ttl: Optional[int] = None) -> Dict[str, str]:
        """
        Store agent memory with intelligent processing

        The context is truncated to ``max_memory_length`` characters (a
        "...[truncated]" marker is appended), cleaned/redacted, tagged, and
        only then encrypted and handed to ``agent_db.store_memory``.

        Args:
            agent_id: Agent identifier
            session_id: Session identifier
            context: Memory content
            tags: Optional tags for categorization (the list is not modified)
            ttl: Time to live in seconds; defaults to ``self.default_ttl``

        Returns:
            Whatever ``agent_db.store_memory`` returns; the code reads a
            ``'memory_id'`` key from it for the log line.

        Raises:
            ValueError: If agent_id, session_id or context is empty.
            Exception: Any error from encryption or the database is logged
                and re-raised unchanged.
        """
        try:
            if not agent_id or not session_id or not context:
                raise ValueError("agent_id, session_id, and context are required")

            if len(context) > self.max_memory_length:
                context = context[:self.max_memory_length] + "...[truncated]"

            context = self._clean_context(context)

            # Auto-tags must be derived from the plaintext: once encrypted,
            # the content no longer contains any recognisable keywords.
            auto_tags = self._extract_tags_from_context(context)

            # Work on a copy so the caller's list is never mutated.
            merged_tags = list(tags) if tags else []
            merged_tags.extend(auto_tags)
            # Order-preserving de-duplication: caller tags come first, so
            # they are the ones kept when the cap is reached.
            merged_tags = list(dict.fromkeys(merged_tags))[:self.MAX_TAGS]

            if ttl is None:
                ttl = self.default_ttl

            encrypted_context = encryption_engine.encrypt(context)

            result = await agent_db.store_memory(
                agent_id=agent_id,
                session_id=session_id,
                context=encrypted_context,
                tags=merged_tags,
                ttl=ttl
            )
            print(f"[OK] Memory stored for agent {agent_id}: {result['memory_id']}")
            return result
        except Exception as e:
            print(f"[ERROR] Memory storage failed: {e}")
            raise

    async def recall_memories(self, agent_id: str, query: str,
                              tags: Optional[List[str]] = None,
                              limit: int = 10) -> List[Dict[str, Any]]:
        """
        Recall memories with intelligent search

        Args:
            agent_id: Agent identifier
            query: Search query (may be empty or None)
            tags: Optional tag filters, passed through to the database
            limit: Maximum number of memories to return

        Returns:
            Up to ``limit`` memory dicts with decrypted ``'context'`` and an
            added ``'relevance_score'``, highest score first.

        Raises:
            ValueError: If agent_id is empty.
            Exception: Database/decryption errors are logged and re-raised.
        """
        try:
            if not agent_id:
                raise ValueError("agent_id is required")

            query = self._clean_query(query)

            # Fetch twice the requested amount so that re-ranking by
            # relevance has candidates to choose from.
            memories = await agent_db.recall_memories(
                agent_id=agent_id,
                query=query,
                tags=tags,
                limit=limit * 2
            )

            # Scoring works on plaintext, so decrypt first.
            for memory in memories:
                memory["context"] = encryption_engine.decrypt(memory["context"])

            scored_memories = self._score_relevance(memories, query)
            scored_memories.sort(key=lambda x: x.get('relevance_score', 0), reverse=True)
            return scored_memories[:limit]
        except Exception as e:
            print(f"[ERROR] Memory recall failed: {e}")
            raise

    async def get_memory_summary(self, agent_id: str,
                                 session_id: Optional[str] = None) -> Dict[str, Any]:
        """
        Get memory summary for an agent

        Looks at up to 100 memories returned by ``agent_db.recall_memories``
        for an empty query.

        Args:
            agent_id: Agent identifier
            session_id: Optional session filter

        Returns:
            Dict with ``total_memories``, ``time_span_days``, ``top_tags``
            (at most 10 ``{'tag', 'count'}`` entries, most frequent first),
            ``session_filter`` and ``summary_generated_at``.

        Raises:
            Exception: Any failure (including a record without a parseable
                ``'created_at'``) is logged and re-raised.
        """
        try:
            recent_memories = await agent_db.recall_memories(
                agent_id=agent_id,
                query="",
                limit=100
            )

            if session_id:
                recent_memories = [
                    m for m in recent_memories
                    if self._matches_session(m, session_id)
                ]

            total_memories = len(recent_memories)

            tag_counts: Dict[str, int] = {}
            for memory in recent_memories:
                for tag in memory.get('tags') or []:
                    tag_counts[tag] = tag_counts.get(tag, 0) + 1
            top_tags = sorted(tag_counts.items(), key=lambda x: x[1], reverse=True)[:10]

            if recent_memories:
                # Parse each timestamp once, then take the extremes.
                timestamps = [
                    self._parse_timestamp(m['created_at']) for m in recent_memories
                ]
                time_span_days = (max(timestamps) - min(timestamps)).days
            else:
                time_span_days = 0

            return {
                'total_memories': total_memories,
                'time_span_days': time_span_days,
                'top_tags': [{'tag': tag, 'count': count} for tag, count in top_tags],
                'session_filter': session_id,
                'summary_generated_at': datetime.now().isoformat()
            }
        except Exception as e:
            print(f"[ERROR] Memory summary generation failed: {e}")
            raise

    @staticmethod
    def _matches_session(memory: Dict[str, Any], session_id: str) -> bool:
        """Tell whether a memory record belongs to ``session_id``.

        Uses an exact comparison when the record carries a ``'session_id'``
        field. NOTE(review): the record schema returned by ``agent_db`` is
        not visible here; records without that field fall back to the
        previous loose check (substring of the record's string form).
        """
        if 'session_id' in memory:
            return memory['session_id'] == session_id
        return session_id in str(memory)

    @staticmethod
    def _parse_timestamp(value: str) -> datetime:
        """Parse an ISO-8601 timestamp, accepting a trailing 'Z' for UTC."""
        return datetime.fromisoformat(value.replace('Z', '+00:00'))

    @staticmethod
    def _contains_word(text: str, word: str, prefix_only: bool = False) -> bool:
        """Check for ``word`` in ``text`` on word boundaries.

        With ``prefix_only`` the word only has to start at a boundary, so
        inflected forms ("failed" for "fail") still match.
        """
        pattern = r'\b' + re.escape(word)
        if not prefix_only:
            pattern += r'\b'
        return re.search(pattern, text) is not None

    def _clean_context(self, context: str) -> str:
        """Clean and normalize context text

        Collapses whitespace runs to single spaces and replaces e-mail
        addresses, 16-digit card-like numbers and ``ddd-dd-dddd`` numbers
        with ``[EMAIL]``, ``[CARD]`` and ``[SSN]`` placeholders.
        """
        context = re.sub(r'\s+', ' ', context.strip())
        # Inside a character class "|" is a literal pipe, not alternation,
        # so the top-level-domain class is spelled [A-Za-z].
        context = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '[EMAIL]', context)
        context = re.sub(r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b', '[CARD]', context)
        context = re.sub(r'\b\d{3}-\d{2}-\d{4}\b', '[SSN]', context)
        return context

    def _clean_query(self, query: str) -> str:
        """Clean and normalize search query

        Returns an empty string for a falsy query; otherwise strips the
        query and collapses whitespace runs to single spaces.
        """
        if not query:
            return ""
        return re.sub(r'\s+', ' ', query.strip())

    def _extract_tags_from_context(self, context: str) -> List[str]:
        """Extract relevant tags from context content

        Keywords and language names must appear as whole words (so "ai"
        does not fire on "said", nor "java" on "javascript"); action stems
        must start a word (so "fail" fires on "failed" but "bug" does not
        fire on "debug").

        Returns:
            At most ``MAX_AUTO_TAGS`` tags, in the order keywords,
            languages, actions.
        """
        if not context:
            return []

        context_lower = context.lower()
        tags: List[str] = []

        for keyword in self._TECH_KEYWORDS:
            if self._contains_word(context_lower, keyword):
                tags.append(keyword.replace(' ', '_'))

        for lang in self._LANGUAGES:
            if self._contains_word(context_lower, lang):
                tags.append(f'lang_{lang}')

        for action, stems in self._ACTION_STEMS.items():
            if any(self._contains_word(context_lower, stem, prefix_only=True)
                   for stem in stems):
                tags.append(action)

        return tags[:self.MAX_AUTO_TAGS]

    def _score_relevance(self, memories: List[Dict[str, Any]], query: str) -> List[Dict[str, Any]]:
        """Score memories for relevance to query

        Adds a ``'relevance_score'`` (0-100) to every memory dict in place
        and returns the same list. With an empty query every memory gets
        the neutral score 50.

        Per query term: +10 for each occurrence in the context, +15 when it
        equals a tag, +3 for each context word containing it (terms longer
        than two characters only). Memories younger than 7 days get +5.
        """
        if not query:
            for memory in memories:
                memory['relevance_score'] = 50
            return memories

        query_terms = set(query.lower().split())

        for memory in memories:
            score = 0
            # Tolerate records whose context/tags are missing or None.
            context = (memory.get('context') or '').lower()
            tags = {tag.lower() for tag in (memory.get('tags') or [])}
            words = context.split()

            for term in query_terms:
                score += context.count(term) * 10
                if term in tags:
                    score += 15
                if len(term) > 2:
                    score += 3 * sum(1 for word in words if term in word)

            # Recency boost is best-effort: a missing or malformed
            # timestamp simply means no boost.
            try:
                created_at = self._parse_timestamp(memory['created_at'])
                days_old = (datetime.now(created_at.tzinfo) - created_at).days
                if days_old < 7:
                    score += 5
            except (KeyError, TypeError, ValueError, AttributeError):
                pass

            memory['relevance_score'] = min(100, max(0, score))

        return memories