# pylint: disable=E1101
from datetime import date
from functools import partial
import numpy as np
import pandas as pd
import pandas.testing as pdt
import pytest
from plateau.api.dataset import read_dataset_as_ddf
from plateau.core._compat import PANDAS_3
from plateau.core.dataset import DatasetMetadata
from plateau.core.naming import DEFAULT_METADATA_VERSION
from plateau.core.testing import TIME_TO_FREEZE_ISO
from plateau.io.eager import (
read_dataset_as_metapartitions,
read_table,
store_dataframes_as_dataset,
)
from plateau.io.iter import read_dataset_as_dataframes__iterator
[docs]
def test_update_dataset_with_partitions(
    store_factory, metadata_version, bound_update_dataset, mocker, store
):
    """Create an indexed dataset via update, then replace one of its partitions.

    The first update writes two partitions (``p == 1`` and ``p == 2``). The
    second one deletes ``p == 1`` and adds ``p == 3``. The test checks the
    secondary index, the merged metadata, the number of keys in the store and
    that the dataset reloaded from the store equals the returned one.
    """
    uuid = "dataset_uuid"
    index_col = "p"
    # Keyword arguments shared by both update calls
    run_update = partial(
        bound_update_dataset,
        store=store_factory,
        dataset_uuid=uuid,
        default_metadata_version=metadata_version,
        secondary_indices=[index_col],
    )

    initial = run_update(
        [pd.DataFrame({index_col: [value]}) for value in (1, 2)],
        metadata={"dataset": "metadata"},
    ).load_index(index_col, store)

    updated = run_update(
        [pd.DataFrame({index_col: [3]})],
        delete_scope=[{index_col: 1}],
        metadata={"extra": "metadata"},
    ).load_index(index_col, store)

    # Check the index contents first; the remaining assertions build on them
    index_before = initial.indices[index_col].index_dct
    index_after = updated.indices[index_col].index_dct
    assert set(index_before.keys()) == {1, 2}
    assert set(index_after.keys()) == {2, 3}
    # The entry for value 2 is unchanged, the entry for 3 differs from the old 1
    assert index_after[2] == index_before[2]
    assert index_before[1] != index_after[3]

    # We do not mock the time resolution of this test since otherwise we cannot
    # ensure that the indices are not overwritten, so the creation time is taken
    # from the result itself.
    expected_metadata = {
        "dataset": "metadata",
        "extra": "metadata",
        "creation_time": updated.metadata["creation_time"],
    }
    assert updated.metadata == expected_metadata
    assert updated.uuid == uuid

    # 1 dataset metadata file + 2 index files + 3 partition files
    # + 1 common metadata file for v4 datasets (1 table)
    assert len(list(store.keys())) == 7

    # Ensure the dataset can be loaded properly
    reloaded = DatasetMetadata.load_from_store(uuid, store).load_index(
        index_col, store
    )
    assert updated == reloaded
[docs]
@pytest.mark.xfail(reason="How to handle empty input??")
def test_update_dataset_with_partitions_delete_only(
    store_factory, metadata_version, frozen_time_em, bound_update_dataset, store
):
    """Update with no new data (``None``) and only a ``delete_scope``.

    Marked as an expected failure; the assertions describe the intended
    outcome: the partition with ``p == 1`` is gone and ``p == 2`` remains.
    """
    uuid = "dataset_uuid"
    initial = store_dataframes_as_dataset(
        dfs=[pd.DataFrame({"p": [value]}) for value in (1, 2)],
        store=store_factory,
        metadata={"dataset": "metadata"},
        dataset_uuid=uuid,
        secondary_indices=["p"],
        metadata_version=metadata_version,
    ).load_index("p", store)

    # FIXME: is this a regression?
    updated = bound_update_dataset(
        None,
        store=store_factory,
        dataset_uuid=uuid,
        delete_scope=[{"p": 1}],
        metadata={"extra": "metadata"},
        default_metadata_version=metadata_version,
        secondary_indices=["p"],
    ).load_index("p", store)

    assert len(initial.partitions) == 2
    assert len(updated.partitions) == 1

    # 1 dataset metadata file + 1 index file + 2 partition files
    # note: the update writes a new index file but due to frozen_time this gets
    # the same name as the previous one and overwrites it.
    # The extra 1 is the common metadata file for v4 datasets (1 table).
    assert len(list(store.keys())) == 4 + 1

    assert set(initial.indices["p"].observed_values()) == {1, 2}
    assert set(updated.indices["p"].observed_values()) == {2}

    # Ensure the dataset can be loaded properly
    reloaded = DatasetMetadata.load_from_store(uuid, store).load_index("p", store)
    assert updated == reloaded
[docs]
def test_update_dataset_with_partitions__reducer_partitions(
    store_factory, frozen_time_em, bound_update_dataset
):
    """Replace all ``L == 2`` data of a dataset partitioned on ``P``.

    After the update, every ``P`` partition must hold the untouched ``L == 1``
    file plus exactly one new file, and the ``P`` index must point at the new
    file wherever it previously pointed at the replaced one.
    """

    def _file_label(partition_label):
        # Keep only the last "/"-separated segment of a partition label
        return partition_label.split("/")[-1]

    assert set(store_factory().keys()) == set()

    base_frame = pd.DataFrame(
        {"P": [1, 2, 3] * 2, "L": [1] * 6, "TARGET": np.arange(10, 16)}
    )
    second_frame = base_frame.copy(deep=True)
    second_frame["L"] = 2
    second_frame["TARGET"] += 2

    dataset = store_dataframes_as_dataset(
        dfs=[base_frame, second_frame],
        store=store_factory,
        dataset_uuid="dataset_uuid",
        partition_on=["P"],
        secondary_indices="L",
        metadata_version=4,
    )
    indices_before = dataset.load_all_indices(store=store_factory()).indices
    cluster_1_label = _file_label(
        indices_before["L"].eval_operator(op="==", value=1).pop()
    )
    cluster_2_label = _file_label(
        indices_before["L"].eval_operator(op="==", value=2).pop()
    )

    replacement = second_frame.copy(deep=True)
    replacement["TARGET"] -= 5
    dataset_updated = bound_update_dataset(
        [replacement],
        store=store_factory,
        dataset_uuid="dataset_uuid",
        delete_scope=[{"L": 2}],
        metadata={"extra": "metadata"},
        partition_on=["P"],
        secondary_indices=["L"],
    )
    indices_after = dataset_updated.load_all_indices(store=store_factory()).indices

    # All new L == 2 partitions must share one file label
    replacement_labels = {
        _file_label(label)
        for label in indices_after["L"].eval_operator(op="==", value=2)
    }
    assert len(replacement_labels) == 1
    (cluster_3_label,) = replacement_labels

    expected_partitions = [
        f"P={p_value}/{file_label}"
        for p_value in (1, 2, 3)
        for file_label in (cluster_1_label, cluster_3_label)
    ]
    assert sorted(expected_partitions) == sorted(dataset_updated.partitions.keys())
    assert sorted(dataset.indices.keys()) == sorted(dataset_updated.indices.keys())

    # Expected P index: the old one with the replaced file label swapped out
    expected_p_index = {
        p_value: [
            label.replace(cluster_2_label, cluster_3_label) for label in labels
        ]
        for p_value, labels in indices_before["P"].index_dct.items()
    }
    actual_p_index = indices_after["P"].index_dct
    assert sorted(expected_p_index.keys()) == sorted(actual_p_index.keys())
    for p_value, labels in actual_p_index.items():
        assert sorted(expected_p_index[p_value]) == sorted(labels)
[docs]
def test_update_dataset_with_partitions__reducer_nonexistent(
    store_factory, metadata_version, frozen_time_em, bound_update_dataset, store
):
    """Run an update with a ``delete_scope`` without storing a dataset first.

    The result must contain only the single new partition, carry the frozen
    creation time and be reloadable from the store.
    """
    uuid = "dataset_uuid"
    updated = bound_update_dataset(
        [pd.DataFrame({"p": [3]})],
        store=store_factory,
        dataset_uuid=uuid,
        delete_scope=[{"p": 1}],
        metadata={"extra": "metadata"},
        default_metadata_version=metadata_version,
        secondary_indices=["p"],
    ).load_index("p", store)

    new_label = updated.indices["p"].eval_operator(op="==", value=3).pop()

    assert updated.metadata == {
        "extra": "metadata",
        "creation_time": TIME_TO_FREEZE_ISO,
    }
    assert list(updated.partitions) == [new_label]
    assert updated.partitions[new_label].label == new_label
    assert updated.uuid == uuid

    # 1 dataset metadata file + 1 index file + 1 partition file
    # note: the update writes a new index file but due to frozen_time this gets
    # the same name as the previous one and overwrites it.
    # The extra 1 is the common metadata file for v4 datasets (1 table).
    assert len(list(store.keys())) == 3 + 1

    assert updated.indices["p"].index_dct == {3: [new_label]}

    # Ensure the dataset can be loaded properly
    reloaded = DatasetMetadata.load_from_store(uuid, store).load_index("p", store)
    assert updated == reloaded
[docs]
def _typed_frame(**columns):
    """Build a frame from ``name=(values, dtype)`` pairs, keeping column order."""
    return pd.DataFrame(
        {
            name: pd.Series(values, dtype=dtype)
            for name, (values, dtype) in columns.items()
        }
    )


@pytest.mark.parametrize(
    "dfs,ok",
    [
        # Identical dtypes in both frames
        (
            [
                _typed_frame(P=([1], np.int64), X=([1], np.int64)),
                _typed_frame(P=([2], np.int64), X=([2], np.int64)),
            ],
            True,
        ),
        # Payload column X: int32 first, int16 second
        (
            [
                _typed_frame(P=([1], np.int64), X=([1], np.int32)),
                _typed_frame(P=([2], np.int64), X=([2], np.int16)),
            ],
            True,
        ),
        # Partition column P: int16 first, int32 second
        (
            [
                _typed_frame(P=([1], np.int16), X=([1], np.int64)),
                _typed_frame(P=([2], np.int32), X=([2], np.int64)),
            ],
            True,
        ),
        # Payload column X switches from signed to unsigned
        (
            [
                _typed_frame(P=([1], np.int64), X=([1], np.int64)),
                _typed_frame(P=([2], np.int64), X=([2], np.uint64)),
            ],
            False,
        ),
        # Second frame carries an additional column Y
        (
            [
                _typed_frame(P=([1], np.int64), X=([1], np.int64)),
                _typed_frame(
                    P=([2], np.int64), X=([2], np.int64), Y=([2], np.int64)
                ),
            ],
            False,
        ),
        # Two-row first frame, then signed -> unsigned in X
        (
            [
                _typed_frame(P=([1, 2], np.int64), X=([1, 2], np.int64)),
                _typed_frame(P=([3], np.int64), X=([3], np.uint64)),
            ],
            False,
        ),
    ],
)
def test_schema_check_update(dfs, ok, store_factory, bound_update_dataset):
    """Store the first frame, update with the rest and check schema validation.

    ``ok`` states whether the update is expected to pass; otherwise an
    exception mentioning incompatible schemas must be raised.
    """
    seed_frames, update_frames = dfs[:1], dfs[1:]
    store_dataframes_as_dataset(
        dfs=seed_frames,
        store=store_factory,
        dataset_uuid="dataset_uuid",
        partition_on=["P"],
        metadata_version=4,
    )
    update = partial(
        bound_update_dataset,
        store=store_factory,
        dataset_uuid="dataset_uuid",
        partition_on=["P"],
    )

    if not ok:
        with pytest.raises(
            Exception,
            match=r"Schemas\sfor\sdataset\s\\*'dataset_uuid\\*'\sare\snot\scompatible!",
        ):
            update(update_frames)
        return

    update(update_frames)
[docs]
def test_sort_partitions_by(
    store_factory, metadata_version, frozen_time_em, bound_update_dataset
):
    """Update with ``sort_partitions_by`` and check each frame read back is sorted."""

    def _six_row_frame(targets):
        # Same P/L layout for both update frames; only TARGET differs
        return pd.DataFrame(
            {
                "P": [1, 2, 3, 1, 2, 3],
                "L": [1, 1, 1, 1, 1, 1],
                "TARGET": targets,
            }
        )

    uuid = "dataset_uuid"
    store_dataframes_as_dataset(
        dfs=[pd.DataFrame({"P": [3], "L": [1], "TARGET": [1]})],
        store=store_factory,
        dataset_uuid=uuid,
        metadata_version=metadata_version,
    )

    unsorted_frames = [
        _six_row_frame(list(reversed(np.arange(10, 16)))),
        _six_row_frame([88, 1, 5, 99, 12, 11]),
    ]
    bound_update_dataset(
        unsorted_frames,
        store=store_factory,
        dataset_uuid=uuid,
        metadata={"extra": "metadata"},
        default_metadata_version=metadata_version,
        sort_partitions_by=["TARGET"],
    )

    # Every dataframe yielded by the iterator must have its `sort_partitions_by`
    # column in ascending order
    frames = read_dataset_as_dataframes__iterator(
        store=store_factory, dataset_uuid=uuid
    )
    for frame in frames:
        assert (frame.TARGET == sorted(frame.TARGET)).all()
[docs]
def test_update_secondary_indices_subset(store_factory, bound_update_dataset):
    """Omitting ``secondary_indices`` on update keeps the index; a new one is rejected."""
    uuid = "dataset_uuid"

    def _frame(indexed_value):
        return pd.DataFrame({"A": range(10), "indexed": indexed_value})

    bound_update_dataset(
        _frame(1), dataset_uuid=uuid, store=store_factory, secondary_indices="indexed"
    )

    follow_up = _frame(2)
    # secondary index is omitted. plateau should pick it up regardless
    bound_update_dataset(follow_up, dataset_uuid=uuid, store=store_factory)

    stored = DatasetMetadata.load_from_store(
        uuid, store_factory(), load_all_indices=True
    )
    assert sorted(stored.indices["indexed"].observed_values()) == [1, 2]

    # Asking for an index on a different column ("A") must raise
    with pytest.raises(ValueError, match="Incorrect indices provided"):
        bound_update_dataset(
            follow_up, dataset_uuid=uuid, store=store_factory, secondary_indices="A"
        )
[docs]
def test_update_first_time_with_secondary_indices(store_factory, bound_update_dataset):
    """Smoke test: a first-time update with ``secondary_indices`` must not raise."""
    index_column = "p"
    frames = [pd.DataFrame({index_column: [1, 2]})]
    # NOTE(review): the list of frames is itself wrapped in a list, so the
    # update receives a nested list - presumably intentional; confirm.
    bound_update_dataset(
        [frames],
        store=store_factory,
        dataset_uuid="dataset_uuid",
        secondary_indices=[index_column],
    )
[docs]
def test_partition_on_null(store_factory, bound_update_dataset):  # gh-262
    """Partitioning on a column that contains a null value must raise."""
    value_by_key = {
        key: position for position, key in enumerate(["a", "b", "c", np.nan])
    }
    # Keys become the "part" column (including the NaN), positions the "value" column
    indexed = pd.DataFrame.from_dict(value_by_key, orient="index")
    df = indexed.reset_index().rename(columns={"index": "part", 0: "value"})

    expected_message = r"Original dataframe size .* on a column with null values."
    with pytest.raises(Exception, match=expected_message):
        bound_update_dataset(
            [df],
            store=store_factory,
            dataset_uuid="a_unique_dataset_identifier",
            partition_on=["part"],
        )
[docs]
def test_update_infers_partition_on(store_factory, bound_update_dataset, df_not_nested):
    """A second update without ``partition_on`` doubles the partition count."""
    shared_kwargs = {"dataset_uuid": "dataset_uuid", "store": store_factory}

    created = bound_update_dataset(
        [df_not_nested],
        partition_on=df_not_nested.columns[0],
        **shared_kwargs,
    )
    # update the dataset
    # do not pass partition_on since it should be inferred from the existing dataset
    updated = bound_update_dataset([df_not_nested], **shared_kwargs)

    assert len(updated.partitions) == 2 * len(created.partitions)
[docs]
def test_update_raises_incompatible_partition_keys(
    store_factory, bound_update_dataset, df_not_nested
):
    """Updating with a different ``partition_on`` column raises ``ValueError``."""
    first_column, second_column = df_not_nested.columns[0], df_not_nested.columns[1]
    update = partial(
        bound_update_dataset,
        [df_not_nested],
        dataset_uuid="dataset_uuid",
        store=store_factory,
    )

    update(partition_on=first_column)

    # Not allowed to use different partition_on
    expected_message = "Incompatible set of partition keys encountered."
    with pytest.raises(ValueError, match=expected_message):
        update(partition_on=second_column)
[docs]
def test_update_raises_incompatible_inidces(
    store_factory, bound_update_dataset, df_not_nested
):
    """Updating with a secondary index on another column raises ``ValueError``."""
    first_column, second_column = df_not_nested.columns[0], df_not_nested.columns[1]
    update = partial(
        bound_update_dataset,
        [df_not_nested],
        dataset_uuid="dataset_uuid",
        store=store_factory,
    )

    update(secondary_indices=first_column)

    # Not allowed to update with indices which do not yet exist in dataset
    with pytest.raises(ValueError, match="indices"):
        update(secondary_indices=second_column)
[docs]
def test_update_of_dataset_with_non_default_table_name(
    store_factory, bound_update_dataset
):
    """Tests that datasets with table names other than "table" can be created,
    updated and read successfully (regression test for issue #445)."""
    dataset_uuid = "dataset_uuid"
    table_name = "non-default-name"

    def _january_frame(first_day):
        # Two consecutive January 2021 dates with values 0 and 1
        return pd.DataFrame(
            {
                "date": [date(2021, 1, first_day), date(2021, 1, first_day + 1)],
                "value": range(2),
            }
        )

    def _stored_table_name():
        return DatasetMetadata.load_from_store(
            dataset_uuid, store_factory()
        ).table_name

    # Create initial dataset
    df_create = _january_frame(1)
    store_dataframes_as_dataset(
        dfs=[df_create],
        store=store_factory,
        dataset_uuid=dataset_uuid,
        table_name=table_name,
        partition_on=["date"],
    )
    assert _stored_table_name() == "non-default-name"

    # Update dataset
    df_update = _january_frame(3)
    bound_update_dataset(
        [df_update],
        store=store_factory,
        dataset_uuid=dataset_uuid,
        table_name=table_name,
        partition_on=["date"],
    )
    assert _stored_table_name() == "non-default-name"

    # Assert equality of dataframe
    ddf = read_dataset_as_ddf(dataset_uuid, store_factory(), "table")
    df_read = ddf.compute().reset_index(drop=True)
    df_expected = pd.concat([df_create, df_update]).reset_index(drop=True)
    pdt.assert_frame_equal(df_read, df_expected)
def _dtype_from_storage_nan_value(storage_backend, na_value):
    """Return the string dtype for a storage backend / missing-value pair.

    When ``PANDAS_3`` is set, a ``pd.StringDtype`` is built directly from the
    two arguments. Otherwise the pair is translated into a dtype alias string;
    the ``"python"`` backend with ``np.nan`` yields ``None``.

    Raises
    ------
    ValueError
        If ``PANDAS_3`` is not set and the combination is not one of the four
        handled ones.
    """
    if PANDAS_3:
        return pd.StringDtype(storage=storage_backend, na_value=na_value)

    # The sentinels are matched by identity, not equality
    uses_na = na_value is pd.NA
    uses_nan = na_value is np.nan

    if storage_backend == "pyarrow":
        if uses_na:
            return "string[pyarrow]"
        if uses_nan:
            return "string[pyarrow_numpy]"
    elif storage_backend == "python":
        if uses_nan:
            return None
        if uses_na:
            return "string"

    raise ValueError(f"Unsupported storage backend: {storage_backend}")
[docs]
@pytest.mark.parametrize("storage_backend", ["pyarrow", "python"])
@pytest.mark.parametrize("na_value", [np.nan, pd.NA])
def test_update_after_empty_partition_string_dtypes(
    store_factory, bound_update_dataset, storage_backend, na_value, backend_identifier
):
    """Update a dataset that was created from an empty string-typed partition.

    Steps: write an empty partition, update it with real string data, compare
    the table read back with the expected frame, then check that frames using
    the other missing-value sentinel are rejected as schema-incompatible.
    The test is skipped when no dtype exists for the parametrized combination.
    """
    import pandas as pd

    # NOTE(review): the whole body is assumed to run under the option context;
    # confirm against the upstream file.
    with pd.option_context("future.infer_string", True):
        # Determine the missing-value sentinel that is NOT under test
        other_nan_value = {np.nan, pd.NA}
        other_nan_value.remove(na_value)
        other_nan_value = other_nan_value.pop()
        dtype = _dtype_from_storage_nan_value(storage_backend, na_value)
        if dtype is None:
            pytest.skip()
        df = pd.DataFrame({"str": pd.Series(["a", "b", None], dtype=dtype)})
        dataset_uuid = "dataset_uuid"
        bound_update_dataset(
            [df.iloc[0:0]],  # empty partition
            store=store_factory,
            dataset_uuid=dataset_uuid,
        )
        # Schema verification should not fail
        bound_update_dataset(
            [df],
            store=store_factory,
            dataset_uuid=dataset_uuid,
        )
        if na_value is pd.NA:
            expected_dtype = _dtype_from_storage_nan_value("python", pd.NA)
        else:
            expected_dtype = _dtype_from_storage_nan_value("pyarrow", np.nan)
        # We have to cast to the expected dtype since pyarrow is only reading
        # the above two data types in. They are ignoring the written storage
        # backend and are defaulting to python for pd.NA and to pyarrow for
        # np.nan
        df["str"] = df["str"].astype(expected_dtype)
        pdt.assert_frame_equal(read_table(dataset_uuid, store_factory()), df)
        if backend_identifier == "dask.dataframe":
            # FIXME: dask.dataframe triggers the schema validation error but somehow
            # the exception is not properly forwarded and the test always fails
            return
        # Frames built with the other sentinel must be rejected for both backends
        for storage in ["pyarrow", "python"]:
            df = pd.DataFrame(
                {
                    "str": pd.Series(
                        ["c", "d"],
                        dtype=_dtype_from_storage_nan_value(storage, other_nan_value),
                    )
                }
            )
            # Should be a ValueError but dask sometimes raises a different exception
            # type
            with pytest.raises(ValueError, match="Schemas.*are not compatible.*"):
                bound_update_dataset(
                    [df],
                    store=store_factory,
                    dataset_uuid=dataset_uuid,
                )
[docs]
@pytest.mark.parametrize("storage_backend", ["pyarrow", "python"])
@pytest.mark.parametrize("na_value", [np.nan, pd.NA])
def test_update_after_empty_partition_string_dtypes_categoricals(
    store_factory, bound_update_dataset, storage_backend, na_value
):
    """Categorical variant of the empty-partition string dtype update test.

    The dataset grows step by step (empty partition, first data, two more
    categorical frames, optionally plain string frames), so the expected values
    further down depend on the order of the update calls above them.
    The test is skipped when no dtype exists for the parametrized combination.
    """
    import pandas as pd

    # NOTE(review): the whole body is assumed to run under the option context;
    # confirm against the upstream file.
    with pd.option_context("future.infer_string", True):
        # Determine the missing-value sentinel that is NOT under test
        other_nan_value = {np.nan, pd.NA}
        other_nan_value.remove(na_value)
        other_nan_value = other_nan_value.pop()
        dtype = _dtype_from_storage_nan_value(storage_backend, na_value)
        if dtype is None:
            pytest.skip()
        df = pd.DataFrame(
            {"str": pd.Series(["a", "b", None], dtype=dtype).astype("category")}
        )
        dataset_uuid = "dataset_uuid"
        bound_update_dataset(
            [df.iloc[0:0]],  # empty partition
            store=store_factory,
            dataset_uuid=dataset_uuid,
        )
        # Schema verification should not fail
        bound_update_dataset(
            [df],
            store=store_factory,
            dataset_uuid=dataset_uuid,
        )
        expected_dtype = _dtype_from_storage_nan_value("pyarrow", np.nan)
        # We have to cast to the expected dtype since pyarrow is only reading
        # categoricals with the pyarrow_numpy data type.
        df["str"] = df["str"].astype(expected_dtype)
        pdt.assert_frame_equal(read_table(dataset_uuid, store_factory()), df)
        # Categorical frames built with the other sentinel are accepted for
        # both backends; each iteration appends "c", "d" to the dataset.
        for storage in ["pyarrow", "python"]:
            df = pd.DataFrame(
                {
                    "str": pd.Series(
                        ["c", "d"],
                        dtype=_dtype_from_storage_nan_value(storage, other_nan_value),
                    ).astype("category")
                }
            )
            bound_update_dataset(
                [df],
                store=store_factory,
                dataset_uuid=dataset_uuid,
            )
        after_update = read_table(dataset_uuid, store_factory())
        # From here on expected_dtype is "object" unless PANDAS_3 is set; the
        # rebound value is also used for the final categorical comparison.
        if not PANDAS_3:
            expected_dtype = "object"
        expected_after_update = pd.DataFrame(
            {"str": pd.Series(["a", "b", None, "c", "d", "c", "d"], dtype=expected_dtype)}
        )
        pdt.assert_frame_equal(after_update, expected_after_update)
        # Storage of categorical dtypes will only happen with np.nan. If we try
        # the other na_value (pd.NA) we'll get a validation error.
        for storage in ["pyarrow", "python"]:
            df = pd.DataFrame(
                {
                    "str": pd.Series(
                        ["e", "f", None],
                        dtype=_dtype_from_storage_nan_value(storage, pd.NA),
                    )
                }
            )
            with pytest.raises(ValueError, match="Schemas.*are not compatible.*"):
                bound_update_dataset(
                    [df],
                    store=store_factory,
                    dataset_uuid=dataset_uuid,
                )
        # With np.nan works fine?
        # Track whether a backend had no np.nan dtype, because then one update
        # (three rows) is missing from the expected values below.
        skipped = False
        for storage in ["pyarrow", "python"]:
            dtype = _dtype_from_storage_nan_value(storage, np.nan)
            if dtype is None:
                skipped = True
                continue
            df = pd.DataFrame(
                {
                    "str": pd.Series(
                        ["e", "f", None],
                        dtype=dtype,
                    )
                }
            )
            bound_update_dataset(
                [df],
                store=store_factory,
                dataset_uuid=dataset_uuid,
            )
        after_update_as_cats = read_table(
            dataset_uuid, store_factory(), categoricals=["str"]
        )
        values = ["a", "b", None, "c", "d", "c", "d", "e", "f", None, "e", "f", None]
        if skipped:
            # Drop the rows of the update that was skipped above
            values = values[:-3]
        expected = pd.DataFrame(
            {
                "str": pd.Series(
                    values,
                    dtype=expected_dtype,
                ).astype("category")
            }
        )
        pdt.assert_frame_equal(after_update_as_cats, expected)