# Source code for tables_io.io_utils.read

"""IO Read Functions for tables_io"""

import os
import json
from collections import OrderedDict

import numpy as np
from typing import List, Mapping, Optional, Union

from ..utils.array_utils import force_to_pandables
from ..conv.conv_tabledict import convert
from ..conv.conv_table import dataframe_to_dict, hdf5_group_to_dict
from ..lazy_modules import apTable, fits, h5py, pa, pd, pq
from ..types import (
    ASTROPY_FITS,
    ASTROPY_HDF5,
    FILE_FORMAT_SUFFIX_MAP,
    FILE_FORMATS,
    JSON,
    JSON_STRING,
    NATIVE_FORMAT,
    NATIVE_TABLE_TYPE,
    NUMPY_FITS,
    NUMPY_HDF5,
    PANDAS_HDF5,
    PANDAS_PARQUET,
    PANDAS_CSV,
    PYARROW_HDF5,
    PYARROW_PARQUET,
    file_type,
)


def _force_to_slice(
    val: slice | int | None,
    check_step_for: str="",
) -> slice | None:
    if val is None:
        return None
    if isinstance(val, int):
        return slice(val, val+1)
    if not check_step_for and val.step is not None:
        raise ValueError(f"Function {check_step_for} does not allow step {val}")
    return val
    

# I. Top-level interface functions


def read(
    filepath: str,
    tType: Union[int, str, None] = None,
    fmt: Optional[str] = None,
    keys: Optional[List[str]] = None,
    allow_missing_keys: bool = False,
    slice_dict: dict[str, slice | int] | None = None,
    **kwargs,
):
    """Read a file into a `Table-like` object or a `TableDict-like` object.

    The file is loaded with :py:func:`read_native`, which yields an
    `OrderedDict` of `Table-like` objects in the file's default tabular
    format. If that mapping holds exactly one table stored under one of the
    special keys ``""``, ``None``, ``"__astropy_table__"`` or ``"data"``, the
    bare table is used instead of the mapping. When `tType` is given, the
    result is then converted to that tabular type.

    The `Table-like` objects currently supported are `astropyTable`,
    `numpyRecarray`, `numpyDict` (dict of `numpy` arrays), `pandasDataFrame`
    and `pyarrowTable`.

    Accepted tabular types:

    ==================  ===============
    Format string       Format integer
    ==================  ===============
    "astropyTable"      0
    "numpyDict"         1
    "numpyRecarray"     2
    "pandasDataFrame"   3
    "pyarrowTable"      4
    "jsonString"        5
    ==================  ===============

    Parameters
    ----------
    filepath : `str`
        Full path to the file to load.
    tType : `int`, `str` or `None`
        Table type; if `None` the data is returned in the file's default type.
    fmt : `str` or `None`
        File format; if `None` it is taken from the file extension.
    keys : `list` or `None`
        Required when reading multiple associated parquet files: the unique
        identifiers of the datasets or files to read.
    allow_missing_keys : `bool`, by default False
        If False, a FileNotFoundError is raised when a key is missing.
    slice_dict : `dict[str, slice | int]` or `None`
        If provided, specifies which slices to read from which tables.
    kwargs :
        Additional arguments passed to the native file reader.

    Returns
    -------
    data : `Table-like` or `OrderedDict` ( `str` -> `Table-like` )
        The data.

    Example
    -------
    >>> import tables_io
    >>> df = tables_io.read('filename.h5')
    >>> table_dict = tables_io.read('filename.hdf5', tType='astropyTable')
    """
    tables = read_native(
        filepath, fmt, keys, allow_missing_keys, slice_dict, **kwargs
    )

    # A lone table stored under a special key is returned without the
    # enclosing dictionary.
    if len(tables) == 1:
        only_key = next(iter(tables))
        if only_key in ("", None, "__astropy_table__", "data"):
            tables = tables[only_key]

    if tType is None:  # pragma: no cover
        return tables
    return convert(tables, tType)
def read_native(
    filepath: str,
    fmt: Optional[str] = None,
    keys: Optional[List[str]] = None,
    allow_missing_keys: bool = False,
    slice_dict: dict[str, slice | int] | None = None,
    **kwargs,
):
    """Read a file into the default tabular format for its file type.

    The file type comes from `fmt` or, failing that, from the suffix of
    `filepath`, and selects which reader is used. The result is always an
    `OrderedDict` (`TableDict-like`) with `str` keys and `Table-like` values
    (`astropyTable`, `numpyRecarray`, `numpyDict`, `pandasDataFrame` or
    `pyarrowTable`).

    Parameters
    ----------
    filepath : `str`
        Full path of the file to load.
    fmt : `str` or `None`
        File format; if `None` it is taken from the file extension.
    keys : `list` or `None`
        Required when reading multiple associated parquet files: the unique
        identifiers of the datasets or files to read.
    allow_missing_keys : `bool`, by default False
        If False, a FileNotFoundError is raised when a key is missing. Only
        forwarded to the parquet, csv and json readers.
    slice_dict : `dict[str, slice | int]` or `None`
        If provided, specifies which slices to read from which tables.
    kwargs :
        Additional arguments, forwarded to the parquet, csv and json readers
        only.

    Returns
    -------
    data : `OrderedDict` ( `str` -> `Table-like` )
        The data.

    Raises
    ------
    RuntimeError
        If the selected reader raises; the original error is chained.
    TypeError
        If the file type is not supported.

    Example
    -------
    >>> import tables_io
    >>> tab = tables_io.read_native('filename.hdf5')
    """
    fType = file_type(filepath, fmt)

    # Ordered (file type, reader call) pairs. Each call is deferred behind a
    # lambda so that only the reader matching `fType` is ever invoked.
    readers = (
        (
            ASTROPY_FITS,
            lambda: read_fits_to_ap_tables(
                filepath, keys=keys, slice_dict=slice_dict
            ),
        ),
        (
            ASTROPY_HDF5,
            lambda: read_HDF5_to_ap_tables(
                filepath, keys=keys, slice_dict=slice_dict
            ),
        ),
        (
            NUMPY_HDF5,
            lambda: read_HDF5_to_dicts(filepath, keys=keys, slice_dict=slice_dict),
        ),
        (
            NUMPY_FITS,
            lambda: read_fits_to_recarrays(
                filepath, keys=keys, slice_dict=slice_dict
            ),
        ),
        (
            PANDAS_HDF5,
            lambda: read_H5_to_dataframes(
                filepath, keys=keys, slice_dict=slice_dict
            ),
        ),
        (
            PANDAS_PARQUET,
            lambda: read_pq_to_dataframes(
                filepath, keys, allow_missing_keys, slice_dict=slice_dict, **kwargs
            ),
        ),
        (
            PYARROW_HDF5,
            lambda: read_HDF5_to_tables(filepath, keys, slice_dict=slice_dict),
        ),
        (
            PYARROW_PARQUET,
            lambda: read_pq_to_tables(
                filepath, keys, allow_missing_keys, slice_dict=slice_dict, **kwargs
            ),
        ),
        (
            PANDAS_CSV,
            lambda: read_csv_to_dataframes(
                filepath, keys, allow_missing_keys, slice_dict=slice_dict, **kwargs
            ),
        ),
        (
            JSON,
            lambda: read_json(
                filepath, keys, allow_missing_keys, slice_dict=slice_dict, **kwargs
            ),
        ),
    )

    for known_type, load in readers:
        if fType == known_type:
            try:
                return load()
            except Exception as e:
                # Re-raise with a description of the attempted read, keeping
                # the underlying error chained for debugging.
                raise RuntimeError(
                    read_native_error_message(
                        filepath, fType, fmt, keys, allow_missing_keys, **kwargs
                    )
                    + f" \n because of error: \n {e}"
                ) from e

    raise TypeError(
        f"Unsupported FileType {fType}. Supported types are: {list(FILE_FORMATS.values())}"
    )  # pragma: no cover
def io_open(filepath: str, fmt: Optional[str] = None, **kwargs):
    """Open a file and return the file object without loading its contents.

    The opener is chosen from the file type, which is given by `fmt` or
    otherwise determined from the suffix of `filepath`:

    * FITS files: `astropy.io.fits.open()`
    * HDF5 files: `h5py.File()`
    * parquet files: `pyarrow.parquet.ParquetFile()`
    * csv files: `pandas.read_csv()`, with ``iterator=True`` unless the caller
      passed an explicit ``iterator`` argument

    Parameters
    ----------
    filepath : `str`
        The path to the file to open.
    fmt : `str` or `None`
        The file format; if `None` it is taken from the file extension.
    kwargs :
        Additional arguments passed to the opener.

    Returns
    -------
    The object returned by the opener listed above.

    Raises
    ------
    NotImplementedError
        For json files.
    TypeError
        If the file type is not supported.

    Example
    -------
    >>> import tables_io
    >>> hdul = tables_io.io_open("./data/test.fits", "fits")
    >>> hdul.info()
    """
    fType = file_type(filepath, fmt)

    if fType in (ASTROPY_FITS, NUMPY_FITS):
        return fits.open(filepath, **kwargs)

    if fType in (ASTROPY_HDF5, NUMPY_HDF5, PANDAS_HDF5, PYARROW_HDF5):
        return h5py.File(filepath, **kwargs)

    if fType in (PYARROW_PARQUET, PANDAS_PARQUET):
        return pq.ParquetFile(filepath, **kwargs)

    if fType in (PANDAS_CSV,):
        # Default to an iterator so the csv is not read eagerly.
        kwargs.setdefault("iterator", True)
        return pd.read_csv(filepath, **kwargs)

    if fType in (JSON,):
        raise NotImplementedError("Can not use io_open on json files")

    raise TypeError(
        f"Unsupported FileType {fType}. Supported types are: {list(FILE_FORMATS.values())}"
    )  # pragma: no cover
def check_columns(
    filepath: str,
    columns_to_check: List[str],
    fmt: Optional[str] = None,
    parent_groupname: Optional[str] = None,
    **kwargs,
):
    """Check that a file contains at least the given columns.

    * For FITS files, the columns of all extensions after the primary HDU are
      pooled and checked together.
    * For HDF5 files, only the names one level below `parent_groupname` (or at
      the top level when it is `None`) are checked.
    * For parquet files, the schema names are checked.
    * For csv files, the header of the file is checked.

    Extra columns in the file are allowed.

    Parameters
    ----------
    filepath : `str`
        Path of the file to inspect.
    columns_to_check : `list`
        Column names that must be present.
    fmt : `str` or `None`
        The file format; if `None` it is taken from the file extension.
    parent_groupname : `str` or `None`
        For hdf5 files, the group whose members are the columns.
    kwargs :
        Additional arguments passed to `io_open`.

    Raises
    ------
    KeyError
        If any requested column is missing.
    TypeError
        If the file type is not supported.
    """
    fType = file_type(filepath, fmt)

    # Open with the caller's format so the file is opened as the same type it
    # was classified as above, even when its suffix does not identify it.
    file = io_open(filepath, fmt=fmt, **kwargs)
    try:
        if fType in [ASTROPY_FITS, NUMPY_FITS]:
            col_list = []
            for hdu in file[1:]:
                for col in hdu.columns:
                    if col.name not in col_list:
                        col_list.append(col.name)
        elif fType in [ASTROPY_HDF5, NUMPY_HDF5, PANDAS_HDF5, PYARROW_HDF5]:
            col_list = read_HDF5_group_names(
                filepath, parent_groupname=parent_groupname
            )
        elif fType in [PYARROW_PARQUET, PANDAS_PARQUET]:
            col_list = file.schema.names
        elif fType in [PANDAS_CSV]:
            data = file.read(nrows=1)
            col_list = data.columns.tolist()
        else:
            raise TypeError(
                f"Unsupported FileType {fType}. Supported types are: {list(FILE_FORMATS.values())}"
            )  # pragma: no cover
    finally:
        # Release the handle whatever happened; not every opener is guaranteed
        # to return an object with a close() method.
        close = getattr(file, "close", None)
        if callable(close):
            close()

    # Compare as sets so a name repeated in `columns_to_check` is not
    # mistaken for a missing column.
    missing = set(columns_to_check).difference(col_list)
    if missing:
        raise KeyError("The following columns are not found: ", missing)
# II. Reading Files

# II A. Reading `astropy.table.Table` from FITS files
def read_fits_to_ap_tables(
    filepath: str,
    keys: Optional[List[str]] = None,
    slice_dict: dict[str, slice | int] | None = None,
) -> Mapping:
    """Read the tables of a FITS file as `astropy.table.Table` objects.

    Every HDU after the primary one is read, or only those whose lower-cased
    name is in `keys`. Tables are keyed by lower-cased HDU name. An unnamed
    HDU in a file with more than one table is keyed by its extension number,
    and a repeated name gets ``_<extension number>`` appended.

    Parameters
    ----------
    filepath : `str`
        Path to input file.
    keys : `list` or `None`
        Names of the tables to read, in lower case. `None` reads them all.
    slice_dict : `dict[str, slice | int]` or `None`
        If provided, maps lower-cased HDU names to the rows to keep. An
        integer keeps that single row.

    Returns
    -------
    tables : `OrderedDict` of `astropy.table.Table`
        Keys are HDU names, values are tables.
    """
    tables = OrderedDict()

    # This handle is only used to enumerate the HDUs; each table is loaded by
    # `Table.read`, so the handle is released when the loop ends.
    with fits.open(filepath) as fin:
        n_hdus = len(fin)
        for ext_num, hdu in enumerate(fin[1:], start=1):
            hdu_name = hdu.name.lower()
            if keys is not None and hdu_name not in keys:
                continue

            the_slice = slice_dict.get(hdu_name) if slice_dict is not None else None
            if isinstance(the_slice, int):
                # Select the row as a length-one slice so the result is still
                # a table (an open stop keeps -1 meaning "last row").
                the_slice = slice(the_slice, the_slice + 1 or None)

            # Unnamed HDUs in a multi-table file are keyed by extension number.
            tab_name = hdu_name
            if tab_name == "" and n_hdus > 2:
                tab_name = str(ext_num)
            # Disambiguate repeated names.
            if tab_name in tables:
                tab_name = f"{tab_name}_{ext_num}"

            # FIXME, improve this to actually only read slice
            table = apTable.Table.read(filepath, hdu=ext_num)
            tables[tab_name] = table if the_slice is None else table[the_slice]
    return tables
# II B Reading `np.recarray` from FITS files
def read_fits_to_recarrays(
    filepath: str,
    keys: Optional[List[str]] = None,
    slice_dict: dict[str, slice | int] | None = None,
) -> Mapping:
    """Read the tables of a FITS file as `np.recarray` objects.

    Every HDU after the primary one is read, or only those whose lower-cased
    name is in `keys`. Tables are keyed by lower-cased HDU name. An unnamed
    HDU in a file with more than one table is keyed by its extension number,
    and a repeated name gets ``_<extension number>`` appended.

    Parameters
    ----------
    filepath : `str`
        Path to input file.
    keys : `list` or `None`
        HDU names to read, in lower case. `None` reads them all.
    slice_dict : `dict[str, slice | int]` or `None`
        If provided, maps lower-cased HDU names to the rows to keep. An
        integer keeps that single row.

    Returns
    -------
    tables : `OrderedDict` of `np.recarray`
        Keys are HDU names, values are tables.
    """
    tables = OrderedDict()

    # `hdu.data` is accessed while the file is open, so the arrays are loaded
    # (or mapped) before the handle is released.
    # NOTE(review): relies on astropy keeping already-accessed data usable
    # after HDUList.close() -- confirm for the supported astropy versions.
    with fits.open(filepath) as fin:
        n_hdus = len(fin)
        for ext_num, hdu in enumerate(fin[1:], start=1):
            hdu_name = hdu.name.lower()
            if keys is not None and hdu_name not in keys:
                continue

            the_slice = slice_dict.get(hdu_name) if slice_dict is not None else None
            if isinstance(the_slice, int):
                # Select the row as a length-one slice so the result is still
                # a table (an open stop keeps -1 meaning "last row").
                the_slice = slice(the_slice, the_slice + 1 or None)

            # Unnamed HDUs in a multi-table file are keyed by extension number.
            tab_name = hdu_name
            if tab_name == "" and n_hdus > 2:
                tab_name = str(ext_num)
            # Disambiguate repeated names.
            if tab_name in tables:
                tab_name = f"{tab_name}_{ext_num}"

            # Compare against None: a selection of row 0 is falsy but valid.
            if the_slice is not None:
                tables[tab_name] = hdu.data[the_slice]
            else:
                tables[tab_name] = hdu.data
    return tables
# II C Reading `astropy.table.Table` from HDF5 file
def read_HDF5_to_ap_tables(
    filepath: str,
    keys: Optional[List[str]] = None,
    slice_dict: dict[str, slice | int] | None = None,
) -> Mapping:
    """Read the top-level entries of an hdf5 file as `astropy.table.Table`.

    Parameters
    ----------
    filepath : `str`
        Path to input file.
    keys : `list` or `None`
        Names of the datasets to read. `None` reads them all.
    slice_dict : `dict[str, slice | int]` or `None`
        If provided, maps dataset names to the rows to keep. An integer keeps
        that single row. Slices with a step are not supported.

    Returns
    -------
    tables : `OrderedDict` of `astropy.table.Table`
        Keys are the hdf5 paths, values are tables.

    Raises
    ------
    ValueError
        If a requested slice has a step.
    """
    # h5py is only needed to list the top-level names; open read-only and
    # release the handle before astropy re-opens the file for each table.
    with h5py.File(filepath, "r") as fin:
        names = list(fin.keys())

    tables = OrderedDict()
    for k in names:
        if keys is not None and k not in keys:
            continue

        the_slice = slice_dict.get(k) if slice_dict is not None else None
        if isinstance(the_slice, int):
            # Select the row as a length-one slice so the result is still a
            # table (an open stop keeps -1 meaning "last row").
            the_slice = slice(the_slice, the_slice + 1 or None)

        if the_slice is None:
            tables[k] = apTable.Table.read(filepath, path=k, format="hdf5")
            continue

        if the_slice.step:
            raise ValueError(
                f"Can not use step when reading astropy tables {the_slice}"
            )
        tables[k] = apTable.Table.read(filepath, path=k, format="hdf5")[the_slice]
    return tables
## II D. Reading `OrderedDict` (`str`, `numpy.array`) and `np.array` from HDF5 file
def read_HDF5_group(
    filepath: str,
    groupname: Optional[str] = None,
    read_slice: slice | int | None = None,
):
    """Open an hdf5 file and return the requested entry plus the file object.

    The file is opened read-only and is left open on return so that the
    caller can keep using the returned entry; the caller is responsible for
    closing it.

    Parameters
    ----------
    filepath : `str`
        File in question.
    groupname : `str` or `None`
        The name or path of the desired entry. If empty or `None`, the file
        object itself is returned in its place.
    read_slice : `slice` or `int` or `None`
        If given, the entry is indexed with this selection before being
        returned.

    Returns
    -------
    grp :
        ``infp[groupname]``, indexed by `read_slice` when one is given, or
        the file object when no `groupname` is given.
    infp : `h5py.File`
        The input file (returned so that the user can explicitly close it).
    """
    infp = h5py.File(filepath, "r")
    if not groupname:  # pragma: no cover
        return infp, infp

    try:
        read_slice = _force_to_slice(read_slice)
        grp = infp[groupname]
        if read_slice is not None:
            grp = grp[read_slice]
    except Exception:
        # The caller never receives the handle on failure, so close it here.
        infp.close()
        raise
    return grp, infp
def read_HDF5_group_to_dict(hg, start: Optional[int] = None, end: Optional[int] = None):
    """Read `numpy.array` objects from an open hdf5 object.

    A dataset is read directly into an array. Anything else is treated as a
    group: each of its items is read into an array and the arrays are
    collected in an `OrderedDict` under the item names. `start` and `end` are
    passed on to `read_HDF5_dataset_to_array` for every dataset read.

    Parameters
    ----------
    hg : `hdf5` object
        The hdf5 object to read, either a dataset or a group.
    start : `int` or `None`
        Starting row of the dataset(s) to read.
    end : `int` or `None`
        Ending row of the dataset(s) to read.

    Returns
    -------
    tables : `numpy.array` or `OrderedDict` of `numpy.array`
        A single array for a dataset, otherwise arrays keyed by item name.
    """
    if isinstance(hg, h5py.Dataset):
        return read_HDF5_dataset_to_array(hg, start, end)

    arrays = OrderedDict()
    for name, dataset in hg.items():
        arrays[name] = read_HDF5_dataset_to_array(dataset, start, end)
    return arrays
def read_HDF5_group_names(
    filepath: str, parent_groupname: Optional[str] = None
) -> List[str]:
    """Return the names found at one level of an hdf5 file.

    Parameters
    ----------
    filepath : `str`
        File in question.
    parent_groupname : `str` or `None`
        The group whose member names are returned. If `None`, the top-level
        names of the file are returned.

    Returns
    -------
    names : `list` of `str`
        The names at the requested level.

    Raises
    ------
    KeyError
        If `parent_groupname` is not found in the file.
    """
    # The names are copied into a list before the file is closed.
    with h5py.File(filepath, "r") as infp:
        if parent_groupname is None:  # pragma: no cover
            return list(infp.keys())

        try:
            subgroups = infp[parent_groupname].keys()
        except KeyError as msg:
            raise KeyError(
                f"Group {parent_groupname} not found in file {filepath}"
            ) from msg
        return list(subgroups)
def read_HDF5_to_dicts(
    filepath: str,
    keys: Optional[List[str]] = None,
    slice_dict: dict[str, slice | int] | None = None,
) -> Mapping:
    """Read the top-level entries of an hdf5 file into `numpy.array` objects.

    Each top-level entry is read with `read_HDF5_group_to_dict`. If `keys` is
    given, only those entries are read.

    Parameters
    ----------
    filepath : `str`
        Path to input file.
    keys : `list` or `None`
        Names of the tables to read from the file. `None` reads them all.
    slice_dict : `dict[str, slice | int]` or `None`
        If provided, maps table names to the rows to read. An integer reads
        that single row. Slices with a step are not supported.

    Returns
    -------
    dicts : `OrderedDict`, (`str`, `OrderedDict`, (`str`, `numpy.array`) )
        The data.

    Raises
    ------
    ValueError
        If a requested slice has a step.
    """
    l_out = []
    # Everything is copied into numpy arrays inside the block, so the file
    # can be closed (read-only handle) as soon as the loop is done.
    with h5py.File(filepath, "r") as fin:
        for key, val in fin.items():
            if keys is not None and key not in keys:
                continue

            the_slice = slice_dict.get(key) if slice_dict is not None else None
            if isinstance(the_slice, int):
                # A single row is read as the length-one range starting there.
                the_slice = slice(the_slice, the_slice + 1 or None)

            if the_slice is None:
                l_out.append((key, read_HDF5_group_to_dict(val)))
                continue

            if the_slice.step:
                raise ValueError(
                    f"Can not use step with read_HDF5_group_to_dict {the_slice}"
                )
            l_out.append(
                (
                    key,
                    read_HDF5_group_to_dict(
                        val, start=the_slice.start, end=the_slice.stop
                    ),
                )
            )
    return OrderedDict(l_out)
def read_HDF5_dataset_to_array(
    dataset, start: Optional[int] = None, end: Optional[int] = None
) -> np.ndarray:
    """Read all or part of an hdf5 dataset into a `numpy.array`.

    Parameters
    ----------
    dataset : `h5py.Dataset`
        The input dataset.
    start : `int` or `None`
        Starting row; `None` means from the first row.
    end : `int` or `None`
        Ending row (exclusive); `None` means through the last row.

    Returns
    -------
    out : `numpy.array`
        The rows ``[start:end]`` of the dataset, or the whole dataset when
        neither bound is given.
    """
    if start is None and end is None:
        return np.array(dataset)
    # A single missing bound is an open end, as in ordinary slicing, rather
    # than a reason to ignore the other bound.
    return np.array(dataset[start:end])
# II D. Reading `pandas.DataFrame` from HDF5
def read_H5_to_dataframe(
    filepath: str,
    key: Optional[str] = None,
    read_slice: slice | int | None = None,
):
    """Read a `pandas.DataFrame` from an 'h5' file (a pandas `hdf5` file).

    Parameters
    ----------
    filepath : `str`
        Path to input file.
    key : `str` or `None`
        The key in the hdf5 file.
    read_slice : `slice` or `int` or `None`
        Rows to read; its start and stop are passed to `pandas.read_hdf`.

    Returns
    -------
    df : `pandas.DataFrame`
        The dataframe.
    """
    row_range = _force_to_slice(read_slice, "read_H5_to_dataframe")
    if row_range is None:
        return pd.read_hdf(filepath, key)
    return pd.read_hdf(filepath, key, start=row_range.start, stop=row_range.stop)
def read_H5_to_dataframes(
    filepath: str,
    keys: Optional[List[str]] = None,
    slice_dict: dict[str, slice | int] | None = None,
) -> Mapping:
    """Read the tables of an `h5` (pandas `hdf5`) file as `pandas.DataFrame`.

    Parameters
    ----------
    filepath : `str`
        Path to input file.
    keys : `list` or `None`
        Names of the tables to read. `None` reads them all.
    slice_dict : `dict[str, slice | int]` or `None`
        If provided, specifies which slices to read from which tables.

    Returns
    -------
    tab : `OrderedDict` (`str` : `pandas.DataFrame`)
        The data.

    Notes
    -----
    The file suffix 'h5' denotes 'hdf5' files written from DataFrames using
    `pandas`. They have a different structure than 'hdf5' files written with
    `h5py` or `astropy.table`.
    """
    # h5py is only used to list the top-level keys; close the handle before
    # pandas opens the same file for each table.
    with h5py.File(filepath, "r") as fin:
        names = list(fin.keys())

    l_out = []
    for key in names:
        if keys is not None and key not in keys:
            continue
        the_slice = slice_dict.get(key) if slice_dict is not None else None
        l_out.append(
            (key, read_H5_to_dataframe(filepath, key=key, read_slice=the_slice))
        )
    return OrderedDict(l_out)
# II E Reading `pandas.DataFrame` from parquet file
def read_pq_to_dataframe(
    filepath: str,
    columns: Optional[List[str]] = None,
    read_slice: slice | int | None = None,
    **kwargs,
):
    """Read a `pandas.DataFrame` from a parquet file.

    Parameters
    ----------
    filepath : `str`
        Path to input file.
    columns : `list` (`str`) or `None`
        Names of the columns to read; `None` reads all the columns.
    read_slice : `slice` or `int` or `None`
        Rows to read, applied as filters on the ``id`` column. A bound that
        is `None` is left open.
    **kwargs :
        Additional arguments passed to `pandas.read_parquet`.

    Returns
    -------
    df : `pandas.DataFrame`
        The data frame.
    """
    read_slice = _force_to_slice(read_slice, "read_pq_to_dataframe")

    filters = []
    if read_slice is not None:
        # NOTE(review): rows are selected by the value of an "id" column and
        # the upper bound is inclusive, unlike ordinary slice semantics --
        # confirm this is intended.
        # Only bounds that are actually set become filters; comparing the
        # column against None is not a meaningful range test.
        if read_slice.start is not None:
            filters.append(("id", ">=", read_slice.start))
        if read_slice.stop is not None:
            filters.append(("id", "<=", read_slice.stop))

    if filters:
        return pd.read_parquet(
            filepath, engine="pyarrow", columns=columns, filters=filters, **kwargs
        )
    return pd.read_parquet(filepath, engine="pyarrow", columns=columns, **kwargs)
def read_pq_to_dataframes(
    filepath: str,
    keys: Optional[List[str]] = None,
    allow_missing_keys: bool = False,
    columns: Union[List[str], Mapping, None] = None,
    slice_dict: dict[str, slice | int] | None = None,
    **kwargs,
) -> Mapping:
    """Read `pandas.DataFrame` objects from a set of parquet files.

    One file is read per key; its path is `filepath` with the key inserted
    between the base name and the extension.

    Parameters
    ----------
    filepath : `str`
        Path to input file.
    keys : `list`
        Keys for the input objects. Used to complete filepaths.
    allow_missing_keys : `bool`
        If False, a FileNotFoundError is raised when the file for a key is
        missing; if True, that key is skipped.
    columns : `dict` of `list (str)`, `list` (`str`), or `None`
        Names of the columns to read.

        - if a dictionary, keys are the `keys` and values are lists of column
          names; for each keyed table only those columns are loaded, and if
          the key is not found all columns are loaded.
        - if a list, only the columns in the list are loaded.
        - `None` reads all the columns.
    slice_dict : `dict[str, slice | int]` or `None`
        If provided, specifies which slices to read from which tables.
    **kwargs :
        Additional arguments passed to the native file reader.

    Returns
    -------
    tables : `OrderedDict` of `pandas.DataFrame`
        Keys are taken from `keys`.
    """
    if keys is None:  # pragma: no cover
        keys = [""]

    dataframes = OrderedDict()
    basepath, ext = os.path.splitext(filepath)
    if not ext:  # pragma: no cover
        ext = "." + FILE_FORMAT_SUFFIX_MAP[PANDAS_PARQUET]

    # Dict-like must be tested first: a dict is also list-like.
    columns_per_key = pd.api.types.is_dict_like(columns)
    columns_shared = not columns_per_key and pd.api.types.is_list_like(columns)

    for key in keys:
        the_slice = slice_dict.get(key) if slice_dict is not None else None

        if columns_per_key:
            # A key without an entry means "load every column".
            column_list = columns[key] if key in columns else None
        elif columns_shared:
            column_list = columns
        else:
            column_list = None

        try:
            dataframes[key] = read_pq_to_dataframe(
                f"{basepath}{key}{ext}",
                columns=column_list,
                read_slice=the_slice,
                **kwargs,
            )
        except FileNotFoundError:  # pragma: no cover
            if allow_missing_keys:
                continue
            raise
    return dataframes
# II F. Reading `OrderedDict` (`str`, `numpy.array`) from parquet file
def read_pq_to_dict(
    filepath: str,
    columns: Optional[List[str]] = None,
    read_slice: slice | int | None = None,
    **kwargs,
) -> Mapping:
    """Read a parquet file into an `OrderedDict` of `numpy.array` objects.

    Parameters
    ----------
    filepath : `str`
        Path to input file.
    columns : `list` (`str`) or `None`
        Names of the columns to read; `None` reads all the columns.
    read_slice : `slice` or `int` or `None`
        Rows to read, applied as filters on the ``id`` column. A bound that
        is `None` is left open.
    **kwargs :
        Additional arguments passed to `pyarrow.parquet.read_table`.

    Returns
    -------
    tab : `OrderedDict` (`str` : `numpy.array`)
        The data, one array per column.
    """
    read_slice = _force_to_slice(read_slice, "read_pq_to_dict")

    filters = []
    if read_slice is not None:
        # NOTE(review): rows are selected by the value of an "id" column and
        # the upper bound is inclusive, unlike ordinary slice semantics --
        # confirm this is intended.
        # Only bounds that are actually set become filters.
        if read_slice.start is not None:
            filters.append(("id", ">=", read_slice.start))
        if read_slice.stop is not None:
            filters.append(("id", "<=", read_slice.stop))

    if filters:
        tab = pq.read_table(filepath, columns=columns, filters=filters, **kwargs)
    else:
        tab = pq.read_table(filepath, columns=columns, **kwargs)

    return OrderedDict(
        (c_name, col.to_numpy())
        for c_name, col in zip(tab.column_names, tab.itercolumns())
    )
def read_H5_to_dict(
    filepath: str,
    groupname: Optional[str] = None,
    read_slice: slice | int | None = None,
) -> Mapping:
    """Read one table of an `h5` file into a dictionary of arrays.

    The table is loaded with `read_H5_to_dataframe` and then converted with
    `dataframe_to_dict`.

    Parameters
    ----------
    filepath : `str`
        Path to input file.
    groupname : `str` or `None`
        The name of the group with the data.
    read_slice : `slice` or `int` or `None`
        Slice of data to read.

    Returns
    -------
    tab : `OrderedDict` (`str` : `numpy.array`)
        The data.

    Notes
    -----
    The file suffix 'h5' denotes 'hdf5' files written from DataFrames using
    `pandas`. They have a different structure than 'hdf5' files written with
    `h5py` or `astropy.table`.
    """
    return dataframe_to_dict(read_H5_to_dataframe(filepath, groupname, read_slice))
def read_HDF5_to_dict(
    filepath: str,
    groupname: Optional[str] = None,
    read_slice: slice | int | None = None,
) -> Mapping:
    """Read one group of an h5py hdf5 file into a dictionary of arrays.

    The group is obtained with `read_HDF5_group` and converted with
    `hdf5_group_to_dict`; the file is closed before returning.

    Parameters
    ----------
    filepath : `str`
        Path to input file.
    groupname : `str` or `None`
        The groupname for the data.
    read_slice : `slice` or `int` or `None`
        Slice of data to read.

    Returns
    -------
    tab : `OrderedDict` (`str` : `numpy.array`)
        The data.

    Notes
    -----
    The file suffix 'hdf5' denotes 'hdf5' files written with `h5py` or
    `astropy.table`. They have a different structure than 'h5' files written
    with `pandas`.
    """
    hg, infp = read_HDF5_group(filepath, groupname, read_slice)
    try:
        return hdf5_group_to_dict(hg)
    finally:
        # Close the file even if the conversion fails.
        infp.close()
# II G. Reading `pyarrow.Table` from HDF5 file
def read_HDF5_to_table(
    filepath: str,
    key: Optional[str] = None,
    read_slice: slice | int | None = None,
):
    """Read a `pyarrow.Table` from one entry of an hdf5 file.

    The entry is loaded through `read_HDF5_to_dicts`, each column is passed
    through `force_to_pandables`, and the table is built from the result.

    Parameters
    ----------
    filepath : `str`
        Path to input file.
    key : `str` or `None`
        The key in the hdf5 file.
    read_slice : `slice` or `int` or `None`
        Slice of data to read.

    Returns
    -------
    table : `pyarrow.Table`
        The table.
    """
    columns = read_HDF5_to_dicts(filepath, [key], slice_dict={key: read_slice})[key]
    pandable = {
        col_name: force_to_pandables(values) for col_name, values in columns.items()
    }
    return pa.Table.from_pydict(pandable)
def read_HDF5_to_tables(
    filepath: str,
    keys: Optional[List[str]] = None,
    slice_dict: dict[str, slice | int] | None = None,
) -> Mapping:
    """Read the top-level entries of an `HDF5` file as `pyarrow.Table`.

    Parameters
    ----------
    filepath : `str`
        Path to input file.
    keys : `list` or `None`
        Which tables to read. `None` reads them all.
    slice_dict : `dict[str, slice | int]` or `None`
        If provided, specifies which slices to read from which tables.

    Returns
    -------
    tab : `OrderedDict` (`str` : `pyarrow.Table`)
        The data.
    """
    # This handle is only used to list the top-level keys; each table is read
    # by `read_HDF5_to_table`, so release the handle first.
    with h5py.File(filepath, "r") as fin:
        names = list(fin.keys())

    l_out = []
    for key in names:
        if keys is not None and key not in keys:  # pragma: no cover
            continue
        the_slice = slice_dict.get(key) if slice_dict is not None else None
        l_out.append((key, read_HDF5_to_table(filepath, key=key, read_slice=the_slice)))
    return OrderedDict(l_out)
# II H. Reading `pyarrow.Table` from parquet file
def read_pq_to_table(
    filepath: str,
    columns: Optional[List[str]] = None,
    read_slice: slice | int | None = None,
    **kwargs,
):
    """Read a `pyarrow.Table` from a parquet file.

    Parameters
    ----------
    filepath : `str`
        Path to input file.
    columns : `list` (`str`) or `None`
        Names of the columns to read; `None` reads all the columns.
    read_slice : `slice` or `int` or `None`
        Rows to read, applied as filters on the ``id`` column. A bound that
        is `None` is left open.
    **kwargs :
        Additional arguments passed to `pyarrow.parquet.read_table`.

    Returns
    -------
    table : `pyarrow.Table`
        The table.
    """
    read_slice = _force_to_slice(read_slice, "read_pq_to_table")

    filters = []
    if read_slice is not None:
        # NOTE(review): rows are selected by the value of an "id" column and
        # the upper bound is inclusive, unlike ordinary slice semantics --
        # confirm this is intended.
        # Only bounds that are actually set become filters.
        if read_slice.start is not None:
            filters.append(("id", ">=", read_slice.start))
        if read_slice.stop is not None:
            filters.append(("id", "<=", read_slice.stop))

    if filters:
        return pq.read_table(filepath, columns=columns, filters=filters, **kwargs)
    return pq.read_table(filepath, columns=columns, **kwargs)
def read_pq_to_tables(
    filepath: str,
    keys: Optional[List[str]] = None,
    allow_missing_keys: bool = False,
    columns: Union[List[str], Mapping, None] = None,
    slice_dict: dict[str, slice | int] | None = None,
    **kwargs,
) -> Mapping:
    """Read `pyarrow.Table` objects from a set of parquet files.

    One file is read per key; its path is `filepath` with the key inserted
    between the base name and the extension.

    Parameters
    ----------
    filepath : `str`
        Path to input file.
    keys : `list`
        Keys for the input objects. Used to complete filepaths.
    allow_missing_keys : `bool`
        If False (the default), a FileNotFoundError is raised when the file
        for a key is missing; if True, that key is skipped.
    columns : `dict` of `list (str)`, `list` (`str`), or `None`
        Names of the columns to read.

        - if a dictionary, keys are the `keys` and values are lists of column
          names; for each keyed table only those columns are loaded, and if
          the key is not found all columns are loaded.
        - if a list, only the columns in the list are loaded.
        - `None` reads all the columns.
    slice_dict : `dict[str, slice | int]` or `None`
        If provided, specifies which slices to read from which tables.
    **kwargs :
        Additional arguments passed to the native file reader.

    Returns
    -------
    tables : `OrderedDict` of `pyarrow.Table`
        Keys are taken from `keys`.
    """
    if keys is None:  # pragma: no cover
        keys = [""]

    tables = OrderedDict()
    basepath, ext = os.path.splitext(filepath)
    if not ext:  # pragma: no cover
        ext = "." + FILE_FORMAT_SUFFIX_MAP[PANDAS_PARQUET]

    # Dict-like must be tested first: a dict is also list-like.
    columns_per_key = pd.api.types.is_dict_like(columns)
    columns_shared = not columns_per_key and pd.api.types.is_list_like(columns)

    for key in keys:
        the_slice = slice_dict.get(key) if slice_dict is not None else None

        if columns_per_key:  # pragma: no cover
            # A key without an entry means "load every column".
            column_list = columns[key] if key in columns else None
        elif columns_shared:  # pragma: no cover
            column_list = columns
        else:
            column_list = None

        try:
            tables[key] = read_pq_to_table(
                f"{basepath}{key}{ext}",
                columns=column_list,
                read_slice=the_slice,
                **kwargs,
            )
        except FileNotFoundError:  # pragma: no cover
            if allow_missing_keys:
                continue
            raise
    return tables
# II E Reading `pandas.DataFrame` from csv file
def try_parse(val) -> Union[np.array, list, dict, str]:
    """Best-effort conversion of a string into a numpy array or a JSON object.

    Two interpretations are attempted in order, and the first that succeeds
    wins:

    1. A string that starts with ``[`` and ends with ``]`` is read as
       whitespace-separated floats (e.g. ``"[1 2 3]"``).
    2. A string that starts with ``[`` or ``{`` is decoded as JSON.

    Parameters
    ----------
    val : `str`
        The string to parse. Non-string values are returned untouched.

    Returns
    -------
    val : `numpy.array` or `list` or `dict` or `str`
        The parsed object when one of the interpretations succeeds,
        otherwise the original value.
    """
    if not isinstance(val, str):
        return val

    if val.startswith("[") and val.endswith("]"):
        try:
            tokens = val.strip("[]").split()
            return np.array(list(map(np.float64, tokens)))
        except Exception:
            # Not a plain list of floats; fall through to the JSON attempt.
            pass

    if val.startswith(("[", "{")):
        try:
            return json.loads(val)
        except Exception:
            # Not valid JSON either; hand the string back unchanged.
            pass

    return val
[docs] def read_csv_to_dataframes( filepath: str, keys: Optional[List[str]] = None, allow_missing_keys: bool = False, columns: Union[List[str], Mapping, None] = None, slice_dict: dict[str, slice | int] | None = None, **kwargs, ) -> Mapping: """ Reads `pandas.DataFrame` objects from a csv file into an `OrderedDict`. Parameters ---------- filepath: `str` Path to input file keys : `list` Keys for the input objects. Used to complete filepaths allow_missing_keys: `bool` If False will raise FileNotFoundError if a key is missing. By default False. columns : `dict` of `list (str)`, `list` (`str`), or `None` Names of the columns to read. - if a dictionary, keys are the `keys`, and values are a list of string column names. for each keyed table, only the columns in the value list will be loaded. if the key is not found, all columns will be loaded. - if a list, only the columns in the list will be loaded. - `None` will read all the columns slice_dict: `dict[str, slice | int]` or `None` If provided, specfies which slices to read from which tables **kwargs : additional arguments to pass to the native file reader Returns ------- tables : `OrderedDict` of `pandas.DataFrame` Keys will be taken from keys """ if keys is None: # pragma: no cover keys = [""] dataframes = OrderedDict() basepath, ext = os.path.splitext(filepath) if not ext: # pragma: no cover ext = "." 
+ FILE_FORMAT_SUFFIX_MAP[PANDAS_CSV] for key in keys: if slice_dict is not None: the_slice = slice_dict.get(key) else: the_slice = None try: column_list = None if pd.api.types.is_dict_like(columns): # pragma: no cover column_list = columns[key] elif pd.api.types.is_list_like(columns): # pragma: no cover column_list = columns print("column_list", column_list) if column_list is not None and "usecols" not in kwargs: kwargs["usecols"] = column_list df = pd.read_csv(f"{basepath}{key}{ext}", **kwargs) if isinstance(df, pd.DataFrame): for col in df.columns: if isinstance(df[col].iloc[0], str) or not df[col].iloc: df[col] = df[col].apply(try_parse) if the_slice: dataframes[key] = df[the_slice] else: dataframes[key] = df return dataframes except FileNotFoundError as msg: # pragma: no cover if allow_missing_keys: continue raise msg
# II E Reading json files def read_json( filepath: str, keys: Optional[List[str]] = None, allow_missing_keys: bool = False, columns: Union[List[str], Mapping, None] = None, slice_dict: dict[str, slice | int] | None = None, **kwargs, ): with open(filepath, 'r', encoding='utf-8') as fin: data = json.load(fin) l_out = [] for key, val in data.items(): if keys is not None and key not in keys: # pragma: no cover continue if slice_dict is not None: the_slice = slice_dict.get(key) else: the_slice = None the_slice = _force_to_slice(the_slice) sub_dict = json.loads(val) if the_slice is not None: a_table = {kk: np.array(vv)[the_slice] for kk, vv in sub_dict.items()} else: a_table = {kk: np.array(vv) for kk, vv in sub_dict.items()} l_out.append((key, a_table)) return OrderedDict(l_out) # III. Miscellaneous
def read_native_error_message(
    filepath: str,
    fType: int,
    fmt: Optional[str],
    keys: Optional[List[str]],
    allow_missing_keys: bool,
    **kwargs,
) -> str:
    """Builds the error message reported when read_native cannot read a file.

    Parameters
    ----------
    filepath : `str`
        Full path of the file to load
    fType : `int`
        File type; used to look up the format name in `FILE_FORMATS`.
    fmt : `str` or `None`
        File format, if `None` it will be taken from the file extension.
    keys : `list` or `None`
        This argument is required for reading multiple associated parquet files.
        The keys should be the unique identifiers for each dataset or file.
    allow_missing_keys : `bool`, by default False.
        If False will raise FileNotFoundError if a key is missing from the given file.
    **kwargs : additional arguments to pass to the native file reader

    Returns
    -------
    str
        The error message string.
    """
    format_name = FILE_FORMATS[fType]
    argument_summary = (
        f"filepath: '{filepath}', fmt: '{fmt}', keys: {keys}, "
        f"allow_missing_keys: {allow_missing_keys}, and **kwargs: {kwargs}"
    )
    return (
        f"{format_name} file could not be read in with the following arguments: \n "
        + argument_summary
    )