Source code for Acquire.Client._drive


__all__ = ["Drive"]


def _get_storage_url():
    """Function to discover and return the default storage url"""
    return "http://fn.acquire-aaai.com:8080/t/storage"


def _get_storage_service(storage_url=None):
    """Function to return the storage service for the system"""
    if storage_url is None:
        storage_url = _get_storage_url()

    from Acquire.Client import Service as _Service
    service = _Service(storage_url)

    if not service.is_storage_service():
        from Acquire.Client import LoginError
        raise LoginError(
            "You can only use a valid storage service to get CloudDrive info! "
            "The service at '%s' is a '%s'" %
            (storage_url, service.service_type()))

    if service.service_url() != storage_url:
        service.update_service_url(storage_url)

    return service


def _create_drive(user, name, drivemeta, storage_service):
    """Internal function used to create a Drive"""
    drive = Drive()
    drive._name = drivemeta.name()
    drive._drive_uid = drivemeta.uid()
    drive._container = drivemeta.container_uids()
    drive._acl = drivemeta.acl()
    drive._aclrules = drivemeta.aclrules()
    drive._user = user
    drive._storage_service = storage_service
    return drive


def _get_drive(user, name=None, storage_service=None, storage_url=None,
               autocreate=True):
    """Return the drive called 'name' of the passed user. Note that the
       user must be authenticated to call this function. The name
       will default to 'main' if it is not set, and the drive will
       be created automatically is 'autocreate' is True and the
       drive does not exist
    """
    if storage_service is None:
        storage_service = _get_storage_service(storage_url)
    else:
        if not storage_service.is_storage_service():
            raise TypeError("You can only query drives using "
                            "a valid storage service")

    if name is None:
        name = "main"
    else:
        name = str(name)

    if autocreate:
        autocreate = True
    else:
        autocreate = False

    from Acquire.Client import Authorisation as _Authorisation
    authorisation = _Authorisation(resource="UserDrives", user=user)

    args = {"authorisation": authorisation.to_data(),
            "name": name, "autocreate": autocreate}

    response = storage_service.call_function(function="open_drive", args=args)

    from Acquire.Client import DriveMeta as _DriveMeta

    return _create_drive(user=user, name=name, storage_service=storage_service,
                         drivemeta=_DriveMeta.from_data(response["drive"]))


[docs]class Drive: """This class provides a handle to a user's drive (space to hold files and folders). A drive is associated with a single storage service and can be shared amongst several users. Each drive has a unique UID, with users assiging their own shorthand names. """ def __init__(self, user=None, name=None, storage_service=None, storage_url=None, autocreate=True): """Construct a handle to the drive that the passed user calls 'name' on the passed storage service. If 'autocreate' is True and the user is logged in then this will automatically create the drive if it doesn't exist already """ if user is not None: drive = _get_drive(user=user, name=name, storage_service=storage_service, storage_url=storage_url, autocreate=autocreate) from copy import copy as _copy self.__dict__ = _copy(drive.__dict__) else: self._drive_uid = None def __str__(self): if self.is_null(): return "Drive::null" else: return "Drive(user='%s', name='%s')" % \ (self._user.username(), self.name())
[docs] def is_null(self): """Return whether or not this drive is null""" return self._drive_uid is None
[docs] def acl(self): """Return the access control list for the user on this drive. If the ACL is not known, then None is returned """ try: return self._acl except: return None
[docs] def aclrules(self): """Return the ACL rules used to grant access to this drive. This is only visible to owners of this drive. If it is not visible, then None is returned """ try: return self._aclrules except: return None
[docs] def name(self): """Return the name given to this drive by the user""" return self._name
[docs] def uid(self): """Return the UID of this drive""" return self._drive_uid
[docs] def guid(self): """Return the global UID of this drive (combination of the UID of the storage service and UID of the drive) """ if self.is_null(): return None else: return "%s@%s" % (self.storage_service().uid(), self.uid())
[docs] def storage_service(self): """Return the storage service for this drive""" if self.is_null(): return None else: return self._storage_service
[docs] def upload(self, filename, uploaded_name=None, aclrules=None): """Upload the file at 'filename' to this drive, assuming we have write access to this drive. The local file 'filename' will be uploaded to the drive as the file called 'filename' (just the filename - not the full path). If you want to specify the uploaded name then set this as "uploaded_name" (which again will just be a filename - no paths). If a file with this name exists, then this will upload a new version (assuming you have permission). Otherwise this will create a new file. You can set the ACL rules used to grant access to this file via 'aclrule'. If this is not set, then the rules will be derived from either the last version of the file, or inherited from the drive. """ if self.is_null(): raise PermissionError("Cannot upload a file to a null drive!") if uploaded_name is None: uploaded_name = filename from Acquire.Client import Authorisation as _Authorisation from Acquire.Client import FileHandle as _FileHandle from Acquire.Client import PAR as _PAR from Acquire.Client import FileMeta as _FileMeta filehandle = _FileHandle(filename=filename, remote_filename=uploaded_name, drive_uid=self.uid(), user_guid=self._user.guid(), aclrules=aclrules) authorisation = _Authorisation( resource="upload %s" % filehandle.fingerprint(), user=self._user) args = {"filehandle": filehandle.to_data(), "authorisation": authorisation.to_data()} if not filehandle.is_localdata(): # we will need to upload against a PAR, so need to tell # the service how to encrypt the PAR... privkey = self._user.session_key() args["encryption_key"] = privkey.public_key().to_data() # will eventually need to authorise payment... response = self.storage_service().call_function( function="upload", args=args) filemeta = _FileMeta.from_data(response["filemeta"]) # if this was a large file, then we will receive a PAR back # which must be used to upload the file if not filehandle.is_localdata(): par = _PAR.from_data(response["upload_par"]) par.write(privkey).set_object_from_file( filehandle.local_filename()) authorisation = _Authorisation( resource="uploaded %s" % par.uid(), user=self._user) args = {"drive_uid": self.uid(), "authorisation": authorisation.to_data(), "par_uid": par.uid()} self.storage_service().call_function( function="uploaded", args=args) return filemeta
[docs] def download(self, filename, downloaded_name=None, version=None, dir=None): """Download the file called 'filename' from this drive into the local directory, or 'dir' if specified, ideally called 'filename' (or 'downloaded_name' if that is specified). If a local file exists with this name, then a new, unique filename will be used. This returns a dictionary mapping the downloaded filename to the FileMeta of the file Note that this only downloads files for which you have read-access. If the file is not readable then an exception is raised and nothing is returned If 'version' is specified then download a specific version of the file. Otherwise download the latest version """ if self.is_null(): raise PermissionError("Cannot download from a null drive!") if downloaded_name is None: downloaded_name = filename from Acquire.Client import create_new_file as \ _create_new_file downloaded_name = _create_new_file(filename=downloaded_name, dir=dir) from Acquire.Client import Authorisation as _Authorisation from Acquire.Client import FileMeta as _FileMeta authorisation = _Authorisation( resource="download %s %s" % (self.uid(), filename), user=self._user) privkey = self._user.session_key() args = {"drive_uid": self.uid(), "filename": filename, "authorisation": authorisation.to_data(), "encryption_key": privkey.public_key().to_data()} if version is not None: from Acquire.ObjectStore import datetime_to_string \ as _datetime_to_string args["version"] = _datetime_to_string(version) response = self.storage_service().call_function( function="download", args=args) filemeta = _FileMeta.from_data(response["filemeta"]) if "filedata" in response: # we have already downloaded the file to 'filedata' filedata = response["filedata"] from Acquire.ObjectStore import string_to_bytes \ as _string_to_bytes filedata = _string_to_bytes(response["filedata"]) del response["filedata"] # validate that the size and checksum are correct filemeta.assert_correct_data(filedata) if filemeta.is_compressed(): # uncompress the data from Acquire.Client import uncompress as _uncompress filedata = _uncompress( inputdata=filedata, compression_type=filemeta.compression_type()) # write the data to the specified local file... with open(downloaded_name, "wb") as FILE: FILE.write(filedata) FILE.flush() else: from Acquire.Client import PAR as _PAR par = _PAR.from_data(response["upload_par"]) par.read(privkey).get_object_as_file(downloaded_name) # validate that the size and checksum are correct filemeta.assert_correct_data(filename=downloaded_name) # uncompress the file if desired if filemeta.is_compressed(): from Acquire.Client import uncompress as _uncompress _uncompress(inputfile=downloaded_name, outputfile=downloaded_name, compression_type=filemeta.compression_type()) # everything is ok, so we don't need the PAR any more authorisation = _Authorisation( resource="downloaded %s" % par.uid(), user=self._user) args = {"drive_uid": self.uid(), "authorisation": authorisation.to_data(), "par_uid": par.uid()} self.storage_service().call_function( function="downloaded", args=args) filemeta._set_drive(self) return (downloaded_name, filemeta)
@staticmethod def _list_drives(user, drive_uid=None, storage_service=None, storage_url=None): """Return a list of all of the DriveMetas of the drives accessible at the top-level by the passed user on the passed storage service """ if storage_service is None: storage_service = _get_storage_service(storage_url) else: if not storage_service.is_storage_service(): raise TypeError("You can only query drives using " "a valid storage service") from Acquire.Client import Authorisation as _Authorisation authorisation = _Authorisation(resource="UserDrives", user=user) args = {"authorisation": authorisation.to_data()} if drive_uid is not None: args["drive_uid"] = str(drive_uid) response = storage_service.call_function( function="list_drives", args=args) from Acquire.ObjectStore import string_to_list as _string_to_list from Acquire.Client import DriveMeta as _DriveMeta return _string_to_list(response["drives"], _DriveMeta)
[docs] @staticmethod def list_toplevel_drives(user, storage_service=None, storage_url=None): """Return a list of all of the DriveMetas of the drives accessible at the top-level by the passed user on the passed storage service """ return Drive._list_drives(user=user, storage_service=storage_service, storage_url=storage_url)
[docs] def list_drives(self): """Return a list of the DriveMetas of all of the drives contained in this drive that are accessible to the user """ if self.is_null(): return [] else: return Drive._list_drives(user=self._user, drive_uid=self._drive_uid, storage_service=self._storage_service)
[docs] def list_files(self, include_metadata=False): """Return a list of the FileMetas of all of the files contained in this drive """ if self.is_null(): return [] from Acquire.Client import Authorisation as _Authorisation from Acquire.ObjectStore import string_to_list as _string_to_list from Acquire.Storage import FileMeta as _FileMeta if include_metadata: include_metadata = True else: include_metadata = False authorisation = _Authorisation(resource="list_files", user=self._user) args = {"authorisation": authorisation.to_data(), "drive_uid": self._drive_uid, "include_metadata": include_metadata} response = self.storage_service().call_function(function="list_files", args=args) files = _string_to_list(response["files"], _FileMeta) for f in files: f._set_drive(self) return files
[docs] def list_versions(self, filename, include_metadata=False): """Return a list of all of the versions of the specified file. This returns an empty list if there are no versions of this file """ if self.is_null(): return [] if include_metadata: include_metadata = True else: include_metadata = False from Acquire.Client import Authorisation as _Authorisation authorisation = _Authorisation(resource="list_versions %s" % filename, user=self._user) args = {"authorisation": authorisation.to_data(), "drive_uid": self._drive_uid, "include_metadata": include_metadata, "filename": filename} response = self.storage_service().call_function( function="list_versions", args=args) from Acquire.ObjectStore import string_to_list \ as _string_to_list from Acquire.Storage import FileMeta as _FileMeta versions = _string_to_list(response["versions"], _FileMeta) for version in versions: version._set_drive(self) return versions