Skip to content

数据库

PasswordVault

Password vault manager that provides access to password vault implementations.

Caches vault instances for performance.

Source code in aloha/db/base.py
class PasswordVault:
    """
    Factory for password vault implementations with a process-wide cache.

    Vault instances are memoised in a class-level dictionary keyed by the
    resolved vault type and ``str(vault_config)``, so repeated lookups with the
    same arguments return the same object.
    """

    # Cache: "<resolved vault type>:<str(vault_config)>" -> vault instance.
    _dict_cache_vault = {}

    @staticmethod
    def get_vault(vault_type: str | None = None, vault_config: dict | None = None, **kwargs) -> vault.BaseVault:
        """
        Return a (cached) password vault instance.

        The vault type is ``vault_type`` when truthy, otherwise the
        ``PASSWORD_ENCRYPTION`` setting:

        - ``'plain'``, ``'aes'`` or ``True``: ``vault.AesVault``
        - ``'cyberark'``: ``vault.CyberArkVault``, configured from ``vault_config``
          or, when that is empty, from the ``CYBERARK_CONFIG`` setting
        - any other value, including ``None``: ``vault.DummyVault`` (plain text);
          this is the fallback

        :param vault_type: Vault type; overrides the ``PASSWORD_ENCRYPTION`` setting.
        :param vault_config: Keyword arguments passed to the vault constructor.
        :param kwargs: Accepted for call compatibility; not used.
        :return: Vault instance implementing the ``vault.BaseVault`` interface.
        :raises RuntimeError: If a CyberArk vault is requested but no config is available.
        """
        method = vault_type or SETTINGS.config.get("PASSWORD_ENCRYPTION")
        LOG.debug("Using password vault: %s", method)  # nosemgrep

        key = "%s:%s" % (method, str(vault_config))
        cache = PasswordVault._dict_cache_vault
        if key in cache:
            return cache[key]

        if method is True or method in ("plain", "aes"):
            instance = vault.AesVault(**(vault_config or {}))
        elif method == "cyberark":
            cyberark_settings = vault_config or SETTINGS.config.get("CYBERARK_CONFIG")
            if cyberark_settings is None:
                raise RuntimeError("Missing [CYBERARK_CONFIG] in config!")
            instance = vault.CyberArkVault(**cyberark_settings)
        else:
            note = "Using plain password vault as unknown value of PASSWORD_ENCRYPTION=%s in config." % method
            LOG.info(note)  # nosemgrep
            instance = vault.DummyVault(**(vault_config or {}))

        cache[key] = instance
        return cache[key]

get_vault(vault_type=None, vault_config=None, **kwargs) staticmethod

Get a password vault instance.

Supports multiple vault types:

- 'plain', 'aes' or True: AES-based vault
- 'cyberark': CyberArk vault
- any other value, including None: Dummy vault (plain text) — this is the fallback

Parameters:

Name Type Description Default
vault_type str | None

Type of vault to use (overrides config)

None
vault_config dict | None

Vault configuration dictionary

None
args

Not an actual parameter: get_vault has no *args; extra keyword arguments are collected in kwargs

n/a
kwargs

Additional keyword arguments

{}

Returns:

Type Description
BaseVault

Vault instance implementing BaseVault interface

Raises:

Type Description
RuntimeError

If CyberArk vault is requested but config is missing

Source code in aloha/db/base.py
@staticmethod
def get_vault(vault_type: str | None = None, vault_config: dict | None = None, **kwargs) -> vault.BaseVault:
    """
    Return a (cached) password vault instance.

    The vault type is ``vault_type`` when truthy, otherwise the
    ``PASSWORD_ENCRYPTION`` setting:

    - ``'plain'``, ``'aes'`` or ``True``: ``vault.AesVault``
    - ``'cyberark'``: ``vault.CyberArkVault``, configured from ``vault_config``
      or, when that is empty, from the ``CYBERARK_CONFIG`` setting
    - any other value, including ``None``: ``vault.DummyVault`` (plain text);
      this is the fallback

    Instances are cached in ``PasswordVault._dict_cache_vault`` keyed by the
    resolved type and ``str(vault_config)``.

    :param vault_type: Vault type; overrides the ``PASSWORD_ENCRYPTION`` setting.
    :param vault_config: Keyword arguments passed to the vault constructor.
    :param kwargs: Accepted for call compatibility; not used.
    :return: Vault instance implementing the ``vault.BaseVault`` interface.
    :raises RuntimeError: If a CyberArk vault is requested but no config is available.
    """
    method = vault_type or SETTINGS.config.get("PASSWORD_ENCRYPTION")
    LOG.debug("Using password vault: %s", method)  # nosemgrep

    key = "%s:%s" % (method, str(vault_config))
    cache = PasswordVault._dict_cache_vault
    if key in cache:
        return cache[key]

    if method is True or method in ("plain", "aes"):
        instance = vault.AesVault(**(vault_config or {}))
    elif method == "cyberark":
        cyberark_settings = vault_config or SETTINGS.config.get("CYBERARK_CONFIG")
        if cyberark_settings is None:
            raise RuntimeError("Missing [CYBERARK_CONFIG] in config!")
        instance = vault.CyberArkVault(**cyberark_settings)
    else:
        note = "Using plain password vault as unknown value of PASSWORD_ENCRYPTION=%s in config." % method
        LOG.info(note)  # nosemgrep
        instance = vault.DummyVault(**(vault_config or {}))

    cache[key] = instance
    return cache[key]

main()

Command-line tool to decrypt passwords from config.

Usage: python -m aloha.db.base <config_key>

Source code in aloha/db/base.py
def main():
    """
    Command-line tool to decrypt a password from the ``deploy`` config section.

    Usage: python -m aloha.db.base <config_key>

    Looks up ``SETTINGS.config["deploy"][<config_key>]``, decrypts its
    ``password`` entry with the default password vault and logs the result at
    DEBUG level. Logs an error (and returns) when no key is given or the key is
    not present in the config.
    """
    import sys

    # sys.argv[0] is the program itself: without an explicit argument,
    # argv[-1] would silently be the script path instead of a config key.
    if len(sys.argv) < 2:
        LOG.error("Usage: python -m aloha.db.base <config_key>")
        return

    config_key = sys.argv[-1]
    LOG.debug("Getting pwd for deploy key [deploy.%s]", config_key)
    try:
        db_config = SETTINGS.config["deploy"][config_key]
    except KeyError:
        LOG.error("Please make sure config key [deploy.%s] exists!", config_key)
        return

    # Only the config lookup is guarded above, so a KeyError raised while
    # decrypting is not misreported as a missing config key.
    password_vault = PasswordVault.get_vault()
    p = password_vault.get_password(db_config.get("password"))
    # NOTE: this deliberately emits the decrypted secret (the tool's purpose);
    # it is only visible when DEBUG logging is enabled.
    LOG.debug("Decrypted PWD: %s", p)

DuckDB connection helpers.

DuckOperator

Create and use a DuckDB connection through SQLAlchemy.

Source code in aloha/db/duckdb.py
class DuckOperator:
    """Create and use a DuckDB connection through SQLAlchemy."""

    def __init__(self, db_config, **kwargs):
        """
        Build a DuckDB engine, creating the database file if necessary.

        db_config example::

            {
                "path": "/path/to/db.duckdb",         # file path of duckdb, use ":memory:" for in-memory mode
                "schema": "sales",                    # optional, 'main' by default
                "read_only": True,                    # optional, False by default (forced to False in in-memory mode)
                "config": {"memory_limit": "500mb"},  # optional, duckdb connection configs
                "auto_commit": True,                  # optional, True by default; execute_query commits when set
            }

        :param db_config: Connection settings as in the example above. An empty
            or missing ``path`` selects in-memory mode.
        :param kwargs: Extra keyword arguments forwarded to ``create_engine``.
        :raises RuntimeError: If the database file or its directory cannot be
            prepared, or if creating the engine / initializing the schema fails.
        """
        self._config = {
            "path": db_config.get("path", ":memory:"),
            "schema": db_config.get("schema", "main"),
            "read_only": bool(db_config.get("read_only", False)),
            "config": db_config.get("config", {}),
            "auto_commit": db_config.get("auto_commit", True),
        }

        if not self._config["path"] or self._config["path"] == ":memory:":  # in-memory mode
            self._config["path"] = ":memory:"

            if self._config["read_only"]:  # in-memory mode cannot be read-only
                LOG.warning("In-memory database cannot be read-only. Setting read_only=False.")
                self._config["read_only"] = False

        else:
            self._prepare_database()

        try:
            str_connection = f"duckdb:///{self._config['path']}"
            self.engine = create_engine(
                str_connection,
                connect_args={"read_only": self._config["read_only"], "config": self._config["config"]},
                **kwargs,
            )

            self._initialize_schema()
            msg = f"DuckDB connected: {self._config['path']} [schema={self._config['schema']}, read_only={self._config['read_only']}]"
            LOG.debug(msg)
        except Exception as e:
            LOG.exception(e)
            # Chain the cause so the underlying driver error stays in the traceback.
            raise RuntimeError("Failed to connect to DuckDB") from e

    def _prepare_database(self):
        """
        Make sure the database file and its parent directory exist.

        In read-only mode nothing is created; a missing directory or file is an error.

        :raises RuntimeError: If something is missing in read-only mode, or if
            creating the directory / database file fails.
        """
        path = self._config["path"]
        path_obj = Path(path)

        parent_dir = path_obj.parent
        if not parent_dir.exists():
            if self._config["read_only"]:
                raise RuntimeError(f"Directory '{parent_dir}' does not exist and read_only=True")
            try:
                parent_dir.mkdir(parents=True, exist_ok=True)
                LOG.debug(f"Created directory: {parent_dir}")
            except Exception as e:
                raise RuntimeError(f"Failed to create directory '{parent_dir}': {e}") from e

        if not path_obj.exists():
            if self._config["read_only"]:
                raise RuntimeError(f"DuckDB file '{path}' does not exist and read_only=True")
            try:
                LOG.debug(f"Database file not found, creating: {path}")
                # Opening and closing a duckdb connection creates an empty database file.
                duckdb.connect(path).close()
            except Exception as e:
                raise RuntimeError(f"Failed to create database file '{path}': {e}") from e

    def _initialize_schema(self):
        """
        Create (or, in read-only mode, verify) the configured schema and select it.

        Does nothing for the default ``main`` schema.

        :raises RuntimeError: If the schema is missing in read-only mode or any
            statement fails.
        """
        schema = self._config["schema"]
        if schema == "main":
            return

        try:
            # engine.begin() closes the connection and commits on success, so
            # CREATE SCHEMA is not rolled back when the connection is released.
            with self.engine.begin() as conn:
                if self._config["read_only"]:
                    result = conn.execute(
                        text("SELECT schema_name FROM information_schema.schemata WHERE schema_name = :schema"),
                        {"schema": schema},
                    )
                    if not result.fetchone():
                        raise RuntimeError(f"Schema '{schema}' does not exist and read_only=True")
                else:
                    conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema}"))

                # NOTE(review): SET schema is connection-scoped; it applies to this
                # pooled connection only, not to connections the pool opens later.
                conn.execute(text(f"SET schema '{schema}'"))
        except Exception as e:
            raise RuntimeError(f"Failed to initialize schema: {e}") from e

    @property
    def connection(self):
        """Return the underlying SQLAlchemy engine."""
        return self.engine

    conn = connection

    def execute_query(self, sql, *args, **kwargs):
        """
        Execute a SQL statement and return the cursor result.

        :param sql: SQL text; bound parameters use SQLAlchemy ``:name`` syntax.
        :param args: Positional arguments forwarded to ``Connection.execute``
            (typically the parameter dict).
        :param kwargs: Keyword arguments forwarded to ``Connection.execute``.
        :return: The result of ``Connection.execute``. The statement is committed
            first when ``auto_commit`` is enabled (default).
        """
        with self.engine.connect() as conn:
            cur = conn.execute(text(sql), *args, **kwargs)
            if self._config.get("auto_commit", True):
                conn.commit()
            # NOTE(review): the result outlives its connection; reading rows later
            # relies on the driver having buffered them — confirm for large SELECTs.
            return cur

    @property
    def connection_str(self) -> str:
        """Return a human-readable connection string."""
        return f"duckdb:///{self._config['path']} [schema={self._config['schema']}, read_only={self._config['read_only']}]"

connection_str property

Return a human-readable connection string.

__init__(db_config, **kwargs)

Build a DuckDB engine, creating the database file if necessary.

Source code in aloha/db/duckdb.py
def __init__(self, db_config, **kwargs):
    """
    Build a DuckDB engine, creating the database file if necessary.

    db_config example::

        {
            "path": "/path/to/db.duckdb",         # file path of duckdb, use ":memory:" for in-memory mode
            "schema": "sales",                    # optional, 'main' by default
            "read_only": True,                    # optional, False by default (forced to False in in-memory mode)
            "config": {"memory_limit": "500mb"},  # optional, duckdb connection configs
            "auto_commit": True,                  # optional, True by default
        }

    :param db_config: Connection settings as in the example above. An empty or
        missing ``path`` selects in-memory mode.
    :param kwargs: Extra keyword arguments forwarded to ``create_engine``.
    :raises RuntimeError: If creating the engine or initializing the schema
        fails (``_prepare_database`` may also raise RuntimeError).
    """
    self._config = {
        "path": db_config.get("path", ":memory:"),
        "schema": db_config.get("schema", "main"),
        "read_only": bool(db_config.get("read_only", False)),
        "config": db_config.get("config", {}),
        "auto_commit": db_config.get("auto_commit", True),
    }

    if not self._config["path"] or self._config["path"] == ":memory:":  # in-memory mode
        self._config["path"] = ":memory:"

        if self._config["read_only"]:  # in-memory mode cannot be read-only
            LOG.warning("In-memory database cannot be read-only. Setting read_only=False.")
            self._config["read_only"] = False

    else:
        self._prepare_database()

    try:
        str_connection = f"duckdb:///{self._config['path']}"
        self.engine = create_engine(
            str_connection,
            connect_args={"read_only": self._config["read_only"], "config": self._config["config"]},
            **kwargs,
        )

        self._initialize_schema()
        msg = f"DuckDB connected: {self._config['path']} [schema={self._config['schema']}, read_only={self._config['read_only']}]"
        LOG.debug(msg)
    except Exception as e:
        LOG.exception(e)
        # Chain the cause so the underlying driver error stays in the traceback.
        raise RuntimeError("Failed to connect to DuckDB") from e

execute_query(sql, *args, **kwargs)

Execute a SQL statement and return the cursor result.

Source code in aloha/db/duckdb.py
def execute_query(self, sql, *args, **kwargs):
    """
    Execute a SQL statement and return the cursor result.

    :param sql: SQL text; bound parameters use SQLAlchemy ``:name`` syntax.
    :param args: Positional arguments forwarded to ``Connection.execute``
        (typically the parameter dict).
    :param kwargs: Keyword arguments forwarded to ``Connection.execute`` as
        keywords (``**kwargs``, not ``*kwargs``).
    :return: The result of ``Connection.execute``. The statement is committed
        first when ``auto_commit`` is enabled (default).
    """
    with self.engine.connect() as conn:
        cur = conn.execute(text(sql), *args, **kwargs)
        if self._config.get("auto_commit", True):
            conn.commit()
        # NOTE(review): the result outlives its connection; reading rows later
        # relies on the driver having buffered them — confirm for large SELECTs.
        return cur

ElasticSearchOperator

Create and use an Elasticsearch client with optional index helpers.

Source code in aloha/db/elasticsearch.py
class ElasticSearchOperator:
    """Create and use an Elasticsearch client with optional index helpers."""

    def __init__(self, config, index_config=None):
        """Build the client and optionally load the index configuration."""
        self.es_config = config

        password_vault = PasswordVault.get_vault(config.get("vault_type"), config.get("vault_config"))
        username = config.get("username")
        password = password_vault.get_password(config.get("password"))

        hosts = config.get("host", "localhost")
        masked_hosts = _mask_hosts(hosts)
        LOG.debug("ElasticSearch connection info: " + str(masked_hosts))

        self._config = {
            "http_auth": (username, password) if username is not None and password is not None else None,
            "hosts": hosts,
            "timeout": config.get("timeout", 0.1),
            "max_retries": config.get("max_retries", 3),
            "retry_on_timeout": config.get("retry_on_timeout", True),
        }

        self.index_config = index_config
        self.index_name = self.es_config.get("index_name")
        self.index_type = self.es_config.get("index_type")

        self.es = Elasticsearch(**self._config)

        if index_config is not None:
            self.index_config = self._load_config(index_config)

    @staticmethod
    def _load_config(config):
        """Load an index configuration from a dict or JSON file."""
        if isinstance(config, dict):
            return config

        elif isinstance(config, str) and ".json" in config:
            with open(config, "r", encoding="utf-8") as f:
                config = json.load(f)
            return config

        else:
            raise ValueError("Invalid ES config data type")

    def put_mapping(self, index_name=None, index_type=None, index_config: dict | None = None):
        """Apply a mapping definition to the current index."""
        return self.es.indices.put_mapping(
            index=index_name or self.index_name,
            doc_type=index_type or self.index_type,
            body=index_config["mappings"][index_type or self.index_type],
        )

    def build_index(self, index_name=None, index_config=None, raise_if_exist=False):
        """Create the index if it does not already exist."""
        if self.es.indices.exists(index=index_name or self.index_name) is not True:
            res = self.es.indices.create(index=index_name or self.index_name, body=index_config or self.index_config)
            return res
        else:
            msg = "Index [%s] already exits" % self.index_name
            if raise_if_exist:
                raise RuntimeError(msg)
            else:
                LOG.info(msg)
                return False

    def search(self, query, index_name=None, index_type=None):
        """Execute a search query."""
        return self.es.search(index=index_name or self.index_name, doc_type=index_type or self.index_type, body=query)

    def msearch(self, body):
        """Execute a multi-search request."""
        return self.es.msearch(body=body)

    def insert(self, doc, index_name=None, index_type=None, id=None):
        """Insert or replace a document."""
        return self.es.index(index=index_name or self.index_name, doc_type=index_type or self.index_type, id=id, body=doc)

__init__(config, index_config=None)

Build the client and optionally load the index configuration.

Source code in aloha/db/elasticsearch.py
def __init__(self, config, index_config=None):
    """
    Create the Elasticsearch client and, if given, load the index configuration.

    :param config: Connection settings: ``host`` (default ``"localhost"``),
        ``username``, ``password`` (resolved through the password vault),
        ``vault_type``, ``vault_config``, ``timeout`` (default 0.1),
        ``max_retries`` (default 3), ``retry_on_timeout`` (default True),
        ``index_name`` and ``index_type``.
    :param index_config: Optional index configuration, a dict or a path to a
        ``.json`` file (see ``_load_config``).
    """
    self.es_config = config

    vault_impl = PasswordVault.get_vault(config.get("vault_type"), config.get("vault_config"))
    user = config.get("username")
    secret = vault_impl.get_password(config.get("password"))

    hosts = config.get("host", "localhost")
    LOG.debug("ElasticSearch connection info: " + str(_mask_hosts(hosts)))

    # Credentials are only sent when both parts are available.
    auth = None
    if user is not None and secret is not None:
        auth = (user, secret)

    self._config = {
        "http_auth": auth,
        "hosts": hosts,
        "timeout": config.get("timeout", 0.1),
        "max_retries": config.get("max_retries", 3),
        "retry_on_timeout": config.get("retry_on_timeout", True),
    }

    self.index_config = index_config
    self.index_name = self.es_config.get("index_name")
    self.index_type = self.es_config.get("index_type")

    self.es = Elasticsearch(**self._config)

    if index_config is not None:
        self.index_config = self._load_config(index_config)

build_index(index_name=None, index_config=None, raise_if_exist=False)

Create the index if it does not already exist.

Source code in aloha/db/elasticsearch.py
def build_index(self, index_name=None, index_config=None, raise_if_exist=False):
    """
    Create an index unless it already exists.

    :param index_name: Index to create; defaults to ``self.index_name``.
    :param index_config: Index body; defaults to ``self.index_config``.
    :param raise_if_exist: Raise instead of returning ``False`` when the index exists.
    :return: Response of ``es.indices.create``, or ``False`` if the index exists.
    :raises RuntimeError: If the index exists and ``raise_if_exist`` is true.
    """
    name = index_name or self.index_name
    # Plain truthiness (rather than ``is not True``) also copes with clients
    # whose exists() returns a bool-convertible response object.
    if not self.es.indices.exists(index=name):
        return self.es.indices.create(index=name, body=index_config or self.index_config)

    # Report the index that was actually checked, not the default one.
    msg = "Index [%s] already exists" % name
    if raise_if_exist:
        raise RuntimeError(msg)
    LOG.info(msg)
    return False

insert(doc, index_name=None, index_type=None, id=None)

Insert or replace a document.

Source code in aloha/db/elasticsearch.py
def insert(self, doc, index_name=None, index_type=None, id=None):
    """
    Index a document, inserting it or replacing an existing one.

    :param doc: Document body.
    :param index_name: Target index; falls back to ``self.index_name``.
    :param index_type: Document type; falls back to ``self.index_type``.
    :param id: Document id, passed through unchanged (``None`` included).
    :return: Response of ``es.index``.
    """
    target_index = index_name or self.index_name
    target_type = index_type or self.index_type
    return self.es.index(index=target_index, doc_type=target_type, id=id, body=doc)

msearch(body)

Execute a multi-search request.

Source code in aloha/db/elasticsearch.py
def msearch(self, body):
    """
    Run several searches in a single multi-search request.

    :param body: Request body, passed unchanged to ``es.msearch``.
    :return: Response of ``es.msearch``.
    """
    client = self.es
    return client.msearch(body=body)

put_mapping(index_name=None, index_type=None, index_config=None)

Apply a mapping definition to the current index.

Source code in aloha/db/elasticsearch.py
def put_mapping(self, index_name=None, index_type=None, index_config: dict | None = None):
    """Apply a mapping definition to the current index."""
    return self.es.indices.put_mapping(
        index=index_name or self.index_name,
        doc_type=index_type or self.index_type,
        body=index_config["mappings"][index_type or self.index_type],
    )

search(query, index_name=None, index_type=None)

Execute a search query.

Source code in aloha/db/elasticsearch.py
def search(self, query, index_name=None, index_type=None):
    """
    Run a search query.

    :param query: Query body passed to ``es.search``.
    :param index_name: Index to search; falls back to ``self.index_name``.
    :param index_type: Document type; falls back to ``self.index_type``.
    :return: Response of ``es.search``.
    """
    target_index = index_name or self.index_name
    target_type = index_type or self.index_type
    return self.es.search(index=target_index, doc_type=target_type, body=query)

Kafka connection helpers.

KafkaOperator

Create Kafka admin, producer, and consumer clients.

Source code in aloha/db/kafka.py
class KafkaOperator:
    """Create Kafka admin, producer, and consumer clients."""

    def __init__(self, kafka_config):
        """
        Parameter reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

        :param kafka_config:
        host = [
            {host: kafka_server_1, port: 9092}
        ]
        """
        self._config = json.loads(json.dumps(kafka_config, ensure_ascii=False))  # deep copy

        if "host" in kafka_config:
            self._config = {
                "bootstrap.servers": ",".join(["{host}:{port}".format(**i) for i in kafka_config.pop("host")]),
            }
        LOG.debug("Kafka connection info: " + str(self._config))

    def admin_client(self, *args, **kwargs):
        """Return a configured Kafka AdminClient."""
        config_admin = {**self._config}
        a = kafka_admin.AdminClient(config_admin)
        return a

    def create_topic(self, topic: str, num_partitions=3, replication_factor=1, *args, **kwargs):
        """Create a Kafka topic and wait for the broker response."""
        """Note: In a multi-cluster production scenario, it is more typical to use a replication_factor of 3 for durability."""
        a = self.admin_client()
        new_topic = kafka_admin.NewTopic(topic, num_partitions=num_partitions, replication_factor=replication_factor)

        # Call create_topics to asynchronously create topics. A dict of <topic,future> is returned.
        fs = a.create_topics([new_topic])

        # Wait for each operation to finish.
        for topic, f in fs.items():
            try:
                f.result()  # The result itself is None
                LOG.info("Topic {} created".format(topic))
            except Exception as e:
                LOG.error("Failed to create topic {}: {}".format(topic, e))
                return False
            finally:
                a.close()

        return True

    def producer_deliver(self, topic: str, generator: typing.Iterator[str], func_callback: callable = None, *args, **kwargs):
        """Stream messages from an iterator into a Kafka topic."""
        # func_callback should be a function that takes two arguments: err and msg
        config_producer = {**self._config}
        p = kafka.Producer(config_producer)

        def delivery_report(err, msg):
            """Called once for each message produced to indicate delivery result. Triggered by poll() or flush()."""
            if err is not None:
                LOG.error("Kafka msg delivery failed: {}".format(err))
            else:
                LOG.debug("Kafka msg delivered to {} [{}]".format(msg.topic(), msg.partition()))

        if func_callback is None:
            func_callback = delivery_report

        for data in generator:  # some data from the generator
            # Trigger any available delivery report callbacks from previous produce() calls
            p.poll(0)

            # Asynchronously produce a message, the delivery report callback
            # will be triggered from poll() above, or flush() below, when the message has
            # been successfully delivered or failed permanently.
            p.produce(topic, data.encode("utf-8"), callback=func_callback)

        # Wait for any outstanding messages to be delivered and delivery report callbacks to be triggered.
        p.flush()

    def consumer_generator(
        self, topics_subscribe: list, group_id: str | None = None, poll_timeout: float = 1.0, *args, **kwargs
    ) -> typing.Iterator[str]:
        """Yield decoded messages from the subscribed Kafka topics."""
        config_consumer = {"auto.offset.reset": "earliest", **self._config}
        if group_id is not None:
            config_consumer["group.id"] = group_id
        c = kafka.Consumer(config_consumer)

        c.subscribe(topics_subscribe)
        while True:
            msg = c.poll(poll_timeout)

            if msg is None:
                continue
            elif msg.error():
                code = msg.error().code()
                if code == kafka.KafkaError._PARTITION_EOF:
                    pass
                LOG.error("Kafka consumer: {}".format(msg.error()))
                continue

            data = msg.value().decode("utf-8")
            LOG.debug("Received message: {}".format(data))
            yield data

        c.close()

__init__(kafka_config)

Parameter reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

Parameters:

Name Type Description Default
kafka_config

librdkafka configuration dict; an optional host entry such as [ {host: kafka_server_1, port: 9092} ] is converted into bootstrap.servers

required
Source code in aloha/db/kafka.py
def __init__(self, kafka_config):
    """
    Store the librdkafka configuration used by every client this operator creates.

    Parameter reference: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

    :param kafka_config: librdkafka configuration dict. An optional ``host``
        entry, e.g. ``[{"host": "kafka_server_1", "port": 9092}]``, is turned
        into ``bootstrap.servers``; every other entry is kept. The caller's
        dict is not modified.
    """
    # JSON round-trip = deep copy, so the caller's dict is never mutated.
    self._config = json.loads(json.dumps(kafka_config, ensure_ascii=False))

    hosts = self._config.pop("host", None)
    if hosts is not None:
        # Replace the host list with bootstrap.servers while keeping all other settings.
        self._config["bootstrap.servers"] = ",".join("{host}:{port}".format(**h) for h in hosts)

    # Hide credentials (e.g. sasl.password) before logging the config.
    masked = {
        k: ("***" if any(s in str(k).lower() for s in ("password", "secret")) else v)
        for k, v in self._config.items()
    }
    LOG.debug("Kafka connection info: " + str(masked))

admin_client(*args, **kwargs)

Return a configured Kafka AdminClient.

Source code in aloha/db/kafka.py
def admin_client(self, *args, **kwargs):
    """
    Create a Kafka ``AdminClient`` from this operator's configuration.

    :param args: Ignored.
    :param kwargs: Ignored.
    :return: A new ``kafka_admin.AdminClient``, built from a shallow copy of
        ``self._config``.
    """
    return kafka_admin.AdminClient(dict(self._config))

consumer_generator(topics_subscribe, group_id=None, poll_timeout=1.0, *args, **kwargs)

Yield decoded messages from the subscribed Kafka topics.

Source code in aloha/db/kafka.py
def consumer_generator(
    self, topics_subscribe: list, group_id: str | None = None, poll_timeout: float = 1.0, *args, **kwargs
) -> typing.Iterator[str]:
    """
    Yield UTF-8 decoded messages from the subscribed Kafka topics, forever.

    The consumer is closed when the caller closes the generator (or it is
    garbage-collected).

    :param topics_subscribe: Topics to subscribe to.
    :param group_id: Consumer group id; when ``None`` the configured one is used.
    :param poll_timeout: Seconds to wait in each ``poll`` call.
    """
    config_consumer = {"auto.offset.reset": "earliest", **self._config}
    if group_id is not None:
        config_consumer["group.id"] = group_id
    c = kafka.Consumer(config_consumer)

    try:
        c.subscribe(topics_subscribe)
        while True:
            msg = c.poll(poll_timeout)

            if msg is None:
                continue

            err = msg.error()
            if err:
                if err.code() == kafka.KafkaError._PARTITION_EOF:
                    # Reaching the end of a partition is informational, not a failure.
                    LOG.debug("Kafka consumer: {}".format(err))
                else:
                    LOG.error("Kafka consumer: {}".format(err))
                continue

            data = msg.value().decode("utf-8")
            LOG.debug("Received message: {}".format(data))
            yield data
    finally:
        # Previously unreachable after the infinite loop; now runs on generator close.
        c.close()

create_topic(topic, num_partitions=3, replication_factor=1, *args, **kwargs)

Create a Kafka topic and wait for the broker response.

Source code in aloha/db/kafka.py
def create_topic(self, topic: str, num_partitions=3, replication_factor=1, *args, **kwargs):
    """
    Create a Kafka topic and wait for the broker response.

    Note: in a multi-broker production cluster a replication_factor of 3 is
    more typical for durability.

    :param topic: Name of the topic to create.
    :param num_partitions: Number of partitions.
    :param replication_factor: Replication factor.
    :return: ``True`` if the topic was created, ``False`` if the broker reported an error.
    """
    admin = self.admin_client()
    new_topic = kafka_admin.NewTopic(topic, num_partitions=num_partitions, replication_factor=replication_factor)

    try:
        # create_topics() is asynchronous and returns a {topic_name: future} dict.
        futures = admin.create_topics([new_topic])
        for name, future in futures.items():
            try:
                future.result()  # The result itself is None
                LOG.info("Topic {} created".format(name))
            except Exception as e:
                LOG.error("Failed to create topic {}: {}".format(name, e))
                return False
    finally:
        # Close once, after all futures were awaited (not inside the loop).
        admin.close()

    return True

producer_deliver(topic, generator, func_callback=None, *args, **kwargs)

Stream messages from an iterator into a Kafka topic.

Source code in aloha/db/kafka.py
def producer_deliver(
    self,
    topic: str,
    generator: typing.Iterator[str],
    func_callback: typing.Optional[typing.Callable] = None,
    *args,
    **kwargs,
):
    """
    Stream messages from an iterator into a Kafka topic.

    :param topic: Destination topic.
    :param generator: Iterable of strings; each is UTF-8 encoded and produced.
    :param func_callback: Delivery callback taking ``(err, msg)``; defaults to
        one that logs the delivery outcome.
    """
    config_producer = {**self._config}
    p = kafka.Producer(config_producer)

    def delivery_report(err, msg):
        """Called once for each message produced to indicate delivery result. Triggered by poll() or flush()."""
        if err is not None:
            LOG.error("Kafka msg delivery failed: {}".format(err))
        else:
            LOG.debug("Kafka msg delivered to {} [{}]".format(msg.topic(), msg.partition()))

    if func_callback is None:
        func_callback = delivery_report

    for data in generator:
        # Serve delivery callbacks from previous produce() calls.
        p.poll(0)

        payload = data.encode("utf-8")
        try:
            # Asynchronous; the callback fires from poll() or flush().
            p.produce(topic, payload, callback=func_callback)
        except BufferError:
            # Local producer queue is full: serve deliveries to free space,
            # then retry once (a second BufferError propagates).
            LOG.warning("Kafka producer queue full, waiting for deliveries before retrying")
            p.poll(1)
            p.produce(topic, payload, callback=func_callback)

    # Wait for outstanding messages and their delivery callbacks.
    p.flush()

MongoDB connection helpers.

MongoOperator(config)

Return a cached MongoDB operation wrapper for the given config.

Source code in aloha/db/mongo.py
def MongoOperator(config):
    """
    Return a cached MongoDB operation wrapper for ``config``.

    Entries whose value is ``None`` are dropped from the config before use.
    Wrappers are stored in the module-level ``_conn`` cache, keyed by the
    sorted JSON form of the cleaned config plus database and collection name.

    :param config: MongoDB settings; ``db_name`` and ``collection_name`` are
        also passed explicitly to ``_MongoDBOperation``.
    :return: The cached ``_MongoDBOperation``, or ``None`` if constructing it
        raised (the exception is logged).
    """
    db_name = config.get("db_name")
    collection_name = config.get("collection_name")

    cleaned = {}
    for name, value in config.items():
        if value is not None:
            cleaned[name] = value

    serialized = json.dumps(cleaned, sort_keys=True, ensure_ascii=False)
    cache_key = "%s:%s:%s" % (serialized, db_name or "", collection_name or "")

    if cache_key in _conn:
        return _conn[cache_key]

    try:
        _conn[cache_key] = _MongoDBOperation(cleaned, db_name=db_name, collection_name=collection_name)
    except Exception as e:
        LOG.exception(e)
        return None
    return _conn[cache_key]

MySQL connection helpers.

MySqlOperator

Create and use a SQLAlchemy-backed MySQL connection.

Source code in aloha/db/mysql.py
class MySqlOperator:
    """Create and use a SQLAlchemy-backed MySQL connection."""

    def __init__(self, db_config, **kwargs):
        """
        Build a connection pool from the provided database config.

        :param db_config: Mapping with ``host``, ``port``, ``user``, ``password``
            (resolved through the password vault) and ``dbname``; optional
            ``vault_type`` / ``vault_config`` select the vault.
        :param kwargs: Extra ``create_engine`` keyword arguments. They override the
            defaults ``pool_size=50``, ``pool_recycle=500``, ``pool_pre_ping=True``
            instead of clashing with them.
        :raises KeyError: If a required key is missing from ``db_config``.
        :raises RuntimeError: If the engine cannot be created.
        """
        from urllib.parse import quote

        password_vault = PasswordVault.get_vault(db_config.get("vault_type"), db_config.get("vault_config"))
        self._config = {
            "host": db_config["host"],
            "port": db_config["port"],
            "user": db_config["user"],
            "password": password_vault.get_password(db_config["password"]),
            "dbname": db_config["dbname"],
        }

        # Credentials may contain URL-reserved characters ('@', ':', '/', ...):
        # percent-encode them so the URL parses correctly. quote(safe="") is used
        # rather than quote_plus so spaces become %20, which SQLAlchemy unquotes.
        url = "mysql+pymysql://{user}:{password}@{host}:{port}/{dbname}".format(
            user=quote(str(self._config["user"]), safe=""),
            password=quote(str(self._config["password"]), safe=""),
            host=self._config["host"],
            port=self._config["port"],
            dbname=self._config["dbname"],
        )
        engine_options = {"pool_size": 50, "pool_recycle": 500, "pool_pre_ping": True, **kwargs}

        try:
            self.db = create_engine(url, **engine_options)
            LOG.debug("MySQL connected: {host}:{port}/{dbname}".format(**self._config))
        except Exception as e:
            LOG.exception(e)
            raise RuntimeError("Failed to connect to MySQL") from e

    @property
    def connection(self):
        """Return the underlying SQLAlchemy engine."""
        return self.db

    def execute_query(self, sql, *args, **kwargs):
        """
        Execute a SQL statement in its own transaction and return the cursor result.

        :param sql: SQL text; bound parameters use SQLAlchemy ``:name`` syntax.
        :param args: Positional arguments forwarded to ``Connection.execute``.
        :param kwargs: Keyword arguments forwarded to ``Connection.execute``.
        :return: The result of ``Connection.execute``.
        """
        # engine.begin() commits on success (rolls back on error), so data
        # changes are not discarded when the connection is released.
        with self.db.begin() as conn:
            cur = conn.execute(text(sql), *args, **kwargs)
            # NOTE(review): the result outlives its connection; reading rows later
            # relies on the driver having buffered them — confirm for large SELECTs.
            return cur

    @property
    def connection_str(self) -> str:
        """
        Return a human-readable connection string.

        Note: the string includes the decrypted password in plain text.
        """
        return "mysql://{user}:{password}@{host}:{port}/{dbname}".format(**self._config)

connection_str property

Return a human-readable connection string.

__init__(db_config, **kwargs)

Build a connection pool from the provided database config.

Source code in aloha/db/mysql.py
def __init__(self, db_config, **kwargs):
    """
    Build a connection pool from the provided database config.

    :param db_config: Mapping with ``host``, ``port``, ``user``, ``password``
        (resolved through the password vault) and ``dbname``; optional
        ``vault_type`` / ``vault_config`` select the vault.
    :param kwargs: Extra ``create_engine`` keyword arguments. They override the
        defaults ``pool_size=50``, ``pool_recycle=500``, ``pool_pre_ping=True``
        instead of clashing with them.
    :raises KeyError: If a required key is missing from ``db_config``.
    :raises RuntimeError: If the engine cannot be created.
    """
    from urllib.parse import quote

    password_vault = PasswordVault.get_vault(db_config.get("vault_type"), db_config.get("vault_config"))
    self._config = {
        "host": db_config["host"],
        "port": db_config["port"],
        "user": db_config["user"],
        "password": password_vault.get_password(db_config["password"]),
        "dbname": db_config["dbname"],
    }

    # Credentials may contain URL-reserved characters ('@', ':', '/', ...):
    # percent-encode them so the URL parses correctly. quote(safe="") is used
    # rather than quote_plus so spaces become %20, which SQLAlchemy unquotes.
    url = "mysql+pymysql://{user}:{password}@{host}:{port}/{dbname}".format(
        user=quote(str(self._config["user"]), safe=""),
        password=quote(str(self._config["password"]), safe=""),
        host=self._config["host"],
        port=self._config["port"],
        dbname=self._config["dbname"],
    )
    engine_options = {"pool_size": 50, "pool_recycle": 500, "pool_pre_ping": True, **kwargs}

    try:
        self.db = create_engine(url, **engine_options)
        LOG.debug("MySQL connected: {host}:{port}/{dbname}".format(**self._config))
    except Exception as e:
        LOG.exception(e)
        raise RuntimeError("Failed to connect to MySQL") from e

execute_query(sql, *args, **kwargs)

Execute a SQL statement and return the cursor result.

Source code in aloha/db/mysql.py
def execute_query(self, sql, *args, **kwargs):
    """Execute a SQL statement in its own transaction and return the cursor result.

    :param sql: Raw SQL text; wrapped with ``text()`` before execution.
    :param args: Extra positional arguments forwarded to ``Connection.execute``
        (e.g. a mapping of bound parameters).
    :param kwargs: Extra keyword arguments forwarded to ``Connection.execute``.
    :return: The result object returned by ``Connection.execute``.
    """
    # begin() commits when the block exits normally and rolls back on error.
    # A bare connect() block ends with a rollback under SQLAlchemy 2.0, which
    # would silently discard INSERT/UPDATE/DELETE issued through this helper.
    with self.db.begin() as conn:
        cur = conn.execute(text(sql), *args, **kwargs)
    # NOTE(review): the connection is released before the caller reads ``cur``;
    # rows are only reliably readable when the driver buffers them client-side.
    return cur

Oracle DB connection helpers.

OracledbOperator

Create and use a SQLAlchemy-backed Oracle connection.

Source code in aloha/db/oracle.py
class OracledbOperator:
    """Create and use a SQLAlchemy-backed Oracle connection."""

    def __init__(self, db_config, **kwargs):
        """Build an Oracle connection pool from the provided config.

        Example of ``db_config``::

            {
                "host": "192.168.1.100",
                "port": 1521,
                "user": "PT_INDEX",
                "password": "vault_key_or_plain",
                "service_name": "orcl",   # recommended: use service_name
                "sid": "orcl",            # or use sid
                "vault_type": "...",
                "vault_config": {...},
                "lib_dir": "/opt/oracle/instantclient"  # optional, use THICK mode if defined.
            }

        ``service_name`` takes precedence over ``sid`` when both are present.

        :param db_config: Connection settings as shown above.
        :param kwargs: Extra keyword arguments forwarded to ``create_engine``.
        :raises RuntimeError: If the THICK-mode client or the engine cannot be set up.
        :raises ValueError: If neither ``service_name`` nor ``sid`` is configured.
        """
        password_vault = PasswordVault.get_vault(db_config.get("vault_type"), db_config.get("vault_config"))
        self._config = {
            "host": db_config["host"],
            "port": db_config["port"],
            "user": db_config["user"],
            "password": password_vault.get_password(db_config.get("password")),
        }

        if "lib_dir" in db_config:  # use Thick mode
            try:
                oracledb.init_oracle_client(lib_dir=db_config["lib_dir"])
                LOG.info("Oracle client initialized in THICK mode from: %s", db_config["lib_dir"])
            except Exception as e:
                LOG.warning(f"Warning: {e}")
                raise RuntimeError(f"Failed to initialize Oracle client: {e}") from e

        service_name = db_config.get("service_name")
        sid = db_config.get("sid")

        if service_name:  # using service_name (recommended)
            dsn = oracledb.makedsn(db_config["host"], db_config["port"], service_name=service_name)
        elif sid:  # using SID
            dsn = oracledb.makedsn(db_config["host"], db_config["port"], sid=sid)
        else:
            raise ValueError("Oracle config must specify service_name or sid")

        self._config["dsn"] = dsn
        # Credentials are percent-encoded so special characters cannot corrupt the URL.
        # The host part stays empty because the target is supplied via connect_args["dsn"].
        url = "oracle+oracledb://{user}:{password}@".format(
            user=self._quote_url_part(self._config["user"]),
            password=self._quote_url_part(self._config["password"]),
        )
        try:
            self.engine = create_engine(
                url,
                connect_args={"dsn": dsn},
                pool_size=20,
                max_overflow=10,
                pool_pre_ping=True,
                **kwargs,
            )
            # Logged (not printed) to match the other operators; library code should not write to stdout.
            LOG.debug("OracleDB connected: {host}:{port}".format(**self._config))
        except Exception as e:
            LOG.error(e)
            raise RuntimeError("Failed to connect to OracleDB") from e

    @staticmethod
    def _quote_url_part(value) -> str:
        """Percent-encode a credential for safe embedding in a SQLAlchemy URL.

        ``str()`` keeps the previous rendering of non-string values (e.g. ``None`` -> ``"None"``).
        """
        from urllib.parse import quote

        return quote(str(value), safe="")

    @property
    def connection(self):
        """Return the underlying SQLAlchemy engine."""
        return self.engine

    def execute_query(self, sql, *args, **kwargs):
        """Execute a SQL statement in its own transaction and return the cursor result.

        :param sql: Raw SQL text; wrapped with ``text()`` before execution.
        :param args: Extra positional arguments forwarded to ``Connection.execute``.
        :param kwargs: Extra keyword arguments forwarded to ``Connection.execute``.
        :return: The result object returned by ``Connection.execute``.
        """
        # begin() commits on normal exit; a bare connect() block rolls back under
        # SQLAlchemy 2.0 and would silently discard DML.
        with self.engine.begin() as conn:
            cur = conn.execute(text(sql), *args, **kwargs)
        # NOTE(review): the connection is released before the caller reads ``cur``.
        return cur

    @property
    def connection_str(self) -> str:
        """Return a human-readable connection string (the password is omitted)."""
        return "oracle://{user}@{host}:{port}".format(**self._config)

connection_str property

Return a human-readable connection string.

__init__(db_config, **kwargs)

Build an Oracle connection pool from the provided config.

Source code in aloha/db/oracle.py
def __init__(self, db_config, **kwargs):
    """Build an Oracle connection pool from the provided config.

    Example of ``db_config``::

        {
            "host": "192.168.1.100",
            "port": 1521,
            "user": "PT_INDEX",
            "password": "vault_key_or_plain",
            "service_name": "orcl",   # recommended: use service_name
            "sid": "orcl",            # or use sid
            "vault_type": "...",
            "vault_config": {...},
            "lib_dir": "/opt/oracle/instantclient"  # optional, use THICK mode if defined.
        }

    ``service_name`` takes precedence over ``sid`` when both are present.

    :param db_config: Connection settings as shown above.
    :param kwargs: Extra keyword arguments forwarded to ``create_engine``.
    :raises RuntimeError: If the THICK-mode client or the engine cannot be set up.
    :raises ValueError: If neither ``service_name`` nor ``sid`` is configured.
    """
    from urllib.parse import quote

    password_vault = PasswordVault.get_vault(db_config.get("vault_type"), db_config.get("vault_config"))
    self._config = {
        "host": db_config["host"],
        "port": db_config["port"],
        "user": db_config["user"],
        "password": password_vault.get_password(db_config.get("password")),
    }

    if "lib_dir" in db_config:  # use Thick mode
        try:
            oracledb.init_oracle_client(lib_dir=db_config["lib_dir"])
            LOG.info("Oracle client initialized in THICK mode from: %s", db_config["lib_dir"])
        except Exception as e:
            LOG.warning(f"Warning: {e}")
            raise RuntimeError(f"Failed to initialize Oracle client: {e}") from e

    service_name = db_config.get("service_name")
    sid = db_config.get("sid")

    if service_name:  # using service_name (recommended)
        dsn = oracledb.makedsn(db_config["host"], db_config["port"], service_name=service_name)
    elif sid:  # using SID
        dsn = oracledb.makedsn(db_config["host"], db_config["port"], sid=sid)
    else:
        raise ValueError("Oracle config must specify service_name or sid")

    self._config["dsn"] = dsn
    # Credentials are percent-encoded so special characters cannot corrupt the URL.
    # The host part stays empty because the target is supplied via connect_args["dsn"].
    url = "oracle+oracledb://{user}:{password}@".format(
        user=quote(str(self._config["user"]), safe=""),
        password=quote(str(self._config["password"]), safe=""),
    )
    try:
        self.engine = create_engine(
            url,
            connect_args={"dsn": dsn},
            pool_size=20,
            max_overflow=10,
            pool_pre_ping=True,
            **kwargs,
        )
        # Logged (not printed) to match the other operators; library code should not write to stdout.
        LOG.debug("OracleDB connected: {host}:{port}".format(**self._config))
    except Exception as e:
        LOG.error(e)
        raise RuntimeError("Failed to connect to OracleDB") from e

execute_query(sql, *args, **kwargs)

Execute a SQL statement and return the cursor result.

Source code in aloha/db/oracle.py
def execute_query(self, sql, *args, **kwargs):
    """Execute a SQL statement in its own transaction and return the cursor result.

    :param sql: Raw SQL text; wrapped with ``text()`` before execution.
    :param args: Extra positional arguments forwarded to ``Connection.execute``.
    :param kwargs: Extra keyword arguments forwarded to ``Connection.execute``.
    :return: The result object returned by ``Connection.execute``.
    """
    # begin() commits on normal exit; a bare connect() block rolls back under
    # SQLAlchemy 2.0 and would silently discard DML.
    with self.engine.begin() as conn:
        cur = conn.execute(text(sql), *args, **kwargs)
    # NOTE(review): the connection is released before the caller reads ``cur``.
    return cur

PostgreSQL connection helpers.

PostgresOperator

Create and use a SQLAlchemy-backed PostgreSQL connection.

Source code in aloha/db/postgres.py
class PostgresOperator:
    """Create and use a SQLAlchemy-backed PostgreSQL connection."""

    def __init__(self, db_config, **kwargs):
        """Build a PostgreSQL connection pool from the database config.

        :param db_config: Mapping with ``host``, ``port``, ``user``, ``dbname``, ``password``;
            optional ``schema`` (applied as ``search_path``) and ``vault_type``/``vault_config``.
        :param kwargs: Extra keyword arguments forwarded to ``create_engine``.
        :raises KeyError: If ``host``, ``port``, ``user`` or ``dbname`` is missing.
        :raises RuntimeError: If the engine cannot be created.
        """
        password_vault = PasswordVault.get_vault(db_config.get("vault_type"), db_config.get("vault_config"))
        self._config = {
            "host": db_config["host"],
            "port": db_config["port"],
            "user": db_config["user"],
            "password": password_vault.get_password(db_config.get("password")),
            "dbname": db_config["dbname"],
        }
        connect_args = {}
        if "schema" in db_config:
            connect_args["options"] = "-csearch_path={}".format(db_config["schema"])

        # Credentials are percent-encoded for the URL only; ``self._config`` keeps raw values.
        url = "postgresql+psycopg://{user}:{password}@{host}:{port}/{dbname}".format(
            **{
                **self._config,
                "user": self._quote_url_part(self._config["user"]),
                "password": self._quote_url_part(self._config["password"]),
            }
        )

        try:
            self.engine = create_engine(
                url,
                connect_args=connect_args,
                client_encoding="utf8",
                pool_size=20,
                max_overflow=10,
                pool_pre_ping=True,
                **kwargs,
            )
            LOG.debug("PostgresSQL connected: {host}:{port}/{dbname}".format(**self._config))
        except Exception as e:
            LOG.error(e)
            raise RuntimeError("Failed to connect to PostgresSQL") from e

    @staticmethod
    def _quote_url_part(value) -> str:
        """Percent-encode a credential for safe embedding in a SQLAlchemy URL.

        ``str()`` keeps the previous rendering of non-string values (e.g. ``None`` -> ``"None"``).
        """
        from urllib.parse import quote

        return quote(str(value), safe="")

    @property
    def connection(self):
        """Return the underlying SQLAlchemy engine."""
        return self.engine

    def execute_query(self, sql, *args, **kwargs):
        """Execute a SQL statement in its own transaction and return the cursor result.

        :param sql: Raw SQL text; wrapped with ``text()`` before execution.
        :param args: Extra positional arguments forwarded to ``Connection.execute``.
        :param kwargs: Extra keyword arguments forwarded to ``Connection.execute``.
        :return: The result object returned by ``Connection.execute``.
        """
        # begin() commits on normal exit; a bare connect() block rolls back under
        # SQLAlchemy 2.0 and would silently discard DML (and transactional DDL).
        with self.engine.begin() as conn:
            cur = conn.execute(text(sql), *args, **kwargs)
        # NOTE(review): the connection is released before the caller reads ``cur``.
        return cur

    @property
    def connection_str(self) -> str:
        """Return a human-readable connection string.

        NOTE(review): the value embeds the plaintext password; avoid logging it.
        """
        return "postgresql://{user}:{password}@{host}:{port}/{dbname}".format(**self._config)

connection_str property

Return a human-readable connection string.

__init__(db_config, **kwargs)

Build a PostgreSQL connection pool from the database config.

Source code in aloha/db/postgres.py
def __init__(self, db_config, **kwargs):
    """Build a PostgreSQL connection pool from the database config.

    :param db_config: Mapping with ``host``, ``port``, ``user``, ``dbname``, ``password``;
        optional ``schema`` (applied as ``search_path``) and ``vault_type``/``vault_config``.
    :param kwargs: Extra keyword arguments forwarded to ``create_engine``.
    :raises KeyError: If ``host``, ``port``, ``user`` or ``dbname`` is missing.
    :raises RuntimeError: If the engine cannot be created.
    """
    from urllib.parse import quote

    password_vault = PasswordVault.get_vault(db_config.get("vault_type"), db_config.get("vault_config"))
    self._config = {
        "host": db_config["host"],
        "port": db_config["port"],
        "user": db_config["user"],
        "password": password_vault.get_password(db_config.get("password")),
        "dbname": db_config["dbname"],
    }
    connect_args = {}
    if "schema" in db_config:
        connect_args["options"] = "-csearch_path={}".format(db_config["schema"])

    # Credentials are percent-encoded for the URL only; ``self._config`` keeps raw values.
    url = "postgresql+psycopg://{user}:{password}@{host}:{port}/{dbname}".format(
        **{
            **self._config,
            "user": quote(str(self._config["user"]), safe=""),
            "password": quote(str(self._config["password"]), safe=""),
        }
    )

    try:
        self.engine = create_engine(
            url,
            connect_args=connect_args,
            client_encoding="utf8",
            pool_size=20,
            max_overflow=10,
            pool_pre_ping=True,
            **kwargs,
        )
        LOG.debug("PostgresSQL connected: {host}:{port}/{dbname}".format(**self._config))
    except Exception as e:
        LOG.error(e)
        raise RuntimeError("Failed to connect to PostgresSQL") from e

execute_query(sql, *args, **kwargs)

Execute a SQL statement and return the cursor result.

Source code in aloha/db/postgres.py
def execute_query(self, sql, *args, **kwargs):
    """Execute a SQL statement in its own transaction and return the cursor result.

    :param sql: Raw SQL text; wrapped with ``text()`` before execution.
    :param args: Extra positional arguments forwarded to ``Connection.execute``.
    :param kwargs: Extra keyword arguments forwarded to ``Connection.execute``.
    :return: The result object returned by ``Connection.execute``.
    """
    # begin() commits on normal exit; a bare connect() block rolls back under
    # SQLAlchemy 2.0 and would silently discard DML (and transactional DDL).
    with self.engine.begin() as conn:
        cur = conn.execute(text(sql), *args, **kwargs)
    # NOTE(review): the connection is released before the caller reads ``cur``.
    return cur

Redis connection helpers.

RedisOperator

Create Redis connections with version-checked redis-py.

Source code in aloha/db/redis.py
class RedisOperator:
    """Create Redis connections with version-checked redis-py."""

    def __init__(self, config):
        """Normalize Redis connection settings and build connection metadata.

        No Redis client is created here; see ``connection_generic`` and
        ``connection_cluster``.

        :param config: Mapping with ``host`` and optional ``port`` (default ``"6379"``),
            ``password`` (resolved via ``vault_type``/``vault_config``), ``db``,
            ``decode_responses`` (default ``True``) and ``max_connections`` (default ``1000``).
        :raises ImportError: If the installed redis-py is older than 4.1.0.
        """
        self._check_redis_version()

        password_vault = PasswordVault.get_vault(config.get("vault_type"), config.get("vault_config"))
        _config = {
            "host": config["host"],
            "port": config.get("port", "6379"),
            "password": password_vault.get_password(config.get("password", None)),
            "decode_responses": config.get("decode_responses", True),
            "retry_on_timeout": True,
            "max_connections": config.get("max_connections", 1000),
            "socket_timeout": 3,
            "socket_connect_timeout": 1,
        }
        if "db" in config:
            _config["db"] = config["db"]
        self._config = _config

        # Built lazily by ``connection_generic`` and shared by every client it returns.
        self._pool = None

    @staticmethod
    def _check_redis_version() -> bool:
        """Ensure redis-py >= 4.1.0 is installed.

        :return: ``True`` when the installed version is new enough.
        :raises ImportError: If the version is too old or cannot be determined.
        """
        ver_min = version.parse("4.1.0")
        valid = False
        try:
            ver_cur = version.parse(redis.__version__)
            if ver_cur >= ver_min:
                valid = True
                LOG.debug("Using redis version = %s", redis.__version__)
        except Exception as e:
            LOG.error("Failed to obtain redis version!")
            LOG.error(str(e))

        if not valid:
            # The check above is inclusive, so the message says ">=".
            msg = "Invalid version of `redis-py`, version >=4.1.0 required!"
            LOG.fatal(msg)
            raise ImportError(msg)

        return valid

    @property
    def connection_generic(self):
        """Return a standard Redis client backed by a shared connection pool.

        The pool is created on first access from ``self._config`` and reused afterwards.
        """
        LOG.debug("StrictRedis connection info: {host}:{port}".format(**self._config))

        if self._pool is None:
            # redis.Redis ignores host/port/password/db/... when ``connection_pool`` is
            # given, so the settings must be handed to the pool itself: ``max_connections``
            # configures the pool and the remaining keys configure each connection.
            self._pool = redis.ConnectionPool(**self._config)
        return redis.Redis(connection_pool=self._pool)

    @property
    def connection_cluster(self):
        """Return a new Redis Cluster client built from ``self._config``."""
        LOG.debug("RedisCluster connection info: {host}:{port}".format(**self._config))
        # NOTE(review): if "db" is configured, confirm redis-py cluster mode accepts it.
        return redis.RedisCluster(**self._config)

connection_cluster property

Return a Redis Cluster client.

connection_generic property

Return a standard Redis client.

__init__(config)

Normalize Redis connection settings and build connection metadata.

Source code in aloha/db/redis.py
def __init__(self, config):
    """Check redis-py, resolve the password and store the Redis client settings.

    :param config: Mapping with ``host`` plus optional ``port``, ``password``, ``db``,
        ``decode_responses``, ``max_connections``, ``vault_type`` and ``vault_config``.
    """
    self._check_redis_version()

    vault = PasswordVault.get_vault(config.get("vault_type"), config.get("vault_config"))
    settings = dict(
        host=config["host"],
        port=config.get("port", "6379"),
        password=vault.get_password(config.get("password", None)),
        decode_responses=config.get("decode_responses", True),
        retry_on_timeout=True,
        max_connections=config.get("max_connections", 1000),
        socket_timeout=3,
        socket_connect_timeout=1,
    )
    # "db" is only forwarded when explicitly configured.
    if "db" in config:
        settings["db"] = config["db"]
    self._config = settings

    # No pool yet; one is created on demand.
    self._pool = None

SQLite connection helpers.

SqliteOperator

Create and use a SQLAlchemy-backed SQLite connection.

Source code in aloha/db/sqlite.py
class SqliteOperator:
    """Create and use a SQLAlchemy-backed SQLite connection."""

    def __init__(self, db_config, **kwargs):
        """Build a SQLite or SQLCipher engine from the provided config.

        :param db_config: Mapping with optional ``dbname`` (file path; an empty or missing
            value yields the URL ``sqlite://``) and optional ``password``. When ``password``
            is present the database is opened through SQLCipher (``sqlcipher3`` package) and
            the password is resolved via ``vault_type``/``vault_config``.
        :param kwargs: Extra keyword arguments forwarded to ``create_engine``.
        :raises RuntimeError: If ``sqlcipher3`` is missing or the engine cannot be created.
        """
        self._connection_pattern = "sqlite://{dbname}"
        dbname = db_config.get("dbname", "")
        if len(dbname) > 0:
            dbname = "/%s" % dbname
        self._config = {"dbname": dbname}

        if "password" in db_config:
            try:
                import sqlcipher3
            except ImportError as e:
                raise RuntimeError("Python package required for encrypted sqlite3: sqlcipher3-binary") from e
            LOG.debug("Version of sqlcipher3 = %s", sqlcipher3.sqlite_version)
            password_vault = PasswordVault.get_vault(db_config.get("vault_type"), db_config.get("vault_config"))
            self._config["password"] = password_vault.get_password(db_config.get("password", None))
            # ``dbname`` already starts with "/", so it follows "@" directly:
            # "@/file.db" is a relative path and "@//abs/file.db" an absolute one.
            # An extra "/" here would turn every relative path into an absolute one.
            self._connection_pattern = "sqlite+pysqlcipher://:{password}@{dbname}"
        else:
            LOG.debug("Version of sqlite = %s", sqlite3.sqlite_version)

        try:
            self.db = create_engine(self.connection_str, **kwargs)
            # Log a masked URL: the real one carries the SQLCipher password.
            LOG.debug("Sqlite connected: %s", self._render_url(mask_password=True))
        except Exception as e:
            LOG.exception(e)
            raise RuntimeError("Failed to connect to sqlite") from e

    def _render_url(self, mask_password=False) -> str:
        """Fill the connection pattern, percent-encoding (or masking) the password if present."""
        from urllib.parse import quote

        values = dict(self._config)
        if "password" in values:
            values["password"] = "***" if mask_password else quote(str(values["password"]), safe="")
        return self._connection_pattern.format(**values)

    @property
    def connection(self):
        """Return the underlying SQLAlchemy engine."""
        return self.db

    def execute_query(self, sql, *args, **kwargs):
        """Execute a SQL statement in its own transaction and return the cursor result.

        :param sql: Raw SQL text; wrapped with ``text()`` before execution.
        :param args: Extra positional arguments forwarded to ``Connection.execute``.
        :param kwargs: Extra keyword arguments forwarded to ``Connection.execute``.
        :return: The result object returned by ``Connection.execute``.
        """
        # begin() commits on normal exit; a bare connect() block rolls back under
        # SQLAlchemy 2.0 and would silently discard DML.
        with self.db.begin() as conn:
            cur = conn.execute(text(sql), *args, **kwargs)
        # NOTE(review): the connection is released before the caller reads ``cur``.
        return cur

    @property
    def connection_str(self) -> str:
        """Return the SQLAlchemy connection URL used by the engine.

        Any SQLCipher password appears percent-encoded but otherwise in clear text;
        do not log this value.
        """
        return self._render_url()

connection_str property

Return the SQLAlchemy connection URL used by the engine.

__init__(db_config, **kwargs)

Build a SQLite or SQLCipher engine from the provided config.

Source code in aloha/db/sqlite.py
def __init__(self, db_config, **kwargs):
    """Build a SQLite or SQLCipher engine from the provided config.

    :param db_config: Mapping with optional ``dbname`` (file path; an empty or missing
        value yields the URL ``sqlite://``) and optional ``password``. When ``password``
        is present the database is opened through SQLCipher (``sqlcipher3`` package) and
        the password is resolved via ``vault_type``/``vault_config``.
    :param kwargs: Extra keyword arguments forwarded to ``create_engine``.
    :raises RuntimeError: If ``sqlcipher3`` is missing or the engine cannot be created.
    """
    from urllib.parse import quote

    self._connection_pattern = "sqlite://{dbname}"
    dbname = db_config.get("dbname", "")
    if len(dbname) > 0:
        dbname = "/%s" % dbname
    self._config = {"dbname": dbname}

    if "password" in db_config:
        try:
            import sqlcipher3
        except ImportError as e:
            raise RuntimeError("Python package required for encrypted sqlite3: sqlcipher3-binary") from e
        LOG.debug("Version of sqlcipher3 = %s", sqlcipher3.sqlite_version)
        password_vault = PasswordVault.get_vault(db_config.get("vault_type"), db_config.get("vault_config"))
        self._config["password"] = password_vault.get_password(db_config.get("password", None))
        # ``dbname`` already starts with "/", so it follows "@" directly:
        # "@/file.db" is a relative path and "@//abs/file.db" an absolute one.
        self._connection_pattern = "sqlite+pysqlcipher://:{password}@{dbname}"
    else:
        LOG.debug("Version of sqlite = %s", sqlite3.sqlite_version)

    # The engine URL gets the percent-encoded password; the log line gets a masked one.
    url_values = dict(self._config)
    log_values = dict(self._config)
    if "password" in self._config:
        url_values["password"] = quote(str(self._config["password"]), safe="")
        log_values["password"] = "***"

    try:
        self.db = create_engine(self._connection_pattern.format(**url_values), **kwargs)
        LOG.debug("Sqlite connected: %s", self._connection_pattern.format(**log_values))
    except Exception as e:
        LOG.exception(e)
        raise RuntimeError("Failed to connect to sqlite") from e

execute_query(sql, *args, **kwargs)

Execute a SQL statement and return the cursor result.

Source code in aloha/db/sqlite.py
def execute_query(self, sql, *args, **kwargs):
    """Execute a SQL statement in its own transaction and return the cursor result.

    :param sql: Raw SQL text; wrapped with ``text()`` before execution.
    :param args: Extra positional arguments forwarded to ``Connection.execute``.
    :param kwargs: Extra keyword arguments forwarded to ``Connection.execute``.
    :return: The result object returned by ``Connection.execute``.
    """
    # begin() commits on normal exit; a bare connect() block rolls back under
    # SQLAlchemy 2.0 and would silently discard DML.
    with self.db.begin() as conn:
        cur = conn.execute(text(sql), *args, **kwargs)
    # NOTE(review): the connection is released before the caller reads ``cur``.
    return cur