Files
pyMC_Repeater/repeater/data_acquisition/storage_collector.py
T

348 lines
14 KiB
Python

import json
import logging
import time
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, Any
from .sqlite_handler import SQLiteHandler
from .rrdtool_handler import RRDToolHandler
from .mqtt_handler import MQTTHandler
from .letsmesh_handler import MeshCoreToMqttJwtPusher
from .storage_utils import PacketRecord
logger = logging.getLogger("StorageCollector")
class StorageCollector:
def __init__(self, config: dict, local_identity=None, repeater_handler=None):
self.config = config
self.repeater_handler = repeater_handler
storage_dir_cfg = (
config.get("storage", {}).get("storage_dir")
or config.get("storage_dir")
or "/var/lib/pymc_repeater"
)
self.storage_dir = Path(storage_dir_cfg)
self.storage_dir.mkdir(parents=True, exist_ok=True)
node_name = config.get("repeater", {}).get("node_name", "unknown")
node_id = local_identity.get_public_key().hex() if local_identity else "unknown"
self.sqlite_handler = SQLiteHandler(self.storage_dir)
self.rrd_handler = RRDToolHandler(self.storage_dir)
self.mqtt_handler = MQTTHandler(config.get("mqtt", {}), node_name, node_id)
# Initialize LetsMesh handler if configured
self.letsmesh_handler = None
if config.get("letsmesh", {}).get("enabled", False) and local_identity:
try:
# Pass local_identity directly (supports both standard and firmware keys)
self.letsmesh_handler = MeshCoreToMqttJwtPusher(
local_identity=local_identity,
config=config,
stats_provider=self._get_live_stats,
)
self.letsmesh_handler.connect()
# Get disallowed packet types from config
from ..config import get_node_info
node_info = get_node_info(config)
self.disallowed_packet_types = set(node_info["disallowed_packet_types"])
public_key_hex = local_identity.get_public_key().hex()
logger.info(
f"LetsMesh handler initialized with public key: {public_key_hex[:16]}..."
)
if self.disallowed_packet_types:
logger.info(f"Disallowed packet types: {sorted(self.disallowed_packet_types)}")
else:
logger.info("All packet types allowed")
except Exception as e:
logger.error(f"Failed to initialize LetsMesh handler: {e}")
self.letsmesh_handler = None
self.disallowed_packet_types = set()
else:
self.disallowed_packet_types = set()
# Initialize hardware stats collector
from .hardware_stats import HardwareStatsCollector
self.hardware_stats = HardwareStatsCollector()
logger.info("Hardware stats collector initialized")
# Initialize WebSocket handler for real-time updates
self.websocket_available = False
try:
from .websocket_handler import broadcast_packet, broadcast_stats
self.websocket_broadcast_packet = broadcast_packet
self.websocket_broadcast_stats = broadcast_stats
self.websocket_available = True
logger.info("WebSocket handler initialized for real-time updates")
except ImportError:
logger.debug("WebSocket handler not available")
def _get_live_stats(self) -> dict:
"""Get live stats from RepeaterHandler"""
if not self.repeater_handler:
return {
"uptime_secs": 0,
"packets_sent": 0,
"packets_received": 0,
"errors": 0,
"queue_len": 0
}
uptime_secs = int(time.time() - self.repeater_handler.start_time)
# Get airtime stats
airtime_stats = self.repeater_handler.airtime_mgr.get_stats()
# Get latest noise floor from database
noise_floor = None
try:
recent_noise = self.sqlite_handler.get_noise_floor_history(hours=0.5, limit=1)
if recent_noise and len(recent_noise) > 0:
noise_floor = recent_noise[-1].get('noise_floor_dbm')
except Exception as e:
logger.debug(f"Could not fetch noise floor: {e}")
stats = {
"uptime_secs": uptime_secs,
"packets_sent": self.repeater_handler.forwarded_count,
"packets_received": self.repeater_handler.rx_count,
"errors": 0,
"queue_len": 0, # N/A for Python repeater
}
# Add airtime stats
if airtime_stats:
stats["tx_air_secs"] = airtime_stats["total_airtime_ms"] / 1000
stats["current_airtime_ms"] = airtime_stats["current_airtime_ms"]
stats["utilization_percent"] = airtime_stats["utilization_percent"]
# Add noise floor if available
if noise_floor is not None:
stats["noise_floor"] = noise_floor
return stats
def record_packet(self, packet_record: dict, skip_letsmesh_if_invalid: bool = True):
"""Record packet to storage and publish to MQTT/LetsMesh
Args:
packet_record: Dictionary containing packet information
skip_letsmesh_if_invalid: If True, don't publish packets with drop_reason to LetsMesh
"""
logger.debug(
f"Recording packet: type={packet_record.get('type')}, "
f"transmitted={packet_record.get('transmitted')}"
)
# Store to local databases and publish to local MQTT
self.sqlite_handler.store_packet(packet_record)
cumulative_counts = self.sqlite_handler.get_cumulative_counts()
self.rrd_handler.update_packet_metrics(packet_record, cumulative_counts)
self.mqtt_handler.publish(packet_record, "packet")
# Broadcast to WebSocket clients for real-time updates
if self.websocket_available:
try:
self.websocket_broadcast_packet(packet_record)
# Broadcast 24-hour packet stats (same as /api/packet_stats?hours=24)
packet_stats_24h = self.sqlite_handler.get_packet_stats(hours=24)
uptime_seconds = time.time() - self.repeater_handler.start_time if self.repeater_handler else 0
self.websocket_broadcast_stats({
"packet_stats": packet_stats_24h,
"system_stats": {
"uptime_seconds": uptime_seconds,
}
})
except Exception as e:
logger.debug(f"WebSocket broadcast failed: {e}")
# Publish to LetsMesh if enabled (skip invalid packets if requested)
if skip_letsmesh_if_invalid and packet_record.get('drop_reason'):
logger.debug(f"Skipping LetsMesh publish for packet with drop_reason: {packet_record.get('drop_reason')}")
else:
self._publish_to_letsmesh(packet_record)
def _publish_to_letsmesh(self, packet_record: dict):
"""Publish packet to LetsMesh broker if enabled and allowed"""
if not self.letsmesh_handler:
return
try:
packet_type = packet_record.get("type")
if packet_type is None:
logger.error("Cannot publish to LetsMesh: packet_record missing 'type' field")
return
if packet_type in self.disallowed_packet_types:
logger.debug(f"Skipped publishing packet type 0x{packet_type:02X} (disallowed)")
return
node_name = self.config.get("repeater", {}).get("node_name", "Unknown")
packet = PacketRecord.from_packet_record(
packet_record, origin=node_name, origin_id=self.letsmesh_handler.public_key
)
if packet:
self.letsmesh_handler.publish_packet(packet.to_dict())
logger.debug(f"Published packet type 0x{packet_type:02X} to LetsMesh")
else:
logger.debug("Skipped LetsMesh publish: packet missing raw_packet data")
except Exception as e:
logger.error(f"Failed to publish packet to LetsMesh: {e}", exc_info=True)
def record_advert(self, advert_record: dict):
self.sqlite_handler.store_advert(advert_record)
self.mqtt_handler.publish(advert_record, "advert")
def record_noise_floor(self, noise_floor_dbm: float):
noise_record = {"timestamp": time.time(), "noise_floor_dbm": noise_floor_dbm}
self.sqlite_handler.store_noise_floor(noise_record)
self.mqtt_handler.publish(noise_record, "noise_floor")
def get_packet_stats(self, hours: int = 24) -> dict:
return self.sqlite_handler.get_packet_stats(hours)
def get_recent_packets(self, limit: int = 100) -> list:
return self.sqlite_handler.get_recent_packets(limit)
def get_filtered_packets(
self,
packet_type: Optional[int] = None,
route: Optional[int] = None,
start_timestamp: Optional[float] = None,
end_timestamp: Optional[float] = None,
limit: int = 1000,
offset: int = 0,
) -> list:
return self.sqlite_handler.get_filtered_packets(
packet_type, route, start_timestamp, end_timestamp, limit, offset
)
def get_packet_by_hash(self, packet_hash: str) -> Optional[dict]:
return self.sqlite_handler.get_packet_by_hash(packet_hash)
def get_rrd_data(
self,
start_time: Optional[int] = None,
end_time: Optional[int] = None,
resolution: str = "average",
) -> Optional[dict]:
return self.rrd_handler.get_data(start_time, end_time, resolution)
def get_packet_type_stats(self, hours: int = 24) -> dict:
rrd_stats = self.rrd_handler.get_packet_type_stats(hours)
if rrd_stats:
return rrd_stats
logger.warning("Falling back to SQLite for packet type stats")
return self.sqlite_handler.get_packet_type_stats(hours)
def get_route_stats(self, hours: int = 24) -> dict:
return self.sqlite_handler.get_route_stats(hours)
def get_neighbors(self) -> dict:
return self.sqlite_handler.get_neighbors()
def get_node_name_by_pubkey(self, pubkey: str) -> Optional[str]:
"""
Lookup node name from adverts table by public key.
Args:
pubkey: Public key in hex string format
Returns:
Node name if found, None otherwise
"""
try:
import sqlite3
with sqlite3.connect(self.sqlite_handler.sqlite_path) as conn:
result = conn.execute(
"SELECT node_name FROM adverts WHERE pubkey = ? AND node_name IS NOT NULL ORDER BY last_seen DESC LIMIT 1",
(pubkey,)
).fetchone()
return result[0] if result else None
except Exception as e:
logger.debug(f"Could not lookup node name for {pubkey[:8] if pubkey else 'None'}: {e}")
return None
def cleanup_old_data(self, days: int = 7):
self.sqlite_handler.cleanup_old_data(days)
def get_noise_floor_history(self, hours: int = 24, limit: int = None) -> list:
return self.sqlite_handler.get_noise_floor_history(hours, limit)
def get_noise_floor_stats(self, hours: int = 24) -> dict:
return self.sqlite_handler.get_noise_floor_stats(hours)
def close(self):
self.mqtt_handler.close()
if self.letsmesh_handler:
try:
self.letsmesh_handler.disconnect()
logger.info("LetsMesh handler disconnected")
except Exception as e:
logger.error(f"Error disconnecting LetsMesh handler: {e}")
def create_transport_key(
self,
name: str,
flood_policy: str,
transport_key: Optional[str] = None,
parent_id: Optional[int] = None,
last_used: Optional[float] = None,
) -> Optional[int]:
return self.sqlite_handler.create_transport_key(
name, flood_policy, transport_key, parent_id, last_used
)
def get_transport_keys(self) -> list:
return self.sqlite_handler.get_transport_keys()
def get_transport_key_by_id(self, key_id: int) -> Optional[dict]:
return self.sqlite_handler.get_transport_key_by_id(key_id)
def update_transport_key(
self,
key_id: int,
name: Optional[str] = None,
flood_policy: Optional[str] = None,
transport_key: Optional[str] = None,
parent_id: Optional[int] = None,
last_used: Optional[float] = None,
) -> bool:
return self.sqlite_handler.update_transport_key(
key_id, name, flood_policy, transport_key, parent_id, last_used
)
def delete_transport_key(self, key_id: int) -> bool:
return self.sqlite_handler.delete_transport_key(key_id)
def delete_advert(self, advert_id: int) -> bool:
return self.sqlite_handler.delete_advert(advert_id)
def get_hardware_stats(self) -> Optional[dict]:
"""Get current hardware statistics"""
try:
return self.hardware_stats.get_stats()
except Exception as e:
logger.error(f"Error getting hardware stats: {e}")
return None
def get_hardware_processes(self) -> Optional[list]:
"""Get current process summary"""
try:
return self.hardware_stats.get_processes_summary()
except Exception as e:
logger.error(f"Error getting hardware processes: {e}")
return None