# Source code for Acquire.Identity._usercredentials


# Public names exported by this module.
__all__ = ["UserCredentials"]

# Object store key prefix shared by the per-user credential records
# ("<root>/credentials/<user_uid>/<device_uid>") and the per-user account
# records ("<root>/uids/<user_uid>") that UserCredentials reads and writes.
_user_root = "identity/users"


class UserCredentials:
    """This class is used to store user credentials in the object
       store, and to verify that user credentials are correct.

       The user credentials are used to ultimately store a
       primary password for the user, which unlocks the user's
       primary private key
    """

    @staticmethod
    def _credentials_key(user_uid, device_uid):
        """Return the object store key under which the credentials for
           the passed user_uid / device_uid pair are stored
        """
        return "%s/credentials/%s/%s" % (_user_root, user_uid, device_uid)

    @staticmethod
    def hash(username, password, service_uid=None):
        """Return a hash of the passed username and password, computed
           by passing 'service_uid' and 'username+password' to
           Hash.multi_md5.

           If 'service_uid' is None then the UID of this service
           (as returned by get_this_service) is used
        """
        from Acquire.Crypto import Hash as _Hash
        from Acquire.Service import get_this_service as _get_this_service

        if service_uid is None:
            service_uid = _get_this_service(need_private_access=False).uid()

        return _Hash.multi_md5(service_uid, username+password)

    @staticmethod
    def create(user_uid, password, primary_password, device_uid=None):
        """Create the credentials for the user with specified
           user_uid, optionally logging in via the specified
           device_uid, using the specified password, to protect
           the passed "primary_password"

           If 'device_uid' is None then the credentials are stored
           using the user_uid in place of the device_uid.

           This returns the OTP that has been created to be
           associated with these credentials
        """
        from Acquire.Crypto import PrivateKey as _PrivateKey
        from Acquire.Crypto import OTP as _OTP
        from Acquire.ObjectStore import ObjectStore as _ObjectStore
        from Acquire.Service import get_service_account_bucket \
            as _get_service_account_bucket
        from Acquire.ObjectStore import bytes_to_string as _bytes_to_string

        if device_uid is None:
            device_uid = user_uid

        # a fresh key is generated for every set of credentials. It is
        # stored locked by 'password', and is used to encrypt both the
        # OTP secret and the primary password
        privkey = _PrivateKey(auto_generate=True)
        otp = _OTP()
        otpsecret = otp.encrypt(privkey.public_key())
        primary_password = privkey.encrypt(primary_password)

        data = {"primary_password": _bytes_to_string(primary_password),
                "private_key": privkey.to_data(passphrase=password),
                "otpsecret": _bytes_to_string(otpsecret)
                }

        key = UserCredentials._credentials_key(user_uid, device_uid)

        bucket = _get_service_account_bucket()
        _ObjectStore.set_object_from_json(bucket=bucket, key=key, data=data)

        return otp

    @staticmethod
    def login(credentials, user_uids, remember_device=False):
        """Verify the passed credentials are correct.
           This will try each of the passed 'user_uids' in turn, looking
           for stored credentials that can be unlocked by the passed
           'credentials'. The result of 'validate_password' for the first
           user that validates is returned, i.e. a dictionary containing
           the keys "user", "otp" and "device_uid".

           Raises TypeError if 'credentials' is not a Credentials object,
           re-raises RepeatedOTPCodeError if the OTP code was already
           used, and raises UserValidationError if none of the users
           could be validated
        """
        from Acquire.ObjectStore import ObjectStore as _ObjectStore
        from Acquire.Service import get_service_account_bucket \
            as _get_service_account_bucket
        from Acquire.Client import Credentials as _Credentials
        from Acquire.Crypto import RepeatedOTPCodeError \
            as _RepeatedOTPCodeError

        if not isinstance(credentials, _Credentials):
            raise TypeError("The passed credentials must be type Credentials")

        username = credentials.username()
        short_uid = credentials.short_uid()
        device_uid = credentials.device_uid()
        password = credentials.password()
        otpcode = credentials.otpcode()

        bucket = _get_service_account_bucket()

        # now try to find a matching user_uid
        for user_uid in user_uids:
            data = None
            verified_device_uid = None

            try:
                key = UserCredentials._credentials_key(user_uid, device_uid)
                data = _ObjectStore.get_object_from_json(bucket=bucket,
                                                         key=key)
                verified_device_uid = device_uid
            except Exception:
                # best effort - a failed lookup is treated as there being
                # no credentials stored for this device
                pass

            if data is None:
                # unknown device_uid. Try using the user_uid
                try:
                    key = UserCredentials._credentials_key(user_uid,
                                                           user_uid)
                    data = _ObjectStore.get_object_from_json(bucket=bucket,
                                                             key=key)
                    verified_device_uid = None
                except Exception:
                    # best effort - this user has no usable credentials
                    pass

            if data is not None:
                # verify the credentials
                try:
                    return UserCredentials.validate_password(
                                        user_uid=user_uid,
                                        device_uid=verified_device_uid,
                                        secrets=data, password=password,
                                        otpcode=otpcode,
                                        remember_device=remember_device)
                except _RepeatedOTPCodeError:
                    # if the OTP code is entered twice, then we need
                    # to invalidate the other session
                    raise
                except Exception:
                    # this is not the matching user...
                    pass

        # only get here if there are no matching users (or the
        # user-supplied password etc. are wrong)
        from Acquire.Identity import UserValidationError
        raise UserValidationError(
            "Invalid credentials logging into session '%s' "
            "with username '%s'" % (short_uid, username))

    @staticmethod
    def validate_password(user_uid, device_uid, secrets, password,
                          otpcode, remember_device):
        """Validate that the passed password and one-time-code are valid.
           If they are, then return a dictionary containing the
           UserAccount of the unlocked user ("user"), the OTP that is
           used to generate secrets ("otp"), and the device_uid of the
           login device ("device_uid")

           If 'remember_device' is True and 'device_uid' is None, then
           this creates a new OTP for the login device, which is returned,
           and a new device_uid for that device. The password needed to
           match this device is generated from the normal user password
           and the new device_uid via Credentials.encode_device_uid.

           Raises UserValidationError if no account data can be loaded
           for 'user_uid', or if the loaded account has a different UID.
           Errors from unlocking the private key, decrypting the secrets
           or verifying the OTP code are passed through to the caller
        """
        from Acquire.Crypto import PrivateKey as _PrivateKey
        from Acquire.Crypto import OTP as _OTP
        from Acquire.ObjectStore import string_to_bytes as _string_to_bytes

        privkey = _PrivateKey.from_data(data=secrets["private_key"],
                                        passphrase=password)

        # decrypt and validate the OTP code
        data = _string_to_bytes(secrets["otpsecret"])

        otpsecret = privkey.decrypt(data)
        otp = _OTP(secret=otpsecret)
        otp.verify(code=otpcode, once_only=True)

        # everything is ok - we can load the user account via the
        # decrypted primary password
        primary_password = _string_to_bytes(secrets["primary_password"])
        primary_password = privkey.decrypt(primary_password)

        from Acquire.ObjectStore import ObjectStore as _ObjectStore
        from Acquire.Service import get_service_account_bucket \
            as _get_service_account_bucket

        # drop the references to the encrypted data now that
        # everything needed has been extracted from it
        data = None
        secrets = None

        key = "%s/uids/%s" % (_user_root, user_uid)

        bucket = _get_service_account_bucket()

        try:
            data = _ObjectStore.get_object_from_json(bucket=bucket, key=key)
        except Exception:
            # a failed lookup is reported below as missing account data
            pass

        if data is None:
            from Acquire.Identity import UserValidationError
            raise UserValidationError(
                "Unable to validate user as no account data is present!")

        from Acquire.Identity import UserAccount as _UserAccount
        user = _UserAccount.from_data(data=data, passphrase=primary_password)

        if user.uid() != user_uid:
            from Acquire.Identity import UserValidationError
            raise UserValidationError(
                "Unable to validate user as mismatch in user_uids!")

        if device_uid is None and remember_device:
            # create a new OTP that is unique for this device
            from Acquire.ObjectStore import create_uuid as _create_uuid
            from Acquire.Client import Credentials as _Credentials
            device_uid = _create_uuid()
            device_password = _Credentials.encode_device_uid(
                                                encoded_password=password,
                                                device_uid=device_uid)

            otp = UserCredentials.create(user_uid=user_uid,
                                         password=device_password,
                                         primary_password=primary_password,
                                         device_uid=device_uid)

        return {"user": user, "otp": otp, "device_uid": device_uid}