Source code for Acquire.Client._wallet


# use a variable so we can monkey-patch while testing
_input = input

__all__ = ["Wallet"]


def _get_wallet_dir():
    """Function that can be mocked in testing to ensure we don't
       mess with the user's main wallet
    """
    import os as _os
    home = _os.path.expanduser("~")
    return "%s/.acquire_wallet" % home


def _get_wallet_password(confirm_password=False):
    """Function that can be mocked in testing to ensure we don't
       mess with the user's main wallet
    """
    import getpass as _getpass
    password = _getpass.getpass(
                prompt="Please enter a password to encrypt your wallet: ")

    if not confirm_password:
        return password

    password2 = _getpass.getpass(
                    prompt="Please confirm the password: ")

    if password != password2:
        _output("The passwords don't match. Please try again.")
        return _get_wallet_password(confirm_password=confirm_password)

    return password


def _output(s, end=None):
    """Simple output function that can be replaced at testing
       to silence the wallet
    """
    if end is None:
        print(s)
    else:
        print(s, end=end)


def _flush_output():
    """Flush STDOUT"""
    try:
        import sys as _sys
        _sys.stdout.flush()
    except:
        pass


def _read_json(filename):
    """Return a json-decoded dictionary from the data written
       to 'filename'
    """
    import json as _json
    with open(filename, "rb") as FILE:
        s = FILE.read().decode("utf-8")

        return _json.loads(s)


def _write_json(data, filename):
    """Write the passed json-encodable dictionary to 'filename'"""
    import json as _json
    s = _json.dumps(data)
    with open(filename, "wb") as FILE:
        FILE.write(s.encode("utf-8"))


def _read_service(filename):
    """Read and return the service written to 'filename'"""
    from Acquire.Client import Service as _Service
    return _Service.from_data(_read_json(filename))


def _write_service(service, filename):
    """Write the passed service to 'filename'"""
    _write_json(service.to_data(), filename)


def _could_match(userinfo, username, password):
    if username is None:
        return True

    if "username" not in userinfo:
        return False

    if userinfo["username"] == username:
        if password is None:
            return True

        if "password" in userinfo:
            if userinfo["password"] == password:
                return True

    return False


[docs]class Wallet: """This class holds a wallet that can be used to simplify sending passwords and one-time-password (OTP) codes to an acquire identity service. This holds a wallet of passwords and (optionally) OTP secrets that are encrypted using a local keypair that is unlocked by a password supplied by the user locally. By default this will create the wallet in your home directory ($HOME/.acquire_wallet). If you want the wallet to be saved in a different directory, specify that as "wallet_dir". """ def __init__(self): """Construct a wallet to hold all user credentials""" from Acquire.Service import is_running_service \ as _is_running_service if _is_running_service(): from Acquire.Service import get_this_service \ as _get_this_service service = _get_this_service(need_private_access=False) raise PermissionError( "You cannot open a Wallet on a running Service (%s)" % service) self._wallet_key = None self._service_info = {} import os as _os wallet_dir = _get_wallet_dir() if not _os.path.exists(wallet_dir): _os.makedirs(wallet_dir, mode=0o700, exist_ok=False) elif not _os.path.isdir(wallet_dir): raise TypeError("The wallet directory must be a directory!") self._wallet_dir = wallet_dir def _create_wallet_key(self, filename): """Create a new wallet key for the user""" password = _get_wallet_password(confirm_password=True) from Acquire.Client import PrivateKey as _PrivateKey key = _PrivateKey() bytes = key.bytes(password) with open(filename, "wb") as FILE: FILE.write(bytes) return key def _get_wallet_key(self): """Return the private key used to encrypt everything in the wallet. This will ask for the users password """ if self._wallet_key: return self._wallet_key wallet_dir = self._wallet_dir keyfile = "%s/wallet_key.pem" % wallet_dir import os as _os if not _os.path.exists(keyfile): self._wallet_key = self._create_wallet_key(filename=keyfile) return self._wallet_key # read the keyfile and decrypt with open(keyfile, "rb") as FILE: bytes = FILE.read() wallet_key = None from Acquire.Client import PrivateKey as _PrivateKey # get the user password for _ in range(0, 5): password = _get_wallet_password() try: wallet_key = _PrivateKey.read_bytes(bytes, password) except: _output("Invalid password. Please try again.") if wallet_key: break if wallet_key is None: raise PermissionError("Too many failed password attempts...") self._wallet_key = wallet_key return wallet_key def _get_userinfo_filename(self, user_uid, identity_uid): """Return the filename for the passed user_uid logging into the passed identity service with identity_uid """ assert(user_uid is not None) assert(identity_uid is not None) return "%s/user_%s_%s_encrypted" % ( self._wallet_dir, user_uid, identity_uid) def _set_userinfo(self, userinfo, user_uid, identity_uid): """Save the userfile for the passed user_uid logging into the passed identity service with identity_uid """ from Acquire.ObjectStore import bytes_to_string as _bytes_to_string import json as _json filename = self._get_userinfo_filename(user_uid=user_uid, identity_uid=identity_uid) key = self._get_wallet_key().public_key() data = _bytes_to_string(key.encrypt(_json.dumps(userinfo))) userinfo = {"username": userinfo["username"], "user_uid": user_uid, "data": data} _write_json(data=userinfo, filename=filename) def _unlock_userinfo(self, userinfo): """Function used to unlock (decrypt) the passed userinfo""" from Acquire.ObjectStore import string_to_bytes as _string_to_bytes import json as _json key = self._get_wallet_key() data = _string_to_bytes(userinfo["data"]) result = _json.loads(key.decrypt(data)) result["user_uid"] = userinfo["user_uid"] return result def _get_userinfo(self, user_uid, identity_uid): """Read all info for the passed user at the identity service reached at 'identity_url' """ filename = self._get_userinfo_filename(user_uid=user_uid, identity_uid=identity_uid) userinfo = _read_json(filename=filename) return self._unlock_userinfo(userinfo) def _find_userinfo(self, username=None, password=None): """Function to find a user_info automatically, of if that fails, to ask the user """ wallet_dir = self._wallet_dir import glob as _glob userfiles = _glob.glob("%s/user_*_encrypted" % wallet_dir) userinfos = [] for userfile in userfiles: try: userinfo = _read_json(userfile) if _could_match(userinfo, username, password): userinfos.append((userinfo["username"], userinfo)) except: pass userinfos.sort(key=lambda x: x[0]) if len(userinfos) == 1: return self._unlock_userinfo(userinfos[0][1]) if len(userinfos) == 0: if username is None: username = _input("Please type your username: ") userinfo = {"username": username} if password is not None: userinfo["password"] = password return userinfo _output("Please choose the account by typing in its number, " "or type a new username if you want a different account.") for (i, (username, userinfo)) in enumerate(userinfos): _output("[%d] %s {%s}" % (i+1, username, userinfo["user_uid"])) max_tries = 5 for i in range(0, max_tries): reply = _input( "\nMake your selection (1 to %d) " % (len(userinfos)) ) try: idx = int(reply) - 1 except: idx = None if idx is None: # interpret this as a username return self._find_userinfo(username=reply, password=password) elif idx < 0 or idx >= len(userinfos): _output("Invalid account.") else: return self._unlock_userinfo(userinfos[idx][1]) if i < max_tries-1: _output("Try again...") userinfo = {} if username is not None: userinfo["username"] = username return userinfo def _get_user_password(self, userinfo): """Get the user password for the passed user on the passed identity_url """ if "password" in userinfo: return userinfo["password"] else: import getpass as _getpass password = _getpass.getpass( prompt="Please enter the login password: ") userinfo["password"] = password return password def _get_otpcode(self, userinfo): """Get the OTP code""" if "otpsecret" in userinfo: from Acquire.Client import OTP as _OTP otp = _OTP(userinfo["otpsecret"]) return otp.generate() else: import getpass as _getpass return _getpass.getpass( prompt="Please enter the one-time-password code: ")
[docs] def get_services(self): """Return all of the trusted services known to this wallet""" import glob as _glob service_files = _glob.glob("%s/service_*" % self._wallet_dir) services = [] for service_file in service_files: services.append(_read_service(service_file)) return services
[docs] def get_service(self, service_url=None, service_uid=None, service_type=None, autofetch=True): """Return the service at either 'service_url', or that has UID 'service_uid'. This will return the cached service if it exists, or will add a new service if we are able to validate it from a trusted registry """ from Acquire.ObjectStore import string_to_safestring \ as _string_to_safestring service = None if service_url is None: if service_uid is None: raise PermissionError( "You need to specify one of service_uid or service_url") # we need to look up the name... import glob as _glob service_files = _glob.glob("%s/service_*" % self._wallet_dir) for service_file in service_files: s = _read_service(service_file) if s.uid() == service_uid: service = s break else: from Acquire.Service import Service as _Service service_url = _Service.get_canonical_url(service_url, service_type=service_type) service_file = "%s/service_%s" % ( self._wallet_dir, _string_to_safestring(service_url)) try: service = _read_service(service_file) except: pass must_write = False if service is None: if not autofetch: from Acquire.Service import ServiceError raise ServiceError("No service at %s:%s" % (service_url, service_uid)) # we need to look this service up from the registry from Acquire.Registry import get_trusted_registry_service \ as _get_trusted_registry_service _output("Connecting to registry...") _flush_output() registry = _get_trusted_registry_service(service_uid=service_uid, service_url=service_url) _output("...connected to registry %s" % registry) _flush_output() if service_url is not None: _output("Securely fetching keys for %s..." % service_url) _flush_output() else: _output("Securely fetching keys for UID %s..." % service_uid) _flush_output() service = registry.get_service(service_url=service_url, service_uid=service_uid) _output("...success.\nFetched %s" % service) _flush_output() must_write = True # check if the keys need rotating - if they do, load up # the new keys and save them to the service file... if service.should_refresh_keys(): service.refresh_keys() must_write = True if service_uid is not None: if service.uid() != service_uid: raise PermissionError( "Disagreement over the service UID for '%s' (%s)" % (service, service_uid)) if must_write: service_file = "%s/service_%s" % ( self._wallet_dir, _string_to_safestring(service.canonical_url())) _write_service(service=service, filename=service_file) return service
[docs] def remove_all_services(self): """Remove all trusted services from this Wallet""" import glob as _glob import os as _os service_files = _glob.glob("%s/service_*" % self._wallet_dir) for service_file in service_files: if _os.path.exists(service_file): _os.unlink(service_file)
[docs] def remove_service(self, service): """Remove the cached service info for the passed service""" if isinstance(service, str): service_url = service else: service_url = service.canonical_url() from Acquire.ObjectStore import string_to_safestring \ as _string_to_safestring service_file = "%s/service_%s" % ( self._wallet_dir, _string_to_safestring(service_url)) import os as _os if _os.path.exists(service_file): _os.unlink(service_file)
[docs] def send_password(self, url, username=None, password=None, otpcode=None, remember_password=True, remember_device=None, dryrun=None): """Send a password and one-time code to the supplied login url""" if not remember_password: remember_device = False # the login URL is http[s]://something.com?id=XXXX/YY.YY.YY.YY # where XXXX is the service_uid of the service we should # connect with, and YY.YY.YY.YY is the short_uid of the login try: from urllib.parse import urlparse as _urlparse from urllib.parse import parse_qs as _parse_qs idcode = _parse_qs(_urlparse(url).query)["id"][0] except Exception as e: from Acquire.Client import LoginError raise LoginError( "Cannot identify the session or service information from " "the login URL '%s'. This should have id=XX-XX/YY.YY.YY.YY " "as a query parameter. <%s> %s" % (url, e.__class__.__name__, str(e))) try: (service_uid, short_uid) = idcode.split("/") except: from Acquire.Client import LoginError raise LoginError( "Cannot extract the service_uid and short_uid from the " "login ID code '%s'. This should be in the format " "XX-XX/YY.YY.YY.YY" % idcode) # now get the service try: service = self.get_service(service_uid=service_uid) except Exception as e: from Acquire.Client import LoginError raise LoginError( "Cannot find the service with UID %s: <%s> %s" % (service_uid, e.__class__.__name__, str(e))) if not service.can_identify_users(): from Acquire.Client import LoginError raise LoginError( "Service '%s' is unable to identify users! " "You cannot log into something that is not " "a valid identity service!" % (service)) userinfo = self._find_userinfo(username=username, password=password) if username is None: username = userinfo["username"] if "user_uid" in userinfo: user_uid = userinfo["user_uid"] else: user_uid = None _output("Logging in using username '%s'" % username) try: device_uid = userinfo["device_uid"] except: device_uid = None if password is None: password = self._get_user_password(userinfo=userinfo) if otpcode is None: otpcode = self._get_otpcode(userinfo=userinfo) else: # user is providing the primary OTP, so this is not a device device_uid = None _output("\nLogging in to '%s', session '%s'..." % ( service.canonical_url(), short_uid), end="") _flush_output() if dryrun: print("Calling %s with username=%s, password=%s, otpcode=%s, " "remember_device=%s, device_uid=%s, short_uid=%s " "user_uid=%s" % (service.canonical_url(), username, password, otpcode, remember_device, device_uid, short_uid, user_uid)) return try: from Acquire.Client import Credentials as _Credentials creds = _Credentials(username=username, password=password, otpcode=otpcode, short_uid=short_uid, device_uid=device_uid) args = {"credentials": creds.to_data(identity_uid=service.uid()), "user_uid": user_uid, "remember_device": remember_device, "short_uid": short_uid} response = service.call_function(function="login", args=args) _output("SUCCEEDED!") _flush_output() except Exception as e: _output("FAILED!") _flush_output() from Acquire.Client import LoginError raise LoginError("Failed to log in. %s" % e.args) if not remember_password: return try: returned_user_uid = response["user_uid"] if returned_user_uid != user_uid: # change of user? userinfo = {} user_uid = returned_user_uid except: # no user_uid, so nothing to save return if user_uid is None: # can't save anything return userinfo["username"] = username userinfo["password"] = password try: userinfo["device_uid"] = response["device_uid"] except: pass try: userinfo["otpsecret"] = response["otpsecret"] except: pass self._set_userinfo(userinfo=userinfo, user_uid=user_uid, identity_uid=service.uid())