diff --git a/README b/README index d4c2f7f..1273ae6 100644 --- a/README +++ b/README @@ -1,6 +1,8 @@ Meshview ======== +Now running at https://meshview.bayme.sh + This project watches a MQTT topic for meshtastic messages, imports them to a database and has a web UI to view them. Requires Python 3.12 diff --git a/images/main.png b/images/main.png new file mode 100644 index 0000000..d2aab24 Binary files /dev/null and b/images/main.png differ diff --git a/meshview/database.py b/meshview/database.py index 1d027ee..a20e37b 100644 --- a/meshview/database.py +++ b/meshview/database.py @@ -1,20 +1,16 @@ -from sqlalchemy.ext.asyncio import async_sessionmaker -from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.ext.asyncio import create_async_engine - from meshview import models +from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker - -def init_database(database_connetion_string): +def init_database(database_connection_string): global engine, async_session kwargs = {} - if not database_connetion_string.startswith('sqlite'): + if not database_connection_string.startswith('sqlite'): kwargs['pool_size'] = 20 kwargs['max_overflow'] = 50 - engine = create_async_engine(database_connetion_string, echo=False, **kwargs) + print (**kwargs) + engine = create_async_engine(database_connection_string, echo=False, connect_args={"timeout": 15}) async_session = async_sessionmaker(engine, expire_on_commit=False) - async def create_tables(): async with engine.begin() as conn: await conn.run_sync(models.Base.metadata.create_all) diff --git a/meshview/decode_payload.py b/meshview/decode_payload.py index 99fe696..f8f9a20 100644 --- a/meshview/decode_payload.py +++ b/meshview/decode_payload.py @@ -1,3 +1,4 @@ +from meshtastic.protobuf.mqtt_pb2 import MapReport from meshtastic.protobuf.portnums_pb2 import PortNum from meshtastic.protobuf.mesh_pb2 import ( Position, @@ -24,6 +25,7 @@ DECODE_MAP = { PortNum.TRACEROUTE_APP: RouteDiscovery.FromString, PortNum.ROUTING_APP: Routing.FromString, PortNum.TEXT_MESSAGE_APP: text_message, + PortNum.MAP_REPORT_APP: MapReport.FromString } diff --git a/meshview/models.py b/meshview/models.py index 8932032..569943c 100644 --- a/meshview/models.py +++ b/meshview/models.py @@ -14,30 +14,31 @@ class Node(Base): __tablename__ = "node" id: Mapped[str] = mapped_column(primary_key=True) node_id: Mapped[int] = mapped_column(BigInteger, nullable=True, unique=True) - long_name: Mapped[str] - short_name: Mapped[str] - hw_model: Mapped[str] + long_name: Mapped[str] = mapped_column(nullable=True) + short_name: Mapped[str] = mapped_column(nullable=True) + hw_model: Mapped[str] = mapped_column(nullable=True) + firmware: Mapped[str] = mapped_column(nullable=True) role: Mapped[str] = mapped_column(nullable=True) last_lat: Mapped[int] = mapped_column(BigInteger, nullable=True) last_long: Mapped[int] = mapped_column(BigInteger, nullable=True) - channel: Mapped[str] - + channel: Mapped[str] = mapped_column(nullable=True) + last_update: Mapped[datetime] = mapped_column(nullable=True) class Packet(Base): __tablename__ = "packet" id: Mapped[int] = mapped_column(BigInteger, primary_key=True) - portnum: Mapped[int] - from_node_id: Mapped[int] = mapped_column(BigInteger) + portnum: Mapped[int] = mapped_column(nullable=True) + from_node_id: Mapped[int] = mapped_column(BigInteger, nullable=True) from_node: Mapped["Node"] = relationship( primaryjoin="Packet.from_node_id == foreign(Node.node_id)", lazy="joined" ) - to_node_id: Mapped[int] = mapped_column(BigInteger) + to_node_id: Mapped[int] = mapped_column(BigInteger,nullable=True) to_node: Mapped["Node"] = relationship( primaryjoin="Packet.to_node_id == foreign(Node.node_id)", lazy="joined" ) - payload: Mapped[bytes] - import_time: Mapped[datetime] - channel: Mapped[str] + payload: Mapped[bytes] = mapped_column(nullable=True) + import_time: Mapped[datetime] = mapped_column(nullable=True) + channel: Mapped[str] = mapped_column(nullable=True) class PacketSeen(Base): @@ -48,13 +49,13 @@ class PacketSeen(Base): lazy="joined", primaryjoin="PacketSeen.node_id == foreign(Node.node_id)" ) rx_time: Mapped[int] = mapped_column(BigInteger, primary_key=True) - hop_limit: Mapped[int] + hop_limit: Mapped[int] = mapped_column(nullable=True) hop_start: Mapped[int] = mapped_column(nullable=True) - channel: Mapped[str] + channel: Mapped[str] = mapped_column(nullable=True) rx_snr: Mapped[float] = mapped_column(nullable=True) rx_rssi: Mapped[int] = mapped_column(nullable=True) - topic: Mapped[str] - import_time: Mapped[datetime] + topic: Mapped[str] = mapped_column(nullable=True) + import_time: Mapped[datetime] = mapped_column(nullable=True) class Traceroute(Base): @@ -64,8 +65,8 @@ class Traceroute(Base): packet: Mapped["Packet"] = relationship( primaryjoin="Traceroute.packet_id == foreign(Packet.id)", lazy="joined" ) - gateway_node_id: Mapped[int] = mapped_column(BigInteger) - done: Mapped[bool] - route: Mapped[bytes] - import_time: Mapped[datetime] + gateway_node_id: Mapped[int] = mapped_column(BigInteger, nullable=True) + done: Mapped[bool] = mapped_column(nullable=True) + route: Mapped[bytes] = mapped_column(nullable=True) + import_time: Mapped[datetime] = mapped_column(nullable=True) diff --git a/meshview/mqtt_reader.py b/meshview/mqtt_reader.py index 1bac492..5453f5c 100644 --- a/meshview/mqtt_reader.py +++ b/meshview/mqtt_reader.py @@ -1,11 +1,9 @@ import base64 import asyncio import random - import aiomqtt from google.protobuf.message import DecodeError from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes - from meshtastic.protobuf.mqtt_pb2 import ServiceEnvelope KEY = base64.b64decode("1PG7OiApB1nwvP+rz05pAQ==") diff --git a/meshview/notify.py b/meshview/notify.py index d74b383..57268e9 100644 --- a/meshview/notify.py +++ b/meshview/notify.py @@ -34,16 +34,13 @@ def create_event(node_id): def remove_event(node_event): - print("removing event") waiting_node_ids_events[node_event.node_id].remove(node_event) - def notify_packet(node_id, packet): for event in waiting_node_ids_events[node_id]: event.packets.append(packet) event.set() - def notify_uplinked(node_id, packet): for event in waiting_node_ids_events[node_id]: event.uplinked.append(packet) @@ -52,8 +49,15 @@ def notify_uplinked(node_id, packet): @contextlib.contextmanager def subscribe(node_id): + """ + Context manager for subscribing to events for a node_id. + Automatically manages event creation and cleanup. + """ event = create_event(node_id) try: yield event + except Exception as e: + print(f"Error during subscription for node_id={node_id}: {e}") + raise finally: remove_event(event) diff --git a/meshview/store.py b/meshview/store.py index 97c13d2..3ff6118 100644 --- a/meshview/store.py +++ b/meshview/store.py @@ -2,7 +2,7 @@ import datetime from sqlalchemy import select, func from sqlalchemy.orm import lazyload - +from sqlalchemy import update from meshtastic.protobuf.config_pb2 import Config from meshtastic.protobuf.portnums_pb2 import PortNum from meshtastic.protobuf.mesh_pb2 import User, HardwareModel @@ -12,7 +12,34 @@ from meshview.models import Packet, PacketSeen, Node, Traceroute from meshview import notify + async def process_envelope(topic, env): + + # Checking if the received packet is a MAP_REPORT + # Update the node table with the firmware version + if env.packet.decoded.portnum == PortNum.MAP_REPORT_APP: + # Extract the node ID from the packet (renamed from 'id' to 'node_id' to avoid conflicts with Python's built-in id function) + node_id = getattr(env.packet, "from") + + # Decode the MAP report payload to extract the firmware version + map_report = decode_payload.decode_payload(PortNum.MAP_REPORT_APP, env.packet.decoded.payload) + + # Establish an asynchronous database session + async with database.async_session() as session: + # Construct an SQLAlchemy update statement + stmt = ( + update(Node) + .where(Node.node_id == node_id) # Ensure correct column reference + .values(firmware=map_report.firmware_version) # Assign new firmware value + ) + + # Execute the update statement asynchronously + await session.execute(stmt) + + # Commit the changes to the database + await session.commit() + + # This ignores any packet that does not have a ID if not env.packet.id: return @@ -58,6 +85,8 @@ async def process_envelope(topic, env): ) session.add(seen) + + if env.packet.decoded.portnum == PortNum.NODEINFO_APP: user = decode_payload.decode_payload( PortNum.NODEINFO_APP, env.packet.decoded.payload @@ -88,7 +117,7 @@ async def process_envelope(topic, env): node.short_name = user.short_name node.hw_model = hw_model node.role = role - # if need to update time of last update it may be here + node.last_update =datetime.datetime.now() else: node = Node( @@ -415,6 +444,7 @@ async def get_total_packet_count(): async def get_total_node_count(): async with database.async_session() as session: q = select(func.count(Node.id)) # Use SQLAlchemy's func to count nodes + q = q.where(Node.last_update > datetime.datetime.now() - datetime.timedelta(days=1)) # Look for nodes with nodeinfo updates in the last 24 hours result = await session.execute(q) return result.scalar() # Return the total count of nodes @@ -427,24 +457,13 @@ async def get_total_packet_seen_count(): async def get_total_node_count_longfast() -> int: - """ - Retrieves the total count of nodes where the channel is equal to 'LongFast'. - - This function queries the database asynchronously to count the number of nodes - in the `Node` table that meet the condition `channel == 'LongFast'`. It uses - SQLAlchemy's asynchronous session management and query construction. - - Returns: - int: The total count of nodes with `channel == 'LongFast'`. - - Raises: - Exception: If an error occurs during the database query execution. - """ try: # Open an asynchronous session with the database async with database.async_session() as session: # Build the query to count nodes where channel == 'LongFast' - q = select(func.count(Node.id)).filter(Node.channel == 'LongFast') + q = select(func.count(Node.id)) + q = q.where(Node.last_update > datetime.datetime.now() - datetime.timedelta( days=1)) # Look for nodes with nodeinfo updates in the last 24 hours + q = q.where(Node.channel == 'LongFast') # # Execute the query asynchronously and fetch the result result = await session.execute(q) @@ -458,25 +477,14 @@ async def get_total_node_count_longfast() -> int: async def get_total_node_count_mediumslow() -> int: - """ - Retrieves the total count of nodes where the channel is equal to 'MediumSlow'. - - This function queries the database asynchronously to count the number of nodes - in the `Node` table that meet the condition `channel == 'MediumSlow'`. It uses - SQLAlchemy's asynchronous session management and query construction. - - Returns: - int: The total count of nodes with `channel == 'MediumSlow'`. - - Raises: - Exception: If an error occurs during the database query execution. - """ try: # Open an asynchronous session with the database async with database.async_session() as session: # Build the query to count nodes where channel == 'LongFast' - q = select(func.count(Node.id)).filter(Node.channel == 'MediumSlow') - + q = select(func.count(Node.id)) + q = q.where(Node.last_update > datetime.datetime.now() - datetime.timedelta( + days=1)) # Look for nodes with nodeinfo updates in the last 24 hours + q = q.where(Node.channel == 'MediumSlow') # # Execute the query asynchronously and fetch the result result = await session.execute(q) @@ -498,6 +506,51 @@ async def get_nodes_mediumslow(): (Node.channel == "MediumSlow") ) ) + return result.scalars() +async def get_nodes(role=None, channel=None, hw_model=None): + """ + Fetches nodes from the database based on optional filtering criteria. + + Parameters: + role (str, optional): The role of the node (converted to uppercase for consistency). + channel (str, optional): The communication channel associated with the node. + hw_model (str, optional): The hardware model of the node. + + Returns: + list: A list of Node objects that match the given criteria. + """ + try: + async with database.async_session() as session: + #print(channel) # Debugging output (consider replacing with logging) + + # Start with a base query selecting all nodes + query = select(Node) + + # Apply filters based on provided parameters + if role is not None: + query = query.where(Node.role == role.upper()) # Ensure role is uppercase + if channel is not None: + query = query.where(Node.channel == channel) + if hw_model is not None: + query = query.where(Node.hw_model == hw_model) + + # Exclude nodes where last_update is an empty string + query = query.where(Node.last_update != "") + + # Order results by long_name in ascending order + query = query.order_by(Node.long_name.asc()) + + # Execute the query and retrieve results + result = await session.execute(query) + nodes = result.scalars().all() + return nodes # Return the list of nodes + + except Exception as e: + print("error reading DB") # Consider using logging instead of print + return [] # Return an empty list in case of failure + + + diff --git a/meshview/templates/base.html b/meshview/templates/base.html index c82bbc8..c089e4d 100644 --- a/meshview/templates/base.html +++ b/meshview/templates/base.html @@ -34,7 +34,9 @@
| Long Name | +Short Name | +HW Model | +Firmware | +Role | +Last Latitude | +Last Longitude | +Channel | +Last Update | +
|---|---|---|---|---|---|---|---|---|
| {{ node.long_name }} | +{{ node.short_name }} | +{{ node.hw_model if node.hw_model else "N/A" }} | +{{ node.firmware }} | +{{ node.role if node.role else "N/A" }} | +{{ "{:.7f}".format(node.last_lat / 10**7) if node.last_lat else "N/A" }} | +{{ "{:.7f}".format(node.last_long / 10**7) if node.last_long else "N/A" }} | +{{ node.channel if node.channel else "N/A" }} | ++ {{ node.last_update.strftime('%-I:%M:%S %p - %m-%d-%Y') if node.last_update else "N/A" }} + | +
No nodes found.
+ {% endif %} +{{packet.data}}- Total Nodes: + Total Active Nodes (Last 24 hours): {{ total_nodes }}
- Total Nodes LongFast:
+ Total Active Nodes LongFast:
- Total Nodes MediumSlow:
+ Total Active Nodes MediumSlow:
{{ total_nodes_longfast }}
({{ (total_nodes_longfast / total_nodes * 100) | round(2) }}%)
@@ -33,7 +33,7 @@
{{ total_nodes_mediumslow }}
({{ (total_nodes_mediumslow / total_nodes * 100) | round(2) }}%)
diff --git a/meshview/web.py b/meshview/web.py
index 52080b3..b917882 100644
--- a/meshview/web.py
+++ b/meshview/web.py
@@ -307,71 +307,121 @@ async def _packet_list(request, raw_packets, packet_event):
@routes.get("/chat_events")
async def chat_events(request):
+ """
+ Server-Sent Events (SSE) endpoint for real-time chat packet updates.
+
+ This endpoint listens for new chat packets related to `PortNum.TEXT_MESSAGE_APP`
+ and streams them to connected clients. Messages matching the pattern `"seq \d+$"`
+ are filtered out before sending.
+
+ Args:
+ request (aiohttp.web.Request): The incoming HTTP request.
+
+ Returns:
+ aiohttp.web.StreamResponse: SSE response streaming chat events.
+ """
chat_packet = env.get_template("chat_packet.html")
+ # Precompile regex for filtering out unwanted messages (case insensitive)
+ seq_pattern = re.compile(r"seq \d+$", re.IGNORECASE)
+
+ # Subscribe to notifications for packets from all nodes (0xFFFFFFFF = broadcast)
with notify.subscribe(node_id=0xFFFFFFFF) as event:
async with sse_response(request) as resp:
- while resp.is_connected():
+ while resp.is_connected(): # Keep the connection open while the client is connected
try:
- async with asyncio.timeout(10):
- await event.wait()
- except TimeoutError:
+ # Wait for an event with a timeout of 10 seconds
+ await asyncio.wait_for(event.wait(), timeout=10)
+ except asyncio.TimeoutError:
+ # Timeout reached, continue looping to keep connection alive
continue
+
if event.is_set():
+ # Extract relevant packets, ensuring event.packets is not None
packets = [
- p
- for p in event.packets
- if PortNum.TEXT_MESSAGE_APP == p.portnum
+ p for p in (event.packets or [])
+ if p.portnum == PortNum.TEXT_MESSAGE_APP
]
- event.clear()
+ event.clear() # Reset event flag
+
try:
for packet in packets:
ui_packet = Packet.from_model(packet)
- if not re.match(r"seq \d+$", ui_packet.payload):
+
+ # Filter out packets that match "seq