# Source code for Acquire.Access._filewriterequest


from uuid import uuid4 as _uuid4
import copy as _copy
import os as _os

from Acquire.Access import Request as _Request

__all__ = ["FileWriteRequest"]


def _get_key(root_key, filename):
    """Build the object store key for 'filename' beneath 'root_key'

       Only the final path component of 'filename' is used; any leading
       directories are discarded.

        Args:
            root_key (str): root key in the object store. If this is
            empty or None then no root is prepended
            filename (str): name (or path) of the file
        Returns:
            str: "<root_key>/<basename>", or just the basename when no
            root key is supplied

    """
    leaf = _os.path.basename(filename)

    # guard clause: without a root the key is simply the file's basename
    if not root_key:
        return leaf

    return "%s/%s" % (str(root_key), leaf)


def _clean_key(root_key, filekey):
    """Return 'filekey' as a string, prefixed by 'root_key' if one is given

       Unlike _get_key, the whole of 'filekey' is kept (it is not reduced
       to a basename).

        Args:
            root_key (str): root key in the object store. If this is
            empty or None then no root is prepended
            filekey (str): key of the file in the object store
        Returns:
            str: "<root_key>/<filekey>", or str(filekey) when no root key
            is supplied

    """
    key = str(filekey)
    return "%s/%s" % (str(root_key), key) if root_key else key


def _get_filesize_and_checksum(filename):
    """Return the size in bytes and the MD5 hex digest of a file

       The file is read in binary mode in 4096-byte chunks, so it is
       never loaded into memory all at once.

       Args:
            filename (str): path of the file to measure and hash
        Returns:
            tuple (int, str): size of file in bytes, md5 hex digest of
            the file contents
    """
    from hashlib import md5 as _md5

    hasher = _md5()
    nbytes = 0

    with open(filename, "rb") as handle:
        while True:
            block = handle.read(4096)

            # an empty read signals end-of-file
            if block == b"":
                break

            hasher.update(block)
            nbytes += len(block)

    return (nbytes, str(hasher.hexdigest()))


def _list_all_files(directory, ignore_hidden=True):
    """Create a list of the path relative to 'directory' of
       all files contained in 'directory' (recursively).

       Args:
            directory (str): path of directory to traverse
            ignore_hidden (bool): Default=True, if True then skip hidden
            files and do not descend into hidden sub-directories (names
            starting with '.'), else include everything

        Returns:
            list: a list of all files in the directory, as paths relative
            to 'directory'. An empty list is returned if 'directory' is
            not a directory

    """

    if not _os.path.isdir(directory):
        return []

    all_files = []

    for (root, dirnames, filenames) in _os.walk(directory):
        if ignore_hidden:
            # prune in place so that os.walk never descends into a hidden
            # directory, at any depth (not just directly under 'directory')
            dirnames[:] = [d for d in dirnames if not d.startswith(".")]

        # use relpath rather than slicing off len(directory)+1 characters,
        # as slicing breaks when 'directory' ends with a path separator
        rel_root = _os.path.relpath(root, directory)

        if rel_root == _os.curdir:
            # files directly in 'directory' have no relative prefix
            rel_root = ""

        for filename in filenames:
            if ignore_hidden and filename.startswith("."):
                continue

            all_files.append(_os.path.join(rel_root, filename))

    return all_files


def _clean_path(path):
    """This function cleans the passed path so that it doesn't contain
       redundant slashes or '..' etc., and that the trailing slash is
       removed.

       The path is first normalised with os.path.normpath. Any '.' or '..'
       components that survive normalisation (e.g. a leading '..' on a
       relative path) are then dropped, so a relative path can never climb
       above its starting point and is never turned into an absolute path.

       Note that no explicit backslash-to-forward-slash conversion is
       performed here; separators are whatever os.path.normpath produces.

       Args:
            path (str): the path string to clean
        Returns:
            str: the cleaned path. None is cleaned to "", while a path
            that is empty after cleaning is returned as "."

    """

    if path is None:
        return ""

    path = _os.path.normpath(path)

    # remove all ".." and "." from this path
    if path.find(".") != -1:
        # remember any leading slash(es) so that an absolute path stays
        # absolute and a relative path stays relative
        stripped = path.lstrip("/")
        prefix = path[:len(path) - len(stripped)]

        # drop the components rather than blanking them: blanking a
        # leading ".." would leave an empty first component, which
        # re-joins as a leading "/" and makes the path absolute
        parts = [part for part in stripped.split("/")
                 if part not in ("", ".", "..")]

        path = _os.path.normpath(prefix + "/".join(parts))

    return path


def _expand_source_destination(source, destination=None,
                               root=None, ignore_hidden=True):
    """This function expands the 'source' and 'destination' into a pair
       of lists - the source files and the destination keys in the
       object store.

       Each source entry is expanded with glob. A matched directory
       contributes every file found beneath it (see _list_all_files), and
       a matched file contributes itself. Patterns that match nothing
       contribute nothing.

       If exactly one file results and 'destination' does not end with
       '/', then the (cleaned, rooted) destination is itself the key.
       Otherwise each file's relative name is appended to the destination.

       Args:
            source (str or list): path / glob pattern, or a list (or
            tuple) of them, identifying the local files or directories
            destination (str, optional): destination key or, if it ends
            with '/', destination "directory" in the object store
            root (str): root key to use in object store
            ignore_hidden (bool): If True ignore hidden files in folders, else
            include hidden files

        Returns:
            tuple (list, list): absolute paths of the local source files,
            and the matching destination keys (same length and order)

        Raises:
            FileExistsError: if glob returns a path that does not exist
            ValueError: if the numbers of files and keys disagree

    """

    if source is None:
        return ([], [])

    if not isinstance(source, (list, tuple)):
        source = [str(source)]

    # a destination that is None (or otherwise not string-like) is simply
    # not a directory; only swallow the errors that this implies
    try:
        is_destination_dir = destination.endswith("/")
    except (AttributeError, TypeError):
        is_destination_dir = False

    destination = _os.path.normpath(_os.path.join(_clean_path(root),
                                                  _clean_path(destination)))

    # expand all of the sources into the full paths of files (which must exist)
    source_filenames = []
    rel_filenames = []

    from glob import glob as _glob

    for s in source:
        for f in _glob(str(s)):
            abspath = _os.path.abspath(f)
            if not _os.path.exists(abspath):
                # NOTE(review): FileNotFoundError would describe this
                # better, but the type is kept in case callers catch it
                raise FileExistsError("The file '%s' does not exist!"
                                      % abspath)

            if _os.path.isdir(abspath):
                dirfiles = _list_all_files(abspath, ignore_hidden)

                for dirfile in dirfiles:
                    source_filenames.append(_os.path.join(abspath, dirfile))
                    rel_filenames.append(dirfile)
            else:
                source_filenames.append(abspath)
                rel_filenames.append(_os.path.basename(abspath))

    destination_keys = []

    if len(rel_filenames) == 1:
        if is_destination_dir:
            destination_keys.append(_os.path.join(destination,
                                                  rel_filenames[0]))
        else:
            # single file to a non-directory destination: the destination
            # is the full key of the file
            destination_keys.append(destination)
    else:
        for rel_filename in rel_filenames:
            destination_keys.append(_os.path.join(destination, rel_filename))

    # sanity check - the two lists are built in lockstep above
    if len(source_filenames) != len(destination_keys):
        raise ValueError("Something went wrong as the number of source "
                         "filenames should be equal to the number of keys")

    return (source_filenames, destination_keys)


class FileWriteRequest(_Request):
    """This class holds a request to write a file (or files) to a remote
       object store
    """
    def __init__(self, source=None, destination=None, root=None,
                 ignore_hidden=True, account=None, testing_key=None):
        """Construct the request to write the files specified in 'source'
           to 'destination' in the cloud object store. You can optionally
           pass a 'root' for all of the keys in the object store, and,
           if the source is a directory, you can ignore hidden files using
           'ignore_hidden=True'.

           You must pass the 'account' from which payment will be taken to
           write files to the object store.

           If 'source' expands to no files then a null request is
           constructed (see is_null) and no account is needed.

           Args:
                source (str or list): source file(s) / directory(ies)
                destination (str, optional): destination key or directory
                root (str): root key to use in object store
                ignore_hidden (bool): If True ignore hidden files in folders,
                else include hidden files
                account (Account): instance of Account class
                testing_key (str): passed to enable testing of class

            Raises:
                TypeError: if 'account' is not an Account
                PermissionError: if 'account' is not logged in
                ValueError: if neither 'testing_key' nor 'account' is passed

            Returns:
                None
        """
        super().__init__()

        (filenames, keys) = FileWriteRequest.expand_source_and_destination(
                                source, destination, root, ignore_hidden)

        self._destination_keys = keys
        self._source_filenames = filenames
        self._file_sizes = []
        self._checksums = []

        # give every attribute a value up front, so that the accessors
        # of a null request return None rather than raising AttributeError
        self._is_testing = False
        self._authorisation = None
        self._account_uid = None
        self._accounting_service_url = None

        if len(self._source_filenames) == 0:
            self._uid = None
            return

        for filename in self._source_filenames:
            (size, checksum) = _get_filesize_and_checksum(filename)
            self._file_sizes.append(size)
            self._checksums.append(checksum)

        if testing_key:
            self._is_testing = True
            self._testing_key = testing_key
        elif account is not None:
            from Acquire.Client import Account as _Account
            if not isinstance(account, _Account):
                raise TypeError("The passed account must be type Account")

            if not account.is_logged_in():
                raise PermissionError(
                    "You can only authorise payment from the account if you "
                    "have logged in")
        else:
            raise ValueError("You must pass a valid account to write files!")

        # the uid must be set before resource_key() is called below
        self._uid = str(_uuid4())

        from Acquire.Identity import Authorisation as _Authorisation

        if self._is_testing:
            self._account_uid = "something"
            self._accounting_service_url = "somewhere"

            self._authorisation = _Authorisation(
                                    resource=self.resource_key(),
                                    testing_key=self._testing_key)
        else:
            self._account_uid = account.uid()
            self._accounting_service_url = account.accounting_service() \
                                                  .canonical_url()

            self._authorisation = _Authorisation(
                                    resource=self.resource_key(),
                                    user=account.owner())

    def is_null(self):
        """Return whether or not this is a null request

            Returns:
                bool: True if no UID is set (null request), else False
        """
        return self._uid is None

    def __str__(self):
        if self.is_null():
            return "FileWriteRequest::null"
        else:
            return "FileWriteRequest(uid=%s)" % self._uid

    def __eq__(self, other):
        # two requests are equal if they are the same class with the same UID
        if isinstance(other, self.__class__):
            return self._uid == other._uid
        else:
            return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def resource_key(self):
        """Function to return a string that can be used as a
           summary key for this resource request

            Returns:
                None or str : None if instance is null, else string
                containing the uid followed by the ':'-joined checksums
        """
        if self.is_null():
            return None
        else:
            return "%s %s" % (self._uid, ":".join(self._checksums))

    def uid(self):
        """Get the UID of this request

            Returns:
                str: UID of this request (None for a null request)
        """
        return self._uid

    def authorisation(self):
        """Return the authorisation behind this request

            Returns:
                Authorisation: the authorisation behind this request
                (None for a null request)
        """
        return getattr(self, "_authorisation", None)

    def source_filenames(self):
        """Return the filenames of the files to be copied. Note that
           this information is only available in the copy of the object
           that created the request - it is not saved when this object
           is serialised to json as we don't want to leak potentially
           sensitive data to the object store

            Returns:
                list: list of files to be copied
        """
        return self._source_filenames

    def destination_keys(self):
        """Return the object store keys to which the files will be written

            Returns:
                list: (copy of the) object store keys to which the files
                will be written
        """
        return _copy.copy(self._destination_keys)

    def filesizes(self):
        """Return the sizes of the files that are requested to be written

            Returns:
                list: (copy of the) sizes of the files to be written
        """
        return _copy.copy(self._file_sizes)

    def checksums(self):
        """Return the checksums of the files that are requested to be written

            Returns:
                list: (copy of the) checksums of the files to be written
        """
        return _copy.copy(self._checksums)

    def account_uid(self):
        """Return the UID of the account from which payment should be
           taken for the file storage

            Returns:
                str or None: returns the UID of the account if set else None
        """
        return getattr(self, "_account_uid", None)

    @staticmethod
    def expand_source_and_destination(source, destination,
                                      root=None, ignore_hidden=True):
        """This function expands the passed source and destination
           objects into the list of absolute paths of local source files,
           and the full keys of those files once they are uploaded to
           the object store. This returns a pair of lists - the lists
           match the absolute path of the local file to the desired
           full key of the file in the object store

            Args:
                source (str or list): source path(s) / glob pattern(s)
                destination (str): destination key or directory
                root (str, optional, default=None): root key for objects
                ignore_hidden (bool, optional, default=True): ignore hidden
                files

            Returns:
                tuple (list, list): source filenames and destination keys
        """
        return _expand_source_destination(source, destination,
                                          root, ignore_hidden)

    def accounting_service_url(self):
        """Return the canonical URL of the service holding the account

            Returns:
                str or None: the accounting service URL if set, else None
        """
        return getattr(self, "_accounting_service_url", None)

    def to_data(self):
        """Return this request as a json-serialisable dictionary

            Returns:
                dict: a JSON-serialisable dictionary of this request
                (empty if this is a null request)
        """
        if self.is_null():
            return {}

        data = super().to_data()
        data["uid"] = self._uid
        data["destination_keys"] = self._destination_keys
        data["file_sizes"] = self._file_sizes
        data["checksums"] = self._checksums
        data["authorisation"] = self._authorisation.to_data()
        data["account_uid"] = self._account_uid
        data["accounting_service_url"] = self._accounting_service_url

        # don't write self._source_filenames as this contains
        # potentially sensitive local data that is not needed
        # by the cloud object store

        return data

    @staticmethod
    def from_data(data):
        """Create a request from the dictionary produced by to_data

            Args:
                data (dict) : dictionary of the form returned by to_data
            Returns:
                FileWriteRequest: a FileWriteRequest object created from
                the data, or None if 'data' is empty
        """
        if (data and len(data) > 0):
            from Acquire.Identity import Authorisation as _Authorisation
            f = FileWriteRequest()
            f._uid = data["uid"]
            f._destination_keys = data["destination_keys"]
            f._file_sizes = data["file_sizes"]
            f._checksums = data["checksums"]
            f._authorisation = _Authorisation.from_data(data["authorisation"])
            f._account_uid = data["account_uid"]
            f._accounting_service_url = data["accounting_service_url"]

            # let the base class restore its own part of the data
            f._from_data(data)

            return f
        else:
            return None