From 6cd6b53a6d915ebeb5e4812b11ff0b4b651d52da Mon Sep 17 00:00:00 2001 From: Rick Hegeman Date: Thu, 29 Nov 2018 15:54:00 +0100 Subject: [PATCH 1/3] Python enkf which can use observers and models from Java and Python --- py_openda/README_ENKF.md | 182 +++++++++ py_openda/examples/py_kalman/README.txt | 1 + py_openda/examples/py_kalman/enkf_swan.py | 102 +++++ .../examples/py_kalman/py_openda/__init__.py | 0 .../py_openda/algorithms/__init__.py | 0 .../py_openda/algorithms/ensemble_kalman.py | 99 +++++ .../costFunctions/CsvStochObserver.py | 84 +++++ .../GenericEnsembleKalmanFilter.py | 211 +++++++++++ .../py_openda/costFunctions/JObjects.py | 355 ++++++++++++++++++ .../costFunctions/LorenzStochModelFactory.py | 53 +++ .../costFunctions/LorenzStochModelInstance.py | 166 ++++++++ .../py_openda/costFunctions/__init__.py | 0 .../py_kalman/py_openda/utils/__init__.py | 0 .../py_kalman/py_openda/utils/py4j_utils.py | 206 ++++++++++ py_openda/examples/py_kalman/setup.py | 10 + 15 files changed, 1469 insertions(+) create mode 100644 py_openda/README_ENKF.md create mode 100644 py_openda/examples/py_kalman/README.txt create mode 100644 py_openda/examples/py_kalman/enkf_swan.py create mode 100644 py_openda/examples/py_kalman/py_openda/__init__.py create mode 100644 py_openda/examples/py_kalman/py_openda/algorithms/__init__.py create mode 100644 py_openda/examples/py_kalman/py_openda/algorithms/ensemble_kalman.py create mode 100644 py_openda/examples/py_kalman/py_openda/costFunctions/CsvStochObserver.py create mode 100644 py_openda/examples/py_kalman/py_openda/costFunctions/GenericEnsembleKalmanFilter.py create mode 100644 py_openda/examples/py_kalman/py_openda/costFunctions/JObjects.py create mode 100644 py_openda/examples/py_kalman/py_openda/costFunctions/LorenzStochModelFactory.py create mode 100644 py_openda/examples/py_kalman/py_openda/costFunctions/LorenzStochModelInstance.py create mode 100644 py_openda/examples/py_kalman/py_openda/costFunctions/__init__.py create mode 100644 py_openda/examples/py_kalman/py_openda/utils/__init__.py create mode 100644 py_openda/examples/py_kalman/py_openda/utils/py4j_utils.py create mode 100644 py_openda/examples/py_kalman/setup.py diff --git a/py_openda/README_ENKF.md b/py_openda/README_ENKF.md new file mode 100644 index 000000000..815610c8d --- /dev/null +++ b/py_openda/README_ENKF.md @@ -0,0 +1,182 @@ +## Using the main program +The main program can be found at `enkf_swan.py`. This program can run an ensemble kalman filter simulation based on a `.oda` file along with its configuration `.xml` files. This is done by calling the function `main(input_string, observation_location=0)`, which uses the absolute file path given by `input_string` for configuration and plots the results at the location given by `observation_location`. The function also produces a tuple containing two numpy arrays with results; one from a simulation with ensemble kalman filtering and one without any data assimilation. The results are taken between the prediction step and the update step. + +## Inserting your own model or observer class +If you want to use a different model factory, stochastic observer or time class for the simulation, you have to change the relevant import statement in `GenericEnsembleKalmanFilter.py` to insert the class into the algorithm. The algorithm should have no problems with using a different class, assuming that your class has the same methods as the default one and assuming it is able to communicate well enough with the other classes that you are planning on using for the algorithm. The data type of the output of some of the methods that produce vectors are generic, meaning that as long they are either a Python list, a 1D numpy array, a java array or an OpenDA IVector, the `GenericEnsembleKalmanFilter` will convert the output to something usable. If your output uses a different data type, you must convert it yourself to one of those four options. This can either be done premptively at the end of your method, or you can include the data type in the `input_to_*` functions in `py4j_utils.py`. If your data type is a java class, be sure to use the IVector for an example on how to handle class recognition with py4j. For more information on the requirements of the methods, be sure to read the documentation of the default objects found in `JObjects.py`. + +## The Time object +TODO: Time Object kan echt 3 keer beter. + +## Using your own model factory/instance + +When implementing a model factory, you just need a class with the following two methods: +```python +def __init__(self, config, scriptdir): + """ + :param config: dictionary used for configuration. + :param scriptdir: location of the main .oda file. + """ + +def get_instance(self, noise_config, main_or_ens): + """ + Create an instance of the stochastic Model. + + :param noise_config: dictionary as given by EnkfAlgorithm.xml for + the noise configuration. + :param main_or_ens: determines the ouput level of the model. + :return: the stochastic Model instance. + """ +``` + +* `__init__(self, config, scriptdir)` initializes the object. Here `config` is the stochModelFactory element of the main `.oda` configuration file as a dictionary. `scriptdir` is the directory containing the `.oda` file. + +TODO: main_or_ens even uitvogelen. + +* `get_instance(self, noise_config, main_or_ens)` returns an instance of the coresponding model. `nosie_config` is a python dictionary containing model noise configuration. It has the keys `@stochInit`, `@stochParameter` and `@stochForcing`. Note that this is the only point in the algorithm where you will call the `__init__` method of your model instance, so you are free to give it whatever input you need. + +A model instance requires the following methods: + +```python +def get_time_horizon(self): + """ + Get the computational time horizon of the model (begin and end time). + + :return: the time horizon (containing begin and end time). + """ + +def get_current_time(self): + """ + Get the model instance's current simulation time stamp. + + :return: The model's current simulation time stamp. + """ + +def announce_observed_values(self, descriptions): + """ + Tells model that it can expect to be asked for model values + corresponding to the observations + described. The model can make arrangement to save these values. The + method compute run over a long interval at once, + not stopping at each time with observations. + This is meant to increase the performance especially of calibration + algorithms. + + :param descriptions: an ObservationDescriptions object with meta + data for the observations. + :return: + """ + +def compute(self, time): + """ + Let the stochastic model instance compute to the requested target time stamp. + This function can not be used to go back in time. + + :param time: Time to compute to. + :return: + """ + +def get_observations(self, descriptions): + """ + Get model values corresponding to the descriptions. + + :param descriptions: An ObservationDescriptions object with meta + data for the observations + :return: python list with the model values corresponding to the descriptions + """ + +def update_state(self, state_array, main_or_ens): + """ + Update the state vector of the model. + + :param state_array: numpy array used to update the model state. + :main_or_ens: "main" for updating the main model, "ens" for ensemble members. + :return: + """ +``` + +* `get_time_horizon(self)` should return a time object spanning the start and end of the simulation. + +* `get_current_time(self)` returns the time up to which the state vector has been calculated. + +* `announce_observed_values(self, descriptions)` function which can be used to make sure that the model will save the observations described by the descriptions. This function does get called by the main algorithm, but it does not have to do anything if your implementation does not need it to. + +* `compute(self, time)` runs the model until the given time. + +* `get_observations(self, descriptions)` returns values of the state vector that correspond to the given observation descriptions. These descriptions are given by the stoch observer, so pay special attention when trying to use them. + +* `update_state(self, state_array, main_or_ens)` changes the state vector based on the input `state_array`. Note that the input is a little different depending on whether the model to be updated is an ensemble member or the main model. For an ensemblemember `state_array` should be added to the state vector, while for the main model it is supposed to replace the state vector. This is controlled by the `main_or_ens` argument, which is a string equal to either "main" or "ens". + +## Using your own stoch observer +When creating your own stoch observer, the following methods are required: +```python +def __init__(self, config, scriptdir): + """ + :param config: dictionary used for configuration. + :param scriptdir: location of the main .oda file. + """ + +def create_selection(self, model_span): + """ + Create a new observer containing a selection of the present observer + based on the given time span. + + :param model_span: time span with selection. + :return: stochastic observer containing the required selection. + """ + +def get_times(self): + """ + Get all different times in increasing order. There is at least + one observation for each time. + + :return: some type of vector containing the times + """ + +def get_count(self): + """ + Total number of observations. + + :return: the number of observations. + """ + +def get_observation_descriptions(self): + """ + Get the observation descriptions. + + :return: observation descriptions which are compatible with the + used model instance + """ + +def get_sqrt_covariance(self): + """ + Get the covariance matrix for the stochastic observations. + + :return: the covariance matrix as numpy array. + """ + +def get_realizations(self): + """ + Get realization values for all observations, for one ensemble member. + + :return: the realizations. + """ +``` + +TODO: Misschien leuk om init ook met alleen een xml file als input te doen? + +* `__init__(self, config=None, scriptdir=None)` initializes the object. Here `config` is the stochObserver element of the main `.oda` configuration file as a dictionary. `scriptdir` is the directory containing the `.oda` file. + +* `create_selection(self, model_span)` creates a new stoch observer, limited to the span given by the time object `model_span`. The class of the output should be the same as the class of `self`. + +* `get_times(self)` lists all the times where there are observations available. The output will automatically be converted to a python list of time objects. + +* `get_count(self)` simply returns an integer indicating the number of measurements available. + +TODO: java observation description maken! + +* `get_observation_descriptions(self)` returns the observation descriptions that the model instance can use to determine which elements of the state vector corresponds to quantities that are observed. Keep in mind that there is no utility routine for creating a java observation description from python, so for now you cannot combine a python observer with a java model. + +* `get_sqrt_covariance(self)` generates an np array representing the squared covariance matrix. Note that you first have to convert the output yourself. For this you can use the utility routine `j_vector_array_to_np_array(j_sqrt)` if you have a java vector array. + +* `get_realizations(self)` creates realizations of the observed values using the mean and the standarddeviation of the measurement. The output will be converted automatically. + diff --git a/py_openda/examples/py_kalman/README.txt b/py_openda/examples/py_kalman/README.txt new file mode 100644 index 000000000..2e6d3ea65 --- /dev/null +++ b/py_openda/examples/py_kalman/README.txt @@ -0,0 +1 @@ +Description of py_openda library goes here \ No newline at end of file diff --git a/py_openda/examples/py_kalman/enkf_swan.py b/py_openda/examples/py_kalman/enkf_swan.py new file mode 100644 index 000000000..f1260fb14 --- /dev/null +++ b/py_openda/examples/py_kalman/enkf_swan.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Algorithm for data assimilation using an ensemble kalman filter. Uses an +observer and a model factory from Java but the algorithm resides in Python. +To run the algorithm remember to first start the server : oda_py4j.sh + +Created on Thu Nov 8 15:57:29 2018 + +@author: hegeman +""" + + +import os +from time import time as tm +import numpy as np +import matplotlib.pyplot as plt +import xmlschema +from py_openda.costFunctions.GenericEnsembleKalmanFilter import GenericEnsembleKalmanFilter +from py_openda.algorithms.ensemble_kalman import kalman_algorithm, no_filter + +#TODO: xmlschema is al gauw veel te streng :/ (t.o.v. Java tenminste) + + + +input_string = '/v3/Stage/Rick/openda/openda_public/course/exercise_lorenz_3var_part1/simulation_ensemble.oda' +#input_string = '/v3/Stage/Rick/openda/openda_public/examples/model_swan/kalman_twin_windbound/enkf_wind_bound.oda' +#input_string = '/v3/Stage/Rick/openda/openda_public/course/exercise_double_pendulum_part2/enkf.oda' +#input_string = '/v3/Stage/Rick/openda/openda_public/core/tests/simple_oscillator/Enkf.oda' + +def main(input_string, observation_location=0): + """ + Main function that runs an ensemble kalman filter as described by the .oda file in input_string. + yields the pred_f_central for a filtered and an unfiltered simulation and plots the + results at one observation location. + + :param input_string: absolute file path of the .oda file. + :param observation_location: index of the observation location you want to plot (default 0). + + :return results: numpy array containing the results for the filtered experiment. + :return no_results: numpy array containing the results for the unfiltered experiment. + """ + os.chdir(input_string.rsplit('/', 1)[0]) + + scriptdir = os.getcwd() + + main_schema = xmlschema.XMLSchema('http://schemas.openda.org/openDaApplication.xsd') + alg_schema = xmlschema.XMLSchema('http://schemas.openda.org/algorithm/enkf.xsd') +# alg_schema = xmlschema.XMLSchema('http://schemas.openda.org/algorithm/sequentialEnsembleAlgorithm.xsd') + main_config = main_schema.decode(input_string.rsplit('/', 1)[1]) + + + +# alg_config = alg_schema.decode('%s/%s' % (main_config.get('algorithm').get('workingDirectory'), +# main_config.get('algorithm').get('configString'))) + +# ensembleModel@stochParameter=false +# ensembleModel@stochForcing=true +# ensembleModel@stochInit=true + #FIXME: Defaults van Java zijn niet de defaults voor xsd?????????????? + alg_config = {'ensembleSize': 50, 'mainModel':{'@stochParameter':False, '@stochForcing':False, '@stochInit':False}, + 'ensembleModel': {'@stochParameter':False, '@stochForcing':True, '@stochInit':True} } + +# n_ensemble = 3 + n_ensemble = alg_config.get('ensembleSize') + + + main_class = GenericEnsembleKalmanFilter(n_ensemble, alg_config, main_config, scriptdir) + n_obs = main_class.get_n_observations() + +# n_steps = main_class.get_n_times()-3 + n_steps = 50 + t = main_class.get_timeline()[:n_steps+1] + t = [(time - t[0])*24 for time in t] + results = np.zeros((n_steps+1, n_obs)) + for j in range(n_steps): + print(j) + start = tm() + results[j, :] = kalman_algorithm(main_class) + end = tm() + print(end-start) + + results[-1, :] = no_filter(main_class) + plt.plot(t, results[:, observation_location]) + + compare_class = GenericEnsembleKalmanFilter(1, alg_config, main_config, scriptdir) + no_results = np.zeros((n_steps+1, n_obs)) + for j in range(n_steps+1): + print(j) + start = tm() + no_results[j, :] = no_filter(compare_class) + end = tm() + print(end-start) + + plt.plot(t, no_results[:, observation_location]) + plt.legend(("EnKF", "no_filter")) + plt.ylabel("x_f_central") + plt.xlabel("t in hours") + return(results, no_results) + +if __name__ == "__main__": + (results, no_results) = main(input_string) diff --git a/py_openda/examples/py_kalman/py_openda/__init__.py b/py_openda/examples/py_kalman/py_openda/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/py_openda/examples/py_kalman/py_openda/algorithms/__init__.py b/py_openda/examples/py_kalman/py_openda/algorithms/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/py_openda/examples/py_kalman/py_openda/algorithms/ensemble_kalman.py b/py_openda/examples/py_kalman/py_openda/algorithms/ensemble_kalman.py new file mode 100644 index 000000000..33bb69cb6 --- /dev/null +++ b/py_openda/examples/py_kalman/py_openda/algorithms/ensemble_kalman.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Main module containing the functions used for ensemble kalman filters. + + +Created on Tue Nov 20 15:43:21 2018 + +@author: hegeman +""" +import numpy as np + +def kalman_algorithm(enkf): + """ + Main algorithm for ensemble kalman filtering. Runs the prediction step and the update step. + + :param enkf: object which houses the main model, observer, ensembles and the time. + :return: Python list corresponding to the Java result pred_f_central. + """ + enkf.next_predictions() + (predictions, mean_predicitons) = enkf.get_ensemble_vectors_state() + (observations, mean_observations) = enkf.get_ensemble_vectors_forecast() + pred_f_central = enkf.get_results() + + K = kalman_matrix(enkf, predictions, observations) + kalman_update(enkf, observations, predictions, mean_observations, mean_predicitons, K) + + return pred_f_central + +def kalman_matrix(enkf, predictions, observations): + """ + Function for generating the kalman gain from the statevector and observations. + + :param enkf: object which houses the main model, observer, ensembles and the time. + :param predictions: numpy array containing the state vectors for each ensemble member. + :param observations: numpy array containing the values of each ensemble member at the observed location. + + :return K: kalman gain + """ + sqrt_r = enkf.get_covariance_matrix() + n_observations = observations.shape[0] + n_ensemble = predictions.shape[1] + state_lenght = predictions.shape[0] + sqrt_q_min1 = (n_ensemble -1.0)**0.5 + + pred_mat = observations.copy() + pred_mat /= sqrt_q_min1 + D = np.zeros((n_observations, n_observations)) + D = pred_mat.dot(pred_mat.transpose()) + D += sqrt_r.dot(sqrt_r.transpose()) + + predictions /= sqrt_q_min1 + D_inverse = np.linalg.inv(D) + E = np.zeros((n_ensemble, n_observations)) + E = pred_mat.transpose().dot(D_inverse) + K = np.zeros((state_lenght, n_observations)) + for i in range(n_observations): + for j in range(n_ensemble): + K[:, i] += E[j, i]*predictions[:, j] + return K + +def kalman_update(enkf, observations, predictions, mean_observations, mean_predicitons, K): + """ + Function for updating the states, model and ensemble member using the kalman gain. + + :param enkf: object which houses the main model, observer, ensembles and the time. + :param observations: numpy array containing the values of each ensemble member at the observed location. + :param predictions: numpy array containing the state vectors for each ensemble member. + :param mean_observations: numpy array containing the ensemble mean at the observed location. + :param mean_observations: numpy array containing the ensemble mean of the state vector. + :param K: kalman gain + + :return: + """ + state_lenght = K.shape[0] + n_ensemble = observations.shape[1] + for i in range(n_ensemble): + innovation = enkf.get_realizations() - observations[:, i] - mean_observations.transpose() + delta = np.zeros((state_lenght, 1)) + delta[:] = K.dot(innovation.transpose()) + #FIXME: State update met delta maar dan toch de predictions hierbuiten optellen is lelijk + #en py4j specifiek, maar scheelt wel een kwart van je tijd + enkf.update_state(i, delta) + #FIXME: for loop moet ivm MemoryError. Dat is eng. + for j in range(predictions.shape[0]): + predictions[j, i] = predictions[j, i] + delta[j] + prediction_mean = np.array([np.mean(predictions, axis=1)]).transpose() + mean_predicitons + enkf.update_model(prediction_mean) + +def no_filter(enkf): + """ + Algorithm for running the model without filtering. + + :param enkf: object which houses the main model, observer, ensembles and the time. + :return pred_f_central: result corresponding to the Java result x_f_central. + """ + enkf.next_predictions() + pred_f_central = enkf.get_results() + return pred_f_central diff --git a/py_openda/examples/py_kalman/py_openda/costFunctions/CsvStochObserver.py b/py_openda/examples/py_kalman/py_openda/costFunctions/CsvStochObserver.py new file mode 100644 index 000000000..19c9f1d39 --- /dev/null +++ b/py_openda/examples/py_kalman/py_openda/costFunctions/CsvStochObserver.py @@ -0,0 +1,84 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Stoch observer for reading from .csv files, usable by the ensemble kalman filter algorithm. +Created on Mon Nov 26 10:49:24 2018 + +@author: hegeman +""" + +import os +import numpy as np +from scipy.stats import norm +import pandas as pd + +class CsvStochObserver: + """ + A stochastic observer which uses pandas to read observations from a csv file. + """ + def __init__(self, config=None, scriptdir=None, clone=None): + """ + :param config: dictionary used for configuration. + :param scriptdir: location of the main .oda file. + :param clone: if None (default), the class will initialize from configuration, + otherwise the class will be a copy of clone. + """ + if clone is None: + observer_file = os.path.join(scriptdir, config.get('workingDirectory'), config.get('configFile')) + self.data = pd.read_csv(observer_file).set_index('time') + else: + self.data = clone + + def create_selection(self, model_span): + """ + Create a new observer containing a selection of the present observer + based on the given time span. + + :param model_span: time span with selection. + :return: stochastic observer containing the required selection. + """ + time_precision = 1.1574074074074073E-5 + selection = self.data.loc[model_span.get_start()-time_precision : model_span.get_end()+time_precision] + return CsvStochObserver(clone=selection) + + def get_times(self): + """ + Get all different times in increasing order. There is at least one observation for each time. + + :return: some type of vector containing the times + """ + return self.data.index.values + + def get_count(self): + """ + Total number of observations. + + :return: the number of observations. + """ + return self.data.count()[0] + + def get_observation_descriptions(self): + """ + Get the observation descriptions. + + :return: observation descriptions which are compatible with the used model instance + """ + #FIXME: Dit later converteren ziet er heel awkward uit + return self.data + + def get_sqrt_covariance(self): + """ + Get the covariance matrix for the stochastic observations. + + :return: the covariance matrix as numpy array. + """ + return np.diag(self.data['std']) + + def get_realizations(self): + """ + Get realization values for all observations, for one ensemble member. + + :return: the realizations. + """ + return [norm(loc=mean, scale=std).rvs() for mean, + std in zip(self.data['value'], self.data['std'])] diff --git a/py_openda/examples/py_kalman/py_openda/costFunctions/GenericEnsembleKalmanFilter.py b/py_openda/examples/py_kalman/py_openda/costFunctions/GenericEnsembleKalmanFilter.py new file mode 100644 index 000000000..640f500b4 --- /dev/null +++ b/py_openda/examples/py_kalman/py_openda/costFunctions/GenericEnsembleKalmanFilter.py @@ -0,0 +1,211 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Module containing the class which houses the models and observer that will be used for +ensemble kalman filtering. +Created on Tue Nov 20 15:33:06 2018 + +@author: hegeman +""" + +import numpy as np + +#from py_openda.costFunctions.JObjects import (JModelFactory as ModelFactory, +# JStochObserver as StochObserver, +# JTime as Time) + +#from py_openda.costFunctions.JObjects import (JStochObserver as StochObserver, +# PyTime as Time) +#from py_openda.costFunctions.LorenzStochModelFactory import LorenzStochModelFactory as ModelFactory + + +from py_openda.costFunctions.JObjects import PyTime as Time +from py_openda.costFunctions.CsvStochObserver import CsvStochObserver as StochObserver +from py_openda.costFunctions.LorenzStochModelFactory import LorenzStochModelFactory as ModelFactory + +#from py_openda.costFunctions.JObjects import (JModelFactory as ModelFactory, +# PyTime as Time) +#from py_openda.costFunctions.CsvStochObserver import CsvStochObserver as StochObserver + +import py_openda.utils.py4j_utils as utils + + + +class GenericEnsembleKalmanFilter: + """ + Class which holds the models and observer that will interact with the Kalman Filter. + """ + + def __init__(self, ensemble_size, alg_config, main_config, scriptdir): + """ + :param ensemble_size: number of ensemble members you wish to use. + :param alg_config: dictionary decoded from an xml configuration file which adheres to + http://schemas.openda.org/algorithm/enkf.xsd + :param main_config: dictionary decoded from an xml configuration file which adheres to + http://schemas.openda.org/openDaApplication.xsd + :param scriptdir: location of the main .oda file. + """ + self.model_factory = ModelFactory(main_config.get('stochModelFactory'), scriptdir) + + self.observer = StochObserver(config=main_config.get('stochObserver'), scriptdir=scriptdir) + + self.main_model = self.model_factory.get_instance(alg_config.get('mainModel'), + main_or_ens="main") + + now = self.main_model.get_current_time() + self.current_time = Time(now.get_start(), now.get_end()) + + model_span = self.main_model.get_time_horizon() + model_span = Time(model_span.get_start(), model_span.get_end()) + selection = self.observer.create_selection(model_span) + self.analysis_times = utils.input_to_time_list(selection.get_times(), Time) + + + #FIXME: DIT KLOPT MISSCHIEN NIET HELEMAAL + self.this_step = -1 + if alg_config.get('analysisTimes') is not None: + if alg_config.get('analysisTimes').get('@skipAtInitialTime'): + self.this_step = 0 + + self.selection = None + self.ensemble_size = ensemble_size + self.ensemble = [None]*self.ensemble_size + for i in range(self.ensemble_size): + self.ensemble[i] = self.model_factory.get_instance(alg_config.get('ensembleModel'), + main_or_ens="ens") + + def get_n_observations(self): + """ + Total number of observations. + + :return: the number of observations. + """ + return self.observer.create_selection(self.analysis_times[self.this_step+1]).get_count() + + def get_n_times(self): + """ + Returns number of times in analysis_times + + :return: number of available time steps. + """ + return len(self.analysis_times) + + def get_timeline(self): + """ + Returns the list of time stamps where measurements took place. + + :return: Python list with all analysis times in MJD. + """ + return [time.get_mjd() for time in self.analysis_times] + + def get_covariance_matrix(self): + """ + Create a covariance matrix from the stoch observer. + + :return: numpy array of the covariance matrix. + """ + return self.selection.get_sqrt_covariance() + + def get_realizations(self): + """ + Get realizations from the stoch observer. + + :param selection: stoch observer. + :return: numpy array with realizations. + """ + return utils.input_to_np_array(self.selection.get_realizations()) + + def get_ensemble_vectors_forecast(self): + """ + Represents the values of the state of each ensemble at the observed locations. + + :return ensemble_predicted_observations: numpy array containing the ensemble states + at the observed locations, with the mean removed. + :return mean_observations: ensemble mean at those same locations. + """ + + descriptions = self.selection.get_observation_descriptions() + ensemble_predicted_observations = [None]*self.ensemble_size + for i in range(self.ensemble_size): + ensemble_predicted_observations[i] = utils.input_to_py_list(self.ensemble[i].get_observations(descriptions)) + ensemble_predicted_observations = np.array(ensemble_predicted_observations).transpose() + mean_observations = np.array([np.mean(ensemble_predicted_observations, axis=1)]).transpose() + ensemble_predicted_observations -= mean_observations + return(ensemble_predicted_observations, mean_observations) + + def get_ensemble_vectors_state(self): + """ + Returns the entire state vector for each ensemble member, as well as the ensemble mean. + + :return ensemble_states: numpy array containing the state vectors, with the mean removed. + :return ensemble_mean: ensemble mean at those same locations. + """ + ensemble_states = [None]*self.ensemble_size + for i in range(self.ensemble_size): + ensemble_states[i] = utils.input_to_py_list(self.ensemble[i].get_state()) + ensemble_states = np.array(ensemble_states).transpose() + ensemble_mean = np.array([np.mean(ensemble_states, axis=1)]).transpose() + ensemble_states -= ensemble_mean + return(ensemble_states, ensemble_mean) + + def forecast(self, time): + """ + Runs the model and all ensemble members until the given time. + + :param time: time which indicates the current time. + :return: + """ + self.main_model.compute(time) + for i in range(self.ensemble_size): + if self.selection.get_count() > 0: + self.ensemble[i].announce_observed_values(self.selection.get_observation_descriptions()) + self.ensemble[i].compute(time) + + def next_predictions(self): + """ + Advances the time at the start of the next step in the algorithm + and advances the model and ensemble members along as well. + + :return: + """ + self.this_step += 1 + time = self.analysis_times[self.this_step] + + self.selection = self.observer.create_selection(time) + if time.after(self.current_time): + if time.get_is_span(): + time = Time(time.get_mjd(), time.get_mjd()) + self.forecast(time) + self.current_time = time + + def update_state(self, i, state_array): + """ + Updates the state vector for one of the ensemble members. + + :param i: index of the relevant ensemble member. + :param state_array: numpy array containing the updated state vector. + :return: + """ + self.ensemble[i].update_state(state_array, "ens") + + def update_model(self, mean): + """ + Updates the state vector for the main model based on the ensemble members. + + :param mean: numpy array containing the new ensemble mean. + :return: + """ + self.main_model.update_state(mean, "main") + + + def get_results(self): + """ + Returns the main model state at the observed locations. + Corresponds to the x_f_central results in Java. + + :return: Python list containing the results. + """ + + descriptions = self.selection.get_observation_descriptions() + pred = self.main_model.get_observations(descriptions) + return utils.input_to_py_list(pred) diff --git a/py_openda/examples/py_kalman/py_openda/costFunctions/JObjects.py b/py_openda/examples/py_kalman/py_openda/costFunctions/JObjects.py new file mode 100644 index 000000000..92248e111 --- /dev/null +++ b/py_openda/examples/py_kalman/py_openda/costFunctions/JObjects.py @@ -0,0 +1,355 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Module containing various classes that are used by the ensemble kalman filter. +When adding your own classes be sure that they include all the methods given by the +default classes JModelFactory, JModelInstance, PyTime and JStochObserver. +Type conversion for method outputs are taken care of, so there is some flexibility in that regard. +The data type of inputs are not flexible unless the method only gets called within this script, +so be sure to carefully read the documentation of the default classes. + +Created on Tue Nov 20 15:27:05 2018 + +@author: hegeman +""" + +import os +from py4j.java_gateway import JavaGateway +import py_openda.utils.py4j_utils as utils + +gateway = JavaGateway() + + +class JModelFactory: + """ + Wrapperclass for using a stochModelFactory from Java. + """ + + def __init__(self, config, scriptdir): + """ + :param config: dictionary used for configuration. + :param scriptdir: location of the main .oda file. + """ + model_input_dir = os.path.join(scriptdir, config.get('workingDirectory')) + model_config_xml = config.get('configFile') + self.model_factory = None + model_object = 'gateway.jvm.'+config.get('@className') + exec('self.model_factory =%s()' % model_object) + utils.initialize_openda_configurable(self.model_factory, model_input_dir, model_config_xml) + + def get_instance(self, noise_config, main_or_ens): + """ + Create an instance of the stochastic Model. + + :param noise_config: dictionary as given by EnkfAlgorithm.xml for the noise configuration. + :param main_or_ens: determines the ouput level of the model. + :return: the stochastic Model instance. + """ + if main_or_ens == 'main': + output_level = gateway.jvm.org.openda.interfaces.IStochModelFactory.OutputLevel.ModelDefault + elif main_or_ens == 'ens': + output_level = gateway.jvm.org.openda.interfaces.IStochModelFactory.OutputLevel.Suppress + model = self.model_factory.getInstance(output_level) + return JModelInstance(model, noise_config, main_or_ens) + +class JStochObserver: + """ + Wrapperclass for using a stochObserver from Java. + """ + + def __init__(self, config=None, scriptdir=None, clone=None): + """ + :param config: dictionary used for configuration. + :param scriptdir: location of the main .oda file. + :param clone: if None (default), the class will initialize from configuration, otherwise + the class will be a copy of clone. + """ + if clone is None: + observer_input_dir = os.path.join(scriptdir, config.get('workingDirectory')) + observer_config_xml = config.get('configFile') + self.observer = None + observer_object = 'gateway.jvm.'+config.get('@className') + exec('self.observer =%s()' % observer_object) + utils.initialize_openda_configurable(self.observer, observer_input_dir, + observer_config_xml) + else: + self.observer = clone + + def create_selection(self, model_span): + """ + Create a new observer, containing a selection of the present observer, + based on the given time span. + + :param model_span: time span with selection. + :return: stochastic observer containing the required selection. + """ + time_precision = gateway.jvm.org.openda.utils.performance.OdaGlobSettings.getTimePrecision() + selection_span = gateway.jvm.org.openda.utils.Time(model_span.get_start(), model_span.get_end()) + selection_span = selection_span.extendInterval(time_precision) + return JStochObserver(clone=self.observer.createSelection(selection_span)) + + def get_times(self): + """ + Get all different times in increasing order. There is at least one observation for each time. + + :return: some type of vector containing the times + """ + return self.observer.getTimes() + + def get_count(self): + """ + Total number of observations. + + :return: the number of observations. + """ + return self.observer.getCount() + + def get_observation_descriptions(self): + """ + Get the observation descriptions. + + :return: observation descriptions which are compatible with the used model instance + """ + return self.observer.getObservationDescriptions() + + def get_sqrt_covariance(self): + """ + Get the covariance matrix for the stochastic observations. + + :return: the covariance matrix as numpy array. + """ + #FIXME: misschien nog niet super generiek + j_sqrt = self.observer.getSqrtCovariance().asVectorArray() + py_sqrt = utils.j_vector_array_to_np_array(j_sqrt) + return py_sqrt + + def get_realizations(self): + """ + Get realization values for all observations, for one ensemble member. + + :return: the realizations. + """ + return self.observer.getRealizations() + + +class JModelInstance: + """ + Wrapperclass for using a stochModelInstance from Java. + """ + + def __init__(self, model, noise_config, main_or_ens): + """ + :param model: model instance as provided by the factory. + :param noise_config: dictionary as given by EnkfAlgorithm.xml for the noise configuration. + :param main_or_ens: determines the ouput level of the model. + """ + self.model = model + if noise_config is None: + if main_or_ens == "main": + noise_config = {'@stochParameter':False, '@stochForcing':False, '@stochInit':False} + elif main_or_ens == "ens": + noise_config = {'@stochParameter':False, '@stochForcing':True, '@stochInit':True} + + if noise_config.get('@stochInit'): + init = self.model.getStateUncertainty() + self.model.axpyOnState(1.0, init.createRealization()) + + if noise_config.get('@stochParameter'): + pars = self.model.getParameterUncertainty().createRealization() + self.model.setParameters(pars) + + self.model.setAutomaticNoiseGeneration(noise_config.get('@stochForcing')) + + + def get_time_horizon(self): + """ + Get the computational time horizon of the model (begin and end time). + + :return: the time horizon (containing begin and end time). + """ + return JTime(self.model.getTimeHorizon()) + + def get_current_time(self): + """ + Get the model instance's current simulation time stamp. + + :return: The model's current simulation time stamp. + """ + return JTime(self.model.getCurrentTime()) + + def announce_observed_values(self, descriptions): + """ + Tells model that it can expect to be asked for model values corresponding to the observations + described. The model can make arrangement to save these values. The method compute run over a long + interval at once, not stopping at each time with observations. + This is meant to increase the performance especially of calibration algorithms. + + :param descriptions: an ObservationDescriptions object with meta data for the observations. + :return: + """ + self.model.announceObservedValues(descriptions) + + def compute(self, time): + """ + Let the stochastic model instance compute to the requested target time stamp. + This function can not be used to go back in time. + + :param time: Time to compute to. + :return: + """ + time = gateway.jvm.org.openda.utils.Time(time.get_start()) + self.model.compute(time) + + def get_observations(self, descriptions): + """ + Get model values corresponding to the descriptions. + + :param descriptions: An ObservationDescriptions object with meta data for the observations + :return: python list with the model values corresponding to the descriptions + """ + return self.model.getObservationOperator().getObservedValues(descriptions) + + def update_state(self, state_array, main_or_ens): + """ + Update the state vector of the model. + + :param state_array: numpy array used to update the model state. + :main_or_ens: "main" for updating the main model, "ens" for ensemble members. + :return: + """ + if main_or_ens == "ens": + delta = utils.input_to_j_vector(state_array) + self.model.axpyOnState(1.0, delta) + elif main_or_ens == "main": + delta_mean = utils.input_to_j_vector(state_array) + x_main = self.model.getState() + delta_mean.axpy(-1.0, x_main) + self.model.axpyOnState(1.0, delta_mean) + + + def get_state(self): + """ + Returns the state of the model. + + :return: State vector. + """ + return self.model.getState() + +class JTime: + """ + Wrapperclass for using a Time object from Java. + """ + def __init__(self, start, end=None): + """ + Note: if you want to save a Java object as a JTime, only include a start time. + + :param start: start of the time period. + :param end: end of the time period. + """ + if end is None: + self.time = gateway.jvm.org.openda.utils.Time(start) + else: + self.time = gateway.jvm.org.openda.utils.Time(start, end) + + def get_start(self): + """ + Returns the start of the time period. + + :return: start time. + """ + return self.time.getBeginMJD() + + def get_end(self): + """ + Returns the start of the time period. + + :return: start time. + """ + return self.time.getEndMJD() + + def get_is_span(self): + """ + Check whether self is a time span or a time stamp. + + :return: True if self is a time span. + """ + return self.time.isSpan() + + def after(self, other_time): + """ + Check whether self starts after other_time ends. + + :param other_time: time object to be compared + :return: True if self starts after other_time ends. + """ + return self.time.after(gateway.jvm.org.openda.utils.Time(other_time.get_start(), + other_time.get_end())) + + def get_mjd(self): + """ + Returns a time stamp in the middle of the time period. + + :return: center of time period. + """ + return self.time.getMJD() + +class PyTime: + """ + Class used for keeping track of periods of time. + """ + def __init__(self, start, end=None): + """ + :param start: start of the time period. + :param end: end of the time period. + """ + self.start = start + if end is None: + self.end = start + + else: + self.end = end + if end is None or start == end: + self.is_span = False + else: + self.is_span = True + + def get_start(self): + """ + Returns the start of the time period. + + :return: start time. + """ + return self.start + + def get_end(self): + """ + Returns the end of the time period. + + :return: end time. + """ + return self.end + + def get_is_span(self): + """ + Check whether self is a time span or a time stamp. + + :return: True if self is a time span. + """ + return self.is_span + + def after(self, other_time): + """ + Check whether self starts after other_time ends. + + :param other_time: time object to be compared + :return: True if self starts after other_time ends. + """ + return self.start > other_time.get_end() + + def get_mjd(self): + """ + Returns a time stamp in the middle of the time period. + + :return: center of time period. + """ + return 0.5*(self.start+self.end) diff --git a/py_openda/examples/py_kalman/py_openda/costFunctions/LorenzStochModelFactory.py b/py_openda/examples/py_kalman/py_openda/costFunctions/LorenzStochModelFactory.py new file mode 100644 index 000000000..93e123c80 --- /dev/null +++ b/py_openda/examples/py_kalman/py_openda/costFunctions/LorenzStochModelFactory.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Factory for making Lorenz model instances, usable by the ensemble kalman filter algorithm. +Created on Thu Nov 22 11:29:15 2018 + +@author: hegeman +""" + +import os +import xmlschema +from py_openda.costFunctions.LorenzStochModelInstance import LorenzStochModelInstance + +class LorenzStochModelFactory: + """ + Factory for making Lorenz model instances + """ + def __init__(self, config, scriptdir): + """ + :param config: dictionary used for configuration. + :param scriptdir: location of the main .oda file. + """ + xml_path = os.path.join(scriptdir, config.get('workingDirectory'), config.get('configFile')) + schema = xmlschema.XMLSchema('http://schemas.openda.org/toymodels/LorenzConfig.xsd') + self.config = schema.decode(xml_path) + + names = self.config.get('parameters').get('@names').split(',') + param_values = [float(val) for val in self.config.get('parameters').get('$').strip('[]').split(',')] + param_uncertainty = [float(val) for val in self.config.get('parameterUncertainty') + .get('$').strip('[]').split(',')] + param = dict(zip(names, param_values)) + param_uncertainty = dict(zip(names, param_uncertainty)) + + state = [float(val) for val in self.config.get('initialState').strip('[]').split(',')] + state_uncertainty = [float(val) for val in self.config.get('initialStateUncertainty') + .strip('[]').split(',')] + + sys_noise = self.config.get('systemNoise').strip('{[]}').split('],[') + sys_mean = [float(val) for val in sys_noise[0].split(',')] + sys_std = [float(val) for val in sys_noise[1].split(',')] + + span = [float(val) for val in self.config.get('simulationTimespan').strip('[]').split(',')] + self.model_attributes = (param, param_uncertainty, state, state_uncertainty, sys_mean, sys_std, span) + + def get_instance(self, noise_config, main_or_ens): + """ + Create an instance of the stochastic Model. + + :param noise_config: dictionary as given by EnkfAlgorithm.xml for the noise configuration. + :param main_or_ens: determines the ouput level of the model. + :return: the stochastic Model instance. + """ + return LorenzStochModelInstance(self.model_attributes, noise_config, main_or_ens) diff --git a/py_openda/examples/py_kalman/py_openda/costFunctions/LorenzStochModelInstance.py b/py_openda/examples/py_kalman/py_openda/costFunctions/LorenzStochModelInstance.py new file mode 100644 index 000000000..cf228c5d4 --- /dev/null +++ b/py_openda/examples/py_kalman/py_openda/costFunctions/LorenzStochModelInstance.py @@ -0,0 +1,166 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Instance of a three varible Lorenz model, usable by the ensemble kalman filter algorithm. +Created on Thu Nov 22 11:32:08 2018 + +@author: hegeman +""" + +from math import sqrt +import numpy as np +from scipy.stats import norm +from scipy.integrate import ode +from py_openda.costFunctions.JObjects import PyTime +import py_openda.utils.py4j_utils as utils + +class LorenzStochModelInstance: + """ + Instance of a three point Lorenz model. + """ + def __init__(self, model_attributes, noise_config, main_or_ens=None): + """ + :param model_attributes: attributes saved in the model factory. + :param noise_config: dictionary as given by EnkfAlgorithm.xml for the noise configuration. + :param main_or_ens: determines the ouput level of the model. + """ + (self.param, self.param_uncertainty, self.state, self.state_uncertainty, self.sys_mean, + self.sys_std, self.span) = model_attributes + + if noise_config is None: + if main_or_ens == "main": + noise_config = {'@stochParameter':False, '@stochForcing':False, '@stochInit':False} + elif main_or_ens == "ens": + noise_config = {'@stochParameter':False, '@stochForcing':True, '@stochInit':True} + if noise_config.get('@stochInit'): + realizations = [norm(loc=mean, scale=std).rvs() for mean, + std in zip(self.state, self.state_uncertainty)] + self.state = realizations.copy() + if noise_config.get('@stochParameter'): + realizations = [norm(loc=mean, scale=std).rvs() for mean, + std in zip(list(self.param.values()), + list(self.param_uncertainty.values()))] + self.param = realizations + + self.auto_noise = noise_config.get('@stochForcing') + + + self.current_time = PyTime(self.span[0]) + self.state = np.array(self.state) + + + def get_time_horizon(self): + """ + Get the computational time horizon of the model (begin and end time). + + :return: the time horizon (containing begin and end time). + """ + return PyTime(self.span[0], self.span[2]) + + def get_current_time(self): + """ + Get the model instance's current simulation time stamp. + + :return: The model's current simulation time stamp. + """ + return self.current_time + + def announce_observed_values(self, descriptions): + """ + Tells model that it can expect to be asked for model values corresponding to the observations + described. The model can make arrangement to save these values. The method compute run over a long + interval at once, not stopping at each time with observations. + This is meant to increase the performance especially of calibration algorithms. + + :param descriptions: an ObservationDescriptions object with meta data for the observations. + :return: + """ + None + + def compute(self, time): + """ + Let the stochastic model instance compute to the requested target time stamp. + This function can not be used to go back in time. + + :param time: Time to compute to. + :return: + """ + end_time = time.get_start() + solver = ode(_lorenz_function_).set_integrator('dopri5').set_f_params(list(self.param.values())) + solver = solver.set_initial_value(self.state, self.current_time.get_start()) + new_state = self.state.copy() + t = self.current_time.get_start() + t_step = self.span[1] + nsteps = round((end_time-t)/t_step) + for _ in range(nsteps): + solver.integrate(solver.t +t_step) + new_state = solver.y + if self.auto_noise: + realizations = [norm(loc=mean, scale=std).rvs() for mean, + std in zip(self.sys_mean, self.sys_std)] + new_state = new_state + sqrt(t_step)*np.array(realizations) + solver = solver.set_initial_value(new_state, solver.t) + self.current_time = PyTime(end_time) + self.state = new_state + + def get_observations(self, descriptions): + """ + Get model values corresponding to the descriptions. + + :param descriptions: An ObservationDescriptions object with meta data for the observations + :return: python list with the model values corresponding to the descriptions + """ + indeces = utils.input_to_py_descriptions(descriptions) + obs = [None]*len(indeces) + for (i, index) in enumerate(indeces): + obs[i] = self.state[int(index)].copy() + return obs + + def update_state(self, state_array, main_or_ens): + """ + Update the state vector of the model. + + :param state_array: numpy array used to update the model state. + :main_or_ens: "main" for updating the main model, "ens" for ensemble members. + :return: + """ + if main_or_ens == "ens": + delta = utils.input_to_np_array(state_array) + self.state += delta + elif main_or_ens == "main": + delta_mean = utils.input_to_np_array(state_array) + self.state = delta_mean + + + def get_state(self): + """ + Returns the state of the model. + + :return: State vector. + """ + return self.state + +def _lorenz_function_(t, x, params): + """ + Function which computes the derivative of the current state at time t. + + :param t: time. + :param x: state. + :param params: list containing the parameters sigma, rho and beta. + + :return: derivative of the state. + """ + res = [None]*3 +# beta = 2.6667 +# sigma = 10.0 +# rho = 28.0 + sigma = params[0] + rho = params[1] + beta = params[2] + # dx = sigma*(y-x) *dt + # dy = (rho*x-y-x*z)*dt + # dz = (x*y-beta*z) *dt + res[0] = sigma*(x[1] - x[0]) + res[1] = rho*x[0] - x[1] - x[0]*x[2] + res[2] = x[0]*x[1] - beta*x[2] + return res diff --git a/py_openda/examples/py_kalman/py_openda/costFunctions/__init__.py b/py_openda/examples/py_kalman/py_openda/costFunctions/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/py_openda/examples/py_kalman/py_openda/utils/__init__.py b/py_openda/examples/py_kalman/py_openda/utils/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/py_openda/examples/py_kalman/py_openda/utils/py4j_utils.py b/py_openda/examples/py_kalman/py_openda/utils/py4j_utils.py new file mode 100644 index 000000000..2cca25b55 --- /dev/null +++ b/py_openda/examples/py_kalman/py_openda/utils/py4j_utils.py @@ -0,0 +1,206 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Module of utility py4j functions for converting python lists, java arrays, +numpy arrays and OpenDA vectors, as well as initializing OpenDA configurables. + +Created on Tue Nov 20 15:16:14 2018 + +@author: hegeman +""" + +import numpy as np +import pandas as pd +from py4j.java_gateway import JavaGateway +from py4j.java_collections import JavaArray + +gateway = JavaGateway() # connect to the JVM +IVector_class = gateway.jvm.java.lang.Class.forName("org.openda.interfaces.IVector") +ITime_class = gateway.jvm.java.lang.Class.forName("org.openda.interfaces.ITime") +IObservationDescriptions_class = gateway.jvm.java.lang.Class.forName("org.openda.interfaces.IObservationDescriptions") + +def initialize_openda_configurable(openda_configurable, input_dir, config_xml): + """ + Initialise an OpenDA configurable (e.g. ModelFactory, StochObserver). + + :param openda_configurable: OpenDA object implementing IConfigurable. + :param input_dir: Working directory/input directory. + :param config_xml: Configuration file. + :return: + """ + + # Translate the input strings to java objects (File and String array) + j_input_dir = gateway.jvm.java.io.File(input_dir) + string_class = gateway.jvm.String + j_arguments = gateway.new_array(string_class, 1) + j_arguments[0] = config_xml + + # Initialize .. + openda_configurable.initialize(j_input_dir, j_arguments) + +def py_list_to_j_array(py_x): + """ + Create a java double array from python list. + + :param py_x: python list of doubles. + :return: java array of doubles copy of py_x. + """ + n_elements = len(py_x) + double_class = gateway.jvm.double + j_x = gateway.new_array(double_class, n_elements) + j_x[:] = py_x[:] + + return j_x + +def j_array_to_py_list(j_x): + """ + Create a python list from a java array. + :param j_x: java array. + :return: python list with values of j_x. + """ + + n_elements = len(j_x) + py_x = [None]*n_elements + py_x[:] = j_x[:] + return py_x + +def j_vector_array_to_np_array(j_vectors): + """ + Create a numpy array from a java vector array. + Note that this function also works for different data types, + as long as the data type has the method getValues(). + + :param j_vectors: java vector array. + :return: numpy array with values of j_vectors. + """ + + + vals = [j_array_to_py_list(vec.getValues()) for vec in j_vectors] + py_array = np.c_[[np.array(val) for val in vals]].transpose() + return py_array + +def np_array_to_j_array(np_array): + """ + Create a java double array from a 1D numpy array. + + :param np_array: 1D numpy array of doubles. + :return: java array of doubles. + """ + np_array = np_array.squeeze().tolist() + j_array = py_list_to_j_array(np_array) + return j_array + +def np_array_to_j_vector(np_array): + """ + Create a java Vector from a 1D numpy array. + + :param np_array: 1D numpy array of doubles. + :return: java Vector. + """ + j_array = np_array_to_j_array(np_array) + return gateway.jvm.org.openda.utils.Vector(j_array) + +def input_to_j_array(obj): + """ + Converts a python list, a 1D numpy array, java Array or an OpenDA vector + to a java array. + + :param obj: object of unknown type. + :return: obj as java array + """ + if isinstance(obj, list): + obj = py_list_to_j_array(obj) + elif isinstance(obj, np.ndarray): + obj = np_array_to_j_array(obj) + elif isinstance(obj, JavaArray): + None + elif IVector_class.isInstance(obj): + obj = obj.getValues() + return obj + +def input_to_py_list(obj): + """ + Converts a python list, a 1D numpy array, java Array or an OpenDA vector + to a java array. + + :param obj: object of unknown type. + :return: obj as java array + """ + if isinstance(obj, JavaArray): + obj = j_array_to_py_list(obj) + elif isinstance(obj, np.ndarray): + obj = obj.squeeze().tolist() + elif isinstance(obj, list): + None + elif IVector_class.isInstance(obj): + obj = j_array_to_py_list(obj.getValues()) + return obj + +def input_to_np_array(obj): + """ + Converts a python list, a 1D numpy array, java Array or an OpenDA vector + to a java array. + + :param obj: object of unknown type. + :return: obj as java array + """ + if isinstance(obj, list): + obj = np.array(obj) + elif isinstance(obj, JavaArray): + obj = np.array(j_array_to_py_list(obj)) + elif isinstance(obj, np.ndarray): + obj = obj.squeeze() + elif IVector_class.isInstance(obj): + obj = np.array(j_array_to_py_list(obj.getValues())) + return obj + +def input_to_j_vector(obj): + """ + Converts a python list, a 1D numpy array, java Array or an OpenDA vector + to a java array. + + :param obj: object of unknown type. + :return: obj as java array + """ + vector_class = gateway.jvm.org.openda.utils.Vector + if isinstance(obj, list): + obj = vector_class(py_list_to_j_array(obj)) + elif isinstance(obj, JavaArray): + obj = vector_class(obj) + elif isinstance(obj, np.ndarray): + obj = np_array_to_j_vector(obj) + return obj + +def input_to_time_list(obj, t_class): + """ + Converts a python list, a 1D numpy array, java Array or an OpenDA vector + containing Itime or t_class objects to a python list of t_class objects. + + :param obj: object of unknown type. + :param t_class: Time class of the desired output. + :return: obj as list of times. + """ + time_list = input_to_py_list(obj) + n_times = len(time_list) + result = [None]*n_times + for (i, time) in enumerate(time_list): + if ITime_class.isInstance(time): + result[i] = t_class(time.getBeginMJD(), time.getEndMJD()) + else: + result[i] = t_class(time) + return result + +def input_to_py_descriptions(obj): + """ + Converts a pandas DataFrame or a java observer description to a python list + of indices. + + :param obj: object of unknown type. + :return: python list of indices. + """ + if isinstance(obj, pd.DataFrame): + indeces = obj['index'].tolist() + elif IObservationDescriptions_class.isInstance(obj): + indeces = obj.getValueProperties("index") + indeces = input_to_py_list(indeces) + return indeces diff --git a/py_openda/examples/py_kalman/setup.py b/py_openda/examples/py_kalman/setup.py new file mode 100644 index 000000000..c1ad6087a --- /dev/null +++ b/py_openda/examples/py_kalman/setup.py @@ -0,0 +1,10 @@ +from setuptools import setup, find_packages +with open("README.txt", 'r') as f: + long_description = f.read() + +setup(name = 'py_openda', + version = '0.1', + packages = find_packages(), + description = ['...'], + long_description = long_description, + install_requires = ['py4j', 'numpy', 'scipy', 'pandas', 'matplotlib', 'xmlschema']) \ No newline at end of file From 2c6e56832fc5e0f4326792b583907b58b29cae1f Mon Sep 17 00:00:00 2001 From: Rick Hegeman Date: Thu, 29 Nov 2018 16:42:47 +0100 Subject: [PATCH 2/3] Updated min todo list --- py_openda/examples/py_kalman/enkf_swan.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/py_openda/examples/py_kalman/enkf_swan.py b/py_openda/examples/py_kalman/enkf_swan.py index f1260fb14..8d6d61aab 100644 --- a/py_openda/examples/py_kalman/enkf_swan.py +++ b/py_openda/examples/py_kalman/enkf_swan.py @@ -20,7 +20,7 @@ from py_openda.algorithms.ensemble_kalman import kalman_algorithm, no_filter #TODO: xmlschema is al gauw veel te streng :/ (t.o.v. Java tenminste) - +#FIXME: Kloppen mijn defaults wel???? input_string = '/v3/Stage/Rick/openda/openda_public/course/exercise_lorenz_3var_part1/simulation_ensemble.oda' @@ -51,15 +51,13 @@ def main(input_string, observation_location=0): -# alg_config = alg_schema.decode('%s/%s' % (main_config.get('algorithm').get('workingDirectory'), -# main_config.get('algorithm').get('configString'))) + alg_config = alg_schema.decode('%s/%s' % (main_config.get('algorithm').get('workingDirectory'), + main_config.get('algorithm').get('configString'))) # ensembleModel@stochParameter=false # ensembleModel@stochForcing=true # ensembleModel@stochInit=true - #FIXME: Defaults van Java zijn niet de defaults voor xsd?????????????? - alg_config = {'ensembleSize': 50, 'mainModel':{'@stochParameter':False, '@stochForcing':False, '@stochInit':False}, - 'ensembleModel': {'@stochParameter':False, '@stochForcing':True, '@stochInit':True} } + # n_ensemble = 3 n_ensemble = alg_config.get('ensembleSize') From a58c77db1cbde268df502509736f7abda5df3cf2 Mon Sep 17 00:00:00 2001 From: Rick Hegeman Date: Fri, 21 Dec 2018 16:12:41 +0100 Subject: [PATCH 3/3] Moved Python library and added TODO's --- py_openda/examples/py_kalman/enkf_swan.py | 16 ++++++++++------ .../py_kalman => }/py_openda/__init__.py | 0 .../py_openda/algorithms/__init__.py | 0 .../py_openda/algorithms/ensemble_kalman.py | 5 ++--- .../py_openda/costFunctions/CsvStochObserver.py | 1 - .../costFunctions/GenericEnsembleKalmanFilter.py | 16 +++++++++------- .../py_openda/costFunctions/JObjects.py | 2 +- .../costFunctions/LorenzStochModelFactory.py | 0 .../costFunctions/LorenzStochModelInstance.py | 0 .../py_openda/costFunctions/__init__.py | 0 .../py_kalman => }/py_openda/utils/__init__.py | 0 .../py_kalman => }/py_openda/utils/py4j_utils.py | 0 12 files changed, 22 insertions(+), 18 deletions(-) rename py_openda/{examples/py_kalman => }/py_openda/__init__.py (100%) rename py_openda/{examples/py_kalman => }/py_openda/algorithms/__init__.py (100%) rename py_openda/{examples/py_kalman => }/py_openda/algorithms/ensemble_kalman.py (94%) rename py_openda/{examples/py_kalman => }/py_openda/costFunctions/CsvStochObserver.py (97%) rename py_openda/{examples/py_kalman => }/py_openda/costFunctions/GenericEnsembleKalmanFilter.py (93%) rename py_openda/{examples/py_kalman => }/py_openda/costFunctions/JObjects.py (99%) rename py_openda/{examples/py_kalman => }/py_openda/costFunctions/LorenzStochModelFactory.py (100%) rename py_openda/{examples/py_kalman => }/py_openda/costFunctions/LorenzStochModelInstance.py (100%) rename py_openda/{examples/py_kalman => }/py_openda/costFunctions/__init__.py (100%) rename py_openda/{examples/py_kalman => }/py_openda/utils/__init__.py (100%) rename py_openda/{examples/py_kalman => }/py_openda/utils/py4j_utils.py (100%) diff --git a/py_openda/examples/py_kalman/enkf_swan.py b/py_openda/examples/py_kalman/enkf_swan.py index 8d6d61aab..b257fd5c5 100644 --- a/py_openda/examples/py_kalman/enkf_swan.py +++ b/py_openda/examples/py_kalman/enkf_swan.py @@ -19,12 +19,11 @@ from py_openda.costFunctions.GenericEnsembleKalmanFilter import GenericEnsembleKalmanFilter from py_openda.algorithms.ensemble_kalman import kalman_algorithm, no_filter -#TODO: xmlschema is al gauw veel te streng :/ (t.o.v. Java tenminste) -#FIXME: Kloppen mijn defaults wel???? +#TODO: Some movable parts in Java might still be hard coded in Python +#TODO: Quite a few xml files might not follow the schema as well as xmlschema wants them to - -input_string = '/v3/Stage/Rick/openda/openda_public/course/exercise_lorenz_3var_part1/simulation_ensemble.oda' -#input_string = '/v3/Stage/Rick/openda/openda_public/examples/model_swan/kalman_twin_windbound/enkf_wind_bound.oda' +#input_string = '/v3/Stage/Rick/openda/openda_public/course/exercise_lorenz_3var_part1/simulation_ensemble.oda' +input_string = '/v3/Stage/Rick/openda/openda_public/examples/model_swan/kalman_twin_windbound/enkf_wind_bound.oda' #input_string = '/v3/Stage/Rick/openda/openda_public/course/exercise_double_pendulum_part2/enkf.oda' #input_string = '/v3/Stage/Rick/openda/openda_public/core/tests/simple_oscillator/Enkf.oda' @@ -40,6 +39,9 @@ def main(input_string, observation_location=0): :return results: numpy array containing the results for the filtered experiment. :return no_results: numpy array containing the results for the unfiltered experiment. """ + #TODO: Create a py4j JavaGateway and make it a global variable using __builtin__ + #This way you can use a callback server for multiple objects. + os.chdir(input_string.rsplit('/', 1)[0]) scriptdir = os.getcwd() @@ -67,7 +69,7 @@ def main(input_string, observation_location=0): n_obs = main_class.get_n_observations() # n_steps = main_class.get_n_times()-3 - n_steps = 50 + n_steps = 3 t = main_class.get_timeline()[:n_steps+1] t = [(time - t[0])*24 for time in t] results = np.zeros((n_steps+1, n_obs)) @@ -97,4 +99,6 @@ def main(input_string, observation_location=0): return(results, no_results) if __name__ == "__main__": + strt = tm() (results, no_results) = main(input_string) + print(tm()-strt) \ No newline at end of file diff --git a/py_openda/examples/py_kalman/py_openda/__init__.py b/py_openda/py_openda/__init__.py similarity index 100% rename from py_openda/examples/py_kalman/py_openda/__init__.py rename to py_openda/py_openda/__init__.py diff --git a/py_openda/examples/py_kalman/py_openda/algorithms/__init__.py b/py_openda/py_openda/algorithms/__init__.py similarity index 100% rename from py_openda/examples/py_kalman/py_openda/algorithms/__init__.py rename to py_openda/py_openda/algorithms/__init__.py diff --git a/py_openda/examples/py_kalman/py_openda/algorithms/ensemble_kalman.py b/py_openda/py_openda/algorithms/ensemble_kalman.py similarity index 94% rename from py_openda/examples/py_kalman/py_openda/algorithms/ensemble_kalman.py rename to py_openda/py_openda/algorithms/ensemble_kalman.py index 33bb69cb6..05295e92d 100644 --- a/py_openda/examples/py_kalman/py_openda/algorithms/ensemble_kalman.py +++ b/py_openda/py_openda/algorithms/ensemble_kalman.py @@ -78,10 +78,9 @@ def kalman_update(enkf, observations, predictions, mean_observations, mean_predi innovation = enkf.get_realizations() - observations[:, i] - mean_observations.transpose() delta = np.zeros((state_lenght, 1)) delta[:] = K.dot(innovation.transpose()) - #FIXME: State update met delta maar dan toch de predictions hierbuiten optellen is lelijk - #en py4j specifiek, maar scheelt wel een kwart van je tijd + #TODO: State update happens both here and within the model object. + #Looks terrible but saves 1 conversion. enkf.update_state(i, delta) - #FIXME: for loop moet ivm MemoryError. Dat is eng. for j in range(predictions.shape[0]): predictions[j, i] = predictions[j, i] + delta[j] prediction_mean = np.array([np.mean(predictions, axis=1)]).transpose() + mean_predicitons diff --git a/py_openda/examples/py_kalman/py_openda/costFunctions/CsvStochObserver.py b/py_openda/py_openda/costFunctions/CsvStochObserver.py similarity index 97% rename from py_openda/examples/py_kalman/py_openda/costFunctions/CsvStochObserver.py rename to py_openda/py_openda/costFunctions/CsvStochObserver.py index 19c9f1d39..af98de825 100644 --- a/py_openda/examples/py_kalman/py_openda/costFunctions/CsvStochObserver.py +++ b/py_openda/py_openda/costFunctions/CsvStochObserver.py @@ -63,7 +63,6 @@ def get_observation_descriptions(self): :return: observation descriptions which are compatible with the used model instance """ - #FIXME: Dit later converteren ziet er heel awkward uit return self.data def get_sqrt_covariance(self): diff --git a/py_openda/examples/py_kalman/py_openda/costFunctions/GenericEnsembleKalmanFilter.py b/py_openda/py_openda/costFunctions/GenericEnsembleKalmanFilter.py similarity index 93% rename from py_openda/examples/py_kalman/py_openda/costFunctions/GenericEnsembleKalmanFilter.py rename to py_openda/py_openda/costFunctions/GenericEnsembleKalmanFilter.py index 640f500b4..38a5d3306 100644 --- a/py_openda/examples/py_kalman/py_openda/costFunctions/GenericEnsembleKalmanFilter.py +++ b/py_openda/py_openda/costFunctions/GenericEnsembleKalmanFilter.py @@ -10,18 +10,20 @@ import numpy as np -#from py_openda.costFunctions.JObjects import (JModelFactory as ModelFactory, -# JStochObserver as StochObserver, -# JTime as Time) +#TODO: Use exec() to choose imports + +from py_openda.costFunctions.JObjects import (JModelFactory as ModelFactory, + JStochObserver as StochObserver, + JTime as Time) #from py_openda.costFunctions.JObjects import (JStochObserver as StochObserver, # PyTime as Time) #from py_openda.costFunctions.LorenzStochModelFactory import LorenzStochModelFactory as ModelFactory -from py_openda.costFunctions.JObjects import PyTime as Time -from py_openda.costFunctions.CsvStochObserver import CsvStochObserver as StochObserver -from py_openda.costFunctions.LorenzStochModelFactory import LorenzStochModelFactory as ModelFactory +#from py_openda.costFunctions.JObjects import PyTime as Time +#from py_openda.costFunctions.CsvStochObserver import CsvStochObserver as StochObserver +#from py_openda.costFunctions.LorenzStochModelFactory import LorenzStochModelFactory as ModelFactory #from py_openda.costFunctions.JObjects import (JModelFactory as ModelFactory, # PyTime as Time) @@ -61,7 +63,7 @@ def __init__(self, ensemble_size, alg_config, main_config, scriptdir): self.analysis_times = utils.input_to_time_list(selection.get_times(), Time) - #FIXME: DIT KLOPT MISSCHIEN NIET HELEMAAL + #TODO: This part might not be correct! self.this_step = -1 if alg_config.get('analysisTimes') is not None: if alg_config.get('analysisTimes').get('@skipAtInitialTime'): diff --git a/py_openda/examples/py_kalman/py_openda/costFunctions/JObjects.py b/py_openda/py_openda/costFunctions/JObjects.py similarity index 99% rename from py_openda/examples/py_kalman/py_openda/costFunctions/JObjects.py rename to py_openda/py_openda/costFunctions/JObjects.py index 92248e111..4515c94aa 100644 --- a/py_openda/examples/py_kalman/py_openda/costFunctions/JObjects.py +++ b/py_openda/py_openda/costFunctions/JObjects.py @@ -118,7 +118,7 @@ def get_sqrt_covariance(self): :return: the covariance matrix as numpy array. """ - #FIXME: misschien nog niet super generiek + #TODO: This part is not very generic! j_sqrt = self.observer.getSqrtCovariance().asVectorArray() py_sqrt = utils.j_vector_array_to_np_array(j_sqrt) return py_sqrt diff --git a/py_openda/examples/py_kalman/py_openda/costFunctions/LorenzStochModelFactory.py b/py_openda/py_openda/costFunctions/LorenzStochModelFactory.py similarity index 100% rename from py_openda/examples/py_kalman/py_openda/costFunctions/LorenzStochModelFactory.py rename to py_openda/py_openda/costFunctions/LorenzStochModelFactory.py diff --git a/py_openda/examples/py_kalman/py_openda/costFunctions/LorenzStochModelInstance.py b/py_openda/py_openda/costFunctions/LorenzStochModelInstance.py similarity index 100% rename from py_openda/examples/py_kalman/py_openda/costFunctions/LorenzStochModelInstance.py rename to py_openda/py_openda/costFunctions/LorenzStochModelInstance.py diff --git a/py_openda/examples/py_kalman/py_openda/costFunctions/__init__.py b/py_openda/py_openda/costFunctions/__init__.py similarity index 100% rename from py_openda/examples/py_kalman/py_openda/costFunctions/__init__.py rename to py_openda/py_openda/costFunctions/__init__.py diff --git a/py_openda/examples/py_kalman/py_openda/utils/__init__.py b/py_openda/py_openda/utils/__init__.py similarity index 100% rename from py_openda/examples/py_kalman/py_openda/utils/__init__.py rename to py_openda/py_openda/utils/__init__.py diff --git a/py_openda/examples/py_kalman/py_openda/utils/py4j_utils.py b/py_openda/py_openda/utils/py4j_utils.py similarity index 100% rename from py_openda/examples/py_kalman/py_openda/utils/py4j_utils.py rename to py_openda/py_openda/utils/py4j_utils.py