Skip to content

Commit 0723803

Browse files
committed
feat: API 日期缓存 + 同步前并发预取,消除 Plan 阶段串行瓶颈
1 parent b0469e2 commit 0723803

6 files changed

Lines changed: 490 additions & 23 deletions

File tree

quantclass_sync_internal/constants.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,9 @@
206206
FINANCIAL_PRODUCTS = {"stock-fin-data-xbx", "stock-fin-pre-fore-data-xbx"}
207207
NOTICE_PRODUCTS = {"stock-notices-title"}
208208

209+
# API 日期缓存 TTL(秒),5 分钟内的缓存直接使用,过期则回退 HTTP 查询
210+
API_DATE_CACHE_TTL_SECONDS = 300
211+
209212

210213
def normalize_product_name(product: str) -> str:
211214
"""

quantclass_sync_internal/orchestrator.py

Lines changed: 162 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import threading
99
import traceback
1010
import time
11-
from concurrent.futures import ThreadPoolExecutor, as_completed
11+
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeoutError, as_completed
1212
from datetime import date, datetime, timedelta
1313
from pathlib import Path
1414
from typing import Callable, Dict, Iterable, List, Optional, Sequence, Tuple
@@ -25,6 +25,7 @@
2525
validate_run_mode,
2626
)
2727
from .constants import (
28+
API_DATE_CACHE_TTL_SECONDS,
2829
BUSINESS_DAY_ONLY_PRODUCTS,
2930
PREPROCESS_PRODUCT,
3031
PREPROCESS_TRIGGER_PRODUCTS,
@@ -85,17 +86,31 @@
8586
)
8687
from .status_store import (
8788
export_status_json,
89+
load_api_latest_dates,
8890
load_product_status,
8991
normalize_data_date,
9092
open_status_db,
9193
read_local_timestamp_date,
9294
report_dir_path,
9395
should_skip_by_timestamp,
9496
status_json_path,
97+
update_api_latest_dates,
9598
upsert_product_status,
9699
write_local_timestamp,
97100
)
98101

102+
103+
def _is_cache_fresh(checked_at_str: str) -> bool:
104+
"""检查缓存时间戳是否在 TTL 内。旧格式(无 T)视为过期。"""
105+
if "T" not in checked_at_str:
106+
return False
107+
try:
108+
checked_at = datetime.strptime(checked_at_str, "%Y-%m-%dT%H:%M:%S")
109+
return (datetime.now() - checked_at).total_seconds() < API_DATE_CACHE_TTL_SECONDS
110+
except ValueError:
111+
return False
112+
113+
99114
def process_product(
100115
plan: ProductPlan,
101116
date_time: Optional[str],
@@ -483,6 +498,7 @@ def _resolve_requested_dates_for_plan(
483498
t_product_start: float,
484499
catch_up_to_latest: bool = False,
485500
lock: Optional[threading.Lock] = None,
501+
api_date_cache: Optional[Dict[str, Tuple[str, str]]] = None,
486502
) -> Tuple[List[str], bool]:
487503
"""
488504
解析单产品执行日期列表,并处理 timestamp 门控。
@@ -498,23 +514,46 @@ def _resolve_requested_dates_for_plan(
498514
return [requested_date_for_plan], False
499515

500516
product_name = normalize_product_name(plan.name)
501-
try:
502-
# 1) 读取 API 可用日期列表(latest)
503-
api_latest_candidates = get_latest_times(
504-
api_base=command_ctx.api_base.rstrip("/"),
505-
product=product_name,
506-
hid=hid,
507-
headers=headers,
508-
)
509-
except Exception as exc:
510-
# latest 获取失败时保持 fail-open,继续执行旧兜底路径。
511-
log_info(
512-
f"[{plan.name}] timestamp 门控异常,回退执行更新。",
513-
event="PRODUCT_PLAN",
514-
decision="fallback_run",
515-
error=str(exc),
516-
)
517-
return [requested_date_for_plan], False
517+
518+
# 缓存检查:check_updates 已查过且未过期时跳过 HTTP
519+
cache_hit = False
520+
api_latest_candidates: List[str] = []
521+
if api_date_cache:
522+
cached = api_date_cache.get(product_name) or api_date_cache.get(plan.name)
523+
if cached:
524+
cached_date, checked_at_str = cached
525+
if _is_cache_fresh(checked_at_str):
526+
# 计算缓存年龄用于日志
527+
try:
528+
checked_at = datetime.strptime(checked_at_str, "%Y-%m-%dT%H:%M:%S")
529+
age_seconds = (datetime.now() - checked_at).total_seconds()
530+
except ValueError:
531+
age_seconds = 0.0
532+
log_info(
533+
f"[{plan.name}] 使用缓存 API 日期 {cached_date}{int(age_seconds)}s 前查询)",
534+
event="PRODUCT_PLAN", decision="cache_hit",
535+
)
536+
api_latest_candidates = [cached_date]
537+
cache_hit = True
538+
539+
if not cache_hit:
540+
try:
541+
# 1) 读取 API 可用日期列表(latest)
542+
api_latest_candidates = get_latest_times(
543+
api_base=command_ctx.api_base.rstrip("/"),
544+
product=product_name,
545+
hid=hid,
546+
headers=headers,
547+
)
548+
except Exception as exc:
549+
# latest 获取失败时保持 fail-open,继续执行旧兜底路径。
550+
log_info(
551+
f"[{plan.name}] timestamp 门控异常,回退执行更新。",
552+
event="PRODUCT_PLAN",
553+
decision="fallback_run",
554+
error=str(exc),
555+
)
556+
return [requested_date_for_plan], False
518557

519558
# latest 语义保持原样:这里不做业务日裁剪,只做规范化和去重排序。
520559
api_latest_candidates = _normalize_date_queue(
@@ -863,6 +902,101 @@ def _maybe_run_coin_preprocess(
863902
# 无论哪个分支退出,统一在此累加后处理阶段耗时
864903
report.phase_postprocess_seconds += max(0.0, time.time() - phase_start)
865904

905+
def _prefetch_api_dates(
906+
products: List[str],
907+
command_ctx: "CommandContext",
908+
hid: str,
909+
headers: Dict[str, str],
910+
max_workers: int = 8,
911+
) -> Dict[str, Tuple[str, str]]:
912+
"""并发预取产品的 API 最新日期,写入缓存并返回。
913+
914+
已在缓存中且未过期的产品跳过。失败的产品静默跳过,
915+
Plan 阶段会回退到逐产品 HTTP 查询。
916+
"""
917+
api_base = command_ctx.api_base.rstrip("/")
918+
log_dir = report_dir_path(command_ctx.data_root)
919+
# 1. 读现有缓存,筛出需要查询的产品
920+
existing_cache = load_api_latest_dates(log_dir)
921+
uncached = []
922+
for product in products:
923+
cached = existing_cache.get(product)
924+
if cached:
925+
_, checked_at_str = cached
926+
if _is_cache_fresh(checked_at_str):
927+
continue # 缓存新鲜,跳过
928+
uncached.append(product)
929+
930+
if not uncached:
931+
log_info(
932+
f"[预取] 全部 {len(products)} 个产品缓存命中,跳过 HTTP",
933+
event="PREFETCH", decision="all_cached",
934+
)
935+
return existing_cache
936+
937+
# 2. 并发预取未命中的产品
938+
log_info(
939+
f"[预取] 并发查询 {len(uncached)}/{len(products)} 个产品",
940+
event="PREFETCH", decision="fetching",
941+
)
942+
fetched: Dict[str, str] = {} # 写入仅在主线程的 as_completed 循环内,无并发写入
943+
# abort_event 只能拦截尚未开始的 worker,已在执行的请求会自然完成或超时
944+
abort_event = threading.Event()
945+
t_start = time.time()
946+
947+
def _fetch_one(product: str) -> Tuple[str, Optional[str]]:
948+
"""单产品 HTTP 查询,401/403 触发全局中止。"""
949+
if abort_event.is_set():
950+
return product, None
951+
try:
952+
date_str = get_latest_time(api_base, product, hid, headers)
953+
return product, date_str
954+
except FatalRequestError as exc:
955+
# 认证失败时中止整个预取
956+
if exc.status_code in (401, 403):
957+
abort_event.set()
958+
return product, None
959+
except Exception:
960+
return product, None
961+
962+
effective_workers = min(max_workers, len(uncached))
963+
executor = ThreadPoolExecutor(max_workers=effective_workers)
964+
try:
965+
futures = {executor.submit(_fetch_one, p): p for p in uncached}
966+
for future in as_completed(futures, timeout=30):
967+
try:
968+
product, date_str = future.result()
969+
if date_str:
970+
fetched[product] = date_str
971+
except Exception:
972+
pass
973+
if abort_event.is_set():
974+
break
975+
except (TimeoutError, FuturesTimeoutError):
976+
log_info("[预取] 超时,放弃剩余查询", event="PREFETCH", decision="timeout")
977+
finally:
978+
executor.shutdown(wait=False, cancel_futures=True)
979+
980+
elapsed = time.time() - t_start
981+
log_info(
982+
f"[预取] 完成,成功 {len(fetched)}/{len(uncached)},耗时 {elapsed:.1f}s",
983+
event="PREFETCH", decision="done",
984+
)
985+
986+
# 3. 持久化并返回内存合并的缓存(不重读文件,避免竞争和过期条目泄漏)
987+
if fetched:
988+
try:
989+
update_api_latest_dates(log_dir, fetched)
990+
except Exception:
991+
pass
992+
# 合并:保留新鲜的已有缓存 + 刚预取的结果
993+
checked_at_now = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
994+
merged: Dict[str, Tuple[str, str]] = dict(existing_cache)
995+
for product, date_str in fetched.items():
996+
merged[product] = (date_str, checked_at_now)
997+
return merged
998+
999+
8661000
def _execute_plans(
8671001
plans: Sequence[ProductPlan],
8681002
command_ctx: CommandContext,
@@ -896,6 +1030,15 @@ def _execute_plans(
8961030
has_error = False
8971031
t_run_start = time.time()
8981032

1033+
# 并发预取所有产品的 API 最新日期,写入缓存供 Plan 阶段命中(替代单次 load_api_latest_dates)
1034+
product_names = [normalize_product_name(p.name) for p in plans]
1035+
_api_date_cache = _prefetch_api_dates(
1036+
products=product_names,
1037+
command_ctx=command_ctx,
1038+
hid=hid,
1039+
headers=headers,
1040+
)
1041+
8991042
# stop-on-error 要求严格顺序控制,强制串行
9001043
effective_workers = max(1, max_workers) if not command_ctx.stop_on_error else 1
9011044
# 保护共享状态的互斥锁(串行时无竞争,开销可忽略)
@@ -922,6 +1065,7 @@ def _run_one_plan(plan: ProductPlan) -> Tuple[bool, float, SyncStats, str, str]:
9221065
t_product_start=t_product_start,
9231066
catch_up_to_latest=catch_up_to_latest,
9241067
lock=_lock,
1068+
api_date_cache=_api_date_cache,
9251069
)
9261070
with _lock:
9271071
report.phase_plan_seconds += max(0.0, time.time() - t_plan_phase)

quantclass_sync_internal/status_store.py

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from contextlib import contextmanager
1111
from datetime import datetime
1212
from pathlib import Path
13-
from typing import Dict, IO, Iterator, List, Optional
13+
from typing import Dict, IO, Iterator, List, Optional, Tuple
1414

1515
try:
1616
import fcntl
@@ -32,6 +32,10 @@
3232

3333
_RUN_SCOPE_PATTERN = re.compile(r"^\d{8}-\d{6}(?:[-_].+)?$")
3434

35+
# 产品最后状态写入来源标识常量
36+
_SOURCE_API_CHECK = "api_check"
37+
_SOURCE_SYNC = "sync"
38+
3539

3640
def _status_db_has_rows(path: Path) -> bool:
3741
"""判断状态库是否可用(存在 product_status 且至少 1 行)。"""
@@ -471,7 +475,8 @@ def _update_product_last_status(log_dir: Path, report: RunReport) -> None:
471475
"reason_code": item.reason_code,
472476
"error": item.error,
473477
"date_time": item.date_time,
474-
"checked_at": datetime.now().strftime("%Y-%m-%d"),
478+
"checked_at": datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
479+
"source": _SOURCE_SYNC,
475480
}
476481
# 原子写入
477482
with atomic_temp_path(status_path, tag="last_status") as tmp:
@@ -499,12 +504,46 @@ def update_api_latest_dates(log_dir: Path, api_latest_dates: Dict[str, str]) ->
499504
existing = _scan_reports_for_backfill(log_dir)
500505
else:
501506
existing = _scan_reports_for_backfill(log_dir)
502-
checked_at = datetime.now().strftime("%Y-%m-%d")
507+
checked_at = datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
503508
for product, date_str in api_latest_dates.items():
504509
if product in existing:
505510
existing[product]["date_time"] = date_str
506511
existing[product]["checked_at"] = checked_at
512+
existing[product]["source"] = _SOURCE_API_CHECK
507513
else:
508-
existing[product] = {"status": "", "reason_code": "", "error": "", "date_time": date_str, "checked_at": checked_at}
514+
existing[product] = {
515+
"status": "", "reason_code": "", "error": "",
516+
"date_time": date_str, "checked_at": checked_at,
517+
"source": _SOURCE_API_CHECK,
518+
}
509519
with atomic_temp_path(status_path, tag="last_status") as tmp:
510520
tmp.write_text(json.dumps(existing, ensure_ascii=False, indent=2), encoding="utf-8")
521+
522+
523+
def load_api_latest_dates(log_dir: Path) -> Dict[str, Tuple[str, str]]:
524+
"""读取 product_last_status.json 中由 check_updates 写入的 API 最新日期缓存。
525+
526+
只返回 source=="api_check" 的记录(排除同步结果写入的记录)。
527+
返回 {product: (date_time, checked_at)}。
528+
文件不存在或解析失败时返回空字典(静默降级)。
529+
"""
530+
status_path = log_dir / PRODUCT_LAST_STATUS_FILE
531+
if not status_path.exists():
532+
return {}
533+
try:
534+
data = json.loads(status_path.read_text(encoding="utf-8"))
535+
except (OSError, json.JSONDecodeError):
536+
return {}
537+
if not isinstance(data, dict):
538+
return {}
539+
result: Dict[str, Tuple[str, str]] = {}
540+
for product, info in data.items():
541+
if not isinstance(info, dict):
542+
continue
543+
# 只读取 check_updates 写入的记录,排除同步结果
544+
if info.get("source") != _SOURCE_API_CHECK:
545+
continue
546+
dt, ca = info.get("date_time"), info.get("checked_at")
547+
if isinstance(dt, str) and isinstance(ca, str):
548+
result[product] = (dt, ca)
549+
return result

tests/test_integration.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@
3232
open_status_db,
3333
load_product_status,
3434
read_local_timestamp_date,
35+
report_dir_path,
3536
resolve_runtime_paths,
3637
write_local_timestamp,
38+
PRODUCT_LAST_STATUS_FILE,
3739
)
3840
from quantclass_sync_internal.file_sync import sync_known_product
3941
from quantclass_sync_internal.constants import TIMESTAMP_FILE_NAME
@@ -172,6 +174,10 @@ def _run_execute_plans(
172174
"""
173175
report = _new_report("test-integ-001", mode="network")
174176

177+
# 清除 API 日期缓存,确保每次调用模拟独立运行(_prefetch_api_dates 不命中旧缓存)
178+
cache_file = report_dir_path(ctx.data_root) / PRODUCT_LAST_STATUS_FILE
179+
cache_file.unlink(missing_ok=True)
180+
175181
save_file_mock = make_save_file_mock(date_to_content)
176182

177183
with (

0 commit comments

Comments
 (0)