5 Commits

Author SHA1 Message Date
pablorevilla-meshtastic
c9639d851b Fix time function in store.py 2026-01-15 08:48:22 -08:00
pablorevilla-meshtastic
fa98f56318 Made a couple of changes to the time handling and database config. 2026-01-12 20:10:19 -08:00
pablorevilla-meshtastic
f85e783e8c Adding code to work with multiple database types. 2026-01-12 14:18:51 -08:00
pablorevilla-meshtastic
e12e3a2a41 Database change to remove import time columns 2026-01-09 13:30:14 -08:00
pablorevilla-meshtastic
da31794d8d Bump version to 3.0.2 and update release date to 2026-1-9 2026-01-09 11:49:58 -08:00
25 changed files with 859 additions and 549 deletions

.gitignore vendored
View File

@@ -45,3 +45,4 @@ __pycache__/
# OS
.DS_Store
Thumbs.db
packets.db-journal

View File

@@ -128,6 +128,10 @@ username =
password =
[database]
# SQLAlchemy async connection string.
# Examples:
# sqlite+aiosqlite:///var/lib/meshview/packets.db
# postgresql+asyncpg://user:pass@host:5432/meshview
connection_string = sqlite+aiosqlite:///var/lib/meshview/packets.db
```

View File

@@ -272,9 +272,24 @@ password = large4cats
# Database Configuration
# -------------------------
[database]
# SQLAlchemy connection string. This one uses SQLite with asyncio support.
# SQLAlchemy async connection string.
# Examples:
# sqlite+aiosqlite:///packets.db
# postgresql+asyncpg://user:pass@host:5432/meshview
connection_string = sqlite+aiosqlite:///packets.db
> **NOTE (PostgreSQL setup)**
> If you want to use PostgreSQL instead of SQLite:
>
> 1) Install PostgreSQL for your OS.
> 2) Create a user and database:
> - `CREATE USER meshview WITH PASSWORD 'change_me';`
> - `CREATE DATABASE meshview OWNER meshview;`
> 3) Update `config.ini`:
> - `connection_string = postgresql+asyncpg://meshview:change_me@localhost:5432/meshview`
> 4) Initialize the schema:
> - `./env/bin/python startdb.py`
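
A quick way to verify the new connection string before starting Meshview; a minimal sketch, assuming `asyncpg` is installed and the example credentials above:

```python
import asyncio

from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

async def check_connection():
    # Hypothetical credentials; match whatever is in config.ini.
    engine = create_async_engine(
        "postgresql+asyncpg://meshview:change_me@localhost:5432/meshview"
    )
    async with engine.connect() as conn:
        await conn.execute(text("SELECT 1"))
    await engine.dispose()
    print("connection OK")

asyncio.run(check_connection())
```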
# -------------------------
# Database Cleanup Configuration
@@ -493,10 +508,22 @@ sleep 5
echo "Run cleanup..."
# Run cleanup queries
sqlite3 "$DB_FILE" <<EOF
DELETE FROM packet WHERE import_time < datetime('now', '-14 day');
DELETE FROM packet_seen WHERE import_time < datetime('now', '-14 day');
DELETE FROM traceroute WHERE import_time < datetime('now', '-14 day');
DELETE FROM node WHERE last_update < datetime('now', '-14 day') OR last_update IS NULL OR last_update = '';
DELETE FROM packet
WHERE import_time_us IS NOT NULL
AND import_time_us < (strftime('%s','now','-14 days') * 1000000);
SELECT 'packet deleted: ' || changes();
DELETE FROM packet_seen
WHERE import_time_us IS NOT NULL
AND import_time_us < (strftime('%s','now','-14 days') * 1000000);
SELECT 'packet_seen deleted: ' || changes();
DELETE FROM traceroute
WHERE import_time_us IS NOT NULL
AND import_time_us < (strftime('%s','now','-14 days') * 1000000);
SELECT 'traceroute deleted: ' || changes();
DELETE FROM node
WHERE last_seen_us IS NULL
OR last_seen_us < (strftime('%s','now','-14 days') * 1000000);
SELECT 'node deleted: ' || changes();
VACUUM;
EOF
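
The SQL above derives each cutoff from `strftime('%s','now','-14 days') * 1000000`; for cross-checking, the equivalent microsecond cutoff in Python is:

```python
import time

# Same value the SQL computes: now minus 14 days, in microseconds since epoch.
RETENTION_DAYS = 14
cutoff_us = int((time.time() - RETENTION_DAYS * 86400) * 1_000_000)
print(cutoff_us)
```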

View File

@@ -0,0 +1,65 @@
"""Drop import_time columns.
Revision ID: 9f3b1a8d2c4f
Revises: 2b5a61bb2b75
Create Date: 2026-01-09 09:55:00.000000
"""
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "9f3b1a8d2c4f"
down_revision: str | None = "2b5a61bb2b75"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
conn = op.get_bind()
inspector = sa.inspect(conn)
packet_indexes = {idx["name"] for idx in inspector.get_indexes("packet")}
packet_columns = {col["name"] for col in inspector.get_columns("packet")}
with op.batch_alter_table("packet", schema=None) as batch_op:
if "idx_packet_import_time" in packet_indexes:
batch_op.drop_index("idx_packet_import_time")
if "idx_packet_from_node_time" in packet_indexes:
batch_op.drop_index("idx_packet_from_node_time")
if "import_time" in packet_columns:
batch_op.drop_column("import_time")
packet_seen_columns = {col["name"] for col in inspector.get_columns("packet_seen")}
with op.batch_alter_table("packet_seen", schema=None) as batch_op:
if "import_time" in packet_seen_columns:
batch_op.drop_column("import_time")
traceroute_indexes = {idx["name"] for idx in inspector.get_indexes("traceroute")}
traceroute_columns = {col["name"] for col in inspector.get_columns("traceroute")}
with op.batch_alter_table("traceroute", schema=None) as batch_op:
if "idx_traceroute_import_time" in traceroute_indexes:
batch_op.drop_index("idx_traceroute_import_time")
if "import_time" in traceroute_columns:
batch_op.drop_column("import_time")
def downgrade() -> None:
with op.batch_alter_table("traceroute", schema=None) as batch_op:
batch_op.add_column(sa.Column("import_time", sa.DateTime(), nullable=True))
batch_op.create_index("idx_traceroute_import_time", ["import_time"], unique=False)
with op.batch_alter_table("packet_seen", schema=None) as batch_op:
batch_op.add_column(sa.Column("import_time", sa.DateTime(), nullable=True))
with op.batch_alter_table("packet", schema=None) as batch_op:
batch_op.add_column(sa.Column("import_time", sa.DateTime(), nullable=True))
batch_op.create_index("idx_packet_import_time", [sa.text("import_time DESC")], unique=False)
batch_op.create_index(
"idx_packet_from_node_time",
["from_node_id", sa.text("import_time DESC")],
unique=False,
)
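# A sketch of applying this revision programmatically (equivalent to running
# `alembic upgrade head`), assuming the standard Alembic layout with an
# alembic.ini at the project root:
from alembic import command
from alembic.config import Config

cfg = Config("alembic.ini")
command.current(cfg)          # print the revision the database is at
command.upgrade(cfg, "head")  # apply 9f3b1a8d2c4f and anything after it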

View File

@@ -0,0 +1,94 @@
"""Add last_update_us to node and migrate data.
Revision ID: b7c3c2e3a1f0
Revises: 9f3b1a8d2c4f
Create Date: 2026-01-12 10:12:00.000000
"""
from collections.abc import Sequence
from datetime import UTC, datetime
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "b7c3c2e3a1f0"
down_revision: str | None = "9f3b1a8d2c4f"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def _parse_datetime(value):
if value is None:
return None
if isinstance(value, datetime):
dt = value
elif isinstance(value, str):
text = value.replace("Z", "+00:00")
try:
dt = datetime.fromisoformat(text)
except ValueError:
return None
else:
return None
if dt.tzinfo is None:
return dt.replace(tzinfo=UTC)
return dt.astimezone(UTC)
def upgrade() -> None:
conn = op.get_bind()
op.add_column("node", sa.Column("last_update_us", sa.BigInteger(), nullable=True))
op.create_index("idx_node_last_update_us", "node", ["last_update_us"], unique=False)
node = sa.table(
"node",
sa.column("id", sa.String()),
sa.column("last_update", sa.DateTime()),
sa.column("last_update_us", sa.BigInteger()),
)
rows = conn.execute(sa.select(node.c.id, node.c.last_update)).all()
for node_id, last_update in rows:
dt = _parse_datetime(last_update)
if dt is None:
continue
last_update_us = int(dt.timestamp() * 1_000_000)
conn.execute(
sa.update(node).where(node.c.id == node_id).values(last_update_us=last_update_us)
)
if conn.dialect.name == "sqlite":
with op.batch_alter_table("node", schema=None) as batch_op:
batch_op.drop_column("last_update")
else:
op.drop_column("node", "last_update")
def downgrade() -> None:
conn = op.get_bind()
op.add_column("node", sa.Column("last_update", sa.DateTime(), nullable=True))
node = sa.table(
"node",
sa.column("id", sa.String()),
sa.column("last_update", sa.DateTime()),
sa.column("last_update_us", sa.BigInteger()),
)
rows = conn.execute(sa.select(node.c.id, node.c.last_update_us)).all()
for node_id, last_update_us in rows:
if last_update_us is None:
continue
dt = datetime.fromtimestamp(last_update_us / 1_000_000, tz=UTC).replace(tzinfo=None)
conn.execute(sa.update(node).where(node.c.id == node_id).values(last_update=dt))
if conn.dialect.name == "sqlite":
with op.batch_alter_table("node", schema=None) as batch_op:
batch_op.drop_index("idx_node_last_update_us")
batch_op.drop_column("last_update_us")
else:
op.drop_index("idx_node_last_update_us", table_name="node")
op.drop_column("node", "last_update_us")

View File

@@ -0,0 +1,34 @@
"""Drop last_update_us from node.
Revision ID: d4d7b0c2e1a4
Revises: b7c3c2e3a1f0
Create Date: 2026-01-12 10:20:00.000000
"""
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "d4d7b0c2e1a4"
down_revision: str | None = "b7c3c2e3a1f0"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
conn = op.get_bind()
if conn.dialect.name == "sqlite":
with op.batch_alter_table("node", schema=None) as batch_op:
batch_op.drop_index("idx_node_last_update_us")
batch_op.drop_column("last_update_us")
else:
op.drop_index("idx_node_last_update_us", table_name="node")
op.drop_column("node", "last_update_us")
def downgrade() -> None:
op.add_column("node", sa.Column("last_update_us", sa.BigInteger(), nullable=True))
op.create_index("idx_node_last_update_us", "node", ["last_update_us"], unique=False)

View File

@@ -1,82 +1,38 @@
# API Documentation
## 1. Chat API
Base URL: `http(s)://<host>`
### GET `/api/chat`
Returns the most recent chat messages.
All endpoints return JSON. Timestamps are either ISO 8601 strings or `*_us` values in
microseconds since epoch.
**Query Parameters**
- `limit` (optional, int): Maximum number of messages to return. Default: `100`.
**Response Example**
```json
{
"packets": [
{
"id": 123,
"import_time": "2025-07-22T12:45:00",
"from_node_id": 987654,
"from_node": "Alice",
"channel": "main",
"payload": "Hello, world!"
}
]
}
```
---
### GET `/api/chat/updates`
Returns chat messages imported after a given timestamp.
**Query Parameters**
- `last_time` (optional, ISO timestamp): Only messages imported after this time are returned.
**Response Example**
```json
{
"packets": [
{
"id": 124,
"import_time": "2025-07-22T12:50:00",
"from_node_id": 987654,
"from_node": "Alice",
"channel": "main",
"payload": "New message!"
}
],
"latest_import_time": "2025-07-22T12:50:00"
}
```
---
## 2. Nodes API
## 1. Nodes API
### GET `/api/nodes`
Returns a list of all nodes, with optional filtering by last seen.
Returns a list of nodes, with optional filtering.
**Query Parameters**
- `hours` (optional, int): Return nodes seen in the last N hours.
- `days` (optional, int): Return nodes seen in the last N days.
- `last_seen_after` (optional, ISO timestamp): Return nodes seen after this time.
Query Parameters
- `node_id` (optional, int): Exact node ID.
- `role` (optional, string): Node role.
- `channel` (optional, string): Channel name.
- `hw_model` (optional, string): Hardware model.
- `days_active` (optional, int): Nodes seen within the last N days.
**Response Example**
Response Example
```json
{
"nodes": [
{
"id": 42,
"node_id": 1234,
"long_name": "Alice",
"short_name": "A",
"channel": "main",
"last_seen": "2025-07-22T12:40:00",
"hardware": "T-Beam",
"hw_model": "T-Beam",
"firmware": "1.2.3",
"role": "client",
"last_lat": 37.7749,
"last_long": -122.4194
"last_lat": 377749000,
"last_long": -1224194000,
"channel": "main",
"last_seen_us": 1736370123456789
}
]
}
@@ -84,45 +40,58 @@ Returns a list of all nodes, with optional filtering by last seen.
---
## 3. Packets API
## 2. Packets API
### GET `/api/packets`
Returns a list of packets with optional filters.
Returns packets with optional filters.
**Query Parameters**
- `limit` (optional, int): Maximum number of packets to return. Default: `200`.
- `since` (optional, ISO timestamp): Only packets imported after this timestamp are returned.
Query Parameters
- `packet_id` (optional, int): Return exactly one packet (overrides other filters).
- `limit` (optional, int): Max packets to return, clamped 1-1000. Default: `50`.
- `since` (optional, int): Only packets imported after this microsecond timestamp.
- `portnum` (optional, int): Filter by port number.
- `contains` (optional, string): Payload substring filter.
- `from_node_id` (optional, int): Filter by sender node ID.
- `to_node_id` (optional, int): Filter by recipient node ID.
- `node_id` (optional, int): Legacy filter matching either from or to node ID.
**Response Example**
Response Example
```json
{
"packets": [
{
"id": 123,
"import_time_us": 1736370123456789,
"channel": "main",
"from_node_id": 5678,
"to_node_id": 91011,
"portnum": 1,
"import_time": "2025-07-22T12:45:00",
"payload": "Hello, Bob!"
"long_name": "Alice",
"payload": "Hello, Bob!",
"to_long_name": "Bob",
"reply_id": 122
}
]
],
"latest_import_time": 1736370123456789
}
```
---
Notes
- For `portnum=1` (text messages), packets are filtered to remove sequence-only payloads.
- `latest_import_time` is returned when available for incremental polling (microseconds).
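A minimal polling loop built on these semantics; the host and port are placeholders, and the server's `latest_import_time` becomes the next request's `since` cursor:
```python
import json
import time
import urllib.parse
import urllib.request

BASE = "http://localhost:8080/api/packets"  # hypothetical local instance
since_us = None

while True:
    params = {"limit": 50}
    if since_us is not None:
        params["since"] = since_us
    url = BASE + "?" + urllib.parse.urlencode(params)
    with urllib.request.urlopen(url) as resp:
        data = json.load(resp)
    for pkt in data.get("packets", []):
        print(pkt["id"], pkt.get("payload", ""))
    # Advance the cursor only when the server reports one.
    since_us = data.get("latest_import_time", since_us)
    time.sleep(3)
```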
---
## 4. Channels API
## 3. Channels API
### GET `/api/channels`
Returns a list of channels seen in a given time period.
Returns channels seen in a time period.
**Query Parameters**
- `period_type` (optional, string): Time granularity (`hour` or `day`). Default: `hour`.
Query Parameters
- `period_type` (optional, string): `hour` or `day`. Default: `hour`.
- `length` (optional, int): Number of periods to look back. Default: `24`.
**Response Example**
Response Example
```json
{
"channels": ["LongFast", "MediumFast", "ShortFast"]
@@ -131,29 +100,21 @@ Returns a list of channels seen in a given time period.
---
## 5. Statistics API
## 4. Stats API
### GET `/api/stats`
Returns packet statistics aggregated by time periods, with optional filtering.
Retrieve packet statistics aggregated by time periods, with optional filtering.
---
## Query Parameters
| Parameter | Type | Required | Default | Description |
|--------------|---------|----------|----------|-------------------------------------------------------------------------------------------------|
| `period_type` | string | No | `hour` | Time granularity of the stats. Allowed values: `hour`, `day`. |
| `length` | integer | No | 24 | Number of periods to include (hours or days). |
| `channel` | string | No | — | Filter results by channel name (case-insensitive). |
| `portnum` | integer | No | — | Filter results by port number. |
| `to_node` | integer | No | — | Filter results to packets sent **to** this node ID. |
| `from_node` | integer | No | — | Filter results to packets sent **from** this node ID. |
---
## Response
Query Parameters
- `period_type` (optional, string): `hour` or `day`. Default: `hour`.
- `length` (optional, int): Number of periods to include. Default: `24`.
- `channel` (optional, string): Filter by channel (case-insensitive).
- `portnum` (optional, int): Filter by port number.
- `to_node` (optional, int): Filter by destination node ID.
- `from_node` (optional, int): Filter by source node ID.
- `node` (optional, int): If provided, return combined `sent` and `seen` totals for that node.
Response Example (series)
```json
{
"period_type": "hour",
@@ -163,65 +124,117 @@ Retrieve packet statistics aggregated by time periods, with optional filtering.
"to_node": 12345678,
"from_node": 87654321,
"data": [
{ "period": "2025-08-08 14:00", "count": 10 },
{ "period": "2025-08-08 15:00", "count": 7 }
]
}
```
Response Example (`node` totals)
```json
{
"node_id": 12345678,
"period_type": "hour",
"length": 24,
"sent": 42,
"seen": 58
}
```
---
### GET `/api/stats/count`
Returns total packet counts, optionally filtered.
Query Parameters
- `packet_id` (optional, int): Filter packet_seen by packet ID.
- `period_type` (optional, string): `hour` or `day`.
- `length` (optional, int): Number of periods to include.
- `channel` (optional, string): Filter by channel.
- `from_node` (optional, int): Filter by source node ID.
- `to_node` (optional, int): Filter by destination node ID.
Response Example
```json
{
"total_packets": 12345,
"total_seen": 67890
}
```
---
### GET `/api/stats/top`
Returns nodes sorted by packets seen, with pagination.
Query Parameters
- `period_type` (optional, string): `hour` or `day`. Default: `day`.
- `length` (optional, int): Number of periods to include. Default: `1`.
- `channel` (optional, string): Filter by channel.
- `limit` (optional, int): Max nodes to return. Default: `20`, max `100`.
- `offset` (optional, int): Pagination offset. Default: `0`.
Response Example
```json
{
"total": 250,
"limit": 20,
"offset": 0,
"nodes": [
{
"period": "2025-08-08 14:00",
"count": 10
},
{
"period": "2025-08-08 15:00",
"count": 7
"node_id": 1234,
"long_name": "Alice",
"short_name": "A",
"channel": "main",
"sent": 100,
"seen": 240,
"avg": 2.4
}
// more entries...
]
}
```
---
## 6. Edges API
## 5. Edges API
### GET `/api/edges`
Returns network edges (connections between nodes) based on traceroutes and neighbor info.
Traceroute edges are collected over the last 48 hours. Neighbor edges are based on
port 71 packets.
**Query Parameters**
- `type` (optional, string): Filter by edge type (`traceroute` or `neighbor`). If omitted, returns both types.
Query Parameters
- `type` (optional, string): `traceroute` or `neighbor`. If omitted, returns both.
- `node_id` (optional, int): Filter edges to only those touching a node.
**Response Example**
Response Example
```json
{
"edges": [
{
"from": 12345678,
"to": 87654321,
"type": "traceroute"
},
{
"from": 11111111,
"to": 22222222,
"type": "neighbor"
}
{ "from": 12345678, "to": 87654321, "type": "traceroute" },
{ "from": 11111111, "to": 22222222, "type": "neighbor" }
]
}
```
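For example, a small consumer that counts how many edges touch each node (hypothetical local instance):
```python
import json
import urllib.request
from collections import Counter

# Hypothetical local instance; see GET /api/edges above.
url = "http://localhost:8080/api/edges?type=traceroute"
with urllib.request.urlopen(url) as resp:
    edges = json.load(resp)["edges"]

degree = Counter()
for edge in edges:
    degree[edge["from"]] += 1
    degree[edge["to"]] += 1

print(degree.most_common(5))  # busiest nodes by edge count
```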
---
## 7. Configuration API
## 6. Config API
### GET `/api/config`
Returns the current site configuration (safe subset exposed to clients).
Returns a safe subset of server configuration.
**Response Example**
Response Example
```json
{
"site": {
"domain": "meshview.example.com",
"domain": "example.com",
"language": "en",
"title": "Bay Area Mesh",
"message": "Real time data from around the bay area",
"title": "Meshview",
"message": "",
"starting": "/chat",
"nodes": "true",
"conversations": "true",
"chat": "true",
"everything": "true",
"graphs": "true",
"stats": "true",
@@ -236,11 +249,11 @@ Returns the current site configuration (safe subset exposed to clients).
"firehose_interval": 3,
"weekly_net_message": "Weekly Mesh check-in message.",
"net_tag": "#BayMeshNet",
"version": "2.0.8 ~ 10-22-25"
"version": "3.0.0"
},
"mqtt": {
"server": "mqtt.bayme.sh",
"topics": ["msh/US/bayarea/#"]
"server": "mqtt.example.com",
"topics": ["msh/region/#"]
},
"cleanup": {
"enabled": "false",
@@ -254,91 +267,125 @@ Returns the current site configuration (safe subset exposed to clients).
---
## 8. Language/Translations API
## 7. Language API
### GET `/api/lang`
Returns translation strings for the UI.
Returns translation strings.
**Query Parameters**
- `lang` (optional, string): Language code (e.g., `en`, `es`). Defaults to site language setting.
- `section` (optional, string): Specific section to retrieve translations for.
Query Parameters
- `lang` (optional, string): Language code (e.g., `en`, `es`). Default from config or `en`.
- `section` (optional, string): Return only one section (e.g., `nodelist`, `firehose`).
**Response Example (full)**
Response Example
```json
{
"chat": {
"title": "Chat",
"send": "Send"
},
"map": {
"title": "Map",
"zoom_in": "Zoom In"
}
}
```
**Response Example (section-specific)**
Request: `/api/lang?section=chat`
```json
{
"title": "Chat",
"send": "Send"
"title": "Meshview",
"search_placeholder": "Search..."
}
```
---
## 9. Health Check API
## 8. Packets Seen API
### GET `/api/packets_seen/{packet_id}`
Returns packet_seen entries for a packet.
Path Parameters
- `packet_id` (required, int): Packet ID.
Response Example
```json
{
"seen": [
{
"packet_id": 123,
"node_id": 456,
"rx_time": "2025-07-22T12:45:00",
"hop_limit": 7,
"hop_start": 0,
"channel": "main",
"rx_snr": 5.0,
"rx_rssi": -90,
"topic": "msh/region/#",
"import_time_us": 1736370123456789
}
]
}
```
---
## 9. Traceroute API
### GET `/api/traceroute/{packet_id}`
Returns traceroute details and derived paths for a packet.
Path Parameters
- `packet_id` (required, int): Packet ID.
Response Example
```json
{
"packet": {
"id": 123,
"from": 111,
"to": 222,
"channel": "main"
},
"traceroute_packets": [
{
"index": 0,
"gateway_node_id": 333,
"done": true,
"forward_hops": [111, 444, 222],
"reverse_hops": [222, 444, 111]
}
],
"unique_forward_paths": [
{ "path": [111, 444, 222], "count": 2 }
],
"unique_reverse_paths": [
[222, 444, 111]
],
"winning_paths": [
[111, 444, 222]
]
}
```
---
## 10. Health API
### GET `/health`
Health check endpoint for monitoring, load balancers, and orchestration systems.
Returns service health and database status.
**Response Example (Healthy)**
Response Example
```json
{
"status": "healthy",
"timestamp": "2025-11-03T14:30:00.123456Z",
"timestamp": "2025-07-22T12:45:00+00:00",
"version": "3.0.0",
"git_revision": "6416978",
"git_revision": "abc1234",
"database": "connected",
"database_size": "853.03 MB",
"database_size_bytes": 894468096
}
```
**Response Example (Unhealthy)**
Status Code: `503 Service Unavailable`
```json
{
"status": "unhealthy",
"timestamp": "2025-11-03T14:30:00.123456Z",
"version": "2.0.8",
"git_revision": "6416978",
"database": "disconnected"
"database_size": "12.34 MB",
"database_size_bytes": 12939444
}
```
---
## 10. Version API
## 11. Version API
### GET `/version`
Returns detailed version information including semver, release date, and git revision.
Returns version metadata.
**Response Example**
Response Example
```json
{
"version": "2.0.8",
"release_date": "2025-10-22",
"git_revision": "6416978a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6q",
"git_revision_short": "6416978"
"version": "3.0.0",
"git_revision": "abc1234",
"build_time": "2025-11-01T12:00:00+00:00"
}
```
---
## Notes
- All timestamps (`import_time`, `last_seen`) are returned in ISO 8601 format.
- `portnum` is an integer representing the packet type.
- `payload` is always a UTF-8 decoded string.
- Node IDs are integers (e.g., `12345678`).

View File

@@ -3,8 +3,8 @@
import subprocess
from pathlib import Path
__version__ = "3.0.1"
__release_date__ = "2025-12-4"
__version__ = "3.0.2"
__release_date__ = "2026-1-9"
def get_git_revision():

View File

@@ -6,7 +6,7 @@ parser = argparse.ArgumentParser(description="MeshView Configuration Loader")
parser.add_argument(
"--config", type=str, default="config.ini", help="Path to config.ini file (default: config.ini)"
)
args = parser.parse_args()
args, _ = parser.parse_known_args()
# Initialize config parser
config_parser = configparser.ConfigParser()

View File

@@ -1,3 +1,4 @@
from sqlalchemy.engine.url import make_url
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from meshview import models
@@ -9,10 +10,19 @@ async_session = None
def init_database(database_connection_string):
global engine, async_session
kwargs = {"echo": False}
# Ensure SQLite is opened in read-only mode
database_connection_string += "?mode=ro"
kwargs["connect_args"] = {"uri": True}
engine = create_async_engine(database_connection_string, **kwargs)
url = make_url(database_connection_string)
connect_args = {}
if url.drivername.startswith("sqlite"):
query = dict(url.query)
query.setdefault("mode", "ro")
url = url.set(query=query)
connect_args["uri"] = True
if connect_args:
kwargs["connect_args"] = connect_args
engine = create_async_engine(url, **kwargs)
async_session = async_sessionmaker(
bind=engine,
class_=AsyncSession,

View File

@@ -186,19 +186,24 @@ async def create_migration_status_table(engine: AsyncEngine) -> None:
text("""
CREATE TABLE IF NOT EXISTS migration_status (
id INTEGER PRIMARY KEY CHECK (id = 1),
in_progress BOOLEAN NOT NULL DEFAULT 0,
in_progress BOOLEAN NOT NULL DEFAULT FALSE,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
)
# Insert initial row if not exists
await conn.execute(
result = await conn.execute(
text("""
INSERT OR IGNORE INTO migration_status (id, in_progress)
VALUES (1, 0)
SELECT 1 FROM migration_status WHERE id = 1
""")
)
if result.first() is None:
await conn.execute(
text("""
INSERT INTO migration_status (id, in_progress)
VALUES (1, FALSE)
""")
)
async def set_migration_in_progress(engine: AsyncEngine, in_progress: bool) -> None:

View File

@@ -1,5 +1,3 @@
from datetime import datetime
from sqlalchemy import BigInteger, ForeignKey, Index, desc
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
@@ -22,7 +20,6 @@ class Node(Base):
last_lat: Mapped[int] = mapped_column(BigInteger, nullable=True)
last_long: Mapped[int] = mapped_column(BigInteger, nullable=True)
channel: Mapped[str] = mapped_column(nullable=True)
last_update: Mapped[datetime] = mapped_column(nullable=True)
first_seen_us: Mapped[int] = mapped_column(BigInteger, nullable=True)
last_seen_us: Mapped[int] = mapped_column(BigInteger, nullable=True)
@@ -33,11 +30,7 @@ class Node(Base):
)
def to_dict(self):
return {
column.name: getattr(self, column.name)
for column in self.__table__.columns
if column.name != "last_update"
}
return {column.name: getattr(self, column.name) for column in self.__table__.columns}
class Packet(Base):
@@ -55,17 +48,13 @@ class Packet(Base):
overlaps="from_node",
)
payload: Mapped[bytes] = mapped_column(nullable=True)
import_time: Mapped[datetime] = mapped_column(nullable=True)
import_time_us: Mapped[int] = mapped_column(BigInteger, nullable=True)
channel: Mapped[str] = mapped_column(nullable=True)
__table_args__ = (
Index("idx_packet_from_node_id", "from_node_id"),
Index("idx_packet_to_node_id", "to_node_id"),
Index("idx_packet_import_time", desc("import_time")),
Index("idx_packet_import_time_us", desc("import_time_us")),
# Composite index for /top endpoint performance - filters by from_node_id AND import_time
Index("idx_packet_from_node_time", "from_node_id", desc("import_time")),
Index("idx_packet_from_node_time_us", "from_node_id", desc("import_time_us")),
)
@@ -86,7 +75,6 @@ class PacketSeen(Base):
rx_snr: Mapped[float] = mapped_column(nullable=True)
rx_rssi: Mapped[int] = mapped_column(nullable=True)
topic: Mapped[str] = mapped_column(nullable=True)
import_time: Mapped[datetime] = mapped_column(nullable=True)
import_time_us: Mapped[int] = mapped_column(BigInteger, nullable=True)
__table_args__ = (
@@ -108,11 +96,7 @@ class Traceroute(Base):
gateway_node_id: Mapped[int] = mapped_column(BigInteger, nullable=True)
done: Mapped[bool] = mapped_column(nullable=True)
route: Mapped[bytes] = mapped_column(nullable=True)
import_time: Mapped[datetime] = mapped_column(nullable=True)
route_return: Mapped[bytes] = mapped_column(nullable=True)
import_time_us: Mapped[int] = mapped_column(BigInteger, nullable=True)
__table_args__ = (
Index("idx_traceroute_import_time", "import_time"),
Index("idx_traceroute_import_time_us", "import_time_us"),
)
__table_args__ = (Index("idx_traceroute_import_time_us", "import_time_us"),)

View File

@@ -1,3 +1,4 @@
from sqlalchemy.engine.url import make_url
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from meshview import models
@@ -5,9 +6,11 @@ from meshview import models
def init_database(database_connection_string):
global engine, async_session
engine = create_async_engine(
database_connection_string, echo=False, connect_args={"timeout": 900}
)
url = make_url(database_connection_string)
kwargs = {"echo": False}
if url.drivername.startswith("sqlite"):
kwargs["connect_args"] = {"timeout": 900}
engine = create_async_engine(url, **kwargs)
async_session = async_sessionmaker(engine, expire_on_commit=False)

View File

@@ -1,8 +1,12 @@
import datetime
import logging
import re
import time
from sqlalchemy import select
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
from sqlalchemy.exc import IntegrityError
from meshtastic.protobuf.config_pb2 import Config
from meshtastic.protobuf.mesh_pb2 import HardwareModel
@@ -10,6 +14,8 @@ from meshtastic.protobuf.portnums_pb2 import PortNum
from meshview import decode_payload, mqtt_database
from meshview.models import Node, Packet, PacketSeen, Traceroute
logger = logging.getLogger(__name__)
async def process_envelope(topic, env):
# MAP_REPORT_APP
@@ -37,8 +43,7 @@ async def process_envelope(topic, env):
await session.execute(select(Node).where(Node.node_id == node_id))
).scalar_one_or_none()
now = datetime.datetime.now(datetime.UTC)
now_us = int(now.timestamp() * 1_000_000)
now_us = int(time.time() * 1_000_000)
if node:
node.node_id = node_id
@@ -50,7 +55,6 @@ async def process_envelope(topic, env):
node.last_lat = map_report.latitude_i
node.last_long = map_report.longitude_i
node.firmware = map_report.firmware_version
node.last_update = now
node.last_seen_us = now_us
if node.first_seen_us is None:
node.first_seen_us = now_us
@@ -66,7 +70,6 @@ async def process_envelope(topic, env):
firmware=map_report.firmware_version,
last_lat=map_report.latitude_i,
last_long=map_report.longitude_i,
last_update=now,
first_seen_us=now_us,
last_seen_us=now_us,
)
@@ -84,24 +87,41 @@ async def process_envelope(topic, env):
result = await session.execute(select(Packet).where(Packet.id == env.packet.id))
packet = result.scalar_one_or_none()
if not packet:
now = datetime.datetime.now(datetime.UTC)
now_us = int(now.timestamp() * 1_000_000)
stmt = (
sqlite_insert(Packet)
.values(
id=env.packet.id,
portnum=env.packet.decoded.portnum,
from_node_id=getattr(env.packet, "from"),
to_node_id=env.packet.to,
payload=env.packet.SerializeToString(),
import_time=now,
import_time_us=now_us,
channel=env.channel_id,
now_us = int(time.time() * 1_000_000)
packet_values = {
"id": env.packet.id,
"portnum": env.packet.decoded.portnum,
"from_node_id": getattr(env.packet, "from"),
"to_node_id": env.packet.to,
"payload": env.packet.SerializeToString(),
"import_time_us": now_us,
"channel": env.channel_id,
}
utc_time = datetime.datetime.fromtimestamp(now_us / 1_000_000, datetime.UTC)
dialect = session.get_bind().dialect.name
stmt = None
if dialect == "sqlite":
stmt = (
sqlite_insert(Packet)
.values(**packet_values)
.on_conflict_do_nothing(index_elements=["id"])
)
.on_conflict_do_nothing(index_elements=["id"])
)
await session.execute(stmt)
elif dialect == "postgresql":
stmt = (
pg_insert(Packet)
.values(**packet_values)
.on_conflict_do_nothing(index_elements=["id"])
)
if stmt is not None:
await session.execute(stmt)
else:
try:
async with session.begin_nested():
session.add(Packet(**packet_values))
await session.flush()
except IntegrityError:
pass
# --- PacketSeen (no conflict handling here, normal insert)
@@ -120,8 +140,7 @@ async def process_envelope(topic, env):
)
)
if not result.scalar_one_or_none():
now = datetime.datetime.now(datetime.UTC)
now_us = int(now.timestamp() * 1_000_000)
now_us = int(time.time() * 1_000_000)
seen = PacketSeen(
packet_id=env.packet.id,
node_id=int(env.gateway_id[1:], 16),
@@ -132,7 +151,6 @@ async def process_envelope(topic, env):
hop_limit=env.packet.hop_limit,
hop_start=env.packet.hop_start,
topic=topic,
import_time=now,
import_time_us=now_us,
)
session.add(seen)
@@ -164,8 +182,7 @@ async def process_envelope(topic, env):
await session.execute(select(Node).where(Node.id == user.id))
).scalar_one_or_none()
now = datetime.datetime.now(datetime.UTC)
now_us = int(now.timestamp() * 1_000_000)
now_us = int(time.time() * 1_000_000)
if node:
node.node_id = node_id
@@ -174,7 +191,6 @@ async def process_envelope(topic, env):
node.hw_model = hw_model
node.role = role
node.channel = env.channel_id
node.last_update = now
node.last_seen_us = now_us
if node.first_seen_us is None:
node.first_seen_us = now_us
@@ -187,7 +203,6 @@ async def process_envelope(topic, env):
hw_model=hw_model,
role=role,
channel=env.channel_id,
last_update=now,
first_seen_us=now_us,
last_seen_us=now_us,
)
@@ -206,11 +221,9 @@ async def process_envelope(topic, env):
await session.execute(select(Node).where(Node.node_id == from_node_id))
).scalar_one_or_none()
if node:
now = datetime.datetime.now(datetime.UTC)
now_us = int(now.timestamp() * 1_000_000)
now_us = int(time.time() * 1_000_000)
node.last_lat = position.latitude_i
node.last_long = position.longitude_i
node.last_update = now
node.last_seen_us = now_us
if node.first_seen_us is None:
node.first_seen_us = now_us
@@ -220,18 +233,15 @@ async def process_envelope(topic, env):
if env.packet.decoded.portnum == PortNum.TRACEROUTE_APP:
packet_id = env.packet.id
if packet_id is not None:
now = datetime.datetime.now(datetime.UTC)
now_us = int(now.timestamp() * 1_000_000)
now_us = int(time.time() * 1_000_000)
session.add(
Traceroute(
packet_id=packet_id,
route=env.packet.decoded.payload,
done=not env.packet.decoded.want_response,
gateway_node_id=int(env.gateway_id[1:], 16),
import_time=now,
import_time_us=now_us,
)
)
await session.commit()

View File

@@ -75,8 +75,8 @@ body { margin: 0; font-family: monospace; background: #121212; color: #eee; }
return color;
}
function timeAgo(dateStr){
const diff = Date.now() - new Date(dateStr);
function timeAgoFromUs(us){
const diff = Date.now() - (us / 1000);
const s=Math.floor(diff/1000), m=Math.floor(s/60), h=Math.floor(m/60), d=Math.floor(h/24);
if(d>0) return d+'d'; if(h>0) return h+'h'; if(m>0) return m+'m'; return s+'s';
}
@@ -118,7 +118,7 @@ body { margin: 0; font-family: monospace; background: #121212; color: #eee; }
<b>Channel:</b> ${node.channel}<br>
<b>Model:</b> ${node.hw_model}<br>
<b>Role:</b> ${node.role}<br>`;
if(node.last_update) popupContent+=`<b>Last seen:</b> ${timeAgo(node.last_update)}<br>`;
if(node.last_seen_us) popupContent+=`<b>Last seen:</b> ${timeAgoFromUs(node.last_seen_us)}<br>`;
if(node.firmware) popupContent+=`<b>Firmware:</b> ${node.firmware}<br>`;
marker.on('click', e=>{

View File

@@ -0,0 +1,36 @@
// Shared port label/color definitions for UI pages.
window.PORT_LABEL_MAP = {
0: "UNKNOWN",
1: "Text",
3: "Position",
4: "Node Info",
5: "Routing",
6: "Admin",
8: "Waypoint",
35: "Store Forward++",
65: "Store & Forward",
67: "Telemetry",
70: "Traceroute",
71: "Neighbor",
73: "Map Report",
};
window.PORT_COLOR_MAP = {
0: "#6c757d",
1: "#007bff",
3: "#28a745",
4: "#ffc107",
5: "#dc3545",
6: "#20c997",
8: "#fd7e14",
35: "#8bc34a",
65: "#6610f2",
67: "#17a2b8",
70: "#ff4444",
71: "#ff66cc",
73: "#9999ff",
};
// Aliases for pages that expect different names.
window.PORT_MAP = window.PORT_LABEL_MAP;
window.PORT_COLORS = window.PORT_COLOR_MAP;

View File

@@ -1,10 +1,14 @@
from datetime import datetime, timedelta
from sqlalchemy import select, and_, or_, func, cast, Text
import logging
from datetime import datetime, timedelta, timezone
from sqlalchemy import Text, and_, cast, func, or_, select
from sqlalchemy.orm import lazyload
from meshview import database, models
from meshview.models import Node, Packet, PacketSeen, Traceroute
logger = logging.getLogger(__name__)
async def get_node(node_id):
async with database.async_session() as session:
@@ -91,8 +95,10 @@ async def get_packets_from(node_id=None, portnum=None, since=None, limit=500):
if portnum:
q = q.where(Packet.portnum == portnum)
if since:
q = q.where(Packet.import_time > (datetime.now() - since))
result = await session.execute(q.limit(limit).order_by(Packet.import_time.desc()))
now_us = int(datetime.now().timestamp() * 1_000_000)
start_us = now_us - int(since.total_seconds() * 1_000_000)
q = q.where(Packet.import_time_us > start_us)
result = await session.execute(q.limit(limit).order_by(Packet.import_time_us.desc()))
return result.scalars()
@@ -108,7 +114,7 @@ async def get_packets_seen(packet_id):
result = await session.execute(
select(PacketSeen)
.where(PacketSeen.packet_id == packet_id)
.order_by(PacketSeen.import_time.desc())
.order_by(PacketSeen.import_time_us.desc())
)
return result.scalars()
@@ -129,18 +135,21 @@ async def get_traceroute(packet_id):
result = await session.execute(
select(Traceroute)
.where(Traceroute.packet_id == packet_id)
.order_by(Traceroute.import_time)
.order_by(Traceroute.import_time_us)
)
return result.scalars()
async def get_traceroutes(since):
if isinstance(since, datetime):
since_us = int(since.timestamp() * 1_000_000)
else:
since_us = int(since)
async with database.async_session() as session:
stmt = (
select(Traceroute)
.join(Packet)
.where(Traceroute.import_time > since)
.order_by(Traceroute.import_time)
.where(Traceroute.import_time_us > since_us)
.order_by(Traceroute.import_time_us)
)
stream = await session.stream_scalars(stmt)
async for tr in stream:
@@ -148,6 +157,8 @@ async def get_traceroutes(since):
async def get_mqtt_neighbors(since):
now_us = int(datetime.now().timestamp() * 1_000_000)
start_us = now_us - int(since.total_seconds() * 1_000_000)
async with database.async_session() as session:
result = await session.execute(
select(PacketSeen, Packet)
@@ -155,7 +166,7 @@ async def get_mqtt_neighbors(since):
.where(
(PacketSeen.hop_limit == PacketSeen.hop_start)
& (PacketSeen.hop_start != 0)
& (PacketSeen.import_time > (datetime.now() - since))
& (PacketSeen.import_time_us > start_us)
)
.options(
lazyload(Packet.from_node),
@@ -168,9 +179,9 @@ async def get_mqtt_neighbors(since):
async def get_total_node_count(channel: str = None) -> int:
try:
async with database.async_session() as session:
q = select(func.count(Node.id)).where(
Node.last_update > datetime.now() - timedelta(days=1)
)
now_us = int(datetime.now(timezone.utc).timestamp() * 1_000_000)
cutoff_us = now_us - 86400 * 1_000_000
q = select(func.count(Node.id)).where(Node.last_seen_us > cutoff_us)
if channel:
q = q.where(Node.channel == channel)
@@ -185,26 +196,32 @@ async def get_total_node_count(channel: str = None) -> int:
async def get_top_traffic_nodes():
try:
async with database.async_session() as session:
result = await session.execute(
text("""
SELECT
n.node_id,
n.long_name,
n.short_name,
n.channel,
COUNT(DISTINCT p.id) AS total_packets_sent,
COUNT(ps.packet_id) AS total_times_seen
FROM node n
LEFT JOIN packet p ON n.node_id = p.from_node_id
AND p.import_time >= DATETIME('now', 'localtime', '-24 hours')
LEFT JOIN packet_seen ps ON p.id = ps.packet_id
GROUP BY n.node_id, n.long_name, n.short_name
HAVING total_packets_sent > 0
ORDER BY total_times_seen DESC;
""")
now_us = int(datetime.now(timezone.utc).timestamp() * 1_000_000)
cutoff_us = now_us - 86400 * 1_000_000
total_packets_sent = func.count(func.distinct(Packet.id)).label("total_packets_sent")
total_times_seen = func.count(PacketSeen.packet_id).label("total_times_seen")
stmt = (
select(
Node.node_id,
Node.long_name,
Node.short_name,
Node.channel,
total_packets_sent,
total_times_seen,
)
.select_from(Node)
.outerjoin(
Packet,
(Packet.from_node_id == Node.node_id) & (Packet.import_time_us >= cutoff_us),
)
.outerjoin(PacketSeen, PacketSeen.packet_id == Packet.id)
.group_by(Node.node_id, Node.long_name, Node.short_name, Node.channel)
.having(total_packets_sent > 0)
.order_by(total_times_seen.desc())
)
rows = result.fetchall()
rows = (await session.execute(stmt)).all()
nodes = [
{
@@ -227,33 +244,30 @@ async def get_top_traffic_nodes():
async def get_node_traffic(node_id: int):
try:
async with database.async_session() as session:
result = await session.execute(
text("""
SELECT
node.long_name, packet.portnum,
COUNT(*) AS packet_count
FROM packet
JOIN node ON packet.from_node_id = node.node_id
WHERE node.node_id = :node_id
AND packet.import_time >= DATETIME('now', 'localtime', '-24 hours')
GROUP BY packet.portnum
ORDER BY packet_count DESC;
"""),
{"node_id": node_id},
now_us = int(datetime.now(timezone.utc).timestamp() * 1_000_000)
cutoff_us = now_us - 86400 * 1_000_000
packet_count = func.count().label("packet_count")
stmt = (
select(Node.long_name, Packet.portnum, packet_count)
.select_from(Packet)
.join(Node, Packet.from_node_id == Node.node_id)
.where(Node.node_id == node_id)
.where(Packet.import_time_us >= cutoff_us)
.group_by(Node.long_name, Packet.portnum)
.order_by(packet_count.desc())
)
# Map the result to include node.long_name and packet data
traffic_data = [
result = await session.execute(stmt)
return [
{
"long_name": row[0], # node.long_name
"portnum": row[1], # packet.portnum
"packet_count": row[2], # COUNT(*) as packet_count
"long_name": row.long_name,
"portnum": row.portnum,
"packet_count": row.packet_count,
}
for row in result.all()
]
return traffic_data
except Exception as e:
# Log the error or handle it as needed
print(f"Error fetching node traffic: {str(e)}")
@@ -282,7 +296,11 @@ async def get_nodes(node_id=None, role=None, channel=None, hw_model=None, days_a
# Apply filters based on provided parameters
if node_id is not None:
query = query.where(Node.node_id == node_id)
try:
node_id_int = int(node_id)
except (TypeError, ValueError):
node_id_int = node_id
query = query.where(Node.node_id == node_id_int)
if role is not None:
query = query.where(Node.role == role.upper()) # Ensure role is uppercase
if channel is not None:
@@ -291,10 +309,12 @@ async def get_nodes(node_id=None, role=None, channel=None, hw_model=None, days_a
query = query.where(Node.hw_model == hw_model)
if days_active is not None:
query = query.where(Node.last_update > datetime.now() - timedelta(days_active))
now_us = int(datetime.now(timezone.utc).timestamp() * 1_000_000)
cutoff_us = now_us - int(timedelta(days_active).total_seconds() * 1_000_000)
query = query.where(Node.last_seen_us > cutoff_us)
# Exclude nodes where last_update is an empty string
query = query.where(Node.last_update != "")
# Exclude nodes with missing last_seen_us
query = query.where(Node.last_seen_us.is_not(None))
# Order results by long_name in ascending order
query = query.order_by(Node.short_name.asc())
@@ -305,7 +325,7 @@ async def get_nodes(node_id=None, role=None, channel=None, hw_model=None, days_a
return nodes # Return the list of nodes
except Exception:
print("error reading DB") # Consider using logging instead of print
logger.exception("error reading DB")
return [] # Return an empty list in case of failure
@@ -317,22 +337,36 @@ async def get_packet_stats(
to_node: int | None = None,
from_node: int | None = None,
):
now = datetime.now()
now = datetime.now(timezone.utc)
if period_type == "hour":
start_time = now - timedelta(hours=length)
time_format = '%Y-%m-%d %H:00'
time_format_sqlite = "%Y-%m-%d %H:00"
time_format_pg = "YYYY-MM-DD HH24:00"
elif period_type == "day":
start_time = now - timedelta(days=length)
time_format = '%Y-%m-%d'
time_format_sqlite = "%Y-%m-%d"
time_format_pg = "YYYY-MM-DD"
else:
raise ValueError("period_type must be 'hour' or 'day'")
async with database.async_session() as session:
dialect = session.get_bind().dialect.name
if dialect == "postgresql":
period_expr = func.to_char(
func.to_timestamp(Packet.import_time_us / 1_000_000.0),
time_format_pg,
)
else:
period_expr = func.strftime(
time_format_sqlite,
func.datetime(Packet.import_time_us / 1_000_000, "unixepoch"),
)
q = select(
func.strftime(time_format, Packet.import_time).label('period'),
func.count().label('count'),
).where(Packet.import_time >= start_time)
period_expr.label("period"),
func.count().label("count"),
).where(Packet.import_time_us >= int(start_time.timestamp() * 1_000_000))
# Filters
if channel:

View File

@@ -115,6 +115,7 @@
</div>
<script src="/static/portmaps.js"></script>
<script>
/* ======================================================
FIREHOSE TRANSLATION SYSTEM (isolated from base)
@@ -177,41 +178,8 @@ function nodeName(id) {
/* ======================================================
PORT COLORS & NAMES
====================================================== */
const PORT_MAP = {
0: "UNKNOWN APP",
1: "Text Message",
3: "Position",
4: "Node Info",
5: "Routing",
6: "Administration",
8: "Waypoint",
65: "Store Forward",
67: "Telemetry",
70: "Trace Route",
71: "Neighbor Info"
};
const PORT_COLORS = {
0: "#6c757d",
1: "#007bff",
3: "#28a745",
4: "#ffc107",
5: "#dc3545",
6: "#20c997",
65: "#6610f2",
67: "#17a2b8",
68: "#fd7e14",
69: "#6f42c1",
70: "#ff4444",
71: "#ff66cc",
72: "#00cc99",
73: "#9999ff",
74: "#cc00cc",
75: "#ffbb33",
76: "#00bcd4",
77: "#8bc34a",
78: "#795548"
};
const PORT_MAP = window.PORT_MAP || {};
const PORT_COLORS = window.PORT_COLORS || {};
function portLabel(portnum, payload, linksHtml) {
const name = PORT_MAP[portnum] || "Unknown";
@@ -233,13 +201,37 @@ function portLabel(portnum, payload, linksHtml) {
/* ======================================================
TIME FORMAT
====================================================== */
function formatLocalTime(importTimeUs) {
const ms = importTimeUs / 1000;
return new Date(ms).toLocaleTimeString([], {
function formatTimes(importTimeUs) {
const ms = Number(importTimeUs) / 1000;
if (!Number.isFinite(ms)) {
return { local: "—", utc: "—", epoch: "—" };
}
const date = new Date(ms);
const local = date.toLocaleTimeString([], {
hour: "2-digit",
minute: "2-digit",
second: "2-digit"
second: "2-digit",
timeZoneName: "short"
});
const utc = date.toLocaleTimeString([], {
hour: "2-digit",
minute: "2-digit",
second: "2-digit",
timeZone: "UTC",
timeZoneName: "short"
});
return { local, utc, epoch: String(importTimeUs) };
}
function logPacketTimes(packet) {
const times = formatTimes(packet.import_time_us);
console.log(
"[firehose] packet time",
"id=" + packet.id,
"epoch_us=" + times.epoch,
"local=" + times.local,
"utc=" + times.utc
);
}
/* ======================================================
@@ -261,7 +253,7 @@ async function fetchUpdates() {
if (updatesPaused) return;
const url = new URL("/api/packets", window.location.origin);
url.searchParams.set("limit", 50);
url.searchParams.set("limit", 100);
if (lastImportTimeUs)
url.searchParams.set("since", lastImportTimeUs);
@@ -277,6 +269,7 @@ async function fetchUpdates() {
const list = document.getElementById("packet_list");
for (const pkt of packets.reverse()) {
logPacketTimes(pkt);
/* FROM — includes translation */
const from =
@@ -336,7 +329,9 @@ async function fetchUpdates() {
const html = `
<tr class="packet-row">
<td>${formatLocalTime(pkt.import_time_us)}</td>
<td>
${formatTimes(pkt.import_time_us).local}<br>
</td>
<td>
<span class="toggle-btn">▶</span>

View File

@@ -140,8 +140,8 @@ map.on("popupopen", function (e) {
if (popupEl) applyTranslationsMap(popupEl);
});
function timeAgo(date){
const diff = Date.now() - new Date(date);
function timeAgoFromUs(us){
const diff = Date.now() - (us / 1000);
const s = Math.floor(diff/1000), m = Math.floor(s/60),
h = Math.floor(m/60), d = Math.floor(h/24);
return d>0?d+"d":h>0?h+"h":m>0?m+"m":s+"s";
@@ -289,7 +289,7 @@ fetch('/api/nodes?days_active=3')
hw_model: n.hw_model || "",
role: n.role || "",
firmware: n.firmware || "",
last_update: n.last_update || "",
last_seen_us: n.last_seen_us || null,
isRouter: (n.role||"").toLowerCase().includes("router")
}));
@@ -333,8 +333,8 @@ function renderNodesOnMap(){
<b data-translate-lang="role_label"></b> ${node.role}<br>
${
node.last_update
? `<b data-translate-lang="last_seen"></b> ${timeAgo(node.last_update)}<br>`
node.last_seen_us
? `<b data-translate-lang="last_seen"></b> ${timeAgoFromUs(node.last_seen_us)}<br>`
: ""
}

View File

@@ -141,7 +141,7 @@ document.addEventListener("DOMContentLoaded", async () => {
</span>
<span class="col-3 nodename">
<a href="/packet_list/${packet.from_node_id}">
<a href="/node/${packet.from_node_id}">
${escapeHtml(fromName)}
</a>
</span>

View File

@@ -285,34 +285,12 @@
</div>
<script src="https://cdn.jsdelivr.net/npm/echarts@5.5.0/dist/echarts.min.js"></script>
<script src="/static/portmaps.js"></script>
<script>
const PORT_COLOR_MAP = {
0: "#6c757d",
1: "#007bff",
3: "#28a745",
4: "#ffc107",
5: "#dc3545",
6: "#20c997",
65: "#6610f2",
67: "#17a2b8",
70: "#ff9800",
71: "#ff66cc",
};
const PORT_LABEL_MAP = {
0: "UNKNOWN",
1: "Text",
3: "Position",
4: "Node Info",
5: "Routing",
6: "Admin",
65: "Store & Forward",
67: "Telemetry",
70: "Traceroute",
71: "Neighbor"
};
const PORT_COLOR_MAP = window.PORT_COLOR_MAP || {};
const PORT_LABEL_MAP = window.PORT_LABEL_MAP || {};
/* ======================================================
NODE PAGE TRANSLATION (isolated from base)

View File

@@ -1,7 +1,10 @@
"""Main web server routes and page rendering for Meshview."""
import asyncio
import datetime
import logging
import os
import pathlib
import re
import ssl
from dataclasses import dataclass
@@ -12,7 +15,7 @@ from google.protobuf import text_format
from google.protobuf.message import Message
from jinja2 import Environment, PackageLoader, Undefined, select_autoescape
from markupsafe import Markup
import pathlib
from meshtastic.protobuf.portnums_pb2 import PortNum
from meshview import config, database, decode_payload, migrations, models, store
from meshview.__version__ import (
@@ -45,6 +48,8 @@ with open(os.path.join(os.path.dirname(__file__), '1x1.png'), 'rb') as png:
@dataclass
class Packet:
"""UI-friendly packet wrapper for templates and API payloads."""
id: int
from_node_id: int
from_node: models.Node
@@ -56,11 +61,11 @@ class Packet:
raw_payload: object
payload: str
pretty_payload: Markup
import_time: datetime.datetime
import_time_us: int
@classmethod
def from_model(cls, packet):
"""Convert a Packet ORM model into a presentation-friendly Packet."""
mesh_packet, payload = decode_payload.decode(packet)
pretty_payload = None
@@ -101,7 +106,6 @@ class Packet:
data=text_mesh_packet,
payload=text_payload, # now always a string
pretty_payload=pretty_payload,
import_time=packet.import_time,
import_time_us=packet.import_time_us, # <-- include microseconds
raw_mesh_packet=mesh_packet,
raw_payload=payload,
@@ -109,6 +113,7 @@ class Packet:
async def build_trace(node_id):
"""Build a recent GPS trace list for a node using position packets."""
trace = []
for raw_p in await store.get_packets_from(
node_id, PortNum.POSITION_APP, since=datetime.timedelta(hours=24)
@@ -130,6 +135,7 @@ async def build_trace(node_id):
async def build_neighbors(node_id):
"""Return neighbor node metadata for the given node ID."""
packets = await store.get_packets_from(node_id, PortNum.NEIGHBORINFO_APP, limit=1)
packet = packets.first()
@@ -159,6 +165,7 @@ async def build_neighbors(node_id):
def node_id_to_hex(node_id):
"""Format a node_id in Meshtastic hex notation."""
if node_id is None or isinstance(node_id, Undefined):
return "Invalid node_id" # i... have no clue
if node_id == 4294967295:
@@ -168,6 +175,7 @@ def node_id_to_hex(node_id):
def format_timestamp(timestamp):
"""Normalize timestamps to ISO 8601 strings."""
if isinstance(timestamp, int):
timestamp = datetime.datetime.fromtimestamp(timestamp, datetime.UTC)
return timestamp.isoformat(timespec="milliseconds")
@@ -185,6 +193,7 @@ routes = web.RouteTableDef()
@routes.get("/")
async def index(request):
"""Redirect root URL to configured starting page."""
"""
Redirect root URL '/' to the page specified in CONFIG['site']['starting'].
Defaults to '/map' if not set.
@@ -194,15 +203,10 @@ async def index(request):
raise web.HTTPFound(location=starting_url)
# redirect for backwards compatibility
@routes.get("/packet_list/{packet_id}")
async def redirect_packet_list(request):
packet_id = request.match_info["packet_id"]
raise web.HTTPFound(location=f"/node/{packet_id}")
# Generic static HTML route
@routes.get("/{page}")
async def serve_page(request):
"""Serve static HTML pages from meshview/static."""
page = request.match_info["page"]
# default to index.html if no extension
@@ -217,7 +221,6 @@ async def serve_page(request):
return web.Response(text=content, content_type="text/html")
@routes.get("/net")
async def net(request):
return web.Response(
@@ -352,8 +355,8 @@ async def graph_traceroute(request):
# It seems some nodes add themselves to the list before uplinking
path.append(tr.gateway_node_id)
if not tr.done and tr.gateway_node_id not in node_seen_time and tr.import_time:
node_seen_time[path[-1]] = tr.import_time
if not tr.done and tr.gateway_node_id not in node_seen_time and tr.import_time_us:
node_seen_time[path[-1]] = tr.import_time_us
mqtt_nodes.add(tr.gateway_node_id)
node_color[path[-1]] = '#' + hex(hash(tuple(path)))[3:9]
@@ -363,7 +366,7 @@ async def graph_traceroute(request):
for path in paths:
used_nodes.update(path)
import_times = [tr.import_time for tr in traceroutes if tr.import_time]
import_times = [tr.import_time_us for tr in traceroutes if tr.import_time_us]
if import_times:
first_time = min(import_times)
else:
@@ -378,7 +381,7 @@ async def graph_traceroute(request):
f'[{node.short_name}] {node.long_name}\n{node_id_to_hex(node_id)}\n{node.role}'
)
if node_id in node_seen_time:
ms = (node_seen_time[node_id] - first_time).total_seconds() * 1000
ms = (node_seen_time[node_id] - first_time) / 1000
node_name += f'\n {ms:.2f}ms'
style = 'dashed'
if node_id == dest:
@@ -396,7 +399,7 @@ async def graph_traceroute(request):
shape='box',
color=node_color.get(node_id, 'black'),
style=style,
href=f"/packet_list/{node_id}",
href=f"/node/{node_id}",
)
)
@@ -412,6 +415,7 @@ async def graph_traceroute(request):
async def run_server():
"""Start the aiohttp web server after migrations are complete."""
# Wait for database migrations to complete before starting web server
logger.info("Checking database schema status...")
database_url = CONFIG["database"]["connection_string"]
@@ -428,6 +432,7 @@ async def run_server():
logger.info("Database schema verified - starting web server")
app = web.Application()
app.router.add_static("/static/", pathlib.Path(__file__).parent / "static")
app.add_routes(api.routes) # Add API routes
app.add_routes(routes) # Add main web routes

View File

@@ -6,12 +6,15 @@ import logging
import os
from aiohttp import web
from sqlalchemy import text
from sqlalchemy import func, select, text
from meshtastic.protobuf.portnums_pb2 import PortNum
from meshview import database, decode_payload, store
from meshview.__version__ import __version__, _git_revision_short, get_version_info
from meshview.config import CONFIG
from meshview.models import Node
from meshview.models import Packet as PacketModel
from meshview.models import PacketSeen as PacketSeenModel
logger = logging.getLogger(__name__)
@@ -126,7 +129,6 @@ async def api_packets(request):
"portnum": int(p.portnum) if p.portnum is not None else None,
"payload": (p.payload or "").strip(),
"import_time_us": p.import_time_us,
"import_time": p.import_time.isoformat() if p.import_time else None,
"channel": getattr(p.from_node, "channel", ""),
"long_name": getattr(p.from_node, "long_name", ""),
}
@@ -208,7 +210,6 @@ async def api_packets(request):
packet_dict = {
"id": p.id,
"import_time_us": p.import_time_us,
"import_time": p.import_time.isoformat() if p.import_time else None,
"channel": getattr(p.from_node, "channel", ""),
"from_node_id": p.from_node_id,
"to_node_id": p.to_node_id,
@@ -228,20 +229,12 @@ async def api_packets(request):
packets_data.append(packet_dict)
# --- Latest import_time for incremental fetch ---
# --- Latest import_time_us for incremental fetch ---
latest_import_time = None
if packets_data:
for p in packets_data:
if p.get("import_time_us") and p["import_time_us"] > 0:
latest_import_time = max(latest_import_time or 0, p["import_time_us"])
elif p.get("import_time") and latest_import_time is None:
try:
dt = datetime.datetime.fromisoformat(
p["import_time"].replace("Z", "+00:00")
)
latest_import_time = int(dt.timestamp() * 1_000_000)
except Exception:
pass
response = {"packets": packets_data}
if latest_import_time is not None:
@@ -431,14 +424,10 @@ async def api_edges(request):
try:
node_filter = int(node_filter_str)
except ValueError:
return web.json_response(
{"error": "node_id must be integer"},
status=400
)
return web.json_response({"error": "node_id must be integer"}, status=400)
edges = {}
traceroute_count = 0
neighbor_packet_count = 0
edges_added_tr = 0
edges_added_neighbor = 0
@@ -463,8 +452,6 @@ async def api_edges(request):
# --- Neighbor edges ---
if filter_type in (None, "neighbor"):
packets = await store.get_packets(portnum=71)
neighbor_packet_count = len(packets)
for packet in packets:
try:
_, neighbor_info = decode_payload.decode(packet)
@@ -479,21 +466,16 @@ async def api_edges(request):
# Convert to list
edges_list = [
{"from": frm, "to": to, "type": edge_type}
for (frm, to), edge_type in edges.items()
{"from": frm, "to": to, "type": edge_type} for (frm, to), edge_type in edges.items()
]
# NEW → apply node_id filtering
if node_filter is not None:
edges_list = [
e for e in edges_list
if e["from"] == node_filter or e["to"] == node_filter
]
edges_list = [e for e in edges_list if e["from"] == node_filter or e["to"] == node_filter]
return web.json_response({"edges": edges_list})
@routes.get("/api/config")
async def api_config(request):
try:
@@ -711,7 +693,6 @@ async def api_packets_seen(request):
"rx_snr": row.rx_snr,
"rx_rssi": row.rx_rssi,
"topic": row.topic,
"import_time": (row.import_time.isoformat() if row.import_time else None),
"import_time_us": row.import_time_us,
}
)
@@ -725,6 +706,7 @@ async def api_packets_seen(request):
status=500,
)
@routes.get("/api/traceroute/{packet_id}")
async def api_traceroute(request):
packet_id = int(request.match_info['packet_id'])
@@ -746,14 +728,15 @@ async def api_traceroute(request):
forward_list = list(route.route)
reverse_list = list(route.route_back)
tr_groups.append({
"index": idx,
"import_time": tr.import_time.isoformat() if tr.import_time else None,
"gateway_node_id": tr.gateway_node_id,
"done": tr.done,
"forward_hops": forward_list,
"reverse_hops": reverse_list,
})
tr_groups.append(
{
"index": idx,
"gateway_node_id": tr.gateway_node_id,
"done": tr.done,
"forward_hops": forward_list,
"reverse_hops": reverse_list,
}
)
# --------------------------------------------
# Compute UNIQUE paths + counts + winning path
@@ -796,18 +779,20 @@ async def api_traceroute(request):
# --------------------------------------------
# Final API output
# --------------------------------------------
return web.json_response({
"packet": {
"id": packet.id,
"from": packet.from_node_id,
"to": packet.to_node_id,
"channel": packet.channel,
},
"traceroute_packets": tr_groups,
"unique_forward_paths": unique_forward_paths_json,
"unique_reverse_paths": unique_reverse_paths_json,
"winning_paths": winning_paths_json,
})
return web.json_response(
{
"packet": {
"id": packet.id,
"from": packet.from_node_id,
"to": packet.to_node_id,
"channel": packet.channel,
},
"traceroute_packets": tr_groups,
"unique_forward_paths": unique_forward_paths_json,
"unique_reverse_paths": unique_reverse_paths_json,
"winning_paths": winning_paths_json,
}
)
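The unique-path computation itself is elided from this hunk; below is a minimal sketch of one way to derive unique forward paths with counts and a "winning" (most frequent) path from the `tr_groups` list built above — not necessarily the handler's exact code:

```python
from collections import Counter

def summarize_forward_paths(tr_groups: list[dict]):
    # Count identical hop sequences, then report each unique path with its
    # count and pick the most frequent one as the winner.
    counts = Counter(tuple(g["forward_hops"]) for g in tr_groups)
    unique_paths = [{"path": list(p), "count": c} for p, c in counts.most_common()]
    winning = list(counts.most_common(1)[0][0]) if counts else None
    return unique_paths, winning
```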
@routes.get("/api/stats/top")
@@ -823,90 +808,75 @@ async def api_stats_top(request):
limit = min(int(request.query.get("limit", 20)), 100)
offset = int(request.query.get("offset", 0))
params = {
"period_type": period_type,
"length": length,
"limit": limit,
"offset": offset,
}
multiplier = 3600 if period_type == "hour" else 86400
window_us = length * multiplier * 1_000_000
channel_filter = ""
if channel:
channel_filter = "AND n.channel = :channel"
params["channel"] = channel
max_packet_import = select(func.max(PacketModel.import_time_us)).scalar_subquery()
max_seen_import = select(func.max(PacketSeenModel.import_time_us)).scalar_subquery()
sql = f"""
WITH sent AS (
SELECT
p.from_node_id AS node_id,
COUNT(*) AS sent
FROM packet p
WHERE p.import_time_us >= (
SELECT MAX(import_time_us) FROM packet
) - (
CASE
WHEN :period_type = 'hour' THEN :length * 3600 * 1000000
ELSE :length * 86400 * 1000000
END
)
GROUP BY p.from_node_id
),
seen AS (
SELECT
p.from_node_id AS node_id,
COUNT(*) AS seen
FROM packet_seen ps
JOIN packet p ON p.id = ps.packet_id
WHERE ps.import_time_us >= (
SELECT MAX(import_time_us) FROM packet_seen
) - (
CASE
WHEN :period_type = 'hour' THEN :length * 3600 * 1000000
ELSE :length * 86400 * 1000000
END
)
GROUP BY p.from_node_id
sent_cte = (
select(PacketModel.from_node_id.label("node_id"), func.count().label("sent"))
.where(PacketModel.import_time_us >= max_packet_import - window_us)
.group_by(PacketModel.from_node_id)
.cte("sent")
)
SELECT
n.node_id,
n.long_name,
n.short_name,
n.channel,
COALESCE(s.sent, 0) AS sent,
COALESCE(se.seen, 0) AS seen
FROM node n
LEFT JOIN sent s ON s.node_id = n.node_id
LEFT JOIN seen se ON se.node_id = n.node_id
WHERE 1=1
{channel_filter}
ORDER BY seen DESC
LIMIT :limit OFFSET :offset
"""
count_sql = f"""
SELECT COUNT(*) FROM node n WHERE 1=1 {channel_filter}
"""
seen_cte = (
select(PacketModel.from_node_id.label("node_id"), func.count().label("seen"))
.select_from(PacketSeenModel)
.join(PacketModel, PacketModel.id == PacketSeenModel.packet_id)
.where(PacketSeenModel.import_time_us >= max_seen_import - window_us)
.group_by(PacketModel.from_node_id)
.cte("seen")
)
query = (
select(
Node.node_id,
Node.long_name,
Node.short_name,
Node.channel,
func.coalesce(sent_cte.c.sent, 0).label("sent"),
func.coalesce(seen_cte.c.seen, 0).label("seen"),
)
.select_from(Node)
.outerjoin(sent_cte, sent_cte.c.node_id == Node.node_id)
.outerjoin(seen_cte, seen_cte.c.node_id == Node.node_id)
.order_by(func.coalesce(seen_cte.c.seen, 0).desc())
.limit(limit)
.offset(offset)
)
count_query = select(func.count()).select_from(Node)
if channel:
query = query.where(Node.channel == channel)
count_query = count_query.where(Node.channel == channel)
async with database.async_session() as session:
rows = (await session.execute(text(sql), params)).all()
total = (await session.execute(text(count_sql), params)).scalar() or 0
rows = (await session.execute(query)).all()
total = (await session.execute(count_query)).scalar() or 0
nodes = []
for r in rows:
avg = r.seen / max(r.sent, 1)
nodes.append({
"node_id": r.node_id,
"long_name": r.long_name,
"short_name": r.short_name,
"channel": r.channel,
"sent": r.sent,
"seen": r.seen,
"avg": round(avg, 2),
})
nodes.append(
{
"node_id": r.node_id,
"long_name": r.long_name,
"short_name": r.short_name,
"channel": r.channel,
"sent": r.sent,
"seen": r.seen,
"avg": round(avg, 2),
}
)
return web.json_response({
"total": total,
"limit": limit,
"offset": offset,
"nodes": nodes,
})
return web.json_response(
{
"total": total,
"limit": limit,
"offset": offset,
"nodes": nodes,
}
)
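The window arithmetic above anchors to the newest `import_time_us` in each table rather than wall-clock "now", so the stats stay meaningful even if ingestion has been paused. A standalone sketch of the same arithmetic:

```python
def window_start_us(max_import_us: int, length: int, period_type: str) -> int:
    # Mirrors the CTE predicates: rows qualify when
    # import_time_us >= window_start_us(newest, length, period_type).
    multiplier = 3600 if period_type == "hour" else 86400
    return max_import_us - length * multiplier * 1_000_000

# Example: the last 24 hours relative to the newest packet.
start = window_start_us(1_757_000_000_000_000, 24, "hour")
```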

View File

@@ -81,7 +81,10 @@ password = large4cats
# Database Configuration
# -------------------------
[database]
# SQLAlchemy connection string. This one uses SQLite with asyncio support.
# SQLAlchemy async connection string.
# Examples:
# sqlite+aiosqlite:///packets.db
# postgresql+asyncpg://user:pass@host:5432/meshview
connection_string = sqlite+aiosqlite:///packets.db

View File

@@ -7,6 +7,7 @@ import shutil
from pathlib import Path
from sqlalchemy import delete
from sqlalchemy.engine.url import make_url
from meshview import migrations, models, mqtt_database, mqtt_reader, mqtt_store
from meshview.config import CONFIG
@@ -65,18 +66,16 @@ async def backup_database(database_url: str, backup_dir: str = ".") -> None:
backup_dir: Directory to store backups (default: current directory)
"""
try:
# Extract database file path from connection string
# Format: sqlite+aiosqlite:///path/to/db.db
if not database_url.startswith("sqlite"):
url = make_url(database_url)
if not url.drivername.startswith("sqlite"):
cleanup_logger.warning("Backup only supported for SQLite databases")
return
db_path = database_url.split("///", 1)[1] if "///" in database_url else None
if not db_path:
if not url.database or url.database == ":memory:":
cleanup_logger.error("Could not extract database path from connection string")
return
db_file = Path(db_path)
db_file = Path(url.database)
if not db_file.exists():
cleanup_logger.error(f"Database file not found: {db_file}")
return
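For reference, how `make_url` behaves on the two documented connection strings (values illustrative), which is what makes the dialect gate above reliable:

```python
from sqlalchemy.engine.url import make_url

url = make_url("sqlite+aiosqlite:///packets.db")
assert url.drivername == "sqlite+aiosqlite"
assert url.database == "packets.db"

url = make_url("postgresql+asyncpg://user:pass@host:5432/meshview")
assert url.drivername == "postgresql+asyncpg"  # fails the startswith("sqlite") gate
assert url.database == "meshview"
```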
@@ -153,11 +152,11 @@ async def daily_cleanup_at(
cleanup_logger.info("Waiting 60 seconds for backup to complete...")
await asyncio.sleep(60)
# Local-time cutoff as string for SQLite DATETIME comparison
cutoff = (datetime.datetime.now() - datetime.timedelta(days=days_to_keep)).strftime(
"%Y-%m-%d %H:%M:%S"
)
cleanup_logger.info(f"Running cleanup for records older than {cutoff}...")
cutoff_dt = datetime.datetime.now(datetime.UTC) - datetime.timedelta(days=days_to_keep)
# Keep the datetime timezone-aware: .timestamp() interprets a naive
# datetime as local time, which would skew the cutoff by the UTC offset.
cutoff_us = int(cutoff_dt.timestamp() * 1_000_000)
cleanup_logger.info(f"Running cleanup for records older than {cutoff_dt.isoformat()}...")
try:
async with db_lock: # Pause ingestion
@@ -168,7 +167,7 @@ async def daily_cleanup_at(
# Packet
# -------------------------
result = await session.execute(
delete(models.Packet).where(models.Packet.import_time < cutoff)
delete(models.Packet).where(models.Packet.import_time_us < cutoff_us)
)
cleanup_logger.info(f"Deleted {result.rowcount} rows from Packet")
@@ -176,7 +175,9 @@ async def daily_cleanup_at(
# PacketSeen
# -------------------------
result = await session.execute(
delete(models.PacketSeen).where(models.PacketSeen.import_time < cutoff)
delete(models.PacketSeen).where(
models.PacketSeen.import_time_us < cutoff_us
)
)
cleanup_logger.info(f"Deleted {result.rowcount} rows from PacketSeen")
@@ -184,7 +185,9 @@ async def daily_cleanup_at(
# Traceroute
# -------------------------
result = await session.execute(
delete(models.Traceroute).where(models.Traceroute.import_time < cutoff)
delete(models.Traceroute).where(
models.Traceroute.import_time_us < cutoff_us
)
)
cleanup_logger.info(f"Deleted {result.rowcount} rows from Traceroute")
@@ -192,17 +195,19 @@ async def daily_cleanup_at(
# Node
# -------------------------
result = await session.execute(
delete(models.Node).where(models.Node.last_update < cutoff)
delete(models.Node).where(models.Node.last_seen_us < cutoff_us)
)
cleanup_logger.info(f"Deleted {result.rowcount} rows from Node")
await session.commit()
if vacuum_db:
if vacuum_db and mqtt_database.engine.dialect.name == "sqlite":
cleanup_logger.info("Running VACUUM...")
async with mqtt_database.engine.begin() as conn:
await conn.exec_driver_sql("VACUUM;")
cleanup_logger.info("VACUUM completed.")
elif vacuum_db:
cleanup_logger.info("VACUUM skipped (not supported for this database).")
cleanup_logger.info("Cleanup completed successfully.")
cleanup_logger.info("Ingestion resumed after cleanup.")