# Source code for Acquire.Client._user


from enum import Enum as _Enum

__all__ = ["User"]


class _LoginStatus(_Enum):
    """The states that a client-side login can be in"""
    # nothing has been requested with this object yet
    EMPTY = 0
    # a login has been requested, but has not yet been approved
    LOGGING_IN = 1
    # the login session has been approved
    LOGGED_IN = 2
    # the login session has been logged out
    LOGGED_OUT = 3
    # something went wrong - the object should not be used further
    ERROR = 4


def _output(s, end=None):
    """Print 's' to standard output. If 'end' is supplied then it is
       passed to print as the line terminator, otherwise print's own
       default terminator is used. Kept as a separate function so that
       it can be removed during testing
    """
    if end is not None:
        print(s, end=end)
        return

    print(s)


def _get_identity_url():
    """Return the URL of the default identity service. This is
       currently a fixed value
    """
    default_identity_url = "fn.acquire-aaai.com"
    return default_identity_url


def _get_identity_service(identity_url=None):
    """Return the identity service at 'identity_url', falling back to
       the default identity url if none is supplied.

       When running as a service the trusted service is returned
       directly. Otherwise the service is looked up via the Wallet, and
       a LoginError is raised if the lookup fails or if the service
       found cannot identify users
    """
    from Acquire.Service import is_running_service as _is_running_service

    url = _get_identity_url() if identity_url is None else identity_url

    if _is_running_service():
        from Acquire.Service import get_trusted_service \
            as _get_trusted_service
        return _get_trusted_service(service_url=url,
                                    service_type='identity')

    from Acquire.Client import LoginError

    try:
        from Acquire.Client import Wallet as _Wallet
        service = _Wallet().get_service(service_url=url,
                                        service_type="identity")
    except Exception as e:
        # any failure (including importing the Wallet) is reported
        # to the caller as a LoginError that carries the cause
        from Acquire.Service import exception_to_string
        cause = exception_to_string(e)
        raise LoginError("Have not received the identity service info from "
                         "the identity service at '%s'\n\nCAUSE: %s" %
                         (url, cause))

    if service.can_identify_users():
        return service

    raise LoginError(
        "You can only use a valid identity service to log in! "
        "The service at '%s' is a '%s'" %
        (url, service.service_type()))


def _get_random_sentence():
    """Build and return a random (nonsense) four word sentence of the
       form 'adjective animal verb adverb'
    """
    import random as _random

    # one word is drawn from each list, in this order
    word_lists = (
        ["white", "black", "fluffy", "small", "big"],
        ["cows", "sheep", "mice", "dogs", "cats"],
        ["walk", "run", "stand", "climb", "sit"],
        ["slowly", "quickly", "quietly", "noisily", "happily"],
    )

    adj, animal, verb, adv = [_random.choice(words) for words in word_lists]

    return "%s %s %s %s" % (adj, animal, verb, adv)


class User:
    """This class holds all functionality that would be used
       by a user to authenticate with and access the service.
       This represents a single client login, and is the
       user-facing part of Acquire
    """
    def __init__(self, username=None, identity_url=None,
                 identity_uid=None, scope=None, permissions=None,
                 auto_logout=True):
        """Construct the user with specified 'username', who will
           login to the identity service at specified URL
           'identity_url', or with UID 'identity_uid'. You can
           optionally request a user with a limited scope or
           limited permissions. Service lookup will be using
           your wallet.

           By default, the user will logout when this object is
           destroyed. Prevent this behaviour by setting auto_logout
           to False
        """
        self._username = username
        self._status = _LoginStatus.EMPTY
        self._identity_service = None
        self._scope = scope
        self._permissions = permissions

        # an empty / missing URL means "use the default identity service"
        self._identity_url = identity_url if identity_url else None
        self._identity_uid = identity_uid if identity_uid else None

        self._user_uid = None

        # these are only filled in by request_login
        self._login_url = None
        self._session_key = None
        self._signing_key = None
        self._session_uid = None

        # only filled in by _set_error_state
        self._error_string = None

        self._auto_logout = bool(auto_logout)

    def __str__(self):
        return "User(name='%s', status=%s)" % (self.username(), self.status())

    def __enter__(self):
        """Enter function used by 'with' statements. Returns this
           object so that 'with User(...) as user:' binds the user
        """
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        """Ensure that we logout"""
        if self._auto_logout:
            self.logout()

    def __del__(self):
        """Make sure that we log out before deleting this object"""
        # use getattr as __del__ is also called for objects whose
        # __init__ did not run to completion
        if getattr(self, "_auto_logout", False) and hasattr(self, "_status"):
            self.logout()

    def _set_status(self, status):
        """Internal function used to set the status from the
           string obtained from the LoginSession. Any string other
           than 'approved', 'denied' or 'logged_out' leaves the
           current status unchanged
        """
        if status == "approved":
            self._status = _LoginStatus.LOGGED_IN
        elif status == "denied":
            self._set_error_state("Permission to log in was denied!")
        elif status == "logged_out":
            self._status = _LoginStatus.LOGGED_OUT

    def is_null(self):
        """Return whether or not this user is null"""
        return self._username is None

    def username(self):
        """Return the username of the user"""
        return self._username

    def uid(self):
        """Return the UID of this user. This uniquely identifies the
           user across all systems. Returns None for a null user, and
           raises a PermissionError if the UID is not yet known
           (it is set once the user has logged in)
        """
        if self.is_null():
            return None

        if self._user_uid is None:
            raise PermissionError(
                "You cannot get the user UID until after you have logged in")

        return self._user_uid

    def guid(self):
        """Return the global UID of the user. While the UID is highly
           likely to be unique, the GUID should be globally guaranteed
           to be unique. This is ensured by combining the UID of the
           user with the UID of the identity service that
           primarily identifies the user
        """
        return "%s@%s" % (self.uid(), self.identity_service().uid())

    def status(self):
        """Return the current status of this user"""
        return self._status

    def _check_for_error(self):
        """Call to ensure that this object is not in an error
           state. If it is in an error state then raise a
           LoginError carrying the stored error message
        """
        if self._status == _LoginStatus.ERROR:
            from Acquire.Client import LoginError
            raise LoginError(self._error_string)

    def _set_error_state(self, message):
        """Put this object into an error state, displaying the
           passed message if anyone tries to use this object
        """
        self._status = _LoginStatus.ERROR
        self._error_string = message

    def session_key(self):
        """Return the session key for the current login session,
           or None if there is no session key
        """
        self._check_for_error()
        return getattr(self, "_session_key", None)

    def signing_key(self):
        """Return the signing key used for the current login session,
           or None if there is no signing key
        """
        self._check_for_error()
        return getattr(self, "_signing_key", None)

    def identity_service(self):
        """Return the identity service info object for the identity
           service used to validate the identity of this user. The
           service is looked up on first use and then cached.

           If an identity UID was supplied at construction then a
           LoginError is raised if it does not match the UID of the
           service that was found
        """
        if self._identity_service:
            return self._identity_service

        identity_service = _get_identity_service(
                                identity_url=self.identity_service_url())

        # if the user supplied the UID then validate this is correct
        # pylint: disable=assignment-from-none
        if self._identity_uid:
            if identity_service.uid() != self._identity_uid:
                from Acquire.Client import LoginError
                raise LoginError(
                    "The UID of the identity service at '%s', which is "
                    "%s, does not match that supplied by the user, '%s'. "
                    "You should double-check that the UID is correct, or "
                    "that you have supplied the correct identity_url" %
                    (self.identity_service_url(), identity_service.uid(),
                     self._identity_uid))
        else:
            self._identity_uid = identity_service.uid()
        # pylint: enable=assignment-from-none

        self._identity_service = identity_service

        return self._identity_service

    def identity_service_uid(self):
        """Return the UID of the identity service. The combination
           of user_uid+service_uid should uniquely identify this
           user account anywhere in the world
        """
        if self._identity_uid is not None:
            return self._identity_uid

        # the service has not been looked up yet, so go via the
        # accessor (which performs and caches the lookup) rather than
        # dereferencing the still-empty cached service
        return self.identity_service().uid()

    def identity_service_url(self):
        """Return the URL to the identity service. This is the full URL
           to the service, minus the actual function to be called, e.g.
           https://function_service.com/t/identity
        """
        self._check_for_error()

        identity_url = getattr(self, "_identity_url", None)

        if identity_url is None:
            # return the default URL - this should be discovered...
            return _get_identity_url()

        return identity_url

    def login_url(self):
        """Return the URL that the user must connect to to authenticate
           this login session, or None if no login has been requested
        """
        self._check_for_error()
        return getattr(self, "_login_url", None)

    def login_qr_code(self):
        """Return a QR code of the login URL that the user must
           connect to to authenticate this login session. Raises a
           LoginError if there is no login URL
        """
        login_url = self.login_url()

        if login_url is None:
            from Acquire.Client import LoginError
            raise LoginError(
                "There is no login URL to encode as a QR code. "
                "You must call request_login() first.")

        from Acquire.Client import create_qrcode as _create_qrcode
        return _create_qrcode(login_url)

    def scope(self):
        """The scope under which this login was requested (and
           is thus valid)
        """
        return self._scope

    def permissions(self):
        """The permissions with which this login was requested,
           (and are thus provided)
        """
        return self._permissions

    def session_uid(self):
        """Return the UID of the current login session. Returns None
           if there is no valid login session
        """
        self._check_for_error()
        return getattr(self, "_session_uid", None)

    def is_empty(self):
        """Return whether or not this is an empty login (so has not
           been used for anything yet...)
        """
        return self._status == _LoginStatus.EMPTY

    def is_logged_in(self):
        """Return whether or not the user has successfully logged in"""
        return self._status == _LoginStatus.LOGGED_IN

    def is_logging_in(self):
        """Return whether or not the user is in the process of
           logging in
        """
        return self._status == _LoginStatus.LOGGING_IN

    def logout(self):
        """Log out from the current session. Returns the result of the
           'logout' call to the identity service, or None if this
           user was neither logged in nor logging in
        """
        if not (self.is_logged_in() or self.is_logging_in()):
            return None

        service = self.identity_service()

        args = {"session_uid": self._session_uid}

        if self.is_logged_in():
            from Acquire.Client import Authorisation as _Authorisation
            authorisation = _Authorisation(
                                resource="logout %s" % self._session_uid,
                                user=self)
            args["authorisation"] = authorisation.to_data()
        else:
            # we are not fully logged in so cannot generate an
            # authorisation for the logout
            from Acquire.ObjectStore import bytes_to_string \
                as _bytes_to_string
            resource = "logout %s" % self._session_uid
            signature = self.signing_key().sign(resource)
            args["signature"] = _bytes_to_string(signature)

        result = service.call_function(function="logout", args=args)

        self._status = _LoginStatus.LOGGED_OUT

        return result

    @staticmethod
    def register(username, password, identity_url=None):
        """Request to register a new user with the specified
           username on the identity service running
           at 'identity_url', using the supplied 'password'.

           Returns a dictionary holding the 'provisioning_uri' and,
           where they could be generated, the 'otpsecret' extracted
           from that URI and a 'qrcode' of it. Raises a UserError if
           the service did not return a provisioning URI
        """
        service = _get_identity_service(identity_url=identity_url)

        from Acquire.Client import Credentials as _Credentials

        encoded_password = _Credentials.encode_password(
                                identity_uid=service.uid(),
                                password=password)

        args = {"username": username,
                "password": encoded_password}

        result = service.call_function(function="register", args=args)

        try:
            provisioning_uri = result["provisioning_uri"]
        except (KeyError, TypeError):
            from Acquire.Client import UserError
            raise UserError(
                "Cannot register the user '%s' on "
                "the identity service at '%s'!" % (username, identity_url))

        # return a QR code for the provisioning URI
        result = {}
        result["provisioning_uri"] = provisioning_uri

        # the secret is optional extra information, so it is simply
        # left out if it cannot be found in the URI
        import re
        try:
            match = re.search(r"secret=([\w\d+]+)&issuer", provisioning_uri)
        except TypeError:
            match = None

        if match is not None:
            result["otpsecret"] = match.group(1)

        try:
            from Acquire.Client import create_qrcode as _create_qrcode
            result["qrcode"] = _create_qrcode(provisioning_uri)
        except Exception:
            # best effort only - the provisioning URI is still
            # returned if a QR code cannot be generated
            pass

        return result

    def request_login(self, login_message=None):
        """Request to authenticate as this user. This returns a login URL
           that you must connect to to supply your login credentials

           If 'login_message' is supplied, then this is passed to
           the identity service so that it can be displayed
           when the user accesses the login page. This helps
           the user validate that they have accessed the correct
           login page. Note that if the message is None,
           then a random message will be generated.

           Returns a dictionary holding the 'login_url', 'session_uid'
           and 'short_uid' of the new login session. Raises a
           LoginError if this object has already been used to log in,
           or if the service response lacks the login URL or session UID
        """
        self._check_for_error()

        from Acquire.Client import LoginError

        if not self.is_empty():
            raise LoginError("You cannot try to log in twice using the same "
                             "User object. Create another object if you want "
                             "to try to log in again.")

        # first, create a private key that will be used
        # to sign all requests and identify this login
        from Acquire.Client import PrivateKey as _PrivateKey

        session_key = _PrivateKey()
        signing_key = _PrivateKey()

        args = {"username": self._username,
                "public_key": session_key.public_key().to_data(),
                "public_certificate": signing_key.public_key().to_data(),
                "scope": self._scope,
                "permissions": self._permissions
                }

        # get information from the local machine to help
        # the user validate that the login details are correct.
        # This is optional, so is skipped if the lookup fails
        try:
            import socket as _socket
            hostname = _socket.gethostname()
            ipaddr = _socket.gethostbyname(hostname)
        except (OSError, UnicodeError):
            pass
        else:
            args["hostname"] = hostname
            args["ipaddr"] = ipaddr

        if login_message is None:
            try:
                login_message = _get_random_sentence()
            except Exception:
                # the message is optional, so carry on without one
                pass

        if login_message is not None:
            args["login_message"] = login_message

        identity_service = self.identity_service()

        result = identity_service.call_function(
                        function="request_login", args=args)

        try:
            login_url = result["login_url"]
        except (KeyError, TypeError):
            login_url = None

        if login_url is None:
            error = "Failed to login. Could not extract the login URL! " \
                    "Result is %s" % (str(result))
            self._set_error_state(error)
            raise LoginError(error)

        try:
            session_uid = result["session_uid"]
        except (KeyError, TypeError):
            session_uid = None

        if session_uid is None:
            error = "Failed to login. Could not extract the login " \
                    "session UID! Result is %s" % (str(result))

            self._set_error_state(error)
            raise LoginError(error)

        # now save all of the needed data
        self._login_url = login_url
        self._session_key = session_key
        self._signing_key = signing_key
        self._session_uid = session_uid
        self._status = _LoginStatus.LOGGING_IN
        self._user_uid = None

        _output("Login by visiting: %s" % self._login_url)

        if login_message is not None:
            _output("(please check that this page displays the message '%s')"
                    % login_message)

        from Acquire.Identity import LoginSession as _LoginSession

        return {"login_url": self._login_url,
                "session_uid": session_uid,
                "short_uid": _LoginSession.to_short_uid(session_uid)}

    def _poll_session_status(self):
        """Function used to query the identity service for this session
           to poll for the session status. Updates the status of this
           object and, once logged in, records the UID of the user
        """
        service = self.identity_service()

        args = {"session_uid": self._session_uid}

        result = service.call_function(function="get_session_info", args=args)

        # now update the status...
        status = result["session_status"]
        self._set_status(status)

        if self.is_logged_in() and self._user_uid is None:
            user_uid = result["user_uid"]

            if user_uid is None:
                # raise rather than assert, as asserts are removed
                # when python is run with -O
                from Acquire.Client import LoginError
                raise LoginError(
                    "The identity service reports that the login was "
                    "approved, but did not supply the UID of the user")

            self._user_uid = user_uid

    def wait_for_login(self, timeout=None, polling_delta=5):
        """Block until the user has logged in. If 'timeout' is set
           then we will wait for a maximum of that number of seconds
           (with a minimum of one second)

           This will check whether we have logged in by polling
           the identity service every 'polling_delta' seconds
           (limited to between 1 and 60 seconds).

           Returns True if the user is logged in, else False
        """
        self._check_for_error()

        if not self.is_logging_in():
            return self.is_logged_in()

        polling_delta = min(max(int(polling_delta), 1), 60)

        import time as _time

        if timeout is None:
            # block forever....
            deadline = None
        else:
            # only block until the timeout has been reached. A monotonic
            # clock is used so the total elapsed time is measured
            # correctly however long the timeout is
            deadline = _time.monotonic() + max(int(timeout), 1)

        while True:
            self._poll_session_status()

            if self.is_logged_in():
                return True
            elif not self.is_logging_in():
                return False

            if deadline is None:
                _time.sleep(polling_delta)
                continue

            remaining = deadline - _time.monotonic()

            if remaining <= 0:
                return False

            # never sleep beyond the requested timeout
            _time.sleep(min(polling_delta, remaining))