|
| 1 | +""" |
| 2 | +追加快捷路径 vs 完整合并 基准对比。 |
| 3 | +
|
| 4 | +在真实数据文件的临时副本上测量两条路径的耗时差异。 |
| 5 | +不修改原始数据文件。 |
| 6 | +
|
| 7 | +用法:python3 scripts/benchmark_fast_path.py [--product PRODUCT] [--sample N] |
| 8 | +""" |
| 9 | + |
| 10 | +import argparse |
| 11 | +import json |
| 12 | +import random |
| 13 | +import shutil |
| 14 | +import statistics |
| 15 | +import sys |
| 16 | +import tempfile |
| 17 | +import time |
| 18 | +from pathlib import Path |
| 19 | + |
| 20 | +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) |
| 21 | + |
| 22 | +from quantclass_sync_internal.csv_engine import ( |
| 23 | + read_csv_payload, |
| 24 | + resolve_sort_indices, |
| 25 | + row_sort_key, |
| 26 | + sortable_value, |
| 27 | + sync_payload_to_target, |
| 28 | +) |
| 29 | +from quantclass_sync_internal.models import CsvPayload, DatasetRule, RULES |
| 30 | + |
| 31 | + |
def make_fake_incoming(existing: CsvPayload, rule: DatasetRule) -> CsvPayload:
    """Build a one-row incoming payload that sorts *after* all existing rows.

    Copies the last row of *existing* and bumps each sort column by one
    unit (+1 hour for ``YYYY-MM-DD HH:MM:SS`` values, +1 day for
    ``YYYY-MM-DD`` values), so syncing it against the original file must
    append exactly this row.

    Returns *existing* unchanged when there is nothing to work with
    (no rows/header, or the rule yields no resolvable sort columns).
    """
    from datetime import datetime, timedelta

    if not existing.rows or not existing.header:
        return existing

    sort_indices = resolve_sort_indices(existing.header, rule)
    if not sort_indices:
        return existing

    # Copy the last (presumed newest) row and advance its sort columns.
    bumped = list(existing.rows[-1])
    for idx in sort_indices:
        if idx >= len(bumped):
            continue
        val = bumped[idx].strip()
        # "YYYY-MM-DD HH:MM:SS" (with time component) -> +1 hour
        if len(val) == 19 and val[4] == "-" and val[10] == " ":
            try:
                dt = datetime.strptime(val, "%Y-%m-%d %H:%M:%S")
            except ValueError:
                continue  # shape matched but not a real datetime; leave as-is
            bumped[idx] = (dt + timedelta(hours=1)).strftime("%Y-%m-%d %H:%M:%S")
        # "YYYY-MM-DD" (date only) -> +1 day
        elif len(val) == 10 and val[4] == "-" and val[7] == "-":
            try:
                dt = datetime.strptime(val, "%Y-%m-%d")
            except ValueError:
                continue  # shape matched but not a real date; leave as-is
            bumped[idx] = (dt + timedelta(days=1)).strftime("%Y-%m-%d")

    return CsvPayload(
        note=existing.note,
        header=list(existing.header),
        rows=[bumped],
        encoding=existing.encoding,
        delimiter=existing.delimiter,
    )
| 69 | + |
| 70 | + |
def benchmark_file(src_path: Path, rule: DatasetRule):
    """Benchmark fast-path vs full-merge sync for a single CSV file.

    Both runs operate on temporary copies of *src_path*; the original file
    is never modified. Returns a result dict with timings (ms), statuses,
    the fast-path hit flag and the speedup ratio, or ``None`` when the
    file is empty or no newer incoming row could be constructed.
    """
    existing = read_csv_payload(src_path, preferred_encoding=rule.encoding)
    if not existing.rows or not existing.header:
        return None

    incoming = make_fake_incoming(existing, rule)
    if not incoming.rows:
        return None

    result = {
        "file": src_path.name,
        "rows": len(existing.rows),
        "size_kb": src_path.stat().st_size / 1024,
    }

    with tempfile.TemporaryDirectory() as tmpdir:
        # --- Fast path: the rule's sort columns enable the append shortcut ---
        fast_target = Path(tmpdir) / "fast.csv"
        shutil.copy2(src_path, fast_target)

        t0 = time.perf_counter()
        status_f, _added_f, audit_f = sync_payload_to_target(
            incoming, fast_target, rule, dry_run=False
        )
        result["fast_ms"] = (time.perf_counter() - t0) * 1000
        result["fast_status"] = status_f
        # Hit = the shortcut touched exactly one file and reported an update.
        result["fast_path_hit"] = audit_f.checked_files == 1 and status_f == "updated"

        # --- Full merge: strip sort_cols so the shortcut cannot trigger ---
        full_target = Path(tmpdir) / "full.csv"
        shutil.copy2(src_path, full_target)
        full_rule = DatasetRule(
            name=rule.name + "-full",
            encoding=rule.encoding,
            has_note=rule.has_note,
            key_cols=rule.key_cols,
            sort_cols=(),  # no sort columns -> forces the full-merge path
        )

        t0 = time.perf_counter()
        status_m, _added_m, _audit_m = sync_payload_to_target(
            incoming, full_target, full_rule, dry_run=False
        )
        result["full_ms"] = (time.perf_counter() - t0) * 1000
        result["full_status"] = status_m
        # Floor the divisor to avoid division by zero on sub-0.01ms runs.
        result["speedup"] = result["full_ms"] / max(result["fast_ms"], 0.01)

    return result
| 119 | + |
| 120 | + |
def main():
    """CLI entry point: sample a product's CSV files and print benchmark stats."""
    parser = argparse.ArgumentParser(description="追加快捷路径基准对比")
    parser.add_argument("--product", default="stock-trading-data-pro")
    parser.add_argument("--sample", type=int, default=50)
    parser.add_argument("--data-root", default=None)
    args = parser.parse_args()

    # Resolve the data root: explicit flag beats user_config.json.
    if args.data_root:
        data_root = Path(args.data_root)
    else:
        config_path = Path(__file__).resolve().parent.parent / "user_config.json"
        # JSON is UTF-8 by spec; don't depend on the locale's default encoding.
        with open(config_path, encoding="utf-8") as f:
            data_root = Path(json.load(f)["data_root"])

    product_dir = data_root / args.product
    if not product_dir.exists():
        print(f"产品目录不存在: {product_dir}")
        sys.exit(1)

    # Exact rule match first, then first prefix match in RULES order.
    rule = RULES.get(args.product)
    if rule is None:
        for name in RULES:
            if args.product.startswith(name):
                rule = RULES[name]
                break
    if rule is None:
        print(f"未找到产品规则: {args.product}")
        sys.exit(1)

    csv_files = sorted(p for p in product_dir.rglob("*.csv") if p.is_file())
    total = len(csv_files)
    print(f"产品: {args.product}")
    print(f"规则: sort_cols={rule.sort_cols}, key_cols={rule.key_cols}")
    print(f"CSV 文件: {total}")

    n = min(args.sample, total)
    sample = random.sample(csv_files, n) if n < total else csv_files
    print(f"采样: {len(sample)} 文件\n")

    results = []
    for i, path in enumerate(sample):
        try:
            r = benchmark_file(path, rule)
            if r:
                results.append(r)
            if (i + 1) % 10 == 0:
                print(f"  进度: {i + 1}/{len(sample)}")
        except Exception as e:
            # Best-effort benchmark: report and skip unreadable/odd files.
            print(f"  跳过 {path.name}: {e}")

    if not results:
        print("无有效结果")
        return

    # Partition once so later sections cannot depend on branch ordering.
    hits = [r for r in results if r["fast_path_hit"]]
    misses = [r for r in results if not r["fast_path_hit"]]
    hit_count = len(hits)
    miss_count = len(misses)

    print(f"\n{'=' * 60}")
    print(f"快捷路径命中率: {hit_count}/{len(results)} ({hit_count / len(results) * 100:.0f}%)")
    print(f"{'=' * 60}")

    if hit_count > 0:
        fast_times = [r["fast_ms"] for r in hits]
        full_times = [r["full_ms"] for r in hits]
        speedups = [r["speedup"] for r in hits]

        print(f"\n命中快捷路径的文件 ({hit_count} 个):")
        print(f"  快捷路径: 均值 {statistics.mean(fast_times):.2f}ms, 中位 {statistics.median(fast_times):.2f}ms")
        print(f"  完整合并: 均值 {statistics.mean(full_times):.1f}ms, 中位 {statistics.median(full_times):.1f}ms")
        print(f"  加速比: 均值 {statistics.mean(speedups):.1f}x, 中位 {statistics.median(speedups):.1f}x")

        total_fast = sum(fast_times)
        total_full = sum(full_times)
        print(f"\n  采样总计: 快捷 {total_fast:.0f}ms vs 完整 {total_full:.0f}ms")
        print(f"  推算全量 {total} 文件:")
        # Extrapolate per-file means to the whole product (seconds).
        est_fast = statistics.mean(fast_times) * total / 1000
        est_full = statistics.mean(full_times) * total / 1000
        print(f"    快捷路径: {est_fast:.1f}s")
        print(f"    完整合并: {est_full:.1f}s")
        print(f"    节省: {est_full - est_fast:.1f}s ({(1 - est_fast / est_full) * 100:.0f}%)")

    if miss_count > 0:
        print(f"\n未命中的文件 ({miss_count} 个):")
        for r in misses[:5]:
            print(f"  {r['file']}: status={r['fast_status']}")

    # Show the slowest fast-path files when there are enough hits to rank.
    if hit_count >= 5:
        print("\n最慢 5 个(快捷路径):")
        for r in sorted(hits, key=lambda x: -x["fast_ms"])[:5]:
            print(f"  {r['file']}: {r['rows']}行, 快捷 {r['fast_ms']:.2f}ms vs 完整 {r['full_ms']:.1f}ms ({r['speedup']:.0f}x)")
| 214 | + |
| 215 | + |
# Script entry point: run the benchmark only when executed directly.
if __name__ == "__main__":
    main()
0 commit comments