Source code for Acquire.Identity._authorisation


__all__ = ["Authorisation"]


[docs]class Authorisation: """This class holds the information needed to show that a user has authorised an action. This contains a signed token that records the time that the authorisation that was signed, together with an extra key (or secret) that can be used by the user and provider to verify that the authorisation is for the correct resource """ def __init__(self, resource=None, user=None, testing_key=None): """Create an authorisation for the passed resource that is authorised by the passed user (who must be authenticated) If testing_key is passed, then this authorisation is being tested as part of the unit tests """ if resource is not None: resource = str(resource) self._signature = None self._last_validated_datetime = None if resource is not None: if user is None and testing_key is None: raise ValueError( "You must pass in an authenticated user who will " "provide authorisation for resource '%s'" % resource) from Acquire.ObjectStore import get_datetime_now \ as _get_datetime_now if user is not None: from Acquire.Client import User as _User if not isinstance(user, _User): raise TypeError("The passed user must be of type User") elif not user.is_logged_in(): raise PermissionError( "The passed user '%s' must be authenticated to enable " "you to generate an authorisation for the account") self._user_uid = user.uid() self._session_uid = user.session_uid() self._identity_url = user.identity_service().canonical_url() self._identity_uid = user.identity_service_uid() self._auth_datetime = _get_datetime_now() message = self._get_message(resource) self._signature = user.signing_key().sign(message) self._last_validated_datetime = _get_datetime_now() self._last_verified_resource = resource self._last_verified_key = None if user.guid() != self.user_guid(): # interesting future case when we allow individual users # to be identified by multiple identity services... raise PermissionError( "We do not yet support a single user being identified " "by multiple identity services: %s versus %s" % (user.guid(), self.user_guid())) elif testing_key is not None: self._user_uid = "some user uid" self._session_uid = "some session uid" self._identity_url = "some identity_url" self._identity_uid = "some identity uid" self._auth_datetime = _get_datetime_now() self._is_testing = True message = self._get_message(resource) self._signature = testing_key.sign(message) self._last_validated_datetime = _get_datetime_now() self._last_verified_resource = resource self._last_verified_key = testing_key.public_key()
[docs] def is_null(self): """Return whether or not this authorisation is null""" return self._signature is None
def _get_message(self, resource=None): """Internal function that is used to generate the message for the resource that is signed. This message encodes information about the user and identity service that signed the message, as well as the resource. This helps prevent tamporing with the data in this authorisation """ from Acquire.ObjectStore import datetime_to_string \ as _datetime_to_string if resource is None: return "%s|%s|%s|%s" % ( self._user_uid, self._session_uid, self._identity_uid, _datetime_to_string(self._auth_datetime)) else: return "%s|%s|%s|%s|%s" % ( self._user_uid, self._session_uid, self._identity_uid, str(resource), _datetime_to_string(self._auth_datetime)) def __str__(self): try: return "Authorisation(signature=%s)" % self._signature except: return "Authorisation()" def __repr__(self): return self.__str__() def __eq__(self, other): if isinstance(other, self.__class__): return self._signature == other._signature else: return False def __ne__(self, other): return not self.__eq__(other) def _fix_integer(self, value, max_value): max_value = int(max_value) if value is None: return max_value else: try: value = int(value) except: return max_value if value <= 0 or value > max_value: return max_value else: return value
[docs] def from_user(self, user_uid, service_uid): """Return whether or not this authorisation comes from the user with passed user_uid registered on the passed service_uid """ return (user_uid == self._user_uid) and \ (service_uid == self._identity_uid)
[docs] def user_uid(self): """Return the UID of the user who created this authorisation""" if self.is_null(): return None else: return self._user_uid
[docs] def user_guid(self): """Return the global UID for this user""" return "%s@%s" % (self.identity_uid(), self.user_uid())
[docs] def session_uid(self): """Return the login session that authenticated the user""" if self.is_null(): return None else: return self._session_uid
[docs] def identity_url(self): """Return the URL of the identity service that authenticated the user """ if self.is_null(): return None else: return self._identity_url
[docs] def identity_uid(self): """Return the UID of the identity service that authenticated the user """ if self.is_null(): return None else: return self._identity_uid
[docs] def signature_time(self): """Return the time when the authentication was signed""" if self.is_null(): return None else: return self._auth_datetime
[docs] def last_verification_time(self): """Return the last time this authorisation was verified. Note that you should re-verify authorisations periodically, to ensure that they identity service is still happy that the login session was not suspicious """ if self.is_null(): return None else: return self._last_validated_datetime
[docs] def signature(self): """Return the actual signature""" if self.is_null(): return None else: return self._signature
[docs] def is_stale(self, stale_time=7200): """Return whether or not this authorisation is stale. 'stale_time' is the number of seconds after which the authorisation is considered stale (and thus no longer valid) """ stale_time = self._fix_integer(stale_time, 365*24*7200) from Acquire.ObjectStore import get_datetime_now \ as _get_datetime_now now = _get_datetime_now() return ((now - self._auth_datetime).seconds > stale_time)
[docs] def is_verified(self, resource=None, refresh_time=3600, stale_time=7200, testing_key=None): """Return whether or not this authorisation has been verified. Note that this will cache any verification for 'refresh_time' (in seconds) 'stale_time' gives the time (in seconds) beyond which the authorisation will be considered stale (and thus not valid). By default this is 7200 seconds (2 hours), meaning that the authorisation must be used within 2 hours to be valid. """ refresh_time = self._fix_integer(refresh_time, 24*3600) from Acquire.ObjectStore import get_datetime_now \ as _get_datetime_now now = _get_datetime_now() try: if self._last_verified_resource != resource: return False except: pass if self._last_validated_datetime is not None: if self._last_verified_key != testing_key: return False if (now - self._last_validated_datetime).seconds < refresh_time: # no need to re-validate return not self.is_stale(stale_time) return False
[docs] def verify(self, resource=None, refresh_time=3600, stale_time=7200, force=False, testing_key=None): """Verify that this is a valid authorisation provided by the user for the passed 'resource'. This will cache the verification for 'refresh_time' (in seconds), but re-verification can be forced if 'force' is True. 'stale_time' gives the time (in seconds) beyond which the authorisation will be considered stale (and thus not valid). By default this is 7200 seconds (2 hours), meaning that the authorisation must be used within 2 hours to be valid. If 'testing_key' is passed, then this object is being tested as part of the unit tests """ if self.is_null(): raise PermissionError("Cannot verify a null Authorisation") if self.is_stale(stale_time): raise PermissionError("Cannot verify a stale Authorisation") if not force: if self.is_verified(resource=resource, refresh_time=refresh_time, stale_time=stale_time, testing_key=testing_key): return if testing_key is not None: if not self._is_testing: raise PermissionError( "You cannot pass a test key to a non-testing " "Authorisation") message = self._get_message(resource) try: testing_key.verify(self._signature, message) except Exception as e: from Acquire.Service import exception_to_string raise PermissionError(exception_to_string(e)) from Acquire.ObjectStore import get_datetime_now \ as _get_datetime_now self._last_validated_datetime = _get_datetime_now() self._last_verified_resource = resource self._last_verified_key = testing_key return try: # we need to get the public signing key for this session from Acquire.Service import get_trusted_service \ as _get_trusted_service from Acquire.ObjectStore import get_datetime_now \ as _get_datetime_now try: identity_service = _get_trusted_service( self._identity_url) except: raise PermissionError( "Unable to verify the authorisation as we do not trust " "the identity service at %s" % self._identity_url) if not identity_service.can_identify_users(): raise PermissionError( "Cannot verify an Authorisation that does not use a valid " "identity service") if identity_service.uid() != self._identity_uid: raise PermissionError( "Cannot verify this Authorisation as the actual UID of " "the identity service at '%s' (%s) does not match " "the UID of the service that signed this authorisation " "(%s)" % (self._identity_url, identity_service.uid(), self._identity_uid)) response = identity_service.whois( user_uid=self._user_uid, session_uid=self._session_uid) try: logout_datetime = _string_to_datetime( response["logout_datetime"]) except: logout_datetime = None if logout_datetime: # the user has logged out from this session - ensure that # the authorisation was created before the user logged out if logout_datetime < self.signature_time(): raise PermissionError( "This authorisation was signed after the user logged " "out. This means that the authorisation is not valid. " "Please log in again and create a new authorisation.") message = self._get_message(resource) from Acquire.Crypto import PublicKey as _PublicKey pubcert = _PublicKey.from_data(response["public_cert"]) pubcert.verify(self._signature, message) self._last_validated_datetime = _get_datetime_now() self._last_verified_resource = resource self._last_verified_key = None except PermissionError: raise except Exception as e: if resource: raise PermissionError( "Cannot verify the authorisation for resource %s: %s" % (resource, str(e))) else: raise PermissionError( "Cannot verify the authorisation: %s" % (str(e))) except: if resource: raise PermissionError( "Cannot verify the authorisation for resource %s" % resource) else: raise PermissionError("Cannot verify the authorisation")
[docs] @staticmethod def from_data(data): """Return an authorisation created from the json-decoded dictionary""" auth = Authorisation() if (data and len(data) > 0): from Acquire.ObjectStore import string_to_datetime \ as _string_to_datetime from Acquire.ObjectStore import string_to_bytes \ as _string_to_bytes auth._user_uid = data["user_uid"] auth._session_uid = data["session_uid"] auth._identity_url = data["identity_url"] auth._identity_uid = data["identity_uid"] auth._auth_datetime = _string_to_datetime(data["auth_datetime"]) auth._signature = _string_to_bytes(data["signature"]) auth._last_validated_datetime = None if "is_testing" in data: auth._is_testing = data["is_testing"] return auth
[docs] def to_data(self): """Return this object serialised to a json-encoded dictionary""" data = {} if self.is_null(): return data from Acquire.ObjectStore import datetime_to_string \ as _datetime_to_string from Acquire.ObjectStore import bytes_to_string \ as _bytes_to_string data["user_uid"] = str(self._user_uid) data["session_uid"] = str(self._session_uid) data["identity_url"] = str(self._identity_url) data["identity_uid"] = str(self._identity_uid) data["auth_datetime"] = _datetime_to_string(self._auth_datetime) data["signature"] = _bytes_to_string(self._signature) try: data["is_testing"] = self._is_testing except: pass return data