from collections.abc import Sequence
from functools import partial
import dask
from dask import delayed
from dask.delayed import Delayed
from plateau.core import naming
from plateau.core.docs import default_docs
from plateau.core.factory import _ensure_factory
from plateau.core.naming import DEFAULT_METADATA_VERSION
from plateau.core.typing import StoreInput
from plateau.core.utils import lazy_store
from plateau.core.uuid import gen_uuid
from plateau.io_components.delete import (
delete_common_metadata,
delete_indices,
delete_top_level_metadata,
)
from plateau.io_components.gc import delete_files, dispatch_files_to_gc
from plateau.io_components.metapartition import (
SINGLE_TABLE,
MetaPartition,
parse_input_to_metapartition,
)
from plateau.io_components.read import dispatch_metapartitions_from_factory
from plateau.io_components.update import update_dataset_from_partitions
from plateau.io_components.utils import (
_ensure_compatible_indices,
normalize_arg,
normalize_args,
raise_if_indices_overlap,
validate_partition_keys,
)
from plateau.io_components.write import (
raise_if_dataset_exists,
store_dataset_from_partitions,
write_partition,
)
from ._utils import (
_cast_categorical_to_index_cat,
_get_data,
_maybe_get_categoricals_from_index,
map_delayed,
)
# Public API of this module. Note that ``read_dataset_as_delayed_metapartitions``
# is defined in this module but is not listed here.
__all__ = (
    "delete_dataset__delayed",
    "garbage_collect_dataset__delayed",
    "read_dataset_as_delayed",
    "update_dataset_from_delayed",
    "store_delayed_as_dataset",
)
def _delete_all_additional_metadata(dataset_factory):
    """Run ``delete_indices`` and then ``delete_common_metadata`` for the
    dataset behind ``dataset_factory``.

    Scheduled as a single delayed task by ``delete_dataset__delayed``.
    """
    for remover in (delete_indices, delete_common_metadata):
        remover(dataset_factory=dataset_factory)
def _delete_tl_metadata(dataset_factory, *_dependencies):
    """Delete the top-level metadata of the dataset behind ``dataset_factory``.

    This is the collector node at the end of the delayed delete graph. The
    extra positional arguments are other delayed results; passing them here
    makes dask compute them before this function runs. Their values are
    ignored.
    """
    delete_top_level_metadata(dataset_factory=dataset_factory)
[docs]
@default_docs
@normalize_args
def delete_dataset__delayed(dataset_uuid=None, store=None, factory=None):
    """Build a delayed graph that deletes a whole dataset.

    The graph does the following:

    - deletes the files of every dispatched metapartition;
    - garbage-collects files no longer tracked by the dataset;
    - deletes indices and common metadata.

    It then deletes the top-level metadata once all of the above tasks have
    been computed.

    Parameters
    ----------
    """
    dataset_factory = _ensure_factory(
        dataset_uuid=dataset_uuid,
        store=store,
        factory=factory,
        load_schema=False,
    )
    # When the caller passes only ``factory``, ``store`` is None. Fall back
    # to the resolved factory's store so the partition deletions below get a
    # usable store (the other functions in this module do the same).
    if store is None:
        store = dataset_factory.store_factory

    gc_tasks = garbage_collect_dataset__delayed(factory=dataset_factory)
    mps = dispatch_metapartitions_from_factory(dataset_factory)
    additional_metadata_task = delayed(_delete_all_additional_metadata)(
        dataset_factory=dataset_factory
    )
    partition_delete_tasks = map_delayed(
        MetaPartition.delete_from_store,
        mps,
        store=store,
        dataset_uuid=dataset_factory.dataset_uuid,
    )
    # The top-level metadata is removed last: every other task is passed in
    # as an (ignored) dependency of the final collector node.
    return delayed(_delete_tl_metadata)(
        dataset_factory, list(partition_delete_tasks), gc_tasks, additional_metadata_task
    )
[docs]
@default_docs
@normalize_args
def garbage_collect_dataset__delayed(
    dataset_uuid: str | None = None,
    store: StoreInput | None = None,
    chunk_size: int = 100,
    factory=None,
) -> list[Delayed]:
    """Build delayed tasks that remove files the dataset no longer tracks.

    Two kinds of files are affected:

    - index files that the metadata no longer references;
    - unreferenced files inside the table directories (static datasets
      only).

    Parameters
    ----------
    chunk_size
        Number of files that should be deleted in a single job.
    """
    resolved_factory = _ensure_factory(
        dataset_uuid=dataset_uuid,
        store=store,
        factory=factory,
    )
    # The dispatcher takes everything from the resolved factory, so the
    # explicit uuid/store arguments are deliberately passed as None.
    file_chunks = dispatch_files_to_gc(
        dataset_uuid=None,
        store_factory=None,
        chunk_size=chunk_size,
        factory=resolved_factory,
    )
    delete_tasks = map_delayed(
        delete_files, file_chunks, store_factory=resolved_factory.store_factory
    )
    return list(delete_tasks)
def _load_and_concat_metapartitions_inner(mps, args, kwargs):
    """Load every metapartition in ``mps`` and concatenate the results.

    ``args`` and ``kwargs`` are forwarded unchanged to each
    ``load_dataframes`` call.
    """

    def _load_one(metapartition):
        return metapartition.load_dataframes(*args, **kwargs)

    loaded = list(map(_load_one, mps))
    return MetaPartition.concat_metapartitions(loaded)
def _load_and_concat_metapartitions(list_of_mps, *args, **kwargs):
    """Create one delayed load-and-concat task per element of ``list_of_mps``.

    Each element is a collection of metapartitions, which is loaded and
    merged by ``_load_and_concat_metapartitions_inner``. Any remaining
    positional and keyword arguments are forwarded to ``load_dataframes``.
    """
    forwarded = {"args": args, "kwargs": kwargs}
    return map_delayed(
        _load_and_concat_metapartitions_inner, list_of_mps, **forwarded
    )
@default_docs
@normalize_args
def read_dataset_as_delayed_metapartitions(
    dataset_uuid=None,
    store=None,
    columns=None,
    predicate_pushdown_to_io=True,
    categoricals: Sequence[str] | None = None,
    dates_as_object: bool = True,
    predicates=None,
    factory=None,
    dispatch_by=None,
):
    """Build a list of dask.delayed objects that load a dataset from store.

    Each delayed object yields one
    :class:`~plateau.io_components.metapartition.MetaPartition`.

    .. seealso::

        :func:`~plateau.io.dask.read_dataset_as_delayed`

    Parameters
    ----------
    """
    ds_factory = _ensure_factory(
        dataset_uuid=dataset_uuid,
        store=store,
        factory=factory,
    )
    dispatched = dispatch_metapartitions_from_factory(
        dataset_factory=ds_factory,
        predicates=predicates,
        dispatch_by=dispatch_by,
    )
    # Both loading strategies receive exactly the same keyword arguments.
    load_kwargs = {
        "store": ds_factory.store_factory,
        "columns": columns,
        "categoricals": categoricals,
        "predicate_pushdown_to_io": predicate_pushdown_to_io,
        "dates_as_object": dates_as_object,
        "predicates": predicates,
    }
    if dispatch_by is None:
        # One metapartition per delayed task.
        tasks = map_delayed(MetaPartition.load_dataframes, dispatched, **load_kwargs)
    else:
        # Each dispatched item is a group of metapartitions that is loaded
        # and concatenated into a single metapartition.
        tasks = _load_and_concat_metapartitions(dispatched, **load_kwargs)

    index_categories = _maybe_get_categoricals_from_index(ds_factory, categoricals)
    if index_categories:
        cast_to_index_cat = partial(  # type: ignore
            _cast_categorical_to_index_cat, categories=index_categories
        )
        apply_cast = partial(  # type: ignore
            MetaPartition.apply, func=cast_to_index_cat, type_safe=True
        )
        tasks = map_delayed(apply_cast, tasks)
    return list(tasks)
[docs]
@default_docs
def read_dataset_as_delayed(
    dataset_uuid=None,
    store=None,
    columns=None,
    predicate_pushdown_to_io=True,
    categoricals=None,
    dates_as_object: bool = True,
    predicates=None,
    factory=None,
    dispatch_by=None,
):
    """Build a list of dask.delayed objects that load a dataset from store.

    Each delayed object yields one :class:`~pandas.DataFrame`.

    Parameters
    ----------
    """
    delayed_metapartitions = read_dataset_as_delayed_metapartitions(
        dataset_uuid=dataset_uuid,
        store=store,
        columns=columns,
        predicate_pushdown_to_io=predicate_pushdown_to_io,
        categoricals=categoricals,
        dates_as_object=dates_as_object,
        predicates=predicates,
        factory=factory,
        dispatch_by=dispatch_by,
    )
    # Extract the data of each metapartition via ``_get_data``.
    frames = map_delayed(_get_data, delayed_metapartitions)
    return list(frames)
[docs]
@default_docs
def update_dataset_from_delayed(
    delayed_tasks: list[Delayed],
    store=None,
    dataset_uuid=None,
    delete_scope=None,
    metadata=None,
    df_serializer=None,
    metadata_merger=None,
    default_metadata_version=DEFAULT_METADATA_VERSION,
    partition_on=None,
    sort_partitions_by=None,
    secondary_indices=None,
    factory=None,
    table_name=SINGLE_TABLE,
):
    """A dask.delayed graph to add and store a list of dictionaries containing
    dataframes to a plateau dataset in store.

    The input should be a list (or splitter pipeline) containing
    :class:`~plateau.io_components.metapartition.MetaPartition`. To use this
    pipeline step only for deleting partitions, without adding new ones,
    pass an empty meta partition as input (``[MetaPartition(None)]``).

    Parameters
    ----------

    See Also
    --------
    :ref:`mutating_datasets`
    """
    # Arguments are normalized by hand here. ``delete_scope`` is normalized
    # lazily inside the graph (presumably so it may itself be delayed).
    partition_on = normalize_arg("partition_on", partition_on)
    store = normalize_arg("store", store)
    secondary_indices = normalize_arg("secondary_indices", secondary_indices)
    delete_scope = dask.delayed(normalize_arg)("delete_scope", delete_scope)

    ds_factory, metadata_version, partition_on = validate_partition_keys(
        dataset_uuid=dataset_uuid,
        store=store,
        default_metadata_version=default_metadata_version,
        partition_on=partition_on,
        ds_factory=factory,
    )
    # When the dataset is addressed through ``factory`` alone, ``dataset_uuid``
    # and ``store`` are still None. Fill them from the resolved factory;
    # otherwise ``write_partition`` and ``update_dataset_from_partitions``
    # would receive None for the target dataset and store.
    if ds_factory is not None:
        if dataset_uuid is None:
            dataset_uuid = ds_factory.dataset_uuid
        if store is None:
            store = ds_factory.store_factory

    secondary_indices = _ensure_compatible_indices(ds_factory, secondary_indices)
    mps = map_delayed(
        write_partition,
        delayed_tasks,
        secondary_indices=secondary_indices,
        metadata_version=metadata_version,
        partition_on=partition_on,
        store_factory=store,
        df_serializer=df_serializer,
        dataset_uuid=dataset_uuid,
        sort_partitions_by=sort_partitions_by,
        dataset_table_name=table_name,
    )
    return dask.delayed(update_dataset_from_partitions)(
        list(mps),
        store_factory=store,
        dataset_uuid=dataset_uuid,
        ds_factory=ds_factory,
        delete_scope=delete_scope,
        metadata=metadata,
        metadata_merger=metadata_merger,
    )
[docs]
@default_docs
@normalize_args
def store_delayed_as_dataset(
    delayed_tasks: list[Delayed],
    store,
    dataset_uuid=None,
    metadata=None,
    df_serializer=None,
    overwrite=False,
    metadata_merger=None,
    metadata_version=naming.DEFAULT_METADATA_VERSION,
    partition_on=None,
    metadata_storage_format=naming.DEFAULT_METADATA_STORAGE_FORMAT,
    table_name: str = SINGLE_TABLE,
    secondary_indices=None,
) -> Delayed:
    """Build a delayed graph that writes the dataframes produced by
    ``delayed_tasks`` as a plateau dataset in store.

    Each input is processed as follows:

    1. It is parsed into a metapartition.
    2. It is optionally partitioned and indexed.
    3. Its dataframes are stored.

    All resulting partitions are then passed to
    ``store_dataset_from_partitions`` in a single final delayed task.

    Parameters
    ----------
    """
    store = lazy_store(store)
    target_uuid = gen_uuid() if dataset_uuid is None else dataset_uuid

    # Validation happens eagerly, before any task is created.
    if not overwrite:
        raise_if_dataset_exists(dataset_uuid=target_uuid, store=store)
    raise_if_indices_overlap(partition_on, secondary_indices)

    # Stages applied in order via ``map_delayed``: (callable, extra kwargs).
    stages = [
        (
            partial(
                parse_input_to_metapartition,
                metadata_version=metadata_version,
                table_name=table_name,
            ),
            {},
        )
    ]
    if partition_on:
        stages.append((MetaPartition.partition_on, {"partition_on": partition_on}))
    if secondary_indices:
        stages.append((MetaPartition.build_indices, {"columns": secondary_indices}))
    stages.append(
        (
            MetaPartition.store_dataframes,
            {
                "store": store,
                "df_serializer": df_serializer,
                "dataset_uuid": target_uuid,
            },
        )
    )

    mps = delayed_tasks
    for stage_func, stage_kwargs in stages:
        mps = map_delayed(stage_func, mps, **stage_kwargs)

    return delayed(store_dataset_from_partitions)(
        list(mps),
        dataset_uuid=target_uuid,
        store=store,
        dataset_metadata=metadata,
        metadata_merger=metadata_merger,
        metadata_storage_format=metadata_storage_format,
    )