mirror of
https://github.com/rightup/pyMC_Repeater.git
synced 2026-03-28 17:43:06 +01:00
Merge branch 'dev' into dev
This commit is contained in:
11
debian/control
vendored
11
debian/control
vendored
@@ -26,9 +26,18 @@ Depends: ${python3:Depends},
|
||||
python3-cherrypy3,
|
||||
python3-paho-mqtt,
|
||||
python3-psutil,
|
||||
python3-jwt
|
||||
python3-jwt,
|
||||
python3-pip,
|
||||
python3-rrdtool,
|
||||
libffi-dev,
|
||||
jq
|
||||
Recommends: python3-periphery,
|
||||
python3-spidev
|
||||
Description: PyMC Repeater Daemon
|
||||
A mesh networking repeater daemon for LoRa devices.
|
||||
.
|
||||
This package provides the pymc-repeater service for managing
|
||||
mesh network repeater functionality with a web interface.
|
||||
.
|
||||
Note: This package will install pymc_core, cherrypy-cors, and ws4py
|
||||
from PyPI during postinst as they are not available in Debian repos.
|
||||
|
||||
15
debian/pymc-repeater.postinst
vendored
15
debian/pymc-repeater.postinst
vendored
@@ -35,8 +35,19 @@ case "$1" in
|
||||
|
||||
# Install pymc_core from PyPI if not already installed
|
||||
if ! python3 -c "import pymc_core" 2>/dev/null; then
|
||||
echo "Installing pymc_core dependency from PyPI..."
|
||||
python3 -m pip install --break-system-packages 'pymc_core[hardware]' || true
|
||||
echo "Installing pymc_core[hardware] from PyPI..."
|
||||
python3 -m pip install --break-system-packages 'pymc_core[hardware]>=1.0.7' || true
|
||||
fi
|
||||
|
||||
# Install packages not available in Debian repos
|
||||
if ! python3 -c "import cherrypy_cors" 2>/dev/null; then
|
||||
echo "Installing cherrypy-cors from PyPI..."
|
||||
python3 -m pip install --break-system-packages 'cherrypy-cors==1.7.0' || true
|
||||
fi
|
||||
|
||||
if ! python3 -c "import ws4py" 2>/dev/null; then
|
||||
echo "Installing ws4py from PyPI..."
|
||||
python3 -m pip install --break-system-packages 'ws4py>=0.5.1' || true
|
||||
fi
|
||||
;;
|
||||
esac
|
||||
|
||||
119
manage.sh
119
manage.sh
@@ -105,6 +105,7 @@ show_main_menu() {
|
||||
CHOICE=$($DIALOG --backtitle "pyMC Repeater Management" --title "pyMC Repeater Management" --menu "\nCurrent Status: $status\n\nChoose an action:" 18 70 9 \
|
||||
"install" "Install pyMC Repeater" \
|
||||
"upgrade" "Upgrade existing installation" \
|
||||
"reset" "reset existing installation to defaults" \
|
||||
"uninstall" "Remove pyMC Repeater completely" \
|
||||
"config" "Configure radio settings" \
|
||||
"start" "Start the service" \
|
||||
@@ -129,6 +130,13 @@ show_main_menu() {
|
||||
show_error "pyMC Repeater is not installed!\n\nUse 'install' first."
|
||||
fi
|
||||
;;
|
||||
"reset")
|
||||
if is_installed; then
|
||||
reset_repeater
|
||||
else
|
||||
show_error "pyMC Repeater is not installed!\n\nUse 'install' first."
|
||||
fi
|
||||
;;
|
||||
"uninstall")
|
||||
if is_installed; then
|
||||
uninstall_repeater
|
||||
@@ -174,26 +182,31 @@ install_repeater() {
|
||||
# Welcome screen
|
||||
$DIALOG --backtitle "pyMC Repeater Management" --title "Welcome" --msgbox "\nWelcome to pyMC Repeater Setup\n\nThis installer will configure your Linux system as a LoRa mesh network repeater.\n\nPress OK to continue..." 12 70
|
||||
|
||||
# SPI Check
|
||||
CONFIG_FILE=""
|
||||
if [ -f "/boot/firmware/config.txt" ]; then
|
||||
CONFIG_FILE="/boot/firmware/config.txt"
|
||||
elif [ -f "/boot/config.txt" ]; then
|
||||
CONFIG_FILE="/boot/config.txt"
|
||||
fi
|
||||
|
||||
if [ -n "$CONFIG_FILE" ] && ! grep -q "dtparam=spi=on" "$CONFIG_FILE" 2>/dev/null && ! grep -q "spi_bcm2835" /proc/modules 2>/dev/null; then
|
||||
if ask_yes_no "SPI Not Enabled" "\nSPI interface is required but not enabled!\n\nWould you like to enable it now?\n(This will require a reboot)"; then
|
||||
echo "dtparam=spi=on" >> "$CONFIG_FILE"
|
||||
show_info "SPI Enabled" "\nSPI has been enabled in $CONFIG_FILE\n\nSystem will reboot now. Please run this script again after reboot."
|
||||
reboot
|
||||
# SPI Check - Universal approach that works on all boards
|
||||
if ! ls /dev/spidev* >/dev/null 2>&1; then
|
||||
# SPI devices not found, check if we're on a Raspberry Pi and can enable it
|
||||
CONFIG_FILE=""
|
||||
if [ -f "/boot/firmware/config.txt" ]; then
|
||||
CONFIG_FILE="/boot/firmware/config.txt"
|
||||
elif [ -f "/boot/config.txt" ]; then
|
||||
CONFIG_FILE="/boot/config.txt"
|
||||
fi
|
||||
|
||||
if [ -n "$CONFIG_FILE" ]; then
|
||||
# Raspberry Pi detected - offer to enable SPI
|
||||
if ask_yes_no "SPI Not Enabled" "\nSPI interface is required but not detected (/dev/spidev* not found)!\n\nWould you like to enable it now?\n(This will require a reboot)"; then
|
||||
echo "dtparam=spi=on" >> "$CONFIG_FILE"
|
||||
show_info "SPI Enabled" "\nSPI has been enabled in $CONFIG_FILE\n\nSystem will reboot now. Please run this script again after reboot."
|
||||
reboot
|
||||
else
|
||||
show_error "SPI is required for LoRa radio operation.\n\nPlease enable SPI manually and run this script again."
|
||||
return
|
||||
fi
|
||||
else
|
||||
show_error "SPI is required for LoRa radio operation.\n\nPlease enable SPI manually and run this script again."
|
||||
# Not a Raspberry Pi - provide generic instructions
|
||||
show_error "SPI interface is required but not detected (/dev/spidev* not found).\n\nPlease enable SPI in your system's configuration and ensure the SPI kernel module is loaded.\n\nFor Raspberry Pi: sudo raspi-config -> Interfacing Options -> SPI -> Enable"
|
||||
return
|
||||
fi
|
||||
elif [ -z "$CONFIG_FILE" ]; then
|
||||
show_error "Could not find config.txt file.\n\nPlease enable SPI manually:\nsudo raspi-config -> Interfacing Options -> SPI -> Enable"
|
||||
return
|
||||
fi
|
||||
|
||||
# Get script directory for file copying during installation
|
||||
@@ -328,7 +341,7 @@ EOF
|
||||
echo "Note: Using optimized binary wheels for faster installation"
|
||||
echo ""
|
||||
|
||||
if pip install --break-system-packages --force-reinstall --no-cache-dir .; then
|
||||
if pip install --break-system-packages --no-cache-dir .; then
|
||||
echo ""
|
||||
echo "✓ Python package installation completed successfully!"
|
||||
|
||||
@@ -376,6 +389,74 @@ EOF
|
||||
fi
|
||||
}
|
||||
|
||||
# Reset function
|
||||
reset_repeater() {
|
||||
local config_file="$CONFIG_DIR/config.yaml"
|
||||
local updated_example="$CONFIG_DIR/config.yaml.example"
|
||||
|
||||
if [ "$EUID" -ne 0 ]; then
|
||||
show_error "Upgrade requires root privileges.\n\nPlease run: sudo $0"
|
||||
return
|
||||
fi
|
||||
|
||||
local current_version=$(get_version)
|
||||
|
||||
if ask_yes_no "Confirm Reset of pyMC Repeater restoring to default configuration.\n\nContinue?"; then
|
||||
|
||||
# Show info that upgrade is starting
|
||||
show_info "Reseting" "Starting reset process...\n\nProgress will be shown in the terminal."
|
||||
|
||||
echo "=== Reset Progress ==="
|
||||
echo "[1/4] Stopping service..."
|
||||
systemctl stop "$SERVICE_NAME" 2>/dev/null || true
|
||||
|
||||
echo "[2/4] Backing up configuration..."
|
||||
if [ -d "$CONFIG_DIR" ]; then
|
||||
cp -r "$CONFIG_DIR" "$CONFIG_DIR.backup.$(date +%Y%m%d_%H%M%S)" 2>/dev/null || true
|
||||
echo " ✓ Configuration backed up"
|
||||
fi
|
||||
echo "3/4 Restore default config.yaml from config.yaml.example"
|
||||
cp $updated_example $config_file
|
||||
sleep 5
|
||||
# Reload systemd and start the service
|
||||
echo "4/4 Restart the service"
|
||||
systemctl daemon-reload
|
||||
systemctl start "$SERVICE_NAME"
|
||||
# Show final results
|
||||
sleep 2
|
||||
local ip_address=$(hostname -I | awk '{print $1}')
|
||||
if is_running; then
|
||||
clear
|
||||
echo "═══════════════════════════════════════════════════════════════"
|
||||
echo " ✓ Reset Completed Successfully!"
|
||||
echo "═══════════════════════════════════════════════════════════════"
|
||||
echo ""
|
||||
echo "Service is running on:"
|
||||
echo " → http://$ip_address:8000"
|
||||
echo ""
|
||||
echo "═══════════════════════════════════════════════════════════════"
|
||||
echo " NEXT STEP: Complete Web Setup Wizard"
|
||||
echo "═══════════════════════════════════════════════════════════════"
|
||||
echo ""
|
||||
echo "Open the web dashboard in your browser to complete setup:"
|
||||
echo ""
|
||||
echo " 1. Navigate to: http://$ip_address:8000"
|
||||
echo " 2. Complete the 5-step setup wizard:"
|
||||
echo " • Choose repeater name"
|
||||
echo " • Select hardware board"
|
||||
echo " • Configure radio settings"
|
||||
echo " • Set admin password"
|
||||
echo " 3. Log in to your configured repeater"
|
||||
echo ""
|
||||
echo "═══════════════════════════════════════════════════════════════"
|
||||
echo ""
|
||||
read -p "Press Enter to return to main menu..." || true
|
||||
else
|
||||
show_error "Installation completed but service failed to start!\n\nCheck logs from the main menu for details."
|
||||
fi
|
||||
fi
|
||||
}
|
||||
|
||||
# Upgrade function
|
||||
upgrade_repeater() {
|
||||
if [ "$EUID" -ne 0 ]; then
|
||||
@@ -521,7 +602,7 @@ EOF
|
||||
echo "⚠ Package update failed, but continuing..."
|
||||
fi
|
||||
|
||||
# Note: pymc_core is already reinstalled as part of the full --force-reinstall above
|
||||
|
||||
echo ""
|
||||
echo "✓ All packages including pymc_core reinstalled successfully"
|
||||
|
||||
|
||||
@@ -24,7 +24,7 @@ try:
|
||||
except ImportError:
|
||||
HAS_REASON_CODES = False
|
||||
|
||||
|
||||
logger = logging.getLogger("LetsMeshHandler")
|
||||
# --------------------------------------------------------------------
|
||||
# Helper: Base64URL without padding
|
||||
# --------------------------------------------------------------------
|
||||
@@ -70,6 +70,7 @@ class _BrokerConnection:
|
||||
use_tls: bool,
|
||||
email: str,
|
||||
owner: str,
|
||||
broker_index: int = 0,
|
||||
on_connect_callback: Optional[Callable] = None,
|
||||
on_disconnect_callback: Optional[Callable] = None,
|
||||
):
|
||||
@@ -78,6 +79,7 @@ class _BrokerConnection:
|
||||
self.public_key = public_key.upper()
|
||||
self.iata_code = iata_code
|
||||
self.jwt_expiry_minutes = jwt_expiry_minutes
|
||||
self.broker_index = broker_index
|
||||
self.use_tls = use_tls
|
||||
self.email = email
|
||||
self.owner = owner
|
||||
@@ -89,9 +91,7 @@ class _BrokerConnection:
|
||||
self._reconnect_attempts = 0
|
||||
self._reconnect_timer = None
|
||||
self._max_reconnect_delay = 300 # 5 minutes max
|
||||
self._loop_running = False # Track if MQTT loop is active
|
||||
|
||||
# MQTT WebSocket client - unique client ID per broker
|
||||
self._jwt_refresh_timer = None
|
||||
client_id = f"meshcore_{self.public_key}_{broker['host']}"
|
||||
self.client = mqtt.Client(client_id=client_id, transport="websockets")
|
||||
self.client.on_connect = self._on_connect
|
||||
@@ -146,6 +146,7 @@ class _BrokerConnection:
|
||||
logging.info(f"Connected to {self.broker['name']}")
|
||||
self._running = True
|
||||
self._reconnect_attempts = 0 # Reset counter on success
|
||||
self._schedule_jwt_refresh() # Schedule proactive JWT refresh
|
||||
if self._on_connect_callback:
|
||||
self._on_connect_callback(self.broker["name"])
|
||||
else:
|
||||
@@ -159,16 +160,17 @@ class _BrokerConnection:
|
||||
self._running = False
|
||||
|
||||
if rc != 0: # Unexpected disconnect
|
||||
logging.warning(f"Disconnected from {self.broker['name']} (rc={rc})")
|
||||
error_msg = get_mqtt_error_message(rc, is_disconnect=True)
|
||||
logging.warning(f"Disconnected from {self.broker['name']} (rc={rc}): {error_msg}")
|
||||
if was_running: # Only reconnect if we were intentionally connected
|
||||
self._schedule_reconnect()
|
||||
self._schedule_reconnect(reason=error_msg)
|
||||
else:
|
||||
logging.info(f"Clean disconnect from {self.broker['name']}")
|
||||
|
||||
if self._on_disconnect_callback:
|
||||
self._on_disconnect_callback(self.broker["name"])
|
||||
|
||||
def _schedule_reconnect(self):
|
||||
def _schedule_reconnect(self, reason: str = "connection lost"):
|
||||
"""Schedule reconnection with exponential backoff"""
|
||||
if self._reconnect_timer:
|
||||
self._reconnect_timer.cancel()
|
||||
@@ -177,38 +179,44 @@ class _BrokerConnection:
|
||||
delay = min(5 * (2 ** self._reconnect_attempts), self._max_reconnect_delay)
|
||||
self._reconnect_attempts += 1
|
||||
|
||||
logging.info(f"Scheduling reconnect to {self.broker['name']} in {delay}s (attempt {self._reconnect_attempts})")
|
||||
self._reconnect_timer = threading.Timer(delay, self._attempt_reconnect)
|
||||
logging.info(f"Scheduling reconnect to {self.broker['name']} in {delay}s (attempt {self._reconnect_attempts}, reason: {reason})")
|
||||
self._reconnect_timer = threading.Timer(delay, lambda: self._attempt_reconnect(reason))
|
||||
self._reconnect_timer.daemon = True
|
||||
self._reconnect_timer.start()
|
||||
|
||||
def _attempt_reconnect(self):
|
||||
"""Attempt to reconnect to broker"""
|
||||
def _attempt_reconnect(self, reason: str = "connection lost"):
|
||||
"""Attempt to reconnect to broker with fresh JWT"""
|
||||
try:
|
||||
logging.info(f"Attempting reconnection to {self.broker['name']}...")
|
||||
self.refresh_jwt_token() # Refresh token before reconnecting
|
||||
# Check if loop is still running - restart if needed
|
||||
if not hasattr(self, '_loop_running') or not self._loop_running:
|
||||
logging.warning(f"MQTT loop not running for {self.broker['name']}, restarting...")
|
||||
self.client.loop_start()
|
||||
self._loop_running = True
|
||||
logging.info(f"Attempting reconnection to {self.broker['name']} (reason: {reason})...")
|
||||
|
||||
# Stop the loop if it's still running (websocket mode requires clean restart)
|
||||
try:
|
||||
self.client.loop_stop()
|
||||
except:
|
||||
pass
|
||||
|
||||
self._set_jwt_credentials()
|
||||
|
||||
# Reconnect and restart loop
|
||||
self.client.connect(self.broker["host"], self.broker["port"], keepalive=60)
|
||||
self.client.loop_start()
|
||||
self._loop_running = True
|
||||
except Exception as e:
|
||||
logging.error(f"Reconnection failed for {self.broker['name']}: {e}")
|
||||
self._schedule_reconnect() # Try again later
|
||||
|
||||
def refresh_jwt_token(self):
|
||||
"""Refresh JWT token for MQTT authentication"""
|
||||
def _set_jwt_credentials(self):
|
||||
"""Set JWT token credentials before connecting (CONNECT handshake only)"""
|
||||
try:
|
||||
token = self._generate_jwt()
|
||||
username = f"v1_{self.public_key}"
|
||||
self.client.username_pw_set(username=username, password=token)
|
||||
self._connect_time = datetime.now(UTC)
|
||||
logging.debug(f"JWT token refreshed for {self.broker['name']}")
|
||||
logging.debug(f"JWT credentials set for {self.broker['name']}")
|
||||
logging.debug(f"Using username: {username}")
|
||||
logging.debug(f"Public key: {self.public_key[:16]}...{self.public_key[-16:]}")
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to generate JWT token for {self.broker['name']}: {e}")
|
||||
logging.error(f"Failed to set JWT credentials for {self.broker['name']}: {e}")
|
||||
raise
|
||||
|
||||
def connect(self):
|
||||
@@ -224,8 +232,8 @@ class _BrokerConnection:
|
||||
else:
|
||||
protocol = "ws"
|
||||
|
||||
# Generate and set JWT token
|
||||
self.refresh_jwt_token()
|
||||
# Set JWT credentials before CONNECT handshake
|
||||
self._set_jwt_credentials()
|
||||
|
||||
logging.info(
|
||||
f"Connecting to {self.broker['name']} "
|
||||
@@ -241,10 +249,13 @@ class _BrokerConnection:
|
||||
self._running = False
|
||||
self._loop_running = False
|
||||
|
||||
# Cancel any pending reconnection
|
||||
# Cancel any pending timers
|
||||
if self._reconnect_timer:
|
||||
self._reconnect_timer.cancel()
|
||||
self._reconnect_timer = None
|
||||
if self._jwt_refresh_timer:
|
||||
self._jwt_refresh_timer.cancel()
|
||||
self._jwt_refresh_timer = None
|
||||
|
||||
self.client.loop_stop()
|
||||
self.client.disconnect()
|
||||
@@ -260,14 +271,53 @@ class _BrokerConnection:
|
||||
def is_connected(self) -> bool:
|
||||
"""Check if connection is active"""
|
||||
return self._running
|
||||
|
||||
def has_pending_reconnect(self) -> bool:
|
||||
"""Check if a reconnection is scheduled"""
|
||||
return self._reconnect_timer is not None and self._reconnect_timer.is_alive()
|
||||
|
||||
def should_refresh_token(self) -> bool:
|
||||
"""Check if JWT token needs refresh (at 80% of expiry)"""
|
||||
def should_reconnect_for_token_expiry(self) -> bool:
|
||||
"""Check if connection should be reconnected due to JWT expiry (at 80% of lifetime)"""
|
||||
if not self._connect_time:
|
||||
return False
|
||||
elapsed = (datetime.now(UTC) - self._connect_time).total_seconds()
|
||||
expiry_seconds = self.jwt_expiry_minutes * 60
|
||||
return elapsed >= expiry_seconds * 0.8
|
||||
# Stagger refresh by 5% per broker to prevent simultaneous disconnects
|
||||
# Broker 0: 80%, Broker 1: 85%, Broker 2: 90%, etc.
|
||||
stagger_offset = self.broker_index * 0.05
|
||||
refresh_threshold = 0.80 + stagger_offset
|
||||
return elapsed >= expiry_seconds * refresh_threshold
|
||||
|
||||
def _schedule_jwt_refresh(self):
|
||||
"""Schedule proactive JWT refresh before token expires"""
|
||||
if self._jwt_refresh_timer:
|
||||
self._jwt_refresh_timer.cancel()
|
||||
|
||||
expiry_seconds = self.jwt_expiry_minutes * 60
|
||||
# Stagger refresh by 5% per broker to prevent simultaneous disconnects
|
||||
# Broker 0: 80%, Broker 1: 85%, Broker 2: 90%, etc.
|
||||
stagger_offset = self.broker_index * 0.05
|
||||
refresh_threshold = 0.80 + stagger_offset
|
||||
refresh_delay = expiry_seconds * refresh_threshold
|
||||
|
||||
logging.info(
|
||||
f"JWT refresh scheduled for {self.broker['name']} in {refresh_delay:.0f}s "
|
||||
f"({refresh_threshold*100:.0f}% of {self.jwt_expiry_minutes}min token lifetime)"
|
||||
)
|
||||
self._jwt_refresh_timer = threading.Timer(refresh_delay, self.reconnect_for_token_expiry)
|
||||
self._jwt_refresh_timer.daemon = True
|
||||
self._jwt_refresh_timer.start()
|
||||
|
||||
def reconnect_for_token_expiry(self):
|
||||
"""Proactively reconnect with new JWT before current one expires"""
|
||||
if not self._running:
|
||||
return
|
||||
|
||||
logging.info(f"JWT token expiring soon for {self.broker['name']}, refreshing...")
|
||||
self._running = False
|
||||
self._jwt_refresh_timer = None
|
||||
self.client.disconnect() # Triggers clean disconnect, then reconnect via timer
|
||||
self._schedule_reconnect(reason="JWT token expiry")
|
||||
|
||||
|
||||
# ====================================================================
|
||||
@@ -314,7 +364,7 @@ class MeshCoreToMqttJwtPusher:
|
||||
self.brokers = LETSMESH_BROKERS.copy()
|
||||
logging.info(f"Multi-broker mode: connecting to all {len(LETSMESH_BROKERS)} built-in brokers")
|
||||
else:
|
||||
# Single broker mode (backward compatibility)
|
||||
|
||||
if broker_index >= len(LETSMESH_BROKERS):
|
||||
raise ValueError(f"Invalid broker_index {broker_index}")
|
||||
self.brokers = [LETSMESH_BROKERS[broker_index]]
|
||||
@@ -352,7 +402,7 @@ class MeshCoreToMqttJwtPusher:
|
||||
|
||||
# Create broker connections
|
||||
self.connections: List[_BrokerConnection] = []
|
||||
for broker in self.brokers:
|
||||
for idx, broker in enumerate(self.brokers):
|
||||
conn = _BrokerConnection(
|
||||
broker=broker,
|
||||
local_identity=self.local_identity,
|
||||
@@ -362,6 +412,7 @@ class MeshCoreToMqttJwtPusher:
|
||||
use_tls=self.use_tls,
|
||||
email=self.email,
|
||||
owner=self.owner,
|
||||
broker_index=idx,
|
||||
on_connect_callback=self._on_broker_connected,
|
||||
on_disconnect_callback=self._on_broker_disconnected,
|
||||
)
|
||||
@@ -384,22 +435,42 @@ class MeshCoreToMqttJwtPusher:
|
||||
|
||||
def _on_broker_disconnected(self, broker_name: str):
|
||||
"""Callback when a broker disconnects"""
|
||||
# Check if all connections are down
|
||||
# Check if all connections are down AND none have pending reconnects
|
||||
all_down = all(not conn.is_connected() for conn in self.connections)
|
||||
if all_down:
|
||||
logging.warning("All broker connections lost")
|
||||
self._running = False
|
||||
any_reconnecting = any(conn.has_pending_reconnect() for conn in self.connections)
|
||||
|
||||
if all_down and not any_reconnecting:
|
||||
logging.warning("All broker connections lost with no pending reconnects")
|
||||
elif all_down:
|
||||
logging.info("All brokers temporarily disconnected, reconnects pending")
|
||||
|
||||
def connect(self):
|
||||
"""Establish connections to all configured brokers"""
|
||||
for conn in self.connections:
|
||||
for idx, conn in enumerate(self.connections):
|
||||
try:
|
||||
conn.connect()
|
||||
if idx == 0:
|
||||
# Connect first broker immediately
|
||||
conn.connect()
|
||||
else:
|
||||
# Stagger additional brokers using background timers
|
||||
delay = idx * 30
|
||||
logging.info(f"Staggering connection to {conn.broker['name']} by {delay}s")
|
||||
timer = threading.Timer(delay, lambda c=conn: self._delayed_connect(c))
|
||||
timer.daemon = True
|
||||
timer.start()
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to connect to {conn.broker['name']}: {e}")
|
||||
|
||||
def _delayed_connect(self, conn):
|
||||
"""Connect a broker after a delay (called by timer)"""
|
||||
try:
|
||||
conn.connect()
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to connect to {conn.broker['name']}: {e}")
|
||||
|
||||
def disconnect(self):
|
||||
"""Disconnect from all brokers"""
|
||||
# Stop the heartbeat loop
|
||||
self._running = False
|
||||
|
||||
# Publish offline status before disconnecting
|
||||
@@ -423,15 +494,12 @@ class MeshCoreToMqttJwtPusher:
|
||||
|
||||
while self._running:
|
||||
try:
|
||||
# Refresh JWT tokens for all connections before they expire
|
||||
for conn in self.connections:
|
||||
if conn.is_connected() and conn.should_refresh_token():
|
||||
conn.refresh_jwt_token()
|
||||
|
||||
# Publish status (JWT refresh now handled by individual broker timers)
|
||||
self.publish_status(
|
||||
state="online", origin=self.node_name, radio_config=self.radio_config
|
||||
)
|
||||
logging.debug(f"Status heartbeat sent (next in {self.status_interval}s)")
|
||||
|
||||
time.sleep(self.status_interval)
|
||||
except Exception as e:
|
||||
logging.error(f"Status heartbeat error: {e}")
|
||||
@@ -507,7 +575,6 @@ class MeshCoreToMqttJwtPusher:
|
||||
results.append((conn.broker["name"], result))
|
||||
logging.debug(f"Published to {conn.broker['name']}/{topic}")
|
||||
|
||||
# Log if no brokers were available
|
||||
if not results:
|
||||
logging.warning(f"No active broker connections for publishing to {topic}")
|
||||
|
||||
|
||||
@@ -560,7 +560,8 @@ class SQLiteHandler:
|
||||
route: Optional[int] = None,
|
||||
start_timestamp: Optional[float] = None,
|
||||
end_timestamp: Optional[float] = None,
|
||||
limit: int = 1000) -> list:
|
||||
limit: int = 1000,
|
||||
offset: int = 0) -> list:
|
||||
try:
|
||||
with sqlite3.connect(self.sqlite_path) as conn:
|
||||
conn.row_factory = sqlite3.Row
|
||||
@@ -599,8 +600,9 @@ class SQLiteHandler:
|
||||
else:
|
||||
query = base_query
|
||||
|
||||
query += " ORDER BY timestamp DESC LIMIT ?"
|
||||
query += " ORDER BY timestamp DESC LIMIT ? OFFSET ?"
|
||||
params.append(limit)
|
||||
params.append(offset)
|
||||
|
||||
packets = conn.execute(query, params).fetchall()
|
||||
|
||||
|
||||
@@ -216,9 +216,10 @@ class StorageCollector:
|
||||
start_timestamp: Optional[float] = None,
|
||||
end_timestamp: Optional[float] = None,
|
||||
limit: int = 1000,
|
||||
offset: int = 0,
|
||||
) -> list:
|
||||
return self.sqlite_handler.get_filtered_packets(
|
||||
packet_type, route, start_timestamp, end_timestamp, limit
|
||||
packet_type, route, start_timestamp, end_timestamp, limit, offset
|
||||
)
|
||||
|
||||
def get_packet_by_hash(self, packet_hash: str) -> Optional[dict]:
|
||||
|
||||
@@ -841,6 +841,45 @@ class APIEndpoints:
|
||||
logger.error(f"Error getting recent packets: {e}")
|
||||
return self._error(e)
|
||||
|
||||
@cherrypy.expose
|
||||
@cherrypy.tools.gzip(compress_level=6)
|
||||
@cherrypy.tools.json_out()
|
||||
def bulk_packets(self, limit=1000, offset=0, start_timestamp=None, end_timestamp=None):
|
||||
"""
|
||||
Optimized bulk packet retrieval with gzip compression and DB-level pagination.
|
||||
"""
|
||||
try:
|
||||
# Enforce reasonable limits
|
||||
limit = min(int(limit), 10000)
|
||||
offset = max(int(offset), 0)
|
||||
|
||||
# Get packets from storage with TRUE DB-level pagination
|
||||
# Uses SQL "LIMIT ? OFFSET ?" - no Python slicing needed!
|
||||
storage = self._get_storage()
|
||||
packets = storage.get_filtered_packets(
|
||||
packet_type=None,
|
||||
route=None,
|
||||
start_timestamp=float(start_timestamp) if start_timestamp else None,
|
||||
end_timestamp=float(end_timestamp) if end_timestamp else None,
|
||||
limit=limit,
|
||||
offset=offset
|
||||
)
|
||||
|
||||
response = {
|
||||
"success": True,
|
||||
"data": packets,
|
||||
"count": len(packets),
|
||||
"offset": offset,
|
||||
"limit": limit,
|
||||
"compressed": True
|
||||
}
|
||||
|
||||
return response
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting bulk packets: {e}")
|
||||
return self._error(e)
|
||||
|
||||
@cherrypy.expose
|
||||
@cherrypy.tools.json_out()
|
||||
def filtered_packets(self, start_timestamp=None, end_timestamp=None, limit=1000, type=None, route=None):
|
||||
|
||||
@@ -284,6 +284,8 @@ class HTTPStatsServer:
|
||||
config = {
|
||||
"/": {
|
||||
"tools.sessions.on": False,
|
||||
# "tools.gzip.on": True,
|
||||
# "tools.gzip.mime_types": ["application/json", "text/html", "text/plain"],
|
||||
# Ensure proper content types for static files
|
||||
"tools.staticfile.content_types": {
|
||||
'js': 'application/javascript',
|
||||
@@ -297,6 +299,12 @@ class HTTPStatsServer:
|
||||
"/api": {
|
||||
"tools.require_auth.on": True,
|
||||
},
|
||||
# Enable gzip for bulk packet downloads
|
||||
"/api/bulk_packets": {
|
||||
"tools.gzip.on": True,
|
||||
"tools.gzip.mime_types": ["application/json"],
|
||||
"tools.gzip.compress_level": 6,
|
||||
},
|
||||
# Public documentation endpoints (no auth required)
|
||||
"/api/openapi": {
|
||||
"tools.require_auth.on": False,
|
||||
@@ -332,6 +340,7 @@ class HTTPStatsServer:
|
||||
"tools.websocket.handler_cls": PacketWebSocket,
|
||||
"tools.trailing_slash.on": False,
|
||||
"tools.require_auth.on": False,
|
||||
"tools.gzip.on": False,
|
||||
}
|
||||
logger.info("WebSocket endpoint configured at /ws/packets")
|
||||
except Exception as e:
|
||||
|
||||
Reference in New Issue
Block a user