-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatabase.py
More file actions
375 lines (329 loc) · 16.4 KB
/
database.py
File metadata and controls
375 lines (329 loc) · 16.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
import asyncio
import asyncpg
import logging
import os
import time
from datetime import datetime, timezone
from typing import Optional, Dict, List, Any
from urllib.parse import urlparse
logger = logging.getLogger('SecurePathBot.Database')
class DatabaseManager:
    """Async PostgreSQL persistence layer for bot usage analytics.

    Wraps an asyncpg connection pool and provides helpers to record
    per-request token usage/cost and raw user queries, plus aggregate
    reports for users, models, and time windows.

    Error policy: all public logging/reporting methods catch database
    errors, log them, and return a failure value (False/None) so that
    analytics problems never crash the calling bot code. The one
    exception is init_tables(), which re-raises after logging so that
    schema failures are visible at startup.
    """

    def __init__(self):
        # Created lazily by connect(); stays None if connection fails.
        self.pool: Optional[asyncpg.Pool] = None
        # Standard connection string, e.g. postgres://user:pass@host:port/dbname
        self.database_url = os.environ.get('DATABASE_URL')

    async def connect(self) -> bool:
        """Initialize the connection pool and ensure the schema exists.

        Returns:
            True on success; False if DATABASE_URL is unset or the
            connection/schema setup fails (errors are logged, not raised).
        """
        if not self.database_url:
            logger.error("DATABASE_URL not found in environment variables")
            return False
        try:
            # asyncpg accepts a DSN string directly, but parsing the URL
            # lets us force ssl='require' and explicit pool sizing.
            parsed = urlparse(self.database_url)
            self.pool = await asyncpg.create_pool(
                host=parsed.hostname,
                port=parsed.port,
                user=parsed.username,
                password=parsed.password,
                database=parsed.path[1:],  # drop the leading '/'
                ssl='require',
                min_size=1,
                max_size=3,   # small pool: low-volume bot workload
                command_timeout=60,
            )
            logger.info("Database connection pool created successfully")
            await self.init_tables()
            return True
        except Exception as e:
            logger.error(f"Failed to connect to database: {e}")
            return False

    async def disconnect(self):
        """Close the connection pool (no-op if never connected)."""
        if self.pool:
            await self.pool.close()
            logger.info("Database connection pool closed")

    async def init_tables(self):
        """Create tables and indexes if they don't exist.

        Raises:
            Exception: re-raised after logging so connect() reports failure.
        """
        try:
            async with self.pool.acquire() as conn:
                # One row per API request, with token/cost breakdown.
                await conn.execute('''
                    CREATE TABLE IF NOT EXISTS usage_tracking (
                        id SERIAL PRIMARY KEY,
                        user_id BIGINT NOT NULL,
                        username VARCHAR(255),
                        command VARCHAR(50) NOT NULL,
                        model VARCHAR(50) NOT NULL,
                        input_tokens INTEGER DEFAULT 0,
                        output_tokens INTEGER DEFAULT 0,
                        cached_tokens INTEGER DEFAULT 0,
                        total_tokens INTEGER DEFAULT 0,
                        cost DECIMAL(10, 8) DEFAULT 0,
                        timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
                        guild_id BIGINT,
                        channel_id BIGINT
                    )
                ''')
                # Indexes matching the per-user / time-window / per-command reports below.
                await conn.execute('CREATE INDEX IF NOT EXISTS idx_usage_user_id ON usage_tracking(user_id)')
                await conn.execute('CREATE INDEX IF NOT EXISTS idx_usage_timestamp ON usage_tracking(timestamp)')
                await conn.execute('CREATE INDEX IF NOT EXISTS idx_usage_command ON usage_tracking(command)')
                # One row per calendar day, maintained incrementally by log_usage().
                await conn.execute('''
                    CREATE TABLE IF NOT EXISTS daily_usage_summary (
                        id SERIAL PRIMARY KEY,
                        date DATE NOT NULL,
                        total_requests INTEGER DEFAULT 0,
                        total_tokens INTEGER DEFAULT 0,
                        total_cost DECIMAL(10, 6) DEFAULT 0,
                        unique_users INTEGER DEFAULT 0,
                        top_command VARCHAR(50),
                        UNIQUE(date)
                    )
                ''')
                # Per-user lifetime aggregates, upserted by log_usage().
                await conn.execute('''
                    CREATE TABLE IF NOT EXISTS user_analytics (
                        user_id BIGINT PRIMARY KEY,
                        username VARCHAR(255),
                        first_interaction TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
                        last_interaction TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
                        total_requests INTEGER DEFAULT 0,
                        total_tokens INTEGER DEFAULT 0,
                        total_cost DECIMAL(10, 6) DEFAULT 0,
                        favorite_command VARCHAR(50),
                        avg_tokens_per_request DECIMAL(8, 2) DEFAULT 0
                    )
                ''')
                # Raw text of every user query, for topic analytics.
                await conn.execute('''
                    CREATE TABLE IF NOT EXISTS user_queries (
                        id SERIAL PRIMARY KEY,
                        user_id BIGINT NOT NULL,
                        username VARCHAR(255),
                        command VARCHAR(50) NOT NULL,
                        query_text TEXT NOT NULL,
                        channel_id BIGINT,
                        guild_id BIGINT,
                        timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
                        response_generated BOOLEAN DEFAULT FALSE,
                        error_occurred BOOLEAN DEFAULT FALSE
                    )
                ''')
                await conn.execute('CREATE INDEX IF NOT EXISTS idx_queries_user_id ON user_queries(user_id)')
                await conn.execute('CREATE INDEX IF NOT EXISTS idx_queries_timestamp ON user_queries(timestamp)')
                await conn.execute('CREATE INDEX IF NOT EXISTS idx_queries_command ON user_queries(command)')
                logger.info("Database tables initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize database tables: {e}")
            raise

    async def log_usage(self, user_id: int, username: str, command: str, model: str,
                        input_tokens: int = 0, output_tokens: int = 0, cached_tokens: int = 0,
                        cost: float = 0.0, guild_id: Optional[int] = None,
                        channel_id: Optional[int] = None) -> bool:
        """Record one API request and update per-user and daily aggregates.

        Returns:
            True if all three writes succeeded, False otherwise.
        """
        # Guard explicitly instead of letting a None pool raise an opaque
        # AttributeError inside the try block.
        if not self.pool:
            logger.error("Cannot log usage: database pool is not initialized")
            return False
        try:
            async with self.pool.acquire() as conn:
                total_tokens = input_tokens + output_tokens + cached_tokens
                # 1) Raw usage record.
                await conn.execute('''
                    INSERT INTO usage_tracking
                    (user_id, username, command, model, input_tokens, output_tokens,
                     cached_tokens, total_tokens, cost, guild_id, channel_id)
                    VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
                ''', user_id, username, command, model, input_tokens, output_tokens,
                     cached_tokens, total_tokens, cost, guild_id, channel_id)
                # 2) Per-user lifetime aggregates (upsert).
                await conn.execute('''
                    INSERT INTO user_analytics
                    (user_id, username, last_interaction, total_requests, total_tokens, total_cost)
                    VALUES ($1, $2, NOW(), 1, $3, $4)
                    ON CONFLICT (user_id)
                    DO UPDATE SET
                        username = EXCLUDED.username,
                        last_interaction = NOW(),
                        total_requests = user_analytics.total_requests + 1,
                        total_tokens = user_analytics.total_tokens + EXCLUDED.total_tokens,
                        total_cost = user_analytics.total_cost + EXCLUDED.total_cost
                ''', user_id, username, total_tokens, cost)
                # 3) Daily summary (upsert). unique_users is recomputed from
                # usage_tracking on each update; previously it was inserted as 1
                # and never changed. NOTE(review): timestamp::date converts via
                # the session TimeZone (normally UTC on the server), which should
                # match the UTC `today` below — confirm server TimeZone setting.
                today = datetime.now(timezone.utc).date()
                await conn.execute('''
                    INSERT INTO daily_usage_summary (date, total_requests, total_tokens, total_cost, unique_users)
                    VALUES ($1, 1, $2, $3, 1)
                    ON CONFLICT (date)
                    DO UPDATE SET
                        total_requests = daily_usage_summary.total_requests + 1,
                        total_tokens = daily_usage_summary.total_tokens + EXCLUDED.total_tokens,
                        total_cost = daily_usage_summary.total_cost + EXCLUDED.total_cost,
                        unique_users = (
                            SELECT COUNT(DISTINCT user_id)
                            FROM usage_tracking
                            WHERE timestamp::date = EXCLUDED.date
                        )
                ''', today, total_tokens, cost)
                return True
        except Exception as e:
            logger.error(f"Failed to log usage: {e}")
            return False

    async def log_user_query(self, user_id: int, username: str, command: str,
                             query_text: str, channel_id: Optional[int] = None,
                             guild_id: Optional[int] = None, response_generated: bool = False,
                             error_occurred: bool = False) -> bool:
        """Store the raw text of a user query.

        Returns:
            True on success, False on failure (error is logged).
        """
        if not self.pool:
            logger.error("Cannot log user query: database pool is not initialized")
            return False
        try:
            async with self.pool.acquire() as conn:
                await conn.execute('''
                    INSERT INTO user_queries
                    (user_id, username, command, query_text, channel_id, guild_id, response_generated, error_occurred)
                    VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
                ''', user_id, username, command, query_text, channel_id, guild_id, response_generated, error_occurred)
                return True
        except Exception as e:
            logger.error(f"Failed to log user query: {e}")
            return False

    async def get_user_stats(self, user_id: int) -> Optional[Dict[str, Any]]:
        """Return lifetime, per-command, and last-7-day stats for one user.

        Returns:
            Dict with 'user_data', 'command_stats', 'recent_activity' keys,
            or None if the user is unknown or a database error occurs.
        """
        if not self.pool:
            logger.error("Cannot get user stats: database pool is not initialized")
            return None
        try:
            async with self.pool.acquire() as conn:
                user_data = await conn.fetchrow('''
                    SELECT * FROM user_analytics WHERE user_id = $1
                ''', user_id)
                if not user_data:
                    return None  # no analytics row means the user never interacted
                command_stats = await conn.fetch('''
                    SELECT command, COUNT(*) as count, SUM(total_tokens) as tokens, SUM(cost) as cost
                    FROM usage_tracking
                    WHERE user_id = $1
                    GROUP BY command
                    ORDER BY count DESC
                ''', user_id)
                recent_activity = await conn.fetch('''
                    SELECT DATE(timestamp) as date, COUNT(*) as requests, SUM(cost) as daily_cost
                    FROM usage_tracking
                    WHERE user_id = $1 AND timestamp >= NOW() - INTERVAL '7 days'
                    GROUP BY DATE(timestamp)
                    ORDER BY date DESC
                ''', user_id)
                return {
                    'user_data': dict(user_data),
                    'command_stats': [dict(row) for row in command_stats],
                    'recent_activity': [dict(row) for row in recent_activity]
                }
        except Exception as e:
            logger.error(f"Failed to get user stats: {e}")
            return None

    async def get_global_stats(self) -> Optional[Dict[str, Any]]:
        """Return bot-wide totals, top users/commands, and 7-day daily stats.

        Returns:
            Dict with 'overall', 'top_users', 'top_commands', 'daily_stats'
            keys, or None on database error.
        """
        if not self.pool:
            logger.error("Cannot get global stats: database pool is not initialized")
            return None
        try:
            async with self.pool.acquire() as conn:
                overall = await conn.fetchrow('''
                    SELECT
                        COUNT(*) as total_requests,
                        COUNT(DISTINCT user_id) as unique_users,
                        SUM(total_tokens) as total_tokens,
                        SUM(cost) as total_cost,
                        AVG(total_tokens) as avg_tokens_per_request
                    FROM usage_tracking
                ''')
                top_users = await conn.fetch('''
                    SELECT username, total_requests, total_cost, total_tokens
                    FROM user_analytics
                    ORDER BY total_requests DESC
                    LIMIT 10
                ''')
                top_commands = await conn.fetch('''
                    SELECT command, COUNT(*) as usage_count, SUM(cost) as total_cost
                    FROM usage_tracking
                    GROUP BY command
                    ORDER BY usage_count DESC
                ''')
                daily_stats = await conn.fetch('''
                    SELECT date, total_requests, total_cost, unique_users
                    FROM daily_usage_summary
                    WHERE date >= CURRENT_DATE - INTERVAL '7 days'
                    ORDER BY date DESC
                ''')
                return {
                    'overall': dict(overall),
                    'top_users': [dict(row) for row in top_users],
                    'top_commands': [dict(row) for row in top_commands],
                    'daily_stats': [dict(row) for row in daily_stats]
                }
        except Exception as e:
            logger.error(f"Failed to get global stats: {e}")
            return None

    async def get_costs_by_model(self) -> Optional[Dict[str, Any]]:
        """Return token/cost totals grouped by model, highest cost first.

        Returns:
            Dict with a 'model_costs' list, or None on database error.
        """
        if not self.pool:
            logger.error("Cannot get model costs: database pool is not initialized")
            return None
        try:
            async with self.pool.acquire() as conn:
                model_costs = await conn.fetch('''
                    SELECT
                        model,
                        COUNT(*) as requests,
                        SUM(input_tokens) as input_tokens,
                        SUM(output_tokens) as output_tokens,
                        SUM(cached_tokens) as cached_tokens,
                        SUM(cost) as total_cost,
                        AVG(cost) as avg_cost_per_request
                    FROM usage_tracking
                    GROUP BY model
                    ORDER BY total_cost DESC
                ''')
                return {
                    'model_costs': [dict(row) for row in model_costs]
                }
        except Exception as e:
            logger.error(f"Failed to get model costs: {e}")
            return None

    async def get_query_analytics(self) -> Optional[Dict[str, Any]]:
        """Return 7-day analytics over stored user queries.

        Returns:
            Dict with 'popular_queries', 'command_patterns',
            'hourly_activity' keys, or None on database error.
        """
        if not self.pool:
            logger.error("Cannot get query analytics: database pool is not initialized")
            return None
        try:
            async with self.pool.acquire() as conn:
                # Most frequently repeated query texts. The original statement
                # had COUNT(*) with no GROUP BY, which PostgreSQL rejects
                # ("column must appear in the GROUP BY clause"), so this method
                # always failed; the GROUP BY below fixes that.
                popular_queries = await conn.fetch('''
                    SELECT
                        query_text,
                        command,
                        COUNT(*) as frequency,
                        username
                    FROM user_queries
                    WHERE timestamp >= NOW() - INTERVAL '7 days'
                    GROUP BY query_text, command, username
                    ORDER BY frequency DESC
                    LIMIT 20
                ''')
                command_patterns = await conn.fetch('''
                    SELECT
                        command,
                        COUNT(*) as total_queries,
                        COUNT(DISTINCT user_id) as unique_users,
                        AVG(LENGTH(query_text)) as avg_query_length
                    FROM user_queries
                    WHERE timestamp >= NOW() - INTERVAL '7 days'
                    GROUP BY command
                    ORDER BY total_queries DESC
                ''')
                hourly_activity = await conn.fetch('''
                    SELECT
                        EXTRACT(HOUR FROM timestamp) as hour,
                        COUNT(*) as query_count
                    FROM user_queries
                    WHERE timestamp >= NOW() - INTERVAL '7 days'
                    GROUP BY EXTRACT(HOUR FROM timestamp)
                    ORDER BY query_count DESC
                ''')
                return {
                    'popular_queries': [dict(row) for row in popular_queries],
                    'command_patterns': [dict(row) for row in command_patterns],
                    'hourly_activity': [dict(row) for row in hourly_activity]
                }
        except Exception as e:
            logger.error(f"Failed to get query analytics: {e}")
            return None
# Module-level singleton: import `db_manager` rather than instantiating
# DatabaseManager directly, so all callers share one connection pool.
# Note: the pool is not created here — callers must await db_manager.connect()
# at startup before using it.
db_manager = DatabaseManager()