diff --git a/skchange/costs/multivariate_t_cost.py b/skchange/costs/multivariate_t_cost.py index 94e80a1..f803898 100644 --- a/skchange/costs/multivariate_t_cost.py +++ b/skchange/costs/multivariate_t_cost.py @@ -9,8 +9,7 @@ import scipy.stats as st from skchange.costs.base import BaseCost -from skchange.costs.utils import CovType, MeanType, check_cov, check_mean -from skchange.utils.numba import jit, njit, prange +from skchange.utils.numba import njit, prange @njit @@ -127,11 +126,11 @@ def scale_matrix_fixed_point_iteration( effective_num_samples = n - num_zeroed_samples inverse_scale_matrix = np.linalg.solve(scale_matrix, np.eye(p)) - z_scores = np.sum( - np.dot(centered_samples, inverse_scale_matrix) * centered_samples, axis=1 + mahalanobis_squared_distances = np.sum( + (centered_samples @ inverse_scale_matrix) * centered_samples, axis=1 ) - sample_weight = (p + t_dof) / (t_dof + z_scores) + sample_weight = (p + t_dof) / (t_dof + mahalanobis_squared_distances) weighted_samples = centered_samples * sample_weight[:, np.newaxis] reconstructed_scale_matrix = ( @@ -217,57 +216,13 @@ def maximum_likelihood_scale_matrix( return mle_scale_matrix -def _mv_t_ll_at_mle_params_scipy( - X: np.ndarray, - start: int, - end: int, - t_dof: float, -) -> float: - """Calculate multivariate T log likelihood at the MLE parameters for a segment. - - Parameters - ---------- - X : np.ndarray - Data matrix. Rows are observations and columns are variables. - start : int - Start index of the segment (inclusive). - end : int - End index of the segment (exclusive). - t_dof : float - The degrees of freedom of the multivariate t-distribution. - - Returns - ------- - log_likelihood : float - The log likelihood of the observations in the - interval ``[start, end)`` in the data matrix `X`, - evaluated at the maximum likelihood parameter - estimates for the mean and scale matrix, given - the provided degrees of freedom. - """ - X_segment = X[start:end] - - # Estimate the mean of each dimension through the sample medians: - sample_medians = np.median(X_segment, axis=0) - X_centered = X_segment - sample_medians - - # Compute the MLE scale matrix: - mle_scale_matrix = maximum_likelihood_scale_matrix(X_centered, t_dof) - - # Compute the log likelihood of the segment using scipy: - mv_t_dist = st.multivariate_t(loc=sample_medians, shape=mle_scale_matrix, df=t_dof) - log_likelihood = mv_t_dist.logpdf(X_segment).sum() - - return log_likelihood - - @njit def _multivariate_t_log_likelihood( - scale_matrix, centered_samples: np.ndarray, - t_dof: float, - log_det_scale_matrix: Union[float, None] = None, + dof: float, + scale_matrix: Union[np.ndarray, None] = None, inverse_scale_matrix: Union[np.ndarray, None] = None, + log_det_scale_matrix: Union[float, None] = None, ) -> float: """Calculate the log likelihood of a multivariate t-distribution. @@ -282,7 +237,7 @@ def _multivariate_t_log_likelihood( The scale matrix of the multivariate t-distribution. centered_samples : np.ndarray The centered samples from the multivariate t-distribution. - t_dof : float + dof : float The degrees of freedom of the multivariate t-distribution. Returns @@ -290,32 +245,42 @@ def _multivariate_t_log_likelihood( float The log likelihood of the multivariate t-distribution. """ + if scale_matrix is None and ( + inverse_scale_matrix is None or log_det_scale_matrix is None + ): + raise ValueError( + "Either the scale matrix by itself, or the inverse scale matrix " + "and log determinant of the scale matrix must be provided." + ) + elif (log_det_scale_matrix is None) ^ (inverse_scale_matrix is None): + raise ValueError( + "Both the log determinant of the scale matrix and the inverse " + "scale matrix must be provided to compute the log likelihood." + ) + num_samples, sample_dim = centered_samples.shape - # Compute the log likelihood of the segment using numba, but similar to scipy: - if log_det_scale_matrix is None: + if log_det_scale_matrix is None and inverse_scale_matrix is None: sign_det, log_det_scale_matrix = np.linalg.slogdet(scale_matrix) if sign_det <= 0: # TODO, raise a warning here? return np.nan - if inverse_scale_matrix is None: inverse_scale_matrix = np.linalg.solve(scale_matrix, np.eye(sample_dim)) - mahalonobis_distances = np.sum( - np.dot(centered_samples, inverse_scale_matrix) * centered_samples, axis=1 + mahalanobis_squared_distances = np.sum( + (centered_samples @ inverse_scale_matrix) * centered_samples, axis=1 ) # Normalization constants: - exponent = 0.5 * (t_dof + sample_dim) + exponent = 0.5 * (dof + sample_dim) A = numba_log_gamma(exponent) - B = numba_log_gamma(0.5 * t_dof) - C = 0.5 * sample_dim * np.log(t_dof * np.pi) + B = numba_log_gamma(0.5 * dof) + C = 0.5 * sample_dim * np.log(dof * np.pi) D = 0.5 * log_det_scale_matrix - normalization_contribution = num_samples * (A - B - C - D) - - sample_contributions = -exponent * np.log1p(mahalonobis_distances / t_dof) + normalization_contribution = num_samples * (A - B - C - D) + sample_contributions = -exponent * np.log1p(mahalanobis_squared_distances / dof) total_log_likelihood = normalization_contribution + sample_contributions.sum() return total_log_likelihood @@ -326,7 +291,7 @@ def _mv_t_ll_at_mle_params( X: np.ndarray, start: int, end: int, - t_dof: float, + dof: float, ) -> float: """Calculate multivariate T log likelihood at the MLE parameters for a segment. @@ -338,7 +303,7 @@ def _mv_t_ll_at_mle_params( Start index of the segment (inclusive). end : int End index of the segment (exclusive). - t_dof : float + dof : float The degrees of freedom of the multivariate t-distribution. Returns @@ -356,10 +321,10 @@ def _mv_t_ll_at_mle_params( sample_medians = np.median(X_segment, axis=0) X_centered = X_segment - sample_medians - mle_scale_matrix = maximum_likelihood_scale_matrix(X_centered, t_dof) + mle_scale_matrix = maximum_likelihood_scale_matrix(X_centered, dof) total_log_likelihood = _multivariate_t_log_likelihood( - scale_matrix=mle_scale_matrix, centered_samples=X_centered, t_dof=t_dof + scale_matrix=mle_scale_matrix, centered_samples=X_centered, dof=dof ) return total_log_likelihood @@ -394,7 +359,7 @@ def multivariate_t_cost_mle_params( for i in prange(num_starts): segment_log_likelihood = _mv_t_ll_at_mle_params( - X, starts[i], ends[i], t_dof=mv_t_dof + X, starts[i], ends[i], dof=mv_t_dof ) costs[i, 0] = -2.0 * segment_log_likelihood @@ -405,9 +370,10 @@ def _mv_t_ll_at_fixed_params( X: np.ndarray, start: int, end: int, - mv_t_mean: np.ndarray, - mv_t_scale_matrix: np.ndarray, - mv_t_dof: float, + mean: np.ndarray, + inverse_scale_matrix: np.ndarray, + log_det_scale_matrix: float, + dof: float, ) -> float: """Calculate multivariate T log likelihood at the MLE parameters for a segment. @@ -419,11 +385,13 @@ def _mv_t_ll_at_fixed_params( Start index of the segment (inclusive). end : int End index of the segment (exclusive). - mv_t_mean : np.ndarray + mean : np.ndarray The mean of the multivariate t-distribution. - mv_t_scale_matrix : np.ndarray - The scale matrix of the multivariate t-distribution. - mv_t_dof : float + inverse_scale_matrix : np.ndarray + The inverse of the scale matrix of the multivariate t-distribution. + log_det_scale_matrix : float + The log determinant of the scale matrix of the multivariate t-distribution. + dof : float The degrees of freedom of the multivariate t-distribution. Returns @@ -435,22 +403,27 @@ def _mv_t_ll_at_fixed_params( estimates for the mean and scale matrix, given the provided degrees of freedom. """ - X_segment = X[start:end] + X_centered = X[start:end] - mean # Compute the log likelihood of the segment: - mv_t_dist = st.multivariate_t(loc=mv_t_mean, shape=mv_t_scale_matrix, df=mv_t_dof) - log_likelihood = mv_t_dist.logpdf(X_segment).sum() + total_log_likelihood = _multivariate_t_log_likelihood( + inverse_scale_matrix=inverse_scale_matrix, + log_det_scale_matrix=log_det_scale_matrix, + centered_samples=X_centered, + dof=dof, + ) - return log_likelihood + return total_log_likelihood def multivariate_t_cost_fixed_params( starts: np.ndarray, ends: np.ndarray, X: np.ndarray, - mv_t_mean: np.ndarray, - mv_t_scale_matrix: np.ndarray, - mv_t_dof: float, + mean: np.ndarray, + inverse_scale_matrix: np.ndarray, + log_det_scale_matrix: float, + dof: float, ) -> np.ndarray: """Calculate the multivariate T twice negative log likelihood cost. @@ -481,7 +454,13 @@ def multivariate_t_cost_fixed_params( for i in prange(num_starts): segment_log_likelihood = _mv_t_ll_at_fixed_params( - X, starts[i], ends[i], mv_t_mean, mv_t_scale_matrix, mv_t_dof=mv_t_dof + X, + starts[i], + ends[i], + mean=mean, + inverse_scale_matrix=inverse_scale_matrix, + log_det_scale_matrix=log_det_scale_matrix, + dof=dof, ) costs[i, 0] = -2.0 * segment_log_likelihood diff --git a/skchange/costs/tests/test_multivariate_t_cost.py b/skchange/costs/tests/test_multivariate_t_cost.py index b21c952..b7b6e6a 100644 --- a/skchange/costs/tests/test_multivariate_t_cost.py +++ b/skchange/costs/tests/test_multivariate_t_cost.py @@ -287,7 +287,7 @@ def test_mv_t_log_likelihood(seed=4125, num_samples=100, p=8, t_dof=5.0): ) ll_simple = _multivariate_t_log_likelihood( - scale_matrix=mle_scale_matrix, centered_samples=X_centered, t_dof=t_dof + scale_matrix=mle_scale_matrix, centered_samples=X_centered, dof=t_dof ) ll_differences = np.diff(np.array([ll_scipy, ll_manual_scipy, ll_simple, ll_scipy]))