Merge pull request #198 from martinbogo/feature/tictactoe-game

Feature/tictactoe game
This commit is contained in:
Kelly
2025-10-06 14:11:23 -07:00
committed by GitHub
8 changed files with 436 additions and 47 deletions
+1
View File
@@ -332,6 +332,7 @@ mastermind = True
golfsim = True
hangman = True
hamtest = True
tictactoe = True
[messagingSettings]
# delay in seconds for response to avoid message collision /throttling
+83 -23
View File
@@ -15,12 +15,13 @@ from modules.log import *
from modules.system import *
# list of commands to remove from the default list for DM only
restrictedCommands = ["blackjack", "videopoker", "dopewars", "lemonstand", "golfsim", "mastermind", "hangman", "hamtest"]
restrictedCommands = ["blackjack", "videopoker", "dopewars", "lemonstand", "golfsim", "mastermind", "hangman", "hamtest", "tictactoe"]
restrictedResponse = "🤖only available in a Direct Message📵" # "" for none
cmdHistory = [] # list to hold the command history for lheard and history commands
msg_history = [] # list to hold the message history for the messages command
def auto_response(message, snr, rssi, hop, pkiStatus, message_from_id, channel_number, deviceID, isDM):
global cmdHistory
global cmdHistory, msg_history
#Auto response to messages
message_lower = message.lower()
bot_response = "🤖I'm sorry, I'm afraid I can't do that."
@@ -86,6 +87,7 @@ def auto_response(message, snr, rssi, hop, pkiStatus, message_from_id, channel_n
"sysinfo": lambda: sysinfo(message, message_from_id, deviceID),
"test": lambda: handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, channel_number),
"testing": lambda: handle_ping(message_from_id, deviceID, message, hop, snr, rssi, isDM, channel_number),
"tictactoe": lambda: handleTicTacToe(message, message_from_id, deviceID),
"tide": lambda: handle_tide(message_from_id, deviceID, channel_number),
"valert": lambda: get_volcano_usgs(),
"videopoker": lambda: handleVideoPoker(message, message_from_id, deviceID),
@@ -807,6 +809,36 @@ def handleHamtest(message, nodeID, deviceID):
time.sleep(responseDelay + 1)
return msg
def handleTicTacToe(message, nodeID, deviceID):
global tictactoeTracker
index = 0
msg = ''
# Find or create player tracker entry
for i in range(len(tictactoeTracker)):
if tictactoeTracker[i]['nodeID'] == nodeID:
tictactoeTracker[i]["last_played"] = time.time()
index = i+1
break
if message.lower().startswith('e'):
if index:
tictactoe.end(nodeID)
tictactoeTracker.pop(index-1)
return "Thanks for playing! 🎯"
if not index:
tictactoeTracker.append({
"nodeID": nodeID,
"last_played": time.time()
})
msg = "🎯Tic-Tac-Toe🤖 '(e)nd' to Quit\n"
msg += tictactoe.play(nodeID, message)
time.sleep(responseDelay + 1)
return msg
def handle_riverFlow(message, message_from_id, deviceID):
location = get_node_location(message_from_id, deviceID)
@@ -1047,7 +1079,6 @@ def handle_moon(message_from_id, deviceID, channel_number):
location = get_node_location(message_from_id, deviceID, channel_number)
return get_moon(str(location[0]), str(location[1]))
def handle_whoami(message_from_id, deviceID, hop, snr, rssi, pkiStatus):
try:
loc = []
@@ -1144,6 +1175,7 @@ def checkPlayingGame(message_from_id, message_string, rxNode, channel_number):
(golfTracker, "GolfSim", handleGolf) if 'golfTracker' in globals() else None,
(hangmanTracker, "Hangman", handleHangman) if 'hangmanTracker' in globals() else None,
(hamtestTracker, "HamTest", handleHamtest) if 'hamtestTracker' in globals() else None,
(tictactoeTracker, "TicTacToe", handleTicTacToe) if 'tictactoeTracker' in globals() else None,
]
trackers = [tracker for tracker in trackers if tracker is not None]
@@ -1155,7 +1187,7 @@ def checkPlayingGame(message_from_id, message_string, rxNode, channel_number):
return playingGame
def onReceive(packet, interface):
global seenNodes
global seenNodes, msg_history, cmdHistory
# Priocess the incoming packet, handles the responses to the packet with auto_response()
# Sends the packet to the correct handler for processing
@@ -1400,12 +1432,14 @@ def onReceive(packet, interface):
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
else:
timestamp = datetime.now().strftime("%Y-%m-%d %I:%M:%S%p")
if len(msg_history) < storeFlimit:
msg_history.append((get_name_from_number(message_from_id, 'long', rxNode), message_string, channel_number, timestamp, rxNode))
else:
msg_history.pop(0)
msg_history.append((get_name_from_number(message_from_id, 'long', rxNode), message_string, channel_number, timestamp, rxNode))
# trim the history list if it exceeds max_history
if len(msg_history) >= MAX_MSG_HISTORY:
# Remove oldest entries by cutting in half
msg_history = msg_history[len(msg_history)//2:]
# add the message to the history list
msg_history.append((get_name_from_number(message_from_id, 'long', rxNode), message_string, channel_number, timestamp, rxNode))
# print the message to the log and sdout
logger.info(f"Device:{rxNode} Channel:{channel_number} " + CustomFormatter.green + "Ignoring Message:" + CustomFormatter.white +\
@@ -1500,7 +1534,7 @@ async def start_rx():
if highfly_enabled:
logger.debug(f"System: HighFly Enabled using {highfly_altitude}m limit reporting to channel:{highfly_channel}")
if store_forward_enabled:
logger.debug(f"System: Store and Forward Enabled using limit: {storeFlimit}")
logger.debug(f"System: S&F(messages command) Enabled using limit: {storeFlimit}")
if useDMForResponse:
logger.debug(f"System: Respond by DM only")
if enableEcho:
@@ -1633,18 +1667,44 @@ async def start_rx():
# Hello World
async def main():
meshRxTask = asyncio.create_task(start_rx())
watchdogTask = asyncio.create_task(watchdog())
if file_monitor_enabled:
fileMonTask: asyncio.Task = asyncio.create_task(handleFileWatcher())
if radio_detection_enabled:
hamlibTask = asyncio.create_task(handleSignalWatcher())
await asyncio.gather(meshRxTask, watchdogTask)
if radio_detection_enabled:
await asyncio.gather(hamlibTask)
if file_monitor_enabled:
await asyncio.gather(fileMonTask)
tasks = []
try:
# Create core tasks
tasks.append(asyncio.create_task(start_rx(), name="mesh_rx"))
tasks.append(asyncio.create_task(watchdog(), name="watchdog"))
# Add optional tasks
if file_monitor_enabled:
tasks.append(asyncio.create_task(handleFileWatcher(), name="file_monitor"))
if radio_detection_enabled:
tasks.append(asyncio.create_task(handleSignalWatcher(), name="hamlib"))
logger.info(f"System: Starting {len(tasks)} async tasks")
# Wait for all tasks with proper exception handling
results = await asyncio.gather(*tasks, return_exceptions=True)
# Check for exceptions in results
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.error(f"Task {tasks[i].get_name()} failed with: {result}")
except Exception as e:
logger.error(f"Main loop error: {e}")
finally:
# Cleanup tasks
logger.info("System: Cleaning up async tasks")
for task in tasks:
if not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
logger.debug(f"Task {task.get_name()} cancelled successfully")
except Exception as e:
logger.warning(f"Error cancelling task {task.get_name()}: {e}")
await asyncio.sleep(0.01)
+16 -7
View File
@@ -26,18 +26,27 @@ def load_bbsdb():
# if the message is not a duplicate, add it to bbs_messages Maintain the message ID sequence
new_id = len(bbs_messages) + 1
bbs_messages.append([new_id, msg[1], msg[2], msg[3]])
except Exception as e:
except FileNotFoundError:
logger.debug("System: bbsdb.pkl not found, creating new one")
bbs_messages = [[1, "Welcome to meshBBS", "Welcome to the BBS, please post a message!",0]]
try:
with open('data/bbsdb.pkl', 'wb') as f:
pickle.dump(bbs_messages, f)
except Exception as e:
logger.error(f"System: Error creating bbsdb.pkl: {e}")
except Exception as e:
logger.error(f"System: Error loading bbsdb.pkl: {e}")
bbs_messages = [[1, "Welcome to meshBBS", "Welcome to the BBS, please post a message!",0]]
logger.debug("System: Creating new data/bbsdb.pkl")
with open('data/bbsdb.pkl', 'wb') as f:
pickle.dump(bbs_messages, f)
def save_bbsdb():
global bbs_messages
# save the bbs messages to the database file
logger.debug("System: Saving data/bbsdb.pkl")
with open('data/bbsdb.pkl', 'wb') as f:
pickle.dump(bbs_messages, f)
try:
logger.debug("System: Saving data/bbsdb.pkl")
with open('data/bbsdb.pkl', 'wb') as f:
pickle.dump(bbs_messages, f)
except Exception as e:
logger.error(f"System: Error saving bbsdb: {e}")
def bbs_help():
# help message
+215
View File
@@ -0,0 +1,215 @@
# Tic-Tac-Toe game for Meshtastic mesh-bot
# Board positions chosen by numbers 1-9
# 2025
import random
# to molly and jake, I miss you both so much.
class TicTacToe:
def __init__(self):
self.game = {}
def new_game(self, id):
"""Start a new game"""
games = won = 0
ret = ""
if id in self.game:
games = self.game[id]["games"]
won = self.game[id]["won"]
ret += f"Games:{games} Won:{won}\n"
self.game[id] = {
"board": [" "] * 9, # 3x3 board as flat list
"player": "X", # Human is X, bot is O
"games": games + 1,
"won": won,
"turn": "human" # whose turn it is
}
ret += self.show_board(id)
ret += "Pick 1-9:"
return ret
def show_board(self, id):
"""Display compact board with move numbers"""
g = self.game[id]
b = g["board"]
# Show board with positions
board_str = ""
for i in range(3):
row = ""
for j in range(3):
pos = i * 3 + j
cell = b[pos] if b[pos] != " " else str(pos + 1)
row += cell
if j < 2:
row += "|"
board_str += row
if i < 2:
board_str += "\n-+-+-\n"
return board_str + "\n"
def make_move(self, id, position):
"""Make a move for the current player"""
g = self.game[id]
# Validate position
if position < 1 or position > 9:
return False
pos = position - 1
if g["board"][pos] != " ":
return False
# Make human move
g["board"][pos] = "X"
return True
def bot_move(self, id):
"""AI makes a move"""
g = self.game[id]
# Simple AI: Try to win, block, or pick random
move = self.find_winning_move(id, "O") # Try to win
if move == -1:
move = self.find_winning_move(id, "X") # Block player
if move == -1:
move = self.find_random_move(id) # Random move
if move != -1:
g["board"][move] = "O"
return move
def find_winning_move(self, id, player):
"""Find a winning move for the given player"""
g = self.game[id]
board = g["board"][:]
# Check all empty positions
for i in range(9):
if board[i] == " ":
board[i] = player
if self.check_winner_on_board(board) == player:
return i
board[i] = " "
return -1
def find_random_move(self, id):
"""Find a random empty position"""
g = self.game[id]
empty = [i for i in range(9) if g["board"][i] == " "]
return random.choice(empty) if empty else -1
def check_winner_on_board(self, board):
"""Check winner on given board state"""
# Winning combinations
wins = [
[0,1,2], [3,4,5], [6,7,8], # Rows
[0,3,6], [1,4,7], [2,5,8], # Columns
[0,4,8], [2,4,6] # Diagonals
]
for combo in wins:
if board[combo[0]] == board[combo[1]] == board[combo[2]] != " ":
return board[combo[0]]
return None
def check_winner(self, id):
"""Check if there's a winner"""
g = self.game[id]
return self.check_winner_on_board(g["board"])
def is_board_full(self, id):
"""Check if board is full"""
g = self.game[id]
return " " not in g["board"]
def game_over_msg(self, id):
"""Generate game over message"""
g = self.game[id]
winner = self.check_winner(id)
if winner == "X":
g["won"] += 1
return "🎉You won! (n)ew (e)nd"
elif winner == "O":
return "🤖Bot wins! (n)ew (e)nd"
else:
return "🤝Tie game! (n)ew (e)nd"
def play(self, id, input_msg):
"""Main game play function"""
if id not in self.game:
return self.new_game(id)
# If input is just "tictactoe", show current board
if input_msg.lower().strip() == "tictactoe":
return self.show_board(id) + "Your turn! Pick 1-9:"
g = self.game[id]
# Parse player move
try:
# Extract just the number from the input
numbers = [char for char in input_msg if char.isdigit()]
if not numbers:
if input_msg.lower().startswith('q'):
self.end_game(id)
return "Game ended. To start a new game, type 'tictactoe'."
elif input_msg.lower().startswith('n'):
return self.new_game(id)
elif input_msg.lower().startswith('b'):
return self.show_board(id) + "Your turn! Pick 1-9:"
position = int(numbers[0])
except (ValueError, IndexError):
return "Enter 1-9, or (e)nd (n)ew game, send (b)oard to see board🧩"
# Make player move
if not self.make_move(id, position):
return "Invalid move! Pick 1-9:"
# Check if player won
if self.check_winner(id):
result = self.game_over_msg(id) + "\n" + self.show_board(id)
self.end_game(id)
return result
# Check for tie
if self.is_board_full(id):
result = self.game_over_msg(id) + "\n" + self.show_board(id)
self.end_game(id)
return result
# Bot's turn
bot_pos = self.bot_move(id)
# Check if bot won
if self.check_winner(id):
result = self.game_over_msg(id) + "\n" + self.show_board(id)
self.end_game(id)
return result
# Check for tie after bot move
if self.is_board_full(id):
result = self.game_over_msg(id) + "\n" + self.show_board(id)
self.end_game(id)
return result
# Continue game
return self.show_board(id) + "Your turn! Pick 1-9:"
def end_game(self, id):
"""Clean up finished game but keep stats"""
if id in self.game:
# Remove game but we'll create new one on next play
del self.game[id]
def end(self, id):
"""End game completely (called by 'end' command)"""
if id in self.game:
del self.game[id]
# Global instances for the bot system
tictactoeTracker = []
tictactoe = TicTacToe()
+1
View File
@@ -371,6 +371,7 @@ try:
golfSim_enabled = config['games'].getboolean('golfSim', True)
hangman_enabled = config['games'].getboolean('hangman', True)
hamtest_enabled = config['games'].getboolean('hamtest', True)
tictactoe_enabled = config['games'].getboolean('tictactoe', True)
# messaging settings
responseDelay = config['messagingSettings'].getfloat('responseDelay', 0.7) # default 0.7
+6
View File
@@ -152,6 +152,12 @@ def store_sms(nodeID, sms):
global sms_db
try:
logger.debug("System: Setting SMS for " + str(nodeID))
# if the nodeID has over 5 sms addresses warn and return
for item in sms_db:
if item['nodeID'] == nodeID:
if len(item['sms']) >= 5:
logger.warning("System: 📵SMS limit reached for " + str(nodeID))
return False
# if not in db, add it
if nodeID not in sms_db:
sms_db.append({'nodeID': nodeID, 'sms': sms})
+78
View File
@@ -7,8 +7,10 @@ import meshtastic.ble_interface
import time
import asyncio
import random
# not ideal but needed?
import contextlib # for suppressing output on watchdog
import io # for suppressing output on watchdog
# homebrew 'modules'
from modules.log import *
# Global Variables
@@ -19,6 +21,71 @@ games_enabled = False
multiPingList = [{'message_from_id': 0, 'count': 0, 'type': '', 'deviceID': 0, 'channel_number': 0, 'startCount': 0}]
interface_retry_count = 3
# Memory Management Constants
MAX_MSG_HISTORY = 100
MAX_CMD_HISTORY = 200
MAX_SEEN_NODES = 200
CLEANUP_INTERVAL = 86400 # 24 hours in seconds
GAMEDELAY = CLEANUP_INTERVAL # the age of game entries in seconds before they are cleaned up
def cleanup_memory():
"""Clean up memory by limiting list sizes and removing stale entries"""
global cmdHistory, seenNodes, multiPingList
current_time = time.time()
try:
# Limit cmdHistory size
if 'cmdHistory' in globals() and len(cmdHistory) > MAX_CMD_HISTORY:
cmdHistory = cmdHistory[-(MAX_CMD_HISTORY - 50):] # keep the most recent 50 entries
logger.debug(f"System: Trimmed cmdHistory to {len(cmdHistory)} entries")
# Clean up old seenNodes entries (older than 24 hours)
if 'seenNodes' in globals():
initial_count = len(seenNodes)
seenNodes = [node for node in seenNodes
if current_time - node.get('lastSeen', 0) < 86400]
if len(seenNodes) < initial_count:
logger.debug(f"System: Cleaned up {initial_count - len(seenNodes)} old seenNodes entries")
# Clean up stale game tracker entries
cleanup_game_trackers(current_time)
# Clean up multiPingList of completed or stale entries
if 'multiPingList' in globals():
multiPingList[:] = [ping for ping in multiPingList
if ping.get('message_from_id', 0) != 0 and
ping.get('count', 0) > 0]
except Exception as e:
logger.error(f"System: Error during memory cleanup: {e}")
def cleanup_game_trackers(current_time):
"""Clean up all game tracker lists of stale entries"""
try:
# List of game tracker global variable names
tracker_names = [
'dwPlayerTracker', 'lemonadeTracker', 'jackTracker',
'vpTracker', 'mindTracker', 'golfTracker',
'hangmanTracker', 'hamtestTracker', 'tictactoeTracker'
]
for tracker_name in tracker_names:
if tracker_name in globals():
tracker = globals()[tracker_name]
if isinstance(tracker, list):
initial_count = len(tracker)
# Remove entries older than GAMEDELAY
globals()[tracker_name] = [
entry for entry in tracker
if current_time - entry.get('last_played', entry.get('time', 0)) < GAMEDELAY
]
cleaned_count = initial_count - len(globals()[tracker_name])
if cleaned_count > 0:
logger.debug(f"System: Cleaned up {cleaned_count} stale entries from {tracker_name}")
except Exception as e:
logger.error(f"System: Error cleaning up game trackers: {e}")
# Ping Configuration
if ping_enabled:
# ping, pinging, ack, testing, test, pong
@@ -193,6 +260,11 @@ if hamtest_enabled:
trap_list = trap_list + ("hamtest",)
games_enabled = True
if tictactoe_enabled:
from modules.games.tictactoe import * # from the spudgunman/meshing-around repo
trap_list = trap_list + ("tictactoe",)
games_enabled = True
# Games Configuration
if games_enabled is True:
help_message = help_message + ", games"
@@ -217,6 +289,8 @@ if games_enabled is True:
gamesCmdList += "hangman, "
if hamtest_enabled:
gamesCmdList += "hamTest, "
if tictactoe_enabled:
gamesCmdList += "ticTacToe, "
gamesCmdList = gamesCmdList[:-2] # remove the last comma
else:
gamesCmdList = ""
@@ -1405,6 +1479,10 @@ async def watchdog():
load_bbsdm()
load_bbsdb()
# perform memory cleanup every 10 minutes
if datetime.now().minute % 10 == 0:
cleanup_memory()
def exit_handler():
# Close the interface and save the BBS messages
logger.debug(f"System: Closing Autoresponder")
+36 -17
View File
@@ -204,14 +204,6 @@ def handle_lheard(message, nodeid, deviceID, isDM):
bot_response = "Last Heard\n"
bot_response += str(get_node_list(1))
# show last users of the bot with the cmdHistory list
history = handle_history(message, nodeid, deviceID, isDM, lheard=True)
if history:
bot_response += f'LastSeen\n{history}'
else:
# trim the last \n
bot_response = bot_response[:-1]
# bot_response += getNodeTelemetry(deviceID)
return bot_response
@@ -453,7 +445,7 @@ async def start_rx():
if sentry_enabled:
logger.debug(f"System: Sentry Mode Enabled {sentry_radius}m radius reporting to channel:{secure_channel}")
if store_forward_enabled:
logger.debug(f"System: Store and Forward Enabled using limit: {storeFlimit}")
logger.debug(f"System: S&F(messages command) Enabled using limit: {storeFlimit}")
if useDMForResponse:
logger.debug(f"System: Respond by DM only")
if repeater_enabled and multiple_interface:
@@ -478,14 +470,41 @@ async def start_rx():
# Hello World
async def main():
meshRxTask = asyncio.create_task(start_rx())
watchdogTask = asyncio.create_task(watchdog())
if file_monitor_enabled:
fileMonTask: asyncio.Task = asyncio.create_task(handleFileWatcher())
await asyncio.gather(meshRxTask, watchdogTask)
if file_monitor_enabled:
await asyncio.gather(fileMonTask)
tasks = []
try:
# Create core tasks
tasks.append(asyncio.create_task(start_rx(), name="pong_rx"))
tasks.append(asyncio.create_task(watchdog(), name="watchdog"))
# Add optional tasks
if file_monitor_enabled:
tasks.append(asyncio.create_task(handleFileWatcher(), name="file_monitor"))
logger.info(f"System: Starting {len(tasks)} async tasks")
# Wait for all tasks with proper exception handling
results = await asyncio.gather(*tasks, return_exceptions=True)
# Check for exceptions in results
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.error(f"Task {tasks[i].get_name()} failed with: {result}")
except Exception as e:
logger.error(f"Main loop error: {e}")
finally:
# Cleanup tasks
logger.info("System: Cleaning up async tasks")
for task in tasks:
if not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
logger.debug(f"Task {task.get_name()} cancelled successfully")
except Exception as e:
logger.warning(f"Error cancelling task {task.get_name()}: {e}")
await asyncio.sleep(0.01)