Source code for UQpy.surrogates.gaussian_process.GaussianProcessRegression

import logging
import numpy as np
from scipy.linalg import cholesky, cho_solve

from beartype import beartype

from UQpy.utilities.Utilities import process_random_state
from UQpy.surrogates.baseclass.Surrogate import Surrogate
from UQpy.utilities.ValidationTypes import RandomStateType
from UQpy.utilities.kernels.baseclass.Kernel import Kernel
from UQpy.surrogates.gaussian_process.constraints.baseclass.Constraints import ConstraintsGPR


[docs]class GaussianProcessRegression(Surrogate): @beartype def __init__( self, kernel: Kernel, hyperparameters: list, regression_model=None, optimizer=None, bounds=None, optimize_constraints: ConstraintsGPR = None, optimizations_number: int = 1, normalize: bool = False, noise: bool = False, random_state: RandomStateType = None, ): """ GaussianProcessRegressor an Gaussian process regression-based surrogate model to predict the model output at new sample points. :param kernel: `kernel` specifies and evaluates the kernel. Built-in options: RBF :param hyperparameters: List or array of initial values for the kernel hyperparameters/scale parameters. This input attribute defines the dimension of input training point, thus its length/shape should be equal to the input dimension plus one (d+1), this list/array includes 'd' length scale and process standard deviation. In case of noisy observations/output, its length/shape should be equal to the input dimension plus two (d+2), this list/array includes 'd' lengthscales, process standard deviation and noise variance. :param regression_model: A class object, which computes the basis function at a sample point. If regression_model is None, this class will train GP with regression. Default: None :param bounds: This input attribute defines the bounds of the loguniform distributions, which randomly generates new starting point for optimization problem to estimate maximum likelihood estimator. This should be a closed bound. Default: [10**-3, 10**3] for each hyperparameter and [10**-10, 10**-1] for variance in noise (if applicable). :param optimizations_number: Number of times MLE optimization problem is to be solved with a random starting point. Default: 1. :param normalize: Boolean flag used in case data normalization is required. :param optimizer: A class object of 'MinimizeOptimizer' or 'FminCobyla' from UQpy.utilities module. If optimizer is not defined, this class will not solve MLE problem and predictions will be based on hyperparameters provided as input. Default: None. :param noise: Boolean flag used in case of noisy training data. Default: False :param random_state: Random seed used to initialize the pseudo-random number generator. If an integer is provided, this sets the seed for an object of :class:`numpy.random.RandomState`. Otherwise, the object itself can be passed directly. """ self.regression_model = regression_model self.kernel = kernel self.hyperparameters = np.array(hyperparameters) self.bounds = bounds self.optimizer = optimizer self.optimizations_number = optimizations_number self.optimize_constraints = optimize_constraints self.normalize = normalize self.noise = noise self.logger = logging.getLogger(__name__) self.random_state = random_state # Variables are used outside the __init__ self.samples = None self.values = None self.sample_mean, self.sample_std = None, None self.value_mean, self.value_std = None, None self.rmodel, self.cmodel = None, None self.beta = None """Regression coefficients.""" self.gamma = None self.err_var = None """Variance of the Gaussian random process.""" self.F_dash = None self.C_inv = None """Inverse Cholesky decomposition of the correlation matrix.""" self.G = None self.F, self.K = None, None self.cc, self.alpha_ = None, None self.mu = 0 if bounds is None: if isinstance(self.optimizer, type(None)) or isinstance(self.optimizer._bounds, type(None)): self._define_bounds() else: self.bounds = self.optimizer._bounds self.jac = False self.random_state = process_random_state(random_state) def _define_bounds(self): bounds_ = [[10 ** -3, 10 ** 3]] * self.hyperparameters.shape[0] if self.noise: bounds_[-1] = [10 ** -10, 10 ** -1] self.bounds = bounds_
[docs] def fit( self, samples, values, optimizations_number=None, hyperparameters=None, ): """ Fit the surrogate model using the training samples and the corresponding model values. The user can run this method multiple time after initiating the :class:`.GaussianProcessRegressor` class object. This method updates the samples and parameters of the :class:`.GaussianProcessRegressor` object. This method uses `hyperparameters` from previous run as the starting point for MLE problem unless user provides a new starting point. :param samples: `ndarray` containing the training points. :param values: `ndarray` containing the model evaluations at the training points. :param optimizations_number: number of optimization iterations :param hyperparameters: List or array of initial values for the kernel hyperparameters/scale parameters. The :meth:`fit` method has no returns, although it creates the `beta`, `err_var` and `C_inv` attributes of the :class:`.GaussianProcessRegressor` class. """ self.logger.info("UQpy: Running gpr.fit") if optimizations_number is not None: self.optimizations_number = optimizations_number if hyperparameters is not None: self.hyperparameters = np.array(hyperparameters) self.samples = np.array(samples) # Number of samples and dimensions of samples and values nsamples, input_dim = self.samples.shape output_dim = int(np.size(values) / nsamples) # Verify the length of hyperparameters and input dimension if self.noise: if self.hyperparameters.shape[0] != input_dim + 2: raise RuntimeError("UQpy: The length/shape of attribute 'hyperparameter' and input dimension are not " "consistent.") elif self.hyperparameters.shape[0] != input_dim + 1: raise RuntimeError("UQpy: The length/shape of attribute 'hyperparameter' and input dimension are not " "consistent.") self.values = np.array(values).reshape(nsamples, output_dim) # Normalizing the data if self.normalize: self.sample_mean, self.sample_std = np.mean(self.samples, 0), np.std(self.samples, 0) self.value_mean, self.value_std = np.mean(self.values, 0), np.std(self.values, 0) s_ = (self.samples - self.sample_mean) / self.sample_std y_ = (self.values - self.value_mean) / self.value_std else: s_ = self.samples y_ = self.values if self.regression_model is not None: self.F = self.regression_model.r(s_) # Maximum Likelihood Estimation : Solving optimization problem to calculate hyperparameters if self.optimizer is not None: # To ensure that cholesky of covariance matrix is computed again during optimization self.alpha_ = None lb = [np.log10(xy[0]) for xy in self.bounds] ub = [np.log10(xy[1]) for xy in self.bounds] starting_point = np.random.uniform(low=lb, high=ub, size=(self.optimizations_number, len(self.bounds))) starting_point[0, :] = np.log10(self.hyperparameters) if self.optimize_constraints is not None: cons = self.optimize_constraints.define_arguments(self.samples, self.values, self.predict) self.optimizer.apply_constraints(constraints=cons) self.optimizer.apply_constraints_argument(self.optimize_constraints.constraint_args) else: log_bounds = [[np.log10(xy[0]), np.log10(xy[1])] for xy in self.bounds] self.optimizer.update_bounds(bounds=log_bounds) minimizer = np.zeros([self.optimizations_number, len(self.bounds)]) fun_value = np.zeros([self.optimizations_number, 1]) for i__ in range(self.optimizations_number): p_ = self.optimizer.optimize(function=GaussianProcessRegression.log_likelihood, initial_guess=starting_point[i__, :], args=(self.kernel, s_, y_, self.noise, self.F), jac=self.jac) if isinstance(p_, np.ndarray): minimizer[i__, :] = p_ fun_value[i__, 0] = GaussianProcessRegression.log_likelihood(p_, self.kernel, s_, y_, self.noise, self.F) else: minimizer[i__, :] = p_.x fun_value[i__, 0] = p_.fun if min(fun_value) == np.inf: raise NotImplementedError("Maximum likelihood estimator failed: Choose different starting point or " "increase nopt") t = np.argmin(fun_value) self.hyperparameters = 10 ** minimizer[t, :] # Updated Correlation matrix corresponding to MLE estimates of hyperparameters if self.noise: self.kernel.kernel_parameter = self.hyperparameters[:-2] sigma = self.hyperparameters[-2] self.K = sigma ** 2 * self.kernel.calculate_kernel_matrix(x=s_, s=s_) + \ np.eye(nsamples) * (self.hyperparameters[-1]) ** 2 else: self.kernel.kernel_parameter = self.hyperparameters[:-1] sigma = self.hyperparameters[-1] self.K = sigma ** 2 * self.kernel.calculate_kernel_matrix(x=s_, s=s_) self.cc = cholesky(self.K + 1e-10 * np.eye(nsamples), lower=True) self.alpha_ = cho_solve((self.cc, True), y_) if self.regression_model is not None: # Compute the regression coefficient (solving this linear equation: F * beta = Y) # Eq: 3.8, DACE f_dash = np.linalg.solve(self.cc, self.F) y_dash = np.linalg.solve(self.cc, y_) q_, g_ = np.linalg.qr(f_dash) # Eq: 3.11, DACE # Check if F is a full rank matrix if np.linalg.matrix_rank(g_) != min(np.size(self.F, 0), np.size(self.F, 1)): raise NotImplementedError("Chosen regression functions are not sufficiently linearly independent") # Design parameters (beta: regression coefficient) self.beta = np.linalg.solve(g_, np.matmul(np.transpose(q_), y_dash)) self.mu = np.einsum("ij,jk->ik", self.F, self.beta) self.alpha_ = cho_solve((self.cc, True), y_ - self.mu) self.logger.info("UQpy: gpr fit complete.")
[docs] def predict(self, points, return_std: bool = False, hyperparameters: list = None): """ Predict the model response at new points. This method evaluates the regression and correlation model at new sample points. Then, it predicts the function value and standard deviation. :param points: Points at which to predict the model response. :param return_std: Indicator to estimate standard deviation. :param hyperparameters: Hyperparameters for correlation model. :return: Predicted values at the new points, Standard deviation of predicted values at the new points """ x_ = np.atleast_2d(points) if self.normalize: x_ = (x_ - self.sample_mean) / self.sample_std s_ = (self.samples - self.sample_mean) / self.sample_std y_ = (self.values - self.value_mean) / self.value_std else: s_ = self.samples y_ = self.values if hyperparameters is None: hyperparameters = self.hyperparameters noise_std = 0 kernelparameters = hyperparameters if self.noise: kernelparameters = hyperparameters[:-1] noise_std = hyperparameters[-1] if hyperparameters is not None: # This is used for MLE constraints, if constraints call 'predict' method. self.kernel.kernel_parameter = kernelparameters[:-1] if kernelparameters is not None: sigma = kernelparameters[-1] else: raise ValueError('kernelparameters is None') K = sigma ** 2 * self.kernel.calculate_kernel_matrix(x=s_, s=s_) + \ np.eye(self.samples.shape[0]) * (noise_std ** 2) cc = np.linalg.cholesky(K + 1e-10 * np.eye(self.samples.shape[0])) mu = 0 if self.regression_model is not None: f_dash = np.linalg.solve(cc, self.F) y_dash = np.linalg.solve(cc, y_) q_, g_ = np.linalg.qr(f_dash) # Eq: 3.11, DACE # Check if F is a full rank matrix if np.linalg.matrix_rank(g_) != min(np.size(self.F, 0), np.size(self.F, 1)): raise NotImplementedError("Chosen regression functions are not sufficiently linearly independent") # Design parameters (beta: regression coefficient) beta = np.linalg.solve(g_, np.matmul(np.transpose(q_), y_dash)) mu = np.einsum("ij,jk->ik", self.F, beta) alpha_ = cho_solve((cc, True), y_ - mu) else: cc, alpha_ = self.cc, self.alpha_ mu1 = 0 if self.regression_model is not None: fx = self.regression_model.r(x_) if self.beta is None: mu1 = np.einsum("ij,jk->ik", fx, beta) else: mu1 = np.einsum("ij,jk->ik", fx, self.beta) if kernelparameters is not None: self.kernel.kernel_parameter = kernelparameters[:-1] sigma = kernelparameters[-1] k = sigma**2*self.kernel.calculate_kernel_matrix(x=x_, s=s_) y = mu1 + k @ alpha_ if self.normalize: y = self.value_mean + y * self.value_std if x_.shape[1] == 1: y = y.flatten() if return_std: self.kernel.kernel_parameter = kernelparameters[:-1] sigma = kernelparameters[-1] k1 = sigma**2*self.kernel.calculate_kernel_matrix(x=x_, s=x_) var = (k1 - k @ cho_solve((cc, True), k.T)).diagonal() mse = np.sqrt(var) if self.normalize: mse = self.value_std * mse if x_.shape[1] == 1: mse = mse.flatten() return y, mse else: return y
@staticmethod def log_likelihood(p0, k_, s, y, ind_noise, fx_): """ :param p0: An 1-D numpy array of hyperparameters, that are identified through MLE. The last two elements are process variance and data noise, rest of the elements are length scale along each input dimension. :param k_: Kernel :param s: Input training data :param y: Output training data :param ind_noise: Boolean flag to indicate the noisy output :param fx_: Basis function evaluated at training points :return: """ # Return the log-likelihood function and it's gradient. Gradient is calculated using Central Difference m = s.shape[0] if ind_noise: k_.kernel_parameter = 10 ** p0[:-2] sigma = 10 ** p0[-2] k__ = sigma ** 2 * k_.calculate_kernel_matrix(x=s, s=s) + np.eye(m) * (10 ** p0[-1]) ** 2 else: k_.kernel_parameter = 10 ** p0[:-1] sigma = 10 ** p0[-1] k__ = sigma ** 2 * k_.calculate_kernel_matrix(x=s, s=s) cc = cholesky(k__ + 1e-10 * np.eye(m), lower=True) mu = 0 if fx_ is not None: f_dash = np.linalg.solve(cc, fx_) y_dash = np.linalg.solve(cc, y) q_, g_ = np.linalg.qr(f_dash) # Eq: 3.11, DACE # Check if F is a full rank matrix if np.linalg.matrix_rank(g_) != min(np.size(fx_, 0), np.size(fx_, 1)): raise NotImplementedError("Chosen regression functions are not sufficiently linearly independent") # Design parameters (beta: regression coefficient) beta = np.linalg.solve(g_, np.matmul(np.transpose(q_), y_dash)) mu = np.einsum("ij,jk->ik", fx_, beta) term1 = (y - mu).T @ (cho_solve((cc, True), y - mu)) term2 = 2 * np.sum(np.log(np.abs(np.diag(cc)))) return 0.5 * (term1 + term2 + m * np.log(2 * np.pi))[0, 0]