From e25ff221277b61118e9c4ec6b28e85242cb93cc7 Mon Sep 17 00:00:00 2001 From: Joel Krauska Date: Fri, 3 Oct 2025 20:38:37 -0700 Subject: [PATCH] Add Ruff formatting and pre-commit hooks Configure Ruff as the code formatter and linter with pre-commit hooks. Applied automatic formatting fixes across the entire codebase including: - Import sorting and organization - Code style consistency (spacing, line breaks, indentation) - String quote normalization - Removal of trailing whitespace and unnecessary blank lines --- .pre-commit-config.yaml | 8 + main.py | 8 +- meshview/config.py | 7 +- meshview/database.py | 14 +- meshview/decode_payload.py | 16 +- meshview/models.py | 19 +- meshview/mqtt_database.py | 9 +- meshview/mqtt_reader.py | 19 +- meshview/mqtt_store.py | 52 ++--- meshview/notify.py | 4 +- meshview/store.py | 109 +++++---- meshview/web.py | 464 ++++++++++++++++++++----------------- mvrun.py | 41 ++-- pyproject.toml | 13 ++ startdb.py | 31 +-- 15 files changed, 449 insertions(+), 365 deletions(-) create mode 100644 .pre-commit-config.yaml create mode 100644 pyproject.toml diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..0dd031f --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,8 @@ +repos: + - repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.6.9 # pin the latest you’re comfortable with + hooks: + - id: ruff + args: [--fix, --exit-non-zero-on-fix] # fail if it had to change files + - id: ruff-format + \ No newline at end of file diff --git a/main.py b/main.py index 828a47c..43c4a1e 100644 --- a/main.py +++ b/main.py @@ -1,12 +1,12 @@ import asyncio + from meshview import web -async def main(): +async def main(): async with asyncio.TaskGroup() as tg: - tg.create_task( - web.run_server() - ) + tg.create_task(web.run_server()) + if __name__ == '__main__': asyncio.run(main()) diff --git a/meshview/config.py b/meshview/config.py index 17e1923..e6e6b14 100644 --- a/meshview/config.py +++ b/meshview/config.py @@ -1,9 +1,11 @@ -import configparser import argparse +import configparser # Parse command-line arguments parser = argparse.ArgumentParser(description="MeshView Configuration Loader") -parser.add_argument("--config", type=str, default="config.ini", help="Path to config.ini file (default: config.ini)") +parser.add_argument( + "--config", type=str, default="config.ini", help="Path to config.ini file (default: config.ini)" +) args = parser.parse_args() # Initialize config parser @@ -12,4 +14,3 @@ if not config_parser.read(args.config): raise FileNotFoundError(f"Config file '{args.config}' not found! Ensure the file exists.") CONFIG = {section: dict(config_parser.items(section)) for section in config_parser.sections()} - diff --git a/meshview/database.py b/meshview/database.py index 3586281..1505905 100644 --- a/meshview/database.py +++ b/meshview/database.py @@ -1,6 +1,6 @@ +from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine + from meshview import models -from sqlalchemy.ext.asyncio import async_sessionmaker -from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession engine = None async_session = None @@ -13,10 +13,12 @@ def init_database(database_connection_string): database_connection_string += "?mode=ro" kwargs["connect_args"] = {"uri": True} engine = create_async_engine(database_connection_string, **kwargs) - async_session = async_sessionmaker( bind=engine, - class_=AsyncSession, - expire_on_commit=False, -) + async_session = async_sessionmaker( + bind=engine, + class_=AsyncSession, + expire_on_commit=False, + ) + async def create_tables(): async with engine.begin() as conn: diff --git a/meshview/decode_payload.py b/meshview/decode_payload.py index f8f9a20..080a935 100644 --- a/meshview/decode_payload.py +++ b/meshview/decode_payload.py @@ -1,16 +1,16 @@ -from meshtastic.protobuf.mqtt_pb2 import MapReport -from meshtastic.protobuf.portnums_pb2 import PortNum +from google.protobuf.message import DecodeError + from meshtastic.protobuf.mesh_pb2 import ( - Position, + MeshPacket, NeighborInfo, - NodeInfo, - User, + Position, RouteDiscovery, Routing, - MeshPacket, + User, ) +from meshtastic.protobuf.mqtt_pb2 import MapReport +from meshtastic.protobuf.portnums_pb2 import PortNum from meshtastic.protobuf.telemetry_pb2 import Telemetry -from google.protobuf.message import DecodeError def text_message(payload): @@ -25,7 +25,7 @@ DECODE_MAP = { PortNum.TRACEROUTE_APP: RouteDiscovery.FromString, PortNum.ROUTING_APP: Routing.FromString, PortNum.TEXT_MESSAGE_APP: text_message, - PortNum.MAP_REPORT_APP: MapReport.FromString + PortNum.MAP_REPORT_APP: MapReport.FromString, } diff --git a/meshview/models.py b/meshview/models.py index 6185d74..b88606b 100644 --- a/meshview/models.py +++ b/meshview/models.py @@ -1,8 +1,8 @@ from datetime import datetime -from sqlalchemy.orm import DeclarativeBase, foreign + +from sqlalchemy import BigInteger, ForeignKey, Index, desc from sqlalchemy.ext.asyncio import AsyncAttrs -from sqlalchemy.orm import mapped_column, relationship, Mapped -from sqlalchemy import ForeignKey, BigInteger, Index, desc +from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship class Base(AsyncAttrs, DeclarativeBase): @@ -24,9 +24,7 @@ class Node(Base): channel: Mapped[str] = mapped_column(nullable=True) last_update: Mapped[datetime] = mapped_column(nullable=True) - __table_args__ = ( - Index("idx_node_node_id", "node_id"), - ) + __table_args__ = (Index("idx_node_node_id", "node_id"),) def to_dict(self): return { @@ -79,10 +77,7 @@ class PacketSeen(Base): topic: Mapped[str] = mapped_column(nullable=True) import_time: Mapped[datetime] = mapped_column(nullable=True) - __table_args__ = ( - Index("idx_packet_seen_node_id", "node_id"), - ) - + __table_args__ = (Index("idx_packet_seen_node_id", "node_id"),) class Traceroute(Base): @@ -98,6 +93,4 @@ class Traceroute(Base): route: Mapped[bytes] = mapped_column(nullable=True) import_time: Mapped[datetime] = mapped_column(nullable=True) - __table_args__ = ( - Index("idx_traceroute_import_time", "import_time"), - ) + __table_args__ = (Index("idx_traceroute_import_time", "import_time"),) diff --git a/meshview/mqtt_database.py b/meshview/mqtt_database.py index f52bf7c..6b5e004 100644 --- a/meshview/mqtt_database.py +++ b/meshview/mqtt_database.py @@ -1,11 +1,16 @@ +from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine + from meshview import models -from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker + def init_database(database_connection_string): global engine, async_session - engine = create_async_engine(database_connection_string, echo=False, connect_args={"timeout": 900}) + engine = create_async_engine( + database_connection_string, echo=False, connect_args={"timeout": 900} + ) async_session = async_sessionmaker(engine, expire_on_commit=False) + async def create_tables(): async with engine.begin() as conn: await conn.run_sync(models.Base.metadata.create_all) diff --git a/meshview/mqtt_reader.py b/meshview/mqtt_reader.py index e2e6b79..e443eea 100644 --- a/meshview/mqtt_reader.py +++ b/meshview/mqtt_reader.py @@ -1,11 +1,13 @@ -import base64 import asyncio +import base64 +import logging import random import time + import aiomqtt -import logging -from google.protobuf.message import DecodeError from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from google.protobuf.message import DecodeError + from meshtastic.protobuf.mqtt_pb2 import ServiceEnvelope KEY = base64.b64decode("1PG7OiApB1nwvP+rz05pAQ==") @@ -13,7 +15,7 @@ KEY = base64.b64decode("1PG7OiApB1nwvP+rz05pAQ==") logging.basicConfig( level=logging.INFO, format="%(asctime)s %(filename)s:%(lineno)d [pid:%(process)d] %(levelname)s - %(message)s", - datefmt="%Y-%m-%d %H:%M:%S" + datefmt="%Y-%m-%d %H:%M:%S", ) logger = logging.getLogger(__name__) @@ -48,7 +50,6 @@ async def get_topic_envelopes(mqtt_server, mqtt_port, topics, mqtt_user, mqtt_pa password=mqtt_passwd, identifier=identifier, ) as client: - logger.info(f"Connected to MQTT broker at {mqtt_server}:{mqtt_port}") for topic in topics: logger.info(f"Subscribing to: {topic}") @@ -76,10 +77,14 @@ async def get_topic_envelopes(mqtt_server, mqtt_port, topics, mqtt_user, mqtt_pa msg_count += 1 # FIXME: make this interval configurable or time based - if msg_count % 10000 == 0: # Log notice every 10000 messages (approx every hour at 3/sec) + if ( + msg_count % 10000 == 0 + ): # Log notice every 10000 messages (approx every hour at 3/sec) elapsed_time = time.time() - start_time msg_rate = msg_count / elapsed_time if elapsed_time > 0 else 0 - logger.info(f"Processed {msg_count} messages so far... ({msg_rate:.2f} msg/sec)") + logger.info( + f"Processed {msg_count} messages so far... ({msg_rate:.2f} msg/sec)" + ) yield msg.topic.value, envelope diff --git a/meshview/mqtt_store.py b/meshview/mqtt_store.py index c4f5c42..57fe9d2 100644 --- a/meshview/mqtt_store.py +++ b/meshview/mqtt_store.py @@ -1,17 +1,17 @@ import datetime import re + from sqlalchemy import select from sqlalchemy.dialects.sqlite import insert as sqlite_insert + from meshtastic.protobuf.config_pb2 import Config +from meshtastic.protobuf.mesh_pb2 import HardwareModel from meshtastic.protobuf.portnums_pb2 import PortNum -from meshtastic.protobuf.mesh_pb2 import User, HardwareModel -from meshview import mqtt_database -from meshview import decode_payload -from meshview.models import Packet, PacketSeen, Node, Traceroute +from meshview import decode_payload, mqtt_database +from meshview.models import Node, Packet, PacketSeen, Traceroute async def process_envelope(topic, env): - # MAP_REPORT_APP if env.packet.decoded.portnum == PortNum.MAP_REPORT_APP: node_id = getattr(env.packet, "from") @@ -73,22 +73,26 @@ async def process_envelope(topic, env): async with mqtt_database.async_session() as session: # --- Packet insert with ON CONFLICT DO NOTHING - result = await session.execute( - select(Packet).where(Packet.id == env.packet.id) - ) - new_packet = False + result = await session.execute(select(Packet).where(Packet.id == env.packet.id)) + # FIXME: Not Used + # new_packet = False packet = result.scalar_one_or_none() if not packet: - new_packet = True - stmt = sqlite_insert(Packet).values( - id=env.packet.id, - portnum=env.packet.decoded.portnum, - from_node_id=getattr(env.packet, "from"), - to_node_id=env.packet.to, - payload=env.packet.SerializeToString(), - import_time=datetime.datetime.now(), - channel=env.channel_id, - ).on_conflict_do_nothing(index_elements=["id"]) + # FIXME: Not Used + # new_packet = True + stmt = ( + sqlite_insert(Packet) + .values( + id=env.packet.id, + portnum=env.packet.decoded.portnum, + from_node_id=getattr(env.packet, "from"), + to_node_id=env.packet.to, + payload=env.packet.SerializeToString(), + import_time=datetime.datetime.now(), + channel=env.channel_id, + ) + .on_conflict_do_nothing(index_elements=["id"]) + ) await session.execute(stmt) # --- PacketSeen (no conflict handling here, normal insert) @@ -138,9 +142,7 @@ async def process_envelope(topic, env): ) node = ( - await session.execute( - select(Node).where(Node.id == user.id) - ) + await session.execute(select(Node).where(Node.id == user.id)) ).scalar_one_or_none() if node: @@ -174,9 +176,7 @@ async def process_envelope(topic, env): if position and position.latitude_i and position.longitude_i: from_node_id = getattr(env.packet, "from") node = ( - await session.execute( - select(Node).where(Node.node_id == from_node_id) - ) + await session.execute(select(Node).where(Node.node_id == from_node_id)) ).scalar_one_or_none() if node: node.last_lat = position.latitude_i @@ -207,6 +207,6 @@ async def process_envelope(topic, env): await session.commit() - #if new_packet: + # if new_packet: # await packet.awaitable_attrs.to_node # await packet.awaitable_attrs.from_node diff --git a/meshview/notify.py b/meshview/notify.py index 57268e9..4926413 100644 --- a/meshview/notify.py +++ b/meshview/notify.py @@ -1,6 +1,6 @@ +import asyncio import contextlib from collections import defaultdict -import asyncio waiting_node_ids_events = defaultdict(set) @@ -36,11 +36,13 @@ def create_event(node_id): def remove_event(node_event): waiting_node_ids_events[node_event.node_id].remove(node_event) + def notify_packet(node_id, packet): for event in waiting_node_ids_events[node_id]: event.packets.append(packet) event.set() + def notify_uplinked(node_id, packet): for event in waiting_node_ids_events[node_id]: event.uplinked.append(packet) diff --git a/meshview/store.py b/meshview/store.py index 6187413..ef3c7ae 100644 --- a/meshview/store.py +++ b/meshview/store.py @@ -1,10 +1,12 @@ -from sqlalchemy import select, func -from sqlalchemy.orm import lazyload -from meshview import database -from meshview.models import Packet, PacketSeen, Node, Traceroute -from sqlalchemy import text from datetime import datetime, timedelta +from sqlalchemy import func, select, text +from sqlalchemy.orm import lazyload + +from meshview import database +from meshview.models import Node, Packet, PacketSeen, Traceroute + + async def get_node(node_id): async with database.async_session() as session: result = await session.execute(select(Node).where(Node.node_id == node_id)) @@ -27,9 +29,7 @@ async def get_packets(node_id=None, portnum=None, after=None, before=None, limit q = select(Packet) if node_id: - q = q.where( - (Packet.from_node_id == node_id) | (Packet.to_node_id == node_id) - ) + q = q.where((Packet.from_node_id == node_id) | (Packet.to_node_id == node_id)) if portnum: q = q.where(Packet.portnum == portnum) if after: @@ -47,15 +47,12 @@ async def get_packets(node_id=None, portnum=None, after=None, before=None, limit return packets - async def get_packets_from(node_id=None, portnum=None, since=None, limit=500): async with database.async_session() as session: q = select(Packet) if node_id: - q = q.where( - Packet.from_node_id == node_id - ) + q = q.where(Packet.from_node_id == node_id) if portnum: q = q.where(Packet.portnum == portnum) if since: @@ -73,7 +70,13 @@ async def get_packet(packet_id): async def get_uplinked_packets(node_id, portnum=None): async with database.async_session() as session: - q = select(Packet).join(PacketSeen).where(PacketSeen.node_id == node_id).order_by(Packet.import_time.desc()).limit(500) + q = ( + select(Packet) + .join(PacketSeen) + .where(PacketSeen.node_id == node_id) + .order_by(Packet.import_time.desc()) + .limit(500) + ) if portnum: q = q.where(Packet.portnum == portnum) result = await session.execute(q) @@ -93,18 +96,20 @@ async def get_packets_seen(packet_id): async def has_packets(node_id, portnum): async with database.async_session() as session: return bool( - (await session.execute( + ( + await session.execute( select(Packet.id).where(Packet.from_node_id == node_id).limit(1) - )).scalar() + ) + ).scalar() ) async def get_traceroute(packet_id): async with database.async_session() as session: result = await session.execute( - select(Traceroute) - .where(Traceroute.packet_id == packet_id) - .order_by(Traceroute.import_time) + select(Traceroute) + .where(Traceroute.packet_id == packet_id) + .order_by(Traceroute.import_time) ) return result.scalars() @@ -122,10 +127,10 @@ async def get_traceroutes(since): yield tr - async def get_mqtt_neighbors(since): async with database.async_session() as session: - result = await session.execute(select(PacketSeen, Packet) + result = await session.execute( + select(PacketSeen, Packet) .join(Packet) .where( (PacketSeen.hop_limit == PacketSeen.hop_start) @@ -148,6 +153,7 @@ async def get_total_packet_count(): result = await session.execute(q) return result.scalar() # Return the total count of packets + # We count the total amount of seen packets async def get_total_packet_seen_count(): async with database.async_session() as session: @@ -156,7 +162,6 @@ async def get_total_packet_seen_count(): return result.scalar() # Return the` total count of seen packets - async def get_total_node_count(channel: str = None) -> int: try: async with database.async_session() as session: @@ -177,7 +182,8 @@ async def get_total_node_count(channel: str = None) -> int: async def get_top_traffic_nodes(): try: async with database.async_session() as session: - result = await session.execute(text(""" + result = await session.execute( + text(""" SELECT n.node_id, n.long_name, @@ -192,18 +198,22 @@ async def get_top_traffic_nodes(): GROUP BY n.node_id, n.long_name, n.short_name HAVING total_packets_sent > 0 ORDER BY total_times_seen DESC; - """)) + """) + ) rows = result.fetchall() - nodes = [{ - 'node_id': row[0], - 'long_name': row[1], - 'short_name': row[2], - 'channel': row[3], - 'total_packets_sent': row[4], - 'total_times_seen': row[5] - } for row in rows] + nodes = [ + { + 'node_id': row[0], + 'long_name': row[1], + 'short_name': row[2], + 'channel': row[3], + 'total_packets_sent': row[4], + 'total_times_seen': row[5], + } + for row in rows + ] return nodes except Exception as e: @@ -211,7 +221,6 @@ async def get_top_traffic_nodes(): return [] - async def get_node_traffic(node_id: int): try: async with database.async_session() as session: @@ -226,15 +235,19 @@ async def get_node_traffic(node_id: int): AND packet.import_time >= DATETIME('now', 'localtime', '-24 hours') GROUP BY packet.portnum ORDER BY packet_count DESC; - """), {"node_id": node_id} + """), + {"node_id": node_id}, ) # Map the result to include node.long_name and packet data - traffic_data = [{ - "long_name": row[0], # node.long_name - "portnum": row[1], # packet.portnum - "packet_count": row[2] # COUNT(*) as packet_count - } for row in result.all()] + traffic_data = [ + { + "long_name": row[0], # node.long_name + "portnum": row[1], # packet.portnum + "packet_count": row[2], # COUNT(*) as packet_count + } + for row in result.all() + ] return traffic_data @@ -244,7 +257,6 @@ async def get_node_traffic(node_id: int): return [] - async def get_nodes(role=None, channel=None, hw_model=None, days_active=None): """ Fetches nodes from the database based on optional filtering criteria. @@ -259,7 +271,7 @@ async def get_nodes(role=None, channel=None, hw_model=None, days_active=None): """ try: async with database.async_session() as session: - #print(channel) # Debugging output (consider replacing with logging) + # print(channel) # Debugging output (consider replacing with logging) # Start with a base query selecting all nodes query = select(Node) @@ -286,7 +298,7 @@ async def get_nodes(role=None, channel=None, hw_model=None, days_active=None): nodes = result.scalars().all() return nodes # Return the list of nodes - except Exception as e: + except Exception: print("error reading DB") # Consider using logging instead of print return [] # Return an empty list in case of failure @@ -297,7 +309,7 @@ async def get_packet_stats( channel: str | None = None, portnum: int | None = None, to_node: int | None = None, - from_node: int | None = None + from_node: int | None = None, ): now = datetime.now() @@ -311,13 +323,10 @@ async def get_packet_stats( raise ValueError("period_type must be 'hour' or 'day'") async with database.async_session() as session: - q = ( - select( - func.strftime(time_format, Packet.import_time).label('period'), - func.count().label('count') - ) - .where(Packet.import_time >= start_time) - ) + q = select( + func.strftime(time_format, Packet.import_time).label('period'), + func.count().label('count'), + ).where(Packet.import_time >= start_time) # Filters if channel: @@ -341,7 +350,7 @@ async def get_packet_stats( "portnum": portnum, "to_node": to_node, "from_node": from_node, - "data": data + "data": data, } diff --git a/meshview/web.py b/meshview/web.py index dd5f328..d10f6a6 100644 --- a/meshview/web.py +++ b/meshview/web.py @@ -1,39 +1,37 @@ import asyncio import datetime -from datetime import timedelta import json import logging import os +import pathlib +import re import ssl +import traceback from collections import Counter, defaultdict from dataclasses import dataclass +from datetime import timedelta + import pydot +from aiohttp import web from google.protobuf import text_format from google.protobuf.message import Message -from jinja2 import Environment, PackageLoader, select_autoescape, Undefined +from jinja2 import Environment, PackageLoader, Undefined, select_autoescape from markupsafe import Markup from pandas import DataFrame + from meshtastic.protobuf.portnums_pb2 import PortNum -from meshview import config -from meshview import database -from meshview import decode_payload -from meshview import models -from meshview import store -from aiohttp import web -import re -import traceback -import pathlib +from meshview import config, database, decode_payload, models, store logging.basicConfig( level=logging.INFO, format="%(asctime)s %(filename)s:%(lineno)d [pid:%(process)d] %(levelname)s - %(message)s", - datefmt="%Y-%m-%d %H:%M:%S" + datefmt="%Y-%m-%d %H:%M:%S", ) logger = logging.getLogger(__name__) SEQ_REGEX = re.compile(r"seq \d+") -SOFTWARE_RELEASE= "2.0.7 ~ 09-17-25" +SOFTWARE_RELEASE = "2.0.7 ~ 09-17-25" CONFIG = config.CONFIG env = Environment(loader=PackageLoader("meshview"), autoescape=select_autoescape()) @@ -44,6 +42,7 @@ database.init_database(CONFIG["database"]["connection_string"]) with open(os.path.join(os.path.dirname(__file__), '1x1.png'), 'rb') as png: empty_png = png.read() + @dataclass class Packet: id: int @@ -59,7 +58,6 @@ class Packet: pretty_payload: Markup import_time: datetime.datetime - @classmethod def from_model(cls, packet): mesh_packet, payload = decode_payload.decode(packet) @@ -76,10 +74,7 @@ class Packet: text_payload = "Did not decode" elif isinstance(payload, Message): text_payload = text_format.MessageToString(payload) - elif ( - packet.portnum == PortNum.TEXT_MESSAGE_APP - and packet.to_node_id != 0xFFFFFFFF - ): + elif packet.portnum == PortNum.TEXT_MESSAGE_APP and packet.to_node_id != 0xFFFFFFFF: text_payload = "" else: text_payload = payload @@ -109,6 +104,7 @@ class Packet: raw_payload=payload, ) + @dataclass class UplinkedNode: lat: float @@ -122,7 +118,9 @@ class UplinkedNode: async def build_trace(node_id): trace = [] - for raw_p in await store.get_packets_from(node_id, PortNum.POSITION_APP, since=datetime.timedelta(hours=24)): + for raw_p in await store.get_packets_from( + node_id, PortNum.POSITION_APP, since=datetime.timedelta(hours=24) + ): p = Packet.from_model(raw_p) if not p.raw_payload or not p.raw_payload.latitude_i or not p.raw_payload.longitude_i: continue @@ -153,7 +151,7 @@ async def build_neighbors(node_id): tasks = {n.node_id: store.get_node(n.node_id) for n in payload.neighbors} results = await asyncio.gather(*tasks.values(), return_exceptions=True) - for neighbor, node in zip(payload.neighbors, results): + for neighbor, node in zip(payload.neighbors, results, strict=False): if isinstance(node, Exception): continue if node and node.last_lat and node.last_long: @@ -170,7 +168,7 @@ async def build_neighbors(node_id): def node_id_to_hex(node_id): if node_id is None or isinstance(node_id, Undefined): - return "Invalid node_id" # i... have no clue + return "Invalid node_id" # i... have no clue if node_id == 4294967295: return "^all" else: @@ -179,7 +177,7 @@ def node_id_to_hex(node_id): def format_timestamp(timestamp): if isinstance(timestamp, int): - timestamp = datetime.datetime.fromtimestamp(timestamp, datetime.timezone.utc) + timestamp = datetime.datetime.fromtimestamp(timestamp, datetime.UTC) return timestamp.isoformat(timespec="milliseconds") @@ -187,6 +185,8 @@ env.filters["node_id_to_hex"] = node_id_to_hex env.filters["format_timestamp"] = format_timestamp routes = web.RouteTableDef() + + @routes.get("/") async def index(request): """ @@ -198,7 +198,6 @@ async def index(request): raise web.HTTPFound(location=starting_url) - def generate_response(request, body, raw_node_id="", node=None): if "HX-Request" in request.headers: return web.Response(text=body, content_type="text/html") @@ -262,7 +261,7 @@ async def node_search(request): @routes.get("/node_match") async def node_match(request): - if not "q" in request.query or not request.query["q"]: + if "q" not in request.query or not request.query["q"]: return web.Response(text="Bad node id") raw_node_id = request.query["q"] node_options = await store.get_fuzzy_nodes(raw_node_id) @@ -275,6 +274,7 @@ async def node_match(request): content_type="text/html", ) + @routes.get("/packet_list/{node_id}") async def packet_list(request): try: @@ -350,7 +350,7 @@ async def packet_list(request): except asyncio.CancelledError: raise # Let TaskGroup cancellation propagate correctly - except Exception as e: + except Exception: # Log full traceback for diagnostics traceback.print_exc() template = env.get_template("error.html") @@ -414,7 +414,7 @@ async def packet_details(request): from_node_cord=from_node_cord, uplinked_nodes=uplinked_nodes, node=node, - site_config = CONFIG, + site_config=CONFIG, SOFTWARE_RELEASE=SOFTWARE_RELEASE, ), content_type="text/html", @@ -422,7 +422,7 @@ async def packet_details(request): @routes.get("/firehose") -async def packet_details(request): +async def packet_details_firehose(request): portnum = request.query.get("portnum") if portnum: portnum = int(portnum) @@ -432,12 +432,13 @@ async def packet_details(request): text=template.render( packets=(Packet.from_model(p) for p in packets), portnum=portnum, - site_config = CONFIG, + site_config=CONFIG, SOFTWARE_RELEASE=SOFTWARE_RELEASE, ), content_type="text/html", ) + @routes.get("/firehose/updates") async def firehose_updates(request): try: @@ -454,7 +455,6 @@ async def firehose_updates(request): # Query packets after last_time (microsecond precision) packets = await store.get_packets(after=last_time, limit=10) - # Convert to UI model ui_packets = [Packet.from_model(p) for p in packets] @@ -521,6 +521,7 @@ async def graph_power_json(request): ], ) + @routes.get("/graph/utilization_json/{node_id}") async def graph_chutil_json(request): return await graph_telemetry_json( @@ -529,6 +530,7 @@ async def graph_chutil_json(request): [{'label': 'utilization', 'fields': ['channel_utilization', 'air_util_tx']}], ) + @routes.get("/graph/wind_speed_json/{node_id}") async def graph_wind_speed_json(request): return await graph_telemetry_json( @@ -537,6 +539,7 @@ async def graph_wind_speed_json(request): [{'label': 'wind speed m/s', 'fields': ['wind_speed']}], ) + @routes.get("/graph/wind_direction_json/{node_id}") async def graph_wind_direction_json(request): return await graph_telemetry_json( @@ -545,6 +548,7 @@ async def graph_wind_direction_json(request): [{'label': 'wind direction', 'fields': ['wind_direction']}], ) + @routes.get("/graph/temperature_json/{node_id}") async def graph_temperature_json(request): return await graph_telemetry_json( @@ -553,6 +557,7 @@ async def graph_temperature_json(request): [{'label': 'temperature C', 'fields': ['temperature']}], ) + @routes.get("/graph/humidity_json/{node_id}") async def graph_humidity_json(request): return await graph_telemetry_json( @@ -561,6 +566,7 @@ async def graph_humidity_json(request): [{'label': 'humidity', 'fields': ['relative_humidity']}], ) + @routes.get("/graph/pressure_json/{node_id}") async def graph_pressure_json(request): return await graph_telemetry_json( @@ -569,6 +575,7 @@ async def graph_pressure_json(request): [{'label': 'barometric pressure', 'fields': ['barometric_pressure']}], ) + @routes.get("/graph/iaq_json/{node_id}") async def graph_iaq_json(request): return await graph_telemetry_json( @@ -577,6 +584,7 @@ async def graph_iaq_json(request): [{'label': 'IAQ', 'fields': ['iaq']}], ) + @routes.get("/graph/power_metrics_json/{node_id}") async def graph_power_metrics_json(request): return await graph_telemetry_json( @@ -584,7 +592,11 @@ async def graph_power_metrics_json(request): 'power_metrics', [ {'label': 'voltage', 'fields': ['ch1_voltage', 'ch2_voltage', 'ch3_voltage']}, - {'label': 'current', 'fields': ['ch1_current', 'ch2_current', 'ch3_current'], 'palette': 'Set2'}, + { + 'label': 'current', + 'fields': ['ch1_current', 'ch2_current', 'ch3_current'], + 'palette': 'Set2', + }, ], ) @@ -616,21 +628,26 @@ async def graph_telemetry_json(node_id, payload_type, graph_config): series = [] for conf in graph_config: for field in conf['fields']: - series.append({ - 'name': f"{conf['label']} - {field}" if len(conf['fields']) > 1 else conf['label'], - 'data': df[field].tolist() - }) + series.append( + { + 'name': f"{conf['label']} - {field}" + if len(conf['fields']) > 1 + else conf['label'], + 'data': df[field].tolist(), + } + ) - return web.json_response({ - 'timestamps': df['date'].tolist(), - 'series': series, - }) + return web.json_response( + { + 'timestamps': df['date'].tolist(), + 'series': series, + } + ) @routes.get("/graph/neighbors_json/{node_id}") async def graph_neighbors_json(request): import datetime - from pandas import DataFrame node_id = int(request.match_info['node_id']) oldest = datetime.datetime.now() - datetime.timedelta(days=4) @@ -663,10 +680,13 @@ async def graph_neighbors_json(request): name = node.short_name if node else node_id_to_hex(node_id) series.append({"name": name, "data": snrs}) - return web.json_response({ - "timestamps": dates, - "series": series, - }) + return web.json_response( + { + "timestamps": dates, + "series": series, + } + ) + @routes.get("/graph/traceroute/{packet_id}") async def graph_traceroute(request): @@ -738,7 +758,9 @@ async def graph_traceroute(request): if not node: node_name = node_id_to_hex(node_id) else: - node_name = f'[{node.short_name}] {node.long_name}\n{node_id_to_hex(node_id)}\n{node.role}' + node_name = ( + f'[{node.short_name}] {node.long_name}\n{node_id_to_hex(node_id)}\n{node.role}' + ) if node_id in node_seen_time: ms = (node_seen_time[node_id] - first_time).total_seconds() * 1000 node_name += f'\n {ms:.2f}ms' @@ -751,18 +773,20 @@ async def graph_traceroute(request): if node_id in saw_reply: style += ', diagonals' - graph.add_node(pydot.Node( - str(node_id), - label=node_name, - shape='box', - color=node_color.get(node_id, 'black'), - style=style, - href=f"/packet_list/{node_id}", - )) + graph.add_node( + pydot.Node( + str(node_id), + label=node_name, + shape='box', + color=node_color.get(node_id, 'black'), + style=style, + href=f"/packet_list/{node_id}", + ) + ) for path in paths: color = '#' + hex(hash(tuple(path)))[3:9] - for src, dest in zip(path, path[1:]): + for src, dest in zip(path, path[1:], strict=False): graph.add_edge(pydot.Edge(src, dest, color=color)) return web.Response( @@ -796,7 +820,8 @@ async def graph_traceroute2(request): nodes[node_id] = tg.create_task(store.get_node(node_id)) # Initialize graph for traceroute - graph = pydot.Dot('traceroute', graph_type="digraph") + # FIXME: This is not used + # graph = pydot.Dot('traceroute', graph_type="digraph") paths = set() node_color = {} @@ -843,13 +868,17 @@ async def graph_traceroute2(request): if not node: # Handle case where node is None node_name = node_id_to_hex(node_id) - chart_nodes.append({ - "name": str(node_id), - "value": node_name, - "symbol": 'rect', - }) + chart_nodes.append( + { + "name": str(node_id), + "value": node_name, + "symbol": 'rect', + } + ) else: - node_name = f'[{node.short_name}] {node.long_name}\n{node_id_to_hex(node_id)}\n{node.role}' + node_name = ( + f'[{node.short_name}] {node.long_name}\n{node_id_to_hex(node_id)}\n{node.role}' + ) if node_id in node_seen_time: ms = (node_seen_time[node_id] - first_time).total_seconds() * 1000 node_name += f'\n {ms:.2f}ms' @@ -862,25 +891,29 @@ async def graph_traceroute2(request): if node_id in saw_reply: style += ', diagonals' - chart_nodes.append({ - "name": str(node_id), - "value": node_name, - "symbol": 'rect', - "long_name": node.long_name, - "short_name": node.short_name, - "role": node.role, - "hw_model": node.hw_model, - }) + chart_nodes.append( + { + "name": str(node_id), + "value": node_name, + "symbol": 'rect', + "long_name": node.long_name, + "short_name": node.short_name, + "role": node.role, + "hw_model": node.hw_model, + } + ) # Create edges for path in paths: color = '#' + hex(hash(tuple(path)))[3:9] - for src, dest in zip(path, path[1:]): - chart_edges.append({ - "source": str(src), - "target": str(dest), - "originalColor": color, - }) + for src, dest in zip(path, path[1:], strict=False): + chart_edges.append( + { + "source": str(src), + "target": str(dest), + "originalColor": color, + } + ) chart_data = { "nodes": chart_nodes, @@ -895,7 +928,6 @@ async def graph_traceroute2(request): ) - @routes.get("/graph/network") async def graph_network(request): root = request.query.get("root") @@ -962,7 +994,7 @@ async def graph_network(request): else: tr_done.add(tr.packet_id) - for src, dest in zip(path, path[1:]): + for src, dest in zip(path, path[1:], strict=False): used_nodes.add(src) used_nodes.add(dest) edges[(src, dest)] += 1 @@ -984,7 +1016,7 @@ async def graph_network(request): edge_map.setdefault(dest, []).append(src) queue = [int(root)] - for i in range(depth): + for _ in range(depth): next_queue = [] for node in queue: new_used_nodes.add(node) @@ -997,8 +1029,15 @@ async def graph_network(request): used_nodes = new_used_nodes edges = new_edges # Create the graph - graph = pydot.Dot('network', graph_type="digraph", layout="sfdp", overlap="prism", esep="+10", nodesep="0.5", - ranksep="1") + graph = pydot.Dot( + 'network', + graph_type="digraph", + layout="sfdp", + overlap="prism", + esep="+10", + nodesep="0.5", + ranksep="1", + ) for node_id in used_nodes: node_future = nodes.get(node_id) @@ -1015,30 +1054,32 @@ async def graph_network(request): elif node and node.role == 'CLIENT_MUTE': color = '#00FF00' - graph.add_node(pydot.Node( - str(node_id), - label=node_name, - shape='box', - color=color, - href=f"/graph/network?root={node_id}&depth={depth - 1}", - )) - - if edges: - max_edge_count = edges.most_common(1)[0][1] - else: - max_edge_count = 1 - - size_ratio = 2. / max_edge_count + graph.add_node( + pydot.Node( + str(node_id), + label=node_name, + shape='box', + color=color, + href=f"/graph/network?root={node_id}&depth={depth - 1}", + ) + ) + # FIXME: Not used + # if edges: + # max_edge_count = edges.most_common(1)[0][1] + # else: + # max_edge_count = 1 + # size_ratio = 2.0 / max_edge_count edge_added = set() - for (src, dest), edge_count in edges.items(): - size = max(size_ratio * edge_count, .25) - arrowsize = max(size_ratio * edge_count, .5) + for (src, dest), _ in edges.items(): + # FIXME: These are not used + # size = max(size_ratio * edge_count, 0.25) + # arrowsize = max(size_ratio * edge_count, 0.5) if edge_type[(src, dest)] in ('ni'): color = '#FF0000' - elif edge_type[(src, dest)] in ('sni'): + elif edge_type[(src, dest)] in ('sni'): color = '#00FF00' else: color = '#000000' @@ -1049,30 +1090,28 @@ async def graph_network(request): if (src, dest) not in edge_added: edge_added.add((src, dest)) - graph.add_edge(pydot.Edge( - str(src), - str(dest), - color=color, - tooltip=f'{await get_node_name(src)} -> {await get_node_name(dest)}', - penwidth=1.85, - dir=edge_dir, - )) + graph.add_edge( + pydot.Edge( + str(src), + str(dest), + color=color, + tooltip=f'{await get_node_name(src)} -> {await get_node_name(dest)}', + penwidth=1.85, + dir=edge_dir, + ) + ) return web.Response( body=graph.create_svg(), content_type="image/svg+xml", ) - @routes.get("/nodelist") async def nodelist(request): try: template = env.get_template("nodelist.html") return web.Response( - text=template.render( - site_config=CONFIG, - SOFTWARE_RELEASE=SOFTWARE_RELEASE - ), + text=template.render(site_config=CONFIG, SOFTWARE_RELEASE=SOFTWARE_RELEASE), content_type="text/html", ) except Exception: @@ -1086,14 +1125,12 @@ async def nodelist(request): return web.Response(text=rendered, status=500, content_type="text/html") - @routes.get("/net") async def net(request): try: # Fetch packets for the given node ID and port number after_time = datetime.datetime.now() - timedelta(days=6) - packets = await store.get_packets( - portnum=PortNum.TEXT_MESSAGE_APP, after=after_time) + packets = await store.get_packets(portnum=PortNum.TEXT_MESSAGE_APP, after=after_time) # Convert packets to UI packets ui_packets = [Packet.from_model(p) for p in packets] @@ -1102,22 +1139,22 @@ async def net(request): # Filter packets: exclude "seq \d+$" but include those containing Tag filtered_packets = [ - p for p in ui_packets - if not seq_pattern.match(p.payload) and (CONFIG["site"]["net_tag"]).lower() in p.payload.lower() + p + for p in ui_packets + if not seq_pattern.match(p.payload) + and (CONFIG["site"]["net_tag"]).lower() in p.payload.lower() ] # Render template template = env.get_template("net.html") return web.Response( text=template.render( - packets=filtered_packets, - site_config = CONFIG, - SOFTWARE_RELEASE=SOFTWARE_RELEASE + packets=filtered_packets, site_config=CONFIG, SOFTWARE_RELEASE=SOFTWARE_RELEASE ), content_type="text/html", ) - except web.HTTPException as e: + except web.HTTPException: raise # Let aiohttp handle HTTP exceptions properly except Exception as e: @@ -1144,12 +1181,12 @@ async def map(request): for node in nodes: if hasattr(node, "last_update") and isinstance(node.last_update, datetime.datetime): node.last_update = node.last_update.isoformat() - + # Parse optional URL parameters for custom view map_center_lat = request.query.get("lat") map_center_lng = request.query.get("lng") map_zoom = request.query.get("zoom") - + # Validate and convert parameters if provided custom_view = None if map_center_lat and map_center_lng: @@ -1161,7 +1198,7 @@ async def map(request): except (ValueError, TypeError): # Invalid parameters, ignore and use defaults pass - + template = env.get_template("map.html") return web.Response( @@ -1169,16 +1206,18 @@ async def map(request): nodes=nodes, custom_view=custom_view, site_config=CONFIG, - SOFTWARE_RELEASE=SOFTWARE_RELEASE), + SOFTWARE_RELEASE=SOFTWARE_RELEASE, + ), content_type="text/html", ) - except Exception as e: + except Exception: return web.Response( text="An error occurred while processing your request.", status=500, content_type="text/plain", ) + @routes.get("/stats") async def stats(request): try: @@ -1191,7 +1230,7 @@ async def stats(request): total_packets=total_packets, total_nodes=total_nodes, total_packets_seen=total_packets_seen, - site_config = CONFIG, + site_config=CONFIG, SOFTWARE_RELEASE=SOFTWARE_RELEASE, ), content_type="text/html", @@ -1203,6 +1242,7 @@ async def stats(request): content_type="text/plain", ) + @routes.get("/top") async def top(request): try: @@ -1212,15 +1252,15 @@ async def top(request): # If node_id is provided, fetch traffic data for the specific node node_traffic = await store.get_node_traffic(int(node_id)) template = env.get_template("node_traffic.html") # Render a different template - html_content = template.render(traffic=node_traffic, node_id=node_id, site_config = CONFIG) + html_content = template.render( + traffic=node_traffic, node_id=node_id, site_config=CONFIG + ) else: # Otherwise, fetch top traffic nodes as usual top_nodes = await store.get_top_traffic_nodes() template = env.get_template("top.html") html_content = template.render( - nodes=top_nodes, - site_config = CONFIG, - SOFTWARE_RELEASE=SOFTWARE_RELEASE + nodes=top_nodes, site_config=CONFIG, SOFTWARE_RELEASE=SOFTWARE_RELEASE ) return web.Response( @@ -1238,15 +1278,13 @@ async def top(request): ) return web.Response(text=rendered, status=500, content_type="text/html") + @routes.get("/chat") async def chat(request): try: template = env.get_template("chat.html") return web.Response( - text=template.render( - site_config=CONFIG, - SOFTWARE_RELEASE=SOFTWARE_RELEASE - ), + text=template.render(site_config=CONFIG, SOFTWARE_RELEASE=SOFTWARE_RELEASE), content_type="text/html", ) except Exception as e: @@ -1266,7 +1304,9 @@ async def chat(request): async def nodegraph(request): nodes = await store.get_nodes(days_active=3) # Fetch nodes for the given channel node_ids = set() - edges_map = defaultdict(lambda: { "weight": 0, "type": None }) # weight is based on the number of traceroutes and neighbor info packets + edges_map = defaultdict( + lambda: {"weight": 0, "type": None} + ) # weight is based on the number of traceroutes and neighbor info packets used_nodes = set() # This will track nodes involved in edges (including traceroutes) since = datetime.timedelta(hours=48) traceroutes = [] @@ -1308,7 +1348,9 @@ async def nodegraph(request): edge_pair = (node.node_id, packet.from_node_id) edges_map[edge_pair]["weight"] += 1 - edges_map[edge_pair]["type"] = "neighbor" # Overrides an existing traceroute pairing with neighbor + edges_map[edge_pair]["type"] = ( + "neighbor" # Overrides an existing traceroute pairing with neighbor + ) except Exception as e: logger.error(f"Error decoding NeighborInfo packet: {e}") @@ -1332,12 +1374,13 @@ async def nodegraph(request): text=template.render( nodes=nodes_with_edges, edges=edges, # Pass edges with color info - site_config = CONFIG, + site_config=CONFIG, SOFTWARE_RELEASE=SOFTWARE_RELEASE, ), content_type="text/html", ) + # Show basic details about the site on the site @routes.get("/config") async def get_config(request): @@ -1345,21 +1388,23 @@ async def get_config(request): site = CONFIG.get("site", {}) mqtt = CONFIG.get("mqtt", {}) - return web.json_response({ - "Server": site.get("domain", ""), - "Title": site.get("title", ""), - "Message": site.get("message", ""), - "MQTT Server": mqtt.get("server", ""), - "Topics": json.loads(mqtt.get("topics", "[]")), - "Release": SOFTWARE_RELEASE, - "Time": datetime.datetime.now().isoformat() - }, dumps=lambda obj: json.dumps(obj, indent=2)) + return web.json_response( + { + "Server": site.get("domain", ""), + "Title": site.get("title", ""), + "Message": site.get("message", ""), + "MQTT Server": mqtt.get("server", ""), + "Topics": json.loads(mqtt.get("topics", "[]")), + "Release": SOFTWARE_RELEASE, + "Time": datetime.datetime.now().isoformat(), + }, + dumps=lambda obj: json.dumps(obj, indent=2), + ) except (json.JSONDecodeError, TypeError): return web.json_response({"error": "Invalid configuration format"}, status=500) - # API Section ####################################################################### # How this works @@ -1368,6 +1413,7 @@ async def get_config(request): # The response includes "latest_import_time" for frontend to keep track of the newest message timestamp. # The backend fetches extra packets (limit*5) to account for filtering messages like "seq N" and since filtering. + @routes.get("/api/channels") async def api_channels(request: web.Request): period_type = request.query.get("period_type", "hour") @@ -1412,15 +1458,12 @@ async def api_chat(request): # Filter out "seq N" and missing payloads filtered_packets = [ - p for p in ui_packets - if p.payload and not SEQ_REGEX.fullmatch(p.payload) + p for p in ui_packets if p.payload and not SEQ_REGEX.fullmatch(p.payload) ] # Apply "since" filter if since: - filtered_packets = [ - p for p in filtered_packets if p.import_time > since - ] + filtered_packets = [p for p in filtered_packets if p.import_time > since] # Sort by import_time descending (latest first) filtered_packets.sort(key=lambda p: p.import_time, reverse=True) @@ -1432,9 +1475,7 @@ async def api_chat(request): packets_data = [] for p in filtered_packets: reply_id = getattr( - getattr(getattr(p, "raw_mesh_packet", None), "decoded", None), - "reply_id", - None + getattr(getattr(p, "raw_mesh_packet", None), "decoded", None), "reply_id", None ) packet_dict = { @@ -1459,16 +1500,17 @@ async def api_chat(request): else: latest_import_time = None - return web.json_response({ - "packets": packets_data, - "latest_import_time": latest_import_time, - }) + return web.json_response( + { + "packets": packets_data, + "latest_import_time": latest_import_time, + } + ) except Exception as e: logger.error(f"Error in /api/chat: {e}") return web.json_response( - {"error": "Failed to fetch chat data", "details": str(e)}, - status=500 + {"error": "Failed to fetch chat data", "details": str(e)}, status=500 ) @@ -1489,28 +1531,27 @@ async def api_nodes(request): # Fetch nodes from database using your get_nodes function nodes = await store.get_nodes( - role=role, - channel=channel, - hw_model=hw_model, - days_active=days_active + role=role, channel=channel, hw_model=hw_model, days_active=days_active ) # Prepare the JSON response nodes_data = [] for n in nodes: - nodes_data.append({ - "id": getattr(n, "id", None), - "node_id": n.node_id, - "long_name": n.long_name, - "short_name": n.short_name, - "hw_model": n.hw_model, - "firmware": n.firmware, - "role": n.role, - "last_lat": getattr(n, "last_lat", None), - "last_long": getattr(n, "last_long", None), - "channel": n.channel, - "last_update": n.last_update.isoformat() - }) + nodes_data.append( + { + "id": getattr(n, "id", None), + "node_id": n.node_id, + "long_name": n.long_name, + "short_name": n.short_name, + "hw_model": n.hw_model, + "firmware": n.firmware, + "role": n.role, + "last_lat": getattr(n, "last_lat", None), + "last_long": getattr(n, "last_long", None), + "channel": n.channel, + "last_update": n.last_update.isoformat(), + } + ) return web.json_response({"nodes": nodes_data}) @@ -1535,30 +1576,27 @@ async def api_packets(request): logger.error(f"Failed to parse 'since' timestamp '{since_str}': {e}") # Fetch last N packets - packets = await store.get_packets( - limit=limit, - after=since_time - ) + packets = await store.get_packets(limit=limit, after=since_time) packets = [Packet.from_model(p) for p in packets] # Build JSON response (no raw_payload) - packets_json = [{ - "id": p.id, - "from_node_id": p.from_node_id, - "to_node_id": p.to_node_id, - "portnum": int(p.portnum), - "import_time": p.import_time.isoformat(), - "payload": p.payload - } for p in packets] + packets_json = [ + { + "id": p.id, + "from_node_id": p.from_node_id, + "to_node_id": p.to_node_id, + "portnum": int(p.portnum), + "import_time": p.import_time.isoformat(), + "payload": p.payload, + } + for p in packets + ] return web.json_response({"packets": packets_json}) except Exception as e: logger.error(f"Error in /api/packets: {e}") - return web.json_response( - {"error": "Failed to fetch packets"}, - status=500 - ) + return web.json_response({"error": "Failed to fetch packets"}, status=500) @routes.get("/api/stats") @@ -1573,18 +1611,14 @@ async def api_stats(request): period_type = request.query.get("period_type", "hour").lower() if period_type not in allowed_periods: return web.json_response( - {"error": f"Invalid period_type. Must be one of {allowed_periods}"}, - status=400 + {"error": f"Invalid period_type. Must be one of {allowed_periods}"}, status=400 ) # length validation try: length = int(request.query.get("length", 24)) except ValueError: - return web.json_response( - {"error": "length must be an integer"}, - status=400 - ) + return web.json_response({"error": "length must be an integer"}, status=400) # Optional filters channel = request.query.get("channel") @@ -1597,8 +1631,8 @@ async def api_stats(request): except ValueError: raise web.HTTPBadRequest( text=json.dumps({"error": f"{name} must be an integer"}), - content_type="application/json" - ) + content_type="application/json", + ) from None return None portnum = parse_int_param("portnum") @@ -1612,7 +1646,7 @@ async def api_stats(request): channel=channel, portnum=portnum, to_node=to_node, - from_node=from_node + from_node=from_node, ) return web.json_response(stats) @@ -1623,8 +1657,8 @@ async def api_config(request): try: site = CONFIG.get("site", {}) safe_site = { - "map_interval": site.get("map_interval", 3), # default 3 if missing - "firehose_interval": site.get("firehose_interval", 3) # default 3 if missing + "map_interval": site.get("map_interval", 3), # default 3 if missing + "firehose_interval": site.get("firehose_interval", 3), # default 3 if missing } safe_config = {"site": safe_site} @@ -1653,7 +1687,7 @@ async def api_edges(request): path = [tr.packet.from_node_id] + list(route.route) path.append(tr.packet.to_node_id if tr.done else tr.gateway_node_id) - for a, b in zip(path, path[1:]): + for a, b in zip(path, path[1:], strict=False): edges[(a, b)] = "traceroute" # Only build neighbor edges if requested @@ -1665,15 +1699,13 @@ async def api_edges(request): for node in neighbor_info.neighbors: edges.setdefault((node.node_id, packet.from_node_id), "neighbor") except Exception as e: - logger.error(f"Error decoding NeighborInfo packet {getattr(packet, 'id', '?')}: {e}") - - return web.json_response({ - "edges": [ - {"from": a, "to": b, "type": typ} - for (a, b), typ in edges.items() - ] - }) + logger.error( + f"Error decoding NeighborInfo packet {getattr(packet, 'id', '?')}: {e}" + ) + return web.json_response( + {"edges": [{"from": a, "to": b, "type": typ} for (a, b), typ in edges.items()]} + ) # Generic static HTML route @@ -1696,11 +1728,11 @@ async def serve_page(request): async def run_server(): app = web.Application() app.add_routes(routes) - + # Check if access logging should be disabled enable_access_log = CONFIG.get("logging", {}).get("access_log", "False").lower() == "true" access_log_handler = None if not enable_access_log else logging.getLogger("aiohttp.access") - + runner = web.AppRunner(app, access_log=access_log_handler) await runner.setup() if CONFIG["server"]["tls_cert"]: diff --git a/mvrun.py b/mvrun.py index 02f83bf..0b9e7e4 100644 --- a/mvrun.py +++ b/mvrun.py @@ -1,15 +1,15 @@ import argparse -import threading -import subprocess import logging import os import signal +import subprocess import sys +import threading logging.basicConfig( level=logging.INFO, format="%(asctime)s %(filename)s:%(lineno)d [pid:%(process)d] %(levelname)s - %(message)s", - datefmt="%Y-%m-%d %H:%M:%S" + datefmt="%Y-%m-%d %H:%M:%S", ) logger = logging.getLogger(__name__) @@ -18,6 +18,7 @@ logger = logging.getLogger(__name__) running_processes = [] pid_files = [] + def cleanup_pid_file(pid_file): """Remove a PID file if it exists""" if os.path.exists(pid_file): @@ -27,10 +28,11 @@ def cleanup_pid_file(pid_file): except Exception as e: logger.error(f"Error removing PID file {pid_file}: {e}") + def signal_handler(sig, frame): """Handle Ctrl-C gracefully""" logger.info("Received interrupt signal (Ctrl-C), shutting down gracefully...") - + # Terminate all running processes for process in running_processes: if process and process.poll() is None: # Process is still running @@ -47,14 +49,15 @@ def signal_handler(sig, frame): process.wait() except Exception as e: logger.error(f"Error terminating process PID {process.pid}: {e}") - + # Clean up PID files for pid_file in pid_files: cleanup_pid_file(pid_file) - + logger.info("Shutdown complete") sys.exit(0) + # Run python in subprocess def run_script(script_name, pid_file, *args): process = None @@ -67,31 +70,34 @@ def run_script(script_name, pid_file, *args): # Run the subprocess (output goes directly to console for real-time viewing) process = subprocess.Popen(command) - + # Track the process globally running_processes.append(process) - + # Write PID to file with open(pid_file, 'w') as f: f.write(str(process.pid)) logger.info(f"Started {script_name} with PID {process.pid}, written to {pid_file}") - + # Wait for the process to complete process.wait() - + except Exception as e: logger.error(f"Error running {script_name}: {e}") finally: # Clean up PID file when process exits cleanup_pid_file(pid_file) + # Parse runtime argument (--config) and start subprocess threads def main(): # Register signal handler for Ctrl-C signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) - - parser = argparse.ArgumentParser(description="Helper script to run the database and web frontend in separate threads.") + + parser = argparse.ArgumentParser( + description="Helper script to run the database and web frontend in separate threads." + ) # Add --config runtime argument parser.add_argument('--config', help="Path to the configuration file.", default='config.ini') @@ -100,16 +106,20 @@ def main(): # PID file paths db_pid_file = 'meshview-db.pid' web_pid_file = 'meshview-web.pid' - + # Track PID files globally for cleanup pid_files.append(db_pid_file) pid_files.append(web_pid_file) # Database Thread - dbthrd = threading.Thread(target=run_script, args=('startdb.py', db_pid_file, '--config', args.config)) + dbthrd = threading.Thread( + target=run_script, args=('startdb.py', db_pid_file, '--config', args.config) + ) # Web server thread - webthrd = threading.Thread(target=run_script, args=('main.py', web_pid_file, '--config', args.config)) + webthrd = threading.Thread( + target=run_script, args=('main.py', web_pid_file, '--config', args.config) + ) # Start Meshview subprocess threads logger.info(f"Starting Meshview with config: {args.config}") @@ -125,5 +135,6 @@ def main(): # This shouldn't be reached due to signal handler, but just in case signal_handler(signal.SIGINT, None) + if __name__ == '__main__': main() diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..b879c85 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,13 @@ +[tool.ruff] +# Linting +target-version = "py313" +line-length = 100 +extend-exclude = ["build", "dist", ".venv"] + +[tool.ruff.lint] +select = ["E", "F", "I", "UP", "B"] # pick your rulesets +ignore = ["E501"] # example; let formatter handle line length + +[tool.ruff.format] +quote-style = "preserve" +indent-style = "space" \ No newline at end of file diff --git a/startdb.py b/startdb.py index 568b7e2..06ecd25 100644 --- a/startdb.py +++ b/startdb.py @@ -1,12 +1,11 @@ import asyncio -import json import datetime +import json import logging + from sqlalchemy import delete -from meshview import mqtt_reader -from meshview import mqtt_database -from meshview import mqtt_store -from meshview import models + +from meshview import models, mqtt_database, mqtt_reader, mqtt_store from meshview.config import CONFIG # ------------------------- @@ -20,31 +19,32 @@ formatter = logging.Formatter('%(asctime)s [%(levelname)s] %(message)s') file_handler.setFormatter(formatter) cleanup_logger.addHandler(file_handler) + # ------------------------- # Helper functions # ------------------------- def get_bool(config, section, key, default=False): return str(config.get(section, {}).get(key, default)).lower() in ("1", "true", "yes", "on") + def get_int(config, section, key, default=0): try: return int(config.get(section, {}).get(key, default)) except ValueError: return default + # ------------------------- # Shared DB lock # ------------------------- db_lock = asyncio.Lock() + # ------------------------- # Database cleanup using ORM # ------------------------- async def daily_cleanup_at( - hour: int = 2, - minute: int = 0, - days_to_keep: int = 14, - vacuum_db: bool = True + hour: int = 2, minute: int = 0, days_to_keep: int = 14, vacuum_db: bool = True ): while True: now = datetime.datetime.now() @@ -56,7 +56,9 @@ async def daily_cleanup_at( await asyncio.sleep(delay) # Local-time cutoff as string for SQLite DATETIME comparison - cutoff = (datetime.datetime.now() - datetime.timedelta(days=days_to_keep)).strftime("%Y-%m-%d %H:%M:%S") + cutoff = (datetime.datetime.now() - datetime.timedelta(days=days_to_keep)).strftime( + "%Y-%m-%d %H:%M:%S" + ) cleanup_logger.info(f"Running cleanup for records older than {cutoff}...") try: @@ -110,6 +112,7 @@ async def daily_cleanup_at( except Exception as e: cleanup_logger.error(f"Error during cleanup: {e}") + # ------------------------- # MQTT loading # ------------------------- @@ -118,7 +121,7 @@ async def load_database_from_mqtt( mqtt_port: int, topics: list, mqtt_user: str | None = None, - mqtt_passwd: str | None = None + mqtt_passwd: str | None = None, ): async for topic, env in mqtt_reader.get_topic_envelopes( mqtt_server, mqtt_port, topics, mqtt_user, mqtt_passwd @@ -126,6 +129,7 @@ async def load_database_from_mqtt( async with db_lock: # Block if cleanup is running await mqtt_store.process_envelope(topic, env) + # ------------------------- # Main function # ------------------------- @@ -156,12 +160,11 @@ async def main(): ) if cleanup_enabled: - tg.create_task( - daily_cleanup_at(cleanup_hour, cleanup_minute, cleanup_days, vacuum_db) - ) + tg.create_task(daily_cleanup_at(cleanup_hour, cleanup_minute, cleanup_days, vacuum_db)) else: cleanup_logger.info("Daily cleanup is disabled by configuration.") + # ------------------------- # Entry point # -------------------------