From 73871e8183b0d74d5c26483063d653332457b09b Mon Sep 17 00:00:00 2001 From: Enzo Busseti Date: Fri, 22 Sep 2023 02:01:00 +0400 Subject: [PATCH] factor model covariance now takes either Sigma or (F,d) forecasters --- cvxportfolio/forecast.py | 112 +++++++++++------- cvxportfolio/risks.py | 165 ++++++++++++--------------- cvxportfolio/tests/test_simulator.py | 29 +++++ 3 files changed, 171 insertions(+), 135 deletions(-) diff --git a/cvxportfolio/forecast.py b/cvxportfolio/forecast.py index deab4c5ee..987b909eb 100644 --- a/cvxportfolio/forecast.py +++ b/cvxportfolio/forecast.py @@ -22,6 +22,7 @@ from dataclasses import dataclass import numpy as np +import pandas as pd from .estimator import PolicyEstimator @@ -172,44 +173,71 @@ def _online_update(self, t, past_returns): self.last_time = t -## brought back from old commit https://github.com/cvxgrp/cvxportfolio/commit/aa3d2150d12d85a6fb1befdf22cb7967fcc27f30 -## currently unused -def build_low_rank_model(rets, num_factors=10, iters=10, normalize=True, shrink=True): - r"""Build a low rank risk model from past returns that include NaNs. - - This is an experimental procedure that may work well on past returns - matrices with few NaN values (say, below 20% of the total entries). - If there are (many) NaNs, one should probably also use a rather - large risk forecast error. - """ - # rets = past_returns.iloc[:,:-1] # drop cash - nan_fraction = rets.isnull().sum().sum() / np.prod(rets.shape) - normalizer = np.sqrt((rets**2).mean()) - if normalize: - normalized = rets/(normalizer + 1E-8) - else: + +@dataclass(unsafe_hash=True) +class HistoricalLowRankCovarianceSVD(BaseForecast): + + num_factors: int + svd_iters: int = 10 + svd: str = 'numpy' + + # brought back from old commit + # https://github.com/cvxgrp/cvxportfolio/commit/aa3d2150d12d85a6fb1befdf22cb7967fcc27f30 + # matches original 2016 method from example notebooks, with new heuristic for NaNs + @staticmethod + def build_low_rank_model(rets, num_factors=10, iters=10, svd='numpy'): + r"""Build a low rank risk model from past returns that include NaNs. + + This is an experimental procedure that may work well on past returns + matrices with few NaN values (say, below 20% of the total entries). + If there are (many) NaNs, one should probably also use a rather + large risk forecast error. + """ + # rets = past_returns.iloc[:,:-1] # drop cash + nan_fraction = rets.isnull().sum().sum() / np.prod(rets.shape) + normalizer = np.sqrt((rets**2).mean()) + # if normalize: + # normalized = rets/(normalizer + 1E-8) + # else: normalized = rets - if nan_fraction: - if nan_fraction > 0.1 and not shrink: - warnings.warn("Low rank model estimation on past returns with many NaNs should use the `shrink` option") - nan_implicit_imputation = pd.DataFrame(0., columns=normalized.columns, index = normalized.index) - for i in range(iters): - u, s, v = np.linalg.svd(normalized.fillna(nan_implicit_imputation), full_matrices=False) - nan_implicit_imputation = pd.DataFrame( - (u[:, :num_factors] * (s[:num_factors] - s[num_factors] * shrink)) @ v[:num_factors], - columns = normalized.columns, index = normalized.index) - else: - u, s, v = np.linalg.svd(normalized, full_matrices=False) - F = v[:num_factors].T * s[:num_factors] / np.sqrt(len(rets)) - if normalize: - F = pd.DataFrame(F.T * (normalizer.values + 1E-8), columns=normalizer.index) - else: + if nan_fraction: + #if nan_fraction > 0.1 and not shrink: + # warnings.warn("Low rank model estimation on past returns with many NaNs should use the `shrink` option") + nan_implicit_imputation = pd.DataFrame(0., columns=normalized.columns, index = normalized.index) + for i in range(iters): + if svd == 'numpy': + u, s, v = np.linalg.svd(normalized.fillna(nan_implicit_imputation), full_matrices=False) + else: + raise Exception('Currently only numpy svd is implemented') + nan_implicit_imputation = pd.DataFrame( + (u[:, :num_factors] * (s[:num_factors] #- s[num_factors] * shrink + )) @ v[:num_factors], + columns = normalized.columns, index = normalized.index) + else: + u, s, v = np.linalg.svd(normalized, full_matrices=False) + F = v[:num_factors].T * s[:num_factors] / np.sqrt(len(rets)) + #if normalize: + # F = pd.DataFrame(F.T * (normalizer.values + 1E-8), columns=normalizer.index) + #else: F = pd.DataFrame(F.T, columns=normalizer.index) - idyosyncratic = normalizer**2 - (F**2).sum(0) - if not np.all(idyosyncratic >= 0.): - raise ForeCastError("Low rank risk estimation with iterative SVD did not work.") - return F, idyosyncratic + idyosyncratic = normalizer**2 - (F**2).sum(0) + if not np.all(idyosyncratic >= 0.): + raise Exception("Low rank risk estimation with iterative SVD did not work.") + return F.values, idyosyncratic.values + + @online_cache + def _values_in_time(self, past_returns, **kwargs): + return self.build_low_rank_model(past_returns.iloc[:, :-1], + num_factors=self.num_factors, + iters=self.svd_iters, svd=self.svd) + + +def project_on_psd_cone_and_factorize(Sigma): + """Factorize matrix and remove negative eigenvalues.""" + eigval, eigvec = np.linalg.eigh(Sigma) + eigval = np.maximum(eigval, 0.) + return eigvec @ np.diag(np.sqrt(eigval)) @dataclass(unsafe_hash=True) class HistoricalFactorizedCovariance(BaseForecast): @@ -253,12 +281,12 @@ def _get_initial_joint_mean(past_returns): tmp = nonnull.T @ past_returns.iloc[:, :-1].fillna(0.) return tmp # * tmp.T - @staticmethod - def _factorize(Sigma): - """Factorize matrix and remove negative eigenvalues.""" - eigval, eigvec = np.linalg.eigh(Sigma) - eigval = np.maximum(eigval, 0.) - return eigvec @ np.diag(np.sqrt(eigval)) + # @staticmethod + # def _factorize(Sigma): + # """Factorize matrix and remove negative eigenvalues.""" + # eigval, eigvec = np.linalg.eigh(Sigma) + # eigval = np.maximum(eigval, 0.) + # return eigvec @ np.diag(np.sqrt(eigval)) def _initial_compute(self, t, past_returns): self.last_counts_matrix = self._get_count_matrix(past_returns).values @@ -288,4 +316,4 @@ def _values_in_time(self, t, past_returns, **kwargs): tmp = self.joint_mean / self.last_counts_matrix Sigma -= tmp.T * tmp - return self._factorize(Sigma) + return project_on_psd_cone_and_factorize(Sigma) diff --git a/cvxportfolio/risks.py b/cvxportfolio/risks.py index 618bac3b5..7d9bfc3cc 100644 --- a/cvxportfolio/risks.py +++ b/cvxportfolio/risks.py @@ -23,7 +23,9 @@ import pandas as pd from .costs import BaseCost -from .forecast import HistoricalVariance, HistoricalFactorizedCovariance +from .forecast import HistoricalVariance, \ + HistoricalFactorizedCovariance, project_on_psd_cone_and_factorize, \ + HistoricalLowRankCovarianceSVD logger = logging.getLogger(__name__) @@ -42,71 +44,24 @@ class BaseRiskModel(BaseCost): class FullCovariance(BaseRiskModel): - """Quadratic risk model with full covariance matrix. + """Quadratic risk model with full covariance matrix. + + It represents the objective term: + + .. math:: + {(w^+_t - w^\text{b}_t )}^T \Sigma_t (w^+_t - w^\text{b}_t) + + where :math:`w^+_t` and :math:`w^\text{b}_t` are the post-trade + and the benchmark weights, respectively, at time :math:`t`. :param Sigma: DataFrame of covariance matrices - supplied by the user, or None if fitting from the past data. + supplied by the user, or by default Covariance fitted from the past data. The DataFrame can either represents a single constant covariance matrix - or one for each point in time. - :type Sigma: pandas.DataFrame or None - - + or one for each point in time. If it is a class we instantiate + it with default parameters. At each time :math:`t` we project the value of + :math:`\Sigma_t` on the cone of positive semi-definite matrices. + :type Sigma: pandas.DataFrame or Estimator """ - # r"""Quadratic risk model with full covariance matrix. - # - # This class represents the term :math:`\Sigma_t`, *i.e.,* - # the :math:`(n-1) \times (n-1)` positive semi-definite matrix - # which estimates the covariance of the (non-cash) assets' returns. - # :ref:`Optimization-based policies` use this, as is explained - # in Chapter 4 and 5 of the `book `_. - # - # The user can either supply a :class:`pandas.DataFrame` with the covariance matrix - # (constant or varying in time) computed externally (for example - # with some machine learning technique) or let this class estimate the covariance from the data. - # The latter is the default behavior. - # - # This class implements three ways to compute the covariance matrix from the past returns. The - # computation is repeated at each point in time :math:`t` of a :class:`BackTest` using only - # the past returns available at that point: :math:`r_{t-1}, r_{t-2}, \ldots`. - # - # * *rolling covariance*, using :class:`pandas.DataFrame.rolling.cov`. This is done - # if the user specifies the ``rolling`` argument. - # * *exponential moving window covariance*, using :class:`pandas.DataFrame.ewm.cov`. This is done - # if the user specifies the ``halflife`` argument (``rolling`` takes precedence). - # * *full historical covariance*, using :class:`pandas.DataFrame.cov`. This is the default - # behavior if no arguments are specified. - # - # If there are missing data in the historical returns the estimated covariance may not - # be positive semi-definite. We correct it by projecting on the positive semi-definite - # cone (*i.e.*, we set the negative eigenvalues of the resulting :math:`\Sigma_t` to zero). - # - # :param Sigma: :class:`pandas.DataFrame` of covariance matrices - # supplied by the user. The DataFrame either represents a single (constant) covariance matrix - # or one for each point in time. In the latter case the DataFrame must have a :class:`pandas.MultiIndex` - # where the first level is a :class:`pandas.DatetimeIndex`. If ``None`` (the default) - # the covariance matrix is computed from past returns. - # :type Sigma: pandas.DataFrame or None - # :param rolling: if it is not ``None`` the covariance matrix will be estimated - # on a rolling window of size ``rolling`` of the past returns. - # :type rolling: int or None - # :param halflife: if it is not ``None`` the covariance matrix will be estimated - # on an exponential moving window of the past returns with half-life ``halflife``. - # If ``rolling`` is specified it takes precedence over ``halflife``. If both are ``None`` the full history - # will be used for estimation. - # :type halflife: int or None - # :param kappa: the multiplier for the associated forecast error risk - # (see pages 32-33 of the `book `_). - # If ``float`` a passed it is treated as a constant, if ``pandas.Series`` with ``pandas.DateTime`` index - # it varies in time, if ``None`` the forecast error risk term will not be compiled. - # :type kappa: float or pandas.Series or None - # :param kelly: correct the covariance matrix with the term :math:`\mu\mu^T`, as is explained - # in page 28 of the `book `_, - # to match the second term of the Taylor expansion of the portfolio log-return. Default - # is ``False``, corresponding to classical mean-variance optimization. If ``True``, it - # estimates :math:`\mu` with the same technique as :math:`\Sigma`, *i.e.*, with rolling window - # average, exponential moving window average, or an average of the full history. - # :type kelly: bool - # """ def __init__(self, Sigma=HistoricalFactorizedCovariance): @@ -122,15 +77,12 @@ def _pre_evaluation(self, universe, backtest_times): self.Sigma_sqrt = cp.Parameter((len(universe)-1, len(universe)-1)) def _values_in_time(self, t, past_returns, **kwargs): - """Update forecast error risk here, and take square root of Sigma.""" - + if self._alreadyfactorized: self.Sigma_sqrt.value = self.Sigma.current_value else: - Sigma = self.Sigma.current_value - eigval, eigvec = np.linalg.eigh(Sigma) - eigval = np.maximum(eigval, 0.) - self.Sigma_sqrt.value = eigvec @ np.diag(np.sqrt(eigval)) + self.Sigma_sqrt.value = project_on_psd_cone_and_factorize( + self.Sigma.current_value) def _compile_to_cvxpy(self, w_plus, z, w_plus_minus_w_bm): self.cvxpy_expression = cp.sum_squares( @@ -204,9 +156,11 @@ def _compile_to_cvxpy(self, w_plus, z, w_plus_minus_w_bm): class FactorModelCovariance(BaseRiskModel): """Factor model covariance, either user-provided or fitted from the data. - It has the structure - - :math:`F \Sigma_{F} F^T + \mathbf{diag}(d)` + It represents the objective term: + + .. math:: + + {(w^+_t - w^\text{b}_t )}^T (F \Sigma_{F} F^T + \mathbf{diag}(d)) (w^+_t - w^\text{b}_t) where the factors exposure :math:`F` has as many rows as the number of assets and as many columns as the number of factors, @@ -222,50 +176,68 @@ class FactorModelCovariance(BaseRiskModel): the number of factors to the constructor) with a standard PCA of the historical covariance at each point in time of the backtest (only looking at past returns). - :param F: factors exposure matrix either constant or varying in time. If constant + :param F: Factors exposure matrix either constant or varying in time. If constant use a dataframe where the index are the factors and the columns are the asset names. If varying in time use a pandas multiindexed dataframe where the first index level is time and the second level are the factors. The columns should always be the asset names. If None the constructor will default to fit the model from past returns at each point of the backtest. - :type F: pd.DataFrame or None - :param Sigma_F: factors covariance matrix either constant or varying in time. If varying in time + :type F: pandas.DataFrame or None + :param Sigma_F: Factors covariance matrix either constant or varying in time. If varying in time use a multiindexed dataframe where the first index level is time, and the second are the factors (like the columns). If None it is assumed that :math:`Sigma_F` is the identity matrix: Leaving this to None will not trigger automatic fit of the model. You can also have a factors exposure matrix that is fixed in time and a factors covariance that instead changes in time, or the opposite. - :type Sigma_F: pd.DataFrame or None - :param d: idyosyncratic variances either constant or varying in time. If constant use a pandas series, + :type Sigma_F: pandas.DataFrame or None + :param d: Idyosyncratic variances either constant or varying in time. If constant use a pandas series, if varying in time use a pandas dataframe where the index is time and the columns are the asset names. You can have this varying in time and the exposures or the factors covariance fixed, or the opposite. If you leave this to None you will trigger automatic fit of the model. If you wish to have no idyosyncratic variances you can for example just pass 0. - :type d: pd.Series or pd.DataFrame or None - :param num_factors: number of factors (columns of F) that are obtained when fitting the model automatically + :type d: pandas.Series or pandas.DataFrame or None + :param num_factors: Number of factors (columns of F) that are obtained when + fitting the model automatically (otherwise it is ignored). :type num_factors: int - :param kelly: when fitting the model you can perform a PCA on the covariance defined in the standard way - with this equal to False, or use the the quadratic term of the Taylor approximation of the Kelly - gambling objective with this equal to True. The difference is very small and it's better both - computationally and in terms of portfolio performance to leave this to True. - :type kelly: bool + :param Sigma: Only relevant if F or d are None. Same as the parameter + passed to :class:`FullCovariance` (by default, + historical covariance fitted at each point in time). We take its PCA for the low-rank + model, and the remaining factors are used to estimate the diagonal, as + is explained at pages 59-60 of the book. If it is a class, we instantiate + it with default parameters. + :type Sigma: pandas.DataFrame or Estimator + :param F_and_d_Forecaster: Only relevant if F or d are None, and Sigma is None. Forecaster + that at each point in time produces estimate of F and d. By default we use a SVD-based + forecaster that is equivalent to :class:`HistoricalFactorizedCovariance` if there + are no missing values. If you pass a class, it will be instantiated with ``num_factors``. + :type F_and_d_Forecaster: Estimator """ - def __init__(self, F=None, d=None, Sigma_F=None, num_factors=1, kelly=True): + def __init__(self, F=None, d=None, Sigma_F=None, num_factors=1, + Sigma=HistoricalFactorizedCovariance, F_and_d_Forecaster=HistoricalLowRankCovarianceSVD): self.F = F if F is None else DataEstimator(F, compile_parameter=True) self.d = d if d is None else DataEstimator(d) self.Sigma_F = Sigma_F if Sigma_F is None else DataEstimator(Sigma_F, ignore_shape_check=True) if (self.F is None) or (self.d is None): self._fit = True - self.Sigma = HistoricalFactorizedCovariance(kelly=kelly) + if Sigma is None: + if isinstance(F_and_d_Forecaster, type): + F_and_d_Forecaster = F_and_d_Forecaster(num_factors=num_factors) + self.F_and_d_Forecaster = F_and_d_Forecaster + else: + if isinstance(Sigma, type): + Sigma = Sigma() + self._alreadyfactorized = hasattr(Sigma, 'FACTORIZED') \ + and Sigma.FACTORIZED + self.Sigma = DataEstimator(Sigma) + self.num_factors = num_factors else: self._fit = False - self.num_factors = num_factors - + def _pre_evaluation(self, universe, backtest_times): self.idyosync_sqrt_parameter = cp.Parameter(len(universe)-1) - effective_num_factors = min(self.num_factors, len(universe)-1) if self._fit: + effective_num_factors = min(self.num_factors, len(universe)-1) self.F_parameter = cp.Parameter( (effective_num_factors, len(universe)-1)) else: @@ -276,16 +248,23 @@ def _pre_evaluation(self, universe, backtest_times): self.F_parameter = cp.Parameter(self.F.parameter.shape) def _values_in_time(self, t, past_returns, **kwargs): + if self._fit: - Sigmasqrt = self.Sigma.current_value - # numpy eigendecomposition has largest eigenvalues last - self.F_parameter.value = Sigmasqrt[:, -self.num_factors:].T - d = np.sum(Sigmasqrt[:, :-self.num_factors]**2, axis=1) + if hasattr(self, 'F_and_d_Forecaster'): + self.F_parameter.value, d = self.F_and_d_Forecaster.current_value + else: + Sigmasqrt = self.Sigma.current_value if self._alreadyfactorized \ + else project_on_psd_cone_and_factorize( + self.Sigma.current_value) + # numpy eigendecomposition has largest eigenvalues last + self.F_parameter.value = Sigmasqrt[:, -self.num_factors:].T + d = np.sum(Sigmasqrt[:, :-self.num_factors]**2, axis=1) else: d = self.d.current_value if not (self.Sigma_F is None): self.F_parameter.value = ( - self.F.parameter.value.T @ np.linalg.cholesky(self.Sigma_F.current_value)).T + self.F.parameter.value.T @ np.linalg.cholesky( + self.Sigma_F.current_value)).T self.idyosync_sqrt_parameter.value = np.sqrt(d) diff --git a/cvxportfolio/tests/test_simulator.py b/cvxportfolio/tests/test_simulator.py index af73ab7d7..747f0f66b 100644 --- a/cvxportfolio/tests/test_simulator.py +++ b/cvxportfolio/tests/test_simulator.py @@ -779,6 +779,35 @@ def test_cancel_trades(self): sim.backtest(policy, start_time='2023-01-01') + + def test_svd_covariance_forecaster(self): + + sim = cvx.StockMarketSimulator( + ['AAPL', 'MSFT', 'GE', 'ZM', 'META', 'GOOG', 'GLD'], + trading_frequency='quarterly', + base_location=self.datadir) + + objective = cvx.ReturnsForecast() - 5 * cvx.FactorModelCovariance( + num_factors=2, Sigma=None) + policy = cvx.SinglePeriodOptimization( + objective, [cvx.LongOnly(), cvx.LeverageLimit(1)]) + + result_svd = sim.backtest(policy, start_time='2020-01-01', + end_time='2023-09-01') + + objective = cvx.ReturnsForecast() - 5 * cvx.FactorModelCovariance( + num_factors=2) + policy = cvx.SinglePeriodOptimization( + objective, [cvx.LongOnly(), cvx.LeverageLimit(1)]) + + result_eig = sim.backtest(policy, start_time='2020-01-01', + end_time='2023-09-01') + + self.assertTrue(result_svd.sharpe_ratio > result_eig.sharpe_ratio) + + print(result_svd) + print(result_eig) + if __name__ == '__main__':