Send mesh data to Potatomesh API (#60)

* feat: post mesh data to API

* Serialize node objects before posting

* don't put raw json in api/db
This commit is contained in:
l5y
2025-09-15 14:00:48 +02:00
committed by GitHub
parent 6d948603c9
commit b39b83fb51
3 changed files with 68 additions and 108 deletions

View File

@@ -26,7 +26,7 @@ what works:
requires a meshtastic node connected (via serial) to gather mesh data and the meshtastic cli.
requires the meshtastic python api for the database.
requires the meshtastic python api for communicating with the device.
```bash
python -m venv .venv
@@ -41,14 +41,16 @@ gem install bundler
bundle install
```
### database
### ingest
uses python meshtastic library to ingest mesh data into an sqlite3 database locally
uses the meshtastic python library to ingest mesh data and post nodes and messages
to the configured potatomesh instance.
run `mesh.sh` in `data/` to keep updating node records and parsing new incoming messages.
run `mesh.sh` in `data/` with `POTATOMESH_INSTANCE` and `API_TOKEN` to keep updating
node records and parsing new incoming messages.
```bash
MESH_SERIAL=/dev/ttyACM0 DEBUG=1 ./mesh.sh
POTATOMESH_INSTANCE=https://potatomesh.net API_TOKEN=TOKEN MESH_SERIAL=/dev/ttyACM0 DEBUG=1 ./mesh.sh
[...]
[debug] upserted node !849b7154 shortName='7154'
[debug] upserted node !ba653ae8 shortName='3ae8'
@@ -56,7 +58,8 @@ MESH_SERIAL=/dev/ttyACM0 DEBUG=1 ./mesh.sh
[debug] stored message from '!9ee71c38' to '^all' ch=0 text='Guten Morgen!'
```
enable debug output with `DEBUG=1`, specify the serial port with `MESH_SERIAL` (default `/dev/ttyACM0`).
enable debug output with `DEBUG=1`, specify the serial port with `MESH_SERIAL`
(default `/dev/ttyACM0`).
### web app

View File

@@ -1,29 +1,18 @@
#!/usr/bin/env python3
import json, os, sqlite3, time, threading, signal
from pathlib import Path
import json, os, time, threading, signal, urllib.request, urllib.error
from meshtastic.serial_interface import SerialInterface
from meshtastic.mesh_interface import MeshInterface
from pubsub import pub
from google.protobuf.json_format import MessageToDict
from google.protobuf.message import Message as ProtoMessage
# --- Config (env overrides) ---------------------------------------------------
DB = os.environ.get("MESH_DB", "mesh.db")
PORT = os.environ.get("MESH_SERIAL", "/dev/ttyACM0")
SNAPSHOT_SECS = int(os.environ.get("MESH_SNAPSHOT_SECS", "30"))
CHANNEL_INDEX = int(os.environ.get("MESH_CHANNEL_INDEX", "0"))
DEBUG = os.environ.get("DEBUG") == "1"
# --- DB setup -----------------------------------------------------------------
nodeSchema = Path(__file__).with_name("nodes.sql").read_text()
conn = sqlite3.connect(DB, check_same_thread=False)
conn.executescript(nodeSchema)
msgSchema = Path(__file__).with_name("messages.sql").read_text()
conn.executescript(msgSchema)
conn.commit()
DB_LOCK = threading.Lock()
INSTANCE = os.environ.get("POTATOMESH_INSTANCE", "").rstrip("/")
API_TOKEN = os.environ.get("API_TOKEN", "")
def _get(obj, key, default=None):
@@ -33,66 +22,44 @@ def _get(obj, key, default=None):
return getattr(obj, key, default)
# --- HTTP helpers -------------------------------------------------------------
def _post_json(path: str, payload: dict):
if not INSTANCE:
return
url = f"{INSTANCE}{path}"
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"})
if API_TOKEN:
req.add_header("Authorization", f"Bearer {API_TOKEN}")
try:
with urllib.request.urlopen(req, timeout=10) as resp:
resp.read()
except Exception as e:
if DEBUG:
print(f"[warn] POST {url} failed: {e}")
# --- Node upsert --------------------------------------------------------------
def upsert_node(node_id, n):
user = _get(n, "user") or {}
met = _get(n, "deviceMetrics") or {}
pos = _get(n, "position") or {}
lh = _get(n, "lastHeard")
pt = _get(pos, "time")
now = int(time.time())
if pt is not None and pt > now:
pt = None
if lh is not None and lh > now:
lh = now
if pt is not None and (lh is None or lh < pt):
lh = pt
row = (
node_id,
_get(n, "num"),
_get(user, "shortName"),
_get(user, "longName"),
_get(user, "macaddr"),
_get(user, "hwModel") or _get(n, "hwModel"),
_get(user, "role"),
_get(user, "publicKey"),
_get(user, "isUnmessagable"),
_get(n, "isFavorite"),
_get(n, "hopsAway"),
_get(n, "snr"),
lh,
lh,
_get(met, "batteryLevel"),
_get(met, "voltage"),
_get(met, "channelUtilization"),
_get(met, "airUtilTx"),
_get(met, "uptimeSeconds"),
pt,
_get(pos, "locationSource"),
_get(pos, "latitude"),
_get(pos, "longitude"),
_get(pos, "altitude"),
)
with DB_LOCK:
conn.execute(
"""
INSERT INTO nodes(node_id,num,short_name,long_name,macaddr,hw_model,role,public_key,is_unmessagable,is_favorite,
hops_away,snr,last_heard,first_heard,battery_level,voltage,channel_utilization,air_util_tx,uptime_seconds,
position_time,location_source,latitude,longitude,altitude)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
ON CONFLICT(node_id) DO UPDATE SET
num=excluded.num, short_name=excluded.short_name, long_name=excluded.long_name, macaddr=excluded.macaddr,
hw_model=excluded.hw_model, role=excluded.role, public_key=excluded.public_key, is_unmessagable=excluded.is_unmessagable,
is_favorite=excluded.is_favorite, hops_away=excluded.hops_away, snr=excluded.snr, last_heard=excluded.last_heard,
battery_level=excluded.battery_level, voltage=excluded.voltage, channel_utilization=excluded.channel_utilization,
air_util_tx=excluded.air_util_tx, uptime_seconds=excluded.uptime_seconds, position_time=excluded.position_time,
location_source=excluded.location_source, latitude=excluded.latitude, longitude=excluded.longitude,
altitude=excluded.altitude
""",
row,
def _node_to_dict(n) -> dict:
"""Convert Meshtastic node/user objects into plain dicts."""
if isinstance(n, dict):
return n
if isinstance(n, ProtoMessage):
return MessageToDict(
n, preserving_proto_field_name=True, use_integers_for_enums=False
)
try:
return json.loads(json.dumps(n, default=lambda o: getattr(o, "__dict__", str(o))))
except Exception:
return {"_unparsed": str(n)}
def upsert_node(node_id, n):
ndict = _node_to_dict(n)
_post_json("/api/nodes", {node_id: ndict})
if DEBUG:
user = _get(ndict, "user") or {}
short = _get(user, "shortName")
print(f"[debug] upserted node {node_id} shortName={short!r}")
@@ -142,7 +109,7 @@ def _pkt_to_dict(packet) -> dict:
def store_packet_dict(p: dict):
"""
Store only TEXT messages (decoded.payload.text) to the DB.
Store only TEXT messages (decoded.payload.text) by posting to the API.
Safe against snake/camel case differences.
"""
dec = p.get("decoded") or {}
@@ -178,27 +145,20 @@ def store_packet_dict(p: dict):
rssi = _first(p, "rssi", "rx_rssi", "rxRssi", default=None)
hop = _first(p, "hopLimit", "hop_limit", default=None)
row = (
int(pkt_id),
rx_time,
_iso(rx_time),
from_id,
to_id,
ch,
str(portnum) if portnum is not None else None,
text,
float(snr) if snr is not None else None,
int(rssi) if rssi is not None else None,
int(hop) if hop is not None else None,
)
with DB_LOCK:
conn.execute(
"""INSERT OR IGNORE INTO messages
(id, rx_time, rx_iso, from_id, to_id, channel, portnum, text, snr, rssi, hop_limit)
VALUES (?,?,?,?,?,?,?,?,?,?,?)""",
row,
)
conn.commit()
msg = {
"id": int(pkt_id),
"rx_time": rx_time,
"rx_iso": _iso(rx_time),
"from_id": from_id,
"to_id": to_id,
"channel": ch,
"portnum": str(portnum) if portnum is not None else None,
"text": text,
"snr": float(snr) if snr is not None else None,
"rssi": int(rssi) if rssi is not None else None,
"hop_limit": int(hop) if hop is not None else None,
}
_post_json("/api/messages", msg)
if DEBUG:
print(
@@ -232,14 +192,15 @@ def main():
signal.signal(signal.SIGINT, handle_sig)
signal.signal(signal.SIGTERM, handle_sig)
print(f"Mesh daemon: nodes+messages → {DB} | port={PORT} | channel={CHANNEL_INDEX}")
target = INSTANCE or "(no POTATOMESH_INSTANCE)"
print(
f"Mesh daemon: nodes+messages → {target} | port={PORT} | channel={CHANNEL_INDEX}"
)
while not stop.is_set():
try:
nodes = getattr(iface, "nodes", {}) or {}
for node_id, n in nodes.items():
upsert_node(node_id, n)
with DB_LOCK:
conn.commit()
except Exception as e:
print(f"[warn] failed to update node snapshot: {e}")
stop.wait(SNAPSHOT_SECS)
@@ -248,9 +209,6 @@ def main():
iface.close()
except Exception:
pass
with DB_LOCK:
conn.commit()
conn.close()
if __name__ == "__main__":

View File

@@ -63,7 +63,7 @@ def query_messages(limit)
ORDER BY m.rx_time DESC
LIMIT ?
SQL
msg_fields = %w[id rx_time rx_iso from_id to_id channel portnum text msg_snr rssi hop_limit raw_json]
msg_fields = %w[id rx_time rx_iso from_id to_id channel portnum text msg_snr rssi hop_limit]
rows.each do |r|
node = {}
r.keys.each do |k|
@@ -168,11 +168,10 @@ def insert_message(db, m)
m["snr"],
m["rssi"],
m["hop_limit"],
m["raw_json"],
]
db.execute <<~SQL, row
INSERT OR IGNORE INTO messages(id,rx_time,rx_iso,from_id,to_id,channel,portnum,text,snr,rssi,hop_limit,raw_json)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?)
INSERT OR IGNORE INTO messages(id,rx_time,rx_iso,from_id,to_id,channel,portnum,text,snr,rssi,hop_limit)
VALUES (?,?,?,?,?,?,?,?,?,?,?)
SQL
end