__all__ = ["Drive"]
def _get_storage_url():
"""Function to discover and return the default storage url"""
return "http://fn.acquire-aaai.com:8080/t/storage"
def _get_storage_service(storage_url=None):
    """Build and validate the Service handle for a storage service.

    Args:
        storage_url (str, default=None): URL of the storage service.
            When None, the value of _get_storage_url() is used
    Returns:
        Service: Service object for which is_storage_service() is true
    Raises:
        LoginError: If the service at the URL does not report itself
            as a storage service
    """
    url = _get_storage_url() if storage_url is None else storage_url

    from Acquire.Client import Service as _Service

    storage = _Service(url, service_type="storage")

    if storage.is_storage_service():
        return storage

    # imported lazily as it is only needed on the failure path
    from Acquire.Client import LoginError

    message = ("You can only use a valid storage service to get CloudDrive info! "
               "The service at '%s' is a '%s'" % (url, storage.service_type()))
    raise LoginError(message)
def _create_drive(user, name, drivemeta, storage_service):
    """Assemble a Drive handle from metadata returned by the service.

    A null Drive is constructed and its private attributes are then
    populated from 'drivemeta', 'user' and 'storage_service'.

    Args:
        user (User): User who owns the handle
        name (str): Name requested for the drive. NOTE: this value is
            not used here - the stored name is taken from
            drivemeta.name()
        drivemeta (DriveMeta): Metadata describing the drive
        storage_service (Service): Storage service holding the drive
    Returns:
        Drive: Populated Drive object
    """
    handle = Drive()

    # the right-hand sides are evaluated in the order listed
    populated = (
        ("_name", drivemeta.name()),
        ("_drive_uid", drivemeta.uid()),
        ("_container", drivemeta.container_uids()),
        ("_acl", drivemeta.acl()),
        ("_aclrules", drivemeta.aclrules()),
        ("_user", user),
        ("_storage_service", storage_service),
    )

    for attribute, value in populated:
        setattr(handle, attribute, value)

    return handle
def _get_drive(user, name=None, storage_service=None, storage_url=None,
               autocreate=True):
    """Open the drive that 'user' calls 'name' and return a handle to it.

    The user must be authenticated to call this function. This calls
    the "open_drive" function on the storage service, passing
    'autocreate' so that the service can create the drive if it does
    not exist.

    Args:
        user (User): User opening the drive
        name (str, default=None): Name of the drive. Defaults to
            "main"; any other value is converted with str()
        storage_service (Service, default=None): Storage service to
            query. If None, one is obtained from 'storage_url'
        storage_url (str, default=None): URL of the storage service,
            only used when 'storage_service' is None
        autocreate (bool): If true, ask the service to create the
            drive automatically if it does not exist
    Returns:
        Drive: Drive object
    Raises:
        TypeError: If the supplied 'storage_service' is not a
            storage service
    """
    if storage_service is None:
        storage_service = _get_storage_service(storage_url)
    elif not storage_service.is_storage_service():
        raise TypeError("You can only query drives using "
                        "a valid storage service")

    drive_name = "main" if name is None else str(name)

    # send a genuine bool regardless of the truthy value that was passed
    create_if_missing = bool(autocreate)

    from Acquire.Client import Authorisation as _Authorisation

    auth = _Authorisation(resource="UserDrives", user=user)

    request = {"authorisation": auth.to_data(),
               "name": drive_name,
               "autocreate": create_if_missing}

    reply = storage_service.call_function(function="open_drive", args=request)

    from Acquire.Client import DriveMeta as _DriveMeta

    meta = _DriveMeta.from_data(reply["drive"])

    return _create_drive(user=user, name=drive_name,
                         storage_service=storage_service, drivemeta=meta)
class Drive:
    """This class provides a handle to a user's drive (space
       to hold files and folders). A drive is associated with
       a single storage service and can be shared amongst several
       users. Each drive has a unique UID, with users assigning
       their own shorthand names.
    """
    def __init__(self, user=None, name=None, storage_service=None,
                 storage_url=None, autocreate=True):
        """Construct a handle to the drive that the passed user
           calls 'name' on the passed storage service. If
           'autocreate' is True and the user is logged in then
           this will automatically create the drive if
           it doesn't exist already.

           If no user is passed then a null drive is constructed

           Args:
                user (User, default=None): User who owns the handle
                name (str, default=None): Name of the drive
                storage_service (Service, default=None): Storage
                    service holding the drive
                storage_url (str, default=None): URL of the storage
                    service, used if 'storage_service' is None
                autocreate (bool, default=True): Whether to create the
                    drive if it does not exist
        """
        if user is not None:
            drive = _get_drive(user=user, name=name,
                               storage_service=storage_service,
                               storage_url=storage_url, autocreate=autocreate)

            # adopt the state of the drive that was opened
            from copy import copy as _copy
            self.__dict__ = _copy(drive.__dict__)
        else:
            # a null drive is identified by having no UID
            self._drive_uid = None

    def __str__(self):
        if self.is_null():
            return "Drive::null"
        else:
            return "Drive(user='%s', name='%s')" % \
                (self._user.username(), self.name())

    def is_null(self):
        """Return whether or not this drive is null

           Returns:
                bool: True if null, else False
        """
        return self._drive_uid is None

    def acl(self):
        """Return the access control list for the user on this drive.
           If the ACL is not known, then None is returned

           Returns:
                Access control list for the user, or None
        """
        # the attribute is absent on a null drive
        return getattr(self, "_acl", None)

    def aclrules(self):
        """Return the ACL rules used to grant access to this drive. This
           is only visible to owners of this drive. If it is not visible,
           then None is returned

           Returns:
                ACL rules for the drive, or None
        """
        # the attribute is absent on a null drive
        return getattr(self, "_aclrules", None)

    def name(self):
        """Return the name given to this drive by the user

           Returns:
                str: Name of drive, or None if this drive is null
        """
        # a null drive has no name; return None (as uid() and guid() do)
        # rather than raising AttributeError
        return getattr(self, "_name", None)

    def uid(self):
        """Return the UID of this drive

           Returns:
                str: UID of drive, or None if this drive is null
        """
        return self._drive_uid

    def guid(self):
        """Return the global UID of this drive (combination of the
           UID of the storage service and UID of the drive)

           Returns:
                str: Global UID for drive in the form
                "service_uid@drive_uid", or None if this drive is null
        """
        if self.is_null():
            return None
        else:
            return "%s@%s" % (self.storage_service().uid(), self.uid())

    def storage_service(self):
        """Return the storage service for this drive

           Returns:
                Service: Storage service for drive, or None if this
                drive is null
        """
        if self.is_null():
            return None
        else:
            return self._storage_service

    def bulk_upload(self, max_size=None, aclrules=None):
        """Start the bulk upload of a large number of files to this
           drive, assuming we have write access to this drive. This
           will return a bulk upload PAR that can be used to write to a
           bucket. All of the files written to this bucket will be copied
           into this drive using the (optionally) supplied aclrules
           to control access (or "inherit" if no rules are supplied).

           Once you have finished uploading, you must call the
           "close" function on the BulkUploadPAR so that the files
           are correctly copied. The filenames as they are written
           to the PAR will be used, creating new files (and subdrives)
           as needed

           Args:
                max_size (int, default=None): Max size for upload
                aclrules (default=None): ACL rules for upload. If
                    supplied this must provide a to_data() method
           Returns:
                PAR: The bulk upload PAR, with the private key needed
                to use it already set
           Raises:
                PermissionError: If this drive is null
        """
        if self.is_null():
            raise PermissionError(
                "Cannot perform a bulk upload of files to a null drive")

        from Acquire.Client import Authorisation as _Authorisation
        from Acquire.Client import PAR as _PAR
        from Acquire.Crypto import PrivateKey as _PrivateKey

        authorisation = _Authorisation(
                            resource="bulk_upload %s %s" %
                            (self._drive_uid, max_size), user=self._user)

        # we need a new private key to secure access to this PAR
        privkey = _PrivateKey()

        args = {"drive_uid": self._drive_uid,
                "authorisation": authorisation.to_data(),
                "encrypt_key": privkey.public_key().to_data()}

        # optional arguments are only sent if they were supplied
        if aclrules is not None:
            args["aclrules"] = aclrules.to_data()

        if max_size is not None:
            args["max_size"] = max_size

        # will eventually need to authorise payment...

        response = self.storage_service().call_function(
                                function="bulk_upload", args=args)

        par = _PAR.from_data(response["bulk_upload_par"])
        par._set_private_key(privkey)

        return par

    def upload(self, filename, uploaded_name=None, aclrules=None,
               force_par=False):
        """Upload the file at 'filename' to this drive, assuming we have
           write access to this drive. If you want to specify the
           uploaded name then set this as "uploaded_name", otherwise
           'filename' is passed as the remote name. If a file with this
           name exists, then this will upload a new version (assuming
           you have permission). Otherwise this will create a new file.
           You can set the ACL rules used to grant access to this file
           via 'aclrules'. If this is not set, then the rules will be
           derived from either the last version of the file, or
           inherited from the drive.

           Args:
                filename (str): Name of file to upload
                uploaded_name (str, default=None): Name of file once
                    uploaded
                aclrules (default=None): ACL rules for file
                force_par (bool, default=False): If True force a
                    pre-authenticated request be created for the upload
           Returns:
                FileMeta: Object containing metadata on the uploaded file
           Raises:
                PermissionError: If this drive is null
        """
        if self.is_null():
            raise PermissionError("Cannot upload a file to a null drive!")

        if uploaded_name is None:
            uploaded_name = filename

        from Acquire.Client import Authorisation as _Authorisation
        from Acquire.Client import FileHandle as _FileHandle
        from Acquire.Client import PAR as _PAR
        from Acquire.Client import FileMeta as _FileMeta

        # a cutoff of zero means only upload using a PAR
        local_cutoff = 0 if force_par else None

        filehandle = _FileHandle(filename=filename,
                                 remote_filename=uploaded_name,
                                 drive_uid=self.uid(),
                                 aclrules=aclrules,
                                 local_cutoff=local_cutoff)

        try:
            authorisation = _Authorisation(
                                resource="upload %s" % filehandle.fingerprint(),
                                user=self._user)

            args = {"filehandle": filehandle.to_data(),
                    "authorisation": authorisation.to_data()}

            # decide once whether a PAR is needed, so that 'privkey' is
            # guaranteed to be set whenever the PAR branch below runs
            use_par = not filehandle.is_localdata()
            privkey = None

            if use_par:
                # we will need to upload against a PAR, so need to tell
                # the service how to encrypt the PAR...
                privkey = self._user.session_key()
                args["encryption_key"] = privkey.public_key().to_data()

            # will eventually need to authorise payment...

            response = self.storage_service().call_function(
                                    function="upload", args=args)

            filemeta = _FileMeta.from_data(response["filemeta"])

            # if this was a large file, then we will receive a PAR back
            # which must be used to upload the file
            if use_par:
                par = _PAR.from_data(response["upload_par"])
                par.write(privkey).set_object_from_file(
                                        filehandle.local_filename())
                par.close(privkey)

            return filemeta
        except BaseException:
            # ensure that any temporary files are removed, whatever the
            # reason for the failure, and then let the error propagate
            filehandle.__del__()
            raise

    def download(self, filename, downloaded_name=None, version=None,
                 dir=None, force_par=False):
        """Download the file called 'filename' from this drive into
           the local directory, or 'dir' if specified,
           ideally called 'filename'
           (or 'downloaded_name' if that is specified). The local
           filename is chosen by create_new_file, so may differ from
           the requested name.

           Note that this only downloads files for which you
           have read-access. If the file is not readable then
           an exception is raised and nothing is returned

           If 'version' is specified then download a specific version
           of the file. Otherwise download the latest version

           Args:
                filename (str): Name of file to download
                downloaded_name (str, default=None): Name of file once
                    downloaded
                version (datetime, default=None): Datetime denoting
                    version of file to use
                dir (str, default=None): Directory for file
                force_par (bool, default=False): If True force a
                    pre-authenticated request be created for the download
           Returns:
                tuple: (downloaded_name, filemeta) - the name of the
                local file that was written and the FileMeta of the
                downloaded file
           Raises:
                PermissionError: If this drive is null
        """
        if self.is_null():
            raise PermissionError("Cannot download from a null drive!")

        if downloaded_name is None:
            downloaded_name = filename

        from Acquire.Client import create_new_file as \
            _create_new_file

        downloaded_name = _create_new_file(filename=downloaded_name,
                                           dir=dir)

        from Acquire.Client import Authorisation as _Authorisation
        from Acquire.Client import FileMeta as _FileMeta

        authorisation = _Authorisation(
                            resource="download %s %s" % (self.uid(),
                                                         filename),
                            user=self._user)

        privkey = self._user.session_key()

        args = {"drive_uid": self.uid(),
                "filename": filename,
                "authorisation": authorisation.to_data(),
                "encryption_key": privkey.public_key().to_data()}

        if force_par:
            args["force_par"] = True

        if version is not None:
            from Acquire.ObjectStore import datetime_to_string \
                as _datetime_to_string
            args["version"] = _datetime_to_string(version)

        response = self.storage_service().call_function(
                                function="download", args=args)

        filemeta = _FileMeta.from_data(response["filemeta"])

        if "filedata" in response:
            # the file contents were returned directly in the response
            from Acquire.ObjectStore import string_to_bytes \
                as _string_to_bytes
            filedata = _string_to_bytes(response["filedata"])

            del response["filedata"]

            # validate that the size and checksum are correct
            filemeta.assert_correct_data(filedata)

            if filemeta.is_compressed():
                # uncompress the data
                from Acquire.Client import uncompress as _uncompress
                filedata = _uncompress(
                                inputdata=filedata,
                                compression_type=filemeta.compression_type())

            # write the data to the specified local file (the file is
            # flushed and closed on leaving the 'with' block)
            with open(downloaded_name, "wb") as FILE:
                FILE.write(filedata)
        else:
            # the file must be fetched using the returned PAR
            from Acquire.Client import PAR as _PAR
            par = _PAR.from_data(response["download_par"])
            par.read(privkey).get_object_as_file(downloaded_name)
            par.close(privkey)

            # validate that the size and checksum are correct
            filemeta.assert_correct_data(filename=downloaded_name)

            # uncompress the file if desired
            if filemeta.is_compressed():
                from Acquire.Client import uncompress as _uncompress
                _uncompress(inputfile=downloaded_name,
                            outputfile=downloaded_name,
                            compression_type=filemeta.compression_type())

        filemeta._set_drive(self)

        return (downloaded_name, filemeta)

    @staticmethod
    def _list_drives(user, drive_uid=None,
                     storage_service=None, storage_url=None):
        """Return a list of the DriveMetas of the drives accessible
           by the passed user on the passed storage service. If
           'drive_uid' is supplied then it is passed to the service
           with the request

           Args:
                user (User): User whose drives are listed
                drive_uid (str, default=None): UID of drive
                storage_service (Service): Service for drives
                storage_url (str): URL for storage service, used if
                    'storage_service' is None
           Returns:
                list: List of DriveMetas for the drives
           Raises:
                TypeError: If 'storage_service' is not a storage service
        """
        if storage_service is None:
            storage_service = _get_storage_service(storage_url)
        else:
            if not storage_service.is_storage_service():
                raise TypeError("You can only query drives using "
                                "a valid storage service")

        from Acquire.Client import Authorisation as _Authorisation
        authorisation = _Authorisation(resource="UserDrives", user=user)

        args = {"authorisation": authorisation.to_data()}

        if drive_uid is not None:
            args["drive_uid"] = str(drive_uid)

        response = storage_service.call_function(
                                    function="list_drives", args=args)

        from Acquire.ObjectStore import string_to_list as _string_to_list
        from Acquire.Client import DriveMeta as _DriveMeta

        return _string_to_list(response["drives"], _DriveMeta)

    @staticmethod
    def list_toplevel_drives(user, storage_service=None, storage_url=None):
        """Return a list of all of the DriveMetas of the drives accessible
           at the top-level by the passed user on the passed storage
           service

           Args:
                user (User): User for drives
                storage_service (Service, default=None): Storage service
                    to query
                storage_url (str): URL for storage service
           Returns:
                list: List of DriveMetas for the drives
        """
        return Drive._list_drives(user=user,
                                  storage_service=storage_service,
                                  storage_url=storage_url)

    def list_drives(self):
        """Return a list of the DriveMetas of all of the drives contained
           in this drive that are accessible to the user

           Returns:
                list: List of DriveMetas for the drives. This is empty
                if this drive is null
        """
        if self.is_null():
            return []
        else:
            return Drive._list_drives(user=self._user,
                                      drive_uid=self._drive_uid,
                                      storage_service=self._storage_service)

    def list_files(self, include_metadata=False):
        """Return a list of the FileMetas of all of the files contained
           in this drive

           Args:
                include_metadata (bool, default=False): If True include
                    metadata for the returned files
           Returns:
                list: List of FileMetas for files in drive. This is
                empty if this drive is null
        """
        if self.is_null():
            return []

        from Acquire.Client import Authorisation as _Authorisation
        from Acquire.ObjectStore import string_to_list as _string_to_list
        from Acquire.Storage import FileMeta as _FileMeta

        # send a genuine bool whatever truthy value was passed
        include_metadata = bool(include_metadata)

        authorisation = _Authorisation(resource="list_files",
                                       user=self._user)

        args = {"authorisation": authorisation.to_data(),
                "drive_uid": self._drive_uid,
                "include_metadata": include_metadata}

        response = self.storage_service().call_function(function="list_files",
                                                        args=args)

        files = _string_to_list(response["files"], _FileMeta)

        # attach this drive to each of the returned FileMetas
        for f in files:
            f._set_drive(self)

        return files

    def list_versions(self, filename, include_metadata=False):
        """Return a list of all of the versions of the specified file.
           This returns an empty list if there are no versions of this
           file

           Args:
                filename: Filename for listing of versions
                include_metadata (bool, default=False): If True include
                    metadata for the returned files
           Returns:
                list: List of FileMetas for versions of file. This is
                empty if this drive is null
        """
        if self.is_null():
            return []

        # send a genuine bool whatever truthy value was passed
        include_metadata = bool(include_metadata)

        from Acquire.Client import Authorisation as _Authorisation
        authorisation = _Authorisation(resource="list_versions %s" % filename,
                                       user=self._user)

        args = {"authorisation": authorisation.to_data(),
                "drive_uid": self._drive_uid,
                "include_metadata": include_metadata,
                "filename": filename}

        response = self.storage_service().call_function(
                                                function="list_versions",
                                                args=args)

        from Acquire.ObjectStore import string_to_list \
            as _string_to_list
        from Acquire.Storage import FileMeta as _FileMeta

        versions = _string_to_list(response["versions"], _FileMeta)

        # attach this drive to each of the returned FileMetas
        for version in versions:
            version._set_drive(self)

        return versions