1+ from __future__ import annotations
12from dataclasses import dataclass
2- from typing import Dict , Any
3+ from math import sqrt
4+ from statistics import NormalDist
5+ from typing import Dict , Any , Tuple
36import numpy as np
47import pandas as pd
58
@@ -9,95 +12,258 @@ class MarketRiskConfig:
912 alpha : float = 0.99
1013 window : int = 250
1114 method : str = "historical" # historical | parametric | monte_carlo | fhs
15+ horizon_days : int = 1
16+ exposure : float = 1.0
17+
18+
19+ # Convenience for Normal
20+ _N = NormalDist ()
21+
22+
23+ def _z (alpha : float ) -> float :
24+ return _N .inv_cdf (alpha )
25+
26+
27+ def _phi (x : float ) -> float :
28+ return _N .pdf (x )
29+
30+
31+ def portfolio_returns (returns : pd .DataFrame , weights : np .ndarray ) -> pd .Series :
32+ return pd .Series (returns .values @ weights , index = returns .index , name = "r_p" )
33+
34+
35+ def var_parametric (
36+ returns : pd .DataFrame ,
37+ weights : np .ndarray ,
38+ alpha : float = 0.95 ,
39+ horizon_days : int = 1 ,
40+ exposure : float = 1.0 ,
41+ ) -> float :
42+ mu = returns .mean ().values
43+ cov = returns .cov ().values
44+ mu_p = float (weights @ mu ) * horizon_days
45+ sigma_p = float (np .sqrt (weights @ cov @ weights )) * sqrt (horizon_days )
46+ z = _z (alpha )
47+ loss_var = (- mu_p + z * sigma_p ) * exposure
48+ return float (loss_var )
49+
50+
51+ def es_parametric (
52+ returns : pd .DataFrame ,
53+ weights : np .ndarray ,
54+ alpha : float = 0.95 ,
55+ horizon_days : int = 1 ,
56+ exposure : float = 1.0 ,
57+ ) -> float :
58+ mu = returns .mean ().values
59+ cov = returns .cov ().values
60+ mu_p = float (weights @ mu ) * horizon_days
61+ sigma_p = float (np .sqrt (weights @ cov @ weights )) * sqrt (horizon_days )
62+ z = _z (alpha )
63+ es = (- mu_p + sigma_p * (_phi (z ) / (1 - alpha ))) * exposure
64+ return float (es )
65+
66+
67+ def var_historical (
68+ returns : pd .DataFrame ,
69+ weights : np .ndarray ,
70+ alpha : float = 0.95 ,
71+ horizon_days : int = 1 ,
72+ exposure : float = 1.0 ,
73+ sqrt_time : bool = True ,
74+ ) -> float :
75+ r_p = portfolio_returns (returns , weights )
76+ if sqrt_time and horizon_days > 1 :
77+ r_p = r_p * sqrt (horizon_days )
78+ q = r_p .quantile (1 - alpha )
79+ return float (- q * exposure )
80+
81+
82+ def es_historical (
83+ returns : pd .DataFrame ,
84+ weights : np .ndarray ,
85+ alpha : float = 0.95 ,
86+ horizon_days : int = 1 ,
87+ exposure : float = 1.0 ,
88+ sqrt_time : bool = True ,
89+ ) -> float :
90+ r_p = portfolio_returns (returns , weights )
91+ if sqrt_time and horizon_days > 1 :
92+ r_p = r_p * sqrt (horizon_days )
93+ var_loss = - r_p .quantile (1 - alpha )
94+ tail = r_p [r_p <= - var_loss ]
95+ es = - tail .mean () if len (tail ) else var_loss
96+ return float (es * exposure )
97+
98+
99+ def _cov_shrink (cov : np .ndarray , lam : float = 0.01 ) -> np .ndarray :
100+ d = np .diag (np .diag (cov ))
101+ return (1 - lam ) * cov + lam * d
102+
103+
104+ def var_es_monte_carlo (
105+ returns : pd .DataFrame ,
106+ weights : np .ndarray ,
107+ alpha : float = 0.95 ,
108+ horizon_days : int = 1 ,
109+ exposure : float = 1.0 ,
110+ n_sims : int = 100_000 ,
111+ seed : int | None = 42 ,
112+ shrink_lambda : float = 0.01 ,
113+ ) -> Tuple [float , float ]:
114+ rng = np .random .default_rng (seed )
115+ mu = returns .mean ().values * horizon_days
116+ cov = returns .cov ().values * horizon_days
117+ cov = _cov_shrink (cov , lam = shrink_lambda )
118+
119+ sims = rng .multivariate_normal (mu , cov , size = int (n_sims ), method = "cholesky" )
120+ port = sims @ weights
121+
122+ q = np .quantile (port , 1 - alpha )
123+ var_loss = float (- q * exposure )
124+
125+ tail = port [port <= q ]
126+ es_loss = float (- tail .mean () * exposure ) if tail .size else var_loss
127+ return var_loss , es_loss
128+
129+
130+ def garch11_filter (
131+ r : pd .Series ,
132+ alpha_g : float = 0.05 ,
133+ beta_g : float = 0.94 ,
134+ long_run_var : float | None = None ,
135+ init_var : float | None = None ,
136+ ) -> pd .Series :
137+ r = r .dropna ()
138+ if long_run_var is None :
139+ long_run_var = float (r .var (ddof = 1 )) if len (r ) > 1 else float ((r ** 2 ).mean ())
140+ if init_var is None :
141+ init_var = long_run_var
142+ omega = max (1e-18 , (1.0 - alpha_g - beta_g ) * long_run_var )
143+
144+ sig2 = np .empty (len (r ), dtype = float )
145+ sig2 [0 ] = max (1e-18 , init_var )
146+ r2 = r .values ** 2
147+ for t in range (1 , len (r )):
148+ sig2 [t ] = omega + alpha_g * r2 [t - 1 ] + beta_g * sig2 [t - 1 ]
149+ sigma = np .sqrt (sig2 )
150+ return pd .Series (sigma , index = r .index , name = "sigma_garch" )
151+
152+
153+ def garch11_forecast_sigma_next (
154+ r_last : float ,
155+ sigma_last : float ,
156+ alpha_g : float ,
157+ beta_g : float ,
158+ long_run_var : float ,
159+ ) -> float :
160+ omega = max (1e-18 , (1.0 - alpha_g - beta_g ) * long_run_var )
161+ sig2_next = omega + alpha_g * (r_last ** 2 ) + beta_g * (sigma_last ** 2 )
162+ return float (np .sqrt (max (sig2_next , 1e-18 )))
163+
164+
165+ def fhs_var_es_next (
166+ returns : pd .DataFrame ,
167+ weights : np .ndarray ,
168+ alpha : float = 0.99 ,
169+ exposure : float = 1.0 ,
170+ alpha_g : float = 0.05 ,
171+ beta_g : float = 0.94 ,
172+ ) -> Tuple [float , float ]:
173+ r_p = pd .Series (returns .values @ weights , index = returns .index , name = "r_p" ).dropna ()
174+ if len (r_p ) < 50 :
175+ q = r_p .quantile (1 - alpha )
176+ var_loss = float (- q * exposure )
177+ tail = r_p [r_p <= q ]
178+ es_loss = float (- tail .mean () * exposure ) if len (tail ) else var_loss
179+ return var_loss , es_loss
180+
181+ long_var = float (r_p .var (ddof = 1 ))
182+ sigma = garch11_filter (r_p , alpha_g = alpha_g , beta_g = beta_g , long_run_var = long_var )
183+ sigma_safe = sigma .replace (0.0 , np .nan ).bfill ().ffill ()
184+ z = (r_p / sigma_safe ).dropna ()
185+
186+ qz = z .quantile (1 - alpha )
187+ tail_z = z [z <= qz ]
188+ z_es = tail_z .mean () if len (tail_z ) else qz
189+
190+ sigma_next = garch11_forecast_sigma_next (
191+ float (r_p .iloc [- 1 ]), float (sigma .iloc [- 1 ]), alpha_g , beta_g , long_run_var = long_var
192+ )
193+
194+ var_loss = float (- qz * sigma_next * exposure )
195+ es_loss = float (- z_es * sigma_next * exposure )
196+ return var_loss , es_loss
12197
13198
14199class MarketRiskModel :
15200 """
16- Market Risk Model for VaR / ES estimation.
17-
18- Loss convention:
19- - Losses are positive
20- - VaR and ES are reported as positive numbers
201+ Validation-ready Market Risk engine.
202+ Outputs are positive loss numbers: VaR >= 0, ES >= VaR.
21203 """
22204
23- def __init__ (self , returns : pd .Series , config : MarketRiskConfig ):
205+ def __init__ (self , returns : pd .DataFrame , weights : np . ndarray , config : MarketRiskConfig ):
24206 self .returns = returns .dropna ()
207+ self .weights = np .asarray (weights , dtype = float )
25208 self .config = config
26-
27- # Enforce loss convention
28- self .losses = - self .returns
29-
30209 self .var_ = None
31210 self .es_ = None
32211
33- # ---------- public API ----------
34-
35212 def fit (self ) -> None :
36- if self .config .method == "historical" :
37- self ._fit_historical ()
38- elif self .config .method == "parametric" :
39- self ._fit_parametric ()
40- elif self .config .method == "monte_carlo" :
41- self ._fit_monte_carlo ()
42- elif self .config .method == "fhs" :
43- self ._fit_filtered_historical ()
213+ a = self .config .alpha
214+ h = self .config .horizon_days
215+ e = self .config .exposure
216+ m = self .config .method
217+
218+ if m == "historical" :
219+ self .var_ = var_historical (self .returns , self .weights , alpha = a , horizon_days = h , exposure = e )
220+ self .es_ = es_historical (self .returns , self .weights , alpha = a , horizon_days = h , exposure = e )
221+ elif m == "parametric" :
222+ self .var_ = var_parametric (self .returns , self .weights , alpha = a , horizon_days = h , exposure = e )
223+ self .es_ = es_parametric (self .returns , self .weights , alpha = a , horizon_days = h , exposure = e )
224+ elif m == "monte_carlo" :
225+ self .var_ , self .es_ = var_es_monte_carlo (self .returns , self .weights , alpha = a , horizon_days = h , exposure = e )
226+ elif m == "fhs" :
227+ self .var_ , self .es_ = fhs_var_es_next (self .returns , self .weights , alpha = a , exposure = e )
44228 else :
45- raise ValueError (f"Unknown method: { self .config .method } " )
229+ raise ValueError (f"Unknown method: { m } " )
230+
231+ # Enforce loss convention invariants (model risk sanity)
232+ self .var_ = float (self .var_ )
233+ self .es_ = float (self .es_ )
234+ if self .var_ < 0 :
235+ raise ValueError ("VaR must be non-negative under loss convention." )
236+ if self .es_ < self .var_ :
237+ raise ValueError ("ES must be >= VaR under loss convention." )
46238
47239 def compute_var (self ) -> float :
48240 if self .var_ is None :
49- raise RuntimeError ("Model must be fitted before computing VaR " )
241+ raise RuntimeError ("fit() must be called before compute_var(). " )
50242 return self .var_
51243
52244 def compute_es (self ) -> float :
53245 if self .es_ is None :
54- raise RuntimeError ("Model must be fitted before computing ES " )
246+ raise RuntimeError ("fit() must be called before compute_es(). " )
55247 return self .es_
56248
249+ def assumptions (self ) -> list :
250+ base = ["iid returns" , "stationarity within rolling window" ]
251+ if self .config .method == "parametric" :
252+ base .append ("normality" )
253+ if self .config .method in ("monte_carlo" ,):
254+ base .append ("multivariate normal joint distribution" )
255+ if self .config .method in ("fhs" ,):
256+ base .append ("fixed-parameter GARCH(1,1) volatility filter" )
257+ return base
258+
57259 def summary (self ) -> Dict [str , Any ]:
58260 return {
59261 "VaR" : self .var_ ,
60262 "ES" : self .es_ ,
61263 "alpha" : self .config .alpha ,
62264 "window" : self .config .window ,
63265 "method" : self .config .method ,
266+ "horizon_days" : self .config .horizon_days ,
267+ "exposure" : self .config .exposure ,
64268 "assumptions" : self .assumptions (),
65269 }
66-
67- def assumptions (self ) -> list :
68- assumptions = ["iid returns" , "stationarity within rolling window" ]
69- if self .config .method == "parametric" :
70- assumptions .append ("normality" )
71- return assumptions
72-
73- # ---------- model implementations ----------
74-
75- def _fit_historical (self ):
76- window_losses = self .losses [- self .config .window :]
77- self .var_ = np .quantile (window_losses , self .config .alpha )
78- self .es_ = window_losses [window_losses >= self .var_ ].mean ()
79-
80- def _fit_parametric (self ):
81- from scipy .stats import norm
82-
83- mu = self .losses .mean ()
84- sigma = self .losses .std (ddof = 1 )
85-
86- z = norm .ppf (self .config .alpha )
87- self .var_ = mu + z * sigma
88- self .es_ = mu + sigma * norm .pdf (z ) / (1 - self .config .alpha )
89-
90- def _fit_monte_carlo (self ):
91- simulated = np .random .normal (
92- self .losses .mean (),
93- self .losses .std (ddof = 1 ),
94- size = 100_000 ,
95- )
96- self .var_ = np .quantile (simulated , self .config .alpha )
97- self .es_ = simulated [simulated >= self .var_ ].mean ()
98-
99- def _fit_filtered_historical (self ):
100- # Placeholder: plug in existing GARCH-lite logic here
101- filtered_losses = self .losses
102- self .var_ = np .quantile (filtered_losses , self .config .alpha )
103- self .es_ = filtered_losses [filtered_losses >= self .var_ ].mean ()
0 commit comments