# UQpy is distributed under the MIT license.
#
# Copyright (C) 2018 -- Michael D. Shields
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the
# Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE
# WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
import logging
import os
import pickle
from typing import Union
import numpy as np
from beartype import beartype
from enum import Enum, auto
from UQpy.utilities.ValidationTypes import NumpyFloatArray
class RunType(Enum):
    """Execution targets selectable by name (``RunType['LOCAL']``, ``RunType['CLUSTER']``)."""

    # The explicit values equal the ones ``auto()`` generates for the first and second member.
    LOCAL = 1
    CLUSTER = 2
class RunModel:
    # Authors: B.S. Aakash, Lohit Vandanapu, Michael D. Shields, Michael H. Gardner
    # Last modified: 8/31/2022 by Michael H. Gardner

    @beartype
    def __init__(
        self,
        model,
        samples: Union[list, NumpyFloatArray, None] = None,
        ntasks: int = 1,
        cores_per_task: int = 1,
        nodes: int = 1,
        resume: bool = False,
        run_type: str = 'LOCAL',
        cluster_script: Union[str, None] = None
    ):
        """
        Run a computational model at specified sample points.

        This class is the interface between :py:mod:`UQpy` and computational models. The computational model is
        supplied through the ``model`` object; :class:`.RunModel` drives it by calling its ``initialize``,
        ``preprocess_single_sample``, ``execute_single_sample``, ``postprocess_single_file`` and ``finalize``
        methods.

        :param model: Object wrapping the computational model. It must provide the methods listed above and, for
         parallel execution, it must be picklable because it is written to disk with :py:mod:`pickle`.
        :param samples: Samples to be passed as inputs to the model.

         Regardless of data type, the first dimension of ``samples`` must be equal to the number of samples at which
         to execute the model. That is, ``len(samples) = nsamples``.

         Regardless of data type, the second dimension of ``samples`` must be equal to the number of variables to
         pass for each model evaluation. That is, ``len(samples[0]) = n_vars``. Each variable need not be a scalar.
         Variables may be scalar, vector, matrix, or tensor type (i.e. `float`, `list`, or `ndarray`).

         If `samples` are not passed, a :class:`.RunModel` object will be instantiated that can be used later, with
         the :meth:`run` method, to evaluate the model.
        :param ntasks: Number of tasks to be run in parallel. By default, ``ntasks = 1``.

         The models are executed serially only when ``ntasks``, ``cores_per_task`` and ``nodes`` are all less than
         or equal to 1; otherwise the parallel workflow (:meth:`parallel_execution`) is triggered.
        :param cores_per_task: Number of cores to be used by each task. Within this class the value is forwarded to
         the cluster execution module when ``run_type = 'CLUSTER'``.
        :param nodes: Number of nodes across which to distribute individual tasks on an HPC cluster. If more than
         one compute node is necessary to execute individual runs in parallel, `nodes` must be specified.
        :param resume: Option for resuming parallel execution. Within this class the value is only stored on the
         object.
        :param run_type: Name of a :class:`RunType` member, i.e. ``'LOCAL'`` or ``'CLUSTER'``. Selects how the
         parallel workflow is launched. The lookup is case-sensitive and raises :class:`KeyError` for unknown names.
        :param cluster_script: User-provided (slurm) script forwarded to the cluster execution module. Required by
         :meth:`parallel_execution` when ``run_type = 'CLUSTER'``.
        """
        self.logger = logging.getLogger(__name__)
        self.model = model
        # Save option for resuming parallel execution
        self.resume = resume

        # Set location for model runs
        self.run_type = RunType[run_type]
        self.cluster_script = cluster_script

        self.nodes = nodes
        self.ntasks = ntasks
        self.cores_per_task = cores_per_task
        # Any request for more than one task, core or node switches to the parallel workflow.
        self.is_serial = ntasks <= 1 and cores_per_task <= 1 and nodes <= 1

        # Initialize sample related variables
        self.samples: NumpyFloatArray = np.atleast_2d([])
        """Internally, :class:`.RunModel` converts the input `samples` into a numpy `ndarray` with at least two
        dimension where the first dimension of the :class:`numpy.ndarray` corresponds to a single sample to be executed
        by the model."""
        self.qoi_list: list = []
        """A list containing the output quantities of interest, one entry per model evaluation.

        In the serial workflow each entry is the value returned by the model's ``postprocess_single_file`` method. In
        the parallel workflow the entries are read back from the results file written by the launched process.

        This attribute is commonly used for adaptive algorithms that employ learning functions based on previous model
        evaluations."""
        self.n_existing_simulations: int = 0
        """Number of pre-existing model evaluations, prior to a new :meth:`run` method call.

        If the :meth:`run` method has previously been called and model evaluations performed, subsequent calls to the
        :meth:`run` method will be appended to the :class:`RunModel` object.
        :py:attr:`n_existing_simulations` stores the number of previously existing model evaluations."""
        self.n_new_simulations: int = 0
        """Number of model evaluations to be performed, ``n_new_simulations = len(samples)``."""

        # Check if samples are provided.
        if samples is None:
            self.logger.info("\nUQpy: No samples are provided. Creating the object and building the model directory.\n")
        elif isinstance(samples, (list, np.ndarray)):
            self.run(samples)
        else:
            raise ValueError("\nUQpy: samples must be passed as a list or numpy ndarray\n")

    def run(self, samples=None, append_samples=True):
        """
        Execute a computational model at given sample values.

        If `samples` are passed when defining the :class:`.RunModel` object, the :meth:`run` method is called
        automatically.

        The :meth:`run` method may also be called directly after defining the :class:`.RunModel` object.

        :param list samples: Samples to be passed as inputs to the model defined by the :class:`.RunModel` object.

         Regardless of data type, the first dimension of ``samples`` must be equal to the number of samples at which
         to execute the model. That is, ``len(samples) = nsamples``.

         Regardless of data type, the second dimension of ``samples`` must be equal to the number of variables to
         pass for each model evaluation. That is, ``len(samples[0]) = n_vars``. Each variable need not be a
         scalar. Variables may be scalar, vector, matrix, or tensor type (i.e. `float`, `list`, or `ndarray`).
        :param bool append_samples: Append or overwrite existing samples and model evaluations.

         If ``append_samples = False``, all previous samples and the corresponding quantities of interest from their
         model evaluations are deleted.

         If ``append_samples = True``, samples and their resulting quantities of interest are appended to the
         existing ones.
        :raises ValueError: If ``samples`` is ``None``.
        """
        # Without this guard np.atleast_2d(None) yields a 1x1 object array and the model would silently be
        # evaluated once on a ``None`` sample.
        if samples is None:
            raise ValueError("\nUQpy: samples must be passed as a list or numpy ndarray\n")

        # Ensure the input samples have the correct structure
        # --> If a list is provided, convert to at least 2d ndarray. dim1 = nsim, dim2 = n_vars
        # --> If 1D array/list is provided, convert it to a 2d array. dim1 = 1, dim2 = n_vars
        # --> If samples cannot be converted to an array, this will fail.
        samples = np.atleast_2d(samples)

        # Number of simulations to be performed
        self.n_new_simulations = len(samples)

        # If append_samples is False, a new set of samples is created, the previous ones are deleted!
        if not append_samples:
            self.samples = np.atleast_2d([])
            self.qoi_list = []

        # Check if samples already exist, if yes append new samples to old ones
        if self.samples.size == 0:  # There are currently no samples
            self.n_existing_simulations = 0
            self.samples = samples
        else:  # Samples already exist in the RunModel object, append new ones
            self.n_existing_simulations = len(self.samples)
            self.samples = np.vstack((self.samples, samples))

        self.model.initialize(samples)
        try:
            self.qoi_list.extend(self.serial_execution() if self.is_serial else self.parallel_execution())
        finally:
            # Always give the model the chance to undo whatever ``initialize`` set up, even when a model
            # evaluation raised.
            self.model.finalize()

    def parallel_execution(self):
        """
        Evaluate the new samples in a separately launched process and return their quantities of interest.

        The model and all samples are pickled to ``model.pkl`` and ``samples.pkl`` in the current working directory,
        a command selected by :py:attr:`run_type` is executed through :func:`os.system`, and the results are read
        back from ``qoi.pkl``. The three exchange files are removed afterwards, whether or not the execution
        succeeded.

        :return: The object unpickled from ``qoi.pkl``.
        :raises ValueError: If ``run_type`` is ``CLUSTER`` and no ``cluster_script`` was provided, or if the run type
         is not supported.
        :raises FileNotFoundError: If the launched command did not produce ``qoi.pkl``.
        """
        # Build (and thereby validate) the command before anything is written to disk, so that an invalid
        # configuration does not leave stray pickle files behind.
        if self.run_type is RunType.LOCAL:
            command = (f"mpirun python -m "
                       f"UQpy.run_model.model_execution.ParallelExecution {self.n_existing_simulations} "
                       f"{self.n_new_simulations}")
        elif self.run_type is RunType.CLUSTER:
            if self.cluster_script is None:
                raise ValueError("\nUQpy: User-provided slurm script not input, please provide this input\n")
            command = (f"python -m UQpy.run_model.model_execution.ClusterExecution {self.cores_per_task} "
                       f"{self.n_new_simulations} {self.n_existing_simulations} {self.cluster_script}")
        else:
            raise ValueError("\nUQpy: RunType is not in currently supported list of cluster types\n")

        exchange_files = ("model.pkl", "samples.pkl", "qoi.pkl")
        # TODO: Check if files with the names used below already exist and raise error
        try:
            with open('model.pkl', 'wb') as filehandle:
                pickle.dump(self.model, filehandle)

            with open('samples.pkl', 'wb') as filehandle:
                pickle.dump(self.samples, filehandle)

            exit_status = os.system(command)
            if exit_status != 0:
                # Not fatal by itself: if no results were written, opening qoi.pkl below raises.
                self.logger.warning("\nUQpy: Parallel execution command exited with status %s.\n", exit_status)

            # NOTE: qoi.pkl is trusted to have been written by the process launched above; unpickling a file
            # from any other origin is unsafe.
            with open('qoi.pkl', 'rb') as filehandle:
                results = pickle.load(filehandle)
        finally:
            # Remove the exchange files on success and on failure alike, so that a later run can never pick up
            # stale results.
            for filename in exchange_files:
                try:
                    os.remove(filename)
                except FileNotFoundError:
                    # The file was never created (e.g. the command failed before writing results).
                    pass

        self.logger.info("\nUQpy: Parallel execution of the python model complete.\n")

        return results

    def serial_execution(self):
        """
        Evaluate the new samples one after another in the current process.

        For every index ``i`` of the newly added samples, the model's ``preprocess_single_sample``,
        ``execute_single_sample`` and ``postprocess_single_file`` methods are called in that order.

        :return: List with one post-processed result per new sample, in sample order.
        """
        results = []
        first_new = self.n_existing_simulations
        # Indices refer to positions in ``self.samples``, which holds existing and new samples together.
        for i in range(first_new, first_new + self.n_new_simulations):
            sample = self.model.preprocess_single_sample(i, self.samples[i])
            execution_output = self.model.execute_single_sample(i, sample)
            results.append(self.model.postprocess_single_file(i, execution_output))

        self.logger.info("\nUQpy: Serial execution of the python model complete.\n")
        return results