-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
230 lines (211 loc) · 8.87 KB
/
main.py
File metadata and controls
230 lines (211 loc) · 8.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
import os
import vdf
import time
import winreg
import argparse
import requests
import traceback
import subprocess
import requests
import yaml
import colorlog
import logging
from pathlib import Path
from multiprocessing.pool import ThreadPool
from multiprocessing.dummy import Pool, Lock
from requests.packages import urllib3
urllib3.disable_warnings()
def init_log():
logger = logging.getLogger('Onekey')
logger.setLevel(logging.DEBUG)
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.DEBUG)
fmt_string = '%(log_color)s[%(name)s][%(levelname)s]%(message)s'
# black red green yellow blue purple cyan 和 white
log_colors = {
'DEBUG': 'white',
'INFO': 'green',
'WARNING': 'yellow',
'ERROR': 'red',
'CRITICAL': 'purple'
}
fmt = colorlog.ColoredFormatter(fmt_string, log_colors=log_colors)
stream_handler.setFormatter(fmt)
logger.addHandler(stream_handler)
return logger
log = init_log()
date = time.strftime('%Y年%m月%d日 %H时%M分')
print('\033[1;32;40m _____ __ _ _____ _ _ _____ __ __ \033[0m')
print('\033[1;32;40m / _ \ | \ | | | ____| | | / / | ____| \ \ / /\033[0m')
print('\033[1;32;40m | | | | | \| | | |__ | |/ / | |__ \ \/ /\033[0m')
print('\033[1;32;40m | | | | | |\ | | __| | |\ \ | __| \ /')
print('\033[1;32;40m | |_| | | | \ | | |___ | | \ \ | |___ / /\033[0m')
print('\033[1;32;40m \_____/ |_| \_| |_____| |_| \_\ |_____| /_/\033[0m')
print(f'\033[1;31;40m欢迎使用, 春节将至, 现在是{date}\033[0m')
print('\033[1;32;40m作者ikun\033[0m')
print('\033[1;32;40m当前版本13.1\033[0m')
print('\033[5;31;40m沧海吃老子饭还骂上老子了,光荣榜第一位\033[0m')
print('\033[1;32;40m温馨提示:App ID可以在Steam商店页面或SteamDB找到\033[0m')
default = {
'github_persoal_token': '' ,
'github_persoal_token_example': 'Bearer 你生成的Github个人访问Token',
'customize_steam_path': '',
'customize_steam_path_example': '填写Steam路径,一般为自动获取,如:C:/Program Files(x86)/steam',
}
def gen_config():
with open("./appsettings.yaml", "w", encoding="utf-8") as f:
f.write(yaml.dump(default, allow_unicode=True))
f.close()
if (not os.getenv('build')):
log.warning('首次启动或配置文件被删除,已创建默认配置文件')
return gen_config
def load_config():
if os.path.exists('appsettings.yaml'):
with open('appsettings.yaml', 'r', encoding="utf-8") as config_file:
config = yaml.safe_load(config_file)
else:
gen_config()
with open('appsettings.yaml', 'r', encoding="utf-8") as config_file:
config = yaml.safe_load(config_file)
return config
lock = Lock()
def get(branch, path):
url_list = [f'https://github.moeyy.xyz/https://raw.githubusercontent.com/{repo}/{branch}/{path}',
f'https://raw.gitmirror.com/{repo}/{branch}/{path}',
f'https://fastly.jsdelivr.net/gh/{repo}@{branch}/{path}']
retry = 3
while True:
for url in url_list:
try:
r = requests.get(url,verify=False)
if r.status_code == 200:
return r.content
except requests.exceptions.ConnectionError:
log.error(f'获取失败: {path}')
retry -= 1
if not retry:
log.warning(f'超过最大重试次数: {path}')
raise
def get_manifest(branch, path, steam_path: Path, app_id=None):
try:
if path.endswith('.manifest'):
depot_cache_path = steam_path / 'depotcache'
with lock:
if not depot_cache_path.exists():
depot_cache_path.mkdir(exist_ok=True)
save_path = depot_cache_path / path
if save_path.exists():
with lock:
log.warning(f'已存在清单: {path}')
return
content = get(branch, path)
with lock:
log.info(f'清单下载成功: {path}')
with save_path.open('wb') as f:
f.write(content)
if path.endswith('.vdf') and path not in ['appinfo.vdf']:
if path == 'config.vdf' or 'Key.vdf':
content = get(branch, path)
with lock:
log.info(f'密钥下载成功: {path}')
depots_config = vdf.loads(content.decode(encoding='utf-8'))
if stool_add(
[(depot_id, '1', depots_config['depots'][depot_id]['DecryptionKey'])
for depot_id in depots_config['depots']]):
log.info('导入Steamtools Depot成功')
except KeyboardInterrupt:
raise
except:
traceback.print_exc()
raise
return True
def stool_add(depot_list):
steam_path = get_steam_path()
lua_content = ""
for depot_id, type_, depot_key in depot_list:
lua_content += f"""addappid({depot_id}, {type_}, "{depot_key}")"""
lua_filename = f"Onekey_unlock_{depot_id}.lua"
lua_filepath = steam_path / "config" / "stplug-in" / lua_filename
with open(lua_filepath, "w", encoding="utf-8") as lua_file:
lua_file.write(lua_content)
luapacka_path = steam_path / "config" / "stplug-in" / "luapacka.exe"
subprocess.run([str(luapacka_path), str(lua_filepath)])
os.remove(lua_filepath)
return True
def get_steam_path():
config = load_config()
customize_steam_path = config.get('customize_steam_path', '')
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, r'Software\Valve\Steam')
steam_path = Path(winreg.QueryValueEx(key, 'SteamPath')[0]) or customize_steam_path
return steam_path
def check_github_api_rate_limit():
config = load_config()
github_persoal_token = config.get('github_persoal_token', '')
headers = {'Authorization': f'{github_persoal_token}'}
url = 'https://api.github.com/rate_limit'
r = requests.get(url, headers=headers, verify=False)
if r.status_code == 200:
rate_limit = r.json()['rate']
remaining_requests = rate_limit['remaining']
reset_time = rate_limit['reset']
reset_time_formatted = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(reset_time))
log.info(f'剩余请求次数: {remaining_requests}')
if remaining_requests == 0:
log.warning(f'GitHub API 请求数已用尽,将在 {reset_time_formatted} 重置')
return True
def main(app_id):
app_id_list = list(filter(str.isdecimal, app_id.strip().split('-')))
app_id = app_id_list[0]
config = load_config()
github_persoal_token = config.get('github_persoal_token', '')
headers = {'Authorization': f'{github_persoal_token}'}
url = f'https://api.github.com/repos/{repo}/branches/{app_id}'
try:
r = requests.get(url, verify=False, headers=headers)
if 'commit' in r.json():
branch = r.json()['name']
url = r.json()['commit']['commit']['tree']['url']
date = r.json()['commit']['commit']['author']['date']
r = requests.get(url,verify=False)
if 'tree' in r.json():
stool_add([(app_id, 1, "None")])
result_list = []
with Pool(32) as pool:
pool: ThreadPool
for i in r.json()['tree']:
result_list.append(pool.apply_async(get_manifest, (branch, i['path'], get_steam_path(), app_id)))
try:
while pool._state == 'RUN':
if all([result.ready() for result in result_list]):
break
time.sleep(0.1)
except KeyboardInterrupt:
with lock:
pool.terminate()
raise
if all([result.successful() for result in result_list]):
log.info(f'清单最新更新时间:{date}')
log.info(f'入库成功: {app_id}')
log.info('重启steam生效')
return True
except KeyboardInterrupt:
exit()
except requests.exceptions.RequestException as e:
log.error(f"An error occurred: {e}")
log.error(f'入库失败: {app_id},清单库中可能暂未收录该游戏')
return False
parser = argparse.ArgumentParser()
parser.add_argument('-a', '--app-id')
parser.add_argument('-r', '--repo', default='Auiowu/ManifestAutoUpdate')
args = parser.parse_args()
repo = args.repo
if __name__ == '__main__':
try:
load_config()
main(args.app_id or input('需要入库的App ID: '))
except KeyboardInterrupt:
exit()
except:
traceback.print_exc()
if not args.app_id:
os.system('pause')