import os
import json

import numpy as np
import pandas as pd
from scipy.interpolate import interp1d

from runOPAL import OpalDict, Simulation, SlurmJob
from mllib.data.opal_stat_file_to_dataframe import StatFile


class OpalRunner:

    def __init__(self, input_directory, output_directory, fieldmap_directory,
                 base_name, hyperthreading=0, quiet=True, partition='hourly',
                 slurm_time='00:59:59', slurm_ram='16'):
        '''
        Initialise the runner.

        Parameters
        ==========
        input_directory: str
            Directory where the `.data` file is stored. Must also contain
            the template file `tmpl/<base_name>.tmpl`.
        output_directory: str
            Directory where all output files are written to. If multiple
            design variable configurations are given, the output of each is
            written to a subdirectory of `output_directory`. The name of
            the subdirectory is the row index of the design variable
            configuration.
        fieldmap_directory: str
            Directory where the fieldmaps are stored.
        base_name: str
            Name of the `.data` file without the extension. The template
            file has `base_name` as its base name, too.
        hyperthreading: int (optional)
            Number of Hyper-Threads to use. Default: 0
        quiet: bool (optional)
            Whether to silence output. Default: True
        partition: str (optional)
            SLURM partition to run the jobs in. Default: 'hourly'
        slurm_time: str (optional)
            Maximum runtime of a job on SLURM. Must be in the format
            'HH:MM:SS'. Default: '00:59:59'
        slurm_ram: str (optional)
            How much RAM [GB] to allocate for a single job. Default: '16'
        '''
        self._input_dir = input_directory
        self._total_output_dir = output_directory
        self._fieldmap_dir = fieldmap_directory
        self._base_name = base_name
        self._tmpl_file = f'{input_directory}/tmpl/{base_name}.tmpl'
        self._data_file = f'{input_directory}/{base_name}.data'
        self._hyperthreading = hyperthreading
        self._quiet = quiet
        self._partition = partition
        self._slurm_time = slurm_time
        self._slurm_ram = slurm_ram

    def run_configurations(self, design_variables):
        '''
        Enqueues OPAL simulations for the given design variables.

        The output of each run is written to a separate subdirectory. In
        addition to the OPAL output, a file `dvar_values.json` representing
        the design values is written to each subdirectory.

        Parameters
        ==========
        design_variables: pandas.DataFrame
            A DataFrame containing the input variables. Each row is a
            configuration. The column names are the names of the design
            values as they would be put in the `.data` file.

        Returns
        =======
        list(str)
            A list containing the SLURM IDs of the enqueued jobs. The jobs
            have just been submitted to SLURM; they have not necessarily
            run yet.
        '''
        # Flags and verbosity level forwarded to Simulation.run().
        do_test = False
        do_keep = False
        do_no_batch = False
        do_optimise = False
        info = 6

        launched_jobs = []
        for row, dvars in design_variables.iterrows():
            output_path = f'{self._total_output_dir}/{row}'
            if not os.path.exists(output_path):
                os.makedirs(output_path)
            input_file = f'{output_path}/{self._base_name}.in'

            # Log the design variable configuration.
            dvar_values = dvars.to_dict()
            with open(f'{output_path}/dvar_values.json', 'w') as file:
                json.dump(dvar_values, file, indent=4)

            # Collect the values from the .data file.
            parameters = OpalDict(self._data_file)

            # Add the design variables to the parameters that will be
            # substituted in the template file.
            for key, val in dvar_values.items():
                parameters[key] = val

            # Settings picked up by runOPAL via the environment.
            os.environ['FIELDMAPS'] = self._fieldmap_dir
            os.environ['SLURM_TIME'] = self._slurm_time
            os.environ['SLURM_PARTITION'] = self._partition
            os.environ['SLURM_RAM'] = self._slurm_ram

            # Commands to execute before running OPAL.
            pre_cmd = [
                'module use unstable',
                'module load cmake/3.9.6',
                'module load gcc/7.3.0',
                'module load gsl/2.5',
                'module load openmpi/3.1.3',
                'module load boost/1.68.0',
                'module load hdf5/1.10.4',
                'module load OpenBLAS/0.2.20',
                'module load H5hut/2.0.0rc5',
            ]
            pre_cmd = '\n'.join(pre_cmd)

            # Commands to execute after running OPAL.
            post_cmd = [
                f'rm {output_path}/*.lbal',
                f'rm {output_path}/*.h5',
            ]
            post_cmd = '\n'.join(post_cmd)

            # Queue the simulation.
            sim = Simulation(parameters)
            job_ID = sim.run(row, self._base_name, self._input_dir,
                             self._tmpl_file, input_file, do_test, do_keep,
                             do_no_batch, do_optimise, info, self._partition,
                             self._hyperthreading, self._quiet,
                             preCommand=pre_cmd, postCommand=post_cmd)
            launched_jobs.append(job_ID)
        return launched_jobs
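
    # Note on the input shape (a sketch with hypothetical variable names):
    # each column of `design_variables` is one design variable from the
    # `.data` file, each row one configuration, e.g.
    #
    #     pd.DataFrame({'IBF': [4.0, 4.5], 'IMF': [0.1, 0.2]})
    #
    # submits two SLURM jobs whose output goes to subdirectories `0` and `1`.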

    def run_configurations_blocking(self, design_variables):
        '''
        Run the design variable configurations in a blocking way.

        Calls `self.run_configurations(design_variables)` and waits for the
        completion of all jobs.

        Parameters
        ==========
        design_variables: pandas.DataFrame

        Returns
        =======
        IDs: list(str)
        '''
        IDs = self.run_configurations(design_variables)
        for ID in IDs:
            SlurmJob(ID).wait_for_completion()
        return IDs

    class Result:

        def __init__(self, functions, columns, index):
            self._functions = functions
            columns = columns.copy()
            columns.remove('Path length')
            self._columns = columns
            # Label the rows by the dvar IDs, as documented in
            # get_quantities_of_interest().
            self._index = index

        def __call__(self, s):
            rows = []
            for f in self._functions:
                rows.append(f(s))
            result = np.vstack(rows)
            return pd.DataFrame(data=result, columns=self._columns,
                                index=self._index)

    def get_quantities_of_interest(self, stat_file_columns, dvar_IDs,
                                   kind='slinear'):
        '''
        Returns a callable that evaluates the quantities of interest.

        This function assumes that all jobs have already finished
        successfully.

        Parameters
        ==========
        stat_file_columns: list(str)
            Columns of the .stat files that are of interest.
        dvar_IDs: list
            Must be the indices of a pandas.DataFrame that was used as
            input to run_configurations() or run_configurations_blocking()
            earlier.
        kind: str
            Which kind of interpolation to perform. Must be a valid `kind`
            parameter for `scipy.interpolate.interp1d`.

        Returns
        =======
        callable(float)
            The callable takes the longitudinal position as its only
            argument. It returns a pandas.DataFrame whose column names are
            the `stat_file_columns` and whose indices are the `dvar_IDs`.
            The function interpolates the .stat file values of the given
            columns and returns the values at the desired position.
        '''
        # Work on a copy so the caller's list is not modified.
        stat_file_columns = list(stat_file_columns)
        if 'Path length' not in stat_file_columns:
            stat_file_columns.append('Path length')

        functions = []
        for ID in dvar_IDs:
            # Get the path to the .stat file.
            output_dir = f'{self._total_output_dir}/{ID}'
            output_path = f'{output_dir}/{self._base_name}.stat'

            # Load the relevant content.
            df = StatFile(output_path).getDataFrame()
            df = df[stat_file_columns]

            # Interpolate each column along the path length.
            s_fix = df['Path length'].values
            y_fix = df.drop(columns='Path length').values
            f = interp1d(s_fix, y_fix, axis=0, kind=kind,
                         bounds_error=False, fill_value='extrapolate')
            functions.append(f)
        return self.Result(functions, stat_file_columns, dvar_IDs)
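

# Minimal end-to-end sketch of the intended workflow. All paths, the base
# name, the design variable names and the column name below are
# hypothetical placeholders, not part of this module.
if __name__ == '__main__':
    runner = OpalRunner('/path/to/input', '/path/to/output',
                        '/path/to/fieldmaps', 'cyclotron')

    # Two configurations of two (placeholder) design variables.
    dvars = pd.DataFrame({'IBF': [4.0, 4.5], 'IMF': [0.1, 0.2]})

    # Submit both configurations and block until the SLURM jobs finish.
    runner.run_configurations_blocking(dvars)

    # Evaluate the interpolated .stat quantities at a longitudinal position
    # of s = 1.0; yields one row per configuration. 'energy' is a
    # placeholder column name.
    qoi = runner.get_quantities_of_interest(['energy'], list(dvars.index))
    print(qoi(1.0))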