mirror of
https://github.com/dk98174003/MeshtasticBot.git
synced 2026-03-28 17:32:38 +01:00
201 lines
7.8 KiB
Python
201 lines
7.8 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
Minimal Meshtastic bot (command-ready, with logging)
|
|
- Connects to the Meshtastic node's TCP API at 192.168.0.156:4403
|
|
- Replies "[Pong] ..." to a direct "ping" (likewise "mail", "info", "help"), only back to the sender
|
|
- Clean command handler so you can add more commands easily
|
|
- Appends logs to meshtastic_bot.log in same directory and mirrors to console
|
|
|
|
Docs:
|
|
- Meshtastic Python: https://python.meshtastic.org/
|
|
- TCPInterface: https://python.meshtastic.org/tcp_interface.html
|
|
- sendText: https://python.meshtastic.org/mesh_interface.html#meshtastic.mesh_interface.MeshInterface.sendText
|
|
- PubSub basics: https://meshtastic.org/docs/development/python/library/
|
|
- Python logging: https://docs.python.org/3/library/logging.html
|
|
"""
|
|
import sys
|
|
import time
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import Optional, Callable, Dict
|
|
|
|
try:
|
|
from pubsub import pub
|
|
from meshtastic.tcp_interface import TCPInterface
|
|
except Exception:
|
|
print("Missing dependencies. Install with:\n pip install meshtastic pypubsub", file=sys.stderr)
|
|
raise
|
|
|
|
# Endpoint of the Meshtastic node that main() connects to over TCP.
HOST: str = "192.168.0.156"
PORT: int = 4403
|
|
|
|
# ---------- logging ----------
|
|
def _setup_logger() -> logging.Logger:
    """Return the "meshtastic_bot" logger, configuring it on first use.

    The first call sets the level to INFO and attaches two handlers that share
    a bare "%(message)s" format: a UTF-8 file handler appending to
    ``meshtastic_bot.log`` next to this file (or in the current working
    directory when ``__file__`` is not defined), followed by a stream handler
    writing to stdout. Later calls return the already-configured logger
    unchanged.
    """
    bot_logger = logging.getLogger("meshtastic_bot")
    if not bot_logger.handlers:
        bot_logger.setLevel(logging.INFO)
        try:
            log_file = Path(__file__).with_name("meshtastic_bot.log")
        except NameError:
            # __file__ is undefined (e.g. interactive session): use the cwd.
            log_file = Path.cwd() / "meshtastic_bot.log"
        plain = logging.Formatter("%(message)s")
        sinks = (
            logging.FileHandler(log_file, mode="a", encoding="utf-8"),
            logging.StreamHandler(sys.stdout),
        )
        for sink in sinks:
            sink.setFormatter(plain)
            bot_logger.addHandler(sink)
    return bot_logger
|
|
|
|
LOGGER = _setup_logger()
|
|
|
|
# ---------- helpers ----------
|
|
def _get_text_from_packet(packet: dict) -> str:
    """Extract the message text from a received packet.

    Looks inside ``packet["decoded"]`` and returns the first non-empty value
    found under ``"payload"`` and then ``"text"``. Bytes-like values are
    decoded as UTF-8 with undecodable bytes dropped.

    Args:
        packet: Received packet dictionary. A value that is not a dict, or
            whose ``"decoded"`` entry is not a dict, yields an empty string.

    Returns:
        The message text, or ``""`` when no text is available.
    """
    decoded = packet.get("decoded") if isinstance(packet, dict) else None
    if not isinstance(decoded, dict):
        return ""
    # A missing or empty payload must not short-circuit the "text" fallback.
    for key in ("payload", "text"):
        value = decoded.get(key)
        if isinstance(value, (bytes, bytearray)):
            value = value.decode("utf-8", errors="ignore")
        if isinstance(value, str) and value:
            return value
    return ""
|
|
|
|
def _num_from_id(id_str: str) -> Optional[int]:
    """Convert a "!<hex>" node id string into its integer value.

    Returns None when ``id_str`` is not a string, does not start with "!",
    or the text after the "!" is not valid hexadecimal.
    """
    if not isinstance(id_str, str) or not id_str.startswith("!"):
        return None
    hex_part = id_str[1:]
    try:
        return int(hex_part, 16)
    except ValueError:
        return None
|
|
|
|
def _resolve_node_meta(iface: "TCPInterface", packet: dict, from_id) -> dict:
    """Collect display metadata about the sender of ``packet``.

    Receive metrics are taken from the packet itself when present
    (``rxSnr``/``rx_snr``, ``rxRssi``/``rx_rssi``). Names, hop count and any
    metric the packet lacks are filled in from the interface's node tables:
    ``iface.nodes`` keyed by the string id, then ``iface.nodesByNum`` keyed by
    the numeric node number.

    Args:
        iface: Connected interface; only its ``nodes`` and ``nodesByNum``
            attributes are read, and either may be missing.
        packet: Received packet dictionary.
        from_id: Sender as a "!<hex>" string id or an integer node number.

    Returns:
        A dict with keys ``short``, ``long``, ``hops``, ``snr`` and ``rssi``.
        Every value that could not be determined is the placeholder ``"-"``.
        The lookup is best effort and never raises.
    """
    meta = {"short": "-", "long": "-", "hops": "-", "snr": "-", "rssi": "-"}
    if not isinstance(packet, dict):
        packet = {}

    def first_set(mapping, *keys):
        """Return the first value under ``keys`` that is set, else "-"."""
        for key in keys:
            value = mapping.get(key)
            # Zero is a valid reading (0 hops, 0.0 dB), so only None and ""
            # count as "missing" -- a plain truthiness test would drop it.
            if value is not None and value != "":
                return value
        return "-"

    # Prefer the live receive metrics carried by this packet.
    meta["snr"] = first_set(packet, "rxSnr", "rx_snr")
    meta["rssi"] = first_set(packet, "rxRssi", "rx_rssi")

    try:
        node = None
        if isinstance(from_id, str):
            node = (getattr(iface, "nodes", None) or {}).get(from_id)
        if not isinstance(node, dict):
            # Not found by string id (or the id is numeric): fall back to the
            # table keyed by node number.
            nnum = packet.get("from")
            if not isinstance(nnum, int):
                if isinstance(from_id, int):
                    nnum = from_id
                elif isinstance(from_id, str):
                    nnum = _num_from_id(from_id)
                else:
                    nnum = None
            if nnum is not None:
                node = (getattr(iface, "nodesByNum", None) or {}).get(nnum)
        if isinstance(node, dict):
            user = node.get("user") or {}
            meta["short"] = user.get("shortName") or meta["short"]
            meta["long"] = user.get("longName") or meta["long"]
            meta["hops"] = first_set(node, "hopsAway")
            if meta["snr"] == "-":
                meta["snr"] = first_set(node, "snr")
            if meta["rssi"] == "-":
                meta["rssi"] = first_set(node, "rssi")
    except Exception:
        # Metadata is decorative; a malformed node table must not stop the
        # reply. Keep the placeholders and leave a trace for debugging.
        logging.getLogger("meshtastic_bot").debug(
            "node metadata lookup failed", exc_info=True
        )
    return meta
|
|
|
|
# ---------- command handlers ----------
|
|
# Signature: handler(iface, from_id, packet) -> Optional[str log_suffix]
|
|
def cmd_ping(iface: "TCPInterface", from_id, packet: dict) -> Optional[str]:
    """Answer "ping" with a "[Pong]" text carrying the sender's link metrics.

    The reply goes only to the sender (``wantAck=False``). Returns the sender
    summary (names, snr, rssi, hops) for the caller to append to its log line.
    """
    if isinstance(from_id, str):
        dest = from_id
    else:
        # Numeric node number -> "!xxxxxxxx" id string.
        dest = f"!{int(from_id):08x}"
    meta = _resolve_node_meta(iface, packet, from_id)
    reply = f"[Pong] snr={meta['snr']} rssi={meta['rssi']} hops={meta['hops']}"
    iface.sendText(reply, destinationId=dest, wantAck=False)
    return f"{meta['short']} {meta['long']} snr={meta['snr']} rssi={meta['rssi']} hops={meta['hops']}"
|
|
|
|
def cmd_mail(iface: "TCPInterface", from_id, packet: dict) -> Optional[str]:
    """Answer "mail" with the mail usage hint, sent only to the sender.

    Returns the sender summary (names, snr, rssi, hops) for the caller's log.
    """
    if isinstance(from_id, str):
        dest = from_id
    else:
        # Numeric node number -> "!xxxxxxxx" id string.
        dest = f"!{int(from_id):08x}"
    meta = _resolve_node_meta(iface, packet, from_id)
    usage = "[Mail] Send=#s,sname,msg Read=#r"
    iface.sendText(usage, destinationId=dest, wantAck=False)
    summary = f"{meta['short']} {meta['long']} snr={meta['snr']} rssi={meta['rssi']} hops={meta['hops']}"
    return summary
|
|
|
|
def cmd_info(iface: "TCPInterface", from_id, packet: dict) -> Optional[str]:
    """Answer "info" with the bot's credit line, sent only to the sender.

    Returns the sender summary (names, snr, rssi, hops) for the caller's log.
    """
    if isinstance(from_id, str):
        dest = from_id
    else:
        # Numeric node number -> "!xxxxxxxx" id string.
        dest = f"!{int(from_id):08x}"
    meta = _resolve_node_meta(iface, packet, from_id)
    about = "[Info] MeshtasticBot by Knud Schroder"
    iface.sendText(about, destinationId=dest, wantAck=False)
    summary = f"{meta['short']} {meta['long']} snr={meta['snr']} rssi={meta['rssi']} hops={meta['hops']}"
    return summary
|
|
|
|
def cmd_help(iface: "TCPInterface", from_id, packet: dict) -> Optional[str]:
    """Answer "help" with the list of commands, sent only to the sender.

    Returns the sender summary (names, snr, rssi, hops) for the caller's log.
    """
    if isinstance(from_id, str):
        dest = from_id
    else:
        # Numeric node number -> "!xxxxxxxx" id string.
        dest = f"!{int(from_id):08x}"
    meta = _resolve_node_meta(iface, packet, from_id)
    listing = "[Help] Ping, Mail, Info"
    iface.sendText(listing, destinationId=dest, wantAck=False)
    summary = f"{meta['short']} {meta['long']} snr={meta['snr']} rssi={meta['rssi']} hops={meta['hops']}"
    return summary
|
|
|
|
# Command registry: lower-cased first word of a direct message -> handler.
COMMANDS: Dict[str, Callable[["TCPInterface", object, dict], Optional[str]]] = {
    "ping": cmd_ping,
    "mail": cmd_mail,
    "info": cmd_info,
    "help": cmd_help,
    # Register further commands here, e.g.:
    # "uptime": cmd_uptime,
}
|
|
|
|
# ---------- main ----------
|
|
def main():
    """Connect to the node, answer direct text commands, and run until Ctrl-C.

    Opens a TCPInterface to HOST:PORT, subscribes a listener to the
    "meshtastic.receive" pubsub topic, and then idles. The listener ignores
    everything except text messages addressed directly to this node whose
    first word (case-insensitive) is a key of COMMANDS; for those it calls the
    handler and logs one line per handled command. The interface is always
    closed on the way out, including when setup fails after connecting.
    """
    iface = TCPInterface(hostname=HOST, portNumber=PORT)
    try:
        my_num = getattr(getattr(iface, "myInfo", None), "my_node_num", None)
        my_id: Optional[str] = f"!{int(my_num):08x}" if my_num is not None else None
        LOGGER.info(f"[+] Connected to {HOST}:{PORT} as num={my_num} id={my_id}")
        if my_num is None:
            # Without our own node number no message can be matched as
            # "direct to us", so the bot would silently ignore everything.
            LOGGER.warning("[!] Own node number unknown; direct messages cannot be recognised")

        def on_receive(packet=None, interface=None, **kwargs):
            """Pubsub listener: dispatch a direct text message to its command handler."""
            if not isinstance(packet, dict):
                return
            decoded = packet.get("decoded")
            if not isinstance(decoded, dict):
                return
            app = decoded.get("app") or decoded.get("portnum")
            if app not in ("TEXT_MESSAGE_APP", "TEXT_MESSAGE_COMPRESSED_APP", "TEXT_MESSAGE"):
                return

            # Only react to messages addressed directly to this node: compare
            # string ids when both are known, otherwise numeric node numbers.
            to_id = packet.get("toId") or packet.get("to")
            to_num = packet.get("to") if isinstance(packet.get("to"), int) else None
            if my_id and isinstance(to_id, str):
                direct = to_id == my_id
            elif my_num is not None and to_num is not None:
                direct = int(to_num) == int(my_num)
            else:
                direct = False
            if not direct:
                return

            text = _get_text_from_packet(packet).strip()
            if not text:
                return

            # The command is the first whitespace-separated token, case-insensitive.
            cmd = text.split()[0].lower()
            handler = COMMANDS.get(cmd)
            if not handler:
                return

            from_id = packet.get("fromId") or packet.get("from")
            if not from_id:
                return

            try:
                extra = handler(iface, from_id, packet) or ""
                dest = from_id if isinstance(from_id, str) else f"!{int(from_id):08x}"
                ts = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
                LOGGER.info(f"[{ts}] [{cmd}] -> {dest} {extra}".rstrip())
            except Exception as e:
                # A failing handler must not take the listener down; record
                # the failure together with its traceback.
                LOGGER.exception(f"[handler:{cmd}] failed: {e}")

        pub.subscribe(on_receive, "meshtastic.receive")

        # Idle here; incoming packets are delivered to on_receive via pubsub.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        try:
            iface.close()
        except Exception as e:
            # Shutdown is best effort, but leave a trace of what went wrong.
            LOGGER.warning(f"[!] Error while closing interface: {e}")
        LOGGER.info("[*] Disconnected")
|
|
|
|
if __name__ == "__main__":
|
|
main()
|