Skip to content

Commit

Permalink
feat: support WLS in PCA statistical risk model
Browse files Browse the repository at this point in the history
  • Loading branch information
gavincyi committed Mar 2, 2023
1 parent 96b2a1b commit 71fc3e2
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 20 deletions.
12 changes: 11 additions & 1 deletion src/fpm_risk_model/factor_risk_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,19 @@ def residual_returns(self) -> ndarray:
"""
return self._residual_returns

def fit(self, X: ndarray) -> object:
def fit(self, X: ndarray, weights: Optional[ndarray] = None) -> object:
"""
Fit the model.
Parameters
----------
X : ndarray
Input array of shape (T, N) where N is the number of
instruments and T is the number of timeframes.
weights: Optional[ndarray]
Weights array of shape (N,) where N is the number of
instruments.
"""
pass

Expand Down
36 changes: 32 additions & 4 deletions src/fpm_risk_model/pipeline/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,33 @@
from typing import Any, Dict, Optional

import pandas as pd
from pandas import DataFrame

from ..factor_risk_model import FactorRiskModel
from ..rolling_factor_risk_model import RollingFactorRiskModel


def generate_factor_risk_model(
model: str, data: pd.DataFrame, **kwargs
model: str, data: DataFrame, **kwargs
) -> FactorRiskModel:
"""
Generate factor risk model
Parameters
----------
model : str
Model name supported in statistics module. Supported
value is `pca`.
data: DataFrame
Dataframe of returns of valid instruments, in a dimension
of (T, N) where N is the number of instruments and T is the
number of timeframes.
Returns
-------
FactorRiskModel
A fitted factor risk model.
"""
model = model.lower().replace("-", "_")
if model == "pca":
Expand All @@ -29,7 +46,11 @@ def generate_factor_risk_model(


def generate_rolling_factor_risk_model(
model: str, data: pd.DataFrame, model_parameters: Dict[str, Any], **kwargs
model: str,
data: DataFrame,
model_parameters: Dict[str, Any],
weights: Optional[DataFrame] = None,
**kwargs,
) -> RollingFactorRiskModel:
model = model.lower().replace("-", "_")
if model == "pca":
Expand All @@ -39,7 +60,7 @@ def generate_rolling_factor_risk_model(
else:
raise ValueError(f"Model name {model} is not supported")
rolling_model = RollingFactorRiskModel(model=model, **kwargs)
return rolling_model.fit(X=data)
return rolling_model.fit(X=data, weights=weights)


def dump_factor_risk_model(
Expand Down Expand Up @@ -196,7 +217,10 @@ def load_rolling_factor_risk_model(


def where_validity(
validity: pd.DataFrame, data: pd.DataFrame, fillna: Any = None
validity: pd.DataFrame,
data: pd.DataFrame,
fillna: Any = None,
ffill: Optional[bool] = False,
) -> pd.DataFrame:
"""
Return the data for the given universe.
Expand All @@ -211,13 +235,17 @@ def where_validity(
respectively.
fillna: Any
Handle nan values which includes data outside of the universe.
ffill: Optional[bool]
Whether to forward fill the data. Default is `False`.
Returns
-------
pd.DataFrame
Dataframe containing the data for the given universe.
"""
data = data.reindex_like(validity).where(validity)
if ffill:
data = data.ffill()
if fillna is not None:
data = data.fillna(fillna)
return data
7 changes: 7 additions & 0 deletions src/fpm_risk_model/regressor/wls.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@


class WLS:
"""
Weighted least squares problem solver.
The solver runs a regression with a weighted least squares
objective.
"""

def __init__(self, executor: str = "closed"):
"""
Construct
Expand Down
20 changes: 19 additions & 1 deletion src/fpm_risk_model/risk_model.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from abc import ABC, abstractmethod
from typing import Any
from typing import Any, Union

from numpy import ndarray
from pandas import DataFrame, Series

from .config import Config
from .engine import NumpyEngine
Expand Down Expand Up @@ -91,3 +92,20 @@ def asdict(self):
Returns a dict representation of the object.
"""
return self.config.dict()

@staticmethod
def _to_numpy(
    values: Union[ndarray, DataFrame, Series, None]
) -> Union[ndarray, None]:
    """
    Convert the values to a numpy array.

    Parameters
    ----------
    values : ndarray, DataFrame, Series or None
        Values to convert. `None` is passed through unchanged.

    Returns
    -------
    ndarray or None
        The underlying numpy array of a pandas object, the input
        itself when it is already a numpy array, or `None` when
        no values are given.

    Raises
    ------
    TypeError
        If the values are neither `None`, a numpy array nor a
        pandas DataFrame / Series.
    """
    # Optional inputs are forwarded as-is rather than rejected.
    if values is None:
        return None
    if isinstance(values, ndarray):
        return values
    if isinstance(values, (DataFrame, Series)):
        return values.values
    # The message lists every accepted type, including Series.
    raise TypeError(
        "Expect either pandas DataFrame / Series or numpy array, "
        f"but got {values.__class__.__name__}"
    )
19 changes: 17 additions & 2 deletions src/fpm_risk_model/rolling_risk_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def items(self) -> Iterable[Tuple[datetime, RiskModel]]:
"""
return self._values.items()

def fit(self, X: DataFrame) -> object:
def fit(self, X: DataFrame, weights: Optional[DataFrame] = None) -> object:
"""
Fit the model.
Expand All @@ -106,6 +106,10 @@ def fit(self, X: DataFrame) -> object:
The instrument returns, of which the index is the date / time
and the columns hold the return values.
weights: DataFrame
The weights of the instruments, same dimension as the
instrument returns.
Returns
-------
object
Expand All @@ -132,7 +136,18 @@ def fit(self, X: DataFrame) -> object:
index_name = X.index[end_index - 1]
else:
raise TypeError(f"Invalid type of X {X.__class__.__name__}")
values[index_name] = self._model.fit(X=X_input).copy()

if weights is None:
weights_input = None
elif isinstance(weights, DataFrame):
weights_input = weights.loc[index_name]
else:
raise TypeError(
f"Invalid type of weights {weights.__class__.__name__}"
)
values[index_name] = self._model.fit(
X=X_input, weights=weights_input
).copy()
except Exception as exc:
raise RuntimeError(
f"Failed to fit at the index {index} due to error: {exc}"
Expand Down
23 changes: 11 additions & 12 deletions src/fpm_risk_model/statistical/pca.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Optional, Union

from numpy import ndarray
from pandas import DataFrame
from pandas import DataFrame, Series
from sklearn.decomposition import PCA as sklearn_PCA

from ..factor_risk_model import FactorRiskModel
Expand Down Expand Up @@ -61,7 +61,11 @@ def __init__(
)
self._model = sklearn_PCA(n_components=n_components)

def fit(self, X: Union[ndarray, DataFrame]) -> object:
def fit(
self,
X: Union[ndarray, DataFrame],
weights: Optional[Union[ndarray, Series]] = None,
) -> object:
"""
Fit the returns into the risk model.
Expand All @@ -79,15 +83,8 @@ def fit(self, X: Union[ndarray, DataFrame]) -> object:
The object itself.
"""
# First convert all the inputs to numpy ndarray type
if isinstance(X, DataFrame):
X_fit = X.values
elif isinstance(X, ndarray):
X_fit = X
else:
raise TypeError(
"X must be in numpy ndarray or pandas DataFrame type, "
f"not {X.__class__.__name__}"
)
X_fit = self._to_numpy(X)
weights_fit = self._to_numpy(weights)

# Initialize the engine
eg = self._engine
Expand All @@ -102,6 +99,8 @@ def fit(self, X: Union[ndarray, DataFrame]) -> object:
# Select the instruments of which the returns are not always 0
X_reindex = ~eg.all(eg.abs(X_fit) < 1e-20, axis=0)
X_fit = X_fit[:, X_reindex]
if weights_fit is not None:
weights_fit = weights_fit[X_reindex]

# Fit with sklearn PCA on the return matrix (T, N)
self._model.fit(X_fit)
Expand All @@ -118,7 +117,7 @@ def fit(self, X: Union[ndarray, DataFrame]) -> object:
)
# Factor matrix (T, n)
wls = WLS()
F = wls.fit(X=B.T, y=X_fit.T).T
F = wls.fit(X=B.T, y=X_fit.T, weights=weights_fit).T
# Residual returns (T, N)
residual_returns = X_fit - F @ B

Expand Down

0 comments on commit 71fc3e2

Please sign in to comment.