Source code for Acquire.Client._par


import datetime as _datetime
import json as _json
import os as _os

# The public API of this module (the names exported by "import *")
__all__ = [
    "PAR",
    "BucketReader",
    "BucketWriter",
    "ObjectReader",
    "ObjectWriter",
    "ComputeRunner",
]


class PAR:
    """This class holds the result of a pre-authenticated request
       (a PAR - also called a pre-signed request). This holds a
       pre-authenticated URL to access either;

       (1) An individual object in an object store (read or write)
       (2) An entire bucket in the object store
       (3) A calculation to be performed on the compute service
           (start or stop)

       The PAR's URL is stored encrypted, so it can only be used by
       the person or service that has access to the decryption key.

       A PAR constructed without a URL is a "null" PAR: it has no
       UID, no expiry and no service URL.
    """
    def __init__(self, url=None, key=None, encrypt_key=None,
                 expires_datetime=None, is_readable=False,
                 is_writeable=False, is_executable=False,
                 driver_details=None):
        """Construct a PAR result by passing in the URL at which the
           object can be accessed, the UTC datetime when this expires,
           whether this is readable, writeable or executable, and the
           encryption key to use to encrypt the PAR.

           If this is an object store PAR, then optionally you can
           pass in the key for the object in the object store that
           this provides access to. If this is not supplied, then an
           entire bucket is accessed.

           If 'is_readable', then read-access has been granted, while
           if 'is_writeable' then write access has been granted.
           If 'is_executable' then this is a calculation PAR that
           triggers a calculation (read and write access are then
           removed). At least one permission must be granted.

           driver_details is provided by the machinery that creates
           the PAR, and supplies extra details that are used by the
           driver to create, register and manage PARs... You should
           not do anything with driver_details yourself

           Args:
                url: The URL to protect (None creates a null PAR)
                key (str): The object key, or None for a whole bucket
                encrypt_key (PublicKey or PrivateKey): Key used to
                    encrypt the URL (a PrivateKey is converted to its
                    public key)
                expires_datetime (datetime): When this PAR expires
                is_readable (bool): Whether read access is granted
                is_writeable (bool): Whether write access is granted
                is_executable (bool): Whether this is a calculation PAR
                driver_details (dict): Driver-specific details
           Raises:
                TypeError: if a URL is given without a valid key to
                    encrypt it, or if driver_details is not a dict
                PARPermissionsError: if no permission is granted
        """
        service_url = None

        if url is None:
            # a null PAR - marked readable so that the permission
            # check below passes, but it has no URL or UID
            is_readable = True
            self._uid = None
        else:
            from Acquire.Crypto import PublicKey as _PublicKey
            from Acquire.Crypto import PrivateKey as _PrivateKey

            if isinstance(encrypt_key, _PrivateKey):
                encrypt_key = encrypt_key.public_key()

            if not isinstance(encrypt_key, _PublicKey):
                raise TypeError(
                    "You must supply a valid PublicKey to encrypt a PAR")

            url = encrypt_key.encrypt(url)

            from Acquire.ObjectStore import create_uuid as _create_uuid
            self._uid = _create_uuid()

            try:
                from Acquire.Service import get_this_service \
                    as _get_this_service
                service_url = _get_this_service().canonical_url()
            except Exception:
                # best-effort: when not running inside a service there
                # is no service URL to record
                pass

        self._url = url
        self._key = key
        self._expires_datetime = expires_datetime
        self._service_url = service_url

        if driver_details is not None:
            if not isinstance(driver_details, dict):
                raise TypeError("The driver details must be a dictionary")

        self._driver_details = driver_details

        self._is_readable = bool(is_readable)
        self._is_writeable = bool(is_writeable)
        self._is_executable = bool(is_executable)

        if self._is_executable:
            # a calculation PAR grants neither read nor write access
            self._is_readable = False
            self._is_writeable = False
        elif not (self._is_readable or self._is_writeable):
            from Acquire.Client import PARPermissionsError
            raise PARPermissionsError(
                "You cannot create a PAR that has no read or write "
                "or execute permissions!")

    def __str__(self):
        if self.is_null():
            # a null PAR has no expiry, so do not try to compute one
            return "PAR( null )"
        elif self.seconds_remaining() < 1:
            return "PAR( expired )"
        elif self._is_executable:
            return "PAR( calculation, seconds_remaining=%s )" % \
                (self.seconds_remaining(buffer=0))

        if self._key is None:
            return "PAR( bucket=True, seconds_remaining=%s )" % \
                (self.seconds_remaining(buffer=0))
        else:
            return "PAR( key=%s, seconds_remaining=%s )" % \
                (self.key(), self.seconds_remaining(buffer=0))

    def _set_private_key(self, privkey):
        """Call this function to set the private key for this
           PAR. This is the private key that is used to
           decrypt the PAR, and is provided here if you want
           to use the PAR without having to always supply
           the key (by definition, you are the only person
           who has the key)

           Raises:
                TypeError: if 'privkey' is not a PrivateKey
        """
        from Acquire.Crypto import PrivateKey as _PrivateKey
        if not isinstance(privkey, _PrivateKey):
            raise TypeError("The private key must be type PrivateKey")

        self._privkey = privkey

    def _get_privkey(self, decrypt_key=None):
        """Return the private key used to decrypt the PAR. A key
           stored via _set_private_key takes precedence; otherwise
           the user-supplied 'decrypt_key' is returned.

           Raises:
                PermissionError: if no key is stored or supplied
                TypeError: if 'decrypt_key' is not a PrivateKey
        """
        # '_privkey' only exists once _set_private_key (or from_data
        # with a passphrase) has been called
        privkey = getattr(self, "_privkey", None)
        if privkey is not None:
            return privkey

        if decrypt_key is None:
            raise PermissionError(
                "You must supply a private key to decrypt this PAR")

        from Acquire.Crypto import PrivateKey as _PrivateKey
        if not isinstance(decrypt_key, _PrivateKey):
            raise TypeError("The supplied private key must be type PrivateKey")

        return decrypt_key

    def is_null(self):
        """Return whether or not this is null"""
        return self._uid is None

    @staticmethod
    def checksum(data):
        """Return the MD5 hex checksum of the passed data (a str is
           UTF-8 encoded first). This is used for validating data, and
           also to create a checksum of the URL so that the user can
           demonstrate that they can decrypt this PAR
        """
        # NOTE: the service side compares against this exact digest,
        # so the algorithm cannot be changed unilaterally here
        from hashlib import md5 as _md5
        md5 = _md5()

        if isinstance(data, str):
            data = data.encode("utf-8")

        md5.update(data)
        return md5.hexdigest()

    def url(self, decrypt_key=None):
        """Return the URL at which the bucket/object can be accessed. This
           will raise a PARTimeoutError if the url has less than 30 seconds
           of validity left. Note that you must pass in the key used to
           decrypt the PAR (unless one has been stored with the PAR)
        """
        if self.seconds_remaining(buffer=30) <= 0:
            from Acquire.Client import PARTimeoutError
            raise PARTimeoutError(
                "The URL behind this PAR has expired and is no longer valid")

        return self._get_privkey(decrypt_key).decrypt(self._url)

    def service_url(self):
        """Return the URL of the service that created this PAR
           (None for a null PAR)"""
        if self.is_null():
            return None
        else:
            return self._service_url

    def service(self):
        """Return the trusted service that created this PAR, or None
           if the service URL is not known"""
        service_url = self.service_url()

        if service_url is None:
            return None

        from Acquire.Service import get_trusted_service \
            as _get_trusted_service
        return _get_trusted_service(service_url=service_url)

    def uid(self):
        """Return the UID of this PAR"""
        return self._uid

    def driver_details(self):
        """Return the driver details for this PAR. This is used only on
           the service that created the PAR, and returns an empty
           dictionary if the details are not available
        """
        if self._driver_details is None:
            return {}
        else:
            return self._driver_details

    def driver(self):
        """Return the driver behind this PAR - this is only available
           on the service (None otherwise)
        """
        return self.driver_details().get("driver")

    def fingerprint(self):
        """Return a fingerprint for this PAR that can be used
           in authorisations
        """
        return self._uid

    def is_readable(self):
        """Return whether or not this PAR gives read access"""
        return self._is_readable

    def is_writeable(self):
        """Return whether or not this PAR gives write access"""
        return self._is_writeable

    def is_executable(self):
        """Return whether or not this is an executable job"""
        return self._is_executable

    def key(self):
        """Return the key for the object this accesses - this is None
           if the PAR grants access to the entire bucket"""
        return self._key

    def is_bucket(self):
        """Return whether or not this PAR is for an entire bucket"""
        return (self._key is None) and not (self.is_executable())

    def is_calculation(self):
        """Return whether or not this PAR is for a calculation"""
        return self._is_executable

    def is_object(self):
        """Return whether or not this PAR is for a single object"""
        return self._key is not None

    def expires_when(self):
        """Return when this PAR expires (or expired)"""
        if self.is_null():
            return None
        else:
            return self._expires_datetime

    def seconds_remaining(self, buffer=30):
        """Return the number of seconds remaining before this PAR expires.
           This will return 0 if the PAR has already expired, or if it
           has no expiry time (e.g. a null PAR). To be safe, you should
           renew PARs if the number of seconds remaining is less than 60.
           This will subtract 'buffer' seconds from the actual validity
           to provide a buffer against race conditions (function says
           this is valid when it is not). Negative buffers count as 0.
        """
        if self._expires_datetime is None:
            # fail closed - a PAR without an expiry has no validity
            return 0

        from Acquire.ObjectStore import get_datetime_now as _get_datetime_now
        now = _get_datetime_now()

        buffer = float(buffer)

        if buffer < 0:
            buffer = 0

        delta = (self._expires_datetime - now).total_seconds() - buffer

        if delta < 0:
            return 0
        else:
            return delta

    def read(self, decrypt_key=None):
        """Return an object that can be used to read data from this PAR

           Raises:
                PARPermissionsError: if this PAR is not readable
        """
        if not self.is_readable():
            from Acquire.Client import PARPermissionsError
            raise PARPermissionsError(
                "You do not have permission to read from this PAR: %s" % self)

        if self.is_bucket():
            return BucketReader(self, self._get_privkey(decrypt_key))
        else:
            return ObjectReader(self, self._get_privkey(decrypt_key))

    def write(self, decrypt_key=None):
        """Return an object that can be used to write data to this PAR

           Raises:
                PARPermissionsError: if this PAR is not writeable
        """
        if not self.is_writeable():
            from Acquire.Client import PARPermissionsError
            raise PARPermissionsError(
                "You do not have permission to write to this PAR: %s" % self)

        if self.is_bucket():
            return BucketWriter(self, self._get_privkey(decrypt_key))
        else:
            return ObjectWriter(self, self._get_privkey(decrypt_key))

    def execute(self, decrypt_key=None):
        """Return an object that can be used to control execution of
           this PAR

           Raises:
                PARPermissionsError: if this PAR is not executable
        """
        if not self.is_executable():
            from Acquire.Client import PARPermissionsError
            raise PARPermissionsError(
                "You do not have permission to execute this PAR: %s" % self)

        return ComputeRunner(self, self._get_privkey(decrypt_key))

    def close(self, decrypt_key=None):
        """Close this PAR - this closes and deletes the PAR. You must
           pass in the decryption key so that you can validate that
           you have permission to read (and thus close) this PAR.
           Afterwards this object is reset to a null PAR.
        """
        if self.is_null():
            return

        service = self.service()

        if service is not None:
            # we confirm we have permission to close this PAR by sending
            # a checksum of the url (which only we would know)
            url = self.url(decrypt_key=decrypt_key)

            args = {"par_uid": self._uid,
                    "url_checksum": PAR.checksum(url)}

            service.call_function(function="close_par", args=args)

        # now that the PAR is closed, set it into a null state
        import copy as _copy
        par = PAR()
        self.__dict__ = _copy.copy(par.__dict__)

    def to_data(self, passphrase=None):
        """Return a json-serialisable dictionary that contains all data
           for this object. A null PAR serialises to an empty dict. The
           private key (if stored) is only included when a 'passphrase'
           is supplied to protect it.
        """
        data = {}

        if self._url is None:
            return data

        from Acquire.ObjectStore import datetime_to_string \
            as _datetime_to_string
        from Acquire.ObjectStore import bytes_to_string \
            as _bytes_to_string

        data["url"] = _bytes_to_string(self._url)
        data["uid"] = self._uid
        data["key"] = self._key
        data["expires_datetime"] = _datetime_to_string(self._expires_datetime)
        data["is_readable"] = self._is_readable
        data["is_writeable"] = self._is_writeable
        data["is_executable"] = self._is_executable

        service_url = getattr(self, "_service_url", None)
        if service_url is not None:
            data["service_url"] = service_url

        privkey = getattr(self, "_privkey", None)
        if privkey is not None and passphrase is not None:
            data["privkey"] = privkey.to_data(passphrase)

        # note that we don't save the driver details as these
        # are stored separately on the service

        return data

    @staticmethod
    def from_data(data, passphrase=None):
        """Return a PAR constructed from the passed json-deserialised
           dictionary. None or an empty dict gives a null PAR. A stored
           private key is only restored if 'passphrase' is supplied.
        """
        if data is None or len(data) == 0:
            return PAR()

        from Acquire.ObjectStore import string_to_datetime \
            as _string_to_datetime
        from Acquire.ObjectStore import string_to_bytes \
            as _string_to_bytes

        par = PAR()

        par._url = _string_to_bytes(data["url"])
        par._key = data["key"]
        par._uid = data["uid"]

        if par._key is not None:
            par._key = str(par._key)

        par._expires_datetime = _string_to_datetime(data["expires_datetime"])
        par._is_readable = data["is_readable"]
        par._is_writeable = data["is_writeable"]
        par._is_executable = data["is_executable"]

        if "service_url" in data:
            par._service_url = data["service_url"]

        if "privkey" in data and passphrase is not None:
            from Acquire.Crypto import PrivateKey as _PrivateKey
            par._privkey = _PrivateKey.from_data(data["privkey"], passphrase)

        # note that we don't load the driver details as this
        # is stored and loaded separately on the service

        return par
def _url_to_filepath(url):
    """Internal function used to strip the "file://" from the beginning
       of a file url (callers only pass URLs starting with "file://")
    """
    return url[7:]


def _read_local(url):
    """Internal function used to read data from the local testing object
       store. Object data is stored in the file "<path>._data"
    """
    with open("%s._data" % _url_to_filepath(url), "rb") as FILE:
        return FILE.read()


def _read_remote(url):
    """Internal function used to read data from a remote URL

       Raises:
            PARReadError: if the request fails or does not return
                HTTP status 200
    """
    try:
        from Acquire.Stubs import requests as _requests
        response = _requests.get(url)
        status_code = response.status_code
    except Exception as e:
        from Acquire.Client import PARReadError
        raise PARReadError(
            "Cannot read the remote PAR URL '%s' because of a possible "
            "network issue: %s" % (url, str(e))) from e

    output = response.content

    if status_code != 200:
        from Acquire.Client import PARReadError
        raise PARReadError(
            "Failed to read data from the PAR URL. HTTP status code = %s, "
            "returned output: %s" % (status_code, output))

    return output


def _list_local(url):
    """Internal function to list all of the objects keys below 'url'.
       Keys are the "/"-separated paths of the "._data" files relative
       to 'url', with the "._data" suffix removed
    """
    local_dir = _url_to_filepath(url)

    keys = []

    for dirpath, _, filenames in _os.walk(local_dir):
        # strip the separator so that sub-directory keys are relative
        # ("sub/key" rather than "/sub/key") and can be prefix-matched
        local_path = dirpath[len(local_dir):].lstrip("/")
        has_local_path = (len(local_path) > 0)

        for filename in filenames:
            if filename.endswith("._data"):
                filename = filename[0:-6]

                if has_local_path:
                    keys.append("%s/%s" % (local_path, filename))
                else:
                    keys.append(filename)

    return keys


def _list_remote(url):
    """Internal function to list all of the objects keys below 'url'"""
    # NOTE: listing of remote buckets is not implemented - this always
    # returns an empty list
    return []


def _write_local(url, data):
    """Internal function used to write data to a local file
       ("<path>._data"), creating any missing parent directories
    """
    filename = "%s._data" % _url_to_filepath(url)

    try:
        with open(filename, 'wb') as FILE:
            FILE.write(data)
    except FileNotFoundError:
        # the containing directory does not exist yet - create it
        # and then try again
        _os.makedirs(_os.path.dirname(filename), exist_ok=True)

        with open(filename, 'wb') as FILE:
            FILE.write(data)


def _write_remote(url, data):
    """Internal function used to write data to the passed remote URL

       Raises:
            PARWriteError: if the request fails or does not return
                HTTP status 200
    """
    try:
        from Acquire.Stubs import requests as _requests
        response = _requests.put(url, data=data)
        status_code = response.status_code
    except Exception as e:
        from Acquire.Client import PARWriteError
        raise PARWriteError(
            "Cannot write data to the remote PAR URL '%s' because of a "
            "possible network issue: %s" % (url, str(e))) from e

    if status_code != 200:
        from Acquire.Client import PARWriteError
        raise PARWriteError(
            "Failed to write data to the remote PAR URL '%s'. HTTP status "
            "code = %s, returned output: %s"
            % (url, status_code, str(response.content)))


def _join_bucket_and_prefix(url, prefix):
    """Join together the passed url and prefix, returning a tuple of
       the url of the directory to list and the remaining part of the
       prefix, which is the start of the file name. For example,
       ("file:///b", "x/y/z") gives ("file:///b/x/y", "z"), while a
       None prefix gives (url, "")
    """
    if prefix is None:
        return (url, "")

    parts = prefix.split("/")

    # every component except the last is a directory
    directory = "/".join(parts[0:-1])

    if len(directory) > 0:
        if url.endswith("/"):
            url = "%s%s" % (url, directory)
        else:
            url = "%s/%s" % (url, directory)

    return (url, parts[-1])
class BucketReader:
    """This class provides functions to enable reading data from a
       bucket via a PAR
    """
    def __init__(self, par=None, decrypt_key=None):
        """Construct a reader for the bucket behind 'par', using
           'decrypt_key' to decrypt the PAR's URL. If 'par' is not
           supplied then this is an empty reader that cannot read data.

           Raises:
                TypeError: if 'par' is not a PAR
                ValueError: if 'par' does not represent a whole bucket
                PARPermissionsError: if 'par' does not grant read access
        """
        if par:
            if not isinstance(par, PAR):
                raise TypeError(
                    "You can only create a BucketReader from a PAR")
            elif not par.is_bucket():
                raise ValueError(
                    "You can only create a BucketReader from a PAR that "
                    "represents an entire bucket: %s" % par)
            elif not par.is_readable():
                from Acquire.Client import PARPermissionsError
                raise PARPermissionsError(
                    "You cannot create a BucketReader from a PAR without "
                    "read permissions: %s" % par)

            self._par = par
            self._url = par.url(decrypt_key)
        else:
            self._par = None

    def get_object(self, key):
        """Return the binary data contained in the key 'key' in the
           passed bucket (leading "/" characters in 'key' are ignored)

           Raises:
                PARError: if this reader has no PAR
        """
        if self._par is None:
            from Acquire.Client import PARError
            raise PARError("You cannot read data from an empty PAR")

        while key.startswith("/"):
            key = key[1:]

        url = self._url

        if url.endswith("/"):
            url = "%s%s" % (url, key)
        else:
            url = "%s/%s" % (url, key)

        if url.startswith("file://"):
            return _read_local(url)
        else:
            return _read_remote(url)

    def get_object_as_file(self, key, filename):
        """Get the object contained in the key 'key' in the passed 'bucket'
           and writing this to the file called 'filename'"""
        objdata = self.get_object(key)

        with open(filename, "wb") as FILE:
            FILE.write(objdata)

    def get_string_object(self, key):
        """Return the string in 'bucket' associated with 'key'

           Raises:
                TypeError: if the data is not valid UTF-8
        """
        data = self.get_object(key)

        try:
            return data.decode("utf-8")
        except Exception as e:
            raise TypeError(
                "The object behind this PAR cannot be converted to a string. "
                "Error is: %s" % str(e)) from e

    def get_object_from_json(self, key):
        """Return an object constructed from json stored at 'key' in
           the passed bucket. This raises an exception if there is no
           data or the PAR has expired
        """
        data = self.get_string_object(key)
        return _json.loads(data)

    def get_all_object_names(self, prefix=None):
        """Returns the names of all objects in the passed bucket. If
           'prefix' is given, only objects whose keys start with it are
           returned, with the prefix (and any following "/") removed

           Raises:
                PARError: if this reader has no PAR
        """
        if self._par is None:
            from Acquire.Client import PARError
            raise PARError("You cannot read data from an empty PAR")

        (url, part) = _join_bucket_and_prefix(self._url, prefix)

        if url.startswith("file://"):
            objnames = _list_local(url)
        else:
            objnames = _list_remote(url)

        # scan the object names returned and discard the ones that don't
        # match the prefix
        if len(part) == 0:
            return objnames

        matches = []

        for objname in objnames:
            if objname.startswith(part):
                objname = objname[len(part):]

                while objname.startswith("/"):
                    objname = objname[1:]

                matches.append(objname)

        return matches

    def get_all_objects(self, prefix=None):
        """Return a dictionary of all of the objects in the passed
           bucket (optionally below 'prefix'), keyed by object name"""
        names = self.get_all_object_names(prefix)

        objects = {}

        if prefix:
            for name in names:
                objects[name] = self.get_object(
                    "%s/%s" % (prefix, name))
        else:
            for name in names:
                objects[name] = self.get_object(name)

        return objects

    def get_all_strings(self, prefix=None):
        """Return all of the strings in the passed bucket. Objects that
           cannot be decoded as UTF-8 text are skipped"""
        objects = self.get_all_objects(prefix)

        strings = {}

        for name, data in objects.items():
            try:
                strings[name] = data.decode("utf-8")
            except (AttributeError, UnicodeDecodeError):
                # not text - skip this object
                pass

        return strings
class BucketWriter:
    """This class provides functions to enable writing data to a
       bucket via a PAR
    """
    def __init__(self, par=None, decrypt_key=None):
        """Construct a writer for the bucket behind 'par', using
           'decrypt_key' to decrypt the PAR's URL. If 'par' is not
           supplied then this is an empty writer that cannot write data.

           Raises:
                TypeError: if 'par' is not a PAR
                ValueError: if 'par' does not represent a whole bucket
                PARPermissionsError: if 'par' does not grant write access
        """
        if par:
            if not isinstance(par, PAR):
                raise TypeError(
                    "You can only create a BucketWriter from a PAR")
            elif not par.is_bucket():
                raise ValueError(
                    "You can only create a BucketWriter from a PAR that "
                    "represents an entire bucket: %s" % par)
            elif not par.is_writeable():
                from Acquire.Client import PARPermissionsError
                raise PARPermissionsError(
                    "You cannot create a BucketWriter from a PAR without "
                    "write permissions: %s" % par)

            self._par = par
            self._url = par.url(decrypt_key)
        else:
            self._par = None

    def set_object(self, key, data):
        """Set the value of 'key' in 'bucket' to binary 'data'
           (leading "/" characters in 'key' are ignored)

           Raises:
                PARError: if this writer has no PAR
        """
        if self._par is None:
            from Acquire.Client import PARError
            raise PARError("You cannot write data to an empty PAR")

        while key.startswith("/"):
            key = key[1:]

        url = self._url

        if url.endswith("/"):
            url = "%s%s" % (url, key)
        else:
            url = "%s/%s" % (url, key)

        if url.startswith("file://"):
            return _write_local(url, data)
        else:
            return _write_remote(url, data)

    def set_object_from_file(self, key, filename):
        """Set the value of 'key' in 'bucket' to equal the contents
           of the file located by 'filename'"""
        with open(filename, "rb") as FILE:
            data = FILE.read()

        self.set_object(key, data)

    def set_string_object(self, key, string_data):
        """Set the value of 'key' in 'bucket' to the UTF-8 encoded
           string 'string_data'"""
        self.set_object(key, string_data.encode("utf-8"))

    def set_object_from_json(self, key, data):
        """Set the value of 'key' in 'bucket' to equal to contents
           of 'data', which has been encoded to json"""
        self.set_string_object(key, _json.dumps(data))
class ObjectReader:
    """Read-only access to a single object, authorised by a PAR"""
    def __init__(self, par=None, decrypt_key=None):
        """Create a reader for the object behind 'par', decrypting the
           PAR's URL with 'decrypt_key'. Without a 'par' this reader is
           empty and every read will fail.

           Raises:
                TypeError: 'par' is not a PAR
                ValueError: 'par' refers to a whole bucket
                PARPermissionsError: 'par' is not readable
        """
        if not par:
            self._par = None
            return

        if not isinstance(par, PAR):
            raise TypeError(
                "You can only create an ObjectReader from a PAR")

        if par.is_bucket():
            raise ValueError(
                "You can only create an ObjectReader from a PAR that "
                "represents an object: %s" % par)

        if not par.is_readable():
            from Acquire.Client import PARPermissionsError
            raise PARPermissionsError(
                "You cannot create an ObjectReader from a PAR without "
                "read permissions: %s" % par)

        self._par = par
        self._url = par.url(decrypt_key)

    def get_object(self):
        """Fetch and return the raw bytes of the object

           Raises:
                PARError: this reader was created without a PAR
        """
        if self._par is None:
            from Acquire.Client import PARError
            raise PARError("You cannot read data from an empty PAR")

        target = self._url
        fetch = _read_local if target.startswith("file://") else _read_remote
        return fetch(target)

    def get_object_as_file(self, filename):
        """Fetch the object and save its bytes into 'filename'"""
        payload = self.get_object()

        with open(filename, "wb") as handle:
            handle.write(payload)

    def get_string_object(self):
        """Fetch the object and return it decoded as UTF-8 text

           Raises:
                TypeError: the object's bytes are not valid UTF-8
        """
        raw = self.get_object()

        try:
            text = raw.decode("utf-8")
        except Exception as err:
            raise TypeError(
                "The object behind this PAR cannot be converted to a string. "
                "Error is: %s" % str(err))

        return text

    def get_object_from_json(self):
        """Fetch the object, decode it as UTF-8 and parse it as json.
           Errors propagate if there is no data or the PAR has expired
        """
        return _json.loads(self.get_string_object())
class ObjectWriter(ObjectReader):
    """This is an extension of ObjectReader that also allows writing to
       the object via the PAR
    """
    def __init__(self, par=None, decrypt_key=None):
        """Construct a writer for the object behind 'par', using
           'decrypt_key' to decrypt the PAR's URL. If 'par' is not
           supplied then this is an empty writer that cannot write data.

           ObjectReader.__init__ is deliberately not called, as it
           requires read (rather than write) permission.

           Raises:
                TypeError: if 'par' is not a PAR
                ValueError: if 'par' represents an entire bucket
                PARPermissionsError: if 'par' does not grant write access
        """
        if par:
            if not isinstance(par, PAR):
                raise TypeError(
                    "You can only create an ObjectWriter from a PAR")
            elif par.is_bucket():
                raise ValueError(
                    "You can only create an ObjectWriter from a PAR that "
                    "represents an object: %s" % par)
            elif not par.is_writeable():
                from Acquire.Client import PARPermissionsError
                raise PARPermissionsError(
                    "You cannot create an ObjectWriter from a PAR without "
                    "write permissions: %s" % par)

            self._par = par
            self._url = par.url(decrypt_key)
        else:
            self._par = None

    def set_object(self, data):
        """Set the value of the object behind this PAR to the binary 'data'

           Raises:
                PARError: if this writer has no PAR
        """
        if self._par is None:
            from Acquire.Client import PARError
            raise PARError("You cannot write data to an empty PAR")

        url = self._url

        if url.startswith("file://"):
            return _write_local(url, data)
        else:
            return _write_remote(url, data)

    def set_object_from_file(self, filename):
        """Set the value of the object behind this PAR to equal the contents
           of the file located by 'filename'"""
        with open(filename, "rb") as FILE:
            data = FILE.read()

        self.set_object(data)

    def set_string_object(self, string_data):
        """Set the value of the object behind this PAR to the UTF-8
           encoded string 'string_data'
        """
        self.set_object(string_data.encode("utf-8"))

    def set_object_from_json(self, data):
        """Set the value of the object behind this PAR to equal to contents
           of 'data', which has been encoded to json"""
        self.set_string_object(_json.dumps(data))
class ComputeRunner:
    """Runs a calculation whose execution has been pre-authorised by
       a PAR
    """
    def __init__(self, par=None, decrypt_key=None):
        """Create a runner for the calculation behind 'par', decrypting
           the PAR's URL with 'decrypt_key'. Without a 'par' the runner
           is empty.

           Raises:
                TypeError: 'par' is not a PAR
                ValueError: 'par' is not an executable (calculation) PAR
        """
        if not par:
            self._par = None
            return

        if not isinstance(par, PAR):
            raise TypeError(
                "You can only create a ComputeRunner from a PAR")

        if not par.is_executable():
            raise ValueError(
                "You can only create a ComputeRunner from a PAR that "
                "represents an executable calculation: %s" % par)

        self._par = par
        self._url = par.url(decrypt_key)