-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmicrostructure.py
More file actions
199 lines (169 loc) · 6.92 KB
/
microstructure.py
File metadata and controls
199 lines (169 loc) · 6.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
"""Microstructure metrics: OBI, trade delta, absorption, rolling ATR."""
from __future__ import annotations
from collections import deque
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Deque, List, Optional, Tuple
from models.order_book import OrderBookSnapshot
@dataclass
class TradeEvent:
timestamp: datetime
price: float
qty: float
is_buyer_maker: bool # True = seller aggressor, False = buyer aggressor
@dataclass
class MicrostructureSnapshot:
obi: float # 0-1, >0.6 bullish, <0.4 bearish
delta: float # buy_vol - sell_vol (rolling window)
delta_cumulative: float # session cumulative delta
absorption_long: bool # sellers hitting bid but price not dropping
absorption_short: bool # buyers hitting ask but price not rising
spread_pct: float
mid_price: float
atr: Optional[float]
bid_volume: float
ask_volume: float
class MicrostructureEngine:
"""Stateful calculator — call update_book() and update_trade() on each event."""
def __init__(
self,
obi_levels: int = 10,
delta_window_s: float = 30.0,
absorption_window_s: float = 5.0,
absorption_min_sell_vol: float = 0.5,
absorption_max_price_move_pct: float = 0.02,
atr_window: int = 14,
max_spread_pct: float = 0.05,
) -> None:
self._obi_levels = obi_levels
self._delta_window_s = delta_window_s
self._absorption_window_s = absorption_window_s
self._absorption_min_sell_vol = absorption_min_sell_vol
self._absorption_max_price_move_pct = absorption_max_price_move_pct
self._max_spread_pct = max_spread_pct
self._book: Optional[OrderBookSnapshot] = None
self._trades: Deque[TradeEvent] = deque()
self._cumulative_delta: float = 0.0
# ATR: store (high, low, close) per completed second bucket
self._atr_window = atr_window
self._price_highs: Deque[float] = deque(maxlen=atr_window + 1)
self._price_lows: Deque[float] = deque(maxlen=atr_window + 1)
self._price_closes: Deque[float] = deque(maxlen=atr_window + 1)
self._current_bucket_s: Optional[int] = None
self._bucket_high: float = 0.0
self._bucket_low: float = float("inf")
self._bucket_close: float = 0.0
def update_book(self, snapshot: OrderBookSnapshot) -> None:
self._book = snapshot
def update_trade(self, event: TradeEvent) -> None:
self._trades.append(event)
if not event.is_buyer_maker:
self._cumulative_delta += event.qty
else:
self._cumulative_delta -= event.qty
self._update_atr_bucket(event)
def _update_atr_bucket(self, event: TradeEvent) -> None:
bucket = int(event.timestamp.timestamp())
if self._current_bucket_s is None:
self._current_bucket_s = bucket
self._bucket_high = event.price
self._bucket_low = event.price
self._bucket_close = event.price
return
if bucket != self._current_bucket_s:
self._price_highs.append(self._bucket_high)
self._price_lows.append(self._bucket_low)
self._price_closes.append(self._bucket_close)
self._current_bucket_s = bucket
self._bucket_high = event.price
self._bucket_low = event.price
else:
self._bucket_high = max(self._bucket_high, event.price)
self._bucket_low = min(self._bucket_low, event.price)
self._bucket_close = event.price
def _compute_atr(self) -> Optional[float]:
closes = list(self._price_closes)
highs = list(self._price_highs)
lows = list(self._price_lows)
n = min(len(closes), len(highs), len(lows))
if n < 2:
return None
trs: List[float] = []
for i in range(1, n):
tr = max(
highs[i] - lows[i],
abs(highs[i] - closes[i - 1]),
abs(lows[i] - closes[i - 1]),
)
trs.append(tr)
if not trs:
return None
return sum(trs) / len(trs)
def _purge_old_trades(self, now: datetime) -> None:
cutoff = now.timestamp() - self._delta_window_s
while self._trades and self._trades[0].timestamp.timestamp() < cutoff:
self._trades.popleft()
def _rolling_delta(self) -> float:
delta = 0.0
for t in self._trades:
if not t.is_buyer_maker:
delta += t.qty
else:
delta -= t.qty
return delta
def _detect_absorption(self, now: datetime) -> Tuple[bool, bool]:
"""
absorption_long: sellers aggressive but price not falling
absorption_short: buyers aggressive but price not rising
"""
if self._book is None or not self._trades:
return False, False
cutoff = now.timestamp() - self._absorption_window_s
recent = [t for t in self._trades if t.timestamp.timestamp() >= cutoff]
if not recent:
return False, False
sell_vol = sum(t.qty for t in recent if t.is_buyer_maker)
buy_vol = sum(t.qty for t in recent if not t.is_buyer_maker)
if not recent:
return False, False
first_price = recent[0].price
last_price = recent[-1].price
price_move_pct = abs(last_price - first_price) / first_price * 100.0 if first_price else 0.0
absorption_long = (
sell_vol >= self._absorption_min_sell_vol
and sell_vol > buy_vol
and price_move_pct <= self._absorption_max_price_move_pct
)
absorption_short = (
buy_vol >= self._absorption_min_sell_vol
and buy_vol > sell_vol
and price_move_pct <= self._absorption_max_price_move_pct
)
return absorption_long, absorption_short
def snapshot(self) -> Optional[MicrostructureSnapshot]:
if self._book is None:
return None
now = datetime.now(timezone.utc)
self._purge_old_trades(now)
bid_vol = self._book.bid_volume(self._obi_levels)
ask_vol = self._book.ask_volume(self._obi_levels)
total = bid_vol + ask_vol
obi = bid_vol / total if total > 0 else 0.5
delta = self._rolling_delta()
absorption_long, absorption_short = self._detect_absorption(now)
atr = self._compute_atr()
return MicrostructureSnapshot(
obi=obi,
delta=delta,
delta_cumulative=self._cumulative_delta,
absorption_long=absorption_long,
absorption_short=absorption_short,
spread_pct=self._book.spread_pct,
mid_price=self._book.mid_price,
atr=atr,
bid_volume=bid_vol,
ask_volume=ask_vol,
)
@property
def max_spread_pct(self) -> float:
return self._max_spread_pct