import asyncio import functools import logging import os import signal import socket import sys import time from repeater.companion.utils import ( CompanionContactCapacityError, effective_max_contacts, enforce_companion_contact_capacity, format_companion_bridge_limits, normalize_companion_identity_key, parse_companion_bridge_kwargs, validate_companion_node_name, ) from repeater.config import NullRadio, get_radio_for_board, load_config, save_config from repeater.config_manager import ConfigManager from repeater.data_acquisition.glass_handler import GlassHandler from repeater.data_acquisition.gps_service import GPSService from repeater.engine import RepeaterHandler from repeater.handler_helpers import ( AdvertHelper, DiscoveryHelper, LoginHelper, PathHelper, ProtocolRequestHelper, TextHelper, TraceHelper, ) from repeater.identity_manager import IdentityManager from repeater.packet_router import PacketRouter from repeater.sensors import SensorManager from repeater.web.http_server import HTTPStatsServer, _log_buffer logger = logging.getLogger("RepeaterDaemon") class RepeaterDaemon: def __init__(self, config: dict, radio=None): self.config = config self.radio = radio self.dispatcher = None self.repeater_handler = None self.local_hash = None self.local_identity = None self.identity_manager = None self.config_manager = None self.http_server = None self.trace_helper = None self.advert_helper = None self.discovery_helper = None self.login_helper = None self.text_helper = None self.path_helper = None self.protocol_request_helper = None self.glass_handler = None self.gps_service = None self.sensor_manager = None self.acl = None self.router = None self.companion_bridges: dict[int, object] = {} self.companion_frame_servers: list = [] self._shutdown_started = False self._main_task = None self.radio_status = "unknown" self.radio_error = None log_level = config.get("logging", {}).get("level", "INFO") logging.basicConfig( level=getattr(logging, log_level), format=config.get("logging", {}).get("format"), ) root_logger = logging.getLogger() _log_buffer.setLevel(getattr(logging, log_level)) root_logger.addHandler(_log_buffer) async def initialize(self): logger.info(f"Initializing repeater: {self.config['repeater']['node_name']}") # ----------------------------------------------- # Get the actual Network IP Address try: # This looks for the IP assigned to the default hostname host_name = socket.gethostname() # We try to get the IP associated with the hostname self.network_ip = socket.gethostbyname(host_name) # If that still gives 127.0.x.x, let's try a different internal method if self.network_ip.startswith("127."): s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # We use a non-routable IP that doesn't require an actual connection s.connect(("10.255.255.255", 1)) self.network_ip = s.getsockname()[0] s.close() except Exception as e: logger.warning(f"Could not determine network IP: {e}") self.network_ip = "Unknown" logger.info(f"System Network IP: {self.network_ip}") # ----------------------------------------------- if self.radio is None: radio_type_raw = self.config.get("radio_type") radio_type = "none" if radio_type_raw is None else str(radio_type_raw) radio_type_lower = radio_type.lower().strip() radio_explicitly_disabled = radio_type_lower in ( "", "none", "null", "disabled", "off", "no_radio", ) logger.info(f"Initializing radio hardware... (radio_type={radio_type})") try: self.radio = get_radio_for_board(self.config) if isinstance(self.radio, NullRadio): self.radio_status = "disabled" if radio_explicitly_disabled else "degraded" if self.radio_status == "disabled": self.radio_error = None else: self.radio_error = ( self.radio_error or f"Radio type '{radio_type}' unavailable; running in no-radio mode" ) else: self.radio_status = "ok" self.radio_error = None # KISS modem: schedule RX callbacks on the event loop for thread safety if hasattr(self.radio, "set_event_loop"): self.radio.set_event_loop(asyncio.get_running_loop()) if hasattr(self.radio, "set_custom_cad_thresholds"): # Load CAD settings from config, with defaults cad_config = self.config.get("radio", {}).get("cad", {}) peak_threshold = cad_config.get("peak_threshold", 23) min_threshold = cad_config.get("min_threshold", 11) self.radio.set_custom_cad_thresholds(peak=peak_threshold, min_val=min_threshold) logger.info( f"CAD thresholds set from config: peak={peak_threshold}, min={min_threshold}" ) else: logger.warning("Radio does not support CAD configuration") if hasattr(self.radio, "get_frequency"): logger.info(f"Radio config - Freq: {self.radio.get_frequency():.1f}MHz") if hasattr(self.radio, "get_spreading_factor"): logger.info(f"Radio config - SF: {self.radio.get_spreading_factor()}") if hasattr(self.radio, "get_bandwidth"): logger.info(f"Radio config - BW: {self.radio.get_bandwidth()}kHz") if hasattr(self.radio, "get_coding_rate"): logger.info(f"Radio config - CR: {self.radio.get_coding_rate()}") if hasattr(self.radio, "get_tx_power"): logger.info(f"Radio config - TX Power: {self.radio.get_tx_power()}dBm") logger.info("Radio hardware initialized") except Exception as e: logger.error(f"Failed to initialize radio hardware: {e}") self.radio_status = "degraded" self.radio_error = str(e) logger.warning( "Radio type '%s' unavailable; starting in no-radio mode to keep service alive. " "Check radio configuration and hardware mapping.", radio_type, ) self.radio = NullRadio() try: from pymc_core import LocalIdentity from pymc_core.node.dispatcher import Dispatcher self.dispatcher = Dispatcher(self.radio) logger.info("Dispatcher initialized") # Initialize Identity Manager for additional identities (e.g., room servers) self.identity_manager = IdentityManager(self.config) logger.info("Identity manager initialized") # Set up default repeater identity (not managed by identity manager) identity_key = self.config.get("repeater", {}).get("identity_key") if not identity_key: logger.error("No identity key found in configuration. Cannot init repeater.") raise RuntimeError("Identity key is required for repeater operation") local_identity = LocalIdentity(seed=identity_key) self.local_identity = local_identity self.dispatcher.local_identity = local_identity pubkey = local_identity.get_public_key() self.local_hash = pubkey[0] self.local_hash_bytes = bytes(pubkey[:3]) logger.info(f"Local identity set: {local_identity.get_address_bytes().hex()}") local_hash_hex = f"0x{self.local_hash:02x}" logger.info(f"Local node hash (from identity): {local_hash_hex}") # Load additional identities from config (e.g., room servers) await self._load_additional_identities() self.dispatcher._is_own_packet = lambda pkt: False self.repeater_handler = RepeaterHandler( self.config, self.dispatcher, self.local_hash, local_hash_bytes=self.local_hash_bytes, send_advert_func=self.send_advert, ) # Create router self.router = PacketRouter(self) await self.router.start() # Register router as entry point for ALL packets via fallback handler # All received packets flow through router → helpers → repeater engine self.dispatcher.register_fallback_handler(self._router_callback) logger.info("Packet router registered as fallback (catches all packets)") # Set default path hash mode for flood 0-hop packets (adverts, etc.) path_hash_mode = self.config.get("mesh", {}).get("path_hash_mode", 0) if path_hash_mode not in (0, 1, 2): logger.warning( f"Invalid mesh.path_hash_mode={path_hash_mode}, must be 0/1/2; using 0" ) path_hash_mode = 0 self.dispatcher.set_default_path_hash_mode(path_hash_mode) mode_names = {0: "1-byte", 1: "2-byte", 2: "3-byte"} logger.info( f"Path hash mode set to {mode_names[path_hash_mode]} (mesh.path_hash_mode={path_hash_mode})" ) # Create processing helpers (handlers created internally) self.trace_helper = TraceHelper( local_hash=self.local_hash, repeater_handler=self.repeater_handler, packet_injector=self.router.inject_packet, log_fn=logger.info, local_identity=self.local_identity, ) logger.info("Trace processing helper initialized") # Create advert helper for neighbor tracking self.advert_helper = AdvertHelper( local_identity=self.local_identity, storage=self.repeater_handler.storage if self.repeater_handler else None, config=self.config, log_fn=logger.info, ) logger.info("Advert processing helper initialized") # Set up discovery handler if enabled allow_discovery = self.config.get("repeater", {}).get("allow_discovery", True) if allow_discovery: self.discovery_helper = DiscoveryHelper( local_identity=self.local_identity, packet_injector=self.router.inject_packet, node_type=2, log_fn=logger.info, debug_log_fn=logger.debug, ) logger.info("Discovery processing helper initialized") else: logger.info("Discovery response handler disabled") # Create login helper (will create per-identity ACLs) self.login_helper = LoginHelper( identity_manager=self.identity_manager, packet_injector=self.router.inject_packet, log_fn=logger.info, sqlite_handler=( self.repeater_handler.storage.sqlite_handler if self.repeater_handler and self.repeater_handler.storage else None ), # For anon regions-discovery replies config=self.config, # For owner-info / feature-flags replies ) # Register default repeater identity self.login_helper.register_identity( name="repeater", identity=self.local_identity, identity_type="repeater", config=self.config, # Pass full config so repeater can access top-level security section ) # Register room server identities with their configs for name, identity, config in self.identity_manager.get_identities_by_type( "room_server" ): self.login_helper.register_identity( name=name, identity=identity, identity_type="room_server", config=config, # Pass room-specific config ) logger.info("Login processing helper initialized") # Initialize ConfigManager for centralized config management self.config_manager = ConfigManager( config_path=getattr(self, "config_path", "/etc/pymc_repeater/config.yaml"), config=self.config, daemon_instance=self, ) logger.info("Config manager initialized") self.sensor_manager = SensorManager(self.config) self.sensor_manager.start() if self.sensor_manager.get_summary().get("loaded", 0): logger.info("Sensor manager initialized") else: logger.info("No configured sensors loaded") self.gps_service = GPSService( self.config, location_update_callback=self._update_repeater_location_from_gps, ) self.gps_service.start() if self.config.get("gps", {}).get("enabled", False): logger.info("GPS diagnostics initialized") else: logger.info("GPS diagnostics disabled") # Initialize text message helper with per-identity ACLs self.text_helper = TextHelper( identity_manager=self.identity_manager, packet_injector=self.router.inject_packet, acl_dict=self.login_helper.get_acl_dict(), # Per-identity ACLs log_fn=logger.info, config_path=getattr(self, "config_path", None), # For CLI to save changes config=self.config, # For CLI to read/modify settings config_manager=self.config_manager, # New centralized config manager sqlite_handler=( self.repeater_handler.storage.sqlite_handler if self.repeater_handler and self.repeater_handler.storage else None ), # For room server database send_advert_callback=self.send_advert, # For CLI advert command ) self.text_helper._loop = asyncio.get_running_loop() # Register default repeater identity for text messages self.text_helper.register_identity( name="repeater", identity=self.local_identity, identity_type="repeater", radio_config=self.config.get("radio", {}), ) # Register room server identities for text messages for name, identity, config in self.identity_manager.get_identities_by_type( "room_server" ): self.text_helper.register_identity( name=name, identity=identity, identity_type="room_server", radio_config=config, # Pass room-specific config (includes max_posts, etc.) ) logger.info("Text message processing helper initialized") # Initialize PATH packet helper for updating client out_path self.path_helper = PathHelper( acl_dict=self.login_helper.get_acl_dict(), # Per-identity ACLs log_fn=logger.info, ) logger.info("PATH packet processing helper initialized") # Initialize protocol request handler for status/telemetry requests self.protocol_request_helper = ProtocolRequestHelper( identity_manager=self.identity_manager, packet_injector=self.router.inject_packet, acl_dict=self.login_helper.get_acl_dict(), radio=self.radio, engine=self.repeater_handler, neighbor_tracker=self.advert_helper, config=self.config, ) # Register repeater identity for protocol requests self.protocol_request_helper.register_identity( name="repeater", identity=self.local_identity, identity_type="repeater" ) logger.info("Protocol request handler initialized") # Load companion identities (CompanionBridge + frame server per companion) await self._load_companion_identities() # Subscribe to raw RX in pyMC_core so we can push PUSH_CODE_LOG_RX_DATA to companion clients self.dispatcher.add_raw_rx_subscriber(self._on_raw_rx_for_companions) n = len(getattr(self, "companion_frame_servers", [])) logger.info( "Raw RX subscriber registered (%s companion frame server(s)). Connect a client to see rx_log (0x88).", n, ) # Subscribe to parsed packets (pre-dedup) so duplicate path variants # still appear in the web UI even though the Dispatcher blocks them. self.dispatcher.add_raw_packet_subscriber(self._on_raw_packet_for_dedup_logging) # When trace reaches final node, push PUSH_CODE_TRACE_DATA (0x89) to companion clients (firmware onTraceRecv) self.trace_helper.on_trace_complete = self._on_trace_complete_for_companions # Optional pyMC_Glass integration loop (inform/control plane) self.glass_handler = GlassHandler( config=self.config, daemon_instance=self, config_manager=self.config_manager, ) await self.glass_handler.start() if ( self.repeater_handler and self.repeater_handler.storage and hasattr(self.repeater_handler.storage, "set_glass_publisher") ): self.repeater_handler.storage.set_glass_publisher( self.glass_handler.publish_telemetry ) except Exception as e: logger.error(f"Failed to initialize dispatcher: {e}") raise async def _load_additional_identities(self): from pymc_core import LocalIdentity identities_config = self.config.get("identities", {}) # Load room server identities room_servers = identities_config.get("room_servers") or [] for room_config in room_servers: try: name = room_config.get("name") identity_key = room_config.get("identity_key") if not name or not identity_key: logger.warning("Skipping room server config: missing name or identity_key") continue # Convert identity_key to bytes if it's a hex string if isinstance(identity_key, bytes): identity_key_bytes = identity_key elif isinstance(identity_key, str): try: identity_key_bytes = bytes.fromhex(identity_key) if len(identity_key_bytes) not in (32, 64): logger.error( f"Identity key for '{name}' is invalid length: {len(identity_key_bytes)} bytes (expected 32 or 64)" ) continue except ValueError as e: logger.error(f"Identity key for '{name}' is not valid hex: {e}") continue else: logger.error( f"Identity key for '{name}' has unknown type: {type(identity_key)}" ) continue # Create the identity room_identity = LocalIdentity(seed=identity_key_bytes) # Register with the manager and all helpers success = self._register_identity_everywhere( name=name, identity=room_identity, config=room_config, identity_type="room_server", ) if success: room_hash = room_identity.get_public_key()[0] logger.info( f"Loaded room server '{name}': hash=0x{room_hash:02x}, " f"address={room_identity.get_address_bytes().hex()}" ) except Exception as e: logger.error(f"Failed to load room server identity '{name}': {e}") # Summary logging total_identities = len(self.identity_manager.list_identities()) logger.info(f"Identity manager loaded {total_identities} total identities") async def _load_companion_identities(self) -> None: """Load companion identities from config and create CompanionBridge + frame server for each.""" from pymc_core import LocalIdentity from pymc_core.companion.models import Channel from repeater.companion import CompanionFrameServer, RepeaterCompanionBridge companions_config = self.config.get("identities", {}).get("companions") or [] if not companions_config: return sqlite_handler = None if self.repeater_handler and self.repeater_handler.storage: sqlite_handler = self.repeater_handler.storage.sqlite_handler if not sqlite_handler and companions_config: logger.warning( "Companion persistence disabled: no storage (contacts/channels will not survive restart or disconnect)" ) radio_config = ( self.repeater_handler.radio_config if self.repeater_handler else self.config.get("radio", {}) ) for comp_config in companions_config: try: name = comp_config.get("name") identity_key = comp_config.get("identity_key") settings = comp_config.get("settings") or {} if not name or not identity_key: logger.warning("Skipping companion config: missing name or identity_key") continue if isinstance(identity_key, str): try: identity_key_bytes = bytes.fromhex( normalize_companion_identity_key(identity_key) ) except ValueError as e: logger.error(f"Companion '{name}' identity_key invalid hex: {e}") continue elif isinstance(identity_key, bytes): identity_key_bytes = identity_key else: logger.error(f"Companion '{name}' identity_key has unknown type") continue if len(identity_key_bytes) not in (32, 64): logger.error( f"Companion '{name}' identity_key must be 32 bytes (hex) or 64 bytes (MeshCore firmware key)" ) continue identity = LocalIdentity(seed=identity_key_bytes) pubkey = identity.get_public_key() companion_hash = pubkey[0] companion_hash_str = f"0x{companion_hash:02x}" node_name = settings.get("node_name", name) tcp_port = settings.get("tcp_port", 5000) bind_address = settings.get("bind_address", "0.0.0.0") # nosec B104 tcp_timeout_raw = settings.get("tcp_timeout", 8 * 60 * 60) # 8 hours client_idle_timeout_sec = None if tcp_timeout_raw == 0 else int(tcp_timeout_raw) def _make_sync_node_name_to_config(companion_name: str): """Return a callback that syncs node_name to config for this companion (binds name at creation).""" def _sync(new_node_name: str) -> None: try: validated = validate_companion_node_name(new_node_name) except ValueError: return companions = (self.config.get("identities") or {}).get("companions") or [] for entry in companions: if entry.get("name") == companion_name: if "settings" not in entry: entry["settings"] = {} entry["settings"]["node_name"] = validated config_path = getattr(self, "config_path", None) if config_path: save_config(self.config, config_path) break return _sync bridge_kwargs = parse_companion_bridge_kwargs(settings) max_contacts = effective_max_contacts(bridge_kwargs) if sqlite_handler: trimmed = enforce_companion_contact_capacity( companion_hash_str, max_contacts, sqlite_handler, trim=bool(settings.get("trim_contacts_on_overflow")), companion_name=name, ) if trimmed: logger.warning( "Companion '%s': trimmed %d contact(s) to fit " "max_contacts=%d (trim_contacts_on_overflow)", name, trimmed, max_contacts, ) bridge = RepeaterCompanionBridge( identity=identity, # Tag the injector with this companion's hash so inject_packet can # skip its own frame server when echoing TX as raw RX (a node never # hears its own transmission). packet_injector=functools.partial( self.router.inject_packet, origin_hash=companion_hash_str ), node_name=node_name, radio_config=radio_config, sqlite_handler=sqlite_handler, companion_hash=companion_hash_str, on_prefs_saved=_make_sync_node_name_to_config(name), **bridge_kwargs, ) # Load contacts from SQLite if sqlite_handler: contact_rows = sqlite_handler.companion_load_contacts(companion_hash_str) if contact_rows: records = [] for row in contact_rows: d = dict(row) d["public_key"] = d.pop("pubkey", d.get("public_key", b"")) records.append(d) bridge.contacts.load_from_dicts(records) # Load channels from SQLite (normalize secret to 32 bytes to match # CompanionBase.set_channel and GroupTextHandler/PacketBuilder) channel_rows = sqlite_handler.companion_load_channels(companion_hash_str) for row in channel_rows: s = row.get("secret", b"") if isinstance(s, bytes): raw = s elif isinstance(s, (bytearray, memoryview)): raw = bytes(s) elif s: raw = bytes.fromhex(s if isinstance(s, str) else str(s)) else: raw = b"" if len(raw) < 32: raw = raw + b"\x00" * (32 - len(raw)) elif len(raw) > 32: raw = raw[:32] ch = Channel(name=row.get("name", ""), secret=raw) bridge.channels.set(row.get("channel_idx", 0), ch) # Preload queued messages from SQLite into bridge, bounded by # offline_queue_size (0 disables offline storage entirely). retention = getattr(bridge.message_queue, "_max_size", None) if retention != 0: for msg_dict in sqlite_handler.companion_load_messages( companion_hash_str, limit=retention or 100 ): from pymc_core.companion.models import QueuedMessage sk = msg_dict.get("sender_key", b"") if isinstance(sk, str): sk = bytes.fromhex(sk) bridge.message_queue.push( QueuedMessage( sender_key=sk, txt_type=msg_dict.get("txt_type", 0), timestamp=msg_dict.get("timestamp", 0), text=msg_dict.get("text", ""), is_channel=bool(msg_dict.get("is_channel", False)), channel_idx=msg_dict.get("channel_idx", 0), path_len=msg_dict.get("path_len", 0), ) ) # Ensure public channel (0) exists with default key for new companions from repeater.companion.constants import DEFAULT_PUBLIC_CHANNEL_SECRET if bridge.get_channel(0) is None: bridge.set_channel(0, "Public", DEFAULT_PUBLIC_CHANNEL_SECRET) self.companion_bridges[companion_hash] = bridge frame_server = CompanionFrameServer( bridge=bridge, companion_hash=companion_hash_str, port=tcp_port, bind_address=bind_address, client_idle_timeout_sec=client_idle_timeout_sec, sqlite_handler=sqlite_handler, local_hash=self.local_hash, stats_getter=self._get_companion_stats, control_handler=( self.discovery_helper.control_handler if self.discovery_helper else None ), ) await frame_server.start() self.companion_frame_servers.append(frame_server) self.identity_manager.register_identity( name=name, identity=identity, config=comp_config, identity_type="companion", ) limits = format_companion_bridge_limits(bridge_kwargs) logger.info( f"Loaded companion '{name}': hash=0x{companion_hash:02x}, " f"port={tcp_port}, bind={bind_address}, " f"client_idle_timeout_sec={client_idle_timeout_sec}{limits}" ) except CompanionContactCapacityError as e: logger.error("%s", e) except Exception as e: logger.error(f"Failed to load companion '{name}': {e}", exc_info=True) async def add_companion_from_config(self, comp_config: dict) -> None: """ Load a single companion from config and register it (hot-reload). Creates RepeaterCompanionBridge, CompanionFrameServer, starts the server, and registers with identity_manager. Raises on error. """ from pymc_core import LocalIdentity from pymc_core.companion.models import Channel from repeater.companion import CompanionFrameServer, RepeaterCompanionBridge from repeater.companion.constants import DEFAULT_PUBLIC_CHANNEL_SECRET name = comp_config.get("name") identity_key = comp_config.get("identity_key") settings = comp_config.get("settings") or {} if not name or not identity_key: raise ValueError("Companion config missing name or identity_key") if isinstance(identity_key, str): try: identity_key_bytes = bytes.fromhex(normalize_companion_identity_key(identity_key)) except ValueError as e: raise ValueError(f"Companion '{name}' identity_key invalid hex: {e}") from e elif isinstance(identity_key, bytes): identity_key_bytes = identity_key else: raise ValueError(f"Companion '{name}' identity_key has unknown type") if len(identity_key_bytes) not in (32, 64): raise ValueError( f"Companion '{name}' identity_key must be 32 bytes (hex) or 64 bytes (MeshCore firmware key)" ) # Already registered? if name in self.identity_manager.named_identities: raise ValueError(f"Companion '{name}' is already registered") identity = LocalIdentity(seed=identity_key_bytes) pubkey = identity.get_public_key() companion_hash = pubkey[0] companion_hash_str = f"0x{companion_hash:02x}" if companion_hash in self.companion_bridges: raise ValueError(f"Companion with hash 0x{companion_hash:02x} already loaded") sqlite_handler = None if self.repeater_handler and self.repeater_handler.storage: sqlite_handler = self.repeater_handler.storage.sqlite_handler radio_config = ( self.repeater_handler.radio_config if self.repeater_handler else self.config.get("radio", {}) ) node_name = settings.get("node_name", name) tcp_port = settings.get("tcp_port", 5000) bind_address = settings.get("bind_address", "0.0.0.0") # nosec B104 tcp_timeout_raw = settings.get("tcp_timeout", 120) client_idle_timeout_sec = None if tcp_timeout_raw == 0 else int(tcp_timeout_raw) bridge_kwargs = parse_companion_bridge_kwargs(settings) max_contacts = effective_max_contacts(bridge_kwargs) if sqlite_handler: trimmed = enforce_companion_contact_capacity( companion_hash_str, max_contacts, sqlite_handler, trim=bool(settings.get("trim_contacts_on_overflow")), companion_name=name, ) if trimmed: logger.warning( "Hot-reload companion '%s': trimmed %d contact(s) to fit " "max_contacts=%d (trim_contacts_on_overflow)", name, trimmed, max_contacts, ) bridge = RepeaterCompanionBridge( identity=identity, packet_injector=functools.partial( self.router.inject_packet, origin_hash=companion_hash_str ), node_name=node_name, radio_config=radio_config, sqlite_handler=sqlite_handler, companion_hash=companion_hash_str, **bridge_kwargs, ) if sqlite_handler: contact_rows = sqlite_handler.companion_load_contacts(companion_hash_str) if contact_rows: records = [] for row in contact_rows: d = dict(row) d["public_key"] = d.pop("pubkey", d.get("public_key", b"")) records.append(d) bridge.contacts.load_from_dicts(records) channel_rows = sqlite_handler.companion_load_channels(companion_hash_str) for row in channel_rows: s = row.get("secret", b"") if isinstance(s, bytes): raw = s elif isinstance(s, (bytearray, memoryview)): raw = bytes(s) elif s: raw = bytes.fromhex(s if isinstance(s, str) else str(s)) else: raw = b"" if len(raw) < 32: raw = raw + b"\x00" * (32 - len(raw)) elif len(raw) > 32: raw = raw[:32] ch = Channel(name=row.get("name", ""), secret=raw) bridge.channels.set(row.get("channel_idx", 0), ch) retention = getattr(bridge.message_queue, "_max_size", None) if retention != 0: for msg_dict in sqlite_handler.companion_load_messages( companion_hash_str, limit=retention or 100 ): from pymc_core.companion.models import QueuedMessage sk = msg_dict.get("sender_key", b"") if isinstance(sk, str): sk = bytes.fromhex(sk) bridge.message_queue.push( QueuedMessage( sender_key=sk, txt_type=msg_dict.get("txt_type", 0), timestamp=msg_dict.get("timestamp", 0), text=msg_dict.get("text", ""), is_channel=bool(msg_dict.get("is_channel", False)), channel_idx=msg_dict.get("channel_idx", 0), path_len=msg_dict.get("path_len", 0), ) ) if bridge.get_channel(0) is None: bridge.set_channel(0, "Public", DEFAULT_PUBLIC_CHANNEL_SECRET) self.companion_bridges[companion_hash] = bridge frame_server = CompanionFrameServer( bridge=bridge, companion_hash=companion_hash_str, port=tcp_port, bind_address=bind_address, client_idle_timeout_sec=client_idle_timeout_sec, sqlite_handler=sqlite_handler, local_hash=self.local_hash, stats_getter=self._get_companion_stats, control_handler=( self.discovery_helper.control_handler if self.discovery_helper else None ), ) await frame_server.start() self.companion_frame_servers.append(frame_server) self.identity_manager.register_identity( name=name, identity=identity, config=comp_config, identity_type="companion", ) limits = format_companion_bridge_limits(bridge_kwargs) logger.info( f"Hot-reload: Loaded companion '{name}': hash=0x{companion_hash:02x}, " f"port={tcp_port}, bind={bind_address}, " f"client_idle_timeout_sec={client_idle_timeout_sec}{limits}" ) async def _on_raw_rx_for_companions( self, data: bytes, rssi: int, snr: float, exclude_hash: str | None = None ) -> None: """Raw RX subscriber: push PUSH_CODE_LOG_RX_DATA (0x88) to connected companion clients. ``exclude_hash`` skips the frame server for that companion hash; used when echoing a companion's own injected TX so it never hears its own transmission. OTA RX subscribers leave it unset, so received packets reach every companion. """ servers = getattr(self, "companion_frame_servers", []) if not servers: return for fs in servers: if exclude_hash is not None and getattr(fs, "companion_hash", None) == exclude_hash: continue try: fs.push_rx_raw(snr, rssi, data) except Exception as e: logger.debug("Push RX raw to companion: %s", e) def _on_raw_packet_for_dedup_logging(self, pkt, data: bytes, analysis: dict) -> None: """Record duplicate packets for UI visibility. Called by Dispatcher's raw_packet_subscriber (pre-dedup) so we see all path variants. Only records packets the engine has already seen; novel packets are left for the normal handler path. """ if not self.repeater_handler: return if not self.repeater_handler.is_duplicate(pkt): return # First variant — will reach engine via normal handler path rssi = getattr(pkt, "_rssi", 0) or 0 snr = getattr(pkt, "_snr", 0.0) or 0.0 self.repeater_handler.record_duplicate(pkt, rssi=rssi, snr=snr) async def deliver_control_data( self, snr: float, rssi: int, path_len: int, path_bytes: bytes, payload_bytes: bytes, ) -> None: """Deliver CONTROL payload (e.g. discovery response) to companion clients (PUSH_CODE_CONTROL_DATA 0x8E).""" # Only push discovery responses (0x90); client expects these, not the request (0x80) if len(payload_bytes) < 6 or (payload_bytes[0] & 0xF0) != 0x90: return # Push every discovery response to the client, including our own (snr=0, rssi=0 = local node's response) servers = getattr(self, "companion_frame_servers", []) if not servers: return tag = int.from_bytes(payload_bytes[2:6], "little") if len(payload_bytes) >= 6 else 0 logger.debug( "Delivering discovery response to %s companion(s): tag=0x%08X, len=%s", len(servers), tag, len(payload_bytes), ) for fs in servers: try: await fs.push_control_data(snr, rssi, path_len, path_bytes, payload_bytes) except Exception as e: logger.warning("Companion push_control_data error: %s", e) async def _on_trace_complete_for_companions(self, packet, parsed_data) -> None: """Trace completed at this node: push PUSH_CODE_TRACE_DATA (0x89) to companion clients (firmware onTraceRecv).""" path_hashes = parsed_data.get("trace_path_bytes") or b"" if not path_hashes: return flags = parsed_data.get("flags", 0) path_sz = flags & 0x03 hash_len = len(path_hashes) expected_snr_len = hash_len >> path_sz if expected_snr_len <= 0: return tag = parsed_data.get("tag", 0) auth_code = parsed_data.get("auth_code", 0) snr_scaled = max(-128, min(127, int(round(packet.get_snr() * 4)))) snr_byte = snr_scaled if snr_scaled >= 0 else (256 + snr_scaled) # Firmware: memcpy path_snrs from pkt->path (length hash_len >> path_sz), then final SNR byte raw = bytes(packet.path)[:expected_snr_len] if len(raw) < expected_snr_len: raw = raw + b"\x00" * (expected_snr_len - len(raw)) path_snrs = raw for fs in getattr(self, "companion_frame_servers", []): try: await fs.push_trace_data_async( hash_len, flags, tag, auth_code, path_hashes, path_snrs, snr_byte ) except Exception as e: logger.debug("Push trace data to companion: %s", e) def _register_identity_everywhere( self, name: str, identity, config: dict, identity_type: str ) -> bool: """ Register an identity with the manager and all helpers in one place. This is the single source of truth for identity registration. """ # Register with identity manager success = self.identity_manager.register_identity( name=name, identity=identity, config=config, identity_type=identity_type ) if not success: return False # Register with all helpers if self.login_helper: self.login_helper.register_identity( name=name, identity=identity, identity_type=identity_type, config=config ) if self.text_helper: self.text_helper.register_identity( name=name, identity=identity, identity_type=identity_type, radio_config=self.config.get("radio", {}), ) if self.protocol_request_helper: self.protocol_request_helper.register_identity( name=name, identity=identity, identity_type=identity_type ) return True async def _router_callback(self, packet): """ Single entry point for ALL packets. Enqueues packets for router processing. """ if self.router: try: await self.router.enqueue(packet) except Exception as e: logger.error(f"Error enqueuing packet in router: {e}", exc_info=True) def register_text_handler_for_identity( self, name: str, identity, identity_type: str = "room_server", radio_config: dict = None ): if not self.text_helper: logger.warning("Text helper not initialized, cannot register identity") return False try: self.text_helper.register_identity( name=name, identity=identity, identity_type=identity_type, radio_config=radio_config or self.config.get("radio", {}), ) logger.info(f"Registered text handler for {identity_type} '{name}'") return True except Exception as e: logger.error(f"Failed to register text handler for '{name}': {e}") return False def get_stats(self) -> dict: stats = {} if self.repeater_handler: stats = self.repeater_handler.get_stats() # Add public key if available if self.local_identity: try: pubkey = self.local_identity.get_public_key() stats["public_key"] = pubkey.hex() except Exception: stats["public_key"] = None if self.gps_service: stats["gps"] = self.gps_service.get_summary() if self.sensor_manager: stats["sensors"] = self.sensor_manager.get_summary() stats["radio_status"] = self.radio_status if self.radio_error: stats["radio_error"] = self.radio_error return stats async def _get_companion_stats(self, stats_type: int) -> dict: """Return stats dict for companion CMD_GET_STATS (format expected by frame_server + meshcore_py).""" from repeater.companion.constants import ( STATS_TYPE_CORE, STATS_TYPE_PACKETS, STATS_TYPE_RADIO, ) if not self.repeater_handler: return {} engine = self.repeater_handler airtime = engine.airtime_mgr.get_stats() uptime_secs = int(time.time() - engine.start_time) queue_len = 0 for bridge in getattr(self, "companion_bridges", {}).values(): queue_len += getattr(getattr(bridge, "message_queue", None), "count", 0) or 0 if stats_type == STATS_TYPE_CORE: return { "battery_mv": 0, "uptime_secs": uptime_secs, "errors": 0, "queue_len": min(255, queue_len), } if stats_type == STATS_TYPE_RADIO: noise_floor = int(engine.get_cached_noise_floor() or 0) radio = getattr(self, "dispatcher", None) and getattr(self.dispatcher, "radio", None) if radio: _r = getattr(radio, "get_last_rssi", lambda: 0) _s = getattr(radio, "get_last_snr", lambda: 0.0) last_rssi = _r() if callable(_r) else _r last_snr = _s() if callable(_s) else _s else: last_rssi, last_snr = 0, 0.0 tx_air_secs = int(airtime.get("total_airtime_ms", 0) / 1000) return { "noise_floor": noise_floor, "last_rssi": int(last_rssi) if last_rssi is not None else 0, "last_snr": float(last_snr) if last_snr is not None else 0.0, "tx_air_secs": tx_air_secs, "rx_air_secs": 0, } if stats_type == STATS_TYPE_PACKETS: return { "recv": getattr(engine, "rx_count", 0), "sent": getattr(engine, "forwarded_count", 0), "flood_tx": getattr(engine, "forwarded_count", 0), "direct_tx": 0, "flood_rx": getattr(engine, "rx_count", 0), "direct_rx": 0, "recv_errors": getattr(engine, "dropped_count", 0), } return {} async def send_advert(self) -> bool: if not self.dispatcher or not self.local_identity: logger.error("Cannot send advert: dispatcher or identity not initialized") return False mode = self.config.get("repeater", {}).get("mode", "forward") if mode == "no_tx": logger.debug("Adverts disabled in no_tx mode") return False try: from pymc_core.protocol import PacketBuilder from pymc_core.protocol.constants import ADVERT_FLAG_HAS_NAME, ADVERT_FLAG_IS_REPEATER # Get node name and location from config repeater_config = self.config.get("repeater", {}) node_name = repeater_config.get("node_name", "Repeater") latitude = repeater_config.get("latitude", 0.0) longitude = repeater_config.get("longitude", 0.0) location_source = "config" if self.gps_service: location = self.gps_service.get_repeater_location() latitude = location.get("latitude", latitude) longitude = location.get("longitude", longitude) location_source = str(location.get("source", location_source)) flags = ADVERT_FLAG_IS_REPEATER | ADVERT_FLAG_HAS_NAME packet = PacketBuilder.create_advert( local_identity=self.local_identity, name=node_name, lat=latitude, lon=longitude, feature1=0, feature2=0, flags=flags, route_type="flood", ) # Send via dispatcher await self.dispatcher.send_packet(packet, wait_for_ack=False) if self.repeater_handler: self.repeater_handler.mark_seen(packet) pkt_hash = packet.calculate_packet_hash().hex()[:16] self.dispatcher.packet_filter.track_packet(pkt_hash) logger.debug("Marked own advert as seen in duplicate cache") logger.info( "Sent flood advert '%s' at (%.6f, %.6f) source=%s", node_name, latitude, longitude, location_source, ) return True except Exception as e: logger.error(f"Failed to send advert: {e}", exc_info=True) return False def _update_repeater_location_from_gps(self, location: dict) -> bool: """Persist the latest valid GPS fix as the repeater's advertised location.""" latitude = location.get("latitude") longitude = location.get("longitude") if latitude is None or longitude is None: return False repeater_config = self.config.setdefault("repeater", {}) current_latitude = repeater_config.get("latitude") current_longitude = repeater_config.get("longitude") try: if ( current_latitude is not None and current_longitude is not None and abs(float(current_latitude) - float(latitude)) < 0.000001 and abs(float(current_longitude) - float(longitude)) < 0.000001 ): return False except (TypeError, ValueError): pass updates = { "repeater": { "latitude": float(latitude), "longitude": float(longitude), } } if self.config_manager: result = self.config_manager.update_and_save( updates=updates, live_update=True, live_update_sections=["repeater"], ) if not result.get("success"): logger.warning( "GPS location fix could not update repeater config: %s", result.get("error", "unknown error"), ) return False else: repeater_config.update(updates["repeater"]) logger.info( "Updated repeater location from GPS fix: latitude=%.6f longitude=%.6f", latitude, longitude, ) return True def _signal_shutdown(self, sig, loop): """Handle SIGTERM/SIGINT by scheduling async shutdown.""" if self._shutdown_started: logger.info(f"Received signal {sig.name}, shutdown already in progress") return logger.info(f"Received signal {sig.name}, shutting down...") loop.create_task(self._shutdown()) # Cancel run() so dispatcher.run_forever() unwinds cleanly. if self._main_task and not self._main_task.done(): self._main_task.cancel() async def _shutdown(self): """Best-effort shutdown: stop background services and release hardware.""" if self._shutdown_started: return self._shutdown_started = True # Stop companion frame servers first to close client sockets and child workers. for frame_server in getattr(self, "companion_frame_servers", []): try: await frame_server.stop() except Exception as e: logger.warning(f"Companion frame server stop error: {e}") # Stop companion bridges to flush/persist state. if hasattr(self, "companion_bridges"): for bridge in self.companion_bridges.values(): if hasattr(bridge, "stop"): try: await bridge.stop() except Exception as e: logger.warning(f"Companion bridge stop error: {e}") # Stop router if self.router: try: await self.router.stop() except Exception as e: logger.warning(f"Error stopping router: {e}") # Stop HTTP server if self.http_server: try: await asyncio.wait_for(asyncio.to_thread(self.http_server.stop), timeout=3) except asyncio.TimeoutError: logger.warning("Timeout stopping HTTP server") except Exception as e: logger.warning(f"Error stopping HTTP server: {e}") # Stop Glass inform loop if self.glass_handler: try: await self.glass_handler.stop() except Exception as e: logger.warning(f"Error stopping Glass handler: {e}") # Stop sensor manager. if self.sensor_manager: try: self.sensor_manager.stop() except Exception as e: logger.warning(f"Error stopping sensor manager: {e}") # Stop GPS diagnostics. if self.gps_service: try: self.gps_service.stop() except Exception as e: logger.warning(f"Error stopping GPS diagnostics: {e}") # Close storage publishers (MQTT/LetsMesh) to stop their worker threads. try: if self.repeater_handler and self.repeater_handler.storage: await asyncio.wait_for( asyncio.to_thread(self.repeater_handler.storage.close), timeout=5 ) except asyncio.TimeoutError: logger.warning("Timeout closing storage publishers") except Exception as e: logger.warning(f"Error closing storage: {e}") # Release radio resources if self.radio and hasattr(self.radio, "cleanup"): try: self.radio.cleanup() except Exception as e: logger.warning(f"Error cleaning up radio: {e}") # Release CH341 USB device if in use try: radio_type_raw = self.config.get("radio_type") radio_type = "" if radio_type_raw is None else str(radio_type_raw).lower() if radio_type == "sx1262_ch341": from pymc_core.hardware.ch341.ch341_async import CH341Async CH341Async.reset_instance() except Exception as e: logger.debug(f"CH341 reset skipped/failed: {e}") # Do not force-stop the event loop here; asyncio.run() owns loop lifecycle. @staticmethod def _detect_container() -> bool: """Detect if running inside an LXC/Docker/systemd-nspawn container.""" try: with open("/proc/1/environ", "rb") as f: if b"container=" in f.read(): return True except (OSError, PermissionError): pass return os.path.exists("/run/host/container-manager") async def run(self): logger.info("Repeater daemon started") self._main_task = asyncio.current_task() # Register signal handlers for graceful shutdown loop = asyncio.get_running_loop() for sig in (signal.SIGTERM, signal.SIGINT): loop.add_signal_handler( sig, functools.partial(self._signal_shutdown, sig, loop), ) # Warn if running inside a container (udev rules won't work here) if os.path.exists("/.dockerenv") or os.environ.get("container") or self._detect_container(): logger.warning( "Container environment detected. " "USB device udev rules must be configured on the HOST, not inside this container." ) try: await self.initialize() # Start HTTP stats server http_port = self.config.get("http", {}).get("port", 8000) http_host = self.config.get("http", {}).get("host", "0.0.0.0") # nosec B104 node_name = self.config.get("repeater", {}).get("node_name", "Repeater") # Format public key for display pub_key_formatted = "" if self.local_identity: pub_key_hex = self.local_identity.get_public_key().hex() # Format as if len(pub_key_hex) >= 16: pub_key_formatted = f"{pub_key_hex[:8]}...{pub_key_hex[-8:]}" else: pub_key_formatted = pub_key_hex current_loop = asyncio.get_event_loop() self.http_server = HTTPStatsServer( host=http_host, port=http_port, stats_getter=self.get_stats, node_name=node_name, pub_key=pub_key_formatted, send_advert_func=self.send_advert, config=self.config, event_loop=current_loop, daemon_instance=self, config_path=getattr(self, "config_path", "/etc/pymc_repeater/config.yaml"), ) try: self.http_server.start() except Exception as e: logger.error(f"Failed to start HTTP server: {e}") # Run dispatcher (handles RX/TX via pymc_core) try: await self.dispatcher.run_forever() except asyncio.CancelledError: logger.info("Dispatcher loop cancelled for shutdown") except KeyboardInterrupt: logger.info("Shutting down...") for frame_server in getattr(self, "companion_frame_servers", []): try: await frame_server.stop() except Exception as e: logger.debug(f"Companion frame server stop: {e}") if hasattr(self, "companion_bridges"): for bridge in self.companion_bridges.values(): if hasattr(bridge, "stop"): try: await bridge.stop() except Exception as e: logger.debug(f"Companion bridge stop: {e}") if self.router: await self.router.stop() if self.http_server: self.http_server.stop() finally: await self._shutdown() def main(): import argparse parser = argparse.ArgumentParser(description="pyMC Repeater Daemon") parser.add_argument( "--config", help="Path to config file (default: /etc/pymc_repeater/config.yaml)", ) parser.add_argument( "--log-level", choices=["DEBUG", "INFO", "WARNING", "ERROR"], help="Log level (default: INFO)", ) args = parser.parse_args() # Load configuration config = load_config(args.config) config_path = args.config if args.config else "/etc/pymc_repeater/config.yaml" if args.log_level: if "logging" not in config: config["logging"] = {} config["logging"]["level"] = args.log_level # Don't initialize radio here - it will be done inside the async event loop daemon = RepeaterDaemon(config, radio=None) daemon.config_path = config_path # Run try: asyncio.run(daemon.run()) except KeyboardInterrupt: logger.info("Repeater stopped") except Exception as e: logger.error(f"Fatal error: {e}", exc_info=True) sys.exit(1) if __name__ == "__main__": main()