Source code for Acquire.Service._service_account


import os as _os
import json as _json

from cachetools import cached as _cached
from cachetools import LRUCache as _LRUCache

# The cache can hold a maximum of 5 objects, and will replace the least
# recently used items first
_cache_serviceinfo_data = _LRUCache(maxsize=5)
_cache_service_info = _LRUCache(maxsize=5)
_cache_adminusers = _LRUCache(maxsize=5)
_cache_serviceuser = _LRUCache(maxsize=5)
_cache_service_account_uid = _LRUCache(maxsize=5)


__all__ = ["push_is_running_service", "pop_is_running_service",
           "is_running_service", "assert_running_service",
           "setup_this_service", "add_admin_user",
           "get_this_service", "get_admin_users",
           "get_service_private_key", "save_service_keys_to_objstore",
           "load_service_key_from_objstore",
           "get_service_private_certificate", "get_service_public_key",
           "get_service_public_certificate",
           "clear_serviceinfo_cache",
           "get_service_user_account_uid", "create_service_user_account"]


# The key in the object store for the service object
_service_key = "_service_key"

_is_running_service = 0


[docs]def push_is_running_service(): """Internal function used to push into the 'running_service' state. While we are in this state then we view that all code is running as part of a running service """ global _is_running_service _is_running_service += 1
[docs]def pop_is_running_service(): """Internal function used to pop out from the 'running_service' state. While we are in this state then we view that all code is running as part of a running service """ global _is_running_service _is_running_service -= 1 if _is_running_service < 0: _is_running_service = 0
[docs]def is_running_service(): """Return whether or not this code is running as part of a service""" global _is_running_service return _is_running_service > 0
[docs]def assert_running_service(): """Assert that this code is running as part of a valid service""" if not is_running_service(): from ._errors import ServiceError raise ServiceError( "You can only call this function from within a valid " "running service. A client cannot call this function.")
[docs]def clear_serviceinfo_cache(): """Clear the caches used to accelerate loading the service info and admin user objects """ _cache_adminusers.clear() _cache_service_info.clear() _cache_serviceinfo_data.clear() _cache_serviceuser.clear() _cache_service_account_uid.clear()
# Cache this function as the data will rarely change, and this # will prevent too many runs to the ObjectStore @_cached(_cache_serviceinfo_data) def _get_this_service_data(): """Internal function that loads up the service info data from the object store. """ assert_running_service() from Acquire.Service import ServiceAccountError # get the bucket again - can't pass as an argument as this is a cached # function - luckily _get_service_account_bucket is also a cached function try: from Acquire.Service import get_service_account_bucket as \ _get_service_account_bucket bucket = _get_service_account_bucket() except ServiceAccountError as e: raise e except Exception as e: raise ServiceAccountError( "Cannot log into the service account: %s" % str(e)) # find the service info from the object store try: from Acquire.ObjectStore import ObjectStore as _ObjectStore service = _ObjectStore.get_object_from_json(bucket, _service_key) except Exception as e: from Acquire.Service import MissingServiceAccountError raise MissingServiceAccountError( "Unable to load the service account for this service. An " "error occured while loading the data from the object " "store: %s" % str(e)) if not service: from Acquire.Service import MissingServiceAccountError raise MissingServiceAccountError( "You haven't yet created the service account " "for this service. Please create an account first.") return service def _get_service_password(): """Function used to get the primary password that locks the skeleton key that secures the entire tree of trust from the Service object """ service_password = _os.getenv("SERVICE_PASSWORD") if service_password is None: from Acquire.Service import ServiceAccountError raise ServiceAccountError( "You must supply a $SERVICE_PASSWORD") return service_password
[docs]def setup_this_service(service_type, canonical_url, registry_uid, username, password): """Call this function to setup a new service that will serve at 'canonical_url', will be of the specified service_type. This will be registered at the registry at UID registry_uid (1) Delete the object store value "_service" if you want to reset the actual Service. This will assign a new UID for the service which would reset the certificates and keys. This new service will need to be re-introduced to other services that need to trust it """ assert_running_service() from Acquire.Service import get_service_account_bucket as \ _get_service_account_bucket from Acquire.ObjectStore import Mutex as _Mutex from Acquire.ObjectStore import ObjectStore as _ObjectStore from Acquire.Service import Service as _Service bucket = _get_service_account_bucket() # ensure that this is the only time the service is set up mutex = _Mutex(key=_service_key, bucket=bucket, lease_time=120) try: service_info = _ObjectStore.get_object_from_json(bucket, _service_key) except: service_info = None service = None service_password = _get_service_password() user_uid = None otp = None if service_info: try: service = _Service.from_data(service_info, service_password) except Exception as e: from Acquire.Service import ServiceAccountError raise ServiceAccountError( "Something went wrong reading the Service data. You should " "either debug the error or delete the data at key '%s' " "to allow the service to be reset and constructed again. " "The error was %s" % (_service_key, str(e))) if service.uid().startswith("STAGE1"): from Acquire.Service import ServiceAccountError raise ServiceAccountError( "The service is currently under construction. Please " "try again later...") if service is None: # we need to create the new service if (service_type is None) or (canonical_url is None): from Acquire.Service import ServiceAccountError raise ServiceAccountError( "You need to supply both the service_type and canonical_url " "in order to initialise a new Service") # we need to build the service account - first stage 1 service = _Service.create(service_url=canonical_url, service_type=service_type) # write the stage1 service data, encrypted using the service password. # This will be needed to answer the challenge from the registry service_data = service.to_data(service_password) _ObjectStore.set_object_from_json(bucket, _service_key, service_data) # now we can register the service with a registry - this # will return the stage2-constructed service from Acquire.Registry import register_service as _register_service service = _register_service(service=service, registry_uid=registry_uid) canonical_url = _Service.get_canonical_url(canonical_url) if service.service_type() != service_type or \ service.canonical_url() != canonical_url: from Acquire.Service import ServiceAccountError raise ServiceAccountError( "The existing service has a different type or URL to that " "requested at setup. The request type and URL are %s and %s, " "while the actual service type and URL are %s and %s." % (service_type, canonical_url, service.service_type(), service.canonical_url())) # we can add the first admin user service_uid = service.uid() skelkey = service.skeleton_key().public_key() # now register the new admin user account - remembering to # encode the password from Acquire.Client import Credentials as _Credentials password = _Credentials.encode_password(password=password, identity_uid=service_uid) from Acquire.Identity import UserAccount as _UserAccount (user_uid, otp) = _UserAccount.create(username=username, password=password, _service_uid=service_uid, _service_public_key=skelkey) add_admin_user(service, user_uid) # write the service data, encrypted using the service password service_data = service.to_data(service_password) # reload the data to check it is ok, and also to set the right class service = _Service.from_data(service_data, service_password) # now it is ok, save this data to the object store _ObjectStore.set_object_from_json(bucket, _service_key, service_data) mutex.unlock() from Acquire.Service import clear_service_cache as _clear_service_cache _clear_service_cache() return (service, user_uid, otp)
[docs]def add_admin_user(service, account_uid, authorisation=None): """Function that is called to add a new user account as a service administrator. If this is the first account then authorisation is not needed. If this is the second or subsequent admin account, then you need to provide an authorisation signed by one of the existing admin users. If you need to reset the admin users then delete the user accounts from the service. """ assert_running_service() from Acquire.Service import get_service_account_bucket as \ _get_service_account_bucket from Acquire.ObjectStore import Mutex as _Mutex from Acquire.ObjectStore import ObjectStore as _ObjectStore from Acquire.ObjectStore import get_datetime_now_to_string as \ _get_datetime_now_to_string bucket = _get_service_account_bucket() # see if the admin account details exists... admin_key = "%s/admin_users" % _service_key # ensure that we have exclusive access to this service mutex = _Mutex(key=admin_key, bucket=bucket) try: admin_users = _ObjectStore.get_object_from_json(bucket, admin_key) except: admin_users = None if admin_users is None: # this is the first admin user - automatically accept admin_users = {} authorised_by = "first admin" else: # validate that the new user has been authorised by an existing # admin... if authorisation is None: from Acquire.Service import ServiceAccountError raise ServiceAccountError( "You must supply a valid authorisation from an existing admin " "user if you want to add a new admin user.") if authorisation.user_uid() not in admin_users: from Acquire.Service import ServiceAccountError raise ServiceAccountError( "The authorisation for the new admin account is not valid " "because the user who signed it is not a valid admin on " "this service.") authorisation.verify(account_uid) authorised_by = authorisation.user_uid() # everything is ok - add this admin user to the admin_users # dictionary - save the date and time they became an admin, # and how they achieved this status (first admin, or whoever # authorised them) admin_users[account_uid] = {"datetime": _get_datetime_now_to_string(), "authorised_by": authorised_by} # everything is done, so now write this data to the object store _ObjectStore.set_object_from_json(bucket, admin_key, _json.dumps(admin_users)) # we can (finally!) release the mutex, as everyone else should now # be able to see the account mutex.unlock() _cache_adminusers.clear()
[docs]@_cached(_cache_adminusers) def get_admin_users(): """This function returns all of the admin_users data. This is a dictionary of the UIDs of all of the admin users """ assert_running_service() from Acquire.Service import ServiceAccountError try: from Acquire.Service import get_service_account_bucket as \ _get_service_account_bucket bucket = _get_service_account_bucket() except ServiceAccountError as e: raise e except Exception as e: raise ServiceAccountError( "Cannot log into the service account: %s" % str(e)) # find the admin accounts info from the object store try: key = "%s/admin_users" % _service_key from Acquire.ObjectStore import ObjectStore as _ObjectStore admin_users = _ObjectStore.get_object_from_json(bucket, key) except Exception as e: from Acquire.Service import MissingServiceAccountError raise MissingServiceAccountError( "Unable to load the Admin User data for this service. An " "error occured while loading the data from the object " "store: %s" % str(e)) if not admin_users: from Acquire.Service import MissingServiceAccountError raise MissingServiceAccountError( "You haven't yet created any Admin Users for the service account " "for this service. Please create an Admin User first.") return admin_users
[docs]@_cached(_cache_service_info) def get_this_service(need_private_access=False): """Return the service info object for this service. If private access is needed then this will decrypt and access the private keys and signing certificates, which is slow if you just need the public certificates. """ assert_running_service() from Acquire.Service import MissingServiceAccountError try: service_info = _get_this_service_data() except MissingServiceAccountError: raise except Exception as e: raise MissingServiceAccountError( "Unable to read the service info from the object store! : %s" % str(e)) service_password = None if need_private_access: service_password = _get_service_password() try: from Acquire.Service import Service as _Service if service_password: service = _Service.from_data(service_info, service_password) else: service = _Service.from_data(service_info) except Exception as e: raise MissingServiceAccountError( "Unable to create the ServiceAccount object: %s %s" % (e.__class__, str(e))) return service
[docs]@_cached(_cache_service_account_uid) def get_service_user_account_uid(accounting_service_uid): """Return the UID of the financial Acquire.Accounting.Account that is held on the accounting service with UID 'accounting_service_uid' for the service user on this service. This is the account to which payment for this service should be sent """ assert_running_service() from Acquire.Service import get_service_account_bucket as \ _get_service_account_bucket from Acquire.ObjectStore import ObjectStore as _ObjectStore bucket = _get_service_account_bucket() key = "%s/account/%s" % (_service_key, accounting_service_uid) try: account_uid = _ObjectStore.get_string_object(bucket, key) except: account_uid = None if account_uid is None: from Acquire.Service import ServiceAccountError raise ServiceAccountError( "This service does not have a valid financial account on " "the accounting service at '%s'" % accounting_service_uid) return account_uid
[docs]def create_service_user_account(service, accounting_service_url): """Call this function to create the financial service account for this service on the accounting service at 'accounting_service_url' This does nothing if the account already exists """ assert_running_service() accounting_service = service.get_trusted_service( service_url=accounting_service_url) accounting_service_uid = accounting_service.uid() key = "%s/account/%s" % (_service_key, accounting_service_uid) bucket = service.bucket() from Acquire.ObjectStore import ObjectStore as _ObjectStore try: account_uid = _ObjectStore.get_string_object(bucket, key) except: account_uid = None if account_uid: # we already have an account... return service_user = service.login_service_user() try: from Acquire.Client import create_account as _create_account account = _create_account( service_user, "main", "Main account to receive payment for all use on service " "%s (%s)" % (service.canonical_url(), service.uid()), accounting_service=accounting_service) account_uid = account.uid() _ObjectStore.set_string_object(bucket, key, account_uid) except Exception as e: from Acquire.Service import exception_to_string from Acquire.Service import ServiceAccountError raise ServiceAccountError( "Unable to create a financial account for the service " "principal for '%s' on accounting service '%s'\n\nERROR\n%s" % (str(service), str(accounting_service), exception_to_string(e)))
[docs]def load_service_key_from_objstore(fingerprint): """This function will see if we have an old key with the requested fingerprint, and if so, we will try to load and return that key from the object store """ from Acquire.ObjectStore import ObjectStore as _ObjectStore from Acquire.Service import get_service_account_bucket \ as _get_service_account_bucket from Acquire.Crypto import KeyManipulationError bucket = _get_service_account_bucket() try: key = "%s/oldkeys/fingerprints/%s" % (_service_key, fingerprint) keyfile = _ObjectStore.get_string_object(bucket, key) except: keyfile = None if keyfile is None: raise KeyManipulationError( "Cannot find a key or certificate with fingerprint '%s' : %s" % (fingerprint, key)) try: keydata = _ObjectStore.get_object_from_json(bucket, keyfile) except Exception as e: keydata = None error = str(e) if keydata is None: raise KeyManipulationError( "Unable to load the key or certificate with fingerprint '%s': %s" % (fingerprint, error)) service = get_this_service(need_private_access=True) return service.load_keys(keydata)[fingerprint]
[docs]def save_service_keys_to_objstore(include_old_keys=False): """Call this function to ensure that the current set of keys used for this service are saved to object store """ service = get_this_service(need_private_access=True) oldkeys = service.dump_keys(include_old_keys=include_old_keys) # now write the old keys to storage from Acquire.ObjectStore import ObjectStore as _ObjectStore from Acquire.Service import get_service_account_bucket \ as _get_service_account_bucket bucket = _get_service_account_bucket() key = "%s/oldkeys/%s" % (_service_key, oldkeys["datetime"]) _ObjectStore.set_object_from_json(bucket, key, oldkeys) # now write the pointers from fingerprint to file... for fingerprint in oldkeys.keys(): if fingerprint not in ["datetime", "encrypted_passphrase"]: _ObjectStore.set_string_object( bucket, "%s/oldkeys/fingerprints/%s" % (_service_key, fingerprint), key)
def _refresh_service_keys_and_certs(service): """This function will check if any key rotation is needed, and if so, it will automatically refresh the keys and certificates. The old keys and certificates will be stored in a database of old keys and certificates """ assert_running_service() if not service.should_refresh_keys(): return service # ensure that the current keys are saved to the object store save_service_keys_to_objstore() # generate new keys last_update = service.last_key_update() service.refresh_keys() # now lock the object store so that we are the only function # that can write the new keys to global state from Acquire.Service import get_service_account_bucket as \ _get_service_account_bucket from Acquire.Service import Service as _Service from Acquire.ObjectStore import Mutex as _Mutex from Acquire.ObjectStore import ObjectStore as _ObjectStore bucket = _get_service_account_bucket() m = _Mutex(key=service.uid(), bucket=bucket) service_data = _ObjectStore.get_object_from_json(bucket, _service_key) service_info = _Service.from_data(service_data) if service_info.last_key_update() == last_update: # no-one else has beaten us - write the updated keys to global state _ObjectStore.set_object_from_json(bucket, _service_key, service.to_data( _get_service_password())) m.unlock() # clear the cache as we will need to load a new object clear_serviceinfo_cache() return get_this_service(need_private_access=True)
[docs]def get_service_private_key(fingerprint=None): """This function returns the private key for this service""" s = get_this_service(need_private_access=True) s = _refresh_service_keys_and_certs(s) key = s.private_key() from Acquire.Service import get_service_account_bucket if fingerprint: if key.fingerprint() != fingerprint: key = s.last_key() if key.fingerprint() != fingerprint: try: return load_service_key_from_objstore(fingerprint) except Exception as e: from Acquire.Service import ServiceAccountError raise ServiceAccountError( "Cannot find a private key for '%s' that matches " "the fingerprint %s. This is either because you are " "using a key that is too old or " "you are requesting a wrong key. Please call " "refresh_keys on your Service object and try again: %s" % (str(s), fingerprint, str(e))) return key
[docs]def get_service_private_certificate(fingerprint=None): """This function returns the private signing certificate for this service """ s = get_this_service(need_private_access=True) s = _refresh_service_keys_and_certs(s) cert = s.private_certificate() if fingerprint: if cert.fingerprint() != fingerprint: cert = s.last_certificate() if cert.fingerprint() != fingerprint: try: return load_service_key_from_objstore(fingerprint) except Exception as e: from Acquire.Service import ServiceAccountError raise ServiceAccountError( "Cannot find a private certificate for '%s' that matches " "the fingerprint %s. This is either because you are " "using a certificate that is too old or " "you are requesting a wrong certificate. Please call " "refresh_keys on your Service object and try again: %s" % (str(s), fingerprint, str(e))) return cert
[docs]def get_service_public_key(fingerprint=None): """This function returns the public key for this service""" s = get_this_service(need_private_access=False) key = s.public_key() if fingerprint: if key.fingerprint() != fingerprint: from Acquire.Service import ServiceAccountError raise ServiceAccountError( "Cannot find a public key for '%s' that matches " "the fingerprint %s" % (str(s), fingerprint)) return key
[docs]def get_service_public_certificate(fingerprint=None): """This function returns the public certificate for this service""" s = get_this_service(need_private_access=False) cert = s.public_certificate() if fingerprint: if cert.fingerprint() != fingerprint: cert = s.last_certificate() if cert.fingerprint() != fingerprint: from Acquire.Service import ServiceAccountError raise ServiceAccountError( "Cannot find a public certificate for '%s' that matches " "the fingerprint %s" % (str(s), fingerprint)) return cert