Dakota Interface Scripts (interface.py)#
This interface script controls the analysis workflow in each optimization iteration. For this example, the analysis workflow can be divided into four main steps: initialization, cross-sectional analysis, post-processing, and writing outputs.
The overall workflow is enclosed in a try-except block to catch any possible errors in the process.
The whole analysis process is scripted as a function called process, accepting one argument: the name of the JSON argument file.
The overall structure of the script is:
def process(fn_json_args):
    try:
        # Initialization
        # Cross-sectional analysis
        # Post-processing
        # Writing output
    except:
        # Steps if there are errors
Data management#
One key aspect of an analysis script that involves different inputs, outputs, models (e.g., cross-sections), and solvers is a central data store. This example uses a single dictionary object to store all parameters:
params = {}
Parameters can come from multiple sources, such as Dakota inputs, data processing results, or outputs of a previous analysis.
Results are also organized in a single dictionary object. They are further grouped into either interim outputs or final outputs:
results = {
    'interim': {},
    'final': []
}
The interim results are stored as a dictionary and will be written into a JSON file at the end of the analysis process. The final results, on the other hand, are stored in a list, since Dakota requires that the results appear in the output file in the same order as in the responses section of the Dakota input file.
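As the write loop in Step 4 shows, each entry of this list is a (descriptor, value) pair, for example (the descriptor and value here are hypothetical):
# Hypothetical entry; the descriptor 'obj_fn' and its value are placeholders.
results['final'].append(('obj_fn', 1.25))  # (response descriptor, value)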
Step 1: Initialization#
This step initializes data objects from different inputs and loads the necessary modules. It can be further divided into several sub-steps.
1.1: Reading inputs from Dakota#
First read parameters from the file generated by Dakota.
import dakota.interfacing as di
dakota_params, dakota_results = di.read_parameters_file(
    'input.in', 'output.out'
)
As stated above, since we use params to store all data, parameters in dakota_params need to be copied to params.
for param_name in dakota_params.descriptors:
    params[param_name] = dakota_params[param_name]
Then extract the current evaluation number for use in the logger.
evid = int(dakota_params.eval_id)
log_prompt = f'eval {evid}'
params['eval_id'] = evid
1.2: Reading arguments from the JSON file#
Arguments in the JSON file can be easily loaded into a dictionary object using Python's built-in json module.
import json
with open(fn_json_args, 'r') as fo:
    interface_args = json.load(fo)
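For reference, the argument file for this example might contain keys like the ones below, so that interface_args holds the corresponding dictionary after loading. Only the key names are used later in this section; every value shown here is an illustrative placeholder:
# Illustrative contents of interface_args; all values are placeholders.
interface_args = {
    'data_process_functions_file': 'data_proc_funcs',
    'log_level_cmd': 'info',
    'log_level_file': 'debug',
    'log_file_name': 'log.txt',
    'cs_pre_process': ['compute_layup_thickness'],  # hypothetical function names
    'cross-section_design_template': {'cs1': 'cs1_design.tmp'},  # hypothetical names
    'beam_properties': []  # requested beam properties (placeholder; see Step 2.2)
}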
1.3: Loading the module of data processing functions#
To be able to use the data processing functions written in the file data_proc_funcs.py, we need to import this file as a module. The file name is specified in the JSON argument data_process_functions_file.
try:
    fn_dpf = interface_args['data_process_functions_file']
    if fn_dpf != '':
        import_str = 'import {0} as dpf'.format(fn_dpf)
        exec(import_str)
except KeyError:
    pass
This means that if the file name is given (e.g., data_proc_funcs), then the script will execute the following:
import data_proc_funcs as dpf
1.4: Initializing the logger#
The component ‘msgpi’ included in iVABS provides a logging tool that can record different levels of messages and print them to the screen or write to a file.
To initialize the logger, three optional arguments can be provided to customize the level of logging and the log file name.
These three arguments are specified by the user in the JSON argument file.
In this example, log_level_cmd and log_level_file specify the output level of log messages to the screen and the file, respectively. The level can be one of the following: debug, info, warning, error, critical, with the logging level increasing from the first to the last.
import msgpi.logger as mlog
log_level_cmd = 'INFO'
log_level_file = 'INFO'
log_file_name = 'log.txt'
if 'log_level_cmd' in interface_args.keys():
    log_level_cmd = interface_args['log_level_cmd'].upper()
if 'log_level_file' in interface_args.keys():
    log_level_file = interface_args['log_level_file'].upper()
if 'log_file_name' in interface_args.keys():
    log_file_name = interface_args['log_file_name']
logger = mlog.initLogger(
    __name__,
    cout_level=log_level_cmd, fout_level=log_level_file, filename=log_file_name
)
To create a log, use one of the following:
logger.debug('message')
logger.info('message')
logger.warning('message')
logger.error('message')
logger.critical('message')
Only messages with a level equal to or higher than the specified level will be printed. For instance, if log_level_cmd='error', only messages of level error and critical will be printed to the screen.
Step 2: Cross-sectional analysis#
In general, this step contains three sub-steps: pre-processing, cross-sectional analysis, and post-processing. For this example, only the first two are needed.
2.1: Pre-processing#
In this example, two processing functions are evaluated.
The functions are defined in the file data_proc_funcs.py and their names are specified by the key cs_pre_process in the JSON argument file. They are evaluated sequentially, in the order specified in the argument file.
try:
    for func in interface_args['cs_pre_process']:
        eval(f'dpf.{func}')(params, results, interface_args)
except KeyError:
    pass
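Each function listed under cs_pre_process therefore needs to accept these three arguments. A minimal sketch of such a function in data_proc_funcs.py (the function name, the parameter names, and the derived quantity are all hypothetical):
def compute_layup_thickness(params, results, interface_args):
    # Hypothetical pre-processing step: derive a parameter needed by the
    # cross-sectional analysis from the Dakota design variables.
    params['total_thickness'] = params['ply_thickness'] * params['num_plies']
    # Optionally record the derived value with the interim results.
    results['interim']['total_thickness'] = params['total_thickness']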
2.2: Cross-sectional analysis#
First, get the cross-section name and template file name from the argument file:
cs_name, cs_design_tmp = list(interface_args['cross-section_design_template'].items())[0]
Then use the cross-section name to create a new CrossSection object:
import msgpi.cross_section as mcs
cs = mcs.CrossSection(cs_name)
The template file name also needs to be added to this object:
cs.fn_design_tmp = cs_design_tmp
Then, the key to carrying out the cross-sectional analysis is to create a CrossSectionJob object. Creating this object requires a CrossSection object, a dictionary of parameters (params), a dictionary of interface arguments (interface_args), and an optional logger:
import msgpi.interface as mint
cs_job = mint.CrossSectionJob(cs, params, interface_args, logger=logger)
Finally, to carry out the actual analysis, simply call the run() function of the job:
cs_job.run()
The complete beam properties will be stored in cs_job.cs.props. Some of the properties requested in the argument file (beam_properties) will be stored in cs_job.outputs.
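If desired, these properties can also be stored with the interim results so that they appear in interim.out. A small sketch, assuming cs_job.outputs behaves like a plain dictionary of property names and values:
# Assumption: cs_job.outputs is a dict-like mapping of the requested properties.
results['interim']['beam_properties'] = dict(cs_job.outputs)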
Step 3: Post-processing#
This step is for calculating the actual responses (e.g., objective) from intermediate quantities (e.g., beam properties).
Unlike the pre-processing, here the script only calls a single function named postprocess from the file data_proc_funcs.py. This function accepts the following five arguments: params, results, interface_args, cs_job.inputs, cs_job.outputs.
eval('dpf.postprocess')(
    params, results, interface_args, cs_job.inputs, cs_job.outputs
)
Then, we need to implement this function in the file data_proc_funcs.py to carry out the calculations of the final outputs. See Section: Data Processing Functions (data_proc_funcs.py).
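A rough sketch of what such an implementation might look like (the 'mass' output, the response descriptor 'obj_fn', and the objective itself are purely hypothetical placeholders):
def postprocess(params, results, interface_args, cs_inputs, cs_outputs):
    # Hypothetical objective: assume the requested outputs include a 'mass' entry.
    objective = float(cs_outputs['mass'])
    # Keep intermediate values so they are written to interim.out in Step 4.
    results['interim']['mass'] = objective
    # Append (descriptor, value) pairs in the same order as the responses
    # section of the Dakota input file; Step 4 writes them in list order.
    results['final'].append(('obj_fn', objective))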
Step 4: Writing outputs#
The last step is to write results into files. Interim results can be directly dumped to a JSON file:
with open('interim.out', 'w') as fo:
    json.dump(results['interim'], fo, indent=4)
The final results that will be read by Dakota need to be written in the correct order:
with open('output.out', 'w') as fo:
    for r in results['final']:
        fo.write(f'{r[1]:24.16E} {r[0]}\n')
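With the hypothetical ('obj_fn', 1.25) entry shown earlier, the corresponding line in output.out would look like:
  1.2500000000000000E+00 obj_fn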
Exception#
In case there are errors and the analysis cannot be completed successfully, the keyword FAIL should be written into the result file, as required by Dakota:
with open('output.out', 'w') as fo:
    fo.write('FAIL')
Exception error messages can be printed out:
import traceback as tb
e = tb.format_exc()
print(e)
The traceback module traces the exception back through nested function calls.
Complete file#
import json
import sys
import traceback as tb
import msgpi.cross_section as mcs
import msgpi.interface as mint
import msgpi.logger as mlog
import dakota.interfacing as di
def process(fn_json_args):
    try:
        params = {}
        results = {
            "interim": {},
            "final": []
        }

        # ------------------------------------------------------------
        # FROM DAKOTA
        # ------------------------------------------------------------
        dakota_params, dakota_results = di.read_parameters_file(
            'input.in', 'output.out'
        )

        evid = int(dakota_params.eval_id)
        log_prompt = f'eval {evid}'
        params['eval_id'] = evid

        for param_name in dakota_params.descriptors:
            params[param_name] = dakota_params[param_name]
        # ------------------------------------------------------------
        # FROM DAKOTA END
        # ------------------------------------------------------------

        # Load JSON arguments
        # -------------------
        with open(fn_json_args, 'r') as fo:
            interface_args = json.load(fo)

        # Load data processing module
        # ---------------------------
        try:
            fn_dpf = interface_args['data_process_functions_file']
            if fn_dpf != '':
                import_str = 'import {0} as dpf'.format(fn_dpf)
                exec(import_str)
        except KeyError:
            pass

        # Logger initialization
        # ---------------------
        log_level_cmd = 'INFO'
        log_level_file = 'INFO'
        log_file_name = 'log.txt'
        if 'log_level_cmd' in interface_args.keys():
            log_level_cmd = interface_args['log_level_cmd'].upper()
        if 'log_level_file' in interface_args.keys():
            log_level_file = interface_args['log_level_file'].upper()
        if 'log_file_name' in interface_args.keys():
            log_file_name = interface_args['log_file_name']
        logger = mlog.initLogger(
            __name__,
            cout_level=log_level_cmd, fout_level=log_level_file, filename=log_file_name
        )

        logger.info(f'{log_prompt} start')

        # ------------------------------------------------------------
        # CROSS-SECTION MODULE
        # ------------------------------------------------------------
        # CS pre-processing
        # -----------------
        try:
            for func in interface_args['cs_pre_process']:
                eval(f'dpf.{func}')(params, results, interface_args)
        except KeyError:
            pass

        # CS analysis
        # -----------
        cs_name, cs_design_tmp = list(interface_args['cross-section_design_template'].items())[0]
        cs = mcs.CrossSection(cs_name)
        cs.fn_design_tmp = cs_design_tmp
        cs_job = mint.CrossSectionJob(cs, params, interface_args, logger=logger)
        cs_job.run()

        # CS post-processing
        # ------------------

        # ------------------------------------------------------------
        # CROSS-SECTION MODULE END
        # ------------------------------------------------------------

        # Overall post-processing of results
        # ----------------------------------
        eval('dpf.postprocess')(
            params, results, interface_args, cs_job.inputs, cs_job.outputs
        )

        # Write output
        # ------------
        with open('interim.out', 'w') as fo:
            json.dump(results['interim'], fo, indent=4)

        with open('output.out', 'w') as fo:
            for r in results['final']:
                fo.write(f'{r[1]:24.16E} {r[0]}\n')

        logger.critical(f'{log_prompt} finished')

        return

    except:
        logger.info(f'{log_prompt} failed')
        with open('output.out', 'w') as fo:
            fo.write('FAIL')
        e = tb.format_exc()
        print(e)
        return


if __name__ == '__main__':
    process(sys.argv[1])