diff --git a/docs/code/sampling/tempering/README.rst b/docs/code/sampling/tempering/README.rst
new file mode 100644
index 000000000..36ae64cfb
--- /dev/null
+++ b/docs/code/sampling/tempering/README.rst
@@ -0,0 +1,2 @@
+Tempering MCMC Examples
+^^^^^^^^^^^^^^^^^^^^^^^^^^^
diff --git a/docs/code/sampling/tempering/local_reliability_funcs.py b/docs/code/sampling/tempering/local_reliability_funcs.py
new file mode 100644
index 000000000..5b78b1e87
--- /dev/null
+++ b/docs/code/sampling/tempering/local_reliability_funcs.py
@@ -0,0 +1,5 @@
+import numpy as np
+
+
+def correlated_gaussian(samples, b_eff, d):
+    # Linear performance function G(x) = b_eff * sqrt(d) - sum(x), evaluated for each sample
+    return [b_eff * np.sqrt(d) - np.sum(samples[i, :]) for i in range(samples.shape[0])]
diff --git a/docs/code/sampling/tempering/parallel_tempering.py b/docs/code/sampling/tempering/parallel_tempering.py
new file mode 100644
index 000000000..14c5ed317
--- /dev/null
+++ b/docs/code/sampling/tempering/parallel_tempering.py
@@ -0,0 +1,464 @@
+"""
+
+Parallel Tempering for Bayesian Inference and Reliability analyses
+====================================================================
+
+"""
+
+# %% md
+#
+# The general framework: one wants to sample from a distribution of the form
+#
+# .. math:: p_{1}(x)=\dfrac{q_{1}(x)p_{0}(x)}{Z_{1}}
+#
+# where :math:`q_{1}(x)` and :math:`p_{0}(x)` can be evaluated, and potentially estimate the constant
+# :math:`Z_{1}=\int{q_{1}(x) p_{0}(x)dx}`. Parallel tempering introduces a sequence of intermediate distributions:
+#
+# .. math:: p_{\beta}(x) \propto q(x, \beta) p_{0}(x)
+#
+# for values of :math:`\beta` in [0, 1] (note: :math:`\beta` is :math:`1/T` where :math:`T` is often referred to as
+# the temperature). Setting :math:`\beta=1` equates to sampling from the target, while :math:`\beta \rightarrow 0`
+# samples from the reference distribution :math:`p_{0}`. Periodically during the run, the different temperatures swap
+# members of their ensemble in a way that preserves detailed balance. The chains closer to the reference chain (hot
+# chains) can sample from regions that have low probability under the target and thus allow a better exploration of
+# the parameter space, while the cold chains can better explore the regions of high likelihood.
+#
+# The normalizing constant :math:`Z_{1}` is estimated via thermodynamic integration:
+#
+# .. math:: \ln{Z_{\beta=1}} = \ln{Z_{\beta=0}} + \int_{0}^{1} E_{p_{\beta}} \left[ \frac{\partial \ln{q_{\beta}(x)}}{\partial \beta} \right] d\beta = \ln{Z_{\beta=0}} + \int_{0}^{1} E_{p_{\beta}} \left[ U_{\beta}(x) \right] d\beta
+#
+# where :math:`Z_{\beta=0}=\int{q_{\beta=0}(x) p_{0}(x)dx}` can be determined by simple MC sampling since
+# :math:`q_{\beta=0}(x)` is close to the reference distribution :math:`p_{0}`. The function
+# :math:`U_{\beta}(x)=\frac{\partial \ln{q_{\beta}(x)}}{\partial \beta}` is called the potential, and can be evaluated
+# using posterior samples from :math:`p_{\beta}(x)`.
+#
+# In the code, the user must define:
+# - a function to evaluate the reference distribution :math:`p_{0}(x)`,
+# - a function to evaluate the intermediate factor :math:`q(x, \beta)` (function that takes in two inputs: x and
+#   :math:`\beta`),
+# - if evaluation of :math:`Z_{1}` is of interest, a function that evaluates the potential :math:`U_{\beta}(x)`, from
+#   evaluations of :math:`\ln{q(x, \beta)}` which are saved during the MCMC run for the various chains (different
+#   :math:`\beta` values). A minimal sketch of these three callables is shown below.
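+
+# %%
+
+# A minimal sketch of the three callables for a hypothetical 1-d problem: a standard normal reference
+# p0, a tempered Gaussian likelihood factor q(x, beta) = L(x) ** beta, and the corresponding potential.
+# The N(2, 1) likelihood is purely illustrative and is not used in the examples that follow.
+from scipy.stats import norm as sketch_norm
+
+
+def sketch_log_reference(x):
+    # log p0(x): standard normal reference density
+    return sketch_norm.logpdf(x)
+
+
+def sketch_log_intermediate(x, beta):
+    # log q(x, beta) = beta * log L(x), with L a hypothetical N(2, 1) likelihood
+    return beta * sketch_norm.logpdf(x, loc=2., scale=1.)
+
+
+def sketch_potential(x, beta, log_intermediate_values):
+    # U_beta(x) = d log q(x, beta) / d beta = log L(x), recovered from saved values of log q(x, beta)
+    return log_intermediate_values / beta
+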
+# %% md
+#
+# Bayesian inference
+# ------------------
+#
+# In the Bayesian setting, :math:`p_{0}` is the prior and, given a likelihood :math:`L(data; x)`:
+#
+# .. math:: q_{\beta}(x) = L(data; x) ^{\beta}
+#
+# The potential to be used for evidence computation is then:
+#
+# .. math:: U_{\beta}(x) = \ln{L(data; x)}
+
+
+# %%
+
+
+import numpy as np
+import matplotlib.pyplot as plt
+
+from UQpy.run_model import RunModel, PythonModel
+from UQpy.distributions import MultivariateNormal, JointIndependent, Normal, Uniform
+
+# %% md
+# %%
+
+from scipy.stats import multivariate_normal, norm, uniform
+
+# bimodal posterior
+mu1 = np.array([1., 1.])
+mu2 = -0.8 * np.ones(2)
+w1 = 0.5
+# Width of 0.1 in each dimension
+sigma1 = np.diag([0.02, 0.05])
+sigma2 = np.diag([0.05, 0.02])
+
+# define prior, likelihood and target (posterior)
+prior_distribution = JointIndependent(marginals=[Uniform(loc=-2, scale=4), Uniform(loc=-2, scale=4)])
+
+
+def log_likelihood(x):
+    # Posterior is a mixture of two gaussians
+    return np.logaddexp(np.log(w1) + multivariate_normal.logpdf(x=x, mean=mu1, cov=sigma1),
+                        np.log(1. - w1) + multivariate_normal.logpdf(x=x, mean=mu2, cov=sigma2))
+
+
+def log_target(x):
+    return log_likelihood(x) + prior_distribution.log_pdf(x)
+
+
+# %% md
+#
+
+# estimate evidence
+def estimate_evidence_from_prior_samples(size):
+    samples = -2. + 4 * np.random.uniform(size=size * 2).reshape((size, 2))
+    return np.mean(np.exp(log_likelihood(samples)))
+
+
+def func_integration(x1, x2):
+    x = np.array([x1, x2]).reshape((1, 2))
+    return np.exp(log_likelihood(x)) * (1. / 4) ** 2
+
+
+def estimate_evidence_from_quadrature():
+    from scipy.integrate import dblquad
+    ev = dblquad(func=func_integration, a=-2, b=2, gfun=lambda x: -2, hfun=lambda x: 2)
+    return ev
+
+
+x = np.arange(-2, 2, 0.02)
+y = np.arange(-2, 2, 0.02)
+xx, yy = np.meshgrid(x, y)
+z = np.exp(log_likelihood(np.concatenate([xx.reshape((-1, 1)), yy.reshape((-1, 1))], axis=-1)))
+h = plt.contourf(x, y, z.reshape(xx.shape))
+plt.title('Likelihood')
+plt.axis('equal')
+plt.show()
+
+print('Evidence computed by quadrature = {}'.format(estimate_evidence_from_quadrature()[0]))
+
+# %% md
+#
+
+from UQpy.sampling.mcmc import MetropolisHastings
+
+seed = -2. + 4. * np.random.rand(100, 2)
+mcmc0 = MetropolisHastings(log_pdf_target=log_target, burn_length=100, jump=3, seed=list(seed), dimension=2,
+                           random_state=123, save_log_pdf=True)
+mcmc0.run(nsamples_per_chain=200)
+
+print(mcmc0.samples.shape)
+fig, ax = plt.subplots(ncols=1, figsize=(6, 4))
+ax.scatter(mcmc0.samples[:, 0], mcmc0.samples[:, 1], alpha=0.5)
+ax.set_xlim([-2, 2])
+ax.set_ylim([-2, 2])
+plt.show()
+
+
+def estimate_evidence_from_posterior_samples(log_posterior_values, posterior_samples):
+    # Harmonic mean estimator of the evidence
+    log_like = log_likelihood(posterior_samples)  # log_posterior_values - log_prior(posterior_samples)
+    ev = 1. / np.mean(1. / np.exp(log_like))
+    return ev
+
+
+evidence = estimate_evidence_from_posterior_samples(
+    log_posterior_values=mcmc0.log_pdf_values, posterior_samples=mcmc0.samples)
+print('Estimated evidence by harmonic mean = {}'.format(evidence))
+
+
+# %% md
+#
+
+# %%
+
+def log_intermediate(x, temper_param):
+    return temper_param * log_likelihood(x)  # + (1. - 1. / temperature) * log_prior(x)
+
+
+# %% md
+#
+
+# %%
+
+from UQpy.sampling import MetropolisHastings, ParallelTemperingMCMC
+
+seed = -2. + 4. * np.random.rand(5, 2)
+betas = [1. / np.sqrt(2.) ** i for i in range(20 - 1, -1, -1)]
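+# geometric ladder of 20 tempering parameters, beta_i = (1 / sqrt(2)) ** i for i = 19, ..., 0,
+# which places more intermediate levels at small values of beta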
+print(len(betas))
+print(betas)
+
+samplers = [MetropolisHastings(burn_length=100, jump=3, seed=list(seed), dimension=2) for _ in range(len(betas))]
+mcmc = ParallelTemperingMCMC(log_pdf_intermediate=log_intermediate,
+                             distribution_reference=prior_distribution,
+                             n_iterations_between_sweeps=10,
+                             tempering_parameters=betas,
+                             random_state=123,
+                             save_log_pdf=True, samplers=samplers)
+
+mcmc.run(nsamples_per_chain=200)
+print(mcmc.samples.shape)
+print(mcmc.mcmc_samplers[-1].samples.shape)
+
+# %% md
+#
+
+# %%
+
+# the intermediate samples can be accessed via the mcmc_samplers.samples attributes or
+# directly via the intermediate_samples attribute
+fig, ax = plt.subplots(ncols=3, figsize=(12, 3.5))
+for j, ind in enumerate([0, -6, -1]):
+    ax[j].scatter(mcmc.mcmc_samplers[ind].samples[:, 0], mcmc.mcmc_samplers[ind].samples[:, 1], alpha=0.5,
+                  color='orange')
+    # ax[j].scatter(mcmc.intermediate_samples[ind][:, 0], mcmc.intermediate_samples[ind][:, 1], alpha=0.5,
+    #               color='orange')
+    ax[j].set_xlim([-2, 2])
+    ax[j].set_ylim([-2, 2])
+    ax[j].set_title(r'$\beta$ = {:.3f}'.format(mcmc.tempering_parameters[ind]), fontsize=16)
+    ax[j].set_xlabel(r'$\theta_{1}$', fontsize=14)
+    ax[j].set_ylabel(r'$\theta_{2}$', fontsize=14)
+plt.tight_layout()
+plt.show()
+
+
+# %% md
+#
+
+# %%
+
+def compute_potential(x, temper_param, log_intermediate_values):
+    """Potential U_beta(x) = log q_beta(x) / beta, since here q_beta(x) = L(data; x) ** beta."""
+    return log_intermediate_values / temper_param
+
+
+# %% md
+#
+
+# %%
+
+ev = mcmc.evaluate_normalization_constant(compute_potential=compute_potential, log_Z0=0.)
+print('Estimate of evidence by thermodynamic integration (log_Z0 known) = {:.4f}'.format(ev))
+
+ev = mcmc.evaluate_normalization_constant(compute_potential=compute_potential, nsamples_from_p0=5000)
+print('Estimate of evidence by thermodynamic integration (log_Z0 estimated by MC) = {:.4f}'.format(ev))
+
+# %% md
+# Reliability
+# ------------
+# In a reliability context, :math:`p_{0}` is the pdf of the parameters and we have:
+#
+# .. math:: q_{\beta}(x) = I_{\beta}(x) = \frac{1}{1 + \exp{ \left( \frac{G(x)}{1/\beta-1}\right)}}
+#
+# where :math:`G(x)` is the performance function, negative if the system fails, and :math:`I_{\beta}(x)` are smoothed
+# versions of the indicator function. The potential needed to compute the probability of failure is then:
+#
+# .. math:: U_{\beta}(x) = \frac{- \frac{G(x)}{(1-\beta)^2}}{1 + \exp{ \left( -\frac{G(x)}{1/\beta-1} \right) }} = - \frac{1 - I_{\beta}(x)}{\beta (1 - \beta)} \ln{ \left[ \frac{1 - I_{\beta}(x)}{I_{\beta}(x)} \right] }

+# %%

+from scipy.stats import norm


+def indic_sigmoid(y, beta):
+    return 1. / (1. + np.exp(y / (1. / beta - 1.)))


+fig, ax = plt.subplots(figsize=(4, 3.5))
+ys = np.linspace(-5, 5, 100)
+for i, s in enumerate(1. / np.array([1.01, 1.25, 2., 4., 70.])):
+    ax.plot(ys, indic_sigmoid(y=ys, beta=s), label=r'$\beta={:.2f}$'.format(s), color='blue', alpha=1.
- i / 6) +ax.set_xlabel(r'$y=g(\theta)$', fontsize=13) +ax.set_ylabel(r'$q_{\beta}(\theta)=I_{\beta}(y)$', fontsize=13) +# ax.set_title(r'Smooth versions of the indicator function', fontsize=14) +ax.legend(fontsize=8.5) +plt.show() + +# %% md +# + +# %% + +beta = 2 # Specified Reliability Index +rho = 0.7 # Specified Correlation +dim = 2 # Dimension + +# Define the correlation matrix +C = np.ones((dim, dim)) * rho +np.fill_diagonal(C, 1) +print(C) + +# Print information related to the true probability of failure +e, v = np.linalg.eig(np.asarray(C)) +beff = np.sqrt(np.max(e)) * beta +print(beff) +from scipy.stats import norm + +pf_true = norm.cdf(-beta) +print('True pf={}'.format(pf_true)) + + +# %% md +# + +# %% + +def estimate_Pf_0(samples, model_values): + mask = model_values <= 0 + return np.sum(mask) / len(mask) + + +# %% md +# + +# %% + +# Sample from the prior +model = RunModel(model=PythonModel(model_script='local_reliability_funcs.py', model_object_name="correlated_gaussian", + b_eff=beff, d=dim)) +samples = MultivariateNormal(mean=np.zeros((2,)), cov=np.array([[1, 0.7], [0.7, 1]])).rvs(nsamples=20000) +model.run(samples=samples, append_samples=False) +model_values = np.array(model.qoi_list) + +print('Prob. failure (MC) = {}'.format(estimate_Pf_0(samples, model_values))) + +fig, ax = plt.subplots(figsize=(4, 3.5)) +mask = model_values <= 0 +ax.scatter(samples[np.squeeze(mask), 0], samples[np.squeeze(mask), 1], color='red', label='fail', alpha=0.5, marker='d') +ax.scatter(samples[~np.squeeze(mask), 0], samples[~np.squeeze(mask), 1], color='blue', label='safe', alpha=0.5) +plt.axis('equal') +plt.xlabel(r'$\theta_{1}$', fontsize=13) +plt.ylabel(r'$\theta_{2}$', fontsize=13) +ax.legend(fontsize=13) +fig.tight_layout() +plt.show() + +# %% md +# + +# %% + +distribution_reference = MultivariateNormal(mean=np.zeros((2,)), cov=np.array([[1, 0.7], [0.7, 1]])) + + +def log_factor_temp(x, temper_param): + model.run(samples=x, append_samples=False) + G_values = np.array(model.qoi_list) + return np.squeeze(np.log(indic_sigmoid(G_values, temper_param))) + + +# %% md +# + +# %% + +betas = (1. / np.array([1.01, 1.02, 1.05, 1.1, 1.2, 1.5, 2., 3., 5., 10., 25., 70.]))[::-1] + +print(len(betas)) +print(betas) + +fig, ax = plt.subplots(figsize=(5, 4)) +ys = np.linspace(-5, 5, 100) +for i, s in enumerate(betas): + ax.plot(ys, indic_sigmoid(y=ys, beta=s), label=r'$\beta={:.2f}$'.format(s), color='blue', alpha=1. - i / 15) +ax.set_xlabel(r'$y=g(\theta)$', fontsize=13) +ax.set_ylabel(r'$I_{\beta}(y)$', fontsize=13) +ax.set_title(r'Smooth versions of the indicator function', fontsize=14) +ax.legend() +plt.show() + +scales = [0.1 / np.sqrt(beta) for beta in betas] +print(scales) + +# %% md +# + +# %% + +from UQpy.sampling import MetropolisHastings, ParallelTemperingMCMC + +seed = -2. + 4. 
* np.random.rand(5, 2) + +print(betas) +samplers = [MetropolisHastings(burn_length=5000, jump=5, seed=list(seed), dimension=2, + proposal_is_symmetric=True, + proposal=JointIndependent([Normal(scale=scale)] * 2)) for scale in scales] +mcmc = ParallelTemperingMCMC(log_pdf_intermediate=log_factor_temp, + distribution_reference=distribution_reference, + n_iterations_between_sweeps=10, + tempering_parameters=list(betas), + random_state=123, + save_log_pdf=True, samplers=samplers) + +mcmc.run(nsamples_per_chain=250) +print(mcmc.samples.shape) +print(mcmc.mcmc_samplers[0].samples.shape) + +# %% md +# + +# %% + +fig, ax = plt.subplots(ncols=3, figsize=(12, 3.5)) +for j, ind in enumerate([0, 6, -1]): + ax[j].scatter(mcmc.mcmc_samplers[ind].samples[:, 0], mcmc.mcmc_samplers[ind].samples[:, 1], alpha=0.25) + ax[j].set_xlim([-4, 4]) + ax[j].set_ylim([-4, 4]) + ax[j].set_title(r'$\beta$ = {:.3f}'.format(mcmc.tempering_parameters[ind]), fontsize=15) + ax[j].set_xlabel(r'$\theta_{1}$', fontsize=13) + ax[j].set_ylabel(r'$\theta_{2}$', fontsize=13) +fig.tight_layout() +plt.show() + + +# %% md +# + +# %% + +def compute_potential(x, temper_param, log_intermediate_values): + indic_beta = np.exp(log_intermediate_values) + indic_beta = np.where(indic_beta > 1. - 1e-16, 1. - 1e-16, indic_beta) + indic_beta = np.where(indic_beta < 1e-16, 1e-16, indic_beta) + tmp_log = np.log((1. - indic_beta) / indic_beta) + return - (1. - indic_beta) / (temper_param * (1. - temper_param)) * tmp_log + + +# %% md +# + +# %% + +ev = mcmc.evaluate_normalization_constant(compute_potential=compute_potential, log_Z0=np.log(0.5)) +print('Estimate of evidence by thermodynamic integration = {}'.format(ev)) + +# %% md +# + +# %% + +plt.plot(mcmc.thermodynamic_integration_results['temper_param_list'], + mcmc.thermodynamic_integration_results['expect_potentials'], marker='x') +plt.grid(True) + +# %% md +# + +# %% + +seed = -2. + 4. * np.random.rand(5, 2) + +samplers = [MetropolisHastings(burn_length=5000, jump=5, seed=list(seed), dimension=2, + proposal_is_symmetric=True, + proposal=JointIndependent([Normal(scale=scale)] * 2)) for scale in scales] +mcmc = ParallelTemperingMCMC(log_pdf_intermediate=log_factor_temp, + distribution_reference=distribution_reference, + n_iterations_between_sweeps=10, + tempering_parameters=list(betas), + random_state=123, + save_log_pdf=True, samplers=samplers) + +list_ev_0, list_ev_1 = [], [] +nsamples_per_chain = 0 +for i in range(50): + nsamples_per_chain += 50 + mcmc.run(nsamples_per_chain=nsamples_per_chain) + ev = mcmc.evaluate_normalization_constant(compute_potential=compute_potential, log_Z0=np.log(0.5)) + # print(np.exp(log_ev)) + list_ev_0.append(ev) + ev = mcmc.evaluate_normalization_constant(compute_potential=compute_potential, nsamples_from_p0=100000) + list_ev_1.append(ev) + +# %% md +# + +# %% + +fig, ax = plt.subplots(figsize=(5, 3.5)) +list_samples = [5 * i * 50 for i in range(1, 51)] +ax.plot(list_samples, list_ev_0) +ax.grid(True) +ax.set_ylabel(r'$Z_{1}$ = proba. failure', fontsize=14) +ax.set_xlabel(r'nb. 
saved samples (all chains)', fontsize=14)
+plt.show()
diff --git a/docs/code/sampling/tempering/sequential_tempering.py b/docs/code/sampling/tempering/sequential_tempering.py
new file mode 100644
index 000000000..e166fe1f1
--- /dev/null
+++ b/docs/code/sampling/tempering/sequential_tempering.py
@@ -0,0 +1,283 @@
+"""
+
+Sequential Tempering for Bayesian Inference and Reliability analyses
+====================================================================
+"""
+
+# %% md
+# The general framework: one wants to sample from a distribution of the form
+#
+# .. math:: p_1 \left( x \right) = \frac{q_1 \left( x \right)p_0 \left( x \right)}{Z_1}
+#
+#
+# where :math:`q_1 \left( x \right)` and :math:`p_0 \left( x \right)` can be evaluated; and potentially estimate the
+# constant :math:`Z_1 = \int q_1 \left( x \right)p_0 \left( x \right) dx`.
+#
+# Sequential tempering introduces a sequence of intermediate distributions:
+#
+# .. math:: p_{\beta_j} \left( x \right) \propto q \left( x, \beta_j \right)p_0 \left( x \right)
+#
+# for values of :math:`\beta_j` in :math:`[0, 1]`. The algorithm starts with :math:`\beta_0 = 0`, which samples
+# from the reference distribution :math:`p_0`, and ends for some :math:`j = m` such that :math:`\beta_m = 1`, sampling
+# from the target. First, a set of sample points is generated from :math:`p_0 = p_{\beta_0}`, and then these are
+# resampled according to some weights :math:`w_0` such that after resampling the points follow :math:`p_{\beta_1}`.
+# This resampling procedure is carried out at each intermediate level :math:`j` - resampling the points distributed
+# as :math:`p_{\beta_{j}}` according to weights :math:`w_{j}` such that after resampling, the points are distributed
+# according to :math:`p_{\beta_{j+1}}`. As the points are sequentially resampled to follow each intermediate
+# distribution, eventually they are resampled from :math:`p_{\beta_{m-1}}` to follow :math:`p_{\beta_{m}} = p_1`.
+#
+# The weights are calculated as
+#
+# .. math:: w_j \left( x \right) = \frac{q \left( x, \beta_{j+1} \right)}{q \left( x, \beta_j \right)}
+#
+# The normalizing constant is calculated during the generation of samples, as
+#
+# .. math:: Z_1 = \prod_{j = 0}^{m-1} \left\{ \frac{1}{N_j} \sum_{i = 1}^{N_j} w_j \left( x_{ji} \right) \right\}
+#
+# where :math:`N_j` is the number of sample points generated from the intermediate distribution :math:`p_{\beta_j}`,
+# and :math:`x_{ji}` is the i-th such point.

+# %%

+# %% md
+# Bayesian Inference
+# -------------------
+#
+# In the Bayesian setting, :math:`p_0` is the prior, and :math:`q \left( x, \beta_j \right) = \mathcal{L}\left( data, x \right) ^{\beta_j}`

+# %%

+from UQpy.run_model import RunModel, PythonModel
+import numpy as np
+from UQpy.distributions import Uniform, Normal, JointIndependent, MultivariateNormal
+from UQpy.sampling import SequentialTemperingMCMC
+import matplotlib.pyplot as plt
+from scipy.stats import multivariate_normal, norm, uniform
+from UQpy.sampling.mcmc import *


+# %% md
+#

+# %%

+def likelihood(x, b):
+    mu1 = np.array([1., 1.])
+    mu2 = -0.8 * np.ones(2)
+    w1 = 0.5
+    # Width of 0.1 in each dimension
+    sigma1 = np.diag([0.02, 0.05])
+    sigma2 = np.diag([0.05, 0.02])
+
+    # Posterior is a mixture of two gaussians
+    like = np.exp(np.logaddexp(np.log(w1) + multivariate_normal.logpdf(x=x, mean=mu1, cov=sigma1),
+                               np.log(1.
- w1) + multivariate_normal.logpdf(x=x, mean=mu2, cov=sigma2))) + return like ** b + + +prior = JointIndependent(marginals=[Uniform(loc=-2.0, scale=4.0), Uniform(loc=-2.0, scale=4.0)]) + + +# %% md +# + +# %% + +# estimate evidence +def estimate_evidence_from_prior_samples(size): + samples = -2. + 4 * np.random.uniform(size=size * 2).reshape((size, 2)) + return np.mean(likelihood(samples, 1.0)) + + +def func_integration(x1, x2): + x = np.array([x1, x2]).reshape((1, 2)) + return likelihood(x, 1.0) * (1. / 4) ** 2 + + +def estimate_evidence_from_quadrature(): + from scipy.integrate import dblquad + ev = dblquad(func=func_integration, a=-2, b=2, gfun=lambda x: -2, hfun=lambda x: 2) + return ev + + +x = np.arange(-2, 2, 0.02) +y = np.arange(-2, 2, 0.02) +xx, yy = np.meshgrid(x, y) +z = likelihood(np.concatenate([xx.reshape((-1, 1)), yy.reshape((-1, 1))], axis=-1), 1.0) +h = plt.contourf(x, y, z.reshape(xx.shape)) +plt.title('Likelihood') +plt.axis('equal') +plt.show() + +# for nMC in [50000, 100000, 500000, 1000000]: +# print('Evidence = {}'.format(estimate_evidence_from_prior_samples(nMC))) +print('Evidence computed analytically = {}'.format(estimate_evidence_from_quadrature()[0])) + +# %% md +# + +# %% +sampler = MetropolisHastings(dimension=2, n_chains=20) +test = SequentialTemperingMCMC(pdf_intermediate=likelihood, + distribution_reference=prior, + save_intermediate_samples=True, + percentage_resampling=10, + sampler=sampler, + nsamples=4000) + +# %% md +# + +# %% + +print('Normalizing Constant = ' + str(test.evidence)) +print('Tempering Parameters = ' + str(test.tempering_parameters)) + +plt.figure() +plt.scatter(test.intermediate_samples[0][:, 0], test.intermediate_samples[0][:, 1]) +plt.title(r'$\beta = $' + str(test.tempering_parameters[0])) +plt.show() + +plt.figure() +plt.scatter(test.intermediate_samples[2][:, 0], test.intermediate_samples[2][:, 1]) +plt.title(r'$\beta = $' + str(test.tempering_parameters[2])) +plt.show() + +plt.figure() +plt.scatter(test.samples[:, 0], test.samples[:, 1]) +plt.title(r'$\beta = $' + str(test.tempering_parameters[-1])) +plt.show() + +# %% md +# Reliability +# ------------------- +# +# In the reliability context, :math:`p_0` is the pdf of the parameters, and +# +# .. math:: q \left( x, \beta_j \right) = I_{\beta_j} \left( x \right) = \frac{1}{1 + \exp{\left( \frac{G \left( x \right)}{\frac{1}{\beta_j} - 1} \right)}} +# +# where :math:`G \left( x \right)` is the performance function, negative if the system fails, and :math:`I_{\beta_j} \left( x \right)` are smoothed versions of the indicator function. + +# %% + +from scipy.stats import norm + + +def indic_sigmoid(y, beta): + return 1. / (1. + np.exp(y / (1. / beta - 1.))) + + +fig, ax = plt.subplots(figsize=(4, 3.5)) +ys = np.linspace(-5, 5, 100) +for i, s in enumerate(1. / np.array([1.01, 1.25, 2., 4., 70.])): + ax.plot(ys, indic_sigmoid(y=ys, beta=s), label=r'$\beta={:.2f}$'.format(s), color='blue', alpha=1. 
- i / 6) +ax.set_xlabel(r'$y=g(\theta)$', fontsize=13) +ax.set_ylabel(r'$q_{\beta}(\theta)=I_{\beta}(y)$', fontsize=13) +# ax.set_title(r'Smooth versions of the indicator function', fontsize=14) +ax.legend(fontsize=8.5) +plt.show() + +# %% md +# + +# %% + +beta = 2 # Specified Reliability Index +rho = 0.7 # Specified Correlation +dim = 2 # Dimension + +# Define the correlation matrix +C = np.ones((dim, dim)) * rho +np.fill_diagonal(C, 1) +print(C) + +# Print information related to the true probability of failure +e, v = np.linalg.eig(np.asarray(C)) +beff = np.sqrt(np.max(e)) * beta +print(beff) +from scipy.stats import norm + +pf_true = norm.cdf(-beta) +print('True pf={}'.format(pf_true)) + + +# %% md +# + +# %% + + +def estimate_Pf_0(samples, model_values): + mask = model_values <= 0 + return np.sum(mask) / len(mask) + + +model = RunModel(model=PythonModel(model_script='local_reliability_funcs.py', model_object_name="correlated_gaussian", + b_eff=beff, d=dim)) +samples = MultivariateNormal(mean=np.zeros((2,)), cov=np.array([[1, 0.7], [0.7, 1]])).rvs(nsamples=20000) +model.run(samples=samples, append_samples=False) +model_values = np.array(model.qoi_list) + +print('Prob. failure (MC) = {}'.format(estimate_Pf_0(samples, model_values))) + +fig, ax = plt.subplots(figsize=(4, 3.5)) +mask = np.squeeze(model_values <= 0) +ax.scatter(samples[mask, 0], samples[mask, 1], color='red', label='fail', alpha=0.5, marker='d') +ax.scatter(samples[~mask, 0], samples[~mask, 1], color='blue', label='safe', alpha=0.5) +plt.axis('equal') +# plt.title('Failure domain for reliability problem', fontsize=14) +plt.xlabel(r'$\theta_{1}$', fontsize=13) +plt.ylabel(r'$\theta_{2}$', fontsize=13) +ax.legend(fontsize=13) +fig.tight_layout() +plt.show() + + +# %% md +# + +# %% + +def indic_sigmoid(y, b): + return 1.0 / (1.0 + np.exp((y * b) / (1.0 - b))) + + +def factor_param(x, b): + model.run(samples=x, append_samples=False) + G_values = np.array(model.qoi_list) + return np.squeeze(indic_sigmoid(G_values, b)) + + +prior = MultivariateNormal(mean=np.zeros((2,)), cov=C) + +sampler = MetropolisHastings(dimension=2, n_chains=20) +test = SequentialTemperingMCMC(pdf_intermediate=factor_param, + distribution_reference=prior, + save_intermediate_samples=True, + percentage_resampling=10, + random_state=960242069, + sampler=sampler, + nsamples=3000) + + +# %% md +# + +# %% + +print('Estimated Probability of Failure = ' + str(test.evidence)) +print('Tempering Parameters = ' + str(test.tempering_parameters)) + +plt.figure() +plt.scatter(test.intermediate_samples[0][:, 0], test.intermediate_samples[0][:, 1]) +plt.title(r'$\beta = $' + str(test.tempering_parameters[0])) +plt.show() + +plt.figure() +plt.scatter(test.intermediate_samples[2][:, 0], test.intermediate_samples[2][:, 1]) +plt.title(r'$\beta = $' + str(test.tempering_parameters[2])) +plt.show() + +plt.figure() +plt.scatter(test.samples[:, 0], test.samples[:, 1]) +plt.title(r'$\beta = $' + str(test.tempering_parameters[-1])) +plt.show() diff --git a/docs/source/bibliography.bib b/docs/source/bibliography.bib index fd51140c8..c3f85d8b7 100644 --- a/docs/source/bibliography.bib +++ b/docs/source/bibliography.bib @@ -828,6 +828,29 @@ @article{saltelli_2002 author = {Andrea Saltelli}, keywords = {Sensitivity analysis, Sensitivity measures, Sensitivity indices, Importance measures}, } + +@article{PTMCMC1, +title = {Parallel Tempering: Theory, Applications, and New Perspectives}, +author = {David J. Earl, Michael W. 
Deem},
+year = {2005},
+doi = {https://doi.org/10.48550/arXiv.physics/0508111}
+}
+
+@inproceedings{PTMCMC2,
+title = {Using Thermodynamic Integration to Calculate the Posterior Probability in Bayesian Model Selection Problems},
+booktitle = {AIP Conference Proceedings 707, 59},
+year = {2004},
+doi = {https://doi.org/10.1063/1.1751356},
+author = {Paul M. Goggans and Ying Chi}
+}
+
+@article{STMCMC_ChingChen,
+title = {Transitional Markov Chain Monte Carlo Method for Bayesian Model Updating, Model Class Selection, and Model Averaging},
+author = {Jianye Ching and Yi-Chu Chen},
+year = {2007},
+doi = {https://doi.org/10.1061/(ASCE)0733-9399(2007)133:7(816)}
+}
+
 @article{Kle2D,
 title = {Simulation of multi-dimensional random fields by Karhunen–Loève expansion},
 journal = {Computer Methods in Applied Mechanics and Engineering},
diff --git a/docs/source/conf.py b/docs/source/conf.py
index 0fe8f1b3c..c4230d73b 100644
--- a/docs/source/conf.py
+++ b/docs/source/conf.py
@@ -82,6 +82,7 @@
     "../code/sampling/monte_carlo",
     "../code/sampling/latin_hypercube",
     "../code/sampling/mcmc",
+    "../code/sampling/tempering",
    "../code/sampling/simplex",
    "../code/sampling/true_stratified_sampling",
    "../code/sampling/refined_stratified_sampling",
@@ -122,6 +123,7 @@
    "auto_examples/sampling/monte_carlo",
    "auto_examples/sampling/latin_hypercube",
    "auto_examples/sampling/mcmc",
+    "auto_examples/sampling/tempering",
    "auto_examples/sampling/simplex",
    "auto_examples/sampling/true_stratified_sampling",
    "auto_examples/sampling/refined_stratified_sampling",
diff --git a/docs/source/sampling/mcmc/index.rst b/docs/source/sampling/mcmc/index.rst
index 7ba4f66ae..a54e0f392 100644
--- a/docs/source/sampling/mcmc/index.rst
+++ b/docs/source/sampling/mcmc/index.rst
@@ -75,6 +75,7 @@ List of MCMC algorithms
     DRAM
     DREAM
     Stretch
+    Tempering MCMC

 Adding New MCMC Algorithms
diff --git a/docs/source/sampling/mcmc/tempering.rst b/docs/source/sampling/mcmc/tempering.rst
new file mode 100644
index 000000000..d81751a06
--- /dev/null
+++ b/docs/source/sampling/mcmc/tempering.rst
@@ -0,0 +1,76 @@
+Tempering MCMC
+~~~~~~~~~~~~~~
+
+Tempering MCMC algorithms aim at sampling from a target distribution :math:`p_1(x)` of the form
+:math:`p_1(x)=\frac{q_1(x)p_0(x)}{Z_1}` where the factor :math:`q_1(x)` and reference distribution
+:math:`p_0(x)` can be evaluated. Additionally, these algorithms return an estimate of the normalization
+constant :math:`Z_1=\int{q_{1}(x) p_{0}(x)dx}`.
+
+The algorithms sample from a sequence of intermediate densities
+:math:`p_{\beta}(x) \propto q_{\beta}(x) p_{0}(x)` for values of the parameter :math:`\beta` between 0 and 1
+(:math:`\beta=\frac{1}{T}` where :math:`T` is sometimes called the temperature, and :math:`q_{\beta}(x)` is referred to as the intermediate factor associated with tempering parameter :math:`\beta`).
+Setting :math:`\beta = 1` equates to sampling from the target, while :math:`\beta \rightarrow 0` samples from the
+reference distribution.
+
+Parallel tempering samples from all distributions simultaneously, and the tempering parameters :math:`0 < \beta_1 < \beta_2 < \cdots < \beta_{N} \leq 1` must be
+chosen in advance by the user. Sequential tempering on the other hand samples from the various distributions sequentially, starting
+from the reference distribution, and the tempering parameters are selected adaptively by the algorithm.
+
+The :class:`.TemperingMCMC` base class defines inputs that are common to parallel and sequential tempering:
+
+.. autoclass:: UQpy.sampling.mcmc.tempering_mcmc.TemperingMCMC
+    :members:
+    :exclude-members: run, evaluate_normalization_constant
+
+Parallel Tempering
+^^^^^^^^^^^^^^^^^^^^
+
+This algorithm (see e.g. :cite:`PTMCMC1` for theory about parallel tempering) runs the chains sampling from the various tempered distributions simultaneously. Periodically during the
+run, the different temperatures swap members of their ensemble in a way that preserves detailed balance. The chains
+closer to the reference chain (hot chains) can sample from regions that have low probability under the target and
+thus allow a better exploration of the parameter space, while the cold chains can better explore regions of high
+likelihood.
+
+In parallel tempering, the normalizing constant :math:`Z_1` is evaluated via thermodynamic integration (:cite:`PTMCMC2`). Defining
+the potential function :math:`U_{\beta}(x)=\frac{\partial \ln{q_{\beta}(x)}}{\partial \beta}`, then
+
+:math:`\ln{Z_1} = \ln{Z_0} + \int_{0}^{1} E_{x \sim p_{\beta}} \left[ U_{\beta}(x) \right] d\beta`
+
+where the expectations are approximated via MC sampling using saved samples from the intermediate distributions. A trapezoidal rule is used for integration.
+
+The :class:`.ParallelTemperingMCMC` class is imported using the following command:
+
+>>> from UQpy.sampling.mcmc.tempering_mcmc.ParallelTemperingMCMC import ParallelTemperingMCMC
+
+.. autoclass:: UQpy.sampling.mcmc.ParallelTemperingMCMC
+    :members: run, evaluate_normalization_constant
+
+Sequential Tempering
+^^^^^^^^^^^^^^^^^^^^
+
+This algorithm (first introduced in :cite:`STMCMC_ChingChen`) samples from a series of intermediate targets that are each tempered versions of the final/true
+target. In going from one intermediate distribution to the next, the existing samples are resampled according to
+some weights (similar to importance sampling). To ensure that there aren't a large number of duplicates, the
+resampling step is followed by a short (or even single-step) Metropolis-Hastings run that disperses the samples while
+remaining within the correct intermediate distribution. The final intermediate target is the required target distribution,
+and the samples following this distribution are the required samples.
+
+The normalization constant :math:`Z_1` is estimated as the product of the normalized sums of the resampling weights for
+each intermediate distribution, i.e., if :math:`w_{\beta_j}(x_{j_i})` is the resampling weight corresponding to tempering
+parameter :math:`\beta_j`, calculated for the i-th sample :math:`x_{j_i}` drawn from the intermediate distribution associated with :math:`\beta_j`,
+then :math:`Z_1 = \prod_{j=0}^{m-1} \left\{ \frac{1}{N_j} \sum_{i=1}^{N_j} w_{\beta_j}(x_{j_i}) \right\}`. The coefficient of variation (COV) of this
+estimator is also given in :cite:`STMCMC_ChingChen`.
+
+The :class:`.SequentialTemperingMCMC` class is imported using the following command:
+
+>>> from UQpy.sampling.mcmc.tempering_mcmc.SequentialTemperingMCMC import SequentialTemperingMCMC
+
+.. autoclass:: UQpy.sampling.mcmc.tempering_mcmc.SequentialTemperingMCMC
+    :members:
+
+Examples
+~~~~~~~~~~~~~~~~~~
+
+.. 
toctree:: + + Tempering Examples <../../auto_examples/sampling/tempering/index> \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index d11b73a6c..8331e2b10 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -numpy == 1.21.4 +numpy == 1.22.0 scipy == 1.8.0 matplotlib == 3.5.2 scikit-learn == 1.0.2 diff --git a/src/UQpy/reliability/taylor_series/FORM.py b/src/UQpy/reliability/taylor_series/FORM.py index 47eff2c67..e100cbb16 100644 --- a/src/UQpy/reliability/taylor_series/FORM.py +++ b/src/UQpy/reliability/taylor_series/FORM.py @@ -303,6 +303,7 @@ def run(self, seed_x: Union[list, np.ndarray] = None, else: k = k + 1 + elif (self.tol1 is None) and (self.tol2 is not None) and (self.tol3 is not None): error2 = np.linalg.norm(beta[k + 1] - beta[k]) error3 = np.linalg.norm(dg_u_record[k + 1, :] - dg_u_record[k, :]) diff --git a/src/UQpy/sampling/__init__.py b/src/UQpy/sampling/__init__.py index c583907fb..d4b2ea7c9 100644 --- a/src/UQpy/sampling/__init__.py +++ b/src/UQpy/sampling/__init__.py @@ -1,6 +1,7 @@ from UQpy.sampling.mcmc import * from UQpy.sampling.adaptive_kriging_functions import * from UQpy.sampling.stratified_sampling import * +from UQpy.sampling.mcmc.tempering_mcmc import * from UQpy.sampling.AdaptiveKriging import AdaptiveKriging from UQpy.sampling.ImportanceSampling import ImportanceSampling diff --git a/src/UQpy/sampling/mcmc/MetropolisHastings.py b/src/UQpy/sampling/mcmc/MetropolisHastings.py index a09a9380e..c8f9a4c1a 100644 --- a/src/UQpy/sampling/mcmc/MetropolisHastings.py +++ b/src/UQpy/sampling/mcmc/MetropolisHastings.py @@ -13,22 +13,22 @@ class MetropolisHastings(MCMC): @beartype def __init__( - self, - pdf_target: Union[Callable, list[Callable]] = None, - log_pdf_target: Union[Callable, list[Callable]] = None, - args_target: tuple = None, - burn_length: Annotated[int, Is[lambda x: x >= 0]] = 0, - jump: int = 1, - dimension: int = None, - seed: list = None, - save_log_pdf: bool = False, - concatenate_chains: bool = True, - n_chains: int = None, - proposal: Distribution = None, - proposal_is_symmetric: bool = False, - random_state: RandomStateType = None, - nsamples: PositiveInteger = None, - nsamples_per_chain: PositiveInteger = None, + self, + pdf_target: Union[Callable, list[Callable]] = None, + log_pdf_target: Union[Callable, list[Callable]] = None, + args_target: tuple = None, + burn_length: Annotated[int, Is[lambda x: x >= 0]] = 0, + jump: int = 1, + dimension: int = None, + seed: list = None, + save_log_pdf: bool = False, + concatenate_chains: bool = True, + n_chains: int = None, + proposal: Distribution = None, + proposal_is_symmetric: bool = False, + random_state: RandomStateType = None, + nsamples: PositiveInteger = None, + nsamples_per_chain: PositiveInteger = None, ): """ Metropolis-Hastings algorithm :cite:`MCMC1` :cite:`MCMC2` @@ -115,7 +115,7 @@ def __init__( self.logger.info("\nUQpy: Initialization of " + self.__class__.__name__ + " algorithm complete.") if (nsamples is not None) or (nsamples_per_chain is not None): - self.run(nsamples=nsamples, nsamples_per_chain=nsamples_per_chain,) + self.run(nsamples=nsamples, nsamples_per_chain=nsamples_per_chain, ) def run_one_iteration(self, current_state: np.ndarray, current_log_pdf: np.ndarray): """ @@ -144,11 +144,11 @@ def run_one_iteration(self, current_state: np.ndarray, current_log_pdf: np.ndarr ) # this vector will be used to compute accept_ratio of each chain unif_rvs = ( Uniform() - .rvs(nsamples=self.n_chains, random_state=self.random_state) - .reshape((-1,)) 
+ .rvs(nsamples=self.n_chains, random_state=self.random_state) + .reshape((-1,)) ) for nc, (cand, log_p_cand, r_) in enumerate( - zip(candidate, log_p_candidate, log_ratios) + zip(candidate, log_p_candidate, log_ratios) ): accept = np.log(unif_rvs[nc]) < r_ if accept: diff --git a/src/UQpy/sampling/mcmc/__init__.py b/src/UQpy/sampling/mcmc/__init__.py index cbf9288ea..5a8eac1aa 100644 --- a/src/UQpy/sampling/mcmc/__init__.py +++ b/src/UQpy/sampling/mcmc/__init__.py @@ -3,4 +3,6 @@ from UQpy.sampling.mcmc.Stretch import Stretch from UQpy.sampling.mcmc.DRAM import DRAM from UQpy.sampling.mcmc.DREAM import DREAM + from UQpy.sampling.mcmc.baseclass.MCMC import MCMC +from UQpy.sampling.mcmc.tempering_mcmc import * diff --git a/src/UQpy/sampling/mcmc/baseclass/MCMC.py b/src/UQpy/sampling/mcmc/baseclass/MCMC.py index a863c9d7b..2a4936830 100644 --- a/src/UQpy/sampling/mcmc/baseclass/MCMC.py +++ b/src/UQpy/sampling/mcmc/baseclass/MCMC.py @@ -8,7 +8,7 @@ from UQpy.distributions import Distribution from UQpy.utilities.ValidationTypes import * from UQpy.utilities.Utilities import process_random_state -from abc import ABC +from abc import ABC, abstractmethod class MCMC(ABC): @@ -177,29 +177,22 @@ def _concatenate_chains(self): return None def _unconcatenate_chains(self): - self.samples = self.samples.reshape( - (-1, self.n_chains, self.dimension), order="C" - ) + self.samples = self.samples.reshape((-1, self.n_chains, self.dimension), order="C") if self.save_log_pdf: - self.log_pdf_values = self.log_pdf_values.reshape( - (-1, self.n_chains), order="C" - ) + self.log_pdf_values = self.log_pdf_values.reshape((-1, self.n_chains), order="C") return None def _initialize_samples(self, nsamples, nsamples_per_chain): if ((nsamples is not None) and (nsamples_per_chain is not None)) \ or (nsamples is None and nsamples_per_chain is None): raise ValueError("UQpy: Either nsamples or nsamples_per_chain must be provided (not both)") - if nsamples_per_chain is not None: - if not (isinstance(nsamples_per_chain, int) and nsamples_per_chain >= 0): - raise TypeError("UQpy: nsamples_per_chain must be an integer >= 0.") - nsamples = int(nsamples_per_chain * self.n_chains) - else: + if nsamples_per_chain is None: if not (isinstance(nsamples, int) and nsamples >= 0): raise TypeError("UQpy: nsamples must be an integer >= 0.") nsamples_per_chain = int(np.ceil(nsamples / self.n_chains)) - nsamples = int(nsamples_per_chain * self.n_chains) - + elif not (isinstance(nsamples_per_chain, int) and nsamples_per_chain >= 0): + raise TypeError("UQpy: nsamples_per_chain must be an integer >= 0.") + nsamples = int(nsamples_per_chain * self.n_chains) if self.samples is None: # very first call of run, set current_state as the seed and initialize self.samples self.samples = np.zeros((nsamples_per_chain, self.n_chains, self.dimension)) if self.save_log_pdf: @@ -316,3 +309,23 @@ def _check_methods_proposal(proposal_distribution): raise AttributeError("UQpy: The proposal should have a log_pdf or pdf method") proposal_distribution.log_pdf = lambda x: np.log( np.maximum(proposal_distribution.pdf(x), 10 ** (-320) * np.ones((x.shape[0],)))) + + def __copy__(self, **kwargs): + keys = kwargs.keys() + attributes = self.__dict__ + import inspect + initializer_parameters = inspect.signature(self.__class__.__init__).parameters.keys() + + for key in attributes.keys(): + if key not in initializer_parameters: + continue + new_value = attributes[key] if key not in keys else kwargs[key] + if new_value is not None: + kwargs[key] = new_value + + if 'seed' in 
kwargs.keys():
+            kwargs['seed'] = list(kwargs['seed'])
+        if 'nsamples_per_chain' in kwargs.keys() and kwargs['nsamples_per_chain'] == 0:
+            del kwargs['nsamples_per_chain']
+
+        return self.__class__(**kwargs)
diff --git a/src/UQpy/sampling/mcmc/tempering_mcmc/ParallelTemperingMCMC.py b/src/UQpy/sampling/mcmc/tempering_mcmc/ParallelTemperingMCMC.py
new file mode 100644
index 000000000..9cd6346f6
--- /dev/null
+++ b/src/UQpy/sampling/mcmc/tempering_mcmc/ParallelTemperingMCMC.py
@@ -0,0 +1,229 @@
+from scipy.special import logsumexp
+from scipy.integrate import trapz
+
+from UQpy.sampling.mcmc import MetropolisHastings
+from UQpy.sampling.mcmc.baseclass.MCMC import *
+from UQpy.sampling.mcmc.tempering_mcmc.baseclass.TemperingMCMC import TemperingMCMC
+
+
+class ParallelTemperingMCMC(TemperingMCMC):
+
+    @beartype
+    def __init__(self, n_iterations_between_sweeps: PositiveInteger,
+                 pdf_intermediate=None, log_pdf_intermediate=None, args_pdf_intermediate=(),
+                 distribution_reference: Distribution = None,
+                 save_log_pdf: bool = False, nsamples: PositiveInteger = None,
+                 nsamples_per_chain: PositiveInteger = None,
+                 random_state: RandomStateType = None,
+                 tempering_parameters: list = None,
+                 n_tempering_parameters: int = None,
+                 samplers: Union[MCMC, list[MCMC]] = None):
+
+        """
+        Class for Parallel-Tempering MCMC.
+
+        :param save_log_pdf: boolean, see :class:`MCMC` documentation. Importantly, this needs to be set to True if
+            one wants to evaluate the normalization constant via thermodynamic integration.
+        :param n_iterations_between_sweeps: number of iterations (sampling steps) between swap sweeps across the
+            chains.
+        :param tempering_parameters: tempering parameters, as a list of N floats increasing from 0. to 1. Either
+            `tempering_parameters` or `n_tempering_parameters` should be provided.
+        :param n_tempering_parameters: number of tempering levels N; by default, the tempering parameters are
+            selected to follow a geometric progression.
+        :param samplers: :class:`MCMC` object or list of such objects: MCMC samplers used to sample the parallel
+            chains. If only one object is provided, the same MCMC sampler is used for all chains. Defaults to running
+            a simple MH algorithm, where the proposal covariance for a given chain is inversely proportional to the
+            tempering parameter.
+
+        """
+
+        super().__init__(pdf_intermediate=pdf_intermediate, log_pdf_intermediate=log_pdf_intermediate,
+                         args_pdf_intermediate=args_pdf_intermediate, distribution_reference=None,
+                         save_log_pdf=save_log_pdf, random_state=random_state)
+        self.logger = logging.getLogger(__name__)
+        if not isinstance(samplers, list):
+            self.samplers = [samplers.__copy__() for _ in range(len(tempering_parameters))]
+        else:
+            self.samplers = samplers
+
+        self.distribution_reference = distribution_reference
+        self.evaluate_log_reference = self._preprocess_reference(self.distribution_reference)
+
+        # Initialize PT specific inputs: niter_between_sweeps and temperatures
+        self.n_iterations_between_sweeps = n_iterations_between_sweeps
+        self.tempering_parameters = tempering_parameters
+        self.n_tempering_parameters = n_tempering_parameters
+        if self.tempering_parameters is None:
+            if self.n_tempering_parameters is None:
+                raise ValueError('UQpy: either input tempering_parameters or n_tempering_parameters should be provided.')
+            elif not (isinstance(self.n_tempering_parameters, int) and self.n_tempering_parameters >= 2):
+                raise ValueError('UQpy: input n_tempering_parameters should be an integer >= 2.')
+            else:
+                self.tempering_parameters = [1. / np.sqrt(2) ** i for i in
+                                             range(self.n_tempering_parameters - 1, -1, -1)]
+        elif (not isinstance(self.tempering_parameters, (list, tuple))
+              or not (all(isinstance(t, (int, float)) and (0 < t <= 1.) for t in self.tempering_parameters))
+              # or float(self.temperatures[0]) != 1.
+        ):
+            raise ValueError(
+                'UQpy: tempering_parameters should be a list of floats in (0, 1], increasing towards 1.')
+        else:
+            self.n_tempering_parameters = len(self.tempering_parameters)
+
+        # Default MH proposal: covariance inversely proportional to the tempering parameter
+        for i, sampler in enumerate(self.samplers):
+            if isinstance(sampler, MetropolisHastings) and sampler.proposal is None:
+                from UQpy.distributions import JointIndependent, Normal
+                self.samplers[i] = sampler.__copy__(proposal_is_symmetric=True,
+                                                    proposal=JointIndependent(
+                                                        [Normal(scale=1. / np.sqrt(self.tempering_parameters[i]))] *
+                                                        sampler.dimension))
+
+        # Initialize algorithm outputs
+        self.intermediate_samples = None
+        """List of samples from the intermediate tempering levels. """
+        self.samples = None
+        """ Samples from the target distribution (tempering parameter = 1). """
+        self.log_pdf_values = None
+        """ Log pdf values of saved samples from the target. """
+        self.thermodynamic_integration_results = None
+        """ Results of the thermodynamic integration (see method `evaluate_normalization_constant`). """
+
+        self.mcmc_samplers = []
+        """List of MCMC samplers, one per tempering level. """
+        for i, temper_param in enumerate(self.tempering_parameters):
+            log_pdf_target = (lambda x, temper_param=temper_param: self.evaluate_log_reference(
+                x) + self.evaluate_log_intermediate(x, temper_param))
+            self.mcmc_samplers.append(self.samplers[i].__copy__(log_pdf_target=log_pdf_target, concatenate_chains=True,
+                                                                save_log_pdf=save_log_pdf,
+                                                                random_state=self.random_state))
+
+        self.logger.info('\nUQpy: Initialization of ' + self.__class__.__name__ + ' algorithm complete.')
+
+        # If nsamples is provided, run the algorithm
+        if (nsamples is not None) or (nsamples_per_chain is not None):
+            self.run(nsamples=nsamples, nsamples_per_chain=nsamples_per_chain)
+
+    @beartype
+    def run(self, nsamples: PositiveInteger = None, nsamples_per_chain: PositiveInteger = None):
+        """
+        Run the MCMC algorithm.
+
+        This function samples from the MCMC chains and appends samples to existing ones (if any). This method
+        leverages the `run_one_iteration` method specific to each of the samplers.
+
+        :param nsamples: Number of samples to generate from the target (the same number of samples will be generated
+            for all intermediate distributions).
+        :param nsamples_per_chain: Number of samples per chain to generate from the target. Either
+            `nsamples` or `nsamples_per_chain` must be provided (not both).
+
+        """
+        current_state, current_log_pdf = [], []
+        final_ns_per_chain = 0
+        for i, mcmc_sampler in enumerate(self.mcmc_samplers):
+            if mcmc_sampler.evaluate_log_target is None and mcmc_sampler.evaluate_log_target_marginals is None:
+                (mcmc_sampler.evaluate_log_target, mcmc_sampler.evaluate_log_target_marginals,) = \
+                    mcmc_sampler._preprocess_target(pdf_=mcmc_sampler.pdf_target,
+                                                    log_pdf_=mcmc_sampler.log_pdf_target,
+                                                    args=mcmc_sampler.args_target)
+            ns, ns_per_chain, current_state_t, current_log_pdf_t = mcmc_sampler._initialize_samples(
+                nsamples=nsamples, nsamples_per_chain=nsamples_per_chain)
+            current_state.append(current_state_t.copy())
+            current_log_pdf.append(current_log_pdf_t.copy())
+            if i == 0:
+                final_ns_per_chain = ns_per_chain
+
+        self.logger.info('UQpy: Running MCMC...')
+
+        # Run iterations of the MCMC algorithms, starting at current_state, until the requested number of
+        # samples per chain has been saved
+        while self.mcmc_samplers[0].nsamples_per_chain < final_ns_per_chain:
+
+            # run one iteration of the MCMC algorithms at the various temperatures
+            new_state, new_log_pdf = [], []
+            for t, sampler in enumerate(self.mcmc_samplers):
+                sampler.iterations_number += 1
+                new_state_t, new_log_pdf_t = sampler.run_one_iteration(
+                    current_state[t], current_log_pdf[t])
+                new_state.append(new_state_t.copy())
+                new_log_pdf.append(new_log_pdf_t.copy())
+
+            # Do sweeps if necessary
+            if self.mcmc_samplers[-1].iterations_number % self.n_iterations_between_sweeps == 0:
+                for i in range(self.n_tempering_parameters - 1):
+                    log_accept = (self.mcmc_samplers[i].evaluate_log_target(new_state[i + 1]) +
+                                  self.mcmc_samplers[i + 1].evaluate_log_target(new_state[i]) -
+                                  self.mcmc_samplers[i].evaluate_log_target(new_state[i]) -
+                                  self.mcmc_samplers[i + 1].evaluate_log_target(new_state[i + 1]))
+                    for nc, log_accept_chain in enumerate(log_accept):
+                        if np.log(self.random_state.rand()) < log_accept_chain:
+                            new_state[i][nc], new_state[i + 1][nc] = new_state[i + 1][nc], new_state[i][nc]
+                            new_log_pdf[i][nc], new_log_pdf[i + 1][nc] = new_log_pdf[i + 1][nc], new_log_pdf[i][nc]
+
+            # Update the chain, only if burn-in is over and the sample is not being jumped over
+            # also increase the current number of samples and samples_per_chain
+            if self.mcmc_samplers[-1].iterations_number > self.mcmc_samplers[-1].burn_length and \
+                    (self.mcmc_samplers[-1].iterations_number -
+                     self.mcmc_samplers[-1].burn_length) % self.mcmc_samplers[-1].jump == 0:
+                for t, sampler in enumerate(self.mcmc_samplers):
+                    sampler.samples[sampler.nsamples_per_chain, :, :] = new_state[t].copy()
+                    if self.save_log_pdf:
+                        sampler.log_pdf_values[sampler.nsamples_per_chain, :] = new_log_pdf[t].copy()
+                    sampler.nsamples_per_chain += 1
+                    sampler.samples_counter += sampler.n_chains
+
+        self.logger.info('UQpy: MCMC run successfully!')
+
+        # Concatenate chains if required
+        if self.mcmc_samplers[-1].concatenate_chains:
+            for t, mcmc_sampler in enumerate(self.mcmc_samplers):
+                mcmc_sampler._concatenate_chains()
+
+        # The returned samples are the posterior samples, i.e. those of the chain with tempering parameter beta=1
+        self.intermediate_samples = [sampler.samples for sampler in self.mcmc_samplers]
+        self.samples = self.mcmc_samplers[-1].samples
+        if self.save_log_pdf:
+            self.log_pdf_values = self.mcmc_samplers[-1].log_pdf_values
+
+    @beartype
+    def evaluate_normalization_constant(self, compute_potential, log_Z0: float = None, nsamples_from_p0: int = None):
+        """
+        Evaluate the normalization constant :math:`Z_1`.
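+
+        The estimate relies on the thermodynamic integration identity
+        :math:`\ln{Z_1} = \ln{Z_0} + \int_{0}^{1} E_{p_{\beta}} \left[ U_{\beta}(x) \right] d\beta`, where the
+        expectations are computed from the samples saved at each tempering level and the integral is evaluated
+        with the trapezoidal rule (this is why `save_log_pdf` must be set to True).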
+
+        The function returns an approximation of :math:`Z_1`, and saves intermediate results
+        (value of :math:`\ln{Z_0}`, list of tempering parameters used in the integration, and values of the
+        associated expected potentials).
+
+        :param compute_potential: Function that takes three inputs: :code:`x` (sample points where to evaluate the
+            potential), :code:`log_intermediate_values` (values of the log intermediate factors evaluated at points
+            :code:`x`), :code:`temper_param` (tempering parameter), and evaluates the potential.
+        :param log_Z0: Value of :math:`\ln{Z_{0}}` (float); if unknown, see `nsamples_from_p0`.
+        :param nsamples_from_p0: number of samples to draw from the reference distribution to evaluate
+            :math:`\ln{Z_{0}}`. Used only if input `log_Z0` is not provided.
+
+        """
+        if not self.save_log_pdf:
+            raise NotImplementedError('UQpy: the evidence cannot be computed when save_log_pdf is set to False.')
+        if log_Z0 is None and nsamples_from_p0 is None:
+            raise ValueError('UQpy: input log_Z0 or nsamples_from_p0 should be provided.')
+        # compute the expected potential at each tempering level from the saved samples
+        log_pdf_averages = []
+        for i, (temper_param, sampler) in enumerate(zip(self.tempering_parameters, self.mcmc_samplers)):
+            log_factor_values = sampler.log_pdf_values - self.evaluate_log_reference(sampler.samples)
+            potential_values = compute_potential(
+                x=sampler.samples, temper_param=temper_param, log_intermediate_values=log_factor_values)
+            log_pdf_averages.append(np.mean(potential_values))
+
+        # use the trapezoidal rule to integrate between 0 and 1
+        temper_param_list_for_integration = np.copy(np.array(self.tempering_parameters))
+        log_pdf_averages = np.array(log_pdf_averages)
+        int_value = trapz(x=temper_param_list_for_integration, y=log_pdf_averages)
+        if log_Z0 is None:
+            # simple MC estimate of Z0: Z0 ~ (1/N) sum_i q_{beta_1}(x_i), with x_i drawn from the reference p0
+            samples_p0 = self.distribution_reference.rvs(nsamples=nsamples_from_p0)
+            log_Z0 = np.log(1. / nsamples_from_p0) + logsumexp(
+                self.evaluate_log_intermediate(x=samples_p0, temper_param=self.tempering_parameters[0]))
+
+        self.thermodynamic_integration_results = {
+            'log_Z0': log_Z0, 'temper_param_list': temper_param_list_for_integration,
+            'expect_potentials': log_pdf_averages}
+
+        return np.exp(int_value + log_Z0)
diff --git a/src/UQpy/sampling/mcmc/tempering_mcmc/SequentialTemperingMCMC.py b/src/UQpy/sampling/mcmc/tempering_mcmc/SequentialTemperingMCMC.py
new file mode 100644
index 000000000..bed012745
--- /dev/null
+++ b/src/UQpy/sampling/mcmc/tempering_mcmc/SequentialTemperingMCMC.py
@@ -0,0 +1,359 @@
+import copy
+
+import scipy.stats as stats
+
+from UQpy.sampling.mcmc.MetropolisHastings import MetropolisHastings
+from UQpy.distributions.collection.MultivariateNormal import MultivariateNormal
+from UQpy.sampling.mcmc.baseclass.MCMC import *
+from UQpy.sampling.mcmc.tempering_mcmc.baseclass.TemperingMCMC import TemperingMCMC
+
+
+class SequentialTemperingMCMC(TemperingMCMC):
+
+    @beartype
+    def __init__(self, pdf_intermediate=None, log_pdf_intermediate=None, args_pdf_intermediate=(), seed=None,
+                 distribution_reference: Distribution = None,
+                 sampler: MCMC = None,
+                 nsamples: PositiveInteger = None,
+                 recalculate_weights: bool = False,
+                 save_intermediate_samples=False,
+                 percentage_resampling: int = 100,
+                 random_state: RandomStateType = None,
+                 resampling_burn_length: int = 0,
+                 resampling_proposal: Distribution = None,
+                 resampling_proposal_is_symmetric: bool = True):
+
+        """
+        Class for Sequential-Tempering MCMC.
+
+        :param sampler: :class:`MCMC` object: MCMC sampler used to draw the remaining samples for the intermediate
+            distribution after the resampling step. Defaults to running a simple MH algorithm, where the proposal
+            covariance is calculated as per the procedure given in Ching and Chen (2007) and Betz et al. (2016).
+        :param recalculate_weights: boolean: To be set to true if the resampling weights are to be recalculated after
+            each point is generated during the resampling step. This is done so that the resampling weights are in
+            accordance with the new sample generated after Metropolis-Hastings is used for dispersion, to ensure
+            uniqueness of the samples.
+        :param save_intermediate_samples: boolean: To be set to true to save the samples that are generated according
+            to the intermediate distributions.
+        :param percentage_resampling: int: Indicates what percentage of samples for a given intermediate distribution
+            are to be generated through resampling from the set of samples generated for the previous intermediate
+            distribution.
+        :param resampling_burn_length: int: Burn-in length for the Metropolis-Hastings dispersion step to ensure
+            uniqueness.
+        :param resampling_proposal: :class:`.Distribution` object. The proposal distribution for the
+            Metropolis-Hastings dispersion step.
+        :param resampling_proposal_is_symmetric: boolean: Indicates whether the provided resampling proposal is
+            symmetric.
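+        :param nsamples: int: Number of samples ultimately to be generated from the target distribution.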
+        """
+
+        self.proposal = resampling_proposal
+        self.proposal_is_symmetric = resampling_proposal_is_symmetric
+        self.resampling_burn_length = resampling_burn_length
+        self.logger = logging.getLogger(__name__)
+
+        super().__init__(pdf_intermediate=pdf_intermediate, log_pdf_intermediate=log_pdf_intermediate,
+                         args_pdf_intermediate=args_pdf_intermediate, distribution_reference=distribution_reference,
+                         random_state=random_state)
+
+        self.sampler = sampler
+        # Initialize inputs
+        self.save_intermediate_samples = save_intermediate_samples
+        self.recalculate_weights = recalculate_weights
+
+        self.resample_fraction = percentage_resampling / 100
+
+        self.__dimension = sampler.dimension
+        self.__n_chains = sampler.n_chains
+
+        self.n_samples_per_chain = int(np.floor(((1 - self.resample_fraction) * nsamples) / self.__n_chains))
+        self.n_resamples = int(nsamples - (self.n_samples_per_chain * self.__n_chains))
+
+        # Initialize input distributions
+        self.evaluate_log_reference, self.seed = self._preprocess_reference(dist_=distribution_reference,
+                                                                            seed_=seed, nsamples=nsamples,
+                                                                            dimension=self.__dimension,
+                                                                            random_state=self.random_state)
+
+        # Initialize flag that indicates whether default proposal is to be used (default proposal defined adaptively
+        # during run)
+        self.proposal_given_flag = self.proposal is not None
+        # Initialize attributes
+        self.tempering_parameters = None
+        self.evidence = None
+        self.evidence_CoV = None
+
+        # Call the run function
+        if nsamples is not None:
+            self.run(nsamples=nsamples)
+        else:
+            raise ValueError('UQpy: a value for "nsamples" must be specified')
+
+    @beartype
+    def run(self, nsamples: PositiveInteger = None):
+        """
+        Run the MCMC algorithm.
+
+        This function samples from each intermediate distribution until samples from the target are generated. Samples
+        cannot be appended to existing samples in this method. It leverages the `run` method of the given
+        sampler.
+
+        :param nsamples: Number of samples to generate from the target (the same number of samples will be generated
+            for all intermediate distributions).
+ + """ + self.logger.info('TMCMC Start') + + if self.samples is not None: + raise RuntimeError('UQpy: run method cannot be called multiple times for the same object') + + points = self.seed # Generated Samples from prior for zero-th tempering level + + # Initializing other variables + current_tempering_parameter = 0.0 # Intermediate exponent + previous_tempering_parameter = current_tempering_parameter + self.tempering_parameters = np.array(current_tempering_parameter) + pts_index = np.arange(nsamples) # Array storing sample indices + weights = np.zeros(nsamples) # Array storing plausibility weights + weight_probabilities = np.zeros(nsamples) # Array storing plausibility weight probabilities + expected_q0 = sum( + np.exp(self.evaluate_log_intermediate(points[i, :].reshape((1, -1)), 0.0)) + for i in range(nsamples)) / nsamples + + evidence_estimator = expected_q0 + + if self.save_intermediate_samples is True: + self.intermediate_samples = [] + self.intermediate_samples += [points.copy()] + + # Calculate covariance matrix for the default proposal + cov_scale = 0.2 + # Looping over all adaptively decided tempering levels + while current_tempering_parameter < 1: + + # Copy the state of the points array + points_copy = np.copy(points) + + # Adaptively set the tempering exponent for the current level + previous_tempering_parameter = current_tempering_parameter + current_tempering_parameter = self._find_temper_param(previous_tempering_parameter, points, + self.evaluate_log_intermediate, nsamples) + # d_exp = temper_param - temper_param_prev + self.tempering_parameters = np.append(self.tempering_parameters, current_tempering_parameter) + + self.logger.info('beta selected') + + # Calculate the plausibility weights + for i in range(nsamples): + weights[i] = np.exp(self.evaluate_log_intermediate(points[i, :].reshape((1, -1)), + current_tempering_parameter) + - self.evaluate_log_intermediate(points[i, :].reshape((1, -1)), + previous_tempering_parameter)) + + # Calculate normalizing constant for the plausibility weights (sum of the weights) + w_sum = np.sum(weights) + # Calculate evidence from each tempering level + evidence_estimator = evidence_estimator * (w_sum / nsamples) + # Normalize plausibility weight probabilities + weight_probabilities = (weights / w_sum) + + w_theta_sum = np.zeros(self.__dimension) + for i in range(nsamples): + for j in range(self.__dimension): + w_theta_sum[j] += weights[i] * points[i, j] + sigma_matrix = np.zeros((self.__dimension, self.__dimension)) + for i in range(nsamples): + points_deviation = np.zeros((self.__dimension, 1)) + for j in range(self.__dimension): + points_deviation[j, 0] = points[i, j] - (w_theta_sum[j] / w_sum) + sigma_matrix += (weights[i] / w_sum) * np.dot(points_deviation, + points_deviation.T) # Normalized by w_sum as per Betz et al + sigma_matrix = cov_scale ** 2 * sigma_matrix + + mcmc_log_pdf_target = self._target_generator(self.evaluate_log_intermediate, + self.evaluate_log_reference, current_tempering_parameter) + + self.logger.info('Begin Resampling') + # Resampling and MH-MCMC step + for i in range(self.n_resamples): + + # Resampling from previous tempering level + lead_index = int(self.random_state.choice(pts_index, p=weight_probabilities)) + lead = points_copy[lead_index] + + # Defining the default proposal + if self.proposal_given_flag is False: + self.proposal = MultivariateNormal(lead, cov=sigma_matrix) + + # Single MH-MCMC step + x = MetropolisHastings(dimension=self.__dimension, log_pdf_target=mcmc_log_pdf_target, seed=list(lead), + 
+                                       nsamples=1, n_chains=1, burn_length=self.resampling_burn_length,
+                                       proposal=self.proposal, random_state=self.random_state,
+                                       proposal_is_symmetric=self.proposal_is_symmetric)
+
+                # Setting the generated sample in the array
+                points[i] = x.samples
+                points_copy[lead_index] = x.samples
+
+                if self.recalculate_weights:
+                    weights[lead_index] = np.exp(
+                        self.evaluate_log_intermediate(points[lead_index, :].reshape((1, -1)),
+                                                       current_tempering_parameter)
+                        - self.evaluate_log_intermediate(points[lead_index, :].reshape((1, -1)),
+                                                         previous_tempering_parameter))
+                    w_sum = np.sum(weights)
+                    for j in range(nsamples):
+                        weight_probabilities[j] = weights[j] / w_sum
+
+            self.logger.info('Begin MCMC')
+            mcmc_seed = self._mcmc_seed_generator(resampled_pts=points[0:self.n_resamples, :],
+                                                  arr_length=self.n_resamples,
+                                                  seed_length=self.__n_chains,
+                                                  random_state=self.random_state)
+
+            y = self.sampler.__copy__(log_pdf_target=mcmc_log_pdf_target, seed=mcmc_seed,
+                                      nsamples_per_chain=self.n_samples_per_chain,
+                                      concatenate_chains=True, random_state=self.random_state)
+            points[self.n_resamples:, :] = y.samples
+
+            if self.save_intermediate_samples:
+                self.intermediate_samples += [points.copy()]
+
+            self.logger.info('Tempering level ended')
+
+        # Setting the calculated values to the attributes
+        self.samples = points
+        self.evidence = evidence_estimator
+
+    def update_target_and_seed(self, mcmc_class, mcmc_seed, mcmc_log_pdf_target):
+        mcmc_class.seed = mcmc_seed
+        mcmc_class.log_pdf_target = mcmc_log_pdf_target
+        mcmc_class.pdf_target = None
+        (mcmc_class.evaluate_log_target, mcmc_class.evaluate_log_target_marginals,) = \
+            mcmc_class._preprocess_target(pdf_=None, log_pdf_=mcmc_class.log_pdf_target, args=None)
+
+    def evaluate_normalization_constant(self):
+        return self.evidence
+
+    @staticmethod
+    def _find_temper_param(temper_param_prev, samples, q_func, n, iter_lim=1000, iter_thresh=0.00001):
+        """
+        Find the tempering parameter for the next intermediate target using bisection search between 1.0 and the
+        previous tempering parameter (taken to be 0.0 for the first level).
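+
+        The trial parameter is accepted when the coefficient of variation of the incremental weights
+        :code:`w_i = q(x_i, beta_trial) / q(x_i, beta_prev)` over the current samples is approximately one
+        (the implementation below accepts a standard deviation within [0.9, 1.1] times the mean).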
+
+        **Inputs:**
+
+        * **temper_param_prev** ('float'):
+            The value of the previous tempering parameter
+
+        * **samples** (`ndarray`):
+            Generated samples from the previous intermediate target distribution
+
+        * **q_func** (callable):
+            Log of the intermediate factor (:code:`self.evaluate_log_intermediate` in this code)
+
+        * **n** ('int'):
+            Number of samples
+
+        * **iter_lim** ('int'):
+            Maximum number of iterations of the bisection search, to avoid infinite loops
+
+        * **iter_thresh** ('float'):
+            Threshold on the length of the bisection interval, to avoid infinite loops
+        """
+        bot = temper_param_prev
+        top = 1.0
+        flag = 0  # Indicates when the tempering exponent has been found (flag = 1 => solution found)
+        loop_counter = 0
+        while flag == 0:
+            loop_counter += 1
+            q_scaled = np.zeros(n)
+            temper_param_trial = ((bot + top) / 2)
+            # If the weights obtained by jumping directly to beta = 1 already have a coefficient of variation
+            # below one, take the final level as the next level
+            for i2 in range(n):
+                q_scaled[i2] = np.exp(q_func(samples[i2, :].reshape((1, -1)), 1)
+                                      - q_func(samples[i2, :].reshape((1, -1)), temper_param_prev))
+            sigma_1 = np.std(q_scaled)
+            mu_1 = np.mean(q_scaled)
+            if sigma_1 < mu_1:
+                flag = 1
+                temper_param_trial = 1
+                continue
+            for i3 in range(n):
+                q_scaled[i3] = np.exp(q_func(samples[i3, :].reshape((1, -1)), temper_param_trial)
+                                      - q_func(samples[i3, :].reshape((1, -1)), temper_param_prev))
+            sigma = np.std(q_scaled)
+            mu = np.mean(q_scaled)
+            if sigma < (0.9 * mu):
+                bot = temper_param_trial
+            elif sigma > (1.1 * mu):
+                top = temper_param_trial
+            else:
+                flag = 1
+            if loop_counter > iter_lim:
+                raise RuntimeError('UQpy: unable to find tempering exponent due to nonconvergence')
+            if top - bot <= iter_thresh:
+                raise RuntimeError('UQpy: unable to find tempering exponent due to nonconvergence')
+        return temper_param_trial
+
+    def _preprocess_reference(self, dist_, seed_=None, nsamples=None, dimension=None, random_state=None):
+        """
+        Preprocess the reference distribution inputs.
+
+        Utility method that, given a distribution object, returns the log pdf of the target distribution of the
+        first tempering level (the prior in a Bayesian setting) and generates the samples for this level. If samples
+        for the first level are passed in through `seed_`, they are used directly after checking their shape; a
+        reference distribution must be provided in either case, to evaluate the log pdf of this level.
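+
+        (See :code:`test_sequential_seed_given` in the unit tests added in this PR for an example of passing
+        first-level samples through the `seed` input.)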
+
+        **Inputs:**
+
+        * seed_ ('ndarray'): The samples of the first tempering level
+        * dist_ ('Distribution' object): Target distribution for the first tempering level
+        * nsamples (int): Number of samples to be generated
+        * dimension (int): The dimension of the sample space
+
+        **Output/Returns:**
+
+        * evaluate_log_pdf (callable): Callable that computes the log of the target density function (the prior)
+        * seed_values ('ndarray'): The samples of the first tempering level
+        """
+
+        if dist_ is not None:
+            if not (isinstance(dist_, Distribution)):
+                raise TypeError('UQpy: A UQpy.Distribution object must be provided.')
+            else:
+                evaluate_log_pdf = (lambda x: dist_.log_pdf(x))
+            if seed_ is not None:
+                if seed_.shape[0] == nsamples and seed_.shape[1] == dimension:
+                    seed_values = seed_
+                else:
+                    raise ValueError('UQpy: the seed values should be a numpy array of size (nsamples, dimension)')
+            else:
+                seed_values = dist_.rvs(nsamples=nsamples, random_state=random_state)
+        else:
+            raise ValueError('UQpy: prior distribution must be provided')
+        return evaluate_log_pdf, seed_values
+
+    @staticmethod
+    def _mcmc_seed_generator(resampled_pts, arr_length, seed_length, random_state):
+        """
+        Generates the seed from the resampled samples for the MCMC step.
+
+        Utility function (static method) that returns a random selection of the resampled points (at any tempering
+        level) to be used as the seed for the following MCMC exploration step.
+
+        **Inputs:**
+
+        * resampled_pts ('ndarray'): The resampled samples of the tempering level
+        * arr_length (int): Length of resampled_pts
+        * seed_length (int): Number of samples needed in the seed (same as n_chains)
+
+        **Output/Returns:**
+
+        * mcmc_seed ('ndarray'): The selected seed samples
+        """
+        index_arr = np.arange(arr_length)
+        seed_indices = random_state.choice(index_arr, size=seed_length, replace=False)
+        mcmc_seed = resampled_pts[seed_indices, :]
+        return mcmc_seed
diff --git a/src/UQpy/sampling/mcmc/tempering_mcmc/__init__.py b/src/UQpy/sampling/mcmc/tempering_mcmc/__init__.py
new file mode 100644
index 000000000..50f45aa2f
--- /dev/null
+++ b/src/UQpy/sampling/mcmc/tempering_mcmc/__init__.py
@@ -0,0 +1,4 @@
+from UQpy.sampling.mcmc.tempering_mcmc.ParallelTemperingMCMC import ParallelTemperingMCMC
+from UQpy.sampling.mcmc.tempering_mcmc.SequentialTemperingMCMC import SequentialTemperingMCMC
+
+from UQpy.sampling.mcmc.tempering_mcmc.baseclass import *
diff --git a/src/UQpy/sampling/mcmc/tempering_mcmc/baseclass/TemperingMCMC.py b/src/UQpy/sampling/mcmc/tempering_mcmc/baseclass/TemperingMCMC.py
new file mode 100644
index 000000000..00d0921ba
--- /dev/null
+++ b/src/UQpy/sampling/mcmc/tempering_mcmc/baseclass/TemperingMCMC.py
@@ -0,0 +1,121 @@
+from UQpy.sampling.mcmc.baseclass.MCMC import *
+from abc import ABC
+
+
+class TemperingMCMC(ABC):
+
+    def __init__(self, pdf_intermediate=None, log_pdf_intermediate=None, args_pdf_intermediate=(),
+                 distribution_reference=None, save_log_pdf=True, random_state=None):
+        """
+        Parent class to parallel and sequential tempering MCMC algorithms.
+
+        :param pdf_intermediate: callable that computes the intermediate factor. It should take at
+         least two inputs :code:`x` (ndarray, point(s) at which to evaluate the function) and :code:`temper_param`
+         (float, tempering parameter). Either `pdf_intermediate` or `log_pdf_intermediate` must be provided
+         (`log_pdf_intermediate` is preferred).
Within the code, the `log_pdf_intermediate` is evaluated as:
+         :code:`log_pdf_intermediate(x, temper_param, *args_pdf_intermediate)`
+         where `args_pdf_intermediate` are additional positional arguments that are provided to the class via its
+         `args_pdf_intermediate` input.
+        :param log_pdf_intermediate: see `pdf_intermediate`
+        :param args_pdf_intermediate: see `pdf_intermediate`
+        :param distribution_reference: reference pdf :math:`p_0` as a :class:`.Distribution` object
+        :param save_log_pdf: see same input in :class:`MCMC`
+        """
+        self.logger = logging.getLogger(__name__)
+        # Check a few inputs
+        self.save_log_pdf = save_log_pdf
+        self.random_state = process_random_state(random_state)
+
+        # Initialize the intermediate factor and check the reference distribution (the callable evaluating its
+        # log pdf is set by the child classes)
+        self.evaluate_log_intermediate = self._preprocess_intermediate(
+            log_pdf_=log_pdf_intermediate, pdf_=pdf_intermediate, args=args_pdf_intermediate)
+        if not (isinstance(distribution_reference, Distribution) or (distribution_reference is None)):
+            raise TypeError('UQpy: if provided, input distribution_reference should be a UQpy.Distribution object.')
+
+        # Initialize the outputs
+        self.samples = None
+        self.intermediate_samples = None
+        if self.save_log_pdf:
+            self.log_pdf_values = None
+
+    @abstractmethod
+    def run(self, nsamples):
+        """ Run the tempering MCMC algorithms to generate nsamples from the target posterior """
+        pass
+
+    @abstractmethod
+    def evaluate_normalization_constant(self, **kwargs):
+        r""" Computes the normalization constant :math:`Z_{1}=\int{q_{1}(x) p_{0}(x)dx}` where :math:`p_0` is the
+        reference pdf and :math:`q_1` is the target factor."""
+        pass
+
+    def _preprocess_reference(self, dist_, **kwargs):
+        """
+        Preprocess the reference pdf input.
+
+        Utility method that transforms the reference distribution object into a callable that evaluates
+        log_pdf_reference(x) for given samples x.
+
+        **Inputs:**
+
+        * dist_ (distribution object)
+
+        **Output/Returns:**
+
+        * evaluate_log_pdf (callable): Callable that computes the log of the reference density function
+
+        """
+        # a Distribution object is provided
+        if dist_ is None:
+            evaluate_log_pdf = None
+        elif isinstance(dist_, Distribution):
+            evaluate_log_pdf = (lambda x: dist_.log_pdf(x))
+        else:
+            raise TypeError('UQpy: A UQpy.Distribution object must be provided.')
+        return evaluate_log_pdf
+
+    @staticmethod
+    def _preprocess_intermediate(log_pdf_, pdf_, args):
+        """
+        Preprocess the intermediate factor inputs.
+
+        Utility function (static method) that transforms the log_pdf, pdf, args inputs into a function that evaluates
+        log_pdf_intermediate(x, temper_param) for given samples x.
+
+        **Inputs:**
+
+        * log_pdf_ (callable): Log of the intermediate factor. Either pdf_intermediate or log_pdf_intermediate must
+          be provided.
+        * pdf_ (callable): Intermediate factor. Either pdf_intermediate or log_pdf_intermediate must be provided.
+        * args (tuple): Positional arguments of the pdf target.
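+
+        For instance, in the Bayesian setting used throughout this PR the intermediate factor is a tempered
+        likelihood, passed as :code:`log_pdf_intermediate=lambda x, temper_param: temper_param * log_likelihood(x)`.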
+
+        **Output/Returns:**
+
+        * evaluate_log_pdf (callable): Callable that computes the log of the intermediate factor
+
+        """
+        # log_pdf is provided
+        if log_pdf_ is not None:
+            if not callable(log_pdf_):
+                raise TypeError('UQpy: log_pdf_intermediate must be a callable')
+            if args is None:
+                args = ()
+            evaluate_log_pdf = (lambda x, temper_param: log_pdf_(x, temper_param, *args))
+        elif pdf_ is not None:
+            if not callable(pdf_):
+                raise TypeError('UQpy: pdf_intermediate must be a callable')
+            if args is None:
+                args = ()
+            evaluate_log_pdf = (lambda x, temper_param: np.log(
+                np.maximum(pdf_(x, temper_param, *args), 10 ** (-320) * np.ones((x.shape[0],)))))
+        else:
+            raise ValueError('UQpy: log_pdf_intermediate or pdf_intermediate must be provided')
+        return evaluate_log_pdf
+
+    @staticmethod
+    def _target_generator(intermediate_logpdf_, reference_logpdf_, temper_param_):
+        return lambda x: (reference_logpdf_(x) + intermediate_logpdf_(x, temper_param_))
diff --git a/src/UQpy/sampling/mcmc/tempering_mcmc/baseclass/__init__.py b/src/UQpy/sampling/mcmc/tempering_mcmc/baseclass/__init__.py
new file mode 100644
index 000000000..589679839
--- /dev/null
+++ b/src/UQpy/sampling/mcmc/tempering_mcmc/baseclass/__init__.py
@@ -0,0 +1 @@
+from UQpy.sampling.mcmc.tempering_mcmc.baseclass.TemperingMCMC import TemperingMCMC
diff --git a/tests/unit_tests/sampling/test_tempering.py b/tests/unit_tests/sampling/test_tempering.py
new file mode 100644
index 000000000..c264ff3d1
--- /dev/null
+++ b/tests/unit_tests/sampling/test_tempering.py
@@ -0,0 +1,137 @@
+import numpy as np
+from scipy.stats import multivariate_normal, uniform
+from UQpy.distributions import Uniform, JointIndependent, MultivariateNormal
+from UQpy.sampling import MetropolisHastings, ParallelTemperingMCMC
+from UQpy.sampling.mcmc.tempering_mcmc.SequentialTemperingMCMC import SequentialTemperingMCMC
+
+
+def log_rosenbrock(x):
+    return -(100 * (x[:, 1] - x[:, 0] ** 2) ** 2 + (1 - x[:, 0]) ** 2) / 20
+
+
+def log_intermediate(x, beta):
+    return beta * log_rosenbrock(x)
+
+
+def log_prior(x):
+    loc, scale = -20., 40.
+    return Uniform(loc=loc, scale=scale).log_pdf(x[:, 0]) + Uniform(loc=loc, scale=scale).log_pdf(x[:, 1])
+
+
+def compute_potential(x, temper_param, log_intermediate_values):
+    return log_intermediate_values / temper_param
+
+
+random_state = np.random.RandomState(1234)
+seed = -2. + 4. * random_state.rand(5, 2)
+betas = [1. / np.sqrt(2.) ** i for i in range(10 - 1, -1, -1)]
+
+prior_distribution = JointIndependent(marginals=[Uniform(loc=-2, scale=4), Uniform(loc=-2, scale=4)])
+
+
+def test_parallel():
+    samplers = [MetropolisHastings(burn_length=10, jump=2, seed=list(seed), dimension=2) for _ in range(len(betas))]
+    mcmc = ParallelTemperingMCMC(log_pdf_intermediate=log_intermediate,
+                                 distribution_reference=prior_distribution,
+                                 n_iterations_between_sweeps=4,
+                                 tempering_parameters=betas,
+                                 random_state=3456,
+                                 save_log_pdf=False, samplers=samplers)
+    mcmc.run(nsamples_per_chain=100)
+    assert mcmc.samples.shape == (500, 2)
+
+
+def test_thermodynamic_integration():
+    samplers = [MetropolisHastings(burn_length=10, jump=2, seed=list(seed), dimension=2) for _ in range(len(betas))]
+    mcmc = ParallelTemperingMCMC(log_pdf_intermediate=log_intermediate,
+                                 distribution_reference=prior_distribution,
+                                 n_iterations_between_sweeps=4,
+                                 tempering_parameters=betas,
+                                 save_log_pdf=True,
+                                 random_state=3456,
+                                 samplers=samplers)
+    mcmc.run(nsamples_per_chain=100)
+    log_ev = mcmc.evaluate_normalization_constant(compute_potential=compute_potential, log_Z0=0.)
+    assert np.round(log_ev, 4) == 0.203
+
+
+def likelihood(x, b):
+    mu1 = np.array([1., 1.])
+    mu2 = -0.8 * np.ones(2)
+    w1 = 0.5
+    # Width of 0.1 in each dimension
+    sigma1 = np.diag([0.02, 0.05])
+    sigma2 = np.diag([0.05, 0.02])
+
+    # Posterior is a mixture of two Gaussians
+    like = np.exp(np.logaddexp(np.log(w1) + multivariate_normal.logpdf(x=x, mean=mu1, cov=sigma1),
+                               np.log(1. - w1) + multivariate_normal.logpdf(x=x, mean=mu2, cov=sigma2)))
+    return like ** b
+
+
+def test_sequential():
+    prior = JointIndependent(marginals=[Uniform(loc=-2.0, scale=4.0), Uniform(loc=-2.0, scale=4.0)])
+    sampler = MetropolisHastings(dimension=2, n_chains=20)
+    test = SequentialTemperingMCMC(pdf_intermediate=likelihood,
+                                   distribution_reference=prior,
+                                   save_intermediate_samples=True,
+                                   percentage_resampling=10,
+                                   random_state=960242069,
+                                   sampler=sampler,
+                                   nsamples=100)
+    assert np.round(test.evidence, 4) == 0.0437
+
+
+def test_sequential_recalculated_weights():
+    prior = JointIndependent(marginals=[Uniform(loc=-2.0, scale=4.0), Uniform(loc=-2.0, scale=4.0)])
+    sampler = MetropolisHastings(dimension=2, n_chains=20)
+    test = SequentialTemperingMCMC(recalculate_weights=True,
+                                   pdf_intermediate=likelihood,
+                                   distribution_reference=prior,
+                                   percentage_resampling=10,
+                                   sampler=sampler,
+                                   nsamples=100,
+                                   random_state=960242069)
+    assert np.round(test.evidence, 4) == 0.082
+
+
+def test_sequential_evaluate_normalization_constant_method_check():
+    prior = JointIndependent(marginals=[Uniform(loc=-2.0, scale=4.0), Uniform(loc=-2.0, scale=4.0)])
+    sampler = MetropolisHastings(dimension=2, n_chains=20)
+    test = SequentialTemperingMCMC(pdf_intermediate=likelihood,
+                                   distribution_reference=prior,
+                                   percentage_resampling=10,
+                                   sampler=sampler,
+                                   nsamples=100,
+                                   random_state=960242069)
+    assert np.round(test.evaluate_normalization_constant(), 4) == 0.0437
+
+
+def test_sequential_proposal_given():
+    prior = JointIndependent(marginals=[Uniform(loc=-2.0, scale=4.0), Uniform(loc=-2.0, scale=4.0)])
+    sampler = MetropolisHastings(dimension=2, n_chains=20)
+    test = SequentialTemperingMCMC(pdf_intermediate=likelihood,
+                                   resampling_proposal=MultivariateNormal(mean=np.zeros(2), cov=1.0),
+                                   distribution_reference=prior,
+                                   percentage_resampling=10,
+                                   sampler=sampler,
+                                   nsamples=100,
+                                   random_state=960242069)
+    assert np.round(test.evaluate_normalization_constant(), 4) == 0.0437
+
+
+def test_sequential_seed_given():
+    prior = JointIndependent(marginals=[Uniform(loc=-2.0, scale=4.0), Uniform(loc=-2.0, scale=4.0)])
+    sampler = MetropolisHastings(dimension=2, n_chains=20)
+    samps = (np.array([uniform.rvs(loc=-2.0, scale=4.0, size=100, random_state=960242069),
+                       uniform.rvs(loc=-2.0, scale=4.0, size=100, random_state=24969420)]).T)
+    test = SequentialTemperingMCMC(pdf_intermediate=likelihood,
+                                   seed=samps,
+                                   distribution_reference=prior,
+                                   save_intermediate_samples=True,
+                                   percentage_resampling=10,
+                                   sampler=sampler,
+                                   nsamples=100,
+                                   random_state=960242069)
+    assert np.round(test.evaluate_normalization_constant(), 4) == 0.0489
+
diff --git a/tests/unit_tests/sampling/test_true_stratified.py b/tests/unit_tests/sampling/test_true_stratified.py
index 1114d566c..e5881a3eb 100644
--- a/tests/unit_tests/sampling/test_true_stratified.py
+++ b/tests/unit_tests/sampling/test_true_stratified.py
@@ -18,6 +18,7 @@ def test_rectangular_sts():
     assert x.samples[9, 1] == 0.5495253722712197
 
 
+
 def test_delaunay_sts():
     marginals = [Exponential(loc=1., scale=1.), Exponential(loc=1., scale=1.)]
     seeds = np.array([[0, 0], [0.4, 0.8], [1, 0], [1, 1]])