Source code for Acquire.Crypto._keys


import os as _os
import base64 as _base64
import uuid as _uuid

from Acquire.Stubs import lazy_import as _lazy_import

_rsa = _lazy_import.lazy_module(
            "cryptography.hazmat.primitives.asymmetric.rsa")
_serialization = _lazy_import.lazy_module(
            "cryptography.hazmat.primitives.serialization")
_default_backend = _lazy_import.lazy_function(
            "cryptography.hazmat.backends.default_backend")
_hashes = _lazy_import.lazy_module("cryptography.hazmat.primitives.hashes")
_padding = _lazy_import.lazy_module(
            "cryptography.hazmat.primitives.asymmetric.padding")
_fernet = _lazy_import.lazy_module("cryptography.fernet")

__all__ = ["PrivateKey", "PublicKey", "SymmetricKey", "get_private_key"]


def _bytes_to_string(b):
    """Encode the passed binary data as base64 and return the
       result as a utf-8 string. Passing None returns None, so
       that optional values do not need special handling by
       the caller"""
    if b is None:
        return None

    encoded = _base64.b64encode(b)
    return encoded.decode("utf-8")


def _string_to_bytes(s):
    """Decode a base64 utf-8 string back into the binary data
       it represents. This is the inverse of _bytes_to_string
       and only understands strings produced by that function -
       it is not a general way of turning arbitrary text
       into bytes. Passing None returns None"""
    if s is None:
        return None

    raw = s.encode("utf-8")
    return _base64.b64decode(raw)


def _assert_strong_passphrase(passphrase, mangleFunction):
    """Check that the passed passphrase is sufficiently strong and
       return it, converted to a string.

       If 'mangleFunction' is supplied then the passphrase is first
       passed through it, and it is the mangled result that is
       checked and returned.

       To be strong, the (mangled) passphrase must contain between
       6 and 50 characters. If it is shorter than 24 characters then
       it must also contain at least one upper case letter (A-Z), one
       lower case letter (a-z) and one digit (0-9).

       Raises a WeakPassphraseError if the passphrase is not strong.
       The passphrase is deliberately never printed or included in
       the error message, as it is a secret and exception text
       routinely ends up in logs.
    """
    if mangleFunction:
        passphrase = str(mangleFunction(passphrase))
    else:
        passphrase = str(passphrase)

    npass = len(passphrase)

    if npass < 6 or npass > 50:
        from Acquire.Crypto import WeakPassphraseError
        raise WeakPassphraseError(
            "The pass-phrase must contain between "
            "6 and 50 characters")

    # only short pass-phrases have to mix character classes. Longer
    # ones are exempt, e.g. an MD5 passphrase is 32 characters,
    # only lower case and numbers
    if npass < 24:
        import re as _re

        has_upper = _re.search(r'[A-Z]', passphrase)
        has_lower = _re.search(r'[a-z]', passphrase)
        has_digit = _re.search(r'[0-9]', passphrase)

        if not (has_upper and has_lower and has_digit):
            from Acquire.Crypto import WeakPassphraseError
            raise WeakPassphraseError(
                "Short pass-phrases must contain numbers and "
                "upper- and lowercase characters")

    return passphrase


def _generate_private_key():
    """Internal function that creates every private key used by this
       module: a new 2048 bit RSA key with public exponent 65537,
       generated using the default cryptography backend"""
    backend = _default_backend()

    private_key = _rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=backend)

    return private_key


def _generate_symmetric_key():
    """Internal function that creates every new symmetric key used
       by this module, by asking Fernet to generate one"""
    symmetric_key = _fernet.Fernet.generate_key()
    return symmetric_key


# cache of the private keys handed out by get_private_key,
# indexed by the name under which each was requested
_key_database = {}


def get_private_key(key="default"):
    """Return the private key that is associated with the name 'key'.

       The first request for a given name generates a new PrivateKey,
       which is remembered and returned for every later request that
       uses the same name. This lets a single session re-use a key
       instead of repeatedly generating new ones (which is slow).

       Make sure you know that you can safely re-use a key. Re-using
       keys for different function calls is completely ok
    """
    if key not in _key_database:
        _key_database[key] = PrivateKey()

    return _key_database[key]
class PublicKey:
    """This is a holder for an in-memory public key. The holder may
       be empty (hold no key), in which case the accessors return
       None and the cryptographic operations raise an error
    """
    def __init__(self, public_key=None):
        """Construct from the passed public key. Passing None
           (the default) creates an empty holder"""
        self._pubkey = public_key

    def bytes(self):
        """Return the raw bytes for this key (PEM encoded, in the
           SubjectPublicKeyInfo format), or None if this holder
           is empty"""
        if self._pubkey is None:
            return None

        return self._pubkey.public_bytes(
            encoding=_serialization.Encoding.PEM,
            format=_serialization.PublicFormat.SubjectPublicKeyInfo)

    def pem(self):
        """Return a PEM string for this key, or None if this
           holder is empty"""
        data = self.bytes()

        if data is None:
            return None

        return data.decode("utf-8")

    def __str__(self):
        """Return a string representation of this key"""
        pem = self.pem()

        if pem is None:
            # an empty key must still be printable
            return "PublicKey(None)"

        return "PublicKey('%s')" % pem

    def __eq__(self, other):
        """Two public keys are equal if they serialise to the
           same bytes"""
        if isinstance(other, self.__class__):
            return self.bytes() == other.bytes()
        else:
            return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def write(self, filename):
        """Write this public key to 'filename'. This does nothing
           if this holder is empty"""
        if self._pubkey is None:
            return

        pubkey_bytes = self.bytes()

        with open(filename, "wb") as FILE:
            FILE.write(pubkey_bytes)

    @staticmethod
    def read_bytes(data):
        """Read and return a public key from the PEM encoded
           bytes 'data'"""
        public_key = _serialization.load_pem_public_key(
            data, backend=_default_backend())

        return PublicKey(public_key)

    @staticmethod
    def read(filename):
        """Read and return a public key from 'filename'"""
        with open(filename, "rb") as FILE:
            return PublicKey.read_bytes(FILE.read())

    def fingerprint(self):
        """Return the fingerprint of this key - this is useful to
           help work out which key to use to decrypt data. The
           fingerprint is the MD5 hex digest of the bytes of this
           key, written as colon-separated pairs of characters.
           This returns None if this holder is empty
        """
        data = self.bytes()

        if data is None:
            return None

        from hashlib import md5 as _md5
        md5 = _md5()
        md5.update(data)
        h = md5.hexdigest()

        # return this signature as "aa:bb:cc:dd:ee:etc."
        return ":".join([h[i:i+2] for i in range(0, len(h), 2)])

    def encrypt(self, message):
        """Encrypt and return the passed message. For short messages
           this will use the public key directly. For longer
           messages, this will generate a random symmetric key,
           will encrypt the message using that, and will then
           encrypt the symmetric key using the public key.

           Strings are encoded as utf-8 before encryption.
           This returns some bytes. A KeyManipulationError is
           raised if this holder is empty
        """
        if self._pubkey is None:
            from Acquire.Crypto import KeyManipulationError
            raise KeyManipulationError("You cannot encrypt a message using "
                                       "an empty public key!")

        if isinstance(message, str):
            message = message.encode("utf-8")

        oaep = _padding.OAEP(
                    mgf=_padding.MGF1(algorithm=_hashes.SHA256()),
                    algorithm=_hashes.SHA256(),
                    label=None)

        try:
            return self._pubkey.encrypt(message, oaep)
        except Exception:
            # this is a longer message that cannot be encoded using
            # an asymmetric key - need to use a symmetric key. A real
            # problem with the key will be raised by the encryption
            # of the symmetric key below
            pass

        key = _fernet.Fernet.generate_key()
        token = _fernet.Fernet(key).encrypt(message)
        encrypted_key = self._pubkey.encrypt(key, oaep)

        # the first 256 bytes are the encrypted key - the rest
        # is the token, because we are using 2048 bit (256 byte) keys
        return encrypted_key + token

    def verify(self, signature, message):
        """Verify that the message has been correctly signed.
           Strings are encoded as utf-8 before verification.

           This returns nothing if the signature is valid. It raises
           a SignatureVerificationError if the signature cannot be
           verified, or a KeyManipulationError if this holder
           is empty
        """
        if self._pubkey is None:
            from Acquire.Crypto import KeyManipulationError
            raise KeyManipulationError("You cannot verify a message using "
                                       "an empty public key!")

        if isinstance(message, str):
            message = message.encode("utf-8")

        try:
            self._pubkey.verify(
                        signature,
                        message,
                        _padding.PSS(
                            mgf=_padding.MGF1(_hashes.SHA256()),
                            salt_length=_padding.PSS.MAX_LENGTH),
                        _hashes.SHA256())
        except Exception as e:
            from Acquire.Crypto import SignatureVerificationError
            raise SignatureVerificationError(
                "Error validating the signature "
                "for the passed message: %s" % str(e))

    def to_data(self):
        """Return this public key as a json-serialisable dictionary.
           The dictionary is empty if this holder is empty"""
        data = {}

        # serialise once and re-use the result
        b = self.bytes()

        if b is not None:
            data["bytes"] = _bytes_to_string(b)

        return data

    @staticmethod
    def from_data(data):
        """Construct from the passed json-deserialised dictionary.
           A string is treated as the base64 encoded bytes of the
           key, and bytes are treated as the bytes of the key.
           Empty data returns an empty PublicKey"""
        if isinstance(data, str):
            return PublicKey.read_bytes(_string_to_bytes(data))
        elif isinstance(data, bytes):
            return PublicKey.read_bytes(data)
        else:
            key = PublicKey()

            if (data and len(data) > 0):
                key = PublicKey.read_bytes(_string_to_bytes(data["bytes"]))

            return key
class PrivateKey:
    """This is a holder for an in-memory private key. The holder may
       be empty (hold no key) if it was constructed with
       auto_generate set to False
    """
    def __init__(self, private_key=None, auto_generate=True):
        """Construct the key either from a passed key, or by
           generating a new key. If no key is passed and
           'auto_generate' is False then this holder is left empty"""
        self._privkey = private_key

        if self._privkey is None and auto_generate:
            self._privkey = _generate_private_key()

    def __str__(self):
        """Return a string representation of this key. Only the
           public key is shown"""
        pubkey = self.public_key()

        if pubkey is None:
            # an empty key must still be printable
            return "PrivateKey( public_key=None )"

        return "PrivateKey( public_key='%s' )" % \
            pubkey.bytes().decode("utf-8")

    def __eq__(self, other):
        """Two private keys are equal if their public keys are equal"""
        if isinstance(other, self.__class__):
            return self.public_key() == other.public_key()
        else:
            return False

    def __ne__(self, other):
        return not self.__eq__(other)

    @staticmethod
    def _decode_if_text(data):
        """Return 'data' decoded as utf-8 if that is possible,
           else return 'data' unchanged"""
        try:
            return data.decode("utf-8")
        except (UnicodeDecodeError, AttributeError):
            return data

    def _require_public_key(self, action):
        """Return the public key of this key, raising a
           KeyManipulationError that names 'action' if this
           holder is empty"""
        pubkey = self.public_key()

        if pubkey is None:
            from Acquire.Crypto import KeyManipulationError
            raise KeyManipulationError(
                "You cannot %s using an empty private key!" % action)

        return pubkey

    @staticmethod
    def read_bytes(data, passphrase, mangleFunction=None):
        """Read a private key from the passed bytes 'data' that
           is encrypted using 'passphrase' and return a PrivateKey
           object holding that key. The passphrase is optionally
           mangled by 'mangleFunction' and must be strong.
           Raises a KeyManipulationError if the key cannot
           be unlocked
        """
        passphrase = _assert_strong_passphrase(passphrase, mangleFunction)

        try:
            private_key = _serialization.load_pem_private_key(
                             data,
                             password=passphrase.encode("utf-8"),
                             backend=_default_backend())
        except Exception as e:
            from Acquire.Crypto import KeyManipulationError
            raise KeyManipulationError("Cannot unlock key. %s" % str(e))

        return PrivateKey(private_key)

    @staticmethod
    def read(filename, passphrase, mangleFunction=None):
        """Read a private key from 'filename' that is encrypted using
           'passphrase' and return a PrivateKey object holding that
           key. Raises a KeyManipulationError if the file cannot
           be read"""
        try:
            with open(filename, "rb") as FILE:
                data = FILE.read()
        except IOError as e:
            from Acquire.Crypto import KeyManipulationError
            raise KeyManipulationError(
                "Cannot read the private keyfile %s: %s" %
                (filename, str(e)))

        return PrivateKey.read_bytes(data, passphrase, mangleFunction)

    @staticmethod
    def random_passphrase():
        """Randomly generate and return a passphrase that obeys
           the password rules and could be used to
           serialise a PrivateKey. The passphrase contains between
           20 and 39 characters, drawn from the ascii upper case and
           lower case letters and the digits, with at least one
           character of each kind
        """
        import random as _random
        import string as _string

        # use the operating system as source of random numbers
        rand = _random.SystemRandom()

        # choose how many of each kind of character to use. The
        # integer bounds guarantee at least one lower case and one
        # upper case character, and at least five digits
        nvals = rand.randint(20, 39)
        nlower = rand.randint(1, nvals - 6)
        nupper = rand.randint(1, max(1, nvals - nlower - 6))
        ndigits = nvals - nupper - nlower

        chars = [rand.choice(_string.ascii_lowercase) for _ in range(nlower)]
        chars += [rand.choice(_string.ascii_uppercase) for _ in range(nupper)]
        chars += [rand.choice(_string.digits) for _ in range(ndigits)]

        # mix up the characters so the kinds are not grouped together
        rand.shuffle(chars)
        passphrase = "".join(chars)

        _assert_strong_passphrase(passphrase, mangleFunction=None)

        return passphrase

    def pem(self, passphrase, mangleFunction=None):
        """Return a PEM string for this key, encrypted using
           'passphrase'. Returns None if this holder is empty"""
        data = self.bytes(passphrase, mangleFunction)

        if data is None:
            return None

        return data.decode("utf-8")

    def bytes(self, passphrase, mangleFunction=None):
        """Return the raw bytes for this key (PEM encoded, PKCS8
           format), encoded by the passed passphrase that has been
           optionally mangled by mangleFunction. Returns None if
           this holder is empty"""
        if self._privkey is None:
            return None

        passphrase = _assert_strong_passphrase(passphrase, mangleFunction)

        return self._privkey.private_bytes(
            encoding=_serialization.Encoding.PEM,
            format=_serialization.PrivateFormat.PKCS8,
            encryption_algorithm=_serialization.BestAvailableEncryption(
                                            passphrase.encode("utf-8")))

    def write(self, filename, passphrase, mangleFunction=None):
        """Write this key to 'filename', encrypted with 'passphrase'.
           Any existing contents of the file are replaced. This does
           nothing if this holder is empty"""
        if self._privkey is None:
            return

        privkey_bytes = self.bytes(passphrase, mangleFunction)

        # O_TRUNC is needed as otherwise overwriting a longer file would
        # leave its old trailing bytes in place and corrupt the key.
        # New files are created readable and writable by the owner only
        flags = _os.O_CREAT | _os.O_WRONLY | _os.O_TRUNC

        with open(_os.open(filename, flags, 0o600), 'wb') as FILE:
            FILE.write(privkey_bytes)

    def public_key(self, filename=None):
        """Get the public key for this private key. If filename
           is specified then this is written to the passed file.
           Returns None if this holder is empty"""
        if self._privkey is None:
            return None

        pubkey = PublicKey(self._privkey.public_key())

        if filename is not None:
            pubkey.write(filename)

        return pubkey

    def key_size_in_bytes(self):
        """Return the number of bytes in this key (0 if this
           holder is empty)"""
        if self._privkey is None:
            return 0
        else:
            return int(self._privkey.key_size / 8)

    def fingerprint(self):
        """Return the fingerprint of this key - this is useful to
           help work out which key to use to decrypt data. This is
           the fingerprint of the public key. Returns None if this
           holder is empty
        """
        pubkey = self.public_key()

        if pubkey is None:
            return None

        return pubkey.fingerprint()

    def encrypt(self, message):
        """Encrypt and return the passed message, using the
           public key of this key. Raises a KeyManipulationError
           if this holder is empty"""
        return self._require_public_key("encrypt a message").encrypt(message)

    def verify(self, signature, message):
        """Verify the passed signature is correct for
           the passed message, using the public key of this key.
           Raises a KeyManipulationError if this holder is empty"""
        return self._require_public_key("verify a message").verify(
                                                    signature, message)

    def decrypt(self, message):
        """Decrypt and return the passed message. This handles both
           short messages that were encrypted directly with the
           public key, and long messages, where the first
           key_size_in_bytes() bytes hold an encrypted symmetric key
           and the remainder was encrypted with that symmetric key.

           The result is returned as a string if it can be decoded
           as utf-8, else it is returned as bytes. Raises a
           DecryptionError if the message cannot be decrypted
        """
        key_size = self.key_size_in_bytes()

        if key_size == 0:
            from Acquire.Crypto import DecryptionError
            raise DecryptionError("You cannot decrypt a message "
                                  "with a null key!")

        oaep = _padding.OAEP(
                    mgf=_padding.MGF1(algorithm=_hashes.SHA256()),
                    algorithm=_hashes.SHA256(),
                    label=None)

        # try standard decryption
        try:
            decrypted = self._privkey.decrypt(message, oaep)
        except Exception:
            # not a short message - fall through and treat this
            # as a long message
            pass
        else:
            return PrivateKey._decode_if_text(decrypted)

        # it is a larger message, so need to decrypt the secret symmetric
        # key, and then use that to decrypt the rest of the token
        try:
            symkey = self._privkey.decrypt(message[0:key_size], oaep)
        except Exception as e:
            from Acquire.Crypto import DecryptionError
            raise DecryptionError(
                "Cannot decrypt the symmetric key used "
                "to encrypt the long message '%s' (%s): %s" %
                (message[0:key_size], key_size, str(e)))

        # both an unusable symmetric key and an invalid token are
        # reported as a DecryptionError that carries the real cause
        try:
            f = _fernet.Fernet(symkey)
            decrypted = f.decrypt(message[key_size:])
        except Exception as e:
            from Acquire.Crypto import DecryptionError
            raise DecryptionError(
                "Cannot decrypt the long message using the "
                "symmetric key: %s" % str(e))

        return PrivateKey._decode_if_text(decrypted)

    def sign(self, message):
        """Return the signature for the passed message. Strings
           are encoded as utf-8 before signing. Returns None if
           this holder is empty"""
        if self._privkey is None:
            return None

        if isinstance(message, str):
            message = message.encode("utf-8")

        signature = self._privkey.sign(
                         message,
                         _padding.PSS(
                           mgf=_padding.MGF1(_hashes.SHA256()),
                           salt_length=_padding.PSS.MAX_LENGTH),
                         _hashes.SHA256())

        return signature

    def to_data(self, passphrase, mangleFunction=None):
        """Return the json-serialisable data for this key, encrypted
           using 'passphrase'. The dictionary is empty if this
           holder is empty"""
        data = {}

        b = self.bytes(passphrase, mangleFunction)

        if b is not None:
            data["bytes"] = _bytes_to_string(b)

        return data

    @staticmethod
    def from_data(data, passphrase, mangleFunction=None):
        """Return a private key constructed from the passed
           json-deserialised dictionary. A string is treated as the
           base64 encoded bytes of the key, and bytes are treated
           as the bytes of the key. Empty data returns None
        """
        if isinstance(data, str):
            return PrivateKey.read_bytes(_string_to_bytes(data),
                                         passphrase, mangleFunction)
        elif isinstance(data, bytes):
            return PrivateKey.read_bytes(data, passphrase, mangleFunction)
        elif (data and len(data) > 0):
            return PrivateKey.read_bytes(_string_to_bytes(data["bytes"]),
                                         passphrase, mangleFunction)
        else:
            return None
class SymmetricKey:
    """This is a holder for an in-memory symmetric key (for
       symmetric encryption). The holder may be empty (hold no key)
       if it was constructed with auto_generate set to False
    """
    def __init__(self, symmetric_key=None, auto_generate=True):
        """Construct the key either from a passed key, or by
           generating a new key. The passed key will be converted
           into a URL-safe base64-encoded 32byte key. If no key is
           passed and 'auto_generate' is False then this holder
           is left empty
        """
        # always define the attribute, so that an empty holder can
        # be safely tested using 'self._symkey is None'
        self._symkey = None

        if symmetric_key is not None:
            from Acquire.Crypto import Hash as _Hash
            from Acquire.ObjectStore import string_to_encoded \
                as _string_to_encoded

            # NOTE(review): the key is derived from an MD5 hash of the
            # passed value. A proper key derivation function would be
            # stronger, but changing this would make existing
            # encrypted data unreadable
            self._symkey = _string_to_encoded(
                            _Hash.md5(symmetric_key)).encode("utf-8")
        elif auto_generate:
            self._symkey = _generate_symmetric_key()

    def __str__(self):
        """Return a string representation of this key. The key
           itself is never shown"""
        return "SymmetricKey()"

    def __eq__(self, other):
        """Two symmetric keys are equal if they hold the same key"""
        if isinstance(other, self.__class__):
            return self._symkey == other._symkey
        else:
            return False

    def __ne__(self, other):
        return not self.__eq__(other)

    @staticmethod
    def read_bytes(data, passphrase, mangleFunction=None):
        """Read a SymmetricKey from the passed bytes 'data' that
           is encrypted using 'passphrase' and return a SymmetricKey
           object holding that key. The passphrase is optionally
           mangled by 'mangleFunction' and must be strong.
           Raises a KeyManipulationError if the key cannot
           be unlocked
        """
        passphrase = _assert_strong_passphrase(passphrase, mangleFunction)

        symmetric_key = SymmetricKey(auto_generate=False)

        try:
            s = SymmetricKey(symmetric_key=passphrase)
            symkey = s.decrypt(data)
        except Exception as e:
            from Acquire.Crypto import KeyManipulationError
            raise KeyManipulationError("Cannot unlock key. %s" % str(e))

        # decrypt returns a string whenever the result is valid
        # utf-8, but the key must be held as bytes
        if isinstance(symkey, str):
            symkey = symkey.encode("utf-8")

        symmetric_key._symkey = symkey

        return symmetric_key

    @staticmethod
    def read(filename, passphrase, mangleFunction=None):
        """Read a symmetric key from 'filename' that is encrypted using
           'passphrase' and return a SymmetricKey object holding that
           key. Raises a KeyManipulationError if the file cannot
           be read"""
        try:
            with open(filename, "rb") as FILE:
                data = FILE.read()
        except IOError as e:
            from Acquire.Crypto import KeyManipulationError
            raise KeyManipulationError(
                "Cannot read the keyfile %s: %s" %
                (filename, str(e)))

        return SymmetricKey.read_bytes(data, passphrase, mangleFunction)

    @staticmethod
    def random_passphrase():
        """Randomly generate and return a passphrase that obeys
           the password rules and could be used to
           serialise a SymmetricKey. The passphrase contains between
           20 and 39 characters, drawn from the ascii upper case and
           lower case letters and the digits, with at least one
           character of each kind
        """
        import random as _random
        import string as _string

        # use the operating system as source of random numbers
        rand = _random.SystemRandom()

        # choose how many of each kind of character to use. The
        # integer bounds guarantee at least one lower case and one
        # upper case character, and at least five digits
        nvals = rand.randint(20, 39)
        nlower = rand.randint(1, nvals - 6)
        nupper = rand.randint(1, max(1, nvals - nlower - 6))
        ndigits = nvals - nupper - nlower

        chars = [rand.choice(_string.ascii_lowercase) for _ in range(nlower)]
        chars += [rand.choice(_string.ascii_uppercase) for _ in range(nupper)]
        chars += [rand.choice(_string.digits) for _ in range(ndigits)]

        # mix up the characters so the kinds are not grouped together
        rand.shuffle(chars)
        passphrase = "".join(chars)

        _assert_strong_passphrase(passphrase, mangleFunction=None)

        return passphrase

    def bytes(self, passphrase, mangleFunction=None):
        """Return the raw bytes for this key, encoded by
           the passed passphrase that has been optionally
           mangled by mangleFunction. Returns None if this
           holder is empty"""
        if self._symkey is None:
            return None

        passphrase = _assert_strong_passphrase(passphrase, mangleFunction)

        s = SymmetricKey(symmetric_key=passphrase)
        return s.encrypt(self._symkey)

    def write(self, filename, passphrase, mangleFunction=None):
        """Write this key to 'filename', encrypted with 'passphrase'.
           Any existing contents of the file are replaced. This does
           nothing if this holder is empty"""
        if self._symkey is None:
            return

        symkey_bytes = self.bytes(passphrase, mangleFunction)

        # O_TRUNC is needed as otherwise overwriting a longer file would
        # leave its old trailing bytes in place and corrupt the key.
        # New files are created readable and writable by the owner only
        flags = _os.O_CREAT | _os.O_WRONLY | _os.O_TRUNC

        with open(_os.open(filename, flags, 0o600), 'wb') as FILE:
            FILE.write(symkey_bytes)

    def fingerprint(self):
        """Return the fingerprint of this key - this is useful to
           help work out which key to use to decrypt data. The
           fingerprint is the MD5 hex digest of the key, written as
           colon-separated pairs of characters. Returns None if
           this holder is empty
        """
        if self._symkey is None:
            return None

        from hashlib import md5 as _md5
        md5 = _md5()
        md5.update(self._symkey)
        h = md5.hexdigest()

        # return this signature as "aa:bb:cc:dd:ee:etc."
        return ":".join([h[i:i+2] for i in range(0, len(h), 2)])

    def encrypt(self, message):
        """Encrypt and return the passed message. Strings are
           encoded as utf-8 before encryption. If this holder is
           empty then a new key is generated and held, and is used
           to encrypt the message. This returns some bytes"""
        if self._symkey is None:
            self._symkey = _generate_symmetric_key()

        if isinstance(message, str):
            message = message.encode("utf-8")

        assert isinstance(self._symkey, bytes)

        f = _fernet.Fernet(self._symkey)
        token = f.encrypt(message)

        return token

    def decrypt(self, message):
        """Decrypt and return the passed message. The result is
           returned as a string if it can be decoded as utf-8,
           else it is returned as bytes. Raises a DecryptionError
           if this holder is empty or the message cannot
           be decrypted"""
        if self._symkey is None:
            from Acquire.Crypto import DecryptionError
            raise DecryptionError("You cannot decrypt a message "
                                  "with a null key!")

        assert isinstance(self._symkey, bytes)

        f = _fernet.Fernet(self._symkey)

        try:
            message = f.decrypt(message)
        except Exception as e:
            # the key is a secret, so must never be placed into the
            # error message, as exception text routinely ends up in logs
            from Acquire.Crypto import DecryptionError
            raise DecryptionError(
                "Cannot decrypt the message using the "
                "symmetric key: %s" % (str(e) or e.__class__.__name__))

        try:
            return message.decode("utf-8")
        except (UnicodeDecodeError, AttributeError):
            return message

    def to_data(self, passphrase, mangleFunction=None):
        """Return the json-serialisable data for this key, encrypted
           using 'passphrase'. The dictionary is empty if this
           holder is empty"""
        data = {}

        b = self.bytes(passphrase, mangleFunction)

        if b is not None:
            data["bytes"] = _bytes_to_string(b)

        return data

    @staticmethod
    def from_data(data, passphrase, mangleFunction=None):
        """Return a key constructed from the passed
           json-deserialised dictionary. A string is treated as the
           base64 encoded bytes of the key, and bytes are treated
           as the bytes of the key. Empty data returns None
        """
        if isinstance(data, str):
            return SymmetricKey.read_bytes(_string_to_bytes(data),
                                           passphrase, mangleFunction)
        elif isinstance(data, bytes):
            return SymmetricKey.read_bytes(data, passphrase, mangleFunction)
        elif (data and len(data) > 0):
            return SymmetricKey.read_bytes(_string_to_bytes(data["bytes"]),
                                           passphrase, mangleFunction)
        else:
            return None