实现一个高性能的批量因子中性化系统,使用11个Barra风格因子对大量因子数据进行中性化处理。
- 批量处理: 处理大量因子文件(用户实际场景:1万-10万个文件)
- 因子中性化: 对在当天所有股票中排序后的原始因子值,每天做一次线性回归去除风格因子影响,得到中性化后的因子值。中性化就是使用原始因子做y,11个风格因子做x,做带截距项的回归,得到残差项,就是中性化后的因子
- 截面排序: 每天对原始因子值进行截面排序(等价于pandas的rank(axis=1))
- 并集逻辑: 保留所有在任意日期出现过的股票(如果某天某股票不存在,则该天该股票的因子值为NaN)
- 并行处理: 可以设置并行数量
- 线性回归模型:
factor_neutralized = factor_raw - X * beta - 其中X包括11个风格因子 + 1个截距项(共12个参数)
- 回归方程:
factor_raw = alpha + β1*style1 + β2*style2 + ... + β11*style11 + ε - 中性化结果:
factor_neutralized = ε
- 总处理时间: 11,700文件应在1小时内完成
- 内存使用: 控制在合理范围内,避免内存泄漏
- CPU利用率: 充分利用多核并行
列结构:
- date: int32,格式YYYYMMDD(如20220101)
- stock_code: string,股票代码(如"000001"等)
- value_0 到 value_10: float64,11个Barra风格因子值
数据样例:
| date | code | value_0 | value_1 | ... | value_10 |
|----------|------------|---------|---------|-----|----------|
| 20220101 | 000001 | 0.123 | -0.456 | ... | 0.789 |
| 20220101 | 000002 | 0.234 | 0.567 | ... | -0.123 |
因子数据(*.parquet,例如/nas197/user_home/chenzongwei/hm61_1st_son/amax_bmax_abs_autocorr1_价格中间50perc_fold_max_smooth_10.parquet)
格式:pandas DataFrame,index为date,columns为股票代码
- index: date(int32),交易日期
- columns: 股票代码(如"000001", "000002"等)
- values: float64,因子原始值
数据样例:
| date | 000001 | 000002 | 000003 |
|----------|-----------|-----------|-----------|
| 20220101 | 1.234 | 2.345 | 3.456 |
| 20220102 | 1.345 | 2.456 | 3.567 |
- 因子文件数量: 1万-10万个
- 股票数量: 约5,000只
- 时间跨度: 约2,500个交易日(约10年)
- 股票数量变化: 不同日期股票数量不同
- 数据缺失: 因子数据中可能有NaN值
- 回归要求: 每日至少需要12只股票(11个风格因子+1个截距项)
-
数据加载阶段
- 加载风格数据并按日期分组
- 预计算每日回归矩阵
(X'X)^(-1)X',以免重复计算 - 验证每日股票数量是否足够(≥12只)
-
并行处理阶段
- 并行读取每个因子文件
- 对每个因子文件独立执行完整处理流程
-
单文件处理流程
- 解析因子数据(日期×股票矩阵)
- 构建并集股票列表(所有出现过的股票)
- 按日期循环处理排序和中性化
- 保存结果到parquet文件
目标: 保留所有在任意日期出现过的股票
实现:
1. 收集所有日期的股票列表
2. 取并集而非交集
3. 对于某日期不存在的股票,填充NaN
预计算方案:
- 对每个日期预计算 (X'X)^(-1)X',并缓存,避免重复计算,同时也要注意各个进程不要因为这个预计算的矩阵重复占用内存,以免内存占用太多
- 运行时直接矩阵乘法: β = (X'X)^(-1)X' * y
- 中性化: factor_neutralized = y - X * β
等价于pandas的rank(axis=1):
- 对每行(每个日期)的因子值进行排序
- 返回排名(1到N)
- 处理NaN值和相同值情况
- 在并行计算之外,单独开启一个用于监控计算进度的进程,每个进程计算完成一个,就像监控进度的线程发送一个信号,监控进度的线程实时打印出当前完成了多少个,总共有多少个,已用时多少,预计剩余用时多少
- 主要语言: Rust
- Python绑定: PyO3
- 并行计算: Rayon
- 矩阵计算: nalgebra
- 文件I/O: Arrow + Parquet
- 纯Rust实现: 避免Python回调的性能损耗
- 局部线程池: 避免全局线程池重复初始化死锁
- 内存池复用: 减少内存分配次数
- 流式处理: 避免全量数据加载
#[pyfunction]
pub fn batch_factor_neutralization(
style_data_path: &str, // 风格数据文件路径
factor_files_dir: &str, // 因子文件目录路径(这个路径下存储了1万-10万个因子parquet文件)
output_dir: &str, // 输出目录路径(将中性化之后的因子以同名的parquet文件存储在这个路径下,注意date同样要是index)
num_threads: Option<usize>, // 并行线程数(可选)
) -> PyResult<()>- 文件格式: parquet
- 文件命名: 保持与输入文件相同的名称
- 数据结构: DataFrame,index为整数,包含date列
输出格式:
| date | 000001 | 000002 | 000003 |
|----------|-----------|-----------|-----------|
| 20220101 | 0.123 | 0.456 | NaN |
| 20220102 | 0.234 | 0.567 | 0.789 |
- 清晰错误信息: 提供具体的问题描述和解决建议
- 优雅降级: 跳过无效日期而非整体失败,如果有某个因子文件整体有错误,也不要打断,而是继续处理下一个因子文件,给我打印一条日志信息即可
- 回归矩阵缓存: 相同股票组合复用回归矩阵
- 批量处理: 减少循环次数和函数调用
- 数据结构优化: 使用更高效的数据结构
- 预计算: 尽可能多的预计算减少运行时开销
- 内存池: 复用大矩阵内存
- SIMD: 利用CPU向量化指令
- 缓存友好: 优化内存访问模式
- 负载均衡: 动态任务分配
- 数值正确性: 与pandas/sklearn回归结果一致
- 完整性: 所有股票都被正确处理
- 格式正确: 输出文件格式符合要求
import rust_pyfunc
# 调用因子中性化
import rust_pyfunc
rust_pyfunc.batch_factor_neutralization(
style_data_mmap_path="/home/chenzongwei/database/barra/barra_daily_together_jason.parquet",
factor_file_path="/nas197/user_home/chenzongwei/hm61_1st_son",
output_path="/nas197/user_home/chenzongwei/hm61_1st_son_neutest",
num_threads=40
)- 总处理时间: 11,700文件 ≤1小时
- 内存使用: 合理且稳定
- 数值准确性: 100%正确
- 数据完整性: 所有股票都被保留
- 错误处理: 清晰的错误信息