Skip to content

Commit af4f98f

Browse files
committed
feat: GUI 体验打磨 + 数据质量校验
GUI 体验打磨:
- 总览/同步/历史三页重构,新视觉规范(#f8f9fb 背景,border 替代 shadow)
- 实时产品列表、错误内联显示、阶段耗时拆分
- 重试失败产品、可点击数据目录(macOS)
- 后端扩展:四元组返回、progress_callback 扩展签名

数据质量校验:
- 10 项检查覆盖文件/内容/时间/覆盖/格式完整性
- repair_data_issues 支持 4 种自动修复动作
- CLI audit 子命令(--fix 模式)
- GUI 异步健康检查(确认→进度→结果三状态)
- 追加快捷路径改用原子写入(copy+replace)

407 测试通过
1 parent f27d759 commit af4f98f

17 files changed

Lines changed: 3778 additions & 963 deletions

quantclass_sync_internal/cli.py

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@
6161
split_products,
6262
utc_now_iso,
6363
)
64-
from .data_query import get_latest_run_summary, get_products_overview
64+
from .data_query import check_data_health, get_latest_run_summary, get_products_overview, repair_data_issues
6565
from .orchestrator import _build_headers, _execute_plans, load_catalog_or_raise, run_update_with_settings
6666
from .reporting import _append_result, _finalize_and_write_report, _new_report, resolve_report_path
6767
from .status_store import (
@@ -836,3 +836,66 @@ def cmd_gui(ctx: typer.Context) -> None:
836836
"""
837837
from .gui import launch_gui
838838
launch_gui()
839+
840+
@app.command("audit")
@command_guard("audit")
def cmd_audit(
    ctx: typer.Context,
    fix: bool = typer.Option(False, "--fix", help="自动修复可修复问题"),
) -> None:
    """数据质量全面检查"""
    # The docstring above is kept verbatim: Typer uses a command's docstring
    # as its help text, so it is user-facing output rather than a comment.
    #
    # Flow: run the data-health check over the data root, print the issues
    # grouped by category, print a one-line summary, and, when --fix is given
    # and at least one issue is marked repairable, run the repair step.
    command_ctx = _init_command(ctx, "audit")
    catalog = load_catalog_or_raise(command_ctx.catalog_file)
    data_root = command_ctx.data_root

    def _show_progress(current, total, product, phase):
        # Ends the line with a carriage return so the next progress update
        # overwrites this one instead of scrolling the terminal.
        RICH_CONSOLE.print(f" [{current + 1}/{total}] {product}", end="\r")

    RICH_CONSOLE.print("[bold]数据质量检查[/bold]\n")
    result = check_data_health(data_root, catalog, progress_callback=_show_progress)
    RICH_CONSOLE.print()

    issues = result["issues"]
    summary = result["summary"]

    if issues:
        # Group issues by category, keeping first-seen category order.
        grouped = {}
        for issue in issues:
            grouped.setdefault(issue["category"], []).append(issue)

        category_names = {
            "file_integrity": "文件完整性",
            "content_integrity": "内容完整性",
            "temporal_integrity": "时间完整性",
            "coverage_integrity": "覆盖完整性",
            "format_integrity": "格式完整性",
        }
        for category, members in grouped.items():
            # Unknown categories fall back to their raw key as the heading.
            heading = category_names.get(category, category)
            RICH_CONSOLE.print(f"\n[bold]{heading}[/bold]")
            for issue in members:
                if issue["severity"] == "error":
                    sev_icon = "[red]●[/red]"
                else:
                    sev_icon = "[yellow]○[/yellow]"
                if issue["repairable"]:
                    repair_tag = "[green]可修复[/green]"
                else:
                    repair_tag = "[dim]需处理[/dim]"
                file_info = f" {issue['file']}" if issue["file"] else ""
                line = f" {sev_icon} {issue['product']}{file_info} - {issue['detail']} {repair_tag}"
                RICH_CONSOLE.print(line)
    else:
        RICH_CONSOLE.print("[green]✓ 数据健康,未发现问题[/green]")

    # Bottom summary row.
    # NOTE(review): indentation was flattened in the source this was rebuilt
    # from; the summary row is assumed to print unconditionally (not only when
    # issues exist) — confirm against the repository.
    by_severity = summary["by_severity"]
    footer_parts = [
        f"\n{summary['scanned_products']} 产品",
        f"{summary['scanned_files']} 文件",
        f"{summary['elapsed_seconds']}s",
        f"[red]{by_severity.get('error', 0)} 错误[/red]"
        f" / [yellow]{by_severity.get('warning', 0)} 警告[/yellow]",
    ]
    RICH_CONSOLE.print(" · ".join(footer_parts))

    # Repair only on explicit request and only when something is repairable.
    if not fix or not any(issue["repairable"] for issue in issues):
        return

    RICH_CONSOLE.print("\n[bold]修复可修复问题...[/bold]")
    repair_result = repair_data_issues(data_root, issues)
    repaired_count = len(repair_result["repaired"])
    failed_count = len(repair_result["failed"])
    RICH_CONSOLE.print(f" 已修复 {repaired_count} 个,失败 {failed_count} 个")

quantclass_sync_internal/constants.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,13 @@
200200
"default": {"max_attempts": 5, "timeout_seconds": 60, "backoff_cap_seconds": 8},
201201
}
202202

203+
# Relative path of the health-baseline JSON file, under the sync-meta "log" directory.
META_HEALTH_BASELINE_REL = Path(SYNC_META_DIRNAME, "log", "health_baseline.json")


# Financial and announcement products; the date-continuity check does not apply to them.
FINANCIAL_PRODUCTS = {
    "stock-fin-data-xbx",
    "stock-fin-pre-fore-data-xbx",
}
NOTICE_PRODUCTS = {
    "stock-notices-title",
}
208+
209+
203210
def normalize_product_name(product: str) -> str:
204211
"""
205212
统一产品名写法。

quantclass_sync_internal/csv_engine.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -453,18 +453,30 @@ def _append_csv_rows(
453453
encoding: str,
454454
) -> None:
455455
"""
456-
以 append 模式追加 CSV 数据行(不写表头/note)。
456+
追加行到 CSV,使用 copy+append+replace 保证原子性。
457+
调用方必须持有 _locked_target 文件锁。
457458
458459
前置条件(由调用方保证):
459460
- 文件末尾以 \\n 结尾
460-
- 调用方持有文件锁
461+
- 调用方持有文件锁(.lock 文件锁,非 inode 锁,os.replace 改 inode 安全)
461462
462463
注意(跨层依赖):中断恢复依赖 orchestrator 层在 sync
463464
完整成功后才更新 timestamp。
464465
"""
465-
with target.open("a", encoding=encoding, newline="") as f:
466-
writer = csv.writer(f, delimiter=delimiter, lineterminator="\n")
467-
writer.writerows(rows)
466+
# 先 copy 到临时文件,追加后 replace,崩溃时临时文件留在磁盘但原文件不损坏
467+
tmp_path = target.parent / f".tmp-append-{target.name}"
468+
import shutil
469+
try:
470+
shutil.copy2(target, tmp_path)
471+
with tmp_path.open("a", encoding=encoding, newline="") as f:
472+
writer = csv.writer(f, delimiter=delimiter, lineterminator="\n")
473+
writer.writerows(rows)
474+
os.replace(tmp_path, target)
475+
except BaseException:
476+
# 写入失败时清理临时文件,保留原文件不动
477+
if tmp_path.exists():
478+
tmp_path.unlink()
479+
raise
468480

469481

470482
def merge_payload(existing: Optional[CsvPayload], incoming: CsvPayload, rule: DatasetRule) -> Tuple[CsvPayload, int]:
@@ -661,6 +673,7 @@ def sync_payload_to_target(incoming: CsvPayload, target: Path, rule: DatasetRule
661673
existing_delimiter, rule.encoding,
662674
)
663675
audit.checked_files = 1
676+
audit.append_fast = 1
664677
return "updated", len(incoming.rows), audit
665678

666679
# --- 原有完整合并路径(不变)---

0 commit comments

Comments
 (0)