Getting Started
When working with plateau tables as a Python user, we use the pandas DataFrame as the user-facing type.
We typically expect the contents of a dataset to be large, often too large to be held in memory by a single machine. For demonstration purposes, however, we will use a small DataFrame with a mixed set of types.
In [1]: import numpy as np
In [2]: import pandas as pd
In [3]: df = pd.DataFrame(
   ...:     {
   ...:         "A": 1.0,
   ...:         "B": pd.Timestamp("20130102"),
   ...:         "C": pd.Series(1, index=list(range(4)), dtype="float32"),
   ...:         "D": np.array([3] * 4, dtype="int32"),
   ...:         "E": pd.Categorical(["test", "train", "test", "prod"]),
   ...:         "F": "foo",
   ...:     }
   ...: )
   ...:
In [4]: another_df = pd.DataFrame(
   ...:     {
   ...:         "A": 5.0,
   ...:         "B": pd.Timestamp("20110102"),
   ...:         "C": pd.Series(1, index=list(range(4)), dtype="float32"),
   ...:         "D": np.array([12] * 4, dtype="int32"),
   ...:         "E": pd.Categorical(["prod", "train", "test", "train"]),
   ...:         "F": "bar",
   ...:     }
   ...: )
   ...:
Defining the storage location
We now want to store this DataFrame as a dataset. Therefore, we first need to connect to a storage location.
We define a store factory as a callable which contains the storage information.
We will use minimalkv in this example to construct such a store factory for the local filesystem (hfs:// indicates that we are using the local filesystem, and what follows is the file path).
In [5]: from functools import partial
In [6]: from tempfile import TemporaryDirectory
In [7]: from minimalkv import get_store_from_url
In [8]: dataset_dir = TemporaryDirectory()
In [9]: store_url = f"hfs://{dataset_dir.name}"
Storage locations
minimalkv offers support for several stores in plateau. These can be created using the function minimalkv.get_store_from_url with one of the following prefixes:
hfs: Local filesystem
hazure: AzureBlockBlobStorage
hs3: BotoStore (Amazon S3)
Interface
plateau can write to any location that fulfills the minimalkv.KeyValueStore interface, as long as the store supports the ExtendedKeyspaceMixin (this is necessary so that / can be used in the storage key name).
For more information, take a look at the minimalkv documentation.
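To illustrate the interface, here is a minimal sketch (assuming the hfs store created above): minimalkv stores expose simple byte-oriented key/value methods, and stores with extended keyspace support accept / in key names.

# Minimal sketch of the minimalkv.KeyValueStore interface.
store = get_store_from_url(store_url)
store.put("some/nested/key", b"some value")  # '/' in the key requires extended keyspace support
assert store.get("some/nested/key") == b"some value"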
Writing data to storage
Now that we have some data and a location to store it in, we can persist it as a dataset. To do so, we will use store_dataframes_as_dataset() to store the DataFrame df that we already have in memory.
In [10]: from plateau.api.dataset import store_dataframes_as_dataset
In [11]: df.dtypes.equals(another_df.dtypes) # both have the same schema
Out[11]: True
In [12]: dm = store_dataframes_as_dataset(
    ....:     store_url, "a_unique_dataset_identifier", [df, another_df]
    ....: )
    ....:
Scheduling backends
The import path of this function already gives us a hint about the general structuring of the plateau modules. In plateau.io we have all the building blocks to build data pipelines that read and write from/to storage.
The next module level (e.g. eager) describes the scheduling backend.
The scheduling backends currently supported by plateau are listed below; a short import sketch follows the list.
eager runs all execution immediately and on the local machine.
iter executes operations on the dataset using a generator/iterator interface. The standard format to read/store dataframes in iter is by providing a generator of dataframes.
dask is suitable for larger datasets. It can be used to work on datasets in parallel or even in a cluster by using dask.distributed as the backend. There are also dask.bag and dask.dataframe, which support I/O operations for the respective dask collections.
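The sketch below illustrates this module layout. The eager path mirrors the function used in this guide; the iter and dask module paths are based on the convention described above and should be checked against the plateau API reference.

# Illustrative sketch of the backend-specific module layout described above.
from plateau.io.eager import store_dataframes_as_dataset  # immediate, local execution
# plateau.io.iter and plateau.io.dask.* provide the generator- and dask-based
# variants of the same read/write building blocks.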
After calling store_dataframes_as_dataset(), a DatasetMetadata object is returned.
This class holds information about the structure and schema of the dataset.
In [13]: dm.table_name
Out[13]: 'table'
In [14]: sorted(dm.partitions.keys())
Out[14]: ['64d476f48a5e43d7a66e4684ae228bc4', '7e7e1216cbea470fb793801cf7660c4a']
In [15]: dm.schema.remove_metadata()
Out[15]:
A: double
B: timestamp[ns]
C: double
D: int64
E: string
F: string
For this guide we want to take a closer look at the partitions attribute.
partitions are the physical “pieces” of data which together constitute the contents of a dataset. Data is written to storage on a per-partition basis. See the section on partitioning for further details: Partitioning.
The attribute schema can be accessed to see the underlying schema of the dataset. See Table type system for more information.
To store multiple dataframes in a dataset, it is possible to pass a collection of dataframes; the exact format will depend on the I/O backend used.
plateau assumes these dataframes are different chunks of the same table and therefore requires them to have the same schema. A ValueError will be thrown otherwise.
For example,
In [16]: df2 = pd.DataFrame(
    ....:     {
    ....:         "G": "foo",
    ....:         "H": pd.Categorical(["test", "train", "test", "train"]),
    ....:         "I": np.array([9] * 4, dtype="int32"),
    ....:         "J": pd.Series(3, index=list(range(4)), dtype="float32"),
    ....:         "K": pd.Timestamp("20190604"),
    ....:         "L": 2.0,
    ....:     }
    ....: )
    ....:
In [17]: df.dtypes.equals(df2.dtypes) # schemas are different!
Out[17]: False
In [18]: store_dataframes_as_dataset(
    ....:     store_url,
    ....:     "will_not_work",
    ....:     [df, df2],
    ....: )
    ....:
---------------------------------------------------------------------------
ValueError: Schema violation
Origin schema: {table/9e7d9217c82b4fda9c4e720dc987c60d}
Origin reference: {table/80feb4d84ac34a9c9d08ba48c8170647}
Note
Read these sections for more details: Table type system, Specification
Reading data from storage
After we have written the data, we may want to read it back in again. For this we can use read_table(). This method returns the complete table of the dataset as a pandas DataFrame.
In [19]: from plateau.api.dataset import read_table
In [20]: read_table("a_unique_dataset_identifier", store_url)
Out[20]:
A B C D E F
0 1.0 2013-01-02 1.0 3 test foo
1 1.0 2013-01-02 1.0 3 train foo
2 1.0 2013-01-02 1.0 3 test foo
3 1.0 2013-01-02 1.0 3 prod foo
4 5.0 2011-01-02 1.0 12 prod bar
5 5.0 2011-01-02 1.0 12 train bar
6 5.0 2011-01-02 1.0 12 test bar
7 5.0 2011-01-02 1.0 12 train bar
We can also read the dataset iteratively, using read_dataset_as_dataframes__iterator(). This will return a generator of pandas.DataFrame objects, where every element represents one file. For example,
In [21]: from plateau.api.dataset import read_dataset_as_dataframes__iterator
In [22]: for partition_index, df in enumerate(
    ....:     read_dataset_as_dataframes__iterator(
    ....:         dataset_uuid="a_unique_dataset_identifier", store=store_url
    ....:     )
    ....: ):
    ....:     print(f"Partition #{partition_index}")
    ....:     print(f"Data: \n{df}")
    ....:
Partition #0
Data:
A B C D E F
0 1.0 2013-01-02 1.0 3 test foo
1 1.0 2013-01-02 1.0 3 train foo
2 1.0 2013-01-02 1.0 3 test foo
3 1.0 2013-01-02 1.0 3 prod foo
Partition #1
Data:
A B C D E F
0 5.0 2011-01-02 1.0 12 prod bar
1 5.0 2011-01-02 1.0 12 train bar
2 5.0 2011-01-02 1.0 12 test bar
3 5.0 2011-01-02 1.0 12 train bar
Similarly, the dask.delayed backend provides the function read_dataset_as_delayed(), which has a very similar interface to the read_dataset_as_dataframes__iterator() function but returns a collection of dask.delayed objects.
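A hedged sketch of how that might look, assuming read_dataset_as_delayed is exported from plateau.api.dataset like the other functions used in this guide:

from plateau.api.dataset import read_dataset_as_delayed
import dask

# Each element wraps one partition; nothing is read until compute() is called.
delayed_partitions = read_dataset_as_delayed(
    dataset_uuid="a_unique_dataset_identifier", store=store_url
)
partition_dfs = dask.compute(*delayed_partitions)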
Filtering using predicates
It is possible to filter data during reads with simple predicates via the predicates argument. Technically speaking, plateau supports predicates in disjunctive normal form.
When this argument is defined, plateau uses the Apache Parquet metadata, as well as indices and partition information, to speed up queries where possible. How this works is a complex topic; see Efficient Querying.
In [23]: read_table("a_unique_dataset_identifier", store_url, predicates=[[("A", "<", 2.5)]])
Out[23]:
A B C D E F
0 1.0 2013-01-02 1.0 3 test foo
1 1.0 2013-01-02 1.0 3 train foo
2 1.0 2013-01-02 1.0 3 test foo
3 1.0 2013-01-02 1.0 3 prod foo
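In disjunctive normal form, the outer list combines conjunctions with OR, while each inner list combines its conditions with AND. As a sketch using the columns from the example data above, the filter (A < 2.5 AND E == "test") OR F == "bar" would be expressed as:

# Outer list = OR, inner lists = AND.
predicates = [
    [("A", "<", 2.5), ("E", "==", "test")],  # A < 2.5 AND E == "test"
    [("F", "==", "bar")],                    # OR F == "bar"
]
read_table("a_unique_dataset_identifier", store_url, predicates=predicates)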