Source code for UQpy.sampling.mcmc.DRAM

import logging
from typing import Callable
import warnings
warnings.filterwarnings('ignore')
import numpy as np

from beartype import beartype
from UQpy.sampling.mcmc.baseclass.MCMC import MCMC
from UQpy.distributions import *
from UQpy.utilities.ValidationTypes import *


[docs]class DRAM(MCMC): @beartype def __init__( self, pdf_target: Union[Callable, list[Callable]] = None, log_pdf_target: Union[Callable, list[Callable]] = None, args_target: tuple = None, burn_length: Annotated[int, Is[lambda x: x >= 0]] = 0, jump: int = 1, dimension: int = None, seed: list = None, save_log_pdf: bool = False, concatenate_chains: bool = True, initial_covariance: float = None, covariance_update_rate: float = 100, scale_parameter: float = None, delayed_rejection_scale: float = 1 / 5, save_covariance: bool = False, random_state: RandomStateType = None, n_chains: int = None, nsamples: int = None, nsamples_per_chain: int = None, ): """ Delayed Rejection Adaptive Metropolis algorithm :cite:`Dram1` :cite:`MCMC2` In this algorithm, the proposal density is Gaussian and its covariance C is being updated from samples as :code:`C = scale_parameter * C_sample` where :code:`C_sample` is the sample covariance. Also, the delayed rejection scheme is applied, i.e, if a candidate is not accepted another one is generated from the proposal with covariance :code:`delayed_rejection_scale ** 2 * C`. :param pdf_target: Target density function from which to draw random samples. Either `pdf_target` or `log_pdf_target` must be provided (the latter should be preferred for better numerical stability). If `pdf_target` is a callable, it refers to the joint pdf to sample from, it must take at least one input **x**, which are the point(s) at which to evaluate the pdf. Within :class:`.MCMC` the `pdf_target` is evaluated as: :code:`p(x) = pdf_target(x, \*args_target)` where **x** is a :class:`numpy.ndarray of shape :code:`(nsamples, dimension)` and `args_target` are additional positional arguments that are provided to :class:`.MCMC` via its `args_target` input. If `pdf_target` is a list of callables, it refers to independent marginals to sample from. The marginal in dimension :code:`j` is evaluated as: :code:`p_j(xj) = pdf_target[j](xj, \*args_target[j])` where **x** is a :class:`numpy.ndarray` of shape :code:`(nsamples, dimension)` :param log_pdf_target: Logarithm of the target density function from which to draw random samples. Either pdf_target or log_pdf_target must be provided (the latter should be preferred for better numerical stability). Same comments as for input `pdf_target`. :param args_target: Positional arguments of the pdf / log-pdf target function. See `pdf_target` :param burn_length: Length of burn-in - i.e., number of samples at the beginning of the chain to discard (note: no thinning during burn-in). Default is :math:`0`, no burn-in. :param jump: Thinning parameter, used to reduce correlation between samples. Setting :code:`jump=n` corresponds to skipping n-1 states between accepted states of the chain. Default is :math:`1` (no thinning). :param dimension: A scalar value defining the dimension of target density function. Either `dimension` and `n_chains` or `seed` must be provided. :param seed: Seed of the Markov chain(s), shape :code:`(n_chains, dimension)`. Default: :code:`zeros(n_chains x dimension)`. If `seed` is not provided, both `n_chains` and `dimension` must be provided. :param save_log_pdf: Boolean that indicates whether to save log-pdf values along with the samples. Default: :any:`False` :param concatenate_chains: Boolean that indicates whether to concatenate the chains after a run, i.e., samples are stored as a :class:`numpy.ndarray` of shape :code:`(nsamples * n_chains, dimension)` if :any:`True`, :code:`(nsamples, n_chains, dimension)` if :any:`False`. Default: :any:`True` :param n_chains: The number of Markov chains to generate. Either `dimension` and `n_chains` or `seed` must be provided. :param initial_covariance: Initial covariance for the gaussian proposal distribution. Default: I(dim) :param covariance_update_rate: Rate at which covariance is being updated, i.e., every k0 iterations. Default: :math:`100` :param scale_parameter: Scale parameter for covariance updating. Default: :math:`2.38^2/dim` :param delayed_rejection_scale: Scale parameter for delayed rejection. Default: :math:`1/5` :param save_covariance: If :any:`True`, updated covariance is saved in attribute :py:attr:`adaptive_covariance`. Default: :any:`False` :param random_state: Random seed used to initialize the pseudo-random number generator. Default is :any:`None`. :param nsamples: Number of samples to generate. :param nsamples_per_chain: Number of samples to generate per chain. """ self.nsamples = nsamples self.nsamples_per_chain = nsamples_per_chain super().__init__( pdf_target=pdf_target, log_pdf_target=log_pdf_target, args_target=args_target, dimension=dimension, seed=seed, burn_length=burn_length, jump=jump, save_log_pdf=save_log_pdf, concatenate_chains=concatenate_chains, random_state=random_state, n_chains=n_chains, ) self.logger = logging.getLogger(__name__) # Check the initial covariance self.initial_covariance = initial_covariance if self.initial_covariance is None: self.initial_covariance = np.eye(self.dimension) elif not (isinstance(self.initial_covariance, np.ndarray) and self.initial_covariance == (self.dimension, self.dimension)): raise TypeError( "UQpy: Input initial_covariance should be a 2D ndarray of shape (dimension, dimension)") self.covariance_update_rate = covariance_update_rate self.scale_parameter = scale_parameter if self.scale_parameter is None: self.scale_parameter = 2.38 ** 2 / self.dimension self.delayed_rejection_scale = delayed_rejection_scale self.save_covariance = save_covariance for key, typ in zip( [ "covariance_update_rate", "scale_parameter", "delayed_rejection_scale", "save_covariance", ], [int, float, float, bool], ): if not isinstance(getattr(self, key), typ): raise TypeError("Input " + key + " must be of type " + typ.__name__) # initialize the sample mean and sample covariance that you need self.current_covariance = np.tile( self.initial_covariance[np.newaxis, ...], (self.n_chains, 1, 1)) self.sample_mean = np.zeros((self.n_chains, self.dimension,)) self.sample_covariance = np.zeros((self.n_chains, self.dimension, self.dimension)) if self.save_covariance: self.adaptive_covariance = [self.current_covariance.copy(), ] self.logger.info("\nUQpy: Initialization of " + self.__class__.__name__ + " algorithm complete.") if (nsamples is not None) or (nsamples_per_chain is not None): self.run(nsamples=nsamples, nsamples_per_chain=nsamples_per_chain)
[docs] def run_one_iteration(self, current_state: np.ndarray, current_log_pdf: np.ndarray): """ Run one iteration of the mcmc chain for DRAM algorithm, starting at current state - see :class:`MCMC` class. """ from UQpy.distributions import MultivariateNormal multivariate_normal = MultivariateNormal(mean=np.zeros(self.dimension, ), cov=1.0) # Sample candidate candidate = np.zeros_like(current_state) for nc, current_cov in enumerate(self.current_covariance): multivariate_normal.update_parameters(cov=current_cov) candidate[nc, :] = current_state[nc, :] + \ multivariate_normal.rvs(nsamples=1, random_state=self.random_state) \ .reshape((self.dimension,)) # Compute log_pdf_target of candidate sample log_p_candidate = self.evaluate_log_target(candidate) # Compare candidate with current sample and decide or not to keep the candidate (loop over nc chains) accept_vec = np.zeros((self.n_chains,)) delayed_chains_indices = ([]) # indices of chains that will undergo delayed rejection unif_rvs = (Uniform().rvs(nsamples=self.n_chains, random_state=self.random_state) .reshape((-1,))) for nc, (cand, log_p_cand, log_p_curr) in enumerate( zip(candidate, log_p_candidate, current_log_pdf)): accept = np.log(unif_rvs[nc]) < log_p_cand - log_p_curr if accept: current_state[nc, :] = cand current_log_pdf[nc] = log_p_cand accept_vec[nc] += 1.0 else: # enter delayed rejection delayed_chains_indices.append(nc) # these indices will enter the delayed rejection part # Delayed rejection if delayed_chains_indices: # performed delayed rejection for some chains current_states_delayed = np.zeros( (len(delayed_chains_indices), self.dimension)) candidates_delayed = np.zeros((len(delayed_chains_indices), self.dimension)) candidate2 = np.zeros((len(delayed_chains_indices), self.dimension)) # Sample other candidates closer to the current one for i, nc in enumerate(delayed_chains_indices): current_states_delayed[i, :] = current_state[nc, :] candidates_delayed[i, :] = candidate[nc, :] multivariate_normal.update_parameters( cov=self.delayed_rejection_scale ** 2 * self.current_covariance[nc]) candidate2[i, :] = current_states_delayed[i, :] + \ multivariate_normal.rvs(nsamples=1, random_state=self.random_state) \ .reshape((self.dimension,)) # Evaluate their log_target log_p_candidate2 = self.evaluate_log_target(candidate2) log_prop_cand_cand2 = multivariate_normal.log_pdf(candidates_delayed - candidate2) log_prop_cand_curr = multivariate_normal.log_pdf(candidates_delayed - current_states_delayed) # Accept or reject unif_rvs = (Uniform().rvs(nsamples=len(delayed_chains_indices), random_state=self.random_state).reshape((-1,))) for (nc, cand2, log_p_cand2, j1, j2, u_rv) in zip( delayed_chains_indices, candidate2, log_p_candidate2, log_prop_cand_cand2, log_prop_cand_curr, unif_rvs, ): alpha_cand_cand2 = min(1.0, np.exp(log_p_candidate[nc] - log_p_cand2)) alpha_cand_curr = min(1.0, np.exp(log_p_candidate[nc] - current_log_pdf[nc])) log_alpha2 = (log_p_cand2 - current_log_pdf[nc] + j1 - j2 + np.log(max(1.0 - alpha_cand_cand2, 10 ** (-320))) - np.log(max(1.0 - alpha_cand_curr, 10 ** (-320)))) accept = np.log(u_rv) < min(0.0, log_alpha2) if accept: current_state[nc, :] = cand2 current_log_pdf[nc] = log_p_cand2 accept_vec[nc] += 1.0 # Adaptive part: update the covariance for nc in range(self.n_chains): # update covariance self.sample_mean[nc], self.sample_covariance[nc], = self._recursive_update_mean_covariance( nsamples=self.iterations_number, new_sample=current_state[nc, :], previous_mean=self.sample_mean[nc], previous_covariance=self.sample_covariance[nc], ) if (self.iterations_number > 1) and (self.iterations_number % self.covariance_update_rate == 0): self.current_covariance[nc] = self.scale_parameter * self.sample_covariance[nc] + \ 1e-6 * np.eye(self.dimension) if self.save_covariance and \ ((self.iterations_number > 1) and (self.iterations_number % self.covariance_update_rate == 0)): self.adaptive_covariance.append(self.current_covariance.copy()) # Update the acceptance rate self._update_acceptance_rate(accept_vec) return current_state, current_log_pdf
@staticmethod def _recursive_update_mean_covariance( nsamples, new_sample, previous_mean, previous_covariance=None ): """ Iterative formula to compute a new sample mean and covariance based on previous ones and new sample. New covariance is computed only of previous_covariance is provided. **Inputs:** * n (int): Number of samples used to compute the new mean * new_sample (ndarray (dim, )): new sample * previous_mean (ndarray (dim, )): Previous sample mean, to be updated with new sample value * previous_covariance (ndarray (dim, dim)): Previous sample covariance, to be updated with new sample value **Output/Returns:** * new_mean (ndarray (dim, )): Updated sample mean * new_covariance (ndarray (dim, dim)): Updated sample covariance """ new_mean = (nsamples - 1) / nsamples * previous_mean + 1 / nsamples * new_sample if previous_covariance is None: return new_mean dimensions = new_sample.size if nsamples == 1: new_covariance = np.zeros((dimensions, dimensions)) else: delta_n = (new_sample - previous_mean).reshape((dimensions, 1)) new_covariance = (nsamples - 2) / (nsamples - 1) \ * previous_covariance + 1 / nsamples * np.matmul(delta_n, delta_n.T) return new_mean, new_covariance