From f690f16771c53d34443a1e4cc948418d40c267d3 Mon Sep 17 00:00:00 2001 From: SpudGunMan Date: Sun, 2 Nov 2025 20:07:55 -0800 Subject: [PATCH] tic-tac-toe 3d thanks again @martinbogo --- mesh_bot.py | 39 +-- modules/games/tictactoe.py | 492 ++++++++++++++++----------------- modules/games/tictactoe_vid.py | 186 +++++++++++++ modules/system.py | 3 +- script/game_serve.py | 152 ++++++++++ 5 files changed, 602 insertions(+), 270 deletions(-) create mode 100644 modules/games/tictactoe_vid.py create mode 100644 script/game_serve.py diff --git a/mesh_bot.py b/mesh_bot.py index 67443ba..880cf06 100755 --- a/mesh_bot.py +++ b/mesh_bot.py @@ -16,7 +16,7 @@ import modules.settings as my_settings from modules.system import * # list of commands to remove from the default list for DM only -restrictedCommands = ["blackjack", "videopoker", "dopewars", "lemonstand", "golfsim", "mastermind", "hangman", "hamtest", "tictactoe", "quiz", "q:", "survey", "s:"] +restrictedCommands = ["blackjack", "videopoker", "dopewars", "lemonstand", "golfsim", "mastermind", "hangman", "hamtest", "tictactoe", "tic-tac-toe", "quiz", "q:", "survey", "s:"] restrictedResponse = "🤖only available in a Direct Message📵" # "" for none def auto_response(message, snr, rssi, hop, pkiStatus, message_from_id, channel_number, deviceID, isDM): @@ -1051,30 +1051,35 @@ def handleHamtest(message, nodeID, deviceID): def handleTicTacToe(message, nodeID, deviceID): global tictactoeTracker - index = 0 - msg = '' - - # Find or create player tracker entry - for i in range(len(tictactoeTracker)): - if tictactoeTracker[i]['nodeID'] == nodeID: - tictactoeTracker[i]["last_played"] = time.time() - index = i+1 - break + tracker_entry = next((entry for entry in tictactoeTracker if entry['nodeID'] == nodeID), None) + + # Handle end/exit command if message.lower().startswith('e'): - if index: + if tracker_entry: tictactoe.end(nodeID) - tictactoeTracker.pop(index-1) + tictactoeTracker.remove(tracker_entry) return "Thanks for playing! 🎯" - if not index: + # If not found, create new tracker entry and ask for 2D/3D if not specified + if not tracker_entry: + mode = "2D" + if "3d" in message.lower(): + mode = "3D" + elif "2d" in message.lower(): + mode = "2D" tictactoeTracker.append({ "nodeID": nodeID, - "last_played": time.time() + "last_played": time.time(), + "mode": mode }) - msg = "🎯Tic-Tac-Toe🤖 '(e)nd'\n" - - msg += tictactoe.play(nodeID, message) + msg = f"🎯Tic-Tac-Toe🤖 '{mode}' mode. (e)nd to quit\n" + msg += tictactoe.new_game(nodeID, mode=mode) + return msg + else: + tracker_entry["last_played"] = time.time() + + msg = tictactoe.play(nodeID, message) return msg def quizHandler(message, nodeID, deviceID): diff --git a/modules/games/tictactoe.py b/modules/games/tictactoe.py index 2f2129c..719aa58 100644 --- a/modules/games/tictactoe.py +++ b/modules/games/tictactoe.py @@ -7,270 +7,258 @@ import modules.settings as my_settings # to (max), molly and jake, I miss you both so much. -if my_settings.disable_emojis_in_games: - X = "X" - O = "O" -else: - X = "❌" - O = "⭕️" - class TicTacToe: - def __init__(self): + def __init__(self, display_module): + if getattr(my_settings, "disable_emojis_in_games", False): + self.X = "X" + self.O = "O" + else: + self.X = "❌" + self.O = "⭕️" + self.display_module = display_module self.game = {} + self.win_lines_3d = self.generate_3d_win_lines() - def new_game(self, id): - positiveThoughts = ["🚀I need to call NATO", - "🏅Going for the gold!", - "Mastering ❌TTT⭕️",] - sorryNotGoinWell = ["😭Not your day, huh?", - "📉Results here dont define you.", - "🤖WOPR would be proud."] - """Start a new game""" - games = won = 0 - ret = "" - if id in self.game: - games = self.game[id]["games"] - won = self.game[id]["won"] - if games > 3: - if won / games >= 3.14159265358979323846: # win rate > pi - ret += random.choice(positiveThoughts) + "\n" - else: - ret += random.choice(sorryNotGoinWell) + "\n" - # Retain stats - ret += f"Games:{games} 🥇❌:{won}\n" - - self.game[id] = { - "board": [" "] * 9, # 3x3 board as flat list - "player": X, # Human is X, bot is O - "games": games + 1, - "won": won, - "turn": "human" # whose turn it is + def new_game(self, nodeID, mode="2D", channel=None, deviceID=None): + board_size = 9 if mode == "2D" else 27 + self.game[nodeID] = { + "board": [" "] * board_size, + "mode": mode, + "channel": channel, + "nodeID": nodeID, + "deviceID": deviceID, + "player": self.X, + "games": 1, + "won": 0, + "turn": "human" } - ret += self.show_board(id) - ret += "Pick 1-9:" - return ret - - def rndTeaPrice(self, tea=42): - """Return a random tea between 0 and tea.""" - return random.uniform(0, tea) + self.update_display(nodeID, status="new") + msg = f"{mode} game started!\n" + if mode == "2D": + msg += self.show_board(nodeID) + msg += "Pick 1-9:" + else: + msg += "Play on the MeshBot Display!\n" + msg += "Pick 1-27:" + return msg - def show_board(self, id): - """Display compact board with move numbers""" - g = self.game[id] - b = g["board"] - - # Show board with positions - board_str = "" - for i in range(3): - row = "" - for j in range(3): - pos = i * 3 + j - if my_settings.disable_emojis_in_games: - cell = b[pos] if b[pos] != " " else str(pos + 1) - else: - cell = b[pos] if b[pos] != " " else f" {str(pos + 1)} " - row += cell - if j < 2: - row += " | " - board_str += row - if i < 2: - board_str += "\n" - - return board_str + "\n" + def update_display(self, nodeID, status=None): + from modules.system import send_raw_bytes + g = self.game[nodeID] + mapping = {" ": "0", "X": "1", "O": "2", "❌": "1", "⭕️": "2"} + board_str = "".join(mapping.get(cell, "0") for cell in g["board"]) + msg = f"MTTT:{board_str}|{g['nodeID']}|{g['channel']}|{g['deviceID']}" + if status: + msg += f"|status={status}" + send_raw_bytes(nodeID, msg.encode("utf-8"), portnum=256) + if self.display_module: + self.display_module.update_board( + g["board"], g["channel"], g["nodeID"], g["deviceID"] + ) - def make_move(self, id, position): - """Make a move for the current player""" - g = self.game[id] - - # Validate position - if position < 1 or position > 9: - return False - - pos = position - 1 - if g["board"][pos] != " ": - return False - - # Make human move - g["board"][pos] = X - return True + def show_board(self, nodeID): + g = self.game[nodeID] + if g["mode"] == "2D": + b = g["board"] + s = "" + for i in range(3): + row = [] + for j in range(3): + cell = b[i*3+j] + row.append(cell if cell != " " else str(i*3+j+1)) + s += " | ".join(row) + "\n" + return s + return "" - def bot_move(self, id): - """AI makes a move: tries to win, block, or pick random""" - g = self.game[id] + def make_move(self, nodeID, position): + g = self.game[nodeID] board = g["board"] - - # Try to win - move = self.find_winning_move(id, O) - if move != -1: - board[move] = O - return move - - # Try to block player - move = self.find_winning_move(id, X) - if move != -1: - board[move] = O - return move - - # Pick random move - move = self.find_random_move(id) - if move != -1: - board[move] = O - return move - - # No moves possible - return -1 + max_pos = 9 if g["mode"] == "2D" else 27 + if 1 <= position <= max_pos and board[position-1] == " ": + board[position-1] = g["player"] + return True + return False - def find_winning_move(self, id, player): - """Find a winning move for the given player""" - g = self.game[id] - board = g["board"][:] - - # Check all empty positions - for i in range(9): - if board[i] == " ": - board[i] = player - if self.check_winner_on_board(board) == player: - return i - board[i] = " " - return -1 - - def find_random_move(self, id: str, tea_price: float = 42.0) -> int: - """Find a random empty position, using time and tea_price for extra randomness.""" - board = self.game[id]["board"] + def bot_move(self, nodeID): + g = self.game[nodeID] + board = g["board"] + max_pos = 9 if g["mode"] == "2D" else 27 + # Try to win or block + for player in (self.O, self.X): + move = self.find_winning_move(nodeID, player) + if move != -1: + board[move] = self.O + return move+1 + # Otherwise random move empty = [i for i, cell in enumerate(board) if cell == " "] - current_time = time.time() - from_china = self.rndTeaPrice(time.time() % 7) # Correct usage - tea_price = from_china - tea_price = (42 * 7) - (13 / 2) + (tea_price % 5) - if not empty: - return -1 - # Combine time and tea_price for a seed - seed = int(current_time * 1000) ^ int(tea_price * 1000) - local_random = random.Random(seed) - local_random.shuffle(empty) - return empty[0] + if empty: + move = random.choice(empty) + board[move] = self.O + return move+1 + return -1 - def check_winner_on_board(self, board): - """Check winner on given board state""" - # Winning combinations - wins = [ - [0,1,2], [3,4,5], [6,7,8], # Rows - [0,3,6], [1,4,7], [2,5,8], # Columns - [0,4,8], [2,4,6] # Diagonals - ] - - for combo in wins: - if board[combo[0]] == board[combo[1]] == board[combo[2]] != " ": - return board[combo[0]] + def find_winning_move(self, nodeID, player): + g = self.game[nodeID] + board = g["board"] + lines = self.get_win_lines(g["mode"]) + for line in lines: + cells = [board[i] for i in line] + if cells.count(player) == 2 and cells.count(" ") == 1: + return line[cells.index(" ")] + return -1 + + def play(self, nodeID, input_msg): + try: + if nodeID not in self.game: + return self.new_game(nodeID) + g = self.game[nodeID] + mode = g["mode"] + max_pos = 9 if mode == "2D" else 27 + + input_str = input_msg.strip().lower() + if input_str in ("end", "e", "quit", "q"): + msg = "Game ended." + self.update_display(nodeID) + return msg + + # Add refresh/draw command + if input_str in ("refresh"): + self.update_display(nodeID, status="refresh") + return "Display refreshed." + + # Allow 'new', 'new 2d', 'new 3d' + if input_str.startswith("new"): + parts = input_str.split() + if len(parts) > 1 and parts[1] in ("2d", "3d"): + new_mode = "2D" if parts[1] == "2d" else "3D" + else: + new_mode = mode + msg = self.new_game(nodeID, new_mode, g["channel"], g["deviceID"]) + return msg + + try: + pos = int(input_msg) + except Exception: + return f"Enter a number between 1 and {max_pos}." + + if not self.make_move(nodeID, pos): + return f"Invalid move! Pick 1-{max_pos}:" + + winner = self.check_winner(nodeID) + if winner: + # Add positive/sorry messages and stats + positiveThoughts = [ + "🚀I need to call NATO", + "🏅Going for the gold!", + "Mastering ❌TTT⭕️", + ] + sorryNotGoinWell = [ + "😭Not your day, huh?", + "📉Results here dont define you.", + "🤖WOPR would be proud." + ] + games = won = 0 + ret = "" + if nodeID in self.game: + self.game[nodeID]["won"] += 1 + games = self.game[nodeID]["games"] + won = self.game[nodeID]["won"] + if games > 3: + if won / games >= 3.14159265358979323846: # win rate > pi + ret += random.choice(positiveThoughts) + "\n" + else: + ret += random.choice(sorryNotGoinWell) + "\n" + # Retain stats + ret += f"Games:{games} 🥇❌:{won}\n" + msg = f"You ({g['player']}) win!\n" + ret + msg += "Type 'new' to play again or 'end' to quit." + self.update_display(nodeID, status="win") + return msg + + if " " not in g["board"]: + msg = "Tie game!" + msg += "\nType 'new' to play again or 'end' to quit." + self.update_display(nodeID, status="tie") + return msg + + # Bot's turn + g["player"] = self.O + bot_pos = self.bot_move(nodeID) + winner = self.check_winner(nodeID) + if winner: + self.update_display(nodeID, status="loss") + msg = f"Bot ({g['player']}) wins!\n" + msg += "Type 'new' to play again or 'end' to quit." + return msg + + if " " not in g["board"]: + msg = "Tie game!" + msg += "\nType 'new' to play again or 'end' to quit." + self.update_display(nodeID, status="tie") + return msg + + g["player"] = self.X + prompt = f"Pick 1-{max_pos}:" + if mode == "2D": + prompt = self.show_board(nodeID) + prompt + self.update_display(nodeID) + return prompt + + except Exception as e: + return f"An unexpected error occurred: {e}" + + def check_winner(self, nodeID): + g = self.game[nodeID] + board = g["board"] + lines = self.get_win_lines(g["mode"]) + for line in lines: + vals = [board[i] for i in line] + if vals[0] != " " and all(v == vals[0] for v in vals): + return vals[0] return None - def check_winner(self, id): - """Check if there's a winner""" - g = self.game[id] - return self.check_winner_on_board(g["board"]) + def get_win_lines(self, mode): + if mode == "2D": + return [ + [0,1,2],[3,4,5],[6,7,8], # rows + [0,3,6],[1,4,7],[2,5,8], # columns + [0,4,8],[2,4,6] # diagonals + ] + return self.win_lines_3d - def is_board_full(self, id): - """Check if board is full""" - g = self.game[id] - return " " not in g["board"] + def generate_3d_win_lines(self): + lines = [] + # Rows in each layer + for z in range(3): + for y in range(3): + lines.append([z*9 + y*3 + x for x in range(3)]) + # Columns in each layer + for z in range(3): + for x in range(3): + lines.append([z*9 + y*3 + x for y in range(3)]) + # Pillars (vertical columns through layers) + for y in range(3): + for x in range(3): + lines.append([z*9 + y*3 + x for z in range(3)]) + # Diagonals in each layer + for z in range(3): + lines.append([z*9 + i*3 + i for i in range(3)]) # TL to BR + lines.append([z*9 + i*3 + (2-i) for i in range(3)]) # TR to BL + # Vertical diagonals in columns + for x in range(3): + lines.append([z*9 + z*3 + x for z in range(3)]) # (0,0,x)-(1,1,x)-(2,2,x) + lines.append([z*9 + (2-z)*3 + x for z in range(3)]) # (0,2,x)-(1,1,x)-(2,0,x) + for y in range(3): + lines.append([z*9 + y*3 + z for z in range(3)]) # (z,y,z) + lines.append([z*9 + y*3 + (2-z) for z in range(3)]) # (z,y,2-z) + # Main space diagonals + lines.append([0, 13, 26]) + lines.append([2, 13, 24]) + lines.append([6, 13, 20]) + lines.append([8, 13, 18]) + return lines - def game_over_msg(self, id): - """Generate game over message""" - g = self.game[id] - winner = self.check_winner(id) - - if winner == X: - g["won"] += 1 - return "🎉You won! (n)ew (e)nd" - elif winner == O: - return "🤖Bot wins! (n)ew (e)nd" - else: - return "🤝Tie, The only winning move! (n)ew (e)nd" - - def play(self, id, input_msg): - """Main game play function""" - if id not in self.game: - return self.new_game(id) - - # If input is just "tictactoe", show current board - if input_msg.lower().strip() == ("tictactoe" or "tic-tac-toe"): - return self.show_board(id) + "Your turn! Pick 1-9:" - - g = self.game[id] - - # Parse player move - try: - # Extract just the number from the input - numbers = [char for char in input_msg if char.isdigit()] - if not numbers: - if input_msg.lower().startswith('q'): - self.end_game(id) - return "Game ended. To start a new game, type 'tictactoe'." - elif input_msg.lower().startswith('n'): - return self.new_game(id) - elif input_msg.lower().startswith('b'): - return self.show_board(id) + "Your turn! Pick 1-9:" - position = int(numbers[0]) - except (ValueError, IndexError): - return "Enter 1-9, or (e)nd (n)ew game, send (b)oard to see board🧩" - - # Make player move - if not self.make_move(id, position): - return "Invalid move! Pick 1-9:" - - # Check if player won - if self.check_winner(id): - result = self.game_over_msg(id) + "\n" + self.show_board(id) - self.end_game(id) - return result - - # Check for tie - if self.is_board_full(id): - result = self.game_over_msg(id) + "\n" + self.show_board(id) - self.end_game(id) - return result - - # Bot's turn - bot_pos = self.bot_move(id) - - # Check if bot won - if self.check_winner(id): - result = self.game_over_msg(id) + "\n" + self.show_board(id) - self.end_game(id) - return result - - # Check for tie after bot move - if self.is_board_full(id): - result = self.game_over_msg(id) + "\n" + self.show_board(id) - self.end_game(id) - return result - - # Continue game - return self.show_board(id) + "Your turn! Pick 1-9:" - - def end_game(self, id): - """Clean up finished game but keep stats""" - if id in self.game: - games = self.game[id]["games"] - won = self.game[id]["won"] - # Remove game but we'll create new one on next play - del self.game[id] - # Preserve stats for next game - self.game[id] = { - "board": [" "] * 9, - "player": X, - "games": games, - "won": won, - "turn": "human" - } - - - def end(self, id): - """End game completely (called by 'end' command)""" - if id in self.game: - del self.game[id] - - -# Global instances for the bot system -tictactoeTracker = [] -tictactoe = TicTacToe() + def end(self, nodeID): + """End and remove the game for the given nodeID.""" + if nodeID in self.game: + del self.game[nodeID] \ No newline at end of file diff --git a/modules/games/tictactoe_vid.py b/modules/games/tictactoe_vid.py new file mode 100644 index 0000000..8c42203 --- /dev/null +++ b/modules/games/tictactoe_vid.py @@ -0,0 +1,186 @@ +# Tic-Tac-Toe Video Display Module for Meshtastic mesh-bot +# Uses Pygame to render the game board visually +# 2025 K7MHI Kelly Keeton + +import pygame + +latest_board = [" "] * 9 # or 27 for 3D +latest_meta = {} # To store metadata like status + +def handle_tictactoe_payload(payload, from_id=None): + global latest_board, latest_meta + #print("Received payload:", payload) + board, meta = parse_tictactoe_message(payload) + #print("Parsed board:", board) + if board: + latest_board = board + latest_meta = meta if meta else {} + +def parse_tictactoe_message(msg): + # msg is already stripped of 'MTTT:' prefix + parts = msg.split("|") + if not parts or len(parts[0]) < 9: + return None, None # Not enough data for a board + board_str = parts[0] + meta = {} + if len(parts) > 1: + meta["nodeID"] = parts[1] if len(parts) > 1 else "" + meta["channel"] = parts[2] if len(parts) > 2 else "" + meta["deviceID"] = parts[3] if len(parts) > 3 else "" + # Look for status in any remaining parts + for part in parts[4:]: + if part.startswith("status="): + meta["status"] = part.split("=", 1)[1] + symbol_map = {"0": " ", "1": "❌", "2": "⭕️"} + board = [symbol_map.get(c, " ") for c in board_str] + return board, meta + +def draw_board(screen, board, meta=None): + screen.fill((30, 30, 30)) + width, height = screen.get_size() + margin = int(min(width, height) * 0.05) + font_size = int(height * 0.12) + font = pygame.font.Font(None, font_size) + + # Draw the title at the top center, scaled + title_font = pygame.font.Font(None, int(height * 0.08)) + title_text = title_font.render("MeshBot Tic-Tac-Toe", True, (220, 220, 255)) + title_rect = title_text.get_rect(center=(width // 2, margin // 2 + 10)) + screen.blit(title_text, title_rect) + + # Add a buffer below the title + title_buffer = int(height * 0.06) + + # --- Show win/draw message if present --- + if meta and "status" in meta: + status = meta["status"] + msg_font = pygame.font.Font(None, int(height * 0.06)) # Smaller font + msg_y = title_rect.bottom + int(height * 0.04) # Just under the title + if status == "win": + msg = "Game Won!" + text = msg_font.render(msg, True, (100, 255, 100)) + text_rect = text.get_rect(center=(width // 2, msg_y)) + screen.blit(text, text_rect) + elif status == "tie": + msg = "Tie Game!" + text = msg_font.render(msg, True, (255, 220, 120)) + text_rect = text.get_rect(center=(width // 2, msg_y)) + screen.blit(text, text_rect) + elif status == "loss": + msg = "You Lost!" + text = msg_font.render(msg, True, (255, 100, 100)) + text_rect = text.get_rect(center=(width // 2, msg_y)) + screen.blit(text, text_rect) + elif status == "new": + msg = "Welcome! New Game" + text = msg_font.render(msg, True, (200, 255, 200)) + text_rect = text.get_rect(center=(width // 2, msg_y)) + screen.blit(text, text_rect) + # Do NOT return here—let the board draw as normal + elif status != "refresh": + msg = status.capitalize() + text = msg_font.render(msg, True, (255, 220, 120)) + text_rect = text.get_rect(center=(width // 2, msg_y)) + screen.blit(text, text_rect) + # Don't return here—let the board draw as normal + + # Show waiting message if board is empty, unless status is "new" + if all(cell.strip() == "" or cell.strip() == " " for cell in board): + if not (meta and meta.get("status") == "new"): + msg_font = pygame.font.Font(None, int(height * 0.09)) + msg = "Waiting for player..." + text = msg_font.render(msg, True, (200, 200, 200)) + text_rect = text.get_rect(center=(width // 2, height // 2)) + screen.blit(text, text_rect) + pygame.display.flip() + return + + def draw_x(rect): + thickness = max(4, rect.width // 12) + pygame.draw.line(screen, (255, 80, 80), rect.topleft, rect.bottomright, thickness) + pygame.draw.line(screen, (255, 80, 80), rect.topright, rect.bottomleft, thickness) + + def draw_o(rect): + center = rect.center + radius = rect.width // 2 - max(6, rect.width // 16) + thickness = max(4, rect.width // 12) + pygame.draw.circle(screen, (80, 180, 255), center, radius, thickness) + + if len(board) == 9: + # 2D: Center a single 3x3 grid, scale to fit + size = min((width - 2*margin)//3, (height - 2*margin - title_buffer)//3) + offset_x = (width - size*3) // 2 + offset_y = (height - size*3) // 2 + title_buffer // 2 + offset_y = max(offset_y, title_rect.bottom + title_buffer) + # Index number font and buffer + small_index_font = pygame.font.Font(None, int(size * 0.38)) + index_buffer_x = int(size * 0.16) + index_buffer_y = int(size * 0.10) + for i in range(3): + for j in range(3): + rect = pygame.Rect(offset_x + j*size, offset_y + i*size, size, size) + pygame.draw.rect(screen, (200, 200, 200), rect, 2) + idx = i*3 + j + # Draw index number in top-left, start at 1 + idx_text = small_index_font.render(str(idx + 1), True, (120, 120, 160)) + idx_rect = idx_text.get_rect(topleft=(rect.x + index_buffer_x, rect.y + index_buffer_y)) + screen.blit(idx_text, idx_rect) + val = board[idx].strip() + if val == "❌" or val == "X": + draw_x(rect) + elif val == "⭕️" or val == "O": + draw_o(rect) + elif val: + text = font.render(val, True, (255, 255, 255)) + text_rect = text.get_rect(center=rect.center) + screen.blit(text, text_rect) + elif len(board) == 27: + # 3D: Stack three 3x3 grids vertically, with horizontal offsets for 3D effect, scale to fit + size = min((width - 2*margin)//7, (height - 4*margin - title_buffer)//9) + base_offset_x = (width - (size * 3)) // 2 + offset_y = (height - (size*9 + margin*2)) // 2 + title_buffer // 2 + offset_y = max(offset_y, title_rect.bottom + title_buffer) + small_font = pygame.font.Font(None, int(height * 0.045)) + small_index_font = pygame.font.Font(None, int(size * 0.38)) + index_buffer_x = int(size * 0.16) + index_buffer_y = int(size * 0.10) + for display_idx, layer in enumerate(reversed(range(3))): + layer_offset_x = base_offset_x + (layer - 1) * 2 * size + layer_y = offset_y + display_idx * (size*3 + margin) + label_text = f"Layer {layer+1}" + label = small_font.render(label_text, True, (180, 180, 220)) + label_rect = label.get_rect(center=(layer_offset_x + size*3//2, layer_y + size*3 + int(size*0.2))) + screen.blit(label, label_rect) + for i in range(3): + for j in range(3): + rect = pygame.Rect(layer_offset_x + j*size, layer_y + i*size, size, size) + pygame.draw.rect(screen, (200, 200, 200), rect, 2) + idx = layer*9 + i*3 + j + idx_text = small_index_font.render(str(idx + 1), True, (120, 120, 160)) + idx_rect = idx_text.get_rect(topleft=(rect.x + index_buffer_x, rect.y + index_buffer_y)) + screen.blit(idx_text, idx_rect) + val = board[idx].strip() + if val == "❌" or val == "X": + draw_x(rect) + elif val == "⭕️" or val == "O": + draw_o(rect) + elif val: + text = font.render(val, True, (255, 255, 255)) + text_rect = text.get_rect(center=rect.center) + screen.blit(text, text_rect) + pygame.display.flip() + +def ttt_main(): + global latest_board, latest_meta + pygame.init() + screen = pygame.display.set_mode((0, 0), pygame.FULLSCREEN) + pygame.display.set_caption("Tic-Tac-Toe 3D Display") + running = True + while running: + for event in pygame.event.get(): + if event.type == pygame.QUIT or (event.type == pygame.KEYDOWN and event.key == pygame.K_ESCAPE): + running = False + draw_board(screen, latest_board, latest_meta) # <-- Pass meta/status + pygame.display.flip() + pygame.time.wait(75) # or 50-100 for lower CPU + pygame.quit() diff --git a/modules/system.py b/modules/system.py index 4d0049d..099036d 100644 --- a/modules/system.py +++ b/modules/system.py @@ -214,7 +214,8 @@ if hamtest_enabled: games_enabled = True if tictactoe_enabled: - from modules.games.tictactoe import * # from the spudgunman/meshing-around repo + from modules.games.tictactoe import TicTacToe # from the spudgunman/meshing-around repo + tictactoe = TicTacToe(display_module=None) trap_list = trap_list + ("tictactoe","tic-tac-toe",) if quiz_enabled: diff --git a/script/game_serve.py b/script/game_serve.py new file mode 100644 index 0000000..bfbeab7 --- /dev/null +++ b/script/game_serve.py @@ -0,0 +1,152 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# UDP Interface game server for Meshtastic Meshing-Around Mesh Bot +# depends on: pip install meshtastic protobuf zeroconf pubsub +# 2025 Kelly Keeton K7MHI +from pubsub import pub +from meshtastic.protobuf import mesh_pb2, portnums_pb2 +from mudp import UDPPacketStream, node, conn, send_text_message, send_nodeinfo, send_device_telemetry, send_position, send_environment_metrics, send_power_metrics, send_waypoint, send_data +from mudp.encryption import generate_hash +from collections import OrderedDict +import time +import sys +import os + +# import logging +# import sys + +# logger = logging.getLogger("MeshBot Game Server") +# logger.setLevel(logging.DEBUG) +# logger.propagate = False + +# # Remove any existing handlers +# if logger.hasHandlers(): +# logger.handlers.clear() + +# handler = logging.StreamHandler(sys.stdout) +# logger.addHandler(handler) +# logger.debug("Mesh Bot Game Server Logger initialized") +try: + sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + from modules.games.tictactoe_vid import handle_tictactoe_payload, ttt_main +except Exception as e: + print(f"Error importing modules: {e}\nRun this program from the main project directory, e.g. 'python3 script/game_serve.py'") + exit(1) + +MCAST_GRP, MCAST_PORT, CHANNEL_ID, KEY = "224.0.0.69", 4403, "LongFast", "1PG7OiApB1nwvP+rz05pAQ==" +PUBLIC_CHANNEL_IDS = ["LongFast", "ShortSlow", "Medium", "LongSlow", "ShortFast", "ShortTurbo"] +NODE_ID, LONG_NAME, SHORT_NAME = "!meshbotg", "Mesh Bot Game Server", "MBGS" +CHANNEL_HASHES = {generate_hash(name, KEY): name for name in PUBLIC_CHANNEL_IDS} +SEEN_MESSAGES_MAX = 1000 # Adjust as needed +mudpEnabled, mudpInterface = True, None +seen_messages = OrderedDict() # Track seen (from, to, payload) tuples +is_running = False + +def initalize_mudp(): + global mudpInterface + if mudpEnabled and mudpInterface is None: + mudpInterface = UDPPacketStream(MCAST_GRP, MCAST_PORT, key=KEY) + node.node_id, node.long_name, node.short_name = NODE_ID, LONG_NAME, SHORT_NAME + node.channel, node.key = CHANNEL_ID, KEY + conn.setup_multicast(MCAST_GRP, MCAST_PORT) + print(f"mUDP Interface initialized on {MCAST_GRP}:{MCAST_PORT} with Channel ID '{CHANNEL_ID}'") + print(f"Node ID: {NODE_ID}, Long Name: {LONG_NAME}, Short Name: {SHORT_NAME}") + print("Public Channel IDs:", PUBLIC_CHANNEL_IDS) + +def get_channel_name(channel_hash): + return CHANNEL_HASHES.get(channel_hash, '') + +def add_seen_message(msg_tuple): + if msg_tuple not in seen_messages: + if len(seen_messages) >= SEEN_MESSAGES_MAX: + seen_messages.popitem(last=False) # Remove oldest + seen_messages[msg_tuple] = None + +def on_private_app(packet: mesh_pb2.MeshPacket, addr=None): + global seen_messages + packet_payload = "" + packet_from_id = None + if packet.HasField("decoded"): + try: + packet_payload = packet.decoded.payload.decode("utf-8", "ignore") + packet_from_id = getattr(packet, 'from', None) + port_name = portnums_pb2.PortNum.Name(packet.decoded.portnum) if packet.decoded.portnum else "N/A" + rx_channel = get_channel_name(packet.channel) + # check the synch word which should be xxxx: + # if synch = 'echo:' remove the b'word' from the string and pass to the handler + if packet_payload.startswith("MTTT:"): + packet_payload = packet_payload[5:] # remove 'echo:' + + msg_tuple = (getattr(packet, 'from', None), packet.to, packet_payload) + # Only log the first occurrence of this message tuple + if msg_tuple not in seen_messages: + add_seen_message(msg_tuple) + handle_tictactoe_payload(packet_payload, from_id=packet_from_id) + print(f"[Channel: {rx_channel}] [Port: {port_name}] Tic-Tac-Toe Message payload:", packet_payload) + else: + msg_tuple = (getattr(packet, 'from', None), packet.to, packet_payload) + if msg_tuple not in seen_messages: + add_seen_message(msg_tuple) + print(f"[Channel: {rx_channel}] [Port: {port_name}] Private App payload:", packet_payload) + + except Exception: + print(" Private App extraction error payload (raw bytes):", packet.decoded.payload) + +def on_text_message(packet: mesh_pb2.MeshPacket, addr=None): + global seen_messages + try: + packet_payload = "" + if packet.HasField("decoded"): + rx_channel = get_channel_name(packet.channel) + port_name = portnums_pb2.PortNum.Name(packet.decoded.portnum) if packet.decoded.portnum else "N/A" + try: + packet_payload = packet.decoded.payload.decode("utf-8", "ignore") + msg_tuple = (getattr(packet, 'from', None), packet.to, packet_payload) + if msg_tuple not in seen_messages: + add_seen_message(msg_tuple) + #print(f"[Channel: {rx_channel}] [Port: {port_name}] TEXT Message payload:", packet_payload) + except Exception: + print(" extraction error payload (raw bytes):", packet.decoded.payload) + except Exception as e: + print("Error processing received packet:", e) + +# def on_recieve(packet: mesh_pb2.MeshPacket, addr=None): +# print(f"\n[RECV] Packet received from {addr}") +# print(packet) +#pub.subscribe(on_recieve, "mesh.rx.packet") +pub.subscribe(on_text_message, "mesh.rx.port.1") # TEXT_MESSAGE +pub.subscribe(on_private_app, "mesh.rx.port.256") # PRIVATE_APP DEFAULT_PORTNUM + +def main(): + global mudpInterface, is_running + print(r""" + ___ + / \ + | HOT | Mesh Bot Game Server v0.9 + | TOT | (aka tot-bot) + \___/ + + """) + print("Press escape (ESC) key to exit") + initalize_mudp() # initialize MUDP interface + mudpInterface.start() + is_running = True + try: + while is_running: + ttt_main() + is_running = False + time.sleep(0.1) + except KeyboardInterrupt: + print("\n[INFO] KeyboardInterrupt received. Shutting down Mesh Bot Game Server...") + is_running = False + except Exception as e: + print(f"[ERROR] Exception during main loop: {e}") + finally: + print("[INFO] Stopping mUDP interface...") + if mudpInterface: + mudpInterface.stop() + print("[INFO] mUDP interface stopped.") + print("[INFO] Mesh Bot Game Server shutdown complete.") + +if __name__ == "__main__": + main()