diff --git a/repeater/data_acquisition/letsmesh_handler.py b/repeater/data_acquisition/letsmesh_handler.py index 95b3fd2..da67b9c 100644 --- a/repeater/data_acquisition/letsmesh_handler.py +++ b/repeater/data_acquisition/letsmesh_handler.py @@ -5,7 +5,6 @@ import base64 import paho.mqtt.client as mqtt from datetime import datetime, timedelta, UTC -from dataclasses import dataclass, asdict from nacl.signing import SigningKey from typing import Callable, Optional from .. import __version__ @@ -275,83 +274,3 @@ class MeshCoreToMqttJwtPusher: logging.debug(f"Published to {topic}: {message}") return result - -# ==================================================================== -# LetsMesh Packet Data Class -# ==================================================================== - - -@dataclass -class LetsMeshPacket: - """ - Data class for LetsMesh packet format. - Converts internal packet_record format to LetsMesh publish format. - """ - - origin: str - origin_id: str - timestamp: str - type: str - direction: str - time: str - date: str - len: str - packet_type: str - route: str - payload_len: str - raw: str - SNR: str - RSSI: str - score: str - duration: str - hash: str - - @classmethod - def from_packet_record( - cls, packet_record: dict, origin: str, origin_id: str - ) -> Optional["LetsMeshPacket"]: - """ - Create LetsMeshPacket from internal packet_record format. - - Args: - packet_record: Internal packet record dictionary - origin: Node name - origin_id: Public key of the node - - Returns: - LetsMeshPacket instance or None if raw_packet is missing - """ - if "raw_packet" not in packet_record or not packet_record["raw_packet"]: - return None - - # Extract timestamp and format date/time - timestamp = packet_record.get("timestamp", 0) - dt = datetime.fromtimestamp(timestamp) - - # Format route type (1=Flood->F, 2=Direct->D, etc) - route_map = {1: "F", 2: "D"} - route = route_map.get(packet_record.get("route", 0), str(packet_record.get("route", 0))) - - return cls( - origin=origin, - origin_id=origin_id, - timestamp=dt.isoformat(), - type="PACKET", - direction="rx", - time=dt.strftime("%H:%M:%S"), - date=dt.strftime("%-d/%-m/%Y"), - len=str(len(packet_record["raw_packet"]) // 2), - packet_type=str(packet_record.get("type", 0)), - route=route, - payload_len=str(packet_record.get("payload_length", 0)), - raw=packet_record["raw_packet"], - SNR=str(packet_record.get("snr", 0)), - RSSI=str(packet_record.get("rssi", 0)), - score=str(int(packet_record.get("score", 0) * 1000)), - duration="0", - hash=packet_record.get("packet_hash", ""), - ) - - def to_dict(self) -> dict: - """Convert to dictionary for JSON serialization""" - return asdict(self) diff --git a/repeater/data_acquisition/mqtt_handler.py b/repeater/data_acquisition/mqtt_handler.py index 4a69a89..27fc5f3 100644 --- a/repeater/data_acquisition/mqtt_handler.py +++ b/repeater/data_acquisition/mqtt_handler.py @@ -1,6 +1,6 @@ import json import logging -from typing import Dict, Any +from typing import Dict, Any, Optional try: import paho.mqtt.client as mqtt @@ -8,13 +8,16 @@ try: except ImportError: MQTT_AVAILABLE = False +from .storage_utils import PacketRecord + logger = logging.getLogger("MQTTHandler") class MQTTHandler: - def __init__(self, mqtt_config: dict, node_name: str = "unknown"): + def __init__(self, mqtt_config: dict, node_name: str = "unknown", node_id: str = "unknown"): self.mqtt_config = mqtt_config self.node_name = node_name + self.node_id = node_id self.client = None self.available = MQTT_AVAILABLE self._init_client() @@ -45,15 +48,39 @@ class MQTTHandler: self.client = None def publish(self, record: dict, record_type: str): + """ + Publish record to MQTT. + Packets MUST use PacketRecord format. Non-packet records use original format. + + Args: + record: The record dictionary to publish + record_type: Type of record (packet, advert, noise_floor, etc.) + """ if not self.client: return try: base_topic = self.mqtt_config.get("base_topic", "meshcore/repeater") topic = f"{base_topic}/{self.node_name}/{record_type}" - payload = {k: v for k, v in record.items() if v is not None} + + if record_type == "packet": + packet_record = PacketRecord.from_packet_record( + record, + origin=self.node_name, + origin_id=self.node_id + ) + if not packet_record: + logger.debug("Skipping MQTT publish: packet missing required data for PacketRecord") + return + + payload = packet_record.to_dict() + logger.debug("Publishing packet using PacketRecord format") + else: + payload = {k: v for k, v in record.items() if v is not None} + message = json.dumps(payload, default=str) self.client.publish(topic, message, qos=0, retain=False) + logger.debug(f"Published to {topic}") except Exception as e: logger.error(f"Failed to publish to MQTT: {e}") diff --git a/repeater/data_acquisition/storage_collector.py b/repeater/data_acquisition/storage_collector.py index d333c6f..529865f 100644 --- a/repeater/data_acquisition/storage_collector.py +++ b/repeater/data_acquisition/storage_collector.py @@ -8,7 +8,8 @@ from typing import Optional, Dict, Any from .sqlite_handler import SQLiteHandler from .rrdtool_handler import RRDToolHandler from .mqtt_handler import MQTTHandler -from .letsmesh_handler import MeshCoreToMqttJwtPusher, LetsMeshPacket +from .letsmesh_handler import MeshCoreToMqttJwtPusher +from .storage_utils import PacketRecord logger = logging.getLogger("StorageCollector") @@ -22,10 +23,11 @@ class StorageCollector: self.storage_dir.mkdir(parents=True, exist_ok=True) node_name = config.get("repeater", {}).get("node_name", "unknown") + node_id = local_identity.get_public_key().hex() if local_identity else "unknown" self.sqlite_handler = SQLiteHandler(self.storage_dir) self.rrd_handler = RRDToolHandler(self.storage_dir) - self.mqtt_handler = MQTTHandler(config.get("mqtt", {}), node_name) + self.mqtt_handler = MQTTHandler(config.get("mqtt", {}), node_name, node_id) # Initialize LetsMesh handler if configured self.letsmesh_handler = None @@ -107,12 +109,12 @@ class StorageCollector: return node_name = self.config.get("repeater", {}).get("node_name", "Unknown") - letsmesh_packet = LetsMeshPacket.from_packet_record( + packet = PacketRecord.from_packet_record( packet_record, origin=node_name, origin_id=self.letsmesh_handler.public_key ) - if letsmesh_packet: - self.letsmesh_handler.publish_packet(letsmesh_packet.to_dict()) + if packet: + self.letsmesh_handler.publish_packet(packet.to_dict()) logger.debug(f"Published packet type 0x{packet_type:02X} to LetsMesh") else: logger.debug("Skipped LetsMesh publish: packet missing raw_packet data") diff --git a/repeater/data_acquisition/storage_utils.py b/repeater/data_acquisition/storage_utils.py new file mode 100644 index 0000000..bd930a5 --- /dev/null +++ b/repeater/data_acquisition/storage_utils.py @@ -0,0 +1,82 @@ +"""Storage utility classes and functions for data acquisition.""" + +from dataclasses import dataclass, asdict +from datetime import datetime +from typing import Optional + + +@dataclass +class PacketRecord: + """ + Data class for packet record format. + Converts internal packet_record format to standardized publish format. + Reusable across MQTT, LetsMesh, and other handlers. + """ + + origin: str + origin_id: str + timestamp: str + type: str + direction: str + time: str + date: str + len: str + packet_type: str + route: str + payload_len: str + raw: str + SNR: str + RSSI: str + score: str + duration: str + hash: str + + @classmethod + def from_packet_record( + cls, packet_record: dict, origin: str, origin_id: str + ) -> Optional["PacketRecord"]: + """ + Create PacketRecord from internal packet_record format. + + Args: + packet_record: Internal packet record dictionary + origin: Node name + origin_id: Public key of the node + + Returns: + PacketRecord instance or None if raw_packet is missing + """ + if "raw_packet" not in packet_record or not packet_record["raw_packet"]: + return None + + # Extract timestamp and format date/time + timestamp = packet_record.get("timestamp", 0) + dt = datetime.fromtimestamp(timestamp) + + # Format route type (1=Flood->F, 2=Direct->D, etc) + route_map = {1: "F", 2: "D"} + route = route_map.get(packet_record.get("route", 0), str(packet_record.get("route", 0))) + + return cls( + origin=origin, + origin_id=origin_id, + timestamp=dt.isoformat(), + type="PACKET", + direction="rx", + time=dt.strftime("%H:%M:%S"), + date=dt.strftime("%-d/%-m/%Y"), + len=str(len(packet_record["raw_packet"]) // 2), + packet_type=str(packet_record.get("type", 0)), + route=route, + payload_len=str(packet_record.get("payload_length", 0)), + raw=packet_record["raw_packet"], + SNR=str(packet_record.get("snr", 0)), + RSSI=str(packet_record.get("rssi", 0)), + score=str(int(packet_record.get("score", 0) * 1000)), + duration="0", + hash=packet_record.get("packet_hash", ""), + ) + + def to_dict(self) -> dict: + """Convert to dictionary for JSON serialization""" + return asdict(self)