#!/usr/bin/env python3 # -*- coding: utf-8 -*- # UDP Interface game server for Meshtastic Meshing-Around Mesh Bot # depends on: pip install meshtastic protobuf mudp # 2025 Kelly Keeton K7MHI import os import sys import time from collections import OrderedDict import configparser useSynchCompression = True if useSynchCompression: import zlib try: from pubsub import pub from meshtastic.protobuf import mesh_pb2, portnums_pb2 except ImportError: print("meshtastic API not found. pip install -U meshtastic") exit(1) try: from mudp import UDPPacketStream, node, conn from mudp.encryption import generate_hash except ImportError: print("mUDP module not found. pip install -U mudp") print("If launching, venv run source venv/bin/activate and then pip install -U mudp pygame-ce") print("use deactivate to exit venv when done") exit(1) try: sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) from modules.games.tictactoe_vid import handle_tictactoe_payload, ttt_main from modules.games.battleship_vid import parse_battleship_message except Exception as e: print(f"Error importing modules: {e}\nRun this program from the main project directory, e.g. 'python3 script/game_serve.py'") exit(1) # import logging # logger = logging.getLogger("MeshBot Game Server") # logger.setLevel(logging.DEBUG) # logger.propagate = False # # Remove any existing handlers # if logger.hasHandlers(): # logger.handlers.clear() # handler = logging.StreamHandler(sys.stdout) # logger.addHandler(handler) # logger.debug("Mesh Bot Game Server Logger initialized") # Load config from game.ini if it exists config = configparser.ConfigParser() config_path = os.path.join(os.path.dirname(__file__), "game.ini") if os.path.exists(config_path): config.read(config_path) MCAST_GRP = config.get("network", "MCAST_GRP", fallback="224.0.0.69") MCAST_PORT = config.getint("network", "MCAST_PORT", fallback=4403) CHANNEL_ID = config.get("network", "CHANNEL_ID", fallback="LongFast") KEY = config.get("network", "KEY", fallback="1PG7OiApB1nwvP+rz05pAQ==") PUBLIC_CHANNEL_IDS = [x.strip() for x in config.get("network", "PUBLIC_CHANNEL_IDS", fallback="LongFast,ShortSlow,Medium,LongSlow,ShortFast,ShortTurbo").split(",")] NODE_ID = config.get("node", "NODE_ID", fallback="!meshbotg") LONG_NAME = config.get("node", "LONG_NAME", fallback="Mesh Bot Game Server") SHORT_NAME = config.get("node", "SHORT_NAME", fallback="MBGS") SEEN_MESSAGES_MAX = config.getint("game", "SEEN_MESSAGES_MAX", fallback=1000) FULLSCREEN = config.getboolean("game", "FULLSCREEN", fallback=True) else: MCAST_GRP, MCAST_PORT, CHANNEL_ID, KEY = "224.0.0.69", 4403, "LongFast", "1PG7OiApB1nwvP+rz05pAQ==" PUBLIC_CHANNEL_IDS = ["LongFast", "ShortSlow", "Medium", "LongSlow", "ShortFast", "ShortTurbo"] NODE_ID, LONG_NAME, SHORT_NAME = "!meshbotg", "Mesh Bot Game Server", "MBGS" SEEN_MESSAGES_MAX = 1000 # Adjust as needed FULLSCREEN = True CHANNEL_HASHES = {generate_hash(name, KEY): name for name in PUBLIC_CHANNEL_IDS} mudpEnabled, mudpInterface = True, None seen_messages = OrderedDict() # Track seen (from, to, payload) tuples is_running = False def initalize_mudp(): global mudpInterface if mudpEnabled and mudpInterface is None: mudpInterface = UDPPacketStream(MCAST_GRP, MCAST_PORT, key=KEY) node.node_id, node.long_name, node.short_name = NODE_ID, LONG_NAME, SHORT_NAME node.channel, node.key = CHANNEL_ID, KEY conn.setup_multicast(MCAST_GRP, MCAST_PORT) print(f"mUDP Interface initialized on {MCAST_GRP}:{MCAST_PORT} with Channel ID '{CHANNEL_ID}'") print(f"Node ID: {NODE_ID}, Long Name: {LONG_NAME}, Short Name: {SHORT_NAME}") print("Public Channel IDs:", PUBLIC_CHANNEL_IDS) def get_channel_name(channel_hash): return CHANNEL_HASHES.get(channel_hash, '') def add_seen_message(msg_tuple): if msg_tuple not in seen_messages: if len(seen_messages) >= SEEN_MESSAGES_MAX: seen_messages.popitem(last=False) # Remove oldest seen_messages[msg_tuple] = None def compress_payload(data: str) -> bytes: """Compress a string to bytes using zlib if enabled.""" if useSynchCompression: return zlib.compress(data.encode("utf-8")) else: return data.encode("utf-8") def decompress_payload(data: bytes) -> str: """Decompress bytes to string using zlib if enabled, fallback to utf-8 if not compressed.""" if useSynchCompression: try: return zlib.decompress(data).decode("utf-8") except Exception: return data.decode("utf-8", "ignore") else: return data.decode("utf-8", "ignore") def on_private_app(packet: mesh_pb2.MeshPacket, addr=None): global seen_messages packet_payload = "" packet_from_id = None if packet.HasField("decoded"): try: # Try to decompress, fallback to decode if not compressed packet_payload = decompress_payload(packet.decoded.payload) packet_from_id = getattr(packet, 'from', None) port_name = portnums_pb2.PortNum.Name(packet.decoded.portnum) if packet.decoded.portnum else "N/A" rx_channel = get_channel_name(packet.channel) if packet_payload.startswith("MTTT:"): packet_payload = packet_payload[5:] # remove 'MTTT:' msg_tuple = (getattr(packet, 'from', None), packet.to, packet_payload) if msg_tuple not in seen_messages: add_seen_message(msg_tuple) handle_tictactoe_payload(packet_payload, from_id=packet_from_id) print(f"[Channel: {rx_channel}] [Port: {port_name}] Tic-Tac-Toe Message payload:", packet_payload) elif packet_payload.startswith("MBSP:"): packet_payload = packet_payload[5:] # remove 'MBSP:' msg_tuple = (getattr(packet, 'from', None), packet.to, packet_payload) if msg_tuple not in seen_messages: add_seen_message(msg_tuple) #parse_battleship_message(packet_payload, from_id=packet_from_id) print(f"[Channel: {rx_channel}] [Port: {port_name}] Battleship Message payload:", packet_payload) else: msg_tuple = (getattr(packet, 'from', None), packet.to, packet_payload) if msg_tuple not in seen_messages: add_seen_message(msg_tuple) print(f"[Channel: {rx_channel}] [Port: {port_name}] Private App payload:", packet_payload) except Exception: print(" Private App extraction error payload (raw bytes):", packet.decoded.payload) def on_text_message(packet: mesh_pb2.MeshPacket, addr=None): global seen_messages try: packet_payload = "" if packet.HasField("decoded"): rx_channel = get_channel_name(packet.channel) port_name = portnums_pb2.PortNum.Name(packet.decoded.portnum) if packet.decoded.portnum else "N/A" try: # Try to decompress, fallback to decode if not compressed packet_payload = decompress_payload(packet.decoded.payload) msg_tuple = (getattr(packet, 'from', None), packet.to, packet_payload) if msg_tuple not in seen_messages: add_seen_message(msg_tuple) #print(f"[Channel: {rx_channel}] [Port: {port_name}] TEXT Message payload:", packet_payload) except Exception: print(" extraction error payload (raw bytes):", packet.decoded.payload) except Exception as e: print("Error processing received packet:", e) # def on_recieve(packet: mesh_pb2.MeshPacket, addr=None): # print(f"\n[RECV] Packet received from {addr}") # print(packet) #pub.subscribe(on_recieve, "mesh.rx.packet") pub.subscribe(on_text_message, "mesh.rx.port.1") # TEXT_MESSAGE pub.subscribe(on_private_app, "mesh.rx.port.256") # PRIVATE_APP DEFAULT_PORTNUM def main(): global mudpInterface, is_running print(r""" ___ / \ | HOT | Mesh Bot Display Server v0.9.5b | TOT | (aka tot-bot) \___/ """) print("Press escape (ESC) key to exit") initalize_mudp() # initialize MUDP interface mudpInterface.start() is_running = True try: while is_running: ttt_main(fullscreen=FULLSCREEN) is_running = False time.sleep(0.1) except KeyboardInterrupt: print("\n[INFO] KeyboardInterrupt received. Shutting down Mesh Bot Game Server...") is_running = False except Exception as e: print(f"[ERROR] Exception during main loop: {e}") finally: print("[INFO] Stopping mUDP interface...") if mudpInterface: mudpInterface.stop() print("[INFO] mUDP interface stopped.") print("[INFO] Mesh Bot Game Server shutdown complete.") if __name__ == "__main__": main()