Added DB configuration and snapshot for basic detaisl

This commit is contained in:
pablorevilla-meshtastic
2026-03-05 09:57:34 -08:00
parent 432e9f2b67
commit e0023bc8eb
9 changed files with 264 additions and 7 deletions
@@ -0,0 +1,35 @@
"""Add daily_snapshot table
Revision ID: 4f1d2a9c8b71
Revises: 23dad03d2e42
Create Date: 2026-03-05 00:00:00.000000
"""
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "4f1d2a9c8b71"
down_revision: str | None = "23dad03d2e42"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
op.create_table(
"daily_snapshot",
sa.Column("snapshot_date", sa.Date(), nullable=False),
sa.Column("node_count", sa.BigInteger(), nullable=False),
sa.Column("packet_count", sa.BigInteger(), nullable=False),
sa.Column("gateway_count", sa.BigInteger(), nullable=False),
sa.Column("captured_at_us", sa.BigInteger(), nullable=False),
sa.PrimaryKeyConstraint("snapshot_date"),
)
def downgrade() -> None:
op.drop_table("daily_snapshot")
+1
View File
@@ -127,6 +127,7 @@
"total_gateways": "Total Gateways",
"total_packets": "Total Packets",
"total_packets_seen": "Total Packets Seen",
"daily_snapshot_histogram": "Daily Network Snapshot Histogram (Last 30 Days)",
"packets_per_day_all": "Packets per Day - All Ports (Last 14 Days)",
"packets_per_day_text": "Packets per Day - Text Messages (Port 1, Last 14 Days)",
"packets_per_hour_all": "Packets per Hour - All Ports",
+1
View File
@@ -123,6 +123,7 @@
"total_gateways": "Gateways Totales",
"total_packets": "Paquetes Totales",
"total_packets_seen": "Paquetes Totales Vistos",
"daily_snapshot_histogram": "Histograma Diario de Instantáneas de Red (Últimos 30 Días)",
"packets_per_day_all": "Paquetes por Día - Todos los Puertos (Últimos 14 Días)",
"packets_per_day_text": "Paquetes por Día - Mensajes de Texto (Puerto 1, Últimos 14 Días)",
"packets_per_hour_all": "Paquetes por Hora - Todos los Puertos",
+13 -1
View File
@@ -1,4 +1,6 @@
from sqlalchemy import BigInteger, ForeignKey, Index, desc
from datetime import date
from sqlalchemy import BigInteger, Date, ForeignKey, Index, desc
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
@@ -120,3 +122,13 @@ class NodePublicKey(Base):
Index("idx_node_public_key_node_id", "node_id"),
Index("idx_node_public_key_public_key", "public_key"),
)
class DailySnapshot(Base):
__tablename__ = "daily_snapshot"
snapshot_date: Mapped[date] = mapped_column(Date, primary_key=True)
node_count: Mapped[int] = mapped_column(BigInteger, nullable=False)
packet_count: Mapped[int] = mapped_column(BigInteger, nullable=False)
gateway_count: Mapped[int] = mapped_column(BigInteger, nullable=False)
captured_at_us: Mapped[int] = mapped_column(BigInteger, nullable=False)
+59 -2
View File
@@ -1,8 +1,9 @@
import logging
import re
import time
from datetime import UTC, datetime
from sqlalchemy import select, update
from sqlalchemy import func, select, update
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
from sqlalchemy.exc import IntegrityError
@@ -11,13 +12,69 @@ from meshtastic.protobuf.config_pb2 import Config
from meshtastic.protobuf.mesh_pb2 import HardwareModel
from meshtastic.protobuf.portnums_pb2 import PortNum
from meshview import decode_payload, mqtt_database
from meshview.models import Node, NodePublicKey, Packet, PacketSeen, Traceroute
from meshview.models import DailySnapshot, Node, NodePublicKey, Packet, PacketSeen, Traceroute
logger = logging.getLogger(__name__)
MQTT_GATEWAY_CACHE: set[int] = set()
async def capture_daily_snapshot() -> None:
today = datetime.now(UTC).date()
async with mqtt_database.async_session() as session:
node_count = (await session.execute(select(func.count()).select_from(Node))).scalar_one()
packet_count = (
await session.execute(select(func.count()).select_from(Packet))
).scalar_one()
gateway_count = (
await session.execute(
select(func.count()).select_from(Node).where(Node.is_mqtt_gateway.is_(True))
)
).scalar_one()
captured_at_us = int(time.time() * 1_000_000)
values = {
"snapshot_date": today,
"node_count": node_count,
"packet_count": packet_count,
"gateway_count": gateway_count,
"captured_at_us": captured_at_us,
}
dialect = session.get_bind().dialect.name
stmt = None
if dialect == "sqlite":
stmt = (
sqlite_insert(DailySnapshot)
.values(**values)
.on_conflict_do_update(index_elements=["snapshot_date"], set_=values)
)
elif dialect == "postgresql":
stmt = (
pg_insert(DailySnapshot)
.values(**values)
.on_conflict_do_update(index_elements=["snapshot_date"], set_=values)
)
if stmt is not None:
await session.execute(stmt)
else:
snapshot = (
await session.execute(
select(DailySnapshot).where(DailySnapshot.snapshot_date == today)
)
).scalar_one_or_none()
if snapshot is None:
session.add(DailySnapshot(**values))
else:
snapshot.node_count = node_count
snapshot.packet_count = packet_count
snapshot.gateway_count = gateway_count
snapshot.captured_at_us = captured_at_us
await session.commit()
async def process_envelope(topic, env):
# MAP_REPORT_APP
if env.packet.decoded.portnum == PortNum.MAP_REPORT_APP:
+76 -3
View File
@@ -118,6 +118,15 @@
</div>
</div>
<div class="card-section">
<p class="section-header" data-translate-lang="daily_snapshot_histogram">
Daily Network Snapshot Histogram (Last 30 Days)
</p>
<button class="expand-btn" data-chart="chart_daily_snapshot" data-translate-lang="expand_chart">Expand Chart</button>
<button class="export-btn" data-chart="chart_daily_snapshot" data-translate-lang="export_csv">Export CSV</button>
<div id="chart_daily_snapshot" class="chart"></div>
</div>
<!-- Daily Charts -->
<div class="card-section">
<p class="section-header" data-translate-lang="packets_per_day_all">
@@ -247,6 +256,15 @@ async function fetchStats(period_type,length,portnum=null,channel=null){
}catch{return [];}
}
async function fetchDailySnapshots(length=30){
try{
const res=await fetch(`/api/snapshots/daily?length=${length}`);
if(!res.ok) return [];
const json=await res.json();
return json.data||[];
}catch{return [];}
}
async function fetchNodes(){
try{
const res=await fetch("/api/nodes");
@@ -365,6 +383,42 @@ function renderPieChart(elId,data,name){
return chart;
}
function renderDailySnapshotChart(domId,data){
const el=document.getElementById(domId);
if(!el) return;
const chart=echarts.init(el);
const labels=data.map(d=>d.date||"");
const nodeCounts=data.map(d=>d.node_count??0);
const packetCounts=data.map(d=>d.packet_count??0);
const gatewayCounts=data.map(d=>d.gateway_count??0);
chart.setOption({
backgroundColor:'#272b2f',
tooltip:{trigger:'axis'},
legend:{
data:['Nodes','Packets','Gateways'],
textStyle:{color:'#ccc'}
},
grid:{left:'6%', right:'6%', bottom:'18%'},
xAxis:{
type:'category',
data:labels,
axisLine:{lineStyle:{color:'#aaa'}},
axisLabel:{rotate:45,color:'#ccc'}
},
yAxis:{
type:'value',
axisLine:{lineStyle:{color:'#aaa'}},
axisLabel:{color:'#ccc'}
},
series:[
{name:'Nodes', type:'bar', data:nodeCounts, itemStyle:{color:'#5e81ac'}},
{name:'Packets', type:'bar', data:packetCounts, itemStyle:{color:'#88c0d0'}},
{name:'Gateways', type:'bar', data:gatewayCounts, itemStyle:{color:'#a3be8c'}}
]
});
return chart;
}
// --- Packet Type Pie Chart ---
async function fetchPacketTypeBreakdown(channel=null) {
@@ -388,6 +442,7 @@ async function fetchPacketTypeBreakdown(channel=null) {
// --- Init ---
let chartHourlyAll, chartPortnum1, chartPortnum3, chartPortnum4, chartPortnum67, chartPortnum70, chartPortnum71;
let chartDailyAll, chartDailyPortnum1;
let chartDailySnapshot;
let chartHwModel, chartRole, chartChannel;
let chartGatewayChannel, chartGatewayRole, chartGatewayFirmware;
let chartPacketTypes;
@@ -403,6 +458,10 @@ async function init(){
select.appendChild(opt);
});
// Daily snapshot histogram
const snapshots = await fetchDailySnapshots(30);
chartDailySnapshot = renderDailySnapshotChart("chart_daily_snapshot", snapshots);
// Daily all ports
const dailyAllData=await fetchStats('day',14);
updateTotalCount('total_daily_all',dailyAllData);
@@ -502,6 +561,7 @@ window.addEventListener('resize',()=>{
chartPortnum67,
chartPortnum70,
chartPortnum71,
chartDailySnapshot,
chartDailyAll,
chartDailyPortnum1,
chartHwModel,
@@ -554,10 +614,23 @@ document.querySelectorAll(".export-btn").forEach(btn=>{
const option=chart.getOption();
let rows=[];
if(option.series[0].type==="bar"||option.series[0].type==="line"){
rows.push(["Period","Count"]);
const xData=option.xAxis[0].data;
const yData=option.series[0].data;
for(let i=0;i<xData.length;i++) rows.push([xData[i],yData[i]]);
const series = option.series || [];
if(series.length > 1){
const headers = ["Period", ...series.map(s => s.name || "Series")];
rows.push(headers);
for(let i=0;i<xData.length;i++){
const row=[xData[i]];
series.forEach(s=>{
row.push((s.data && s.data[i]!==undefined) ? s.data[i] : "");
});
rows.push(row);
}
}else{
rows.push(["Period","Count"]);
const yData=series[0].data;
for(let i=0;i<xData.length;i++) rows.push([xData[i],yData[i]]);
}
}
if(option.series[0].type==="pie"){
rows.push(["Name","Value","Percentage"]);
+43 -1
View File
@@ -13,7 +13,7 @@ from meshtastic.protobuf.portnums_pb2 import PortNum
from meshview import database, decode_payload, store
from meshview.__version__ import __version__, _git_revision_short, get_version_info
from meshview.config import CONFIG
from meshview.models import Node, NodePublicKey
from meshview.models import DailySnapshot, Node, NodePublicKey
from meshview.models import Packet as PacketModel
from meshview.models import PacketSeen as PacketSeenModel
from meshview.radio.coverage import (
@@ -452,6 +452,48 @@ async def api_stats_count(request):
return web.json_response({"total_packets": total_packets, "total_seen": total_seen})
@routes.get("/api/snapshots/daily")
async def api_daily_snapshots(request):
length_raw = request.query.get("length", "30")
try:
length = int(length_raw)
except ValueError:
return web.json_response({"error": "length must be an integer"}, status=400)
# Keep query bounded.
length = max(1, min(length, 3650))
try:
async with database.async_session() as session:
rows = (
(
await session.execute(
select(DailySnapshot)
.order_by(DailySnapshot.snapshot_date.desc())
.limit(length)
)
)
.scalars()
.all()
)
rows = list(reversed(rows))
data = [
{
"date": row.snapshot_date.isoformat(),
"node_count": row.node_count,
"packet_count": row.packet_count,
"gateway_count": row.gateway_count,
"captured_at_us": row.captured_at_us,
}
for row in rows
]
return web.json_response({"data": data})
except Exception as e:
logger.error(f"Error in /api/snapshots/daily: {e}")
return web.json_response({"error": "Failed to fetch daily snapshots"}, status=500)
@routes.get("/api/edges")
async def api_edges(request):
since = datetime.datetime.now() - datetime.timedelta(hours=12)
+9
View File
@@ -118,6 +118,15 @@ backup_dir = ./backups
backup_hour = 2
backup_minute = 00
# -------------------------
# Daily Snapshot Configuration
# -------------------------
[snapshot]
# Time to run daily network snapshot capture (24-hour format)
# Defaults are used if this section is omitted.
hour = 1
minute = 00
# -------------------------
# Logging Configuration
+27
View File
@@ -129,6 +129,27 @@ async def daily_backup_at(hour: int = 2, minute: int = 0, backup_dir: str = ".")
await backup_database(database_url, backup_dir)
# -------------------------
# Daily snapshot scheduler
# -------------------------
async def daily_snapshot_at(hour: int = 1, minute: int = 0):
while True:
now = datetime.datetime.now()
next_run = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
if next_run <= now:
next_run += datetime.timedelta(days=1)
delay = (next_run - now).total_seconds()
cleanup_logger.info(f"Next daily snapshot scheduled at {next_run}")
await asyncio.sleep(delay)
try:
async with db_lock:
await mqtt_store.capture_daily_snapshot()
cleanup_logger.info("Daily snapshot captured successfully.")
except Exception as e:
cleanup_logger.error(f"Error capturing daily snapshot: {e}")
# -------------------------
# Database cleanup using ORM
# -------------------------
@@ -290,6 +311,8 @@ async def main():
backup_dir = CONFIG.get("cleanup", {}).get("backup_dir", "./backups")
backup_hour = get_int(CONFIG, "cleanup", "backup_hour", cleanup_hour)
backup_minute = get_int(CONFIG, "cleanup", "backup_minute", cleanup_minute)
snapshot_hour = get_int(CONFIG, "snapshot", "hour", 1)
snapshot_minute = get_int(CONFIG, "snapshot", "minute", 0)
logger.info(f"Starting MQTT ingestion from {CONFIG['mqtt']['server']}:{CONFIG['mqtt']['port']}")
if cleanup_enabled:
@@ -300,6 +323,7 @@ async def main():
logger.info(
f"Daily backups enabled: storing in {backup_dir} at {backup_hour:02d}:{backup_minute:02d}"
)
logger.info(f"Daily snapshots enabled: capturing at {snapshot_hour:02d}:{snapshot_minute:02d}")
async with asyncio.TaskGroup() as tg:
tg.create_task(
@@ -329,8 +353,11 @@ async def main():
)
)
tg.create_task(daily_snapshot_at(snapshot_hour, snapshot_minute))
if not cleanup_enabled and not backup_enabled:
cleanup_logger.info("Daily cleanup and backups are both disabled by configuration.")
cleanup_logger.info("Daily snapshots remain enabled by schedule configuration.")
# -------------------------