Add files via upload

This commit is contained in:
Knud Schrøder
2025-11-05 21:13:51 +01:00
committed by GitHub
parent 9ea8a6761a
commit 56ef61df14
2 changed files with 202 additions and 0 deletions

2
meshtastic_bot.log Normal file
View File

@@ -0,0 +1,2 @@
[+] Connected to 192.168.0.156:4403 as num=2658661380 id=!9e77f404
[2025-11-05 21:10:44] [ping] -> !cc9a6661 KSM1 Knud S 🇩🇰 Mobile snr=6.25 rssi=-33 hops=-

200
meshtastic_bot.py Normal file
View File

@@ -0,0 +1,200 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Minimal Meshtastic bot (command-ready, with logging)
- Connects via TCP to 192.168.0.156:4403 (TCP)
- Replies "pong" to direct "ping" only back to sender
- Clean command handler so you can add more commands easily
- Appends logs to meshtastic_bot.log in same directory and mirrors to console
Docs:
- Meshtastic Python: https://python.meshtastic.org/
- TCPInterface: https://python.meshtastic.org/tcp_interface.html
- sendText: https://python.meshtastic.org/mesh_interface.html#meshtastic.mesh_interface.MeshInterface.sendText
- PubSub basics: https://meshtastic.org/docs/development/python/library/
- Python logging: https://docs.python.org/3/library/logging.html
"""
import sys
import time
import logging
from pathlib import Path
from typing import Optional, Callable, Dict
try:
from pubsub import pub
from meshtastic.tcp_interface import TCPInterface
except Exception:
print("Missing dependencies. Install with:\n pip install meshtastic pypubsub", file=sys.stderr)
raise
HOST = "192.168.0.156"
PORT = 4403
# ---------- logging ----------
def _setup_logger() -> logging.Logger:
logger = logging.getLogger("meshtastic_bot")
if logger.handlers:
return logger
logger.setLevel(logging.INFO)
try:
log_path = Path(__file__).with_name("meshtastic_bot.log")
except NameError:
log_path = Path.cwd() / "meshtastic_bot.log"
fh = logging.FileHandler(log_path, mode="a", encoding="utf-8")
ch = logging.StreamHandler(sys.stdout)
fmt = logging.Formatter("%(message)s")
fh.setFormatter(fmt)
ch.setFormatter(fmt)
logger.addHandler(fh)
logger.addHandler(ch)
return logger
LOGGER = _setup_logger()
# ---------- helpers ----------
def _get_text_from_packet(packet: dict) -> str:
decoded = packet.get("decoded", {}) if isinstance(packet, dict) else {}
p = decoded.get("payload", "")
if isinstance(p, (bytes, bytearray)):
try: return p.decode("utf-8", errors="ignore")
except Exception: return ""
if isinstance(p, str):
return p
t = decoded.get("text")
if isinstance(t, bytes): return t.decode("utf-8", errors="ignore")
if isinstance(t, str): return t
return ""
def _num_from_id(id_str: str) -> Optional[int]:
try:
if id_str and isinstance(id_str, str) and id_str.startswith("!"):
return int(id_str[1:], 16)
except Exception:
pass
return None
def _resolve_node_meta(iface: "TCPInterface", packet: dict, from_id) -> dict:
meta = {"short":"-", "long":"-", "hops":"-", "snr":"-", "rssi":"-"}
# prefer live rx metrics from this packet
meta["snr"] = packet.get("rxSnr") or packet.get("rx_snr") or "-"
meta["rssi"] = packet.get("rxRssi") or packet.get("rx_rssi") or "-"
try:
nodes = getattr(iface, "nodes", {}) or {}
node = nodes.get(from_id) if isinstance(from_id, str) else None
if not node and not isinstance(from_id, str):
nnum = packet.get("from") if isinstance(packet.get("from"), int) else None
if nnum is None and isinstance(from_id, str):
nnum = _num_from_id(from_id)
nodes_by_num = getattr(iface, "nodesByNum", {}) or {}
node = nodes_by_num.get(nnum) if nnum is not None else None
if isinstance(node, dict):
user = node.get("user") or {}
meta["short"] = user.get("shortName") or meta["short"]
meta["long"] = user.get("longName") or meta["long"]
meta["hops"] = node.get("hopsAway") or meta["hops"]
if meta["snr"] == "-":
meta["snr"] = node.get("snr", "-")
if meta["rssi"] == "-":
meta["rssi"] = node.get("rssi", "-")
except Exception:
pass
return meta
# ---------- command handlers ----------
# Signature: handler(iface, from_id, packet) -> Optional[str log_suffix]
def cmd_ping(iface: "TCPInterface", from_id, packet: dict) -> Optional[str]:
dest = from_id if isinstance(from_id, str) else f"!{int(from_id):08x}"
meta = _resolve_node_meta(iface, packet, from_id)
iface.sendText(f"[Pong] snr={meta['snr']} rssi={meta['rssi']} hops={meta['hops']}", destinationId=dest, wantAck=False)
return f"{meta['short']} {meta['long']} snr={meta['snr']} rssi={meta['rssi']} hops={meta['hops']}"
def cmd_mail(iface: "TCPInterface", from_id, packet: dict) -> Optional[str]:
dest = from_id if isinstance(from_id, str) else f"!{int(from_id):08x}"
meta = _resolve_node_meta(iface, packet, from_id)
iface.sendText("[Mail] Send=#s,sname,msg Read=#r", destinationId=dest, wantAck=False)
return f"{meta['short']} {meta['long']} snr={meta['snr']} rssi={meta['rssi']} hops={meta['hops']}"
def cmd_info(iface: "TCPInterface", from_id, packet: dict) -> Optional[str]:
dest = from_id if isinstance(from_id, str) else f"!{int(from_id):08x}"
meta = _resolve_node_meta(iface, packet, from_id)
iface.sendText("[Info] MeshtasticBot by Knud Schroder", destinationId=dest, wantAck=False)
return f"{meta['short']} {meta['long']} snr={meta['snr']} rssi={meta['rssi']} hops={meta['hops']}"
def cmd_help(iface: "TCPInterface", from_id, packet: dict) -> Optional[str]:
dest = from_id if isinstance(from_id, str) else f"!{int(from_id):08x}"
meta = _resolve_node_meta(iface, packet, from_id)
iface.sendText("[Help] Ping, Mail, Info", destinationId=dest, wantAck=False)
return f"{meta['short']} {meta['long']} snr={meta['snr']} rssi={meta['rssi']} hops={meta['hops']}"
COMMANDS: Dict[str, Callable[["TCPInterface", object, dict], Optional[str]]] = {
"ping": cmd_ping,
"mail": cmd_mail,
"info": cmd_info,
"help": cmd_help,
# Add more commands here, e.g.:
# "uptime": cmd_uptime,
# "help": cmd_help,
}
# ---------- main ----------
def main():
iface = TCPInterface(hostname=HOST, portNumber=PORT)
my_num = getattr(getattr(iface, "myInfo", None), "my_node_num", None)
my_id: Optional[str] = f"!{int(my_num):08x}" if my_num is not None else None
LOGGER.info(f"[+] Connected to {HOST}:{PORT} as num={my_num} id={my_id}")
def on_receive(packet=None, interface=None, **kwargs):
if not isinstance(packet, dict):
return
decoded = packet.get("decoded", {}) or {}
app = decoded.get("app") or decoded.get("portnum")
if app not in ("TEXT_MESSAGE_APP", "TEXT_MESSAGE_COMPRESSED_APP", "TEXT_MESSAGE"):
return
# only react to messages DIRECT to us
to_id = packet.get("toId") or packet.get("to")
to_num = packet.get("to") if isinstance(packet.get("to"), int) else None
direct = False
if my_id and isinstance(to_id, str):
direct = (to_id == my_id)
elif my_num is not None and to_num is not None:
direct = (int(to_num) == int(my_num))
if not direct:
return
text = _get_text_from_packet(packet).strip()
if not text:
return
# parse command (first token, case-insensitive)
cmd = text.split()[0].lower()
handler = COMMANDS.get(cmd)
if not handler:
return
from_id = packet.get("fromId") or packet.get("from")
if not from_id:
return
try:
extra = handler(iface, from_id, packet) or ""
dest = from_id if isinstance(from_id, str) else f"!{int(from_id):08x}"
ts = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
LOGGER.info(f"[{ts}] [{cmd}] -> {dest} {extra}".rstrip())
except Exception as e:
LOGGER.info(f"[handler:{cmd}] failed: {e}")
pub.subscribe(on_receive, "meshtastic.receive")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
pass
finally:
try: iface.close()
except Exception: pass
LOGGER.info("[*] Disconnected")
if __name__ == "__main__":
main()