From f85e783e8c36ce51f0c907a2d8cdf5027d15a965 Mon Sep 17 00:00:00 2001 From: pablorevilla-meshtastic Date: Mon, 12 Jan 2026 14:18:51 -0800 Subject: [PATCH 1/3] Adding code to work with multiple databases types. --- README-Docker.md | 4 + README.md | 5 +- .../9f3b1a8d2c4f_drop_import_time_columns.py | 1 + ...b7c3c2e3a1f0_add_last_update_us_to_node.py | 94 ++++++++++++++ ...7b0c2e1a4_drop_last_update_us_from_node.py | 34 +++++ meshview/config.py | 2 +- meshview/database.py | 18 ++- meshview/migrations.py | 15 ++- meshview/models.py | 9 +- meshview/mqtt_database.py | 9 +- meshview/mqtt_store.py | 75 ++++++----- meshview/static/kiosk.html | 6 +- meshview/store.py | 120 ++++++++++-------- meshview/templates/firehose.html | 37 +++++- meshview/templates/map.html | 10 +- meshview/web_api/api.py | 108 +++++++--------- sample.config.ini | 5 +- startdb.py | 37 +++--- 18 files changed, 393 insertions(+), 196 deletions(-) create mode 100644 alembic/versions/b7c3c2e3a1f0_add_last_update_us_to_node.py create mode 100644 alembic/versions/d4d7b0c2e1a4_drop_last_update_us_from_node.py diff --git a/README-Docker.md b/README-Docker.md index d87d368..5a929ca 100644 --- a/README-Docker.md +++ b/README-Docker.md @@ -128,6 +128,10 @@ username = password = [database] +# SQLAlchemy async connection string. +# Examples: +# sqlite+aiosqlite:///var/lib/meshview/packets.db +# postgresql+asyncpg://user:pass@host:5432/meshview connection_string = sqlite+aiosqlite:///var/lib/meshview/packets.db ``` diff --git a/README.md b/README.md index c6bbb70..798d58e 100644 --- a/README.md +++ b/README.md @@ -272,7 +272,10 @@ password = large4cats # Database Configuration # ------------------------- [database] -# SQLAlchemy connection string. This one uses SQLite with asyncio support. +# SQLAlchemy async connection string. +# Examples: +# sqlite+aiosqlite:///packets.db +# postgresql+asyncpg://user:pass@host:5432/meshview connection_string = sqlite+aiosqlite:///packets.db diff --git a/alembic/versions/9f3b1a8d2c4f_drop_import_time_columns.py b/alembic/versions/9f3b1a8d2c4f_drop_import_time_columns.py index e7eb5b8..24e2ccc 100644 --- a/alembic/versions/9f3b1a8d2c4f_drop_import_time_columns.py +++ b/alembic/versions/9f3b1a8d2c4f_drop_import_time_columns.py @@ -8,6 +8,7 @@ Create Date: 2026-01-09 09:55:00.000000 from collections.abc import Sequence import sqlalchemy as sa + from alembic import op # revision identifiers, used by Alembic. diff --git a/alembic/versions/b7c3c2e3a1f0_add_last_update_us_to_node.py b/alembic/versions/b7c3c2e3a1f0_add_last_update_us_to_node.py new file mode 100644 index 0000000..f2129c1 --- /dev/null +++ b/alembic/versions/b7c3c2e3a1f0_add_last_update_us_to_node.py @@ -0,0 +1,94 @@ +"""Add last_update_us to node and migrate data. + +Revision ID: b7c3c2e3a1f0 +Revises: 9f3b1a8d2c4f +Create Date: 2026-01-12 10:12:00.000000 +""" + +from collections.abc import Sequence +from datetime import UTC, datetime + +import sqlalchemy as sa + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "b7c3c2e3a1f0" +down_revision: str | None = "9f3b1a8d2c4f" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def _parse_datetime(value): + if value is None: + return None + if isinstance(value, datetime): + dt = value + elif isinstance(value, str): + text = value.replace("Z", "+00:00") + try: + dt = datetime.fromisoformat(text) + except ValueError: + return None + else: + return None + + if dt.tzinfo is None: + return dt.replace(tzinfo=UTC) + return dt.astimezone(UTC) + + +def upgrade() -> None: + conn = op.get_bind() + op.add_column("node", sa.Column("last_update_us", sa.BigInteger(), nullable=True)) + op.create_index("idx_node_last_update_us", "node", ["last_update_us"], unique=False) + + node = sa.table( + "node", + sa.column("id", sa.String()), + sa.column("last_update", sa.DateTime()), + sa.column("last_update_us", sa.BigInteger()), + ) + + rows = conn.execute(sa.select(node.c.id, node.c.last_update)).all() + for node_id, last_update in rows: + dt = _parse_datetime(last_update) + if dt is None: + continue + last_update_us = int(dt.timestamp() * 1_000_000) + conn.execute( + sa.update(node).where(node.c.id == node_id).values(last_update_us=last_update_us) + ) + + if conn.dialect.name == "sqlite": + with op.batch_alter_table("node", schema=None) as batch_op: + batch_op.drop_column("last_update") + else: + op.drop_column("node", "last_update") + + +def downgrade() -> None: + conn = op.get_bind() + op.add_column("node", sa.Column("last_update", sa.DateTime(), nullable=True)) + + node = sa.table( + "node", + sa.column("id", sa.String()), + sa.column("last_update", sa.DateTime()), + sa.column("last_update_us", sa.BigInteger()), + ) + + rows = conn.execute(sa.select(node.c.id, node.c.last_update_us)).all() + for node_id, last_update_us in rows: + if last_update_us is None: + continue + dt = datetime.fromtimestamp(last_update_us / 1_000_000, tz=UTC).replace(tzinfo=None) + conn.execute(sa.update(node).where(node.c.id == node_id).values(last_update=dt)) + + if conn.dialect.name == "sqlite": + with op.batch_alter_table("node", schema=None) as batch_op: + batch_op.drop_index("idx_node_last_update_us") + batch_op.drop_column("last_update_us") + else: + op.drop_index("idx_node_last_update_us", table_name="node") + op.drop_column("node", "last_update_us") diff --git a/alembic/versions/d4d7b0c2e1a4_drop_last_update_us_from_node.py b/alembic/versions/d4d7b0c2e1a4_drop_last_update_us_from_node.py new file mode 100644 index 0000000..e7bd5a0 --- /dev/null +++ b/alembic/versions/d4d7b0c2e1a4_drop_last_update_us_from_node.py @@ -0,0 +1,34 @@ +"""Drop last_update_us from node. + +Revision ID: d4d7b0c2e1a4 +Revises: b7c3c2e3a1f0 +Create Date: 2026-01-12 10:20:00.000000 +""" + +from collections.abc import Sequence + +import sqlalchemy as sa + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "d4d7b0c2e1a4" +down_revision: str | None = "b7c3c2e3a1f0" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + conn = op.get_bind() + if conn.dialect.name == "sqlite": + with op.batch_alter_table("node", schema=None) as batch_op: + batch_op.drop_index("idx_node_last_update_us") + batch_op.drop_column("last_update_us") + else: + op.drop_index("idx_node_last_update_us", table_name="node") + op.drop_column("node", "last_update_us") + + +def downgrade() -> None: + op.add_column("node", sa.Column("last_update_us", sa.BigInteger(), nullable=True)) + op.create_index("idx_node_last_update_us", "node", ["last_update_us"], unique=False) diff --git a/meshview/config.py b/meshview/config.py index e6e6b14..e6d4600 100644 --- a/meshview/config.py +++ b/meshview/config.py @@ -6,7 +6,7 @@ parser = argparse.ArgumentParser(description="MeshView Configuration Loader") parser.add_argument( "--config", type=str, default="config.ini", help="Path to config.ini file (default: config.ini)" ) -args = parser.parse_args() +args, _ = parser.parse_known_args() # Initialize config parser config_parser = configparser.ConfigParser() diff --git a/meshview/database.py b/meshview/database.py index 1505905..068adfa 100644 --- a/meshview/database.py +++ b/meshview/database.py @@ -1,3 +1,4 @@ +from sqlalchemy.engine.url import make_url from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine from meshview import models @@ -9,10 +10,19 @@ async_session = None def init_database(database_connection_string): global engine, async_session kwargs = {"echo": False} - # Ensure SQLite is opened in read-only mode - database_connection_string += "?mode=ro" - kwargs["connect_args"] = {"uri": True} - engine = create_async_engine(database_connection_string, **kwargs) + url = make_url(database_connection_string) + connect_args = {} + + if url.drivername.startswith("sqlite"): + query = dict(url.query) + query.setdefault("mode", "ro") + url = url.set(query=query) + connect_args["uri"] = True + + if connect_args: + kwargs["connect_args"] = connect_args + + engine = create_async_engine(url, **kwargs) async_session = async_sessionmaker( bind=engine, class_=AsyncSession, diff --git a/meshview/migrations.py b/meshview/migrations.py index e76eeda..2ed5a50 100644 --- a/meshview/migrations.py +++ b/meshview/migrations.py @@ -186,19 +186,24 @@ async def create_migration_status_table(engine: AsyncEngine) -> None: text(""" CREATE TABLE IF NOT EXISTS migration_status ( id INTEGER PRIMARY KEY CHECK (id = 1), - in_progress BOOLEAN NOT NULL DEFAULT 0, + in_progress BOOLEAN NOT NULL DEFAULT FALSE, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) ) - # Insert initial row if not exists - await conn.execute( + result = await conn.execute( text(""" - INSERT OR IGNORE INTO migration_status (id, in_progress) - VALUES (1, 0) + SELECT 1 FROM migration_status WHERE id = 1 """) ) + if result.first() is None: + await conn.execute( + text(""" + INSERT INTO migration_status (id, in_progress) + VALUES (1, FALSE) + """) + ) async def set_migration_in_progress(engine: AsyncEngine, in_progress: bool) -> None: diff --git a/meshview/models.py b/meshview/models.py index cbe6c22..759d471 100644 --- a/meshview/models.py +++ b/meshview/models.py @@ -1,5 +1,3 @@ -from datetime import datetime - from sqlalchemy import BigInteger, ForeignKey, Index, desc from sqlalchemy.ext.asyncio import AsyncAttrs from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship @@ -22,7 +20,6 @@ class Node(Base): last_lat: Mapped[int] = mapped_column(BigInteger, nullable=True) last_long: Mapped[int] = mapped_column(BigInteger, nullable=True) channel: Mapped[str] = mapped_column(nullable=True) - last_update: Mapped[datetime] = mapped_column(nullable=True) first_seen_us: Mapped[int] = mapped_column(BigInteger, nullable=True) last_seen_us: Mapped[int] = mapped_column(BigInteger, nullable=True) @@ -33,11 +30,7 @@ class Node(Base): ) def to_dict(self): - return { - column.name: getattr(self, column.name) - for column in self.__table__.columns - if column.name != "last_update" - } + return {column.name: getattr(self, column.name) for column in self.__table__.columns} class Packet(Base): diff --git a/meshview/mqtt_database.py b/meshview/mqtt_database.py index 6b5e004..e74b9b0 100644 --- a/meshview/mqtt_database.py +++ b/meshview/mqtt_database.py @@ -1,3 +1,4 @@ +from sqlalchemy.engine.url import make_url from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine from meshview import models @@ -5,9 +6,11 @@ from meshview import models def init_database(database_connection_string): global engine, async_session - engine = create_async_engine( - database_connection_string, echo=False, connect_args={"timeout": 900} - ) + url = make_url(database_connection_string) + kwargs = {"echo": False} + if url.drivername.startswith("sqlite"): + kwargs["connect_args"] = {"timeout": 900} + engine = create_async_engine(url, **kwargs) async_session = async_sessionmaker(engine, expire_on_commit=False) diff --git a/meshview/mqtt_store.py b/meshview/mqtt_store.py index d22c3f5..97018d4 100644 --- a/meshview/mqtt_store.py +++ b/meshview/mqtt_store.py @@ -1,8 +1,12 @@ import datetime +import logging import re +import time from sqlalchemy import select +from sqlalchemy.dialects.postgresql import insert as pg_insert from sqlalchemy.dialects.sqlite import insert as sqlite_insert +from sqlalchemy.exc import IntegrityError from meshtastic.protobuf.config_pb2 import Config from meshtastic.protobuf.mesh_pb2 import HardwareModel @@ -10,6 +14,8 @@ from meshtastic.protobuf.portnums_pb2 import PortNum from meshview import decode_payload, mqtt_database from meshview.models import Node, Packet, PacketSeen, Traceroute +logger = logging.getLogger(__name__) + async def process_envelope(topic, env): # MAP_REPORT_APP @@ -37,8 +43,7 @@ async def process_envelope(topic, env): await session.execute(select(Node).where(Node.node_id == node_id)) ).scalar_one_or_none() - now = datetime.datetime.now(datetime.UTC) - now_us = int(now.timestamp() * 1_000_000) + now_us = int(time.time() * 1_000_000) if node: node.node_id = node_id @@ -50,7 +55,6 @@ async def process_envelope(topic, env): node.last_lat = map_report.latitude_i node.last_long = map_report.longitude_i node.firmware = map_report.firmware_version - node.last_update = now node.last_seen_us = now_us if node.first_seen_us is None: node.first_seen_us = now_us @@ -66,7 +70,6 @@ async def process_envelope(topic, env): firmware=map_report.firmware_version, last_lat=map_report.latitude_i, last_long=map_report.longitude_i, - last_update=now, first_seen_us=now_us, last_seen_us=now_us, ) @@ -84,22 +87,41 @@ async def process_envelope(topic, env): result = await session.execute(select(Packet).where(Packet.id == env.packet.id)) packet = result.scalar_one_or_none() if not packet: - now = datetime.datetime.now(datetime.UTC) - now_us = int(now.timestamp() * 1_000_000) - stmt = ( - sqlite_insert(Packet) - .values( - id=env.packet.id, - portnum=env.packet.decoded.portnum, - from_node_id=getattr(env.packet, "from"), - to_node_id=env.packet.to, - payload=env.packet.SerializeToString(), - import_time_us=now_us, - channel=env.channel_id, + now_us = int(time.time() * 1_000_000) + packet_values = { + "id": env.packet.id, + "portnum": env.packet.decoded.portnum, + "from_node_id": getattr(env.packet, "from"), + "to_node_id": env.packet.to, + "payload": env.packet.SerializeToString(), + "import_time_us": now_us, + "channel": env.channel_id, + } + utc_time = datetime.datetime.fromtimestamp(now_us / 1_000_000, datetime.UTC) + dialect = session.get_bind().dialect.name + stmt = None + if dialect == "sqlite": + stmt = ( + sqlite_insert(Packet) + .values(**packet_values) + .on_conflict_do_nothing(index_elements=["id"]) ) - .on_conflict_do_nothing(index_elements=["id"]) - ) - await session.execute(stmt) + elif dialect == "postgresql": + stmt = ( + pg_insert(Packet) + .values(**packet_values) + .on_conflict_do_nothing(index_elements=["id"]) + ) + + if stmt is not None: + await session.execute(stmt) + else: + try: + async with session.begin_nested(): + session.add(Packet(**packet_values)) + await session.flush() + except IntegrityError: + pass # --- PacketSeen (no conflict handling here, normal insert) @@ -118,8 +140,7 @@ async def process_envelope(topic, env): ) ) if not result.scalar_one_or_none(): - now = datetime.datetime.now(datetime.UTC) - now_us = int(now.timestamp() * 1_000_000) + now_us = int(time.time() * 1_000_000) seen = PacketSeen( packet_id=env.packet.id, node_id=int(env.gateway_id[1:], 16), @@ -161,8 +182,7 @@ async def process_envelope(topic, env): await session.execute(select(Node).where(Node.id == user.id)) ).scalar_one_or_none() - now = datetime.datetime.now(datetime.UTC) - now_us = int(now.timestamp() * 1_000_000) + now_us = int(time.time() * 1_000_000) if node: node.node_id = node_id @@ -171,7 +191,6 @@ async def process_envelope(topic, env): node.hw_model = hw_model node.role = role node.channel = env.channel_id - node.last_update = now node.last_seen_us = now_us if node.first_seen_us is None: node.first_seen_us = now_us @@ -184,7 +203,6 @@ async def process_envelope(topic, env): hw_model=hw_model, role=role, channel=env.channel_id, - last_update=now, first_seen_us=now_us, last_seen_us=now_us, ) @@ -203,11 +221,9 @@ async def process_envelope(topic, env): await session.execute(select(Node).where(Node.node_id == from_node_id)) ).scalar_one_or_none() if node: - now = datetime.datetime.now(datetime.UTC) - now_us = int(now.timestamp() * 1_000_000) + now_us = int(time.time() * 1_000_000) node.last_lat = position.latitude_i node.last_long = position.longitude_i - node.last_update = now node.last_seen_us = now_us if node.first_seen_us is None: node.first_seen_us = now_us @@ -217,8 +233,7 @@ async def process_envelope(topic, env): if env.packet.decoded.portnum == PortNum.TRACEROUTE_APP: packet_id = env.packet.id if packet_id is not None: - now = datetime.datetime.now(datetime.UTC) - now_us = int(now.timestamp() * 1_000_000) + now_us = int(time.time() * 1_000_000) session.add( Traceroute( packet_id=packet_id, diff --git a/meshview/static/kiosk.html b/meshview/static/kiosk.html index 002d0f3..799950a 100644 --- a/meshview/static/kiosk.html +++ b/meshview/static/kiosk.html @@ -75,8 +75,8 @@ body { margin: 0; font-family: monospace; background: #121212; color: #eee; } return color; } - function timeAgo(dateStr){ - const diff = Date.now() - new Date(dateStr); + function timeAgoFromUs(us){ + const diff = Date.now() - (us / 1000); const s=Math.floor(diff/1000), m=Math.floor(s/60), h=Math.floor(m/60), d=Math.floor(h/24); if(d>0) return d+'d'; if(h>0) return h+'h'; if(m>0) return m+'m'; return s+'s'; } @@ -118,7 +118,7 @@ body { margin: 0; font-family: monospace; background: #121212; color: #eee; } Channel: ${node.channel}
Model: ${node.hw_model}
Role: ${node.role}
`; - if(node.last_update) popupContent+=`Last seen: ${timeAgo(node.last_update)}
`; + if(node.last_seen_us) popupContent+=`Last seen: ${timeAgoFromUs(node.last_seen_us)}
`; if(node.firmware) popupContent+=`Firmware: ${node.firmware}
`; marker.on('click', e=>{ diff --git a/meshview/store.py b/meshview/store.py index 18072b7..7f5e98b 100644 --- a/meshview/store.py +++ b/meshview/store.py @@ -1,6 +1,6 @@ from datetime import datetime, timedelta -from sqlalchemy import Text, and_, cast, func, or_, select, text +from sqlalchemy import Text, and_, cast, func, or_, select from sqlalchemy.orm import lazyload from meshview import database, models @@ -176,9 +176,9 @@ async def get_mqtt_neighbors(since): async def get_total_node_count(channel: str = None) -> int: try: async with database.async_session() as session: - q = select(func.count(Node.id)).where( - Node.last_update > datetime.now() - timedelta(days=1) - ) + now_us = int(datetime.utcnow().timestamp() * 1_000_000) + cutoff_us = now_us - 86400 * 1_000_000 + q = select(func.count(Node.id)).where(Node.last_seen_us > cutoff_us) if channel: q = q.where(Node.channel == channel) @@ -193,26 +193,32 @@ async def get_total_node_count(channel: str = None) -> int: async def get_top_traffic_nodes(): try: async with database.async_session() as session: - result = await session.execute( - text(""" - SELECT - n.node_id, - n.long_name, - n.short_name, - n.channel, - COUNT(DISTINCT p.id) AS total_packets_sent, - COUNT(ps.packet_id) AS total_times_seen - FROM node n - LEFT JOIN packet p ON n.node_id = p.from_node_id - AND p.import_time_us >= (CAST(strftime('%s','now') AS INTEGER) - 86400) * 1000000 - LEFT JOIN packet_seen ps ON p.id = ps.packet_id - GROUP BY n.node_id, n.long_name, n.short_name - HAVING total_packets_sent > 0 - ORDER BY total_times_seen DESC; - """) + now_us = int(datetime.utcnow().timestamp() * 1_000_000) + cutoff_us = now_us - 86400 * 1_000_000 + total_packets_sent = func.count(func.distinct(Packet.id)).label("total_packets_sent") + total_times_seen = func.count(PacketSeen.packet_id).label("total_times_seen") + + stmt = ( + select( + Node.node_id, + Node.long_name, + Node.short_name, + Node.channel, + total_packets_sent, + total_times_seen, + ) + .select_from(Node) + .outerjoin( + Packet, + (Packet.from_node_id == Node.node_id) & (Packet.import_time_us >= cutoff_us), + ) + .outerjoin(PacketSeen, PacketSeen.packet_id == Packet.id) + .group_by(Node.node_id, Node.long_name, Node.short_name, Node.channel) + .having(total_packets_sent > 0) + .order_by(total_times_seen.desc()) ) - rows = result.fetchall() + rows = (await session.execute(stmt)).all() nodes = [ { @@ -235,33 +241,30 @@ async def get_top_traffic_nodes(): async def get_node_traffic(node_id: int): try: async with database.async_session() as session: - result = await session.execute( - text(""" - SELECT - node.long_name, packet.portnum, - COUNT(*) AS packet_count - FROM packet - JOIN node ON packet.from_node_id = node.node_id - WHERE node.node_id = :node_id - AND packet.import_time_us >= (CAST(strftime('%s','now') AS INTEGER) - 86400) * 1000000 - GROUP BY packet.portnum - ORDER BY packet_count DESC; - """), - {"node_id": node_id}, + now_us = int(datetime.utcnow().timestamp() * 1_000_000) + cutoff_us = now_us - 86400 * 1_000_000 + packet_count = func.count().label("packet_count") + + stmt = ( + select(Node.long_name, Packet.portnum, packet_count) + .select_from(Packet) + .join(Node, Packet.from_node_id == Node.node_id) + .where(Node.node_id == node_id) + .where(Packet.import_time_us >= cutoff_us) + .group_by(Node.long_name, Packet.portnum) + .order_by(packet_count.desc()) ) - # Map the result to include node.long_name and packet data - traffic_data = [ + result = await session.execute(stmt) + return [ { - "long_name": row[0], # node.long_name - "portnum": row[1], # packet.portnum - "packet_count": row[2], # COUNT(*) as packet_count + "long_name": row.long_name, + "portnum": row.portnum, + "packet_count": row.packet_count, } for row in result.all() ] - return traffic_data - except Exception as e: # Log the error or handle it as needed print(f"Error fetching node traffic: {str(e)}") @@ -299,10 +302,12 @@ async def get_nodes(node_id=None, role=None, channel=None, hw_model=None, days_a query = query.where(Node.hw_model == hw_model) if days_active is not None: - query = query.where(Node.last_update > datetime.now() - timedelta(days_active)) + now_us = int(datetime.utcnow().timestamp() * 1_000_000) + cutoff_us = now_us - int(timedelta(days_active).total_seconds() * 1_000_000) + query = query.where(Node.last_seen_us > cutoff_us) - # Exclude nodes where last_update is an empty string - query = query.where(Node.last_update != "") + # Exclude nodes with missing last_seen_us + query = query.where(Node.last_seen_us.is_not(None)) # Order results by long_name in ascending order query = query.order_by(Node.short_name.asc()) @@ -329,20 +334,31 @@ async def get_packet_stats( if period_type == "hour": start_time = now - timedelta(hours=length) - time_format = '%Y-%m-%d %H:00' + time_format_sqlite = "%Y-%m-%d %H:00" + time_format_pg = "YYYY-MM-DD HH24:00" elif period_type == "day": start_time = now - timedelta(days=length) - time_format = '%Y-%m-%d' + time_format_sqlite = "%Y-%m-%d" + time_format_pg = "YYYY-MM-DD" else: raise ValueError("period_type must be 'hour' or 'day'") async with database.async_session() as session: - q = select( - func.strftime( - time_format, + dialect = session.get_bind().dialect.name + if dialect == "postgresql": + period_expr = func.to_char( + func.to_timestamp(Packet.import_time_us / 1_000_000.0), + time_format_pg, + ) + else: + period_expr = func.strftime( + time_format_sqlite, func.datetime(Packet.import_time_us / 1_000_000, "unixepoch"), - ).label('period'), - func.count().label('count'), + ) + + q = select( + period_expr.label("period"), + func.count().label("count"), ).where(Packet.import_time_us >= int(start_time.timestamp() * 1_000_000)) # Filters diff --git a/meshview/templates/firehose.html b/meshview/templates/firehose.html index fe30b15..790b4d6 100644 --- a/meshview/templates/firehose.html +++ b/meshview/templates/firehose.html @@ -201,13 +201,37 @@ function portLabel(portnum, payload, linksHtml) { /* ====================================================== TIME FORMAT ====================================================== */ -function formatLocalTime(importTimeUs) { - const ms = importTimeUs / 1000; - return new Date(ms).toLocaleTimeString([], { +function formatTimes(importTimeUs) { + const ms = Number(importTimeUs) / 1000; + if (!Number.isFinite(ms)) { + return { local: "—", utc: "—", epoch: "—" }; + } + const date = new Date(ms); + const local = date.toLocaleTimeString([], { hour: "2-digit", minute: "2-digit", - second: "2-digit" + second: "2-digit", + timeZoneName: "short" }); + const utc = date.toLocaleTimeString([], { + hour: "2-digit", + minute: "2-digit", + second: "2-digit", + timeZone: "UTC", + timeZoneName: "short" + }); + return { local, utc, epoch: String(importTimeUs) }; +} + +function logPacketTimes(packet) { + const times = formatTimes(packet.import_time_us); + console.log( + "[firehose] packet time", + "id=" + packet.id, + "epoch_us=" + times.epoch, + "local=" + times.local, + "utc=" + times.utc + ); } /* ====================================================== @@ -245,6 +269,7 @@ async function fetchUpdates() { const list = document.getElementById("packet_list"); for (const pkt of packets.reverse()) { + logPacketTimes(pkt); /* FROM — includes translation */ const from = @@ -304,7 +329,9 @@ async function fetchUpdates() { const html = ` - ${formatLocalTime(pkt.import_time_us)} + + ${formatTimes(pkt.import_time_us).local}
+ diff --git a/meshview/templates/map.html b/meshview/templates/map.html index 08e16fb..4ca22cc 100644 --- a/meshview/templates/map.html +++ b/meshview/templates/map.html @@ -140,8 +140,8 @@ map.on("popupopen", function (e) { if (popupEl) applyTranslationsMap(popupEl); }); -function timeAgo(date){ - const diff = Date.now() - new Date(date); +function timeAgoFromUs(us){ + const diff = Date.now() - (us / 1000); const s = Math.floor(diff/1000), m = Math.floor(s/60), h = Math.floor(m/60), d = Math.floor(h/24); return d>0?d+"d":h>0?h+"h":m>0?m+"m":s+"s"; @@ -289,7 +289,7 @@ fetch('/api/nodes?days_active=3') hw_model: n.hw_model || "", role: n.role || "", firmware: n.firmware || "", - last_update: n.last_update || "", + last_seen_us: n.last_seen_us || null, isRouter: (n.role||"").toLowerCase().includes("router") })); @@ -333,8 +333,8 @@ function renderNodesOnMap(){ ${node.role}
${ - node.last_update - ? ` ${timeAgo(node.last_update)}
` + node.last_seen_us + ? ` ${timeAgoFromUs(node.last_seen_us)}
` : "" } diff --git a/meshview/web_api/api.py b/meshview/web_api/api.py index df5bde9..517cc73 100644 --- a/meshview/web_api/api.py +++ b/meshview/web_api/api.py @@ -6,12 +6,15 @@ import logging import os from aiohttp import web -from sqlalchemy import text +from sqlalchemy import func, select, text from meshtastic.protobuf.portnums_pb2 import PortNum from meshview import database, decode_payload, store from meshview.__version__ import __version__, _git_revision_short, get_version_info from meshview.config import CONFIG +from meshview.models import Node +from meshview.models import Packet as PacketModel +from meshview.models import PacketSeen as PacketSeenModel logger = logging.getLogger(__name__) @@ -805,73 +808,54 @@ async def api_stats_top(request): limit = min(int(request.query.get("limit", 20)), 100) offset = int(request.query.get("offset", 0)) - params = { - "period_type": period_type, - "length": length, - "limit": limit, - "offset": offset, - } + multiplier = 3600 if period_type == "hour" else 86400 + window_us = length * multiplier * 1_000_000 - channel_filter = "" - if channel: - channel_filter = "AND n.channel = :channel" - params["channel"] = channel + max_packet_import = select(func.max(PacketModel.import_time_us)).scalar_subquery() + max_seen_import = select(func.max(PacketSeenModel.import_time_us)).scalar_subquery() - sql = f""" - WITH sent AS ( - SELECT - p.from_node_id AS node_id, - COUNT(*) AS sent - FROM packet p - WHERE p.import_time_us >= ( - SELECT MAX(import_time_us) FROM packet - ) - ( - CASE - WHEN :period_type = 'hour' THEN :length * 3600 * 1000000 - ELSE :length * 86400 * 1000000 - END - ) - GROUP BY p.from_node_id - ), - seen AS ( - SELECT - p.from_node_id AS node_id, - COUNT(*) AS seen - FROM packet_seen ps - JOIN packet p ON p.id = ps.packet_id - WHERE ps.import_time_us >= ( - SELECT MAX(import_time_us) FROM packet_seen - ) - ( - CASE - WHEN :period_type = 'hour' THEN :length * 3600 * 1000000 - ELSE :length * 86400 * 1000000 - END - ) - GROUP BY p.from_node_id + sent_cte = ( + select(PacketModel.from_node_id.label("node_id"), func.count().label("sent")) + .where(PacketModel.import_time_us >= max_packet_import - window_us) + .group_by(PacketModel.from_node_id) + .cte("sent") ) - SELECT - n.node_id, - n.long_name, - n.short_name, - n.channel, - COALESCE(s.sent, 0) AS sent, - COALESCE(se.seen, 0) AS seen - FROM node n - LEFT JOIN sent s ON s.node_id = n.node_id - LEFT JOIN seen se ON se.node_id = n.node_id - WHERE 1=1 - {channel_filter} - ORDER BY seen DESC - LIMIT :limit OFFSET :offset - """ - count_sql = f""" - SELECT COUNT(*) FROM node n WHERE 1=1 {channel_filter} - """ + seen_cte = ( + select(PacketModel.from_node_id.label("node_id"), func.count().label("seen")) + .select_from(PacketSeenModel) + .join(PacketModel, PacketModel.id == PacketSeenModel.packet_id) + .where(PacketSeenModel.import_time_us >= max_seen_import - window_us) + .group_by(PacketModel.from_node_id) + .cte("seen") + ) + + query = ( + select( + Node.node_id, + Node.long_name, + Node.short_name, + Node.channel, + func.coalesce(sent_cte.c.sent, 0).label("sent"), + func.coalesce(seen_cte.c.seen, 0).label("seen"), + ) + .select_from(Node) + .outerjoin(sent_cte, sent_cte.c.node_id == Node.node_id) + .outerjoin(seen_cte, seen_cte.c.node_id == Node.node_id) + .order_by(func.coalesce(seen_cte.c.seen, 0).desc()) + .limit(limit) + .offset(offset) + ) + + count_query = select(func.count()).select_from(Node) + + if channel: + query = query.where(Node.channel == channel) + count_query = count_query.where(Node.channel == channel) async with database.async_session() as session: - rows = (await session.execute(text(sql), params)).all() - total = (await session.execute(text(count_sql), params)).scalar() or 0 + rows = (await session.execute(query)).all() + total = (await session.execute(count_query)).scalar() or 0 nodes = [] for r in rows: diff --git a/sample.config.ini b/sample.config.ini index b8a1dd4..e4b834b 100644 --- a/sample.config.ini +++ b/sample.config.ini @@ -81,7 +81,10 @@ password = large4cats # Database Configuration # ------------------------- [database] -# SQLAlchemy connection string. This one uses SQLite with asyncio support. +# SQLAlchemy async connection string. +# Examples: +# sqlite+aiosqlite:///packets.db +# postgresql+asyncpg://user:pass@host:5432/meshview connection_string = sqlite+aiosqlite:///packets.db diff --git a/startdb.py b/startdb.py index e733875..57ea461 100644 --- a/startdb.py +++ b/startdb.py @@ -7,6 +7,7 @@ import shutil from pathlib import Path from sqlalchemy import delete +from sqlalchemy.engine.url import make_url from meshview import migrations, models, mqtt_database, mqtt_reader, mqtt_store from meshview.config import CONFIG @@ -65,18 +66,16 @@ async def backup_database(database_url: str, backup_dir: str = ".") -> None: backup_dir: Directory to store backups (default: current directory) """ try: - # Extract database file path from connection string - # Format: sqlite+aiosqlite:///path/to/db.db - if not database_url.startswith("sqlite"): + url = make_url(database_url) + if not url.drivername.startswith("sqlite"): cleanup_logger.warning("Backup only supported for SQLite databases") return - db_path = database_url.split("///", 1)[1] if "///" in database_url else None - if not db_path: + if not url.database or url.database == ":memory:": cleanup_logger.error("Could not extract database path from connection string") return - db_file = Path(db_path) + db_file = Path(url.database) if not db_file.exists(): cleanup_logger.error(f"Database file not found: {db_file}") return @@ -153,11 +152,11 @@ async def daily_cleanup_at( cleanup_logger.info("Waiting 60 seconds for backup to complete...") await asyncio.sleep(60) - # Local-time cutoff as string for SQLite DATETIME comparison - cutoff = (datetime.datetime.now() - datetime.timedelta(days=days_to_keep)).strftime( - "%Y-%m-%d %H:%M:%S" - ) - cleanup_logger.info(f"Running cleanup for records older than {cutoff}...") + cutoff_dt = ( + datetime.datetime.now(datetime.UTC) - datetime.timedelta(days=days_to_keep) + ).replace(tzinfo=None) + cutoff_us = int(cutoff_dt.timestamp() * 1_000_000) + cleanup_logger.info(f"Running cleanup for records older than {cutoff_dt.isoformat()}...") try: async with db_lock: # Pause ingestion @@ -168,7 +167,7 @@ async def daily_cleanup_at( # Packet # ------------------------- result = await session.execute( - delete(models.Packet).where(models.Packet.import_time < cutoff) + delete(models.Packet).where(models.Packet.import_time_us < cutoff_us) ) cleanup_logger.info(f"Deleted {result.rowcount} rows from Packet") @@ -176,7 +175,9 @@ async def daily_cleanup_at( # PacketSeen # ------------------------- result = await session.execute( - delete(models.PacketSeen).where(models.PacketSeen.import_time < cutoff) + delete(models.PacketSeen).where( + models.PacketSeen.import_time_us < cutoff_us + ) ) cleanup_logger.info(f"Deleted {result.rowcount} rows from PacketSeen") @@ -184,7 +185,9 @@ async def daily_cleanup_at( # Traceroute # ------------------------- result = await session.execute( - delete(models.Traceroute).where(models.Traceroute.import_time < cutoff) + delete(models.Traceroute).where( + models.Traceroute.import_time_us < cutoff_us + ) ) cleanup_logger.info(f"Deleted {result.rowcount} rows from Traceroute") @@ -192,17 +195,19 @@ async def daily_cleanup_at( # Node # ------------------------- result = await session.execute( - delete(models.Node).where(models.Node.last_update < cutoff) + delete(models.Node).where(models.Node.last_seen_us < cutoff_us) ) cleanup_logger.info(f"Deleted {result.rowcount} rows from Node") await session.commit() - if vacuum_db: + if vacuum_db and mqtt_database.engine.dialect.name == "sqlite": cleanup_logger.info("Running VACUUM...") async with mqtt_database.engine.begin() as conn: await conn.exec_driver_sql("VACUUM;") cleanup_logger.info("VACUUM completed.") + elif vacuum_db: + cleanup_logger.info("VACUUM skipped (not supported for this database).") cleanup_logger.info("Cleanup completed successfully.") cleanup_logger.info("Ingestion resumed after cleanup.") From fa98f56318bb7c58e59fac920e714347793ea4d9 Mon Sep 17 00:00:00 2001 From: pablorevilla-meshtastic Date: Mon, 12 Jan 2026 20:10:19 -0800 Subject: [PATCH 2/3] Made a cople of changes to the time handling and database config. --- meshview/store.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/meshview/store.py b/meshview/store.py index 7f5e98b..c85122c 100644 --- a/meshview/store.py +++ b/meshview/store.py @@ -1,3 +1,4 @@ +import logging from datetime import datetime, timedelta from sqlalchemy import Text, and_, cast, func, or_, select @@ -6,6 +7,8 @@ from sqlalchemy.orm import lazyload from meshview import database, models from meshview.models import Node, Packet, PacketSeen, Traceroute +logger = logging.getLogger(__name__) + async def get_node(node_id): async with database.async_session() as session: @@ -176,7 +179,7 @@ async def get_mqtt_neighbors(since): async def get_total_node_count(channel: str = None) -> int: try: async with database.async_session() as session: - now_us = int(datetime.utcnow().timestamp() * 1_000_000) + now_us = int(datetime.now(datetime.UTC).timestamp() * 1_000_000) cutoff_us = now_us - 86400 * 1_000_000 q = select(func.count(Node.id)).where(Node.last_seen_us > cutoff_us) @@ -193,7 +196,7 @@ async def get_total_node_count(channel: str = None) -> int: async def get_top_traffic_nodes(): try: async with database.async_session() as session: - now_us = int(datetime.utcnow().timestamp() * 1_000_000) + now_us = int(datetime.now(datetime.UTC).timestamp() * 1_000_000) cutoff_us = now_us - 86400 * 1_000_000 total_packets_sent = func.count(func.distinct(Packet.id)).label("total_packets_sent") total_times_seen = func.count(PacketSeen.packet_id).label("total_times_seen") @@ -241,7 +244,7 @@ async def get_top_traffic_nodes(): async def get_node_traffic(node_id: int): try: async with database.async_session() as session: - now_us = int(datetime.utcnow().timestamp() * 1_000_000) + now_us = int(datetime.now(datetime.UTC).timestamp() * 1_000_000) cutoff_us = now_us - 86400 * 1_000_000 packet_count = func.count().label("packet_count") @@ -293,7 +296,11 @@ async def get_nodes(node_id=None, role=None, channel=None, hw_model=None, days_a # Apply filters based on provided parameters if node_id is not None: - query = query.where(Node.node_id == node_id) + try: + node_id_int = int(node_id) + except (TypeError, ValueError): + node_id_int = node_id + query = query.where(Node.node_id == node_id_int) if role is not None: query = query.where(Node.role == role.upper()) # Ensure role is uppercase if channel is not None: @@ -302,7 +309,7 @@ async def get_nodes(node_id=None, role=None, channel=None, hw_model=None, days_a query = query.where(Node.hw_model == hw_model) if days_active is not None: - now_us = int(datetime.utcnow().timestamp() * 1_000_000) + now_us = int(datetime.now(datetime.UTC).timestamp() * 1_000_000) cutoff_us = now_us - int(timedelta(days_active).total_seconds() * 1_000_000) query = query.where(Node.last_seen_us > cutoff_us) @@ -318,7 +325,7 @@ async def get_nodes(node_id=None, role=None, channel=None, hw_model=None, days_a return nodes # Return the list of nodes except Exception: - print("error reading DB") # Consider using logging instead of print + logger.exception("error reading DB") return [] # Return an empty list in case of failure @@ -330,7 +337,7 @@ async def get_packet_stats( to_node: int | None = None, from_node: int | None = None, ): - now = datetime.now() + now = datetime.now(datetime.UTC) if period_type == "hour": start_time = now - timedelta(hours=length) From c9639d851b91bb64046b18b5c5c2ed215f534706 Mon Sep 17 00:00:00 2001 From: pablorevilla-meshtastic Date: Thu, 15 Jan 2026 08:48:22 -0800 Subject: [PATCH 3/3] Fix Time function on store.py --- README.md | 32 ++++++++++++++++++++++++++++---- meshview/store.py | 12 ++++++------ 2 files changed, 34 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 798d58e..e4c8535 100644 --- a/README.md +++ b/README.md @@ -278,6 +278,18 @@ password = large4cats # postgresql+asyncpg://user:pass@host:5432/meshview connection_string = sqlite+aiosqlite:///packets.db +> **NOTE (PostgreSQL setup)** +> If you want to use PostgreSQL instead of SQLite: +> +> 1) Install PostgreSQL for your OS. +> 2) Create a user and database: +> - `CREATE USER meshview WITH PASSWORD 'change_me';` +> - `CREATE DATABASE meshview OWNER meshview;` +> 3) Update `config.ini`: +> - `connection_string = postgresql+asyncpg://meshview:change_me@localhost:5432/meshview` +> 4) Initialize the schema: +> - `./env/bin/python startdb.py` + # ------------------------- # Database Cleanup Configuration @@ -496,10 +508,22 @@ sleep 5 echo "Run cleanup..." # Run cleanup queries sqlite3 "$DB_FILE" < int: try: async with database.async_session() as session: - now_us = int(datetime.now(datetime.UTC).timestamp() * 1_000_000) + now_us = int(datetime.now(timezone.utc).timestamp() * 1_000_000) cutoff_us = now_us - 86400 * 1_000_000 q = select(func.count(Node.id)).where(Node.last_seen_us > cutoff_us) @@ -196,7 +196,7 @@ async def get_total_node_count(channel: str = None) -> int: async def get_top_traffic_nodes(): try: async with database.async_session() as session: - now_us = int(datetime.now(datetime.UTC).timestamp() * 1_000_000) + now_us = int(datetime.now(timezone.utc).timestamp() * 1_000_000) cutoff_us = now_us - 86400 * 1_000_000 total_packets_sent = func.count(func.distinct(Packet.id)).label("total_packets_sent") total_times_seen = func.count(PacketSeen.packet_id).label("total_times_seen") @@ -244,7 +244,7 @@ async def get_top_traffic_nodes(): async def get_node_traffic(node_id: int): try: async with database.async_session() as session: - now_us = int(datetime.now(datetime.UTC).timestamp() * 1_000_000) + now_us = int(datetime.now(timezone.utc).timestamp() * 1_000_000) cutoff_us = now_us - 86400 * 1_000_000 packet_count = func.count().label("packet_count") @@ -309,7 +309,7 @@ async def get_nodes(node_id=None, role=None, channel=None, hw_model=None, days_a query = query.where(Node.hw_model == hw_model) if days_active is not None: - now_us = int(datetime.now(datetime.UTC).timestamp() * 1_000_000) + now_us = int(datetime.now(timezone.utc).timestamp() * 1_000_000) cutoff_us = now_us - int(timedelta(days_active).total_seconds() * 1_000_000) query = query.where(Node.last_seen_us > cutoff_us) @@ -337,7 +337,7 @@ async def get_packet_stats( to_node: int | None = None, from_node: int | None = None, ): - now = datetime.now(datetime.UTC) + now = datetime.now(timezone.utc) if period_type == "hour": start_time = now - timedelta(hours=length)