Source code for plateau.io_components.gc

from typing import cast

from plateau.core.factory import _ensure_factory
from plateau.core.index import ExplicitSecondaryIndex
from plateau.core.naming import TABLE_METADATA_FILE


def dispatch_files_to_gc(dataset_uuid, store_factory, chunk_size, factory):
    """Yield the store keys of a dataset that are eligible for garbage collection.

    A key is reported when the store lists it under the dataset's
    ``indices/`` prefix or under one of its table prefixes, but the dataset
    does not reference it as an index storage key, a partition file, or a
    table metadata file. Table prefixes are only scanned when the dataset
    factory reports ``explicit_partitions``.

    Parameters
    ----------
    dataset_uuid
        Identifier used to build the key prefixes. When falsy, the ``uuid``
        of the resolved dataset factory is used instead.
    store_factory
        Forwarded to ``_ensure_factory`` as its ``store`` argument.
    chunk_size
        ``None`` to yield every key in a single list (yielded even when it is
        empty); otherwise the maximum number of keys per yielded list.
    factory
        Forwarded to ``_ensure_factory``.

    Yields
    ------
    list
        Keys to delete, in no particular order.
    """
    ds_factory = _ensure_factory(
        dataset_uuid=dataset_uuid,
        store=store_factory,
        factory=factory,
    )
    if not dataset_uuid:
        dataset_uuid = ds_factory.uuid

    # Start from everything stored below the index prefix, then drop the keys
    # that the dataset's indices still point to.
    index_prefix = f"{dataset_uuid}/indices/"
    orphaned_index_keys = set(ds_factory.store.iter_keys(prefix=index_prefix))
    for index in ds_factory.indices.values():
        storage_key = cast(ExplicitSecondaryIndex, index).index_storage_key
        # Only indices that carry a storage key (i.e. are saved as explicit
        # indices) protect a file from removal.
        if storage_key:
            orphaned_index_keys.discard(storage_key)

    orphaned_table_keys = set()
    if ds_factory.explicit_partitions:
        # Keys the dataset still references: partition files and, per table,
        # the table metadata file.
        referenced_keys = set()
        for partition in ds_factory.partitions.values():
            referenced_keys.update(partition.files.values())

        for table in ds_factory.tables:
            table_prefix = f"{dataset_uuid}/{table}/"
            referenced_keys.add(table_prefix + TABLE_METADATA_FILE)
            for key in ds_factory.store.iter_keys(prefix=table_prefix):
                orphaned_table_keys.add(key)

        orphaned_table_keys.difference_update(referenced_keys)

    files_to_remove = list(orphaned_index_keys | orphaned_table_keys)

    if chunk_size is None:
        yield files_to_remove
        return

    for start in range(0, len(files_to_remove), chunk_size):
        yield files_to_remove[start : start + chunk_size]
def delete_files(files, store_factory):
    """Delete every key in ``files`` from the store built by ``store_factory``.

    Parameters
    ----------
    files
        Iterable of keys; each one is passed to the store's ``delete`` method
        in iteration order.
    store_factory
        Zero-argument callable returning the store object. It is called
        exactly once, before any key is deleted.
    """
    backend = store_factory()
    for key in files:
        backend.delete(key)