Source code for Acquire.Client._wallet


from Acquire.Stubs import lazy_import as _lazy_import

import os as _os
import sys as _sys
import json as _json

_getpass = _lazy_import.lazy_module("getpass")
_glob = _lazy_import.lazy_module("glob")
_re = _lazy_import.lazy_module("re")

# use a variable so we can monkey-patch while testing
_input = input

# whether or not we are in testing mode
_is_testing = False

__all__ = ["Wallet"]


def _read_json(filename):
    """Return a json-decoded dictionary from the data written
       to 'filename'
    """
    global _is_testing
    if _is_testing:
        return {}

    with open(filename, "rb") as FILE:
        s = FILE.read().decode("utf-8")
        return _json.loads(s)


def _write_json(data, filename):
    """Write the passed json-encodable dictionary to 'filename'"""
    global _is_testing
    if _is_testing:
        return

    s = _json.dumps(data)
    with open(filename, "wb") as FILE:
        FILE.write(s.encode("utf-8"))


def _read_service(filename):
    """Read and return the service written to 'filename'"""
    from Acquire.Client import Service as _Service
    return _Service.from_data(_read_json(filename))


def _write_service(service, filename):
    """Write the passed service to 'filename'"""
    _write_json(service.to_data(), filename)


[docs]class Wallet: """This class holds a wallet that can be used to simplify sending passwords and one-time-password (OTP) codes to an acquire identity service. This holds a wallet of passwords and (optionally) OTP secrets that are encrypted using a local keypair that is unlocked by a password supplied by the user locally. """ def __init__(self): self._wallet_key = None self._get_wallet_key() self._cache = {} self._service_info = {} self._manual_password = False self._manual_otpcode = False @staticmethod def _wallet_dir(): """Directory containing all of the wallet files""" homedir = _os.path.expanduser("~") walletdir = "%s/.acquire" % homedir if not _os.path.exists(walletdir): _os.mkdir(walletdir) return walletdir def _create_wallet_key(self, filename): """Create a new wallet key for the user""" password = _getpass.getpass( prompt="Please enter a password to encrypt your wallet: ") from Acquire.Client import PrivateKey as _PrivateKey key = _PrivateKey() bytes = key.bytes(password) password2 = _getpass.getpass(prompt="Please confirm the password: ") if password != password2: print("The passwords don't match. Please try again.") self._create_wallet_key(filename) return # the passwords match - write this to the file with open(filename, "wb") as FILE: FILE.write(bytes) return key def _get_wallet_key(self): """Return the private key used to encrypt everything in the wallet. This will ask for the users password """ if self._wallet_key: return self._wallet_key wallet_dir = Wallet._wallet_dir() keyfile = "%s/wallet_key.pem" % wallet_dir if not _os.path.exists(keyfile): self._wallet_key = self._create_wallet_key(keyfile) return self._wallet_key # read the keyfile and decrypt with open(keyfile, "rb") as FILE: bytes = FILE.read() # get the user password wallet_key = None while not wallet_key: password = _getpass.getpass( prompt="Please enter your wallet password: ") try: from Acquire.Client import PrivateKey as _PrivateKey wallet_key = _PrivateKey.read_bytes(bytes, password) except: print("Invalid password. Please try again.") self._wallet_key = wallet_key return wallet_key @staticmethod def _get_userfile(username, identity_url): """Return the userfile for the passed username logging into the passed identity url""" from Acquire.ObjectStore import string_to_safestring \ as _string_to_safestring return "%s/user_%s_%s_encrypted" % ( Wallet._wallet_dir(), _string_to_safestring(username), _string_to_safestring(identity_url))
[docs] @staticmethod def remove_user_info(username): """Call this function to remove the userinfo associated with the account 'username' for all identity services """ wallet_dir = Wallet._wallet_dir() userfiles = _glob.glob("%s/user_*_encrypted" % wallet_dir) for userfile in userfiles: try: userinfo = self._read_userfile(userfile) if userinfo["username"] == username: _os.unlink(userfile) except: pass
def _read_userfile(self, filename): """Read all info from the passed userfile""" try: return self._cache[filename] except: pass if not _os.path.exists(filename): return data = _read_json(filename) from Acquire.ObjectStore import string_to_bytes \ as _string_to_bytes try: data["password"] = _string_to_bytes(data["password"]) except: pass try: data["otpsecret"] = _string_to_bytes(data["otpsecret"]) except: pass self._cache[filename] = data return data def _read_userinfo(self, username, identity_url): """Read all info for the passed user at the identity service reached at 'identity_url'""" return self._read_userfile(Wallet._get_userfile(username, identity_url)) def _get_username(self): """Function to find a username automatically, of if that fails, to ask the user """ wallet_dir = Wallet._wallet_dir() userfiles = _glob.glob("%s/user_*_encrypted" % wallet_dir) userinfos = {} from Acquire.ObjectStore import safestring_to_string \ as _safestring_to_string for userfile in userfiles: try: userinfo = self._read_userfile(userfile) username = _safestring_to_string(userinfo["username"]) userinfos[username] = userinfo except: pass usernames = list(userinfos.keys()) usernames.sort() if len(usernames) == 1: return usernames[0] if len(usernames) == 0: return _input("Please type your username: ") print("Please choose the account by typing in its number, " "or type a new username if you want a different account.") for (i, username) in enumerate(usernames): print("[%d] %s" % (i+1, username)) username = None while not username: print("Make your selection: ", end="") reply = _input( "Make your selection (1 to %d) " % (len(usernames)) ) try: idx = int(reply) - 1 if idx < 0 or idx >= len(usernames): print("Invalid account. Try again...") else: username = usernames[idx] except: username = None return username def _get_user_password(self, username, identity_url): """Get the user password for the passed user on the passed identity_url. If remember_device then save the password in the wallet if it is not already there """ try: userinfo = self._read_userinfo(username, identity_url) if userinfo: password = userinfo["password"] self._manual_password = False # this needs to be decrypted return self._wallet_key.decrypt(password) except: pass self._manual_password = True return _getpass.getpass(prompt="Please enter the login password: ") def _get_otpcode(self, username, identity_url): """Get the OTP code for the passed user on the passed identity service url""" try: userinfo = self._read_userinfo(username, identity_url) if userinfo: from Acquire.Client import OTP as _OTP secret = self._wallet_key.decrypt(userinfo["otpsecret"]) device_uid = userinfo["device_uid"] self._manual_otpcode = False self._device_uid = device_uid return _OTP(secret=secret).generate() except: pass self._manual_otpcode = True self._device_uid = None return _getpass.getpass( prompt="Please enter the one-time-password code: ")
[docs] @staticmethod def add_service(service): """Add a cached service info for the passed service. If it already exists, then this verifies that the added service is the same as the previously-seen service """ global _is_testing if _is_testing: return service from Acquire.ObjectStore import string_to_safestring \ as _string_to_safestring service_file = "%s/service_%s" % ( Wallet._wallet_dir(), _string_to_safestring(service.canonical_url())) existing_service = None try: existing_service = _read_service(service_file) except: pass if existing_service is not None: if service.validation_string() == \ existing_service.validation_string(): return service elif service.is_evolution_of(existing_service): # the service has evolved - this is ok _write_service(service, service_file) return service else: reply = _input( "This is a service you have seen before, but " "it has changed?\n\n" "URL = %s (%s)\n" "UID = %s (%s)\n" "public_key fingerprint = %s (%s)\n" "public_certificate fingerprint = %s (%s)\n\n" "verification string = %s (%s)\n\n" "\nDo you trust this updated service? y/n " % (service.canonical_url(), existing_service.canonical_url(), service.uid(), existing_service.uid(), service.public_key().fingerprint(), existing_service.public_key().fingerprint(), service.public_certificate().fingerprint(), existing_service.public_certificate().fingerprint(), service.validation_string(), existing_service.validation_string()) ) if reply[0].lower() == 'y': print("Now trusting %s" % str(service)) else: print("Not trusting this service!") raise PermissionError( "We do not trust the service '%s'" % str(service)) # We trust the service, so save this for future reference _write_service(service, service_file) return service reply = _input( "This is a new service that you have not seen before.\n\n" "URL = %s\n" "UID = %s\n" "public_key fingerprint = %s\n" "public_certificate fingerprint = %s\n\n" "verification string = %s\n\n" "\nDo you trust this service? y/n " % (service.canonical_url(), service.uid(), service.public_key().fingerprint(), service.public_certificate().fingerprint(), service.validation_string()) ) if reply[0].lower() == 'y': print("Now trusting %s" % str(service)) else: print("Not trusting this service!") raise PermissionError( "We do not trust the service '%s'" % str(service)) # We trust the service, so save this for future reference _write_service(service, service_file) return service
[docs] @staticmethod def get_services(): """Return all of the trusted services known to this wallet""" global _is_testing if _is_testing: return [] service_files = _glob.glob("%s/service_*" % Wallet._wallet_dir()) services = [] for service_file in service_files: services.append(_read_service(service_file)) return services
[docs] @staticmethod def get_service(service_url): """Return the service at 'service_url'. This will return the cached service if it exists, or will add a new service if the user so wishes """ global _is_testing if _is_testing: from Acquire.Service import get_remote_service as \ _get_remote_service return _get_remote_service(service_url) from Acquire.ObjectStore import string_to_safestring \ as _string_to_safestring service_file = "%s/service_%s" % ( Wallet._wallet_dir(), _string_to_safestring(service_url)) existing_service = None try: existing_service = _read_service(service_file) except: pass if existing_service is not None: # check if the keys need rotating - if they do, load up # the new keys and save them to the service file... if existing_service.should_refresh_keys(): existing_service.refresh_keys() _write_service(existing_service, service_file) return existing_service else: from Acquire.Service import get_remote_service as \ _get_remote_service service = _get_remote_service(service_url) return Wallet.add_service(service)
[docs] @staticmethod def remove_all_services(): """Remove all trusted services from this Wallet""" global _is_testing if _is_testing: return service_files = _glob.glob("%s/service_*" % Wallet._wallet_dir()) for service_file in service_files: if _os.path.exists(service_file): _os.unlink(service_file) # clear cache to force a new lookup from ._service import _cache_service_lookup _cache_service_lookup.clear()
[docs] @staticmethod def remove_service(service): """Remove the cached service info for the passed service""" global _is_testing if _is_testing: return if isinstance(service, str): service_url = service else: service_url = service.canonical_url() from Acquire.ObjectStore import string_to_safestring \ as _string_to_safestring service_file = "%s/service_%s" % ( Wallet._wallet_dir(), _string_to_safestring(service_url)) if _os.path.exists(service_file): _os.unlink(service_file) # clear cache to force a new lookup from ._service import _cache_service_lookup _cache_service_lookup.clear()
[docs] def send_password(self, url, username=None, remember_password=True, remember_device=None, dryrun=None): """Send a password and one-time code to the supplied login url""" self._manual_password = False self._manual_otpcode = False if not remember_password: remember_device = False # the login URL is of the form "server/code" words = url.split("/") identity_service = "/".join(words[0:-1]) short_uid = words[-1].split("=")[-1] # now get the service service = Wallet.get_service(identity_service) if not service.can_identify_users(): from Acquire.Client import LoginError raise LoginError( "Service '%s' is unable to identify users! " "You cannot log into something that is not " "a valid identity service!" % (service)) if not username: # choose a username from any existing files... username = self._get_username() print("Logging in using username '%s'" % username) password = self._get_user_password(username, service.canonical_url()) otpcode = self._get_otpcode(username, service.canonical_url()) print("\nLogging in to '%s', session '%s'..." % ( service.canonical_url(), short_uid), end="") _sys.stdout.flush() if dryrun: print("Calling %s with username=%s, password=%s, otpcode=%s, " "remember_device=%s, device_uid=%s, short_uid=%s" % (service.canonical_url(), username, password, otpcode, remember_device, self._device_uid, short_uid)) return try: function = "login" args = {"username": username, "password": password, "otpcode": otpcode, "remember_device": remember_device, "device_uid": self._device_uid, "short_uid": short_uid} response = service.call_function(function=function, args=args) print("SUCCEEDED!") _sys.stdout.flush() except Exception as e: print("FAILED!") _sys.stdout.flush() from Acquire.Client import LoginError raise LoginError("Failed to log in. %s" % e.args) if remember_password: try: provisioning_uri = response["provisioning_uri"] except: provisioning_uri = None try: device_uid = response["device_uid"] except: device_uid = None otpsecret = None if provisioning_uri: try: otpsecret = _re.search(r"secret=([\w\d+]+)&issuer", provisioning_uri).groups()[0] except: pass try: user_info = self._read_userinfo(username, service.canonical_url()) except: user_info = {} if user_info is None: user_info = {} pubkey = self._wallet_key.public_key() must_write = self._manual_password if otpsecret: if self._manual_otpcode: must_write = True if must_write: from Acquire.ObjectStore import string_to_safestring \ as _string_to_safestring from Acquire.ObjectStore import bytes_to_string \ as _bytes_to_string user_info["username"] = _string_to_safestring(username) user_info["password"] = _bytes_to_string( pubkey.encrypt( password.encode("utf-8"))) if otpsecret: user_info["otpsecret"] = _bytes_to_string( pubkey.encrypt( otpsecret.encode("utf-8"))) user_info["device_uid"] = device_uid _write_json(data=user_info, filename=Wallet._get_userfile( username, service.canonical_url())) self._manual_password = False self._manual_otpcode = False return response