Skip to content

Commit bd87af9

Browse files
committed
feat: data_root 校验 + timestamp 缺失时 CSV 推断 + 点目录过滤
场景 A - data_root 误指向产品子目录:
- config.py: 新增 validate_data_root_not_product_dir(),双信号检测(timestamp.txt 存在 / >= 2 个日期子目录),错误信息含诊断和修复建议
- ensure_data_root_ready() 内部调用,覆盖全部 CLI 和 GUI 入口

场景 B - 无 timestamp.txt(从官方客户端迁移):
- data_query.py: 新增 infer_local_date_from_csv(),按文件名倒序取 20 个文件提取日期列最大值,用 normalize_data_date 过滤伪日期
- orchestrator.py: 回补模式无 timestamp 时用 CSV 推断基线日期,推断后复查门控避免无效下载;mirror_unknown 产品不推断
- data_query.py: _check_temporal_integrity 推断模式跳过 #7,#8 用 max_date 作终点避免不同抽样产生虚假缺口

清噪治理:
- config.py: discover_local_products 跳过点目录(.cache 等)

测试:21 个新测试覆盖全部场景,428 测试全过
1 parent 60ba3c6 commit bd87af9

7 files changed

Lines changed: 606 additions & 15 deletions

File tree

quantclass_sync_internal/cli.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -686,6 +686,7 @@ def cmd_status(ctx: typer.Context) -> None:
686686
输出所有产品的本地数据日期、落后天数和上次同步结果。
687687
"""
688688
command_ctx = _init_command(ctx, "status")
689+
ensure_data_root_ready(command_ctx.data_root, create_if_missing=False)
689690
catalog = load_catalog_or_raise(command_ctx.catalog_file)
690691

691692
overview = get_products_overview(command_ctx.data_root, catalog)
@@ -845,6 +846,7 @@ def cmd_audit(
845846
) -> None:
846847
"""数据质量全面检查"""
847848
command_ctx = _init_command(ctx, "audit")
849+
ensure_data_root_ready(command_ctx.data_root, create_if_missing=False)
848850
catalog = load_catalog_or_raise(command_ctx.catalog_file)
849851
data_root = command_ctx.data_root
850852

quantclass_sync_internal/config.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,56 @@ def resolve_path_from_config(raw_path: Path, *, config_file: Path) -> Path:
3737
return (config_file.parent / expanded).resolve()
3838

3939

40+
# Matches a directory name of the exact form YYYY-MM-DD. Used to detect a
# data_root that was mistakenly pointed at a product sub-directory.
_DATE_DIR_RE = re.compile(r"^\d{4}-\d{2}-\d{2}$")


def _product_dir_error(data_root: Path, signal: str) -> RuntimeError:
    """Build the error reported when data_root looks like a product directory.

    Args:
        data_root: The configured data root that triggered the detection.
        signal: Human-readable description of the signal that fired.

    Returns:
        A RuntimeError carrying the diagnosis, the parent directory as the
        suggested correct root, and the repair instructions.
    """
    return RuntimeError(
        f"data_root 可能指向了某个产品子目录而非数据根目录。\n"
        f" 命中信号:{signal}\n"
        f" 当前配置:{data_root}\n"
        f" 可能正确的上级目录:{data_root.parent}\n"
        f" 修复方式:重新执行 setup,或编辑 user_config.json 修改 data_root\n"
        f" 如果上述路径不正确,请手动指定正确的数据根目录。"
    )


def validate_data_root_not_product_dir(data_root: Path) -> None:
    """Reject a data_root that appears to be a product sub-directory.

    Two signals are checked (either one is enough); only the top level of
    data_root is inspected, nothing is scanned recursively:

    1. ``data_root/timestamp.txt`` exists.
    2. data_root has at least two direct sub-directories named YYYY-MM-DD.

    Args:
        data_root: Directory configured as the data root.

    Raises:
        RuntimeError: If either signal fires, or if data_root cannot be
            inspected at all (e.g. permission denied, directory vanished).
    """
    date_dir_count = 0
    try:
        has_timestamp = (data_root / "timestamp.txt").exists()
        if not has_timestamp:
            for item in data_root.iterdir():
                if item.is_dir() and _DATE_DIR_RE.match(item.name):
                    date_dir_count += 1
                    # Two hits are enough; stop scanning large directories early.
                    if date_dir_count >= 2:
                        break
    except OSError as exc:
        # Callers (CLI and GUI entry points) handle RuntimeError only, so a raw
        # filesystem error must not escape in a different exception type.
        raise RuntimeError(f"无法读取 data_root:{data_root}({exc})") from exc

    # Signal 1: a timestamp.txt sits directly under data_root.
    if has_timestamp:
        raise _product_dir_error(data_root, "data_root 下存在 timestamp.txt")

    # Signal 2: two or more date-named direct sub-directories.
    if date_dir_count >= 2:
        raise _product_dir_error(
            data_root, f"data_root 下有 {date_dir_count}+ 个日期子目录"
        )
79+
80+
4081
def ensure_data_root_ready(data_root: Path, create_if_missing: bool = False) -> Path:
4182
"""校验 data_root;需要时可自动创建目录。"""
4283

4384
data_root = data_root.expanduser().resolve()
4485
if data_root.exists():
4586
if not data_root.is_dir():
4687
raise RuntimeError(f"data_root 不是目录:{data_root}")
88+
# 校验是否误指向产品子目录
89+
validate_data_root_not_product_dir(data_root)
4790
return data_root
4891
if create_if_missing:
4992
data_root.mkdir(parents=True, exist_ok=True)
@@ -287,7 +330,11 @@ def discover_local_products(data_root: Path, catalog_products: Sequence[str]) ->
287330
for item in sorted(data_root.iterdir(), key=lambda x: x.name):
288331
if not item.is_dir():
289332
continue
333+
# 通用排除:点目录(.cache、.quantclass_sync 等)
334+
if item.name.startswith("."):
335+
continue
290336
product_name = normalize_product_name(item.name)
337+
# 精确排除:按产品名过滤(如 log、work 等非数据目录)
291338
if product_name in DISCOVERY_IGNORED_PRODUCTS:
292339
continue
293340
if not _dir_has_data_files(item):

quantclass_sync_internal/data_query.py

Lines changed: 63 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,56 @@
1717
)
1818
from .models import log_error, RULES
1919
from .status_store import (
20-
read_local_timestamp_date, read_or_backfill_product_last_status,
20+
normalize_data_date, read_local_timestamp_date,
21+
read_or_backfill_product_last_status,
2122
report_dir_path, status_db_path, PRODUCT_LAST_STATUS_FILE,
2223
)
2324

25+
# --- CSV 日期推断 ---
26+
27+
28+
def infer_local_date_from_csv(
    data_root: Path, product: str, rule, *, sample_size: int = 20
) -> Optional[str]:
    """Infer the newest local data date from CSV contents.

    Intended for the case where the product has no timestamp.txt. The CSV
    files under ``data_root / product`` are sorted by file name in descending
    order, the first ``sample_size`` of them are read, and the largest valid
    value found in the date column is returned. This is a best-effort
    estimate, not an exact answer.

    Args:
        data_root: Data root directory.
        product: Product name; its files are looked up in ``data_root / product``.
        rule: Product rule object. ``rule.date_filter_col`` is used as the date
            column, falling back to the first entry of ``rule.sort_cols``.
        sample_size: Maximum number of files to read (default 20). Values
            below 1 read no files.

    Returns:
        The largest date accepted by ``normalize_data_date``, or None when
        there is no rule, no usable date column, no CSV file, or no valid date.
    """
    if not rule:
        return None

    # Date column: prefer date_filter_col, then the first sort column.
    date_col = rule.date_filter_col or (rule.sort_cols[0] if rule.sort_cols else None)
    if not date_col:
        return None

    csv_files = _list_csv_files(data_root / product)
    if not csv_files:
        return None

    # Descending file-name order puts the presumably newest files first.
    newest_first = sorted(csv_files, key=lambda f: f.name, reverse=True)

    max_date = None
    for csv_path in newest_first[:max(sample_size, 0)]:
        try:
            header, rows = _read_csv_full(csv_path, rule)
            if not header or not rows or date_col not in header:
                continue
            idx = header.index(date_col)
            for row in rows:
                if idx >= len(row):
                    continue
                raw = row[idx].strip()
                if not raw:
                    continue
                # normalize_data_date validates the value and filters out
                # strings that are not real dates; only the first 10
                # characters are passed to it.
                d = normalize_data_date(raw[:10])
                # NOTE(review): string comparison assumes normalized dates
                # sort lexicographically in chronological order - confirm.
                if d and (max_date is None or d > max_date):
                    max_date = d
        except Exception:
            # Deliberate best-effort: an unreadable or malformed file is
            # skipped so the remaining samples can still yield a date.
            continue
    return max_date
68+
69+
2470
# --- 产品状态总览 ---
2571

2672
# 状态颜色阈值(自然日)
@@ -647,8 +693,14 @@ def _check_temporal_integrity(data_root, product, rule, trading_calendar):
647693
return issues
648694

649695
ts_date = read_local_timestamp_date(data_root, product)
696+
inferred_mode = False
650697
if not ts_date:
651-
return issues
698+
# 无 timestamp 时从 CSV 推断,启用推断模式
699+
inferred = infer_local_date_from_csv(data_root, product, rule)
700+
if not inferred:
701+
return issues # 真正无数据,跳过
702+
ts_date = inferred
703+
inferred_mode = True
652704

653705
# 确定日期列:优先 date_filter_col,其次 sort_cols 第一列
654706
date_col = rule.date_filter_col or (rule.sort_cols[0] if rule.sort_cols else None)
@@ -660,9 +712,9 @@ def _check_temporal_integrity(data_root, product, rule, trading_calendar):
660712
if not csv_files:
661713
return issues
662714

663-
# #7 timestamp-数据日期一致性:CSV 最大日期不应超过 timestamp,也不应远落后
715+
# #7 timestamp-数据日期一致性:推断模式跳过(endpoint 和 max_date 来自同一数据源,比较无意义)
664716
max_date = _sample_max_date(csv_files, rule, date_col, sample_size=20)
665-
if max_date:
717+
if max_date and not inferred_mode:
666718
if max_date > ts_date:
667719
issues.append(_issue("date_exceeds_timestamp", "error", "temporal_integrity",
668720
product, f"CSV 最大日期 {max_date} > timestamp {ts_date}",
@@ -677,9 +729,12 @@ def _check_temporal_integrity(data_root, product, rule, trading_calendar):
677729
f"timestamp {ts_date} 远超数据最大日期 {max_date}(差 {gap_days} 天)",
678730
"", False, "needs_resync"))
679731

680-
# #8 日期连续性:用 min_date 作为起点,ts_date 作为终点,检查期间是否有缺失日期
732+
# #8 日期连续性:用 min_date 作为起点检查期间是否有缺失日期
733+
# 推断模式下用 max_date(_sample_max_date 随机抽样)作为终点,
734+
# 避免用 inferred(文件名倒序抽样)当终点时 end > max_date 产生虚假缺口
681735
if not max_date:
682736
return issues
737+
end_date = max_date if inferred_mode else ts_date
683738

684739
# 抽样最小日期,作为连续性检查的起始点
685740
min_date = _sample_min_date(csv_files, rule, date_col, sample_size=20)
@@ -690,13 +745,13 @@ def _check_temporal_integrity(data_root, product, rule, trading_calendar):
690745
expected = None
691746
if is_crypto:
692747
# 加密货币全天候交易,期望每日都有数据
693-
expected = _generate_calendar_days(min_date, ts_date)
748+
expected = _generate_calendar_days(min_date, end_date)
694749
elif product in BUSINESS_DAY_ONLY_PRODUCTS and trading_calendar:
695750
# A 股交易日产品,用精确交易日历
696-
expected = {d for d in trading_calendar if min_date <= d <= ts_date}
751+
expected = {d for d in trading_calendar if min_date <= d <= end_date}
697752
elif product in BUSINESS_DAY_ONLY_PRODUCTS:
698753
# 无交易日历时降级为工作日近似(节假日可能误报)
699-
expected = _generate_weekdays(min_date, ts_date)
754+
expected = _generate_weekdays(min_date, end_date)
700755

701756
if expected is None:
702757
return issues

quantclass_sync_internal/gui/api.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from typing import Any, Dict, Optional, Tuple
1717

1818
from ..config import (
19+
ensure_data_root_ready,
1920
load_secrets_from_file,
2021
load_user_config_or_raise,
2122
resolve_credentials_for_update,
@@ -170,6 +171,12 @@ def get_overview(self) -> Dict[str, Any]:
170171
if err:
171172
return {"ok": False, "error": err}
172173

174+
# 校验 data_root 是否合法(拦截误指向产品子目录)
175+
try:
176+
ensure_data_root_ready(data_root, create_if_missing=False)
177+
except RuntimeError as exc:
178+
return {"ok": False, "error": str(exc)}
179+
173180
# 获取产品状态列表
174181
try:
175182
raw_products = get_products_overview(data_root, catalog)
@@ -539,6 +546,7 @@ def progress_cb(current, total, product, phase):
539546
"current": current, "total": total, "product": product,
540547
})
541548

549+
ensure_data_root_ready(data_root, create_if_missing=False)
542550
result = check_data_health(data_root, catalog, progress_callback=progress_cb)
543551
with self._lock:
544552
self._health_progress["checking"] = False
@@ -594,6 +602,12 @@ def check_updates(self) -> Dict[str, Any]:
594602
if err:
595603
return {"ok": False, "error": err}
596604

605+
# 校验 data_root 是否合法
606+
try:
607+
ensure_data_root_ready(data_root, create_if_missing=False)
608+
except RuntimeError as exc:
609+
return {"ok": False, "error": str(exc)}
610+
597611
if not catalog:
598612
return {
599613
"ok": True, "products": [],

quantclass_sync_internal/orchestrator.py

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
HTTP_ATTEMPTS_BY_PRODUCT,
5656
HTTP_FAILURES_BY_PRODUCT,
5757
)
58+
from .data_query import infer_local_date_from_csv
5859
from .file_sync import sync_from_extract
5960
from .models import (
6061
CommandContext,
@@ -64,6 +65,7 @@
6465
ProductRunResult,
6566
ProductStatus,
6667
ProductSyncError,
68+
RULES,
6769
RunReport,
6870
SyncStats,
6971
log_debug,
@@ -557,11 +559,49 @@ def _resolve_requested_dates_for_plan(
557559
return [api_latest_date], False
558560
return [""], False
559561

560-
# 回补模式:无本地基线时只能跑 latest 一次。
562+
# 回补模式:无 timestamp 时尝试从 CSV 数据推断基线
561563
if not local_date:
562-
if api_latest_date:
563-
return [api_latest_date], False
564-
return [""], False
564+
# mirror_unknown 产品无 rule,infer 内部返回 None 后走 latest-only 路径
565+
rule = RULES.get(product_name)
566+
inferred = infer_local_date_from_csv(
567+
command_ctx.data_root, product_name, rule
568+
)
569+
if not inferred:
570+
# 真正的首次同步或无 rule 产品,只下载 latest
571+
if api_latest_date:
572+
return [api_latest_date], False
573+
return [""], False
574+
# 推断后复查门控:已是最新则跳过
575+
if should_skip_by_timestamp(inferred, api_latest_date):
576+
elapsed = time.time() - t_product_start
577+
with lock if lock is not None else contextlib.nullcontext():
578+
_append_result(
579+
report,
580+
product=product_name,
581+
status="skipped",
582+
strategy=plan.strategy,
583+
reason_code=REASON_UP_TO_DATE,
584+
date_time=api_latest_date or "",
585+
mode="gate",
586+
elapsed=elapsed,
587+
error=f"CSV 推断日期已是最新(inferred={inferred}, api={api_latest_date})。",
588+
)
589+
log_info(
590+
f"[{product_name}] CSV 推断门控命中,跳过更新。",
591+
event="SYNC_SKIP",
592+
inferred_date=inferred,
593+
api_latest_date=api_latest_date,
594+
decision="skip",
595+
)
596+
return [], True
597+
# 有 CSV 数据但无 timestamp,用推断日期走正常回补
598+
local_date = inferred
599+
log_info(
600+
f"[{product_name}] 无 timestamp,从 CSV 推断基线日期。",
601+
event="PRODUCT_PLAN",
602+
inferred_date=inferred,
603+
decision="infer_baseline",
604+
)
565605

566606
catchup_dates = _normalize_date_queue(api_latest_candidates, product=product_name, local_date=local_date)
567607
if _should_probe_fallback(

tests/test_check_updates.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,24 @@ def test_config_error_returns_not_ok(self, mock_config):
2020
self.assertFalse(result["ok"])
2121
self.assertIn("配置", result["error"])
2222

23+
@patch('quantclass_sync_internal.gui.api.ensure_data_root_ready')
2324
@patch('quantclass_sync_internal.gui.api.resolve_credentials_for_update')
2425
@patch.object(SyncApi, '_resolve_config')
25-
def test_missing_credentials_returns_not_ok(self, mock_config, mock_creds):
26+
def test_missing_credentials_returns_not_ok(self, mock_config, mock_creds, _mock_ready):
2627
"""凭证缺失时返回 ok=False,不发起 API 请求。"""
2728
mock_config.return_value = (MagicMock(), MagicMock(), ["product-a"], None)
2829
mock_creds.return_value = ("", "", "none")
2930
result = self.api.check_updates()
3031
self.assertFalse(result["ok"])
3132

3233

34+
@patch('quantclass_sync_internal.gui.api.ensure_data_root_ready')
3335
@patch('quantclass_sync_internal.gui.api.get_latest_time')
3436
@patch('quantclass_sync_internal.gui.api.resolve_credentials_for_update')
3537
@patch('quantclass_sync_internal.gui.api.get_products_overview')
3638
@patch.object(SyncApi, '_resolve_config')
3739
def test_partial_failure_returns_ok_with_failed_count(
38-
self, mock_config, mock_overview, mock_creds, mock_latest,
40+
self, mock_config, mock_overview, mock_creds, mock_latest, _mock_ready,
3941
):
4042
"""部分产品 API 失败时,成功的正常返回,failed 计数正确。"""
4143
mock_config.return_value = (
@@ -67,10 +69,11 @@ def _side_effect(api_base, product, hid, headers):
6769
self.assertEqual(by_name["product-a"]["source"], "api")
6870
self.assertEqual(by_name["product-b"]["source"], "cached")
6971

72+
@patch('quantclass_sync_internal.gui.api.ensure_data_root_ready')
7073
@patch('quantclass_sync_internal.gui.api.get_latest_time')
7174
@patch('quantclass_sync_internal.gui.api.resolve_credentials_for_update')
7275
@patch.object(SyncApi, '_resolve_config')
73-
def test_401_returns_not_ok(self, mock_config, mock_creds, mock_latest):
76+
def test_401_returns_not_ok(self, mock_config, mock_creds, mock_latest, _mock_ready):
7477
"""401 全局错误时立即返回 ok=False。"""
7578
from quantclass_sync_internal.models import FatalRequestError
7679

0 commit comments

Comments
 (0)