Source code for Acquire.Storage._driveinfo


__all__ = ["DriveInfo"]

_drive_root = "storage/drive"

_fileinfo_root = "storage/file"


def _validate_file_upload(par, file_bucket, file_key, objsize, checksum):
    """Call this function to signify that the file associated with
       this PAR has been uploaded. This will check that the
       objsize and checksum match with what was promised
    """
    from Acquire.ObjectStore import ObjectStore as _ObjectStore
    from Acquire.Service import get_service_account_bucket \
        as _get_service_account_bucket
    from Acquire.Service import get_this_service as _get_this_service

    service = _get_this_service()
    bucket = _get_service_account_bucket()

    file_bucket = _ObjectStore.get_bucket(
                        bucket=bucket, bucket_name=file_bucket,
                        compartment=service.storage_compartment(),
                        create_if_needed=True)

    # check that the file uploaded matches what was promised
    (real_objsize, real_checksum) = _ObjectStore.get_size_and_checksum(
                                                file_bucket, file_key)

    if real_objsize != objsize or real_checksum != checksum:
        # probably should delete the broken object here...

        from Acquire.Storage import FileValidationError
        raise FileValidationError(
            "The file uploaded does not match what was promised. "
            "size: %s versus %s, checksum: %s versus %s. Please try "
            "to upload the file again." %
            (real_objsize, objsize,
             real_checksum, checksum))

    # SHOULD HERE RECEIPT THE STORAGE TRANSACTION


def _validate_bulk_upload(par, bucket_uid, identifiers,
                          drive_uid, aclrules, max_size):
    """Call this internal function to signify that the bulk upload
       is complete. The files have been uploaded to the bucket with
       name 'bucket_uid', and were uploaded by the user with passed
       user_guid. The files should be placed into the drive with
       specified drive_uid, using the passed ACLRules, and they
       should not have a total size greater than 'max_size'
    """
    from Acquire.ObjectStore import ObjectStore as _ObjectStore
    from Acquire.Service import get_service_account_bucket \
        as _get_service_account_bucket
    from Acquire.Service import get_this_service as _get_this_service
    from Acquire.Storage import DriveInfo as _DriveInfo

    service = _get_this_service()
    bucket = _get_service_account_bucket()

    drive = _DriveInfo(drive_uid=drive_uid)

    tmpbucket = _ObjectStore.get_bucket(
                                bucket=bucket,
                                bucket_name=bucket_uid,
                                compartment=service.storage_compartment(),
                                create_if_needed=False)

    try:
        total_size = drive.copy_from(bucket=tmpbucket, aclrules=aclrules)
    except Exception as e:
        # delete the bucket and force the user to upload again...
        _ObjectStore.delete_bucket(bucket=tmpbucket, force=True)
        raise e

    if total_size > max_size:
        # should be cross with the user - give them time to make up
        # the difference in cost, or else we will delete this data
        pass

    # SHOULD NOW RECEIPT THE STORAGE TRANSACTION

    # the tmpbucket should now be empty, and all files transferred
    _ObjectStore.delete_bucket(bucket=tmpbucket, force=True)


[docs]class DriveInfo: """This class provides a service-side handle to the information about a particular cloud drive """ def __init__(self, drive_uid=None, identifiers=None, is_authorised=False, parent_drive_uid=None): """Construct a DriveInfo for the drive with UID 'drive_uid', and optionally the GUID of the user making the request (and whether this was authorised). If this drive has a parent then it is a sub-drive and not recorded in the list of top-level drives """ self._drive_uid = drive_uid self._parent_drive_uid = parent_drive_uid self._identifiers = identifiers self._is_authorised = is_authorised if self._drive_uid is not None: self.load() def __str__(self): if self.is_null(): return "Drive::null" else: return "Drive(%s)" % self._drive_uid
[docs] def is_null(self): return self._drive_uid is None
def _drive_key(self): """Return the key for this drive in the object store""" return "%s/%s/info" % (_drive_root, self._drive_uid)
[docs] def uid(self): """Return the UID of this drive""" return self._drive_uid
def _get_metadata_bucket(self): """Return the bucket that contains all of the metadata about the files for this drive """ from Acquire.ObjectStore import ObjectStore as _ObjectStore from Acquire.Service import get_service_account_bucket \ as _get_service_account_bucket from Acquire.Service import get_this_service as _get_this_service service = _get_this_service() bucket = _get_service_account_bucket() bucket_name = "user_metadata" try: return _ObjectStore.get_bucket( bucket=bucket, bucket_name=bucket_name, compartment=service.storage_compartment(), create_if_needed=True) except Exception as e: from Acquire.ObjectStore import RequestBucketError raise RequestBucketError( "Unable to open the bucket '%s': %s" % (bucket_name, str(e))) def _get_file_bucketname(self): """Return the name of the bucket that will contain all of the files for this drive """ return "user_files" def _get_file_bucket(self): """Return the bucket that contains all of the files for this drive """ from Acquire.ObjectStore import ObjectStore as _ObjectStore from Acquire.Service import get_service_account_bucket \ as _get_service_account_bucket from Acquire.Service import get_this_service as _get_this_service service = _get_this_service() bucket = _get_service_account_bucket() bucket_name = self._get_file_bucketname() try: return _ObjectStore.get_bucket( bucket=bucket, bucket_name=bucket_name, compartment=service.storage_compartment(), create_if_needed=True) except Exception as e: from Acquire.ObjectStore import RequestBucketError raise RequestBucketError( "Unable to open the bucket '%s': %s" % (bucket_name, str(e))) def _resolve_acl(self, authorisation=None, resource=None, accept_partial_match=False): """Internal function used to authorise access to this drive, returning the ACLRule of the access """ if authorisation is not None: from Acquire.Client import Authorisation as _Authorisation if not isinstance(authorisation, _Authorisation): raise TypeError( "The authorisation must be of type Authorisation") identifiers = authorisation.verify( resource=resource, accept_partial_match=accept_partial_match, return_identifiers=True) else: try: identifiers = self._identifiers except: identifiers = None try: upstream = self._upstream except: upstream = None drive_acl = self.aclrules().resolve(identifiers=identifiers, upstream=upstream, must_resolve=True, unresolved=False) return (drive_acl, identifiers)
[docs] def bulk_upload(self, authorisation, encrypt_key, max_size=None, aclrules=None): """Start the process of a bulk upload of a set of files to this Drive. This will return a Bucket-Write PAR to a temporary bucket to which the files can be uploaded. Once the PAR is closed the files will be copied to the Drive and the temporary bucket deleted. You need to provide authorisation for this action, a public key to encrypt the PAR, and optionally specify the maximum size of the data to be uploaded and the ACLs. If the maximum size is not set then you will be limited to a maximum of 100MB upload. If the ACLs are not set, then they will inherit from the Drive (or from previous versions of the file if they exist) """ from Acquire.Crypto import PublicKey as _PublicKey from Acquire.ObjectStore import ObjectStore as _ObjectStore from Acquire.ObjectStore import string_to_encoded \ as _string_to_encoded from Acquire.Storage import ACLRules as _ACLRules if not isinstance(encrypt_key, _PublicKey): raise TypeError("The encryption key must be of type PublicKey") if aclrules is not None: if not isinstance(aclrules, _ACLRules): raise TypeError("The ACLRules must be of type ACLRules") else: aclrules = _ACLRules.inherit() if max_size is None: max_size = 100*1024*1024 # default 100 MB try: max_size = int(max_size) except: raise TypeError("max_size must be an interger!") (drive_acl, identifiers) = self._resolve_acl( authorisation=authorisation, resource="bulk_upload %s %s" % (self.uid(), max_size)) if not drive_acl.is_writeable(): raise PermissionError( "You do not have permission to write to this drive. " "Your permissions are %s" % str(drive_acl)) from Acquire.ObjectStore import create_uuid as _create_uuid from Acquire.ObjectStore import Function as _Function from Acquire.Service import get_service_account_bucket \ as _get_service_account_bucket from Acquire.Service import get_this_service as _get_this_service # construct a bucket for this bulk upload, given a unique name bucket_uid = _create_uuid() func = _Function(function=_validate_bulk_upload, bucket_uid=bucket_uid, identifiers=identifiers, drive_uid=self.uid(), aclrules=aclrules.to_data(), max_size=max_size) service = _get_this_service() bucket = _get_service_account_bucket() tmpbucket = _ObjectStore.create_bucket( bucket=bucket, bucket_name=bucket_uid, compartment=service.storage_compartment()) try: par = _ObjectStore.create_par(bucket=tmpbucket, encrypt_key=encrypt_key, key=None, readable=False, writeable=True, cleanup_function=func) except: _ObjectStore.delete_bucket(bucket=tmpbucket, force=True) raise return par
[docs] def upload(self, filehandle, authorisation, encrypt_key=None): """Upload the file associated with the passed filehandle. If the filehandle has the data embedded, then this uploads the file data directly and returns a FileMeta for the result. Otherwise, this returns a PAR which should be used to upload the data. The PAR will be encrypted using 'encrypt_key'. Remember to close the PAR once the file has been uploaded, so that it can be validated as correct """ from Acquire.Client import FileHandle as _FileHandle from Acquire.Storage import FileInfo as _FileInfo from Acquire.Crypto import PublicKey as _PublicKey from Acquire.ObjectStore import ObjectStore as _ObjectStore from Acquire.ObjectStore import string_to_encoded \ as _string_to_encoded if not isinstance(filehandle, _FileHandle): raise TypeError("The fileinfo must be of type FileInfo") if encrypt_key is not None: if not isinstance(encrypt_key, _PublicKey): raise TypeError("The encryption key must be of type PublicKey") (drive_acl, identifiers) = self._resolve_acl( authorisation=authorisation, resource="upload %s" % filehandle.fingerprint()) if not drive_acl.is_writeable(): raise PermissionError( "You do not have permission to write to this drive. " "Your permissions are %s" % str(drive_acl)) # now generate a FileInfo for this FileHandle fileinfo = _FileInfo(drive_uid=self._drive_uid, filehandle=filehandle, identifiers=identifiers, upstream=drive_acl) # resolve the ACL for the file from this FileHandle filemeta = fileinfo.get_filemeta() file_acl = filemeta.acl() if not file_acl.is_writeable(): raise PermissionError( "Despite having write permission to the drive, you " "do not have write permission for the file. Your file " "permissions are %s" % str(file_acl)) file_bucket = self._get_file_bucket() file_key = fileinfo.latest_version()._file_key() filedata = None if filehandle.is_localdata(): # the filehandle already contains the file, so save it # directly filedata = filehandle.local_filedata() _ObjectStore.set_object(bucket=file_bucket, key=file_key, data=filedata) if filedata is None: # the file is too large to include in the filehandle so # we need to use a PAR to upload from Acquire.ObjectStore import Function as _Function f = _Function(function=_validate_file_upload, file_bucket=self._get_file_bucketname(), file_key=file_key, objsize=fileinfo.filesize(), checksum=fileinfo.checksum()) par = _ObjectStore.create_par(bucket=file_bucket, encrypt_key=encrypt_key, key=file_key, readable=False, writeable=True, cleanup_function=f) else: par = None # now save the fileinfo to the object store fileinfo.save() filemeta = fileinfo.get_filemeta() assert(filemeta.acl().is_owner()) # return the PAR if we need to have a second-stage of upload return (filemeta, par)
[docs] def download(self, filename, authorisation, version=None, encrypt_key=None, force_par=False): """Download the file called filename. This will return a FileHandle that describes the file. If the file is sufficiently small, then the filedata will be embedded into this handle. Otherwise a PAR will be generated and also returned to allow the file to be downloaded separately. The PAR will be encrypted with 'encrypt_key'. Remember to close the PAR once you have finished downloading the file... """ from Acquire.Client import FileHandle as _FileHandle from Acquire.Storage import FileInfo as _FileInfo from Acquire.Crypto import PublicKey as _PublicKey from Acquire.ObjectStore import ObjectStore as _ObjectStore from Acquire.ObjectStore import string_to_encoded \ as _string_to_encoded if not isinstance(encrypt_key, _PublicKey): raise TypeError("The encryption key must be of type PublicKey") (drive_acl, identifiers) = self._resolve_acl( authorisation=authorisation, resource="download %s %s" % (self._drive_uid, filename)) # even if the drive_acl is not readable by this user, they # may have read permission for the file... # now get the FileInfo for this FileHandle fileinfo = _FileInfo.load(drive=self, filename=filename, version=version, identifiers=identifiers, upstream=drive_acl) # resolve the ACL for the file from this FileHandle filemeta = fileinfo.get_filemeta() file_acl = filemeta.acl() if not file_acl.is_readable(): raise PermissionError( "You do not have read permissions for the file. Your file " "permissions are %s" % str(file_acl)) file_bucket = self._get_file_bucket() file_key = fileinfo.latest_version()._file_key() filedata = None par = None if force_par or fileinfo.filesize() > 1048576: # the file is too large to include in the download so # we need to use a PAR to download par = _ObjectStore.create_par(bucket=file_bucket, encrypt_key=encrypt_key, key=file_key, readable=True, writeable=False) else: # one-trip download of files that are less than 1 MB filedata = _ObjectStore.get_object(file_bucket, file_key) # return the filemeta, and either the filedata or par return (filemeta, filedata, par)
[docs] def is_opened_by_owner(self): """Return whether or not this drive was opened and authorised by one of the drive owners """ try: identifiers = self._identifiers except: identifiers = None if identifiers is None or (not self._is_authorised): return False try: upstream = self._upstream except: upstream = None return self.aclrules().resolve(identifiers=identifiers, upstream=upstream).is_owner()
[docs] def aclrules(self): """Return the acl rules for this drive""" try: return self._aclrules except: from Acquire.Storage import ACLRules as _ACLRules return _ACLRules()
[docs] def set_permission(self, user_guid, aclrule): """Set the permission for the user with the passed user_guid to 'aclrule". Note that you can only do this if you are the owner and this drive was opened in an authorised way. Also note that you cannot remove your own ownership permission if this would leave the drive without any owners """ if self.is_null(): return from Acquire.Identity import ACLRules as _ACLRules aclrules = _ACLRules.create(user_guid=user_guid, rule=aclrule) # make sure we have the latest version self.load() if not self.is_opened_by_owner(): raise PermissionError( "You cannot change user permissions as you are either " "not the owner of this drive or you failed to provide " "authorisation when you opened the drive") # this will append the new rules, ensuring that the change # does not leave the drive ownerless self._aclrules.append(aclrules, ensure_owner=True) self.save() self.load()
[docs] def list_files(self, authorisation=None, include_metadata=False): """Return the list of FileMeta data for the files contained in this Drive. The passed authorisation is needed in case the list contents of this drive is not public """ (drive_acl, identifiers) = self._resolve_acl( authorisation=authorisation, resource="list_files") if not drive_acl.is_readable(): raise PermissionError( "You don't have permission to read this Drive") from Acquire.ObjectStore import ObjectStore as _ObjectStore from Acquire.ObjectStore import encoded_to_string as _encoded_to_string from Acquire.Storage import FileMeta as _FileMeta metadata_bucket = self._get_metadata_bucket() names = _ObjectStore.get_all_object_names( metadata_bucket, "%s/%s" % (_fileinfo_root, self._drive_uid)) files = [] if include_metadata: # we need to load all of the metadata info for this file to # return to the user from Acquire.Storage import FileInfo as _FileInfo for name in names: data = _ObjectStore.get_object_from_json(metadata_bucket, name) fileinfo = _FileInfo.from_data(data, identifiers=identifiers, upstream=drive_acl) filemeta = fileinfo.get_filemeta() file_acl = filemeta.acl() if file_acl.is_readable() or file_acl.is_writeable(): files.append(filemeta) else: for name in names: filename = _encoded_to_string(name.split("/")[-1]) files.append(_FileMeta(filename=filename)) return files
[docs] def list_versions(self, filename, authorisation=None, include_metadata=False): """Return the list of versions of the file with specified filename. If 'include_metadata' is true then this will load full metadata for each version. This will return a sorted list of FileMeta objects. The passed authorisation is needed in case the version info is not public """ (drive_acl, identifiers) = self._resolve_acl( authorisation=authorisation, resource="list_versions %s" % filename) if not drive_acl.is_readable(): raise PermissionError( "You don't have permission to read this Drive") from Acquire.Storage import FileInfo as _FileInfo versions = _FileInfo.list_versions(drive=self, filename=filename, identifiers=identifiers, upstream=drive_acl, include_metadata=include_metadata) result = [] for version in versions: aclrules = version.aclrules() if aclrules is not None: acl = aclrules.resolve(upstream=drive_acl, identifiers=identifiers, must_resolve=True, unresolved=False) if acl.is_readable() or acl.is_writeable(): result.append(version) else: result.append(version) # return the versions sorted in upload order result.sort(key=lambda x: x.uploaded_when()) return result
[docs] def load(self): """Load the metadata about this drive from the object store""" if self.is_null(): return from Acquire.Service import get_service_account_bucket \ as _get_service_account_bucket from Acquire.ObjectStore import ObjectStore as _ObjectStore bucket = _get_service_account_bucket() drive_key = self._drive_key() try: data = _ObjectStore.get_object_from_json(bucket, drive_key) except: data = None if data is None: # by default this user is the drive's owner try: user_guid = self._identifiers["user_guid"] except: user_guid = None if user_guid is None: # we cannot create the drive as we don't know who # requested it raise PermissionError( "Cannot create the DriveInfo for a new drive as the " "original request did not specify the creating user") elif not self._is_authorised: # we cannot create the drive if the request was not # authorised raise PermissionError( "Cannot create the DriveInfo for a new drive as the " "original request was not authorised by the user") # create a new drive and save it... from Acquire.Identity import ACLRule as _ACLRule from Acquire.Identity import ACLRules as _ACLRules self._aclrules = _ACLRules.owner(user_guid=user_guid) data = self.to_data() data = _ObjectStore.set_ins_object_from_json(bucket, drive_key, data) from copy import copy as _copy other = DriveInfo.from_data(data) identifiers = self._identifiers is_authorised = self._is_authorised self.__dict__ = _copy(other.__dict__) self._identifiers = identifiers self._is_authorised = is_authorised
[docs] def save(self): """Save the metadata about this drive to the object store""" if self.is_null(): return from Acquire.Service import get_service_account_bucket \ as _get_service_account_bucket from Acquire.ObjectStore import ObjectStore as _ObjectStore bucket = _get_service_account_bucket() drive_key = self._drive_key() data = self.to_data() _ObjectStore.set_object_from_json(bucket, drive_key, data)
[docs] def to_data(self): """Return a json-serialisable dictionary for this object""" data = {} if not self.is_null(): from Acquire.ObjectStore import dict_to_string as _dict_to_string data["uid"] = self._drive_uid if self._aclrules is not None: data["aclrules"] = self._aclrules.to_data() return data
[docs] @staticmethod def from_data(data): """Return a DriveInfo constructed from the passed json-deserialised dictionary """ info = DriveInfo() if data is None or len(data) == 0: return info info._drive_uid = str(data["uid"]) if "aclrules" in data: from Acquire.Storage import ACLRules as _ACLRules info._aclrules = _ACLRules.from_data(data["aclrules"]) return info