import logging
from typing import Callable, Tuple, List
import warnings
warnings.filterwarnings('ignore')
import numpy as np
from beartype import beartype
from UQpy.distributions import Distribution
from UQpy.utilities.ValidationTypes import *
from UQpy.utilities.Utilities import process_random_state
from abc import ABC, abstractmethod
[docs]class MCMC(ABC):
@beartype
def __init__(
self,
dimension: Union[None, int] = None,
pdf_target: Union[Callable, list[Callable], None] = None,
log_pdf_target: Union[Callable, list[Callable], None] = None,
args_target: Union[tuple, None] = None,
seed: Union[list, None] = None,
burn_length: Annotated[int, Is[lambda x: x >= 0]] = 0,
jump: PositiveInteger = 1,
n_chains: Union[None, int] = None,
save_log_pdf: bool = False,
concatenate_chains: bool = True,
random_state: RandomStateType = None,
):
"""
Generate samples from arbitrary user-specified probability density function using Markov Chain Monte Carlo.
This is the parent class for all MCMC algorithms. This parent class only provides the framework for
MCMC and cannot be used directly for sampling. Sampling is done by calling the child class for the specific
MCMC algorithm.
:param dimension: A scalar value defining the dimension of target density function. Either *dimension*
and *n_chains* or *seed* must be provided.
:param pdf_target: Target density function from which to draw random samples. Either `pdf_target` or
`log_pdf_target` must be provided (the latter should be preferred for better numerical stability).
If `pdf_target` is a callable, it refers to the joint pdf to sample from, it must take at least one
input :code:`x`, which are the point(s) at which to evaluate the pdf. Within MCMC the `pdf_target` is
evaluated as:
:code:`p(x) = pdf_target(x, *args_target)` where :code:`x` is a ndarray of shape :code:`(nsamples, dimension)`
and `args_target` are additional positional arguments that are provided to MCMC via its
`args_target` input.
If :code:`pdf_target` is a list of callables, it refers to independent marginals to sample from. The marginal
in dimension :code:`j` is evaluated as: :code:`p_j(xj) = pdf_target[j](xj, *args_target[j])` where :code:`x`
is a ndarray of shape :code:`(nsamples, dimension)`
:param log_pdf_target: Logarithm of the target density function from which to draw random samples. Either
`pdf_target` or `log_pdf_target` must be provided (the latter should be preferred for better numerical
stability).
:param args_target: Positional arguments of the pdf / log-pdf target function. See `pdf_target`
:param seed: Seed of the Markov chain(s), shape ``(nchains, dimension)``. Default: ``zeros(nchains,
dimension)``. If `seed` is not provided, both `nchains` and `dimension` must be provided.
:param burn_length: Length of burn-in - i.e., number of samples at the beginning of the chain to discard (note:
no thinning during burn-in). Default is :math:`0`, no burn-in.
:param jump: Thinning parameter, used to reduce correlation between samples. Setting :code:`jump=n` corresponds
to skipping :code:`n-1` states between accepted states of the chain. Default is :math:`1` (no thinning).
:param n_chains: The number of Markov chains to generate. Either `dimension` and `nchains` or `seed` must
be provided.
:param save_log_pdf: Boolean that indicates whether to save log-pdf values along with the samples.
Default: :any:`False`
:param concatenate_chains: Boolean that indicates whether to concatenate the chains after a run, i.e., samples
are stored as an :class:`numpy.ndarray` of shape ``(nsamples * nchains, dimension)`` if :any:`True`,
``(nsamples, nchains, dimension)`` if :any:`False`. Default: :any:`True`
:param random_state: Random seed used to initialize the pseudo-random number generator. Default is :any:`None`.
If an :any:`int` is provided, this sets the seed for an object of :class:`numpy.random.RandomState`. Otherwise,
the object itself can be passed directly.
"""
self.burn_length, self.jump = burn_length, jump
self._initialization_seed = seed
self.seed = self._preprocess_seed(seed=seed, dimensions=dimension, n_chains=n_chains)
self.n_chains, self.dimension = self.seed.shape
self.evaluate_log_target: Callable = None
"""It is a callable that evaluates the log-pdf of the target distribution at a given point **x**"""
self.evaluate_log_target_marginals: Callable = None
"""It is a callable that evaluates the log-pdf of the target marginal distributions at a given point **x**"""
# Check target pdf
self.save_log_pdf = save_log_pdf
self.concatenate_chains = concatenate_chains
self._random_state = random_state
self.random_state = process_random_state(random_state)
self.logger = logging.getLogger(__name__)
self.log_pdf_target = log_pdf_target
self.pdf_target = pdf_target
self.args_target = args_target
# Initialize a few more variables
self.samples: NumpyFloatArray = None
"""Set of MCMC samples following the target distribution, :class:`numpy.ndarray` of shape
:code:`(nsamples * n_chains, dimension)`
or :code:`(nsamples, n_chains, dimension)` (see input `concatenate_chains`)."""
self.log_pdf_values: NumpyFloatArray = None
"""Values of the log pdf for the accepted samples, :class:`numpy.ndarray` of shape
:code:`(n_chains * nsamples,)` or :code:`(nsamples, n_chains)`"""
self.acceptance_rate = [0.0] * self.n_chains
self.samples_counter: int = 0
"""Total number of samples; The :py:attr:`nsamples` attribute tallies the total number of generated samples.
After each iteration, it is updated by :math:`1`. At the end of the simulation, the :py:attr:`nsamples`
attribute equals the user-specified value for input :py:attr:`nsamples` given to the child class."""
self.nsamples_per_chain: int = 0
"""Total number of samples per chain; Similar to the attribute :py:attr:`nsamples`, it is updated during
iterations as new samples are saved."""
self.iterations_number: int = 0 # total nb of iterations, grows if you call run several times
"""Total number of iterations, updated on-the-fly as the algorithm proceeds. It is related to number of samples
as :code:`iterations_number=burn_length+jump*nsamples_per_chain`."""
[docs] def run(self, nsamples: PositiveInteger = None, nsamples_per_chain: int = None):
"""
Run the mcmc algorithm.
This function samples from the mcmc chains and appends samples to existing ones (if any).
This method leverages the :meth:`run_one_iteration` method that is specific to each algorithm.
:param nsamples: Number of samples to generate.
:param nsamples_per_chain: number of samples to generate per chain.
Either `nsamples` or `nsamples_per_chain` must be provided (not both). Not that if `nsamples`
is not a multiple of `n_chains`, `nsamples` is set to the next largest integer that is a multiple of
`n_chains`.
"""
if self.evaluate_log_target is None and self.evaluate_log_target_marginals is None:
(self.evaluate_log_target, self.evaluate_log_target_marginals,) = \
self._preprocess_target(pdf_=self.pdf_target, log_pdf_=self.log_pdf_target, args=self.args_target)
# Initialize the runs: allocate space for the new samples and log pdf values
(final_nsamples, final_nsamples_per_chain, current_state, current_log_pdf,) = self._initialize_samples(
nsamples=nsamples, nsamples_per_chain=nsamples_per_chain)
self.logger.info("UQpy: Running mcmc...")
# Run nsims iterations of the mcmc algorithm, starting at current_state
while self.nsamples_per_chain < final_nsamples_per_chain:
# update the total number of iterations
self.iterations_number += 1
# run iteration
current_state, current_log_pdf = self.run_one_iteration(current_state, current_log_pdf)
# Update the chain, only if burn-in is over and the sample is not being jumped over
# also increase the current number of samples and samples_per_chain
if (self.iterations_number > self.burn_length
and (self.iterations_number - self.burn_length) % self.jump == 0):
self.samples[self.nsamples_per_chain, :, :] = current_state.copy()
if self.save_log_pdf:
self.log_pdf_values[self.nsamples_per_chain, :] = current_log_pdf.copy()
self.nsamples_per_chain += 1
self.samples_counter += self.n_chains
self.logger.info("UQpy: mcmc run successfully !")
# Concatenate chains maybe
if self.concatenate_chains:
self._concatenate_chains()
[docs] def run_one_iteration(self, current_state: np.ndarray, current_log_pdf: np.ndarray):
"""
Run one iteration of the mcmc algorithm, starting at `current_state`.
This method is over-written for each different mcmc algorithm. It must return the new state and
associated log-pdf, which will be passed as inputs to the :meth:`run_one_iteration` method at the next
iteration.
:param current_state: Current state of the chain(s), :class:`numpy.ndarray` of shape ``(n_chains, dimension)``.
:param current_log_pdf: Log-pdf of the current state of the chain(s), :class:`numpy.ndarray` of shape
``(n_chains, )``.
:return: New state of the chain(s) and Log-pdf of the new state of the chain(s)
"""
return [], []
def _concatenate_chains(self):
self.samples = self.samples.reshape((-1, self.dimension), order="C")
if self.save_log_pdf:
self.log_pdf_values = self.log_pdf_values.reshape((-1,), order="C")
return None
def _unconcatenate_chains(self):
self.samples = self.samples.reshape((-1, self.n_chains, self.dimension), order="C")
if self.save_log_pdf:
self.log_pdf_values = self.log_pdf_values.reshape((-1, self.n_chains), order="C")
return None
def _initialize_samples(self, nsamples, nsamples_per_chain):
if ((nsamples is not None) and (nsamples_per_chain is not None)) \
or (nsamples is None and nsamples_per_chain is None):
raise ValueError("UQpy: Either nsamples or nsamples_per_chain must be provided (not both)")
if nsamples_per_chain is None:
if not (isinstance(nsamples, int) and nsamples >= 0):
raise TypeError("UQpy: nsamples must be an integer >= 0.")
nsamples_per_chain = int(np.ceil(nsamples / self.n_chains))
elif not (isinstance(nsamples_per_chain, int) and nsamples_per_chain >= 0):
raise TypeError("UQpy: nsamples_per_chain must be an integer >= 0.")
nsamples = int(nsamples_per_chain * self.n_chains)
if self.samples is None: # very first call of run, set current_state as the seed and initialize self.samples
self.samples = np.zeros((nsamples_per_chain, self.n_chains, self.dimension))
if self.save_log_pdf:
self.log_pdf_values = np.zeros((nsamples_per_chain, self.n_chains))
current_state = np.zeros_like(self.seed)
np.copyto(current_state, self.seed)
current_log_pdf = self.evaluate_log_target(current_state)
if self.burn_length == 0: # if nburn is 0, save the seed, run one iteration less
self.samples[0, :, :] = current_state
if self.save_log_pdf:
self.log_pdf_values[0, :] = current_log_pdf
self.nsamples_per_chain += 1
self.samples_counter += self.n_chains
final_nsamples, final_nsamples_per_chain = (nsamples, nsamples_per_chain,)
else: # fetch previous samples to start the new run, current state is last saved sample
if len(self.samples.shape) == 2: # the chains were previously concatenated
self._unconcatenate_chains()
current_state = self.samples[-1]
current_log_pdf = self.evaluate_log_target(current_state)
self.samples = np.concatenate([self.samples,
np.zeros((nsamples_per_chain, self.n_chains, self.dimension)), ], axis=0, )
if self.save_log_pdf:
self.log_pdf_values = np.concatenate([self.log_pdf_values,
np.zeros((nsamples_per_chain, self.n_chains)), ], axis=0, )
final_nsamples = nsamples + self.samples_counter
final_nsamples_per_chain = (nsamples_per_chain + self.nsamples_per_chain)
return final_nsamples, final_nsamples_per_chain, current_state, current_log_pdf
def _update_acceptance_rate(self, chain_state_acceptance=None):
self.acceptance_rate = [
na / self.iterations_number
+ (self.iterations_number - 1) / self.iterations_number * a
for (na, a) in zip(chain_state_acceptance, self.acceptance_rate)
]
@staticmethod
def _preprocess_target(log_pdf_, pdf_, args):
# log_pdf is provided
if log_pdf_ is not None:
if callable(log_pdf_):
if args is None:
args = ()
evaluate_log_pdf = lambda x: log_pdf_(x, *args)
evaluate_log_pdf_marginals = None
elif isinstance(log_pdf_, list) and (all(callable(p) for p in log_pdf_)):
if args is None:
args = [()] * len(log_pdf_)
if not (isinstance(args, list) and len(args) == len(log_pdf_)):
raise ValueError(
"UQpy: When log_pdf_target is a list, args should be a list (of tuples) of same "
"length."
)
evaluate_log_pdf_marginals = list(map(lambda i: lambda x: log_pdf_[i](x, *args[i]),
range(len(log_pdf_)), ))
evaluate_log_pdf = lambda x: np.sum(
[log_pdf_[i](x[:, i, np.newaxis], *args[i]) for i in range(len(log_pdf_))])
else:
raise TypeError("UQpy: log_pdf_target must be a callable or list of callables")
# pdf is provided
elif pdf_ is not None:
if callable(pdf_):
if args is None:
args = ()
evaluate_log_pdf = lambda x: np.log(np.maximum(pdf_(x, *args), 10 ** (-320) * np.ones((x.shape[0],))))
evaluate_log_pdf_marginals = None
elif isinstance(pdf_, (list, tuple)) and (all(callable(p) for p in pdf_)):
if args is None:
args = [()] * len(pdf_)
if not (isinstance(args, (list, tuple)) and len(args) == len(pdf_)):
raise ValueError(
"UQpy: When pdf_target is given as a list, args should also be a list of same "
"length.")
evaluate_log_pdf_marginals = list(
map(lambda i: lambda x: np.log(np.maximum(pdf_[i](x, *args[i]),
10 ** (-320) * np.ones((x.shape[0],)), )),
range(len(pdf_)), ))
evaluate_log_pdf = lambda x: np.sum([np.log(np.maximum(pdf_[i](x[:, i, np.newaxis], *args[i]),
10 ** (-320) * np.ones((x.shape[0],)), ))
for i in range(len(pdf_))])
else:
raise TypeError("UQpy: pdf_target must be a callable or list of callables")
else:
raise ValueError("UQpy: log_pdf_target or pdf_target should be provided.")
return evaluate_log_pdf, evaluate_log_pdf_marginals
@staticmethod
def _preprocess_seed(seed, dimensions, n_chains):
if seed is None:
if dimensions is None or n_chains is None:
raise ValueError("UQpy: Either `seed` or `dimension` and `nchains` must be provided.")
seed = np.zeros((n_chains, dimensions))
else:
seed = np.atleast_1d(seed)
if len(seed.shape) == 1:
seed = np.reshape(seed, (1, -1))
elif len(seed.shape) > 2:
raise ValueError("UQpy: Input seed should be an array of shape (dimension, ) or (nchains, dimension).")
if dimensions is not None and seed.shape[1] != dimensions:
raise ValueError("UQpy: Wrong dimensions between seed and dimension.")
if n_chains is not None and seed.shape[0] != n_chains:
raise ValueError("UQpy: The number of chains and the seed shape are inconsistent.")
return seed
@staticmethod
def _check_methods_proposal(proposal_distribution):
if not isinstance(proposal_distribution, Distribution):
raise TypeError("UQpy: Proposal should be a Distribution object")
if not hasattr(proposal_distribution, "rvs"):
raise AttributeError("UQpy: The proposal should have an rvs method")
if not hasattr(proposal_distribution, "log_pdf"):
if not hasattr(proposal_distribution, "pdf"):
raise AttributeError("UQpy: The proposal should have a log_pdf or pdf method")
proposal_distribution.log_pdf = lambda x: np.log(
np.maximum(proposal_distribution.pdf(x), 10 ** (-320) * np.ones((x.shape[0],))))
def __copy__(self, **kwargs):
keys = kwargs.keys()
attributes = self.__dict__
import inspect
initializer_parameters = inspect.signature(self.__class__.__init__).parameters.keys()
for key in attributes.keys():
if key not in initializer_parameters:
continue
new_value = attributes[key] if key not in keys else kwargs[key]
if new_value is not None:
kwargs[key] = new_value
if 'seed' in kwargs.keys():
kwargs['seed'] = list(kwargs['seed'])
if 'nsamples_per_chain' in kwargs.keys() and kwargs['nsamples_per_chain'] == 0:
del kwargs['nsamples_per_chain']
return self.__class__(**kwargs)