mirror of
https://github.com/Roslund/meshtastic-map.git
synced 2026-03-28 17:43:03 +01:00
Compare commits
21 Commits
master
...
docker-imp
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
eda9a12443 | ||
|
|
09a2bcb3ad | ||
|
|
de1cfd4222 | ||
|
|
1c1b77b3ea | ||
|
|
2a1ef2131a | ||
|
|
8a43c9d3d1 | ||
|
|
9a18ca1057 | ||
|
|
ffe1c6c30a | ||
|
|
1aa32cfa35 | ||
|
|
825b62c5bb | ||
|
|
b87a7b2f27 | ||
|
|
2ab169b4ff | ||
|
|
821d6177c3 | ||
|
|
8acc4696db | ||
|
|
cd6a99a179 | ||
|
|
90dc3ae449 | ||
|
|
07e362745a | ||
|
|
f6d14b8f95 | ||
|
|
54ebb429d1 | ||
|
|
92a649ad90 | ||
|
|
9ff76345b0 |
@@ -1,3 +1,3 @@
|
||||
.env
|
||||
node_modules
|
||||
.git
|
||||
.git
|
||||
13
.github/dependabot.yml
vendored
13
.github/dependabot.yml
vendored
@@ -1,13 +0,0 @@
|
||||
# To get started with Dependabot version updates, you'll need to specify which
|
||||
# package ecosystems to update and where the package manifests are located.
|
||||
# Please see the documentation for all configuration options:
|
||||
# https://docs.github.com/code-security/dependabot/dependabot-version-updates/configuration-options-for-the-dependabot.yml-file
|
||||
|
||||
version: 2
|
||||
updates:
|
||||
- package-ecosystem: npm
|
||||
directory: "/"
|
||||
schedule:
|
||||
interval: daily
|
||||
time: '20:00'
|
||||
open-pull-requests-limit: 10
|
||||
2
.gitignore
vendored
2
.gitignore
vendored
@@ -2,3 +2,5 @@
|
||||
node_modules
|
||||
# Keep environment variables out of version control
|
||||
.env
|
||||
|
||||
src/external
|
||||
|
||||
3
.gitmodules
vendored
3
.gitmodules
vendored
@@ -1,3 +0,0 @@
|
||||
[submodule "protobufs"]
|
||||
path = src/protobufs
|
||||
url = https://github.com/meshtastic/protobufs.git
|
||||
19
Dockerfile
19
Dockerfile
@@ -1,29 +1,16 @@
|
||||
FROM node:lts-alpine AS build
|
||||
|
||||
RUN apk add --no-cache openssl
|
||||
FROM node:lts-alpine
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
# Copy only package files and install deps
|
||||
# This layer will be cached as long as package*.json don't change
|
||||
COPY package*.json package-lock.json* ./
|
||||
RUN --mount=type=cache,target=/root/.npm npm ci --omit=dev
|
||||
RUN npm ci
|
||||
|
||||
# Copy the rest of your source
|
||||
COPY . .
|
||||
|
||||
# Pre-generate prisma client
|
||||
RUN node_modules/.bin/prisma generate
|
||||
|
||||
FROM node:lts-alpine
|
||||
|
||||
RUN apk add --no-cache openssl
|
||||
|
||||
USER node:node
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
COPY --from=build --chown=node:node /app .
|
||||
|
||||
|
||||
EXPOSE 8080
|
||||
EXPOSE 8080
|
||||
@@ -122,6 +122,9 @@ You will now need to restart the `index.js` and `mqtt.js` scripts.
|
||||
|
||||
## MQTT Collector
|
||||
|
||||
> Please note, due to the Meshtastic protobuf schema files being locked under a GPLv3 license, these are not provided in this MIT licensed project.
|
||||
You will need to obtain these files yourself to be able to use the MQTT Collector.
|
||||
|
||||
By default, the [MQTT Collector](./src/mqtt.js) connects to the public Meshtastic MQTT server.
|
||||
Alternatively, you may provide the relevant options shown in the help section below to connect to your own MQTT server along with your own decryption keys.
|
||||
|
||||
|
||||
@@ -30,18 +30,6 @@ services:
|
||||
DATABASE_URL: "mysql://root:password@database:3306/meshtastic-map?connection_limit=100"
|
||||
MAP_OPTS: "" # add any custom index.js options here
|
||||
|
||||
# publishes mqtt packets via websocket
|
||||
meshtastic-ws:
|
||||
container_name: meshtastic-ws
|
||||
build:
|
||||
context: .
|
||||
dockerfile: ./Dockerfile
|
||||
command: /app/docker/ws.sh
|
||||
ports:
|
||||
- 8081:8081/tcp
|
||||
environment:
|
||||
WS_OPTS: ""
|
||||
|
||||
# runs the database to store everything from mqtt
|
||||
database:
|
||||
container_name: database
|
||||
@@ -58,7 +46,7 @@ services:
|
||||
interval: 15s
|
||||
timeout: 5s
|
||||
retries: 6
|
||||
start_period: 5s
|
||||
start_interval: 5s
|
||||
|
||||
volumes:
|
||||
database_data:
|
||||
|
||||
@@ -1,5 +0,0 @@
|
||||
#!/bin/sh
|
||||
|
||||
echo "Starting websocket publisher"
|
||||
exec node src/ws.js ${WS_OPTS}
|
||||
|
||||
4225
package-lock.json
generated
4225
package-lock.json
generated
File diff suppressed because it is too large
Load Diff
20
package.json
20
package.json
@@ -9,18 +9,16 @@
|
||||
"author": "",
|
||||
"license": "ISC",
|
||||
"dependencies": {
|
||||
"@prisma/client": "^6.16.2",
|
||||
"command-line-args": "^6.0.1",
|
||||
"command-line-usage": "^7.0.3",
|
||||
"compression": "^1.8.1",
|
||||
"cors": "^2.8.5",
|
||||
"express": "^5.2.1",
|
||||
"mqtt": "^5.14.1",
|
||||
"protobufjs": "^7.5.4",
|
||||
"ws": "^8.18.3"
|
||||
"@prisma/client": "^5.11.0",
|
||||
"command-line-args": "^5.2.1",
|
||||
"command-line-usage": "^7.0.1",
|
||||
"compression": "^1.7.4",
|
||||
"express": "^5.0.0",
|
||||
"mqtt": "^5.3.6",
|
||||
"protobufjs": "^7.2.6"
|
||||
},
|
||||
"devDependencies": {
|
||||
"jest": "^30.1.3",
|
||||
"prisma": "^6.16.2"
|
||||
"jest": "^29.7.0",
|
||||
"prisma": "^5.10.2"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,2 +0,0 @@
|
||||
-- AlterTable
|
||||
ALTER TABLE `nodes` ADD COLUMN `ok_to_mqtt` BOOLEAN NULL;
|
||||
@@ -1,2 +0,0 @@
|
||||
-- AlterTable
|
||||
ALTER TABLE `service_envelopes` ADD COLUMN `portnum` INTEGER NULL;
|
||||
@@ -1,5 +0,0 @@
|
||||
-- AlterTable
|
||||
ALTER TABLE `service_envelopes` ADD COLUMN `packet_id` BIGINT NULL;
|
||||
|
||||
-- CreateIndex
|
||||
CREATE INDEX `service_envelopes_packet_id_idx` ON `service_envelopes`(`packet_id`);
|
||||
@@ -1,19 +0,0 @@
|
||||
-- CreateTable
|
||||
CREATE TABLE `battery_stats` (
|
||||
`id` BIGINT NOT NULL AUTO_INCREMENT,
|
||||
`recorded_at` DATETIME(3) NULL DEFAULT CURRENT_TIMESTAMP(3),
|
||||
`avg_battery_level` DECIMAL(5, 2) NULL,
|
||||
|
||||
INDEX `battery_stats_recorded_at_idx`(`recorded_at`),
|
||||
PRIMARY KEY (`id`)
|
||||
) DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
|
||||
|
||||
-- CreateTable
|
||||
CREATE TABLE `channel_utilization_stats` (
|
||||
`id` BIGINT NOT NULL AUTO_INCREMENT,
|
||||
`recorded_at` DATETIME(3) NULL DEFAULT CURRENT_TIMESTAMP(3),
|
||||
`avg_channel_utilization` DECIMAL(65, 30) NULL,
|
||||
|
||||
INDEX `channel_utilization_stats_recorded_at_idx`(`recorded_at`),
|
||||
PRIMARY KEY (`id`)
|
||||
) DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
|
||||
@@ -1,16 +0,0 @@
|
||||
-- CreateTable
|
||||
CREATE TABLE `name_history` (
|
||||
`id` BIGINT NOT NULL AUTO_INCREMENT,
|
||||
`node_id` BIGINT NOT NULL,
|
||||
`long_name` VARCHAR(191) NOT NULL,
|
||||
`short_name` VARCHAR(191) NOT NULL,
|
||||
`created_at` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
|
||||
`updated_at` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
|
||||
|
||||
INDEX `name_history_node_id_idx`(`node_id`),
|
||||
INDEX `name_history_long_name_idx`(`long_name`),
|
||||
INDEX `name_history_created_at_idx`(`created_at`),
|
||||
INDEX `name_history_updated_at_idx`(`updated_at`),
|
||||
UNIQUE INDEX `name_history_node_id_long_name_short_name_key`(`node_id`, `long_name`, `short_name`),
|
||||
PRIMARY KEY (`id`)
|
||||
) DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
|
||||
@@ -1,2 +0,0 @@
|
||||
-- AlterTable
|
||||
ALTER TABLE `nodes` ADD COLUMN `is_backbone` BOOLEAN NULL;
|
||||
@@ -1,3 +0,0 @@
|
||||
-- AlterTable
|
||||
ALTER TABLE `nodes` ADD COLUMN `is_unmessagable` BOOLEAN NULL,
|
||||
ADD COLUMN `public_key` VARCHAR(191) NULL;
|
||||
@@ -1,2 +0,0 @@
|
||||
-- AlterTable
|
||||
ALTER TABLE `nodes` ADD COLUMN `max_hops` INTEGER NULL;
|
||||
@@ -1,2 +0,0 @@
|
||||
-- AlterTable
|
||||
ALTER TABLE `nodes` ADD COLUMN `channel_id` VARCHAR(191) NULL;
|
||||
@@ -1,5 +0,0 @@
|
||||
-- AlterTable
|
||||
ALTER TABLE `channel_utilization_stats` ADD COLUMN `channel_id` VARCHAR(191) NULL;
|
||||
|
||||
-- CreateIndex
|
||||
CREATE INDEX `channel_utilization_stats_channel_id_idx` ON `channel_utilization_stats`(`channel_id`);
|
||||
@@ -1,2 +0,0 @@
|
||||
-- AlterTable
|
||||
ALTER TABLE `text_messages` ADD COLUMN `ok_to_mqtt` BOOLEAN NULL;
|
||||
@@ -1,23 +0,0 @@
|
||||
-- CreateTable
|
||||
CREATE TABLE `edges` (
|
||||
`id` BIGINT NOT NULL AUTO_INCREMENT,
|
||||
`from_node_id` BIGINT NOT NULL,
|
||||
`to_node_id` BIGINT NOT NULL,
|
||||
`snr` INTEGER NOT NULL,
|
||||
`from_latitude` INTEGER NULL,
|
||||
`from_longitude` INTEGER NULL,
|
||||
`to_latitude` INTEGER NULL,
|
||||
`to_longitude` INTEGER NULL,
|
||||
`packet_id` BIGINT NOT NULL,
|
||||
`channel_id` VARCHAR(191) NULL,
|
||||
`gateway_id` BIGINT NULL,
|
||||
`source` VARCHAR(191) NOT NULL,
|
||||
`created_at` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
|
||||
`updated_at` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
|
||||
|
||||
INDEX `edges_from_node_id_idx`(`from_node_id`),
|
||||
INDEX `edges_to_node_id_idx`(`to_node_id`),
|
||||
INDEX `edges_created_at_idx`(`created_at`),
|
||||
INDEX `edges_from_node_id_to_node_id_idx`(`from_node_id`, `to_node_id`),
|
||||
PRIMARY KEY (`id`)
|
||||
) DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
|
||||
@@ -21,8 +21,6 @@ model Node {
|
||||
hardware_model Int
|
||||
role Int
|
||||
is_licensed Boolean?
|
||||
public_key String?
|
||||
is_unmessagable Boolean?
|
||||
|
||||
firmware_version String?
|
||||
region Int?
|
||||
@@ -53,12 +51,6 @@ model Node {
|
||||
// this column tracks when an mqtt gateway node uplinked a packet
|
||||
mqtt_connection_state_updated_at DateTime?
|
||||
|
||||
ok_to_mqtt Boolean?
|
||||
is_backbone Boolean?
|
||||
max_hops Int?
|
||||
|
||||
channel_id String?
|
||||
|
||||
created_at DateTime @default(now())
|
||||
updated_at DateTime @default(now()) @updatedAt
|
||||
|
||||
@@ -210,8 +202,6 @@ model ServiceEnvelope {
|
||||
gateway_id BigInt?
|
||||
to BigInt
|
||||
from BigInt
|
||||
portnum Int?
|
||||
packet_id BigInt?
|
||||
protobuf Bytes
|
||||
|
||||
created_at DateTime @default(now())
|
||||
@@ -220,7 +210,6 @@ model ServiceEnvelope {
|
||||
@@index(created_at)
|
||||
@@index(updated_at)
|
||||
@@index(gateway_id)
|
||||
@@index(packet_id)
|
||||
@@map("service_envelopes")
|
||||
}
|
||||
|
||||
@@ -239,7 +228,6 @@ model TextMessage {
|
||||
rx_snr Decimal?
|
||||
rx_rssi Int?
|
||||
hop_limit Int?
|
||||
ok_to_mqtt Boolean?
|
||||
|
||||
created_at DateTime @default(now())
|
||||
updated_at DateTime @default(now()) @updatedAt
|
||||
@@ -308,67 +296,3 @@ model Waypoint {
|
||||
@@index(gateway_id)
|
||||
@@map("waypoints")
|
||||
}
|
||||
|
||||
model NameHistory {
|
||||
id BigInt @id @default(autoincrement())
|
||||
node_id BigInt
|
||||
long_name String
|
||||
short_name String
|
||||
|
||||
created_at DateTime @default(now())
|
||||
updated_at DateTime @default(now()) @updatedAt
|
||||
|
||||
@@index(node_id)
|
||||
@@index(long_name)
|
||||
|
||||
@@index(created_at)
|
||||
@@index(updated_at)
|
||||
@@map("name_history")
|
||||
|
||||
// We only want to keep track of unique name and node_id combinations
|
||||
@@unique([node_id, long_name, short_name])
|
||||
}
|
||||
|
||||
model BatteryStats {
|
||||
id BigInt @id @default(autoincrement())
|
||||
recorded_at DateTime? @default(now())
|
||||
avg_battery_level Decimal? @db.Decimal(5, 2)
|
||||
|
||||
@@index([recorded_at])
|
||||
@@map("battery_stats")
|
||||
}
|
||||
|
||||
model ChannelUtilizationStats {
|
||||
id BigInt @id @default(autoincrement())
|
||||
recorded_at DateTime? @default(now())
|
||||
avg_channel_utilization Decimal?
|
||||
channel_id String?
|
||||
|
||||
@@index([channel_id])
|
||||
@@index([recorded_at])
|
||||
@@map("channel_utilization_stats")
|
||||
}
|
||||
|
||||
model Edge {
|
||||
id BigInt @id @default(autoincrement())
|
||||
from_node_id BigInt
|
||||
to_node_id BigInt
|
||||
snr Int
|
||||
from_latitude Int?
|
||||
from_longitude Int?
|
||||
to_latitude Int?
|
||||
to_longitude Int?
|
||||
packet_id BigInt
|
||||
channel_id String?
|
||||
gateway_id BigInt?
|
||||
source String
|
||||
|
||||
created_at DateTime @default(now())
|
||||
updated_at DateTime @default(now()) @updatedAt
|
||||
|
||||
@@index(from_node_id)
|
||||
@@index(to_node_id)
|
||||
@@index(created_at)
|
||||
@@index([from_node_id, to_node_id])
|
||||
@@map("edges")
|
||||
}
|
||||
@@ -1,7 +1,6 @@
|
||||
// node src/admin.js --purge-node-id 123
|
||||
// node src/admin.js --purge-node-id '!AABBCCDD'
|
||||
|
||||
require('./utils/logger');
|
||||
const commandLineArgs = require("command-line-args");
|
||||
const commandLineUsage = require("command-line-usage");
|
||||
|
||||
|
||||
434
src/index.js
434
src/index.js
@@ -1,13 +1,9 @@
|
||||
require('./utils/logger');
|
||||
const fs = require("fs");
|
||||
const path = require('path');
|
||||
const express = require('express');
|
||||
const compression = require('compression');
|
||||
const protobufjs = require("protobufjs");
|
||||
const commandLineArgs = require("command-line-args");
|
||||
const commandLineUsage = require("command-line-usage");
|
||||
const cors = require('cors');
|
||||
|
||||
const statsRoutes = require('./stats.js');
|
||||
|
||||
// create prisma db client
|
||||
const { PrismaClient } = require("@prisma/client");
|
||||
@@ -54,24 +50,21 @@ if(options.help){
|
||||
// get options and fallback to default values
|
||||
const port = options["port"] ?? 8080;
|
||||
|
||||
// load protobufs
|
||||
const root = new protobufjs.Root();
|
||||
root.resolvePath = (origin, target) => path.join(__dirname, "protobufs", target);
|
||||
root.loadSync('meshtastic/mqtt.proto');
|
||||
const HardwareModel = root.lookupEnum("HardwareModel");
|
||||
const Role = root.lookupEnum("Config.DeviceConfig.Role");
|
||||
const RegionCode = root.lookupEnum("Config.LoRaConfig.RegionCode");
|
||||
const ModemPreset = root.lookupEnum("Config.LoRaConfig.ModemPreset");
|
||||
// load json
|
||||
const hardwareModels = JSON.parse(fs.readFileSync(path.join(__dirname, "json/hardware_models.json"), "utf-8"));
|
||||
const roles = JSON.parse(fs.readFileSync(path.join(__dirname, "json/roles.json"), "utf-8"));
|
||||
const regionCodes = JSON.parse(fs.readFileSync(path.join(__dirname, "json/region_codes.json"), "utf-8"));
|
||||
const modemPresets = JSON.parse(fs.readFileSync(path.join(__dirname, "json/modem_presets.json"), "utf-8"));
|
||||
|
||||
// appends extra info for node objects returned from api
|
||||
function formatNodeInfo(node) {
|
||||
return {
|
||||
...node,
|
||||
node_id_hex: "!" + node.node_id.toString(16),
|
||||
hardware_model_name: HardwareModel.valuesById[node.hardware_model] ?? null,
|
||||
role_name: Role.valuesById[node.role] ?? null,
|
||||
region_name: RegionCode.valuesById[node.region] ?? null,
|
||||
modem_preset_name: ModemPreset.valuesById[node.modem_preset] ?? null,
|
||||
hardware_model_name: hardwareModels[node.hardware_model] ?? null,
|
||||
role_name: roles[node.role] ?? null,
|
||||
region_name: regionCodes[node.region] ?? null,
|
||||
modem_preset_name: modemPresets[node.modem_preset] ?? null,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -80,9 +73,6 @@ const app = express();
|
||||
// enable compression
|
||||
app.use(compression());
|
||||
|
||||
// Apply CORS only to API routes
|
||||
app.use('/api', cors());
|
||||
|
||||
// serve files inside the public folder from /
|
||||
app.use('/', express.static(path.join(__dirname, 'public')));
|
||||
|
||||
@@ -90,9 +80,6 @@ app.get('/', async (req, res) => {
|
||||
res.sendFile(path.join(__dirname, 'public/index.html'));
|
||||
});
|
||||
|
||||
// stats API in separate file
|
||||
app.use('/api/v1/stats', statsRoutes);
|
||||
|
||||
app.get('/api', async (req, res) => {
|
||||
|
||||
const links = [
|
||||
@@ -147,23 +134,6 @@ app.get('/api', async (req, res) => {
|
||||
"path": "/api/v1/nodes/:nodeId/traceroutes",
|
||||
"description": "Trace routes for a meshtastic node",
|
||||
},
|
||||
{
|
||||
"path": "/api/v1/traceroutes",
|
||||
"description": "Recent traceroute edges across all nodes",
|
||||
"params": {
|
||||
"time_from": "Only include traceroutes updated after this unix timestamp (milliseconds)",
|
||||
"time_to": "Only include traceroutes updated before this unix timestamp (milliseconds)"
|
||||
}
|
||||
},
|
||||
{
|
||||
"path": "/api/v1/connections",
|
||||
"description": "Aggregated edges between nodes from traceroutes",
|
||||
"params": {
|
||||
"node_id": "Only include connections involving this node id",
|
||||
"time_from": "Only include edges created after this unix timestamp (milliseconds)",
|
||||
"time_to": "Only include edges created before this unix timestamp (milliseconds)"
|
||||
}
|
||||
},
|
||||
{
|
||||
"path": "/api/v1/nodes/:nodeId/position-history",
|
||||
"description": "Position history for a meshtastic node",
|
||||
@@ -229,10 +199,6 @@ app.get('/api/v1/nodes', async (req, res) => {
|
||||
where: {
|
||||
role: role,
|
||||
hardware_model: hardwareModel,
|
||||
// Since we removed retention; only include nodes that have been updated in the last 30 days
|
||||
updated_at: {
|
||||
gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) // within last 30 days
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
@@ -547,22 +513,10 @@ app.get('/api/v1/nodes/:nodeId/traceroutes', async (req, res) => {
|
||||
return;
|
||||
}
|
||||
|
||||
// get latest traceroutes, deduplicated by packet_id
|
||||
// get latest traceroutes
|
||||
// We want replies where want_response is false and it will be "to" the
|
||||
// requester.
|
||||
// Deduplicate by packet_id, keeping the latest traceroute (highest id) for each packet_id
|
||||
const traceroutes = await prisma.$queryRaw`
|
||||
SELECT t1.*
|
||||
FROM traceroutes t1
|
||||
INNER JOIN (
|
||||
SELECT packet_id, MAX(id) as max_id
|
||||
FROM traceroutes
|
||||
WHERE want_response = false and \`to\` = ${node.node_id} and gateway_id is not null
|
||||
GROUP BY packet_id
|
||||
) t2 ON t1.packet_id = t2.packet_id AND t1.id = t2.max_id
|
||||
ORDER BY t1.id DESC
|
||||
LIMIT ${count}
|
||||
`;
|
||||
const traceroutes = await prisma.$queryRaw`SELECT * FROM traceroutes WHERE want_response = false and \`to\` = ${node.node_id} and gateway_id is not null order by id desc limit ${count}`;
|
||||
|
||||
res.json({
|
||||
traceroutes: traceroutes.map((trace) => {
|
||||
@@ -600,313 +554,6 @@ app.get('/api/v1/nodes/:nodeId/traceroutes', async (req, res) => {
|
||||
}
|
||||
});
|
||||
|
||||
// Aggregated recent traceroute edges (global), filtered by updated_at
|
||||
// Returns deduplicated edges with the latest SNR and timestamp.
|
||||
// GET /api/v1/nodes/traceroutes?time_from=...&time_to=...
|
||||
app.get('/api/v1/traceroutes', async (req, res) => {
|
||||
try {
|
||||
|
||||
const timeFrom = req.query.time_from ? parseInt(req.query.time_from) : undefined;
|
||||
const timeTo = req.query.time_to ? parseInt(req.query.time_to) : undefined;
|
||||
|
||||
// Pull recent traceroutes within the time window. We only want replies (want_response=false)
|
||||
// and those that were actually gated to MQTT (gateway_id not null)
|
||||
const traces = await prisma.traceRoute.findMany({
|
||||
where: {
|
||||
want_response: false,
|
||||
gateway_id: { not: null },
|
||||
updated_at: {
|
||||
gte: timeFrom ? new Date(timeFrom) : undefined,
|
||||
lte: timeTo ? new Date(timeTo) : undefined,
|
||||
},
|
||||
},
|
||||
orderBy: { id: 'desc' },
|
||||
take: 5000, // cap to keep response bounded; UI can page/adjust time window if needed
|
||||
});
|
||||
|
||||
// Normalize JSON fields that may be strings (depending on driver)
|
||||
const normalized = traces.map((t) => {
|
||||
const trace = { ...t };
|
||||
if (typeof trace.route === 'string') {
|
||||
try { trace.route = JSON.parse(trace.route); } catch(_) {}
|
||||
}
|
||||
if (typeof trace.route_back === 'string') {
|
||||
try { trace.route_back = JSON.parse(trace.route_back); } catch(_) {}
|
||||
}
|
||||
if (typeof trace.snr_towards === 'string') {
|
||||
try { trace.snr_towards = JSON.parse(trace.snr_towards); } catch(_) {}
|
||||
}
|
||||
if (typeof trace.snr_back === 'string') {
|
||||
try { trace.snr_back = JSON.parse(trace.snr_back); } catch(_) {}
|
||||
}
|
||||
return trace;
|
||||
});
|
||||
|
||||
// Build edges from both forward (towards) and reverse (back) paths.
|
||||
// Forward path: to → route[] → from, using snr_towards
|
||||
// Reverse path: from → route_back[] → to, using snr_back
|
||||
const edgeKey = (a, b) => `${String(a)}->${String(b)}`;
|
||||
const edges = new Map();
|
||||
|
||||
function upsertEdgesFromPath(trace, pathNodes, pathSnrs) {
|
||||
for (let i = 0; i < pathNodes.length - 1; i++) {
|
||||
const hopFrom = pathNodes[i];
|
||||
const hopTo = pathNodes[i + 1];
|
||||
const snr = typeof (pathSnrs && pathSnrs[i]) === 'number' ? pathSnrs[i] : null;
|
||||
|
||||
// Skip edges without SNR data
|
||||
if (snr === null) continue;
|
||||
|
||||
const key = edgeKey(hopFrom, hopTo);
|
||||
const existing = edges.get(key);
|
||||
if (!existing) {
|
||||
edges.set(key, {
|
||||
from: hopFrom,
|
||||
to: hopTo,
|
||||
snr: snr,
|
||||
updated_at: trace.updated_at,
|
||||
channel_id: trace.channel_id ?? null,
|
||||
gateway_id: trace.gateway_id ?? null,
|
||||
traceroute_from: trace.from, // original initiator
|
||||
traceroute_to: trace.to, // original target
|
||||
});
|
||||
} else if (new Date(trace.updated_at) > new Date(existing.updated_at)) {
|
||||
existing.snr = snr;
|
||||
existing.updated_at = trace.updated_at;
|
||||
existing.channel_id = trace.channel_id ?? existing.channel_id;
|
||||
existing.gateway_id = trace.gateway_id ?? existing.gateway_id;
|
||||
existing.traceroute_from = trace.from;
|
||||
existing.traceroute_to = trace.to;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (const tr of normalized) {
|
||||
// Forward path
|
||||
const forwardPath = [];
|
||||
if (tr.to != null) forwardPath.push(Number(tr.to));
|
||||
if (Array.isArray(tr.route)) {
|
||||
for (const hop of tr.route) {
|
||||
if (hop != null) forwardPath.push(Number(hop));
|
||||
}
|
||||
}
|
||||
if (tr.from != null) forwardPath.push(Number(tr.from));
|
||||
const forwardSnrs = Array.isArray(tr.snr_towards) ? tr.snr_towards : [];
|
||||
upsertEdgesFromPath(tr, forwardPath, forwardSnrs);
|
||||
|
||||
// Reverse path
|
||||
const reversePath = [];
|
||||
if (tr.from != null) reversePath.push(Number(tr.from));
|
||||
if (Array.isArray(tr.route_back)) {
|
||||
for (const hop of tr.route_back) {
|
||||
if (hop != null) reversePath.push(Number(hop));
|
||||
}
|
||||
}
|
||||
if (tr.to != null) reversePath.push(Number(tr.to));
|
||||
const reverseSnrs = Array.isArray(tr.snr_back) ? tr.snr_back : [];
|
||||
upsertEdgesFromPath(tr, reversePath, reverseSnrs);
|
||||
}
|
||||
|
||||
res.json({
|
||||
traceroute_edges: Array.from(edges.values()),
|
||||
});
|
||||
|
||||
} catch (err) {
|
||||
console.error(err);
|
||||
res.status(500).json({
|
||||
message: "Something went wrong, try again later.",
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
// Aggregated edges endpoint
|
||||
// GET /api/v1/connections?node_id=...&time_from=...&time_to=...
|
||||
app.get('/api/v1/connections', async (req, res) => {
|
||||
try {
|
||||
const nodeId = req.query.node_id ? parseInt(req.query.node_id) : undefined;
|
||||
const timeFrom = req.query.time_from ? parseInt(req.query.time_from) : undefined;
|
||||
const timeTo = req.query.time_to ? parseInt(req.query.time_to) : undefined;
|
||||
|
||||
// Query edges from database
|
||||
const edges = await prisma.edge.findMany({
|
||||
where: {
|
||||
created_at: {
|
||||
...(timeFrom && { gte: new Date(timeFrom) }),
|
||||
...(timeTo && { lte: new Date(timeTo) }),
|
||||
},
|
||||
// Only include edges where both nodes have positions
|
||||
from_latitude: { not: null },
|
||||
from_longitude: { not: null },
|
||||
to_latitude: { not: null },
|
||||
to_longitude: { not: null },
|
||||
// If node_id is provided, filter edges where either from_node_id or to_node_id matches
|
||||
...(nodeId !== undefined && {
|
||||
OR: [
|
||||
{ from_node_id: nodeId },
|
||||
{ to_node_id: nodeId },
|
||||
],
|
||||
}),
|
||||
},
|
||||
orderBy: [
|
||||
{ created_at: 'desc' },
|
||||
{ packet_id: 'desc' },
|
||||
],
|
||||
});
|
||||
|
||||
// Collect all unique node IDs from edges
|
||||
const nodeIds = new Set();
|
||||
for (const edge of edges) {
|
||||
nodeIds.add(edge.from_node_id);
|
||||
nodeIds.add(edge.to_node_id);
|
||||
}
|
||||
|
||||
// Fetch current positions for all nodes
|
||||
const nodes = await prisma.node.findMany({
|
||||
where: {
|
||||
node_id: { in: Array.from(nodeIds) },
|
||||
},
|
||||
select: {
|
||||
node_id: true,
|
||||
latitude: true,
|
||||
longitude: true,
|
||||
},
|
||||
});
|
||||
|
||||
// Create a map of current node positions
|
||||
const nodePositions = new Map();
|
||||
for (const node of nodes) {
|
||||
nodePositions.set(node.node_id, {
|
||||
latitude: node.latitude,
|
||||
longitude: node.longitude,
|
||||
});
|
||||
}
|
||||
|
||||
// Filter edges: only include edges where both nodes are still at the same location
|
||||
const validEdges = edges.filter(edge => {
|
||||
const fromCurrentPos = nodePositions.get(edge.from_node_id);
|
||||
const toCurrentPos = nodePositions.get(edge.to_node_id);
|
||||
|
||||
// Skip if either node doesn't exist or doesn't have a current position
|
||||
if (!fromCurrentPos || !toCurrentPos ||
|
||||
fromCurrentPos.latitude === null || fromCurrentPos.longitude === null ||
|
||||
toCurrentPos.latitude === null || toCurrentPos.longitude === null) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Check if stored positions match current positions
|
||||
const fromMatches = fromCurrentPos.latitude === edge.from_latitude &&
|
||||
fromCurrentPos.longitude === edge.from_longitude;
|
||||
const toMatches = toCurrentPos.latitude === edge.to_latitude &&
|
||||
toCurrentPos.longitude === edge.to_longitude;
|
||||
|
||||
return fromMatches && toMatches;
|
||||
});
|
||||
|
||||
// Normalize node pairs: always use min/max to treat A->B and B->A as same connection
|
||||
const connectionsMap = new Map();
|
||||
|
||||
for (const edge of validEdges) {
|
||||
const nodeA = edge.from_node_id < edge.to_node_id ? edge.from_node_id : edge.to_node_id;
|
||||
const nodeB = edge.from_node_id < edge.to_node_id ? edge.to_node_id : edge.from_node_id;
|
||||
const key = `${nodeA}-${nodeB}`;
|
||||
|
||||
if (!connectionsMap.has(key)) {
|
||||
connectionsMap.set(key, {
|
||||
node_a: nodeA,
|
||||
node_b: nodeB,
|
||||
direction_ab: [], // A -> B edges
|
||||
direction_ba: [], // B -> A edges
|
||||
});
|
||||
}
|
||||
|
||||
const connection = connectionsMap.get(key);
|
||||
const isAB = edge.from_node_id === nodeA;
|
||||
|
||||
// Add edge to appropriate direction
|
||||
if (isAB) {
|
||||
connection.direction_ab.push({
|
||||
snr: edge.snr,
|
||||
snr_db: edge.snr / 4, // Convert to dB
|
||||
created_at: edge.created_at,
|
||||
packet_id: edge.packet_id,
|
||||
source: edge.source,
|
||||
});
|
||||
} else {
|
||||
connection.direction_ba.push({
|
||||
snr: edge.snr,
|
||||
snr_db: edge.snr / 4,
|
||||
created_at: edge.created_at,
|
||||
packet_id: edge.packet_id,
|
||||
source: edge.source,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Aggregate each connection
|
||||
const connections = Array.from(connectionsMap.values()).map(conn => {
|
||||
// Deduplicate edges by packet_id for each direction (keep first occurrence, which is most recent)
|
||||
const dedupeByPacketId = (edges) => {
|
||||
const seen = new Set();
|
||||
return edges.filter(edge => {
|
||||
if (seen.has(edge.packet_id)) {
|
||||
return false;
|
||||
}
|
||||
seen.add(edge.packet_id);
|
||||
return true;
|
||||
});
|
||||
};
|
||||
|
||||
const deduplicatedAB = dedupeByPacketId(conn.direction_ab);
|
||||
const deduplicatedBA = dedupeByPacketId(conn.direction_ba);
|
||||
|
||||
// Calculate average SNR for A->B (using deduplicated edges)
|
||||
const avgSnrAB = deduplicatedAB.length > 0
|
||||
? deduplicatedAB.reduce((sum, e) => sum + e.snr_db, 0) / deduplicatedAB.length
|
||||
: null;
|
||||
|
||||
// Calculate average SNR for B->A (using deduplicated edges)
|
||||
const avgSnrBA = deduplicatedBA.length > 0
|
||||
? deduplicatedBA.reduce((sum, e) => sum + e.snr_db, 0) / deduplicatedBA.length
|
||||
: null;
|
||||
|
||||
// Get last 5 edges for each direction (already sorted by created_at DESC, packet_id DESC, now deduplicated)
|
||||
const last5AB = deduplicatedAB.slice(0, 5);
|
||||
const last5BA = deduplicatedBA.slice(0, 5);
|
||||
|
||||
// Determine worst average SNR
|
||||
const worstAvgSnrDb = [avgSnrAB, avgSnrBA]
|
||||
.filter(v => v !== null)
|
||||
.reduce((min, val) => val < min ? val : min, Infinity);
|
||||
|
||||
return {
|
||||
node_a: conn.node_a,
|
||||
node_b: conn.node_b,
|
||||
direction_ab: {
|
||||
avg_snr_db: avgSnrAB,
|
||||
last_5_edges: last5AB,
|
||||
total_count: deduplicatedAB.length, // Use deduplicated count
|
||||
},
|
||||
direction_ba: {
|
||||
avg_snr_db: avgSnrBA,
|
||||
last_5_edges: last5BA,
|
||||
total_count: deduplicatedBA.length, // Use deduplicated count
|
||||
},
|
||||
worst_avg_snr_db: worstAvgSnrDb !== Infinity ? worstAvgSnrDb : null,
|
||||
};
|
||||
}).filter(conn => conn.worst_avg_snr_db !== null); // Only return connections with at least one direction
|
||||
|
||||
res.json({
|
||||
connections: connections,
|
||||
});
|
||||
|
||||
} catch (err) {
|
||||
console.error(err);
|
||||
res.status(500).json({
|
||||
message: "Something went wrong, try again later.",
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
app.get('/api/v1/nodes/:nodeId/position-history', async (req, res) => {
|
||||
try {
|
||||
|
||||
@@ -996,6 +643,42 @@ app.get('/api/v1/nodes/:nodeId/position-history', async (req, res) => {
|
||||
}
|
||||
});
|
||||
|
||||
app.get('/api/v1/stats/hardware-models', async (req, res) => {
|
||||
try {
|
||||
|
||||
// get nodes from db
|
||||
const results = await prisma.node.groupBy({
|
||||
by: ['hardware_model'],
|
||||
orderBy: {
|
||||
_count: {
|
||||
hardware_model: 'desc',
|
||||
},
|
||||
},
|
||||
_count: {
|
||||
hardware_model: true,
|
||||
},
|
||||
});
|
||||
|
||||
const hardwareModelStats = results.map((result) => {
|
||||
return {
|
||||
count: result._count.hardware_model,
|
||||
hardware_model: result.hardware_model,
|
||||
hardware_model_name: hardwareModels[result.hardware_model] ?? "UNKNOWN",
|
||||
};
|
||||
});
|
||||
|
||||
res.json({
|
||||
hardware_model_stats: hardwareModelStats,
|
||||
});
|
||||
|
||||
} catch(err) {
|
||||
console.error(err);
|
||||
res.status(500).json({
|
||||
message: "Something went wrong, try again later.",
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
app.get('/api/v1/text-messages', async (req, res) => {
|
||||
try {
|
||||
|
||||
@@ -1122,29 +805,8 @@ app.get('/api/v1/waypoints', async (req, res) => {
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
// start express server
|
||||
const listener = app.listen(port, () => {
|
||||
const port = listener.address().port;
|
||||
console.log(`Server running at http://127.0.0.1:${port}`);
|
||||
});
|
||||
|
||||
// Graceful shutdown handlers
|
||||
function gracefulShutdown(signal) {
|
||||
console.log(`Received ${signal}. Starting graceful shutdown...`);
|
||||
|
||||
// Stop accepting new connections
|
||||
listener.close(async (err) => {
|
||||
console.log('HTTP server closed');
|
||||
await prisma.$disconnect();
|
||||
console.log('Database connections closed');
|
||||
console.log('Graceful shutdown completed');
|
||||
process.exit(0);
|
||||
});
|
||||
}
|
||||
|
||||
// Handle SIGTERM (Docker, systemd, etc.)
|
||||
process.on('SIGTERM', () => gracefulShutdown('SIGTERM'));
|
||||
|
||||
// Handle SIGINT (Ctrl+C)
|
||||
process.on('SIGINT', () => gracefulShutdown('SIGINT'));
|
||||
|
||||
108
src/json/hardware_models.json
Normal file
108
src/json/hardware_models.json
Normal file
@@ -0,0 +1,108 @@
|
||||
{
|
||||
"0": "UNSET",
|
||||
"1": "TLORA_V2",
|
||||
"2": "TLORA_V1",
|
||||
"3": "TLORA_V2_1_1P6",
|
||||
"4": "TBEAM",
|
||||
"5": "HELTEC_V2_0",
|
||||
"6": "TBEAM_V0P7",
|
||||
"7": "T_ECHO",
|
||||
"8": "TLORA_V1_1P3",
|
||||
"9": "RAK4631",
|
||||
"10": "HELTEC_V2_1",
|
||||
"11": "HELTEC_V1",
|
||||
"12": "LILYGO_TBEAM_S3_CORE",
|
||||
"13": "RAK11200",
|
||||
"14": "NANO_G1",
|
||||
"15": "TLORA_V2_1_1P8",
|
||||
"16": "TLORA_T3_S3",
|
||||
"17": "NANO_G1_EXPLORER",
|
||||
"18": "NANO_G2_ULTRA",
|
||||
"19": "LORA_TYPE",
|
||||
"20": "WIPHONE",
|
||||
"21": "WIO_WM1110",
|
||||
"22": "RAK2560",
|
||||
"23": "HELTEC_HRU_3601",
|
||||
"24": "HELTEC_WIRELESS_BRIDGE",
|
||||
"25": "STATION_G1",
|
||||
"26": "RAK11310",
|
||||
"27": "SENSELORA_RP2040",
|
||||
"28": "SENSELORA_S3",
|
||||
"29": "CANARYONE",
|
||||
"30": "RP2040_LORA",
|
||||
"31": "STATION_G2",
|
||||
"32": "LORA_RELAY_V1",
|
||||
"33": "NRF52840DK",
|
||||
"34": "PPR",
|
||||
"35": "GENIEBLOCKS",
|
||||
"36": "NRF52_UNKNOWN",
|
||||
"37": "PORTDUINO",
|
||||
"38": "ANDROID_SIM",
|
||||
"39": "DIY_V1",
|
||||
"40": "NRF52840_PCA10059",
|
||||
"41": "DR_DEV",
|
||||
"42": "M5STACK",
|
||||
"43": "HELTEC_V3",
|
||||
"44": "HELTEC_WSL_V3",
|
||||
"45": "BETAFPV_2400_TX",
|
||||
"46": "BETAFPV_900_NANO_TX",
|
||||
"47": "RPI_PICO",
|
||||
"48": "HELTEC_WIRELESS_TRACKER",
|
||||
"49": "HELTEC_WIRELESS_PAPER",
|
||||
"50": "T_DECK",
|
||||
"51": "T_WATCH_S3",
|
||||
"52": "PICOMPUTER_S3",
|
||||
"53": "HELTEC_HT62",
|
||||
"54": "EBYTE_ESP32_S3",
|
||||
"55": "ESP32_S3_PICO",
|
||||
"56": "CHATTER_2",
|
||||
"57": "HELTEC_WIRELESS_PAPER_V1_0",
|
||||
"58": "HELTEC_WIRELESS_TRACKER_V1_0",
|
||||
"59": "UNPHONE",
|
||||
"60": "TD_LORAC",
|
||||
"61": "CDEBYTE_EORA_S3",
|
||||
"62": "TWC_MESH_V4",
|
||||
"63": "NRF52_PROMICRO_DIY",
|
||||
"64": "RADIOMASTER_900_BANDIT_NANO",
|
||||
"65": "HELTEC_CAPSULE_SENSOR_V3",
|
||||
"66": "HELTEC_VISION_MASTER_T190",
|
||||
"67": "HELTEC_VISION_MASTER_E213",
|
||||
"68": "HELTEC_VISION_MASTER_E290",
|
||||
"69": "HELTEC_MESH_NODE_T114",
|
||||
"70": "SENSECAP_INDICATOR",
|
||||
"71": "TRACKER_T1000_E",
|
||||
"72": "RAK3172",
|
||||
"73": "WIO_E5",
|
||||
"74": "RADIOMASTER_900_BANDIT",
|
||||
"75": "ME25LS01_4Y10TD",
|
||||
"76": "RP2040_FEATHER_RFM95",
|
||||
"77": "M5STACK_COREBASIC",
|
||||
"78": "M5STACK_CORE2",
|
||||
"79": "RPI_PICO2",
|
||||
"80": "M5STACK_CORES3",
|
||||
"81": "SEEED_XIAO_S3",
|
||||
"82": "MS24SF1",
|
||||
"83": "TLORA_C6",
|
||||
"84": "WISMESH_TAP",
|
||||
"85": "ROUTASTIC",
|
||||
"86": "MESH_TAB",
|
||||
"87": "MESHLINK",
|
||||
"88": "XIAO_NRF52_KIT",
|
||||
"89": "THINKNODE_M1",
|
||||
"90": "THINKNODE_M2",
|
||||
"91": "T_ETH_ELITE",
|
||||
"92": "HELTEC_SENSOR_HUB",
|
||||
"93": "RESERVED_FRIED_CHICKEN",
|
||||
"94": "HELTEC_MESH_POCKET",
|
||||
"95": "SEEED_SOLAR_NODE",
|
||||
"96": "NOMADSTAR_METEOR_PRO",
|
||||
"97": "CROWPANEL",
|
||||
"98": "LINK_32",
|
||||
"99": "SEEED_WIO_TRACKER_L1",
|
||||
"100": "SEEED_WIO_TRACKER_L1_EINK",
|
||||
"101": "QWANTZ_TINY_ARMS",
|
||||
"102": "T_DECK_PRO",
|
||||
"103": "T_LORA_PAGER",
|
||||
"104": "GAT562_MESH_TRIAL_TRACKER",
|
||||
"255": "PRIVATE_HW"
|
||||
}
|
||||
11
src/json/modem_presets.json
Normal file
11
src/json/modem_presets.json
Normal file
@@ -0,0 +1,11 @@
|
||||
{
|
||||
"0": "LONG_FAST",
|
||||
"1": "LONG_SLOW",
|
||||
"2": "VERY_LONG_SLOW",
|
||||
"3": "MEDIUM_SLOW",
|
||||
"4": "MEDIUM_FAST",
|
||||
"5": "SHORT_SLOW",
|
||||
"6": "SHORT_FAST",
|
||||
"7": "LONG_MODERATE",
|
||||
"8": "SHORT_TURBO"
|
||||
}
|
||||
24
src/json/region_codes.json
Normal file
24
src/json/region_codes.json
Normal file
@@ -0,0 +1,24 @@
|
||||
{
|
||||
"0": "UNSET",
|
||||
"1": "US",
|
||||
"2": "EU_433",
|
||||
"3": "EU_868",
|
||||
"4": "CN",
|
||||
"5": "JP",
|
||||
"6": "ANZ",
|
||||
"7": "KR",
|
||||
"8": "TW",
|
||||
"9": "RU",
|
||||
"10": "IN",
|
||||
"11": "NZ_865",
|
||||
"12": "TH",
|
||||
"13": "LORA_24",
|
||||
"14": "UA_433",
|
||||
"15": "UA_868",
|
||||
"16": "MY_433",
|
||||
"17": "MY_919",
|
||||
"18": "SG_923",
|
||||
"19": "PH_433",
|
||||
"20": "PH_868",
|
||||
"21": "PH_915"
|
||||
}
|
||||
14
src/json/roles.json
Normal file
14
src/json/roles.json
Normal file
@@ -0,0 +1,14 @@
|
||||
{
|
||||
"0": "CLIENT",
|
||||
"1": "CLIENT_MUTE",
|
||||
"2": "ROUTER",
|
||||
"3": "ROUTER_CLIENT",
|
||||
"4": "REPEATER",
|
||||
"5": "TRACKER",
|
||||
"6": "SENSOR",
|
||||
"7": "TAK",
|
||||
"8": "CLIENT_HIDDEN",
|
||||
"9": "LOST_AND_FOUND",
|
||||
"10": "TAK_TRACKER",
|
||||
"11": "ROUTER_LATE"
|
||||
}
|
||||
350
src/mqtt.js
350
src/mqtt.js
@@ -1,4 +1,4 @@
|
||||
require('./utils/logger');
|
||||
const fs = require("fs");
|
||||
const crypto = require("crypto");
|
||||
const path = require("path");
|
||||
const mqtt = require("mqtt");
|
||||
@@ -22,6 +22,11 @@ const optionsList = [
|
||||
type: Boolean,
|
||||
description: 'Display this usage guide.'
|
||||
},
|
||||
{
|
||||
name: "protobufs-path",
|
||||
type: String,
|
||||
description: "Path to Protobufs (e.g: ../../protobufs)",
|
||||
},
|
||||
{
|
||||
name: "mqtt-broker-url",
|
||||
type: String,
|
||||
@@ -207,6 +212,7 @@ if(options.help){
|
||||
}
|
||||
|
||||
// get options and fallback to default values
|
||||
const protobufsPath = options["protobufs-path"] ?? path.join(path.dirname(__filename), "external/protobufs");
|
||||
const mqttBrokerUrl = options["mqtt-broker-url"] ?? "mqtt://mqtt.meshtastic.org";
|
||||
const mqttUsername = options["mqtt-username"] ?? "meshdev";
|
||||
const mqttPassword = options["mqtt-password"] ?? "large4cats";
|
||||
@@ -223,7 +229,6 @@ const collectNeighbourInfo = options["collect-neighbour-info"] ?? false;
|
||||
const collectMapReports = options["collect-map-reports"] ?? false;
|
||||
const decryptionKeys = options["decryption-keys"] ?? [
|
||||
"1PG7OiApB1nwvP+rz05pAQ==", // add default "AQ==" decryption key
|
||||
"PjG/mVAqnannyvqmuYAwd0LZa1AV+wkcUQlacmexEXY=", // Årsta mesh? länkad av [x/0!] divideByZero i meshen
|
||||
];
|
||||
const dropPacketsNotOkToMqtt = options["drop-packets-not-ok-to-mqtt"] ?? false;
|
||||
const dropPortnumsWithoutBitfield = options["drop-portnums-without-bitfield"] ?? null;
|
||||
@@ -242,6 +247,25 @@ const purgeTextMessagesAfterSeconds = options["purge-text-messages-after-seconds
|
||||
const purgeTraceroutesAfterSeconds = options["purge-traceroutes-after-seconds"] ?? null;
|
||||
const purgeWaypointsAfterSeconds = options["purge-waypoints-after-seconds"] ?? null;
|
||||
|
||||
// ensure protobufs exist
|
||||
if(!fs.existsSync(path.join(protobufsPath, "meshtastic/mqtt.proto"))){
|
||||
console.error([
|
||||
"ERROR: MQTT Collector requires Meshtastic protobufs.",
|
||||
"",
|
||||
"This project is licensed under the MIT license to allow end users to do as they wish.",
|
||||
"Unfortunately, the Meshtastic protobuf schema files are licensed under GPLv3, which means they can not be bundled in this project due to license conflicts.",
|
||||
"https://github.com/liamcottle/meshtastic-map/issues/102",
|
||||
"https://github.com/meshtastic/protobufs/issues/695",
|
||||
"",
|
||||
"If you clone and install the Meshtastic protobufs as described below, your use of those files will be subject to the GPLv3 license.",
|
||||
"This does not change the license of this project being MIT. Only the parts you add from the Meshtastic project are covered under GPLv3.",
|
||||
"",
|
||||
"To use the MQTT Collector, please clone the Meshtastic protobufs into src/external/protobufs",
|
||||
"git clone https://github.com/meshtastic/protobufs src/external/protobufs",
|
||||
].join("\n"));
|
||||
return;
|
||||
}
|
||||
|
||||
// create mqtt client
|
||||
const client = mqtt.connect(mqttBrokerUrl, {
|
||||
username: mqttUsername,
|
||||
@@ -251,7 +275,7 @@ const client = mqtt.connect(mqttBrokerUrl, {
|
||||
|
||||
// load protobufs
|
||||
const root = new protobufjs.Root();
|
||||
root.resolvePath = (origin, target) => path.join(__dirname, "protobufs", target);
|
||||
root.resolvePath = (origin, target) => path.join(protobufsPath, target);
|
||||
root.loadSync('meshtastic/mqtt.proto');
|
||||
const Data = root.lookupType("Data");
|
||||
const ServiceEnvelope = root.lookupType("ServiceEnvelope");
|
||||
@@ -264,9 +288,8 @@ const User = root.lookupType("User");
|
||||
const Waypoint = root.lookupType("Waypoint");
|
||||
|
||||
// run automatic purge if configured
|
||||
let purgeInterval = null;
|
||||
if(purgeIntervalSeconds){
|
||||
purgeInterval = setInterval(async () => {
|
||||
setInterval(async () => {
|
||||
await purgeUnheardNodes();
|
||||
await purgeOldDeviceMetrics();
|
||||
await purgeOldEnvironmentMetrics();
|
||||
@@ -742,13 +765,6 @@ client.on("message", async (topic, message) => {
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// check if bitfield is available, then set ok-to-mqtt
|
||||
// else leave undefined to let Prisma ignore it.
|
||||
let isOkToMqtt
|
||||
if(bitfield != null){
|
||||
isOkToMqtt = Boolean(bitfield & BITFIELD_OK_TO_MQTT_MASK);
|
||||
}
|
||||
|
||||
// create service envelope in db
|
||||
if(collectServiceEnvelopes){
|
||||
@@ -760,8 +776,6 @@ client.on("message", async (topic, message) => {
|
||||
gateway_id: envelope.gatewayId ? convertHexIdToNumericId(envelope.gatewayId) : null,
|
||||
to: envelope.packet.to,
|
||||
from: envelope.packet.from,
|
||||
portnum: portnum,
|
||||
packet_id: envelope.packet.id,
|
||||
protobuf: message,
|
||||
},
|
||||
});
|
||||
@@ -826,7 +840,6 @@ client.on("message", async (topic, message) => {
|
||||
rx_snr: envelope.packet.rxSnr,
|
||||
rx_rssi: envelope.packet.rxRssi,
|
||||
hop_limit: envelope.packet.hopLimit,
|
||||
ok_to_mqtt: isOkToMqtt,
|
||||
},
|
||||
});
|
||||
} catch (e) {
|
||||
@@ -948,19 +961,6 @@ client.on("message", async (topic, message) => {
|
||||
hardware_model: user.hwModel,
|
||||
is_licensed: user.isLicensed === true,
|
||||
role: user.role,
|
||||
is_unmessagable: user.isUnmessagable,
|
||||
ok_to_mqtt: isOkToMqtt,
|
||||
max_hops: envelope.packet.hopStart,
|
||||
channel_id: envelope.channelId,
|
||||
|
||||
firmware_version: '<2.5.0',
|
||||
...(user.publicKey != '' && {
|
||||
firmware_version: '>2.5.0',
|
||||
public_key: user.publicKey?.toString("base64"),
|
||||
}),
|
||||
...(user.isUnmessagable != null && {
|
||||
firmware_version: '>2.6.8',
|
||||
}),
|
||||
},
|
||||
update: {
|
||||
long_name: user.longName,
|
||||
@@ -968,57 +968,12 @@ client.on("message", async (topic, message) => {
|
||||
hardware_model: user.hwModel,
|
||||
is_licensed: user.isLicensed === true,
|
||||
role: user.role,
|
||||
is_unmessagable: user.isUnmessagable,
|
||||
ok_to_mqtt: isOkToMqtt,
|
||||
max_hops: envelope.packet.hopStart,
|
||||
channel_id: envelope.channelId,
|
||||
|
||||
firmware_version: '<2.5.0',
|
||||
...(user.publicKey != '' && {
|
||||
firmware_version: '>2.5.0',
|
||||
public_key: user.publicKey?.toString("base64"),
|
||||
}),
|
||||
...(user.isUnmessagable != null && {
|
||||
firmware_version: '>2.6.8',
|
||||
}),
|
||||
},
|
||||
});
|
||||
} catch (e) {
|
||||
// Ignore MySQL error 1020 "Record has changed since last read" - this is a race condition
|
||||
// that occurs when multiple packets arrive concurrently for the same node
|
||||
const errorMessage = e.message || String(e);
|
||||
if (!errorMessage.includes('Record has changed since last read')) {
|
||||
console.error(e);
|
||||
}
|
||||
console.error(e);
|
||||
}
|
||||
|
||||
// Keep track of the names a node has been using.
|
||||
try {
|
||||
await prisma.NameHistory.upsert({
|
||||
where: {
|
||||
node_id_long_name_short_name: {
|
||||
node_id: envelope.packet.from,
|
||||
long_name: user.longName,
|
||||
short_name: user.shortName,
|
||||
}
|
||||
},
|
||||
create: {
|
||||
node_id: envelope.packet.from,
|
||||
long_name: user.longName,
|
||||
short_name: user.shortName,
|
||||
},
|
||||
update: {
|
||||
updated_at: new Date(),
|
||||
}
|
||||
});
|
||||
} catch (e) {
|
||||
// Ignore MySQL error 1020 "Record has changed since last read" - this is a race condition
|
||||
// that occurs when multiple packets arrive concurrently for the same node
|
||||
const errorMessage = e.message || String(e);
|
||||
if (!errorMessage.includes('Record has changed since last read')) {
|
||||
console.error(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
else if(portnum === 8) {
|
||||
@@ -1094,70 +1049,6 @@ client.on("message", async (topic, message) => {
|
||||
console.error(e);
|
||||
}
|
||||
|
||||
// Extract edges from neighbour info
|
||||
try {
|
||||
const toNodeId = envelope.packet.from;
|
||||
const neighbors = neighbourInfo.neighbors || [];
|
||||
const packetId = envelope.packet.id;
|
||||
const channelId = envelope.channelId;
|
||||
const gatewayId = envelope.gatewayId ? convertHexIdToNumericId(envelope.gatewayId) : null;
|
||||
const edgesToCreate = [];
|
||||
|
||||
for(const neighbour of neighbors) {
|
||||
// Skip if no node ID
|
||||
if(!neighbour.nodeId) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Skip if SNR is invalid (0 or null/undefined)
|
||||
// Note: SNR can be negative, so we check for 0 specifically
|
||||
if(neighbour.snr === 0 || neighbour.snr == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const fromNodeId = neighbour.nodeId;
|
||||
const snr = neighbour.snr;
|
||||
|
||||
// Fetch node positions from Node table
|
||||
const [fromNode, toNode] = await Promise.all([
|
||||
prisma.node.findUnique({
|
||||
where: { node_id: fromNodeId },
|
||||
select: { latitude: true, longitude: true },
|
||||
}),
|
||||
prisma.node.findUnique({
|
||||
where: { node_id: toNodeId },
|
||||
select: { latitude: true, longitude: true },
|
||||
}),
|
||||
]);
|
||||
|
||||
// Create edge record
|
||||
edgesToCreate.push({
|
||||
from_node_id: fromNodeId,
|
||||
to_node_id: toNodeId,
|
||||
snr: snr,
|
||||
from_latitude: fromNode?.latitude ?? null,
|
||||
from_longitude: fromNode?.longitude ?? null,
|
||||
to_latitude: toNode?.latitude ?? null,
|
||||
to_longitude: toNode?.longitude ?? null,
|
||||
packet_id: packetId,
|
||||
channel_id: channelId,
|
||||
gateway_id: gatewayId,
|
||||
source: "NEIGHBORINFO_APP",
|
||||
});
|
||||
}
|
||||
|
||||
// Bulk insert edges
|
||||
if(edgesToCreate.length > 0) {
|
||||
await prisma.edge.createMany({
|
||||
data: edgesToCreate,
|
||||
skipDuplicates: true, // Skip if exact duplicate exists
|
||||
});
|
||||
}
|
||||
} catch (e) {
|
||||
// Log error but don't crash - edge extraction is non-critical
|
||||
console.error("Error extracting edges from neighbour info:", e);
|
||||
}
|
||||
|
||||
// don't store all neighbour infos, but we want to update the existing node above
|
||||
if(!collectNeighbourInfo){
|
||||
return;
|
||||
@@ -1400,160 +1291,6 @@ client.on("message", async (topic, message) => {
|
||||
console.error(e);
|
||||
}
|
||||
|
||||
// Extract edges from traceroute (only for response packets)
|
||||
if(!envelope.packet.decoded.wantResponse) {
|
||||
try {
|
||||
const route = routeDiscovery.route || [];
|
||||
const snrTowards = routeDiscovery.snrTowards || [];
|
||||
const originNodeId = envelope.packet.to;
|
||||
const destinationNodeId = envelope.packet.from;
|
||||
const packetId = envelope.packet.id;
|
||||
const channelId = envelope.channelId;
|
||||
const gatewayId = envelope.gatewayId ? convertHexIdToNumericId(envelope.gatewayId) : null;
|
||||
|
||||
// Determine number of edges: route.length + 1
|
||||
const numEdges = route.length + 1;
|
||||
const edgesToCreate = [];
|
||||
|
||||
// Extract edges from the route path
|
||||
for(let i = 0; i < numEdges; i++) {
|
||||
// Get SNR for this edge
|
||||
if(i >= snrTowards.length) {
|
||||
// Array length mismatch - skip this edge
|
||||
continue;
|
||||
}
|
||||
|
||||
const snr = snrTowards[i];
|
||||
|
||||
// Skip if SNR is -128 (no SNR recorded)
|
||||
if(snr === -128) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Determine from_node and to_node
|
||||
let fromNodeId, toNodeId;
|
||||
|
||||
if(route.length === 0) {
|
||||
// Empty route: direct connection (to -> from)
|
||||
fromNodeId = originNodeId;
|
||||
toNodeId = destinationNodeId;
|
||||
} else if(i === 0) {
|
||||
// First edge: origin -> route[0]
|
||||
fromNodeId = originNodeId;
|
||||
toNodeId = route[0];
|
||||
} else if(i === route.length) {
|
||||
// Last edge: route[route.length-1] -> destination
|
||||
fromNodeId = route[route.length - 1];
|
||||
toNodeId = destinationNodeId;
|
||||
} else {
|
||||
// Middle edge: route[i-1] -> route[i]
|
||||
fromNodeId = route[i - 1];
|
||||
toNodeId = route[i];
|
||||
}
|
||||
|
||||
// Fetch node positions from Node table
|
||||
const [fromNode, toNode] = await Promise.all([
|
||||
prisma.node.findUnique({
|
||||
where: { node_id: fromNodeId },
|
||||
select: { latitude: true, longitude: true },
|
||||
}),
|
||||
prisma.node.findUnique({
|
||||
where: { node_id: toNodeId },
|
||||
select: { latitude: true, longitude: true },
|
||||
}),
|
||||
]);
|
||||
|
||||
// Create edge record (skip if nodes don't exist, but still create edge with null positions)
|
||||
edgesToCreate.push({
|
||||
from_node_id: fromNodeId,
|
||||
to_node_id: toNodeId,
|
||||
snr: snr,
|
||||
from_latitude: fromNode?.latitude ?? null,
|
||||
from_longitude: fromNode?.longitude ?? null,
|
||||
to_latitude: toNode?.latitude ?? null,
|
||||
to_longitude: toNode?.longitude ?? null,
|
||||
packet_id: packetId,
|
||||
channel_id: channelId,
|
||||
gateway_id: gatewayId,
|
||||
source: "TRACEROUTE_APP",
|
||||
});
|
||||
}
|
||||
|
||||
// Extract edges from route_back path
|
||||
const routeBack = routeDiscovery.routeBack || [];
|
||||
const snrBack = routeDiscovery.snrBack || [];
|
||||
|
||||
if(routeBack.length > 0) {
|
||||
// Number of edges in route_back equals route_back.length
|
||||
for(let i = 0; i < routeBack.length; i++) {
|
||||
// Get SNR for this edge
|
||||
if(i >= snrBack.length) {
|
||||
// Array length mismatch - skip this edge
|
||||
continue;
|
||||
}
|
||||
|
||||
const snr = snrBack[i];
|
||||
|
||||
// Skip if SNR is -128 (no SNR recorded)
|
||||
if(snr === -128) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Determine from_node and to_node
|
||||
let fromNodeId, toNodeId;
|
||||
|
||||
if(i === 0) {
|
||||
// First edge: from -> route_back[0]
|
||||
fromNodeId = destinationNodeId; // 'from' in the packet
|
||||
toNodeId = routeBack[0];
|
||||
} else {
|
||||
// Subsequent edges: route_back[i-1] -> route_back[i]
|
||||
fromNodeId = routeBack[i - 1];
|
||||
toNodeId = routeBack[i];
|
||||
}
|
||||
|
||||
// Fetch node positions from Node table
|
||||
const [fromNode, toNode] = await Promise.all([
|
||||
prisma.node.findUnique({
|
||||
where: { node_id: fromNodeId },
|
||||
select: { latitude: true, longitude: true },
|
||||
}),
|
||||
prisma.node.findUnique({
|
||||
where: { node_id: toNodeId },
|
||||
select: { latitude: true, longitude: true },
|
||||
}),
|
||||
]);
|
||||
|
||||
// Create edge record
|
||||
edgesToCreate.push({
|
||||
from_node_id: fromNodeId,
|
||||
to_node_id: toNodeId,
|
||||
snr: snr,
|
||||
from_latitude: fromNode?.latitude ?? null,
|
||||
from_longitude: fromNode?.longitude ?? null,
|
||||
to_latitude: toNode?.latitude ?? null,
|
||||
to_longitude: toNode?.longitude ?? null,
|
||||
packet_id: packetId,
|
||||
channel_id: channelId,
|
||||
gateway_id: gatewayId,
|
||||
source: "TRACEROUTE_APP",
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Bulk insert edges
|
||||
if(edgesToCreate.length > 0) {
|
||||
await prisma.edge.createMany({
|
||||
data: edgesToCreate,
|
||||
skipDuplicates: true, // Skip if exact duplicate exists
|
||||
});
|
||||
}
|
||||
} catch (e) {
|
||||
// Log error but don't crash - edge extraction is non-critical
|
||||
console.error("Error extracting edges from traceroute:", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
else if(portnum === 73) {
|
||||
@@ -1658,7 +1395,6 @@ client.on("message", async (topic, message) => {
|
||||
|| portnum === 0 // ignore UNKNOWN_APP
|
||||
|| portnum === 1 // ignore TEXT_MESSAGE_APP
|
||||
|| portnum === 5 // ignore ROUTING_APP
|
||||
|| portnum === 6 // ignore ADMIN_APP
|
||||
|| portnum === 34 // ignore PAXCOUNTER_APP
|
||||
|| portnum === 65 // ignore STORE_FORWARD_APP
|
||||
|| portnum === 66 // ignore RANGE_TEST_APP
|
||||
@@ -1675,32 +1411,6 @@ client.on("message", async (topic, message) => {
|
||||
}
|
||||
|
||||
} catch(e) {
|
||||
console.log("error", e);
|
||||
// ignore errors
|
||||
}
|
||||
});
|
||||
|
||||
// Graceful shutdown handlers
|
||||
function gracefulShutdown(signal) {
|
||||
console.log(`Received ${signal}. Starting graceful shutdown...`);
|
||||
|
||||
// Clear the purge interval if it exists
|
||||
if(purgeInterval) {
|
||||
clearInterval(purgeInterval);
|
||||
console.log('Purge interval cleared');
|
||||
}
|
||||
|
||||
// Close MQTT client
|
||||
client.end(false, async () => {
|
||||
console.log('MQTT client disconnected');
|
||||
await prisma.$disconnect();
|
||||
console.log('Database connections closed');
|
||||
console.log('Graceful shutdown completed');
|
||||
process.exit(0);
|
||||
});
|
||||
}
|
||||
|
||||
// Handle SIGTERM (Docker, systemd, etc.)
|
||||
process.on('SIGTERM', () => gracefulShutdown('SIGTERM'));
|
||||
|
||||
// Handle SIGINT (Ctrl+C)
|
||||
process.on('SIGINT', () => gracefulShutdown('SIGINT'));
|
||||
|
||||
Submodule src/protobufs deleted from c2e45a3fc9
Binary file not shown.
|
Before Width: | Height: | Size: 4.4 KiB |
Binary file not shown.
|
Before Width: | Height: | Size: 45 KiB After Width: | Height: | Size: 48 KiB |
Binary file not shown.
|
Before Width: | Height: | Size: 5.4 KiB After Width: | Height: | Size: 189 KiB |
File diff suppressed because it is too large
Load Diff
244
src/stats.js
244
src/stats.js
@@ -1,244 +0,0 @@
|
||||
const path = require('path');
|
||||
const express = require('express');
|
||||
const router = express.Router();
|
||||
const protobufjs = require("protobufjs");
|
||||
|
||||
// create prisma db client
|
||||
const { Prisma, PrismaClient } = require("@prisma/client");
|
||||
const prisma = new PrismaClient();
|
||||
|
||||
// load protobufs
|
||||
const root = new protobufjs.Root();
|
||||
root.resolvePath = (origin, target) => path.join(__dirname, "protobufs", target);
|
||||
root.loadSync('meshtastic/mqtt.proto');
|
||||
const HardwareModel = root.lookupEnum("HardwareModel");
|
||||
const PortNum = root.lookupEnum("PortNum");
|
||||
|
||||
|
||||
router.get('/hardware-models', async (req, res) => {
|
||||
try {
|
||||
|
||||
// get nodes from db
|
||||
const results = await prisma.node.groupBy({
|
||||
by: ['hardware_model'],
|
||||
where: {
|
||||
// Since we removed retention; only include nodes that have been updated in the last 30 days
|
||||
updated_at: {
|
||||
gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) // within last 30 days
|
||||
}
|
||||
},
|
||||
orderBy: {
|
||||
_count: {
|
||||
hardware_model: 'desc',
|
||||
},
|
||||
},
|
||||
_count: {
|
||||
hardware_model: true,
|
||||
},
|
||||
});
|
||||
|
||||
const hardwareModelStats = results.map((result) => {
|
||||
return {
|
||||
count: result._count.hardware_model,
|
||||
hardware_model: result.hardware_model,
|
||||
hardware_model_name: HardwareModel.valuesById[result.hardware_model] ?? "UNKNOWN",
|
||||
};
|
||||
});
|
||||
|
||||
res.json({
|
||||
hardware_model_stats: hardwareModelStats,
|
||||
});
|
||||
|
||||
} catch(err) {
|
||||
console.error(err);
|
||||
res.status(500).json({
|
||||
message: "Something went wrong, try again later.",
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
router.get('/messages-per-hour', async (req, res) => {
|
||||
try {
|
||||
const hours = 168;
|
||||
const now = new Date();
|
||||
const startTime = new Date(now.getTime() - hours * 60 * 60 * 1000);
|
||||
|
||||
const messages = await prisma.textMessage.findMany({
|
||||
where: { created_at: { gte: startTime } },
|
||||
select: { packet_id: true, created_at: true },
|
||||
distinct: ['packet_id'], // Ensures only unique packet_id entries are counted
|
||||
orderBy: { created_at: 'asc' }
|
||||
});
|
||||
|
||||
// Pre-fill `uniqueCounts` with zeros for all hours, including the current hour
|
||||
const uniqueCounts = Object.fromEntries(
|
||||
Array.from({ length: hours }, (_, i) => {
|
||||
const hourTime = new Date(now.getTime() - (hours - 1 - i) * 60 * 60 * 1000);
|
||||
const hourString = hourTime.toISOString().slice(0, 13) + ":00:00.000Z"; // zero out the minutes and seconds
|
||||
return [hourString, 0];
|
||||
})
|
||||
);
|
||||
|
||||
// Populate actual message counts
|
||||
messages.forEach(({ created_at }) => {
|
||||
const hourString = created_at.toISOString().slice(0, 13) + ":00:00.000Z"; // zero out the minutes and seconds
|
||||
uniqueCounts[hourString] = (uniqueCounts[hourString] ?? 0) + 1;
|
||||
});
|
||||
|
||||
// Convert to final result format
|
||||
const result = Object.entries(uniqueCounts).map(([hour, count]) => ({ hour, count }));
|
||||
|
||||
res.json(result);
|
||||
} catch (error) {
|
||||
console.error('Error fetching messages:', error);
|
||||
res.status(500).json({ error: 'Internal Server Error' });
|
||||
}
|
||||
});
|
||||
|
||||
router.get('/most-active-nodes', async (req, res) => {
|
||||
try {
|
||||
const channelId = req.query.channel_id;
|
||||
const result = await prisma.$queryRaw(
|
||||
Prisma.sql`
|
||||
SELECT n.long_name, COUNT(*) AS count
|
||||
FROM (
|
||||
SELECT DISTINCT \`from\`, packet_id
|
||||
FROM service_envelopes
|
||||
WHERE
|
||||
created_at >= NOW() - INTERVAL 1 DAY
|
||||
AND packet_id IS NOT NULL
|
||||
AND portnum != 73
|
||||
AND \`to\` != 1
|
||||
${channelId ? Prisma.sql`AND channel_id = ${channelId}` : Prisma.sql``}
|
||||
) AS unique_packets
|
||||
JOIN nodes n ON unique_packets.from = n.node_id
|
||||
GROUP BY n.long_name
|
||||
ORDER BY count DESC
|
||||
LIMIT 25;
|
||||
`
|
||||
);
|
||||
|
||||
res.set('Cache-Control', 'public, max-age=600'); // 10 min cache
|
||||
res.json(result);
|
||||
|
||||
} catch (error) {
|
||||
console.error('Error fetching data:', error);
|
||||
res.status(500).json({ error: 'Internal Server Error' });
|
||||
}
|
||||
});
|
||||
|
||||
router.get('/portnum-counts', async (req, res) => {
|
||||
const nodeId = req.query.nodeId ? parseInt(req.query.nodeId, 10) : null;
|
||||
const channelId = req.query.channel_id;
|
||||
const hours = 24;
|
||||
const now = new Date();
|
||||
const startTime = new Date(now.getTime() - hours * 60 * 60 * 1000);
|
||||
|
||||
try {
|
||||
const envelopes = await prisma.serviceEnvelope.findMany({
|
||||
where: {
|
||||
created_at: { gte: startTime },
|
||||
...(Number.isInteger(nodeId) ? { from: nodeId } : {}),
|
||||
...(channelId ? { channel_id: channelId } : {}),
|
||||
packet_id: { not: null },
|
||||
to: { not: 1 }, // Filter out NODENUM_BROADCAST_NO_LORA
|
||||
OR: [
|
||||
{ portnum: { not: 73 } }, // Exclude portnum 73 (e.g. map reports)
|
||||
{ portnum: null } // But include PKI packages, they have no portnum
|
||||
]
|
||||
},
|
||||
select: {from: true, packet_id: true, portnum: true, channel_id: true}
|
||||
});
|
||||
|
||||
// Ensure uniqueness based on (from, packet_id)
|
||||
const seen = new Set();
|
||||
const counts = {};
|
||||
|
||||
for (const envelope of envelopes) {
|
||||
const uniqueKey = `${envelope.from}-${envelope.packet_id}`;
|
||||
if (seen.has(uniqueKey)) continue;
|
||||
seen.add(uniqueKey);
|
||||
|
||||
// Override portnum to 512 if channel_id is "PKI"
|
||||
const portnum = envelope.channel_id === "PKI" ? 512 : (envelope.portnum ?? 0);
|
||||
counts[portnum] = (counts[portnum] || 0) + 1;
|
||||
}
|
||||
|
||||
const result = Object.entries(counts).map(([portnum, count]) => ({
|
||||
portnum: parseInt(portnum, 10),
|
||||
count: count,
|
||||
label: parseInt(portnum, 10) === 512 ? "PKI" : (PortNum.valuesById[portnum] ?? "UNKNOWN"),
|
||||
})).sort((a, b) => a.portnum - b.portnum);
|
||||
|
||||
res.json(result);
|
||||
|
||||
} catch (err) {
|
||||
console.error("Error in /portnum-counts:", err);
|
||||
res.status(500).json({ message: "Internal server error" });
|
||||
}
|
||||
});
|
||||
|
||||
router.get('/battery-stats', async (req, res) => {
|
||||
const days = parseInt(req.query.days || '1', 10);
|
||||
|
||||
try {
|
||||
const stats = await prisma.$queryRaw`
|
||||
SELECT id, recorded_at, avg_battery_level
|
||||
FROM battery_stats
|
||||
WHERE recorded_at >= NOW() - INTERVAL ${days} DAY
|
||||
ORDER BY recorded_at DESC;
|
||||
`;
|
||||
|
||||
res.json(stats);
|
||||
} catch (err) {
|
||||
console.error('Error fetching battery stats:', err);
|
||||
res.status(500).json({ error: 'Internal server error' });
|
||||
}
|
||||
});
|
||||
|
||||
router.get('/channel-utilization-stats', async (req, res) => {
|
||||
const days = parseInt(req.query.days || '1', 10);
|
||||
const channelId = req.query.channel_id; // optional string
|
||||
|
||||
try {
|
||||
const stats = await prisma.$queryRaw(
|
||||
Prisma.sql`
|
||||
SELECT recorded_at, channel_id, avg_channel_utilization
|
||||
FROM channel_utilization_stats
|
||||
WHERE recorded_at >= NOW() - INTERVAL ${days} DAY
|
||||
${channelId ? Prisma.sql`AND channel_id = ${channelId}` : Prisma.sql``}
|
||||
ORDER BY recorded_at DESC;
|
||||
`
|
||||
);
|
||||
|
||||
res.json(stats);
|
||||
} catch (err) {
|
||||
console.error('Error fetching channel utilization stats:', err);
|
||||
res.status(500).json({ error: 'Internal server error' });
|
||||
}
|
||||
});
|
||||
|
||||
router.get('/channel-utilization', async (req, res) => {
|
||||
const channelId = req.query.channel_id;
|
||||
|
||||
try {
|
||||
const snapshot = await prisma.$queryRaw(
|
||||
Prisma.sql`
|
||||
SELECT recorded_at, channel_id, avg_channel_utilization
|
||||
FROM channel_utilization_stats
|
||||
WHERE recorded_at = (
|
||||
SELECT MAX(recorded_at) FROM channel_utilization_stats
|
||||
)
|
||||
${channelId ? Prisma.sql`AND channel_id = ${channelId}` : Prisma.sql``}
|
||||
ORDER BY channel_id;
|
||||
`
|
||||
);
|
||||
|
||||
res.json(snapshot);
|
||||
} catch (err) {
|
||||
console.error('Error fetching latest channel utilization:', err);
|
||||
res.status(500).json({ error: 'Internal server error' });
|
||||
}
|
||||
});
|
||||
|
||||
module.exports = router;
|
||||
@@ -1,33 +0,0 @@
|
||||
// Override console methods to add formatted timestamps
|
||||
const originalLog = console.log;
|
||||
const originalError = console.error;
|
||||
const originalWarn = console.warn;
|
||||
const originalInfo = console.info;
|
||||
|
||||
function formatTimestamp() {
|
||||
const now = new Date();
|
||||
const year = now.getFullYear();
|
||||
const month = String(now.getMonth() + 1).padStart(2, '0');
|
||||
const day = String(now.getDate()).padStart(2, '0');
|
||||
const hours = String(now.getHours()).padStart(2, '0');
|
||||
const minutes = String(now.getMinutes()).padStart(2, '0');
|
||||
const seconds = String(now.getSeconds()).padStart(2, '0');
|
||||
return `${year}-${month}-${day} ${hours}:${minutes}:${seconds}`;
|
||||
}
|
||||
|
||||
console.log = function(...args) {
|
||||
originalLog(`${formatTimestamp()} [Info]`, ...args);
|
||||
};
|
||||
|
||||
console.error = function(...args) {
|
||||
originalError(`${formatTimestamp()} [Error]`, ...args);
|
||||
};
|
||||
|
||||
console.warn = function(...args) {
|
||||
originalWarn(`${formatTimestamp()} [Warn]`, ...args);
|
||||
};
|
||||
|
||||
console.info = function(...args) {
|
||||
originalInfo(`${formatTimestamp()} [Info]`, ...args);
|
||||
};
|
||||
|
||||
314
src/ws.js
314
src/ws.js
@@ -1,314 +0,0 @@
|
||||
require('./utils/logger');
|
||||
const crypto = require("crypto");
|
||||
const path = require("path");
|
||||
const http = require("http");
|
||||
const mqtt = require("mqtt");
|
||||
const protobufjs = require("protobufjs");
|
||||
const commandLineArgs = require("command-line-args");
|
||||
const commandLineUsage = require("command-line-usage");
|
||||
const { WebSocketServer } = require("ws");
|
||||
|
||||
const optionsList = [
|
||||
{
|
||||
name: 'help',
|
||||
alias: 'h',
|
||||
type: Boolean,
|
||||
description: 'Display this usage guide.'
|
||||
},
|
||||
{
|
||||
name: "mqtt-broker-url",
|
||||
type: String,
|
||||
description: "MQTT Broker URL (e.g: mqtt://mqtt.meshtastic.org)",
|
||||
},
|
||||
{
|
||||
name: "mqtt-username",
|
||||
type: String,
|
||||
description: "MQTT Username (e.g: meshdev)",
|
||||
},
|
||||
{
|
||||
name: "mqtt-password",
|
||||
type: String,
|
||||
description: "MQTT Password (e.g: large4cats)",
|
||||
},
|
||||
{
|
||||
name: "mqtt-client-id",
|
||||
type: String,
|
||||
description: "MQTT Client ID (e.g: map.example.com)",
|
||||
},
|
||||
{
|
||||
name: "mqtt-topic",
|
||||
type: String,
|
||||
multiple: true,
|
||||
typeLabel: '<topic> ...',
|
||||
description: "MQTT Topic to subscribe to (e.g: msh/#)",
|
||||
},
|
||||
{
|
||||
name: "decryption-keys",
|
||||
type: String,
|
||||
multiple: true,
|
||||
typeLabel: '<base64DecryptionKey> ...',
|
||||
description: "Decryption keys encoded in base64 to use when decrypting service envelopes.",
|
||||
},
|
||||
{
|
||||
name: "ws-port",
|
||||
type: Number,
|
||||
description: "WebSocket server port (default: 8081)",
|
||||
},
|
||||
];
|
||||
|
||||
// parse command line args
|
||||
const options = commandLineArgs(optionsList);
|
||||
|
||||
// show help
|
||||
if(options.help){
|
||||
const usage = commandLineUsage([
|
||||
{
|
||||
header: 'Meshtastic WebSocket Publisher',
|
||||
content: 'Publishes real-time Meshtastic packets via WebSocket.',
|
||||
},
|
||||
{
|
||||
header: 'Options',
|
||||
optionList: optionsList,
|
||||
},
|
||||
]);
|
||||
console.log(usage);
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
// get options and fallback to default values
|
||||
const mqttBrokerUrl = options["mqtt-broker-url"] ?? "mqtt://mqtt.meshtastic.org";
|
||||
const mqttUsername = options["mqtt-username"] ?? "meshdev";
|
||||
const mqttPassword = options["mqtt-password"] ?? "large4cats";
|
||||
const mqttClientId = options["mqtt-client-id"] ?? null;
|
||||
const mqttTopics = options["mqtt-topic"] ?? ["msh/#"];
|
||||
const decryptionKeys = options["decryption-keys"] ?? [
|
||||
"1PG7OiApB1nwvP+rz05pAQ==", // add default "AQ==" decryption key
|
||||
];
|
||||
const wsPort = options["ws-port"] ?? 8081;
|
||||
|
||||
// create mqtt client
|
||||
const client = mqtt.connect(mqttBrokerUrl, {
|
||||
username: mqttUsername,
|
||||
password: mqttPassword,
|
||||
clientId: mqttClientId,
|
||||
});
|
||||
|
||||
// load protobufs
|
||||
const root = new protobufjs.Root();
|
||||
root.resolvePath = (origin, target) => path.join(__dirname, "protobufs", target);
|
||||
root.loadSync('meshtastic/mqtt.proto');
|
||||
const Data = root.lookupType("Data");
|
||||
const ServiceEnvelope = root.lookupType("ServiceEnvelope");
|
||||
const RouteDiscovery = root.lookupType("RouteDiscovery");
|
||||
|
||||
// create HTTP server for WebSocket
|
||||
const server = http.createServer();
|
||||
const wss = new WebSocketServer({ server });
|
||||
|
||||
// track connected clients
|
||||
const clients = new Set();
|
||||
|
||||
wss.on('connection', (ws) => {
|
||||
clients.add(ws);
|
||||
console.log(`WebSocket client connected. Total clients: ${clients.size}`);
|
||||
|
||||
ws.on('close', () => {
|
||||
clients.delete(ws);
|
||||
console.log(`WebSocket client disconnected. Total clients: ${clients.size}`);
|
||||
});
|
||||
|
||||
ws.on('error', (error) => {
|
||||
console.error('WebSocket error:', error);
|
||||
clients.delete(ws);
|
||||
});
|
||||
});
|
||||
|
||||
// broadcast message to all connected clients
|
||||
function broadcast(message) {
|
||||
const messageStr = JSON.stringify(message);
|
||||
clients.forEach((client) => {
|
||||
if (client.readyState === 1) { // WebSocket.OPEN
|
||||
try {
|
||||
client.send(messageStr);
|
||||
} catch (error) {
|
||||
console.error('Error sending message to client:', error);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
function createNonce(packetId, fromNode) {
|
||||
// Expand packetId to 64 bits
|
||||
const packetId64 = BigInt(packetId);
|
||||
|
||||
// Initialize block counter (32-bit, starts at zero)
|
||||
const blockCounter = 0;
|
||||
|
||||
// Create a buffer for the nonce
|
||||
const buf = Buffer.alloc(16);
|
||||
|
||||
// Write packetId, fromNode, and block counter to the buffer
|
||||
buf.writeBigUInt64LE(packetId64, 0);
|
||||
buf.writeUInt32LE(fromNode, 8);
|
||||
buf.writeUInt32LE(blockCounter, 12);
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
||||
/**
|
||||
* References:
|
||||
* https://github.com/crypto-smoke/meshtastic-go/blob/develop/radio/aes.go#L42
|
||||
* https://github.com/pdxlocations/Meshtastic-MQTT-Connect/blob/main/meshtastic-mqtt-connect.py#L381
|
||||
*/
|
||||
function decrypt(packet) {
|
||||
// attempt to decrypt with all available decryption keys
|
||||
for(const decryptionKey of decryptionKeys){
|
||||
try {
|
||||
// convert encryption key to buffer
|
||||
const key = Buffer.from(decryptionKey, "base64");
|
||||
|
||||
// create decryption iv/nonce for this packet
|
||||
const nonceBuffer = createNonce(packet.id, packet.from);
|
||||
|
||||
// determine algorithm based on key length
|
||||
var algorithm = null;
|
||||
if(key.length === 16){
|
||||
algorithm = "aes-128-ctr";
|
||||
} else if(key.length === 32){
|
||||
algorithm = "aes-256-ctr";
|
||||
} else {
|
||||
// skip this key, try the next one...
|
||||
console.error(`Skipping decryption key with invalid length: ${key.length}`);
|
||||
continue;
|
||||
}
|
||||
|
||||
// create decipher
|
||||
const decipher = crypto.createDecipheriv(algorithm, key, nonceBuffer);
|
||||
|
||||
// decrypt encrypted packet
|
||||
const decryptedBuffer = Buffer.concat([decipher.update(packet.encrypted), decipher.final()]);
|
||||
|
||||
// parse as data message
|
||||
return Data.decode(decryptedBuffer);
|
||||
|
||||
} catch(e){}
|
||||
}
|
||||
|
||||
// couldn't decrypt
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* converts hex id to numeric id, for example: !FFFFFFFF to 4294967295
|
||||
* @param hexId a node id in hex format with a prepended "!"
|
||||
* @returns {bigint} the node id in numeric form
|
||||
*/
|
||||
function convertHexIdToNumericId(hexId) {
|
||||
return BigInt('0x' + hexId.replaceAll("!", ""));
|
||||
}
|
||||
|
||||
// subscribe to everything when connected
|
||||
client.on("connect", () => {
|
||||
console.log("Connected to MQTT broker");
|
||||
for(const mqttTopic of mqttTopics){
|
||||
client.subscribe(mqttTopic);
|
||||
console.log(`Subscribed to MQTT topic: ${mqttTopic}`);
|
||||
}
|
||||
});
|
||||
|
||||
// handle message received
|
||||
client.on("message", async (topic, message) => {
|
||||
try {
|
||||
// decode service envelope
|
||||
const envelope = ServiceEnvelope.decode(message);
|
||||
if(!envelope.packet){
|
||||
return;
|
||||
}
|
||||
|
||||
// attempt to decrypt encrypted packets
|
||||
const isEncrypted = envelope.packet.encrypted?.length > 0;
|
||||
if(isEncrypted){
|
||||
const decoded = decrypt(envelope.packet);
|
||||
if(decoded){
|
||||
envelope.packet.decoded = decoded;
|
||||
}
|
||||
}
|
||||
|
||||
// get portnum from decoded packet
|
||||
const portnum = envelope.packet?.decoded?.portnum;
|
||||
|
||||
// check if we can see the decrypted packet data
|
||||
if(envelope.packet.decoded == null){
|
||||
return;
|
||||
}
|
||||
|
||||
// handle traceroutes (portnum 70)
|
||||
if(portnum === 70) {
|
||||
try {
|
||||
const routeDiscovery = RouteDiscovery.decode(envelope.packet.decoded.payload);
|
||||
|
||||
const traceroute = {
|
||||
type: "traceroute",
|
||||
data: {
|
||||
to: envelope.packet.to,
|
||||
from: envelope.packet.from,
|
||||
want_response: envelope.packet.decoded.wantResponse,
|
||||
route: routeDiscovery.route,
|
||||
snr_towards: routeDiscovery.snrTowards,
|
||||
route_back: routeDiscovery.routeBack,
|
||||
snr_back: routeDiscovery.snrBack,
|
||||
channel_id: envelope.channelId,
|
||||
gateway_id: envelope.gatewayId ? Number(convertHexIdToNumericId(envelope.gatewayId)) : null,
|
||||
packet_id: envelope.packet.id,
|
||||
}
|
||||
};
|
||||
broadcast(traceroute);
|
||||
} catch (e) {
|
||||
console.error("Error processing traceroute:", e);
|
||||
}
|
||||
}
|
||||
|
||||
} catch(e) {
|
||||
console.error("Error processing MQTT message:", e);
|
||||
}
|
||||
});
|
||||
|
||||
// start WebSocket server
|
||||
server.listen(wsPort, () => {
|
||||
console.log(`WebSocket server running on port ${wsPort}`);
|
||||
});
|
||||
|
||||
// Graceful shutdown handlers
|
||||
function gracefulShutdown(signal) {
|
||||
console.log(`Received ${signal}. Starting graceful shutdown...`);
|
||||
|
||||
// Close all WebSocket connections
|
||||
clients.forEach((client) => {
|
||||
client.close();
|
||||
});
|
||||
clients.clear();
|
||||
|
||||
// Close WebSocket server
|
||||
wss.close(() => {
|
||||
console.log('WebSocket server closed');
|
||||
});
|
||||
|
||||
// Close HTTP server
|
||||
server.close(() => {
|
||||
console.log('HTTP server closed');
|
||||
});
|
||||
|
||||
// Close MQTT client
|
||||
client.end(false, () => {
|
||||
console.log('MQTT client disconnected');
|
||||
console.log('Graceful shutdown completed');
|
||||
process.exit(0);
|
||||
});
|
||||
}
|
||||
|
||||
// Handle SIGTERM (Docker, systemd, etc.)
|
||||
process.on('SIGTERM', () => gracefulShutdown('SIGTERM'));
|
||||
|
||||
// Handle SIGINT (Ctrl+C)
|
||||
process.on('SIGINT', () => gracefulShutdown('SIGINT'));
|
||||
|
||||
Reference in New Issue
Block a user