import sys
import os
root_dir = os.path.abspath(os.curdir)
sys.path.append(root_dir)

import pandas as pd
import numpy as np
import h5py
import logging
import json

import utils.g5505_utils as utils
import instruments.filereader_registry as filereader_registry

 
   
def __transfer_file_dict_to_hdf5(h5file, group_name, file_dict):
    """
    Transfers data from a file_dict to an HDF5 file.

    Parameters
    ----------
    h5file : h5py.File
        HDF5 file object where the data will be written.
    group_name : str
        Name of the HDF5 group where data will be stored.
    file_dict : dict
        Dictionary containing file data to be transferred. Required structure:
        {
            'name': str,
            'attributes_dict': dict,
            'datasets': [
                {
                    'name': str,
                    'data': array-like,
                    'shape': tuple,
                    'attributes': dict (optional)
                },
                ...
            ]
        }

    Returns
    -------
    stdout : str or None
        Status message describing the outcome of the transfer, or None if file_dict is empty.
    """

    if not file_dict:
        return

    try:
        # Create the group and add its attributes
        filename = file_dict['name']
        group = h5file[group_name].create_group(name=filename)
        # Add group attributes                                
        group.attrs.update(file_dict['attributes_dict'])
        
        # Add datasets to the just created group
        for dataset in file_dict['datasets']:
            dataset_obj = group.create_dataset(
                name=dataset['name'], 
                data=dataset['data'],
                shape=dataset['shape']
            )
            
            # Add dataset's attributes                                
            attributes = dataset.get('attributes', {})
            dataset_obj.attrs.update(attributes)
        group.attrs['last_update_date'] = utils.created_at().encode('utf-8')

        stdout = f'Completed transfer for /{group_name}/{filename}'

    except Exception as inst:
        stdout = str(inst)
        logging.error('Failed to transfer data into HDF5: %s', inst)

    return stdout
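
# A minimal, illustrative sketch of the file_dict structure expected by
# __transfer_file_dict_to_hdf5, assuming an already open h5py.File with an existing
# destination group. All names and values below are hypothetical.
#
#   example_file_dict = {
#       'name': 'measurement_001.txt',
#       'attributes_dict': {'instrument': 'example_sensor'},
#       'datasets': [
#           {'name': 'data_table',
#            'data': np.zeros((3,)),
#            'shape': (3,),
#            'attributes': {'units': 'a.u.'}}
#       ]
#   }
#   with h5py.File('example.h5', 'w') as h5file:
#       h5file.create_group('example_group')
#       __transfer_file_dict_to_hdf5(h5file, 'example_group', example_file_dict)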

def __copy_file_in_group(source_file_path, dest_file_obj : h5py.File, dest_group_name, work_with_copy : bool = True):
    # Create a copy of the original file and work with it, to avoid possible corruption of the source file.

    if work_with_copy:
        tmp_file_path = utils.make_file_copy(source_file_path)
    else:
        tmp_file_path = source_file_path

    # Open the (possibly copied) HDF5 file and copy its complete root group into a group of the destination file
    with h5py.File(tmp_file_path,'r') as src_file:
        dest_file_obj.copy(source= src_file['/'], dest= dest_group_name)

    if 'tmp_files' in tmp_file_path:
        os.remove(tmp_file_path)

    stdout = f'Completed transfer for /{dest_group_name}'
    return stdout

def create_hdf5_file_from_filesystem_path(path_to_input_directory: str, 
                                          path_to_filenames_dict: dict = None,
                                          select_dir_keywords: list = None,
                                          root_metadata_dict: dict = None, mode='w'):

    """
    Creates an .h5 file with name "output_filename" that preserves the directory tree (or folder structure)
    of a given filesystem path.

    The data integration capabilities are limited by our file reader, which can only access data from a list of
    admissible file formats. These, however, can be extended. Directories are groups in the resulting HDF5 file.
    Files are formatted as composite objects consisting of a group, file, and attributes.

    Parameters
    ----------
    output_filename : str
        Name of the output HDF5 file.
    path_to_input_directory : str
        Path to root directory, specified with forward slashes, e.g., path/to/root.

    path_to_filenames_dict : dict, optional
        A pre-processed dictionary where keys are directory paths on the input directory's tree and values are lists of files.
        If provided, 'input_file_system_path' is ignored.

    select_dir_keywords : list
        List of string elements to consider or select only directory paths that contain
                                a word in 'select_dir_keywords'. When empty, all directory paths are considered
                                to be included in the HDF5 file group hierarchy.
    root_metadata_dict : dict
        Metadata to include at the root level of the HDF5 file.

    mode : str
        'w' create File, truncate if it exists, or 'r+' read/write, File must exists. By default, mode = "w".

    Returns
    -------
    output_filename : str
        Path to the created HDF5 file.
    """


    select_dir_keywords = select_dir_keywords or []
    root_metadata_dict = root_metadata_dict or {}

    if mode not in ['w', 'r+']:
        raise ValueError('Parameter mode must take values in ["w","r+"]')

    if '/' not in path_to_input_directory:
        raise ValueError('path_to_input_directory needs to be specified using forward slashes "/".')

    #path_to_output_directory = os.path.join(path_to_input_directory,'..')
    path_to_input_directory = os.path.normpath(path_to_input_directory).rstrip(os.sep)    

    
    for i, keyword in enumerate(select_dir_keywords):
        select_dir_keywords[i] = keyword.replace('/',os.sep)          

    if not path_to_filenames_dict:
        # With dry_run=True, this returns the path-to-files dictionary of the input directory without making an actual copy of it.
        # Therefore, there won't be a copying conflict from setting the input and output directories to the same path.
        path_to_filenames_dict = utils.copy_directory_with_contraints(input_dir_path=path_to_input_directory, 
                                                                      output_dir_path=path_to_input_directory,
                                                                      dry_run=True)
    # Use the input directory as the root directory for constructing group names
    root_dir = path_to_input_directory
    path_to_output_file = path_to_input_directory.rstrip(os.path.sep) + '.h5'

    start_message = f'\n[Start] Data integration :\nSource: {path_to_input_directory}\nDestination: {path_to_output_file}\n'
    
    print(start_message)
    logging.info(start_message)

    # Check if the .h5 file already exists
    if os.path.exists(path_to_output_file) and mode in ['w']:
        message = (
            f"[Notice] The file '{path_to_output_file}' already exists and will not be overwritten.\n"
            "If you wish to replace it, please delete the existing file first and rerun the program."
        )
        print(message)
        logging.error(message)
    else:
        with h5py.File(path_to_output_file, mode=mode, track_order=True) as h5file:

            number_of_dirs = len(path_to_filenames_dict.keys())
            dir_number = 1
            for dirpath, filtered_filenames_list in path_to_filenames_dict.items():            
            
                # Check if filtered_filenames_list is nonempty. TODO: This is perhaps redundant by design of path_to_filenames_dict. 
                if not filtered_filenames_list:
                    continue

                group_name = dirpath.replace(os.sep,'/')
                group_name = group_name.replace(root_dir.replace(os.sep,'/') + '/', '/')

                # Flatten the group name to two levels (or to the depth implied by select_dir_keywords)
                if select_dir_keywords:
                    offset = sum([len(i.split(os.sep)) if i in dirpath else 0 for i in select_dir_keywords])
                else:
                    offset = 2
                tmp_list = group_name.split('/')
                if len(tmp_list) > offset+1:
                    group_name = '/'.join([tmp_list[i] for i in range(offset+1)])   

                try:
                    # Create group "group_name". A hierarchy of nested groups is implicitly defined by the forward slashes.
                    if group_name not in h5file.keys():
                        h5file.create_group(group_name)
                        h5file[group_name].attrs['creation_date'] = utils.created_at().encode('utf-8')
                        #h5file[group_name].attrs.create(name='filtered_file_list',data=convert_string_to_bytes(filtered_filename_list))
                        #h5file[group_name].attrs.create(name='file_list',data=convert_string_to_bytes(filenames_list))

                    instFoldermsgStart = f'Starting data transfer from instFolder: {group_name}'
                    print(instFoldermsgStart)

                except Exception as inst:
                    stdout = str(inst)
                    logging.error('Failed to create group %s in HDF5 file: %s', group_name, inst)

                if 'data_lineage_metadata.json' in filtered_filenames_list:
                    idx = filtered_filenames_list.index('data_lineage_metadata.json')
                    data_lineage_file = filtered_filenames_list[idx]
                    try:
                        with open(os.path.join(dirpath, data_lineage_file), 'r') as dlf:
                            data_lineage_dict = json.load(dlf)
                        filtered_filenames_list.pop(idx)
                    except json.JSONDecodeError:
                        data_lineage_dict = {}  # Start fresh if the file is invalid
                else:
                    data_lineage_dict = {}
                   

                for filenumber, filename in enumerate(filtered_filenames_list):
                    
                    # hdf5 path to filename group 
                    dest_group_name = f'{group_name}/{filename}'

                    if 'h5' not in filename:
                        #file_dict = config_file.select_file_readers(group_id)[file_ext](os.path.join(dirpath,filename))
                        #file_dict = ext_to_reader_dict[file_ext](os.path.join(dirpath,filename))
                        file_dict = filereader_registry.select_file_reader(dest_group_name)(os.path.join(dirpath,filename))
                        # Check whether an available file reader produced a valid file_dict
                        if isinstance(file_dict, dict) and 'attributes_dict' in file_dict:
                            file_dict['attributes_dict'].update(data_lineage_dict.get(filename, {}))

                        stdout = __transfer_file_dict_to_hdf5(h5file, group_name, file_dict)
                        
                    else:
                        source_file_path = os.path.join(dirpath,filename)
                        dest_file_obj = h5file
                        #group_name +'/'+filename
                        #ext_to_reader_dict[file_ext](source_file_path, dest_file_obj, dest_group_name)
                        #g5505f_reader.select_file_reader(dest_group_name)(source_file_path, dest_file_obj, dest_group_name)
                        stdout = __copy_file_in_group(source_file_path, dest_file_obj, dest_group_name, False)

                # Update the progress bar and print and log the end message
                instFoldermsgEnd = f'\nCompleted data transfer for instFolder: {group_name}\n'
                utils.progressBar(dir_number, number_of_dirs, instFoldermsgEnd)
                logging.info(instFoldermsgEnd)
                dir_number += 1

            print('[End] Data integration')
            logging.info('[End] Data integration')
        
            if root_metadata_dict:
                for key, value in root_metadata_dict.items():
                    #if key in h5file.attrs:
                    #    del h5file.attrs[key]
                    h5file.attrs.create(key, value)
                #annotate_root_dir(output_filename,root_metadata_dict)  

    
    #output_yml_filename_path = hdf5_vis.take_yml_snapshot_of_hdf5_file(output_filename)

    return path_to_output_file #, output_yml_filename_path
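
# A minimal usage sketch of create_hdf5_file_from_filesystem_path, assuming a local
# directory 'data/project_x' whose files are supported by filereader_registry.
# The path, keywords, and metadata below are hypothetical.
#
#   output_path = create_hdf5_file_from_filesystem_path(
#       path_to_input_directory='data/project_x',
#       select_dir_keywords=['experiments'],
#       root_metadata_dict={'project': 'project_x'},
#       mode='w'
#   )
#   # Produces 'data/project_x.h5', mirroring the directory tree as HDF5 groups.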

def create_hdf5_file_from_dataframe(ofilename, input_data, group_by_funcs: list, approach: str = None, extract_attrs_func=None):
    """
    Creates an HDF5 file with hierarchical groups based on the specified grouping functions or columns.

    Parameters:
    -----------
        ofilename (str): Path for the output HDF5 file.
        input_data (pd.DataFrame or str): Input data as a DataFrame or a valid file system path.
        group_by_funcs (list): List of callables or column names to define hierarchical grouping.
        approach (str): Specifies the approach ('top-down' or 'bottom-up') for creating the HDF5 file (currently not used).
        extract_attrs_func (callable, optional): Function to extract additional attributes for HDF5 groups.

    Returns:
    --------
        ofilename (str): Path to the created HDF5 file.
    """
    # Check whether input_data is a valid file-system path or a DataFrame
    is_valid_path = lambda x: os.path.exists(x) if isinstance(x, str) else False

    if is_valid_path(input_data):
        # If input_data is a file-system path, create a DataFrame with file info
        file_list = os.listdir(input_data)
        df = pd.DataFrame(file_list, columns=['filename'])
        df = utils.augment_with_filetype(df)  # Add filetype information if needed
    elif isinstance(input_data, pd.DataFrame):
        # If input_data is a DataFrame, make a copy
        df = input_data.copy()
    else:
        raise ValueError("input_data must be either a valid file-system path or a DataFrame.")

    # Generate grouping columns based on group_by_funcs
    if utils.is_callable_list(group_by_funcs):
        grouping_cols = []
        for i, func in enumerate(group_by_funcs):
            col_name = f'level_{i}_groups'
            grouping_cols.append(col_name)
            df[col_name] = func(df)
    elif utils.is_str_list(group_by_funcs) and all([item in df.columns for item in group_by_funcs]):
        grouping_cols = group_by_funcs
    else:
        raise ValueError("'group_by_funcs' must be a list of callables or valid column names in the DataFrame.")

    # Generate group paths
    df['group_path'] = ['/' + '/'.join(row) for row in df[grouping_cols].values.astype(str)]

    # Open the HDF5 file in write mode
    with h5py.File(ofilename, 'w') as file:
        for group_path in df['group_path'].unique():
            # Create groups in HDF5
            group = file.create_group(group_path)

            # Filter the DataFrame for the current group
            datatable = df[df['group_path'] == group_path].copy()

            # Drop grouping columns and the generated 'group_path'
            datatable = datatable.drop(columns=grouping_cols + ['group_path'])

            # Add datasets to groups if data exists
            if not datatable.empty:
                dataset = utils.convert_dataframe_to_np_structured_array(datatable)
                group.create_dataset(name='data_table', data=dataset)

            # Add attributes if extract_attrs_func is provided
            if extract_attrs_func:
                attrs = extract_attrs_func(datatable)
                for key, value in attrs.items():
                    group.attrs[key] = value

        # Save metadata about depth of hierarchy
        file.attrs.create(name='depth', data=len(grouping_cols) - 1)

    print(f"HDF5 file created successfully at {ofilename}")

    return ofilename
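
# A minimal usage sketch of create_hdf5_file_from_dataframe, assuming a DataFrame whose
# 'campaign' and 'station' columns are used directly as grouping columns. The column
# names and output file name are hypothetical.
#
#   df = pd.DataFrame({'campaign': ['c1', 'c1'],
#                      'station': ['s1', 's2'],
#                      'value': [0.1, 0.2]})
#   create_hdf5_file_from_dataframe('grouped_output.h5', df,
#                                   group_by_funcs=['campaign', 'station'])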


def save_processed_dataframe_to_hdf5(df, annotator, output_filename): # src_hdf5_path, script_date, script_name):
    """
    Save processed dataframe columns with annotations to an HDF5 file.

    Parameters:
        df (pd.DataFrame): DataFrame containing processed time series.
        annotator: Annotator object providing a get_metadata() method.
        output_filename (str): Path to the destination HDF5 file.
    """
    # Convert datetime columns to string
    datetime_cols = df.select_dtypes(include=['datetime64']).columns

    if list(datetime_cols):
        df[datetime_cols] = df[datetime_cols].map(str)

    # Convert dataframe to structured array
    icad_data_table = utils.convert_dataframe_to_np_structured_array(df)

    # Get metadata
    metadata_dict = annotator.get_metadata()

    # Prepare project level attributes to be added at the root level

    project_level_attributes = metadata_dict['metadata']['project']
    
    # Prepare high-level attributes
    high_level_attributes = {
        'parent_files': metadata_dict['parent_files'],
        **metadata_dict['metadata']['sample'],
        **metadata_dict['metadata']['environment'],
        **metadata_dict['metadata']['instruments']
    }

    # Prepare data level attributes
    data_level_attributes = metadata_dict['metadata']['datasets']

    for key, value in data_level_attributes.items():
        if isinstance(value,dict):
            data_level_attributes[key] = utils.convert_attrdict_to_np_structured_array(value)


    # Prepare file dictionary
    file_dict = {
        'name': project_level_attributes['processing_file'],
        'attributes_dict': high_level_attributes,
        'datasets': [{
            'name': "data_table",
            'data': icad_data_table,
            'shape': icad_data_table.shape,
            'attributes': data_level_attributes
        }]
    }

    # Check if the file exists
    if os.path.exists(output_filename):
        mode = "a"
        print(f"File {output_filename} exists. Opening in append mode.")        
    else:
        mode = "w"
        print(f"File {output_filename} does not exist. Creating a new file.")


    # Write to HDF5
    with h5py.File(output_filename, mode) as h5file:
        # Add project level attributes at the root/top level
        h5file.attrs.update(project_level_attributes)
        __transfer_file_dict_to_hdf5(h5file, '/', file_dict)

#if __name__ == '__main__':
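#
#     # A minimal usage sketch of save_processed_dataframe_to_hdf5 (illustrative only),
#     # assuming an annotator object whose get_metadata() returns the nested metadata
#     # dictionary expected above; the DataFrame, annotator, and file name are hypothetical.
#     processed_df = pd.DataFrame({'datetime': pd.to_datetime(['2024-01-01']),
#                                  'value': [1.0]})
#     save_processed_dataframe_to_hdf5(processed_df, annotator, 'processed_output.h5')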