-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatabase.py
More file actions
213 lines (176 loc) · 7.45 KB
/
database.py
File metadata and controls
213 lines (176 loc) · 7.45 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
"""
PPT Translator — 数据库层 (SQLite)
表结构:
users — 用户账号(邮箱、密码哈希、是否验证、套餐)
email_tokens — 邮件验证 / 密码重置 token
usage_log — 翻译使用记录(注册用户 + 游客 IP)
"""
import sqlite3
from pathlib import Path
from datetime import datetime, timedelta, timezone
# SQLite database file, located in the same directory as this module.
DB_PATH = Path(__file__).parent / "ppt_translator.db"
# ── Plan limits ──────────────────────────────────────────────────────────────
# Per-plan quota settings, keyed by plan name.
#   "guest" — visitors without an account (usage is logged by IP address)
#   "free"  — the default plan assigned to a newly created user row
#   "pro"   — higher-quota plan
# Each entry holds:
#   translations_per_day / translations_per_month — quota for one period;
#       the key name differs with the plan's "period"
#   max_slides — slide limit for the plan
#   period     — "day" or "month", the window the quota applies to
# NOTE(review): enforcement is not in this module — presumably callers compare
# these limits against count_guest_today() / count_user_this_month(); confirm.
PLANS = {
    "guest": {
        "translations_per_day": 1,
        "max_slides": 10,
        "period": "day",
    },
    "free": {
        "translations_per_month": 10,
        "max_slides": 30,
        "period": "month",
    },
    "pro": {
        "translations_per_month": 100,
        "max_slides": 100,
        "period": "month",
    },
}
# ── 连接 ─────────────────────────────────────────────────────────────────────
def get_db() -> sqlite3.Connection:
    """Open a new SQLite connection to ``DB_PATH`` configured for this module.

    Rows are returned as ``sqlite3.Row`` (addressable by column name), the
    journal mode is set to WAL, and foreign-key enforcement is switched on.

    Returns:
        A fresh connection. The caller owns it and is responsible for
        closing it.

    Raises:
        sqlite3.Error: If the database cannot be opened or configured. The
            half-configured connection is closed before the error propagates.
    """
    conn = sqlite3.connect(str(DB_PATH))
    try:
        conn.row_factory = sqlite3.Row
        # The first statement is where a locked or corrupt database file
        # surfaces, so the PRAGMAs must not leak the handle when they fail.
        conn.execute("PRAGMA journal_mode=WAL")
        # SQLite only enforces foreign keys when enabled on each connection;
        # the ON DELETE clauses in the schema rely on this.
        conn.execute("PRAGMA foreign_keys=ON")
    except Exception:
        conn.close()
        raise
    return conn
# ── 初始化表 ─────────────────────────────────────────────────────────────────
# Schema for the whole application, executed verbatim by init_db().
#   users        — accounts (email, password hash, verified flag, plan)
#   email_tokens — email verification / password reset tokens
#   usage_log    — translation usage records (registered users and guest IPs)
_SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
email TEXT UNIQUE NOT NULL COLLATE NOCASE,
password_hash TEXT NOT NULL,
is_verified INTEGER NOT NULL DEFAULT 0,
plan TEXT NOT NULL DEFAULT 'free',
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS email_tokens (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE,
token TEXT UNIQUE NOT NULL,
type TEXT NOT NULL, -- 'verify' | 'reset'
expires_at TEXT NOT NULL,
used INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE IF NOT EXISTS usage_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER REFERENCES users(id) ON DELETE SET NULL,
ip_address TEXT,
slides_count INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_usage_user ON usage_log(user_id, created_at);
CREATE INDEX IF NOT EXISTS idx_usage_ip ON usage_log(ip_address, created_at);
CREATE INDEX IF NOT EXISTS idx_tokens_token ON email_tokens(token);
"""


def init_db() -> None:
    """Create the tables and indexes if they do not exist yet.

    Every statement uses ``IF NOT EXISTS``, so calling this repeatedly is
    harmless for an already-initialised database.

    Raises:
        sqlite3.Error: If the schema script fails. The connection is closed
            either way.
    """
    conn = get_db()
    try:
        conn.executescript(_SCHEMA_SQL)
        conn.commit()
    finally:
        # Previously the connection stayed open when the script raised.
        conn.close()
# ── 用户 CRUD ─────────────────────────────────────────────────────────────────
def create_user(email: str, password_hash: str) -> int:
    """Insert a new user row and return its id.

    The email is lower-cased and stripped of surrounding whitespace before
    it is stored.

    Args:
        email: Address to register.
        password_hash: Already-hashed password; stored as given.

    Returns:
        The ``id`` of the newly inserted row.

    Raises:
        ValueError: If the insert violates a table constraint, which for a
            normal call means the email is already registered. The
            underlying ``sqlite3.IntegrityError`` is kept as ``__cause__``.
    """
    conn = get_db()
    try:
        cur = conn.execute(
            "INSERT INTO users (email, password_hash) VALUES (?, ?)",
            (email.lower().strip(), password_hash),
        )
        conn.commit()
        return cur.lastrowid
    except sqlite3.IntegrityError as exc:
        # Chain explicitly so the original constraint error stays visible
        # in tracebacks instead of appearing as an error "during handling".
        raise ValueError("该邮箱已注册") from exc
    finally:
        conn.close()
def get_user_by_email(email: str) -> dict | None:
    """Look up a user by email address.

    The email is lower-cased and stripped before the lookup, mirroring how
    ``create_user`` stores it.

    Returns:
        The user row as a plain ``dict`` (all columns of ``users``), or
        ``None`` when no row matches.
    """
    conn = get_db()
    try:
        row = conn.execute(
            "SELECT * FROM users WHERE email = ?", (email.lower().strip(),)
        ).fetchone()
    finally:
        # Close even when the query raises; previously the handle leaked.
        conn.close()
    return dict(row) if row is not None else None
def get_user_by_id(user_id: int) -> dict | None:
    """Look up a user by primary key.

    Returns:
        The user row as a plain ``dict`` (all columns of ``users``), or
        ``None`` when no row has that id.
    """
    conn = get_db()
    try:
        row = conn.execute(
            "SELECT * FROM users WHERE id = ?", (user_id,)
        ).fetchone()
    finally:
        # Close even when the query raises; previously the handle leaked.
        conn.close()
    return dict(row) if row is not None else None
def verify_user(user_id: int) -> None:
    """Mark the user with the given id as verified (``is_verified = 1``).

    An unknown ``user_id`` updates no rows and is not reported as an error.
    """
    conn = get_db()
    try:
        conn.execute("UPDATE users SET is_verified = 1 WHERE id = ?", (user_id,))
        conn.commit()
    finally:
        # Close even when the update raises; previously the handle leaked.
        conn.close()
def update_password(user_id: int, new_hash: str) -> None:
    """Replace the stored password hash of the given user.

    Args:
        user_id: Primary key of the user to update.
        new_hash: Already-hashed password; stored as given.

    An unknown ``user_id`` updates no rows and is not reported as an error.
    """
    conn = get_db()
    try:
        conn.execute(
            "UPDATE users SET password_hash = ? WHERE id = ?", (new_hash, user_id)
        )
        conn.commit()
    finally:
        # Close even when the update raises; previously the handle leaked.
        conn.close()
# ── 邮件 Token ────────────────────────────────────────────────────────────────
def create_email_token(user_id: int, token: str, token_type: str, hours: int = 24) -> None:
    """Store a new email token, invalidating older ones of the same type.

    Any still-unused token of ``token_type`` belonging to ``user_id`` is
    marked used before the new token is inserted. Both statements are
    committed together.

    Args:
        user_id: Owner of the token.
        token: The token string; must be unique across the table.
        token_type: Token category (the schema comment lists 'verify' and
            'reset').
        hours: Lifetime of the token, counted from now.

    Raises:
        sqlite3.IntegrityError: If the insert violates a constraint, e.g.
            ``token`` already exists. Nothing is committed in that case.
    """
    # Stored as UTC text in the same 'YYYY-MM-DD HH:MM:SS' layout that
    # SQLite's datetime('now') produces, so the two compare correctly as
    # strings when the token is later checked for expiry.
    expires_at = (datetime.now(timezone.utc) + timedelta(hours=hours)).strftime(
        "%Y-%m-%d %H:%M:%S"
    )
    conn = get_db()
    try:
        # Invalidate older tokens of this type.
        conn.execute(
            "UPDATE email_tokens SET used = 1 WHERE user_id = ? AND type = ? AND used = 0",
            (user_id, token_type),
        )
        conn.execute(
            "INSERT INTO email_tokens (user_id, token, type, expires_at) VALUES (?, ?, ?, ?)",
            (user_id, token, token_type, expires_at),
        )
        conn.commit()
    finally:
        # Close even when a statement raises; previously the handle leaked.
        conn.close()
def consume_email_token(token: str, token_type: str) -> dict | None:
    """Validate a token and mark it as used.

    A token is accepted when it exists with the given type, has not been
    used, and its ``expires_at`` is later than SQLite's ``datetime('now')``.

    Args:
        token: Token string presented by the user.
        token_type: Expected token category.

    Returns:
        The owning user's row as a ``dict``, or ``None`` when the token is
        unknown, of another type, expired, or already used (including the
        case where a concurrent call consumed it first).
    """
    conn = get_db()
    try:
        row = conn.execute(
            "SELECT et.*, u.id AS uid FROM email_tokens et\n"
            "JOIN users u ON u.id = et.user_id\n"
            "WHERE et.token = ? AND et.type = ? AND et.used = 0\n"
            "AND et.expires_at > datetime('now')",
            (token, token_type),
        ).fetchone()
        if row is None:
            return None
        # Re-check "used = 0" inside the UPDATE itself: between the SELECT
        # above and this statement another request may have consumed the
        # same token, and a single-use token must only succeed once.
        claimed = conn.execute(
            "UPDATE email_tokens SET used = 1 WHERE id = ? AND used = 0",
            (row["id"],),
        )
        if claimed.rowcount != 1:
            return None
        conn.commit()
        user = conn.execute(
            "SELECT * FROM users WHERE id = ?", (row["uid"],)
        ).fetchone()
    finally:
        # Close on every path, including exceptions; previously only the
        # two normal exits closed the connection.
        conn.close()
    return dict(user) if user is not None else None
# ── 使用量统计 ────────────────────────────────────────────────────────────────
def log_usage(slides_count: int, user_id: int | None = None, ip_address: str | None = None) -> None:
    """Append one translation record to ``usage_log``.

    Args:
        slides_count: Number of slides in the translated deck.
        user_id: Id of the registered user, or ``None`` for a guest.
        ip_address: Client IP address; guests are counted by this value.

    ``created_at`` is filled in by the table default (``datetime('now')``).
    """
    conn = get_db()
    try:
        conn.execute(
            "INSERT INTO usage_log (user_id, ip_address, slides_count) VALUES (?, ?, ?)",
            (user_id, ip_address, slides_count),
        )
        conn.commit()
    finally:
        # Close even when the insert raises; previously the handle leaked.
        conn.close()
def count_guest_today(ip_address: str) -> int:
    """Count today's guest translations for an IP address.

    Only rows without a ``user_id`` are counted. "Today" is whatever
    SQLite's ``date('now')`` returns, i.e. the current UTC date, which
    matches how ``created_at`` is written by the table default.

    Returns:
        The number of matching ``usage_log`` rows (0 when there are none).
    """
    conn = get_db()
    try:
        row = conn.execute(
            "SELECT COUNT(*) AS cnt FROM usage_log\n"
            "WHERE ip_address = ? AND user_id IS NULL\n"
            "AND date(created_at) = date('now')",
            (ip_address,),
        ).fetchone()
    finally:
        # Close even when the query raises; previously the handle leaked.
        conn.close()
    return row["cnt"] if row is not None else 0
def count_user_this_month(user_id: int) -> int:
    """Count the current month's translations for a registered user.

    The month is determined by comparing ``strftime('%Y-%m', ...)`` of
    ``created_at`` with that of SQLite's ``'now'``, so the month boundary
    falls at midnight UTC.

    Returns:
        The number of matching ``usage_log`` rows (0 when there are none).
    """
    conn = get_db()
    try:
        row = conn.execute(
            "SELECT COUNT(*) AS cnt FROM usage_log\n"
            "WHERE user_id = ?\n"
            "AND strftime('%Y-%m', created_at) = strftime('%Y-%m', 'now')",
            (user_id,),
        ).fetchone()
    finally:
        # Close even when the query raises; previously the handle leaked.
        conn.close()
    return row["cnt"] if row is not None else 0