diff --git a/debian/control b/debian/control index 85cc62c..83a70d0 100644 --- a/debian/control +++ b/debian/control @@ -26,9 +26,18 @@ Depends: ${python3:Depends}, python3-cherrypy3, python3-paho-mqtt, python3-psutil, - python3-jwt + python3-jwt, + python3-pip, + python3-rrdtool, + libffi-dev, + jq +Recommends: python3-periphery, + python3-spidev Description: PyMC Repeater Daemon A mesh networking repeater daemon for LoRa devices. . This package provides the pymc-repeater service for managing mesh network repeater functionality with a web interface. + . + Note: This package will install pymc_core, cherrypy-cors, and ws4py + from PyPI during postinst as they are not available in Debian repos. diff --git a/debian/pymc-repeater.postinst b/debian/pymc-repeater.postinst index 0b59d96..dc4f967 100755 --- a/debian/pymc-repeater.postinst +++ b/debian/pymc-repeater.postinst @@ -35,8 +35,19 @@ case "$1" in # Install pymc_core from PyPI if not already installed if ! python3 -c "import pymc_core" 2>/dev/null; then - echo "Installing pymc_core dependency from PyPI..." - python3 -m pip install --break-system-packages 'pymc_core[hardware]' || true + echo "Installing pymc_core[hardware] from PyPI..." + python3 -m pip install --break-system-packages 'pymc_core[hardware]>=1.0.7' || true + fi + + # Install packages not available in Debian repos + if ! python3 -c "import cherrypy_cors" 2>/dev/null; then + echo "Installing cherrypy-cors from PyPI..." + python3 -m pip install --break-system-packages 'cherrypy-cors==1.7.0' || true + fi + + if ! python3 -c "import ws4py" 2>/dev/null; then + echo "Installing ws4py from PyPI..." + python3 -m pip install --break-system-packages 'ws4py>=0.5.1' || true fi ;; esac diff --git a/manage.sh b/manage.sh index a95a546..7dff09c 100755 --- a/manage.sh +++ b/manage.sh @@ -105,6 +105,7 @@ show_main_menu() { CHOICE=$($DIALOG --backtitle "pyMC Repeater Management" --title "pyMC Repeater Management" --menu "\nCurrent Status: $status\n\nChoose an action:" 18 70 9 \ "install" "Install pyMC Repeater" \ "upgrade" "Upgrade existing installation" \ + "reset" "reset existing installation to defaults" \ "uninstall" "Remove pyMC Repeater completely" \ "config" "Configure radio settings" \ "start" "Start the service" \ @@ -129,6 +130,13 @@ show_main_menu() { show_error "pyMC Repeater is not installed!\n\nUse 'install' first." fi ;; + "reset") + if is_installed; then + reset_repeater + else + show_error "pyMC Repeater is not installed!\n\nUse 'install' first." + fi + ;; "uninstall") if is_installed; then uninstall_repeater @@ -174,26 +182,31 @@ install_repeater() { # Welcome screen $DIALOG --backtitle "pyMC Repeater Management" --title "Welcome" --msgbox "\nWelcome to pyMC Repeater Setup\n\nThis installer will configure your Linux system as a LoRa mesh network repeater.\n\nPress OK to continue..." 12 70 - # SPI Check - CONFIG_FILE="" - if [ -f "/boot/firmware/config.txt" ]; then - CONFIG_FILE="/boot/firmware/config.txt" - elif [ -f "/boot/config.txt" ]; then - CONFIG_FILE="/boot/config.txt" - fi - - if [ -n "$CONFIG_FILE" ] && ! grep -q "dtparam=spi=on" "$CONFIG_FILE" 2>/dev/null && ! grep -q "spi_bcm2835" /proc/modules 2>/dev/null; then - if ask_yes_no "SPI Not Enabled" "\nSPI interface is required but not enabled!\n\nWould you like to enable it now?\n(This will require a reboot)"; then - echo "dtparam=spi=on" >> "$CONFIG_FILE" - show_info "SPI Enabled" "\nSPI has been enabled in $CONFIG_FILE\n\nSystem will reboot now. Please run this script again after reboot." - reboot + # SPI Check - Universal approach that works on all boards + if ! ls /dev/spidev* >/dev/null 2>&1; then + # SPI devices not found, check if we're on a Raspberry Pi and can enable it + CONFIG_FILE="" + if [ -f "/boot/firmware/config.txt" ]; then + CONFIG_FILE="/boot/firmware/config.txt" + elif [ -f "/boot/config.txt" ]; then + CONFIG_FILE="/boot/config.txt" + fi + + if [ -n "$CONFIG_FILE" ]; then + # Raspberry Pi detected - offer to enable SPI + if ask_yes_no "SPI Not Enabled" "\nSPI interface is required but not detected (/dev/spidev* not found)!\n\nWould you like to enable it now?\n(This will require a reboot)"; then + echo "dtparam=spi=on" >> "$CONFIG_FILE" + show_info "SPI Enabled" "\nSPI has been enabled in $CONFIG_FILE\n\nSystem will reboot now. Please run this script again after reboot." + reboot + else + show_error "SPI is required for LoRa radio operation.\n\nPlease enable SPI manually and run this script again." + return + fi else - show_error "SPI is required for LoRa radio operation.\n\nPlease enable SPI manually and run this script again." + # Not a Raspberry Pi - provide generic instructions + show_error "SPI interface is required but not detected (/dev/spidev* not found).\n\nPlease enable SPI in your system's configuration and ensure the SPI kernel module is loaded.\n\nFor Raspberry Pi: sudo raspi-config -> Interfacing Options -> SPI -> Enable" return fi - elif [ -z "$CONFIG_FILE" ]; then - show_error "Could not find config.txt file.\n\nPlease enable SPI manually:\nsudo raspi-config -> Interfacing Options -> SPI -> Enable" - return fi # Get script directory for file copying during installation @@ -328,7 +341,7 @@ EOF echo "Note: Using optimized binary wheels for faster installation" echo "" - if pip install --break-system-packages --force-reinstall --no-cache-dir .; then + if pip install --break-system-packages --no-cache-dir .; then echo "" echo "✓ Python package installation completed successfully!" @@ -376,6 +389,74 @@ EOF fi } +# Reset function +reset_repeater() { + local config_file="$CONFIG_DIR/config.yaml" + local updated_example="$CONFIG_DIR/config.yaml.example" + + if [ "$EUID" -ne 0 ]; then + show_error "Upgrade requires root privileges.\n\nPlease run: sudo $0" + return + fi + + local current_version=$(get_version) + + if ask_yes_no "Confirm Reset of pyMC Repeater restoring to default configuration.\n\nContinue?"; then + + # Show info that upgrade is starting + show_info "Reseting" "Starting reset process...\n\nProgress will be shown in the terminal." + + echo "=== Reset Progress ===" + echo "[1/4] Stopping service..." + systemctl stop "$SERVICE_NAME" 2>/dev/null || true + + echo "[2/4] Backing up configuration..." + if [ -d "$CONFIG_DIR" ]; then + cp -r "$CONFIG_DIR" "$CONFIG_DIR.backup.$(date +%Y%m%d_%H%M%S)" 2>/dev/null || true + echo " ✓ Configuration backed up" + fi + echo "3/4 Restore default config.yaml from config.yaml.example" + cp $updated_example $config_file + sleep 5 + # Reload systemd and start the service + echo "4/4 Restart the service" + systemctl daemon-reload + systemctl start "$SERVICE_NAME" + # Show final results + sleep 2 + local ip_address=$(hostname -I | awk '{print $1}') + if is_running; then + clear + echo "═══════════════════════════════════════════════════════════════" + echo " ✓ Reset Completed Successfully!" + echo "═══════════════════════════════════════════════════════════════" + echo "" + echo "Service is running on:" + echo " → http://$ip_address:8000" + echo "" + echo "═══════════════════════════════════════════════════════════════" + echo " NEXT STEP: Complete Web Setup Wizard" + echo "═══════════════════════════════════════════════════════════════" + echo "" + echo "Open the web dashboard in your browser to complete setup:" + echo "" + echo " 1. Navigate to: http://$ip_address:8000" + echo " 2. Complete the 5-step setup wizard:" + echo " • Choose repeater name" + echo " • Select hardware board" + echo " • Configure radio settings" + echo " • Set admin password" + echo " 3. Log in to your configured repeater" + echo "" + echo "═══════════════════════════════════════════════════════════════" + echo "" + read -p "Press Enter to return to main menu..." || true + else + show_error "Installation completed but service failed to start!\n\nCheck logs from the main menu for details." + fi + fi +} + # Upgrade function upgrade_repeater() { if [ "$EUID" -ne 0 ]; then @@ -521,7 +602,7 @@ EOF echo "⚠ Package update failed, but continuing..." fi - # Note: pymc_core is already reinstalled as part of the full --force-reinstall above + echo "" echo "✓ All packages including pymc_core reinstalled successfully" diff --git a/repeater/data_acquisition/letsmesh_handler.py b/repeater/data_acquisition/letsmesh_handler.py index 32ef579..835102f 100644 --- a/repeater/data_acquisition/letsmesh_handler.py +++ b/repeater/data_acquisition/letsmesh_handler.py @@ -24,7 +24,7 @@ try: except ImportError: HAS_REASON_CODES = False - +logger = logging.getLogger("LetsMeshHandler") # -------------------------------------------------------------------- # Helper: Base64URL without padding # -------------------------------------------------------------------- @@ -70,6 +70,7 @@ class _BrokerConnection: use_tls: bool, email: str, owner: str, + broker_index: int = 0, on_connect_callback: Optional[Callable] = None, on_disconnect_callback: Optional[Callable] = None, ): @@ -78,6 +79,7 @@ class _BrokerConnection: self.public_key = public_key.upper() self.iata_code = iata_code self.jwt_expiry_minutes = jwt_expiry_minutes + self.broker_index = broker_index self.use_tls = use_tls self.email = email self.owner = owner @@ -89,9 +91,7 @@ class _BrokerConnection: self._reconnect_attempts = 0 self._reconnect_timer = None self._max_reconnect_delay = 300 # 5 minutes max - self._loop_running = False # Track if MQTT loop is active - - # MQTT WebSocket client - unique client ID per broker + self._jwt_refresh_timer = None client_id = f"meshcore_{self.public_key}_{broker['host']}" self.client = mqtt.Client(client_id=client_id, transport="websockets") self.client.on_connect = self._on_connect @@ -146,6 +146,7 @@ class _BrokerConnection: logging.info(f"Connected to {self.broker['name']}") self._running = True self._reconnect_attempts = 0 # Reset counter on success + self._schedule_jwt_refresh() # Schedule proactive JWT refresh if self._on_connect_callback: self._on_connect_callback(self.broker["name"]) else: @@ -159,16 +160,17 @@ class _BrokerConnection: self._running = False if rc != 0: # Unexpected disconnect - logging.warning(f"Disconnected from {self.broker['name']} (rc={rc})") + error_msg = get_mqtt_error_message(rc, is_disconnect=True) + logging.warning(f"Disconnected from {self.broker['name']} (rc={rc}): {error_msg}") if was_running: # Only reconnect if we were intentionally connected - self._schedule_reconnect() + self._schedule_reconnect(reason=error_msg) else: logging.info(f"Clean disconnect from {self.broker['name']}") if self._on_disconnect_callback: self._on_disconnect_callback(self.broker["name"]) - def _schedule_reconnect(self): + def _schedule_reconnect(self, reason: str = "connection lost"): """Schedule reconnection with exponential backoff""" if self._reconnect_timer: self._reconnect_timer.cancel() @@ -177,38 +179,44 @@ class _BrokerConnection: delay = min(5 * (2 ** self._reconnect_attempts), self._max_reconnect_delay) self._reconnect_attempts += 1 - logging.info(f"Scheduling reconnect to {self.broker['name']} in {delay}s (attempt {self._reconnect_attempts})") - self._reconnect_timer = threading.Timer(delay, self._attempt_reconnect) + logging.info(f"Scheduling reconnect to {self.broker['name']} in {delay}s (attempt {self._reconnect_attempts}, reason: {reason})") + self._reconnect_timer = threading.Timer(delay, lambda: self._attempt_reconnect(reason)) self._reconnect_timer.daemon = True self._reconnect_timer.start() - def _attempt_reconnect(self): - """Attempt to reconnect to broker""" + def _attempt_reconnect(self, reason: str = "connection lost"): + """Attempt to reconnect to broker with fresh JWT""" try: - logging.info(f"Attempting reconnection to {self.broker['name']}...") - self.refresh_jwt_token() # Refresh token before reconnecting - # Check if loop is still running - restart if needed - if not hasattr(self, '_loop_running') or not self._loop_running: - logging.warning(f"MQTT loop not running for {self.broker['name']}, restarting...") - self.client.loop_start() - self._loop_running = True + logging.info(f"Attempting reconnection to {self.broker['name']} (reason: {reason})...") + + # Stop the loop if it's still running (websocket mode requires clean restart) + try: + self.client.loop_stop() + except: + pass + + self._set_jwt_credentials() + + # Reconnect and restart loop self.client.connect(self.broker["host"], self.broker["port"], keepalive=60) + self.client.loop_start() + self._loop_running = True except Exception as e: logging.error(f"Reconnection failed for {self.broker['name']}: {e}") self._schedule_reconnect() # Try again later - def refresh_jwt_token(self): - """Refresh JWT token for MQTT authentication""" + def _set_jwt_credentials(self): + """Set JWT token credentials before connecting (CONNECT handshake only)""" try: token = self._generate_jwt() username = f"v1_{self.public_key}" self.client.username_pw_set(username=username, password=token) self._connect_time = datetime.now(UTC) - logging.debug(f"JWT token refreshed for {self.broker['name']}") + logging.debug(f"JWT credentials set for {self.broker['name']}") logging.debug(f"Using username: {username}") logging.debug(f"Public key: {self.public_key[:16]}...{self.public_key[-16:]}") except Exception as e: - logging.error(f"Failed to generate JWT token for {self.broker['name']}: {e}") + logging.error(f"Failed to set JWT credentials for {self.broker['name']}: {e}") raise def connect(self): @@ -224,8 +232,8 @@ class _BrokerConnection: else: protocol = "ws" - # Generate and set JWT token - self.refresh_jwt_token() + # Set JWT credentials before CONNECT handshake + self._set_jwt_credentials() logging.info( f"Connecting to {self.broker['name']} " @@ -241,10 +249,13 @@ class _BrokerConnection: self._running = False self._loop_running = False - # Cancel any pending reconnection + # Cancel any pending timers if self._reconnect_timer: self._reconnect_timer.cancel() self._reconnect_timer = None + if self._jwt_refresh_timer: + self._jwt_refresh_timer.cancel() + self._jwt_refresh_timer = None self.client.loop_stop() self.client.disconnect() @@ -260,14 +271,53 @@ class _BrokerConnection: def is_connected(self) -> bool: """Check if connection is active""" return self._running + + def has_pending_reconnect(self) -> bool: + """Check if a reconnection is scheduled""" + return self._reconnect_timer is not None and self._reconnect_timer.is_alive() - def should_refresh_token(self) -> bool: - """Check if JWT token needs refresh (at 80% of expiry)""" + def should_reconnect_for_token_expiry(self) -> bool: + """Check if connection should be reconnected due to JWT expiry (at 80% of lifetime)""" if not self._connect_time: return False elapsed = (datetime.now(UTC) - self._connect_time).total_seconds() expiry_seconds = self.jwt_expiry_minutes * 60 - return elapsed >= expiry_seconds * 0.8 + # Stagger refresh by 5% per broker to prevent simultaneous disconnects + # Broker 0: 80%, Broker 1: 85%, Broker 2: 90%, etc. + stagger_offset = self.broker_index * 0.05 + refresh_threshold = 0.80 + stagger_offset + return elapsed >= expiry_seconds * refresh_threshold + + def _schedule_jwt_refresh(self): + """Schedule proactive JWT refresh before token expires""" + if self._jwt_refresh_timer: + self._jwt_refresh_timer.cancel() + + expiry_seconds = self.jwt_expiry_minutes * 60 + # Stagger refresh by 5% per broker to prevent simultaneous disconnects + # Broker 0: 80%, Broker 1: 85%, Broker 2: 90%, etc. + stagger_offset = self.broker_index * 0.05 + refresh_threshold = 0.80 + stagger_offset + refresh_delay = expiry_seconds * refresh_threshold + + logging.info( + f"JWT refresh scheduled for {self.broker['name']} in {refresh_delay:.0f}s " + f"({refresh_threshold*100:.0f}% of {self.jwt_expiry_minutes}min token lifetime)" + ) + self._jwt_refresh_timer = threading.Timer(refresh_delay, self.reconnect_for_token_expiry) + self._jwt_refresh_timer.daemon = True + self._jwt_refresh_timer.start() + + def reconnect_for_token_expiry(self): + """Proactively reconnect with new JWT before current one expires""" + if not self._running: + return + + logging.info(f"JWT token expiring soon for {self.broker['name']}, refreshing...") + self._running = False + self._jwt_refresh_timer = None + self.client.disconnect() # Triggers clean disconnect, then reconnect via timer + self._schedule_reconnect(reason="JWT token expiry") # ==================================================================== @@ -314,7 +364,7 @@ class MeshCoreToMqttJwtPusher: self.brokers = LETSMESH_BROKERS.copy() logging.info(f"Multi-broker mode: connecting to all {len(LETSMESH_BROKERS)} built-in brokers") else: - # Single broker mode (backward compatibility) + if broker_index >= len(LETSMESH_BROKERS): raise ValueError(f"Invalid broker_index {broker_index}") self.brokers = [LETSMESH_BROKERS[broker_index]] @@ -352,7 +402,7 @@ class MeshCoreToMqttJwtPusher: # Create broker connections self.connections: List[_BrokerConnection] = [] - for broker in self.brokers: + for idx, broker in enumerate(self.brokers): conn = _BrokerConnection( broker=broker, local_identity=self.local_identity, @@ -362,6 +412,7 @@ class MeshCoreToMqttJwtPusher: use_tls=self.use_tls, email=self.email, owner=self.owner, + broker_index=idx, on_connect_callback=self._on_broker_connected, on_disconnect_callback=self._on_broker_disconnected, ) @@ -384,22 +435,42 @@ class MeshCoreToMqttJwtPusher: def _on_broker_disconnected(self, broker_name: str): """Callback when a broker disconnects""" - # Check if all connections are down + # Check if all connections are down AND none have pending reconnects all_down = all(not conn.is_connected() for conn in self.connections) - if all_down: - logging.warning("All broker connections lost") - self._running = False + any_reconnecting = any(conn.has_pending_reconnect() for conn in self.connections) + + if all_down and not any_reconnecting: + logging.warning("All broker connections lost with no pending reconnects") + elif all_down: + logging.info("All brokers temporarily disconnected, reconnects pending") def connect(self): """Establish connections to all configured brokers""" - for conn in self.connections: + for idx, conn in enumerate(self.connections): try: - conn.connect() + if idx == 0: + # Connect first broker immediately + conn.connect() + else: + # Stagger additional brokers using background timers + delay = idx * 30 + logging.info(f"Staggering connection to {conn.broker['name']} by {delay}s") + timer = threading.Timer(delay, lambda c=conn: self._delayed_connect(c)) + timer.daemon = True + timer.start() except Exception as e: logging.error(f"Failed to connect to {conn.broker['name']}: {e}") + + def _delayed_connect(self, conn): + """Connect a broker after a delay (called by timer)""" + try: + conn.connect() + except Exception as e: + logging.error(f"Failed to connect to {conn.broker['name']}: {e}") def disconnect(self): """Disconnect from all brokers""" + # Stop the heartbeat loop self._running = False # Publish offline status before disconnecting @@ -423,15 +494,12 @@ class MeshCoreToMqttJwtPusher: while self._running: try: - # Refresh JWT tokens for all connections before they expire - for conn in self.connections: - if conn.is_connected() and conn.should_refresh_token(): - conn.refresh_jwt_token() - + # Publish status (JWT refresh now handled by individual broker timers) self.publish_status( state="online", origin=self.node_name, radio_config=self.radio_config ) logging.debug(f"Status heartbeat sent (next in {self.status_interval}s)") + time.sleep(self.status_interval) except Exception as e: logging.error(f"Status heartbeat error: {e}") @@ -507,7 +575,6 @@ class MeshCoreToMqttJwtPusher: results.append((conn.broker["name"], result)) logging.debug(f"Published to {conn.broker['name']}/{topic}") - # Log if no brokers were available if not results: logging.warning(f"No active broker connections for publishing to {topic}") diff --git a/repeater/data_acquisition/sqlite_handler.py b/repeater/data_acquisition/sqlite_handler.py index 205b43d..0f198c9 100644 --- a/repeater/data_acquisition/sqlite_handler.py +++ b/repeater/data_acquisition/sqlite_handler.py @@ -560,7 +560,8 @@ class SQLiteHandler: route: Optional[int] = None, start_timestamp: Optional[float] = None, end_timestamp: Optional[float] = None, - limit: int = 1000) -> list: + limit: int = 1000, + offset: int = 0) -> list: try: with sqlite3.connect(self.sqlite_path) as conn: conn.row_factory = sqlite3.Row @@ -599,8 +600,9 @@ class SQLiteHandler: else: query = base_query - query += " ORDER BY timestamp DESC LIMIT ?" + query += " ORDER BY timestamp DESC LIMIT ? OFFSET ?" params.append(limit) + params.append(offset) packets = conn.execute(query, params).fetchall() diff --git a/repeater/data_acquisition/storage_collector.py b/repeater/data_acquisition/storage_collector.py index 241e53b..1019415 100644 --- a/repeater/data_acquisition/storage_collector.py +++ b/repeater/data_acquisition/storage_collector.py @@ -216,9 +216,10 @@ class StorageCollector: start_timestamp: Optional[float] = None, end_timestamp: Optional[float] = None, limit: int = 1000, + offset: int = 0, ) -> list: return self.sqlite_handler.get_filtered_packets( - packet_type, route, start_timestamp, end_timestamp, limit + packet_type, route, start_timestamp, end_timestamp, limit, offset ) def get_packet_by_hash(self, packet_hash: str) -> Optional[dict]: diff --git a/repeater/web/api_endpoints.py b/repeater/web/api_endpoints.py index c45f91a..e3c3155 100644 --- a/repeater/web/api_endpoints.py +++ b/repeater/web/api_endpoints.py @@ -841,6 +841,45 @@ class APIEndpoints: logger.error(f"Error getting recent packets: {e}") return self._error(e) + @cherrypy.expose + @cherrypy.tools.gzip(compress_level=6) + @cherrypy.tools.json_out() + def bulk_packets(self, limit=1000, offset=0, start_timestamp=None, end_timestamp=None): + """ + Optimized bulk packet retrieval with gzip compression and DB-level pagination. + """ + try: + # Enforce reasonable limits + limit = min(int(limit), 10000) + offset = max(int(offset), 0) + + # Get packets from storage with TRUE DB-level pagination + # Uses SQL "LIMIT ? OFFSET ?" - no Python slicing needed! + storage = self._get_storage() + packets = storage.get_filtered_packets( + packet_type=None, + route=None, + start_timestamp=float(start_timestamp) if start_timestamp else None, + end_timestamp=float(end_timestamp) if end_timestamp else None, + limit=limit, + offset=offset + ) + + response = { + "success": True, + "data": packets, + "count": len(packets), + "offset": offset, + "limit": limit, + "compressed": True + } + + return response + + except Exception as e: + logger.error(f"Error getting bulk packets: {e}") + return self._error(e) + @cherrypy.expose @cherrypy.tools.json_out() def filtered_packets(self, start_timestamp=None, end_timestamp=None, limit=1000, type=None, route=None): diff --git a/repeater/web/http_server.py b/repeater/web/http_server.py index cc9e119..2740e7f 100644 --- a/repeater/web/http_server.py +++ b/repeater/web/http_server.py @@ -284,6 +284,8 @@ class HTTPStatsServer: config = { "/": { "tools.sessions.on": False, + # "tools.gzip.on": True, + # "tools.gzip.mime_types": ["application/json", "text/html", "text/plain"], # Ensure proper content types for static files "tools.staticfile.content_types": { 'js': 'application/javascript', @@ -297,6 +299,12 @@ class HTTPStatsServer: "/api": { "tools.require_auth.on": True, }, + # Enable gzip for bulk packet downloads + "/api/bulk_packets": { + "tools.gzip.on": True, + "tools.gzip.mime_types": ["application/json"], + "tools.gzip.compress_level": 6, + }, # Public documentation endpoints (no auth required) "/api/openapi": { "tools.require_auth.on": False, @@ -332,6 +340,7 @@ class HTTPStatsServer: "tools.websocket.handler_cls": PacketWebSocket, "tools.trailing_slash.on": False, "tools.require_auth.on": False, + "tools.gzip.on": False, } logger.info("WebSocket endpoint configured at /ws/packets") except Exception as e: