Source code for Acquire.ObjectStore._objstore


import io as _io
import datetime as _datetime
import uuid as _uuid
import json as _json
import os as _os

__all__ = ["ObjectStore", "set_object_store_backend",
           "use_testing_object_store_backend",
           "use_oci_object_store_backend"]

_objstore_backend = None


[docs]def use_testing_object_store_backend(backend): from ._testing_objstore import Testing_ObjectStore as _Testing_ObjectStore set_object_store_backend(_Testing_ObjectStore) bucket = "%s/testing_objstore" % backend try: # make sure that this directory exists if it doesn't already _os.mkdir(bucket) except: pass return bucket
[docs]def use_oci_object_store_backend(): from ._oci_objstore import OCI_ObjectStore as _OCI_ObjectStore set_object_store_backend(_OCI_ObjectStore)
[docs]class ObjectStore:
[docs] @staticmethod def create_bucket(bucket, bucket_name, compartment=None): """Create and return a new bucket in the object store called 'bucket_name', optionally placing it into the compartment identified by 'compartment'. This will raise an ObjectStoreError if this bucket already exists """ return _objstore_backend.create_bucket(bucket, bucket_name, compartment)
[docs] @staticmethod def get_bucket(bucket, bucket_name, compartment=None, create_if_needed=True): """Find and return a new bucket in the object store called 'bucket_name', optionally placing it into the compartment identified by 'compartment'. If 'create_if_needed' is True then the bucket will be created if it doesn't exist. Otherwise, if the bucket does not exist then an exception will be raised. """ return _objstore_backend.get_bucket(bucket, bucket_name, compartment, create_if_needed)
[docs] @staticmethod def create_par(bucket, encrypt_key, key=None, readable=True, writeable=False, duration=3600): """Create a pre-authenticated request for the passed bucket and key (if key is None then the request is for the entire bucket). This will return a PAR object that will contain a URL that can be used to access the object/bucket. If writeable is true, then the URL will also allow the object/bucket to be written to. PARs are time-limited. Set the lifetime in seconds by passing in 'duration' (by default this is one hour). Note that you must pass in a public key that will be used to encrypt this PAR. This is necessary as the PAR grants access to anyone who can decrypt the URL """ from Acquire.Client import PAR as _PAR par = _objstore_backend.create_par(bucket, encrypt_key, key, readable, writeable, duration) if not isinstance(par, _PAR): raise TypeError("A create_par request should always return an " "value of type PAR: %s is not correct!" % par) return par
[docs] @staticmethod def delete_par(bucket, par): """Delete the passed PAR, which provides access to data in the passed bucket """ _objstore_backend.delete_par(bucket=bucket, par=par)
[docs] @staticmethod def get_object_as_file(bucket, key, filename): """Get the object contained in the key 'key' in the passed 'bucket' and writing this to the file called 'filename'""" return _objstore_backend.get_object_as_file(bucket, key, filename)
[docs] @staticmethod def get_object(bucket, key): """Return the binary data contained in the key 'key' in the passed bucket""" return _objstore_backend.get_object(bucket, key)
[docs] @staticmethod def get_string_object(bucket, key): """Return the string in 'bucket' associated with 'key'""" return _objstore_backend.get_string_object(bucket, key)
[docs] @staticmethod def get_object_from_json(bucket, key): """Return an object constructed from json stored at 'key' in the passed bucket. This returns None if there is no data at this key """ return _objstore_backend.get_object_from_json(bucket, key)
[docs] @staticmethod def get_all_object_names(bucket, prefix=None): """Returns the names of all objects in the passed bucket""" return _objstore_backend.get_all_object_names(bucket, prefix)
[docs] @staticmethod def get_all_objects(bucket, prefix=None): """Return all of the objects in the passed bucket""" return _objstore_backend.get_all_objects(bucket, prefix)
[docs] @staticmethod def get_all_objects_from_json(bucket, prefix=None): """Return all of the objects in the passed bucket as json-deserialised objects """ return _objstore_backend.get_all_objects_from_json(bucket, prefix)
[docs] @staticmethod def get_all_strings(bucket, prefix=None): """Return all of the strings in the passed bucket""" return _objstore_backend.get_all_strings(bucket, prefix)
[docs] @staticmethod def set_object(bucket, key, data): """Set the value of 'key' in 'bucket' to binary 'data'""" _objstore_backend.set_object(bucket, key, data)
[docs] @staticmethod def set_object_from_file(bucket, key, filename): """Set the value of 'key' in 'bucket' to equal the contents of the file located by 'filename'""" _objstore_backend.set_object_from_file(bucket, key, filename)
[docs] @staticmethod def set_ins_object_from_json(bucket, key, data): """Set the value of 'key' in 'bucket' to equal to contents of 'data', which has been encoded to json, if (and only if) this key has not already been set (ins = 'if not set'). This returns the object at this key after the operation (either the set object or the value that was previously set """ from Acquire.ObjectStore import Mutex as _Mutex m = _Mutex(bucket=bucket, key=key) try: old_data = ObjectStore.get_object_from_json(bucket, key) except: old_data = None if old_data is not None: m.unlock() return old_data else: try: ObjectStore.set_object_from_json(bucket, key, data) except: m.unlock() raise return data
[docs] @staticmethod def set_ins_string_object(bucket, key, string_data): """Set the value of 'key' in 'bucket' to the string 'string_data', if (and only if) this key has not already been set (ins = 'if not set'). This returns the object at this key after the operation (either the set string, or the value that was previously set) """ from Acquire.ObjectStore import Mutex as _Mutex m = _Mutex(bucket=bucket, key=key) try: val = ObjectStore.get_string_object(bucket, key) except: val = None if val is not None: m.unlock() return val else: try: ObjectStore.set_string_object(bucket, key, string_data) except: m.unlock() raise return string_data
[docs] @staticmethod def set_string_object(bucket, key, string_data): """Set the value of 'key' in 'bucket' to the string 'string_data'""" _objstore_backend.set_string_object(bucket, key, string_data)
[docs] @staticmethod def set_object_from_json(bucket, key, data): """Set the value of 'key' in 'bucket' to equal to contents of 'data', which has been encoded to json""" _objstore_backend.set_object_from_json(bucket, key, data)
[docs] @staticmethod def delete_all_objects(bucket, prefix=None): """Deletes all objects...""" _objstore_backend.delete_all_objects(bucket, prefix)
[docs] @staticmethod def delete_object(bucket, key): """Removes the object at 'key'""" _objstore_backend.delete_object(bucket, key)
[docs] @staticmethod def clear_all_except(bucket, keys): """Removes all objects from the passed 'bucket' except those whose keys are or start with any key in 'keys'""" _objstore_backend.clear_all_except(bucket, keys)
[docs] @staticmethod def get_size_and_checksum(bucket, key): """Return the object size (in bytes) and checksum of the object in the passed bucket at the specified key """ return _objstore_backend.get_size_and_checksum(bucket, key)
[docs]def set_object_store_backend(backend): """Set the backend that is used to actually connect to the object store. This can only be set once in the program! """ global _objstore_backend if backend == _objstore_backend: return if _objstore_backend is not None: from Acquire.ObjectStore import ObjectStoreError raise ObjectStoreError("You cannot change the object store " "backend once it has been already set!") _objstore_backend = backend