Skip to content

加密

Hash helpers used by the signing utilities.

get_md5_of_str(string)

Return the MD5 hex digest of a string.

Source code in aloha/encrypt/hash.py
7
8
9
def get_md5_of_str(string):
    """Return the MD5 hex digest of a string."""
    return hashlib.md5(string.encode()).hexdigest()

get_sha256_of_str(string)

Return the SHA-256 hex digest of a string.

Source code in aloha/encrypt/hash.py
def get_sha256_of_str(string):
    """Return the SHA-256 hex digest of a string."""
    return hashlib.sha256(string.encode()).hexdigest()

hash_base62(s, length=6)

Return a Base62-encoded hash of a string with specified length.

Source code in aloha/encrypt/hash.py
def hash_base62(s: str, length: int = 6) -> str:
    """Return a Base62-encoded hash of a string with specified length."""
    assert length > 0 and length <= 11, "Length must be between 1 and 11 for Base62 encoding."

    CHARS = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
    digest = hashlib.sha256(s.encode()).digest()  # 32 bytes = 256 bits
    # Convert the digest to an integer and then to Base62
    num = int.from_bytes(digest, "big")
    result = []
    for _ in range(length):
        result.append(CHARS[num % 62])
        num //= 62
    return "".join(reversed(result))

hash_dict(dic)

Hash a dictionary after JSON normalization.

Source code in aloha/encrypt/hash.py
def hash_dict(dic):
    """Hash a dictionary after JSON normalization."""
    s = json.dumps(dict(dic), sort_keys=True, ensure_ascii=False, default=str)
    return hashlib.md5(s.encode()).hexdigest()

hash_obj(obj)

Hash an arbitrary JSON-serializable object.

Source code in aloha/encrypt/hash.py
def hash_obj(obj):
    """Hash an arbitrary JSON-serializable object."""
    s = json.dumps(obj, sort_keys=True, ensure_ascii=False, default=str)
    return hashlib.md5(s.encode()).hexdigest()

JWT encode/decode helpers.

decode(secret_key, token, **kwargs)

Decode a JWT token and return either the payload or an error string.

Source code in aloha/encrypt/jwt.py
def decode(secret_key: str, token: str, **kwargs):
    """Decode a JWT token and return either the payload or an error string."""
    try:
        resp = jwt.decode(jwt=token, key=secret_key, algorithms=["HS256"], **kwargs)
    except jwt.ExpiredSignatureError as e:
        resp = str(e)
    except jwt.PyJWTError as e:
        resp = str(e)
    return resp

encode(secret_key, payload, headers=None, **kwargs)

Encode a payload into a JWT token.

Source code in aloha/encrypt/jwt.py
def encode(secret_key: str, payload: dict, headers: dict | None = None, **kwargs):
    """Encode a payload into a JWT token."""
    token = jwt.encode(payload=payload, key=secret_key, headers=headers, **kwargs)
    return token

AES encrypt/decrypt helpers.

AesEncryptor

Encrypt and decrypt strings with a selectable AES cipher profile.

Source code in aloha/encrypt/aes.py
class AesEncryptor:
    """Encrypt and decrypt strings with a selectable AES cipher profile."""

    supported_cipher_methods = _AES_CIPHER_METHODS

    def __init__(self, key: Union[str, bytes] = None, key_size: int = 16, cipher_name: str = "AES/ECB/PKCS5Padding"):
        """Initialize the AES key and cipher settings."""
        _key = key
        if key is None:
            _key = _generate_key(key_size)
        elif isinstance(key, str):
            _key = key.encode()

        if len(_key) not in (
            16,
            24,
            32,
        ):
            raise ValueError("Invalid key size/length [%s] for AesEncryptor!" % len(_key))

        self.key_aes, self.block_size = _key, AES.block_size
        # https://pycryptodome.readthedocs.io/en/latest/src/util/util.html
        self.cipher_name = cipher_name

    def encrypt(self, text: str, output_format="hex", func_pad: Optional[Callable] = None) -> Union[str, bytes]:
        """Encrypt a UTF-8 string and return hex, base64, or raw bytes."""
        dict_params, pad_style = _AES_CIPHER_METHODS.get(self.cipher_name)
        if not callable(func_pad):

            def _func_pad(x):
                return pad(x, block_size=self.block_size, style=pad_style)

            if not callable(func_pad):
                func_pad = _func_pad

        data = text.encode()
        padded = func_pad(data)
        cipher = AES.new(key=self.key_aes, **dict_params)
        bytes_crypt = cipher.encrypt(padded)

        if output_format == "hex":
            crypt = binascii.b2a_hex(bytes_crypt).decode()
        elif output_format == "base64":
            crypt = base64.b64encode(bytes_crypt).decode()
        elif output_format in ("bytes", "bin"):
            crypt = bytes_crypt
        else:
            raise ValueError("Unknown output_type [%s]" % output_format)
        return crypt

    def decrypt(
        self, text: Union[str, bytes], input_format: str = "hex", func_unpad: Optional[Callable] = None
    ) -> Union[str, bytes]:
        """Decrypt ciphertext produced by :meth:`encrypt`."""
        text += (len(text) % 4) * "="
        if input_format == "hex":
            crypt = binascii.a2b_hex(text)
        elif input_format == "base64":
            crypt = base64.b64decode(text)
        elif input_format in ("bytes", "bin"):
            crypt = text
        else:
            raise ValueError("Unknown output_type [%s]" % input_format)
        dict_params, pad_style = _AES_CIPHER_METHODS.get(self.cipher_name)
        cipher = AES.new(key=self.key_aes, **dict_params)
        data = cipher.decrypt(crypt)

        def _func_unpad(x):
            return unpad(x, block_size=self.block_size, style=pad_style)

        if not callable(func_unpad):
            func_unpad = _func_unpad
        data = func_unpad(data)
        return data.decode("UTF-8")

__init__(key=None, key_size=16, cipher_name='AES/ECB/PKCS5Padding')

Initialize the AES key and cipher settings.

Source code in aloha/encrypt/aes.py
def __init__(self, key: Union[str, bytes] = None, key_size: int = 16, cipher_name: str = "AES/ECB/PKCS5Padding"):
    """Initialize the AES key and cipher settings."""
    _key = key
    if key is None:
        _key = _generate_key(key_size)
    elif isinstance(key, str):
        _key = key.encode()

    if len(_key) not in (
        16,
        24,
        32,
    ):
        raise ValueError("Invalid key size/length [%s] for AesEncryptor!" % len(_key))

    self.key_aes, self.block_size = _key, AES.block_size
    # https://pycryptodome.readthedocs.io/en/latest/src/util/util.html
    self.cipher_name = cipher_name

decrypt(text, input_format='hex', func_unpad=None)

Decrypt ciphertext produced by :meth:encrypt.

Source code in aloha/encrypt/aes.py
def decrypt(
    self, text: Union[str, bytes], input_format: str = "hex", func_unpad: Optional[Callable] = None
) -> Union[str, bytes]:
    """Decrypt ciphertext produced by :meth:`encrypt`."""
    text += (len(text) % 4) * "="
    if input_format == "hex":
        crypt = binascii.a2b_hex(text)
    elif input_format == "base64":
        crypt = base64.b64decode(text)
    elif input_format in ("bytes", "bin"):
        crypt = text
    else:
        raise ValueError("Unknown output_type [%s]" % input_format)
    dict_params, pad_style = _AES_CIPHER_METHODS.get(self.cipher_name)
    cipher = AES.new(key=self.key_aes, **dict_params)
    data = cipher.decrypt(crypt)

    def _func_unpad(x):
        return unpad(x, block_size=self.block_size, style=pad_style)

    if not callable(func_unpad):
        func_unpad = _func_unpad
    data = func_unpad(data)
    return data.decode("UTF-8")

encrypt(text, output_format='hex', func_pad=None)

Encrypt a UTF-8 string and return hex, base64, or raw bytes.

Source code in aloha/encrypt/aes.py
def encrypt(self, text: str, output_format="hex", func_pad: Optional[Callable] = None) -> Union[str, bytes]:
    """Encrypt a UTF-8 string and return hex, base64, or raw bytes."""
    dict_params, pad_style = _AES_CIPHER_METHODS.get(self.cipher_name)
    if not callable(func_pad):

        def _func_pad(x):
            return pad(x, block_size=self.block_size, style=pad_style)

        if not callable(func_pad):
            func_pad = _func_pad

    data = text.encode()
    padded = func_pad(data)
    cipher = AES.new(key=self.key_aes, **dict_params)
    bytes_crypt = cipher.encrypt(padded)

    if output_format == "hex":
        crypt = binascii.b2a_hex(bytes_crypt).decode()
    elif output_format == "base64":
        crypt = base64.b64encode(bytes_crypt).decode()
    elif output_format in ("bytes", "bin"):
        crypt = bytes_crypt
    else:
        raise ValueError("Unknown output_type [%s]" % output_format)
    return crypt

main()

Small self-test for the AES helper.

Source code in aloha/encrypt/aes.py
def main():
    """Small self-test for the AES helper."""
    a = AesEncryptor()
    src = "hello~"
    enc = a.encrypt(src, output_format="base64")
    dec = a.decrypt(enc, input_format="base64")
    assert src == dec

RSA encrypt/decrypt and signing helpers.

RsaEncryptor

Encrypt, decrypt, and convert RSA keys and payloads.

Source code in aloha/encrypt/rsa.py
class RsaEncryptor:
    """Encrypt, decrypt, and convert RSA keys and payloads."""

    _dict_cache_cipher = {}
    _dict_cache_decipher = {}
    supported_cipher_methods = _RSA_CIPHER_METHODS

    # ref: https://cryptobook.nakov.com/asymmetric-key-ciphers/rsa-encrypt-decrypt-examples
    def __init__(
        self, key_private: str | None = None, key_public: str | None = None, cipher_name: str = "RSA/ECB/PKCS1Padding"
    ):
        """Load optional keys and select a cipher profile."""
        self.key_private, self.key_public = self.load_keys_from_string(key_private=key_private, key_public=key_public)
        assert self._get_cipher_module(cipher_name) is not None, "Invalid cipher_name!"
        self.cipher_name = cipher_name

    @staticmethod
    def _get_cipher_module(full_cipher_name: str | None = None) -> Optional[Tuple]:
        try:
            return _RSA_CIPHER_METHODS[full_cipher_name]
        except KeyError:
            raise ValueError("Unsupported full cipher name, supported ones: %s." % ",".join(sorted(_RSA_CIPHER_METHODS)))

    @staticmethod
    def generate_key_pair(size: int = 1024) -> Tuple[str, str]:
        """Generate a PEM-encoded RSA key pair."""
        key_pair = RSA.generate(size)
        key_private, key_public = key_pair.exportKey(), key_pair.publickey().exportKey()
        return key_private.decode("ascii"), key_public.decode("ascii")

    @lru_cache
    def get_cipher(self, key_public: str | None = None, cipher_name="RSA/ECB/PKCS1Padding") -> t_cipher_module:
        """Return a cached public-key cipher instance."""
        if key_public is None:
            key_pub = self.key_public
        else:
            _, key_pub = self.load_keys_from_string(key_public=key_public)
        if key_pub is None:
            raise RuntimeError("Public Key not set!")
        cache_key = key_pub.export_key(format="OpenSSH")
        if cache_key not in RsaEncryptor._dict_cache_cipher:
            pkcs_module, dict_param = self._get_cipher_module(cipher_name)
            RsaEncryptor._dict_cache_cipher[cache_key] = pkcs_module.new(key_pub, **dict_param)
        # debug: print('->PUB',  cache_key, len(RsaEncryptor._dict_cache_cipher))
        return RsaEncryptor._dict_cache_cipher[cache_key]

    @lru_cache
    def get_decipher(self, key_private: str | None = None, cipher_name="RSA/ECB/PKCS1Padding") -> t_cipher_module:
        """Return a cached private-key decipher instance."""
        if key_private is None:
            key_pri = self.key_private
        else:
            key_pri, _ = self.load_keys_from_string(key_private=key_private)
        if key_pri is None:
            raise RuntimeError("Private Key not set!")
        cache_key = key_pri.export_key(format="OpenSSH")
        if cache_key not in RsaEncryptor._dict_cache_decipher:
            pkcs_module, dict_param = self._get_cipher_module(cipher_name)
            RsaEncryptor._dict_cache_decipher[cache_key] = pkcs_module.new(key_pri, **dict_param)
        # debug: print('->PRI', cache_key, len(RsaEncryptor._dict_cache_decipher))
        return RsaEncryptor._dict_cache_decipher[cache_key]

    @staticmethod
    def load_keys_from_binary(
        key_private: bytes | None = None, key_public: bytes | None = None
    ) -> Tuple[Optional[RSA.RsaKey], Optional[RSA.RsaKey]]:
        """Load RSA keys from PEM/DER bytes."""
        _key_private, _key_public = None, None

        if key_private is not None:
            try:
                _key_private = RSA.import_key(key_private)
            except ValueError:
                raise ValueError("RSA pri key format error: [%s]" % key_private)

        if key_public is not None:
            try:
                _key_public = RSA.import_key(key_public)
            except ValueError:
                raise ValueError("RSA pub key format error: [%s]" % key_public)

        return _key_private, _key_public

    @staticmethod
    def load_keys_from_string(
        key_private: str | None = None, key_public: str | None = None
    ) -> Tuple[Optional[RSA.RsaKey], Optional[RSA.RsaKey]]:
        """Load RSA keys from PEM-like strings."""
        _key_private, _key_public = None, None

        if key_private is not None:
            if not key_private.startswith("-----"):
                key_pri = f"-----BEGIN RSA PRIVATE KEY-----\n{key_private}\n-----END RSA PRIVATE KEY-----"
            else:
                key_pri = key_private
            try:
                _key_private = RSA.import_key(key_pri)
            except ValueError:
                raise ValueError("RSA private key format error: [%s]" % key_pri)

        if key_public is not None:
            if not key_public.startswith("-----"):
                key_pub = f"-----BEGIN PUBLIC KEY-----\n{key_public}\n-----END PUBLIC KEY-----"
            else:
                key_pub = key_public
            try:
                _key_public = RSA.import_key(key_pub)
            except ValueError:
                raise ValueError("RSA public key format error: [%s]" % key_pub)

        return _key_private, _key_public

    def encrypt_with_public_key(
        self, message: Union[str, bytes], key_public: str | None = None, cipher_name: str = None
    ) -> bytes:
        """Encrypt a message with a public key."""
        data = message if isinstance(message, bytes) else message.encode("UTF-8")
        cipher = self.get_cipher(key_public=key_public, cipher_name=cipher_name or self.cipher_name)
        return cipher.encrypt(data)

    def decrypt_with_private_key(
        self, ciphertext: Union[str, bytes], key_private: str | None = None, cipher_name: str | None = None, **kwargs
    ) -> bytes:
        """Decrypt ciphertext with a private key."""
        data = ciphertext if isinstance(ciphertext, bytes) else ciphertext.encode("ascii")
        decipher = self.get_decipher(key_private=key_private, cipher_name=cipher_name or self.cipher_name)
        return decipher.decrypt(data, **kwargs)

    @staticmethod
    def convert_bytes_to_base64(data: bytes) -> str:
        """Encode raw bytes as base64 text."""
        return base64.b64encode(data).decode()

    @staticmethod
    def convert_base64_to_bytes(data: str) -> bytes:
        """Decode base64 text into raw bytes."""
        return base64.decodebytes(data.encode("ascii"))

__init__(key_private=None, key_public=None, cipher_name='RSA/ECB/PKCS1Padding')

Load optional keys and select a cipher profile.

Source code in aloha/encrypt/rsa.py
def __init__(
    self, key_private: str | None = None, key_public: str | None = None, cipher_name: str = "RSA/ECB/PKCS1Padding"
):
    """Load optional keys and select a cipher profile."""
    self.key_private, self.key_public = self.load_keys_from_string(key_private=key_private, key_public=key_public)
    assert self._get_cipher_module(cipher_name) is not None, "Invalid cipher_name!"
    self.cipher_name = cipher_name

convert_base64_to_bytes(data) staticmethod

Decode base64 text into raw bytes.

Source code in aloha/encrypt/rsa.py
@staticmethod
def convert_base64_to_bytes(data: str) -> bytes:
    """Decode base64 text into raw bytes."""
    return base64.decodebytes(data.encode("ascii"))

convert_bytes_to_base64(data) staticmethod

Encode raw bytes as base64 text.

Source code in aloha/encrypt/rsa.py
@staticmethod
def convert_bytes_to_base64(data: bytes) -> str:
    """Encode raw bytes as base64 text."""
    return base64.b64encode(data).decode()

decrypt_with_private_key(ciphertext, key_private=None, cipher_name=None, **kwargs)

Decrypt ciphertext with a private key.

Source code in aloha/encrypt/rsa.py
def decrypt_with_private_key(
    self, ciphertext: Union[str, bytes], key_private: str | None = None, cipher_name: str | None = None, **kwargs
) -> bytes:
    """Decrypt ciphertext with a private key."""
    data = ciphertext if isinstance(ciphertext, bytes) else ciphertext.encode("ascii")
    decipher = self.get_decipher(key_private=key_private, cipher_name=cipher_name or self.cipher_name)
    return decipher.decrypt(data, **kwargs)

encrypt_with_public_key(message, key_public=None, cipher_name=None)

Encrypt a message with a public key.

Source code in aloha/encrypt/rsa.py
def encrypt_with_public_key(
    self, message: Union[str, bytes], key_public: str | None = None, cipher_name: str = None
) -> bytes:
    """Encrypt a message with a public key."""
    data = message if isinstance(message, bytes) else message.encode("UTF-8")
    cipher = self.get_cipher(key_public=key_public, cipher_name=cipher_name or self.cipher_name)
    return cipher.encrypt(data)

generate_key_pair(size=1024) staticmethod

Generate a PEM-encoded RSA key pair.

Source code in aloha/encrypt/rsa.py
@staticmethod
def generate_key_pair(size: int = 1024) -> Tuple[str, str]:
    """Generate a PEM-encoded RSA key pair."""
    key_pair = RSA.generate(size)
    key_private, key_public = key_pair.exportKey(), key_pair.publickey().exportKey()
    return key_private.decode("ascii"), key_public.decode("ascii")

get_cipher(key_public=None, cipher_name='RSA/ECB/PKCS1Padding') cached

Return a cached public-key cipher instance.

Source code in aloha/encrypt/rsa.py
@lru_cache
def get_cipher(self, key_public: str | None = None, cipher_name="RSA/ECB/PKCS1Padding") -> t_cipher_module:
    """Return a cached public-key cipher instance."""
    if key_public is None:
        key_pub = self.key_public
    else:
        _, key_pub = self.load_keys_from_string(key_public=key_public)
    if key_pub is None:
        raise RuntimeError("Public Key not set!")
    cache_key = key_pub.export_key(format="OpenSSH")
    if cache_key not in RsaEncryptor._dict_cache_cipher:
        pkcs_module, dict_param = self._get_cipher_module(cipher_name)
        RsaEncryptor._dict_cache_cipher[cache_key] = pkcs_module.new(key_pub, **dict_param)
    # debug: print('->PUB',  cache_key, len(RsaEncryptor._dict_cache_cipher))
    return RsaEncryptor._dict_cache_cipher[cache_key]

get_decipher(key_private=None, cipher_name='RSA/ECB/PKCS1Padding') cached

Return a cached private-key decipher instance.

Source code in aloha/encrypt/rsa.py
@lru_cache
def get_decipher(self, key_private: str | None = None, cipher_name="RSA/ECB/PKCS1Padding") -> t_cipher_module:
    """Return a cached private-key decipher instance."""
    if key_private is None:
        key_pri = self.key_private
    else:
        key_pri, _ = self.load_keys_from_string(key_private=key_private)
    if key_pri is None:
        raise RuntimeError("Private Key not set!")
    cache_key = key_pri.export_key(format="OpenSSH")
    if cache_key not in RsaEncryptor._dict_cache_decipher:
        pkcs_module, dict_param = self._get_cipher_module(cipher_name)
        RsaEncryptor._dict_cache_decipher[cache_key] = pkcs_module.new(key_pri, **dict_param)
    # debug: print('->PRI', cache_key, len(RsaEncryptor._dict_cache_decipher))
    return RsaEncryptor._dict_cache_decipher[cache_key]

load_keys_from_binary(key_private=None, key_public=None) staticmethod

Load RSA keys from PEM/DER bytes.

Source code in aloha/encrypt/rsa.py
@staticmethod
def load_keys_from_binary(
    key_private: bytes | None = None, key_public: bytes | None = None
) -> Tuple[Optional[RSA.RsaKey], Optional[RSA.RsaKey]]:
    """Load RSA keys from PEM/DER bytes."""
    _key_private, _key_public = None, None

    if key_private is not None:
        try:
            _key_private = RSA.import_key(key_private)
        except ValueError:
            raise ValueError("RSA pri key format error: [%s]" % key_private)

    if key_public is not None:
        try:
            _key_public = RSA.import_key(key_public)
        except ValueError:
            raise ValueError("RSA pub key format error: [%s]" % key_public)

    return _key_private, _key_public

load_keys_from_string(key_private=None, key_public=None) staticmethod

Load RSA keys from PEM-like strings.

Source code in aloha/encrypt/rsa.py
@staticmethod
def load_keys_from_string(
    key_private: str | None = None, key_public: str | None = None
) -> Tuple[Optional[RSA.RsaKey], Optional[RSA.RsaKey]]:
    """Load RSA keys from PEM-like strings."""
    _key_private, _key_public = None, None

    if key_private is not None:
        if not key_private.startswith("-----"):
            key_pri = f"-----BEGIN RSA PRIVATE KEY-----\n{key_private}\n-----END RSA PRIVATE KEY-----"
        else:
            key_pri = key_private
        try:
            _key_private = RSA.import_key(key_pri)
        except ValueError:
            raise ValueError("RSA private key format error: [%s]" % key_pri)

    if key_public is not None:
        if not key_public.startswith("-----"):
            key_pub = f"-----BEGIN PUBLIC KEY-----\n{key_public}\n-----END PUBLIC KEY-----"
        else:
            key_pub = key_public
        try:
            _key_public = RSA.import_key(key_pub)
        except ValueError:
            raise ValueError("RSA public key format error: [%s]" % key_pub)

    return _key_private, _key_public

main()

Small self-test for the RSA helper.

Source code in aloha/encrypt/rsa.py
def main():
    """Small self-test for the RSA helper."""
    key_pairs = [  # private_key, public_key
        RsaEncryptor.generate_key_pair(),
    ]

    for str_src in ("aloha!", "😄", '{"x": 1}'):
        for i, (key_pri, key_pub) in enumerate(key_pairs):
            rsa_enc = RsaEncryptor()
            x_enc = rsa_enc.encrypt_with_public_key(str_src, key_public=key_pub)
            x_txt = rsa_enc.convert_bytes_to_base64(x_enc)

            rsa_dec = RsaEncryptor()
            y_bin = rsa_dec.convert_base64_to_bytes(x_txt)
            y_dec = rsa_dec.decrypt_with_private_key(y_bin, key_private=key_pri)
            y_txt = y_dec.decode("UTF-8")

            msg = "[test {i_case} success = {status}] {src} -> {enc}".format(
                i_case=i, status=(y_txt == str_src), src=y_txt, enc=x_txt
            )
            print(msg)

AesVault

Bases: AesEncryptor, BaseVault

Password vault that stores encrypted secrets with AES.

Source code in aloha/encrypt/vault/plain.py
class AesVault(AesEncryptor, BaseVault):
    """Password vault that stores encrypted secrets with AES."""

    def __init__(self, key: str | None = None):
        """Initialize the vault with an optional AES key."""
        super().__init__(key)

    def decrypt_password(self, pwd):
        """Decrypt the stored password and return plain text."""
        if _is_empty_str(pwd):
            return None
        return self.decrypt(pwd)

__init__(key=None)

Initialize the vault with an optional AES key.

Source code in aloha/encrypt/vault/plain.py
def __init__(self, key: str | None = None):
    """Initialize the vault with an optional AES key."""
    super().__init__(key)

decrypt_password(pwd)

Decrypt the stored password and return plain text.

Source code in aloha/encrypt/vault/plain.py
def decrypt_password(self, pwd):
    """Decrypt the stored password and return plain text."""
    if _is_empty_str(pwd):
        return None
    return self.decrypt(pwd)

BaseVault

Bases: ABC

Abstract base class for password vault implementations.

Source code in aloha/encrypt/vault/base.py
class BaseVault(abc.ABC):
    """Abstract base class for password vault implementations."""

    @abc.abstractmethod
    def decrypt_password(self, *args, **kwargs):
        """Decrypt a password and return the plain-text value."""
        return kwargs.get("password")

    def get_password(self, password, *args, **kwargs):
        """Return a password, optionally URL-encoded."""
        kwargs.update(password if isinstance(password, dict) else {"password": password})
        url_quote = kwargs.get("url_encode", True)

        pwd = self.decrypt_password(*args, **kwargs)
        if pwd is None:
            return None

        if url_quote:
            return urlquote(pwd)
        else:
            return pwd

decrypt_password(*args, **kwargs) abstractmethod

Decrypt a password and return the plain-text value.

Source code in aloha/encrypt/vault/base.py
@abc.abstractmethod
def decrypt_password(self, *args, **kwargs):
    """Decrypt a password and return the plain-text value."""
    return kwargs.get("password")

get_password(password, *args, **kwargs)

Return a password, optionally URL-encoded.

Source code in aloha/encrypt/vault/base.py
def get_password(self, password, *args, **kwargs):
    """Return a password, optionally URL-encoded."""
    kwargs.update(password if isinstance(password, dict) else {"password": password})
    url_quote = kwargs.get("url_encode", True)

    pwd = self.decrypt_password(*args, **kwargs)
    if pwd is None:
        return None

    if url_quote:
        return urlquote(pwd)
    else:
        return pwd

CyberArkVault

Bases: BaseVault, AesEncryptor

Fetch and decrypt passwords from a CyberArk-compatible endpoint.

Source code in aloha/encrypt/vault/cyberark.py
class CyberArkVault(BaseVault, AesEncryptor):
    """Fetch and decrypt passwords from a CyberArk-compatible endpoint."""

    _cached: dict = {}

    def __init__(self, url: str, app_id: str, key: str | None = None, safe: str = "AIM_ELIS_LAS", folder: str = "root"):
        """Initialize the vault with the CyberArk endpoint and credentials."""
        super().__init__(key)
        self.key, self.url, self.app_id, self.safe, self.folder = key, url, app_id, safe, folder

    @staticmethod
    def get_sign(appid, keyvalue):
        """Generate the CyberArk request signature."""
        hash_string = appid + "&" + keyvalue
        sha256 = hashlib.sha256()
        sha256.update(hash_string.encode("utf8"))
        return sha256.hexdigest()

    def decrypt_password(self, text):
        """Decrypt the AES-encrypted password returned by CyberArk."""
        if text is None:
            return None

        cryptor = AES.new(self.key_aes, AES.MODE_ECB)
        s = cryptor.decrypt(a2b_hex(text.encode()))
        s = s[0 : -s[-1]]
        return s.decode()

    def get_cyberark_password(self, object: str | None = None, **kwargs):
        """Request and decrypt a password from the CyberArk endpoint."""
        assert isinstance(object, str)
        kwargs.update({"object": object})

        app_id = kwargs.get("app_id", self.app_id)
        data = {
            "appId": app_id,
            "safe": kwargs.get("safe", self.safe),
            "folder": kwargs.get("folder", self.folder),
            "object": kwargs.get("object", object),
            "sign": self.get_sign(app_id, self.key),
        }

        retry = 5
        while retry:
            try:
                LOG.debug("POST CyberArk: %s with data: %s", self.url, data)
                resp = requests.post(
                    self.url,
                    json=data,
                    headers={"Content-Type": "application/json"},
                    # verify=False,
                )
                tmp = resp.json()
                if resp.status_code == 200 and int(tmp["code"]) == 200:
                    LOG.debug("Got data from CyberArk: %s", tmp)
                    return self.decrypt_password(tmp["password"])
                else:
                    raise RuntimeError(resp.text)
            except Exception as e:
                retry -= 1
                if retry == 0:
                    raise e
                else:
                    LOG.error("CyberArk request error: {}".format(e))
        return None

    def get_password(self, object=None, **kwargs):
        """Return a cached CyberArk password, optionally URL-encoded."""
        key_for_cache = "{app_id};{safe};{folder};{key};{object}".format(
            app_id=self.app_id, safe=self.safe, folder=self.folder, key=self.key, object=object
        )
        if key_for_cache not in self._cached:
            kwargs.update(object if isinstance(object, dict) else {"object": object})
            url_quote = kwargs.get("url_encode", True)

            pwd = self.get_cyberark_password(**kwargs)
            if url_quote:  # quote/escape password by default
                pwd = urlquote(pwd)
            self._cached[key_for_cache] = pwd
        else:
            LOG.debug("Using cached CyberArk key: %s" % key_for_cache)

        return self._cached[key_for_cache]

__init__(url, app_id, key=None, safe='AIM_ELIS_LAS', folder='root')

Initialize the vault with the CyberArk endpoint and credentials.

Source code in aloha/encrypt/vault/cyberark.py
def __init__(self, url: str, app_id: str, key: str | None = None, safe: str = "AIM_ELIS_LAS", folder: str = "root"):
    """Initialize the vault with the CyberArk endpoint and credentials."""
    super().__init__(key)
    self.key, self.url, self.app_id, self.safe, self.folder = key, url, app_id, safe, folder

decrypt_password(text)

Decrypt the AES-encrypted password returned by CyberArk.

Source code in aloha/encrypt/vault/cyberark.py
def decrypt_password(self, text):
    """Decrypt the AES-encrypted password returned by CyberArk."""
    if text is None:
        return None

    cryptor = AES.new(self.key_aes, AES.MODE_ECB)
    s = cryptor.decrypt(a2b_hex(text.encode()))
    s = s[0 : -s[-1]]
    return s.decode()

get_cyberark_password(object=None, **kwargs)

Request and decrypt a password from the CyberArk endpoint.

Source code in aloha/encrypt/vault/cyberark.py
def get_cyberark_password(self, object: str | None = None, **kwargs):
    """Request and decrypt a password from the CyberArk endpoint."""
    assert isinstance(object, str)
    kwargs.update({"object": object})

    app_id = kwargs.get("app_id", self.app_id)
    data = {
        "appId": app_id,
        "safe": kwargs.get("safe", self.safe),
        "folder": kwargs.get("folder", self.folder),
        "object": kwargs.get("object", object),
        "sign": self.get_sign(app_id, self.key),
    }

    retry = 5
    while retry:
        try:
            LOG.debug("POST CyberArk: %s with data: %s", self.url, data)
            resp = requests.post(
                self.url,
                json=data,
                headers={"Content-Type": "application/json"},
                # verify=False,
            )
            tmp = resp.json()
            if resp.status_code == 200 and int(tmp["code"]) == 200:
                LOG.debug("Got data from CyberArk: %s", tmp)
                return self.decrypt_password(tmp["password"])
            else:
                raise RuntimeError(resp.text)
        except Exception as e:
            retry -= 1
            if retry == 0:
                raise e
            else:
                LOG.error("CyberArk request error: {}".format(e))
    return None

get_password(object=None, **kwargs)

Return a cached CyberArk password, optionally URL-encoded.

Source code in aloha/encrypt/vault/cyberark.py
def get_password(self, object=None, **kwargs):
    """Return a cached CyberArk password, optionally URL-encoded."""
    key_for_cache = "{app_id};{safe};{folder};{key};{object}".format(
        app_id=self.app_id, safe=self.safe, folder=self.folder, key=self.key, object=object
    )
    if key_for_cache not in self._cached:
        kwargs.update(object if isinstance(object, dict) else {"object": object})
        url_quote = kwargs.get("url_encode", True)

        pwd = self.get_cyberark_password(**kwargs)
        if url_quote:  # quote/escape password by default
            pwd = urlquote(pwd)
        self._cached[key_for_cache] = pwd
    else:
        LOG.debug("Using cached CyberArk key: %s" % key_for_cache)

    return self._cached[key_for_cache]

get_sign(appid, keyvalue) staticmethod

Generate the CyberArk request signature.

Source code in aloha/encrypt/vault/cyberark.py
@staticmethod
def get_sign(appid, keyvalue):
    """Generate the CyberArk request signature."""
    hash_string = appid + "&" + keyvalue
    sha256 = hashlib.sha256()
    sha256.update(hash_string.encode("utf8"))
    return sha256.hexdigest()

DummyVault

Bases: BaseVault

Vault implementation that returns passwords unchanged.

Source code in aloha/encrypt/vault/base.py
class DummyVault(BaseVault):
    """Vault implementation that returns passwords unchanged."""

    def decrypt_password(self, *args, **kwargs):
        """Return the original password without decryption."""
        return kwargs.get("password")

decrypt_password(*args, **kwargs)

Return the original password without decryption.

Source code in aloha/encrypt/vault/base.py
def decrypt_password(self, *args, **kwargs):
    """Return the original password without decryption."""
    return kwargs.get("password")

Password vault abstraction used by database and service helpers.

BaseVault

Bases: ABC

Abstract base class for password vault implementations.

Source code in aloha/encrypt/vault/base.py
class BaseVault(abc.ABC):
    """Abstract base class for password vault implementations."""

    @abc.abstractmethod
    def decrypt_password(self, *args, **kwargs):
        """Decrypt a password and return the plain-text value."""
        return kwargs.get("password")

    def get_password(self, password, *args, **kwargs):
        """Return a password, optionally URL-encoded."""
        kwargs.update(password if isinstance(password, dict) else {"password": password})
        url_quote = kwargs.get("url_encode", True)

        pwd = self.decrypt_password(*args, **kwargs)
        if pwd is None:
            return None

        if url_quote:
            return urlquote(pwd)
        else:
            return pwd

decrypt_password(*args, **kwargs) abstractmethod

Decrypt a password and return the plain-text value.

Source code in aloha/encrypt/vault/base.py
@abc.abstractmethod
def decrypt_password(self, *args, **kwargs):
    """Decrypt a password and return the plain-text value."""
    return kwargs.get("password")

get_password(password, *args, **kwargs)

Return a password, optionally URL-encoded.

Source code in aloha/encrypt/vault/base.py
def get_password(self, password, *args, **kwargs):
    """Return a password, optionally URL-encoded."""
    kwargs.update(password if isinstance(password, dict) else {"password": password})
    url_quote = kwargs.get("url_encode", True)

    pwd = self.decrypt_password(*args, **kwargs)
    if pwd is None:
        return None

    if url_quote:
        return urlquote(pwd)
    else:
        return pwd

DummyVault

Bases: BaseVault

Vault implementation that returns passwords unchanged.

Source code in aloha/encrypt/vault/base.py
class DummyVault(BaseVault):
    """Vault implementation that returns passwords unchanged."""

    def decrypt_password(self, *args, **kwargs):
        """Return the original password without decryption."""
        return kwargs.get("password")

decrypt_password(*args, **kwargs)

Return the original password without decryption.

Source code in aloha/encrypt/vault/base.py
def decrypt_password(self, *args, **kwargs):
    """Return the original password without decryption."""
    return kwargs.get("password")

main()

Small self-test for the dummy vault.

Source code in aloha/encrypt/vault/base.py
def main():
    """Small self-test for the dummy vault."""
    vault = DummyVault()
    ret = vault.get_password(None, url_quote=True)
    # print(ret)
    return ret

AES-backed password vault helpers.

AesVault

Bases: AesEncryptor, BaseVault

Password vault that stores encrypted secrets with AES.

Source code in aloha/encrypt/vault/plain.py
class AesVault(AesEncryptor, BaseVault):
    """Password vault that stores encrypted secrets with AES."""

    def __init__(self, key: str | None = None):
        """Initialize the vault with an optional AES key."""
        super().__init__(key)

    def decrypt_password(self, pwd):
        """Decrypt the stored password and return plain text."""
        if _is_empty_str(pwd):
            return None
        return self.decrypt(pwd)

__init__(key=None)

Initialize the vault with an optional AES key.

Source code in aloha/encrypt/vault/plain.py
def __init__(self, key: str | None = None):
    """Initialize the vault with an optional AES key."""
    super().__init__(key)

decrypt_password(pwd)

Decrypt the stored password and return plain text.

Source code in aloha/encrypt/vault/plain.py
def decrypt_password(self, pwd):
    """Decrypt the stored password and return plain text."""
    if _is_empty_str(pwd):
        return None
    return self.decrypt(pwd)

main()

Small self-test for the AES vault.

Source code in aloha/encrypt/vault/plain.py
def main():
    """Small self-test for the AES vault."""
    vault = AesVault()
    pwd = vault.get_password(None, url_quote=True)
    # print(pwd)
    return pwd

CyberArk-based password vault helpers.

CyberArkVault

Bases: BaseVault, AesEncryptor

Fetch and decrypt passwords from a CyberArk-compatible endpoint.

Source code in aloha/encrypt/vault/cyberark.py
class CyberArkVault(BaseVault, AesEncryptor):
    """Fetch and decrypt passwords from a CyberArk-compatible endpoint."""

    _cached: dict = {}

    def __init__(self, url: str, app_id: str, key: str | None = None, safe: str = "AIM_ELIS_LAS", folder: str = "root"):
        """Initialize the vault with the CyberArk endpoint and credentials."""
        super().__init__(key)
        self.key, self.url, self.app_id, self.safe, self.folder = key, url, app_id, safe, folder

    @staticmethod
    def get_sign(appid, keyvalue):
        """Generate the CyberArk request signature."""
        hash_string = appid + "&" + keyvalue
        sha256 = hashlib.sha256()
        sha256.update(hash_string.encode("utf8"))
        return sha256.hexdigest()

    def decrypt_password(self, text):
        """Decrypt the AES-encrypted password returned by CyberArk."""
        if text is None:
            return None

        cryptor = AES.new(self.key_aes, AES.MODE_ECB)
        s = cryptor.decrypt(a2b_hex(text.encode()))
        s = s[0 : -s[-1]]
        return s.decode()

    def get_cyberark_password(self, object: str | None = None, **kwargs):
        """Request and decrypt a password from the CyberArk endpoint."""
        assert isinstance(object, str)
        kwargs.update({"object": object})

        app_id = kwargs.get("app_id", self.app_id)
        data = {
            "appId": app_id,
            "safe": kwargs.get("safe", self.safe),
            "folder": kwargs.get("folder", self.folder),
            "object": kwargs.get("object", object),
            "sign": self.get_sign(app_id, self.key),
        }

        retry = 5
        while retry:
            try:
                LOG.debug("POST CyberArk: %s with data: %s", self.url, data)
                resp = requests.post(
                    self.url,
                    json=data,
                    headers={"Content-Type": "application/json"},
                    # verify=False,
                )
                tmp = resp.json()
                if resp.status_code == 200 and int(tmp["code"]) == 200:
                    LOG.debug("Got data from CyberArk: %s", tmp)
                    return self.decrypt_password(tmp["password"])
                else:
                    raise RuntimeError(resp.text)
            except Exception as e:
                retry -= 1
                if retry == 0:
                    raise e
                else:
                    LOG.error("CyberArk request error: {}".format(e))
        return None

    def get_password(self, object=None, **kwargs):
        """Return a cached CyberArk password, optionally URL-encoded."""
        key_for_cache = "{app_id};{safe};{folder};{key};{object}".format(
            app_id=self.app_id, safe=self.safe, folder=self.folder, key=self.key, object=object
        )
        if key_for_cache not in self._cached:
            kwargs.update(object if isinstance(object, dict) else {"object": object})
            url_quote = kwargs.get("url_encode", True)

            pwd = self.get_cyberark_password(**kwargs)
            if url_quote:  # quote/escape password by default
                pwd = urlquote(pwd)
            self._cached[key_for_cache] = pwd
        else:
            LOG.debug("Using cached CyberArk key: %s" % key_for_cache)

        return self._cached[key_for_cache]

__init__(url, app_id, key=None, safe='AIM_ELIS_LAS', folder='root')

Initialize the vault with the CyberArk endpoint and credentials.

Source code in aloha/encrypt/vault/cyberark.py
def __init__(self, url: str, app_id: str, key: str | None = None, safe: str = "AIM_ELIS_LAS", folder: str = "root"):
    """Initialize the vault with the CyberArk endpoint and credentials."""
    super().__init__(key)
    self.key, self.url, self.app_id, self.safe, self.folder = key, url, app_id, safe, folder

decrypt_password(text)

Decrypt the AES-encrypted password returned by CyberArk.

Source code in aloha/encrypt/vault/cyberark.py
def decrypt_password(self, text):
    """Decrypt the AES-encrypted password returned by CyberArk."""
    if text is None:
        return None

    cryptor = AES.new(self.key_aes, AES.MODE_ECB)
    s = cryptor.decrypt(a2b_hex(text.encode()))
    s = s[0 : -s[-1]]
    return s.decode()

get_cyberark_password(object=None, **kwargs)

Request and decrypt a password from the CyberArk endpoint.

Source code in aloha/encrypt/vault/cyberark.py
def get_cyberark_password(self, object: str | None = None, **kwargs):
    """Request and decrypt a password from the CyberArk endpoint."""
    assert isinstance(object, str)
    kwargs.update({"object": object})

    app_id = kwargs.get("app_id", self.app_id)
    data = {
        "appId": app_id,
        "safe": kwargs.get("safe", self.safe),
        "folder": kwargs.get("folder", self.folder),
        "object": kwargs.get("object", object),
        "sign": self.get_sign(app_id, self.key),
    }

    retry = 5
    while retry:
        try:
            LOG.debug("POST CyberArk: %s with data: %s", self.url, data)
            resp = requests.post(
                self.url,
                json=data,
                headers={"Content-Type": "application/json"},
                # verify=False,
            )
            tmp = resp.json()
            if resp.status_code == 200 and int(tmp["code"]) == 200:
                LOG.debug("Got data from CyberArk: %s", tmp)
                return self.decrypt_password(tmp["password"])
            else:
                raise RuntimeError(resp.text)
        except Exception as e:
            retry -= 1
            if retry == 0:
                raise e
            else:
                LOG.error("CyberArk request error: {}".format(e))
    return None

get_password(object=None, **kwargs)

Return a cached CyberArk password, optionally URL-encoded.

Source code in aloha/encrypt/vault/cyberark.py
def get_password(self, object=None, **kwargs):
    """Return a cached CyberArk password, optionally URL-encoded."""
    key_for_cache = "{app_id};{safe};{folder};{key};{object}".format(
        app_id=self.app_id, safe=self.safe, folder=self.folder, key=self.key, object=object
    )
    if key_for_cache not in self._cached:
        kwargs.update(object if isinstance(object, dict) else {"object": object})
        url_quote = kwargs.get("url_encode", True)

        pwd = self.get_cyberark_password(**kwargs)
        if url_quote:  # quote/escape password by default
            pwd = urlquote(pwd)
        self._cached[key_for_cache] = pwd
    else:
        LOG.debug("Using cached CyberArk key: %s" % key_for_cache)

    return self._cached[key_for_cache]

get_sign(appid, keyvalue) staticmethod

Generate the CyberArk request signature.

Source code in aloha/encrypt/vault/cyberark.py
@staticmethod
def get_sign(appid, keyvalue):
    """Generate the CyberArk request signature."""
    hash_string = appid + "&" + keyvalue
    sha256 = hashlib.sha256()
    sha256.update(hash_string.encode("utf8"))
    return sha256.hexdigest()

main()

Small self-test scaffold for the CyberArk vault.

Source code in aloha/encrypt/vault/cyberark.py
def main():
    """Small self-test scaffold for the CyberArk vault."""
    cfg_cyberark = dict(
        url="https://localhost/pidms/rest/pwd/getPassword",  # to fill properly
        app_id="",
        safe="",
        folder="root",
        key="",
    )
    # from ...settings import SETTINGS
    # cfg_cyberark = SETTINGS.config['CYBERARK_CONFIG']
    vault = CyberArkVault(**cfg_cyberark)
    pwd = vault.get_password({"object": "PG_"})
    assert pwd is not None