Source code for time_split_app.datasets._load

from typing import Any

import pandas as pd

from ._config import USE_ORIGINAL_INDEX
from ._read_fn import get_pandas_read_function


def dataframe_from_path(path: str, index: str, verify: bool = True, **kwargs: Any) -> pd.DataFrame:
    """Read dataframe from path.

    Args:
        path: A path. The read function to use is derived based on the file suffix(es).
        index: Index column. Pass ``{USE_ORIGINAL_INDEX!r}`` if the frame on disk already has a datetime-like index.
        verify: If ``True``, verify the index.
        **kwargs: Keyword arguments for the read function.

    Returns:
        A pandas DataFrame.

    Raises:
        DuplicateIndexError: If the data is not aggregated (only if `verify` is ``True``).
    """
    pandas_read_fn = get_pandas_read_function(path)
    df = pandas_read_fn(path, **kwargs)

    if index == USE_ORIGINAL_INDEX:
        if not isinstance(df.index, pd.DatetimeIndex):
            raise TypeError(f"Bad index; expected a {pd.DatetimeIndex.__name__} but got {type(df.index).__name__}.")
    else:
        df[index] = pd.to_datetime(df[index].astype(str))
        df = df.set_index(index)

    if verify is False or df.index.is_unique:
        return df

    raise DuplicateIndexError(df)


dataframe_from_path.__doc__ = dataframe_from_path.__doc__.format(USE_ORIGINAL_INDEX=USE_ORIGINAL_INDEX)  # type: ignore[union-attr]


[docs] class DuplicateIndexError(Exception): """Error raised when unaggregated data is detected.""" def __init__(self, df: pd.DataFrame, head: int = 5) -> None: super().__init__("Data must be pre-aggregated.") self._size = len(df) index = df.index self._name = index.name duplicates = df.loc[index.duplicated(False)] self._n_duplicates = len(duplicates) if len(duplicates) > head: duplicates = duplicates.head(head) self._samples = duplicates.sort_index(ascending=False) self.add_note(f"Sample data (showing 3/{self._n_duplicates} duplicate rows):\n{self._samples.head(3)}") @property def samples(self) -> pd.DataFrame: """Sample data with duplicated index values.""" return self._samples @property def n_duplicated(self) -> int: """Total number of duplicated index values.""" return self._n_duplicates @property def n_total(self) -> int: """Total number of rows in the original frame.""" return self._size