mirror of
https://github.com/pablorevilla-meshtastic/meshview.git
synced 2026-03-04 23:27:46 +01:00
Merge branch 'db_updates'
This commit is contained in:
@@ -128,6 +128,10 @@ username =
|
||||
password =
|
||||
|
||||
[database]
|
||||
# SQLAlchemy async connection string.
|
||||
# Examples:
|
||||
# sqlite+aiosqlite:///var/lib/meshview/packets.db
|
||||
# postgresql+asyncpg://user:pass@host:5432/meshview
|
||||
connection_string = sqlite+aiosqlite:///var/lib/meshview/packets.db
|
||||
```
|
||||
|
||||
|
||||
17
README.md
17
README.md
@@ -275,9 +275,24 @@ password = large4cats
|
||||
# Database Configuration
|
||||
# -------------------------
|
||||
[database]
|
||||
# SQLAlchemy connection string. This one uses SQLite with asyncio support.
|
||||
# SQLAlchemy async connection string.
|
||||
# Examples:
|
||||
# sqlite+aiosqlite:///packets.db
|
||||
# postgresql+asyncpg://user:pass@host:5432/meshview
|
||||
connection_string = sqlite+aiosqlite:///packets.db
|
||||
|
||||
> **NOTE (PostgreSQL setup)**
|
||||
> If you want to use PostgreSQL instead of SQLite:
|
||||
>
|
||||
> 1) Install PostgreSQL for your OS.
|
||||
> 2) Create a user and database:
|
||||
> - `CREATE USER meshview WITH PASSWORD 'change_me';`
|
||||
> - `CREATE DATABASE meshview OWNER meshview;`
|
||||
> 3) Update `config.ini`:
|
||||
> - `connection_string = postgresql+asyncpg://meshview:change_me@localhost:5432/meshview`
|
||||
> 4) Initialize the schema:
|
||||
> - `./env/bin/python startdb.py`
|
||||
|
||||
|
||||
# -------------------------
|
||||
# Database Cleanup Configuration
|
||||
|
||||
@@ -8,6 +8,7 @@ Create Date: 2026-01-09 09:55:00.000000
|
||||
from collections.abc import Sequence
|
||||
|
||||
import sqlalchemy as sa
|
||||
|
||||
from alembic import op
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
|
||||
94
alembic/versions/b7c3c2e3a1f0_add_last_update_us_to_node.py
Normal file
94
alembic/versions/b7c3c2e3a1f0_add_last_update_us_to_node.py
Normal file
@@ -0,0 +1,94 @@
|
||||
"""Add last_update_us to node and migrate data.
|
||||
|
||||
Revision ID: b7c3c2e3a1f0
|
||||
Revises: 9f3b1a8d2c4f
|
||||
Create Date: 2026-01-12 10:12:00.000000
|
||||
"""
|
||||
|
||||
from collections.abc import Sequence
|
||||
from datetime import UTC, datetime
|
||||
|
||||
import sqlalchemy as sa
|
||||
|
||||
from alembic import op
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = "b7c3c2e3a1f0"
|
||||
down_revision: str | None = "9f3b1a8d2c4f"
|
||||
branch_labels: str | Sequence[str] | None = None
|
||||
depends_on: str | Sequence[str] | None = None
|
||||
|
||||
|
||||
def _parse_datetime(value):
|
||||
if value is None:
|
||||
return None
|
||||
if isinstance(value, datetime):
|
||||
dt = value
|
||||
elif isinstance(value, str):
|
||||
text = value.replace("Z", "+00:00")
|
||||
try:
|
||||
dt = datetime.fromisoformat(text)
|
||||
except ValueError:
|
||||
return None
|
||||
else:
|
||||
return None
|
||||
|
||||
if dt.tzinfo is None:
|
||||
return dt.replace(tzinfo=UTC)
|
||||
return dt.astimezone(UTC)
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
conn = op.get_bind()
|
||||
op.add_column("node", sa.Column("last_update_us", sa.BigInteger(), nullable=True))
|
||||
op.create_index("idx_node_last_update_us", "node", ["last_update_us"], unique=False)
|
||||
|
||||
node = sa.table(
|
||||
"node",
|
||||
sa.column("id", sa.String()),
|
||||
sa.column("last_update", sa.DateTime()),
|
||||
sa.column("last_update_us", sa.BigInteger()),
|
||||
)
|
||||
|
||||
rows = conn.execute(sa.select(node.c.id, node.c.last_update)).all()
|
||||
for node_id, last_update in rows:
|
||||
dt = _parse_datetime(last_update)
|
||||
if dt is None:
|
||||
continue
|
||||
last_update_us = int(dt.timestamp() * 1_000_000)
|
||||
conn.execute(
|
||||
sa.update(node).where(node.c.id == node_id).values(last_update_us=last_update_us)
|
||||
)
|
||||
|
||||
if conn.dialect.name == "sqlite":
|
||||
with op.batch_alter_table("node", schema=None) as batch_op:
|
||||
batch_op.drop_column("last_update")
|
||||
else:
|
||||
op.drop_column("node", "last_update")
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
conn = op.get_bind()
|
||||
op.add_column("node", sa.Column("last_update", sa.DateTime(), nullable=True))
|
||||
|
||||
node = sa.table(
|
||||
"node",
|
||||
sa.column("id", sa.String()),
|
||||
sa.column("last_update", sa.DateTime()),
|
||||
sa.column("last_update_us", sa.BigInteger()),
|
||||
)
|
||||
|
||||
rows = conn.execute(sa.select(node.c.id, node.c.last_update_us)).all()
|
||||
for node_id, last_update_us in rows:
|
||||
if last_update_us is None:
|
||||
continue
|
||||
dt = datetime.fromtimestamp(last_update_us / 1_000_000, tz=UTC).replace(tzinfo=None)
|
||||
conn.execute(sa.update(node).where(node.c.id == node_id).values(last_update=dt))
|
||||
|
||||
if conn.dialect.name == "sqlite":
|
||||
with op.batch_alter_table("node", schema=None) as batch_op:
|
||||
batch_op.drop_index("idx_node_last_update_us")
|
||||
batch_op.drop_column("last_update_us")
|
||||
else:
|
||||
op.drop_index("idx_node_last_update_us", table_name="node")
|
||||
op.drop_column("node", "last_update_us")
|
||||
@@ -0,0 +1,34 @@
|
||||
"""Drop last_update_us from node.
|
||||
|
||||
Revision ID: d4d7b0c2e1a4
|
||||
Revises: b7c3c2e3a1f0
|
||||
Create Date: 2026-01-12 10:20:00.000000
|
||||
"""
|
||||
|
||||
from collections.abc import Sequence
|
||||
|
||||
import sqlalchemy as sa
|
||||
|
||||
from alembic import op
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = "d4d7b0c2e1a4"
|
||||
down_revision: str | None = "b7c3c2e3a1f0"
|
||||
branch_labels: str | Sequence[str] | None = None
|
||||
depends_on: str | Sequence[str] | None = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
conn = op.get_bind()
|
||||
if conn.dialect.name == "sqlite":
|
||||
with op.batch_alter_table("node", schema=None) as batch_op:
|
||||
batch_op.drop_index("idx_node_last_update_us")
|
||||
batch_op.drop_column("last_update_us")
|
||||
else:
|
||||
op.drop_index("idx_node_last_update_us", table_name="node")
|
||||
op.drop_column("node", "last_update_us")
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
op.add_column("node", sa.Column("last_update_us", sa.BigInteger(), nullable=True))
|
||||
op.create_index("idx_node_last_update_us", "node", ["last_update_us"], unique=False)
|
||||
@@ -6,7 +6,7 @@ parser = argparse.ArgumentParser(description="MeshView Configuration Loader")
|
||||
parser.add_argument(
|
||||
"--config", type=str, default="config.ini", help="Path to config.ini file (default: config.ini)"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
args, _ = parser.parse_known_args()
|
||||
|
||||
# Initialize config parser
|
||||
config_parser = configparser.ConfigParser()
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
from sqlalchemy.engine.url import make_url
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
||||
|
||||
from meshview import models
|
||||
@@ -9,10 +10,19 @@ async_session = None
|
||||
def init_database(database_connection_string):
|
||||
global engine, async_session
|
||||
kwargs = {"echo": False}
|
||||
# Ensure SQLite is opened in read-only mode
|
||||
database_connection_string += "?mode=ro"
|
||||
kwargs["connect_args"] = {"uri": True}
|
||||
engine = create_async_engine(database_connection_string, **kwargs)
|
||||
url = make_url(database_connection_string)
|
||||
connect_args = {}
|
||||
|
||||
if url.drivername.startswith("sqlite"):
|
||||
query = dict(url.query)
|
||||
query.setdefault("mode", "ro")
|
||||
url = url.set(query=query)
|
||||
connect_args["uri"] = True
|
||||
|
||||
if connect_args:
|
||||
kwargs["connect_args"] = connect_args
|
||||
|
||||
engine = create_async_engine(url, **kwargs)
|
||||
async_session = async_sessionmaker(
|
||||
bind=engine,
|
||||
class_=AsyncSession,
|
||||
|
||||
@@ -186,19 +186,24 @@ async def create_migration_status_table(engine: AsyncEngine) -> None:
|
||||
text("""
|
||||
CREATE TABLE IF NOT EXISTS migration_status (
|
||||
id INTEGER PRIMARY KEY CHECK (id = 1),
|
||||
in_progress BOOLEAN NOT NULL DEFAULT 0,
|
||||
in_progress BOOLEAN NOT NULL DEFAULT FALSE,
|
||||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||||
)
|
||||
""")
|
||||
)
|
||||
|
||||
# Insert initial row if not exists
|
||||
await conn.execute(
|
||||
result = await conn.execute(
|
||||
text("""
|
||||
INSERT OR IGNORE INTO migration_status (id, in_progress)
|
||||
VALUES (1, 0)
|
||||
SELECT 1 FROM migration_status WHERE id = 1
|
||||
""")
|
||||
)
|
||||
if result.first() is None:
|
||||
await conn.execute(
|
||||
text("""
|
||||
INSERT INTO migration_status (id, in_progress)
|
||||
VALUES (1, FALSE)
|
||||
""")
|
||||
)
|
||||
|
||||
|
||||
async def set_migration_in_progress(engine: AsyncEngine, in_progress: bool) -> None:
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
from datetime import datetime
|
||||
|
||||
from sqlalchemy import BigInteger, ForeignKey, Index, desc
|
||||
from sqlalchemy.ext.asyncio import AsyncAttrs
|
||||
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
|
||||
@@ -22,7 +20,6 @@ class Node(Base):
|
||||
last_lat: Mapped[int] = mapped_column(BigInteger, nullable=True)
|
||||
last_long: Mapped[int] = mapped_column(BigInteger, nullable=True)
|
||||
channel: Mapped[str] = mapped_column(nullable=True)
|
||||
last_update: Mapped[datetime] = mapped_column(nullable=True)
|
||||
first_seen_us: Mapped[int] = mapped_column(BigInteger, nullable=True)
|
||||
last_seen_us: Mapped[int] = mapped_column(BigInteger, nullable=True)
|
||||
|
||||
@@ -33,11 +30,7 @@ class Node(Base):
|
||||
)
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
column.name: getattr(self, column.name)
|
||||
for column in self.__table__.columns
|
||||
if column.name != "last_update"
|
||||
}
|
||||
return {column.name: getattr(self, column.name) for column in self.__table__.columns}
|
||||
|
||||
|
||||
class Packet(Base):
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
from sqlalchemy.engine.url import make_url
|
||||
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
|
||||
|
||||
from meshview import models
|
||||
@@ -5,9 +6,11 @@ from meshview import models
|
||||
|
||||
def init_database(database_connection_string):
|
||||
global engine, async_session
|
||||
engine = create_async_engine(
|
||||
database_connection_string, echo=False, connect_args={"timeout": 900}
|
||||
)
|
||||
url = make_url(database_connection_string)
|
||||
kwargs = {"echo": False}
|
||||
if url.drivername.startswith("sqlite"):
|
||||
kwargs["connect_args"] = {"timeout": 900}
|
||||
engine = create_async_engine(url, **kwargs)
|
||||
async_session = async_sessionmaker(engine, expire_on_commit=False)
|
||||
|
||||
|
||||
|
||||
@@ -1,8 +1,12 @@
|
||||
import datetime
|
||||
import logging
|
||||
import re
|
||||
import time
|
||||
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
||||
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
|
||||
from meshtastic.protobuf.config_pb2 import Config
|
||||
from meshtastic.protobuf.mesh_pb2 import HardwareModel
|
||||
@@ -10,6 +14,8 @@ from meshtastic.protobuf.portnums_pb2 import PortNum
|
||||
from meshview import decode_payload, mqtt_database
|
||||
from meshview.models import Node, Packet, PacketSeen, Traceroute
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def process_envelope(topic, env):
|
||||
# MAP_REPORT_APP
|
||||
@@ -37,8 +43,7 @@ async def process_envelope(topic, env):
|
||||
await session.execute(select(Node).where(Node.node_id == node_id))
|
||||
).scalar_one_or_none()
|
||||
|
||||
now = datetime.datetime.now(datetime.UTC)
|
||||
now_us = int(now.timestamp() * 1_000_000)
|
||||
now_us = int(time.time() * 1_000_000)
|
||||
|
||||
if node:
|
||||
node.node_id = node_id
|
||||
@@ -50,7 +55,6 @@ async def process_envelope(topic, env):
|
||||
node.last_lat = map_report.latitude_i
|
||||
node.last_long = map_report.longitude_i
|
||||
node.firmware = map_report.firmware_version
|
||||
node.last_update = now
|
||||
node.last_seen_us = now_us
|
||||
if node.first_seen_us is None:
|
||||
node.first_seen_us = now_us
|
||||
@@ -66,7 +70,6 @@ async def process_envelope(topic, env):
|
||||
firmware=map_report.firmware_version,
|
||||
last_lat=map_report.latitude_i,
|
||||
last_long=map_report.longitude_i,
|
||||
last_update=now,
|
||||
first_seen_us=now_us,
|
||||
last_seen_us=now_us,
|
||||
)
|
||||
@@ -84,22 +87,41 @@ async def process_envelope(topic, env):
|
||||
result = await session.execute(select(Packet).where(Packet.id == env.packet.id))
|
||||
packet = result.scalar_one_or_none()
|
||||
if not packet:
|
||||
now = datetime.datetime.now(datetime.UTC)
|
||||
now_us = int(now.timestamp() * 1_000_000)
|
||||
stmt = (
|
||||
sqlite_insert(Packet)
|
||||
.values(
|
||||
id=env.packet.id,
|
||||
portnum=env.packet.decoded.portnum,
|
||||
from_node_id=getattr(env.packet, "from"),
|
||||
to_node_id=env.packet.to,
|
||||
payload=env.packet.SerializeToString(),
|
||||
import_time_us=now_us,
|
||||
channel=env.channel_id,
|
||||
now_us = int(time.time() * 1_000_000)
|
||||
packet_values = {
|
||||
"id": env.packet.id,
|
||||
"portnum": env.packet.decoded.portnum,
|
||||
"from_node_id": getattr(env.packet, "from"),
|
||||
"to_node_id": env.packet.to,
|
||||
"payload": env.packet.SerializeToString(),
|
||||
"import_time_us": now_us,
|
||||
"channel": env.channel_id,
|
||||
}
|
||||
utc_time = datetime.datetime.fromtimestamp(now_us / 1_000_000, datetime.UTC)
|
||||
dialect = session.get_bind().dialect.name
|
||||
stmt = None
|
||||
if dialect == "sqlite":
|
||||
stmt = (
|
||||
sqlite_insert(Packet)
|
||||
.values(**packet_values)
|
||||
.on_conflict_do_nothing(index_elements=["id"])
|
||||
)
|
||||
.on_conflict_do_nothing(index_elements=["id"])
|
||||
)
|
||||
await session.execute(stmt)
|
||||
elif dialect == "postgresql":
|
||||
stmt = (
|
||||
pg_insert(Packet)
|
||||
.values(**packet_values)
|
||||
.on_conflict_do_nothing(index_elements=["id"])
|
||||
)
|
||||
|
||||
if stmt is not None:
|
||||
await session.execute(stmt)
|
||||
else:
|
||||
try:
|
||||
async with session.begin_nested():
|
||||
session.add(Packet(**packet_values))
|
||||
await session.flush()
|
||||
except IntegrityError:
|
||||
pass
|
||||
|
||||
# --- PacketSeen (no conflict handling here, normal insert)
|
||||
|
||||
@@ -118,8 +140,7 @@ async def process_envelope(topic, env):
|
||||
)
|
||||
)
|
||||
if not result.scalar_one_or_none():
|
||||
now = datetime.datetime.now(datetime.UTC)
|
||||
now_us = int(now.timestamp() * 1_000_000)
|
||||
now_us = int(time.time() * 1_000_000)
|
||||
seen = PacketSeen(
|
||||
packet_id=env.packet.id,
|
||||
node_id=int(env.gateway_id[1:], 16),
|
||||
@@ -161,8 +182,7 @@ async def process_envelope(topic, env):
|
||||
await session.execute(select(Node).where(Node.id == user.id))
|
||||
).scalar_one_or_none()
|
||||
|
||||
now = datetime.datetime.now(datetime.UTC)
|
||||
now_us = int(now.timestamp() * 1_000_000)
|
||||
now_us = int(time.time() * 1_000_000)
|
||||
|
||||
if node:
|
||||
node.node_id = node_id
|
||||
@@ -171,7 +191,6 @@ async def process_envelope(topic, env):
|
||||
node.hw_model = hw_model
|
||||
node.role = role
|
||||
node.channel = env.channel_id
|
||||
node.last_update = now
|
||||
node.last_seen_us = now_us
|
||||
if node.first_seen_us is None:
|
||||
node.first_seen_us = now_us
|
||||
@@ -184,7 +203,6 @@ async def process_envelope(topic, env):
|
||||
hw_model=hw_model,
|
||||
role=role,
|
||||
channel=env.channel_id,
|
||||
last_update=now,
|
||||
first_seen_us=now_us,
|
||||
last_seen_us=now_us,
|
||||
)
|
||||
@@ -203,11 +221,9 @@ async def process_envelope(topic, env):
|
||||
await session.execute(select(Node).where(Node.node_id == from_node_id))
|
||||
).scalar_one_or_none()
|
||||
if node:
|
||||
now = datetime.datetime.now(datetime.UTC)
|
||||
now_us = int(now.timestamp() * 1_000_000)
|
||||
now_us = int(time.time() * 1_000_000)
|
||||
node.last_lat = position.latitude_i
|
||||
node.last_long = position.longitude_i
|
||||
node.last_update = now
|
||||
node.last_seen_us = now_us
|
||||
if node.first_seen_us is None:
|
||||
node.first_seen_us = now_us
|
||||
@@ -217,8 +233,7 @@ async def process_envelope(topic, env):
|
||||
if env.packet.decoded.portnum == PortNum.TRACEROUTE_APP:
|
||||
packet_id = env.packet.id
|
||||
if packet_id is not None:
|
||||
now = datetime.datetime.now(datetime.UTC)
|
||||
now_us = int(now.timestamp() * 1_000_000)
|
||||
now_us = int(time.time() * 1_000_000)
|
||||
session.add(
|
||||
Traceroute(
|
||||
packet_id=packet_id,
|
||||
|
||||
@@ -75,8 +75,8 @@ body { margin: 0; font-family: monospace; background: #121212; color: #eee; }
|
||||
return color;
|
||||
}
|
||||
|
||||
function timeAgo(dateStr){
|
||||
const diff = Date.now() - new Date(dateStr);
|
||||
function timeAgoFromUs(us){
|
||||
const diff = Date.now() - (us / 1000);
|
||||
const s=Math.floor(diff/1000), m=Math.floor(s/60), h=Math.floor(m/60), d=Math.floor(h/24);
|
||||
if(d>0) return d+'d'; if(h>0) return h+'h'; if(m>0) return m+'m'; return s+'s';
|
||||
}
|
||||
@@ -118,7 +118,7 @@ body { margin: 0; font-family: monospace; background: #121212; color: #eee; }
|
||||
<b>Channel:</b> ${node.channel}<br>
|
||||
<b>Model:</b> ${node.hw_model}<br>
|
||||
<b>Role:</b> ${node.role}<br>`;
|
||||
if(node.last_update) popupContent+=`<b>Last seen:</b> ${timeAgo(node.last_update)}<br>`;
|
||||
if(node.last_seen_us) popupContent+=`<b>Last seen:</b> ${timeAgoFromUs(node.last_seen_us)}<br>`;
|
||||
if(node.firmware) popupContent+=`<b>Firmware:</b> ${node.firmware}<br>`;
|
||||
|
||||
marker.on('click', e=>{
|
||||
|
||||
@@ -1,11 +1,14 @@
|
||||
from datetime import datetime, timedelta
|
||||
import logging
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
from sqlalchemy import Text, and_, cast, func, or_, select, text
|
||||
from sqlalchemy import Text, and_, cast, func, or_, select
|
||||
from sqlalchemy.orm import lazyload
|
||||
|
||||
from meshview import database, models
|
||||
from meshview.models import Node, Packet, PacketSeen, Traceroute
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def get_node(node_id):
|
||||
async with database.async_session() as session:
|
||||
@@ -176,9 +179,9 @@ async def get_mqtt_neighbors(since):
|
||||
async def get_total_node_count(channel: str = None) -> int:
|
||||
try:
|
||||
async with database.async_session() as session:
|
||||
q = select(func.count(Node.id)).where(
|
||||
Node.last_update > datetime.now() - timedelta(days=1)
|
||||
)
|
||||
now_us = int(datetime.now(timezone.utc).timestamp() * 1_000_000)
|
||||
cutoff_us = now_us - 86400 * 1_000_000
|
||||
q = select(func.count(Node.id)).where(Node.last_seen_us > cutoff_us)
|
||||
|
||||
if channel:
|
||||
q = q.where(Node.channel == channel)
|
||||
@@ -193,26 +196,32 @@ async def get_total_node_count(channel: str = None) -> int:
|
||||
async def get_top_traffic_nodes():
|
||||
try:
|
||||
async with database.async_session() as session:
|
||||
result = await session.execute(
|
||||
text("""
|
||||
SELECT
|
||||
n.node_id,
|
||||
n.long_name,
|
||||
n.short_name,
|
||||
n.channel,
|
||||
COUNT(DISTINCT p.id) AS total_packets_sent,
|
||||
COUNT(ps.packet_id) AS total_times_seen
|
||||
FROM node n
|
||||
LEFT JOIN packet p ON n.node_id = p.from_node_id
|
||||
AND p.import_time_us >= (CAST(strftime('%s','now') AS INTEGER) - 86400) * 1000000
|
||||
LEFT JOIN packet_seen ps ON p.id = ps.packet_id
|
||||
GROUP BY n.node_id, n.long_name, n.short_name
|
||||
HAVING total_packets_sent > 0
|
||||
ORDER BY total_times_seen DESC;
|
||||
""")
|
||||
now_us = int(datetime.now(timezone.utc).timestamp() * 1_000_000)
|
||||
cutoff_us = now_us - 86400 * 1_000_000
|
||||
total_packets_sent = func.count(func.distinct(Packet.id)).label("total_packets_sent")
|
||||
total_times_seen = func.count(PacketSeen.packet_id).label("total_times_seen")
|
||||
|
||||
stmt = (
|
||||
select(
|
||||
Node.node_id,
|
||||
Node.long_name,
|
||||
Node.short_name,
|
||||
Node.channel,
|
||||
total_packets_sent,
|
||||
total_times_seen,
|
||||
)
|
||||
.select_from(Node)
|
||||
.outerjoin(
|
||||
Packet,
|
||||
(Packet.from_node_id == Node.node_id) & (Packet.import_time_us >= cutoff_us),
|
||||
)
|
||||
.outerjoin(PacketSeen, PacketSeen.packet_id == Packet.id)
|
||||
.group_by(Node.node_id, Node.long_name, Node.short_name, Node.channel)
|
||||
.having(total_packets_sent > 0)
|
||||
.order_by(total_times_seen.desc())
|
||||
)
|
||||
|
||||
rows = result.fetchall()
|
||||
rows = (await session.execute(stmt)).all()
|
||||
|
||||
nodes = [
|
||||
{
|
||||
@@ -235,33 +244,30 @@ async def get_top_traffic_nodes():
|
||||
async def get_node_traffic(node_id: int):
|
||||
try:
|
||||
async with database.async_session() as session:
|
||||
result = await session.execute(
|
||||
text("""
|
||||
SELECT
|
||||
node.long_name, packet.portnum,
|
||||
COUNT(*) AS packet_count
|
||||
FROM packet
|
||||
JOIN node ON packet.from_node_id = node.node_id
|
||||
WHERE node.node_id = :node_id
|
||||
AND packet.import_time_us >= (CAST(strftime('%s','now') AS INTEGER) - 86400) * 1000000
|
||||
GROUP BY packet.portnum
|
||||
ORDER BY packet_count DESC;
|
||||
"""),
|
||||
{"node_id": node_id},
|
||||
now_us = int(datetime.now(timezone.utc).timestamp() * 1_000_000)
|
||||
cutoff_us = now_us - 86400 * 1_000_000
|
||||
packet_count = func.count().label("packet_count")
|
||||
|
||||
stmt = (
|
||||
select(Node.long_name, Packet.portnum, packet_count)
|
||||
.select_from(Packet)
|
||||
.join(Node, Packet.from_node_id == Node.node_id)
|
||||
.where(Node.node_id == node_id)
|
||||
.where(Packet.import_time_us >= cutoff_us)
|
||||
.group_by(Node.long_name, Packet.portnum)
|
||||
.order_by(packet_count.desc())
|
||||
)
|
||||
|
||||
# Map the result to include node.long_name and packet data
|
||||
traffic_data = [
|
||||
result = await session.execute(stmt)
|
||||
return [
|
||||
{
|
||||
"long_name": row[0], # node.long_name
|
||||
"portnum": row[1], # packet.portnum
|
||||
"packet_count": row[2], # COUNT(*) as packet_count
|
||||
"long_name": row.long_name,
|
||||
"portnum": row.portnum,
|
||||
"packet_count": row.packet_count,
|
||||
}
|
||||
for row in result.all()
|
||||
]
|
||||
|
||||
return traffic_data
|
||||
|
||||
except Exception as e:
|
||||
# Log the error or handle it as needed
|
||||
print(f"Error fetching node traffic: {str(e)}")
|
||||
@@ -290,7 +296,11 @@ async def get_nodes(node_id=None, role=None, channel=None, hw_model=None, days_a
|
||||
|
||||
# Apply filters based on provided parameters
|
||||
if node_id is not None:
|
||||
query = query.where(Node.node_id == node_id)
|
||||
try:
|
||||
node_id_int = int(node_id)
|
||||
except (TypeError, ValueError):
|
||||
node_id_int = node_id
|
||||
query = query.where(Node.node_id == node_id_int)
|
||||
if role is not None:
|
||||
query = query.where(Node.role == role.upper()) # Ensure role is uppercase
|
||||
if channel is not None:
|
||||
@@ -299,10 +309,12 @@ async def get_nodes(node_id=None, role=None, channel=None, hw_model=None, days_a
|
||||
query = query.where(Node.hw_model == hw_model)
|
||||
|
||||
if days_active is not None:
|
||||
query = query.where(Node.last_update > datetime.now() - timedelta(days_active))
|
||||
now_us = int(datetime.now(timezone.utc).timestamp() * 1_000_000)
|
||||
cutoff_us = now_us - int(timedelta(days_active).total_seconds() * 1_000_000)
|
||||
query = query.where(Node.last_seen_us > cutoff_us)
|
||||
|
||||
# Exclude nodes where last_update is an empty string
|
||||
query = query.where(Node.last_update != "")
|
||||
# Exclude nodes with missing last_seen_us
|
||||
query = query.where(Node.last_seen_us.is_not(None))
|
||||
|
||||
# Order results by long_name in ascending order
|
||||
query = query.order_by(Node.short_name.asc())
|
||||
@@ -313,7 +325,7 @@ async def get_nodes(node_id=None, role=None, channel=None, hw_model=None, days_a
|
||||
return nodes # Return the list of nodes
|
||||
|
||||
except Exception:
|
||||
print("error reading DB") # Consider using logging instead of print
|
||||
logger.exception("error reading DB")
|
||||
return [] # Return an empty list in case of failure
|
||||
|
||||
|
||||
@@ -325,24 +337,35 @@ async def get_packet_stats(
|
||||
to_node: int | None = None,
|
||||
from_node: int | None = None,
|
||||
):
|
||||
now = datetime.now()
|
||||
now = datetime.now(timezone.utc)
|
||||
|
||||
if period_type == "hour":
|
||||
start_time = now - timedelta(hours=length)
|
||||
time_format = '%Y-%m-%d %H:00'
|
||||
time_format_sqlite = "%Y-%m-%d %H:00"
|
||||
time_format_pg = "YYYY-MM-DD HH24:00"
|
||||
elif period_type == "day":
|
||||
start_time = now - timedelta(days=length)
|
||||
time_format = '%Y-%m-%d'
|
||||
time_format_sqlite = "%Y-%m-%d"
|
||||
time_format_pg = "YYYY-MM-DD"
|
||||
else:
|
||||
raise ValueError("period_type must be 'hour' or 'day'")
|
||||
|
||||
async with database.async_session() as session:
|
||||
q = select(
|
||||
func.strftime(
|
||||
time_format,
|
||||
dialect = session.get_bind().dialect.name
|
||||
if dialect == "postgresql":
|
||||
period_expr = func.to_char(
|
||||
func.to_timestamp(Packet.import_time_us / 1_000_000.0),
|
||||
time_format_pg,
|
||||
)
|
||||
else:
|
||||
period_expr = func.strftime(
|
||||
time_format_sqlite,
|
||||
func.datetime(Packet.import_time_us / 1_000_000, "unixepoch"),
|
||||
).label('period'),
|
||||
func.count().label('count'),
|
||||
)
|
||||
|
||||
q = select(
|
||||
period_expr.label("period"),
|
||||
func.count().label("count"),
|
||||
).where(Packet.import_time_us >= int(start_time.timestamp() * 1_000_000))
|
||||
|
||||
# Filters
|
||||
|
||||
@@ -201,13 +201,37 @@ function portLabel(portnum, payload, linksHtml) {
|
||||
/* ======================================================
|
||||
TIME FORMAT
|
||||
====================================================== */
|
||||
function formatLocalTime(importTimeUs) {
|
||||
const ms = importTimeUs / 1000;
|
||||
return new Date(ms).toLocaleTimeString([], {
|
||||
function formatTimes(importTimeUs) {
|
||||
const ms = Number(importTimeUs) / 1000;
|
||||
if (!Number.isFinite(ms)) {
|
||||
return { local: "—", utc: "—", epoch: "—" };
|
||||
}
|
||||
const date = new Date(ms);
|
||||
const local = date.toLocaleTimeString([], {
|
||||
hour: "2-digit",
|
||||
minute: "2-digit",
|
||||
second: "2-digit"
|
||||
second: "2-digit",
|
||||
timeZoneName: "short"
|
||||
});
|
||||
const utc = date.toLocaleTimeString([], {
|
||||
hour: "2-digit",
|
||||
minute: "2-digit",
|
||||
second: "2-digit",
|
||||
timeZone: "UTC",
|
||||
timeZoneName: "short"
|
||||
});
|
||||
return { local, utc, epoch: String(importTimeUs) };
|
||||
}
|
||||
|
||||
function logPacketTimes(packet) {
|
||||
const times = formatTimes(packet.import_time_us);
|
||||
console.log(
|
||||
"[firehose] packet time",
|
||||
"id=" + packet.id,
|
||||
"epoch_us=" + times.epoch,
|
||||
"local=" + times.local,
|
||||
"utc=" + times.utc
|
||||
);
|
||||
}
|
||||
|
||||
/* ======================================================
|
||||
@@ -245,6 +269,7 @@ async function fetchUpdates() {
|
||||
const list = document.getElementById("packet_list");
|
||||
|
||||
for (const pkt of packets.reverse()) {
|
||||
logPacketTimes(pkt);
|
||||
|
||||
/* FROM — includes translation */
|
||||
const from =
|
||||
@@ -304,7 +329,9 @@ async function fetchUpdates() {
|
||||
const html = `
|
||||
<tr class="packet-row">
|
||||
|
||||
<td>${formatLocalTime(pkt.import_time_us)}</td>
|
||||
<td>
|
||||
${formatTimes(pkt.import_time_us).local}<br>
|
||||
</td>
|
||||
|
||||
<td>
|
||||
<span class="toggle-btn">▶</span>
|
||||
|
||||
@@ -140,8 +140,8 @@ map.on("popupopen", function (e) {
|
||||
if (popupEl) applyTranslationsMap(popupEl);
|
||||
});
|
||||
|
||||
function timeAgo(date){
|
||||
const diff = Date.now() - new Date(date);
|
||||
function timeAgoFromUs(us){
|
||||
const diff = Date.now() - (us / 1000);
|
||||
const s = Math.floor(diff/1000), m = Math.floor(s/60),
|
||||
h = Math.floor(m/60), d = Math.floor(h/24);
|
||||
return d>0?d+"d":h>0?h+"h":m>0?m+"m":s+"s";
|
||||
@@ -289,7 +289,7 @@ fetch('/api/nodes?days_active=3')
|
||||
hw_model: n.hw_model || "",
|
||||
role: n.role || "",
|
||||
firmware: n.firmware || "",
|
||||
last_update: n.last_update || "",
|
||||
last_seen_us: n.last_seen_us || null,
|
||||
isRouter: (n.role||"").toLowerCase().includes("router")
|
||||
}));
|
||||
|
||||
@@ -333,8 +333,8 @@ function renderNodesOnMap(){
|
||||
<b data-translate-lang="role_label"></b> ${node.role}<br>
|
||||
|
||||
${
|
||||
node.last_update
|
||||
? `<b data-translate-lang="last_seen"></b> ${timeAgo(node.last_update)}<br>`
|
||||
node.last_seen_us
|
||||
? `<b data-translate-lang="last_seen"></b> ${timeAgoFromUs(node.last_seen_us)}<br>`
|
||||
: ""
|
||||
}
|
||||
|
||||
|
||||
@@ -6,12 +6,15 @@ import logging
|
||||
import os
|
||||
|
||||
from aiohttp import web
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy import func, select, text
|
||||
|
||||
from meshtastic.protobuf.portnums_pb2 import PortNum
|
||||
from meshview import database, decode_payload, store
|
||||
from meshview.__version__ import __version__, _git_revision_short, get_version_info
|
||||
from meshview.config import CONFIG
|
||||
from meshview.models import Node
|
||||
from meshview.models import Packet as PacketModel
|
||||
from meshview.models import PacketSeen as PacketSeenModel
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -805,73 +808,54 @@ async def api_stats_top(request):
|
||||
limit = min(int(request.query.get("limit", 20)), 100)
|
||||
offset = int(request.query.get("offset", 0))
|
||||
|
||||
params = {
|
||||
"period_type": period_type,
|
||||
"length": length,
|
||||
"limit": limit,
|
||||
"offset": offset,
|
||||
}
|
||||
multiplier = 3600 if period_type == "hour" else 86400
|
||||
window_us = length * multiplier * 1_000_000
|
||||
|
||||
channel_filter = ""
|
||||
if channel:
|
||||
channel_filter = "AND n.channel = :channel"
|
||||
params["channel"] = channel
|
||||
max_packet_import = select(func.max(PacketModel.import_time_us)).scalar_subquery()
|
||||
max_seen_import = select(func.max(PacketSeenModel.import_time_us)).scalar_subquery()
|
||||
|
||||
sql = f"""
|
||||
WITH sent AS (
|
||||
SELECT
|
||||
p.from_node_id AS node_id,
|
||||
COUNT(*) AS sent
|
||||
FROM packet p
|
||||
WHERE p.import_time_us >= (
|
||||
SELECT MAX(import_time_us) FROM packet
|
||||
) - (
|
||||
CASE
|
||||
WHEN :period_type = 'hour' THEN :length * 3600 * 1000000
|
||||
ELSE :length * 86400 * 1000000
|
||||
END
|
||||
)
|
||||
GROUP BY p.from_node_id
|
||||
),
|
||||
seen AS (
|
||||
SELECT
|
||||
p.from_node_id AS node_id,
|
||||
COUNT(*) AS seen
|
||||
FROM packet_seen ps
|
||||
JOIN packet p ON p.id = ps.packet_id
|
||||
WHERE ps.import_time_us >= (
|
||||
SELECT MAX(import_time_us) FROM packet_seen
|
||||
) - (
|
||||
CASE
|
||||
WHEN :period_type = 'hour' THEN :length * 3600 * 1000000
|
||||
ELSE :length * 86400 * 1000000
|
||||
END
|
||||
)
|
||||
GROUP BY p.from_node_id
|
||||
sent_cte = (
|
||||
select(PacketModel.from_node_id.label("node_id"), func.count().label("sent"))
|
||||
.where(PacketModel.import_time_us >= max_packet_import - window_us)
|
||||
.group_by(PacketModel.from_node_id)
|
||||
.cte("sent")
|
||||
)
|
||||
SELECT
|
||||
n.node_id,
|
||||
n.long_name,
|
||||
n.short_name,
|
||||
n.channel,
|
||||
COALESCE(s.sent, 0) AS sent,
|
||||
COALESCE(se.seen, 0) AS seen
|
||||
FROM node n
|
||||
LEFT JOIN sent s ON s.node_id = n.node_id
|
||||
LEFT JOIN seen se ON se.node_id = n.node_id
|
||||
WHERE 1=1
|
||||
{channel_filter}
|
||||
ORDER BY seen DESC
|
||||
LIMIT :limit OFFSET :offset
|
||||
"""
|
||||
|
||||
count_sql = f"""
|
||||
SELECT COUNT(*) FROM node n WHERE 1=1 {channel_filter}
|
||||
"""
|
||||
seen_cte = (
|
||||
select(PacketModel.from_node_id.label("node_id"), func.count().label("seen"))
|
||||
.select_from(PacketSeenModel)
|
||||
.join(PacketModel, PacketModel.id == PacketSeenModel.packet_id)
|
||||
.where(PacketSeenModel.import_time_us >= max_seen_import - window_us)
|
||||
.group_by(PacketModel.from_node_id)
|
||||
.cte("seen")
|
||||
)
|
||||
|
||||
query = (
|
||||
select(
|
||||
Node.node_id,
|
||||
Node.long_name,
|
||||
Node.short_name,
|
||||
Node.channel,
|
||||
func.coalesce(sent_cte.c.sent, 0).label("sent"),
|
||||
func.coalesce(seen_cte.c.seen, 0).label("seen"),
|
||||
)
|
||||
.select_from(Node)
|
||||
.outerjoin(sent_cte, sent_cte.c.node_id == Node.node_id)
|
||||
.outerjoin(seen_cte, seen_cte.c.node_id == Node.node_id)
|
||||
.order_by(func.coalesce(seen_cte.c.seen, 0).desc())
|
||||
.limit(limit)
|
||||
.offset(offset)
|
||||
)
|
||||
|
||||
count_query = select(func.count()).select_from(Node)
|
||||
|
||||
if channel:
|
||||
query = query.where(Node.channel == channel)
|
||||
count_query = count_query.where(Node.channel == channel)
|
||||
|
||||
async with database.async_session() as session:
|
||||
rows = (await session.execute(text(sql), params)).all()
|
||||
total = (await session.execute(text(count_sql), params)).scalar() or 0
|
||||
rows = (await session.execute(query)).all()
|
||||
total = (await session.execute(count_query)).scalar() or 0
|
||||
|
||||
nodes = []
|
||||
for r in rows:
|
||||
|
||||
@@ -81,7 +81,10 @@ password = large4cats
|
||||
# Database Configuration
|
||||
# -------------------------
|
||||
[database]
|
||||
# SQLAlchemy connection string. This one uses SQLite with asyncio support.
|
||||
# SQLAlchemy async connection string.
|
||||
# Examples:
|
||||
# sqlite+aiosqlite:///packets.db
|
||||
# postgresql+asyncpg://user:pass@host:5432/meshview
|
||||
connection_string = sqlite+aiosqlite:///packets.db
|
||||
|
||||
|
||||
|
||||
37
startdb.py
37
startdb.py
@@ -7,6 +7,7 @@ import shutil
|
||||
from pathlib import Path
|
||||
|
||||
from sqlalchemy import delete
|
||||
from sqlalchemy.engine.url import make_url
|
||||
|
||||
from meshview import migrations, models, mqtt_database, mqtt_reader, mqtt_store
|
||||
from meshview.config import CONFIG
|
||||
@@ -65,18 +66,16 @@ async def backup_database(database_url: str, backup_dir: str = ".") -> None:
|
||||
backup_dir: Directory to store backups (default: current directory)
|
||||
"""
|
||||
try:
|
||||
# Extract database file path from connection string
|
||||
# Format: sqlite+aiosqlite:///path/to/db.db
|
||||
if not database_url.startswith("sqlite"):
|
||||
url = make_url(database_url)
|
||||
if not url.drivername.startswith("sqlite"):
|
||||
cleanup_logger.warning("Backup only supported for SQLite databases")
|
||||
return
|
||||
|
||||
db_path = database_url.split("///", 1)[1] if "///" in database_url else None
|
||||
if not db_path:
|
||||
if not url.database or url.database == ":memory:":
|
||||
cleanup_logger.error("Could not extract database path from connection string")
|
||||
return
|
||||
|
||||
db_file = Path(db_path)
|
||||
db_file = Path(url.database)
|
||||
if not db_file.exists():
|
||||
cleanup_logger.error(f"Database file not found: {db_file}")
|
||||
return
|
||||
@@ -153,11 +152,11 @@ async def daily_cleanup_at(
|
||||
cleanup_logger.info("Waiting 60 seconds for backup to complete...")
|
||||
await asyncio.sleep(60)
|
||||
|
||||
# Local-time cutoff as string for SQLite DATETIME comparison
|
||||
cutoff = (datetime.datetime.now() - datetime.timedelta(days=days_to_keep)).strftime(
|
||||
"%Y-%m-%d %H:%M:%S"
|
||||
)
|
||||
cleanup_logger.info(f"Running cleanup for records older than {cutoff}...")
|
||||
cutoff_dt = (
|
||||
datetime.datetime.now(datetime.UTC) - datetime.timedelta(days=days_to_keep)
|
||||
).replace(tzinfo=None)
|
||||
cutoff_us = int(cutoff_dt.timestamp() * 1_000_000)
|
||||
cleanup_logger.info(f"Running cleanup for records older than {cutoff_dt.isoformat()}...")
|
||||
|
||||
try:
|
||||
async with db_lock: # Pause ingestion
|
||||
@@ -168,7 +167,7 @@ async def daily_cleanup_at(
|
||||
# Packet
|
||||
# -------------------------
|
||||
result = await session.execute(
|
||||
delete(models.Packet).where(models.Packet.import_time < cutoff)
|
||||
delete(models.Packet).where(models.Packet.import_time_us < cutoff_us)
|
||||
)
|
||||
cleanup_logger.info(f"Deleted {result.rowcount} rows from Packet")
|
||||
|
||||
@@ -176,7 +175,9 @@ async def daily_cleanup_at(
|
||||
# PacketSeen
|
||||
# -------------------------
|
||||
result = await session.execute(
|
||||
delete(models.PacketSeen).where(models.PacketSeen.import_time < cutoff)
|
||||
delete(models.PacketSeen).where(
|
||||
models.PacketSeen.import_time_us < cutoff_us
|
||||
)
|
||||
)
|
||||
cleanup_logger.info(f"Deleted {result.rowcount} rows from PacketSeen")
|
||||
|
||||
@@ -184,7 +185,9 @@ async def daily_cleanup_at(
|
||||
# Traceroute
|
||||
# -------------------------
|
||||
result = await session.execute(
|
||||
delete(models.Traceroute).where(models.Traceroute.import_time < cutoff)
|
||||
delete(models.Traceroute).where(
|
||||
models.Traceroute.import_time_us < cutoff_us
|
||||
)
|
||||
)
|
||||
cleanup_logger.info(f"Deleted {result.rowcount} rows from Traceroute")
|
||||
|
||||
@@ -192,17 +195,19 @@ async def daily_cleanup_at(
|
||||
# Node
|
||||
# -------------------------
|
||||
result = await session.execute(
|
||||
delete(models.Node).where(models.Node.last_update < cutoff)
|
||||
delete(models.Node).where(models.Node.last_seen_us < cutoff_us)
|
||||
)
|
||||
cleanup_logger.info(f"Deleted {result.rowcount} rows from Node")
|
||||
|
||||
await session.commit()
|
||||
|
||||
if vacuum_db:
|
||||
if vacuum_db and mqtt_database.engine.dialect.name == "sqlite":
|
||||
cleanup_logger.info("Running VACUUM...")
|
||||
async with mqtt_database.engine.begin() as conn:
|
||||
await conn.exec_driver_sql("VACUUM;")
|
||||
cleanup_logger.info("VACUUM completed.")
|
||||
elif vacuum_db:
|
||||
cleanup_logger.info("VACUUM skipped (not supported for this database).")
|
||||
|
||||
cleanup_logger.info("Cleanup completed successfully.")
|
||||
cleanup_logger.info("Ingestion resumed after cleanup.")
|
||||
|
||||
Reference in New Issue
Block a user