Skip to content

Commit 7065239

Browse files
committed
SSH重构: 引入IDataStore机制(分离本地数据与远端数据)
Signed-off-by: wuqiongjin <suchinfinity@qq.com>
1 parent a1b1d0a commit 7065239

3 files changed

Lines changed: 368 additions & 165 deletions

File tree

src/store/data_store.py

Lines changed: 274 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,274 @@
1+
import os
2+
from pathlib import Path
3+
from typing import Protocol
4+
5+
from src.extend.ssh_client import SshClient, SshConfig
6+
from src.utils.utils import Utils
7+
from src.utils.const import AppPath, Key
8+
9+
10+
class IDataStore(Protocol):
11+
def read_config(self) -> dict: ...
12+
def write_config(self, config: dict) -> bool: ...
13+
14+
def read_tasks(self): ...
15+
def write_tasks(self, tasks) -> bool: ...
16+
17+
def read_runner_result(self) -> dict: ...
18+
def write_runner_result(self, payload: dict) -> bool: ...
19+
20+
def sync_file(self, local_path: str, remote_filename: str) -> bool: ...
21+
22+
23+
class LocalDataStore:
24+
def __init__(self):
25+
pass
26+
27+
def read_config(self) -> dict:
28+
data = Utils.read_dict_from_json(AppPath.DataJson)
29+
return data if isinstance(data, dict) else {}
30+
31+
def write_config(self, config: dict) -> bool:
32+
try:
33+
Utils.write_dict_to_file(AppPath.DataJson, config if isinstance(config, dict) else {})
34+
return True
35+
except Exception:
36+
return False
37+
38+
def read_tasks(self):
39+
data = Utils.read_dict_from_json(AppPath.TasksJson)
40+
return data if isinstance(data, list) else ([] if data is None else data)
41+
42+
def write_tasks(self, tasks) -> bool:
43+
try:
44+
Utils.write_dict_to_file(AppPath.TasksJson, tasks)
45+
return True
46+
except Exception:
47+
return False
48+
49+
def read_runner_result(self) -> dict:
50+
data = Utils.read_dict_from_json(AppPath.RunnerResultJson)
51+
return data if isinstance(data, dict) else {}
52+
53+
def write_runner_result(self, payload: dict) -> bool:
54+
try:
55+
Utils.write_dict_to_file(AppPath.RunnerResultJson, payload if isinstance(payload, dict) else {})
56+
return True
57+
except Exception:
58+
return False
59+
60+
def sync_file(self, local_path: str, remote_filename: str) -> bool:
61+
return True
62+
63+
64+
class RemoteDataStore:
65+
def __init__(
66+
self,
67+
ssh_cfg: SshConfig,
68+
host: str,
69+
local_data_root: str,
70+
remote_app_root_override: str | None,
71+
):
72+
self._ssh_cfg = ssh_cfg
73+
self._host = str(host or "").strip()
74+
self._local_data_root = str(local_data_root)
75+
self._remote_app_root_override = str(remote_app_root_override or "").strip()
76+
77+
self.remote_home_dir: str | None = None
78+
self.remote_app_root_abs: str | None = None
79+
self.remote_data_root_abs: str | None = None
80+
self.cache_data_root: str | None = None
81+
82+
@staticmethod
83+
def _default_config() -> dict:
84+
return {
85+
Key.UserName: "",
86+
Key.UserPassword: "",
87+
Key.DriverPath: "",
88+
Key.CaptchaRetryTimes: 5,
89+
Key.CaptchaToleranceAngle: 5,
90+
Key.AlwaysRetry: False,
91+
Key.ShowWebPage: False,
92+
Key.NotificationEmail: "",
93+
Key.SendEmailWhenSuccess: False,
94+
Key.SendEmailWhenFailed: False,
95+
Key.LinuxDisplay: ":0",
96+
Key.CheckLinuxCredentialsOnPlanCreate: True,
97+
}
98+
99+
@staticmethod
100+
def _ssh_keys() -> set[str]:
101+
return {
102+
Key.SshEnabled,
103+
Key.SshHost,
104+
Key.SshUsername,
105+
Key.SshPassword,
106+
Key.SshUsePrivateKey,
107+
Key.SshPrivateKeyPath,
108+
Key.SshServerPlatform,
109+
Key.SshRemoteAppRoot,
110+
}
111+
112+
@staticmethod
113+
def _strip_ssh_keys(data: dict) -> dict:
114+
if not isinstance(data, dict):
115+
return {}
116+
cleaned = dict(data)
117+
for k in RemoteDataStore._ssh_keys():
118+
cleaned.pop(k, None)
119+
return cleaned
120+
121+
def bootstrap(self) -> tuple[bool, str | None]:
122+
try:
123+
cache_root = Path(self._local_data_root).parent / "remote_cache" / Utils.replace_signs(self._host)
124+
cache_data_root = cache_root / "data"
125+
cache_data_root.mkdir(parents=True, exist_ok=True)
126+
self.cache_data_root = str(cache_data_root)
127+
128+
def _ensure_json_file(local_target: Path, default_obj, expected_type) -> tuple[bool, bool, str | None]:
129+
try:
130+
need_upload = False
131+
if not local_target.exists() or local_target.stat().st_size == 0:
132+
Utils.write_dict_to_file(str(local_target), default_obj)
133+
return True, True, None
134+
135+
data_any = Utils.read_dict_from_json(str(local_target))
136+
if not isinstance(data_any, expected_type):
137+
Utils.write_dict_to_file(str(local_target), default_obj)
138+
need_upload = True
139+
return True, need_upload, None
140+
except Exception as e:
141+
return False, False, str(e)
142+
143+
with SshClient(self._ssh_cfg) as ssh:
144+
code, home_out, home_err = ssh.exec("echo $HOME", timeout_sec=5)
145+
home_dir = (home_out or "").strip()
146+
if code != 0 or not home_dir.startswith("/"):
147+
return False, (home_err or home_out or "无法获取远端 HOME 目录").strip()
148+
self.remote_home_dir = home_dir
149+
150+
if self._remote_app_root_override:
151+
if not self._remote_app_root_override.startswith("/"):
152+
return False, f"远端AppRoot必须为绝对路径(以/开头):{self._remote_app_root_override}"
153+
remote_app_root_abs = self._remote_app_root_override.rstrip("/")
154+
else:
155+
script = "sh -lc 'base=\"${XDG_DATA_HOME:-$HOME/.local/share}\"; echo \"${base}/auto-clock\"'"
156+
code, out, err = ssh.exec(script, timeout_sec=5)
157+
remote_app_root_abs = (out or "").strip().rstrip("/")
158+
if code != 0 or not remote_app_root_abs.startswith("/"):
159+
msg = (err or out or "").strip() or "无法解析远端 AppRoot"
160+
return False, msg
161+
162+
self.remote_app_root_abs = remote_app_root_abs
163+
self.remote_data_root_abs = f"{remote_app_root_abs}/data"
164+
165+
sftp = ssh.sftp()
166+
downloaded_data_json = False
167+
for name in ["data.json", "tasks.json", "runner_result.json"]:
168+
try:
169+
local_target = cache_data_root / name
170+
sftp.get(f"{self.remote_data_root_abs}/{name}", str(local_target))
171+
if name == "data.json":
172+
downloaded_data_json = True
173+
if local_target.exists() and local_target.stat().st_size == 0:
174+
return False, f"下载远端文件为空:{name},本地缓存:{local_target}"
175+
except FileNotFoundError:
176+
try:
177+
local_target = cache_data_root / name
178+
if local_target.exists():
179+
local_target.unlink()
180+
except Exception:
181+
pass
182+
except Exception as e:
183+
return False, f"下载远端文件失败:{name},错误:{e}"
184+
185+
base_data = {}
186+
cache_data_json = cache_data_root / "data.json"
187+
if downloaded_data_json and cache_data_json.exists() and cache_data_json.stat().st_size > 0:
188+
data_any = Utils.read_dict_from_json(str(cache_data_json))
189+
if isinstance(data_any, dict):
190+
base_data.update(self._strip_ssh_keys(data_any))
191+
192+
defaults = self._default_config()
193+
for k, v in defaults.items():
194+
if k not in base_data:
195+
base_data[k] = v
196+
197+
init_targets: list[tuple[str, Path]] = []
198+
199+
ok3, need_up, err3 = _ensure_json_file(cache_data_root / "data.json", base_data, dict)
200+
if not ok3:
201+
return False, f"初始化本地缓存文件失败:data.json,错误:{err3}"
202+
if need_up:
203+
init_targets.append(("data.json", cache_data_root / "data.json"))
204+
205+
ok3, need_up, err3 = _ensure_json_file(cache_data_root / "tasks.json", [], list)
206+
if not ok3:
207+
return False, f"初始化本地缓存文件失败:tasks.json,错误:{err3}"
208+
if need_up:
209+
init_targets.append(("tasks.json", cache_data_root / "tasks.json"))
210+
211+
ok3, need_up, err3 = _ensure_json_file(cache_data_root / "runner_result.json", {}, dict)
212+
if not ok3:
213+
return False, f"初始化本地缓存文件失败:runner_result.json,错误:{err3}"
214+
if need_up:
215+
init_targets.append(("runner_result.json", cache_data_root / "runner_result.json"))
216+
217+
for remote_name, local_path in init_targets:
218+
try:
219+
ssh.upload_file(str(local_path), f"{self.remote_data_root_abs}/{remote_name}")
220+
except Exception as e:
221+
return False, f"初始化远端文件失败:{remote_name},错误:{e}"
222+
223+
return True, None
224+
except Exception as e:
225+
return False, str(e)
226+
227+
def read_config(self) -> dict:
228+
data = Utils.read_dict_from_json(AppPath.DataJson)
229+
return data if isinstance(data, dict) else {}
230+
231+
def write_config(self, config: dict) -> bool:
232+
try:
233+
Utils.write_dict_to_file(AppPath.DataJson, self._strip_ssh_keys(config if isinstance(config, dict) else {}))
234+
self.sync_file(AppPath.DataJson, "data.json")
235+
return True
236+
except Exception:
237+
return False
238+
239+
def read_tasks(self):
240+
data = Utils.read_dict_from_json(AppPath.TasksJson)
241+
return data if isinstance(data, list) else ([] if data is None else data)
242+
243+
def write_tasks(self, tasks) -> bool:
244+
try:
245+
Utils.write_dict_to_file(AppPath.TasksJson, tasks)
246+
self.sync_file(AppPath.TasksJson, "tasks.json")
247+
return True
248+
except Exception:
249+
return False
250+
251+
def read_runner_result(self) -> dict:
252+
data = Utils.read_dict_from_json(AppPath.RunnerResultJson)
253+
return data if isinstance(data, dict) else {}
254+
255+
def write_runner_result(self, payload: dict) -> bool:
256+
try:
257+
Utils.write_dict_to_file(AppPath.RunnerResultJson, payload if isinstance(payload, dict) else {})
258+
self.sync_file(AppPath.RunnerResultJson, "runner_result.json")
259+
return True
260+
except Exception:
261+
return False
262+
263+
def sync_file(self, local_path: str, remote_filename: str) -> bool:
264+
try:
265+
if not self.remote_data_root_abs:
266+
return False
267+
if not os.path.exists(local_path):
268+
return False
269+
remote_path = f"{self.remote_data_root_abs}/{remote_filename}"
270+
with SshClient(self._ssh_cfg) as ssh:
271+
ssh.upload_file(local_path, remote_path)
272+
return True
273+
except Exception:
274+
return False

0 commit comments

Comments
 (0)