import logging
from typing import Union, Annotated
from beartype import beartype
from beartype.vale import Is
from UQpy.distributions import *
import numpy as np
import scipy.stats as stats
from UQpy.utilities.Utilities import nearest_psd, calculate_gauss_quadrature_2d
from UQpy.utilities.Utilities import bi_variate_normal_pdf
from scipy.linalg import cholesky
from UQpy.utilities.ValidationTypes import PositiveInteger, NumpyFloatArray
DistributionList = Annotated[
list[Distribution],
Is[lambda lst: all(isinstance(item, Distribution) for item in lst)],
]
[docs]class Nataf:
@beartype
def __init__(
self,
distributions: Union[Distribution, DistributionList],
samples_x: Union[None, np.ndarray] = None,
samples_z: Union[None, np.ndarray] = None,
jacobian: bool = False,
corr_z: Union[None, np.ndarray] = None,
corr_x: Union[None, np.ndarray] = None,
itam_beta: Union[float, int] = 1.0,
itam_threshold1: Union[float, int] = 0.001,
itam_threshold2: Union[float, int] = 0.1,
itam_max_iter: int = 100,
n_gauss_points: int = 128
):
"""
Transform random variables using the Nataf or Inverse Nataf transformation
:param distributions: Probability distribution of each random variable. Must be an object of type
:class:`.DistributionContinuous1D` or :class:`.JointIndependent`.
:param samples_x: Random vector of shape ``(nsamples, n_dimensions)`` with prescribed probability
distributions.
If `samples_x` is provided, the :class:`.Nataf` class transforms them to `samples_z`.
:param samples_z: Standard normal random vector of shape ``(nsamples_, n_dimensions)``
If `samples_z` is provided, the :class:`.Nataf` class transforms them to `samples_x`.
:param jacobian: A boolean whether to return the jacobian of the transformation. Default: :any:`False`
:param corr_z: The correlation matrix (:math:`\mathbf{C_Z}`) of the standard normal random vector **Z** .
Default: The identity matrix.
If ``corr_z`` is specified, the :class:`.Nataf` class computes the correlation distortion integral above to
solve for ``corr_x``.
:param corr_x: The correlation matrix (:math:`\mathbf{C_X}`) of the random vector **X** .
Default: The identity matrix.
If ``corr_x`` is specified, the :class:`.Nataf` class invokes the ITAM to compute ``corr_z``.
:param itam_beta: A parameter selected to optimize convergence speed and desired accuracy of the ITAM method.
Default: :math:`1.0`
:param itam_threshold1: If ``corr_x`` is specified, this specifies the threshold value for the error in the
`ITAM` method (see :py:mod:`.utilities` module) given by
:math:`\epsilon_1 = ||\mathbf{C_X}^{target} - \mathbf{C_X}^{computed}||`
Default: :math:`0.001`
:param itam_threshold2: If ``corr_x`` is specified, this specifies the threshold value for the error difference
between iterations in the `ITAM` method (see :py:mod:`.utilities` module) given by
:math:`\epsilon_1^{i} - \epsilon_1^{i-1}`
for iteration :math:`i`.
Default: :math:`0.01`
:param itam_max_iter: Maximum number of iterations for the ITAM method. Default: :math:`100`
:param n_gauss_points: The number of integration points used for the numerical integration of the
correlation matrix (:math:`\mathbf{C_Z}`) of the standard normal random vector **Z**
"""
self.n_dimensions = 0
if isinstance(distributions, list):
for i in range(len(distributions)):
self.update_dimensions(distributions[i])
else:
self.update_dimensions(distributions)
self.dist_object = distributions
self.samples_x: NumpyFloatArray = samples_x
"""Random vector of shape ``(nsamples, n_dimensions)`` with prescribed probability distributions."""
self.samples_z: NumpyFloatArray = samples_z
"""Standard normal random vector of shape ``(nsamples, n_dimensions)``"""
self.jacobian = jacobian
self.jzx: NumpyFloatArray = None
"""The Jacobian of the transformation of shape ``(n_dimensions, n_dimensions)``."""
self.jxz: NumpyFloatArray = None
"""The Jacobian of the transformation of shape ``(n_dimensions, n_dimensions)``."""
self.itam_max_iter = itam_max_iter
self.itam_beta = float(itam_beta)
self.itam_threshold1 = float(itam_threshold1)
self.itam_threshold2 = float(itam_threshold2)
self.logger = logging.getLogger(__name__)
if corr_x is None and corr_z is None:
self.corr_x: NumpyFloatArray = np.eye(self.n_dimensions)
"""Distorted correlation matrix (:math:`\mathbf{C_X}`) of the random vector **X**."""
self.corr_z: NumpyFloatArray = np.eye(self.n_dimensions)
"""Distorted correlation matrix (:math:`\mathbf{C_Z}`) of the standard normal vector **Z**."""
elif corr_x is not None:
self.corr_x = corr_x
if np.all(np.equal(self.corr_x, np.eye(self.n_dimensions))):
self.corr_z = self.corr_x
elif all(isinstance(x, Normal) for x in distributions):
self.corr_z = self.corr_x
else:
self.corr_z, self.itam_error1, self.itam_error2 = \
self.itam(self.dist_object, self.corr_x, self.itam_max_iter, self.itam_beta,
self.itam_threshold1, self.itam_threshold2, )
elif corr_z is not None:
self.corr_z = corr_z
if np.all(np.equal(self.corr_z, np.eye(self.n_dimensions))):
self.corr_x = self.corr_z
elif all(isinstance(x, Normal) for x in distributions):
self.corr_x = self.corr_z
else:
self.corr_x = self.distortion_z2x(self.dist_object, self.corr_z, n_gauss_points=n_gauss_points)
self.H: NumpyFloatArray = cholesky(self.corr_z, lower=True)
"""The lower triangular matrix resulting from the Cholesky decomposition of the correlation matrix
:math:`\mathbf{C_Z}`."""
if self.samples_x is not None or self.samples_z is not None:
self.run(self.samples_x, self.samples_z, self.jacobian)
[docs] @beartype
def run(
self,
samples_x: Union[None, np.ndarray] = None,
samples_z: Union[None, np.ndarray] = None,
jacobian: bool = False,
):
"""
Execute the Nataf transformation or its inverse.
If `samples_x` is provided, the :meth:`run` method performs the Nataf transformation. If `samples_z` is
provided, the :meth:`run` method performs the inverse Nataf transformation.
:param samples_x: Random vector **X** with prescribed probability distributions or standard normal random
vector **Z** of shape :code:`(nsamples, n_dimensions)`.
:param samples_z: Standard normal random vector of shape ``(nsamples, n_dimensions)``
If `samples_z` is provided, the :class:`.Nataf` class transforms them to `samples_x`.
:param jacobian: The jacobian of the transformation of shape ``(n_dimensions, n_dimensions)``. Default:
:any:`False`
"""
self.jacobian = jacobian
if samples_x is not None:
if len(samples_x.shape) != 2:
self.samples_x = np.atleast_2d(samples_x).T
else:
self.samples_x = samples_x
if not self.jacobian:
self.samples_z = self._transform_x2z(self.samples_x)
elif self.jacobian:
self.samples_z, self.jxz = self._transform_x2z(self.samples_x, jacobian=self.jacobian)
if samples_z is not None:
if len(samples_z.shape) != 2:
self.samples_z = np.atleast_2d(samples_z).T
else:
self.samples_z = samples_z
if not self.jacobian:
self.samples_x = self._transform_z2x(self.samples_z)
elif self.jacobian:
self.samples_x, self.jzx = self._transform_z2x(self.samples_z, jacobian=self.jacobian)
[docs] @staticmethod
def itam(
distributions: Union[
DistributionContinuous1D,
JointIndependent,
list[Union[DistributionContinuous1D, JointIndependent]]],
corr_x,
itam_max_iter: int = 100,
itam_beta: Union[float, int] = 1.0,
itam_threshold1: Union[float, int] = 0.001,
itam_threshold2: Union[float, int] = 0.01,
):
"""
Calculate the correlation matrix :math:`\mathbf{C_Z}` of the standard normal random vector
:math:`\mathbf{Z}` given the correlation matrix :math:`\mathbf{C_X}` of the random vector :math:`\mathbf{X}`
using the `ITAM` method :cite:`StochasticProcess13`.
:param distributions: Probability distribution of each random variable. Must be an object of type
:class:`.DistributionContinuous1D` or :class:`.JointIndependent`.
`distributions` must have a ``cdf`` method.
:param corr_x: The correlation matrix (:math:`\mathbf{C_X}`) of the random vector **X** .
Default: The ``identity`` matrix.
:param itam_max_iter: Maximum number of iterations for the ITAM method. Default: :math:`100`
:param itam_beta: A parameters selected to optimize convergence speed and desired accuracy of the ITAM method
(see :cite:`Nataf2`). Default: :math:`1.0`
:param itam_threshold1: A threshold value for the relative difference between the non-Gaussian correlation
function and the underlying Gaussian. Default: :math:`0.001`
:param itam_threshold2: If ``corr_x`` is specified, this specifies the threshold value for the error difference
between iterations in the `ITAM` method (see :py:mod:`.utilities` module) given by:
:math:`\epsilon_1^{i} - \epsilon_1^{i-1}`
for iteration :math:`i`. Default: :math:`0.01`
:return: Distorted correlation matrix (:math:`\mathbf{C_Z}`) of the standard normal vector **Z**,
List of ITAM errors for each iteration, List of ITAM difference errors for each iteration
"""
# Initial Guess
corr_z0 = corr_x
corr_z = np.zeros_like(corr_z0)
# Iteration Condition
itam_error1 = []
itam_error2 = []
itam_error1.append(100.0)
itam_error2.append(abs(itam_error1[0] - 0.1) / 0.1)
logger = logging.getLogger(__name__)
logger.info("UQpy: Initializing Iterative Translation Approximation Method (ITAM)")
for k in range(itam_max_iter):
error0 = itam_error1[k]
corr0 = Nataf.distortion_z2x(distributions, corr_z0)
max_ratio = np.amax(np.ones((len(corr_x), len(corr_x))) / abs(corr_z0))
corr_z = np.nan_to_num((corr_x / corr0) ** itam_beta * corr_z0)
# Do not allow off-diagonal correlations to equal or exceed one
corr_z[corr_z < -1.0] = (max_ratio + 1) / 2 * corr_z0[corr_z < -1.0]
corr_z[corr_z > 1.0] = (max_ratio + 1) / 2 * corr_z0[corr_z > 1.0]
corr_z = np.array(nearest_psd(corr_z))
corr_z0 = corr_z.copy()
itam_error1.append(np.linalg.norm(corr_x - corr0))
itam_error2.append(abs(itam_error1[-1] - error0) / error0)
logger.info("UQpy: ITAM iteration number %i", k)
logger.info("UQpy: Current error, %e", (itam_error1[-1], itam_error2[-1]))
if itam_error1[k] <= itam_threshold1 and itam_error2[k] <= itam_threshold2:
break
logger.info("UQpy: ITAM Done.")
return corr_z, itam_error1, itam_error2
[docs] @staticmethod
def distortion_z2x(distributions: Union[Distribution, list[Distribution]], corr_z: np.ndarray,
n_gauss_points: int = 1024):
"""
This is a method to calculate the correlation matrix :math:`\mathbf{C_x}` of the random vector
:math:`\mathbf{x}` given the correlation matrix :math:`\mathbf{C_z}` of the standard normal random vector
:math:`\mathbf{z}`.
:param distributions: This is a method to calculate the correlation matrix :math:`\mathbf{C_x}` of the random vector
:math:`\mathbf{x}` given the correlation matrix :math:`\mathbf{C_z}` of the standard normal random vector
:math:`\mathbf{z}`.
This method is part of the :class:`.Nataf` class.
:param corr_z: The correlation matrix (:math:`\mathbf{C_z}`) of the standard normal vector **Z** .
Default: The ``identity`` matrix.
:param n_gauss_points: The number of integration points used for the numerical integration of the
correlation matrix (:math:`\mathbf{C_Z}`) of the standard normal random vector **Z**
:return: Distorted correlation matrix (:math:`\mathbf{C_X}`) of the random vector **X**.
"""
logger = logging.getLogger(__name__)
z_max = 8
z_min = -z_max
ng = n_gauss_points
eta, w2d, xi = calculate_gauss_quadrature_2d(ng, z_max, z_min)
corr_x = np.ones_like(corr_z)
logger.info("UQpy: Computing Nataf correlation distortion...")
is_joint = isinstance(distributions, JointIndependent)
marginals = distributions.marginals if is_joint else distributions
corr_x = Nataf.calculate_corr_x(corr_x, corr_z, marginals, eta, w2d, xi, is_joint)
return corr_x
@staticmethod
def calculate_corr_x(corr_x, corr_z, marginals, eta, w2d, xi, is_joint):
if all(hasattr(m, "moments") for m in marginals) and all(
hasattr(m, "icdf") for m in marginals
):
for i in range(len(marginals)):
i_cdf_i = marginals[i].icdf
mi = marginals[i].moments()
if not (np.isfinite(mi[0]) and np.isfinite(mi[1])):
raise RuntimeError("UQpy: The marginal distributions need to have finite mean and variance.")
for j in range(i + 1, len(marginals)):
i_cdf_j = marginals[j].icdf
mj = marginals[j].moments()
if not (np.isfinite(mj[0]) and np.isfinite(mj[1])):
raise RuntimeError("UQpy: The marginal distributions need to have finite mean and variance.")
term1 = mj[0] ** 2 if is_joint else mj[0]
term2 = mi[0] ** 2 if is_joint else mi[0]
tmp_f_xi = i_cdf_j(np.atleast_2d(stats.norm.cdf(xi)).T) - term1
tmp_f_eta = i_cdf_i(np.atleast_2d(stats.norm.cdf(eta)).T) - term2
phi2 = bi_variate_normal_pdf(xi, eta, corr_z[i, j])
corr_x[i, j] = (1 / (np.sqrt(mj[1]) * np.sqrt(mi[1])) * np.sum(tmp_f_xi * tmp_f_eta * w2d * phi2))
corr_x[j, i] = corr_x[i, j]
return corr_x
def _transform_x2z(self, samples_x, jacobian=False):
"""
This is a method to transform a vector :math:`\mathbf{x}` of samples with marginal distributions
:math:`f_i(x_i)` and cumulative distributions :math:`F_i(x_i)` to a vector :math:`\mathbf{z}` of standard normal
samples according to: :math:`Z_{i}=\Phi^{-1}(F_i(X_{i}))`, where :math:`\Phi` is the cumulative
distribution function of a standard normal variable.
:param samples_x: Random vector of shape ``(nsamples, n_dimensions)`` with prescribed probability distributions.
:param jacobian: A boolean whether to return the jacobian of the transformation.
Default: False
:return: Standard normal random vector of shape ``(nsamples, n_dimensions)``, the jacobian of the transformation
of shape ``(n_dimensions, n_dimensions)``.
"""
m, n = np.shape(samples_x)
samples_z = None
if isinstance(self.dist_object, JointIndependent):
if all(hasattr(m, "cdf") for m in self.dist_object.marginals):
samples_z = np.zeros_like(samples_x)
for j in range(len(self.dist_object.marginals)):
samples_z[:, j] = stats.norm.ppf(self.dist_object.marginals[j].cdf(samples_x[:, j]))
elif isinstance(self.dist_object, DistributionContinuous1D):
samples_z = stats.norm.ppf(self.dist_object.cdf(samples_x))
else:
samples_z = np.zeros_like(samples_x)
for j in range(n):
samples_z[:, j] = stats.norm.ppf(
self.dist_object[j].cdf(samples_x[:, j])
)
if not jacobian:
return samples_z
jac = np.zeros(shape=(n, n))
jxz = [None] * m
for i in range(m):
for j in range(n):
xi = np.array([samples_x[i, j]])
zi = np.array([samples_z[i, j]])
jac[j, j] = stats.norm.pdf(zi) / self.dist_object[j].pdf(xi)
jxz[i] = np.linalg.solve(jac, self.H)
return samples_z, jxz
def _transform_z2x(self, samples_z, jacobian=False):
"""
This is a method to transform a standard normal vector :math:`\mathbf{z}` to a vector
:math:`\mathbf{x}` of samples with marginal distributions :math:`f_i(x_i)` and cumulative distributions
:math:`F_i(x_i)` to samples according to: :math:`Z_{i}=\Phi^{-1}(F_i(X_{i}))`, where :math:`\Phi` is the
cumulative distribution function of a standard normal variable.
:param samples_z: Standard normal random vector of shape ``(nsamples, n_dimensions)``
:param jacobian: A boolean whether to return the jacobian of the transformation. Default: False
:return: Random vector of shape ``(nsamples, n_dimensions)`` with prescribed probability distributions,
The jacobian of the transformation of shape ``(n_dimensions, n_dimensions)``.
"""
m, n = np.shape(samples_z)
h = cholesky(self.corr_z, lower=True)
# samples_z = (h @ samples_y.T).T
samples_x = np.zeros_like(samples_z)
if isinstance(self.dist_object, JointIndependent):
if all(hasattr(m, "icdf") for m in self.dist_object.marginals):
for j in range(len(self.dist_object.marginals)):
samples_x[:, j] = self.dist_object.marginals[j].icdf(stats.norm.cdf(samples_z[:, j]))
elif isinstance(self.dist_object, DistributionContinuous1D):
samples_x = self.dist_object.icdf(stats.norm.cdf(samples_z))
elif isinstance(self.dist_object, list):
for j in range(samples_x.shape[1]):
samples_x[:, j] = self.dist_object[j].icdf(stats.norm.cdf(samples_z[:, j]))
if not jacobian:
return samples_x
jac = np.zeros(shape=(n, n))
jzx = [None] * m
for i in range(m):
for j in range(n):
xi = np.array([samples_x[i, j]])
zi = np.array([samples_z[i, j]])
jac[j, j] = self.dist_object[j].pdf(xi) / stats.norm.pdf(zi)
jzx[i] = np.linalg.solve(h, jac)
return samples_x, jzx
[docs] @beartype
def rvs(self, nsamples: PositiveInteger):
"""
Generate realizations from the joint pdf of the random vector **X**.
:param nsamples: Number of samples to generate.
:return: Random vector in the parameter space of shape ``(nsamples, n_dimensions)``.
"""
h = cholesky(self.corr_z, lower=True)
n = int(nsamples)
m = np.size(self.dist_object)
y = np.random.randn(nsamples, m)
z = np.dot(h, y.T).T
samples_x = np.zeros([n, m])
for i in range(m):
samples_x[:, i] = self.dist_object[i].icdf(stats.norm.cdf(z[:, i]))
return samples_x
def update_dimensions(self, dist_object):
if isinstance(dist_object, DistributionContinuous1D):
self.n_dimensions += 1
elif isinstance(dist_object, JointIndependent):
self.n_dimensions += len(dist_object.marginals)
else:
raise TypeError("UQpy: A ``DistributionContinuous1D`` or ``JointIndependent`` object must be provided.")