feat: implement PacketRecord class for standardized packet handling and update MQTTHandler and StorageCollector to utilize it

This commit is contained in:
Lloyd
2025-11-21 15:35:54 +00:00
parent 08d4e054b0
commit 3f3771d600
4 changed files with 119 additions and 89 deletions
@@ -5,7 +5,6 @@ import base64
import paho.mqtt.client as mqtt
from datetime import datetime, timedelta, UTC
from dataclasses import dataclass, asdict
from nacl.signing import SigningKey
from typing import Callable, Optional
from .. import __version__
@@ -275,83 +274,3 @@ class MeshCoreToMqttJwtPusher:
logging.debug(f"Published to {topic}: {message}")
return result
# ====================================================================
# LetsMesh Packet Data Class
# ====================================================================
@dataclass
class LetsMeshPacket:
"""
Data class for LetsMesh packet format.
Converts internal packet_record format to LetsMesh publish format.
"""
origin: str
origin_id: str
timestamp: str
type: str
direction: str
time: str
date: str
len: str
packet_type: str
route: str
payload_len: str
raw: str
SNR: str
RSSI: str
score: str
duration: str
hash: str
@classmethod
def from_packet_record(
cls, packet_record: dict, origin: str, origin_id: str
) -> Optional["LetsMeshPacket"]:
"""
Create LetsMeshPacket from internal packet_record format.
Args:
packet_record: Internal packet record dictionary
origin: Node name
origin_id: Public key of the node
Returns:
LetsMeshPacket instance or None if raw_packet is missing
"""
if "raw_packet" not in packet_record or not packet_record["raw_packet"]:
return None
# Extract timestamp and format date/time
timestamp = packet_record.get("timestamp", 0)
dt = datetime.fromtimestamp(timestamp)
# Format route type (1=Flood->F, 2=Direct->D, etc)
route_map = {1: "F", 2: "D"}
route = route_map.get(packet_record.get("route", 0), str(packet_record.get("route", 0)))
return cls(
origin=origin,
origin_id=origin_id,
timestamp=dt.isoformat(),
type="PACKET",
direction="rx",
time=dt.strftime("%H:%M:%S"),
date=dt.strftime("%-d/%-m/%Y"),
len=str(len(packet_record["raw_packet"]) // 2),
packet_type=str(packet_record.get("type", 0)),
route=route,
payload_len=str(packet_record.get("payload_length", 0)),
raw=packet_record["raw_packet"],
SNR=str(packet_record.get("snr", 0)),
RSSI=str(packet_record.get("rssi", 0)),
score=str(int(packet_record.get("score", 0) * 1000)),
duration="0",
hash=packet_record.get("packet_hash", ""),
)
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization"""
return asdict(self)
+30 -3
View File
@@ -1,6 +1,6 @@
import json
import logging
from typing import Dict, Any
from typing import Dict, Any, Optional
try:
import paho.mqtt.client as mqtt
@@ -8,13 +8,16 @@ try:
except ImportError:
MQTT_AVAILABLE = False
from .storage_utils import PacketRecord
logger = logging.getLogger("MQTTHandler")
class MQTTHandler:
def __init__(self, mqtt_config: dict, node_name: str = "unknown"):
def __init__(self, mqtt_config: dict, node_name: str = "unknown", node_id: str = "unknown"):
self.mqtt_config = mqtt_config
self.node_name = node_name
self.node_id = node_id
self.client = None
self.available = MQTT_AVAILABLE
self._init_client()
@@ -45,15 +48,39 @@ class MQTTHandler:
self.client = None
def publish(self, record: dict, record_type: str):
"""
Publish record to MQTT.
Packets MUST use PacketRecord format. Non-packet records use original format.
Args:
record: The record dictionary to publish
record_type: Type of record (packet, advert, noise_floor, etc.)
"""
if not self.client:
return
try:
base_topic = self.mqtt_config.get("base_topic", "meshcore/repeater")
topic = f"{base_topic}/{self.node_name}/{record_type}"
payload = {k: v for k, v in record.items() if v is not None}
if record_type == "packet":
packet_record = PacketRecord.from_packet_record(
record,
origin=self.node_name,
origin_id=self.node_id
)
if not packet_record:
logger.debug("Skipping MQTT publish: packet missing required data for PacketRecord")
return
payload = packet_record.to_dict()
logger.debug("Publishing packet using PacketRecord format")
else:
payload = {k: v for k, v in record.items() if v is not None}
message = json.dumps(payload, default=str)
self.client.publish(topic, message, qos=0, retain=False)
logger.debug(f"Published to {topic}")
except Exception as e:
logger.error(f"Failed to publish to MQTT: {e}")
@@ -8,7 +8,8 @@ from typing import Optional, Dict, Any
from .sqlite_handler import SQLiteHandler
from .rrdtool_handler import RRDToolHandler
from .mqtt_handler import MQTTHandler
from .letsmesh_handler import MeshCoreToMqttJwtPusher, LetsMeshPacket
from .letsmesh_handler import MeshCoreToMqttJwtPusher
from .storage_utils import PacketRecord
logger = logging.getLogger("StorageCollector")
@@ -22,10 +23,11 @@ class StorageCollector:
self.storage_dir.mkdir(parents=True, exist_ok=True)
node_name = config.get("repeater", {}).get("node_name", "unknown")
node_id = local_identity.get_public_key().hex() if local_identity else "unknown"
self.sqlite_handler = SQLiteHandler(self.storage_dir)
self.rrd_handler = RRDToolHandler(self.storage_dir)
self.mqtt_handler = MQTTHandler(config.get("mqtt", {}), node_name)
self.mqtt_handler = MQTTHandler(config.get("mqtt", {}), node_name, node_id)
# Initialize LetsMesh handler if configured
self.letsmesh_handler = None
@@ -107,12 +109,12 @@ class StorageCollector:
return
node_name = self.config.get("repeater", {}).get("node_name", "Unknown")
letsmesh_packet = LetsMeshPacket.from_packet_record(
packet = PacketRecord.from_packet_record(
packet_record, origin=node_name, origin_id=self.letsmesh_handler.public_key
)
if letsmesh_packet:
self.letsmesh_handler.publish_packet(letsmesh_packet.to_dict())
if packet:
self.letsmesh_handler.publish_packet(packet.to_dict())
logger.debug(f"Published packet type 0x{packet_type:02X} to LetsMesh")
else:
logger.debug("Skipped LetsMesh publish: packet missing raw_packet data")
@@ -0,0 +1,82 @@
"""Storage utility classes and functions for data acquisition."""
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Optional
@dataclass
class PacketRecord:
"""
Data class for packet record format.
Converts internal packet_record format to standardized publish format.
Reusable across MQTT, LetsMesh, and other handlers.
"""
origin: str
origin_id: str
timestamp: str
type: str
direction: str
time: str
date: str
len: str
packet_type: str
route: str
payload_len: str
raw: str
SNR: str
RSSI: str
score: str
duration: str
hash: str
@classmethod
def from_packet_record(
cls, packet_record: dict, origin: str, origin_id: str
) -> Optional["PacketRecord"]:
"""
Create PacketRecord from internal packet_record format.
Args:
packet_record: Internal packet record dictionary
origin: Node name
origin_id: Public key of the node
Returns:
PacketRecord instance or None if raw_packet is missing
"""
if "raw_packet" not in packet_record or not packet_record["raw_packet"]:
return None
# Extract timestamp and format date/time
timestamp = packet_record.get("timestamp", 0)
dt = datetime.fromtimestamp(timestamp)
# Format route type (1=Flood->F, 2=Direct->D, etc)
route_map = {1: "F", 2: "D"}
route = route_map.get(packet_record.get("route", 0), str(packet_record.get("route", 0)))
return cls(
origin=origin,
origin_id=origin_id,
timestamp=dt.isoformat(),
type="PACKET",
direction="rx",
time=dt.strftime("%H:%M:%S"),
date=dt.strftime("%-d/%-m/%Y"),
len=str(len(packet_record["raw_packet"]) // 2),
packet_type=str(packet_record.get("type", 0)),
route=route,
payload_len=str(packet_record.get("payload_length", 0)),
raw=packet_record["raw_packet"],
SNR=str(packet_record.get("snr", 0)),
RSSI=str(packet_record.get("rssi", 0)),
score=str(int(packet_record.get("score", 0) * 1000)),
duration="0",
hash=packet_record.get("packet_hash", ""),
)
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization"""
return asdict(self)