Source code for UQpy.reliability.taylor_series.FORM

import logging
from typing import Union
import numpy as np
import scipy.stats as stats
from beartype import beartype

from UQpy.run_model.RunModel import RunModel
from UQpy.transformations import *
from UQpy.distributions import *
from UQpy.reliability.taylor_series.baseclass.TaylorSeries import TaylorSeries
from UQpy.utilities.ValidationTypes import PositiveInteger
from UQpy.transformations import Decorrelate
import warnings

warnings.filterwarnings('ignore')


[docs]class FORM(TaylorSeries): @beartype def __init__( self, distributions: Union[None, Distribution, list[Distribution]], runmodel_object: RunModel, seed_x: Union[list, np.ndarray] = None, seed_u: Union[list, np.ndarray] = None, df_step: Union[int, float] = 0.01, corr_x: Union[list, np.ndarray] = None, corr_z: Union[list, np.ndarray] = None, n_iterations: PositiveInteger = 100, tolerance_u: Union[float, int, None] = 1e-3, tolerance_beta: Union[float, int, None] = 1e-3, tolerance_gradient: Union[float, int, None] = 1e-3, ): """ A class perform the First Order reliability Method. Each time :meth:`run` is called the results are appended. This is a child class of the :class:`.TaylorSeries` class. :param distributions: Marginal probability distributions of each random variable. Must be an object of type :class:`.DistributionContinuous1D` or :class:`.JointIndependent`. :param runmodel_object: The computational model. It should be of type :class:`RunModel`. :param seed_x: The initial starting point in the parameter space **X** for the `Hasofer-Lind` algorithm. Either `seed_u` or `seed_x` must be provided. If either `seed_u` or `seed_x` is provided, then the :py:meth:`run` method will be executed automatically. Otherwise, the the :py:meth:`run` method must be executed by the user. :param seed_u: The initial starting point in the uncorrelated standard normal space **U** for the `Hasofer-Lind` algorithm. Either `seed_u` or `seed_x` must be provided. Default: :code:`seed_u = (0, 0, ..., 0)` If either `seed_u` or `seed_x` is provided, then the :py:meth:`run` method will be executed automatically. Otherwise, the the :py:meth:`run` method must be executed by the user. :param df_step: Finite difference step in standard normal space. Default: :math:`0.01` :param corr_x: Covariance matrix If `corr_x` is provided, it is the correlation matrix (:math:`\mathbf{C_X}`) of the random vector **X** . :param corr_z: Covariance matrix If `corr_z` is provided, it is the correlation matrix (:math:`\mathbf{C_Z}`) of the standard normal random vector **Z** . Default: `corr_z` is specified as the identity matrix. :param n_iterations: Maximum number of iterations for the `HLRF` algorithm. Default: :math:`100` :param tolerance_u: Convergence threshold for criterion :math:`||\mathbf{U}^{k} - \mathbf{U}^{k-1}||_2 \leq` :code:`tolerance_u` of the `HLRF` algorithm. Default: :math:`1.0e-3` :param tolerance_beta: Convergence threshold for criterion :math:`||\\beta_{HL}^{k}-\\beta_{HL}^{k-1}||_2 \leq` :code:`tolerance_beta` of the `HLRF` algorithm. Default: :math:`1.0e-3` :param tolerance_gradient: Convergence threshold for criterion :math:`||\\nabla G(\mathbf{U}^{k})- \\nabla G(\mathbf{U}^{k-1})||_2 \leq` :code:`tolerance_gradient` of the `HLRF` algorithm. Default: :math:`1.0e-3` By default, all three tolerances must be satisfied for convergence. Specifying a tolerance as :code:`None` will ignore that criteria. """ if isinstance(distributions, list): self.dimension = len(distributions) self.dimension = 0 for i in range(len(distributions)): if isinstance(distributions[i], DistributionContinuous1D): self.dimension += 1 elif isinstance(distributions[i], JointIndependent): self.dimension += len(distributions[i].marginals) else: raise TypeError( "UQpy: A ``DistributionContinuous1D`` or ``JointIndependent`` object must be " "provided.") elif isinstance(distributions, DistributionContinuous1D): self.dimension = 1 elif isinstance(distributions, JointIndependent): self.dimension = len(distributions.marginals) else: raise TypeError("UQpy: A ``DistributionContinuous1D`` or ``JointIndependent`` object must be provided.") self.nataf_object = Nataf(distributions=distributions, corr_z=corr_z, corr_x=corr_x) self.corr_x = corr_x self.corr_z = corr_z self.df_step = df_step self.dist_object = distributions self.n_iterations = n_iterations self.runmodel_object = runmodel_object self.seed_u = seed_u self.seed_x = seed_x self.tolerance_beta = tolerance_beta self.tolerance_gradient = tolerance_gradient self.tolerance_u = tolerance_u self.logger = logging.getLogger(__name__) # Initialize output self.alpha: float = np.nan """Direction cosine.""" self.alpha_record: list = [] """Record of the alpha (directional cosine).""" self.beta: list = [] """Hasofer-Lind reliability index.""" self.beta_record: list = [] """Record of all Hasofer-Lind reliability index values.""" self.design_point_u: list = [] """Design point in the uncorrelated standard normal space U.""" self.design_point_x: list = [] """Design point in the parameter space X.""" self.error_record: list = [] """Record of the error defined by criteria :math:`\\text{tolerance}_\\textbf{U}, \\text{tolerance}_\\beta, \\text{tolerance}_{\\nabla G(\\textbf{U})}`.""" self.failure_probability: list = [] """FORM probability of failure :math:`\\Phi(-\\beta)`.""" self.g0 = None self.iterations: list = [] """Number of model evaluations.""" self.jacobian_zx = None """Jacobian of the transformation from correlated standard normal space to the parameter space.""" self.state_function_record: list = [] """Record of the performance function.""" self.state_function_gradient_record: list = [] """Record of the model’s gradient in the standard normal space.""" self.u_record: list = [] """Record of all iteration points in the standard normal space **U**.""" self.x = None self.x_record: list = [] """Record of all iteration points in the parameter space **X**.""" if (seed_x is not None) and (seed_u is not None): raise ValueError('UQpy: Only one input (seed_x or seed_u) may be provided') if self.seed_u is not None: self.run(seed_u=self.seed_u) elif self.seed_x is not None: self.run(seed_x=self.seed_x)
[docs] def run(self, seed_x: Union[list, np.ndarray] = None, seed_u: Union[list, np.ndarray] = None): """ Runs FORM. :param seed_u: Either `seed_u` or `seed_x` must be provided. If `seed_u` is provided, it should be a point in the uncorrelated standard normal space of **U**. If `seed_x` is provided, it should be a point in the parameter space of **X**. :param seed_x: The initial starting point for the `Hasofer-Lind` algorithm. Either `seed_u` or `seed_x` must be provided. If `seed_u` is provided, it should be a point in the uncorrelated standard normal space of **U**. If `seed_x` is provided, it should be a point in the parameter space of **X**. """ self.logger.info("UQpy: Running FORM...") if seed_u is None and seed_x is None: seed = np.zeros(self.dimension) elif seed_u is None and seed_x is not None: self.nataf_object.run(samples_x=seed_x.reshape(1, -1), jacobian=False) seed_z = self.nataf_object.samples_z seed = Decorrelate(seed_z, self.nataf_object.corr_z).samples_u elif seed_u is not None and seed_x is None: seed = np.squeeze(seed_u) else: raise ValueError("UQpy: Only one seed (seed_x or seed_u) must be provided") converged = False k = 0 beta = np.zeros(shape=(self.n_iterations + 1,)) u = np.zeros([self.n_iterations + 1, self.dimension]) state_function_gradient_record = np.zeros([self.n_iterations + 1, self.dimension]) u[0, :] = seed self.state_function_record.append(0.0) while not converged and k < self.n_iterations: self.logger.info("Number of iteration: %i", k) # FORM always starts from the standard normal space if k == 0: if seed_x is not None: x = seed_x else: seed_z = Correlate(samples_u=seed.reshape(1, -1), corr_z=self.nataf_object.corr_z).samples_z self.nataf_object.run(samples_z=seed_z.reshape(1, -1), jacobian=True) x = self.nataf_object.samples_x self.jacobian_zx = self.nataf_object.jxz else: z = Correlate(u[k, :].reshape(1, -1), self.nataf_object.corr_z).samples_z self.nataf_object.run(samples_z=z, jacobian=True) x = self.nataf_object.samples_x self.jacobian_zx = self.nataf_object.jxz self.x = x self.u_record.append(u) self.x_record.append(x) self.logger.info("Design point Y: {0}\n".format(u[k, :]) + "Design point X: {0}\n".format(self.x) + "Jacobian Jzx: {0}\n".format(self.jacobian_zx)) # 2. evaluate Limit State Function and the gradient at point u_k and direction cosines state_function_gradient, qoi, _ = self._derivatives(point_u=u[k, :], point_x=self.x, runmodel_object=self.runmodel_object, nataf_object=self.nataf_object, df_step=self.df_step, order="first") self.state_function_record.append(qoi) state_function_gradient_record[k + 1, :] = state_function_gradient norm_of_state_function_gradient = np.linalg.norm(state_function_gradient) alpha = state_function_gradient / norm_of_state_function_gradient self.logger.info("Directional cosines (alpha): {0}\n".format(alpha) + "State Function Gradient: {0}\n".format(state_function_gradient) + "Norm of State Function Gradient: {0}\n".format(norm_of_state_function_gradient)) self.alpha = alpha.squeeze() self.alpha_record.append(self.alpha) beta[k] = -np.inner(u[k, :].T, self.alpha) beta[k + 1] = beta[k] + qoi / norm_of_state_function_gradient self.logger.info("Beta: {0}\n".format(beta[k]) + "Pf: {0}".format(stats.norm.cdf(-beta[k]))) u[k + 1, :] = -beta[k + 1] * self.alpha error_u = np.linalg.norm(u[k + 1, :] - u[k, :]) error_beta = np.linalg.norm(beta[k + 1] - beta[k]) error_gradient = np.linalg.norm(state_function_gradient - state_function_gradient_record[k, :]) self.error_record.append([error_u, error_beta, error_gradient]) converged_in_u = True if (self.tolerance_u is None) else (error_u <= self.tolerance_u) converged_in_beta = True if (self.tolerance_beta is None) else (error_beta <= self.tolerance_beta) converged_in_gradient = True if (self.tolerance_gradient is None) \ else (error_gradient <= self.tolerance_gradient) converged = converged_in_u and converged_in_beta and converged_in_gradient if not converged: k += 1 self.logger.info("Error: %s", self.error_record[-1]) if k > self.n_iterations: self.logger.info("UQpy: Maximum number of iterations {0} was reached before convergence." .format(self.n_iterations)) else: self.design_point_u.append(u[k, :]) self.design_point_x.append(np.squeeze(self.x)) self.beta.append(beta[k]) self.failure_probability.append(stats.norm.cdf(-beta[k])) self.state_function_gradient_record.append(state_function_gradient_record[:k]) self.iterations.append(k)