plateau.io_components.metapartition module

class plateau.io_components.metapartition.MetaPartition(label: str | None, file: str | None = None, table_name: str = 'table', data: DataFrame | None = None, indices: Dict[Any, Any] | None = None, metadata_version: int | None = None, schema: SchemaWrapper | None = None, partition_keys: Sequence[str] | None = None, logical_conjunction: List[Tuple[Any, str, Any]] | None = None)[source]

Bases: Iterable

Wrapper for a plateau partition which includes additional information about the parent dataset.

add_metapartition(metapartition: MetaPartition, schema_validation: bool = True)[source]

Adds a metapartition to the internal list structure to enable batch processing.

Parameters:
  • metapartition – The MetaPartition to be added.

  • schema_validation – If True (default), ensure that the schemas of both MetaPartition objects are identical.

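A minimal sketch of batching two partitions; the labels and column names are purely illustrative:

>>> import pandas as pd
>>> from plateau.io_components.metapartition import MetaPartition
>>> mp_a = MetaPartition(label='part_a', data=pd.DataFrame({'x': [1]}))
>>> mp_b = MetaPartition(label='part_b', data=pd.DataFrame({'x': [2]}))
>>> # Combine both partitions into a single MetaPartition for batch processing
>>> batch = mp_a.add_metapartition(mp_b)
>>> sorted(mp['label'] for mp in batch.metapartitions)
['part_a', 'part_b']
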
apply(func: Callable, type_safe: bool = False) MetaPartition[source]

Applies a given function to all dataframes of the MetaPartition.

Parameters:
  • func – A callable accepting and returning a pandas.DataFrame

  • type_safe – If the transformation is type-safe, optimizations can be applied
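
An illustrative sketch of applying a transformation to the materialised data; the column names are hypothetical:

>>> import pandas as pd
>>> from plateau.io_components.metapartition import MetaPartition
>>> mp = MetaPartition(label='part_a', data=pd.DataFrame({'x': [1, 2, 3]}))
>>> # func receives and must return a pandas.DataFrame
>>> mp = mp.apply(lambda df: df.assign(y=df['x'] * 2))
>>> list(mp.data.columns)
['x', 'y']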

as_sentinel()[source]
build_indices(columns: Iterable[str])[source]

This builds the indices for this metapartition for the given columns. The indices for the passed columns are rebuilt, so existing index entries in the metapartition are overwritten.

Parameters:

columns – A list of columns for which the indices over all dataframes in the metapartition are (re)built

Returns:

self
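
A short sketch of building a secondary index over a single column; the label and column names are illustrative:

>>> import pandas as pd
>>> from plateau.io_components.metapartition import MetaPartition
>>> mp = MetaPartition(
...     label='part_a',
...     data=pd.DataFrame({'user': ['a', 'b'], 'value': [1, 2]})
... )
>>> # Build (or rebuild) the index for the 'user' column
>>> mp = mp.build_indices(['user'])
>>> sorted(mp.indices)
['user']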

static concat_metapartitions(metapartitions, label_merger=None)[source]
copy(**kwargs)[source]

Creates a shallow copy where the kwargs overwrite existing attributes.

property data
delete_from_store(dataset_uuid: Any, store: str | KeyValueStore | Callable[[], KeyValueStore]) MetaPartition[source]
property file: str
static from_dict(dct)[source]

Create a MetaPartition from a dictionary.

Parameters:

dct (dict) – Dictionary containing constructor arguments as keys
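
An illustrative sketch, assuming the dictionary keys mirror the constructor argument names:

>>> import pandas as pd
>>> from plateau.io_components.metapartition import MetaPartition
>>> mp = MetaPartition.from_dict({
...     'label': 'part_a',
...     'data': pd.DataFrame({'x': [1]})
... })
>>> mp.label
'part_a'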

static from_partition(partition: Partition, data: DataFrame | None = None, indices: Dict | None = None, metadata_version: int | None = None, schema: SchemaWrapper | None = None, partition_keys: List[str] | None = None, logical_conjunction: List[Tuple[Any, str, Any]] | None = None, table_name: str = 'table')[source]

Transform a plateau Partition into a MetaPartition.

Parameters:
  • partition – The plateau partition to be wrapped

  • data – The materialised DataFrame belonging to this partition

  • indices (dict) – The index dictionary of the dataset

  • schema – Type metadata for the table, optional

  • metadata_version – The plateau dataset specification version

  • partition_keys – A list of the primary partition keys

Return type:

MetaPartition

get_parquet_metadata(store: str | KeyValueStore | Callable[[], KeyValueStore]) DataFrame[source]

Retrieve the parquet metadata for the MetaPartition. Especially relevant for calculating dataset statistics.

Parameters:
  • store – A factory function providing a KeyValueStore

Returns:

A DataFrame with relevant parquet metadata

Return type:

pd.DataFrame
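
A sketch for a partition that has already been written, assuming an in-memory minimalkv store created via get_store_from_url; the label and dataset UUID are illustrative:

>>> import pandas as pd
>>> from minimalkv import get_store_from_url
>>> from plateau.io_components.metapartition import MetaPartition
>>> store = get_store_from_url('hmemory://')
>>> mp = MetaPartition(label='part_a', data=pd.DataFrame({'x': [1, 2]}))
>>> # Persist the partition first so that parquet metadata exists in the store
>>> mp = mp.store_dataframes(store, dataset_uuid='my_dataset')
>>> meta = mp.get_parquet_metadata(store)
>>> isinstance(meta, pd.DataFrame)
True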

property indices
property is_sentinel: bool
property label: str
load_dataframes(store: KeyValueStore, columns: Sequence[str] | None = None, predicate_pushdown_to_io: bool = True, categoricals: Sequence[str] | None = None, dates_as_object: bool = True, predicates: List[List[Tuple[str, str, LiteralValue]]] | None = None) MetaPartition[source]

Load the dataframes of the partitions from store into memory.

Parameters:
  • store (Callable or str or minimalkv.KeyValueStore) –

    The store where we can find or store the dataset.

    Can be either minimalkv.KeyValueStore, a minimalkv store url or a generic Callable producing a minimalkv.KeyValueStore

  • columns – A subset of columns to be loaded.

  • predicate_pushdown_to_io (bool) – Push predicates through to the I/O layer, default True. Disable this if you see problems with predicate pushdown for the given file even if the file format supports it. Note that this option only hides problems in the storage layer that need to be addressed there.

  • categoricals – Load the provided subset of columns as a pandas.Categorical.

  • dates_as_object (bool) – Load pyarrow.date{32,64} columns as object columns in Pandas instead of using np.datetime64 to preserve their type. While this improves type-safety, this comes at a performance cost.

  • predicates (List[List[Tuple[str, str, Any]]]) –

    Optional list of predicates, like [[('x', '>', 0), ...], ...], that are used to filter the resulting DataFrame, possibly using predicate pushdown, if supported by the file format. This parameter is not compatible with filter_query. See the sketch after this parameter list for a usage example.

    Predicates are expressed in disjunctive normal form (DNF). This means that the innermost tuple describes a single column predicate. These inner predicates are all combined with a conjunction (AND) into a larger predicate. The outermost list then combines all predicates with a disjunction (OR). This allows us to express all kinds of predicates that are possible using boolean logic.

    Available operators are: ==, !=, <=, >=, <, > and in.

    Filtering for missing values is supported with the operators ==, != and in, using the values np.nan and None for float and string columns respectively.

    Categorical data

    When using order-sensitive operators on categorical data, we will assume that the categories obey a lexicographical ordering. This filtering may result in less-than-optimal performance and may be slower than the evaluation on non-categorical data.

    See also Filtering / Predicate pushdown and Efficient Querying

Examples:

>>> part = MetaPartition(
...     label='part_label',
...     file='file_key_in_store'
... )
>>> part.data is None
True
>>> part = part.load_dataframes(store)
>>> isinstance(part.data, pd.DataFrame)
True

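The following sketch shows a store-and-reload round trip with a predicate, assuming an in-memory minimalkv store created via get_store_from_url; labels, the dataset UUID and column names are illustrative:

>>> import pandas as pd
>>> from minimalkv import get_store_from_url
>>> from plateau.io_components.metapartition import MetaPartition
>>> store = get_store_from_url('hmemory://')
>>> mp = MetaPartition(
...     label='part_a',
...     data=pd.DataFrame({'x': [1, 2, 3], 'y': ['a', 'b', 'a']})
... )
>>> mp = mp.store_dataframes(store, dataset_uuid='my_dataset')
>>> # Reload only rows where x > 1 AND y == 'a' (a single DNF conjunction)
>>> mp = mp.load_dataframes(
...     store,
...     columns=['x', 'y'],
...     predicates=[[('x', '>', 1), ('y', '==', 'a')]]
... )
>>> mp.data['x'].tolist()
[3]
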
load_schema(store: str | KeyValueStore | Callable[[], KeyValueStore], dataset_uuid: str) MetaPartition[source]

Loads all table metadata into memory and stores it under the schema attribute.

static merge_indices(metapartitions)[source]
property partition: Partition
partition_on(partition_on: str | Sequence[str])[source]

Partition all dataframes assigned to this MetaPartition according to the given columns.

If the MetaPartition object contains index information, it is split in such a way that the resulting indices reference the new partitions.

If a requested partition column does not exist in all tables, a KeyError is raised.

All output partitions are re-assigned labels encoding the partitioned columns (urlencoded).

Examples:

>>> import pandas as pd
>>> from plateau.io_components.metapartition import MetaPartition
>>> mp = MetaPartition(
...     label='partition_label',
...     data=pd.DataFrame({
...         'P': [1, 2, 1, 2],
...         'L': [1, 1, 2, 2]
...     })
... )
>>> repartitioned_mp = mp.partition_on(['P', 'L'])
>>> assert [mp["label"] for mp in repartitioned_mp.metapartitions] == [
...     "P=1/L=1/partition_label",
...     "P=1/L=2/partition_label",
...     "P=2/L=1/partition_label",
...     "P=2/L=2/partition_label"
... ]
Parameters:

partition_on – The column or columns to partition the dataframes on

remove_dataframes()[source]

Remove all dataframes from the metapartition in memory.

store_dataframes(store: str | KeyValueStore | Callable[[], KeyValueStore], dataset_uuid: str, df_serializer: DataFrameSerializer | None = None) MetaPartition[source]

Stores all dataframes of the MetaPartition and registers the saved files under the file attribute. The dataframes themselves are removed from memory.

Parameters:
  • store – If it is a function, the result of calling it must be a KeyValueStore.

  • dataset_uuid – The dataset UUID the partition will be assigned to

  • df_serializer – Serialiser to be used to store the dataframe

Return type:

MetaPartition
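
A sketch of persisting an in-memory partition, again assuming an in-memory minimalkv store; the label and dataset UUID are illustrative:

>>> import pandas as pd
>>> from minimalkv import get_store_from_url
>>> from plateau.io_components.metapartition import MetaPartition
>>> store = get_store_from_url('hmemory://')
>>> mp = MetaPartition(label='part_a', data=pd.DataFrame({'x': [1]}))
>>> mp = mp.store_dataframes(store, dataset_uuid='my_dataset')
>>> # The written parquet file is registered under the dataset UUID
>>> mp.file.startswith('my_dataset/')
True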

to_dict()[source]
validate_schema_compatible(store: str | KeyValueStore | Callable[[], KeyValueStore], dataset_uuid: str) MetaPartition[source]

Validates that the currently held DataFrames match the schema of the existing dataset.

Parameters:
  • store – If it is a function, the result of calling it must be a KeyValueStore.

  • dataset_uuid – The dataset UUID the partition will be assigned to

class plateau.io_components.metapartition.MetaPartitionIterator(metapartition)[source]

Bases: Iterator

next()

Return the next item from the iterator. When exhausted, raise StopIteration

plateau.io_components.metapartition.parse_input_to_metapartition(obj: DataFrame | Sequence | MetaPartition | None, table_name: str = 'table', metadata_version: int | None = None) MetaPartition[source]

Parses the given user input and returns a MetaPartition.

The expected input is a pandas.DataFrame or a list of pandas.DataFrame.

Every element of the list is treated as dedicated user input and will result in a separate physical file, unless specified otherwise.

Parameters:
  • obj

  • table_name – The table name assigned to the partitions

  • metadata_version – The plateau dataset specification version
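
An illustrative sketch of turning a list of DataFrames into a single MetaPartition with one sub-partition per element:

>>> import pandas as pd
>>> from plateau.io_components.metapartition import parse_input_to_metapartition
>>> mp = parse_input_to_metapartition(
...     [pd.DataFrame({'x': [1]}), pd.DataFrame({'x': [2]})],
...     table_name='table'
... )
>>> # One sub-partition (and hence one physical file) per list element
>>> len(mp.metapartitions)
2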

plateau.io_components.metapartition.partition_labels_from_mps(mps: List[MetaPartition]) List[str][source]

Get a list of partition labels, flattening any nested meta partitions in the input and ignoring sentinels.
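
A minimal sketch with illustrative labels:

>>> from plateau.io_components.metapartition import MetaPartition, partition_labels_from_mps
>>> mps = [MetaPartition(label='part_a'), MetaPartition(label='part_b')]
>>> partition_labels_from_mps(mps)
['part_a', 'part_b']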