diff --git a/meshtastic_bot.log b/meshtastic_bot.log new file mode 100644 index 0000000..8b24acf --- /dev/null +++ b/meshtastic_bot.log @@ -0,0 +1,2 @@ +[+] Connected to 192.168.0.156:4403 as num=2658661380 id=!9e77f404 +[2025-11-05 21:10:44] [ping] -> !cc9a6661 KSM1 Knud S 🇩🇰 Mobile snr=6.25 rssi=-33 hops=- diff --git a/meshtastic_bot.py b/meshtastic_bot.py new file mode 100644 index 0000000..5f0acee --- /dev/null +++ b/meshtastic_bot.py @@ -0,0 +1,200 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Minimal Meshtastic bot (command-ready, with logging) +- Connects via TCP to 192.168.0.156:4403 (TCP) +- Replies "pong" to direct "ping" only back to sender +- Clean command handler so you can add more commands easily +- Appends logs to meshtastic_bot.log in same directory and mirrors to console + +Docs: +- Meshtastic Python: https://python.meshtastic.org/ +- TCPInterface: https://python.meshtastic.org/tcp_interface.html +- sendText: https://python.meshtastic.org/mesh_interface.html#meshtastic.mesh_interface.MeshInterface.sendText +- PubSub basics: https://meshtastic.org/docs/development/python/library/ +- Python logging: https://docs.python.org/3/library/logging.html +""" +import sys +import time +import logging +from pathlib import Path +from typing import Optional, Callable, Dict + +try: + from pubsub import pub + from meshtastic.tcp_interface import TCPInterface +except Exception: + print("Missing dependencies. Install with:\n pip install meshtastic pypubsub", file=sys.stderr) + raise + +HOST = "192.168.0.156" +PORT = 4403 + +# ---------- logging ---------- +def _setup_logger() -> logging.Logger: + logger = logging.getLogger("meshtastic_bot") + if logger.handlers: + return logger + logger.setLevel(logging.INFO) + try: + log_path = Path(__file__).with_name("meshtastic_bot.log") + except NameError: + log_path = Path.cwd() / "meshtastic_bot.log" + fh = logging.FileHandler(log_path, mode="a", encoding="utf-8") + ch = logging.StreamHandler(sys.stdout) + fmt = logging.Formatter("%(message)s") + fh.setFormatter(fmt) + ch.setFormatter(fmt) + logger.addHandler(fh) + logger.addHandler(ch) + return logger + +LOGGER = _setup_logger() + +# ---------- helpers ---------- +def _get_text_from_packet(packet: dict) -> str: + decoded = packet.get("decoded", {}) if isinstance(packet, dict) else {} + p = decoded.get("payload", "") + if isinstance(p, (bytes, bytearray)): + try: return p.decode("utf-8", errors="ignore") + except Exception: return "" + if isinstance(p, str): + return p + t = decoded.get("text") + if isinstance(t, bytes): return t.decode("utf-8", errors="ignore") + if isinstance(t, str): return t + return "" + +def _num_from_id(id_str: str) -> Optional[int]: + try: + if id_str and isinstance(id_str, str) and id_str.startswith("!"): + return int(id_str[1:], 16) + except Exception: + pass + return None + +def _resolve_node_meta(iface: "TCPInterface", packet: dict, from_id) -> dict: + meta = {"short":"-", "long":"-", "hops":"-", "snr":"-", "rssi":"-"} + # prefer live rx metrics from this packet + meta["snr"] = packet.get("rxSnr") or packet.get("rx_snr") or "-" + meta["rssi"] = packet.get("rxRssi") or packet.get("rx_rssi") or "-" + try: + nodes = getattr(iface, "nodes", {}) or {} + node = nodes.get(from_id) if isinstance(from_id, str) else None + if not node and not isinstance(from_id, str): + nnum = packet.get("from") if isinstance(packet.get("from"), int) else None + if nnum is None and isinstance(from_id, str): + nnum = _num_from_id(from_id) + nodes_by_num = getattr(iface, "nodesByNum", {}) or {} + node = nodes_by_num.get(nnum) if nnum is not None else None + if isinstance(node, dict): + user = node.get("user") or {} + meta["short"] = user.get("shortName") or meta["short"] + meta["long"] = user.get("longName") or meta["long"] + meta["hops"] = node.get("hopsAway") or meta["hops"] + if meta["snr"] == "-": + meta["snr"] = node.get("snr", "-") + if meta["rssi"] == "-": + meta["rssi"] = node.get("rssi", "-") + except Exception: + pass + return meta + +# ---------- command handlers ---------- +# Signature: handler(iface, from_id, packet) -> Optional[str log_suffix] +def cmd_ping(iface: "TCPInterface", from_id, packet: dict) -> Optional[str]: + dest = from_id if isinstance(from_id, str) else f"!{int(from_id):08x}" + meta = _resolve_node_meta(iface, packet, from_id) + iface.sendText(f"[Pong] snr={meta['snr']} rssi={meta['rssi']} hops={meta['hops']}", destinationId=dest, wantAck=False) + return f"{meta['short']} {meta['long']} snr={meta['snr']} rssi={meta['rssi']} hops={meta['hops']}" + +def cmd_mail(iface: "TCPInterface", from_id, packet: dict) -> Optional[str]: + dest = from_id if isinstance(from_id, str) else f"!{int(from_id):08x}" + meta = _resolve_node_meta(iface, packet, from_id) + iface.sendText("[Mail] Send=#s,sname,msg Read=#r", destinationId=dest, wantAck=False) + return f"{meta['short']} {meta['long']} snr={meta['snr']} rssi={meta['rssi']} hops={meta['hops']}" + +def cmd_info(iface: "TCPInterface", from_id, packet: dict) -> Optional[str]: + dest = from_id if isinstance(from_id, str) else f"!{int(from_id):08x}" + meta = _resolve_node_meta(iface, packet, from_id) + iface.sendText("[Info] MeshtasticBot by Knud Schroder", destinationId=dest, wantAck=False) + return f"{meta['short']} {meta['long']} snr={meta['snr']} rssi={meta['rssi']} hops={meta['hops']}" + +def cmd_help(iface: "TCPInterface", from_id, packet: dict) -> Optional[str]: + dest = from_id if isinstance(from_id, str) else f"!{int(from_id):08x}" + meta = _resolve_node_meta(iface, packet, from_id) + iface.sendText("[Help] Ping, Mail, Info", destinationId=dest, wantAck=False) + return f"{meta['short']} {meta['long']} snr={meta['snr']} rssi={meta['rssi']} hops={meta['hops']}" + +COMMANDS: Dict[str, Callable[["TCPInterface", object, dict], Optional[str]]] = { + "ping": cmd_ping, + "mail": cmd_mail, + "info": cmd_info, + "help": cmd_help, + # Add more commands here, e.g.: + # "uptime": cmd_uptime, + # "help": cmd_help, +} + +# ---------- main ---------- +def main(): + iface = TCPInterface(hostname=HOST, portNumber=PORT) + my_num = getattr(getattr(iface, "myInfo", None), "my_node_num", None) + my_id: Optional[str] = f"!{int(my_num):08x}" if my_num is not None else None + LOGGER.info(f"[+] Connected to {HOST}:{PORT} as num={my_num} id={my_id}") + + def on_receive(packet=None, interface=None, **kwargs): + if not isinstance(packet, dict): + return + decoded = packet.get("decoded", {}) or {} + app = decoded.get("app") or decoded.get("portnum") + if app not in ("TEXT_MESSAGE_APP", "TEXT_MESSAGE_COMPRESSED_APP", "TEXT_MESSAGE"): + return + + # only react to messages DIRECT to us + to_id = packet.get("toId") or packet.get("to") + to_num = packet.get("to") if isinstance(packet.get("to"), int) else None + direct = False + if my_id and isinstance(to_id, str): + direct = (to_id == my_id) + elif my_num is not None and to_num is not None: + direct = (int(to_num) == int(my_num)) + if not direct: + return + + text = _get_text_from_packet(packet).strip() + if not text: + return + + # parse command (first token, case-insensitive) + cmd = text.split()[0].lower() + handler = COMMANDS.get(cmd) + if not handler: + return + + from_id = packet.get("fromId") or packet.get("from") + if not from_id: + return + + try: + extra = handler(iface, from_id, packet) or "" + dest = from_id if isinstance(from_id, str) else f"!{int(from_id):08x}" + ts = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + LOGGER.info(f"[{ts}] [{cmd}] -> {dest} {extra}".rstrip()) + except Exception as e: + LOGGER.info(f"[handler:{cmd}] failed: {e}") + + pub.subscribe(on_receive, "meshtastic.receive") + + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + pass + finally: + try: iface.close() + except Exception: pass + LOGGER.info("[*] Disconnected") + +if __name__ == "__main__": + main()