Commit de3ccbf8 authored by Renato Bellotti

Removed OpalRunner and SlurmJob because they introduce MLLIB as a dependency.

parent fa84e6fd
runOPAL/__init__.py:

 from .simulation import Simulation
 from .opaldict import OpalDict
-from .slurmjob import SlurmJob
-from .opalrunner import OpalRunner
runOPAL/opalrunner.py (deleted):

import os
import json

import numpy as np
import pandas as pd
from scipy.interpolate import interp1d

from runOPAL import OpalDict, Simulation, SlurmJob
from mllib.data.opal_stat_file_to_dataframe import StatFile


class OpalRunner:

    def __init__(self,
                 input_directory,
                 output_directory,
                 fieldmap_directory,
                 base_name,
                 hyperthreading=0,
                 quiet=True,
                 partition='hourly',
                 slurm_time='00:59:59',
                 slurm_ram='16'):
        '''
        Initialise the runner.

        Parameters
        ==========
        input_directory: str
            Directory where the `<base_name>.data` file is stored.
            Must also contain a file `tmpl/<base_name>.tmpl`.
        output_directory: str
            Directory where all output files are written to.
            If multiple design variable configurations are given, the
            output of each is written to a subdirectory of
            `output_directory`. The name of the subdirectory is the row
            index of the design variable configuration.
        fieldmap_directory: str
            Directory where the fieldmaps are stored.
        base_name: str
            Name of the .data file without the extension.
            The template file has `base_name` as its base name, too.
        hyperthreading: int (optional)
            Number of hyper-threads to use. Default: 0
        quiet: bool (optional)
            Whether to silence output. Default: True
        partition: str (optional)
            SLURM partition to run the jobs in. Default: 'hourly'
        slurm_time: str (optional)
            Maximum runtime of a job on SLURM.
            Must be in the format 'HH:MM:SS'.
            Default: '00:59:59'
        slurm_ram: str (optional)
            How much RAM [GB] to allocate for a single job. Default: '16'
        '''
        self._input_dir = input_directory
        self._total_output_dir = output_directory
        self._fieldmap_dir = fieldmap_directory
        self._base_name = base_name
        self._tmpl_file = f'{input_directory}/tmpl/{base_name}.tmpl'
        self._data_file = f'{input_directory}/{base_name}.data'
        self._hyperthreading = hyperthreading
        self._quiet = quiet
        self._partition = partition
        self._slurm_time = slurm_time
        self._slurm_ram = slurm_ram
    def run_configurations(self, design_variables,
                           pre_command=None, post_command=None):
        '''
        Enqueues OPAL simulations for the given design variables.

        The output of each run is written to a separate subdirectory.
        In addition to the OPAL output, a file `dvar_values.json`
        representing the design values is written to each subdirectory.

        Parameters
        ==========
        design_variables: pandas.DataFrame
            A DataFrame containing the input variables.
            Each row is a configuration. The column names are the names
            of the design values as they would be put in the .data file.
        pre_command: str (optional)
            Command to execute before launching OPAL.
            If None, execute the common pre-commands to launch on Merlin6.
            Default: None
        post_command: str (optional)
            Command to execute after OPAL has finished.
            If None, execute the common post-commands (removal of large
            output files).
            Default: None

        Returns
        =======
        list of str
            A list containing the SLURM IDs of the enqueued jobs.
            The jobs have just been submitted to SLURM; they have not
            necessarily run yet.
        '''
        do_test = False
        do_keep = False
        do_no_batch = False
        do_optimise = False
        info = 6

        launched_jobs = []
        for row, dvars in design_variables.iterrows():
            output_path = f'{self._total_output_dir}/{row}'
            if not os.path.exists(output_path):
                os.makedirs(output_path)
            input_file = f'{output_path}/{self._base_name}.in'

            # Log the design variable configuration.
            dvar_values = dvars.to_dict()
            with open(f'{output_path}/dvar_values.json', 'w') as file:
                json.dump(dvar_values, file, indent=4)

            # Collect the values from the .data file.
            parameters = OpalDict(self._data_file)

            # Add the design variables to the parameters that will be
            # substituted in the template file.
            for key, val in dvar_values.items():
                parameters[key] = val

            os.environ['FIELDMAPS'] = self._fieldmap_dir
            os.environ['SLURM_TIME'] = self._slurm_time
            os.environ['SLURM_PARTITION'] = self._partition
            os.environ['SLURM_RAM'] = self._slurm_ram

            # Commands to execute before running OPAL.
            if pre_command is None:
                pre_command = '\n'.join([
                    'module use /afs/psi.ch/project/amas/modulefiles',
                    'module load opal-toolchain/master',
                ])

            # Commands to execute after running OPAL. The default removes
            # large output files and depends on the per-configuration
            # output path, so build it inside the loop without
            # overwriting the caller's argument.
            if post_command is None:
                post_cmd = '\n'.join([
                    f'rm {output_path}/*.lbal',
                    f'rm {output_path}/*.h5',
                ])
            else:
                post_cmd = post_command

            # Queue the simulation.
            sim = Simulation(parameters)
            job_ID = sim.run(row, self._base_name, self._input_dir,
                             self._tmpl_file, input_file,
                             do_test, do_keep, do_no_batch, do_optimise,
                             info, self._partition, self._hyperthreading,
                             self._quiet,
                             preCommand=pre_command,
                             postCommand=post_cmd)
            launched_jobs.append(job_ID)
        return launched_jobs
    def run_configurations_blocking(self, design_variables):
        '''
        Run the design variable configurations in a blocking way.

        Calls self.run_configurations(design_variables) and waits for all
        jobs to complete.

        Parameters
        ==========
        design_variables: pandas.DataFrame

        Returns
        =======
        IDs: list of str
        '''
        IDs = self.run_configurations(design_variables)
        for ID in IDs:
            SlurmJob(ID).wait_for_completion()
        return IDs
    class Result:
        '''
        Callable that evaluates one interpolant per configuration and
        collects the values in a DataFrame.
        '''

        def __init__(self, functions, columns):
            self._functions = functions
            columns = columns.copy()
            columns.remove('Path length')
            self._columns = columns

        def __call__(self, s):
            rows = []
            for f in self._functions:
                rows.append(f(s))
            result = np.vstack(rows)
            return pd.DataFrame(data=result, columns=self._columns)

    def get_quantities_of_interest(self, stat_file_columns, dvar_IDs,
                                   kind='slinear'):
        '''
        Returns a function that evaluates the quantities of interest.

        This function assumes that all jobs have already finished
        successfully.

        Parameters
        ==========
        stat_file_columns: list of str
            Columns of the .stat files that are of interest.
        dvar_IDs: list
            Must be the indices of a pandas.DataFrame that was used as
            input to run_configurations() or
            run_configurations_blocking() earlier.
        kind: str
            Which kind of interpolation to perform.
            Must be a valid `kind` parameter for
            `scipy.interpolate.interp1d`.

        Returns
        =======
        callable(float)
            The callable takes the longitudinal position as its only
            argument. It returns a pandas.DataFrame whose column names
            are the `stat_file_columns` and whose rows correspond to the
            `dvar_IDs`, in order. The function interpolates the .stat
            file values of the given columns and returns the values at
            the desired position.
        '''
        if 'Path length' not in stat_file_columns:
            stat_file_columns.append('Path length')

        functions = []
        for ID in dvar_IDs:
            # Get the path to the .stat file.
            output_dir = f'{self._total_output_dir}/{ID}'
            output_path = f'{output_dir}/{self._base_name}.stat'

            # Load the relevant content.
            df = StatFile(output_path).getDataFrame()
            df = df[stat_file_columns]

            # Interpolate along the path length.
            s_fix = df['Path length'].values
            y_fix = df.drop(columns='Path length').values
            f = interp1d(
                s_fix, y_fix,
                axis=0,
                kind=kind,
                bounds_error=False,
                fill_value='extrapolate')
            functions.append(f)

        return self.Result(functions, stat_file_columns)
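For context, a minimal usage sketch of the removed runner. This is an illustration only: the paths, the design variable name DPHI, and the .stat column names other than 'Path length' are hypothetical placeholders, not values from this repository.

import pandas as pd

from runOPAL import OpalRunner  # as exported before this commit

# One configuration per row; the column name must match a design
# variable substituted in the template file (placeholder name).
dvars = pd.DataFrame({'DPHI': [-10.0, 0.0, 10.0]})

runner = OpalRunner(
    input_directory='/data/project/myrun',        # placeholder paths
    output_directory='/data/project/myrun/out',
    fieldmap_directory='/data/project/fieldmaps',
    base_name='myrun')

# Submit one SLURM job per row and block until all jobs have finished.
runner.run_configurations_blocking(dvars)

# Interpolate two (placeholder) .stat columns at s = 1.5 for every
# configuration; the result is a DataFrame with one row per configuration.
qoi = runner.get_quantities_of_interest(['max x', 'energy'],
                                        dvar_IDs=list(dvars.index))
print(qoi(1.5))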
runOPAL/slurmjob.py (deleted):

import subprocess
import time


class SlurmJob:
    '''
    Class representing a SLURM job.
    '''

    def __init__(self, ID):
        '''
        Parameters
        ==========
        ID: int or str
            Identification number of the SLURM job.
        '''
        self._ID = ID

    @property
    def status(self):
        '''
        Returns the current job status.
        '''
        cmd = 'sacct -j {} -o state'.format(self._ID).split(' ')
        completed_process = subprocess.run(cmd,
                                           stdout=subprocess.PIPE,
                                           encoding='utf-8')
        output = completed_process.stdout
        # Format of the output:
        #
        #      State
        # ----------
        # CANCELLED+
        #
        state = output.split('\n')[2].strip()
        return state

    def wait_for_completion(self, timeout=None):
        '''
        Wait until the job has finished.

        Parameters
        ==========
        timeout: int or None
            Maximum time to wait (in s). If None: wait forever for
            completion.

        Returns
        =======
        True if the job completed successfully, False if it failed.

        Raises
        ======
        TimeoutError
            If the timeout is exceeded.
        RuntimeError
            If the job state is not in
            ['PENDING', 'RUNNING', 'COMPLETED', 'FAILED']
        '''
        start_time = time.time()
        has_started = False
        while True:
            if (timeout is not None) and (time.time() - start_time > timeout):
                raise TimeoutError(f'Job {self._ID} has timed out!')
            state = self.status
            if state == '':
                # sacct may not report the job yet; keep polling.
                pass
            elif state == 'PENDING':
                pass
            elif state == 'RUNNING':
                if not has_started:
                    print('Job is running...')
                    has_started = True
            elif state == 'COMPLETED':
                return True
            elif state == 'FAILED':
                return False
            else:
                raise RuntimeError(f'Unknown job state: {state}')
            time.sleep(3)
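A short sketch of how the removed SlurmJob was typically used (the job ID is a placeholder; in practice it comes from Simulation.run via run_configurations):

job = SlurmJob(123456)   # placeholder SLURM job ID
print(job.status)        # e.g. 'PENDING', 'RUNNING', 'COMPLETED'

# Poll until the job finishes, but give up after 10 minutes.
try:
    ok = job.wait_for_completion(timeout=600)
    print('completed' if ok else 'failed')
except TimeoutError as err:
    print(err)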
setup.py:

@@ -3,17 +3,15 @@ from setuptools import setup
 # https://stackoverflow.com/a/39811884
 setup(
     name='runOPAL',
     version='1.0',
     description='Run OPAL simulations from within Python',
     author='Andreas Adelmann et al.',
     packages=['runOPAL'],  # same as name
     python_requires='>=3.6',
     install_requires=[
         'numpy>=1.17',
         'scipy>=1.3',
         'pandas>=1.0',
-        'mllib @ git+https://git@gitlab.psi.ch/adelmann/mllib.git@master'
     ]
 )
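With the mllib requirement gone, the remaining dependencies all come from the public package index. A quick sanity check (assumption: the package has been installed with `pip install .` in this repository):

# The remaining exports must import cleanly without mllib being present.
from runOPAL import OpalDict, Simulation
print(OpalDict, Simulation)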