From 03183384c615abbc3c08b027c190a77c1a372f2c Mon Sep 17 00:00:00 2001 From: Pablo Revilla Date: Fri, 21 Feb 2025 11:24:15 -0800 Subject: [PATCH 1/8] Adding screenshots --- config.ini | 2 +- main.py | 4 +++- meshview/mqtt_reader.py | 1 + meshview/templates/net.html | 2 +- meshview/web.py | 38 +------------------------------------ 5 files changed, 7 insertions(+), 40 deletions(-) diff --git a/config.ini b/config.ini index 693429b..9b4e152 100644 --- a/config.ini +++ b/config.ini @@ -6,7 +6,7 @@ acme_challenge = [mqtt] server = mqtt.bayme.sh -topics = ['msh/US/bayarea/#', 'msh/US/CA/mrymesh/#'] +topics = ["msh/US/bayarea/#"] port = 1883 username = meshdev password = large4cats diff --git a/main.py b/main.py index 7accc67..f92a106 100644 --- a/main.py +++ b/main.py @@ -6,6 +6,7 @@ from meshview import database from meshview import store from meshview import web from meshview import http +import json async def load_database_from_mqtt(mqtt_server: str , mqtt_port: int, topic: str, mqtt_user: str | None = None, mqtt_passwd: str | None = None): @@ -23,10 +24,11 @@ async def main(config): mqtt_user: str = config["mqtt"]["username"] if config["mqtt"]["password"] != "": mqtt_passwd: str = config["mqtt"]["password"] + mqtt_topics = json.loads(config["mqtt"]["topics"]) async with asyncio.TaskGroup() as tg: tg.create_task( - load_database_from_mqtt(config["mqtt"]["server"], int(config["mqtt"]["port"]), config["mqtt"]["topics"], mqtt_user, mqtt_passwd) + load_database_from_mqtt(config["mqtt"]["server"], int(config["mqtt"]["port"]), mqtt_topics, mqtt_user, mqtt_passwd) ) tg.create_task( web.run_server( diff --git a/meshview/mqtt_reader.py b/meshview/mqtt_reader.py index 5215f98..c63d3ce 100644 --- a/meshview/mqtt_reader.py +++ b/meshview/mqtt_reader.py @@ -33,6 +33,7 @@ async def get_topic_envelopes(mqtt_server, mqtt_port, topics, mqtt_user, mqtt_pa mqtt_server, port=mqtt_port , username=mqtt_user, password=mqtt_passwd , identifier=identifier, ) as client: for topic in topics: + print(topic) await client.subscribe(topic) async for msg in client.messages: try: diff --git a/meshview/templates/net.html b/meshview/templates/net.html index b16e0db..9d6fcbb 100644 --- a/meshview/templates/net.html +++ b/meshview/templates/net.html @@ -24,7 +24,7 @@
Weekly Mesh check-in. We will keep it open on every Wednesday from 5:00pm for checkins so you do not have to rush.
The message format should be (LONG NAME) - (CITY YOU ARE IN) #BayMeshNet.

-
+
{% for packet in packets %} {% include 'chat_packet.html' %} {% else %} diff --git a/meshview/web.py b/meshview/web.py index 2669546..5265195 100644 --- a/meshview/web.py +++ b/meshview/web.py @@ -1573,7 +1573,7 @@ async def net(request): raise # Let aiohttp handle HTTP exceptions properly except Exception as e: - print("Error processing chat request") + print("Error processing net request") return web.Response( text="An internal server error occurred.", status=500, @@ -1581,42 +1581,6 @@ async def net(request): ) -@routes.get("/net_events") -async def net_events(request): - chat_packet = env.get_template("net_packet.html") - - # Precompile regex for performance (case insensitive) - seq_pattern = re.compile(r"seq \d+$") - - with notify.subscribe(node_id=0xFFFFFFFF) as event: - async with sse_response(request) as resp: - while resp.is_connected(): - try: - await asyncio.wait_for(event.wait(), timeout=10) - except asyncio.TimeoutError: - continue # Timeout occurred, loop again - - if event.is_set(): - # Ensure event.packets is valid before accessing it - packets = [ - p for p in (event.packets or []) - if p.portnum == PortNum.TEXT_MESSAGE_APP - ] - event.clear() - - try: - for packet in packets: - ui_packet = Packet.from_model(packet) - if not seq_pattern.match(ui_packet.payload) and "baymeshnet" in ui_packet.payload.lower(): - await resp.send( - chat_packet.render(packet=ui_packet), - event="net_packet", - ) - except ConnectionResetError: - print("Client disconnected from SSE stream.") - return # Gracefully exit on disconnection - - @routes.get("/map") async def map(request): try: From 1bd41ae82a98eba45ecc937f7fd61cebb5e03aec Mon Sep 17 00:00:00 2001 From: Pablo Revilla Date: Fri, 21 Feb 2025 11:57:01 -0800 Subject: [PATCH 2/8] Adding screenshots --- meshview/templates/map.html | 155 ++++++++++++++++++++++++++++++++++++ 1 file changed, 155 insertions(+) create mode 100644 meshview/templates/map.html diff --git a/meshview/templates/map.html b/meshview/templates/map.html new file mode 100644 index 0000000..2247f5e --- /dev/null +++ b/meshview/templates/map.html @@ -0,0 +1,155 @@ +{% extends "base.html" %} +{% block css %} + + +{% endblock %} + +{% block body %} +
+ +
+ LongFast + MediumSlow + LongFast Routers + MediumSlow Routers +
+ + + + +{% endblock %} +test From 5f97274e80e5e634d209d487ab3a1f8ee46e663c Mon Sep 17 00:00:00 2001 From: Pablo Revilla Date: Wed, 5 Mar 2025 16:11:40 -0800 Subject: [PATCH 3/8] Major changes to the project - Now the database process and the web process are separate. - Added Map - Added new graphing tools --- .idea/.gitignore | 3 - .../inspectionProfiles/profiles_settings.xml | 6 + .idea/meshview-2.iml | 10 + .idea/modules.xml | 8 + config.ini | 7 +- main.py | 16 +- meshview/database.py | 39 +- meshview/models.py | 2 - meshview/mqtt_database.py | 16 + meshview/mqtt_store.py | 165 +++ meshview/store.py | 304 +---- meshview/templates/base.html | 4 +- meshview/templates/chat.html | 8 +- meshview/templates/firehose.html | 87 +- meshview/templates/map.html | 8 +- meshview/templates/node.html | 149 ++- meshview/templates/node2.html | 58 + meshview/templates/nodegraph.html | 190 +++ meshview/templates/packet.html | 3 +- meshview/templates/packet_details.html | 98 +- meshview/templates/traceroute.html | 67 + meshview/web.py | 1126 ++++++----------- requirements.txt | 57 +- startdb.py | 50 + 24 files changed, 1380 insertions(+), 1101 deletions(-) delete mode 100644 .idea/.gitignore create mode 100644 .idea/inspectionProfiles/profiles_settings.xml create mode 100644 .idea/meshview-2.iml create mode 100644 .idea/modules.xml create mode 100644 meshview/mqtt_database.py create mode 100644 meshview/mqtt_store.py create mode 100644 meshview/templates/node2.html create mode 100644 meshview/templates/nodegraph.html create mode 100644 meshview/templates/traceroute.html create mode 100644 startdb.py diff --git a/.idea/.gitignore b/.idea/.gitignore deleted file mode 100644 index 26d3352..0000000 --- a/.idea/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -# Default ignored files -/shelf/ -/workspace.xml diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/meshview-2.iml b/.idea/meshview-2.iml new file mode 100644 index 0000000..7198901 --- /dev/null +++ b/.idea/meshview-2.iml @@ -0,0 +1,10 @@ + + + + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..99ffd7e --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/config.ini b/config.ini index 693429b..5f9e91f 100644 --- a/config.ini +++ b/config.ini @@ -6,10 +6,13 @@ acme_challenge = [mqtt] server = mqtt.bayme.sh -topics = ['msh/US/bayarea/#', 'msh/US/CA/mrymesh/#'] +topics = ["msh/US/bayarea/#", "msh/US/CA/mrymesh/#"] port = 1883 username = meshdev password = large4cats [database] -connection_string = sqlite+aiosqlite:///packets.db \ No newline at end of file +connection_string = sqlite+aiosqlite:///packets.db + +[website] +title = San Francisco Bay Area Mesh \ No newline at end of file diff --git a/main.py b/main.py index e256ba4..717c67a 100644 --- a/main.py +++ b/main.py @@ -1,18 +1,16 @@ import asyncio import argparse import configparser -import json from meshview import mqtt_reader from meshview import database -from meshview import store +from meshview import mqtt_store from meshview import web -from meshview import http import json async def load_database_from_mqtt(mqtt_server: str , mqtt_port: int, topic: list, mqtt_user: str | None = None, mqtt_passwd: str | None = None): async for topic, env in mqtt_reader.get_topic_envelopes(mqtt_server, mqtt_port, topic, mqtt_user, mqtt_passwd): - await store.process_envelope(topic, env) + await mqtt_store.process_envelope(topic, env) async def main(config): @@ -28,9 +26,6 @@ async def main(config): mqtt_topics = json.loads(config["mqtt"]["topics"]) async with asyncio.TaskGroup() as tg: - tg.create_task( - load_database_from_mqtt(config["mqtt"]["server"], int(config["mqtt"]["port"]), mqtt_topics, mqtt_user, mqtt_passwd) - ) tg.create_task( web.run_server( config["server"]["bind"], @@ -38,13 +33,6 @@ async def main(config): config["server"].get("tls_cert"), ) ) - if config["server"].get("acme_challenge"): - tg.create_task( - http.run_server( - config["server"]["bind"], config["server"]["acme_challenge"] - ) - ) - def load_config(file_path): """Load configuration from an INI-style text file.""" diff --git a/meshview/database.py b/meshview/database.py index a20e37b..83fc134 100644 --- a/meshview/database.py +++ b/meshview/database.py @@ -1,15 +1,36 @@ from meshview import models -from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker +from sqlalchemy.ext.asyncio import async_sessionmaker +from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession -def init_database(database_connection_string): + + +engine = None +async_session = None + + +def init_database(database_connection_string, read_only=False): global engine, async_session - kwargs = {} - if not database_connection_string.startswith('sqlite'): - kwargs['pool_size'] = 20 - kwargs['max_overflow'] = 50 - print (**kwargs) - engine = create_async_engine(database_connection_string, echo=False, connect_args={"timeout": 15}) - async_session = async_sessionmaker(engine, expire_on_commit=False) + + kwargs = {"echo": False} + + if database_connection_string.startswith("sqlite"): + if read_only: + # Ensure SQLite is opened in read-only mode + database_connection_string += "?mode=ro" + kwargs["connect_args"] = {"uri": True} + else: + kwargs["connect_args"] = {"timeout": 15} + else: + kwargs["pool_size"] = 20 + kwargs["max_overflow"] = 50 + + print("Database connection settings:", kwargs) # Debugging output + + engine = create_async_engine(database_connection_string, **kwargs) + async_session = async_sessionmaker( bind=engine, + class_=AsyncSession, + expire_on_commit=False, +) async def create_tables(): async with engine.begin() as conn: diff --git a/meshview/models.py b/meshview/models.py index 569943c..78efa10 100644 --- a/meshview/models.py +++ b/meshview/models.py @@ -1,5 +1,4 @@ from datetime import datetime - from sqlalchemy.orm import DeclarativeBase, foreign from sqlalchemy.ext.asyncio import AsyncAttrs from sqlalchemy.orm import mapped_column, relationship, Mapped @@ -69,4 +68,3 @@ class Traceroute(Base): done: Mapped[bool] = mapped_column(nullable=True) route: Mapped[bytes] = mapped_column(nullable=True) import_time: Mapped[datetime] = mapped_column(nullable=True) - diff --git a/meshview/mqtt_database.py b/meshview/mqtt_database.py new file mode 100644 index 0000000..a20e37b --- /dev/null +++ b/meshview/mqtt_database.py @@ -0,0 +1,16 @@ +from meshview import models +from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker + +def init_database(database_connection_string): + global engine, async_session + kwargs = {} + if not database_connection_string.startswith('sqlite'): + kwargs['pool_size'] = 20 + kwargs['max_overflow'] = 50 + print (**kwargs) + engine = create_async_engine(database_connection_string, echo=False, connect_args={"timeout": 15}) + async_session = async_sessionmaker(engine, expire_on_commit=False) + +async def create_tables(): + async with engine.begin() as conn: + await conn.run_sync(models.Base.metadata.create_all) diff --git a/meshview/mqtt_store.py b/meshview/mqtt_store.py new file mode 100644 index 0000000..292d996 --- /dev/null +++ b/meshview/mqtt_store.py @@ -0,0 +1,165 @@ +import datetime +from sqlalchemy import select +from sqlalchemy import update +from meshtastic.protobuf.config_pb2 import Config +from meshtastic.protobuf.portnums_pb2 import PortNum +from meshtastic.protobuf.mesh_pb2 import User, HardwareModel +from meshview import mqtt_database +from meshview import decode_payload +from meshview.models import Packet, PacketSeen, Node, Traceroute + + + + +async def process_envelope(topic, env): + + # Checking if the received packet is a MAP_REPORT + # Update the node table with the firmware version + if env.packet.decoded.portnum == PortNum.MAP_REPORT_APP: + # Extract the node ID from the packet (renamed from 'id' to 'node_id' to avoid conflicts with Python's built-in id function) + node_id = getattr(env.packet, "from") + + # Decode the MAP report payload to extract the firmware version + map_report = decode_payload.decode_payload(PortNum.MAP_REPORT_APP, env.packet.decoded.payload) + + # Establish an asynchronous database session + async with mqtt_database.async_session() as session: + # Construct an SQLAlchemy update statement + stmt = ( + update(Node) + .where(Node.node_id == node_id) # Ensure correct column reference + .values(firmware=map_report.firmware_version) # Assign new firmware value + ) + + # Execute the update statement asynchronously + await session.execute(stmt) + + # Commit the changes to the database + await session.commit() + + # This ignores any packet that does not have a ID + if not env.packet.id: + return + + async with mqtt_database.async_session() as session: + result = await session.execute(select(Packet).where(Packet.id == env.packet.id)) + new_packet = False + packet = result.scalar_one_or_none() + if not packet: + new_packet = True + packet = Packet( + id=env.packet.id, + portnum=env.packet.decoded.portnum, + from_node_id=getattr(env.packet, "from"), + to_node_id=env.packet.to, + payload=env.packet.SerializeToString(), + # p.r. Here seems to be where the packet is imported on the Database and import time is set. + import_time=datetime.datetime.now(), + channel=env.channel_id, + ) + session.add(packet) + + result = await session.execute( + select(PacketSeen).where( + PacketSeen.packet_id == env.packet.id, + PacketSeen.node_id == int(env.gateway_id[1:], 16), + PacketSeen.rx_time == env.packet.rx_time, + ) + ) + seen = None + if not result.scalar_one_or_none(): + seen = PacketSeen( + packet_id=env.packet.id, + node_id=int(env.gateway_id[1:], 16), + channel=env.channel_id, + rx_time=env.packet.rx_time, + rx_snr=env.packet.rx_snr, + rx_rssi=env.packet.rx_rssi, + hop_limit=env.packet.hop_limit, + hop_start=env.packet.hop_start, + topic=topic, + # p.r. Here seems to be where the packet is imported on the Database and import time is set. + import_time=datetime.datetime.now(), + ) + session.add(seen) + + + + if env.packet.decoded.portnum == PortNum.NODEINFO_APP: + user = decode_payload.decode_payload( + PortNum.NODEINFO_APP, env.packet.decoded.payload + ) + if user: + result = await session.execute(select(Node).where(Node.id == user.id)) + if user.id and user.id[0] == "!": + try: + node_id = int(user.id[1:], 16) + except ValueError: + node_id = None + pass + else: + node_id = None + + try: + hw_model = HardwareModel.Name(user.hw_model) + except ValueError: + hw_model = "unknown" + try: + role = Config.DeviceConfig.Role.Name(user.role) + except ValueError: + role = "unknown" + + if node := result.scalar_one_or_none(): + node.node_id = node_id + node.long_name = user.long_name + node.short_name = user.short_name + node.hw_model = hw_model + node.role = role + node.last_update =datetime.datetime.now() + + else: + node = Node( + id=user.id, + node_id=node_id, + long_name=user.long_name, + short_name=user.short_name, + hw_model=hw_model, + role=role, + channel=env.channel_id, + # if need to update time of last update it may be here + ) + session.add(node) + + if env.packet.decoded.portnum == PortNum.POSITION_APP: + position = decode_payload.decode_payload( + PortNum.POSITION_APP, env.packet.decoded.payload + ) + if position and position.latitude_i and position.longitude_i: + from_node_id = getattr(env.packet, 'from') + node = (await session.execute(select(Node).where(Node.node_id == from_node_id))).scalar_one_or_none() + if node: + node.last_lat = position.latitude_i + node.last_long = position.longitude_i + session.add(node) + + if env.packet.decoded.portnum == PortNum.TRACEROUTE_APP: + packet_id = None + if env.packet.decoded.want_response: + packet_id = env.packet.id + else: + result = await session.execute(select(Packet).where(Packet.id == env.packet.decoded.request_id)) + if result.scalar_one_or_none(): + packet_id = env.packet.decoded.request_id + if packet_id is not None: + session.add(Traceroute( + packet_id=packet_id, + route=env.packet.decoded.payload, + done=not env.packet.decoded.want_response, + gateway_node_id=int(env.gateway_id[1:], 16), + import_time=datetime.datetime.now(), + )) + + await session.commit() + if new_packet: + await packet.awaitable_attrs.to_node + await packet.awaitable_attrs.from_node diff --git a/meshview/store.py b/meshview/store.py index 3ff6118..87ef868 100644 --- a/meshview/store.py +++ b/meshview/store.py @@ -1,176 +1,8 @@ import datetime - from sqlalchemy import select, func from sqlalchemy.orm import lazyload -from sqlalchemy import update -from meshtastic.protobuf.config_pb2 import Config -from meshtastic.protobuf.portnums_pb2 import PortNum -from meshtastic.protobuf.mesh_pb2 import User, HardwareModel from meshview import database -from meshview import decode_payload from meshview.models import Packet, PacketSeen, Node, Traceroute -from meshview import notify - - - -async def process_envelope(topic, env): - - # Checking if the received packet is a MAP_REPORT - # Update the node table with the firmware version - if env.packet.decoded.portnum == PortNum.MAP_REPORT_APP: - # Extract the node ID from the packet (renamed from 'id' to 'node_id' to avoid conflicts with Python's built-in id function) - node_id = getattr(env.packet, "from") - - # Decode the MAP report payload to extract the firmware version - map_report = decode_payload.decode_payload(PortNum.MAP_REPORT_APP, env.packet.decoded.payload) - - # Establish an asynchronous database session - async with database.async_session() as session: - # Construct an SQLAlchemy update statement - stmt = ( - update(Node) - .where(Node.node_id == node_id) # Ensure correct column reference - .values(firmware=map_report.firmware_version) # Assign new firmware value - ) - - # Execute the update statement asynchronously - await session.execute(stmt) - - # Commit the changes to the database - await session.commit() - - # This ignores any packet that does not have a ID - if not env.packet.id: - return - - async with database.async_session() as session: - result = await session.execute(select(Packet).where(Packet.id == env.packet.id)) - new_packet = False - packet = result.scalar_one_or_none() - if not packet: - new_packet = True - packet = Packet( - id=env.packet.id, - portnum=env.packet.decoded.portnum, - from_node_id=getattr(env.packet, "from"), - to_node_id=env.packet.to, - payload=env.packet.SerializeToString(), - # p.r. Here seems to be where the packet is imported on the Database and import time is set. - import_time=datetime.datetime.now(), - channel=env.channel_id, - ) - session.add(packet) - - result = await session.execute( - select(PacketSeen).where( - PacketSeen.packet_id == env.packet.id, - PacketSeen.node_id == int(env.gateway_id[1:], 16), - PacketSeen.rx_time == env.packet.rx_time, - ) - ) - seen = None - if not result.scalar_one_or_none(): - seen = PacketSeen( - packet_id=env.packet.id, - node_id=int(env.gateway_id[1:], 16), - channel=env.channel_id, - rx_time=env.packet.rx_time, - rx_snr=env.packet.rx_snr, - rx_rssi=env.packet.rx_rssi, - hop_limit=env.packet.hop_limit, - hop_start=env.packet.hop_start, - topic=topic, - # p.r. Here seems to be where the packet is imported on the Database and import time is set. - import_time=datetime.datetime.now(), - ) - session.add(seen) - - - - if env.packet.decoded.portnum == PortNum.NODEINFO_APP: - user = decode_payload.decode_payload( - PortNum.NODEINFO_APP, env.packet.decoded.payload - ) - if user: - result = await session.execute(select(Node).where(Node.id == user.id)) - if user.id and user.id[0] == "!": - try: - node_id = int(user.id[1:], 16) - except ValueError: - node_id = None - pass - else: - node_id = None - - try: - hw_model = HardwareModel.Name(user.hw_model) - except ValueError: - hw_model = "unknown" - try: - role = Config.DeviceConfig.Role.Name(user.role) - except ValueError: - role = "unknown" - - if node := result.scalar_one_or_none(): - node.node_id = node_id - node.long_name = user.long_name - node.short_name = user.short_name - node.hw_model = hw_model - node.role = role - node.last_update =datetime.datetime.now() - - else: - node = Node( - id=user.id, - node_id=node_id, - long_name=user.long_name, - short_name=user.short_name, - hw_model=hw_model, - role=role, - channel=env.channel_id, - # if need to update time of last update it may be here - ) - session.add(node) - - if env.packet.decoded.portnum == PortNum.POSITION_APP: - position = decode_payload.decode_payload( - PortNum.POSITION_APP, env.packet.decoded.payload - ) - if position and position.latitude_i and position.longitude_i: - from_node_id = getattr(env.packet, 'from') - node = (await session.execute(select(Node).where(Node.node_id == from_node_id))).scalar_one_or_none() - if node: - node.last_lat = position.latitude_i - node.last_long = position.longitude_i - session.add(node) - - if env.packet.decoded.portnum == PortNum.TRACEROUTE_APP: - packet_id = None - if env.packet.decoded.want_response: - packet_id = env.packet.id - else: - result = await session.execute(select(Packet).where(Packet.id == env.packet.decoded.request_id)) - if result.scalar_one_or_none(): - packet_id = env.packet.decoded.request_id - if packet_id is not None: - session.add(Traceroute( - packet_id=packet_id, - route=env.packet.decoded.payload, - done=not env.packet.decoded.want_response, - gateway_node_id=int(env.gateway_id[1:], 16), - import_time=datetime.datetime.now(), - )) - - await session.commit() - if new_packet: - await packet.awaitable_attrs.to_node - await packet.awaitable_attrs.from_node - notify.notify_packet(packet.to_node_id, packet) - notify.notify_packet(packet.from_node_id, packet) - notify.notify_packet(None, packet) - if seen: - notify.notify_uplinked(seen.node_id, packet) - async def get_node(node_id): async with database.async_session() as session: @@ -189,7 +21,7 @@ async def get_fuzzy_nodes(query): return result.scalars() -async def get_packets(node_id=None, portnum=None, since=None, limit=500, before=None, after=None): +async def get_packets(node_id=None, portnum=None, since=None, limit=1000, before=None, after=None): async with database.async_session() as session: q = select(Packet) @@ -209,7 +41,8 @@ async def get_packets(node_id=None, portnum=None, since=None, limit=500, before= q = q.limit(limit) result = await session.execute(q.order_by(Packet.import_time.desc())) - return result.scalars() + packets = list(result.scalars()) # Convert to list + return packets # Return the list async def get_packets_from(node_id=None, portnum=None, since=None, limit=500): @@ -300,137 +133,6 @@ async def get_mqtt_neighbors(since): ) return result -# In order to provide separate network graphs for LongFast and MediumSlow, I am duplicating the procedures. -# 3 procedures are needed. These would have to be replicated for any other network that we may need to use graphs. -# -# get_traceroutes_longfast -# get_packets_longfast -# get_mqtt_neighbors_longfast -# -# p.r. -# -# Get Traceroute for LongFast only -async def get_traceroutes_longfast(since): - async with database.async_session() as session: - result = await session.execute( - select(Traceroute) - .join(Packet) - .where( - (Traceroute.import_time > (datetime.datetime.now() - since)) - & (Packet.channel == "LongFast") - ) - .order_by(Traceroute.import_time) - ) - return result.scalars() - -# Get MQTT Neighbors for LongFast only -# p.r. -async def get_mqtt_neighbors_longfast(since): - async with database.async_session() as session: - result = await session.execute(select(PacketSeen, Packet) - .join(Packet) - .where( - (PacketSeen.hop_limit == PacketSeen.hop_start) - & (PacketSeen.hop_start != 0) - & (Packet.channel == "LongFast") - ) - - .options( - lazyload(Packet.from_node), - lazyload(Packet.to_node), - ) - ) - return result - -# Get Packets for LongFast only -# p.r. -async def get_packets_longfast(node_id=None, portnum=None, since=None, limit=500, before=None, after=None): - async with database.async_session() as session: - q = select(Packet) - - # Add condition for channel being "LongFast" - q = q.where(Packet.channel == "LongFast") - - if node_id: - q = q.where( - (Packet.from_node_id == node_id) | (Packet.to_node_id == node_id) - ) - if portnum: - q = q.where(Packet.portnum == portnum) - if since: - q = q.where(Packet.import_time > (datetime.datetime.now() - since)) - if before: - q = q.where(Packet.import_time < before) - if after: - q = q.where(Packet.import_time > after) - if limit is not None: - q = q.limit(limit) - - result = await session.execute(q.order_by(Packet.import_time.desc())) - return result.scalars() - -# Get Traceroute for mediumslow only -# p.r. -async def get_traceroutes_mediumslow(since): - async with database.async_session() as session: - result = await session.execute( - select(Traceroute) - .join(Packet) - .where( - (Traceroute.import_time > (datetime.datetime.now() - since)) - & (Packet.channel == "MediumSlow") - ) - .order_by(Traceroute.import_time) - ) - return result.scalars() - -# Get MQTT Neighbors for mediumslow only -# p.r. -async def get_mqtt_neighbors_mediumslow(since): - async with database.async_session() as session: - result = await session.execute(select(PacketSeen, Packet) - .join(Packet) - .where( - (PacketSeen.hop_limit == PacketSeen.hop_start) - & (PacketSeen.hop_start != 0) - & (Packet.channel == "MediumSlow") - ) - - .options( - lazyload(Packet.from_node), - lazyload(Packet.to_node), - ) - ) - return result - -# Get Packets for MediumSlow only -# p.r. -async def get_packets_mediumslow(node_id=None, portnum=None, since=None, limit=500, before=None, after=None): - async with database.async_session() as session: - q = select(Packet) - - # Add condition for channel being "MediumSlow" - q = q.where(Packet.channel == "MediumSlow") - - if node_id: - q = q.where( - (Packet.from_node_id == node_id) | (Packet.to_node_id == node_id) - ) - if portnum: - q = q.where(Packet.portnum == portnum) - if since: - q = q.where(Packet.import_time > (datetime.datetime.now() - since)) - if before: - q = q.where(Packet.import_time < before) - if after: - q = q.where(Packet.import_time > after) - if limit is not None: - q = q.limit(limit) - - result = await session.execute(q.order_by(Packet.import_time.desc())) - return result.scalars() - - # We count the total amount of packages # This is to be used by /stats in web.py diff --git a/meshview/templates/base.html b/meshview/templates/base.html index c21c916..54a5ac1 100644 --- a/meshview/templates/base.html +++ b/meshview/templates/base.html @@ -10,6 +10,8 @@ + + {% block head %} {% endblock %} @@ -35,7 +37,7 @@
Bay Area Mesh - http://bayme.sh
Quick Links:  Nodes - Conversations - See everything -  - Mesh Graph LF - MS  - Stats +  - Mesh Graph LF - MS  - Stats  - Weekly Net - Map

Loading... diff --git a/meshview/templates/chat.html b/meshview/templates/chat.html index 821f601..a20fd5f 100644 --- a/meshview/templates/chat.html +++ b/meshview/templates/chat.html @@ -20,7 +20,13 @@ {% block body %} -
+ + +
{% for packet in packets %} {% include 'chat_packet.html' %} {% else %} diff --git a/meshview/templates/firehose.html b/meshview/templates/firehose.html index c8da75b..c0b5e42 100644 --- a/meshview/templates/firehose.html +++ b/meshview/templates/firehose.html @@ -1,7 +1,71 @@ {% extends "base.html" %} {% block body %} -
+ + + +
{% set options = { 1: "Text Message", @@ -9,31 +73,26 @@ 4: "Node Info", 67: "Telemetry", 71: "Neighbor Info", + 70: "Trace Route", } %} - +
-
+
{% for packet in packets %} - {% include 'packet.html' %} + {% include 'packet.html' %} {% else %} - No packets found. + No packets found. {% endfor %}
-
+ {% endblock %} diff --git a/meshview/templates/map.html b/meshview/templates/map.html index cd1d003..a460cc9 100644 --- a/meshview/templates/map.html +++ b/meshview/templates/map.html @@ -93,11 +93,11 @@ let isRouter = node.role.toLowerCase().includes("router"); let markerOptions = { - radius: isRouter ? 8 : 7, - color: isRouter ? "black" : color, + radius: isRouter ? 9 : 7, + color: "white", fillColor: color, - fillOpacity: 0.6, - weight: isRouter ? 1 : 0 + fillOpacity: 1, + weight: .7, }; var popupContent = ` diff --git a/meshview/templates/node.html b/meshview/templates/node.html index 2bf8c57..a348752 100644 --- a/meshview/templates/node.html +++ b/meshview/templates/node.html @@ -1,21 +1,28 @@ {% extends "base.html" %} {% block css %} + /* Styles for the node info card */ #node_info { - height:100%; + height: 100%; } - #map{ - height:100%; + + /* Styles for the map */ + #map { + height: 100%; min-height: 400px; } - #packet_details{ + + /* Styles for packet details section */ + #packet_details { height: 95vh; overflow: scroll; top: 3em; } + + /* Ensure inline display for details */ div.tab-pane > dl { display: inline-block; - } + } {% endblock %} {% block body %} @@ -27,17 +34,18 @@ {% if node %} hx-ext="sse" sse-connect="/events?node_id={{node_id}}{% if portnum %}&portnum={{portnum}}{% endif %}" - {% endif %} - > + {% endif %} + >
+
{% if node %}
{{node.long_name}} ({{node.node_id|node_id_to_hex}})
-
+
ShortName
{{node.short_name}}
HW Model
@@ -54,45 +62,146 @@ {% endif %}
+
+
- +
- {% include 'packet_list.html' %} + {% include 'packet_list.html' %}
+
{% if trace %} + + {% endif %} {% endblock %} diff --git a/meshview/templates/node2.html b/meshview/templates/node2.html new file mode 100644 index 0000000..cf9fd61 --- /dev/null +++ b/meshview/templates/node2.html @@ -0,0 +1,58 @@ +{% extends "base.html" %} + +{% block css %} + #node_info { + height:100%; + } + #map{ + height:100%; + min-height: 400px; + } + #packet_details{ + height: 95vh; + overflow: scroll; + top: 3em; + } + div.tab-pane > dl { + display: inline-block; + } +{% endblock %} + +{% block body %} + +{% include "search_form.html" %} + +
+
+
+ {% if node %} +
+ {{node.long_name}} +
+
+
+
ShortName
+
{{node.short_name}}
+
HW Model
+
{{node.hw_model}}
+
Role
+
{{node.role}}
+
+
+ {% else %} +
+ A NodeInfo has not been seen. +
+ {% endif %} +
+
+
+
+ {% include 'packet_list.html' %} +
+
+
+
+
+
+{% endblock %} diff --git a/meshview/templates/nodegraph.html b/meshview/templates/nodegraph.html new file mode 100644 index 0000000..b088685 --- /dev/null +++ b/meshview/templates/nodegraph.html @@ -0,0 +1,190 @@ +{% extends "base.html" %} + +{% block head %} + +{% endblock %} + +{% block css %} +#mynetwork { + width: 100%; + height: 100vh; + max-width: 2000px; + max-height: 2000px; + border: 1px solid lightgray; + background-color: white; +} + +.legend { + position: absolute; + bottom: 10px; + left: 10px; + background-color: rgba(255, 255, 255, 0.8); + padding: 5px; + border-radius: 5px; + border: 1px solid #ccc; + font-size: 12px; + color: #333; +} + +#node-info { + position: absolute; + bottom: 10px; + right: 10px; + background-color: rgba(255, 255, 255, 0.9); + padding: 10px; + border-radius: 5px; + border: 1px solid #ccc; + font-size: 14px; + color: #333; + width: 250px; + max-height: 200px; + overflow-y: auto; +} +{% endblock %} + +{% block body %} +
+ + +
+
Traceroute
+
NeighborInfo
+
+ + +
+ Long Name:
+ Short Name:
+ Role:
+ Hardware Model: +
+ +< +{% endblock %} diff --git a/meshview/templates/packet.html b/meshview/templates/packet.html index a6f7ceb..a6d7cc7 100644 --- a/meshview/templates/packet.html +++ b/meshview/templates/packet.html @@ -29,8 +29,7 @@
{{packet.id}} - 🔎 - 🔗 + 🔎
diff --git a/meshview/templates/packet_details.html b/meshview/templates/packet_details.html index d273f0a..2496dc6 100644 --- a/meshview/templates/packet_details.html +++ b/meshview/templates/packet_details.html @@ -36,25 +36,103 @@ {% if map_center %} + + + {% endif %} diff --git a/meshview/templates/traceroute.html b/meshview/templates/traceroute.html new file mode 100644 index 0000000..8e94a01 --- /dev/null +++ b/meshview/templates/traceroute.html @@ -0,0 +1,67 @@ +{% extends "base.html" %} + +{% block head %} + +{% endblock %} + +{% block body %} +
+ + + +{% endblock %} diff --git a/meshview/web.py b/meshview/web.py index 5265195..3a62550 100644 --- a/meshview/web.py +++ b/meshview/web.py @@ -3,111 +3,36 @@ import io from collections import Counter from dataclasses import dataclass import datetime -from aiohttp_sse import sse_response import ssl import re import os - import pydot from pandas import DataFrame import plotly.express as px import seaborn as sns import matplotlib.pyplot as plt -import matplotlib.ticker as ticker from aiohttp import web from markupsafe import Markup from jinja2 import Environment, PackageLoader, select_autoescape from google.protobuf import text_format from google.protobuf.message import Message - from meshtastic.protobuf.portnums_pb2 import PortNum from meshview import store from meshview import models from meshview import decode_payload -from meshview import notify - - -with open(os.path.join(os.path.dirname(__file__), '1x1.png'), 'rb') as png: - empty_png = png.read() +import gc +import psutil env = Environment(loader=PackageLoader("meshview"), autoescape=select_autoescape()) +# Optimize garbage collection frequency +gc.set_threshold(100, 10, 10) -async def build_trace(node_id): - trace = [] - for raw_p in await store.get_packets_from(node_id, PortNum.POSITION_APP, since=datetime.timedelta(hours=24)): - p = Packet.from_model(raw_p) - if not p.raw_payload or not p.raw_payload.latitude_i or not p.raw_payload.longitude_i: - continue - trace.append((p.raw_payload.latitude_i * 1e-7, p.raw_payload.longitude_i * 1e-7)) - if not trace: - for raw_p in await store.get_packets_from(node_id, PortNum.POSITION_APP): - p = Packet.from_model(raw_p) - if not p.raw_payload or not p.raw_payload.latitude_i or not p.raw_payload.longitude_i: - continue - trace.append((p.raw_payload.latitude_i * 1e-7, p.raw_payload.longitude_i * 1e-7)) - break - return trace +with open(os.path.join(os.path.dirname(__file__), '1x1.png'), 'rb') as png: + empty_png = png.read() -async def build_neighbors(node_id): - packet = (await store.get_packets_from(node_id, PortNum.NEIGHBORINFO_APP, limit=1)).first() - if not packet: - return [] - if packet.import_time < datetime.datetime.utcnow() - datetime.timedelta(days=1): - return [] - _, payload = decode_payload.decode(packet) - neighbors = [] - results = {} - async with asyncio.TaskGroup() as tg: - for n in payload.neighbors: - results[n.node_id] = { - 'node_id': n.node_id, - 'snr': n.snr, - } - neighbors.append((n.node_id, tg.create_task(store.get_node(n.node_id)))) - - for node_id, node in neighbors: - node = await node - if node and node.last_lat and node.last_long: - results[node_id]['short_name'] = node.short_name - results[node_id]['long_name'] = node.long_name - results[node_id]['location'] = (node.last_lat * 1e-7, node.last_long * 1e-7) - else: - del results[node_id] - - return list(results.values()) - - -def node_id_to_hex(node_id): - if node_id == 4294967295: - return "^all" - else: - return f"!{hex(node_id)[2:]}" - - -def format_timestamp(timestamp): - if isinstance(timestamp, int): - timestamp = datetime.datetime.fromtimestamp(timestamp, datetime.timezone.utc) - return timestamp.isoformat(timespec="milliseconds") - - -env.filters["node_id_to_hex"] = node_id_to_hex -env.filters["format_timestamp"] = format_timestamp - - -routes = web.RouteTableDef() - - -@routes.get("/") -async def index(request): - template = env.get_template("index.html") - return web.Response( - text=template.render(is_hx_request="HX-Request" in request.headers, node=None), - content_type="text/html", - ) - @dataclass class Packet: @@ -124,6 +49,7 @@ class Packet: pretty_payload: Markup import_time: datetime.datetime + @classmethod def from_model(cls, packet): mesh_packet, payload = decode_payload.decode(packet) @@ -173,6 +99,17 @@ class Packet: raw_payload=payload, ) +@dataclass +class UplinkedNode: + lat: float + long: float + long_name: str + short_name: str + hops: int + snr: float + rssi: float + + def generate_responce(request, body, raw_node_id="", node=None): if "HX-Request" in request.headers: @@ -190,16 +127,104 @@ def generate_responce(request, body, raw_node_id="", node=None): ) +async def build_trace(node_id): + trace = [] + for raw_p in await store.get_packets_from(node_id, PortNum.POSITION_APP, since=datetime.timedelta(hours=24)): + p = Packet.from_model(raw_p) + if not p.raw_payload or not p.raw_payload.latitude_i or not p.raw_payload.longitude_i: + continue + trace.append((p.raw_payload.latitude_i * 1e-7, p.raw_payload.longitude_i * 1e-7)) + + if not trace: + for raw_p in await store.get_packets_from(node_id, PortNum.POSITION_APP): + p = Packet.from_model(raw_p) + if not p.raw_payload or not p.raw_payload.latitude_i or not p.raw_payload.longitude_i: + continue + trace.append((p.raw_payload.latitude_i * 1e-7, p.raw_payload.longitude_i * 1e-7)) + break + + gc.collect() # Force garbage collection + return trace + + +async def build_neighbors(node_id): + packets = await store.get_packets_from(node_id, PortNum.NEIGHBORINFO_APP, limit=1) + packet = packets.first() + print(packet) + if not packet: + return [] + + _, payload = decode_payload.decode(packet) + neighbors = {} + + # Gather node information asynchronously + tasks = {n.node_id: store.get_node(n.node_id) for n in payload.neighbors} + results = await asyncio.gather(*tasks.values(), return_exceptions=True) + + for neighbor, node in zip(payload.neighbors, results): + if isinstance(node, Exception): + continue + if node and node.last_lat and node.last_long: + neighbors[neighbor.node_id] = { + 'node_id': neighbor.node_id, + 'snr': neighbor.snr, # Fix dictionary keying issue + 'short_name': node.short_name, + 'long_name': node.long_name, + 'location': (node.last_lat * 1e-7, node.last_long * 1e-7), + } + + return list(neighbors.values()) # Return a list of dictionaries + + + +def node_id_to_hex(node_id): + if node_id == 4294967295: + return "^all" + else: + return f"!{hex(node_id)[2:]}" + + +def format_timestamp(timestamp): + if isinstance(timestamp, int): + timestamp = datetime.datetime.fromtimestamp(timestamp, datetime.timezone.utc) + return timestamp.isoformat(timespec="milliseconds") + + +env.filters["node_id_to_hex"] = node_id_to_hex +env.filters["format_timestamp"] = format_timestamp + +routes = web.RouteTableDef() +@routes.get("/") +async def index(request): + raise web.HTTPFound(location="/map") # Redirect to /home + + +def generate_response(request, body, raw_node_id="", node=None): + if "HX-Request" in request.headers: + return web.Response(text=body, content_type="text/html") + + template = env.get_template("index.html") + response = web.Response( + text=template.render( + is_hx_request="HX-Request" in request.headers, + raw_node_id=raw_node_id, + node_html=Markup(body), + node=node, + ), + content_type="text/html", + ) + gc.collect() + return response + + @routes.get("/node_search") async def node_search(request): if not "q" in request.query or not request.query["q"]: return web.Response(text="Bad node id") - portnum = request.query.get("portnum") - if portnum: - portnum = int(portnum) - raw_node_id = request.query["q"] + raw_node_id = request.query["q"] node_id = None + if raw_node_id == "^all": node_id = 0xFFFFFFFF elif raw_node_id[0] == "!": @@ -225,13 +250,12 @@ async def node_search(request): ) template = env.get_template("search.html") - return web.Response( - text=template.render( - nodes=fuzzy_nodes, - query_string=request.query_string, - ), + response = web.Response( + text=template.render(nodes=fuzzy_nodes, query_string=request.query_string), content_type="text/html", ) + gc.collect() + return response @routes.get("/node_match") @@ -252,26 +276,6 @@ async def node_match(request): @routes.get("/packet_list/{node_id}") async def packet_list(request): - node_id = int(request.match_info["node_id"]) - if portnum := request.query.get("portnum"): - portnum = int(portnum) - else: - portnum = None - return await _packet_list(request, store.get_packets(node_id, portnum), 'packet') - - -@routes.get("/uplinked_list/{node_id}") -async def uplinked_list(request): - node_id = int(request.match_info["node_id"]) - if portnum := request.query.get("portnum"): - portnum = int(portnum) - else: - portnum = None - return await _packet_list(request, store.get_uplinked_packets(node_id, portnum), 'uplinked') - -# does this needs a web.response ? - -async def _packet_list(request, raw_packets, packet_event): node_id = int(request.match_info["node_id"]) if portnum := request.query.get("portnum"): portnum = int(portnum) @@ -279,14 +283,13 @@ async def _packet_list(request, raw_packets, packet_event): portnum = None async with asyncio.TaskGroup() as tg: - raw_packets = tg.create_task(raw_packets) node = tg.create_task(store.get_node(node_id)) + raw_packets = tg.create_task(store.get_packets(node_id,portnum, limit=200)) trace = tg.create_task(build_trace(node_id)) - neighbors = tg.create_task(build_neighbors(node_id)) + neighbors = await tg.create_task(build_neighbors(node_id)) has_telemetry = tg.create_task(store.has_packets(node_id, PortNum.TELEMETRY_APP)) - packets = (Packet.from_model(p) for p in await raw_packets) - + packets = [Packet.from_model(p) for p in await raw_packets] # Convert generator to a list template = env.get_template("node.html") return web.Response( text=template.render( @@ -295,9 +298,8 @@ async def _packet_list(request, raw_packets, packet_event): node=await node, portnum=portnum, packets=packets, - packet_event=packet_event, trace=await trace, - neighbors=await neighbors, + neighbors=neighbors, has_telemetry=await has_telemetry, query_string=request.query_string, ), @@ -305,152 +307,6 @@ async def _packet_list(request, raw_packets, packet_event): ) -@routes.get("/chat_events") -async def chat_events(request): - - chat_packet = env.get_template("chat_packet.html") - - # Precompile regex for filtering out unwanted messages (case insensitive) - seq_pattern = re.compile(r"seq \d+$", re.IGNORECASE) - - # Subscribe to notifications for packets from all nodes (0xFFFFFFFF = broadcast) - with notify.subscribe(node_id=0xFFFFFFFF) as event: - async with sse_response(request) as resp: - while resp.is_connected(): # Keep the connection open while the client is connected - try: - # Wait for an event with a timeout of 10 seconds - await asyncio.wait_for(event.wait(), timeout=10) - except asyncio.TimeoutError: - # Timeout reached, continue looping to keep connection alive - continue - - if event.is_set(): - # Extract relevant packets, ensuring event.packets is not None - packets = [ - p for p in (event.packets or []) - if p.portnum == PortNum.TEXT_MESSAGE_APP - ] - event.clear() # Reset event flag - - try: - for packet in packets: - ui_packet = Packet.from_model(packet) - - # Filter out packets that match "seq " - if not seq_pattern.match(ui_packet.payload): - await resp.send( - chat_packet.render(packet=ui_packet), - event="chat_packet", # SSE event type - ) - except ConnectionResetError: - # Log when a client disconnects unexpectedly - print("Client disconnected from SSE stream.") - return # Exit the loop and close the connection - - -@routes.get("/events") -async def events(request): - """ - Server-Sent Events (SSE) endpoint for real-time packet updates. - - This endpoint listens for new network packets and streams them to connected clients. - Clients can optionally filter packets based on `node_id` and `portnum` query parameters. - - Query Parameters: - - node_id (int, optional): Filter packets for a specific node (default: all nodes). - - portnum (int, optional): Filter packets for a specific port number (default: all ports). - - Args: - request (aiohttp.web.Request): The incoming HTTP request. - - Returns: - aiohttp.web.StreamResponse: SSE response streaming network events. - """ - # Extract and convert query parameters (if provided) - node_id = request.query.get("node_id") - if node_id: - node_id = int(node_id) # Convert node_id to an integer - - portnum = request.query.get("portnum") - if portnum: - portnum = int(portnum) # Convert portnum to an integer - - # Load Jinja2 templates for rendering packets - packet_template = env.get_template("packet.html") - net_packet_template = env.get_template("net_packet.html") - - # Subscribe to packet notifications for the given node_id (or all nodes if None) - with notify.subscribe(node_id) as event: - async with sse_response(request) as resp: - while resp.is_connected(): # Keep connection open while client is connected - try: - # Wait for an event with a timeout of 10 seconds - await asyncio.wait_for(event.wait(), timeout=10) - except asyncio.TimeoutError: - continue # No new packets, continue waiting - - if event.is_set(): - # Extract relevant packets based on `portnum` filter (if provided) - packets = [ - p for p in (event.packets or []) - if portnum is None or portnum == p.portnum - ] - - # Extract uplinked packets (if port filter applies) - uplinked = [ - u for u in (event.uplinked or []) - if portnum is None or portnum == u.portnum - ] - - event.clear() # Reset event flag - - try: - # Process and send incoming packets - for packet in packets: - ui_packet = Packet.from_model(packet) - - # Send standard packet event - await resp.send( - packet_template.render( - is_hx_request="HX-Request" in request.headers, - node_id=node_id, - packet=ui_packet, - ), - event="packet", - ) - - # If the packet belongs to `PortNum.TEXT_MESSAGE_APP` and contains "#baymeshnet", - # send it as a network event - if ui_packet.portnum == PortNum.TEXT_MESSAGE_APP and '#baymeshnet' in ui_packet.payload.lower(): - await resp.send( - net_packet_template.render(packet=ui_packet), - event="net_packet", - ) - - # Process and send uplinked packets separately - for packet in uplinked: - await resp.send( - packet_template.render( - is_hx_request="HX-Request" in request.headers, - node_id=node_id, - packet=Packet.from_model(packet), - ), - event="uplinked", - ) - - except ConnectionResetError: - print("Client disconnected from SSE stream.") - return # Gracefully exit on disconnection - -@dataclass -class UplinkedNode: - lat: float - long: float - long_name: str - short_name: str - hops: int - snr: float - rssi: float # Updated code p.r. @routes.get("/packet_details/{packet_id}") @@ -458,6 +314,7 @@ async def packet_details(request): packet_id = int(request.match_info["packet_id"]) packets_seen = list(await store.get_packets_seen(packet_id)) packet = await store.get_packet(packet_id) + node= await store.get_node(packet.from_node_id) from_node_cord = None if packet and packet.from_node and packet.from_node.last_lat: @@ -499,6 +356,7 @@ async def packet_details(request): map_center=map_center, from_node_cord=from_node_cord, uplinked_nodes=uplinked_nodes, + node=node, ), content_type="text/html", ) @@ -509,7 +367,8 @@ async def packet_details(request): portnum = request.query.get("portnum") if portnum: portnum = int(portnum) - packets = await store.get_packets(portnum=portnum, limit=50) + packets = await store.get_packets(portnum=portnum, limit=20) + print_memory_usage() template = env.get_template("firehose.html") return web.Response( text=template.render( @@ -519,59 +378,23 @@ async def packet_details(request): content_type="text/html", ) -@routes.get("/chat") -async def chat(request): - try: - # Fetch packets for the given node ID and port number - #print("Fetching packets...") - packets = await store.get_packets( - node_id=0xFFFFFFFF, portnum=PortNum.TEXT_MESSAGE_APP, limit=100 - ) - #print(f"Fetched {len(packets)} packets.") - - # Convert packets to UI packets - #print("Processing packets...") - ui_packets = [Packet.from_model(p) for p in packets] - - # Filter packets - #print("Filtering packets...") - filtered_packets = [ - p for p in ui_packets if not re.match(r"seq \d+$", p.payload) - ] - - # Render template - #print("Rendering template...") - template = env.get_template("chat.html") - return web.Response( - text=template.render(packets=filtered_packets), - content_type="text/html", - ) - - except Exception as e: - # Log the error and return an appropriate response - #print(f"Error in chat handler: {e}") - return web.Response( - text="An error occurred while processing your request.", - status=500, - content_type="text/plain", - ) - - - @routes.get("/packet/{packet_id}") async def packet(request): packet = await store.get_packet(int(request.match_info["packet_id"])) if not packet: - return web.Response( - status=404, - ) + return web.Response(status=404) + + node = await store.get_node(packet.from_node_id) + print_memory_usage() template = env.get_template("packet_index.html") + return web.Response( - text=template.render(packet=Packet.from_model(packet)), + text=template.render(packet=Packet.from_model(packet), node=node), content_type="text/html", ) + async def graph_telemetry(node_id, payload_type, graph_config): data = {'date': []} fields = [] @@ -626,10 +449,6 @@ async def graph_telemetry(node_id, payload_type, graph_config): if 'palette' in ax_config: args['palette'] = ax_config['palette'] sns.lineplot(data=ax_df, ax=ax, **args) - if i: - sns.move_legend(ax, "upper right") - else: - sns.move_legend(ax, "upper left") png = io.BytesIO() plt.savefig(png, dpi=100) @@ -641,6 +460,7 @@ async def graph_telemetry(node_id, payload_type, graph_config): ) + @routes.get("/graph/power/{node_id}") async def graph_power(request): return await graph_telemetry( @@ -814,7 +634,7 @@ async def graph_neighbors(request): png = io.BytesIO() plt.savefig(png, dpi=100) plt.close() - + print_memory_usage() return web.Response( body=png.getvalue(), content_type="image/png", @@ -856,12 +676,12 @@ async def graph_neighbors2(request): df = DataFrame(data) fig = px.line(df, x="time", y="snr", color="node_name", markers=True) html = fig.to_html(full_html=True, include_plotlyjs='cdn') + print_memory_usage() return web.Response( text=html, content_type="text/html", ) - @routes.get("/graph/traceroute/{packet_id}") async def graph_traceroute(request): packet_id = int(request.match_info['packet_id']) @@ -964,6 +784,134 @@ async def graph_traceroute(request): content_type="image/svg+xml", ) + + + +@routes.get("/graph/traceroute2/{packet_id}") +async def graph_traceroute2(request): + packet_id = int(request.match_info['packet_id']) + traceroutes = list(await store.get_traceroute(packet_id)) + + # Fetch the packet + packet = await store.get_packet(packet_id) + if not packet: + return web.Response(status=404) + + node_ids = set() + for tr in traceroutes: + route = decode_payload.decode_payload(PortNum.TRACEROUTE_APP, tr.route) + node_ids.add(tr.gateway_node_id) + for node_id in route.route: + node_ids.add(node_id) + node_ids.add(packet.from_node_id) + node_ids.add(packet.to_node_id) + + nodes = {} + async with asyncio.TaskGroup() as tg: + for node_id in node_ids: + nodes[node_id] = tg.create_task(store.get_node(node_id)) + + # Initialize graph for traceroute + graph = pydot.Dot('traceroute', graph_type="digraph") + + paths = set() + node_color = {} + mqtt_nodes = set() + saw_reply = set() + dest = None + node_seen_time = {} + for tr in traceroutes: + if tr.done: + saw_reply.add(tr.gateway_node_id) + if tr.done and dest: + continue + route = decode_payload.decode_payload(PortNum.TRACEROUTE_APP, tr.route) + path = [packet.from_node_id] + path.extend(route.route) + if tr.done: + dest = packet.to_node_id + path.append(packet.to_node_id) + elif path[-1] != tr.gateway_node_id: + path.append(tr.gateway_node_id) + + if not tr.done and tr.gateway_node_id not in node_seen_time and tr.import_time: + node_seen_time[path[-1]] = tr.import_time + + mqtt_nodes.add(tr.gateway_node_id) + node_color[path[-1]] = '#' + hex(hash(tuple(path)))[3:9] + paths.add(tuple(path)) + + used_nodes = set() + for path in paths: + used_nodes.update(path) + + import_times = [tr.import_time for tr in traceroutes if tr.import_time] + if import_times: + first_time = min(import_times) + else: + first_time = 0 + + # Prepare data for ECharts rendering + chart_nodes = [] + chart_edges = [] + for node_id in used_nodes: + node = await nodes[node_id] + if not node: + # Handle case where node is None + node_name = node_id_to_hex(node_id) + chart_nodes.append({ + "name": str(node_id), + "value": node_name, + "symbol": 'rect', + }) + else: + node_name = f'[{node.short_name}] {node.long_name}\n{node_id_to_hex(node_id)}\n{node.role}' + if node_id in node_seen_time: + ms = (node_seen_time[node_id] - first_time).total_seconds() * 1000 + node_name += f'\n {ms:.2f}ms' + style = 'dashed' + if node_id == dest: + style = 'filled' + elif node_id in mqtt_nodes: + style = 'solid' + + if node_id in saw_reply: + style += ', diagonals' + + chart_nodes.append({ + "name": str(node_id), + "value": node_name, + "symbol": 'rect', + "long_name": node.long_name, + "short_name": node.short_name, + "role": node.role, + "hw_model": node.hw_model, + }) + + # Create edges + for path in paths: + color = '#' + hex(hash(tuple(path)))[3:9] + for src, dest in zip(path, path[1:]): + chart_edges.append({ + "source": str(src), + "target": str(dest), + "originalColor": color, + }) + + chart_data = { + "nodes": chart_nodes, + "edges": chart_edges, + } + + template = env.get_template("traceroute.html") + # Render the page with the chart data + return web.Response( + text=template.render(chart_data=chart_data, packet_id=packet_id), + content_type="text/html", + ) + + + @routes.get("/graph/network") async def graph_network(request): root = request.query.get("root") @@ -1118,404 +1066,13 @@ async def graph_network(request): penwidth=1.85, dir=edge_dir, )) - + print_memory_usage() return web.Response( body=graph.create_svg(), content_type="image/svg+xml", ) -@routes.get("/stats") -async def stats(request): - try: - # Add logging to track execution - total_packets = await store.get_total_packet_count() - total_nodes = await store.get_total_node_count() - total_packets_seen = await store.get_total_packet_seen_count() - total_nodes_longfast = await store.get_total_node_count_longfast() - total_nodes_mediumslow = await store.get_total_node_count_mediumslow() - - # Render template - #print("Rendering template...") - template = env.get_template("stats.html") - return web.Response( - text=template.render( - total_packets=total_packets, - total_nodes=total_nodes, - total_packets_seen=total_packets_seen, - total_nodes_longfast=total_nodes_longfast, - total_nodes_mediumslow=total_nodes_mediumslow, - ), - content_type="text/html", - ) - except Exception as e: - # Log and return error response - #print(f"Error in stats handler: {e}") - return web.Response( - text="An error occurred while processing your request.", - status=500, - content_type="text/plain", - ) - - -@routes.get("/graph/longfast") -async def graph_network_longfast(request): - try: - root = request.query.get("root") - depth = int(request.query.get("depth", 5)) - hours = int(request.query.get("hours", 24)) - minutes = int(request.query.get("minutes", 0)) - - # Validation - if root and not root.isdigit(): - return web.Response(status=400, text="Invalid root node ID.") - if depth < 1: - return web.Response(status=400, text="Depth must be at least 1.") - - since = datetime.timedelta(hours=hours, minutes=minutes) - - nodes = {} - node_ids = set() - traceroutes = [] - - # Fetch traceroutes - for tr in await store.get_traceroutes_longfast(since): - node_ids.add(tr.gateway_node_id) - node_ids.add(tr.packet.from_node_id) - node_ids.add(tr.packet.to_node_id) - route = decode_payload.decode_payload(PortNum.TRACEROUTE_APP, tr.route) - node_ids.update(route.route) - - path = [tr.packet.from_node_id] - path.extend(route.route) - if tr.done: - path.append(tr.packet.to_node_id) - else: - if path[-1] != tr.gateway_node_id: - path.append(tr.gateway_node_id) - traceroutes.append((tr, path)) - - edges = Counter() - edge_type = {} - used_nodes = set() - - # Fetch MQTT neighbors - for ps, p in await store.get_mqtt_neighbors_longfast(since): - node_ids.add(ps.node_id) - node_ids.add(p.from_node_id) - used_nodes.add(ps.node_id) - used_nodes.add(p.from_node_id) - edges[(p.from_node_id, ps.node_id)] += 1 - edge_type[(p.from_node_id, ps.node_id)] = 'sni' - - # Fetch NeighborInfo packets - for packet in await store.get_packets_longfast(portnum=PortNum.NEIGHBORINFO_APP, since=since): - try: - _, neighbor_info = decode_payload.decode(packet) - node_ids.add(packet.from_node_id) - used_nodes.add(packet.from_node_id) - for node in neighbor_info.neighbors: - node_ids.add(node.node_id) - used_nodes.add(node.node_id) - edges[(node.node_id, packet.from_node_id)] += 1 - edge_type[(node.node_id, packet.from_node_id)] = 'ni' - except Exception as e: - print(f"Error decoding NeighborInfo packet: {e}") - - # Retrieve node details concurrently - async with asyncio.TaskGroup() as tg: - for node_id in node_ids: - nodes[node_id] = tg.create_task(store.get_node(node_id)) - - tr_done = set() - for tr, path in traceroutes: - if tr.done: - if tr.packet_id in tr_done: - continue - else: - tr_done.add(tr.packet_id) - - for src, dest in zip(path, path[1:]): - used_nodes.add(src) - used_nodes.add(dest) - edges[(src, dest)] += 1 - edge_type[(src, dest)] = 'tr' - - # Helper to get node name - async def get_node_name(node_id): - try: - node = await nodes[node_id] - if node: - return f'[{node.short_name}] {node.long_name}\n{node_id_to_hex(node_id)}' - return node_id_to_hex(node_id) - except Exception as e: - return f"Error retrieving node: {str(e)}" - - # Filter nodes and edges if root is specified - if root: - new_used_nodes = set() - new_edges = Counter() - edge_map = {} - for src, dest in edges: - edge_map.setdefault(dest, []).append(src) - - queue = [int(root)] - for _ in range(depth): - next_queue = [] - for node in queue: - new_used_nodes.add(node) - for dest in edge_map.get(node, []): - new_used_nodes.add(dest) - new_edges[(dest, node)] += 1 - next_queue.append(dest) - queue = next_queue - - used_nodes = new_used_nodes - edges = new_edges - - # Create graph - graph = pydot.Dot('network', graph_type="digraph", layout="sfdp", overlap="scale", model='subset', splines="true") - for node_id in used_nodes: - node = await nodes[node_id] - color = '#000000' - node_name = await get_node_name(node_id) - if node and node.role in ('ROUTER', 'ROUTER_CLIENT', 'REPEATER'): - color = '#0000FF' - #elif node and node.role == 'CLIENT_MUTE': - # color = '#00FF00' - graph.add_node(pydot.Node( - str(node_id), - label=node_name, - shape='box', - color=color, - fontsize="10", width="0", height="0", - href=f"/graph/network?root={node_id}&depth={depth-1}", - )) - - # Adjust edge visualization - if edges: - max_edge_count = edges.most_common(1)[0][1] - else: - max_edge_count = 1 - - size_ratio = 2. / max_edge_count - edge_added = set() - - for (src, dest), edge_count in edges.items(): - size = max(size_ratio * edge_count, .25) - arrowsize = max(size_ratio * edge_count, .5) - if edge_type[(src, dest)] in ('ni'): - color = '#FF0000' - elif edge_type[(src, dest)] in ('sni'): - color = '#040fb3' - else: - color = '#000000' - edge_dir = "forward" - if (dest, src) in edges and edge_type[(src, dest)] == edge_type[(dest, src)]: - edge_dir = "both" - edge_added.add((dest, src)) - - if (src, dest) not in edge_added: - edge_added.add((src, dest)) - graph.add_edge(pydot.Edge( - str(src), - str(dest), - color=color, - tooltip=f'{await get_node_name(src)} -> {await get_node_name(dest)}', - penwidth=.5, - dir=edge_dir, - arrowsize=".5", - )) - - return web.Response( - body=graph.create_svg(), - content_type="image/svg+xml", - ) - - except Exception as e: - print(f"Error in graph_network_longfast: {e}") - return web.Response(status=500, text="Internal Server Error") - - - -@routes.get("/graph/mediumslow") -async def graph_network_mediumslow(request): - try: - root = request.query.get("root") - depth = int(request.query.get("depth", 3)) - hours = int(request.query.get("hours", 24)) - minutes = int(request.query.get("minutes", 0)) - - # Validation - if root and not root.isdigit(): - return web.Response(status=400, text="Invalid root node ID.") - if depth < 1: - return web.Response(status=400, text="Depth must be at least 1.") - - since = datetime.timedelta(hours=hours, minutes=minutes) - - nodes = {} - node_ids = set() - traceroutes = [] - - # Fetch traceroutes - for tr in await store.get_traceroutes_mediumslow(since): - node_ids.add(tr.gateway_node_id) - node_ids.add(tr.packet.from_node_id) - node_ids.add(tr.packet.to_node_id) - route = decode_payload.decode_payload(PortNum.TRACEROUTE_APP, tr.route) - node_ids.update(route.route) - - path = [tr.packet.from_node_id] - path.extend(route.route) - if tr.done: - path.append(tr.packet.to_node_id) - else: - if path[-1] != tr.gateway_node_id: - path.append(tr.gateway_node_id) - traceroutes.append((tr, path)) - - edges = Counter() - edge_type = {} - used_nodes = set() - - # Fetch MQTT neighbors - for ps, p in await store.get_mqtt_neighbors_mediumslow(since): - node_ids.add(ps.node_id) - node_ids.add(p.from_node_id) - used_nodes.add(ps.node_id) - used_nodes.add(p.from_node_id) - edges[(p.from_node_id, ps.node_id)] += 1 - edge_type[(p.from_node_id, ps.node_id)] = 'sni' - - # Fetch NeighborInfo packets - for packet in await store.get_packets_mediumslow(portnum=PortNum.NEIGHBORINFO_APP, since=since): - try: - _, neighbor_info = decode_payload.decode(packet) - node_ids.add(packet.from_node_id) - used_nodes.add(packet.from_node_id) - for node in neighbor_info.neighbors: - node_ids.add(node.node_id) - used_nodes.add(node.node_id) - edges[(node.node_id, packet.from_node_id)] += 1 - edge_type[(node.node_id, packet.from_node_id)] = 'ni' - except Exception as e: - print(f"Error decoding NeighborInfo packet: {e}") - - # Retrieve node details concurrently - async with asyncio.TaskGroup() as tg: - for node_id in node_ids: - nodes[node_id] = tg.create_task(store.get_node(node_id)) - - tr_done = set() - for tr, path in traceroutes: - if tr.done: - if tr.packet_id in tr_done: - continue - else: - tr_done.add(tr.packet_id) - - for src, dest in zip(path, path[1:]): - used_nodes.add(src) - used_nodes.add(dest) - edges[(src, dest)] += 1 - edge_type[(src, dest)] = 'tr' - - # Helper to get node name - async def get_node_name(node_id): - try: - node = await nodes[node_id] - if node: - return f'[{node.short_name}] {node.long_name}\n{node_id_to_hex(node_id)}' - return node_id_to_hex(node_id) - except Exception as e: - return f"Error retrieving node: {str(e)}" - - # Filter nodes and edges if root is specified - if root: - new_used_nodes = set() - new_edges = Counter() - edge_map = {} - for src, dest in edges: - edge_map.setdefault(dest, []).append(src) - - queue = [int(root)] - for _ in range(depth): - next_queue = [] - for node in queue: - new_used_nodes.add(node) - for dest in edge_map.get(node, []): - new_used_nodes.add(dest) - new_edges[(dest, node)] += 1 - next_queue.append(dest) - queue = next_queue - - used_nodes = new_used_nodes - edges = new_edges - - # Create graph - graph = pydot.Dot('network', graph_type="digraph", layout="sfdp", overlap="scale", model='subset', esep="+5", splines="true", nodesep="2", ranksep="2") - - for node_id in used_nodes: - node = await nodes[node_id] - color = '#000000' - node_name = await get_node_name(node_id) - if node and node.role in ('ROUTER', 'ROUTER_CLIENT', 'REPEATER'): - color = '#0000FF' - elif node and node.role == 'CLIENT_MUTE': - color = '#00FF00' - graph.add_node(pydot.Node( - str(node_id), - label=node_name, - shape='box', - color=color, - fontsize="10", width="0", height="0", - href=f"/graph/mediumslow?root={node_id}&depth={depth-1}", - )) - - # Adjust edge visualization - if edges: - max_edge_count = edges.most_common(1)[0][1] - else: - max_edge_count = 1 - - size_ratio = 2. / max_edge_count - edge_added = set() - - for (src, dest), edge_count in edges.items(): - size = max(size_ratio * edge_count, .25) - arrowsize = max(size_ratio * edge_count, .5) - if edge_type[(src, dest)] in ('ni'): - color = '#FF0000' - elif edge_type[(src, dest)] in ('sni'): - color = '#040fb3' - else: - color = '#000000' - edge_dir = "forward" - if (dest, src) in edges and edge_type[(src, dest)] == edge_type[(dest, src)]: - edge_dir = "both" - edge_added.add((dest, src)) - - if (src, dest) not in edge_added: - edge_added.add((src, dest)) - graph.add_edge(pydot.Edge( - str(src), - str(dest), - color=color, - tooltip=f'{await get_node_name(src)} -> {await get_node_name(dest)}', - penwidth=.5, - dir=edge_dir, - arrowsize=".5", - )) - - return web.Response( - body=graph.create_svg(), - content_type="image/svg+xml", - ) - - except Exception as e: - print(f"Error in graph_network_longfast: {e}") - return web.Response(status=500, text="Internal Server Error") @routes.get("/nodelist") async def nodelist(request): @@ -1528,7 +1085,7 @@ async def nodelist(request): #print(hw_model) nodes= await store.get_nodes(role,channel, hw_model) template = env.get_template("nodelist.html") - + print_memory_usage() return web.Response( text=template.render(nodes=nodes), content_type="text/html", @@ -1545,6 +1102,7 @@ async def nodelist(request): @routes.get("/net") async def net(request): try: + print_memory_usage() # Fetch packets for the given node ID and port number packets = await store.get_packets( node_id=0xFFFFFFFF, portnum=PortNum.TEXT_MESSAGE_APP, limit=200 @@ -1586,6 +1144,7 @@ async def map(request): try: nodes= await store.get_nodes() template = env.get_template("map.html") + print_memory_usage() return web.Response( text=template.render(nodes=nodes), content_type="text/html", @@ -1598,7 +1157,161 @@ async def map(request): content_type="text/plain", ) + +# Print memory usage +def print_memory_usage(): + process = psutil.Process(os.getpid()) + print(f"Memory Usage: {process.memory_info().rss / (1024 * 1024):.2f} MB") + +@routes.get("/stats") +async def stats(request): + try: + total_packets = await store.get_total_packet_count() + total_nodes = await store.get_total_node_count() + total_packets_seen = await store.get_total_packet_seen_count() + total_nodes_longfast = await store.get_total_node_count_longfast() + total_nodes_mediumslow = await store.get_total_node_count_mediumslow() + print_memory_usage() + template = env.get_template("stats.html") + return web.Response( + text=template.render( + total_packets=total_packets, + total_nodes=total_nodes, + total_packets_seen=total_packets_seen, + total_nodes_longfast=total_nodes_longfast, + total_nodes_mediumslow=total_nodes_mediumslow, + ), + content_type="text/html", + ) + except Exception as e: + return web.Response( + text="An error occurred while processing your request.", + status=500, + content_type="text/plain", + ) + + +@routes.get("/chat") +async def chat(request): + try: + # Fetch packets for the given node ID and port number + packets = await store.get_packets( + node_id=0xFFFFFFFF, portnum=PortNum.TEXT_MESSAGE_APP, limit=100 + ) + #print(f"Fetched {len(packets)} packets.") + + # Convert packets to UI packets + #print("Processing packets...") + ui_packets = [Packet.from_model(p) for p in packets] + + # Filter packets + #print("Filtering packets...") + filtered_packets = [ + p for p in ui_packets if not re.match(r"seq \d+$", p.payload) + ] + + # Render template + #print("Rendering template...") + template = env.get_template("chat.html") + return web.Response( + text=template.render(packets=filtered_packets), + content_type="text/html", + ) + + except Exception as e: + # Log the error and return an appropriate response + #print(f"Error in chat handler: {e}") + return web.Response( + text="An error occurred while processing your request.", + status=500, + content_type="text/plain", + ) + + +# Assuming the route URL structure is /nodegraph/{channel} +@routes.get("/nodegraph/{channel}") +async def nodegraph(request): + channel = request.match_info.get('channel', 'LongFast') # Default to 'MediumSlow' if no channel is provided + nodes = await store.get_nodes(channel=channel) # Fetch nodes for the given channel + node_ids = set() + edges_set = set() # Track unique edges + edge_type = {} # Store type of each edge + used_nodes = set() # This will track nodes involved in edges (including traceroutes) + since = datetime.timedelta(hours=48) + traceroutes = [] + + # Fetch traceroutes + for tr in await store.get_traceroutes(since): + node_ids.add(tr.gateway_node_id) + node_ids.add(tr.packet.from_node_id) + node_ids.add(tr.packet.to_node_id) + route = decode_payload.decode_payload(PortNum.TRACEROUTE_APP, tr.route) + node_ids.update(route.route) + + path = [tr.packet.from_node_id] + path.extend(route.route) + if tr.done: + path.append(tr.packet.to_node_id) + else: + if path[-1] != tr.gateway_node_id: + path.append(tr.gateway_node_id) + traceroutes.append((tr, path)) + + # Add traceroute edges with their type and update used_nodes + for i in range(len(path) - 1): + edge_pair = (path[i], path[i + 1]) + edges_set.add(edge_pair) + edge_type[edge_pair] = "traceroute" + used_nodes.add(path[i]) # Add all nodes in the traceroute path + used_nodes.add(path[i + 1]) # Add all nodes in the traceroute path + + # Fetch NeighborInfo packets + for packet in await store.get_packets(portnum=PortNum.NEIGHBORINFO_APP, since=since): + try: + _, neighbor_info = decode_payload.decode(packet) + node_ids.add(packet.from_node_id) + used_nodes.add(packet.from_node_id) + for node in neighbor_info.neighbors: + node_ids.add(node.node_id) + used_nodes.add(node.node_id) + + edge_pair = (node.node_id, packet.from_node_id) + if edge_pair not in edges_set: + edges_set.add(edge_pair) + edge_type[edge_pair] = "neighbor" + except Exception as e: + print(f"Error decoding NeighborInfo packet: {e}") + + # Convert edges_set to a list of dicts with colors + edges = [ + { + "from": frm, + "to": to, + "originalColor": "#ff5733" if edge_type[(frm, to)] == "traceroute" else "#3388ff", # Red for traceroute, Blue for neighbor + "lineStyle": { + "color": "#ff5733" if edge_type[(frm, to)] == "traceroute" else "#3388ff", + "width": 2 + } + } + for frm, to in edges_set + ] + + # Filter nodes to only include those involved in edges (including traceroutes) + nodes_with_edges = [node for node in nodes if node.node_id in used_nodes] + + template = env.get_template("nodegraph.html") + return web.Response( + text=template.render( + nodes=nodes_with_edges, + edges=edges, # Pass edges with color info + ), + content_type="text/html", + ) + + + async def run_server(bind, port, tls_cert): + gc.set_threshold(10, 10, 10) app = web.Application() app.add_routes(routes) runner = web.AppRunner(app) @@ -1611,6 +1324,7 @@ async def run_server(bind, port, tls_cert): for host in bind: site = web.TCPSite(runner, host, port, ssl_context=ssl_context) await site.start() - + print_memory_usage() while True: await asyncio.sleep(3600) # sleep forever + diff --git a/requirements.txt b/requirements.txt index e064e6a..1a5d9ad 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,13 +1,46 @@ -protobuf -aiomqtt -sqlalchemy[asyncio] -cryptography -aiosqlite -aiohttp -aiodns -Jinja2 +protobuf~=5.29.3 +aiomqtt~=2.3.0 +sqlalchemy[asyncio]~=2.0.38 +cryptography~=44.0.1 +aiosqlite~=0.21.0 +aiohttp~=3.11.12 +aiodns~=3.2.0 +Jinja2~=3.1.5 aiohttp-sse -asyncpg -seaborn -pydot -plotly +asyncpg~=0.30.0 +seaborn~=0.13.2 +pydot~=3.0.4 +plotly~=6.0.0 + +numpy~=2.2.3 +pillow~=11.1.0 +pip~=23.2.1 +attrs~=25.1.0 +cffi~=1.17.1 +paho-mqtt~=2.1.0 +pytz~=2025.1 +idna~=3.10 +multidict~=6.1.0 +propcache~=0.2.1 +typing_extensions~=4.12.2 +pyparsing~=3.2.1 +pycares~=4.5.0 +MarkupSafe~=3.0.2 +pandas~=2.2.3 +matplotlib~=3.10.0 +python-dateutil~=2.9.0.post0 +packaging~=24.2 +narwhals~=1.27.1 +yarl~=1.18.3 +aiosignal~=1.3.2 +frozenlist~=1.5.0 +aiohappyeyeballs~=2.4.6 +cycler~=0.12.1 +six~=1.17.0 +greenlet~=3.1.1 +psutil~=7.0.0 +objgraph~=3.6.2 +contourpy~=1.3.1 +fonttools~=4.56.0 +pycparser~=2.22 +kiwisolver~=1.4.8 \ No newline at end of file diff --git a/startdb.py b/startdb.py new file mode 100644 index 0000000..9aba173 --- /dev/null +++ b/startdb.py @@ -0,0 +1,50 @@ +import asyncio +import argparse +import configparser +from meshview import mqtt_reader +from meshview import mqtt_database +from meshview import mqtt_store +import json + + +async def load_database_from_mqtt(mqtt_server: str , mqtt_port: int, topic: list, mqtt_user: str | None = None, mqtt_passwd: str | None = None): + async for topic, env in mqtt_reader.get_topic_envelopes(mqtt_server, mqtt_port, topic, mqtt_user, mqtt_passwd): + await mqtt_store.process_envelope(topic, env) + + +async def main(config): + mqtt_database.init_database(config["database"]["connection_string"]) + + await mqtt_database.create_tables() + mqtt_user = None + mqtt_passwd = None + if config["mqtt"]["username"] != "": + mqtt_user: str = config["mqtt"]["username"] + if config["mqtt"]["password"] != "": + mqtt_passwd: str = config["mqtt"]["password"] + mqtt_topics = json.loads(config["mqtt"]["topics"]) + + async with asyncio.TaskGroup() as tg: + tg.create_task( + load_database_from_mqtt(config["mqtt"]["server"], int(config["mqtt"]["port"]), mqtt_topics, mqtt_user, mqtt_passwd) + ) + + +def load_config(file_path): + """Load configuration from an INI-style text file.""" + config_parser = configparser.ConfigParser() + config_parser.read(file_path) + + # Convert to a dictionary for easier access + config = {section: dict(config_parser.items(section)) for section in config_parser.sections()} + return config + + +if __name__ == '__main__': + parser = argparse.ArgumentParser("meshview") + parser.add_argument("--config", help="Path to the configuration file.", default='config.ini') + args = parser.parse_args() + + config = load_config(args.config) + + asyncio.run(main(config)) \ No newline at end of file From 19aba18fcd8d8f303f10f1483b5d55f801a99e6b Mon Sep 17 00:00:00 2001 From: Pablo Revilla Date: Wed, 5 Mar 2025 16:44:26 -0800 Subject: [PATCH 4/8] Update README.md --- README.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/README.md b/README.md index 712b5f7..c6977b4 100644 --- a/README.md +++ b/README.md @@ -62,6 +62,12 @@ connection_string = sqlite+aiosqlite:///packets.db ``` bash ./env/bin/python main.py ``` +Start the database connection. + +``` bash +./env/bin/python startdb.py +``` + Now you can hit http://localhost:8081/ ***(if you did not change the web server port )*** You can specify the path to your `config.ini` file with the run command argument `--config` From 45ce4c17e568c1df1229957befc42f6202936153 Mon Sep 17 00:00:00 2001 From: Pablo Revilla Date: Wed, 5 Mar 2025 16:45:26 -0800 Subject: [PATCH 5/8] Update README.md --- README.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index c6977b4..6a44375 100644 --- a/README.md +++ b/README.md @@ -58,15 +58,15 @@ connection_string = sqlite+aiosqlite:///packets.db ``` ## Running Meshview - -``` bash -./env/bin/python main.py -``` Start the database connection. - ``` bash ./env/bin/python startdb.py ``` +Start the web server. +``` bash +./env/bin/python main.py +``` + Now you can hit http://localhost:8081/ ***(if you did not change the web server port )*** From 9006444cf83f4c7cc73bb2aba82ed0ddb0fced4f Mon Sep 17 00:00:00 2001 From: Pablo Revilla Date: Wed, 5 Mar 2025 16:53:05 -0800 Subject: [PATCH 6/8] Major changes to the project - Now the database process and the web process are separate. - Added Map - Added new graphing tools --- meshview/templates/chat_packet.html | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/meshview/templates/chat_packet.html b/meshview/templates/chat_packet.html index 8352215..9a6d833 100644 --- a/meshview/templates/chat_packet.html +++ b/meshview/templates/chat_packet.html @@ -1,6 +1,6 @@
- ✉️ {{packet.from_node.channel}}
{{packet.import_time.strftime('%-I:%M:%S %p - %d-%m-%Y')}}
- - {{packet.from_node.long_name or (packet.from_node_id | node_id_to_hex) }} - {{packet.payload}} + {{packet.import_time.strftime('%-I:%M:%S %p - %d-%m-%Y')}} + ✉️ {{packet.from_node.channel}} + {{packet.from_node.long_name or (packet.from_node_id | node_id_to_hex) }} + {{packet.payload}}
\ No newline at end of file From ab5b8a70122ecec4ce14ed50db0f27b74e5e361c Mon Sep 17 00:00:00 2001 From: Pablo Revilla Date: Thu, 6 Mar 2025 15:50:33 -0800 Subject: [PATCH 7/8] Added more analytics by reporting on the number and kinds of packets each node sends in a 24 hour period. --- meshview/store.py | 63 ++++++++++++++++ meshview/templates/base.html | 2 +- meshview/templates/node.html | 1 + meshview/templates/node_traffic.html | 105 +++++++++++++++++++++++++++ meshview/templates/packet.html | 3 +- meshview/templates/packet_list.html | 2 +- meshview/templates/top.html | 65 +++++++++++++++++ meshview/web.py | 30 +++++++- 8 files changed, 266 insertions(+), 5 deletions(-) create mode 100644 meshview/templates/node_traffic.html create mode 100644 meshview/templates/top.html diff --git a/meshview/store.py b/meshview/store.py index 87ef868..fd27e51 100644 --- a/meshview/store.py +++ b/meshview/store.py @@ -3,6 +3,7 @@ from sqlalchemy import select, func from sqlalchemy.orm import lazyload from meshview import database from meshview.models import Packet, PacketSeen, Node, Traceroute +from sqlalchemy import text async def get_node(node_id): async with database.async_session() as session: @@ -211,6 +212,68 @@ async def get_nodes_mediumslow(): return result.scalars() +async def get_top_traffic_nodes(): + async with database.async_session() as session: + result = await session.execute(text(""" + SELECT + n.node_id, + n.long_name, + n.role, + COUNT(p.id) AS packet_count + FROM + packet p + JOIN + node n + ON + p.from_node_id = n.node_id + WHERE + p.import_time >= DATETIME('now', '-1 day') + GROUP BY + n.long_name, n.role + ORDER BY + packet_count DESC + LIMIT 100; + """)) + + return result.fetchall() # Returns a list of tuples + + +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.future import select + + +async def get_node_traffic(node_id: int): + try: + async with database.async_session() as session: + result = await session.execute( + text(""" + SELECT + node.long_name, packet.portnum, + COUNT(*) AS packet_count + FROM packet + JOIN node ON packet.from_node_id = node.node_id OR packet.to_node_id = node.node_id + WHERE node.node_id = :node_id + AND packet.import_time >= DATETIME('now', '-1 day') + GROUP BY packet.portnum + ORDER BY packet_count DESC; + """), {"node_id": node_id} + ) + + # Map the result to include node.long_name and packet data + traffic_data = [{ + "long_name": row[0], # node.long_name + "portnum": row[1], # packet.portnum + "packet_count": row[2] # COUNT(*) as packet_count + } for row in result.all()] + + return traffic_data + + except Exception as e: + # Log the error or handle it as needed + print(f"Error fetching node traffic: {str(e)}") + return [] + + async def get_nodes(role=None, channel=None, hw_model=None): """ diff --git a/meshview/templates/base.html b/meshview/templates/base.html index 54a5ac1..436b396 100644 --- a/meshview/templates/base.html +++ b/meshview/templates/base.html @@ -38,7 +38,7 @@
Bay Area Mesh - http://bayme.sh
Quick Links:  Nodes - Conversations - See everything  - Mesh Graph LF - MS  - Stats -  - Weekly Net - Map

+  - Weekly Net - Map - Top Traffic

Loading...
diff --git a/meshview/templates/node.html b/meshview/templates/node.html index a348752..434bffe 100644 --- a/meshview/templates/node.html +++ b/meshview/templates/node.html @@ -53,6 +53,7 @@
Role
{{node.role}}
+ Get node traffic totals {% include "node_graphs.html" %}
{% else %} diff --git a/meshview/templates/node_traffic.html b/meshview/templates/node_traffic.html new file mode 100644 index 0000000..a8aca5b --- /dev/null +++ b/meshview/templates/node_traffic.html @@ -0,0 +1,105 @@ +{% extends "base.html" %} + +{% block css %} +.table-title { + font-size: 2rem; + text-align: center; + margin-bottom: 20px; + } + + .traffic-table { + width: 50%; + border-collapse: collapse; + margin: 0 auto; + font-family: Arial, sans-serif; + } + + .traffic-table th, + .traffic-table td { + padding: 10px 15px; + text-align: left; + border: 1px solid #474b4e; + } + + .traffic-table th { + background-color: #272b2f; + color: white; + } + + .traffic:nth-of-type(odd) { + background-color: #272b2f; /* Lighter than #2a2a2a */ + } + + .traffic { + border: 1px solid #474b4e; + padding: 8px; + margin-bottom: 4px; + border-radius: 8px; + } + + .traffic:nth-of-type(even) { + background-color: #212529; /* Slightly lighter than the previous #181818 */ + } + + .footer { + text-align: center; + margin-top: 20px; + } + +{% endblock %} + +{% block body %} +
+

{{ traffic[0].long_name }} (last 24 hours)

+ + + + + + + + + {% for port in traffic %} + + + + + {% else %} + + + + {% endfor %} + +
Port NumberPacket Count
+ {% if port.portnum == 1 %} + TEXT_MESSAGE_APP + {% elif port.portnum == 3 %} + POSITION_APP + {% elif port.portnum == 4 %} + NODEINFO_APP + {% elif port.portnum == 5 %} + ROUTING_APP + {% elif port.portnum == 67 %} + TELEMETRY_APP + {% elif port.portnum == 70 %} + TRACEROUTE_APP + {% elif port.portnum == 71 %} + NEIGHBORINFO_APP + {% elif port.portnum == 73 %} + MAP_REPORT_APP + {% elif port.portnum == 0 %} + UNKNOWN_APP + {% elif port.portnum == 0 %} + UNKNOWN_APP + {% elif port.portnum == 0 %} + UNKNOWN_APP + {% else %} + {{ port.portnum }} + {% endif %} + {{ port.packet_count }}
No traffic data available for this node.
+
+ + +{% endblock %} diff --git a/meshview/templates/packet.html b/meshview/templates/packet.html index a6d7cc7..c832b97 100644 --- a/meshview/templates/packet.html +++ b/meshview/templates/packet.html @@ -13,7 +13,6 @@ {%- endif -%} ) - -> {{packet.to_node.long_name}}( {%- if not to_me -%} @@ -40,7 +39,7 @@
payload
{% if packet.pretty_payload %} -
{{packet.pretty_payload}}
+
{{packet.pretty_payload}}
{% endif %} {% if packet.raw_mesh_packet.decoded and packet.raw_mesh_packet.decoded.portnum == 70 %}
    diff --git a/meshview/templates/packet_list.html b/meshview/templates/packet_list.html index f90c58f..24f2f8c 100644 --- a/meshview/templates/packet_list.html +++ b/meshview/templates/packet_list.html @@ -1,4 +1,4 @@ -
    +
    {% for packet in packets %} {% include 'packet.html' %} {% else %} diff --git a/meshview/templates/top.html b/meshview/templates/top.html new file mode 100644 index 0000000..b875f77 --- /dev/null +++ b/meshview/templates/top.html @@ -0,0 +1,65 @@ +{% extends "base.html" %} +{% block css %} +.table-title { + font-size: 2rem; + text-align: center; + margin-bottom: 20px; + } + + .traffic-table { + width: 60%; + border-collapse: collapse; + margin: 0 auto; + font-family: Arial, sans-serif; + } + + .traffic-table th, + .traffic-table td { + padding: 10px 15px; + text-align: left; + border: 1px solid #474b4e; + } + + .traffic-table th { + background-color: #272b2f; + color: white; + } + + .traffic:nth-of-type(odd) { + background-color: #272b2f; /* Lighter than #2a2a2a */ + } + + .traffic { + border: 1px solid #474b4e; + padding: 8px; + margin-bottom: 4px; + border-radius: 8px; + } + + .traffic:nth-of-type(even) { + background-color: #212529; /* Slightly lighter than the previous #181818 */ + } +{% endblock %} + +{% block body %} +

    Top Traffic Nodes (last 24 hours)

    + + + + + + + + + + {% for node in nodes %} + + + + + + {% endfor %} + +
    Node NameRolePacket Count
    {{ node[1] }} {{ node[2] }}{{ node[3] }}
    + +{% endblock %} diff --git a/meshview/web.py b/meshview/web.py index 3a62550..39b5fbc 100644 --- a/meshview/web.py +++ b/meshview/web.py @@ -23,7 +23,6 @@ from meshview import decode_payload import gc import psutil - env = Environment(loader=PackageLoader("meshview"), autoescape=select_autoescape()) # Optimize garbage collection frequency @@ -1190,6 +1189,35 @@ async def stats(request): content_type="text/plain", ) +@routes.get("/top") +async def top(request): + try: + node_id = request.query.get("node_id") # Get node_id from the URL query parameters + + if node_id: + # If node_id is provided, fetch traffic data for the specific node + node_traffic = await store.get_node_traffic(int(node_id)) + print(node_traffic) + template = env.get_template("node_traffic.html") # Render a different template + html_content = template.render(traffic=node_traffic, node_id=node_id) + else: + # Otherwise, fetch top traffic nodes as usual + top_nodes = await store.get_top_traffic_nodes() + template = env.get_template("top.html") + html_content = template.render(nodes=top_nodes) + + return web.Response( + text=html_content, + content_type="text/html", + ) + except Exception as e: + return web.Response( + text=f"An error occurred: {str(e)}", + status=500, + content_type="text/plain", + ) + + @routes.get("/chat") async def chat(request): From a59787a01025baba802764d2467686014b903353 Mon Sep 17 00:00:00 2001 From: Pablo Revilla Date: Thu, 6 Mar 2025 15:59:09 -0800 Subject: [PATCH 8/8] Added more analytics by reporting on the number and kinds of packets each node sends in a 24 hour period. --- meshview/store.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/meshview/store.py b/meshview/store.py index fd27e51..f1f7e3b 100644 --- a/meshview/store.py +++ b/meshview/store.py @@ -237,11 +237,6 @@ async def get_top_traffic_nodes(): return result.fetchall() # Returns a list of tuples - -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.future import select - - async def get_node_traffic(node_id: int): try: async with database.async_session() as session: