# app.py
"""Application definition"""
import json
import copy
from datetime import datetime
from typing import Optional, Any, Dict
from enum import Enum, unique
def _get_str(data: Optional[dict], key: str) -> Optional[str]:
"""Retrieve a string value from a dictionary by key, if it exists and is a valid string."""
if not data or key not in data:
return None
value = data[key]
return value if value and isinstance(value, str) else None
def _get_int(data: Optional[dict], key: str) -> Optional[int]:
"""Retrieve an integer value from a dictionary by key, if it exists and is a valid integer."""
if not data or key not in data or data[key] is None:
return None
value = data[key]
if isinstance(value, int):
return value
if isinstance(value, str) and value.isdigit():
return int(value)
return None
def _get_number(data: Optional[dict], key: str):
"""Retrieve a numeric value (int or float) from a dictionary by key."""
if not data or key not in data or data[key] is None:
return None
value = data[key]
if isinstance(value, (int, float)):
return value
return None
def _get_bool(data: Optional[dict], key: str) -> Optional[bool]:
"""Retrieve a boolean value from a dictionary by key, if it exists and is boolean-like."""
if not data or key not in data or data[key] is None:
return None
return bool(data[key])
def _get_item(data: Optional[dict], key: str) -> Optional[Any]:
"""Retrieve a deep copy of a value from a dictionary by key, if it exists."""
if not data or key not in data or data[key] is None:
return None
return copy.deepcopy(data[key])
[docs]
class App:
"""
An application in App Mesh, include all the process attributes,
resource limitations, behaviors, and permissions.
"""
[docs]
@unique
class Permission(Enum):
"""Application permission levels."""
DENY = "1"
READ = "2"
WRITE = "3"
[docs]
class Behavior:
"""
Application error handling behavior, including exit and control behaviors.
"""
[docs]
@unique
class Action(Enum):
"""Actions for application exit behaviors."""
RESTART = "restart"
STANDBY = "standby"
KEEPALIVE = "keepalive"
REMOVE = "remove"
def __init__(self, data: Optional[dict] = None) -> None:
if isinstance(data, (str, bytes, bytearray)):
data = json.loads(data)
self.exit = _get_str(data, "exit")
"""Default exit behavior, options: 'restart', 'standby', 'keepalive', 'remove'."""
self.control = _get_item(data, "control") or {}
"""Exit code specific behavior (e.g, --control 0:restart --control 1:standby), higher priority than default exit behavior"""
[docs]
def set_exit_behavior(self, action: "App.Behavior.Action") -> None:
"""Set default behavior for application exit."""
self.exit = action.value
[docs]
def set_control_behavior(self, control_code: int, action: "App.Behavior.Action") -> None:
"""Define behavior for specific exit codes."""
self.control[str(control_code)] = action.value
[docs]
class DailyLimitation:
"""
Application availability within a daily time range.
"""
def __init__(self, data: Optional[dict] = None) -> None:
if isinstance(data, (str, bytes, bytearray)):
data = json.loads(data)
self.daily_start = _get_int(data, "daily_start")
"""Start time for application availability (e.g., 09:00:00+08)."""
self.daily_end = _get_int(data, "daily_end")
"""End time for application availability (e.g., 09:00:00+08)."""
[docs]
def set_daily_range(self, start: datetime, end: datetime) -> None:
"""Set the valid daily start and end times."""
self.daily_start = int(start.timestamp())
self.daily_end = int(end.timestamp())
[docs]
class ResourceLimitation:
"""
Application resource limits, such as CPU and memory usage.
"""
def __init__(self, data: Optional[dict] = None) -> None:
if isinstance(data, (str, bytes, bytearray)):
data = json.loads(data)
self.cpu_shares = _get_int(data, "cpu_shares")
"""CPU shares, relative weight of CPU usage."""
self.memory_mb = _get_int(data, "memory_mb")
"""Physical memory limit in MB."""
self.memory_virt_mb = _get_int(data, "memory_virt_mb")
"""Virtual memory limit in MB."""
def __init__(self, data: Optional[dict] = None) -> None:
"""Initialize an App instance with optional configuration data."""
if isinstance(data, (str, bytes, bytearray)):
data = json.loads(data)
# Application configuration
self.name = _get_str(data, "name")
"""app name (unique)"""
self.command = _get_str(data, "command")
"""full command line with arguments"""
self.shell = _get_bool(data, "shell")
"""Whether run command in shell mode (enables shell syntax such as pipes and compound commands)"""
self.session_login = _get_bool(data, "session_login")
"""Whether to run the app in session login mode (inheriting the user's full login environment)"""
self.description = _get_str(data, "description")
"""app description string"""
self.metadata = _get_item(data, "metadata")
"""metadata string/JSON (input for app, pass to process stdin)"""
self.working_dir = _get_str(data, "working_dir")
"""working directory"""
self.status = _get_int(data, "status")
"""app status: 1 for enabled, 0 for disabled"""
self.docker_image = _get_str(data, "docker_image")
"""Docker image for containerized execution"""
self.stdout_cache_num = _get_int(data, "stdout_cache_num")
"""maximum number of stdout log files to retain"""
self.start_time = _get_int(data, "start_time")
"""start date time for app (ISO8601 time format, e.g., '2020-10-11T09:22:05')"""
self.end_time = _get_int(data, "end_time")
"""end date time for app (ISO8601 time format, e.g., '2020-10-11T10:22:05')"""
self.start_interval_seconds = _get_item(data, "start_interval_seconds")
"""start interval seconds for short running app, support integer seconds, ISO 8601 durations and cron expression (e.g., 30, 'P1Y2M3DT4H5M6S', 'P5W', '* */5 * * * *')"""
self.cron = _get_bool(data, "cron")
"""Whether the interval is specified as a cron expression"""
self.daily_limitation = App.DailyLimitation(_get_item(data, "daily_limitation"))
self.retention = _get_str(data, "retention")
"""extra timeout seconds for stopping current process, support ISO 8601 durations (e.g., 'P1Y2M3DT4H5M6S' 'P5W')."""
self.health_check_cmd = _get_str(data, "health_check_cmd")
"""health check script command (e.g., sh -x 'curl host:port/health', return 0 is health)"""
self.permission = _get_int(data, "permission")
"""app user permission, value is 2 bit integer: [group & other], each bit can be deny:1, read:2, write: 3."""
self.behavior = App.Behavior(_get_item(data, "behavior"))
self.env = data.get("env", {}) if data else {}
"""environment variables (e.g., -e env1=value1 -e env2=value2, APP_DOCKER_OPTS is used to input docker run parameters)"""
self.sec_env = data.get("sec_env", {}) if data else {}
"""security environment variables, encrypt in server side with app owner's cipher"""
self.pid = _get_int(data, "pid")
"""process id used to attach to the running process"""
self.resource_limit = App.ResourceLimitation(_get_item(data, "resource_limit"))
# Read-only attributes
self.register_time = _get_int(data, "register_time")
"""app register time"""
self.starts = _get_int(data, "starts")
"""number of times started"""
self.owner = _get_str(data, "owner")
"""owner name of app mesh user who created the app"""
self.user = _get_str(data, "pid_user")
"""process OS user name"""
self.pstree = _get_str(data, "pstree")
"""process tree"""
self.container_id = _get_str(data, "container_id")
"""docker container id"""
self.memory = _get_int(data, "memory")
"""memory usage"""
self.cpu = _get_number(data, "cpu")
"""cpu usage"""
self.fd = _get_int(data, "fd")
"""file descriptor usage"""
self.stdout_cache_size = _get_int(data, "stdout_cache_size")
"""number of stdout log files currently retained"""
self.last_start_time = _get_int(data, "last_start_time")
"""last start time"""
self.last_exit_time = _get_int(data, "last_exit_time")
"""last exit time"""
self.last_error = _get_str(data, "last_error")
"""last error message"""
self.next_start_time = _get_int(data, "next_start_time")
"""next start time"""
self.health = _get_int(data, "health")
"""health status: 0 for healthy, 1 for unhealthy"""
self.version = _get_int(data, "version")
"""app version"""
self.return_code = _get_int(data, "return_code")
"""last process exit code"""
self.task_id = _get_int(data, "task_id")
"""current task id"""
self.task_status = _get_str(data, "task_status")
"""task status"""
self.subscription_id = _get_str(data, "subscription_id")
"""subscription id returned by the daemon when add_app is called atomically with subscribe_events on a TCP/WSS client; empty for HTTP or when no subscribe_events was supplied"""
[docs]
def set_valid_time(self, start: Optional[datetime], end: Optional[datetime]) -> None:
"""Define the valid time window for the application."""
self.start_time = int(start.timestamp()) if start else None
self.end_time = int(end.timestamp()) if end else None
[docs]
def set_env(self, key: str, value: str, secure: bool = False) -> None:
"""Set an environment variable, marking it secure if specified."""
target = self.sec_env if secure else self.env
target[key] = value
[docs]
def set_permission(self, group_user: Permission, others_user: Permission) -> None:
"""Define application permissions based on user roles."""
self.permission = int(others_user.value) * 10 + int(group_user.value)
def __str__(self) -> str:
"""Return a JSON string representation of the application."""
return json.dumps(self.to_dict())
[docs]
def to_dict(self) -> Dict[str, Any]:
"""Convert the application data into a JSON-compatible dictionary, removing empty items."""
output = copy.deepcopy(self.__dict__)
output["behavior"] = self.behavior.__dict__
output["daily_limitation"] = self.daily_limitation.__dict__
output["resource_limit"] = self.resource_limit.__dict__
self._clean_empty(output)
return output
@staticmethod
def _clean_empty(data: dict) -> None:
"""Recursively remove None, empty string, and empty dict values from nested dictionaries (except 'metadata')."""
keys_to_delete = []
for key, value in data.items():
if isinstance(value, dict) and key != "metadata":
App._clean_empty(value)
if not value:
keys_to_delete.append(key)
elif value in (None, "", {}):
keys_to_delete.append(key)
for key in keys_to_delete:
del data[key]