Source code for ciphers.crypto_backend

# https://cryptography.io/en/latest/hazmat/primitives/symmetric-encryption
import math
import base64
import logging
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import (
Cipher, algorithms, modes
)

from . import cipher, cipher_utils

[docs]class Encryptor(cipher.Encryptor): """Encryptor class for symmetric encryption through Fernet module. Essential parameters such as generated keys and cipher modes are dunder-prefixed for reuse while decrypting the message. """ def __init__(self, cipher_params: dict, arg_params: dict, **kwargs): """Initializer for primitives and underlying cipher objects :param kwargs: Consolidation of parameters consumed by base cipher library :type dict """ self._algorithm = cipher_params.get("algorithm", "AES") self._mode = cipher_params.get("mode", "CBC") self.__key = arg_params.get("key", None) or cipher_utils.gen_random_secret(arg_params.get("key_size", 32)).encode('utf-8') self.__iv = arg_params.get("iv", None) or cipher_utils.gen_random_secret(arg_params.get("iv_size", 16)).encode('utf-8') self.__encoding = arg_params.get("encoding", "utf-8") self.__pad_len = 0 self.__algorithm = arg_params.get("algorithm", {}) self.__mode = arg_params.get("mode", {}) if self._algorithm not in CIPHER_ARGS['algorithms']: raise Exception("Unsupported algo type {}".format(self._algorithm)) if self._mode not in CIPHER_ARGS['modes']: raise Exception("Unsupported mode type {}".format(self._mode)) algo_obj = vars(algorithms)[self._algorithm] mode_obj = vars(modes)[self._mode] self._cipher = Cipher( algo_obj(self.__key, **self.__algorithm), mode_obj(self.__iv, **self.__mode), default_backend() )
[docs] def encrypt(self, plain_text): """Template method for encryption of a single message :param plain_text: input message for encryption :type str :returns: encrypted output for secure storage :rtype str """ l = len(plain_text) self.__pad_len = (1<<(int(math.log2(l)) + 3)) - l ptext_padded = plain_text + \ cipher_utils.gen_random_secret(self.__pad_len) enc = self._cipher.encryptor() cipher_text = enc.update(ptext_padded.encode(self.__encoding)) \ + enc.finalize() return cipher_text
[docs]class Decryptor(cipher.Decryptor): """Decryptor class for symmetric decryption through Fernet module. Essential parameters such as generated keys and cipher modes are dunder-prefixed for reuse while decrypting the message. """ def __init__(self, cipher_params: dict, arg_params: dict, **kwargs): """Initializer for primitives and underlying cipher objects :param kwargs: Consolidation of parameters consumed by base cipher library :type dict """ self.__key = arg_params.get("key", None) self.__iv = arg_params.get("iv", None) self.__encoding = arg_params.get("encoding", "utf-8") self.__pad_len = arg_params.get("pad_len", 0) if any([self.__key, self.__iv]) is None: raise Exception("invalid seed params provided for decryption") self._algorithm = cipher_params.get("algorithm", "AES") self._mode = cipher_params.get("mode", "CBC") self.__algorithm = arg_params.get("algorithm", {}) self.__mode = arg_params.get("mode", {}) if self._algorithm not in CIPHER_ARGS['algorithms']: raise Exception("Unsupported algo type {}".format(self._algorithm)) if self._mode not in CIPHER_ARGS['modes']: raise Exception("Unsupported mode type {}".format(self._mode)) algo_obj = eval("algorithms.{}".format(self._algorithm)) mode_obj = eval("modes.{}".format(self._mode)) self._cipher = Cipher( algo_obj(self.__key, **self.__algorithm), mode_obj(self.__iv, **self.__mode), default_backend() )
[docs] def decrypt(self, cipher_text): """Template method for decryption of a cipher text :param cipher_text: input message for decryption :type str :returns: decryption output access/modification :rtype str """ dec = self._cipher.decryptor() plain_text = dec.update(cipher_text) \ + dec.finalize() return plain_text.decode(self.__encoding)[:-1*self.__pad_len]
if __name__ != "__main__": CIPHER_PKG = {'algorithms': algorithms, 'modes': modes} CIPHER_ARGS = {k:cipher_utils.expand_attrs(v) \ for k,v in CIPHER_PKG.items()}