-
Notifications
You must be signed in to change notification settings - Fork 2.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #280 from yongzhengqi/main
Implement Enhanced Indexing as a Portfolio Optimizer
- Loading branch information
Showing
14 changed files
with
624 additions
and
224 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
# Copyright (c) Microsoft Corporation. | ||
# Licensed under the MIT License. | ||
|
||
from .base import RiskModel | ||
from .poet import POETCovEstimator | ||
from .shrink import ShrinkCovEstimator | ||
from .structured import StructuredCovEstimator |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,147 @@ | ||
# Copyright (c) Microsoft Corporation. | ||
# Licensed under the MIT License. | ||
|
||
import inspect | ||
import numpy as np | ||
import pandas as pd | ||
from typing import Union | ||
|
||
from qlib.model.base import BaseModel | ||
|
||
|
||
class RiskModel(BaseModel):
    """Risk Model

    A risk model is used to estimate the covariance matrix of stock returns.

    Subclasses customize the estimator by overriding `_predict`; the public
    entry point `predict` handles input conversion, return calculation,
    scaling, nan handling and centering before delegating to `_predict`.
    """

    # supported values of `nan_option`
    MASK_NAN = "mask"
    FILL_NAN = "fill"
    IGNORE_NAN = "ignore"

    def __init__(self, nan_option: str = "ignore", assume_centered: bool = False, scale_return: bool = True):
        """
        Args:
            nan_option (str): nan handling option (`ignore`/`mask`/`fill`).
            assume_centered (bool): whether the data is assumed to be centered.
            scale_return (bool): whether scale returns as percentage.
        """
        # nan
        assert nan_option in [
            self.MASK_NAN,
            self.FILL_NAN,
            self.IGNORE_NAN,
        ], f"`nan_option={nan_option}` is not supported"
        self.nan_option = nan_option

        self.assume_centered = assume_centered
        self.scale_return = scale_return

    def predict(
        self,
        X: Union[pd.Series, pd.DataFrame, np.ndarray],
        return_corr: bool = False,
        is_price: bool = True,
        return_decomposed_components: bool = False,
    ) -> Union[pd.DataFrame, np.ndarray, tuple]:
        """Estimate the covariance (or correlation) matrix from `X`.

        Args:
            X (pd.Series, pd.DataFrame or np.ndarray): data from which to estimate the covariance,
                with variables as columns and observations as rows. Pandas input with a
                `MultiIndex` is unstacked on the `instrument` level (for a DataFrame only the
                first column is used).
            return_corr (bool): whether return the correlation matrix.
            is_price (bool): whether `X` contains price (if not assume stock returns).
            return_decomposed_components (bool): whether return decomposed components of the covariance matrix.

        Returns:
            pd.DataFrame or np.ndarray: estimated covariance (or correlation). A DataFrame
                labelled with the input columns is returned for pandas input, a plain array otherwise.
            tuple: when `return_decomposed_components=True`, the three components
                `(F, cov_b, var_u)` exactly as produced by the subclass's `_predict`.

        Raises:
            AssertionError: if both `return_corr` and `return_decomposed_components` are set, or if
                decomposed components are requested from a model whose `_predict` does not accept
                a `return_decomposed_components` argument.
        """
        assert (
            not return_corr or not return_decomposed_components
        ), "Can only return either correlation matrix or decomposed components."

        # transform input into 2D array
        if not isinstance(X, (pd.Series, pd.DataFrame)):
            columns = None
        else:
            if isinstance(X.index, pd.MultiIndex):
                if isinstance(X, pd.DataFrame):
                    X = X.iloc[:, 0].unstack(level="instrument")  # always use the first column
                else:
                    X = X.unstack(level="instrument")
            else:
                # X is 2D DataFrame
                pass
            columns = X.columns  # will be used to restore dataframe
            X = X.values

        # calculate pct_change
        if is_price:
            X = X[1:] / X[:-1] - 1  # NOTE: resulting `n - 1` rows

        # scale return
        if self.scale_return:
            # NOTE: rebind instead of `X *= 100`; when `is_price=False` `X` still aliases the
            # caller's array (or the DataFrame's buffer) and must not be modified in place
            X = X * 100

        # handle nan and centered
        X = self._preprocess(X)

        # return decomposed components if needed
        if return_decomposed_components:
            spec = inspect.getfullargspec(self._predict)
            assert (
                "return_decomposed_components" in spec.args + spec.kwonlyargs
            ), "This risk model does not support return decomposed components of the covariance matrix "

            F, cov_b, var_u = self._predict(X, return_decomposed_components=True)
            return F, cov_b, var_u

        # estimate covariance
        S = self._predict(X)

        # return correlation if needed
        if return_corr:
            vola = np.sqrt(np.diag(S))
            corr = S / np.outer(vola, vola)
            if columns is None:
                return corr
            return pd.DataFrame(corr, index=columns, columns=columns)

        # return covariance
        if columns is None:
            return S
        return pd.DataFrame(S, index=columns, columns=columns)

    def _predict(self, X: np.ndarray) -> np.ndarray:
        """covariance estimation implementation

        This method should be overridden by child classes.

        By default, this method implements the empirical covariance estimation.

        Args:
            X (np.ndarray): data matrix containing multiple variables (columns) and observations (rows).

        Returns:
            np.ndarray: covariance matrix.
        """
        xTx = np.asarray(X.T.dot(X))
        N = len(X)
        if isinstance(X, np.ma.MaskedArray):
            # `getmaskarray` always yields a full boolean array, even when `X.mask` is the
            # scalar `nomask`; `1 - mask` marks the valid entries
            M = 1 - np.ma.getmaskarray(X)
            N = M.T.dot(M)  # each pair has distinct number of samples
        return xTx / N

    def _preprocess(self, X: np.ndarray) -> Union[np.ndarray, np.ma.MaskedArray]:
        """handle nan and centerize data

        Args:
            X (np.ndarray): data matrix with variables as columns and observations as rows.

        Returns:
            np.ndarray or np.ma.MaskedArray: the processed data.

        Note:
            if `nan_option='mask'` then the returned array will be `np.ma.MaskedArray`.
        """
        # handle nan
        if self.nan_option == self.FILL_NAN:
            X = np.nan_to_num(X)
        elif self.nan_option == self.MASK_NAN:
            X = np.ma.masked_invalid(X)
        # centralize: subtract the per-column mean computed with `np.nanmean`
        if not self.assume_centered:
            X = X - np.nanmean(X, axis=0)
        return X
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
import numpy as np | ||
|
||
from qlib.model.riskmodel import RiskModel | ||
|
||
|
||
class POETCovEstimator(RiskModel):
    """Principal Orthogonal Complement Thresholding Estimator (POET)

    Reference:
        [1] Fan, J., Liao, Y., & Mincheva, M. (2013). Large covariance estimation by thresholding principal orthogonal complements.
            Journal of the Royal Statistical Society. Series B: Statistical Methodology, 75(4), 603–680. https://doi.org/10.1111/rssb.12016
        [2] http://econweb.rutgers.edu/yl1114/papers/poet/POET.m
    """

    # supported values of `thresh_method`
    THRESH_SOFT = "soft"
    THRESH_HARD = "hard"
    THRESH_SCAD = "scad"

    def __init__(self, num_factors: int = 0, thresh: float = 1.0, thresh_method: str = "soft", **kwargs):
        """
        Args:
            num_factors (int): number of factors (if set to zero, no factor model will be used).
            thresh (float): the non-negative constant for thresholding.
            thresh_method (str): thresholding method, which can be
                - 'soft': soft thresholding.
                - 'hard': hard thresholding.
                - 'scad': scad thresholding.
            kwargs: see `RiskModel` for more information.
        """
        super().__init__(**kwargs)

        # zero is accepted (it disables the factor model), so the requirement is "non-negative"
        assert num_factors >= 0, "`num_factors` requires a non-negative integer"
        self.num_factors = num_factors

        assert thresh >= 0, "`thresh` requires a non-negative float number"
        self.thresh = thresh

        assert thresh_method in [
            self.THRESH_HARD,
            self.THRESH_SOFT,
            self.THRESH_SCAD,
        ], "`thresh_method` should be `soft`/`hard`/`scad`"
        self.thresh_method = thresh_method

    def _predict(self, X: np.ndarray) -> np.ndarray:
        """POET covariance estimation

        The top `num_factors` principal components form a low-rank part; the correlation
        matrix of the remaining residuals is thresholded entry-wise (diagonal kept at one)
        and the two parts are added back together.

        Args:
            X (np.ndarray): data matrix containing multiple variables (columns) and observations (rows).

        Returns:
            np.ndarray: covariance matrix.

        Raises:
            np.linalg.LinAlgError: if a factor model is used and `X` leads to non-finite values,
                or if a residual variance is zero (the scaling matrix is then singular).
        """
        Y = X.T  # NOTE: to match POET's implementation
        p, n = Y.shape

        if self.num_factors > 0:
            gram = np.asarray(Y.T.dot(Y))
            # `eigh` does not validate its input, so keep failing loudly on nan/inf
            if not np.isfinite(gram).all():
                raise np.linalg.LinAlgError("Array must not contain infs or NaNs")
            # `Y'Y` is symmetric: `eigh` guarantees real, orthonormal eigenvectors ordered by
            # ascending eigenvalue, whereas the general `eig` may return complex output and
            # non-orthogonal vectors when eigenvalues are repeated
            _, V = np.linalg.eigh(gram)
            # leading `num_factors` eigenvectors, largest eigenvalue first
            F = V[:, -self.num_factors :][:, ::-1] * np.sqrt(n)
            LamPCA = Y.dot(F) / n
            uhat = np.asarray(Y - LamPCA.dot(F.T))
            Lowrank = np.asarray(LamPCA.dot(LamPCA.T))
            rate = 1 / np.sqrt(p) + np.sqrt(np.log(p) / n)
        else:
            uhat = np.asarray(Y)
            rate = np.sqrt(np.log(p) / n)
            Lowrank = 0

        lamb = rate * self.thresh
        SuPCA = uhat.dot(uhat.T) / n
        SuDiag = np.diag(np.diag(SuPCA))
        # residual standard deviations (as a diagonal matrix) and their inverse, computed once
        SuDiagSqrt = SuDiag ** 0.5
        SuDiagSqrtInv = np.linalg.inv(SuDiagSqrt)
        # residual correlation matrix
        R = SuDiagSqrtInv.dot(SuPCA).dot(SuDiagSqrtInv)

        if self.thresh_method == self.THRESH_HARD:
            M = R * (np.abs(R) > lamb)
        elif self.thresh_method == self.THRESH_SOFT:
            res = np.abs(R) - lamb
            res = (res + np.abs(res)) / 2  # max(|R| - lamb, 0)
            M = np.sign(R) * res
        else:
            # scad: piecewise rule on |R| with break points at `2 * lamb` and `3.7 * lamb`
            M1 = (np.abs(R) < 2 * lamb) * np.sign(R) * (np.abs(R) - lamb) * (np.abs(R) > lamb)
            M2 = (np.abs(R) < 3.7 * lamb) * (np.abs(R) >= 2 * lamb) * (2.7 * R - 3.7 * np.sign(R) * lamb) / 1.7
            M3 = (np.abs(R) >= 3.7 * lamb) * R
            M = M1 + M2 + M3

        # the diagonal is never thresholded: force it back to one
        Rthresh = M - np.diag(np.diag(M)) + np.eye(p)
        SigmaU = SuDiagSqrt.dot(Rthresh).dot(SuDiagSqrt)
        SigmaY = SigmaU + Lowrank

        return SigmaY
Oops, something went wrong.