clean up code formatting and improve readability in letsmesh_handler and storage_collector

This commit is contained in:
Lloyd
2025-11-20 10:11:06 +00:00
parent f0ccf161c9
commit 08d4e054b0
3 changed files with 168 additions and 159 deletions

View File

@@ -10,8 +10,9 @@ from nacl.signing import SigningKey
from typing import Callable, Optional
from .. import __version__
# --------------------------------------------------------------------
# Helper: Base64URL without padding (required by MeshCore broker)
# Helper: Base64URL without padding
# --------------------------------------------------------------------
def b64url(x: bytes) -> str:
    """Encode *x* as base64url without trailing '=' padding.

    Args:
        x: Raw bytes to encode.

    Returns:
        The unpadded base64url string (JWS-style encoding).
    """
    encoded = base64.urlsafe_b64encode(x)
    # Strip the '=' padding after decoding; base64 output is ASCII-safe.
    return encoded.decode("ascii").rstrip("=")
@@ -25,24 +26,23 @@ LETSMESH_BROKERS = [
"name": "Europe (LetsMesh v1)",
"host": "mqtt-eu-v1.letsmesh.net",
"port": 443,
"audience": "mqtt-eu-v1.letsmesh.net"
"audience": "mqtt-eu-v1.letsmesh.net",
},
{
"name": "US West (LetsMesh v1)",
"host": "mqtt-us-v1.letsmesh.net",
"port": 443,
"audience": "mqtt-us-v1.letsmesh.net"
"audience": "mqtt-us-v1.letsmesh.net",
},
{
"name": "Europe (LetsMesh v1)",
"host": "mqtt-eu-v1.letsmesh.net",
"port": 443,
"audience": "mqtt-eu-v1.letsmesh.net"
}
"audience": "mqtt-eu-v1.letsmesh.net",
},
]
# ====================================================================
# MeshCore → MQTT Publisher with Ed25519 auth token
# ====================================================================
@@ -64,8 +64,9 @@ class MeshCoreToMqttJwtPusher:
):
# Extract values from config
from ..config import get_node_info
node_info = get_node_info(config)
iata_code = node_info["iata_code"]
broker_index = node_info["broker_index"]
status_interval = node_info["status_interval"]
@@ -90,10 +91,7 @@ class MeshCoreToMqttJwtPusher:
self._running = False
# MQTT WebSocket client
self.client = mqtt.Client(
client_id=f"meshcore_{self.public_key}",
transport="websockets"
)
self.client = mqtt.Client(client_id=f"meshcore_{self.public_key}", transport="websockets")
self.client.on_connect = self._on_connect
self.client.on_disconnect = self._on_disconnect
@@ -103,34 +101,30 @@ class MeshCoreToMqttJwtPusher:
def _generate_jwt(self) -> str:
now = datetime.now(UTC)
header = {
"alg": "Ed25519",
"typ": "JWT"
}
header = {"alg": "Ed25519", "typ": "JWT"}
payload = {
"publicKey": self.public_key,
"aud": self.broker["audience"],
"iat": int(now.timestamp()),
"exp": int((now + timedelta(minutes=self.jwt_expiry_minutes)).timestamp())
"exp": int((now + timedelta(minutes=self.jwt_expiry_minutes)).timestamp()),
}
# Encode header and payload (compact JSON - no spaces)
header_b64 = b64url(json.dumps(header, separators=(',', ':')).encode())
payload_b64 = b64url(json.dumps(payload, separators=(',', ':')).encode())
header_b64 = b64url(json.dumps(header, separators=(",", ":")).encode())
payload_b64 = b64url(json.dumps(payload, separators=(",", ":")).encode())
signing_input = f"{header_b64}.{payload_b64}".encode()
seed32 = binascii.unhexlify(self.private_key_hex)
signer = SigningKey(seed32)
# Verify the public key matches what we expect
derived_public = binascii.hexlify(bytes(signer.verify_key)).decode()
if derived_public.upper() != self.public_key.upper():
raise ValueError(
f"Public key mismatch! "
f"Derived: {derived_public}, Expected: {self.public_key}"
f"Public key mismatch! " f"Derived: {derived_public}, Expected: {self.public_key}"
)
# Sign the message
signature = signer.sign(signing_input).signature
signature_hex = binascii.hexlify(signature).decode()
@@ -148,9 +142,7 @@ class MeshCoreToMqttJwtPusher:
self._running = True
# Publish initial status on connect
self.publish_status(
state="online",
origin=self.node_name,
radio_config=self.radio_config
state="online", origin=self.node_name, radio_config=self.radio_config
)
else:
logging.error(f"Failed with code {rc}")
@@ -168,7 +160,7 @@ class MeshCoreToMqttJwtPusher:
username = f"v1_{self.public_key}"
self.client.username_pw_set(username=username, password=token)
# Conditional TLS setup
if self.use_tls:
self.client.tls_set()
@@ -184,10 +176,11 @@ class MeshCoreToMqttJwtPusher:
# Must use raw hostname without wss://
self.client.connect(self.broker["host"], self.broker["port"], keepalive=60)
self.client.loop_start()
# Start status heartbeat if interval is set
if self.status_interval > 0:
import threading
self._status_task = threading.Thread(target=self._status_heartbeat_loop, daemon=True)
self._status_task.start()
logging.info(f"Started status heartbeat (interval: {self.status_interval}s)")
@@ -195,27 +188,23 @@ class MeshCoreToMqttJwtPusher:
def disconnect(self):
self._running = False
# Publish offline status before disconnecting
self.publish_status(
state="offline",
origin=self.node_name,
radio_config=self.radio_config
)
self.publish_status(state="offline", origin=self.node_name, radio_config=self.radio_config)
import time
time.sleep(0.5) # Give time for the message to be sent
self.client.loop_stop()
self.client.disconnect()
logging.info("Disconnected")
def _status_heartbeat_loop(self):
"""Background thread that publishes periodic status updates"""
import time
while self._running:
try:
self.publish_status(
state="online",
origin=self.node_name,
radio_config=self.radio_config
state="online", origin=self.node_name, radio_config=self.radio_config
)
time.sleep(self.status_interval)
except Exception as e:
@@ -226,11 +215,7 @@ class MeshCoreToMqttJwtPusher:
# Packet helpers
# ----------------------------------------------------------------
def _process_packet(self, pkt: dict) -> dict:
return {
"timestamp": datetime.now(UTC).isoformat(),
"origin_id": self.public_key,
**pkt
}
return {"timestamp": datetime.now(UTC).isoformat(), "origin_id": self.public_key, **pkt}
def _topic(self, subtopic: str) -> str:
    """Build the full MQTT topic for *subtopic* under this node's namespace.

    Topic layout: meshcore/<iata_code>/<public_key>/<subtopic>.
    """
    segments = ["meshcore", self.iata_code, self.public_key, subtopic]
    return "/".join(segments)
@@ -239,13 +224,9 @@ class MeshCoreToMqttJwtPusher:
return self.publish(subtopic, self._process_packet(pkt), retain)
def publish_raw_data(self, raw_hex: str, subtopic="raw", retain=False):
pkt = {
"type": "raw",
"data": raw_hex,
"bytes": len(raw_hex) // 2
}
pkt = {"type": "raw", "data": raw_hex, "bytes": len(raw_hex) // 2}
return self.publish_packet(pkt, subtopic, retain)
def publish_status(
self,
state: str = "online",
@@ -256,7 +237,7 @@ class MeshCoreToMqttJwtPusher:
):
"""
Publish device status/heartbeat message
Args:
state: Device state (online/offline)
location: Optional dict with latitude/longitude
@@ -268,12 +249,8 @@ class MeshCoreToMqttJwtPusher:
if self.stats_provider:
live_stats = self.stats_provider()
else:
live_stats = {
"uptime_secs": 0,
"packets_sent": 0,
"packets_received": 0
}
live_stats = {"uptime_secs": 0, "packets_sent": 0, "packets_received": 0}
status = {
"status": state,
"timestamp": datetime.now(UTC).isoformat(),
@@ -283,17 +260,12 @@ class MeshCoreToMqttJwtPusher:
"firmware_version": self.app_version,
"radio": radio_config or self.radio_config,
"client_version": f"pyMC_repeater/{self.app_version}",
"stats": {
**live_stats,
"errors": 0,
"queue_len": 0,
**(extra_stats or {})
}
"stats": {**live_stats, "errors": 0, "queue_len": 0, **(extra_stats or {})},
}
if location:
status["location"] = location
return self.publish("status", status, retain=False)
def publish(self, subtopic: str, payload: dict, retain: bool = False):
@@ -308,12 +280,14 @@ class MeshCoreToMqttJwtPusher:
# LetsMesh Packet Data Class
# ====================================================================
@dataclass
class LetsMeshPacket:
"""
Data class for LetsMesh packet format.
Converts internal packet_record format to LetsMesh publish format.
"""
origin: str
origin_id: str
timestamp: str
@@ -333,29 +307,31 @@ class LetsMeshPacket:
hash: str
@classmethod
def from_packet_record(cls, packet_record: dict, origin: str, origin_id: str) -> Optional['LetsMeshPacket']:
def from_packet_record(
cls, packet_record: dict, origin: str, origin_id: str
) -> Optional["LetsMeshPacket"]:
"""
Create LetsMeshPacket from internal packet_record format.
Args:
packet_record: Internal packet record dictionary
origin: Node name
origin_id: Public key of the node
Returns:
LetsMeshPacket instance or None if raw_packet is missing
"""
if "raw_packet" not in packet_record or not packet_record["raw_packet"]:
return None
# Extract timestamp and format date/time
timestamp = packet_record.get("timestamp", 0)
dt = datetime.fromtimestamp(timestamp)
# Format route type (1=Flood->F, 2=Direct->D, etc)
route_map = {1: "F", 2: "D"}
route = route_map.get(packet_record.get("route", 0), str(packet_record.get("route", 0)))
return cls(
origin=origin,
origin_id=origin_id,
@@ -373,9 +349,9 @@ class LetsMeshPacket:
RSSI=str(packet_record.get("rssi", 0)),
score=str(int(packet_record.get("score", 0) * 1000)),
duration="0",
hash=packet_record.get("packet_hash", "")
hash=packet_record.get("packet_hash", ""),
)
def to_dict(self) -> dict:
    """Return this packet as a plain dict suitable for JSON serialization."""
    # dataclasses.asdict recursively converts all fields to builtins.
    as_mapping = asdict(self)
    return as_mapping

View File

@@ -20,13 +20,13 @@ class StorageCollector:
self.repeater_handler = repeater_handler
self.storage_dir = Path(config.get("storage_dir", "/var/lib/pymc_repeater"))
self.storage_dir.mkdir(parents=True, exist_ok=True)
node_name = config.get("repeater", {}).get("node_name", "unknown")
self.sqlite_handler = SQLiteHandler(self.storage_dir)
self.rrd_handler = RRDToolHandler(self.storage_dir)
self.mqtt_handler = MQTTHandler(config.get("mqtt", {}), node_name)
# Initialize LetsMesh handler if configured
self.letsmesh_handler = None
if config.get("letsmesh", {}).get("enabled", False) and local_identity:
@@ -34,21 +34,24 @@ class StorageCollector:
# Get keys from local_identity (signing_key.encode() is the private key seed)
private_key_hex = local_identity.signing_key.encode().hex()
public_key_hex = local_identity.get_public_key().hex()
self.letsmesh_handler = MeshCoreToMqttJwtPusher(
private_key=private_key_hex,
public_key=public_key_hex,
config=config,
stats_provider=self._get_live_stats
stats_provider=self._get_live_stats,
)
self.letsmesh_handler.connect()
# Get disallowed packet types from config
from ..config import get_node_info
node_info = get_node_info(config)
self.disallowed_packet_types = set(node_info["disallowed_packet_types"])
logger.info(f"LetsMesh handler initialized with public key: {public_key_hex[:16]}...")
logger.info(
f"LetsMesh handler initialized with public key: {public_key_hex[:16]}..."
)
if self.disallowed_packet_types:
logger.info(f"Disallowed packet types: {sorted(self.disallowed_packet_types)}")
else:
@@ -63,65 +66,66 @@ class StorageCollector:
def _get_live_stats(self) -> dict:
"""Get live stats from RepeaterHandler"""
if not self.repeater_handler:
return {
"uptime_secs": 0,
"packets_sent": 0,
"packets_received": 0
}
return {"uptime_secs": 0, "packets_sent": 0, "packets_received": 0}
uptime_secs = int(time.time() - self.repeater_handler.start_time)
return {
"uptime_secs": uptime_secs,
"packets_sent": self.repeater_handler.forwarded_count,
"packets_received": self.repeater_handler.rx_count
"packets_received": self.repeater_handler.rx_count,
}
def record_packet(self, packet_record: dict):
logger.debug(f"Recording packet: type={packet_record.get('type')}, transmitted={packet_record.get('transmitted')}")
"""Record packet to storage and publish to MQTT/LetsMesh"""
logger.debug(
f"Recording packet: type={packet_record.get('type')}, "
f"transmitted={packet_record.get('transmitted')}"
)
# Store to local databases and publish to local MQTT
self.sqlite_handler.store_packet(packet_record)
cumulative_counts = self.sqlite_handler.get_cumulative_counts()
self.rrd_handler.update_packet_metrics(packet_record, cumulative_counts)
self.mqtt_handler.publish(packet_record, "packet")
# Publish to LetsMesh if enabled
if self.letsmesh_handler:
try:
# Check if packet has type field
if "type" not in packet_record:
logger.error("Cannot publish to LetsMesh: packet_record missing 'type' field")
return
packet_type = packet_record["type"]
if packet_type in self.disallowed_packet_types:
logger.debug(f"Skipped publishing packet type 0x{packet_type:02X} (in disallowed types)")
return
# Create LetsMesh packet from record
node_name = self.config.get("repeater", {}).get("node_name", "Unknown")
letsmesh_packet = LetsMeshPacket.from_packet_record(
packet_record,
origin=node_name,
origin_id=self.letsmesh_handler.public_key
)
if letsmesh_packet:
self.letsmesh_handler.publish_packet(letsmesh_packet.to_dict())
except Exception as e:
logger.error(f"Failed to publish packet to LetsMesh: {e}")
# Publish to LetsMesh if enabled
self._publish_to_letsmesh(packet_record)
def _publish_to_letsmesh(self, packet_record: dict):
    """Forward a recorded packet to the LetsMesh broker.

    No-op when the LetsMesh handler is not configured. Packets without a
    'type' field are rejected; types listed in the disallowed set are
    silently skipped (debug log only). Any publish failure is logged and
    swallowed so local storage is never affected.
    """
    handler = self.letsmesh_handler
    if not handler:
        return

    try:
        packet_type = packet_record.get("type")
        if packet_type is None:
            logger.error("Cannot publish to LetsMesh: packet_record missing 'type' field")
            return
        if packet_type in self.disallowed_packet_types:
            logger.debug(f"Skipped publishing packet type 0x{packet_type:02X} (disallowed)")
            return

        # Origin name comes from config; origin_id is this node's public key.
        repeater_cfg = self.config.get("repeater", {})
        node_name = repeater_cfg.get("node_name", "Unknown")
        letsmesh_packet = LetsMeshPacket.from_packet_record(
            packet_record,
            origin=node_name,
            origin_id=handler.public_key,
        )

        # from_packet_record returns None when raw_packet data is absent.
        if not letsmesh_packet:
            logger.debug("Skipped LetsMesh publish: packet missing raw_packet data")
            return

        handler.publish_packet(letsmesh_packet.to_dict())
        logger.debug(f"Published packet type 0x{packet_type:02X} to LetsMesh")
    except Exception as e:
        logger.error(f"Failed to publish packet to LetsMesh: {e}", exc_info=True)
def record_advert(self, advert_record: dict):
    """Persist an advert record to SQLite, then mirror it to local MQTT."""
    # Order matters: store locally first so the record survives a publish failure.
    self.sqlite_handler.store_advert(advert_record)
    self.mqtt_handler.publish(advert_record, "advert")
def record_noise_floor(self, noise_floor_dbm: float):
noise_record = {
"timestamp": time.time(),
"noise_floor_dbm": noise_floor_dbm
}
noise_record = {"timestamp": time.time(), "noise_floor_dbm": noise_floor_dbm}
self.sqlite_handler.store_noise_floor(noise_record)
self.mqtt_handler.publish(noise_record, "noise_floor")
@@ -131,12 +135,14 @@ class StorageCollector:
def get_recent_packets(self, limit: int = 100) -> list:
    """Return up to *limit* most recent packet records from SQLite storage."""
    recent = self.sqlite_handler.get_recent_packets(limit)
    return recent
def get_filtered_packets(self,
packet_type: Optional[int] = None,
route: Optional[int] = None,
start_timestamp: Optional[float] = None,
end_timestamp: Optional[float] = None,
limit: int = 1000) -> list:
def get_filtered_packets(
self,
packet_type: Optional[int] = None,
route: Optional[int] = None,
start_timestamp: Optional[float] = None,
end_timestamp: Optional[float] = None,
limit: int = 1000,
) -> list:
return self.sqlite_handler.get_filtered_packets(
packet_type, route, start_timestamp, end_timestamp, limit
)
@@ -144,15 +150,19 @@ class StorageCollector:
def get_packet_by_hash(self, packet_hash: str) -> Optional[dict]:
    """Look up a single stored packet by its hash; None when not found."""
    record = self.sqlite_handler.get_packet_by_hash(packet_hash)
    return record
def get_rrd_data(self, start_time: Optional[int] = None, end_time: Optional[int] = None,
resolution: str = "average") -> Optional[dict]:
def get_rrd_data(
self,
start_time: Optional[int] = None,
end_time: Optional[int] = None,
resolution: str = "average",
) -> Optional[dict]:
return self.rrd_handler.get_data(start_time, end_time, resolution)
def get_packet_type_stats(self, hours: int = 24) -> dict:
    """Per-packet-type counts for the last *hours*.

    Prefers the RRD backend; falls back to SQLite (with a warning) when
    RRD returns nothing.
    """
    from_rrd = self.rrd_handler.get_packet_type_stats(hours)
    if not from_rrd:
        logger.warning("Falling back to SQLite for packet type stats")
        return self.sqlite_handler.get_packet_type_stats(hours)
    return from_rrd
@@ -180,8 +190,17 @@ class StorageCollector:
except Exception as e:
logger.error(f"Error disconnecting LetsMesh handler: {e}")
def create_transport_key(self, name: str, flood_policy: str, transport_key: Optional[str] = None, parent_id: Optional[int] = None, last_used: Optional[float] = None) -> Optional[int]:
return self.sqlite_handler.create_transport_key(name, flood_policy, transport_key, parent_id, last_used)
def create_transport_key(
self,
name: str,
flood_policy: str,
transport_key: Optional[str] = None,
parent_id: Optional[int] = None,
last_used: Optional[float] = None,
) -> Optional[int]:
return self.sqlite_handler.create_transport_key(
name, flood_policy, transport_key, parent_id, last_used
)
def get_transport_keys(self) -> list:
    """Return all transport keys stored in SQLite."""
    keys = self.sqlite_handler.get_transport_keys()
    return keys
@@ -189,8 +208,18 @@ class StorageCollector:
def get_transport_key_by_id(self, key_id: int) -> Optional[dict]:
    """Fetch a single transport key by its id; None when it does not exist."""
    key = self.sqlite_handler.get_transport_key_by_id(key_id)
    return key
def update_transport_key(self, key_id: int, name: Optional[str] = None, flood_policy: Optional[str] = None, transport_key: Optional[str] = None, parent_id: Optional[int] = None, last_used: Optional[float] = None) -> bool:
return self.sqlite_handler.update_transport_key(key_id, name, flood_policy, transport_key, parent_id, last_used)
def update_transport_key(
self,
key_id: int,
name: Optional[str] = None,
flood_policy: Optional[str] = None,
transport_key: Optional[str] = None,
parent_id: Optional[int] = None,
last_used: Optional[float] = None,
) -> bool:
return self.sqlite_handler.update_transport_key(
key_id, name, flood_policy, transport_key, parent_id, last_used
)
def delete_transport_key(self, key_id: int) -> bool:
    """Delete the transport key with *key_id*.

    Returns:
        True when the key was deleted, False otherwise (delegated to SQLite).
    """
    # Fix: the original had the return statement duplicated; the second
    # copy was unreachable dead code (likely a merge artifact).
    return self.sqlite_handler.delete_transport_key(key_id)

View File

@@ -76,7 +76,7 @@ class RepeaterHandler(BaseHandler):
# Storage collector for persistent packet logging
try:
local_identity = dispatcher.local_identity if dispatcher else None
self.storage = StorageCollector(config, local_identity, repeater_handler=self)
logger.info("StorageCollector initialized successfully")
@@ -86,7 +86,7 @@ class RepeaterHandler(BaseHandler):
# Initialize background timer tracking
self.last_noise_measurement = time.time()
self.noise_floor_interval = NOISE_FLOOR_INTERVAL # 30 seconds
self.noise_floor_interval = NOISE_FLOOR_INTERVAL # 30 seconds
self._background_task = None
self._start_background_tasks()
@@ -208,9 +208,17 @@ class RepeaterHandler(BaseHandler):
# Record packet for charts
packet_record = {
"timestamp": time.time(),
"header": f"0x{packet.header:02X}" if hasattr(packet, "header") and packet.header is not None else None,
"payload": packet.payload.hex() if hasattr(packet, "payload") and packet.payload else None,
"payload_length": len(packet.payload) if hasattr(packet, "payload") and packet.payload else 0,
"header": (
f"0x{packet.header:02X}"
if hasattr(packet, "header") and packet.header is not None
else None
),
"payload": (
packet.payload.hex() if hasattr(packet, "payload") and packet.payload else None
),
"payload_length": (
len(packet.payload) if hasattr(packet, "payload") and packet.payload else 0
),
"type": payload_type,
"route": route_type,
"length": len(packet.payload or b""),
@@ -241,8 +249,6 @@ class RepeaterHandler(BaseHandler):
except Exception as e:
logger.error(f"Failed to store packet record: {e}")
# If this is a duplicate, try to attach it to the original packet
if is_dupe and len(self.recent_packets) > 0:
# Find the original packet with same hash
@@ -268,20 +274,20 @@ class RepeaterHandler(BaseHandler):
def log_trace_record(self, packet_record: dict) -> None:
"""Manually log a packet trace record (used by external callers)"""
self.recent_packets.append(packet_record)
self.rx_count += 1
if packet_record.get("transmitted", False):
self.forwarded_count += 1
else:
self.dropped_count += 1
# Store to persistent storage (same as __call__ does)
if self.storage:
try:
self.storage.record_packet(packet_record)
except Exception as e:
logger.error(f"Failed to store packet record: {e}")
if len(self.recent_packets) > self.max_recent_packets:
self.recent_packets.pop(0)
@@ -292,8 +298,6 @@ class RepeaterHandler(BaseHandler):
for k in expired:
del self.seen_packets[k]
def _get_drop_reason(self, packet: Packet) -> str:
if self.is_duplicate(packet):
@@ -359,7 +363,7 @@ class RepeaterHandler(BaseHandler):
longitude = appdata_decoded.get("longitude")
current_time = time.time()
# Check if this is a new neighbor
current_neighbors = self.storage.get_neighbors() if self.storage else {}
is_new_neighbor = pubkey not in current_neighbors
@@ -451,9 +455,7 @@ class RepeaterHandler(BaseHandler):
next_hop = packet.path[0]
if next_hop != self.local_hash:
logger.debug(
f"Direct: not our hop (next={next_hop:02X}, local={self.local_hash:02X})"
)
logger.debug(f"Direct: not our hop (next={next_hop:02X}, local={self.local_hash:02X})")
return None
original_path = list(packet.path)
@@ -583,7 +585,7 @@ class RepeaterHandler(BaseHandler):
def get_noise_floor(self) -> Optional[float]:
try:
radio = self.dispatcher.radio if self.dispatcher else None
if radio and hasattr(radio, 'get_noise_floor'):
if radio and hasattr(radio, "get_noise_floor"):
return radio.get_noise_floor()
return None
except Exception as e:
@@ -670,22 +672,22 @@ class RepeaterHandler(BaseHandler):
try:
while True:
current_time = time.time()
# Check noise floor recording (every 30 seconds)
if current_time - self.last_noise_measurement >= self.noise_floor_interval:
await self._record_noise_floor_async()
self.last_noise_measurement = current_time
# Check advert sending (every N hours)
if self.send_advert_interval_hours > 0 and self.send_advert_func:
interval_seconds = self.send_advert_interval_hours * 3600
if current_time - self.last_advert_time >= interval_seconds:
await self._send_periodic_advert_async()
self.last_advert_time = current_time
# Sleep for 5 seconds before next check
await asyncio.sleep(5.0)
except asyncio.CancelledError:
logger.info("Background timer loop cancelled")
raise
@@ -698,7 +700,7 @@ class RepeaterHandler(BaseHandler):
async def _record_noise_floor_async(self):
if not self.storage:
return
try:
noise_floor = self.get_noise_floor()
if noise_floor is not None:
@@ -710,7 +712,9 @@ class RepeaterHandler(BaseHandler):
logger.error(f"Error recording noise floor: {e}")
async def _send_periodic_advert_async(self):
logger.info(f"Periodic advert timer triggered (interval: {self.send_advert_interval_hours}h)")
logger.info(
f"Periodic advert timer triggered (interval: {self.send_advert_interval_hours}h)"
)
try:
if self.send_advert_func:
success = await self.send_advert_func()
@@ -727,7 +731,7 @@ class RepeaterHandler(BaseHandler):
if self._background_task and not self._background_task.done():
self._background_task.cancel()
logger.info("Background timer task cancelled")
if self.storage:
try:
self.storage.close()
@@ -739,4 +743,4 @@ class RepeaterHandler(BaseHandler):
try:
self.cleanup()
except Exception:
pass
pass