Commit a007baec authored by bellotti_r

Added OpalRunner, a class that allows calling OPAL like a Python function.

parent 72e14e0d
__init__.py

from .simulation import Simulation
from .opaldict import OpalDict
from .slurmjob import SlurmJob
from .opalrunner import OpalRunner

opalrunner.py

import os
import json

import numpy as np
import pandas as pd
from scipy.interpolate import interp1d

from runOPAL import OpalDict, Simulation, SlurmJob
from mllib.data.opal_stat_file_to_dataframe import StatFile


class OpalRunner:

    def __init__(self,
                 input_directory,
                 output_directory,
                 fieldmap_directory,
                 base_name,
                 hyperthreading=0,
                 quiet=True,
                 partition='hourly',
                 slurm_time='00:59:59',
                 slurm_ram='16'):
        '''
        Initialise the runner.

        Parameters
        ==========
        input_directory: str
            Directory where the `<base_name>.data` file is stored.
            Must also contain a file `tmpl/<base_name>.tmpl`.
        output_directory: str
            Directory where all output files are written to.
            If multiple design variable configurations are given, the output
            of each is written to a subdirectory of `output_directory` named
            after the row index of the configuration.
        fieldmap_directory: str
            Directory where the fieldmaps are stored.
        base_name: str
            Name of the .data file without the extension.
            The template file has `base_name` as its base name, too.
        hyperthreading: int (optional)
            Number of hyper-threads to use. Default: 0
        quiet: bool (optional)
            Whether to silence output. Default: True
        partition: str (optional)
            SLURM partition to run the jobs in. Default: 'hourly'
        slurm_time: str (optional)
            Maximum runtime of a job on SLURM.
            Must be in the format 'HH:MM:SS'.
            Default: '00:59:59'
        slurm_ram: str (optional)
            How much RAM [GB] to allocate for a single job. Default: '16'
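
        Example
        =======
        A minimal sketch; the paths and the base name are hypothetical:

            runner = OpalRunner(
                input_directory='/path/to/input',
                output_directory='/path/to/output',
                fieldmap_directory='/path/to/fieldmaps',
                base_name='mySimulation')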
        '''
        self._input_dir = input_directory
        self._total_output_dir = output_directory
        self._fieldmap_dir = fieldmap_directory
        self._base_name = base_name
        self._tmpl_file = f'{input_directory}/tmpl/{base_name}.tmpl'
        self._data_file = f'{input_directory}/{base_name}.data'
        self._hyperthreading = hyperthreading
        self._quiet = quiet
        self._partition = partition
        self._slurm_time = slurm_time
        self._slurm_ram = slurm_ram

    def run_configurations(self, design_variables):
        '''
        Enqueue OPAL simulations for the given design variables.

        The output of each run is written to a separate subdirectory.
        In addition to the OPAL output, a file `dvar_values.json` containing
        the design values is written to each subdirectory.

        Parameters
        ==========
        design_variables: pandas.DataFrame
            A DataFrame containing the input variables.
            Each row is a configuration. The column names are the names of
            the design variables as they would be put in the .data file.

        Returns
        =======
        list(str)
            A list containing the SLURM IDs of the enqueued jobs.
            The jobs have just been submitted to SLURM; they have not
            necessarily run yet.
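
        Example
        =======
        A minimal sketch; the design variable names are hypothetical and
        assume a constructed `runner`:

            import pandas as pd

            dvars = pd.DataFrame({'DVAR1': [0.1, 0.2],
                                  'DVAR2': [1.0, 2.0]})
            job_IDs = runner.run_configurations(dvars)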
        '''
        do_test = False
        do_keep = False
        do_no_batch = False
        do_optimise = False
        info = 6
        launched_jobs = []
        for row, dvars in design_variables.iterrows():
            output_path = f'{self._total_output_dir}/{row}'
            if not os.path.exists(output_path):
                os.makedirs(output_path)
            input_file = f'{output_path}/{self._base_name}.in'

            # Log the design variable configuration.
            dvar_values = dvars.to_dict()
            with open(f'{output_path}/dvar_values.json', 'w') as file:
                json.dump(dvar_values, file, indent=4)

            # Collect the values from the .data file.
            parameters = OpalDict(self._data_file)

            # Add the design variables to the parameters that will be
            # substituted in the template file.
            for key, val in dvar_values.items():
                parameters[key] = val

            os.environ['FIELDMAPS'] = self._fieldmap_dir
            os.environ['SLURM_TIME'] = self._slurm_time
            os.environ['SLURM_PARTITION'] = self._partition
            os.environ['SLURM_RAM'] = self._slurm_ram

            # Commands to execute before running OPAL.
            pre_cmd = [
                'module use unstable',
                'module load cmake/3.9.6',
                'module load gcc/7.3.0',
                'module load gsl/2.5',
                'module load openmpi/3.1.3',
                'module load boost/1.68.0',
                'module load hdf5/1.10.4',
                'module load OpenBLAS/0.2.20',
                'module load H5hut/2.0.0rc5',
            ]
            pre_cmd = '\n'.join(pre_cmd)

            # Commands to execute after running OPAL.
            post_cmd = [
                f'rm {output_path}/*.lbal',
                f'rm {output_path}/*.h5',
            ]
            post_cmd = '\n'.join(post_cmd)

            # Queue the simulation.
            sim = Simulation(parameters)
            job_ID = sim.run(row, self._base_name, self._input_dir,
                             self._tmpl_file, input_file,
                             do_test, do_keep, do_no_batch, do_optimise,
                             info, self._partition, self._hyperthreading,
                             self._quiet,
                             preCommand=pre_cmd,
                             postCommand=post_cmd)
            launched_jobs.append(job_ID)
        return launched_jobs

    def run_configurations_blocking(self, design_variables):
        '''
        Run the design variable configurations in a blocking way.

        Calls self.run_configurations(design_variables) and waits for the
        completion of all jobs.

        Parameters
        ==========
        design_variables: pandas.DataFrame

        Returns
        =======
        IDs: list(str)
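
        Example
        =======
        A minimal sketch; `dvars` is a hypothetical pandas.DataFrame of
        design variable configurations:

            job_IDs = runner.run_configurations_blocking(dvars)
            # All jobs have finished once this call returns.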
        '''
        IDs = self.run_configurations(design_variables)
        for ID in IDs:
            SlurmJob(ID).wait_for_completion()
        return IDs

    class Result:
        '''
        Helper that evaluates the interpolated quantities of interest of
        several runs at a given longitudinal position.
        '''

        def __init__(self, functions, columns):
            self._functions = functions
            # 'Path length' is the interpolation variable, not a result
            # column.
            columns = columns.copy()
            columns.remove('Path length')
            self._columns = columns

        def __call__(self, s):
            # Evaluate the interpolant of each run at position s and stack
            # the results, one row per run.
            rows = []
            for f in self._functions:
                rows.append(f(s))
            result = np.vstack(rows)
            return pd.DataFrame(data=result, columns=self._columns)

    def get_quantities_of_interest(self, stat_file_columns, dvar_IDs,
                                   kind='slinear'):
        '''
        Return a callable that evaluates the quantities of interest.

        This method assumes that all jobs have already finished
        successfully.

        Parameters
        ==========
        stat_file_columns: list(str)
            Columns of the .stat files that are of interest.
        dvar_IDs: list
            Must be the indices of a pandas.DataFrame that was used as input
            to run_configurations() or run_configurations_blocking() earlier.
        kind: str
            Which kind of interpolation to perform.
            Must be a valid `kind` parameter for `scipy.interpolate.interp1d`.

        Returns
        =======
        callable(float)
            The callable takes the longitudinal position as its only
            argument. It returns a pandas.DataFrame whose column names are
            the `stat_file_columns`; the rows are ordered like `dvar_IDs`.
            The function interpolates the .stat file values of the given
            columns and returns the values at the desired position.
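
        Example
        =======
        A minimal sketch; the column names are illustrative and must match
        the .stat file of your simulation:

            qoi = runner.get_quantities_of_interest(
                ['rms beamsize in x', 'energy of the beam'],
                dvar_IDs=[0, 1])
            values = qoi(0.5)  # evaluate at a path length of 0.5 m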
        '''
        # Work on a copy so that the caller's list is not modified.
        stat_file_columns = list(stat_file_columns)
        if 'Path length' not in stat_file_columns:
            stat_file_columns.append('Path length')
        functions = []
        for ID in dvar_IDs:
            # Get the path to the .stat file.
            output_dir = f'{self._total_output_dir}/{ID}'
            output_path = f'{output_dir}/{self._base_name}.stat'

            # Load the relevant content.
            df = StatFile(output_path).getDataFrame()
            df = df[stat_file_columns]

            # Interpolate along the path length.
            s_fix = df['Path length'].values
            y_fix = df.drop(columns='Path length').values
            f = interp1d(
                s_fix, y_fix,
                axis=0,
                kind=kind,
                bounds_error=False,
                fill_value='extrapolate')
            functions.append(f)
        return self.Result(functions, stat_file_columns)