Source code for time_split_app.widgets.data._data_loader_widget
import abc
from collections.abc import Collection
from datetime import date, datetime, timedelta
from typing import Literal, overload
import pandas as pd
import streamlit as st
from rics.types import LiteralHelper
from rics.misc import get_by_full_name
from time_split_app import config
AnyDateRange = tuple[datetime, datetime] | tuple[date, date]
Anchor = Literal["absolute", "relative", "now"]
AnchorOptions = Collection[Anchor]
ANCHOR_HELPER: LiteralHelper[Anchor] = LiteralHelper(Anchor, default_name="anchor", normalizer=str.lower)
ABSOLUTE: Literal["absolute"] = "absolute"
NOW: Literal["now"] = "now"
RELATIVE: Literal["relative"] = "relative"
[docs]
class DataLoaderWidget(abc.ABC):
"""Load or generate datasets that require user input."""
[docs]
@abc.abstractmethod
def get_title(self) -> str:
"""Title shown in the `⚙️ Configure data` menu. Uses Markdown syntax."""
[docs]
@abc.abstractmethod
def get_description(self) -> str:
"""Brief description shown in the `⚙️ Configure data` menu. Uses Markdown syntax."""
[docs]
def get_prefix(self) -> bytes | None:
"""Return a loader prefix.
Used to identify the loader when multiple custom loaders are used. Generated automatically if ``None``."""
return None
[docs]
@abc.abstractmethod
def load(self, params: bytes | None) -> tuple[pd.DataFrame, dict[str, str], bytes] | pd.DataFrame:
"""Load data.
.. note::
This method will be called many times due to the Streamlit data model.
You may want to use ``@streamlit.cache_data`` or ``@streamlit.cache_resource`` to improve performance. See
https://docs.streamlit.io/develop/concepts/architecture/caching
for more information.
The :meth:`select_range`-method may be used to prompt the user for a date range in which to retrieve data. If
any other input is needed, you may use the
Args:
params: Parameter preset as bytes. Handling is implementation-specific.
Returns:
A :class:`pandas.DataFrame` or a tuple ``(data, aggregations, params)``, where the ``params: bytes`` may be given as
the `params` argument to recreate the frame returned.
See :attr:`.QueryParams.data` for more information regarding the `params` argument.
"""
@classmethod
@overload
def select_range(
cls,
initial: AnyDateRange | None = None,
*,
date_only: Literal[False] = False,
start_options: AnchorOptions | None = None,
end_options: AnchorOptions | None = None,
) -> tuple[datetime, datetime]: ...
@classmethod
@overload
def select_range(
cls,
initial: AnyDateRange | None = None,
*,
date_only: Literal[True],
start_options: AnchorOptions | None = None,
end_options: AnchorOptions | None = None,
) -> tuple[date, date]: ...
[docs]
@classmethod
def select_range(
cls,
initial: AnyDateRange | None = None,
*,
date_only: bool | None = None,
start_options: AnchorOptions | None = None,
end_options: AnchorOptions | None = None,
) -> AnyDateRange:
"""Support method for getting user date range input.
Args:
initial: Initial range used by the widget.
date_only: If ``True``, disable the time selector and return dates. Default = based on config.
start_options: Start options to make available to the user. Default = all.
end_options: End options to make available to the user. Default = all.
Returns:
A tuple ``(start, end)``.
Raises:
TypeError: If `start_options` or `start_options` are invalid.
"""
from functools import partial
from ..time import select_datetime, DurationWidget
if date_only is None:
date_only = config.DATE_ONLY
start_options = ANCHOR_HELPER.options if start_options is None else cls._check(start_options, name="start")
end_options = ANCHOR_HELPER.options if end_options is None else cls._check(end_options, name="end")
select_datetime = partial(select_datetime, header=False, date_only=date_only)
explicit_initial_range = True
if initial is None:
explicit_initial_range = False
initial = initial_range_fn()
initial_start, initial_end = initial
delta: timedelta = initial_end - initial_start
duration_widget = DurationWidget.from_delta(delta, date_only)
with st.container(key=f"tight-rows-{cls.select_range.__qualname__}"):
left, right = st.columns(2)
left.subheader("Select Start", divider=True)
right.subheader("Select End", divider=True)
left_index, right_index = 0, 0
if explicit_initial_range:
try:
left_index = [*start_options].index(ABSOLUTE)
right_index = [*end_options].index(ABSOLUTE)
except Exception:
pass
elif initial_range_fn is default_initial_range_fn:
try:
left_index = [*start_options].index(RELATIVE)
right_index = [*end_options].index(NOW)
except Exception:
pass
with left:
start_type = st.radio(
"start-selection-type",
start_options,
index=left_index,
horizontal=True,
format_func=str.title,
label_visibility="collapsed",
)
with right:
end_type = st.radio(
"end-selection-type",
end_options,
index=right_index,
horizontal=True,
format_func=str.title,
label_visibility="collapsed",
)
if start_type == RELATIVE and end_type == RELATIVE:
st.error("At least one of `Start date` and `End date` must be fixed.", icon="🚨")
st.stop()
start: datetime | date | None = None
end: datetime | date | None = None
# Handle explicit starts
if start_type == NOW:
with left:
start = select_datetime("Start", None, disabled=True)
elif start_type == ABSOLUTE:
with left:
start = select_datetime("Start", initial_start)
# Handle explicit ends
if end_type == NOW:
with right:
end = select_datetime("End", None, disabled=True)
elif end_type == ABSOLUTE:
with right:
end = select_datetime("End", initial_end)
# Handle relative start anchor.
if start_type == RELATIVE:
assert end is not None
with left:
start = end - duration_widget.select("start-duration")
# Handle relative end anchor.
elif end_type == RELATIVE:
assert start is not None
with right:
end = start + duration_widget.select("end-duration")
assert start is not None
assert end is not None
if start >= end:
st.info("Select valid range.", icon="ℹ️") # noqa: RUF001
st.stop()
return start, end
@classmethod
def _check(cls, options: AnchorOptions, name: str) -> AnchorOptions:
name = f"{name}_options"
if len(options) > len(ANCHOR_HELPER.options):
raise TypeError(f"Bad {name}={options!r}; max length is {len(ANCHOR_HELPER.options)}.")
for i, option in enumerate(options):
ANCHOR_HELPER.check(option, f"start_options[{i}]")
if len(set(options)) != len(options):
raise ValueError(f"Bad {name}; options must be unique.")
return options
DEFAULT_INITIAL_RANGE_SECONDS = 30 * 24 * 60 * 60 # 30 days
def default_initial_range_fn() -> tuple[datetime, datetime] | tuple[date, date]:
now = datetime.now()
return now - timedelta(seconds=DEFAULT_INITIAL_RANGE_SECONDS), now
if func_name := config.DATA_GENERATOR_INITIAL_RANGE_FN:
try:
# Undocumented behavior!
value = int(func_name)
if value > 0:
DEFAULT_INITIAL_RANGE_SECONDS = value
initial_range_fn = default_initial_range_fn
except ValueError:
initial_range_fn = get_by_full_name(func_name)
else:
initial_range_fn = default_initial_range_fn