# client_http.py
# pylint: disable=broad-exception-raised,line-too-long,broad-exception-caught,too-many-lines,import-outside-toplevel
"""App Mesh HTTP Client SDK for REST API interactions."""
# Standard library imports
import base64
import json
import locale
import logging
import os
import sys
import threading
import time
import warnings
from contextlib import suppress
from datetime import datetime
from enum import Enum, unique
from http import HTTPStatus
from http.cookiejar import Cookie, CookieJar, MozillaCookieJar
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union
from urllib import parse
# Third-party imports
import aniso8601
import jwt
import requests
from requests.cookies import RequestsCookieJar
from requests.structures import CaseInsensitiveDict
# Local imports
from .app import App
from .app_output import AppOutput
from .app_run import AppRun
from .exceptions import AppMeshAuthError, AppMeshRequestError
logger = logging.getLogger(__name__)
[docs]
class AppMeshClient:
"""
Client SDK for interacting with the App Mesh service via REST API.
The `AppMeshClient` class provides a comprehensive interface for managing and monitoring distributed applications
within the App Mesh ecosystem. It enables communication with the App Mesh REST API for operations such as
application lifecycle management, monitoring, and configuration.
This client is designed for direct usage in applications that require access to App Mesh services over HTTP-based REST.
Attributes:
- TLS (Transport Layer Security): Supports secure connections between the client and App Mesh service,
ensuring encrypted communication.
- JWT (JSON Web Token) and RBAC (Role-Based Access Control): Provides secure API access with
token-based authentication and authorization to enforce fine-grained permissions.
Methods:
# Authentication Management
- login()
- logout()
- authenticate()
- renew_token()
- disable_totp()
- get_totp_secret()
- enable_totp()
# Application Management
- add_app()
- delete_app()
- disable_app()
- enable_app()
- check_app_health()
- get_app_output()
- get_app()
- list_apps()
# Run Application Operations
- run_app_async()
- wait_for_async_run()
- run_app_sync()
- run_task()
- cancel_task()
# System Management
- forward_to
- set_config()
- get_config()
- set_log_level()
- get_host_resources()
- get_metrics()
- add_label()
- delete_label()
- list_labels()
# File Management
- download_file()
- upload_file()
# User and Role Management
- add_user()
- delete_user()
- lock_user()
- update_password()
- get_current_user()
- unlock_user()
- list_users()
- get_user_permissions()
- list_permissions()
- delete_role()
- update_role()
- list_roles()
- list_groups()
Example:
>>> python -m pip install --upgrade appmesh
>>> from appmesh import AppMeshClient
>>> client = AppMeshClient()
>>> client.login("your-name", "your-password")
>>> client.authenticate("your-token-for-token-login")
>>> response = client.get_app(app_name='ping')
"""
# Polling interval for wait_for_async_run (seconds for HTTP, overridden by TCP/WSS)
_POLL_INTERVAL = 1
# Duration constants
_DURATION_ONE_WEEK_ISO = "P1W"
_DURATION_TWO_DAYS_ISO = "P2D"
_DURATION_TWO_DAYS_HALF_ISO = "P2DT12H"
_TOKEN_REFRESH_INTERVAL = 300 # 5 min to refresh token
_TOKEN_REFRESH_OFFSET = 30 # 30s before token expire to refresh token
# Platform-aware default SSL paths (only used if directory exists)
_DEFAULT_SSL_DIR = Path("c:/local/appmesh/ssl" if os.name == "nt" else "/opt/appmesh/ssl")
if _DEFAULT_SSL_DIR.is_dir():
_DEFAULT_SSL_CA_CERT_PATH = str(_DEFAULT_SSL_DIR / "ca.pem")
_DEFAULT_SSL_CLIENT_CERT_PATH = str(_DEFAULT_SSL_DIR / "client.pem")
_DEFAULT_SSL_CLIENT_KEY_PATH = str(_DEFAULT_SSL_DIR / "client-key.pem")
else:
_DEFAULT_SSL_CA_CERT_PATH = False # No certs available, skip verification
_DEFAULT_SSL_CLIENT_CERT_PATH = None
_DEFAULT_SSL_CLIENT_KEY_PATH = None
# JWT constants
_DEFAULT_JWT_AUDIENCE = "appmesh-service"
# HTTP headers and constants
_JSON_KEY_MESSAGE = "message"
_HTTP_USER_AGENT = "appmesh/python"
_HTTP_HEADER_KEY_AUTH = "Authorization"
_HTTP_HEADER_KEY_USER_AGENT = "User-Agent"
_HTTP_HEADER_KEY_X_TARGET_HOST = "X-Target-Host"
_HTTP_HEADER_KEY_X_FILE_PATH = "X-File-Path"
_HTTP_HEADER_JWT_SET_COOKIE = "X-Set-Cookie"
_HTTP_HEADER_NAME_CSRF_TOKEN = "X-CSRF-Token"
_COOKIE_TOKEN = "appmesh_auth_token"
_COOKIE_CSRF_TOKEN = "appmesh_csrf_token"
@unique
class _Method(Enum):
"""REST methods"""
GET = "GET"
PUT = "PUT"
POST = "POST"
DELETE = "DELETE"
POST_STREAM = "POST_STREAM"
class _EncodingResponse(requests.Response):
"""Response subclass that handles encoding conversion on Windows."""
def __init__(self, response: requests.Response):
super().__init__()
self.__dict__.update(response.__dict__)
self._converted_text = None
self._should_convert = False
# Check if we need to convert encoding on Windows
if sys.platform == "win32":
content_type = response.headers.get("Content-Type", "").lower()
is_ok = response.status_code == HTTPStatus.OK
is_utf8_text = "text/plain" in content_type and "utf-8" in content_type
if is_ok and is_utf8_text:
try:
local_encoding = locale.getpreferredencoding()
if local_encoding.lower() not in {"utf-8", "utf8"}:
# Ensure response is decoded as UTF-8 first
self.encoding = "utf-8"
utf8_text = self.text # This gives us proper Unicode string
with suppress(UnicodeEncodeError, LookupError):
# Convert Unicode to local encoding, then back to Unicode
local_bytes = utf8_text.encode(local_encoding, errors="replace")
self._converted_text = local_bytes.decode(local_encoding)
self._should_convert = True
except (UnicodeError, LookupError):
self.encoding = "utf-8"
@property
def text(self):
"""Return converted text if needed, otherwise original text."""
if self._should_convert and self._converted_text is not None:
return self._converted_text
return super().text
def __init__(
self,
base_url: str = "https://127.0.0.1:6060",
ssl_verify: Union[bool, str] = _DEFAULT_SSL_CA_CERT_PATH,
ssl_client_cert: Optional[Union[str, Tuple[str, str]]] = (
(_DEFAULT_SSL_CLIENT_CERT_PATH, _DEFAULT_SSL_CLIENT_KEY_PATH)
if _DEFAULT_SSL_CLIENT_CERT_PATH
else None
),
request_timeout: Tuple[float, float] = (60, 300),
jwt_token: Optional[str] = None,
cookie_file: Optional[str] = None,
auto_refresh_token: bool = False,
):
"""Initialize an App Mesh HTTP client for interacting with the App Mesh server via secure HTTPS.
Args:
base_url: The server's base URI. Defaults to "https://127.0.0.1:6060".
ssl_verify: SSL server verification mode:
- True: Use system CAs.
- False: Disable verification (insecure).
- str: Path to custom CA or directory. To include system CAs, combine them into one file (e.g., cat custom_ca.pem /etc/ssl/certs/ca-certificates.crt > combined_ca.pem).
ssl_client_cert: SSL client certificate file(s):
- str: Single PEM file with cert+key
- tuple: (cert_path, key_path)
request_timeout: Timeouts `(connect_timeout, read_timeout)` in seconds. Default `(60, 300)`.
jwt_token: JWT token set directly without server verification (no network call).
cookie_file: Cookie file path for HTTP clients (set this to enable persistent cookie storage).
auto_refresh_token: Enable automatic token refresh before expiration (supports App Mesh and Keycloak tokens).
"""
self._ensure_logging_configured()
self.base_url = base_url
self.ssl_verify = ssl_verify
self.ssl_client_cert = ssl_client_cert
self.request_timeout = request_timeout
self._forward_to = None
# Token auto-refresh (single background thread + Event-based wake)
self._auto_refresh_token = auto_refresh_token
self._refresh_thread = None
self._refresh_stop = threading.Event()
self._refresh_wake = threading.Event()
# Session and cookie management
self._lock = threading.Lock()
self.session = requests.Session()
self.cookie_file = cookie_file
if self._load_cookies(cookie_file):
if self._auto_refresh_token and self._get_access_token():
self.start_token_refresh()
if jwt_token:
self.set_token(jwt_token)
@staticmethod
def _ensure_logging_configured() -> None:
"""Ensure logging is configured with a default console handler if needed."""
if not logging.root.handlers:
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
def _get_access_token(self) -> Optional[str]:
"""Get the current access token."""
return self._get_cookie_value(self.session.cookies, self._COOKIE_TOKEN)
def _load_cookies(self, cookie_file: Optional[str]) -> bool:
"""Load cookies from a Mozilla-format file into the session."""
if not cookie_file:
return False
cookie_path = Path(cookie_file)
self.session.cookies = MozillaCookieJar(cookie_file)
if cookie_path.exists():
self.session.cookies.load(ignore_discard=True, ignore_expires=True)
return True
# Defer empty file creation to _on_token_changed (first actual write)
return False
@staticmethod
def _get_cookie_value(
cookies: Union[RequestsCookieJar, CookieJar], name: str, check_expiry: bool = True
) -> Optional[str]:
"""Get cookie value by name, checking expiry if requested."""
if not cookies or not name:
return None
# Fast path for RequestsCookieJar (default in requests.Session)
if isinstance(cookies, RequestsCookieJar):
cookie = cookies.get(name)
if not cookie:
return None
# Some requests versions return a string directly, others a Cookie object
if hasattr(cookie, "expires"):
if check_expiry and cookie.expires and cookie.expires < time.time():
return None # expired
return getattr(cookie, "value", None)
# Otherwise, assume the cookie is a plain value
return str(cookie)
# Generic CookieJar or derived types (MozillaCookieJar)
for cookie in cookies:
if cookie.name == name:
if check_expiry and cookie.expires and cookie.expires < time.time():
return None # expired
return cookie.value
return None
def _compute_refresh_delay(self) -> float:
"""Compute seconds to wait before next token refresh attempt."""
with suppress(Exception):
token = self._get_access_token()
if token:
exp = jwt.decode(token, options={"verify_signature": False}).get("exp", 0)
remaining = exp - time.time()
if remaining <= self._TOKEN_REFRESH_OFFSET:
return 1 # Expiring soon, refresh immediately
return max(1, min(remaining - self._TOKEN_REFRESH_OFFSET, self._TOKEN_REFRESH_INTERVAL))
return self._TOKEN_REFRESH_INTERVAL
def _token_refresh_loop(self) -> None:
"""Background thread: sleep → renew → repeat until stopped."""
while not self._refresh_stop.is_set():
delay = self._compute_refresh_delay()
self._refresh_wake.clear()
self._refresh_wake.wait(timeout=delay)
if self._refresh_stop.is_set():
break
if self._refresh_wake.is_set():
continue # Token changed externally, recompute delay
try:
self.renew_token()
logger.info("Token successfully refreshed")
except AppMeshAuthError as e:
# Token rejected (401/403) → dead token; retrying only spams logs.
logger.warning("Token refresh halted (auth rejected): %s", e)
return
except Exception as e:
# Network timeout, daemon down, SSL error, etc. — stop the loop.
# The current token stays valid until exp; the next user request
# will surface the connection issue with full context. Background
# spinning at ~1 Hz once exp approaches only floods the log.
logger.warning("Token refresh halted (%s: %s); call login() to resume", type(e).__name__, e)
return
[docs]
def start_token_refresh(self) -> None:
"""Start background token auto-refresh."""
if not self._auto_refresh_token:
return
if self._refresh_thread and self._refresh_thread.is_alive():
return
self._refresh_stop.clear()
self._refresh_wake.clear()
self._refresh_thread = threading.Thread(target=self._token_refresh_loop, daemon=True)
self._refresh_thread.start()
[docs]
def stop_token_refresh(self) -> None:
"""Stop background token auto-refresh."""
self._refresh_stop.set()
self._refresh_wake.set() # Wake thread so it exits promptly
if self._refresh_thread and self._refresh_thread.is_alive():
self._refresh_thread.join(timeout=5)
self._refresh_thread = None
[docs]
def close(self) -> None:
"""Close the client and release resources."""
self._auto_refresh_token = False
self.stop_token_refresh()
if self.session:
self.session.close()
self.session = None
def __enter__(self):
"""Support for context manager protocol."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Support for context manager protocol, ensuring resources are released."""
self.close()
def __del__(self):
"""Ensure resources are released when the object is garbage collected."""
try:
self.close()
except Exception: # pylint: disable=broad-exception-caught
pass # suppress all exceptions
def _on_token_changed(self, token: Optional[str]) -> None:
"""Notify that a token was updated: persist cookies, start/wake refresh thread."""
# Persist cookies (create file on first write)
if self.cookie_file:
cookie_path = Path(self.cookie_file)
if not cookie_path.exists():
cookie_path.parent.mkdir(parents=True, exist_ok=True)
with self._lock:
self.session.cookies.save(ignore_discard=True, ignore_expires=True)
if os.name == "posix":
try:
if cookie_path.stat().st_mode & 0o077:
cookie_path.chmod(0o600)
except OSError:
pass
# Start refresh thread if needed and wake it to recompute delay
if token and self._auto_refresh_token:
self.start_token_refresh()
self._refresh_wake.set()
@property
def forward_to(self) -> str:
"""Target host for request forwarding in a cluster.
Supports:
- "hostname" or "IP" → uses current service port
- "hostname:port" or "IP:port" → uses specified port
Returns:
str: Target host (e.g., "node" or "node:6060"), or empty string if unset.
Notes:
For JWT sharing across the cluster:
- All nodes must use the same `JWTSalt` and `Issuer` for JWT settings
- If port is omitted, current service port is used
"""
return self._forward_to or ""
@forward_to.setter
def forward_to(self, host: str) -> None:
"""Set target host for forwarding.
Examples:
>>> client.forward_to = "backend-node:6060" # Use specific port
>>> client.forward_to = "backend-node" # Use current service port
>>> client.forward_to = None # Disable forwarding
"""
self._forward_to = host
########################################
# Security
########################################
[docs]
def login(
self,
username: str,
password: str,
totp_code: Optional[str] = None,
token_expire: Union[str, int] = _DURATION_ONE_WEEK_ISO,
audience: Optional[str] = None,
) -> Optional[str]:
"""Login with username and password and attach the issued token to this client.
Args:
username: The name of the user.
password: The password of the user.
totp_code: The TOTP code if enabled for the user.
token_expire: Token expiration duration. Supports ISO 8601 durations (e.g., 'P1Y2M3DT4H5M6S' 'P1W').
audience: The audience of the JWT token, should be available by JWT service configuration (default is 'appmesh-service').
Returns:
TOTP challenge string if the server responds with HTTP 428 and no code was supplied,
otherwise ``None``. On success, the session token/cookie is updated and auto-refresh
starts when enabled for this client.
"""
# Standard App Mesh authentication
self.session.cookies.clear()
credentials = f"{username}:{password}".encode()
headers = {
self._HTTP_HEADER_KEY_AUTH: f"Basic {base64.b64encode(credentials).decode()}",
self._HTTP_HEADER_JWT_SET_COOKIE: "true", # Enable cookie token mode
"X-Expire-Seconds": str(self._parse_duration(token_expire)),
}
if audience:
headers["X-Audience"] = audience
if totp_code:
headers["X-Totp-Code"] = totp_code
resp = self._request_http(
AppMeshClient._Method.POST,
path="/appmesh/login",
header=headers,
)
if resp.status_code == HTTPStatus.PRECONDITION_REQUIRED:
# TOTP required (HTTP 428)
if "totp_challenge" in resp.json():
challenge = resp.json()["totp_challenge"]
if not totp_code:
return challenge
self.validate_totp(username, challenge, totp_code, token_expire)
[docs]
def validate_totp(
self, username: str, challenge: str, code: str, token_expire: Union[int, str] = _DURATION_ONE_WEEK_ISO
) -> None:
"""Validate TOTP challenge and obtain a new JWT token.
Args:
username: Username to validate.
challenge: Challenge string from server.
code: TOTP code to validate.
token_expire: Token expiration duration, defaults to `_DURATION_ONE_WEEK_ISO` (1 week).
Accepts either:
- **ISO 8601 duration string** (e.g., `'P1Y2M3DT4H5M6S'`, `'P1W'`)
- **Numeric value (seconds)** for simpler cases.
"""
body = {
"user_name": username,
"totp_code": code,
"totp_challenge": challenge,
"expire_seconds": self._parse_duration(token_expire),
}
headers = {self._HTTP_HEADER_JWT_SET_COOKIE: "true"}
self._request_http(
AppMeshClient._Method.POST,
path="/appmesh/totp/validate",
body=body,
header=headers,
)
[docs]
def logout(self) -> bool:
"""Logout from the current session."""
jwt_token = self._get_access_token()
if not jwt_token or not isinstance(jwt_token, str):
return False
resp = self._request_http(AppMeshClient._Method.POST, path="/appmesh/self/logoff", raise_on_fail=False)
if resp.status_code != HTTPStatus.OK:
logger.warning("Failed to logout: %s", resp.text)
return False
self.stop_token_refresh()
return True
[docs]
def authentication(
self, token: str, permission: Optional[str] = None, audience: Optional[str] = None, update_session: bool = True
) -> Tuple[bool, str]:
"""Deprecated: Use authenticate() instead."""
warnings.warn("authentication() is deprecated, use authenticate() instead", DeprecationWarning, stacklevel=2)
return self.authenticate(token, permission, audience, update_session)
[docs]
def authenticate(
self, token: str, permission: Optional[str] = None, audience: Optional[str] = None, update_session: bool = True
) -> Tuple[bool, str]:
"""Verify the provided JWT token with the server and optionally update the client session.
Args:
token: JWT token to verify.
permission: Optional permission ID to check (e.g., 'app-view', 'app-delete').
audience: Optional audience value to verify against the token.
update_session: When ``True``, update the current client session with the verified
token and persist local token state on success. When ``False``, only verify the
provided token and leave local state unchanged.
Returns:
Tuple of ``(success, message)`` where ``message`` is the raw response text.
"""
# Header auth token takes priority over cookie token
headers = {self._HTTP_HEADER_KEY_AUTH: f"Bearer {token}"}
if audience:
headers["X-Audience"] = audience
if permission:
headers["X-Permission"] = permission
if update_session:
headers[self._HTTP_HEADER_JWT_SET_COOKIE] = "true"
resp = self._request_http(AppMeshClient._Method.POST, path="/appmesh/auth", header=headers, raise_on_fail=False)
return resp.status_code == HTTPStatus.OK, resp.text
[docs]
def set_token(self, token: str) -> None:
"""Set a JWT token directly without server-side verification.
Use when the token is already known to be valid.
For server-side verification, use authenticate() instead.
Args:
token: A valid JWT token string. The token is stored in the client's cookie jar and
persisted immediately when `cookie_file` is configured.
"""
# CookieJar.set_cookie() is a base class method, works for both
# RequestsCookieJar (in-memory) and MozillaCookieJar (file-backed)
cookie = Cookie(
version=0,
name=self._COOKIE_TOKEN,
value=token,
port=None,
port_specified=False,
domain="",
domain_specified=False,
domain_initial_dot=False,
path="/",
path_specified=True,
secure=False,
expires=int(time.time()) + 86400 * 7,
discard=False,
comment=None,
comment_url=None,
rest={},
rfc2109=False,
)
self.session.cookies.set_cookie(cookie)
self._on_token_changed(token)
[docs]
def renew_token(self, token_expire: Union[int, str] = _DURATION_ONE_WEEK_ISO) -> None:
"""Renew the current JWT token.
Args:
token_expire: Token expiration duration (integer seconds or ISO 8601 string).
"""
jwt_token = self._get_access_token()
if not jwt_token:
raise AppMeshAuthError("No token to renew")
if not isinstance(jwt_token, str):
raise AppMeshAuthError("Unsupported token format")
self._request_http(
AppMeshClient._Method.POST,
path="/appmesh/token/renew",
header={"X-Expire-Seconds": str(self._parse_duration(token_expire))},
)
[docs]
def get_totp_secret(self) -> str:
"""Return the raw TOTP secret for the current user.
The server responds with a base64-encoded OTP provisioning URI; this helper parses that
URI and returns only the ``secret`` field for QR-code or authenticator setup.
"""
resp = self._request_http(method=AppMeshClient._Method.POST, path="/appmesh/totp/secret")
totp_uri = base64.b64decode(resp.json()["mfa_uri"]).decode()
parsed_uri = self._parse_totp_uri(totp_uri)
secret = parsed_uri.get("secret")
if secret is None:
raise AppMeshAuthError("TOTP URI does not contain a 'secret' field")
return secret
[docs]
def enable_totp(self, totp_code: str) -> None:
"""Set up 2FA for the current user.
Args:
totp_code: TOTP code.
"""
self._request_http(
method=AppMeshClient._Method.POST,
path="/appmesh/totp/setup",
header={"X-Totp-Code": totp_code},
)
[docs]
def disable_totp(self, user: str = "self") -> None:
"""Disable 2FA for the specified user."""
self._request_http(method=AppMeshClient._Method.POST, path=f"/appmesh/totp/{user}/disable")
@staticmethod
def _parse_totp_uri(totp_uri: str) -> dict:
"""Extract TOTP parameters from URI."""
parsed_info = {}
parsed_uri = parse.urlparse(totp_uri)
# Extract label from the path
parsed_info["label"] = parsed_uri.path[1:] # Remove leading slash
# Extract parameters from the query string
query_params = parse.parse_qs(parsed_uri.query)
for key, value in query_params.items():
parsed_info[key] = value[0]
return parsed_info
########################################
# Application view
########################################
[docs]
def get_app(self, app_name: str) -> App:
"""Get information about a specific application."""
resp = self._request_http(AppMeshClient._Method.GET, path=f"/appmesh/app/{app_name}")
return App(resp.json())
[docs]
def list_apps(self) -> List[App]:
"""Get information about all applications."""
resp = self._request_http(AppMeshClient._Method.GET, path="/appmesh/applications")
return [App(app) for app in resp.json()]
[docs]
def get_app_output(
self,
app_name: str,
stdout_position: int = 0,
stdout_index: int = 0,
stdout_maxsize: int = 10240,
process_uuid: str = "",
timeout: int = 0,
) -> AppOutput:
"""Get incremental stdout/stderr output for a running or completed application.
Args:
app_name: the application name
stdout_position: start read position, 0 means start from beginning.
stdout_index: index of history process stdout, 0 means get from current running process,
the stdout number depends on 'stdout_cache_size' of the application.
stdout_maxsize: max buffer size to read.
process_uuid: used to get the specified process instance instead of the latest one.
timeout: long-poll wait time in seconds before returning when no new output is available.
Returns:
``AppOutput`` containing response status, payload text, the next read cursor
(``out_position``), and ``exit_code`` when the process has already finished.
"""
resp = self._request_http(
AppMeshClient._Method.GET,
path=f"/appmesh/app/{app_name}/output",
query={
**({"stdout_position": str(stdout_position)} if stdout_position != 0 else {}),
**({"stdout_index": str(stdout_index)} if stdout_index != 0 else {}),
**({"stdout_maxsize": str(stdout_maxsize)} if stdout_maxsize != 0 else {}),
**({"process_uuid": process_uuid} if process_uuid != "" else {}),
**({"timeout": str(timeout)} if timeout != 0 else {}),
},
raise_on_fail=False,
)
out_position = int(resp.headers["X-Output-Position"]) if "X-Output-Position" in resp.headers else None
exit_code = int(resp.headers["X-Exit-Code"]) if "X-Exit-Code" in resp.headers else None
return AppOutput(status_code=resp.status_code, output=resp.text, out_position=out_position, exit_code=exit_code)
[docs]
def check_app_health(self, app_name: str) -> bool:
"""Check the health status of an application."""
resp = self._request_http(AppMeshClient._Method.GET, path=f"/appmesh/app/{app_name}/health")
return int(resp.text) == 0
########################################
# Application manage
########################################
[docs]
def add_app(self, app: App, subscribe_events: Optional[List[str]] = None) -> App:
"""Register a new application.
``subscribe_events`` only takes effect on a persistent connection (TCP/WSS) and is
silently ignored by the HTTP transport (no demuxer to deliver events to). When the
daemon creates a subscription, the returned App carries ``subscription_id``.
"""
query = {"subscribe_events": ",".join(subscribe_events)} if subscribe_events else None
resp = self._request_http(AppMeshClient._Method.PUT, path=f"/appmesh/app/{app.name}", query=query, body=app.to_dict())
return App(resp.json())
[docs]
def delete_app(self, app_name: str) -> bool:
"""Remove an application."""
resp = self._request_http(AppMeshClient._Method.DELETE, path=f"/appmesh/app/{app_name}", raise_on_fail=False)
if resp.status_code == HTTPStatus.OK:
return True
if resp.status_code == HTTPStatus.NOT_FOUND:
return False
logger.warning("Failed to delete app: %s", resp.text)
resp.raise_for_status()
return False
[docs]
def enable_app(self, app_name: str) -> None:
"""Enable an application."""
self._request_http(AppMeshClient._Method.POST, path=f"/appmesh/app/{app_name}/enable")
[docs]
def disable_app(self, app_name: str) -> None:
"""Disable an application."""
self._request_http(AppMeshClient._Method.POST, path=f"/appmesh/app/{app_name}/disable")
########################################
# Configuration
########################################
[docs]
def get_host_resources(self) -> Dict[str, Any]:
"""Get a report of host resources including CPU, memory, and disk."""
resp = self._request_http(AppMeshClient._Method.GET, path="/appmesh/resources")
return resp.json()
[docs]
def get_config(self) -> Dict[str, Any]:
"""Get the App Mesh configuration in JSON format."""
resp = self._request_http(AppMeshClient._Method.GET, path="/appmesh/config")
return resp.json()
[docs]
def set_config(self, config: dict) -> Dict[str, Any]:
"""Update the configuration."""
resp = self._request_http(AppMeshClient._Method.POST, path="/appmesh/config", body=config)
return resp.json()
[docs]
def set_log_level(self, level: str = "DEBUG") -> str:
"""Update the log level."""
config_dict = self.set_config(config={"BaseConfig": {"LogLevel": level}})
return config_dict["BaseConfig"]["LogLevel"]
########################################
# User Management
########################################
[docs]
def update_password(self, old_password: str, new_password: str, username: str = "self") -> None:
"""Change the password of a user."""
body = {
"old_password": base64.b64encode(old_password.encode()).decode(),
"new_password": base64.b64encode(new_password.encode()).decode(),
}
self._request_http(method=AppMeshClient._Method.POST, path=f"/appmesh/user/{username}/passwd", body=body)
[docs]
def add_user(self, username: str, user_data: dict) -> None:
"""Add a new user."""
self._request_http(method=AppMeshClient._Method.PUT, path=f"/appmesh/user/{username}", body=user_data)
[docs]
def delete_user(self, username: str) -> None:
"""Delete a user."""
self._request_http(method=AppMeshClient._Method.DELETE, path=f"/appmesh/user/{username}")
[docs]
def lock_user(self, username: str) -> None:
"""Lock a user."""
self._request_http(method=AppMeshClient._Method.POST, path=f"/appmesh/user/{username}/lock")
[docs]
def unlock_user(self, username: str) -> None:
"""Unlock a user."""
self._request_http(method=AppMeshClient._Method.POST, path=f"/appmesh/user/{username}/unlock")
[docs]
def list_users(self) -> Dict[str, Any]:
"""Get information about all users."""
resp = self._request_http(method=AppMeshClient._Method.GET, path="/appmesh/users")
return resp.json()
[docs]
def get_current_user(self) -> dict:
"""Get information about the current user."""
resp = self._request_http(method=AppMeshClient._Method.GET, path="/appmesh/user/self")
return resp.json()
[docs]
def list_groups(self) -> List[str]:
"""Get information about all user groups."""
resp = self._request_http(method=AppMeshClient._Method.GET, path="/appmesh/user/groups")
return resp.json()
[docs]
def list_permissions(self) -> List[str]:
"""Get information about all available permissions."""
resp = self._request_http(method=AppMeshClient._Method.GET, path="/appmesh/permissions")
return resp.json()
[docs]
def get_user_permissions(self) -> List[str]:
"""Get information about the permissions of the current user."""
resp = self._request_http(method=AppMeshClient._Method.GET, path="/appmesh/user/permissions")
return resp.json()
[docs]
def list_roles(self) -> Dict[str, Dict]:
"""Get information about all roles with permission definitions."""
resp = self._request_http(method=AppMeshClient._Method.GET, path="/appmesh/roles")
return resp.json()
[docs]
def update_role(self, role_name: str, permission_set: list) -> None:
"""Update or add a role with defined permissions."""
self._request_http(method=AppMeshClient._Method.POST, path=f"/appmesh/role/{role_name}", body=permission_set)
[docs]
def delete_role(self, role_name: str) -> None:
"""Delete a user role."""
self._request_http(method=AppMeshClient._Method.DELETE, path=f"/appmesh/role/{role_name}")
########################################
# Label management
########################################
[docs]
def add_label(self, label_name: str, label_value: str) -> None:
"""Add a new label."""
self._request_http(AppMeshClient._Method.PUT, query={"value": label_value}, path=f"/appmesh/label/{label_name}")
[docs]
def delete_label(self, label_name: str) -> None:
"""Delete a label."""
self._request_http(AppMeshClient._Method.DELETE, path=f"/appmesh/label/{label_name}")
[docs]
def list_labels(self) -> Dict[str, str]:
"""Get information about all labels."""
resp = self._request_http(AppMeshClient._Method.GET, path="/appmesh/labels")
return resp.json()
########################################
# Prometheus metrics
########################################
[docs]
def get_metrics(self) -> str:
"""Get Prometheus metrics."""
resp = self._request_http(AppMeshClient._Method.GET, path="/appmesh/metrics")
return resp.text
########################################
# File management
########################################
@staticmethod
def _apply_file_attributes(local_path: Path, headers: CaseInsensitiveDict) -> None:
"""
Apply file attributes from headers to local file.
Expected headers: X-File-Mode (decimal str), X-File-User (username), X-File-Group (groupname).
"""
if sys.platform == "win32":
return
headers = CaseInsensitiveDict(headers or {})
# Ownership by name - apply FIRST, as chown clears setuid/setgid bits
user_header = headers.get("X-File-User")
group_header = headers.get("X-File-Group")
if user_header is not None and group_header is not None:
chown = getattr(os, "chown", None)
if callable(chown):
try:
import pwd
import grp
# Try to resolve as username/groupname first
try:
uid = pwd.getpwnam(user_header).pw_uid
except KeyError:
# Fall back to numeric UID if name lookup fails
try:
uid = int(user_header)
except ValueError:
uid = None
try:
gid = grp.getgrnam(group_header).gr_gid
except KeyError:
# Fall back to numeric GID if name lookup fails
try:
gid = int(group_header)
except ValueError:
gid = None
if uid is not None and gid is not None:
with suppress(OSError):
chown(str(local_path), uid, gid)
else:
logger.warning("Could not resolve X-File-User/Group: %s/%s", user_header, group_header)
except (ValueError, KeyError) as e:
logger.warning("Invalid X-File-User/Group values: %s/%s - %s", user_header, group_header, e)
# Mode - apply AFTER chown to preserve permission bits
if "X-File-Mode" in headers:
try:
file_mode = int(headers["X-File-Mode"])
# Validate mode is within valid range (0-511 for 0o777)
if 0 <= file_mode <= 0o777:
with suppress(OSError):
local_path.chmod(file_mode)
else:
logger.warning("X-File-Mode value out of range: %s", file_mode)
except ValueError:
logger.warning("Invalid X-File-Mode value: %s", headers.get("X-File-Mode"))
@staticmethod
def _get_file_attributes(local_path: Path) -> dict:
"""Get file attributes as header dictionary."""
if sys.platform == "win32":
return {}
try:
import pwd
import grp
st = local_path.stat()
result = {
"X-File-Mode": str(st.st_mode & 0o777), # Mask to keep only permission bits
}
# Get username/groupname for portability
try:
result["X-File-User"] = pwd.getpwuid(st.st_uid).pw_name
except KeyError:
# User not found, fall back to UID
result["X-File-User"] = str(st.st_uid)
try:
result["X-File-Group"] = grp.getgrgid(st.st_gid).gr_name
except KeyError:
# Group not found, fall back to GID
result["X-File-Group"] = str(st.st_gid)
return result
except OSError:
return {}
[docs]
def download_file(self, remote_file: str, local_file: str, preserve_permissions: bool = True) -> None:
"""Download a remote file to the local filesystem.
When ``preserve_permissions`` is ``True``, POSIX mode/owner/group metadata from App Mesh
response headers is applied best-effort on non-Windows platforms.
"""
resp = self._request_http(
AppMeshClient._Method.GET,
path="/appmesh/file/download",
header={self._HTTP_HEADER_KEY_X_FILE_PATH: remote_file},
)
# Write the file content locally
local_path = Path(local_file)
with local_path.open("wb") as fp:
for chunk in resp.iter_content(chunk_size=8 * 1024):
if chunk:
fp.write(chunk)
if preserve_permissions:
self._apply_file_attributes(local_path, resp.headers)
[docs]
def upload_file(self, local_file: str, remote_file: str, preserve_permissions: bool = True) -> None:
"""Upload a local file to the remote server.
When ``preserve_permissions`` is ``True``, the client also sends local POSIX metadata
in request headers so the server can recreate permissions/ownership when supported.
"""
local_path = Path(local_file)
if not local_path.exists():
raise FileNotFoundError(f"Local file not found: {local_file}")
from requests_toolbelt import MultipartEncoder
with local_path.open("rb") as fp:
encoder = MultipartEncoder(
fields={"filename": os.path.basename(remote_file), "file": ("filename", fp, "application/octet-stream")}
)
header = {self._HTTP_HEADER_KEY_X_FILE_PATH: parse.quote(remote_file), "Content-Type": encoder.content_type}
if preserve_permissions:
header.update(self._get_file_attributes(local_path))
# Upload file with or without attributes
# https://stackoverflow.com/questions/22567306/python-requests-file-upload
self._request_http(
AppMeshClient._Method.POST_STREAM, path="/appmesh/file/upload", header=header, body=encoder
)
########################################
# Application run
########################################
@staticmethod
def _parse_duration(timeout: Union[int, str]) -> int:
"""Parse duration from int or ISO 8601 string."""
if isinstance(timeout, int):
return timeout
if isinstance(timeout, str):
return int(aniso8601.parse_duration(timeout).total_seconds())
raise TypeError(f"Invalid timeout type: {timeout}")
[docs]
def run_task(self, app_name: str, data: str, timeout: int = 300) -> str:
"""Client send an invocation message to a running App Mesh application and wait for result.
This method posts the provided `data` to the App Mesh service which will
forward it to the specified running application instance.
Args:
app_name: Name of the target application (as registered in App Mesh).
data: Payload to deliver to the application. Typically a string.
timeout: Maximum time in seconds to wait for a response from the application. Defaults to 300 seconds.
Returns:
str: The HTTP response body returned by the remote application/service.
"""
resp = self._request_http(
AppMeshClient._Method.POST,
path=f"/appmesh/app/{app_name}/task",
body=data,
query={"timeout": str(timeout)},
)
return resp.text
[docs]
def cancel_task(self, app_name: str) -> bool:
"""Cancel a running task for an App Mesh application.
Args:
app_name: Name of the target application (as registered in App Mesh).
Returns:
bool: Task exist and cancelled status.
"""
resp = self._request_http(
AppMeshClient._Method.DELETE, path=f"/appmesh/app/{app_name}/task", raise_on_fail=False
)
return resp.status_code == HTTPStatus.OK
[docs]
def run_app_async(
self,
app: Union[App, str],
max_time: Union[int, str] = _DURATION_TWO_DAYS_ISO,
lifecycle: Union[int, str] = _DURATION_TWO_DAYS_HALF_ISO,
) -> AppRun:
"""Run an application asynchronously on a remote system without blocking the API.
Args:
app: An `App` instance or a shell command string.
- If `app` is a string, it is treated as a shell command for the remote run,
and an `App` instance is created as:
`App({"command": "<command_string>", "shell": True})`.
- If `app` is an `App` object, providing only the `name` attribute (without
a command) will run an existing application; otherwise, it is treated as a new application.
max_time: Maximum runtime for the remote process.
Accepts integer seconds or ISO 8601 duration format (e.g., 'P1Y2M3DT4H5M6S', 'P5W'). Defaults to `P2D`.
lifecycle: Maximum lifecycle time for the remote process.
Accepts integer seconds or ISO 8601 duration format. Defaults to `P2DT12H`.
Returns:
``AppRun`` handle that captures the current ``forward_to`` target so later polling can
continue against the same cluster node.
"""
if isinstance(app, str):
app = App({"command": app, "shell": True})
resp = self._request_http(
AppMeshClient._Method.POST,
body=app.to_dict(),
path="/appmesh/app/run",
query={
"timeout": str(self._parse_duration(max_time)),
"lifecycle": str(self._parse_duration(lifecycle)),
},
)
response_data = resp.json()
return AppRun(self, response_data["name"], response_data["process_uuid"])
[docs]
def wait_for_async_run(self, run: AppRun, print_stdout: bool = True, timeout: int = 0) -> Optional[int]:
"""Wait for an asynchronous run to finish.
Args:
run: asyncrized run result from run_async().
print_stdout: print remote stdout to local or not.
timeout : wait max timeout seconds and return if not finished, 0 means wait until finished
Returns:
Exit code if the process finished, otherwise ``None`` on timeout or polling failure.
On success, this method also makes a best-effort attempt to delete the temporary run app.
"""
if not run:
return None
last_output_position = 0
start = datetime.now()
interval = self._POLL_INTERVAL
while run.proc_uid:
app_out = self.get_app_output(
app_name=run.app_name,
stdout_position=last_output_position,
stdout_index=0,
process_uuid=run.proc_uid,
timeout=interval,
)
if app_out.output and print_stdout:
print(app_out.output, end="", flush=True)
if app_out.out_position is not None:
last_output_position = app_out.out_position
if app_out.exit_code is not None:
# success
with suppress(Exception):
self.delete_app(run.app_name)
return app_out.exit_code
if app_out.status_code != HTTPStatus.OK:
# failed
break
if timeout > 0 and (datetime.now() - start).seconds > timeout:
# timeout
break
return None
[docs]
def run_app_sync(
self,
app: Union[App, str],
max_time: Union[int, str] = _DURATION_TWO_DAYS_ISO,
lifecycle: Union[int, str] = _DURATION_TWO_DAYS_HALF_ISO,
) -> Tuple[Union[int, None], str]:
"""Synchronously run an application remotely, blocking until completion, and return the result.
If 'app' is a string, it is treated as a shell command and converted to an App instance.
If 'app' is App object, the name attribute is used to run an existing application if specified.
Args:
app: An App instance or a shell command string.
If a string, an App instance is created as:
`appmesh.App({"command": "<command_string>", "shell": True})`
max_time: Maximum runtime for the remote process.
Accepts integer seconds or ISO 8601 duration format (e.g., 'P1Y2M3DT4H5M6S', 'P5W').
lifecycle: Maximum lifecycle time for the remote process.
Accepts integer seconds or ISO 8601 duration format.
Returns:
``(exit_code, stdout_text)``. ``exit_code`` is ``None`` when the server did not return
an ``X-Exit-Code`` header.
"""
if isinstance(app, str):
app = App({"command": app, "shell": True})
resp = self._request_http(
AppMeshClient._Method.POST,
body=app.to_dict(),
path="/appmesh/app/syncrun",
query={
"timeout": str(self._parse_duration(max_time)),
"lifecycle": str(self._parse_duration(lifecycle)),
},
raise_on_fail=False,
)
exit_code = None
if resp.status_code == HTTPStatus.OK:
if "X-Exit-Code" in resp.headers:
exit_code = int(resp.headers["X-Exit-Code"])
return exit_code, resp.text
def _request_http(
self,
method: _Method,
path: str,
query: Optional[dict] = None,
header: Optional[dict] = None,
body=None,
raise_on_fail: bool = True,
) -> requests.Response:
"""Make an HTTP request."""
url = parse.urljoin(self.base_url, path)
# Prepare headers
headers = header.copy() if header else {}
has_explicit_auth = self._HTTP_HEADER_KEY_AUTH in headers
has_explicit_csrf = self._HTTP_HEADER_NAME_CSRF_TOKEN in headers
if not has_explicit_auth and not has_explicit_csrf:
csrf_token = self._get_cookie_value(self.session.cookies, self._COOKIE_CSRF_TOKEN)
if csrf_token:
# appmesh token
headers[self._HTTP_HEADER_NAME_CSRF_TOKEN] = csrf_token
else:
# OAuth token
access_token = self._get_access_token()
if access_token:
headers[self._HTTP_HEADER_KEY_AUTH] = f"Bearer {access_token}"
if self.forward_to:
target_host = self.forward_to
if ":" not in target_host:
parsed = parse.urlsplit(self.base_url)
default_port = {"http": 80, "https": 443}.get(parsed.scheme)
port = parsed.port or default_port
target_host = f"{target_host}:{port}"
headers[self._HTTP_HEADER_KEY_X_TARGET_HOST] = target_host
headers[self._HTTP_HEADER_KEY_USER_AGENT] = self._HTTP_USER_AGENT
# Convert body to JSON string if it's a dict or list
if isinstance(body, (dict, list)):
body = json.dumps(body)
headers.setdefault("Content-Type", "application/json")
try:
# Snapshot token before request for change detection
old_token = self._get_access_token()
request_kwargs = {
"url": url,
"headers": headers,
"cert": self.ssl_client_cert,
"verify": self.ssl_verify,
"timeout": self.request_timeout,
}
if method == AppMeshClient._Method.GET:
resp = self.session.get(params=query, **request_kwargs)
elif method == AppMeshClient._Method.POST:
resp = self.session.post(params=query, data=body, **request_kwargs)
elif method == AppMeshClient._Method.POST_STREAM:
resp = self.session.post(params=query, data=body, stream=True, **request_kwargs)
elif method == AppMeshClient._Method.DELETE:
resp = self.session.delete(**request_kwargs)
elif method == AppMeshClient._Method.PUT:
resp = self.session.put(params=query, data=body, **request_kwargs)
else:
raise AppMeshRequestError(f"Invalid http method: {method}")
if raise_on_fail and resp.status_code != HTTPStatus.PRECONDITION_REQUIRED:
if resp.status_code in (HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN):
raise AppMeshAuthError(f"HTTP {resp.status_code}: {resp.reason}")
resp.raise_for_status()
# Auto-detect token changes from server Set-Cookie responses
new_token = self._get_access_token()
if new_token != old_token:
self._on_token_changed(new_token)
# Wrap the response for encoding handling
return AppMeshClient._EncodingResponse(resp)
except requests.exceptions.RequestException as e:
raise AppMeshRequestError(f"HTTP request failed: {e}") from e