""" Mesh CLI Handler Handles administrative commands sent to repeaters and room servers via TXT_MSG packets. Only users with admin permissions (via ACL) can execute these commands. """ import logging import time from pathlib import Path from typing import Any, Callable, Dict, Optional import yaml logger = logging.getLogger(__name__) class MeshCLI: """ CLI command handler for mesh node administration (repeaters and room servers). Commands follow the format: XX|command params where XX is an optional sequence number that gets echoed in the reply. """ def __init__( self, config_path: str, config: Dict[str, Any], save_config_callback: Callable, identity_type: str = "repeater", enable_regions: bool = True, ): """ Initialize the CLI handler. Args: config_path: Path to the config.yaml file config: Current configuration dictionary save_config_callback: Callback to save config changes identity_type: Type of identity ('repeater' or 'room_server') enable_regions: Whether to enable region commands (only for repeaters) """ self.config_path = Path(config_path) self.config = config self.save_config = save_config_callback self.identity_type = identity_type self.enable_regions = enable_regions # Get repeater config shortcut self.repeater_config = config.get("repeater", {}) def handle_command(self, sender_pubkey: bytes, command: str, is_admin: bool) -> str: """ Handle an incoming command from a client. Args: sender_pubkey: Public key of sender command: Command string (may include XX| prefix) is_admin: Whether sender has admin permissions Returns: Reply string to send back to sender """ # Check admin permission first if not is_admin: return "Error: Admin permission required" logger.debug(f"handle_command received: '{command}' (len={len(command)})") # Extract optional sequence prefix (XX|) prefix = "" if len(command) > 4 and command[2] == "|": prefix = command[:3] command = command[3:] logger.debug(f"Extracted prefix: '{prefix}', remaining command: '{command}'") # Strip leading/trailing whitespace command = command.strip() logger.debug(f"After strip: '{command}'") # Route to appropriate handler reply = self._route_command(command) # Add prefix back to reply if present if prefix: return prefix + reply return reply def _route_command(self, command: str) -> str: """Route command to appropriate handler method.""" # Help if command == "help" or command.startswith("help "): return self._cmd_help(command) # System commands elif command == "reboot": return self._cmd_reboot() elif command == "advert": return self._cmd_advert() elif command.startswith("clock"): return self._cmd_clock(command) elif command.startswith("time "): return self._cmd_time(command) elif command == "start ota": return "Error: OTA not supported in Python repeater" elif command.startswith("password "): return self._cmd_password(command) elif command == "clear stats": return self._cmd_clear_stats() elif command == "ver": return self._cmd_version() # Get commands elif command.startswith("get "): return self._cmd_get(command[4:]) # Set commands elif command.startswith("set "): return self._cmd_set(command[4:]) # ACL commands elif command.startswith("setperm "): return self._cmd_setperm(command) elif command == "get acl": return "Error: Use 'get acl' via serial console only" # Region commands (repeaters only) elif command.startswith("region"): if self.enable_regions: return self._cmd_region(command) else: return "Error: Region commands not available for room servers" # Neighbor commands elif command == "neighbors": return self._cmd_neighbors() elif command.startswith("neighbor.remove "): return self._cmd_neighbor_remove(command) # Temporary radio params elif command.startswith("tempradio "): return self._cmd_tempradio(command) # Sensor commands elif command.startswith("sensor "): return "Error: Sensor commands not implemented in Python repeater" # GPS commands elif command.startswith("gps"): return "Error: GPS commands not implemented in Python repeater" # Logging commands elif command.startswith("log "): return self._cmd_log(command) # Statistics commands elif command.startswith("stats-"): return "Error: Stats commands not fully implemented yet" else: return "Unknown command" # ==================== Help Command ==================== def _cmd_help(self, command: str) -> str: """Show available commands or detailed help for a specific command.""" parts = command.split(None, 1) if len(parts) == 2: return self._help_detail(parts[1]) lines = [ "=== pyMC CLI Commands ===", "", "System:", " reboot Restart the repeater service", " advert Send self advertisement", " clock Show current UTC time", " clock sync Sync clock (no-op, uses system time)", " ver Show version info", " password Change admin password", " clear stats Clear statistics", "", "Get:", " get name Node name", " get radio Radio params (freq,bw,sf,cr)", " get freq Frequency (MHz)", " get tx TX power", " get af Airtime factor", " get repeat Repeat mode (on/off)", " get lat / get lon GPS coordinates", " get role Identity role", " get guest.password Guest password", " get allow.read.only Read-only access setting", " get advert.interval Advert interval (minutes)", " get flood.advert.interval Flood advert interval (hours)", " get flood.max Max flood hops", " get rxdelay RX delay base", " get txdelay TX delay factor", " get direct.txdelay Direct TX delay factor", " get multi.acks Multi-ack count", " get int.thresh Interference threshold", " get agc.reset.interval AGC reset interval", "", "Set: (use 'help set' for details)", " set ", "", "Other:", " neighbors List neighbors", " neighbor.remove Remove neighbor by pubkey", " tempradio ", " setperm Set ACL permissions", " log start|stop|erase Logging control", ] if self.enable_regions: lines.append(" region ... Region commands") lines += ["", "Type 'help ' for details on a specific command."] return "\n".join(lines) def _help_detail(self, topic: str) -> str: """Return detailed help for a specific command topic.""" topic = topic.strip() details = { "set": ( "Set commands — set :\n" " set name Set node name\n" " set radio Set radio (restart required)\n" " set freq Set frequency (restart required)\n" " set tx Set TX power\n" " set af Airtime factor\n" " set repeat on|off Enable/disable repeating\n" " set lat Latitude\n" " set lon Longitude\n" " set guest.password Guest password\n" " set allow.read.only on|off Read-only access\n" " set advert.interval 60-240 minutes\n" " set flood.advert.interval
3-48 hours\n" " set flood.max Max flood hops (max 64)\n" " set rxdelay RX delay base (>=0)\n" " set txdelay TX delay factor (>=0)\n" " set direct.txdelay Direct TX delay (>=0)\n" " set multi.acks Multi-ack count\n" " set int.thresh Interference threshold\n" " set agc.reset.interval AGC reset (rounded to x4)" ), "get": "Get commands — type 'help' to see all 'get' parameters.", "reboot": "Restart the repeater service via systemd.", "advert": "Trigger a self-advertisement flood packet.", "clock": "'clock' shows UTC time. 'clock sync' is a no-op (system time used).", "ver": "Show repeater version and identity type.", "password": "password — Change the admin password.", "tempradio": ( "tempradio \n" " Apply temporary radio parameters that revert after timeout.\n" " freq: 300-2500 MHz, bw: 7-500 kHz, sf: 5-12, cr: 5-8" ), "neighbors": "List known neighbor nodes from the routing table.", "setperm": "setperm — Set ACL permissions for a node.", "log": "log start|stop|erase — Control logging.", } return details.get(topic, f"No detailed help for '{topic}'. Type 'help' for command list.") # ==================== System Commands == def _cmd_reboot(self) -> str: """Reboot the repeater process.""" from repeater.service_utils import restart_service logger.warning("Reboot command received via repeater CLI") success, message = restart_service() if success: return f"OK - {message}" else: return f"Error: {message}" def _cmd_advert(self) -> str: """Send self advertisement.""" logger.info("Advert command received") # TODO: Trigger advertisement through packet handler return "Error: Not yet implemented" def _cmd_clock(self, command: str) -> str: """Handle clock commands.""" if command == "clock": # Display current time import datetime dt = datetime.datetime.utcnow() return f"{dt.hour:02d}:{dt.minute:02d} - {dt.day}/{dt.month}/{dt.year} UTC" elif command == "clock sync": # Clock sync happens automatically via sender_timestamp in protocol return "OK - clock sync not needed (system time used)" else: return "Unknown clock command" def _cmd_time(self, command: str) -> str: """Set time - not supported in Python (use system time).""" return "Error: Time setting not supported (system time is used)" def _cmd_password(self, command: str) -> str: """Change admin password.""" new_password = command[9:].strip() if not new_password: return "Error: Password cannot be empty" # Update security config if "security" not in self.config: self.config["security"] = {} self.config["security"]["password"] = new_password # Save config try: self.save_config() return f"password now: {new_password}" except Exception as e: logger.error(f"Failed to save password: {e}") return "Error: Failed to save password" def _cmd_clear_stats(self) -> str: """Clear statistics.""" # TODO: Implement stats clearing return "Error: Not yet implemented" def _cmd_version(self) -> str: """Get version information.""" role = "room_server" if self.identity_type == "room_server" else "repeater" version = self.config.get("version", "1.0.0") return f"pyMC_{role} v{version}" # ==================== Get Commands ==================== def _cmd_get(self, param: str) -> str: """Handle get commands.""" param = param.strip() logger.debug(f"_cmd_get called with param: '{param}' (len={len(param)})") if param == "af": af = self.repeater_config.get("airtime_factor", 1.0) return f"> {af}" elif param == "name": name = self.repeater_config.get("name", "Unknown") return f"> {name}" elif param == "repeat": mode = self.repeater_config.get("mode", "forward") return f"> {'on' if mode == 'forward' else 'off'}" elif param == "lat": lat = self.repeater_config.get("latitude", 0.0) return f"> {lat}" elif param == "lon": lon = self.repeater_config.get("longitude", 0.0) return f"> {lon}" elif param == "radio": radio = self.config.get("radio", {}) freq_hz = radio.get("frequency", 915000000) bw_hz = radio.get("bandwidth", 125000) sf = radio.get("spreading_factor", 7) cr = radio.get("coding_rate", 5) # Convert Hz to MHz for freq, Hz to kHz for bandwidth (match C++ ftoa output) freq_mhz = freq_hz / 1_000_000.0 bw_khz = bw_hz / 1_000.0 return f"> {freq_mhz},{bw_khz},{sf},{cr}" elif param == "freq": freq_hz = self.config.get("radio", {}).get("frequency", 915000000) freq_mhz = freq_hz / 1_000_000.0 return f"> {freq_mhz}" elif param == "tx": power = self.config.get("radio", {}).get("tx_power", 20) return f"> {power}" elif param == "public.key": # TODO: Get from identity return "Error: Not yet implemented" elif param == "role": role = "room_server" if self.identity_type == "room_server" else "repeater" return f"> {role}" elif param == "guest.password": guest_pw = self.config.get("security", {}).get("guest_password", "") return f"> {guest_pw}" elif param == "allow.read.only": allow = self.config.get("security", {}).get("allow_read_only", False) return f"> {'on' if allow else 'off'}" elif param == "advert.interval": interval = self.repeater_config.get("advert_interval_minutes", 120) return f"> {interval}" elif param == "flood.advert.interval": interval = self.repeater_config.get("flood_advert_interval_hours", 24) return f"> {interval}" elif param == "flood.max": max_flood = self.repeater_config.get("max_flood_hops", 3) return f"> {max_flood}" elif param == "rxdelay": delay = self.repeater_config.get("rx_delay_base", 0.0) return f"> {delay}" elif param == "txdelay": delay = self.repeater_config.get("tx_delay_factor", 1.0) return f"> {delay}" elif param == "direct.txdelay": delay = self.repeater_config.get("direct_tx_delay_factor", 0.5) return f"> {delay}" elif param == "multi.acks": acks = self.repeater_config.get("multi_acks", 0) return f"> {acks}" elif param == "int.thresh": thresh = self.repeater_config.get("interference_threshold", -120) return f"> {thresh}" elif param == "agc.reset.interval": interval = self.repeater_config.get("agc_reset_interval", 0) return f"> {interval}" else: return f"??: {param}" # ==================== Set Commands ==================== def _cmd_set(self, param: str) -> str: """Handle set commands.""" parts = param.split(None, 1) if len(parts) < 2: return "Error: Missing value" key, value = parts[0], parts[1] try: if key == "af": self.repeater_config["airtime_factor"] = float(value) self.save_config() return "OK" elif key == "name": self.repeater_config["name"] = value self.save_config() return "OK" elif key == "repeat": self.repeater_config["mode"] = "forward" if value.lower() == "on" else "monitor" self.save_config() return f"OK - repeat is now {'ON' if self.repeater_config['mode'] == 'forward' else 'OFF'}" elif key == "lat": self.repeater_config["latitude"] = float(value) self.save_config() return "OK" elif key == "lon": self.repeater_config["longitude"] = float(value) self.save_config() return "OK" elif key == "radio": # Format: freq bw sf cr radio_parts = value.split() if len(radio_parts) != 4: return "Error: Expected freq bw sf cr" if "radio" not in self.config: self.config["radio"] = {} self.config["radio"]["frequency"] = float(radio_parts[0]) self.config["radio"]["bandwidth"] = float(radio_parts[1]) self.config["radio"]["spreading_factor"] = int(radio_parts[2]) self.config["radio"]["coding_rate"] = int(radio_parts[3]) self.save_config() return "OK - restart repeater to apply" elif key == "freq": if "radio" not in self.config: self.config["radio"] = {} self.config["radio"]["frequency"] = float(value) self.save_config() return "OK - restart repeater to apply" elif key == "tx": if "radio" not in self.config: self.config["radio"] = {} self.config["radio"]["tx_power"] = int(value) self.save_config() return "OK" elif key == "guest.password": if "security" not in self.config: self.config["security"] = {} self.config["security"]["guest_password"] = value self.save_config() return "OK" elif key == "allow.read.only": if "security" not in self.config: self.config["security"] = {} self.config["security"]["allow_read_only"] = value.lower() == "on" self.save_config() return "OK" elif key == "advert.interval": mins = int(value) if mins > 0 and (mins < 60 or mins > 240): return "Error: interval range is 60-240 minutes" self.repeater_config["advert_interval_minutes"] = mins self.save_config() return "OK" elif key == "flood.advert.interval": hours = int(value) if (hours > 0 and hours < 3) or hours > 48: return "Error: interval range is 3-48 hours" self.repeater_config["flood_advert_interval_hours"] = hours self.save_config() return "OK" elif key == "flood.max": max_val = int(value) if max_val > 64: return "Error: max 64" self.repeater_config["max_flood_hops"] = max_val self.save_config() return "OK" elif key == "rxdelay": delay = float(value) if delay < 0: return "Error: cannot be negative" self.repeater_config["rx_delay_base"] = delay self.save_config() return "OK" elif key == "txdelay": delay = float(value) if delay < 0: return "Error: cannot be negative" self.repeater_config["tx_delay_factor"] = delay self.save_config() return "OK" elif key == "direct.txdelay": delay = float(value) if delay < 0: return "Error: cannot be negative" self.repeater_config["direct_tx_delay_factor"] = delay self.save_config() return "OK" elif key == "multi.acks": self.repeater_config["multi_acks"] = int(value) self.save_config() return "OK" elif key == "int.thresh": self.repeater_config["interference_threshold"] = int(value) self.save_config() return "OK" elif key == "agc.reset.interval": interval = int(value) # Round to nearest multiple of 4 rounded = (interval // 4) * 4 self.repeater_config["agc_reset_interval"] = rounded self.save_config() return f"OK - interval rounded to {rounded}" else: return f"unknown config: {key}" except ValueError as e: return f"Error: invalid value - {e}" except Exception as e: logger.error(f"Set command error: {e}") return f"Error: {e}" # ==================== ACL Commands ==================== def _cmd_setperm(self, command: str) -> str: """Set permissions for a public key.""" # Format: setperm {pubkey-hex} {permissions-int} parts = command[8:].split() if len(parts) < 2: return "Err - bad params" pubkey_hex = parts[0] try: permissions = int(parts[1]) except ValueError: return "Err - invalid permissions" # TODO: Apply permissions via ACL logger.info(f"setperm command: {pubkey_hex} -> {permissions}") return "Error: Not yet implemented - use config file" # ==================== Region Commands ==================== def _cmd_region(self, command: str) -> str: """Handle region commands.""" parts = command.split() if len(parts) == 1: return "Error: Region commands not implemented in Python repeater" subcommand = parts[1] if subcommand == "load": return "Error: Region commands not implemented" elif subcommand == "save": return "Error: Region commands not implemented" elif subcommand in ("allowf", "denyf", "get", "home", "put", "remove"): return "Error: Region commands not implemented" else: return "Err - ??" # ==================== Neighbor Commands ==================== def _cmd_neighbors(self) -> str: """List neighbors.""" # TODO: Get neighbors from routing table return "Error: Not yet implemented" def _cmd_neighbor_remove(self, command: str) -> str: """Remove a neighbor.""" pubkey_hex = command[16:].strip() if not pubkey_hex: return "ERR: Missing pubkey" # TODO: Remove neighbor from routing table logger.info(f"neighbor.remove: {pubkey_hex}") return "Error: Not yet implemented" # ==================== Temporary Radio Commands ==================== def _cmd_tempradio(self, command: str) -> str: """Apply temporary radio parameters.""" # Format: tempradio {freq} {bw} {sf} {cr} {timeout_mins} parts = command[10:].split() if len(parts) < 5: return "Error: Expected freq bw sf cr timeout_mins" try: freq = float(parts[0]) bw = float(parts[1]) sf = int(parts[2]) cr = int(parts[3]) timeout_mins = int(parts[4]) # Validate if not (300.0 <= freq <= 2500.0): return "Error: invalid frequency" if not (7.0 <= bw <= 500.0): return "Error: invalid bandwidth" if not (5 <= sf <= 12): return "Error: invalid spreading factor" if not (5 <= cr <= 8): return "Error: invalid coding rate" if timeout_mins <= 0: return "Error: invalid timeout" # TODO: Apply temporary radio parameters logger.info(f"tempradio: {freq}MHz {bw}kHz SF{sf} CR4/{cr} for {timeout_mins}min") return "Error: Not yet implemented" except ValueError: return "Error, invalid params" # ==================== Logging Commands ==================== def _cmd_log(self, command: str) -> str: """Handle log commands.""" if command == "log start": # TODO: Enable logging return "Error: Not yet implemented" elif command == "log stop": # TODO: Disable logging return "Error: Not yet implemented" elif command == "log erase": # TODO: Clear log file return "Error: Not yet implemented" elif command == "log": return "Error: Use journalctl to view logs" else: return "Unknown log command" # Backward compatibility alias RepeaterCLI = MeshCLI