import logging import time from pathlib import Path from typing import Any, Callable, Dict, Optional import yaml logger = logging.getLogger(__name__) class MeshCLI: def __init__( self, config_path: str, config: Dict[str, Any], config_manager, # ConfigManager instance for save & live updates identity_type: str = "repeater", enable_regions: bool = True, send_advert_callback: Optional[Callable] = None, identity=None, storage_handler=None, ): self.config_path = Path(config_path) self.config = config self.config_manager = config_manager self.identity_type = identity_type self.enable_regions = enable_regions self.send_advert_callback = send_advert_callback self.identity = identity self.storage_handler = storage_handler # Store event loop reference for thread-safe scheduling import asyncio try: self._event_loop = asyncio.get_running_loop() except RuntimeError: self._event_loop = None # Get repeater config shortcut self.repeater_config = config.get("repeater", {}) def handle_command(self, sender_pubkey: bytes, command: str, is_admin: bool) -> str: # Check admin permission first if not is_admin: return "Error: Admin permission required" logger.debug(f"handle_command received: '{command}' (len={len(command)})") # Extract optional sequence prefix (XX|) prefix = "" if len(command) > 4 and command[2] == "|": prefix = command[:3] command = command[3:] logger.debug(f"Extracted prefix: '{prefix}', remaining command: '{command}'") # Strip leading/trailing whitespace command = command.strip() logger.debug(f"After strip: '{command}'") # Route to appropriate handler reply = self._route_command(command) # Add prefix back to reply if present if prefix: return prefix + reply return reply def _route_command(self, command: str) -> str: # Help if command == "help" or command.startswith("help "): return self._cmd_help(command) # System commands elif command == "reboot": return self._cmd_reboot() elif command == "advert": return self._cmd_advert() elif command.startswith("clock"): return self._cmd_clock(command) elif command.startswith("time "): return self._cmd_time(command) elif command == "start ota": return "Error: OTA not supported in Python repeater" elif command.startswith("password "): return self._cmd_password(command) elif command == "clear stats": return self._cmd_clear_stats() elif command == "ver": return self._cmd_version() # Get commands elif command.startswith("get "): return self._cmd_get(command[4:]) # Set commands elif command.startswith("set "): return self._cmd_set(command[4:]) # ACL commands elif command.startswith("setperm "): return self._cmd_setperm(command) elif command == "get acl": return "Error: Use 'get acl' via serial console only" # Region commands (repeaters only) elif command.startswith("region"): if self.enable_regions: return self._cmd_region(command) else: return "Error: Region commands not available for room servers" # Neighbor commands elif command == "neighbors": return self._cmd_neighbors() elif command.startswith("neighbor.remove "): return self._cmd_neighbor_remove(command) # Temporary radio params elif command.startswith("tempradio "): return self._cmd_tempradio(command) # Sensor commands elif command.startswith("sensor "): return "Error: Sensor commands not implemented in Python repeater" # GPS commands elif command.startswith("gps"): return "Error: GPS commands not implemented in Python repeater" # Logging commands elif command.startswith("log "): return self._cmd_log(command) # Statistics commands elif command.startswith("stats-"): return "Error: Stats commands not fully implemented yet" else: return "Unknown command" # ==================== Help Command ==================== def _cmd_help(self, command: str) -> str: """Show available commands or detailed help for a specific command.""" parts = command.split(None, 1) if len(parts) == 2: return self._help_detail(parts[1]) lines = [ "=== pyMC CLI Commands ===", "", "System:", " reboot Restart the repeater service", " advert Send self advertisement", " clock Show current UTC time", " clock sync Sync clock (no-op, uses system time)", " ver Show version info", " password Change admin password", " clear stats Clear statistics", "", "Get:", " get name Node name", " get radio Radio params (freq,bw,sf,cr)", " get freq Frequency (MHz)", " get tx TX power", " get af Airtime factor", " get repeat Repeat mode (on/off)", " get lat / get lon GPS coordinates", " get role Identity role", " get guest.password Guest password", " get allow.read.only Read-only access setting", " get advert.interval Advert interval (minutes)", " get flood.advert.interval Flood advert interval (hours)", " get flood.max Max flood hops", " get rxdelay RX delay base", " get txdelay TX delay factor", " get direct.txdelay Direct TX delay factor", " get multi.acks Multi-ack count", " get int.thresh Interference threshold", " get agc.reset.interval AGC reset interval", "", "Set: (use 'help set' for details)", " set ", "", "Other:", " neighbors List neighbors", " neighbor.remove Remove neighbor by pubkey", " tempradio ", " setperm Set ACL permissions", " log start|stop|erase Logging control", ] if self.enable_regions: lines.append(" region ... Region commands") lines += ["", "Type 'help ' for details on a specific command."] return "\n".join(lines) def _help_detail(self, topic: str) -> str: """Return detailed help for a specific command topic.""" topic = topic.strip() details = { "set": ( "Set commands \u2014 set :\n" " set name Set node name\n" " set radio Set radio (restart required)\n" " set freq Set frequency (restart required)\n" " set tx Set TX power\n" " set af Airtime factor\n" " set repeat on|off Enable/disable repeating\n" " set lat Latitude\n" " set lon Longitude\n" " set guest.password Guest password\n" " set allow.read.only on|off Read-only access\n" " set advert.interval 60-240 minutes\n" " set flood.advert.interval
3-48 hours\n" " set flood.max Max flood hops (max 64)\n" " set rxdelay RX delay base (>=0)\n" " set txdelay TX delay factor (>=0)\n" " set direct.txdelay Direct TX delay (>=0)\n" " set multi.acks Multi-ack count\n" " set int.thresh Interference threshold\n" " set agc.reset.interval AGC reset (rounded to x4)" ), "get": "Get commands \u2014 type 'help' to see all 'get' parameters.", "reboot": "Restart the repeater service via systemd.", "advert": "Trigger a self-advertisement flood packet.", "clock": "'clock' shows UTC time. 'clock sync' is a no-op (system time used).", "ver": "Show repeater version and identity type.", "password": "password \u2014 Change the admin password.", "tempradio": ( "tempradio \n" " Apply temporary radio parameters that revert after timeout.\n" " freq: 300-2500 MHz, bw: 7-500 kHz, sf: 5-12, cr: 5-8" ), "neighbors": "List known neighbor nodes from the routing table.", "setperm": "setperm \u2014 Set ACL permissions for a node.", "log": "log start|stop|erase \u2014 Control logging.", } return details.get(topic, f"No detailed help for '{topic}'. Type 'help' for command list.") # ==================== System Commands ==================== def _cmd_reboot(self) -> str: """Reboot the repeater process.""" from repeater.service_utils import restart_service logger.warning("Reboot command received via mesh CLI") success, message = restart_service() if success: return f"OK - {message}" else: return f"Error: {message}" def _cmd_advert(self) -> str: """Send self advertisement.""" if not self.send_advert_callback: logger.warning("Advert command received but no callback configured") return "Error: Advert functionality not configured" try: import asyncio async def delayed_advert(): """Delay advert to let CLI response send first (matches C++ 1500ms delay).""" await asyncio.sleep(1.5) await self.send_advert_callback() if self._event_loop and self._event_loop.is_running(): asyncio.run_coroutine_threadsafe(delayed_advert(), self._event_loop) else: return "Error: Event loop not available" logger.info("Advert scheduled for sending (1.5s delay)") return "OK - Advert sent" except Exception as e: logger.error(f"Failed to schedule advert: {e}", exc_info=True) return f"Error: {e}" def _cmd_clock(self, command: str) -> str: """Handle clock commands.""" if command == "clock": # Display current time import datetime dt = datetime.datetime.utcnow() return f"{dt.hour:02d}:{dt.minute:02d} - {dt.day}/{dt.month}/{dt.year} UTC" elif command == "clock sync": # Clock sync happens automatically via sender_timestamp in protocol return "OK - clock sync not needed (system time used)" else: return "Unknown clock command" def _cmd_time(self, command: str) -> str: """Set time - not supported in Python (use system time).""" return "Error: Time setting not supported (system time is used)" def _cmd_password(self, command: str) -> str: """Change admin password.""" new_password = command[9:].strip() if not new_password: return "Error: Password cannot be empty" # Update security config if "security" not in self.config: self.config["security"] = {} self.config["security"]["password"] = new_password # Save config and live update try: saved, err = self.config_manager.save_to_file() if not saved: logger.error(f"Failed to save password: {err}") return f"Error: Failed to save config: {err}" self.config_manager.live_update_daemon(["security"]) return f"password now: {new_password}" except Exception as e: logger.error(f"Failed to save password: {e}") return "Error: Failed to save password" def _cmd_clear_stats(self) -> str: """Clear statistics.""" # TODO: Implement stats clearing return "Error: Not yet implemented" def _cmd_version(self) -> str: """Get version information.""" role = "room_server" if self.identity_type == "room_server" else "repeater" version = self.config.get("version", "1.0.0") return f"pyMC_{role} v{version}" # ==================== Get Commands ==================== def _cmd_get(self, param: str) -> str: """Handle get commands.""" param = param.strip() logger.debug(f"_cmd_get called with param: '{param}' (len={len(param)})") if param == "af": af = self.repeater_config.get("airtime_factor", 1.0) return f"> {af}" elif param == "name": name = self.repeater_config.get("name", "Unknown") return f"> {name}" elif param == "repeat": mode = self.repeater_config.get("mode", "forward") return f"> {'on' if mode == 'forward' else 'off'}" elif param == "lat": lat = self.repeater_config.get("latitude", 0.0) return f"> {lat}" elif param == "lon": lon = self.repeater_config.get("longitude", 0.0) return f"> {lon}" elif param == "radio": radio = self.config.get("radio", {}) freq_hz = radio.get("frequency", 915000000) bw_hz = radio.get("bandwidth", 125000) sf = radio.get("spreading_factor", 7) cr = radio.get("coding_rate", 5) # Convert Hz to MHz for freq, Hz to kHz for bandwidth (match C++ ftoa output) freq_mhz = freq_hz / 1_000_000.0 bw_khz = bw_hz / 1_000.0 return f"> {freq_mhz},{bw_khz},{sf},{cr}" elif param == "freq": freq_hz = self.config.get("radio", {}).get("frequency", 915000000) freq_mhz = freq_hz / 1_000_000.0 return f"> {freq_mhz}" elif param == "tx": power = self.config.get("radio", {}).get("tx_power", 20) return f"> {power}" elif param == "public.key": if not self.identity: return "Error: Identity not available" try: pubkey = self.identity.get_public_key() pubkey_hex = pubkey.hex() return f"> {pubkey_hex}" except Exception as e: logger.error(f"Failed to get public key: {e}") return f"Error: {e}" elif param == "role": role = "room_server" if self.identity_type == "room_server" else "repeater" return f"> {role}" elif param == "guest.password": guest_pw = self.config.get("security", {}).get("guest_password", "") return f"> {guest_pw}" elif param == "allow.read.only": allow = self.config.get("security", {}).get("allow_read_only", False) return f"> {'on' if allow else 'off'}" elif param == "advert.interval": interval = self.repeater_config.get("advert_interval_minutes", 120) return f"> {interval}" elif param == "flood.advert.interval": interval = self.repeater_config.get("flood_advert_interval_hours", 24) return f"> {interval}" elif param == "flood.max": max_flood = self.repeater_config.get("max_flood_hops", 3) return f"> {max_flood}" elif param == "rxdelay": delay = self.repeater_config.get("rx_delay_base", 0.0) return f"> {delay}" elif param == "txdelay": delay = self.repeater_config.get("tx_delay_factor", 1.0) return f"> {delay}" elif param == "direct.txdelay": delay = self.repeater_config.get("direct_tx_delay_factor", 0.5) return f"> {delay}" elif param == "multi.acks": acks = self.repeater_config.get("multi_acks", 0) return f"> {acks}" elif param == "int.thresh": thresh = self.repeater_config.get("interference_threshold", -120) return f"> {thresh}" elif param == "agc.reset.interval": interval = self.repeater_config.get("agc_reset_interval", 0) return f"> {interval}" else: return f"??: {param}" # ==================== Set Commands ==================== def _cmd_set(self, param: str) -> str: """Handle set commands.""" parts = param.split(None, 1) if len(parts) < 2: return "Error: Missing value" key, value = parts[0], parts[1] try: if key == "af": self.repeater_config["airtime_factor"] = float(value) saved, _ = self.config_manager.save_to_file() self.config_manager.live_update_daemon(["repeater"]) return "OK" elif key == "name": self.repeater_config["node_name"] = value saved, _ = self.config_manager.save_to_file() self.config_manager.live_update_daemon(["repeater"]) return "OK" elif key == "repeat": self.repeater_config["mode"] = "forward" if value.lower() == "on" else "monitor" saved, _ = self.config_manager.save_to_file() self.config_manager.live_update_daemon(["repeater"]) return f"OK - repeat is now {'ON' if self.repeater_config['mode'] == 'forward' else 'OFF'}" elif key == "lat": self.repeater_config["latitude"] = float(value) saved, _ = self.config_manager.save_to_file() self.config_manager.live_update_daemon(["repeater"]) return "OK" elif key == "lon": self.repeater_config["longitude"] = float(value) saved, _ = self.config_manager.save_to_file() self.config_manager.live_update_daemon(["repeater"]) return "OK" elif key == "radio": # Format: freq bw sf cr radio_parts = value.split() if len(radio_parts) != 4: return "Error: Expected freq bw sf cr" if "radio" not in self.config: self.config["radio"] = {} self.config["radio"]["frequency"] = float(radio_parts[0]) self.config["radio"]["bandwidth"] = float(radio_parts[1]) self.config["radio"]["spreading_factor"] = int(radio_parts[2]) self.config["radio"]["coding_rate"] = int(radio_parts[3]) saved, _ = self.config_manager.save_to_file() self.config_manager.live_update_daemon(["radio"]) return "OK - restart repeater to apply" elif key == "freq": if "radio" not in self.config: self.config["radio"] = {} self.config["radio"]["frequency"] = float(value) saved, _ = self.config_manager.save_to_file() self.config_manager.live_update_daemon(["radio"]) return "OK - restart repeater to apply" elif key == "tx": if "radio" not in self.config: self.config["radio"] = {} self.config["radio"]["tx_power"] = int(value) saved, _ = self.config_manager.save_to_file() self.config_manager.live_update_daemon(["radio"]) return "OK" elif key == "guest.password": if "security" not in self.config: self.config["security"] = {} self.config["security"]["guest_password"] = value saved, _ = self.config_manager.save_to_file() self.config_manager.live_update_daemon(["security"]) return "OK" elif key == "allow.read.only": if "security" not in self.config: self.config["security"] = {} self.config["security"]["allow_read_only"] = value.lower() == "on" saved, _ = self.config_manager.save_to_file() self.config_manager.live_update_daemon(["security"]) return "OK" elif key == "advert.interval": mins = int(value) if mins > 0 and (mins < 60 or mins > 240): return "Error: interval range is 60-240 minutes" self.repeater_config["advert_interval_minutes"] = mins saved, _ = self.config_manager.save_to_file() self.config_manager.live_update_daemon(["repeater"]) return "OK" elif key == "flood.advert.interval": hours = int(value) if (hours > 0 and hours < 3) or hours > 48: return "Error: interval range is 3-48 hours" self.repeater_config["flood_advert_interval_hours"] = hours saved, _ = self.config_manager.save_to_file() self.config_manager.live_update_daemon(["repeater"]) return "OK" elif key == "flood.max": max_val = int(value) if max_val > 64: return "Error: max 64" self.repeater_config["max_flood_hops"] = max_val saved, _ = self.config_manager.save_to_file() self.config_manager.live_update_daemon(["repeater"]) return "OK" elif key == "rxdelay": delay = float(value) if delay < 0: return "Error: cannot be negative" self.repeater_config["rx_delay_base"] = delay saved, _ = self.config_manager.save_to_file() self.config_manager.live_update_daemon(["repeater", "delays"]) return "OK" elif key == "txdelay": delay = float(value) if delay < 0: return "Error: cannot be negative" self.repeater_config["tx_delay_factor"] = delay saved, _ = self.config_manager.save_to_file() self.config_manager.live_update_daemon(["repeater", "delays"]) return "OK" elif key == "direct.txdelay": delay = float(value) if delay < 0: return "Error: cannot be negative" self.repeater_config["direct_tx_delay_factor"] = delay saved, _ = self.config_manager.save_to_file() self.config_manager.live_update_daemon(["repeater", "delays"]) return "OK" elif key == "multi.acks": self.repeater_config["multi_acks"] = int(value) saved, _ = self.config_manager.save_to_file() self.config_manager.live_update_daemon(["repeater"]) return "OK" elif key == "int.thresh": self.repeater_config["interference_threshold"] = int(value) saved, _ = self.config_manager.save_to_file() self.config_manager.live_update_daemon(["repeater"]) return "OK" elif key == "agc.reset.interval": interval = int(value) # Round to nearest multiple of 4 rounded = (interval // 4) * 4 self.repeater_config["agc_reset_interval"] = rounded saved, _ = self.config_manager.save_to_file() self.config_manager.live_update_daemon(["repeater"]) return f"OK - interval rounded to {rounded}" else: return f"unknown config: {key}" except ValueError as e: return f"Error: invalid value - {e}" except Exception as e: logger.error(f"Set command error: {e}") return f"Error: {e}" # ==================== ACL Commands ==================== def _cmd_setperm(self, command: str) -> str: """Set permissions for a public key.""" # Format: setperm {pubkey-hex} {permissions-int} parts = command[8:].split() if len(parts) < 2: return "Err - bad params" pubkey_hex = parts[0] try: permissions = int(parts[1]) except ValueError: return "Err - invalid permissions" # TODO: Apply permissions via ACL logger.info(f"setperm command: {pubkey_hex} -> {permissions}") return "Error: Not yet implemented - use config file" # ==================== Region Commands ==================== def _cmd_region(self, command: str) -> str: """Handle region commands.""" parts = command.split() if len(parts) == 1: return "Error: Region commands not implemented in Python repeater" subcommand = parts[1] if subcommand == "load": return "Error: Region commands not implemented" elif subcommand == "save": return "Error: Region commands not implemented" elif subcommand in ("allowf", "denyf", "get", "home", "put", "remove"): return "Error: Region commands not implemented" else: return "Err - ??" # ==================== Neighbor Commands ==================== def _cmd_neighbors(self) -> str: """List neighbors.""" if not self.storage_handler: return "Error: Storage not available" try: neighbors = self.storage_handler.get_neighbors() if not neighbors: return "No neighbors discovered yet" # Filter to only show repeaters and zero hop nodes filtered_neighbors = { pubkey: info for pubkey, info in neighbors.items() if info.get("is_repeater", False) or info.get("zero_hop", False) } if not filtered_neighbors: return "No repeaters or zero hop neighbors discovered yet" # Format output similar to C++ version # Format: " heard Xs ago" import time current_time = int(time.time()) lines = [] for pubkey, info in filtered_neighbors.items(): last_seen = info.get("last_seen", 0) seconds_ago = int(current_time - last_seen) # Get first 4 bytes of pubkey as hex (match C++ format) pubkey_short = pubkey[:8] if len(pubkey) >= 8 else pubkey snr = info.get("snr", 0) or 0 # Format: <4byte_hex>:: (matches C++ format) lines.append(f"{pubkey_short}:{seconds_ago}:{int(snr)}") return "\n".join(lines) except Exception as e: logger.error(f"Failed to list neighbors: {e}", exc_info=True) return f"Error: {e}" def _cmd_neighbor_remove(self, command: str) -> str: """Remove a neighbor.""" pubkey_hex = command[16:].strip() if not pubkey_hex: return "ERR: Missing pubkey" # TODO: Remove neighbor from routing table logger.info(f"neighbor.remove: {pubkey_hex}") return "Error: Not yet implemented" # ==================== Temporary Radio Commands ==================== def _cmd_tempradio(self, command: str) -> str: """Apply temporary radio parameters.""" # Format: tempradio {freq} {bw} {sf} {cr} {timeout_mins} parts = command[10:].split() if len(parts) < 5: return "Error: Expected freq bw sf cr timeout_mins" try: freq = float(parts[0]) bw = float(parts[1]) sf = int(parts[2]) cr = int(parts[3]) timeout_mins = int(parts[4]) # Validate if not (300.0 <= freq <= 2500.0): return "Error: invalid frequency" if not (7.0 <= bw <= 500.0): return "Error: invalid bandwidth" if not (5 <= sf <= 12): return "Error: invalid spreading factor" if not (5 <= cr <= 8): return "Error: invalid coding rate" if timeout_mins <= 0: return "Error: invalid timeout" # TODO: Apply temporary radio parameters logger.info(f"tempradio: {freq}MHz {bw}kHz SF{sf} CR4/{cr} for {timeout_mins}min") return "Error: Not yet implemented" except ValueError: return "Error, invalid params" # ==================== Logging Commands ==================== def _cmd_log(self, command: str) -> str: """Handle log commands.""" if command == "log start": # TODO: Enable logging return "Error: Not yet implemented" elif command == "log stop": # TODO: Disable logging return "Error: Not yet implemented" elif command == "log erase": # TODO: Clear log file return "Error: Not yet implemented" elif command == "log": return "Error: Use journalctl to view logs" else: return "Unknown log command"