Source code for Acquire.Identity._loginsession


__all__ = ["LoginSession"]

_sessions_key = "identity/sessions"


[docs]class LoginSession: """This class holds all details of a single login session""" def __init__(self, username=None, public_key=None, public_cert=None, ipaddr=None, hostname=None, login_message=None, scope=None, permissions=None): """Start a new login session for the user with specified username, passing in the additional data needed to request a login """ if public_key is not None: from Acquire.Crypto import PublicKey as _PublicKey if not isinstance(public_key, _PublicKey): raise TypeError("The public key must be of type PublicKey") if not isinstance(public_cert, _PublicKey): raise TypeError("The public certificate must be of " "type PublicKey") self._username = username self._pubkey = public_key self._pubcert = public_cert from Acquire.ObjectStore import get_datetime_now \ as _get_datetime_now from Acquire.ObjectStore import create_uuid as _create_uuid self._uid = _create_uuid() self._request_datetime = _get_datetime_now() self._status = None self._ipaddr = ipaddr self._hostname = hostname self._login_message = login_message self._scope = scope self._permissions = permissions # make sure this session is saved to the object store self._set_status("pending") else: self._uid = None def __str__(self): if self.is_null(): return "LoginSession::null" else: return "LoginSession(uid=%s, status=%s)" % \ (self.uid(), self.status()) def __eq__(self, other): if isinstance(other, self.__class__): return self._uid == other._uid else: return False def __ne__(self, other): return not self.__eq__(other)
[docs] def is_null(self): """Return whether or not this object is null""" return self._uid is None
[docs] def username(self): """Return the username for this login session""" try: return self._username except: return None
[docs] def encoded_username(self): """Return a safely-encoded version of the username. This is used to create safe keys in the object store """ if self.is_null(): raise PermissionError( "You cannot get the encoded username of a null LoginSession") from Acquire.Identity import _encode_username return _encode_username(self._username)
[docs] def public_key(self): """Return the public key""" status = self.status() if status != "approved": from Acquire.Identity import LoginSessionError raise LoginSessionError( "You cannot get a public key from " "a session that is not fully approved (status = %s)" % status) return self._pubkey
[docs] def public_certificate(self): """Return the public certificate""" status = self.status() if status != "approved": from Acquire.Identity import LoginSessionError raise LoginSessionError( "You cannot get a public certificate from " "a session that is not fully approved (status = %s)" % status) return self._pubcert
[docs] def request_source(self): """Return the IP address of the source of this request. This could be used to rate limit someone who is maliciously requesting logins... """ if self.is_null(): return None return self._ipaddr
[docs] def uid(self): """Return the UID of this request""" if self.is_null(): return None return self._uid
[docs] @staticmethod def to_short_uid(long_uid): """Return the short UID version of the passed long uid""" return long_uid[:8]
[docs] def short_uid(self): """Return a short UUID that will be used to provide a more human-readable session ID """ if self._uid: return LoginSession.to_short_uid(self._uid) else: return None
[docs] def login_url(self): """Return the login URL to login to this session. This is the URL of this identity service plus the short UID of the session """ from Acquire.Service import get_this_service as _get_this_service service = _get_this_service(need_private_access=False) service_uid = service.uid() short_uid = self.short_uid() short_uid = ".".join([short_uid[i:i+2] for i in range(0, len(short_uid), 2)]) # eventually allow the service provider to configure this url url = "https://login.acquire-aaai.com" return "%s?id=%s/%s" % (url, service_uid, short_uid)
[docs] def regenerate_uid(self): """Regenerate the UUID as there has been a clash""" if not self.is_null(): from Acquire.ObjectStore import create_uuid as _create_uuid self._uid = _create_uuid()
[docs] def creation_time(self): """Return the date and time when this was created""" if self.is_null(): return None return self._request_datetime
[docs] def login_time(self): """Return the date and time when the user logged in. This returns None if the user has not yet logged in """ try: return self._login_datetime except: return None
[docs] def logout_time(self): """Return the date and time when the user logged out. This returns None if the user has not yet logged out """ try: return self._logout_datetime except: return None
[docs] def seconds_since_creation(self): """Return the number of seconds since this request was created """ if self.is_null(): return None from Acquire.ObjectStore import get_datetime_now \ as _get_datetime_now delta = _get_datetime_now() - self._request_datetime return delta.total_seconds()
[docs] def status(self): """Return the status of this login session""" if self.is_null(): return "null" else: return self._status
[docs] @staticmethod def get_status(uid): """Return the status of the LoginSession with specified UID""" from Acquire.ObjectStore import ObjectStore as _ObjectStore from Acquire.Service import get_service_account_bucket \ as _get_service_account_bucket bucket = _get_service_account_bucket() try: key = "%s/status/%s" % (_sessions_key, uid) status = _ObjectStore.get_string_object(bucket=bucket, key=key) except: status = None if status is None: from Acquire.Identity import LoginSessionError raise LoginSessionError( "Cannot find a session with UID '%s'" % uid) return status
def _set_status(self, status): """Internal function to set the status of the session. This ensures that the data for the session is saved into the correct part of the object store """ if self.is_null(): raise PermissionError( "Cannot set the status of a null LoginSession") if status not in ["approved", "pending", "denied", "suspicious", "logged_out"]: raise ValueError("Cannot set an invalid status '%s'" % status) if status == self._status: return from Acquire.ObjectStore import ObjectStore as _ObjectStore from Acquire.Service import get_service_account_bucket \ as _get_service_account_bucket bucket = _get_service_account_bucket() key = self._get_key() try: _ObjectStore.delete_object(bucket=bucket, key=key) except: pass self._status = status key = self._get_key() _ObjectStore.set_object_from_json(bucket=bucket, key=key, data=self.to_data()) key = "%s/status/%s" % (_sessions_key, self._uid) _ObjectStore.set_string_object(bucket=bucket, key=key, string_data=status)
[docs] def set_suspicious(self): """Put this login session into a suspicious state. This will be because weird activity has been detected which indicates that the session may be have been cracked. A login session in a suspicious state should not be granted any permissions. """ if not self.is_null(): self._set_status("suspicious")
[docs] def set_approved(self, user_uid=None, device_uid=None): """Register that this request has been approved, optionally providing data about the user who approved the session and the device from which the session was approved """ if self.is_null(): raise PermissionError( "You cannot approve a null LoginSession!") if self.status() != "pending": from Acquire.Identity import LoginSessionError raise LoginSessionError( "You cannot approve a login session " "that is not in the 'unapproved' state. This login " "session is in the '%s' state." % self.status()) from Acquire.ObjectStore import get_datetime_now \ as _get_datetime_now self._login_datetime = _get_datetime_now() self._user_uid = user_uid self._device_uid = device_uid self._set_status("approved")
def _clear_keys(self): """Function called to remove all keys from this session, as it has now been terminated (and so the keys are no longer valid) """ self._pubkey = None
[docs] def set_denied(self): """Register that this request has been denied""" if self.is_null(): raise PermissionError( "You cannot deny a null LoginSession!") self._clear_keys() self._pubcert = None self._set_status("denied")
[docs] def set_logged_out(self, authorisation=None, signature=None): """Register that this request has been closed as the user has logged out. If an authorisation is passed then verify that this is correct """ if self.is_null(): raise PermissionError( "You cannot logout from a null LoginSession!") if authorisation is not None: from Acquire.Identity import Authorisation as _Authorisation if not isinstance(authorisation, _Authorisation): raise TypeError("The authorisation must be type Authorisation") authorisation.verify(resource="logout %s" % self.uid()) if authorisation.user_uid() != self.user_uid(): raise PermissionError( "The user '%s' does not have permission to logout " "a session owned by %s" % (authorisation.user_uid(), self.user_uid())) if signature is not None: message = "logout %s" % self.uid() self._pubcert.verify(signature=signature, message=message) from Acquire.ObjectStore import get_datetime_now \ as _get_datetime_now self._logout_datetime = _get_datetime_now() self._clear_keys() self._set_status("logged_out")
[docs] def login(self, user_uid=None, device_uid=None): """Convenience function to set the session into the logged in state""" self.set_approved(user_uid=user_uid, device_uid=device_uid)
[docs] def logout(self, authorisation=None, signature=None): """Convenience function to set the session into the logged out state""" self.set_logged_out(authorisation=authorisation, signature=signature)
[docs] def is_suspicious(self): """Return whether or not this session is suspicious""" return self.status() == "suspicious"
[docs] def is_approved(self): """Return whether or not this session is open and approved by the user""" return self.status() == "approved"
[docs] def is_logged_out(self): """Return whether or not this session has logged out""" return self.status() == "logged_out"
[docs] def login_message(self): """Return any login message that has been set by the user""" try: return self._login_message except: return None
[docs] def scope(self): """Return the scope requested for this login session""" try: return self._localised_scope except: try: return self._scope except: return None
[docs] def permissions(self): """Return the permissions requested for this login session""" try: return self._localised_permissions except: try: return self._permissions except: return None
[docs] def hostname(self): """Return the user-supplied hostname of the host making the login request """ try: return self._hostname except: return None
[docs] def ipaddr(self): """Return the user-supplied IP address of the host making the login request """ try: return self._ipaddr except: return None
[docs] def user_uid(self): """If known, return the UID of the user who approved this session """ try: return self._user_uid except: return None
[docs] def device_uid(self): """If known, return the UID of the device from which the user approved this session """ try: return self._device_uid except: return None
def _get_key(self): return "%s/%s/%s/%s" % (_sessions_key, self.status(), self.short_uid(), self.uid())
[docs] def save(self): """Save the current state of this LoginSession to the object store """ if self.is_null(): return from Acquire.ObjectStore import ObjectStore as _ObjectStore from Acquire.Service import get_service_account_bucket \ as _get_service_account_bucket bucket = _get_service_account_bucket() key = self._get_key() _ObjectStore.set_object_from_json(bucket=bucket, key=key, data=self.to_data())
def _localise(self, scope, permissions): """Localise this session for the specified scope and permissions. This will raise an exception if the scope or permissions are greater than those embedded into the session """ # DO SOME WORK HERE TO ASSERT THAT THE SCOPE AND PERMISSIONS # ARE ACCEPTABLE if scope is None: self._localised_scope = self.scope() else: self._localised_scope = scope if permissions is None: self._localised_permissions = self.permissions() else: self._localised_permissions = permissions
[docs] @staticmethod def load(status=None, short_uid=None, uid=None, scope=None, permissions=None): """Load and return a LoginSession specified from either a short_uid or a long uid. Note that if more than one session matches the short_uid then you will get a list of LoginSessions returned """ from Acquire.ObjectStore import ObjectStore as _ObjectStore from Acquire.Service import get_service_account_bucket \ as _get_service_account_bucket if short_uid is None and uid is None: raise PermissionError( "You must supply one of the short_uid or uid to load " "a LoginSession!") if status is None: if uid is None: raise PermissionError( "You must supply the full UID to get the status " "of a specific login session") status = LoginSession.get_status(uid=uid) bucket = _get_service_account_bucket() if uid is not None: short_uid = LoginSession.to_short_uid(uid) key = "%s/%s/%s/%s" % (_sessions_key, status, short_uid, uid) try: data = _ObjectStore.get_object_from_json(bucket=bucket, key=key) session = LoginSession.from_data(data) except: from Acquire.Identity import LoginSessionError raise LoginSessionError( "There is no valid session with UID %s in " "state %s" % (uid, status)) session._localise(scope=scope, permissions=permissions) return session # the user may pass in the short_uid as YY.YY.YY.YY # so remove all dots short_uid = short_uid.replace(".", "") prefix = "%s/%s/%s/" % (_sessions_key, status, short_uid) print(prefix) try: keys = _ObjectStore.get_all_objects_from_json(bucket=bucket, prefix=prefix) except: keys = {} sessions = [] for data in keys.values(): try: session = LoginSession.from_data(data) session._localise(scope=scope, permissions=permissions) sessions.append(session) except: pass if len(sessions) == 0: from Acquire.Identity import LoginSessionError raise LoginSessionError( "There is no valid session with short UID %s " "in state %s" % (short_uid, status)) elif len(sessions) == 1: sessions = sessions[0] return sessions
[docs] def to_data(self): """Return a data version (dictionary) of this LoginSession that can be serialised to json """ if self.is_null(): return {} from Acquire.ObjectStore import datetime_to_string \ as _datetime_to_string data = {} data["uid"] = self._uid data["username"] = self._username data["request_datetime"] = _datetime_to_string(self._request_datetime) data["public_certificate"] = self._pubcert.to_data() data["status"] = self._status if self._pubkey is not None: data["public_key"] = self._pubkey.to_data() try: data["login_datetime"] = _datetime_to_string(self._login_datetime) except: pass try: data["logout_datetime"] = _datetime_to_string( self._logout_datetime) except: pass try: data["ipaddr"] = self._ipaddr except: pass try: data["hostname"] = self._hostname except: pass try: data["login_message"] = self._login_message except: pass try: data["scope"] = self._scope except: pass try: data["permissions"] = self._permissions except: pass try: data["user_uid"] = self._user_uid except: pass try: data["device_uid"] = self._device_uid except: pass return data
[docs] @staticmethod def from_data(data): """Return a LoginSession constructed from the passed data (dictionary) """ l = LoginSession() if data is not None and len(data) > 0: from Acquire.ObjectStore import string_to_datetime \ as _string_to_datetime from Acquire.Crypto import PublicKey as _PublicKey l._uid = data["uid"] l._username = data["username"] l._request_datetime = _string_to_datetime( data["request_datetime"]) l._pubcert = _PublicKey.from_data(data["public_certificate"]) l._status = data["status"] try: l._pubkey = _PublicKey.from_data(data["public_key"]) except: l._pubkey = None try: l._login_datatime = _string_to_datetime( data["login_datetime"]) except: pass try: l._logout_datetime = _string_to_datetime( data["logout_datetime"]) except: pass try: l._ipaddr = data["ipaddr"] except: pass try: l._hostname = data["hostname"] except: pass try: l._login_message = data["login_message"] except: pass try: l._scope = data["scope"] except: pass try: l._permissions = data["permissions"] except: pass try: l._user_uid = data["user_uid"] except: pass try: l._device_uid = data["device_uid"] except: pass return l