tic-tac-toe 3d

thanks again @martinbogo
This commit is contained in:
SpudGunMan
2025-11-02 20:07:55 -08:00
parent 2805240abc
commit f690f16771
5 changed files with 602 additions and 270 deletions

View File

@@ -16,7 +16,7 @@ import modules.settings as my_settings
from modules.system import *
# list of commands to remove from the default list for DM only
restrictedCommands = ["blackjack", "videopoker", "dopewars", "lemonstand", "golfsim", "mastermind", "hangman", "hamtest", "tictactoe", "quiz", "q:", "survey", "s:"]
restrictedCommands = ["blackjack", "videopoker", "dopewars", "lemonstand", "golfsim", "mastermind", "hangman", "hamtest", "tictactoe", "tic-tac-toe", "quiz", "q:", "survey", "s:"]
restrictedResponse = "🤖only available in a Direct Message📵" # "" for none
def auto_response(message, snr, rssi, hop, pkiStatus, message_from_id, channel_number, deviceID, isDM):
@@ -1051,30 +1051,35 @@ def handleHamtest(message, nodeID, deviceID):
def handleTicTacToe(message, nodeID, deviceID):
global tictactoeTracker
index = 0
msg = ''
# Find or create player tracker entry
for i in range(len(tictactoeTracker)):
if tictactoeTracker[i]['nodeID'] == nodeID:
tictactoeTracker[i]["last_played"] = time.time()
index = i+1
break
tracker_entry = next((entry for entry in tictactoeTracker if entry['nodeID'] == nodeID), None)
# Handle end/exit command
if message.lower().startswith('e'):
if index:
if tracker_entry:
tictactoe.end(nodeID)
tictactoeTracker.pop(index-1)
tictactoeTracker.remove(tracker_entry)
return "Thanks for playing! 🎯"
if not index:
# If not found, create new tracker entry and ask for 2D/3D if not specified
if not tracker_entry:
mode = "2D"
if "3d" in message.lower():
mode = "3D"
elif "2d" in message.lower():
mode = "2D"
tictactoeTracker.append({
"nodeID": nodeID,
"last_played": time.time()
"last_played": time.time(),
"mode": mode
})
msg = "🎯Tic-Tac-Toe🤖 '(e)nd'\n"
msg += tictactoe.play(nodeID, message)
msg = f"🎯Tic-Tac-Toe🤖 '{mode}' mode. (e)nd to quit\n"
msg += tictactoe.new_game(nodeID, mode=mode)
return msg
else:
tracker_entry["last_played"] = time.time()
msg = tictactoe.play(nodeID, message)
return msg
def quizHandler(message, nodeID, deviceID):

View File

@@ -7,270 +7,258 @@ import modules.settings as my_settings
# to (max), molly and jake, I miss you both so much.
if my_settings.disable_emojis_in_games:
X = "X"
O = "O"
else:
X = ""
O = "⭕️"
class TicTacToe:
def __init__(self):
def __init__(self, display_module):
if getattr(my_settings, "disable_emojis_in_games", False):
self.X = "X"
self.O = "O"
else:
self.X = ""
self.O = "⭕️"
self.display_module = display_module
self.game = {}
self.win_lines_3d = self.generate_3d_win_lines()
def new_game(self, id):
positiveThoughts = ["🚀I need to call NATO",
"🏅Going for the gold!",
"Mastering ❌TTT⭕",]
sorryNotGoinWell = ["😭Not your day, huh?",
"📉Results here dont define you.",
"🤖WOPR would be proud."]
"""Start a new game"""
games = won = 0
ret = ""
if id in self.game:
games = self.game[id]["games"]
won = self.game[id]["won"]
if games > 3:
if won / games >= 3.14159265358979323846: # win rate > pi
ret += random.choice(positiveThoughts) + "\n"
else:
ret += random.choice(sorryNotGoinWell) + "\n"
# Retain stats
ret += f"Games:{games} 🥇❌:{won}\n"
self.game[id] = {
"board": [" "] * 9, # 3x3 board as flat list
"player": X, # Human is X, bot is O
"games": games + 1,
"won": won,
"turn": "human" # whose turn it is
def new_game(self, nodeID, mode="2D", channel=None, deviceID=None):
board_size = 9 if mode == "2D" else 27
self.game[nodeID] = {
"board": [" "] * board_size,
"mode": mode,
"channel": channel,
"nodeID": nodeID,
"deviceID": deviceID,
"player": self.X,
"games": 1,
"won": 0,
"turn": "human"
}
ret += self.show_board(id)
ret += "Pick 1-9:"
return ret
def rndTeaPrice(self, tea=42):
"""Return a random tea between 0 and tea."""
return random.uniform(0, tea)
self.update_display(nodeID, status="new")
msg = f"{mode} game started!\n"
if mode == "2D":
msg += self.show_board(nodeID)
msg += "Pick 1-9:"
else:
msg += "Play on the MeshBot Display!\n"
msg += "Pick 1-27:"
return msg
def show_board(self, id):
"""Display compact board with move numbers"""
g = self.game[id]
b = g["board"]
# Show board with positions
board_str = ""
for i in range(3):
row = ""
for j in range(3):
pos = i * 3 + j
if my_settings.disable_emojis_in_games:
cell = b[pos] if b[pos] != " " else str(pos + 1)
else:
cell = b[pos] if b[pos] != " " else f" {str(pos + 1)} "
row += cell
if j < 2:
row += " | "
board_str += row
if i < 2:
board_str += "\n"
return board_str + "\n"
def update_display(self, nodeID, status=None):
from modules.system import send_raw_bytes
g = self.game[nodeID]
mapping = {" ": "0", "X": "1", "O": "2", "": "1", "⭕️": "2"}
board_str = "".join(mapping.get(cell, "0") for cell in g["board"])
msg = f"MTTT:{board_str}|{g['nodeID']}|{g['channel']}|{g['deviceID']}"
if status:
msg += f"|status={status}"
send_raw_bytes(nodeID, msg.encode("utf-8"), portnum=256)
if self.display_module:
self.display_module.update_board(
g["board"], g["channel"], g["nodeID"], g["deviceID"]
)
def make_move(self, id, position):
"""Make a move for the current player"""
g = self.game[id]
# Validate position
if position < 1 or position > 9:
return False
pos = position - 1
if g["board"][pos] != " ":
return False
# Make human move
g["board"][pos] = X
return True
def show_board(self, nodeID):
g = self.game[nodeID]
if g["mode"] == "2D":
b = g["board"]
s = ""
for i in range(3):
row = []
for j in range(3):
cell = b[i*3+j]
row.append(cell if cell != " " else str(i*3+j+1))
s += " | ".join(row) + "\n"
return s
return ""
def bot_move(self, id):
"""AI makes a move: tries to win, block, or pick random"""
g = self.game[id]
def make_move(self, nodeID, position):
g = self.game[nodeID]
board = g["board"]
# Try to win
move = self.find_winning_move(id, O)
if move != -1:
board[move] = O
return move
# Try to block player
move = self.find_winning_move(id, X)
if move != -1:
board[move] = O
return move
# Pick random move
move = self.find_random_move(id)
if move != -1:
board[move] = O
return move
# No moves possible
return -1
max_pos = 9 if g["mode"] == "2D" else 27
if 1 <= position <= max_pos and board[position-1] == " ":
board[position-1] = g["player"]
return True
return False
def find_winning_move(self, id, player):
"""Find a winning move for the given player"""
g = self.game[id]
board = g["board"][:]
# Check all empty positions
for i in range(9):
if board[i] == " ":
board[i] = player
if self.check_winner_on_board(board) == player:
return i
board[i] = " "
return -1
def find_random_move(self, id: str, tea_price: float = 42.0) -> int:
"""Find a random empty position, using time and tea_price for extra randomness."""
board = self.game[id]["board"]
def bot_move(self, nodeID):
g = self.game[nodeID]
board = g["board"]
max_pos = 9 if g["mode"] == "2D" else 27
# Try to win or block
for player in (self.O, self.X):
move = self.find_winning_move(nodeID, player)
if move != -1:
board[move] = self.O
return move+1
# Otherwise random move
empty = [i for i, cell in enumerate(board) if cell == " "]
current_time = time.time()
from_china = self.rndTeaPrice(time.time() % 7) # Correct usage
tea_price = from_china
tea_price = (42 * 7) - (13 / 2) + (tea_price % 5)
if not empty:
return -1
# Combine time and tea_price for a seed
seed = int(current_time * 1000) ^ int(tea_price * 1000)
local_random = random.Random(seed)
local_random.shuffle(empty)
return empty[0]
if empty:
move = random.choice(empty)
board[move] = self.O
return move+1
return -1
def check_winner_on_board(self, board):
"""Check winner on given board state"""
# Winning combinations
wins = [
[0,1,2], [3,4,5], [6,7,8], # Rows
[0,3,6], [1,4,7], [2,5,8], # Columns
[0,4,8], [2,4,6] # Diagonals
]
for combo in wins:
if board[combo[0]] == board[combo[1]] == board[combo[2]] != " ":
return board[combo[0]]
def find_winning_move(self, nodeID, player):
g = self.game[nodeID]
board = g["board"]
lines = self.get_win_lines(g["mode"])
for line in lines:
cells = [board[i] for i in line]
if cells.count(player) == 2 and cells.count(" ") == 1:
return line[cells.index(" ")]
return -1
def play(self, nodeID, input_msg):
try:
if nodeID not in self.game:
return self.new_game(nodeID)
g = self.game[nodeID]
mode = g["mode"]
max_pos = 9 if mode == "2D" else 27
input_str = input_msg.strip().lower()
if input_str in ("end", "e", "quit", "q"):
msg = "Game ended."
self.update_display(nodeID)
return msg
# Add refresh/draw command
if input_str in ("refresh"):
self.update_display(nodeID, status="refresh")
return "Display refreshed."
# Allow 'new', 'new 2d', 'new 3d'
if input_str.startswith("new"):
parts = input_str.split()
if len(parts) > 1 and parts[1] in ("2d", "3d"):
new_mode = "2D" if parts[1] == "2d" else "3D"
else:
new_mode = mode
msg = self.new_game(nodeID, new_mode, g["channel"], g["deviceID"])
return msg
try:
pos = int(input_msg)
except Exception:
return f"Enter a number between 1 and {max_pos}."
if not self.make_move(nodeID, pos):
return f"Invalid move! Pick 1-{max_pos}:"
winner = self.check_winner(nodeID)
if winner:
# Add positive/sorry messages and stats
positiveThoughts = [
"🚀I need to call NATO",
"🏅Going for the gold!",
"Mastering ❌TTT⭕",
]
sorryNotGoinWell = [
"😭Not your day, huh?",
"📉Results here dont define you.",
"🤖WOPR would be proud."
]
games = won = 0
ret = ""
if nodeID in self.game:
self.game[nodeID]["won"] += 1
games = self.game[nodeID]["games"]
won = self.game[nodeID]["won"]
if games > 3:
if won / games >= 3.14159265358979323846: # win rate > pi
ret += random.choice(positiveThoughts) + "\n"
else:
ret += random.choice(sorryNotGoinWell) + "\n"
# Retain stats
ret += f"Games:{games} 🥇❌:{won}\n"
msg = f"You ({g['player']}) win!\n" + ret
msg += "Type 'new' to play again or 'end' to quit."
self.update_display(nodeID, status="win")
return msg
if " " not in g["board"]:
msg = "Tie game!"
msg += "\nType 'new' to play again or 'end' to quit."
self.update_display(nodeID, status="tie")
return msg
# Bot's turn
g["player"] = self.O
bot_pos = self.bot_move(nodeID)
winner = self.check_winner(nodeID)
if winner:
self.update_display(nodeID, status="loss")
msg = f"Bot ({g['player']}) wins!\n"
msg += "Type 'new' to play again or 'end' to quit."
return msg
if " " not in g["board"]:
msg = "Tie game!"
msg += "\nType 'new' to play again or 'end' to quit."
self.update_display(nodeID, status="tie")
return msg
g["player"] = self.X
prompt = f"Pick 1-{max_pos}:"
if mode == "2D":
prompt = self.show_board(nodeID) + prompt
self.update_display(nodeID)
return prompt
except Exception as e:
return f"An unexpected error occurred: {e}"
def check_winner(self, nodeID):
g = self.game[nodeID]
board = g["board"]
lines = self.get_win_lines(g["mode"])
for line in lines:
vals = [board[i] for i in line]
if vals[0] != " " and all(v == vals[0] for v in vals):
return vals[0]
return None
def check_winner(self, id):
"""Check if there's a winner"""
g = self.game[id]
return self.check_winner_on_board(g["board"])
def get_win_lines(self, mode):
if mode == "2D":
return [
[0,1,2],[3,4,5],[6,7,8], # rows
[0,3,6],[1,4,7],[2,5,8], # columns
[0,4,8],[2,4,6] # diagonals
]
return self.win_lines_3d
def is_board_full(self, id):
"""Check if board is full"""
g = self.game[id]
return " " not in g["board"]
def generate_3d_win_lines(self):
lines = []
# Rows in each layer
for z in range(3):
for y in range(3):
lines.append([z*9 + y*3 + x for x in range(3)])
# Columns in each layer
for z in range(3):
for x in range(3):
lines.append([z*9 + y*3 + x for y in range(3)])
# Pillars (vertical columns through layers)
for y in range(3):
for x in range(3):
lines.append([z*9 + y*3 + x for z in range(3)])
# Diagonals in each layer
for z in range(3):
lines.append([z*9 + i*3 + i for i in range(3)]) # TL to BR
lines.append([z*9 + i*3 + (2-i) for i in range(3)]) # TR to BL
# Vertical diagonals in columns
for x in range(3):
lines.append([z*9 + z*3 + x for z in range(3)]) # (0,0,x)-(1,1,x)-(2,2,x)
lines.append([z*9 + (2-z)*3 + x for z in range(3)]) # (0,2,x)-(1,1,x)-(2,0,x)
for y in range(3):
lines.append([z*9 + y*3 + z for z in range(3)]) # (z,y,z)
lines.append([z*9 + y*3 + (2-z) for z in range(3)]) # (z,y,2-z)
# Main space diagonals
lines.append([0, 13, 26])
lines.append([2, 13, 24])
lines.append([6, 13, 20])
lines.append([8, 13, 18])
return lines
def game_over_msg(self, id):
"""Generate game over message"""
g = self.game[id]
winner = self.check_winner(id)
if winner == X:
g["won"] += 1
return "🎉You won! (n)ew (e)nd"
elif winner == O:
return "🤖Bot wins! (n)ew (e)nd"
else:
return "🤝Tie, The only winning move! (n)ew (e)nd"
def play(self, id, input_msg):
"""Main game play function"""
if id not in self.game:
return self.new_game(id)
# If input is just "tictactoe", show current board
if input_msg.lower().strip() == ("tictactoe" or "tic-tac-toe"):
return self.show_board(id) + "Your turn! Pick 1-9:"
g = self.game[id]
# Parse player move
try:
# Extract just the number from the input
numbers = [char for char in input_msg if char.isdigit()]
if not numbers:
if input_msg.lower().startswith('q'):
self.end_game(id)
return "Game ended. To start a new game, type 'tictactoe'."
elif input_msg.lower().startswith('n'):
return self.new_game(id)
elif input_msg.lower().startswith('b'):
return self.show_board(id) + "Your turn! Pick 1-9:"
position = int(numbers[0])
except (ValueError, IndexError):
return "Enter 1-9, or (e)nd (n)ew game, send (b)oard to see board🧩"
# Make player move
if not self.make_move(id, position):
return "Invalid move! Pick 1-9:"
# Check if player won
if self.check_winner(id):
result = self.game_over_msg(id) + "\n" + self.show_board(id)
self.end_game(id)
return result
# Check for tie
if self.is_board_full(id):
result = self.game_over_msg(id) + "\n" + self.show_board(id)
self.end_game(id)
return result
# Bot's turn
bot_pos = self.bot_move(id)
# Check if bot won
if self.check_winner(id):
result = self.game_over_msg(id) + "\n" + self.show_board(id)
self.end_game(id)
return result
# Check for tie after bot move
if self.is_board_full(id):
result = self.game_over_msg(id) + "\n" + self.show_board(id)
self.end_game(id)
return result
# Continue game
return self.show_board(id) + "Your turn! Pick 1-9:"
def end_game(self, id):
"""Clean up finished game but keep stats"""
if id in self.game:
games = self.game[id]["games"]
won = self.game[id]["won"]
# Remove game but we'll create new one on next play
del self.game[id]
# Preserve stats for next game
self.game[id] = {
"board": [" "] * 9,
"player": X,
"games": games,
"won": won,
"turn": "human"
}
def end(self, id):
"""End game completely (called by 'end' command)"""
if id in self.game:
del self.game[id]
# Global instances for the bot system
tictactoeTracker = []
tictactoe = TicTacToe()
def end(self, nodeID):
"""End and remove the game for the given nodeID."""
if nodeID in self.game:
del self.game[nodeID]

View File

@@ -0,0 +1,186 @@
# Tic-Tac-Toe Video Display Module for Meshtastic mesh-bot
# Uses Pygame to render the game board visually
# 2025 K7MHI Kelly Keeton
import pygame
latest_board = [" "] * 9 # or 27 for 3D
latest_meta = {} # To store metadata like status
def handle_tictactoe_payload(payload, from_id=None):
global latest_board, latest_meta
#print("Received payload:", payload)
board, meta = parse_tictactoe_message(payload)
#print("Parsed board:", board)
if board:
latest_board = board
latest_meta = meta if meta else {}
def parse_tictactoe_message(msg):
# msg is already stripped of 'MTTT:' prefix
parts = msg.split("|")
if not parts or len(parts[0]) < 9:
return None, None # Not enough data for a board
board_str = parts[0]
meta = {}
if len(parts) > 1:
meta["nodeID"] = parts[1] if len(parts) > 1 else ""
meta["channel"] = parts[2] if len(parts) > 2 else ""
meta["deviceID"] = parts[3] if len(parts) > 3 else ""
# Look for status in any remaining parts
for part in parts[4:]:
if part.startswith("status="):
meta["status"] = part.split("=", 1)[1]
symbol_map = {"0": " ", "1": "", "2": "⭕️"}
board = [symbol_map.get(c, " ") for c in board_str]
return board, meta
def draw_board(screen, board, meta=None):
screen.fill((30, 30, 30))
width, height = screen.get_size()
margin = int(min(width, height) * 0.05)
font_size = int(height * 0.12)
font = pygame.font.Font(None, font_size)
# Draw the title at the top center, scaled
title_font = pygame.font.Font(None, int(height * 0.08))
title_text = title_font.render("MeshBot Tic-Tac-Toe", True, (220, 220, 255))
title_rect = title_text.get_rect(center=(width // 2, margin // 2 + 10))
screen.blit(title_text, title_rect)
# Add a buffer below the title
title_buffer = int(height * 0.06)
# --- Show win/draw message if present ---
if meta and "status" in meta:
status = meta["status"]
msg_font = pygame.font.Font(None, int(height * 0.06)) # Smaller font
msg_y = title_rect.bottom + int(height * 0.04) # Just under the title
if status == "win":
msg = "Game Won!"
text = msg_font.render(msg, True, (100, 255, 100))
text_rect = text.get_rect(center=(width // 2, msg_y))
screen.blit(text, text_rect)
elif status == "tie":
msg = "Tie Game!"
text = msg_font.render(msg, True, (255, 220, 120))
text_rect = text.get_rect(center=(width // 2, msg_y))
screen.blit(text, text_rect)
elif status == "loss":
msg = "You Lost!"
text = msg_font.render(msg, True, (255, 100, 100))
text_rect = text.get_rect(center=(width // 2, msg_y))
screen.blit(text, text_rect)
elif status == "new":
msg = "Welcome! New Game"
text = msg_font.render(msg, True, (200, 255, 200))
text_rect = text.get_rect(center=(width // 2, msg_y))
screen.blit(text, text_rect)
# Do NOT return here—let the board draw as normal
elif status != "refresh":
msg = status.capitalize()
text = msg_font.render(msg, True, (255, 220, 120))
text_rect = text.get_rect(center=(width // 2, msg_y))
screen.blit(text, text_rect)
# Don't return here—let the board draw as normal
# Show waiting message if board is empty, unless status is "new"
if all(cell.strip() == "" or cell.strip() == " " for cell in board):
if not (meta and meta.get("status") == "new"):
msg_font = pygame.font.Font(None, int(height * 0.09))
msg = "Waiting for player..."
text = msg_font.render(msg, True, (200, 200, 200))
text_rect = text.get_rect(center=(width // 2, height // 2))
screen.blit(text, text_rect)
pygame.display.flip()
return
def draw_x(rect):
thickness = max(4, rect.width // 12)
pygame.draw.line(screen, (255, 80, 80), rect.topleft, rect.bottomright, thickness)
pygame.draw.line(screen, (255, 80, 80), rect.topright, rect.bottomleft, thickness)
def draw_o(rect):
center = rect.center
radius = rect.width // 2 - max(6, rect.width // 16)
thickness = max(4, rect.width // 12)
pygame.draw.circle(screen, (80, 180, 255), center, radius, thickness)
if len(board) == 9:
# 2D: Center a single 3x3 grid, scale to fit
size = min((width - 2*margin)//3, (height - 2*margin - title_buffer)//3)
offset_x = (width - size*3) // 2
offset_y = (height - size*3) // 2 + title_buffer // 2
offset_y = max(offset_y, title_rect.bottom + title_buffer)
# Index number font and buffer
small_index_font = pygame.font.Font(None, int(size * 0.38))
index_buffer_x = int(size * 0.16)
index_buffer_y = int(size * 0.10)
for i in range(3):
for j in range(3):
rect = pygame.Rect(offset_x + j*size, offset_y + i*size, size, size)
pygame.draw.rect(screen, (200, 200, 200), rect, 2)
idx = i*3 + j
# Draw index number in top-left, start at 1
idx_text = small_index_font.render(str(idx + 1), True, (120, 120, 160))
idx_rect = idx_text.get_rect(topleft=(rect.x + index_buffer_x, rect.y + index_buffer_y))
screen.blit(idx_text, idx_rect)
val = board[idx].strip()
if val == "" or val == "X":
draw_x(rect)
elif val == "⭕️" or val == "O":
draw_o(rect)
elif val:
text = font.render(val, True, (255, 255, 255))
text_rect = text.get_rect(center=rect.center)
screen.blit(text, text_rect)
elif len(board) == 27:
# 3D: Stack three 3x3 grids vertically, with horizontal offsets for 3D effect, scale to fit
size = min((width - 2*margin)//7, (height - 4*margin - title_buffer)//9)
base_offset_x = (width - (size * 3)) // 2
offset_y = (height - (size*9 + margin*2)) // 2 + title_buffer // 2
offset_y = max(offset_y, title_rect.bottom + title_buffer)
small_font = pygame.font.Font(None, int(height * 0.045))
small_index_font = pygame.font.Font(None, int(size * 0.38))
index_buffer_x = int(size * 0.16)
index_buffer_y = int(size * 0.10)
for display_idx, layer in enumerate(reversed(range(3))):
layer_offset_x = base_offset_x + (layer - 1) * 2 * size
layer_y = offset_y + display_idx * (size*3 + margin)
label_text = f"Layer {layer+1}"
label = small_font.render(label_text, True, (180, 180, 220))
label_rect = label.get_rect(center=(layer_offset_x + size*3//2, layer_y + size*3 + int(size*0.2)))
screen.blit(label, label_rect)
for i in range(3):
for j in range(3):
rect = pygame.Rect(layer_offset_x + j*size, layer_y + i*size, size, size)
pygame.draw.rect(screen, (200, 200, 200), rect, 2)
idx = layer*9 + i*3 + j
idx_text = small_index_font.render(str(idx + 1), True, (120, 120, 160))
idx_rect = idx_text.get_rect(topleft=(rect.x + index_buffer_x, rect.y + index_buffer_y))
screen.blit(idx_text, idx_rect)
val = board[idx].strip()
if val == "" or val == "X":
draw_x(rect)
elif val == "⭕️" or val == "O":
draw_o(rect)
elif val:
text = font.render(val, True, (255, 255, 255))
text_rect = text.get_rect(center=rect.center)
screen.blit(text, text_rect)
pygame.display.flip()
def ttt_main():
global latest_board, latest_meta
pygame.init()
screen = pygame.display.set_mode((0, 0), pygame.FULLSCREEN)
pygame.display.set_caption("Tic-Tac-Toe 3D Display")
running = True
while running:
for event in pygame.event.get():
if event.type == pygame.QUIT or (event.type == pygame.KEYDOWN and event.key == pygame.K_ESCAPE):
running = False
draw_board(screen, latest_board, latest_meta) # <-- Pass meta/status
pygame.display.flip()
pygame.time.wait(75) # or 50-100 for lower CPU
pygame.quit()

View File

@@ -214,7 +214,8 @@ if hamtest_enabled:
games_enabled = True
if tictactoe_enabled:
from modules.games.tictactoe import * # from the spudgunman/meshing-around repo
from modules.games.tictactoe import TicTacToe # from the spudgunman/meshing-around repo
tictactoe = TicTacToe(display_module=None)
trap_list = trap_list + ("tictactoe","tic-tac-toe",)
if quiz_enabled:

152
script/game_serve.py Normal file
View File

@@ -0,0 +1,152 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# UDP Interface game server for Meshtastic Meshing-Around Mesh Bot
# depends on: pip install meshtastic protobuf zeroconf pubsub
# 2025 Kelly Keeton K7MHI
from pubsub import pub
from meshtastic.protobuf import mesh_pb2, portnums_pb2
from mudp import UDPPacketStream, node, conn, send_text_message, send_nodeinfo, send_device_telemetry, send_position, send_environment_metrics, send_power_metrics, send_waypoint, send_data
from mudp.encryption import generate_hash
from collections import OrderedDict
import time
import sys
import os
# import logging
# import sys
# logger = logging.getLogger("MeshBot Game Server")
# logger.setLevel(logging.DEBUG)
# logger.propagate = False
# # Remove any existing handlers
# if logger.hasHandlers():
# logger.handlers.clear()
# handler = logging.StreamHandler(sys.stdout)
# logger.addHandler(handler)
# logger.debug("Mesh Bot Game Server Logger initialized")
try:
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from modules.games.tictactoe_vid import handle_tictactoe_payload, ttt_main
except Exception as e:
print(f"Error importing modules: {e}\nRun this program from the main project directory, e.g. 'python3 script/game_serve.py'")
exit(1)
MCAST_GRP, MCAST_PORT, CHANNEL_ID, KEY = "224.0.0.69", 4403, "LongFast", "1PG7OiApB1nwvP+rz05pAQ=="
PUBLIC_CHANNEL_IDS = ["LongFast", "ShortSlow", "Medium", "LongSlow", "ShortFast", "ShortTurbo"]
NODE_ID, LONG_NAME, SHORT_NAME = "!meshbotg", "Mesh Bot Game Server", "MBGS"
CHANNEL_HASHES = {generate_hash(name, KEY): name for name in PUBLIC_CHANNEL_IDS}
SEEN_MESSAGES_MAX = 1000 # Adjust as needed
mudpEnabled, mudpInterface = True, None
seen_messages = OrderedDict() # Track seen (from, to, payload) tuples
is_running = False
def initalize_mudp():
global mudpInterface
if mudpEnabled and mudpInterface is None:
mudpInterface = UDPPacketStream(MCAST_GRP, MCAST_PORT, key=KEY)
node.node_id, node.long_name, node.short_name = NODE_ID, LONG_NAME, SHORT_NAME
node.channel, node.key = CHANNEL_ID, KEY
conn.setup_multicast(MCAST_GRP, MCAST_PORT)
print(f"mUDP Interface initialized on {MCAST_GRP}:{MCAST_PORT} with Channel ID '{CHANNEL_ID}'")
print(f"Node ID: {NODE_ID}, Long Name: {LONG_NAME}, Short Name: {SHORT_NAME}")
print("Public Channel IDs:", PUBLIC_CHANNEL_IDS)
def get_channel_name(channel_hash):
return CHANNEL_HASHES.get(channel_hash, '')
def add_seen_message(msg_tuple):
if msg_tuple not in seen_messages:
if len(seen_messages) >= SEEN_MESSAGES_MAX:
seen_messages.popitem(last=False) # Remove oldest
seen_messages[msg_tuple] = None
def on_private_app(packet: mesh_pb2.MeshPacket, addr=None):
global seen_messages
packet_payload = ""
packet_from_id = None
if packet.HasField("decoded"):
try:
packet_payload = packet.decoded.payload.decode("utf-8", "ignore")
packet_from_id = getattr(packet, 'from', None)
port_name = portnums_pb2.PortNum.Name(packet.decoded.portnum) if packet.decoded.portnum else "N/A"
rx_channel = get_channel_name(packet.channel)
# check the synch word which should be xxxx:
# if synch = 'echo:' remove the b'word' from the string and pass to the handler
if packet_payload.startswith("MTTT:"):
packet_payload = packet_payload[5:] # remove 'echo:'
msg_tuple = (getattr(packet, 'from', None), packet.to, packet_payload)
# Only log the first occurrence of this message tuple
if msg_tuple not in seen_messages:
add_seen_message(msg_tuple)
handle_tictactoe_payload(packet_payload, from_id=packet_from_id)
print(f"[Channel: {rx_channel}] [Port: {port_name}] Tic-Tac-Toe Message payload:", packet_payload)
else:
msg_tuple = (getattr(packet, 'from', None), packet.to, packet_payload)
if msg_tuple not in seen_messages:
add_seen_message(msg_tuple)
print(f"[Channel: {rx_channel}] [Port: {port_name}] Private App payload:", packet_payload)
except Exception:
print(" Private App extraction error payload (raw bytes):", packet.decoded.payload)
def on_text_message(packet: mesh_pb2.MeshPacket, addr=None):
global seen_messages
try:
packet_payload = ""
if packet.HasField("decoded"):
rx_channel = get_channel_name(packet.channel)
port_name = portnums_pb2.PortNum.Name(packet.decoded.portnum) if packet.decoded.portnum else "N/A"
try:
packet_payload = packet.decoded.payload.decode("utf-8", "ignore")
msg_tuple = (getattr(packet, 'from', None), packet.to, packet_payload)
if msg_tuple not in seen_messages:
add_seen_message(msg_tuple)
#print(f"[Channel: {rx_channel}] [Port: {port_name}] TEXT Message payload:", packet_payload)
except Exception:
print(" extraction error payload (raw bytes):", packet.decoded.payload)
except Exception as e:
print("Error processing received packet:", e)
# def on_recieve(packet: mesh_pb2.MeshPacket, addr=None):
# print(f"\n[RECV] Packet received from {addr}")
# print(packet)
#pub.subscribe(on_recieve, "mesh.rx.packet")
pub.subscribe(on_text_message, "mesh.rx.port.1") # TEXT_MESSAGE
pub.subscribe(on_private_app, "mesh.rx.port.256") # PRIVATE_APP DEFAULT_PORTNUM
def main():
global mudpInterface, is_running
print(r"""
___
/ \
| HOT | Mesh Bot Game Server v0.9
| TOT | (aka tot-bot)
\___/
""")
print("Press escape (ESC) key to exit")
initalize_mudp() # initialize MUDP interface
mudpInterface.start()
is_running = True
try:
while is_running:
ttt_main()
is_running = False
time.sleep(0.1)
except KeyboardInterrupt:
print("\n[INFO] KeyboardInterrupt received. Shutting down Mesh Bot Game Server...")
is_running = False
except Exception as e:
print(f"[ERROR] Exception during main loop: {e}")
finally:
print("[INFO] Stopping mUDP interface...")
if mudpInterface:
mudpInterface.stop()
print("[INFO] mUDP interface stopped.")
print("[INFO] Mesh Bot Game Server shutdown complete.")
if __name__ == "__main__":
main()