Compare commits

..

45 Commits

Author SHA1 Message Date
SpudGunMan
e0e275a49c Revert "scheduler memory issue"
This reverts commit bf39c2f088.
2025-10-24 18:30:08 -07:00
SpudGunMan
bf39c2f088 scheduler memory issue 2025-10-24 18:19:35 -07:00
SpudGunMan
34d36057c1 ATOM FEEDS
oh yea its 2003
2025-10-24 18:05:08 -07:00
SpudGunMan
4e1d1de883 Update adding_more.md 2025-10-24 17:52:24 -07:00
SpudGunMan
97f103dfd7 Update test_bot.py 2025-10-24 17:46:15 -07:00
SpudGunMan
47089871b1 Update test_bot.py 2025-10-24 17:45:16 -07:00
SpudGunMan
cc7ef129f6 Update test_bot.py 2025-10-24 17:42:50 -07:00
SpudGunMan
0fa5d06a3a Update test_bot.py 2025-10-24 17:12:06 -07:00
SpudGunMan
7fc44ec06e Update README.md 2025-10-24 17:12:00 -07:00
SpudGunMan
184760096e game test unit
🧩
2025-10-24 17:01:43 -07:00
SpudGunMan
8868d10388 Update hangman.py 2025-10-24 16:53:43 -07:00
SpudGunMan
1ce2ecd75c Update README.md 2025-10-24 16:50:04 -07:00
SpudGunMan
69e1c21488 enhance hangman.json
example JSON: [\"apple\",\"banana\",\"cherry\"]
2025-10-24 16:46:57 -07:00
SpudGunMan
97a2ffce7b gamepackFix
clean up globals
2025-10-24 15:46:11 -07:00
SpudGunMan
4c0d3a597e Update test_bot.py 2025-10-24 14:58:32 -07:00
SpudGunMan
094f7e61a0 Update wiki.py
fixed
2025-10-24 13:22:36 -07:00
SpudGunMan
a54ecaa5a1 Update mesh_bot.py 2025-10-24 13:21:49 -07:00
SpudGunMan
bd12392d69 Update system.py
doh
2025-10-24 13:05:40 -07:00
SpudGunMan
882bcf3f4b wiki wiki 2025-10-24 13:02:13 -07:00
SpudGunMan
c0d0ca3743 Update compose.yaml 2025-10-24 12:59:29 -07:00
SpudGunMan
d74d848646 Update compose.yaml 2025-10-24 12:58:16 -07:00
SpudGunMan
2afb915b56 Update test_bot.py 2025-10-24 12:50:58 -07:00
SpudGunMan
d5e48bead1 Update compose.yaml 2025-10-24 12:46:27 -07:00
SpudGunMan
3c80848f61 refactor wikipedia
also removed that old package!!!
2025-10-24 12:45:24 -07:00
SpudGunMan
64345fe47a Update wiki.py 2025-10-24 12:30:18 -07:00
SpudGunMan
32f734d69b Update wiki.py 2025-10-24 12:27:30 -07:00
SpudGunMan
aa6de00c5b Update wiki.py 2025-10-24 12:26:24 -07:00
SpudGunMan
6df4ba5756 Update test_bot.py
risky stuff lower
2025-10-24 12:13:50 -07:00
SpudGunMan
a11a2780db Update bbstools.py 2025-10-24 11:55:39 -07:00
SpudGunMan
980414f872 Update test_bot.py 2025-10-24 11:23:04 -07:00
SpudGunMan
f26334d625 Update wiki.py 2025-10-24 11:22:52 -07:00
SpudGunMan
24546b28d6 Create test_bot.py 2025-10-24 10:33:19 -07:00
SpudGunMan
f33da848cd cleanup 2025-10-24 10:32:28 -07:00
SpudGunMan
57ce15de4e Update radio.py 2025-10-24 10:19:05 -07:00
SpudGunMan
b8886e0662 Update qrz.py 2025-10-24 10:16:41 -07:00
SpudGunMan
9a1e86f25e Update qrz.py 2025-10-24 10:13:33 -07:00
SpudGunMan
fa8021ab5a Update checklist.py 2025-10-24 10:06:11 -07:00
SpudGunMan
f3917f1c3d Update locationdata.py 2025-10-24 10:00:35 -07:00
SpudGunMan
c1443048fd Update llm.py 2025-10-24 09:39:57 -07:00
SpudGunMan
da430557f3 Update filemon.py 2025-10-24 09:36:07 -07:00
SpudGunMan
84152bda65 Update checklist.py 2025-10-24 09:35:53 -07:00
SpudGunMan
b6e80ae576 Update bbstools.py 2025-10-24 09:18:31 -07:00
SpudGunMan
18ac26864c better resolution for gametracker
thanks pdx
2025-10-24 08:24:05 -07:00
SpudGunMan
b661fbc750 Revert "fix init of trackers"
This reverts commit 3049d18663.
2025-10-24 08:20:36 -07:00
SpudGunMan
3049d18663 fix init of trackers
thanks @pdxlocations
2025-10-24 08:12:24 -07:00
23 changed files with 1106 additions and 329 deletions

View File

@@ -19,6 +19,16 @@ services:
networks:
- meshing-around-network
test-bot:
image: ghcr.io/spudgunman/meshing-around:main
container_name: test-bot
command: ["/bin/bash", "-c", "python3 modules/test_bot.py | tee /tmp/test_tmp.txt; if grep -E 'failures=|errors=' /tmp/test_tmp.txt; then cp /tmp/test_tmp.txt /app/test_results.txt; fi"]
volumes:
- .:/app:rw
networks:
- meshing-around-network
stdin_open: true
debug-console:
image: ghcr.io/spudgunman/meshing-around:main
container_name: debug-console

View File

@@ -57,9 +57,9 @@ spaceWeather = True
# enable or disable the RSS module, and truncate the story
rssEnable = True
rssFeedURL = http://www.hackaday.com/rss.xml,http://rss.slashdot.org/Slashdot/slashdotMain
rssFeedURL = http://www.hackaday.com/rss.xml,http://rss.slashdot.org/Slashdot/slashdotMain,http://www.reddit.com/r/meshtastic/.rss
# RSS feed names must match the order of the URLs above, default is used if no match
rssFeedNames = default,slashdot
rssFeedNames = default,slashdot,mesh
rssMaxItems = 3
rssTruncate = 100

View File

@@ -103,7 +103,7 @@ def auto_response(message, snr, rssi, hop, pkiStatus, message_from_id, channel_n
"whereami": lambda: handle_whereami(message_from_id, deviceID, channel_number),
"whoami": lambda: handle_whoami(message_from_id, deviceID, hop, snr, rssi, pkiStatus),
"whois": lambda: handle_whois(message, deviceID, channel_number, message_from_id),
"wiki:": lambda: handle_wiki(message, isDM),
"wiki": lambda: handle_wiki(message, isDM),
"wx": lambda: handle_wxc(message_from_id, deviceID, 'wx'),
"wxa": lambda: handle_wxalert(message_from_id, deviceID, message),
"wxalert": lambda: handle_wxalert(message_from_id, deviceID, message),
@@ -491,19 +491,16 @@ def handle_howtall(message, message_from_id, deviceID, isDM):
def handle_wiki(message, isDM):
# location = get_node_location(message_from_id, deviceID)
msg = "Wikipedia search function. \nUsage example:📲wiki: travelling gnome"
try:
if "wiki:?" in message.lower() or "wiki: ?" in message.lower() or "wiki?" in message.lower() or "wiki ?" in message.lower():
return msg
if "wiki" in message.lower():
search = message.split(":")[1]
search = search.strip()
if search:
return get_wikipedia_summary(search)
return "Please add a search term example:📲wiki: travelling gnome"
except Exception as e:
logger.error(f"System: Wiki Exception {e}")
msg = "Error processing your request"
msg = "Wikipedia search function. \nUsage example:📲wiki travelling gnome"
if "?" in message.lower():
return msg
if "wiki" in message.lower():
parts = message.split(" ", 1)
if len(parts) < 2 or not parts[1].strip():
return "Please add a search term example:📲wiki travelling gnome"
search = parts[1].strip()
if search:
return get_wikipedia_summary(search)
return msg
@@ -629,7 +626,7 @@ def handle_llm(message_from_id, channel_number, deviceID, message, publicChannel
return response
def handleDopeWars(message, nodeID, rxNode):
from modules.settings import dwPlayerTracker
global dwPlayerTracker
global dwHighScore
# Find player in tracker
@@ -682,7 +679,7 @@ def handle_gTnW(chess = False):
return response[selected_index]
def handleLemonade(message, nodeID, deviceID):
from modules.settings import lemonadeTracker
global lemonadeTracker
global lemonadeCups, lemonadeLemons, lemonadeSugar, lemonadeWeeks, lemonadeScore, lemon_starting_cash, lemon_total_weeks
msg = ""
@@ -730,9 +727,8 @@ def handleLemonade(message, nodeID, deviceID):
return msg
def handleBlackJack(message, nodeID, deviceID):
from modules.settings import jackTracker
global jackTracker
msg = ""
# Find player in tracker
player = next((p for p in jackTracker if p['nodeID'] == nodeID), None)
@@ -786,7 +782,7 @@ def handleBlackJack(message, nodeID, deviceID):
return msg
def handleVideoPoker(message, nodeID, deviceID):
from modules.settings import vpTracker
global vpTracker
msg = ""
# Find player in tracker
@@ -801,7 +797,17 @@ def handleVideoPoker(message, nodeID, deviceID):
# Create new player if not found
if not player and nodeID != 0:
vpTracker.append({'nodeID': nodeID, 'cmd': 'new', 'last_played': time.time()})
vpTracker.append({
'nodeID': nodeID,
'cmd': 'new',
'last_played': time.time(),
'time': time.time(),
'cash': vpStartingCash,
'player': None,
'deck': None,
'highScore': 0,
'drawCount': 0
})
msg += "Welcome to 🎰Video Poker!🎰\n"
# Show high score if available
highScore = loadHSVp()
@@ -821,7 +827,7 @@ def handleVideoPoker(message, nodeID, deviceID):
return msg
def handleMmind(message, nodeID, deviceID):
from modules.settings import mindTracker
global mindTracker
msg = ''
if "end" in message.lower() or message.lower().startswith("e"):
@@ -865,7 +871,7 @@ def handleMmind(message, nodeID, deviceID):
return msg
def handleGolf(message, nodeID, deviceID):
from modules.settings import golfTracker
global golfTracker
msg = ''
# get player's last command from tracker if not new player
@@ -913,7 +919,7 @@ def handleGolf(message, nodeID, deviceID):
return msg
def handleHangman(message, nodeID, deviceID):
from modules.settings import hangmanTracker
global hangmanTracker
index = 0
msg = ''
for i in range(len(hangmanTracker)):
@@ -939,7 +945,7 @@ def handleHangman(message, nodeID, deviceID):
return msg
def handleHamtest(message, nodeID, deviceID):
from modules.settings import hamtestTracker
global hamtestTracker
index = 0
msg = ''
response = message.split(' ')
@@ -972,7 +978,7 @@ def handleHamtest(message, nodeID, deviceID):
return msg
def handleTicTacToe(message, nodeID, deviceID):
from modules.settings import tictactoeTracker
global tictactoeTracker
index = 0
msg = ''
@@ -1964,17 +1970,17 @@ async def start_rx():
# Initialize game trackers
gameTrackers = [
(dwPlayerTracker, "DopeWars", handleDopeWars) if 'dwPlayerTracker' in globals() else None,
(lemonadeTracker, "LemonadeStand", handleLemonade) if 'lemonadeTracker' in globals() else None,
(vpTracker, "VideoPoker", handleVideoPoker) if 'vpTracker' in globals() else None,
(jackTracker, "BlackJack", handleBlackJack) if 'jackTracker' in globals() else None,
(mindTracker, "MasterMind", handleMmind) if 'mindTracker' in globals() else None,
(golfTracker, "GolfSim", handleGolf) if 'golfTracker' in globals() else None,
(hangmanTracker, "Hangman", handleHangman) if 'hangmanTracker' in globals() else None,
(hamtestTracker, "HamTest", handleHamtest) if 'hamtestTracker' in globals() else None,
(tictactoeTracker, "TicTacToe", handleTicTacToe) if 'tictactoeTracker' in globals() else None,
(surveyTracker, "Survey", surveyHandler) if 'surveyTracker' in globals() else None,
#quiz does not use a tracker (quizGamePlayer) always active
(dwPlayerTracker, "DopeWars", handleDopeWars),
(lemonadeTracker, "LemonadeStand", handleLemonade),
(vpTracker, "VideoPoker", handleVideoPoker),
(jackTracker, "BlackJack", handleBlackJack),
(mindTracker, "MasterMind", handleMmind),
(golfTracker, "GolfSim", handleGolf),
(hangmanTracker, "Hangman", handleHangman),
(hamtestTracker, "HamTest", handleHamtest),
(tictactoeTracker, "TicTacToe", handleTicTacToe),
(surveyTracker, "Survey", surveyHandler),
# quiz does not use a tracker (quizGamePlayer) always active
]
# Hello World

View File

@@ -262,7 +262,7 @@ Configure in `[ollama]` section of `config.ini`.
| Command | Description |
|--------------|-----------------------------------------------|
| `wiki:` | Search Wikipedia or local Kiwix server |
| `wiki` | Search Wikipedia or local Kiwix server |
Configure in `[wikipedia]` section of `config.ini`.

View File

@@ -129,4 +129,79 @@ This will call the default script located at `script/runShell.sh` and return its
---
## Overview Unit Tests
Your test_bot.py file contains a comprehensive suite of unit tests for the various modules the project. The tests are organized using Pythons `unittest` framework and cover both core utility modules and all major game modules.
---
## Structure
- **Imports & Setup:**
The script sets up the environment, imports all necessary modules, and suppresses certain warnings for clean test output.
- **TestBot Class:**
All tests are methods of the `TestBot` class, which inherits from `unittest.TestCase`.
---
## Core Module Tests
- **Database & Checklist:**
- `test_load_bbsdb`, `test_bbs_list_messages`, `test_initialize_checklist_database`
- **News & Alerts:**
- `test_init_news_sources`, `test_get_nina_alerts`
- **LLM & Wikipedia:**
- `test_llmTool_get_google`, `test_send_ollama_query`, `test_get_wikipedia_summary`, `test_get_kiwix_summary`
- **Space & Weather:**
- `test_get_moon_phase`, `test_get_sun_times`, `test_hf_band_conditions`
- **Radio & Location:**
- `test_get_hamlib`, `test_get_rss_feed`, `get_openskynetwork`, `test_initalize_qrz_database`
---
## Game Module Tests
Each game module has a dedicated test that simulates a typical user interaction:
- **Tic-Tac-Toe:**
Starts a game and makes one move.
- **Video Poker:**
Starts a session and places a bet.
- **Blackjack:**
Starts a game and places a bet.
- **Hangman:**
Starts a game and guesses a letter.
- **Lemonade Stand:**
Starts a game and buys a box of cups.
- **GolfSim:**
Starts a hole and takes a shot.
- **DopeWars:**
Starts a game, selects a city, and checks the list.
- **MasterMind:**
Starts a game and makes one guess.
- **Quiz:**
Starts a quiz, joins as a player, answers one question, and ends the quiz.
- **Survey:**
Starts a survey, answers one question, and ends the survey.
- **HamTest:**
Starts a ham radio test and answers one question.
---
## Extended API Tests
If the `.checkall` file is present, additional API and data-fetching tests are run for:
- RepeaterBook, ArtSciRepeaters, NOAA tides/weather, USGS earthquakes/volcanoes, satellite passes, and more.
## Notes
- Tests are designed to be **non-destructive** and **idempotent**.
- Some tests require specific data files (e.g., for quiz, survey, hamtest).
- The suite is intended to be run from the main program directory.
Happy hacking!

View File

@@ -21,30 +21,33 @@ bbs_dm = []
def load_bbsdb():
global bbs_messages
# load the bbs messages from the database file
try:
with open('data/bbsdb.pkl', 'rb') as f:
new_bbs_messages = pickle.load(f)
if isinstance(new_bbs_messages, list):
for msg in new_bbs_messages:
#example [1, 'Welcome to meshBBS', 'Welcome to the BBS, please post a message!', 0]
msgHash = hash(tuple(msg[1:3])) # Create a hash of the message content (subject and body)
# Check if the message already exists in bbs_messages
msgHash = hash(tuple(msg[1:3]))
if all(hash(tuple(existing_msg[1:3])) != msgHash for existing_msg in bbs_messages):
# if the message is not a duplicate, add it to bbs_messages Maintain the message ID sequence
new_id = len(bbs_messages) + 1
bbs_messages.append([new_id, msg[1], msg[2], msg[3]])
logger.info(f"System: Loaded BBS Message ID {new_id}, subject: {msg[1]} from bbsdb.pkl")
return True # Loaded successfully, regardless of whether new messages were added
return False # File existed but did not contain a valid list of messages (possibly corrupted)
except FileNotFoundError:
# create a new bbsdb.pkl with a welcome message
# template ([messageID, subject, message, fromNode, now, thread, replyto])
bbs_messages = [[1, "Welcome to meshBBS", "Welcome to the BBS, please post a message!",0,time.strftime('%Y-%m-%d %H:%M:%S'),0,0]]
logger.debug("System: bbsdb.pkl not found, creating new one")
bbs_messages = [[1, "Welcome to meshBBS", "Welcome to the BBS, please post a message!",0]]
try:
with open('data/bbsdb.pkl', 'wb') as f:
pickle.dump(bbs_messages, f)
return True
except Exception as e:
logger.error(f"System: Error creating bbsdb.pkl: {e}")
return False
except Exception as e:
logger.error(f"System: Error loading bbsdb.pkl: {e}")
bbs_messages = [[1, "Welcome to meshBBS", "Welcome to the BBS, please post a message!",0]]
return False
def save_bbsdb():
global bbs_messages

View File

@@ -8,18 +8,21 @@ import time
trap_list_checklist = ("checkin", "checkout", "checklist", "purgein", "purgeout")
def initialize_checklist_database():
# create the database
conn = sqlite3.connect(checklist_db)
c = conn.cursor()
# Check if the checkin table exists, and create it if it doesn't
c.execute('''CREATE TABLE IF NOT EXISTS checkin
(checkin_id INTEGER PRIMARY KEY, checkin_name TEXT, checkin_date TEXT, checkin_time TEXT, location TEXT, checkin_notes TEXT)''')
# Check if the checkout table exists, and create it if it doesn't
c.execute('''CREATE TABLE IF NOT EXISTS checkout
(checkout_id INTEGER PRIMARY KEY, checkout_name TEXT, checkout_date TEXT, checkout_time TEXT, location TEXT, checkout_notes TEXT)''')
conn.commit()
conn.close()
logger.debug("System: Ensured data/checklist.db exists with required tables")
try:
conn = sqlite3.connect(checklist_db)
c = conn.cursor()
# Check if the checkin table exists, and create it if it doesn't
c.execute('''CREATE TABLE IF NOT EXISTS checkin
(checkin_id INTEGER PRIMARY KEY, checkin_name TEXT, checkin_date TEXT, checkin_time TEXT, location TEXT, checkin_notes TEXT)''')
# Check if the checkout table exists, and create it if it doesn't
c.execute('''CREATE TABLE IF NOT EXISTS checkout
(checkout_id INTEGER PRIMARY KEY, checkout_name TEXT, checkout_date TEXT, checkout_time TEXT, location TEXT, checkout_notes TEXT)''')
conn.commit()
conn.close()
return True
except Exception as e:
logger.error(f"Checklist: Failed to initialize database: {e}")
return False
def checkin(name, date, time, location, notes):
location = ", ".join(map(str, location))

View File

@@ -178,6 +178,9 @@ def initNewsSources():
if file.endswith('_news.txt'):
source = file[:-9] # remove _news.txt
newsSourcesList.append(source)
return True
logger.info("FileMon: No news sources found")
return False
#initialize the headlines on startup
initNewsSources()

View File

@@ -9,8 +9,10 @@
- [Tic-Tac-Toe](#tic-tac-toe-game-module)
- [MasterMind](#mastermind-game-module)
- [Video Poker](#video-poker-game-module)
- [Hangman](#hangman-game-module)
- [Quiz](#quiz-game-module)
- [Survey](#survey--module-game)
- [Word of the Day Game](#word-of-the-day-game--rules--features)
---
@@ -538,4 +540,182 @@ Place your Bet, or (L)eave Table.
"turtle",
"lizard",
"snake"
]
]
# Hangman Game Module
A classic word-guessing game for the Meshtastic mesh-bot. Try to guess the hidden word one letter at a time before you run out of chances!
## How to Play
- **Start the Game:**
Send the command `hangman` via DM to the bot to begin a new game.
- **Objective:**
Guess the secret word by suggesting letters, one at a time. Each incorrect guess brings you closer to losing!
- **Game Flow:**
1. **New Game:**
- The bot picks a random word and shows you its masked form (e.g., `_ _ _ _ _`).
- Youll see your total games played and games won.
2. **Guessing:**
- Type a single letter to guess.
- Correct guesses reveal all instances of that letter in the word.
- Incorrect guesses are tracked; you have 6 chances before the game ends.
- The bot shows your progress, wrong guesses, and a hangman emoji status.
3. **Winning & Losing:**
- Guess all letters before reaching 6 wrong guesses to win!
- If you lose, the bot reveals the word and starts a new game.
- **Commands:**
- Enter a single letter to guess.
- Start a new game by sending `hangman` again.
## Example Session
```
_ _ _ _ _ _ _
Guess a letter
🥳
Total Games: 1, Won: 1
M E S H T A S T I C
Guess a letter
```
## Notes
- The word list is loaded from `data/hangman.json` if available, or uses a built-in default list. [\"apple\",\"banana\",\"cherry\"]
- Game stats are tracked per player.
- Only one game session per player at a time.
- Play via DM for best experience.
## Data Files
- `data/hangman.json`: List of words for Hangman.
Example:
```
[
"apple",
"banana",
"cherry"
]
```
## Credits
- Written for Meshtastic mesh-bot by ZR1RF Johannes le Roux 2025
# Quiz Game Module
This module implements a multiplayer quiz game for the Meshtastic mesh-bot.
## How to Play
- **Start the Game:**
The quizmaster starts the quiz session (usually with `/quiz start` or similar command).
- **Join the Game:**
Players join by sending `/quiz join` or by answering a question while a quiz is active.
- **Answer Questions:**
- Use `Q: <answer>` to answer the current question.
- For multiple choice, answer with `A`, `B`, `C`, etc.
- For free-text, type the answer after `Q: `.
- Use `Q: ?` to request the next question.
- **Leave the Game:**
Players can leave at any time with `/quiz leave`.
- **Stop the Game:**
The quizmaster stops the quiz session (e.g., `/quiz stop`). Final scores and the top 3 players are announced.
## Rules & Features
- Only the quizmaster can start or stop the quiz.
- Players can join or leave at any time while the quiz is active.
- Questions are loaded from quiz_questions.json and can be multiple choice or free-text.
- Players earn 1 point for each correct answer.
- The first player to answer each question correctly is noted.
- The top 3 players are displayed at the end of the quiz.
- The quizmaster can broadcast messages to all players.
## Example Commands
- Start quiz:
`/quiz start`
- Join quiz:
`/quiz join`
- Answer a question:
`Q: B`
`Q: Paris`
- Next question:
`Q: ?`
- Leave quiz:
`/quiz leave`
- Stop quiz:
`/quiz stop`
## Notes
- Only one quiz can be active at a time.
- Players can only answer each question once.
- The quizmaster is defined by the `bbs_admin_list` variable.
- Questions must be formatted correctly in the JSON file for the game to function.
---
**Written for Meshtastic mesh-bot by K7MHI Kelly Keeton 2025**
Certainly! Heres documentation for the **Survey Game Module** in the same format as your other game modules:
---
# Survey Module "game"
This module implements a survey system for the Meshtastic mesh-bot.
## How to Play
- **Start the Survey:**
Users start a survey by specifying the survey name (e.g., `/survey start example`).
The survey will prompt the user with the first question.
- **Answer Questions:**
- For multiple choice: reply with a letter (A, B, C, ...).
- For integer: reply with a number.
- For text: reply with your answer as text.
After each answer, the next question is shown automatically.
- **End the Survey:**
The survey ends automatically after the last question, or the user can send `end` to finish early.
Responses are saved to a CSV file.
## Rules & Features
- Surveys are defined in JSON files in surveys (e.g., `example_survey.json`).
- Each survey can have multiple choice, integer, or text questions.
- User responses are saved to a CSV file named `<survey_name>_responses.csv` in the same directory.
- Users can only answer each question once per survey session.
- Survey results can be summarized and reported by the bot.
## Example Commands
- Start a survey:
`/survey start example`
- Answer a multiple choice question:
`A`
- Answer an integer question:
`42`
- Answer a text question:
`My favorite color is blue.`
- End the survey early:
`end`
- Get survey results (admin):
`/survey results example`
## Notes
- Only surveys listed in the surveys directory with the `_survey.json` suffix are available.
- Each users responses are tracked separately.
- Results are summarized and can be displayed by the bot.
---
**Written for Meshtastic mesh-bot by K7MHI Kelly Keeton 2025**

View File

@@ -7,7 +7,6 @@ import time
import pickle
jack_starting_cash = 100 # Replace 100 with your desired starting cash value
from modules.settings import jackTracker
SUITS = ("♥️", "♦️", "♠️", "♣️")
RANKS = (

View File

@@ -1,4 +1,7 @@
# Written for Meshtastic mesh-bot by ZR1RF Johannes le Roux 2025
from modules.log import *
import os
import json
import random
class Hangman:
@@ -118,6 +121,25 @@ class Hangman:
def __init__(self):
self.game = {}
self.DEFAULT_WORDS = self.WORDS
# Try to load hangman.json if it exists
hangman_json_path = os.path.join('data', 'hangman.json')
if os.path.exists(hangman_json_path):
try:
with open(hangman_json_path, 'r') as f:
words = json.load(f)
# Ensure it's a list of strings
if isinstance(words, list) and all(isinstance(w, str) for w in words):
self.WORDS = words
else:
self.WORDS = self.DEFAULT_WORDS
except (FileNotFoundError, json.JSONDecodeError):
logger.warning("Failed to load hangman.json, using default words. example JSON: [\"apple\",\"banana\",\"cherry\"]")
self.WORDS = self.DEFAULT_WORDS
else:
self.WORDS = self.DEFAULT_WORDS
def new_game(self, id):
games = won = 0

View File

@@ -23,7 +23,6 @@ lemonadeLemons = [{'nodeID': 0, 'cost': 4.00, 'count': 8, 'min': 2.00, 'unit': 0
lemonadeSugar = [{'nodeID': 0, 'cost': 3.00, 'count': 15, 'min': 1.50, 'unit': 0.00}]
lemonadeWeeks = [{'nodeID': 0, 'current': 1, 'total': lemon_total_weeks, 'sales': 99, 'potential': 0, 'unit': 0.00, 'price': 0.00, 'total_sales': 0}]
lemonadeScore = [{'nodeID': 0, 'value': 0.00, 'total': 0.00}]
from modules.settings import lemonadeTracker
def get_sales_amount(potential, unit, price):
"""Gets the sales amount.

View File

@@ -5,7 +5,7 @@ import random
import time
import pickle
from modules.log import *
from modules.settings import mindTracker
def chooseDifficultyMMind(message):
usrInput = message.lower()
msg = ''

View File

@@ -6,9 +6,9 @@ import pickle
from modules.log import *
vpStartingCash = 20
from modules.settings import vpTracker
# Define the Card class
class CardVP:
global vpTracker
card_values = { # value of the ace is high until it needs to be low
2: 2,
@@ -296,154 +296,159 @@ def loadHSVp():
return 0
def playVideoPoker(nodeID, message):
global vpTracker, vpStartingCash
msg = ""
try:
# Initialize the player
if getLastCmdVp(nodeID) is None or getLastCmdVp(nodeID) == "":
# create new player if not in tracker
logger.debug(f"System: VideoPoker: New Player {nodeID}")
vpTracker.append({'nodeID': nodeID, 'cmd': 'new', 'time': time.time(), 'cash': vpStartingCash, 'player': None, 'deck': None, 'highScore': 0, 'drawCount': 0})
return f"You have {vpStartingCash} coins, \nWhats your bet?"
# Gather the player's bet
if getLastCmdVp(nodeID) == "new" or getLastCmdVp(nodeID) == "gameOver":
# Initialize shuffled Deck and Player
player = PlayerVP()
deck = DeckVP()
deck.shuffle()
drawCount = 1
bet = 0
msg = ''
# Initialize the player
if getLastCmdVp(nodeID) is None or getLastCmdVp(nodeID=nodeID) == "":
# create new player if not in tracker
logger.debug(f"System: VideoPoker: New Player {nodeID}")
vpTracker.append({'nodeID': nodeID, 'cmd': 'new', 'time': time.time(), 'cash': vpStartingCash, 'player': None, 'deck': None, 'highScore': 0, 'drawCount': 0})
return f"You have {vpStartingCash} coins, \nWhats your bet?"
# Gather the player's bet
if getLastCmdVp(nodeID) == "new" or getLastCmdVp(nodeID) == "gameOver":
# Initialize shuffled Deck and Player
player = PlayerVP()
deck = DeckVP()
deck.shuffle()
drawCount = 1
bet = 0
msg = ''
# load the player bankroll from tracker
for i in range(len(vpTracker)):
if vpTracker[i]['nodeID'] == nodeID:
player.bankroll = vpTracker[i]['cash']
vpTracker[i]['time'] = time.time()
# Detect if message is a bet
try:
bet = int(message)
except ValueError:
msg += f"Please enter a valid bet, 1 to 5 coins. you have {player.bankroll} coins."
# Check if bet is valid
if bet > player.bankroll:
msg += f"You can only bet the money you have. {player.bankroll} coins, No strip poker here..."
elif bet < 1:
msg += "You must bet at least 1 coin.🪙"
elif bet > 5:
msg += "The 🎰 coin slot only fits 5 coins max."
# if msg contains an error, return it
if msg is not None and msg != '':
return msg
else:
# Take the bet
player.bet(str(message))
# Bet placed, start the game
setLastCmdVp(nodeID, "playing")
# save player and deck to tracker
# load the player bankroll from tracker
for i in range(len(vpTracker)):
if vpTracker[i]['nodeID'] == nodeID:
vpTracker[i]['player'] = player
vpTracker[i]['deck'] = deck
vpTracker[i]['cash'] = player.bankroll
player.bankroll = vpTracker[i]['cash']
vpTracker[i]['time'] = time.time()
# Play the game
if getLastCmdVp(nodeID) == "playing":
msg = ''
player.draw_cards(deck)
msg += player.show_hand()
# give hint to player
msg += player.score_hand(resetHand=False)
# save player and deck to tracker
for i in range(len(vpTracker)):
if vpTracker[i]['nodeID'] == nodeID:
vpTracker[i]['player'] = player
vpTracker[i]['deck'] = deck
vpTracker[i]['drawCount'] = drawCount
# Detect if message is a bet
try:
bet = int(message)
except ValueError:
msg += f"Please enter a valid bet, 1 to 5 coins. you have {player.bankroll} coins."
msg += f"\nDeal new card? \nex: 1,3,4 or (N)o,(A)ll (H)and"
setLastCmdVp(nodeID, "redraw")
return msg
if getLastCmdVp(nodeID) == "redraw":
msg = ''
# load the player and deck from tracker
for i in range(len(vpTracker)):
if vpTracker[i]['nodeID'] == nodeID:
player = vpTracker[i]['player']
deck = vpTracker[i]['deck']
drawCount = vpTracker[i]['drawCount']
# Check if bet is valid
if bet > player.bankroll:
msg += f"You can only bet the money you have. {player.bankroll} coins, No strip poker here..."
elif bet < 1:
msg += "You must bet at least 1 coin.🪙"
elif bet > 5:
msg += "The 🎰 coin slot only fits 5 coins max."
# if msg contains an error, return it
if msg is not None and msg != '':
return msg
else:
# Take the bet
player.bet(str(message))
# Bet placed, start the game
setLastCmdVp(nodeID, "playing")
# if player wants to redraw cards, and not done already
if message.lower().startswith("n"):
setLastCmdVp(nodeID, "endGame")
if message.lower().startswith("h"):
msg = player.show_hand()
return msg
else:
if drawCount <= 1:
msg = player.redraw(deck, message)
if msg.startswith("ex:"):
# if returned error message, return it
return msg
drawCount += 1
# save player and deck to tracker
for i in range(len(vpTracker)):
if vpTracker[i]['nodeID'] == nodeID:
vpTracker[i]['player'] = player
vpTracker[i]['deck'] = deck
vpTracker[i]['drawCount'] = drawCount
if drawCount == 2:
# this is the last draw will carry on to endGame for scoring
msg = player.redraw(deck, message) + f"\n"
vpTracker[i]['cash'] = player.bankroll
# Play the game
if getLastCmdVp(nodeID) == "playing":
msg = ''
player.draw_cards(deck)
msg += player.show_hand()
# give hint to player
msg += player.score_hand(resetHand=False)
# save player and deck to tracker
for i in range(len(vpTracker)):
if vpTracker[i]['nodeID'] == nodeID:
vpTracker[i]['player'] = player
vpTracker[i]['deck'] = deck
vpTracker[i]['drawCount'] = drawCount
msg += f"\nDeal new card? \nex: 1,3,4 or (N)o,(A)ll (H)and"
setLastCmdVp(nodeID, "redraw")
return msg
if getLastCmdVp(nodeID) == "redraw":
msg = ''
# load the player and deck from tracker
for i in range(len(vpTracker)):
if vpTracker[i]['nodeID'] == nodeID:
player = vpTracker[i]['player']
deck = vpTracker[i]['deck']
drawCount = vpTracker[i]['drawCount']
# if player wants to redraw cards, and not done already
if message.lower().startswith("n"):
setLastCmdVp(nodeID, "endGame")
if message.lower().startswith("h"):
msg = player.show_hand()
return msg
else:
if drawCount <= 1:
msg = player.redraw(deck, message)
if msg.startswith("ex:"):
# if returned error message, return it
return msg
# redraw done
setLastCmdVp(nodeID, "endGame")
drawCount += 1
# save player and deck to tracker
for i in range(len(vpTracker)):
if vpTracker[i]['nodeID'] == nodeID:
vpTracker[i]['player'] = player
vpTracker[i]['deck'] = deck
vpTracker[i]['drawCount'] = drawCount
if drawCount == 2:
# this is the last draw will carry on to endGame for scoring
msg = player.redraw(deck, message) + f"\n"
if msg.startswith("ex:"):
# if returned error message, return it
return msg
# redraw done
setLastCmdVp(nodeID, "endGame")
else:
# show redrawn hand
return msg
else:
# show redrawn hand
return msg
else:
# redraw already done
setLastCmdVp(nodeID, "endGame")
if getLastCmdVp(nodeID) == "endGame":
# load the player and deck from tracker
for i in range(len(vpTracker)):
if vpTracker[i]['nodeID'] == nodeID:
player = vpTracker[i]['player']
deck = vpTracker[i]['deck']
# redraw already done
setLastCmdVp(nodeID, "endGame")
if getLastCmdVp(nodeID) == "endGame":
# load the player and deck from tracker
for i in range(len(vpTracker)):
if vpTracker[i]['nodeID'] == nodeID:
player = vpTracker[i]['player']
deck = vpTracker[i]['deck']
msg += player.score_hand()
msg += player.score_hand()
if player.bankroll < 1:
player.bankroll = vpStartingCash
msg += f"\nLooks 💸 like you're out of money. 💳 resetting ballance 🏧"
elif player.bankroll > vpTracker[i]['highScore']:
vpTracker[i]['highScore'] = player.bankroll
msg += " 🎉HighScore!"
# save high score
saveHSVp(nodeID, vpTracker[i]['highScore'])
if player.bankroll < 1:
player.bankroll = vpStartingCash
msg += f"\nLooks 💸 like you're out of money. 💳 resetting ballance 🏧"
elif player.bankroll > vpTracker[i]['highScore']:
vpTracker[i]['highScore'] = player.bankroll
msg += " 🎉HighScore!"
# save high score
saveHSVp(nodeID, vpTracker[i]['highScore'])
msg += f"\nPlace your Bet, or (L)eave Table."
msg += f"\nPlace your Bet, or (L)eave Table."
setLastCmdVp(nodeID, "gameOver")
# reset player and deck in tracker
for i in range(len(vpTracker)):
if vpTracker[i]['nodeID'] == nodeID:
vpTracker[i]['player'] = None
vpTracker[i]['deck'] = None
vpTracker[i]['drawCount'] = 0
# save bankroll
vpTracker[i]['cash'] = player.bankroll
setLastCmdVp(nodeID, "gameOver")
# reset player and deck in tracker
for i in range(len(vpTracker)):
if vpTracker[i]['nodeID'] == nodeID:
vpTracker[i]['player'] = None
vpTracker[i]['deck'] = None
vpTracker[i]['drawCount'] = 0
# save bankroll
vpTracker[i]['cash'] = player.bankroll
return msg
return msg
# At the end of the try block, if nothing returned yet:
return msg if msg else 'No action taken.'
except Exception as e:
logger.warning(f"System: VideoPoker: Error {e}")
return 'No Game in progress'

View File

@@ -178,16 +178,21 @@ def get_google_context(input, num_results):
def send_ollama_query(llmQuery):
# Send the query to the Ollama API and return the response
result = requests.post(ollamaAPI, data=json.dumps(llmQuery))
if result.status_code == 200:
result_json = result.json()
result = result_json.get("response", "")
# deepseek has added <think> </think> tags to the response
if "<think>" in result:
result = result.split("</think>")[1]
else:
raise Exception(f"HTTP Error: {result.status_code}")
return result
try:
result = requests.post(ollamaAPI, data=json.dumps(llmQuery), timeout=5)
if result.status_code == 200:
result_json = result.json()
result = result_json.get("response", "")
# deepseek has added <think> </think> tags to the response
if "<think>" in result:
result = result.split("</think>")[1]
else:
logger.warning(f"System: LLM Query: Ollama API returned status code {result.status_code}")
return f"⛔️ Request Error"
return result
except requests.exceptions.RequestException as e:
logger.warning(f"System: LLM Query: Ollama API request failed: {e}")
return f"⛔️ Request Error"
def send_ollama_tooling_query(prompt, functions, model=None, max_tokens=450):
"""

View File

@@ -752,10 +752,10 @@ def get_nws_marine(zone, days=3):
try:
marine_pz_data = requests.get(zone, timeout=urlTimeoutSeconds)
if not marine_pz_data.ok:
logger.warning("Location:Error fetching NWS Marine PZ data")
logger.warning(f"Location:Error fetching NWS Marine data (HTTP {marine_pz_data.status_code})")
return ERROR_FETCHING_DATA
except (requests.exceptions.RequestException):
logger.warning("Location:Error fetching NWS Marine PZ data")
except requests.exceptions.RequestException as e:
logger.warning(f"Location:Error fetching NWS Marine data: {e}")
return ERROR_FETCHING_DATA
marine_pz_data = marine_pz_data.text

View File

@@ -1,18 +1,28 @@
# Module to respomnd to new nodes we havent seen before with a hello message
# K7MHI Kelly Keeton 2024
import os
import sqlite3
from modules.log import *
def initalize_qrz_database():
# create the database
conn = sqlite3.connect(qrz_db)
c = conn.cursor()
# Check if the qrz table exists, and create it if it doesn't
c.execute('''CREATE TABLE IF NOT EXISTS qrz
(qrz_id INTEGER PRIMARY KEY, qrz_call TEXT, qrz_name TEXT, qrz_qth TEXT, qrz_notes TEXT)''')
conn.commit()
conn.close()
try:
# If the database file doesn't exist, it will be created by sqlite3.connect
if not os.path.exists(qrz_db):
logger.info(f"QRZ database file '{qrz_db}' not found. Creating new database.")
conn = sqlite3.connect(qrz_db)
c = conn.cursor()
# Create the table if it doesn't exist
c.execute('''CREATE TABLE IF NOT EXISTS qrz
(qrz_id INTEGER PRIMARY KEY, qrz_call TEXT, qrz_name TEXT, qrz_qth TEXT, qrz_notes TEXT)''')
conn.commit()
return True
except sqlite3.Error as e:
logger.error(f"Error initializing QRZ database: {e}")
return False
finally:
if 'conn' in locals():
conn.close()
def never_seen_before(nodeID):
# check if we have seen this node before and sent a hello message

View File

@@ -100,6 +100,9 @@ def get_freq_common_name(freq):
def get_hamlib(msg="f"):
# get data from rigctld server
if "socket" not in globals():
logger.warning("RadioMon: 'socket' module not imported. Hamlib disabled.")
return ERROR_FETCHING_DATA
try:
rigControlSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
rigControlSocket.settimeout(2)

View File

@@ -6,6 +6,9 @@ import html
from html.parser import HTMLParser
import bs4 as bs
# Common User-Agent for all RSS requests
COMMON_USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
class MLStripper(HTMLParser):
def __init__(self):
super().__init__()
@@ -54,7 +57,7 @@ def get_rss_feed(msg):
try:
logger.debug(f"Fetching RSS feed from {feed_url} from message '{msg}'")
agent = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'}
agent = {'User-Agent': COMMON_USER_AGENT}
request = urllib.request.Request(feed_url, headers=agent)
with urllib.request.urlopen(request, timeout=urlTimeoutSeconds) as response:
xml_data = response.read()
@@ -63,7 +66,11 @@ def get_rss_feed(msg):
items = root.findall('.//item')
ns = None
if not items:
# Try to find the namespace dynamically
# Try Atom <entry> elements (Reddit, etc.)
items = root.findall('.//{http://www.w3.org/2005/Atom}entry')
ns = 'http://www.w3.org/2005/Atom'
if not items:
# Try to find the namespace dynamically for RSS
for elem in root.iter():
if elem.tag.endswith('item'):
ns_uri = elem.tag.split('}')[0].strip('{')
@@ -72,22 +79,30 @@ def get_rss_feed(msg):
break
items = items[:RSS_RETURN_COUNT]
if not items:
return "No RSS feed entries found."
logger.debug(f"No RSS or Atom feed entries found in feed xml_data: {xml_data[:500]}...")
return "No RSS or Atom feed entries found."
formatted_entries = []
for item in items:
if ns:
title = item.findtext(f'{{{ns}}}title', default='No title')
link = item.findtext(f'{{{ns}}}link', default=None)
description = item.findtext(f'{{{ns}}}description', default='No description')
pub_date = item.findtext(f'{{{ns}}}pubDate', default='No date')
if ns == 'http://www.w3.org/2005/Atom':
# Atom feed
title = item.findtext('{http://www.w3.org/2005/Atom}title', default='No title')
# Atom links are in <link href="..."/>
link_elem = item.find('{http://www.w3.org/2005/Atom}link')
link = link_elem.attrib.get('href') if link_elem is not None else None
# Atom content or summary
description = item.findtext('{http://www.w3.org/2005/Atom}content')
if not description:
description = item.findtext('{http://www.w3.org/2005/Atom}summary', default='No description')
pub_date = item.findtext('{http://www.w3.org/2005/Atom}updated', default='No date')
else:
# RSS feed
title = item.findtext('title', default='No title')
link = item.findtext('link', default=None)
description = item.findtext('description', default='No description')
pub_date = item.findtext('pubDate', default='No date')
# Unescape HTML entities and strip tags
description = html.unescape(description)
description = html.unescape(description) if description else ""
description = strip_tags(description)
if len(description) > RSS_TRIM_LENGTH:
description = description[:RSS_TRIM_LENGTH - 3] + "..."

View File

@@ -150,8 +150,8 @@ if dad_jokes_enabled:
# Wikipedia Search Configuration
if wikipedia_enabled:
from modules.wiki import * # from the spudgunman/meshing-around repo
trap_list = trap_list + ("wiki:",)
help_message = help_message + ", wiki:"
trap_list = trap_list + ("wiki",)
help_message = help_message + ", wiki"
# RSS Feed Configuration
if rssEnable:

402
modules/test_bot.py Normal file
View File

@@ -0,0 +1,402 @@
# test_bot.py
# Unit tests for various modules in the meshing-around project
import unittest
import os
import sys
import importlib
import pkgutil
import warnings
# Suppress ResourceWarning warnings for asyncio unclosed event here
warnings.filterwarnings("ignore", category=ResourceWarning)
# Add the parent directory to sys.path to allow module imports
parent_path = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.append(parent_path)
modules_path = os.path.join(parent_path, 'modules')
# Limits API calls during testing
CHECKALL = False
# Check for a file named .checkall in the parent directory
checkall_path = os.path.join(parent_path, '.checkall')
if os.path.isfile(checkall_path):
CHECKALL = True
# List of module names to exclude
exclude = ['test_bot','udp', 'system', 'log', 'gpio', 'web',]
available_modules = [
m.name for m in pkgutil.iter_modules([modules_path])
if m.name not in exclude]
try:
print("\nImporting Core Modules:")
from modules.log import *
print(" ✔ Imported 'log'")
# Set location default
lat = latitudeValue
lon = longitudeValue
print(f" ✔ Location set to Latitude: {lat}, Longitude: {lon}")
from modules.system import *
print(" ✔ Imported 'system'")
print("\nImporting non-excluded modules:")
for module_name in [m.name for m in pkgutil.iter_modules([modules_path])]:
if module_name not in exclude:
importlib.import_module(module_name)
print(f" ✔ Imported '{module_name}'")
except Exception as e:
print(f"\nError importing modules: {e}")
print("Run this program from the main program directory: python3 script/test_bot.py")
exit(1)
class TestBot(unittest.TestCase):
def test_example(self):
# Example test case
self.assertEqual(1 + 1, 2)
def test_load_bbsdb(self):
from bbstools import load_bbsdb
test_load = load_bbsdb()
self.assertTrue(test_load)
def test_bbs_list_messages(self):
from bbstools import bbs_list_messages
messages = bbs_list_messages()
print("list_messages() returned:", messages)
self.assertIsInstance(messages, str)
def test_initialize_checklist_database(self):
from checklist import initialize_checklist_database, process_checklist_command
result = initialize_checklist_database()
result1 = process_checklist_command(0, 'checklist', name="none", location="none")
self.assertTrue(result)
self.assertIsInstance(result1, str)
def test_init_news_sources(self):
from filemon import initNewsSources
result = initNewsSources()
self.assertTrue(result)
def test_get_nina_alerts(self):
from globalalert import get_nina_alerts
alerts = get_nina_alerts()
self.assertIsInstance(alerts, str)
def test_llmTool_get_google(self):
from llm import llmTool_get_google
result = llmTool_get_google("What is 2+2?", 1)
self.assertIsInstance(result, list)
def test_send_ollama_query(self):
from llm import send_ollama_query
response = send_ollama_query("Hello, Ollama!")
self.assertIsInstance(response, str)
def test_get_moon_phase(self):
from space import get_moon
phase = get_moon(lat, lon)
self.assertIsInstance(phase, str)
def test_get_sun_times(self):
from space import get_sun
sun_times = get_sun(lat, lon)
self.assertIsInstance(sun_times, str)
def test_hf_band_conditions(self):
from space import hf_band_conditions
conditions = hf_band_conditions()
self.assertIsInstance(conditions, str)
def test_get_wikipedia_summary(self):
from wiki import get_wikipedia_summary
summary = get_wikipedia_summary("Python", location=(lat, lon))
self.assertIsInstance(summary, str)
def test_get_kiwix_summary(self):
from wiki import get_kiwix_summary
summary = get_kiwix_summary("Python")
self.assertIsInstance(summary, str)
def get_openskynetwork(self):
from locationdata import get_openskynetwork
flights = get_openskynetwork(lat, lon)
self.assertIsInstance(flights, str)
def test_initalize_qrz_database(self):
from qrz import initalize_qrz_database
result = initalize_qrz_database()
self.assertTrue(result)
def test_get_hamlib(self):
from radio import get_hamlib
frequency = get_hamlib('f')
self.assertIsInstance(frequency, str)
def test_get_rss_feed(self):
from rss import get_rss_feed
result = get_rss_feed('')
self.assertIsInstance(result, str)
##### GAMES Tests #####
def test_tictactoe_initial_and_move(self):
from games.tictactoe import tictactoe
user_id = "testuser"
# Start a new game (no move yet)
initial = tictactoe.play(user_id, "")
print("Initial response:", initial)
# Make a move, e.g., '1'
second = tictactoe.play(user_id, "1")
print("After move '1':", second)
self.assertIsInstance(initial, str)
self.assertIsInstance(second, str)
def test_playVideoPoker(self):
from games.videopoker import playVideoPoker
user_id = "testuser"
# Start a new game/session
initial = playVideoPoker(user_id, 'deal')
print("Initial response:", initial)
# Place a 5-coin bet
after_bet = playVideoPoker(user_id, '5')
print("After placing 5-coin bet:", after_bet)
self.assertIsInstance(initial, str)
self.assertIsInstance(after_bet, str)
def test_play_blackjack(self):
from games.blackjack import playBlackJack
user_id = "testuser"
# Start a new game/session
initial = playBlackJack(user_id, 'deal')
print("Initial response:", initial)
# Place a 5-chip bet
after_bet = playBlackJack(user_id, '5')
print("After placing 5-chip bet:", after_bet)
self.assertIsInstance(initial, str)
self.assertIsInstance(after_bet, str)
def test_hangman_initial_and_guess(self):
from games.hangman import hangman
user_id = "testuser"
# Start a new game (no guess yet)
initial = hangman.play(user_id, "")
print("Initial response:", initial)
# Guess a letter, e.g., 'e'
second = hangman.play(user_id, "e")
print("After guessing 'e':", second)
self.assertIsInstance(initial, str)
self.assertIsInstance(second, str)
def test_play_lemonade_stand(self):
from games.lemonade import playLemonstand, lemonadeTracker
user_id = "testuser"
# Ensure user is in tracker
if not any(u['nodeID'] == user_id for u in lemonadeTracker):
lemonadeTracker.append({'nodeID': user_id, 'cups': 0, 'lemons': 0, 'sugar': 0, 'cash': 30.0, 'start': 30.0, 'cmd': 'new', 'last_played': 0})
# Start a new game
initial = playLemonstand(user_id, "", newgame=True)
print("Initial response:", initial)
# Buy 1 box of cups
after_cups = playLemonstand(user_id, "1")
print("After buying 1 box of cups:", after_cups)
self.assertIsInstance(initial, str)
self.assertIsInstance(after_cups, str)
def test_play_golfsim_one_hole(self):
from games.golfsim import playGolf
user_id = "testuser"
# Start a new game/hole
initial = playGolf(user_id, "", last_cmd="new")
print("Initial hole info:", initial)
# Take first shot with driver
after_shot = playGolf(user_id, "driver")
print("After hitting driver:", after_shot)
self.assertIsInstance(initial, str)
self.assertIsInstance(after_shot, str)
def test_play_dopewar_choose_city_and_list(self):
from games.dopewar import playDopeWars
user_id = 1234567899 # Use a unique test user ID
# Start a new game, get city selection prompt
initial = playDopeWars(user_id, "")
print("Initial city selection:", initial)
# Choose city 1
after_city = playDopeWars(user_id, "1")
print("After choosing city 1 (main game list):", after_city)
self.assertIsInstance(initial, str)
self.assertIsInstance(after_city, str)
def test_play_mastermind_one_guess(self):
from games.mmind import start_mMind
user_id = 1234567899 # Use a unique test user ID
# Start a new game (should prompt for difficulty/colors)
initial = start_mMind(user_id, "n")
print("Initial response (difficulty/colors):", initial)
# Make a guess (e.g., "RGBY" - valid for normal)
after_guess = start_mMind(user_id, "RGBY")
print("After guessing RGBY:", after_guess)
self.assertIsInstance(initial, str)
self.assertIsInstance(after_guess, str)
def test_quiz_game_answer_one_and_end(self):
from games.quiz import quizGamePlayer
quizmaster_id = "admin" # Use a valid quizmaster ID from bbs_admin_list
user_id = "testuser"
# Start the quiz as quizmaster
start_msg = quizGamePlayer.start_game(quizmaster_id)
print("Quiz start:", start_msg)
# User joins the quiz
join_msg = quizGamePlayer.join(user_id)
print("User joined:", join_msg)
# Get the first question (should be included in join_msg, but call explicitly for clarity)
question_msg = quizGamePlayer.next_question(user_id)
print("First question:", question_msg)
# Simulate answering with 'A' (adjust if your first question expects a different answer)
answer_msg = quizGamePlayer.answer(user_id, "A")
print("Answer response:", answer_msg)
# End the quiz as quizmaster
end_msg = quizGamePlayer.stop_game(quizmaster_id)
print("Quiz end:", end_msg)
self.assertIsInstance(start_msg, str)
self.assertIsInstance(join_msg, str)
self.assertIsInstance(question_msg, str)
self.assertIsInstance(answer_msg, str)
self.assertIsInstance(end_msg, str)
def test_survey_answer_one_and_end(self):
from survey import survey_module
user_id = "testuser"
survey_name = "example" # Make sure this survey exists in your data/surveys directory
# Start the survey
start_msg = survey_module.start_survey(user_id, survey_name)
print("Survey start:", start_msg)
# Answer the first question with 'A' (adjust if your survey expects a different type)
answer_msg = survey_module.answer(user_id, "A")
print("Answer response:", answer_msg)
# End the survey
end_msg = survey_module.end_survey(user_id)
print("Survey end:", end_msg)
self.assertIsInstance(start_msg, str)
self.assertIsInstance(answer_msg, str)
self.assertIsInstance(end_msg, str)
def test_hamtest_answer_one(self):
from games.hamtest import hamtest
user_id = "testuser"
# Start a new ham test game (default level: technician)
initial = hamtest.newGame(user_id)
print("Initial question:", initial)
# Answer the first question with 'A'
answer_msg = hamtest.answer(user_id, "A")
print("Answer response:", answer_msg)
self.assertIsInstance(initial, str)
self.assertIsInstance(answer_msg, str)
##### API Tests - Extended tests run only if CHECKALL is True #####
if CHECKALL:
logger.info("Running extended API tests as CHECKALL is enabled.")
def test_getRepeaterBook(self):
from locationdata import getRepeaterBook
repeaters = getRepeaterBook(lat, lon)
self.assertIsInstance(repeaters, str)
def test_getArtSciRepeaters(self):
from locationdata import getArtSciRepeaters
repeaters = getArtSciRepeaters(lat, lon)
self.assertIsInstance(repeaters, str)
def test_get_NOAAtides(self):
from locationdata import get_NOAAtide
tides = get_NOAAtide(lat, lon)
self.assertIsInstance(tides, str)
def test_get_NOAAweather(self):
from locationdata import get_NOAAweather
weather = get_NOAAweather(lat, lon)
self.assertIsInstance(weather, str)
def test_where_am_i(self):
from locationdata import where_am_i
location = where_am_i(lat, lon)
self.assertIsInstance(location, str)
def test_getWeatherAlertsNOAA(self):
from locationdata import getWeatherAlertsNOAA
alerts = getWeatherAlertsNOAA(lat, lon)
if isinstance(alerts, tuple):
self.assertIsInstance(alerts[0], str)
else:
self.assertIsInstance(alerts, str)
def test_getActiveWeatherAlertsDetailNOAA(self):
from locationdata import getActiveWeatherAlertsDetailNOAA
alerts_detail = getActiveWeatherAlertsDetailNOAA(lat, lon)
self.assertIsInstance(alerts_detail, str)
def test_getIpawsAlerts(self):
from locationdata import getIpawsAlert
alerts = getIpawsAlert(lat, lon)
self.assertIsInstance(alerts, str)
def test_get_flood_noaa(self):
from locationdata import get_flood_noaa
flood_info = get_flood_noaa(lat, lon, 12484500) # Example gauge UID
self.assertIsInstance(flood_info, str)
def test_get_volcano_usgs(self):
from locationdata import get_volcano_usgs
volcano_info = get_volcano_usgs(lat, lon)
self.assertIsInstance(volcano_info, str)
def test_get_nws_marine_alerts(self):
from locationdata import get_nws_marine
marine_alerts = get_nws_marine('https://tgftp.nws.noaa.gov/data/forecasts/marine/coastal/pz/pzz135.txt',1) # Example zone
self.assertIsInstance(marine_alerts, str)
def test_checkUSGSEarthQuakes(self):
from locationdata import checkUSGSEarthQuake
earthquakes = checkUSGSEarthQuake(lat, lon)
self.assertIsInstance(earthquakes, str)
def test_getNextSatellitePass(self):
from space import getNextSatellitePass
pass_info = getNextSatellitePass('25544', lat, lon)
self.assertIsInstance(pass_info, str)
def test_get_wx_meteo(self):
from wx_meteo import get_wx_meteo
weather_report = get_wx_meteo(lat, lon)
self.assertIsInstance(weather_report, str)
def test_get_flood_openmeteo(self):
from wx_meteo import get_flood_openmeteo
flood_report = get_flood_openmeteo(lat, lon)
self.assertIsInstance(flood_report, str)
if __name__ == '__main__':
if not CHECKALL:
print("\nNote: Extended API tests are skipped. To enable them, create a file named '.checkall' in the parent directory.\n")
unittest.main()

View File

@@ -1,121 +1,159 @@
# meshbot wiki module
from modules.log import *
import wikipedia # pip install wikipedia
#import wikipedia # pip install wikipedia
import requests
import bs4 as bs
from urllib.parse import quote
# Kiwix support for local wiki
if use_kiwix_server:
import requests
import bs4 as bs
from urllib.parse import quote
def tag_visible(element):
"""Filter visible text from HTML elements for Kiwix"""
if element.parent.name in ['style', 'script', 'head', 'title', 'meta', '[document]']:
return False
if isinstance(element, bs.element.Comment):
return False
return True
# Kiwix helper functions (only loaded if use_kiwix_server is True)
if wikipedia_enabled and use_kiwix_server:
def tag_visible(element):
"""Filter visible text from HTML elements for Kiwix"""
if element.parent.name in ['style', 'script', 'head', 'title', 'meta', '[document]']:
return False
if isinstance(element, bs.element.Comment):
return False
return True
def text_from_html(body):
"""Extract visible text from HTML content"""
soup = bs.BeautifulSoup(body, 'html.parser')
texts = soup.find_all(string=True)
visible_texts = filter(tag_visible, texts)
return " ".join(t.strip() for t in visible_texts if t.strip())
def text_from_html(body):
"""Extract visible text from HTML content"""
soup = bs.BeautifulSoup(body, 'html.parser')
texts = soup.find_all(string=True)
visible_texts = filter(tag_visible, texts)
return " ".join(t.strip() for t in visible_texts if t.strip())
def get_kiwix_summary(search_term):
"""Query local Kiwix server for Wikipedia article"""
if search_term is None or search_term.strip() == "":
return ERROR_FETCHING_DATA
try:
search_encoded = quote(search_term)
# Try direct article access first
wiki_article = search_encoded.capitalize().replace("%20", "_")
exact_url = f"{kiwix_url}/raw/{kiwix_library_name}/content/A/{wiki_article}"
response = requests.get(exact_url, timeout=urlTimeoutSeconds)
if response.status_code == 200:
# Extract and clean text
text = text_from_html(response.text)
# Remove common Wikipedia metadata prefixes
text = text.split("Jump to navigation", 1)[-1]
text = text.split("Jump to search", 1)[-1]
# Truncate to reasonable length (first few sentences)
sentences = text.split('. ')
summary = '. '.join(sentences[:wiki_return_limit])
if summary and not summary.endswith('.'):
summary += '.'
return summary.strip()[:500] # Hard limit at 500 chars
def get_kiwix_summary(search_term):
"""Query local Kiwix server for Wikipedia article"""
try:
search_encoded = quote(search_term)
# Try direct article access first
wiki_article = search_encoded.capitalize().replace("%20", "_")
exact_url = f"{kiwix_url}/raw/{kiwix_library_name}/content/A/{wiki_article}"
# If direct access fails, try search
logger.debug(f"System: Kiwix direct article not found for:{search_term} Status Code:{response.status_code}")
search_url = f"{kiwix_url}/search?content={kiwix_library_name}&pattern={search_encoded}"
response = requests.get(search_url, timeout=urlTimeoutSeconds)
if response.status_code == 200 and "No results were found" not in response.text:
soup = bs.BeautifulSoup(response.text, 'html.parser')
links = [a['href'] for a in soup.find_all('a', href=True) if "start=" not in a['href']]
response = requests.get(exact_url, timeout=urlTimeoutSeconds)
if response.status_code == 200:
# Extract and clean text
text = text_from_html(response.text)
# Remove common Wikipedia metadata prefixes
text = text.split("Jump to navigation", 1)[-1]
text = text.split("Jump to search", 1)[-1]
# Truncate to reasonable length (first few sentences)
sentences = text.split('. ')
summary = '. '.join(sentences[:wiki_return_limit])
if summary and not summary.endswith('.'):
summary += '.'
return summary.strip()[:500] # Hard limit at 500 chars
# If direct access fails, try search
search_url = f"{kiwix_url}/search?content={kiwix_library_name}&pattern={search_encoded}"
response = requests.get(search_url, timeout=urlTimeoutSeconds)
if response.status_code == 200 and "No results were found" not in response.text:
soup = BeautifulSoup(response.text, 'html.parser')
links = [a['href'] for a in soup.find_all('a', href=True) if "start=" not in a['href']]
for link in links[:3]: # Check first 3 results
article_name = link.split("/")[-1]
if not article_name or article_name[0].islower():
continue
article_url = f"{kiwix_url}{link}"
article_response = requests.get(article_url, timeout=urlTimeoutSeconds)
if article_response.status_code == 200:
text = text_from_html(article_response.text)
text = text.split("Jump to navigation", 1)[-1]
text = text.split("Jump to search", 1)[-1]
sentences = text.split('. ')
summary = '. '.join(sentences[:wiki_return_limit])
if summary and not summary.endswith('.'):
summary += '.'
return summary.strip()[:500]
logger.warning(f"System: No Kiwix Results for:{search_term}")
# try to fall back to online Wikipedia if available
return get_wikipedia_summary(search_term, force=True)
except requests.RequestException as e:
logger.warning(f"System: Kiwix connection error: {e}")
return "Unable to connect to local wiki server"
except Exception as e:
logger.warning(f"System: Error with Kiwix for:{search_term} {e}")
return ERROR_FETCHING_DATA
for link in links[:3]: # Check first 3 results
article_name = link.split("/")[-1]
if not article_name or article_name[0].islower():
continue
article_url = f"{kiwix_url}{link}"
article_response = requests.get(article_url, timeout=urlTimeoutSeconds)
if article_response.status_code == 200:
text = text_from_html(article_response.text)
text = text.split("Jump to navigation", 1)[-1]
text = text.split("Jump to search", 1)[-1]
sentences = text.split('. ')
summary = '. '.join(sentences[:wiki_return_limit])
if summary and not summary.endswith('.'):
summary += '.'
return summary.strip()[:500]
logger.warning(f"System: No Kiwix Results for:{search_term}")
# try to fall back to online Wikipedia if available
return get_wikipedia_summary(search_term, force=True)
except requests.RequestException as e:
logger.warning(f"System: Kiwix connection error: {e}")
return "Unable to connect to local wiki server"
# Fallback to online Wikipedia
return get_wikipedia_summary(search_term, force=True)
except Exception as e:
logger.warning(f"System: Error with Kiwix for:{search_term} {e}")
return ERROR_FETCHING_DATA
def get_wikipedia_summary(search_term, location=None, force=False):
lat, lon = location if location else (None, None)
# Use Kiwix if configured
if use_kiwix_server and not force:
return get_kiwix_summary(search_term)
if not search_term or not search_term.strip():
return ERROR_FETCHING_DATA
api_url = f"https://en.wikipedia.org/api/rest_v1/page/summary/{requests.utils.quote(search_term)}"
headers = {
"User-Agent": "MeshBot/1.0 (https://github.com/kkeeton/meshing-around; contact: youremail@example.com)"
}
try:
# Otherwise use online Wikipedia
wikipedia_search = wikipedia.search(search_term, results=3)
wikipedia_suggest = wikipedia.suggest(search_term)
#wikipedia_aroundme = wikipedia.geosearch(lat,lon, results=3)
#logger.debug(f"System: Wikipedia Nearby:{wikipedia_aroundme}")
response = requests.get(api_url, timeout=5, headers=headers)
if response.status_code == 404:
logger.warning(f"System: No Wikipedia Results for:{search_term}")
return ERROR_FETCHING_DATA
response.raise_for_status()
data = response.json()
# Check for error response from Wikipedia API
if "extract" not in data or not data.get("extract"):
logger.warning(f"System: Wikipedia API returned no extract for:{search_term} (data: {data})")
return ERROR_FETCHING_DATA
summary = data.get("extract")
if not summary or not isinstance(summary, str) or not summary.strip():
logger.warning(f"System: No summary found for:{search_term}")
return ERROR_FETCHING_DATA
sentences = [s for s in summary.split('. ') if s.strip()]
if not sentences:
logger.warning(f"System: Wikipedia summary split produced no sentences for:{search_term}")
return ERROR_FETCHING_DATA
summary = '. '.join(sentences[:wiki_return_limit])
if summary and not summary.endswith('.'):
summary += '.'
return summary.strip()[:500]
except Exception as e:
logger.debug(f"System: Wikipedia search error for:{search_term} {e}")
logger.warning(f"System: Wikipedia API error for:{search_term} {e}")
return ERROR_FETCHING_DATA
# def get_wikipedia_summary(search_term, location=None, force=False):
# lat, lon = location if location else (None, None)
# # Use Kiwix if configured
# if use_kiwix_server and not force:
# return get_kiwix_summary(search_term)
if len(wikipedia_search) == 0:
logger.warning(f"System: No Wikipedia Results for:{search_term}")
return ERROR_FETCHING_DATA
# try:
# # Otherwise use online Wikipedia
# wikipedia_search = wikipedia.search(search_term, results=3)
# wikipedia_suggest = wikipedia.suggest(search_term)
# #wikipedia_aroundme = wikipedia.geosearch(lat,lon, results=3)
# #logger.debug(f"System: Wikipedia Nearby:{wikipedia_aroundme}")
# except Exception as e:
# logger.debug(f"System: Wikipedia search error for:{search_term} {e}")
# return ERROR_FETCHING_DATA
try:
logger.debug(f"System: Searching Wikipedia for:{search_term}, First Result:{wikipedia_search[0]}, Suggest Word:{wikipedia_suggest}")
summary = wikipedia.summary(search_term, sentences=wiki_return_limit, auto_suggest=False, redirect=True)
except wikipedia.DisambiguationError as e:
logger.warning(f"System: Disambiguation Error for:{search_term} trying {wikipedia_search[0]}")
summary = wikipedia.summary(wikipedia_search[0], sentences=wiki_return_limit, auto_suggest=True, redirect=True)
except wikipedia.PageError as e:
logger.warning(f"System: Wikipedia Page Error for:{search_term} {e} trying {wikipedia_search[0]}")
summary = wikipedia.summary(wikipedia_search[0], sentences=wiki_return_limit, auto_suggest=True, redirect=True)
except Exception as e:
logger.warning(f"System: Error with Wikipedia for:{search_term} {e}")
return ERROR_FETCHING_DATA
# if len(wikipedia_search) == 0:
# logger.warning(f"System: No Wikipedia Results for:{search_term}")
# return ERROR_FETCHING_DATA
return summary
# try:
# logger.debug(f"System: Searching Wikipedia for:{search_term}, First Result:{wikipedia_search[0]}, Suggest Word:{wikipedia_suggest}")
# summary = wikipedia.summary(search_term, sentences=wiki_return_limit, auto_suggest=False, redirect=True)
# except wikipedia.DisambiguationError as e:
# logger.warning(f"System: Disambiguation Error for:{search_term} trying {wikipedia_search[0]}")
# summary = wikipedia.summary(wikipedia_search[0], sentences=wiki_return_limit, auto_suggest=True, redirect=True)
# except wikipedia.PageError as e:
# logger.warning(f"System: Wikipedia Page Error for:{search_term} {e} trying {wikipedia_search[0]}")
# summary = wikipedia.summary(wikipedia_search[0], sentences=wiki_return_limit, auto_suggest=True, redirect=True)
# except Exception as e:
# logger.warning(f"System: Error with Wikipedia for:{search_term} {e}")
# return ERROR_FETCHING_DATA
# return summary

View File

@@ -8,5 +8,4 @@ beautifulsoup4
dadjokes
geopy
schedule
wikipedia
googlesearch-python