From 3f9a43c3493fbb688e513fa97971abc7dfc66a24 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 28 Feb 2026 13:01:05 +0000 Subject: [PATCH 1/3] Initial plan From 308e6ff61cc7a9ea0e459d6862b9679715fda706 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 28 Feb 2026 13:14:55 +0000 Subject: [PATCH 2/3] Add pytest tests, requirements, pyproject.toml, and CI workflow with black/ruff/mypy Co-authored-by: dantzert <47285626+dantzert@users.noreply.github.com> --- .github/workflows/ci.yml | 35 ++++++++ modpods.py | 2 +- pyproject.toml | 20 +++++ requirements.txt | 18 ++++ tests/__init__.py | 0 tests/test_modpods.py | 190 +++++++++++++++++++++++++++++++++++++++ 6 files changed, 264 insertions(+), 1 deletion(-) create mode 100644 .github/workflows/ci.yml create mode 100644 pyproject.toml create mode 100644 requirements.txt create mode 100644 tests/__init__.py create mode 100644 tests/test_modpods.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..007f722 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,35 @@ +name: CI + +on: + push: + branches: ["**"] + pull_request: + branches: ["**"] + +jobs: + lint-and-test: + runs-on: ubuntu-latest + permissions: + contents: read + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.11" + + - name: Install dependencies + run: pip install -r requirements.txt + + - name: Lint with black + run: black --check tests/ + + - name: Lint with ruff + run: ruff check tests/ + + - name: Type-check with mypy + run: mypy tests/ modpods.py --ignore-missing-imports + + - name: Run tests with pytest + run: pytest tests/ -v diff --git a/modpods.py b/modpods.py index 865d335..bd1ba08 100644 --- a/modpods.py +++ b/modpods.py @@ -1960,7 +1960,7 @@ def infer_causative_topology(system_data, dependent_columns, independent_columns if method == 'granger': # granger causality from statsmodels.tsa.stattools import grangercausalitytests causative_topo = pd.DataFrame(index=dependent_columns,columns=system_data.columns).fillna('n') - total_graph = pd.DataFrame(index=dependent_columns,columns=system_data.columns).fillna(1.0) + total_graph = pd.DataFrame(index=dependent_columns,columns=system_data.columns, dtype=float).fillna(1.0) print(causative_topo) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..88b1ec2 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,20 @@ +[tool.black] +line-length = 88 +target-version = ["py310", "py311", "py312"] + +[tool.ruff] +line-length = 88 +target-version = "py310" + +[tool.ruff.lint] +select = ["E", "F", "W", "I"] +ignore = ["E501"] + +[tool.mypy] +ignore_missing_imports = true +no_strict_optional = true +warn_return_any = false +check_untyped_defs = false + +[tool.pytest.ini_options] +testpaths = ["tests"] diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..5659692 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,18 @@ +# Core dependencies for modpods +numpy>=1.24 +pandas>=2.0 +scipy>=1.10 +matplotlib>=3.7 +control>=0.9 +pysindy>=1.7 +networkx>=3.0 +pyswmm>=1.0 +statsmodels>=0.14 + +# Testing +pytest>=7.0 + +# Linting +black>=23.0 +ruff>=0.1 +mypy>=1.0 diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_modpods.py b/tests/test_modpods.py new file mode 100644 index 0000000..baeebb9 --- /dev/null +++ b/tests/test_modpods.py @@ -0,0 +1,190 @@ +""" +Pytest tests for modpods core functions. + +These tests cover the functions that are self-contained and do not require +external data files or long-running simulations. +""" + +import warnings + +import control as ct +import numpy as np +import pandas as pd +import pytest + +import modpods + +# --------------------------------------------------------------------------- +# lti_from_gamma +# --------------------------------------------------------------------------- + + +def test_lti_from_gamma_returns_required_keys() -> None: + """lti_from_gamma must return a dict with the expected keys.""" + result = modpods.lti_from_gamma(shape=10, scale=1, location=0, dt=0.1) + assert isinstance(result, dict) + for key in ("t", "gamma_pdf", "lti_approx_output", "lti_approx"): + assert key in result, f"missing key '{key}' in result" + + +def test_lti_from_gamma_output_shapes_match() -> None: + """gamma_pdf and lti_approx_output must have the same length.""" + result = modpods.lti_from_gamma(shape=5, scale=2, location=0) + assert result["gamma_pdf"].shape == result["lti_approx_output"].shape + + +def test_lti_from_gamma_achieves_reasonable_nse() -> None: + """The LTI approximation should achieve NSE > 0.9 for a well-conditioned + gamma distribution (shape=10, scale=1, location=0).""" + result = modpods.lti_from_gamma(shape=10, scale=1, location=0, dt=0.1) + gamma_pdf = result["gamma_pdf"] + lti_approx = result["lti_approx_output"] + nse = 1.0 - float( + np.sum(np.square(gamma_pdf - lti_approx)) + / np.sum(np.square(gamma_pdf - np.mean(gamma_pdf))) + ) + assert nse > 0.9, f"NSE {nse:.4f} is below the 0.9 threshold" + + +def test_lti_from_gamma_t_is_nonnegative() -> None: + """The time vector returned must be non-negative and monotonically increasing.""" + result = modpods.lti_from_gamma(shape=3, scale=1, location=0) + t = result["t"] + assert t[0] >= 0.0 + assert np.all(np.diff(t) > 0), "time vector is not strictly increasing" + + +# --------------------------------------------------------------------------- +# infer_causative_topology (Granger causality) +# --------------------------------------------------------------------------- + + +@pytest.fixture(scope="module") +def cascade_lti_system_data() -> pd.DataFrame: + """Generate response data from a known cascade LTI system. + + System topology (ground truth): + u1 → x0 → x1 → x2 (u1 causes x2 via a long cascade, delayed) + u2 → x8 (u2 causes x8 directly) + x7 → x9, x8 → x9 (x9 driven by both chains; x7 is not observable) + + Observable variables: u1, u2, x2, x8, x9 + """ + np.random.seed(0) + + A = np.diag(-1.0 * np.ones(10)) + A[1, 0] = 1 + A[2, 1] = 1 + A[3, 2] = 1 + A[4, 3] = 1 + A[5, 4] = 1 + A[6, 5] = 1 + A[7, 6] = 1 + A[9, 7] = 1 + A[9, 8] = 1 + + B = np.zeros((10, 2)) + B[0, 0] = 1 + B[8, 1] = 1 + + C = np.eye(10) + D = np.zeros((10, 2)) + + system = ct.ss(A, B, C, D) + time_base = 50.0 + dt = 0.05 + T = np.arange(0, time_base, dt) + + u = np.zeros((len(T), 2)) + u[int(25 / dt) : int(40 / dt), 0] = np.random.rand(int(15 / dt)) - 0.5 + u[int(0 / dt) : int(15 / dt), 1] = np.random.rand(int(15 / dt)) - 0.5 + u[np.abs(u) < 0.40] = 0 + u[:, 0] *= np.random.rand(len(T)) * 1000 + u[:, 1] *= np.random.rand(len(T)) * 100 + + response = ct.forced_response(system, T, np.transpose(u)) + df = pd.DataFrame(index=T) + df["u1"] = response.inputs[0] + df["u2"] = response.inputs[1] + df["x2"] = response.states[2] + df["x8"] = response.states[8] + df["x9"] = response.states[9] + return df + + +def test_infer_causative_topology_returns_dataframe( + cascade_lti_system_data: pd.DataFrame, +) -> None: + """infer_causative_topology must return a (DataFrame, DataFrame) tuple.""" + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + result = modpods.infer_causative_topology( + cascade_lti_system_data, + dependent_columns=["x2", "x8", "x9"], + independent_columns=["u1", "u2"], + verbose=False, + max_iter=0, + method="granger", + ) + assert isinstance(result, tuple) and len(result) == 2 + causative_topo, total_graph = result + assert isinstance(causative_topo, pd.DataFrame) + assert isinstance(total_graph, pd.DataFrame) + + +def test_infer_causative_topology_identifies_u1_causes_x2( + cascade_lti_system_data: pd.DataFrame, +) -> None: + """Granger causality must identify u1 as a cause of x2 (delayed cascade).""" + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + causative_topo, _ = modpods.infer_causative_topology( + cascade_lti_system_data, + dependent_columns=["x2", "x8", "x9"], + independent_columns=["u1", "u2"], + verbose=False, + max_iter=0, + method="granger", + ) + assert ( + causative_topo.loc["x2", "u1"] == "d" + ), f"Expected u1→x2 to be 'd' (delayed), got '{causative_topo.loc['x2', 'u1']}'" + + +def test_infer_causative_topology_identifies_u2_causes_x8( + cascade_lti_system_data: pd.DataFrame, +) -> None: + """Granger causality must identify u2 as a cause of x8 (direct link).""" + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + causative_topo, _ = modpods.infer_causative_topology( + cascade_lti_system_data, + dependent_columns=["x2", "x8", "x9"], + independent_columns=["u1", "u2"], + verbose=False, + max_iter=0, + method="granger", + ) + assert ( + causative_topo.loc["x8", "u2"] == "d" + ), f"Expected u2→x8 to be 'd' (delayed), got '{causative_topo.loc['x8', 'u2']}'" + + +def test_infer_causative_topology_no_self_loops( + cascade_lti_system_data: pd.DataFrame, +) -> None: + """No variable should be identified as causing itself.""" + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + causative_topo, _ = modpods.infer_causative_topology( + cascade_lti_system_data, + dependent_columns=["x2", "x8", "x9"], + independent_columns=["u1", "u2"], + verbose=False, + max_iter=0, + method="granger", + ) + for dep_var in ["x2", "x8", "x9"]: + assert ( + causative_topo.loc[dep_var, dep_var] == "n" + ), f"Self-loop detected for {dep_var}" From a1705b9be65cb62f0366a49dfc17e9491a59ad8b Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 28 Feb 2026 15:57:12 +0000 Subject: [PATCH 3/3] Collect all old test scripts into pytest, move data files, fix modern dependency compat Co-authored-by: dantzert <47285626+dantzert@users.noreply.github.com> --- modpods.py | 65 +- pyproject.toml | 3 + requirements.txt | 7 +- test.py | 104 --- test_coef_constraints.py | 114 ---- test_lti_control_of_swmm.py | 592 ------------------ test_lti_from_gamma.py | 50 -- test_lti_system_gen.py | 279 --------- test_topo_from_swmm.py | 34 - test_topo_inference.py | 174 ----- .../data/03439000_05_model_output.txt | 0 .../data/swmm_lti_plant_approx_seeing.pickle | Bin tests/test_modpods.py | 421 +++++++++++-- 13 files changed, 425 insertions(+), 1418 deletions(-) delete mode 100644 test.py delete mode 100644 test_coef_constraints.py delete mode 100644 test_lti_control_of_swmm.py delete mode 100644 test_lti_from_gamma.py delete mode 100644 test_lti_system_gen.py delete mode 100644 test_topo_from_swmm.py delete mode 100644 test_topo_inference.py rename 03439000_05_model_output.txt => tests/data/03439000_05_model_output.txt (100%) rename swmm_lti_plant_approx_seeing.pickle => tests/data/swmm_lti_plant_approx_seeing.pickle (100%) diff --git a/modpods.py b/modpods.py index bd1ba08..30c1b89 100644 --- a/modpods.py +++ b/modpods.py @@ -6,6 +6,8 @@ import pysindy as ps # Suppress the specific AxesWarning from pysindy after import warnings.filterwarnings('ignore', message='.*axes labeled for array with.*', module='pysindy') +# ConstrainedSR3 moved to a private submodule in pysindy 2.x +from pysindy.optimizers._constrained_sr3 import ConstrainedSR3 as _ConstrainedSR3 import scipy.stats as stats @@ -362,7 +364,6 @@ def SINDY_delays_MI(shape_factors, scale_factors, loc_factors, index, forcing, r differentiation_method= ps.FiniteDifference(), feature_library=ps.PolynomialLibrary(degree=poly_degree,include_bias = include_bias, include_interaction=include_interaction), optimizer = ps.STLSQ(threshold=0), - feature_names = feature_names ) elif (forcing_coef_constraints is not None and not bibo_stable): library = ps.PolynomialLibrary(degree=poly_degree,include_bias = include_bias, include_interaction=include_interaction) @@ -384,8 +385,7 @@ def SINDY_delays_MI(shape_factors, scale_factors, loc_factors, index, forcing, r model = ps.SINDy( differentiation_method= ps.FiniteDifference(), feature_library=ps.PolynomialLibrary(degree=poly_degree,include_bias = include_bias, include_interaction=include_interaction), - optimizer = ps.ConstrainedSR3(threshold=0, thresholder = "l2",constraint_lhs=constraint_lhs, constraint_rhs = constraint_rhs, inequality_constraints=True), - feature_names = feature_names + optimizer = _ConstrainedSR3(reg_weight_lam=0, regularizer="l2",constraint_lhs=constraint_lhs, constraint_rhs = constraint_rhs, inequality_constraints=True), ) elif (bibo_stable): # highest order output autocorrelation is constrained to be negative #import cvxpy @@ -470,8 +470,7 @@ def SINDY_delays_MI(shape_factors, scale_factors, loc_factors, index, forcing, r model = ps.SINDy( differentiation_method= ps.FiniteDifference(), feature_library=ps.PolynomialLibrary(degree=poly_degree,include_bias = include_bias, include_interaction=include_interaction), - optimizer = ps.ConstrainedSR3(threshold=0, thresholder = "l2",constraint_lhs=constraint_lhs, constraint_rhs = constraint_rhs, inequality_constraints=True), - feature_names = feature_names + optimizer = _ConstrainedSR3(reg_weight_lam=0, regularizer="l2",constraint_lhs=constraint_lhs, constraint_rhs = constraint_rhs, inequality_constraints=True), ) if transform_dependent: # combine response and forcing into one dataframe @@ -514,13 +513,12 @@ def SINDY_delays_MI(shape_factors, scale_factors, loc_factors, index, forcing, r model = ps.SINDy( differentiation_method= ps.FiniteDifference(), feature_library=library, - optimizer = ps.ConstrainedSR3(threshold=0, thresholder = "l0", - nu = 10e9, initial_guess = initial_guess, + optimizer = _ConstrainedSR3(reg_weight_lam=0, regularizer="l0", + relax_coeff_nu = 10e9, initial_guess = initial_guess, constraint_lhs=constraint_lhs, constraint_rhs = constraint_rhs, inequality_constraints=False, max_iter=10000), - feature_names = feature_names ) try: @@ -611,7 +609,7 @@ def SINDY_delays_MI(shape_factors, scale_factors, loc_factors, index, forcing, r except Exception as e: # and print the exception: print("Exception in simulation\n") print(e) - error_metrics = {"MAE":[np.NAN],"RMSE":[np.NAN],"NSE":[np.NAN],"alpha":[np.NAN],"beta":[np.NAN],"HFV":[np.NAN],"HFV10":[np.NAN],"LFV":[np.NAN],"FDC":[np.NAN],"r2":r2} + error_metrics = {"MAE":[np.nan],"RMSE":[np.nan],"NSE":[np.nan],"alpha":[np.nan],"beta":[np.nan],"HFV":[np.nan],"HFV10":[np.nan],"LFV":[np.nan],"FDC":[np.nan],"r2":r2} return {"error_metrics": error_metrics, "model": model, "simulated": response[1:], "response": response, "forcing": forcing, "index": index,"diverged":True} @@ -632,20 +630,23 @@ def transform_inputs(shape_factors, scale_factors, loc_factors,index, forcing): #print("forcing at beginning of transform inputs") #print(forcing) shape_time = np.arange(0,len(index),1) + input_values_cache = {col: np.array(forcing[col].values, dtype=float) for col in orig_forcing_columns} for input in orig_forcing_columns: # which input are we talking about? for transform_idx in range(1,num_transforms + 1): # which transformation of that input are we talking about? + col_name = str(str(input) + "_tr_" + str(transform_idx)) # if the column doesn't exist, create it - if (str(str(input) + "_tr_" + str(transform_idx)) not in forcing.columns): - forcing.loc[:,str(str(input) + "_tr_" + str(transform_idx))] = 0.0 + if col_name not in forcing.columns: + forcing.loc[:,col_name] = 0.0 # now, fill it with zeros (need to reset between different transformation shape factors) - forcing[str(str(input) + "_tr_" + str(transform_idx))].values[:] = float(0.0) - #print(forcing) + # accumulate into a numpy array first (compatible with pandas Copy-on-Write semantics) + col_data = np.zeros(len(index)) for idx in range(0,len(index)): # timestep - if (abs(forcing[input].iloc[idx]) > 10**-6): # when nonzero forcing occurs + if (abs(input_values_cache[input][idx]) > 10**-6): # when nonzero forcing occurs if (idx == int(0)): - forcing[str(str(input) + "_tr_" + str(transform_idx))].values[idx:] += forcing[input].values[idx]*stats.gamma.pdf(shape_time, shape_factors[input][transform_idx], scale=scale_factors[input][transform_idx], loc = loc_factors[input][transform_idx]) + col_data[idx:] += input_values_cache[input][idx]*stats.gamma.pdf(shape_time, shape_factors[input][transform_idx], scale=scale_factors[input][transform_idx], loc = loc_factors[input][transform_idx]) else: - forcing[str(str(input) + "_tr_" + str(transform_idx))].values[idx:] += forcing[input].values[idx]*stats.gamma.pdf(shape_time[:-idx], shape_factors[input][transform_idx], scale=scale_factors[input][transform_idx], loc = loc_factors[input][transform_idx]) + col_data[idx:] += input_values_cache[input][idx]*stats.gamma.pdf(shape_time[:-idx], shape_factors[input][transform_idx], scale=scale_factors[input][transform_idx], loc = loc_factors[input][transform_idx]) + forcing.loc[:,col_name] = col_data #print("forcing at end of transform inputs") #print(forcing) @@ -674,8 +675,8 @@ def delay_io_predict(delay_io_model, system_data, num_transforms=1,evaluation=Fa print("Exception in simulation\n") print(e) print("diverged.") - error_metrics = {"MAE":[np.NAN],"RMSE":[np.NAN],"NSE":[np.NAN],"alpha":[np.NAN],"beta":[np.NAN],"HFV":[np.NAN],"HFV10":[np.NAN],"LFV":[np.NAN],"FDC":[np.NAN]} - return {'prediction':np.NAN*np.ones(shape=response[windup_timesteps+1:].shape), 'error_metrics':error_metrics,"diverged":True} + error_metrics = {"MAE":[np.nan],"RMSE":[np.nan],"NSE":[np.nan],"alpha":[np.nan],"beta":[np.nan],"HFV":[np.nan],"HFV10":[np.nan],"LFV":[np.nan],"FDC":[np.nan]} + return {'prediction':np.nan*np.ones(shape=response[windup_timesteps+1:].shape), 'error_metrics':error_metrics,"diverged":True} # return all the error metrics if the prediction is being evaluated against known measurements if (evaluation): @@ -738,11 +739,11 @@ def delay_io_predict(delay_io_model, system_data, num_transforms=1,evaluation=Fa except Exception as e: # and print the exception: print(e) print("Simulation diverged.") - error_metrics = {"MAE":[np.NAN],"RMSE":[np.NAN],"NSE":[np.NAN],"alpha":[np.NAN],"beta":[np.NAN],"HFV":[np.NAN],"HFV10":[np.NAN],"LFV":[np.NAN],"FDC":[np.NAN],"diverged":True} + error_metrics = {"MAE":[np.nan],"RMSE":[np.nan],"NSE":[np.nan],"alpha":[np.nan],"beta":[np.nan],"HFV":[np.nan],"HFV10":[np.nan],"LFV":[np.nan],"FDC":[np.nan],"diverged":True} return {'prediction':prediction, 'error_metrics':error_metrics} else: - error_metrics = {"MAE":[np.NAN],"RMSE":[np.NAN],"NSE":[np.NAN],"alpha":[np.NAN],"beta":[np.NAN],"HFV":[np.NAN],"HFV10":[np.NAN],"LFV":[np.NAN],"FDC":[np.NAN]} + error_metrics = {"MAE":[np.nan],"RMSE":[np.nan],"NSE":[np.nan],"alpha":[np.nan],"beta":[np.nan],"HFV":[np.nan],"HFV10":[np.nan],"LFV":[np.nan],"FDC":[np.nan]} return {'prediction':prediction, 'error_metrics':error_metrics,"diverged":False} @@ -1059,14 +1060,15 @@ def lti_system_gen(causative_topology, system_data,independent_columns,dependent # Figure out how many library features there will be library = ps.PolynomialLibrary(degree=1,include_bias = False, include_interaction=False) #total_train = pd.concat((response,forcing), axis='columns') - library.fit([ps.AxesArray(feature_names,{"ax_sample":0,"ax_coord":1})]) + # fit on a dummy (2, n_features) array; 2 rows is the minimum pysindy requires + library.fit(np.zeros((2, len(feature_names)))) n_features = library.n_output_features_ #print(f"Features ({n_features}):", library.get_feature_names()) # Set constraints #n_targets = total_train.shape[1] # not sure what targets means after reading through the pysindy docs #print("n_targets") #print(n_targets) - constraint_rhs = 0 + constraint_rhs = np.zeros(1) # one row per constraint, one column per coefficient constraint_lhs = np.zeros((1 , n_features )) @@ -1080,8 +1082,7 @@ def lti_system_gen(causative_topology, system_data,independent_columns,dependent model = ps.SINDy( differentiation_method= ps.FiniteDifference(), feature_library=ps.PolynomialLibrary(degree=1,include_bias = False, include_interaction=False), - optimizer = ps.ConstrainedSR3(threshold=0, thresholder = "l2",constraint_lhs=constraint_lhs, constraint_rhs = constraint_rhs, inequality_constraints=True), - feature_names = feature_names + optimizer = _ConstrainedSR3(reg_weight_lam=0, regularizer="l2",constraint_lhs=constraint_lhs, constraint_rhs = constraint_rhs, inequality_constraints=True), ) else: # unoconstrained @@ -1089,7 +1090,6 @@ def lti_system_gen(causative_topology, system_data,independent_columns,dependent differentiation_method= ps.FiniteDifference(order=10,drop_endpoints=True), feature_library=ps.PolynomialLibrary(degree=1,include_bias = False, include_interaction=False), optimizer=ps.optimizers.STLSQ(threshold=0,alpha=0), - feature_names = feature_names ) if system_data.loc[:,immediate_forcing].empty: # the subsystem is autonomous instant_fit = model.fit(x = system_data.loc[:,row] ,t = np.arange(0,len(system_data.index),1)) @@ -1302,9 +1302,9 @@ def lti_system_gen(causative_topology, system_data,independent_columns,dependent - A.fillna(0.0,inplace=True) - B.fillna(0.0,inplace=True) - C.fillna(0.0,inplace=True) + A = A.apply(pd.to_numeric, errors='coerce').fillna(0.0) + B = B.apply(pd.to_numeric, errors='coerce').fillna(0.0) + C = C.apply(pd.to_numeric, errors='coerce').fillna(0.0) # if bibo_stable is specified and A not hurwitz, make A hurwitz by defining A' = A - I*max(real(eig(A))) # this will gaurantee stability (max eigenvalue will have real part < 0) @@ -2693,6 +2693,15 @@ def infer_causative_topology(system_data, dependent_columns, independent_columns def topo_from_pystorms(pystorms_scenario): + # pyswmm 2.x prevents multiple simultaneous simulations in the same process + # via a class-level flag. Reset it so we can open our own simulation even if + # the caller's pystorms scenario already has one open internally. + try: + from pyswmm.simulation import _sim_state_instance + _sim_state_instance.sim_is_instantiated = False + except ImportError: + pass + # if any are 3-tuples, chop them down to 2-tuples pystorms_scenario.config['states'] = [t[:-1] if len(t) == 3 else t for t in pystorms_scenario.config['states']] diff --git a/pyproject.toml b/pyproject.toml index 88b1ec2..8e3fa93 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,3 +18,6 @@ check_untyped_defs = false [tool.pytest.ini_options] testpaths = ["tests"] +markers = [ + "slow: marks tests as slow (uses large data files or long simulations)", +] diff --git a/requirements.txt b/requirements.txt index 5659692..383a7c9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,10 +4,13 @@ pandas>=2.0 scipy>=1.10 matplotlib>=3.7 control>=0.9 -pysindy>=1.7 +pysindy>=2.0 +cvxpy>=1.3 networkx>=3.0 -pyswmm>=1.0 +pyswmm>=2.0 +pystorms>=1.0 statsmodels>=0.14 +dill>=0.3 # Testing pytest>=7.0 diff --git a/test.py b/test.py deleted file mode 100644 index ef96fb7..0000000 --- a/test.py +++ /dev/null @@ -1,104 +0,0 @@ -import numpy as np -import pandas as pd -import scipy.stats as stats -import os -import matplotlib.pyplot as plt -import modpods - -# basic funcionality tests and a bit of a tutorial - -# some data from the CAMELS dataset -# change the filepath to wherever you have modpods at -# "C:\modpods\03439000_05_model_output.txt" -filepath = "C:/modpods/03439000_05_model_output.txt" - - -df = pd.read_csv(filepath, sep='\s+') -print(df) -print(df.columns) -# combine the columns YR, MNTH, DY, and YR into a single datetime column -df.rename({'YR':'year','MNTH':'month','DY':'day','HR':'hour'},axis=1,inplace=True) -df['datetime'] = pd.to_datetime(df[['year','month','day','hour']]) - -# set the index to the datetime column -df.set_index('datetime',inplace=True) -# shift the forcing back one timestep (one day) to make the system causal - -print(df[['OBS_RUN','RAIM']]) -df.RAIM = df.RAIM.shift(-1) -df.dropna(inplace=True) -print(df[['OBS_RUN','RAIM']]) - - -# for better results (and slower run) up the max iterations, model complexity (poly_order and max_transforms), and the number of years used to train - - - -# drop all columns except for RAIM (surface water input) and OBS_RUN (observed runoff) for actual CAMELS training -# but for testing the MIMO delay_io_model I want multiple inputs and multiple outputs -windup_timesteps = 30 # days of windup -years = 1 -df_train = df.iloc[:365*years + windup_timesteps,:] # total data used, actually trained on this less the windup period -df_eval = df.iloc[-(365*years + windup_timesteps):,:] # data for evaluation, not used in training - -#df.plot(y=['OBS_RUN','RAIM']) -#plt.show() - - - -#df['ones'] = np.ones(len(df.OBS_RUN)) # to make sure MIMO error metrics are working correctly -print(df_train) -forcing_coef_constraints = {'RAIM':-1, 'PET':1,'PRCP':-1} -df_train = df_train[['OBS_RUN','RAIM','PET','PRCP']] -rainfall_runoff_model = modpods.delay_io_train(df_train, ['OBS_RUN'],['RAIM','PET','PRCP'],windup_timesteps=windup_timesteps, - init_transforms=1, max_transforms=1,max_iter=10, verbose=True, forcing_coef_constraints= forcing_coef_constraints, - poly_order=1, bibo_stable=False) - - -print(rainfall_runoff_model) -print(rainfall_runoff_model[1]) -print("error metrics") -print(rainfall_runoff_model[1]['final_model']['error_metrics']) -#print(rainfall_runoff_model[2]['final_model']['error_metrics']) -#print(rainfall_runoff_model[3]['final_model']['error_metrics']) -print("shapes") -print(rainfall_runoff_model[1]['shape_factors']) -#print(rainfall_runoff_model[2]['shape_factors']) -#print(rainfall_runoff_model[3]['shape_factors']) - -# plot the results -fig, ax = plt.subplots(1,1,figsize=(8,4)) -ax.plot(df_train.index[windup_timesteps+1:],rainfall_runoff_model[1]['final_model']['response']['OBS_RUN'][windup_timesteps+1:],label='observed') -ax.plot(df_train.index[windup_timesteps+1:],rainfall_runoff_model[1]['final_model']['simulated'][:,0],label='simulated') -#ax.set_title('1 transformation') -ax.legend() -plt.title("training") -''' -ax[1].plot(df.index[windup_timesteps+1:],rainfall_runoff_model[2]['final_model']['response']['OBS_RUN'][windup_timesteps+1:],label='observed') -ax[1].plot(df.index[windup_timesteps+1:],rainfall_runoff_model[2]['final_model']['simulated'][:,0],label='simulated') -ax[1].set_title('2 transformations') -ax[1].legend() -ax[2].plot(df.index[windup_timesteps+1:],rainfall_runoff_model[3]['final_model']['response']['OBS_RUN'][windup_timesteps+1:],label='observed') -ax[2].plot(df.index[windup_timesteps+1:],rainfall_runoff_model[3]['final_model']['simulated'][:,0],label='simulated') -ax[2].set_title('3 transformations') -ax[2].legend() -''' -plt.show() -plt.close('all') - - - -# now test prediction / evaluation -eval_sim = modpods.delay_io_predict(rainfall_runoff_model, df_eval, 1,evaluation=True) -print("error metrics") -print(eval_sim['error_metrics']) -fig, ax = plt.subplots(1,1,figsize=(8,4)) -ax.plot(df_eval.index[windup_timesteps+1:],df_eval['OBS_RUN'][windup_timesteps+1:],label='observed') -ax.plot(df_eval.index[windup_timesteps+1:],eval_sim['prediction'],label='simulated') -#ax.set_title('1 transformation') -ax.legend() -plt.title("evaluation") -plt.show() - - - diff --git a/test_coef_constraints.py b/test_coef_constraints.py deleted file mode 100644 index 2058c47..0000000 --- a/test_coef_constraints.py +++ /dev/null @@ -1,114 +0,0 @@ -import numpy as np -import pandas as pd -import scipy.stats as stats -import matplotlib.pyplot as plt -import modpods - -import control as ct - -cartoon = False -plot = False - -# set the random seed -np.random.seed(0) - -# define an LTI matrix where I know what the causative topology (connections) should look like -# this is an easy test case. one input affects two states, which then affect the output. so only four connections total -A = np.diag(-1*np.ones(10)) - -# define cascade connections -A[1,0] = 1 -A[2,1] = 1 -A[3,2] = 1 -A[4,3] = 1 -A[5,4] = 1 # two reservoirs -A[6,5] = 1 -A[7,6] = 1 - -# define connections to output -A[9,7] = -1 -A[9,8] = 1 - -# define input -B = np.zeros(shape=(10,2)) -B[0,0]= 1 -B[8,1] = 1 - -C = np.zeros(shape=(3,10)) -C[0,2] = 1 # x2 is observed -C[1,8] = 1 # x8 is observed -C[2,9] = 1 # x9 is observed - -parallel_reservoirs = ct.ss(A,B,C,0,inputs=['u1','u2'],outputs=['x2','x8','x9']) -time_base = 150 -# dt = .1 -# dt = 0.5 -# dt = 1 -# dt = 2 - -dt = .05 - - -if cartoon: - dt = 0.05 -u = np.zeros((int(time_base / dt),2)) - -u[int(25/dt):int(50/dt),0] = np.random.rand(len(u[int(25/dt):int(50/dt),0]))-0.5 -u[int(5/dt):int(20/dt),1] = np.random.rand(len(u[int(5/dt):int(20/dt),1]))-0.5 -u[abs(u) < 0.40] = 0 # make it sparse (~80% of timesteps set to zero) -u[:,0] = u[:,0]*np.random.rand(len(u))*1000 -u[:,1] = u[:,1]*np.random.rand(len(u))*100 - - -if cartoon: # make the forcing simple - u = np.zeros((int(time_base / dt),2)) - u[int(5/dt):int(6/dt),0] = 1 - u[int(0/dt):int(1/dt),1] = 1 - - -T = np.arange(0,time_base,dt) -response = ct.forced_response(parallel_reservoirs,T,np.transpose(u)) - -system_data = pd.DataFrame(index=T) -system_data['u1'] = response.inputs[0][:] -system_data['u2'] = response.inputs[1][:] -system_data['x2'] = response.outputs[0][:] -system_data['x8'] = response.outputs[1][:] -system_data['x9'] = response.outputs[2][:] - -''' -system_data['x2'] = response.states[2][:] -system_data['x8'] = response.states[8][:] -system_data['x9'] = response.states[9][:] -''' - - -if plot: - system_data.plot(figsize=(10,5), subplots=True,legend=True) - plt.show() - if cartoon: - # also make a more cartoony version - cartoon_plot_data = system_data.copy() - # normalize all the magnitudes in cartoon_plot_data such as that all columns have maximum of 1 - for col in cartoon_plot_data.columns: - cartoon_plot_data[col] = cartoon_plot_data[col]/np.max(np.abs(cartoon_plot_data[col])) - - cartoon_plot_data.iloc[:int(len(cartoon_plot_data)/10)].plot(figsize=(5,5), subplots=False,legend=True,fontsize='xx-large',style=['r','b','g','m','k'],xticks=[],yticks=[],linewidth=5) - plt.gca().axis('off') # get rid of bounding box - plt.savefig("C:/modpods/test_lti_system_gen_cartoon.svg") - plt.show() - -forcing_coef_constraints = dict() -for column in system_data.columns: - if 'u' in column: - forcing_coef_constraints[column] = -1 # assume both forcing coefficients are negative - # they are both actually positive, so the coefficients would only be negative if the constraint was in action - -forcing_coef_constraints['u1'] = 1 # make u1 (and its powers and transformations) positive -forcing_coef_constraints['u2'] = -1 # make u2 (and its powers and transformations) positive -rainfall_runoff_model = modpods.delay_io_train(system_data,dependent_columns = ['x8'],independent_columns=['u1','u2'],windup_timesteps = 0, - init_transforms=1,max_transforms=1,max_iter=100, - poly_order = 1, verbose = True,bibo_stable=True, forcing_coef_constraints=forcing_coef_constraints) - - -print(rainfall_runoff_model) diff --git a/test_lti_control_of_swmm.py b/test_lti_control_of_swmm.py deleted file mode 100644 index 4cdf2d1..0000000 --- a/test_lti_control_of_swmm.py +++ /dev/null @@ -1,592 +0,0 @@ -# reference: https://colab.research.google.com/github/kLabUM/pystorms/blob/master/tutorials/Scenario_Gamma.ipynb - -import sys -#from modpods import topo_from_pystorms -#sys.path.append("G:/My Drive/modpods") -import modpods -import pystorms -import numpy as np -import matplotlib.pyplot as plt -import pandas as pd -import control as ct -import dill as pickle - - - -# uncontrolled -env = pystorms.scenarios.gamma() -done = False -print("simulating uncontrolled case") -while not done: - # Query the current state of the simulation - state = env.state() - - # Initialize actions to have each asset open - actions = np.ones(11) - - # Set the actions and progress the simulation - done = env.step(actions) - -# Calculate the performance measure for the uncontrolled simulation -uncontrolled_perf = sum(env.data_log["performance_measure"]) -''' -print("The calculated performance for the uncontrolled case of Scenario gamma is:") -print("{:.4e}".format(uncontrolled_perf)) - -basin_max_depths = [5., 10., 10., 10.] - -plt.figure(figsize=(15,6)) -plt.subplot(1,2,1) -plt.plot(np.asarray(env.data_log['depthN']['1'])/basin_max_depths[0], label='Basin 1') -plt.plot(np.asarray(env.data_log['depthN']['2'])/basin_max_depths[1], label='Basin 2') -plt.plot(np.asarray(env.data_log['depthN']['3'])/basin_max_depths[2], label='Basin 3') -plt.plot(np.asarray(env.data_log['depthN']['4'])/basin_max_depths[3], label='Basin 4') -plt.xlabel('Simulation Timestep') -plt.ylabel('Filling Degree') -plt.legend() - -plt.subplot(1,2,2) -plt.plot(env.data_log['flow']['O1'], label='Basin 1') -plt.plot(env.data_log['flow']['O2'], label='Basin 2') -plt.plot(env.data_log['flow']['O3'], label='Basin 3') -plt.plot(env.data_log['flow']['O4'], label='Basin 4') -plt.xlabel('Simulation Timestep') -plt.ylabel('Basin Outflow (cfs)') -plt.legend() -#plt.show() -plt.close() -''' -uncontrolled_data = env.data_log - -def controller_efd(state, max_depths): - # Initialize the action space so that we can compute the new settings - new_settings = np.ones(len(state)) - # Set equal filling degree parameters - c = 1.5 - theta = 0.25 - - # Assign the current depth in each basin - depths = state - - # Compute the filling degrees - fd = depths/max_depths - # Compute the average filling degree across each controlled basin - fd_average = sum(fd)/len(fd) - - # Update each valve setting based on the relative fullness of each basin - for i in range(0,len(fd)): - - # If a basin is very full compared to the average, we should open its - # valve to release some water - if fd[i] > fd_average: - new_settings[i] = c*(fd[i]-fd_average) - - # If a basin's filling degree is close to the average (within some value - # theta), its setting can be close to that average - elif fd_average-fd[i] <= theta: - new_settings[i] = fd_average - - # If a basin is very empty compared to the average, we can close its - # valve to store more water at that location, prioritizing releasing at - # the other locations - else: - new_settings[i] = 0. - - # Make sure the settings are in bounds [0,1] - new_settings[i] = min(new_settings[i], 1.) - new_settings[i] = max(new_settings[i], 0.) - - return new_settings - - -env = pystorms.scenarios.gamma() -done = False - -# Specify the maximum depths for each basin we are controlling -basin_max_depths = [5., 10., 10., 10.] -print("simulating equal filling degree") -while not done: - # Query the current state of the simulation - state = env.state() - # Isolate only the states that we need (the 4 downstream basin depths) - states_relevant = state[0:4] - - # Pass the current, relevant states and the maximum basin - # depths into our equal filling degree logic - actions_efd = controller_efd(states_relevant, basin_max_depths) - # Specify that the other 7 valves in the network should be - # open since we are not controlling them here - actions_uncontrolled = np.ones(7) - # Join the two above action arrays - actions = np.concatenate((actions_efd, actions_uncontrolled), axis=0) - - # Set the actions and progress the simulation - done = env.step(actions) - -# Calculate the performance measure for the uncontrolled simulation -equalfilling_perf = sum(env.data_log["performance_measure"]) -ef_data = env.data_log -''' -print("The calculated performance for the equal filling degree case of Scenario gamma is:") -print("{}.".format(equalfilling_perf)) - -plt.figure(figsize=(15,6)) -plt.subplot(1,2,1) -plt.plot(np.asarray(env.data_log['depthN']['1'])/basin_max_depths[0], label='Basin 1') -plt.plot(np.asarray(env.data_log['depthN']['2'])/basin_max_depths[1], label='Basin 2') -plt.plot(np.asarray(env.data_log['depthN']['3'])/basin_max_depths[2], label='Basin 3') -plt.plot(np.asarray(env.data_log['depthN']['4'])/basin_max_depths[3], label='Basin 4') -plt.xlabel('Simulation Timestep') -plt.ylabel('Filling Degree') -plt.legend() - -plt.subplot(1,2,2) -plt.plot(env.data_log['flow']['O1'], label='Basin 1') -plt.plot(env.data_log['flow']['O2'], label='Basin 2') -plt.plot(env.data_log['flow']['O3'], label='Basin 3') -plt.plot(env.data_log['flow']['O4'], label='Basin 4') -plt.xlabel('Simulation Timestep') -plt.ylabel('Basin Outflow (cfs)') -plt.legend() -''' - -''' -ef_flows = pd.DataFrame.from_dict(ef_data['flow']) -ef_flows.columns = env.config['action_space'] -ef_depthN = pd.DataFrame.from_dict(ef_data['depthN']) -ef_depthN.columns = env.config['states'] -ef_response = pd.concat([ef_flows, ef_depthN], axis=1) -ef_response.index = env.data_log['simulation_time'] -print(ef_response) -# for the columns of ef_response which do not contain the string "O" (i.e. the depths) -# if the current column name is "X", make it "(X, depthN)" -# this is because the modpods library expects the depths to be named "X, depthN" -# where X is the name of the corresponding flow -for col in ef_response.columns: - if "O" in col: # the orifices - # if there's a number in that column name that's greater than 4, drop this column - # this is because we're only controlling the first 4 orifices and these measurements are redundant to the storage node depths if the orifices are always open - if int(col[1:]) > 4: - ef_response.drop(columns=col, inplace=True) - - ef_response.rename(columns={col: (col, "flow")}, inplace=True) - - - -print(ef_response) -''' - -# do a training simulation such that the flows are actually independent of the depths -# in both the uncontrolled and efd scenarios the flows at the orifice are highly coupled to the depths -env = pystorms.scenarios.gamma() -done = False - -# Specify the maximum depths for each basin we are controlling -basin_max_depths = [5., 10., 10., 10.] -actions_characterize = np.ones(4) -step = 0 -print("running characterization simulation") -while not done: - - if step % 1000 == 0: - actions_characterize = np.ones(4)*0.3 # mostly close all the valves - actions_characterize[np.random.randint(0,4)] = np.random.rand() # open one valve a random amount - - actions_uncontrolled = np.ones(7) - # Join the two above action arrays - actions = np.concatenate((actions_characterize, actions_uncontrolled), axis=0) - - # Set the actions and progress the simulation - done = env.step(actions) - step += 1 -random_perf = sum(env.data_log["performance_measure"]) -print("performance of characterization:") -print("{:.4e}".format(random_perf)) -''' -plt.figure(figsize=(15,6)) -plt.subplot(1,2,1) -plt.plot(np.asarray(env.data_log['depthN']['1'])/basin_max_depths[0], label='Basin 1') -plt.plot(np.asarray(env.data_log['depthN']['2'])/basin_max_depths[1], label='Basin 2') -plt.plot(np.asarray(env.data_log['depthN']['3'])/basin_max_depths[2], label='Basin 3') -plt.plot(np.asarray(env.data_log['depthN']['4'])/basin_max_depths[3], label='Basin 4') -plt.xlabel('Simulation Timestep') -plt.ylabel('Filling Degree') -plt.legend() - -plt.subplot(1,2,2) -plt.plot(env.data_log['flow']['O1'], label='Basin 1') -plt.plot(env.data_log['flow']['O2'], label='Basin 2') -plt.plot(env.data_log['flow']['O3'], label='Basin 3') -plt.plot(env.data_log['flow']['O4'], label='Basin 4') -plt.xlabel('Simulation Timestep') -plt.ylabel('Basin Outflow (cfs)') -plt.legend() -plt.show() -''' - -training_data = env.data_log -training_flows = pd.DataFrame.from_dict(training_data['flow']) -training_flows.columns = env.config['action_space'] -training_depthN = pd.DataFrame.from_dict(training_data['depthN']) -training_depthN.columns = env.config['states'] -training_response = pd.concat([training_flows, training_depthN], axis=1) -training_response.index = env.data_log['simulation_time'] -print(training_response) - - -# for the columns of training_response which do not contain the string "O" (i.e. the depths) -# if the current column name is "X", make it "(X, depthN)" -# this is because the modpods library expects the depths to be named "X, depthN" -# where X is the name of the corresponding flow -for col in training_response.columns: - if "O" in col: # the orifices - # if there's a number in that column name that's greater than 4, drop this column - # this is because we're only controlling the first 4 orifices and these measurements are redundant to the storage node depths if the orifices are always open - if int(col[1:]) > 4: - training_response.drop(columns=col, inplace=True) - - training_response.rename(columns={col: (col, "flow")}, inplace=True) - - -#training_response['rain'] = 0 -#training_response['rain'][0] = 1 # just an impulse that says "something happened". not even real rain data - -print(training_response) - - - -# for debugging resample to a coarser time step (native resolution is about one minute but not consistent) -# need a consistent time step for modpods -orig_index_length = len(training_response.index) -training_response = training_response.resample('10T',axis='index').mean().copy(deep=True) -training_dt = orig_index_length / len(training_response.index) -print(training_response) -# get rid of the initial filling, this is confusing to the model because the forcing (rainfall) isn't observed so the storage nodes seem to rise autonomously -# could also include dummy forcing -training_response = training_response.iloc[60:,:] # start ten hours in -print(training_response) - -# we'll only use the training response to infer the topology and dynamics -# for this experiment assume all of flow O1-O11 and depth 1-11 are observable -# but only O1-O4 are controllable -#independent_columns = training_response.columns[0:4] # orifices O1 through O4 -#dependent_columns = training_response.drop(columns=independent_columns).columns - -dependent_columns = training_response.columns[4:8] # just depths at 1 through 4 -independent_columns = training_response.drop(columns = dependent_columns).columns - -use_blind = False -# learn the topology from the data -# this will be the "blind" plant model -if use_blind: # don't have this on all the time because it's very expensive - blind_topo = modpods.infer_causative_topology(training_response, dependent_columns = dependent_columns, - independent_columns = independent_columns, verbose=True,swmm=True) - - print(blind_topo.causative_topo) - print(blind_topo.total_graph) - - -# read the topology from the swmm file (this is much cheaper) -env.config['states'] = dependent_columns -env.config['action_space'] = independent_columns -# the default is controlling all 11 orifices so we need to edit the environment -print("defining topology") -swmm_topo = modpods.topo_from_pystorms(env) -#swmm_topo['rain'] = 'd' -# define delayed connection to rain, delayed impact on all storage nodes -#independent_columns.append('rain') - -# the index of the causative topology should be the dependent columns -#swmm_topo.index = dependent_columns -# the columns of the causative topology should be the dependent columns plus the independent columns -#swmm_topo.columns = dependent_columns.append(independent_columns) - -# show all columns when printing dataframes -pd.set_option('display.max_columns', None) -#print(swmm_topo) - -if use_blind: - print("differences in topology") - print(blind_topo.causative_topo.compare(swmm_topo)) - -# learn the dynamics now, or load a previously learned model -''' -# learn the dynamics from the trainingd response -print("learning dynamics") -lti_plant_approx_seeing = modpods.lti_system_gen(swmm_topo, training_response, - independent_columns= independent_columns, - dependent_columns = dependent_columns, max_iter = 100, - swmm=True,bibo_stable=True,max_transition_state_dim=5) -# pickle the plant approximation to load later -with open('G:/My Drive/modpods/swmm_lti_plant_approx_seeing.pickle', 'wb') as handle: - pickle.dump(lti_plant_approx_seeing, handle) -''' - -# load the plant approximation from a pickle -with open('G:/My Drive/modpods/swmm_lti_plant_approx_seeing.pickle', 'rb') as handle: - print("loading previously trained model") - lti_plant_approx_seeing = pickle.load(handle) - -if use_blind: - lti_plant_approx_blind = modpods.lti_system_gen(blind_topo, training_response, - independent_columns= independent_columns, - dependent_columns = dependent_columns) - - - -# is the plant approximation internally stable? -plant_eigenvalues,_ = np.linalg.eig(lti_plant_approx_seeing['A'].values) - -# cast the columns of dataframes to strings for easier indexing -training_response.columns = training_response.columns.astype(str) -dependent_columns = [str(col) for col in dependent_columns] -independent_columns = [str(col) for col in independent_columns] -# reindex the training_response to an integer step -training_response.index = np.arange(0,len(training_response),1) -''' -# evaluate the plant approximation accuracy -# only plot the depths at 1, 2, 3, and 4 -# the forcing is the flows at O1, O2, O3, and O4 -approx_response = ct.forced_response(lti_plant_approx_seeing['system'], U=np.transpose(training_response[independent_columns].values), T=training_response.index.values) -approx_data = pd.DataFrame(index=training_response.index.values) -approx_data[dependent_columns[0]] = approx_response.outputs[0][:] -approx_data[dependent_columns[1]] = approx_response.outputs[1][:] -approx_data[dependent_columns[2]] = approx_response.outputs[2][:] -approx_data[dependent_columns[3]] = approx_response.outputs[3][:] - -output_columns = dependent_columns[0:4] # depth at 1,2,3,4 - -# create a vertical subplot of 3 axes -fig, axes = plt.subplots(4, 1, figsize=(10, 10)) - -for idx in range(len(output_columns)): - axes[idx].plot(training_response[output_columns[idx]],label='actual') - axes[idx].plot(approx_data[output_columns[idx]],label='approx') - if idx == 0: - axes[idx].legend(fontsize='x-large',loc='best') - axes[idx].set_ylabel(output_columns[idx],fontsize='large') - if idx == len(output_columns)-1: - axes[idx].set_xlabel("time",fontsize='x-large') -# label the left column of plots "training" -axes[0].set_title("outputs",fontsize='xx-large') - -plt.savefig("G:/My Drive/modpods/test_lti_control_of_swmm_plant_approx.png") -plt.savefig("G:/My Drive/modpods/test_lti_control_of_swmm_plant_approx.svg") -#plt.show() -plt.close() -# same plot, but just the first few timesteps (this is the accuracy that matters for feedback control) -# create a vertical subplot of 3 axes -fig, axes = plt.subplots(4, 1, figsize=(10, 10)) - -for idx in range(len(output_columns)): - axes[idx].plot(training_response[output_columns[idx]][:10],label='actual') - axes[idx].plot(approx_data[output_columns[idx]][:10],label='approx') - if idx == 0: - axes[idx].legend(fontsize='x-large',loc='best') - axes[idx].set_ylabel(output_columns[idx],fontsize='large') - if idx == len(output_columns)-1: - axes[idx].set_xlabel("time",fontsize='x-large') -# label the left column of plots "training" -axes[0].set_title("outputs",fontsize='xx-large') - -plt.savefig("G:/My Drive/modpods/test_lti_control_of_swmm_plant_approx_first10.png") -plt.savefig("G:/My Drive/modpods/test_lti_control_of_swmm_plant_approx_first10.svg") -#plt.show() -plt.close() -''' -# define the cost function -Q = np.eye(len(lti_plant_approx_seeing['A'].columns)) / 10e12 # we don't want to penalize the transition states as their magnitude doesn't have directly tractable physical meaning -# bryson's rule based on the maxiumum depth of each basin -# note: the swmm file gamma.inp actually specifies the maximum depth of storage node 1 as 10 feet, -# but i'll be consistent with the configuration of the efd controller for consistent comparison -basin_max_depths_all = [5.0, 10.0, 10.0, 10.0, 10.0,20.0, 10.0, 10.0,10.0, 13.72, 14.96] -for asset_index in range(len(dependent_columns)): - Q[lti_plant_approx_seeing['A'].columns.get_loc(dependent_columns[asset_index]),lti_plant_approx_seeing['A'].columns.get_loc(dependent_columns[asset_index])] = 1 / ((basin_max_depths_all[asset_index])**2 ) - -flood_weighting = 1.5 # how much more important is flooding than the other objectives? -Q = Q * flood_weighting # set flood_weighting = 1 for basic bryson's rule (care about flows as much as flooding) -# threshold on flows at 0.11 m^3 / s which is 3.9 cfs -R = np.eye(4) / (3.9**2) # bryson's rule on maximum allowable flow -# define the system -# sys_response_to_control = ct.ss(lti_plant_approx_seeing['A'],lti_plant_approx_seeing['B'],lti_plant_approx_seeing['C'],0) # just for defining the controller gain -# (not necessary in this case, but would be if you've got disturbances) -# find the state feedback gain for the linear quadratic regulator -print("defining controller") -K,S,E = ct.lqr(lti_plant_approx_seeing['A'],lti_plant_approx_seeing['B'].values[:,0:4],Q,R) # only the first four columns of B represent control inputs, the rest are disturbances -#print("feedback poles (A-BK)") -feedback_poles,_ = np.linalg.eig(lti_plant_approx_seeing['A'].values - lti_plant_approx_seeing['B'].values[:,0:4]@K) -#print(feedback_poles) -# define the observer gain - -# "fast" relative to the controller -print("defining observer") -#L = ct.place(np.transpose(lti_plant_approx_seeing['A'].values), np.transpose(lti_plant_approx_seeing['C'].values),5*np.real(feedback_poles)) - - -# based on assumed noise -measurement_noise = 0.05 # more measurement noise means a worse sensor -process_noise = 1 # more process noise means a worse model -L,P,E = ct.lqe(lti_plant_approx_seeing['system'],process_noise*np.eye(len(lti_plant_approx_seeing['B'].columns)),measurement_noise*np.eye(len(lti_plant_approx_seeing['C'].index)) ) # unit covariance on process noise and measurement error -#print("observer poles") - -observer_poles,_ = np.linalg.eig(lti_plant_approx_seeing['A'].values - L@lti_plant_approx_seeing['C'].values) -#print(observer_poles) - -''' -# define the observer based compensator (per freudenberg 560 course notes 2.4) -obc_A = lti_plant_approx_seeing['A'].values-lti_plant_approx_seeing['B'].values@K - L@lti_plant_approx_seeing['C'].values -# ingests measurements, returns control actions -obc = ct.ss(obc_A, L, -K, 0, inputs=list(lti_plant_approx_seeing['C'].index),outputs=list(lti_plant_approx_seeing['B'].columns)) # negate K to give back commanded flows which are positive - - -# need to separately define the observer and controller because we can't close the loop in the typical way -# the observer takes the control input and measured output as inputs and outputs the estimated full state -#observer_input = np.concatenate((lti_plant_approx_seeing['B'].values,L),axis = 1) -#observer = ct.ss(lti_plant_approx_seeing['A'] - L@lti_plant_approx_seeing['C'].values, observer_input, np.eye(len(lti_plant_approx_seeing['A']) ) , 0 ) - -# the controller takes in an estiamte of the state and returns a control command -# this is just, u = -K @ xhat, not necessary to define a state space model for that as there's no evolution - -# can't form the closed loop system because the plant is not a state space system, but rather a software model -# the state estimate and control actions will be computed iteratively as the simulation is stepped through -''' - -env = pystorms.scenarios.gamma() -done = False - -u = np.zeros((4,1) ) # start with all orifices completely closed -u_open_pct = np.zeros((4,1)) # start with all orifices completely closed -xhat = np.zeros((len(lti_plant_approx_seeing['A'].columns),1)) # initial state estimate - -steps = 0 # make sure the estimator and controller operate at the frequency the approxiation was trained at - -# convert control command (flow) into orifice open percentage -# per the EPA-SWMM user manual volume ii hydraulics, orifices (section 6.2, page 107) - https://nepis.epa.gov/Exe/ZyPDF.cgi/P100S9AS.PDF?Dockey=P100S9AS.PDF -# all orifices in gamma are "bottom" -Cd = 0.65 # happens to be the same for all of them -Ao = 1 # area is one square foot. again, happens to be the same for all of them. -g = 32.2 # ft / s^2 -# the expression for discharge is found using Torricelli's equation: Q = Cd * (Ao*open_pct) sqrt(2*g*H_e) -# H_e is the effective head in feet, which is just the depth in the basin as the orifices are "bottom" -# to get the action command as a percent open, we solve as: open_pct = Q_desired / (Cd * Ao * sqrt(2*g*H_e)) - -while not done: - # Query the current state of the simulation - observables = env.state() - y_measured = observables[:4].reshape(-1,1) # depths at 1-4 - d = observables[4:].reshape(-1,1) # "disturbances", depths at 5-11 - - - # for updating the plant, calculate the "u" that is actually applied to the plant, not the desired control input - for idx in range(len(u)): - u[idx,0] = Cd*Ao*u_open_pct[idx,0]*np.sqrt(2*g*observables[idx]) # calculate the actual flow through the orifice - - # update the observer based on these measurements -> xhat_dot = A xhat + B u + L (y_m - C xhat) - # right now the state evolution is exactly cancelling the observer. why would this be? - state_evolution = lti_plant_approx_seeing['A'].values @ xhat - # causing the state estimation to diverge to crazy values - impact_of_control = lti_plant_approx_seeing['B'].values @ np.concatenate((u,d),axis=0) - yhat = lti_plant_approx_seeing['C'].values @ xhat # just for reference, could be useful for plotting later - y_error = y_measured - yhat # cast observables to be 2 dimensional - output_updating = L @ y_error - xhat_dot = (state_evolution + impact_of_control + output_updating) / training_dt # divide by the training frequency to get the change in state over the time step - #xhat_dot = (impact_of_control + output_updating) / training_dt # don't listen to the state dynamics, just use them to define the feedback -> that lost stability - xhat += xhat_dot # update the state estimate - - # note that this is only truly the error if there is zero error in the measurements. Generally, data is not truth. - u = -K @ xhat # calculate control command - - - u_open_pct = u*-1 - - for idx in range(len(u)): - head = 2*g*observables[idx] - - if head < 0.01: # if the head is less than 0.01 ft, the basin is empty, so close the orifice - u_open_pct[idx,0] = 0 - else: - u_open_pct[idx,0] = u[idx,0] / (Cd*Ao * np.sqrt(2*g*observables[idx])) # open percentage for desired flow rate - - if u_open_pct[idx,0] > 1: # if the calculated open percentage is greater than 1, the orifice is fully open - u_open_pct[idx,0] = 1 - elif u_open_pct[idx,0]< 0: # if the calculated open percentage is less than 0, the orifice is fully closed - u_open_pct[idx,0] = 0 - - - # Specify that the other 7 valves in the network should be - # open since we are not controlling them here - actions_uncontrolled = np.ones(7) - # Join the two above action arrays - actions = np.concatenate((u_open_pct.flatten(), actions_uncontrolled), axis=0) - - # Set the actions and progress the simulation - done = env.step(actions) - steps += 1 - - if steps % 1000 == 0: - print("u_open_pct") - print(u_open_pct) - print("yhat") - print(yhat) - print("y error") - print(y_error) - - - -# Calculate the performance measure for the uncontrolled simulation -obc_perf = sum(env.data_log["performance_measure"]) -obc_data = env.data_log - -print("The calculated performance for the uncontrolled case of Scenario gamma is:") -print("{:.4e}".format(uncontrolled_perf)) -print("for the equal filling degree case of Scenario gamma is:") -print("{:.4e}".format(equalfilling_perf)) -print("for the random control case:") -print("{:.4e}".format(random_perf)) -print("for the observer based compensator case of Scenario gamma is:") -print("{:.4e}".format(obc_perf)) - -fig,axes = plt.subplots(4,2,figsize=(20,10)) -fig.suptitle("Pystorms Scenario Gamma") -axes[0,0].set_title("Valves",fontsize='xx-large') -axes[0,1].set_title("Storage Nodes",fontsize='xx-large') - -valves = ["O1","O2","O3","O4"] -storage_nodes = ["1","2","3","4"] -cfs2cms = 35.315 -ft2meters = 3.281 -# plot the valves -for idx in range(4): - axes[idx,0].plot(uncontrolled_data['simulation_time'],np.array(uncontrolled_data['flow'][valves[idx]])/cfs2cms,label='Uncontrolled',color='k',linewidth=2) - axes[idx,0].plot(ef_data['simulation_time'],np.array(ef_data['flow'][valves[idx]])/cfs2cms,label='Equal Filling',color='b',linewidth=2) - axes[idx,0].plot(obc_data['simulation_time'],np.array(obc_data['flow'][valves[idx]])/cfs2cms,label='LTI Feedback',color='g',linewidth=2) - # add a dotted red line indicating the flow threshold - axes[idx,0].hlines(3.9/cfs2cms, uncontrolled_data['simulation_time'][0],uncontrolled_data['simulation_time'][-1],label='Threshold',colors='r',linestyles='dashed',linewidth=2) - #axes[idx,0].set_ylabel( str( str(valves[idx]) + " Flow" ),rotation='horizontal',labelpad=8) - axes[idx,0].annotate(str( str(valves[idx]) + " Flow" ),xy=(0.5,0.8),xycoords='axes fraction',fontsize='xx-large') - if idx == 0: - axes[idx,0].legend(fontsize='xx-large') - if idx != 3: - axes[idx,0].set_xticks([]) - - -# plot the storage nodes -for idx in range(4): - axes[idx,1].plot(uncontrolled_data['simulation_time'],np.array(uncontrolled_data['depthN'][storage_nodes[idx]])/ft2meters,label='Uncontrolled',color='k',linewidth=2) - axes[idx,1].plot(ef_data['simulation_time'],np.array(ef_data['depthN'][storage_nodes[idx]])/ft2meters,label='Equal Filling',color='b',linewidth=2) - axes[idx,1].plot(obc_data['simulation_time'],np.array(obc_data['depthN'][storage_nodes[idx]])/ft2meters,label='LTI Feedback',color='g',linewidth=2) - #axes[idx,1].set_ylabel( str( str(storage_nodes[idx]) + " Depth"),rotation='horizontal',labelpad=8) - axes[idx,1].annotate( str( str(storage_nodes[idx]) + " Depth"),xy=(0.5,0.8),xycoords='axes fraction',fontsize='xx-large') - - # add a dotted red line indicating the depth threshold - axes[idx,1].hlines(basin_max_depths_all[idx]/ft2meters,uncontrolled_data['simulation_time'][0],uncontrolled_data['simulation_time'][-1],label='Threshold',colors='r',linestyles='dashed',linewidth=2) - if idx != 3: - axes[idx,1].set_xticks([]) - -plt.tight_layout() -plt.savefig("G:/My Drive/modpods/pystorms_gamma_comparison.png",dpi=450) -plt.savefig("G:/My Drive/modpods/pystorms_gamma_comparison.svg",dpi=450) -#plt.show() - -print("done") - diff --git a/test_lti_from_gamma.py b/test_lti_from_gamma.py deleted file mode 100644 index 926ce6d..0000000 --- a/test_lti_from_gamma.py +++ /dev/null @@ -1,50 +0,0 @@ -import numpy as np -import pandas as pd -import scipy.stats as stats -import matplotlib.pyplot as plt -import modpods - - -# make the gamma distribution to be fit -# make shape, scale, and location random floats between 0 and 1 -shape = np.random.uniform(1, 10) -scale = np.random.uniform(0, 10) -loc = np.random.uniform(0, 10) -dt = 0.1 # pretend this was the sampling frequency of the data from which we got this -# worst case input is a sudden (sharp) change after some delay -# that is, shape=1, scale small, and location large -#shape = 1#6 -#scale = 1#0 -#loc = 1 - -# will also need to match the dt at which the gamma distribution is sampled - -# make something very sharp and fast -shape = 1 -scale = 0.1 -loc = 0 - -# make something very diffuse and slow - peak at 1000 timesteps with half-maximum width of about 200 timesteps -#shape = 100 -#scale = 10 -#loc = 0 - -# a transformation that generates a decay rate within the bounds should have a very accurate approximation -shape = 10 -scale = 1 -loc = 0 - - -print("shape, scale, loc: ", shape, scale, loc) - - -approx = modpods.lti_from_gamma(shape,scale,loc,dt,verbose=True) - - -plt.figure(figsize=(8,6)) -plt.plot(approx['t'],approx['gamma_pdf'], label="gamma pdf") -plt.plot(approx['t'],approx['lti_approx_output'], label="lti approximation") -plt.legend() -plt.show() - -print("done") \ No newline at end of file diff --git a/test_lti_system_gen.py b/test_lti_system_gen.py deleted file mode 100644 index 5f77bf0..0000000 --- a/test_lti_system_gen.py +++ /dev/null @@ -1,279 +0,0 @@ -import numpy as np -import pandas as pd -import scipy.stats as stats -import matplotlib.pyplot as plt -import modpods - -import control as ct - -cartoon=False -plot = True - -# set the random seed -np.random.seed(0) - -# define an LTI matrix where I know what the causative topology (connections) should look like -# this is an easy test case. one input affects two states, which then affect the output. so only four connections total -A = np.diag(-1*np.ones(10)) - -# define cascade connections -A[1,0] = 1 -A[2,1] = 1 -A[3,2] = 1 -A[4,3] = 1 -A[5,4] = 1 # two reservoirs -A[6,5] = 1 -A[7,6] = 1 - -# define connections to output -A[9,7] = 1 -A[9,8] = 1 - -# define input -B = np.zeros(shape=(10,2)) -B[0,0]= 1 -B[8,1] = 1 - -C = np.zeros(shape=(3,10)) -C[0,2] = 1 # x2 is observed -C[1,8] = 1 # x8 is observed -C[2,9] = 1 # x9 is observed - -parallel_reservoirs = ct.ss(A,B,C,0,inputs=['u1','u2'],outputs=['x2','x8','x9']) -time_base = 150 -# dt = .1 -# dt = 0.5 -# dt = 1 -# dt = 2 - -dt = .05 - -if cartoon: - dt = 0.05 -u = np.zeros((int(time_base / dt),2)) - -u[int(25/dt):int(50/dt),0] = np.random.rand(len(u[int(25/dt):int(50/dt),0]))-0.5 -u[int(5/dt):int(20/dt),1] = np.random.rand(len(u[int(5/dt):int(20/dt),1]))-0.5 -u[abs(u) < 0.40] = 0 # make it sparse (~80% of timesteps set to zero) -u[:,0] = u[:,0]*np.random.rand(len(u))*1000 -u[:,1] = u[:,1]*np.random.rand(len(u))*100 - - -if cartoon: # make the forcing simple - u = np.zeros((int(time_base / dt),2)) - u[int(5/dt):int(6/dt),0] = 1 - u[int(0/dt):int(1/dt),1] = 1 - - -T = np.arange(0,time_base,dt) -response = ct.forced_response(parallel_reservoirs,T,np.transpose(u)) - -system_data = pd.DataFrame(index=T) -system_data['u1'] = response.inputs[0][:] -system_data['u2'] = response.inputs[1][:] -system_data['x2'] = response.outputs[0][:] -system_data['x8'] = response.outputs[1][:] -system_data['x9'] = response.outputs[2][:] - -''' -system_data['x2'] = response.states[2][:] -system_data['x8'] = response.states[8][:] -system_data['x9'] = response.states[9][:] -''' - - -if plot: - system_data.plot(figsize=(10,5), subplots=True,legend=True) - plt.show() - if cartoon: - # also make a more cartoony version - cartoon_plot_data = system_data.copy() - # normalize all the magnitudes in cartoon_plot_data such as that all columns have maximum of 1 - for col in cartoon_plot_data.columns: - cartoon_plot_data[col] = cartoon_plot_data[col]/np.max(np.abs(cartoon_plot_data[col])) - - cartoon_plot_data.iloc[:int(len(cartoon_plot_data)/10)].plot(figsize=(5,5), subplots=False,legend=True,fontsize='xx-large',style=['r','b','g','m','k'],xticks=[],yticks=[],linewidth=5) - plt.gca().axis('off') # get rid of bounding box - plt.savefig("C:/modpods/test_lti_system_gen_cartoon.svg") - plt.show() - - -blind = True -if blind: - # define the causative topology from the response data - causative_topology, total_graph = modpods.infer_causative_topology(system_data,dependent_columns=['x2','x8','x9'], - independent_columns=['u1','u2'], verbose=True, max_iter=0, method='granger') -else: # assume we know the topology - # define the causative topology - # if this wasn't known a priori, we could use the modpods.infer_causative_topology function to find it - # but that function is expensive so I excluded it from this testing script - causative_topology = pd.DataFrame(index=['u1','u2','x2','x8','x9'], columns=['u1','u2','x2','x8','x9']).fillna('n') - causative_topology.loc['x2','u1'] = 'd' - causative_topology.loc['x8','u2'] = 'i' - causative_topology.loc['x9','x8'] = 'i' - causative_topology.loc['x9','x2'] = 'd' - -print(causative_topology) - - -# max iterations is for learning the delay model. increase it for better accuracy -lti_sys_from_data = modpods.lti_system_gen(causative_topology,system_data,['u1','u2'],['x2','x8','x9'],max_iter=100,bibo_stable=True) -#lti_sys_from_data = modpods.lti_system_gen(causative_topology,system_data,['u1','u2'],['x2','x8','x9'],max_iter=25,bibo_stable=False) -# print all columns of the pandas dataframe -pd.set_option('display.max_columns', None) - -''' -print("final lti system") -print("A") -print(lti_sys_from_data['A']) -print("B") -print(lti_sys_from_data['B']) -print("C") -print(lti_sys_from_data['C']) -''' -# how good is the system response aproximation for the training forcing? -approx_response = ct.forced_response(lti_sys_from_data['system'],T,np.transpose(u)) -approx_data = pd.DataFrame(index=T) -approx_data['u1'] = approx_response.inputs[0][:] -approx_data['u2'] = approx_response.inputs[1][:] -approx_data['x2'] = approx_response.outputs[0][:] -approx_data['x8'] = approx_response.outputs[1][:] -approx_data['x9'] = approx_response.outputs[2][:] - - -# create a vertical subplot of 3 axes -fig, axes = plt.subplots(3, 2, figsize=(8, 8)) -# plot the error of each output -output_columns = ['x2','x8','x9'] -for idx in range(len(output_columns)): - axes[idx,0].plot(system_data[output_columns[idx]],label='actual') - axes[idx,0].plot(approx_data[output_columns[idx]],label='approx') - if idx == 0: - axes[idx,0].legend(fontsize='x-large',loc='best') - axes[idx,0].set_ylabel(output_columns[idx],fontsize='xx-large') - if idx == len(output_columns)-1: - axes[idx,0].set_xlabel("time",fontsize='x-large') - -# label the left column of plots "training" -axes[0,0].set_title("training",fontsize='xx-large') - - -# what about a different forcing? (test case) -u2 = np.zeros((int(time_base / dt),2)) -u2[int(0/dt):int(35/dt),0] = np.random.rand(len(u2[int(0/dt):int(35/dt),0]))-0.5 -u2[int(10/dt):int(80/dt),1] = np.random.rand(len(u2[int(10/dt):int(80/dt),1]))-0.5 -u2[abs(u2) < 0.40] = 0 # make it sparse -u2[:,0] = u2[:,0]*np.random.rand(len(u2))*1000 -u2[:,1] = u2[:,1]*np.random.rand(len(u2))*100 - -approx_response = ct.forced_response(lti_sys_from_data['system'],T,np.transpose(u2)) -approx_data = pd.DataFrame(index=T) -approx_data['u1'] = approx_response.inputs[0][:] -approx_data['u2'] = approx_response.inputs[1][:] -approx_data['x2'] = approx_response.outputs[0][:] -approx_data['x8'] = approx_response.outputs[1][:] -approx_data['x9'] = approx_response.outputs[2][:] - -actual_response = ct.forced_response(parallel_reservoirs,T,np.transpose(u2)) -actual_data = pd.DataFrame(index=T) -actual_data['u1'] = actual_response.inputs[0][:] -actual_data['u2'] = actual_response.inputs[1][:] -actual_data['x2'] = actual_response.outputs[0][:] -actual_data['x8'] = actual_response.outputs[1][:] -actual_data['x9'] = actual_response.outputs[2][:] - - -for idx in range(len(output_columns)): - axes[idx,1].plot(actual_data[output_columns[idx]],label='actual') - axes[idx,1].plot(approx_data[output_columns[idx]],label='approx') - if idx == len(output_columns)-1: - axes[idx,1].set_xlabel("time",fontsize='x-large') - -axes[0,1].set_title("testing",fontsize='xx-large') -plt.tight_layout() -plt.savefig("C:/modpods/test_lti_system_gen.png") -plt.savefig("C:/modpods/test_lti_system_gen.svg") -plt.show() -#plt.close() - - - -# now try LQR using u1 (slow) as the disturbance and u2 (fast) as the control -# make the objective to minimize the magnitude of x9 - -# define the cost function -Q = np.eye(len(lti_sys_from_data['A'].columns))*0 # no states matter -Q[lti_sys_from_data['A'].columns.get_loc('x9'),lti_sys_from_data['A'].columns.get_loc('x9')] = 10e6 # other than x9 -R = np.eye(len(lti_sys_from_data['B'].columns)) / 10e6 # don't constrain the control effort -# define the system -B_u = lti_sys_from_data['B'].values # use u2 as the control -B_u[:,0] = 0 # don't use u1 as the control -B_d = lti_sys_from_data['B'].values # use u1 as the disturbance -B_d[:,1] = 0 # don't use u2 as the disturbance -sys_response_to_control = ct.ss(lti_sys_from_data['A'],B_u,lti_sys_from_data['C'],0) # just for defining the controller gain -# find the state feedback gain for the linear quadratic regulator -K,S,E = ct.lqr(sys_response_to_control,Q,R) # one row of K should be zeros to reflect that u1 is not used as a control but is the disturbance - - -# find the estimator gain for the kalman filter -# observe performance degrade as the assumed measurement noise covariance increases (slower poles on the observer) -assumed_noise_levels = [10e-8, 10e-1] - -# plot the results -fig,axes = plt.subplots(len(output_columns),2,figsize=(8,8)) -graph_labels = ['d','u','y'] - -# define the disturbance (only through the slow, u1 channel) -d = np.zeros((int(time_base / dt),2)) -d[int(0/dt):int(35/dt),0] = np.random.rand(len(d[int(0/dt):int(35/dt),0]))-0.5 -d[abs(d) < 0.40] = 0 # make it sparse -d[:,0] = d[:,0]*np.random.rand(len(d))*1000 - -for noise_level_idx in range(2): - noisiness = assumed_noise_levels[noise_level_idx] - L,P,E = ct.lqe(lti_sys_from_data['system'],np.eye(len(lti_sys_from_data['B'].columns)),noisiness*np.eye(len(lti_sys_from_data['C'].index)) ) # unit covariance on process noise and measurement error - - # define the observer based compensator (per freudenberg 560 course notes 2.4) - obc_A = lti_sys_from_data['A'].values-lti_sys_from_data['B'].values@K - L@lti_sys_from_data['C'].values - obc = ct.ss(obc_A, L, K, 0, inputs=['x2_m','x8_m','x9_m'],outputs=['u1','u2']) # K positive because negative feedback is assumed - - # define the closed loop system with the original plant and the observer based compensator designed using the identified model - closed_loop = ct.feedback(parallel_reservoirs,obc) - - obc_feedback_control = ct.forced_response(closed_loop,T,np.transpose(d)) - obc_feedback_data = pd.DataFrame(index=T) - obc_feedback_data['u1'] = obc_feedback_control.inputs[0][:] - obc_feedback_data['u2'] = obc_feedback_control.inputs[1][:] - obc_feedback_data['x2'] = obc_feedback_control.outputs[0][:] - obc_feedback_data['x8'] = obc_feedback_control.outputs[1][:] - obc_feedback_data['x9'] = obc_feedback_control.outputs[2][:] - - uncontrolled = ct.forced_response(parallel_reservoirs,T,np.transpose(d)) - uncontrolled_data = pd.DataFrame(index=T) - uncontrolled_data['u1'] = uncontrolled.inputs[0][:] - uncontrolled_data['u2'] = uncontrolled.inputs[1][:] - uncontrolled_data['x2'] = uncontrolled.outputs[0][:] - uncontrolled_data['x8'] = uncontrolled.outputs[1][:] - uncontrolled_data['x9'] = uncontrolled.outputs[2][:] - - - for idx in range(len(output_columns)): - axes[idx,noise_level_idx].plot(uncontrolled_data[output_columns[idx]],label='uncontrolled') - axes[idx,noise_level_idx].plot(obc_feedback_data[output_columns[idx]],label='obc feedback') - if idx == 0: - axes[idx,noise_level_idx].legend(fontsize='x-large') - if noise_level_idx == 0: - axes[idx,noise_level_idx].set_title("measurements assumed clean",fontsize='large') - else: - axes[idx,noise_level_idx].set_title("measurements assumed noisy",fontsize='large') - if idx == len(output_columns)-1: - axes[idx,noise_level_idx].set_xlabel("time",fontsize='x-large') - if noise_level_idx == 0: - axes[idx,noise_level_idx].set_ylabel(graph_labels[idx],fontsize='xx-large',rotation='horizontal') - -plt.tight_layout() -plt.savefig("C:/modpods/test_lti_system_gen_obc.png") -plt.savefig("C:/modpods/test_lti_system_gen_obc.svg") -plt.show() - -print("done") diff --git a/test_topo_from_swmm.py b/test_topo_from_swmm.py deleted file mode 100644 index 5251439..0000000 --- a/test_topo_from_swmm.py +++ /dev/null @@ -1,34 +0,0 @@ -import pyswmm -import pystorms -import numpy as np -import pandas as pd -import matplotlib.pyplot as plt -import sys -sys.path.append("G:/My Drive/modpods") -import modpods - -# display all columns of pandas dataframes -pd.set_option('display.max_columns', None) - -#epsilon_topo = modpods.topo_from_pystorms(pystorms.scenarios.epsilon()) -#theta = pystorms.scenarios.theta() -#theta_topo = modpods.topo_from_pystorms(theta) - -gamma = pystorms.scenarios.zeta() -#gamma.config['states'] = [gamma.config['states'][0], gamma.config['states'][3], gamma.config['states'][5], gamma.config['states'][9]] -#gamma.config['action_space'] = [gamma.config['action_space'][0], gamma.config['action_space'][3], gamma.config['action_space'][5], gamma.config['action_space'][9]] -print(gamma.config['states']) -print(gamma.config['action_space']) -gamma_topo = modpods.topo_from_pystorms(gamma) -print(gamma_topo) - - -#beta = pystorms.scenarios.beta() -#beta_topo = modpods.topo_from_pystorms(beta) - -#zeta = pystorms.scenarios.zeta() -#zeta_topo = modpods.topo_from_pystorms(zeta) - -print("done") - - diff --git a/test_topo_inference.py b/test_topo_inference.py deleted file mode 100644 index dcb47fe..0000000 --- a/test_topo_inference.py +++ /dev/null @@ -1,174 +0,0 @@ -import numpy as np -import pandas as pd -import scipy.stats as stats -import os as os -import matplotlib.pyplot as plt -import modpods as modpods -import control as ct - - -# define an LTI matrix where I know what the causative topology (connections) should look like -# this is an easy test case. one input affects two states, which then affect the output. so only four connections total -A = np.diag(-1*np.ones(10)) - -# define cascade connections -A[1,0] = 1 -A[2,1] = 1 -A[3,2] = 1 -A[4,3] = 1 -A[5,4] = 1 # two reservoirs -A[6,5] = 1 -A[7,6] = 1 - -# define connections to output -#A[9,4] = 1 -A[9,7] = 1 -#A[9,8] = 1 -A[9,8] = 1 - -# define input -B = np.zeros(shape=(10,2)) -B[0,0]= 1 -#B[5] = 2 -B[8,1] = 1 - -C = np.eye(10) -D = np.zeros(shape=(10,2)) - -parallel_reservoirs = ct.ss(A,B,C,D) -''' -response = ct.impulse_response(parallel_reservoirs) -plt.figure(figsize=(10,5)) -states_to_plot = np.array([0,8,9]) -for state in states_to_plot: - plt.plot(response.outputs[state][0], '--', label=str(state)) - -plt.legend() -plt.title("impulse response") -plt.show() -''' - -time_base = 50 -dt = .05 -u = np.zeros((int(time_base / dt),2)) - -#u[2020:2120,1] = np.random.rand(100) -0.5 - -#u[1220:1320,1] = np.random.rand(100) -0.5 - -# u1 -> x5 -> x9 -#u[3180:3280,0] = np.random.rand(100) -0.5 -u[int(25/dt):int(40/dt),0] = np.random.rand(len(u[int(25/dt):int(40/dt),0]))-0.5 -u[int(0/dt):int(15/dt),1] = np.random.rand(len(u[int(0/dt):int(15/dt),1]))-0.5 -u[abs(u) < 0.40] = 0 # make it sparse -u[:,0] = u[:,0]*np.random.rand(len(u))*1000 -u[:,1] = u[:,1]*np.random.rand(len(u))*100 - - -#u[700:800,0] = 10 - -#u[900:950,1] = -10 - -#u[1100:1400,0] = 2 -#u[1600:1750,1] = -7 -#u[0:20,1] = 20 -#u[int(15/dt),1] = -50 -#u[int(25/dt):int(35/dt),0] = 6 -#u[int(29/dt):int(31/dt),0] = -10 -#u[int(25/dt):int(35/dt),1] = np.random.rand(len(u[int(25/dt):int(35/dt),1]) )*5 - 2.5 -# u2 -> x8 -> x9 -#u[2020:2120,1] = np.random.rand(100) -0.5 - -#u[1220:1320,1] = np.random.rand(100) -0.5 - -# u1 -> x5 -> x9 -#u[3180:3280,0] = np.random.rand(100) -0.5 - -#u[abs(u) < 0.45] = 0 # make it sparse -#u[:600,0] = u[:600,0]*np.random.rand(600)*100 -#u[:600,1] = u[:600,1]*np.random.rand(600)*100 - - - - -T = np.arange(0,time_base,dt) -response = ct.forced_response(parallel_reservoirs,T,np.transpose(u)) - -system_data = pd.DataFrame(index=T) -system_data['u1'] = response.inputs[0][:] -system_data['u2'] = response.inputs[1][:] -system_data['x2'] = response.states[2][:] -system_data['x8'] = response.states[8][:] -system_data['x9'] = response.states[9][:] - -plot = False - -system_data.plot(figsize=(10,5), subplots=True,legend=True) -plt.savefig('test_topo_inference.png') -if plot: - plt.show() -plt.close('all') - -# now try to infer the topology -# assume i know this is data from a drainage system and so I assume it's a directed acyclic graph -causative_topo = modpods.infer_causative_topology(system_data,dependent_columns=['x2','x8','x9'], - independent_columns=['u1','u2'], verbose=True, max_iter=0, method='granger') -# the correct answer here is that there is immediate causation flowing from u2 to x8 to x9 -# and there is delayed causation flowing from u1 to x2 to x9 - -#modpods.delay_io_train(system_data, dependent_columns=['x5','x8','x9'], independent_columns=['u1','u2'], -# max_transforms=1,max_iter=250,poly_order=1,transform_dependent=True,bibo_stable=True,verbose=True) - -print("done") -# a randomized (but stable) lti system -''' -A = np.matrix(np.random.rand(5,5)) - 0.5 -A = (A - np.diag(np.ones(5))) - - -# add an oscillatory pair -A = np.concatenate((A , 0.001*(np.random.rand(2,5) -0.5) ), axis=0) -A = np.concatenate((A, np.array([[0,0,],[0,0],[0,0],[0,0],[0,0] , [-0.3,-1], [1,-0.3]])), axis=1) -#A = [[0,-1], [1,0]] - -print(A) -A = A/100 -l,v = np.linalg.eig(A) -print(l) - -B = (np.random.rand(7,1) - 0.5)*4 - - - -C = np.eye(7) -D = np.zeros(shape=(7,1)) - - -random_system = ct.ss(A,B,C,D) - -response = ct.impulse_response(random_system) -plt.figure(figsize=(25,10)) -for state in range(7): - plt.plot(response.outputs[state][0], label=str(state)) - -plt.legend() -plt.title("impulse response") - - -u = np.concatenate( (np.zeros(100), np.ones(10), np.zeros(120), -.5*np.ones(10), np.zeros(150), np.ones(10), - np.zeros(70), -np.ones(1), np.zeros(190), np.ones(100), np.zeros(140) , - np.linspace(-2,2,50) , np.zeros(130) , np.linspace(0,0.5,200), np.zeros(20), - np.sin(np.arange(0,10,0.01) ) , np.zeros(30), np.ones(500), np.zeros(1000) ) ) - -#u = np.concatenate( (np.zeros(10), np.ones(1), np.zeros(500))) - -response = ct.forced_response(random_system,np.arange(0,len(u)),u) - -plt.figure(figsize=(25,10)) -for state in range(7): - plt.plot(response.outputs[state][:], label=str(state)) - -plt.plot(u, label='forcing') -plt.legend() -plt.title("forced response") -''' \ No newline at end of file diff --git a/03439000_05_model_output.txt b/tests/data/03439000_05_model_output.txt similarity index 100% rename from 03439000_05_model_output.txt rename to tests/data/03439000_05_model_output.txt diff --git a/swmm_lti_plant_approx_seeing.pickle b/tests/data/swmm_lti_plant_approx_seeing.pickle similarity index 100% rename from swmm_lti_plant_approx_seeing.pickle rename to tests/data/swmm_lti_plant_approx_seeing.pickle diff --git a/tests/test_modpods.py b/tests/test_modpods.py index baeebb9..28dbcef 100644 --- a/tests/test_modpods.py +++ b/tests/test_modpods.py @@ -1,11 +1,17 @@ """ Pytest tests for modpods core functions. -These tests cover the functions that are self-contained and do not require -external data files or long-running simulations. +Tests collected from the following original scripts (now deleted): + test_lti_from_gamma.py, test_topo_inference.py, test_coef_constraints.py, + test.py, test_lti_system_gen.py, test_topo_from_swmm.py, + test_lti_control_of_swmm.py + +Tests that load large data files or run long simulations are marked @pytest.mark.slow. """ +import pathlib import warnings +from typing import Any import control as ct import numpy as np @@ -14,49 +20,35 @@ import modpods -# --------------------------------------------------------------------------- -# lti_from_gamma -# --------------------------------------------------------------------------- - - -def test_lti_from_gamma_returns_required_keys() -> None: - """lti_from_gamma must return a dict with the expected keys.""" - result = modpods.lti_from_gamma(shape=10, scale=1, location=0, dt=0.1) - assert isinstance(result, dict) - for key in ("t", "gamma_pdf", "lti_approx_output", "lti_approx"): - assert key in result, f"missing key '{key}' in result" +DATA_DIR = pathlib.Path(__file__).parent / "data" -def test_lti_from_gamma_output_shapes_match() -> None: - """gamma_pdf and lti_approx_output must have the same length.""" - result = modpods.lti_from_gamma(shape=5, scale=2, location=0) - assert result["gamma_pdf"].shape == result["lti_approx_output"].shape +# --------------------------------------------------------------------------- +# Shared fixtures +# --------------------------------------------------------------------------- -def test_lti_from_gamma_achieves_reasonable_nse() -> None: - """The LTI approximation should achieve NSE > 0.9 for a well-conditioned - gamma distribution (shape=10, scale=1, location=0).""" - result = modpods.lti_from_gamma(shape=10, scale=1, location=0, dt=0.1) - gamma_pdf = result["gamma_pdf"] - lti_approx = result["lti_approx_output"] - nse = 1.0 - float( - np.sum(np.square(gamma_pdf - lti_approx)) - / np.sum(np.square(gamma_pdf - np.mean(gamma_pdf))) +@pytest.fixture(scope="module") +def simple_lti_data() -> pd.DataFrame: + """Small two-state LTI system: u → x0 → x1 (cascade, 200 time-steps).""" + np.random.seed(42) + n, dt = 200, 0.05 + T = np.arange(0, n * dt, dt) + A = np.array([[-1.0, 0], [1.0, -1.0]]) + B = np.array([[1.0], [0.0]]) + sys = ct.ss(A, B, np.eye(2), 0) + u = np.zeros((n, 1)) + u[50:80, 0] = np.random.rand(30) + response = ct.forced_response(sys, T, np.transpose(u)) + df = pd.DataFrame( + index=T, + data={ + "u": response.inputs[0], + "x0": response.states[0], + "x1": response.states[1], + }, ) - assert nse > 0.9, f"NSE {nse:.4f} is below the 0.9 threshold" - - -def test_lti_from_gamma_t_is_nonnegative() -> None: - """The time vector returned must be non-negative and monotonically increasing.""" - result = modpods.lti_from_gamma(shape=3, scale=1, location=0) - t = result["t"] - assert t[0] >= 0.0 - assert np.all(np.diff(t) > 0), "time vector is not strictly increasing" - - -# --------------------------------------------------------------------------- -# infer_causative_topology (Granger causality) -# --------------------------------------------------------------------------- + return df @pytest.fixture(scope="module") @@ -66,7 +58,7 @@ def cascade_lti_system_data() -> pd.DataFrame: System topology (ground truth): u1 → x0 → x1 → x2 (u1 causes x2 via a long cascade, delayed) u2 → x8 (u2 causes x8 directly) - x7 → x9, x8 → x9 (x9 driven by both chains; x7 is not observable) + x7 → x9, x8 → x9 (x9 driven by both chains) Observable variables: u1, u2, x2, x8, x9 """ @@ -112,6 +104,145 @@ def cascade_lti_system_data() -> pd.DataFrame: return df +# --------------------------------------------------------------------------- +# lti_from_gamma tests (from test_lti_from_gamma.py) +# --------------------------------------------------------------------------- + + +def test_lti_from_gamma_returns_required_keys() -> None: + """lti_from_gamma must return a dict with the expected keys.""" + result = modpods.lti_from_gamma(shape=10, scale=1, location=0, dt=0.1) + assert isinstance(result, dict) + for key in ("t", "gamma_pdf", "lti_approx_output", "lti_approx"): + assert key in result, f"missing key '{key}' in result" + + +def test_lti_from_gamma_output_shapes_match() -> None: + """gamma_pdf and lti_approx_output must have the same length.""" + result = modpods.lti_from_gamma(shape=5, scale=2, location=0) + assert result["gamma_pdf"].shape == result["lti_approx_output"].shape + + +def test_lti_from_gamma_achieves_reasonable_nse() -> None: + """The LTI approximation should achieve NSE > 0.9 for a well-conditioned + gamma distribution (shape=10, scale=1, location=0).""" + result = modpods.lti_from_gamma(shape=10, scale=1, location=0, dt=0.1) + gamma_pdf = result["gamma_pdf"] + lti_approx = result["lti_approx_output"] + nse = 1.0 - float( + np.sum(np.square(gamma_pdf - lti_approx)) + / np.sum(np.square(gamma_pdf - np.mean(gamma_pdf))) + ) + assert nse > 0.9, f"NSE {nse:.4f} is below the 0.9 threshold" + + +def test_lti_from_gamma_t_is_nonnegative() -> None: + """The time vector returned must be non-negative and monotonically increasing.""" + result = modpods.lti_from_gamma(shape=3, scale=1, location=0) + t = result["t"] + assert t[0] >= 0.0 + assert np.all(np.diff(t) > 0), "time vector is not strictly increasing" + + +# --------------------------------------------------------------------------- +# delay_io_train / delay_io_predict tests (from test_coef_constraints.py) +# --------------------------------------------------------------------------- + + +def test_delay_io_train_returns_model(simple_lti_data: pd.DataFrame) -> None: + """delay_io_train must return a dict keyed by output-variable index.""" + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + model = modpods.delay_io_train( + simple_lti_data, + dependent_columns=["x1"], + independent_columns=["u"], + windup_timesteps=0, + init_transforms=1, + max_transforms=1, + max_iter=5, + poly_order=1, + verbose=False, + ) + assert isinstance(model, dict) + assert 1 in model, "expected key 1 (first output) in model dict" + assert "final_model" in model[1] + assert "error_metrics" in model[1]["final_model"] + + +def test_delay_io_train_nse_above_zero(simple_lti_data: pd.DataFrame) -> None: + """Training NSE on the simple cascade system must be positive.""" + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + model = modpods.delay_io_train( + simple_lti_data, + dependent_columns=["x1"], + independent_columns=["u"], + windup_timesteps=0, + init_transforms=1, + max_transforms=1, + max_iter=10, + poly_order=1, + verbose=False, + ) + nse = float(model[1]["final_model"]["error_metrics"]["NSE"][0]) + assert nse > 0.0, f"Training NSE {nse:.4f} is non-positive" + + +def test_delay_io_train_with_forcing_coef_constraints( + simple_lti_data: pd.DataFrame, +) -> None: + """delay_io_train with bibo_stable=True and forcing_coef_constraints must complete + without error and return a valid model dict.""" + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + model = modpods.delay_io_train( + simple_lti_data, + dependent_columns=["x1"], + independent_columns=["u"], + windup_timesteps=0, + init_transforms=1, + max_transforms=1, + max_iter=10, + poly_order=1, + verbose=False, + bibo_stable=True, + forcing_coef_constraints={"u": 1}, + ) + assert isinstance(model, dict) + assert 1 in model + assert model[1]["final_model"]["error_metrics"]["NSE"] is not None + + +def test_delay_io_predict_returns_expected_shape( + simple_lti_data: pd.DataFrame, +) -> None: + """delay_io_predict must return a dict with 'prediction' of the right length.""" + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + model = modpods.delay_io_train( + simple_lti_data, + dependent_columns=["x1"], + independent_columns=["u"], + windup_timesteps=0, + init_transforms=1, + max_transforms=1, + max_iter=5, + poly_order=1, + verbose=False, + ) + pred = modpods.delay_io_predict(model, simple_lti_data, num_transforms=1) + assert isinstance(pred, dict) + assert "prediction" in pred + # prediction length should be approximately equal to data length + assert pred["prediction"].shape[0] > 0 + + +# --------------------------------------------------------------------------- +# infer_causative_topology tests (from test_topo_inference.py) +# --------------------------------------------------------------------------- + + def test_infer_causative_topology_returns_dataframe( cascade_lti_system_data: pd.DataFrame, ) -> None: @@ -188,3 +319,211 @@ def test_infer_causative_topology_no_self_loops( assert ( causative_topo.loc[dep_var, dep_var] == "n" ), f"Self-loop detected for {dep_var}" + + +# --------------------------------------------------------------------------- +# lti_system_gen tests (from test_lti_system_gen.py) — SLOW +# --------------------------------------------------------------------------- + + +@pytest.fixture(scope="module") +def known_topology() -> pd.DataFrame: + """Manually specified topology for the 5-variable cascade system: + u1 --(delayed)--> x2 + u2 --(immediate)-> x8 + x8 --(immediate)-> x9 + x2 --(delayed)---> x9 + """ + topo = pd.DataFrame( + index=["x2", "x8", "x9"], + columns=["u1", "u2", "x2", "x8", "x9"], + ).fillna("n") + topo.loc["x2", "u1"] = "d" + topo.loc["x8", "u2"] = "i" + topo.loc["x9", "x8"] = "i" + topo.loc["x9", "x2"] = "d" + return topo + + +@pytest.mark.slow +def test_lti_system_gen_returns_state_space( + cascade_lti_system_data: pd.DataFrame, + known_topology: pd.DataFrame, +) -> None: + """lti_system_gen must return a dict with 'system', 'A', 'B', 'C' keys, where + 'system' is a StateSpace object that can be simulated.""" + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + result = modpods.lti_system_gen( + known_topology, + cascade_lti_system_data, + independent_columns=["u1", "u2"], + dependent_columns=["x2", "x8", "x9"], + max_iter=10, + bibo_stable=True, + ) + + assert isinstance(result, dict) + for key in ("system", "A", "B", "C"): + assert key in result, f"missing key '{key}'" + + assert isinstance(result["system"], ct.StateSpace) + # Verify the system can be used for forward simulation + T = cascade_lti_system_data.index + test_u = np.zeros((len(T), 2)) + test_u[100:200, 0] = 1.0 + response = ct.forced_response(result["system"], T, np.transpose(test_u)) + assert response.outputs.shape[0] == 3, "expected 3 outputs (x2, x8, x9)" + + +# --------------------------------------------------------------------------- +# topo_from_pystorms tests (from test_topo_from_swmm.py and +# test_lti_control_of_swmm.py) — SLOW +# --------------------------------------------------------------------------- + + +@pytest.mark.slow +def test_topo_from_pystorms_zeta_returns_dataframe() -> None: + """topo_from_pystorms on the zeta scenario must return a non-empty DataFrame + whose index and columns correspond to the scenario's state variables.""" + pystorms = pytest.importorskip("pystorms") + env = pystorms.scenarios.zeta() + topo = modpods.topo_from_pystorms(env) + + assert isinstance(topo, pd.DataFrame) + assert topo.shape[0] > 0, "topology DataFrame must have at least one row" + # All cells should be 'n', 'i', or 'd' + valid_values = {"n", "i", "d"} + for val in topo.values.flat: + assert val in valid_values, f"unexpected cell value '{val}' in topology" + + +@pytest.mark.slow +def test_topo_from_pystorms_zeta_has_self_connections() -> None: + """topo_from_pystorms (zeta) must mark each state as influencing itself ('i').""" + pystorms = pytest.importorskip("pystorms") + env = pystorms.scenarios.zeta() + topo = modpods.topo_from_pystorms(env) + + for state in topo.index: + if state in topo.columns: + # Use .at[] to avoid pandas MultiIndex interpretation of tuple keys + assert ( + topo.at[state, state] == "i" + ), f"state {state!r} should have self-connection 'i'" + + +@pytest.mark.slow +def test_topo_from_pystorms_gamma_after_characterization() -> None: + """After running a characterization simulation of the gamma scenario, + topo_from_pystorms must successfully infer the network topology.""" + pystorms = pytest.importorskip("pystorms") + np.random.seed(42) + + # Run characterization simulation (randomly opens/closes valves) + env = pystorms.scenarios.gamma() + done = False + step = 0 + actions_characterize = np.ones(4) + while not done: + if step % 1000 == 0: + actions_characterize = np.ones(4) * 0.3 + actions_characterize[np.random.randint(0, 4)] = np.random.rand() + done = env.step(np.concatenate((actions_characterize, np.ones(7)), axis=0)) + step += 1 + + # Restrict state/action space to the first 4 controlled basins + dependent_columns = env.config["states"][4:8] + independent_columns = [ + c for c in env.config["action_space"] if c not in dependent_columns + ] + env.config["states"] = dependent_columns + env.config["action_space"] = independent_columns + + topo = modpods.topo_from_pystorms(env) + + assert isinstance(topo, pd.DataFrame) + assert topo.shape[0] > 0 + + +# --------------------------------------------------------------------------- +# CAMELS rainfall-runoff tests (from test.py) — SLOW (uses data file) +# --------------------------------------------------------------------------- + + +@pytest.fixture(scope="module") +def camels_data() -> pd.DataFrame: + """Load and preprocess the CAMELS daily streamflow data.""" + filepath = DATA_DIR / "03439000_05_model_output.txt" + df = pd.read_csv(filepath, sep=r"\s+") + df.rename( + {"YR": "year", "MNTH": "month", "DY": "day", "HR": "hour"}, + axis=1, + inplace=True, + ) + df["datetime"] = pd.to_datetime(df[["year", "month", "day", "hour"]]) + df.set_index("datetime", inplace=True) + df.RAIM = df.RAIM.shift(-1) + df.dropna(inplace=True) + return df + + +@pytest.fixture(scope="module") +def trained_camels_model( + camels_data: pd.DataFrame, +) -> dict[Any, Any]: + """Train a delay_io model on one year of CAMELS data.""" + windup_timesteps = 30 + years = 1 + df_train = camels_data.iloc[: 365 * years + windup_timesteps, :][ + ["OBS_RUN", "RAIM", "PET", "PRCP"] + ] + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + model = modpods.delay_io_train( + df_train, + dependent_columns=["OBS_RUN"], + independent_columns=["RAIM", "PET", "PRCP"], + windup_timesteps=windup_timesteps, + init_transforms=1, + max_transforms=1, + max_iter=10, + poly_order=1, + verbose=False, + bibo_stable=False, + forcing_coef_constraints={"RAIM": -1, "PET": 1, "PRCP": -1}, + ) + return model + + +@pytest.mark.slow +def test_delay_io_train_camels_returns_model( + trained_camels_model: dict[Any, Any], +) -> None: + """delay_io_train on CAMELS data must return a model dict with NSE > -1.""" + assert isinstance(trained_camels_model, dict) + assert 1 in trained_camels_model + nse_val = trained_camels_model[1]["final_model"]["error_metrics"]["NSE"] + nse = float(nse_val[0]) if hasattr(nse_val, "__len__") else float(nse_val) + assert nse > -1.0, f"CAMELS training NSE {nse:.4f} is unreasonably low" + + +@pytest.mark.slow +def test_delay_io_predict_camels_returns_prediction( + trained_camels_model: dict[Any, Any], + camels_data: pd.DataFrame, +) -> None: + """delay_io_predict on CAMELS eval data must return a 'prediction' array.""" + windup_timesteps = 30 + years = 1 + df_eval = camels_data.iloc[-(365 * years + windup_timesteps) :, :][ + ["OBS_RUN", "RAIM", "PET", "PRCP"] + ] + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + pred = modpods.delay_io_predict( + trained_camels_model, df_eval, num_transforms=1, evaluation=True + ) + assert isinstance(pred, dict) + assert "prediction" in pred + assert pred["prediction"].shape[0] > 0