Skip to content

Commit f0389ab

Browse files
committed
SSH重构: 引入IDataStore机制(分离本地数据与远端数据)
Signed-off-by: wuqiongjin <suchinfinity@qq.com>
1 parent a1b1d0a commit f0389ab

3 files changed

Lines changed: 370 additions & 149 deletions

File tree

src/store/data_store.py

Lines changed: 275 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,275 @@
1+
import os
2+
import re
3+
from pathlib import Path
4+
from typing import Protocol
5+
6+
from src.extend.ssh_client import SshClient, SshConfig
7+
from src.utils.utils import Utils
8+
from src.utils.const import AppPath, Key
9+
10+
11+
class IDataStore(Protocol):
12+
def read_config(self) -> dict: ...
13+
def write_config(self, config: dict) -> bool: ...
14+
15+
def read_tasks(self): ...
16+
def write_tasks(self, tasks) -> bool: ...
17+
18+
def read_runner_result(self) -> dict: ...
19+
def write_runner_result(self, payload: dict) -> bool: ...
20+
21+
def sync_file(self, local_path: str, remote_filename: str) -> bool: ...
22+
23+
24+
class LocalDataStore:
25+
def __init__(self):
26+
pass
27+
28+
def read_config(self) -> dict:
29+
data = Utils.read_dict_from_json(AppPath.DataJson)
30+
return data if isinstance(data, dict) else {}
31+
32+
def write_config(self, config: dict) -> bool:
33+
try:
34+
Utils.write_dict_to_file(AppPath.DataJson, config if isinstance(config, dict) else {})
35+
return True
36+
except Exception:
37+
return False
38+
39+
def read_tasks(self):
40+
data = Utils.read_dict_from_json(AppPath.TasksJson)
41+
return data if isinstance(data, list) else ([] if data is None else data)
42+
43+
def write_tasks(self, tasks) -> bool:
44+
try:
45+
Utils.write_dict_to_file(AppPath.TasksJson, tasks)
46+
return True
47+
except Exception:
48+
return False
49+
50+
def read_runner_result(self) -> dict:
51+
data = Utils.read_dict_from_json(AppPath.RunnerResultJson)
52+
return data if isinstance(data, dict) else {}
53+
54+
def write_runner_result(self, payload: dict) -> bool:
55+
try:
56+
Utils.write_dict_to_file(AppPath.RunnerResultJson, payload if isinstance(payload, dict) else {})
57+
return True
58+
except Exception:
59+
return False
60+
61+
def sync_file(self, local_path: str, remote_filename: str) -> bool:
62+
return True
63+
64+
65+
class RemoteDataStore:
66+
def __init__(
67+
self,
68+
ssh_cfg: SshConfig,
69+
host: str,
70+
local_data_root: str,
71+
remote_app_root_override: str | None,
72+
):
73+
self._ssh_cfg = ssh_cfg
74+
self._host = str(host or "").strip()
75+
self._local_data_root = str(local_data_root)
76+
self._remote_app_root_override = str(remote_app_root_override or "").strip()
77+
78+
self.remote_home_dir: str | None = None
79+
self.remote_app_root_abs: str | None = None
80+
self.remote_data_root_abs: str | None = None
81+
self.cache_data_root: str | None = None
82+
83+
@staticmethod
84+
def _default_config() -> dict:
85+
return {
86+
Key.UserName: "",
87+
Key.UserPassword: "",
88+
Key.DriverPath: "",
89+
Key.CaptchaRetryTimes: 5,
90+
Key.CaptchaToleranceAngle: 5,
91+
Key.AlwaysRetry: False,
92+
Key.ShowWebPage: False,
93+
Key.NotificationEmail: "",
94+
Key.SendEmailWhenSuccess: False,
95+
Key.SendEmailWhenFailed: False,
96+
Key.LinuxDisplay: ":0",
97+
Key.CheckLinuxCredentialsOnPlanCreate: True,
98+
}
99+
100+
@staticmethod
101+
def _ssh_keys() -> set[str]:
102+
return {
103+
Key.SshEnabled,
104+
Key.SshHost,
105+
Key.SshUsername,
106+
Key.SshPassword,
107+
Key.SshUsePrivateKey,
108+
Key.SshPrivateKeyPath,
109+
Key.SshServerPlatform,
110+
Key.SshRemoteAppRoot,
111+
}
112+
113+
@staticmethod
114+
def _strip_ssh_keys(data: dict) -> dict:
115+
if not isinstance(data, dict):
116+
return {}
117+
cleaned = dict(data)
118+
for k in RemoteDataStore._ssh_keys():
119+
cleaned.pop(k, None)
120+
return cleaned
121+
122+
def bootstrap(self) -> tuple[bool, str | None]:
123+
try:
124+
cache_root = Path(self._local_data_root).parent / "remote_cache" / Utils.replace_signs(self._host)
125+
cache_data_root = cache_root / "data"
126+
cache_data_root.mkdir(parents=True, exist_ok=True)
127+
self.cache_data_root = str(cache_data_root)
128+
129+
def _ensure_json_file(local_target: Path, default_obj, expected_type) -> tuple[bool, bool, str | None]:
130+
try:
131+
need_upload = False
132+
if not local_target.exists() or local_target.stat().st_size == 0:
133+
Utils.write_dict_to_file(str(local_target), default_obj)
134+
return True, True, None
135+
136+
data_any = Utils.read_dict_from_json(str(local_target))
137+
if not isinstance(data_any, expected_type):
138+
Utils.write_dict_to_file(str(local_target), default_obj)
139+
need_upload = True
140+
return True, need_upload, None
141+
except Exception as e:
142+
return False, False, str(e)
143+
144+
with SshClient(self._ssh_cfg) as ssh:
145+
code, home_out, home_err = ssh.exec("echo $HOME", timeout_sec=5)
146+
home_dir = (home_out or "").strip()
147+
if code != 0 or not home_dir.startswith("/"):
148+
return False, (home_err or home_out or "无法获取远端 HOME 目录").strip()
149+
self.remote_home_dir = home_dir
150+
151+
if self._remote_app_root_override:
152+
if not self._remote_app_root_override.startswith("/"):
153+
return False, f"远端AppRoot必须为绝对路径(以/开头):{self._remote_app_root_override}"
154+
remote_app_root_abs = self._remote_app_root_override.rstrip("/")
155+
else:
156+
script = "sh -lc 'base=\"${XDG_DATA_HOME:-$HOME/.local/share}\"; echo \"${base}/auto-clock\"'"
157+
code, out, err = ssh.exec(script, timeout_sec=5)
158+
remote_app_root_abs = (out or "").strip().rstrip("/")
159+
if code != 0 or not remote_app_root_abs.startswith("/"):
160+
msg = (err or out or "").strip() or "无法解析远端 AppRoot"
161+
return False, msg
162+
163+
self.remote_app_root_abs = remote_app_root_abs
164+
self.remote_data_root_abs = f"{remote_app_root_abs}/data"
165+
166+
sftp = ssh.sftp()
167+
downloaded_data_json = False
168+
for name in ["data.json", "tasks.json", "runner_result.json"]:
169+
try:
170+
local_target = cache_data_root / name
171+
sftp.get(f"{self.remote_data_root_abs}/{name}", str(local_target))
172+
if name == "data.json":
173+
downloaded_data_json = True
174+
if local_target.exists() and local_target.stat().st_size == 0:
175+
return False, f"下载远端文件为空:{name},本地缓存:{local_target}"
176+
except FileNotFoundError:
177+
try:
178+
local_target = cache_data_root / name
179+
if local_target.exists():
180+
local_target.unlink()
181+
except Exception:
182+
pass
183+
except Exception as e:
184+
return False, f"下载远端文件失败:{name},错误:{e}"
185+
186+
base_data = {}
187+
cache_data_json = cache_data_root / "data.json"
188+
if downloaded_data_json and cache_data_json.exists() and cache_data_json.stat().st_size > 0:
189+
data_any = Utils.read_dict_from_json(str(cache_data_json))
190+
if isinstance(data_any, dict):
191+
base_data.update(self._strip_ssh_keys(data_any))
192+
193+
defaults = self._default_config()
194+
for k, v in defaults.items():
195+
if k not in base_data:
196+
base_data[k] = v
197+
198+
init_targets: list[tuple[str, Path]] = []
199+
200+
ok3, need_up, err3 = _ensure_json_file(cache_data_root / "data.json", base_data, dict)
201+
if not ok3:
202+
return False, f"初始化本地缓存文件失败:data.json,错误:{err3}"
203+
if need_up:
204+
init_targets.append(("data.json", cache_data_root / "data.json"))
205+
206+
ok3, need_up, err3 = _ensure_json_file(cache_data_root / "tasks.json", [], list)
207+
if not ok3:
208+
return False, f"初始化本地缓存文件失败:tasks.json,错误:{err3}"
209+
if need_up:
210+
init_targets.append(("tasks.json", cache_data_root / "tasks.json"))
211+
212+
ok3, need_up, err3 = _ensure_json_file(cache_data_root / "runner_result.json", {}, dict)
213+
if not ok3:
214+
return False, f"初始化本地缓存文件失败:runner_result.json,错误:{err3}"
215+
if need_up:
216+
init_targets.append(("runner_result.json", cache_data_root / "runner_result.json"))
217+
218+
for remote_name, local_path in init_targets:
219+
try:
220+
ssh.upload_file(str(local_path), f"{self.remote_data_root_abs}/{remote_name}")
221+
except Exception as e:
222+
return False, f"初始化远端文件失败:{remote_name},错误:{e}"
223+
224+
return True, None
225+
except Exception as e:
226+
return False, str(e)
227+
228+
def read_config(self) -> dict:
229+
data = Utils.read_dict_from_json(AppPath.DataJson)
230+
return data if isinstance(data, dict) else {}
231+
232+
def write_config(self, config: dict) -> bool:
233+
try:
234+
Utils.write_dict_to_file(AppPath.DataJson, self._strip_ssh_keys(config if isinstance(config, dict) else {}))
235+
self.sync_file(AppPath.DataJson, "data.json")
236+
return True
237+
except Exception:
238+
return False
239+
240+
def read_tasks(self):
241+
data = Utils.read_dict_from_json(AppPath.TasksJson)
242+
return data if isinstance(data, list) else ([] if data is None else data)
243+
244+
def write_tasks(self, tasks) -> bool:
245+
try:
246+
Utils.write_dict_to_file(AppPath.TasksJson, tasks)
247+
self.sync_file(AppPath.TasksJson, "tasks.json")
248+
return True
249+
except Exception:
250+
return False
251+
252+
def read_runner_result(self) -> dict:
253+
data = Utils.read_dict_from_json(AppPath.RunnerResultJson)
254+
return data if isinstance(data, dict) else {}
255+
256+
def write_runner_result(self, payload: dict) -> bool:
257+
try:
258+
Utils.write_dict_to_file(AppPath.RunnerResultJson, payload if isinstance(payload, dict) else {})
259+
self.sync_file(AppPath.RunnerResultJson, "runner_result.json")
260+
return True
261+
except Exception:
262+
return False
263+
264+
def sync_file(self, local_path: str, remote_filename: str) -> bool:
265+
try:
266+
if not self.remote_data_root_abs:
267+
return False
268+
if not os.path.exists(local_path):
269+
return False
270+
remote_path = f"{self.remote_data_root_abs}/{remote_filename}"
271+
with SshClient(self._ssh_cfg) as ssh:
272+
ssh.upload_file(local_path, remote_path)
273+
return True
274+
except Exception:
275+
return False

0 commit comments

Comments
 (0)