Source code for Acquire.Identity._loginsession


import datetime as _datetime
import uuid as _uuid

import base64 as _base64

__all__ = ["LoginSession"]


[docs]class LoginSession: """This class holds all details of a single login session""" def __init__(self, public_key=None, public_cert=None, ip_addr=None, hostname=None, login_message=None): self._pubkey = None self._uid = None self._request_datetime = None self._login_datetime = None self._logout_datetime = None self._pubcert = None self._status = None self._ipaddr = None self._hostname = None self._login_message = None from Acquire.Crypto import PublicKey as _PublicKey if public_key: try: public_key = open(public_key, "r").readlines() except: pass try: self._pubkey = _PublicKey.from_data(public_key) except: self._pubkey = public_key if not isinstance(self._pubkey, _PublicKey): raise TypeError("The public key must be of type PublicKey") from Acquire.ObjectStore import get_datetime_now \ as _get_datetime_now self._uid = str(_uuid.uuid4()) self._request_datetime = _get_datetime_now() self._status = "unapproved" if public_cert: try: public_cert = open(public_cert, "r").readlines() except: pass try: self._pubcert = _PublicKey.from_data(public_cert) except: self._pubcert = public_cert if not isinstance(self._pubcert, _PublicKey): raise TypeError("The public certificate must be of " "type PublicKey") if ip_addr: self._ipaddr = ip_addr if hostname: self._hostname = hostname if login_message: self._login_message = login_message def __str__(self): if self.is_null(): return "LoginSession::null" else: return "LoginSession(uid=%s, status=%s)" % \ (self.uid(), self.status()) def __eq__(self, other): if isinstance(other, self.__class__): return self._uid == other._uid else: return False def __ne__(self, other): return not self.__eq__(other)
[docs] def is_null(self): """Return whether or not this object is null""" return self._uid is None
[docs] def public_key(self): """Return the public key""" if self._pubkey is None: from Acquire.Identity import LoginSessionError raise LoginSessionError( "You cannot get a public key from " "a logged out or otherwise denied session") elif self.is_suspicious(): from Acquire.Identity import LoginSessionError raise LoginSessionError( "You cannot get a public key from " "a login session that has been marked as suspicious") return self._pubkey
[docs] def public_certificate(self): """Return the public certificate""" if self._pubcert is None: from Acquire.Identity import LoginSessionError raise LoginSessionError( "You cannot get a public certificate from " "a logged out or otherwise denied session") elif self.is_suspicious(): from Acquire.Identity import LoginSessionError raise LoginSessionError( "You cannot get a public certificate from " "a login session that has been marked as suspicious") return self._pubcert
[docs] def request_source(self): """Return the IP address of the source of this request. This is used to rate limit someone who is maliciously requesting logins...""" return self._ipaddr
[docs] def uuid(self): """Return the UUID of this request""" return self._uid
[docs] @staticmethod def to_short_uid(long_uid): """Return the short UID version of the passed long uid""" return long_uid[:8]
[docs] def short_uuid(self): """Return a short UUID that will be used to provide a more human-readable session ID""" if self._uid: return LoginSession.to_short_uid(self._uid) else: return None
[docs] def uid(self): """Synonym for uuid""" return self.uuid()
[docs] def short_uid(self): """Synonym for short_uuid""" return self.short_uuid()
[docs] def regenerate_uuid(self): """Regenerate the UUID as there has been a clash""" if self._pubkey: self._uid = str(_uuid.uuid4())
[docs] def regenerate_uid(self): """Synonym for regenerate_uuid""" self.regenerate_uuid()
[docs] def creation_time(self): """Return the date and time when this was created""" return self._request_datetime
[docs] def login_time(self): """Return the date and time when the user logged in. This returns None if the user has not yet logged in""" return self._login_datetime
[docs] def logout_time(self): """Return the date and time when the user logged out. This returns None if the user has not yet logged out""" return self._logout_datetime
[docs] def hours_since_creation(self): """Return the number of hours since this request was created. This will return a float, so 1 second is 1 / 3600th of an hour""" if self._request_datetime: from Acquire.ObjectStore import get_datetime_now \ as _get_datetime_now delta = _get_datetime_now() - self._request_datetime return delta.total_seconds() / 3600.0 else: return 0
[docs] def status(self): """Return the status of this login session""" return self._status
[docs] def set_suspicious(self): """Put this login session into a suspicious state. This will be because weird activity has been detected which indicates that the session may be have been cracked. A login session in a suspicious state should not be granted any permissions. """ if self._uid: self._status = "suspicious"
[docs] def set_approved(self): """Register that this request has been approved""" if self._uid: if self._pubkey is None or self._pubcert is None: from Acquire.Identity import LoginSessionError raise LoginSessionError( "You cannot approve a login session " "that doesn't have a valid public key or certificate. " "Perhaps you have already denied the authorisation " "or logged out?") if self.status() != "unapproved": from Acquire.Identity import LoginSessionError raise LoginSessionError( "You cannot approve a login session " "that is not in the 'unapproved' state. This login " "session is in the '%s' state." % self.status()) from Acquire.ObjectStore import get_datetime_now \ as _get_datetime_now self._login_datetime = _get_datetime_now() self._status = "approved"
def _clear_keys(self): """Function called to remove all keys from this session, as it has now been terminated (and so the keys are no longer valid) """ self._pubkey = None
[docs] def set_denied(self): """Register that this request has been denied""" if self._uid: self._status = "denied" self._clear_keys() # also clear the certificate self._pubcert = None
[docs] def set_logged_out(self): """Register that this request has been closed as the user has logged out""" if self._uid: from Acquire.ObjectStore import get_datetime_now \ as _get_datetime_now self._status = "logged_out" self._logout_datetime = _get_datetime_now() self._clear_keys()
[docs] def login(self): """Convenience function to set the session into the logged in state""" self.set_approved()
[docs] def logout(self): """Convenience function to set the session into the logged out state""" self.set_logged_out()
[docs] def is_suspicious(self): """Return whether or not this session is suspicious""" if self._status: return self._status == "suspicious" else: return False
[docs] def is_approved(self): """Return whether or not this session is open and approved by the user""" if self._status: return self._status == "approved" else: return False
[docs] def is_logged_out(self): """Return whether or not this session has logged out""" if self._status: return self._status == "logged_out" else: return False
[docs] def login_message(self): """Return any login message that has been set by the user""" return self._login_message
[docs] def hostname(self): """Return the user-supplied hostname of the host making the login request""" return self._hostname
[docs] def to_data(self): """Return a data version (dictionary) of this LoginSession""" if self._uid is None: return None from Acquire.ObjectStore import datetime_to_string \ as _datetime_to_string data = {} data["uid"] = self._uid data["request_datetime"] = _datetime_to_string(self._request_datetime) try: data["login_datetime"] = _datetime_to_string(self._login_datetime) except: data["login_datetime"] = None try: data["logout_datetime"] = _datetime_to_string( self._logout_datetime) except: data["logout_datetime"] = None if self._pubkey: data["public_key"] = self._pubkey.to_data() if self._pubcert: data["public_certificate"] = self._pubcert.to_data() data["status"] = self._status data["ipaddr"] = self._ipaddr data["hostname"] = self._hostname data["login_message"] = self._login_message return data
[docs] @staticmethod def from_data(data): """Return a LoginSession constructed from the passed data (dictionary) """ if data is None: return None try: from Acquire.ObjectStore import string_to_datetime \ as _string_to_datetime from Acquire.Crypto import PublicKey as _PublicKey logses = LoginSession() try: logses._uid = data["uid"] except: logses._uid = data["uuid"] # for backward compatibility logses._request_datetime = _string_to_datetime( data["request_datetime"]) try: logses._login_datetime = _string_to_datetime( data["login_datetime"]) except: logses._login_datetime = None try: logses._logout_datetime = _string_to_datetime( data["logout_datetime"]) except: logses._logout_datetime = None try: logses._pubkey = _PublicKey.from_data(data["public_key"]) except: logses._pubkey = None try: logses._pubcert = _PublicKey.from_data( data["public_certificate"]) except: logses._pubcert = None logses._status = data["status"] logses._ipaddr = data["ipaddr"] logses._hostname = data["hostname"] logses._login_message = data["login_message"] return logses except Exception as e: from Acquire.Service import exception_to_string from Acquire.Identity import LoginSessionError raise LoginSessionError( "Cannot load the LoginSession from " "the object store?\n\nCAUSE: %s" % (exception_to_string(e)))