Source code for tables_io.types

"""Type definitions and related functions for tables_io"""

import os
from collections import OrderedDict
from collections.abc import Iterable, Mapping
from typing import Union, Optional

import numpy as np

from .utils.array_utils import array_length
from .lazy_modules import pa, json

# Tabular data formats: integer codes for the supported in-memory table types
AP_TABLE = 0
NUMPY_DICT = 1
NUMPY_RECARRAY = 2
PD_DATAFRAME = 3
PA_TABLE = 4
JSON_STRING = 5

# Human-readable name -> tabular format code, in code order
TABULAR_FORMAT_NAMES = OrderedDict(
    {
        "astropyTable": AP_TABLE,
        "numpyDict": NUMPY_DICT,
        "numpyRecarray": NUMPY_RECARRAY,
        "pandasDataFrame": PD_DATAFRAME,
        "pyarrowTable": PA_TABLE,
        "jsonString": JSON_STRING,
    }
)

# Tabular format code -> human-readable name (inverse of TABULAR_FORMAT_NAMES)
TABULAR_FORMATS = OrderedDict(
    (code, name) for name, code in TABULAR_FORMAT_NAMES.items()
)


# File formats: integer codes for the supported on-disk formats
ASTROPY_FITS = 0
ASTROPY_HDF5 = 1
NUMPY_HDF5 = 2
NUMPY_FITS = 3
PANDAS_HDF5 = 4
PANDAS_PARQUET = 5
PYARROW_HDF5 = 6
PYARROW_PARQUET = 7
PANDAS_CSV = 8
JSON = 9


# Human-readable name -> file format code.
# Note the insertion order: "pyarrowHdf5" comes before "pandasHdf5".
FILE_FORMAT_NAMES = OrderedDict(
    {
        "astropyFits": ASTROPY_FITS,
        "astropyHdf5": ASTROPY_HDF5,
        "numpyHdf5": NUMPY_HDF5,
        "numpyFits": NUMPY_FITS,
        "pyarrowHdf5": PYARROW_HDF5,
        "pandasHdf5": PANDAS_HDF5,
        "pandaParquet": PANDAS_PARQUET,
        "pyarrowParquet": PYARROW_PARQUET,
        "pandasCsv": PANDAS_CSV,
        "json": JSON,
    }
)

# Default suffixes for various file formats (suffix without the dot -> code).
# Both "parq" and "pq" resolve to PANDAS_PARQUET.
FILE_FORMAT_SUFFIXS = OrderedDict(
    {
        "fits": ASTROPY_FITS,
        "hf5": ASTROPY_HDF5,
        "hdf5": NUMPY_HDF5,
        "fit": NUMPY_FITS,
        "h5": PANDAS_HDF5,
        "hd5": PYARROW_HDF5,
        "parquet": PYARROW_PARQUET,
        "parq": PANDAS_PARQUET,
        "pq": PANDAS_PARQUET,
        "csv": PANDAS_CSV,
        "json": JSON,
    }
)

# Value stored for each suffix: "", None or "data"
DEFAULT_TABLE_KEY = OrderedDict(
    {
        "fits": "",
        "hf5": None,
        "hdf5": None,
        "hd5": "data",
        "fit": "",
        "h5": "data",
        "parquet": "",
        "parq": "",
        "pq": "",
        "csv": "",
        "json": "",
    }
)

# File format code -> human-readable name (inverse of FILE_FORMAT_NAMES)
FILE_FORMATS = OrderedDict(
    (code, name) for name, code in FILE_FORMAT_NAMES.items()
)

# File format code -> suffix (inverse of FILE_FORMAT_SUFFIXS).
# Where several suffixes share a code the last one listed wins, so
# PANDAS_PARQUET maps to "pq".
FILE_FORMAT_SUFFIX_MAP = OrderedDict(
    (code, suffix) for suffix, code in FILE_FORMAT_SUFFIXS.items()
)

# Default format to write various table types:
# maps each tabular format code to exactly one file format code.
NATIVE_FORMAT = OrderedDict(
    [
        (AP_TABLE, ASTROPY_HDF5),
        (NUMPY_DICT, NUMPY_HDF5),
        (NUMPY_RECARRAY, NUMPY_FITS),
        (PD_DATAFRAME, PANDAS_PARQUET),
        (PA_TABLE, PYARROW_PARQUET),
        (JSON_STRING, JSON),
    ]
)

# Inverse of NATIVE_FORMAT: file format code -> tabular format code.
# The values of NATIVE_FORMAT are all distinct, so nothing is lost here.
NATIVE_TABLE_TYPE = OrderedDict([(val, key) for key, val in NATIVE_FORMAT.items()])

# Allowed formats to write various table types:
# maps each tabular format code to a list of file format codes.
# NOTE(review): NUMPY_RECARRAY's native format (NUMPY_FITS) is not in its
# allowed list, and neither NUMPY_FITS nor PYARROW_HDF5 appears in any list
# below -- confirm this is intended.
ALLOWED_FORMATS = OrderedDict(
    [
        (AP_TABLE, [ASTROPY_FITS, ASTROPY_HDF5]),
        (NUMPY_DICT, [NUMPY_HDF5]),
        (NUMPY_RECARRAY, [ASTROPY_FITS]),
        (PD_DATAFRAME, [PANDAS_PARQUET, PANDAS_HDF5, PANDAS_CSV]),
        (PA_TABLE, [PYARROW_PARQUET, PANDAS_PARQUET, PANDAS_HDF5]),
        (JSON_STRING, [JSON]),
    ]
)

# Reverse lookup built from ALLOWED_FORMATS: file format code -> tabular
# format code. A file format listed under several table types keeps the LAST
# one in iteration order, so ASTROPY_FITS ends up mapped to NUMPY_RECARRAY and
# PANDAS_PARQUET / PANDAS_HDF5 end up mapped to PA_TABLE.
# The loop runs at module level, so `key`, `val_list` and `val` remain defined
# as module globals afterwards.
TABLE_FORMAT = {}
for key, val_list in ALLOWED_FORMATS.items():
    for val in val_list:
        TABLE_FORMAT[val] = key


def is_dataframe(obj):
    """Test whether an object is a pandas DataFrame or a subclass of one

    The check is done by name: it looks through the method resolution order
    of the object's class for a class called ``DataFrame`` whose module is
    ``pandas.core.frame`` or ``pandas``.

    Parameters
    ----------
    obj : `object`
        The input object

    Returns
    -------
    is_df : `bool`
        True if a matching class is found, False otherwise
    """
    pandas_modules = ("pandas.core.frame", "pandas")
    return any(
        klass.__name__ == "DataFrame" and klass.__module__ in pandas_modules
        for klass in obj.__class__.__mro__
    )


def is_ap_table(obj):
    """Test whether an object is an astropy Table or a subclass of one

    The check is done by name: it looks through the method resolution order
    of the object's class for a class called ``Table`` whose module is
    ``astropy.table.table``.

    Parameters
    ----------
    obj : `object`
        The input object

    Returns
    -------
    is_ap : `bool`
        True if a matching class is found, False otherwise
    """
    return any(
        klass.__name__ == "Table" and klass.__module__ == "astropy.table.table"
        for klass in obj.__class__.__mro__
    )


def is_pa_table(obj):
    """Test whether an object is a pyarrow Table or a subclass of one

    The check is done by name: it looks through the method resolution order
    of the object's class for a class called ``Table`` whose module is
    ``pyarrow.lib``.

    Parameters
    ----------
    obj : `object`
        The input object

    Returns
    -------
    is_pa : `bool`
        True if a matching class is found, False otherwise
    """
    return any(
        klass.__name__ == "Table" and klass.__module__ == "pyarrow.lib"
        for klass in obj.__class__.__mro__
    )


def is_json_table(obj):
    """Test whether an object is a string that parses as JSON

    Parameters
    ----------
    obj : `object`
        The input object

    Returns
    -------
    is_json : `bool`
        True if `obj` is a `str` and `json.loads` accepts it, False if it is
        not a `str` or if decoding fails
    """
    if isinstance(obj, str):
        try:
            json.loads(obj)
        except (ValueError, json.JSONDecodeError):
            return False
        else:
            return True
    return False


def table_type(obj) -> int:
    """Identify the type of table we have

    The checks run in a fixed order: pandas DataFrame, astropy Table,
    pyarrow Table, numpy recarray / masked array, JSON string, and finally
    a `Mapping` of equal-length iterable columns.

    Parameters
    ----------
    obj : `object`
        The input object

    Returns
    -------
    otype : `int`
        The object type, one of `TABULAR_FORMATS.keys()`

    Raises
    ------
    TypeError
        The object is not a supported type, or it is a `Mapping` with a
        column that is itself table-like or is not iterable
    IndexError
        One of the columns in a Mapping is the wrong length
    """
    if is_dataframe(obj):
        return PD_DATAFRAME
    if is_ap_table(obj):
        return AP_TABLE
    if is_pa_table(obj):
        return PA_TABLE
    if isinstance(obj, (np.recarray, np.ma.core.MaskedArray)):
        return NUMPY_RECARRAY
    if is_json_table(obj):
        return JSON_STRING
    if not isinstance(obj, Mapping):
        raise TypeError(
            f"Object of type {type(obj)} is not one of the supported types. \n Must be one of {list(TABULAR_FORMAT_NAMES.keys())}"
        )

    # From here on `obj` is a Mapping: every value must be a plain column
    # (iterable, not itself a table) and all columns must share one length.
    n_rows = None
    for key, val in obj.items():
        if is_table_like(val):
            raise TypeError(f"Column {key} is a table of type {type(val)}")
        if not isinstance(val, Iterable):
            raise TypeError(f"Column {key} of type {type(val)} is not iterable")
        # Measure each column once and reuse the value for both the
        # comparison and the error message.
        n_val = array_length(val)
        if n_rows is None:
            n_rows = n_val
        elif n_val != n_rows:
            raise IndexError(
                f"Column {key} length {n_val} != {n_rows}. \n Column lengths are not equal, this is not a valid {TABULAR_FORMATS[NUMPY_DICT]} object"
            )  # pylint: disable=bad-string-format-type
    return NUMPY_DICT
def is_table_like(obj) -> bool:
    """Test to see if an object is one of the supported table types

    Parameters
    ----------
    obj : `object`
        The input object

    Returns
    -------
    table-like : `bool`
        True if `table_type` accepts the object, False if it raises
        `TypeError` or `IndexError`
    """
    try:
        table_type(obj)
    except (TypeError, IndexError):
        return False
    else:
        return True
def is_tabledict_like(obj) -> bool:
    """Test to see if an object is a `Mapping` whose values are all `Table-like`

    Parameters
    ----------
    obj : `object`
        The input object

    Returns
    -------
    tabledict : `bool`
        True if the object is a `Mapping` and every value is `Table-like`
        (which includes an empty `Mapping`), False otherwise
    """
    if isinstance(obj, Mapping):
        return all(is_table_like(table) for table in obj.values())
    return False
def file_type(filepath: str, fmt: Optional[str] = None) -> int:
    """Identify the type of file we have

    Parameters
    ----------
    filepath : `str`
        The path to the file
    fmt : `str` or `None`
        Overrides the file extension

    Returns
    -------
    otype : `int`
        The file type, one of `FILE_FORMATS.keys()`

    Raises
    ------
    KeyError
        The file format is not a supported value
    """
    if fmt is None:
        # splitext keeps the leading dot on the extension; drop it so the
        # result matches the keys of FILE_FORMAT_SUFFIXS.
        fmt = os.path.splitext(filepath)[1][1:]
    try:
        return FILE_FORMAT_SUFFIXS[fmt]
    except KeyError as msg:
        # The trailing space keeps the two adjacent literals from running
        # together in the message.
        raise KeyError(
            f"Unknown file format {fmt}, supported types are "
            f"{list(FILE_FORMAT_SUFFIXS.keys())}"
        ) from msg
def tType_to_int(tType: Union[str, int]) -> int:
    """Takes table type as an `int` or `str`, and converts it to the
    corresponding `int` if it's a `str`.

    An `int` is returned unchanged; it is not checked against the known
    tabular format codes.

    Parameters
    ----------
    tType : Union[str, int]
        The tabular format

    Returns
    -------
    int
        The number corresponding to the tabular format

    Raises
    ------
    TypeError
        Raised if the given `str` is not one of the available tabular
        format options, or if `tType` is neither a `str` nor an `int`.
    """
    if isinstance(tType, str):
        try:
            return TABULAR_FORMAT_NAMES[tType]
        except KeyError as err:
            raise TypeError(
                f"Unsupported tableType '{tType}', must be one of {TABULAR_FORMAT_NAMES}"
            ) from err
    if isinstance(tType, int):
        return tType
    # Anything else used to fall through to an unassigned local and fail
    # with UnboundLocalError; report the real problem instead.
    raise TypeError(
        f"Unsupported tableType '{tType}' of type {type(tType)}, must be a `str` or an `int`"
    )
def get_table_type(obj) -> str:
    """Gets the table type of a Table-like or TableDict-like object, and
    returns the name of that type.

    If the object is a TableDict-like object, it will check that all
    Table-like objects have the same type. An empty mapping holds no tables
    to compare, so it is classified directly by `table_type`.

    Parameters
    ----------
    obj : `Table-like` or `TableDict-like` object
        The object to determine the type of.

    Returns
    -------
    str
        Name of the tabular type

    Raises
    ------
    TypeError
        Raises a TypeError if the table is not of a supported type, or if a
        TableDict-like object holds tables of more than one type
    """
    # An empty mapping satisfies is_tabledict_like but has no table to take
    # the type from, so only non-empty mappings go down the TableDict path.
    if is_tabledict_like(obj) and len(obj) > 0:
        # Collect the distinct table types held by the TableDict
        tab_types = {table_type(table) for table in obj.values()}
        if len(tab_types) > 1:
            raise TypeError(
                f"Object contains Table-like objects of multiple types (obj: {obj})"
            )
        int_tType = next(iter(tab_types))
    else:
        try:
            # Get the table type of the single Table
            int_tType = table_type(obj)
        except Exception as e:
            raise TypeError(
                f"Object of type {type(obj)} is not one of the supported types. \n Must be one of {list(TABULAR_FORMAT_NAMES.keys())}"
            ) from e

    return TABULAR_FORMATS[int_tType]