"""
Mesh CLI Handler

Handles administrative commands sent to repeaters and room servers via TXT_MSG packets.
Only users with admin permissions (via ACL) can execute these commands.
"""

import logging
import math
from pathlib import Path
from typing import Any, Callable, Dict, List

logger = logging.getLogger(__name__)


class MeshCLI:
    """
    CLI command handler for mesh node administration (repeaters and room servers).

    Commands follow the format: XX|command params
    where XX is an optional sequence number that gets echoed in the reply.

    Every handler returns a plain reply string; handlers never raise to the
    caller for malformed user input.
    """

    def __init__(
        self,
        config_path: str,
        config: Dict[str, Any],
        save_config_callback: Callable,
        identity_type: str = "repeater",
        enable_regions: bool = True,
    ):
        """
        Initialize the CLI handler.

        Args:
            config_path: Path to the config.yaml file
            config: Current configuration dictionary (mutated in place by
                'set' and 'password' commands)
            save_config_callback: Callback to save config changes; called
                with no arguments after each successful mutation
            identity_type: Type of identity ('repeater' or 'room_server')
            enable_regions: Whether to enable region commands (only for repeaters)
        """
        self.config_path = Path(config_path)
        self.config = config
        self.save_config = save_config_callback
        self.identity_type = identity_type
        self.enable_regions = enable_regions

        # Repeater config shortcut. It must be the very same dict object that
        # lives inside `config`; otherwise values written by 'set' commands
        # would land in a detached dict and be lost when the config is saved.
        repeater_config = config.get("repeater")
        if not isinstance(repeater_config, dict):
            repeater_config = {}
            config["repeater"] = repeater_config
        self.repeater_config = repeater_config

    # ==================== Parsing Helpers ====================

    @staticmethod
    def _split_params(text: str) -> List[str]:
        """Split a parameter list on whitespace and/or commas."""
        return text.replace(",", " ").split()

    @staticmethod
    def _parse_float(text: str) -> float:
        """
        Parse a finite float.

        Raises:
            ValueError: if the text is not a number, or is NaN / infinite.
        """
        number = float(text)
        if not math.isfinite(number):
            raise ValueError(f"non-finite number: {text!r}")
        return number

    # ==================== Entry Point ====================

    def handle_command(self, sender_pubkey: bytes, command: str, is_admin: bool) -> str:
        """
        Handle an incoming command from a client.

        Args:
            sender_pubkey: Public key of sender (not used by this method)
            command: Command string (may include XX| prefix)
            is_admin: Whether sender has admin permissions

        Returns:
            Reply string to send back to sender. When the command carried an
            'XX|' prefix, the same prefix is prepended to the reply.
        """
        # Check admin permission first
        if not is_admin:
            return "Error: Admin permission required"

        logger.debug(f"handle_command received: '{command}' (len={len(command)})")

        # Extract optional sequence prefix (XX|)
        prefix = ""
        if len(command) > 4 and command[2] == "|":
            prefix = command[:3]
            command = command[3:]
            logger.debug(f"Extracted prefix: '{prefix}', remaining command: '{command}'")

        # Strip leading/trailing whitespace
        command = command.strip()
        logger.debug(f"After strip: '{command}'")

        # Route to appropriate handler
        reply = self._route_command(command)

        # Add prefix back to reply if present
        if prefix:
            return prefix + reply
        return reply

    def _route_command(self, command: str) -> str:
        """
        Route command to appropriate handler method.

        Args:
            command: Command text with the sequence prefix removed and
                surrounding whitespace stripped.

        Returns:
            The handler's reply, or "Unknown command" if nothing matches.
        """
        # Help
        if command == "help" or command.startswith("help "):
            return self._cmd_help(command)

        # System commands
        elif command == "reboot":
            return self._cmd_reboot()
        elif command == "advert":
            return self._cmd_advert()
        elif command.startswith("clock"):
            return self._cmd_clock(command)
        elif command.startswith("time "):
            return self._cmd_time(command)
        elif command == "start ota":
            return "Error: OTA not supported in Python repeater"
        elif command.startswith("password "):
            return self._cmd_password(command)
        elif command == "clear stats":
            return self._cmd_clear_stats()
        elif command == "ver":
            return self._cmd_version()

        # ACL listing: must be tested before the generic "get " route,
        # otherwise this branch can never be reached.
        elif command == "get acl":
            return "Error: Use 'get acl' via serial console only"

        # Get commands
        elif command.startswith("get "):
            return self._cmd_get(command[4:])

        # Set commands
        elif command.startswith("set "):
            return self._cmd_set(command[4:])

        # ACL commands
        elif command.startswith("setperm "):
            return self._cmd_setperm(command)

        # Region commands (repeaters only)
        elif command.startswith("region"):
            if self.enable_regions:
                return self._cmd_region(command)
            else:
                return "Error: Region commands not available for room servers"

        # Neighbor commands
        elif command == "neighbors":
            return self._cmd_neighbors()
        elif command.startswith("neighbor.remove "):
            return self._cmd_neighbor_remove(command)

        # Temporary radio params
        elif command.startswith("tempradio "):
            return self._cmd_tempradio(command)

        # Sensor commands
        elif command.startswith("sensor "):
            return "Error: Sensor commands not implemented in Python repeater"

        # GPS commands
        elif command.startswith("gps"):
            return "Error: GPS commands not implemented in Python repeater"

        # Logging commands. Bare "log" is routed too, so that the
        # "use journalctl" hint in _cmd_log is reachable.
        elif command == "log" or command.startswith("log "):
            return self._cmd_log(command)

        # Statistics commands
        elif command.startswith("stats-"):
            return "Error: Stats commands not fully implemented yet"

        else:
            return "Unknown command"

    # ==================== Help Command ====================

    def _cmd_help(self, command: str) -> str:
        """
        Show available commands or detailed help for a specific command.

        Args:
            command: The full command text ('help' or 'help {topic}').

        Returns:
            Newline-joined command overview, or the detailed help text for
            the topic when one is given.
        """
        parts = command.split(None, 1)
        if len(parts) == 2:
            return self._help_detail(parts[1])

        lines = [
            "=== pyMC CLI Commands ===",
            "",
            "System:",
            " reboot Restart the repeater service",
            " advert Send self advertisement",
            " clock Show current UTC time",
            " clock sync Sync clock (no-op, uses system time)",
            " ver Show version info",
            " password Change admin password",
            " clear stats Clear statistics",
            "",
            "Get:",
            " get name Node name",
            " get radio Radio params (freq,bw,sf,cr)",
            " get freq Frequency (MHz)",
            " get tx TX power",
            " get af Airtime factor",
            " get repeat Repeat mode (on/off)",
            " get lat / get lon GPS coordinates",
            " get role Identity role",
            " get guest.password Guest password",
            " get allow.read.only Read-only access setting",
            " get advert.interval Advert interval (minutes)",
            " get flood.advert.interval Flood advert interval (hours)",
            " get flood.max Max flood hops",
            " get rxdelay RX delay base",
            " get txdelay TX delay factor",
            " get direct.txdelay Direct TX delay factor",
            " get multi.acks Multi-ack count",
            " get int.thresh Interference threshold",
            " get agc.reset.interval AGC reset interval",
            "",
            "Set: (use 'help set' for details)",
            " set {param} {value}",
            "",
            "Other:",
            " neighbors List neighbors",
            " neighbor.remove Remove neighbor by pubkey",
            " tempradio {freq} {bw} {sf} {cr} {timeout_mins}",
            " setperm Set ACL permissions",
            " log start|stop|erase Logging control",
        ]
        if self.enable_regions:
            lines.append(" region ... Region commands")
        lines += ["", "Type 'help {command}' for details on a specific command."]
        return "\n".join(lines)

    def _help_detail(self, topic: str) -> str:
        """
        Return detailed help for a specific command topic.

        Args:
            topic: Command name to describe (surrounding whitespace ignored).

        Returns:
            The help text for the topic, or a fallback message naming the
            unknown topic.
        """
        topic = topic.strip()
        details = {
            "set": (
                "Set commands — set {param} {value}:\n"
                " set name Set node name\n"
                " set radio Set radio (restart required)\n"
                " set freq Set frequency (restart required)\n"
                " set tx Set TX power\n"
                " set af Airtime factor\n"
                " set repeat on|off Enable/disable repeating\n"
                " set lat Latitude\n"
                " set lon Longitude\n"
                " set guest.password Guest password\n"
                " set allow.read.only on|off Read-only access\n"
                " set advert.interval 60-240 minutes\n"
                " set flood.advert.interval 3-48 hours\n"
                " set flood.max Max flood hops (max 64)\n"
                " set rxdelay RX delay base (>=0)\n"
                " set txdelay TX delay factor (>=0)\n"
                " set direct.txdelay Direct TX delay (>=0)\n"
                " set multi.acks Multi-ack count\n"
                " set int.thresh Interference threshold\n"
                " set agc.reset.interval AGC reset (rounded to x4)"
            ),
            "get": "Get commands — type 'help' to see all 'get' parameters.",
            "reboot": "Restart the repeater service via systemd.",
            "advert": "Trigger a self-advertisement flood packet.",
            "clock": "'clock' shows UTC time. 'clock sync' is a no-op (system time used).",
            "ver": "Show repeater version and identity type.",
            "password": "password {new_password} — Change the admin password.",
            "tempradio": (
                "tempradio {freq} {bw} {sf} {cr} {timeout_mins}\n"
                " Apply temporary radio parameters that revert after timeout.\n"
                " freq: 300-2500 MHz, bw: 7-500 kHz, sf: 5-12, cr: 5-8"
            ),
            "neighbors": "List known neighbor nodes from the routing table.",
            "setperm": "setperm {pubkey-hex} {permissions} — Set ACL permissions for a node.",
            "log": "log start|stop|erase — Control logging.",
        }
        return details.get(topic, f"No detailed help for '{topic}'. Type 'help' for command list.")

    # ==================== System Commands ====================

    def _cmd_reboot(self) -> str:
        """
        Reboot the repeater process.

        Delegates to repeater.service_utils.restart_service and reports its
        (success, message) result.
        """
        # Imported lazily so this module can be loaded without the service utilities.
        from repeater.service_utils import restart_service

        logger.warning("Reboot command received via repeater CLI")
        success, message = restart_service()

        if success:
            return f"OK - {message}"
        else:
            return f"Error: {message}"

    def _cmd_advert(self) -> str:
        """Send self advertisement (not implemented yet)."""
        logger.info("Advert command received")
        # TODO: Trigger advertisement through packet handler
        return "Error: Not yet implemented"

    def _cmd_clock(self, command: str) -> str:
        """
        Handle clock commands.

        'clock' returns the current UTC time as 'HH:MM - D/M/YYYY UTC';
        'clock sync' is acknowledged without doing anything.
        """
        if command == "clock":
            # Display current time
            import datetime

            dt = datetime.datetime.now(datetime.timezone.utc)
            return f"{dt.hour:02d}:{dt.minute:02d} - {dt.day}/{dt.month}/{dt.year} UTC"
        elif command == "clock sync":
            # Clock sync happens automatically via sender_timestamp in protocol
            return "OK - clock sync not needed (system time used)"
        else:
            return "Unknown clock command"

    def _cmd_time(self, command: str) -> str:
        """Set time - not supported in Python (use system time)."""
        return "Error: Time setting not supported (system time is used)"

    def _cmd_password(self, command: str) -> str:
        """
        Change admin password.

        Args:
            command: Full command text, 'password {new_password}'.

        Returns:
            Confirmation echoing the new password, or an error reply if the
            password is empty or the config could not be saved.
        """
        new_password = command[9:].strip()  # len("password ") == 9

        if not new_password:
            return "Error: Password cannot be empty"

        # Update security config
        self.config.setdefault("security", {})["password"] = new_password

        # Save config
        try:
            self.save_config()
            return f"password now: {new_password}"
        except Exception as e:
            logger.error(f"Failed to save password: {e}")
            return "Error: Failed to save password"

    def _cmd_clear_stats(self) -> str:
        """Clear statistics (not implemented yet)."""
        # TODO: Implement stats clearing
        return "Error: Not yet implemented"

    def _cmd_version(self) -> str:
        """Get version information as 'pyMC_{role} v{version}'."""
        role = "room_server" if self.identity_type == "room_server" else "repeater"
        version = self.config.get("version", "1.0.0")
        return f"pyMC_{role} v{version}"

    # ==================== Get Commands ====================

    def _cmd_get(self, param: str) -> str:
        """
        Handle get commands.

        Args:
            param: Parameter name following 'get ' (whitespace is stripped).

        Returns:
            '> {value}' for known parameters (falling back to a built-in
            default when the key is absent from the config), or
            '??: {param}' for unknown ones.
        """
        param = param.strip()
        logger.debug(f"_cmd_get called with param: '{param}' (len={len(param)})")

        if param == "af":
            af = self.repeater_config.get("airtime_factor", 1.0)
            return f"> {af}"

        elif param == "name":
            name = self.repeater_config.get("name", "Unknown")
            return f"> {name}"

        elif param == "repeat":
            mode = self.repeater_config.get("mode", "forward")
            return f"> {'on' if mode == 'forward' else 'off'}"

        elif param == "lat":
            lat = self.repeater_config.get("latitude", 0.0)
            return f"> {lat}"

        elif param == "lon":
            lon = self.repeater_config.get("longitude", 0.0)
            return f"> {lon}"

        elif param == "radio":
            radio = self.config.get("radio", {})
            freq_hz = radio.get("frequency", 915000000)
            bw_hz = radio.get("bandwidth", 125000)
            sf = radio.get("spreading_factor", 7)
            cr = radio.get("coding_rate", 5)
            # Convert Hz to MHz for freq, Hz to kHz for bandwidth (match C++ ftoa output)
            freq_mhz = freq_hz / 1_000_000.0
            bw_khz = bw_hz / 1_000.0
            return f"> {freq_mhz},{bw_khz},{sf},{cr}"

        elif param == "freq":
            freq_hz = self.config.get("radio", {}).get("frequency", 915000000)
            freq_mhz = freq_hz / 1_000_000.0
            return f"> {freq_mhz}"

        elif param == "tx":
            power = self.config.get("radio", {}).get("tx_power", 20)
            return f"> {power}"

        elif param == "public.key":
            # TODO: Get from identity
            return "Error: Not yet implemented"

        elif param == "role":
            role = "room_server" if self.identity_type == "room_server" else "repeater"
            return f"> {role}"

        elif param == "guest.password":
            guest_pw = self.config.get("security", {}).get("guest_password", "")
            return f"> {guest_pw}"

        elif param == "allow.read.only":
            allow = self.config.get("security", {}).get("allow_read_only", False)
            return f"> {'on' if allow else 'off'}"

        elif param == "advert.interval":
            interval = self.repeater_config.get("advert_interval_minutes", 120)
            return f"> {interval}"

        elif param == "flood.advert.interval":
            interval = self.repeater_config.get("flood_advert_interval_hours", 24)
            return f"> {interval}"

        elif param == "flood.max":
            max_flood = self.repeater_config.get("max_flood_hops", 64)
            return f"> {max_flood}"

        elif param == "rxdelay":
            delay = self.repeater_config.get("rx_delay_base", 0.0)
            return f"> {delay}"

        elif param == "txdelay":
            delay = self.repeater_config.get("tx_delay_factor", 1.0)
            return f"> {delay}"

        elif param == "direct.txdelay":
            delay = self.repeater_config.get("direct_tx_delay_factor", 0.5)
            return f"> {delay}"

        elif param == "multi.acks":
            acks = self.repeater_config.get("multi_acks", 0)
            return f"> {acks}"

        elif param == "int.thresh":
            thresh = self.repeater_config.get("interference_threshold", -120)
            return f"> {thresh}"

        elif param == "agc.reset.interval":
            interval = self.repeater_config.get("agc_reset_interval", 0)
            return f"> {interval}"

        else:
            return f"??: {param}"

    # ==================== Set Commands ====================

    def _cmd_set(self, param: str) -> str:
        """
        Handle set commands.

        Args:
            param: Text following 'set ', in the form '{key} {value}'.

        Returns:
            'OK' (possibly with a note) after the value has been stored and
            the save callback invoked, or an error reply. Unparseable or
            non-finite numbers produce 'Error: invalid value - ...'; any
            other exception (e.g. from the save callback) is logged and
            reported as 'Error: ...'.
        """
        parts = param.split(None, 1)
        if len(parts) < 2:
            return "Error: Missing value"

        key, value = parts[0], parts[1]

        try:
            if key == "af":
                self.repeater_config["airtime_factor"] = self._parse_float(value)
                self.save_config()
                return "OK"

            elif key == "name":
                self.repeater_config["name"] = value
                self.save_config()
                return "OK"

            elif key == "repeat":
                # Anything other than "on" (case-insensitive) switches to monitor mode.
                self.repeater_config["mode"] = "forward" if value.lower() == "on" else "monitor"
                self.save_config()
                return f"OK - repeat is now {'ON' if self.repeater_config['mode'] == 'forward' else 'OFF'}"

            elif key == "lat":
                self.repeater_config["latitude"] = self._parse_float(value)
                self.save_config()
                return "OK"

            elif key == "lon":
                self.repeater_config["longitude"] = self._parse_float(value)
                self.save_config()
                return "OK"

            elif key == "radio":
                # Format: freq bw sf cr (whitespace- or comma-separated, so the
                # comma-separated output of 'get radio' can be fed back in)
                radio_parts = self._split_params(value)
                if len(radio_parts) != 4:
                    return "Error: Expected freq bw sf cr"

                # Parse everything first so a bad field leaves the config untouched.
                frequency = self._parse_float(radio_parts[0])
                bandwidth = self._parse_float(radio_parts[1])
                spreading_factor = int(radio_parts[2])
                coding_rate = int(radio_parts[3])

                # NOTE(review): 'get radio' divides the stored frequency by 1e6 and
                # the bandwidth by 1e3 (i.e. treats them as Hz), while the values
                # here are stored exactly as entered — confirm the intended input unit.
                radio = self.config.setdefault("radio", {})
                radio["frequency"] = frequency
                radio["bandwidth"] = bandwidth
                radio["spreading_factor"] = spreading_factor
                radio["coding_rate"] = coding_rate
                self.save_config()
                return "OK - restart repeater to apply"

            elif key == "freq":
                # NOTE(review): stored as entered; 'get freq' reads it back as Hz — confirm unit.
                frequency = self._parse_float(value)
                self.config.setdefault("radio", {})["frequency"] = frequency
                self.save_config()
                return "OK - restart repeater to apply"

            elif key == "tx":
                tx_power = int(value)
                self.config.setdefault("radio", {})["tx_power"] = tx_power
                self.save_config()
                return "OK"

            elif key == "guest.password":
                self.config.setdefault("security", {})["guest_password"] = value
                self.save_config()
                return "OK"

            elif key == "allow.read.only":
                self.config.setdefault("security", {})["allow_read_only"] = value.lower() == "on"
                self.save_config()
                return "OK"

            elif key == "advert.interval":
                mins = int(value)
                # 0 is accepted as-is; everything else (including negatives)
                # must fall inside 60-240.
                if mins != 0 and not (60 <= mins <= 240):
                    return "Error: interval range is 60-240 minutes"
                self.repeater_config["advert_interval_minutes"] = mins
                self.save_config()
                return "OK"

            elif key == "flood.advert.interval":
                hours = int(value)
                # 0 is accepted as-is; everything else (including negatives)
                # must fall inside 3-48.
                if hours != 0 and not (3 <= hours <= 48):
                    return "Error: interval range is 3-48 hours"
                self.repeater_config["flood_advert_interval_hours"] = hours
                self.save_config()
                return "OK"

            elif key == "flood.max":
                max_val = int(value)
                if max_val > 64:
                    return "Error: max 64"
                if max_val < 0:
                    return "Error: cannot be negative"
                self.repeater_config["max_flood_hops"] = max_val
                self.save_config()
                return "OK"

            elif key == "rxdelay":
                delay = self._parse_float(value)
                if delay < 0:
                    return "Error: cannot be negative"
                self.repeater_config["rx_delay_base"] = delay
                self.save_config()
                return "OK"

            elif key == "txdelay":
                delay = self._parse_float(value)
                if delay < 0:
                    return "Error: cannot be negative"
                self.repeater_config["tx_delay_factor"] = delay
                self.save_config()
                return "OK"

            elif key == "direct.txdelay":
                delay = self._parse_float(value)
                if delay < 0:
                    return "Error: cannot be negative"
                self.repeater_config["direct_tx_delay_factor"] = delay
                self.save_config()
                return "OK"

            elif key == "multi.acks":
                self.repeater_config["multi_acks"] = int(value)
                self.save_config()
                return "OK"

            elif key == "int.thresh":
                self.repeater_config["interference_threshold"] = int(value)
                self.save_config()
                return "OK"

            elif key == "agc.reset.interval":
                interval = int(value)
                if interval < 0:
                    return "Error: cannot be negative"
                # Round DOWN to a multiple of 4 (floor, not nearest)
                rounded = (interval // 4) * 4
                self.repeater_config["agc_reset_interval"] = rounded
                self.save_config()
                return f"OK - interval rounded to {rounded}"

            else:
                return f"unknown config: {key}"

        except ValueError as e:
            return f"Error: invalid value - {e}"
        except Exception as e:
            logger.error(f"Set command error: {e}")
            return f"Error: {e}"

    # ==================== ACL Commands ====================

    def _cmd_setperm(self, command: str) -> str:
        """
        Set permissions for a public key (validation only; not applied yet).

        Args:
            command: Full command text, 'setperm {pubkey-hex} {permissions-int}'.
        """
        # Format: setperm {pubkey-hex} {permissions-int}
        parts = command[8:].split()  # len("setperm ") == 8
        if len(parts) < 2:
            return "Err - bad params"

        pubkey_hex = parts[0]
        try:
            permissions = int(parts[1])
        except ValueError:
            return "Err - invalid permissions"

        # TODO: Apply permissions via ACL
        logger.info(f"setperm command: {pubkey_hex} -> {permissions}")
        return "Error: Not yet implemented - use config file"

    # ==================== Region Commands ====================

    def _cmd_region(self, command: str) -> str:
        """
        Handle region commands.

        No region subcommand is implemented; known subcommands get a
        "not implemented" reply and unknown ones get 'Err - ??'.
        """
        parts = command.split()

        if len(parts) == 1:
            return "Error: Region commands not implemented in Python repeater"

        subcommand = parts[1]

        if subcommand == "load":
            return "Error: Region commands not implemented"
        elif subcommand == "save":
            return "Error: Region commands not implemented"
        elif subcommand in ("allowf", "denyf", "get", "home", "put", "remove"):
            return "Error: Region commands not implemented"
        else:
            return "Err - ??"

    # ==================== Neighbor Commands ====================

    def _cmd_neighbors(self) -> str:
        """List neighbors (not implemented yet)."""
        # TODO: Get neighbors from routing table
        return "Error: Not yet implemented"

    def _cmd_neighbor_remove(self, command: str) -> str:
        """
        Remove a neighbor (not implemented yet).

        Args:
            command: Full command text, 'neighbor.remove {pubkey-hex}'.
        """
        pubkey_hex = command[16:].strip()  # len("neighbor.remove ") == 16

        if not pubkey_hex:
            return "ERR: Missing pubkey"

        # TODO: Remove neighbor from routing table
        logger.info(f"neighbor.remove: {pubkey_hex}")
        return "Error: Not yet implemented"

    # ==================== Temporary Radio Commands ====================

    def _cmd_tempradio(self, command: str) -> str:
        """
        Apply temporary radio parameters (validation only; not applied yet).

        Args:
            command: Full command text,
                'tempradio {freq} {bw} {sf} {cr} {timeout_mins}'; parameters
                may be separated by whitespace and/or commas.

        Returns:
            An error reply describing the first invalid parameter, or the
            "not yet implemented" reply when all parameters are valid.
        """
        # Format: tempradio {freq} {bw} {sf} {cr} {timeout_mins}
        parts = self._split_params(command[10:])  # len("tempradio ") == 10

        if len(parts) < 5:
            return "Error: Expected freq bw sf cr timeout_mins"

        try:
            freq = float(parts[0])
            bw = float(parts[1])
            sf = int(parts[2])
            cr = int(parts[3])
            timeout_mins = int(parts[4])

            # Validate (NaN/inf fail the chained range comparisons below)
            if not (300.0 <= freq <= 2500.0):
                return "Error: invalid frequency"
            if not (7.0 <= bw <= 500.0):
                return "Error: invalid bandwidth"
            if not (5 <= sf <= 12):
                return "Error: invalid spreading factor"
            if not (5 <= cr <= 8):
                return "Error: invalid coding rate"
            if timeout_mins <= 0:
                return "Error: invalid timeout"

            # TODO: Apply temporary radio parameters
            logger.info(f"tempradio: {freq}MHz {bw}kHz SF{sf} CR4/{cr} for {timeout_mins}min")
            return "Error: Not yet implemented"

        except ValueError:
            return "Error, invalid params"

    # ==================== Logging Commands ====================

    def _cmd_log(self, command: str) -> str:
        """
        Handle log commands.

        'log start', 'log stop' and 'log erase' are recognised but not
        implemented; bare 'log' points the user at journalctl.
        """
        if command == "log start":
            # TODO: Enable logging
            return "Error: Not yet implemented"
        elif command == "log stop":
            # TODO: Disable logging
            return "Error: Not yet implemented"
        elif command == "log erase":
            # TODO: Clear log file
            return "Error: Not yet implemented"
        elif command == "log":
            return "Error: Use journalctl to view logs"
        else:
            return "Unknown log command"


# Backward compatibility alias
RepeaterCLI = MeshCLI