diff --git a/repeater/data_acquisition/letsmesh_handler.py b/repeater/data_acquisition/letsmesh_handler.py index 73c8aa4..95b3fd2 100644 --- a/repeater/data_acquisition/letsmesh_handler.py +++ b/repeater/data_acquisition/letsmesh_handler.py @@ -10,8 +10,9 @@ from nacl.signing import SigningKey from typing import Callable, Optional from .. import __version__ + # -------------------------------------------------------------------- -# Helper: Base64URL without padding (required by MeshCore broker) +# Helper: Base64URL without padding # -------------------------------------------------------------------- def b64url(x: bytes) -> str: return base64.urlsafe_b64encode(x).rstrip(b"=").decode() @@ -25,24 +26,23 @@ LETSMESH_BROKERS = [ "name": "Europe (LetsMesh v1)", "host": "mqtt-eu-v1.letsmesh.net", "port": 443, - "audience": "mqtt-eu-v1.letsmesh.net" + "audience": "mqtt-eu-v1.letsmesh.net", }, { "name": "US West (LetsMesh v1)", "host": "mqtt-us-v1.letsmesh.net", "port": 443, - "audience": "mqtt-us-v1.letsmesh.net" + "audience": "mqtt-us-v1.letsmesh.net", }, { "name": "Europe (LetsMesh v1)", "host": "mqtt-eu-v1.letsmesh.net", "port": 443, - "audience": "mqtt-eu-v1.letsmesh.net" - } + "audience": "mqtt-eu-v1.letsmesh.net", + }, ] - # ==================================================================== # MeshCore → MQTT Publisher with Ed25519 auth token # ==================================================================== @@ -64,8 +64,9 @@ class MeshCoreToMqttJwtPusher: ): # Extract values from config from ..config import get_node_info + node_info = get_node_info(config) - + iata_code = node_info["iata_code"] broker_index = node_info["broker_index"] status_interval = node_info["status_interval"] @@ -90,10 +91,7 @@ class MeshCoreToMqttJwtPusher: self._running = False # MQTT WebSocket client - self.client = mqtt.Client( - client_id=f"meshcore_{self.public_key}", - transport="websockets" - ) + self.client = mqtt.Client(client_id=f"meshcore_{self.public_key}", transport="websockets") self.client.on_connect = self._on_connect self.client.on_disconnect = self._on_disconnect @@ -103,34 +101,30 @@ class MeshCoreToMqttJwtPusher: def _generate_jwt(self) -> str: now = datetime.now(UTC) - header = { - "alg": "Ed25519", - "typ": "JWT" - } + header = {"alg": "Ed25519", "typ": "JWT"} payload = { "publicKey": self.public_key, "aud": self.broker["audience"], "iat": int(now.timestamp()), - "exp": int((now + timedelta(minutes=self.jwt_expiry_minutes)).timestamp()) + "exp": int((now + timedelta(minutes=self.jwt_expiry_minutes)).timestamp()), } # Encode header and payload (compact JSON - no spaces) - header_b64 = b64url(json.dumps(header, separators=(',', ':')).encode()) - payload_b64 = b64url(json.dumps(payload, separators=(',', ':')).encode()) + header_b64 = b64url(json.dumps(header, separators=(",", ":")).encode()) + payload_b64 = b64url(json.dumps(payload, separators=(",", ":")).encode()) signing_input = f"{header_b64}.{payload_b64}".encode() seed32 = binascii.unhexlify(self.private_key_hex) signer = SigningKey(seed32) - + # Verify the public key matches what we expect derived_public = binascii.hexlify(bytes(signer.verify_key)).decode() if derived_public.upper() != self.public_key.upper(): raise ValueError( - f"Public key mismatch! " - f"Derived: {derived_public}, Expected: {self.public_key}" + f"Public key mismatch! " f"Derived: {derived_public}, Expected: {self.public_key}" ) - + # Sign the message signature = signer.sign(signing_input).signature signature_hex = binascii.hexlify(signature).decode() @@ -148,9 +142,7 @@ class MeshCoreToMqttJwtPusher: self._running = True # Publish initial status on connect self.publish_status( - state="online", - origin=self.node_name, - radio_config=self.radio_config + state="online", origin=self.node_name, radio_config=self.radio_config ) else: logging.error(f"Failed with code {rc}") @@ -168,7 +160,7 @@ class MeshCoreToMqttJwtPusher: username = f"v1_{self.public_key}" self.client.username_pw_set(username=username, password=token) - + # Conditional TLS setup if self.use_tls: self.client.tls_set() @@ -184,10 +176,11 @@ class MeshCoreToMqttJwtPusher: # Must use raw hostname without wss:// self.client.connect(self.broker["host"], self.broker["port"], keepalive=60) self.client.loop_start() - + # Start status heartbeat if interval is set if self.status_interval > 0: import threading + self._status_task = threading.Thread(target=self._status_heartbeat_loop, daemon=True) self._status_task.start() logging.info(f"Started status heartbeat (interval: {self.status_interval}s)") @@ -195,27 +188,23 @@ class MeshCoreToMqttJwtPusher: def disconnect(self): self._running = False # Publish offline status before disconnecting - self.publish_status( - state="offline", - origin=self.node_name, - radio_config=self.radio_config - ) + self.publish_status(state="offline", origin=self.node_name, radio_config=self.radio_config) import time + time.sleep(0.5) # Give time for the message to be sent - + self.client.loop_stop() self.client.disconnect() logging.info("Disconnected") - + def _status_heartbeat_loop(self): """Background thread that publishes periodic status updates""" import time + while self._running: try: self.publish_status( - state="online", - origin=self.node_name, - radio_config=self.radio_config + state="online", origin=self.node_name, radio_config=self.radio_config ) time.sleep(self.status_interval) except Exception as e: @@ -226,11 +215,7 @@ class MeshCoreToMqttJwtPusher: # Packet helpers # ---------------------------------------------------------------- def _process_packet(self, pkt: dict) -> dict: - return { - "timestamp": datetime.now(UTC).isoformat(), - "origin_id": self.public_key, - **pkt - } + return {"timestamp": datetime.now(UTC).isoformat(), "origin_id": self.public_key, **pkt} def _topic(self, subtopic: str) -> str: return f"meshcore/{self.iata_code}/{self.public_key}/{subtopic}" @@ -239,13 +224,9 @@ class MeshCoreToMqttJwtPusher: return self.publish(subtopic, self._process_packet(pkt), retain) def publish_raw_data(self, raw_hex: str, subtopic="raw", retain=False): - pkt = { - "type": "raw", - "data": raw_hex, - "bytes": len(raw_hex) // 2 - } + pkt = {"type": "raw", "data": raw_hex, "bytes": len(raw_hex) // 2} return self.publish_packet(pkt, subtopic, retain) - + def publish_status( self, state: str = "online", @@ -256,7 +237,7 @@ class MeshCoreToMqttJwtPusher: ): """ Publish device status/heartbeat message - + Args: state: Device state (online/offline) location: Optional dict with latitude/longitude @@ -268,12 +249,8 @@ class MeshCoreToMqttJwtPusher: if self.stats_provider: live_stats = self.stats_provider() else: - live_stats = { - "uptime_secs": 0, - "packets_sent": 0, - "packets_received": 0 - } - + live_stats = {"uptime_secs": 0, "packets_sent": 0, "packets_received": 0} + status = { "status": state, "timestamp": datetime.now(UTC).isoformat(), @@ -283,17 +260,12 @@ class MeshCoreToMqttJwtPusher: "firmware_version": self.app_version, "radio": radio_config or self.radio_config, "client_version": f"pyMC_repeater/{self.app_version}", - "stats": { - **live_stats, - "errors": 0, - "queue_len": 0, - **(extra_stats or {}) - } + "stats": {**live_stats, "errors": 0, "queue_len": 0, **(extra_stats or {})}, } - + if location: status["location"] = location - + return self.publish("status", status, retain=False) def publish(self, subtopic: str, payload: dict, retain: bool = False): @@ -308,12 +280,14 @@ class MeshCoreToMqttJwtPusher: # LetsMesh Packet Data Class # ==================================================================== + @dataclass class LetsMeshPacket: """ Data class for LetsMesh packet format. Converts internal packet_record format to LetsMesh publish format. """ + origin: str origin_id: str timestamp: str @@ -333,29 +307,31 @@ class LetsMeshPacket: hash: str @classmethod - def from_packet_record(cls, packet_record: dict, origin: str, origin_id: str) -> Optional['LetsMeshPacket']: + def from_packet_record( + cls, packet_record: dict, origin: str, origin_id: str + ) -> Optional["LetsMeshPacket"]: """ Create LetsMeshPacket from internal packet_record format. - + Args: packet_record: Internal packet record dictionary origin: Node name origin_id: Public key of the node - + Returns: LetsMeshPacket instance or None if raw_packet is missing """ if "raw_packet" not in packet_record or not packet_record["raw_packet"]: return None - + # Extract timestamp and format date/time timestamp = packet_record.get("timestamp", 0) dt = datetime.fromtimestamp(timestamp) - + # Format route type (1=Flood->F, 2=Direct->D, etc) route_map = {1: "F", 2: "D"} route = route_map.get(packet_record.get("route", 0), str(packet_record.get("route", 0))) - + return cls( origin=origin, origin_id=origin_id, @@ -373,9 +349,9 @@ class LetsMeshPacket: RSSI=str(packet_record.get("rssi", 0)), score=str(int(packet_record.get("score", 0) * 1000)), duration="0", - hash=packet_record.get("packet_hash", "") + hash=packet_record.get("packet_hash", ""), ) - + def to_dict(self) -> dict: """Convert to dictionary for JSON serialization""" return asdict(self) diff --git a/repeater/data_acquisition/storage_collector.py b/repeater/data_acquisition/storage_collector.py index 5294ab1..d333c6f 100644 --- a/repeater/data_acquisition/storage_collector.py +++ b/repeater/data_acquisition/storage_collector.py @@ -20,13 +20,13 @@ class StorageCollector: self.repeater_handler = repeater_handler self.storage_dir = Path(config.get("storage_dir", "/var/lib/pymc_repeater")) self.storage_dir.mkdir(parents=True, exist_ok=True) - + node_name = config.get("repeater", {}).get("node_name", "unknown") - + self.sqlite_handler = SQLiteHandler(self.storage_dir) self.rrd_handler = RRDToolHandler(self.storage_dir) self.mqtt_handler = MQTTHandler(config.get("mqtt", {}), node_name) - + # Initialize LetsMesh handler if configured self.letsmesh_handler = None if config.get("letsmesh", {}).get("enabled", False) and local_identity: @@ -34,21 +34,24 @@ class StorageCollector: # Get keys from local_identity (signing_key.encode() is the private key seed) private_key_hex = local_identity.signing_key.encode().hex() public_key_hex = local_identity.get_public_key().hex() - + self.letsmesh_handler = MeshCoreToMqttJwtPusher( private_key=private_key_hex, public_key=public_key_hex, config=config, - stats_provider=self._get_live_stats + stats_provider=self._get_live_stats, ) self.letsmesh_handler.connect() - + # Get disallowed packet types from config from ..config import get_node_info + node_info = get_node_info(config) self.disallowed_packet_types = set(node_info["disallowed_packet_types"]) - - logger.info(f"LetsMesh handler initialized with public key: {public_key_hex[:16]}...") + + logger.info( + f"LetsMesh handler initialized with public key: {public_key_hex[:16]}..." + ) if self.disallowed_packet_types: logger.info(f"Disallowed packet types: {sorted(self.disallowed_packet_types)}") else: @@ -63,65 +66,66 @@ class StorageCollector: def _get_live_stats(self) -> dict: """Get live stats from RepeaterHandler""" if not self.repeater_handler: - return { - "uptime_secs": 0, - "packets_sent": 0, - "packets_received": 0 - } - + return {"uptime_secs": 0, "packets_sent": 0, "packets_received": 0} + uptime_secs = int(time.time() - self.repeater_handler.start_time) return { "uptime_secs": uptime_secs, "packets_sent": self.repeater_handler.forwarded_count, - "packets_received": self.repeater_handler.rx_count + "packets_received": self.repeater_handler.rx_count, } def record_packet(self, packet_record: dict): - logger.debug(f"Recording packet: type={packet_record.get('type')}, transmitted={packet_record.get('transmitted')}") - + """Record packet to storage and publish to MQTT/LetsMesh""" + logger.debug( + f"Recording packet: type={packet_record.get('type')}, " + f"transmitted={packet_record.get('transmitted')}" + ) + + # Store to local databases and publish to local MQTT self.sqlite_handler.store_packet(packet_record) - cumulative_counts = self.sqlite_handler.get_cumulative_counts() self.rrd_handler.update_packet_metrics(packet_record, cumulative_counts) self.mqtt_handler.publish(packet_record, "packet") - - # Publish to LetsMesh if enabled - if self.letsmesh_handler: - try: - # Check if packet has type field - if "type" not in packet_record: - logger.error("Cannot publish to LetsMesh: packet_record missing 'type' field") - return - - packet_type = packet_record["type"] - if packet_type in self.disallowed_packet_types: - logger.debug(f"Skipped publishing packet type 0x{packet_type:02X} (in disallowed types)") - return - - # Create LetsMesh packet from record - node_name = self.config.get("repeater", {}).get("node_name", "Unknown") - letsmesh_packet = LetsMeshPacket.from_packet_record( - packet_record, - origin=node_name, - origin_id=self.letsmesh_handler.public_key - ) - - if letsmesh_packet: - self.letsmesh_handler.publish_packet(letsmesh_packet.to_dict()) - - except Exception as e: - logger.error(f"Failed to publish packet to LetsMesh: {e}") + # Publish to LetsMesh if enabled + self._publish_to_letsmesh(packet_record) + + def _publish_to_letsmesh(self, packet_record: dict): + """Publish packet to LetsMesh broker if enabled and allowed""" + if not self.letsmesh_handler: + return + + try: + packet_type = packet_record.get("type") + if packet_type is None: + logger.error("Cannot publish to LetsMesh: packet_record missing 'type' field") + return + + if packet_type in self.disallowed_packet_types: + logger.debug(f"Skipped publishing packet type 0x{packet_type:02X} (disallowed)") + return + + node_name = self.config.get("repeater", {}).get("node_name", "Unknown") + letsmesh_packet = LetsMeshPacket.from_packet_record( + packet_record, origin=node_name, origin_id=self.letsmesh_handler.public_key + ) + + if letsmesh_packet: + self.letsmesh_handler.publish_packet(letsmesh_packet.to_dict()) + logger.debug(f"Published packet type 0x{packet_type:02X} to LetsMesh") + else: + logger.debug("Skipped LetsMesh publish: packet missing raw_packet data") + + except Exception as e: + logger.error(f"Failed to publish packet to LetsMesh: {e}", exc_info=True) def record_advert(self, advert_record: dict): self.sqlite_handler.store_advert(advert_record) self.mqtt_handler.publish(advert_record, "advert") def record_noise_floor(self, noise_floor_dbm: float): - noise_record = { - "timestamp": time.time(), - "noise_floor_dbm": noise_floor_dbm - } + noise_record = {"timestamp": time.time(), "noise_floor_dbm": noise_floor_dbm} self.sqlite_handler.store_noise_floor(noise_record) self.mqtt_handler.publish(noise_record, "noise_floor") @@ -131,12 +135,14 @@ class StorageCollector: def get_recent_packets(self, limit: int = 100) -> list: return self.sqlite_handler.get_recent_packets(limit) - def get_filtered_packets(self, - packet_type: Optional[int] = None, - route: Optional[int] = None, - start_timestamp: Optional[float] = None, - end_timestamp: Optional[float] = None, - limit: int = 1000) -> list: + def get_filtered_packets( + self, + packet_type: Optional[int] = None, + route: Optional[int] = None, + start_timestamp: Optional[float] = None, + end_timestamp: Optional[float] = None, + limit: int = 1000, + ) -> list: return self.sqlite_handler.get_filtered_packets( packet_type, route, start_timestamp, end_timestamp, limit ) @@ -144,15 +150,19 @@ class StorageCollector: def get_packet_by_hash(self, packet_hash: str) -> Optional[dict]: return self.sqlite_handler.get_packet_by_hash(packet_hash) - def get_rrd_data(self, start_time: Optional[int] = None, end_time: Optional[int] = None, - resolution: str = "average") -> Optional[dict]: + def get_rrd_data( + self, + start_time: Optional[int] = None, + end_time: Optional[int] = None, + resolution: str = "average", + ) -> Optional[dict]: return self.rrd_handler.get_data(start_time, end_time, resolution) def get_packet_type_stats(self, hours: int = 24) -> dict: rrd_stats = self.rrd_handler.get_packet_type_stats(hours) if rrd_stats: return rrd_stats - + logger.warning("Falling back to SQLite for packet type stats") return self.sqlite_handler.get_packet_type_stats(hours) @@ -180,8 +190,17 @@ class StorageCollector: except Exception as e: logger.error(f"Error disconnecting LetsMesh handler: {e}") - def create_transport_key(self, name: str, flood_policy: str, transport_key: Optional[str] = None, parent_id: Optional[int] = None, last_used: Optional[float] = None) -> Optional[int]: - return self.sqlite_handler.create_transport_key(name, flood_policy, transport_key, parent_id, last_used) + def create_transport_key( + self, + name: str, + flood_policy: str, + transport_key: Optional[str] = None, + parent_id: Optional[int] = None, + last_used: Optional[float] = None, + ) -> Optional[int]: + return self.sqlite_handler.create_transport_key( + name, flood_policy, transport_key, parent_id, last_used + ) def get_transport_keys(self) -> list: return self.sqlite_handler.get_transport_keys() @@ -189,8 +208,18 @@ class StorageCollector: def get_transport_key_by_id(self, key_id: int) -> Optional[dict]: return self.sqlite_handler.get_transport_key_by_id(key_id) - def update_transport_key(self, key_id: int, name: Optional[str] = None, flood_policy: Optional[str] = None, transport_key: Optional[str] = None, parent_id: Optional[int] = None, last_used: Optional[float] = None) -> bool: - return self.sqlite_handler.update_transport_key(key_id, name, flood_policy, transport_key, parent_id, last_used) + def update_transport_key( + self, + key_id: int, + name: Optional[str] = None, + flood_policy: Optional[str] = None, + transport_key: Optional[str] = None, + parent_id: Optional[int] = None, + last_used: Optional[float] = None, + ) -> bool: + return self.sqlite_handler.update_transport_key( + key_id, name, flood_policy, transport_key, parent_id, last_used + ) def delete_transport_key(self, key_id: int) -> bool: - return self.sqlite_handler.delete_transport_key(key_id) \ No newline at end of file + return self.sqlite_handler.delete_transport_key(key_id) diff --git a/repeater/engine.py b/repeater/engine.py index 8322c7b..2848c6c 100644 --- a/repeater/engine.py +++ b/repeater/engine.py @@ -76,7 +76,7 @@ class RepeaterHandler(BaseHandler): # Storage collector for persistent packet logging try: - + local_identity = dispatcher.local_identity if dispatcher else None self.storage = StorageCollector(config, local_identity, repeater_handler=self) logger.info("StorageCollector initialized successfully") @@ -86,7 +86,7 @@ class RepeaterHandler(BaseHandler): # Initialize background timer tracking self.last_noise_measurement = time.time() - self.noise_floor_interval = NOISE_FLOOR_INTERVAL # 30 seconds + self.noise_floor_interval = NOISE_FLOOR_INTERVAL # 30 seconds self._background_task = None self._start_background_tasks() @@ -208,9 +208,17 @@ class RepeaterHandler(BaseHandler): # Record packet for charts packet_record = { "timestamp": time.time(), - "header": f"0x{packet.header:02X}" if hasattr(packet, "header") and packet.header is not None else None, - "payload": packet.payload.hex() if hasattr(packet, "payload") and packet.payload else None, - "payload_length": len(packet.payload) if hasattr(packet, "payload") and packet.payload else 0, + "header": ( + f"0x{packet.header:02X}" + if hasattr(packet, "header") and packet.header is not None + else None + ), + "payload": ( + packet.payload.hex() if hasattr(packet, "payload") and packet.payload else None + ), + "payload_length": ( + len(packet.payload) if hasattr(packet, "payload") and packet.payload else 0 + ), "type": payload_type, "route": route_type, "length": len(packet.payload or b""), @@ -241,8 +249,6 @@ class RepeaterHandler(BaseHandler): except Exception as e: logger.error(f"Failed to store packet record: {e}") - - # If this is a duplicate, try to attach it to the original packet if is_dupe and len(self.recent_packets) > 0: # Find the original packet with same hash @@ -268,20 +274,20 @@ class RepeaterHandler(BaseHandler): def log_trace_record(self, packet_record: dict) -> None: """Manually log a packet trace record (used by external callers)""" self.recent_packets.append(packet_record) - + self.rx_count += 1 if packet_record.get("transmitted", False): self.forwarded_count += 1 else: self.dropped_count += 1 - + # Store to persistent storage (same as __call__ does) if self.storage: try: self.storage.record_packet(packet_record) except Exception as e: logger.error(f"Failed to store packet record: {e}") - + if len(self.recent_packets) > self.max_recent_packets: self.recent_packets.pop(0) @@ -292,8 +298,6 @@ class RepeaterHandler(BaseHandler): for k in expired: del self.seen_packets[k] - - def _get_drop_reason(self, packet: Packet) -> str: if self.is_duplicate(packet): @@ -359,7 +363,7 @@ class RepeaterHandler(BaseHandler): longitude = appdata_decoded.get("longitude") current_time = time.time() - + # Check if this is a new neighbor current_neighbors = self.storage.get_neighbors() if self.storage else {} is_new_neighbor = pubkey not in current_neighbors @@ -451,9 +455,7 @@ class RepeaterHandler(BaseHandler): next_hop = packet.path[0] if next_hop != self.local_hash: - logger.debug( - f"Direct: not our hop (next={next_hop:02X}, local={self.local_hash:02X})" - ) + logger.debug(f"Direct: not our hop (next={next_hop:02X}, local={self.local_hash:02X})") return None original_path = list(packet.path) @@ -583,7 +585,7 @@ class RepeaterHandler(BaseHandler): def get_noise_floor(self) -> Optional[float]: try: radio = self.dispatcher.radio if self.dispatcher else None - if radio and hasattr(radio, 'get_noise_floor'): + if radio and hasattr(radio, "get_noise_floor"): return radio.get_noise_floor() return None except Exception as e: @@ -670,22 +672,22 @@ class RepeaterHandler(BaseHandler): try: while True: current_time = time.time() - + # Check noise floor recording (every 30 seconds) if current_time - self.last_noise_measurement >= self.noise_floor_interval: await self._record_noise_floor_async() self.last_noise_measurement = current_time - + # Check advert sending (every N hours) if self.send_advert_interval_hours > 0 and self.send_advert_func: interval_seconds = self.send_advert_interval_hours * 3600 if current_time - self.last_advert_time >= interval_seconds: await self._send_periodic_advert_async() self.last_advert_time = current_time - + # Sleep for 5 seconds before next check await asyncio.sleep(5.0) - + except asyncio.CancelledError: logger.info("Background timer loop cancelled") raise @@ -698,7 +700,7 @@ class RepeaterHandler(BaseHandler): async def _record_noise_floor_async(self): if not self.storage: return - + try: noise_floor = self.get_noise_floor() if noise_floor is not None: @@ -710,7 +712,9 @@ class RepeaterHandler(BaseHandler): logger.error(f"Error recording noise floor: {e}") async def _send_periodic_advert_async(self): - logger.info(f"Periodic advert timer triggered (interval: {self.send_advert_interval_hours}h)") + logger.info( + f"Periodic advert timer triggered (interval: {self.send_advert_interval_hours}h)" + ) try: if self.send_advert_func: success = await self.send_advert_func() @@ -727,7 +731,7 @@ class RepeaterHandler(BaseHandler): if self._background_task and not self._background_task.done(): self._background_task.cancel() logger.info("Background timer task cancelled") - + if self.storage: try: self.storage.close() @@ -739,4 +743,4 @@ class RepeaterHandler(BaseHandler): try: self.cleanup() except Exception: - pass \ No newline at end of file + pass