From 3cd93c08a7018427beccd776ac49026bfd3cd600 Mon Sep 17 00:00:00 2001 From: Pablo Revilla Date: Wed, 24 Sep 2025 19:39:06 -0700 Subject: [PATCH] Updates stats with pie chart and report for channel --- meshview/mqtt_store.py | 117 +++++++++++++++++++++++++++-------------- 1 file changed, 78 insertions(+), 39 deletions(-) diff --git a/meshview/mqtt_store.py b/meshview/mqtt_store.py index f3259d2..02c2b07 100644 --- a/meshview/mqtt_store.py +++ b/meshview/mqtt_store.py @@ -1,4 +1,5 @@ import datetime +import re from sqlalchemy import select from sqlalchemy import update from meshtastic.protobuf.config_pb2 import Config @@ -9,30 +10,33 @@ from meshview import decode_payload from meshview.models import Packet, PacketSeen, Node, Traceroute - - async def process_envelope(topic, env): # Checking if the received packet is a MAP_REPORT - # Update the node table with the firmware version if env.packet.decoded.portnum == PortNum.MAP_REPORT_APP: - # Extract the node ID from the packet and format the user ID node_id = getattr(env.packet, "from") user_id = f"!{node_id:0{8}x}" - # Decode the MAP report payload - map_report = decode_payload.decode_payload(PortNum.MAP_REPORT_APP, env.packet.decoded.payload) + map_report = decode_payload.decode_payload( + PortNum.MAP_REPORT_APP, env.packet.decoded.payload + ) - # Establish an asynchronous database session async with mqtt_database.async_session() as session: try: - hw_model = HardwareModel.Name(map_report.hw_model) if hasattr(HardwareModel, 'Name') else "unknown" - role = Config.DeviceConfig.Role.Name(map_report.role) if hasattr(Config.DeviceConfig.Role, - 'Name') else "unknown" - node = (await session.execute(select(Node).where(Node.node_id == node_id))).scalar_one_or_none() + hw_model = ( + HardwareModel.Name(map_report.hw_model) + if hasattr(HardwareModel, "Name") + else "unknown" + ) + role = ( + Config.DeviceConfig.Role.Name(map_report.role) + if hasattr(Config.DeviceConfig.Role, "Name") + else "unknown" + ) + node = ( + await session.execute(select(Node).where(Node.node_id == node_id)) + ).scalar_one_or_none() - # Some nodes might have uplink disabled for the default channel - # and only be sending map reports, so check if it exists yet if node: node.node_id = node_id node.long_name = map_report.long_name @@ -46,26 +50,31 @@ async def process_envelope(topic, env): node.last_update = datetime.datetime.now() else: node = Node( - id=user_id, node_id=node_id, - long_name=map_report.long_name, short_name=map_report.short_name, - hw_model=hw_model, role=role, channel=env.channel_id, + id=user_id, + node_id=node_id, + long_name=map_report.long_name, + short_name=map_report.short_name, + hw_model=hw_model, + role=role, + channel=env.channel_id, firmware=map_report.firmware_version, - last_lat=map_report.latitude_i, last_long=map_report.longitude_i, + last_lat=map_report.latitude_i, + last_long=map_report.longitude_i, last_update=datetime.datetime.now(), ) session.add(node) except Exception as e: print(f"Error processing MAP_REPORT_APP: {e}") - # Commit the changes to the database await session.commit() - # This ignores any packet that does not have a ID if not env.packet.id: return async with mqtt_database.async_session() as session: - result = await session.execute(select(Packet).where(Packet.id == env.packet.id)) + result = await session.execute( + select(Packet).where(Packet.id == env.packet.id) + ) new_packet = False packet = result.scalar_one_or_none() if not packet: @@ -88,7 +97,6 @@ async def process_envelope(topic, env): PacketSeen.rx_time == env.packet.rx_time, ) ) - seen = None if not result.scalar_one_or_none(): seen = PacketSeen( packet_id=env.packet.id, @@ -106,13 +114,32 @@ async def process_envelope(topic, env): if env.packet.decoded.portnum == PortNum.NODEINFO_APP: try: - user = decode_payload.decode_payload(PortNum.NODEINFO_APP, env.packet.decoded.payload) + user = decode_payload.decode_payload( + PortNum.NODEINFO_APP, env.packet.decoded.payload + ) if user and user.id: - node_id = int(user.id[1:], 16) if user.id[0] == "!" else None - hw_model = HardwareModel.Name(user.hw_model) if user.hw_model in HardwareModel.values() else f"unknown({user.hw_model})" - role = Config.DeviceConfig.Role.Name(user.role) if hasattr(Config.DeviceConfig.Role,'Name') else "unknown" + # ✅ Safe fix: only parse hex IDs, otherwise leave None + if user.id[0] == "!" and re.fullmatch(r"[0-9a-fA-F]+", user.id[1:]): + node_id = int(user.id[1:], 16) + else: + node_id = None - node = (await session.execute(select(Node).where(Node.id == user.id))).scalar_one_or_none() + hw_model = ( + HardwareModel.Name(user.hw_model) + if user.hw_model in HardwareModel.values() + else f"unknown({user.hw_model})" + ) + role = ( + Config.DeviceConfig.Role.Name(user.role) + if hasattr(Config.DeviceConfig.Role, "Name") + else "unknown" + ) + + node = ( + await session.execute( + select(Node).where(Node.id == user.id) + ) + ).scalar_one_or_none() if node: node.node_id = node_id @@ -124,9 +151,13 @@ async def process_envelope(topic, env): node.last_update = datetime.datetime.now() else: node = Node( - id=user.id, node_id=node_id, - long_name=user.long_name, short_name=user.short_name, - hw_model=hw_model, role=role, channel=env.channel_id, + id=user.id, + node_id=node_id, + long_name=user.long_name, + short_name=user.short_name, + hw_model=hw_model, + role=role, + channel=env.channel_id, last_update=datetime.datetime.now(), ) session.add(node) @@ -138,8 +169,12 @@ async def process_envelope(topic, env): PortNum.POSITION_APP, env.packet.decoded.payload ) if position and position.latitude_i and position.longitude_i: - from_node_id = getattr(env.packet, 'from') - node = (await session.execute(select(Node).where(Node.node_id == from_node_id))).scalar_one_or_none() + from_node_id = getattr(env.packet, "from") + node = ( + await session.execute( + select(Node).where(Node.node_id == from_node_id) + ) + ).scalar_one_or_none() if node: node.last_lat = position.latitude_i node.last_long = position.longitude_i @@ -150,17 +185,21 @@ async def process_envelope(topic, env): if env.packet.decoded.want_response: packet_id = env.packet.id else: - result = await session.execute(select(Packet).where(Packet.id == env.packet.decoded.request_id)) + result = await session.execute( + select(Packet).where(Packet.id == env.packet.decoded.request_id) + ) if result.scalar_one_or_none(): packet_id = env.packet.decoded.request_id if packet_id is not None: - session.add(Traceroute( - packet_id=packet_id, - route=env.packet.decoded.payload, - done=not env.packet.decoded.want_response, - gateway_node_id=int(env.gateway_id[1:], 16), - import_time=datetime.datetime.now(), - )) + session.add( + Traceroute( + packet_id=packet_id, + route=env.packet.decoded.payload, + done=not env.packet.decoded.want_response, + gateway_node_id=int(env.gateway_id[1:], 16), + import_time=datetime.datetime.now(), + ) + ) await session.commit() if new_packet: