21 Commits

Author SHA1 Message Date
Anton Roslund
eda9a12443 improve docker build time 2025-08-10 20:10:28 +02:00
liamcottle
09a2bcb3ad update announcement 2025-06-15 19:01:13 +12:00
liamcottle
de1cfd4222 update express to v5.0.0 2025-06-15 16:42:21 +12:00
liamcottle
1c1b77b3ea add note about mqtt collector 2025-06-15 16:40:08 +12:00
liamcottle
2a1ef2131a users must provide their own protobuf schema files 2025-06-15 16:35:16 +12:00
liamcottle
8a43c9d3d1 remove protobuf from server 2025-06-15 16:26:33 +12:00
liamcottle
9a18ca1057 add external to .gitignore 2025-06-15 15:28:16 +12:00
liamcottle
ffe1c6c30a remove protobufs 2025-06-15 15:17:20 +12:00
liamcottle
1aa32cfa35 use purple lines between position history markers 2025-04-25 17:36:52 +12:00
liamcottle
825b62c5bb add openssl to dockerfile 2025-04-25 17:29:45 +12:00
Liam Cottle
b87a7b2f27 Merge pull request #92 from sgtwilko/StopGraphFromCuttingOffValues
Change Voltage chart to use suggested min/max (#1)
2025-04-25 17:26:56 +12:00
liamcottle
2ab169b4ff update device image for RP2040_LORA 2025-04-25 17:20:31 +12:00
liamcottle
821d6177c3 add device image for SEEED_XIAO_S3 2025-04-25 17:12:27 +12:00
liamcottle
8acc4696db add device image for STATION_G2 2025-04-25 17:10:25 +12:00
liamcottle
cd6a99a179 remove white background from device images 2025-04-25 17:07:22 +12:00
liamcottle
90dc3ae449 rotate and optimise image 2025-04-25 16:38:00 +12:00
Liam Cottle
07e362745a Merge pull request #96 from valzzu/master
add image for nrf52 diy nodes
2025-04-25 16:37:11 +12:00
Liam Cottle
f6d14b8f95 Merge pull request #97 from dieseltravis/patch-1
replace \u00BA º (not degrees symbol) with \u00B0 ° (degrees)
2025-04-25 16:29:52 +12:00
Travis Hardiman
54ebb429d1 replace U+00BA º (not degrees) with U+00B0 ° (degrees)
U+00BA º MASCULINE ORDINAL INDICATOR vs. U+00B0 ° DEGREE SIGN (°)
2025-04-25 00:20:39 -04:00
Iris
92a649ad90 Add files via upload 2025-04-03 08:07:22 +03:00
sgtwilko
9ff76345b0 Change Voltage chart to use suggested min/max (#1)
The Voltage/Current chart often either shows lines with so little variation that you cannot see changes, or the values go off the top/bottom.

This change allows the chart to adapt dynamically to the values being returned.
2025-02-26 13:06:57 +00:00
38 changed files with 2337 additions and 5193 deletions

View File

@@ -1,3 +1,3 @@
.env
node_modules
.git
.git

View File

@@ -1,13 +0,0 @@
# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for all configuration options:
# https://docs.github.com/code-security/dependabot/dependabot-version-updates/configuration-options-for-the-dependabot.yml-file
version: 2
updates:
- package-ecosystem: npm
directory: "/"
schedule:
interval: daily
time: '20:00'
open-pull-requests-limit: 10

2
.gitignore vendored
View File

@@ -2,3 +2,5 @@
node_modules
# Keep environment variables out of version control
.env
src/external

3
.gitmodules vendored
View File

@@ -1,3 +0,0 @@
[submodule "protobufs"]
path = src/protobufs
url = https://github.com/meshtastic/protobufs.git

View File

@@ -1,29 +1,16 @@
FROM node:lts-alpine AS build
RUN apk add --no-cache openssl
FROM node:lts-alpine
WORKDIR /app
# Copy only package files and install deps
# This layer will be cached as long as package*.json don't change
COPY package*.json package-lock.json* ./
RUN --mount=type=cache,target=/root/.npm npm ci --omit=dev
RUN npm ci
# Copy the rest of your source
COPY . .
# Pre-generate prisma client
RUN node_modules/.bin/prisma generate
FROM node:lts-alpine
RUN apk add --no-cache openssl
USER node:node
WORKDIR /app
COPY --from=build --chown=node:node /app .
EXPOSE 8080
EXPOSE 8080

View File

@@ -122,6 +122,9 @@ You will now need to restart the `index.js` and `mqtt.js` scripts.
## MQTT Collector
> Please note, due to the Meshtastic protobuf schema files being locked under a GPLv3 license, these are not provided in this MIT licensed project.
You will need to obtain these files yourself to be able to use the MQTT Collector.
By default, the [MQTT Collector](./src/mqtt.js) connects to the public Meshtastic MQTT server.
Alternatively, you may provide the relevant options shown in the help section below to connect to your own MQTT server along with your own decryption keys.

View File

@@ -30,18 +30,6 @@ services:
DATABASE_URL: "mysql://root:password@database:3306/meshtastic-map?connection_limit=100"
MAP_OPTS: "" # add any custom index.js options here
# publishes mqtt packets via websocket
meshtastic-ws:
container_name: meshtastic-ws
build:
context: .
dockerfile: ./Dockerfile
command: /app/docker/ws.sh
ports:
- 8081:8081/tcp
environment:
WS_OPTS: ""
# runs the database to store everything from mqtt
database:
container_name: database
@@ -58,7 +46,7 @@ services:
interval: 15s
timeout: 5s
retries: 6
start_period: 5s
start_interval: 5s
volumes:
database_data:

View File

@@ -1,5 +0,0 @@
#!/bin/sh
echo "Starting websocket publisher"
exec node src/ws.js ${WS_OPTS}

4225
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -9,18 +9,16 @@
"author": "",
"license": "ISC",
"dependencies": {
"@prisma/client": "^6.16.2",
"command-line-args": "^6.0.1",
"command-line-usage": "^7.0.3",
"compression": "^1.8.1",
"cors": "^2.8.5",
"express": "^5.2.1",
"mqtt": "^5.14.1",
"protobufjs": "^7.5.4",
"ws": "^8.18.3"
"@prisma/client": "^5.11.0",
"command-line-args": "^5.2.1",
"command-line-usage": "^7.0.1",
"compression": "^1.7.4",
"express": "^5.0.0",
"mqtt": "^5.3.6",
"protobufjs": "^7.2.6"
},
"devDependencies": {
"jest": "^30.1.3",
"prisma": "^6.16.2"
"jest": "^29.7.0",
"prisma": "^5.10.2"
}
}

View File

@@ -1,2 +0,0 @@
-- AlterTable
ALTER TABLE `nodes` ADD COLUMN `ok_to_mqtt` BOOLEAN NULL;

View File

@@ -1,2 +0,0 @@
-- AlterTable
ALTER TABLE `service_envelopes` ADD COLUMN `portnum` INTEGER NULL;

View File

@@ -1,5 +0,0 @@
-- AlterTable
ALTER TABLE `service_envelopes` ADD COLUMN `packet_id` BIGINT NULL;
-- CreateIndex
CREATE INDEX `service_envelopes_packet_id_idx` ON `service_envelopes`(`packet_id`);

View File

@@ -1,19 +0,0 @@
-- CreateTable
CREATE TABLE `battery_stats` (
`id` BIGINT NOT NULL AUTO_INCREMENT,
`recorded_at` DATETIME(3) NULL DEFAULT CURRENT_TIMESTAMP(3),
`avg_battery_level` DECIMAL(5, 2) NULL,
INDEX `battery_stats_recorded_at_idx`(`recorded_at`),
PRIMARY KEY (`id`)
) DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
-- CreateTable
CREATE TABLE `channel_utilization_stats` (
`id` BIGINT NOT NULL AUTO_INCREMENT,
`recorded_at` DATETIME(3) NULL DEFAULT CURRENT_TIMESTAMP(3),
`avg_channel_utilization` DECIMAL(65, 30) NULL,
INDEX `channel_utilization_stats_recorded_at_idx`(`recorded_at`),
PRIMARY KEY (`id`)
) DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

View File

@@ -1,16 +0,0 @@
-- CreateTable
CREATE TABLE `name_history` (
`id` BIGINT NOT NULL AUTO_INCREMENT,
`node_id` BIGINT NOT NULL,
`long_name` VARCHAR(191) NOT NULL,
`short_name` VARCHAR(191) NOT NULL,
`created_at` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
`updated_at` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
INDEX `name_history_node_id_idx`(`node_id`),
INDEX `name_history_long_name_idx`(`long_name`),
INDEX `name_history_created_at_idx`(`created_at`),
INDEX `name_history_updated_at_idx`(`updated_at`),
UNIQUE INDEX `name_history_node_id_long_name_short_name_key`(`node_id`, `long_name`, `short_name`),
PRIMARY KEY (`id`)
) DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

View File

@@ -1,2 +0,0 @@
-- AlterTable
ALTER TABLE `nodes` ADD COLUMN `is_backbone` BOOLEAN NULL;

View File

@@ -1,3 +0,0 @@
-- AlterTable
ALTER TABLE `nodes` ADD COLUMN `is_unmessagable` BOOLEAN NULL,
ADD COLUMN `public_key` VARCHAR(191) NULL;

View File

@@ -1,2 +0,0 @@
-- AlterTable
ALTER TABLE `nodes` ADD COLUMN `max_hops` INTEGER NULL;

View File

@@ -1,2 +0,0 @@
-- AlterTable
ALTER TABLE `nodes` ADD COLUMN `channel_id` VARCHAR(191) NULL;

View File

@@ -1,5 +0,0 @@
-- AlterTable
ALTER TABLE `channel_utilization_stats` ADD COLUMN `channel_id` VARCHAR(191) NULL;
-- CreateIndex
CREATE INDEX `channel_utilization_stats_channel_id_idx` ON `channel_utilization_stats`(`channel_id`);

View File

@@ -1,2 +0,0 @@
-- AlterTable
ALTER TABLE `text_messages` ADD COLUMN `ok_to_mqtt` BOOLEAN NULL;

View File

@@ -1,23 +0,0 @@
-- CreateTable
CREATE TABLE `edges` (
`id` BIGINT NOT NULL AUTO_INCREMENT,
`from_node_id` BIGINT NOT NULL,
`to_node_id` BIGINT NOT NULL,
`snr` INTEGER NOT NULL,
`from_latitude` INTEGER NULL,
`from_longitude` INTEGER NULL,
`to_latitude` INTEGER NULL,
`to_longitude` INTEGER NULL,
`packet_id` BIGINT NOT NULL,
`channel_id` VARCHAR(191) NULL,
`gateway_id` BIGINT NULL,
`source` VARCHAR(191) NOT NULL,
`created_at` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
`updated_at` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
INDEX `edges_from_node_id_idx`(`from_node_id`),
INDEX `edges_to_node_id_idx`(`to_node_id`),
INDEX `edges_created_at_idx`(`created_at`),
INDEX `edges_from_node_id_to_node_id_idx`(`from_node_id`, `to_node_id`),
PRIMARY KEY (`id`)
) DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

View File

@@ -21,8 +21,6 @@ model Node {
hardware_model Int
role Int
is_licensed Boolean?
public_key String?
is_unmessagable Boolean?
firmware_version String?
region Int?
@@ -53,12 +51,6 @@ model Node {
// this column tracks when an mqtt gateway node uplinked a packet
mqtt_connection_state_updated_at DateTime?
ok_to_mqtt Boolean?
is_backbone Boolean?
max_hops Int?
channel_id String?
created_at DateTime @default(now())
updated_at DateTime @default(now()) @updatedAt
@@ -210,8 +202,6 @@ model ServiceEnvelope {
gateway_id BigInt?
to BigInt
from BigInt
portnum Int?
packet_id BigInt?
protobuf Bytes
created_at DateTime @default(now())
@@ -220,7 +210,6 @@ model ServiceEnvelope {
@@index(created_at)
@@index(updated_at)
@@index(gateway_id)
@@index(packet_id)
@@map("service_envelopes")
}
@@ -239,7 +228,6 @@ model TextMessage {
rx_snr Decimal?
rx_rssi Int?
hop_limit Int?
ok_to_mqtt Boolean?
created_at DateTime @default(now())
updated_at DateTime @default(now()) @updatedAt
@@ -308,67 +296,3 @@ model Waypoint {
@@index(gateway_id)
@@map("waypoints")
}
model NameHistory {
id BigInt @id @default(autoincrement())
node_id BigInt
long_name String
short_name String
created_at DateTime @default(now())
updated_at DateTime @default(now()) @updatedAt
@@index(node_id)
@@index(long_name)
@@index(created_at)
@@index(updated_at)
@@map("name_history")
// We only want to keep track of unique name and node_id combinations
@@unique([node_id, long_name, short_name])
}
model BatteryStats {
id BigInt @id @default(autoincrement())
recorded_at DateTime? @default(now())
avg_battery_level Decimal? @db.Decimal(5, 2)
@@index([recorded_at])
@@map("battery_stats")
}
model ChannelUtilizationStats {
id BigInt @id @default(autoincrement())
recorded_at DateTime? @default(now())
avg_channel_utilization Decimal?
channel_id String?
@@index([channel_id])
@@index([recorded_at])
@@map("channel_utilization_stats")
}
model Edge {
id BigInt @id @default(autoincrement())
from_node_id BigInt
to_node_id BigInt
snr Int
from_latitude Int?
from_longitude Int?
to_latitude Int?
to_longitude Int?
packet_id BigInt
channel_id String?
gateway_id BigInt?
source String
created_at DateTime @default(now())
updated_at DateTime @default(now()) @updatedAt
@@index(from_node_id)
@@index(to_node_id)
@@index(created_at)
@@index([from_node_id, to_node_id])
@@map("edges")
}

View File

@@ -1,7 +1,6 @@
// node src/admin.js --purge-node-id 123
// node src/admin.js --purge-node-id '!AABBCCDD'
require('./utils/logger');
const commandLineArgs = require("command-line-args");
const commandLineUsage = require("command-line-usage");

View File

@@ -1,13 +1,9 @@
require('./utils/logger');
const fs = require("fs");
const path = require('path');
const express = require('express');
const compression = require('compression');
const protobufjs = require("protobufjs");
const commandLineArgs = require("command-line-args");
const commandLineUsage = require("command-line-usage");
const cors = require('cors');
const statsRoutes = require('./stats.js');
// create prisma db client
const { PrismaClient } = require("@prisma/client");
@@ -54,24 +50,21 @@ if(options.help){
// get options and fallback to default values
const port = options["port"] ?? 8080;
// load protobufs
const root = new protobufjs.Root();
root.resolvePath = (origin, target) => path.join(__dirname, "protobufs", target);
root.loadSync('meshtastic/mqtt.proto');
const HardwareModel = root.lookupEnum("HardwareModel");
const Role = root.lookupEnum("Config.DeviceConfig.Role");
const RegionCode = root.lookupEnum("Config.LoRaConfig.RegionCode");
const ModemPreset = root.lookupEnum("Config.LoRaConfig.ModemPreset");
// load json
const hardwareModels = JSON.parse(fs.readFileSync(path.join(__dirname, "json/hardware_models.json"), "utf-8"));
const roles = JSON.parse(fs.readFileSync(path.join(__dirname, "json/roles.json"), "utf-8"));
const regionCodes = JSON.parse(fs.readFileSync(path.join(__dirname, "json/region_codes.json"), "utf-8"));
const modemPresets = JSON.parse(fs.readFileSync(path.join(__dirname, "json/modem_presets.json"), "utf-8"));
// appends extra info for node objects returned from api
function formatNodeInfo(node) {
return {
...node,
node_id_hex: "!" + node.node_id.toString(16),
hardware_model_name: HardwareModel.valuesById[node.hardware_model] ?? null,
role_name: Role.valuesById[node.role] ?? null,
region_name: RegionCode.valuesById[node.region] ?? null,
modem_preset_name: ModemPreset.valuesById[node.modem_preset] ?? null,
hardware_model_name: hardwareModels[node.hardware_model] ?? null,
role_name: roles[node.role] ?? null,
region_name: regionCodes[node.region] ?? null,
modem_preset_name: modemPresets[node.modem_preset] ?? null,
};
}
@@ -80,9 +73,6 @@ const app = express();
// enable compression
app.use(compression());
// Apply CORS only to API routes
app.use('/api', cors());
// serve files inside the public folder from /
app.use('/', express.static(path.join(__dirname, 'public')));
@@ -90,9 +80,6 @@ app.get('/', async (req, res) => {
res.sendFile(path.join(__dirname, 'public/index.html'));
});
// stats API in separate file
app.use('/api/v1/stats', statsRoutes);
app.get('/api', async (req, res) => {
const links = [
@@ -147,23 +134,6 @@ app.get('/api', async (req, res) => {
"path": "/api/v1/nodes/:nodeId/traceroutes",
"description": "Trace routes for a meshtastic node",
},
{
"path": "/api/v1/traceroutes",
"description": "Recent traceroute edges across all nodes",
"params": {
"time_from": "Only include traceroutes updated after this unix timestamp (milliseconds)",
"time_to": "Only include traceroutes updated before this unix timestamp (milliseconds)"
}
},
{
"path": "/api/v1/connections",
"description": "Aggregated edges between nodes from traceroutes",
"params": {
"node_id": "Only include connections involving this node id",
"time_from": "Only include edges created after this unix timestamp (milliseconds)",
"time_to": "Only include edges created before this unix timestamp (milliseconds)"
}
},
{
"path": "/api/v1/nodes/:nodeId/position-history",
"description": "Position history for a meshtastic node",
@@ -229,10 +199,6 @@ app.get('/api/v1/nodes', async (req, res) => {
where: {
role: role,
hardware_model: hardwareModel,
// Since we removed retention; only include nodes that have been updated in the last 30 days
updated_at: {
gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) // within last 30 days
}
},
});
@@ -547,22 +513,10 @@ app.get('/api/v1/nodes/:nodeId/traceroutes', async (req, res) => {
return;
}
// get latest traceroutes, deduplicated by packet_id
// get latest traceroutes
// We want replies where want_response is false and it will be "to" the
// requester.
// Deduplicate by packet_id, keeping the latest traceroute (highest id) for each packet_id
const traceroutes = await prisma.$queryRaw`
SELECT t1.*
FROM traceroutes t1
INNER JOIN (
SELECT packet_id, MAX(id) as max_id
FROM traceroutes
WHERE want_response = false and \`to\` = ${node.node_id} and gateway_id is not null
GROUP BY packet_id
) t2 ON t1.packet_id = t2.packet_id AND t1.id = t2.max_id
ORDER BY t1.id DESC
LIMIT ${count}
`;
const traceroutes = await prisma.$queryRaw`SELECT * FROM traceroutes WHERE want_response = false and \`to\` = ${node.node_id} and gateway_id is not null order by id desc limit ${count}`;
res.json({
traceroutes: traceroutes.map((trace) => {
@@ -600,313 +554,6 @@ app.get('/api/v1/nodes/:nodeId/traceroutes', async (req, res) => {
}
});
// Aggregated recent traceroute edges (global), filtered by updated_at
// Returns deduplicated edges with the latest SNR and timestamp.
// GET /api/v1/nodes/traceroutes?time_from=...&time_to=...
app.get('/api/v1/traceroutes', async (req, res) => {
try {
const timeFrom = req.query.time_from ? parseInt(req.query.time_from) : undefined;
const timeTo = req.query.time_to ? parseInt(req.query.time_to) : undefined;
// Pull recent traceroutes within the time window. We only want replies (want_response=false)
// and those that were actually gated to MQTT (gateway_id not null)
const traces = await prisma.traceRoute.findMany({
where: {
want_response: false,
gateway_id: { not: null },
updated_at: {
gte: timeFrom ? new Date(timeFrom) : undefined,
lte: timeTo ? new Date(timeTo) : undefined,
},
},
orderBy: { id: 'desc' },
take: 5000, // cap to keep response bounded; UI can page/adjust time window if needed
});
// Normalize JSON fields that may be strings (depending on driver)
const normalized = traces.map((t) => {
const trace = { ...t };
if (typeof trace.route === 'string') {
try { trace.route = JSON.parse(trace.route); } catch(_) {}
}
if (typeof trace.route_back === 'string') {
try { trace.route_back = JSON.parse(trace.route_back); } catch(_) {}
}
if (typeof trace.snr_towards === 'string') {
try { trace.snr_towards = JSON.parse(trace.snr_towards); } catch(_) {}
}
if (typeof trace.snr_back === 'string') {
try { trace.snr_back = JSON.parse(trace.snr_back); } catch(_) {}
}
return trace;
});
// Build edges from both forward (towards) and reverse (back) paths.
// Forward path: to → route[] → from, using snr_towards
// Reverse path: from → route_back[] → to, using snr_back
const edgeKey = (a, b) => `${String(a)}->${String(b)}`;
const edges = new Map();
function upsertEdgesFromPath(trace, pathNodes, pathSnrs) {
for (let i = 0; i < pathNodes.length - 1; i++) {
const hopFrom = pathNodes[i];
const hopTo = pathNodes[i + 1];
const snr = typeof (pathSnrs && pathSnrs[i]) === 'number' ? pathSnrs[i] : null;
// Skip edges without SNR data
if (snr === null) continue;
const key = edgeKey(hopFrom, hopTo);
const existing = edges.get(key);
if (!existing) {
edges.set(key, {
from: hopFrom,
to: hopTo,
snr: snr,
updated_at: trace.updated_at,
channel_id: trace.channel_id ?? null,
gateway_id: trace.gateway_id ?? null,
traceroute_from: trace.from, // original initiator
traceroute_to: trace.to, // original target
});
} else if (new Date(trace.updated_at) > new Date(existing.updated_at)) {
existing.snr = snr;
existing.updated_at = trace.updated_at;
existing.channel_id = trace.channel_id ?? existing.channel_id;
existing.gateway_id = trace.gateway_id ?? existing.gateway_id;
existing.traceroute_from = trace.from;
existing.traceroute_to = trace.to;
}
}
}
for (const tr of normalized) {
// Forward path
const forwardPath = [];
if (tr.to != null) forwardPath.push(Number(tr.to));
if (Array.isArray(tr.route)) {
for (const hop of tr.route) {
if (hop != null) forwardPath.push(Number(hop));
}
}
if (tr.from != null) forwardPath.push(Number(tr.from));
const forwardSnrs = Array.isArray(tr.snr_towards) ? tr.snr_towards : [];
upsertEdgesFromPath(tr, forwardPath, forwardSnrs);
// Reverse path
const reversePath = [];
if (tr.from != null) reversePath.push(Number(tr.from));
if (Array.isArray(tr.route_back)) {
for (const hop of tr.route_back) {
if (hop != null) reversePath.push(Number(hop));
}
}
if (tr.to != null) reversePath.push(Number(tr.to));
const reverseSnrs = Array.isArray(tr.snr_back) ? tr.snr_back : [];
upsertEdgesFromPath(tr, reversePath, reverseSnrs);
}
res.json({
traceroute_edges: Array.from(edges.values()),
});
} catch (err) {
console.error(err);
res.status(500).json({
message: "Something went wrong, try again later.",
});
}
});
// Aggregated edges endpoint
// GET /api/v1/connections?node_id=...&time_from=...&time_to=...
app.get('/api/v1/connections', async (req, res) => {
try {
const nodeId = req.query.node_id ? parseInt(req.query.node_id) : undefined;
const timeFrom = req.query.time_from ? parseInt(req.query.time_from) : undefined;
const timeTo = req.query.time_to ? parseInt(req.query.time_to) : undefined;
// Query edges from database
const edges = await prisma.edge.findMany({
where: {
created_at: {
...(timeFrom && { gte: new Date(timeFrom) }),
...(timeTo && { lte: new Date(timeTo) }),
},
// Only include edges where both nodes have positions
from_latitude: { not: null },
from_longitude: { not: null },
to_latitude: { not: null },
to_longitude: { not: null },
// If node_id is provided, filter edges where either from_node_id or to_node_id matches
...(nodeId !== undefined && {
OR: [
{ from_node_id: nodeId },
{ to_node_id: nodeId },
],
}),
},
orderBy: [
{ created_at: 'desc' },
{ packet_id: 'desc' },
],
});
// Collect all unique node IDs from edges
const nodeIds = new Set();
for (const edge of edges) {
nodeIds.add(edge.from_node_id);
nodeIds.add(edge.to_node_id);
}
// Fetch current positions for all nodes
const nodes = await prisma.node.findMany({
where: {
node_id: { in: Array.from(nodeIds) },
},
select: {
node_id: true,
latitude: true,
longitude: true,
},
});
// Create a map of current node positions
const nodePositions = new Map();
for (const node of nodes) {
nodePositions.set(node.node_id, {
latitude: node.latitude,
longitude: node.longitude,
});
}
// Filter edges: only include edges where both nodes are still at the same location
const validEdges = edges.filter(edge => {
const fromCurrentPos = nodePositions.get(edge.from_node_id);
const toCurrentPos = nodePositions.get(edge.to_node_id);
// Skip if either node doesn't exist or doesn't have a current position
if (!fromCurrentPos || !toCurrentPos ||
fromCurrentPos.latitude === null || fromCurrentPos.longitude === null ||
toCurrentPos.latitude === null || toCurrentPos.longitude === null) {
return false;
}
// Check if stored positions match current positions
const fromMatches = fromCurrentPos.latitude === edge.from_latitude &&
fromCurrentPos.longitude === edge.from_longitude;
const toMatches = toCurrentPos.latitude === edge.to_latitude &&
toCurrentPos.longitude === edge.to_longitude;
return fromMatches && toMatches;
});
// Normalize node pairs: always use min/max to treat A->B and B->A as same connection
const connectionsMap = new Map();
for (const edge of validEdges) {
const nodeA = edge.from_node_id < edge.to_node_id ? edge.from_node_id : edge.to_node_id;
const nodeB = edge.from_node_id < edge.to_node_id ? edge.to_node_id : edge.from_node_id;
const key = `${nodeA}-${nodeB}`;
if (!connectionsMap.has(key)) {
connectionsMap.set(key, {
node_a: nodeA,
node_b: nodeB,
direction_ab: [], // A -> B edges
direction_ba: [], // B -> A edges
});
}
const connection = connectionsMap.get(key);
const isAB = edge.from_node_id === nodeA;
// Add edge to appropriate direction
if (isAB) {
connection.direction_ab.push({
snr: edge.snr,
snr_db: edge.snr / 4, // Convert to dB
created_at: edge.created_at,
packet_id: edge.packet_id,
source: edge.source,
});
} else {
connection.direction_ba.push({
snr: edge.snr,
snr_db: edge.snr / 4,
created_at: edge.created_at,
packet_id: edge.packet_id,
source: edge.source,
});
}
}
// Aggregate each connection
const connections = Array.from(connectionsMap.values()).map(conn => {
// Deduplicate edges by packet_id for each direction (keep first occurrence, which is most recent)
const dedupeByPacketId = (edges) => {
const seen = new Set();
return edges.filter(edge => {
if (seen.has(edge.packet_id)) {
return false;
}
seen.add(edge.packet_id);
return true;
});
};
const deduplicatedAB = dedupeByPacketId(conn.direction_ab);
const deduplicatedBA = dedupeByPacketId(conn.direction_ba);
// Calculate average SNR for A->B (using deduplicated edges)
const avgSnrAB = deduplicatedAB.length > 0
? deduplicatedAB.reduce((sum, e) => sum + e.snr_db, 0) / deduplicatedAB.length
: null;
// Calculate average SNR for B->A (using deduplicated edges)
const avgSnrBA = deduplicatedBA.length > 0
? deduplicatedBA.reduce((sum, e) => sum + e.snr_db, 0) / deduplicatedBA.length
: null;
// Get last 5 edges for each direction (already sorted by created_at DESC, packet_id DESC, now deduplicated)
const last5AB = deduplicatedAB.slice(0, 5);
const last5BA = deduplicatedBA.slice(0, 5);
// Determine worst average SNR
const worstAvgSnrDb = [avgSnrAB, avgSnrBA]
.filter(v => v !== null)
.reduce((min, val) => val < min ? val : min, Infinity);
return {
node_a: conn.node_a,
node_b: conn.node_b,
direction_ab: {
avg_snr_db: avgSnrAB,
last_5_edges: last5AB,
total_count: deduplicatedAB.length, // Use deduplicated count
},
direction_ba: {
avg_snr_db: avgSnrBA,
last_5_edges: last5BA,
total_count: deduplicatedBA.length, // Use deduplicated count
},
worst_avg_snr_db: worstAvgSnrDb !== Infinity ? worstAvgSnrDb : null,
};
}).filter(conn => conn.worst_avg_snr_db !== null); // Only return connections with at least one direction
res.json({
connections: connections,
});
} catch (err) {
console.error(err);
res.status(500).json({
message: "Something went wrong, try again later.",
});
}
});
app.get('/api/v1/nodes/:nodeId/position-history', async (req, res) => {
try {
@@ -996,6 +643,42 @@ app.get('/api/v1/nodes/:nodeId/position-history', async (req, res) => {
}
});
app.get('/api/v1/stats/hardware-models', async (req, res) => {
try {
// get nodes from db
const results = await prisma.node.groupBy({
by: ['hardware_model'],
orderBy: {
_count: {
hardware_model: 'desc',
},
},
_count: {
hardware_model: true,
},
});
const hardwareModelStats = results.map((result) => {
return {
count: result._count.hardware_model,
hardware_model: result.hardware_model,
hardware_model_name: hardwareModels[result.hardware_model] ?? "UNKNOWN",
};
});
res.json({
hardware_model_stats: hardwareModelStats,
});
} catch(err) {
console.error(err);
res.status(500).json({
message: "Something went wrong, try again later.",
});
}
});
app.get('/api/v1/text-messages', async (req, res) => {
try {
@@ -1122,29 +805,8 @@ app.get('/api/v1/waypoints', async (req, res) => {
}
});
// start express server
const listener = app.listen(port, () => {
const port = listener.address().port;
console.log(`Server running at http://127.0.0.1:${port}`);
});
// Graceful shutdown handlers
function gracefulShutdown(signal) {
console.log(`Received ${signal}. Starting graceful shutdown...`);
// Stop accepting new connections
listener.close(async (err) => {
console.log('HTTP server closed');
await prisma.$disconnect();
console.log('Database connections closed');
console.log('Graceful shutdown completed');
process.exit(0);
});
}
// Handle SIGTERM (Docker, systemd, etc.)
process.on('SIGTERM', () => gracefulShutdown('SIGTERM'));
// Handle SIGINT (Ctrl+C)
process.on('SIGINT', () => gracefulShutdown('SIGINT'));

View File

@@ -0,0 +1,108 @@
{
"0": "UNSET",
"1": "TLORA_V2",
"2": "TLORA_V1",
"3": "TLORA_V2_1_1P6",
"4": "TBEAM",
"5": "HELTEC_V2_0",
"6": "TBEAM_V0P7",
"7": "T_ECHO",
"8": "TLORA_V1_1P3",
"9": "RAK4631",
"10": "HELTEC_V2_1",
"11": "HELTEC_V1",
"12": "LILYGO_TBEAM_S3_CORE",
"13": "RAK11200",
"14": "NANO_G1",
"15": "TLORA_V2_1_1P8",
"16": "TLORA_T3_S3",
"17": "NANO_G1_EXPLORER",
"18": "NANO_G2_ULTRA",
"19": "LORA_TYPE",
"20": "WIPHONE",
"21": "WIO_WM1110",
"22": "RAK2560",
"23": "HELTEC_HRU_3601",
"24": "HELTEC_WIRELESS_BRIDGE",
"25": "STATION_G1",
"26": "RAK11310",
"27": "SENSELORA_RP2040",
"28": "SENSELORA_S3",
"29": "CANARYONE",
"30": "RP2040_LORA",
"31": "STATION_G2",
"32": "LORA_RELAY_V1",
"33": "NRF52840DK",
"34": "PPR",
"35": "GENIEBLOCKS",
"36": "NRF52_UNKNOWN",
"37": "PORTDUINO",
"38": "ANDROID_SIM",
"39": "DIY_V1",
"40": "NRF52840_PCA10059",
"41": "DR_DEV",
"42": "M5STACK",
"43": "HELTEC_V3",
"44": "HELTEC_WSL_V3",
"45": "BETAFPV_2400_TX",
"46": "BETAFPV_900_NANO_TX",
"47": "RPI_PICO",
"48": "HELTEC_WIRELESS_TRACKER",
"49": "HELTEC_WIRELESS_PAPER",
"50": "T_DECK",
"51": "T_WATCH_S3",
"52": "PICOMPUTER_S3",
"53": "HELTEC_HT62",
"54": "EBYTE_ESP32_S3",
"55": "ESP32_S3_PICO",
"56": "CHATTER_2",
"57": "HELTEC_WIRELESS_PAPER_V1_0",
"58": "HELTEC_WIRELESS_TRACKER_V1_0",
"59": "UNPHONE",
"60": "TD_LORAC",
"61": "CDEBYTE_EORA_S3",
"62": "TWC_MESH_V4",
"63": "NRF52_PROMICRO_DIY",
"64": "RADIOMASTER_900_BANDIT_NANO",
"65": "HELTEC_CAPSULE_SENSOR_V3",
"66": "HELTEC_VISION_MASTER_T190",
"67": "HELTEC_VISION_MASTER_E213",
"68": "HELTEC_VISION_MASTER_E290",
"69": "HELTEC_MESH_NODE_T114",
"70": "SENSECAP_INDICATOR",
"71": "TRACKER_T1000_E",
"72": "RAK3172",
"73": "WIO_E5",
"74": "RADIOMASTER_900_BANDIT",
"75": "ME25LS01_4Y10TD",
"76": "RP2040_FEATHER_RFM95",
"77": "M5STACK_COREBASIC",
"78": "M5STACK_CORE2",
"79": "RPI_PICO2",
"80": "M5STACK_CORES3",
"81": "SEEED_XIAO_S3",
"82": "MS24SF1",
"83": "TLORA_C6",
"84": "WISMESH_TAP",
"85": "ROUTASTIC",
"86": "MESH_TAB",
"87": "MESHLINK",
"88": "XIAO_NRF52_KIT",
"89": "THINKNODE_M1",
"90": "THINKNODE_M2",
"91": "T_ETH_ELITE",
"92": "HELTEC_SENSOR_HUB",
"93": "RESERVED_FRIED_CHICKEN",
"94": "HELTEC_MESH_POCKET",
"95": "SEEED_SOLAR_NODE",
"96": "NOMADSTAR_METEOR_PRO",
"97": "CROWPANEL",
"98": "LINK_32",
"99": "SEEED_WIO_TRACKER_L1",
"100": "SEEED_WIO_TRACKER_L1_EINK",
"101": "QWANTZ_TINY_ARMS",
"102": "T_DECK_PRO",
"103": "T_LORA_PAGER",
"104": "GAT562_MESH_TRIAL_TRACKER",
"255": "PRIVATE_HW"
}

View File

@@ -0,0 +1,11 @@
{
"0": "LONG_FAST",
"1": "LONG_SLOW",
"2": "VERY_LONG_SLOW",
"3": "MEDIUM_SLOW",
"4": "MEDIUM_FAST",
"5": "SHORT_SLOW",
"6": "SHORT_FAST",
"7": "LONG_MODERATE",
"8": "SHORT_TURBO"
}

View File

@@ -0,0 +1,24 @@
{
"0": "UNSET",
"1": "US",
"2": "EU_433",
"3": "EU_868",
"4": "CN",
"5": "JP",
"6": "ANZ",
"7": "KR",
"8": "TW",
"9": "RU",
"10": "IN",
"11": "NZ_865",
"12": "TH",
"13": "LORA_24",
"14": "UA_433",
"15": "UA_868",
"16": "MY_433",
"17": "MY_919",
"18": "SG_923",
"19": "PH_433",
"20": "PH_868",
"21": "PH_915"
}

14
src/json/roles.json Normal file
View File

@@ -0,0 +1,14 @@
{
"0": "CLIENT",
"1": "CLIENT_MUTE",
"2": "ROUTER",
"3": "ROUTER_CLIENT",
"4": "REPEATER",
"5": "TRACKER",
"6": "SENSOR",
"7": "TAK",
"8": "CLIENT_HIDDEN",
"9": "LOST_AND_FOUND",
"10": "TAK_TRACKER",
"11": "ROUTER_LATE"
}

View File

@@ -1,4 +1,4 @@
require('./utils/logger');
const fs = require("fs");
const crypto = require("crypto");
const path = require("path");
const mqtt = require("mqtt");
@@ -22,6 +22,11 @@ const optionsList = [
type: Boolean,
description: 'Display this usage guide.'
},
{
name: "protobufs-path",
type: String,
description: "Path to Protobufs (e.g: ../../protobufs)",
},
{
name: "mqtt-broker-url",
type: String,
@@ -207,6 +212,7 @@ if(options.help){
}
// get options and fallback to default values
const protobufsPath = options["protobufs-path"] ?? path.join(path.dirname(__filename), "external/protobufs");
const mqttBrokerUrl = options["mqtt-broker-url"] ?? "mqtt://mqtt.meshtastic.org";
const mqttUsername = options["mqtt-username"] ?? "meshdev";
const mqttPassword = options["mqtt-password"] ?? "large4cats";
@@ -223,7 +229,6 @@ const collectNeighbourInfo = options["collect-neighbour-info"] ?? false;
const collectMapReports = options["collect-map-reports"] ?? false;
const decryptionKeys = options["decryption-keys"] ?? [
"1PG7OiApB1nwvP+rz05pAQ==", // add default "AQ==" decryption key
"PjG/mVAqnannyvqmuYAwd0LZa1AV+wkcUQlacmexEXY=", // Årsta mesh? länkad av [x/0!] divideByZero i meshen
];
const dropPacketsNotOkToMqtt = options["drop-packets-not-ok-to-mqtt"] ?? false;
const dropPortnumsWithoutBitfield = options["drop-portnums-without-bitfield"] ?? null;
@@ -242,6 +247,25 @@ const purgeTextMessagesAfterSeconds = options["purge-text-messages-after-seconds
const purgeTraceroutesAfterSeconds = options["purge-traceroutes-after-seconds"] ?? null;
const purgeWaypointsAfterSeconds = options["purge-waypoints-after-seconds"] ?? null;
// ensure protobufs exist
if(!fs.existsSync(path.join(protobufsPath, "meshtastic/mqtt.proto"))){
console.error([
"ERROR: MQTT Collector requires Meshtastic protobufs.",
"",
"This project is licensed under the MIT license to allow end users to do as they wish.",
"Unfortunately, the Meshtastic protobuf schema files are licensed under GPLv3, which means they can not be bundled in this project due to license conflicts.",
"https://github.com/liamcottle/meshtastic-map/issues/102",
"https://github.com/meshtastic/protobufs/issues/695",
"",
"If you clone and install the Meshtastic protobufs as described below, your use of those files will be subject to the GPLv3 license.",
"This does not change the license of this project being MIT. Only the parts you add from the Meshtastic project are covered under GPLv3.",
"",
"To use the MQTT Collector, please clone the Meshtastic protobufs into src/external/protobufs",
"git clone https://github.com/meshtastic/protobufs src/external/protobufs",
].join("\n"));
return;
}
// create mqtt client
const client = mqtt.connect(mqttBrokerUrl, {
username: mqttUsername,
@@ -251,7 +275,7 @@ const client = mqtt.connect(mqttBrokerUrl, {
// load protobufs
const root = new protobufjs.Root();
root.resolvePath = (origin, target) => path.join(__dirname, "protobufs", target);
root.resolvePath = (origin, target) => path.join(protobufsPath, target);
root.loadSync('meshtastic/mqtt.proto');
const Data = root.lookupType("Data");
const ServiceEnvelope = root.lookupType("ServiceEnvelope");
@@ -264,9 +288,8 @@ const User = root.lookupType("User");
const Waypoint = root.lookupType("Waypoint");
// run automatic purge if configured
let purgeInterval = null;
if(purgeIntervalSeconds){
purgeInterval = setInterval(async () => {
setInterval(async () => {
await purgeUnheardNodes();
await purgeOldDeviceMetrics();
await purgeOldEnvironmentMetrics();
@@ -742,13 +765,6 @@ client.on("message", async (topic, message) => {
}
}
// check if bitfield is available, then set ok-to-mqtt
// else leave undefined to let Prisma ignore it.
let isOkToMqtt
if(bitfield != null){
isOkToMqtt = Boolean(bitfield & BITFIELD_OK_TO_MQTT_MASK);
}
// create service envelope in db
if(collectServiceEnvelopes){
@@ -760,8 +776,6 @@ client.on("message", async (topic, message) => {
gateway_id: envelope.gatewayId ? convertHexIdToNumericId(envelope.gatewayId) : null,
to: envelope.packet.to,
from: envelope.packet.from,
portnum: portnum,
packet_id: envelope.packet.id,
protobuf: message,
},
});
@@ -826,7 +840,6 @@ client.on("message", async (topic, message) => {
rx_snr: envelope.packet.rxSnr,
rx_rssi: envelope.packet.rxRssi,
hop_limit: envelope.packet.hopLimit,
ok_to_mqtt: isOkToMqtt,
},
});
} catch (e) {
@@ -948,19 +961,6 @@ client.on("message", async (topic, message) => {
hardware_model: user.hwModel,
is_licensed: user.isLicensed === true,
role: user.role,
is_unmessagable: user.isUnmessagable,
ok_to_mqtt: isOkToMqtt,
max_hops: envelope.packet.hopStart,
channel_id: envelope.channelId,
firmware_version: '<2.5.0',
...(user.publicKey != '' && {
firmware_version: '>2.5.0',
public_key: user.publicKey?.toString("base64"),
}),
...(user.isUnmessagable != null && {
firmware_version: '>2.6.8',
}),
},
update: {
long_name: user.longName,
@@ -968,57 +968,12 @@ client.on("message", async (topic, message) => {
hardware_model: user.hwModel,
is_licensed: user.isLicensed === true,
role: user.role,
is_unmessagable: user.isUnmessagable,
ok_to_mqtt: isOkToMqtt,
max_hops: envelope.packet.hopStart,
channel_id: envelope.channelId,
firmware_version: '<2.5.0',
...(user.publicKey != '' && {
firmware_version: '>2.5.0',
public_key: user.publicKey?.toString("base64"),
}),
...(user.isUnmessagable != null && {
firmware_version: '>2.6.8',
}),
},
});
} catch (e) {
// Ignore MySQL error 1020 "Record has changed since last read" - this is a race condition
// that occurs when multiple packets arrive concurrently for the same node
const errorMessage = e.message || String(e);
if (!errorMessage.includes('Record has changed since last read')) {
console.error(e);
}
console.error(e);
}
// Keep track of the names a node has been using.
try {
await prisma.NameHistory.upsert({
where: {
node_id_long_name_short_name: {
node_id: envelope.packet.from,
long_name: user.longName,
short_name: user.shortName,
}
},
create: {
node_id: envelope.packet.from,
long_name: user.longName,
short_name: user.shortName,
},
update: {
updated_at: new Date(),
}
});
} catch (e) {
// Ignore MySQL error 1020 "Record has changed since last read" - this is a race condition
// that occurs when multiple packets arrive concurrently for the same node
const errorMessage = e.message || String(e);
if (!errorMessage.includes('Record has changed since last read')) {
console.error(e);
}
}
}
else if(portnum === 8) {
@@ -1094,70 +1049,6 @@ client.on("message", async (topic, message) => {
console.error(e);
}
// Extract edges from neighbour info
try {
const toNodeId = envelope.packet.from;
const neighbors = neighbourInfo.neighbors || [];
const packetId = envelope.packet.id;
const channelId = envelope.channelId;
const gatewayId = envelope.gatewayId ? convertHexIdToNumericId(envelope.gatewayId) : null;
const edgesToCreate = [];
for(const neighbour of neighbors) {
// Skip if no node ID
if(!neighbour.nodeId) {
continue;
}
// Skip if SNR is invalid (0 or null/undefined)
// Note: SNR can be negative, so we check for 0 specifically
if(neighbour.snr === 0 || neighbour.snr == null) {
continue;
}
const fromNodeId = neighbour.nodeId;
const snr = neighbour.snr;
// Fetch node positions from Node table
const [fromNode, toNode] = await Promise.all([
prisma.node.findUnique({
where: { node_id: fromNodeId },
select: { latitude: true, longitude: true },
}),
prisma.node.findUnique({
where: { node_id: toNodeId },
select: { latitude: true, longitude: true },
}),
]);
// Create edge record
edgesToCreate.push({
from_node_id: fromNodeId,
to_node_id: toNodeId,
snr: snr,
from_latitude: fromNode?.latitude ?? null,
from_longitude: fromNode?.longitude ?? null,
to_latitude: toNode?.latitude ?? null,
to_longitude: toNode?.longitude ?? null,
packet_id: packetId,
channel_id: channelId,
gateway_id: gatewayId,
source: "NEIGHBORINFO_APP",
});
}
// Bulk insert edges
if(edgesToCreate.length > 0) {
await prisma.edge.createMany({
data: edgesToCreate,
skipDuplicates: true, // Skip if exact duplicate exists
});
}
} catch (e) {
// Log error but don't crash - edge extraction is non-critical
console.error("Error extracting edges from neighbour info:", e);
}
// don't store all neighbour infos, but we want to update the existing node above
if(!collectNeighbourInfo){
return;
@@ -1400,160 +1291,6 @@ client.on("message", async (topic, message) => {
console.error(e);
}
// Extract edges from traceroute (only for response packets)
if(!envelope.packet.decoded.wantResponse) {
try {
const route = routeDiscovery.route || [];
const snrTowards = routeDiscovery.snrTowards || [];
const originNodeId = envelope.packet.to;
const destinationNodeId = envelope.packet.from;
const packetId = envelope.packet.id;
const channelId = envelope.channelId;
const gatewayId = envelope.gatewayId ? convertHexIdToNumericId(envelope.gatewayId) : null;
// Determine number of edges: route.length + 1
const numEdges = route.length + 1;
const edgesToCreate = [];
// Extract edges from the route path
for(let i = 0; i < numEdges; i++) {
// Get SNR for this edge
if(i >= snrTowards.length) {
// Array length mismatch - skip this edge
continue;
}
const snr = snrTowards[i];
// Skip if SNR is -128 (no SNR recorded)
if(snr === -128) {
continue;
}
// Determine from_node and to_node
let fromNodeId, toNodeId;
if(route.length === 0) {
// Empty route: direct connection (to -> from)
fromNodeId = originNodeId;
toNodeId = destinationNodeId;
} else if(i === 0) {
// First edge: origin -> route[0]
fromNodeId = originNodeId;
toNodeId = route[0];
} else if(i === route.length) {
// Last edge: route[route.length-1] -> destination
fromNodeId = route[route.length - 1];
toNodeId = destinationNodeId;
} else {
// Middle edge: route[i-1] -> route[i]
fromNodeId = route[i - 1];
toNodeId = route[i];
}
// Fetch node positions from Node table
const [fromNode, toNode] = await Promise.all([
prisma.node.findUnique({
where: { node_id: fromNodeId },
select: { latitude: true, longitude: true },
}),
prisma.node.findUnique({
where: { node_id: toNodeId },
select: { latitude: true, longitude: true },
}),
]);
// Create edge record (skip if nodes don't exist, but still create edge with null positions)
edgesToCreate.push({
from_node_id: fromNodeId,
to_node_id: toNodeId,
snr: snr,
from_latitude: fromNode?.latitude ?? null,
from_longitude: fromNode?.longitude ?? null,
to_latitude: toNode?.latitude ?? null,
to_longitude: toNode?.longitude ?? null,
packet_id: packetId,
channel_id: channelId,
gateway_id: gatewayId,
source: "TRACEROUTE_APP",
});
}
// Extract edges from route_back path
const routeBack = routeDiscovery.routeBack || [];
const snrBack = routeDiscovery.snrBack || [];
if(routeBack.length > 0) {
// Number of edges in route_back equals route_back.length
for(let i = 0; i < routeBack.length; i++) {
// Get SNR for this edge
if(i >= snrBack.length) {
// Array length mismatch - skip this edge
continue;
}
const snr = snrBack[i];
// Skip if SNR is -128 (no SNR recorded)
if(snr === -128) {
continue;
}
// Determine from_node and to_node
let fromNodeId, toNodeId;
if(i === 0) {
// First edge: from -> route_back[0]
fromNodeId = destinationNodeId; // 'from' in the packet
toNodeId = routeBack[0];
} else {
// Subsequent edges: route_back[i-1] -> route_back[i]
fromNodeId = routeBack[i - 1];
toNodeId = routeBack[i];
}
// Fetch node positions from Node table
const [fromNode, toNode] = await Promise.all([
prisma.node.findUnique({
where: { node_id: fromNodeId },
select: { latitude: true, longitude: true },
}),
prisma.node.findUnique({
where: { node_id: toNodeId },
select: { latitude: true, longitude: true },
}),
]);
// Create edge record
edgesToCreate.push({
from_node_id: fromNodeId,
to_node_id: toNodeId,
snr: snr,
from_latitude: fromNode?.latitude ?? null,
from_longitude: fromNode?.longitude ?? null,
to_latitude: toNode?.latitude ?? null,
to_longitude: toNode?.longitude ?? null,
packet_id: packetId,
channel_id: channelId,
gateway_id: gatewayId,
source: "TRACEROUTE_APP",
});
}
}
// Bulk insert edges
if(edgesToCreate.length > 0) {
await prisma.edge.createMany({
data: edgesToCreate,
skipDuplicates: true, // Skip if exact duplicate exists
});
}
} catch (e) {
// Log error but don't crash - edge extraction is non-critical
console.error("Error extracting edges from traceroute:", e);
}
}
}
else if(portnum === 73) {
@@ -1658,7 +1395,6 @@ client.on("message", async (topic, message) => {
|| portnum === 0 // ignore UNKNOWN_APP
|| portnum === 1 // ignore TEXT_MESSAGE_APP
|| portnum === 5 // ignore ROUTING_APP
|| portnum === 6 // ignore ADMIN_APP
|| portnum === 34 // ignore PAXCOUNTER_APP
|| portnum === 65 // ignore STORE_FORWARD_APP
|| portnum === 66 // ignore RANGE_TEST_APP
@@ -1675,32 +1411,6 @@ client.on("message", async (topic, message) => {
}
} catch(e) {
console.log("error", e);
// ignore errors
}
});
// Graceful shutdown handlers
function gracefulShutdown(signal) {
    console.log(`Received ${signal}. Starting graceful shutdown...`);
    // Clear the purge interval if it exists
    // NOTE(review): this revision removed the `purgeInterval = setInterval(...)`
    // assignment, so purgeInterval may always be null here — confirm the purge
    // timer handle is still stored, otherwise the interval is never cleared.
    if(purgeInterval) {
        clearInterval(purgeInterval);
        console.log('Purge interval cleared');
    }
    // Close MQTT client (force=false lets in-flight messages finish),
    // then release the database connection pool before exiting.
    client.end(false, async () => {
        console.log('MQTT client disconnected');
        await prisma.$disconnect();
        console.log('Database connections closed');
        console.log('Graceful shutdown completed');
        process.exit(0);
    });
}
// Handle SIGTERM (Docker, systemd, etc.)
process.on('SIGTERM', () => gracefulShutdown('SIGTERM'));
// Handle SIGINT (Ctrl+C)
process.on('SIGINT', () => gracefulShutdown('SIGINT'));

Submodule src/protobufs deleted from c2e45a3fc9

Binary file not shown.

Before

Width:  |  Height:  |  Size: 4.4 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 45 KiB

After

Width:  |  Height:  |  Size: 48 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 5.4 KiB

After

Width:  |  Height:  |  Size: 189 KiB

File diff suppressed because it is too large Load Diff

View File

@@ -1,244 +0,0 @@
const path = require('path');
const express = require('express');
const router = express.Router();
const protobufjs = require("protobufjs");
// create prisma db client
const { Prisma, PrismaClient } = require("@prisma/client");
const prisma = new PrismaClient();
// load protobufs
const root = new protobufjs.Root();
// resolve protobuf imports relative to the local "protobufs" directory
root.resolvePath = (origin, target) => path.join(__dirname, "protobufs", target);
root.loadSync('meshtastic/mqtt.proto');
// enum lookups used to translate numeric ids into readable names in responses
const HardwareModel = root.lookupEnum("HardwareModel");
const PortNum = root.lookupEnum("PortNum");
// GET /hardware-models
// Returns node counts grouped by hardware model, most common first.
router.get('/hardware-models', async (req, res) => {
    try {
        // get nodes from db
        const results = await prisma.node.groupBy({
            by: ['hardware_model'],
            where: {
                // Since we removed retention; only include nodes that have been updated in the last 30 days
                updated_at: {
                    gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) // within last 30 days
                }
            },
            orderBy: {
                _count: {
                    hardware_model: 'desc',
                },
            },
            _count: {
                hardware_model: true,
            },
        });
        // map numeric enum ids to readable names via the protobuf HardwareModel enum
        const hardwareModelStats = results.map((result) => {
            return {
                count: result._count.hardware_model,
                hardware_model: result.hardware_model,
                hardware_model_name: HardwareModel.valuesById[result.hardware_model] ?? "UNKNOWN",
            };
        });
        res.json({
            hardware_model_stats: hardwareModelStats,
        });
    } catch(err) {
        console.error(err);
        res.status(500).json({
            message: "Something went wrong, try again later.",
        });
    }
});
// GET /messages-per-hour
// Returns a 168-entry (7 day) series of unique text-message counts per hour,
// as [{ hour, count }] in chronological order.
router.get('/messages-per-hour', async (req, res) => {
    try {
        const hours = 168; // look back 7 days
        const now = new Date();
        const startTime = new Date(now.getTime() - hours * 60 * 60 * 1000);
        const messages = await prisma.textMessage.findMany({
            where: { created_at: { gte: startTime } },
            select: { packet_id: true, created_at: true },
            distinct: ['packet_id'], // Ensures only unique packet_id entries are counted
            orderBy: { created_at: 'asc' }
        });
        // Pre-fill `uniqueCounts` with zeros for all hours, including the current hour
        const uniqueCounts = Object.fromEntries(
            Array.from({ length: hours }, (_, i) => {
                const hourTime = new Date(now.getTime() - (hours - 1 - i) * 60 * 60 * 1000);
                const hourString = hourTime.toISOString().slice(0, 13) + ":00:00.000Z"; // zero out the minutes and seconds
                return [hourString, 0];
            })
        );
        // Populate actual message counts
        messages.forEach(({ created_at }) => {
            const hourString = created_at.toISOString().slice(0, 13) + ":00:00.000Z"; // zero out the minutes and seconds
            uniqueCounts[hourString] = (uniqueCounts[hourString] ?? 0) + 1;
        });
        // Convert to final result format
        const result = Object.entries(uniqueCounts).map(([hour, count]) => ({ hour, count }));
        res.json(result);
    } catch (error) {
        console.error('Error fetching messages:', error);
        res.status(500).json({ error: 'Internal Server Error' });
    }
});
// GET /most-active-nodes
// Top 25 nodes by unique packet count over the last day, optionally filtered
// by ?channel_id. Response is marked cacheable for 10 minutes.
router.get('/most-active-nodes', async (req, res) => {
    try {
        const channelId = req.query.channel_id;
        // raw SQL: count distinct (from, packet_id) pairs per node;
        // excludes portnum 73 and packets addressed to node 1
        const result = await prisma.$queryRaw(
            Prisma.sql`
                SELECT n.long_name, COUNT(*) AS count
                FROM (
                    SELECT DISTINCT \`from\`, packet_id
                    FROM service_envelopes
                    WHERE
                        created_at >= NOW() - INTERVAL 1 DAY
                        AND packet_id IS NOT NULL
                        AND portnum != 73
                        AND \`to\` != 1
                        ${channelId ? Prisma.sql`AND channel_id = ${channelId}` : Prisma.sql``}
                ) AS unique_packets
                JOIN nodes n ON unique_packets.from = n.node_id
                GROUP BY n.long_name
                ORDER BY count DESC
                LIMIT 25;
            `
        );
        res.set('Cache-Control', 'public, max-age=600'); // 10 min cache
        res.json(result);
    } catch (error) {
        console.error('Error fetching data:', error);
        res.status(500).json({ error: 'Internal Server Error' });
    }
});
// GET /portnum-counts
// Counts unique packets per portnum over the last 24 hours, optionally
// filtered by ?nodeId and/or ?channel_id. Packets on channel "PKI" are
// reported under the synthetic portnum 512 with label "PKI".
router.get('/portnum-counts', async (req, res) => {
    const nodeId = req.query.nodeId ? parseInt(req.query.nodeId, 10) : null;
    const channelId = req.query.channel_id;
    const hours = 24;
    const now = new Date();
    const startTime = new Date(now.getTime() - hours * 60 * 60 * 1000);
    try {
        const envelopes = await prisma.serviceEnvelope.findMany({
            where: {
                created_at: { gte: startTime },
                ...(Number.isInteger(nodeId) ? { from: nodeId } : {}),
                ...(channelId ? { channel_id: channelId } : {}),
                packet_id: { not: null },
                to: { not: 1 }, // Filter out NODENUM_BROADCAST_NO_LORA
                OR: [
                    { portnum: { not: 73 } }, // Exclude portnum 73 (e.g. map reports)
                    { portnum: null } // But include PKI packages, they have no portnum
                ]
            },
            select: {from: true, packet_id: true, portnum: true, channel_id: true}
        });
        // Ensure uniqueness based on (from, packet_id)
        const seen = new Set();
        const counts = {};
        for (const envelope of envelopes) {
            const uniqueKey = `${envelope.from}-${envelope.packet_id}`;
            if (seen.has(uniqueKey)) continue;
            seen.add(uniqueKey);
            // Override portnum to 512 if channel_id is "PKI"
            const portnum = envelope.channel_id === "PKI" ? 512 : (envelope.portnum ?? 0);
            counts[portnum] = (counts[portnum] || 0) + 1;
        }
        // resolve human-readable labels via the protobuf PortNum enum
        const result = Object.entries(counts).map(([portnum, count]) => ({
            portnum: parseInt(portnum, 10),
            count: count,
            label: parseInt(portnum, 10) === 512 ? "PKI" : (PortNum.valuesById[portnum] ?? "UNKNOWN"),
        })).sort((a, b) => a.portnum - b.portnum);
        res.json(result);
    } catch (err) {
        console.error("Error in /portnum-counts:", err);
        res.status(500).json({ message: "Internal server error" });
    }
});
// GET /battery-stats
// Returns recorded average battery level samples for the last ?days days
// (default 1), newest first.
router.get('/battery-stats', async (req, res) => {
    // Sanitise the lookback window: non-numeric or non-positive input would
    // otherwise feed NaN into `INTERVAL ${days} DAY` and fail the query.
    const parsedDays = Number.parseInt(req.query.days || '1', 10);
    const days = Number.isInteger(parsedDays) && parsedDays > 0 ? parsedDays : 1;
    try {
        // tagged template: ${days} is sent as a bound parameter, not interpolated
        const stats = await prisma.$queryRaw`
            SELECT id, recorded_at, avg_battery_level
            FROM battery_stats
            WHERE recorded_at >= NOW() - INTERVAL ${days} DAY
            ORDER BY recorded_at DESC;
        `;
        res.json(stats);
    } catch (err) {
        console.error('Error fetching battery stats:', err);
        res.status(500).json({ error: 'Internal server error' });
    }
});
// GET /channel-utilization-stats
// Returns channel utilisation samples for the last ?days days (default 1),
// optionally filtered by ?channel_id, newest first.
router.get('/channel-utilization-stats', async (req, res) => {
    // Sanitise the lookback window: non-numeric or non-positive input would
    // otherwise feed NaN into `INTERVAL ${days} DAY` and fail the query.
    const parsedDays = Number.parseInt(req.query.days || '1', 10);
    const days = Number.isInteger(parsedDays) && parsedDays > 0 ? parsedDays : 1;
    const channelId = req.query.channel_id; // optional string
    try {
        const stats = await prisma.$queryRaw(
            Prisma.sql`
                SELECT recorded_at, channel_id, avg_channel_utilization
                FROM channel_utilization_stats
                WHERE recorded_at >= NOW() - INTERVAL ${days} DAY
                ${channelId ? Prisma.sql`AND channel_id = ${channelId}` : Prisma.sql``}
                ORDER BY recorded_at DESC;
            `
        );
        res.json(stats);
    } catch (err) {
        console.error('Error fetching channel utilization stats:', err);
        res.status(500).json({ error: 'Internal server error' });
    }
});
// GET /channel-utilization
// Returns the most recent utilisation snapshot (rows at the latest
// recorded_at), optionally filtered by ?channel_id.
router.get('/channel-utilization', async (req, res) => {
    const channelId = req.query.channel_id;
    try {
        const snapshot = await prisma.$queryRaw(
            Prisma.sql`
                SELECT recorded_at, channel_id, avg_channel_utilization
                FROM channel_utilization_stats
                WHERE recorded_at = (
                    SELECT MAX(recorded_at) FROM channel_utilization_stats
                )
                ${channelId ? Prisma.sql`AND channel_id = ${channelId}` : Prisma.sql``}
                ORDER BY channel_id;
            `
        );
        res.json(snapshot);
    } catch (err) {
        console.error('Error fetching latest channel utilization:', err);
        res.status(500).json({ error: 'Internal server error' });
    }
});
module.exports = router;

View File

@@ -1,33 +0,0 @@
// Override console methods to add formatted timestamps
const originalLog = console.log;
const originalError = console.error;
const originalWarn = console.warn;
const originalInfo = console.info;
// Build a "YYYY-MM-DD HH:MM:SS" timestamp string for the current local time.
function formatTimestamp() {
    const pad = (value) => String(value).padStart(2, '0');
    const t = new Date();
    const datePart = `${t.getFullYear()}-${pad(t.getMonth() + 1)}-${pad(t.getDate())}`;
    const timePart = `${pad(t.getHours())}:${pad(t.getMinutes())}:${pad(t.getSeconds())}`;
    return `${datePart} ${timePart}`;
}
// Wrap each console method so output is prefixed with a timestamp and a level tag.
console.log = function(...args) {
    originalLog(`${formatTimestamp()} [Info]`, ...args);
};
console.error = function(...args) {
    originalError(`${formatTimestamp()} [Error]`, ...args);
};
console.warn = function(...args) {
    originalWarn(`${formatTimestamp()} [Warn]`, ...args);
};
// NOTE(review): console.info shares the "[Info]" tag with console.log — confirm intended.
console.info = function(...args) {
    originalInfo(`${formatTimestamp()} [Info]`, ...args);
};

314
src/ws.js
View File

@@ -1,314 +0,0 @@
require('./utils/logger');
const crypto = require("crypto");
const path = require("path");
const http = require("http");
const mqtt = require("mqtt");
const protobufjs = require("protobufjs");
const commandLineArgs = require("command-line-args");
const commandLineUsage = require("command-line-usage");
const { WebSocketServer } = require("ws");
const optionsList = [
{
name: 'help',
alias: 'h',
type: Boolean,
description: 'Display this usage guide.'
},
{
name: "mqtt-broker-url",
type: String,
description: "MQTT Broker URL (e.g: mqtt://mqtt.meshtastic.org)",
},
{
name: "mqtt-username",
type: String,
description: "MQTT Username (e.g: meshdev)",
},
{
name: "mqtt-password",
type: String,
description: "MQTT Password (e.g: large4cats)",
},
{
name: "mqtt-client-id",
type: String,
description: "MQTT Client ID (e.g: map.example.com)",
},
{
name: "mqtt-topic",
type: String,
multiple: true,
typeLabel: '<topic> ...',
description: "MQTT Topic to subscribe to (e.g: msh/#)",
},
{
name: "decryption-keys",
type: String,
multiple: true,
typeLabel: '<base64DecryptionKey> ...',
description: "Decryption keys encoded in base64 to use when decrypting service envelopes.",
},
{
name: "ws-port",
type: Number,
description: "WebSocket server port (default: 8081)",
},
];
// parse command line args
const options = commandLineArgs(optionsList);
// show help
if(options.help){
    const usage = commandLineUsage([
        {
            header: 'Meshtastic WebSocket Publisher',
            content: 'Publishes real-time Meshtastic packets via WebSocket.',
        },
        {
            header: 'Options',
            optionList: optionsList,
        },
    ]);
    console.log(usage);
    process.exit(0);
}
// get options and fallback to default values
const mqttBrokerUrl = options["mqtt-broker-url"] ?? "mqtt://mqtt.meshtastic.org";
const mqttUsername = options["mqtt-username"] ?? "meshdev";
const mqttPassword = options["mqtt-password"] ?? "large4cats";
const mqttClientId = options["mqtt-client-id"] ?? null;
const mqttTopics = options["mqtt-topic"] ?? ["msh/#"];
const decryptionKeys = options["decryption-keys"] ?? [
    "1PG7OiApB1nwvP+rz05pAQ==", // add default "AQ==" decryption key
];
const wsPort = options["ws-port"] ?? 8081;
// create mqtt client
const client = mqtt.connect(mqttBrokerUrl, {
    username: mqttUsername,
    password: mqttPassword,
    clientId: mqttClientId,
});
// load protobufs, resolving imports relative to the local "protobufs" directory
const root = new protobufjs.Root();
root.resolvePath = (origin, target) => path.join(__dirname, "protobufs", target);
root.loadSync('meshtastic/mqtt.proto');
const Data = root.lookupType("Data");
const ServiceEnvelope = root.lookupType("ServiceEnvelope");
const RouteDiscovery = root.lookupType("RouteDiscovery");
// create HTTP server for WebSocket
const server = http.createServer();
const wss = new WebSocketServer({ server });
// track connected clients
const clients = new Set();
wss.on('connection', (ws) => {
    // register the socket and log the new client count
    clients.add(ws);
    console.log(`WebSocket client connected. Total clients: ${clients.size}`);
    ws.on('close', () => {
        clients.delete(ws);
        console.log(`WebSocket client disconnected. Total clients: ${clients.size}`);
    });
    ws.on('error', (error) => {
        // drop errored sockets so broadcast() stops trying to use them
        console.error('WebSocket error:', error);
        clients.delete(ws);
    });
});
// broadcast message to all connected clients
function broadcast(message) {
    // serialise once, then fan the same payload out to every open socket
    const payload = JSON.stringify(message);
    for(const ws of clients){
        // 1 === WebSocket.OPEN; skip sockets that are connecting or closing
        if(ws.readyState !== 1){
            continue;
        }
        try {
            ws.send(payload);
        } catch (error) {
            console.error('Error sending message to client:', error);
        }
    }
}
// Build the 16-byte AES-CTR nonce for a packet: the packet id widened to
// 64 bits (little-endian), followed by the sender node id, followed by a
// 32-bit block counter that always starts at zero.
function createNonce(packetId, fromNode) {
    const nonce = Buffer.alloc(16);
    nonce.writeBigUInt64LE(BigInt(packetId), 0); // bytes 0-7: packet id
    nonce.writeUInt32LE(fromNode, 8);            // bytes 8-11: sender node id
    nonce.writeUInt32LE(0, 12);                  // bytes 12-15: block counter
    return nonce;
}
/**
 * Attempts to decrypt an encrypted mesh packet payload by trying every
 * configured decryption key in order. Returns the first successful decode.
 *
 * References:
 * https://github.com/crypto-smoke/meshtastic-go/blob/develop/radio/aes.go#L42
 * https://github.com/pdxlocations/Meshtastic-MQTT-Connect/blob/main/meshtastic-mqtt-connect.py#L381
 *
 * @param packet a mesh packet with `id`, `from` and `encrypted` fields
 * @returns the decoded Data message, or null if no key could decrypt it
 */
function decrypt(packet) {
    // attempt to decrypt with all available decryption keys
    for(const decryptionKey of decryptionKeys){
        try {
            // convert encryption key to buffer
            const key = Buffer.from(decryptionKey, "base64");
            // create decryption iv/nonce for this packet
            const nonceBuffer = createNonce(packet.id, packet.from);
            // determine algorithm based on key length
            var algorithm = null;
            if(key.length === 16){
                algorithm = "aes-128-ctr";
            } else if(key.length === 32){
                algorithm = "aes-256-ctr";
            } else {
                // skip this key, try the next one...
                console.error(`Skipping decryption key with invalid length: ${key.length}`);
                continue;
            }
            // create decipher
            const decipher = crypto.createDecipheriv(algorithm, key, nonceBuffer);
            // decrypt encrypted packet
            const decryptedBuffer = Buffer.concat([decipher.update(packet.encrypted), decipher.final()]);
            // parse as data message
            return Data.decode(decryptedBuffer);
        } catch(e){} // deliberate best-effort: a failed key simply falls through to the next
    }
    // couldn't decrypt
    return null;
}
/**
 * Converts a hex node id to its numeric form, for example: !FFFFFFFF to 4294967295.
 * @param hexId a node id in hex format with a prepended "!"
 * @returns {bigint} the node id in numeric form
 */
function convertHexIdToNumericId(hexId) {
    // strip every "!" marker, then parse the remaining hex digits
    const hexDigits = hexId.split("!").join("");
    return BigInt(`0x${hexDigits}`);
}
// subscribe to everything when connected
client.on("connect", () => {
    console.log("Connected to MQTT broker");
    // subscribe to each configured topic (default: msh/#)
    for(const mqttTopic of mqttTopics){
        client.subscribe(mqttTopic);
        console.log(`Subscribed to MQTT topic: ${mqttTopic}`);
    }
});
// handle message received: decode, decrypt if needed, and broadcast
// traceroute packets (portnum 70) to all connected websocket clients
client.on("message", async (topic, message) => {
    try {
        // decode service envelope
        const envelope = ServiceEnvelope.decode(message);
        if(!envelope.packet){
            return;
        }
        // attempt to decrypt encrypted packets
        const isEncrypted = envelope.packet.encrypted?.length > 0;
        if(isEncrypted){
            const decoded = decrypt(envelope.packet);
            if(decoded){
                envelope.packet.decoded = decoded;
            }
        }
        // get portnum from decoded packet
        const portnum = envelope.packet?.decoded?.portnum;
        // check if we can see the decrypted packet data
        if(envelope.packet.decoded == null){
            return;
        }
        // handle traceroutes (portnum 70)
        if(portnum === 70) {
            try {
                const routeDiscovery = RouteDiscovery.decode(envelope.packet.decoded.payload);
                const traceroute = {
                    type: "traceroute",
                    data: {
                        to: envelope.packet.to,
                        from: envelope.packet.from,
                        want_response: envelope.packet.decoded.wantResponse,
                        route: routeDiscovery.route,
                        snr_towards: routeDiscovery.snrTowards,
                        route_back: routeDiscovery.routeBack,
                        snr_back: routeDiscovery.snrBack,
                        channel_id: envelope.channelId,
                        // BigInt converted to Number so the payload is JSON-serialisable
                        gateway_id: envelope.gatewayId ? Number(convertHexIdToNumericId(envelope.gatewayId)) : null,
                        packet_id: envelope.packet.id,
                    }
                };
                broadcast(traceroute);
            } catch (e) {
                console.error("Error processing traceroute:", e);
            }
        }
    } catch(e) {
        console.error("Error processing MQTT message:", e);
    }
});
// start WebSocket server
server.listen(wsPort, () => {
    console.log(`WebSocket server running on port ${wsPort}`);
});
// Graceful shutdown handlers
function gracefulShutdown(signal) {
    console.log(`Received ${signal}. Starting graceful shutdown...`);
    // Close all WebSocket connections
    clients.forEach((client) => {
        client.close();
    });
    clients.clear();
    // Close WebSocket server
    wss.close(() => {
        console.log('WebSocket server closed');
    });
    // Close HTTP server
    server.close(() => {
        console.log('HTTP server closed');
    });
    // Close MQTT client (force=false lets in-flight messages finish);
    // the process exits only once the client has disconnected
    client.end(false, () => {
        console.log('MQTT client disconnected');
        console.log('Graceful shutdown completed');
        process.exit(0);
    });
}
// Handle SIGTERM (Docker, systemd, etc.)
process.on('SIGTERM', () => gracefulShutdown('SIGTERM'));
// Handle SIGINT (Ctrl+C)
process.on('SIGINT', () => gracefulShutdown('SIGINT'));