Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 83 additions & 12 deletions pysteps/nowcasts/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ def nowcast_main_loop(
extrap_method,
func,
extrap_kwargs=None,
motion_field_general=None,
velocity_pert_gen=None,
params=None,
ensemble=False,
Expand All @@ -291,8 +292,9 @@ def nowcast_main_loop(
Array of shape (m,n) containing the most recently observed precipitation
field.
velocity : array_like
Array of shape (2,m,n) containing the x- and y-components of the
advection field.
Array of shape (2,m,n) or (t,2,m,n) containing the x- and y-components of the
advection field. If time-varying, the first dimension must match the number
of forecast timesteps.
state : object
The initial state of the nowcast model.
timesteps : int or list of floats
Expand All @@ -314,6 +316,9 @@ def nowcast_main_loop(
extrap_kwargs : dict, optional
Optional dictionary containing keyword arguments for the extrapolation
method. See the documentation of pysteps.extrapolation.
motion_field_general : array_like, optional
Optional general motion field used to update the input velocity field
when it is constant. Ignored if velocity is time-varying.
velocity_pert_gen : list, optional
Optional list of functions that generate velocity perturbations. The
length of the list is expected to be n_ens_members. The functions
Expand Down Expand Up @@ -342,6 +347,7 @@ def nowcast_main_loop(
True, return a pair, where the second element is the total computation
time in the loop.
"""

precip_forecast_out = None

timesteps, original_timesteps, timestep_type = create_timestep_range(timesteps)
Expand All @@ -354,7 +360,10 @@ def nowcast_main_loop(
displacement = None
t_prev = 0.0
t_total = 0.0


# check if velocity input is constant (2, m, n) or time varying (t, 2, m, n)
constant_velocity = velocity.ndim == 3

# initialize the extrapolator
extrapolator = extrapolation.get_method(extrap_method)

Expand Down Expand Up @@ -405,7 +414,20 @@ def nowcast_main_loop(
# call the function to iterate the integer-timestep part of the model
# for one time step
precip_forecast_new, state_new = func(state_cur, params)

if constant_velocity:
if motion_field_general is None:
velocity_updated = velocity
else:
velocity_updated = update_motion_field(
velocity,
motion_field_general,
extrapolator
)
else:
velocity_updated = velocity[t]


if not ensemble:
precip_forecast_new = precip_forecast_new[np.newaxis, :]

Expand Down Expand Up @@ -437,18 +459,17 @@ def nowcast_main_loop(
precip_forecast_out_cur = [
None for _ in range(precip_forecast_ip.shape[0])
]

def worker1(i):
extrap_kwargs_ = extrap_kwargs.copy()
extrap_kwargs_["displacement_prev"] = displacement[i]
extrap_kwargs_["allow_nonfinite_values"] = (
True if np.any(~np.isfinite(precip_forecast_ip[i])) else False
)


velocity_ = velocity_updated
if velocity_pert_gen is not None:
velocity_ = velocity + velocity_pert_gen[i](t_total)
else:
velocity_ = velocity
velocity_ += velocity_pert_gen[i](t_total)

precip_forecast_ep, displacement[i] = extrapolator(
precip_forecast_ip[i],
Expand Down Expand Up @@ -489,11 +510,10 @@ def worker1(i):
def worker2(i):
extrap_kwargs_ = extrap_kwargs.copy()
extrap_kwargs_["displacement_prev"] = displacement[i]


velocity_ = velocity_updated
if velocity_pert_gen is not None:
velocity_ = velocity + velocity_pert_gen[i](t_total)
else:
velocity_ = velocity
velocity_ += velocity_pert_gen[i](t_total)

_, displacement[i] = extrapolator(
None,
Expand Down Expand Up @@ -533,6 +553,57 @@ def worker2(i):
return precip_forecast_out


def update_motion_field(motion_field, motion_field_general, extrapolator):
"""
Update a constant motion field by advecting it forward using a general motion field.

This function applies one timestep of extrapolation to each component of the input
motion field using the provided general motion field. It handles missing values
by replacing NaNs in the extrapolated output with corresponding values from the
general motion field.

If `motion_field_general` is set to the string "mean", the mean of the input
motion field is used as the general motion field for extrapolation.

Parameters
----------
motion_field : array_like
Array of shape (2, m, n) containing the x- and y-components of the motion field
to be updated.
motion_field_general : array_like or str
Array of shape (2, m, n) used to advect the input motion field forward,
or the string "mean" to use the spatial mean of the input motion field.
extrapolator : function
Extrapolation function that takes a 2D field, a motion field, and a timestep
argument, and returns a list of advected fields.

Returns
-------
updated_motion_field : ndarray
Array of shape (2, m, n) containing the updated motion field after one timestep
of advection.
"""
u0, v0 = motion_field[0], motion_field[1]

if motion_field_general == "mean":
ug = np.full_like(u0, np.nanmean(u0))
vg = np.full_like(v0, np.nanmean(v0))
motion_field_general = np.stack([ug, vg])
else:
ug, vg = motion_field_general[0], motion_field_general[1]

# Extrapolate each component and extract the 2D array
u1 = extrapolator(u0, motion_field_general, timesteps=1)[0]
v1 = extrapolator(v0, motion_field_general, timesteps=1)[0]

# Fill missing values with general motion field
u1[np.isnan(u1)] = ug[np.isnan(u1)]
v1[np.isnan(v1)] = vg[np.isnan(v1)]

return np.stack([u1, v1])



def print_ar_params(phi):
"""
Print the parameters of an AR(p) model.
Expand Down