Source code for core.cipherfactory

import re
import os
import stat
import argparse
import ciphers
import core.view
import core.util
import core.config

[docs]class CipherFactory(): """Factory wrapper for dispatching appropriate cipher-based encryptor/decryptor """ def __init__(self): self._enc = None self._dec = None self._parser_cfg = {} self._cipher_obj = None self._cipher_configured = False self._parser_configured = False self._cfg_obj = core.config.CipherConfig()
[docs] def create_vault(self): """Vault creation helper method""" if "vault_file" not in self._parser_cfg: raise Exception("vault file not specified in CLI") vault_path = self._parser_cfg["vault_file"] if os.path.exists(vault_path): raise Exception("vault file already exists: {}".format(vault_path)) elif not os.path.isabs(vault_path): core.util.safe_write(os.getcwd(), vault_path, self.encryptor.encrypt("[]")) else: core.util.safe_write("", vault_path, self.encryptor.encrypt("[]")) os.chmod(vault_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_ENFMT) self.update_cfg_file()
[docs] def load_cmd_cfg(self, parse_obj: argparse.Namespace): """Maintains command line arguments""" self._parser_cfg = vars(parse_obj) self._parser_configured = True
[docs] def load_param_cfg(self, config_obj: core.config.CipherConfig): """Maintains yaml config arguments""" if not self._parser_configured: raise Exception("Please provide CLI arguments into factory") elif not self._parser_cfg["create_vault"]: del self._cfg_obj self._cfg_obj = config_obj self._cipher_configured = True
[docs] def is_requested(self, operation): return bool(self._parser_cfg[operation])
[docs] def update_cfg_file(self): """Flushes new content into the config file""" cfg_attr = vars(self._enc) arg_params = {k[12:]:v for k,v in cfg_attr.items() if bool(re.match("_Encryptor__+", k))} cipher_params = {k[1:]:v for k,v in cfg_attr.items() if bool(re.match("_+", k)) and not bool(re.match("_Encryptor__+", k))} orig_cfg_path = self._parser_cfg.get('cfg_path') or 'cfg.yaml' cfg_filename = orig_cfg_path if not os.path.isabs(orig_cfg_path) \ else os.path.basename(orig_cfg_path) cfg_dirname = os.path.dirname(orig_cfg_path) \ if os.path.isabs(orig_cfg_path) else os.getcwd() self._cfg_obj.overwrite = self._parser_cfg['overwrite_cfg'] final_path = self._cfg_obj.store(cfg_filename = cfg_filename, cfg_dirname = cfg_dirname, arg_params = arg_params, cipher_params = cipher_params) view_obj = core.view.View() view_obj.print("updated cipher configuration flushed to: " + final_path)
def _load_cipher_module(self): if not self._parser_configured: raise Exception("Please provide appropriate CLI configs") if self._cipher_obj is None: self._cipher_obj = __import__("ciphers.{}".format(self._parser_cfg['cipher_suite']), fromlist=[ciphers]) @property def encryptor(self): """Returns an encryptor instance""" self._load_cipher_module() if self._enc is None: params = self._cfg_obj() cipher_params = params.pop("cipher_params", {}) arg_params = params.pop("arg_params", {}) self._enc = self._cipher_obj.Encryptor(cipher_params, arg_params, **params) if self._parser_cfg["overwrite_cfg"]: self.update_cfg_file() return self._enc @property def decryptor(self): """Returns a decryptor instance""" self._load_cipher_module() if self._dec is None: self._dec = self._cipher_obj.Decryptor(**self._cfg_obj()) return self._dec