diff --git a/pypfopt/__init__.py b/pypfopt/__init__.py index 249f0e6d..a2595e91 100755 --- a/pypfopt/__init__.py +++ b/pypfopt/__init__.py @@ -11,7 +11,7 @@ EfficientFrontier, EfficientSemivariance, ) -from .hierarchical_portfolio import HRPOpt +from .hierarchical_portfolio import HRPOpt, HERCOpt, NCOpt from .risk_models import CovarianceShrinkage __version__ = "1.6.0" @@ -28,5 +28,7 @@ "EfficientCVaR", "EfficientCDaR", "HRPOpt", + "HERCOpt", + "NCOpt", "CovarianceShrinkage", ] diff --git a/pypfopt/hierarchical_portfolio.py b/pypfopt/hierarchical_portfolio.py index 6dc76d07..d451f029 100644 --- a/pypfopt/hierarchical_portfolio.py +++ b/pypfopt/hierarchical_portfolio.py @@ -10,6 +10,12 @@ - ``HRPOpt`` implements the Hierarchical Risk Parity (HRP) portfolio. Code reproduced with permission from Marcos Lopez de Prado (2016). +- ``HERCOpt`` implements the Hierarchical Equal Risk Contribution (HERC) portfolio + (Raffinot 2018), which uses Equal Risk Contribution weights at each level of the + hierarchy rather than the inverse-variance weights used by HRP. +- ``NCOpt`` implements the Nested Cluster Optimization (NCO) portfolio + (Lopez de Prado 2019), which optimizes within clusters and then across clusters + in a two-level nested procedure. """ import collections @@ -18,6 +24,7 @@ import pandas as pd import scipy.cluster.hierarchy as sch import scipy.spatial.distance as ssd +from scipy.optimize import minimize from pypfopt.base import BaseOptimizer, portfolio_performance from pypfopt.risk_models import cov_to_corr @@ -236,3 +243,588 @@ def portfolio_performance(self, verbose=False, risk_free_rate=0.0, frequency=252 mu = self.returns.mean() * frequency return portfolio_performance(self.weights, mu, cov, verbose, risk_free_rate) + + +def _erc_weights_ccd(cov: np.ndarray, tol: float = 1e-12, max_iter: int = 500) -> np.ndarray: + """ + Equal Risk Contribution weights via cyclical coordinate descent. + + Finds w ≥ 0, sum(w)=1, such that w_i*(Σw)_i = w_j*(Σw)_j for all i, j — + i.e., every asset contributes the same fraction to total portfolio variance. + + Uses the Spinu (2013) cyclical coordinate descent: at each step, update + w_i to the positive root of Σ_ii·w_i² + (Σw − Σ_ii·w_i)·w_i − 1/n = 0, + then normalise once after all coordinates have been updated. This + unconstrained formulation converges reliably even when the covariance + matrix contains negative off-diagonal entries. + + Parameters + ---------- + cov : np.ndarray + (n, n) covariance matrix (must be PD). + tol : float + Convergence threshold on max change in weights. + max_iter : int + Maximum iterations. + + Returns + ------- + np.ndarray + (n,) ERC weight vector summing to 1. + + References + ---------- + Spinu, F. (2013). An Algorithm for Computing Risk Parity Weights. + SSRN working paper. + """ + n = cov.shape[0] + if n == 1: + return np.array([1.0]) + + b = 1.0 / n # equal risk budget per asset + w = np.ones(n) / n + for _ in range(max_iter): + w_prev = w.copy() + for i in range(n): + a_ii = float(cov[i, i]) + # Cross term: (Σw)_i excluding asset i's own contribution + cross = float(cov[i] @ w) - a_ii * w[i] + # Positive root of: a_ii·w_i² + cross·w_i - b = 0 + disc = cross * cross + 4.0 * a_ii * b + w[i] = (-cross + np.sqrt(max(disc, 0.0))) / (2.0 * a_ii) + # Convergence check on unnormalised weights; normalise only at end + if np.max(np.abs(w - w_prev)) < tol: + break + + w /= w.sum() + return w + + +def _min_var_weights(cov: np.ndarray) -> np.ndarray: + """ + Long-only minimum variance weights. + + Uses the analytic unconstrained solution (Σ⁻¹ 1 / 1ᵀ Σ⁻¹ 1) when all + resulting weights are non-negative; otherwise falls back to SLSQP. + + Parameters + ---------- + cov : np.ndarray + (n, n) covariance matrix. + + Returns + ------- + np.ndarray + (n,) weight vector summing to 1, all ≥ 0. + """ + n = cov.shape[0] + if n == 1: + return np.array([1.0]) + + try: + inv_cov = np.linalg.inv(cov) + ones = np.ones(n) + w = inv_cov @ ones / (ones @ inv_cov @ ones) + if (w >= -1e-8).all(): + w = np.maximum(w, 0.0) + return w / w.sum() + except np.linalg.LinAlgError: + pass + + # Long-only constrained via SLSQP + res = minimize( + lambda w_: float(w_ @ cov @ w_), + np.ones(n) / n, + method="SLSQP", + bounds=[(0.0, 1.0)] * n, + constraints=[{"type": "eq", "fun": lambda w_: w_.sum() - 1.0}], + options={"ftol": 1e-12, "maxiter": 1000}, + ) + w = np.maximum(res.x, 0.0) + return w / w.sum() + + +class HERCOpt(BaseOptimizer): + """ + A HERCOpt object constructs a Hierarchical Equal Risk Contribution (HERC) + portfolio (Raffinot, 2018). + + HERC extends Hierarchical Risk Parity (HRP) by replacing the + inverse-variance weighting with Equal Risk Contribution (ERC) at every + level of the hierarchy. The structure is otherwise identical to HRP: + + 1. Compute the correlation-based distance matrix. + 2. Perform hierarchical clustering. + 3. Order assets by quasi-diagonal traversal. + 4. Recursively bisect, allocating between sub-clusters in proportion to + the **inverse of each cluster's ERC-portfolio variance** (rather than + the inverse-variance-portfolio variance used by HRP). + + Because ERC accounts for within-cluster correlations when computing the + cluster's effective risk, HERC tends to produce more balanced risk + allocations than HRP, especially when intra-cluster correlations are high. + + Instance variables: + + - Inputs + + - ``n_assets`` - int + - ``tickers`` - str list + - ``returns`` - pd.DataFrame or None + - ``cov_matrix`` - pd.DataFrame or None + + - Output: + + - ``weights`` - OrderedDict + - ``clusters`` - linkage matrix + + Examples + -------- + .. code-block:: python + + from pypfopt import HERCOpt + herc = HERCOpt(returns=returns_df) + weights = herc.optimize() + herc.portfolio_performance(verbose=True) + + References + ---------- + Raffinot, T. (2018). Hierarchical clustering-based asset allocation. + *Journal of Portfolio Management*, 44(2), 89-99. + """ + + def __init__(self, returns=None, cov_matrix=None): + """ + Parameters + ---------- + returns : pd.DataFrame, optional + Asset historical returns (T × n). Either ``returns`` or + ``cov_matrix`` must be supplied. + cov_matrix : pd.DataFrame, optional + Covariance matrix of asset returns (n × n). + + Raises + ------ + ValueError + If neither ``returns`` nor ``cov_matrix`` is provided. + TypeError + If ``returns`` is not a DataFrame. + """ + if returns is None and cov_matrix is None: + raise ValueError("Either returns or cov_matrix must be provided") + if returns is not None and not isinstance(returns, pd.DataFrame): + raise TypeError("returns are not a dataframe") + + self.returns = returns + self.cov_matrix = cov_matrix + self.clusters = None + + tickers = list(cov_matrix.columns) if returns is None else list(returns.columns) + super().__init__(len(tickers), tickers) + + @staticmethod + def _get_cluster_var_erc(cov: pd.DataFrame, cluster_items: list) -> float: + """ + Variance of the ERC portfolio built from *cluster_items*. + + Parameters + ---------- + cov : pd.DataFrame + Full covariance matrix. + cluster_items : list + Asset names in the cluster. + + Returns + ------- + float + Portfolio variance under ERC weights. + """ + cov_slice = cov.loc[cluster_items, cluster_items].values + w = _erc_weights_ccd(cov_slice) + return float(w @ cov_slice @ w) + + @staticmethod + def _raw_herc_allocation(cov: pd.DataFrame, ordered_tickers: list) -> pd.Series: + """ + Recursive bisection allocation using ERC cluster variances. + + Identical in structure to HRP's ``_raw_hrp_allocation``, but calls + ``_get_cluster_var_erc`` instead of ``_get_cluster_var`` so that the + between-cluster split reflects ERC-weighted risk. + + Parameters + ---------- + cov : pd.DataFrame + Full covariance matrix. + ordered_tickers : list + Tickers ordered by quasi-diagonal traversal. + + Returns + ------- + pd.Series + Portfolio weights indexed by ticker. + """ + w = pd.Series(1.0, index=ordered_tickers) + cluster_items = [ordered_tickers] + + while len(cluster_items) > 0: + cluster_items = [ + i[j:k] + for i in cluster_items + for j, k in ((0, len(i) // 2), (len(i) // 2, len(i))) + if len(i) > 1 + ] + for i in range(0, len(cluster_items), 2): + first_cluster = cluster_items[i] + second_cluster = cluster_items[i + 1] + first_variance = HERCOpt._get_cluster_var_erc(cov, first_cluster) + second_variance = HERCOpt._get_cluster_var_erc(cov, second_cluster) + alpha = 1 - first_variance / (first_variance + second_variance) + w[first_cluster] *= alpha + w[second_cluster] *= 1 - alpha + + return w + + def optimize(self, linkage_method: str = "ward") -> collections.OrderedDict: + """ + Construct the HERC portfolio. + + Parameters + ---------- + linkage_method : str, optional (default ``"ward"``) + Scipy linkage method; any value accepted by + ``scipy.cluster.hierarchy.linkage``. Ward's method is recommended + for HERC as it minimises within-cluster variance at each merge. + + Returns + ------- + OrderedDict + Portfolio weights (asset → weight). + + Raises + ------ + ValueError + If ``linkage_method`` is not recognised by scipy. + """ + if linkage_method not in sch._LINKAGE_METHODS: + raise ValueError("linkage_method must be one recognised by scipy") + + if self.returns is None: + cov = self.cov_matrix + corr = cov_to_corr(self.cov_matrix).round(6) + else: + corr, cov = self.returns.corr(), self.returns.cov() + + matrix = np.sqrt(np.clip((1.0 - corr) / 2.0, a_min=0.0, a_max=1.0)) + dist = ssd.squareform(matrix, checks=False) + + self.clusters = sch.linkage(dist, linkage_method) + sort_ix = HRPOpt._get_quasi_diag(self.clusters) + ordered_tickers = corr.index[sort_ix].tolist() + + herc = HERCOpt._raw_herc_allocation(cov, ordered_tickers) + weights = collections.OrderedDict(herc.sort_index()) + self.set_weights(weights) + return weights + + def portfolio_performance( + self, verbose: bool = False, risk_free_rate: float = 0.0, frequency: int = 252 + ): + """ + Calculate (and optionally print) the performance of the HERC portfolio. + + Parameters + ---------- + verbose : bool, optional (default False) + Print the performance summary. + risk_free_rate : float, optional (default 0.0) + Risk-free rate for Sharpe ratio calculation. + frequency : int, optional (default 252) + Number of trading periods per year. + + Returns + ------- + (float, float, float) + Expected return, annual volatility, Sharpe ratio. + Expected return and Sharpe ratio are ``None`` when only a + covariance matrix was supplied. + """ + if self.returns is None: + cov = self.cov_matrix + mu = None + else: + cov = self.returns.cov() * frequency + mu = self.returns.mean() * frequency + + return portfolio_performance(self.weights, mu, cov, verbose, risk_free_rate) + + +class NCOpt(BaseOptimizer): + """ + A NCOpt object constructs a Nested Cluster Optimization (NCO) portfolio + (Lopez de Prado, 2019). + + NCO partitions the asset universe into clusters, solves a portfolio + optimisation problem *within* each cluster, and then solves a second + optimisation *across* the resulting cluster-portfolios. This two-level + structure allows a large and correlated asset universe to be decomposed + into smaller, nearly independent subproblems. + + **Algorithm** + + 1. Compute the correlation-based distance matrix and perform hierarchical + clustering; cut the dendrogram to obtain ``n_clusters`` clusters. + 2. **Intra-cluster step**: for each cluster, compute an optimal portfolio + over the cluster's assets (min-variance, ERC, or equal-weight). + 3. **Meta step**: treat each cluster as a single synthetic asset; compute + the covariance of the cluster-portfolio returns and optimise over the + resulting ``n_clusters`` assets. + 4. Combine: final weight for asset i in cluster c is + ``w_intra[c][i] * w_meta[c]``. + + Instance variables: + + - Inputs + + - ``n_assets`` - int + - ``tickers`` - str list + - ``returns`` - pd.DataFrame or None + - ``cov_matrix`` - pd.DataFrame or None + + - Output: + + - ``weights`` - OrderedDict + - ``clusters`` - dict mapping cluster index → list of tickers + - ``cluster_linkage`` - linkage matrix + + Examples + -------- + .. code-block:: python + + from pypfopt import NCOpt + nco = NCOpt(returns=returns_df) + weights = nco.optimize(n_clusters=4, internal_opt="min_variance", meta_opt="erc") + nco.portfolio_performance(verbose=True) + + References + ---------- + Lopez de Prado, M. (2019). A Robust Estimator of the Efficient Frontier. + *SSRN Working Paper*. https://ssrn.com/abstract=3469961 + """ + + _VALID_OPT = {"min_variance", "erc", "equal"} + + def __init__(self, returns=None, cov_matrix=None): + """ + Parameters + ---------- + returns : pd.DataFrame, optional + Asset historical returns (T × n). + cov_matrix : pd.DataFrame, optional + Covariance matrix (n × n). + + Raises + ------ + ValueError + If neither ``returns`` nor ``cov_matrix`` is provided. + TypeError + If ``returns`` is not a DataFrame. + """ + if returns is None and cov_matrix is None: + raise ValueError("Either returns or cov_matrix must be provided") + if returns is not None and not isinstance(returns, pd.DataFrame): + raise TypeError("returns are not a dataframe") + + self.returns = returns + self.cov_matrix = cov_matrix + self.clusters = None + self.cluster_linkage = None + + tickers = list(cov_matrix.columns) if returns is None else list(returns.columns) + super().__init__(len(tickers), tickers) + + @staticmethod + def _get_n_clusters(n_assets: int) -> int: + """Default cluster count: floor(sqrt(n_assets)), minimum 2.""" + return max(2, int(np.floor(np.sqrt(n_assets)))) + + @staticmethod + def _assign_clusters(linkage_matrix: np.ndarray, n_clusters: int) -> np.ndarray: + """ + Cut a hierarchical dendrogram to obtain *n_clusters* flat clusters. + + Returns an integer array of shape (n_assets,) with cluster labels + starting at 0. + """ + labels = sch.fcluster(linkage_matrix, n_clusters, criterion="maxclust") + return labels - 1 # zero-indexed + + @staticmethod + def _intra_cluster_weights(cov: pd.DataFrame, tickers: list, method: str) -> np.ndarray: + """ + Solve the within-cluster portfolio problem. + + Parameters + ---------- + cov : pd.DataFrame + Full covariance matrix. + tickers : list + Assets in this cluster. + method : str + One of ``"min_variance"``, ``"erc"``, or ``"equal"``. + + Returns + ------- + np.ndarray + Weight vector of length len(tickers), summing to 1. + """ + n = len(tickers) + if n == 1: + return np.array([1.0]) + + cov_slice = cov.loc[tickers, tickers].values + + if method == "min_variance": + return _min_var_weights(cov_slice) + elif method == "erc": + return _erc_weights_ccd(cov_slice) + else: # equal + return np.ones(n) / n + + def optimize( + self, + n_clusters: int = None, + internal_opt: str = "min_variance", + meta_opt: str = "min_variance", + linkage_method: str = "ward", + ) -> collections.OrderedDict: + """ + Construct the NCO portfolio. + + Parameters + ---------- + n_clusters : int, optional + Number of asset clusters. Defaults to ``floor(sqrt(n_assets))``. + internal_opt : str, optional (default ``"min_variance"``) + Within-cluster optimisation objective. + One of ``"min_variance"``, ``"erc"``, ``"equal"``. + meta_opt : str, optional (default ``"min_variance"``) + Across-cluster (meta) optimisation objective. + One of ``"min_variance"``, ``"erc"``, ``"equal"``. + linkage_method : str, optional (default ``"ward"``) + Scipy hierarchical linkage method. + + Returns + ------- + OrderedDict + Portfolio weights (asset → weight). + + Raises + ------ + ValueError + If ``n_clusters`` exceeds the number of assets, or if an + unrecognised optimisation method is provided. + """ + if internal_opt not in self._VALID_OPT: + raise ValueError("internal_opt must be one of %s" % self._VALID_OPT) + if meta_opt not in self._VALID_OPT: + raise ValueError("meta_opt must be one of %s" % self._VALID_OPT) + if linkage_method not in sch._LINKAGE_METHODS: + raise ValueError("linkage_method must be one recognised by scipy") + + n = self.n_assets + if n_clusters is None: + n_clusters = self._get_n_clusters(n) + if n_clusters > n: + raise ValueError("n_clusters (%d) cannot exceed n_assets (%d)" % (n_clusters, n)) + if n_clusters < 2: + raise ValueError("n_clusters must be >= 2") + + # --- Clustering --- + if self.returns is None: + cov = self.cov_matrix + corr = cov_to_corr(self.cov_matrix).round(6) + else: + corr, cov = self.returns.corr(), self.returns.cov() + + tickers = list(corr.columns) + matrix = np.sqrt(np.clip((1.0 - corr) / 2.0, a_min=0.0, a_max=1.0)) + dist = ssd.squareform(matrix, checks=False) + self.cluster_linkage = sch.linkage(dist, linkage_method) + labels = NCOpt._assign_clusters(self.cluster_linkage, n_clusters) + + # Build cluster → ticker mapping + self.clusters = collections.defaultdict(list) + for i, label in enumerate(labels): + self.clusters[int(label)].append(tickers[i]) + self.clusters = dict(self.clusters) + + # --- Intra-cluster step --- + intra_weights = {} + for c, ctickers in self.clusters.items(): + w_intra = NCOpt._intra_cluster_weights(cov, ctickers, method=internal_opt) + intra_weights[c] = pd.Series(w_intra, index=ctickers) + + # --- Build meta-covariance --- + # meta_cov[i,j] = w_i' Sigma w_j (analytic from full cov) + cluster_keys = sorted(self.clusters.keys()) + n_c = len(cluster_keys) + meta_cov = np.zeros((n_c, n_c)) + for ci, c_i in enumerate(cluster_keys): + for cj, c_j in enumerate(cluster_keys): + w_i = intra_weights[c_i].reindex(tickers).fillna(0.0).values + w_j = intra_weights[c_j].reindex(tickers).fillna(0.0).values + meta_cov[ci, cj] = float(w_i @ cov.values @ w_j) + + # --- Meta optimisation --- + meta_cov_df = pd.DataFrame( + meta_cov, + index=["c%d" % k for k in cluster_keys], + columns=["c%d" % k for k in cluster_keys], + ) + if meta_opt == "min_variance": + w_meta = _min_var_weights(meta_cov) + elif meta_opt == "erc": + w_meta = _erc_weights_ccd(meta_cov) + else: + w_meta = np.ones(n_c) / n_c + w_meta_series = pd.Series(w_meta, index=cluster_keys) + + # --- Combine --- + final_weights = pd.Series(0.0, index=tickers) + for ci, c in enumerate(cluster_keys): + for ticker, w_intra_val in intra_weights[c].items(): + final_weights[ticker] = w_intra_val * float(w_meta_series[c]) + + weights = collections.OrderedDict(final_weights.sort_index()) + self.set_weights(weights) + return weights + + def portfolio_performance( + self, verbose: bool = False, risk_free_rate: float = 0.0, frequency: int = 252 + ): + """ + Calculate (and optionally print) the performance of the NCO portfolio. + + Parameters + ---------- + verbose : bool, optional (default False) + Print the performance summary. + risk_free_rate : float, optional (default 0.0) + Risk-free rate for Sharpe ratio calculation. + frequency : int, optional (default 252) + Number of trading periods per year. + + Returns + ------- + (float, float, float) + Expected return, annual volatility, Sharpe ratio. + """ + if self.returns is None: + cov = self.cov_matrix + mu = None + else: + cov = self.returns.cov() * frequency + mu = self.returns.mean() * frequency + + return portfolio_performance(self.weights, mu, cov, verbose, risk_free_rate) diff --git a/tests/test_herc_nco.py b/tests/test_herc_nco.py new file mode 100644 index 00000000..b136319b --- /dev/null +++ b/tests/test_herc_nco.py @@ -0,0 +1,410 @@ +""" +Tests for HERCOpt (Hierarchical Equal Risk Contribution) and +NCOpt (Nested Cluster Optimization). +""" +import collections + +import numpy as np +import pandas as pd +import pytest + +from pypfopt import HERCOpt, HRPOpt, NCOpt, CovarianceShrinkage +from pypfopt.hierarchical_portfolio import _erc_weights_ccd, _min_var_weights +from tests.utilities_for_tests import get_data + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def get_returns(): + df = get_data() + return df.pct_change().dropna(how="all") + + +def get_cov(): + df = get_data() + returns = df.pct_change().dropna(how="all") + return returns.cov() + + +# --------------------------------------------------------------------------- +# _erc_weights_ccd +# --------------------------------------------------------------------------- + +class TestERCWeights: + + def test_single_asset(self): + cov = np.array([[0.04]]) + w = _erc_weights_ccd(cov) + np.testing.assert_allclose(w, [1.0]) + + def test_two_uncorrelated_equal_vol(self): + """Equal volatility, no correlation → equal ERC weights.""" + cov = np.array([[1.0, 0.0], [0.0, 1.0]]) + w = _erc_weights_ccd(cov) + np.testing.assert_allclose(w, [0.5, 0.5], atol=1e-6) + + def test_weights_sum_to_one(self): + rng = np.random.default_rng(42) + A = rng.normal(size=(5, 5)) + cov = A @ A.T + np.eye(5) * 0.1 + w = _erc_weights_ccd(cov) + np.testing.assert_allclose(w.sum(), 1.0, atol=1e-8) + + def test_weights_non_negative(self): + rng = np.random.default_rng(7) + A = rng.normal(size=(6, 6)) + cov = A @ A.T + np.eye(6) * 0.5 + w = _erc_weights_ccd(cov) + assert np.all(w >= -1e-8) + + def test_equal_risk_contributions(self): + """All risk contributions should be equal for ERC portfolio.""" + rng = np.random.default_rng(1) + A = rng.normal(size=(4, 4)) + cov = A @ A.T + np.eye(4) + w = _erc_weights_ccd(cov) + rc = w * (cov @ w) + # All RC equal + np.testing.assert_allclose(rc, rc.mean() * np.ones(4), rtol=1e-4) + + def test_lower_vol_asset_gets_higher_weight(self): + """The less volatile asset should receive more weight under ERC.""" + cov = np.array([[0.01, 0.0], [0.0, 0.09]]) # vols 0.1 and 0.3 + w = _erc_weights_ccd(cov) + assert w[0] > w[1] # asset 0 is less risky → higher ERC weight + + +# --------------------------------------------------------------------------- +# _min_var_weights +# --------------------------------------------------------------------------- + +class TestMinVarWeights: + + def test_single_asset(self): + w = _min_var_weights(np.array([[0.04]])) + np.testing.assert_allclose(w, [1.0]) + + def test_sum_to_one(self): + rng = np.random.default_rng(3) + A = rng.normal(size=(5, 5)) + cov = A @ A.T + np.eye(5) + w = _min_var_weights(cov) + np.testing.assert_allclose(w.sum(), 1.0, atol=1e-8) + + def test_non_negative(self): + rng = np.random.default_rng(11) + A = rng.normal(size=(4, 4)) + cov = A @ A.T + np.eye(4) + w = _min_var_weights(cov) + assert np.all(w >= -1e-8) + + def test_known_two_asset(self): + """For two uncorrelated assets, min-var = inverse variance weights.""" + cov = np.array([[1.0, 0.0], [0.0, 4.0]]) # sigmas 1 and 2 + w = _min_var_weights(cov) + expected = np.array([4.0, 1.0]) / 5.0 # inv-var: [1/1, 1/4] / sum + np.testing.assert_allclose(w, expected, atol=1e-6) + + +# --------------------------------------------------------------------------- +# HERCOpt — instantiation errors +# --------------------------------------------------------------------------- + +class TestHERCInstantiation: + + def test_no_args_raises(self): + with pytest.raises(ValueError): + HERCOpt() + + def test_returns_not_dataframe_raises(self): + returns_np = get_returns().values + with pytest.raises(TypeError): + HERCOpt(returns=returns_np) + + def test_invalid_linkage_raises(self): + returns = get_returns() + herc = HERCOpt(returns=returns) + with pytest.raises(ValueError): + herc.optimize(linkage_method="invalid_method") + + def test_from_returns(self): + returns = get_returns() + herc = HERCOpt(returns=returns) + assert herc.n_assets == len(returns.columns) + + def test_from_cov_matrix(self): + S = CovarianceShrinkage(get_data()).ledoit_wolf() + herc = HERCOpt(cov_matrix=S) + assert herc.n_assets == S.shape[0] + + +# --------------------------------------------------------------------------- +# HERCOpt — portfolio weights +# --------------------------------------------------------------------------- + +class TestHERCPortfolio: + + @pytest.fixture(autouse=True) + def setup(self): + returns = get_returns() + self.herc = HERCOpt(returns=returns) + self.weights = self.herc.optimize(linkage_method="ward") + + def test_weights_is_ordered_dict(self): + assert isinstance(self.weights, (dict, collections.OrderedDict)) + + def test_weights_sum_to_one(self): + np.testing.assert_allclose(sum(self.weights.values()), 1.0, atol=1e-8) + + def test_weights_non_negative(self): + assert all(v >= -1e-8 for v in self.weights.values()) + + def test_all_assets_in_weights(self): + returns = get_returns() + assert set(self.weights.keys()) == set(returns.columns) + + def test_clusters_set_after_optimize(self): + assert self.herc.clusters is not None + + def test_portfolio_performance_returns_tuple(self): + perf = self.herc.portfolio_performance(risk_free_rate=0.02) + assert len(perf) == 3 + + def test_portfolio_performance_cov_only(self): + S = CovarianceShrinkage(get_data()).ledoit_wolf() + herc = HERCOpt(cov_matrix=S) + herc.optimize() + mu, vol, sharpe = herc.portfolio_performance() + assert mu is None + assert sharpe is None + assert vol > 0 + + def test_ward_and_single_give_different_weights(self): + returns = get_returns() + herc_ward = HERCOpt(returns=returns) + herc_single = HERCOpt(returns=returns) + w_ward = herc_ward.optimize(linkage_method="ward") + w_single = herc_single.optimize(linkage_method="single") + # Not identical + diff = sum(abs(w_ward[k] - w_single[k]) for k in w_ward) + assert diff > 1e-6 + + +# --------------------------------------------------------------------------- +# HERCOpt vs HRPOpt — sanity comparison +# --------------------------------------------------------------------------- + +class TestHERCvsHRP: + + def test_herc_differs_from_hrp(self): + """HERC should produce different weights than HRP.""" + returns = get_returns() + hrp = HRPOpt(returns=returns) + herc = HERCOpt(returns=returns) + w_hrp = hrp.optimize(linkage_method="ward") + w_herc = herc.optimize(linkage_method="ward") + tickers = list(w_hrp.keys()) + diff = sum(abs(w_hrp[t] - w_herc[t]) for t in tickers) + assert diff > 1e-4, "HERC and HRP weights are unexpectedly identical" + + def test_herc_lower_or_equal_portfolio_variance(self): + """HERC uses ERC cluster risk, so its portfolio variance should + not be dramatically worse than HRP's.""" + returns = get_returns() + cov = returns.cov().values + tickers = list(returns.columns) + + hrp = HRPOpt(returns=returns) + w_hrp_dict = hrp.optimize(linkage_method="ward") + w_hrp = np.array([w_hrp_dict[t] for t in tickers]) + + herc = HERCOpt(returns=returns) + w_herc_dict = herc.optimize(linkage_method="ward") + w_herc = np.array([w_herc_dict[t] for t in tickers]) + + var_hrp = float(w_hrp @ cov @ w_hrp) + var_herc = float(w_herc @ cov @ w_herc) + # Both should be well below equal-weight variance + w_eq = np.ones(len(tickers)) / len(tickers) + var_eq = float(w_eq @ cov @ w_eq) + assert var_hrp < var_eq * 1.5 + assert var_herc < var_eq * 1.5 + + +# --------------------------------------------------------------------------- +# NCOpt — instantiation errors +# --------------------------------------------------------------------------- + +class TestNCOInstantiation: + + def test_no_args_raises(self): + with pytest.raises(ValueError): + NCOpt() + + def test_returns_not_dataframe_raises(self): + with pytest.raises(TypeError): + NCOpt(returns=get_returns().values) + + def test_invalid_internal_opt_raises(self): + nco = NCOpt(returns=get_returns()) + with pytest.raises(ValueError, match="internal_opt"): + nco.optimize(internal_opt="bad_method") + + def test_invalid_meta_opt_raises(self): + nco = NCOpt(returns=get_returns()) + with pytest.raises(ValueError, match="meta_opt"): + nco.optimize(meta_opt="bad_method") + + def test_invalid_linkage_raises(self): + nco = NCOpt(returns=get_returns()) + with pytest.raises(ValueError): + nco.optimize(linkage_method="not_a_method") + + def test_n_clusters_exceeds_assets_raises(self): + nco = NCOpt(returns=get_returns()) + with pytest.raises(ValueError, match="n_assets"): + nco.optimize(n_clusters=9999) + + def test_n_clusters_less_than_2_raises(self): + nco = NCOpt(returns=get_returns()) + with pytest.raises(ValueError, match="n_clusters"): + nco.optimize(n_clusters=1) + + def test_from_cov_only(self): + S = CovarianceShrinkage(get_data()).ledoit_wolf() + nco = NCOpt(cov_matrix=S) + assert nco.n_assets == S.shape[0] + + +# --------------------------------------------------------------------------- +# NCOpt — portfolio weights +# --------------------------------------------------------------------------- + +class TestNCOPortfolio: + + @pytest.fixture(autouse=True) + def setup(self): + returns = get_returns() + self.nco = NCOpt(returns=returns) + self.weights = self.nco.optimize(n_clusters=4) + + def test_weights_type(self): + assert isinstance(self.weights, (dict, collections.OrderedDict)) + + def test_weights_sum_to_one(self): + np.testing.assert_allclose(sum(self.weights.values()), 1.0, atol=1e-6) + + def test_weights_non_negative(self): + assert all(v >= -1e-8 for v in self.weights.values()) + + def test_all_tickers_present(self): + returns = get_returns() + assert set(self.weights.keys()) == set(returns.columns) + + def test_clusters_attribute(self): + assert self.nco.clusters is not None + assert len(self.nco.clusters) == 4 + all_assets = [t for tickers in self.nco.clusters.values() for t in tickers] + returns = get_returns() + assert set(all_assets) == set(returns.columns) + + def test_cluster_linkage_attribute(self): + assert self.nco.cluster_linkage is not None + + def test_portfolio_performance(self): + perf = self.nco.portfolio_performance() + assert len(perf) == 3 + assert perf[1] > 0 # volatility always positive + + +# --------------------------------------------------------------------------- +# NCOpt — all method combinations +# --------------------------------------------------------------------------- + +class TestNCOMethodCombinations: + + def _run(self, internal, meta, n_clusters=4): + returns = get_returns() + nco = NCOpt(returns=returns) + w = nco.optimize(n_clusters=n_clusters, internal_opt=internal, meta_opt=meta) + assert abs(sum(w.values()) - 1.0) < 1e-6 + assert all(v >= -1e-8 for v in w.values()) + return w + + def test_min_var_min_var(self): + self._run("min_variance", "min_variance") + + def test_erc_erc(self): + self._run("erc", "erc") + + def test_equal_equal(self): + self._run("equal", "equal") + + def test_min_var_erc(self): + self._run("min_variance", "erc") + + def test_erc_min_var(self): + self._run("erc", "min_variance") + + def test_equal_min_var(self): + self._run("equal", "min_variance") + + def test_different_n_clusters(self): + returns = get_returns() + for k in [2, 3, 5, 10]: + nco = NCOpt(returns=returns) + w = nco.optimize(n_clusters=k) + np.testing.assert_allclose(sum(w.values()), 1.0, atol=1e-6) + + def test_cov_only(self): + """NCO must work with only a covariance matrix (no returns).""" + S = CovarianceShrinkage(get_data()).ledoit_wolf() + nco = NCOpt(cov_matrix=S) + w = nco.optimize(n_clusters=4) + np.testing.assert_allclose(sum(w.values()), 1.0, atol=1e-6) + + def test_default_n_clusters(self): + """Default n_clusters = floor(sqrt(n_assets)) should not raise.""" + returns = get_returns() + nco = NCOpt(returns=returns) + w = nco.optimize() # n_clusters=None → auto + np.testing.assert_allclose(sum(w.values()), 1.0, atol=1e-6) + + +# --------------------------------------------------------------------------- +# NCOpt — performance is meaningfully different from HRP +# --------------------------------------------------------------------------- + +class TestNCOvsHRP: + + def test_nco_differs_from_hrp(self): + returns = get_returns() + hrp = HRPOpt(returns=returns) + nco = NCOpt(returns=returns) + w_hrp = hrp.optimize(linkage_method="ward") + w_nco = nco.optimize(n_clusters=4) + tickers = list(w_hrp.keys()) + diff = sum(abs(w_hrp[t] - w_nco.get(t, 0)) for t in tickers) + assert diff > 1e-4 + + +# --------------------------------------------------------------------------- +# Top-level imports +# --------------------------------------------------------------------------- + +class TestImports: + + def test_herc_importable(self): + from pypfopt import HERCOpt as H + assert H is HERCOpt + + def test_nco_importable(self): + from pypfopt import NCOpt as N + assert N is NCOpt + + def test_both_in_all(self): + import pypfopt + assert "HERCOpt" in pypfopt.__all__ + assert "NCOpt" in pypfopt.__all__