# Source code for Acquire.ObjectStore._oci_objstore


import io as _io
import datetime as _datetime
import uuid as _uuid
import json as _json
import os as _os
import copy as _copy
import uuid as _uuid

# Only the OCI object-store backend class is exported from this module;
# the module-level helper functions below are private
__all__ = ["OCI_ObjectStore"]


def _sanitise_bucket_name(bucket_name):
    """This function sanitises the passed bucket name. It will always
        return a valid bucket name. If "None" is passed, then a new,
        unique bucket name will be generated
        
        Args:
            bucket_name (str): Bucket name to clean
        Returns:
            str: Cleaned bucket name
        
        """

    if bucket_name is None:
        return str(_uuid.uuid4())

    return "_".join(bucket_name.split())


def _clean_key(key):
    """This function cleans and returns a key so that it is suitable
       for use both as a key and a directory/file path
       e.g. it removes double-slashes

       Args:
            key (str): Key to clean
       Returns:
            str: Cleaned key

    """
    key = _os.path.normpath(key)

    if len(key) > 1024:
        from Acquire.ObjectStore import ObjectStoreError
        raise ObjectStoreError(
            "The object store does not support keys with longer than "
            "1024 characters (%s) - %s" % (len(key), key))

        # if this becomes a problem then we will implement a 'tinyurl'
        # to shorten keys and use this function to lookup long keys

    return key

def _get_object_url_for_region(region, uri):
    """Internal function used to get the full URL to the passed PAR URI
       for the specified region. This has the format;

       https://objectstorage.{region}.oraclecloud.com/{uri}

       Args:
            region (str): Region for cloud service
            uri (str): URI for cloud service
       Returns:
            str: Full URL for use with cloud service
    """
    server = "https://objectstorage.%s.oraclecloud.com" % region

    while uri.startswith("/"):
        uri = uri[1:]

    return "%s/%s" % (server, uri)


def _get_driver_details_from_par(par):
    """Internal function used to get the OCI driver details from the
       passed PAR (pre-authenticated request)

       Args:
            par (PAR): PAR holding details
        Args:
            dict: Dictionary holding OCI driver details
    """
    from Acquire.ObjectStore import datetime_to_string \
        as _datetime_to_string

    import copy as _copy
    details = _copy.copy(par._driver_details)

    if details is None:
        return {}
    else:
        # fix any non-string/number objects
        details["created_datetime"] = _datetime_to_string(
                                        details["created_datetime"])

    return details


def _get_driver_details_from_data(data):
    """Internal function used to get the OCI driver details from the
       passed data

       Args:
            data (dict): Dict holding OCI driver details
       Returns:
            dict: Dict holding OCI driver details
    """
    from Acquire.ObjectStore import string_to_datetime \
        as _string_to_datetime

    import copy as _copy
    details = _copy.copy(data)

    if "created_datetime" in details:
        details["created_datetime"] = _string_to_datetime(
                                            details["created_datetime"])

    return details


class OCI_ObjectStore:
    """This is the backend that abstracts using the Oracle Cloud
       Infrastructure object store.

       Every method is a staticmethod that receives a 'bucket' dict.
       The visible code reads the keys "client" (the OCI object storage
       client), "namespace", "bucket_name", "compartment_id" and
       "region" from it.
    """

    @staticmethod
    def create_bucket(bucket, bucket_name, compartment=None):
        """Create and return a new bucket in the object store called
           'bucket_name', optionally placing it into the compartment
           identified by 'compartment'. This will raise an
           ObjectStoreError if this bucket already exists

           Args:
                bucket (dict): Bucket to hold data
                bucket_name (str): Name of bucket to create
                compartment (str): Compartment in which to create bucket
           Returns:
                dict: New bucket
           Raises:
                ImportError: If the OCI library is not installed
                ObjectStoreError: If the bucket could not be created
        """
        new_bucket = _copy.copy(bucket)

        # use the same sanitised name both to create the bucket and to
        # record it in new_bucket, so that later calls reading
        # new_bucket["bucket_name"] address the bucket that was actually
        # created (this matches get_bucket)
        sanitised_name = _sanitise_bucket_name(bucket_name)
        new_bucket["bucket_name"] = sanitised_name

        if compartment is not None:
            new_bucket["compartment_id"] = str(compartment)

        try:
            from oci.object_storage.models import CreateBucketDetails as \
                _CreateBucketDetails
        except ImportError as e:
            raise ImportError(
                "Cannot import OCI. Please install OCI, e.g. via "
                "'pip install oci' so that you can connect to the "
                "Oracle Cloud Infrastructure") from e

        try:
            request = _CreateBucketDetails()
            request.compartment_id = new_bucket["compartment_id"]
            client = new_bucket["client"]
            request.name = sanitised_name

            new_bucket["bucket"] = client.create_bucket(
                client.get_namespace().data, request).data
        except Exception as e:
            # couldn't create the bucket - most likely because it
            # already exists (a missing "compartment_id"/"client" entry
            # in the bucket dict also ends up here)
            from Acquire.ObjectStore import ObjectStoreError
            raise ObjectStoreError(
                "Unable to create the bucket '%s', likely because it "
                "already exists: %s" % (bucket_name, str(e))) from e

        return new_bucket

    @staticmethod
    def get_bucket(bucket, bucket_name, compartment=None,
                   create_if_needed=True):
        """Find and return a bucket in the object store called
           'bucket_name', optionally placing it into the compartment
           identified by 'compartment'. If 'create_if_needed' is True
           then the bucket will be created if it doesn't exist.
           Otherwise, if the bucket does not exist then an exception
           will be raised.

           Args:
                bucket (dict): Bucket to hold data
                bucket_name (str): Name of bucket to find/create
                compartment (str, default=None): Compartment in which to
                create bucket
                create_if_needed (bool, default=True): If True, create
                the bucket if it does not exist, else do not
           Returns:
                dict: New bucket
           Raises:
                ImportError: If the OCI library is not installed
                ObjectStoreError: If the bucket does not exist (and could
                not be created)
        """
        new_bucket = _copy.copy(bucket)

        # sanitise once - for a None name this generates a UUID, so a
        # second call would produce a different name
        sanitised_name = _sanitise_bucket_name(bucket_name)
        new_bucket["bucket_name"] = sanitised_name

        if compartment is not None:
            new_bucket["compartment_id"] = str(compartment)

        try:
            from oci.object_storage.models import CreateBucketDetails as \
                _CreateBucketDetails
        except ImportError as e:
            raise ImportError(
                "Cannot import OCI. Please install OCI, e.g. via "
                "'pip install oci' so that you can connect to the "
                "Oracle Cloud Infrastructure") from e

        client = new_bucket["client"]
        namespace = client.get_namespace().data

        def _find_bucket():
            # return the existing OCI bucket, or None if it can't be read
            try:
                return client.get_bucket(namespace, sanitised_name).data
            except Exception:
                return None

        existing_bucket = _find_bucket()

        if existing_bucket:
            new_bucket["bucket"] = existing_bucket
            return new_bucket

        if create_if_needed:
            try:
                request = _CreateBucketDetails()
                request.compartment_id = new_bucket["compartment_id"]
                request.name = sanitised_name
                client.create_bucket(namespace, request)
            except Exception:
                # best-effort: the lookup below decides whether the
                # bucket is now available (it may have been created
                # concurrently by someone else)
                pass

            existing_bucket = _find_bucket()

        if existing_bucket is None:
            from Acquire.ObjectStore import ObjectStoreError
            raise ObjectStoreError(
                "There is no bucket called '%s'. Please check the "
                "compartment and access permissions." % bucket_name)

        new_bucket["bucket"] = existing_bucket
        return new_bucket

    @staticmethod
    def get_bucket_name(bucket):
        """Return the name of the passed bucket

           Args:
                bucket (dict): Bucket holding data
           Returns:
                str: Name of bucket
        """
        return bucket["bucket_name"]

    @staticmethod
    def is_bucket_empty(bucket):
        """Return whether or not the passed bucket is empty

           Args:
                bucket (dict): Bucket holding data
           Returns:
                bool: True if bucket empty, else False
        """
        listing = bucket["client"].list_objects(bucket["namespace"],
                                                bucket["bucket_name"],
                                                limit=1).data

        # the response data is a listing model - the objects themselves
        # are held in its 'objects' list
        return len(listing.objects) == 0

    @staticmethod
    def delete_bucket(bucket, force=False):
        """Delete the passed bucket. This should be used with caution.
           Normally you can only delete a bucket if it is empty. If
           'force' is True then it will first remove all objects from
           the bucket, and then delete the bucket. This can cause a
           LOSS OF DATA!

           Args:
                bucket (dict): Bucket to delete
                force (bool, default=False): If True, delete even if
                bucket is not empty. If False and bucket not empty
                raise PermissionError
           Returns:
                None
           Raises:
                PermissionError: If the bucket is not empty and 'force'
                is False
                ObjectStoreError: If the bucket could not be deleted
        """
        is_empty = OCI_ObjectStore.is_bucket_empty(bucket=bucket)

        if not is_empty:
            if force:
                OCI_ObjectStore.delete_all_objects(bucket=bucket)
            else:
                raise PermissionError(
                    "You cannot delete the bucket %s as it is not empty" %
                    OCI_ObjectStore.get_bucket_name(bucket=bucket))

        # the bucket is empty - delete it
        client = bucket["client"]
        namespace = client.get_namespace().data
        bucket_name = bucket["bucket_name"]

        try:
            response = client.delete_bucket(namespace, bucket_name)
        except Exception as e:
            from Acquire.ObjectStore import ObjectStoreError
            raise ObjectStoreError(
                "Unable to delete bucket '%s'. Please check the "
                "compartment and access permissions: Error %s" %
                (bucket_name, str(e))) from e

        if response.status not in [200, 204]:
            from Acquire.ObjectStore import ObjectStoreError
            raise ObjectStoreError(
                "Unable to delete a bucket '%s' : Status %s, Error %s" %
                (bucket_name, response.status, str(response.data)))

    @staticmethod
    def create_par(bucket, encrypt_key, key=None, readable=True,
                   writeable=False, duration=3600, cleanup_function=None):
        """Create a pre-authenticated request for the passed bucket and
           key (if key is None then the request is for the entire bucket).
           This will return a PAR object that will contain a URL that can
           be used to access the object/bucket. If writeable is true, then
           the URL will also allow the object/bucket to be written to.
           PARs are time-limited. Set the lifetime in seconds by passing
           in 'duration' (by default this is one hour)

           Args:
                bucket (dict): Bucket to create PAR for
                encrypt_key (PublicKey): Public key to encrypt PAR
                key (str, default=None): Key
                readable (bool, default=True): If bucket is readable
                writeable (bool, default=False): If bucket is writeable
                duration (int, default=3600): Duration PAR should be
                valid for in seconds
                cleanup_function (function, default=None): Cleanup
                function to be passed to PARRegistry
           Returns:
                PAR: Pre-authenticated request for the bucket
           Raises:
                PARError: If 'encrypt_key' is not a PublicKey, or a
                readable bucket PAR is requested
                ImportError: If the OCI library is not installed
                ObjectStoreError: If the PAR could not be created
        """
        from Acquire.Crypto import PublicKey as _PublicKey

        if not isinstance(encrypt_key, _PublicKey):
            from Acquire.Client import PARError
            raise PARError(
                "You must supply a valid PublicKey to encrypt the "
                "returned PAR")

        # get the UTC datetime when this PAR should expire
        from Acquire.ObjectStore import get_datetime_now as _get_datetime_now
        expires_datetime = _get_datetime_now() + \
            _datetime.timedelta(seconds=duration)

        is_bucket = key is None

        # Limitation of OCI - cannot have a bucket PAR with
        # read permissions!
        if is_bucket and readable:
            from Acquire.Client import PARError
            raise PARError(
                "You cannot create a Bucket PAR that has read permissions "
                "due to a limitation in the underlying platform")

        try:
            from oci.object_storage.models import \
                CreatePreauthenticatedRequestDetails as \
                _CreatePreauthenticatedRequestDetails
        except ImportError as e:
            raise ImportError(
                "Cannot import OCI. Please install OCI, e.g. via "
                "'pip install oci' so that you can connect to the "
                "Oracle Cloud Infrastructure") from e

        request = _CreatePreauthenticatedRequestDetails()

        if is_bucket:
            request.access_type = "AnyObjectWrite"
        elif readable and writeable:
            request.access_type = "ObjectReadWrite"
        elif readable:
            request.access_type = "ObjectRead"
        elif writeable:
            request.access_type = "ObjectWrite"
        else:
            from Acquire.ObjectStore import ObjectStoreError
            raise ObjectStoreError(
                "Unsupported permissions model for PAR!")

        request.name = str(_uuid.uuid4())

        if not is_bucket:
            request.object_name = _clean_key(key)

        request.time_expires = expires_datetime

        client = bucket["client"]

        try:
            response = client.create_preauthenticated_request(
                client.get_namespace().data,
                bucket["bucket_name"],
                request)
        except Exception as e:
            # couldn't create the preauthenticated request
            from Acquire.ObjectStore import ObjectStoreError
            raise ObjectStoreError(
                "Unable to create the PAR '%s': %s" %
                (str(request), str(e))) from e

        if response.status != 200:
            from Acquire.ObjectStore import ObjectStoreError
            raise ObjectStoreError(
                "Unable to create the PAR '%s': Status %s, Error %s" %
                (str(request), response.status, str(response.data)))

        oci_par = response.data

        if oci_par is None:
            from Acquire.ObjectStore import ObjectStoreError
            raise ObjectStoreError(
                "Unable to create the preauthenticated request!")

        created_datetime = oci_par.time_created.replace(
            tzinfo=_datetime.timezone.utc)

        expires_datetime = oci_par.time_expires.replace(
            tzinfo=_datetime.timezone.utc)

        # the URI returned by OCI does not include the server. We need
        # to get the server based on the region of this bucket
        url = _get_object_url_for_region(bucket["region"],
                                         oci_par.access_uri)

        # get the checksum for this URL - used to validate the close
        # request
        from Acquire.Client import PAR as _PAR
        from Acquire.ObjectStore import PARRegistry as _PARRegistry
        url_checksum = _PAR.checksum(url)

        driver_details = {"driver": "oci",
                          "bucket": bucket["bucket_name"],
                          "created_datetime": created_datetime,
                          "par_id": oci_par.id,
                          "par_name": oci_par.name}

        par = _PAR(url=url, encrypt_key=encrypt_key,
                   key=oci_par.object_name,
                   expires_datetime=expires_datetime,
                   is_readable=readable,
                   is_writeable=writeable,
                   driver_details=driver_details)

        _PARRegistry.register(par=par, url_checksum=url_checksum,
                              details_function=_get_driver_details_from_par,
                              cleanup_function=cleanup_function)

        return par

    @staticmethod
    def close_par(par=None, par_uid=None, url_checksum=None):
        """Close the passed PAR, which provides access to data in the
           passed bucket

           Args:
                par (PAR, default=None): PAR to close bucket
                par_uid (str, default=None): UID for PAR (used to look
                the PAR up in the PARRegistry if 'par' is None)
                url_checksum (str, default=None): Checksum to pass to
                PARRegistry
           Returns:
                None
           Raises:
                TypeError: If the PAR is not a PAR object
                ValueError: If the PAR was not created by this driver
                ObjectStoreError: If the PAR could not be deleted
        """
        from Acquire.ObjectStore import PARRegistry as _PARRegistry

        if par is None:
            par = _PARRegistry.get(
                par_uid=par_uid,
                details_function=_get_driver_details_from_data,
                url_checksum=url_checksum)

        from Acquire.Client import PAR as _PAR
        if not isinstance(par, _PAR):
            raise TypeError("The PAR must be of type PAR")

        if par.driver() != "oci":
            raise ValueError("Cannot delete a PAR that was not created "
                             "by the OCI object store")

        # delete the PAR
        from Acquire.Service import get_service_account_bucket \
            as _get_service_account_bucket

        driver_details = par.driver_details()
        par_bucket = driver_details["bucket"]
        par_id = driver_details["par_id"]

        bucket = _get_service_account_bucket()

        # now get the bucket accessed by the PAR...
        bucket = OCI_ObjectStore.get_bucket(bucket=bucket,
                                            bucket_name=par_bucket)

        client = bucket["client"]

        try:
            response = client.delete_preauthenticated_request(
                client.get_namespace().data,
                bucket["bucket_name"],
                par_id)
        except Exception as e:
            from Acquire.ObjectStore import ObjectStoreError
            raise ObjectStoreError(
                "Unable to delete a PAR '%s' : Error %s" %
                (par_id, str(e))) from e

        if response.status not in [200, 204]:
            from Acquire.ObjectStore import ObjectStoreError
            raise ObjectStoreError(
                "Unable to delete a PAR '%s' : Status %s, Error %s" %
                (par_id, response.status, str(response.data)))

        # close the PAR - this will trigger any close_function(s)
        _PARRegistry.close(par=par)

    @staticmethod
    def get_object(bucket, key):
        """Return the binary data contained in the key 'key' in the
           passed bucket

           If there is no object at 'key' then the data is read from
           numbered chunks stored at 'key/1', 'key/2', ... in order,
           stopping at the first chunk that cannot be fetched.

           Args:
                bucket (dict): Bucket containing data
                key (str): Key for data in bucket
           Returns:
                bytes: Binary data (None if the object yielded no data)
           Raises:
                ObjectStoreError: If there is no data at 'key' or 'key/1'
        """
        key = _clean_key(key)

        client = bucket["client"]
        namespace = bucket["namespace"]
        bucket_name = bucket["bucket_name"]

        def _stream(response):
            # iterate over the raw (undecoded) body in 1 MB pieces
            return response.data.raw.stream(1024 * 1024,
                                            decode_content=False)

        try:
            response = client.get_object(namespace, bucket_name, key)
            is_chunked = False
        except Exception as e:
            try:
                response = client.get_object(namespace, bucket_name,
                                             "%s/1" % key)
                is_chunked = True
            except Exception:
                # neither a plain object nor a first chunk exists -
                # report the original error as the cause
                from Acquire.ObjectStore import ObjectStoreError
                raise ObjectStoreError("No data at key '%s'" % key) from e

        # collect the pieces and join once at the end - repeated
        # 'bytes +=' copies all accumulated data for every piece
        parts = list(_stream(response))

        if is_chunked:
            # keep going through to find more chunks
            next_chunk = 2
            while True:
                try:
                    response = client.get_object(
                        namespace, bucket_name, "%s/%d" % (key, next_chunk))
                except Exception:
                    break

                parts.extend(_stream(response))
                next_chunk += 1

        if not parts:
            return None

        return b"".join(parts)

    @staticmethod
    def take_object(bucket, key):
        """Take (delete) the object from the object store, returning
           the object

           Args:
                bucket (dict): Bucket containing data
                key (str): Key for data
           Returns:
                bytes: Binary data
        """
        # ideally the get and delete should be atomic... would like this API
        data = OCI_ObjectStore.get_object(bucket, key)

        # deletion is best-effort: delete_object ignores any failure
        OCI_ObjectStore.delete_object(bucket, key)

        return data

    @staticmethod
    def get_all_object_names(bucket, prefix=None):
        """Returns the names of all objects in the passed bucket,
           optionally restricted to those whose names start with
           'prefix'. Leading and trailing slashes are stripped from
           the returned names, and empty names are skipped.

           All pages of the listing are read, so buckets holding more
           objects than a single list request returns are fully listed.

           Args:
                bucket (dict): Bucket containing data
                prefix (str, default=None): Prefix for data
           Returns:
                list: List of all objects in bucket
        """
        if prefix is not None:
            prefix = _clean_key(prefix)

        client = bucket["client"]
        names = []
        start = None

        while True:
            kwargs = {"prefix": prefix}
            if start is not None:
                kwargs["start"] = start

            listing = client.list_objects(bucket["namespace"],
                                          bucket["bucket_name"],
                                          **kwargs).data

            for obj in listing.objects:
                # skip (rather than reuse a stale/unbound name for)
                # anything that doesn't match the prefix
                if prefix and not obj.name.startswith(prefix):
                    continue

                name = obj.name.strip("/")

                if name:
                    names.append(name)

            # the listing is paginated - continue from the next name
            start = listing.next_start_with

            if not start:
                break

        return names

    @staticmethod
    def set_object(bucket, key, data):
        """Set the value of 'key' in 'bucket' to binary 'data'

           Args:
                bucket (dict): Bucket containing data
                key (str): Key for data in bucket
                data (bytes): Binary data to store in bucket
           Returns:
                None
        """
        if data is None:
            # NOTE(review): None is stored as the single byte b'0', not
            # as an empty object - confirm this is intended
            data = b'0'

        f = _io.BytesIO(data)
        key = _clean_key(key)

        bucket["client"].put_object(bucket["namespace"],
                                    bucket["bucket_name"],
                                    key, f)

    @staticmethod
    def delete_all_objects(bucket, prefix=None):
        """Deletes all objects in the bucket, or (if 'prefix' is given)
           only those whose names start with 'prefix'

           Args:
                bucket (dict): Bucket containing data
                prefix (str, default=None): Only delete objects whose
                names start with this prefix
           Returns:
                None
        """
        client = bucket["client"]

        for name in OCI_ObjectStore.get_all_object_names(bucket,
                                                         prefix=prefix):
            client.delete_object(bucket["namespace"],
                                 bucket["bucket_name"], name)

    @staticmethod
    def delete_object(bucket, key):
        """Removes the object at 'key'. This is best-effort: any error
           (including there being no object at 'key') is ignored

           Args:
                bucket (dict): Bucket containing data
                key (str): Key for data
           Returns:
                None
        """
        try:
            key = _clean_key(key)
            bucket["client"].delete_object(bucket["namespace"],
                                           bucket["bucket_name"], key)
        except Exception:
            pass

    @staticmethod
    def get_size_and_checksum(bucket, key):
        """Return the object size (in bytes) and MD5 checksum of the
           object in the passed bucket at the specified key

           Only the object's metadata is requested (HEAD), so its data
           is not downloaded.

           Args:
                bucket (dict): Bucket containing data
                key (str): Key for object
           Returns:
                tuple (int, str): Size and MD5 checksum of object
           Raises:
                ObjectStoreError: If there is no object at 'key'
        """
        key = _clean_key(key)

        try:
            response = bucket["client"].head_object(bucket["namespace"],
                                                    bucket["bucket_name"],
                                                    key)
        except Exception as e:
            from Acquire.ObjectStore import ObjectStoreError
            raise ObjectStoreError("No data at key '%s'" % key) from e

        content_length = response.headers["Content-Length"]
        checksum = response.headers["Content-MD5"]

        # the checksum is a base64 encoded Content-MD5 header
        # described as standard part of HTTP RFC 2616. Need to
        # convert this back to a hexdigest
        import binascii as _binascii
        import base64 as _base64
        md5sum = _binascii.hexlify(
            _base64.b64decode(checksum)).decode("utf-8")

        return (int(content_length), md5sum)