-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbacktester.py
More file actions
192 lines (180 loc) · 8.23 KB
/
backtester.py
File metadata and controls
192 lines (180 loc) · 8.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
import pandas as pd
import numpy as np
from dataclasses import dataclass
from typing import List, Dict, Optional
import logging
# Configure module logger
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
@dataclass
class Order:
ts: pd.Timestamp
side: str # 'buy' or 'sell'
type: str # 'limit' or 'market'
price: Optional[float]
qty: int
tag: str = ''
@dataclass
class Fill:
ts: pd.Timestamp
side: str
price: float
qty: int
fee: float
slippage: float
order_tag: str
pnl: float = 0.0 # realized P&L associated with this fill (could be partial)
class Backtester:
def __init__(self, data: pd.DataFrame, strategy, latency_ms: int = 0, fee_per_share: float = 0.0, slippage_bps: float = 0.0):
self.df = data.copy()
# Ensure datetime index named 'ts'
if 'timestamp' in self.df.columns and not isinstance(self.df.index, pd.DatetimeIndex):
self.df['timestamp'] = pd.to_datetime(self.df['timestamp'], utc=True, errors='coerce')
self.df = self.df.dropna(subset=['timestamp']).set_index('timestamp')
if not isinstance(self.df.index, pd.DatetimeIndex):
# Assume index is timestamp-like already
try:
self.df.index = pd.to_datetime(self.df.index, utc=True)
except Exception:
raise ValueError("Data index must be datetime or include a 'timestamp' column.")
self.df.index.name = 'ts'
# derive mid if missing
if 'mid' not in self.df.columns and {'bid','ask'}.issubset(self.df.columns):
self.df['mid'] = (self.df['bid'] + self.df['ask']) / 2.0
self.strategy = strategy
self.latency = pd.Timedelta(milliseconds=latency_ms)
self.fee_per_share = fee_per_share
self.slippage_bps = slippage_bps
self.orders: List[Order] = []
self.fills: List[Fill] = []
self.position = 0
self.avg_price = 0.0 # average entry price of current open position
self.cash = 0.0
self.realized_pnl = 0.0
self.equity_curve = []
def _apply_slippage(self, price: float, side: str) -> float:
# Adjust price by slippage in basis points
adj = price * (self.slippage_bps / 10000.0)
return price + adj if side == 'buy' else price - adj
def _match_order(self, order: Order, row: pd.Series) -> Optional[Fill]:
# Simulate limit order matching against the order book snapshot
bid, ask = row['bid'], row['ask']
# effective time after latency
if row.name < order.ts + self.latency:
return None
# Market order executes immediately at best price
exec_price = None
if order.type == 'market':
exec_price = ask if order.side == 'buy' else bid
else:
# limit: buy if limit >= ask, sell if limit <= bid
if order.side == 'buy' and order.price is not None and order.price >= ask:
exec_price = ask
elif order.side == 'sell' and order.price is not None and order.price <= bid:
exec_price = bid
if exec_price is None:
return None
exec_price = self._apply_slippage(exec_price, order.side)
fee = self.fee_per_share * order.qty
# slippage here relative to top-of-book
sl = abs(exec_price - (ask if order.side == 'buy' else bid))
return Fill(ts=row.name, side=order.side, price=exec_price, qty=order.qty, fee=fee, slippage=sl, order_tag=order.tag, pnl=0.0)
def _apply_fill_accounting(self, fill: Fill):
# Update position, cash, average price, and realized P&L using average cost method
side = fill.side
qty = fill.qty
price = fill.price
fee = fill.fee
realized = 0.0
if side == 'buy':
if self.position >= 0:
# increasing or opening long
new_pos = self.position + qty
self.avg_price = (self.avg_price * self.position + price * qty) / new_pos if new_pos != 0 else 0.0
self.position = new_pos
self.cash -= price * qty + fee
else:
# reducing short
closing = min(qty, -self.position)
realized += (self.avg_price - price) * closing # short exits when buying
self.position += closing # less negative
self.cash -= price * closing + fee * (closing/qty)
residual = qty - closing
if residual > 0:
# flip to long with residual
self.avg_price = price
self.position += residual
self.cash -= price * residual + fee * (residual/qty)
else: # sell
if self.position <= 0:
# increasing or opening short
new_pos = self.position - qty
# avg_price for short tracks entry price magnitude
self.avg_price = (self.avg_price * (-self.position) + price * qty) / (-new_pos) if new_pos != 0 else 0.0
self.position = new_pos
self.cash += price * qty - fee
else:
# reducing long
closing = min(qty, self.position)
realized += (price - self.avg_price) * closing
self.position -= closing
self.cash += price * closing - fee * (closing/qty)
residual = qty - closing
if residual > 0:
# flip to short with residual
self.avg_price = price
self.position -= residual
self.cash += price * residual - fee * (residual/qty)
self.realized_pnl += realized
fill.pnl = realized
def _update_pnl(self, row: pd.Series):
# Mark-to-market on mid
mid = row.get('mid', (row['bid'] + row['ask'])/2)
unrealized = (mid - self.avg_price) * self.position if self.position >= 0 else (self.avg_price - mid) * (-self.position)
equity = self.cash + unrealized
self.equity_curve.append({'ts': row.name, 'equity': equity, 'position': self.position, 'unrealized': unrealized, 'realized': self.realized_pnl})
def run(self) -> dict:
# Main loop
for ts, row in self.df.iterrows():
# 1) Strategy generates orders
try:
new_orders = self.strategy.on_tick(row, self.position)
except Exception as e:
logger.exception("Strategy on_tick failed at %s: %s", ts, e)
new_orders = []
for o in new_orders:
self.orders.append(Order(ts=ts, **o))
# 2) Try to match existing orders
pending = []
for order in self.orders:
fill = self._match_order(order, row)
if fill:
self._apply_fill_accounting(fill)
self.fills.append(fill)
else:
pending.append(order)
self.orders = pending
# 3) Update P&L
self._update_pnl(row)
# Build results objects
equity_df = pd.DataFrame(self.equity_curve).set_index('ts')
fills_df = pd.DataFrame([f.__dict__ for f in self.fills]) if self.fills else pd.DataFrame(columns=['ts','side','price','qty','fee','slippage','order_tag','pnl'])
orders_open_df = pd.DataFrame([o.__dict__ for o in self.orders]) if self.orders else pd.DataFrame(columns=['ts','side','type','price','qty','tag'])
results = {
'equity': equity_df,
'fills': fills_df,
'orders_open': orders_open_df,
'data': self.df, # provide original order book for plotting stats
'params': {
'latency_ms': int(self.latency / pd.Timedelta(milliseconds=1)),
'fee_per_share': self.fee_per_share,
'slippage_bps': self.slippage_bps,
'strategy': type(self.strategy).__name__,
'strategy_params': {
'lookback': getattr(self.strategy, 'lookback', None),
'threshold': getattr(self.strategy, 'threshold', None),
'max_position': getattr(self.strategy, 'max_position', None)
}
}
}
return results