-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatabase.py
More file actions
99 lines (89 loc) · 2.76 KB
/
database.py
File metadata and controls
99 lines (89 loc) · 2.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import os
import sqlite3
import json
# Path of the SQLite database file handed to sqlite3.connect() by every
# function in this module. It is a relative path, so it resolves against the
# process's current working directory.
DB_PATH = "tasks.db"
def init_db():
    """Create the SQLite tables used by this module if they do not exist.

    Opens the database at ``DB_PATH`` and issues two
    ``CREATE TABLE IF NOT EXISTS`` statements, so calling this function
    repeatedly is harmless:

    * ``download_tasks`` -- one row per download task, keyed by a text id.
    * ``parse_history`` -- a log of parsed links (marked optional).

    Raises:
        sqlite3.Error: If the database cannot be opened or a statement fails.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        # ``with conn`` commits on success and rolls back on error, but it
        # does not close the connection -- hence the surrounding try/finally.
        with conn:
            # Download tasks table
            conn.execute('''
                CREATE TABLE IF NOT EXISTS download_tasks (
                    id TEXT PRIMARY KEY,
                    original_link TEXT,
                    media_info TEXT, -- JSON string
                    save_name TEXT,
                    status TEXT, -- pending, downloading, completed, failed
                    progress INTEGER DEFAULT 0,
                    speed TEXT DEFAULT '',
                    error_msg TEXT,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            # Parse history table (optional)
            conn.execute('''
                CREATE TABLE IF NOT EXISTS parse_history (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    link TEXT,
                    media_count INTEGER,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')
    finally:
        conn.close()
def add_task(task_id, link, media_info, save_name):
    """Insert a new row into ``download_tasks`` with status ``'pending'``.

    Args:
        task_id: Value for the ``id`` primary-key column.
        link: Value for the ``original_link`` column.
        media_info: JSON-serializable object; stored as the string produced
            by ``json.dumps``.
        save_name: Value for the ``save_name`` column.

    Raises:
        TypeError: If ``media_info`` cannot be serialized to JSON.
        sqlite3.IntegrityError: If a row with ``task_id`` already exists.
        sqlite3.Error: On any other database failure.
    """
    # Serialize first so an unserializable payload fails before a
    # connection is ever opened.
    media_json = json.dumps(media_info)
    conn = sqlite3.connect(DB_PATH)
    try:
        # Commit on success, roll back on error; close() is handled below
        # because the connection context manager does not close.
        with conn:
            conn.execute(
                '''
                INSERT INTO download_tasks (id, original_link, media_info, save_name, status)
                VALUES (?, ?, ?, ?, 'pending')
                ''',
                (task_id, link, media_json, save_name),
            )
    finally:
        conn.close()
def update_task_progress(task_id, progress, speed, status='downloading'):
    """Overwrite the progress, speed and status of one download task.

    If no row has ``id == task_id`` the UPDATE matches nothing and the call
    returns without error.

    Args:
        task_id: ``id`` of the row to update.
        progress: New value for the ``progress`` column.
        speed: New value for the ``speed`` column.
        status: New value for the ``status`` column; defaults to
            ``'downloading'``.

    Raises:
        sqlite3.Error: If the database cannot be opened or the update fails.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        # Commit on success, roll back on error; the connection context
        # manager does not close, so close() lives in the finally clause.
        with conn:
            conn.execute(
                '''
                UPDATE download_tasks
                SET progress = ?, speed = ?, status = ?
                WHERE id = ?
                ''',
                (progress, speed, status, task_id),
            )
    finally:
        conn.close()
def update_task_status(task_id, status, error_msg=''):
    """Overwrite the status and error message of one download task.

    ``error_msg`` is always written, so calling this with the default
    replaces any previously stored message with the empty string. If no row
    has ``id == task_id`` the UPDATE matches nothing and the call returns
    without error.

    Args:
        task_id: ``id`` of the row to update.
        status: New value for the ``status`` column.
        error_msg: New value for the ``error_msg`` column; defaults to ``''``.

    Raises:
        sqlite3.Error: If the database cannot be opened or the update fails.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        # Commit on success, roll back on error; the connection context
        # manager does not close, so close() lives in the finally clause.
        with conn:
            conn.execute(
                '''
                UPDATE download_tasks
                SET status = ?, error_msg = ?
                WHERE id = ?
                ''',
                (status, error_msg, task_id),
            )
    finally:
        conn.close()
def clear_all_tasks():
    """Delete every row from ``download_tasks``, whatever its status.

    The table itself is kept; only its rows are removed.

    Raises:
        sqlite3.Error: If the database cannot be opened or the delete fails.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        # Commit on success, roll back on error; the connection context
        # manager does not close, so close() lives in the finally clause.
        with conn:
            conn.execute('DELETE FROM download_tasks')
    finally:
        conn.close()
def get_all_tasks():
    """Return every download task, newest first, as a list of dicts.

    Each dict has one key per ``download_tasks`` column. The ``media_info``
    value is decoded from its stored JSON string; a NULL ``media_info`` is
    returned as ``None`` instead of being passed to ``json.loads``.

    Returns:
        list[dict]: Rows ordered by ``created_at`` descending.

    Raises:
        sqlite3.Error: If the database cannot be opened or the query fails.
        json.JSONDecodeError: If a stored ``media_info`` is not valid JSON.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        # sqlite3.Row lets each row be converted to a dict keyed by column.
        conn.row_factory = sqlite3.Row
        rows = conn.execute(
            'SELECT * FROM download_tasks ORDER BY created_at DESC'
        ).fetchall()
    finally:
        # Close even when the query raises (e.g. the table is missing).
        conn.close()

    tasks = []
    for row in rows:
        task = dict(row)
        raw_info = task['media_info']
        # The column is nullable, and json.loads(None) raises TypeError.
        if raw_info is not None:
            task['media_info'] = json.loads(raw_info)
        tasks.append(task)
    return tasks
def clear_completed_tasks():
    """Delete finished download tasks from ``download_tasks``.

    "Finished" covers both outcomes: rows whose status is ``'completed'``
    and rows whose status is ``'failed'`` are removed. Rows with any other
    status are left in place.

    Raises:
        sqlite3.Error: If the database cannot be opened or the delete fails.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        # Commit on success, roll back on error; the connection context
        # manager does not close, so close() lives in the finally clause.
        with conn:
            conn.execute(
                "DELETE FROM download_tasks WHERE status IN ('completed', 'failed')"
            )
    finally:
        conn.close()
if __name__ == "__main__":
    # Run as a script: create the tables, then report that setup finished.
    init_db()
    print("Database initialized.")