mirror of
https://github.com/pablorevilla-meshtastic/meshview.git
synced 2026-03-04 23:27:46 +01:00
Compare commits
5 Commits
v3.0.2
...
db_updates
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c9639d851b | ||
|
|
fa98f56318 | ||
|
|
f85e783e8c | ||
|
|
e12e3a2a41 | ||
|
|
da31794d8d |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -45,3 +45,4 @@ __pycache__/
|
||||
# OS
|
||||
.DS_Store
|
||||
Thumbs.db
|
||||
packets.db-journal
|
||||
|
||||
@@ -128,6 +128,10 @@ username =
|
||||
password =
|
||||
|
||||
[database]
|
||||
# SQLAlchemy async connection string.
|
||||
# Examples:
|
||||
# sqlite+aiosqlite:///var/lib/meshview/packets.db
|
||||
# postgresql+asyncpg://user:pass@host:5432/meshview
|
||||
connection_string = sqlite+aiosqlite:///var/lib/meshview/packets.db
|
||||
```
|
||||
|
||||
|
||||
37
README.md
37
README.md
@@ -272,9 +272,24 @@ password = large4cats
|
||||
# Database Configuration
|
||||
# -------------------------
|
||||
[database]
|
||||
# SQLAlchemy connection string. This one uses SQLite with asyncio support.
|
||||
# SQLAlchemy async connection string.
|
||||
# Examples:
|
||||
# sqlite+aiosqlite:///packets.db
|
||||
# postgresql+asyncpg://user:pass@host:5432/meshview
|
||||
connection_string = sqlite+aiosqlite:///packets.db
|
||||
|
||||
> **NOTE (PostgreSQL setup)**
|
||||
> If you want to use PostgreSQL instead of SQLite:
|
||||
>
|
||||
> 1) Install PostgreSQL for your OS.
|
||||
> 2) Create a user and database:
|
||||
> - `CREATE USER meshview WITH PASSWORD 'change_me';`
|
||||
> - `CREATE DATABASE meshview OWNER meshview;`
|
||||
> 3) Update `config.ini`:
|
||||
> - `connection_string = postgresql+asyncpg://meshview:change_me@localhost:5432/meshview`
|
||||
> 4) Initialize the schema:
|
||||
> - `./env/bin/python startdb.py`
|
||||
|
||||
|
||||
# -------------------------
|
||||
# Database Cleanup Configuration
|
||||
@@ -493,10 +508,22 @@ sleep 5
|
||||
echo "Run cleanup..."
|
||||
# Run cleanup queries
|
||||
sqlite3 "$DB_FILE" <<EOF
|
||||
DELETE FROM packet WHERE import_time < datetime('now', '-14 day');
|
||||
DELETE FROM packet_seen WHERE import_time < datetime('now', '-14 day');
|
||||
DELETE FROM traceroute WHERE import_time < datetime('now', '-14 day');
|
||||
DELETE FROM node WHERE last_update < datetime('now', '-14 day') OR last_update IS NULL OR last_update = '';
|
||||
DELETE FROM packet
|
||||
WHERE import_time_us IS NOT NULL
|
||||
AND import_time_us < (strftime('%s','now','-14 days') * 1000000);
|
||||
SELECT 'packet deleted: ' || changes();
|
||||
DELETE FROM packet_seen
|
||||
WHERE import_time_us IS NOT NULL
|
||||
AND import_time_us < (strftime('%s','now','-14 days') * 1000000);
|
||||
SELECT 'packet_seen deleted: ' || changes();
|
||||
DELETE FROM traceroute
|
||||
WHERE import_time_us IS NOT NULL
|
||||
AND import_time_us < (strftime('%s','now','-14 days') * 1000000);
|
||||
SELECT 'traceroute deleted: ' || changes();
|
||||
DELETE FROM node
|
||||
WHERE last_seen_us IS NULL
|
||||
OR last_seen_us < (strftime('%s','now','-14 days') * 1000000);
|
||||
SELECT 'node deleted: ' || changes();
|
||||
VACUUM;
|
||||
EOF
|
||||
|
||||
|
||||
65
alembic/versions/9f3b1a8d2c4f_drop_import_time_columns.py
Normal file
65
alembic/versions/9f3b1a8d2c4f_drop_import_time_columns.py
Normal file
@@ -0,0 +1,65 @@
|
||||
"""Drop import_time columns.
|
||||
|
||||
Revision ID: 9f3b1a8d2c4f
|
||||
Revises: 2b5a61bb2b75
|
||||
Create Date: 2026-01-09 09:55:00.000000
|
||||
"""
|
||||
|
||||
from collections.abc import Sequence
|
||||
|
||||
import sqlalchemy as sa
|
||||
|
||||
from alembic import op
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = "9f3b1a8d2c4f"
|
||||
down_revision: str | None = "2b5a61bb2b75"
|
||||
branch_labels: str | Sequence[str] | None = None
|
||||
depends_on: str | Sequence[str] | None = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
conn = op.get_bind()
|
||||
inspector = sa.inspect(conn)
|
||||
|
||||
packet_indexes = {idx["name"] for idx in inspector.get_indexes("packet")}
|
||||
packet_columns = {col["name"] for col in inspector.get_columns("packet")}
|
||||
|
||||
with op.batch_alter_table("packet", schema=None) as batch_op:
|
||||
if "idx_packet_import_time" in packet_indexes:
|
||||
batch_op.drop_index("idx_packet_import_time")
|
||||
if "idx_packet_from_node_time" in packet_indexes:
|
||||
batch_op.drop_index("idx_packet_from_node_time")
|
||||
if "import_time" in packet_columns:
|
||||
batch_op.drop_column("import_time")
|
||||
|
||||
packet_seen_columns = {col["name"] for col in inspector.get_columns("packet_seen")}
|
||||
with op.batch_alter_table("packet_seen", schema=None) as batch_op:
|
||||
if "import_time" in packet_seen_columns:
|
||||
batch_op.drop_column("import_time")
|
||||
|
||||
traceroute_indexes = {idx["name"] for idx in inspector.get_indexes("traceroute")}
|
||||
traceroute_columns = {col["name"] for col in inspector.get_columns("traceroute")}
|
||||
with op.batch_alter_table("traceroute", schema=None) as batch_op:
|
||||
if "idx_traceroute_import_time" in traceroute_indexes:
|
||||
batch_op.drop_index("idx_traceroute_import_time")
|
||||
if "import_time" in traceroute_columns:
|
||||
batch_op.drop_column("import_time")
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
with op.batch_alter_table("traceroute", schema=None) as batch_op:
|
||||
batch_op.add_column(sa.Column("import_time", sa.DateTime(), nullable=True))
|
||||
batch_op.create_index("idx_traceroute_import_time", ["import_time"], unique=False)
|
||||
|
||||
with op.batch_alter_table("packet_seen", schema=None) as batch_op:
|
||||
batch_op.add_column(sa.Column("import_time", sa.DateTime(), nullable=True))
|
||||
|
||||
with op.batch_alter_table("packet", schema=None) as batch_op:
|
||||
batch_op.add_column(sa.Column("import_time", sa.DateTime(), nullable=True))
|
||||
batch_op.create_index("idx_packet_import_time", [sa.text("import_time DESC")], unique=False)
|
||||
batch_op.create_index(
|
||||
"idx_packet_from_node_time",
|
||||
["from_node_id", sa.text("import_time DESC")],
|
||||
unique=False,
|
||||
)
|
||||
94
alembic/versions/b7c3c2e3a1f0_add_last_update_us_to_node.py
Normal file
94
alembic/versions/b7c3c2e3a1f0_add_last_update_us_to_node.py
Normal file
@@ -0,0 +1,94 @@
|
||||
"""Add last_update_us to node and migrate data.
|
||||
|
||||
Revision ID: b7c3c2e3a1f0
|
||||
Revises: 9f3b1a8d2c4f
|
||||
Create Date: 2026-01-12 10:12:00.000000
|
||||
"""
|
||||
|
||||
from collections.abc import Sequence
|
||||
from datetime import UTC, datetime
|
||||
|
||||
import sqlalchemy as sa
|
||||
|
||||
from alembic import op
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = "b7c3c2e3a1f0"
|
||||
down_revision: str | None = "9f3b1a8d2c4f"
|
||||
branch_labels: str | Sequence[str] | None = None
|
||||
depends_on: str | Sequence[str] | None = None
|
||||
|
||||
|
||||
def _parse_datetime(value):
|
||||
if value is None:
|
||||
return None
|
||||
if isinstance(value, datetime):
|
||||
dt = value
|
||||
elif isinstance(value, str):
|
||||
text = value.replace("Z", "+00:00")
|
||||
try:
|
||||
dt = datetime.fromisoformat(text)
|
||||
except ValueError:
|
||||
return None
|
||||
else:
|
||||
return None
|
||||
|
||||
if dt.tzinfo is None:
|
||||
return dt.replace(tzinfo=UTC)
|
||||
return dt.astimezone(UTC)
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
conn = op.get_bind()
|
||||
op.add_column("node", sa.Column("last_update_us", sa.BigInteger(), nullable=True))
|
||||
op.create_index("idx_node_last_update_us", "node", ["last_update_us"], unique=False)
|
||||
|
||||
node = sa.table(
|
||||
"node",
|
||||
sa.column("id", sa.String()),
|
||||
sa.column("last_update", sa.DateTime()),
|
||||
sa.column("last_update_us", sa.BigInteger()),
|
||||
)
|
||||
|
||||
rows = conn.execute(sa.select(node.c.id, node.c.last_update)).all()
|
||||
for node_id, last_update in rows:
|
||||
dt = _parse_datetime(last_update)
|
||||
if dt is None:
|
||||
continue
|
||||
last_update_us = int(dt.timestamp() * 1_000_000)
|
||||
conn.execute(
|
||||
sa.update(node).where(node.c.id == node_id).values(last_update_us=last_update_us)
|
||||
)
|
||||
|
||||
if conn.dialect.name == "sqlite":
|
||||
with op.batch_alter_table("node", schema=None) as batch_op:
|
||||
batch_op.drop_column("last_update")
|
||||
else:
|
||||
op.drop_column("node", "last_update")
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
conn = op.get_bind()
|
||||
op.add_column("node", sa.Column("last_update", sa.DateTime(), nullable=True))
|
||||
|
||||
node = sa.table(
|
||||
"node",
|
||||
sa.column("id", sa.String()),
|
||||
sa.column("last_update", sa.DateTime()),
|
||||
sa.column("last_update_us", sa.BigInteger()),
|
||||
)
|
||||
|
||||
rows = conn.execute(sa.select(node.c.id, node.c.last_update_us)).all()
|
||||
for node_id, last_update_us in rows:
|
||||
if last_update_us is None:
|
||||
continue
|
||||
dt = datetime.fromtimestamp(last_update_us / 1_000_000, tz=UTC).replace(tzinfo=None)
|
||||
conn.execute(sa.update(node).where(node.c.id == node_id).values(last_update=dt))
|
||||
|
||||
if conn.dialect.name == "sqlite":
|
||||
with op.batch_alter_table("node", schema=None) as batch_op:
|
||||
batch_op.drop_index("idx_node_last_update_us")
|
||||
batch_op.drop_column("last_update_us")
|
||||
else:
|
||||
op.drop_index("idx_node_last_update_us", table_name="node")
|
||||
op.drop_column("node", "last_update_us")
|
||||
@@ -0,0 +1,34 @@
|
||||
"""Drop last_update_us from node.
|
||||
|
||||
Revision ID: d4d7b0c2e1a4
|
||||
Revises: b7c3c2e3a1f0
|
||||
Create Date: 2026-01-12 10:20:00.000000
|
||||
"""
|
||||
|
||||
from collections.abc import Sequence
|
||||
|
||||
import sqlalchemy as sa
|
||||
|
||||
from alembic import op
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = "d4d7b0c2e1a4"
|
||||
down_revision: str | None = "b7c3c2e3a1f0"
|
||||
branch_labels: str | Sequence[str] | None = None
|
||||
depends_on: str | Sequence[str] | None = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
conn = op.get_bind()
|
||||
if conn.dialect.name == "sqlite":
|
||||
with op.batch_alter_table("node", schema=None) as batch_op:
|
||||
batch_op.drop_index("idx_node_last_update_us")
|
||||
batch_op.drop_column("last_update_us")
|
||||
else:
|
||||
op.drop_index("idx_node_last_update_us", table_name="node")
|
||||
op.drop_column("node", "last_update_us")
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
op.add_column("node", sa.Column("last_update_us", sa.BigInteger(), nullable=True))
|
||||
op.create_index("idx_node_last_update_us", "node", ["last_update_us"], unique=False)
|
||||
@@ -1,82 +1,38 @@
|
||||
|
||||
# API Documentation
|
||||
|
||||
## 1. Chat API
|
||||
Base URL: `http(s)://<host>`
|
||||
|
||||
### GET `/api/chat`
|
||||
Returns the most recent chat messages.
|
||||
All endpoints return JSON. Timestamps are either ISO 8601 strings or `*_us` values in
|
||||
microseconds since epoch.
|
||||
|
||||
**Query Parameters**
|
||||
- `limit` (optional, int): Maximum number of messages to return. Default: `100`.
|
||||
|
||||
**Response Example**
|
||||
```json
|
||||
{
|
||||
"packets": [
|
||||
{
|
||||
"id": 123,
|
||||
"import_time": "2025-07-22T12:45:00",
|
||||
"from_node_id": 987654,
|
||||
"from_node": "Alice",
|
||||
"channel": "main",
|
||||
"payload": "Hello, world!"
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### GET `/api/chat/updates`
|
||||
Returns chat messages imported after a given timestamp.
|
||||
|
||||
**Query Parameters**
|
||||
- `last_time` (optional, ISO timestamp): Only messages imported after this time are returned.
|
||||
|
||||
**Response Example**
|
||||
```json
|
||||
{
|
||||
"packets": [
|
||||
{
|
||||
"id": 124,
|
||||
"import_time": "2025-07-22T12:50:00",
|
||||
"from_node_id": 987654,
|
||||
"from_node": "Alice",
|
||||
"channel": "main",
|
||||
"payload": "New message!"
|
||||
}
|
||||
],
|
||||
"latest_import_time": "2025-07-22T12:50:00"
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 2. Nodes API
|
||||
## 1. Nodes API
|
||||
|
||||
### GET `/api/nodes`
|
||||
Returns a list of all nodes, with optional filtering by last seen.
|
||||
Returns a list of nodes, with optional filtering.
|
||||
|
||||
**Query Parameters**
|
||||
- `hours` (optional, int): Return nodes seen in the last N hours.
|
||||
- `days` (optional, int): Return nodes seen in the last N days.
|
||||
- `last_seen_after` (optional, ISO timestamp): Return nodes seen after this time.
|
||||
Query Parameters
|
||||
- `node_id` (optional, int): Exact node ID.
|
||||
- `role` (optional, string): Node role.
|
||||
- `channel` (optional, string): Channel name.
|
||||
- `hw_model` (optional, string): Hardware model.
|
||||
- `days_active` (optional, int): Nodes seen within the last N days.
|
||||
|
||||
**Response Example**
|
||||
Response Example
|
||||
```json
|
||||
{
|
||||
"nodes": [
|
||||
{
|
||||
"id": 42,
|
||||
"node_id": 1234,
|
||||
"long_name": "Alice",
|
||||
"short_name": "A",
|
||||
"channel": "main",
|
||||
"last_seen": "2025-07-22T12:40:00",
|
||||
"hardware": "T-Beam",
|
||||
"hw_model": "T-Beam",
|
||||
"firmware": "1.2.3",
|
||||
"role": "client",
|
||||
"last_lat": 37.7749,
|
||||
"last_long": -122.4194
|
||||
"last_lat": 377749000,
|
||||
"last_long": -1224194000,
|
||||
"channel": "main",
|
||||
"last_seen_us": 1736370123456789
|
||||
}
|
||||
]
|
||||
}
|
||||
@@ -84,45 +40,58 @@ Returns a list of all nodes, with optional filtering by last seen.
|
||||
|
||||
---
|
||||
|
||||
## 3. Packets API
|
||||
## 2. Packets API
|
||||
|
||||
### GET `/api/packets`
|
||||
Returns a list of packets with optional filters.
|
||||
Returns packets with optional filters.
|
||||
|
||||
**Query Parameters**
|
||||
- `limit` (optional, int): Maximum number of packets to return. Default: `200`.
|
||||
- `since` (optional, ISO timestamp): Only packets imported after this timestamp are returned.
|
||||
Query Parameters
|
||||
- `packet_id` (optional, int): Return exactly one packet (overrides other filters).
|
||||
- `limit` (optional, int): Max packets to return, clamped 1-1000. Default: `50`.
|
||||
- `since` (optional, int): Only packets imported after this microsecond timestamp.
|
||||
- `portnum` (optional, int): Filter by port number.
|
||||
- `contains` (optional, string): Payload substring filter.
|
||||
- `from_node_id` (optional, int): Filter by sender node ID.
|
||||
- `to_node_id` (optional, int): Filter by recipient node ID.
|
||||
- `node_id` (optional, int): Legacy filter matching either from or to node ID.
|
||||
|
||||
**Response Example**
|
||||
Response Example
|
||||
```json
|
||||
{
|
||||
"packets": [
|
||||
{
|
||||
"id": 123,
|
||||
"import_time_us": 1736370123456789,
|
||||
"channel": "main",
|
||||
"from_node_id": 5678,
|
||||
"to_node_id": 91011,
|
||||
"portnum": 1,
|
||||
"import_time": "2025-07-22T12:45:00",
|
||||
"payload": "Hello, Bob!"
|
||||
"long_name": "Alice",
|
||||
"payload": "Hello, Bob!",
|
||||
"to_long_name": "Bob",
|
||||
"reply_id": 122
|
||||
}
|
||||
]
|
||||
],
|
||||
"latest_import_time": 1736370123456789
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
Notes
|
||||
- For `portnum=1` (text messages), packets are filtered to remove sequence-only payloads.
|
||||
- `latest_import_time` is returned when available for incremental polling (microseconds).
|
||||
|
||||
---
|
||||
|
||||
## 4. Channels API
|
||||
## 3. Channels API
|
||||
|
||||
### GET `/api/channels`
|
||||
Returns a list of channels seen in a given time period.
|
||||
Returns channels seen in a time period.
|
||||
|
||||
**Query Parameters**
|
||||
- `period_type` (optional, string): Time granularity (`hour` or `day`). Default: `hour`.
|
||||
Query Parameters
|
||||
- `period_type` (optional, string): `hour` or `day`. Default: `hour`.
|
||||
- `length` (optional, int): Number of periods to look back. Default: `24`.
|
||||
|
||||
**Response Example**
|
||||
Response Example
|
||||
```json
|
||||
{
|
||||
"channels": ["LongFast", "MediumFast", "ShortFast"]
|
||||
@@ -131,29 +100,21 @@ Returns a list of channels seen in a given time period.
|
||||
|
||||
---
|
||||
|
||||
## 5. Statistics API
|
||||
## 4. Stats API
|
||||
|
||||
### GET `/api/stats`
|
||||
Returns packet statistics aggregated by time periods, with optional filtering.
|
||||
|
||||
Retrieve packet statistics aggregated by time periods, with optional filtering.
|
||||
|
||||
---
|
||||
|
||||
## Query Parameters
|
||||
|
||||
| Parameter | Type | Required | Default | Description |
|
||||
|--------------|---------|----------|----------|-------------------------------------------------------------------------------------------------|
|
||||
| `period_type` | string | No | `hour` | Time granularity of the stats. Allowed values: `hour`, `day`. |
|
||||
| `length` | integer | No | 24 | Number of periods to include (hours or days). |
|
||||
| `channel` | string | No | — | Filter results by channel name (case-insensitive). |
|
||||
| `portnum` | integer | No | — | Filter results by port number. |
|
||||
| `to_node` | integer | No | — | Filter results to packets sent **to** this node ID. |
|
||||
| `from_node` | integer | No | — | Filter results to packets sent **from** this node ID. |
|
||||
|
||||
---
|
||||
|
||||
## Response
|
||||
Query Parameters
|
||||
- `period_type` (optional, string): `hour` or `day`. Default: `hour`.
|
||||
- `length` (optional, int): Number of periods to include. Default: `24`.
|
||||
- `channel` (optional, string): Filter by channel (case-insensitive).
|
||||
- `portnum` (optional, int): Filter by port number.
|
||||
- `to_node` (optional, int): Filter by destination node ID.
|
||||
- `from_node` (optional, int): Filter by source node ID.
|
||||
- `node` (optional, int): If provided, return combined `sent` and `seen` totals for that node.
|
||||
|
||||
Response Example (series)
|
||||
```json
|
||||
{
|
||||
"period_type": "hour",
|
||||
@@ -163,65 +124,117 @@ Retrieve packet statistics aggregated by time periods, with optional filtering.
|
||||
"to_node": 12345678,
|
||||
"from_node": 87654321,
|
||||
"data": [
|
||||
{ "period": "2025-08-08 14:00", "count": 10 },
|
||||
{ "period": "2025-08-08 15:00", "count": 7 }
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
Response Example (`node` totals)
|
||||
```json
|
||||
{
|
||||
"node_id": 12345678,
|
||||
"period_type": "hour",
|
||||
"length": 24,
|
||||
"sent": 42,
|
||||
"seen": 58
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### GET `/api/stats/count`
|
||||
Returns total packet counts, optionally filtered.
|
||||
|
||||
Query Parameters
|
||||
- `packet_id` (optional, int): Filter packet_seen by packet ID.
|
||||
- `period_type` (optional, string): `hour` or `day`.
|
||||
- `length` (optional, int): Number of periods to include.
|
||||
- `channel` (optional, string): Filter by channel.
|
||||
- `from_node` (optional, int): Filter by source node ID.
|
||||
- `to_node` (optional, int): Filter by destination node ID.
|
||||
|
||||
Response Example
|
||||
```json
|
||||
{
|
||||
"total_packets": 12345,
|
||||
"total_seen": 67890
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### GET `/api/stats/top`
|
||||
Returns nodes sorted by packets seen, with pagination.
|
||||
|
||||
Query Parameters
|
||||
- `period_type` (optional, string): `hour` or `day`. Default: `day`.
|
||||
- `length` (optional, int): Number of periods to include. Default: `1`.
|
||||
- `channel` (optional, string): Filter by channel.
|
||||
- `limit` (optional, int): Max nodes to return. Default: `20`, max `100`.
|
||||
- `offset` (optional, int): Pagination offset. Default: `0`.
|
||||
|
||||
Response Example
|
||||
```json
|
||||
{
|
||||
"total": 250,
|
||||
"limit": 20,
|
||||
"offset": 0,
|
||||
"nodes": [
|
||||
{
|
||||
"period": "2025-08-08 14:00",
|
||||
"count": 10
|
||||
},
|
||||
{
|
||||
"period": "2025-08-08 15:00",
|
||||
"count": 7
|
||||
"node_id": 1234,
|
||||
"long_name": "Alice",
|
||||
"short_name": "A",
|
||||
"channel": "main",
|
||||
"sent": 100,
|
||||
"seen": 240,
|
||||
"avg": 2.4
|
||||
}
|
||||
// more entries...
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 6. Edges API
|
||||
## 5. Edges API
|
||||
|
||||
### GET `/api/edges`
|
||||
Returns network edges (connections between nodes) based on traceroutes and neighbor info.
|
||||
Traceroute edges are collected over the last 48 hours. Neighbor edges are based on
|
||||
port 71 packets.
|
||||
|
||||
**Query Parameters**
|
||||
- `type` (optional, string): Filter by edge type (`traceroute` or `neighbor`). If omitted, returns both types.
|
||||
Query Parameters
|
||||
- `type` (optional, string): `traceroute` or `neighbor`. If omitted, returns both.
|
||||
- `node_id` (optional, int): Filter edges to only those touching a node.
|
||||
|
||||
**Response Example**
|
||||
Response Example
|
||||
```json
|
||||
{
|
||||
"edges": [
|
||||
{
|
||||
"from": 12345678,
|
||||
"to": 87654321,
|
||||
"type": "traceroute"
|
||||
},
|
||||
{
|
||||
"from": 11111111,
|
||||
"to": 22222222,
|
||||
"type": "neighbor"
|
||||
}
|
||||
{ "from": 12345678, "to": 87654321, "type": "traceroute" },
|
||||
{ "from": 11111111, "to": 22222222, "type": "neighbor" }
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 7. Configuration API
|
||||
## 6. Config API
|
||||
|
||||
### GET `/api/config`
|
||||
Returns the current site configuration (safe subset exposed to clients).
|
||||
Returns a safe subset of server configuration.
|
||||
|
||||
**Response Example**
|
||||
Response Example
|
||||
```json
|
||||
{
|
||||
"site": {
|
||||
"domain": "meshview.example.com",
|
||||
"domain": "example.com",
|
||||
"language": "en",
|
||||
"title": "Bay Area Mesh",
|
||||
"message": "Real time data from around the bay area",
|
||||
"title": "Meshview",
|
||||
"message": "",
|
||||
"starting": "/chat",
|
||||
"nodes": "true",
|
||||
"conversations": "true",
|
||||
"chat": "true",
|
||||
"everything": "true",
|
||||
"graphs": "true",
|
||||
"stats": "true",
|
||||
@@ -236,11 +249,11 @@ Returns the current site configuration (safe subset exposed to clients).
|
||||
"firehose_interval": 3,
|
||||
"weekly_net_message": "Weekly Mesh check-in message.",
|
||||
"net_tag": "#BayMeshNet",
|
||||
"version": "2.0.8 ~ 10-22-25"
|
||||
"version": "3.0.0"
|
||||
},
|
||||
"mqtt": {
|
||||
"server": "mqtt.bayme.sh",
|
||||
"topics": ["msh/US/bayarea/#"]
|
||||
"server": "mqtt.example.com",
|
||||
"topics": ["msh/region/#"]
|
||||
},
|
||||
"cleanup": {
|
||||
"enabled": "false",
|
||||
@@ -254,91 +267,125 @@ Returns the current site configuration (safe subset exposed to clients).
|
||||
|
||||
---
|
||||
|
||||
## 8. Language/Translations API
|
||||
## 7. Language API
|
||||
|
||||
### GET `/api/lang`
|
||||
Returns translation strings for the UI.
|
||||
Returns translation strings.
|
||||
|
||||
**Query Parameters**
|
||||
- `lang` (optional, string): Language code (e.g., `en`, `es`). Defaults to site language setting.
|
||||
- `section` (optional, string): Specific section to retrieve translations for.
|
||||
Query Parameters
|
||||
- `lang` (optional, string): Language code (e.g., `en`, `es`). Default from config or `en`.
|
||||
- `section` (optional, string): Return only one section (e.g., `nodelist`, `firehose`).
|
||||
|
||||
**Response Example (full)**
|
||||
Response Example
|
||||
```json
|
||||
{
|
||||
"chat": {
|
||||
"title": "Chat",
|
||||
"send": "Send"
|
||||
},
|
||||
"map": {
|
||||
"title": "Map",
|
||||
"zoom_in": "Zoom In"
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**Response Example (section-specific)**
|
||||
Request: `/api/lang?section=chat`
|
||||
```json
|
||||
{
|
||||
"title": "Chat",
|
||||
"send": "Send"
|
||||
"title": "Meshview",
|
||||
"search_placeholder": "Search..."
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 9. Health Check API
|
||||
## 8. Packets Seen API
|
||||
|
||||
### GET `/api/packets_seen/{packet_id}`
|
||||
Returns packet_seen entries for a packet.
|
||||
|
||||
Path Parameters
|
||||
- `packet_id` (required, int): Packet ID.
|
||||
|
||||
Response Example
|
||||
```json
|
||||
{
|
||||
"seen": [
|
||||
{
|
||||
"packet_id": 123,
|
||||
"node_id": 456,
|
||||
"rx_time": "2025-07-22T12:45:00",
|
||||
"hop_limit": 7,
|
||||
"hop_start": 0,
|
||||
"channel": "main",
|
||||
"rx_snr": 5.0,
|
||||
"rx_rssi": -90,
|
||||
"topic": "msh/region/#",
|
||||
"import_time_us": 1736370123456789
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 9. Traceroute API
|
||||
|
||||
### GET `/api/traceroute/{packet_id}`
|
||||
Returns traceroute details and derived paths for a packet.
|
||||
|
||||
Path Parameters
|
||||
- `packet_id` (required, int): Packet ID.
|
||||
|
||||
Response Example
|
||||
```json
|
||||
{
|
||||
"packet": {
|
||||
"id": 123,
|
||||
"from": 111,
|
||||
"to": 222,
|
||||
"channel": "main"
|
||||
},
|
||||
"traceroute_packets": [
|
||||
{
|
||||
"index": 0,
|
||||
"gateway_node_id": 333,
|
||||
"done": true,
|
||||
"forward_hops": [111, 444, 222],
|
||||
"reverse_hops": [222, 444, 111]
|
||||
}
|
||||
],
|
||||
"unique_forward_paths": [
|
||||
{ "path": [111, 444, 222], "count": 2 }
|
||||
],
|
||||
"unique_reverse_paths": [
|
||||
[222, 444, 111]
|
||||
],
|
||||
"winning_paths": [
|
||||
[111, 444, 222]
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 10. Health API
|
||||
|
||||
### GET `/health`
|
||||
Health check endpoint for monitoring, load balancers, and orchestration systems.
|
||||
Returns service health and database status.
|
||||
|
||||
**Response Example (Healthy)**
|
||||
Response Example
|
||||
```json
|
||||
{
|
||||
"status": "healthy",
|
||||
"timestamp": "2025-11-03T14:30:00.123456Z",
|
||||
"timestamp": "2025-07-22T12:45:00+00:00",
|
||||
"version": "3.0.0",
|
||||
"git_revision": "6416978",
|
||||
"git_revision": "abc1234",
|
||||
"database": "connected",
|
||||
"database_size": "853.03 MB",
|
||||
"database_size_bytes": 894468096
|
||||
}
|
||||
```
|
||||
|
||||
**Response Example (Unhealthy)**
|
||||
Status Code: `503 Service Unavailable`
|
||||
```json
|
||||
{
|
||||
"status": "unhealthy",
|
||||
"timestamp": "2025-11-03T14:30:00.123456Z",
|
||||
"version": "2.0.8",
|
||||
"git_revision": "6416978",
|
||||
"database": "disconnected"
|
||||
"database_size": "12.34 MB",
|
||||
"database_size_bytes": 12939444
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 10. Version API
|
||||
## 11. Version API
|
||||
|
||||
### GET `/version`
|
||||
Returns detailed version information including semver, release date, and git revision.
|
||||
Returns version metadata.
|
||||
|
||||
**Response Example**
|
||||
Response Example
|
||||
```json
|
||||
{
|
||||
"version": "2.0.8",
|
||||
"release_date": "2025-10-22",
|
||||
"git_revision": "6416978a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6q",
|
||||
"git_revision_short": "6416978"
|
||||
"version": "3.0.0",
|
||||
"git_revision": "abc1234",
|
||||
"build_time": "2025-11-01T12:00:00+00:00"
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Notes
|
||||
- All timestamps (`import_time`, `last_seen`) are returned in ISO 8601 format.
|
||||
- `portnum` is an integer representing the packet type.
|
||||
- `payload` is always a UTF-8 decoded string.
|
||||
- Node IDs are integers (e.g., `12345678`).
|
||||
|
||||
@@ -3,8 +3,8 @@
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
__version__ = "3.0.1"
|
||||
__release_date__ = "2025-12-4"
|
||||
__version__ = "3.0.2"
|
||||
__release_date__ = "2026-1-9"
|
||||
|
||||
|
||||
def get_git_revision():
|
||||
|
||||
@@ -6,7 +6,7 @@ parser = argparse.ArgumentParser(description="MeshView Configuration Loader")
|
||||
parser.add_argument(
|
||||
"--config", type=str, default="config.ini", help="Path to config.ini file (default: config.ini)"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
args, _ = parser.parse_known_args()
|
||||
|
||||
# Initialize config parser
|
||||
config_parser = configparser.ConfigParser()
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
from sqlalchemy.engine.url import make_url
|
||||
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
|
||||
|
||||
from meshview import models
|
||||
@@ -9,10 +10,19 @@ async_session = None
|
||||
def init_database(database_connection_string):
|
||||
global engine, async_session
|
||||
kwargs = {"echo": False}
|
||||
# Ensure SQLite is opened in read-only mode
|
||||
database_connection_string += "?mode=ro"
|
||||
kwargs["connect_args"] = {"uri": True}
|
||||
engine = create_async_engine(database_connection_string, **kwargs)
|
||||
url = make_url(database_connection_string)
|
||||
connect_args = {}
|
||||
|
||||
if url.drivername.startswith("sqlite"):
|
||||
query = dict(url.query)
|
||||
query.setdefault("mode", "ro")
|
||||
url = url.set(query=query)
|
||||
connect_args["uri"] = True
|
||||
|
||||
if connect_args:
|
||||
kwargs["connect_args"] = connect_args
|
||||
|
||||
engine = create_async_engine(url, **kwargs)
|
||||
async_session = async_sessionmaker(
|
||||
bind=engine,
|
||||
class_=AsyncSession,
|
||||
|
||||
@@ -186,19 +186,24 @@ async def create_migration_status_table(engine: AsyncEngine) -> None:
|
||||
text("""
|
||||
CREATE TABLE IF NOT EXISTS migration_status (
|
||||
id INTEGER PRIMARY KEY CHECK (id = 1),
|
||||
in_progress BOOLEAN NOT NULL DEFAULT 0,
|
||||
in_progress BOOLEAN NOT NULL DEFAULT FALSE,
|
||||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||||
)
|
||||
""")
|
||||
)
|
||||
|
||||
# Insert initial row if not exists
|
||||
await conn.execute(
|
||||
result = await conn.execute(
|
||||
text("""
|
||||
INSERT OR IGNORE INTO migration_status (id, in_progress)
|
||||
VALUES (1, 0)
|
||||
SELECT 1 FROM migration_status WHERE id = 1
|
||||
""")
|
||||
)
|
||||
if result.first() is None:
|
||||
await conn.execute(
|
||||
text("""
|
||||
INSERT INTO migration_status (id, in_progress)
|
||||
VALUES (1, FALSE)
|
||||
""")
|
||||
)
|
||||
|
||||
|
||||
async def set_migration_in_progress(engine: AsyncEngine, in_progress: bool) -> None:
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
from datetime import datetime
|
||||
|
||||
from sqlalchemy import BigInteger, ForeignKey, Index, desc
|
||||
from sqlalchemy.ext.asyncio import AsyncAttrs
|
||||
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
|
||||
@@ -22,7 +20,6 @@ class Node(Base):
|
||||
last_lat: Mapped[int] = mapped_column(BigInteger, nullable=True)
|
||||
last_long: Mapped[int] = mapped_column(BigInteger, nullable=True)
|
||||
channel: Mapped[str] = mapped_column(nullable=True)
|
||||
last_update: Mapped[datetime] = mapped_column(nullable=True)
|
||||
first_seen_us: Mapped[int] = mapped_column(BigInteger, nullable=True)
|
||||
last_seen_us: Mapped[int] = mapped_column(BigInteger, nullable=True)
|
||||
|
||||
@@ -33,11 +30,7 @@ class Node(Base):
|
||||
)
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
column.name: getattr(self, column.name)
|
||||
for column in self.__table__.columns
|
||||
if column.name != "last_update"
|
||||
}
|
||||
return {column.name: getattr(self, column.name) for column in self.__table__.columns}
|
||||
|
||||
|
||||
class Packet(Base):
|
||||
@@ -55,17 +48,13 @@ class Packet(Base):
|
||||
overlaps="from_node",
|
||||
)
|
||||
payload: Mapped[bytes] = mapped_column(nullable=True)
|
||||
import_time: Mapped[datetime] = mapped_column(nullable=True)
|
||||
import_time_us: Mapped[int] = mapped_column(BigInteger, nullable=True)
|
||||
channel: Mapped[str] = mapped_column(nullable=True)
|
||||
|
||||
__table_args__ = (
|
||||
Index("idx_packet_from_node_id", "from_node_id"),
|
||||
Index("idx_packet_to_node_id", "to_node_id"),
|
||||
Index("idx_packet_import_time", desc("import_time")),
|
||||
Index("idx_packet_import_time_us", desc("import_time_us")),
|
||||
# Composite index for /top endpoint performance - filters by from_node_id AND import_time
|
||||
Index("idx_packet_from_node_time", "from_node_id", desc("import_time")),
|
||||
Index("idx_packet_from_node_time_us", "from_node_id", desc("import_time_us")),
|
||||
)
|
||||
|
||||
@@ -86,7 +75,6 @@ class PacketSeen(Base):
|
||||
rx_snr: Mapped[float] = mapped_column(nullable=True)
|
||||
rx_rssi: Mapped[int] = mapped_column(nullable=True)
|
||||
topic: Mapped[str] = mapped_column(nullable=True)
|
||||
import_time: Mapped[datetime] = mapped_column(nullable=True)
|
||||
import_time_us: Mapped[int] = mapped_column(BigInteger, nullable=True)
|
||||
|
||||
__table_args__ = (
|
||||
@@ -108,11 +96,7 @@ class Traceroute(Base):
|
||||
gateway_node_id: Mapped[int] = mapped_column(BigInteger, nullable=True)
|
||||
done: Mapped[bool] = mapped_column(nullable=True)
|
||||
route: Mapped[bytes] = mapped_column(nullable=True)
|
||||
import_time: Mapped[datetime] = mapped_column(nullable=True)
|
||||
route_return: Mapped[bytes] = mapped_column(nullable=True)
|
||||
import_time_us: Mapped[int] = mapped_column(BigInteger, nullable=True)
|
||||
|
||||
__table_args__ = (
|
||||
Index("idx_traceroute_import_time", "import_time"),
|
||||
Index("idx_traceroute_import_time_us", "import_time_us"),
|
||||
)
|
||||
__table_args__ = (Index("idx_traceroute_import_time_us", "import_time_us"),)
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
from sqlalchemy.engine.url import make_url
|
||||
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
|
||||
|
||||
from meshview import models
|
||||
@@ -5,9 +6,11 @@ from meshview import models
|
||||
|
||||
def init_database(database_connection_string):
|
||||
global engine, async_session
|
||||
engine = create_async_engine(
|
||||
database_connection_string, echo=False, connect_args={"timeout": 900}
|
||||
)
|
||||
url = make_url(database_connection_string)
|
||||
kwargs = {"echo": False}
|
||||
if url.drivername.startswith("sqlite"):
|
||||
kwargs["connect_args"] = {"timeout": 900}
|
||||
engine = create_async_engine(url, **kwargs)
|
||||
async_session = async_sessionmaker(engine, expire_on_commit=False)
|
||||
|
||||
|
||||
|
||||
@@ -1,8 +1,12 @@
|
||||
import datetime
|
||||
import logging
|
||||
import re
|
||||
import time
|
||||
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
||||
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
|
||||
from meshtastic.protobuf.config_pb2 import Config
|
||||
from meshtastic.protobuf.mesh_pb2 import HardwareModel
|
||||
@@ -10,6 +14,8 @@ from meshtastic.protobuf.portnums_pb2 import PortNum
|
||||
from meshview import decode_payload, mqtt_database
|
||||
from meshview.models import Node, Packet, PacketSeen, Traceroute
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def process_envelope(topic, env):
|
||||
# MAP_REPORT_APP
|
||||
@@ -37,8 +43,7 @@ async def process_envelope(topic, env):
|
||||
await session.execute(select(Node).where(Node.node_id == node_id))
|
||||
).scalar_one_or_none()
|
||||
|
||||
now = datetime.datetime.now(datetime.UTC)
|
||||
now_us = int(now.timestamp() * 1_000_000)
|
||||
now_us = int(time.time() * 1_000_000)
|
||||
|
||||
if node:
|
||||
node.node_id = node_id
|
||||
@@ -50,7 +55,6 @@ async def process_envelope(topic, env):
|
||||
node.last_lat = map_report.latitude_i
|
||||
node.last_long = map_report.longitude_i
|
||||
node.firmware = map_report.firmware_version
|
||||
node.last_update = now
|
||||
node.last_seen_us = now_us
|
||||
if node.first_seen_us is None:
|
||||
node.first_seen_us = now_us
|
||||
@@ -66,7 +70,6 @@ async def process_envelope(topic, env):
|
||||
firmware=map_report.firmware_version,
|
||||
last_lat=map_report.latitude_i,
|
||||
last_long=map_report.longitude_i,
|
||||
last_update=now,
|
||||
first_seen_us=now_us,
|
||||
last_seen_us=now_us,
|
||||
)
|
||||
@@ -84,24 +87,41 @@ async def process_envelope(topic, env):
|
||||
result = await session.execute(select(Packet).where(Packet.id == env.packet.id))
|
||||
packet = result.scalar_one_or_none()
|
||||
if not packet:
|
||||
|
||||
now = datetime.datetime.now(datetime.UTC)
|
||||
now_us = int(now.timestamp() * 1_000_000)
|
||||
stmt = (
|
||||
sqlite_insert(Packet)
|
||||
.values(
|
||||
id=env.packet.id,
|
||||
portnum=env.packet.decoded.portnum,
|
||||
from_node_id=getattr(env.packet, "from"),
|
||||
to_node_id=env.packet.to,
|
||||
payload=env.packet.SerializeToString(),
|
||||
import_time=now,
|
||||
import_time_us=now_us,
|
||||
channel=env.channel_id,
|
||||
now_us = int(time.time() * 1_000_000)
|
||||
packet_values = {
|
||||
"id": env.packet.id,
|
||||
"portnum": env.packet.decoded.portnum,
|
||||
"from_node_id": getattr(env.packet, "from"),
|
||||
"to_node_id": env.packet.to,
|
||||
"payload": env.packet.SerializeToString(),
|
||||
"import_time_us": now_us,
|
||||
"channel": env.channel_id,
|
||||
}
|
||||
utc_time = datetime.datetime.fromtimestamp(now_us / 1_000_000, datetime.UTC)
|
||||
dialect = session.get_bind().dialect.name
|
||||
stmt = None
|
||||
if dialect == "sqlite":
|
||||
stmt = (
|
||||
sqlite_insert(Packet)
|
||||
.values(**packet_values)
|
||||
.on_conflict_do_nothing(index_elements=["id"])
|
||||
)
|
||||
.on_conflict_do_nothing(index_elements=["id"])
|
||||
)
|
||||
await session.execute(stmt)
|
||||
elif dialect == "postgresql":
|
||||
stmt = (
|
||||
pg_insert(Packet)
|
||||
.values(**packet_values)
|
||||
.on_conflict_do_nothing(index_elements=["id"])
|
||||
)
|
||||
|
||||
if stmt is not None:
|
||||
await session.execute(stmt)
|
||||
else:
|
||||
try:
|
||||
async with session.begin_nested():
|
||||
session.add(Packet(**packet_values))
|
||||
await session.flush()
|
||||
except IntegrityError:
|
||||
pass
|
||||
|
||||
# --- PacketSeen (no conflict handling here, normal insert)
|
||||
|
||||
@@ -120,8 +140,7 @@ async def process_envelope(topic, env):
|
||||
)
|
||||
)
|
||||
if not result.scalar_one_or_none():
|
||||
now = datetime.datetime.now(datetime.UTC)
|
||||
now_us = int(now.timestamp() * 1_000_000)
|
||||
now_us = int(time.time() * 1_000_000)
|
||||
seen = PacketSeen(
|
||||
packet_id=env.packet.id,
|
||||
node_id=int(env.gateway_id[1:], 16),
|
||||
@@ -132,7 +151,6 @@ async def process_envelope(topic, env):
|
||||
hop_limit=env.packet.hop_limit,
|
||||
hop_start=env.packet.hop_start,
|
||||
topic=topic,
|
||||
import_time=now,
|
||||
import_time_us=now_us,
|
||||
)
|
||||
session.add(seen)
|
||||
@@ -164,8 +182,7 @@ async def process_envelope(topic, env):
|
||||
await session.execute(select(Node).where(Node.id == user.id))
|
||||
).scalar_one_or_none()
|
||||
|
||||
now = datetime.datetime.now(datetime.UTC)
|
||||
now_us = int(now.timestamp() * 1_000_000)
|
||||
now_us = int(time.time() * 1_000_000)
|
||||
|
||||
if node:
|
||||
node.node_id = node_id
|
||||
@@ -174,7 +191,6 @@ async def process_envelope(topic, env):
|
||||
node.hw_model = hw_model
|
||||
node.role = role
|
||||
node.channel = env.channel_id
|
||||
node.last_update = now
|
||||
node.last_seen_us = now_us
|
||||
if node.first_seen_us is None:
|
||||
node.first_seen_us = now_us
|
||||
@@ -187,7 +203,6 @@ async def process_envelope(topic, env):
|
||||
hw_model=hw_model,
|
||||
role=role,
|
||||
channel=env.channel_id,
|
||||
last_update=now,
|
||||
first_seen_us=now_us,
|
||||
last_seen_us=now_us,
|
||||
)
|
||||
@@ -206,11 +221,9 @@ async def process_envelope(topic, env):
|
||||
await session.execute(select(Node).where(Node.node_id == from_node_id))
|
||||
).scalar_one_or_none()
|
||||
if node:
|
||||
now = datetime.datetime.now(datetime.UTC)
|
||||
now_us = int(now.timestamp() * 1_000_000)
|
||||
now_us = int(time.time() * 1_000_000)
|
||||
node.last_lat = position.latitude_i
|
||||
node.last_long = position.longitude_i
|
||||
node.last_update = now
|
||||
node.last_seen_us = now_us
|
||||
if node.first_seen_us is None:
|
||||
node.first_seen_us = now_us
|
||||
@@ -220,18 +233,15 @@ async def process_envelope(topic, env):
|
||||
if env.packet.decoded.portnum == PortNum.TRACEROUTE_APP:
|
||||
packet_id = env.packet.id
|
||||
if packet_id is not None:
|
||||
now = datetime.datetime.now(datetime.UTC)
|
||||
now_us = int(now.timestamp() * 1_000_000)
|
||||
now_us = int(time.time() * 1_000_000)
|
||||
session.add(
|
||||
Traceroute(
|
||||
packet_id=packet_id,
|
||||
route=env.packet.decoded.payload,
|
||||
done=not env.packet.decoded.want_response,
|
||||
gateway_node_id=int(env.gateway_id[1:], 16),
|
||||
import_time=now,
|
||||
import_time_us=now_us,
|
||||
)
|
||||
)
|
||||
|
||||
await session.commit()
|
||||
|
||||
|
||||
@@ -75,8 +75,8 @@ body { margin: 0; font-family: monospace; background: #121212; color: #eee; }
|
||||
return color;
|
||||
}
|
||||
|
||||
function timeAgo(dateStr){
|
||||
const diff = Date.now() - new Date(dateStr);
|
||||
function timeAgoFromUs(us){
|
||||
const diff = Date.now() - (us / 1000);
|
||||
const s=Math.floor(diff/1000), m=Math.floor(s/60), h=Math.floor(m/60), d=Math.floor(h/24);
|
||||
if(d>0) return d+'d'; if(h>0) return h+'h'; if(m>0) return m+'m'; return s+'s';
|
||||
}
|
||||
@@ -118,7 +118,7 @@ body { margin: 0; font-family: monospace; background: #121212; color: #eee; }
|
||||
<b>Channel:</b> ${node.channel}<br>
|
||||
<b>Model:</b> ${node.hw_model}<br>
|
||||
<b>Role:</b> ${node.role}<br>`;
|
||||
if(node.last_update) popupContent+=`<b>Last seen:</b> ${timeAgo(node.last_update)}<br>`;
|
||||
if(node.last_seen_us) popupContent+=`<b>Last seen:</b> ${timeAgoFromUs(node.last_seen_us)}<br>`;
|
||||
if(node.firmware) popupContent+=`<b>Firmware:</b> ${node.firmware}<br>`;
|
||||
|
||||
marker.on('click', e=>{
|
||||
|
||||
36
meshview/static/portmaps.js
Normal file
36
meshview/static/portmaps.js
Normal file
@@ -0,0 +1,36 @@
|
||||
// Shared port label/color definitions for UI pages.
|
||||
window.PORT_LABEL_MAP = {
|
||||
0: "UNKNOWN",
|
||||
1: "Text",
|
||||
3: "Position",
|
||||
4: "Node Info",
|
||||
5: "Routing",
|
||||
6: "Admin",
|
||||
8: "Waypoint",
|
||||
35: "Store Forward++",
|
||||
65: "Store & Forward",
|
||||
67: "Telemetry",
|
||||
70: "Traceroute",
|
||||
71: "Neighbor",
|
||||
73: "Map Report",
|
||||
};
|
||||
|
||||
window.PORT_COLOR_MAP = {
|
||||
0: "#6c757d",
|
||||
1: "#007bff",
|
||||
3: "#28a745",
|
||||
4: "#ffc107",
|
||||
5: "#dc3545",
|
||||
6: "#20c997",
|
||||
8: "#fd7e14",
|
||||
35: "#8bc34a",
|
||||
65: "#6610f2",
|
||||
67: "#17a2b8",
|
||||
70: "#ff4444",
|
||||
71: "#ff66cc",
|
||||
73: "#9999ff",
|
||||
};
|
||||
|
||||
// Aliases for pages that expect different names.
|
||||
window.PORT_MAP = window.PORT_LABEL_MAP;
|
||||
window.PORT_COLORS = window.PORT_COLOR_MAP;
|
||||
@@ -1,10 +1,14 @@
|
||||
from datetime import datetime, timedelta
|
||||
from sqlalchemy import select, and_, or_, func, cast, Text
|
||||
import logging
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
from sqlalchemy import Text, and_, cast, func, or_, select
|
||||
from sqlalchemy.orm import lazyload
|
||||
|
||||
from meshview import database, models
|
||||
from meshview.models import Node, Packet, PacketSeen, Traceroute
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def get_node(node_id):
|
||||
async with database.async_session() as session:
|
||||
@@ -91,8 +95,10 @@ async def get_packets_from(node_id=None, portnum=None, since=None, limit=500):
|
||||
if portnum:
|
||||
q = q.where(Packet.portnum == portnum)
|
||||
if since:
|
||||
q = q.where(Packet.import_time > (datetime.now() - since))
|
||||
result = await session.execute(q.limit(limit).order_by(Packet.import_time.desc()))
|
||||
now_us = int(datetime.now().timestamp() * 1_000_000)
|
||||
start_us = now_us - int(since.total_seconds() * 1_000_000)
|
||||
q = q.where(Packet.import_time_us > start_us)
|
||||
result = await session.execute(q.limit(limit).order_by(Packet.import_time_us.desc()))
|
||||
return result.scalars()
|
||||
|
||||
|
||||
@@ -108,7 +114,7 @@ async def get_packets_seen(packet_id):
|
||||
result = await session.execute(
|
||||
select(PacketSeen)
|
||||
.where(PacketSeen.packet_id == packet_id)
|
||||
.order_by(PacketSeen.import_time.desc())
|
||||
.order_by(PacketSeen.import_time_us.desc())
|
||||
)
|
||||
return result.scalars()
|
||||
|
||||
@@ -129,18 +135,21 @@ async def get_traceroute(packet_id):
|
||||
result = await session.execute(
|
||||
select(Traceroute)
|
||||
.where(Traceroute.packet_id == packet_id)
|
||||
.order_by(Traceroute.import_time)
|
||||
.order_by(Traceroute.import_time_us)
|
||||
)
|
||||
return result.scalars()
|
||||
|
||||
|
||||
async def get_traceroutes(since):
|
||||
if isinstance(since, datetime):
|
||||
since_us = int(since.timestamp() * 1_000_000)
|
||||
else:
|
||||
since_us = int(since)
|
||||
async with database.async_session() as session:
|
||||
stmt = (
|
||||
select(Traceroute)
|
||||
.join(Packet)
|
||||
.where(Traceroute.import_time > since)
|
||||
.order_by(Traceroute.import_time)
|
||||
.where(Traceroute.import_time_us > since_us)
|
||||
.order_by(Traceroute.import_time_us)
|
||||
)
|
||||
stream = await session.stream_scalars(stmt)
|
||||
async for tr in stream:
|
||||
@@ -148,6 +157,8 @@ async def get_traceroutes(since):
|
||||
|
||||
|
||||
async def get_mqtt_neighbors(since):
|
||||
now_us = int(datetime.now().timestamp() * 1_000_000)
|
||||
start_us = now_us - int(since.total_seconds() * 1_000_000)
|
||||
async with database.async_session() as session:
|
||||
result = await session.execute(
|
||||
select(PacketSeen, Packet)
|
||||
@@ -155,7 +166,7 @@ async def get_mqtt_neighbors(since):
|
||||
.where(
|
||||
(PacketSeen.hop_limit == PacketSeen.hop_start)
|
||||
& (PacketSeen.hop_start != 0)
|
||||
& (PacketSeen.import_time > (datetime.now() - since))
|
||||
& (PacketSeen.import_time_us > start_us)
|
||||
)
|
||||
.options(
|
||||
lazyload(Packet.from_node),
|
||||
@@ -168,9 +179,9 @@ async def get_mqtt_neighbors(since):
|
||||
async def get_total_node_count(channel: str = None) -> int:
|
||||
try:
|
||||
async with database.async_session() as session:
|
||||
q = select(func.count(Node.id)).where(
|
||||
Node.last_update > datetime.now() - timedelta(days=1)
|
||||
)
|
||||
now_us = int(datetime.now(timezone.utc).timestamp() * 1_000_000)
|
||||
cutoff_us = now_us - 86400 * 1_000_000
|
||||
q = select(func.count(Node.id)).where(Node.last_seen_us > cutoff_us)
|
||||
|
||||
if channel:
|
||||
q = q.where(Node.channel == channel)
|
||||
@@ -185,26 +196,32 @@ async def get_total_node_count(channel: str = None) -> int:
|
||||
async def get_top_traffic_nodes():
|
||||
try:
|
||||
async with database.async_session() as session:
|
||||
result = await session.execute(
|
||||
text("""
|
||||
SELECT
|
||||
n.node_id,
|
||||
n.long_name,
|
||||
n.short_name,
|
||||
n.channel,
|
||||
COUNT(DISTINCT p.id) AS total_packets_sent,
|
||||
COUNT(ps.packet_id) AS total_times_seen
|
||||
FROM node n
|
||||
LEFT JOIN packet p ON n.node_id = p.from_node_id
|
||||
AND p.import_time >= DATETIME('now', 'localtime', '-24 hours')
|
||||
LEFT JOIN packet_seen ps ON p.id = ps.packet_id
|
||||
GROUP BY n.node_id, n.long_name, n.short_name
|
||||
HAVING total_packets_sent > 0
|
||||
ORDER BY total_times_seen DESC;
|
||||
""")
|
||||
now_us = int(datetime.now(timezone.utc).timestamp() * 1_000_000)
|
||||
cutoff_us = now_us - 86400 * 1_000_000
|
||||
total_packets_sent = func.count(func.distinct(Packet.id)).label("total_packets_sent")
|
||||
total_times_seen = func.count(PacketSeen.packet_id).label("total_times_seen")
|
||||
|
||||
stmt = (
|
||||
select(
|
||||
Node.node_id,
|
||||
Node.long_name,
|
||||
Node.short_name,
|
||||
Node.channel,
|
||||
total_packets_sent,
|
||||
total_times_seen,
|
||||
)
|
||||
.select_from(Node)
|
||||
.outerjoin(
|
||||
Packet,
|
||||
(Packet.from_node_id == Node.node_id) & (Packet.import_time_us >= cutoff_us),
|
||||
)
|
||||
.outerjoin(PacketSeen, PacketSeen.packet_id == Packet.id)
|
||||
.group_by(Node.node_id, Node.long_name, Node.short_name, Node.channel)
|
||||
.having(total_packets_sent > 0)
|
||||
.order_by(total_times_seen.desc())
|
||||
)
|
||||
|
||||
rows = result.fetchall()
|
||||
rows = (await session.execute(stmt)).all()
|
||||
|
||||
nodes = [
|
||||
{
|
||||
@@ -227,33 +244,30 @@ async def get_top_traffic_nodes():
|
||||
async def get_node_traffic(node_id: int):
|
||||
try:
|
||||
async with database.async_session() as session:
|
||||
result = await session.execute(
|
||||
text("""
|
||||
SELECT
|
||||
node.long_name, packet.portnum,
|
||||
COUNT(*) AS packet_count
|
||||
FROM packet
|
||||
JOIN node ON packet.from_node_id = node.node_id
|
||||
WHERE node.node_id = :node_id
|
||||
AND packet.import_time >= DATETIME('now', 'localtime', '-24 hours')
|
||||
GROUP BY packet.portnum
|
||||
ORDER BY packet_count DESC;
|
||||
"""),
|
||||
{"node_id": node_id},
|
||||
now_us = int(datetime.now(timezone.utc).timestamp() * 1_000_000)
|
||||
cutoff_us = now_us - 86400 * 1_000_000
|
||||
packet_count = func.count().label("packet_count")
|
||||
|
||||
stmt = (
|
||||
select(Node.long_name, Packet.portnum, packet_count)
|
||||
.select_from(Packet)
|
||||
.join(Node, Packet.from_node_id == Node.node_id)
|
||||
.where(Node.node_id == node_id)
|
||||
.where(Packet.import_time_us >= cutoff_us)
|
||||
.group_by(Node.long_name, Packet.portnum)
|
||||
.order_by(packet_count.desc())
|
||||
)
|
||||
|
||||
# Map the result to include node.long_name and packet data
|
||||
traffic_data = [
|
||||
result = await session.execute(stmt)
|
||||
return [
|
||||
{
|
||||
"long_name": row[0], # node.long_name
|
||||
"portnum": row[1], # packet.portnum
|
||||
"packet_count": row[2], # COUNT(*) as packet_count
|
||||
"long_name": row.long_name,
|
||||
"portnum": row.portnum,
|
||||
"packet_count": row.packet_count,
|
||||
}
|
||||
for row in result.all()
|
||||
]
|
||||
|
||||
return traffic_data
|
||||
|
||||
except Exception as e:
|
||||
# Log the error or handle it as needed
|
||||
print(f"Error fetching node traffic: {str(e)}")
|
||||
@@ -282,7 +296,11 @@ async def get_nodes(node_id=None, role=None, channel=None, hw_model=None, days_a
|
||||
|
||||
# Apply filters based on provided parameters
|
||||
if node_id is not None:
|
||||
query = query.where(Node.node_id == node_id)
|
||||
try:
|
||||
node_id_int = int(node_id)
|
||||
except (TypeError, ValueError):
|
||||
node_id_int = node_id
|
||||
query = query.where(Node.node_id == node_id_int)
|
||||
if role is not None:
|
||||
query = query.where(Node.role == role.upper()) # Ensure role is uppercase
|
||||
if channel is not None:
|
||||
@@ -291,10 +309,12 @@ async def get_nodes(node_id=None, role=None, channel=None, hw_model=None, days_a
|
||||
query = query.where(Node.hw_model == hw_model)
|
||||
|
||||
if days_active is not None:
|
||||
query = query.where(Node.last_update > datetime.now() - timedelta(days_active))
|
||||
now_us = int(datetime.now(timezone.utc).timestamp() * 1_000_000)
|
||||
cutoff_us = now_us - int(timedelta(days_active).total_seconds() * 1_000_000)
|
||||
query = query.where(Node.last_seen_us > cutoff_us)
|
||||
|
||||
# Exclude nodes where last_update is an empty string
|
||||
query = query.where(Node.last_update != "")
|
||||
# Exclude nodes with missing last_seen_us
|
||||
query = query.where(Node.last_seen_us.is_not(None))
|
||||
|
||||
# Order results by long_name in ascending order
|
||||
query = query.order_by(Node.short_name.asc())
|
||||
@@ -305,7 +325,7 @@ async def get_nodes(node_id=None, role=None, channel=None, hw_model=None, days_a
|
||||
return nodes # Return the list of nodes
|
||||
|
||||
except Exception:
|
||||
print("error reading DB") # Consider using logging instead of print
|
||||
logger.exception("error reading DB")
|
||||
return [] # Return an empty list in case of failure
|
||||
|
||||
|
||||
@@ -317,22 +337,36 @@ async def get_packet_stats(
|
||||
to_node: int | None = None,
|
||||
from_node: int | None = None,
|
||||
):
|
||||
now = datetime.now()
|
||||
now = datetime.now(timezone.utc)
|
||||
|
||||
if period_type == "hour":
|
||||
start_time = now - timedelta(hours=length)
|
||||
time_format = '%Y-%m-%d %H:00'
|
||||
time_format_sqlite = "%Y-%m-%d %H:00"
|
||||
time_format_pg = "YYYY-MM-DD HH24:00"
|
||||
elif period_type == "day":
|
||||
start_time = now - timedelta(days=length)
|
||||
time_format = '%Y-%m-%d'
|
||||
time_format_sqlite = "%Y-%m-%d"
|
||||
time_format_pg = "YYYY-MM-DD"
|
||||
else:
|
||||
raise ValueError("period_type must be 'hour' or 'day'")
|
||||
|
||||
async with database.async_session() as session:
|
||||
dialect = session.get_bind().dialect.name
|
||||
if dialect == "postgresql":
|
||||
period_expr = func.to_char(
|
||||
func.to_timestamp(Packet.import_time_us / 1_000_000.0),
|
||||
time_format_pg,
|
||||
)
|
||||
else:
|
||||
period_expr = func.strftime(
|
||||
time_format_sqlite,
|
||||
func.datetime(Packet.import_time_us / 1_000_000, "unixepoch"),
|
||||
)
|
||||
|
||||
q = select(
|
||||
func.strftime(time_format, Packet.import_time).label('period'),
|
||||
func.count().label('count'),
|
||||
).where(Packet.import_time >= start_time)
|
||||
period_expr.label("period"),
|
||||
func.count().label("count"),
|
||||
).where(Packet.import_time_us >= int(start_time.timestamp() * 1_000_000))
|
||||
|
||||
# Filters
|
||||
if channel:
|
||||
|
||||
@@ -115,6 +115,7 @@
|
||||
|
||||
</div>
|
||||
|
||||
<script src="/static/portmaps.js"></script>
|
||||
<script>
|
||||
/* ======================================================
|
||||
FIREHOSE TRANSLATION SYSTEM (isolated from base)
|
||||
@@ -177,41 +178,8 @@ function nodeName(id) {
|
||||
/* ======================================================
|
||||
PORT COLORS & NAMES
|
||||
====================================================== */
|
||||
const PORT_MAP = {
|
||||
0: "UNKNOWN APP",
|
||||
1: "Text Message",
|
||||
3: "Position",
|
||||
4: "Node Info",
|
||||
5: "Routing",
|
||||
6: "Administration",
|
||||
8: "Waypoint",
|
||||
65: "Store Forward",
|
||||
67: "Telemetry",
|
||||
70: "Trace Route",
|
||||
71: "Neighbor Info"
|
||||
};
|
||||
|
||||
const PORT_COLORS = {
|
||||
0: "#6c757d",
|
||||
1: "#007bff",
|
||||
3: "#28a745",
|
||||
4: "#ffc107",
|
||||
5: "#dc3545",
|
||||
6: "#20c997",
|
||||
65: "#6610f2",
|
||||
67: "#17a2b8",
|
||||
68: "#fd7e14",
|
||||
69: "#6f42c1",
|
||||
70: "#ff4444",
|
||||
71: "#ff66cc",
|
||||
72: "#00cc99",
|
||||
73: "#9999ff",
|
||||
74: "#cc00cc",
|
||||
75: "#ffbb33",
|
||||
76: "#00bcd4",
|
||||
77: "#8bc34a",
|
||||
78: "#795548"
|
||||
};
|
||||
const PORT_MAP = window.PORT_MAP || {};
|
||||
const PORT_COLORS = window.PORT_COLORS || {};
|
||||
|
||||
function portLabel(portnum, payload, linksHtml) {
|
||||
const name = PORT_MAP[portnum] || "Unknown";
|
||||
@@ -233,13 +201,37 @@ function portLabel(portnum, payload, linksHtml) {
|
||||
/* ======================================================
|
||||
TIME FORMAT
|
||||
====================================================== */
|
||||
function formatLocalTime(importTimeUs) {
|
||||
const ms = importTimeUs / 1000;
|
||||
return new Date(ms).toLocaleTimeString([], {
|
||||
function formatTimes(importTimeUs) {
|
||||
const ms = Number(importTimeUs) / 1000;
|
||||
if (!Number.isFinite(ms)) {
|
||||
return { local: "—", utc: "—", epoch: "—" };
|
||||
}
|
||||
const date = new Date(ms);
|
||||
const local = date.toLocaleTimeString([], {
|
||||
hour: "2-digit",
|
||||
minute: "2-digit",
|
||||
second: "2-digit"
|
||||
second: "2-digit",
|
||||
timeZoneName: "short"
|
||||
});
|
||||
const utc = date.toLocaleTimeString([], {
|
||||
hour: "2-digit",
|
||||
minute: "2-digit",
|
||||
second: "2-digit",
|
||||
timeZone: "UTC",
|
||||
timeZoneName: "short"
|
||||
});
|
||||
return { local, utc, epoch: String(importTimeUs) };
|
||||
}
|
||||
|
||||
function logPacketTimes(packet) {
|
||||
const times = formatTimes(packet.import_time_us);
|
||||
console.log(
|
||||
"[firehose] packet time",
|
||||
"id=" + packet.id,
|
||||
"epoch_us=" + times.epoch,
|
||||
"local=" + times.local,
|
||||
"utc=" + times.utc
|
||||
);
|
||||
}
|
||||
|
||||
/* ======================================================
|
||||
@@ -261,7 +253,7 @@ async function fetchUpdates() {
|
||||
if (updatesPaused) return;
|
||||
|
||||
const url = new URL("/api/packets", window.location.origin);
|
||||
url.searchParams.set("limit", 50);
|
||||
url.searchParams.set("limit", 100);
|
||||
|
||||
if (lastImportTimeUs)
|
||||
url.searchParams.set("since", lastImportTimeUs);
|
||||
@@ -277,6 +269,7 @@ async function fetchUpdates() {
|
||||
const list = document.getElementById("packet_list");
|
||||
|
||||
for (const pkt of packets.reverse()) {
|
||||
logPacketTimes(pkt);
|
||||
|
||||
/* FROM — includes translation */
|
||||
const from =
|
||||
@@ -336,7 +329,9 @@ async function fetchUpdates() {
|
||||
const html = `
|
||||
<tr class="packet-row">
|
||||
|
||||
<td>${formatLocalTime(pkt.import_time_us)}</td>
|
||||
<td>
|
||||
${formatTimes(pkt.import_time_us).local}<br>
|
||||
</td>
|
||||
|
||||
<td>
|
||||
<span class="toggle-btn">▶</span>
|
||||
|
||||
@@ -140,8 +140,8 @@ map.on("popupopen", function (e) {
|
||||
if (popupEl) applyTranslationsMap(popupEl);
|
||||
});
|
||||
|
||||
function timeAgo(date){
|
||||
const diff = Date.now() - new Date(date);
|
||||
function timeAgoFromUs(us){
|
||||
const diff = Date.now() - (us / 1000);
|
||||
const s = Math.floor(diff/1000), m = Math.floor(s/60),
|
||||
h = Math.floor(m/60), d = Math.floor(h/24);
|
||||
return d>0?d+"d":h>0?h+"h":m>0?m+"m":s+"s";
|
||||
@@ -289,7 +289,7 @@ fetch('/api/nodes?days_active=3')
|
||||
hw_model: n.hw_model || "",
|
||||
role: n.role || "",
|
||||
firmware: n.firmware || "",
|
||||
last_update: n.last_update || "",
|
||||
last_seen_us: n.last_seen_us || null,
|
||||
isRouter: (n.role||"").toLowerCase().includes("router")
|
||||
}));
|
||||
|
||||
@@ -333,8 +333,8 @@ function renderNodesOnMap(){
|
||||
<b data-translate-lang="role_label"></b> ${node.role}<br>
|
||||
|
||||
${
|
||||
node.last_update
|
||||
? `<b data-translate-lang="last_seen"></b> ${timeAgo(node.last_update)}<br>`
|
||||
node.last_seen_us
|
||||
? `<b data-translate-lang="last_seen"></b> ${timeAgoFromUs(node.last_seen_us)}<br>`
|
||||
: ""
|
||||
}
|
||||
|
||||
|
||||
@@ -141,7 +141,7 @@ document.addEventListener("DOMContentLoaded", async () => {
|
||||
</span>
|
||||
|
||||
<span class="col-3 nodename">
|
||||
<a href="/packet_list/${packet.from_node_id}">
|
||||
<a href="/node/${packet.from_node_id}">
|
||||
${escapeHtml(fromName)}
|
||||
</a>
|
||||
</span>
|
||||
|
||||
@@ -285,34 +285,12 @@
|
||||
</div>
|
||||
|
||||
<script src="https://cdn.jsdelivr.net/npm/echarts@5.5.0/dist/echarts.min.js"></script>
|
||||
<script src="/static/portmaps.js"></script>
|
||||
|
||||
<script>
|
||||
|
||||
const PORT_COLOR_MAP = {
|
||||
0: "#6c757d",
|
||||
1: "#007bff",
|
||||
3: "#28a745",
|
||||
4: "#ffc107",
|
||||
5: "#dc3545",
|
||||
6: "#20c997",
|
||||
65: "#6610f2",
|
||||
67: "#17a2b8",
|
||||
70: "#ff9800",
|
||||
71: "#ff66cc",
|
||||
};
|
||||
|
||||
const PORT_LABEL_MAP = {
|
||||
0: "UNKNOWN",
|
||||
1: "Text",
|
||||
3: "Position",
|
||||
4: "Node Info",
|
||||
5: "Routing",
|
||||
6: "Admin",
|
||||
65: "Store & Forward",
|
||||
67: "Telemetry",
|
||||
70: "Traceroute",
|
||||
71: "Neighbor"
|
||||
};
|
||||
const PORT_COLOR_MAP = window.PORT_COLOR_MAP || {};
|
||||
const PORT_LABEL_MAP = window.PORT_LABEL_MAP || {};
|
||||
|
||||
/* ======================================================
|
||||
NODE PAGE TRANSLATION (isolated from base)
|
||||
|
||||
@@ -1,7 +1,10 @@
|
||||
"""Main web server routes and page rendering for Meshview."""
|
||||
|
||||
import asyncio
|
||||
import datetime
|
||||
import logging
|
||||
import os
|
||||
import pathlib
|
||||
import re
|
||||
import ssl
|
||||
from dataclasses import dataclass
|
||||
@@ -12,7 +15,7 @@ from google.protobuf import text_format
|
||||
from google.protobuf.message import Message
|
||||
from jinja2 import Environment, PackageLoader, Undefined, select_autoescape
|
||||
from markupsafe import Markup
|
||||
import pathlib
|
||||
|
||||
from meshtastic.protobuf.portnums_pb2 import PortNum
|
||||
from meshview import config, database, decode_payload, migrations, models, store
|
||||
from meshview.__version__ import (
|
||||
@@ -45,6 +48,8 @@ with open(os.path.join(os.path.dirname(__file__), '1x1.png'), 'rb') as png:
|
||||
|
||||
@dataclass
|
||||
class Packet:
|
||||
"""UI-friendly packet wrapper for templates and API payloads."""
|
||||
|
||||
id: int
|
||||
from_node_id: int
|
||||
from_node: models.Node
|
||||
@@ -56,11 +61,11 @@ class Packet:
|
||||
raw_payload: object
|
||||
payload: str
|
||||
pretty_payload: Markup
|
||||
import_time: datetime.datetime
|
||||
import_time_us: int
|
||||
|
||||
@classmethod
|
||||
def from_model(cls, packet):
|
||||
"""Convert a Packet ORM model into a presentation-friendly Packet."""
|
||||
mesh_packet, payload = decode_payload.decode(packet)
|
||||
pretty_payload = None
|
||||
|
||||
@@ -101,7 +106,6 @@ class Packet:
|
||||
data=text_mesh_packet,
|
||||
payload=text_payload, # now always a string
|
||||
pretty_payload=pretty_payload,
|
||||
import_time=packet.import_time,
|
||||
import_time_us=packet.import_time_us, # <-- include microseconds
|
||||
raw_mesh_packet=mesh_packet,
|
||||
raw_payload=payload,
|
||||
@@ -109,6 +113,7 @@ class Packet:
|
||||
|
||||
|
||||
async def build_trace(node_id):
|
||||
"""Build a recent GPS trace list for a node using position packets."""
|
||||
trace = []
|
||||
for raw_p in await store.get_packets_from(
|
||||
node_id, PortNum.POSITION_APP, since=datetime.timedelta(hours=24)
|
||||
@@ -130,6 +135,7 @@ async def build_trace(node_id):
|
||||
|
||||
|
||||
async def build_neighbors(node_id):
|
||||
"""Return neighbor node metadata for the given node ID."""
|
||||
packets = await store.get_packets_from(node_id, PortNum.NEIGHBORINFO_APP, limit=1)
|
||||
packet = packets.first()
|
||||
|
||||
@@ -159,6 +165,7 @@ async def build_neighbors(node_id):
|
||||
|
||||
|
||||
def node_id_to_hex(node_id):
|
||||
"""Format a node_id in Meshtastic hex notation."""
|
||||
if node_id is None or isinstance(node_id, Undefined):
|
||||
return "Invalid node_id" # i... have no clue
|
||||
if node_id == 4294967295:
|
||||
@@ -168,6 +175,7 @@ def node_id_to_hex(node_id):
|
||||
|
||||
|
||||
def format_timestamp(timestamp):
|
||||
"""Normalize timestamps to ISO 8601 strings."""
|
||||
if isinstance(timestamp, int):
|
||||
timestamp = datetime.datetime.fromtimestamp(timestamp, datetime.UTC)
|
||||
return timestamp.isoformat(timespec="milliseconds")
|
||||
@@ -185,6 +193,7 @@ routes = web.RouteTableDef()
|
||||
|
||||
@routes.get("/")
|
||||
async def index(request):
|
||||
"""Redirect root URL to configured starting page."""
|
||||
"""
|
||||
Redirect root URL '/' to the page specified in CONFIG['site']['starting'].
|
||||
Defaults to '/map' if not set.
|
||||
@@ -194,15 +203,10 @@ async def index(request):
|
||||
raise web.HTTPFound(location=starting_url)
|
||||
|
||||
|
||||
# redirect for backwards compatibility
|
||||
@routes.get("/packet_list/{packet_id}")
|
||||
async def redirect_packet_list(request):
|
||||
packet_id = request.match_info["packet_id"]
|
||||
raise web.HTTPFound(location=f"/node/{packet_id}")
|
||||
|
||||
# Generic static HTML route
|
||||
@routes.get("/{page}")
|
||||
async def serve_page(request):
|
||||
"""Serve static HTML pages from meshview/static."""
|
||||
page = request.match_info["page"]
|
||||
|
||||
# default to index.html if no extension
|
||||
@@ -217,7 +221,6 @@ async def serve_page(request):
|
||||
return web.Response(text=content, content_type="text/html")
|
||||
|
||||
|
||||
|
||||
@routes.get("/net")
|
||||
async def net(request):
|
||||
return web.Response(
|
||||
@@ -352,8 +355,8 @@ async def graph_traceroute(request):
|
||||
# It seems some nodes add them self to the list before uplinking
|
||||
path.append(tr.gateway_node_id)
|
||||
|
||||
if not tr.done and tr.gateway_node_id not in node_seen_time and tr.import_time:
|
||||
node_seen_time[path[-1]] = tr.import_time
|
||||
if not tr.done and tr.gateway_node_id not in node_seen_time and tr.import_time_us:
|
||||
node_seen_time[path[-1]] = tr.import_time_us
|
||||
|
||||
mqtt_nodes.add(tr.gateway_node_id)
|
||||
node_color[path[-1]] = '#' + hex(hash(tuple(path)))[3:9]
|
||||
@@ -363,7 +366,7 @@ async def graph_traceroute(request):
|
||||
for path in paths:
|
||||
used_nodes.update(path)
|
||||
|
||||
import_times = [tr.import_time for tr in traceroutes if tr.import_time]
|
||||
import_times = [tr.import_time_us for tr in traceroutes if tr.import_time_us]
|
||||
if import_times:
|
||||
first_time = min(import_times)
|
||||
else:
|
||||
@@ -378,7 +381,7 @@ async def graph_traceroute(request):
|
||||
f'[{node.short_name}] {node.long_name}\n{node_id_to_hex(node_id)}\n{node.role}'
|
||||
)
|
||||
if node_id in node_seen_time:
|
||||
ms = (node_seen_time[node_id] - first_time).total_seconds() * 1000
|
||||
ms = (node_seen_time[node_id] - first_time) / 1000
|
||||
node_name += f'\n {ms:.2f}ms'
|
||||
style = 'dashed'
|
||||
if node_id == dest:
|
||||
@@ -396,7 +399,7 @@ async def graph_traceroute(request):
|
||||
shape='box',
|
||||
color=node_color.get(node_id, 'black'),
|
||||
style=style,
|
||||
href=f"/packet_list/{node_id}",
|
||||
href=f"/node/{node_id}",
|
||||
)
|
||||
)
|
||||
|
||||
@@ -412,6 +415,7 @@ async def graph_traceroute(request):
|
||||
|
||||
|
||||
async def run_server():
|
||||
"""Start the aiohttp web server after migrations are complete."""
|
||||
# Wait for database migrations to complete before starting web server
|
||||
logger.info("Checking database schema status...")
|
||||
database_url = CONFIG["database"]["connection_string"]
|
||||
@@ -428,6 +432,7 @@ async def run_server():
|
||||
logger.info("Database schema verified - starting web server")
|
||||
|
||||
app = web.Application()
|
||||
app.router.add_static("/static/", pathlib.Path(__file__).parent / "static")
|
||||
app.add_routes(api.routes) # Add API routes
|
||||
app.add_routes(routes) # Add main web routes
|
||||
|
||||
|
||||
@@ -6,12 +6,15 @@ import logging
|
||||
import os
|
||||
|
||||
from aiohttp import web
|
||||
from sqlalchemy import text
|
||||
from sqlalchemy import func, select, text
|
||||
|
||||
from meshtastic.protobuf.portnums_pb2 import PortNum
|
||||
from meshview import database, decode_payload, store
|
||||
from meshview.__version__ import __version__, _git_revision_short, get_version_info
|
||||
from meshview.config import CONFIG
|
||||
from meshview.models import Node
|
||||
from meshview.models import Packet as PacketModel
|
||||
from meshview.models import PacketSeen as PacketSeenModel
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -126,7 +129,6 @@ async def api_packets(request):
|
||||
"portnum": int(p.portnum) if p.portnum is not None else None,
|
||||
"payload": (p.payload or "").strip(),
|
||||
"import_time_us": p.import_time_us,
|
||||
"import_time": p.import_time.isoformat() if p.import_time else None,
|
||||
"channel": getattr(p.from_node, "channel", ""),
|
||||
"long_name": getattr(p.from_node, "long_name", ""),
|
||||
}
|
||||
@@ -208,7 +210,6 @@ async def api_packets(request):
|
||||
packet_dict = {
|
||||
"id": p.id,
|
||||
"import_time_us": p.import_time_us,
|
||||
"import_time": p.import_time.isoformat() if p.import_time else None,
|
||||
"channel": getattr(p.from_node, "channel", ""),
|
||||
"from_node_id": p.from_node_id,
|
||||
"to_node_id": p.to_node_id,
|
||||
@@ -228,20 +229,12 @@ async def api_packets(request):
|
||||
|
||||
packets_data.append(packet_dict)
|
||||
|
||||
# --- Latest import_time for incremental fetch ---
|
||||
# --- Latest import_time_us for incremental fetch ---
|
||||
latest_import_time = None
|
||||
if packets_data:
|
||||
for p in packets_data:
|
||||
if p.get("import_time_us") and p["import_time_us"] > 0:
|
||||
latest_import_time = max(latest_import_time or 0, p["import_time_us"])
|
||||
elif p.get("import_time") and latest_import_time is None:
|
||||
try:
|
||||
dt = datetime.datetime.fromisoformat(
|
||||
p["import_time"].replace("Z", "+00:00")
|
||||
)
|
||||
latest_import_time = int(dt.timestamp() * 1_000_000)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
response = {"packets": packets_data}
|
||||
if latest_import_time is not None:
|
||||
@@ -431,14 +424,10 @@ async def api_edges(request):
|
||||
try:
|
||||
node_filter = int(node_filter_str)
|
||||
except ValueError:
|
||||
return web.json_response(
|
||||
{"error": "node_id must be integer"},
|
||||
status=400
|
||||
)
|
||||
return web.json_response({"error": "node_id must be integer"}, status=400)
|
||||
|
||||
edges = {}
|
||||
traceroute_count = 0
|
||||
neighbor_packet_count = 0
|
||||
edges_added_tr = 0
|
||||
edges_added_neighbor = 0
|
||||
|
||||
@@ -463,8 +452,6 @@ async def api_edges(request):
|
||||
# --- Neighbor edges ---
|
||||
if filter_type in (None, "neighbor"):
|
||||
packets = await store.get_packets(portnum=71)
|
||||
neighbor_packet_count = len(packets)
|
||||
|
||||
for packet in packets:
|
||||
try:
|
||||
_, neighbor_info = decode_payload.decode(packet)
|
||||
@@ -479,21 +466,16 @@ async def api_edges(request):
|
||||
|
||||
# Convert to list
|
||||
edges_list = [
|
||||
{"from": frm, "to": to, "type": edge_type}
|
||||
for (frm, to), edge_type in edges.items()
|
||||
{"from": frm, "to": to, "type": edge_type} for (frm, to), edge_type in edges.items()
|
||||
]
|
||||
|
||||
# NEW → apply node_id filtering
|
||||
if node_filter is not None:
|
||||
edges_list = [
|
||||
e for e in edges_list
|
||||
if e["from"] == node_filter or e["to"] == node_filter
|
||||
]
|
||||
edges_list = [e for e in edges_list if e["from"] == node_filter or e["to"] == node_filter]
|
||||
|
||||
return web.json_response({"edges": edges_list})
|
||||
|
||||
|
||||
|
||||
@routes.get("/api/config")
|
||||
async def api_config(request):
|
||||
try:
|
||||
@@ -711,7 +693,6 @@ async def api_packets_seen(request):
|
||||
"rx_snr": row.rx_snr,
|
||||
"rx_rssi": row.rx_rssi,
|
||||
"topic": row.topic,
|
||||
"import_time": (row.import_time.isoformat() if row.import_time else None),
|
||||
"import_time_us": row.import_time_us,
|
||||
}
|
||||
)
|
||||
@@ -725,6 +706,7 @@ async def api_packets_seen(request):
|
||||
status=500,
|
||||
)
|
||||
|
||||
|
||||
@routes.get("/api/traceroute/{packet_id}")
|
||||
async def api_traceroute(request):
|
||||
packet_id = int(request.match_info['packet_id'])
|
||||
@@ -746,14 +728,15 @@ async def api_traceroute(request):
|
||||
forward_list = list(route.route)
|
||||
reverse_list = list(route.route_back)
|
||||
|
||||
tr_groups.append({
|
||||
"index": idx,
|
||||
"import_time": tr.import_time.isoformat() if tr.import_time else None,
|
||||
"gateway_node_id": tr.gateway_node_id,
|
||||
"done": tr.done,
|
||||
"forward_hops": forward_list,
|
||||
"reverse_hops": reverse_list,
|
||||
})
|
||||
tr_groups.append(
|
||||
{
|
||||
"index": idx,
|
||||
"gateway_node_id": tr.gateway_node_id,
|
||||
"done": tr.done,
|
||||
"forward_hops": forward_list,
|
||||
"reverse_hops": reverse_list,
|
||||
}
|
||||
)
|
||||
|
||||
# --------------------------------------------
|
||||
# Compute UNIQUE paths + counts + winning path
|
||||
@@ -796,18 +779,20 @@ async def api_traceroute(request):
|
||||
# --------------------------------------------
|
||||
# Final API output
|
||||
# --------------------------------------------
|
||||
return web.json_response({
|
||||
"packet": {
|
||||
"id": packet.id,
|
||||
"from": packet.from_node_id,
|
||||
"to": packet.to_node_id,
|
||||
"channel": packet.channel,
|
||||
},
|
||||
"traceroute_packets": tr_groups,
|
||||
"unique_forward_paths": unique_forward_paths_json,
|
||||
"unique_reverse_paths": unique_reverse_paths_json,
|
||||
"winning_paths": winning_paths_json,
|
||||
})
|
||||
return web.json_response(
|
||||
{
|
||||
"packet": {
|
||||
"id": packet.id,
|
||||
"from": packet.from_node_id,
|
||||
"to": packet.to_node_id,
|
||||
"channel": packet.channel,
|
||||
},
|
||||
"traceroute_packets": tr_groups,
|
||||
"unique_forward_paths": unique_forward_paths_json,
|
||||
"unique_reverse_paths": unique_reverse_paths_json,
|
||||
"winning_paths": winning_paths_json,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@routes.get("/api/stats/top")
|
||||
@@ -823,90 +808,75 @@ async def api_stats_top(request):
|
||||
limit = min(int(request.query.get("limit", 20)), 100)
|
||||
offset = int(request.query.get("offset", 0))
|
||||
|
||||
params = {
|
||||
"period_type": period_type,
|
||||
"length": length,
|
||||
"limit": limit,
|
||||
"offset": offset,
|
||||
}
|
||||
multiplier = 3600 if period_type == "hour" else 86400
|
||||
window_us = length * multiplier * 1_000_000
|
||||
|
||||
channel_filter = ""
|
||||
if channel:
|
||||
channel_filter = "AND n.channel = :channel"
|
||||
params["channel"] = channel
|
||||
max_packet_import = select(func.max(PacketModel.import_time_us)).scalar_subquery()
|
||||
max_seen_import = select(func.max(PacketSeenModel.import_time_us)).scalar_subquery()
|
||||
|
||||
sql = f"""
|
||||
WITH sent AS (
|
||||
SELECT
|
||||
p.from_node_id AS node_id,
|
||||
COUNT(*) AS sent
|
||||
FROM packet p
|
||||
WHERE p.import_time_us >= (
|
||||
SELECT MAX(import_time_us) FROM packet
|
||||
) - (
|
||||
CASE
|
||||
WHEN :period_type = 'hour' THEN :length * 3600 * 1000000
|
||||
ELSE :length * 86400 * 1000000
|
||||
END
|
||||
)
|
||||
GROUP BY p.from_node_id
|
||||
),
|
||||
seen AS (
|
||||
SELECT
|
||||
p.from_node_id AS node_id,
|
||||
COUNT(*) AS seen
|
||||
FROM packet_seen ps
|
||||
JOIN packet p ON p.id = ps.packet_id
|
||||
WHERE ps.import_time_us >= (
|
||||
SELECT MAX(import_time_us) FROM packet_seen
|
||||
) - (
|
||||
CASE
|
||||
WHEN :period_type = 'hour' THEN :length * 3600 * 1000000
|
||||
ELSE :length * 86400 * 1000000
|
||||
END
|
||||
)
|
||||
GROUP BY p.from_node_id
|
||||
sent_cte = (
|
||||
select(PacketModel.from_node_id.label("node_id"), func.count().label("sent"))
|
||||
.where(PacketModel.import_time_us >= max_packet_import - window_us)
|
||||
.group_by(PacketModel.from_node_id)
|
||||
.cte("sent")
|
||||
)
|
||||
SELECT
|
||||
n.node_id,
|
||||
n.long_name,
|
||||
n.short_name,
|
||||
n.channel,
|
||||
COALESCE(s.sent, 0) AS sent,
|
||||
COALESCE(se.seen, 0) AS seen
|
||||
FROM node n
|
||||
LEFT JOIN sent s ON s.node_id = n.node_id
|
||||
LEFT JOIN seen se ON se.node_id = n.node_id
|
||||
WHERE 1=1
|
||||
{channel_filter}
|
||||
ORDER BY seen DESC
|
||||
LIMIT :limit OFFSET :offset
|
||||
"""
|
||||
|
||||
count_sql = f"""
|
||||
SELECT COUNT(*) FROM node n WHERE 1=1 {channel_filter}
|
||||
"""
|
||||
seen_cte = (
|
||||
select(PacketModel.from_node_id.label("node_id"), func.count().label("seen"))
|
||||
.select_from(PacketSeenModel)
|
||||
.join(PacketModel, PacketModel.id == PacketSeenModel.packet_id)
|
||||
.where(PacketSeenModel.import_time_us >= max_seen_import - window_us)
|
||||
.group_by(PacketModel.from_node_id)
|
||||
.cte("seen")
|
||||
)
|
||||
|
||||
query = (
|
||||
select(
|
||||
Node.node_id,
|
||||
Node.long_name,
|
||||
Node.short_name,
|
||||
Node.channel,
|
||||
func.coalesce(sent_cte.c.sent, 0).label("sent"),
|
||||
func.coalesce(seen_cte.c.seen, 0).label("seen"),
|
||||
)
|
||||
.select_from(Node)
|
||||
.outerjoin(sent_cte, sent_cte.c.node_id == Node.node_id)
|
||||
.outerjoin(seen_cte, seen_cte.c.node_id == Node.node_id)
|
||||
.order_by(func.coalesce(seen_cte.c.seen, 0).desc())
|
||||
.limit(limit)
|
||||
.offset(offset)
|
||||
)
|
||||
|
||||
count_query = select(func.count()).select_from(Node)
|
||||
|
||||
if channel:
|
||||
query = query.where(Node.channel == channel)
|
||||
count_query = count_query.where(Node.channel == channel)
|
||||
|
||||
async with database.async_session() as session:
|
||||
rows = (await session.execute(text(sql), params)).all()
|
||||
total = (await session.execute(text(count_sql), params)).scalar() or 0
|
||||
rows = (await session.execute(query)).all()
|
||||
total = (await session.execute(count_query)).scalar() or 0
|
||||
|
||||
nodes = []
|
||||
for r in rows:
|
||||
avg = r.seen / max(r.sent, 1)
|
||||
nodes.append({
|
||||
"node_id": r.node_id,
|
||||
"long_name": r.long_name,
|
||||
"short_name": r.short_name,
|
||||
"channel": r.channel,
|
||||
"sent": r.sent,
|
||||
"seen": r.seen,
|
||||
"avg": round(avg, 2),
|
||||
})
|
||||
nodes.append(
|
||||
{
|
||||
"node_id": r.node_id,
|
||||
"long_name": r.long_name,
|
||||
"short_name": r.short_name,
|
||||
"channel": r.channel,
|
||||
"sent": r.sent,
|
||||
"seen": r.seen,
|
||||
"avg": round(avg, 2),
|
||||
}
|
||||
)
|
||||
|
||||
return web.json_response({
|
||||
"total": total,
|
||||
"limit": limit,
|
||||
"offset": offset,
|
||||
"nodes": nodes,
|
||||
})
|
||||
return web.json_response(
|
||||
{
|
||||
"total": total,
|
||||
"limit": limit,
|
||||
"offset": offset,
|
||||
"nodes": nodes,
|
||||
}
|
||||
)
|
||||
|
||||
@@ -81,7 +81,10 @@ password = large4cats
|
||||
# Database Configuration
|
||||
# -------------------------
|
||||
[database]
|
||||
# SQLAlchemy connection string. This one uses SQLite with asyncio support.
|
||||
# SQLAlchemy async connection string.
|
||||
# Examples:
|
||||
# sqlite+aiosqlite:///packets.db
|
||||
# postgresql+asyncpg://user:pass@host:5432/meshview
|
||||
connection_string = sqlite+aiosqlite:///packets.db
|
||||
|
||||
|
||||
|
||||
37
startdb.py
37
startdb.py
@@ -7,6 +7,7 @@ import shutil
|
||||
from pathlib import Path
|
||||
|
||||
from sqlalchemy import delete
|
||||
from sqlalchemy.engine.url import make_url
|
||||
|
||||
from meshview import migrations, models, mqtt_database, mqtt_reader, mqtt_store
|
||||
from meshview.config import CONFIG
|
||||
@@ -65,18 +66,16 @@ async def backup_database(database_url: str, backup_dir: str = ".") -> None:
|
||||
backup_dir: Directory to store backups (default: current directory)
|
||||
"""
|
||||
try:
|
||||
# Extract database file path from connection string
|
||||
# Format: sqlite+aiosqlite:///path/to/db.db
|
||||
if not database_url.startswith("sqlite"):
|
||||
url = make_url(database_url)
|
||||
if not url.drivername.startswith("sqlite"):
|
||||
cleanup_logger.warning("Backup only supported for SQLite databases")
|
||||
return
|
||||
|
||||
db_path = database_url.split("///", 1)[1] if "///" in database_url else None
|
||||
if not db_path:
|
||||
if not url.database or url.database == ":memory:":
|
||||
cleanup_logger.error("Could not extract database path from connection string")
|
||||
return
|
||||
|
||||
db_file = Path(db_path)
|
||||
db_file = Path(url.database)
|
||||
if not db_file.exists():
|
||||
cleanup_logger.error(f"Database file not found: {db_file}")
|
||||
return
|
||||
@@ -153,11 +152,11 @@ async def daily_cleanup_at(
|
||||
cleanup_logger.info("Waiting 60 seconds for backup to complete...")
|
||||
await asyncio.sleep(60)
|
||||
|
||||
# Local-time cutoff as string for SQLite DATETIME comparison
|
||||
cutoff = (datetime.datetime.now() - datetime.timedelta(days=days_to_keep)).strftime(
|
||||
"%Y-%m-%d %H:%M:%S"
|
||||
)
|
||||
cleanup_logger.info(f"Running cleanup for records older than {cutoff}...")
|
||||
cutoff_dt = (
|
||||
datetime.datetime.now(datetime.UTC) - datetime.timedelta(days=days_to_keep)
|
||||
).replace(tzinfo=None)
|
||||
cutoff_us = int(cutoff_dt.timestamp() * 1_000_000)
|
||||
cleanup_logger.info(f"Running cleanup for records older than {cutoff_dt.isoformat()}...")
|
||||
|
||||
try:
|
||||
async with db_lock: # Pause ingestion
|
||||
@@ -168,7 +167,7 @@ async def daily_cleanup_at(
|
||||
# Packet
|
||||
# -------------------------
|
||||
result = await session.execute(
|
||||
delete(models.Packet).where(models.Packet.import_time < cutoff)
|
||||
delete(models.Packet).where(models.Packet.import_time_us < cutoff_us)
|
||||
)
|
||||
cleanup_logger.info(f"Deleted {result.rowcount} rows from Packet")
|
||||
|
||||
@@ -176,7 +175,9 @@ async def daily_cleanup_at(
|
||||
# PacketSeen
|
||||
# -------------------------
|
||||
result = await session.execute(
|
||||
delete(models.PacketSeen).where(models.PacketSeen.import_time < cutoff)
|
||||
delete(models.PacketSeen).where(
|
||||
models.PacketSeen.import_time_us < cutoff_us
|
||||
)
|
||||
)
|
||||
cleanup_logger.info(f"Deleted {result.rowcount} rows from PacketSeen")
|
||||
|
||||
@@ -184,7 +185,9 @@ async def daily_cleanup_at(
|
||||
# Traceroute
|
||||
# -------------------------
|
||||
result = await session.execute(
|
||||
delete(models.Traceroute).where(models.Traceroute.import_time < cutoff)
|
||||
delete(models.Traceroute).where(
|
||||
models.Traceroute.import_time_us < cutoff_us
|
||||
)
|
||||
)
|
||||
cleanup_logger.info(f"Deleted {result.rowcount} rows from Traceroute")
|
||||
|
||||
@@ -192,17 +195,19 @@ async def daily_cleanup_at(
|
||||
# Node
|
||||
# -------------------------
|
||||
result = await session.execute(
|
||||
delete(models.Node).where(models.Node.last_update < cutoff)
|
||||
delete(models.Node).where(models.Node.last_seen_us < cutoff_us)
|
||||
)
|
||||
cleanup_logger.info(f"Deleted {result.rowcount} rows from Node")
|
||||
|
||||
await session.commit()
|
||||
|
||||
if vacuum_db:
|
||||
if vacuum_db and mqtt_database.engine.dialect.name == "sqlite":
|
||||
cleanup_logger.info("Running VACUUM...")
|
||||
async with mqtt_database.engine.begin() as conn:
|
||||
await conn.exec_driver_sql("VACUUM;")
|
||||
cleanup_logger.info("VACUUM completed.")
|
||||
elif vacuum_db:
|
||||
cleanup_logger.info("VACUUM skipped (not supported for this database).")
|
||||
|
||||
cleanup_logger.info("Cleanup completed successfully.")
|
||||
cleanup_logger.info("Ingestion resumed after cleanup.")
|
||||
|
||||
Reference in New Issue
Block a user