Skip to content

工具函数

HTML extraction helpers.

extract_img_url(string)

Extract the first image source URL from an HTML fragment.

Source code in aloha/util/html.py
def extract_img_url(string):
    """Extract the first image source URL from an HTML fragment.

    Each top-level child of the parsed document (typically ``<head>`` and
    ``<body>``) is searched for ``p/img/@src``; the first match wins.

    :param string: HTML markup to parse, or ``None``
    :return: The first matching ``src`` value, or ``None`` when the input is
        ``None``, nothing matches, or parsing fails
    """
    if string is None:
        return None
    try:
        html = etree.HTML(string)
        if html is None:
            # The parser produced no document, so there is nothing to search.
            return None
        for child in html:
            images = child.xpath("p/img/@src")
            # Keep scanning: an earlier child without images (e.g. <head>)
            # must not hide an image found in a later one.
            if images:
                return images[0]
    except Exception as e:
        # Best-effort helper: report the problem and fall through to None.
        print(e, string)
    return None

extract_text(raw_data)

Extract visible text from an HTML fragment.

Source code in aloha/util/html.py
def extract_text(raw_data):
    """Return the text of an HTML fragment with ``<script>`` elements removed.

    Text nodes are collected with the XPath ``/html/body/*//text()``, cleaned
    of line breaks, tabs and a few Unicode spacing characters, and joined
    without a separator.

    :param raw_data: HTML markup to parse, or ``None``
    :return: ``None`` for ``None`` input, otherwise the joined text (an empty
        string when the parser yields no document)
    """
    if raw_data is None:
        return None

    root = etree.HTML(raw_data)
    if root is None:
        return ""

    # Remove every <script>. Its tail text is re-attached to the previous
    # sibling (or to the parent's text) first, presumably so that it is not
    # discarded together with the element.
    for node in root.xpath("//script"):
        owner = node.getparent()
        if owner is None:
            continue
        trailing = node.tail
        if trailing:
            sibling = node.getprevious()
            if sibling is None:
                owner.text = (owner.text or "") + trailing
            else:
                sibling.tail = (sibling.tail or "") + trailing
        owner.remove(node)

    # Characters deleted from every text node after its ends are stripped.
    unwanted = ("\n", "\t", "\u3000", "\xa0", "\r", "\u2028", "\u2029")

    pieces = []
    for text_node in root.xpath("/html/body/*//text()"):
        cleaned = text_node.strip(" \n\r")
        for char in unwanted:
            cleaned = cleaned.replace(char, "")
        if cleaned:
            pieces.append(cleaned)

    return "".join(pieces)

ObjectWithDateTimeEncoder

Bases: JSONEncoder

JSON encoder that handles datetime and pandas timestamp objects.

Converts pandas NaT to None, and pandas Timestamp and datetime objects to a Unix timestamp (float).

Source code in aloha/util/json.py
class ObjectWithDateTimeEncoder(json.JSONEncoder):
    """
    JSON encoder with support for datetime-like values.

    Mapping applied by :meth:`default`:
    - pandas NaT -> None
    - pandas Timestamp -> Unix timestamp (float)
    - datetime -> Unix timestamp (float)
    """

    def default(self, obj):
        """
        Return a JSON serializable stand-in for ``obj``.

        :param obj: Object to encode
        :return: JSON serializable representation
        """
        if not isinstance(obj, (NaTType, Timestamp, datetime)):
            # Not a date-like value: defer to the base encoder.
            return json.JSONEncoder.default(self, obj)

        # NaT is decided before calling timestamp(): NaTType is a subclass
        # of datetime, so it also matches the datetime check above.
        return None if isinstance(obj, NaTType) else obj.timestamp()

default(obj)

Convert objects to JSON serializable types.

Parameters:

Name Type Description Default
obj

Object to encode

required

Returns:

Type Description

JSON serializable representation

Source code in aloha/util/json.py
def default(self, obj):
    """
    Return a JSON serializable stand-in for ``obj``.

    :param obj: Object to encode
    :return: JSON serializable representation
    """
    # NaT goes first: NaTType is a subclass of datetime, so it would
    # otherwise be caught by the datetime branch below.
    if isinstance(obj, NaTType):
        return None

    # pandas Timestamp and plain datetime are both emitted as epoch seconds.
    if isinstance(obj, (Timestamp, datetime)):
        return obj.timestamp()

    return json.JSONEncoder.default(self, obj)

Random helper aliases built on top of secrets.SystemRandom.

random_bool()

Return a random boolean value.

Source code in aloha/util/random.py
def random_bool():
    """Pick ``True`` or ``False`` at random via ``random.choice``."""
    outcomes = (True, False)
    return random.choice(outcomes)

get_cpu_info(*args, **kwargs)

Get CPU information.

Returns:

Type Description
dict

Dictionary containing CPU information including core count, frequency, and usage percentage

Source code in aloha/util/sys_info.py
def get_cpu_info(*args, **kwargs) -> dict:
    """
    Get CPU information.

    The per-core measurement uses ``interval=1``, so this call takes about one
    second to return.

    :param args: Ignored
    :param kwargs: Ignored
    :return: Dictionary containing CPU information including core count, frequency, and usage percentage;
        the frequency entries are ``None`` when psutil cannot report a frequency
    """
    cpu_freq = psutil.cpu_freq()  # may be None when the frequency is unavailable

    def _mhz(field):
        # Format one frequency field, tolerating a missing cpu_freq() result.
        value = getattr(cpu_freq, field, None)
        return None if value is None else f"{value:.2f}"

    ret = {
        "num_cores_physical": psutil.cpu_count(logical=False),
        "num_cores_total": psutil.cpu_count(logical=True),
        "freq_max_mhz": _mhz("max"),
        "freq_min_mhz": _mhz("min"),
        "freq_cur_mhz": _mhz("current"),
        # NOTE(review): without an interval this compares against the previous
        # call, so the first call in a process likely reports 0.0 - confirm
        # whether that is acceptable for callers.
        "cpu_percent_total": f"{psutil.cpu_percent()}%",
    }
    for i, percentage in enumerate(psutil.cpu_percent(percpu=True, interval=1)):
        ret[f"cpu_percent_core_{i:02d}"] = f"{percentage}%"

    return ret

get_disk_info(*args, **kwargs)

Get disk information.

Returns:

Type Description
dict

Dictionary containing disk I/O statistics and partition information

Source code in aloha/util/sys_info.py
def get_disk_info(*args, **kwargs) -> dict:
    """
    Get disk information.

    :param args: Ignored
    :param kwargs: Ignored
    :return: Dictionary containing disk I/O statistics and partition information; the I/O totals are
        ``None`` when psutil reports no disk counters, and a partition whose usage cannot be read
        carries only its device, mount point and filesystem type
    """
    # get IO statistics since boot; psutil returns None when there are no disks to report
    disk_io = psutil.disk_io_counters()
    partitions = psutil.disk_partitions()

    ret = {
        "io_total_read": None if disk_io is None else f"{get_size(disk_io.read_bytes)}",
        "io_total_write": None if disk_io is None else f"{get_size(disk_io.write_bytes)}",
        "partitions": [],
    }

    for partition in partitions:
        part = {
            "device": f"{partition.device}",
            "mount_point": f"{partition.mountpoint}",
            "fs_type": f"{partition.fstype}",
        }

        try:
            partition_usage = psutil.disk_usage(partition.mountpoint)
        except OSError:
            # Covers PermissionError as well as other OS-level failures
            # (e.g. a drive that is not ready or a mount point that vanished);
            # the partition is still listed, just without usage figures.
            pass
        else:
            part.update(
                {
                    "size_total": f"{get_size(partition_usage.total)}",
                    "size_used": f"{get_size(partition_usage.used)}",
                    "size_free": f"{get_size(partition_usage.free)}",
                    "percent_used": f"{partition_usage.percent}%",
                }
            )

        ret["partitions"].append(part)

    return ret

get_mem_info(*args, **kwargs)

Get memory information.

Returns:

Type Description
dict

Dictionary containing virtual memory and swap space information

Source code in aloha/util/sys_info.py
def get_mem_info(*args, **kwargs) -> dict:
    """
    Report virtual memory and swap usage.

    Sizes are formatted with ``get_size``; percentages carry a ``%`` suffix.

    :param args: Ignored
    :param kwargs: Ignored
    :return: Dictionary containing virtual memory and swap space information
    """
    virtual = psutil.virtual_memory()
    swap_space = psutil.swap_memory()

    info = {}

    # virtual memory
    info["vm_total"] = f"{get_size(virtual.total)}"
    info["vm_available"] = f"{get_size(virtual.available)}"
    info["vm_used"] = f"{get_size(virtual.used)}"
    info["vm_percent"] = f"{virtual.percent}%"

    # swap space
    info["swap_total"] = f"{get_size(swap_space.total)}"
    info["swap_free"] = f"{get_size(swap_space.free)}"
    info["swap_used"] = f"{get_size(swap_space.used)}"
    info["swap_percent"] = f"{swap_space.percent}%"

    return info

get_net_info(*args, **kwargs)

Get network information.

Returns:

Type Description
dict

Dictionary containing network I/O statistics and interface information

Source code in aloha/util/sys_info.py
def get_net_info(*args, **kwargs) -> dict:
    """
    Get network information.

    Every interface entry holds its name plus ``<family>_address``,
    ``<family>_netmask`` and ``<family>_broadcast`` for each address, where
    ``<family>`` is ``mac``, ``ipv4`` or ``ipv6`` for the known families and
    the raw family name otherwise.

    :param args: Ignored
    :param kwargs: Ignored
    :return: Dictionary containing network I/O statistics and interface information
    """
    # get IO statistics since boot
    net_io = psutil.net_io_counters()

    # get all network interfaces (virtual and physical)
    if_addresses = psutil.net_if_addrs()

    # NOTE(review): on Linux the link-layer family is presumably reported as
    # AF_PACKET rather than AF_LINK - confirm whether it should map to "mac".
    family_labels = {"AF_LINK": "mac", "AF_INET": "ipv4", "AF_INET6": "ipv6"}

    ret = {
        "net_total_sent": f"{get_size(net_io.bytes_sent)}",
        "net_total_received": f"{get_size(net_io.bytes_recv)}",
        "interfaces": [],
    }

    for interface_name, interface_addresses in if_addresses.items():
        interface = {"name": interface_name}

        for address in interface_addresses:
            # Prefer the enum member's name: since Python 3.11 str() of an
            # IntEnum yields the integer value ("2"), not "AddressFamily.AF_INET",
            # which would defeat the label lookup. Fall back to the string
            # form for plain values that have no name.
            family = getattr(address.family, "name", None) or str(address.family).split(".")[-1]
            family = family_labels.get(family, family)

            interface[f"{family}_address"] = address.address
            interface[f"{family}_netmask"] = address.netmask
            interface[f"{family}_broadcast"] = address.broadcast

        ret["interfaces"].append(interface)

    return ret

get_os_info(*args, **kwargs)

Get operating system information.

Returns:

Type Description
dict

Dictionary containing OS information including boot time and platform details

Source code in aloha/util/sys_info.py
def get_os_info(*args, **kwargs) -> dict:
    """
    Collect the boot time and the ``platform.uname()`` fields.

    :param args: Ignored
    :param kwargs: Ignored
    :return: Dictionary containing OS information including boot time and platform details
    """
    booted_at = datetime.fromtimestamp(psutil.boot_time())
    info = {"boot_time": booted_at.strftime("%Y-%m-%d %H:%M:%S.%f")}

    # Merge the uname named tuple in as plain key/value pairs.
    info.update(platform.uname()._asdict())
    return info

get_size(bytes, suffix='B')

Scale bytes to its proper format.

e.g. 1253656 => '1.20MB'; 1253656678 => '1.17GB'

Parameters:

Name Type Description Default
bytes

Number of bytes to format

required
suffix

Unit suffix (default: "B")

'B'

Returns:

Type Description

Formatted size string

Source code in aloha/util/sys_info.py
def get_size(bytes, suffix="B"):
    """
    Scale bytes to its proper format.

    e.g:
        1253656 => '1.20MB'
        1253656678 => '1.17GB'

    Values of 1024**6 and above are reported in the largest supported unit
    (``P``) instead of returning ``None``.

    :param bytes: Number of bytes to format (the name shadows the builtin but is kept
        so keyword callers keep working)
    :param suffix: Unit suffix (default: "B")
    :return: Formatted size string
    """
    factor = 1024
    for unit in ("", "K", "M", "G", "T"):
        if bytes < factor:
            return f"{bytes:.2f}{unit}{suffix}"
        bytes /= factor
    # Five divisions done: the remainder is expressed in P, whatever its size,
    # so the function always returns a string.
    return f"{bytes:.2f}P{suffix}"

get_sys_info(*args, **kwargs)

Get comprehensive system information.

Combines information from OS, CPU, memory, disk, and network subsystems.

Returns:

Type Description
dict

Dictionary containing complete system information

Source code in aloha/util/sys_info.py
def get_sys_info(*args, **kwargs) -> dict:
    """
    Gather all system information sections into one dictionary.

    The OS, CPU, memory, disk and network collectors are called in that
    order, each stored under its own key.

    :param args: Ignored
    :param kwargs: Ignored
    :return: Dictionary containing complete system information
    """
    collectors = (
        ("os_info", get_os_info),
        ("cpu_info", get_cpu_info),
        ("mem_info", get_mem_info),
        ("disk_info", get_disk_info),
        ("net_info", get_net_info),
    )
    return {section: collect() for section, collect in collectors}

main()

Print system information as JSON to stdout.

Source code in aloha/util/sys_info.py
def main():
    """
    Dump the collected system information to stdout as indented JSON.
    """
    snapshot = get_sys_info()
    import json

    # ensure_ascii=False keeps non-ASCII characters readable in the output.
    print(json.dumps(snapshot, ensure_ascii=False, indent=2))

时间工具 (aloha.util.time)

该模块提供用于包装函数调用(如通过 requests 或 httpx 发起外部 HTTP 请求)的超时控制工具,并在操作成功或失败(超时/异常)时触发可选的回调函数。

核心函数

  • run_with_timeout: 以同步方式运行函数,并应用超时限制。
  • run_async_with_timeout: 以异步方式(协程或在执行器中运行同步函数)运行函数,并应用超时限制。

使用示例

# Usage example: wrap a blocking HTTP request with a timeout and callbacks.
from aloha.util.time import run_with_timeout
import requests

# Invoked with the function's return value when the call finishes in time.
def success_callback(response):
    print("请求成功:", response.status_code)

# Invoked with the exception when the call fails or times out.
def fail_callback(exception):
    print("请求失败或超时:", exception)

# Synchronous call wrapped with a timeout
try:
    run_with_timeout(
        requests.get,
        2.5,  # 2.5-second timeout limit
        "https://httpbin.org/delay/1",
        fn_callback_success=success_callback,
        fn_callback_fail=fail_callback
    )
except TimeoutError:
    # The exception is re-raised after fail_callback has run.
    print("捕获到超时异常 (TimeoutError)")

Time and timeout utilities.

run_async_with_timeout(func, timeout_seconds, *args, fn_callback_success=None, fn_callback_fail=None, **kwargs) async

Wrap an asynchronous function call (coroutine function or sync function inside executor) with a timeout.

If the operation completes within timeout_seconds, fn_callback_success(result) is executed if provided, and the result is returned. If the operation times out or raises an exception, fn_callback_fail(exception) is executed if provided, and the exception is reraised.

Source code in aloha/util/time.py
async def run_async_with_timeout(
    func: Callable[..., Any],
    timeout_seconds: float,
    *args: Any,
    fn_callback_success: Optional[Callable[[Any], Any]] = None,
    fn_callback_fail: Optional[Callable[[Exception], Any]] = None,
    **kwargs: Any,
) -> Any:
    """Await ``func(*args, **kwargs)`` with an upper bound on its duration.

    Coroutine functions are awaited directly; any other callable is run in the
    event loop's default executor. On success, `fn_callback_success(result)` is
    called if provided and the result is returned. On timeout a `TimeoutError`
    is created; on any other exception the original exception is kept. Either
    way `fn_callback_fail(exception)` is called if provided and the exception
    is raised. An exception raised by `fn_callback_success` takes the same
    failure path.
    """
    try:
        if not inspect.iscoroutinefunction(func):
            # Plain callables are pushed to the default executor so they do
            # not block the event loop.
            pending = asyncio.get_running_loop().run_in_executor(
                None, lambda: func(*args, **kwargs)
            )
        else:
            pending = func(*args, **kwargs)

        outcome = await asyncio.wait_for(pending, timeout=timeout_seconds)
        if fn_callback_success is not None:
            fn_callback_success(outcome)
        return outcome
    except (asyncio.TimeoutError, concurrent.futures.TimeoutError):
        # Normalize both timeout flavours to the builtin TimeoutError.
        failure = TimeoutError(f"Operation timed out after {timeout_seconds} seconds")
        if fn_callback_fail is not None:
            fn_callback_fail(failure)
        raise failure
    except Exception as err:
        if fn_callback_fail is not None:
            fn_callback_fail(err)
        raise err

run_with_timeout(func, timeout_seconds, *args, fn_callback_success=None, fn_callback_fail=None, **kwargs)

Wrap a synchronous function call with a timeout.

If the operation completes within timeout_seconds, fn_callback_success(result) is executed if provided, and the result is returned. If the operation times out or raises an exception, fn_callback_fail(exception) is executed if provided, and the exception is reraised.

Source code in aloha/util/time.py
def run_with_timeout(
    func: Callable[..., Any],
    timeout_seconds: float,
    *args: Any,
    fn_callback_success: Optional[Callable[[Any], Any]] = None,
    fn_callback_fail: Optional[Callable[[Exception], Any]] = None,
    **kwargs: Any,
) -> Any:
    """Wrap a synchronous function call with a timeout.

    If the operation completes within `timeout_seconds`, `fn_callback_success(result)`
    is executed if provided, and the result is returned.
    If the operation times out or raises an exception, `fn_callback_fail(exception)`
    is executed if provided, and the exception is reraised. A timeout is always
    reported as the builtin `TimeoutError`. An exception raised by
    `fn_callback_success` is handled like a failure of the operation.

    The function runs in a worker thread. On timeout, control returns to the
    caller immediately, but the thread itself cannot be interrupted and keeps
    running in the background until `func` finishes.
    """
    # No `with` block here: leaving it calls shutdown(wait=True), which would
    # block until `func` finishes and defeat the timeout.
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    try:
        future = executor.submit(func, *args, **kwargs)
        try:
            result = future.result(timeout=timeout_seconds)
            if fn_callback_success is not None:
                fn_callback_success(result)
            return result
        except Exception as e:
            if isinstance(e, concurrent.futures.TimeoutError):
                exc = TimeoutError(f"Operation timed out after {timeout_seconds} seconds")
            else:
                exc = e
            if fn_callback_fail is not None:
                fn_callback_fail(exc)
            raise exc
    finally:
        # Release the executor without waiting for a still-running worker.
        executor.shutdown(wait=False)