import logging
from plateau.core import naming
from plateau.core.index import ExplicitSecondaryIndex
from plateau.io_components.metapartition import MetaPartition
from plateau.io_components.write import persist_indices
_logger = logging.getLogger(__name__)
def update_indices_from_partitions(partition_list, dataset_metadata_factory):
    """Rebuild the dataset's secondary indices from a list of partitions.

    The indices carried by ``partition_list`` are merged, persisted to the
    store of ``dataset_metadata_factory`` and then registered on the dataset
    metadata as ``ExplicitSecondaryIndex`` entries that point at the freshly
    written storage keys. Finally the metadata document itself is written
    back to the store.

    Only the columns returned by ``persist_indices`` are replaced; this
    function does not remove any other entry already present in
    ``dataset_metadata_factory.indices``.

    The same is done in the store dataset part. This is used in an
    additional build index step (by the build_dataset_indices__pipeline)
    which should be used after updating partitions of a dataset.

    Parameters
    ----------
    partition_list
        Partitions whose indices are merged via
        ``MetaPartition.merge_indices``.
    dataset_metadata_factory
        Object exposing ``store``, ``uuid``, ``indices`` and ``to_json()``
        for the dataset being updated. It is mutated in place.

    Returns
    -------
    The same ``dataset_metadata_factory`` that was passed in.
    """
    merged_indices = MetaPartition.merge_indices(partition_list)

    # Write the merged indices first; the result maps column -> storage key.
    storage_keys_by_column = persist_indices(
        store=dataset_metadata_factory.store,
        dataset_uuid=dataset_metadata_factory.uuid,
        indices=merged_indices,
    )

    # Point the dataset metadata at the persisted index files.
    for column_name, index_key in storage_keys_by_column.items():
        index_reference = ExplicitSecondaryIndex(
            column=column_name, index_storage_key=index_key
        )
        dataset_metadata_factory.indices[column_name] = index_reference

    # Persist the updated metadata so the new index references are durable.
    metadata_key = naming.metadata_key_from_uuid(dataset_metadata_factory.uuid)
    serialized_metadata = dataset_metadata_factory.to_json()
    dataset_metadata_factory.store.put(metadata_key, serialized_metadata)

    return dataset_metadata_factory