tables_io.io_utils.iterator

IO Iterator Read Functions for tables_io

Functions

iterator(filepath, tType[, fmt, chunk_size, rank, ...])

Iterates through the data in a given file. The data is yielded (along with the start and stop index) as a Table-like object.

iterator_native(filepath[, fmt, chunk_size, rank, ...])

Iterates through the data in a given file. The data is yielded (along with the start and stop index) as a Table-like object in the default format for the file type.

get_input_data_length(filepath[, fmt])

Opens the given file and gets the length of data in that file. If the data is multi-dimensional, the function gives the length of the first axis.

get_input_data_length_HDF5(→ int)

Open an HDF5 file and return the size of a group

iter_HDF5_to_dict(→ collections.abc.Iterator[int, int, ...)

Iterates through an HDF5 file, yielding one chunk of data at a time

iter_H5_to_dataframe(filepath[, chunk_size, ...])

Iterator for sending chunks of data in HDF5.

iter_pq_to_dataframe(filepath[, chunk_size, columns])

Iterates through a parquet file, yielding one chunk of data at a time.

iter_csv_to_dataframe(filepath[, chunk_size])

Iterates through a CSV file, yielding one chunk of data at a time.

get_input_data_length_pq(→ int)

Open a Parquet file and return the size of a group

get_input_data_length_csv(→ int)

Open a CSV file and return the number of rows in it

iter_ds_to_table(source[, columns, chunk_size])

Iterator for sending chunks of data in parquet

get_input_data_length_ds(→ int)

Open a dataset and return the number of rows in a group

split_tasks_by_rank(→ collections.abc.Iterator[int])

Iterate through a list of tasks, yielding ones this process is responsible for.

data_ranges_by_rank(→ collections.abc.Iterator[int, int])

Split a number of rows by process.

Module Contents

iterator(filepath: str, tType: str | int, fmt: str | None = None, chunk_size: int | None = 100000, rank: int | None = 0, parallel_size: int | None = 1, **kwargs)

Iterates through the data in a given file. The data is yielded (along with the start and stop index) as a Table-like object. The data will be read in as the tabular format given by tType. Uses iterator_native() to iterate through data, and converts it as it is yielded.

For a given file type, there are additional arguments that can be supplied to the native file reader. The main arguments that can be supplied are groupname for HDF5 files, and columns for parquet files. Other arguments for reading parquet files can be found in the documentation of pyarrow.parquet.read_table or pyarrow.dataset.dataset.

This function currently only works for the following file types: numpyHDF5, pandasParquet, pyarrowParquet, pyarrowHDF5

Accepted tabular types:

Format string        Format integer
“astropyTable”       0
“numpyDict”          1
“numpyRecarray”      2
“pandasDataFrame”    3
“pyarrowTable”       4
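Since a tabular type can be given either as a string or as an integer, callers may want to normalize one form into the other. The following is a minimal sketch built only from the table above; `TABULAR_TYPES` and `normalize_ttype` are hypothetical names used for illustration and are not part of tables_io.

```python
# Hypothetical lookup built from the table above; not part of tables_io.
TABULAR_TYPES = {
    "astropyTable": 0,
    "numpyDict": 1,
    "numpyRecarray": 2,
    "pandasDataFrame": 3,
    "pyarrowTable": 4,
}


def normalize_ttype(t_type):
    """Return the integer code for a format string or a format integer."""
    if isinstance(t_type, str):
        return TABULAR_TYPES[t_type]  # raises KeyError for an unknown name
    if t_type in TABULAR_TYPES.values():
        return t_type
    raise ValueError(f"unknown tabular type: {t_type!r}")
```

With this helper, `normalize_ttype("pandasDataFrame")` and `normalize_ttype(3)` refer to the same tabular type.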

Parameters:
  • filepath (str) – File to load

  • tType (str, int or None) – Table type, given as one of the format strings or format integers listed above. If None, this will use readNative

  • fmt (str or None) – File format, if None it will be taken from the file extension

  • chunk_size (int, by default 100,000) – The size of data chunk to iterate over

  • rank (int, by default 0) – The rank of this process under MPI

  • parallel_size (int, by default 1) – The number of processes under MPI

Returns:

  • start (int) – The starting index for the data.

  • stop (int) – The end index for the data.

  • data (Table-like) – The data from [start:stop]. The format will be the native tabular format for the file if no tType is given. Otherwise, the data will be in the tabular format tType.

Optional kwargs

  • groupname (str or None, by default None) – For HDF5 files, the group name where the data is.

  • columns (list of str or None, by default None) – For parquet files, the names of the columns to read. None will read all the columns.
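The description above says the data is read in the file's native format and converted as each chunk is yielded. The pattern can be sketched without tables_io at all: below, `native_chunks` is a stand-in for a native reader that works on an in-memory dict of lists, and `converted_chunks` wraps it with a conversion step. All names here are hypothetical, and the conversion (columns to row tuples) is only a placeholder for a real tabular conversion.

```python
def native_chunks(columns, chunk_size):
    """Stand-in native reader: yield (start, stop, dict of column slices)."""
    n_rows = len(next(iter(columns.values())))
    for start in range(0, n_rows, chunk_size):
        stop = min(start + chunk_size, n_rows)
        yield start, stop, {name: col[start:stop] for name, col in columns.items()}


def converted_chunks(columns, chunk_size, convert):
    """Wrap the native generator, converting each chunk just before yielding it."""
    for start, stop, data in native_chunks(columns, chunk_size):
        yield start, stop, convert(data)


def to_rows(chunk):
    """Placeholder conversion: dict of columns -> list of row tuples."""
    return list(zip(*chunk.values()))


table = {"id": [1, 2, 3, 4, 5], "flux": [0.1, 0.2, 0.3, 0.4, 0.5]}
chunks = list(converted_chunks(table, chunk_size=2, convert=to_rows))
```

Because the conversion happens per chunk, only one chunk at a time needs to exist in both formats, which is the point of iterating rather than reading the whole file.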

iterator_native(filepath: str, fmt: str | None = None, chunk_size: int | None = 100000, rank: int | None = 0, parallel_size: int | None = 1, **kwargs)

Iterates through the data in a given file. The data is yielded (along with the start and stop index) as a Table-like object that has the default format for the given file type.

This function currently only works for the following file types: numpyHDF5, pandasParquet, pyarrowParquet, pyarrowHDF5

Any kwargs are passed to the specific iterator function for the file type.

Parameters:
  • filepath (str) – File to load

  • fmt (str or None) – File format, if None it will be taken from the file extension. By default None.

  • chunk_size (int, by default 100,000) – The size of data chunk to iterate over

  • rank (int, by default 0) – The rank of this process under MPI

  • parallel_size (int, by default 1) – The number of processes under MPI

Returns:

  • start (int) – Data start index

  • stop (int) – Data ending index

  • data (Table-like) – The data in the native type for that file, from [start:stop]

Optional kwargs

  • groupname (str or None) – For HDF5 files, the group name where the data is

  • columns (list of str or None) – For parquet files, the names of the columns to read. None will read all the columns
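The two behaviours described above, inferring the format from the file extension when fmt is None and forwarding kwargs to a format-specific reader, can be illustrated with a small dispatcher. This is a sketch only: the reader functions, the `READERS` table, and the extension strings are hypothetical and do not reflect the mapping tables_io actually uses.

```python
import os


def read_hdf5_chunks(filepath, chunk_size, groupname=None):
    """Placeholder reader that just reports the arguments it received."""
    return ("hdf5", filepath, chunk_size, groupname)


def read_parquet_chunks(filepath, chunk_size, columns=None):
    """Placeholder reader that just reports the arguments it received."""
    return ("parquet", filepath, chunk_size, columns)


# Hypothetical format -> reader table, for illustration only.
READERS = {"hdf5": read_hdf5_chunks, "pq": read_parquet_chunks}


def dispatch(filepath, fmt=None, chunk_size=100_000, **kwargs):
    """Pick a reader from fmt (or the file extension) and forward kwargs to it."""
    if fmt is None:
        fmt = os.path.splitext(filepath)[1].lstrip(".")
    try:
        reader = READERS[fmt]
    except KeyError:
        raise ValueError(f"unsupported format: {fmt!r}") from None
    return reader(filepath, chunk_size, **kwargs)
```

A keyword that the chosen reader does not accept (for example `columns` for the HDF5 placeholder) raises a `TypeError`, which is one reason format-specific options are documented separately.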

get_input_data_length(filepath: str, fmt: str | None = None, **kwargs)

Opens the given file and gets the length of data in that file. If the data is multi-dimensional, the function will give the length of the first axis of the data, which is typically the axis that you want to iterate over.

Parameters:
  • filepath (str) – File to load

  • fmt (str or None) – File format, if None it will be taken from the file extension.

Returns:

nrows – The length of the data

Return type:

int

Notes

The kwargs are passed to the specific iterator type.

get_input_data_length_HDF5(filepath: str, groupname: str | None = None) → int

Open an HDF5 file and return the size of a group

Parameters:
  • filepath (str) – The input filepath.

  • groupname (str or None) – The group name where the data is, by default None.

Returns:

length – The length of the data. In the case of a multi-dimensional array, this is the length of the first axis.

Return type:

int

Notes

For a multi-D array this returns the length of the first axis and not the total size of the array.

Normally that is what you want to be iterating over.
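The difference between the length of the first axis and the total size can be shown with plain nested lists standing in for a multi-dimensional array. The helper names below are hypothetical and nothing here touches HDF5.

```python
def first_axis_length(data):
    """Number of entries along the first axis (the rows to iterate over)."""
    return len(data)


def total_size(data):
    """Total number of scalar elements in a nested list."""
    if not isinstance(data, list):
        return 1
    return sum(total_size(item) for item in data)


# Four rows of three values each, i.e. shape (4, 3).
grid = [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]]
n_rows = first_axis_length(grid)
n_values = total_size(grid)
```

Chunked iteration steps through `n_rows`, not `n_values`, so a chunk of size 2 would hold two complete rows.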

iter_HDF5_to_dict(filepath: str, groupname: str | None = None, chunk_size: int = 100000, rank: int = 0, parallel_size: int = 1) → collections.abc.Iterator[int, int, Mapping]

Iterates through an HDF5 file, yielding one chunk of data at a time as an OrderedDict of np.array objects.

Parameters:
  • filepath (str) – The input filepath

  • groupname (str or None) – The group name where the data is, by default None.

  • chunk_size (int, by default 100,000) – The size of data chunk to iterate over

  • rank (int, by default 0) – The rank of this process under MPI

  • parallel_size (int, by default 1) – The number of processes under MPI

Yields:
  • start (int) – Data start index

  • end (int) – Data ending index

  • data (dict) – OrderedDict of np.array of all data from start:end

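iter_H5_to_dataframe(filepath: str, chunk_size: int | None = 100000, groupname=None, rank: int | None = 0, parallel_size: int | None = 1)

Iterator for sending chunks of data in HDF5.

Parameters:
  • filepath (str) – Input file name

  • chunk_size (int, by default 100,000) – Size of chunk to iterate over

  • groupname (str or None) – The group name where the data is, by default None

  • rank (int, by default 0) – The rank of this process under MPI

  • parallel_size (int, by default 1) – The number of processes under MPI

Returns:

output – Iterator over chunks of the data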

iter_pq_to_dataframe(filepath: str, chunk_size: int = 100000, columns: List[str] | None = None, **kwargs)

Iterates through a parquet file, yielding one chunk of data at a time.

Parameters:
  • filepath (str) – path to input file

  • chunk_size (int, by default = 100,000) – The maximum chunk size of the data

  • columns (list (str) or None) – Names of the columns to read, None will read all the columns

  • kwargs (additional arguments to pass to the parquet read_table function)

Yields:
  • start (int) – Data start index

  • end (int) – Data ending index

  • data (pandas.DataFrame) – DataFrame of all data from start:end

iter_csv_to_dataframe(filepath: str, chunk_size: int = 100000, **kwargs)

Iterates through a CSV file, yielding one chunk of data at a time.

Parameters:
  • filepath (str) – path to input file

  • chunk_size (int, by default = 100,000) – The maximum chunk size of the data

  • kwargs (additional arguments to pass to the pandas read_csv function)

Yields:

data (pandas.DataFrame) – DataFrame of all data from start:end
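Chunked CSV reading can be illustrated with the standard library alone. The sketch below is a stand-in, not the pandas-based implementation: `iter_csv_chunks` is a hypothetical helper that yields lists of row dicts instead of DataFrames, and all values stay as strings.

```python
import csv
import io
from itertools import islice


def iter_csv_chunks(handle, chunk_size):
    """Yield lists of at most chunk_size rows (as dicts) from an open CSV file."""
    reader = csv.DictReader(handle)
    while True:
        chunk = list(islice(reader, chunk_size))
        if not chunk:
            return
        yield chunk


text = "id,flux\n1,0.1\n2,0.2\n3,0.3\n"
csv_chunks = list(iter_csv_chunks(io.StringIO(text), chunk_size=2))
sizes = [len(chunk) for chunk in csv_chunks]
```

The last chunk is shorter than `chunk_size` whenever the row count is not an exact multiple of it, which is why the chunk size is described as a maximum.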

get_input_data_length_pq(filepath: str, columns: List[str] | None = None, **kwargs) → int

Open a Parquet file and return the size of a group

Parameters:
  • filepath (str) – Path to input file

  • columns (List[str] or None) – Names of the columns to read, None will read all the columns

  • kwargs (additional arguments to pass to the pyarrow.parquet.read_table function)

Returns:

nrow – The length of the data

Return type:

int

Notes

For a multi-D array this returns the length of the first axis and not the total size of the array.

Normally that is what you want to be iterating over.

get_input_data_length_csv(source: str, **kwargs) → int

Open a CSV file and return the number of rows in it

Parameters:

source (str) – Path to input file

Returns:

nrows – The length of the data

Return type:

int
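Counting CSV rows without loading the whole file can be sketched with the standard library. `count_csv_rows` is a hypothetical helper, and treating the first line as a header that is excluded from the count is an assumption of this sketch, not a statement about how the function above counts.

```python
import csv
import io


def count_csv_rows(handle, has_header=True):
    """Count data rows in an open CSV file, streaming one row at a time."""
    n_lines = sum(1 for _ in csv.reader(handle))
    # Assumption: the header line, when present, is not a data row.
    return max(n_lines - 1, 0) if has_header else n_lines


n_data_rows = count_csv_rows(io.StringIO("id,flux\n1,0.1\n2,0.2\n"))
```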

iter_ds_to_table(source, columns: List[str] | None = None, chunk_size: int = 100000, **kwargs)

Iterator for sending chunks of data in parquet

Parameters:
  • source (str) – input file name

  • columns (List[str], default None) – The list of columns to use

  • chunk_size (int) – The maximum size of the batches to be read in

  • kwargs (additional arguments to pass to the pyarrow.dataset.to_batches() function)

Yields:
  • start (int) – Data start index

  • end (int) – Data ending index

  • data (pyarrow.Table) – table of all data from start:end

get_input_data_length_ds(source, **kwargs) → int

Open a dataset and return the number of rows in a group

Parameters:
  • source (str) – Path to input file or directory

  • kwargs – kwargs are passed to pyarrow.dataset.dataset()

Returns:

nrows – The length of the data

Return type:

int

split_tasks_by_rank(tasks: collections.abc.Iterable[int], parallel_size: int, rank: int) → collections.abc.Iterator[int]

Iterate through a list of tasks, yielding ones this process is responsible for.

Tasks are allocated in a round-robin way.

Parameters:
  • tasks (iterator) – Tasks to split up

  • parallel_size (int) – The number of processes under MPI

  • rank (int) – The rank of this process under MPI

Yields:

task (int) – The number of the next task assigned to this process
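Round-robin allocation can be sketched as follows. This is one plausible reading of the description, written as a hypothetical `round_robin` helper; the library's actual assignment order may differ.

```python
def round_robin(tasks, parallel_size, rank):
    """Yield every task whose position modulo parallel_size equals rank."""
    for index, task in enumerate(tasks):
        if index % parallel_size == rank:
            yield task


# Seven tasks shared between three processes.
assignments = {rank: list(round_robin(range(7), 3, rank)) for rank in range(3)}
```

Under this scheme every task goes to exactly one rank, and the ranks' workloads differ by at most one task.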

data_ranges_by_rank(n_rows: int, chunk_rows: int, parallel_size: int, rank: int) → collections.abc.Iterator[int, int]

Split a number of rows by process.

Given a total number of rows to read and a chunk size, yield the ranges within them that this process should handle.

Parameters:
  • n_rows (int) – Total number of rows to split up

  • chunk_rows (int) – Size of each chunk to be read

  • parallel_size (int) – The number of processes under MPI

  • rank (int) – The rank of this process under MPI

Yields:
  • start (int) – Data start index

  • end (int) – Data ending index
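The splitting of rows into per-process ranges can be sketched as below, assuming chunks are numbered in order and handed out round-robin by rank. `chunk_ranges` is a hypothetical helper for illustration; the exact allocation used by the library is not confirmed here.

```python
def chunk_ranges(n_rows, chunk_rows, parallel_size, rank):
    """Yield (start, end) row ranges for the chunks assigned to this rank."""
    n_chunks = -(-n_rows // chunk_rows)  # ceiling division
    for chunk in range(rank, n_chunks, parallel_size):
        start = chunk * chunk_rows
        # The final chunk is clipped so it never runs past n_rows.
        yield start, min(start + chunk_rows, n_rows)


# Ten rows in chunks of three, shared between two processes.
rank0 = list(chunk_ranges(10, 3, parallel_size=2, rank=0))
rank1 = list(chunk_ranges(10, 3, parallel_size=2, rank=1))
```

Each range is half-open, so `end` of one chunk equals `start` of the next chunk in file order, and the ranges from all ranks together cover every row exactly once.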