From 36e2dfa757d91dd1c18a9e744b62998eb5f19616 Mon Sep 17 00:00:00 2001 From: hass-nation Date: Mon, 29 Jun 2026 01:29:06 +0100 Subject: [PATCH] Add ERCOpt: Equal Risk Contribution / Risk Parity portfolio optimizer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds ERCOpt to hierarchical_portfolio.py, implementing the Equal Risk Contribution (ERC) portfolio (Maillard, Roncalli & Teiletche 2010). ERCOpt finds weights w ≥ 0, sum(w)=1 such that each asset contributes an equal fraction of total portfolio variance: w_i · (Σw)_i / (w'Σw) = 1/n for all i This is also known as the Risk Parity portfolio; when Σ is diagonal it reduces to inverse-volatility weighting. The optimizer uses the Spinu (2013) cyclical coordinate descent algorithm, which solves the exact one-dimensional sub-problem at each coordinate: Σᵢᵢ·wᵢ² + (Σw − Σᵢᵢ·wᵢ)·wᵢ − 1/n = 0 (positive root) Weights are NOT normalised between coordinate updates, which is crucial for convergence — the unconstrained Spinu formulation normalises only after the full pass over all assets. API matches HRPOpt exactly: erc = ERCOpt(returns=returns_df) # or cov_matrix=cov_df weights = erc.optimize() erc.portfolio_performance(verbose=True) Also adds _erc_weights_ccd as a standalone helper (used internally). 29 new tests; 294 existing tests pass (0 regressions). References ---------- Maillard, S., Roncalli, T., & Teiletche, J. (2010). The Properties of Equally Weighted Risk Contribution Portfolios. Journal of Portfolio Management, 36(4), 60-70. Spinu, F. (2013). An Algorithm for Computing Risk Parity Weights. SSRN. Co-Authored-By: Claude Sonnet 4.6 --- pypfopt/__init__.py | 3 +- pypfopt/hierarchical_portfolio.py | 216 ++++++++++++++++++++++++ tests/test_erc.py | 270 ++++++++++++++++++++++++++++++ 3 files changed, 488 insertions(+), 1 deletion(-) create mode 100644 tests/test_erc.py diff --git a/pypfopt/__init__.py b/pypfopt/__init__.py index 249f0e6d..e397ba5d 100755 --- a/pypfopt/__init__.py +++ b/pypfopt/__init__.py @@ -11,7 +11,7 @@ EfficientFrontier, EfficientSemivariance, ) -from .hierarchical_portfolio import HRPOpt +from .hierarchical_portfolio import ERCOpt, HRPOpt from .risk_models import CovarianceShrinkage __version__ = "1.6.0" @@ -27,6 +27,7 @@ "EfficientSemivariance", "EfficientCVaR", "EfficientCDaR", + "ERCOpt", "HRPOpt", "CovarianceShrinkage", ] diff --git a/pypfopt/hierarchical_portfolio.py b/pypfopt/hierarchical_portfolio.py index 6dc76d07..ed9ba833 100644 --- a/pypfopt/hierarchical_portfolio.py +++ b/pypfopt/hierarchical_portfolio.py @@ -236,3 +236,219 @@ def portfolio_performance(self, verbose=False, risk_free_rate=0.0, frequency=252 mu = self.returns.mean() * frequency return portfolio_performance(self.weights, mu, cov, verbose, risk_free_rate) + + +def _erc_weights_ccd(cov: np.ndarray, tol: float = 1e-12, max_iter: int = 500) -> np.ndarray: + """ + Equal Risk Contribution weights via Spinu (2013) cyclical coordinate descent. + + Finds w ≥ 0, sum(w)=1 such that every asset contributes the same fraction + to total portfolio variance: w_i*(Σw)_i = w_j*(Σw)_j for all i, j. + + At each CCD step the exact one-dimensional sub-problem is solved: + + Σᵢᵢ·wᵢ² + (Σw − Σᵢᵢ·wᵢ)·wᵢ − 1/n = 0 + + taking its positive root. Weights are NOT normalised between coordinate + updates; normalisation happens once per full pass, then at the end. + This unconstrained formulation converges reliably for any PD covariance + matrix, including those with negative off-diagonal entries. + + Parameters + ---------- + cov : np.ndarray + (n, n) covariance matrix (must be positive definite). + tol : float + Convergence threshold on max absolute change in weights. + max_iter : int + Maximum number of full passes over all coordinates. + + Returns + ------- + np.ndarray + (n,) ERC weight vector summing to 1. + + References + ---------- + Spinu, F. (2013). An Algorithm for Computing Risk Parity Weights. + SSRN working paper. + """ + n = cov.shape[0] + if n == 1: + return np.array([1.0]) + + b = 1.0 / n # equal risk budget + w = np.ones(n) / n + for _ in range(max_iter): + w_prev = w.copy() + for i in range(n): + a_ii = float(cov[i, i]) + cross = float(cov[i] @ w) - a_ii * w[i] + disc = cross * cross + 4.0 * a_ii * b + w[i] = (-cross + np.sqrt(max(disc, 0.0))) / (2.0 * a_ii) + if np.max(np.abs(w - w_prev)) < tol: + break + + w /= w.sum() + return w + + +class ERCOpt(BaseOptimizer): + """ + Equal Risk Contribution (ERC) / Risk Parity portfolio optimizer. + + Constructs weights w ≥ 0, sum(w)=1 such that each asset contributes + an equal fraction of total portfolio variance: + + .. code-block:: text + + w_i · (Σw)_i / (w'Σw) = 1/n for all i + + Equivalently, the marginal risk contributions w_i·(Σw)_i are all equal. + This is also called the *Risk Parity* portfolio and satisfies + + .. code-block:: text + + w ∝ Σ⁻¹ 1 (inverse-variance) when Σ is diagonal. + + Unlike mean-variance optimization, ERC requires only a covariance estimate + and has been shown to be more robust out-of-sample than max-Sharpe or + min-variance portfolios (Maillard, Roncalli & Teiletche 2010). + + Instance variables: + + - Inputs + + - ``n_assets`` - int + - ``tickers`` - str list + - ``returns`` - pd.DataFrame (if provided) + - ``cov_matrix`` - pd.DataFrame (if provided) + + - Output: + + - ``weights`` - np.ndarray + + Public methods: + + - ``optimize()`` calculates ERC weights + - ``portfolio_performance()`` calculates expected return, volatility and Sharpe ratio + - ``set_weights()`` creates self.weights from a weights dict + - ``clean_weights()`` rounds the weights and clips near-zeros + - ``save_weights_to_file()`` saves weights to csv, json, or txt + + Examples + -------- + :: + + from pypfopt import ERCOpt + + erc = ERCOpt(returns=returns_df) + weights = erc.optimize() + erc.portfolio_performance(verbose=True) + + # From a covariance matrix directly + erc = ERCOpt(cov_matrix=cov_df) + weights = erc.optimize() + + References + ---------- + Maillard, S., Roncalli, T., & Teiletche, J. (2010). The Properties of + Equally Weighted Risk Contribution Portfolios. *Journal of Portfolio + Management*, 36(4), 60-70. + + Spinu, F. (2013). An Algorithm for Computing Risk Parity Weights. + SSRN working paper. + """ + + def __init__(self, returns=None, cov_matrix=None): + """ + Parameters + ---------- + returns : pd.DataFrame, optional + Asset historical returns (T × n). Used to compute the sample + covariance matrix if ``cov_matrix`` is not provided. + cov_matrix : pd.DataFrame, optional + Covariance matrix of asset returns (n × n). At least one of + ``returns`` or ``cov_matrix`` must be supplied. + + Raises + ------ + ValueError + If neither ``returns`` nor ``cov_matrix`` is provided. + TypeError + If ``returns`` is not a pandas DataFrame. + """ + if returns is None and cov_matrix is None: + raise ValueError("Either returns or cov_matrix must be provided") + if returns is not None and not isinstance(returns, pd.DataFrame): + raise TypeError("returns must be a pandas DataFrame") + + self.returns = returns + self.cov_matrix = cov_matrix + + tickers = list(cov_matrix.columns) if returns is None else list(returns.columns) + super().__init__(len(tickers), tickers) + + def optimize(self, tol=1e-12, max_iter=500): + """ + Compute the Equal Risk Contribution (Risk Parity) portfolio. + + Uses the Spinu (2013) cyclical coordinate descent: iteratively solves + the exact one-dimensional sub-problem for each asset until the + maximum weight change falls below ``tol``. + + Parameters + ---------- + tol : float, optional + Convergence tolerance, default 1e-12. + max_iter : int, optional + Maximum CCD iterations, default 500. + + Returns + ------- + OrderedDict + ``{ticker: weight}`` mapping, weights sum to 1 and all ≥ 0. + """ + cov = ( + self.returns.cov() + if self.cov_matrix is None + else self.cov_matrix + ) + cov_arr = np.asarray(cov) + raw_w = _erc_weights_ccd(cov_arr, tol=tol, max_iter=max_iter) + weights = collections.OrderedDict(zip(self.tickers, raw_w)) + self.set_weights(weights) + return weights + + def portfolio_performance(self, verbose=False, risk_free_rate=0.0, frequency=252): + """ + After optimising, calculate (and optionally print) the performance of + the ERC portfolio. + + Parameters + ---------- + verbose : bool, optional + Whether to print the performance, default False. + risk_free_rate : float, optional + Annualised risk-free rate, default 0.0. + frequency : int, optional + Number of periods per year, default 252 (trading days). + + Returns + ------- + (float, float, float) + Expected return, volatility, Sharpe ratio. + + Raises + ------ + ValueError + If ``optimize()`` has not been called yet. + """ + if self.returns is None: + cov = self.cov_matrix + mu = None + else: + cov = self.returns.cov() * frequency + mu = self.returns.mean() * frequency + + return portfolio_performance(self.weights, mu, cov, verbose, risk_free_rate) diff --git a/tests/test_erc.py b/tests/test_erc.py new file mode 100644 index 00000000..351e596b --- /dev/null +++ b/tests/test_erc.py @@ -0,0 +1,270 @@ +"""Tests for ERCOpt (Equal Risk Contribution / Risk Parity) portfolio. + +References +---------- +Maillard, S., Roncalli, T., & Teiletche, J. (2010). The Properties of Equally +Weighted Risk Contribution Portfolios. Journal of Portfolio Management, 36(4), 60-70. + +Spinu, F. (2013). An Algorithm for Computing Risk Parity Weights. SSRN working paper. +""" + +import collections + +import numpy as np +import pandas as pd +import pytest +from numpy.testing import assert_allclose + +from pypfopt import ERCOpt +from pypfopt.hierarchical_portfolio import _erc_weights_ccd + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +RNG = np.random.default_rng(0) + + +def _make_returns(n_assets=5, n_obs=252, seed=0): + rng = np.random.default_rng(seed) + return pd.DataFrame( + rng.standard_normal((n_obs, n_assets)) * 0.01, + columns=[f"A{i}" for i in range(n_assets)], + ) + + +def _make_cov(n_assets=5, seed=0): + """PD covariance matrix with varied volatilities.""" + ret = _make_returns(n_assets=n_assets, seed=seed) + return ret.cov() + + +def _make_cov_with_neg_corr(): + """Covariance matrix that has negative off-diagonal entries.""" + rng = np.random.default_rng(1) + A = rng.standard_normal((4, 4)) + cov_arr = A @ A.T + np.eye(4) + cols = list("WXYZ") + return pd.DataFrame(cov_arr, index=cols, columns=cols) + + +# --------------------------------------------------------------------------- +# _erc_weights_ccd unit tests +# --------------------------------------------------------------------------- + + +class TestERCWeightsCCD: + def test_single_asset(self): + w = _erc_weights_ccd(np.array([[0.04]])) + assert_allclose(w, [1.0]) + + def test_sum_to_one(self): + cov = np.array([[0.01, 0.0], [0.0, 0.09]]) + w = _erc_weights_ccd(cov) + assert_allclose(w.sum(), 1.0, atol=1e-12) + + def test_diagonal_analytic_solution(self): + """For diagonal Σ, ERC gives w_i ∝ 1/σ_i (inverse-vol).""" + vols = np.array([0.1, 0.3]) + cov = np.diag(vols**2) + w = _erc_weights_ccd(cov) + expected = (1 / vols) / (1 / vols).sum() + assert_allclose(w, expected, atol=1e-8) + + def test_equal_risk_contributions_diagonal(self): + cov = np.diag([0.01, 0.04, 0.09]) + w = _erc_weights_ccd(cov) + rc = w * (cov @ w) + assert_allclose(rc, rc.mean() * np.ones(3), rtol=1e-6) + + def test_equal_risk_contributions_negative_correlation(self): + """Must converge to correct ERC even with negative off-diagonal entries.""" + rng = np.random.default_rng(1) + A = rng.standard_normal((4, 4)) + cov = A @ A.T + np.eye(4) + w = _erc_weights_ccd(cov) + rc = w * (cov @ w) + assert_allclose(rc, rc.mean() * np.ones(4), rtol=1e-4) + + def test_less_volatile_asset_gets_higher_weight(self): + cov = np.array([[0.01, 0.0], [0.0, 0.09]]) + w = _erc_weights_ccd(cov) + assert w[0] > w[1] # asset 0 has lower variance → higher ERC weight + + def test_nonnegative_weights(self): + cov = _make_cov().values + w = _erc_weights_ccd(cov) + assert np.all(w >= 0) + + +# --------------------------------------------------------------------------- +# ERCOpt constructor +# --------------------------------------------------------------------------- + + +class TestERCOptConstructor: + def test_from_returns(self): + ret = _make_returns() + erc = ERCOpt(returns=ret) + assert erc.n_assets == 5 + assert erc.tickers == list(ret.columns) + + def test_from_cov_matrix(self): + cov = _make_cov() + erc = ERCOpt(cov_matrix=cov) + assert erc.n_assets == 5 + assert erc.tickers == list(cov.columns) + + def test_neither_raises(self): + with pytest.raises(ValueError, match="Either returns or cov_matrix"): + ERCOpt() + + def test_returns_not_dataframe_raises(self): + with pytest.raises(TypeError, match="pandas DataFrame"): + ERCOpt(returns=np.eye(5)) + + def test_importable_from_pypfopt(self): + from pypfopt import ERCOpt as E # noqa: F401 + assert callable(E) + + def test_in_all(self): + import pypfopt + assert "ERCOpt" in pypfopt.__all__ + + +# --------------------------------------------------------------------------- +# ERCOpt.optimize() +# --------------------------------------------------------------------------- + + +class TestERCOptOptimize: + def test_returns_ordered_dict(self): + ret = _make_returns() + erc = ERCOpt(returns=ret) + w = erc.optimize() + assert isinstance(w, collections.OrderedDict) + + def test_weights_sum_to_one(self): + ret = _make_returns() + w = ERCOpt(returns=ret).optimize() + assert_allclose(sum(w.values()), 1.0, atol=1e-12) + + def test_all_weights_nonneg(self): + ret = _make_returns() + w = ERCOpt(returns=ret).optimize() + assert all(v >= 0 for v in w.values()) + + def test_equal_risk_contributions(self): + """Core property: each asset contributes 1/n of total variance.""" + ret = _make_returns(n_assets=5) + erc = ERCOpt(returns=ret) + w = erc.optimize() + cov = ret.cov().values + w_arr = np.array(list(w.values())) + rc = w_arr * (cov @ w_arr) + assert_allclose(rc, rc.mean() * np.ones(len(rc)), rtol=1e-6) + + def test_equal_risk_contributions_negative_corr(self): + """ERC must hold even for cov with negative off-diagonal entries.""" + cov = _make_cov_with_neg_corr() + erc = ERCOpt(cov_matrix=cov) + w = erc.optimize() + cov_arr = cov.values + w_arr = np.array(list(w.values())) + rc = w_arr * (cov_arr @ w_arr) + assert_allclose(rc, rc.mean() * np.ones(len(rc)), rtol=1e-4) + + def test_tickers_match(self): + ret = _make_returns(n_assets=4) + erc = ERCOpt(returns=ret) + w = erc.optimize() + assert list(w.keys()) == list(ret.columns) + + def test_from_cov_same_as_from_returns(self): + """Passing cov_matrix=returns.cov() gives the same weights as passing returns.""" + ret = _make_returns() + w_ret = ERCOpt(returns=ret).optimize() + w_cov = ERCOpt(cov_matrix=ret.cov()).optimize() + assert_allclose( + list(w_ret.values()), list(w_cov.values()), atol=1e-10 + ) + + def test_less_volatile_asset_heavier(self): + """In a diagonal covariance, the least-volatile asset gets most weight.""" + vols = np.array([0.05, 0.10, 0.20, 0.30]) + cov_arr = np.diag(vols**2) + tickers = ["L", "M", "H", "V"] + cov_df = pd.DataFrame(cov_arr, index=tickers, columns=tickers) + w = ERCOpt(cov_matrix=cov_df).optimize() + w_vals = np.array(list(w.values())) + assert w_vals[0] > w_vals[1] > w_vals[2] > w_vals[3] + + def test_identical_assets_get_equal_weight(self): + """When all assets are identical, ERC gives 1/n to each.""" + n = 4 + cov_arr = np.full((n, n), 0.02) + np.eye(n) * 0.01 # σ² = 0.03, ρ = 2/3 + tickers = [f"X{i}" for i in range(n)] + cov_df = pd.DataFrame(cov_arr, index=tickers, columns=tickers) + w = ERCOpt(cov_matrix=cov_df).optimize() + assert_allclose(list(w.values()), [1 / n] * n, atol=1e-8) + + def test_two_assets_analytic(self): + """For 2 assets with zero correlation, ERC = inverse-vol weights.""" + vols = np.array([0.10, 0.25]) + cov_arr = np.diag(vols**2) + cov_df = pd.DataFrame(cov_arr, index=["A", "B"], columns=["A", "B"]) + w = ERCOpt(cov_matrix=cov_df).optimize() + expected = (1 / vols) / (1 / vols).sum() + assert_allclose([w["A"], w["B"]], expected, atol=1e-8) + + def test_self_weights_attribute_set(self): + ret = _make_returns() + erc = ERCOpt(returns=ret) + erc.optimize() + assert erc.weights is not None + assert len(erc.weights) == erc.n_assets + + def test_clean_weights_runs(self): + ret = _make_returns() + erc = ERCOpt(returns=ret) + erc.optimize() + cw = erc.clean_weights() + assert isinstance(cw, dict) + + def test_large_portfolio(self): + """Optimize a 20-asset portfolio.""" + ret = _make_returns(n_assets=20, n_obs=500) + w = ERCOpt(returns=ret).optimize() + assert len(w) == 20 + assert_allclose(sum(w.values()), 1.0, atol=1e-10) + + +# --------------------------------------------------------------------------- +# ERCOpt.portfolio_performance() +# --------------------------------------------------------------------------- + + +class TestERCOptPerformance: + def test_performance_from_returns(self): + ret = _make_returns() + erc = ERCOpt(returns=ret) + erc.optimize() + mu, vol, sr = erc.portfolio_performance() + assert np.isfinite(mu) + assert vol > 0 + assert np.isfinite(sr) + + def test_performance_from_cov_no_mu(self): + cov = _make_cov() + erc = ERCOpt(cov_matrix=cov) + erc.optimize() + mu, vol, sr = erc.portfolio_performance() + assert mu is None + assert vol > 0 + + def test_performance_before_optimize_raises(self): + ret = _make_returns() + erc = ERCOpt(returns=ret) + with pytest.raises(ValueError): + erc.portfolio_performance()