Source code for appmesh.wss_transport

# wss_transport.py
"""WebSocket Secure (WSS) Transport layer handling WebSocket connections."""
__all__ = ["WSSTransport"]

import ssl
import socket
from pathlib import Path
from typing import Optional, Union, Tuple

from .exceptions import AppMeshConnectionError, AppMeshTimeoutError

try:
    from websocket import create_connection, WebSocketException, WebSocketTimeoutException
except ImportError as exc:
    raise ImportError(
        "websocket-client library is required for WSS support. Install it with: pip install websocket-client"
    ) from exc


[docs] class WSSTransport: """WebSocket Secure (WSS) Transport layer with TLS support using synchronous websocket-client library.""" # Maximum message size: 100 MB WSS_MAX_BLOCK_SIZE = 100 * 1024 * 1024 # Default connect timeout in seconds WSS_CONNECT_TIMEOUT = 30 # Default message timeout in seconds WSS_MESSAGE_TIMEOUT = 60 def __init__( self, address: Tuple[str, int], ssl_verify: Union[bool, str], ssl_client_cert: Optional[Union[str, Tuple[str, str]]] = None, ): """ Initialize WebSocket Secure (WSS) transport with TLS configuration. Args: address: Server address as (host, port) tuple. ssl_verify: SSL server verification mode: - True: Use system CA certificates - False: Disable verification (insecure) - str: Path to custom CA bundle or directory ssl_client_cert: SSL client certificate: - str: Path to PEM file with cert and key - tuple: (cert_path, key_path) Note: This implementation uses synchronous blocking sockets for WebSocket connections. No threading or asyncio is involved for simplicity and reliability. """ self.address = address self.ssl_verify = ssl_verify self.ssl_client_cert = ssl_client_cert self._websocket = None self._connect_timeout = self.WSS_CONNECT_TIMEOUT self._message_timeout = self.WSS_MESSAGE_TIMEOUT def __enter__(self): """Context manager entry.""" if not self.connected(): self.connect() return self def __exit__(self, exc_type, exc_val, exc_tb): """Context manager exit.""" self.close() def __del__(self): try: self.close() except Exception: # pylint: disable=broad-exception-caught pass # suppress all exceptions def __str__(self) -> str: """Return WSS URI representation.""" host, port = self.address scheme = "wss" return f"{scheme}://{host}:{port}"
[docs] def connect(self) -> None: """Establish WSS connection to server.""" if self.connected(): return uri = f"{self}/" sslopt: dict = {} # Prepare SSL options for websocket-client library if self.ssl_verify is not False: sslopt["cert_reqs"] = ssl.CERT_REQUIRED if isinstance(self.ssl_verify, str): # Use custom CA bundle or directory path = Path(self.ssl_verify) if path.is_file(): sslopt["ca_certs"] = str(path) elif path.is_dir(): sslopt["ca_certs"] = str(path) else: raise ValueError(f"ssl_verify path '{self.ssl_verify}' is invalid") else: # Disable verification (insecure) sslopt["cert_reqs"] = ssl.CERT_NONE sslopt["check_hostname"] = False # Add client certificate if provided if self.ssl_client_cert: if isinstance(self.ssl_client_cert, tuple): sslopt["certfile"] = self.ssl_client_cert[0] sslopt["keyfile"] = self.ssl_client_cert[1] else: sslopt["certfile"] = self.ssl_client_cert try: # Create WebSocket connection using websocket-client library self._websocket = create_connection( uri, timeout=self._connect_timeout, subprotocols=["appmesh-ws"], sslopt=sslopt if sslopt else None, ) # Set receive timeout for blocking recv calls self._websocket.settimeout(self._message_timeout) except (socket.timeout, socket.error, ssl.SSLError) as e: self._websocket = None raise AppMeshConnectionError(f"Failed to connect to {self.address}: {e}") from e
[docs] def close(self) -> None: """Close WebSocket connection.""" if self._websocket: try: self._websocket.close() except Exception: # pylint: disable=broad-exception-caught pass finally: self._websocket = None
[docs] def connected(self) -> bool: """Check if WebSocket is connected.""" if self._websocket is None: return False try: # For websocket-client library, check the connected property if hasattr(self._websocket, "connected"): return self._websocket.connected # Fallback: if property missing, assume disconnected to trigger reconnection return False except Exception: # pylint: disable=broad-exception-caught return False
[docs] def send_message(self, data: Union[bytes, bytearray, list]) -> None: """ Send a message over WebSocket. Args: data: Message data to send, or empty list for EOF signal. Note: WebSocket handles message framing automatically, so we don't need to add a length header. Just send msgpack-serialized data directly. """ if not self._websocket: raise AppMeshConnectionError("Cannot send message: not connected") try: # Convert empty list to empty bytes for EOF signal message_data = bytes(data) if data else b"" self._websocket.send_binary(message_data) except socket.timeout as e: self.close() raise AppMeshConnectionError(f"Message send timeout after {self._message_timeout}s") from e except (ConnectionError, WebSocketException) as e: self.close() raise AppMeshConnectionError(f"WebSocket connection closed: {e}") from e except Exception as e: # pylint: disable=broad-exception-caught self.close() raise AppMeshConnectionError(f"Error sending message: {e}") from e
[docs] def receive_message(self) -> Optional[bytearray]: """ Receive one application message from the WebSocket. Uses the high-level ``recv()`` API so that control frames (PING/PONG/CLOSE) are handled inside websocket-client — in particular, PING is auto-replied with PONG, which is what keeps long-idle subscribe connections alive against server-side ``idleTimeout``. Returns the data as bytearray, or an empty bytearray for EOF / non-data frames. """ if not self._websocket: raise AppMeshConnectionError("Cannot receive message: not connected") try: data = self._websocket.recv() if not data: return bytearray() if isinstance(data, str): return bytearray(data.encode("utf-8")) if isinstance(data, (bytes, bytearray)): return bytearray(data) raise TypeError(f"Unexpected data type from WebSocket: {type(data)}") except socket.timeout as e: # Idle timeout — connection is still healthy; do NOT close, let caller retry raise AppMeshTimeoutError(f"Message receive timeout after {self._message_timeout}s") from e except WebSocketTimeoutException as e: raise AppMeshTimeoutError(f"WebSocket timeout: {e}") from e except (ConnectionError, WebSocketException) as e: self.close() raise AppMeshConnectionError(f"WebSocket connection closed: {e}") from e except Exception as e: # pylint: disable=broad-exception-caught self.close() raise AppMeshConnectionError(f"Error receiving message: {e}") from e