diff --git a/.idea/.gitignore b/.idea/.gitignore
deleted file mode 100644
index 26d3352..0000000
--- a/.idea/.gitignore
+++ /dev/null
@@ -1,3 +0,0 @@
-# Default ignored files
-/shelf/
-/workspace.xml
diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml
new file mode 100644
index 0000000..105ce2d
--- /dev/null
+++ b/.idea/inspectionProfiles/profiles_settings.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/meshview-2.iml b/.idea/meshview-2.iml
new file mode 100644
index 0000000..7198901
--- /dev/null
+++ b/.idea/meshview-2.iml
@@ -0,0 +1,10 @@
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/modules.xml b/.idea/modules.xml
new file mode 100644
index 0000000..99ffd7e
--- /dev/null
+++ b/.idea/modules.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/README.md b/README.md
index 712b5f7..6a44375 100644
--- a/README.md
+++ b/README.md
@@ -58,10 +58,16 @@ connection_string = sqlite+aiosqlite:///packets.db
```
## Running Meshview
-
+Start the database connection.
+``` bash
+./env/bin/python startdb.py
+```
+Start the web server.
``` bash
./env/bin/python main.py
```
+
+
Now you can hit http://localhost:8081/ ***(if you did not change the web server port )***
You can specify the path to your `config.ini` file with the run command argument `--config`
diff --git a/main.py b/main.py
index 4d4ff40..c982bd2 100644
--- a/main.py
+++ b/main.py
@@ -1,18 +1,19 @@
import asyncio
import argparse
import configparser
-import json
from meshview import mqtt_reader
from meshview import database
-from meshview import store
+from meshview import mqtt_store
from meshview import web
from meshview import http
from meshview import models
+import json
+
async def load_database_from_mqtt(mqtt_server: str , mqtt_port: int, topic: list, mqtt_user: str | None = None, mqtt_passwd: str | None = None):
async for topic, env in mqtt_reader.get_topic_envelopes(mqtt_server, mqtt_port, topic, mqtt_user, mqtt_passwd):
- await store.process_envelope(topic, env)
+ await mqtt_store.process_envelope(topic, env)
async def main(config):
@@ -39,9 +40,6 @@ async def main(config):
# print("Site configuration loaded to database")
async with asyncio.TaskGroup() as tg:
- tg.create_task(
- load_database_from_mqtt(config["mqtt"]["server"], int(config["mqtt"]["port"]), mqtt_topics, mqtt_user, mqtt_passwd)
- )
tg.create_task(
web.run_server(
config["server"]["bind"],
@@ -49,13 +47,6 @@ async def main(config):
config["server"].get("tls_cert"),
)
)
- if config["server"].get("acme_challenge"):
- tg.create_task(
- http.run_server(
- config["server"]["bind"], config["server"]["acme_challenge"]
- )
- )
-
def load_config(file_path):
"""Load configuration from an INI-style text file."""
diff --git a/meshview/database.py b/meshview/database.py
index a20e37b..83fc134 100644
--- a/meshview/database.py
+++ b/meshview/database.py
@@ -1,15 +1,36 @@
from meshview import models
-from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
+from sqlalchemy.ext.asyncio import async_sessionmaker
+from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
-def init_database(database_connection_string):
+
+
+engine = None
+async_session = None
+
+
+def init_database(database_connection_string, read_only=False):
global engine, async_session
- kwargs = {}
- if not database_connection_string.startswith('sqlite'):
- kwargs['pool_size'] = 20
- kwargs['max_overflow'] = 50
- print (**kwargs)
- engine = create_async_engine(database_connection_string, echo=False, connect_args={"timeout": 15})
- async_session = async_sessionmaker(engine, expire_on_commit=False)
+
+ kwargs = {"echo": False}
+
+ if database_connection_string.startswith("sqlite"):
+ if read_only:
+ # Ensure SQLite is opened in read-only mode
+ database_connection_string += "?mode=ro"
+ kwargs["connect_args"] = {"uri": True}
+ else:
+ kwargs["connect_args"] = {"timeout": 15}
+ else:
+ kwargs["pool_size"] = 20
+ kwargs["max_overflow"] = 50
+
+ print("Database connection settings:", kwargs) # Debugging output
+
+ engine = create_async_engine(database_connection_string, **kwargs)
+ async_session = async_sessionmaker( bind=engine,
+ class_=AsyncSession,
+ expire_on_commit=False,
+)
async def create_tables():
async with engine.begin() as conn:
diff --git a/meshview/models.py b/meshview/models.py
index cb69a89..752c102 100644
--- a/meshview/models.py
+++ b/meshview/models.py
@@ -1,5 +1,4 @@
from datetime import datetime
-
from sqlalchemy.orm import DeclarativeBase, foreign
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.orm import mapped_column, relationship, Mapped
diff --git a/meshview/mqtt_database.py b/meshview/mqtt_database.py
new file mode 100644
index 0000000..a20e37b
--- /dev/null
+++ b/meshview/mqtt_database.py
@@ -0,0 +1,16 @@
+from meshview import models
+from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
+
+def init_database(database_connection_string):
+ global engine, async_session
+ kwargs = {}
+ if not database_connection_string.startswith('sqlite'):
+ kwargs['pool_size'] = 20
+ kwargs['max_overflow'] = 50
+ print (**kwargs)
+ engine = create_async_engine(database_connection_string, echo=False, connect_args={"timeout": 15})
+ async_session = async_sessionmaker(engine, expire_on_commit=False)
+
+async def create_tables():
+ async with engine.begin() as conn:
+ await conn.run_sync(models.Base.metadata.create_all)
diff --git a/meshview/mqtt_reader.py b/meshview/mqtt_reader.py
index 5215f98..c63d3ce 100644
--- a/meshview/mqtt_reader.py
+++ b/meshview/mqtt_reader.py
@@ -33,6 +33,7 @@ async def get_topic_envelopes(mqtt_server, mqtt_port, topics, mqtt_user, mqtt_pa
mqtt_server, port=mqtt_port , username=mqtt_user, password=mqtt_passwd , identifier=identifier,
) as client:
for topic in topics:
+ print(topic)
await client.subscribe(topic)
async for msg in client.messages:
try:
diff --git a/meshview/mqtt_store.py b/meshview/mqtt_store.py
new file mode 100644
index 0000000..292d996
--- /dev/null
+++ b/meshview/mqtt_store.py
@@ -0,0 +1,165 @@
+import datetime
+from sqlalchemy import select
+from sqlalchemy import update
+from meshtastic.protobuf.config_pb2 import Config
+from meshtastic.protobuf.portnums_pb2 import PortNum
+from meshtastic.protobuf.mesh_pb2 import User, HardwareModel
+from meshview import mqtt_database
+from meshview import decode_payload
+from meshview.models import Packet, PacketSeen, Node, Traceroute
+
+
+
+
+async def process_envelope(topic, env):
+
+ # Checking if the received packet is a MAP_REPORT
+ # Update the node table with the firmware version
+ if env.packet.decoded.portnum == PortNum.MAP_REPORT_APP:
+ # Extract the node ID from the packet (renamed from 'id' to 'node_id' to avoid conflicts with Python's built-in id function)
+ node_id = getattr(env.packet, "from")
+
+ # Decode the MAP report payload to extract the firmware version
+ map_report = decode_payload.decode_payload(PortNum.MAP_REPORT_APP, env.packet.decoded.payload)
+
+ # Establish an asynchronous database session
+ async with mqtt_database.async_session() as session:
+ # Construct an SQLAlchemy update statement
+ stmt = (
+ update(Node)
+ .where(Node.node_id == node_id) # Ensure correct column reference
+ .values(firmware=map_report.firmware_version) # Assign new firmware value
+ )
+
+ # Execute the update statement asynchronously
+ await session.execute(stmt)
+
+ # Commit the changes to the database
+ await session.commit()
+
+ # This ignores any packet that does not have a ID
+ if not env.packet.id:
+ return
+
+ async with mqtt_database.async_session() as session:
+ result = await session.execute(select(Packet).where(Packet.id == env.packet.id))
+ new_packet = False
+ packet = result.scalar_one_or_none()
+ if not packet:
+ new_packet = True
+ packet = Packet(
+ id=env.packet.id,
+ portnum=env.packet.decoded.portnum,
+ from_node_id=getattr(env.packet, "from"),
+ to_node_id=env.packet.to,
+ payload=env.packet.SerializeToString(),
+ # p.r. Here seems to be where the packet is imported on the Database and import time is set.
+ import_time=datetime.datetime.now(),
+ channel=env.channel_id,
+ )
+ session.add(packet)
+
+ result = await session.execute(
+ select(PacketSeen).where(
+ PacketSeen.packet_id == env.packet.id,
+ PacketSeen.node_id == int(env.gateway_id[1:], 16),
+ PacketSeen.rx_time == env.packet.rx_time,
+ )
+ )
+ seen = None
+ if not result.scalar_one_or_none():
+ seen = PacketSeen(
+ packet_id=env.packet.id,
+ node_id=int(env.gateway_id[1:], 16),
+ channel=env.channel_id,
+ rx_time=env.packet.rx_time,
+ rx_snr=env.packet.rx_snr,
+ rx_rssi=env.packet.rx_rssi,
+ hop_limit=env.packet.hop_limit,
+ hop_start=env.packet.hop_start,
+ topic=topic,
+ # p.r. Here seems to be where the packet is imported on the Database and import time is set.
+ import_time=datetime.datetime.now(),
+ )
+ session.add(seen)
+
+
+
+ if env.packet.decoded.portnum == PortNum.NODEINFO_APP:
+ user = decode_payload.decode_payload(
+ PortNum.NODEINFO_APP, env.packet.decoded.payload
+ )
+ if user:
+ result = await session.execute(select(Node).where(Node.id == user.id))
+ if user.id and user.id[0] == "!":
+ try:
+ node_id = int(user.id[1:], 16)
+ except ValueError:
+ node_id = None
+ pass
+ else:
+ node_id = None
+
+ try:
+ hw_model = HardwareModel.Name(user.hw_model)
+ except ValueError:
+ hw_model = "unknown"
+ try:
+ role = Config.DeviceConfig.Role.Name(user.role)
+ except ValueError:
+ role = "unknown"
+
+ if node := result.scalar_one_or_none():
+ node.node_id = node_id
+ node.long_name = user.long_name
+ node.short_name = user.short_name
+ node.hw_model = hw_model
+ node.role = role
+ node.last_update =datetime.datetime.now()
+
+ else:
+ node = Node(
+ id=user.id,
+ node_id=node_id,
+ long_name=user.long_name,
+ short_name=user.short_name,
+ hw_model=hw_model,
+ role=role,
+ channel=env.channel_id,
+ # if need to update time of last update it may be here
+ )
+ session.add(node)
+
+ if env.packet.decoded.portnum == PortNum.POSITION_APP:
+ position = decode_payload.decode_payload(
+ PortNum.POSITION_APP, env.packet.decoded.payload
+ )
+ if position and position.latitude_i and position.longitude_i:
+ from_node_id = getattr(env.packet, 'from')
+ node = (await session.execute(select(Node).where(Node.node_id == from_node_id))).scalar_one_or_none()
+ if node:
+ node.last_lat = position.latitude_i
+ node.last_long = position.longitude_i
+ session.add(node)
+
+ if env.packet.decoded.portnum == PortNum.TRACEROUTE_APP:
+ packet_id = None
+ if env.packet.decoded.want_response:
+ packet_id = env.packet.id
+ else:
+ result = await session.execute(select(Packet).where(Packet.id == env.packet.decoded.request_id))
+ if result.scalar_one_or_none():
+ packet_id = env.packet.decoded.request_id
+ if packet_id is not None:
+ session.add(Traceroute(
+ packet_id=packet_id,
+ route=env.packet.decoded.payload,
+ done=not env.packet.decoded.want_response,
+ gateway_node_id=int(env.gateway_id[1:], 16),
+ import_time=datetime.datetime.now(),
+ ))
+
+ await session.commit()
+ if new_packet:
+ await packet.awaitable_attrs.to_node
+ await packet.awaitable_attrs.from_node
diff --git a/meshview/store.py b/meshview/store.py
index 307afe2..1b49e9e 100644
--- a/meshview/store.py
+++ b/meshview/store.py
@@ -1,175 +1,9 @@
import datetime
-
from sqlalchemy import select, func
from sqlalchemy.orm import lazyload
-from sqlalchemy import update
-from meshtastic.protobuf.config_pb2 import Config
-from meshtastic.protobuf.portnums_pb2 import PortNum
-from meshtastic.protobuf.mesh_pb2 import User, HardwareModel
from meshview import database
-from meshview import decode_payload
from meshview.models import Packet, PacketSeen, Node, Traceroute, SiteConfig
-from meshview import notify
-
-
-
-async def process_envelope(topic, env):
-
- # Checking if the received packet is a MAP_REPORT
- # Update the node table with the firmware version
- if env.packet.decoded.portnum == PortNum.MAP_REPORT_APP:
- # Extract the node ID from the packet (renamed from 'id' to 'node_id' to avoid conflicts with Python's built-in id function)
- node_id = getattr(env.packet, "from")
-
- # Decode the MAP report payload to extract the firmware version
- map_report = decode_payload.decode_payload(PortNum.MAP_REPORT_APP, env.packet.decoded.payload)
-
- # Establish an asynchronous database session
- async with database.async_session() as session:
- # Construct an SQLAlchemy update statement
- stmt = (
- update(Node)
- .where(Node.node_id == node_id) # Ensure correct column reference
- .values(firmware=map_report.firmware_version) # Assign new firmware value
- )
-
- # Execute the update statement asynchronously
- await session.execute(stmt)
-
- # Commit the changes to the database
- await session.commit()
-
- # This ignores any packet that does not have a ID
- if not env.packet.id:
- return
-
- async with database.async_session() as session:
- result = await session.execute(select(Packet).where(Packet.id == env.packet.id))
- new_packet = False
- packet = result.scalar_one_or_none()
- if not packet:
- new_packet = True
- packet = Packet(
- id=env.packet.id,
- portnum=env.packet.decoded.portnum,
- from_node_id=getattr(env.packet, "from"),
- to_node_id=env.packet.to,
- payload=env.packet.SerializeToString(),
- # p.r. Here seems to be where the packet is imported on the Database and import time is set.
- import_time=datetime.datetime.now(),
- channel=env.channel_id,
- )
- session.add(packet)
-
- result = await session.execute(
- select(PacketSeen).where(
- PacketSeen.packet_id == env.packet.id,
- PacketSeen.node_id == int(env.gateway_id[1:], 16),
- PacketSeen.rx_time == env.packet.rx_time,
- )
- )
- seen = None
- if not result.scalar_one_or_none():
- seen = PacketSeen(
- packet_id=env.packet.id,
- node_id=int(env.gateway_id[1:], 16),
- channel=env.channel_id,
- rx_time=env.packet.rx_time,
- rx_snr=env.packet.rx_snr,
- rx_rssi=env.packet.rx_rssi,
- hop_limit=env.packet.hop_limit,
- hop_start=env.packet.hop_start,
- topic=topic,
- # p.r. Here seems to be where the packet is imported on the Database and import time is set.
- import_time=datetime.datetime.now(),
- )
- session.add(seen)
-
-
-
- if env.packet.decoded.portnum == PortNum.NODEINFO_APP:
- user = decode_payload.decode_payload(
- PortNum.NODEINFO_APP, env.packet.decoded.payload
- )
- if user:
- result = await session.execute(select(Node).where(Node.id == user.id))
- if user.id and user.id[0] == "!":
- try:
- node_id = int(user.id[1:], 16)
- except ValueError:
- node_id = None
- pass
- else:
- node_id = None
-
- try:
- hw_model = HardwareModel.Name(user.hw_model)
- except ValueError:
- hw_model = "unknown"
- try:
- role = Config.DeviceConfig.Role.Name(user.role)
- except ValueError:
- role = "unknown"
-
- if node := result.scalar_one_or_none():
- node.node_id = node_id
- node.long_name = user.long_name
- node.short_name = user.short_name
- node.hw_model = hw_model
- node.role = role
- node.last_update =datetime.datetime.now()
-
- else:
- node = Node(
- id=user.id,
- node_id=node_id,
- long_name=user.long_name,
- short_name=user.short_name,
- hw_model=hw_model,
- role=role,
- channel=env.channel_id,
- # if need to update time of last update it may be here
- )
- session.add(node)
-
- if env.packet.decoded.portnum == PortNum.POSITION_APP:
- position = decode_payload.decode_payload(
- PortNum.POSITION_APP, env.packet.decoded.payload
- )
- if position and position.latitude_i and position.longitude_i:
- from_node_id = getattr(env.packet, 'from')
- node = (await session.execute(select(Node).where(Node.node_id == from_node_id))).scalar_one_or_none()
- if node:
- node.last_lat = position.latitude_i
- node.last_long = position.longitude_i
- session.add(node)
-
- if env.packet.decoded.portnum == PortNum.TRACEROUTE_APP:
- packet_id = None
- if env.packet.decoded.want_response:
- packet_id = env.packet.id
- else:
- result = await session.execute(select(Packet).where(Packet.id == env.packet.decoded.request_id))
- if result.scalar_one_or_none():
- packet_id = env.packet.decoded.request_id
- if packet_id is not None:
- session.add(Traceroute(
- packet_id=packet_id,
- route=env.packet.decoded.payload,
- done=not env.packet.decoded.want_response,
- gateway_node_id=int(env.gateway_id[1:], 16),
- import_time=datetime.datetime.now(),
- ))
-
- await session.commit()
- if new_packet:
- await packet.awaitable_attrs.to_node
- await packet.awaitable_attrs.from_node
- notify.notify_packet(packet.to_node_id, packet)
- notify.notify_packet(packet.from_node_id, packet)
- notify.notify_packet(None, packet)
- if seen:
- notify.notify_uplinked(seen.node_id, packet)
+from sqlalchemy import text
async def get_node(node_id):
@@ -189,7 +23,7 @@ async def get_fuzzy_nodes(query):
return result.scalars()
-async def get_packets(node_id=None, portnum=None, since=None, limit=500, before=None, after=None):
+async def get_packets(node_id=None, portnum=None, since=None, limit=1000, before=None, after=None):
async with database.async_session() as session:
q = select(Packet)
@@ -209,7 +43,8 @@ async def get_packets(node_id=None, portnum=None, since=None, limit=500, before=
q = q.limit(limit)
result = await session.execute(q.order_by(Packet.import_time.desc()))
- return result.scalars()
+ packets = list(result.scalars()) # Convert to list
+ return packets # Return the list
async def get_packets_from(node_id=None, portnum=None, since=None, limit=500):
@@ -300,140 +135,6 @@ async def get_mqtt_neighbors(since):
)
return result
-# In order to provide separate network graphs for LongFast and MediumSlow, I am duplicating the procedures.
-# 3 procedures are needed. These would have to be replicated for any other network that we may need to use graphs.
-#
-# get_traceroutes_longfast
-# get_packets_longfast
-# get_mqtt_neighbors_longfast
-#
-# p.r.
-# TODO # combine the duplicated funtions back to the original 3 by letting them take a second variable to specify channel name.
-# The default value for channel (none) should cause these functioins to operate the same as they did before they were channel specific.
-# This change will make adding new channel specific graphs much easier in the future.
-#
-# Get Traceroute for LongFast only
-async def get_traceroutes_longfast(since):
- async with database.async_session() as session:
- result = await session.execute(
- select(Traceroute)
- .join(Packet)
- .where(
- (Traceroute.import_time > (datetime.datetime.now() - since))
- & (Packet.channel == "LongFast")
- )
- .order_by(Traceroute.import_time)
- )
- return result.scalars()
-
-# Get MQTT Neighbors for LongFast only
-# p.r.
-async def get_mqtt_neighbors_longfast(since):
- async with database.async_session() as session:
- result = await session.execute(select(PacketSeen, Packet)
- .join(Packet)
- .where(
- (PacketSeen.hop_limit == PacketSeen.hop_start)
- & (PacketSeen.hop_start != 0)
- & (Packet.channel == "LongFast")
- )
-
- .options(
- lazyload(Packet.from_node),
- lazyload(Packet.to_node),
- )
- )
- return result
-
-# Get Packets for LongFast only
-# p.r.
-async def get_packets_longfast(node_id=None, portnum=None, since=None, limit=500, before=None, after=None):
- async with database.async_session() as session:
- q = select(Packet)
-
- # Add condition for channel being "LongFast"
- q = q.where(Packet.channel == "LongFast")
-
- if node_id:
- q = q.where(
- (Packet.from_node_id == node_id) | (Packet.to_node_id == node_id)
- )
- if portnum:
- q = q.where(Packet.portnum == portnum)
- if since:
- q = q.where(Packet.import_time > (datetime.datetime.now() - since))
- if before:
- q = q.where(Packet.import_time < before)
- if after:
- q = q.where(Packet.import_time > after)
- if limit is not None:
- q = q.limit(limit)
-
- result = await session.execute(q.order_by(Packet.import_time.desc()))
- return result.scalars()
-
-# Get Traceroute for mediumslow only
-# p.r.
-async def get_traceroutes_mediumslow(since):
- async with database.async_session() as session:
- result = await session.execute(
- select(Traceroute)
- .join(Packet)
- .where(
- (Traceroute.import_time > (datetime.datetime.now() - since))
- & (Packet.channel == "MediumSlow")
- )
- .order_by(Traceroute.import_time)
- )
- return result.scalars()
-
-# Get MQTT Neighbors for mediumslow only
-# p.r.
-async def get_mqtt_neighbors_mediumslow(since):
- async with database.async_session() as session:
- result = await session.execute(select(PacketSeen, Packet)
- .join(Packet)
- .where(
- (PacketSeen.hop_limit == PacketSeen.hop_start)
- & (PacketSeen.hop_start != 0)
- & (Packet.channel == "MediumSlow")
- )
-
- .options(
- lazyload(Packet.from_node),
- lazyload(Packet.to_node),
- )
- )
- return result
-
-# Get Packets for MediumSlow only
-# p.r.
-async def get_packets_mediumslow(node_id=None, portnum=None, since=None, limit=500, before=None, after=None):
- async with database.async_session() as session:
- q = select(Packet)
-
- # Add condition for channel being "MediumSlow"
- q = q.where(Packet.channel == "MediumSlow")
-
- if node_id:
- q = q.where(
- (Packet.from_node_id == node_id) | (Packet.to_node_id == node_id)
- )
- if portnum:
- q = q.where(Packet.portnum == portnum)
- if since:
- q = q.where(Packet.import_time > (datetime.datetime.now() - since))
- if before:
- q = q.where(Packet.import_time < before)
- if after:
- q = q.where(Packet.import_time > after)
- if limit is not None:
- q = q.limit(limit)
-
- result = await session.execute(q.order_by(Packet.import_time.desc()))
- return result.scalars()
-
-
# We count the total amount of packages
# This is to be used by /stats in web.py
@@ -512,6 +213,63 @@ async def get_nodes_mediumslow():
return result.scalars()
+async def get_top_traffic_nodes():
+ async with database.async_session() as session:
+ result = await session.execute(text("""
+ SELECT
+ n.node_id,
+ n.long_name,
+ n.role,
+ COUNT(p.id) AS packet_count
+ FROM
+ packet p
+ JOIN
+ node n
+ ON
+ p.from_node_id = n.node_id
+ WHERE
+ p.import_time >= DATETIME('now', '-1 day')
+ GROUP BY
+ n.long_name, n.role
+ ORDER BY
+ packet_count DESC
+ LIMIT 100;
+ """))
+
+ return result.fetchall() # Returns a list of tuples
+
+async def get_node_traffic(node_id: int):
+ try:
+ async with database.async_session() as session:
+ result = await session.execute(
+ text("""
+ SELECT
+ node.long_name, packet.portnum,
+ COUNT(*) AS packet_count
+ FROM packet
+ JOIN node ON packet.from_node_id = node.node_id OR packet.to_node_id = node.node_id
+ WHERE node.node_id = :node_id
+ AND packet.import_time >= DATETIME('now', '-1 day')
+ GROUP BY packet.portnum
+ ORDER BY packet_count DESC;
+ """), {"node_id": node_id}
+ )
+
+ # Map the result to include node.long_name and packet data
+ traffic_data = [{
+ "long_name": row[0], # node.long_name
+ "portnum": row[1], # packet.portnum
+ "packet_count": row[2] # COUNT(*) as packet_count
+ } for row in result.all()]
+
+ return traffic_data
+
+ except Exception as e:
+ # Log the error or handle it as needed
+ print(f"Error fetching node traffic: {str(e)}")
+ return []
+
+
async def get_nodes(role=None, channel=None, hw_model=None):
"""
diff --git a/meshview/templates/base.html b/meshview/templates/base.html
index 4536736..0908770 100644
--- a/meshview/templates/base.html
+++ b/meshview/templates/base.html
@@ -10,6 +10,8 @@
+
+
{% block head %}
{% endblock %}
@@ -36,8 +38,8 @@
{{ site_config.site_title }} - {{ site_config.site_domain }}
{{ site_config.site_message }}
+ - Mesh Graph LF - MS - Stats
+ - Weekly Net - Map - Top Traffic
Loading...
diff --git a/meshview/templates/chat.html b/meshview/templates/chat.html
index 09eebd8..df5db5d 100644
--- a/meshview/templates/chat.html
+++ b/meshview/templates/chat.html
@@ -21,7 +21,13 @@
{% block body %}
-
+
+
+
{% for packet in packets %}
{% include 'chat_packet.html' %}
{% else %}
diff --git a/meshview/templates/chat_packet.html b/meshview/templates/chat_packet.html
index 8352215..9a6d833 100644
--- a/meshview/templates/chat_packet.html
+++ b/meshview/templates/chat_packet.html
@@ -1,6 +1,6 @@
\ No newline at end of file
diff --git a/meshview/templates/firehose.html b/meshview/templates/firehose.html
index c8da75b..c0b5e42 100644
--- a/meshview/templates/firehose.html
+++ b/meshview/templates/firehose.html
@@ -1,7 +1,71 @@
{% extends "base.html" %}
{% block body %}
-
+
+
+
+
-
+
{% for packet in packets %}
- {% include 'packet.html' %}
+ {% include 'packet.html' %}
{% else %}
- No packets found.
+ No packets found.
{% endfor %}
-
+
{% endblock %}
diff --git a/meshview/templates/map.html b/meshview/templates/map.html
index cd1d003..a460cc9 100644
--- a/meshview/templates/map.html
+++ b/meshview/templates/map.html
@@ -93,11 +93,11 @@
let isRouter = node.role.toLowerCase().includes("router");
let markerOptions = {
- radius: isRouter ? 8 : 7,
- color: isRouter ? "black" : color,
+ radius: isRouter ? 9 : 7,
+ color: "white",
fillColor: color,
- fillOpacity: 0.6,
- weight: isRouter ? 1 : 0
+ fillOpacity: 1,
+ weight: .7,
};
var popupContent = `
diff --git a/meshview/templates/net.html b/meshview/templates/net.html
index b16e0db..9d6fcbb 100644
--- a/meshview/templates/net.html
+++ b/meshview/templates/net.html
@@ -24,7 +24,7 @@
Weekly Mesh check-in. We will keep it open on every Wednesday from 5:00pm for checkins so you do not have to rush.
The message format should be (LONG NAME) - (CITY YOU ARE IN) #BayMeshNet.
-
+
{% for packet in packets %}
{% include 'chat_packet.html' %}
{% else %}
diff --git a/meshview/templates/node.html b/meshview/templates/node.html
index 7104d95..434bffe 100644
--- a/meshview/templates/node.html
+++ b/meshview/templates/node.html
@@ -1,22 +1,28 @@
{% extends "base.html" %}
{% block css %}
+ /* Styles for the node info card */
#node_info {
- height:100%;
+ height: 100%;
}
- #map{
- height:100%;
+
+ /* Styles for the map */
+ #map {
+ height: 100%;
min-height: 400px;
}
- #packet_details{
+
+ /* Styles for packet details section */
+ #packet_details {
height: 95vh;
overflow: scroll;
top: 3em;
}
+
+ /* Ensure inline display for details */
div.tab-pane > dl {
display: inline-block;
- }
-
+ }
{% endblock %}
{% block body %}
@@ -28,34 +34,26 @@
{% if node %}
hx-ext="sse"
sse-connect="/events?node_id={{node_id}}{% if portnum %}&portnum={{portnum}}{% endif %}"
- {% endif %}
- >
+ {% endif %}
+ >
-
-
- {% include "buttons.html" %}
+
- {% include 'packet_list.html' %}
+
-
+
+
+
+
+ {% include 'packet_list.html' %}
+
+
-
{% if trace %}
+
+
{% endif %}
{% endblock %}
diff --git a/meshview/templates/node2.html b/meshview/templates/node2.html
new file mode 100644
index 0000000..cf9fd61
--- /dev/null
+++ b/meshview/templates/node2.html
@@ -0,0 +1,58 @@
+{% extends "base.html" %}
+
+{% block css %}
+ #node_info {
+ height:100%;
+ }
+ #map{
+ height:100%;
+ min-height: 400px;
+ }
+ #packet_details{
+ height: 95vh;
+ overflow: scroll;
+ top: 3em;
+ }
+ div.tab-pane > dl {
+ display: inline-block;
+ }
+{% endblock %}
+
+{% block body %}
+
+{% include "search_form.html" %}
+
+
+
+
+ {% if node %}
+
+
+
+ ShortName
+ {{node.short_name}}
+ HW Model
+ {{node.hw_model}}
+ Role
+ {{node.role}}
+
+
+ {% else %}
+
+ A NodeInfo has not been seen.
+
+ {% endif %}
+
+
+
+
+ {% include 'packet_list.html' %}
+
+
+
+
+{% endblock %}
diff --git a/meshview/templates/node_traffic.html b/meshview/templates/node_traffic.html
new file mode 100644
index 0000000..a8aca5b
--- /dev/null
+++ b/meshview/templates/node_traffic.html
@@ -0,0 +1,105 @@
+{% extends "base.html" %}
+
+{% block css %}
+.table-title {
+ font-size: 2rem;
+ text-align: center;
+ margin-bottom: 20px;
+ }
+
+ .traffic-table {
+ width: 50%;
+ border-collapse: collapse;
+ margin: 0 auto;
+ font-family: Arial, sans-serif;
+ }
+
+ .traffic-table th,
+ .traffic-table td {
+ padding: 10px 15px;
+ text-align: left;
+ border: 1px solid #474b4e;
+ }
+
+ .traffic-table th {
+ background-color: #272b2f;
+ color: white;
+ }
+
+ .traffic:nth-of-type(odd) {
+ background-color: #272b2f; /* Lighter than #2a2a2a */
+ }
+
+ .traffic {
+ border: 1px solid #474b4e;
+ padding: 8px;
+ margin-bottom: 4px;
+ border-radius: 8px;
+ }
+
+ .traffic:nth-of-type(even) {
+ background-color: #212529; /* Slightly lighter than the previous #181818 */
+ }
+
+ .footer {
+ text-align: center;
+ margin-top: 20px;
+ }
+
+{% endblock %}
+
+{% block body %}
+
+ {{ traffic[0].long_name }} (last 24 hours)
+
+
+
+ Port Number
+ Packet Count
+
+
+
+ {% for port in traffic %}
+
+
+ {% if port.portnum == 1 %}
+ TEXT_MESSAGE_APP
+ {% elif port.portnum == 3 %}
+ POSITION_APP
+ {% elif port.portnum == 4 %}
+ NODEINFO_APP
+ {% elif port.portnum == 5 %}
+ ROUTING_APP
+ {% elif port.portnum == 67 %}
+ TELEMETRY_APP
+ {% elif port.portnum == 70 %}
+ TRACEROUTE_APP
+ {% elif port.portnum == 71 %}
+ NEIGHBORINFO_APP
+ {% elif port.portnum == 73 %}
+ MAP_REPORT_APP
+ {% elif port.portnum == 0 %}
+ UNKNOWN_APP
+ {% elif port.portnum == 0 %}
+ UNKNOWN_APP
+ {% elif port.portnum == 0 %}
+ UNKNOWN_APP
+ {% else %}
+ {{ port.portnum }}
+ {% endif %}
+
+ {{ port.packet_count }}
+
+ {% else %}
+
+ No traffic data available for this node.
+
+ {% endfor %}
+
+
+
+
+
+{% endblock %}
diff --git a/meshview/templates/nodegraph.html b/meshview/templates/nodegraph.html
new file mode 100644
index 0000000..b088685
--- /dev/null
+++ b/meshview/templates/nodegraph.html
@@ -0,0 +1,190 @@
+{% extends "base.html" %}
+
+{% block head %}
+
+{% endblock %}
+
+{% block css %}
+#mynetwork {
+ width: 100%;
+ height: 100vh;
+ max-width: 2000px;
+ max-height: 2000px;
+ border: 1px solid lightgray;
+ background-color: white;
+}
+
+.legend {
+ position: absolute;
+ bottom: 10px;
+ left: 10px;
+ background-color: rgba(255, 255, 255, 0.8);
+ padding: 5px;
+ border-radius: 5px;
+ border: 1px solid #ccc;
+ font-size: 12px;
+ color: #333;
+}
+
+#node-info {
+ position: absolute;
+ bottom: 10px;
+ right: 10px;
+ background-color: rgba(255, 255, 255, 0.9);
+ padding: 10px;
+ border-radius: 5px;
+ border: 1px solid #ccc;
+ font-size: 14px;
+ color: #333;
+ width: 250px;
+ max-height: 200px;
+ overflow-y: auto;
+}
+{% endblock %}
+
+{% block body %}
+
+
+
+
+
Traceroute
+
NeighborInfo
+
+
+
+
+ Long Name:
+ Short Name:
+ Role:
+ Hardware Model:
+
+
+<
+{% endblock %}
diff --git a/meshview/templates/packet.html b/meshview/templates/packet.html
index a6f7ceb..c832b97 100644
--- a/meshview/templates/packet.html
+++ b/meshview/templates/packet.html
@@ -13,7 +13,6 @@
{%- endif -%}
)
- ->
{{packet.to_node.long_name}}(
{%- if not to_me -%}
@@ -29,8 +28,7 @@
{{packet.id}}
-
🔎
-
🔗
+
🔎
@@ -41,7 +39,7 @@
payload
{% if packet.pretty_payload %}
- {{packet.pretty_payload}}
+
{{packet.pretty_payload}}
{% endif %}
{% if packet.raw_mesh_packet.decoded and packet.raw_mesh_packet.decoded.portnum == 70 %}
diff --git a/meshview/templates/packet_details.html b/meshview/templates/packet_details.html
index d273f0a..2496dc6 100644
--- a/meshview/templates/packet_details.html
+++ b/meshview/templates/packet_details.html
@@ -36,25 +36,103 @@
{% if map_center %}
+
+
+
{% endif %}
diff --git a/meshview/templates/packet_list.html b/meshview/templates/packet_list.html
index f90c58f..24f2f8c 100644
--- a/meshview/templates/packet_list.html
+++ b/meshview/templates/packet_list.html
@@ -1,4 +1,4 @@
-
+
{% for packet in packets %}
{% include 'packet.html' %}
{% else %}
diff --git a/meshview/templates/top.html b/meshview/templates/top.html
new file mode 100644
index 0000000..b875f77
--- /dev/null
+++ b/meshview/templates/top.html
@@ -0,0 +1,65 @@
+{% extends "base.html" %}
+{% block css %}
+.table-title {
+ font-size: 2rem;
+ text-align: center;
+ margin-bottom: 20px;
+ }
+
+ .traffic-table {
+ width: 60%;
+ border-collapse: collapse;
+ margin: 0 auto;
+ font-family: Arial, sans-serif;
+ }
+
+ .traffic-table th,
+ .traffic-table td {
+ padding: 10px 15px;
+ text-align: left;
+ border: 1px solid #474b4e;
+ }
+
+ .traffic-table th {
+ background-color: #272b2f;
+ color: white;
+ }
+
+ .traffic:nth-of-type(odd) {
+ background-color: #272b2f; /* Lighter than #2a2a2a */
+ }
+
+ .traffic {
+ border: 1px solid #474b4e;
+ padding: 8px;
+ margin-bottom: 4px;
+ border-radius: 8px;
+ }
+
+ .traffic:nth-of-type(even) {
+ background-color: #212529; /* Slightly lighter than the previous #181818 */
+ }
+{% endblock %}
+
+{% block body %}
+
Top Traffic Nodes (last 24 hours)
+
+
+
+ Node Name
+ Role
+ Packet Count
+
+
+
+ {% for node in nodes %}
+
+ {{ node[1] }}
+ {{ node[2] }}
+ {{ node[3] }}
+
+ {% endfor %}
+
+
+
+{% endblock %}
diff --git a/meshview/templates/traceroute.html b/meshview/templates/traceroute.html
new file mode 100644
index 0000000..8e94a01
--- /dev/null
+++ b/meshview/templates/traceroute.html
@@ -0,0 +1,67 @@
+{% extends "base.html" %}
+
+{% block head %}
+
+{% endblock %}
+
+{% block body %}
+
+
+
+
+{% endblock %}
diff --git a/meshview/web.py b/meshview/web.py
index e78007b..76682fd 100644
--- a/meshview/web.py
+++ b/meshview/web.py
@@ -3,110 +3,33 @@ import io
from collections import Counter
from dataclasses import dataclass
import datetime
-from aiohttp_sse import sse_response
import ssl
import re
import os
-
import pydot
from pandas import DataFrame
import plotly.express as px
import seaborn as sns
import matplotlib.pyplot as plt
-import matplotlib.ticker as ticker
from aiohttp import web
from markupsafe import Markup
from jinja2 import Environment, PackageLoader, select_autoescape
from google.protobuf import text_format
from google.protobuf.message import Message
-
from meshtastic.protobuf.portnums_pb2 import PortNum
from meshview import store
from meshview import models
from meshview import decode_payload
-from meshview import notify
-
-
-with open(os.path.join(os.path.dirname(__file__), '1x1.png'), 'rb') as png:
- empty_png = png.read()
-
+import gc
+import psutil
env = Environment(loader=PackageLoader("meshview"), autoescape=select_autoescape())
+# Optimize garbage collection frequency
+gc.set_threshold(100, 10, 10)
-async def build_trace(node_id):
- trace = []
- for raw_p in await store.get_packets_from(node_id, PortNum.POSITION_APP, since=datetime.timedelta(hours=24)):
- p = Packet.from_model(raw_p)
- if not p.raw_payload or not p.raw_payload.latitude_i or not p.raw_payload.longitude_i:
- continue
- trace.append((p.raw_payload.latitude_i * 1e-7, p.raw_payload.longitude_i * 1e-7))
- if not trace:
- for raw_p in await store.get_packets_from(node_id, PortNum.POSITION_APP):
- p = Packet.from_model(raw_p)
- if not p.raw_payload or not p.raw_payload.latitude_i or not p.raw_payload.longitude_i:
- continue
- trace.append((p.raw_payload.latitude_i * 1e-7, p.raw_payload.longitude_i * 1e-7))
- break
- return trace
-
-
-async def build_neighbors(node_id):
- packet = (await store.get_packets_from(node_id, PortNum.NEIGHBORINFO_APP, limit=1)).first()
- if not packet:
- return []
- if packet.import_time < datetime.datetime.utcnow() - datetime.timedelta(days=1):
- return []
- _, payload = decode_payload.decode(packet)
- neighbors = []
- results = {}
- async with asyncio.TaskGroup() as tg:
- for n in payload.neighbors:
- results[n.node_id] = {
- 'node_id': n.node_id,
- 'snr': n.snr,
- }
- neighbors.append((n.node_id, tg.create_task(store.get_node(n.node_id))))
-
- for node_id, node in neighbors:
- node = await node
- if node and node.last_lat and node.last_long:
- results[node_id]['short_name'] = node.short_name
- results[node_id]['long_name'] = node.long_name
- results[node_id]['location'] = (node.last_lat * 1e-7, node.last_long * 1e-7)
- else:
- del results[node_id]
-
- return list(results.values())
-
-
-def node_id_to_hex(node_id):
- if node_id == 4294967295:
- return "^all"
- else:
- return f"!{hex(node_id)[2:]}"
-
-
-def format_timestamp(timestamp):
- if isinstance(timestamp, int):
- timestamp = datetime.datetime.fromtimestamp(timestamp, datetime.timezone.utc)
- return timestamp.isoformat(timespec="milliseconds")
-
-
-env.filters["node_id_to_hex"] = node_id_to_hex
-env.filters["format_timestamp"] = format_timestamp
-
-
-routes = web.RouteTableDef()
-
-
-@routes.get("/")
-async def index(request):
- template = env.get_template("index.html")
- return web.Response(
- text=template.render(is_hx_request="HX-Request" in request.headers, node=None, site_config = await store.get_site_config()),
- content_type="text/html",
- )
+with open(os.path.join(os.path.dirname(__file__), '1x1.png'), 'rb') as png:
+ empty_png = png.read()
@dataclass
@@ -124,6 +47,7 @@ class Packet:
pretty_payload: Markup
import_time: datetime.datetime
+
@classmethod
def from_model(cls, packet):
mesh_packet, payload = decode_payload.decode(packet)
@@ -173,33 +97,116 @@ class Packet:
raw_payload=payload,
)
+@dataclass
+class UplinkedNode:
+ lat: float
+ long: float
+ long_name: str
+ short_name: str
+ hops: int
+ snr: float
+ rssi: float
-def generate_responce(request, body, raw_node_id="", node=None):
+
+async def build_trace(node_id):
+ trace = []
+ for raw_p in await store.get_packets_from(node_id, PortNum.POSITION_APP, since=datetime.timedelta(hours=24)):
+ p = Packet.from_model(raw_p)
+ if not p.raw_payload or not p.raw_payload.latitude_i or not p.raw_payload.longitude_i:
+ continue
+ trace.append((p.raw_payload.latitude_i * 1e-7, p.raw_payload.longitude_i * 1e-7))
+
+ if not trace:
+ for raw_p in await store.get_packets_from(node_id, PortNum.POSITION_APP):
+ p = Packet.from_model(raw_p)
+ if not p.raw_payload or not p.raw_payload.latitude_i or not p.raw_payload.longitude_i:
+ continue
+ trace.append((p.raw_payload.latitude_i * 1e-7, p.raw_payload.longitude_i * 1e-7))
+ break
+
+ gc.collect() # Force garbage collection
+ return trace
+
+
+async def build_neighbors(node_id):
+ packets = await store.get_packets_from(node_id, PortNum.NEIGHBORINFO_APP, limit=1)
+ packet = packets.first()
+ print(packet)
+ if not packet:
+ return []
+
+ _, payload = decode_payload.decode(packet)
+ neighbors = {}
+
+ # Gather node information asynchronously
+ tasks = {n.node_id: store.get_node(n.node_id) for n in payload.neighbors}
+ results = await asyncio.gather(*tasks.values(), return_exceptions=True)
+
+ for neighbor, node in zip(payload.neighbors, results):
+ if isinstance(node, Exception):
+ continue
+ if node and node.last_lat and node.last_long:
+ neighbors[neighbor.node_id] = {
+ 'node_id': neighbor.node_id,
+ 'snr': neighbor.snr, # Fix dictionary keying issue
+ 'short_name': node.short_name,
+ 'long_name': node.long_name,
+ 'location': (node.last_lat * 1e-7, node.last_long * 1e-7),
+ }
+
+ return list(neighbors.values()) # Return a list of dictionaries
+
+
+
+def node_id_to_hex(node_id):
+ if node_id == 4294967295:
+ return "^all"
+ else:
+ return f"!{hex(node_id)[2:]}"
+
+
+def format_timestamp(timestamp):
+ if isinstance(timestamp, int):
+ timestamp = datetime.datetime.fromtimestamp(timestamp, datetime.timezone.utc)
+ return timestamp.isoformat(timespec="milliseconds")
+
+
+env.filters["node_id_to_hex"] = node_id_to_hex
+env.filters["format_timestamp"] = format_timestamp
+
+routes = web.RouteTableDef()
+@routes.get("/")
+async def index(request):
+ raise web.HTTPFound(location="/map") # Redirect to /home
+
+
+def generate_response(request, body, raw_node_id="", node=None):
if "HX-Request" in request.headers:
return web.Response(text=body, content_type="text/html")
template = env.get_template("index.html")
- return web.Response(
+ response = web.Response(
text=template.render(
is_hx_request="HX-Request" in request.headers,
raw_node_id=raw_node_id,
node_html=Markup(body),
node=node,
+ site_config = await store.get_site_config(),
),
content_type="text/html",
)
+ gc.collect()
+ return response
@routes.get("/node_search")
async def node_search(request):
if not "q" in request.query or not request.query["q"]:
return web.Response(text="Bad node id")
- portnum = request.query.get("portnum")
- if portnum:
- portnum = int(portnum)
- raw_node_id = request.query["q"]
+ raw_node_id = request.query["q"]
node_id = None
+
if raw_node_id == "^all":
node_id = 0xFFFFFFFF
elif raw_node_id[0] == "!":
@@ -225,13 +232,12 @@ async def node_search(request):
)
template = env.get_template("search.html")
- return web.Response(
- text=template.render(
- nodes=fuzzy_nodes,
- query_string=request.query_string,
- ),
+ response = web.Response(
+ text=template.render(nodes=fuzzy_nodes, query_string=request.query_string),
content_type="text/html",
)
+ gc.collect()
+ return response
@routes.get("/node_match")
@@ -259,37 +265,16 @@ async def packet_list(request):
portnum = int(portnum)
else:
portnum = None
- return await _packet_list(request, store.get_packets(node_id, portnum), 'packet')
-
-@routes.get("/uplinked_list/{node_id}")
-async def uplinked_list(request):
- node_id = int(request.match_info["node_id"])
- if portnum := request.query.get("portnum"):
- portnum = int(portnum)
- else:
- portnum = None
- return await _packet_list(request, store.get_uplinked_packets(node_id, portnum), 'uplinked')
-
-# does this needs a web.response ?
-
-async def _packet_list(request, raw_packets, packet_event):
- node_id = int(request.match_info["node_id"])
- if portnum := request.query.get("portnum"):
- portnum = int(portnum)
- else:
- portnum = None
-
async with asyncio.TaskGroup() as tg:
- raw_packets = tg.create_task(raw_packets)
node = tg.create_task(store.get_node(node_id))
+ raw_packets = tg.create_task(store.get_packets(node_id,portnum, limit=200))
trace = tg.create_task(build_trace(node_id))
- neighbors = tg.create_task(build_neighbors(node_id))
+ neighbors = await tg.create_task(build_neighbors(node_id))
has_telemetry = tg.create_task(store.has_packets(node_id, PortNum.TELEMETRY_APP))
- packets = (Packet.from_model(p) for p in await raw_packets)
-
+ packets = [Packet.from_model(p) for p in await raw_packets] # Convert generator to a list
template = env.get_template("node.html")
return web.Response(
text=template.render(
@@ -298,9 +283,8 @@ async def _packet_list(request, raw_packets, packet_event):
node=await node,
portnum=portnum,
packets=packets,
- packet_event=packet_event,
trace=await trace,
- neighbors=await neighbors,
+ neighbors=neighbors,
has_telemetry=await has_telemetry,
query_string=request.query_string,
site_config = await store.get_site_config(),
@@ -309,152 +293,6 @@ async def _packet_list(request, raw_packets, packet_event):
)
-@routes.get("/chat_events")
-async def chat_events(request):
-
- chat_packet = env.get_template("chat_packet.html")
-
- # Precompile regex for filtering out unwanted messages (case insensitive)
- seq_pattern = re.compile(r"seq \d+$", re.IGNORECASE)
-
- # Subscribe to notifications for packets from all nodes (0xFFFFFFFF = broadcast)
- with notify.subscribe(node_id=0xFFFFFFFF) as event:
- async with sse_response(request) as resp:
- while resp.is_connected(): # Keep the connection open while the client is connected
- try:
- # Wait for an event with a timeout of 10 seconds
- await asyncio.wait_for(event.wait(), timeout=10)
- except asyncio.TimeoutError:
- # Timeout reached, continue looping to keep connection alive
- continue
-
- if event.is_set():
- # Extract relevant packets, ensuring event.packets is not None
- packets = [
- p for p in (event.packets or [])
- if p.portnum == PortNum.TEXT_MESSAGE_APP
- ]
- event.clear() # Reset event flag
-
- try:
- for packet in packets:
- ui_packet = Packet.from_model(packet)
-
- # Filter out packets that match "seq
"
- if not seq_pattern.match(ui_packet.payload):
- await resp.send(
- chat_packet.render(packet=ui_packet),
- event="chat_packet", # SSE event type
- )
- except ConnectionResetError:
- # Log when a client disconnects unexpectedly
- print("Client disconnected from SSE stream.")
- return # Exit the loop and close the connection
-
-
-@routes.get("/events")
-async def events(request):
- """
- Server-Sent Events (SSE) endpoint for real-time packet updates.
-
- This endpoint listens for new network packets and streams them to connected clients.
- Clients can optionally filter packets based on `node_id` and `portnum` query parameters.
-
- Query Parameters:
- - node_id (int, optional): Filter packets for a specific node (default: all nodes).
- - portnum (int, optional): Filter packets for a specific port number (default: all ports).
-
- Args:
- request (aiohttp.web.Request): The incoming HTTP request.
-
- Returns:
- aiohttp.web.StreamResponse: SSE response streaming network events.
- """
- # Extract and convert query parameters (if provided)
- node_id = request.query.get("node_id")
- if node_id:
- node_id = int(node_id) # Convert node_id to an integer
-
- portnum = request.query.get("portnum")
- if portnum:
- portnum = int(portnum) # Convert portnum to an integer
-
- # Load Jinja2 templates for rendering packets
- packet_template = env.get_template("packet.html")
- net_packet_template = env.get_template("net_packet.html")
-
- # Subscribe to packet notifications for the given node_id (or all nodes if None)
- with notify.subscribe(node_id) as event:
- async with sse_response(request) as resp:
- while resp.is_connected(): # Keep connection open while client is connected
- try:
- # Wait for an event with a timeout of 10 seconds
- await asyncio.wait_for(event.wait(), timeout=10)
- except asyncio.TimeoutError:
- continue # No new packets, continue waiting
-
- if event.is_set():
- # Extract relevant packets based on `portnum` filter (if provided)
- packets = [
- p for p in (event.packets or [])
- if portnum is None or portnum == p.portnum
- ]
-
- # Extract uplinked packets (if port filter applies)
- uplinked = [
- u for u in (event.uplinked or [])
- if portnum is None or portnum == u.portnum
- ]
-
- event.clear() # Reset event flag
-
- try:
- # Process and send incoming packets
- for packet in packets:
- ui_packet = Packet.from_model(packet)
-
- # Send standard packet event
- await resp.send(
- packet_template.render(
- is_hx_request="HX-Request" in request.headers,
- node_id=node_id,
- packet=ui_packet,
- ),
- event="packet",
- )
-
- # If the packet belongs to `PortNum.TEXT_MESSAGE_APP` and contains "#baymeshnet",
- # send it as a network event
- if ui_packet.portnum == PortNum.TEXT_MESSAGE_APP and '#baymeshnet' in ui_packet.payload.lower():
- await resp.send(
- net_packet_template.render(packet=ui_packet),
- event="net_packet",
- )
-
- # Process and send uplinked packets separately
- for packet in uplinked:
- await resp.send(
- packet_template.render(
- is_hx_request="HX-Request" in request.headers,
- node_id=node_id,
- packet=Packet.from_model(packet),
- ),
- event="uplinked",
- )
-
- except ConnectionResetError:
- print("Client disconnected from SSE stream.")
- return # Gracefully exit on disconnection
-
-@dataclass
-class UplinkedNode:
- lat: float
- long: float
- long_name: str
- short_name: str
- hops: int
- snr: float
- rssi: float
# Updated code p.r.
@routes.get("/packet_details/{packet_id}")
@@ -462,6 +300,7 @@ async def packet_details(request):
packet_id = int(request.match_info["packet_id"])
packets_seen = list(await store.get_packets_seen(packet_id))
packet = await store.get_packet(packet_id)
+ node= await store.get_node(packet.from_node_id)
from_node_cord = None
if packet and packet.from_node and packet.from_node.last_lat:
@@ -503,6 +342,8 @@ async def packet_details(request):
map_center=map_center,
from_node_cord=from_node_cord,
uplinked_nodes=uplinked_nodes,
+ node=node,
+ site_config = await store.get_site_config(),
),
content_type="text/html",
)
@@ -513,7 +354,8 @@ async def packet_details(request):
portnum = request.query.get("portnum")
if portnum:
portnum = int(portnum)
- packets = await store.get_packets(portnum=portnum, limit=50)
+ packets = await store.get_packets(portnum=portnum, limit=20)
+ print_memory_usage()
template = env.get_template("firehose.html")
return web.Response(
text=template.render(
@@ -524,59 +366,24 @@ async def packet_details(request):
content_type="text/html",
)
-@routes.get("/chat")
-async def chat(request):
- try:
- # Fetch packets for the given node ID and port number
- #print("Fetching packets...")
- packets = await store.get_packets(
- node_id=0xFFFFFFFF, portnum=PortNum.TEXT_MESSAGE_APP, limit=100
- )
- #print(f"Fetched {len(packets)} packets.")
-
- # Convert packets to UI packets
- #print("Processing packets...")
- ui_packets = [Packet.from_model(p) for p in packets]
-
- # Filter packets
- #print("Filtering packets...")
- filtered_packets = [
- p for p in ui_packets if not re.match(r"seq \d+$", p.payload)
- ]
-
- # Render template
- #print("Rendering template...")
- template = env.get_template("chat.html")
- return web.Response(
- text=template.render(packets=filtered_packets, site_config = await store.get_site_config()),
- content_type="text/html",
- )
-
- except Exception as e:
- # Log the error and return an appropriate response
- #print(f"Error in chat handler: {e}")
- return web.Response(
- text="An error occurred while processing your request.",
- status=500,
- content_type="text/plain",
- )
-
-
@routes.get("/packet/{packet_id}")
async def packet(request):
packet = await store.get_packet(int(request.match_info["packet_id"]))
if not packet:
- return web.Response(
- status=404,
- )
+ return web.Response(status=404)
+
+ node = await store.get_node(packet.from_node_id)
+ print_memory_usage()
template = env.get_template("packet_index.html")
+
return web.Response(
text=template.render(packet=Packet.from_model(packet), site_config = await store.get_site_config()),
content_type="text/html",
)
+
async def graph_telemetry(node_id, payload_type, graph_config):
data = {'date': []}
fields = []
@@ -631,10 +438,6 @@ async def graph_telemetry(node_id, payload_type, graph_config):
if 'palette' in ax_config:
args['palette'] = ax_config['palette']
sns.lineplot(data=ax_df, ax=ax, **args)
- if i:
- sns.move_legend(ax, "upper right")
- else:
- sns.move_legend(ax, "upper left")
png = io.BytesIO()
plt.savefig(png, dpi=100)
@@ -646,6 +449,7 @@ async def graph_telemetry(node_id, payload_type, graph_config):
)
+
@routes.get("/graph/power/{node_id}")
async def graph_power(request):
return await graph_telemetry(
@@ -819,7 +623,7 @@ async def graph_neighbors(request):
png = io.BytesIO()
plt.savefig(png, dpi=100)
plt.close()
-
+ print_memory_usage()
return web.Response(
body=png.getvalue(),
content_type="image/png",
@@ -861,12 +665,12 @@ async def graph_neighbors2(request):
df = DataFrame(data)
fig = px.line(df, x="time", y="snr", color="node_name", markers=True)
html = fig.to_html(full_html=True, include_plotlyjs='cdn')
+ print_memory_usage()
return web.Response(
text=html,
content_type="text/html",
)
-
@routes.get("/graph/traceroute/{packet_id}")
async def graph_traceroute(request):
packet_id = int(request.match_info['packet_id'])
@@ -969,6 +773,134 @@ async def graph_traceroute(request):
content_type="image/svg+xml",
)
+
+
+
+@routes.get("/graph/traceroute2/{packet_id}")
+async def graph_traceroute2(request):
+ packet_id = int(request.match_info['packet_id'])
+ traceroutes = list(await store.get_traceroute(packet_id))
+
+ # Fetch the packet
+ packet = await store.get_packet(packet_id)
+ if not packet:
+ return web.Response(status=404)
+
+ node_ids = set()
+ for tr in traceroutes:
+ route = decode_payload.decode_payload(PortNum.TRACEROUTE_APP, tr.route)
+ node_ids.add(tr.gateway_node_id)
+ for node_id in route.route:
+ node_ids.add(node_id)
+ node_ids.add(packet.from_node_id)
+ node_ids.add(packet.to_node_id)
+
+ nodes = {}
+ async with asyncio.TaskGroup() as tg:
+ for node_id in node_ids:
+ nodes[node_id] = tg.create_task(store.get_node(node_id))
+
+ # Initialize graph for traceroute
+ graph = pydot.Dot('traceroute', graph_type="digraph")
+
+ paths = set()
+ node_color = {}
+ mqtt_nodes = set()
+ saw_reply = set()
+ dest = None
+ node_seen_time = {}
+ for tr in traceroutes:
+ if tr.done:
+ saw_reply.add(tr.gateway_node_id)
+ if tr.done and dest:
+ continue
+ route = decode_payload.decode_payload(PortNum.TRACEROUTE_APP, tr.route)
+ path = [packet.from_node_id]
+ path.extend(route.route)
+ if tr.done:
+ dest = packet.to_node_id
+ path.append(packet.to_node_id)
+ elif path[-1] != tr.gateway_node_id:
+ path.append(tr.gateway_node_id)
+
+ if not tr.done and tr.gateway_node_id not in node_seen_time and tr.import_time:
+ node_seen_time[path[-1]] = tr.import_time
+
+ mqtt_nodes.add(tr.gateway_node_id)
+ node_color[path[-1]] = '#' + hex(hash(tuple(path)))[3:9]
+ paths.add(tuple(path))
+
+ used_nodes = set()
+ for path in paths:
+ used_nodes.update(path)
+
+ import_times = [tr.import_time for tr in traceroutes if tr.import_time]
+ if import_times:
+ first_time = min(import_times)
+ else:
+ first_time = 0
+
+ # Prepare data for ECharts rendering
+ chart_nodes = []
+ chart_edges = []
+ for node_id in used_nodes:
+ node = await nodes[node_id]
+ if not node:
+ # Handle case where node is None
+ node_name = node_id_to_hex(node_id)
+ chart_nodes.append({
+ "name": str(node_id),
+ "value": node_name,
+ "symbol": 'rect',
+ })
+ else:
+ node_name = f'[{node.short_name}] {node.long_name}\n{node_id_to_hex(node_id)}\n{node.role}'
+ if node_id in node_seen_time:
+ ms = (node_seen_time[node_id] - first_time).total_seconds() * 1000
+ node_name += f'\n {ms:.2f}ms'
+ style = 'dashed'
+ if node_id == dest:
+ style = 'filled'
+ elif node_id in mqtt_nodes:
+ style = 'solid'
+
+ if node_id in saw_reply:
+ style += ', diagonals'
+
+ chart_nodes.append({
+ "name": str(node_id),
+ "value": node_name,
+ "symbol": 'rect',
+ "long_name": node.long_name,
+ "short_name": node.short_name,
+ "role": node.role,
+ "hw_model": node.hw_model,
+ })
+
+ # Create edges
+ for path in paths:
+ color = '#' + hex(hash(tuple(path)))[3:9]
+ for src, dest in zip(path, path[1:]):
+ chart_edges.append({
+ "source": str(src),
+ "target": str(dest),
+ "originalColor": color,
+ })
+
+ chart_data = {
+ "nodes": chart_nodes,
+ "edges": chart_edges,
+ }
+
+ template = env.get_template("traceroute.html")
+ # Render the page with the chart data
+ return web.Response(
+ text=template.render(chart_data=chart_data, packet_id=packet_id),
+ content_type="text/html",
+ )
+
+
+
@routes.get("/graph/network")
async def graph_network(request):
root = request.query.get("root")
@@ -1123,405 +1055,13 @@ async def graph_network(request):
penwidth=1.85,
dir=edge_dir,
))
-
+ print_memory_usage()
return web.Response(
body=graph.create_svg(),
content_type="image/svg+xml",
)
-@routes.get("/stats")
-async def stats(request):
- try:
- # Add logging to track execution
- total_packets = await store.get_total_packet_count()
- total_nodes = await store.get_total_node_count()
- total_packets_seen = await store.get_total_packet_seen_count()
- total_nodes_longfast = await store.get_total_node_count_longfast()
- total_nodes_mediumslow = await store.get_total_node_count_mediumslow()
-
- # Render template
- #print("Rendering template...")
- template = env.get_template("stats.html")
- return web.Response(
- text=template.render(
- total_packets=total_packets,
- total_nodes=total_nodes,
- total_packets_seen=total_packets_seen,
- total_nodes_longfast=total_nodes_longfast,
- total_nodes_mediumslow=total_nodes_mediumslow,
- site_config = await store.get_site_config(),
- ),
- content_type="text/html",
- )
- except Exception as e:
- # Log and return error response
- #print(f"Error in stats handler: {e}")
- return web.Response(
- text="An error occurred while processing your request.",
- status=500,
- content_type="text/plain",
- )
-
-
-@routes.get("/graph/longfast")
-async def graph_network_longfast(request):
- try:
- root = request.query.get("root")
- depth = int(request.query.get("depth", 5))
- hours = int(request.query.get("hours", 24))
- minutes = int(request.query.get("minutes", 0))
-
- # Validation
- if root and not root.isdigit():
- return web.Response(status=400, text="Invalid root node ID.")
- if depth < 1:
- return web.Response(status=400, text="Depth must be at least 1.")
-
- since = datetime.timedelta(hours=hours, minutes=minutes)
-
- nodes = {}
- node_ids = set()
- traceroutes = []
-
- # Fetch traceroutes
- for tr in await store.get_traceroutes_longfast(since):
- node_ids.add(tr.gateway_node_id)
- node_ids.add(tr.packet.from_node_id)
- node_ids.add(tr.packet.to_node_id)
- route = decode_payload.decode_payload(PortNum.TRACEROUTE_APP, tr.route)
- node_ids.update(route.route)
-
- path = [tr.packet.from_node_id]
- path.extend(route.route)
- if tr.done:
- path.append(tr.packet.to_node_id)
- else:
- if path[-1] != tr.gateway_node_id:
- path.append(tr.gateway_node_id)
- traceroutes.append((tr, path))
-
- edges = Counter()
- edge_type = {}
- used_nodes = set()
-
- # Fetch MQTT neighbors
- for ps, p in await store.get_mqtt_neighbors_longfast(since):
- node_ids.add(ps.node_id)
- node_ids.add(p.from_node_id)
- used_nodes.add(ps.node_id)
- used_nodes.add(p.from_node_id)
- edges[(p.from_node_id, ps.node_id)] += 1
- edge_type[(p.from_node_id, ps.node_id)] = 'sni'
-
- # Fetch NeighborInfo packets
- for packet in await store.get_packets_longfast(portnum=PortNum.NEIGHBORINFO_APP, since=since):
- try:
- _, neighbor_info = decode_payload.decode(packet)
- node_ids.add(packet.from_node_id)
- used_nodes.add(packet.from_node_id)
- for node in neighbor_info.neighbors:
- node_ids.add(node.node_id)
- used_nodes.add(node.node_id)
- edges[(node.node_id, packet.from_node_id)] += 1
- edge_type[(node.node_id, packet.from_node_id)] = 'ni'
- except Exception as e:
- print(f"Error decoding NeighborInfo packet: {e}")
-
- # Retrieve node details concurrently
- async with asyncio.TaskGroup() as tg:
- for node_id in node_ids:
- nodes[node_id] = tg.create_task(store.get_node(node_id))
-
- tr_done = set()
- for tr, path in traceroutes:
- if tr.done:
- if tr.packet_id in tr_done:
- continue
- else:
- tr_done.add(tr.packet_id)
-
- for src, dest in zip(path, path[1:]):
- used_nodes.add(src)
- used_nodes.add(dest)
- edges[(src, dest)] += 1
- edge_type[(src, dest)] = 'tr'
-
- # Helper to get node name
- async def get_node_name(node_id):
- try:
- node = await nodes[node_id]
- if node:
- return f'[{node.short_name}] {node.long_name}\n{node_id_to_hex(node_id)}'
- return node_id_to_hex(node_id)
- except Exception as e:
- return f"Error retrieving node: {str(e)}"
-
- # Filter nodes and edges if root is specified
- if root:
- new_used_nodes = set()
- new_edges = Counter()
- edge_map = {}
- for src, dest in edges:
- edge_map.setdefault(dest, []).append(src)
-
- queue = [int(root)]
- for _ in range(depth):
- next_queue = []
- for node in queue:
- new_used_nodes.add(node)
- for dest in edge_map.get(node, []):
- new_used_nodes.add(dest)
- new_edges[(dest, node)] += 1
- next_queue.append(dest)
- queue = next_queue
-
- used_nodes = new_used_nodes
- edges = new_edges
-
- # Create graph
- graph = pydot.Dot('network', graph_type="digraph", layout="sfdp", overlap="scale", model='subset', splines="true")
- for node_id in used_nodes:
- node = await nodes[node_id]
- color = '#000000'
- node_name = await get_node_name(node_id)
- if node and node.role in ('ROUTER', 'ROUTER_CLIENT', 'REPEATER'):
- color = '#0000FF'
- #elif node and node.role == 'CLIENT_MUTE':
- # color = '#00FF00'
- graph.add_node(pydot.Node(
- str(node_id),
- label=node_name,
- shape='box',
- color=color,
- fontsize="10", width="0", height="0",
- href=f"/graph/longfast?root={node_id}&depth={depth-1}",
- ))
-
- # Adjust edge visualization
- if edges:
- max_edge_count = edges.most_common(1)[0][1]
- else:
- max_edge_count = 1
-
- size_ratio = 2. / max_edge_count
- edge_added = set()
-
- for (src, dest), edge_count in edges.items():
- size = max(size_ratio * edge_count, .25)
- arrowsize = max(size_ratio * edge_count, .5)
- if edge_type[(src, dest)] in ('ni'):
- color = '#FF0000'
- elif edge_type[(src, dest)] in ('sni'):
- color = '#040fb3'
- else:
- color = '#000000'
- edge_dir = "forward"
- if (dest, src) in edges and edge_type[(src, dest)] == edge_type[(dest, src)]:
- edge_dir = "both"
- edge_added.add((dest, src))
-
- if (src, dest) not in edge_added:
- edge_added.add((src, dest))
- graph.add_edge(pydot.Edge(
- str(src),
- str(dest),
- color=color,
- tooltip=f'{await get_node_name(src)} -> {await get_node_name(dest)}',
- penwidth=.5,
- dir=edge_dir,
- arrowsize=".5",
- ))
-
- return web.Response(
- body=graph.create_svg(),
- content_type="image/svg+xml",
- )
-
- except Exception as e:
- print(f"Error in graph_network_longfast: {e}")
- return web.Response(status=500, text="Internal Server Error")
-
-
-
-@routes.get("/graph/mediumslow")
-async def graph_network_mediumslow(request):
- try:
- root = request.query.get("root")
- depth = int(request.query.get("depth", 3))
- hours = int(request.query.get("hours", 24))
- minutes = int(request.query.get("minutes", 0))
-
- # Validation
- if root and not root.isdigit():
- return web.Response(status=400, text="Invalid root node ID.")
- if depth < 1:
- return web.Response(status=400, text="Depth must be at least 1.")
-
- since = datetime.timedelta(hours=hours, minutes=minutes)
-
- nodes = {}
- node_ids = set()
- traceroutes = []
-
- # Fetch traceroutes
- for tr in await store.get_traceroutes_mediumslow(since):
- node_ids.add(tr.gateway_node_id)
- node_ids.add(tr.packet.from_node_id)
- node_ids.add(tr.packet.to_node_id)
- route = decode_payload.decode_payload(PortNum.TRACEROUTE_APP, tr.route)
- node_ids.update(route.route)
-
- path = [tr.packet.from_node_id]
- path.extend(route.route)
- if tr.done:
- path.append(tr.packet.to_node_id)
- else:
- if path[-1] != tr.gateway_node_id:
- path.append(tr.gateway_node_id)
- traceroutes.append((tr, path))
-
- edges = Counter()
- edge_type = {}
- used_nodes = set()
-
- # Fetch MQTT neighbors
- for ps, p in await store.get_mqtt_neighbors_mediumslow(since):
- node_ids.add(ps.node_id)
- node_ids.add(p.from_node_id)
- used_nodes.add(ps.node_id)
- used_nodes.add(p.from_node_id)
- edges[(p.from_node_id, ps.node_id)] += 1
- edge_type[(p.from_node_id, ps.node_id)] = 'sni'
-
- # Fetch NeighborInfo packets
- for packet in await store.get_packets_mediumslow(portnum=PortNum.NEIGHBORINFO_APP, since=since):
- try:
- _, neighbor_info = decode_payload.decode(packet)
- node_ids.add(packet.from_node_id)
- used_nodes.add(packet.from_node_id)
- for node in neighbor_info.neighbors:
- node_ids.add(node.node_id)
- used_nodes.add(node.node_id)
- edges[(node.node_id, packet.from_node_id)] += 1
- edge_type[(node.node_id, packet.from_node_id)] = 'ni'
- except Exception as e:
- print(f"Error decoding NeighborInfo packet: {e}")
-
- # Retrieve node details concurrently
- async with asyncio.TaskGroup() as tg:
- for node_id in node_ids:
- nodes[node_id] = tg.create_task(store.get_node(node_id))
-
- tr_done = set()
- for tr, path in traceroutes:
- if tr.done:
- if tr.packet_id in tr_done:
- continue
- else:
- tr_done.add(tr.packet_id)
-
- for src, dest in zip(path, path[1:]):
- used_nodes.add(src)
- used_nodes.add(dest)
- edges[(src, dest)] += 1
- edge_type[(src, dest)] = 'tr'
-
- # Helper to get node name
- async def get_node_name(node_id):
- try:
- node = await nodes[node_id]
- if node:
- return f'[{node.short_name}] {node.long_name}\n{node_id_to_hex(node_id)}'
- return node_id_to_hex(node_id)
- except Exception as e:
- return f"Error retrieving node: {str(e)}"
-
- # Filter nodes and edges if root is specified
- if root:
- new_used_nodes = set()
- new_edges = Counter()
- edge_map = {}
- for src, dest in edges:
- edge_map.setdefault(dest, []).append(src)
-
- queue = [int(root)]
- for _ in range(depth):
- next_queue = []
- for node in queue:
- new_used_nodes.add(node)
- for dest in edge_map.get(node, []):
- new_used_nodes.add(dest)
- new_edges[(dest, node)] += 1
- next_queue.append(dest)
- queue = next_queue
-
- used_nodes = new_used_nodes
- edges = new_edges
-
- # Create graph
- graph = pydot.Dot('network', graph_type="digraph", layout="sfdp", overlap="scale", model='subset', esep="+5", splines="true", nodesep="2", ranksep="2")
-
- for node_id in used_nodes:
- node = await nodes[node_id]
- color = '#000000'
- node_name = await get_node_name(node_id)
- if node and node.role in ('ROUTER', 'ROUTER_CLIENT', 'REPEATER'):
- color = '#0000FF'
- elif node and node.role == 'CLIENT_MUTE':
- color = '#00FF00'
- graph.add_node(pydot.Node(
- str(node_id),
- label=node_name,
- shape='box',
- color=color,
- fontsize="10", width="0", height="0",
- href=f"/graph/mediumslow?root={node_id}&depth={depth-1}",
- ))
-
- # Adjust edge visualization
- if edges:
- max_edge_count = edges.most_common(1)[0][1]
- else:
- max_edge_count = 1
-
- size_ratio = 2. / max_edge_count
- edge_added = set()
-
- for (src, dest), edge_count in edges.items():
- size = max(size_ratio * edge_count, .25)
- arrowsize = max(size_ratio * edge_count, .5)
- if edge_type[(src, dest)] in ('ni'):
- color = '#FF0000'
- elif edge_type[(src, dest)] in ('sni'):
- color = '#040fb3'
- else:
- color = '#000000'
- edge_dir = "forward"
- if (dest, src) in edges and edge_type[(src, dest)] == edge_type[(dest, src)]:
- edge_dir = "both"
- edge_added.add((dest, src))
-
- if (src, dest) not in edge_added:
- edge_added.add((src, dest))
- graph.add_edge(pydot.Edge(
- str(src),
- str(dest),
- color=color,
- tooltip=f'{await get_node_name(src)} -> {await get_node_name(dest)}',
- penwidth=.5,
- dir=edge_dir,
- arrowsize=".5",
- ))
-
- return web.Response(
- body=graph.create_svg(),
- content_type="image/svg+xml",
- )
-
- except Exception as e:
- print(f"Error in graph_network_longfast: {e}")
- return web.Response(status=500, text="Internal Server Error")
@routes.get("/nodelist")
async def nodelist(request):
@@ -1534,7 +1074,7 @@ async def nodelist(request):
#print(hw_model)
nodes= await store.get_nodes(role,channel, hw_model)
template = env.get_template("nodelist.html")
-
+ print_memory_usage()
return web.Response(
text=template.render(nodes=nodes, site_config = await store.get_site_config()),
content_type="text/html",
@@ -1551,6 +1091,7 @@ async def nodelist(request):
@routes.get("/net")
async def net(request):
try:
+ print_memory_usage()
# Fetch packets for the given node ID and port number
packets = await store.get_packets(
node_id=0xFFFFFFFF, portnum=PortNum.TEXT_MESSAGE_APP, limit=200
@@ -1579,7 +1120,7 @@ async def net(request):
raise # Let aiohttp handle HTTP exceptions properly
except Exception as e:
- print("Error processing chat request")
+ print("Error processing net request")
return web.Response(
text="An internal server error occurred.",
status=500,
@@ -1587,47 +1128,12 @@ async def net(request):
)
-@routes.get("/net_events")
-async def net_events(request):
- chat_packet = env.get_template("net_packet.html")
-
- # Precompile regex for performance (case insensitive)
- seq_pattern = re.compile(r"seq \d+$")
-
- with notify.subscribe(node_id=0xFFFFFFFF) as event:
- async with sse_response(request) as resp:
- while resp.is_connected():
- try:
- await asyncio.wait_for(event.wait(), timeout=10)
- except asyncio.TimeoutError:
- continue # Timeout occurred, loop again
-
- if event.is_set():
- # Ensure event.packets is valid before accessing it
- packets = [
- p for p in (event.packets or [])
- if p.portnum == PortNum.TEXT_MESSAGE_APP
- ]
- event.clear()
-
- try:
- for packet in packets:
- ui_packet = Packet.from_model(packet)
- if not seq_pattern.match(ui_packet.payload) and "baymeshnet" in ui_packet.payload.lower():
- await resp.send(
- chat_packet.render(packet=ui_packet),
- event="net_packet",
- )
- except ConnectionResetError:
- print("Client disconnected from SSE stream.")
- return # Gracefully exit on disconnection
-
-
@routes.get("/map")
async def map(request):
try:
nodes= await store.get_nodes()
template = env.get_template("map.html")
+ print_memory_usage()
return web.Response(
text=template.render(nodes=nodes, site_config = await store.get_site_config()),
content_type="text/html",
@@ -1640,7 +1146,191 @@ async def map(request):
content_type="text/plain",
)
+
+# Print memory usage
+def print_memory_usage():
+ process = psutil.Process(os.getpid())
+ print(f"Memory Usage: {process.memory_info().rss / (1024 * 1024):.2f} MB")
+
+@routes.get("/stats")
+async def stats(request):
+ try:
+ total_packets = await store.get_total_packet_count()
+ total_nodes = await store.get_total_node_count()
+ total_packets_seen = await store.get_total_packet_seen_count()
+ total_nodes_longfast = await store.get_total_node_count_longfast()
+ total_nodes_mediumslow = await store.get_total_node_count_mediumslow()
+ print_memory_usage()
+ template = env.get_template("stats.html")
+ return web.Response(
+ text=template.render(
+ total_packets=total_packets,
+ total_nodes=total_nodes,
+ total_packets_seen=total_packets_seen,
+ total_nodes_longfast=total_nodes_longfast,
+ total_nodes_mediumslow=total_nodes_mediumslow,
+ site_config = await store.get_site_config(),
+ ),
+ content_type="text/html",
+ )
+ except Exception as e:
+ return web.Response(
+ text="An error occurred while processing your request.",
+ status=500,
+ content_type="text/plain",
+ )
+
+@routes.get("/top")
+async def top(request):
+ try:
+ node_id = request.query.get("node_id") # Get node_id from the URL query parameters
+
+ if node_id:
+ # If node_id is provided, fetch traffic data for the specific node
+ node_traffic = await store.get_node_traffic(int(node_id))
+ print(node_traffic)
+ template = env.get_template("node_traffic.html") # Render a different template
+ html_content = template.render(traffic=node_traffic, node_id=node_id)
+ else:
+ # Otherwise, fetch top traffic nodes as usual
+ top_nodes = await store.get_top_traffic_nodes()
+ template = env.get_template("top.html")
+ html_content = template.render(nodes=top_nodes, site_config = await store.get_site_config())
+
+ return web.Response(
+ text=html_content,
+ content_type="text/html",
+ )
+ except Exception as e:
+ return web.Response(
+ text=f"An error occurred: {str(e)}",
+ status=500,
+ content_type="text/plain",
+ )
+
+
+
+@routes.get("/chat")
+async def chat(request):
+ try:
+ # Fetch packets for the given node ID and port number
+ packets = await store.get_packets(
+ node_id=0xFFFFFFFF, portnum=PortNum.TEXT_MESSAGE_APP, limit=100
+ )
+ #print(f"Fetched {len(packets)} packets.")
+
+ # Convert packets to UI packets
+ #print("Processing packets...")
+ ui_packets = [Packet.from_model(p) for p in packets]
+
+ # Filter packets
+ #print("Filtering packets...")
+ filtered_packets = [
+ p for p in ui_packets if not re.match(r"seq \d+$", p.payload)
+ ]
+
+ # Render template
+ #print("Rendering template...")
+ template = env.get_template("chat.html")
+ return web.Response(
+ text=template.render(packets=filtered_packets, site_config = await store.get_site_config()),
+ content_type="text/html",
+ )
+
+ except Exception as e:
+ # Log the error and return an appropriate response
+ #print(f"Error in chat handler: {e}")
+ return web.Response(
+ text="An error occurred while processing your request.",
+ status=500,
+ content_type="text/plain",
+ )
+
+
+# Assuming the route URL structure is /nodegraph/{channel}
+@routes.get("/nodegraph/{channel}")
+async def nodegraph(request):
+ channel = request.match_info.get('channel', 'LongFast') # Default to 'MediumSlow' if no channel is provided
+ nodes = await store.get_nodes(channel=channel) # Fetch nodes for the given channel
+ node_ids = set()
+ edges_set = set() # Track unique edges
+ edge_type = {} # Store type of each edge
+ used_nodes = set() # This will track nodes involved in edges (including traceroutes)
+ since = datetime.timedelta(hours=48)
+ traceroutes = []
+
+ # Fetch traceroutes
+ for tr in await store.get_traceroutes(since):
+ node_ids.add(tr.gateway_node_id)
+ node_ids.add(tr.packet.from_node_id)
+ node_ids.add(tr.packet.to_node_id)
+ route = decode_payload.decode_payload(PortNum.TRACEROUTE_APP, tr.route)
+ node_ids.update(route.route)
+
+ path = [tr.packet.from_node_id]
+ path.extend(route.route)
+ if tr.done:
+ path.append(tr.packet.to_node_id)
+ else:
+ if path[-1] != tr.gateway_node_id:
+ path.append(tr.gateway_node_id)
+ traceroutes.append((tr, path))
+
+ # Add traceroute edges with their type and update used_nodes
+ for i in range(len(path) - 1):
+ edge_pair = (path[i], path[i + 1])
+ edges_set.add(edge_pair)
+ edge_type[edge_pair] = "traceroute"
+ used_nodes.add(path[i]) # Add all nodes in the traceroute path
+ used_nodes.add(path[i + 1]) # Add all nodes in the traceroute path
+
+ # Fetch NeighborInfo packets
+ for packet in await store.get_packets(portnum=PortNum.NEIGHBORINFO_APP, since=since):
+ try:
+ _, neighbor_info = decode_payload.decode(packet)
+ node_ids.add(packet.from_node_id)
+ used_nodes.add(packet.from_node_id)
+ for node in neighbor_info.neighbors:
+ node_ids.add(node.node_id)
+ used_nodes.add(node.node_id)
+
+ edge_pair = (node.node_id, packet.from_node_id)
+ if edge_pair not in edges_set:
+ edges_set.add(edge_pair)
+ edge_type[edge_pair] = "neighbor"
+ except Exception as e:
+ print(f"Error decoding NeighborInfo packet: {e}")
+
+ # Convert edges_set to a list of dicts with colors
+ edges = [
+ {
+ "from": frm,
+ "to": to,
+ "originalColor": "#ff5733" if edge_type[(frm, to)] == "traceroute" else "#3388ff", # Red for traceroute, Blue for neighbor
+ "lineStyle": {
+ "color": "#ff5733" if edge_type[(frm, to)] == "traceroute" else "#3388ff",
+ "width": 2
+ }
+ }
+ for frm, to in edges_set
+ ]
+
+ # Filter nodes to only include those involved in edges (including traceroutes)
+ nodes_with_edges = [node for node in nodes if node.node_id in used_nodes]
+
+ template = env.get_template("nodegraph.html")
+ return web.Response(
+ text=template.render(
+ nodes=nodes_with_edges,
+ edges=edges, # Pass edges with color info
+ ),
+ content_type="text/html",
+ )
+
+
+
async def run_server(bind, port, tls_cert):
+ gc.set_threshold(10, 10, 10)
app = web.Application()
app.add_routes(routes)
runner = web.AppRunner(app)
@@ -1653,6 +1343,7 @@ async def run_server(bind, port, tls_cert):
for host in bind:
site = web.TCPSite(runner, host, port, ssl_context=ssl_context)
await site.start()
-
+ print_memory_usage()
while True:
await asyncio.sleep(3600) # sleep forever
+
diff --git a/requirements.txt b/requirements.txt
index e064e6a..1a5d9ad 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,13 +1,46 @@
-protobuf
-aiomqtt
-sqlalchemy[asyncio]
-cryptography
-aiosqlite
-aiohttp
-aiodns
-Jinja2
+protobuf~=5.29.3
+aiomqtt~=2.3.0
+sqlalchemy[asyncio]~=2.0.38
+cryptography~=44.0.1
+aiosqlite~=0.21.0
+aiohttp~=3.11.12
+aiodns~=3.2.0
+Jinja2~=3.1.5
aiohttp-sse
-asyncpg
-seaborn
-pydot
-plotly
+asyncpg~=0.30.0
+seaborn~=0.13.2
+pydot~=3.0.4
+plotly~=6.0.0
+
+numpy~=2.2.3
+pillow~=11.1.0
+pip~=23.2.1
+attrs~=25.1.0
+cffi~=1.17.1
+paho-mqtt~=2.1.0
+pytz~=2025.1
+idna~=3.10
+multidict~=6.1.0
+propcache~=0.2.1
+typing_extensions~=4.12.2
+pyparsing~=3.2.1
+pycares~=4.5.0
+MarkupSafe~=3.0.2
+pandas~=2.2.3
+matplotlib~=3.10.0
+python-dateutil~=2.9.0.post0
+packaging~=24.2
+narwhals~=1.27.1
+yarl~=1.18.3
+aiosignal~=1.3.2
+frozenlist~=1.5.0
+aiohappyeyeballs~=2.4.6
+cycler~=0.12.1
+six~=1.17.0
+greenlet~=3.1.1
+psutil~=7.0.0
+objgraph~=3.6.2
+contourpy~=1.3.1
+fonttools~=4.56.0
+pycparser~=2.22
+kiwisolver~=1.4.8
\ No newline at end of file
diff --git a/sample.config.ini b/sample.config.ini
index b1fce3a..5f9e91f 100644
--- a/sample.config.ini
+++ b/sample.config.ini
@@ -6,10 +6,13 @@ acme_challenge =
[mqtt]
server = mqtt.bayme.sh
-topics = ["msh/US/bayarea/#", "msh/US/CA/mrymesh/#"]
+topics = ["msh/US/bayarea/#", "msh/US/CA/mrymesh/#"]
port = 1883
username = meshdev
password = large4cats
[database]
-connection_string = sqlite+aiosqlite:///packets.db
\ No newline at end of file
+connection_string = sqlite+aiosqlite:///packets.db
+
+[website]
+title = San Francisco Bay Area Mesh
\ No newline at end of file
diff --git a/startdb.py b/startdb.py
new file mode 100644
index 0000000..9aba173
--- /dev/null
+++ b/startdb.py
@@ -0,0 +1,50 @@
+import asyncio
+import argparse
+import configparser
+from meshview import mqtt_reader
+from meshview import mqtt_database
+from meshview import mqtt_store
+import json
+
+
+async def load_database_from_mqtt(mqtt_server: str , mqtt_port: int, topic: list, mqtt_user: str | None = None, mqtt_passwd: str | None = None):
+ async for topic, env in mqtt_reader.get_topic_envelopes(mqtt_server, mqtt_port, topic, mqtt_user, mqtt_passwd):
+ await mqtt_store.process_envelope(topic, env)
+
+
+async def main(config):
+ mqtt_database.init_database(config["database"]["connection_string"])
+
+ await mqtt_database.create_tables()
+ mqtt_user = None
+ mqtt_passwd = None
+ if config["mqtt"]["username"] != "":
+ mqtt_user: str = config["mqtt"]["username"]
+ if config["mqtt"]["password"] != "":
+ mqtt_passwd: str = config["mqtt"]["password"]
+ mqtt_topics = json.loads(config["mqtt"]["topics"])
+
+ async with asyncio.TaskGroup() as tg:
+ tg.create_task(
+ load_database_from_mqtt(config["mqtt"]["server"], int(config["mqtt"]["port"]), mqtt_topics, mqtt_user, mqtt_passwd)
+ )
+
+
+def load_config(file_path):
+ """Load configuration from an INI-style text file."""
+ config_parser = configparser.ConfigParser()
+ config_parser.read(file_path)
+
+ # Convert to a dictionary for easier access
+ config = {section: dict(config_parser.items(section)) for section in config_parser.sections()}
+ return config
+
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser("meshview")
+ parser.add_argument("--config", help="Path to the configuration file.", default='config.ini')
+ args = parser.parse_args()
+
+ config = load_config(args.config)
+
+ asyncio.run(main(config))
\ No newline at end of file