From 56623f9804a43a9745680566e106c9ab989b23af Mon Sep 17 00:00:00 2001 From: sh4un <97253929+sh4un-dot-com@users.noreply.github.com> Date: Thu, 22 Jan 2026 21:40:34 -0500 Subject: [PATCH] Update CHANGELOG, enhance configuration loading with default values, and add tests for missing defaults --- ammb/__init__.py | 2 + ammb/api.py | 70 ++-- ammb/bridge.py | 137 ++++--- ammb/config_handler.py | 263 ++++++++----- ammb/health.py | 59 ++- ammb/meshcore_handler.py | 366 +++++++++++++------ ammb/meshtastic_handler.py | 304 ++++++++++----- ammb/message_logger.py | 55 ++- ammb/metrics.py | 39 +- ammb/mqtt_handler.py | 291 +++++++++++---- ammb/protocol.py | 118 ++++-- ammb/rate_limiter.py | 28 +- ammb/utils.py | 23 +- ammb/validator.py | 138 ++++--- examples/meshcore_simulator.py | 55 ++- run_bridge.py | 63 ++-- tests/__init__.py | 3 +- tests/conftest.py | 23 +- tests/test_config_handler.py | 17 +- tests/test_config_handler_missing_default.py | 6 +- tests/test_protocol.py | 61 +++- tmp_run_load.py | 11 +- tmp_test_config.py | 11 +- 23 files changed, 1469 insertions(+), 674 deletions(-) diff --git a/ammb/__init__.py b/ammb/__init__.py index e7d0b90..83a9c0d 100644 --- a/ammb/__init__.py +++ b/ammb/__init__.py @@ -5,4 +5,6 @@ Akita Meshtastic Meshcore Bridge (AMMB) Package. from .bridge import Bridge +__all__ = ["Bridge", "__version__"] + __version__ = "0.2.1" diff --git a/ammb/api.py b/ammb/api.py index df18599..6d289f6 100644 --- a/ammb/api.py +++ b/ammb/api.py @@ -3,15 +3,15 @@ REST API for monitoring and controlling the bridge. """ +import json import logging import threading -from http.server import HTTPServer, BaseHTTPRequestHandler -import json +from http.server import BaseHTTPRequestHandler, HTTPServer from typing import Optional -from urllib.parse import urlparse, parse_qs +from urllib.parse import urlparse -from .metrics import get_metrics from .health import get_health_monitor +from .metrics import get_metrics class BridgeAPIHandler(BaseHTTPRequestHandler): @@ -29,16 +29,16 @@ class BridgeAPIHandler(BaseHTTPRequestHandler): def do_GET(self): """Handle GET requests.""" parsed_path = urlparse(self.path) - path = parsed_path.path.rstrip('/') + path = parsed_path.path.rstrip("/") try: - if path == '/api/health': + if path == "/api/health": self._handle_health() - elif path == '/api/metrics': + elif path == "/api/metrics": self._handle_metrics() - elif path == '/api/status': + elif path == "/api/status": self._handle_status() - elif path == '/api/info': + elif path == "/api/info": self._handle_info() else: self._send_response(404, {"error": "Not found"}) @@ -50,10 +50,10 @@ class BridgeAPIHandler(BaseHTTPRequestHandler): def do_POST(self): """Handle POST requests.""" parsed_path = urlparse(self.path) - path = parsed_path.path.rstrip('/') + path = parsed_path.path.rstrip("/") try: - if path == '/api/control': + if path == "/api/control": self._handle_control() else: self._send_response(404, {"error": "Not found"}) @@ -90,45 +90,54 @@ class BridgeAPIHandler(BaseHTTPRequestHandler): info = { "name": "Akita Meshtastic Meshcore Bridge", "version": "1.0.0", - "external_transport": self.bridge.config.external_transport if self.bridge.config else "unknown", + "external_transport": ( + self.bridge.config.external_transport + if self.bridge.config + else "unknown" + ), "meshtastic_connected": ( self.bridge.meshtastic_handler._is_connected.is_set() - if self.bridge.meshtastic_handler else False + if self.bridge.meshtastic_handler + else False ), "external_connected": ( self.bridge.external_handler._is_connected.is_set() - if hasattr(self.bridge.external_handler, '_is_connected') and self.bridge.external_handler else False + if hasattr(self.bridge.external_handler, "_is_connected") + and self.bridge.external_handler + else False ), } self._send_response(200, info) def _handle_control(self): """Handle control requests.""" - content_length = int(self.headers.get('Content-Length', 0)) + content_length = int(self.headers.get("Content-Length", 0)) if content_length == 0: self._send_response(400, {"error": "No request body"}) return body = self.rfile.read(content_length) try: - data = json.loads(body.decode('utf-8')) - action = data.get('action') + data = json.loads(body.decode("utf-8")) + action = data.get("action") - if action == 'reset_metrics': + if action == "reset_metrics": metrics = get_metrics() metrics.reset() self._send_response(200, {"message": "Metrics reset"}) else: - self._send_response(400, {"error": f"Unknown action: {action}"}) + self._send_response( + 400, {"error": f"Unknown action: {action}"} + ) except json.JSONDecodeError: self._send_response(400, {"error": "Invalid JSON"}) def _send_response(self, status_code: int, data: dict): """Send JSON response.""" - response = json.dumps(data, indent=2).encode('utf-8') + response = json.dumps(data, indent=2).encode("utf-8") self.send_response(status_code) - self.send_header('Content-Type', 'application/json') - self.send_header('Content-Length', str(len(response))) + self.send_header("Content-Type", "application/json") + self.send_header("Content-Length", str(len(response))) self.end_headers() self.wfile.write(response) @@ -136,7 +145,9 @@ class BridgeAPIHandler(BaseHTTPRequestHandler): class BridgeAPIServer: """REST API server for the bridge.""" - def __init__(self, bridge_instance, host: str = '127.0.0.1', port: int = 8080): + def __init__( + self, bridge_instance, host: str = "127.0.0.1", port: int = 8080 + ): self.logger = logging.getLogger(__name__) self.bridge = bridge_instance self.host = host @@ -154,11 +165,17 @@ class BridgeAPIServer: try: self.server = HTTPServer((self.host, self.port), handler_factory) - self.server_thread = threading.Thread(target=self._serve, daemon=True, name="BridgeAPI") + self.server_thread = threading.Thread( + target=self._serve, daemon=True, name="BridgeAPI" + ) self.server_thread.start() - self.logger.info(f"Bridge API server started on http://{self.host}:{self.port}") + self.logger.info( + f"Bridge API server started on http://{self.host}:{self.port}" + ) except Exception as e: - self.logger.error(f"Failed to start API server: {e}", exc_info=True) + self.logger.error( + f"Failed to start API server: {e}", exc_info=True + ) def stop(self): """Stop the API server.""" @@ -174,4 +191,3 @@ class BridgeAPIServer: """Run the server.""" if self.server: self.server.serve_forever() - diff --git a/ammb/bridge.py b/ammb/bridge.py index 40a13db..eb32e0e 100644 --- a/ammb/bridge.py +++ b/ammb/bridge.py @@ -5,105 +5,141 @@ Main Bridge orchestrator class. import logging import threading -from queue import Queue import time -from typing import Union, Optional +from queue import Queue +from typing import Optional, Union -from .config_handler import BridgeConfig -from .meshtastic_handler import MeshtasticHandler -from .meshcore_handler import MeshcoreHandler -from .mqtt_handler import MQTTHandler -from .metrics import get_metrics -from .health import get_health_monitor, HealthStatus from .api import BridgeAPIServer +from .config_handler import BridgeConfig +from .health import HealthStatus, get_health_monitor +from .meshcore_handler import MeshcoreHandler +from .meshtastic_handler import MeshtasticHandler +from .metrics import get_metrics +from .mqtt_handler import MQTTHandler ExternalHandler = Union[MeshcoreHandler, MQTTHandler] + class Bridge: """Orchestrates the Meshtastic-External Network bridge operation.""" + # Attribute annotations for mypy + to_meshtastic_queue: Queue + to_external_queue: Queue + meshtastic_handler: Optional[MeshtasticHandler] + external_handler: Optional[ExternalHandler] + handlers: list[object] + api_server: Optional[BridgeAPIServer] + def __init__(self, config: BridgeConfig): self.logger = logging.getLogger(__name__) self.config = config - self.shutdown_event = threading.Event() + self.shutdown_event = threading.Event() self.to_meshtastic_queue = Queue(maxsize=config.queue_size) self.to_external_queue = Queue(maxsize=config.queue_size) - self.logger.info(f"Message queues initialized with max size: {config.queue_size}") + self.logger.info( + "Message queues initialized with max size: %s", + config.queue_size, + ) # Initialize metrics and health monitoring self.metrics = get_metrics() self.health_monitor = get_health_monitor() - self.health_monitor.register_component("meshtastic", HealthStatus.UNKNOWN) - self.health_monitor.register_component("external", HealthStatus.UNKNOWN) + self.health_monitor.register_component( + "meshtastic", HealthStatus.UNKNOWN + ) + self.health_monitor.register_component( + "external", HealthStatus.UNKNOWN + ) self.health_monitor.start_monitoring() # Initialize API server if enabled self.api_server: Optional[BridgeAPIServer] = None if config.api_enabled: - self.api_server = BridgeAPIServer(self, host=config.api_host, port=config.api_port) + api_host = config.api_host or "127.0.0.1" + api_port = int(config.api_port or 8080) + self.api_server = BridgeAPIServer( + self, host=api_host, port=api_port + ) self.logger.info("Initializing network handlers...") self.meshtastic_handler: Optional[MeshtasticHandler] = None self.external_handler: Optional[ExternalHandler] = None - self.handlers = [] + self.handlers: list[object] = [] try: self.meshtastic_handler = MeshtasticHandler( config=config, to_external_queue=self.to_external_queue, from_external_queue=self.to_meshtastic_queue, - shutdown_event=self.shutdown_event + shutdown_event=self.shutdown_event, ) self.handlers.append(self.meshtastic_handler) - if config.external_transport == 'serial': + if config.external_transport == "serial": self.logger.info("Selected external transport: Serial") self.external_handler = MeshcoreHandler( config=config, to_meshtastic_queue=self.to_meshtastic_queue, from_meshtastic_queue=self.to_external_queue, - shutdown_event=self.shutdown_event + shutdown_event=self.shutdown_event, ) self.handlers.append(self.external_handler) - elif config.external_transport == 'mqtt': + elif config.external_transport == "mqtt": self.logger.info("Selected external transport: MQTT") self.external_handler = MQTTHandler( config=config, to_meshtastic_queue=self.to_meshtastic_queue, from_meshtastic_queue=self.to_external_queue, - shutdown_event=self.shutdown_event + shutdown_event=self.shutdown_event, ) self.handlers.append(self.external_handler) else: - raise ValueError(f"Invalid external_transport configured: {config.external_transport}") + raise ValueError( + "Invalid external_transport configured: " + f"{config.external_transport}" + ) - self.logger.info(f"All required handlers initialized successfully.") + self.logger.info( + "All required handlers initialized successfully." + ) except (ValueError, Exception) as e: - self.logger.critical(f"Failed to initialize handlers: {e}. Bridge cannot start.", exc_info=True) - self.stop() - self.external_handler = None - + self.logger.critical( + "Failed to initialize handlers: %s. Bridge cannot start.", + e, + exc_info=True, + ) + self.stop() + self.external_handler = None def run(self): self.logger.info("Starting AMMB run sequence...") if not self.meshtastic_handler or not self.external_handler: - self.logger.error("One or more handlers failed to initialize. Bridge cannot run.") - self.stop() - return + self.logger.error( + "One or more handlers failed to initialize. Bridge cannot run." + ) + self.stop() + return self.logger.info("Attempting initial network connections...") if not self.meshtastic_handler.connect(): - self.logger.critical("Failed to connect to Meshtastic device on startup. Bridge cannot start.") + self.logger.critical( + "Failed to connect to Meshtastic device on startup. " + "Bridge cannot start." + ) self.stop() return if not self.external_handler.connect(): handler_type = type(self.external_handler).__name__ - self.logger.warning(f"Failed to initiate connection for {handler_type} initially. Handler will keep trying in background.") - + self.logger.warning( + "Failed to initiate connection for %s initially. " + "Handler will keep trying in background.", + handler_type, + ) self.logger.info("Starting handler background tasks/threads...") try: self.meshtastic_handler.start_sender() @@ -113,29 +149,40 @@ class Bridge: self.external_handler.start_publisher() except Exception as e: - self.logger.critical(f"Failed to start handler background tasks: {e}", exc_info=True) - self.stop() - return + self.logger.critical( + "Failed to start handler background tasks: %s", + e, + exc_info=True, + ) + self.stop() + return # Start API server if enabled if self.api_server: self.api_server.start() - self.logger.info("Bridge background tasks started. Running... (Press Ctrl+C to stop)") + self.logger.info( + "Bridge background tasks started. Running... " + "(Press Ctrl+C to stop)" + ) try: while not self.shutdown_event.is_set(): - time.sleep(1) + time.sleep(1) except Exception as e: - self.logger.critical(f"Unexpected error in main bridge loop: {e}", exc_info=True) + self.logger.critical( + f"Unexpected error in main bridge loop: {e}", exc_info=True + ) finally: - self.logger.info("Main loop exiting. Initiating shutdown sequence...") - self.stop() + self.logger.info( + "Main loop exiting. Initiating shutdown sequence..." + ) + self.stop() def stop(self): if self.shutdown_event.is_set(): - return + return self.logger.info("Signaling shutdown to all components...") self.shutdown_event.set() @@ -149,9 +196,11 @@ class Bridge: self.logger.info(f"Stopping {len(self.handlers)} handlers...") for handler in reversed(self.handlers): - try: - handler.stop() - except Exception as e: - self.logger.error(f"Error stopping handler: {e}", exc_info=True) + try: + handler.stop() + except Exception as e: + self.logger.error( + f"Error stopping handler: {e}", exc_info=True + ) self.logger.info("Bridge shutdown sequence complete.") diff --git a/ammb/config_handler.py b/ammb/config_handler.py index eb6fc9f..ed2cd90 100644 --- a/ammb/config_handler.py +++ b/ammb/config_handler.py @@ -6,15 +6,17 @@ Handles loading, validation, and access for the bridge configuration. import configparser import logging import os -from typing import NamedTuple, Optional, Literal +from typing import Literal, NamedTuple, Optional, cast + class BridgeConfig(NamedTuple): """Stores all configuration settings for the bridge.""" + # Meshtastic Settings meshtastic_port: str # External Network Interface Settings - external_transport: Literal['serial', 'mqtt'] + external_transport: Literal["serial", "mqtt"] # Serial Specific (Optional) serial_port: Optional[str] @@ -37,91 +39,122 @@ class BridgeConfig(NamedTuple): bridge_node_id: str queue_size: int log_level: str - + # API Settings (Optional) api_enabled: Optional[bool] = False - api_host: Optional[str] = '127.0.0.1' + api_host: Optional[str] = "127.0.0.1" api_port: Optional[int] = 8080 - + # MQTT TLS Settings (Optional) mqtt_tls_enabled: Optional[bool] = False mqtt_tls_ca_certs: Optional[str] = None mqtt_tls_insecure: Optional[bool] = False + CONFIG_FILE = "config.ini" DEFAULT_CONFIG = { - 'MESHTASTIC_SERIAL_PORT': '/dev/ttyUSB0', - 'EXTERNAL_TRANSPORT': 'serial', - 'SERIAL_PORT': '/dev/ttyS0', - 'SERIAL_BAUD_RATE': '9600', - 'SERIAL_PROTOCOL': 'json_newline', - 'MQTT_BROKER': 'localhost', - 'MQTT_PORT': '1883', - 'MQTT_TOPIC_IN': 'ammb/to_meshtastic', - 'MQTT_TOPIC_OUT': 'ammb/from_meshtastic', - 'MQTT_USERNAME': '', - 'MQTT_PASSWORD': '', - 'MQTT_CLIENT_ID': 'ammb_bridge_client', - 'MQTT_QOS': '0', - 'MQTT_RETAIN_OUT': 'False', - 'EXTERNAL_NETWORK_ID': 'default_external_net', - 'BRIDGE_NODE_ID': '!ammb_bridge', - 'MESSAGE_QUEUE_SIZE': '100', - 'LOG_LEVEL': 'INFO', - 'API_ENABLED': 'False', - 'API_HOST': '127.0.0.1', - 'API_PORT': '8080', - 'MQTT_TLS_ENABLED': 'False', - 'MQTT_TLS_CA_CERTS': '', - 'MQTT_TLS_INSECURE': 'False', + "MESHTASTIC_SERIAL_PORT": "/dev/ttyUSB0", + "EXTERNAL_TRANSPORT": "serial", + "SERIAL_PORT": "/dev/ttyS0", + "SERIAL_BAUD_RATE": "9600", + "SERIAL_PROTOCOL": "json_newline", + "MQTT_BROKER": "localhost", + "MQTT_PORT": "1883", + "MQTT_TOPIC_IN": "ammb/to_meshtastic", + "MQTT_TOPIC_OUT": "ammb/from_meshtastic", + "MQTT_USERNAME": "", + "MQTT_PASSWORD": "", + "MQTT_CLIENT_ID": "ammb_bridge_client", + "MQTT_QOS": "0", + "MQTT_RETAIN_OUT": "False", + "EXTERNAL_NETWORK_ID": "default_external_net", + "BRIDGE_NODE_ID": "!ammb_bridge", + "MESSAGE_QUEUE_SIZE": "100", + "LOG_LEVEL": "INFO", + "API_ENABLED": "False", + "API_HOST": "127.0.0.1", + "API_PORT": "8080", + "MQTT_TLS_ENABLED": "False", + "MQTT_TLS_CA_CERTS": "", + "MQTT_TLS_INSECURE": "False", } -VALID_LOG_LEVELS = {'CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'} -VALID_SERIAL_PROTOCOLS = {'json_newline', 'raw_serial'} -VALID_TRANSPORTS = {'serial', 'mqtt'} +VALID_LOG_LEVELS = {"CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"} +VALID_SERIAL_PROTOCOLS = {"json_newline", "raw_serial"} +VALID_TRANSPORTS = {"serial", "mqtt"} VALID_MQTT_QOS = {0, 1, 2} + def load_config(config_path: str = CONFIG_FILE) -> Optional[BridgeConfig]: """ Loads and validates configuration from the specified INI file. """ logger = logging.getLogger(__name__) - config = configparser.ConfigParser(defaults=DEFAULT_CONFIG, interpolation=None) + config = configparser.ConfigParser( + defaults=DEFAULT_CONFIG, interpolation=None + ) if not os.path.exists(config_path): - logger.error(f"Configuration file not found: {config_path}") - logger.error("Please copy 'examples/config.ini.example' to 'config.ini' and configure it.") + logger.error("Configuration file not found: %s", config_path) + logger.error( + "Please copy 'examples/config.ini.example' to 'config.ini' " + "and configure it." + ) return None try: - logger.info(f"Reading configuration from: {config_path}") + logger.info("Reading configuration from: %s", config_path) config.read(config_path) - if 'DEFAULT' not in config.sections(): - logger.warning(f"Configuration file '{config_path}' lacks the [DEFAULT] section. Using only defaults.") - cfg_section = config['DEFAULT'] + if "DEFAULT" not in config.sections(): + logger.warning( + "Configuration file '%s' lacks the [DEFAULT] section.", + config_path, + ) + logger.warning("Using only defaults.") + cfg_section = config["DEFAULT"] + + meshtastic_port = cfg_section.get( + "MESHTASTIC_SERIAL_PORT", + fallback=DEFAULT_CONFIG["MESHTASTIC_SERIAL_PORT"], + ) + external_network_id = cfg_section.get( + "EXTERNAL_NETWORK_ID", + fallback=DEFAULT_CONFIG["EXTERNAL_NETWORK_ID"], + ) + bridge_node_id = cfg_section.get( + "BRIDGE_NODE_ID", fallback=DEFAULT_CONFIG["BRIDGE_NODE_ID"] + ) + log_level = cfg_section.get( + "LOG_LEVEL", fallback=DEFAULT_CONFIG["LOG_LEVEL"] + ).upper() - meshtastic_port = cfg_section.get('MESHTASTIC_SERIAL_PORT', fallback=DEFAULT_CONFIG['MESHTASTIC_SERIAL_PORT']) - external_network_id = cfg_section.get('EXTERNAL_NETWORK_ID', fallback=DEFAULT_CONFIG['EXTERNAL_NETWORK_ID']) - bridge_node_id = cfg_section.get('BRIDGE_NODE_ID', fallback=DEFAULT_CONFIG['BRIDGE_NODE_ID']) - log_level = cfg_section.get('LOG_LEVEL', fallback=DEFAULT_CONFIG['LOG_LEVEL']).upper() - if log_level not in VALID_LOG_LEVELS: - logger.error(f"Invalid LOG_LEVEL '{log_level}'. Must be one of: {VALID_LOG_LEVELS}") + logger.error( + "Invalid LOG_LEVEL '%s'. Must be one of: %s", + log_level, + VALID_LOG_LEVELS, + ) return None try: - queue_size = cfg_section.getint('MESSAGE_QUEUE_SIZE') - if queue_size <= 0: - raise ValueError("Queue size must be positive.") + queue_size = cfg_section.getint("MESSAGE_QUEUE_SIZE") + if queue_size is None or queue_size <= 0: + raise ValueError("Queue size must be positive.") except ValueError as e: - logger.error(f"Invalid integer value for MESSAGE_QUEUE_SIZE: {e}") + logger.error("Invalid integer value for MESSAGE_QUEUE_SIZE: %s", e) return None - external_transport = cfg_section.get('EXTERNAL_TRANSPORT', fallback=DEFAULT_CONFIG['EXTERNAL_TRANSPORT']).lower() + external_transport = cfg_section.get( + "EXTERNAL_TRANSPORT", fallback=DEFAULT_CONFIG["EXTERNAL_TRANSPORT"] + ).lower() if external_transport not in VALID_TRANSPORTS: - logger.error(f"Invalid EXTERNAL_TRANSPORT '{external_transport}'. Must be one of: {VALID_TRANSPORTS}") + logger.error( + "Invalid EXTERNAL_TRANSPORT '%s'. Must be one of: %s", + external_transport, + VALID_TRANSPORTS, + ) return None serial_port = None @@ -137,73 +170,109 @@ def load_config(config_path: str = CONFIG_FILE) -> Optional[BridgeConfig]: mqtt_qos = None mqtt_retain_out = None - if external_transport == 'serial': - serial_port = cfg_section.get('SERIAL_PORT', fallback=DEFAULT_CONFIG['SERIAL_PORT']) - serial_protocol = cfg_section.get('SERIAL_PROTOCOL', fallback=DEFAULT_CONFIG['SERIAL_PROTOCOL']).lower() + if external_transport == "serial": + serial_port = cfg_section.get( + "SERIAL_PORT", fallback=DEFAULT_CONFIG["SERIAL_PORT"] + ) + serial_protocol = cfg_section.get( + "SERIAL_PROTOCOL", fallback=DEFAULT_CONFIG["SERIAL_PROTOCOL"] + ).lower() if not serial_port: - logger.error("SERIAL_PORT must be set when EXTERNAL_TRANSPORT is 'serial'.") - return None + logger.error( + "SERIAL_PORT must be set when EXTERNAL_TRANSPORT " + "is 'serial'." + ) + return None if serial_protocol not in VALID_SERIAL_PROTOCOLS: - logger.warning( - f"Unrecognized SERIAL_PROTOCOL '{serial_protocol}'. " - f"Valid built-in options are: {VALID_SERIAL_PROTOCOLS}. " - f"Attempting to use '{serial_protocol}' - ensure a corresponding handler exists." - ) + logger.warning( + "Unrecognized SERIAL_PROTOCOL '%s'." + " Valid options: %s.", + serial_protocol, + VALID_SERIAL_PROTOCOLS, + ) + logger.warning( + "Attempting to use '%s' - ensure a corresponding " + "handler exists.", + serial_protocol, + ) try: - serial_baud = cfg_section.getint('SERIAL_BAUD_RATE') - if serial_baud <= 0: - raise ValueError("Serial baud rate must be positive.") + serial_baud = cfg_section.getint("SERIAL_BAUD_RATE") + if serial_baud is None or serial_baud <= 0: + raise ValueError("Serial baud rate must be positive.") except ValueError as e: - logger.error(f"Invalid integer value for SERIAL_BAUD_RATE: {e}") + logger.error( + "Invalid integer value for SERIAL_BAUD_RATE: %s", + e, + ) return None - elif external_transport == 'mqtt': - mqtt_broker = cfg_section.get('MQTT_BROKER') - mqtt_topic_in = cfg_section.get('MQTT_TOPIC_IN') - mqtt_topic_out = cfg_section.get('MQTT_TOPIC_OUT') - mqtt_username = cfg_section.get('MQTT_USERNAME') - mqtt_password = cfg_section.get('MQTT_PASSWORD') - mqtt_client_id = cfg_section.get('MQTT_CLIENT_ID') + elif external_transport == "mqtt": + mqtt_broker = cfg_section.get("MQTT_BROKER") + mqtt_topic_in = cfg_section.get("MQTT_TOPIC_IN") + mqtt_topic_out = cfg_section.get("MQTT_TOPIC_OUT") + mqtt_username = cfg_section.get("MQTT_USERNAME") + mqtt_password = cfg_section.get("MQTT_PASSWORD") + mqtt_client_id = cfg_section.get("MQTT_CLIENT_ID") if not mqtt_broker or not mqtt_topic_in or not mqtt_topic_out: - logger.error("MQTT_BROKER, MQTT_TOPIC_IN, and MQTT_TOPIC_OUT must be set when EXTERNAL_TRANSPORT is 'mqtt'.") - return None + logger.error( + "MQTT_BROKER, MQTT_TOPIC_IN and " + "MQTT_TOPIC_OUT must be set." + ) + return None if not mqtt_client_id: - logger.warning("MQTT_CLIENT_ID is empty. Using default.") - mqtt_client_id = DEFAULT_CONFIG['MQTT_CLIENT_ID'] + logger.warning( + "MQTT_CLIENT_ID empty. Using default." + ) + mqtt_client_id = DEFAULT_CONFIG["MQTT_CLIENT_ID"] try: - mqtt_port = cfg_section.getint('MQTT_PORT') - mqtt_qos = cfg_section.getint('MQTT_QOS') - mqtt_retain_out = cfg_section.getboolean('MQTT_RETAIN_OUT') - if mqtt_port <= 0 or mqtt_port > 65535: - raise ValueError("MQTT port must be between 1 and 65535.") - if mqtt_qos not in VALID_MQTT_QOS: - raise ValueError(f"MQTT_QOS must be one of {VALID_MQTT_QOS}.") + mqtt_port = cfg_section.getint("MQTT_PORT") + mqtt_qos = cfg_section.getint("MQTT_QOS") + mqtt_retain_out = cfg_section.getboolean("MQTT_RETAIN_OUT") + if mqtt_port is None or mqtt_port <= 0 or mqtt_port > 65535: + raise ValueError("MQTT port must be between 1 and 65535.") + if mqtt_qos is None or mqtt_qos not in VALID_MQTT_QOS: + msg = "MQTT_QOS must be one of %s" % (VALID_MQTT_QOS,) + raise ValueError(msg) except ValueError as e: - logger.error(f"Invalid integer/boolean value in MQTT configuration: {e}") + logger.error( + "Invalid integer/boolean value in MQTT configuration: %s", + e, + ) return None # Parse API settings - api_enabled = cfg_section.getboolean('API_ENABLED', fallback=False) - api_host = cfg_section.get('API_HOST', fallback='127.0.0.1') + api_enabled = cfg_section.getboolean("API_ENABLED", fallback=False) + api_host = cfg_section.get("API_HOST", fallback="127.0.0.1") try: - api_port = cfg_section.getint('API_PORT', fallback=8080) + api_port = cfg_section.getint("API_PORT", fallback=8080) if api_port <= 0 or api_port > 65535: - logger.warning(f"Invalid API_PORT {api_port}, using default 8080") + logger.warning( + "Invalid API_PORT %s, using default 8080", + api_port, + ) api_port = 8080 except ValueError: logger.warning("Invalid API_PORT, using default 8080") api_port = 8080 # Parse MQTT TLS settings - mqtt_tls_enabled = cfg_section.getboolean('MQTT_TLS_ENABLED', fallback=False) - mqtt_tls_ca_certs = cfg_section.get('MQTT_TLS_CA_CERTS', fallback='').strip() or None - mqtt_tls_insecure = cfg_section.getboolean('MQTT_TLS_INSECURE', fallback=False) + mqtt_tls_enabled = cfg_section.getboolean( + "MQTT_TLS_ENABLED", fallback=False + ) + mqtt_tls_ca_certs = ( + cfg_section.get("MQTT_TLS_CA_CERTS", fallback="").strip() or None + ) + mqtt_tls_insecure = cfg_section.getboolean( + "MQTT_TLS_INSECURE", fallback=False + ) bridge_config = BridgeConfig( meshtastic_port=meshtastic_port, - external_transport=external_transport, + external_transport=cast( + Literal["serial", "mqtt"], external_transport + ), serial_port=serial_port, serial_baud=serial_baud, serial_protocol=serial_protocol, @@ -218,7 +287,7 @@ def load_config(config_path: str = CONFIG_FILE) -> Optional[BridgeConfig]: mqtt_retain_out=mqtt_retain_out, external_network_id=external_network_id, bridge_node_id=bridge_node_id, - queue_size=queue_size, + queue_size=int(queue_size), log_level=log_level, api_enabled=api_enabled, api_host=api_host, @@ -227,9 +296,13 @@ def load_config(config_path: str = CONFIG_FILE) -> Optional[BridgeConfig]: mqtt_tls_ca_certs=mqtt_tls_ca_certs, mqtt_tls_insecure=mqtt_tls_insecure, ) - logger.debug(f"Configuration loaded: {bridge_config}") + logger.debug("Configuration loaded: %s", bridge_config) return bridge_config except Exception as e: - logger.error(f"Unexpected error loading configuration: {e}", exc_info=True) + logger.error( + "Unexpected error loading configuration: %s", + e, + exc_info=True, + ) return None diff --git a/ammb/health.py b/ammb/health.py index 718e4f8..1f011ef 100644 --- a/ammb/health.py +++ b/ammb/health.py @@ -6,14 +6,15 @@ Health monitoring and status checking for the bridge. import logging import threading import time -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import datetime, timedelta from enum import Enum -from typing import Dict, Optional, List +from typing import Dict, Optional class HealthStatus(Enum): """Health status levels.""" + HEALTHY = "healthy" DEGRADED = "degraded" UNHEALTHY = "unhealthy" @@ -23,11 +24,12 @@ class HealthStatus(Enum): @dataclass class ComponentHealth: """Health status for a component.""" + name: str status: HealthStatus last_check: datetime message: str = "" - details: Dict = None + details: Dict = field(default_factory=dict) def __post_init__(self): if self.details is None: @@ -55,7 +57,9 @@ class HealthMonitor: self._monitoring = False self._monitor_thread: Optional[threading.Thread] = None - def register_component(self, name: str, initial_status: HealthStatus = HealthStatus.UNKNOWN): + def register_component( + self, name: str, initial_status: HealthStatus = HealthStatus.UNKNOWN + ): """Register a component for health monitoring.""" with self._lock: self.components[name] = ComponentHealth( @@ -64,7 +68,13 @@ class HealthMonitor: last_check=datetime.now(), ) - def update_component(self, name: str, status: HealthStatus, message: str = "", details: Optional[Dict] = None): + def update_component( + self, + name: str, + status: HealthStatus, + message: str = "", + details: Optional[Dict] = None, + ): """Update the health status of a component.""" with self._lock: if name in self.components: @@ -72,9 +82,18 @@ class HealthMonitor: self.components[name].last_check = datetime.now() self.components[name].message = message if details: + # Ensure details is a dict. + # Dataclass __post_init__ should set it. + # Be defensive if None. + if self.components[name].details is None: + self.components[name].details = {} self.components[name].details.update(details) else: - self.logger.warning(f"Component {name} not registered for health monitoring") + self.logger.warning( + "Component %s not registered for health " + "monitoring", + name, + ) def get_component_health(self, name: str) -> Optional[ComponentHealth]: """Get health status for a specific component.""" @@ -92,8 +111,8 @@ class HealthMonitor: } statuses = [comp.status for comp in self.components.values()] - - # Determine overall status + + # Determine overall status from component states if HealthStatus.UNHEALTHY in statuses: overall = HealthStatus.UNHEALTHY elif HealthStatus.DEGRADED in statuses: @@ -103,10 +122,14 @@ class HealthMonitor: else: overall = HealthStatus.UNKNOWN + components = { + name: comp.to_dict() + for name, comp in self.components.items() + } return { "status": overall.value, "timestamp": datetime.now().isoformat(), - "components": {name: comp.to_dict() for name, comp in self.components.items()}, + "components": components, } def start_monitoring(self): @@ -115,7 +138,9 @@ class HealthMonitor: return self._monitoring = True - self._monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True, name="HealthMonitor") + self._monitor_thread = threading.Thread( + target=self._monitor_loop, daemon=True, name="HealthMonitor" + ) self._monitor_thread.start() self.logger.info("Health monitoring started") @@ -138,13 +163,20 @@ class HealthMonitor: for name, comp in list(self.components.items()): if (now - comp.last_check) > stale_threshold: if comp.status != HealthStatus.UNHEALTHY: - self.logger.warning(f"Component {name} health check is stale") + self.logger.warning( + "Component %s health check is stale", + name, + ) comp.status = HealthStatus.DEGRADED - comp.message = "Health check stale - no recent updates" + comp.message = ( + "Health check stale; no recent updates" + ) time.sleep(self.check_interval) except Exception as e: - self.logger.error(f"Error in health monitor loop: {e}", exc_info=True) + self.logger.error( + "Error in health monitor loop: %s", e, exc_info=True + ) time.sleep(self.check_interval) @@ -160,4 +192,3 @@ def get_health_monitor() -> HealthMonitor: if _health_monitor is None: _health_monitor = HealthMonitor() return _health_monitor - diff --git a/ammb/meshcore_handler.py b/ammb/meshcore_handler.py index d5e7c5d..ea94c31 100644 --- a/ammb/meshcore_handler.py +++ b/ammb/meshcore_handler.py @@ -3,105 +3,167 @@ Handles interactions with an external device via a **Serial** port. """ +import json import logging import threading import time -import json -from queue import Queue, Empty, Full -from typing import Optional, Dict, Any +from queue import Empty, Full, Queue +from typing import Any, Dict, Optional + import serial from .config_handler import BridgeConfig -from .protocol import MeshcoreProtocolHandler, get_serial_protocol_handler +from .health import HealthStatus, get_health_monitor from .metrics import get_metrics -from .health import get_health_monitor, HealthStatus -from .validator import MessageValidator +from .protocol import MeshcoreProtocolHandler, get_serial_protocol_handler from .rate_limiter import RateLimiter +from .validator import MessageValidator + class MeshcoreHandler: """Manages Serial connection and communication with an external device.""" - RECONNECT_DELAY_S = 10 - def __init__(self, config: BridgeConfig, to_meshtastic_queue: Queue, from_meshtastic_queue: Queue, shutdown_event: threading.Event): + RECONNECT_DELAY_S = 10 + + def __init__( + self, + config: BridgeConfig, + to_meshtastic_queue: Queue, + from_meshtastic_queue: Queue, + shutdown_event: threading.Event, + ): self.logger = logging.getLogger(__name__) self.config = config self.to_meshtastic_queue = to_meshtastic_queue - self.to_serial_queue = from_meshtastic_queue + self.to_serial_queue = from_meshtastic_queue self.shutdown_event = shutdown_event self.serial_port: Optional[serial.Serial] = None self.receiver_thread: Optional[threading.Thread] = None self.sender_thread: Optional[threading.Thread] = None - self._lock = threading.Lock() + self._lock = threading.Lock() self._is_connected = threading.Event() - + # Initialize metrics, health, validator, and rate limiter self.metrics = get_metrics() self.health_monitor = get_health_monitor() self.validator = MessageValidator() - self.rate_limiter = RateLimiter(max_messages=60, time_window=60.0) + self.rate_limiter = RateLimiter(max_messages=60, time_window=60.0) - if not config.serial_port or not config.serial_baud or not config.serial_protocol: - raise ValueError("Serial transport selected, but required SERIAL configuration options are missing.") + if ( + not config.serial_port + or not config.serial_baud + or not config.serial_protocol + ): + raise ValueError( + "Serial transport selected, but required SERIAL " + "configuration options are missing." + ) try: - self.protocol_handler: MeshcoreProtocolHandler = get_serial_protocol_handler(config.serial_protocol) + self.protocol_handler: MeshcoreProtocolHandler = ( + get_serial_protocol_handler(config.serial_protocol) + ) except ValueError as e: - self.logger.critical(f"Failed to initialize serial protocol handler '{config.serial_protocol}': {e}.") + self.logger.critical( + "Failed to initialize serial protocol handler '%s': %s.", + config.serial_protocol, + e, + ) + class DummyHandler(MeshcoreProtocolHandler): - def read(self, port): return None - def encode(self, data): return None - def decode(self, line): return None + def read(self, port): + return None + + def encode(self, data): + return None + + def decode(self, line): + return None + self.protocol_handler = DummyHandler() self.logger.info("Serial Handler (MeshcoreHandler) Initialized.") - def connect(self) -> bool: - with self._lock: + with self._lock: if self.serial_port and self.serial_port.is_open: - self.logger.info(f"Serial port {self.config.serial_port} already connected.") - self._is_connected.set() + self.logger.info( + "Serial port %s already connected.", + self.config.serial_port, + ) + self._is_connected.set() return True try: - self.logger.info(f"Attempting connection to Serial device on {self.config.serial_port} at {self.config.serial_baud} baud...") + self.logger.info( + "Connecting to Serial device %s at %s baud...", + self.config.serial_port, + self.config.serial_baud, + ) self._is_connected.clear() - if self.serial_port: - try: - self.serial_port.close() - except Exception: pass + if self.serial_port: + try: + self.serial_port.close() + except Exception: + pass + # Ensure baud rate is available for typing and runtime + assert self.config.serial_baud is not None self.serial_port = serial.Serial( port=self.config.serial_port, - baudrate=self.config.serial_baud, - timeout=1, + baudrate=int(self.config.serial_baud), + timeout=1, ) if self.serial_port.is_open: - self.logger.info(f"Connected to Serial device on {self.config.serial_port}") + self.logger.info( + "Connected to Serial device on %s", + self.config.serial_port, + ) self._is_connected.set() self.metrics.record_external_connection() - self.health_monitor.update_component("external", HealthStatus.HEALTHY, "Serial connected") + self.health_monitor.update_component( + "external", HealthStatus.HEALTHY, "Serial connected" + ) return True else: - self.logger.error(f"Failed to open serial port {self.config.serial_port}, but no exception was raised.") + self.logger.error( + "Failed to open serial port %s; no exception raised.", + self.config.serial_port, + ) self.serial_port = None self._is_connected.clear() return False except serial.SerialException as e: - self.logger.error(f"Serial error connecting to device {self.config.serial_port}: {e}") + self.logger.error( + "Serial error connecting to device %s: %s", + self.config.serial_port, + e, + ) self.serial_port = None self._is_connected.clear() self.metrics.record_external_disconnection() - self.health_monitor.update_component("external", HealthStatus.UNHEALTHY, f"Connection failed: {e}") + self.health_monitor.update_component( + "external", + HealthStatus.UNHEALTHY, + f"Connection failed: {e}", + ) return False except Exception as e: - self.logger.error(f"Unexpected error connecting to serial device: {e}", exc_info=True) + self.logger.error( + "Unexpected error connecting to serial device: %s", + e, + exc_info=True, + ) self.serial_port = None self._is_connected.clear() self.metrics.record_external_disconnection() - self.health_monitor.update_component("external", HealthStatus.UNHEALTHY, f"Connection error: {e}") + self.health_monitor.update_component( + "external", + HealthStatus.UNHEALTHY, + f"Connection error: {e}", + ) return False def start_threads(self): @@ -109,22 +171,30 @@ class MeshcoreHandler: self.logger.warning("Serial receiver thread already started.") else: self.logger.info("Starting Serial receiver thread...") - self.receiver_thread = threading.Thread(target=self._serial_receiver_loop, daemon=True, name="SerialReceiver") + self.receiver_thread = threading.Thread( + target=self._serial_receiver_loop, + daemon=True, + name="SerialReceiver", + ) self.receiver_thread.start() if self.sender_thread and self.sender_thread.is_alive(): self.logger.warning("Serial sender thread already started.") else: self.logger.info("Starting Serial sender thread...") - self.sender_thread = threading.Thread(target=self._serial_sender_loop, daemon=True, name="SerialSender") + self.sender_thread = threading.Thread( + target=self._serial_sender_loop, + daemon=True, + name="SerialSender", + ) self.sender_thread.start() def stop(self): self.logger.info("Stopping Serial handler...") if self.receiver_thread and self.receiver_thread.is_alive(): - self.receiver_thread.join(timeout=2) + self.receiver_thread.join(timeout=2) if self.sender_thread and self.sender_thread.is_alive(): - self.sender_thread.join(timeout=5) + self.sender_thread.join(timeout=5) self._close_serial() self.logger.info("Serial handler stopped.") @@ -134,64 +204,98 @@ class MeshcoreHandler: port_name = self.config.serial_port try: self.serial_port.close() - self.logger.info(f"Serial port {port_name} closed.") + self.logger.info("Serial port %s closed.", port_name) except Exception as e: - self.logger.error(f"Error closing serial port {port_name}: {e}", exc_info=True) + self.logger.error( + "Error closing serial port %s: %s", + port_name, + e, + exc_info=True, + ) finally: self.serial_port = None self._is_connected.clear() self.metrics.record_external_disconnection() - self.health_monitor.update_component("external", HealthStatus.UNHEALTHY, "Disconnected") + self.health_monitor.update_component( + "external", HealthStatus.UNHEALTHY, "Disconnected" + ) def _serial_receiver_loop(self): - """Continuously reads from serial using Protocol Handler, translates, and queues.""" + """Continuously reads from serial using Protocol Handler, translates, + and queues.""" self.logger.info("Serial receiver loop started.") while not self.shutdown_event.is_set(): # --- Connection Check --- if not self._is_connected.is_set(): - self.logger.warning(f"Serial port {self.config.serial_port} not connected. Attempting reconnect...") - if self.connect(): - self.logger.info(f"Serial device reconnected successfully on {self.config.serial_port}.") + self.logger.warning( + "Serial port %s not connected. " + "Attempting reconnect...", + self.config.serial_port, + ) + if self.connect(): + self.logger.info( + "Serial device reconnected on %s", + self.config.serial_port, + ) else: self.shutdown_event.wait(self.RECONNECT_DELAY_S) - continue + continue # --- Read and Process Data --- try: raw_data: Optional[bytes] = None with self._lock: - if self.serial_port and self.serial_port.is_open: - # Delegate reading to protocol handler - raw_data = self.protocol_handler.read(self.serial_port) - else: - self._is_connected.clear() - continue + if self.serial_port and self.serial_port.is_open: + # Delegate reading to protocol handler + raw_data = self.protocol_handler.read(self.serial_port) + else: + self._is_connected.clear() + continue if raw_data: - self.logger.debug(f"Serial RAW RX: {raw_data!r}") - + self.logger.debug("Serial RAW RX: %r", raw_data) + # Decode using the selected protocol handler - decoded_msg: Optional[Dict[str, Any]] = self.protocol_handler.decode(raw_data) + decoded_msg: Optional[Dict[str, Any]] = ( + self.protocol_handler.decode(raw_data) + ) if decoded_msg: # Validate message - is_valid, error_msg = self.validator.validate_external_message(decoded_msg) + is_valid, error_msg = ( + self.validator.validate_external_message( + decoded_msg + ) + ) if not is_valid: - self.logger.warning(f"Invalid external message rejected: {error_msg}") + self.logger.warning( + "Invalid external message rejected: %s", + error_msg + ) self.metrics.record_error("external") continue # Check rate limit - if not self.rate_limiter.check_rate_limit("serial_receiver"): - self.logger.warning("Rate limit exceeded for Serial receiver") - self.metrics.record_rate_limit_violation("serial_receiver") + if not self.rate_limiter.check_rate_limit( + "serial_receiver" + ): + self.logger.warning( + "Rate limit exceeded for Serial receiver" + ) + self.metrics.record_rate_limit_violation( + "serial_receiver" + ) continue # Sanitize message - decoded_msg = self.validator.sanitize_external_message(decoded_msg) + decoded_msg = self.validator.sanitize_external_message( + decoded_msg + ) # Basic Translation Logic (Serial -> Meshtastic) - dest_meshtastic_id = decoded_msg.get("destination_meshtastic_id") + dest_meshtastic_id = decoded_msg.get( + "destination_meshtastic_id" + ) payload = decoded_msg.get("payload") payload_json = decoded_msg.get("payload_json") channel_index = decoded_msg.get("channel_index", 0) @@ -204,9 +308,12 @@ class MeshcoreHandler: try: text_payload_str = json.dumps(payload_json) except (TypeError, ValueError) as e: - self.logger.error(f"Failed to serialize payload_json: {e}") - elif payload is not None: - text_payload_str = str(payload) + self.logger.error( + "Failed to serialize payload_json: %s", + e + ) + elif payload is not None: + text_payload_str = str(payload) if dest_meshtastic_id and text_payload_str is not None: meshtastic_msg = { @@ -217,50 +324,83 @@ class MeshcoreHandler: } try: - self.to_meshtastic_queue.put_nowait(meshtastic_msg) - payload_size = len(text_payload_str.encode('utf-8')) if text_payload_str else 0 - self.metrics.record_external_received(payload_size) - self.logger.info(f"Queued message from Serial for Meshtastic node {dest_meshtastic_id}") + self.to_meshtastic_queue.put_nowait( + meshtastic_msg + ) + payload_size = ( + len(text_payload_str.encode("utf-8")) + if text_payload_str + else 0 + ) + self.metrics.record_external_received( + payload_size + ) + self.logger.info( + "Queued message for Meshtastic %s", + dest_meshtastic_id, + ) except Full: - self.logger.warning("Meshtastic send queue is full. Dropping incoming message from Serial.") + self.logger.warning( + "Meshtastic queue full; dropping message." + ) self.metrics.record_dropped("external") else: - self.logger.warning(f"Serial RX: Decoded message lacks required fields: {decoded_msg}") + self.logger.warning( + "Decoded serial message missing fields." + ) + self.logger.debug("Decoded: %s", decoded_msg) else: - # No data available from serial, sleep briefly to prevent CPU spin + # No data available - sleep briefly to avoid CPU spin time.sleep(0.1) except serial.SerialException as e: - self.logger.error(f"Serial error in receiver loop ({self.config.serial_port}): {e}. Attempting to reconnect...") - self._close_serial() - time.sleep(1) - except Exception as e: - self.logger.error(f"Unexpected error in serial_receiver_loop: {e}", exc_info=True) + self.logger.error( + "Serial error in receiver loop (%s): %s", + self.config.serial_port, + e, + ) + self.logger.info("Attempting to reconnect...") self._close_serial() - time.sleep(self.RECONNECT_DELAY_S / 2) + time.sleep(1) + except Exception as e: + self.logger.error( + "Unexpected error in serial_receiver_loop: %s", + e, + exc_info=True, + ) + self._close_serial() + time.sleep(self.RECONNECT_DELAY_S / 2) self.logger.info("Serial receiver loop stopped.") - def _serial_sender_loop(self): - """Continuously reads from the queue, encodes, and sends messages via Serial.""" + """Continuously reads from the queue, encodes, and sends + messages via Serial.""" self.logger.info("Serial sender loop started.") while not self.shutdown_event.is_set(): if not self._is_connected.is_set(): - time.sleep(self.RECONNECT_DELAY_S / 2) - continue + time.sleep(self.RECONNECT_DELAY_S / 2) + continue try: - item: Optional[Dict[str, Any]] = self.to_serial_queue.get(timeout=1) + item: Optional[Dict[str, Any]] = self.to_serial_queue.get( + timeout=1 + ) if not item: continue - encoded_message: Optional[bytes] = self.protocol_handler.encode(item) + encoded_message: Optional[bytes] = ( + self.protocol_handler.encode(item) + ) if encoded_message: # Truncate log for binary safety log_preview = repr(encoded_message[:50]) - self.logger.info(f"Serial TX -> Port: {self.config.serial_port}, Payload: {log_preview}") + self.logger.info( + "Serial TX port %s payload %s", + self.config.serial_port, + log_preview, + ) send_success = False with self._lock: @@ -269,30 +409,52 @@ class MeshcoreHandler: self.serial_port.write(encoded_message) self.serial_port.flush() send_success = True - self.metrics.record_external_sent(len(encoded_message)) + self.metrics.record_external_sent( + len(encoded_message) + ) except serial.SerialException as e: - self.logger.error(f"Serial error during send ({self.config.serial_port}): {e}.") - self._close_serial() + self.logger.error( + "Serial error during send (%s): %s.", + self.config.serial_port, + e + ) + self._close_serial() except Exception as e: - self.logger.error(f"Unexpected error sending Serial message: {e}", exc_info=True) + self.logger.error( + "Unexpected error sending Serial " + "message: %s", + e, + exc_info=True, + ) else: - self.logger.warning(f"Serial port disconnected just before send attempt.") - self._is_connected.clear() + self.logger.warning( + "Serial port disconnected before send." + ) + self._is_connected.clear() if send_success: - self.to_serial_queue.task_done() + self.to_serial_queue.task_done() else: - self.logger.error("Failed to send Serial message. Discarding.") - self.to_serial_queue.task_done() + self.logger.error( + "Failed to send Serial message. Discarding." + ) + self.to_serial_queue.task_done() else: - self.logger.error(f"Failed to encode message for Serial: {item}") - self.to_serial_queue.task_done() + self.logger.error( + "Failed to encode message for Serial: %s", + item + ) + self.to_serial_queue.task_done() except Empty: continue except Exception as e: - self.logger.error(f"Critical error in serial_sender_loop: {e}", exc_info=True) - self._is_connected.clear() - time.sleep(5) + self.logger.error( + "Critical error in serial_sender_loop: %s", + e, + exc_info=True, + ) + self._is_connected.clear() + time.sleep(5) self.logger.info("Serial sender loop stopped.") diff --git a/ammb/meshtastic_handler.py b/ammb/meshtastic_handler.py index b8b979f..5941200 100644 --- a/ammb/meshtastic_handler.py +++ b/ammb/meshtastic_handler.py @@ -6,41 +6,53 @@ Handles all interactions with the Meshtastic device and network. import logging import threading import time -from queue import Queue, Empty, Full -from typing import Optional, Dict, Any -import meshtastic -import meshtastic.serial_interface +from queue import Empty, Full, Queue +from typing import Any, Dict, Optional + +import meshtastic # type: ignore[import] +import meshtastic.serial_interface # type: ignore[import] from pubsub import pub -import serial from .config_handler import BridgeConfig +from .health import HealthStatus, get_health_monitor from .metrics import get_metrics -from .health import get_health_monitor, HealthStatus -from .validator import MessageValidator from .rate_limiter import RateLimiter +from .validator import MessageValidator + class MeshtasticHandler: """Manages connection and communication with the Meshtastic network.""" + RECONNECT_DELAY_S = 10 - def __init__(self, config: BridgeConfig, to_external_queue: Queue, from_external_queue: Queue, shutdown_event: threading.Event): + def __init__( + self, + config: BridgeConfig, + to_external_queue: Queue, + from_external_queue: Queue, + shutdown_event: threading.Event, + ): self.logger = logging.getLogger(__name__) self.config = config self.to_external_queue = to_external_queue self.to_meshtastic_queue = from_external_queue self.shutdown_event = shutdown_event - self.interface: Optional[meshtastic.serial_interface.SerialInterface] = None + self.interface: Optional[ + meshtastic.serial_interface.SerialInterface + ] = None self.my_node_id: Optional[str] = None self.sender_thread: Optional[threading.Thread] = None self._lock = threading.Lock() self._is_connected = threading.Event() - + # Initialize metrics, health, validator, and rate limiter self.metrics = get_metrics() self.health_monitor = get_health_monitor() self.validator = MessageValidator() - self.rate_limiter = RateLimiter(max_messages=60, time_window=60.0) # 60 messages per minute + self.rate_limiter = RateLimiter( + max_messages=60, time_window=60.0 + ) # 60 messages per minute def connect(self) -> bool: with self._lock: @@ -48,12 +60,17 @@ class MeshtasticHandler: return True try: - self.logger.info(f"Attempting connection to Meshtastic on {self.config.meshtastic_port}...") + self.logger.info( + "Attempting connection to Meshtastic on %s...", + self.config.meshtastic_port, + ) self._is_connected.clear() self.my_node_id = None if self.interface: - try: self.interface.close() - except Exception: pass + try: + self.interface.close() + except Exception: + pass self.interface = meshtastic.serial_interface.SerialInterface( self.config.meshtastic_port @@ -61,111 +78,170 @@ class MeshtasticHandler: my_info = self.interface.getMyNodeInfo() retry_count = 0 - while (not my_info or 'num' not in my_info) and retry_count < 3: - time.sleep(2) - my_info = self.interface.getMyNodeInfo() - retry_count += 1 + while ( + not my_info or "num" not in my_info + ) and retry_count < 3: + time.sleep(2) + my_info = self.interface.getMyNodeInfo() + retry_count += 1 - if my_info and 'num' in my_info: + if my_info and "num" in my_info: self.my_node_id = f"!{my_info['num']:x}" - user_id = my_info.get('user', {}).get('id', 'N/A') - self.logger.info(f"Connected to Meshtastic device. Node ID: {self.my_node_id} ('{user_id}')") + user_id = my_info.get("user", {}).get("id", "N/A") + self.logger.info( + "Connected to Meshtastic device. Node ID: %s (%s)", + self.my_node_id, + user_id, + ) self._is_connected.set() self.metrics.record_meshtastic_connection() - self.health_monitor.update_component("meshtastic", HealthStatus.HEALTHY, "Connected") + self.health_monitor.update_component( + "meshtastic", HealthStatus.HEALTHY, "Connected" + ) else: - self.logger.warning("Connected to Meshtastic, but failed to retrieve node info. Loopback detection unreliable.") - self._is_connected.set() - self.metrics.record_meshtastic_connection() - self.health_monitor.update_component("meshtastic", HealthStatus.DEGRADED, "Connected but node info unavailable") + self.logger.warning( + "Connected to Meshtastic but failed to retrieve " + "node info." + ) + self._is_connected.set() + self.metrics.record_meshtastic_connection() + self.health_monitor.update_component( + "meshtastic", + HealthStatus.DEGRADED, + "Connected but node info " + "unavailable", + ) - pub.subscribe(self._on_meshtastic_receive, "meshtastic.receive", weak=False) + pub.subscribe( + self._on_meshtastic_receive, + "meshtastic.receive", + weak=False, + ) self.logger.info("Meshtastic receive callback registered.") return True except Exception as e: - self.logger.error(f"Error connecting to Meshtastic device {self.config.meshtastic_port}: {e}", exc_info=False) + self.logger.error( + "Error connecting to Meshtastic device %s: %s", + self.config.meshtastic_port, + e, + exc_info=False, + ) if self.interface: - try: self.interface.close() - except Exception: pass + try: + self.interface.close() + except Exception: + pass self.interface = None self.my_node_id = None self._is_connected.clear() self.metrics.record_meshtastic_disconnection() - self.health_monitor.update_component("meshtastic", HealthStatus.UNHEALTHY, f"Connection failed: {e}") + self.health_monitor.update_component( + "meshtastic", + HealthStatus.UNHEALTHY, + f"Connection failed: {e}", + ) return False def start_sender(self): if self.sender_thread and self.sender_thread.is_alive(): return self.logger.info("Starting Meshtastic sender thread...") - self.sender_thread = threading.Thread(target=self._meshtastic_sender_loop, daemon=True, name="MeshtasticSender") + self.sender_thread = threading.Thread( + target=self._meshtastic_sender_loop, + daemon=True, + name="MeshtasticSender", + ) self.sender_thread.start() def stop(self): self.logger.info("Stopping Meshtastic handler...") try: pub.unsubscribe(self._on_meshtastic_receive, "meshtastic.receive") - except Exception: pass + except Exception: + pass with self._lock: if self.interface: try: self.interface.close() except Exception as e: - self.logger.error(f"Error closing Meshtastic interface: {e}") + self.logger.error( + "Error closing Meshtastic interface: %s", + e, + ) finally: self.interface = None self.my_node_id = None self._is_connected.clear() self.metrics.record_meshtastic_disconnection() - self.health_monitor.update_component("meshtastic", HealthStatus.UNHEALTHY, "Disconnected") + self.health_monitor.update_component( + "meshtastic", HealthStatus.UNHEALTHY, "Disconnected" + ) if self.sender_thread and self.sender_thread.is_alive(): - self.sender_thread.join(timeout=5) + self.sender_thread.join(timeout=5) self.logger.info("Meshtastic handler stopped.") def _on_meshtastic_receive(self, packet: Dict[str, Any], interface: Any): try: - if not packet or 'from' not in packet: + if not packet or "from" not in packet: return - sender_id_num = packet.get('from') - sender_id_hex = f"!{sender_id_num:x}" if isinstance(sender_id_num, int) else "UNKNOWN" - portnum = packet.get('decoded', {}).get('portnum', 'UNKNOWN') - payload_bytes = packet.get('decoded', {}).get('payload') - + sender_id_num = packet.get("from") + sender_id_hex = ( + f"!{sender_id_num:x}" + if isinstance(sender_id_num, int) + else "UNKNOWN" + ) + portnum = packet.get("decoded", {}).get("portnum", "UNKNOWN") + payload_bytes = packet.get("decoded", {}).get("payload") + # Loopback Prevention - bridge_id_lower = self.config.bridge_node_id.lower() if self.config.bridge_node_id else None - my_node_id_lower = self.my_node_id.lower() if self.my_node_id else None + bridge_id_lower = ( + self.config.bridge_node_id.lower() + if self.config.bridge_node_id + else None + ) + my_node_id_lower = ( + self.my_node_id.lower() if self.my_node_id else None + ) sender_id_lower = sender_id_hex.lower() - if (bridge_id_lower and sender_id_lower == bridge_id_lower) or \ - (my_node_id_lower and sender_id_lower == my_node_id_lower): - return + if (bridge_id_lower and sender_id_lower == bridge_id_lower) or ( + my_node_id_lower and sender_id_lower == my_node_id_lower + ): + return - portnum_str = str(portnum) if portnum else 'UNKNOWN' + portnum_str = str(portnum) if portnum else "UNKNOWN" translated_payload = None message_type = "meshtastic_message" - if portnum_str == 'TEXT_MESSAGE_APP' and payload_bytes: + if portnum_str == "TEXT_MESSAGE_APP" and payload_bytes: try: - text_payload = payload_bytes.decode('utf-8', errors='replace') - self.logger.info(f"Meshtastic RX <{portnum_str}> From {sender_id_hex}: '{text_payload}'") + text_payload = payload_bytes.decode( + "utf-8", errors="replace" + ) + self.logger.info( + "Meshtastic RX <%s> From %s: %r", + portnum_str, + sender_id_hex, + text_payload, + ) translated_payload = text_payload except UnicodeDecodeError: translated_payload = repr(payload_bytes) - elif portnum_str == 'POSITION_APP': - pos_data = packet.get('decoded', {}).get('position', {}) - translated_payload = { - "latitude": pos_data.get('latitude'), - "longitude": pos_data.get('longitude'), - "altitude": pos_data.get('altitude'), - "timestamp_gps": pos_data.get('time') - } - message_type = "meshtastic_position" + elif portnum_str == "POSITION_APP": + pos_data = packet.get("decoded", {}).get("position", {}) + translated_payload = { + "latitude": pos_data.get("latitude"), + "longitude": pos_data.get("longitude"), + "altitude": pos_data.get("altitude"), + "timestamp_gps": pos_data.get("time"), + } + message_type = "meshtastic_position" else: return @@ -177,62 +253,97 @@ class MeshtasticHandler: "portnum": portnum_str, "payload": translated_payload, "timestamp_rx": time.time(), - "rx_rssi": packet.get('rxRssi'), - "rx_snr": packet.get('rxSnr'), + "rx_rssi": packet.get("rxRssi"), + "rx_snr": packet.get("rxSnr"), } try: self.to_external_queue.put_nowait(external_message) - payload_size = len(str(translated_payload).encode('utf-8')) if translated_payload else 0 + payload_size = ( + len(str(translated_payload).encode("utf-8")) + if translated_payload + else 0 + ) self.metrics.record_meshtastic_received(payload_size) - self.logger.debug(f"Queued message from {sender_id_hex} for external handler.") + self.logger.debug( + "Queued message from %s for external handler.", + sender_id_hex, + ) except Full: self.logger.warning("External handler send queue is full.") self.metrics.record_dropped("meshtastic") - + except Exception as e: + self.logger.error( + "Error processing incoming Meshtastic packet: %s", + e, + exc_info=True, + ) except Exception as e: - self.logger.error(f"Error in _on_meshtastic_receive callback: {e}", exc_info=True) + self.logger.error( + "Unhandled error processing Meshtastic packet: %s", + e, + exc_info=True, + ) def _meshtastic_sender_loop(self): self.logger.info("Meshtastic sender loop started.") while not self.shutdown_event.is_set(): try: - item: Optional[Dict[str, Any]] = self.to_meshtastic_queue.get(timeout=1) - if not item: continue + item: Optional[Dict[str, Any]] = self.to_meshtastic_queue.get( + timeout=1 + ) + if not item: + continue if not self._is_connected.is_set(): - self.to_meshtastic_queue.task_done() - time.sleep(self.RECONNECT_DELAY_S / 2) - continue + self.to_meshtastic_queue.task_done() + time.sleep(self.RECONNECT_DELAY_S / 2) + continue # Validate and sanitize message - is_valid, error_msg = self.validator.validate_meshtastic_message(item) + is_valid, error_msg = ( + self.validator.validate_meshtastic_message(item) + ) if not is_valid: - self.logger.warning(f"Invalid message rejected: {error_msg}") + self.logger.warning( + "Invalid message rejected: %s", + error_msg, + ) self.metrics.record_error("meshtastic") self.to_meshtastic_queue.task_done() continue # Check rate limit if not self.rate_limiter.check_rate_limit("meshtastic_sender"): - self.logger.warning("Rate limit exceeded for Meshtastic sender") - self.metrics.record_rate_limit_violation("meshtastic_sender") + self.logger.warning( + "Rate limit exceeded for Meshtastic sender" + ) + self.metrics.record_rate_limit_violation( + "meshtastic_sender" + ) self.to_meshtastic_queue.task_done() continue # Sanitize message item = self.validator.sanitize_meshtastic_message(item) - destination = item.get('destination') - text_to_send = item.get('text') - channel_index = item.get('channel_index', 0) - want_ack = item.get('want_ack', False) + destination = item.get("destination") + text_to_send = item.get("text") + channel_index = item.get("channel_index", 0) + want_ack = item.get("want_ack", False) if destination and isinstance(text_to_send, str): - log_payload = (text_to_send[:100] + '...') if len(text_to_send) > 100 else text_to_send - self.logger.info(f"Meshtastic TX -> Dest: {destination}, Payload: '{log_payload}'") + log_payload = ( + (text_to_send[:100] + "...") + if len(text_to_send) > 100 + else text_to_send + ) + self.logger.info( + "Meshtastic TX -> Dest: %s, Payload: '%s'", + destination, + log_payload, + ) - send_success = False with self._lock: if self.interface and self._is_connected.is_set(): try: @@ -240,26 +351,37 @@ class MeshtasticHandler: text=text_to_send, destinationId=destination, channelIndex=channel_index, - wantAck=want_ack + wantAck=want_ack, + ) + payload_size = len( + text_to_send.encode("utf-8") + ) + self.metrics.record_meshtastic_sent( + payload_size ) - send_success = True - payload_size = len(text_to_send.encode('utf-8')) - self.metrics.record_meshtastic_sent(payload_size) except Exception as e: - self.logger.error(f"Error sending Meshtastic message: {e}") - if "Not connected" in str(e): self._is_connected.clear() + self.logger.error( + "Error sending Meshtastic message: %s", + e, + ) + if "Not connected" in str(e): + self._is_connected.clear() else: - self._is_connected.clear() + self._is_connected.clear() self.to_meshtastic_queue.task_done() else: self.to_meshtastic_queue.task_done() except Empty: - if not self._is_connected.is_set(): time.sleep(self.RECONNECT_DELAY_S) + if not self._is_connected.is_set(): + time.sleep(self.RECONNECT_DELAY_S) continue except Exception as e: - self.logger.error(f"Critical error in meshtastic_sender_loop: {e}", exc_info=True) + self.logger.error( + f"Critical error in meshtastic_sender_loop: {e}", + exc_info=True, + ) self._is_connected.clear() time.sleep(5) diff --git a/ammb/message_logger.py b/ammb/message_logger.py index 2a18324..c4736d3 100644 --- a/ammb/message_logger.py +++ b/ammb/message_logger.py @@ -3,19 +3,24 @@ Message persistence and logging for the bridge. """ -import logging import json +import logging import threading from datetime import datetime from pathlib import Path -from typing import Dict, Any, Optional -from queue import Queue, Empty +from queue import Empty, Queue +from typing import Any, Dict, Optional class MessageLogger: """Logs messages to file for persistence and analysis.""" - def __init__(self, log_file: Optional[str] = None, max_file_size_mb: int = 10, max_backups: int = 5): + def __init__( + self, + log_file: Optional[str] = None, + max_file_size_mb: int = 10, + max_backups: int = 5, + ): self.logger = logging.getLogger(__name__) self.log_file = log_file self.max_file_size = max_file_size_mb * 1024 * 1024 # Convert to bytes @@ -27,7 +32,8 @@ class MessageLogger: self._shutdown_event = threading.Event() if self._enabled: - self._log_path = Path(log_file) + assert self.log_file is not None + self._log_path = Path(self.log_file) self._log_path.parent.mkdir(parents=True, exist_ok=True) self._start_worker() @@ -36,9 +42,13 @@ class MessageLogger: if self._worker_thread and self._worker_thread.is_alive(): return - self._worker_thread = threading.Thread(target=self._worker_loop, daemon=True, name="MessageLogger") + self._worker_thread = threading.Thread( + target=self._worker_loop, daemon=True, name="MessageLogger" + ) self._worker_thread.start() - self.logger.info(f"Message logger started, logging to: {self.log_file}") + self.logger.info( + f"Message logger started, logging to: {self.log_file}" + ) def _worker_loop(self): """Background loop for writing messages.""" @@ -51,7 +61,9 @@ class MessageLogger: except Empty: continue except Exception as e: - self.logger.error(f"Error in message logger worker: {e}", exc_info=True) + self.logger.error( + f"Error in message logger worker: {e}", exc_info=True + ) def _write_message(self, message: Dict[str, Any]): """Write a message to the log file.""" @@ -60,20 +72,22 @@ class MessageLogger: try: # Add timestamp if not present - if 'timestamp' not in message: - message['timestamp'] = datetime.now().isoformat() + if "timestamp" not in message: + message["timestamp"] = datetime.now().isoformat() # Rotate log file if needed self._rotate_if_needed() # Write message as JSON line with self._lock: - with open(self._log_path, 'a', encoding='utf-8') as f: + with open(self._log_path, "a", encoding="utf-8") as f: json.dump(message, f, ensure_ascii=False) - f.write('\n') + f.write("\n") except Exception as e: - self.logger.error(f"Error writing message to log file: {e}", exc_info=True) + self.logger.error( + f"Error writing message to log file: {e}", exc_info=True + ) def _rotate_if_needed(self): """Rotate log file if it exceeds max size.""" @@ -86,16 +100,18 @@ class MessageLogger: try: # Rotate existing backups for i in range(self.max_backups - 1, 0, -1): - old_file = self._log_path.with_suffix(f'.{i}.log') - new_file = self._log_path.with_suffix(f'.{i + 1}.log') + old_file = self._log_path.with_suffix(f".{i}.log") + new_file = self._log_path.with_suffix(f".{i + 1}.log") if old_file.exists(): old_file.rename(new_file) # Move current log to .1.log - backup_file = self._log_path.with_suffix('.1.log') + backup_file = self._log_path.with_suffix(".1.log") self._log_path.rename(backup_file) - self.logger.info(f"Rotated log file: {self._log_path} -> {backup_file}") + self.logger.info( + f"Rotated log file: {self._log_path} -> {backup_file}" + ) except Exception as e: self.logger.error(f"Error rotating log file: {e}", exc_info=True) @@ -107,8 +123,8 @@ class MessageLogger: log_entry = { **message, - 'direction': direction, - 'logged_at': datetime.now().isoformat(), + "direction": direction, + "logged_at": datetime.now().isoformat(), } try: @@ -134,4 +150,3 @@ class MessageLogger: break self.logger.info("Message logger stopped") - diff --git a/ammb/metrics.py b/ammb/metrics.py index 4f4a368..119fc0d 100644 --- a/ammb/metrics.py +++ b/ammb/metrics.py @@ -5,16 +5,16 @@ Metrics and statistics collection for the bridge. import logging import threading -import time from collections import defaultdict from dataclasses import dataclass, field +from datetime import datetime from typing import Dict, Optional -from datetime import datetime, timedelta @dataclass class MessageStats: """Statistics for message processing.""" + total_received: int = 0 total_sent: int = 0 total_dropped: int = 0 @@ -57,8 +57,14 @@ class MessageStats: "total_sent": self.total_sent, "total_dropped": self.total_dropped, "total_errors": self.total_errors, - "last_received": self.last_received.isoformat() if self.last_received else None, - "last_sent": self.last_sent.isoformat() if self.last_sent else None, + "last_received": ( + self.last_received.isoformat() + if self.last_received + else None + ), + "last_sent": ( + self.last_sent.isoformat() if self.last_sent else None + ), "bytes_received": self.bytes_received, "bytes_sent": self.bytes_sent, } @@ -67,6 +73,7 @@ class MessageStats: @dataclass class ConnectionStats: """Statistics for connection health.""" + connection_count: int = 0 disconnection_count: int = 0 last_connected: Optional[datetime] = None @@ -89,7 +96,9 @@ class ConnectionStats: self.disconnection_count += 1 self.last_disconnected = datetime.now() if self.current_uptime_start: - uptime = (datetime.now() - self.current_uptime_start).total_seconds() + uptime = ( + datetime.now() - self.current_uptime_start + ).total_seconds() self.total_uptime_seconds += uptime self.current_uptime_start = None @@ -97,7 +106,9 @@ class ConnectionStats: """Get current uptime in seconds.""" with self._lock: if self.current_uptime_start: - return (datetime.now() - self.current_uptime_start).total_seconds() + return ( + datetime.now() - self.current_uptime_start + ).total_seconds() return 0.0 def to_dict(self) -> Dict: @@ -106,9 +117,18 @@ class ConnectionStats: return { "connection_count": self.connection_count, "disconnection_count": self.disconnection_count, - "last_connected": self.last_connected.isoformat() if self.last_connected else None, - "last_disconnected": self.last_disconnected.isoformat() if self.last_disconnected else None, - "total_uptime_seconds": self.total_uptime_seconds + self.get_current_uptime(), + "last_connected": ( + self.last_connected.isoformat() + if self.last_connected + else None + ), + "last_disconnected": ( + self.last_disconnected.isoformat() + if self.last_disconnected + else None + ), + "total_uptime_seconds": self.total_uptime_seconds + + self.get_current_uptime(), "current_uptime_seconds": self.get_current_uptime(), } @@ -224,4 +244,3 @@ def get_metrics() -> MetricsCollector: if _metrics is None: _metrics = MetricsCollector() return _metrics - diff --git a/ammb/mqtt_handler.py b/ammb/mqtt_handler.py index 17c9dfd..b0ffd9b 100644 --- a/ammb/mqtt_handler.py +++ b/ammb/mqtt_handler.py @@ -3,29 +3,42 @@ Handles interactions with an MQTT broker as the external network interface. """ +import json import logging import threading import time -import json -from queue import Queue, Empty, Full -from typing import Optional, Dict, Any +from queue import Empty, Full, Queue +from typing import Any, Dict, Optional # External dependencies import paho.mqtt.client as paho_mqtt -import paho.mqtt.enums as paho_enums # For MQTTv5 properties if used +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + import paho.mqtt.enums as paho_enums # type: ignore[import-not-found] +else: + paho_enums: Any = None # Project dependencies from .config_handler import BridgeConfig +from .health import HealthStatus, get_health_monitor from .metrics import get_metrics -from .health import get_health_monitor, HealthStatus -from .validator import MessageValidator from .rate_limiter import RateLimiter +from .validator import MessageValidator + class MQTTHandler: """Manages connection and communication with an MQTT broker.""" + RECONNECT_DELAY_S = 10 - def __init__(self, config: BridgeConfig, to_meshtastic_queue: Queue, from_meshtastic_queue: Queue, shutdown_event: threading.Event): + def __init__( + self, + config: BridgeConfig, + to_meshtastic_queue: Queue, + from_meshtastic_queue: Queue, + shutdown_event: threading.Event, + ): self.logger = logging.getLogger(__name__) self.config = config self.to_meshtastic_queue = to_meshtastic_queue @@ -36,15 +49,28 @@ class MQTTHandler: self.publisher_thread: Optional[threading.Thread] = None self._mqtt_connected = threading.Event() self._lock = threading.Lock() - + # Initialize metrics, health, validator, and rate limiter self.metrics = get_metrics() self.health_monitor = get_health_monitor() self.validator = MessageValidator() self.rate_limiter = RateLimiter(max_messages=60, time_window=60.0) - if not all([config.mqtt_broker, config.mqtt_port is not None, config.mqtt_topic_in, config.mqtt_topic_out, config.mqtt_client_id, config.mqtt_qos is not None, config.mqtt_retain_out is not None]): - raise ValueError("MQTT transport selected, but required MQTT configuration options seem missing.") + if not all( + [ + config.mqtt_broker, + config.mqtt_port is not None, + config.mqtt_topic_in, + config.mqtt_topic_out, + config.mqtt_client_id, + config.mqtt_qos is not None, + config.mqtt_retain_out is not None, + ] + ): + raise ValueError( + "MQTT transport selected, but required MQTT " + "configuration options seem missing." + ) self.logger.info("MQTT Handler Initialized.") @@ -55,20 +81,28 @@ class MQTTHandler: return True if self.client: - try: - self.client.reconnect() - return True - except Exception: - try: self.client.loop_stop(force=True) - except: pass - self.client = None - self._mqtt_connected.clear() + try: + self.client.reconnect() + return True + except Exception: + try: + self.client.loop_stop(force=True) + except Exception: + pass + self.client = None + self._mqtt_connected.clear() try: - self.client = paho_mqtt.Client(client_id=self.config.mqtt_client_id, - protocol=paho_mqtt.MQTTv311, - clean_session=True) - self.logger.info(f"Attempting connection to MQTT broker {self.config.mqtt_broker}:{self.config.mqtt_port}...") + self.client = paho_mqtt.Client( + client_id=self.config.mqtt_client_id, + protocol=paho_mqtt.MQTTv311, + clean_session=True, + ) + self.logger.info( + "Attempting connection to MQTT broker %s:%s...", + self.config.mqtt_broker, + self.config.mqtt_port, + ) self.client.on_connect = self._on_connect self.client.on_disconnect = self._on_disconnect @@ -76,30 +110,59 @@ class MQTTHandler: self.client.on_log = self._on_log # TLS/SSL support - if hasattr(self.config, 'mqtt_tls_enabled') and self.config.mqtt_tls_enabled: + if ( + hasattr(self.config, "mqtt_tls_enabled") + and self.config.mqtt_tls_enabled + ): import ssl + context = ssl.create_default_context() - if hasattr(self.config, 'mqtt_tls_ca_certs') and self.config.mqtt_tls_ca_certs: - context.load_verify_locations(self.config.mqtt_tls_ca_certs) - if hasattr(self.config, 'mqtt_tls_insecure') and self.config.mqtt_tls_insecure: + if ( + hasattr(self.config, "mqtt_tls_ca_certs") + and self.config.mqtt_tls_ca_certs + ): + context.load_verify_locations( + self.config.mqtt_tls_ca_certs + ) + if ( + hasattr(self.config, "mqtt_tls_insecure") + and self.config.mqtt_tls_insecure + ): context.check_hostname = False context.verify_mode = ssl.CERT_NONE self.client.tls_set_context(context) self.logger.info("MQTT TLS/SSL enabled") if self.config.mqtt_username: - self.client.username_pw_set(self.config.mqtt_username, self.config.mqtt_password) + self.client.username_pw_set( + self.config.mqtt_username, self.config.mqtt_password + ) - self.client.connect_async(self.config.mqtt_broker, self.config.mqtt_port, keepalive=60) + # Ensure configuration values present for mypy + assert ( + self.config.mqtt_broker is not None + and self.config.mqtt_port is not None + ) + self.client.connect_async( + self.config.mqtt_broker, + self.config.mqtt_port, + keepalive=60, + ) self.client.loop_start() self.logger.info("MQTT client network loop started.") return True except Exception as e: - self.logger.error(f"Error initiating MQTT connection or starting loop: {e}", exc_info=True) - if self.client: - try: self.client.loop_stop(force=True) - except: pass + self.logger.error( + "Error initiating MQTT connection or starting loop: %s", + e, + exc_info=True, + ) + if self.client: + try: + self.client.loop_stop(force=True) + except Exception: + pass self.client = None self._mqtt_connected.clear() return False @@ -108,24 +171,32 @@ class MQTTHandler: if self.publisher_thread and self.publisher_thread.is_alive(): return self.logger.info("Starting MQTT publisher thread...") - self.publisher_thread = threading.Thread(target=self._mqtt_publisher_loop, daemon=True, name="MQTTPublisher") + self.publisher_thread = threading.Thread( + target=self._mqtt_publisher_loop, daemon=True, name="MQTTPublisher" + ) self.publisher_thread.start() def stop(self): self.logger.info("Stopping MQTT handler...") if self.publisher_thread and self.publisher_thread.is_alive(): - self.publisher_thread.join(timeout=5) + self.publisher_thread.join(timeout=5) with self._lock: if self.client: - try: self.client.loop_stop(force=True) - except Exception: pass - try: self.client.disconnect() - except Exception: pass + try: + self.client.loop_stop(force=True) + except Exception: + pass + try: + self.client.disconnect() + except Exception: + pass self.client = None self._mqtt_connected.clear() self.metrics.record_external_disconnection() - self.health_monitor.update_component("external", HealthStatus.UNHEALTHY, "Stopped") + self.health_monitor.update_component( + "external", HealthStatus.UNHEALTHY, "Stopped" + ) self.logger.info("MQTT handler stopped.") # --- MQTT Callbacks (Executed by Paho's Network Thread) --- @@ -133,60 +204,90 @@ class MQTTHandler: def _on_connect(self, client, userdata, flags, rc, properties=None): connack_str = paho_mqtt.connack_string(rc) if rc == 0: - self.logger.info(f"Successfully connected to MQTT broker: {self.config.mqtt_broker} ({connack_str})") + self.logger.info( + "Successfully connected to MQTT broker: %s (%s)", + self.config.mqtt_broker, + connack_str, + ) self._mqtt_connected.set() self.metrics.record_external_connection() - self.health_monitor.update_component("external", HealthStatus.HEALTHY, "MQTT connected") + self.health_monitor.update_component( + "external", HealthStatus.HEALTHY, "MQTT connected" + ) try: - client.subscribe(self.config.mqtt_topic_in, qos=self.config.mqtt_qos) + client.subscribe( + self.config.mqtt_topic_in, qos=self.config.mqtt_qos + ) except Exception as e: - self.logger.error(f"Error during MQTT subscription: {e}") + self.logger.error("Error during MQTT subscription: %s", e) else: - self.logger.error(f"MQTT connection failed. Result code: {rc} - {connack_str}") + self.logger.error( + "MQTT connection failed. Result code: %s - %s", + rc, + connack_str, + ) self._mqtt_connected.clear() self.metrics.record_external_disconnection() - self.health_monitor.update_component("external", HealthStatus.UNHEALTHY, f"Connection failed: {connack_str}") + self.health_monitor.update_component( + "external", + HealthStatus.UNHEALTHY, + "Connection failed: %s" % connack_str, + ) def _on_disconnect(self, client, userdata, rc, properties=None): self._mqtt_connected.clear() self.metrics.record_external_disconnection() if rc != 0: self.logger.warning(f"Unexpected MQTT disconnection. RC: {rc}") - self.health_monitor.update_component("external", HealthStatus.UNHEALTHY, f"Unexpected disconnect: RC {rc}") + self.health_monitor.update_component( + "external", + HealthStatus.UNHEALTHY, + f"Unexpected disconnect: RC {rc}", + ) else: - self.health_monitor.update_component("external", HealthStatus.UNHEALTHY, "Disconnected") + self.health_monitor.update_component( + "external", HealthStatus.UNHEALTHY, "Disconnected" + ) def _on_log(self, client, userdata, level, buf): """MQTT client logging callback.""" # Only log at DEBUG level to avoid noise if level <= paho_mqtt.MQTT_LOG_DEBUG: - self.logger.debug(f"MQTT: {buf}") + self.logger.debug("MQTT: %s", buf) def _on_message(self, client, userdata, msg: paho_mqtt.MQTTMessage): try: payload_bytes = msg.payload - if not payload_bytes: return + if not payload_bytes: + return try: - payload_str = payload_bytes.decode('utf-8', errors='replace') + payload_str = payload_bytes.decode("utf-8", errors="replace") mqtt_data = json.loads(payload_str) - + # Validate message - is_valid, error_msg = self.validator.validate_external_message(mqtt_data) + is_valid, error_msg = self.validator.validate_external_message( + mqtt_data + ) if not is_valid: - self.logger.warning(f"Invalid MQTT message rejected: {error_msg}") + self.logger.warning( + "Invalid MQTT message rejected: %s", + error_msg, + ) self.metrics.record_error("external") return # Check rate limit if not self.rate_limiter.check_rate_limit("mqtt_receiver"): - self.logger.warning("Rate limit exceeded for MQTT receiver") + self.logger.warning( + "Rate limit exceeded for MQTT receiver" + ) self.metrics.record_rate_limit_violation("mqtt_receiver") return # Sanitize message mqtt_data = self.validator.sanitize_external_message(mqtt_data) - + dest_meshtastic_id = mqtt_data.get("destination_meshtastic_id") payload = mqtt_data.get("payload") payload_json = mqtt_data.get("payload_json") @@ -197,23 +298,36 @@ class MQTTHandler: if isinstance(payload, str): text_payload_str = payload elif payload_json is not None: - try: text_payload_str = json.dumps(payload_json) - except Exception: pass + try: + text_payload_str = json.dumps(payload_json) + except Exception: + pass elif payload is not None: - text_payload_str = str(payload) + text_payload_str = str(payload) if dest_meshtastic_id and text_payload_str is not None: meshtastic_msg = { "destination": dest_meshtastic_id, "text": text_payload_str, - "channel_index": int(channel_index) if str(channel_index).isdigit() else 0, + "channel_index": ( + int(channel_index) + if str(channel_index).isdigit() + else 0 + ), "want_ack": bool(want_ack), } try: self.to_meshtastic_queue.put_nowait(meshtastic_msg) - payload_size = len(text_payload_str.encode('utf-8')) if text_payload_str else 0 + payload_size = ( + len(text_payload_str.encode("utf-8")) + if text_payload_str + else 0 + ) self.metrics.record_external_received(payload_size) - self.logger.info(f"Queued MQTT message for {dest_meshtastic_id}") + self.logger.info( + "Queued MQTT message for %s", + dest_meshtastic_id, + ) except Full: self.logger.warning("Meshtastic send queue full.") self.metrics.record_dropped("external") @@ -231,40 +345,57 @@ class MQTTHandler: continue try: - item: Optional[Dict[str, Any]] = self.to_mqtt_queue.get(timeout=1) - if not item: continue + item: Optional[Dict[str, Any]] = self.to_mqtt_queue.get( + timeout=1 + ) + if not item: + continue try: payload_str = json.dumps(item) topic = self.config.mqtt_topic_out - + if not topic: - self.logger.error("MQTT_TOPIC_OUT is not configured. Cannot publish.") - self.to_mqtt_queue.task_done() - continue - + self.logger.error( + "MQTT_TOPIC_OUT is not configured. Cannot publish." + ) + self.to_mqtt_queue.task_done() + continue + qos = self.config.mqtt_qos if qos not in [0, 1, 2]: - self.logger.error(f"Invalid MQTT_QOS ({qos}) for publishing. Using QoS 0.") - qos = 0 + self.logger.error( + "Invalid MQTT_QOS (%s); using QoS 0.", + qos, + ) + qos = 0 - with self._lock: - if self.client and self.client.is_connected(): - self.client.publish(topic, payload=payload_str, qos=qos, retain=self.config.mqtt_retain_out) - self.metrics.record_external_sent(len(payload_str.encode('utf-8'))) - else: - self._mqtt_connected.clear() - + if self.client and self.client.is_connected(): + self.client.publish( + topic, + payload=payload_str, + qos=qos, + retain=self.config.mqtt_retain_out, + ) + self.metrics.record_external_sent( + len(payload_str.encode("utf-8")) + ) + else: + self._mqtt_connected.clear() + self.to_mqtt_queue.task_done() except Exception as e: - self.logger.error(f"Error during MQTT publish: {e}") - self.to_mqtt_queue.task_done() + self.logger.error(f"Error during MQTT publish: {e}") + self.to_mqtt_queue.task_done() except Empty: continue except Exception as e: - self.logger.error(f"Critical error in mqtt_publisher_loop: {e}", exc_info=True) + self.logger.error( + f"Critical error in mqtt_publisher_loop: {e}", + exc_info=True, + ) time.sleep(5) self.logger.info("MQTT publisher loop stopped.") diff --git a/ammb/protocol.py b/ammb/protocol.py index e640883..5acbe16 100644 --- a/ammb/protocol.py +++ b/ammb/protocol.py @@ -1,55 +1,70 @@ # ammb/protocol.py """ -Defines handlers for different **Serial** communication protocols. +Defines handlers for different **Serial** communication +protocols. -Allows the bridge (specifically the MeshcoreHandler) to encode/decode -messages based on the serial protocol specified in the configuration. +Allows the bridge (specifically the MeshcoreHandler) to +encode/decode messages based on the serial protocol specified +in the configuration. Includes: - JsonNewlineProtocol: For text-based JSON (default). - RawSerialProtocol: For binary/companion modes (fallback). """ +import binascii import json import logging -import binascii from abc import ABC, abstractmethod -from typing import Dict, Optional, Any +from typing import Any, Dict, Optional + # --- Base Class --- class MeshcoreProtocolHandler(ABC): """ - Abstract base class for handling **Serial** protocols for the external device. + Abstract base class for **Serial** protocol handling. + + Subclasses implement read, encode, and decode methods. """ + def __init__(self): - # Get logger named after the specific subclass implementing the protocol - self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}") + # Logger named after the subclass + name = __name__ + "." + self.__class__.__name__ + self.logger = logging.getLogger(name) self.logger.debug("Serial protocol handler initialized.") @abstractmethod def read(self, serial_port) -> Optional[bytes]: """ - Reads data from the serial port according to the protocol's needs. - Returns bytes read, or None. + Reads data from the serial port. + + Implementations should return bytes read, or None. """ pass @abstractmethod def encode(self, data: Dict[str, Any]) -> Optional[bytes]: - """Encodes a dictionary payload into bytes suitable for sending over serial.""" + """ + Encodes a dictionary payload into bytes for sending over serial. + """ pass @abstractmethod def decode(self, raw_data: bytes) -> Optional[Dict[str, Any]]: - """Decodes bytes received from serial into a dictionary.""" + """ + Decodes bytes received from serial into a dictionary. + """ pass + # --- Concrete Implementations --- + class JsonNewlineProtocol(MeshcoreProtocolHandler): """ Handles newline-terminated JSON strings encoded in UTF-8 over **Serial**. """ + def read(self, serial_port) -> Optional[bytes]: """Reads a single line ending in \\n.""" if serial_port.in_waiting > 0: @@ -58,32 +73,51 @@ class JsonNewlineProtocol(MeshcoreProtocolHandler): def encode(self, data: Dict[str, Any]) -> Optional[bytes]: try: - encoded_message = json.dumps(data).encode('utf-8') + b'\n' - self.logger.debug(f"Encoded: {encoded_message!r}") + encoded_message = json.dumps(data).encode("utf-8") + b"\n" + self.logger.debug("Encoded: %r", encoded_message) return encoded_message except (TypeError, ValueError) as e: - self.logger.error(f"JSON Encode Error: {e} - Data: {data}", exc_info=True) + self.logger.error( + "JSON Encode Error: %s - Data: %s", + e, + data, + exc_info=True, + ) return None except Exception as e: - self.logger.error(f"Unexpected serial encoding error: {e}", exc_info=True) + self.logger.error( + "Unexpected serial encoding error: %s", + e, + exc_info=True, + ) return None def decode(self, raw_data: bytes) -> Optional[Dict[str, Any]]: try: - decoded_str = raw_data.decode('utf-8', errors='replace').strip() + decoded_str = raw_data.decode("utf-8", errors="replace").strip() if not decoded_str: return None - + decoded_data = json.loads(decoded_str) if not isinstance(decoded_data, dict): - self.logger.warning(f"Decoded JSON is not a dictionary: {decoded_str!r}") - return None + self.logger.warning( + "Decoded JSON is not a dictionary: %r", + decoded_str, + ) + return None return decoded_data except json.JSONDecodeError: - self.logger.warning(f"Received non-JSON data or incomplete JSON line: {raw_data!r}") + self.logger.warning( + "Received non-JSON or incomplete JSON line: %r", + raw_data, + ) return None except Exception as e: - self.logger.error(f"Error decoding serial data: {e} - Raw: {raw_data!r}") + self.logger.error( + "Error decoding serial data: %s - Raw: %r", + e, + raw_data, + ) return None @@ -91,6 +125,7 @@ class RawSerialProtocol(MeshcoreProtocolHandler): """ Handles RAW Binary data from the serial port. Use for 'Companion USB Mode'. """ + def read(self, serial_port) -> Optional[bytes]: """Reads all currently available bytes from the buffer.""" if serial_port.in_waiting > 0: @@ -100,48 +135,59 @@ class RawSerialProtocol(MeshcoreProtocolHandler): def encode(self, data: Dict[str, Any]) -> Optional[bytes]: """Encodes outgoing data.""" try: - payload = data.get('payload', '') + payload = data.get("payload", "") if isinstance(payload, str): - return payload.encode('utf-8') + return payload.encode("utf-8") elif isinstance(payload, (bytes, bytearray)): return payload return None except Exception as e: - self.logger.error(f"Error encoding raw data: {e}") + self.logger.error("Error encoding raw data: %s", e) return None def decode(self, raw_data: bytes) -> Optional[Dict[str, Any]]: """Wraps received raw bytes into a bridge-compatible dictionary.""" if not raw_data: return None - + try: - hex_str = binascii.hexlify(raw_data).decode('ascii') + hex_str = binascii.hexlify(raw_data).decode("ascii") return { - "destination_meshtastic_id": "^all", - "payload": f"MC_BIN: {hex_str}", - "raw_binary": True + "destination_meshtastic_id": "^all", + "payload": "MC_BIN: " + hex_str, + "raw_binary": True, } except Exception as e: - self.logger.error(f"Error processing raw binary: {e}") + self.logger.error("Error processing raw binary: %s", e) return None + # --- Factory Function --- _serial_protocol_handlers = { - 'json_newline': JsonNewlineProtocol, - 'raw_serial': RawSerialProtocol, + "json_newline": JsonNewlineProtocol, + "raw_serial": RawSerialProtocol, } + def get_serial_protocol_handler(protocol_name: str) -> MeshcoreProtocolHandler: - """Factory function to get an instance of the appropriate **Serial** protocol handler.""" + """Factory function to get an instance of the appropriate + **Serial** protocol handler.""" logger = logging.getLogger(__name__) protocol_name_lower = protocol_name.lower() handler_class = _serial_protocol_handlers.get(protocol_name_lower) if handler_class: - logger.info(f"Using Serial protocol handler: {handler_class.__name__}") + logger.info( + "Using Serial protocol handler: %s", + handler_class.__name__, + ) return handler_class() else: - logger.error(f"Unsupported Serial protocol: '{protocol_name}'. Available: {list(_serial_protocol_handlers.keys())}") + logger.error( + "Unsupported Serial protocol: %s. " + "Available: %s", + protocol_name, + list(_serial_protocol_handlers.keys()), + ) raise ValueError(f"Unsupported Serial protocol: {protocol_name}") diff --git a/ammb/rate_limiter.py b/ammb/rate_limiter.py index 132484f..e1c7542 100644 --- a/ammb/rate_limiter.py +++ b/ammb/rate_limiter.py @@ -19,7 +19,8 @@ class RateLimiter: Args: max_messages: Maximum number of messages allowed - time_window: Time window in seconds (default: 60 seconds = 1 minute) + time_window: Time window in seconds (default: 60 seconds) + # (1 minute) """ self.logger = logging.getLogger(__name__) self.max_messages = max_messages @@ -36,18 +37,24 @@ class RateLimiter: True if message should be allowed, False if rate limit exceeded """ now = time.time() - + with self._lock: # Remove old message timestamps outside the time window - while self.message_times and (now - self.message_times[0]) > self.time_window: + while ( + self.message_times + and (now - self.message_times[0]) > self.time_window + ): self.message_times.popleft() # Check if we're at the limit if len(self.message_times) >= self.max_messages: self.violations += 1 self.logger.warning( - f"Rate limit exceeded for {source}: " - f"{len(self.message_times)}/{self.max_messages} messages in {self.time_window}s" + "Rate limit exceeded for %s: %s/%s messages in %ss", + source, + len(self.message_times), + self.max_messages, + self.time_window, ) return False @@ -60,10 +67,12 @@ class RateLimiter: with self._lock: if not self.message_times: return 0.0 - + now = time.time() # Count messages in the last minute - recent_count = sum(1 for t in self.message_times if (now - t) <= 60.0) + recent_count = sum( + 1 for t in self.message_times if (now - t) <= 60.0 + ) return recent_count def reset(self): @@ -97,7 +106,9 @@ class MultiSourceRateLimiter: """Check rate limit for a specific source.""" with self._lock: if source not in self.limiters: - self.limiters[source] = RateLimiter(self.max_messages, self.time_window) + self.limiters[source] = RateLimiter( + self.max_messages, self.time_window + ) return self.limiters[source].check_rate_limit(source) def get_stats(self) -> dict: @@ -117,4 +128,3 @@ class MultiSourceRateLimiter: else: for limiter in self.limiters.values(): limiter.reset() - diff --git a/ammb/utils.py b/ammb/utils.py index 6730bd9..107bd1a 100644 --- a/ammb/utils.py +++ b/ammb/utils.py @@ -5,8 +5,11 @@ Shared utilities for the AMMB application, primarily logging setup. import logging -LOG_FORMAT = '%(asctime)s - %(threadName)s - %(levelname)s - %(name)s - %(message)s' -DATE_FORMAT = '%Y-%m-%d %H:%M:%S' +LOG_FORMAT = ( + "%(asctime)s - %(threadName)s - %(levelname)s - %(name)s - %(message)s" +) +DATE_FORMAT = "%Y-%m-%d %H:%M:%S" + def setup_logging(log_level_str: str): """ @@ -15,15 +18,15 @@ def setup_logging(log_level_str: str): numeric_level = getattr(logging, log_level_str.upper(), None) if not isinstance(numeric_level, int): logging.warning( - f"Invalid log level specified: '{log_level_str}'. Defaulting to INFO." + "Invalid log level specified: '%s'. Defaulting to INFO.", + log_level_str, ) numeric_level = logging.INFO # Reconfigure the root logger - logging.basicConfig(level=numeric_level, - format=LOG_FORMAT, - datefmt=DATE_FORMAT, - force=True) + logging.basicConfig( + level=numeric_level, format=LOG_FORMAT, datefmt=DATE_FORMAT, force=True + ) # Adjust logging levels for noisy libraries logging.getLogger("pypubsub").setLevel(logging.WARNING) @@ -31,4 +34,8 @@ def setup_logging(log_level_str: str): logging.getLogger("meshtastic").setLevel(logging.INFO) logging.getLogger("paho").setLevel(logging.WARNING) - logging.info(f"Logging configured to level {logging.getLevelName(numeric_level)} ({numeric_level})") + logging.info( + "Logging configured to level %s (%s)", + logging.getLevelName(numeric_level), + numeric_level, + ) diff --git a/ammb/validator.py b/ammb/validator.py index e50e630..9135b77 100644 --- a/ammb/validator.py +++ b/ammb/validator.py @@ -5,21 +5,32 @@ Message validation and sanitization utilities. import logging import re -from typing import Dict, Any, Optional, Tuple -from datetime import datetime +from typing import Any, Dict, Optional, Tuple class MessageValidator: """Validates and sanitizes messages.""" - def __init__(self, max_message_length: int = 240, max_payload_length: int = 1000): + def __init__( + self, + max_message_length: int = 240, + max_payload_length: int = 1000, + ): self.logger = logging.getLogger(__name__) self.max_message_length = max_message_length self.max_payload_length = max_payload_length # Patterns for validation - self.meshtastic_id_pattern = re.compile(r'^!?[0-9a-fA-F]{8}$|^\^all$|^\^broadcast$') - self.safe_string_pattern = re.compile(r'^[\x20-\x7E\n\r\t]*$') # Printable ASCII + newlines/tabs + self.meshtastic_id_pattern = re.compile( + ( + r"^!?[0-9a-fA-F]{8}$|" + r"^\^all$|" + r"^\^broadcast$" + ) + ) + self.safe_string_pattern = re.compile( + r"^[\x20-\x7E\n\r\t]*$" + ) # Printable ASCII + newlines/tabs def validate_meshtastic_id(self, node_id: str) -> bool: """Validate a Meshtastic node ID format.""" @@ -27,48 +38,52 @@ class MessageValidator: return False return bool(self.meshtastic_id_pattern.match(node_id)) - def sanitize_string(self, text: str, max_length: Optional[int] = None) -> str: + def sanitize_string( + self, text: str, max_length: Optional[int] = None + ) -> str: """Sanitize a string for safe transmission.""" if not isinstance(text, str): text = str(text) - # Remove null bytes and control characters (except newline, tab, carriage return) - sanitized = ''.join( - c for c in text - if ord(c) >= 32 or c in '\n\r\t' - ) + # Remove nulls and control chars except newline/tab/CR + sanitized = "".join(c for c in text if ord(c) >= 32 or c in "\n\r\t") # Truncate if needed max_len = max_length or self.max_message_length if len(sanitized) > max_len: sanitized = sanitized[:max_len] - self.logger.warning(f"String truncated to {max_len} characters") + self.logger.warning("String truncated to %s characters", max_len) return sanitized - def validate_meshtastic_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: + def validate_meshtastic_message( + self, message: Dict[str, Any] + ) -> Tuple[bool, Optional[str]]: """Validate a message destined for Meshtastic.""" if not isinstance(message, dict): return False, "Message must be a dictionary" - destination = message.get('destination') + destination = message.get("destination") if not destination: return False, "Missing 'destination' field" if not self.validate_meshtastic_id(destination): return False, f"Invalid destination format: {destination}" - text = message.get('text') + text = message.get("text") if not isinstance(text, str): return False, "Missing or invalid 'text' field" if len(text) > self.max_message_length: - return False, f"Message too long: {len(text)} > {self.max_message_length}" + msg = "Message too long: %s > %s" % ( + len(text), self.max_message_length + ) + return False, msg - channel_index = message.get('channel_index', 0) + channel_index = message.get("channel_index", 0) if not isinstance(channel_index, (int, str)): return False, "Invalid 'channel_index' type" - + try: channel_index = int(channel_index) if channel_index < 0 or channel_index > 7: @@ -78,72 +93,99 @@ class MessageValidator: return True, None - def validate_external_message(self, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]: + def validate_external_message( + self, message: Dict[str, Any] + ) -> Tuple[bool, Optional[str]]: """Validate a message from external system.""" if not isinstance(message, dict): return False, "Message must be a dictionary" # Check for required fields based on message type - payload = message.get('payload') - payload_json = message.get('payload_json') - + payload = message.get("payload") + payload_json = message.get("payload_json") + if payload is None and payload_json is None: return False, "Missing 'payload' or 'payload_json' field" - destination = message.get('destination_meshtastic_id') + destination = message.get("destination_meshtastic_id") if destination and not self.validate_meshtastic_id(destination): - return False, f"Invalid destination_meshtastic_id format: {destination}" + msg = ( + "Invalid destination_meshtastic_id format: %s" % (destination,) + ) + return False, msg # Validate payload length - if payload and isinstance(payload, str) and len(payload) > self.max_payload_length: - return False, f"Payload too long: {len(payload)} > {self.max_payload_length}" + if ( + payload + and isinstance(payload, str) + and len(payload) > self.max_payload_length + ): + msg = "Payload too long: %s > %s" % ( + len(payload), self.max_payload_length + ) + return False, msg return True, None - def sanitize_meshtastic_message(self, message: Dict[str, Any]) -> Dict[str, Any]: + def sanitize_meshtastic_message( + self, message: Dict[str, Any] + ) -> Dict[str, Any]: """Sanitize a message for Meshtastic.""" sanitized = message.copy() # Sanitize destination - if 'destination' in sanitized: - dest = str(sanitized['destination']).strip() + if "destination" in sanitized: + dest = str(sanitized["destination"]).strip() if not self.validate_meshtastic_id(dest): - self.logger.warning(f"Invalid destination, using broadcast: {dest}") - dest = '^all' - sanitized['destination'] = dest + self.logger.warning( + "Invalid destination, using broadcast: %s", + dest, + ) + dest = "^all" + sanitized["destination"] = dest # Sanitize text - if 'text' in sanitized: - sanitized['text'] = self.sanitize_string(sanitized['text'], self.max_message_length) + if "text" in sanitized: + sanitized["text"] = self.sanitize_string( + sanitized["text"], self.max_message_length + ) # Ensure channel_index is valid - if 'channel_index' in sanitized: + if "channel_index" in sanitized: try: - sanitized['channel_index'] = max(0, min(7, int(sanitized['channel_index']))) + sanitized["channel_index"] = max( + 0, min(7, int(sanitized["channel_index"])) + ) except (ValueError, TypeError): - sanitized['channel_index'] = 0 + sanitized["channel_index"] = 0 # Ensure want_ack is boolean - if 'want_ack' in sanitized: - sanitized['want_ack'] = bool(sanitized['want_ack']) + if "want_ack" in sanitized: + sanitized["want_ack"] = bool(sanitized["want_ack"]) return sanitized - def sanitize_external_message(self, message: Dict[str, Any]) -> Dict[str, Any]: + def sanitize_external_message( + self, message: Dict[str, Any] + ) -> Dict[str, Any]: """Sanitize a message from external system.""" sanitized = message.copy() # Sanitize destination if present - if 'destination_meshtastic_id' in sanitized: - dest = str(sanitized['destination_meshtastic_id']).strip() + if "destination_meshtastic_id" in sanitized: + dest = str(sanitized["destination_meshtastic_id"]).strip() if not self.validate_meshtastic_id(dest): - self.logger.warning(f"Invalid destination, using broadcast: {dest}") - dest = '^all' - sanitized['destination_meshtastic_id'] = dest + self.logger.warning( + "Invalid destination, using broadcast: %s", + dest, + ) + dest = "^all" + sanitized["destination_meshtastic_id"] = dest # Sanitize payload if it's a string - if 'payload' in sanitized and isinstance(sanitized['payload'], str): - sanitized['payload'] = self.sanitize_string(sanitized['payload'], self.max_payload_length) + if "payload" in sanitized and isinstance(sanitized["payload"], str): + sanitized["payload"] = self.sanitize_string( + sanitized["payload"], self.max_payload_length + ) return sanitized - diff --git a/examples/meshcore_simulator.py b/examples/meshcore_simulator.py index 4d2b8e3..507f2f7 100644 --- a/examples/meshcore_simulator.py +++ b/examples/meshcore_simulator.py @@ -3,11 +3,12 @@ Simple Meshcore Serial Device Simulator. """ -import serial -import time import json -import threading import random +import threading +import time + +import serial SIMULATOR_PORT = "/dev/ttyS1" BAUD_RATE = 9600 @@ -17,6 +18,7 @@ SENSOR_INTERVAL_S = 15 shutdown_event = threading.Event() serial_port = None + def serial_reader(): print("[Reader] Serial reader thread started.") while not shutdown_event.is_set(): @@ -26,18 +28,26 @@ def serial_reader(): line = serial_port.readline() if line: try: - decoded_line = line.decode('utf-8').strip() - print(f"\n[Reader] <<< Received: {decoded_line}") + decoded_line = line.decode("utf-8").strip() + print( + "[Reader] <<< Received: %s" % decoded_line + ) except UnicodeDecodeError: - print(f"\n[Reader] <<< Received non-UTF8 data: {line!r}") + print( + "[Reader] <<< Received non-UTF8 data: %r" + % (line,) + ) else: time.sleep(0.1) except Exception as e: - print(f"\n[Reader] Error: {e}") + print( + f"\n[Reader] Error: {e}" + ) time.sleep(1) else: time.sleep(1) + def periodic_sender(): print("[Periodic Sender] Started.") while not shutdown_event.is_set(): @@ -46,28 +56,35 @@ def periodic_sender(): sensor_value = round(20 + random.uniform(-2, 2), 2) message = { "destination_meshtastic_id": "!aabbccdd", - "payload_json": {"type": "temp", "val": sensor_value} + "payload_json": {"type": "temp", "val": sensor_value}, } - message_str = json.dumps(message) + '\n' - print(f"\n[Sender] >>> {message_str.strip()}") - serial_port.write(message_str.encode('utf-8')) + message_str = json.dumps(message) + "\n" + print( + f"\n[Sender] >>> {message_str.strip()}" + ) + serial_port.write(message_str.encode("utf-8")) except Exception as e: - print(f"\n[Sender] Error: {e}") + print( + f"\n[Sender] Error: {e}" + ) shutdown_event.wait(SENSOR_INTERVAL_S) + def main(): global serial_port print("--- Meshcore Simulator ---") reader = threading.Thread(target=serial_reader, daemon=True) reader.start() if SEND_PERIODIC_SENSOR_DATA: - sender = threading.Thread(target=periodic_sender, daemon=True) - sender.start() + sender = threading.Thread(target=periodic_sender, daemon=True) + sender.start() while not shutdown_event.is_set(): if not serial_port or not serial_port.is_open: try: - serial_port = serial.Serial(SIMULATOR_PORT, BAUD_RATE, timeout=0.5) + serial_port = serial.Serial( + SIMULATOR_PORT, BAUD_RATE, timeout=0.5 + ) print(f"[Main] Opened {SIMULATOR_PORT}") except Exception: time.sleep(5) @@ -75,13 +92,15 @@ def main(): try: user_input = input() if user_input: - msg = user_input.encode('utf-8') + b'\n' + msg = user_input.encode("utf-8") + b"\n" serial_port.write(msg) except KeyboardInterrupt: - break + break shutdown_event.set() - if serial_port: serial_port.close() + if serial_port: + serial_port.close() + if __name__ == "__main__": main() diff --git a/run_bridge.py b/run_bridge.py index 3fd5d74..aeee847 100644 --- a/run_bridge.py +++ b/run_bridge.py @@ -1,7 +1,8 @@ #!/usr/bin/env python3 # run_bridge.py """ -Executable script to initialize and run the Akita Meshtastic Meshcore Bridge (AMMB). +Executable script to initialize and run the Akita Meshtastic Meshcore +Bridge (AMMB). This script handles: - Checking for essential dependencies. @@ -11,9 +12,9 @@ This script handles: - Handling graceful shutdown on KeyboardInterrupt (Ctrl+C). """ -import sys import logging import os +import sys # Ensure the script can find the 'ammb' package project_root = os.path.dirname(os.path.abspath(__file__)) @@ -21,37 +22,46 @@ sys.path.insert(0, project_root) # --- Dependency Check --- try: - import configparser - import queue - import threading - import time - import json - import serial - import paho.mqtt.client as paho_mqtt - from pubsub import pub - import meshtastic - import meshtastic.serial_interface + import configparser # noqa: F401 + import json # noqa: F401 + import queue # noqa: F401 + import threading # noqa: F401 + import time # noqa: F401 + + import meshtastic # noqa: F401 + import meshtastic.serial_interface # noqa: F401 + import paho.mqtt.client as paho_mqtt # noqa: F401 + import serial # noqa: F401 + from pubsub import pub # noqa: F401 except ImportError as e: - print(f"ERROR: Missing required library - {e.name}", file=sys.stderr) + print("ERROR: Missing required library - %s" % e.name, file=sys.stderr) print("Please install required libraries by running:", file=sys.stderr) - print(f" pip install -r {os.path.join(project_root, 'requirements.txt')}", file=sys.stderr) + print( + " pip install -r %s" % os.path.join(project_root, 'requirements.txt'), + file=sys.stderr, + ) sys.exit(1) # --- Imports --- try: from ammb import Bridge + from ammb.config_handler import CONFIG_FILE, load_config from ammb.utils import setup_logging - from ammb.config_handler import load_config, CONFIG_FILE except ImportError as e: print(f"ERROR: Failed to import AMMB modules: {e}", file=sys.stderr) - print("Ensure the script is run from the project root directory", file=sys.stderr) + print( + "Ensure the script is run from the project root directory", + file=sys.stderr, + ) sys.exit(1) # --- Main Execution --- if __name__ == "__main__": # Basic logging setup until config is loaded - logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" + ) logging.info("--- Akita Meshtastic Meshcore Bridge Starting ---") # --- Configuration Loading --- @@ -71,19 +81,26 @@ if __name__ == "__main__": # --- Bridge Initialization and Execution --- logging.info("Initializing bridge instance...") bridge = Bridge(config) - + # Check if external handler was successfully created if not bridge.external_handler: - logging.critical("Bridge initialization failed (likely handler issue). Exiting.") - sys.exit(1) + logging.critical( + "Bridge initialization failed (likely handler issue). Exiting." + ) + sys.exit(1) try: logging.info("Starting bridge run loop...") - bridge.run() + bridge.run() except KeyboardInterrupt: - logging.info("KeyboardInterrupt received. Initiating graceful shutdown...") + logging.info( + "KeyboardInterrupt received. Initiating graceful shutdown..." + ) except Exception as e: - logging.critical(f"Unhandled critical exception in bridge execution: {e}", exc_info=True) + logging.critical( + f"Unhandled critical exception in bridge execution: {e}", + exc_info=True, + ) logging.info("Attempting emergency shutdown...") bridge.stop() sys.exit(1) diff --git a/tests/__init__.py b/tests/__init__.py index aeaeda7..97b3358 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -2,4 +2,5 @@ """ Initialization file for the AMMB test suite package. """ -# This file can be empty or contain package-level test setup/fixtures if needed. + +# This file can be empty or contain package-level setup. diff --git a/tests/conftest.py b/tests/conftest.py index f17b72e..0dbacc8 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,20 +1,21 @@ -import pytest import configparser +import pytest + + @pytest.fixture(scope="function") def temp_config_file(tmp_path): config_path = tmp_path / "config.ini" parser = configparser.ConfigParser() - parser['DEFAULT'] = { - 'MESHTASTIC_SERIAL_PORT': '/dev/test_meshtastic', - 'EXTERNAL_TRANSPORT': 'serial', - 'SERIAL_PORT': '/dev/test_meshcore', - 'SERIAL_BAUD_RATE': '19200', - 'SERIAL_PROTOCOL': 'json_newline', - 'MESSAGE_QUEUE_SIZE': '50', - 'LOG_LEVEL': 'DEBUG', + parser["DEFAULT"] = { + "MESHTASTIC_SERIAL_PORT": "/dev/test_meshtastic", + "EXTERNAL_TRANSPORT": "serial", + "SERIAL_PORT": "/dev/test_meshcore", + "SERIAL_BAUD_RATE": "19200", + "SERIAL_PROTOCOL": "json_newline", + "MESSAGE_QUEUE_SIZE": "50", + "LOG_LEVEL": "DEBUG", } - with open(config_path, 'w') as f: + with open(config_path, "w") as f: parser.write(f) yield str(config_path) - diff --git a/tests/test_config_handler.py b/tests/test_config_handler.py index cf99447..520b0fb 100644 --- a/tests/test_config_handler.py +++ b/tests/test_config_handler.py @@ -1,6 +1,5 @@ -import pytest -import json -from ammb.protocol import get_serial_protocol_handler, JsonNewlineProtocol +from ammb.protocol import JsonNewlineProtocol, get_serial_protocol_handler + def test_json_newline_encode(): handler = JsonNewlineProtocol() @@ -8,12 +7,14 @@ def test_json_newline_encode(): encoded = handler.encode(data) assert encoded == b'{"key": "value"}\n' + def test_factory_function(): - handler = get_serial_protocol_handler('json_newline') + handler = get_serial_protocol_handler("json_newline") assert isinstance(handler, JsonNewlineProtocol) - + + def test_raw_serial_handler(): - handler = get_serial_protocol_handler('raw_serial') - raw_data = b'\x01\x02\x03' + handler = get_serial_protocol_handler("raw_serial") + raw_data = b"\x01\x02\x03" decoded = handler.decode(raw_data) - assert decoded['payload'] == "MC_BIN: 010203" + assert decoded["payload"] == "MC_BIN: 010203" diff --git a/tests/test_config_handler_missing_default.py b/tests/test_config_handler_missing_default.py index e2e733a..778f2dd 100644 --- a/tests/test_config_handler_missing_default.py +++ b/tests/test_config_handler_missing_default.py @@ -5,10 +5,12 @@ def test_load_config_missing_default(tmp_path): cfgfile = tmp_path / "no_default.ini" cfgfile.write_text("[serial]\nSERIAL_PORT=/dev/ttyS1\n") - spec = importlib.util.spec_from_file_location('config_handler', 'ammb/config_handler.py') + spec = importlib.util.spec_from_file_location( + "config_handler", "ammb/config_handler.py" + ) mod = importlib.util.module_from_spec(spec) spec.loader.exec_module(mod) cfg = mod.load_config(str(cfgfile)) assert cfg is not None - assert cfg.log_level == 'INFO' + assert cfg.log_level == "INFO" diff --git a/tests/test_protocol.py b/tests/test_protocol.py index 8eea9ec..98dba25 100644 --- a/tests/test_protocol.py +++ b/tests/test_protocol.py @@ -4,81 +4,104 @@ Tests for the ammb.protocol module, focusing on protocol handlers. """ import pytest -import json # Module to test -from ammb.protocol import get_serial_protocol_handler, JsonNewlineProtocol, MeshcoreProtocolHandler +from ammb.protocol import ( + JsonNewlineProtocol, + get_serial_protocol_handler, +) # --- Test JsonNewlineProtocol --- + @pytest.fixture def json_handler() -> JsonNewlineProtocol: """Provides an instance of the JsonNewlineProtocol handler.""" return JsonNewlineProtocol() + # Parameterize test data for encoding encode_test_data = [ ({"key": "value", "num": 123}, b'{"key": "value", "num": 123}\n'), ({"list": [1, 2, None]}, b'{"list": [1, 2, null]}\n'), - ({}, b'{}\n'), + ({}, b"{}\n"), ] + @pytest.mark.parametrize("input_dict, expected_bytes", encode_test_data) -def test_json_newline_encode_success(json_handler: JsonNewlineProtocol, input_dict: dict, expected_bytes: bytes): +def test_json_newline_encode_success( + json_handler: JsonNewlineProtocol, input_dict: dict, expected_bytes: bytes +): """Test successful encoding with JsonNewlineProtocol.""" result = json_handler.encode(input_dict) assert result == expected_bytes + def test_json_newline_encode_error(json_handler: JsonNewlineProtocol): """Test encoding data that cannot be JSON serialized.""" # Sets cannot be directly JSON serialized result = json_handler.encode({"data": {1, 2, 3}}) assert result is None + # Parameterize test data for decoding decode_test_data = [ (b'{"key": "value", "num": 123}\n', {"key": "value", "num": 123}), - (b'{"list": [1, 2, null]} \r\n', {"list": [1, 2, None]}), # Handle trailing whitespace/CR - (b'{}', {}), + ( + b'{"list": [1, 2, null]} \r\n', + {"list": [1, 2, None]}, + ), # Handle trailing whitespace/CR + (b"{}", {}), ] + @pytest.mark.parametrize("input_bytes, expected_dict", decode_test_data) -def test_json_newline_decode_success(json_handler: JsonNewlineProtocol, input_bytes: bytes, expected_dict: dict): +def test_json_newline_decode_success( + json_handler: JsonNewlineProtocol, input_bytes: bytes, expected_dict: dict +): """Test successful decoding with JsonNewlineProtocol.""" result = json_handler.decode(input_bytes) assert result == expected_dict + # Parameterize invalid data for decoding decode_error_data = [ - b'this is not json\n', # Invalid JSON - b'{"key": "value",\n', # Incomplete JSON - b'{"key": value_without_quotes}\n', # Invalid JSON syntax - b'\x80\x81\x82\n', # Invalid UTF-8 start bytes - b'', # Empty bytes - b' \n', # Whitespace only line - b'["list", "not_dict"]\n', # Valid JSON, but not a dictionary + b"this is not json\n", # Invalid JSON + b'{"key": "value",\n', # Incomplete JSON + b'{"key": value_without_quotes}\n', # Invalid JSON syntax + b"\x80\x81\x82\n", # Invalid UTF-8 start bytes + b"", # Empty bytes + b" \n", # Whitespace only line + b'["list", "not_dict"]\n', # Valid JSON, but not a dictionary ] + @pytest.mark.parametrize("invalid_bytes", decode_error_data) -def test_json_newline_decode_errors(json_handler: JsonNewlineProtocol, invalid_bytes: bytes): +def test_json_newline_decode_errors( + json_handler: JsonNewlineProtocol, invalid_bytes: bytes +): """Test decoding various forms of invalid input.""" result = json_handler.decode(invalid_bytes) assert result is None + # --- Test Factory Function --- + def test_get_protocol_handler_success(): """Test getting a known protocol handler.""" - handler = get_serial_protocol_handler('json_newline') + handler = get_serial_protocol_handler("json_newline") assert isinstance(handler, JsonNewlineProtocol) # Test case insensitivity - handler_upper = get_serial_protocol_handler('JSON_NEWLINE') + handler_upper = get_serial_protocol_handler("JSON_NEWLINE") assert isinstance(handler_upper, JsonNewlineProtocol) + def test_get_protocol_handler_unsupported(): """Test getting an unknown protocol handler raises ValueError.""" with pytest.raises(ValueError): - get_serial_protocol_handler('unknown_protocol') + get_serial_protocol_handler("unknown_protocol") -# Add tests for other protocol handlers (e.g., PlainTextProtocol) when implemented. +# Add tests for other protocol handlers (e.g., PlainTextProtocol) +# when implemented. diff --git a/tmp_run_load.py b/tmp_run_load.py index 9f10bc4..a309d1e 100644 --- a/tmp_run_load.py +++ b/tmp_run_load.py @@ -1,6 +1,11 @@ import importlib.util -spec = importlib.util.spec_from_file_location('config_handler', 'ammb/config_handler.py') + +spec = importlib.util.spec_from_file_location( + "config_handler", "ammb/config_handler.py" +) +if spec is None or spec.loader is None: + raise SystemExit("Failed to locate module spec for config_handler") mod = importlib.util.module_from_spec(spec) spec.loader.exec_module(mod) -print('Using file: tmp_no_default.ini') -print(mod.load_config('tmp_no_default.ini')) +print("Using file: tmp_no_default.ini") +print(mod.load_config("tmp_no_default.ini")) diff --git a/tmp_test_config.py b/tmp_test_config.py index d3c23e9..4596f0c 100644 --- a/tmp_test_config.py +++ b/tmp_test_config.py @@ -1,7 +1,8 @@ from ammb import config_handler -path='tmp_no_default.ini' -with open(path,'w') as f: - f.write('[serial]\nSERIAL_PORT=/dev/ttyS1\n') -print('Using file:', path) + +path = "tmp_no_default.ini" +with open(path, "w") as f: + f.write("[serial]\nSERIAL_PORT=/dev/ttyS1\n") +print("Using file:", path) cfg = config_handler.load_config(path) -print('Result:', cfg) +print("Result:", cfg)