forked from iarv/pyMC_Repeater
650 lines
25 KiB
Python
650 lines
25 KiB
Python
import logging
|
|
from typing import Optional, Dict, Any, Callable
|
|
import yaml
|
|
from pathlib import Path
|
|
import time
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class MeshCLI:
|
|
|
|
def __init__(
|
|
self,
|
|
config_path: str,
|
|
config: Dict[str, Any],
|
|
config_manager, # ConfigManager instance for save & live updates
|
|
identity_type: str = "repeater",
|
|
enable_regions: bool = True,
|
|
send_advert_callback: Optional[Callable] = None,
|
|
identity = None,
|
|
storage_handler = None
|
|
):
|
|
|
|
self.config_path = Path(config_path)
|
|
self.config = config
|
|
self.config_manager = config_manager
|
|
self.identity_type = identity_type
|
|
self.enable_regions = enable_regions
|
|
self.send_advert_callback = send_advert_callback
|
|
self.identity = identity
|
|
self.storage_handler = storage_handler
|
|
|
|
# Get repeater config shortcut
|
|
self.repeater_config = config.get('repeater', {})
|
|
|
|
def handle_command(self, sender_pubkey: bytes, command: str, is_admin: bool) -> str:
|
|
|
|
# Check admin permission first
|
|
if not is_admin:
|
|
return "Error: Admin permission required"
|
|
|
|
logger.debug(f"handle_command received: '{command}' (len={len(command)})")
|
|
|
|
# Extract optional sequence prefix (XX|)
|
|
prefix = ""
|
|
if len(command) > 4 and command[2] == '|':
|
|
prefix = command[:3]
|
|
command = command[3:]
|
|
logger.debug(f"Extracted prefix: '{prefix}', remaining command: '{command}'")
|
|
|
|
# Strip leading/trailing whitespace
|
|
command = command.strip()
|
|
logger.debug(f"After strip: '{command}'")
|
|
|
|
# Route to appropriate handler
|
|
reply = self._route_command(command)
|
|
|
|
# Add prefix back to reply if present
|
|
if prefix:
|
|
return prefix + reply
|
|
return reply
|
|
|
|
def _route_command(self, command: str) -> str:
|
|
|
|
# System commands
|
|
if command == "reboot":
|
|
return self._cmd_reboot()
|
|
elif command == "advert":
|
|
return self._cmd_advert()
|
|
elif command.startswith("clock"):
|
|
return self._cmd_clock(command)
|
|
elif command.startswith("time "):
|
|
return self._cmd_time(command)
|
|
elif command == "start ota":
|
|
return "Error: OTA not supported in Python repeater"
|
|
elif command.startswith("password "):
|
|
return self._cmd_password(command)
|
|
elif command == "clear stats":
|
|
return self._cmd_clear_stats()
|
|
elif command == "ver":
|
|
return self._cmd_version()
|
|
|
|
# Get commands
|
|
elif command.startswith("get "):
|
|
return self._cmd_get(command[4:])
|
|
|
|
# Set commands
|
|
elif command.startswith("set "):
|
|
return self._cmd_set(command[4:])
|
|
|
|
# ACL commands
|
|
elif command.startswith("setperm "):
|
|
return self._cmd_setperm(command)
|
|
elif command == "get acl":
|
|
return "Error: Use 'get acl' via serial console only"
|
|
|
|
# Region commands (repeaters only)
|
|
elif command.startswith("region"):
|
|
if self.enable_regions:
|
|
return self._cmd_region(command)
|
|
else:
|
|
return "Error: Region commands not available for room servers"
|
|
|
|
# Neighbor commands
|
|
elif command == "neighbors":
|
|
return self._cmd_neighbors()
|
|
elif command.startswith("neighbor.remove "):
|
|
return self._cmd_neighbor_remove(command)
|
|
|
|
# Temporary radio params
|
|
elif command.startswith("tempradio "):
|
|
return self._cmd_tempradio(command)
|
|
|
|
# Sensor commands
|
|
elif command.startswith("sensor "):
|
|
return "Error: Sensor commands not implemented in Python repeater"
|
|
|
|
# GPS commands
|
|
elif command.startswith("gps"):
|
|
return "Error: GPS commands not implemented in Python repeater"
|
|
|
|
# Logging commands
|
|
elif command.startswith("log "):
|
|
return self._cmd_log(command)
|
|
|
|
# Statistics commands
|
|
elif command.startswith("stats-"):
|
|
return "Error: Stats commands not fully implemented yet"
|
|
|
|
else:
|
|
return "Unknown command"
|
|
|
|
# ==================== System Commands ====================
|
|
|
|
def _cmd_reboot(self) -> str:
|
|
"""Reboot the repeater process."""
|
|
from repeater.service_utils import restart_service
|
|
|
|
logger.warning("Reboot command received via mesh CLI")
|
|
success, message = restart_service()
|
|
|
|
if success:
|
|
return f"OK - {message}"
|
|
else:
|
|
return f"Error: {message}"
|
|
|
|
def _cmd_advert(self) -> str:
|
|
"""Send self advertisement."""
|
|
if not self.send_advert_callback:
|
|
logger.warning("Advert command received but no callback configured")
|
|
return "Error: Advert functionality not configured"
|
|
|
|
try:
|
|
import asyncio
|
|
|
|
async def delayed_advert():
|
|
"""Delay advert to let CLI response send first (matches C++ 1500ms delay)."""
|
|
await asyncio.sleep(1.5)
|
|
await self.send_advert_callback()
|
|
|
|
asyncio.create_task(delayed_advert())
|
|
logger.info("Advert scheduled for sending (1.5s delay)")
|
|
return "OK - Advert sent"
|
|
except Exception as e:
|
|
logger.error(f"Failed to schedule advert: {e}", exc_info=True)
|
|
return f"Error: {e}"
|
|
|
|
def _cmd_clock(self, command: str) -> str:
|
|
"""Handle clock commands."""
|
|
if command == "clock":
|
|
# Display current time
|
|
import datetime
|
|
dt = datetime.datetime.utcnow()
|
|
return f"{dt.hour:02d}:{dt.minute:02d} - {dt.day}/{dt.month}/{dt.year} UTC"
|
|
elif command == "clock sync":
|
|
# Clock sync happens automatically via sender_timestamp in protocol
|
|
return "OK - clock sync not needed (system time used)"
|
|
else:
|
|
return "Unknown clock command"
|
|
|
|
def _cmd_time(self, command: str) -> str:
|
|
"""Set time - not supported in Python (use system time)."""
|
|
return "Error: Time setting not supported (system time is used)"
|
|
|
|
def _cmd_password(self, command: str) -> str:
|
|
"""Change admin password."""
|
|
new_password = command[9:].strip()
|
|
|
|
if not new_password:
|
|
return "Error: Password cannot be empty"
|
|
|
|
# Update security config
|
|
if 'security' not in self.config:
|
|
self.config['security'] = {}
|
|
|
|
self.config['security']['password'] = new_password
|
|
|
|
# Save config and live update
|
|
try:
|
|
self.config_manager.save_to_file()
|
|
self.config_manager.live_update_daemon(['security'])
|
|
return f"password now: {new_password}"
|
|
except Exception as e:
|
|
logger.error(f"Failed to save password: {e}")
|
|
return "Error: Failed to save password"
|
|
|
|
def _cmd_clear_stats(self) -> str:
|
|
"""Clear statistics."""
|
|
# TODO: Implement stats clearing
|
|
return "Error: Not yet implemented"
|
|
|
|
def _cmd_version(self) -> str:
|
|
"""Get version information."""
|
|
role = "room_server" if self.identity_type == "room_server" else "repeater"
|
|
version = self.config.get('version', '1.0.0')
|
|
return f"pyMC_{role} v{version}"
|
|
|
|
# ==================== Get Commands ====================
|
|
|
|
def _cmd_get(self, param: str) -> str:
|
|
"""Handle get commands."""
|
|
param = param.strip()
|
|
logger.debug(f"_cmd_get called with param: '{param}' (len={len(param)})")
|
|
|
|
if param == "af":
|
|
af = self.repeater_config.get('airtime_factor', 1.0)
|
|
return f"> {af}"
|
|
|
|
elif param == "name":
|
|
name = self.repeater_config.get('name', 'Unknown')
|
|
return f"> {name}"
|
|
|
|
elif param == "repeat":
|
|
disabled = self.repeater_config.get('disable_forward', False)
|
|
return f"> {'off' if disabled else 'on'}"
|
|
|
|
elif param == "lat":
|
|
lat = self.repeater_config.get('latitude', 0.0)
|
|
return f"> {lat}"
|
|
|
|
elif param == "lon":
|
|
lon = self.repeater_config.get('longitude', 0.0)
|
|
return f"> {lon}"
|
|
|
|
elif param == "radio":
|
|
radio = self.config.get('radio', {})
|
|
freq_hz = radio.get('frequency', 915000000)
|
|
bw_hz = radio.get('bandwidth', 125000)
|
|
sf = radio.get('spreading_factor', 7)
|
|
cr = radio.get('coding_rate', 5)
|
|
# Convert Hz to MHz for freq, Hz to kHz for bandwidth (match C++ ftoa output)
|
|
freq_mhz = freq_hz / 1_000_000.0
|
|
bw_khz = bw_hz / 1_000.0
|
|
return f"> {freq_mhz},{bw_khz},{sf},{cr}"
|
|
|
|
elif param == "freq":
|
|
freq_hz = self.config.get('radio', {}).get('frequency', 915000000)
|
|
freq_mhz = freq_hz / 1_000_000.0
|
|
return f"> {freq_mhz}"
|
|
|
|
elif param == "tx":
|
|
power = self.config.get('radio', {}).get('tx_power', 20)
|
|
return f"> {power}"
|
|
|
|
elif param == "public.key":
|
|
if not self.identity:
|
|
return "Error: Identity not available"
|
|
try:
|
|
pubkey = self.identity.get_public_key()
|
|
pubkey_hex = pubkey.hex()
|
|
return f"> {pubkey_hex}"
|
|
except Exception as e:
|
|
logger.error(f"Failed to get public key: {e}")
|
|
return f"Error: {e}"
|
|
|
|
elif param == "role":
|
|
role = "room_server" if self.identity_type == "room_server" else "repeater"
|
|
return f"> {role}"
|
|
|
|
elif param == "guest.password":
|
|
guest_pw = self.config.get('security', {}).get('guest_password', '')
|
|
return f"> {guest_pw}"
|
|
|
|
elif param == "allow.read.only":
|
|
allow = self.config.get('security', {}).get('allow_read_only', False)
|
|
return f"> {'on' if allow else 'off'}"
|
|
|
|
elif param == "advert.interval":
|
|
interval = self.repeater_config.get('advert_interval_minutes', 120)
|
|
return f"> {interval}"
|
|
|
|
elif param == "flood.advert.interval":
|
|
interval = self.repeater_config.get('flood_advert_interval_hours', 24)
|
|
return f"> {interval}"
|
|
|
|
elif param == "flood.max":
|
|
max_flood = self.repeater_config.get('max_flood_hops', 3)
|
|
return f"> {max_flood}"
|
|
|
|
elif param == "rxdelay":
|
|
delay = self.repeater_config.get('rx_delay_base', 0.0)
|
|
return f"> {delay}"
|
|
|
|
elif param == "txdelay":
|
|
delay = self.repeater_config.get('tx_delay_factor', 1.0)
|
|
return f"> {delay}"
|
|
|
|
elif param == "direct.txdelay":
|
|
delay = self.repeater_config.get('direct_tx_delay_factor', 0.5)
|
|
return f"> {delay}"
|
|
|
|
elif param == "multi.acks":
|
|
acks = self.repeater_config.get('multi_acks', 0)
|
|
return f"> {acks}"
|
|
|
|
elif param == "int.thresh":
|
|
thresh = self.repeater_config.get('interference_threshold', -120)
|
|
return f"> {thresh}"
|
|
|
|
elif param == "agc.reset.interval":
|
|
interval = self.repeater_config.get('agc_reset_interval', 0)
|
|
return f"> {interval}"
|
|
|
|
else:
|
|
return f"??: {param}"
|
|
|
|
# ==================== Set Commands ====================
|
|
|
|
def _cmd_set(self, param: str) -> str:
|
|
"""Handle set commands."""
|
|
parts = param.split(None, 1)
|
|
if len(parts) < 2:
|
|
return "Error: Missing value"
|
|
|
|
key, value = parts[0], parts[1]
|
|
|
|
try:
|
|
if key == "af":
|
|
self.repeater_config['airtime_factor'] = float(value)
|
|
self.config_manager.save_to_file()
|
|
self.config_manager.live_update_daemon(['repeater'])
|
|
return "OK"
|
|
|
|
elif key == "name":
|
|
self.repeater_config['node_name'] = value
|
|
self.config_manager.save_to_file()
|
|
self.config_manager.live_update_daemon(['repeater'])
|
|
return "OK"
|
|
|
|
elif key == "repeat":
|
|
disabled = value.lower() == "off"
|
|
self.repeater_config['disable_forward'] = disabled
|
|
self.config_manager.save_to_file()
|
|
self.config_manager.live_update_daemon(['repeater'])
|
|
return f"OK - repeat is now {'OFF' if disabled else 'ON'}"
|
|
|
|
elif key == "lat":
|
|
self.repeater_config['latitude'] = float(value)
|
|
self.config_manager.save_to_file()
|
|
self.config_manager.live_update_daemon(['repeater'])
|
|
return "OK"
|
|
|
|
elif key == "lon":
|
|
self.repeater_config['longitude'] = float(value)
|
|
self.config_manager.save_to_file()
|
|
self.config_manager.live_update_daemon(['repeater'])
|
|
return "OK"
|
|
|
|
elif key == "radio":
|
|
# Format: freq bw sf cr
|
|
radio_parts = value.split()
|
|
if len(radio_parts) != 4:
|
|
return "Error: Expected freq bw sf cr"
|
|
|
|
if 'radio' not in self.config:
|
|
self.config['radio'] = {}
|
|
|
|
self.config['radio']['frequency'] = float(radio_parts[0])
|
|
self.config['radio']['bandwidth'] = float(radio_parts[1])
|
|
self.config['radio']['spreading_factor'] = int(radio_parts[2])
|
|
self.config['radio']['coding_rate'] = int(radio_parts[3])
|
|
self.config_manager.save_to_file()
|
|
self.config_manager.live_update_daemon(['radio'])
|
|
return "OK - restart repeater to apply"
|
|
|
|
elif key == "freq":
|
|
if 'radio' not in self.config:
|
|
self.config['radio'] = {}
|
|
self.config['radio']['frequency'] = float(value)
|
|
self.config_manager.save_to_file()
|
|
self.config_manager.live_update_daemon(['radio'])
|
|
return "OK - restart repeater to apply"
|
|
|
|
elif key == "tx":
|
|
if 'radio' not in self.config:
|
|
self.config['radio'] = {}
|
|
self.config['radio']['tx_power'] = int(value)
|
|
self.config_manager.save_to_file()
|
|
self.config_manager.live_update_daemon(['radio'])
|
|
return "OK"
|
|
|
|
elif key == "guest.password":
|
|
if 'security' not in self.config:
|
|
self.config['security'] = {}
|
|
self.config['security']['guest_password'] = value
|
|
self.config_manager.save_to_file()
|
|
self.config_manager.live_update_daemon(['security'])
|
|
return "OK"
|
|
|
|
elif key == "allow.read.only":
|
|
if 'security' not in self.config:
|
|
self.config['security'] = {}
|
|
self.config['security']['allow_read_only'] = value.lower() == "on"
|
|
self.config_manager.save_to_file()
|
|
self.config_manager.live_update_daemon(['security'])
|
|
return "OK"
|
|
|
|
elif key == "advert.interval":
|
|
mins = int(value)
|
|
if mins > 0 and (mins < 60 or mins > 240):
|
|
return "Error: interval range is 60-240 minutes"
|
|
self.repeater_config['advert_interval_minutes'] = mins
|
|
self.config_manager.save_to_file()
|
|
self.config_manager.live_update_daemon(['repeater'])
|
|
return "OK"
|
|
|
|
elif key == "flood.advert.interval":
|
|
hours = int(value)
|
|
if (hours > 0 and hours < 3) or hours > 48:
|
|
return "Error: interval range is 3-48 hours"
|
|
self.repeater_config['flood_advert_interval_hours'] = hours
|
|
self.config_manager.save_to_file()
|
|
self.config_manager.live_update_daemon(['repeater'])
|
|
return "OK"
|
|
|
|
elif key == "flood.max":
|
|
max_val = int(value)
|
|
if max_val > 64:
|
|
return "Error: max 64"
|
|
self.repeater_config['max_flood_hops'] = max_val
|
|
self.config_manager.save_to_file()
|
|
self.config_manager.live_update_daemon(['repeater'])
|
|
return "OK"
|
|
|
|
elif key == "rxdelay":
|
|
delay = float(value)
|
|
if delay < 0:
|
|
return "Error: cannot be negative"
|
|
self.repeater_config['rx_delay_base'] = delay
|
|
self.config_manager.save_to_file()
|
|
self.config_manager.live_update_daemon(['repeater', 'delays'])
|
|
return "OK"
|
|
|
|
elif key == "txdelay":
|
|
delay = float(value)
|
|
if delay < 0:
|
|
return "Error: cannot be negative"
|
|
self.repeater_config['tx_delay_factor'] = delay
|
|
self.config_manager.save_to_file()
|
|
self.config_manager.live_update_daemon(['repeater', 'delays'])
|
|
return "OK"
|
|
|
|
elif key == "direct.txdelay":
|
|
delay = float(value)
|
|
if delay < 0:
|
|
return "Error: cannot be negative"
|
|
self.repeater_config['direct_tx_delay_factor'] = delay
|
|
self.config_manager.save_to_file()
|
|
self.config_manager.live_update_daemon(['repeater', 'delays'])
|
|
return "OK"
|
|
|
|
elif key == "multi.acks":
|
|
self.repeater_config['multi_acks'] = int(value)
|
|
self.config_manager.save_to_file()
|
|
self.config_manager.live_update_daemon(['repeater'])
|
|
return "OK"
|
|
|
|
elif key == "int.thresh":
|
|
self.repeater_config['interference_threshold'] = int(value)
|
|
self.config_manager.save_to_file()
|
|
self.config_manager.live_update_daemon(['repeater'])
|
|
return "OK"
|
|
|
|
elif key == "agc.reset.interval":
|
|
interval = int(value)
|
|
# Round to nearest multiple of 4
|
|
rounded = (interval // 4) * 4
|
|
self.repeater_config['agc_reset_interval'] = rounded
|
|
self.config_manager.save_to_file()
|
|
self.config_manager.live_update_daemon(['repeater'])
|
|
return f"OK - interval rounded to {rounded}"
|
|
|
|
else:
|
|
return f"unknown config: {key}"
|
|
|
|
except ValueError as e:
|
|
return f"Error: invalid value - {e}"
|
|
except Exception as e:
|
|
logger.error(f"Set command error: {e}")
|
|
return f"Error: {e}"
|
|
|
|
# ==================== ACL Commands ====================
|
|
|
|
def _cmd_setperm(self, command: str) -> str:
|
|
"""Set permissions for a public key."""
|
|
# Format: setperm {pubkey-hex} {permissions-int}
|
|
parts = command[8:].split()
|
|
if len(parts) < 2:
|
|
return "Err - bad params"
|
|
|
|
pubkey_hex = parts[0]
|
|
try:
|
|
permissions = int(parts[1])
|
|
except ValueError:
|
|
return "Err - invalid permissions"
|
|
|
|
# TODO: Apply permissions via ACL
|
|
logger.info(f"setperm command: {pubkey_hex} -> {permissions}")
|
|
return "Error: Not yet implemented - use config file"
|
|
|
|
# ==================== Region Commands ====================
|
|
|
|
def _cmd_region(self, command: str) -> str:
|
|
"""Handle region commands."""
|
|
parts = command.split()
|
|
|
|
if len(parts) == 1:
|
|
return "Error: Region commands not implemented in Python repeater"
|
|
|
|
subcommand = parts[1]
|
|
|
|
if subcommand == "load":
|
|
return "Error: Region commands not implemented"
|
|
elif subcommand == "save":
|
|
return "Error: Region commands not implemented"
|
|
elif subcommand in ("allowf", "denyf", "get", "home", "put", "remove"):
|
|
return "Error: Region commands not implemented"
|
|
else:
|
|
return "Err - ??"
|
|
|
|
# ==================== Neighbor Commands ====================
|
|
|
|
def _cmd_neighbors(self) -> str:
|
|
"""List neighbors."""
|
|
if not self.storage_handler:
|
|
return "Error: Storage not available"
|
|
|
|
try:
|
|
neighbors = self.storage_handler.get_neighbors()
|
|
|
|
if not neighbors:
|
|
return "No neighbors discovered yet"
|
|
|
|
# Filter to only show repeaters and zero hop nodes
|
|
filtered_neighbors = {
|
|
pubkey: info for pubkey, info in neighbors.items()
|
|
if info.get('is_repeater', False) or info.get('zero_hop', False)
|
|
}
|
|
|
|
if not filtered_neighbors:
|
|
return "No repeaters or zero hop neighbors discovered yet"
|
|
|
|
# Format output similar to C++ version
|
|
# Format: "<pubkey_prefix> heard Xs ago"
|
|
import time
|
|
current_time = int(time.time())
|
|
|
|
lines = []
|
|
for pubkey, info in filtered_neighbors.items():
|
|
last_seen = info.get('last_seen', 0)
|
|
seconds_ago = int(current_time - last_seen)
|
|
|
|
# Get first 4 bytes of pubkey as hex (match C++ format)
|
|
pubkey_short = pubkey[:8] if len(pubkey) >= 8 else pubkey
|
|
snr = info.get('snr', 0) or 0
|
|
|
|
# Format: <4byte_hex>:<seconds_ago>:<snr> (matches C++ format)
|
|
lines.append(f"{pubkey_short}:{seconds_ago}:{int(snr)}")
|
|
|
|
return "\n".join(lines)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to list neighbors: {e}", exc_info=True)
|
|
return f"Error: {e}"
|
|
|
|
def _cmd_neighbor_remove(self, command: str) -> str:
|
|
"""Remove a neighbor."""
|
|
pubkey_hex = command[16:].strip()
|
|
|
|
if not pubkey_hex:
|
|
return "ERR: Missing pubkey"
|
|
|
|
# TODO: Remove neighbor from routing table
|
|
logger.info(f"neighbor.remove: {pubkey_hex}")
|
|
return "Error: Not yet implemented"
|
|
|
|
# ==================== Temporary Radio Commands ====================
|
|
|
|
def _cmd_tempradio(self, command: str) -> str:
|
|
"""Apply temporary radio parameters."""
|
|
# Format: tempradio {freq} {bw} {sf} {cr} {timeout_mins}
|
|
parts = command[10:].split()
|
|
|
|
if len(parts) < 5:
|
|
return "Error: Expected freq bw sf cr timeout_mins"
|
|
|
|
try:
|
|
freq = float(parts[0])
|
|
bw = float(parts[1])
|
|
sf = int(parts[2])
|
|
cr = int(parts[3])
|
|
timeout_mins = int(parts[4])
|
|
|
|
# Validate
|
|
if not (300.0 <= freq <= 2500.0):
|
|
return "Error: invalid frequency"
|
|
if not (7.0 <= bw <= 500.0):
|
|
return "Error: invalid bandwidth"
|
|
if not (5 <= sf <= 12):
|
|
return "Error: invalid spreading factor"
|
|
if not (5 <= cr <= 8):
|
|
return "Error: invalid coding rate"
|
|
if timeout_mins <= 0:
|
|
return "Error: invalid timeout"
|
|
|
|
# TODO: Apply temporary radio parameters
|
|
logger.info(f"tempradio: {freq}MHz {bw}kHz SF{sf} CR4/{cr} for {timeout_mins}min")
|
|
return "Error: Not yet implemented"
|
|
|
|
except ValueError:
|
|
return "Error, invalid params"
|
|
|
|
# ==================== Logging Commands ====================
|
|
|
|
def _cmd_log(self, command: str) -> str:
|
|
"""Handle log commands."""
|
|
if command == "log start":
|
|
# TODO: Enable logging
|
|
return "Error: Not yet implemented"
|
|
elif command == "log stop":
|
|
# TODO: Disable logging
|
|
return "Error: Not yet implemented"
|
|
elif command == "log erase":
|
|
# TODO: Clear log file
|
|
return "Error: Not yet implemented"
|
|
elif command == "log":
|
|
return "Error: Use journalctl to view logs"
|
|
else:
|
|
return "Unknown log command"
|