"""This module contains functionality for sensitivity analysis in ``UQpy``.
The module currently contains the following classes:
- ``Morris``: Class to compute sensitivity indices based on the Morris method.
"""
from typing import Union, Annotated
from beartype import beartype
from beartype.vale import Is
from UQpy.utilities.Utilities import process_random_state
from UQpy.utilities.ValidationTypes import RandomStateType, PositiveInteger, NumpyFloatArray
from UQpy.distributions import *
from UQpy.run_model.RunModel import RunModel
import numpy as np
from scipy.stats import randint
[docs]class MorrisSensitivity:
@beartype
def __init__(
self,
runmodel_object: RunModel,
distributions: Union[JointIndependent, Union[list, tuple]],
n_levels: Annotated[int, Is[lambda x: x >= 3]],
delta: Union[float, int] = None,
random_state: RandomStateType = None,
n_trajectories: PositiveInteger = None,
maximize_dispersion: bool = False,
):
"""
Compute sensitivity indices based on the Morris screening method.
:param runmodel_object: The computational model. It should be of type :class:`.RunModel`. The
output QoI can be a scalar or vector of length :code:`ny`, then the sensitivity indices of all :code:`ny` outputs are
computed independently.
:param distributions: List of :class:`.Distribution` objects corresponding to each random variable, or
:class:`.JointIndependent` object (multivariate RV with independent marginals).
:param n_levels: Number of levels that define the grid over the hypercube where evaluation points are
sampled. Must be an integer :math:`\ge 3`.
:param delta: Size of the jump between two consecutive evaluation points, must be a multiple of delta should be
in :code:`{1/(n_levels-1), ..., 1-1/(n_levels-1)}`.
Default: :math:`delta=\\frac{levels\_number}{2 * (levels\_number-1)}` if `n_levels` is even,
:math:`delta=0.5` if n_levels is odd.
:param random_state: Random seed used to initialize the pseudo-random number generator. Default is :any:`None`.
:param n_trajectories: Number of random trajectories, usually chosen between :math:`5` and :math:`10`.
The number of model evaluations is :code:`n_trajectories * (d+1)`. If None, the `Morris` object is created
but not run (see :py:meth:`run` method)
:param maximize_dispersion: If :any:`True`, generate a large number of design trajectories and keep the ones
that maximize dispersion between all trajectories, allows for a better coverage of the input space.
Default :any:`False`.
"""
# Check RunModel object and distributions
self.runmodel_object = runmodel_object
marginals = (distributions.marginals if isinstance(distributions, JointIndependent) else distributions)
self.icdfs = [getattr(dist, "icdf", None) for dist in marginals]
if any(icdf is None for icdf in self.icdfs):
raise ValueError("At least one of the distributions provided has a None icdf")
self.dimension = len(self.icdfs)
if self.dimension != len(self.runmodel_object.model.var_names):
raise ValueError("The number of distributions provided does not match the number of RunModel variables")
self.n_levels = n_levels
self.delta = delta
self.check_levels_delta()
self.random_state = process_random_state(random_state)
self.maximize_dispersion = maximize_dispersion
self.trajectories_unit_hypercube: NumpyFloatArray = None
"""Trajectories in the unit hypercube, :class:`numpy.ndarray` of shape :code:`(n_trajectories, d+1, d)`"""
self.trajectories_physical_space: NumpyFloatArray = None
"""Trajectories in the physical space, :class:`numpy.ndarray` of shape :code:`(n_trajectories, d+1, d)`"""
self.elementary_effects: NumpyFloatArray = None
"""Elementary effects :math:`EE_{k}`, :class:`numpy.ndarray` of shape :code:`(n_trajectories, d, ny)`."""
self.mustar_indices: NumpyFloatArray = None
"""First Morris sensitivity index :math:`\mu_{k}^{\star}`, :class:`numpy.ndarray` of shape :code:`(d, ny)`"""
self.sigma_indices: NumpyFloatArray = None
"""Second Morris sensitivity index :math:`\sigma_{k}`, :class:`numpy.ndarray` of shape :code:`(d, ny)`"""
if n_trajectories is not None:
self.run(n_trajectories)
def check_levels_delta(self):
# delta should be in {1/(nlevels-1), ..., 1-1/(nlevels-1)}
if (self.delta is None) and (self.n_levels % 2) == 0:
# delta = trial_probability / (2 * (trial_probability-1))
self.delta = self.n_levels / (2 * (self.n_levels - 1))
elif (self.delta is None) and (self.n_levels % 2) == 1:
self.delta = (1 / 2) # delta = (trial_probability-1) / (2 * (trial_probability-1))
elif not (isinstance(self.delta, (int, float)) and float(self.delta)
in [float(j / (self.n_levels - 1)) for j in range(1, self.n_levels - 1)]):
raise ValueError("UQpy: delta should be in {1/(nlevels-1), ..., 1-1/(nlevels-1)}")
[docs] @beartype
def run(self, n_trajectories: PositiveInteger):
"""
Run the Morris indices evaluation.
The code first sample trajectories in the unit hypercube and transform them to the physical space (see method
:py:meth:`sample_trajectories`), then runs the forward model to compute the elementary effects,
and finally computes the sensitivity indices.
:param n_trajectories: Number of random trajectories. Usually chosen between :math:`5` and :math:`10`.
The number of model evaluations is :code:`n_trajectories * (d+1)`.
"""
# Compute trajectories and elementary effects - append if any already exist
(trajectories_unit_hypercube, trajectories_physical_space,) = \
self.sample_trajectories(n_trajectories=n_trajectories, maximize_dispersion=self.maximize_dispersion,)
elementary_effects = self._compute_elementary_effects(trajectories_physical_space)
self.store_data(elementary_effects, trajectories_physical_space, trajectories_unit_hypercube)
self.mustar_indices, self.sigma_indices = self._compute_indices(self.elementary_effects)
def store_data(
self,
elementary_effects,
trajectories_physical_space,
trajectories_unit_hypercube,
):
if self.elementary_effects is None:
self.elementary_effects = elementary_effects
self.trajectories_unit_hypercube = trajectories_unit_hypercube
self.trajectories_physical_space = trajectories_physical_space
else:
self.elementary_effects = np.concatenate([self.elementary_effects, elementary_effects], axis=0)
self.trajectories_unit_hypercube = np.concatenate([self.trajectories_unit_hypercube,
trajectories_unit_hypercube], axis=0)
self.trajectories_physical_space = np.concatenate([self.trajectories_physical_space,
trajectories_physical_space], axis=0)
[docs] @beartype
def sample_trajectories(self, n_trajectories: PositiveInteger, maximize_dispersion: bool = False):
"""
Create the trajectories, first in the unit hypercube then transform them in the physical space.
:param n_trajectories: Number of random trajectories. Usually chosen between :math:`5` and :math:`10`.
The number of model evaluations is :code:`n_trajectories * (d+1)`.
:param maximize_dispersion: If :any:`True`, generate a large number of design trajectories and keep the ones
that maximize dispersion between all trajectories, allows for a better coverage of the input space.
Default :any:`False`.
"""
trajectories_unit_hypercube = []
perms_indices = []
ntrajectories_all = (10 * n_trajectories if maximize_dispersion else 1 * n_trajectories)
for r in range(ntrajectories_all):
if self.random_state is None:
perms = np.random.permutation(self.dimension)
else:
perms = self.random_state.permutation(self.dimension)
initial_state = (1.0 / (self.n_levels - 1) *
randint(low=0, high=int((self.n_levels - 1) * (1 - self.delta) + 1))
.rvs(size=(1, self.dimension), random_state=self.random_state))
trajectory_uh = np.tile(initial_state, [self.dimension + 1, 1])
for count_d, d in enumerate(perms):
trajectory_uh[count_d + 1:, d] = initial_state[0, d] + self.delta
trajectories_unit_hypercube.append(trajectory_uh)
perms_indices.append(perms)
trajectories_unit_hypercube = np.array(trajectories_unit_hypercube) # ndarray (r, d+1, d)
# if maximize_dispersion, compute the 'best' trajectories
if maximize_dispersion:
from itertools import combinations
distances = np.zeros((ntrajectories_all, ntrajectories_all))
for r in range(ntrajectories_all):
des_r = np.tile(trajectories_unit_hypercube[r, :, :][np.newaxis, :, :],[self.dimension + 1, 1, 1],)
for r2 in range(r + 1, ntrajectories_all):
des_r2 = np.tile(trajectories_unit_hypercube[r2, :, :][:, np.newaxis, :],
[1, self.dimension + 1, 1],)
distances[r, r2] = np.sum(np.sqrt(np.sum((des_r - des_r2) ** 2, axis=-1)))
# try 20000 combinations of ntrajectories trajectories, keep the one that maximizes the distance
def compute_combi_and_dist():
if self.random_state is None:
combi = np.random.choice(ntrajectories_all, replace=False, size=n_trajectories)
else:
combi = self.random_state.choice(ntrajectories_all, replace=False, size=n_trajectories)
dist_combi = 0.0
for pairs in list(combinations(combi, 2)):
dist_combi += distances[min(pairs), max(pairs)] ** 2
return combi, np.sqrt(dist_combi)
comb_to_keep, dist_comb = compute_combi_and_dist()
for _ in range(1, 20000):
comb, new_dist_comb = compute_combi_and_dist()
if new_dist_comb > dist_comb:
comb_to_keep, dist_comb = comb, new_dist_comb
trajectories_unit_hypercube = np.array([trajectories_unit_hypercube[j] for j in comb_to_keep])
# Avoid 0 and 1 cdf values
trajectories_unit_hypercube[trajectories_unit_hypercube < 0.01] = 0.01
trajectories_unit_hypercube[trajectories_unit_hypercube > 0.99] = 0.99
# Transform to physical space via icdf
trajectories_physical_space = []
for trajectory_uh in trajectories_unit_hypercube:
trajectory_ps = np.zeros_like(trajectory_uh)
for count_d, (design_d, icdf_d) in enumerate(zip(trajectory_uh.T, self.icdfs)):
trajectory_ps[:, count_d] = icdf_d(x=design_d)
trajectories_physical_space.append(trajectory_ps)
trajectories_physical_space = np.array(trajectories_physical_space)
return trajectories_unit_hypercube, trajectories_physical_space
def _compute_elementary_effects(self, trajectories_physical_space):
r, _, d = trajectories_physical_space.shape
# Run the model for all replicates
elementary_effects = []
for samples in trajectories_physical_space:
self.runmodel_object.run(samples=samples, append_samples=False)
qoi = np.array(self.runmodel_object.qoi_list)
el_effect = np.zeros((self.dimension,))
perms = [np.argwhere(bi != 0.0)[0, 0] for bi in (samples[1:] - samples[:-1])]
for count_d, d in enumerate(perms):
el_effect[d] = (qoi[count_d + 1] - qoi[count_d]) / self.delta
elementary_effects.append(el_effect)
return np.array(elementary_effects)
@staticmethod
def _compute_indices(elementary_effects):
# elementary_effects is an array of shape (r, d, ny) or (r, d)
mu_star = np.mean(np.abs(elementary_effects), axis=0)
mu = np.mean(elementary_effects, axis=0)
sigma = np.sqrt(np.mean((elementary_effects - mu) ** 2, axis=0))
return mu_star, sigma