Source code for UQpy.sampling.mcmc.tempering_mcmc.ParallelTemperingMCMC

from scipy.special import logsumexp
from scipy.integrate import trapezoid

from UQpy.sampling.mcmc import MetropolisHastings
from UQpy.sampling.mcmc.baseclass.MCMC import *
from UQpy.sampling.mcmc.tempering_mcmc.baseclass.TemperingMCMC import TemperingMCMC


[docs]class ParallelTemperingMCMC(TemperingMCMC): @beartype def __init__(self, n_iterations_between_sweeps: PositiveInteger, pdf_intermediate=None, log_pdf_intermediate=None, args_pdf_intermediate=(), distribution_reference: Distribution = None, save_log_pdf: bool = False, nsamples: PositiveInteger = None, nsamples_per_chain: PositiveInteger = None, random_state: RandomStateType = None, tempering_parameters: list = None, n_tempering_parameters: int = None, samplers: Union[MCMC, list[MCMC]] = None): """ Class for Parallel-Tempering MCMC. :param save_log_pdf: boolean, see :class:`MCMC` documentation. Importantly, this needs to be set to True if one wants to evaluate the normalization constant via thermodynamic integration. :param n_iterations_between_sweeps: number of iterations (sampling steps) between sweeps between chains. :param tempering_parameters: tempering parameters, as a list of N floats increasing from 0. to 1. Either `tempering_parameters` or `n_tempering_parameters` should be provided :param n_tempering_parameters: number of tempering levels N, the tempering parameters are selected to follow a geometric suite by default :param samplers: :class:`MCMC` object or list of such objects: MCMC samplers used to sample the parallel chains. If only one object is provided, the same MCMC sampler is used for all chains. Default to running a simple MH algorithm, where the proposal covariance for a given chain is inversely proportional to the tempering parameter. """ super().__init__(pdf_intermediate=pdf_intermediate, log_pdf_intermediate=log_pdf_intermediate, args_pdf_intermediate=args_pdf_intermediate, distribution_reference=None, save_log_pdf=save_log_pdf, random_state=random_state) self.logger = logging.getLogger(__name__) if not isinstance(samplers, list): self.samplers = [samplers.__copy__() for _ in range(len(tempering_parameters))] else: self.samplers = samplers self.distribution_reference = distribution_reference self.evaluate_log_reference = self._preprocess_reference(self.distribution_reference) # Initialize PT specific inputs: niter_between_sweeps and temperatures self.n_iterations_between_sweeps = n_iterations_between_sweeps self.tempering_parameters = tempering_parameters self.n_tempering_parameters = n_tempering_parameters if self.tempering_parameters is None: if self.n_tempering_parameters is None: raise ValueError('UQpy: either input tempering_parameters or n_tempering_parameters should be provided.') elif not (isinstance(self.n_tempering_parameters, int) and self.n_tempering_parameters >= 2): raise ValueError('UQpy: input n_tempering_parameters should be a integer >= 2.') else: self.tempering_parameters = [1. / np.sqrt(2) ** i for i in range(self.n_tempering_parameters - 1, -1, -1)] elif (not isinstance(self.tempering_parameters, (list, tuple)) or not (all(isinstance(t, (int, float)) and (0 < t <= 1.) for t in self.tempering_parameters)) # or float(self.temperatures[0]) != 1. ): raise ValueError( 'UQpy: tempering_parameters should be a list of floats in [0, 1], starting at 0. and increasing to 1.') else: self.n_tempering_parameters = len(self.tempering_parameters) # default value for i, sampler in enumerate(self.samplers): if isinstance(sampler, MetropolisHastings) and sampler.proposal is None: from UQpy.distributions import JointIndependent, Normal self.samplers[i] = sampler.__copy__(proposal_is_symmetric=True, proposal=JointIndependent( [Normal(scale=1. / np.sqrt(self.tempering_parameters[i]))] * sampler.dimension)) # Initialize algorithm outputs self.intermediate_samples = None """List of samples from the intermediate tempering levels. """ self.samples = None """ Samples from the target distribution (tempering parameter = 1). """ self.log_pdf_values = None """ Log pdf values of saved samples from the target. """ self.thermodynamic_integration_results = None """ Results of the thermodynamic integration (see method `evaluate_normalization_constant`). """ self.mcmc_samplers = [] """List of MCMC samplers, one per tempering level. """ for i, temper_param in enumerate(self.tempering_parameters): log_pdf_target = (lambda x, temper_param=temper_param: self.evaluate_log_reference( x) + self.evaluate_log_intermediate(x, temper_param)) self.mcmc_samplers.append(self.samplers[i].__copy__(log_pdf_target=log_pdf_target, concatenate_chains=True, save_log_pdf=save_log_pdf, random_state=self.random_state)) self.logger.info('\nUQpy: Initialization of ' + self.__class__.__name__ + ' algorithm complete.') # If nsamples is provided, run the algorithm if (nsamples is not None) or (nsamples_per_chain is not None): self.run(nsamples=nsamples, nsamples_per_chain=nsamples_per_chain)
[docs] @beartype def run(self, nsamples: PositiveInteger = None, nsamples_per_chain: PositiveInteger = None): """ Run the MCMC algorithm. This function samples from the MCMC chains and appends samples to existing ones (if any). This method leverages the `run_iterations` method specific to each of the samplers. :param nsamples: Number of samples to generate from the target (the same number of samples will be generated for all intermediate distributions). :param nsamples_per_chain: Number of samples per chain to generate from the target. Either `nsamples` or `nsamples_per_chain` must be provided (not both) """ current_state, current_log_pdf = [], [] final_ns_per_chain = 0 for i, mcmc_sampler in enumerate(self.mcmc_samplers): if mcmc_sampler.evaluate_log_target is None and mcmc_sampler.evaluate_log_target_marginals is None: (mcmc_sampler.evaluate_log_target, mcmc_sampler.evaluate_log_target_marginals,) = \ mcmc_sampler._preprocess_target(pdf_=mcmc_sampler.pdf_target, log_pdf_=mcmc_sampler.log_pdf_target, args=mcmc_sampler.args_target) ns, ns_per_chain, current_state_t, current_log_pdf_t = mcmc_sampler._initialize_samples( nsamples=nsamples, nsamples_per_chain=nsamples_per_chain) current_state.append(current_state_t.copy()) current_log_pdf.append(current_log_pdf_t.copy()) if i == 0: final_ns_per_chain = ns_per_chain self.logger.info('UQpy: Running MCMC...') # Run nsims iterations of the MCMC algorithm, starting at current_state while self.mcmc_samplers[0].nsamples_per_chain < final_ns_per_chain: # update the total number of iterations # run one iteration of MCMC algorithms at various temperatures new_state, new_log_pdf = [], [] for t, sampler in enumerate(self.mcmc_samplers): sampler.iterations_number += 1 new_state_t, new_log_pdf_t = sampler.run_one_iteration( current_state[t], current_log_pdf[t]) new_state.append(new_state_t.copy()) new_log_pdf.append(new_log_pdf_t.copy()) # Do sweeps if necessary if self.mcmc_samplers[-1].iterations_number % self.n_iterations_between_sweeps == 0: for i in range(self.n_tempering_parameters - 1): log_accept = (self.mcmc_samplers[i].evaluate_log_target(new_state[i + 1]) + self.mcmc_samplers[i + 1].evaluate_log_target(new_state[i]) - self.mcmc_samplers[i].evaluate_log_target(new_state[i]) - self.mcmc_samplers[i + 1].evaluate_log_target(new_state[i + 1])) for nc, log_accept_chain in enumerate(log_accept): if np.log(self.random_state.rand()) < log_accept_chain: new_state[i][nc], new_state[i + 1][nc] = new_state[i + 1][nc], new_state[i][nc] new_log_pdf[i][nc], new_log_pdf[i + 1][nc] = new_log_pdf[i + 1][nc], new_log_pdf[i][nc] # Update the chain, only if burn-in is over and the sample is not being jumped over # also increase the current number of samples and samples_per_chain if self.mcmc_samplers[-1].iterations_number > self.mcmc_samplers[-1].burn_length and \ (self.mcmc_samplers[-1].iterations_number - self.mcmc_samplers[-1].burn_length) % self.mcmc_samplers[-1].jump == 0: for t, sampler in enumerate(self.mcmc_samplers): sampler.samples[sampler.nsamples_per_chain, :, :] = new_state[t].copy() if self.save_log_pdf: sampler.log_pdf_values[sampler.nsamples_per_chain, :] = new_log_pdf[t].copy() sampler.nsamples_per_chain += 1 sampler.samples_counter += sampler.n_chains self.logger.info('UQpy: MCMC run successfully !') # Concatenate chains maybe if self.mcmc_samplers[-1].concatenate_chains: for t, mcmc_sampler in enumerate(self.mcmc_samplers): mcmc_sampler._concatenate_chains() # Samples connect to posterior samples, i.e. the chain with beta=1. self.intermediate_samples = [sampler.samples for sampler in self.mcmc_samplers] self.samples = self.mcmc_samplers[-1].samples if self.save_log_pdf: self.log_pdf_values = self.mcmc_samplers[-1].log_pdf_values
[docs] @beartype def evaluate_normalization_constant(self, compute_potential, log_Z0: float = None, nsamples_from_p0: int = None): """ Evaluate normalization constant :math:`Z_1`. The function returns an approximation of :math:`Z_1`, and saves intermediate results (value of :math:`\ln{Z_0}`, list of tempering parameters used in integration, and values of the associated expected potentials. :param compute_potential: Function that takes three inputs: :code:`x` (sample points where to evaluate the potential), :code:`log_factor_tempered_values` (values of the log intermediate factors evaluated at points :code:`x`), :code:`temper_param` (tempering parameter) and evaluates the potential: :param log_Z0: Value of :math:`\ln{Z_{0}}` (float), if unknwon, see `nsamples_from_p0`. :param nsamples_from_p0: number of samples from the reference distribution to sample to evaluate :math:`\ln{Z_{0}}`. Used only if input `log_Z0` is not provided. """ if not self.save_log_pdf: raise NotImplementedError('UQpy: the evidence cannot be computed when save_log_pdf is set to False.') if log_Z0 is None and nsamples_from_p0 is None: raise ValueError('UQpy: input log_Z0 or nsamples_from_p0 should be provided.') # compute average of log_target for the target at various temperatures log_pdf_averages = [] for i, (temper_param, sampler) in enumerate(zip(self.tempering_parameters, self.mcmc_samplers)): log_factor_values = sampler.log_pdf_values - self.evaluate_log_reference(sampler.samples) potential_values = compute_potential( x=sampler.samples, temper_param=temper_param, log_intermediate_values=log_factor_values) log_pdf_averages.append(np.mean(potential_values)) # use quadrature to integrate between 0 and 1 temper_param_list_for_integration = np.copy(np.array(self.tempering_parameters)) log_pdf_averages = np.array(log_pdf_averages) int_value = trapezoid(x=temper_param_list_for_integration, y=log_pdf_averages) if log_Z0 is None: samples_p0 = self.distribution_reference.rvs(nsamples=nsamples_from_p0) log_Z0 = np.log(1. / nsamples_from_p0) + logsumexp( self.evaluate_log_intermediate(x=samples_p0, temper_param=self.tempering_parameters[0])) self.thermodynamic_integration_results = { 'log_Z0': log_Z0, 'temper_param_list': temper_param_list_for_integration, 'expect_potentials': log_pdf_averages} return np.exp(int_value + log_Z0)