Source code for plateau.utils.store

"""Workarounds for limitations of the minimalkv API."""

import logging
import time
from urllib.parse import quote

from minimalkv.contrib import VALID_KEY_RE_EXTENDED

try:
    # azure-storage-blob < 12
    from azure.common import (
        AzureMissingResourceHttpError as _AzureMissingResourceHttpError,
    )
    from azure.storage.blob import BlockBlobService as _BlockBlobService
except ImportError:

    class _BlockBlobService:  # type: ignore
        """Dummy class."""

    class _AzureMissingResourceHttpError:  # type: ignore
        """Dummy class."""


try:
    # azure-storage-blob >= 12
    from azure.core.exceptions import ResourceNotFoundError as _ResourceNotFoundError
    from azure.storage.blob import ContainerClient as _ContainerClient
except ImportError:

    class _ContainerClient:  # type: ignore
        """Dummy class."""

    class _ResourceNotFoundError:  # type: ignore
        """Dummy class."""


__all__ = ("copy_keys",)


_logger = logging.getLogger(__name__)


# Specialized implementation for azure-storage-blob < 12, using BlockBlobService (`bbs`):


def _has_azure_bbs(store):
    try:
        # store decorators will forward getattr calls
        return isinstance(store.block_blob_service, _BlockBlobService)
    except AttributeError:
        return False


def _azure_bbs_content_md5(block_blob_service, container, key, accept_missing=False):
    try:
        return block_blob_service.get_blob_properties(
            container, key
        ).properties.content_settings.content_md5
    except _AzureMissingResourceHttpError:
        if accept_missing:
            return None
        else:
            raise KeyError(key) from None


def _copy_azure_bbs(keys, src_store, tgt_store):
    src_container = src_store.container
    tgt_container = tgt_store.container
    src_bbs = src_store.block_blob_service
    tgt_bbs = tgt_store.block_blob_service

    cprops = {}
    for k in keys:
        source_md5 = _azure_bbs_content_md5(
            src_bbs, src_container, k, accept_missing=False
        )

        if source_md5 is None:
            _logger.debug(f"Missing hash for {k}")
        else:
            tgt_md5 = _azure_bbs_content_md5(
                tgt_bbs, tgt_container, k, accept_missing=True
            )

            if source_md5 == tgt_md5:
                _logger.debug(f"Omitting copy to {k} (checksum match)")
                continue

        copy_source = src_bbs.make_blob_url(
            src_container, quote(k), sas_token=src_bbs.sas_token
        )
        cprops[k] = tgt_bbs.copy_blob(tgt_container, k, copy_source)

    for k, cprop in cprops.items():
        while True:
            blob = tgt_bbs.get_blob_properties(tgt_container, k)
            cprop_current = blob.properties.copy
            assert cprop.id == cprop_current.id, f"Concurrent copy to {k}"
            if cprop_current.status == "pending":
                _logger.debug(f"Waiting for pending copy to {k}...")
                time.sleep(0.1)
                continue
            elif cprop_current.status == "success":
                _logger.debug(f"Copy to {k} completed")
                break  # break from while, continue in for-loop
            else:
                raise RuntimeError(
                    f"Error while copying: status is {cprop_current.status}: {cprop_current.status_description}"
                )


# Specialized implementation for azure-storage-blob >= 12, using ContainerClient (`cc`):
def _has_azure_cc(store):
    try:
        # store decorators will forward getattr calls
        return isinstance(store.blob_container_client, _ContainerClient)
    except AttributeError:
        return False


def _azure_cc_content_md5(cc, key, accept_missing=False):
    try:
        bc = cc.get_blob_client(key)
        return bc.get_blob_properties().content_settings.content_md5
    except _ResourceNotFoundError:
        if accept_missing:
            return None
        else:
            raise KeyError(key) from None


def _copy_azure_cc(keys, src_store, tgt_store):
    src_cc = src_store.blob_container_client
    tgt_cc = tgt_store.blob_container_client

    copy_ids = {}
    for k in keys:
        source_md5 = _azure_cc_content_md5(src_cc, k, accept_missing=False)

        if source_md5 is None:
            _logger.debug(f"Missing hash for {k}")
        else:
            tgt_md5 = _azure_cc_content_md5(tgt_cc, k, accept_missing=True)

            if source_md5 == tgt_md5:
                _logger.debug(f"Omitting copy to {k} (checksum match)")
                continue

        copy_source = src_cc.get_blob_client(k).url
        copy_ids[k] = tgt_cc.get_blob_client(k).start_copy_from_url(copy_source)[
            "copy_id"
        ]

    for k, copy_id in copy_ids.items():
        while True:
            cprop_current = tgt_cc.get_blob_client(k).get_blob_properties().copy
            assert copy_id == cprop_current.id, f"Concurrent copy to {k}"
            if cprop_current.status == "pending":
                _logger.debug(f"Waiting for pending copy to {k}...")
                time.sleep(0.1)
                continue
            elif cprop_current.status == "success":
                _logger.debug(f"Copy to {k} completed")
                break  # break from while, continue in for-loop
            else:
                raise RuntimeError(
                    f"Error while copying: status is {cprop_current.status}: {cprop_current.status_description}"
                )


def _copy_naive(keys, src_store, tgt_store):
    for k in keys:
        tgt_store.put(k, src_store.get(k))


[docs] def copy_keys(keys, src_store, tgt_store): """Copy keys from one store the another. Parameters ---------- keys: Iterable[str] Keys to copy. src_store: Union[minimalkv.KeyValueStore, Callable[[], minimalkv.KeyValueStore]] Source KV store. tgt_store: Union[minimalkv.KeyValueStore, Callable[[], minimalkv.KeyValueStore]] Target KV store. """ if callable(src_store): src_store = src_store() if callable(tgt_store): tgt_store = tgt_store() keys = sorted(keys) for k in keys: if (k is None) or (not VALID_KEY_RE_EXTENDED.match(k)) or (k == "/"): raise ValueError(f"Illegal key: {k}") if _has_azure_bbs(src_store) and _has_azure_bbs(tgt_store): _logger.debug( "Azure stores based on BlockBlobStorage class detected, use fast-path." ) _copy_azure_bbs(keys, src_store, tgt_store) elif _has_azure_cc(src_store) and _has_azure_cc(tgt_store): _logger.debug( "Azure stores based on ContainerClient class detected, use fast-path." ) _copy_azure_cc(keys, src_store, tgt_store) else: _logger.debug("Use naive slow-path.") _copy_naive(keys, src_store, tgt_store)