1212https://en.wikipedia.org/wiki/Autoregressive_integrated_moving_average
1313"""
1414
15+
1516class ARIMA :
1617 def __init__ (self , p = 1 , d = 1 , q = 1 , lr = 0.001 , epochs = 500 ) -> None :
1718 """
@@ -93,8 +94,11 @@ def inverse_difference(
9394 return forecast
9495
9596 def _compute_residuals (
96- self , diff_data : NDArray [np .float64 ], phi : NDArray [np .float64 ],
97- theta : NDArray [np .float64 ], c : float
97+ self ,
98+ diff_data : NDArray [np .float64 ],
99+ phi : NDArray [np .float64 ],
100+ theta : NDArray [np .float64 ],
101+ c : float ,
98102 ) -> tuple [NDArray [np .float64 ], NDArray [np .float64 ]]:
99103 """
100104 Computes residuals for given parameters.
@@ -115,12 +119,12 @@ def _compute_residuals(
115119 for t in range (start , n ):
116120 # AR part: a weighted sum of the last 'p' actual values.
117121 # We reverse the data slice [::-1] so that the most recent value is first.
118- ar_term = (np .dot (phi , diff_data [t - self .p :t ][::- 1 ])
119- if self .p > 0 else 0.0 )
122+ ar_term = (
123+ np .dot (phi , diff_data [t - self .p : t ][::- 1 ]) if self .p > 0 else 0.0
124+ )
120125
121126 # MA part: a weighted sum of the last 'q' prediction errors.
122- ma_term = (np .dot (theta , errors [t - self .q :t ][::- 1 ])
123- if self .q > 0 else 0.0 )
127+ ma_term = np .dot (theta , errors [t - self .q : t ][::- 1 ]) if self .q > 0 else 0.0
124128
125129 # Combine everything to make the one-step-ahead prediction.
126130 preds [t ] = c + ar_term + ma_term
@@ -131,7 +135,7 @@ def _compute_residuals(
131135
132136 def fit (
133137 self , data : list [float ] | NDArray [np .float64 ], method : str = "opt"
134- ) -> ' ARIMA' :
138+ ) -> " ARIMA" :
135139 """
136140 Trains the ARIMA model.
137141
@@ -190,10 +194,16 @@ def _fit_gradient_descent(self, diff_data: NDArray[np.float64], start: int) -> N
190194 for epoch in range (self .epochs ):
191195 # First, calculate the predictions and errors with the current parameters.
192196 for t in range (start , n ):
193- ar_term = (np .dot (self .phi , diff_data [t - self .p :t ][::- 1 ])
194- if self .p > 0 else 0.0 )
195- ma_term = (np .dot (self .theta , errors [t - self .q :t ][::- 1 ])
196- if self .q > 0 else 0.0 )
197+ ar_term = (
198+ np .dot (self .phi , diff_data [t - self .p : t ][::- 1 ])
199+ if self .p > 0
200+ else 0.0
201+ )
202+ ma_term = (
203+ np .dot (self .theta , errors [t - self .q : t ][::- 1 ])
204+ if self .q > 0
205+ else 0.0
206+ )
197207 preds [t ] = self .c + ar_term + ma_term
198208 errors [t ] = diff_data [t ] - preds [t ]
199209
@@ -231,7 +241,7 @@ def _fit_gradient_descent(self, diff_data: NDArray[np.float64], start: int) -> N
231241 self .n_train = n # Ensure n_train is assigned as an integer
232242 sigma_term = np .sum (self .errors [start :] ** 2 ) / m
233243 self .sigma_err = float (np .sqrt (sigma_term ))
234- msg = f"Fitted params (GD): phi={ self .phi } ,theta={ self .theta } ,c={ self .c :.6f} \n "
244+ msg = f"Fitted params (GD): phi={ self .phi } ,theta={ self .theta } ,c={ self .c :.6f} \n "
235245 print (msg )
236246
237247 def _fit_optimization (self , diff_data : NDArray [np .float64 ], start : int ) -> None :
@@ -249,8 +259,8 @@ def _fit_optimization(self, diff_data: NDArray[np.float64], start: int) -> None:
249259 # Must find 'params' that make output of this function as small as possible.
250260 def sse_objective (params ):
251261 # Unpack the parameters from the single array the optimizer uses.
252- phi = params [:self .p ] if self .p > 0 else np .array ([])
253- theta = params [self .p : self .p + self .q ] if self .q > 0 else np .array ([])
262+ phi = params [: self .p ] if self .p > 0 else np .array ([])
263+ theta = params [self .p : self .p + self .q ] if self .q > 0 else np .array ([])
254264 c = params [- 1 ]
255265
256266 # Calculate the errors for these parameters.
@@ -265,18 +275,22 @@ def sse_objective(params):
265275 result = minimize (
266276 sse_objective ,
267277 init_params ,
268- method = ' L-BFGS-B' ,
269- options = {"maxiter" : 5000 , "ftol" : 1e-9 }
278+ method = " L-BFGS-B" ,
279+ options = {"maxiter" : 5000 , "ftol" : 1e-9 },
270280 )
271281
272282 # Once it's done, unpack the best parameters it found.
273283 best_params = result .x
274- self .phi = best_params [:self .p ] if self .p > 0 else np .array ([])
275- self .theta = best_params [self .p :self .p + self .q ] if self .q > 0 else np .array ([])
284+ self .phi = best_params [: self .p ] if self .p > 0 else np .array ([])
285+ self .theta = (
286+ best_params [self .p : self .p + self .q ] if self .q > 0 else np .array ([])
287+ )
276288 self .c = float (best_params [- 1 ])
277289
278290 # Recalculate final errors with these optimal parameters and store everything.
279- _ ,final_errors = self ._compute_residuals (diff_data ,self .phi ,self .theta ,self .c )
291+ _ , final_errors = self ._compute_residuals (
292+ diff_data , self .phi , self .theta , self .c
293+ )
280294 self .errors = final_errors # Ensure errors is assigned correctly
281295 self .diff_data = diff_data # Ensure diff_data is assigned correctly
282296 self .n_train = n # Ensure n_train is assigned as an integer
@@ -285,11 +299,7 @@ def sse_objective(params):
285299 self .sigma_err = float (np .sqrt (sigma_term / denom ))
286300
287301 # Format and print results
288- params = {
289- 'phi' : self .phi ,
290- 'theta' : self .theta ,
291- 'c' : f"{ self .c :.6f} "
292- }
302+ params = {"phi" : self .phi , "theta" : self .theta , "c" : f"{ self .c :.6f} " }
293303 msg = "Fitted params (Opt): phi={phi}, theta={theta}, c={c}\n "
294304 print (msg .format (** params ))
295305
@@ -325,12 +335,10 @@ def forecast(
325335 for _ in range (steps ):
326336 # AR part: Use the last 'p' values (from previous y-values).
327337 ar_slice = slice (- self .p , None )
328- ar_term = (np .dot (self .phi , diff_data [ar_slice ][::- 1 ])
329- if self .p > 0 else 0.0 )
338+ ar_term = np .dot (self .phi , diff_data [ar_slice ][::- 1 ]) if self .p > 0 else 0.0
330339 # MA part:Use the last 'q' errors(from prediction errors).
331340 ma_slice = slice (- self .q , None )
332- ma_term = (np .dot (self .theta , errors [ma_slice ][::- 1 ])
333- if self .q > 0 else 0.0 )
341+ ma_term = np .dot (self .theta , errors [ma_slice ][::- 1 ]) if self .q > 0 else 0.0
334342
335343 # The next predicted value (on the differenced scale).
336344 next_diff_forecast = self .c + ar_term + ma_term
@@ -385,7 +393,7 @@ def forecast(
385393 forecast = model .forecast (forecast_steps , simulate_errors = True )
386394
387395 # Compare forecast to the actual data.
388- true_future = data [train_end : train_end + forecast_steps ]
396+ true_future = data [train_end : train_end + forecast_steps ]
389397 rmse = np .sqrt (np .mean ((np .array (forecast ) - true_future ) ** 2 ))
390398 print (f"Forecast RMSE: { rmse :.6f} \n " )
391399
@@ -405,7 +413,7 @@ def forecast(
405413 color = "gray" ,
406414 linestyle = ":" ,
407415 alpha = 0.7 ,
408- label = f"Train/Test { split_label } "
416+ label = f"Train/Test { split_label } " ,
409417 )
410418 plt .xlabel ("Time Step" )
411419 plt .ylabel ("Value" )
0 commit comments