import collections
import datetime
import logging
import os
import pathlib
import platform
import re
import shutil
import subprocess
import numpy as np
[docs]class ThirdPartyModel:
def __init__(self, var_names: list[str], input_template: str, model_script: str, output_script: str = None,
model_object_name: str = None, output_object_name: str = None, fmt: str = None, separator: str = ', ',
delete_files: bool = False, model_dir: str = "Model_Runs"):
"""
:param var_names: A list containing the names of the variables present in `input_template`.
If `input template` is provided and `var_names` is not passed, i.e. if ``var_names=None``, then the default
variable names `x0`, `x1`, `x2`,..., `xn` are created and used by :class:`.RunModel`, where `n` is the number of
variables (`n_vars`).
The number of variables is equal to the second dimension of `samples` (i.e. ``n_vars=len(samples[0])``).
`var_names` is not used in the Python model workflow.
:param input_template: The name of the template input file that will be used to generate input files for
each run of the model. When operating :class:`.RunModel` with a third-party software model, ``input_template`` must
be specified.
The named file must be present in the current working directory from which :class:`.RunModel` is called.
:param model_script: The filename (with .py extension) of the Python script which contains commands to
execute the model.
The named file must be present in the current working directory from which :class:`.RunModel` is called.
:param model_object_name: In the Python workflow, `model_object_name` specifies the name of the function or
class within `model_script' that executes the model. If there is only one function or class in the
`model_script`, then it is not necessary to specify the model_object_name. If there are multiple objects within
the `model_script`, then `model_object_name` must be specified.
:param output_script: The filename of the Python script that contains the commands to process the output from
third-party software model evaluation. `output_script` is used to extract quantities of interest from model
output files and return the quantities of interest to :class:`.RunModel` for subsequent :py:mod:`UQpy` processing (e.g. for
adaptive methods that utilize the results of previous simulations to initialize new simulations).
If, in the third-party software model workflow, ``output_script = None`` (the default), then the
:py:attr:`qoi_list` attribute is empty and postprocessing must be handled outside of :py:mod:`UQpy`.
If used, the named file must be present in the current working directory from which :class:`.RunModel` is called.
`output_script` is not used in the Python model workflow. In the Python model workflow, all model postprocessing
is handled directly within `model_script`.
:param output_object_name: The name of the function or class within `output_script` that is used to collect
and process the output values from third-party software model output files. If the object is a class, the
output must be saved as an attribute called :py:attr:`qoi`. If it is a function, it should return the output
quantity of interest.
If there is only one function or only one class in `output_script`, then it is not necessary to specify
`output_object_name`. If there are multiple objects in `output_script`, then output_object_name must be
specified.
:param fmt: If the `template_input` requires variables to be written in specific format, this format can be
specified here.
Format specification follows standard Python conventions for the str.format() command described at:
https://docs.python.org/3/library/stdtypes.html#str.format. For additional details, see the Format String Syntax
description at: https://docs.python.org/3/library/string.html#formatstrings.
For example, ls-dyna .k files require each card is to be exactly 10 characters. The following format string
syntax can be used, "{:>10.4f}".
:param separator: A string used to delimit values when printing arrays to the `template_input`.
:param delete_files: Specifies whether or not to delete individual run output files after model execution
and output processing.
If `delete_files = True`, :class:`.RunModel` will remove all `run_i...` directories in the `model_dir`.
:param model_dir: Specifies the name of the sub-directory from which the model will be executed and to which
output files will be saved. A new directory is created by :class:`.RunModel` within the current directory whose name
is `model_dir` appended with a timestamp.
"""
self.template_text = None
self.logger = logging.getLogger(__name__)
if platform.system() in ["Windows"]:
self.python_command = "python"
else:
self.python_command = "python3"
self.separator = separator
self.fmt = fmt
self.check_formatting(fmt)
self.delete_files = delete_files
self.input_template = input_template
self.var_names = var_names
self.n_variables: int = 0
if self.var_names is not None and not ThirdPartyModel._is_list_of_strings(self.var_names):
raise ValueError("\nUQpy: Variable names should be passed as a list of strings.\n")
# Establish parent directory for simulations
self.parent_dir = os.getcwd()
# Create a list of all of the files and directories in the working directory. Do not include any other
# directories containing the same name as model_dir
model_files = self.create_model_files_list(model_dir)
# Create a new directory where the model will be executed
self.create_model_execution_directory(model_dir, model_files)
# Check if the model script is a python script
model_extension = pathlib.Path(model_script).suffix
if model_extension == '.py':
self.model_script = model_script
else:
raise ValueError("\nUQpy: The model script must be the name of a python script, with extension '.py'.")
self.model_object_name = model_object_name
self.output_script = output_script
self.output_object_name = output_object_name
self._check_python_model()
# Import the output script
if self.output_script is not None:
# self.output_module = __import__(self.output_script[:-3])
# Run function which checks if the output module has the output object
self._check_output_module()
def create_model_execution_directory(self, model_dir, model_files):
ts = datetime.datetime.now().strftime("%Y_%m_%d_%I_%M_%f")
self.model_dir = os.path.join(self.parent_dir, model_dir + "_" + ts)
os.makedirs(self.model_dir)
os.chdir(self.model_dir)
self.logger.info("\nUQpy: The following directory has been created for model evaluations: \n" + self.model_dir)
# Copy files from the model list to model run directory
for file_name in model_files:
full_file_name = os.path.join(self.parent_dir, file_name)
if not os.path.isdir(full_file_name):
shutil.copy(full_file_name, self.model_dir)
else:
new_dir_name = os.path.join(self.model_dir, os.path.basename(full_file_name))
shutil.copytree(full_file_name, new_dir_name)
self.logger.info("\nUQpy: The model files have been copied to the following directory for evaluation: \n"
+ self.model_dir)
parent_dir = os.path.dirname(self.model_dir)
os.chdir(parent_dir)
def create_model_files_list(self, model_dir):
model_files = []
for f_name in os.listdir(self.parent_dir):
path = os.path.join(self.parent_dir, f_name)
if model_dir not in path:
model_files.append(path)
self.model_files = model_files
return model_files
def check_formatting(self, fmt):
if self.fmt is None:
pass
elif isinstance(self.fmt, str):
if (self.fmt[0] != "{") or (self.fmt[-1] != "}") or (":" not in self.fmt):
raise ValueError("\nUQpy: fmt should be a string in brackets indicating a standard Python format.\n")
else:
raise TypeError("\nUQpy: fmt should be a str.\n")
@staticmethod
def _is_list_of_strings(list_of_strings):
"""
Check if input list contains only strings
** Input: **
:param list_of_strings: A list whose entries should be checked to see if they are strings
:type list_of_strings: list
"""
return (bool(list_of_strings) and isinstance(list_of_strings, list)
and all(isinstance(element, str) for element in list_of_strings))
def initialize(self, samples):
os.chdir(self.model_dir)
self.logger.info("\nUQpy: All model evaluations will be executed from the following directory: \n"
+ self.model_dir)
self.n_variables = len(samples[0])
if self.input_template is not None:
if self.var_names is None:
# If var_names is not passed and there is an input template, create default variable names
self.var_names = []
for i in range(self.n_variables):
self.var_names.append('x%d' % i)
elif len(self.var_names) != self.n_variables:
raise ValueError("\nUQpy: var_names must have the same length as the number of variables (i.e. "
"len(var_names) = len(samples[0]).\n")
assert os.path.isfile(self.input_template) and os.access(self.input_template, os.R_OK), \
"\nUQpy: File {} doesn't exist or isn't readable".format(self.input_template)
# Read in the text from the template files
with open(self.input_template, "r") as f:
self.template_text = str(f.read())
def finalize(self):
parent_dir = os.path.dirname(self.model_dir)
os.chdir(parent_dir)
def preprocess_single_sample(self, i, sample):
work_dir = os.path.join(self.model_dir, "run_" + str(i))
self._copy_files(work_dir=work_dir)
# Change current working directory to model run directory
os.chdir(work_dir)
self.logger.info("\nUQpy: Running model number " + str(i) + " in the following directory: \n" + work_dir)
# Call the input function
self._input_serial(i, sample)
def execute_single_sample(self, index, sample_to_send):
# os.system(f"{self.python_command} {self.model_script} {index}")
python_model = __import__(self.model_script[:-3])
model_object = getattr(python_model, self.model_object_name)
model_object(index)
def postprocess_single_file(self, index, model_output):
if self.output_script is not None:
output = self._output_serial(index)
work_dir = os.path.join(self.model_dir, "run_" + str(index))
# Remove the copied files and folders
self._remove_copied_files(work_dir)
# Return to the model directory
os.chdir(self.model_dir)
self.logger.info("\nUQpy: Model evaluation " + str(index) + " complete.\n")
self.logger.info("\nUQpy: Returning to the model directory:\n" + self.model_dir)
return output
def _input_serial(self, index, sample):
"""
Create one input file using the template and attach the index to the filename
** Input: **
:param index: The simulation number
:type index: int
"""
self.new_text = self._find_and_replace_var_names_with_values(sample=sample)
# Write the new text to the input file
self._create_input_files(file_name=self.input_template, num=index, text=self.new_text,
new_folder="InputFiles", )
def _create_input_files(self, file_name, num, text, new_folder="InputFiles"):
"""
Create input files using filename, index, text
** Input: **
:param file_name: Name of input file
:type file_name: str
:param num: The simulation number
:type num: int
:param text: Contents of the input file
:type text: str
:param new_folder: Name of directory where the created input files are placed
Default: 'InputFiles'
:type new_folder: str
"""
if not os.path.exists(new_folder):
os.makedirs(new_folder)
base_name = os.path.splitext(os.path.basename(file_name))
new_name = os.path.join(
new_folder, base_name[0] + "_" + str(num) + base_name[1]
)
with open(new_name, "w") as f:
f.write(text)
return
def _find_and_replace_var_names_with_values(self, sample):
"""
Replace placeholders containing variable names in template input text with sample values.
** Input: **
:param index: The sample number
:type index: int
"""
template_text = self.template_text
var_names = self.var_names
new_text = template_text
for j in range(self.n_variables):
string_regex = re.compile(r"<" + var_names[j] + r".*?>")
count = 0
for string in string_regex.findall(template_text):
temp_check = string[1:-1].split("[")[0]
pattern_check = re.compile(var_names[j])
if pattern_check.fullmatch(temp_check):
temp = string[1:-1].replace(var_names[j], "sample[" + str(j) + "]")
try:
temp = eval(temp)
except IndexError as err:
print("\nUQpy: Index Error: {0}\n".format(err))
raise IndexError("{0}".format(err))
if isinstance(temp, collections.abc.Iterable):
# If it is iterable, flatten and write as text file with designated separator
temp = np.array(temp).flatten()
to_add = ""
for i in range(len(temp) - 1):
if self.fmt is None:
to_add += str(temp[i]) + self.separator
else:
to_add += self.fmt.format(temp[i]) + self.separator
if self.fmt is None:
to_add += str(temp[-1])
else:
to_add += self.fmt.format(temp[-1])
else:
if self.fmt is None:
to_add = str(temp)
else:
to_add = self.fmt.format(temp)
new_text = (new_text[0: new_text.index(string)] + to_add
+ new_text[(new_text.index(string) + len(string)):])
count += 1
return new_text
def _output_serial(self, index):
"""
Execute the output script, obtain the output qoi and save it in qoi_list
** Input: **
:param index: The simulation number
:type index: int
"""
# Run output module
output_module = __import__(self.output_script[:-3])
output_object = getattr(output_module, self.output_object_name)
model_output = output_object(index)
return model_output.qoi if self.model_is_class else model_output
def _copy_files(self, work_dir):
os.makedirs(work_dir)
# Copy files from the model list to model run directory
for file_name in self.model_files:
full_file_name = os.path.join(self.model_dir, file_name)
if not os.path.isdir(full_file_name):
shutil.copy(full_file_name, work_dir)
else:
new_dir_name = os.path.join(work_dir, os.path.basename(full_file_name))
shutil.copytree(full_file_name, new_dir_name)
def _remove_copied_files(self, work_dir):
"""
Remove the copied files from each run directory to avoid having many redundant files.
** Input: **
:param work_dir: The working directory of the current run.
:type work_dir: str
"""
for file_name in self.model_files:
full_file_name = os.path.join(work_dir, os.path.basename(file_name))
if not os.path.isdir(full_file_name):
os.remove(full_file_name)
else:
shutil.rmtree(full_file_name)
def _check_output_module(self):
"""
Check if output script name is valid
This function get the names of the classes and functions in the imported module. There should be at least one
class or function in the module - if not there, exit with ValueError. If there is at least one class or
function in the module, if the output object name is not given as input and there is only one class or function,
take that class name or function name to extract output. If there is a output_object_name given, check if it is
a valid name. Else, a ValueError is raised.
"""
# Get the names of the classes and functions in the imported module
import inspect
output_module = __import__(self.output_script[:-3])
class_list = []
function_list = []
for name, obj in inspect.getmembers(output_module):
if inspect.isclass(obj):
class_list.append(name)
elif inspect.isfunction(obj):
function_list.append(name)
# There should be at least one class or function in the module - if not there, exit with error.
if len(class_list) == 0 and len(function_list) == 0:
raise ValueError("\nUQpy: The output object should be defined as a function or class in the script.\n")
else: # If there is at least one class or function in the module
# If the model object name is not given as input and there is only one class or function,
# take that class name or function name to run the model.
if self.output_object_name is None and len(class_list) + len(function_list) == 1:
if len(class_list) == 1:
self.output_object_name = class_list[0]
elif len(function_list) == 1:
self.output_object_name = function_list[0]
# If there is a model_object_name given, check if it is in the list.
if self.output_object_name in class_list:
# self.logger.info("\nUQpy: The output class that will be run: " + self.output_object_name)
self.output_is_class = True
elif self.output_object_name in function_list:
# self.logger.info("\nUQpy: The output function that will be run: " + self.output_object_name)
self.output_is_class = False
else:
if self.output_object_name is None:
raise ValueError("\nUQpy: There are more than one objects in the module. Specify the name of the "
"function or class which has to be executed.\n")
else:
print("\nUQpy: You specified the output object name as: " + str(self.output_object_name))
raise ValueError("\nUQpy: The file does not contain an object which was specified as the output "
"processor.\n")
def _check_python_model(self):
"""
Check if python model name is valid
This function gets the name of the classes and functions in the imported python module whose names is passed in
as the python model to RunModel. There should be at least one class or function in the module - if not there,
then the function exits raising a ValueError. If there is at least one class or function in the module,
if the model object name is not given as input and there is only one class or function, that class name or
function name is used to run the model. If there is a model_object_name given, check if it is a valid name.
Else, a ValueError is raised.
"""
# Get the names of the classes and functions in the imported module
import inspect
class_list = []
function_list = []
python_model = __import__(self.model_script[:-3])
for name, obj in inspect.getmembers(python_model):
if inspect.isclass(obj):
class_list.append(name)
elif inspect.isfunction(obj):
function_list.append(name)
# There should be at least one class or function in the module - if not there, exit with error.
if len(class_list) == 0 and len(function_list) == 0:
raise ValueError("\nUQpy: A python model should be defined as a function or class in the script.\n")
else: # If there is at least one class or function in the module
# If the model object name is not given as input and there is only one class or function,
# take that class name or function name to run the model.
if self.model_object_name is None and len(class_list) + len(function_list) == 1:
if len(class_list) == 1:
self.model_object_name = class_list[0]
elif len(function_list) == 1:
self.model_object_name = function_list[0]
# If there is a model_object_name given, check if it is in the list.
if self.model_object_name in class_list:
# self.logger.info("\nUQpy: The model class that will be run: " + self.model_object_name)
self.model_is_class = True
elif self.model_object_name in function_list:
# self.logger.info("\nUQpy: The model function that will be run: " + self.model_object_name)
self.model_is_class = False
else:
if self.model_object_name is None:
raise ValueError("\nUQpy: There are more than one objects in the module. Specify the name of the "
"function or class which has to be executed.\n")
else:
print("\nUQpy: You specified the model_object_name as: " + str(self.model_object_name))
raise ValueError("\nUQpy: The file does not contain an object which was specified as the model.\n")