diff --git a/README-Docker.md b/README-Docker.md
index d87d368..5a929ca 100644
--- a/README-Docker.md
+++ b/README-Docker.md
@@ -128,6 +128,10 @@ username =
password =
[database]
+# SQLAlchemy async connection string.
+# Examples:
+# sqlite+aiosqlite:///var/lib/meshview/packets.db
+# postgresql+asyncpg://user:pass@host:5432/meshview
connection_string = sqlite+aiosqlite:///var/lib/meshview/packets.db
```
diff --git a/README.md b/README.md
index 79cac56..5672089 100644
--- a/README.md
+++ b/README.md
@@ -275,9 +275,24 @@ password = large4cats
# Database Configuration
# -------------------------
[database]
-# SQLAlchemy connection string. This one uses SQLite with asyncio support.
+# SQLAlchemy async connection string.
+# Examples:
+# sqlite+aiosqlite:///packets.db
+# postgresql+asyncpg://user:pass@host:5432/meshview
connection_string = sqlite+aiosqlite:///packets.db
+# NOTE (PostgreSQL setup)
+# If you want to use PostgreSQL instead of SQLite:
+#
+# 1) Install PostgreSQL for your OS.
+# 2) Create a user and database:
+#    CREATE USER meshview WITH PASSWORD 'change_me';
+#    CREATE DATABASE meshview OWNER meshview;
+# 3) Update config.ini:
+#    connection_string = postgresql+asyncpg://meshview:change_me@localhost:5432/meshview
+# 4) Initialize the schema:
+#    ./env/bin/python startdb.py
+
# -------------------------
# Database Cleanup Configuration
diff --git a/alembic/versions/9f3b1a8d2c4f_drop_import_time_columns.py b/alembic/versions/9f3b1a8d2c4f_drop_import_time_columns.py
index e7eb5b8..24e2ccc 100644
--- a/alembic/versions/9f3b1a8d2c4f_drop_import_time_columns.py
+++ b/alembic/versions/9f3b1a8d2c4f_drop_import_time_columns.py
@@ -8,6 +8,7 @@ Create Date: 2026-01-09 09:55:00.000000
from collections.abc import Sequence
import sqlalchemy as sa
+
from alembic import op
# revision identifiers, used by Alembic.
diff --git a/alembic/versions/b7c3c2e3a1f0_add_last_update_us_to_node.py b/alembic/versions/b7c3c2e3a1f0_add_last_update_us_to_node.py
new file mode 100644
index 0000000..f2129c1
--- /dev/null
+++ b/alembic/versions/b7c3c2e3a1f0_add_last_update_us_to_node.py
@@ -0,0 +1,94 @@
+"""Add last_update_us to node and migrate data.
+
+Revision ID: b7c3c2e3a1f0
+Revises: 9f3b1a8d2c4f
+Create Date: 2026-01-12 10:12:00.000000
+"""
+
+from collections.abc import Sequence
+from datetime import UTC, datetime
+
+import sqlalchemy as sa
+
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision: str = "b7c3c2e3a1f0"
+down_revision: str | None = "9f3b1a8d2c4f"
+branch_labels: str | Sequence[str] | None = None
+depends_on: str | Sequence[str] | None = None
+
+
+def _parse_datetime(value):
+ if value is None:
+ return None
+ if isinstance(value, datetime):
+ dt = value
+ elif isinstance(value, str):
+ text = value.replace("Z", "+00:00")
+ try:
+ dt = datetime.fromisoformat(text)
+ except ValueError:
+ return None
+ else:
+ return None
+
+ if dt.tzinfo is None:
+ return dt.replace(tzinfo=UTC)
+ return dt.astimezone(UTC)
+
+
+def upgrade() -> None:
+ conn = op.get_bind()
+ op.add_column("node", sa.Column("last_update_us", sa.BigInteger(), nullable=True))
+ op.create_index("idx_node_last_update_us", "node", ["last_update_us"], unique=False)
+
+ node = sa.table(
+ "node",
+ sa.column("id", sa.String()),
+ sa.column("last_update", sa.DateTime()),
+ sa.column("last_update_us", sa.BigInteger()),
+ )
+
+ rows = conn.execute(sa.select(node.c.id, node.c.last_update)).all()
+ for node_id, last_update in rows:
+ dt = _parse_datetime(last_update)
+ if dt is None:
+ continue
+ last_update_us = int(dt.timestamp() * 1_000_000)
+ conn.execute(
+ sa.update(node).where(node.c.id == node_id).values(last_update_us=last_update_us)
+ )
+
+ if conn.dialect.name == "sqlite":
+ with op.batch_alter_table("node", schema=None) as batch_op:
+ batch_op.drop_column("last_update")
+ else:
+ op.drop_column("node", "last_update")
+
+
+def downgrade() -> None:
+ conn = op.get_bind()
+ op.add_column("node", sa.Column("last_update", sa.DateTime(), nullable=True))
+
+ node = sa.table(
+ "node",
+ sa.column("id", sa.String()),
+ sa.column("last_update", sa.DateTime()),
+ sa.column("last_update_us", sa.BigInteger()),
+ )
+
+ rows = conn.execute(sa.select(node.c.id, node.c.last_update_us)).all()
+ for node_id, last_update_us in rows:
+ if last_update_us is None:
+ continue
+ dt = datetime.fromtimestamp(last_update_us / 1_000_000, tz=UTC).replace(tzinfo=None)
+ conn.execute(sa.update(node).where(node.c.id == node_id).values(last_update=dt))
+
+ if conn.dialect.name == "sqlite":
+ with op.batch_alter_table("node", schema=None) as batch_op:
+ batch_op.drop_index("idx_node_last_update_us")
+ batch_op.drop_column("last_update_us")
+ else:
+ op.drop_index("idx_node_last_update_us", table_name="node")
+ op.drop_column("node", "last_update_us")
diff --git a/alembic/versions/d4d7b0c2e1a4_drop_last_update_us_from_node.py b/alembic/versions/d4d7b0c2e1a4_drop_last_update_us_from_node.py
new file mode 100644
index 0000000..e7bd5a0
--- /dev/null
+++ b/alembic/versions/d4d7b0c2e1a4_drop_last_update_us_from_node.py
@@ -0,0 +1,34 @@
+"""Drop last_update_us from node.
+
+Revision ID: d4d7b0c2e1a4
+Revises: b7c3c2e3a1f0
+Create Date: 2026-01-12 10:20:00.000000
+"""
+
+from collections.abc import Sequence
+
+import sqlalchemy as sa
+
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision: str = "d4d7b0c2e1a4"
+down_revision: str | None = "b7c3c2e3a1f0"
+branch_labels: str | Sequence[str] | None = None
+depends_on: str | Sequence[str] | None = None
+
+
+def upgrade() -> None:
+ conn = op.get_bind()
+ if conn.dialect.name == "sqlite":
+ with op.batch_alter_table("node", schema=None) as batch_op:
+ batch_op.drop_index("idx_node_last_update_us")
+ batch_op.drop_column("last_update_us")
+ else:
+ op.drop_index("idx_node_last_update_us", table_name="node")
+ op.drop_column("node", "last_update_us")
+
+
+def downgrade() -> None:
+    op.add_column("node", sa.Column("last_update_us", sa.BigInteger(), nullable=True))  # NOTE: column is re-created empty; dropped data is not restored
+ op.create_index("idx_node_last_update_us", "node", ["last_update_us"], unique=False)
diff --git a/meshview/config.py b/meshview/config.py
index e6e6b14..e6d4600 100644
--- a/meshview/config.py
+++ b/meshview/config.py
@@ -6,7 +6,7 @@ parser = argparse.ArgumentParser(description="MeshView Configuration Loader")
parser.add_argument(
"--config", type=str, default="config.ini", help="Path to config.ini file (default: config.ini)"
)
-args = parser.parse_args()
+args, _ = parser.parse_known_args()
# Initialize config parser
config_parser = configparser.ConfigParser()
diff --git a/meshview/database.py b/meshview/database.py
index 1505905..068adfa 100644
--- a/meshview/database.py
+++ b/meshview/database.py
@@ -1,3 +1,4 @@
+from sqlalchemy.engine.url import make_url
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from meshview import models
@@ -9,10 +10,19 @@ async_session = None
def init_database(database_connection_string):
global engine, async_session
kwargs = {"echo": False}
- # Ensure SQLite is opened in read-only mode
- database_connection_string += "?mode=ro"
- kwargs["connect_args"] = {"uri": True}
- engine = create_async_engine(database_connection_string, **kwargs)
+ url = make_url(database_connection_string)
+ connect_args = {}
+
+ if url.drivername.startswith("sqlite"):
+ query = dict(url.query)
+ query.setdefault("mode", "ro")
+ url = url.set(query=query)
+ connect_args["uri"] = True
+
+ if connect_args:
+ kwargs["connect_args"] = connect_args
+
+ engine = create_async_engine(url, **kwargs)
async_session = async_sessionmaker(
bind=engine,
class_=AsyncSession,
diff --git a/meshview/migrations.py b/meshview/migrations.py
index e76eeda..2ed5a50 100644
--- a/meshview/migrations.py
+++ b/meshview/migrations.py
@@ -186,19 +186,24 @@ async def create_migration_status_table(engine: AsyncEngine) -> None:
text("""
CREATE TABLE IF NOT EXISTS migration_status (
id INTEGER PRIMARY KEY CHECK (id = 1),
- in_progress BOOLEAN NOT NULL DEFAULT 0,
+ in_progress BOOLEAN NOT NULL DEFAULT FALSE,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
)
- # Insert initial row if not exists
- await conn.execute(
+ result = await conn.execute(
text("""
- INSERT OR IGNORE INTO migration_status (id, in_progress)
- VALUES (1, 0)
+ SELECT 1 FROM migration_status WHERE id = 1
""")
)
+ if result.first() is None:
+ await conn.execute(
+ text("""
+ INSERT INTO migration_status (id, in_progress)
+ VALUES (1, FALSE)
+ """)
+ )
async def set_migration_in_progress(engine: AsyncEngine, in_progress: bool) -> None:
diff --git a/meshview/models.py b/meshview/models.py
index cbe6c22..759d471 100644
--- a/meshview/models.py
+++ b/meshview/models.py
@@ -1,5 +1,3 @@
-from datetime import datetime
-
from sqlalchemy import BigInteger, ForeignKey, Index, desc
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
@@ -22,7 +20,6 @@ class Node(Base):
last_lat: Mapped[int] = mapped_column(BigInteger, nullable=True)
last_long: Mapped[int] = mapped_column(BigInteger, nullable=True)
channel: Mapped[str] = mapped_column(nullable=True)
- last_update: Mapped[datetime] = mapped_column(nullable=True)
first_seen_us: Mapped[int] = mapped_column(BigInteger, nullable=True)
last_seen_us: Mapped[int] = mapped_column(BigInteger, nullable=True)
@@ -33,11 +30,7 @@ class Node(Base):
)
def to_dict(self):
- return {
- column.name: getattr(self, column.name)
- for column in self.__table__.columns
- if column.name != "last_update"
- }
+ return {column.name: getattr(self, column.name) for column in self.__table__.columns}
class Packet(Base):
diff --git a/meshview/mqtt_database.py b/meshview/mqtt_database.py
index 6b5e004..e74b9b0 100644
--- a/meshview/mqtt_database.py
+++ b/meshview/mqtt_database.py
@@ -1,3 +1,4 @@
+from sqlalchemy.engine.url import make_url
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from meshview import models
@@ -5,9 +6,11 @@ from meshview import models
def init_database(database_connection_string):
global engine, async_session
- engine = create_async_engine(
- database_connection_string, echo=False, connect_args={"timeout": 900}
- )
+ url = make_url(database_connection_string)
+ kwargs = {"echo": False}
+ if url.drivername.startswith("sqlite"):
+ kwargs["connect_args"] = {"timeout": 900}
+ engine = create_async_engine(url, **kwargs)
async_session = async_sessionmaker(engine, expire_on_commit=False)
diff --git a/meshview/mqtt_store.py b/meshview/mqtt_store.py
index d22c3f5..97018d4 100644
--- a/meshview/mqtt_store.py
+++ b/meshview/mqtt_store.py
@@ -1,8 +1,12 @@
import datetime
+import logging
import re
+import time
from sqlalchemy import select
+from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
+from sqlalchemy.exc import IntegrityError
from meshtastic.protobuf.config_pb2 import Config
from meshtastic.protobuf.mesh_pb2 import HardwareModel
@@ -10,6 +14,8 @@ from meshtastic.protobuf.portnums_pb2 import PortNum
from meshview import decode_payload, mqtt_database
from meshview.models import Node, Packet, PacketSeen, Traceroute
+logger = logging.getLogger(__name__)
+
async def process_envelope(topic, env):
# MAP_REPORT_APP
@@ -37,8 +43,7 @@ async def process_envelope(topic, env):
await session.execute(select(Node).where(Node.node_id == node_id))
).scalar_one_or_none()
- now = datetime.datetime.now(datetime.UTC)
- now_us = int(now.timestamp() * 1_000_000)
+ now_us = int(time.time() * 1_000_000)
if node:
node.node_id = node_id
@@ -50,7 +55,6 @@ async def process_envelope(topic, env):
node.last_lat = map_report.latitude_i
node.last_long = map_report.longitude_i
node.firmware = map_report.firmware_version
- node.last_update = now
node.last_seen_us = now_us
if node.first_seen_us is None:
node.first_seen_us = now_us
@@ -66,7 +70,6 @@ async def process_envelope(topic, env):
firmware=map_report.firmware_version,
last_lat=map_report.latitude_i,
last_long=map_report.longitude_i,
- last_update=now,
first_seen_us=now_us,
last_seen_us=now_us,
)
@@ -84,22 +87,41 @@ async def process_envelope(topic, env):
result = await session.execute(select(Packet).where(Packet.id == env.packet.id))
packet = result.scalar_one_or_none()
if not packet:
- now = datetime.datetime.now(datetime.UTC)
- now_us = int(now.timestamp() * 1_000_000)
- stmt = (
- sqlite_insert(Packet)
- .values(
- id=env.packet.id,
- portnum=env.packet.decoded.portnum,
- from_node_id=getattr(env.packet, "from"),
- to_node_id=env.packet.to,
- payload=env.packet.SerializeToString(),
- import_time_us=now_us,
- channel=env.channel_id,
+ now_us = int(time.time() * 1_000_000)
+ packet_values = {
+ "id": env.packet.id,
+ "portnum": env.packet.decoded.portnum,
+ "from_node_id": getattr(env.packet, "from"),
+ "to_node_id": env.packet.to,
+ "payload": env.packet.SerializeToString(),
+ "import_time_us": now_us,
+ "channel": env.channel_id,
+ }
+            utc_time = datetime.datetime.fromtimestamp(now_us / 1_000_000, datetime.UTC)  # FIXME(review): utc_time is unused in this scope — remove it or use it
+ dialect = session.get_bind().dialect.name
+ stmt = None
+ if dialect == "sqlite":
+ stmt = (
+ sqlite_insert(Packet)
+ .values(**packet_values)
+ .on_conflict_do_nothing(index_elements=["id"])
)
- .on_conflict_do_nothing(index_elements=["id"])
- )
- await session.execute(stmt)
+ elif dialect == "postgresql":
+ stmt = (
+ pg_insert(Packet)
+ .values(**packet_values)
+ .on_conflict_do_nothing(index_elements=["id"])
+ )
+
+ if stmt is not None:
+ await session.execute(stmt)
+ else:
+ try:
+ async with session.begin_nested():
+ session.add(Packet(**packet_values))
+ await session.flush()
+ except IntegrityError:
+ pass
# --- PacketSeen (no conflict handling here, normal insert)
@@ -118,8 +140,7 @@ async def process_envelope(topic, env):
)
)
if not result.scalar_one_or_none():
- now = datetime.datetime.now(datetime.UTC)
- now_us = int(now.timestamp() * 1_000_000)
+ now_us = int(time.time() * 1_000_000)
seen = PacketSeen(
packet_id=env.packet.id,
node_id=int(env.gateway_id[1:], 16),
@@ -161,8 +182,7 @@ async def process_envelope(topic, env):
await session.execute(select(Node).where(Node.id == user.id))
).scalar_one_or_none()
- now = datetime.datetime.now(datetime.UTC)
- now_us = int(now.timestamp() * 1_000_000)
+ now_us = int(time.time() * 1_000_000)
if node:
node.node_id = node_id
@@ -171,7 +191,6 @@ async def process_envelope(topic, env):
node.hw_model = hw_model
node.role = role
node.channel = env.channel_id
- node.last_update = now
node.last_seen_us = now_us
if node.first_seen_us is None:
node.first_seen_us = now_us
@@ -184,7 +203,6 @@ async def process_envelope(topic, env):
hw_model=hw_model,
role=role,
channel=env.channel_id,
- last_update=now,
first_seen_us=now_us,
last_seen_us=now_us,
)
@@ -203,11 +221,9 @@ async def process_envelope(topic, env):
await session.execute(select(Node).where(Node.node_id == from_node_id))
).scalar_one_or_none()
if node:
- now = datetime.datetime.now(datetime.UTC)
- now_us = int(now.timestamp() * 1_000_000)
+ now_us = int(time.time() * 1_000_000)
node.last_lat = position.latitude_i
node.last_long = position.longitude_i
- node.last_update = now
node.last_seen_us = now_us
if node.first_seen_us is None:
node.first_seen_us = now_us
@@ -217,8 +233,7 @@ async def process_envelope(topic, env):
if env.packet.decoded.portnum == PortNum.TRACEROUTE_APP:
packet_id = env.packet.id
if packet_id is not None:
- now = datetime.datetime.now(datetime.UTC)
- now_us = int(now.timestamp() * 1_000_000)
+ now_us = int(time.time() * 1_000_000)
session.add(
Traceroute(
packet_id=packet_id,
diff --git a/meshview/static/kiosk.html b/meshview/static/kiosk.html
index 002d0f3..799950a 100644
--- a/meshview/static/kiosk.html
+++ b/meshview/static/kiosk.html
@@ -75,8 +75,8 @@ body { margin: 0; font-family: monospace; background: #121212; color: #eee; }
return color;
}
- function timeAgo(dateStr){
- const diff = Date.now() - new Date(dateStr);
+ function timeAgoFromUs(us){
+ const diff = Date.now() - (us / 1000);
const s=Math.floor(diff/1000), m=Math.floor(s/60), h=Math.floor(m/60), d=Math.floor(h/24);
if(d>0) return d+'d'; if(h>0) return h+'h'; if(m>0) return m+'m'; return s+'s';
}
@@ -118,7 +118,7 @@ body { margin: 0; font-family: monospace; background: #121212; color: #eee; }
Channel: ${node.channel}
Model: ${node.hw_model}
Role: ${node.role}
`;
- if(node.last_update) popupContent+=`Last seen: ${timeAgo(node.last_update)}
`;
+ if(node.last_seen_us) popupContent+=`Last seen: ${timeAgoFromUs(node.last_seen_us)}
`;
if(node.firmware) popupContent+=`Firmware: ${node.firmware}
`;
marker.on('click', e=>{
diff --git a/meshview/store.py b/meshview/store.py
index 18072b7..8d7fac6 100644
--- a/meshview/store.py
+++ b/meshview/store.py
@@ -1,11 +1,14 @@
-from datetime import datetime, timedelta
+import logging
+from datetime import datetime, timedelta, timezone
-from sqlalchemy import Text, and_, cast, func, or_, select, text
+from sqlalchemy import Text, and_, cast, func, or_, select
from sqlalchemy.orm import lazyload
from meshview import database, models
from meshview.models import Node, Packet, PacketSeen, Traceroute
+logger = logging.getLogger(__name__)
+
async def get_node(node_id):
async with database.async_session() as session:
@@ -176,9 +179,9 @@ async def get_mqtt_neighbors(since):
async def get_total_node_count(channel: str = None) -> int:
try:
async with database.async_session() as session:
- q = select(func.count(Node.id)).where(
- Node.last_update > datetime.now() - timedelta(days=1)
- )
+ now_us = int(datetime.now(timezone.utc).timestamp() * 1_000_000)
+ cutoff_us = now_us - 86400 * 1_000_000
+ q = select(func.count(Node.id)).where(Node.last_seen_us > cutoff_us)
if channel:
q = q.where(Node.channel == channel)
@@ -193,26 +196,32 @@ async def get_total_node_count(channel: str = None) -> int:
async def get_top_traffic_nodes():
try:
async with database.async_session() as session:
- result = await session.execute(
- text("""
- SELECT
- n.node_id,
- n.long_name,
- n.short_name,
- n.channel,
- COUNT(DISTINCT p.id) AS total_packets_sent,
- COUNT(ps.packet_id) AS total_times_seen
- FROM node n
- LEFT JOIN packet p ON n.node_id = p.from_node_id
- AND p.import_time_us >= (CAST(strftime('%s','now') AS INTEGER) - 86400) * 1000000
- LEFT JOIN packet_seen ps ON p.id = ps.packet_id
- GROUP BY n.node_id, n.long_name, n.short_name
- HAVING total_packets_sent > 0
- ORDER BY total_times_seen DESC;
- """)
+ now_us = int(datetime.now(timezone.utc).timestamp() * 1_000_000)
+ cutoff_us = now_us - 86400 * 1_000_000
+ total_packets_sent = func.count(func.distinct(Packet.id)).label("total_packets_sent")
+ total_times_seen = func.count(PacketSeen.packet_id).label("total_times_seen")
+
+ stmt = (
+ select(
+ Node.node_id,
+ Node.long_name,
+ Node.short_name,
+ Node.channel,
+ total_packets_sent,
+ total_times_seen,
+ )
+ .select_from(Node)
+ .outerjoin(
+ Packet,
+ (Packet.from_node_id == Node.node_id) & (Packet.import_time_us >= cutoff_us),
+ )
+ .outerjoin(PacketSeen, PacketSeen.packet_id == Packet.id)
+ .group_by(Node.node_id, Node.long_name, Node.short_name, Node.channel)
+ .having(total_packets_sent > 0)
+ .order_by(total_times_seen.desc())
)
- rows = result.fetchall()
+ rows = (await session.execute(stmt)).all()
nodes = [
{
@@ -235,33 +244,30 @@ async def get_top_traffic_nodes():
async def get_node_traffic(node_id: int):
try:
async with database.async_session() as session:
- result = await session.execute(
- text("""
- SELECT
- node.long_name, packet.portnum,
- COUNT(*) AS packet_count
- FROM packet
- JOIN node ON packet.from_node_id = node.node_id
- WHERE node.node_id = :node_id
- AND packet.import_time_us >= (CAST(strftime('%s','now') AS INTEGER) - 86400) * 1000000
- GROUP BY packet.portnum
- ORDER BY packet_count DESC;
- """),
- {"node_id": node_id},
+ now_us = int(datetime.now(timezone.utc).timestamp() * 1_000_000)
+ cutoff_us = now_us - 86400 * 1_000_000
+ packet_count = func.count().label("packet_count")
+
+ stmt = (
+ select(Node.long_name, Packet.portnum, packet_count)
+ .select_from(Packet)
+ .join(Node, Packet.from_node_id == Node.node_id)
+ .where(Node.node_id == node_id)
+ .where(Packet.import_time_us >= cutoff_us)
+ .group_by(Node.long_name, Packet.portnum)
+ .order_by(packet_count.desc())
)
- # Map the result to include node.long_name and packet data
- traffic_data = [
+ result = await session.execute(stmt)
+ return [
{
- "long_name": row[0], # node.long_name
- "portnum": row[1], # packet.portnum
- "packet_count": row[2], # COUNT(*) as packet_count
+ "long_name": row.long_name,
+ "portnum": row.portnum,
+ "packet_count": row.packet_count,
}
for row in result.all()
]
- return traffic_data
-
except Exception as e:
# Log the error or handle it as needed
print(f"Error fetching node traffic: {str(e)}")
@@ -290,7 +296,11 @@ async def get_nodes(node_id=None, role=None, channel=None, hw_model=None, days_a
# Apply filters based on provided parameters
if node_id is not None:
- query = query.where(Node.node_id == node_id)
+ try:
+ node_id_int = int(node_id)
+ except (TypeError, ValueError):
+ node_id_int = node_id
+ query = query.where(Node.node_id == node_id_int)
if role is not None:
query = query.where(Node.role == role.upper()) # Ensure role is uppercase
if channel is not None:
@@ -299,10 +309,12 @@ async def get_nodes(node_id=None, role=None, channel=None, hw_model=None, days_a
query = query.where(Node.hw_model == hw_model)
if days_active is not None:
- query = query.where(Node.last_update > datetime.now() - timedelta(days_active))
+ now_us = int(datetime.now(timezone.utc).timestamp() * 1_000_000)
+            cutoff_us = now_us - int(timedelta(days=days_active).total_seconds() * 1_000_000)
+ query = query.where(Node.last_seen_us > cutoff_us)
- # Exclude nodes where last_update is an empty string
- query = query.where(Node.last_update != "")
+ # Exclude nodes with missing last_seen_us
+ query = query.where(Node.last_seen_us.is_not(None))
# Order results by long_name in ascending order
query = query.order_by(Node.short_name.asc())
@@ -313,7 +325,7 @@ async def get_nodes(node_id=None, role=None, channel=None, hw_model=None, days_a
return nodes # Return the list of nodes
except Exception:
- print("error reading DB") # Consider using logging instead of print
+ logger.exception("error reading DB")
return [] # Return an empty list in case of failure
@@ -325,24 +337,35 @@ async def get_packet_stats(
to_node: int | None = None,
from_node: int | None = None,
):
- now = datetime.now()
+ now = datetime.now(timezone.utc)
if period_type == "hour":
start_time = now - timedelta(hours=length)
- time_format = '%Y-%m-%d %H:00'
+ time_format_sqlite = "%Y-%m-%d %H:00"
+ time_format_pg = "YYYY-MM-DD HH24:00"
elif period_type == "day":
start_time = now - timedelta(days=length)
- time_format = '%Y-%m-%d'
+ time_format_sqlite = "%Y-%m-%d"
+ time_format_pg = "YYYY-MM-DD"
else:
raise ValueError("period_type must be 'hour' or 'day'")
async with database.async_session() as session:
- q = select(
- func.strftime(
- time_format,
+ dialect = session.get_bind().dialect.name
+ if dialect == "postgresql":
+ period_expr = func.to_char(
+ func.to_timestamp(Packet.import_time_us / 1_000_000.0),
+ time_format_pg,
+ )
+ else:
+ period_expr = func.strftime(
+ time_format_sqlite,
func.datetime(Packet.import_time_us / 1_000_000, "unixepoch"),
- ).label('period'),
- func.count().label('count'),
+ )
+
+ q = select(
+ period_expr.label("period"),
+ func.count().label("count"),
).where(Packet.import_time_us >= int(start_time.timestamp() * 1_000_000))
# Filters
diff --git a/meshview/templates/firehose.html b/meshview/templates/firehose.html
index fe30b15..790b4d6 100644
--- a/meshview/templates/firehose.html
+++ b/meshview/templates/firehose.html
@@ -201,13 +201,37 @@ function portLabel(portnum, payload, linksHtml) {
/* ======================================================
TIME FORMAT
====================================================== */
-function formatLocalTime(importTimeUs) {
- const ms = importTimeUs / 1000;
- return new Date(ms).toLocaleTimeString([], {
+function formatTimes(importTimeUs) {
+ const ms = Number(importTimeUs) / 1000;
+ if (!Number.isFinite(ms)) {
+ return { local: "—", utc: "—", epoch: "—" };
+ }
+ const date = new Date(ms);
+ const local = date.toLocaleTimeString([], {
hour: "2-digit",
minute: "2-digit",
- second: "2-digit"
+ second: "2-digit",
+ timeZoneName: "short"
});
+ const utc = date.toLocaleTimeString([], {
+ hour: "2-digit",
+ minute: "2-digit",
+ second: "2-digit",
+ timeZone: "UTC",
+ timeZoneName: "short"
+ });
+ return { local, utc, epoch: String(importTimeUs) };
+}
+
+function logPacketTimes(packet) {
+ const times = formatTimes(packet.import_time_us);
+ console.log(
+ "[firehose] packet time",
+ "id=" + packet.id,
+ "epoch_us=" + times.epoch,
+ "local=" + times.local,
+ "utc=" + times.utc
+ );
}
/* ======================================================
@@ -245,6 +269,7 @@ async function fetchUpdates() {
const list = document.getElementById("packet_list");
for (const pkt of packets.reverse()) {
+      logPacketTimes(pkt);  // NOTE(review): logs every packet to the console — consider gating behind a debug flag
/* FROM — includes translation */
const from =
@@ -304,7 +329,9 @@ async function fetchUpdates() {
const html = `