Skip to content

服务层

CORSMiddleware

Bases: BaseHTTPMiddleware

Middleware that adds permissive CORS headers to all responses.

Source code in aloha/service/http/plain_http_handler.py
class CORSMiddleware(BaseHTTPMiddleware):
    """Middleware that adds permissive CORS headers to all responses."""

    async def dispatch(self, request: Request, call_next):
        response = await call_next(request)
        add_cors_headers(response)
        return response

DefaultHandler404

Default 404 response handler for FastAPI services.

Source code in aloha/service/handlers.py
class DefaultHandler404:
    """Default 404 response handler for FastAPI services."""

    def __init__(self, request: Request | None = None, **kwargs):
        self.request = request
        self._request = request

    async def handle(self, request: Request | None = None):
        """Return a JSON response for unmatched routes."""
        _request = request or self.request
        del _request
        return JSONResponse(
            {"code": 404, "message": ["Not Found"], "data": None},
            status_code=404,
        )

handle(request=None) async

Return a JSON response for unmatched routes.

Source code in aloha/service/handlers.py
async def handle(self, request: Request | None = None):
    """Return a JSON response for unmatched routes."""
    _request = request or self.request
    del _request
    return JSONResponse(
        {"code": 404, "message": ["Not Found"], "data": None},
        status_code=404,
    )

Service application bootstrap utilities for FastAPI.

Application

Bootstrap and run an aloha FastAPI web service.

Source code in aloha/service/app.py
class Application:
    """Bootstrap and run an aloha FastAPI web service."""

    def __init__(self, *args, **kwargs):
        """Create the service application wrapper."""
        settings = dict(SETTINGS.config)
        self.web_app = FastAPIApplication(settings)
        self._server = None

    def start(self):
        """Start the FastAPI app using uvicorn."""
        port = self.web_app.get_port()
        workers = self.web_app.get_workers()

        LOG.info("Starting FastAPI service at port [%s] with [%s] workers...", port, workers)

        try:
            # Configure uvicorn
            config = uvicorn.Config(
                app=self.web_app.app,
                host="0.0.0.0",
                port=port,
                workers=workers,
                log_level="info",
                access_log=True,
            )
            self._server = uvicorn.Server(config)

            # Run with uvloop if available
            try:
                import uvloop

                asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
            except ImportError:
                LOG.debug("uvloop is not installed; continuing with default asyncio event loop.")

            asyncio.run(self._server.serve())
        except KeyboardInterrupt:
            LOG.info("Service interrupted by user")
        except Exception as e:
            LOG.error("Service error: %s", str(e))
            raise e

    def stop(self):
        """Stop the server if it is currently running."""
        if self._server is not None:
            self._server.should_exit = True

__init__(*args, **kwargs)

Create the service application wrapper.

Source code in aloha/service/app.py
def __init__(self, *args, **kwargs):
    """Create the service application wrapper."""
    settings = dict(SETTINGS.config)
    self.web_app = FastAPIApplication(settings)
    self._server = None

start()

Start the FastAPI app using uvicorn.

Source code in aloha/service/app.py
def start(self):
    """Start the FastAPI app using uvicorn."""
    port = self.web_app.get_port()
    workers = self.web_app.get_workers()

    LOG.info("Starting FastAPI service at port [%s] with [%s] workers...", port, workers)

    try:
        # Configure uvicorn
        config = uvicorn.Config(
            app=self.web_app.app,
            host="0.0.0.0",
            port=port,
            workers=workers,
            log_level="info",
            access_log=True,
        )
        self._server = uvicorn.Server(config)

        # Run with uvloop if available
        try:
            import uvloop

            asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
        except ImportError:
            LOG.debug("uvloop is not installed; continuing with default asyncio event loop.")

        asyncio.run(self._server.serve())
    except KeyboardInterrupt:
        LOG.info("Service interrupted by user")
    except Exception as e:
        LOG.error("Service error: %s", str(e))
        raise e

stop()

Stop the server if it is currently running.

Source code in aloha/service/app.py
def stop(self):
    """Stop the server if it is currently running."""
    if self._server is not None:
        self._server.should_exit = True

FastAPI web application assembly for aloha services.

FastAPIApplication

FastAPI application that loads routes from configured service modules.

Source code in aloha/service/web.py
class FastAPIApplication:
    """FastAPI application that loads routes from configured service modules."""

    def __init__(self, config: dict = None, **kwargs):
        """Create the FastAPI application and its routes."""
        self.config = config or {}
        self.app = FastAPI(title="Aloha Service", version="1.0.0", **kwargs)
        self._setup_default_handler()
        self._setup_routes()

    def _setup_default_handler(self):
        """Register a custom default 404 handler when configured."""
        handler_class = self.config.get("default_handler_class")
        if not handler_class:
            return

        @self.app.exception_handler(404)
        async def _default_404_handler(request: Request, exc: Exception):
            handler = handler_class(request=request)
            if hasattr(handler, "handle") and callable(handler.handle):
                return await handler.handle(request)
            if hasattr(handler, "__call__") and callable(handler):
                return await handler(request)
            if hasattr(handler, "response") and callable(handler.response):
                return await handler.response()
            return JSONResponse(
                {"code": 404, "message": ["Not Found"], "data": None},
                status_code=404,
            )

    def _setup_routes(self):
        """Setup routes from configured service modules."""
        settings = self.config.get("service", {})
        modules = settings.get("modules", [])

        for m in modules:
            routes = _load_routes(m)
            for url, handler_class in routes:
                self._register_handler(url, handler_class)
                s_log_msg = "Loaded API module %-50s" % url
                if LOG.level < logging.INFO:
                    s_log_msg += "\t from class %s" % str(handler_class)
                LOG.info(s_log_msg)

    def _register_handler(self, url: str, handler_class):
        """Register a handler class as FastAPI routes based on its methods."""
        has_get = hasattr(handler_class, "get") and callable(getattr(handler_class, "get"))
        has_post = hasattr(handler_class, "post") and callable(getattr(handler_class, "post"))

        # Determine path pattern for FastAPI
        fastapi_url, path_params = self._convert_url_pattern(url)

        # Store path_params in closure for use in handlers
        _has_path_params = path_params
        _original_url = url

        # Register POST handler if handler class has post method
        if has_post:

            async def post_handler(request: Request):
                kwargs = {}
                handler = handler_class()
                handler._request = request

                # Extract path params from URL
                if _has_path_params:
                    match_path = self._match_path(_original_url, str(request.url.path))
                    if match_path:
                        kwargs.update(match_path)

                try:
                    body = await request.json()
                except Exception:
                    body = {}

                kwargs.update(body)

                try:
                    result = await handler.post(**kwargs)
                    # If handler returns a Response object, return it directly
                    if isinstance(result, Response):
                        return result
                    # Otherwise, wrap in standard response format
                    resp = dict(code=5200, message=["success"])
                    if isinstance(result, dict):
                        resp["data"] = result.get("data", result)
                    else:
                        resp["data"] = result
                    return JSONResponse(resp)
                except Exception as e:
                    if handler.LOG.level == logging.DEBUG:
                        handler.LOG.error(e, exc_info=True)
                    return JSONResponse({"code": 5201, "message": [repr(e)]}, status_code=500)

            self.app.post(fastapi_url)(post_handler)

        # Register GET handler if handler class has get method
        if has_get:

            async def get_handler(request: Request):
                kwargs = {}
                handler = handler_class()
                handler._request = request

                # Extract path params from URL
                if _has_path_params:
                    match_path = self._match_path(_original_url, str(request.url.path))
                    if match_path:
                        kwargs.update(match_path)

                kwargs.update(dict(request.query_params))

                try:
                    result = await handler.get(**kwargs)
                    # If handler returns a Response object, return it directly
                    if isinstance(result, Response):
                        return result
                    # Otherwise, wrap in standard response format
                    resp = dict(code=5200, message=["success"])
                    if isinstance(result, dict):
                        resp["data"] = result.get("data", result)
                    else:
                        resp["data"] = result
                    return JSONResponse(resp)
                except Exception as e:
                    if handler.LOG.level == logging.DEBUG:
                        handler.LOG.error(e, exc_info=True)
                    return JSONResponse({"code": 5201, "message": [repr(e)]}, status_code=500)

            self.app.get(fastapi_url)(get_handler)

        # Default: register a POST handler using response() method
        if not has_post and not has_get:

            async def default_handler(request: Request):
                kwargs = {}
                handler = handler_class()
                handler._request = request

                # Extract path params from URL
                if _has_path_params:
                    match_path = self._match_path(_original_url, str(request.url.path))
                    if match_path:
                        kwargs.update(match_path)

                try:
                    body = await request.json()
                except Exception:
                    body = {}

                kwargs.update(body)

                resp = dict(code=5200, message=["success"])
                try:
                    result = handler.response(**kwargs)
                    resp["data"] = result
                except Exception as e:
                    if handler.LOG.level == logging.DEBUG:
                        handler.LOG.error(e, exc_info=True)
                    return JSONResponse({"code": 5201, "message": [repr(e)]}, status_code=500)

                return JSONResponse(resp)

            self.app.post(fastapi_url)(default_handler)

    def _convert_url_pattern(self, tornado_pattern: str) -> Tuple[str, bool]:
        """Convert Tornado URL pattern to FastAPI pattern.

        Tornado: /api/common/sys_info/(.*)
        FastAPI: /api/common/sys_info/{path_param}
        """
        has_capture = "(.*)" in tornado_pattern
        fastapi_pattern = tornado_pattern.replace("(.*)", "{path_param:path}")
        return fastapi_pattern, has_capture

    def _match_path(self, tornado_pattern: str, path: str) -> dict:
        """Match a path against a Tornado pattern and extract params."""
        # Convert Tornado pattern to regex
        pattern = tornado_pattern
        pattern = pattern.replace("(.*)", r"(?P<path_param>.*)")
        pattern = "^" + pattern + "$"

        match = re.match(pattern, path)
        if match:
            return match.groupdict()
        return {}

    def get_port(self) -> int:
        """Get the configured port."""
        service_settings = self.config.get("service", {})
        port = service_settings.get("port") or int(os.environ.get("PORT_SVC", 8000))
        port = int(os.environ.get("PORT", port))
        return port

    def get_workers(self) -> int:
        """Get the configured number of workers."""
        service_settings = self.config.get("service", {})
        return int(service_settings.get("num_process") or 1)

__init__(config=None, **kwargs)

Create the FastAPI application and its routes.

Source code in aloha/service/web.py
def __init__(self, config: dict = None, **kwargs):
    """Create the FastAPI application and its routes."""
    self.config = config or {}
    self.app = FastAPI(title="Aloha Service", version="1.0.0", **kwargs)
    self._setup_default_handler()
    self._setup_routes()

get_port()

Get the configured port.

Source code in aloha/service/web.py
def get_port(self) -> int:
    """Get the configured port."""
    service_settings = self.config.get("service", {})
    port = service_settings.get("port") or int(os.environ.get("PORT_SVC", 8000))
    port = int(os.environ.get("PORT", port))
    return port

get_workers()

Get the configured number of workers.

Source code in aloha/service/web.py
def get_workers(self) -> int:
    """Get the configured number of workers."""
    service_settings = self.config.get("service", {})
    return int(service_settings.get("num_process") or 1)

Base HTTP client helpers for aloha API clients using httpx.

AbstractApiClient

Bases: ABC

Common client behavior for aloha HTTP APIs using httpx.

Source code in aloha/service/http/base_api_client.py
class AbstractApiClient(ABC):
    """Common client behavior for aloha HTTP APIs using httpx."""

    LOG = LOG
    RETRY_METHOD_WHITELIST: frozenset = frozenset(["GET", "POST"])
    RETRY_STATUS_FORCELIST: frozenset = frozenset({413, 429, 503, 502, 504})
    config = SETTINGS.config

    def __init__(self, url_endpoint: str = None, *args, **kwargs):
        """Store the endpoint used by the client."""
        self.url_endpoint = url_endpoint or ""
        LOG.debug("API Caller URL endpoint set to: %s" % self.url_endpoint)

    def get_http_client(self, total_retries: int = 3, *args, **kwargs) -> httpx.AsyncClient:
        """Create an httpx async client with retry support via custom transport."""
        # Create a custom transport that retries on specific status codes
        from httpx import AsyncClient, Limits, Timeout

        # Configure retry policy
        limits = Limits(max_keepalive_connections=20, max_connections=100, keepalive_expiry=30)
        timeout = Timeout(timeout=30.0, connect=5.0)

        # Create async client with retry capabilities
        client = AsyncClient(
            limits=limits,
            timeout=timeout,
            follow_redirects=True,
            http2=True,
        )
        return client

    def get_headers(self, *args, **kwargs) -> dict:
        """Build the default request headers used by aloha clients."""
        headers = {
            "Content-Type": "application/json",
            "Request-ID": str(uuid.uuid4()),
        }
        return headers

    @abstractmethod
    def wrap_request_data(self, data: dict) -> dict:
        """Transform the request payload before sending it."""
        assert isinstance(data, dict), "Data object must be a dict!"
        raise NotImplementedError()

    async def _async_call(self, api_url: str, data: dict = None, timeout: float = 5, **kwargs):
        """Async version: Call a remote API and return the parsed JSON response."""
        body = data or dict()
        body.update(kwargs)
        payload = self.wrap_request_data(data=body)
        LOG.debug("Calling api: %s" % api_url)

        async with self.get_http_client() as client:
            resp = await client.post(
                urljoin(self.url_endpoint, api_url), json=payload, timeout=timeout, headers=self.get_headers()
            )

        try:
            ret = resp.json()
        except Exception as e:
            LOG.error(str(e))
            raise RuntimeError(resp.text)

        return ret

    def call(self, api_url: str, data: dict = None, timeout: float = 5, **kwargs):
        """Call a remote API and return the parsed JSON response (sync wrapper)."""
        import asyncio

        try:
            loop = asyncio.get_event_loop()
            if loop.is_running():
                # If loop is running, we need to create a new task
                import concurrent.futures

                with concurrent.futures.ThreadPoolExecutor() as pool:
                    future = pool.submit(asyncio.run, self._async_call(api_url, data, timeout, **kwargs))
                    return future.result()
            else:
                return loop.run_until_complete(self._async_call(api_url, data, timeout, **kwargs))
        except RuntimeError:
            # No event loop exists
            return asyncio.run(self._async_call(api_url, data, timeout, **kwargs))

__init__(url_endpoint=None, *args, **kwargs)

Store the endpoint used by the client.

Source code in aloha/service/http/base_api_client.py
def __init__(self, url_endpoint: str = None, *args, **kwargs):
    """Store the endpoint used by the client."""
    self.url_endpoint = url_endpoint or ""
    LOG.debug("API Caller URL endpoint set to: %s" % self.url_endpoint)

call(api_url, data=None, timeout=5, **kwargs)

Call a remote API and return the parsed JSON response (sync wrapper).

Source code in aloha/service/http/base_api_client.py
def call(self, api_url: str, data: dict = None, timeout: float = 5, **kwargs):
    """Call a remote API and return the parsed JSON response (sync wrapper)."""
    import asyncio

    try:
        loop = asyncio.get_event_loop()
        if loop.is_running():
            # If loop is running, we need to create a new task
            import concurrent.futures

            with concurrent.futures.ThreadPoolExecutor() as pool:
                future = pool.submit(asyncio.run, self._async_call(api_url, data, timeout, **kwargs))
                return future.result()
        else:
            return loop.run_until_complete(self._async_call(api_url, data, timeout, **kwargs))
    except RuntimeError:
        # No event loop exists
        return asyncio.run(self._async_call(api_url, data, timeout, **kwargs))

get_headers(*args, **kwargs)

Build the default request headers used by aloha clients.

Source code in aloha/service/http/base_api_client.py
def get_headers(self, *args, **kwargs) -> dict:
    """Build the default request headers used by aloha clients."""
    headers = {
        "Content-Type": "application/json",
        "Request-ID": str(uuid.uuid4()),
    }
    return headers

get_http_client(total_retries=3, *args, **kwargs)

Create an httpx async client with retry support via custom transport.

Source code in aloha/service/http/base_api_client.py
def get_http_client(self, total_retries: int = 3, *args, **kwargs) -> httpx.AsyncClient:
    """Create an httpx async client with retry support via custom transport."""
    # Create a custom transport that retries on specific status codes
    from httpx import AsyncClient, Limits, Timeout

    # Configure retry policy
    limits = Limits(max_keepalive_connections=20, max_connections=100, keepalive_expiry=30)
    timeout = Timeout(timeout=30.0, connect=5.0)

    # Create async client with retry capabilities
    client = AsyncClient(
        limits=limits,
        timeout=timeout,
        follow_redirects=True,
        http2=True,
    )
    return client

wrap_request_data(data) abstractmethod

Transform the request payload before sending it.

Source code in aloha/service/http/base_api_client.py
@abstractmethod
def wrap_request_data(self, data: dict) -> dict:
    """Transform the request payload before sending it."""
    assert isinstance(data, dict), "Data object must be a dict!"
    raise NotImplementedError()

Base FastAPI dependencies and request helpers for aloha services.

AbstractApiHandler

Bases: ABC

Shared request parsing and response helpers for JSON APIs.

This is a base class that provides utility methods for API handlers. Subclasses should inherit from this and implement the response() method.

Source code in aloha/service/http/base_api_handler.py
class AbstractApiHandler(ABC):
    """Shared request parsing and response helpers for JSON APIs.

    This is a base class that provides utility methods for API handlers.
    Subclasses should inherit from this and implement the response() method.
    """

    LOG = LOG
    MAP_ERROR_INFO: dict = {"BAD_REQUEST": {"code": "5101", "message": ["Bad request: fail to parse body as JSON object!"]}}

    def __init__(self):
        """Initialize request state used by subclasses."""
        self.api_args: Optional[tuple] = None
        self.api_kwargs: Optional[dict] = None
        self._request: Optional[Request] = None
        self._response: Optional[Response] = None

    def response(self, *args, **kwargs) -> dict:
        """Subclasses must implement the business response."""
        raise NotImplementedError()

    @property
    def request_header_content_type(self) -> str:
        """Return the request content type with a JSON default."""
        if self._request is None:
            return "application/json; charset=utf-8"
        return self._request.headers.get("Content-Type", "application/json; charset=utf-8")

    @property
    def request_id(self) -> str:
        """Return or create a request identifier for tracing."""
        if self._request is None:
            return datetime.now().strftime("%Y%m%d-%H%M%S-%f")
        request_id = self._request.headers.get("Request-ID")
        if request_id is None:
            request_id = datetime.now().strftime("%Y%m%d-%H%M%S-%f")
        return request_id

    @property
    def request_body(self) -> Optional[dict]:
        """Parse the request body as JSON or multipart form data."""
        content_type: str = self.request_header_content_type

        if self._request is None:
            return {}

        # For multipart/form-data, use request_param logic
        if content_type.startswith("multipart/form-data"):
            return self.request_param

        try:
            body = asyncio.get_event_loop().run_until_complete(self._request.body())
            body_str = body.decode("utf-8")
            if body_str:
                return json.loads(body_str)
            return {}
        except (UnicodeDecodeError, json.JSONDecodeError):
            return self.MAP_ERROR_INFO["BAD_REQUEST"]

    @property
    def request_param(self) -> dict:
        """Parse query/body arguments into a JSON-friendly dict."""
        ret: dict = {}
        if self._request is None:
            return ret

        # Parse query parameters
        for k, v in self._request.query_params.items():
            try:
                value = json.loads(v)
            except json.JSONDecodeError:
                value = v
            ret[k] = value

        return ret

    def get_request_files(self) -> Dict[str, list]:
        """Get uploaded files from multipart form data."""
        if self._request is None:
            return {}
        return self._request._form

    def finish(self, data: Any, status_code: int = 200) -> Response:
        """Create a JSON response with proper content type."""
        if isinstance(data, dict):
            content = json.dumps(data, ensure_ascii=False, default=str, separators=(",", ":"))
        elif isinstance(data, str):
            content = data
        else:
            content = json.dumps(data, ensure_ascii=False, default=str, separators=(",", ":"))
        return Response(content=content, status_code=status_code, media_type="application/json")

    def set_header(self, key: str, value: str) -> None:
        """Set a response header (no-op in base class, overridden in FastAPI route)."""
        pass

    def set_status(self, status_code: int, reason: str = None) -> None:
        """Set the response status code (no-op in base class)."""
        pass

    async def _handle_request(self, request: Request, *args, **kwargs) -> Response:
        """Process the request and return a response."""
        self._request = request
        self.api_args = args
        self.api_kwargs = kwargs

        try:
            result = self.response(*args, **kwargs)
            if isinstance(result, (dict, list)):
                return self.finish(result)
            return result
        except Exception as e:
            if self.LOG.level == logging.DEBUG:
                self.LOG.error(e, exc_info=True)
            msgs = ["An internal error has occurred!", repr(e)]
            return self.finish({"code": 5201, "message": msgs}, status_code=500)

request_body property

Parse the request body as JSON or multipart form data.

request_header_content_type property

Return the request content type with a JSON default.

request_id property

Return or create a request identifier for tracing.

request_param property

Parse query/body arguments into a JSON-friendly dict.

__init__()

Initialize request state used by subclasses.

Source code in aloha/service/http/base_api_handler.py
def __init__(self):
    """Initialize request state used by subclasses."""
    self.api_args: Optional[tuple] = None
    self.api_kwargs: Optional[dict] = None
    self._request: Optional[Request] = None
    self._response: Optional[Response] = None

finish(data, status_code=200)

Create a JSON response with proper content type.

Source code in aloha/service/http/base_api_handler.py
def finish(self, data: Any, status_code: int = 200) -> Response:
    """Create a JSON response with proper content type."""
    if isinstance(data, dict):
        content = json.dumps(data, ensure_ascii=False, default=str, separators=(",", ":"))
    elif isinstance(data, str):
        content = data
    else:
        content = json.dumps(data, ensure_ascii=False, default=str, separators=(",", ":"))
    return Response(content=content, status_code=status_code, media_type="application/json")

get_request_files()

Get uploaded files from multipart form data.

Source code in aloha/service/http/base_api_handler.py
def get_request_files(self) -> Dict[str, list]:
    """Get uploaded files from multipart form data."""
    if self._request is None:
        return {}
    return self._request._form

response(*args, **kwargs)

Subclasses must implement the business response.

Source code in aloha/service/http/base_api_handler.py
def response(self, *args, **kwargs) -> dict:
    """Subclasses must implement the business response."""
    raise NotImplementedError()

set_header(key, value)

Set a response header (no-op in base class, overridden in FastAPI route).

Source code in aloha/service/http/base_api_handler.py
def set_header(self, key: str, value: str) -> None:
    """Set a response header (no-op in base class, overridden in FastAPI route)."""
    pass

set_status(status_code, reason=None)

Set the response status code (no-op in base class).

Source code in aloha/service/http/base_api_handler.py
def set_status(self, status_code: int, reason: str = None) -> None:
    """Set the response status code (no-op in base class)."""
    pass

create_handler_route(handler_class)

Create a FastAPI route wrapper for a handler class.

Source code in aloha/service/http/base_api_handler.py
def create_handler_route(handler_class):
    """Create a FastAPI route wrapper for a handler class."""

    class HandlerRoute(APIRouter):
        async def _execute_handler(self, request: Request, **kwargs) -> Response:
            handler = handler_class()
            return await handler._handle_request(request, **kwargs)

    return HandlerRoute

FastAPI middleware and dependencies with permissive CORS defaults.

CORSMiddleware

Bases: BaseHTTPMiddleware

Middleware that adds permissive CORS headers to all responses.

Source code in aloha/service/http/plain_http_handler.py
class CORSMiddleware(BaseHTTPMiddleware):
    """Middleware that adds permissive CORS headers to all responses."""

    async def dispatch(self, request: Request, call_next):
        response = await call_next(request)
        add_cors_headers(response)
        return response

CORSResponse

Bases: JSONResponse

JSON response with permissive CORS headers for simple APIs.

Source code in aloha/service/http/plain_http_handler.py
class CORSResponse(JSONResponse):
    """JSON response with permissive CORS headers for simple APIs."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    async def __call__(self, scope, receive, send) -> None:
        await super().__call__(scope, receive, send)

add_cors_headers(response)

Add permissive CORS headers to a response.

Source code in aloha/service/http/plain_http_handler.py
def add_cors_headers(response: Response) -> None:
    """Add permissive CORS headers to a response."""
    response.headers["Access-Control-Allow-Origin"] = "*"
    response.headers["Access-Control-Allow-Headers"] = "*"
    response.headers["Access-Control-Max-Age"] = "1000"
    response.headers["Content-Type"] = "application/json; charset=UTF-8"
    response.headers["Access-Control-Allow-Methods"] = "POST, GET, OPTIONS"
    response.headers["Access-Control-Allow-Headers"] = (
        "authorization, Authorization, Content-Type,"
        "Access-Control-Allow-Origin, Access-Control-Allow-Headers,"
        "X-Requested-By, Access-Control-Allow-Methods"
    )

Helpers for handling multipart upload files and remote file inputs using httpx.

iter_over_request_files(request, url_files) async

Yield uploaded files and optional remote files as normalized tuples.

Each yielded item is (field_name, file_name, content_type, body_bytes). Files can come from multipart form uploads or from URLs listed in url_files.

Args: request: FastAPI request object with files attribute url_files: List of URLs to download files from

Source code in aloha/service/http/files.py
async def iter_over_request_files(request, url_files):
    """Yield uploaded files and optional remote files as normalized tuples.

    Each yielded item is `(field_name, file_name, content_type, body_bytes)`.
    Files can come from multipart form uploads or from URLs listed in
    `url_files`.

    Args:
        request: FastAPI request object with files attribute
        url_files: List of URLs to download files from
    """
    # Handle multipart uploaded files
    if hasattr(request, "files") and request.files:
        for file_key, files in request.files.items():
            for f in files:
                file_name = getattr(f, "filename", "unknown")
                content_type = getattr(f, "content_type", "application/octet-stream")
                body = await f.read()
                LOG.info(f"File {file_name} from multipart has content type {content_type} and length bytes={len(body)}")
                yield file_key, file_name, content_type, body

    # Handle files from URL
    for file_key, list_url in {"url_files": url_files or []}.items():
        for url in sorted(set(list_url)):
            try:
                t_start = time.time()
                async with httpx.AsyncClient(follow_redirects=True) as client:
                    resp = await client.get(url)
                    if resp.status_code == 200:
                        body = resp.content
                        content_type = resp.headers.get("Content-Type", "UNKNOWN")
                    else:
                        raise RuntimeError(
                            "Failed to download file after %s seconds with code=%s from URL %s"
                            % (time.time() - t_start, resp.status_code, url)
                        )
            except Exception as e:
                raise e
            t_cost = time.time() - t_start
            LOG.info(f"File {url} has content type {content_type} and length bytes={len(body)}, downloaded in {t_cost} seconds")
            yield "url_files", url, content_type, body

iter_over_request_files_sync(request, url_files)

Synchronous version of iter_over_request_files for backward compatibility.

This is a sync wrapper that uses httpx sync client.

Source code in aloha/service/http/files.py
def iter_over_request_files_sync(request, url_files):
    """Synchronous version of iter_over_request_files for backward compatibility.

    This is a sync wrapper that uses httpx sync client.
    """

    # Handle multipart uploaded files (from FastAPI form data)
    if hasattr(request, "_form"):
        form_data = request._form
        for file_key, files in form_data.multi_items():
            if isinstance(files, list):
                for f in files:
                    if hasattr(f, "read"):
                        body = f.read()
                        file_name = getattr(f, "filename", "unknown")
                        content_type = getattr(f, "content_type", "application/octet-stream")
                        LOG.info(
                            f"File {file_name} from multipart has content type {content_type} and length bytes={len(body)}"
                        )
                        yield file_key, file_name, content_type, body
            else:
                yield file_key, files, "text/plain", str(files).encode()

    # Handle files from URL
    for file_key, list_url in {"url_files": url_files or []}.items():
        for url in sorted(set(list_url)):
            try:
                t_start = time.time()
                with httpx.Client(follow_redirects=True) as client:
                    resp = client.get(url)
                    if resp.status_code == 200:
                        body = resp.content
                        content_type = resp.headers.get("Content-Type", "UNKNOWN")
                    else:
                        raise RuntimeError(
                            "Failed to download file after %s seconds with code=%s from URL %s"
                            % (time.time() - t_start, resp.status_code, url)
                        )
            except Exception as e:
                raise e
            t_cost = time.time() - t_start
            LOG.info(f"File {url} has content type {content_type} and length bytes={len(body)}, downloaded in {t_cost} seconds")
            yield "url_files", url, content_type, body

Client helper for OpenAPI-style services protected by tokens.

OpenApiClient

Simple HTTP client that acquires and caches an access token.

Source code in aloha/service/openapi/client.py
class OpenApiClient:
    """Simple HTTP client that acquires and caches an access token."""

    retry_method_whitelist = frozenset(["GET", "POST"])
    retry_status_forcelist = frozenset({413, 429, 503, 502, 504})

    def __init__(self, url_oauth_get_token: str, client_id: str, client_secret: str, grant_type: str = "client_credentials"):
        """Store OAuth-style client credentials and token endpoint."""
        self.url_oauth_get_token = url_oauth_get_token
        self.client_id = client_id
        self.client_secret = client_secret
        self.grant_type = grant_type

        self.expires_at = None
        self.access_token = None

    @classmethod
    def get_request_session(cls, total_retries: int = 10, *args, **kwargs) -> Session:
        """Create a retry-enabled requests session."""
        session = Session()
        # https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#urllib3.util.Retry.DEFAULT_ALLOWED_METHODS
        retries = Retry(
            total=total_retries,
            backoff_factor=0.1,
            method_whitelist=cls.retry_method_whitelist,
            status_forcelist=cls.retry_status_forcelist,
        )
        for prefix in ("http://", "https://"):
            session.mount(prefix, HTTPAdapter(max_retries=retries))
        return session

    def get_access_token(self) -> str:
        """Fetch or refresh the cached access token."""
        now = datetime.now()

        if self.expires_at is None or self.expires_at > now:
            try:
                # refresh access_token
                resp = self.get_request_session().post(
                    self.url_oauth_get_token,
                    timeout=5,
                    json={"client_id": self.client_id, "client_secret": self.client_secret, "grant_type": self.grant_type},
                )

                data = resp.json()["data"]
                if data is None or "access_token" not in data:
                    raise RuntimeError("Fail to fetch OpenAPI token with result: %s" % resp.text)

                self.access_token = data["access_token"]

                expires_in = int(data["expires_in"])
                self.expires_at = datetime.now() + timedelta(minutes=expires_in - 1)
            except Exception as e:
                msg = "Exception acquiring ESG access token from [%s]: %s" % (self.url_oauth_get_token, str(e))
                LOG.error(msg)

        return self.access_token

    def _get_request_url(self, url: str):
        """Attach access token and request id to the target URL."""
        request_url = "{url}?access_token={access_token}&request_id={request_id}".format(
            url=url, access_token=self.get_access_token(), request_id=datetime.now().strftime("%Y%m%d-%H%M%S-%f")
        )
        return request_url

    @staticmethod
    def _get_data_from_esg_response(resp) -> Optional[dict]:
        """Parse a JSON response and unwrap legacy ESG payloads."""
        try:
            return resp.json()
        except (json.JSONDecodeError, JSONDecodeError):  # requests may use `simplejson`
            try:
                # when data is wrapped by ESG
                content = resp.text.replace('"data":"', '"data":').replace('}"}', "}}")
                data = json.loads(content)
                return data.get("data", {})
            except json.JSONDecodeError:
                msg = "Cannot parse ESG response: %s" % resp.text
                raise ValueError(msg)

    def post(self, url_api: str, body: dict, headers: dict = None, timeout: int = 5):
        """Send a POST request to the remote API."""
        url = self._get_request_url(url_api)
        LOG.debug("Calling ESG POST: %s" % url)
        try:
            resp = self.get_request_session().post(url=url, headers=headers, json=body, timeout=timeout)
            return self._get_data_from_esg_response(resp)
        except Exception as e:
            LOG.error("Error calling ESG API POST [%s]: %s" % (url, str(e)))

    def get(self, url_api: str, body: dict, headers: dict = None, timeout: int = 5):
        """Send a GET request to the remote API."""
        url = self._get_request_url(url_api)
        LOG.debug("Calling ESG GET: %s" % url)
        try:
            resp = self.get_request_session().get(url=url, headers=headers, json=body, timeout=timeout)
            return self._get_data_from_esg_response(resp)
        except Exception as e:
            LOG.error("Error calling ESG API GET [%s]: %s" % (url, str(e)))

__init__(url_oauth_get_token, client_id, client_secret, grant_type='client_credentials')

Store OAuth-style client credentials and token endpoint.

Source code in aloha/service/openapi/client.py
def __init__(self, url_oauth_get_token: str, client_id: str, client_secret: str, grant_type: str = "client_credentials"):
    """Store OAuth-style client credentials and token endpoint."""
    self.url_oauth_get_token = url_oauth_get_token
    self.client_id = client_id
    self.client_secret = client_secret
    self.grant_type = grant_type

    self.expires_at = None
    self.access_token = None

get(url_api, body, headers=None, timeout=5)

Send a GET request to the remote API.

Source code in aloha/service/openapi/client.py
def get(self, url_api: str, body: dict, headers: dict = None, timeout: int = 5):
    """Send a GET request to the remote API."""
    url = self._get_request_url(url_api)
    LOG.debug("Calling ESG GET: %s" % url)
    try:
        resp = self.get_request_session().get(url=url, headers=headers, json=body, timeout=timeout)
        return self._get_data_from_esg_response(resp)
    except Exception as e:
        LOG.error("Error calling ESG API GET [%s]: %s" % (url, str(e)))

get_access_token()

Fetch or refresh the cached access token.

Source code in aloha/service/openapi/client.py
def get_access_token(self) -> str:
    """Fetch or refresh the cached access token."""
    now = datetime.now()

    if self.expires_at is None or self.expires_at > now:
        try:
            # refresh access_token
            resp = self.get_request_session().post(
                self.url_oauth_get_token,
                timeout=5,
                json={"client_id": self.client_id, "client_secret": self.client_secret, "grant_type": self.grant_type},
            )

            data = resp.json()["data"]
            if data is None or "access_token" not in data:
                raise RuntimeError("Fail to fetch OpenAPI token with result: %s" % resp.text)

            self.access_token = data["access_token"]

            expires_in = int(data["expires_in"])
            self.expires_at = datetime.now() + timedelta(minutes=expires_in - 1)
        except Exception as e:
            msg = "Exception acquiring ESG access token from [%s]: %s" % (self.url_oauth_get_token, str(e))
            LOG.error(msg)

    return self.access_token

get_request_session(total_retries=10, *args, **kwargs) classmethod

Create a retry-enabled requests session.

Source code in aloha/service/openapi/client.py
@classmethod
def get_request_session(cls, total_retries: int = 10, *args, **kwargs) -> Session:
    """Create a retry-enabled requests session."""
    session = Session()
    # https://urllib3.readthedocs.io/en/latest/reference/urllib3.util.html#urllib3.util.Retry.DEFAULT_ALLOWED_METHODS
    retries = Retry(
        total=total_retries,
        backoff_factor=0.1,
        method_whitelist=cls.retry_method_whitelist,
        status_forcelist=cls.retry_status_forcelist,
    )
    for prefix in ("http://", "https://"):
        session.mount(prefix, HTTPAdapter(max_retries=retries))
    return session

post(url_api, body, headers=None, timeout=5)

Send a POST request to the remote API.

Source code in aloha/service/openapi/client.py
def post(self, url_api: str, body: dict, headers: dict = None, timeout: int = 5):
    """Send a POST request to the remote API."""
    url = self._get_request_url(url_api)
    LOG.debug("Calling ESG POST: %s" % url)
    try:
        resp = self.get_request_session().post(url=url, headers=headers, json=body, timeout=timeout)
        return self._get_data_from_esg_response(resp)
    except Exception as e:
        LOG.error("Error calling ESG API POST [%s]: %s" % (url, str(e)))

Version 0 JSON API helpers for FastAPI.

This module defines the simplest request/response protocol used by aloha: request bodies are passed directly to the handler method and the response is serialized as a JSON object with a code and message field.

APICaller

Bases: AbstractApiClient

Client helper for v0 endpoints.

The payload is sent as-is, without signature wrapping or token exchange.

Source code in aloha/service/api/v0.py
class APICaller(AbstractApiClient):
    """Client helper for v0 endpoints.

    The payload is sent as-is, without signature wrapping or token exchange.
    """

    def wrap_request_data(self, data: dict) -> dict:
        """Return the request body unchanged."""
        assert isinstance(data, dict), "Data object must be a dict!"
        return data

wrap_request_data(data)

Return the request body unchanged.

Source code in aloha/service/api/v0.py
def wrap_request_data(self, data: dict) -> dict:
    """Return the request body unchanged."""
    assert isinstance(data, dict), "Data object must be a dict!"
    return data

APIHandler

Bases: AbstractApiHandler, ABC

Base handler for v0 JSON endpoints using FastAPI.

Subclasses implement :meth:response, which receives parsed request data and returns a Python object that can be JSON-serialized.

Source code in aloha/service/api/v0.py
class APIHandler(BaseHandler, ABC):
    """Base handler for v0 JSON endpoints using FastAPI.

    Subclasses implement :meth:`response`, which receives parsed request data
    and returns a Python object that can be JSON-serialized.
    """

    MAP_ERROR_INFO = {"BAD_REQUEST": {"code": "5101", "message": ["Bad request: fail to parse body as JSON object!"]}}

    async def post(self, *args, **kwargs):
        """Parse the request body, call :meth:`response`, and return JSON."""
        req_body = self.request_body

        if req_body is not None:
            kwargs.update(req_body)

        resp = dict(code=5200, message=["success"])
        try:
            result = self.response(*args, **kwargs)
            resp["data"] = result
        except Exception as e:
            if self.LOG.level == logging.DEBUG:
                self.LOG.error(e, exc_info=True)
            return self.finish({"code": 5201, "message": [repr(e)]})

        return self.finish(resp)

    async def get(self, *args, **kwargs):
        """Handle GET request (useful for some v0 endpoints)."""
        kwargs.update(self.request_param)
        resp = dict(code=5200, message=["success"])
        try:
            result = self.response(*args, **kwargs)
            resp["data"] = result
        except Exception as e:
            if self.LOG.level == logging.DEBUG:
                self.LOG.error(e, exc_info=True)
            return self.finish({"code": 5201, "message": [repr(e)]})
        return self.finish(resp)

get(*args, **kwargs) async

Handle GET request (useful for some v0 endpoints).

Source code in aloha/service/api/v0.py
async def get(self, *args, **kwargs):
    """Handle GET request (useful for some v0 endpoints)."""
    kwargs.update(self.request_param)
    resp = dict(code=5200, message=["success"])
    try:
        result = self.response(*args, **kwargs)
        resp["data"] = result
    except Exception as e:
        if self.LOG.level == logging.DEBUG:
            self.LOG.error(e, exc_info=True)
        return self.finish({"code": 5201, "message": [repr(e)]})
    return self.finish(resp)

post(*args, **kwargs) async

Parse the request body, call :meth:response, and return JSON.

Source code in aloha/service/api/v0.py
async def post(self, *args, **kwargs):
    """Parse the request body, call :meth:`response`, and return JSON."""
    req_body = self.request_body

    if req_body is not None:
        kwargs.update(req_body)

    resp = dict(code=5200, message=["success"])
    try:
        result = self.response(*args, **kwargs)
        resp["data"] = result
    except Exception as e:
        if self.LOG.level == logging.DEBUG:
            self.LOG.error(e, exc_info=True)
        return self.finish({"code": 5201, "message": [repr(e)]})

    return self.finish(resp)

create_v0_router(handler_class)

Create FastAPI routes for a v0 API handler class.

Args: handler_class: A class inheriting from APIHandler

Returns: A function that registers routes on a FastAPI app

Source code in aloha/service/api/v0.py
def create_v0_router(handler_class):
    """Create FastAPI routes for a v0 API handler class.

    Args:
        handler_class: A class inheriting from APIHandler

    Returns:
        A function that registers routes on a FastAPI app
    """

    async def handle_post(request: Request, **kwargs):
        handler = handler_class()
        handler._request = request

        # Get body for POST
        try:
            body = await request.json()
        except Exception:
            body = {}

        kwargs.update(body)
        resp = dict(code=5200, message=["success"])
        try:
            result = handler.response(**kwargs)
            resp["data"] = result
        except Exception as e:
            if handler.LOG.level == logging.DEBUG:
                handler.LOG.error(e, exc_info=True)
            return JSONResponse({"code": 5201, "message": [repr(e)]}, status_code=500)

        return JSONResponse(resp)

    async def handle_get(request: Request, **kwargs):
        handler = handler_class()
        handler._request = request

        # Get query params for GET
        kwargs.update(dict(request.query_params))
        resp = dict(code=5200, message=["success"])
        try:
            result = handler.response(**kwargs)
            resp["data"] = result
        except Exception as e:
            if handler.LOG.level == logging.DEBUG:
                handler.LOG.error(e, exc_info=True)
            return JSONResponse({"code": 5201, "message": [repr(e)]}, status_code=500)

        return JSONResponse(resp)

    return handle_post, handle_get

Version 1 signed JSON API helpers for FastAPI.

Version 1 adds request signing with app_id, salt_uuid, and sign fields. Handlers validate the signature before dispatching to the service logic.

APICaller

Bases: AbstractApiClient

Client helper that wraps payloads with v1 signing metadata.

Source code in aloha/service/api/v1.py
class APICaller(AbstractApiClient):
    """Client helper that wraps payloads with v1 signing metadata."""

    APP_ID_KEYS = AbstractApiClient.config.get("APP_ID_KEYS", {})

    def wrap_request_data(
        self,
        data,
        app_id: str | None = None,
        app_key: str | None = None,
        salt_uuid: str | None = None,
        sign: str | None = None,
        sign_method: str | None = None,
    ):
        """Wrap the payload with signature fields expected by v1 handlers."""
        if app_id is None:
            app_id = list(self.APP_ID_KEYS.keys())[0]
        salt_uuid = salt_uuid or str(uuid.uuid1())
        sign = sign or sign_data(
            salt_uuid=salt_uuid,
            app_id=app_id,
            app_key=app_key or self.APP_ID_KEYS.get(app_id),
            data=data,
            sign_method=sign_method,
        )
        return {"salt_uuid": salt_uuid, "app_id": app_id, "sign": sign, "data": data}

wrap_request_data(data, app_id=None, app_key=None, salt_uuid=None, sign=None, sign_method=None)

Wrap the payload with signature fields expected by v1 handlers.

Source code in aloha/service/api/v1.py
def wrap_request_data(
    self,
    data,
    app_id: str | None = None,
    app_key: str | None = None,
    salt_uuid: str | None = None,
    sign: str | None = None,
    sign_method: str | None = None,
):
    """Wrap the payload with signature fields expected by v1 handlers."""
    if app_id is None:
        app_id = list(self.APP_ID_KEYS.keys())[0]
    salt_uuid = salt_uuid or str(uuid.uuid1())
    sign = sign or sign_data(
        salt_uuid=salt_uuid,
        app_id=app_id,
        app_key=app_key or self.APP_ID_KEYS.get(app_id),
        data=data,
        sign_method=sign_method,
    )
    return {"salt_uuid": salt_uuid, "app_id": app_id, "sign": sign, "data": data}

APIHandler

Bases: AbstractApiHandler, ABC

Signed API handler for v1 endpoints.

Source code in aloha/service/api/v1.py
class APIHandler(BaseHandler, ABC):
    """Signed API handler for v1 endpoints."""

    MAP_ERROR_INFO = {
        "BAD_REQUEST": {"code": "5101", "message": ["Bad request: fail to parse body as JSON object!"]},
        "MISSING_ARGS": {"code": "5102", "message": ["Required argument field(s) missing..."]},
        "SIGN_CHECK_FAIL": {"code": "5104", "message": ["Invalid sign, sign check failed!"]},
    }

    async def post(self):
        """Validate the signature and dispatch the wrapped payload."""
        body_arguments = self.request_body

        try:
            salt_uuid = body_arguments.pop("salt_uuid")
            app_id = body_arguments.pop("app_id")
            sign = body_arguments.pop("sign")
            data = body_arguments.pop("data")
        except KeyError:
            return self.finish(self.MAP_ERROR_INFO["MISSING_ARGS"])

        is_valid_req = sign_check(salt_uuid=salt_uuid, app_id=app_id, sign=sign, data=data)
        if not is_valid_req:
            return self.finish(self.MAP_ERROR_INFO["SIGN_CHECK_FAIL"])

        resp = dict(code=5200, message=["success"])
        try:
            result = self.response(**data)
            resp["data"] = result
            resp["salt_uuid"] = salt_uuid
        except Exception as e:
            if self.LOG.level == logging.DEBUG:
                self.LOG.error(e, exc_info=True)
            return self.finish({"code": 5201, "message": [repr(e)]})

        return self.finish(resp)

post() async

Validate the signature and dispatch the wrapped payload.

Source code in aloha/service/api/v1.py
async def post(self):
    """Validate the signature and dispatch the wrapped payload."""
    body_arguments = self.request_body

    try:
        salt_uuid = body_arguments.pop("salt_uuid")
        app_id = body_arguments.pop("app_id")
        sign = body_arguments.pop("sign")
        data = body_arguments.pop("data")
    except KeyError:
        return self.finish(self.MAP_ERROR_INFO["MISSING_ARGS"])

    is_valid_req = sign_check(salt_uuid=salt_uuid, app_id=app_id, sign=sign, data=data)
    if not is_valid_req:
        return self.finish(self.MAP_ERROR_INFO["SIGN_CHECK_FAIL"])

    resp = dict(code=5200, message=["success"])
    try:
        result = self.response(**data)
        resp["data"] = result
        resp["salt_uuid"] = salt_uuid
    except Exception as e:
        if self.LOG.level == logging.DEBUG:
            self.LOG.error(e, exc_info=True)
        return self.finish({"code": 5201, "message": [repr(e)]})

    return self.finish(resp)

create_v1_router(handler_class)

Create FastAPI routes for a v1 API handler class with signing validation.

Args: handler_class: A class inheriting from APIHandler

Returns: An async function that handles v1 signed requests

Source code in aloha/service/api/v1.py
def create_v1_router(handler_class):
    """Create FastAPI routes for a v1 API handler class with signing validation.

    Args:
        handler_class: A class inheriting from APIHandler

    Returns:
        An async function that handles v1 signed requests
    """

    async def handle_post(request: Request, **kwargs):
        try:
            body = await request.json()
        except Exception:
            return JSONResponse(
                {"code": "5101", "message": ["Bad request: fail to parse body as JSON object!"]}, status_code=400
            )

        try:
            salt_uuid = body.pop("salt_uuid")
            app_id = body.pop("app_id")
            sign = body.pop("sign")
            data = body.pop("data")
        except KeyError:
            return JSONResponse({"code": "5102", "message": ["Required argument field(s) missing..."]}, status_code=400)

        is_valid_req = sign_check(salt_uuid=salt_uuid, app_id=app_id, sign=sign, data=data)
        if not is_valid_req:
            return JSONResponse({"code": "5104", "message": ["Invalid sign, sign check failed!"]}, status_code=401)

        handler = handler_class()
        handler._request = request

        resp = dict(code=5200, message=["success"])
        try:
            result = handler.response(**data)
            resp["data"] = result
            resp["salt_uuid"] = salt_uuid
        except Exception as e:
            if handler.LOG.level == logging.DEBUG:
                handler.LOG.error(e, exc_info=True)
            return JSONResponse({"code": 5201, "message": [repr(e)]}, status_code=500)

        return JSONResponse(resp)

    return handle_post

sign_check(salt_uuid, app_id, sign, data, sign_method=None, date_time=None)

Validate a v1 request signature.

Source code in aloha/service/api/v1.py
def sign_check(salt_uuid: str, app_id: str, sign: str, data, sign_method: str = None, date_time=None):
    """Validate a v1 request signature."""

    func_sign_check = func_sign_check_default if sign_method is None else FUNC_SIGN_CHECK.get(sign_method)
    if func_sign_check is None:
        raise ValueError("Invalid `sign_method`: %s" % sign_method)

    app_key = APP_ID_KEYS.get(app_id)
    if app_key is None:
        return False

    # --> Compatible with older version API
    right_sign = func_sign_check(app_id + salt_uuid + app_key)
    if sign == right_sign:
        return True
    # <--

    public_key = str(json.dumps(data, ensure_ascii=False, sort_keys=True, separators=(",", ":")))
    public_key = app_id + salt_uuid + public_key + app_key
    right_sign = func_sign_check(public_key)
    return sign == right_sign

sign_data(salt_uuid, app_id, app_key, data, sign_method=None)

Generate the v1 signature for a payload.

The signature is based on app_id + salt_uuid + data + app_key.

Source code in aloha/service/api/v1.py
def sign_data(salt_uuid: str, app_id: str, app_key: str, data, sign_method: str = None):
    """Generate the v1 signature for a payload.

    The signature is based on `app_id + salt_uuid + data + app_key`.
    """
    data_str = str(json.dumps(data, ensure_ascii=False, sort_keys=True, separators=(",", ":")))
    public_key = app_id + salt_uuid + data_str + app_key

    func_sign_check = func_sign_check_default if sign_method is None else FUNC_SIGN_CHECK.get(sign_method)
    if func_sign_check is None:
        raise ValueError("Invalid `sign_method`: %s" % sign_method)
    sign = func_sign_check(public_key)
    return sign

Version 2 token-based JSON API helpers for FastAPI.

Version 2 uses an access token in the request header and a request-id header for tracing. It keeps the same request/response shape as the earlier API generations while adding header-based authentication.

APICaller

Bases: AbstractApiClient

Client helper that adds v2 access-token headers automatically.

Source code in aloha/service/api/v2.py
class APICaller(AbstractApiClient):
    """Client helper that adds v2 access-token headers automatically."""

    APP_ID_KEYS = AbstractApiClient.config.get("APP_ID_KEYS", {})
    APP_SECRET_KEY = AbstractApiClient.config.get("APP_SECRET_KEY")

    def wrap_request_data(self, data: dict) -> dict:
        """Return the request body unchanged."""
        assert isinstance(data, dict), "Data object must be a dict!"
        return data

    def get_headers(self, app_id: str = None, app_key: str = None) -> dict:
        """Build the HTTP headers expected by v2 handlers."""
        if app_id is None:
            app_id = list(self.APP_ID_KEYS.keys())[0]

        expire_time = datetime.now() + timedelta(days=1)

        access_token = jwt.encode(secret_key=self.APP_SECRET_KEY, payload={"exp": int(expire_time.timestamp()), "aid": app_id})

        headers = super().get_headers()
        headers.update({"Access-Token": access_token})
        return headers

get_headers(app_id=None, app_key=None)

Build the HTTP headers expected by v2 handlers.

Source code in aloha/service/api/v2.py
def get_headers(self, app_id: str = None, app_key: str = None) -> dict:
    """Build the HTTP headers expected by v2 handlers."""
    if app_id is None:
        app_id = list(self.APP_ID_KEYS.keys())[0]

    expire_time = datetime.now() + timedelta(days=1)

    access_token = jwt.encode(secret_key=self.APP_SECRET_KEY, payload={"exp": int(expire_time.timestamp()), "aid": app_id})

    headers = super().get_headers()
    headers.update({"Access-Token": access_token})
    return headers

wrap_request_data(data)

Return the request body unchanged.

Source code in aloha/service/api/v2.py
def wrap_request_data(self, data: dict) -> dict:
    """Return the request body unchanged."""
    assert isinstance(data, dict), "Data object must be a dict!"
    return data

APIHandler

Bases: AbstractApiHandler, ABC

Token-authenticated API handler for v2 endpoints.

Source code in aloha/service/api/v2.py
class APIHandler(BaseHandler, ABC):
    """Token-authenticated API handler for v2 endpoints."""

    async def prepare(self) -> Optional[Response]:
        """Validate the access token before handling the request."""
        access_token = self._request.headers.get("Access-Token")
        if access_token is None:
            return self.finish({"msg": "Invalid Access-Token in request header!"})
        else:
            secret_key = SETTINGS.config["APP_SECRET_KEY"]
            options = {"verify_exp": False}
            access_token = jwt.decode(secret_key, access_token, options=options)
            if not isinstance(access_token, dict):
                msg = "Invalid Access-Token found in request for [%s]: %s" % (str(self._request.url), access_token)
                self.LOG.error(msg)
                return self.finish({"msg": msg})
        return None

    async def post(self, *args, **kwargs):
        """Handle POST requests with JSON request bodies."""
        body_arguments = self.request_body
        kwargs.update(body_arguments)
        try:
            if self.LOG.level == logging.DEBUG:
                s_kwargs = json.dumps(kwargs, ensure_ascii=False)
                self.LOG.debug("POST Request [%s]: %s" % (self.request_id, s_kwargs[:1000]))
            self.api_args, self.api_kwargs = args or (), kwargs or {}
            resp = self.response(*self.api_args, **self.api_kwargs)
        except Exception as e:
            self.LOG.info("POST Request [%s]: %s" % (self.request_id, self._request._body))
            msgs = ["An internal error has occurred!", str(e)]
            self.LOG.error(e, exc_info=True)
            return self.finish({"status": "error", "message": msgs})

        return self.finish(resp)

    async def get(self, *args, **kwargs):
        """Handle GET requests with query-string arguments."""
        query_arguments = self.request_param
        kwargs.update(query_arguments)
        try:
            self.LOG.debug("GET Request [%s]: %s" % (self.request_id, kwargs))
            self.api_args, self.api_kwargs = args or (), kwargs or {}
            resp = self.response(*self.api_args, **self.api_kwargs)
        except Exception as e:
            self.LOG.info("GET Request [%s]: %s" % (self.request_id, kwargs))
            msgs = ["An internal error has occurred!", str(e)]
            self.LOG.error(e, exc_info=True)
            return self.finish({"status": "error", "message": msgs})

        return self.finish(resp)

get(*args, **kwargs) async

Handle GET requests with query-string arguments.

Source code in aloha/service/api/v2.py
async def get(self, *args, **kwargs):
    """Handle GET requests with query-string arguments."""
    query_arguments = self.request_param
    kwargs.update(query_arguments)
    try:
        self.LOG.debug("GET Request [%s]: %s" % (self.request_id, kwargs))
        self.api_args, self.api_kwargs = args or (), kwargs or {}
        resp = self.response(*self.api_args, **self.api_kwargs)
    except Exception as e:
        self.LOG.info("GET Request [%s]: %s" % (self.request_id, kwargs))
        msgs = ["An internal error has occurred!", str(e)]
        self.LOG.error(e, exc_info=True)
        return self.finish({"status": "error", "message": msgs})

    return self.finish(resp)

post(*args, **kwargs) async

Handle POST requests with JSON request bodies.

Source code in aloha/service/api/v2.py
async def post(self, *args, **kwargs):
    """Handle POST requests with JSON request bodies."""
    body_arguments = self.request_body
    kwargs.update(body_arguments)
    try:
        if self.LOG.level == logging.DEBUG:
            s_kwargs = json.dumps(kwargs, ensure_ascii=False)
            self.LOG.debug("POST Request [%s]: %s" % (self.request_id, s_kwargs[:1000]))
        self.api_args, self.api_kwargs = args or (), kwargs or {}
        resp = self.response(*self.api_args, **self.api_kwargs)
    except Exception as e:
        self.LOG.info("POST Request [%s]: %s" % (self.request_id, self._request._body))
        msgs = ["An internal error has occurred!", str(e)]
        self.LOG.error(e, exc_info=True)
        return self.finish({"status": "error", "message": msgs})

    return self.finish(resp)

prepare() async

Validate the access token before handling the request.

Source code in aloha/service/api/v2.py
async def prepare(self) -> Optional[Response]:
    """Validate the access token before handling the request."""
    access_token = self._request.headers.get("Access-Token")
    if access_token is None:
        return self.finish({"msg": "Invalid Access-Token in request header!"})
    else:
        secret_key = SETTINGS.config["APP_SECRET_KEY"]
        options = {"verify_exp": False}
        access_token = jwt.decode(secret_key, access_token, options=options)
        if not isinstance(access_token, dict):
            msg = "Invalid Access-Token found in request for [%s]: %s" % (str(self._request.url), access_token)
            self.LOG.error(msg)
            return self.finish({"msg": msg})
    return None

create_v2_router(handler_class)

Create FastAPI routes for a v2 API handler class with JWT token validation.

Args: handler_class: A class inheriting from APIHandler

Returns: Tuple of (handle_post, handle_get) functions for the routes

Source code in aloha/service/api/v2.py
def create_v2_router(handler_class):
    """Create FastAPI routes for a v2 API handler class with JWT token validation.

    Args:
        handler_class: A class inheriting from APIHandler

    Returns:
        Tuple of (handle_post, handle_get) functions for the routes
    """

    async def handle_post(request: Request, token_payload: Dict = Depends(verify_v2_token)):
        handler = handler_class()
        handler._request = request

        try:
            body = await request.json()
        except Exception:
            body = {}

        kwargs = body
        try:
            if handler.LOG.level == logging.DEBUG:
                s_kwargs = json.dumps(kwargs, ensure_ascii=False)
                handler.LOG.debug("POST Request [%s]: %s" % (handler.request_id, s_kwargs[:1000]))

            resp = handler.response(**kwargs)
        except Exception as e:
            handler.LOG.error(e, exc_info=True)
            msgs = ["An internal error has occurred.", str(e)]
            return JSONResponse({"status": "error", "message": msgs}, status_code=500)

        return handler.finish(resp)

    async def handle_get(request: Request, token_payload: Dict = Depends(verify_v2_token)):
        handler = handler_class()
        handler._request = request

        kwargs = dict(request.query_params)
        try:
            handler.LOG.debug("GET Request [%s]: %s" % (handler.request_id, kwargs))
            resp = handler.response(**kwargs)
        except Exception as e:
            handler.LOG.error(e, exc_info=True)
            msgs = ["An internal error has occurred.", repr(e)]
            return JSONResponse({"status": "error", "message": msgs}, status_code=500)

        return handler.finish(resp)

    return handle_post, handle_get

verify_v2_token(request)

Dependency to verify v2 access token.

Returns the decoded token payload if valid, otherwise raises HTTPException.

Source code in aloha/service/api/v2.py
def verify_v2_token(request: Request) -> Optional[Dict[str, Any]]:
    """Dependency to verify v2 access token.

    Returns the decoded token payload if valid, otherwise raises HTTPException.
    """

    access_token = request.headers.get("Access-Token")
    if access_token is None:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid Access-Token in request header!")

    secret_key = SETTINGS.config.get("APP_SECRET_KEY")
    if not secret_key:
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="APP_SECRET_KEY not configured!")

    options = {"verify_exp": False}
    try:
        payload = jwt.decode(secret_key, access_token, options=options)
        if not isinstance(payload, dict):
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid Access-Token!")
        return payload
    except Exception as e:
        LOG.error(str(e), exc_info=True)
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid Access-Token!")