Skip to content

Commit 769cb33

Browse files
committed
Move openquant.data processing into Rust via PyO3
1 parent 9c2c0b6 commit 769cb33

6 files changed

Lines changed: 559 additions & 62 deletions

File tree

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
use chrono::NaiveDateTime;
2+
use std::collections::{BTreeMap, HashSet};
3+
4+
#[derive(Debug, Clone, PartialEq)]
5+
pub struct OhlcvRow {
6+
pub timestamp: NaiveDateTime,
7+
pub symbol: String,
8+
pub open: f64,
9+
pub high: f64,
10+
pub low: f64,
11+
pub close: f64,
12+
pub volume: f64,
13+
pub adj_close: f64,
14+
}
15+
16+
#[derive(Debug, Clone, PartialEq)]
17+
pub struct AlignedOhlcvRow {
18+
pub timestamp: NaiveDateTime,
19+
pub symbol: String,
20+
pub open: Option<f64>,
21+
pub high: Option<f64>,
22+
pub low: Option<f64>,
23+
pub close: Option<f64>,
24+
pub volume: Option<f64>,
25+
pub adj_close: Option<f64>,
26+
pub is_missing_bar: bool,
27+
}
28+
29+
#[derive(Debug, Clone, PartialEq)]
30+
pub struct DataQualityReport {
31+
pub row_count: usize,
32+
pub symbol_count: usize,
33+
pub duplicate_key_count: usize,
34+
pub gap_interval_count: usize,
35+
pub ts_min: Option<NaiveDateTime>,
36+
pub ts_max: Option<NaiveDateTime>,
37+
pub rows_removed_by_deduplication: usize,
38+
}
39+
40+
fn sort_rows(rows: &mut [OhlcvRow]) {
41+
rows.sort_by(|a, b| {
42+
a.symbol
43+
.cmp(&b.symbol)
44+
.then_with(|| a.timestamp.cmp(&b.timestamp))
45+
});
46+
}
47+
48+
fn dedupe_rows(rows: &[OhlcvRow], keep_last: bool) -> (Vec<OhlcvRow>, usize) {
49+
if rows.is_empty() {
50+
return (Vec::new(), 0);
51+
}
52+
53+
let mut deduped = Vec::new();
54+
let mut i = 0usize;
55+
while i < rows.len() {
56+
let mut j = i + 1;
57+
while j < rows.len()
58+
&& rows[j].symbol == rows[i].symbol
59+
&& rows[j].timestamp == rows[i].timestamp
60+
{
61+
j += 1;
62+
}
63+
let chosen = if keep_last { &rows[j - 1] } else { &rows[i] };
64+
deduped.push(chosen.clone());
65+
i = j;
66+
}
67+
let removed = rows.len().saturating_sub(deduped.len());
68+
(deduped, removed)
69+
}
70+
71+
pub fn quality_report(rows: &[OhlcvRow], rows_removed_by_deduplication: usize) -> DataQualityReport {
72+
let mut symbol_set = HashSet::new();
73+
let mut duplicate_key_count = 0usize;
74+
let mut key_counts: BTreeMap<(String, NaiveDateTime), usize> = BTreeMap::new();
75+
76+
for row in rows {
77+
symbol_set.insert(row.symbol.clone());
78+
let key = (row.symbol.clone(), row.timestamp);
79+
*key_counts.entry(key).or_insert(0usize) += 1;
80+
}
81+
82+
for count in key_counts.values() {
83+
if *count > 1 {
84+
duplicate_key_count += 1;
85+
}
86+
}
87+
88+
let mut gap_interval_count = 0usize;
89+
let mut last_by_symbol: BTreeMap<String, NaiveDateTime> = BTreeMap::new();
90+
for row in rows {
91+
if let Some(prev) = last_by_symbol.get(&row.symbol) {
92+
if (row.timestamp - *prev).num_seconds() > 24 * 3600 {
93+
gap_interval_count += 1;
94+
}
95+
}
96+
last_by_symbol.insert(row.symbol.clone(), row.timestamp);
97+
}
98+
99+
DataQualityReport {
100+
row_count: rows.len(),
101+
symbol_count: symbol_set.len(),
102+
duplicate_key_count,
103+
gap_interval_count,
104+
ts_min: rows.first().map(|r| r.timestamp),
105+
ts_max: rows.last().map(|r| r.timestamp),
106+
rows_removed_by_deduplication,
107+
}
108+
}
109+
110+
pub fn clean_ohlcv_rows(rows: &[OhlcvRow], keep_last: bool) -> (Vec<OhlcvRow>, DataQualityReport) {
111+
let mut sorted = rows.to_vec();
112+
sort_rows(&mut sorted);
113+
let (deduped, removed) = dedupe_rows(&sorted, keep_last);
114+
let report = quality_report(&deduped, removed);
115+
(deduped, report)
116+
}
117+
118+
pub fn align_calendar_rows(rows: &[OhlcvRow], interval_seconds: i64) -> Result<Vec<AlignedOhlcvRow>, String> {
119+
if interval_seconds <= 0 {
120+
return Err("interval_seconds must be > 0".to_string());
121+
}
122+
let (clean, _) = clean_ohlcv_rows(rows, true);
123+
if clean.is_empty() {
124+
return Ok(Vec::new());
125+
}
126+
127+
let mut by_symbol: BTreeMap<String, Vec<OhlcvRow>> = BTreeMap::new();
128+
for row in clean {
129+
by_symbol.entry(row.symbol.clone()).or_default().push(row);
130+
}
131+
132+
let mut out = Vec::new();
133+
for (symbol, rows_for_symbol) in by_symbol {
134+
if rows_for_symbol.is_empty() {
135+
continue;
136+
}
137+
let start = rows_for_symbol.first().expect("non-empty").timestamp;
138+
let end = rows_for_symbol.last().expect("non-empty").timestamp;
139+
140+
let mut index: BTreeMap<NaiveDateTime, OhlcvRow> = BTreeMap::new();
141+
for row in rows_for_symbol {
142+
index.insert(row.timestamp, row);
143+
}
144+
145+
let mut ts = start;
146+
while ts <= end {
147+
if let Some(row) = index.get(&ts) {
148+
out.push(AlignedOhlcvRow {
149+
timestamp: ts,
150+
symbol: symbol.clone(),
151+
open: Some(row.open),
152+
high: Some(row.high),
153+
low: Some(row.low),
154+
close: Some(row.close),
155+
volume: Some(row.volume),
156+
adj_close: Some(row.adj_close),
157+
is_missing_bar: false,
158+
});
159+
} else {
160+
out.push(AlignedOhlcvRow {
161+
timestamp: ts,
162+
symbol: symbol.clone(),
163+
open: None,
164+
high: None,
165+
low: None,
166+
close: None,
167+
volume: None,
168+
adj_close: None,
169+
is_missing_bar: true,
170+
});
171+
}
172+
ts += chrono::Duration::seconds(interval_seconds);
173+
}
174+
}
175+
Ok(out)
176+
}

crates/openquant/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ pub mod cla;
55
pub mod codependence;
66
pub mod combinatorial_optimization;
77
pub mod cross_validation;
8+
pub mod data_processing;
89
pub mod data_structures;
910
pub mod ef3m;
1011
pub mod ensemble_methods;
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
use chrono::{DateTime, NaiveDateTime, Utc};
2+
use openquant::data_processing::{align_calendar_rows, clean_ohlcv_rows, OhlcvRow};
3+
4+
fn ts(seconds: i64) -> NaiveDateTime {
5+
DateTime::<Utc>::from_timestamp(seconds, 0).expect("timestamp").naive_utc()
6+
}
7+
8+
fn sample_rows() -> Vec<OhlcvRow> {
9+
vec![
10+
OhlcvRow {
11+
timestamp: ts(0),
12+
symbol: "AAPL".to_string(),
13+
open: 100.0,
14+
high: 101.0,
15+
low: 99.0,
16+
close: 100.5,
17+
volume: 10.0,
18+
adj_close: 100.5,
19+
},
20+
OhlcvRow {
21+
timestamp: ts(60),
22+
symbol: "AAPL".to_string(),
23+
open: 100.5,
24+
high: 101.2,
25+
low: 100.3,
26+
close: 101.0,
27+
volume: 11.0,
28+
adj_close: 101.0,
29+
},
30+
OhlcvRow {
31+
timestamp: ts(60),
32+
symbol: "AAPL".to_string(),
33+
open: 100.6,
34+
high: 101.3,
35+
low: 100.4,
36+
close: 101.1,
37+
volume: 12.0,
38+
adj_close: 101.1,
39+
},
40+
]
41+
}
42+
43+
#[test]
44+
fn clean_and_align_rows() {
45+
let rows = sample_rows();
46+
let (clean, report) = clean_ohlcv_rows(&rows, true);
47+
assert_eq!(clean.len(), 2);
48+
assert_eq!(report.rows_removed_by_deduplication, 1);
49+
assert_eq!(report.duplicate_key_count, 0);
50+
51+
let aligned = align_calendar_rows(&clean, 60).expect("align");
52+
assert_eq!(aligned.len(), 2);
53+
assert!(!aligned[0].is_missing_bar);
54+
assert!(!aligned[1].is_missing_bar);
55+
}

0 commit comments

Comments
 (0)