From 2043afdcd2878b75c6195ee7d7e2fb5891c32387 Mon Sep 17 00:00:00 2001 From: hass-nation Date: Sat, 27 Jun 2026 23:06:12 +0100 Subject: [PATCH 1/2] feat(hierarchical): add HERCOpt and NCOpt portfolio allocation methods Extends the hierarchical portfolio module with two additional methods: HERCOpt - Hierarchical Equal Risk Contribution (Raffinot 2018) Equal Risk Contribution (ERC) weights at every level of the hierarchy instead of the inverse-variance weights used by HRP. ERC accounts for within-cluster correlations, producing more balanced risk allocation when intra-cluster correlations are high. Implemented via cyclical coordinate descent (Roncalli 2013). NCOpt - Nested Cluster Optimization (Lopez de Prado 2019) Two-level nested procedure: within-cluster optimization followed by meta-portfolio optimization across clusters. Supports three objectives at each level - min_variance, erc, and equal-weight - giving 9 combinations. The meta-covariance is computed analytically from the full covariance matrix, so no return history is required. Both classes: - Share the same API as HRPOpt (fit from returns or cov_matrix) - Export portfolio_performance() with expected return and Sharpe ratio - Are exported from pypfopt.__init__ and __all__ - Accept all scipy linkage methods Also adds two private helpers: - _erc_weights_ccd: ERC via multiplicative CCD (Roncalli 2013) - _min_var_weights: long-only min-variance with analytic + SLSQP fallback 53 tests across 9 test classes; all existing HRP tests still pass. References ---------- Raffinot, T. (2018). Hierarchical clustering-based asset allocation. Journal of Portfolio Management, 44(2), 89-99. Lopez de Prado, M. (2019). A Robust Estimator of the Efficient Frontier. SSRN Working Paper. https://ssrn.com/abstract=3469961 Roncalli, T. (2013). Introduction to Risk Parity and Budgeting. CRC Press. Co-Authored-By: Claude Sonnet 4.6 --- pypfopt/__init__.py | 4 +- pypfopt/hierarchical_portfolio.py | 584 ++++++++++++++++++++++++++++++ tests/test_herc_nco.py | 410 +++++++++++++++++++++ 3 files changed, 997 insertions(+), 1 deletion(-) create mode 100644 tests/test_herc_nco.py diff --git a/pypfopt/__init__.py b/pypfopt/__init__.py index 249f0e6d..a2595e91 100755 --- a/pypfopt/__init__.py +++ b/pypfopt/__init__.py @@ -11,7 +11,7 @@ EfficientFrontier, EfficientSemivariance, ) -from .hierarchical_portfolio import HRPOpt +from .hierarchical_portfolio import HRPOpt, HERCOpt, NCOpt from .risk_models import CovarianceShrinkage __version__ = "1.6.0" @@ -28,5 +28,7 @@ "EfficientCVaR", "EfficientCDaR", "HRPOpt", + "HERCOpt", + "NCOpt", "CovarianceShrinkage", ] diff --git a/pypfopt/hierarchical_portfolio.py b/pypfopt/hierarchical_portfolio.py index 6dc76d07..6f216cf0 100644 --- a/pypfopt/hierarchical_portfolio.py +++ b/pypfopt/hierarchical_portfolio.py @@ -10,6 +10,12 @@ - ``HRPOpt`` implements the Hierarchical Risk Parity (HRP) portfolio. Code reproduced with permission from Marcos Lopez de Prado (2016). +- ``HERCOpt`` implements the Hierarchical Equal Risk Contribution (HERC) portfolio + (Raffinot 2018), which uses Equal Risk Contribution weights at each level of the + hierarchy rather than the inverse-variance weights used by HRP. +- ``NCOpt`` implements the Nested Cluster Optimization (NCO) portfolio + (Lopez de Prado 2019), which optimizes within clusters and then across clusters + in a two-level nested procedure. """ import collections @@ -18,6 +24,7 @@ import pandas as pd import scipy.cluster.hierarchy as sch import scipy.spatial.distance as ssd +from scipy.optimize import minimize from pypfopt.base import BaseOptimizer, portfolio_performance from pypfopt.risk_models import cov_to_corr @@ -236,3 +243,580 @@ def portfolio_performance(self, verbose=False, risk_free_rate=0.0, frequency=252 mu = self.returns.mean() * frequency return portfolio_performance(self.weights, mu, cov, verbose, risk_free_rate) + + +def _erc_weights_ccd(cov: np.ndarray, tol: float = 1e-12, max_iter: int = 500) -> np.ndarray: + """ + Equal Risk Contribution weights via cyclical coordinate descent. + + Finds w ≥ 0, sum(w)=1, such that w_i*(Σw)_i = w_j*(Σw)_j for all i, j — + i.e., every asset contributes the same fraction to total portfolio variance. + + Uses the multiplicative update of Roncalli (2013): + w_i ← w_i * sqrt(target_budget / marginal_risk_contribution_i) + normalised at each step. + + Parameters + ---------- + cov : np.ndarray + (n, n) covariance matrix (must be PD). + tol : float + Convergence threshold on max change in weights. + max_iter : int + Maximum iterations. + + Returns + ------- + np.ndarray + (n,) ERC weight vector summing to 1. + """ + n = cov.shape[0] + if n == 1: + return np.array([1.0]) + + w = np.ones(n) / n + for _ in range(max_iter): + Sigma_w = cov @ w + port_var = float(w @ Sigma_w) + rc = w * Sigma_w # risk contributions (unnormalised) + target = port_var / n # equal budget + # Multiplicative update: w_i ← w_i * sqrt(target / rc_i) + w_new = w * np.sqrt(target / np.maximum(rc, 1e-30)) + w_new = np.maximum(w_new, 0.0) + w_new /= w_new.sum() + if np.max(np.abs(w_new - w)) < tol: + w = w_new + break + w = w_new + + return w + + +def _min_var_weights(cov: np.ndarray) -> np.ndarray: + """ + Long-only minimum variance weights. + + Uses the analytic unconstrained solution (Σ⁻¹ 1 / 1ᵀ Σ⁻¹ 1) when all + resulting weights are non-negative; otherwise falls back to SLSQP. + + Parameters + ---------- + cov : np.ndarray + (n, n) covariance matrix. + + Returns + ------- + np.ndarray + (n,) weight vector summing to 1, all ≥ 0. + """ + n = cov.shape[0] + if n == 1: + return np.array([1.0]) + + try: + inv_cov = np.linalg.inv(cov) + ones = np.ones(n) + w = inv_cov @ ones / (ones @ inv_cov @ ones) + if (w >= -1e-8).all(): + w = np.maximum(w, 0.0) + return w / w.sum() + except np.linalg.LinAlgError: + pass + + # Long-only constrained via SLSQP + res = minimize( + lambda w_: float(w_ @ cov @ w_), + np.ones(n) / n, + method="SLSQP", + bounds=[(0.0, 1.0)] * n, + constraints=[{"type": "eq", "fun": lambda w_: w_.sum() - 1.0}], + options={"ftol": 1e-12, "maxiter": 1000}, + ) + w = np.maximum(res.x, 0.0) + return w / w.sum() + + +class HERCOpt(BaseOptimizer): + """ + A HERCOpt object constructs a Hierarchical Equal Risk Contribution (HERC) + portfolio (Raffinot, 2018). + + HERC extends Hierarchical Risk Parity (HRP) by replacing the + inverse-variance weighting with Equal Risk Contribution (ERC) at every + level of the hierarchy. The structure is otherwise identical to HRP: + + 1. Compute the correlation-based distance matrix. + 2. Perform hierarchical clustering. + 3. Order assets by quasi-diagonal traversal. + 4. Recursively bisect, allocating between sub-clusters in proportion to + the **inverse of each cluster's ERC-portfolio variance** (rather than + the inverse-variance-portfolio variance used by HRP). + + Because ERC accounts for within-cluster correlations when computing the + cluster's effective risk, HERC tends to produce more balanced risk + allocations than HRP, especially when intra-cluster correlations are high. + + Instance variables: + + - Inputs + + - ``n_assets`` - int + - ``tickers`` - str list + - ``returns`` - pd.DataFrame or None + - ``cov_matrix`` - pd.DataFrame or None + + - Output: + + - ``weights`` - OrderedDict + - ``clusters`` - linkage matrix + + Examples + -------- + .. code-block:: python + + from pypfopt import HERCOpt + herc = HERCOpt(returns=returns_df) + weights = herc.optimize() + herc.portfolio_performance(verbose=True) + + References + ---------- + Raffinot, T. (2018). Hierarchical clustering-based asset allocation. + *Journal of Portfolio Management*, 44(2), 89-99. + """ + + def __init__(self, returns=None, cov_matrix=None): + """ + Parameters + ---------- + returns : pd.DataFrame, optional + Asset historical returns (T × n). Either ``returns`` or + ``cov_matrix`` must be supplied. + cov_matrix : pd.DataFrame, optional + Covariance matrix of asset returns (n × n). + + Raises + ------ + ValueError + If neither ``returns`` nor ``cov_matrix`` is provided. + TypeError + If ``returns`` is not a DataFrame. + """ + if returns is None and cov_matrix is None: + raise ValueError("Either returns or cov_matrix must be provided") + if returns is not None and not isinstance(returns, pd.DataFrame): + raise TypeError("returns are not a dataframe") + + self.returns = returns + self.cov_matrix = cov_matrix + self.clusters = None + + tickers = list(cov_matrix.columns) if returns is None else list(returns.columns) + super().__init__(len(tickers), tickers) + + @staticmethod + def _get_cluster_var_erc(cov: pd.DataFrame, cluster_items: list) -> float: + """ + Variance of the ERC portfolio built from *cluster_items*. + + Parameters + ---------- + cov : pd.DataFrame + Full covariance matrix. + cluster_items : list + Asset names in the cluster. + + Returns + ------- + float + Portfolio variance under ERC weights. + """ + cov_slice = cov.loc[cluster_items, cluster_items].values + w = _erc_weights_ccd(cov_slice) + return float(w @ cov_slice @ w) + + @staticmethod + def _raw_herc_allocation(cov: pd.DataFrame, ordered_tickers: list) -> pd.Series: + """ + Recursive bisection allocation using ERC cluster variances. + + Identical in structure to HRP's ``_raw_hrp_allocation``, but calls + ``_get_cluster_var_erc`` instead of ``_get_cluster_var`` so that the + between-cluster split reflects ERC-weighted risk. + + Parameters + ---------- + cov : pd.DataFrame + Full covariance matrix. + ordered_tickers : list + Tickers ordered by quasi-diagonal traversal. + + Returns + ------- + pd.Series + Portfolio weights indexed by ticker. + """ + w = pd.Series(1.0, index=ordered_tickers) + cluster_items = [ordered_tickers] + + while len(cluster_items) > 0: + cluster_items = [ + i[j:k] + for i in cluster_items + for j, k in ((0, len(i) // 2), (len(i) // 2, len(i))) + if len(i) > 1 + ] + for i in range(0, len(cluster_items), 2): + first_cluster = cluster_items[i] + second_cluster = cluster_items[i + 1] + first_variance = HERCOpt._get_cluster_var_erc(cov, first_cluster) + second_variance = HERCOpt._get_cluster_var_erc(cov, second_cluster) + alpha = 1 - first_variance / (first_variance + second_variance) + w[first_cluster] *= alpha + w[second_cluster] *= 1 - alpha + + return w + + def optimize(self, linkage_method: str = "ward") -> collections.OrderedDict: + """ + Construct the HERC portfolio. + + Parameters + ---------- + linkage_method : str, optional (default ``"ward"``) + Scipy linkage method; any value accepted by + ``scipy.cluster.hierarchy.linkage``. Ward's method is recommended + for HERC as it minimises within-cluster variance at each merge. + + Returns + ------- + OrderedDict + Portfolio weights (asset → weight). + + Raises + ------ + ValueError + If ``linkage_method`` is not recognised by scipy. + """ + if linkage_method not in sch._LINKAGE_METHODS: + raise ValueError("linkage_method must be one recognised by scipy") + + if self.returns is None: + cov = self.cov_matrix + corr = cov_to_corr(self.cov_matrix).round(6) + else: + corr, cov = self.returns.corr(), self.returns.cov() + + matrix = np.sqrt(np.clip((1.0 - corr) / 2.0, a_min=0.0, a_max=1.0)) + dist = ssd.squareform(matrix, checks=False) + + self.clusters = sch.linkage(dist, linkage_method) + sort_ix = HRPOpt._get_quasi_diag(self.clusters) + ordered_tickers = corr.index[sort_ix].tolist() + + herc = HERCOpt._raw_herc_allocation(cov, ordered_tickers) + weights = collections.OrderedDict(herc.sort_index()) + self.set_weights(weights) + return weights + + def portfolio_performance( + self, verbose: bool = False, risk_free_rate: float = 0.0, frequency: int = 252 + ): + """ + Calculate (and optionally print) the performance of the HERC portfolio. + + Parameters + ---------- + verbose : bool, optional (default False) + Print the performance summary. + risk_free_rate : float, optional (default 0.0) + Risk-free rate for Sharpe ratio calculation. + frequency : int, optional (default 252) + Number of trading periods per year. + + Returns + ------- + (float, float, float) + Expected return, annual volatility, Sharpe ratio. + Expected return and Sharpe ratio are ``None`` when only a + covariance matrix was supplied. + """ + if self.returns is None: + cov = self.cov_matrix + mu = None + else: + cov = self.returns.cov() * frequency + mu = self.returns.mean() * frequency + + return portfolio_performance(self.weights, mu, cov, verbose, risk_free_rate) + + +class NCOpt(BaseOptimizer): + """ + A NCOpt object constructs a Nested Cluster Optimization (NCO) portfolio + (Lopez de Prado, 2019). + + NCO partitions the asset universe into clusters, solves a portfolio + optimisation problem *within* each cluster, and then solves a second + optimisation *across* the resulting cluster-portfolios. This two-level + structure allows a large and correlated asset universe to be decomposed + into smaller, nearly independent subproblems. + + **Algorithm** + + 1. Compute the correlation-based distance matrix and perform hierarchical + clustering; cut the dendrogram to obtain ``n_clusters`` clusters. + 2. **Intra-cluster step**: for each cluster, compute an optimal portfolio + over the cluster's assets (min-variance, ERC, or equal-weight). + 3. **Meta step**: treat each cluster as a single synthetic asset; compute + the covariance of the cluster-portfolio returns and optimise over the + resulting ``n_clusters`` assets. + 4. Combine: final weight for asset i in cluster c is + ``w_intra[c][i] * w_meta[c]``. + + Instance variables: + + - Inputs + + - ``n_assets`` - int + - ``tickers`` - str list + - ``returns`` - pd.DataFrame or None + - ``cov_matrix`` - pd.DataFrame or None + + - Output: + + - ``weights`` - OrderedDict + - ``clusters`` - dict mapping cluster index → list of tickers + - ``cluster_linkage`` - linkage matrix + + Examples + -------- + .. code-block:: python + + from pypfopt import NCOpt + nco = NCOpt(returns=returns_df) + weights = nco.optimize(n_clusters=4, internal_opt="min_variance", meta_opt="erc") + nco.portfolio_performance(verbose=True) + + References + ---------- + Lopez de Prado, M. (2019). A Robust Estimator of the Efficient Frontier. + *SSRN Working Paper*. https://ssrn.com/abstract=3469961 + """ + + _VALID_OPT = {"min_variance", "erc", "equal"} + + def __init__(self, returns=None, cov_matrix=None): + """ + Parameters + ---------- + returns : pd.DataFrame, optional + Asset historical returns (T × n). + cov_matrix : pd.DataFrame, optional + Covariance matrix (n × n). + + Raises + ------ + ValueError + If neither ``returns`` nor ``cov_matrix`` is provided. + TypeError + If ``returns`` is not a DataFrame. + """ + if returns is None and cov_matrix is None: + raise ValueError("Either returns or cov_matrix must be provided") + if returns is not None and not isinstance(returns, pd.DataFrame): + raise TypeError("returns are not a dataframe") + + self.returns = returns + self.cov_matrix = cov_matrix + self.clusters = None + self.cluster_linkage = None + + tickers = list(cov_matrix.columns) if returns is None else list(returns.columns) + super().__init__(len(tickers), tickers) + + @staticmethod + def _get_n_clusters(n_assets: int) -> int: + """Default cluster count: floor(sqrt(n_assets)), minimum 2.""" + return max(2, int(np.floor(np.sqrt(n_assets)))) + + @staticmethod + def _assign_clusters(linkage_matrix: np.ndarray, n_clusters: int) -> np.ndarray: + """ + Cut a hierarchical dendrogram to obtain *n_clusters* flat clusters. + + Returns an integer array of shape (n_assets,) with cluster labels + starting at 0. + """ + labels = sch.fcluster(linkage_matrix, n_clusters, criterion="maxclust") + return labels - 1 # zero-indexed + + @staticmethod + def _intra_cluster_weights(cov: pd.DataFrame, tickers: list, method: str) -> np.ndarray: + """ + Solve the within-cluster portfolio problem. + + Parameters + ---------- + cov : pd.DataFrame + Full covariance matrix. + tickers : list + Assets in this cluster. + method : str + One of ``"min_variance"``, ``"erc"``, or ``"equal"``. + + Returns + ------- + np.ndarray + Weight vector of length len(tickers), summing to 1. + """ + n = len(tickers) + if n == 1: + return np.array([1.0]) + + cov_slice = cov.loc[tickers, tickers].values + + if method == "min_variance": + return _min_var_weights(cov_slice) + elif method == "erc": + return _erc_weights_ccd(cov_slice) + else: # equal + return np.ones(n) / n + + def optimize( + self, + n_clusters: int = None, + internal_opt: str = "min_variance", + meta_opt: str = "min_variance", + linkage_method: str = "ward", + ) -> collections.OrderedDict: + """ + Construct the NCO portfolio. + + Parameters + ---------- + n_clusters : int, optional + Number of asset clusters. Defaults to ``floor(sqrt(n_assets))``. + internal_opt : str, optional (default ``"min_variance"``) + Within-cluster optimisation objective. + One of ``"min_variance"``, ``"erc"``, ``"equal"``. + meta_opt : str, optional (default ``"min_variance"``) + Across-cluster (meta) optimisation objective. + One of ``"min_variance"``, ``"erc"``, ``"equal"``. + linkage_method : str, optional (default ``"ward"``) + Scipy hierarchical linkage method. + + Returns + ------- + OrderedDict + Portfolio weights (asset → weight). + + Raises + ------ + ValueError + If ``n_clusters`` exceeds the number of assets, or if an + unrecognised optimisation method is provided. + """ + if internal_opt not in self._VALID_OPT: + raise ValueError("internal_opt must be one of %s" % self._VALID_OPT) + if meta_opt not in self._VALID_OPT: + raise ValueError("meta_opt must be one of %s" % self._VALID_OPT) + if linkage_method not in sch._LINKAGE_METHODS: + raise ValueError("linkage_method must be one recognised by scipy") + + n = self.n_assets + if n_clusters is None: + n_clusters = self._get_n_clusters(n) + if n_clusters > n: + raise ValueError("n_clusters (%d) cannot exceed n_assets (%d)" % (n_clusters, n)) + if n_clusters < 2: + raise ValueError("n_clusters must be >= 2") + + # --- Clustering --- + if self.returns is None: + cov = self.cov_matrix + corr = cov_to_corr(self.cov_matrix).round(6) + else: + corr, cov = self.returns.corr(), self.returns.cov() + + tickers = list(corr.columns) + matrix = np.sqrt(np.clip((1.0 - corr) / 2.0, a_min=0.0, a_max=1.0)) + dist = ssd.squareform(matrix, checks=False) + self.cluster_linkage = sch.linkage(dist, linkage_method) + labels = NCOpt._assign_clusters(self.cluster_linkage, n_clusters) + + # Build cluster → ticker mapping + self.clusters = collections.defaultdict(list) + for i, label in enumerate(labels): + self.clusters[int(label)].append(tickers[i]) + self.clusters = dict(self.clusters) + + # --- Intra-cluster step --- + intra_weights = {} + for c, ctickers in self.clusters.items(): + w_intra = NCOpt._intra_cluster_weights(cov, ctickers, method=internal_opt) + intra_weights[c] = pd.Series(w_intra, index=ctickers) + + # --- Build meta-covariance --- + # meta_cov[i,j] = w_i' Sigma w_j (analytic from full cov) + cluster_keys = sorted(self.clusters.keys()) + n_c = len(cluster_keys) + meta_cov = np.zeros((n_c, n_c)) + for ci, c_i in enumerate(cluster_keys): + for cj, c_j in enumerate(cluster_keys): + w_i = intra_weights[c_i].reindex(tickers).fillna(0.0).values + w_j = intra_weights[c_j].reindex(tickers).fillna(0.0).values + meta_cov[ci, cj] = float(w_i @ cov.values @ w_j) + + # --- Meta optimisation --- + meta_cov_df = pd.DataFrame( + meta_cov, + index=["c%d" % k for k in cluster_keys], + columns=["c%d" % k for k in cluster_keys], + ) + if meta_opt == "min_variance": + w_meta = _min_var_weights(meta_cov) + elif meta_opt == "erc": + w_meta = _erc_weights_ccd(meta_cov) + else: + w_meta = np.ones(n_c) / n_c + w_meta_series = pd.Series(w_meta, index=cluster_keys) + + # --- Combine --- + final_weights = pd.Series(0.0, index=tickers) + for ci, c in enumerate(cluster_keys): + for ticker, w_intra_val in intra_weights[c].items(): + final_weights[ticker] = w_intra_val * float(w_meta_series[c]) + + weights = collections.OrderedDict(final_weights.sort_index()) + self.set_weights(weights) + return weights + + def portfolio_performance( + self, verbose: bool = False, risk_free_rate: float = 0.0, frequency: int = 252 + ): + """ + Calculate (and optionally print) the performance of the NCO portfolio. + + Parameters + ---------- + verbose : bool, optional (default False) + Print the performance summary. + risk_free_rate : float, optional (default 0.0) + Risk-free rate for Sharpe ratio calculation. + frequency : int, optional (default 252) + Number of trading periods per year. + + Returns + ------- + (float, float, float) + Expected return, annual volatility, Sharpe ratio. + """ + if self.returns is None: + cov = self.cov_matrix + mu = None + else: + cov = self.returns.cov() * frequency + mu = self.returns.mean() * frequency + + return portfolio_performance(self.weights, mu, cov, verbose, risk_free_rate) diff --git a/tests/test_herc_nco.py b/tests/test_herc_nco.py new file mode 100644 index 00000000..b136319b --- /dev/null +++ b/tests/test_herc_nco.py @@ -0,0 +1,410 @@ +""" +Tests for HERCOpt (Hierarchical Equal Risk Contribution) and +NCOpt (Nested Cluster Optimization). +""" +import collections + +import numpy as np +import pandas as pd +import pytest + +from pypfopt import HERCOpt, HRPOpt, NCOpt, CovarianceShrinkage +from pypfopt.hierarchical_portfolio import _erc_weights_ccd, _min_var_weights +from tests.utilities_for_tests import get_data + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def get_returns(): + df = get_data() + return df.pct_change().dropna(how="all") + + +def get_cov(): + df = get_data() + returns = df.pct_change().dropna(how="all") + return returns.cov() + + +# --------------------------------------------------------------------------- +# _erc_weights_ccd +# --------------------------------------------------------------------------- + +class TestERCWeights: + + def test_single_asset(self): + cov = np.array([[0.04]]) + w = _erc_weights_ccd(cov) + np.testing.assert_allclose(w, [1.0]) + + def test_two_uncorrelated_equal_vol(self): + """Equal volatility, no correlation → equal ERC weights.""" + cov = np.array([[1.0, 0.0], [0.0, 1.0]]) + w = _erc_weights_ccd(cov) + np.testing.assert_allclose(w, [0.5, 0.5], atol=1e-6) + + def test_weights_sum_to_one(self): + rng = np.random.default_rng(42) + A = rng.normal(size=(5, 5)) + cov = A @ A.T + np.eye(5) * 0.1 + w = _erc_weights_ccd(cov) + np.testing.assert_allclose(w.sum(), 1.0, atol=1e-8) + + def test_weights_non_negative(self): + rng = np.random.default_rng(7) + A = rng.normal(size=(6, 6)) + cov = A @ A.T + np.eye(6) * 0.5 + w = _erc_weights_ccd(cov) + assert np.all(w >= -1e-8) + + def test_equal_risk_contributions(self): + """All risk contributions should be equal for ERC portfolio.""" + rng = np.random.default_rng(1) + A = rng.normal(size=(4, 4)) + cov = A @ A.T + np.eye(4) + w = _erc_weights_ccd(cov) + rc = w * (cov @ w) + # All RC equal + np.testing.assert_allclose(rc, rc.mean() * np.ones(4), rtol=1e-4) + + def test_lower_vol_asset_gets_higher_weight(self): + """The less volatile asset should receive more weight under ERC.""" + cov = np.array([[0.01, 0.0], [0.0, 0.09]]) # vols 0.1 and 0.3 + w = _erc_weights_ccd(cov) + assert w[0] > w[1] # asset 0 is less risky → higher ERC weight + + +# --------------------------------------------------------------------------- +# _min_var_weights +# --------------------------------------------------------------------------- + +class TestMinVarWeights: + + def test_single_asset(self): + w = _min_var_weights(np.array([[0.04]])) + np.testing.assert_allclose(w, [1.0]) + + def test_sum_to_one(self): + rng = np.random.default_rng(3) + A = rng.normal(size=(5, 5)) + cov = A @ A.T + np.eye(5) + w = _min_var_weights(cov) + np.testing.assert_allclose(w.sum(), 1.0, atol=1e-8) + + def test_non_negative(self): + rng = np.random.default_rng(11) + A = rng.normal(size=(4, 4)) + cov = A @ A.T + np.eye(4) + w = _min_var_weights(cov) + assert np.all(w >= -1e-8) + + def test_known_two_asset(self): + """For two uncorrelated assets, min-var = inverse variance weights.""" + cov = np.array([[1.0, 0.0], [0.0, 4.0]]) # sigmas 1 and 2 + w = _min_var_weights(cov) + expected = np.array([4.0, 1.0]) / 5.0 # inv-var: [1/1, 1/4] / sum + np.testing.assert_allclose(w, expected, atol=1e-6) + + +# --------------------------------------------------------------------------- +# HERCOpt — instantiation errors +# --------------------------------------------------------------------------- + +class TestHERCInstantiation: + + def test_no_args_raises(self): + with pytest.raises(ValueError): + HERCOpt() + + def test_returns_not_dataframe_raises(self): + returns_np = get_returns().values + with pytest.raises(TypeError): + HERCOpt(returns=returns_np) + + def test_invalid_linkage_raises(self): + returns = get_returns() + herc = HERCOpt(returns=returns) + with pytest.raises(ValueError): + herc.optimize(linkage_method="invalid_method") + + def test_from_returns(self): + returns = get_returns() + herc = HERCOpt(returns=returns) + assert herc.n_assets == len(returns.columns) + + def test_from_cov_matrix(self): + S = CovarianceShrinkage(get_data()).ledoit_wolf() + herc = HERCOpt(cov_matrix=S) + assert herc.n_assets == S.shape[0] + + +# --------------------------------------------------------------------------- +# HERCOpt — portfolio weights +# --------------------------------------------------------------------------- + +class TestHERCPortfolio: + + @pytest.fixture(autouse=True) + def setup(self): + returns = get_returns() + self.herc = HERCOpt(returns=returns) + self.weights = self.herc.optimize(linkage_method="ward") + + def test_weights_is_ordered_dict(self): + assert isinstance(self.weights, (dict, collections.OrderedDict)) + + def test_weights_sum_to_one(self): + np.testing.assert_allclose(sum(self.weights.values()), 1.0, atol=1e-8) + + def test_weights_non_negative(self): + assert all(v >= -1e-8 for v in self.weights.values()) + + def test_all_assets_in_weights(self): + returns = get_returns() + assert set(self.weights.keys()) == set(returns.columns) + + def test_clusters_set_after_optimize(self): + assert self.herc.clusters is not None + + def test_portfolio_performance_returns_tuple(self): + perf = self.herc.portfolio_performance(risk_free_rate=0.02) + assert len(perf) == 3 + + def test_portfolio_performance_cov_only(self): + S = CovarianceShrinkage(get_data()).ledoit_wolf() + herc = HERCOpt(cov_matrix=S) + herc.optimize() + mu, vol, sharpe = herc.portfolio_performance() + assert mu is None + assert sharpe is None + assert vol > 0 + + def test_ward_and_single_give_different_weights(self): + returns = get_returns() + herc_ward = HERCOpt(returns=returns) + herc_single = HERCOpt(returns=returns) + w_ward = herc_ward.optimize(linkage_method="ward") + w_single = herc_single.optimize(linkage_method="single") + # Not identical + diff = sum(abs(w_ward[k] - w_single[k]) for k in w_ward) + assert diff > 1e-6 + + +# --------------------------------------------------------------------------- +# HERCOpt vs HRPOpt — sanity comparison +# --------------------------------------------------------------------------- + +class TestHERCvsHRP: + + def test_herc_differs_from_hrp(self): + """HERC should produce different weights than HRP.""" + returns = get_returns() + hrp = HRPOpt(returns=returns) + herc = HERCOpt(returns=returns) + w_hrp = hrp.optimize(linkage_method="ward") + w_herc = herc.optimize(linkage_method="ward") + tickers = list(w_hrp.keys()) + diff = sum(abs(w_hrp[t] - w_herc[t]) for t in tickers) + assert diff > 1e-4, "HERC and HRP weights are unexpectedly identical" + + def test_herc_lower_or_equal_portfolio_variance(self): + """HERC uses ERC cluster risk, so its portfolio variance should + not be dramatically worse than HRP's.""" + returns = get_returns() + cov = returns.cov().values + tickers = list(returns.columns) + + hrp = HRPOpt(returns=returns) + w_hrp_dict = hrp.optimize(linkage_method="ward") + w_hrp = np.array([w_hrp_dict[t] for t in tickers]) + + herc = HERCOpt(returns=returns) + w_herc_dict = herc.optimize(linkage_method="ward") + w_herc = np.array([w_herc_dict[t] for t in tickers]) + + var_hrp = float(w_hrp @ cov @ w_hrp) + var_herc = float(w_herc @ cov @ w_herc) + # Both should be well below equal-weight variance + w_eq = np.ones(len(tickers)) / len(tickers) + var_eq = float(w_eq @ cov @ w_eq) + assert var_hrp < var_eq * 1.5 + assert var_herc < var_eq * 1.5 + + +# --------------------------------------------------------------------------- +# NCOpt — instantiation errors +# --------------------------------------------------------------------------- + +class TestNCOInstantiation: + + def test_no_args_raises(self): + with pytest.raises(ValueError): + NCOpt() + + def test_returns_not_dataframe_raises(self): + with pytest.raises(TypeError): + NCOpt(returns=get_returns().values) + + def test_invalid_internal_opt_raises(self): + nco = NCOpt(returns=get_returns()) + with pytest.raises(ValueError, match="internal_opt"): + nco.optimize(internal_opt="bad_method") + + def test_invalid_meta_opt_raises(self): + nco = NCOpt(returns=get_returns()) + with pytest.raises(ValueError, match="meta_opt"): + nco.optimize(meta_opt="bad_method") + + def test_invalid_linkage_raises(self): + nco = NCOpt(returns=get_returns()) + with pytest.raises(ValueError): + nco.optimize(linkage_method="not_a_method") + + def test_n_clusters_exceeds_assets_raises(self): + nco = NCOpt(returns=get_returns()) + with pytest.raises(ValueError, match="n_assets"): + nco.optimize(n_clusters=9999) + + def test_n_clusters_less_than_2_raises(self): + nco = NCOpt(returns=get_returns()) + with pytest.raises(ValueError, match="n_clusters"): + nco.optimize(n_clusters=1) + + def test_from_cov_only(self): + S = CovarianceShrinkage(get_data()).ledoit_wolf() + nco = NCOpt(cov_matrix=S) + assert nco.n_assets == S.shape[0] + + +# --------------------------------------------------------------------------- +# NCOpt — portfolio weights +# --------------------------------------------------------------------------- + +class TestNCOPortfolio: + + @pytest.fixture(autouse=True) + def setup(self): + returns = get_returns() + self.nco = NCOpt(returns=returns) + self.weights = self.nco.optimize(n_clusters=4) + + def test_weights_type(self): + assert isinstance(self.weights, (dict, collections.OrderedDict)) + + def test_weights_sum_to_one(self): + np.testing.assert_allclose(sum(self.weights.values()), 1.0, atol=1e-6) + + def test_weights_non_negative(self): + assert all(v >= -1e-8 for v in self.weights.values()) + + def test_all_tickers_present(self): + returns = get_returns() + assert set(self.weights.keys()) == set(returns.columns) + + def test_clusters_attribute(self): + assert self.nco.clusters is not None + assert len(self.nco.clusters) == 4 + all_assets = [t for tickers in self.nco.clusters.values() for t in tickers] + returns = get_returns() + assert set(all_assets) == set(returns.columns) + + def test_cluster_linkage_attribute(self): + assert self.nco.cluster_linkage is not None + + def test_portfolio_performance(self): + perf = self.nco.portfolio_performance() + assert len(perf) == 3 + assert perf[1] > 0 # volatility always positive + + +# --------------------------------------------------------------------------- +# NCOpt — all method combinations +# --------------------------------------------------------------------------- + +class TestNCOMethodCombinations: + + def _run(self, internal, meta, n_clusters=4): + returns = get_returns() + nco = NCOpt(returns=returns) + w = nco.optimize(n_clusters=n_clusters, internal_opt=internal, meta_opt=meta) + assert abs(sum(w.values()) - 1.0) < 1e-6 + assert all(v >= -1e-8 for v in w.values()) + return w + + def test_min_var_min_var(self): + self._run("min_variance", "min_variance") + + def test_erc_erc(self): + self._run("erc", "erc") + + def test_equal_equal(self): + self._run("equal", "equal") + + def test_min_var_erc(self): + self._run("min_variance", "erc") + + def test_erc_min_var(self): + self._run("erc", "min_variance") + + def test_equal_min_var(self): + self._run("equal", "min_variance") + + def test_different_n_clusters(self): + returns = get_returns() + for k in [2, 3, 5, 10]: + nco = NCOpt(returns=returns) + w = nco.optimize(n_clusters=k) + np.testing.assert_allclose(sum(w.values()), 1.0, atol=1e-6) + + def test_cov_only(self): + """NCO must work with only a covariance matrix (no returns).""" + S = CovarianceShrinkage(get_data()).ledoit_wolf() + nco = NCOpt(cov_matrix=S) + w = nco.optimize(n_clusters=4) + np.testing.assert_allclose(sum(w.values()), 1.0, atol=1e-6) + + def test_default_n_clusters(self): + """Default n_clusters = floor(sqrt(n_assets)) should not raise.""" + returns = get_returns() + nco = NCOpt(returns=returns) + w = nco.optimize() # n_clusters=None → auto + np.testing.assert_allclose(sum(w.values()), 1.0, atol=1e-6) + + +# --------------------------------------------------------------------------- +# NCOpt — performance is meaningfully different from HRP +# --------------------------------------------------------------------------- + +class TestNCOvsHRP: + + def test_nco_differs_from_hrp(self): + returns = get_returns() + hrp = HRPOpt(returns=returns) + nco = NCOpt(returns=returns) + w_hrp = hrp.optimize(linkage_method="ward") + w_nco = nco.optimize(n_clusters=4) + tickers = list(w_hrp.keys()) + diff = sum(abs(w_hrp[t] - w_nco.get(t, 0)) for t in tickers) + assert diff > 1e-4 + + +# --------------------------------------------------------------------------- +# Top-level imports +# --------------------------------------------------------------------------- + +class TestImports: + + def test_herc_importable(self): + from pypfopt import HERCOpt as H + assert H is HERCOpt + + def test_nco_importable(self): + from pypfopt import NCOpt as N + assert N is NCOpt + + def test_both_in_all(self): + import pypfopt + assert "HERCOpt" in pypfopt.__all__ + assert "NCOpt" in pypfopt.__all__ From 6dbfc10a40840645da3af42dab3af7fd07929113 Mon Sep 17 00:00:00 2001 From: hass-nation Date: Mon, 29 Jun 2026 01:20:35 +0100 Subject: [PATCH 2/2] Fix _erc_weights_ccd: replace multiplicative update with Spinu (2013) CCD MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous implementation used Roncalli's multiplicative update, which diverges when the covariance matrix has negative off-diagonal entries because some risk contributions become negative during iterations and are incorrectly clipped before taking the square root. Replace with the Spinu (2013) cyclical coordinate descent, which minimises the unconstrained convex objective f(w) = (1/2) w'Σw − (1/n) Σᵢ log(wᵢ) by solving the exact one-dimensional sub-problem at each coordinate: Σᵢᵢ·wᵢ² + (Σw − Σᵢᵢ·wᵢ)·wᵢ − (1/n) = 0 → positive root Crucially, the weights must NOT be normalised between coordinate updates; normalisation happens once, at the end of each full pass over all assets. This change fixes test_equal_risk_contributions (which uses a covariance matrix with negative off-diagonal entries) and leaves all HERC / NCO tests unchanged (53/53 pass), since the algorithm is equivalent for PD matrices with the results scaled consistently. Co-Authored-By: Claude Sonnet 4.6 --- pypfopt/hierarchical_portfolio.py | 36 +++++++++++++++++++------------ 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/pypfopt/hierarchical_portfolio.py b/pypfopt/hierarchical_portfolio.py index 6f216cf0..d451f029 100644 --- a/pypfopt/hierarchical_portfolio.py +++ b/pypfopt/hierarchical_portfolio.py @@ -252,9 +252,11 @@ def _erc_weights_ccd(cov: np.ndarray, tol: float = 1e-12, max_iter: int = 500) - Finds w ≥ 0, sum(w)=1, such that w_i*(Σw)_i = w_j*(Σw)_j for all i, j — i.e., every asset contributes the same fraction to total portfolio variance. - Uses the multiplicative update of Roncalli (2013): - w_i ← w_i * sqrt(target_budget / marginal_risk_contribution_i) - normalised at each step. + Uses the Spinu (2013) cyclical coordinate descent: at each step, update + w_i to the positive root of Σ_ii·w_i² + (Σw − Σ_ii·w_i)·w_i − 1/n = 0, + then normalise once after all coordinates have been updated. This + unconstrained formulation converges reliably even when the covariance + matrix contains negative off-diagonal entries. Parameters ---------- @@ -269,26 +271,32 @@ def _erc_weights_ccd(cov: np.ndarray, tol: float = 1e-12, max_iter: int = 500) - ------- np.ndarray (n,) ERC weight vector summing to 1. + + References + ---------- + Spinu, F. (2013). An Algorithm for Computing Risk Parity Weights. + SSRN working paper. """ n = cov.shape[0] if n == 1: return np.array([1.0]) + b = 1.0 / n # equal risk budget per asset w = np.ones(n) / n for _ in range(max_iter): - Sigma_w = cov @ w - port_var = float(w @ Sigma_w) - rc = w * Sigma_w # risk contributions (unnormalised) - target = port_var / n # equal budget - # Multiplicative update: w_i ← w_i * sqrt(target / rc_i) - w_new = w * np.sqrt(target / np.maximum(rc, 1e-30)) - w_new = np.maximum(w_new, 0.0) - w_new /= w_new.sum() - if np.max(np.abs(w_new - w)) < tol: - w = w_new + w_prev = w.copy() + for i in range(n): + a_ii = float(cov[i, i]) + # Cross term: (Σw)_i excluding asset i's own contribution + cross = float(cov[i] @ w) - a_ii * w[i] + # Positive root of: a_ii·w_i² + cross·w_i - b = 0 + disc = cross * cross + 4.0 * a_ii * b + w[i] = (-cross + np.sqrt(max(disc, 0.0))) / (2.0 * a_ii) + # Convergence check on unnormalised weights; normalise only at end + if np.max(np.abs(w - w_prev)) < tol: break - w = w_new + w /= w.sum() return w