diff --git a/skchange/costs/multivariate_t_cost.py b/skchange/costs/multivariate_t_cost.py
index c034283..94e80a1 100644
--- a/skchange/costs/multivariate_t_cost.py
+++ b/skchange/costs/multivariate_t_cost.py
@@ -7,13 +7,36 @@
 import numpy as np
 import scipy.stats as st
-from scipy.special import polygamma
 
 from skchange.costs.base import BaseCost
 from skchange.costs.utils import CovType, MeanType, check_cov, check_mean
 from skchange.utils.numba import jit, njit, prange
 
 
+@njit
+def numba_log_gamma(x: float) -> float:
+    """Compute the natural logarithm of the gamma function.
+
+    Uses a truncated Stirling series for the log-gamma function:
+
+        log(gamma(x)) ~ (x - 1/2) log(x) - x + log(2 pi) / 2
+                        + 1 / (12 x) - 1 / (360 x^3) + 1 / (1260 x^5)
+
+    Source: https://en.wikipedia.org/wiki/Gamma_function#Log-gamma_function
+    """
+    x_cubed = x * x * x
+    log_gamma = (
+        (x - 0.5) * np.log(x)
+        - x
+        + 0.5 * np.log(2.0 * np.pi)
+        + 1.0 / (12.0 * x)
+        - 1.0 / (360.0 * x_cubed)
+        + 1.0 / (1260.0 * x_cubed * x * x)
+    )
+
+    return log_gamma
+
+
 @njit
 def numba_digamma(x: float) -> float:
     """Approximate the digamma function.
@@ -193,11 +216,11 @@ def maximum_likelihood_scale_matrix(
         max_iter=max_iter,
         reverse_tol=reverse_tol,
     )
 
     return mle_scale_matrix
 
 
-def _mv_t_ll_at_mle_params(
+def _mv_t_ll_at_mle_params_scipy(
     X: np.ndarray,
     start: int,
     end: int,
@@ -234,13 +257,126 @@ def _mv_t_ll_at_mle_params(
     # Compute the MLE scale matrix:
     mle_scale_matrix = maximum_likelihood_scale_matrix(X_centered, t_dof)
 
-    # Compute the log likelihood of the segment:
+    # Compute the log likelihood of the segment using scipy:
     mv_t_dist = st.multivariate_t(loc=sample_medians, shape=mle_scale_matrix, df=t_dof)
     log_likelihood = mv_t_dist.logpdf(X_segment).sum()
 
     return log_likelihood
 
 
+@njit
+def _multivariate_t_log_likelihood(
+    scale_matrix,
+    centered_samples: np.ndarray,
+    t_dof: float,
+    log_det_scale_matrix: Union[float, None] = None,
+    inverse_scale_matrix: Union[np.ndarray, None] = None,
+) -> float:
+    """Calculate the log likelihood of a multivariate t-distribution.
+
+    Computed directly from the definition of the multivariate t-distribution.
+    The implementation is inspired by scipy's multivariate t-distribution
+    implementation, but simplified.
+    Source: https://en.wikipedia.org/wiki/Multivariate_t-distribution
+
+    Parameters
+    ----------
+    scale_matrix : np.ndarray
+        The scale matrix of the multivariate t-distribution.
+    centered_samples : np.ndarray
+        The centered samples from the multivariate t-distribution.
+    t_dof : float
+        The degrees of freedom of the multivariate t-distribution.
+    log_det_scale_matrix : float, optional
+        Precomputed log determinant of the scale matrix.
+        Computed from `scale_matrix` if not provided.
+    inverse_scale_matrix : np.ndarray, optional
+        Precomputed inverse of the scale matrix.
+        Computed from `scale_matrix` if not provided.
+
+    Returns
+    -------
+    float
+        The log likelihood of the multivariate t-distribution.
+    """
+    num_samples, sample_dim = centered_samples.shape
+
+    # Compute the log likelihood with numba-compatible operations, mirroring scipy:
+    if log_det_scale_matrix is None:
+        sign_det, log_det_scale_matrix = np.linalg.slogdet(scale_matrix)
+        if sign_det <= 0:
+            # TODO: raise a warning here?
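+            # A non-positive sign from `slogdet` means the scale matrix is not
+            # positive definite, so the log likelihood is undefined: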
+            return np.nan
+
+    if inverse_scale_matrix is None:
+        inverse_scale_matrix = np.linalg.solve(scale_matrix, np.eye(sample_dim))
+
+    mahalanobis_distances = np.sum(
+        np.dot(centered_samples, inverse_scale_matrix) * centered_samples, axis=1
+    )
+
+    # Normalization constants:
+    exponent = 0.5 * (t_dof + sample_dim)
+    A = numba_log_gamma(exponent)
+    B = numba_log_gamma(0.5 * t_dof)
+    C = 0.5 * sample_dim * np.log(t_dof * np.pi)
+    D = 0.5 * log_det_scale_matrix
+    normalization_contribution = num_samples * (A - B - C - D)
+
+    sample_contributions = -exponent * np.log1p(mahalanobis_distances / t_dof)
+
+    total_log_likelihood = normalization_contribution + sample_contributions.sum()
+
+    return total_log_likelihood
+
+
+@njit
+def _mv_t_ll_at_mle_params(
+    X: np.ndarray,
+    start: int,
+    end: int,
+    t_dof: float,
+) -> float:
+    """Calculate multivariate t log likelihood at the MLE parameters for a segment.
+
+    Parameters
+    ----------
+    X : np.ndarray
+        Data matrix. Rows are observations and columns are variables.
+    start : int
+        Start index of the segment (inclusive).
+    end : int
+        End index of the segment (exclusive).
+    t_dof : float
+        The degrees of freedom of the multivariate t-distribution.
+
+    Returns
+    -------
+    log_likelihood : float
+        The log likelihood of the observations in the
+        interval ``[start, end)`` of the data matrix `X`,
+        evaluated at the estimated location (the sample
+        medians) and the maximum likelihood scale matrix,
+        given the provided degrees of freedom.
+    """
+    X_segment = X[start:end]
+
+    # Estimate the location of each dimension by the sample median:
+    sample_medians = np.median(X_segment, axis=0)
+    X_centered = X_segment - sample_medians
+
+    mle_scale_matrix = maximum_likelihood_scale_matrix(X_centered, t_dof)
+
+    total_log_likelihood = _multivariate_t_log_likelihood(
+        scale_matrix=mle_scale_matrix, centered_samples=X_centered, t_dof=t_dof
+    )
+
+    return total_log_likelihood
+
+
+@njit
 def multivariate_t_cost_mle_params(
     starts: np.ndarray, ends: np.ndarray, X: np.ndarray, mv_t_dof: float
 ) -> np.ndarray:
diff --git a/skchange/costs/tests/test_multivariate_t_cost.py b/skchange/costs/tests/test_multivariate_t_cost.py
index 05cbd25..b21c952 100644
--- a/skchange/costs/tests/test_multivariate_t_cost.py
+++ b/skchange/costs/tests/test_multivariate_t_cost.py
@@ -3,11 +3,12 @@
 import numpy as np
 import pytest
 import scipy.linalg as sla
-from scipy.special import digamma, polygamma
-from scipy.stats import multivariate_t
+import scipy.stats as st
+from scipy.special import digamma, gammaln, polygamma
 
 from skchange.costs import MultivariateTCost
 from skchange.costs.multivariate_t_cost import (
+    _multivariate_t_log_likelihood,
     maximum_likelihood_scale_matrix,
 )
 from skchange.utils.numba import numba_available
@@ -203,10 +204,10 @@ def approximate_mv_t_scale_matrix_gradient(
         scale_matrix_minus = scale_matrix.copy()
         scale_matrix_minus[i, j] -= epsilon
 
-        ll_plus = multivariate_t.logpdf(
+        ll_plus = st.multivariate_t.logpdf(
             centered_samples, loc=np.zeros(p), shape=scale_matrix_plus, df=t_dof
         ).sum()
-        ll_minus = multivariate_t.logpdf(
+        ll_minus = st.multivariate_t.logpdf(
             centered_samples, loc=np.zeros(p), shape=scale_matrix_minus, df=t_dof
         ).sum()
 
@@ -215,8 +216,94 @@ def approximate_mv_t_scale_matrix_gradient(
     return grad
 
 
+def _multivariate_t_log_likelihood_like_scipy(
+    scale_matrix, centered_samples: np.ndarray, t_dof: float
+) -> float:
+    """Calculate the log likelihood of a multivariate t-distribution.
+
+    Computed directly from the definition of the multivariate t-distribution.
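+    For a single centered sample ``x`` in ``p`` dimensions, the log-density is::
+
+        gammaln((dof + p) / 2) - gammaln(dof / 2) - (p / 2) * log(dof * pi)
+        - log(det(scale)) / 2 - ((dof + p) / 2) * log1p(x' inv(scale) x / dof)
+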
+    The implementation mirrors scipy's multivariate t-distribution logpdf.
+    Source: https://en.wikipedia.org/wiki/Multivariate_t-distribution
+
+    Parameters
+    ----------
+    scale_matrix : np.ndarray
+        The scale matrix of the multivariate t-distribution.
+    centered_samples : np.ndarray
+        The centered samples from the multivariate t-distribution.
+    t_dof : float
+        The degrees of freedom of the multivariate t-distribution.
+
+    Returns
+    -------
+    float
+        The log likelihood of the multivariate t-distribution.
+    """
+    num_samples, sample_dim = centered_samples.shape
+    # Mahalanobis distances via an eigendecomposition, as scipy does internally:
+    scale_spectrum, scale_eigvecs = np.linalg.eigh(scale_matrix)
+    log_det_scale_matrix = np.sum(np.log(scale_spectrum))
+
+    scale_sqrt_inv = np.multiply(scale_eigvecs, 1.0 / np.sqrt(scale_spectrum))
+    mahalanobis_distances = np.square(np.dot(centered_samples, scale_sqrt_inv)).sum(
+        axis=-1
+    )
+
+    # Normalization constants:
+    exponent = 0.5 * (t_dof + sample_dim)
+    A = gammaln(exponent)
+    B = gammaln(0.5 * t_dof)
+    C = 0.5 * sample_dim * np.log(t_dof * np.pi)
+    D = 0.5 * log_det_scale_matrix
+    normalization_contribution = num_samples * (A - B - C - D)
+
+    sample_contributions = -exponent * np.log1p(mahalanobis_distances / t_dof)
+
+    total_log_likelihood = normalization_contribution + sample_contributions.sum()
+
+    return total_log_likelihood
+
+
+def test_mv_t_log_likelihood(seed=4125, num_samples=100, p=8, t_dof=5.0):
+    """Test that the three log likelihood computations agree."""
+    # TODO: Parametrize and test over wide range of input values.
+    np.random.seed(seed)
+
+    mv_t_samples = st.multivariate_t(loc=np.zeros(p), shape=np.eye(p), df=t_dof).rvs(
+        num_samples
+    )
+
+    sample_medians = np.median(mv_t_samples, axis=0)
+    X_centered = mv_t_samples - sample_medians
+    mle_scale_matrix = maximum_likelihood_scale_matrix(X_centered, t_dof)
+
+    ll_scipy = (
+        st.multivariate_t(loc=sample_medians, shape=mle_scale_matrix, df=t_dof)
+        .logpdf(mv_t_samples)
+        .sum()
+    )
+
+    ll_manual_scipy = _multivariate_t_log_likelihood_like_scipy(
+        scale_matrix=mle_scale_matrix, centered_samples=X_centered, t_dof=t_dof
+    )
+
+    ll_simple = _multivariate_t_log_likelihood(
+        scale_matrix=mle_scale_matrix, centered_samples=X_centered, t_dof=t_dof
+    )
+    # Cyclic pairwise differences between the three results should all vanish:
+    ll_differences = np.diff(np.array([ll_scipy, ll_manual_scipy, ll_simple, ll_scipy]))
+
+    np.testing.assert_allclose(ll_differences, 0, atol=1e-4)
+
+
 def test_scale_matrix_mle(seed=4125):
     """Test scale matrix MLE."""
+    # TODO: Parametrize and test over wide range of input values.
     np.random.seed(seed)
     n_samples = 50
     p = 3
@@ -227,9 +314,9 @@ def test_scale_matrix_mle(seed=4125):
 
     true_mean = np.arange(p) * (-1 * np.ones(p)).cumprod()
 
-    mv_t_samples = multivariate_t(loc=true_mean, shape=true_scale_matrix, df=t_dof).rvs(
-        n_samples
-    )
+    mv_t_samples = st.multivariate_t(
+        loc=true_mean, shape=true_scale_matrix, df=t_dof
+    ).rvs(n_samples)
 
     sample_medians = np.median(mv_t_samples, axis=0)
     centered_samples = mv_t_samples - sample_medians
@@ -256,11 +343,13 @@ def test_scale_matrix_mle(seed=4125):
     ), "True scale matrix gradient is not larger than the MLE gradient."
 
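+    # The MLE maximizes the in-sample likelihood over scale matrices, so it
+    # should attain a higher log likelihood than the true scale matrix here: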
     # Assure that we've increased the log-likelihood with the MLE scale matrix:
-    true_scale_matrix_ll = multivariate_t.logpdf(
+    true_scale_matrix_ll = st.multivariate_t.logpdf(
         centered_samples, loc=np.zeros(p), shape=true_scale_matrix, df=t_dof
     ).sum()
 
-    mle_scale_matrix_ll = multivariate_t.logpdf(
+    mle_scale_matrix_ll = st.multivariate_t.logpdf(
         centered_samples, loc=np.zeros(p), shape=mle_scale_matrix, df=t_dof
     ).sum()
@@ -291,7 +380,7 @@ def test_scale_matrix_numba_benchmark(
     random_nudge = np.random.randn(p).reshape(-1, 1)
     true_scale_matrix = np.eye(p) + 0.5 * random_nudge @ random_nudge.T
     true_mean = np.arange(p) * (-1 * np.ones(p)).cumprod()
-    mv_t_samples = multivariate_t(
+    mv_t_samples = st.multivariate_t(
         loc=true_mean, shape=true_scale_matrix, df=t_dof
     ).rvs(n_samples)
     centered_samples = mv_t_samples - np.median(mv_t_samples, axis=0)
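
A quick check of the truncated Stirling series against scipy.special.gammaln
(illustrative only, not part of the patch; assumes the module layout above):

    from scipy.special import gammaln
    from skchange.costs.multivariate_t_cost import numba_log_gamma

    for x in [1.0, 2.5, 5.0, 25.0, 100.0]:
        err = abs(numba_log_gamma(x) - gammaln(x))
        print(f"x = {x:6.1f}: |stirling - gammaln| = {err:.2e}")

The truncation error decays like 1 / x^7, so the largest deviations appear at
small arguments; both log-gamma calls in the likelihood use x >= t_dof / 2.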