plateau.io.eager module

plateau.io.eager.build_dataset_indices(store, dataset_uuid, columns, factory=None)[source]

Function which builds an ExplicitSecondaryIndex.

This function loads the dataset, computes the requested indices and writes the indices to the dataset. The dataset partitions themselves are not mutated.

Parameters:
  • store (Callable or str or minimalkv.KeyValueStore) –

    The store where we can find or store the dataset.

    Can be either minimalkv.KeyValueStore, a minimalkv store url or a generic Callable producing a minimalkv.KeyValueStore

  • dataset_uuid (str) – The dataset UUID

  • columns – The columns for which indices should be built.

  • factory (plateau.core.factory.DatasetFactory) – A DatasetFactory holding the store and UUID to the source dataset.
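
A minimal usage sketch; the store URL, dataset UUID and column name below are illustrative:

    from plateau.io.eager import build_dataset_indices

    # Build a secondary index over the "user_id" column of an existing dataset.
    build_dataset_indices(
        store="hfs://my_store",
        dataset_uuid="dataset_uuid",
        columns=["user_id"],
    )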

plateau.io.eager.commit_dataset(store: str | KeyValueStore | Callable[[], KeyValueStore] | None = None, dataset_uuid: str | None = None, new_partitions: Iterable[MetaPartition] | None = None, delete_scope: Iterable[Dict[str, Any]] | None = None, metadata: Dict[Any, Any] | None = None, metadata_merger: Callable[[List[Dict]], Dict] | None = None, default_metadata_version: int = 4, partition_on: Iterable[str] | None = None, factory: DatasetFactory | None = None, secondary_indices: Iterable[str] | None = None)[source]

Commit new state to an existing dataset. This can be used for three distinct operations.

  1. Add previously written partitions to this dataset

    If, for some reason, the existing pipelines are not sufficient and you need more control, you can write the files outside of a plateau pipeline and commit them whenever you choose to.

    This should be used in combination with write_single_partition() and create_empty_dataset_header().

    import pandas as pd
    from plateau.io.eager import write_single_partition, commit_dataset
    
    store = "hfs://my_store"
    
    # The partition writing can be done concurrently and distributed if wanted.
    # Only the information about what partitions have been written is required for the commit.
    new_partitions = [
        write_single_partition(
            store=store,
            dataset_uuid='dataset_uuid',
            data=pd.DataFrame({'column': [1, 2]}),
        )
    ]
    
    new_dataset = commit_dataset(
        store=store,
        dataset_uuid='dataset_uuid',
        new_partitions=new_partitions,
    )
    
  2. Simple delete of partitions

    If you want to remove some partitions, this is one of the simplest ways of doing so. By providing a delete_scope, the references to these files are removed in an atomic commit.

    commit_dataset(
        store=store,
        dataset_uuid='dataset_uuid',
        delete_scope=[
            {
                "partition_column": "part_value_to_be_removed"
            }
        ],
    )
    
  3. Add additional metadata

    To add new metadata to an existing dataset

    commit_dataset(
        store=store,
        dataset_uuid='dataset_uuid',
        metadata={"new": "user_metadata"},
    )
    

    Note:

    If you do not want the new metadata to be merged with the existing one, provide a custom metadata_merger.
    
Parameters:
  • store (Callable or str or minimalkv.KeyValueStore) –

    The store where we can find or store the dataset.

    Can be either minimalkv.KeyValueStore, a minimalkv store url or a generic Callable producing a minimalkv.KeyValueStore

  • dataset_uuid (str) – The dataset UUID

  • delete_scope (List[Dict]) – This defines which partitions are replaced with the input and therefore get deleted. It is a list of query filters for the dataframe in the form of dictionaries, e.g.: [{'column_1': 'value_1'}, {'column_1': 'value_2'}]. Each query filter will be given to dataset.query() and the returned partitions will be deleted. If no scope is given, nothing will be deleted. For plateau.io.dask.update.update_dataset.* a delayed object resolving to a list of dicts is also accepted.

  • metadata (Optional[Dict]) – A dictionary used to update the dataset metadata.

  • metadata_merger (Optional[Callable]) – By default, partition metadata is combined using the combine_metadata() function. You can supply a callable here that implements a custom merge operation on the metadata dictionaries (depending on the number of matches, this may involve more than two dictionaries).

  • default_metadata_version (int) – Default metadata version. (Note: only metadata versions greater than 3 are supported)

  • partition_on (List) – Column names by which the dataset should be physically partitioned. These columns may later be used as an index to improve query performance. Partition columns need to be present in all dataset tables. Sensitive to ordering.

  • factory (plateau.core.factory.DatasetFactory) – A DatasetFactory holding the store and UUID to the source dataset.

  • secondary_indices (List[str]) – A list of columns for which a secondary index should be calculated.

  • new_partitions – The input partitions to be committed.

plateau.io.eager.create_empty_dataset_header(store, dataset_uuid, schema, partition_on=None, metadata=None, overwrite=False, metadata_storage_format='json', metadata_version=4, table_name: str = 'table')[source]

Create a dataset header without any partitions. This may be used in combination with write_single_partition() to create implicitly partitioned datasets.

Note

The created dataset will always have explicit_partition==False

Warning

This function should only be used in very rare occasions. Usually you’re better off using full end-to-end pipelines.

Parameters:
  • store (Callable or str or minimalkv.KeyValueStore) –

    The store where we can find or store the dataset.

    Can be either minimalkv.KeyValueStore, a minimalkv store url or a generic Callable producing a minimalkv.KeyValueStore

  • dataset_uuid (str) – The dataset UUID

  • schema (SchemaWrapper) – The dataset table schema

  • partition_on (List) – Column names by which the dataset should be physically partitioned. These columns may later be used as an index to improve query performance. Partition columns need to be present in all dataset tables. Sensitive to ordering.

  • metadata (Optional[Dict]) – A dictionary used to update the dataset metadata.

  • overwrite (Optional[bool]) – If True, allow overwrite of an existing dataset.

  • metadata_storage_format (str) – Format used to store the dataset metadata. Currently supported are 'json' and 'msgpack.zstd'.

  • metadata_version (Optional[int]) – The dataset metadata version

  • table_name

    The table name of the dataset to be loaded. This creates a namespace for the partitioning like

    dataset_uuid/table_name/*

    This is to support legacy workflows. We recommend not setting it and using the default wherever possible.
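
A minimal sketch of creating the header up front. It assumes the table schema can be derived from an example dataframe via make_meta from plateau.core.common_metadata; the helper usage, store URL and column name are illustrative:

    import pandas as pd

    from plateau.core.common_metadata import make_meta  # assumed helper for deriving the schema
    from plateau.io.eager import create_empty_dataset_header

    store = "hfs://my_store"

    # Derive the table schema from an example dataframe and write the empty header.
    # Partitions can afterwards be added with write_single_partition().
    create_empty_dataset_header(
        store=store,
        dataset_uuid="dataset_uuid",
        schema=make_meta(pd.DataFrame({"column": [1, 2]}), origin="example"),
    )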

plateau.io.eager.delete_dataset(dataset_uuid=None, store=None, factory=None)[source]

Delete the entire dataset from the store.

Parameters:
  • dataset_uuid (str) – The dataset UUID

  • store (Callable or str or minimalkv.KeyValueStore) –

    The store where we can find or store the dataset.

    Can be either minimalkv.KeyValueStore, a minimalkv store url or a generic Callable producing a minimalkv.KeyValueStore

  • factory (plateau.core.factory.DatasetFactory) – A DatasetFactory holding the store and UUID to the source dataset.
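
A minimal usage sketch (store URL and dataset UUID are illustrative):

    from plateau.io.eager import delete_dataset

    # Remove all files and metadata belonging to the dataset from the store.
    delete_dataset(dataset_uuid="dataset_uuid", store="hfs://my_store")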

plateau.io.eager.garbage_collect_dataset(dataset_uuid=None, store=None, factory=None)[source]

Remove auxiliary files that are no longer tracked by the dataset.

These files include indices that are no longer referenced by the metadata as well as files in the directories of the tables that are no longer referenced. The latter is only applied to static datasets.

Parameters:
  • dataset_uuid (str) – The dataset UUID

  • store (Callable or str or minimalkv.KeyValueStore) –

    The store where we can find or store the dataset.

    Can be either minimalkv.KeyValueStore, a minimalkv store url or a generic Callable producing a minimalkv.KeyValueStore

  • factory (plateau.core.factory.DatasetFactory) – A DatasetFactory holding the store and UUID to the source dataset.
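
A minimal usage sketch (store URL and dataset UUID are illustrative), e.g. after partitions were removed via a delete_scope commit:

    from plateau.io.eager import garbage_collect_dataset

    # Delete files in the dataset's key space that are no longer referenced
    # by the dataset metadata, e.g. leftover index or partition files.
    garbage_collect_dataset(dataset_uuid="dataset_uuid", store="hfs://my_store")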

plateau.io.eager.read_dataset_as_dataframes(dataset_uuid: str | None = None, store=None, columns: Dict[str, List[str]] | None = None, predicate_pushdown_to_io: bool = True, categoricals: List[str] | None = None, dates_as_object: bool = True, predicates: List[List[Tuple[str, str, Any]]] | None = None, factory: DatasetFactory | None = None, dispatch_by: List[str] | None = None) List[DataFrame][source]

Read a dataset as a list of dataframes.

Every element of the list corresponds to a physical partition.

Parameters:
  • dataset_uuid (str) – The dataset UUID

  • store (Callable or str or minimalkv.KeyValueStore) –

    The store where we can find or store the dataset.

    Can be either minimalkv.KeyValueStore, a minimalkv store url or a generic Callable producing a minimalkv.KeyValueStore

  • columns – A subset of columns to be loaded.

  • predicate_pushdown_to_io (bool) – Push predicates through to the I/O layer, default True. Disable this if you see problems with predicate pushdown for the given file even if the file format supports it. Note that this option only hides problems in the storage layer that need to be addressed there.

  • categoricals – Load the provided subset of columns as a pandas.Categorical.

  • dates_as_object (bool) – Load pyarrow.date{32,64} columns as object columns in Pandas instead of using np.datetime64 to preserve their type. While this improves type-safety, this comes at a performance cost.

  • predicates (List[List[Tuple[str, str, Any]]]) –

    Optional list of predicates, like [[('x', '>', 0), ...], ...], that are used to filter the resulting DataFrame, possibly using predicate pushdown, if supported by the file format. This parameter is not compatible with filter_query.

    Predicates are expressed in disjunctive normal form (DNF). This means that the innermost tuple describes a single column predicate. These inner predicates are combined with a conjunction (AND) into a larger predicate. The outermost list then combines all predicates with a disjunction (OR). This allows expressing all predicates that can be formulated using boolean logic.

    Available operators are: ==, !=, <=, >=, <, > and in.

    Filtering for missings is supported with operators ==, != and in and values np.nan and None for float and string columns respectively.

    Categorical data

    When using order sensitive operators on categorical data we will assume that the categories obey a lexicographical ordering. This filtering may result in less than optimal performance and may be slower than the evaluation on non-categorical data.

    See also Filtering / Predicate pushdown and Efficient Querying, as well as the predicates example below.

  • factory (plateau.core.factory.DatasetFactory) – A DatasetFactory holding the store and UUID to the source dataset.

  • dispatch_by (Optional[List[str]]) –

    List of index columns to group and partition the jobs by. There will be one job created for every observed combination of index values. This may result in either many very small partitions or in a few very large partitions, depending on the index you are using this on.

    Secondary indices

    This is also usable in combination with secondary indices, where the physical file layout may not be aligned with the logically requested layout. For optimal performance it is recommended to use this for columns that can benefit from predicate pushdown, since the jobs will fetch their data individually and will not shuffle data in memory / over the network.

Returns:

Returns a list of pandas.DataFrame, one element per partition.

Return type:

List[pandas.DataFrame]

Examples

Dataset in store contains two partitions with two files each

>>> import minimalkv
>>> from plateau.io.eager import read_dataset_as_dataframes

>>> store = minimalkv.get_store_from_url('s3://bucket_with_dataset')

>>> dfs = read_dataset_as_dataframes('dataset_uuid', store)
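
Filtering rows with predicates and grouping the output by an indexed column via dispatch_by (the column name is illustrative):

>>> dfs = read_dataset_as_dataframes(
...     'dataset_uuid',
...     store,
...     predicates=[[('column', '>', 0)]],
...     dispatch_by=['column'],
... )
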
plateau.io.eager.read_table(dataset_uuid: str | None = None, store=None, columns: Dict[str, List[str]] | None = None, predicate_pushdown_to_io: bool = True, categoricals: List[str] | None = None, dates_as_object: bool = True, predicates: List[List[Tuple[str, str, Any]]] | None = None, factory: DatasetFactory | None = None) DataFrame[source]

A utility function to load a single table with multiple partitions as a single dataframe in one go. Mostly useful for smaller tables or datasets where all partitions fit into memory.

The order of partitions is not guaranteed to be stable in the resulting dataframe.

Parameters:
  • dataset_uuid (str) – The dataset UUID

  • store (Callable or str or minimalkv.KeyValueStore) –

    The store where we can find or store the dataset.

    Can be either minimalkv.KeyValueStore, a minimalkv store url or a generic Callable producing a minimalkv.KeyValueStore

  • columns – A subset of columns to be loaded.

  • predicate_pushdown_to_io (bool) – Push predicates through to the I/O layer, default True. Disable this if you see problems with predicate pushdown for the given file even if the file format supports it. Note that this option only hides problems in the storage layer that need to be addressed there.

  • categoricals – Load the provided subset of columns as a pandas.Categorical.

  • dates_as_object (bool) – Load pyarrow.date{32,64} columns as object columns in Pandas instead of using np.datetime64 to preserve their type. While this improves type-safety, this comes at a performance cost.

  • predicates (List[List[Tuple[str, str, Any]]]) –

    Optional list of predicates, like [[('x', '>', 0), ...], ...], that are used to filter the resulting DataFrame, possibly using predicate pushdown, if supported by the file format. This parameter is not compatible with filter_query.

    Predicates are expressed in disjunctive normal form (DNF). This means that the innermost tuple describes a single column predicate. These inner predicates are combined with a conjunction (AND) into a larger predicate. The outermost list then combines all predicates with a disjunction (OR). This allows expressing all predicates that can be formulated using boolean logic.

    Available operators are: ==, !=, <=, >=, <, > and in.

    Filtering for missings is supported with operators ==, != and in and values np.nan and None for float and string columns respectively.

    Categorical data

    When using order sensitive operators on categorical data we will assume that the categories obey a lexicographical ordering. This filtering may result in less than optimal performance and may be slower than the evaluation on non-categorical data.

    See also Filtering / Predicate pushdown and Efficient Querying, as well as the predicates example below.

  • factory (plateau.core.factory.DatasetFactory) – A DatasetFactory holding the store and UUID to the source dataset.

Returns:

Returns a pandas.DataFrame holding the data of the requested columns

Return type:

pandas.DataFrame

Examples

Dataset in store contains two partitions with two files each

>>> import minimalkv
>>> from plateau.io.eager import read_table

>>> store = minimalkv.get_store_from_url('s3://bucket_with_dataset')

>>> df = read_table('dataset_uuid', store)
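
Restricting the rows that are loaded with a predicate (the column name is illustrative):

>>> df = read_table('dataset_uuid', store, predicates=[[('column', '==', 1)]])
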
plateau.io.eager.store_dataframes_as_dataset(store: KeyValueStore, dataset_uuid: str, dfs: List[DataFrame | Dict[str, DataFrame]], metadata: Dict[str, Dict[str, Any]] | None = None, partition_on: List[str] | None = None, df_serializer: DataFrameSerializer | None = None, overwrite: bool = False, secondary_indices=None, metadata_storage_format='json', metadata_version=4, table_name: str = 'table')[source]

Utility function to store a list of dataframes as a partitioned dataset with multiple tables (files).

Useful for very small datasets where all data fits into memory.

Parameters:
  • store (Callable or str or minimalkv.KeyValueStore) –

    The store where we can find or store the dataset.

    Can be either minimalkv.KeyValueStore, a minimalkv store url or a generic Callable producing a minimalkv.KeyValueStore

  • dataset_uuid (str) – The dataset UUID

  • metadata (Optional[Dict]) – A dictionary used to update the dataset metadata.

  • partition_on (List) – Column names by which the dataset should be physically partitioned. These columns may later be used as an index to improve query performance. Partition columns need to be present in all dataset tables. Sensitive to ordering.

  • df_serializer (Optional[plateau.serialization.DataFrameSerializer]) – A pandas DataFrame serialiser from plateau.serialization

  • overwrite (Optional[bool]) – If True, allow overwrite of an existing dataset.

  • secondary_indices (List[str]) – A list of columns for which a secondary index should be calculated.

  • metadata_storage_format (str) – Format used to store the dataset metadata. Currently supported are 'json' and 'msgpack.zstd'.

  • metadata_version (Optional[int]) – The dataset metadata version

  • table_name

    The table name of the dataset to be loaded. This creates a namespace for the partitioning like

    dataset_uuid/table_name/*

    This is to support legacy workflows. We recommend not setting it and using the default wherever possible.

  • dfs – The dataframe(s) to be stored.
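
A minimal sketch (store URL, dataset UUID and column names are illustrative):

    import pandas as pd

    from plateau.io.eager import store_dataframes_as_dataset

    # Store one dataframe, physically partitioned by "part" and with a
    # secondary index on "value"; returns the DatasetMetadata object.
    dm = store_dataframes_as_dataset(
        store="hfs://my_store",
        dataset_uuid="dataset_uuid",
        dfs=[pd.DataFrame({"part": ["a", "a", "b"], "value": [1, 2, 3]})],
        partition_on=["part"],
        secondary_indices=["value"],
    )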

plateau.io.eager.update_dataset_from_dataframes(df_list: List[DataFrame | Dict[str, DataFrame]], store: KeyValueStore | None = None, dataset_uuid: str | None = None, delete_scope=None, metadata=None, df_serializer: DataFrameSerializer | None = None, metadata_merger: Callable | None = None, default_metadata_version: int = 4, partition_on: List[str] | None = None, sort_partitions_by: str | None = None, secondary_indices: List[str] | None = None, table_name: str = 'table', factory: DatasetFactory | None = None) DatasetMetadata[source]

Update a plateau dataset in store at once, using a list of dataframes.

Useful for datasets which do not fit into memory.

Parameters:
  • store (Callable or str or minimalkv.KeyValueStore) –

    The store where we can find or store the dataset.

    Can be either minimalkv.KeyValueStore, a minimalkv store url or a generic Callable producing a minimalkv.KeyValueStore

  • dataset_uuid (str) – The dataset UUID

  • delete_scope (List[Dict]) – This defines which partitions are replaced with the input and therefore get deleted. It is a list of query filters for the dataframe in the form of dictionaries, e.g.: [{'column_1': 'value_1'}, {'column_1': 'value_2'}]. Each query filter will be given to dataset.query() and the returned partitions will be deleted. If no scope is given, nothing will be deleted. For plateau.io.dask.update.update_dataset.* a delayed object resolving to a list of dicts is also accepted.

  • metadata (Optional[Dict]) – A dictionary used to update the dataset metadata.

  • df_serializer (Optional[plateau.serialization.DataFrameSerializer]) – A pandas DataFrame serialiser from plateau.serialization

  • metadata_merger (Optional[Callable]) – By default, partition metadata is combined using the combine_metadata() function. You can supply a callable here that implements a custom merge operation on the metadata dictionaries (depending on the number of matches, this may involve more than two dictionaries).

  • default_metadata_version (int) – Default metadata version. (Note: only metadata versions greater than 3 are supported)

  • partition_on (List) – Column names by which the dataset should be physically partitioned. These columns may later be used as an index to improve query performance. Partition columns need to be present in all dataset tables. Sensitive to ordering.

  • sort_partitions_by (str) – Provide a column after which the data should be sorted before storage to enable predicate pushdown.

  • secondary_indices (List[str]) – A list of columns for which a secondary index should be calculated.

  • table_name

    The table name of the dataset to be loaded. This creates a namespace for the partitioning like

    dataset_uuid/table_name/*

    This is to support legacy workflows. We recommend not setting it and using the default wherever possible.

  • factory (plateau.core.factory.DatasetFactory) – A DatasetFactory holding the store and UUID to the source dataset.

  • df_list – The dataframe(s) to be stored.

Return type:

The dataset metadata object (DatasetMetadata).
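
A minimal sketch that replaces the data of one physical partition in a single commit (store URL, dataset UUID, column names and values are illustrative):

    import pandas as pd

    from plateau.io.eager import update_dataset_from_dataframes

    # Remove everything currently stored for part == "a" and add the new data;
    # both happen atomically in the same commit.
    dm = update_dataset_from_dataframes(
        [pd.DataFrame({"part": ["a"], "value": [42]})],
        store="hfs://my_store",
        dataset_uuid="dataset_uuid",
        partition_on=["part"],
        delete_scope=[{"part": "a"}],
    )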

plateau.io.eager.write_single_partition(store: KeyValueStore | None = None, dataset_uuid: str | None = None, data=None, df_serializer: DataFrameSerializer | None = None, metadata_version: int = 4, partition_on: List[str] | None = None, factory=None, secondary_indices=None, table_name: str = 'table')[source]

Write the parquet file(s) for a single partition. This will not update the dataset header and can therefore be used for highly concurrent dataset writes.

For datasets with explicit partitions, the dataset header can be updated by calling plateau.io.eager.commit_dataset() with the output of this function.

Note

It is highly recommended to use the full pipelines whenever possible. This functionality should be used with caution and should only be necessary in cases where traditional pipeline scheduling is not an option.

Note

This function requires an existing dataset metadata file and the schemas for the tables to be present. Either ensure that the dataset always exists through some other means or use create_empty_dataset_header() at the start of your computation to make sure the basic dataset metadata is there.

Parameters:
  • store (Callable or str or minimalkv.KeyValueStore) –

    The store where we can find or store the dataset.

    Can be either minimalkv.KeyValueStore, a minimalkv store url or a generic Callable producing a minimalkv.KeyValueStore

  • dataset_uuid (str) – The dataset UUID

  • df_serializer (Optional[plateau.serialization.DataFrameSerializer]) – A pandas DataFrame serialiser from plateau.serialization

  • metadata_version (Optional[int]) – The dataset metadata version

  • partition_on (List) – Column names by which the dataset should be physically partitioned. These columns may later be used as an index to improve query performance. Partition columns need to be present in all dataset tables. Sensitive to ordering.

  • factory (plateau.core.factory.DatasetFactory) – A DatasetFactory holding the store and UUID to the source dataset.

  • secondary_indices (List[str]) – A list of columns for which a secondary index should be calculated.

  • table_name

    The table name of the dataset to be loaded. This creates a namespace for the partitioning like

    dataset_uuid/table_name/*

    This is to support legacy workflows. We recommend not setting it and using the default wherever possible.

  • data (Dict) – The input is defined according to parse_input_to_metapartition()

Return type:

An empty MetaPartition referencing the new files