plateau.serialization package

Module contents

class plateau.serialization.CsvSerializer(compress=True)[source]

Bases: DataFrameSerializer

static restore_dataframe(store: KeyValueStore, key: str, filter_query: str | None = None, columns: Iterable[str] | None = None, predicate_pushdown_to_io: Any = None, categories: Iterable[str] | None = None, predicates: List[List[Tuple[str, str, LiteralValue]]] | None = None, date_as_object: Any = None, **kwargs)[source]

Load a DataFrame from the specified store. The key is also used to detect the storage format used.

Parameters:
  • store – store engine

  • key – Key specifying the path from which the object should be retrieved in the store resource.

  • filter_query – Optional query to filter the DataFrame. Must adhere to the specification of pandas.DataFrame.query.

  • columns – Only read in listed columns. When set to None, the full file will be read in.

  • predicate_pushdown_to_io – Push predicates through to the I/O layer, default True. Disable this if you see problems with predicate pushdown for the given file even if the file format supports it. Note that disabling it only hides problems in the store layer that need to be addressed there.

  • categories – Columns that should be loaded as categoricals.

  • predicates

    Optional list of predicates, like [[('x', '>', 0), …]], that are used to filter the resulting DataFrame, possibly using predicate pushdown if supported by the file format. This parameter is not compatible with filter_query. (See the sketch after this parameter list.)

    Predicates are expressed in disjunctive normal form (DNF). This means that each innermost tuple describes a single column predicate. These inner predicates are combined with a conjunction (AND) into a larger predicate. The outermost list then combines all predicates with a disjunction (OR). This way, any predicate that can be expressed in boolean logic can be represented.

  • date_as_object – Retrieve all date columns as an object column holding datetime.date objects instead of pd.Timestamp. Note that this option only works for type-stable serializers, e.g. ParquetSerializer.
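
For illustration, a minimal sketch of how such predicates could be passed to restore_dataframe. The store object and the key "data.csv" are hypothetical, assuming a file with an integer column x and a string column y:

# (x > 0 AND y == "a") OR (x < -5)
predicates = [
    [("x", ">", 0), ("y", "==", "a")],
    [("x", "<", -5)],
]
df = CsvSerializer.restore_dataframe(
    store,              # a minimalkv.KeyValueStore (hypothetical)
    "data.csv",         # key; its suffix is used to detect the format
    columns=["x", "y"],
    predicates=predicates,
)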

store(store, key_prefix, df)[source]

Persist a DataFrame to the specified store.

The storage format used (e.g. Parquet) will be appended to the key.

Parameters:
  • store (minimalkv.KeyValueStore) – store engine

  • key_prefix (str) – Key prefix that specifies the path where the object should be stored in the store resource. The file format used will be appended to the key.

  • df (pandas.DataFrame or pyarrow.Table) – DataFrame to be persisted

Returns:

The actual key where the DataFrame is stored.

Return type:

str
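
A minimal round-trip sketch, assuming an in-memory minimalkv store (minimalkv.memory.DictStore) is sufficient for experimentation:

import pandas as pd
from minimalkv.memory import DictStore
from plateau.serialization import CsvSerializer

store = DictStore()
ser = CsvSerializer()
df = pd.DataFrame({"x": [1, 2, 3]})

key = ser.store(store, "some/prefix", df)   # the format suffix is appended to the prefix
restored = ser.restore_dataframe(store, key)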

class plateau.serialization.DataFrameSerializer[source]

Bases: object

Abstract class that supports serializing DataFrames to/from minimalkv stores.

classmethod register_serializer(suffix, serializer)[source]
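
A hedged sketch of how a custom serializer might be registered; the exact form expected for the serializer argument is an assumption here, based on the classmethod signature above:

# Hypothetical: associate a key suffix with a custom serializer so that
# DataFrameSerializer.restore_dataframe can dispatch on it.
class MyFormatSerializer(DataFrameSerializer):
    ...  # implement store / restore_dataframe

DataFrameSerializer.register_serializer(".myfmt", MyFormatSerializer)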
classmethod restore_dataframe(store: KeyValueStore, key: str, filter_query: str | None = None, columns: Iterable[str] | None = None, predicate_pushdown_to_io: bool = True, categories: Iterable[str] | None = None, predicates: List[List[Tuple[str, str, LiteralValue]]] | None = None, date_as_object: bool = False) → DataFrame[source]

Load a DataFrame from the specified store. The key is also used to detect the storage format used.

Parameters:
  • store – store engine

  • key – Key specifying the path from which the object should be retrieved in the store resource.

  • filter_query – Optional query to filter the DataFrame. Must adhere to the specification of pandas.DataFrame.query.

  • columns – Only read in listed columns. When set to None, the full file will be read in.

  • predicate_pushdown_to_io – Push predicates through to the I/O layer, default True. Disable this if you see problems with predicate pushdown for the given file even if the file format supports it. Note that disabling it only hides problems in the store layer that need to be addressed there.

  • categories – Columns that should be loaded as categoricals.

  • predicates

    Optional list of predicates, like [[('x', '>', 0), …]], that are used to filter the resulting DataFrame, possibly using predicate pushdown if supported by the file format. This parameter is not compatible with filter_query.

    Predicates are expressed in disjunctive normal form (DNF). This means that each innermost tuple describes a single column predicate. These inner predicates are combined with a conjunction (AND) into a larger predicate. The outermost list then combines all predicates with a disjunction (OR). This way, any predicate that can be expressed in boolean logic can be represented.

  • date_as_object – Retrieve all date columns as an object column holding datetime.date objects instead of pd.Timestamp. Note that this option only works for type-stable serializers, e.g. ParquetSerializer.

store(store: KeyValueStore, key_prefix: str, df: DataFrame) → str[source]

Persist a DataFrame to the specified store.

The storage format used (e.g. Parquet) will be appended to the key.

Parameters:
  • store (minimalkv.KeyValueStore) – store engine

  • key_prefix (str) – Key prefix that specifies the path where the object should be stored in the store resource. The file format used will be appended to the key.

  • df (pandas.DataFrame or pyarrow.Table) – DataFrame to be persisted

Returns:

The actual key where the DataFrame is stored.

Return type:

str

type_stable = False

class plateau.serialization.ParquetSerializer(compression: str = 'SNAPPY', chunk_size: int | None = None)[source]

Bases: DataFrameSerializer

Serializer to store a pandas.DataFrame as parquet.

On top of the plain serialization, this class handles forward and backward compatibility between pyarrow versions.

Parameters:
  • compression – The compression algorithm to be used for the parquet file. For a comprehensive list of available compression algorithms, please see pyarrow.parquet.write_table(). The default is set to “SNAPPY”, which usually offers a good balance between performance and compression rate. Depending on your data, picking a different algorithm may have vastly different characteristics, so we can only recommend testing this on your own data. Depending on the reader's parquet implementation, some compression algorithms may not be supported, and we recommend consulting the documentation of the reader libraries first.

  • chunk_size – The number of rows stored in a Parquet RowGroup. To leverage predicate pushdown, it is necessary to set this value. We do not apply any default value since a good choice is very sensitive to the kind of data and storage you are using. A typical range to try out is somewhere between 50k and 200k rows. To fully leverage row group statistics, it is highly recommended to sort the file before serialization. (See the construction sketch below.)
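
A construction sketch; the chunk size of 100,000 rows is an illustrative value within the suggested range, not a recommended default:

from plateau.serialization import ParquetSerializer

# Row groups of 100k rows enable predicate pushdown via row group statistics.
ser = ParquetSerializer(compression="SNAPPY", chunk_size=100_000)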

Notes

Regarding type stability and supported types, there are a few known limitations users should be aware of.

  • pandas.Categorical

    plateau offers the keyword argument categories, which accepts a list of field names that should be retrieved as a pandas.Categorical.

    See also Dictionary Encoding

    In [1]: ser = ParquetSerializer()
    
    In [2]: df = pd.DataFrame({"cat_field": pd.Categorical(["A"])})
    
    In [3]: df.dtypes
    Out[3]: 
    cat_field    category
    dtype: object
    
    In [4]: ser.restore_dataframe(store, ser.store(store, "cat", df))
    Out[4]: 
      cat_field
    0         A
    
    In [5]: ser.restore_dataframe(store, ser.store(store, "cat", df), categories=["cat_field"])
    Out[5]: 
      cat_field
    0         A
    
  • Timestamps with nanosecond resolution

    Timestamps can only be stored with microsecond (us) accuracy. Trying to store timestamps with higher precision may raise an exception, as the traceback below shows; a workaround sketch follows it.

    See also Timestamp

    In [6]: import pyarrow as pa
    
    In [7]: pa.__version__
    Out[7]: '15.0.2'
    
    In [8]: df = pd.DataFrame({"nanosecond": [pd.Timestamp("2021-01-01 00:00:00.0000001")]})
    
    # nanosecond resolution
    In [9]: ser.store(store, "key", df)
    ---------------------------------------------------------------------------
    ArrowInvalid                              Traceback (most recent call last)
    Cell In[9], line 1
    ----> 1 ser.store(store, "key", df)
    
    File ~/checkouts/readthedocs.org/user_builds/plateau/conda/latest/lib/python3.12/site-packages/plateau/serialization/_parquet.py:378, in ParquetSerializer.store(self, store, key_prefix, df)
        375     table = pa.Table.from_pandas(df)
        376 buf = pa.BufferOutputStream()
    --> 378 pq.write_table(
        379     table,
        380     buf,
        381     version=PARQUET_VERSION,
        382     chunk_size=self.chunk_size,
        383     compression=self.compression,
        384     coerce_timestamps="us",
        385 )
        386 store.put(key, buf.getvalue().to_pybytes())
        387 return key
    
    File ~/checkouts/readthedocs.org/user_builds/plateau/conda/latest/lib/python3.12/site-packages/pyarrow/parquet/core.py:1908, in write_table(table, where, row_group_size, version, use_dictionary, compression, write_statistics, use_deprecated_int96_timestamps, coerce_timestamps, allow_truncated_timestamps, data_page_size, flavor, filesystem, compression_level, use_byte_stream_split, column_encoding, data_page_version, use_compliant_nested_type, encryption_properties, write_batch_size, dictionary_pagesize_limit, store_schema, write_page_index, write_page_checksum, sorting_columns, **kwargs)
       1882 try:
       1883     with ParquetWriter(
       1884             where, table.schema,
       1885             filesystem=filesystem,
       (...)
       1906             sorting_columns=sorting_columns,
       1907             **kwargs) as writer:
    -> 1908         writer.write_table(table, row_group_size=row_group_size)
       1909 except Exception:
       1910     if _is_path_like(where):
    
    File ~/checkouts/readthedocs.org/user_builds/plateau/conda/latest/lib/python3.12/site-packages/pyarrow/parquet/core.py:1104, in ParquetWriter.write_table(self, table, row_group_size)
       1099     msg = ('Table schema does not match schema used to create file: '
       1100            '\ntable:\n{!s} vs. \nfile:\n{!s}'
       1101            .format(table.schema, self.schema))
       1102     raise ValueError(msg)
    -> 1104 self.writer.write_table(table, row_group_size=row_group_size)
    
    File ~/checkouts/readthedocs.org/user_builds/plateau/conda/latest/lib/python3.12/site-packages/pyarrow/_parquet.pyx:2180, in pyarrow._parquet.ParquetWriter.write_table()
    
    File ~/checkouts/readthedocs.org/user_builds/plateau/conda/latest/lib/python3.12/site-packages/pyarrow/error.pxi:91, in pyarrow.lib.check_status()
    
    ArrowInvalid: Casting from timestamp[ns] to timestamp[us] would lose data: 1609459200000000100
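
    A possible workaround, as a sketch: truncate the timestamps to microsecond resolution before serializing. Note that this silently drops the sub-microsecond part:

    # Truncate to microsecond resolution before storing (drops the nanoseconds)
    df["nanosecond"] = df["nanosecond"].dt.floor("us")
    ser.store(store, "key", df)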
    
classmethod restore_dataframe(store: KeyValueStore, key: str, filter_query: str | None = None, columns: Iterable[str] | None = None, predicate_pushdown_to_io: bool = True, categories: Iterable[str] | None = None, predicates: List[List[Tuple[str, str, LiteralValue]]] | None = None, date_as_object: bool = False) → DataFrame[source]

Load a DataFrame from the specified store. The key is also used to detect the storage format used.

Parameters:
  • store – store engine

  • key – Key specifying the path from which the object should be retrieved in the store resource.

  • filter_query – Optional query to filter the DataFrame. Must adhere to the specification of pandas.DataFrame.query.

  • columns – Only read in listed columns. When set to None, the full file will be read in.

  • predicate_pushdown_to_io – Push predicates through to the I/O layer, default True. Disable this if you see problems with predicate pushdown for the given file even if the file format supports it. Note that disabling it only hides problems in the store layer that need to be addressed there.

  • categories – Columns that should be loaded as categoricals.

  • predicates

    Optional list of predicates, like [[('x', '>', 0), …]], that are used to filter the resulting DataFrame, possibly using predicate pushdown if supported by the file format. This parameter is not compatible with filter_query.

    Predicates are expressed in disjunctive normal form (DNF). This means that each innermost tuple describes a single column predicate. These inner predicates are combined with a conjunction (AND) into a larger predicate. The outermost list then combines all predicates with a disjunction (OR). This way, any predicate that can be expressed in boolean logic can be represented.

  • date_as_object – Retrieve all date columns as an object column holding datetime.date objects instead of pd.Timestamp. Note that this option only works for type-stable serializers, e.g. ParquetSerializer.

store(store, key_prefix, df)[source]

Persist a DataFrame to the specified store.

The storage format used (e.g. Parquet) will be appended to the key.

Parameters:
  • store (minimalkv.KeyValueStore) – store engine

  • key_prefix (str) – Key prefix that specifies the path where the object should be stored in the store resource. The file format used will be appended to the key.

  • df (pandas.DataFrame or pyarrow.Table) – DataFrame to be persisted

Returns:

The actual key where the DataFrame is stored.

Return type:

str

type_stable = True

plateau.serialization.check_predicates(predicates: List[List[Tuple[str, str, LiteralValue]]] | None) → None[source]

Check if predicates are well-formed.

plateau.serialization.columns_in_predicates(predicates: List[List[Tuple[str, str, LiteralValue]]] | None) → Set[str][source]

Determine all columns which are mentioned in the list of predicates.

Parameters:

predicates – The predicates to be scanned.
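
For example (the output is shown as a comment; set ordering is not guaranteed):

from plateau.serialization import columns_in_predicates

predicates = [[("x", ">", 0), ("y", "==", "a")], [("z", "<", 5)]]
columns_in_predicates(predicates)  # -> {"x", "y", "z"}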

plateau.serialization.default_serializer()[source]

plateau.serialization.filter_array_like(array_like, op: str, value, mask=None, out=None, strict_date_types: bool = False, column_name: str | None = None)[source]

Filter an array-like object using operations defined in the predicates.

Parameters:
  • array_like

    The array like object to be filtered

    See also pandas.api.types.is_array_like

  • op

  • value

  • mask – A boolean array-like object which will be combined with the result of this evaluation using a logical AND. Passing an all-True array yields the same result as passing no mask at all.

  • out – An array into which the result is stored. If provided, it must have a shape that the inputs broadcast to. If not provided or None, a freshly-allocated array is returned.

  • strict_date_types – If False (default), cast all datelike values to datetime64 for comparison.

  • column_name – Name of the column where array_like originates from, used for nicer error messages.
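
A usage sketch, assuming the function returns a boolean mask over the input (the example values are hypothetical):

import numpy as np
from plateau.serialization import filter_array_like

arr = np.array([1, 5, 10])
mask = filter_array_like(arr, ">", 4)                # assumed to yield array([False, True, True])
both = filter_array_like(arr, "<", 10, mask=mask)    # combined with the mask via logical AND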

plateau.serialization.filter_df(df, filter_query=None)[source]

General implementation of query filtering.

Serialization formats such as Parquet that support predicate pushdown may pre-filter in their own implementations.
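
A minimal usage sketch; the filter_query follows pandas.DataFrame.query syntax:

import pandas as pd
from plateau.serialization import filter_df

df = pd.DataFrame({"x": [1, 2, 3], "y": ["a", "b", "a"]})
filtered = filter_df(df, filter_query="x > 1 and y == 'a'")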

plateau.serialization.filter_df_from_predicates(df: DataFrame, predicates: List[List[Tuple[str, str, LiteralValue]]] | None, strict_date_types: bool = False) → DataFrame[source]

Filter a pandas.DataFrame based on predicates in disjunctive normal form.

Parameters:
  • df – The pandas DataFrame to be filtered

  • predicates – Predicates in disjunctive normal form (DNF). For thorough documentation, see DataFrameSerializer.restore_dataframe. If None, the df is returned unmodified. (See the sketch after this list.)

  • strict_date_types – If False (default), cast all datelike values to datetime64 for comparison.
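
A usage sketch; the predicates follow the DNF convention documented in DataFrameSerializer.restore_dataframe:

import pandas as pd
from plateau.serialization import filter_df_from_predicates

df = pd.DataFrame({"x": [1, 5, 10], "y": ["a", "b", "a"]})

# (x > 1 AND y == "a") OR (x < 0)
predicates = [[("x", ">", 1), ("y", "==", "a")], [("x", "<", 0)]]
filtered = filter_df_from_predicates(df, predicates)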

plateau.serialization.filter_predicates_by_column(predicates: List[List[Tuple[str, str, LiteralValue]]] | None, columns: List[str]) → List[List[Tuple[str, str, LiteralValue]]] | None[source]

Takes a predicate list and removes all literals that do not reference one of the given columns.

In [1]: from plateau.serialization import filter_predicates_by_column

In [2]: predicates = [[("A", "==", 1), ("B", "<", 5)], [("C", "==", 4)]]

In [3]: filter_predicates_by_column(predicates, ["A"])
Out[3]: [[('A', '==', 1)]]

Parameters:
  • predicates – A list of predicates to be filtered

  • columns – A list of all columns allowed in the output