Code owners
Assign users and groups as approvers for specific file changes. Learn more.
hdf5_writer.py 17.84 KiB
import sys
import os
root_dir = os.path.abspath(os.curdir)
sys.path.append(root_dir)
import pandas as pd
import numpy as np
import h5py
import logging
import json
import utils.g5505_utils as utils
import instruments.filereader_registry as filereader_registry
def __transfer_file_dict_to_hdf5(h5file, group_name, file_dict):
"""
Transfers data from a file_dict to an HDF5 file.
Parameters
----------
h5file : h5py.File
HDF5 file object where the data will be written.
group_name : str
Name of the HDF5 group where data will be stored.
file_dict : dict
Dictionary containing file data to be transferred. Required structure:
{
'name': str,
'attributes_dict': dict,
'datasets': [
{
'name': str,
'data': array-like,
'shape': tuple,
'attributes': dict (optional)
},
...
]
}
Returns
-------
None
"""
if not file_dict:
return
try:
# Create group and add their attributes
filename = file_dict['name']
group = h5file[group_name].create_group(name=filename)
# Add group attributes
group.attrs.update(file_dict['attributes_dict'])
# Add datasets to the just created group
for dataset in file_dict['datasets']:
dataset_obj = group.create_dataset(
name=dataset['name'],
data=dataset['data'],
shape=dataset['shape']
)
# Add dataset's attributes
attributes = dataset.get('attributes', {})
dataset_obj.attrs.update(attributes)
group.attrs['last_update_date'] = utils.created_at().encode('utf-8')
stdout = f'Completed transfer for /{group_name}/{filename}'
except Exception as inst:
stdout = inst
logging.error('Failed to transfer data into HDF5: %s', inst)
return stdout
def __copy_file_in_group(source_file_path, dest_file_obj : h5py.File, dest_group_name, work_with_copy : bool = True):
# Create copy of original file to avoid possible file corruption and work with it.
if work_with_copy:
tmp_file_path = utils.make_file_copy(source_file_path)
else:
tmp_file_path = source_file_path
# Open backup h5 file and copy complet filesystem directory onto a group in h5file
with h5py.File(tmp_file_path,'r') as src_file:
dest_file_obj.copy(source= src_file['/'], dest= dest_group_name)
if 'tmp_files' in tmp_file_path:
os.remove(tmp_file_path)
stdout = f'Completed transfer for /{dest_group_name}'
return stdout
def create_hdf5_file_from_filesystem_path(path_to_input_directory: str,
path_to_filenames_dict: dict = None,
select_dir_keywords : list = [],
root_metadata_dict : dict = {}, mode = 'w'):
"""
Creates an .h5 file with name "output_filename" that preserves the directory tree (or folder structure)
of a given filesystem path.
The data integration capabilities are limited by our file reader, which can only access data from a list of
admissible file formats. These, however, can be extended. Directories are groups in the resulting HDF5 file.
Files are formatted as composite objects consisting of a group, file, and attributes.
Parameters
----------
output_filename : str
Name of the output HDF5 file.
path_to_input_directory : str
Path to root directory, specified with forward slashes, e.g., path/to/root.
path_to_filenames_dict : dict, optional
A pre-processed dictionary where keys are directory paths on the input directory's tree and values are lists of files.
If provided, 'input_file_system_path' is ignored.
select_dir_keywords : list
List of string elements to consider or select only directory paths that contain
a word in 'select_dir_keywords'. When empty, all directory paths are considered
to be included in the HDF5 file group hierarchy.
root_metadata_dict : dict
Metadata to include at the root level of the HDF5 file.
mode : str
'w' create File, truncate if it exists, or 'r+' read/write, File must exists. By default, mode = "w".
Returns
-------
output_filename : str
Path to the created HDF5 file.
"""
if not mode in ['w','r+']:
raise ValueError(f'Parameter mode must take values in ["w","r+"]')
if not '/' in path_to_input_directory:
raise ValueError('path_to_input_directory needs to be specified using forward slashes "/".' )
#path_to_output_directory = os.path.join(path_to_input_directory,'..')
path_to_input_directory = os.path.normpath(path_to_input_directory).rstrip(os.sep)
for i, keyword in enumerate(select_dir_keywords):
select_dir_keywords[i] = keyword.replace('/',os.sep)
if not path_to_filenames_dict:
# On dry_run=True, returns path to files dictionary of the output directory without making a actual copy of the input directory.
# Therefore, there wont be a copying conflict by setting up input and output directories the same
path_to_filenames_dict = utils.copy_directory_with_contraints(input_dir_path=path_to_input_directory,
output_dir_path=path_to_input_directory,
dry_run=True)
# Set input_directory as copied input directory
root_dir = path_to_input_directory
path_to_output_file = path_to_input_directory.rstrip(os.path.sep) + '.h5'
start_message = f'\n[Start] Data integration :\nSource: {path_to_input_directory}\nDestination: {path_to_output_file}\n'
print(start_message)
logging.info(start_message)
# Check if the .h5 file already exists
if os.path.exists(path_to_output_file) and mode in ['w']:
message = (
f"[Notice] The file '{path_to_output_file}' already exists and will not be overwritten.\n"
"If you wish to replace it, please delete the existing file first and rerun the program."
)
print(message)
logging.error(message)
else:
with h5py.File(path_to_output_file, mode=mode, track_order=True) as h5file:
number_of_dirs = len(path_to_filenames_dict.keys())
dir_number = 1
for dirpath, filtered_filenames_list in path_to_filenames_dict.items():
# Check if filtered_filenames_list is nonempty. TODO: This is perhaps redundant by design of path_to_filenames_dict.
if not filtered_filenames_list:
continue
group_name = dirpath.replace(os.sep,'/')
group_name = group_name.replace(root_dir.replace(os.sep,'/') + '/', '/')
# Flatten group name to two level
if select_dir_keywords:
offset = sum([len(i.split(os.sep)) if i in dirpath else 0 for i in select_dir_keywords])
else:
offset = 2
tmp_list = group_name.split('/')
if len(tmp_list) > offset+1:
group_name = '/'.join([tmp_list[i] for i in range(offset+1)])
try:
# Create group called "group_name". Hierarchy of nested groups can be implicitly defined by the forward slashes
if not group_name in h5file.keys():
h5file.create_group(group_name)
h5file[group_name].attrs['creation_date'] = utils.created_at().encode('utf-8')
#h5file[group_name].attrs.create(name='filtered_file_list',data=convert_string_to_bytes(filtered_filename_list))
#h5file[group_name].attrs.create(name='file_list',data=convert_string_to_bytes(filenames_list))
#else:
#print(group_name,' was already created.')
instFoldermsgStart = f'Starting data transfer from instFolder: {group_name}'
print(instFoldermsgStart)
except Exception as inst:
stdout = inst
logging.error('Failed to create group %s into HDF5: %s', group_name, inst)
if 'data_lineage_metadata.json' in filtered_filenames_list:
idx = filtered_filenames_list.index('data_lineage_metadata.json')
data_lineage_file = filtered_filenames_list[idx]
try:
with open('/'.join([dirpath,data_lineage_file]),'r') as dlf:
data_lineage_dict = json.load(dlf)
filtered_filenames_list.pop(idx)
except json.JSONDecodeError:
data_lineage_dict = {} # Start fresh if file is invalid
else:
data_lineage_dict = {}
for filenumber, filename in enumerate(filtered_filenames_list):
# hdf5 path to filename group
dest_group_name = f'{group_name}/{filename}'
if not 'h5' in filename:
#file_dict = config_file.select_file_readers(group_id)[file_ext](os.path.join(dirpath,filename))
#file_dict = ext_to_reader_dict[file_ext](os.path.join(dirpath,filename))
file_dict = filereader_registry.select_file_reader(dest_group_name)(os.path.join(dirpath,filename))
# Check whether there is an available file reader
if file_dict is not None and isinstance(file_dict, dict):
if 'attributes_dict' in file_dict:
file_dict['attributes_dict'].update(data_lineage_dict.get(filename,{}))
stdout = __transfer_file_dict_to_hdf5(h5file, group_name, file_dict)
else:
source_file_path = os.path.join(dirpath,filename)
dest_file_obj = h5file
#group_name +'/'+filename
#ext_to_reader_dict[file_ext](source_file_path, dest_file_obj, dest_group_name)
#g5505f_reader.select_file_reader(dest_group_name)(source_file_path, dest_file_obj, dest_group_name)
stdout = __copy_file_in_group(source_file_path, dest_file_obj, dest_group_name, False)
# Update the progress bar and log the end message
instFoldermsdEnd = f'\nCompleted data transfer for instFolder: {group_name}\n'
# Print and log the start message
utils.progressBar(dir_number, number_of_dirs, instFoldermsdEnd)
logging.info(instFoldermsdEnd )
dir_number = dir_number + 1
print('[End] Data integration')
logging.info('[End] Data integration')
if len(root_metadata_dict.keys())>0:
for key, value in root_metadata_dict.items():
#if key in h5file.attrs:
# del h5file.attrs[key]
h5file.attrs.create(key, value)
#annotate_root_dir(output_filename,root_metadata_dict)
#output_yml_filename_path = hdf5_vis.take_yml_snapshot_of_hdf5_file(output_filename)
return path_to_output_file #, output_yml_filename_path
def create_hdf5_file_from_dataframe(ofilename, input_data, group_by_funcs: list, approach: str = None, extract_attrs_func=None):
"""
Creates an HDF5 file with hierarchical groups based on the specified grouping functions or columns.
Parameters:
-----------
ofilename (str): Path for the output HDF5 file.
input_data (pd.DataFrame or str): Input data as a DataFrame or a valid file system path.
group_by_funcs (list): List of callables or column names to define hierarchical grouping.
approach (str): Specifies the approach ('top-down' or 'bottom-up') for creating the HDF5 file.
extract_attrs_func (callable, optional): Function to extract additional attributes for HDF5 groups.
Returns:
--------
None
"""
# Check whether input_data is a valid file-system path or a DataFrame
is_valid_path = lambda x: os.path.exists(x) if isinstance(x, str) else False
if is_valid_path(input_data):
# If input_data is a file-system path, create a DataFrame with file info
file_list = os.listdir(input_data)
df = pd.DataFrame(file_list, columns=['filename'])
df = utils.augment_with_filetype(df) # Add filetype information if needed
elif isinstance(input_data, pd.DataFrame):
# If input_data is a DataFrame, make a copy
df = input_data.copy()
else:
raise ValueError("input_data must be either a valid file-system path or a DataFrame.")
# Generate grouping columns based on group_by_funcs
if utils.is_callable_list(group_by_funcs):
grouping_cols = []
for i, func in enumerate(group_by_funcs):
col_name = f'level_{i}_groups'
grouping_cols.append(col_name)
df[col_name] = func(df)
elif utils.is_str_list(group_by_funcs) and all([item in df.columns for item in group_by_funcs]):
grouping_cols = group_by_funcs
else:
raise ValueError("'group_by_funcs' must be a list of callables or valid column names in the DataFrame.")
# Generate group paths
df['group_path'] = ['/' + '/'.join(row) for row in df[grouping_cols].values.astype(str)]
# Open the HDF5 file in write mode
with h5py.File(ofilename, 'w') as file:
for group_path in df['group_path'].unique():
# Create groups in HDF5
group = file.create_group(group_path)
# Filter the DataFrame for the current group
datatable = df[df['group_path'] == group_path].copy()
# Drop grouping columns and the generated 'group_path'
datatable = datatable.drop(columns=grouping_cols + ['group_path'])
# Add datasets to groups if data exists
if not datatable.empty:
dataset = utils.convert_dataframe_to_np_structured_array(datatable)
group.create_dataset(name='data_table', data=dataset)
# Add attributes if extract_attrs_func is provided
if extract_attrs_func:
attrs = extract_attrs_func(datatable)
for key, value in attrs.items():
group.attrs[key] = value
# Save metadata about depth of hierarchy
file.attrs.create(name='depth', data=len(grouping_cols) - 1)
print(f"HDF5 file created successfully at {ofilename}")
return ofilename
def save_processed_dataframe_to_hdf5(df, annotator, output_filename): # src_hdf5_path, script_date, script_name):
"""
Save processed dataframe columns with annotations to an HDF5 file.
Parameters:
df (pd.DataFrame): DataFrame containing processed time series.
annotator (): Annotator object with get_metadata method.
output_filename (str): Path to the source HDF5 file.
"""
# Convert datetime columns to string
datetime_cols = df.select_dtypes(include=['datetime64']).columns
if list(datetime_cols):
df[datetime_cols] = df[datetime_cols].map(str)
# Convert dataframe to structured array
icad_data_table = utils.convert_dataframe_to_np_structured_array(df)
# Get metadata
metadata_dict = annotator.get_metadata()
# Prepare project level attributes to be added at the root level
project_level_attributes = metadata_dict['metadata']['project']
# Prepare high-level attributes
high_level_attributes = {
'parent_files': metadata_dict['parent_files'],
**metadata_dict['metadata']['sample'],
**metadata_dict['metadata']['environment'],
**metadata_dict['metadata']['instruments']
}
# Prepare data level attributes
data_level_attributes = metadata_dict['metadata']['datasets']
for key, value in data_level_attributes.items():
if isinstance(value,dict):
data_level_attributes[key] = utils.convert_attrdict_to_np_structured_array(value)
# Prepare file dictionary
file_dict = {
'name': project_level_attributes['processing_file'],
'attributes_dict': high_level_attributes,
'datasets': [{
'name': "data_table",
'data': icad_data_table,
'shape': icad_data_table.shape,
'attributes': data_level_attributes
}]
}
# Check if the file exists
if os.path.exists(output_filename):
mode = "a"
print(f"File {output_filename} exists. Opening in append mode.")
else:
mode = "w"
print(f"File {output_filename} does not exist. Creating a new file.")
# Write to HDF5
with h5py.File(output_filename, mode) as h5file:
# Add project level attributes at the root/top level
h5file.attrs.update(project_level_attributes)
__transfer_file_dict_to_hdf5(h5file, '/', file_dict)
#if __name__ == '__main__':