# server_http.py
# pylint: disable=line-too-long,broad-exception-raised,broad-exception-caught,import-outside-toplevel,protected-access
"""HTTP server SDK implementation for App Mesh."""
# Standard library imports
import logging
import os
import time
from http import HTTPStatus
from typing import Optional, Tuple, Union
# Local imports
from .client_http import AppMeshClient
from .exceptions import AppMeshError
# Module-level default logger; AppMeshServer falls back to it when no `logger_` is passed.
logger = logging.getLogger(__name__)
class AppMeshServer:
    """Server SDK for App Mesh application interacting with the local App Mesh REST service over HTTPS.

    Built-in runtime environment variables required:
        - APP_MESH_PROCESS_KEY
        - APP_MESH_APPLICATION_NAME

    Methods:
        - task_fetch(): fetch invocation payloads
        - task_return(): return results to the invoking client

    Example:
        context = appmesh.AppMeshServer()
        payload = context.task_fetch()
        result = do_something_with(payload)
        context.task_return(result)
    """

    # Minimum wall-clock duration of one failed task_fetch attempt: an attempt that
    # fails faster than this sleeps for the remainder before the next retry.
    _RETRY_DELAY_SECONDS = 0.1

    def __init__(
        self,
        base_url: str = "https://127.0.0.1:6060",
        ssl_verify: Union[bool, str] = AppMeshClient._DEFAULT_SSL_CA_CERT_PATH,
        ssl_client_cert: Optional[Union[str, Tuple[str, str]]] = None,
        request_timeout: Tuple[float, float] = (60, 300),
        *,
        client: Optional[AppMeshClient] = None,
        logger_: Optional[logging.Logger] = None,
    ):
        """Initialize a server-side helper for task fetch/return.

        Args:
            base_url: The server's base URI. Defaults to "https://127.0.0.1:6060".
            ssl_verify: SSL server verification mode.
            ssl_client_cert: SSL client certificate file(s).
            request_timeout: Timeouts `(connect_timeout, read_timeout)` in seconds.
            client: Pre-configured AppMeshClient instance (used by TCP/WSS subclasses so all
                transports share the same task API). When given, `base_url`, `ssl_verify`,
                `ssl_client_cert` and `request_timeout` are not used.
            logger_: Optional logger instance. Defaults to this module's logger.
        """
        if client is None:
            # Server endpoints use APP_MESH_PROCESS_KEY; no JWT refresh needed.
            client = AppMeshClient(base_url, ssl_verify, ssl_client_cert, request_timeout, auto_refresh_token=False)
        self._client = client
        self._logger = logger_ if logger_ is not None else logger

    @staticmethod
    def _get_runtime_env() -> Tuple[str, str]:
        """Read and validate required runtime environment variables.

        Returns:
            A `(process_key, app_name)` tuple read from APP_MESH_PROCESS_KEY and
            APP_MESH_APPLICATION_NAME.

        Raises:
            AppMeshError: If either variable is unset or empty.
        """
        process_key = os.getenv("APP_MESH_PROCESS_KEY")
        app_name = os.getenv("APP_MESH_APPLICATION_NAME")
        if not process_key:
            raise AppMeshError("Missing environment variable: APP_MESH_PROCESS_KEY. This must be set by App Mesh service.")
        if not app_name:
            raise AppMeshError("Missing environment variable: APP_MESH_APPLICATION_NAME. This must be set by App Mesh service.")
        return process_key, app_name

    def task_fetch(self) -> Union[str, bytes]:
        """Fetch task data in the currently running App Mesh application process.

        Used by App Mesh application process to obtain the payload from App Mesh service
        that a client pushed to it. Retries until a 200 response is received, unless the
        service answers 412 (see Raises).

        If an attempt fails within 100ms, sleeps for the rest of that interval before
        retrying; otherwise retries immediately.

        Returns:
            The payload bytes provided by the invoking client.

        Raises:
            AppMeshError: If the required runtime environment variables are missing.
            SystemExit: With exit code 1 when the service responds 412 Precondition Failed
                (process key mismatch).
        """
        pkey, app_name = self._get_runtime_env()
        path = f"/appmesh/app/{app_name}/task"
        query_params = {"process_key": pkey}

        while True:
            attempt_start = time.monotonic()
            try:
                resp = self._client._request_http(
                    AppMeshClient._Method.GET,
                    path=path,
                    query=query_params,
                    raise_on_fail=False,
                )
                if resp.status_code == HTTPStatus.OK:
                    return resp.content
                if resp.status_code == HTTPStatus.PRECONDITION_FAILED:
                    self._logger.error("Process key mismatch (412): this process has been superseded, exiting")
                    # SystemExit derives from BaseException, not Exception, so the
                    # handler below cannot swallow it and it propagates to the caller.
                    raise SystemExit(1)
                self._logger.warning("task_fetch failed with status %d: %s, retrying...", resp.status_code, resp.text)
            except Exception as ex:
                # Best-effort loop: any request error is logged and retried.
                self._logger.warning("task_fetch request failed: %s, retrying...", ex)

            # Throttle only fast failures so a refused connection does not become a busy loop.
            remaining = self._RETRY_DELAY_SECONDS - (time.monotonic() - attempt_start)
            if remaining > 0:
                time.sleep(remaining)

    def task_return(self, result: Union[str, bytes]) -> None:
        """Return the result of a server-side invocation back to the original client.

        Used by App Mesh application process to post the `result` to App Mesh service
        after processing payload data so the invoking client can retrieve it.

        Args:
            result: Result payload to be delivered back to the client exactly as provided.

        Raises:
            AppMeshError: If the required runtime environment variables are missing, or
                if the service responds with a status other than 200.
        """
        pkey, app_name = self._get_runtime_env()
        path = f"/appmesh/app/{app_name}/task"
        query_params = {"process_key": pkey}

        resp = self._client._request_http(
            AppMeshClient._Method.PUT,
            path=path,
            query=query_params,
            body=result,
        )
        if resp.status_code != HTTPStatus.OK:
            msg = f"task_return failed with status {resp.status_code}: {resp.text}"
            self._logger.error(msg)
            raise AppMeshError(msg)