From 75ea5c7ffd8509f677a6e7d024dd73f01d8d575b Mon Sep 17 00:00:00 2001 From: Daniel Eduardo Villarreal Jaime Date: Fri, 26 Sep 2025 14:34:30 +0200 Subject: [PATCH 1/3] Update motion field with a general motion field, so it is updated every timestep, or allow multiple timesteps of motion field as input with shape (t,2,m,n) --- pysteps/nowcasts/utils.py | 142 ++++++++++++++++++++++---------------- 1 file changed, 82 insertions(+), 60 deletions(-) diff --git a/pysteps/nowcasts/utils.py b/pysteps/nowcasts/utils.py index 8ddd3da0f..64ebfc3a7 100644 --- a/pysteps/nowcasts/utils.py +++ b/pysteps/nowcasts/utils.py @@ -270,6 +270,7 @@ def nowcast_main_loop( extrap_method, func, extrap_kwargs=None, + motion_field_general=None, velocity_pert_gen=None, params=None, ensemble=False, @@ -279,69 +280,61 @@ def nowcast_main_loop( num_workers=1, measure_time=False, ): - """Utility method for advection-based nowcast models that are applied in - the Lagrangian coordinates. In addition, this method allows the case, where - one or more components of the model (e.g. an autoregressive process) require - using regular integer time steps but the user-supplied values are irregular - or non-integer. - + """ + Utility method for advection-based nowcast models operating in Lagrangian coordinates. + + This method supports models that evolve on regular integer time steps (e.g., autoregressive + processes), while allowing forecast times to be irregular or non-integer. It handles temporal + interpolation, ensemble forecasting, velocity perturbations, and optional motion field updates. + Forecasts are extrapolated back to Eulerian coordinates, with support for parallel execution + across ensemble members. + Parameters ---------- precip : array_like - Array of shape (m,n) containing the most recently observed precipitation - field. + Array of shape (m, n) representing the most recently observed precipitation field. velocity : array_like - Array of shape (2,m,n) containing the x- and y-components of the - advection field. + Array of shape (2, m, n) or (t, 2, m, n) containing the x- and y-components of the advection field. state : object - The initial state of the nowcast model. - timesteps : int or list of floats - Number of time steps to forecast or a list of time steps for which the - forecasts are computed. The elements of the list are required to be in - ascending order. - extrap_method : str, optional - Name of the extrapolation method to use. See the documentation of - :py:mod:`pysteps.extrapolation.interface`. - ensemble : bool - Set to True to produce a nowcast ensemble. - num_ensemble_members : int - Number of ensemble members. Applicable if ensemble is set to True. - func : function - A function that takes the current state of the nowcast model and its - parameters and returns a forecast field and the new state. The shape of - the forecast field is expected to be (m,n) for a deterministic nowcast - and (n_ens_members,m,n) for an ensemble. + Initial state of the nowcast model. + timesteps : int or list of float + Number of time steps to forecast, or a list of specific forecast times (must be ascending). + extrap_method : str + Name of the extrapolation method to use. See `pysteps.extrapolation.interface`. + func : callable + Function that takes the current state and parameters, returning a forecast field and updated state. + The forecast field should be of shape (m, n) for deterministic mode or (n_ens_members, m, n) for ensemble mode. extrap_kwargs : dict, optional - Optional dictionary containing keyword arguments for the extrapolation - method. See the documentation of pysteps.extrapolation. - velocity_pert_gen : list, optional - Optional list of functions that generate velocity perturbations. The - length of the list is expected to be n_ens_members. The functions - are expected to take lead time (relative to timestep index) as input - argument and return a perturbation field of shape (2,m,n). + Additional keyword arguments for the extrapolation method. + motion_field_general : array_like, optional + General motion field used to update the input velocity dynamically. If provided, velocity is updated + once per timestep using `update_motion_field`. + velocity_pert_gen : list of callable, optional + List of functions that generate velocity perturbations. Each function takes lead time as input and + returns a perturbation field of shape (2, m, n). Required if `ensemble=True`. params : dict, optional - Optional dictionary containing keyword arguments for func. - callback : function, optional - Optional function that is called after computation of each time step of - the nowcast. The function takes one argument: the nowcast array. This - can be used, for instance, writing output files. + Dictionary of keyword arguments passed to `func`. + ensemble : bool, optional + If True, enables ensemble forecasting. + num_ensemble_members : int, optional + Number of ensemble members. Used only if `ensemble=True`. + callback : callable, optional + Function called after each nowcast timestep. Receives the nowcast array as input. return_output : bool, optional - Set to False to disable returning the output forecast fields and return - None instead. This can save memory if the intermediate results are - instead written to files using the callback function. + If False, disables returning forecast fields. Useful when output is handled via `callback`. num_workers : int, optional - Number of parallel workers to use. Applicable if a nowcast ensemble is - generated. + Number of parallel workers used for ensemble computation. measure_time : bool, optional - If set to True, measure, print and return the computation time. - + If True, measures and returns total computation time. + Returns ------- - out : list - List of forecast fields for the given time steps. If measure_time is - True, return a pair, where the second element is the total computation - time in the loop. + out : list or tuple + List of forecast fields for the requested time steps. If `measure_time=True`, returns a tuple + `(forecast_list, computation_time)`. """ + + precip_forecast_out = None timesteps, original_timesteps, timestep_type = create_timestep_range(timesteps) @@ -354,7 +347,10 @@ def nowcast_main_loop( displacement = None t_prev = 0.0 t_total = 0.0 - + + # check if velocity input is constant (2, m, n) or time varying (t, 2, m, n) + constant_velocity = velocity.ndim == 3 + # initialize the extrapolator extrapolator = extrapolation.get_method(extrap_method) @@ -405,7 +401,20 @@ def nowcast_main_loop( # call the function to iterate the integer-timestep part of the model # for one time step precip_forecast_new, state_new = func(state_cur, params) + + if constant_velocity: + if motion_field_general is None: + velocity_updated = velocity + else: + velocity_updated = update_motion_field( + velocity, + motion_field_general, + extrapolator + ) + else: + velocity_updated = velocity[t] + if not ensemble: precip_forecast_new = precip_forecast_new[np.newaxis, :] @@ -437,18 +446,17 @@ def nowcast_main_loop( precip_forecast_out_cur = [ None for _ in range(precip_forecast_ip.shape[0]) ] - + def worker1(i): extrap_kwargs_ = extrap_kwargs.copy() extrap_kwargs_["displacement_prev"] = displacement[i] extrap_kwargs_["allow_nonfinite_values"] = ( True if np.any(~np.isfinite(precip_forecast_ip[i])) else False ) - + + velocity_ = velocity_updated if velocity_pert_gen is not None: - velocity_ = velocity + velocity_pert_gen[i](t_total) - else: - velocity_ = velocity + velocity_ += velocity_pert_gen[i](t_total) precip_forecast_ep, displacement[i] = extrapolator( precip_forecast_ip[i], @@ -489,11 +497,10 @@ def worker1(i): def worker2(i): extrap_kwargs_ = extrap_kwargs.copy() extrap_kwargs_["displacement_prev"] = displacement[i] - + + velocity_ = velocity_updated if velocity_pert_gen is not None: - velocity_ = velocity + velocity_pert_gen[i](t_total) - else: - velocity_ = velocity + velocity_ += velocity_pert_gen[i](t_total) _, displacement[i] = extrapolator( None, @@ -533,6 +540,21 @@ def worker2(i): return precip_forecast_out +def update_motion_field(motion_field, motion_field_general, extrapolator): + u0, v0 = motion_field[0], motion_field[1] + ug, vg = motion_field_general[0], motion_field_general[1] + + # Extrapolate each component and extract the 2D array + u1 = extrapolator(u0, motion_field_general, timesteps=1)[0] + v1 = extrapolator(v0, motion_field_general, timesteps=1)[0] + + u1[np.isnan(u1)] = ug[np.isnan(u1)] + v1[np.isnan(v1)] = vg[np.isnan(v1)] + + # Stack into shape (2, 1000, 1000) + return np.stack([u1, v1]) + + def print_ar_params(phi): """ Print the parameters of an AR(p) model. From c938d1d5ebeee7bffd65f8ccf391909e1508761e Mon Sep 17 00:00:00 2001 From: Daniel Eduardo Villarreal Jaime Date: Fri, 26 Sep 2025 14:40:47 +0200 Subject: [PATCH 2/3] Updated descriptions for update_motion_field and nowcast_main_loop --- pysteps/nowcasts/utils.py | 122 +++++++++++++++++++++++++------------- 1 file changed, 80 insertions(+), 42 deletions(-) diff --git a/pysteps/nowcasts/utils.py b/pysteps/nowcasts/utils.py index 64ebfc3a7..25241eeab 100644 --- a/pysteps/nowcasts/utils.py +++ b/pysteps/nowcasts/utils.py @@ -280,61 +280,74 @@ def nowcast_main_loop( num_workers=1, measure_time=False, ): - """ - Utility method for advection-based nowcast models operating in Lagrangian coordinates. - - This method supports models that evolve on regular integer time steps (e.g., autoregressive - processes), while allowing forecast times to be irregular or non-integer. It handles temporal - interpolation, ensemble forecasting, velocity perturbations, and optional motion field updates. - Forecasts are extrapolated back to Eulerian coordinates, with support for parallel execution - across ensemble members. - + """Utility method for advection-based nowcast models that are applied in + the Lagrangian coordinates. In addition, this method allows the case, where + one or more components of the model (e.g. an autoregressive process) require + using regular integer time steps but the user-supplied values are irregular + or non-integer. + Parameters ---------- precip : array_like - Array of shape (m, n) representing the most recently observed precipitation field. + Array of shape (m,n) containing the most recently observed precipitation + field. velocity : array_like - Array of shape (2, m, n) or (t, 2, m, n) containing the x- and y-components of the advection field. + Array of shape (2,m,n) or (t,2,m,n) containing the x- and y-components of the + advection field. If time-varying, the first dimension must match the number + of forecast timesteps. state : object - Initial state of the nowcast model. - timesteps : int or list of float - Number of time steps to forecast, or a list of specific forecast times (must be ascending). - extrap_method : str - Name of the extrapolation method to use. See `pysteps.extrapolation.interface`. - func : callable - Function that takes the current state and parameters, returning a forecast field and updated state. - The forecast field should be of shape (m, n) for deterministic mode or (n_ens_members, m, n) for ensemble mode. + The initial state of the nowcast model. + timesteps : int or list of floats + Number of time steps to forecast or a list of time steps for which the + forecasts are computed. The elements of the list are required to be in + ascending order. + extrap_method : str, optional + Name of the extrapolation method to use. See the documentation of + :py:mod:`pysteps.extrapolation.interface`. + ensemble : bool + Set to True to produce a nowcast ensemble. + num_ensemble_members : int + Number of ensemble members. Applicable if ensemble is set to True. + func : function + A function that takes the current state of the nowcast model and its + parameters and returns a forecast field and the new state. The shape of + the forecast field is expected to be (m,n) for a deterministic nowcast + and (n_ens_members,m,n) for an ensemble. extrap_kwargs : dict, optional - Additional keyword arguments for the extrapolation method. + Optional dictionary containing keyword arguments for the extrapolation + method. See the documentation of pysteps.extrapolation. motion_field_general : array_like, optional - General motion field used to update the input velocity dynamically. If provided, velocity is updated - once per timestep using `update_motion_field`. - velocity_pert_gen : list of callable, optional - List of functions that generate velocity perturbations. Each function takes lead time as input and - returns a perturbation field of shape (2, m, n). Required if `ensemble=True`. + Optional general motion field used to update the input velocity field + when it is constant. Ignored if velocity is time-varying. + velocity_pert_gen : list, optional + Optional list of functions that generate velocity perturbations. The + length of the list is expected to be n_ens_members. The functions + are expected to take lead time (relative to timestep index) as input + argument and return a perturbation field of shape (2,m,n). params : dict, optional - Dictionary of keyword arguments passed to `func`. - ensemble : bool, optional - If True, enables ensemble forecasting. - num_ensemble_members : int, optional - Number of ensemble members. Used only if `ensemble=True`. - callback : callable, optional - Function called after each nowcast timestep. Receives the nowcast array as input. + Optional dictionary containing keyword arguments for func. + callback : function, optional + Optional function that is called after computation of each time step of + the nowcast. The function takes one argument: the nowcast array. This + can be used, for instance, writing output files. return_output : bool, optional - If False, disables returning forecast fields. Useful when output is handled via `callback`. + Set to False to disable returning the output forecast fields and return + None instead. This can save memory if the intermediate results are + instead written to files using the callback function. num_workers : int, optional - Number of parallel workers used for ensemble computation. + Number of parallel workers to use. Applicable if a nowcast ensemble is + generated. measure_time : bool, optional - If True, measures and returns total computation time. - + If set to True, measure, print and return the computation time. + Returns ------- - out : list or tuple - List of forecast fields for the requested time steps. If `measure_time=True`, returns a tuple - `(forecast_list, computation_time)`. + out : list + List of forecast fields for the given time steps. If measure_time is + True, return a pair, where the second element is the total computation + time in the loop. """ - precip_forecast_out = None timesteps, original_timesteps, timestep_type = create_timestep_range(timesteps) @@ -541,6 +554,31 @@ def worker2(i): def update_motion_field(motion_field, motion_field_general, extrapolator): + """ + Update a constant motion field by advecting it forward using a general motion field. + + This function applies one timestep of extrapolation to each component of the input + motion field using the provided general motion field. It handles missing values + by replacing NaNs in the extrapolated output with corresponding values from the + general motion field. + + Parameters + ---------- + motion_field : array_like + Array of shape (2, m, n) containing the x- and y-components of the motion field + to be updated. + motion_field_general : array_like + Array of shape (2, m, n) used to advect the input motion field forward. + extrapolator : function + Extrapolation function that takes a 2D field, a motion field, and a timestep + argument, and returns a list of advected fields. + + Returns + ------- + updated_motion_field : ndarray + Array of shape (2, m, n) containing the updated motion field after one timestep + of advection. + """ u0, v0 = motion_field[0], motion_field[1] ug, vg = motion_field_general[0], motion_field_general[1] @@ -548,10 +586,10 @@ def update_motion_field(motion_field, motion_field_general, extrapolator): u1 = extrapolator(u0, motion_field_general, timesteps=1)[0] v1 = extrapolator(v0, motion_field_general, timesteps=1)[0] + # Fill missing values with general motion field u1[np.isnan(u1)] = ug[np.isnan(u1)] v1[np.isnan(v1)] = vg[np.isnan(v1)] - - # Stack into shape (2, 1000, 1000) + return np.stack([u1, v1]) From 1cd12f4f3013907e051f4a8300bb0f5711d23317 Mon Sep 17 00:00:00 2001 From: Daniel Eduardo Villarreal Jaime Date: Fri, 26 Sep 2025 14:58:09 +0200 Subject: [PATCH 3/3] Allow to estimate motion_field_general from mean of motion_field --- pysteps/nowcasts/utils.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/pysteps/nowcasts/utils.py b/pysteps/nowcasts/utils.py index 25241eeab..12c8ac208 100644 --- a/pysteps/nowcasts/utils.py +++ b/pysteps/nowcasts/utils.py @@ -562,13 +562,17 @@ def update_motion_field(motion_field, motion_field_general, extrapolator): by replacing NaNs in the extrapolated output with corresponding values from the general motion field. + If `motion_field_general` is set to the string "mean", the mean of the input + motion field is used as the general motion field for extrapolation. + Parameters ---------- motion_field : array_like Array of shape (2, m, n) containing the x- and y-components of the motion field to be updated. - motion_field_general : array_like - Array of shape (2, m, n) used to advect the input motion field forward. + motion_field_general : array_like or str + Array of shape (2, m, n) used to advect the input motion field forward, + or the string "mean" to use the spatial mean of the input motion field. extrapolator : function Extrapolation function that takes a 2D field, a motion field, and a timestep argument, and returns a list of advected fields. @@ -580,7 +584,13 @@ def update_motion_field(motion_field, motion_field_general, extrapolator): of advection. """ u0, v0 = motion_field[0], motion_field[1] - ug, vg = motion_field_general[0], motion_field_general[1] + + if motion_field_general == "mean": + ug = np.full_like(u0, np.nanmean(u0)) + vg = np.full_like(v0, np.nanmean(v0)) + motion_field_general = np.stack([ug, vg]) + else: + ug, vg = motion_field_general[0], motion_field_general[1] # Extrapolate each component and extract the 2D array u1 = extrapolator(u0, motion_field_general, timesteps=1)[0] @@ -593,6 +603,7 @@ def update_motion_field(motion_field, motion_field_general, extrapolator): return np.stack([u1, v1]) + def print_ar_params(phi): """ Print the parameters of an AR(p) model.