# File: pyMC_Repeater/repeater/handler_helpers/repeater_cli.py (Python)
"""
Mesh CLI Handler
Handles administrative commands sent to repeaters and room servers via TXT_MSG packets.
Only users with admin permissions (via ACL) can execute these commands.
"""
import logging
import time
from pathlib import Path
from typing import Any, Callable, Dict, Optional
import yaml
logger = logging.getLogger(__name__)


class MeshCLI:
    """
    CLI command handler for mesh node administration (repeaters and room servers).

    Commands follow the format: XX|command params
    where XX is an optional sequence number that gets echoed in the reply.

    Every handler returns the reply text as a string; errors are reported in
    the reply rather than raised.
    """

    # ``get`` parameters read straight from the ``repeater`` config section:
    # CLI name -> (config key, value reported when the key is absent).
    _REPEATER_GET_PARAMS = {
        "af": ("airtime_factor", 1.0),
        "name": ("name", "Unknown"),
        "lat": ("latitude", 0.0),
        "lon": ("longitude", 0.0),
        "advert.interval": ("advert_interval_minutes", 120),
        "flood.advert.interval": ("flood_advert_interval_hours", 24),
        "flood.max": ("max_flood_hops", 3),
        "rxdelay": ("rx_delay_base", 0.0),
        "txdelay": ("tx_delay_factor", 1.0),
        "direct.txdelay": ("direct_tx_delay_factor", 0.5),
        "multi.acks": ("multi_acks", 0),
        "int.thresh": ("interference_threshold", -120),
        "agc.reset.interval": ("agc_reset_interval", 0),
    }

    # ``set`` parameters stored as non-negative floats in the ``repeater``
    # section: CLI name -> config key.
    _DELAY_SET_PARAMS = {
        "rxdelay": "rx_delay_base",
        "txdelay": "tx_delay_factor",
        "direct.txdelay": "direct_tx_delay_factor",
    }

    # Region subcommands that are recognised but not implemented.
    _REGION_SUBCOMMANDS = (
        "load",
        "save",
        "allowf",
        "denyf",
        "get",
        "home",
        "put",
        "remove",
    )

    def __init__(
        self,
        config_path: str,
        config: Dict[str, Any],
        save_config_callback: Callable,
        identity_type: str = "repeater",
        enable_regions: bool = True,
    ):
        """
        Initialize the CLI handler.

        Args:
            config_path: Path to the config.yaml file
            config: Current configuration dictionary (mutated in place by
                ``set`` commands)
            save_config_callback: Zero-argument callback that persists config
            identity_type: Type of identity ('repeater' or 'room_server')
            enable_regions: Whether to enable region commands (only for repeaters)
        """
        self.config_path = Path(config_path)
        self.config = config
        self.save_config = save_config_callback
        self.identity_type = identity_type
        self.enable_regions = enable_regions
        # Shortcut to the repeater section. It must be the dict stored inside
        # ``config`` (created if missing or empty), otherwise values written
        # by ``set`` commands would never reach the saved configuration.
        self.repeater_config = self._section("repeater")

    # ==================== Internal helpers ====================

    def _section(self, name: str) -> Dict[str, Any]:
        """Return config section *name*, creating it inside ``self.config`` if absent."""
        section = self.config.get(name)
        if section is None:
            section = self.config[name] = {}
        return section

    def _saved(self, reply: str = "OK") -> str:
        """Persist the configuration via the save callback, then return *reply*."""
        self.save_config()
        return reply

    @staticmethod
    def _parse_on_off(value: str) -> bool:
        """Map 'on'/'off' (case-insensitive) to a bool; raise ValueError otherwise."""
        lowered = value.strip().lower()
        if lowered == "on":
            return True
        if lowered == "off":
            return False
        raise ValueError("expected on|off")

    @staticmethod
    def _radio_param_error(freq_mhz=None, bw_khz=None, sf=None, cr=None) -> Optional[str]:
        """Return an error reply for the first out-of-range radio value, else None.

        Arguments left as None are not checked. Limits are the ones listed in
        the ``tempradio`` help text (300-2500 MHz, 7-500 kHz, SF 5-12, CR 5-8).
        """
        if freq_mhz is not None and not (300.0 <= freq_mhz <= 2500.0):
            return "Error: invalid frequency"
        if bw_khz is not None and not (7.0 <= bw_khz <= 500.0):
            return "Error: invalid bandwidth"
        if sf is not None and not (5 <= sf <= 12):
            return "Error: invalid spreading factor"
        if cr is not None and not (5 <= cr <= 8):
            return "Error: invalid coding rate"
        return None

    @staticmethod
    def _to_hz(value: float, scale: int) -> int:
        """Convert a MHz/kHz CLI value to integer Hz (``scale`` = Hz per unit)."""
        return int(round(value * scale))

    # ==================== Entry point ====================

    def handle_command(self, sender_pubkey: bytes, command: str, is_admin: bool) -> str:
        """
        Handle an incoming command from a client.

        Args:
            sender_pubkey: Public key of sender (not used for routing here)
            command: Command string (may include XX| prefix)
            is_admin: Whether sender has admin permissions

        Returns:
            Reply string to send back to sender. The XX| prefix, when present,
            is echoed in front of the reply; the permission error is returned
            without it.
        """
        if not is_admin:
            return "Error: Admin permission required"

        logger.debug("handle_command received: '%s' (len=%d)", command, len(command))

        # Extract optional sequence prefix (XX|)
        prefix = ""
        if len(command) > 4 and command[2] == "|":
            prefix, command = command[:3], command[3:]
            logger.debug("Extracted prefix: '%s', remaining command: '%s'", prefix, command)

        command = command.strip()
        logger.debug("After strip: '%s'", command)

        return prefix + self._route_command(command)

    def _route_command(self, command: str) -> str:
        """Route an already-stripped command to the matching handler method."""
        # Help
        if command == "help" or command.startswith("help "):
            return self._cmd_help(command)

        # System commands
        if command == "reboot":
            return self._cmd_reboot()
        if command == "advert":
            return self._cmd_advert()
        if command.startswith("clock"):
            return self._cmd_clock(command)
        if command.startswith("time "):
            return self._cmd_time(command)
        if command == "start ota":
            return "Error: OTA not supported in Python repeater"
        if command.startswith("password "):
            return self._cmd_password(command)
        if command == "clear stats":
            return self._cmd_clear_stats()
        if command == "ver":
            return self._cmd_version()

        # Must be tested before the generic "get " prefix, which would
        # otherwise swallow it and answer "??: acl".
        if command == "get acl":
            return "Error: Use 'get acl' via serial console only"

        # Get / set commands
        if command.startswith("get "):
            return self._cmd_get(command[4:])
        if command.startswith("set "):
            return self._cmd_set(command[4:])

        # ACL commands
        if command.startswith("setperm "):
            return self._cmd_setperm(command)

        # Region commands (repeaters only)
        if command.startswith("region"):
            if self.enable_regions:
                return self._cmd_region(command)
            return "Error: Region commands not available for room servers"

        # Neighbor commands
        if command == "neighbors":
            return self._cmd_neighbors()
        if command.startswith("neighbor.remove "):
            return self._cmd_neighbor_remove(command)

        # Temporary radio params
        if command.startswith("tempradio "):
            return self._cmd_tempradio(command)

        # Sensor / GPS commands
        if command.startswith("sensor "):
            return "Error: Sensor commands not implemented in Python repeater"
        if command.startswith("gps"):
            return "Error: GPS commands not implemented in Python repeater"

        # Logging commands. The bare "log" is matched explicitly because the
        # command has been stripped and can never end in the space that the
        # "log " prefix test requires.
        if command == "log" or command.startswith("log "):
            return self._cmd_log(command)

        # Statistics commands
        if command.startswith("stats-"):
            return "Error: Stats commands not fully implemented yet"

        return "Unknown command"

    # ==================== Help Command ====================

    def _cmd_help(self, command: str) -> str:
        """Show available commands or detailed help for a specific command."""
        parts = command.split(None, 1)
        if len(parts) == 2:
            return self._help_detail(parts[1])
        lines = [
            "=== pyMC CLI Commands ===",
            "",
            "System:",
            " reboot Restart the repeater service",
            " advert Send self advertisement",
            " clock Show current UTC time",
            " clock sync Sync clock (no-op, uses system time)",
            " ver Show version info",
            " password <pw> Change admin password",
            " clear stats Clear statistics",
            "",
            "Get:",
            " get name Node name",
            " get radio Radio params (freq,bw,sf,cr)",
            " get freq Frequency (MHz)",
            " get tx TX power",
            " get af Airtime factor",
            " get repeat Repeat mode (on/off)",
            " get lat / get lon GPS coordinates",
            " get role Identity role",
            " get guest.password Guest password",
            " get allow.read.only Read-only access setting",
            " get advert.interval Advert interval (minutes)",
            " get flood.advert.interval Flood advert interval (hours)",
            " get flood.max Max flood hops",
            " get rxdelay RX delay base",
            " get txdelay TX delay factor",
            " get direct.txdelay Direct TX delay factor",
            " get multi.acks Multi-ack count",
            " get int.thresh Interference threshold",
            " get agc.reset.interval AGC reset interval",
            "",
            "Set: (use 'help set' for details)",
            " set <param> <value>",
            "",
            "Other:",
            " neighbors List neighbors",
            " neighbor.remove <key> Remove neighbor by pubkey",
            " tempradio <freq> <bw> <sf> <cr> <timeout_mins>",
            " setperm <pubkey> <perm> Set ACL permissions",
            " log start|stop|erase Logging control",
        ]
        if self.enable_regions:
            lines.append(" region ... Region commands")
        lines += ["", "Type 'help <command>' for details on a specific command."]
        return "\n".join(lines)

    def _help_detail(self, topic: str) -> str:
        """Return detailed help for a specific command topic."""
        topic = topic.strip()
        details = {
            "set": (
                "Set commands — set <param> <value>:\n"
                " set name <name> Set node name\n"
                " set radio <f> <bw> <sf> <cr> Set radio (restart required)\n"
                " set freq <mhz> Set frequency (restart required)\n"
                " set tx <power> Set TX power\n"
                " set af <factor> Airtime factor\n"
                " set repeat on|off Enable/disable repeating\n"
                " set lat <deg> Latitude\n"
                " set lon <deg> Longitude\n"
                " set guest.password <pw> Guest password\n"
                " set allow.read.only on|off Read-only access\n"
                " set advert.interval <min> 60-240 minutes\n"
                " set flood.advert.interval <hr> 3-48 hours\n"
                " set flood.max <hops> Max flood hops (max 64)\n"
                " set rxdelay <val> RX delay base (>=0)\n"
                " set txdelay <val> TX delay factor (>=0)\n"
                " set direct.txdelay <val> Direct TX delay (>=0)\n"
                " set multi.acks <n> Multi-ack count\n"
                " set int.thresh <dbm> Interference threshold\n"
                " set agc.reset.interval <n> AGC reset (rounded to x4)"
            ),
            "get": "Get commands — type 'help' to see all 'get' parameters.",
            "reboot": "Restart the repeater service via systemd.",
            "advert": "Trigger a self-advertisement flood packet.",
            "clock": "'clock' shows UTC time. 'clock sync' is a no-op (system time used).",
            "ver": "Show repeater version and identity type.",
            "password": "password <new_password> — Change the admin password.",
            "tempradio": (
                "tempradio <freq_mhz> <bw_khz> <sf> <cr> <timeout_mins>\n"
                " Apply temporary radio parameters that revert after timeout.\n"
                " freq: 300-2500 MHz, bw: 7-500 kHz, sf: 5-12, cr: 5-8"
            ),
            "neighbors": "List known neighbor nodes from the routing table.",
            "setperm": "setperm <pubkey_hex> <permission_int> — Set ACL permissions for a node.",
            "log": "log start|stop|erase — Control logging.",
        }
        return details.get(topic, f"No detailed help for '{topic}'. Type 'help' for command list.")

    # ==================== System Commands ====================

    def _cmd_reboot(self) -> str:
        """Restart the repeater via ``restart_service`` and report its outcome."""
        # Imported lazily, as in the rest of this handler, so the module can
        # be loaded without the service utilities.
        from repeater.service_utils import restart_service

        logger.warning("Reboot command received via repeater CLI")
        success, message = restart_service()
        if success:
            return f"OK - {message}"
        return f"Error: {message}"

    def _cmd_advert(self) -> str:
        """Send self advertisement (not implemented yet)."""
        logger.info("Advert command received")
        # TODO: Trigger advertisement through packet handler
        return "Error: Not yet implemented"

    def _cmd_clock(self, command: str) -> str:
        """Handle 'clock' (show UTC time) and 'clock sync' (no-op)."""
        if command == "clock":
            import datetime

            # Timezone-aware replacement for the deprecated utcnow(); the
            # formatted fields are identical.
            dt = datetime.datetime.now(datetime.timezone.utc)
            return f"{dt.hour:02d}:{dt.minute:02d} - {dt.day}/{dt.month}/{dt.year} UTC"
        if command == "clock sync":
            # Clock sync happens automatically via sender_timestamp in protocol
            return "OK - clock sync not needed (system time used)"
        return "Unknown clock command"

    def _cmd_time(self, command: str) -> str:
        """Set time - not supported in Python (use system time)."""
        return "Error: Time setting not supported (system time is used)"

    def _cmd_password(self, command: str) -> str:
        """Change the admin password stored under ``security.password``."""
        new_password = command[9:].strip()  # text after "password "
        if not new_password:
            return "Error: Password cannot be empty"

        self._section("security")["password"] = new_password

        # NOTE(review): the success reply echoes the new password in clear
        # text; presumably kept for parity with the C++ firmware's reply -
        # confirm before changing.
        try:
            self.save_config()
            return f"password now: {new_password}"
        except Exception as e:
            logger.error("Failed to save password: %s", e)
            return "Error: Failed to save password"

    def _cmd_clear_stats(self) -> str:
        """Clear statistics (not implemented yet)."""
        # TODO: Implement stats clearing
        return "Error: Not yet implemented"

    def _cmd_version(self) -> str:
        """Return 'pyMC_<role> v<version>' using the config's ``version`` key."""
        role = "room_server" if self.identity_type == "room_server" else "repeater"
        version = self.config.get("version", "1.0.0")
        return f"pyMC_{role} v{version}"

    # ==================== Get Commands ====================

    def _cmd_get(self, param: str) -> str:
        """Handle get commands; replies are '> <value>' or '??: <param>'."""
        param = param.strip()
        logger.debug("_cmd_get called with param: '%s' (len=%d)", param, len(param))

        if param in self._REPEATER_GET_PARAMS:
            key, default = self._REPEATER_GET_PARAMS[param]
            return f"> {self.repeater_config.get(key, default)}"

        # Read-only views: a missing or empty section falls back to defaults
        # without being created in the config.
        radio = self.config.get("radio") or {}
        security = self.config.get("security") or {}

        if param == "repeat":
            mode = self.repeater_config.get("mode", "forward")
            return f"> {'on' if mode == 'forward' else 'off'}"
        if param == "radio":
            # Config holds Hz; report MHz / kHz (match C++ ftoa output)
            freq_mhz = radio.get("frequency", 915000000) / 1_000_000.0
            bw_khz = radio.get("bandwidth", 125000) / 1_000.0
            sf = radio.get("spreading_factor", 7)
            cr = radio.get("coding_rate", 5)
            return f"> {freq_mhz},{bw_khz},{sf},{cr}"
        if param == "freq":
            freq_mhz = radio.get("frequency", 915000000) / 1_000_000.0
            return f"> {freq_mhz}"
        if param == "tx":
            return f"> {radio.get('tx_power', 20)}"
        if param == "public.key":
            # TODO: Get from identity
            return "Error: Not yet implemented"
        if param == "role":
            role = "room_server" if self.identity_type == "room_server" else "repeater"
            return f"> {role}"
        if param == "guest.password":
            return f"> {security.get('guest_password', '')}"
        if param == "allow.read.only":
            return f"> {'on' if security.get('allow_read_only', False) else 'off'}"
        return f"??: {param}"

    # ==================== Set Commands ====================

    def _cmd_set(self, param: str) -> str:
        """Handle set commands of the form '<key> <value>'.

        Unparsable values produce 'Error: invalid value - ...'; any other
        failure (for example from the save callback) is logged and reported
        as 'Error: ...'.
        """
        parts = param.split(None, 1)
        if len(parts) < 2:
            return "Error: Missing value"
        key, value = parts
        try:
            return self._apply_setting(key, value)
        except ValueError as e:
            return f"Error: invalid value - {e}"
        except Exception as e:
            logger.error("Set command error: %s", e)
            return f"Error: {e}"

    def _apply_setting(self, key: str, value: str) -> str:
        """Validate and store one setting, save the config and return the reply.

        Raises:
            ValueError: if *value* cannot be parsed for *key*.
        """
        repeater = self.repeater_config

        if key == "af":
            repeater["airtime_factor"] = float(value)
            return self._saved()
        if key == "name":
            repeater["name"] = value
            return self._saved()
        if key == "repeat":
            # Reject anything but on/off so a typo cannot silently turn
            # forwarding off.
            enabled = self._parse_on_off(value)
            repeater["mode"] = "forward" if enabled else "monitor"
            return self._saved(f"OK - repeat is now {'ON' if enabled else 'OFF'}")
        if key == "lat":
            repeater["latitude"] = float(value)
            return self._saved()
        if key == "lon":
            repeater["longitude"] = float(value)
            return self._saved()

        if key == "radio":
            # Format: freq(MHz) bw(kHz) sf cr
            fields = value.split()
            if len(fields) != 4:
                return "Error: Expected freq bw sf cr"
            freq_mhz, bw_khz = float(fields[0]), float(fields[1])
            sf, cr = int(fields[2]), int(fields[3])
            problem = self._radio_param_error(freq_mhz, bw_khz, sf, cr)
            if problem:
                return problem
            radio = self._section("radio")
            # The config stores Hz (see the 'get radio' conversion), so the
            # MHz/kHz values typed by the user are converted before storing.
            radio["frequency"] = self._to_hz(freq_mhz, 1_000_000)
            radio["bandwidth"] = self._to_hz(bw_khz, 1_000)
            radio["spreading_factor"] = sf
            radio["coding_rate"] = cr
            return self._saved("OK - restart repeater to apply")
        if key == "freq":
            freq_mhz = float(value)
            problem = self._radio_param_error(freq_mhz=freq_mhz)
            if problem:
                return problem
            self._section("radio")["frequency"] = self._to_hz(freq_mhz, 1_000_000)
            return self._saved("OK - restart repeater to apply")
        if key == "tx":
            tx_power = int(value)
            self._section("radio")["tx_power"] = tx_power
            return self._saved()

        if key == "guest.password":
            self._section("security")["guest_password"] = value
            return self._saved()
        if key == "allow.read.only":
            allowed = self._parse_on_off(value)
            self._section("security")["allow_read_only"] = allowed
            return self._saved()

        if key == "advert.interval":
            mins = int(value)
            # 0 is accepted (presumably "disabled"); otherwise 60-240.
            if mins < 0 or 0 < mins < 60 or mins > 240:
                return "Error: interval range is 60-240 minutes"
            repeater["advert_interval_minutes"] = mins
            return self._saved()
        if key == "flood.advert.interval":
            hours = int(value)
            # 0 is accepted (presumably "disabled"); otherwise 3-48.
            if hours < 0 or 0 < hours < 3 or hours > 48:
                return "Error: interval range is 3-48 hours"
            repeater["flood_advert_interval_hours"] = hours
            return self._saved()
        if key == "flood.max":
            max_val = int(value)
            if max_val < 0:
                return "Error: cannot be negative"
            if max_val > 64:
                return "Error: max 64"
            repeater["max_flood_hops"] = max_val
            return self._saved()

        if key in self._DELAY_SET_PARAMS:
            delay = float(value)
            if delay < 0:
                return "Error: cannot be negative"
            repeater[self._DELAY_SET_PARAMS[key]] = delay
            return self._saved()

        if key == "multi.acks":
            repeater["multi_acks"] = int(value)
            return self._saved()
        if key == "int.thresh":
            repeater["interference_threshold"] = int(value)
            return self._saved()
        if key == "agc.reset.interval":
            # Round DOWN to a multiple of 4 (floor division, not nearest).
            rounded = (int(value) // 4) * 4
            repeater["agc_reset_interval"] = rounded
            return self._saved(f"OK - interval rounded to {rounded}")

        return f"unknown config: {key}"

    # ==================== ACL Commands ====================

    def _cmd_setperm(self, command: str) -> str:
        """Parse 'setperm {pubkey-hex} {permissions-int}' (applying is not implemented)."""
        parts = command[8:].split()  # text after "setperm "
        if len(parts) < 2:
            return "Err - bad params"
        pubkey_hex = parts[0]
        try:
            permissions = int(parts[1])
        except ValueError:
            return "Err - invalid permissions"
        # TODO: Apply permissions via ACL
        logger.info("setperm command: %s -> %s", pubkey_hex, permissions)
        return "Error: Not yet implemented - use config file"

    # ==================== Region Commands ====================

    def _cmd_region(self, command: str) -> str:
        """Handle region commands; all known subcommands are unimplemented."""
        parts = command.split()
        if len(parts) == 1:
            return "Error: Region commands not implemented in Python repeater"
        if parts[1] in self._REGION_SUBCOMMANDS:
            return "Error: Region commands not implemented"
        return "Err - ??"

    # ==================== Neighbor Commands ====================

    def _cmd_neighbors(self) -> str:
        """List neighbors (not implemented yet)."""
        # TODO: Get neighbors from routing table
        return "Error: Not yet implemented"

    def _cmd_neighbor_remove(self, command: str) -> str:
        """Remove a neighbor by pubkey (not implemented yet)."""
        pubkey_hex = command[16:].strip()  # text after "neighbor.remove "
        if not pubkey_hex:
            return "ERR: Missing pubkey"
        # TODO: Remove neighbor from routing table
        logger.info("neighbor.remove: %s", pubkey_hex)
        return "Error: Not yet implemented"

    # ==================== Temporary Radio Commands ====================

    def _cmd_tempradio(self, command: str) -> str:
        """Validate 'tempradio {freq} {bw} {sf} {cr} {timeout_mins}'.

        Only parsing and range validation happen here; applying the
        parameters is not implemented yet.
        """
        parts = command[10:].split()  # text after "tempradio "
        if len(parts) < 5:
            return "Error: Expected freq bw sf cr timeout_mins"
        try:
            freq = float(parts[0])
            bw = float(parts[1])
            sf = int(parts[2])
            cr = int(parts[3])
            timeout_mins = int(parts[4])
        except ValueError:
            return "Error, invalid params"

        problem = self._radio_param_error(freq, bw, sf, cr)
        if problem:
            return problem
        if timeout_mins <= 0:
            return "Error: invalid timeout"

        # TODO: Apply temporary radio parameters
        logger.info(
            "tempradio: %sMHz %skHz SF%s CR4/%s for %smin", freq, bw, sf, cr, timeout_mins
        )
        return "Error: Not yet implemented"

    # ==================== Logging Commands ====================

    def _cmd_log(self, command: str) -> str:
        """Handle log commands (start/stop/erase are not implemented yet)."""
        if command in ("log start", "log stop", "log erase"):
            # TODO: Enable / disable logging, clear log file
            return "Error: Not yet implemented"
        if command == "log":
            return "Error: Use journalctl to view logs"
        return "Unknown log command"
# Backward compatibility alias: RepeaterCLI is the same class object as MeshCLI.
RepeaterCLI = MeshCLI