-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathindicators.py
More file actions
157 lines (129 loc) · 5.37 KB
/
indicators.py
File metadata and controls
157 lines (129 loc) · 5.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
"""Technical indicators matching the EasyLanguage strategy spec (Chapter 2).
RSI and TRIX formulas follow the exact definitions provided; XAverage uses
multiplier 1/Len (Wilder-style), not 2/(Len+1).
"""
from __future__ import annotations
import math
from collections import deque
from typing import Optional
from config import Config
def _default_rsi_len() -> int:
    """Return the ``RSI_LEN`` attribute of a freshly constructed ``Config``."""
    settings = Config()
    return settings.RSI_LEN
def _default_trix_len() -> int:
    """Return the ``TRIX_LEN`` attribute of a freshly constructed ``Config``."""
    settings = Config()
    return settings.TRIX_LEN
class RSIIndicator:
    """Streaming RSI: ``50 * (var0 / var1 + 1)``.

    ``var0`` is the smoothed net price change per bar and ``var1`` the
    smoothed absolute price change per bar. Both are updated with a
    ``1 / length`` multiplier once the warm-up is complete.

    The first value is produced on the ``2 * length``-th price; every
    earlier call to :meth:`update` returns ``None``.
    """

    def __init__(self, length: Optional[int] = None) -> None:
        """Create the indicator.

        Args:
            length: Smoothing length. When ``None`` the value comes from
                ``_default_rsi_len()``.

        Raises:
            ValueError: If the resolved length is smaller than 1.
        """
        self._len = int(length if length is not None else _default_rsi_len())
        if self._len < 1:
            raise ValueError("RSI length must be >= 1")
        # Warm-up buffer only: filled until seeding, then emptied for good.
        self._prices: deque[float] = deque()
        self._var0: Optional[float] = None
        self._var1: Optional[float] = None
        self._prev_price: Optional[float] = None
        self._initialized = False

    def _compute_initial(self) -> None:
        """Seed ``var0``/``var1`` from the buffered warm-up prices.

        ``var0`` is the single ``length``-bar net change divided by
        ``length``; ``var1`` is the simple average of the last ``length``
        absolute one-bar changes. Both cover the same ``length`` bars, so
        ``|var0| <= var1`` and the seeded RSI lies in ``[0, 100]``.

        Does nothing while fewer than ``2 * length`` prices are buffered.
        """
        p = list(self._prices)
        if len(p) < 2 * self._len:
            return
        # The most recent length + 1 prices span exactly `length` one-bar moves.
        window = p[-(self._len + 1):]
        self._var0 = (window[-1] - window[0]) / self._len
        self._var1 = sum(abs(b - a) for a, b in zip(window, window[1:])) / self._len
        self._prev_price = p[-1]
        self._initialized = True
        # The buffer is never read again after seeding; release it.
        self._prices.clear()

    def update(self, price: float) -> Optional[float]:
        """Feed one price and return the RSI.

        Args:
            price: The newest price of the series.

        Returns:
            The RSI value, or ``None`` until ``2 * length`` prices were seen.
        """
        if not self._initialized:
            # Prices are buffered only during warm-up so a long-running
            # stream does not accumulate history it never uses.
            self._prices.append(price)
            if len(self._prices) < 2 * self._len:
                return None
            self._compute_initial()
            return self._rsi_value()
        assert self._prev_price is not None and self._var0 is not None and self._var1 is not None
        var2 = price - self._prev_price
        inv = 1.0 / self._len
        self._var0 = self._var0 + inv * (var2 - self._var0)
        self._var1 = self._var1 + inv * (abs(var2) - self._var1)
        self._prev_price = price
        return self._rsi_value()

    def _rsi_value(self) -> float:
        """Map the current ``var0 / var1`` ratio to ``50 * (ratio + 1)``.

        A zero ``var1`` (no price movement) yields a ratio of 0, i.e. RSI 50.
        """
        assert self._var0 is not None and self._var1 is not None
        if self._var1 != 0.0:
            var4 = self._var0 / self._var1
        else:
            var4 = 0.0
        return 50.0 * (var4 + 1.0)

    @staticmethod
    def calculate_series(prices: list[float], length: Optional[int] = None) -> list[Optional[float]]:
        """Return the RSI for every element of ``prices``.

        The values equal what repeated :meth:`update` calls on a fresh
        indicator produce, so no entry depends on later prices.

        Args:
            prices: Price history, oldest first.
            length: Smoothing length; ``None`` selects the configured default.

        Returns:
            One entry per input price; warm-up positions hold ``None``.
        """
        ind = RSIIndicator(length=length)
        return [ind.update(float(p)) for p in prices]
class _WilderXAverage:
    """Exponential average whose smoothing multiplier is ``1 / length``.

    The first ``length`` inputs are collected and their simple mean becomes
    the starting value; afterwards each input moves the average by
    ``(x - average) / length``.
    """

    def __init__(self, length: int) -> None:
        """Store the length and start with an empty warm-up buffer.

        Raises:
            ValueError: If ``int(length)`` is smaller than 1.
        """
        self._len = int(length)
        if not self._len >= 1:
            raise ValueError("XAverage length must be >= 1")
        self._buf: list[float] = []
        self._ema: Optional[float] = None

    def update(self, x: float) -> Optional[float]:
        """Consume one input; return the average, or ``None`` during warm-up."""
        # Steady state: the seed already exists, apply the recursive step.
        if self._ema is not None:
            self._ema += (x - self._ema) * (1.0 / self._len)
            return self._ema

        # Warm-up: keep collecting until `length` inputs are available.
        self._buf.append(x)
        if len(self._buf) < self._len:
            return None

        seed = sum(self._buf) / self._len
        self._buf.clear()
        self._ema = seed
        return seed
class TRIXIndicator:
    """TRIX: bar-to-bar change, times 10000, of a triple-smoothed log price.

    The natural log of each price passes through three chained
    ``_WilderXAverage`` stages of equal length; the output is the difference
    between the current and previous third-stage value multiplied by 10000.
    """

    def __init__(self, length: Optional[int] = None) -> None:
        """Create the three smoothing stages.

        Args:
            length: Length shared by all stages. When ``None`` the value
                comes from ``_default_trix_len()``.

        Raises:
            ValueError: If the resolved length is smaller than 1.
        """
        resolved = _default_trix_len() if length is None else length
        self._len = int(resolved)
        if self._len < 1:
            raise ValueError("TRIX length must be >= 1")
        self._e1 = _WilderXAverage(self._len)
        self._e2 = _WilderXAverage(self._len)
        self._e3 = _WilderXAverage(self._len)
        self._prev_var0: Optional[float] = None

    def update(self, price: float) -> Optional[float]:
        """Feed one price and return TRIX.

        Returns ``None`` while any smoothing stage is still warming up and
        on the first bar that yields a third-stage value, because no
        previous value exists to difference against.

        Raises:
            ValueError: If ``price`` is zero or negative.
        """
        if price <= 0.0:
            raise ValueError("TRIX requires positive price for Log(PriceValue)")

        # Each stage is fed only after the one before it has produced a value.
        smoothed: Optional[float] = math.log(price)
        for stage in (self._e1, self._e2, self._e3):
            smoothed = stage.update(smoothed)
            if smoothed is None:
                return None

        previous = self._prev_var0
        self._prev_var0 = smoothed
        if previous is None:
            return None
        return (smoothed - previous) * 10000.0

    @staticmethod
    def calculate_series(prices: list[float], length: Optional[int] = None) -> list[Optional[float]]:
        """Return TRIX for every element of ``prices`` via repeated ``update``.

        Args:
            prices: Price history, oldest first.
            length: Smoothing length; ``None`` selects the configured default.

        Returns:
            One entry per input price; positions without a value hold ``None``.
        """
        tracker = TRIXIndicator(length=length)
        results: list[Optional[float]] = []
        for raw in prices:
            results.append(tracker.update(float(raw)))
        return results
class CrossDetector:
    """Stateless checks for a series crossing a fixed level between two bars."""

    @staticmethod
    def crossed_above(prev: float, curr: float, level: float) -> bool:
        """Return True if ``prev`` is at or below ``level`` and ``curr`` is above it."""
        started_at_or_below = prev <= level
        return started_at_or_below and level < curr

    @staticmethod
    def crossed_below(prev: float, curr: float, level: float) -> bool:
        """Return True if ``prev`` is at or above ``level`` and ``curr`` is below it."""
        started_at_or_above = prev >= level
        return started_at_or_above and level > curr