"""This module is a collection of tests which should be implemented by all
plateau **read** backends. The tests are not subject to the semantic versioning scheme and may change with minor or even patch releases.
To use the tests of this module, add the following import statement to your test module and ensure that the following fixtures are available in your test environment.
```
from plateau.io.testing.read import * # noqa
```
Fixtures required to be implemented:
* ``output_type`` - One of {`dataframe`, `metapartition`, `table`} to define the output type of the returned result.
* ``bound_load_dataframes`` - A callable which will retrieve the partitions in the format specified by ``output_type``. The callable should accept all keyword arguments expected for a plateau reader.
Source test data:
* ``dataset`` - A fixture generating test data (TODO: Expose this as a testing function)
* ``store_factory`` - A function scoped store factory
* ``store_session_factory`` - A session scoped store factory
Feature toggles (optional):
The following fixtures should be present (see tests.read.conftest)
* ``use_categoricals`` - Whether or not the call retrieves categorical data.
* ``dates_as_object`` - Whether or not the call retrieves date columns as objects.
"""
import datetime
from functools import partial
from itertools import permutations
import pandas as pd
import pandas.testing as pdt
import pytest
from minimalkv import get_store_from_url
from plateau.core._compat import PANDAS_3, pandas_infer_string
from plateau.io.eager import store_dataframes_as_dataset
from plateau.io.iter import store_dataframes_as_dataset__iter
from plateau.io_components.metapartition import SINGLE_TABLE, MetaPartition
[docs]
@pytest.fixture(params=[True, False], ids=["use_categoricals", "no_categoricals"])
def use_categoricals(request):
    """Boolean toggle: should the read call request categorical columns?

    ``True`` carries the id ``use_categoricals``; ``False`` carries the id
    ``no_categoricals``.
    """
    toggle = request.param
    return toggle
[docs]
@pytest.fixture(params=[True, False], ids=["dates_as_object", "dates_as_datetime"])
def dates_as_object(request):
    """Boolean toggle: should date columns be read back as Python objects?

    ``True`` (id ``dates_as_object``) requests object-typed date columns;
    ``False`` (id ``dates_as_datetime``) expects datetime64 date columns.
    """
    return request.param
[docs]
@pytest.fixture(params=[True, False], ids=["use_factory", "no_factory"])
def use_dataset_factory(request, dates_as_object):
    """Boolean toggle with ids ``use_factory`` (True) and ``no_factory`` (False).

    Consuming tests decide, based on this flag, whether a dataset factory is
    handed to the reader.

    ``dates_as_object`` is requested but never used in the body.
    NOTE(review): presumably it is requested only to couple this fixture's
    parametrization with ``dates_as_object``; confirm before removing it.
    """
    chosen = request.param
    return chosen
[docs]
class NoPickle:
    """Sentinel object that refuses to be pickled.

    Storing an instance in another object's ``__dict__`` makes default
    pickling of that object fail, which lets tests detect accidental
    serialization.
    """

    def __getstate__(self):
        # pickle asks for the state while serializing; raising aborts it.
        message = "do NOT pickle this object!"
        raise RuntimeError(message)
[docs]
def mark_nopickle(obj):
    """Attach a fresh :class:`NoPickle` sentinel to ``obj`` as ``_nopickle``.

    Returns ``None``; ``obj`` is modified in place.
    """
    setattr(obj, "_nopickle", NoPickle())
[docs]
def no_pickle_store(url):
    """Build a store from ``url`` and mark it so that pickling it is detected.

    The store comes from :func:`minimalkv.get_store_from_url`; it is then
    tagged via :func:`mark_nopickle` and returned.
    """
    marked = get_store_from_url(url)
    mark_nopickle(marked)
    return marked
[docs]
def no_pickle_factory(url):
    """Return a zero-argument store factory bound to ``url``.

    The returned :func:`functools.partial` calls :func:`no_pickle_store`
    with ``url`` each time it is invoked.
    """
    factory = partial(no_pickle_store, url)
    return factory
def _perform_read_test(
dataset_uuid,
store_factory,
execute_read_callable,
use_categoricals,
output_type,
dates_as_object,
read_kwargs=None,
ds_factory=None,
):
if not read_kwargs:
read_kwargs = {}
if use_categoricals:
# dataset_with_index has an index on L but not on P
categoricals = ["P", "L"]
else:
categoricals = None
result = execute_read_callable(
dataset_uuid=dataset_uuid,
store=store_factory,
factory=ds_factory,
categoricals=categoricals,
dates_as_object=dates_as_object,
**read_kwargs,
)
assert len(result) == 2
if output_type == "metapartition":
for res in result:
assert isinstance(res, MetaPartition)
result = [mp.data for mp in result]
def sort_by(obj):
return obj[SINGLE_TABLE].P.iloc[0]
elif output_type == "table":
assert isinstance(result[0], pd.DataFrame)
assert "P" in result[0]
def sort_by(obj):
return obj.P.iloc[0]
else:
assert isinstance(result[0], pd.DataFrame)
assert "P" in result[0]
def sort_by(obj):
return obj.P.iloc[0]
result = sorted(result, key=sort_by)
expected_df_core_1 = pd.DataFrame(
{"P": [1], "L": [1], "TARGET": [1], "DATE": [datetime.date(2010, 1, 1)]}
)
expected_df_core_2 = pd.DataFrame(
{"P": [2], "L": [2], "TARGET": [2], "DATE": [datetime.date(2009, 12, 31)]}
)
expected_dfs = [
expected_df_core_1,
expected_df_core_2,
]
for res, expected_df_core in zip(result, expected_dfs, strict=False):
if not dates_as_object:
expected_df_core["DATE"] = pd.to_datetime(expected_df_core["DATE"])
if use_categoricals:
expected_df_core = expected_df_core.astype(
{"P": "category", "L": "category"}
)
pdt.assert_frame_equal(
res.reset_index(drop=True),
expected_df_core.reset_index(drop=True),
check_dtype=False,
check_like=True,
check_categorical=False,
)
[docs]
@pytest.mark.parametrize(
    "predicates",
    [
        [[("P", "==", 2)]],
        [[("P", "in", [2])]],
        [[("P", "!=", 1)]],
        [[("P", ">", 1)]],
        [[("P", ">=", 2)]],
    ],
)
def test_read_dataset_as_dataframes_predicate(
    dataset, store_session_factory, bound_load_dataframes, predicates, output_type
):
    """Each predicate variant must return only the row with ``P == 2``.

    Only the plain ``dataframe`` output type is checked; others are skipped.
    """
    if output_type != "dataframe":
        pytest.skip()

    expected_core = pd.DataFrame(
        {"P": [2], "L": [2], "TARGET": [2], "DATE": [datetime.date(2009, 12, 31)]}
    )
    partitions = bound_load_dataframes(
        dataset_uuid=dataset.uuid,
        store=store_session_factory,
        predicates=predicates,
    )
    combined = pd.concat(partitions)
    pdt.assert_frame_equal(
        combined, expected_core, check_dtype=False, check_like=True
    )
[docs]
@pytest.mark.parametrize(
    "predicates",
    [
        [[("P", "==", 2), ("TARGET", "==", 2)]],
        [[("P", "in", [2]), ("TARGET", "==", 2)]],
        [[("P", "!=", 1), ("L", "==", 2)]],
        [[("P", "!=", 1), ("L", "in", [2])]],
        [[("P", ">", 2)], [("TARGET", ">=", 2)]],
        [[("P", ">=", 2)], [("TARGET", ">=", 2)]],
    ],
)
def test_read_dataset_as_dataframes_predicate_with_partition_keys(
    dataset_partition_keys,
    store_session_factory,
    bound_load_dataframes,
    predicates,
    output_type,
):
    """Conjunctive/disjunctive predicates on a partitioned dataset select only
    the row with ``P == 2``.

    Only the plain ``dataframe`` output type is checked; others are skipped.
    """
    if output_type != "dataframe":
        pytest.skip()

    loaded = bound_load_dataframes(
        dataset_uuid=dataset_partition_keys.uuid,
        store=store_session_factory,
        predicates=predicates,
    )
    expected_row = {
        "P": [2],
        "L": [2],
        "TARGET": [2],
        "DATE": [datetime.date(2009, 12, 31)],
    }
    pdt.assert_frame_equal(
        pd.concat(loaded),
        pd.DataFrame(expected_row),
        check_dtype=False,
        check_like=True,
    )
[docs]
def test_read_dataset_as_dataframes_predicate_empty(
    dataset_partition_keys,
    store_session_factory,
    output_type,
    bound_load_dataframes,
):
    """A predicate that matches no partition must return no data at all.

    Only the plain ``dataframe`` output type is checked; others are skipped.
    """
    if output_type != "dataframe":
        pytest.skip()
    result = bound_load_dataframes(
        dataset_uuid=dataset_partition_keys.uuid,
        store=store_session_factory,
        predicates=[[("P", "==", -42)]],
        # Plain list of column names, matching every other ``columns=[...]``
        # read call in this module (previously a table-keyed dict).
        columns=["P", "L", "TARGET"],
    )
    assert len(result) == 0
def _gen_partition(b_c):
    """Build a frame with ``a=1``, ``b`` from the pair's first item and
    column ``c`` set to the pair's second item as given.

    NOTE(review): not referenced within the visible part of this module.
    """
    b_value, c_value = b_c
    columns = {"a": [1], "b": [b_value], "c": c_value}
    return pd.DataFrame(columns)
[docs]
def test_read_dataset_as_dataframes_dispatch_by_empty(
    store_session_factory,
    dataset_dispatch_by,
    bound_load_dataframes,
    backend_identifier,
    output_type,
    metadata_version,
    dataset_dispatch_by_uuid,
):
    """An empty ``dispatch_by`` list must dispatch the dataset as one unit.

    ``dataset_dispatch_by`` is requested so that the dataset is written
    before it is read. ``backend_identifier`` and ``metadata_version`` are
    not used in the body. The ``table`` output type is skipped.
    """
    if output_type == "table":
        pytest.skip()
    # No dispatch columns: everything is expected in a single dispatch.
    dispatched = bound_load_dataframes(
        dataset_uuid=dataset_dispatch_by_uuid,
        store=store_session_factory,
        dispatch_by=[],
    )
    assert len(dispatched) == 1
[docs]
@pytest.mark.parametrize("dispatch_by", ["A", "B", "C"])
def test_read_dataset_as_dataframes_dispatch_by_single_col(
    store_session_factory,
    dataset_dispatch_by,
    bound_load_dataframes,
    backend_identifier,
    dispatch_by,
    output_type,
    metadata_version,
    dataset_dispatch_by_uuid,
):
    """Dispatching by one column yields groups holding exactly one value of
    that column each, with no value repeated across groups.

    The ``table`` output type is skipped.
    """
    if output_type == "table":
        pytest.skip()
    # Dispatch by the parametrized column: "A" and "B" are partition
    # columns and "C" is a secondary index (see ``dataset_dispatch_by``).
    dispatched = bound_load_dataframes(
        dataset_uuid=dataset_dispatch_by_uuid,
        store=store_session_factory,
        dispatch_by=[dispatch_by],
    )
    seen = set()
    for part in dispatched:
        # Unwrap MetaPartition output, as the multi-column test does.
        data = part.data if isinstance(part, MetaPartition) else part
        unique_dispatch = data[dispatch_by].unique()
        assert len(unique_dispatch) == 1
        value = unique_dispatch[0]
        assert value not in seen
        seen.add(value)
[docs]
@pytest.fixture(scope="session")
def dataset_dispatch_by_uuid():
    """Return a fresh ``uuid1`` hex string naming the session's dispatch-by dataset."""
    from uuid import uuid1

    identifier = uuid1()
    return identifier.hex
[docs]
@pytest.fixture(scope="session")
def dataset_dispatch_by(
    metadata_version, store_session_factory, dataset_dispatch_by_uuid
):
    """Write the session-wide dispatch-by dataset and return its full content.

    Four frames are stored under ``dataset_dispatch_by_uuid``, partitioned on
    ``A`` and ``B`` with a secondary index on ``C``. The returned frame holds
    all rows concatenated, sorted by ``A``, ``B``, ``C``, with the index
    reset.
    """
    layout = [
        # (A values, B values, C values, content label)
        ([1, 1], [10, 10], [1, 2], "cluster1"),
        ([1, 1], [10, 10], [2, 3], "cluster2"),
        ([1], [20], [1], "cluster3"),
        ([2, 2], [10, 10], [1, 2], "cluster4"),
    ]
    clusters = [
        pd.DataFrame({"A": a, "B": b, "C": c, "Content": [label] * len(a)})
        for a, b, c, label in layout
    ]
    store_dataframes_as_dataset__iter(
        df_generator=clusters,
        store=store_session_factory,
        dataset_uuid=dataset_dispatch_by_uuid,
        metadata_version=metadata_version,
        partition_on=["A", "B"],
        secondary_indices=["C"],
    )
    combined = pd.concat(clusters)
    combined = combined.sort_values(["A", "B", "C"])
    return combined.reset_index(drop=True)
[docs]
def test_read_dataset_as_dataframes_dispatch_by_multi_col(
    store_session_factory,
    bound_load_dataframes,
    output_type,
    dataset_dispatch_by,
    dataset_dispatch_by_uuid,
):
    """Dispatching by any ordered pair of ``A``, ``B``, ``C`` yields groups
    that each hold exactly one value combination, none repeated.

    The ``table`` output type is skipped.
    """
    if output_type == "table":
        pytest.skip()
    for dispatch_by in permutations(("A", "B", "C"), 2):
        # Pass a list, matching the other dispatch_by calls in this module.
        dispatch_cols = list(dispatch_by)
        dispatched = bound_load_dataframes(
            dataset_uuid=dataset_dispatch_by_uuid,
            store=store_session_factory,
            dispatch_by=dispatch_cols,
        )
        # An empty result would make the checks below vacuous.
        assert dispatched, dispatch_cols

        combos = []
        for part in dispatched:
            data = part.data if isinstance(part, MetaPartition) else part
            unique_dispatch = data[dispatch_cols].drop_duplicates()
            assert len(unique_dispatch) == 1
            combos.append(unique_dispatch)
        # Concatenate once instead of growing an initially empty frame:
        # avoids pandas' deprecation of concatenating empty entries and
        # keeps the original column dtypes.
        uniques = pd.concat(combos)
        assert not uniques.duplicated().any()
[docs]
@pytest.mark.parametrize(
    "dispatch_by, predicates, expected_dispatches",
    [
        # This should only dispatch one partition since there is only
        # one file with valid data points
        (["A"], [[("C", ">", 2)]], 1),
        # We dispatch and restrict to one value, i.e. one dispatch
        (["B"], [[("B", "==", 10)]], 1),
        # The same is true for a non-partition index col
        (["C"], [[("C", "==", 1)]], 1),
        # A condition where both primary and secondary indices need to work together
        (["A", "C"], [[("A", ">", 1), ("C", "<", 3)]], 2),
    ],
)
def test_read_dispatch_by_with_predicates(
    store_session_factory,
    dataset_dispatch_by_uuid,
    bound_load_dataframes,
    dataset_dispatch_by,
    dispatch_by,
    output_type,
    expected_dispatches,
    predicates,
):
    """Predicates restrict the number of dispatched groups as parametrized.

    The ``table`` output type is skipped.
    """
    if output_type == "table":
        pytest.skip()
    read_kwargs = dict(
        dataset_uuid=dataset_dispatch_by_uuid,
        store=store_session_factory,
        dispatch_by=dispatch_by,
        predicates=predicates,
    )
    dispatched = bound_load_dataframes(**read_kwargs)
    n_dispatched = len(dispatched)
    assert n_dispatched == expected_dispatches, dispatched
[docs]
def test_read_dataset_as_dataframes(
    dataset,
    store_session_factory,
    dataset_factory,
    use_dataset_factory,
    bound_load_dataframes,
    use_categoricals,
    output_type,
    dates_as_object,
):
    """Read the reference dataset by uuid + store, or through a dataset factory.

    ``use_dataset_factory`` selects the access path.
    ``True`` (id ``use_factory``) passes only ``dataset_factory``.
    ``False`` (id ``no_factory``) passes ``dataset.uuid`` and
    ``store_session_factory``.
    Previously the two branches were swapped relative to the fixture ids.
    """
    if use_dataset_factory:
        dataset_uuid = None
        store_factory = None
        ds_factory = dataset_factory
    else:
        dataset_uuid = dataset.uuid
        store_factory = store_session_factory
        ds_factory = None
    _perform_read_test(
        dataset_uuid=dataset_uuid,
        store_factory=store_factory,
        ds_factory=ds_factory,
        execute_read_callable=bound_load_dataframes,
        use_categoricals=use_categoricals,
        output_type=output_type,
        dates_as_object=dates_as_object,
    )
[docs]
def test_read_dataset_as_dataframes_columns_projection(
    store_factory, bound_load_dataframes, metadata_version
):
    """Projecting onto ``a``, ``b``, ``c`` drops ``d`` and keeps the
    partition columns ``a`` and ``b``."""
    target_uuid = "partitioned_uuid"
    b, c = 1, 100
    source = pd.DataFrame({"a": [1, 1], "b": [b, b], "c": c, "d": [b, b + 1]})
    store_dataframes_as_dataset(
        dfs=[source],
        store=store_factory,
        dataset_uuid=target_uuid,
        metadata_version=metadata_version,
        partition_on=["a", "b"],
    )

    loaded = bound_load_dataframes(
        dataset_uuid=target_uuid,
        store=store_factory,
        columns=["a", "b", "c"],
    )
    frames = (
        [item.data for item in loaded]
        if isinstance(loaded[0], MetaPartition)
        else loaded
    )
    combined = pd.concat(frames).reset_index(drop=True)

    expected = pd.DataFrame({"a": [1, 1], "b": [1, 1], "c": [100, 100]})
    pdt.assert_frame_equal(expected, combined, check_like=True)
[docs]
def test_read_dataset_as_dataframes_columns_primary_index_only(
    store_factory, bound_load_dataframes, metadata_version
):
    """Projecting onto only the partition columns ``a`` and ``b`` returns
    just those columns."""
    target_uuid = "partitioned_uuid"
    b, c = 1, 100
    source = pd.DataFrame({"a": [1, 1], "b": [b, b], "c": c, "d": [b, b + 1]})
    store_dataframes_as_dataset(
        dfs=[source],
        store=store_factory,
        dataset_uuid=target_uuid,
        metadata_version=metadata_version,
        partition_on=["a", "b"],
    )

    loaded = bound_load_dataframes(
        dataset_uuid=target_uuid, store=store_factory, columns=["a", "b"]
    )
    if isinstance(loaded[0], MetaPartition):
        frames = [item.data for item in loaded]
    else:
        frames = loaded
    combined = pd.concat(frames).reset_index(drop=True)

    pdt.assert_frame_equal(
        pd.DataFrame({"a": [1, 1], "b": [1, 1]}), combined, check_like=True
    )
[docs]
def test_empty_predicate_pushdown_empty_col_projection(
    dataset, store_session_factory, bound_load_dataframes, backend_identifier
):
    """An unmatched predicate combined with an empty column projection must
    yield an empty frame without columns.

    Marked as xfail for ``dask`` backends.
    """
    loaded = bound_load_dataframes(
        dataset_uuid=dataset.uuid,
        store=store_session_factory,
        columns=[],
        predicates=[[("P", "==", 12345678)]],  # this product doesn't exist
    )
    if backend_identifier.startswith("dask"):
        pytest.xfail("Output of dask for empty results is currently inconsistent")

    first = loaded[0]
    frames = [mp.data for mp in loaded] if isinstance(first, MetaPartition) else loaded
    res = pd.concat(frames).reset_index(drop=True)
    assert len(res) == 0

    column_dtype = str if pandas_infer_string() else "object"
    # Build the expectation on res.index so the different empty-index types
    # pandas generates after `reset_index` do not matter.
    expected = pd.DataFrame(index=res.index, columns=pd.Index([], dtype=column_dtype))
    pdt.assert_frame_equal(res, expected)
[docs]
@pytest.mark.parametrize("partition_on", [["a", "b"], ["c"], ["a", "b", "c"]])
@pytest.mark.parametrize("datetype", [datetime.datetime, datetime.date])
@pytest.mark.parametrize("comp", ["==", ">="])
def test_datetime_predicate_with_dates_as_object(
    dataset,
    store_factory,
    bound_load_dataframes,
    metadata_version,
    output_type,
    partition_on,
    datetype,
    comp,
):
    """A predicate on the date/datetime column ``c`` returns the stored rows
    when reading with ``dates_as_object=True``.

    This is checked for several ``partition_on`` layouts. For output types
    other than ``dataframe``, only the read call itself is exercised.
    """

    def _f(b_c):
        b, c = b_c
        df = pd.DataFrame({"a": [1, 1], "b": [b, b], "c": c, "d": [b, b + 1]})
        # Account for pandas 2.1 change in behaviour in which datetime.datetime
        # columns have their units set to [us] rather than [ns].
        return (
            df.astype({"c": "datetime64[ns]"})
            if df["c"].dtype == "datetime64[us]"
            else df
        )

    in_partitions = [_f([1, datetype(2000, 1, 1)])]
    dataset_uuid = "partitioned_uuid"
    store_dataframes_as_dataset(
        dfs=in_partitions,
        store=store_factory,
        dataset_uuid=dataset_uuid,
        metadata_version=metadata_version,
        partition_on=partition_on,
    )
    # Read back the dataset just written (same uuid variable, not a re-typed literal).
    result = bound_load_dataframes(
        dataset_uuid=dataset_uuid,
        store=store_factory,
        predicates=[[("c", comp, datetype(2000, 1, 1))]],
        dates_as_object=True,
    )
    if output_type != "dataframe":
        return
    assert len(result) == 1
    df_actual = result[0]
    df_expected = in_partitions[0]
    pdt.assert_frame_equal(df_actual, df_expected, check_like=True)
[docs]
def test_string_type_roundtrip(store_factory, bound_load_dataframes):
    """A string column written and read back compares equal to the original
    frame, whose string dtype is ``"str"`` on pandas 3 and ``"string"`` otherwise."""
    # Note: we're not actually roundtripping the string type since the loading
    # type depends on the pandas version. Keeping the loading type aligned with
    # what is typically initialized by pandas by default is likely the best
    # option
    string_dtype = "str" if PANDAS_3 else "string"
    df = pd.DataFrame({"str": pd.Series(["a", "b"], dtype=string_dtype)})
    store_dataframes_as_dataset(
        dfs=[df], store=store_factory, dataset_uuid="dataset_uuid"
    )

    loaded = bound_load_dataframes(dataset_uuid="dataset_uuid", store=store_factory)
    if isinstance(loaded[0], MetaPartition):
        frames = [mp.data for mp in loaded]
    else:
        frames = loaded
    pdt.assert_frame_equal(df, pd.concat(frames).reset_index(drop=True))
[docs]
def test_non_default_table_name_roundtrip(store_factory, bound_load_dataframes):
    """A dataset written with ``table_name="foo"`` reads back unchanged without
    the reader being told the table name."""
    original = pd.DataFrame({"A": [1]})
    store_dataframes_as_dataset(
        dfs=[original], store=store_factory, dataset_uuid="dataset_uuid", table_name="foo"
    )

    loaded = bound_load_dataframes(dataset_uuid="dataset_uuid", store=store_factory)
    frames = (
        [mp.data for mp in loaded]
        if isinstance(loaded[0], MetaPartition)
        else loaded
    )
    roundtripped = pd.concat(frames).reset_index(drop=True)
    pdt.assert_frame_equal(original, roundtripped)