-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathauth.py
More file actions
293 lines (251 loc) · 9.58 KB
/
auth.py
File metadata and controls
293 lines (251 loc) · 9.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
"""
认证和授权模块
Copyright (c) 2025 Miyang Tech (Zhuhai Hengqin) Co., Ltd.
MIT License
"""
import os
import secrets
import hashlib
import sqlite3
from datetime import datetime, timedelta
from typing import Optional, List, Dict
class AuthManager:
"""认证管理器"""
def __init__(self, db_path: str, config: dict):
self.db_path = db_path
self.config = config
# 确保目录存在
os.makedirs(os.path.dirname(db_path), exist_ok=True)
self._init_auth_tables()
self._ensure_default_admin()
def _get_conn(self) -> sqlite3.Connection:
"""获取数据库连接"""
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
return conn
def _init_auth_tables(self):
"""初始化认证相关表"""
conn = self._get_conn()
try:
cursor = conn.cursor()
# 管理员账号表
cursor.execute("""
CREATE TABLE IF NOT EXISTS admins (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
created_at TEXT,
last_login TEXT
)
""")
# API Token 表
cursor.execute("""
CREATE TABLE IF NOT EXISTS api_tokens (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
token TEXT UNIQUE NOT NULL,
created_at TEXT,
last_used TEXT,
is_active INTEGER DEFAULT 1
)
""")
# 会话表
cursor.execute("""
CREATE TABLE IF NOT EXISTS sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT UNIQUE NOT NULL,
admin_id INTEGER NOT NULL,
created_at TEXT,
expires_at TEXT,
FOREIGN KEY (admin_id) REFERENCES admins(id)
)
""")
conn.commit()
finally:
conn.close()
def _ensure_default_admin(self):
"""确保存在默认管理员"""
conn = self._get_conn()
try:
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM admins")
count = cursor.fetchone()[0]
if count == 0:
# 从配置读取默认密码,如果没有则生成随机密码
default_password = self.config.get("auth", {}).get("default_password", "")
if not default_password:
default_password = secrets.token_urlsafe(12)
print(f"\n🔐 已创建默认管理员账号:")
print(f" 用户名: admin")
print(f" 密码: {default_password}")
print(f" 请登录后立即修改密码!\n")
password_hash = self._hash_password(default_password)
cursor.execute("""
INSERT INTO admins (username, password_hash, created_at)
VALUES (?, ?, ?)
""", ("admin", password_hash, datetime.now().isoformat()))
conn.commit()
finally:
conn.close()
def _hash_password(self, password: str) -> str:
"""密码哈希"""
salt = self.config.get("auth", {}).get("salt", "mcp_template_salt")
return hashlib.sha256(f"{password}{salt}".encode()).hexdigest()
# ==================== 管理员认证 ====================
def verify_admin(self, username: str, password: str) -> Optional[int]:
"""验证管理员登录"""
password_hash = self._hash_password(password)
conn = self._get_conn()
try:
cursor = conn.cursor()
cursor.execute("""
SELECT id FROM admins
WHERE username = ? AND password_hash = ?
""", (username, password_hash))
row = cursor.fetchone()
if row:
# 更新最后登录时间
cursor.execute("""
UPDATE admins SET last_login = ? WHERE id = ?
""", (datetime.now().isoformat(), row[0]))
conn.commit()
return row[0]
return None
finally:
conn.close()
def create_session(self, admin_id: int, expires_hours: int = 24) -> str:
"""创建会话"""
session_id = secrets.token_urlsafe(32)
expires_at = datetime.now() + timedelta(hours=expires_hours)
conn = self._get_conn()
try:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO sessions (session_id, admin_id, created_at, expires_at)
VALUES (?, ?, ?, ?)
""", (session_id, admin_id, datetime.now().isoformat(), expires_at.isoformat()))
conn.commit()
finally:
conn.close()
return session_id
def verify_session(self, session_id: str) -> Optional[int]:
"""验证会话"""
conn = self._get_conn()
try:
cursor = conn.cursor()
cursor.execute("""
SELECT admin_id, expires_at FROM sessions
WHERE session_id = ?
""", (session_id,))
row = cursor.fetchone()
if row:
expires_at = datetime.fromisoformat(row[1])
if expires_at > datetime.now():
return row[0]
else:
# 过期,删除会话
cursor.execute("DELETE FROM sessions WHERE session_id = ?", (session_id,))
conn.commit()
return None
finally:
conn.close()
def delete_session(self, session_id: str):
"""删除会话(登出)"""
conn = self._get_conn()
try:
cursor = conn.cursor()
cursor.execute("DELETE FROM sessions WHERE session_id = ?", (session_id,))
conn.commit()
finally:
conn.close()
def change_password(self, admin_id: int, new_password: str) -> bool:
"""修改密码"""
password_hash = self._hash_password(new_password)
conn = self._get_conn()
try:
cursor = conn.cursor()
cursor.execute("""
UPDATE admins SET password_hash = ? WHERE id = ?
""", (password_hash, admin_id))
conn.commit()
return cursor.rowcount > 0
finally:
conn.close()
# ==================== API Token 管理 ====================
def create_api_token(self, name: str) -> str:
"""创建 API Token"""
token = f"mcp_{secrets.token_urlsafe(32)}"
conn = self._get_conn()
try:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO api_tokens (name, token, created_at, is_active)
VALUES (?, ?, ?, 1)
""", (name, token, datetime.now().isoformat()))
conn.commit()
finally:
conn.close()
return token
def verify_api_token(self, token: str) -> bool:
"""验证 API Token"""
conn = self._get_conn()
try:
cursor = conn.cursor()
cursor.execute("""
SELECT id FROM api_tokens
WHERE token = ? AND is_active = 1
""", (token,))
row = cursor.fetchone()
if row:
# 更新最后使用时间
cursor.execute("""
UPDATE api_tokens SET last_used = ? WHERE id = ?
""", (datetime.now().isoformat(), row[0]))
conn.commit()
return True
return False
finally:
conn.close()
def list_api_tokens(self) -> List[Dict]:
"""列出所有 API Token"""
conn = self._get_conn()
try:
cursor = conn.cursor()
cursor.execute("""
SELECT id, name, token, created_at, last_used, is_active
FROM api_tokens ORDER BY created_at DESC
""")
rows = cursor.fetchall()
return [{
"id": row[0],
"name": row[1],
"token": row[2][:12] + "..." if row[2] else "",
"token_full": row[2],
"created_at": row[3],
"last_used": row[4],
"is_active": bool(row[5])
} for row in rows]
finally:
conn.close()
def revoke_api_token(self, token_id: int) -> bool:
"""撤销 API Token"""
conn = self._get_conn()
try:
cursor = conn.cursor()
cursor.execute("""
UPDATE api_tokens SET is_active = 0 WHERE id = ?
""", (token_id,))
conn.commit()
return cursor.rowcount > 0
finally:
conn.close()
def delete_api_token(self, token_id: int) -> bool:
"""删除 API Token"""
conn = self._get_conn()
try:
cursor = conn.cursor()
cursor.execute("DELETE FROM api_tokens WHERE id = ?", (token_id,))
conn.commit()
return cursor.rowcount > 0
finally:
conn.close()