mirror of
https://github.com/rightup/pyMC_Repeater.git
synced 2026-05-18 15:26:21 +02:00
489 lines
18 KiB
Python
489 lines
18 KiB
Python
import asyncio
|
|
import json
|
|
import logging
|
|
import time
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Optional
|
|
|
|
from repeater.config import resolve_storage_dir
|
|
|
|
from .mqtt_handler import MeshCoreToMqttPusher
|
|
from .rrdtool_handler import RRDToolHandler
|
|
from .sqlite_handler import SQLiteHandler
|
|
from .storage_utils import PacketRecord
|
|
|
|
logger = logging.getLogger("StorageCollector")
|
|
|
|
|
|
class StorageCollector:
|
|
def __init__(self, config: dict, local_identity=None, repeater_handler=None):
|
|
self.config = config
|
|
self.repeater_handler = repeater_handler
|
|
self.glass_publish_callback = None
|
|
self._pending_tasks = set()
|
|
|
|
self.storage_dir = resolve_storage_dir(config)
|
|
self.storage_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
self.sqlite_handler = SQLiteHandler(self.storage_dir)
|
|
self.rrd_handler = RRDToolHandler(self.storage_dir)
|
|
|
|
# Initialize MQTT handler if configured
|
|
self.mqtt_handler = None
|
|
if (config.get("mqtt_brokers", {}) or config.get("letsmesh", {}) or config.get("mqtt", {})) and local_identity:
|
|
try:
|
|
# Pass local_identity directly (supports both standard and firmware keys)
|
|
self.mqtt_handler = MeshCoreToMqttPusher(
|
|
local_identity=local_identity,
|
|
config=config,
|
|
stats_provider=self._get_live_stats,
|
|
)
|
|
self.mqtt_handler.connect()
|
|
|
|
public_key_hex = local_identity.get_public_key().hex()
|
|
logger.info(
|
|
f"MQTT handler initialized with public key: {public_key_hex[:16]}..."
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Failed to initialize MQTT handler: {e}")
|
|
self.mqtt_handler = None
|
|
|
|
# Initialize hardware stats collector
|
|
from .hardware_stats import HardwareStatsCollector
|
|
|
|
self.hardware_stats = HardwareStatsCollector()
|
|
logger.info("Hardware stats collector initialized")
|
|
|
|
# Initialize WebSocket handler for real-time updates
|
|
self.websocket_available = False
|
|
self.websocket_has_connected_clients = lambda: False
|
|
self._last_ws_stats_broadcast: float = 0.0
|
|
self._ws_stats_broadcast_interval_sec: float = 5.0
|
|
try:
|
|
from .websocket_handler import (
|
|
broadcast_packet,
|
|
broadcast_stats,
|
|
has_connected_clients,
|
|
)
|
|
|
|
self.websocket_broadcast_packet = broadcast_packet
|
|
self.websocket_broadcast_stats = broadcast_stats
|
|
self.websocket_has_connected_clients = has_connected_clients
|
|
self.websocket_available = True
|
|
logger.info("WebSocket handler initialized for real-time updates")
|
|
except ImportError:
|
|
logger.debug("WebSocket handler not available")
|
|
|
|
def _track_task(self, task: asyncio.Task):
|
|
"""Track background task for lifecycle management and error handling."""
|
|
self._pending_tasks.add(task)
|
|
|
|
def on_done(t: asyncio.Task):
|
|
self._pending_tasks.discard(t)
|
|
try:
|
|
t.result()
|
|
except asyncio.CancelledError:
|
|
pass
|
|
except Exception as e:
|
|
logger.error(f"Background task error: {e}", exc_info=True)
|
|
|
|
task.add_done_callback(on_done)
|
|
|
|
def _schedule_background(self, coro_factory, *args, sync_fallback=None):
|
|
"""Schedule a coroutine if a loop exists; otherwise run sync fallback."""
|
|
try:
|
|
loop = asyncio.get_running_loop()
|
|
except RuntimeError:
|
|
if sync_fallback is not None:
|
|
sync_fallback(*args)
|
|
return
|
|
|
|
task = loop.create_task(coro_factory(*args))
|
|
self._track_task(task)
|
|
|
|
def _get_live_stats(self) -> dict:
|
|
"""Get live stats from RepeaterHandler"""
|
|
if not self.repeater_handler:
|
|
return {
|
|
"uptime_secs": 0,
|
|
"packets_sent": 0,
|
|
"packets_received": 0,
|
|
"errors": 0,
|
|
"queue_len": 0,
|
|
}
|
|
|
|
uptime_secs = int(time.time() - self.repeater_handler.start_time)
|
|
|
|
# Get airtime stats
|
|
airtime_stats = self.repeater_handler.airtime_mgr.get_stats()
|
|
|
|
# Get latest noise floor from database
|
|
noise_floor = None
|
|
try:
|
|
recent_noise = self.sqlite_handler.get_noise_floor_history(hours=0.5, limit=1)
|
|
if recent_noise and len(recent_noise) > 0:
|
|
noise_floor = recent_noise[-1].get("noise_floor_dbm")
|
|
except Exception as e:
|
|
logger.debug(f"Could not fetch noise floor: {e}")
|
|
|
|
stats = {
|
|
"uptime_secs": uptime_secs,
|
|
"packets_sent": self.repeater_handler.forwarded_count,
|
|
"packets_received": self.repeater_handler.rx_count,
|
|
"errors": 0,
|
|
"queue_len": 0, # N/A for Python repeater
|
|
}
|
|
|
|
# Add airtime stats
|
|
if airtime_stats:
|
|
stats["tx_air_secs"] = airtime_stats["total_airtime_ms"] / 1000
|
|
stats["current_airtime_ms"] = airtime_stats["current_airtime_ms"]
|
|
stats["utilization_percent"] = airtime_stats["utilization_percent"]
|
|
|
|
# Add noise floor if available
|
|
if noise_floor is not None:
|
|
stats["noise_floor"] = noise_floor
|
|
|
|
return stats
|
|
|
|
def record_packet(self, packet_record: dict, skip_mqtt_if_invalid: bool = True):
|
|
"""Record packet to storage and publish to MQTT
|
|
|
|
Args:
|
|
packet_record: Dictionary containing packet information
|
|
skip_mqtt_if_invalid: If True, don't publish packets with drop_reason to mqtt
|
|
"""
|
|
logger.debug(
|
|
f"Recording packet: type={packet_record.get('type')}, "
|
|
f"transmitted={packet_record.get('transmitted')}"
|
|
)
|
|
|
|
# HOT PATH: Store to local databases only (fast, non-blocking)
|
|
self.sqlite_handler.store_packet(packet_record)
|
|
cumulative_counts = self.sqlite_handler.get_cumulative_counts()
|
|
self.rrd_handler.update_packet_metrics(packet_record, cumulative_counts)
|
|
|
|
# DEFERRED: Publish to network sinks and WebSocket in background tasks
|
|
# This prevents network latency from blocking packet processing
|
|
self._schedule_background(
|
|
self._deferred_publish,
|
|
packet_record,
|
|
skip_mqtt_if_invalid,
|
|
sync_fallback=self._publish_packet_sync,
|
|
)
|
|
|
|
async def _deferred_publish(self, packet_record: dict, skip_mqtt: bool):
|
|
"""Deferred background task for all network publishing operations."""
|
|
try:
|
|
self._publish_packet_sync(packet_record, skip_mqtt)
|
|
except Exception as e:
|
|
logger.error(f"Deferred publish failed: {e}", exc_info=True)
|
|
|
|
def _publish_packet_sync(self, packet_record: dict, skip_mqtt: bool):
|
|
"""Publish packet updates synchronously (used when no asyncio loop is active)."""
|
|
self._publish_to_glass(packet_record, "packet")
|
|
|
|
if self.websocket_available:
|
|
try:
|
|
self.websocket_broadcast_packet(packet_record)
|
|
if self.websocket_has_connected_clients():
|
|
now_mono = time.monotonic()
|
|
if (
|
|
now_mono - self._last_ws_stats_broadcast
|
|
>= self._ws_stats_broadcast_interval_sec
|
|
):
|
|
self._last_ws_stats_broadcast = now_mono
|
|
packet_stats_24h = self.sqlite_handler.get_packet_stats(hours=24)
|
|
uptime_seconds = (
|
|
time.time() - self.repeater_handler.start_time if self.repeater_handler else 0
|
|
)
|
|
self.websocket_broadcast_stats(
|
|
{
|
|
"packet_stats": packet_stats_24h,
|
|
"system_stats": {"uptime_seconds": uptime_seconds},
|
|
}
|
|
)
|
|
except Exception as e:
|
|
logger.debug(f"WebSocket broadcast failed: {e}")
|
|
|
|
|
|
self._publish_packet_to_mqtt(packet_record)
|
|
|
|
def _publish_packet_to_mqtt(self, packet_record: dict):
|
|
"""Publish packet to mqtt broker if enabled and allowed.
|
|
|
|
The ``duration`` field in the published JSON is sourced from
|
|
``packet_record['airtime_ms']``, populated upstream by
|
|
RepeaterHandler._build_packet_record using the Semtech reference
|
|
time-on-air formula. No recomputation is needed here.
|
|
"""
|
|
if not self.mqtt_handler:
|
|
return
|
|
|
|
try:
|
|
packet_type = packet_record.get("type")
|
|
if packet_type is None:
|
|
logger.error("Cannot publish to mqtt: packet_record missing 'type' field")
|
|
return
|
|
|
|
node_name = self.config.get("repeater", {}).get("node_name", "Unknown")
|
|
packet = PacketRecord.from_packet_record(
|
|
packet_record, origin=node_name, origin_id=self.mqtt_handler.public_key
|
|
)
|
|
|
|
if packet:
|
|
self.mqtt_handler.publish_packet(packet.to_dict())
|
|
logger.debug(f"Published packet type 0x{packet_type:02X} to mqtt")
|
|
else:
|
|
logger.debug("Skipped mqtt publish: packet missing raw_packet data")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to publish packet to mqtt: {e}", exc_info=True)
|
|
|
|
def record_advert(self, advert_record: dict):
|
|
"""Record advert to storage and defer network publishing to background tasks."""
|
|
self.sqlite_handler.store_advert(advert_record)
|
|
self._schedule_background(
|
|
self._deferred_publish_advert,
|
|
advert_record,
|
|
sync_fallback=self._publish_advert_sync,
|
|
)
|
|
|
|
async def _deferred_publish_advert(self, advert_record: dict):
|
|
"""Deferred background task for advert publishing."""
|
|
try:
|
|
self._publish_advert_sync(advert_record)
|
|
except Exception as e:
|
|
logger.error(f"Deferred advert publish failed: {e}", exc_info=True)
|
|
|
|
def _publish_advert_sync(self, advert_record: dict):
|
|
if self.mqtt_handler:
|
|
self.mqtt_handler.publish_mqtt(advert_record, "advert")
|
|
self._publish_to_glass(advert_record, "advert")
|
|
|
|
def record_noise_floor(self, noise_floor_dbm: float):
|
|
"""Record noise floor to storage and defer network publishing to background tasks."""
|
|
noise_record = {"timestamp": time.time(), "noise_floor_dbm": noise_floor_dbm}
|
|
self.sqlite_handler.store_noise_floor(noise_record)
|
|
self._schedule_background(
|
|
self._deferred_publish_noise_floor,
|
|
noise_record,
|
|
sync_fallback=self._publish_noise_floor_sync,
|
|
)
|
|
|
|
async def _deferred_publish_noise_floor(self, noise_record: dict):
|
|
"""Deferred background task for noise floor publishing."""
|
|
try:
|
|
self._publish_noise_floor_sync(noise_record)
|
|
except Exception as e:
|
|
logger.error(f"Deferred noise floor publish failed: {e}", exc_info=True)
|
|
|
|
def _publish_noise_floor_sync(self, noise_record: dict):
|
|
if self.mqtt_handler:
|
|
self.mqtt_handler.publish_mqtt(noise_record, "noise_floor")
|
|
self._publish_to_glass(noise_record, "noise_floor")
|
|
|
|
def record_crc_errors(self, count: int):
|
|
"""Record a batch of CRC errors detected since last poll and defer publishing."""
|
|
crc_record = {"timestamp": time.time(), "count": count}
|
|
self.sqlite_handler.store_crc_errors(crc_record)
|
|
self._schedule_background(
|
|
self._deferred_publish_crc_errors,
|
|
crc_record,
|
|
sync_fallback=self._publish_crc_errors_sync,
|
|
)
|
|
|
|
async def _deferred_publish_crc_errors(self, crc_record: dict):
|
|
"""Deferred background task for CRC error publishing."""
|
|
try:
|
|
self._publish_crc_errors_sync(crc_record)
|
|
except Exception as e:
|
|
logger.error(f"Deferred CRC errors publish failed: {e}", exc_info=True)
|
|
|
|
def _publish_crc_errors_sync(self, crc_record: dict):
|
|
if self.mqtt_handler:
|
|
self.mqtt_handler.publish_mqtt(crc_record, "crc_errors")
|
|
self._publish_to_glass(crc_record, "crc_errors")
|
|
|
|
def get_crc_error_count(self, hours: int = 24) -> int:
|
|
return self.sqlite_handler.get_crc_error_count(hours)
|
|
|
|
def get_crc_error_history(self, hours: int = 24, limit: int = None) -> list:
|
|
return self.sqlite_handler.get_crc_error_history(hours, limit)
|
|
|
|
def get_packet_stats(self, hours: int = 24) -> dict:
|
|
return self.sqlite_handler.get_packet_stats(hours)
|
|
|
|
def get_recent_packets(self, limit: int = 100) -> list:
|
|
return self.sqlite_handler.get_recent_packets(limit)
|
|
|
|
def get_filtered_packets(
|
|
self,
|
|
packet_type: Optional[int] = None,
|
|
route: Optional[int] = None,
|
|
start_timestamp: Optional[float] = None,
|
|
end_timestamp: Optional[float] = None,
|
|
limit: int = 1000,
|
|
offset: int = 0,
|
|
) -> list:
|
|
return self.sqlite_handler.get_filtered_packets(
|
|
packet_type, route, start_timestamp, end_timestamp, limit, offset
|
|
)
|
|
|
|
def get_airtime_data(
|
|
self,
|
|
start_timestamp: Optional[float] = None,
|
|
end_timestamp: Optional[float] = None,
|
|
limit: int = 50000,
|
|
) -> list:
|
|
return self.sqlite_handler.get_airtime_data(start_timestamp, end_timestamp, limit)
|
|
|
|
def get_airtime_buckets(
|
|
self,
|
|
start_timestamp: float,
|
|
end_timestamp: float,
|
|
bucket_seconds: int = 60,
|
|
sf: int = 9,
|
|
bw_hz: int = 62500,
|
|
cr: int = 5,
|
|
preamble: int = 17,
|
|
) -> dict:
|
|
return self.sqlite_handler.get_airtime_buckets(
|
|
start_timestamp, end_timestamp, bucket_seconds, sf, bw_hz, cr, preamble
|
|
)
|
|
|
|
def get_packet_by_hash(self, packet_hash: str) -> Optional[dict]:
|
|
return self.sqlite_handler.get_packet_by_hash(packet_hash)
|
|
|
|
def get_rrd_data(
|
|
self,
|
|
start_time: Optional[int] = None,
|
|
end_time: Optional[int] = None,
|
|
resolution: str = "average",
|
|
) -> Optional[dict]:
|
|
return self.rrd_handler.get_data(start_time, end_time, resolution)
|
|
|
|
def get_packet_type_stats(self, hours: int = 24) -> dict:
|
|
rrd_stats = self.rrd_handler.get_packet_type_stats(hours)
|
|
if rrd_stats:
|
|
return rrd_stats
|
|
|
|
logger.warning("Falling back to SQLite for packet type stats")
|
|
return self.sqlite_handler.get_packet_type_stats(hours)
|
|
|
|
def get_route_stats(self, hours: int = 24) -> dict:
|
|
return self.sqlite_handler.get_route_stats(hours)
|
|
|
|
def get_neighbors(self) -> dict:
|
|
return self.sqlite_handler.get_neighbors()
|
|
|
|
def get_node_name_by_pubkey(self, pubkey: str) -> Optional[str]:
|
|
"""
|
|
Lookup node name from adverts table by public key.
|
|
|
|
Args:
|
|
pubkey: Public key in hex string format
|
|
|
|
Returns:
|
|
Node name if found, None otherwise
|
|
"""
|
|
try:
|
|
import sqlite3
|
|
|
|
with sqlite3.connect(self.sqlite_handler.sqlite_path) as conn:
|
|
result = conn.execute(
|
|
"SELECT node_name FROM adverts WHERE pubkey = ? AND node_name IS NOT NULL ORDER BY last_seen DESC LIMIT 1",
|
|
(pubkey,),
|
|
).fetchone()
|
|
return result[0] if result else None
|
|
except Exception as e:
|
|
logger.debug(f"Could not lookup node name for {pubkey[:8] if pubkey else 'None'}: {e}")
|
|
return None
|
|
|
|
def cleanup_old_data(self, days: int = 7):
|
|
self.sqlite_handler.cleanup_old_data(days)
|
|
|
|
def get_noise_floor_history(self, hours: int = 24, limit: int = None) -> list:
|
|
return self.sqlite_handler.get_noise_floor_history(hours, limit)
|
|
|
|
def get_noise_floor_stats(self, hours: int = 24) -> dict:
|
|
return self.sqlite_handler.get_noise_floor_stats(hours)
|
|
|
|
def close(self):
|
|
# Cancel all pending background tasks
|
|
for task in self._pending_tasks:
|
|
if not task.done():
|
|
task.cancel()
|
|
|
|
if self.mqtt_handler:
|
|
try:
|
|
self.mqtt_handler.disconnect()
|
|
logger.info("MQTT handler disconnected")
|
|
except Exception as e:
|
|
logger.error(f"Error disconnecting MQTT handler: {e}")
|
|
|
|
def set_glass_publisher(self, publish_callback):
|
|
self.glass_publish_callback = publish_callback
|
|
|
|
def _publish_to_glass(self, record: dict, record_type: str):
|
|
if not self.glass_publish_callback:
|
|
return
|
|
try:
|
|
self.glass_publish_callback(record_type, record)
|
|
except Exception as e:
|
|
logger.debug(f"Failed to publish telemetry to Glass MQTT: {e}")
|
|
|
|
def create_transport_key(
|
|
self,
|
|
name: str,
|
|
flood_policy: str,
|
|
transport_key: Optional[str] = None,
|
|
parent_id: Optional[int] = None,
|
|
last_used: Optional[float] = None,
|
|
) -> Optional[int]:
|
|
return self.sqlite_handler.create_transport_key(
|
|
name, flood_policy, transport_key, parent_id, last_used
|
|
)
|
|
|
|
def get_transport_keys(self) -> list:
|
|
return self.sqlite_handler.get_transport_keys()
|
|
|
|
def get_transport_key_by_id(self, key_id: int) -> Optional[dict]:
|
|
return self.sqlite_handler.get_transport_key_by_id(key_id)
|
|
|
|
def update_transport_key(
|
|
self,
|
|
key_id: int,
|
|
name: Optional[str] = None,
|
|
flood_policy: Optional[str] = None,
|
|
transport_key: Optional[str] = None,
|
|
parent_id: Optional[int] = None,
|
|
last_used: Optional[float] = None,
|
|
) -> bool:
|
|
return self.sqlite_handler.update_transport_key(
|
|
key_id, name, flood_policy, transport_key, parent_id, last_used
|
|
)
|
|
|
|
def delete_transport_key(self, key_id: int) -> bool:
|
|
return self.sqlite_handler.delete_transport_key(key_id)
|
|
|
|
def delete_advert(self, advert_id: int) -> bool:
|
|
return self.sqlite_handler.delete_advert(advert_id)
|
|
|
|
def get_hardware_stats(self) -> Optional[dict]:
|
|
"""Get current hardware statistics"""
|
|
try:
|
|
return self.hardware_stats.get_stats()
|
|
except Exception as e:
|
|
logger.error(f"Error getting hardware stats: {e}")
|
|
return None
|
|
|
|
def get_hardware_processes(self) -> Optional[list]:
|
|
"""Get current process summary"""
|
|
try:
|
|
return self.hardware_stats.get_processes_summary()
|
|
except Exception as e:
|
|
logger.error(f"Error getting hardware processes: {e}")
|
|
return None
|