diff --git a/CHANGELOG.md b/CHANGELOG.md index a93fede26..6a37f1c4c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,15 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.14.3] - 2025-04-25 10:00:00 + +### Added + +- Puts a ceiling on the version of the `marshmallow<4.0.0` package in `environment.yml` +- Update `txfunc.py` for the estimation of the `HSV` and `GS` tax functions. +- Update `test_txfunc.py`, `tax_func_estimate_outputs.pkl`, and `tax_func_loop_outputs.pkl` files for testing +- Update `dask` and `distributed` client calls in `SS.py` and `TPI.py` to allow for updated versions. + ## [0.14.2] - 2025-04-04 12:00:00 ### Added @@ -373,6 +382,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Version [0.7.0] on August 30, 2021 was the first time that the OG-USA repository was detached from all of the core model logic, which was named OG-Core. Before this version, OG-USA was part of what is now the [`OG-Core`](https://github.com/PSLmodels/OG-Core) repository. In the next version of OG-USA, we adjusted the version numbering to begin with 0.1.0. This initial version of 0.7.0, was sequential from what OG-USA used to be when the OG-Core project was called OG-USA. - Any earlier versions of OG-USA can be found in the [`OG-Core`](https://github.com/PSLmodels/OG-Core) repository [release history](https://github.com/PSLmodels/OG-Core/releases) from [v.0.6.4](https://github.com/PSLmodels/OG-Core/releases/tag/v0.6.4) (Jul. 20, 2021) or earlier. + +[0.14.3]: https://github.com/PSLmodels/OG-Core/compare/v0.14.2...v0.14.3 [0.14.2]: https://github.com/PSLmodels/OG-Core/compare/v0.14.1...v0.14.2 [0.14.1]: https://github.com/PSLmodels/OG-Core/compare/v0.14.0...v0.14.1 [0.14.0]: https://github.com/PSLmodels/OG-Core/compare/v0.13.2...v0.14.0 diff --git a/environment.yml b/environment.yml index 68fadd400..b89fb92ce 100644 --- a/environment.yml +++ b/environment.yml @@ -15,6 +15,7 @@ dependencies: - distributed>=2.30.1 - paramtools>=0.15.0 - sphinx>=3.5.4 +- marshmallow<4.0.0 - sphinx-argparse - sphinxcontrib-bibtex>=2.0.0 - sphinx-math-dollar diff --git a/ogcore/SS.py b/ogcore/SS.py index a8c58865e..6705b2e4e 100644 --- a/ogcore/SS.py +++ b/ogcore/SS.py @@ -34,12 +34,6 @@ level=log_level, format="%(message)s" # Only show the message itself ) -""" -A global future for the Parameters object for client workers. -This is scattered once and place at module scope, then used -by the client in the inner loop. -""" -scattered_p = None """ ------------------------------------------------------------------------ @@ -223,12 +217,6 @@ def inner_loop(outer_loop_vars, p, client): units """ - # Retrieve the "scattered" Parameters object. - global scattered_p - - if scattered_p is None: - scattered_p = client.scatter(p, broadcast=True) if client else p - # unpack variables to pass to function bssmat, nssmat, r_p, r, w, p_m, Y, BQ, TR, Ig_baseline, factor = ( outer_loop_vars @@ -247,35 +235,71 @@ def inner_loop(outer_loop_vars, p, client): tr = household.get_tr(TR, None, p, "SS") ubi = p.ubi_nom_array[-1, :, :] / factor + scattered_p = client.scatter(p, broadcast=True) if client else p + lazy_values = [] for j in range(p.J): guesses = np.append(bssmat[:, j], nssmat[:, j]) - euler_params = ( + + # Create a delayed function that will access the scattered_p + @delayed + def solve_for_j( + guesses, r_p, w, p_tilde, - bq[:, j], - rm[:, j], - tr[:, j], - ubi[:, j], + bq_j, + rm_j, + tr_j, + ubi_j, factor, j, - scattered_p, - ) - lazy_values.append( - delayed(opt.root)( + scattered_p_future, + ): + # This function will be executed on workers with access to scattered_p + return opt.root( euler_equation_solver, guesses * 0.9, - args=euler_params, + args=( + r_p, + w, + p_tilde, + bq_j, + rm_j, + tr_j, + ubi_j, + factor, + j, + scattered_p_future, + ), method=p.FOC_root_method, tol=MINIMIZER_TOL, ) + + # Add the delayed computation to our list + lazy_values.append( + solve_for_j( + guesses, + r_p, + w, + p_tilde, + bq[:, j], + rm[:, j], + tr[:, j], + ubi[:, j], + factor, + j, + scattered_p, + ) ) + if client: - futures = client.compute(lazy_values, num_workers=p.num_workers) + # Compute all the values + futures = client.compute(lazy_values) + # Later, gather the results when needed results = client.gather(futures) else: - results = results = compute( + results = compute( *lazy_values, scheduler=dask.multiprocessing.get, num_workers=p.num_workers, @@ -1197,11 +1221,6 @@ def run_SS(p, client=None): results """ - global scattered_p - if client: - scattered_p = client.scatter(p, broadcast=True) - else: - scattered_p = p # Create list of deviation factors for initial guesses of r and TR dev_factor_list = [ diff --git a/ogcore/TPI.py b/ogcore/TPI.py index b9c123ba8..d7c066c2a 100644 --- a/ogcore/TPI.py +++ b/ogcore/TPI.py @@ -46,13 +46,6 @@ level=log_level, format="%(message)s" # Only show the message itself ) -""" -A global future for the Parameters object for client workers. -This is scattered once and place at module scope, then used -by the client in the inner loop. -""" -scattered_p = None - def get_initial_SS_values(p): """ @@ -578,12 +571,6 @@ def run_TPI(p, client=None): results """ - global scattered_p - if client: - scattered_p = client.scatter(p, broadcast=True) - else: - scattered_p = p - # unpack tuples of parameters initial_values, ss_vars, theta, baseline_values = get_initial_SS_values(p) (B0, b_sinit, b_splus1init, factor, initial_b, initial_n) = initial_values @@ -779,10 +766,15 @@ def run_TPI(p, client=None): ).sum(axis=2) p_tilde = aggr.get_ptilde(p_i[:, :], p.tau_c[:, :], p.alpha_c, "TPI") + # scatter parameters to workers + scattered_p = client.scatter(p, broadcast=True) if client else p + euler_errors = np.zeros((p.T, 2 * p.S, p.J)) lazy_values = [] for j in range(p.J): guesses = (guesses_b[:, :, j], guesses_n[:, :, j]) + + # Add the delayed computation to our list lazy_values.append( delayed(inner_loop)( guesses, @@ -795,7 +787,9 @@ def run_TPI(p, client=None): ) ) if client: - futures = client.compute(lazy_values, num_workers=p.num_workers) + # Compute all the values + futures = client.compute(lazy_values) + # Later, gather the results when needed results = client.gather(futures) else: results = compute( diff --git a/ogcore/__init__.py b/ogcore/__init__.py index e733effbf..bbcb58e1a 100644 --- a/ogcore/__init__.py +++ b/ogcore/__init__.py @@ -20,4 +20,4 @@ from ogcore.txfunc import * from ogcore.utils import * -__version__ = "0.14.2" +__version__ = "0.14.3" diff --git a/ogcore/txfunc.py b/ogcore/txfunc.py index 88a83afa2..a6df207fb 100644 --- a/ogcore/txfunc.py +++ b/ogcore/txfunc.py @@ -288,9 +288,6 @@ def get_tax_rates( elif ( income.ndim == 2 ): # I think only calls here are for loops over S and J - # for s in range(income.shape[0]): - # for j in range(income.shape[1]): - # txrates[s, j] = params[s][j][0](income[s, j]) txrates = [ [ params[s][j][0](income[s, j]) @@ -299,12 +296,6 @@ def get_tax_rates( for s in range(income.shape[0]) ] else: # to catch 3D arrays, looping over T, S, J - # for t in range(income.shape[0]): - # for s in range(income.shape[1]): - # for j in range(income.shape[2]): - # txrates[t, s, j] = params[t][s][j][0]( - # income[t, s, j] - # ) txrates = [ [ [ @@ -596,6 +587,8 @@ def txfunc_est( output_dir (str): output directory for saving plot files graph (bool): whether to plot the estimated functions compared to the data + params_init (Numpy array): initial values for the parameters + global_opt (bool): whether to use global optimization method Returns: (tuple): tax function estimation output: @@ -678,15 +671,6 @@ def txfunc_est( ) lb_max_x = np.maximum(min_x, 0.0) + 1e-4 lb_max_y = np.maximum(min_y, 0.0) + 1e-4 - # bnds = ( - # (1e-12, None), - # (1e-12, None), - # (1e-12, None), - # (1e-12, None), - # (lb_max_x, MAX_ETR + 0.15), - # (lb_max_y, MAX_ETR + 0.15), - # (0, 1), - # ) bnds = ( (1e-12, 9999), (1e-12, 9999), @@ -802,9 +786,9 @@ def txfunc_est( # Need to use a different functional form than for DEP function. # ''' if params_init is None: - phi0_init = 1.0 - phi1_init = 1.0 - phi2_init = 1.0 + phi0_init = 0.3 + phi1_init = 0.3 + phi2_init = 0.01 params_init = np.array([phi0_init, phi1_init, phi2_init]) tx_objs = ( np.array([None]), @@ -815,8 +799,7 @@ def txfunc_est( tax_func_type, rate_type, ) - # bnds = ((1e-12, None), (1e-12, None), (1e-12, None)) - bnds = ((1e-12, 9999), (1e-12, 9999), (1e-12, 9999)) + bnds = ((1e-12, 1.0), (1e-12, 1.0), (1e-12, 1.0)) if global_opt: params_til = opt.differential_evolution( wsumsq, bounds=bnds, args=(tx_objs), seed=1 @@ -839,15 +822,16 @@ def txfunc_est( # set initial values to parameter estimates params_init = np.array([phi0til, phi1til, phi2til]) elif tax_func_type == "HSV": - # ''' - # Estimate Heathcote, Storesletten, Violante (2017) parameters via - # OLS - # ''' + """ + Estimate Heathcote, Storesletten, Violante (2017) parameters via + OLS + """ constant = np.ones_like(income) ln_income = np.log(income) X_mat = np.column_stack((constant, ln_income)) Y_vec = np.log(1 - txrates) - param_est = np.linalg.inv(X_mat.T @ X_mat) @ X_mat.T @ Y_vec + W = np.diag(wgts) + param_est = np.linalg.inv(X_mat.T @ W @ X_mat) @ X_mat.T @ W @ Y_vec params = np.zeros(numparams) if rate_type == "etr": ln_lambda_s_hat, minus_tau_s_hat = param_est @@ -858,16 +842,15 @@ def txfunc_est( params[:2] = np.array([lambda_s_hat, -minus_tau_s_hat]) # Calculate the WSSE Y_hat = X_mat @ params - # wsse = ((Y_vec - Y_hat) ** 2 * wgts).sum() - wsse = ((Y_vec - Y_hat) ** 2).sum() + wsse = ((Y_vec - Y_hat) ** 2 * wgts).sum() obs = df.shape[0] params_to_plot = params elif tax_func_type == "linear": - # ''' - # For linear rates, just take the mean ETR or MTR by age-year. - # Can use DEP form and set all parameters except for the shift - # parameter to zero. - # ''' + """ + For linear rates, just take the mean ETR or MTR by age-year. + Can use DEP form and set all parameters except for the shift + parameter to zero. + """ params = np.zeros(numparams) wsse = 0.0 obs = df.shape[0] @@ -1704,7 +1687,7 @@ def tax_func_estimate( ) ) if client: - futures = client.compute(lazy_values, num_workers=num_workers) + futures = client.compute(lazy_values) results = client.gather(futures) else: results = results = compute( diff --git a/setup.py b/setup.py index 4ad5c2b44..f04f572d2 100755 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setuptools.setup( name="ogcore", - version="0.14.2", + version="0.14.3", author="Jason DeBacker and Richard W. Evans", license="CC0 1.0 Universal (CC0 1.0) Public Domain Dedication", description="A general equilibrium overlapping generations model for fiscal policy analysis", diff --git a/tests/test_io_data/tax_func_estimate_outputs.pkl b/tests/test_io_data/tax_func_estimate_outputs.pkl index eb9a071a6..9bf2c6d20 100644 Binary files a/tests/test_io_data/tax_func_estimate_outputs.pkl and b/tests/test_io_data/tax_func_estimate_outputs.pkl differ diff --git a/tests/test_io_data/tax_func_loop_outputs.pkl b/tests/test_io_data/tax_func_loop_outputs.pkl index be5e711de..c21b7d14b 100644 Binary files a/tests/test_io_data/tax_func_loop_outputs.pkl and b/tests/test_io_data/tax_func_loop_outputs.pkl differ diff --git a/tests/test_txfunc.py b/tests/test_txfunc.py index 4aabc045c..6f0a34869 100644 --- a/tests/test_txfunc.py +++ b/tests/test_txfunc.py @@ -224,64 +224,27 @@ def test_replace_outliers(): ) -@pytest.mark.local # only marking as local because platform -# affects results from scipy.opt that is called in this test - so it'll -# pass if run on Mac with MKL, but not necessarily on other platforms @pytest.mark.parametrize( - "rate_type,tax_func_type,numparams,expected_tuple", + "rate_type,tax_func_type,true_params", [ - ("etr", "DEP", 12, expected_tuple_DEP), - ("etr", "DEP_totalinc", 6, expected_tuple_DEP_totalinc), - ("etr", "GS", 3, expected_tuple_GS), - ], - ids=["DEP", "DEP_totalinc", "GS"], -) -def test_txfunc_est( - rate_type, tax_func_type, numparams, expected_tuple, tmpdir -): - """ - Test txfunc.txfunc_est() function. The test is that given - inputs from previous run, the outputs are unchanged. - """ - micro_data = utils.safe_read_pickle( - os.path.join(CUR_PATH, "test_io_data", "micro_data_dict_for_tests.pkl") - ) - s = 80 - t = 2030 - df = txfunc.tax_data_sample(micro_data[str(t)]) - output_dir = tmpdir - # Put old df variables into new df var names - df.rename( - columns={ - "MTR labor income": "mtr_labinc", - "MTR capital income": "mtr_capinc", - "Total labor income": "total_labinc", - "Total capital income": "total_capinc", - "ETR": "etr", - "expanded_income": "market_income", - "Weights": "weight", - }, - inplace=True, - ) - test_tuple = txfunc.txfunc_est( - df, s, t, rate_type, tax_func_type, numparams, output_dir, True - ) - - for i, v in enumerate(expected_tuple): - assert np.allclose(test_tuple[i], v) - - -@pytest.mark.parametrize( - "rate_type,tax_func_type,numparams,expected_tuple", - [ - ("etr", "linear", 1, expected_tuple_linear), - ("mtrx", "linear", 1, expected_tuple_linear_mtrx), - ("mtry", "linear", 1, expected_tuple_linear_mtry), - ("etr", "HSV", 2, expected_tuple_HSV), - ("mtrx", "HSV", 2, expected_tuple_HSV_mtrx), - ("mtry", "HSV", 2, expected_tuple_HSV_mtry), + # ("etr", "DEP", [6.28E-12, 4.36E-05, 1.04E-23, 7.77E-09, 0.80, 0.80, 0.84, -0.14, -0.15, 0.15, 0.16, -0.15]), + ( + "etr", + "DEP_totalinc", + [6.28e-12, 4.36e-05, 0.35, -0.14, 0.15, -0.15], + ), + ("etr", "GS", [0.35, 0.25, 0.03]), + ("etr", "linear", [0.25]), + ("mtrx", "linear", [0.4]), + ("mtry", "linear", [0.1]), + ("etr", "HSV", [0.5, 0.1]), + ("mtrx", "HSV", [0.5, 0.1]), + ("mtry", "HSV", [0.4, 0.15]), ], + # ids=["DEP", "DEP_totalinc", "GS", "linear, etr", ids=[ + "DEP_totalinc", + "GS", "linear, etr", "linear, mtrx", "linear, mtry", @@ -290,40 +253,100 @@ def test_txfunc_est( "HSV, mtry", ], ) -def test_txfunc_est_on_GH( - rate_type, tax_func_type, numparams, expected_tuple, tmpdir -): +def test_txfunc_est(rate_type, tax_func_type, true_params, tmpdir): """ - Test txfunc.txfunc_est() function. The test is that given - inputs from previous run, the outputs are unchanged. + Test txfunc.txfunc_est() function. The test the estimator can + recover (close to) the true parameters. """ - micro_data = utils.safe_read_pickle( - os.path.join(CUR_PATH, "test_io_data", "micro_data_dict_for_tests.pkl") + # Generate data based on true parameters + N = 20_000 + weights = np.ones(N) + x = np.random.uniform(0, 500_000, size=N) + y = np.random.uniform(0, 500_000, size=N) + eps1 = np.random.normal(scale=0.00001, size=N) + eps2 = np.random.normal(scale=0.00001, size=N) + eps3 = np.random.normal(scale=0.00001, size=N) + micro_data = pd.DataFrame( + { + "total_capinc": y, + "total_labinc": x, + "weight": weights, + "total_tax": ( + ( + txfunc.get_tax_rates( + true_params, + x, + y, + weights, + tax_func_type, + rate_type="etr", + for_estimation=False, + ) + + eps1 + ) + * (x + y) + ), + "etr": ( + txfunc.get_tax_rates( + true_params, + x, + y, + weights, + tax_func_type, + rate_type="etr", + for_estimation=False, + ) + + eps1 + ), + "mtr_labinc": ( + txfunc.get_tax_rates( + true_params, + x, + y, + weights, + tax_func_type, + rate_type="mtr", + for_estimation=False, + ) + + eps2 + ), + "mtr_capinc": ( + txfunc.get_tax_rates( + true_params, + x, + y, + weights, + tax_func_type, + rate_type="mtr", + for_estimation=False, + ) + + eps3 + ), + } ) - s = 80 - t = 2030 - df = txfunc.tax_data_sample(micro_data[str(t)]) + micro_data["age"] = 44 + micro_data["year"] = 2025 output_dir = tmpdir - # Put old df variables into new df var names - df.rename( - columns={ - "MTR labor income": "mtr_labinc", - "MTR capital income": "mtr_capinc", - "Total labor income": "total_labinc", - "Total capital income": "total_capinc", - "ETR": "etr", - "expanded_income": "market_income", - "Weights": "weight", - }, - inplace=True, - ) - test_tuple = txfunc.txfunc_est( - df, s, t, rate_type, tax_func_type, numparams, output_dir, True + param_est, _, obs, _ = txfunc.txfunc_est( + micro_data, + 44, + 2025, + rate_type, + tax_func_type, + len(true_params), + output_dir, + True, ) - for i, v in enumerate(expected_tuple): - print("For element", i, ", test tuple =", test_tuple[i]) - assert np.allclose(test_tuple[i], v, rtol=0.0, atol=1e-04) + assert obs == micro_data.shape[0] + print("Estimated parameters:", param_est) + print("Diffs = ", np.absolute(param_est - true_params)) + if "DEP" in tax_func_type: + assert np.allclose(param_est, true_params, atol=0.2, rtol=0.01) + else: + assert np.allclose(param_est, true_params, atol=0.0, rtol=0.01) + # TODO: maybe the test is that the true parameters are in the + # 95% confidence interval of the estimated parameters def test_txfunc_est_exception(tmpdir): @@ -362,9 +385,6 @@ def test_tax_data_sample(): assert isinstance(df, pd.DataFrame) -@pytest.mark.local -# mark as local run since results work on Mac, but differ on other -# platforms def test_tax_func_loop(): """ Test txfunc.tax_func_loop() function. The test is that given inputs from @@ -390,7 +410,8 @@ def test_tax_func_loop(): numparams, tpers, ) = input_tuple - tax_func_type = "DEP" + tax_func_type = "HSV" + numparams = 2 # Rename and create vars to suit new micro_data var names micro_data["total_labinc"] = ( micro_data["Wage income"] + micro_data["SE income"] @@ -438,7 +459,6 @@ def test_tax_func_loop(): output_dir, numparams, ) - expected_tuple = utils.safe_read_pickle( os.path.join(CUR_PATH, "test_io_data", "tax_func_loop_outputs.pkl") ) @@ -732,7 +752,7 @@ def test_tax_func_estimate(tmpdir, dask_client): client, num_workers, ) = input_tuple - tax_func_type = "DEP" + tax_func_type = "HSV" age_specific = False BW = 1 test_path = os.path.join(tmpdir, "test_out.pkl")